Initial Commit

master
Harald Wolff-Thobaben 2020-11-18 00:23:20 +01:00
commit 818f2a2a7a
13 changed files with 1067 additions and 0 deletions

41
.gitignore vendored 100644
View File

@ -0,0 +1,41 @@
# Autosave files
*~
# build
[Oo]bj/
[Bb]in/
packages/
TestResults/
# globs
Makefile.in
*.DS_Store
*.sln.cache
*.suo
*.cache
*.pidb
*.userprefs
*.usertasks
config.log
config.make
config.status
aclocal.m4
install-sh
autom4te.cache/
*.user
*.tar.gz
tarballs/
test-results/
Thumbs.db
.vs/
# Mac bundle stuff
*.dmg
*.app
# resharper
*_Resharper.*
*.Resharper
# dotCover
*.dotCover

124
CCALock.cs 100644
View File

@ -0,0 +1,124 @@
using System;
using System.Collections.Generic;
using System.Security.Cryptography.X509Certificates;
using System.Text;
using System.Threading;
namespace ln.threading
{
public enum CCAState {
READ, // Read access may be acquired
WAIT, // Write access is to be acquired after current reads are released
WRITE // Write access is acquired
}
public class CCALock
{
public CCAState State { get; private set; }
public IEnumerable<Lock> Waiting => waiting;
public IEnumerable<Lock> Locks => locks;
HashSet<Lock> locks = new HashSet<Lock>();
Lock writeWait;
HashSet<Lock> waiting = new HashSet<Lock>();
public CCALock()
{}
public Lock AcquireRead() => AcquireLock(false, -1);
public Lock AcquireWrite() => AcquireLock(true, -1);
public Lock AcquireRead(int timeout) => AcquireLock(false, timeout);
public Lock AcquireWrite(int timeout) => AcquireLock(true, timeout);
public Lock AcquireLock(bool writeaccess,int timeout)
{
Lock lck = new Lock(this);
lock (this)
{
WaitForRead(lck, timeout);
if (writeaccess)
{
writeWait = lck;
if (locks.Count > 0)
{
State = CCAState.WAIT;
lock (writeWait)
{
Monitor.Exit(this);
if (!Monitor.Wait(writeWait, timeout))
{
Monitor.Enter(this);
State = CCAState.READ;
throw new TimeoutException();
}
Monitor.Enter(this);
}
}
State = CCAState.WRITE;
}
locks.Add(lck);
lck.Acquired = true;
}
return lck;
}
private void WaitForRead(Lock lck,int timeout)
{
while (State != CCAState.READ)
{
waiting.Add(lck);
if (!Monitor.Wait(this, timeout))
throw new TimeoutException();
waiting.Remove(lck);
}
}
public void ReleaseLock(Lock lck)
{
lock (this)
{
locks.Remove(lck);
if ((State == CCAState.WAIT) && (locks.Count == 0))
{
lock (writeWait)
{
Monitor.Pulse(writeWait);
}
}
else if (State == CCAState.WRITE)
{
if (writeWait != lck)
throw new LockingException();
writeWait = null;
State = CCAState.READ;
Monitor.PulseAll(this);
}
}
}
public class Lock : IDisposable
{
public CCALock CCALock { get; }
public bool Acquired { get; internal set; }
public Lock(CCALock cca)
{
CCALock = cca;
}
public void Dispose()
{
if (Acquired)
CCALock.ReleaseLock(this);
}
}
}
}

20
DisposableLock.cs 100644
View File

@ -0,0 +1,20 @@
using System;
using System.Threading;
namespace ln.threading
{
public class DisposableLock : IDisposable
{
public object LockedObject { get; }
public DisposableLock(object lockedObject)
{
LockedObject = lockedObject;
Monitor.Enter(lockedObject);
}
public void Dispose()
{
Monitor.Exit(LockedObject);
}
}
}

61
DynamicPool.cs 100644
View File

