SchedulingPool

master
Harald Wolff 2020-01-07 08:51:28 +01:00
parent 86399306a9
commit a17b1e21c7
3 changed files with 196 additions and 0 deletions

View File

@ -132,6 +132,8 @@
<Compile Include="Promise.cs" />
<Compile Include="test\PromiseTests.cs" />
<Compile Include="threads\TaskQueue.cs" />
<Compile Include="threads\SchedulingPool.cs" />
<Compile Include="test\SchedulingPoolTests.cs" />
</ItemGroup>
<ItemGroup>
<Folder Include="odb\" />

View File

@ -0,0 +1,40 @@
// /**
// * File: SchedulingPoolTests.cs
// * Author: haraldwolff
// *
// * This file and it's content is copyrighted by the Author and / or copyright holder.
// * Any use wihtout proper permission is illegal and may lead to legal actions.
// *
// *
// **/
using NUnit.Framework;
using System;
using ln.types.threads;
using System.Threading;
namespace ln.types.test
{
[TestFixture()]
public class SchedulingPoolTests
{
[Test()]
public void TestCase()
{
int[] c = new int[5];
SchedulingPool schedulingPool = new SchedulingPool();
schedulingPool.Schedule(() => c[0]++, 2000);
schedulingPool.Schedule(() => c[1]++, 3000);
schedulingPool.Schedule(() => c[2]++, 4000);
schedulingPool.Schedule(() => c[3]++, 5000);
schedulingPool.Schedule(() => c[4]++, 6000);
Thread.Sleep(21000);
Assert.AreEqual(10, c[0]);
Assert.AreEqual(7, c[1]);
Assert.AreEqual(5, c[2]);
Assert.AreEqual(4, c[3]);
Assert.AreEqual(3, c[4]);
}
}
}

View File

@ -0,0 +1,154 @@
// /**
// * File: SchedulingPool.cs
// * Author: haraldwolff
// *
// * This file and it's content is copyrighted by the Author and / or copyright holder.
// * Any use wihtout proper permission is illegal and may lead to legal actions.
// *
// *
// **/
using System;
using System.Collections.Generic;
using System.Threading;
using ln.types.btree;
using ln.logging;
namespace ln.types.threads
{
public class SchedulingPool
{
static SchedulingPool defaultPool = null;
static public SchedulingPool Default
{
get
{
if (defaultPool == null)
defaultPool = new SchedulingPool();
return defaultPool;
}
set => defaultPool = value;
}
public Pool ThreadPool { get; private set; }
public int ThreadCount => ThreadPool.PoolSize;
BTreeValueList<double, ScheduledJob> schedule = new BTreeValueList<double, ScheduledJob>();
Thread schedulerThread;
Dictionary<Action, ScheduledJob> scheduledJobs = new Dictionary<Action, ScheduledJob>();
public SchedulingPool()
:this(Environment.ProcessorCount)
{
}
public SchedulingPool(int nThreads)
{
ThreadPool = new Pool(nThreads);
ThreadPool.Start();
schedulerThread = new Thread(scheduler);
schedulerThread.Start();
}
void scheduler()
{
while (true)
{
lock (this)
{
double now = DateTime.Now.ToUnixTimeMilliseconds();
if (schedule.Empty)
{
Monitor.Wait(this);
}
else if (schedule.First <= now)
{
while (!schedule.Empty && (schedule.First <= now))
{
ScheduledJob scheduledJob = schedule.Shift();
ThreadPool.Enqueue(scheduledJob);
scheduledJob.Reschedule();
enqueue(scheduledJob);
}
}
else
{
Monitor.Wait(this, (int)(schedule.First - now));
}
}
}
}
void enqueue(ScheduledJob job)
{
lock (this)
{
if (schedule.TryAdd(job.NextScheduledExecution, job))
Monitor.Pulse(this);
}
}
void dequeue(ScheduledJob job)
{
lock (this)
{
if (schedule.TryRemove(job.NextScheduledExecution, job))
Monitor.Pulse(this);
}
}
public void Schedule(Action action, double scheduledIntervall)
{
if (!scheduledJobs.TryGetValue(action, out ScheduledJob scheduledJob))
{
scheduledJob = new ScheduledJob(action, scheduledIntervall);
scheduledJobs.Add(action, scheduledJob);
}
else
{
scheduledJob.ScheduledInterval = scheduledIntervall;
}
enqueue(scheduledJob);
}
public void Unschedule(Action action)
{
if (scheduledJobs.TryGetValue(action, out ScheduledJob scheduledJob))
{
dequeue(scheduledJob);
}
}
public class ScheduledJob : PoolJob
{
public SchedulingPool Pool { get; private set; }
public Action Action { get; }
public double NextScheduledExecution { get; set; }
public double ScheduledInterval { get; set; }
public ScheduledJob(Action action)
: this(action, 0, DateTime.Now.ToUnixTimeMilliseconds())
{ }
public ScheduledJob(Action action, double scheduledInterval)
: this(action, scheduledInterval, DateTime.Now.ToUnixTimeMilliseconds() + scheduledInterval)
{
}
public ScheduledJob(Action action, double scheduledInterval, double nextScheduledExecution)
{
Action = action;
ScheduledInterval = scheduledInterval;
NextScheduledExecution = nextScheduledExecution;
}
public override void RunJob() => Action.Invoke();
public void Reschedule() => Reschedule(ScheduledInterval);
public void Reschedule(double interval)
{
double currentTime = DateTime.Now.ToUnixTimeMilliseconds();
while (NextScheduledExecution < currentTime)
NextScheduledExecution += ScheduledInterval;
}
}
}
}