diff --git a/ln.threading.sln b/ln.threading.sln index 0b87b90..245a12b 100644 --- a/ln.threading.sln +++ b/ln.threading.sln @@ -5,6 +5,8 @@ VisualStudioVersion = 15.0.26124.0 MinimumVisualStudioVersion = 15.0.26124.0 Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "ln.threading", "ln.threading\ln.threading.csproj", "{EF42AC52-F094-4A9F-80D1-6E4365C89F70}" EndProject +Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "ln.threading.tests", "ln.threading.tests\ln.threading.tests.csproj", "{764FFAE4-D96C-4DF9-96E0-9CD0B5D98FBC}" +EndProject Global GlobalSection(SolutionConfigurationPlatforms) = preSolution Debug|Any CPU = Debug|Any CPU @@ -30,5 +32,17 @@ Global {EF42AC52-F094-4A9F-80D1-6E4365C89F70}.Release|x64.Build.0 = Release|Any CPU {EF42AC52-F094-4A9F-80D1-6E4365C89F70}.Release|x86.ActiveCfg = Release|Any CPU {EF42AC52-F094-4A9F-80D1-6E4365C89F70}.Release|x86.Build.0 = Release|Any CPU + {764FFAE4-D96C-4DF9-96E0-9CD0B5D98FBC}.Debug|Any CPU.ActiveCfg = Debug|Any CPU + {764FFAE4-D96C-4DF9-96E0-9CD0B5D98FBC}.Debug|Any CPU.Build.0 = Debug|Any CPU + {764FFAE4-D96C-4DF9-96E0-9CD0B5D98FBC}.Debug|x64.ActiveCfg = Debug|Any CPU + {764FFAE4-D96C-4DF9-96E0-9CD0B5D98FBC}.Debug|x64.Build.0 = Debug|Any CPU + {764FFAE4-D96C-4DF9-96E0-9CD0B5D98FBC}.Debug|x86.ActiveCfg = Debug|Any CPU + {764FFAE4-D96C-4DF9-96E0-9CD0B5D98FBC}.Debug|x86.Build.0 = Debug|Any CPU + {764FFAE4-D96C-4DF9-96E0-9CD0B5D98FBC}.Release|Any CPU.ActiveCfg = Release|Any CPU + {764FFAE4-D96C-4DF9-96E0-9CD0B5D98FBC}.Release|Any CPU.Build.0 = Release|Any CPU + {764FFAE4-D96C-4DF9-96E0-9CD0B5D98FBC}.Release|x64.ActiveCfg = Release|Any CPU + {764FFAE4-D96C-4DF9-96E0-9CD0B5D98FBC}.Release|x64.Build.0 = Release|Any CPU + {764FFAE4-D96C-4DF9-96E0-9CD0B5D98FBC}.Release|x86.ActiveCfg = Release|Any CPU + {764FFAE4-D96C-4DF9-96E0-9CD0B5D98FBC}.Release|x86.Build.0 = Release|Any CPU EndGlobalSection EndGlobal diff --git a/ln.threading.tests/PromiseTests.cs b/ln.threading.tests/PromiseTests.cs new file mode 100644 index 0000000..70abcb0 --- /dev/null +++ b/ln.threading.tests/PromiseTests.cs @@ -0,0 +1,110 @@ +using System; +using NUnit.Framework; + +namespace ln.threading.tests +{ + public class PromiseTests + { + [Test] + public void Test_A_Promises() + { + Promise p = new Promise((resolve,reject)=>{ + resolve(1234); + }); + + Assert.AreEqual(1234, p.Value); + + Assert.AreEqual( + 33, + new Promise((resolve,reject)=>{ + resolve("resolved"); + }) + .Then((value)=>{ + return 4; + }) + .Then((value)=>{ + throw new Exception("TestException"); + }) + .Then((value)=>{ + return value + 16; + }) + .Catch((e)=>{ + return 32; + }) + .Then((value)=>{ + return value + 1; + }) + .Value + ); + + Assert.AreEqual( + 23, + new Promise((resolve,reject)=>{ + resolve("resolved"); + }) + .Then((value)=>{ + return 4; + }) + .Then((value)=>{ + return value + 2; + }) + .Then((value)=>{ + return value + 16; + }) + .Catch((e)=>{ + return 32; + }) + .Then((value)=>{ + return value + 1; + }) + .Value + ); + } + + [Test] + public void Test_B_Promises() + { + Assert.AreEqual( + "The magic result is 129", + createTestPromise(1).Value + ); + Assert.AreEqual( + "The magic result is 130", + createTestPromise(2).Value + ); + Assert.AreEqual( + "The magic result is 131", + createTestPromise(3).Value + ); + Assert.AreEqual( + "The magic result is 132", + createTestPromise(4).Value + ); + Assert.AreEqual( + "sorry, I have no result for you", + createTestPromise(11).Value + ); + } + + public Promise createTestPromise(int i) + { + return new Promise((resolve, reject)=>{ + resolve(i); + }) + .Then((n)=>{ + if (n== 11) + throw new Exception("I don't like 11"); + return n + 128; + }) + .Then( + (n)=>{ + return string.Format("The magic result is {0}", n); + }, + (e)=>{ + return "sorry, I have no result for you"; + }) + ; + } + + } +} \ No newline at end of file diff --git a/ln.threading.tests/UnitTest1.cs b/ln.threading.tests/UnitTest1.cs new file mode 100644 index 0000000..cd3caf0 --- /dev/null +++ b/ln.threading.tests/UnitTest1.cs @@ -0,0 +1,87 @@ +using NUnit.Framework; +using System; +using System.Threading; + +namespace ln.threading.tests +{ + public class Tests + { + bool setup; + + [SetUp] + public void Setup() + { + if (!setup) + { + DynamicThreadPool.DefaultPool.Enqueue(()=>{ + while (true) + { + Thread.Sleep(1000); + TestContext.Error.WriteLine("SimpleThreadPool: Threads={0} Idle={1} AvgIdle={5} Tasks={2} Min={3} Max={4}", DynamicThreadPool.DefaultPool.CurrentThreads, DynamicThreadPool.DefaultPool.IdleThreads, DynamicThreadPool.DefaultPool.QueuedTasks, DynamicThreadPool.DefaultPool.MinThreads, DynamicThreadPool.DefaultPool.MaxThreads, DynamicThreadPool.DefaultPool.AverageIdleThreads); + TestContext.Error.Flush(); + } + }); + setup = true; + } + } + + [Test] + public void Test0_SimpleThreadPool() + { + bool success = false; + DynamicThreadPool.DefaultPool.Enqueue(()=>{success=true;}); + Thread.Sleep(TimeSpan.FromSeconds(1)); + Assert.IsTrue(success); + + for (int n=0;n<32;n++) + DynamicThreadPool.DefaultPool.Enqueue(CreateTask(n)); + + Thread.Sleep(3200); + + Assert.AreEqual(0, DynamicThreadPool.DefaultPool.QueuedTasks); + } + + public Action CreateTask(int n) + { + return ()=>{ + TestContext.Error.WriteLine("STP_task_{0}", n); + TestContext.Error.Flush(); + Thread.Sleep(100 * n); + }; + } + + [Test] + public void Test1() + { + int a = 0; + + PoolTimer timer = new PoolTimer(100, ()=>{ + lock (this) + { + a++; + TestContext.Error.WriteLine("timer elapsed: {0}", a); + TestContext.Error.Flush(); + } + }); + + timer.Start(); + Thread.Sleep(TimeSpan.FromSeconds(2.05)); + timer.Stop(); + + Assert.AreEqual(20, a); + + Assert.Pass(); + } + + [Test] + public void ZZZ_Waiter() + { + Thread.Sleep(30000); + } + [Test] + public void AAA_Waiter() + { + Thread.Sleep(5000); + } + } +} \ No newline at end of file diff --git a/ln.threading.tests/ln.threading.tests.csproj b/ln.threading.tests/ln.threading.tests.csproj new file mode 100644 index 0000000..76aaab1 --- /dev/null +++ b/ln.threading.tests/ln.threading.tests.csproj @@ -0,0 +1,17 @@ + + + + netcoreapp3.1 + + false + + + + + + + + + + + diff --git a/ln.threading/DynamicThreadPool.cs b/ln.threading/DynamicThreadPool.cs new file mode 100644 index 0000000..5fa2e5d --- /dev/null +++ b/ln.threading/DynamicThreadPool.cs @@ -0,0 +1,145 @@ + +using System; +using System.Collections.Generic; +using System.Threading; +using ln.logging; + +namespace ln.threading +{ + public class DynamicThreadPool : IDisposable + { + public static DynamicThreadPool DefaultPool { get; } = new DynamicThreadPool(); + + public int MinThreads { get; set; } = Environment.ProcessorCount * 2; + public int MaxThreads { get; set; } = Environment.ProcessorCount * 32; + + public int IdleWaitTime { get; set; } = 5000; + + public int QueuedTasks { get { lock(queuedTasks) return queuedTasks.Count; }} + public int CurrentThreads { get { lock(poolThreads) return poolThreads.Count; }} + public int IdleThreads { get { lock(idleThreads) return idleThreads.Count; }} + + + Queue queuedTasks = new Queue(); + HashSet poolThreads = new HashSet(); + HashSet idleThreads = new HashSet(); + + double avgIdleThreads; + public double AverageIdleThreads => avgIdleThreads; + + Thread threadController; + bool stopController; + + + public DynamicThreadPool() + { + threadController = new Thread(Controller); + threadController.IsBackground = true; + threadController.Start(); + } + + void Controller() + { + while (!stopController) + { + Thread.Sleep(100); + lock (threadController) + { + avgIdleThreads = (0.9 * avgIdleThreads) + (0.1 * IdleThreads); + + if ((avgIdleThreads - MinThreads) <= 0.0) + { + avgIdleThreads += 1.0; + StartThread(); + } + } + } + } + + public void Enqueue(Action action) + { + lock (idleThreads) + { + lock (queuedTasks) + queuedTasks.Enqueue(action); + + if (idleThreads.Count > 0) + Monitor.Pulse(idleThreads); + else + { + lock (poolThreads) + if (poolThreads.Count < MaxThreads) + { + StartThread(); + } + } + } + } + + void StartThread() + { + Thread t = new Thread(PoolThread); + t.Start(); + } + + bool TryDequeueTask(out Action action) + { + lock (queuedTasks) + return queuedTasks.TryDequeue(out action); + } + + public void PoolThread() + { + Thread currentThread = Thread.CurrentThread; + try + { + Action action; + + currentThread.IsBackground = true; + lock (poolThreads) + poolThreads.Add(currentThread); + + while (true) + { + while (TryDequeueTask(out action)) + { + try + { + action(); + } catch (Exception e) + { + Logging.Log(LogLevel.ERROR, "ThreadPool: task threw exception: {0}", e.ToString()); + Logging.Log(e); + } + } + + lock (idleThreads) + { + idleThreads.Add(currentThread); + bool wasPulsed = Monitor.Wait(idleThreads, IdleWaitTime); + idleThreads.Remove(currentThread); + + lock (threadController) + if (!wasPulsed && (avgIdleThreads - MinThreads)>1.0) + { + avgIdleThreads-=1.0; + break; + } + } + } + } catch (Exception e) + { + Logging.Log(e); + } finally + { + lock (poolThreads) + poolThreads.Remove(currentThread); + } + } + + public void Dispose() + { + stopController = true; + } + } +} \ No newline at end of file diff --git a/ln.threading/Pool.cs b/ln.threading/Pool.cs index e0b8f1c..fe16ab6 100644 --- a/ln.threading/Pool.cs +++ b/ln.threading/Pool.cs @@ -107,7 +107,7 @@ namespace ln.threading return null; poolJob.Prepare(); - queuedJobs.Enqueue(poolJob); + queuedJobs.Enqueue(poolJob); PulseWaitingThread(); } diff --git a/ln.threading/Promise.cs b/ln.threading/Promise.cs new file mode 100644 index 0000000..6add56e --- /dev/null +++ b/ln.threading/Promise.cs @@ -0,0 +1,233 @@ + +using System; +using System.Threading; + +namespace ln.threading +{ + public class PromiseNotPendingException : Exception {} + public class PromiseRejectedException : Exception + { + public PromiseRejectedException(){} + public PromiseRejectedException(Exception caughtExcpetion) : base("Promise was rejected", caughtExcpetion) {} + + } + + public enum PromiseState { PENDING, RESOLVED, REJECTED } + public class Promise + { + public event Action Resolved; + public event Action Rejected; + public event Action> Settled; + + public bool IsSettled { get; private set; } + + public PromiseState State { get; private set; } = PromiseState.PENDING; + T value; + Exception rejectingException; + + + public Promise() {} + public Promise(Action,Action> resolver) + { + Resolve(resolver); + } + + void Resolve(Action,Action> resolver) + { + DynamicThreadPool.DefaultPool.Enqueue(()=>{ + try{ + resolver(Resolve, Reject); + } catch (Exception e) + { + Reject(e); + } + }); + } + + + public void Resolve(T value) + { + lock (this) + { + if (State == PromiseState.PENDING) + { + this.value = value; + State = PromiseState.RESOLVED; + IsSettled = true; + Monitor.PulseAll(this); + DynamicThreadPool.DefaultPool.Enqueue(()=> { + Resolved?.Invoke(this.value); + Settled?.Invoke(this); + }); + } else + { + throw new PromiseNotPendingException(); + } + } + } + + public void Reject() => Reject(null); + public void Reject(Exception rejectingException) + { + lock (this) + { + if (State != PromiseState.PENDING) + throw new PromiseNotPendingException(); + + this.rejectingException = rejectingException; + IsSettled = true; + + State = PromiseState.REJECTED; + Monitor.PulseAll(this); + DynamicThreadPool.DefaultPool.Enqueue(()=> { + Rejected?.Invoke(this.rejectingException); + Settled?.Invoke(this); + }); + } + } + + public T Value { + get { + lock (this) + { + while (State == PromiseState.PENDING) + Monitor.Wait(this); + + if (State == PromiseState.REJECTED) + throw new PromiseRejectedException(rejectingException); + if (State == PromiseState.RESOLVED) + return value; + + throw new Exception("serious bug in Promise implementation"); + } + } + } + + + public Promise Then(Func resolved) => Then(resolved, null); + public Promise Then(Func resolved, Func rejected) + { + lock (this) + { + Promise chainedPromise = new Promise(); + Action resolveAction = ()=>{ + lock (this) + { + switch (State) + { + case PromiseState.PENDING: + throw new Exception("serious bug in Promise.Then(..)"); + case PromiseState.RESOLVED: + try{ + chainedPromise.Resolve(resolved(value)); + } catch (Exception e) + { + chainedPromise.Reject(e); + } + break; + case PromiseState.REJECTED: + if (rejected == null) + chainedPromise.Reject(rejectingException); + else + { + try{ + chainedPromise.Resolve(rejected(rejectingException)); + } catch (Exception e) + { + chainedPromise.Reject(e); + } + } + break; + } + } + }; + + if (IsSettled) + DynamicThreadPool.DefaultPool.Enqueue(resolveAction); + else + Settled += (p) => resolveAction(); + + return chainedPromise; + } + } + + public Promise Catch(Func rejectedHandler) + { + lock (this) + { + Promise chainedPromise = new Promise(); + Action resolveAction = ()=>{ + lock (this) + { + switch (State) + { + case PromiseState.PENDING: + throw new Exception("serious bug in Promise.Catch(..)"); + case PromiseState.RESOLVED: + chainedPromise.Resolve(value); + break; + case PromiseState.REJECTED: + try + { + chainedPromise.Resolve( rejectedHandler(rejectingException)); + } catch (Exception e) + { + chainedPromise.Reject(e); + } + break; + } + } + }; + + if (IsSettled) + DynamicThreadPool.DefaultPool.Enqueue(resolveAction); + else + Settled += (p) => resolveAction(); + + return chainedPromise; + } + } + + public Promise Finally(Action finallyHandler) + { + Promise chainedPromise = new Promise(); + lock (this) + { + Action resolveAction = ()=>{ + lock (this) + { + if (State == PromiseState.PENDING) + throw new Exception("serious bug in Promise.Finally(..)"); + + try{ + finallyHandler(); + + switch (State) + { + case PromiseState.RESOLVED: + chainedPromise.Resolve(value); + break; + case PromiseState.REJECTED: + chainedPromise.Reject(rejectingException); + break; + } + } catch (Exception e) + { + chainedPromise.Reject(e); + } + } + }; + + if (IsSettled) + DynamicThreadPool.DefaultPool.Enqueue(resolveAction); + else + Settled += (p) => resolveAction(); + + return chainedPromise; + } + } + + + + } +} \ No newline at end of file diff --git a/ln.threading/Timer.cs b/ln.threading/Timer.cs new file mode 100644 index 0000000..8505502 --- /dev/null +++ b/ln.threading/Timer.cs @@ -0,0 +1,129 @@ + +using System; +using System.Threading; + +namespace ln.threading +{ + public delegate void ElapsedDelegate(PoolTimer timer); + + public class PoolTimer : IDisposable + { + public event ElapsedDelegate Elapsed; + + TimeSpan interval; + public TimeSpan Interval { + get => interval; + set { + if (value <= TimeSpan.Zero) + throw new ArgumentOutOfRangeException(nameof(value)); + + interval = value; + + lock (this){ + Monitor.Pulse(this); + } + } + } + + DateTime nextStart; + public DateTime NextStart => nextStart; + + + + bool canceled = true; + public bool IsActive { + get => !canceled; + private set { + lock (this) + { + if (value && canceled) + Start(); + else if (!value && !canceled) + Stop(); + } + } + } + + + public PoolTimer(TimeSpan interval) + { + Interval = interval; + } + public PoolTimer(double interval) + { + Interval = TimeSpan.FromSeconds(interval); + } + public PoolTimer(int interval) + { + Interval = TimeSpan.FromMilliseconds(interval); + } + + public PoolTimer(TimeSpan interval, Action action) :this(action) + { + Interval = interval; + } + public PoolTimer(double interval, Action action) :this(action) + { + Interval = TimeSpan.FromSeconds(interval); + } + public PoolTimer(int interval, Action action) :this(action) + { + Interval = TimeSpan.FromMilliseconds(interval); + } + + PoolTimer(Action action) + { + Elapsed += (t)=>action(); + } + + public void Start() + { + lock (this) + { + if (canceled){ + canceled = false; + DynamicThreadPool.DefaultPool.Enqueue(TimerThread); + } + } + } + + public void Stop() + { + lock (this) + { + if (!canceled) + { + canceled = true; + Monitor.Pulse(this); + } + } + } + + void TimerThread() + { + lock (this) + { + nextStart = DateTime.Now + Interval; + DateTime now; + + while (!canceled) + { + now = DateTime.Now; + + if ((nextStart < now) || (!Monitor.Wait(this, nextStart - now) && !canceled)) + { + DynamicThreadPool.DefaultPool.Enqueue(()=>Elapsed?.Invoke(this)); + nextStart += Interval; + } else { + nextStart = DateTime.Now + Interval; + } + } + } + } + + public void Dispose() + { + Stop(); + } + } +} \ No newline at end of file diff --git a/ln.threading/ln.threading.csproj b/ln.threading/ln.threading.csproj index b54b483..f86e0fa 100644 --- a/ln.threading/ln.threading.csproj +++ b/ln.threading/ln.threading.csproj @@ -2,7 +2,7 @@ netcoreapp3.1 - 0.1.0 + 0.2.0 Harald Wolff-Thobaben l--n.de