Promise implementation

master
Harald Wolff 2019-11-24 14:54:29 +01:00
parent ad9157a0ca
commit a2c3b74b3c
3 changed files with 349 additions and 0 deletions

234
Promise.cs 100644
View File

@ -0,0 +1,234 @@
// /**
// * File: Promise.cs
// * Author: haraldwolff
// *
// * This file and it's content is copyrighted by the Author and / or copyright holder.
// * Any use wihtout proper permission is illegal and may lead to legal actions.
// *
// *
// **/
using System;
using System.Threading;
using System.Collections;
using System.Collections.Generic;
using System.Linq;
namespace ln.types
{
public delegate void PromiseEvaluator<T>(Action<T> resolve, Action<object> reject);
public delegate void PromiseEvaluator<I,O>(I value, Action<O> resolve, Action<object> reject);
public delegate void PromiseResolved<T>(T value);
public delegate void PromiseRejected(object error);
public enum PromiseState { PENDING, RESOLVED, REJECTED }
public class Promise<T>
{
public event PromiseResolved<T> OnResolve;
public event PromiseRejected OnReject;
public PromiseState State { get; private set; } = PromiseState.PENDING;
PromiseEvaluator<T> Evaluator;
T resolvedValue;
object reason;
public T Value
{
get
{
if (State != PromiseState.RESOLVED)
throw new InvalidOperationException("promise not (yet) resolved");
return resolvedValue;
}
}
public object Reason
{
get
{
if (State != PromiseState.REJECTED)
throw new InvalidOperationException("promise not (yet) rejected");
return reason;
}
}
private Promise() { }
public Promise(PromiseEvaluator<T> evaluator) : this(evaluator, true) { }
private Promise(PromiseEvaluator<T> evaluator, bool queueJob)
{
Evaluator = evaluator;
if (queueJob)
ThreadPool.QueueUserWorkItem((state) => evaluator(resolve,reject ));
}
public Promise(Promise<T> parent, Action fin)
{
OnResolve += (value) => fin();
OnReject += (error) => fin();
parent.OnResolve += (value) => this.Resolve(value);
parent.OnReject += (error) => this.Reject(error);
}
protected Promise(PromiseResolved<T> onResolve, PromiseRejected onReject)
{
OnResolve += onResolve;
OnReject += onReject;
}
protected Promise(PromiseRejected onReject)
{
OnReject += onReject;
}
public Promise<T> Then(PromiseResolved<T> onResolve) => Then(onResolve, (e) => { });
public Promise<T> Then(PromiseResolved<T> onResolve, PromiseRejected onRejected) => new ChainedPromise<T>(this, (v, res, rej) => { res(v); onResolve(v); }, onRejected);
public Promise<T> Finally(Action action) => new Promise<T>(this, action);
public Promise<S> Then<S>(PromiseEvaluator<T, S> evaluator) => new ChainedPromise<S>(this, evaluator, (e)=>{} );
public Promise<S> Then<S>(PromiseEvaluator<T, S> evaluator, PromiseRejected rejected) => new ChainedPromise<S>(this, evaluator, rejected);
private void resolve(T value)
{
bool resolved;
lock (this)
{
resolved = (State == PromiseState.PENDING);
State = PromiseState.RESOLVED;
resolvedValue = value;
}
if (resolved)
OnResolve?.Invoke(value);
}
private void reject(object reason)
{
bool rejected;
lock (this)
{
rejected = (State == PromiseState.PENDING);
State = PromiseState.REJECTED;
this.reason = reason;
}
if (rejected)
OnReject?.Invoke(reason);
}
public Promise<T> Resolve(T value)
{
lock (this)
{
if (State != PromiseState.PENDING)
throw new InvalidOperationException("Promise already settled");
resolve(value);
}
return this;
}
public Promise<T> Reject(object error)
{
lock (this)
{
if (State != PromiseState.PENDING)
throw new InvalidOperationException("Promise already settled");
reject(error);
}
return this;
}
public static Promise<T[]> All(IEnumerable<Promise<T>> promises)
{
Promise<T>[] sources = promises.ToArray();
T[] results = new T[sources.Length];
int cresolved = 0;
Promise<T[]> promise = new Promise<T[]>();
for (int n = 0; n < sources.Length; n++)
{
Promise<T> p = sources[n];
int nn = n;
lock (p)
{
switch (p.State)
{
case PromiseState.REJECTED:
return promise.Reject(p.Reason);
case PromiseState.RESOLVED:
cresolved++;
results[nn] = p.Value;
break;
case PromiseState.PENDING:
p.OnResolve += (value) => {
cresolved++;
results[nn] = p.Value;
if (cresolved == results.Length)
promise.resolve(results);
};
p.OnReject += (error) => {
promise.Reject(error);
};
break;
}
}
}
if (cresolved == results.Length)
promise.resolve(results);
return promise;
}
public static Promise<T> Race(IEnumerable<Promise<T>> promises)
{
Promise<T>[] sources = promises.ToArray();
Promise<T> promise = new Promise<T>();
lock (promise)
{
for (int n = 0; n < sources.Length; n++)
{
Promise<T> p = sources[n];
int nn = n;
lock (p)
{
switch (p.State)
{
case PromiseState.REJECTED:
return promise.Reject(p.Reason);
case PromiseState.RESOLVED:
return promise.Resolve(p.Value);
case PromiseState.PENDING:
p.OnResolve += (value) => promise.resolve(value);
p.OnReject += (error) => promise.reject(error);
break;
}
}
}
}
return promise;
}
class ChainedPromise<O> : Promise<O>
{
public ChainedPromise(Promise<T> parent, PromiseEvaluator<T,O> evalOnResolve, PromiseRejected onReject)
:base(onReject)
{
parent.OnResolve += (value) => {
evalOnResolve(
value,
(obj) => this.Resolve(obj),
(error) => this.Reject(error)
);
};
parent.OnReject += (error) => this.Reject(error);
}
}
public static implicit operator Promise<T>(T value) => new Promise<T>().Resolve(value);
}
public class Promise : Promise<Object>
{
public Promise(PromiseEvaluator<object> evaluator)
: base(evaluator)
{
}
}
}

