ln.threading/ln.threading/DynamicThreadPool.cs

145 lines
4.2 KiB
C#

using System;
using System.Collections.Generic;
using System.Threading;
using ln.logging;
namespace ln.threading
{
public class DynamicThreadPool : IDisposable
{
public static DynamicThreadPool DefaultPool { get; } = new DynamicThreadPool();
public int MinThreads { get; set; } = Environment.ProcessorCount * 2;
public int MaxThreads { get; set; } = Environment.ProcessorCount * 32;
public int IdleWaitTime { get; set; } = 5000;
public int QueuedTasks { get { lock(queuedTasks) return queuedTasks.Count; }}
public int CurrentThreads { get { lock(poolThreads) return poolThreads.Count; }}
public int IdleThreads { get { lock(idleThreads) return idleThreads.Count; }}
Queue<Action> queuedTasks = new Queue<Action>();
HashSet<Thread> poolThreads = new HashSet<Thread>();
HashSet<Thread> idleThreads = new HashSet<Thread>();
double avgIdleThreads;
public double AverageIdleThreads => avgIdleThreads;
Thread threadController;
bool stopController;
public DynamicThreadPool()
{
threadController = new Thread(Controller);
threadController.IsBackground = true;
threadController.Start();
}
void Controller()
{
while (!stopController)
{
Thread.Sleep(100);
lock (threadController)
{
avgIdleThreads = (0.9 * avgIdleThreads) + (0.1 * IdleThreads);
if ((avgIdleThreads - MinThreads) <= 0.0)
{
avgIdleThreads += 1.0;
StartThread();
}
}
}
}
public void Enqueue(Action action)
{
lock (idleThreads)
{
lock (queuedTasks)
queuedTasks.Enqueue(action);
if (idleThreads.Count > 0)
Monitor.Pulse(idleThreads);
else
{
lock (poolThreads)
if (poolThreads.Count < MaxThreads)
{
StartThread();
}
}
}
}
void StartThread()
{
Thread t = new Thread(PoolThread);
t.Start();
}
bool TryDequeueTask(out Action action)
{
lock (queuedTasks)
return queuedTasks.TryDequeue(out action);
}
public void PoolThread()
{
Thread currentThread = Thread.CurrentThread;
try
{
Action action;
currentThread.IsBackground = true;
lock (poolThreads)
poolThreads.Add(currentThread);
while (true)
{
while (TryDequeueTask(out action))
{
try
{
action();
} catch (Exception e)
{
Logging.Log(LogLevel.ERROR, "ThreadPool: task threw exception: {0}", e.ToString());
Logging.Log(e);
}
}
lock (idleThreads)
{
idleThreads.Add(currentThread);
bool wasPulsed = Monitor.Wait(idleThreads, IdleWaitTime);
idleThreads.Remove(currentThread);
lock (threadController)
if (!wasPulsed && (avgIdleThreads - MinThreads)>1.0)
{
avgIdleThreads-=1.0;
break;
}
}
}
} catch (Exception e)
{
Logging.Log(e);
} finally
{
lock (poolThreads)
poolThreads.Remove(currentThread);
}
}
public void Dispose()
{
stopController = true;
}
}
}