@ -0,0 +1,61 @@
using System;
using System.Threading;
namespace ln.threading
{
public class DynamicPool : Pool
{
public int Timeout { get; set; }
public DynamicPool() :this(Environment.ProcessorCount){}
public DynamicPool(int maxPoolSize)
:base(maxPoolSize)
{
Timeout = 15000;
}
public override void Start()
{
if ((State == PoolState.RUN) || (State == PoolState.SHUTDOWN))
throw new NotSupportedException("Pool can only be started if not running");
State = PoolState.RUN;
}
protected override PoolJob WaitForJob(PoolThread poolThread)
{
if (State == PoolState.RUN)
{
lock (waitingThreads)
{
PoolJob poolJob = Dequeue(poolThread);
if (poolJob != null)
return poolJob;
waitingThreads.Add(poolThread);
Monitor.Wait(waitingThreads, Timeout);
waitingThreads.Remove(poolThread);
poolJob = Dequeue(poolThread);
return poolJob;
}
}
return null;
}
protected override void PulseWaitingThread()
{
lock (waitingThreads)
{
if (waitingThreads.Count > 0)
{
Monitor.Pulse(waitingThreads);
}
else if (CurrentPoolSize < PoolSize)
{
CreatePoolThread();
}
}
}
}
}

View File

@ -0,0 +1,10 @@
using System;
namespace ln.threading
{
public class LockingException : Exception
{
public LockingException()
{
}
}
}

278
Pool.cs 100644
View File