View File

@ -129,6 +129,8 @@
<Compile Include="odb\ng\IdentityCache.cs" />
<Compile Include="net\Endpoint.cs" />
<Compile Include="ByteArrayExtensions.cs" />
<Compile Include="Promise.cs" />
<Compile Include="test\PromiseTests.cs" />
</ItemGroup>
<ItemGroup>
<Folder Include="odb\" />

View File

@ -0,0 +1,113 @@
// /**
// * File: PromiseTests.cs
// * Author: haraldwolff
// *
// * This file and it's content is copyrighted by the Author and / or copyright holder.
// * Any use wihtout proper permission is illegal and may lead to legal actions.
// *
// *
// **/
using NUnit.Framework;
using System;
using System.Threading;
namespace ln.types.test
{
[TestFixture()]
public class PromiseTests
{
[Test()]
public void TestCase()
{
object l = new object();
lock (l)
{
new Promise<int>((resolve, reject) =>
{
resolve(2);
})
.Then<double>(
(value, resolve, reject) => {
resolve(value);
})
.Then((value) =>
{
Console.WriteLine("Double Value: {0}",value);
})
.Finally(() => {
lock (l)
{
Monitor.PulseAll(l);
}
});
if (!Monitor.Wait(l, 500))
throw new TimeoutException();
}
}
[Test()]
public void PromiseAll()
{
Promise<int>[] promises = new Promise<int>[5];
for (int n = 0; n < promises.Length; n++)
{
promises[n] = new Promise<int>((resolve, reject) =>
{
ThreadPool.QueueUserWorkItem((state) =>
{
Thread.Sleep(100 * n);
resolve(n);
});
});
}
lock (promises)
{
Promise<int>
.All(promises)
.Then((value) =>
{
lock(promises)
Monitor.PulseAll(promises);
});
if (!Monitor.Wait(promises, 1500))
throw new TimeoutException();
}
}
[Test()]
public void PromiseRace()
{
Promise<int>[] promises = new Promise<int>[5];
for (int n = 0; n < promises.Length; n++)
{
promises[n] = new Promise<int>((resolve, reject) =>
{
ThreadPool.QueueUserWorkItem((state) =>
{
Thread.Sleep(100 * n);
resolve(n);
});
});
}
lock (promises)
{
Promise<int>
.Race(promises)
.Then((value) =>
{
lock (promises)
Monitor.PulseAll(promises);
});
if (!Monitor.Wait(promises, 1500))
throw new TimeoutException();
}
}
}
}