From a17b1e21c7519486fd9ff741c40c505d160f609c Mon Sep 17 00:00:00 2001 From: Harald Wolff Date: Tue, 7 Jan 2020 08:51:28 +0100 Subject: [PATCH] SchedulingPool --- ln.types.csproj | 2 + test/SchedulingPoolTests.cs | 40 ++++++++++ threads/SchedulingPool.cs | 154 ++++++++++++++++++++++++++++++++++++ 3 files changed, 196 insertions(+) create mode 100644 test/SchedulingPoolTests.cs create mode 100644 threads/SchedulingPool.cs diff --git a/ln.types.csproj b/ln.types.csproj index cae801c..b0af7cb 100644 --- a/ln.types.csproj +++ b/ln.types.csproj @@ -132,6 +132,8 @@ + + diff --git a/test/SchedulingPoolTests.cs b/test/SchedulingPoolTests.cs new file mode 100644 index 0000000..43513b5 --- /dev/null +++ b/test/SchedulingPoolTests.cs @@ -0,0 +1,40 @@ +// /** +// * File: SchedulingPoolTests.cs +// * Author: haraldwolff +// * +// * This file and it's content is copyrighted by the Author and / or copyright holder. +// * Any use wihtout proper permission is illegal and may lead to legal actions. +// * +// * +// **/ +using NUnit.Framework; +using System; +using ln.types.threads; +using System.Threading; +namespace ln.types.test +{ + [TestFixture()] + public class SchedulingPoolTests + { + [Test()] + public void TestCase() + { + int[] c = new int[5]; + SchedulingPool schedulingPool = new SchedulingPool(); + + schedulingPool.Schedule(() => c[0]++, 2000); + schedulingPool.Schedule(() => c[1]++, 3000); + schedulingPool.Schedule(() => c[2]++, 4000); + schedulingPool.Schedule(() => c[3]++, 5000); + schedulingPool.Schedule(() => c[4]++, 6000); + + Thread.Sleep(21000); + + Assert.AreEqual(10, c[0]); + Assert.AreEqual(7, c[1]); + Assert.AreEqual(5, c[2]); + Assert.AreEqual(4, c[3]); + Assert.AreEqual(3, c[4]); + } + } +} diff --git a/threads/SchedulingPool.cs b/threads/SchedulingPool.cs new file mode 100644 index 0000000..0dcdc2d --- /dev/null +++ b/threads/SchedulingPool.cs @@ -0,0 +1,154 @@ +// /** +// * File: SchedulingPool.cs +// * Author: haraldwolff +// * +// * This file and it's content is copyrighted by the Author and / or copyright holder. +// * Any use wihtout proper permission is illegal and may lead to legal actions. +// * +// * +// **/ +using System; +using System.Collections.Generic; +using System.Threading; +using ln.types.btree; +using ln.logging; +namespace ln.types.threads +{ + + public class SchedulingPool + { + static SchedulingPool defaultPool = null; + static public SchedulingPool Default + { + get + { + if (defaultPool == null) + defaultPool = new SchedulingPool(); + + return defaultPool; + } + set => defaultPool = value; + } + + public Pool ThreadPool { get; private set; } + public int ThreadCount => ThreadPool.PoolSize; + + BTreeValueList schedule = new BTreeValueList(); + Thread schedulerThread; + + Dictionary scheduledJobs = new Dictionary(); + + public SchedulingPool() + :this(Environment.ProcessorCount) + { + } + public SchedulingPool(int nThreads) + { + ThreadPool = new Pool(nThreads); + ThreadPool.Start(); + + schedulerThread = new Thread(scheduler); + schedulerThread.Start(); + } + + void scheduler() + { + while (true) + { + lock (this) + { + double now = DateTime.Now.ToUnixTimeMilliseconds(); + if (schedule.Empty) + { + Monitor.Wait(this); + } + else if (schedule.First <= now) + { + while (!schedule.Empty && (schedule.First <= now)) + { + ScheduledJob scheduledJob = schedule.Shift(); + ThreadPool.Enqueue(scheduledJob); + scheduledJob.Reschedule(); + enqueue(scheduledJob); + } + } + else + { + Monitor.Wait(this, (int)(schedule.First - now)); + } + } + } + } + + void enqueue(ScheduledJob job) + { + lock (this) + { + if (schedule.TryAdd(job.NextScheduledExecution, job)) + Monitor.Pulse(this); + } + } + void dequeue(ScheduledJob job) + { + lock (this) + { + if (schedule.TryRemove(job.NextScheduledExecution, job)) + Monitor.Pulse(this); + } + } + + public void Schedule(Action action, double scheduledIntervall) + { + if (!scheduledJobs.TryGetValue(action, out ScheduledJob scheduledJob)) + { + scheduledJob = new ScheduledJob(action, scheduledIntervall); + scheduledJobs.Add(action, scheduledJob); + } + else + { + scheduledJob.ScheduledInterval = scheduledIntervall; + } + enqueue(scheduledJob); + } + public void Unschedule(Action action) + { + if (scheduledJobs.TryGetValue(action, out ScheduledJob scheduledJob)) + { + dequeue(scheduledJob); + } + } + + public class ScheduledJob : PoolJob + { + public SchedulingPool Pool { get; private set; } + public Action Action { get; } + + public double NextScheduledExecution { get; set; } + public double ScheduledInterval { get; set; } + + public ScheduledJob(Action action) + : this(action, 0, DateTime.Now.ToUnixTimeMilliseconds()) + { } + public ScheduledJob(Action action, double scheduledInterval) + : this(action, scheduledInterval, DateTime.Now.ToUnixTimeMilliseconds() + scheduledInterval) + { + } + public ScheduledJob(Action action, double scheduledInterval, double nextScheduledExecution) + { + Action = action; + ScheduledInterval = scheduledInterval; + NextScheduledExecution = nextScheduledExecution; + } + + public override void RunJob() => Action.Invoke(); + + public void Reschedule() => Reschedule(ScheduledInterval); + public void Reschedule(double interval) + { + double currentTime = DateTime.Now.ToUnixTimeMilliseconds(); + while (NextScheduledExecution < currentTime) + NextScheduledExecution += ScheduledInterval; + } + } + } +}