@ -0,0 +1,278 @@
using System;
using System.Threading;
using System.Collections.Generic;
using System.Linq;
using ln.logging;
namespace ln.threading
{
public enum PoolThreadState { READY, WORKING, EXITED }
public delegate void PoolJobFinished(Pool pool, PoolJob job);
public enum PoolState
{
INITIALIZE, // Pool Instanz erstellt
RUN, // Pool ist in Betrieb
SHUTDOWN, // Pool wird heruntergefahren (es warten noch laufende Jobs)
STOPPED // Pool wurde beendet, alle Resourcen freigegeben
}
public partial class Pool : IDisposable
{
public static bool DEBUG = false;
public PoolState State { get; protected set; } = PoolState.INITIALIZE;
public event PoolJobFinished PoolJobFinished;
public PoolThread[] PoolThreads => ThreadHelpers.GetLockedValue(poolThreads, () => poolThreads.ToArray());
public int PoolSize { get; }
public int CurrentPoolSize => ThreadHelpers.GetLockedValue(poolThreads, () => poolThreads.Count);
public PoolJob[] CurrentPoolJobs => ThreadHelpers.GetLockedValue(poolThreads, () => poolThreads.Select((t) => t.CurrentJob).Where((j) => j != null).ToArray());
public PoolJob[] QueuedJobs => ThreadHelpers.GetLockedValue(waitingThreads, () => queuedJobs.ToArray());
public int NumQueuedJobs => ThreadHelpers.GetLockedValue(waitingThreads, () => queuedJobs.Count);
private List<PoolThread> poolThreads = new List<PoolThread>();
protected HashSet<PoolThread> workingThreads = new HashSet<PoolThread>();
protected HashSet<PoolThread> waitingThreads = new HashSet<PoolThread>();
private Queue<PoolJob> queuedJobs = new Queue<PoolJob>();
public Pool() : this(Environment.ProcessorCount){}
public Pool(int numThreads)
{
PoolSize = numThreads;
}
public virtual void Start()
{
if ((State == PoolState.RUN) || (State == PoolState.SHUTDOWN))
throw new NotSupportedException("Pool can only be started if not running");
State = PoolState.RUN;
for (int n = 0; n < PoolSize; n++)
{
CreatePoolThread();
}
}
public virtual void Stop() => Stop(false);
public virtual void Stop(bool abort)
{
if (State != PoolState.RUN)
throw new NotSupportedException("Pool must be running to be able to stop");
State = PoolState.SHUTDOWN;
lock (waitingThreads)
{
queuedJobs.Clear();
}
if (abort)
Abort();
while (CurrentPoolSize > ((PoolThread.CurrentPoolThread.Value?.Pool == this) ? 1 : 0))
{
try
{
lock (waitingThreads)
Monitor.PulseAll(waitingThreads);
Thread.Sleep(100);
}
catch (ThreadInterruptedException)
{ }
}
State = PoolState.STOPPED;
}
public void Enqueue(JobDelegate job) => Enqueue((poolJob) => job());
public PoolJob Enqueue(ExtendedJobDelegate extendedJobDelegate) => Enqueue(extendedJobDelegate, "N/A");
public PoolJob Enqueue(ExtendedJobDelegate extendedJobDelegate, string jobName) => EnqueuePoolJob(new PoolJob(jobName, extendedJobDelegate));
public bool Enqueue(PoolJob poolJob) => EnqueuePoolJob(poolJob) != null;
protected virtual PoolJob EnqueuePoolJob(PoolJob poolJob)
{
if (State != PoolState.RUN)
throw new NotSupportedException("Pool not running");
lock (waitingThreads)
{
if (queuedJobs.Contains(poolJob) || CurrentPoolJobs.Contains(poolJob))
return null;
poolJob.Prepare();
queuedJobs.Enqueue(poolJob);
PulseWaitingThread();
}
return poolJob;
}
protected virtual void PulseWaitingThread()
{
lock (waitingThreads)
Monitor.Pulse(waitingThreads);
}
public virtual int Enqueue(IEnumerable<PoolJob> poolJobs)
{
if (State != PoolState.RUN)
throw new NotSupportedException("Pool not running");
int count = 0;
lock (waitingThreads)
{
HashSet<PoolJob> currentPoolJobs = new HashSet<PoolJob>(CurrentPoolJobs);
foreach (PoolJob j in queuedJobs)
currentPoolJobs.Add(j);
foreach (PoolJob poolJob in poolJobs)
{
if (!currentPoolJobs.Contains(poolJob))
{
poolJob.Prepare();
queuedJobs.Enqueue(poolJob);
count++;
}
}
Monitor.PulseAll(waitingThreads);
}
return count;
}
protected virtual PoolJob Dequeue() => Dequeue(PoolThread.CurrentPoolThread?.Value);
protected virtual PoolJob Dequeue(PoolThread poolThread)
{
if (poolThread == null)
throw new NullReferenceException();
if (State == PoolState.RUN)
{
lock (waitingThreads)
{
if (queuedJobs.Count > 0)
return queuedJobs.Dequeue();
}
}
return null;
}
protected virtual PoolJob WaitForJob(PoolThread poolThread)
{
while (State == PoolState.RUN)
{
lock (waitingThreads)
{
PoolJob poolJob = Dequeue(poolThread);
if (poolJob != null)
return poolJob;
waitingThreads.Add(poolThread);
Monitor.Wait(waitingThreads);
waitingThreads.Remove(poolThread);
}
}
return null;
}
protected virtual void WorkerThreadLoop()
{
PoolThread poolThread = PoolThread.CurrentPoolThread.Value;
if (poolThread == null)
throw new NotSupportedException("Pool.WorkerThreadLoop(): Must be called from PoolThread");
lock (poolThreads)
poolThreads.Add(poolThread);
try
{
while (State == PoolState.RUN)
{
PoolJob poolJob = WaitForJob(poolThread);
if (poolJob == null)
{
break;
}
else
{
lock (workingThreads)
workingThreads.Add(poolThread);
try
{
poolJob.Run(this);
if (PoolJobFinished != null)
PoolJobFinished(this,poolJob);
}
finally
{
lock (workingThreads)
workingThreads.Remove(poolThread);
}
}
}
}
catch (PoolThreadMustExitException)
{}
catch (Exception e)
{
Logging.Log(e);
}
finally
{
lock (poolThreads)
poolThreads.Remove(poolThread);
}
}
public void Abort()
{
lock (poolThreads)
{
foreach (PoolThread poolThread in poolThreads)
poolThread?.CurrentJob?.RequestAbort();
}
}
private bool AmIPoolThread() => PoolThread.CurrentPoolThread.Value != null;
public PoolThread FindPoolThread(PoolJob job)
{
lock (poolThreads)
{
foreach (PoolThread poolThread in poolThreads)
{
if (poolThread.CurrentJob == job)
return poolThread;
}
}
return null;
}
protected virtual PoolThread CreatePoolThread()
{
PoolThread poolThread = new PoolThread(WorkerThreadLoop);
return poolThread;
}
public void Dispose()
{
if (State == PoolState.RUN)
Stop();
}
}
}

