diff --git a/ln.types.csproj b/ln.types.csproj index 5a162cf..101326b 100644 --- a/ln.types.csproj +++ b/ln.types.csproj @@ -97,6 +97,7 @@ + diff --git a/threads/Pool.cs b/threads/Pool.cs index 0cb0fc2..5cbdf46 100644 --- a/threads/Pool.cs +++ b/threads/Pool.cs @@ -9,197 +9,132 @@ namespace ln.types.threads public enum PoolThreadState { READY, WORKING, EXITED } public delegate void PoolJobFinished(Pool pool, PoolJob job); - public class Pool : IDisposable + public enum PoolState + { + INITIALIZE, // Pool Instanz erstellt + RUN, // Pool ist in Betrieb + SHUTDOWN, // Pool wird heruntergefahren (es warten noch laufende Jobs) + STOPPED // Pool wurde beendet, alle Resourcen freigegeben + } + + public partial class Pool : IDisposable { public static bool DEBUG = false; + public PoolState State { get; protected set; } = PoolState.INITIALIZE; + public event PoolJobFinished PoolJobFinished; - public PoolThread[] PoolThreads - { - get - { - lock (poolThreads) - { - return poolThreads.ToArray(); - } - } - } - public int CurrentPoolSize - { - get - { - lock (poolThreads) { return poolThreads.Count; } - } - set - { - TargetPoolSize = value; - } - } - public int TargetPoolSize { get; set; } + public PoolThread[] PoolThreads => ThreadHelpers.GetLockedValue(poolThreads, () => poolThreads.ToArray()); - public PoolThreadState[] ThreadStates - { - get - { - lock (poolThreads) - { - return poolThreads.Select((x) => x.State).ToArray(); - } - } - } + public int PoolSize { get; } + public int CurrentPoolSize => ThreadHelpers.GetLockedValue(poolThreads, () => poolThreads.Count); + public PoolJob[] CurrentPoolJobs => ThreadHelpers.GetLockedValue(poolThreads, () => poolThreads.Select((t) => t.CurrentJob).Where((j) => j != null).ToArray()); - public int NumQueuedJobs => queuedJobs.Count; - - public PoolJob[] CurrentPoolJobs - { - get - { - lock (poolThreads) - { - return poolThreads.Select((t) => t.CurrentJob).Where((j) => j != null).ToArray(); - } - } - } - public PoolJob[] QueuedJobs - { - get - { - lock (queuedJobs) - { - return queuedJobs.ToArray(); - } - } - } + public PoolJob[] QueuedJobs => ThreadHelpers.GetLockedValue(waitingThreads, () => queuedJobs.ToArray()); + public int NumQueuedJobs => ThreadHelpers.GetLockedValue(waitingThreads, () => queuedJobs.Count); private List poolThreads = new List(); + + protected HashSet workingThreads = new HashSet(); + protected HashSet waitingThreads = new HashSet(); + private Queue queuedJobs = new Queue(); + private bool stopping; private int releaseThreads = 0; private Thread supervisorThread; - public Pool() - { - SetPoolSize(4); - } - + public Pool() : this(Environment.ProcessorCount){} public Pool(int numThreads) { - SetPoolSize(numThreads); + PoolSize = numThreads; } - public void Close() + public virtual void Start() { - lock (queuedJobs) + if ((State == PoolState.RUN) || (State == PoolState.SHUTDOWN)) + throw new NotSupportedException("Pool can only be started if not running"); + + for (int n = 0; n < PoolSize; n++) + { + CreatePoolThread(); + } + + State = PoolState.RUN; + } + + public virtual void Stop() => Stop(false); + public virtual void Stop(bool abort) + { + if (State != PoolState.RUN) + throw new NotSupportedException("Pool must be running to be able to stop"); + + State = PoolState.SHUTDOWN; + + lock (waitingThreads) { - stopping = true; - foreach (PoolJob job in queuedJobs) - { - job.RequestAbort(); - job.ForceFail(); - } queuedJobs.Clear(); } - TargetPoolSize = 0; + if (abort) + Abort(); - int waitFor = AmIPoolThread() ? 1 : 0; - while (CurrentPoolSize > waitFor) + while (CurrentPoolSize > ((PoolThread.CurrentPoolThread.Value?.Pool == this) ? 1 : 0)) { - Thread.Sleep(250); - } - - stopping = false; - } - - public void Abort() - { - foreach (PoolJob job in CurrentPoolJobs) - { - job.RequestAbort(); - } - } - - private PoolThread FindWaitingThread() - { - lock (queuedJobs) - { - foreach (PoolThread poolThread in poolThreads) - if (poolThread.State == PoolThreadState.READY) - return poolThread; - } - return null; - } - - private bool AmIPoolThread() - { - Thread thread = Thread.CurrentThread; - - lock (poolThreads) - { - foreach (PoolThread poolThread in poolThreads) + try { - if (thread.Equals(poolThread.Thread)) - return true; + lock (waitingThreads) + Monitor.PulseAll(waitingThreads); + Thread.Sleep(100); } + catch (ThreadInterruptedException) + { } } - return false; + State = PoolState.STOPPED; } - public void SetPoolSize(int poolSize) + public void Enqueue(JobDelegate job) => Enqueue((poolJob) => job()); + public PoolJob Enqueue(ExtendedJobDelegate extendedJobDelegate) => Enqueue(extendedJobDelegate, "N/A"); + public PoolJob Enqueue(ExtendedJobDelegate extendedJobDelegate, string jobName) => EnqueuePoolJob(new PoolJob(jobName, extendedJobDelegate)); + public bool Enqueue(PoolJob poolJob) => EnqueuePoolJob(poolJob) != null; + + protected virtual PoolJob EnqueuePoolJob(PoolJob poolJob) { - TargetPoolSize = poolSize; - if ((CurrentPoolSize == 0) && (TargetPoolSize > 0)) + if (State != PoolState.RUN) + throw new NotSupportedException("Pool not running"); + + lock (waitingThreads) { - new PoolThread(this); - new Thread(() => Supervisor()).Start(); + if (queuedJobs.Contains(poolJob) || CurrentPoolJobs.Contains(poolJob)) + return null; + + poolJob.Prepare(); + queuedJobs.Enqueue(poolJob); + + PulseWaitingThread(); } - } - - public void Enqueue(JobDelegate job) - { - Enqueue((poolJob) => job()); - } - - public PoolJob Enqueue(ExtendedJobDelegate extendedJobDelegate) - { - return Enqueue(extendedJobDelegate, "N/A"); - } - public PoolJob Enqueue(ExtendedJobDelegate extendedJobDelegate,string jobName) - { - PoolJob poolJob = new PoolJob(jobName, extendedJobDelegate); - - Enqueue(poolJob); return poolJob; } - public bool Enqueue(PoolJob poolJob) + + protected virtual void PulseWaitingThread() { - if (stopping) - throw new OperationCanceledException("Thread Pool to be shut down"); - - lock (queuedJobs) - { - if (queuedJobs.Contains(poolJob) || CurrentPoolJobs.Contains(poolJob)) - return false; - - queuedJobs.Enqueue(poolJob); - Monitor.Pulse(queuedJobs); - } - - return true; + lock (waitingThreads) + Monitor.Pulse(waitingThreads); } - public int Enqueue(IEnumerable poolJobs) + + public virtual int Enqueue(IEnumerable poolJobs) { - if (stopping) - throw new OperationCanceledException("Thread Pool to be shut down"); + if (State != PoolState.RUN) + throw new NotSupportedException("Pool not running"); int count = 0; - lock (queuedJobs) + lock (waitingThreads) { HashSet currentPoolJobs = new HashSet(CurrentPoolJobs); foreach (PoolJob j in queuedJobs) @@ -214,160 +149,134 @@ namespace ln.types.threads count++; } } - Monitor.PulseAll(queuedJobs); + Monitor.PulseAll(waitingThreads); } + return count; } - private PoolJob Dequeue() + + protected virtual PoolJob Dequeue() => Dequeue(PoolThread.CurrentPoolThread?.Value); + protected virtual PoolJob Dequeue(PoolThread poolThread) { - lock (queuedJobs) + if (poolThread == null) + throw new NullReferenceException(); + + if (State == PoolState.RUN) { - if (queuedJobs.Count == 0) + lock (waitingThreads) { - Monitor.Wait(queuedJobs); + if (queuedJobs.Count > 0) + return queuedJobs.Dequeue(); } - - if (releaseThreads > 0) - { - releaseThreads--; - throw new TimeoutException(); - } - - if (queuedJobs.Count == 0) - return null; - - return queuedJobs.Dequeue(); } + return null; } - private void Supervisor() + protected virtual PoolJob WaitForJob(PoolThread poolThread) { - supervisorThread = Thread.CurrentThread; - - lock (supervisorThread) + while (State == PoolState.RUN) { - - while (CurrentPoolSize > 0) + lock (waitingThreads) { - Monitor.Wait(supervisorThread, 1000); + PoolJob poolJob = Dequeue(poolThread); + if (poolJob != null) + return poolJob; - while (CurrentPoolSize != TargetPoolSize) - { - if (CurrentPoolSize - TargetPoolSize > 0) - { - lock (queuedJobs) - { - releaseThreads = CurrentPoolSize - TargetPoolSize; - } - - if (CurrentPoolSize - TargetPoolSize > 0) - { - lock (queuedJobs) - { - Monitor.PulseAll(queuedJobs); - } - } - } - else if (CurrentPoolSize - TargetPoolSize < 0) - { - for (int n = CurrentPoolSize; n < TargetPoolSize; n++) - new PoolThread(this); - } - } + waitingThreads.Add(poolThread); + Monitor.Wait(waitingThreads); + waitingThreads.Remove(poolThread); } } - supervisorThread = null; + return null; } - - - public class PoolThread + protected virtual void WorkerThreadLoop() { - public Pool Pool { get; } - public Thread Thread { get; } - public PoolThreadState State { get; private set; } + PoolThread poolThread = PoolThread.CurrentPoolThread.Value; + if (poolThread == null) + throw new NotSupportedException("Pool.WorkerThreadLoop(): Must be called from PoolThread"); - public PoolJob CurrentJob { get; private set; } + lock (poolThreads) + poolThreads.Add(poolThread); - private bool exitCalled { get; set; } - - public PoolThread(Pool pool) + try { - State = PoolThreadState.WORKING; - Pool = pool; - Thread = new Thread(thread); - - lock (Pool.poolThreads) + while (State == PoolState.RUN) { - Pool.poolThreads.Add(this); - } - - WaitStarted(); - } - - private void WaitStarted() - { - lock (this) - { - Thread.Start(); - Monitor.Wait(this, 5000); - } - } - - private void thread() - { - lock (this) - { - Monitor.Pulse(this); - } - - State = PoolThreadState.READY; - - while (!exitCalled) - { - PoolJob poolJob = null; - try - { - poolJob = Pool.Dequeue(); - } catch (TimeoutException) + PoolJob poolJob = WaitForJob(poolThread); + if (poolJob == null) { break; } - - if (poolJob != null) + else { - State = PoolThreadState.WORKING; - CurrentJob = poolJob; - CurrentJob.Run(Pool); + lock (workingThreads) + workingThreads.Add(poolThread); - if (Pool.PoolJobFinished != null) - Pool.PoolJobFinished.Invoke(Pool, CurrentJob); - - CurrentJob = null; - State = PoolThreadState.READY; + try + { + poolJob.Run(this); + } + finally + { + lock (workingThreads) + workingThreads.Remove(poolThread); + } } } - - State = PoolThreadState.EXITED; - lock (Pool.poolThreads) - { - Pool.poolThreads.Remove(this); - } - } - - public void Exit() + } + catch (PoolThreadMustExitException) + {} + catch (Exception e) { - lock (this) + Logging.Log(e); + } + finally + { + lock (poolThreads) + poolThreads.Remove(poolThread); + } + } + + + + public void Abort() + { + lock (poolThreads) + { + foreach (PoolThread poolThread in poolThreads) + poolThread?.CurrentJob?.RequestAbort(); + } + } + + private bool AmIPoolThread() => PoolThread.CurrentPoolThread.Value != null; + + public PoolThread FindPoolThread(PoolJob job) + { + lock (poolThreads) + { + foreach (PoolThread poolThread in poolThreads) { - exitCalled = true; + if (poolThread.CurrentJob == job) + return poolThread; } } + return null; + } + + + + protected virtual PoolThread CreatePoolThread() + { + PoolThread poolThread = new PoolThread(WorkerThreadLoop); + return poolThread; } public void Dispose() { - Close(); + if (State == PoolState.RUN) + Stop(); } } } diff --git a/threads/PoolJob.cs b/threads/PoolJob.cs index a9a596d..2c5fb9f 100644 --- a/threads/PoolJob.cs +++ b/threads/PoolJob.cs @@ -20,7 +20,7 @@ namespace ln.types.threads public PoolJobState JobState { get; private set; } - public bool AbortRequested { get; private set; } + public bool JobAbortRequested { get; private set; } private Pool Pool { get; set; } @@ -52,7 +52,16 @@ namespace ln.types.threads public void RequestAbort() { - AbortRequested = true; + JobAbortRequested = true; + if (Pool != null) + { + PoolThread poolThread = Pool.FindPoolThread(this); + if (poolThread != null) + { + if (poolThread.Thread.ThreadState == ThreadState.WaitSleepJoin) + poolThread.Thread.Interrupt(); + } + } } public void ForceFail() @@ -142,7 +151,7 @@ namespace ln.types.threads break; } - if (AbortRequested) + if (JobAbortRequested) job.RequestAbort(); } diff --git a/threads/PoolThread.cs b/threads/PoolThread.cs new file mode 100644 index 0000000..ce9365e --- /dev/null +++ b/threads/PoolThread.cs @@ -0,0 +1,79 @@ +using System; +using System.Threading; + +namespace ln.types.threads +{ + public class PoolThreadMustExitException : Exception + {} + + public class PoolThread + { + public static ThreadLocal CurrentPoolThread { get; } = new ThreadLocal(); + + public Pool Pool { get; private set; } + public Thread Thread { get; } + + public PoolJob CurrentJob { get; private set; } + + private Action ThreadLoop; + + public PoolThread(Action threadLoop) + { + ThreadLoop = threadLoop; + + Thread = new Thread(thread); + Thread.Start(); + } + + private void thread() + { + lock (this) + { + CurrentPoolThread.Value = this; + } + + ThreadLoop(); + + lock (this) + { + CurrentPoolThread.Value = null; + } + } + //while (Pool.State != PoolState.SHUTDOWN) + //{ + // PoolJob poolJob = null; + // try + // { + // poolJob = Pool.Dequeue(); + // } + // catch (TimeoutException) + // { + // break; + // } + + // if (poolJob != null) + // { + // State = PoolThreadState.WORKING; + // CurrentJob = poolJob; + // CurrentJob.Run(Pool); + + // if (Pool.PoolJobFinished != null) + // Pool.PoolJobFinished.Invoke(Pool, CurrentJob); + + // CurrentJob = null; + // State = PoolThreadState.READY; + // } + //} + + //CurrentPoolThread.Value = null; + + //State = PoolThreadState.EXITED; + + //lock (Pool.poolThreads) + //{ + // Pool.poolThreads.Remove(this); + // Monitor.Pulse(Pool.poolThreads); + //} + + } +}