Fixed Putting entities only upserts entities instead of also deleting non-existant ones
This commit is contained in:
@@ -47,15 +47,27 @@ public class Client
|
|||||||
return await FetchUrlAndProcessJson<EntityListResults>(HttpMethod.Get, url);
|
return await FetchUrlAndProcessJson<EntityListResults>(HttpMethod.Get, url);
|
||||||
}
|
}
|
||||||
|
|
||||||
public async Task<EntityIndexResult> EntityIndexAsync(List<JSONEntity> jsonEntity)
|
public async Task<EntityIndexResult> EntityIndexAsync(List<JSONEntity> jsonEntity, string? sessionId = null, bool? sessionComplete = null)
|
||||||
{
|
{
|
||||||
return await EntityIndexAsync(JsonSerializer.Serialize(jsonEntity));
|
return await EntityIndexAsync(JsonSerializer.Serialize(jsonEntity), sessionId, sessionComplete);
|
||||||
}
|
}
|
||||||
|
|
||||||
public async Task<EntityIndexResult> EntityIndexAsync(string jsonEntity)
|
public async Task<EntityIndexResult> EntityIndexAsync(string jsonEntity, string? sessionId = null, bool? sessionComplete = null)
|
||||||
{
|
{
|
||||||
var content = new StringContent(jsonEntity, Encoding.UTF8, "application/json");
|
var content = new StringContent(jsonEntity, Encoding.UTF8, "application/json");
|
||||||
return await FetchUrlAndProcessJson<EntityIndexResult>(HttpMethod.Put, GetUrl($"{baseUri}", "Entities", []), content);
|
Dictionary<string, string> parameters = [];
|
||||||
|
if (sessionId is not null) parameters.Add("sessionId", sessionId);
|
||||||
|
if (sessionComplete is not null) parameters.Add("sessionComplete", ((bool)sessionComplete).ToString());
|
||||||
|
|
||||||
|
return await FetchUrlAndProcessJson<EntityIndexResult>(
|
||||||
|
HttpMethod.Put,
|
||||||
|
GetUrl(
|
||||||
|
$"{baseUri}",
|
||||||
|
$"Entities",
|
||||||
|
parameters
|
||||||
|
),
|
||||||
|
content
|
||||||
|
);
|
||||||
}
|
}
|
||||||
|
|
||||||
public async Task<EntityDeleteResults> EntityDeleteAsync(string entityName)
|
public async Task<EntityDeleteResults> EntityDeleteAsync(string entityName)
|
||||||
|
|||||||
@@ -65,6 +65,7 @@ def index_files(toolset: Toolset):
|
|||||||
jsonEntities.append(jsonEntity)
|
jsonEntities.append(jsonEntity)
|
||||||
jsonstring = json.dumps(jsonEntities)
|
jsonstring = json.dumps(jsonEntities)
|
||||||
timer_start = time.time()
|
timer_start = time.time()
|
||||||
|
# Index all entities in one go. If you need to split it into chunks, use the session attributes! See example_chunked.py
|
||||||
result:EntityIndexResult = toolset.Client.EntityIndexAsync(jsonstring).Result
|
result:EntityIndexResult = toolset.Client.EntityIndexAsync(jsonstring).Result
|
||||||
timer_end = time.time()
|
timer_end = time.time()
|
||||||
toolset.Logger.LogInformation(f"Update was successful: {result.Success} - and was done in {timer_end - timer_start} seconds.")
|
toolset.Logger.LogInformation(f"Update was successful: {result.Success} - and was done in {timer_end - timer_start} seconds.")
|
||||||
85
src/Indexer/Scripts/example_chunked.py
Normal file
85
src/Indexer/Scripts/example_chunked.py
Normal file
@@ -0,0 +1,85 @@
|
|||||||
|
import math
|
||||||
|
import os
|
||||||
|
from tools import *
|
||||||
|
import json
|
||||||
|
from dataclasses import asdict
|
||||||
|
import time
|
||||||
|
import uuid
|
||||||
|
|
||||||
|
example_content = "./Scripts/example_content"
|
||||||
|
probmethod = "HVEWAvg"
|
||||||
|
similarityMethod = "Cosine"
|
||||||
|
example_searchdomain = "example_" + probmethod
|
||||||
|
example_counter = 0
|
||||||
|
models = ["ollama:bge-m3", "ollama:mxbai-embed-large"]
|
||||||
|
probmethod_datapoint = probmethod
|
||||||
|
probmethod_entity = probmethod
|
||||||
|
# Example for a dictionary based weighted average:
|
||||||
|
# probmethod_datapoint = "DictionaryWeightedAverage:{\"ollama:bge-m3\": 4, \"ollama:mxbai-embed-large\": 1}"
|
||||||
|
# probmethod_entity = "DictionaryWeightedAverage:{\"title\": 2, \"filename\": 0.1, \"text\": 0.25}"
|
||||||
|
|
||||||
|
def init(toolset: Toolset):
|
||||||
|
global example_counter
|
||||||
|
toolset.Logger.LogInformation("{toolset.Name} - init", toolset.Name)
|
||||||
|
toolset.Logger.LogInformation("This is the init function from the python example script")
|
||||||
|
toolset.Logger.LogInformation(f"example_counter: {example_counter}")
|
||||||
|
searchdomainlist:SearchdomainListResults = toolset.Client.SearchdomainListAsync().Result
|
||||||
|
if example_searchdomain not in searchdomainlist.Searchdomains:
|
||||||
|
toolset.Client.SearchdomainCreateAsync(example_searchdomain).Result
|
||||||
|
searchdomainlist = toolset.Client.SearchdomainListAsync().Result
|
||||||
|
output = "Currently these searchdomains exist:\n"
|
||||||
|
for searchdomain in searchdomainlist.Searchdomains:
|
||||||
|
output += f" - {searchdomain}\n"
|
||||||
|
toolset.Logger.LogInformation(output)
|
||||||
|
|
||||||
|
def update(toolset: Toolset):
|
||||||
|
global example_counter
|
||||||
|
toolset.Logger.LogInformation("{toolset.Name} - update", toolset.Name)
|
||||||
|
toolset.Logger.LogInformation("This is the update function from the python example script")
|
||||||
|
callbackInfos:ICallbackInfos = toolset.CallbackInfos
|
||||||
|
if (str(callbackInfos) == "Indexer.Models.RunOnceCallbackInfos"):
|
||||||
|
toolset.Logger.LogInformation("It was triggered by a runonce call")
|
||||||
|
elif (str(callbackInfos) == "Indexer.Models.IntervalCallbackInfos"):
|
||||||
|
toolset.Logger.LogInformation("It was triggered by an interval call")
|
||||||
|
elif (str(callbackInfos) == "Indexer.Models.ScheduleCallbackInfos"):
|
||||||
|
toolset.Logger.LogInformation("It was triggered by a schedule call")
|
||||||
|
elif (str(callbackInfos) == "Indexer.Models.FileUpdateCallbackInfos"):
|
||||||
|
toolset.Logger.LogInformation("It was triggered by a fileupdate call")
|
||||||
|
else:
|
||||||
|
toolset.Logger.LogInformation("It was triggered, but the origin of the call could not be determined")
|
||||||
|
example_counter += 1
|
||||||
|
toolset.Logger.LogInformation(f"example_counter: {example_counter}")
|
||||||
|
index_files(toolset)
|
||||||
|
|
||||||
|
def index_files(toolset: Toolset):
|
||||||
|
jsonEntities:list = []
|
||||||
|
for filename in os.listdir(example_content):
|
||||||
|
qualified_filepath = example_content + "/" + filename
|
||||||
|
with open(qualified_filepath, "r", encoding='utf-8', errors="replace") as file:
|
||||||
|
title = file.readline()
|
||||||
|
text = file.read()
|
||||||
|
datapoints:list = [
|
||||||
|
JSONDatapoint("filename", qualified_filepath, probmethod_datapoint, similarityMethod, models),
|
||||||
|
JSONDatapoint("title", title, probmethod_datapoint, similarityMethod, models),
|
||||||
|
JSONDatapoint("text", text, probmethod_datapoint, similarityMethod, models)
|
||||||
|
]
|
||||||
|
jsonEntity:dict = asdict(JSONEntity(qualified_filepath, probmethod_entity, example_searchdomain, {}, datapoints))
|
||||||
|
jsonEntities.append(jsonEntity)
|
||||||
|
timer_start = time.time()
|
||||||
|
chunkSize = 5
|
||||||
|
chunkList = chunk_list(jsonEntities, chunkSize)
|
||||||
|
chunkCount = math.ceil(len(jsonEntities) / chunkSize)
|
||||||
|
sessionId = uuid.uuid4().hex
|
||||||
|
print(f"indexing {len(jsonEntities)} entities")
|
||||||
|
for i, entities in enumerate(chunkList):
|
||||||
|
isLast = i == chunkCount
|
||||||
|
print(f'Processing chunk {i} / {len(jsonEntities) / chunkSize}')
|
||||||
|
jsonstring = json.dumps(entities)
|
||||||
|
result:EntityIndexResult = toolset.Client.EntityIndexAsync(jsonstring, sessionId, isLast).Result
|
||||||
|
timer_end = time.time()
|
||||||
|
toolset.Logger.LogInformation(f"Update was successful: {result.Success} - and was done in {timer_end - timer_start} seconds.")
|
||||||
|
|
||||||
|
|
||||||
|
def chunk_list(lst, chunk_size):
|
||||||
|
for i in range(0, len(lst), chunk_size):
|
||||||
|
yield lst[i: i + chunk_size]
|
||||||
@@ -107,6 +107,8 @@ class Client:
|
|||||||
# pass
|
# pass
|
||||||
async def EntityIndexAsync(jsonEntity:str) -> EntityIndexResult:
|
async def EntityIndexAsync(jsonEntity:str) -> EntityIndexResult:
|
||||||
pass
|
pass
|
||||||
|
async def EntityIndexAsync(jsonEntity:str, sessionId:str, sessionComplete:bool) -> EntityIndexResult:
|
||||||
|
pass
|
||||||
async def EntityIndexAsync(searchdomain:str, jsonEntity:str) -> EntityIndexResult:
|
async def EntityIndexAsync(searchdomain:str, jsonEntity:str) -> EntityIndexResult:
|
||||||
pass
|
pass
|
||||||
async def EntityListAsync(returnEmbeddings:bool = False) -> EntityListResults:
|
async def EntityListAsync(returnEmbeddings:bool = False) -> EntityListResults:
|
||||||
|
|||||||
@@ -14,6 +14,9 @@ public class EntityController : ControllerBase
|
|||||||
private SearchdomainManager _domainManager;
|
private SearchdomainManager _domainManager;
|
||||||
private readonly SearchdomainHelper _searchdomainHelper;
|
private readonly SearchdomainHelper _searchdomainHelper;
|
||||||
private readonly DatabaseHelper _databaseHelper;
|
private readonly DatabaseHelper _databaseHelper;
|
||||||
|
private readonly Dictionary<string, EntityIndexSessionData> _sessions = [];
|
||||||
|
private readonly object _sessionLock = new();
|
||||||
|
private const int SessionTimeoutMinutes = 60; // TODO: remove magic number; add an optional configuration option
|
||||||
|
|
||||||
public EntityController(ILogger<EntityController> logger, IConfiguration config, SearchdomainManager domainManager, SearchdomainHelper searchdomainHelper, DatabaseHelper databaseHelper)
|
public EntityController(ILogger<EntityController> logger, IConfiguration config, SearchdomainManager domainManager, SearchdomainHelper searchdomainHelper, DatabaseHelper databaseHelper)
|
||||||
{
|
{
|
||||||
@@ -86,31 +89,59 @@ public class EntityController : ControllerBase
|
|||||||
/// Index entities
|
/// Index entities
|
||||||
/// </summary>
|
/// </summary>
|
||||||
/// <remarks>
|
/// <remarks>
|
||||||
/// Behavior: Creates new entities, but overwrites existing entities that have the same name
|
/// Behavior: Updates the index using the provided entities. Creates new entities, overwrites existing entities with the same name, and deletes entities that are not part of the index anymore.
|
||||||
|
///
|
||||||
|
/// Can be executed in a single request or in multiple chunks using a (self-defined) session UUID string.
|
||||||
|
///
|
||||||
|
/// For session-based chunk uploads:
|
||||||
|
/// - Provide sessionId to accumulate entities across multiple requests
|
||||||
|
/// - Set sessionComplete=true on the final request to finalize and delete entities that are not in the accumulated list
|
||||||
|
/// - Without sessionId: Missing entities will be deleted from the searchdomain.
|
||||||
|
/// - Sessions expire after 60 minutes of inactivity (or as otherwise configured in the appsettings)
|
||||||
/// </remarks>
|
/// </remarks>
|
||||||
/// <param name="jsonEntities">Entities to index</param>
|
/// <param name="jsonEntities">Entities to index</param>
|
||||||
|
/// <param name="sessionId">Optional session ID for batch uploads across multiple requests</param>
|
||||||
|
/// <param name="sessionComplete">If true, finalizes the session and deletes entities not in the accumulated list</param>
|
||||||
[HttpPut("/Entities")]
|
[HttpPut("/Entities")]
|
||||||
public async Task<ActionResult<EntityIndexResult>> Index([FromBody] List<JSONEntity>? jsonEntities)
|
public async Task<ActionResult<EntityIndexResult>> Index(
|
||||||
|
[FromBody] List<JSONEntity>? jsonEntities,
|
||||||
|
string? sessionId = null,
|
||||||
|
bool sessionComplete = false)
|
||||||
{
|
{
|
||||||
try
|
try
|
||||||
{
|
{
|
||||||
|
if (sessionId is null || string.IsNullOrWhiteSpace(sessionId))
|
||||||
|
{
|
||||||
|
sessionId = Guid.NewGuid().ToString(); // Create a short-lived session
|
||||||
|
sessionComplete = true; // If no sessionId was set, there is no trackable session. The pseudo-session ends here.
|
||||||
|
}
|
||||||
|
// Periodic cleanup of expired sessions
|
||||||
|
CleanupExpiredEntityIndexSessions();
|
||||||
|
EntityIndexSessionData session = GetOrCreateEntityIndexSession(sessionId);
|
||||||
|
|
||||||
|
if (jsonEntities is null && !sessionComplete)
|
||||||
|
{
|
||||||
|
return BadRequest(new EntityIndexResult() { Success = false, Message = "jsonEntities can only be null for a complete session" });
|
||||||
|
} else if (jsonEntities is null && sessionComplete)
|
||||||
|
{
|
||||||
|
await EntityIndexSessionDeleteUnindexedEntities(session);
|
||||||
|
return Ok(new EntityIndexResult() { Success = true });
|
||||||
|
}
|
||||||
|
|
||||||
|
// Standard entity indexing (upsert behavior)
|
||||||
List<Entity>? entities = await _searchdomainHelper.EntitiesFromJSON(
|
List<Entity>? entities = await _searchdomainHelper.EntitiesFromJSON(
|
||||||
_domainManager,
|
_domainManager,
|
||||||
_logger,
|
_logger,
|
||||||
JsonSerializer.Serialize(jsonEntities));
|
JsonSerializer.Serialize(jsonEntities));
|
||||||
if (entities is not null && jsonEntities is not null)
|
if (entities is not null && jsonEntities is not null)
|
||||||
{
|
{
|
||||||
List<string> invalidatedSearchdomains = [];
|
session.AccumulatedEntities.AddRange(entities);
|
||||||
foreach (var jsonEntity in jsonEntities)
|
|
||||||
|
if (sessionComplete)
|
||||||
{
|
{
|
||||||
string jsonEntityName = jsonEntity.Name;
|
await EntityIndexSessionDeleteUnindexedEntities(session);
|
||||||
string jsonEntitySearchdomainName = jsonEntity.Searchdomain;
|
|
||||||
if (entities.Select(x => x.name == jsonEntityName).Any()
|
|
||||||
&& !invalidatedSearchdomains.Contains(jsonEntitySearchdomainName))
|
|
||||||
{
|
|
||||||
invalidatedSearchdomains.Add(jsonEntitySearchdomainName);
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
return Ok(new EntityIndexResult() { Success = true });
|
return Ok(new EntityIndexResult() { Success = true });
|
||||||
}
|
}
|
||||||
else
|
else
|
||||||
@@ -129,6 +160,44 @@ public class EntityController : ControllerBase
|
|||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
private async Task EntityIndexSessionDeleteUnindexedEntities(EntityIndexSessionData session)
|
||||||
|
{
|
||||||
|
var entityGroupsBySearchdomain = session.AccumulatedEntities.GroupBy(e => e.searchdomain);
|
||||||
|
|
||||||
|
foreach (var entityGroup in entityGroupsBySearchdomain)
|
||||||
|
{
|
||||||
|
string searchdomainName = entityGroup.Key;
|
||||||
|
var entityNamesInRequest = entityGroup.Select(e => e.name).ToHashSet();
|
||||||
|
|
||||||
|
(Searchdomain? searchdomain_, int? httpStatusCode, string? message) =
|
||||||
|
SearchdomainHelper.TryGetSearchdomain(_domainManager, searchdomainName, _logger);
|
||||||
|
|
||||||
|
if (searchdomain_ is not null && httpStatusCode is null) // If getting searchdomain was successful
|
||||||
|
{
|
||||||
|
var entitiesToDelete = searchdomain_.entityCache
|
||||||
|
.Where(kvp => !entityNamesInRequest.Contains(kvp.Value.name))
|
||||||
|
.Select(kvp => kvp.Value)
|
||||||
|
.ToList();
|
||||||
|
|
||||||
|
foreach (var entity in entitiesToDelete)
|
||||||
|
{
|
||||||
|
searchdomain_.ReconciliateOrInvalidateCacheForDeletedEntity(entity);
|
||||||
|
await _databaseHelper.RemoveEntity(
|
||||||
|
[],
|
||||||
|
_domainManager.helper,
|
||||||
|
entity.name,
|
||||||
|
searchdomainName);
|
||||||
|
searchdomain_.entityCache.TryRemove(entity.name, out _);
|
||||||
|
_logger.LogInformation("Deleted entity {entityName} from {searchdomain}", entity.name, searchdomainName);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
else
|
||||||
|
{
|
||||||
|
_logger.LogWarning("Unable to delete entities for searchdomain {searchdomain}", searchdomainName);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
/// <summary>
|
/// <summary>
|
||||||
/// Deletes an entity
|
/// Deletes an entity
|
||||||
/// </summary>
|
/// </summary>
|
||||||
@@ -158,4 +227,44 @@ public class EntityController : ControllerBase
|
|||||||
|
|
||||||
return Ok(new EntityDeleteResults() {Success = success});
|
return Ok(new EntityDeleteResults() {Success = success});
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
private void CleanupExpiredEntityIndexSessions()
|
||||||
|
{
|
||||||
|
lock (_sessionLock)
|
||||||
|
{
|
||||||
|
var expiredSessions = _sessions
|
||||||
|
.Where(kvp => (DateTime.UtcNow - kvp.Value.LastInteractionAt).TotalMinutes > SessionTimeoutMinutes)
|
||||||
|
.Select(kvp => kvp.Key)
|
||||||
|
.ToList();
|
||||||
|
|
||||||
|
foreach (var sessionId in expiredSessions)
|
||||||
|
{
|
||||||
|
_sessions.Remove(sessionId);
|
||||||
|
_logger.LogWarning("Removed expired, non-closed session {sessionId}", sessionId);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
private EntityIndexSessionData GetOrCreateEntityIndexSession(string sessionId)
|
||||||
|
{
|
||||||
|
lock (_sessionLock)
|
||||||
|
{
|
||||||
|
if (!_sessions.TryGetValue(sessionId, out var session))
|
||||||
|
{
|
||||||
|
session = new EntityIndexSessionData();
|
||||||
|
_sessions[sessionId] = session;
|
||||||
|
} else
|
||||||
|
{
|
||||||
|
session.LastInteractionAt = DateTime.UtcNow;
|
||||||
|
}
|
||||||
|
return session;
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public class EntityIndexSessionData
|
||||||
|
{
|
||||||
|
public List<Entity> AccumulatedEntities { get; set; } = [];
|
||||||
|
public DateTime LastInteractionAt { get; set; } = DateTime.UtcNow;
|
||||||
|
}
|
||||||
@@ -2,7 +2,7 @@ using System.Collections.Concurrent;
|
|||||||
|
|
||||||
namespace Server;
|
namespace Server;
|
||||||
|
|
||||||
public class Entity(Dictionary<string, string> attributes, Probmethods.probMethodDelegate probMethod, string probMethodName, ConcurrentBag<Datapoint> datapoints, string name)
|
public class Entity(Dictionary<string, string> attributes, Probmethods.probMethodDelegate probMethod, string probMethodName, ConcurrentBag<Datapoint> datapoints, string name, string searchdomain)
|
||||||
{
|
{
|
||||||
public Dictionary<string, string> attributes = attributes;
|
public Dictionary<string, string> attributes = attributes;
|
||||||
public Probmethods.probMethodDelegate probMethod = probMethod;
|
public Probmethods.probMethodDelegate probMethod = probMethod;
|
||||||
@@ -10,4 +10,5 @@ public class Entity(Dictionary<string, string> attributes, Probmethods.probMetho
|
|||||||
public ConcurrentBag<Datapoint> datapoints = datapoints;
|
public ConcurrentBag<Datapoint> datapoints = datapoints;
|
||||||
public int id;
|
public int id;
|
||||||
public string name = name;
|
public string name = name;
|
||||||
|
public string searchdomain = searchdomain;
|
||||||
}
|
}
|
||||||
@@ -389,7 +389,7 @@ public class SearchdomainHelper(ILogger<SearchdomainHelper> logger, DatabaseHelp
|
|||||||
List<Datapoint> datapoints = await DatabaseInsertDatapointsWithEmbeddings(helper, searchdomain, toBeInsertedDatapoints, id_entity, id_searchdomain);
|
List<Datapoint> datapoints = await DatabaseInsertDatapointsWithEmbeddings(helper, searchdomain, toBeInsertedDatapoints, id_entity, id_searchdomain);
|
||||||
|
|
||||||
var probMethod = Probmethods.GetMethod(jsonEntity.Probmethod) ?? throw new ProbMethodNotFoundException(jsonEntity.Probmethod);
|
var probMethod = Probmethods.GetMethod(jsonEntity.Probmethod) ?? throw new ProbMethodNotFoundException(jsonEntity.Probmethod);
|
||||||
Entity entity = new(jsonEntity.Attributes, probMethod, jsonEntity.Probmethod.ToString(), new(datapoints), jsonEntity.Name)
|
Entity entity = new(jsonEntity.Attributes, probMethod, jsonEntity.Probmethod.ToString(), [.. datapoints], jsonEntity.Name, jsonEntity.Searchdomain)
|
||||||
{
|
{
|
||||||
id = id_entity
|
id = id_entity
|
||||||
};
|
};
|
||||||
|
|||||||
@@ -166,7 +166,7 @@ public class Searchdomain
|
|||||||
Probmethods.probMethodDelegate? probmethod = Probmethods.GetMethod(probmethodString);
|
Probmethods.probMethodDelegate? probmethod = Probmethods.GetMethod(probmethodString);
|
||||||
if (datapoint_unassigned.TryGetValue(id, out ConcurrentBag<Datapoint>? datapoints) && probmethod is not null)
|
if (datapoint_unassigned.TryGetValue(id, out ConcurrentBag<Datapoint>? datapoints) && probmethod is not null)
|
||||||
{
|
{
|
||||||
Entity entity = new(attributes, probmethod, probmethodString, datapoints, name)
|
Entity entity = new(attributes, probmethod, probmethodString, datapoints, name, searchdomain)
|
||||||
{
|
{
|
||||||
id = id
|
id = id
|
||||||
};
|
};
|
||||||
|
|||||||
Reference in New Issue
Block a user