// Listing metadata: 279 lines, 8.6 KiB, C#
using System;
|
|
using System.Threading;
|
|
using System.Collections.Generic;
|
|
using System.Linq;
|
|
using ln.logging;
|
|
|
|
namespace ln.threading
|
|
{
|
|
/// <summary>
/// States a pool thread can be in. The members are not referenced in this
/// part of the code; their names suggest idle / running a job / terminated
/// (NOTE(review): confirm against PoolThread).
/// </summary>
public enum PoolThreadState
{
    READY,
    WORKING,
    EXITED
}
|
|
/// <summary>
/// Signature of handlers for the <c>Pool.PoolJobFinished</c> event, which a
/// worker thread raises after a job's <c>Run</c> call has returned.
/// </summary>
/// <param name="pool">The pool whose worker executed the job.</param>
/// <param name="job">The job that has just been run.</param>
public delegate void PoolJobFinished(Pool pool, PoolJob job);
|
|
|
|
/// <summary>Lifecycle states of a <see cref="Pool"/>.</summary>
public enum PoolState
{
    /// <summary>Pool instance has been created.</summary>
    INITIALIZE = 0,

    /// <summary>Pool is in operation.</summary>
    RUN = 1,

    /// <summary>Pool is shutting down (still waiting for running jobs).</summary>
    SHUTDOWN = 2,

    /// <summary>Pool has been stopped, all resources released.</summary>
    STOPPED = 3
}
|
|
|
|
public partial class Pool : IDisposable
|
|
{
|
|
/// <summary>Public debug switch; it is not read anywhere in this part of the class.</summary>
public static bool DEBUG = false;

/// <summary>Current lifecycle state; a new instance starts in <see cref="PoolState.INITIALIZE"/>.</summary>
public PoolState State { get; protected set; } = PoolState.INITIALIZE;

/// <summary>Raised by a worker thread after a job's <c>Run</c> call has returned.</summary>
public event PoolJobFinished PoolJobFinished;

/// <summary>
/// Array copy of the registered worker threads, obtained through
/// <c>ThreadHelpers.GetLockedValue</c> on <c>poolThreads</c>
/// (presumably evaluated while holding that lock; helper not visible here).
/// </summary>
public PoolThread[] PoolThreads
{
    get { return ThreadHelpers.GetLockedValue(poolThreads, () => poolThreads.ToArray()); }
}

/// <summary>Number of worker threads requested at construction time.</summary>
public int PoolSize { get; }

/// <summary>Number of worker threads currently registered in <c>poolThreads</c>.</summary>
public int CurrentPoolSize
{
    get { return ThreadHelpers.GetLockedValue(poolThreads, () => poolThreads.Count); }
}

/// <summary>The non-null <c>CurrentJob</c> values of all registered worker threads.</summary>
public PoolJob[] CurrentPoolJobs
{
    get
    {
        return ThreadHelpers.GetLockedValue(
            poolThreads,
            () => poolThreads
                .Select(thread => thread.CurrentJob)
                .Where(job => job != null)
                .ToArray());
    }
}

/// <summary>Array copy of the queued jobs; <c>waitingThreads</c> is the lock object guarding the queue.</summary>
public PoolJob[] QueuedJobs
{
    get { return ThreadHelpers.GetLockedValue(waitingThreads, () => queuedJobs.ToArray()); }
}

/// <summary>Number of jobs in the queue, read via the <c>waitingThreads</c> lock object.</summary>
public int NumQueuedJobs
{
    get { return ThreadHelpers.GetLockedValue(waitingThreads, () => queuedJobs.Count); }
}

// Worker threads add themselves here on entering the worker loop and remove
// themselves on leaving it. Also serves as its own lock object.
private List<PoolThread> poolThreads = new List<PoolThread>();

// Threads that are currently executing a job.
protected HashSet<PoolThread> workingThreads = new HashSet<PoolThread>();

// Threads blocked waiting for a job. This object is also the monitor used
// for all access to queuedJobs and for Pulse/Wait signalling.
protected HashSet<PoolThread> waitingThreads = new HashSet<PoolThread>();

// Jobs accepted but not yet picked up by a worker; guarded by lock (waitingThreads).
private Queue<PoolJob> queuedJobs = new Queue<PoolJob>();
|
|
|
|
/// <summary>Creates a pool sized to <see cref="Environment.ProcessorCount"/>.</summary>
public Pool()
    : this(Environment.ProcessorCount)
{
}

/// <summary>Creates a pool that will spawn <paramref name="numThreads"/> workers when started.</summary>
/// <param name="numThreads">Value stored in <c>PoolSize</c>; it is not validated here.</param>
public Pool(int numThreads)
{
    this.PoolSize = numThreads;
}
|
|
|
|
/// <summary>
/// Switches the pool to <see cref="PoolState.RUN"/> and creates
/// <c>PoolSize</c> worker threads via <c>CreatePoolThread</c>.
/// </summary>
/// <exception cref="NotSupportedException">
/// The pool is already in state RUN or SHUTDOWN.
/// </exception>
public virtual void Start()
{
    switch (State)
    {
        case PoolState.RUN:
        case PoolState.SHUTDOWN:
            throw new NotSupportedException("Pool can only be started if not running");
    }

    State = PoolState.RUN;

    int created = 0;
    while (created < PoolSize)
    {
        CreatePoolThread();
        created++;
    }
}
|
|
|
|
/// <summary>Stops the pool without requesting an abort of running jobs.</summary>
public virtual void Stop()
{
    Stop(false);
}

/// <summary>
/// Moves the pool to <see cref="PoolState.SHUTDOWN"/>, discards all queued
/// jobs, optionally requests an abort of the running ones, and then blocks
/// until the worker threads have unregistered before setting
/// <see cref="PoolState.STOPPED"/>.
/// </summary>
/// <param name="abort">When true, <c>Abort()</c> is called before waiting.</param>
/// <exception cref="NotSupportedException">The pool is not in state RUN.</exception>
public virtual void Stop(bool abort)
{
    if (State != PoolState.RUN)
        throw new NotSupportedException("Pool must be running to be able to stop");

    State = PoolState.SHUTDOWN;

    lock (waitingThreads)
    {
        queuedJobs.Clear();
    }

    if (abort)
        Abort();

    while (true)
    {
        int registered = CurrentPoolSize;

        // When Stop is invoked from one of this pool's own workers, that
        // worker cannot unregister while it is still in here, so one
        // remaining thread is tolerated.
        bool calledFromOwnWorker = PoolThread.CurrentPoolThread.Value?.Pool == this;
        int tolerated = calledFromOwnWorker ? 1 : 0;

        if (registered <= tolerated)
            break;

        try
        {
            // Wake every waiting worker so it can observe the state change.
            lock (waitingThreads)
            {
                Monitor.PulseAll(waitingThreads);
            }

            Thread.Sleep(100);
        }
        catch (ThreadInterruptedException)
        {
            // Deliberately ignored: keep polling until the workers are gone.
        }
    }

    State = PoolState.STOPPED;
}
|
|
|
|
/// <summary>Queues a parameterless job by wrapping it in a lambda that ignores the job argument.</summary>
/// <param name="job">Delegate invoked when the wrapper runs.</param>
public void Enqueue(JobDelegate job)
{
    Enqueue((poolJob) => job());
}

/// <summary>Queues a delegate under the job name "N/A".</summary>
/// <returns>The result of the named overload.</returns>
public PoolJob Enqueue(ExtendedJobDelegate extendedJobDelegate)
{
    return Enqueue(extendedJobDelegate, "N/A");
}

/// <summary>Wraps the delegate in a new <see cref="PoolJob"/> with the given name and queues it.</summary>
/// <returns>The result of <c>EnqueuePoolJob</c> for the new job.</returns>
public PoolJob Enqueue(ExtendedJobDelegate extendedJobDelegate, string jobName)
{
    return EnqueuePoolJob(new PoolJob(jobName, extendedJobDelegate));
}

/// <summary>Queues an existing job.</summary>
/// <returns>True when <c>EnqueuePoolJob</c> returned a non-null job, false otherwise.</returns>
public bool Enqueue(PoolJob poolJob)
{
    return EnqueuePoolJob(poolJob) != null;
}
|
|
|
|
/// <summary>
/// Prepares and queues a job, then pulses one waiting worker. A job that is
/// already in the queue or currently held by a worker is not queued again.
/// </summary>
/// <param name="poolJob">Job to queue.</param>
/// <returns>The job when it was queued; null when it was rejected as a duplicate.</returns>
/// <exception cref="NotSupportedException">The pool is not in state RUN.</exception>
protected virtual PoolJob EnqueuePoolJob(PoolJob poolJob)
{
    if (State != PoolState.RUN)
        throw new NotSupportedException("Pool not running");

    lock (waitingThreads)
    {
        bool alreadyKnown = queuedJobs.Contains(poolJob) || CurrentPoolJobs.Contains(poolJob);
        if (!alreadyKnown)
        {
            poolJob.Prepare();
            queuedJobs.Enqueue(poolJob);

            PulseWaitingThread();
            return poolJob;
        }
    }

    return null;
}
|
|
|
|
/// <summary>
/// Signals one thread blocked in <c>Monitor.Wait</c> on <c>waitingThreads</c>.
/// Takes the monitor itself, so it may be called with or without the lock held.
/// </summary>
protected virtual void PulseWaitingThread()
{
    lock (waitingThreads)
    {
        Monitor.Pulse(waitingThreads);
    }
}
|
|
|
|
/// <summary>
/// Prepares and queues every job of a batch that is neither already queued
/// nor currently held by a worker, then wakes all waiting workers.
/// </summary>
/// <param name="poolJobs">Jobs to queue. A job occurring more than once in the batch is queued only once.</param>
/// <returns>Number of jobs that were actually queued.</returns>
/// <exception cref="NotSupportedException">The pool is not in state RUN.</exception>
public virtual int Enqueue(IEnumerable<PoolJob> poolJobs)
{
    if (State != PoolState.RUN)
        throw new NotSupportedException("Pool not running");

    int count = 0;

    lock (waitingThreads)
    {
        // Everything the pool already knows about: running jobs plus queued jobs.
        HashSet<PoolJob> knownJobs = new HashSet<PoolJob>(CurrentPoolJobs);
        foreach (PoolJob j in queuedJobs)
            knownJobs.Add(j);

        try
        {
            foreach (PoolJob poolJob in poolJobs)
            {
                // Add() returns false for a job that is already known. Recording
                // each accepted job also rejects duplicates inside this batch,
                // which the earlier Contains-only check let through.
                if (knownJobs.Add(poolJob))
                {
                    poolJob.Prepare();
                    queuedJobs.Enqueue(poolJob);
                    count++;
                }
            }
        }
        finally
        {
            // Wake the workers even if Prepare() threw midway, so that jobs
            // queued before the failure are still picked up.
            Monitor.PulseAll(waitingThreads);
        }
    }

    return count;
}
|
|
|
|
/// <summary>Dequeues a job on behalf of the calling thread's <see cref="PoolThread"/>.</summary>
protected virtual PoolJob Dequeue()
{
    return Dequeue(PoolThread.CurrentPoolThread?.Value);
}

/// <summary>
/// Takes the next job from the queue if the pool is running and a job is available.
/// </summary>
/// <param name="poolThread">Requesting worker; only checked for null here.</param>
/// <returns>The next queued job, or null when the pool is not in state RUN or the queue is empty.</returns>
/// <exception cref="NullReferenceException"><paramref name="poolThread"/> is null.</exception>
protected virtual PoolJob Dequeue(PoolThread poolThread)
{
    // NOTE(review): ArgumentNullException would be the conventional type here;
    // kept as is because callers may depend on the current exception type.
    if (poolThread == null)
        throw new NullReferenceException();

    if (State != PoolState.RUN)
        return null;

    lock (waitingThreads)
    {
        return queuedJobs.Count > 0 ? queuedJobs.Dequeue() : null;
    }
}
|
|
|
|
/// <summary>
/// Blocks the calling worker until a job can be dequeued or the pool leaves
/// state RUN.
/// </summary>
/// <param name="poolThread">The worker that is waiting; tracked in <c>waitingThreads</c> while blocked.</param>
/// <returns>The dequeued job, or null once the pool is no longer in state RUN.</returns>
protected virtual PoolJob WaitForJob(PoolThread poolThread)
{
    while (State == PoolState.RUN)
    {
        lock (waitingThreads)
        {
            PoolJob poolJob = Dequeue(poolThread);
            if (poolJob != null)
                return poolJob;

            waitingThreads.Add(poolThread);
            try
            {
                // Releases the monitor while blocked and reacquires it before
                // returning, so the set is only touched with the lock held.
                Monitor.Wait(waitingThreads);
            }
            finally
            {
                // Unregister even when Wait throws (e.g. ThreadInterruptedException);
                // otherwise the thread would remain listed as waiting.
                waitingThreads.Remove(poolThread);
            }
        }
    }
    return null;
}
|
|
|
|
|
|
/// <summary>
/// Body of a worker thread: registers the calling <see cref="PoolThread"/> in
/// <c>poolThreads</c>, then repeatedly waits for a job, runs it and raises
/// <c>PoolJobFinished</c> while the pool is in state RUN. The thread is
/// unregistered again when the loop ends for any reason.
/// </summary>
/// <exception cref="NotSupportedException">
/// The calling thread has no current <see cref="PoolThread"/>.
/// </exception>
protected virtual void WorkerThreadLoop()
{
    PoolThread poolThread = PoolThread.CurrentPoolThread.Value;
    if (poolThread == null)
        throw new NotSupportedException("Pool.WorkerThreadLoop(): Must be called from PoolThread");

    lock (poolThreads)
        poolThreads.Add(poolThread);

    try
    {
        while (State == PoolState.RUN)
        {
            PoolJob poolJob = WaitForJob(poolThread);
            if (poolJob == null)
            {
                // WaitForJob returns null once the pool has left state RUN.
                break;
            }
            else
            {
                lock (workingThreads)
                    workingThreads.Add(poolThread);

                try
                {
                    poolJob.Run(this);

                    // ?.Invoke reads the delegate once, so a handler that is
                    // unsubscribed concurrently cannot cause a
                    // NullReferenceException between null check and call.
                    PoolJobFinished?.Invoke(this, poolJob);
                }
                finally
                {
                    lock (workingThreads)
                        workingThreads.Remove(poolThread);
                }
            }
        }
    }
    catch (PoolThreadMustExitException)
    {
        // Swallowed on purpose: presumably the signal for this worker to
        // terminate quietly (the throw site is not in this part of the class).
    }
    catch (Exception e)
    {
        // Any other exception ends this worker; it is logged, not rethrown.
        Logging.Log(e);
    }
    finally
    {
        lock (poolThreads)
            poolThreads.Remove(poolThread);
    }
}
|
|
|
|
|
|
|
|
/// <summary>
/// Calls <c>RequestAbort()</c> on the current job of every registered worker
/// thread that has one, while holding the <c>poolThreads</c> lock.
/// </summary>
public void Abort()
{
    lock (poolThreads)
    {
        foreach (PoolThread worker in poolThreads)
        {
            var running = worker?.CurrentJob;
            running?.RequestAbort();
        }
    }
}
|
|
|
|
/// <summary>True when the calling thread has a current <see cref="PoolThread"/> (of any pool).</summary>
private bool AmIPoolThread()
{
    return PoolThread.CurrentPoolThread.Value != null;
}
|
|
|
|
/// <summary>Looks up the registered worker thread whose current job is <paramref name="job"/>.</summary>
/// <param name="job">Job to search for.</param>
/// <returns>The first matching worker thread, or null when none matches.</returns>
public PoolThread FindPoolThread(PoolJob job)
{
    lock (poolThreads)
    {
        return poolThreads.Find(candidate => candidate.CurrentJob == job);
    }
}
|
|
|
|
|
|
|
|
/// <summary>Creates a new <see cref="PoolThread"/> that executes <c>WorkerThreadLoop</c>.</summary>
/// <returns>The newly created pool thread.</returns>
protected virtual PoolThread CreatePoolThread()
{
    return new PoolThread(WorkerThreadLoop);
}
|
|
|
|
/// <summary>Stops the pool if it is still in state RUN; otherwise does nothing.</summary>
public void Dispose()
{
    if (State != PoolState.RUN)
        return;

    Stop();
}
|
|
}
|
|
}
|