Added DynamicThreadPool, Timer, Promise
ln.build - build0.waldrennach.l--n.de build job pending Details

master
Harald Wolff 2020-12-30 12:12:36 +01:00
parent 596c0e60b3
commit d3f4cc2033
9 changed files with 737 additions and 2 deletions

View File

@ -5,6 +5,8 @@ VisualStudioVersion = 15.0.26124.0
MinimumVisualStudioVersion = 15.0.26124.0
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "ln.threading", "ln.threading\ln.threading.csproj", "{EF42AC52-F094-4A9F-80D1-6E4365C89F70}"
EndProject
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "ln.threading.tests", "ln.threading.tests\ln.threading.tests.csproj", "{764FFAE4-D96C-4DF9-96E0-9CD0B5D98FBC}"
EndProject
Global
GlobalSection(SolutionConfigurationPlatforms) = preSolution
Debug|Any CPU = Debug|Any CPU
@ -30,5 +32,17 @@ Global
{EF42AC52-F094-4A9F-80D1-6E4365C89F70}.Release|x64.Build.0 = Release|Any CPU
{EF42AC52-F094-4A9F-80D1-6E4365C89F70}.Release|x86.ActiveCfg = Release|Any CPU
{EF42AC52-F094-4A9F-80D1-6E4365C89F70}.Release|x86.Build.0 = Release|Any CPU
{764FFAE4-D96C-4DF9-96E0-9CD0B5D98FBC}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
{764FFAE4-D96C-4DF9-96E0-9CD0B5D98FBC}.Debug|Any CPU.Build.0 = Debug|Any CPU
{764FFAE4-D96C-4DF9-96E0-9CD0B5D98FBC}.Debug|x64.ActiveCfg = Debug|Any CPU
{764FFAE4-D96C-4DF9-96E0-9CD0B5D98FBC}.Debug|x64.Build.0 = Debug|Any CPU
{764FFAE4-D96C-4DF9-96E0-9CD0B5D98FBC}.Debug|x86.ActiveCfg = Debug|Any CPU
{764FFAE4-D96C-4DF9-96E0-9CD0B5D98FBC}.Debug|x86.Build.0 = Debug|Any CPU
{764FFAE4-D96C-4DF9-96E0-9CD0B5D98FBC}.Release|Any CPU.ActiveCfg = Release|Any CPU
{764FFAE4-D96C-4DF9-96E0-9CD0B5D98FBC}.Release|Any CPU.Build.0 = Release|Any CPU
{764FFAE4-D96C-4DF9-96E0-9CD0B5D98FBC}.Release|x64.ActiveCfg = Release|Any CPU
{764FFAE4-D96C-4DF9-96E0-9CD0B5D98FBC}.Release|x64.Build.0 = Release|Any CPU
{764FFAE4-D96C-4DF9-96E0-9CD0B5D98FBC}.Release|x86.ActiveCfg = Release|Any CPU
{764FFAE4-D96C-4DF9-96E0-9CD0B5D98FBC}.Release|x86.Build.0 = Release|Any CPU
EndGlobalSection
EndGlobal

View File

@ -0,0 +1,110 @@
using System;
using NUnit.Framework;
namespace ln.threading.tests
{
public class PromiseTests
{
[Test]
public void Test_A_Promises()
{
Promise<int> p = new Promise<int>((resolve,reject)=>{
resolve(1234);
});
Assert.AreEqual(1234, p.Value);
Assert.AreEqual(
33,
new Promise<string>((resolve,reject)=>{
resolve("resolved");
})
.Then((value)=>{
return 4;
})
.Then<int>((value)=>{
throw new Exception("TestException");
})
.Then((value)=>{
return value + 16;
})
.Catch((e)=>{
return 32;
})
.Then((value)=>{
return value + 1;
})
.Value
);
Assert.AreEqual(
23,
new Promise<string>((resolve,reject)=>{
resolve("resolved");
})
.Then((value)=>{
return 4;
})
.Then<int>((value)=>{
return value + 2;
})
.Then((value)=>{
return value + 16;
})
.Catch((e)=>{
return 32;
})
.Then((value)=>{
return value + 1;
})
.Value
);
}
[Test]
public void Test_B_Promises()
{
Assert.AreEqual(
"The magic result is 129",
createTestPromise(1).Value
);
Assert.AreEqual(
"The magic result is 130",
createTestPromise(2).Value
);
Assert.AreEqual(
"The magic result is 131",
createTestPromise(3).Value
);
Assert.AreEqual(
"The magic result is 132",
createTestPromise(4).Value
);
Assert.AreEqual(
"sorry, I have no result for you",
createTestPromise(11).Value
);
}
public Promise<string> createTestPromise(int i)
{
return new Promise<int>((resolve, reject)=>{
resolve(i);
})
.Then((n)=>{
if (n== 11)
throw new Exception("I don't like 11");
return n + 128;
})
.Then(
(n)=>{
return string.Format("The magic result is {0}", n);
},
(e)=>{
return "sorry, I have no result for you";
})
;
}
}
}

