ln.skyscanner/services/EntityService.cs

143 lines
4.1 KiB
C#

using System;
using ln.application;
using ln.application.service;
using System.Threading;
using System.IO;
using ln.types.odb.ng;
using ln.skyscanner.entities;
using ln.logging;
using System.Collections.Generic;
using ln.types.odb.ng.storage;
using System.Linq;
using ln.types.net;
using ln.skyscanner.import.skytron;
namespace ln.skyscanner.services
{
/// <summary>
/// Application service that opens the entity storage below the configured
/// base path and publishes an <see cref="RPC"/> facade under the name
/// "entities" on the core service's RPC container for as long as
/// <see cref="ServiceMain"/> is running.
/// </summary>
public class EntityService : ApplicationServiceBase
{
    /// <summary>
    /// The "entities" directory below the "base-path" application argument.
    /// Set at the start of <see cref="ServiceMain"/> and reset to null when it exits.
    /// </summary>
    public string BasePath { get; private set; }

    /// <summary>The SkyScanner core service dependency; null while <see cref="ServiceMain"/> is not running.</summary>
    public SkyScanner.Service CoreService { get; private set; }

    /// <summary>RPC facade created once in the constructor and registered as "entities" while the service runs.</summary>
    public RPC RPCInstance { get; private set; }

    /// <summary>
    /// File system storage container rooted at "storage" below <see cref="BasePath"/>.
    /// After <see cref="ServiceMain"/> exits this still references the (disposed) container of the last run.
    /// </summary>
    public IStorageContainer StorageContainer { get; private set; }

    /// <summary>
    /// Session created on top of <see cref="StorageContainer"/>.
    /// After <see cref="ServiceMain"/> exits this still references the (disposed) session of the last run.
    /// </summary>
    public IStorageContainer StorageSession { get; private set; }

    /// <summary>Mapper working on <see cref="StorageSession"/>; null while <see cref="ServiceMain"/> is not running.</summary>
    public Mapper SessionMapper { get; private set; }

    // Pending upserts. Also serves as the monitor object the service loop
    // waits on, which is why the reference must never change.
    readonly Queue<object> upsertQueue = new Queue<object>();

    /// <summary>
    /// Creates the service under the name "Entity Service", calls
    /// DependOnService for <c>SkyScanner.Service</c> and creates the RPC facade.
    /// </summary>
    public EntityService()
        : base("Entity Service")
    {
        DependOnService<SkyScanner.Service>();
        RPCInstance = new RPC(this);
    }

    /// <summary>
    /// Service body: opens the storage, registers the RPC facade, calls
    /// <c>Ready()</c> and then idles until <c>StopRequested</c> becomes true.
    /// </summary>
    /// <remarks>
    /// Teardown runs in a <c>finally</c> block, so the RPC registration is
    /// removed and the storage objects are disposed even when start-up or the
    /// loop fails with an exception; the exception itself still propagates to
    /// the caller.
    /// </remarks>
    /// <param name="applicationInterface">
    /// Not used by this method; the arguments are read from
    /// <c>CurrentApplicationInterface</c> instead.
    /// </param>
    public override void ServiceMain(IApplicationInterface applicationInterface)
    {
        // Locals track what this run actually created/registered so the
        // finally block never touches objects left over from a previous run.
        IStorageContainer container = null;
        IStorageContainer session = null;
        bool rpcRegistered = false;

        try
        {
            CoreService = Dependency<SkyScanner.Service>();

            BasePath = Path.Combine(
                CurrentApplicationInterface.Arguments["base-path"].Value,
                "entities"
            );

            Logging.Log(LogLevel.INFO, "Entity Service: Initializing");

            container = new FSStorageContainer(System.IO.Path.Combine(BasePath, "storage"));
            StorageContainer = container;

            session = new Session(container);
            StorageSession = session;

            SessionMapper = new Mapper(session);

            container.Open();
            session.Open();

            CoreService.RPCContainer.Add("entities", RPCInstance);
            rpcRegistered = true;

            Ready();

            while (!StopRequested)
            {
                // Disabled: draining of the upsert queue.
                //object o;
                //while (
                //    (o = DequeueUpsert()) != null
                //    )
                //{
                //    ODB.GetCollection(o.GetType()).Upsert(o);
                //}

                // Nothing in this class pulses the queue at the moment, so
                // this is effectively a 2.5 s poll interval for StopRequested.
                lock (upsertQueue)
                {
                    if (upsertQueue.Count == 0)
                        Monitor.Wait(upsertQueue, 2500);
                }
            }
        }
        finally
        {
            // Same order as the original shutdown sequence: unregister first
            // so no new RPC calls arrive, then drop references, then dispose.
            if (rpcRegistered)
                CoreService.RPCContainer.Remove(RPCInstance);

            CoreService = null;
            SessionMapper = null;

            // Either may still be null when start-up failed early.
            session?.Dispose();
            container?.Dispose();

            BasePath = null;
        }
    }

