ln.skyscanner/threads/Pool.cs

169 lines
4.4 KiB
C#

using System;
using System.Threading;
using System.Collections.Generic;
using System.Linq.Expressions;
using ln.logging;
using System.Linq;
namespace ln.skyscanner.threads
{
public delegate void JobDelegate();
public enum PoolThreadState { READY, WORKING, EXITED }
public class Pool :IDisposable
{
public PoolThread[] PoolThreads => poolThreads.ToArray();
public int NumThreads
{
get => poolThreads.Count;
set => setPoolThreads(value);
}
public PoolThreadState[] ThreadStates => poolThreads.Select((x) => x.State).ToArray();
public int QueuedJobs => queuedJobs.Count;
private List<PoolThread> poolThreads = new List<PoolThread>();
private Queue<JobDelegate> queuedJobs = new Queue<JobDelegate>();
private bool stopping;
public Pool()
{
NumThreads = 4;
}
public void Close()
{
stopping = true;
NumThreads = 0;
stopping = false;
}
private void setPoolThreads(int count)
{
lock (poolThreads)
{
while (poolThreads.Count > count)
{
poolThreads[poolThreads.Count - 1].Exit();
}
while (poolThreads.Count < count)
{
poolThreads.Add(new PoolThread(this));
}
}
}
public void Enqueue(JobDelegate job)
{
if (stopping)
return;
lock (queuedJobs)
{
queuedJobs.Enqueue(job);
Monitor.Pulse(queuedJobs);
}
}
public class PoolThread
{
public Pool Pool { get; }
public Thread Thread { get; }
public PoolThreadState State { get; private set; }
private bool exitCalled { get; set; }
public PoolThread(Pool pool)
{
Pool = pool;
Thread = new Thread(thread);
WaitStarted();
}
private void WaitStarted()
{
lock (this)
{
Thread.Start();
Monitor.Wait(this, 5000);
}
}
private void thread()
{
lock (this)
{
Monitor.Pulse(this);
}
while (!exitCalled)
{
try
{
State = PoolThreadState.WORKING;
lock (Pool.queuedJobs)
{
while (!exitCalled && Pool.queuedJobs.Count > 0)
{
JobDelegate job = Pool.queuedJobs.Dequeue();
Monitor.Exit(Pool.queuedJobs);
Logging.Log(LogLevel.DEBUG, "PoolThread: starting next job {0}", job);
try
{
job();
}
catch (Exception ie)
{
Logging.Log(ie);
}
Monitor.Enter(Pool.queuedJobs);
State = PoolThreadState.READY;
Monitor.Wait(Pool.queuedJobs, 5000);
State = PoolThreadState.WORKING;
}
}
}
catch (Exception e)
{
Logging.Log(e);
}
}
lock (this)
{
State = PoolThreadState.EXITED;
Monitor.Pulse(this);
}
}
public void Exit()
{
lock (this)
{
exitCalled = true;
Monitor.Pulse(this);
Monitor.Wait(this);
Pool.poolThreads.Remove(this);
}
}
}
public void Dispose()
{
Close();
}
}
}