View File

@ -0,0 +1,87 @@
using NUnit.Framework;
using System;
using System.Threading;
namespace ln.threading.tests
{
public class Tests
{
bool setup;
[SetUp]
public void Setup()
{
if (!setup)
{
DynamicThreadPool.DefaultPool.Enqueue(()=>{
while (true)
{
Thread.Sleep(1000);
TestContext.Error.WriteLine("SimpleThreadPool: Threads={0} Idle={1} AvgIdle={5} Tasks={2} Min={3} Max={4}", DynamicThreadPool.DefaultPool.CurrentThreads, DynamicThreadPool.DefaultPool.IdleThreads, DynamicThreadPool.DefaultPool.QueuedTasks, DynamicThreadPool.DefaultPool.MinThreads, DynamicThreadPool.DefaultPool.MaxThreads, DynamicThreadPool.DefaultPool.AverageIdleThreads);
TestContext.Error.Flush();
}
});
setup = true;
}
}
[Test]
public void Test0_SimpleThreadPool()
{
bool success = false;
DynamicThreadPool.DefaultPool.Enqueue(()=>{success=true;});
Thread.Sleep(TimeSpan.FromSeconds(1));
Assert.IsTrue(success);
for (int n=0;n<32;n++)
DynamicThreadPool.DefaultPool.Enqueue(CreateTask(n));
Thread.Sleep(3200);
Assert.AreEqual(0, DynamicThreadPool.DefaultPool.QueuedTasks);
}
public Action CreateTask(int n)
{
return ()=>{
TestContext.Error.WriteLine("STP_task_{0}", n);
TestContext.Error.Flush();
Thread.Sleep(100 * n);
};
}
[Test]
public void Test1()
{
int a = 0;
PoolTimer timer = new PoolTimer(100, ()=>{
lock (this)
{
a++;
TestContext.Error.WriteLine("timer elapsed: {0}", a);
TestContext.Error.Flush();
}
});
timer.Start();
Thread.Sleep(TimeSpan.FromSeconds(2.05));
timer.Stop();
Assert.AreEqual(20, a);
Assert.Pass();
}
[Test]
public void ZZZ_Waiter()
{
Thread.Sleep(30000);
}
[Test]
public void AAA_Waiter()
{
Thread.Sleep(5000);
}
}
}

View File

@ -0,0 +1,17 @@
<Project Sdk="Microsoft.NET.Sdk">
<PropertyGroup>
<TargetFramework>netcoreapp3.1</TargetFramework>
<IsPackable>false</IsPackable>
</PropertyGroup>
<ItemGroup>
<PackageReference Include="NUnit" Version="3.12.0" />
<PackageReference Include="NUnit3TestAdapter" Version="3.16.1" />
<PackageReference Include="Microsoft.NET.Test.Sdk" Version="16.5.0"/>
<ProjectReference Include="../ln.threading/ln.threading.csproj" />
</ItemGroup>
</Project>

View File

