using System; using System.Threading; using System.Collections.Generic; using System.Linq; using ln.logging; namespace ln.threading { public enum PoolThreadState { READY, WORKING, EXITED } public delegate void PoolJobFinished(Pool pool, PoolJob job); public enum PoolState { INITIALIZE, // Pool Instanz erstellt RUN, // Pool ist in Betrieb SHUTDOWN, // Pool wird heruntergefahren (es warten noch laufende Jobs) STOPPED // Pool wurde beendet, alle Resourcen freigegeben } public partial class Pool : IDisposable { public static bool DEBUG = false; public PoolState State { get; protected set; } = PoolState.INITIALIZE; public event PoolJobFinished PoolJobFinished; public PoolThread[] PoolThreads => ThreadHelpers.GetLockedValue(poolThreads, () => poolThreads.ToArray()); public int PoolSize { get; } public int CurrentPoolSize => ThreadHelpers.GetLockedValue(poolThreads, () => poolThreads.Count); public PoolJob[] CurrentPoolJobs => ThreadHelpers.GetLockedValue(poolThreads, () => poolThreads.Select((t) => t.CurrentJob).Where((j) => j != null).ToArray()); public PoolJob[] QueuedJobs => ThreadHelpers.GetLockedValue(waitingThreads, () => queuedJobs.ToArray()); public int NumQueuedJobs => ThreadHelpers.GetLockedValue(waitingThreads, () => queuedJobs.Count); private List poolThreads = new List(); protected HashSet workingThreads = new HashSet(); protected HashSet waitingThreads = new HashSet(); private Queue queuedJobs = new Queue(); public Pool() : this(Environment.ProcessorCount){} public Pool(int numThreads) { PoolSize = numThreads; } public virtual void Start() { if ((State == PoolState.RUN) || (State == PoolState.SHUTDOWN)) throw new NotSupportedException("Pool can only be started if not running"); State = PoolState.RUN; for (int n = 0; n < PoolSize; n++) { CreatePoolThread(); } } public virtual void Stop() => Stop(false); public virtual void Stop(bool abort) { if (State != PoolState.RUN) throw new NotSupportedException("Pool must be running to be able to stop"); State = PoolState.SHUTDOWN; lock (waitingThreads) { queuedJobs.Clear(); } if (abort) Abort(); while (CurrentPoolSize > ((PoolThread.CurrentPoolThread.Value?.Pool == this) ? 1 : 0)) { try { lock (waitingThreads) Monitor.PulseAll(waitingThreads); Thread.Sleep(100); } catch (ThreadInterruptedException) { } } State = PoolState.STOPPED; } public void Enqueue(JobDelegate job) => Enqueue((poolJob) => job()); public PoolJob Enqueue(ExtendedJobDelegate extendedJobDelegate) => Enqueue(extendedJobDelegate, "N/A"); public PoolJob Enqueue(ExtendedJobDelegate extendedJobDelegate, string jobName) => EnqueuePoolJob(new PoolJob(jobName, extendedJobDelegate)); public bool Enqueue(PoolJob poolJob) => EnqueuePoolJob(poolJob) != null; protected virtual PoolJob EnqueuePoolJob(PoolJob poolJob) { if (State != PoolState.RUN) throw new NotSupportedException("Pool not running"); lock (waitingThreads) { if (queuedJobs.Contains(poolJob) || CurrentPoolJobs.Contains(poolJob)) return null; poolJob.Prepare(); queuedJobs.Enqueue(poolJob); PulseWaitingThread(); } return poolJob; } protected virtual void PulseWaitingThread() { lock (waitingThreads) Monitor.Pulse(waitingThreads); } public virtual int Enqueue(IEnumerable poolJobs) { if (State != PoolState.RUN) throw new NotSupportedException("Pool not running"); int count = 0; lock (waitingThreads) { HashSet currentPoolJobs = new HashSet(CurrentPoolJobs); foreach (PoolJob j in queuedJobs) currentPoolJobs.Add(j); foreach (PoolJob poolJob in poolJobs) { if (!currentPoolJobs.Contains(poolJob)) { poolJob.Prepare(); queuedJobs.Enqueue(poolJob); count++; } } Monitor.PulseAll(waitingThreads); } return count; } protected virtual PoolJob Dequeue() => Dequeue(PoolThread.CurrentPoolThread?.Value); protected virtual PoolJob Dequeue(PoolThread poolThread) { if (poolThread == null) throw new NullReferenceException(); if (State == PoolState.RUN) { lock (waitingThreads) { if (queuedJobs.Count > 0) return queuedJobs.Dequeue(); } } return null; } protected virtual PoolJob WaitForJob(PoolThread poolThread) { while (State == PoolState.RUN) { lock (waitingThreads) { PoolJob poolJob = Dequeue(poolThread); if (poolJob != null) return poolJob; waitingThreads.Add(poolThread); Monitor.Wait(waitingThreads); waitingThreads.Remove(poolThread); } } return null; } protected virtual void WorkerThreadLoop() { PoolThread poolThread = PoolThread.CurrentPoolThread.Value; if (poolThread == null) throw new NotSupportedException("Pool.WorkerThreadLoop(): Must be called from PoolThread"); lock (poolThreads) poolThreads.Add(poolThread); try { while (State == PoolState.RUN) { PoolJob poolJob = WaitForJob(poolThread); if (poolJob == null) { break; } else { lock (workingThreads) workingThreads.Add(poolThread); try { poolJob.Run(this); if (PoolJobFinished != null) PoolJobFinished(this,poolJob); } finally { lock (workingThreads) workingThreads.Remove(poolThread); } } } } catch (PoolThreadMustExitException) {} catch (Exception e) { Logging.Log(e); } finally { lock (poolThreads) poolThreads.Remove(poolThread); } } public void Abort() { lock (poolThreads) { foreach (PoolThread poolThread in poolThreads) poolThread?.CurrentJob?.RequestAbort(); } } private bool AmIPoolThread() => PoolThread.CurrentPoolThread.Value != null; public PoolThread FindPoolThread(PoolJob job) { lock (poolThreads) { foreach (PoolThread poolThread in poolThreads) { if (poolThread.CurrentJob == job) return poolThread; } } return null; } protected virtual PoolThread CreatePoolThread() { PoolThread poolThread = new PoolThread(WorkerThreadLoop); return poolThread; } public void Dispose() { if (State == PoolState.RUN) Stop(); } } }