    /// <summary>
    /// Currently a no-op: the object is NOT queued and is silently discarded.
    /// The former implementation is kept below for reference.
    /// </summary>
    /// <typeparam name="T">Type of the object to upsert.</typeparam>
    /// <param name="o">Object that would be queued for upserting.</param>
    public void EnqueueUpsert<T>(T o) where T : class
    {
        // Disabled together with the consumer loop in ServiceMain; enabling
        // only this side would let the queue grow without ever being drained.
        //ObjectCollection<T> col = ODBMapper.GetCollection<T>();
        //lock (upsertQueue)
        //{
        //    upsertQueue.Enqueue(o);
        //    Monitor.Pulse(upsertQueue);
        //}
    }

    /// <summary>
    /// Removes and returns the oldest queued object without blocking.
    /// </summary>
    /// <returns>The dequeued object, or null when the queue is empty.</returns>
    public object DequeueUpsert()
    {
        lock (upsertQueue)
        {
            if (upsertQueue.Count == 0)
                return null;

            return upsertQueue.Dequeue();
        }
    }

    /// <summary>
    /// Facade whose public methods are exposed through the core service's
    /// RPC container. All methods go through the owning service's
    /// <see cref="EntityService.SessionMapper"/>, which is null while the
    /// service is not running.
    /// </summary>
    public class RPC
    {
        // Owning service; supplies the mapper and the application arguments.
        EntityService EntityService { get; }

        /// <summary>Creates a facade bound to <paramref name="entityService"/>.</summary>
        /// <param name="entityService">Service whose session mapper is used by every call.</param>
        public RPC(EntityService entityService)
        {
            EntityService = entityService;
        }

        /// <summary>Loads the node whose "ID" field equals <paramref name="nodeID"/>.</summary>
        /// <param name="nodeID">Identifier to look up.</param>
        /// <returns>The first matching node, or the default value (null for a reference type) when none matches.</returns>
        public Node GetNode(Guid nodeID)
        {
            return EntityService.SessionMapper.Load<Node>(Query.Equals<Node>("ID", nodeID)).FirstOrDefault();
        }

        /// <summary>Loads all nodes the mapper returns for an unfiltered query.</summary>
        /// <returns>The loaded nodes as an array.</returns>
        public Node[] GetNodes()
        {
            return EntityService.SessionMapper.Load<Node>().ToArray();
        }

        /// <summary>Creates a node constructed with <c>IPv4.ANY</c>, saves it through the mapper and returns it.</summary>
        /// <returns>The newly created and saved node.</returns>
        public Node CreateNode()
        {
            Node node = new Node(IPv4.ANY);
            EntityService.SessionMapper.Save<Node>(node);
            return node;
        }

        /// <summary>
        /// Starts a Skytron import on a thread pool thread and returns
        /// immediately. The import source is taken from the "import-skytron"
        /// application argument.
        /// </summary>
        public void SyncSkytron()
        {
            SkytronImport skytronImport = new SkytronImport(EntityService.CurrentApplicationInterface.Arguments["import-skytron"].Value);

            // NOTE(review): SessionMapper is read when the work item runs, not
            // here, and nothing catches exceptions thrown by Import on the
            // pool thread - confirm that this is acceptable for the host.
            ThreadPool.QueueUserWorkItem((state) => skytronImport.Import(EntityService.SessionMapper));
        }
    }
}
}