Major restructuring, removed searchdomain field from Entity/Index, moved embeddingcache to SearchdomainManager, improved logging
This commit is contained in:
160
src/Server/Helper/DatabaseHelper.cs
Normal file
160
src/Server/Helper/DatabaseHelper.cs
Normal file
@@ -0,0 +1,160 @@
|
||||
using System.Data.Common;
|
||||
using System.Text;
|
||||
|
||||
namespace Server;
|
||||
|
||||
public static class DatabaseHelper
|
||||
{
|
||||
/// <summary>
/// Inserts all embeddings of one datapoint into the embedding table using a single multi-row INSERT statement.
/// </summary>
/// <param name="helper">SQL helper whose ExecuteSQLNonQuery runs the statement.</param>
/// <param name="id_datapoint">Value written to the id_datapoint column of every inserted row.</param>
/// <param name="data">One (model, embedding) pair per row. When empty, no statement is executed.</param>
public static void DatabaseInsertEmbeddingBulk(SQLHelper helper, int id_datapoint, List<(string model, byte[] embedding)> data)
{
    // Without this guard an empty list would produce a statement with no value tuples
    // (and the trailing-separator trim would even cut into the VALUES keyword).
    if (data.Count == 0)
    {
        return;
    }

    Dictionary<string, object> parameters = new()
    {
        ["id_datapoint"] = id_datapoint
    };
    List<string> rows = new(data.Count);
    for (int i = 0; i < data.Count; i++)
    {
        // The row index keeps the parameter names unique and reproducible.
        string modelParam = $"model_{i}";
        string embeddingParam = $"embedding_{i}";
        parameters[modelParam] = data[i].model;
        parameters[embeddingParam] = data[i].embedding;
        rows.Add($"(@id_datapoint, @{modelParam}, @{embeddingParam})");
    }

    string query = "INSERT INTO embedding (id_datapoint, model, embedding) VALUES " + string.Join(", ", rows);
    helper.ExecuteSQLNonQuery(query, parameters);
}
|
||||
|
||||
/// <summary>
/// Inserts a row into the searchdomain table with the given name and an empty JSON object as settings.
/// </summary>
/// <param name="helper">SQL helper used to run the insert.</param>
/// <param name="name">Value for the name column.</param>
/// <returns>Whatever helper.ExecuteSQLCommandGetInsertedID reports for the insert.</returns>
public static int DatabaseInsertSearchdomain(SQLHelper helper, string name)
{
    const string insertSql = "INSERT INTO searchdomain (name, settings) VALUES (@name, @settings)";
    Dictionary<string, dynamic> values = new()
    {
        ["name"] = name,
        ["settings"] = "{}" // TODO add settings. It's not used yet, but maybe it's needed someday...
    };
    return helper.ExecuteSQLCommandGetInsertedID(insertSql, values);
}
|
||||
|
||||
/// <summary>
/// Inserts a row into the entity table.
/// </summary>
/// <param name="helper">SQL helper used to run the insert.</param>
/// <param name="name">Value for the name column.</param>
/// <param name="probmethod">Value for the probmethod column.</param>
/// <param name="id_searchdomain">Value for the id_searchdomain column.</param>
/// <returns>Whatever helper.ExecuteSQLCommandGetInsertedID reports for the insert.</returns>
public static int DatabaseInsertEntity(SQLHelper helper, string name, string probmethod, int id_searchdomain)
{
    const string insertSql = "INSERT INTO entity (name, probmethod, id_searchdomain) VALUES (@name, @probmethod, @id_searchdomain)";
    Dictionary<string, dynamic> values = new()
    {
        ["name"] = name,
        ["probmethod"] = probmethod,
        ["id_searchdomain"] = id_searchdomain
    };
    return helper.ExecuteSQLCommandGetInsertedID(insertSql, values);
}
|
||||
|
||||
/// <summary>
/// Inserts one attribute/value pair for an entity into the attribute table.
/// </summary>
/// <param name="helper">SQL helper used to run the insert.</param>
/// <param name="attribute">Value for the attribute column.</param>
/// <param name="value">Value for the value column.</param>
/// <param name="id_entity">Value for the id_entity column.</param>
/// <returns>Whatever helper.ExecuteSQLCommandGetInsertedID reports for the insert.</returns>
public static int DatabaseInsertAttribute(SQLHelper helper, string attribute, string value, int id_entity)
{
    const string insertSql = "INSERT INTO attribute (attribute, value, id_entity) VALUES (@attribute, @value, @id_entity)";
    Dictionary<string, dynamic> values = new()
    {
        ["attribute"] = attribute,
        ["value"] = value,
        ["id_entity"] = id_entity
    };
    return helper.ExecuteSQLCommandGetInsertedID(insertSql, values);
}
|
||||
|
||||
/// <summary>
/// Inserts a row into the datapoint table.
/// </summary>
/// <param name="helper">SQL helper used to run the insert.</param>
/// <param name="name">Value for the name column.</param>
/// <param name="probmethod_embedding">Value for the probmethod_embedding column.</param>
/// <param name="hash">Value for the hash column.</param>
/// <param name="id_entity">Value for the id_entity column.</param>
/// <returns>Whatever helper.ExecuteSQLCommandGetInsertedID reports for the insert.</returns>
public static int DatabaseInsertDatapoint(SQLHelper helper, string name, string probmethod_embedding, string hash, int id_entity)
{
    const string insertSql = "INSERT INTO datapoint (name, probmethod_embedding, hash, id_entity) VALUES (@name, @probmethod_embedding, @hash, @id_entity)";
    Dictionary<string, dynamic> values = new()
    {
        ["name"] = name,
        ["probmethod_embedding"] = probmethod_embedding,
        ["hash"] = hash,
        ["id_entity"] = id_entity
    };
    return helper.ExecuteSQLCommandGetInsertedID(insertSql, values);
}
|
||||
|
||||
/// <summary>
/// Inserts a single row into the embedding table.
/// </summary>
/// <param name="helper">SQL helper used to run the insert.</param>
/// <param name="id_datapoint">Value for the id_datapoint column.</param>
/// <param name="model">Value for the model column.</param>
/// <param name="embedding">Raw bytes for the embedding column.</param>
/// <returns>Whatever helper.ExecuteSQLCommandGetInsertedID reports for the insert.</returns>
public static int DatabaseInsertEmbedding(SQLHelper helper, int id_datapoint, string model, byte[] embedding)
{
    const string insertSql = "INSERT INTO embedding (id_datapoint, model, embedding) VALUES (@id_datapoint, @model, @embedding)";
    Dictionary<string, dynamic> values = new()
    {
        ["id_datapoint"] = id_datapoint,
        ["model"] = model,
        ["embedding"] = embedding
    };
    return helper.ExecuteSQLCommandGetInsertedID(insertSql, values);
}
|
||||
|
||||
/// <summary>
/// Looks up the id of the searchdomain row whose name equals <paramref name="searchdomain"/>.
/// </summary>
/// <param name="helper">SQL helper; its connection object is locked for the duration of the query.</param>
/// <param name="searchdomain">Name to match against the name column.</param>
/// <returns>The first column of the first matching row.</returns>
/// <exception cref="Exception">Thrown when the query returns no row.</exception>
public static int GetSearchdomainID(SQLHelper helper, string searchdomain)
{
    Dictionary<string, dynamic> parameters = new()
    {
        { "searchdomain", searchdomain }
    };
    lock (helper.connection)
    {
        // The using declaration closes the reader on every exit path, including when
        // Read/GetInt32 throws, so no open reader is left behind on the connection.
        using DbDataReader reader = helper.ExecuteSQLCommand("SELECT id FROM searchdomain WHERE name = @searchdomain", parameters);
        if (!reader.Read())
        {
            throw new Exception($"Unable to retrieve searchdomain ID for {searchdomain}"); // TODO implement logging here; add logger via method injection
        }
        return reader.GetInt32(0);
    }
}
|
||||
|
||||
/// <summary>
/// Deletes an entity, identified by name within a searchdomain, together with its embeddings,
/// datapoints and attributes, and then drops every cached entity with that name.
/// </summary>
/// <param name="entityCache">In-memory entity list; entries whose name equals <paramref name="name"/> are removed.</param>
/// <param name="helper">SQL helper used to run the DELETE statements.</param>
/// <param name="name">Name of the entity to remove.</param>
/// <param name="searchdomain">Searchdomain name; resolved to its id via GetSearchdomainID.</param>
public static void RemoveEntity(List<Entity> entityCache, SQLHelper helper, string name, string searchdomain)
{
    Dictionary<string, dynamic> parameters = new()
    {
        { "name", name },
        { "searchdomain", GetSearchdomainID(helper, searchdomain) }
    };

    // Dependent rows go first (embedding -> datapoint -> attribute); the entity row itself is deleted last.
    helper.ExecuteSQLNonQuery("DELETE embedding.* FROM embedding JOIN datapoint dp ON id_datapoint = dp.id JOIN entity ON id_entity = entity.id WHERE entity.name = @name AND entity.id_searchdomain = @searchdomain", parameters);
    helper.ExecuteSQLNonQuery("DELETE datapoint.* FROM datapoint JOIN entity ON id_entity = entity.id WHERE entity.name = @name AND entity.id_searchdomain = @searchdomain", parameters);
    helper.ExecuteSQLNonQuery("DELETE attribute.* FROM attribute JOIN entity ON id_entity = entity.id WHERE entity.name = @name AND entity.id_searchdomain = @searchdomain", parameters);
    helper.ExecuteSQLNonQuery("DELETE FROM entity WHERE name = @name AND entity.id_searchdomain = @searchdomain", parameters);

    // List<T> is not safe for concurrent mutation, and SearchdomainHelper.EntitiesFromJSON reaches this
    // method from Parallel.ForEach workers that share one cache. Any other code that mutates the same
    // list concurrently has to take the same lock for this to be fully effective.
    lock (entityCache)
    {
        entityCache.RemoveAll(entity => entity.name == name);
    }
}
|
||||
|
||||
/// <summary>
/// Checks whether the entity table contains at least one row with the given name in the given searchdomain.
/// </summary>
/// <param name="helper">SQL helper; its connection object is locked for the duration of the query.</param>
/// <param name="name">Entity name to look for.</param>
/// <param name="searchdomain">Searchdomain name; resolved to its id via GetSearchdomainID before the lock is taken.</param>
/// <returns>True when the counted number of matching rows is greater than zero.</returns>
/// <exception cref="Exception">Thrown when the COUNT query yields no row at all.</exception>
public static bool HasEntity(SQLHelper helper, string name, string searchdomain)
{
    Dictionary<string, dynamic> parameters = new()
    {
        { "name", name },
        { "searchdomain", GetSearchdomainID(helper, searchdomain) }
    };
    lock (helper.connection)
    {
        // The using declaration closes the reader on every exit path, including exceptions.
        using DbDataReader reader = helper.ExecuteSQLCommand("SELECT COUNT(*) FROM entity WHERE name = @name AND id_searchdomain = @searchdomain", parameters);
        if (!reader.Read())
        {
            throw new Exception($"Unable to determine whether an entity named {name} exists for {searchdomain}"); // TODO implement logging here; add logger via method injection
        }
        return reader.GetInt32(0) > 0;
    }
}
|
||||
|
||||
/// <summary>
/// Looks up the id of the entity with the given name in the given searchdomain.
/// </summary>
/// <param name="helper">SQL helper; its connection object is locked for the duration of the query.</param>
/// <param name="name">Entity name to look for.</param>
/// <param name="searchdomain">Searchdomain name; resolved to its id via GetSearchdomainID before the lock is taken.</param>
/// <returns>The id of the first matching row, or null when no row matches.</returns>
public static int? GetEntityID(SQLHelper helper, string name, string searchdomain)
{
    Dictionary<string, dynamic> parameters = new()
    {
        { "name", name },
        { "searchdomain", GetSearchdomainID(helper, searchdomain) }
    };
    lock (helper.connection)
    {
        // The using declaration closes the reader on every exit path, including exceptions.
        using DbDataReader reader = helper.ExecuteSQLCommand("SELECT id FROM entity WHERE name = @name AND id_searchdomain = @searchdomain", parameters);
        if (!reader.Read())
        {
            // "Not found" is reported as null. Returning 0 here (as before) made the nullable
            // return type meaningless: callers testing "is not null" always saw a hit.
            return null;
        }
        return reader.GetInt32(0);
    }
}
|
||||
}
|
||||
@@ -6,10 +6,19 @@ namespace Server;
|
||||
public class SQLHelper
|
||||
{
|
||||
public MySqlConnection connection;
|
||||
public SQLHelper(MySqlConnection connection)
|
||||
public string connectionString;
|
||||
public SQLHelper(MySqlConnection connection, string connectionString)
|
||||
{
|
||||
this.connection = connection;
|
||||
this.connectionString = connectionString;
|
||||
}
|
||||
|
||||
public SQLHelper DuplicateConnection()
|
||||
{
|
||||
MySqlConnection newConnection = new(connectionString);
|
||||
return new SQLHelper(newConnection, connectionString);
|
||||
}
|
||||
|
||||
public DbDataReader ExecuteSQLCommand(string query, Dictionary<string, dynamic> parameters)
|
||||
{
|
||||
lock (connection)
|
||||
|
||||
146
src/Server/Helper/SearchdomainHelper.cs
Normal file
146
src/Server/Helper/SearchdomainHelper.cs
Normal file
@@ -0,0 +1,146 @@
|
||||
using System.Collections.Concurrent;
|
||||
using System.Security.Cryptography;
|
||||
using System.Text;
|
||||
using System.Text.Json;
|
||||
using MySql.Data.MySqlClient;
|
||||
using OllamaSharp;
|
||||
|
||||
namespace Server;
|
||||
|
||||
public static class SearchdomainHelper
|
||||
{
|
||||
/// <summary>
/// Copies the raw in-memory representation of a float array into a new byte array
/// (sizeof(float) bytes per element, in the platform's native byte order).
/// </summary>
/// <param name="floats">Values to serialize.</param>
/// <returns>A new byte array of length floats.Length * sizeof(float).</returns>
public static byte[] BytesFromFloatArray(float[] floats)
{
    var byteArray = new byte[floats.Length * sizeof(float)];
    // Buffer.BlockCopy only reads from the source, so no defensive copy of the input is needed.
    Buffer.BlockCopy(floats, 0, byteArray, 0, byteArray.Length);
    return byteArray;
}
|
||||
|
||||
/// <summary>
/// Reinterprets a byte array as floats by copying its raw bytes into a new float array
/// holding bytes.Length / 4 elements.
/// </summary>
/// <param name="bytes">Raw bytes to decode; all bytes.Length bytes are copied.</param>
/// <returns>A new float array filled from <paramref name="bytes"/>.</returns>
public static float[] FloatArrayFromBytes(byte[] bytes)
{
    int byteCount = bytes.Length;
    float[] decoded = new float[byteCount / 4];
    Buffer.BlockCopy(bytes, 0, decoded, 0, byteCount);
    return decoded;
}
|
||||
|
||||
/// <summary>
/// Tells whether the cache contains an entity with the given name.
/// </summary>
/// <param name="entityCache">Entity list to search.</param>
/// <param name="name">Entity name to look for.</param>
/// <returns>True when CacheGetEntity finds a match; false otherwise.</returns>
public static bool CacheHasEntity(List<Entity> entityCache, string name)
{
    Entity? match = CacheGetEntity(entityCache, name);
    return match is not null;
}
|
||||
|
||||
/// <summary>
/// Returns the first cached entity whose name equals <paramref name="name"/>.
/// </summary>
/// <param name="entityCache">Entity list to search, in list order.</param>
/// <param name="name">Entity name to look for.</param>
/// <returns>The first matching entity, or null when none matches.</returns>
public static Entity? CacheGetEntity(List<Entity> entityCache, string name)
{
    foreach (Entity candidate in entityCache)
    {
        if (candidate.name != name)
        {
            continue;
        }
        return candidate;
    }
    return null;
}
|
||||
|
||||
/// <summary>
/// Deserializes a JSON array of entities and runs EntityFromJSON for each element in parallel.
/// </summary>
/// <param name="entityCache">Entity cache handed through to EntityFromJSON.</param>
/// <param name="embeddingCache">Embedding cache handed through to EntityFromJSON.</param>
/// <param name="ollama">Ollama client handed through to EntityFromJSON.</param>
/// <param name="helper">SQL helper; every parallel worker uses its own duplicate obtained via DuplicateConnection.</param>
/// <param name="json">JSON text expected to deserialize into a list of JSONEntity.</param>
/// <returns>
/// The entities produced by EntityFromJSON (in no guaranteed order, since they are collected from
/// parallel workers), or null when the JSON deserializes to null.
/// </returns>
public static List<Entity>? EntitiesFromJSON(List<Entity> entityCache, Dictionary<string, Dictionary<string, float[]>> embeddingCache, OllamaApiClient ollama, SQLHelper helper, string json)
{
    List<JSONEntity>? jsonEntities = JsonSerializer.Deserialize<List<JSONEntity>>(json);
    if (jsonEntities is null)
    {
        return null;
    }

    // Datapoint texts grouped by model name.
    // NOTE(review): nothing in this method reads toBeCached after it is built - presumably groundwork
    // for batched embedding generation; confirm and either consume it or drop the pre-pass.
    Dictionary<string, List<string>> toBeCached = [];
    foreach (JSONEntity jSONEntity in jsonEntities)
    {
        foreach (JSONDatapoint datapoint in jSONEntity.Datapoints)
        {
            foreach (string model in datapoint.Model)
            {
                if (!toBeCached.TryGetValue(model, out List<string>? texts))
                {
                    texts = [];
                    toBeCached[model] = texts;
                }
                texts.Add(datapoint.Text);
            }
        }
    }

    ConcurrentQueue<Entity> retVal = [];
    Parallel.ForEach(jsonEntities, jSONEntity =>
    {
        // Each worker gets its own connection so workers do not contend on helper.connection.
        var tempHelper = helper.DuplicateConnection();
        try
        {
            var entity = EntityFromJSON(entityCache, embeddingCache, ollama, tempHelper, jSONEntity);
            if (entity is not null)
            {
                retVal.Enqueue(entity);
            }
        }
        finally
        {
            // The duplicate is only referenced inside this lambda; dispose it so that one
            // connection per processed entity is not left behind.
            tempHelper.connection.Dispose();
        }
    });
    return [.. retVal];
}
|
||||
|
||||
/// <summary>
/// Writes one JSON entity to the database (replacing any stored entity of the same name in the same
/// searchdomain), builds the corresponding in-memory Entity and adds it to the cache.
/// </summary>
/// <remarks>
/// Steps, in order: resolve the entity probmethod; if an entity id is found, load its stored
/// embeddings keyed by datapoint hash and remove the stored entity; insert entity and attributes;
/// for each datapoint reuse stored embeddings with the same text hash or generate new ones, then
/// insert the datapoint and its embeddings; finally add the new Entity to <paramref name="entityCache"/>.
/// </remarks>
/// <param name="entityCache">In-memory entity list; the new entity is appended under lock(entityCache).</param>
/// <param name="embeddingCache">Cache passed to Datapoint.GenerateEmbeddings.</param>
/// <param name="ollama">Client passed to Datapoint.GenerateEmbeddings.</param>
/// <param name="helper">SQL helper used for all queries of this call.</param>
/// <param name="jsonEntity">Deserialized entity description.</param>
/// <returns>The newly created entity.</returns>
/// <exception cref="Exception">Thrown when the entity's or a datapoint's probmethod name is unknown.</exception>
public static Entity? EntityFromJSON(List<Entity> entityCache, Dictionary<string, Dictionary<string, float[]>> embeddingCache, OllamaApiClient ollama, SQLHelper helper, JSONEntity jsonEntity) //string json)
{
    // Resolve the entity-level probmethod before touching the database, so an unknown name fails
    // before the stored entity is deleted and before any new row is inserted.
    var probMethod = Probmethods.GetMethod(jsonEntity.Probmethod) ?? throw new Exception($"Unknown probmethod name {jsonEntity.Probmethod}");

    // datapoint hash -> (model -> embedding), filled from the stored version of this entity
    Dictionary<string, Dictionary<string, float[]>> embeddingsLUT = [];
    int? preexistingEntityID = DatabaseHelper.GetEntityID(helper, jsonEntity.Name, jsonEntity.Searchdomain);
    if (preexistingEntityID is not null)
    {
        lock (helper.connection)
        {
            Dictionary<string, dynamic> parameters = new()
            {
                { "id", preexistingEntityID }
            };
            // The using declaration closes the reader on every exit path, including exceptions.
            using System.Data.Common.DbDataReader reader = helper.ExecuteSQLCommand("SELECT e.model, e.embedding, d.hash FROM datapoint d JOIN embedding e ON d.id = e.id_datapoint WHERE d.id_entity = @id", parameters);
            while (reader.Read())
            {
                string model = reader.GetString(0);
                // A null buffer makes GetBytes report the field length without copying.
                long length = reader.GetBytes(1, 0, null, 0, 0);
                byte[] embeddingBytes = new byte[length];
                reader.GetBytes(1, 0, embeddingBytes, 0, (int)length);
                float[] embeddingValues = FloatArrayFromBytes(embeddingBytes);
                string hash = reader.GetString(2);
                if (!embeddingsLUT.TryGetValue(hash, out Dictionary<string, float[]>? perModel))
                {
                    perModel = [];
                    embeddingsLUT[hash] = perModel;
                }
                // TryAdd keeps the first embedding seen for a (hash, model) pair.
                perModel.TryAdd(model, embeddingValues);
            }
        }
        DatabaseHelper.RemoveEntity(entityCache, helper, jsonEntity.Name, jsonEntity.Searchdomain); // TODO only remove entity if there is actually a change somewhere. Perhaps create 3 datapoint lists to operate with: 1. delete, 2. update, 3. create
    }
    int id_entity = DatabaseHelper.DatabaseInsertEntity(helper, jsonEntity.Name, jsonEntity.Probmethod, DatabaseHelper.GetSearchdomainID(helper, jsonEntity.Searchdomain));
    foreach (KeyValuePair<string, string> attribute in jsonEntity.Attributes)
    {
        DatabaseHelper.DatabaseInsertAttribute(helper, attribute.Key, attribute.Value, id_entity); // TODO implement bulk insert to reduce number of queries
    }

    List<Datapoint> datapoints = [];
    foreach (JSONDatapoint jsonDatapoint in jsonEntity.Datapoints)
    {
        // The hash of the text identifies unchanged datapoints whose stored embeddings can be reused.
        string hash = Convert.ToBase64String(SHA256.HashData(Encoding.UTF8.GetBytes(jsonDatapoint.Text)));
        // NOTE(review): stored embeddings are reused as soon as any exist for this hash, even if the
        // requested model list differs from the stored one - confirm that this is intended.
        if (!embeddingsLUT.TryGetValue(hash, out Dictionary<string, float[]>? embeddings) || embeddings.Count == 0)
        {
            embeddings = Datapoint.GenerateEmbeddings(jsonDatapoint.Text, [.. jsonDatapoint.Model], ollama, embeddingCache);
        }
        var probMethod_embedding = Probmethods.GetMethod(jsonDatapoint.Probmethod_embedding) ?? throw new Exception($"Unknown probmethod name {jsonDatapoint.Probmethod_embedding}");
        Datapoint datapoint = new(jsonDatapoint.Name, probMethod_embedding, hash, [.. embeddings.Select(kv => (kv.Key, kv.Value))]);
        int id_datapoint = DatabaseHelper.DatabaseInsertDatapoint(helper, jsonDatapoint.Name, jsonDatapoint.Probmethod_embedding, hash, id_entity); // TODO make this a bulk add action to reduce number of queries
        List<(string model, byte[] embedding)> data = [];
        foreach ((string, float[]) embedding in datapoint.embeddings)
        {
            data.Add((embedding.Item1, BytesFromFloatArray(embedding.Item2)));
        }
        DatabaseHelper.DatabaseInsertEmbeddingBulk(helper, id_datapoint, data);
        datapoints.Add(datapoint);
    }

    Entity entity = new(jsonEntity.Attributes, probMethod, datapoints, jsonEntity.Name)
    {
        id = id_entity
    };
    // EntitiesFromJSON calls this method from Parallel.ForEach workers that share one cache, and
    // List<T>.Add is not safe for concurrent use. Other code mutating the same list concurrently
    // has to take the same lock for this to be fully effective.
    lock (entityCache)
    {
        entityCache.Add(entity);
    }
    return entity;
}
|
||||
}
|
||||
Reference in New Issue
Block a user