@ -0,0 +1,145 @@
using System;
using System.Collections.Generic;
using System.Threading;
using ln.logging;
namespace ln.threading
{
public class DynamicThreadPool : IDisposable
{
public static DynamicThreadPool DefaultPool { get; } = new DynamicThreadPool();
public int MinThreads { get; set; } = Environment.ProcessorCount * 2;
public int MaxThreads { get; set; } = Environment.ProcessorCount * 32;
public int IdleWaitTime { get; set; } = 5000;
public int QueuedTasks { get { lock(queuedTasks) return queuedTasks.Count; }}
public int CurrentThreads { get { lock(poolThreads) return poolThreads.Count; }}
public int IdleThreads { get { lock(idleThreads) return idleThreads.Count; }}
Queue<Action> queuedTasks = new Queue<Action>();
HashSet<Thread> poolThreads = new HashSet<Thread>();
HashSet<Thread> idleThreads = new HashSet<Thread>();
double avgIdleThreads;
public double AverageIdleThreads => avgIdleThreads;
Thread threadController;
bool stopController;
public DynamicThreadPool()
{
threadController = new Thread(Controller);
threadController.IsBackground = true;
threadController.Start();
}
void Controller()
{
while (!stopController)
{
Thread.Sleep(100);
lock (threadController)
{
avgIdleThreads = (0.9 * avgIdleThreads) + (0.1 * IdleThreads);
if ((avgIdleThreads - MinThreads) <= 0.0)
{
avgIdleThreads += 1.0;
StartThread();
}
}
}
}
public void Enqueue(Action action)
{
lock (idleThreads)
{
lock (queuedTasks)
queuedTasks.Enqueue(action);
if (idleThreads.Count > 0)
Monitor.Pulse(idleThreads);
else
{
lock (poolThreads)
if (poolThreads.Count < MaxThreads)
{
StartThread();
}
}
}
}
void StartThread()
{
Thread t = new Thread(PoolThread);
t.Start();
}
bool TryDequeueTask(out Action action)
{
lock (queuedTasks)
return queuedTasks.TryDequeue(out action);
}
public void PoolThread()
{
Thread currentThread = Thread.CurrentThread;
try
{
Action action;
currentThread.IsBackground = true;
lock (poolThreads)
poolThreads.Add(currentThread);
while (true)
{
while (TryDequeueTask(out action))
{
try
{
action();
} catch (Exception e)
{
Logging.Log(LogLevel.ERROR, "ThreadPool: task threw exception: {0}", e.ToString());
Logging.Log(e);
}
}
lock (idleThreads)
{
idleThreads.Add(currentThread);
bool wasPulsed = Monitor.Wait(idleThreads, IdleWaitTime);
idleThreads.Remove(currentThread);
lock (threadController)
if (!wasPulsed && (avgIdleThreads - MinThreads)>1.0)
{
avgIdleThreads-=1.0;
break;
}
}
}
} catch (Exception e)
{
Logging.Log(e);
} finally
{
lock (poolThreads)
poolThreads.Remove(currentThread);
}
}
public void Dispose()
{
stopController = true;
}
}
}

View File

@ -107,7 +107,7 @@ namespace ln.threading
return null;
poolJob.Prepare();
queuedJobs.Enqueue(poolJob);
queuedJobs.Enqueue(poolJob);
PulseWaitingThread();
}

View File

@ -0,0 +1,233 @@
using System;
using System.Threading;
namespace ln.threading
{
public class PromiseNotPendingException : Exception {}
public class PromiseRejectedException : Exception
{
public PromiseRejectedException(){}
public PromiseRejectedException(Exception caughtExcpetion) : base("Promise was rejected", caughtExcpetion) {}
}
public enum PromiseState { PENDING, RESOLVED, REJECTED }
public class Promise<T>
{
public event Action<T> Resolved;
public event Action<Exception> Rejected;
public event Action<Promise<T>> Settled;
public bool IsSettled { get; private set; }
public PromiseState State { get; private set; } = PromiseState.PENDING;
T value;
Exception rejectingException;
public Promise() {}
public Promise(Action<Action<T>,Action<Exception>> resolver)
{
Resolve(resolver);
}
void Resolve(Action<Action<T>,Action<Exception>> resolver)
{
DynamicThreadPool.DefaultPool.Enqueue(()=>{
try{
resolver(Resolve, Reject);
} catch (Exception e)
{
Reject(e);
}
});
}
public void Resolve(T value)
{
lock (this)
{
if (State == PromiseState.PENDING)
{
this.value = value;
State = PromiseState.RESOLVED;
IsSettled = true;
Monitor.PulseAll(this);
DynamicThreadPool.DefaultPool.Enqueue(()=> {
Resolved?.Invoke(this.value);
Settled?.Invoke(this);
});
} else
{
throw new PromiseNotPendingException();
}
}
}
public void Reject() => Reject(null);
public void Reject(Exception rejectingException)
{
lock (this)
{
if (State != PromiseState.PENDING)
throw new PromiseNotPendingException();
this.rejectingException = rejectingException;
IsSettled = true;
State = PromiseState.REJECTED;
Monitor.PulseAll(this);
DynamicThreadPool.DefaultPool.Enqueue(()=> {
Rejected?.Invoke(this.rejectingException);
Settled?.Invoke(this);
});
}
}
public T Value {
get {
lock (this)
{
while (State == PromiseState.PENDING)
Monitor.Wait(this);
if (State == PromiseState.REJECTED)
throw new PromiseRejectedException(rejectingException);
if (State == PromiseState.RESOLVED)
return value;
throw new Exception("serious bug in Promise implementation");
}
}
}
public Promise<S> Then<S>(Func<T,S> resolved) => Then(resolved, null);
public Promise<S> Then<S>(Func<T,S> resolved, Func<Exception,S> rejected)
{
lock (this)
{
Promise<S> chainedPromise = new Promise<S>();
Action resolveAction = ()=>{
lock (this)
{
switch (State)
{
case PromiseState.PENDING:
throw new Exception("serious bug in Promise.Then(..)");
case PromiseState.RESOLVED:
try{
chainedPromise.Resolve(resolved(value));
} catch (Exception e)
{
chainedPromise.Reject(e);
}
break;
case PromiseState.REJECTED:
if (rejected == null)
chainedPromise.Reject(rejectingException);
else
{
try{
chainedPromise.Resolve(rejected(rejectingException));
} catch (Exception e)
{
chainedPromise.Reject(e);
}
}
break;
}
}
};
if (IsSettled)
DynamicThreadPool.DefaultPool.Enqueue(resolveAction);
else
Settled += (p) => resolveAction();
return chainedPromise;
}
}
public Promise<T> Catch(Func<Exception,T> rejectedHandler)
{
lock (this)
{
Promise<T> chainedPromise = new Promise<T>();
Action resolveAction = ()=>{
lock (this)
{
switch (State)
{
case PromiseState.PENDING:
throw new Exception("serious bug in Promise.Catch(..)");
case PromiseState.RESOLVED:
chainedPromise.Resolve(value);
break;
case PromiseState.REJECTED:
try
{
chainedPromise.Resolve( rejectedHandler(rejectingException));
} catch (Exception e)
{
chainedPromise.Reject(e);
}
break;
}
}
};
if (IsSettled)
DynamicThreadPool.DefaultPool.Enqueue(resolveAction);
else
Settled += (p) => resolveAction();
return chainedPromise;
}
}
public Promise<T> Finally(Action finallyHandler)
{
Promise<T> chainedPromise = new Promise<T>();
lock (this)
{
Action resolveAction = ()=>{
lock (this)
{
if (State == PromiseState.PENDING)
throw new Exception("serious bug in Promise.Finally(..)");
try{
finallyHandler();
switch (State)
{
case PromiseState.RESOLVED:
chainedPromise.Resolve(value);
break;
case PromiseState.REJECTED:
chainedPromise.Reject(rejectingException);
break;
}
} catch (Exception e)
{
chainedPromise.Reject(e);
}
}
};
if (IsSettled)
DynamicThreadPool.DefaultPool.Enqueue(resolveAction);
else
Settled += (p) => resolveAction();
return chainedPromise;
}
}
}
}

