From 0582ff9a6c5e3571e560ceba3a5d584558572587 Mon Sep 17 00:00:00 2001 From: LD-Reborn Date: Sun, 22 Feb 2026 19:48:26 +0100 Subject: [PATCH] Fixed Putting entities only upserts entities instead of also deleting non-existent ones --- src/Client/Client.cs | 20 +++- src/Indexer/Scripts/example.py | 1 + src/Indexer/Scripts/example_chunked.py | 85 +++++++++++++ src/Indexer/Scripts/tools.py | 2 + src/Server/Controllers/EntityController.cs | 131 +++++++++++++++++++-- src/Server/Entity.cs | 3 +- src/Server/Helper/SearchdomainHelper.cs | 2 +- src/Server/Searchdomain.cs | 2 +- 8 files changed, 228 insertions(+), 18 deletions(-) create mode 100644 src/Indexer/Scripts/example_chunked.py diff --git a/src/Client/Client.cs b/src/Client/Client.cs index 290681d..a00537e 100644 --- a/src/Client/Client.cs +++ b/src/Client/Client.cs @@ -47,15 +47,27 @@ public class Client return await FetchUrlAndProcessJson(HttpMethod.Get, url); } - public async Task EntityIndexAsync(List jsonEntity) + public async Task EntityIndexAsync(List jsonEntity, string? sessionId = null, bool? sessionComplete = null) { - return await EntityIndexAsync(JsonSerializer.Serialize(jsonEntity)); + return await EntityIndexAsync(JsonSerializer.Serialize(jsonEntity), sessionId, sessionComplete); } - public async Task EntityIndexAsync(string jsonEntity) + public async Task EntityIndexAsync(string jsonEntity, string? sessionId = null, bool?
sessionComplete = null) { var content = new StringContent(jsonEntity, Encoding.UTF8, "application/json"); - return await FetchUrlAndProcessJson(HttpMethod.Put, GetUrl($"{baseUri}", "Entities", []), content); + Dictionary parameters = []; + if (sessionId is not null) parameters.Add("sessionId", sessionId); + if (sessionComplete is not null) parameters.Add("sessionComplete", ((bool)sessionComplete).ToString()); + + return await FetchUrlAndProcessJson( + HttpMethod.Put, + GetUrl( + $"{baseUri}", + $"Entities", + parameters + ), + content + ); } public async Task EntityDeleteAsync(string entityName) diff --git a/src/Indexer/Scripts/example.py b/src/Indexer/Scripts/example.py index 05130c8..687ae7f 100644 --- a/src/Indexer/Scripts/example.py +++ b/src/Indexer/Scripts/example.py @@ -65,6 +65,7 @@ def index_files(toolset: Toolset): jsonEntities.append(jsonEntity) jsonstring = json.dumps(jsonEntities) timer_start = time.time() + # Index all entities in one go. If you need to split it into chunks, use the session attributes! 
See example_chunked.py result:EntityIndexResult = toolset.Client.EntityIndexAsync(jsonstring).Result timer_end = time.time() toolset.Logger.LogInformation(f"Update was successful: {result.Success} - and was done in {timer_end - timer_start} seconds.") \ No newline at end of file diff --git a/src/Indexer/Scripts/example_chunked.py b/src/Indexer/Scripts/example_chunked.py new file mode 100644 index 0000000..8b08f08 --- /dev/null +++ b/src/Indexer/Scripts/example_chunked.py @@ -0,0 +1,85 @@ +import math +import os +from tools import * +import json +from dataclasses import asdict +import time +import uuid + +example_content = "./Scripts/example_content" +probmethod = "HVEWAvg" +similarityMethod = "Cosine" +example_searchdomain = "example_" + probmethod +example_counter = 0 +models = ["ollama:bge-m3", "ollama:mxbai-embed-large"] +probmethod_datapoint = probmethod +probmethod_entity = probmethod +# Example for a dictionary based weighted average: +# probmethod_datapoint = "DictionaryWeightedAverage:{\"ollama:bge-m3\": 4, \"ollama:mxbai-embed-large\": 1}" +# probmethod_entity = "DictionaryWeightedAverage:{\"title\": 2, \"filename\": 0.1, \"text\": 0.25}" + +def init(toolset: Toolset): + global example_counter + toolset.Logger.LogInformation("{toolset.Name} - init", toolset.Name) + toolset.Logger.LogInformation("This is the init function from the python example script") + toolset.Logger.LogInformation(f"example_counter: {example_counter}") + searchdomainlist:SearchdomainListResults = toolset.Client.SearchdomainListAsync().Result + if example_searchdomain not in searchdomainlist.Searchdomains: + toolset.Client.SearchdomainCreateAsync(example_searchdomain).Result + searchdomainlist = toolset.Client.SearchdomainListAsync().Result + output = "Currently these searchdomains exist:\n" + for searchdomain in searchdomainlist.Searchdomains: + output += f" - {searchdomain}\n" + toolset.Logger.LogInformation(output) + +def update(toolset: Toolset): + global example_counter + 
toolset.Logger.LogInformation("{toolset.Name} - update", toolset.Name) + toolset.Logger.LogInformation("This is the update function from the python example script") + callbackInfos:ICallbackInfos = toolset.CallbackInfos + if (str(callbackInfos) == "Indexer.Models.RunOnceCallbackInfos"): + toolset.Logger.LogInformation("It was triggered by a runonce call") + elif (str(callbackInfos) == "Indexer.Models.IntervalCallbackInfos"): + toolset.Logger.LogInformation("It was triggered by an interval call") + elif (str(callbackInfos) == "Indexer.Models.ScheduleCallbackInfos"): + toolset.Logger.LogInformation("It was triggered by a schedule call") + elif (str(callbackInfos) == "Indexer.Models.FileUpdateCallbackInfos"): + toolset.Logger.LogInformation("It was triggered by a fileupdate call") + else: + toolset.Logger.LogInformation("It was triggered, but the origin of the call could not be determined") + example_counter += 1 + toolset.Logger.LogInformation(f"example_counter: {example_counter}") + index_files(toolset) + +def index_files(toolset: Toolset): + jsonEntities:list = [] + for filename in os.listdir(example_content): + qualified_filepath = example_content + "/" + filename + with open(qualified_filepath, "r", encoding='utf-8', errors="replace") as file: + title = file.readline() + text = file.read() + datapoints:list = [ + JSONDatapoint("filename", qualified_filepath, probmethod_datapoint, similarityMethod, models), + JSONDatapoint("title", title, probmethod_datapoint, similarityMethod, models), + JSONDatapoint("text", text, probmethod_datapoint, similarityMethod, models) + ] + jsonEntity:dict = asdict(JSONEntity(qualified_filepath, probmethod_entity, example_searchdomain, {}, datapoints)) + jsonEntities.append(jsonEntity) + timer_start = time.time() + chunkSize = 5 + chunkList = chunk_list(jsonEntities, chunkSize) + chunkCount = math.ceil(len(jsonEntities) / chunkSize) + sessionId = uuid.uuid4().hex + print(f"indexing {len(jsonEntities)} entities") + for i, entities in 
enumerate(chunkList): + isLast = i == chunkCount - 1 + print(f'Processing chunk {i + 1} / {chunkCount}') + jsonstring = json.dumps(entities) + result:EntityIndexResult = toolset.Client.EntityIndexAsync(jsonstring, sessionId, isLast).Result + timer_end = time.time() + toolset.Logger.LogInformation(f"Update was successful: {result.Success} - and was done in {timer_end - timer_start} seconds.") + + +def chunk_list(lst, chunk_size): + for i in range(0, len(lst), chunk_size): + yield lst[i: i + chunk_size] \ No newline at end of file diff --git a/src/Indexer/Scripts/tools.py b/src/Indexer/Scripts/tools.py index aa6005e..983b28c 100644 --- a/src/Indexer/Scripts/tools.py +++ b/src/Indexer/Scripts/tools.py @@ -107,6 +107,8 @@ class Client: # pass async def EntityIndexAsync(jsonEntity:str) -> EntityIndexResult: pass + async def EntityIndexAsync(jsonEntity:str, sessionId:str, sessionComplete:bool) -> EntityIndexResult: + pass async def EntityIndexAsync(searchdomain:str, jsonEntity:str) -> EntityIndexResult: pass async def EntityListAsync(returnEmbeddings:bool = False) -> EntityListResults: diff --git a/src/Server/Controllers/EntityController.cs b/src/Server/Controllers/EntityController.cs index d103dd1..0a0f51f 100644 --- a/src/Server/Controllers/EntityController.cs +++ b/src/Server/Controllers/EntityController.cs @@ -14,6 +14,9 @@ public class EntityController : ControllerBase private SearchdomainManager _domainManager; private readonly SearchdomainHelper _searchdomainHelper; private readonly DatabaseHelper _databaseHelper; + private static readonly Dictionary _sessions = []; // static: controllers are activated per request, so session state must outlive a single controller instance + private static readonly object _sessionLock = new(); + private const int SessionTimeoutMinutes = 60; // TODO: remove magic number; add an optional configuration option public EntityController(ILogger logger, IConfiguration config, SearchdomainManager domainManager, SearchdomainHelper searchdomainHelper, DatabaseHelper databaseHelper) { @@ -86,31 +89,59 @@ public class EntityController : ControllerBase
/// Index entities /// /// - /// Behavior: Creates new entities, but overwrites existing entities that have the same name + /// Behavior: Updates the index using the provided entities. Creates new entities, overwrites existing entities with the same name, and deletes entities that are not part of the index anymore. + /// + /// Can be executed in a single request or in multiple chunks using a (self-defined) session UUID string. + /// + /// For session-based chunk uploads: + /// - Provide sessionId to accumulate entities across multiple requests + /// - Set sessionComplete=true on the final request to finalize and delete entities that are not in the accumulated list + /// - Without sessionId: Missing entities will be deleted from the searchdomain. + /// - Sessions expire after 60 minutes of inactivity (currently a fixed server-side constant; not yet configurable) /// /// Entities to index + /// Optional session ID for batch uploads across multiple requests + /// If true, finalizes the session and deletes entities not in the accumulated list [HttpPut("/Entities")] - public async Task> Index([FromBody] List? jsonEntities) + public async Task> Index( + [FromBody] List? jsonEntities, + string? sessionId = null, + bool sessionComplete = false) { try { + if (string.IsNullOrWhiteSpace(sessionId)) + { + sessionId = Guid.NewGuid().ToString(); // Create a short-lived session + sessionComplete = true; // If no sessionId was set, there is no trackable session. The pseudo-session ends here.
+ } + // Periodic cleanup of expired sessions + CleanupExpiredEntityIndexSessions(); + EntityIndexSessionData session = GetOrCreateEntityIndexSession(sessionId); + + if (jsonEntities is null && !sessionComplete) + { + return BadRequest(new EntityIndexResult() { Success = false, Message = "jsonEntities can only be null for a complete session" }); + } else if (jsonEntities is null && sessionComplete) + { + await EntityIndexSessionDeleteUnindexedEntities(session); + return Ok(new EntityIndexResult() { Success = true }); + } + + // Standard entity indexing (upsert behavior) List? entities = await _searchdomainHelper.EntitiesFromJSON( _domainManager, _logger, JsonSerializer.Serialize(jsonEntities)); if (entities is not null && jsonEntities is not null) { - List invalidatedSearchdomains = []; - foreach (var jsonEntity in jsonEntities) + session.AccumulatedEntities.AddRange(entities); + + if (sessionComplete) { - string jsonEntityName = jsonEntity.Name; - string jsonEntitySearchdomainName = jsonEntity.Searchdomain; - if (entities.Select(x => x.name == jsonEntityName).Any() - && !invalidatedSearchdomains.Contains(jsonEntitySearchdomainName)) - { - invalidatedSearchdomains.Add(jsonEntitySearchdomainName); - } + await EntityIndexSessionDeleteUnindexedEntities(session); } + return Ok(new EntityIndexResult() { Success = true }); } else @@ -129,6 +160,44 @@ public class EntityController : ControllerBase } + private async Task EntityIndexSessionDeleteUnindexedEntities(EntityIndexSessionData session) + { + var entityGroupsBySearchdomain = session.AccumulatedEntities.GroupBy(e => e.searchdomain); + + foreach (var entityGroup in entityGroupsBySearchdomain) + { + string searchdomainName = entityGroup.Key; + var entityNamesInRequest = entityGroup.Select(e => e.name).ToHashSet(); + + (Searchdomain? searchdomain_, int? httpStatusCode, string? 
message) = + SearchdomainHelper.TryGetSearchdomain(_domainManager, searchdomainName, _logger); + + if (searchdomain_ is not null && httpStatusCode is null) // If getting searchdomain was successful + { + var entitiesToDelete = searchdomain_.entityCache + .Where(kvp => !entityNamesInRequest.Contains(kvp.Value.name)) + .Select(kvp => kvp.Value) + .ToList(); + + foreach (var entity in entitiesToDelete) + { + searchdomain_.ReconciliateOrInvalidateCacheForDeletedEntity(entity); + await _databaseHelper.RemoveEntity( + [], + _domainManager.helper, + entity.name, + searchdomainName); + searchdomain_.entityCache.TryRemove(entity.name, out _); + _logger.LogInformation("Deleted entity {entityName} from {searchdomain}", entity.name, searchdomainName); + } + } + else + { + _logger.LogWarning("Unable to delete entities for searchdomain {searchdomain}", searchdomainName); + } + } + } + /// /// Deletes an entity /// @@ -158,4 +227,44 @@ public class EntityController : ControllerBase return Ok(new EntityDeleteResults() {Success = success}); } + + + private void CleanupExpiredEntityIndexSessions() + { + lock (_sessionLock) + { + var expiredSessions = _sessions + .Where(kvp => (DateTime.UtcNow - kvp.Value.LastInteractionAt).TotalMinutes > SessionTimeoutMinutes) + .Select(kvp => kvp.Key) + .ToList(); + + foreach (var sessionId in expiredSessions) + { + _sessions.Remove(sessionId); + _logger.LogWarning("Removed expired, non-closed session {sessionId}", sessionId); + } + } + } + + private EntityIndexSessionData GetOrCreateEntityIndexSession(string sessionId) + { + lock (_sessionLock) + { + if (!_sessions.TryGetValue(sessionId, out var session)) + { + session = new EntityIndexSessionData(); + _sessions[sessionId] = session; + } else + { + session.LastInteractionAt = DateTime.UtcNow; + } + return session; + } + } } + +public class EntityIndexSessionData +{ + public List AccumulatedEntities { get; set; } = []; + public DateTime LastInteractionAt { get; set; } = DateTime.UtcNow; +} \ No 
newline at end of file diff --git a/src/Server/Entity.cs b/src/Server/Entity.cs index ad5cb1c..401dd31 100644 --- a/src/Server/Entity.cs +++ b/src/Server/Entity.cs @@ -2,7 +2,7 @@ using System.Collections.Concurrent; namespace Server; -public class Entity(Dictionary attributes, Probmethods.probMethodDelegate probMethod, string probMethodName, ConcurrentBag datapoints, string name) +public class Entity(Dictionary attributes, Probmethods.probMethodDelegate probMethod, string probMethodName, ConcurrentBag datapoints, string name, string searchdomain) { public Dictionary attributes = attributes; public Probmethods.probMethodDelegate probMethod = probMethod; @@ -10,4 +10,5 @@ public class Entity(Dictionary attributes, Probmethods.probMetho public ConcurrentBag datapoints = datapoints; public int id; public string name = name; + public string searchdomain = searchdomain; } \ No newline at end of file diff --git a/src/Server/Helper/SearchdomainHelper.cs b/src/Server/Helper/SearchdomainHelper.cs index 5a543aa..084bed1 100644 --- a/src/Server/Helper/SearchdomainHelper.cs +++ b/src/Server/Helper/SearchdomainHelper.cs @@ -389,7 +389,7 @@ public class SearchdomainHelper(ILogger logger, DatabaseHelp List datapoints = await DatabaseInsertDatapointsWithEmbeddings(helper, searchdomain, toBeInsertedDatapoints, id_entity, id_searchdomain); var probMethod = Probmethods.GetMethod(jsonEntity.Probmethod) ?? throw new ProbMethodNotFoundException(jsonEntity.Probmethod); - Entity entity = new(jsonEntity.Attributes, probMethod, jsonEntity.Probmethod.ToString(), new(datapoints), jsonEntity.Name) + Entity entity = new(jsonEntity.Attributes, probMethod, jsonEntity.Probmethod.ToString(), [.. 
datapoints], jsonEntity.Name, jsonEntity.Searchdomain) { id = id_entity }; diff --git a/src/Server/Searchdomain.cs b/src/Server/Searchdomain.cs index 5c0a0fe..3659e69 100644 --- a/src/Server/Searchdomain.cs +++ b/src/Server/Searchdomain.cs @@ -166,7 +166,7 @@ public class Searchdomain Probmethods.probMethodDelegate? probmethod = Probmethods.GetMethod(probmethodString); if (datapoint_unassigned.TryGetValue(id, out ConcurrentBag? datapoints) && probmethod is not null) { - Entity entity = new(attributes, probmethod, probmethodString, datapoints, name) + Entity entity = new(attributes, probmethod, probmethodString, datapoints, name, searchdomain) { id = id };