// ln.types/odb/ODBCollection.DocumentIndex.cs

using System.Collections;
using System.Collections.Generic;
using ln.types.odb.values;
using System.IO;
using ln.logging;
using ln.types.btree;
using System.Linq;
using System;
using ln.types.odb.index;
namespace ln.types.odb
{
    public partial class ODBCollection
    {
        public partial class DocumentIndex : IEnumerable<ODBValue>
        {
            public ODBCollection Collection { get; }
            public Stream StorageStream { get; }
            public int Count => idLookup.Count;

            private Dictionary<ODBValue, Area> idLookup = new Dictionary<ODBValue, Area>();
            private AvailableAreas availableAreas;
            private FileLogger transactionLogger = null;
            private Dictionary<string, PropertyIndex> propertyIndeces = new Dictionary<string, PropertyIndex>();
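
            /// <summary>
            /// Creates the document index for <paramref name="collection"/> on top of the given storage
            /// stream and scans it immediately via Initialize(). The transaction log is enabled when the
            /// process was started with --odb-transaction-log.
            /// </summary>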
            public DocumentIndex(Stream stream, ODBCollection collection)
            {
                Collection = collection;
                StorageStream = stream;
                availableAreas = new AvailableAreas(this);

                if (Environment.GetCommandLineArgs().Contains("--odb-transaction-log"))
                    EnableTransactionLog();

                Initialize();
            }
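
            /// <summary>
            /// Scans the storage stream area by area: areas whose document ID is ODBNull are treated as
            /// free space, duplicate document IDs are resolved by keeping the area with the newer
            /// StorageTimeStamp, and all remaining areas are registered in idLookup.
            /// </summary>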
            private void Initialize()
            {
                FileLogger dump = this.Collection.ODB.DumpCollectionLoading
                                    ? new FileLogger(Path.Combine(Collection.ODB.BasePath, String.Format("{0}.dump", Collection.CollectionName)))
                                    : null;
                long offset = 0;

                while (offset < StorageStream.Length)
                {
                    StorageStream.Position = offset;

                    int asize = StorageStream.ReadInteger();
                    ODBValue documentID = ODBValue.Read(StorageStream);
                    Area area = new Area(offset, asize, documentID);

                    Log("INIT: O:0x{0:X8} S:0x{1:X8} N:0x{2:X8} {3}", area.Offset, area.Size, area.NextOffset, area.DocumentID);

                    if (dump != null)
                    {
                        dump.Log(LogLevel.INFO, "AREA SCAN: {0}", area);
                        dump.Log(LogLevel.INFO, "AREA DOCUMENT KEY: {0}", area.DocumentID);
                        dump.Log(LogLevel.INFO, "AREA DOCUMENT:\n{0}", Load(area).HexDump());
                    }

                    if (ODBNull.Instance.Equals(documentID))
                    {
                        // An area whose ID is ODBNull has been released and is available for reuse.
                        availableAreas.AddReleased(area);
                    }
                    else if (idLookup.ContainsKey(documentID))
                    {
                        // Two areas claim the same document ID (e.g. after an interrupted Store()):
                        // keep the copy with the newer StorageTimeStamp and release the other area.
                        byte[] aBytes = Load(idLookup[documentID]);
                        ODBDocument aDoc = new ODBDocument(aBytes, 0, aBytes.Length);
                        byte[] bBytes = Load(area);
                        ODBDocument bDoc = new ODBDocument(bBytes, 0, bBytes.Length);

                        if (aDoc.StorageTimeStamp < bDoc.StorageTimeStamp)
                        {
                            availableAreas.GiveBack(idLookup[documentID]);
                            idLookup[documentID] = area;
                        }
                        else
                        {
                            availableAreas.GiveBack(area);
                        }
                    }
                    else
                    {
                        idLookup.Add(documentID, area);
                    }

                    offset = area.NextOffset;
                }
            }
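
            /// <summary>
            /// Closes and disposes the transaction logger, if one was enabled.
            /// </summary>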
            public void Close()
            {
                if (transactionLogger != null)
                {
                    transactionLogger.Close();
                    transactionLogger.Dispose();
                }
            }
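
            /// <summary>
            /// Enables the per-collection transaction log ({CollectionName}.log in the ODB base path),
            /// unless it is already active.
            /// </summary>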
            public void EnableTransactionLog()
            {
                if (transactionLogger == null)
                {
                    transactionLogger = new FileLogger(Path.Combine(Collection.ODB.BasePath, string.Format("{0}.log", Collection.CollectionName)));
                }
            }
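
            /// <summary>
            /// Ensures a (non-unique) index for a single property path by delegating to EnsureIndeces().
            /// Note: <paramref name="indexName"/> is currently not forwarded; the index is registered
            /// under its property path.
            /// </summary>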
            public void EnsureIndex(string propertyPath, string indexName = null)
            {
                if (indexName == null)
                    indexName = propertyPath;

                EnsureIndeces(new string[] { propertyPath });

                //lock (this)
                //{
                //    if (!propertyIndeces.ContainsKey(indexName))
                //    {
                //        PropertyIndex propertyIndex = new PropertyIndex(indexName,propertyPath);
                //        propertyIndeces.Add(indexName, propertyIndex);
                //        foreach (ODBValue documentID in this)
                //        {
                //            ODBDocument doc = LoadDocument(documentID);
                //            propertyIndex.UpdateIndex(doc);
                //        }
                //    }
                //}
            }
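
            /// <summary>
            /// Builds one PropertyIndex (or UniqueIndex, when <paramref name="unique"/> is true) per
            /// property path, fills each index from every document currently in the collection and
            /// registers the indices so subsequent stores keep them up to date.
            /// </summary>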
            public void EnsureIndeces(string[] propertyPaths, bool unique = false)
            {
                Logging.Log(LogLevel.INFO, "Ensuring Indeces: {0} over {1} documents", string.Join(", ", propertyPaths), Count);
                lock (this)
                {
                    List<PropertyIndex> indeces = new List<PropertyIndex>();
                    foreach (string propertyPath in propertyPaths)
                    {
                        if (unique)
                            indeces.Add(new UniqueIndex(propertyPath));
                        else
                            indeces.Add(new PropertyIndex(propertyPath));
                    }

                    foreach (ODBValue documentID in this)
                    {
                        ODBDocument doc = LoadDocument(documentID);
                        foreach (PropertyIndex propertyIndex in indeces)
                            propertyIndex.UpdateIndex(doc);
                    }

                    foreach (PropertyIndex propertyIndex in indeces)
                        propertyIndeces.Add(propertyIndex.Name, propertyIndex);
                }
                Logging.Log(LogLevel.INFO, "Ensured Indeces: {0}", string.Join(", ", propertyPaths));
            }
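
            /// <summary>
            /// Builds a ComplexUniqueIndex over the combination of the given property paths from all
            /// existing documents and registers it in propertyIndeces so it is maintained on every store.
            /// </summary>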
            public void EnsureUniqueness(params string[] propertyPaths)
            {
                Logging.Log(LogLevel.INFO, "Ensuring Uniqueness: {0} over {1} documents", string.Join(", ", propertyPaths), Count);
                lock (this)
                {
                    ComplexUniqueIndex complexUniqueIndex = new ComplexUniqueIndex(propertyPaths);
                    foreach (ODBValue documentID in this)
                    {
                        ODBDocument doc = LoadDocument(documentID);
                        complexUniqueIndex.UpdateIndex(doc);
                    }
                    propertyIndeces.Add(complexUniqueIndex.Name, complexUniqueIndex);
                }
                Logging.Log(LogLevel.INFO, "Ensured Uniqueness: {0}", string.Join(", ", propertyPaths));
            }
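
            /// <summary>
            /// Returns the IDs of all documents whose value at <paramref name="propertyName"/> satisfies
            /// <paramref name="predicate"/>. Uses a registered property index when available, otherwise
            /// falls back to a full scan that loads every document.
            /// </summary>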
            public IEnumerable<ODBValue> Query(string propertyName, Predicate<ODBValue> predicate)
            {
                if (propertyIndeces.ContainsKey(propertyName))
                {
                    return propertyIndeces[propertyName].Where(predicate);
                }
                else
                {
                    return this.Where((docId) => { ODBDocument doc = LoadDocument(docId); return predicate(doc[propertyName]); });
                }
            }
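
            /// <summary>
            /// Returns the IDs of all documents whose value at <paramref name="propertyName"/> equals
            /// <paramref name="value"/>, using a registered property index when available.
            /// </summary>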
            public IEnumerable<ODBValue> Query(string propertyName, ODBValue value)
            {
                if (propertyIndeces.ContainsKey(propertyName))
                {
                    return propertyIndeces[propertyName].Find(value);
                }
                else
                {
                    return this.Where((docId) => { ODBDocument doc = LoadDocument(docId); return value.Equals(doc[propertyName]); });
                }
            }
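
            /// <summary>
            /// Loads and deserializes the document with the given ID, or returns null when the ID is unknown.
            /// </summary>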
            public ODBDocument LoadDocument(ODBValue documentID)
            {
                byte[] storageBytes = Load(documentID);
                if (storageBytes == null)
                    return null;

                ODBDocument document = new ODBDocument(storageBytes, 0, storageBytes.Length);
                document.Collection = this.Collection;
                return document;
            }
            public void Log(string transaction, params object[] p)
            {
                if (transactionLogger != null)
                    Log(string.Format(transaction, p).Split('\n'));
            }

            public void Log(string[] transaction)
            {
                if (transactionLogger != null)
                    transactionLogger.Message(LogLevel.INFO, transaction);
            }
            public bool Contains(ODBValue documentID)
            {
                return idLookup.ContainsKey(documentID);
            }
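
            /// <summary>
            /// Returns the raw storage bytes of the document with the given ID, or null when the ID is unknown.
            /// </summary>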
            public byte[] Load(ODBValue documentID)
            {
                lock (this)
                {
                    if (idLookup.ContainsKey(documentID))
                    {
                        Area area = idLookup[documentID];
                        return Load(area);
                    }
                    return null;
                }
            }
            byte[] Load(Area area)
            {
                lock (this)
                {
                    // The document payload starts 4 bytes into the area, right after the int32 size field.
                    byte[] buffer = new byte[area.Size];
                    StorageStream.Position = area.Offset + 4;

                    // Stream.Read() may return fewer bytes than requested, so read until the buffer is
                    // full or the stream ends.
                    int read = 0;
                    while (read < area.Size)
                    {
                        int n = StorageStream.Read(buffer, read, area.Size - read);
                        if (n <= 0)
                            break;
                        read += n;
                    }
                    return buffer;
                }
            }
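
            /// <summary>
            /// Serializes the document and writes it to a storage area (a released area requested from
            /// AvailableAreas, or a new area appended at the end of the stream), updates idLookup and all
            /// property indices, and only then releases the area previously occupied by this document ID.
            /// </summary>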
            public void Store(ODBDocument document)
            {
                // Storing a null document is not supported; its ID and storage bytes are needed below.
                if (Object.ReferenceEquals(null, document))
                    throw new ArgumentNullException(nameof(document));

                lock (this)
                {
                    byte[] storageBytes = document.ToStorage();

                    Area previousArea = null;
                    Area storageArea = availableAreas.Request(storageBytes.Length);

                    if (idLookup.ContainsKey(document.ID))
                        previousArea = idLookup[document.ID];
                    if (storageArea == null)
                        storageArea = new Area(StorageStream.Length, storageBytes.Length);

                    // The new copy is written and indexed before the previous area is released;
                    // Initialize() resolves any resulting duplicate IDs by StorageTimeStamp.
                    WriteArea(storageArea, document.ID, storageBytes);
                    idLookup[document.ID] = storageArea;
                    UpdateIndices(document);

                    if (previousArea != null)
                        availableAreas.GiveBack(previousArea);
                }
            }
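
            /// <summary>
            /// Removes the document with the given ID from the lookup and all property indices and
            /// releases its storage area for reuse. Returns false when the ID is unknown.
            /// </summary>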
            public bool Remove(ODBValue documentID)
            {
                lock (this)
                {
                    if (idLookup.ContainsKey(documentID))
                    {
                        Area area = idLookup[documentID];
                        idLookup.Remove(documentID);
                        availableAreas.GiveBack(area);
                        UpdateIndices(documentID);
                        return true;
                    }
                    return false;
                }
            }
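
            /// <summary>
            /// Writes one storage area: the int32 area size followed by the document bytes (which begin
            /// with the document ID, as read back by Initialize()), zero-padding any unused remainder of
            /// the area, then flushes the stream.
            /// </summary>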
            private void WriteArea(Area area, ODBValue documentID, byte[] storageBytes)
            {
                Log("OFFSET(0x{0:X8}) SIZE(0x{1:X8}) ID: {2}", area.Offset, area.Size, documentID);

                StorageStream.Position = area.Offset;
                StorageStream.WriteInteger(area.Size);
                StorageStream.Write(storageBytes, 0, storageBytes.Length);

                if (storageBytes.Length < area.Size)
                {
                    byte[] zero = new byte[area.Size - storageBytes.Length];
                    StorageStream.Write(zero, 0, zero.Length);
                }

                StorageStream.Flush();
                area.DocumentID = documentID;
            }
            private void UpdateIndices(ODBDocument document)
            {
                foreach (PropertyIndex propIndex in propertyIndeces.Values)
                {
                    propIndex.UpdateIndex(document);
                }
            }

            private void UpdateIndices(ODBValue documentID)
            {
                foreach (PropertyIndex propIndex in propertyIndeces.Values)
                    propIndex.Remove(documentID);
            }
            Area Lookup(ODBValue documentID)
            {
                if (idLookup.ContainsKey(documentID))
                    return idLookup[documentID];
                return null;
            }
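
            /// <summary>
            /// Enumerates the IDs of all documents in the collection. The ID set is copied to an array
            /// under the lock, so enumeration is not affected by concurrent stores or removals.
            /// </summary>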
            public IEnumerator<ODBValue> GetEnumerator()
            {
                ODBValue[] docIDs;
                lock (this)
                {
                    docIDs = idLookup.Keys.ToArray();
                }
                return ((IEnumerable<ODBValue>)docIDs).GetEnumerator();
            }

            IEnumerator IEnumerable.GetEnumerator() => GetEnumerator();
        }
    }
}