View File

@ -0,0 +1,129 @@
using System;
using System.Threading;
namespace ln.threading
{
public delegate void ElapsedDelegate(PoolTimer timer);
public class PoolTimer : IDisposable
{
public event ElapsedDelegate Elapsed;
TimeSpan interval;
public TimeSpan Interval {
get => interval;
set {
if (value <= TimeSpan.Zero)
throw new ArgumentOutOfRangeException(nameof(value));
interval = value;
lock (this){
Monitor.Pulse(this);
}
}
}
DateTime nextStart;
public DateTime NextStart => nextStart;
bool canceled = true;
public bool IsActive {
get => !canceled;
private set {
lock (this)
{
if (value && canceled)
Start();
else if (!value && !canceled)
Stop();
}
}
}
public PoolTimer(TimeSpan interval)
{
Interval = interval;
}
public PoolTimer(double interval)
{
Interval = TimeSpan.FromSeconds(interval);
}
public PoolTimer(int interval)
{
Interval = TimeSpan.FromMilliseconds(interval);
}
public PoolTimer(TimeSpan interval, Action action) :this(action)
{
Interval = interval;
}
public PoolTimer(double interval, Action action) :this(action)
{
Interval = TimeSpan.FromSeconds(interval);
}
public PoolTimer(int interval, Action action) :this(action)
{
Interval = TimeSpan.FromMilliseconds(interval);
}
PoolTimer(Action action)
{
Elapsed += (t)=>action();
}
public void Start()
{
lock (this)
{
if (canceled){
canceled = false;
DynamicThreadPool.DefaultPool.Enqueue(TimerThread);
}
}
}
public void Stop()
{
lock (this)
{
if (!canceled)
{
canceled = true;
Monitor.Pulse(this);
}
}
}
void TimerThread()
{
lock (this)
{
nextStart = DateTime.Now + Interval;
DateTime now;
while (!canceled)
{
now = DateTime.Now;
if ((nextStart < now) || (!Monitor.Wait(this, nextStart - now) && !canceled))
{
DynamicThreadPool.DefaultPool.Enqueue(()=>Elapsed?.Invoke(this));
nextStart += Interval;
} else {
nextStart = DateTime.Now + Interval;
}
}
}
}
public void Dispose()
{
Stop();
}
}
}

View File

@ -2,7 +2,7 @@
<PropertyGroup>
<TargetFramework>netcoreapp3.1</TargetFramework>
<Version>0.1.0</Version>
<Version>0.2.0</Version>
<Authors>Harald Wolff-Thobaben</Authors>
<Company>l--n.de</Company>
<Description />