sharp-extensions/ExtendableThreadPool.cs

175 lines
3.0 KiB
C#

using System;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;
using System.Linq;
namespace sharp.extensions
{
    /// <summary>
    /// A parameterless unit of work that can be queued on an <see cref="ExtendableThreadPool"/>.
    /// </summary>
    public delegate void TaskDelegate();

    /// <summary>
    /// A small thread pool that runs queued <see cref="TaskDelegate"/> instances on dedicated
    /// worker threads. Workers are created on demand (bounded by
    /// <see cref="MaximumConcurrentThreads"/>) and park themselves when the queue is empty.
    /// </summary>
    public class ExtendableThreadPool
    {
        // Private gate instead of lock(this): outside code cannot take the pool's lock.
        // Lock order is always gate -> ExtendableThread.sync, never the reverse.
        private readonly object gate = new object();

        // Parked workers; the last element is the most recently parked one (used as a stack).
        private readonly List<ExtendableThread> availableThreads = new List<ExtendableThread>();

        // Workers that are running, or have been told to run, tasks.
        private readonly List<ExtendableThread> activeThreads = new List<ExtendableThread>();

        private readonly Queue<TaskDelegate> queuedTasks = new Queue<TaskDelegate>();

        /// <summary>
        /// Holds, per thread, the <see cref="ExtendableThread"/> wrapper of the calling worker
        /// thread. The value is only set on threads started by an <see cref="ExtendableThread"/>.
        /// </summary>
        public static ThreadLocal<ExtendableThread> CurrentExtendedThread { get; private set; } = new ThreadLocal<ExtendableThread>();

        /// <summary>
        /// Upper bound on the number of worker threads the pool creates on its own.
        /// Values below 1 (including the default of -1) mean "no limit".
        /// Threads constructed directly via <see cref="ExtendableThread"/> are counted but
        /// not refused.
        /// </summary>
        public int MaximumConcurrentThreads { get; set; } = -1;

        /// <summary>Creates an empty pool; no threads are started until a task is queued.</summary>
        public ExtendableThreadPool()
        {
        }

        /// <summary>
        /// Appends <paramref name="task"/> to the queue and wakes or creates a worker for it.
        /// </summary>
        /// <param name="task">The work item to run.</param>
        /// <exception cref="ArgumentNullException"><paramref name="task"/> is null.</exception>
        public void QueueTask(TaskDelegate task)
        {
            // A null entry would be indistinguishable from "queue empty" in fetchTask and
            // would make a worker park while later tasks are still queued.
            if (task == null)
            {
                throw new ArgumentNullException(nameof(task));
            }

            lock (gate)
            {
                queuedTasks.Enqueue(task);
                SignalQueue();
            }
        }

        /// <summary>
        /// Makes sure some worker will look at the queue: wakes the most recently parked worker,
        /// or starts a new one when none is parked and the thread limit allows it. When the
        /// limit is reached nothing is done; a busy worker re-checks the queue before parking.
        /// </summary>
        private void SignalQueue()
        {
            lock (gate)
            {
                int parked = availableThreads.Count;
                if (parked > 0)
                {
                    ExtendableThread idle = availableThreads[parked - 1];
                    availableThreads.RemoveAt(parked - 1);
                    // Move to the active list here, under the gate, so the thread count
                    // (active + available) never under-reports while the worker wakes up.
                    activeThreads.Add(idle);
                    idle.Wake();
                    return;
                }

                int limit = MaximumConcurrentThreads;
                if (limit < 1 || activeThreads.Count < limit)
                {
                    // The constructor registers the thread in activeThreads and starts it.
                    new ExtendableThread(this);
                }
            }
        }

        /// <summary>Dequeues the next task, or returns null when the queue is empty.</summary>
        private TaskDelegate fetchTask()
        {
            lock (gate)
            {
                return queuedTasks.Count > 0 ? queuedTasks.Dequeue() : null;
            }
        }

        /// <summary>
        /// Runs <paramref name="task"/> and writes any exception it throws to the console
        /// instead of letting it terminate the worker thread.
        /// </summary>
        /// <param name="task">The task to run.</param>
        protected void InvokeTaskDelegate(TaskDelegate task)
        {
            try
            {
                task.Invoke();
            }
            catch (Exception e)
            {
                Console.WriteLine("InvokeTaskDelegate:t Task threw exception: {0}", e);
            }
        }

        /// <summary>
        /// A worker thread owned by an <see cref="ExtendableThreadPool"/>. It drains the pool's
        /// queue, parks itself when the queue is empty and resumes when the pool wakes it.
        /// </summary>
        public class ExtendableThread
        {
            // Guards requestExit, isReady and wakeRequested; also the monitor the worker parks on.
            private readonly object sync = new object();
            private readonly Thread thread;
            private bool requestExit;
            private bool isReady;
            // Set by Wake(); lets a wake-up that arrives before the worker starts waiting
            // be observed instead of lost.
            private bool wakeRequested;