172
PoolJob.cs 100644
View File

@ -0,0 +1,172 @@
using System;
using ln.logging;
using System.Threading;
namespace ln.threading
{
public delegate void JobDelegate();
public delegate void ExtendedJobDelegate(PoolJob poolJob);
public enum PoolJobState { PREPARED, RUNNING, DONE, FAILED };
public class PoolJob
{
public string Name { get; set; }
public JobDelegate Job { get; }
public ExtendedJobDelegate ExtendedJob { get; }
public double Progress { get; set; }
public string State { get; set; }
public PoolJobState JobState { get; private set; }
public bool JobAbortRequested { get; private set; }
private Pool Pool { get; set; }
protected PoolJob()
{
JobState = PoolJobState.PREPARED;
}
public PoolJob(JobDelegate job)
: this()
{
Job = job;
ExtendedJob = null;
Name = "N/A";
}
public PoolJob(ExtendedJobDelegate job)
: this()
{
Job = null;
ExtendedJob = job;
Name = "N/A";
}
public PoolJob(String name, ExtendedJobDelegate job)
: this()
{
Job = null;
ExtendedJob = job;
Name = name;
}
public void RequestAbort()
{
JobAbortRequested = true;
if (Pool != null)
{
PoolThread poolThread = Pool.FindPoolThread(this);
if (poolThread != null)
{
if (poolThread.Thread.ThreadState == ThreadState.WaitSleepJoin)
poolThread.Thread.Interrupt();
}
}
}
public void ForceFail()
{
JobState = PoolJobState.FAILED;
}
public void setState(string state,params object[] p)
{
State = String.Format(state, p);
}
public virtual void RunJob()
{
if (Job != null)
{
Job();
}
else if (ExtendedJob != null)
{
ExtendedJob(this);
}
else
{
Logging.Log(LogLevel.ERROR, "PoolJob without JobDelegate tried to run");
JobState = PoolJobState.FAILED;
}
}
public void Run(Pool pool)
{
Pool = pool;
try
{
JobState = PoolJobState.RUNNING;
RunJob();
JobState = PoolJobState.DONE;
if (Pool.DEBUG && (State != null) && !String.Empty.Equals(State))
Logging.Log(LogLevel.DEBUG, "PoolJob exited normally with State={0}", State);
}
catch (Exception e)
{
JobState = PoolJobState.FAILED;
Logging.Log(LogLevel.ERROR, "PoolJob: {0} caught exception {1}", this, e);
Logging.Log(e);
}
Pool = null;
}
public void SplitJob(PoolJob[] jobs)
{
if (Pool == null)
throw new ArgumentNullException();
foreach (PoolJob job in jobs)
{
if (!Pool.Enqueue(job))
{
Logging.Log(LogLevel.ERROR, "PoolJob.SplitJob(): Failed to enqueue splitted Job");
foreach (PoolJob _job in jobs)
{
_job.RequestAbort();
}
throw new Exception("Failed to enqueue splitted Job");
}
}
while (true)
{
int nFinished = 0, nError = 0;
double progress = 0.0;
foreach (PoolJob job in jobs)
{
progress += job.Progress;
switch (job.JobState)
{
case PoolJobState.DONE:
nFinished++;
break;
case PoolJobState.FAILED:
nError++;
break;
}
if (JobAbortRequested)
job.RequestAbort();
}
Progress = progress / jobs.Length;
if ((nError + nFinished) == jobs.Length)
break;
Thread.Sleep(1000);
}
}
public virtual void Prepare()
{
}
}
}

79
PoolThread.cs 100644
View File

