Improve threading.Pool

dev_timestamp
Harald Wolff 2019-09-11 09:28:03 +02:00
parent 80f950da0b
commit 6ffce5cc2d
4 changed files with 262 additions and 264 deletions

View File

@ -97,6 +97,7 @@
<Compile Include="odb\ng\index\IndexPath.cs" /> <Compile Include="odb\ng\index\IndexPath.cs" />
<Compile Include="PathHelper.cs" /> <Compile Include="PathHelper.cs" />
<Compile Include="threads\ThreadHelpers.cs" /> <Compile Include="threads\ThreadHelpers.cs" />
<Compile Include="threads\PoolThread.cs" />
</ItemGroup> </ItemGroup>
<ItemGroup> <ItemGroup>
<Folder Include="odb\" /> <Folder Include="odb\" />

View File

@ -9,197 +9,132 @@ namespace ln.types.threads
public enum PoolThreadState { READY, WORKING, EXITED } public enum PoolThreadState { READY, WORKING, EXITED }
public delegate void PoolJobFinished(Pool pool, PoolJob job); public delegate void PoolJobFinished(Pool pool, PoolJob job);
public class Pool : IDisposable public enum PoolState
{
INITIALIZE, // Pool Instanz erstellt
RUN, // Pool ist in Betrieb
SHUTDOWN, // Pool wird heruntergefahren (es warten noch laufende Jobs)
STOPPED // Pool wurde beendet, alle Resourcen freigegeben
}
public partial class Pool : IDisposable
{ {
public static bool DEBUG = false; public static bool DEBUG = false;
public PoolState State { get; protected set; } = PoolState.INITIALIZE;
public event PoolJobFinished PoolJobFinished; public event PoolJobFinished PoolJobFinished;
public PoolThread[] PoolThreads public PoolThread[] PoolThreads => ThreadHelpers.GetLockedValue(poolThreads, () => poolThreads.ToArray());
{
get
{
lock (poolThreads)
{
return poolThreads.ToArray();
}
}
}
public int CurrentPoolSize
{
get
{
lock (poolThreads) { return poolThreads.Count; }
}
set
{
TargetPoolSize = value;
}
}
public int TargetPoolSize { get; set; }
public PoolThreadState[] ThreadStates public int PoolSize { get; }
{ public int CurrentPoolSize => ThreadHelpers.GetLockedValue(poolThreads, () => poolThreads.Count);
get public PoolJob[] CurrentPoolJobs => ThreadHelpers.GetLockedValue(poolThreads, () => poolThreads.Select((t) => t.CurrentJob).Where((j) => j != null).ToArray());
{
lock (poolThreads)
{
return poolThreads.Select((x) => x.State).ToArray();
}
}
}
public int NumQueuedJobs => queuedJobs.Count; public PoolJob[] QueuedJobs => ThreadHelpers.GetLockedValue(waitingThreads, () => queuedJobs.ToArray());
public int NumQueuedJobs => ThreadHelpers.GetLockedValue(waitingThreads, () => queuedJobs.Count);
public PoolJob[] CurrentPoolJobs
{
get
{
lock (poolThreads)
{
return poolThreads.Select((t) => t.CurrentJob).Where((j) => j != null).ToArray();
}
}
}
public PoolJob[] QueuedJobs
{
get
{
lock (queuedJobs)
{
return queuedJobs.ToArray();
}
}
}
private List<PoolThread> poolThreads = new List<PoolThread>(); private List<PoolThread> poolThreads = new List<PoolThread>();
protected HashSet<PoolThread> workingThreads = new HashSet<PoolThread>();
protected HashSet<PoolThread> waitingThreads = new HashSet<PoolThread>();
private Queue<PoolJob> queuedJobs = new Queue<PoolJob>(); private Queue<PoolJob> queuedJobs = new Queue<PoolJob>();
private bool stopping; private bool stopping;
private int releaseThreads = 0; private int releaseThreads = 0;
private Thread supervisorThread; private Thread supervisorThread;
public Pool() public Pool() : this(Environment.ProcessorCount){}
{
SetPoolSize(4);
}
public Pool(int numThreads) public Pool(int numThreads)
{ {
SetPoolSize(numThreads); PoolSize = numThreads;
} }
public void Close() public virtual void Start()
{ {
lock (queuedJobs) if ((State == PoolState.RUN) || (State == PoolState.SHUTDOWN))
throw new NotSupportedException("Pool can only be started if not running");
for (int n = 0; n < PoolSize; n++)
{
CreatePoolThread();
}
State = PoolState.RUN;
}
public virtual void Stop() => Stop(false);
public virtual void Stop(bool abort)
{
if (State != PoolState.RUN)
throw new NotSupportedException("Pool must be running to be able to stop");
State = PoolState.SHUTDOWN;
lock (waitingThreads)
{ {
stopping = true;
foreach (PoolJob job in queuedJobs)
{
job.RequestAbort();
job.ForceFail();
}
queuedJobs.Clear(); queuedJobs.Clear();
} }
TargetPoolSize = 0; if (abort)
Abort();
int waitFor = AmIPoolThread() ? 1 : 0; while (CurrentPoolSize > ((PoolThread.CurrentPoolThread.Value?.Pool == this) ? 1 : 0))
while (CurrentPoolSize > waitFor)
{ {
Thread.Sleep(250); try
}
stopping = false;
}
public void Abort()
{
foreach (PoolJob job in CurrentPoolJobs)
{
job.RequestAbort();
}
}
private PoolThread FindWaitingThread()
{
lock (queuedJobs)
{
foreach (PoolThread poolThread in poolThreads)
if (poolThread.State == PoolThreadState.READY)
return poolThread;
}
return null;
}
private bool AmIPoolThread()
{
Thread thread = Thread.CurrentThread;
lock (poolThreads)
{
foreach (PoolThread poolThread in poolThreads)
{ {
if (thread.Equals(poolThread.Thread)) lock (waitingThreads)
return true; Monitor.PulseAll(waitingThreads);
Thread.Sleep(100);
} }
catch (ThreadInterruptedException)
{ }
} }
return false; State = PoolState.STOPPED;
} }
public void SetPoolSize(int poolSize) public void Enqueue(JobDelegate job) => Enqueue((poolJob) => job());
public PoolJob Enqueue(ExtendedJobDelegate extendedJobDelegate) => Enqueue(extendedJobDelegate, "N/A");
public PoolJob Enqueue(ExtendedJobDelegate extendedJobDelegate, string jobName) => EnqueuePoolJob(new PoolJob(jobName, extendedJobDelegate));
public bool Enqueue(PoolJob poolJob) => EnqueuePoolJob(poolJob) != null;
protected virtual PoolJob EnqueuePoolJob(PoolJob poolJob)
{ {
TargetPoolSize = poolSize; if (State != PoolState.RUN)
if ((CurrentPoolSize == 0) && (TargetPoolSize > 0)) throw new NotSupportedException("Pool not running");
lock (waitingThreads)
{ {
new PoolThread(this); if (queuedJobs.Contains(poolJob) || CurrentPoolJobs.Contains(poolJob))
new Thread(() => Supervisor()).Start(); return null;
poolJob.Prepare();
queuedJobs.Enqueue(poolJob);
PulseWaitingThread();
} }
}
public void Enqueue(JobDelegate job)
{
Enqueue((poolJob) => job());
}
public PoolJob Enqueue(ExtendedJobDelegate extendedJobDelegate)
{
return Enqueue(extendedJobDelegate, "N/A");
}
public PoolJob Enqueue(ExtendedJobDelegate extendedJobDelegate,string jobName)
{
PoolJob poolJob = new PoolJob(jobName, extendedJobDelegate);
Enqueue(poolJob);
return poolJob; return poolJob;
} }
public bool Enqueue(PoolJob poolJob)
protected virtual void PulseWaitingThread()
{ {
if (stopping) lock (waitingThreads)
throw new OperationCanceledException("Thread Pool to be shut down"); Monitor.Pulse(waitingThreads);
lock (queuedJobs)
{
if (queuedJobs.Contains(poolJob) || CurrentPoolJobs.Contains(poolJob))
return false;
queuedJobs.Enqueue(poolJob);
Monitor.Pulse(queuedJobs);
}
return true;
} }
public int Enqueue(IEnumerable<PoolJob> poolJobs)
public virtual int Enqueue(IEnumerable<PoolJob> poolJobs)
{ {
if (stopping) if (State != PoolState.RUN)
throw new OperationCanceledException("Thread Pool to be shut down"); throw new NotSupportedException("Pool not running");
int count = 0; int count = 0;
lock (queuedJobs) lock (waitingThreads)
{ {
HashSet<PoolJob> currentPoolJobs = new HashSet<PoolJob>(CurrentPoolJobs); HashSet<PoolJob> currentPoolJobs = new HashSet<PoolJob>(CurrentPoolJobs);
foreach (PoolJob j in queuedJobs) foreach (PoolJob j in queuedJobs)
@ -214,160 +149,134 @@ namespace ln.types.threads
count++; count++;
} }
} }
Monitor.PulseAll(queuedJobs); Monitor.PulseAll(waitingThreads);
} }
return count; return count;
} }
private PoolJob Dequeue()
protected virtual PoolJob Dequeue() => Dequeue(PoolThread.CurrentPoolThread?.Value);
protected virtual PoolJob Dequeue(PoolThread poolThread)
{ {
lock (queuedJobs) if (poolThread == null)
throw new NullReferenceException();
if (State == PoolState.RUN)
{ {
if (queuedJobs.Count == 0) lock (waitingThreads)
{ {
Monitor.Wait(queuedJobs); if (queuedJobs.Count > 0)
return queuedJobs.Dequeue();
} }
if (releaseThreads > 0)
{
releaseThreads--;
throw new TimeoutException();
}
if (queuedJobs.Count == 0)
return null;
return queuedJobs.Dequeue();
} }
return null;
} }
private void Supervisor() protected virtual PoolJob WaitForJob(PoolThread poolThread)
{ {
supervisorThread = Thread.CurrentThread; while (State == PoolState.RUN)
lock (supervisorThread)
{ {
lock (waitingThreads)
while (CurrentPoolSize > 0)
{ {
Monitor.Wait(supervisorThread, 1000); PoolJob poolJob = Dequeue(poolThread);
if (poolJob != null)
return poolJob;
while (CurrentPoolSize != TargetPoolSize) waitingThreads.Add(poolThread);
{ Monitor.Wait(waitingThreads);
if (CurrentPoolSize - TargetPoolSize > 0) waitingThreads.Remove(poolThread);
{
lock (queuedJobs)
{
releaseThreads = CurrentPoolSize - TargetPoolSize;
}
if (CurrentPoolSize - TargetPoolSize > 0)
{
lock (queuedJobs)
{
Monitor.PulseAll(queuedJobs);
}
}
}
else if (CurrentPoolSize - TargetPoolSize < 0)
{
for (int n = CurrentPoolSize; n < TargetPoolSize; n++)
new PoolThread(this);
}
}
} }
} }
supervisorThread = null; return null;
} }
protected virtual void WorkerThreadLoop()
public class PoolThread
{ {
public Pool Pool { get; } PoolThread poolThread = PoolThread.CurrentPoolThread.Value;
public Thread Thread { get; } if (poolThread == null)
public PoolThreadState State { get; private set; } throw new NotSupportedException("Pool.WorkerThreadLoop(): Must be called from PoolThread");
public PoolJob CurrentJob { get; private set; } lock (poolThreads)
poolThreads.Add(poolThread);
private bool exitCalled { get; set; } try
public PoolThread(Pool pool)
{ {
State = PoolThreadState.WORKING; while (State == PoolState.RUN)
Pool = pool;
Thread = new Thread(thread);
lock (Pool.poolThreads)
{ {
Pool.poolThreads.Add(this); PoolJob poolJob = WaitForJob(poolThread);
} if (poolJob == null)
WaitStarted();
}
private void WaitStarted()
{
lock (this)
{
Thread.Start();
Monitor.Wait(this, 5000);
}
}
private void thread()
{
lock (this)
{
Monitor.Pulse(this);
}
State = PoolThreadState.READY;
while (!exitCalled)
{
PoolJob poolJob = null;
try
{
poolJob = Pool.Dequeue();
} catch (TimeoutException)
{ {
break; break;
} }
else
if (poolJob != null)
{ {
State = PoolThreadState.WORKING; lock (workingThreads)
CurrentJob = poolJob; workingThreads.Add(poolThread);
CurrentJob.Run(Pool);
if (Pool.PoolJobFinished != null) try
Pool.PoolJobFinished.Invoke(Pool, CurrentJob); {
poolJob.Run(this);
CurrentJob = null; }
State = PoolThreadState.READY; finally
{
lock (workingThreads)
workingThreads.Remove(poolThread);
}
} }
} }
}
State = PoolThreadState.EXITED; catch (PoolThreadMustExitException)
lock (Pool.poolThreads) {}
{ catch (Exception e)
Pool.poolThreads.Remove(this);
}
}
public void Exit()
{ {
lock (this) Logging.Log(e);
}
finally
{
lock (poolThreads)
poolThreads.Remove(poolThread);
}
}
public void Abort()
{
lock (poolThreads)
{
foreach (PoolThread poolThread in poolThreads)
poolThread?.CurrentJob?.RequestAbort();
}
}
private bool AmIPoolThread() => PoolThread.CurrentPoolThread.Value != null;
public PoolThread FindPoolThread(PoolJob job)
{
lock (poolThreads)
{
foreach (PoolThread poolThread in poolThreads)
{ {
exitCalled = true; if (poolThread.CurrentJob == job)
return poolThread;
} }
} }
return null;
}
protected virtual PoolThread CreatePoolThread()
{
PoolThread poolThread = new PoolThread(WorkerThreadLoop);
return poolThread;
} }
public void Dispose() public void Dispose()
{ {
Close(); if (State == PoolState.RUN)
Stop();
} }
} }
} }