            /// <summary>The pool this worker takes its tasks from.</summary>
            public ExtendableThreadPool Pool { get; private set; }

            /// <summary>
            /// Creates a worker for <paramref name="pool"/>, registers it as active and starts it.
            /// </summary>
            /// <param name="pool">The pool whose queue the worker serves.</param>
            /// <exception cref="ArgumentNullException"><paramref name="pool"/> is null.</exception>
            public ExtendableThread(ExtendableThreadPool pool)
            {
                if (pool == null)
                {
                    throw new ArgumentNullException(nameof(pool));
                }

                Pool = pool;
                // Background thread: an idle worker never exits by itself, so a foreground
                // thread would keep the process alive forever.
                thread = new Thread(worker) { IsBackground = true };

                lock (pool.gate)
                {
                    pool.activeThreads.Add(this);
                }

                thread.Start();
            }

            /// <summary>
            /// True once the worker has parked for the first time and until it exits.
            /// </summary>
            public bool IsReady
            {
                get
                {
                    lock (sync)
                    {
                        return isReady;
                    }
                }
            }

            /// <summary>True after <see cref="Stop()"/> or <see cref="Stop(int)"/> was called.</summary>
            public bool ExitRequested
            {
                get
                {
                    lock (sync)
                    {
                        return requestExit;
                    }
                }
            }

            /// <summary>Runs <paramref name="task"/> through the owning pool's invoker.</summary>
            /// <param name="task">The task to run.</param>
            protected void InvokeTaskDelegate(TaskDelegate task)
            {
                Pool.InvokeTaskDelegate(task);
            }

            /// <summary>
            /// Called by the pool (holding its gate) after it removed this worker from the
            /// parked list: records the wake-up and pulses the worker.
            /// </summary>
            internal void Wake()
            {
                lock (sync)
                {
                    wakeRequested = true;
                    Monitor.Pulse(sync);
                }
            }

            /// <summary>Thread body: drain the queue, park, repeat until an exit is requested.</summary>
            private void worker()
            {
                ExtendableThreadPool.CurrentExtendedThread.Value = this;
                try
                {
                    while (!ExitRequested)
                    {
                        // Stop requests are honoured between tasks.
                        TaskDelegate next;
                        while (!ExitRequested && (next = Pool.fetchTask()) != null)
                        {
                            InvokeTaskDelegate(next);
                        }

                        if (ExitRequested)
                        {
                            break;
                        }

                        lock (Pool.gate)
                        {
                            // A task may have been queued after the drain loop saw an empty
                            // queue; QueueTask enqueues under the same gate, so this check
                            // cannot miss it.
                            if (Pool.queuedTasks.Count > 0)
                            {
                                continue;
                            }

                            Pool.activeThreads.Remove(this);
                            Pool.availableThreads.Add(this);
                        }

                        lock (sync)
                        {
                            isReady = true;
                            // Wake() and Stop() both change state under sync and pulse, so
                            // no polling timeout is needed.
                            while (!wakeRequested && !requestExit)
                            {
                                Monitor.Wait(sync);
                            }

                            wakeRequested = false;
                        }
                    }
                }
                finally
                {
                    Retire();
                }
            }

            /// <summary>
            /// Removes this worker from the pool's bookkeeping and, if tasks are still queued,
            /// asks the pool to hand them to another worker.
            /// </summary>
            private void Retire()
            {
                lock (Pool.gate)
                {
                    Pool.activeThreads.Remove(this);
                    Pool.availableThreads.Remove(this);

                    // This worker may have been woken for a task it will no longer run.
                    if (Pool.queuedTasks.Count > 0)
                    {
                        Pool.SignalQueue();
                    }
                }

                lock (sync)
                {
                    isReady = false;
                }
            }

            /// <summary>
            /// Asks the worker to exit. It finishes the task it is currently running, if any,
            /// and then terminates. Does not wait for the thread to end.
            /// </summary>
            public void Stop()
            {
                lock (sync)
                {
                    requestExit = true;
                    // Wake the worker immediately if it is parked.
                    Monitor.PulseAll(sync);
                }
            }

            /// <summary>
            /// Asks the worker to exit and waits up to <paramref name="timeoutms"/> milliseconds
            /// for it to end. If it is still running after that, an abort is attempted.
            /// When called from the worker thread itself, only the exit request is made.
            /// </summary>
            /// <param name="timeoutms">
            /// Maximum time to wait, in milliseconds; passed to <see cref="Thread.Join(int)"/>.
            /// </param>
            public void Stop(int timeoutms)
            {
                Stop();

                // A thread cannot join itself; it will exit once the current task returns.
                if (Thread.CurrentThread == thread)
                {
                    return;
                }

                // Join outside the lock: the worker needs sync to observe the exit request.
                if (!thread.Join(timeoutms))
                {
                    try
                    {
                        thread.Abort();
                    }
                    catch (PlatformNotSupportedException)
                    {
                        // Thread.Abort is unavailable on this runtime; the worker still
                        // exits on its own once its current task returns.
                    }
                }
            }
        }
    }
}