@ -0,0 +1,79 @@
using System;
using System.Threading;
namespace ln.threading
{
public class PoolThreadMustExitException : Exception
{}
public class PoolThread
{
public static ThreadLocal<PoolThread> CurrentPoolThread { get; } = new ThreadLocal<PoolThread>();
public Pool Pool { get; private set; }
public Thread Thread { get; }
public PoolJob CurrentJob { get; private set; }
private Action ThreadLoop;
public PoolThread(Action threadLoop)
{
ThreadLoop = threadLoop;
Thread = new Thread(thread);
Thread.Start();
}
private void thread()
{
lock (this)
{
CurrentPoolThread.Value = this;
}
ThreadLoop();
lock (this)
{
CurrentPoolThread.Value = null;
}
}
//while (Pool.State != PoolState.SHUTDOWN)
//{
// PoolJob poolJob = null;
// try
// {
// poolJob = Pool.Dequeue();
// }
// catch (TimeoutException)
// {
// break;
// }
// if (poolJob != null)
// {
// State = PoolThreadState.WORKING;
// CurrentJob = poolJob;
// CurrentJob.Run(Pool);
// if (Pool.PoolJobFinished != null)
// Pool.PoolJobFinished.Invoke(Pool, CurrentJob);
// CurrentJob = null;
// State = PoolThreadState.READY;
// }
//}
//CurrentPoolThread.Value = null;
//State = PoolThreadState.EXITED;
//lock (Pool.poolThreads)
//{
// Pool.poolThreads.Remove(this);
// Monitor.Pulse(Pool.poolThreads);
//}
}
}

162
SchedulingPool.cs 100644
View File

@ -0,0 +1,162 @@
// /**
// * File: SchedulingPool.cs
// * Author: haraldwolff
// *
// * This file and it's content is copyrighted by the Author and / or copyright holder.
// * Any use wihtout proper permission is illegal and may lead to legal actions.
// *
// *
// **/
using System;
using System.Collections.Generic;
using System.Threading;
using ln.collections;
using ln.type;
namespace ln.threading
{
public class SchedulingPool
{
static SchedulingPool defaultPool = null;
static public SchedulingPool Default
{
get
{
if (defaultPool == null)
defaultPool = new SchedulingPool();
return defaultPool;
}
set => defaultPool = value;
}
public Pool ThreadPool { get; private set; }
public int ThreadCount => ThreadPool.PoolSize;
BTreeValueList<double, ScheduledJob> schedule = new BTreeValueList<double, ScheduledJob>();
Thread schedulerThread;
Dictionary<Action, ScheduledJob> scheduledJobs = new Dictionary<Action, ScheduledJob>();
public SchedulingPool()
:this(Environment.ProcessorCount)
{
}
public SchedulingPool(int nThreads)
{
ThreadPool = new Pool(nThreads);
ThreadPool.Start();
lock (this)
{
schedulerThread = new Thread(scheduler);
schedulerThread.Start();
Monitor.Wait(this);
}
}
void scheduler()
{
lock (this)
{
Monitor.PulseAll(this);
while (true)
{
double now = DateTime.Now.ToUnixTimeMilliseconds();
if (schedule.Empty)
{
Monitor.Wait(this);
}
else if (schedule.First <= now)
{
while (!schedule.Empty && (schedule.First <= now))
{
ScheduledJob scheduledJob = schedule.Shift();
ThreadPool.Enqueue(scheduledJob);
scheduledJob.Reschedule();
enqueue(scheduledJob);
}
}
else
{
Monitor.Wait(this, (int)(schedule.First - now));
}
}
}
}
void enqueue(ScheduledJob job)
{
lock (this)
{
if (schedule.TryAdd(job.NextScheduledExecution, job))
Monitor.Pulse(this);
}
}
void dequeue(ScheduledJob job)
{
lock (this)
{
if (schedule.TryRemove(job.NextScheduledExecution, job))
Monitor.Pulse(this);
}
}
public void Schedule(Action action, double scheduledIntervall)
{
if (!scheduledJobs.TryGetValue(action, out ScheduledJob scheduledJob))
{
scheduledJob = new ScheduledJob(action, scheduledIntervall);
scheduledJobs.Add(action, scheduledJob);
}
else
{
scheduledJob.ScheduledInterval = scheduledIntervall;
}
enqueue(scheduledJob);
}
public void Unschedule(Action action)
{
if (scheduledJobs.TryGetValue(action, out ScheduledJob scheduledJob))
{
dequeue(scheduledJob);
}
}
public class ScheduledJob : PoolJob
{
public SchedulingPool Pool { get; private set; }
public Action Action { get; }
public double NextScheduledExecution { get; set; }
public double ScheduledInterval { get; set; }
public ScheduledJob(Action action)
: this(action, 0, DateTime.Now.ToUnixTimeMilliseconds())
{ }
public ScheduledJob(Action action, double scheduledInterval)
: this(action, scheduledInterval, DateTime.Now.ToUnixTimeMilliseconds() + scheduledInterval)
{
}
public ScheduledJob(Action action, double scheduledInterval, double nextScheduledExecution)
{
Action = action;
ScheduledInterval = scheduledInterval;
NextScheduledExecution = nextScheduledExecution;
}
public override void RunJob() => Action.Invoke();
public void Reschedule() => Reschedule(ScheduledInterval);
public void Reschedule(double interval)
{
double currentTime = DateTime.Now.ToUnixTimeMilliseconds();
while (NextScheduledExecution < currentTime)
NextScheduledExecution += ScheduledInterval;
}
}
}
}