View File

@ -20,7 +20,7 @@ namespace ln.types.threads
public PoolJobState JobState { get; private set; } public PoolJobState JobState { get; private set; }
public bool AbortRequested { get; private set; } public bool JobAbortRequested { get; private set; }
private Pool Pool { get; set; } private Pool Pool { get; set; }
@ -52,7 +52,16 @@ namespace ln.types.threads
public void RequestAbort() public void RequestAbort()
{ {
AbortRequested = true; JobAbortRequested = true;
if (Pool != null)
{
PoolThread poolThread = Pool.FindPoolThread(this);
if (poolThread != null)
{
if (poolThread.Thread.ThreadState == ThreadState.WaitSleepJoin)
poolThread.Thread.Interrupt();
}
}
} }
public void ForceFail() public void ForceFail()
@ -142,7 +151,7 @@ namespace ln.types.threads
break; break;
} }
if (AbortRequested) if (JobAbortRequested)
job.RequestAbort(); job.RequestAbort();
} }

View File

@ -0,0 +1,79 @@
using System;
using System.Threading;
namespace ln.types.threads
{
public class PoolThreadMustExitException : Exception
{}
public class PoolThread
{
public static ThreadLocal<PoolThread> CurrentPoolThread { get; } = new ThreadLocal<PoolThread>();
public Pool Pool { get; private set; }
public Thread Thread { get; }
public PoolJob CurrentJob { get; private set; }
private Action ThreadLoop;
public PoolThread(Action threadLoop)
{
ThreadLoop = threadLoop;
Thread = new Thread(thread);
Thread.Start();
}
private void thread()
{
lock (this)
{
CurrentPoolThread.Value = this;
}
ThreadLoop();
lock (this)
{
CurrentPoolThread.Value = null;
}
}
//while (Pool.State != PoolState.SHUTDOWN)
//{
// PoolJob poolJob = null;
// try
// {
// poolJob = Pool.Dequeue();
// }
// catch (TimeoutException)
// {
// break;
// }
// if (poolJob != null)
// {
// State = PoolThreadState.WORKING;
// CurrentJob = poolJob;
// CurrentJob.Run(Pool);
// if (Pool.PoolJobFinished != null)
// Pool.PoolJobFinished.Invoke(Pool, CurrentJob);
// CurrentJob = null;
// State = PoolThreadState.READY;
// }
//}
//CurrentPoolThread.Value = null;
//State = PoolThreadState.EXITED;
//lock (Pool.poolThreads)
//{
// Pool.poolThreads.Remove(this);
// Monitor.Pulse(Pool.poolThreads);
//}
}
}