ln.types/threads/Pool.cs

345 lines
8.7 KiB
C#

using System;
using System.Threading;
using System.Collections.Generic;
using System.Linq;
using ln.logging;
namespace ln.types.threads
{
public enum PoolThreadState { READY, WORKING, EXITED }
public class Pool : IDisposable
{
public PoolThread[] PoolThreads
{
get
{
lock (poolThreads)
{
return poolThreads.ToArray();
}
}
}
public int CurrentPoolSize
{
get
{
lock (poolThreads) { return poolThreads.Count; }
}
set
{
TargetPoolSize = value;
}
}
public int TargetPoolSize { get; set; }
public PoolThreadState[] ThreadStates
{
get
{
lock (poolThreads)
{
return poolThreads.Select((x) => x.State).ToArray();
}
}
}
public int NumQueuedJobs => queuedJobs.Count;
public PoolJob[] CurrentPoolJobs
{
get
{
lock (poolThreads)
{
return poolThreads.Select((t) => t.CurrentJob).Where((j) => j != null).ToArray();
}
}
}
public PoolJob[] QueuedJobs
{
get
{
lock (queuedJobs)
{
return queuedJobs.ToArray();
}
}
}
private List<PoolThread> poolThreads = new List<PoolThread>();
private Queue<PoolJob> queuedJobs = new Queue<PoolJob>();
private bool stopping;
private int releaseThreads = 0;
private Thread supervisorThread;
public Pool()
{
SetPoolSize(4);
}
public Pool(int numThreads)
{
SetPoolSize(numThreads);
}
public void Close()
{
lock (queuedJobs)
{
stopping = true;
foreach (PoolJob job in queuedJobs)
{
job.RequestAbort();
job.ForceFail();
}
queuedJobs.Clear();
}
TargetPoolSize = 0;
int waitFor = AmIPoolThread() ? 1 : 0;
while (CurrentPoolSize > waitFor)
{
Thread.Sleep(250);
}
stopping = false;
}
public void Abort()
{
foreach (PoolJob job in CurrentPoolJobs)
{
job.RequestAbort();
}
}
private PoolThread FindWaitingThread()
{
lock (queuedJobs)
{
foreach (PoolThread poolThread in poolThreads)
if (poolThread.State == PoolThreadState.READY)
return poolThread;
}
return null;
}
private bool AmIPoolThread()
{
Thread thread = Thread.CurrentThread;
lock (poolThreads)
{
foreach (PoolThread poolThread in poolThreads)
{
if (thread.Equals(poolThread.Thread))
return true;
}
}
return false;
}
public void SetPoolSize(int poolSize)
{
TargetPoolSize = poolSize;
if ((CurrentPoolSize == 0) && (TargetPoolSize > 0))
{
new PoolThread(this);
new Thread(() => Supervisor()).Start();
}
}
public void Enqueue(JobDelegate job)
{
Enqueue((poolJob) => job());
}
public PoolJob Enqueue(ExtendedJobDelegate extendedJobDelegate)
{
return Enqueue(extendedJobDelegate, "N/A");
}
public PoolJob Enqueue(ExtendedJobDelegate extendedJobDelegate,string jobName)
{
PoolJob poolJob = new PoolJob(jobName, extendedJobDelegate);
Enqueue(poolJob);
return poolJob;
}
public bool Enqueue(PoolJob poolJob)
{
if (stopping)
throw new OperationCanceledException("Thread Pool to be shut down");
lock (queuedJobs)
{
if (queuedJobs.Contains(poolJob) || CurrentPoolJobs.Contains(poolJob))
return false;
queuedJobs.Enqueue(poolJob);
Monitor.Pulse(queuedJobs);
}
return true;
}
private PoolJob Dequeue()
{
lock (queuedJobs)
{
if (queuedJobs.Count == 0)
{
Monitor.Wait(queuedJobs);
}
if (releaseThreads > 0)
{
releaseThreads--;
throw new TimeoutException();
}
if (queuedJobs.Count == 0)
return null;
return queuedJobs.Dequeue();
}
}
private void Supervisor()
{
supervisorThread = Thread.CurrentThread;
lock (supervisorThread)
{
while (CurrentPoolSize > 0)
{
Monitor.Wait(supervisorThread, 1000);
while (CurrentPoolSize != TargetPoolSize)
{
if (CurrentPoolSize - TargetPoolSize > 0)
{
lock (queuedJobs)
{
releaseThreads = CurrentPoolSize - TargetPoolSize;
}
if (CurrentPoolSize - TargetPoolSize > 0)
{
lock (queuedJobs)
{
Monitor.PulseAll(queuedJobs);
}
}
}
else if (CurrentPoolSize - TargetPoolSize < 0)
{
for (int n = CurrentPoolSize; n < TargetPoolSize; n++)
new PoolThread(this);
}
}
}
}
supervisorThread = null;
}
public class PoolThread
{
public Pool Pool { get; }
public Thread Thread { get; }
public PoolThreadState State { get; private set; }
public PoolJob CurrentJob { get; private set; }
private bool exitCalled { get; set; }
public PoolThread(Pool pool)
{
State = PoolThreadState.WORKING;
Pool = pool;
Thread = new Thread(thread);
lock (Pool.poolThreads)
{
Pool.poolThreads.Add(this);
}
WaitStarted();
}
private void WaitStarted()
{
lock (this)
{
Thread.Start();
Monitor.Wait(this, 5000);
}
}
private void thread()
{
lock (this)
{
Monitor.Pulse(this);
}
State = PoolThreadState.READY;
while (!exitCalled)
{
PoolJob poolJob = null;
try
{
poolJob = Pool.Dequeue();
} catch (TimeoutException)
{
break;
}
if (Pool.poolThreads.Count > 4)
{
Logging.Log(LogLevel.DEBUGDETAIL, "--");
}
if (poolJob != null)
{
State = PoolThreadState.WORKING;
CurrentJob = poolJob;
CurrentJob.Run(Pool);
CurrentJob = null;
State = PoolThreadState.READY;
}
}
State = PoolThreadState.EXITED;
lock (Pool.poolThreads)
{
Pool.poolThreads.Remove(this);
}
}
public void Exit()
{
lock (this)
{
exitCalled = true;
}
}
}
public void Dispose()
{
Close();
}
}
}