44
TaskQueue.cs 100644
View File

@ -0,0 +1,44 @@
// /**
// * File: TaskQueue.cs
// * Author: haraldwolff
// *
// * This file and it's content is copyrighted by the Author and / or copyright holder.
// * Any use wihtout proper permission is illegal and may lead to legal actions.
// *
// *
// **/
using System;
using ln.collections;
using System.Linq;
namespace ln.threading
{
public class TaskQueue
{
BTreeValueList<int, Action> queue = new BTreeValueList<int, Action>();
public TaskQueue()
{
}
public bool Empty => queue.Keys.Count() == 0;
public void Enqueue(Action action) => Enqueue(0, action);
public void Enqueue(int priority,Action action)
{
lock (queue)
{
queue.TryRemove(priority, action);
queue.Add(priority, action);
}
}
public Action Dequeue()
{
lock (queue)
{
return queue.Shift();
}
}
}
}

16
ThreadHelpers.cs 100644
View File

@ -0,0 +1,16 @@
using System;
namespace ln.threading
{
public static class ThreadHelpers
{
public static T GetLockedValue<T>(object lockObject,Func<T> getter)
{
lock (lockObject)
{
return getter();
}
}
}
}

47
Timing.cs 100644
View File

@ -0,0 +1,47 @@
// /**
// * File: Timing.cs
// * Author: haraldwolff
// *
// * This file and it's content is copyrighted by the Author and / or copyright holder.
// * Any use wihtout proper permission is illegal and may lead to legal actions.
// *
// *
// **/
using System;
using System.Diagnostics;
using ln.logging;
namespace ln.threading
{
public static class Timing
{
public delegate void VoidDelegate();
public delegate T TypedDelegate<T>();
public static void Meassure(VoidDelegate f) => Meassure("", f);
public static void Meassure(String prefix,VoidDelegate f)
{
Stopwatch stopwatch = new Stopwatch();
stopwatch.Start();
f();
stopwatch.Stop();
Logging.Log(LogLevel.DEBUG, "Timing({1}): {0}ms", stopwatch.ElapsedMilliseconds,prefix);
}
public static T Meassure<T>(TypedDelegate<T> f)
{
return Meassure("", f);
}
public static T Meassure<T>(String prefix,TypedDelegate<T> f)
{
Stopwatch stopwatch = new Stopwatch();
stopwatch.Start();
T r = f();
stopwatch.Stop();
Logging.Log(LogLevel.DEBUG, "Timing({1}): {0}ms", stopwatch.ElapsedMilliseconds,prefix);
return r;
}
}
}

View File

@ -0,0 +1,13 @@
<Project Sdk="Microsoft.NET.Sdk">
<PropertyGroup>
<TargetFramework>netcoreapp2.1</TargetFramework>
</PropertyGroup>
<ItemGroup>
<ProjectReference Include="..\ln.collections\ln.collections.csproj" />
<ProjectReference Include="..\ln.logging\ln.logging.csproj" />
<ProjectReference Include="..\ln.type\ln.type.csproj" />
</ItemGroup>
</Project>