Refactored Indexer models

This commit is contained in:
2025-08-31 00:11:05 +02:00
parent 8a3be8dd90
commit 2a75daee07
16 changed files with 591 additions and 591 deletions

View File

@@ -1,4 +1,3 @@
using Indexer.Models;
using Quartz; using Quartz;
public class ActionJob : IJob public class ActionJob : IJob
{ {

317
src/Indexer/Calls.cs Normal file
View File

@@ -0,0 +1,317 @@
using Microsoft.Extensions.Diagnostics.HealthChecks;
using Indexer.Models;
using Indexer.Exceptions;
using Quartz;
using Quartz.Impl;
public class RunOnceCall : ICall
{
public ILogger _logger;
public bool IsEnabled { get; set; }
public bool IsExecuting { get; set; }
public Worker Worker { get; }
public CallConfig CallConfig { get; set; }
public DateTime? LastExecution { get; set; }
public DateTime? LastSuccessfulExecution { get; set; }
public RunOnceCall(Worker worker, ILogger logger, CallConfig callConfig)
{
Worker = worker;
_logger = logger;
CallConfig = callConfig;
IsEnabled = true;
IsExecuting = false;
IndexAsync();
}
public void Start()
{
IndexAsync();
IsEnabled = true;
}
public void Stop()
{
IsEnabled = false;
}
private async void IndexAsync()
{
try
{
DateTime beforeExecution = DateTime.Now;
IsExecuting = true;
try
{
await Task.Run(() => Worker.Scriptable.Update(new RunOnceCallbackInfos()));
}
finally
{
IsExecuting = false;
LastExecution = beforeExecution;
Worker.LastExecution = beforeExecution;
}
DateTime afterExecution = DateTime.Now;
WorkerManager.UpdateCallAndWorkerTimestamps(this, Worker, beforeExecution, afterExecution);
}
catch (Exception ex)
{
_logger.LogError("Exception occurred in a Call of Worker \"{name}\": \"{ex}\"", Worker.Name, ex.Message);
}
}
public HealthCheckResult HealthCheck()
{
return HealthCheckResult.Healthy(); // TODO implement proper healthcheck
}
}
public class IntervalCall : ICall
{
public System.Timers.Timer Timer;
public IScriptable Scriptable;
public ILogger _logger;
public bool IsEnabled { get; set; }
public bool IsExecuting { get; set; }
public CallConfig CallConfig { get; set; }
public DateTime? LastExecution { get; set; }
public DateTime? LastSuccessfulExecution { get; set; }
public IntervalCall(System.Timers.Timer timer, IScriptable scriptable, ILogger logger, CallConfig callConfig)
{
Timer = timer;
Scriptable = scriptable;
_logger = logger;
CallConfig = callConfig;
IsEnabled = true;
IsExecuting = false;
}
public void Start()
{
Timer.Start();
IsEnabled = true;
}
public void Stop()
{
Scriptable.Stop();
Timer.Stop();
IsEnabled = false;
}
public HealthCheckResult HealthCheck()
{
if (!Scriptable.UpdateInfo.Successful)
{
_logger.LogWarning("HealthCheck revealed: The last execution of \"{name}\" was not successful", Scriptable.ToolSet.FilePath);
return HealthCheckResult.Unhealthy($"HealthCheck revealed: The last execution of \"{Scriptable.ToolSet.FilePath}\" was not successful");
}
double timerInterval = Timer.Interval; // In ms
DateTime lastRunDateTime = Scriptable.UpdateInfo.DateTime;
DateTime now = DateTime.Now;
double millisecondsSinceLastExecution = now.Subtract(lastRunDateTime).TotalMilliseconds;
if (millisecondsSinceLastExecution >= 2 * timerInterval)
{
_logger.LogWarning("HealthCheck revealed: Since the last execution of \"{name}\" more than twice the interval has passed", Scriptable.ToolSet.FilePath);
return HealthCheckResult.Unhealthy($"HealthCheck revealed: Since the last execution of \"{Scriptable.ToolSet.FilePath}\" more than twice the interval has passed");
}
return HealthCheckResult.Healthy();
}
}
public class ScheduleCall : ICall
{
public bool IsEnabled { get; set; }
public bool IsExecuting { get; set; }
public Worker Worker { get; }
public JobKey JobKey { get; }
public JobDataMap JobDataMap { get; }
public CallConfig CallConfig { get; set; }
private ILogger _logger { get; }
public DateTime? LastExecution { get; set; }
public DateTime? LastSuccessfulExecution { get; set; }
private StdSchedulerFactory SchedulerFactory { get; }
private IScheduler Scheduler { get; }
public ScheduleCall(Worker worker, CallConfig callConfig, ILogger logger)
{
Worker = worker;
CallConfig = callConfig;
_logger = logger;
IsEnabled = false;
IsExecuting = false;
JobKey = new(worker.Name);
SchedulerFactory = new();
Scheduler = SchedulerFactory.GetScheduler(CancellationToken.None).Result;
JobDataMap = [];
JobDataMap["action"] = () =>
{
try
{
DateTime beforeExecution = DateTime.Now;
IsExecuting = true;
try
{
worker.Scriptable.Update(new ScheduleCallbackInfos());
}
finally
{
IsExecuting = false;
LastExecution = beforeExecution;
worker.LastExecution = beforeExecution;
}
DateTime afterExecution = DateTime.Now;
WorkerManager.UpdateCallAndWorkerTimestamps(this, worker, beforeExecution, afterExecution);
}
catch (Exception ex)
{
_logger.LogError("Exception occurred in a Call of Worker \"{name}\": \"{ex}\"", worker.Name, ex.Message);
}
};
CreateJob().Wait();
Start();
}
public void Start()
{
if (!IsEnabled)
{
Scheduler.Start(CancellationToken.None).Wait();
IsEnabled = true;
}
}
public void Stop()
{
Scheduler.PauseAll();
IsEnabled = false;
}
private async Task CreateJob()
{
if (CallConfig.Schedule is null)
{
throw new IndexerConfigurationException($"Interval not set for a Call in Worker \"{Worker.Name}\"");
}
try
{
await Scheduler.ScheduleJob(
JobBuilder.Create<ActionJob>()
.WithIdentity(JobKey)
.Build(),
TriggerBuilder.Create()
.ForJob(JobKey)
.WithIdentity(Worker.Name + "-trigger")
.UsingJobData(JobDataMap)
.WithCronSchedule(CallConfig.Schedule)
.Build(),
CancellationToken.None);
}
catch (FormatException)
{
throw new IndexerConfigurationException($"Quartz Cron expression invalid in Worker \"{Worker.Name}\" - Quartz syntax differs from classic cron");
}
}
public HealthCheckResult HealthCheck()
{
return HealthCheckResult.Unhealthy(); // Not implemented yet
}
}
public class FileUpdateCall : ICall
{
public bool IsEnabled { get; set; }
public bool IsExecuting { get; set; }
public Worker Worker { get; }
public CallConfig CallConfig { get; set; }
private ILogger _logger { get; }
private FileSystemWatcher _watcher { get; }
public DateTime? LastExecution { get; set; }
public DateTime? LastSuccessfulExecution { get; set; }
public FileUpdateCall(Worker worker, CallConfig callConfig, ILogger logger)
{
Worker = worker;
CallConfig = callConfig;
_logger = logger;
IsEnabled = true;
IsExecuting = false;
if (CallConfig.Path is null)
{
throw new IndexerConfigurationException($"Path not set for a Call in Worker \"{Worker.Name}\"");
}
_watcher = new FileSystemWatcher(CallConfig.Path);
_watcher.Changed += OnFileChanged;
_watcher.Deleted += OnFileChanged;
_watcher.EnableRaisingEvents = true;
}
public void Start()
{
if (!IsEnabled)
{
IsEnabled = true;
_watcher.EnableRaisingEvents = true;
Index();
}
}
public void Stop()
{
if (IsEnabled)
{
IsEnabled = false;
_watcher.EnableRaisingEvents = false;
}
}
private void OnFileChanged(object sender, FileSystemEventArgs e)
{
if (!IsEnabled)
{
return;
}
Index(sender, e);
}
private void Index(object? sender, FileSystemEventArgs? e)
{
try
{
DateTime beforeExecution = DateTime.Now;
IsExecuting = true;
try
{
Worker.Scriptable.Update(new FileUpdateCallbackInfos() {sender = sender, e = e});
}
finally
{
IsExecuting = false;
LastExecution = beforeExecution;
Worker.LastExecution = beforeExecution;
}
DateTime afterExecution = DateTime.Now;
WorkerManager.UpdateCallAndWorkerTimestamps(this, Worker, beforeExecution, afterExecution);
}
catch (Exception ex)
{
_logger.LogError("Exception occurred in a Call of Worker \"{name}\": \"{ex}\"", Worker.Name, ex.Message);
}
}
private void Index()
{
Index(null, null);
}
public HealthCheckResult HealthCheck()
{
return HealthCheckResult.Unhealthy(); // Not implemented yet
}
}

View File

@@ -11,9 +11,9 @@ public class CallsController : ControllerBase
private readonly ILogger<WorkerController> _logger; private readonly ILogger<WorkerController> _logger;
private readonly IConfiguration _config; private readonly IConfiguration _config;
private readonly IConfigurationRoot _configurationRoot; private readonly IConfigurationRoot _configurationRoot;
private readonly WorkerCollection _workerCollection; private readonly WorkerManager _workerCollection;
public CallsController(ILogger<WorkerController> logger, IConfiguration config, IConfigurationRoot configurationRoot, WorkerCollection workerCollection) public CallsController(ILogger<WorkerController> logger, IConfiguration config, IConfigurationRoot configurationRoot, WorkerManager workerCollection)
{ {
_logger = logger; _logger = logger;
_config = config; _config = config;

View File

@@ -11,9 +11,9 @@ public class WorkerController : ControllerBase
private readonly ILogger<WorkerController> _logger; private readonly ILogger<WorkerController> _logger;
private readonly IConfiguration _config; private readonly IConfiguration _config;
private readonly IConfigurationRoot _configurationRoot; private readonly IConfigurationRoot _configurationRoot;
private readonly WorkerCollection _workerCollection; private readonly WorkerManager _workerCollection;
public WorkerController(ILogger<WorkerController> logger, IConfiguration config, IConfigurationRoot configurationRoot, WorkerCollection workerCollection) public WorkerController(ILogger<WorkerController> logger, IConfiguration config, IConfigurationRoot configurationRoot, WorkerManager workerCollection)
{ {
_logger = logger; _logger = logger;
_config = config; _config = config;
@@ -86,7 +86,7 @@ public class WorkerController : ControllerBase
worker.LastExecution = beforeExecution; worker.LastExecution = beforeExecution;
} }
DateTime afterExecution = DateTime.Now; DateTime afterExecution = DateTime.Now;
WorkerCollection.UpdateWorkerTimestamps(worker, beforeExecution, afterExecution); WorkerManager.UpdateWorkerTimestamps(worker, beforeExecution, afterExecution);
} }
_logger.LogInformation("triggered worker {name}.", [name]); _logger.LogInformation("triggered worker {name}.", [name]);
return new WorkerTriggerUpdateResult { Success = true }; return new WorkerTriggerUpdateResult { Success = true };

View File

@@ -6,8 +6,8 @@ namespace Indexer;
public class WorkerHealthCheck : IHealthCheck public class WorkerHealthCheck : IHealthCheck
{ {
private readonly WorkerCollection _workerCollection; private readonly WorkerManager _workerCollection;
public WorkerHealthCheck(WorkerCollection workerCollection) public WorkerHealthCheck(WorkerManager workerCollection)
{ {
_workerCollection = workerCollection; _workerCollection = workerCollection;
} }

View File

@@ -0,0 +1,20 @@
using Microsoft.Extensions.Diagnostics.HealthChecks;
public class CallConfig
{
public required string Type { get; set; }
public long? Interval { get; set; } // For Type: Interval
public string? Path { get; set; } // For Type: FileSystemWatcher
public string? Schedule { get; set; } // For Type: Schedule
}
public interface ICall
{
public HealthCheckResult HealthCheck();
public void Start();
public void Stop();
public bool IsEnabled { get; set; }
public bool IsExecuting { get; set; }
public CallConfig CallConfig { get; set; }
public DateTime? LastExecution { get; set; }
public DateTime? LastSuccessfulExecution { get; set; }
}

View File

@@ -2,6 +2,13 @@ using System.Timers;
namespace Indexer.Models; namespace Indexer.Models;
public interface IScript
{
int Init(ScriptToolSet toolSet);
int Update(ICallbackInfos callbackInfos);
int Stop();
}
public class ScriptToolSet public class ScriptToolSet
{ {
public string FilePath; public string FilePath;
@@ -21,6 +28,8 @@ public class ScriptToolSet
} }
} }
public interface ICallbackInfos { }
public class RunOnceCallbackInfos : ICallbackInfos {} public class RunOnceCallbackInfos : ICallbackInfos {}
public class IntervalCallbackInfos : ICallbackInfos public class IntervalCallbackInfos : ICallbackInfos

View File

@@ -1,12 +1,5 @@
namespace Indexer.Models; namespace Indexer.Models;
public interface IScript
{
int Init(ScriptToolSet toolSet);
int Update(ICallbackInfos callbackInfos);
int Stop();
}
public interface IScriptable public interface IScriptable
{ {
ScriptToolSet ToolSet { get; set; } ScriptToolSet ToolSet { get; set; }
@@ -17,7 +10,4 @@ public interface IScriptable
int Stop(); int Stop();
abstract static bool IsScript(string filePath); abstract static bool IsScript(string filePath);
} }
public interface ICallbackInfos { }

View File

@@ -1,566 +0,0 @@
using Microsoft.Extensions.Diagnostics.HealthChecks;
using Indexer.Scriptables;
using Indexer.Exceptions;
using Quartz;
using Quartz.Impl;
namespace Indexer.Models;
public class WorkerCollection
{
public Dictionary<string, Worker> Workers;
public List<Type> types;
private readonly ILogger _logger;
private readonly IConfiguration _configuration;
private readonly Client.Client client;
public WorkerCollection(ILogger<WorkerCollection> logger, IConfiguration configuration, Client.Client client)
{
Workers = [];
types = [typeof(PythonScriptable), typeof(CSharpScriptable)];
_logger = logger;
_configuration = configuration;
this.client = client;
}
public void InitializeWorkers()
{
_logger.LogInformation("Initializing workers");
// Load and configure all workers
var sectionMain = _configuration.GetSection("EmbeddingsearchIndexer");
if (!sectionMain.Exists())
{
_logger.LogCritical("Unable to load section \"EmbeddingsearchIndexer\"");
throw new IndexerConfigurationException("Unable to load section \"EmbeddingsearchIndexer\"");
}
WorkerCollectionConfig? sectionWorker = (WorkerCollectionConfig?)sectionMain.Get(typeof(WorkerCollectionConfig)); //GetValue<WorkerCollectionConfig>("Worker");
if (sectionWorker is not null)
{
foreach (WorkerConfig workerConfig in sectionWorker.Worker)
{
ScriptToolSet toolSet = new(workerConfig.Script, client, _logger, _configuration, workerConfig.Name);
InitializeWorker(toolSet, workerConfig);
}
}
else
{
_logger.LogCritical("Unable to load section \"Worker\"");
throw new IndexerConfigurationException("Unable to load section \"Worker\"");
}
_logger.LogInformation("Initialized workers");
}
public void InitializeWorker(ScriptToolSet toolSet, WorkerConfig workerConfig)
{
_logger.LogInformation("Initializing worker: {Name}", workerConfig.Name);
Worker worker = new(workerConfig.Name, workerConfig, GetScriptable(toolSet));
Workers[workerConfig.Name] = worker;
foreach (CallConfig callConfig in workerConfig.Calls)
{
_logger.LogInformation("Initializing call of type: {Type}", callConfig.Type);
switch (callConfig.Type)
{
case "runonce":
RunOnceCall runOnceCall = new(worker, _logger, callConfig);
worker.Calls.Add(runOnceCall);
break;
case "interval":
if (callConfig.Interval is null)
{
_logger.LogError("Interval not set for a Call in Worker \"{Name}\"", workerConfig.Name);
throw new IndexerConfigurationException($"Interval not set for a Call in Worker \"{workerConfig.Name}\"");
}
var timer = new System.Timers.Timer((double)callConfig.Interval);
timer.AutoReset = true;
timer.Enabled = true;
DateTime now = DateTime.Now;
IntervalCall intervallCall = new(timer, worker.Scriptable, _logger, callConfig)
{
LastExecution = now,
LastSuccessfulExecution = now
};
timer.Elapsed += (sender, e) =>
{
try
{
DateTime beforeExecution = DateTime.Now;
intervallCall.IsExecuting = true;
try
{
worker.Scriptable.Update(new IntervalCallbackInfos() { sender = sender, e = e });
}
finally
{
intervallCall.IsExecuting = false;
intervallCall.LastExecution = beforeExecution;
worker.LastExecution = beforeExecution;
}
DateTime afterExecution = DateTime.Now;
UpdateCallAndWorkerTimestamps(intervallCall, worker, beforeExecution, afterExecution);
}
catch (Exception ex)
{
_logger.LogError("Exception occurred in a Call of Worker \"{name}\": \"{ex}\"", worker.Name, ex.Message);
}
};
worker.Calls.Add(intervallCall);
break;
case "schedule": // TODO implement scheduled tasks using Quartz
ScheduleCall scheduleCall = new(worker, callConfig, _logger);
worker.Calls.Add(scheduleCall);
break;
case "fileupdate":
if (callConfig.Path is null)
{
_logger.LogError("Path not set for a Call in Worker \"{Name}\"", workerConfig.Name);
throw new IndexerConfigurationException($"Path not set for a Call in Worker \"{workerConfig.Name}\"");
}
FileUpdateCall fileUpdateCall = new(worker, callConfig, _logger);
worker.Calls.Add(fileUpdateCall);
break;
default:
throw new IndexerConfigurationException($"Unknown Type specified for a Call in Worker \"{workerConfig.Name}\"");
}
}
}
public static void UpdateCallAndWorkerTimestamps(ICall call, Worker worker, DateTime beforeExecution, DateTime afterExecution)
{
UpdateCallTimestamps(call, beforeExecution, afterExecution);
UpdateWorkerTimestamps(worker, beforeExecution, afterExecution);
}
public static void UpdateCallTimestamps(ICall call, DateTime beforeExecution, DateTime afterExecution)
{
call.LastSuccessfulExecution = GetNewestDateTime(call.LastSuccessfulExecution, afterExecution);
}
public static void UpdateWorkerTimestamps(Worker worker, DateTime beforeExecution, DateTime afterExecution)
{
worker.LastSuccessfulExecution = GetNewestDateTime(worker.LastSuccessfulExecution, afterExecution);
}
public static DateTime? GetNewestDateTime(DateTime? preexistingDateTime, DateTime incomingDateTime)
{
if (preexistingDateTime is null || preexistingDateTime.Value.CompareTo(incomingDateTime) < 0)
{
return incomingDateTime;
}
return preexistingDateTime;
}
public IScriptable GetScriptable(ScriptToolSet toolSet)
{
string fileName = toolSet.FilePath ?? throw new IndexerConfigurationException($"\"Script\" not set for Worker \"{toolSet.Name}\"");
foreach (Type type in types)
{
System.Reflection.MethodInfo? method = type.GetMethod("IsScript");
bool? isInstance = method is not null ? (bool?)method.Invoke(null, [fileName]) : null;
if (isInstance == true)
{
IScriptable? instance = (IScriptable?)Activator.CreateInstance(type, [toolSet, _logger]);
if (instance is null)
{
_logger.LogError("Unable to initialize script: \"{fileName}\"", fileName);
throw new Exception($"Unable to initialize script: \"{fileName}\"");
}
return instance;
}
}
_logger.LogError("Unable to determine the script's language: \"{fileName}\"", fileName);
throw new UnknownScriptLanguageException(fileName);
}
}
public class Worker
{
public string Name { get; set; }
public WorkerConfig Config { get; set; }
public IScriptable Scriptable { get; set; }
public List<ICall> Calls { get; set; }
public bool IsExecuting { get; set; }
public DateTime? LastExecution { get; set; }
public DateTime? LastSuccessfulExecution { get; set; }
public Worker(string name, WorkerConfig workerConfig, IScriptable scriptable)
{
Name = name;
Config = workerConfig;
Scriptable = scriptable;
IsExecuting = false;
Calls = [];
}
public HealthCheckResult HealthCheck()
{
bool hasDegraded = false;
bool hasUnhealthy = false;
foreach (ICall call in Calls)
{
HealthCheckResult callHealth = call.HealthCheck();
if (callHealth.Status != HealthStatus.Healthy)
{
hasDegraded |= callHealth.Status == HealthStatus.Degraded;
hasUnhealthy |= callHealth.Status == HealthStatus.Unhealthy;
}
}
if (hasUnhealthy)
{
return HealthCheckResult.Unhealthy(); // TODO: Retrieve and forward the error message for each call
}
else if (hasDegraded)
{
return HealthCheckResult.Degraded();
}
return HealthCheckResult.Healthy();
}
}
public class WorkerCollectionConfig
{
public required List<WorkerConfig> Worker { get; set; }
}
public class WorkerConfig
{
public required string Name { get; set; }
public required List<string> Searchdomains { get; set; }
public required string Script { get; set; }
public required List<CallConfig> Calls { get; set; }
}
public class CallConfig
{
public required string Type { get; set; }
public long? Interval { get; set; } // For Type: Interval
public string? Path { get; set; } // For Type: FileSystemWatcher
public string? Schedule { get; set; } // For Type: Schedule
}
public interface ICall
{
public HealthCheckResult HealthCheck();
public void Start();
public void Stop();
public bool IsEnabled { get; set; }
public bool IsExecuting { get; set; }
public CallConfig CallConfig { get; set; }
public DateTime? LastExecution { get; set; }
public DateTime? LastSuccessfulExecution { get; set; }
}
public class RunOnceCall : ICall
{
public ILogger _logger;
public bool IsEnabled { get; set; }
public bool IsExecuting { get; set; }
public Worker Worker { get; }
public CallConfig CallConfig { get; set; }
public DateTime? LastExecution { get; set; }
public DateTime? LastSuccessfulExecution { get; set; }
public RunOnceCall(Worker worker, ILogger logger, CallConfig callConfig)
{
Worker = worker;
_logger = logger;
CallConfig = callConfig;
IsEnabled = true;
IsExecuting = false;
IndexAsync();
}
public void Start()
{
IndexAsync();
IsEnabled = true;
}
public void Stop()
{
IsEnabled = false;
}
private async void IndexAsync()
{
try
{
DateTime beforeExecution = DateTime.Now;
IsExecuting = true;
try
{
await Task.Run(() => Worker.Scriptable.Update(new RunOnceCallbackInfos()));
}
finally
{
IsExecuting = false;
LastExecution = beforeExecution;
Worker.LastExecution = beforeExecution;
}
DateTime afterExecution = DateTime.Now;
WorkerCollection.UpdateCallAndWorkerTimestamps(this, Worker, beforeExecution, afterExecution);
}
catch (Exception ex)
{
_logger.LogError("Exception occurred in a Call of Worker \"{name}\": \"{ex}\"", Worker.Name, ex.Message);
}
}
public HealthCheckResult HealthCheck()
{
return HealthCheckResult.Healthy(); // TODO implement proper healthcheck
}
}
public class IntervalCall : ICall
{
public System.Timers.Timer Timer;
public IScriptable Scriptable;
public ILogger _logger;
public bool IsEnabled { get; set; }
public bool IsExecuting { get; set; }
public CallConfig CallConfig { get; set; }
public DateTime? LastExecution { get; set; }
public DateTime? LastSuccessfulExecution { get; set; }
public IntervalCall(System.Timers.Timer timer, IScriptable scriptable, ILogger logger, CallConfig callConfig)
{
Timer = timer;
Scriptable = scriptable;
_logger = logger;
CallConfig = callConfig;
IsEnabled = true;
IsExecuting = false;
}
public void Start()
{
Timer.Start();
IsEnabled = true;
}
public void Stop()
{
Scriptable.Stop();
Timer.Stop();
IsEnabled = false;
}
public HealthCheckResult HealthCheck()
{
if (!Scriptable.UpdateInfo.Successful)
{
_logger.LogWarning("HealthCheck revealed: The last execution of \"{name}\" was not successful", Scriptable.ToolSet.FilePath);
return HealthCheckResult.Unhealthy($"HealthCheck revealed: The last execution of \"{Scriptable.ToolSet.FilePath}\" was not successful");
}
double timerInterval = Timer.Interval; // In ms
DateTime lastRunDateTime = Scriptable.UpdateInfo.DateTime;
DateTime now = DateTime.Now;
double millisecondsSinceLastExecution = now.Subtract(lastRunDateTime).TotalMilliseconds;
if (millisecondsSinceLastExecution >= 2 * timerInterval)
{
_logger.LogWarning("HealthCheck revealed: Since the last execution of \"{name}\" more than twice the interval has passed", Scriptable.ToolSet.FilePath);
return HealthCheckResult.Unhealthy($"HealthCheck revealed: Since the last execution of \"{Scriptable.ToolSet.FilePath}\" more than twice the interval has passed");
}
return HealthCheckResult.Healthy();
}
}
public class ScheduleCall : ICall
{
public bool IsEnabled { get; set; }
public bool IsExecuting { get; set; }
public Worker Worker { get; }
public JobKey JobKey { get; }
public JobDataMap JobDataMap { get; }
public CallConfig CallConfig { get; set; }
private ILogger _logger { get; }
public DateTime? LastExecution { get; set; }
public DateTime? LastSuccessfulExecution { get; set; }
private StdSchedulerFactory SchedulerFactory { get; }
private IScheduler Scheduler { get; }
public ScheduleCall(Worker worker, CallConfig callConfig, ILogger logger)
{
Worker = worker;
CallConfig = callConfig;
_logger = logger;
IsEnabled = false;
IsExecuting = false;
JobKey = new(worker.Name);
SchedulerFactory = new();
Scheduler = SchedulerFactory.GetScheduler(CancellationToken.None).Result;
JobDataMap = [];
JobDataMap["action"] = () =>
{
try
{
DateTime beforeExecution = DateTime.Now;
IsExecuting = true;
try
{
worker.Scriptable.Update(new ScheduleCallbackInfos());
}
finally
{
IsExecuting = false;
LastExecution = beforeExecution;
worker.LastExecution = beforeExecution;
}
DateTime afterExecution = DateTime.Now;
WorkerCollection.UpdateCallAndWorkerTimestamps(this, worker, beforeExecution, afterExecution);
}
catch (Exception ex)
{
_logger.LogError("Exception occurred in a Call of Worker \"{name}\": \"{ex}\"", worker.Name, ex.Message);
}
};
CreateJob().Wait();
Start();
}
public void Start()
{
if (!IsEnabled)
{
Scheduler.Start(CancellationToken.None).Wait();
IsEnabled = true;
}
}
public void Stop()
{
Scheduler.PauseAll();
IsEnabled = false;
}
private async Task CreateJob()
{
if (CallConfig.Schedule is null)
{
throw new IndexerConfigurationException($"Interval not set for a Call in Worker \"{Worker.Name}\"");
}
try
{
await Scheduler.ScheduleJob(
JobBuilder.Create<ActionJob>()
.WithIdentity(JobKey)
.Build(),
TriggerBuilder.Create()
.ForJob(JobKey)
.WithIdentity(Worker.Name + "-trigger")
.UsingJobData(JobDataMap)
.WithCronSchedule(CallConfig.Schedule)
.Build(),
CancellationToken.None);
}
catch (FormatException)
{
throw new IndexerConfigurationException($"Quartz Cron expression invalid in Worker \"{Worker.Name}\" - Quartz syntax differs from classic cron");
}
}
public HealthCheckResult HealthCheck()
{
return HealthCheckResult.Unhealthy(); // Not implemented yet
}
}
public class FileUpdateCall : ICall
{
public bool IsEnabled { get; set; }
public bool IsExecuting { get; set; }
public Worker Worker { get; }
public CallConfig CallConfig { get; set; }
private ILogger _logger { get; }
private FileSystemWatcher _watcher { get; }
public DateTime? LastExecution { get; set; }
public DateTime? LastSuccessfulExecution { get; set; }
public FileUpdateCall(Worker worker, CallConfig callConfig, ILogger logger)
{
Worker = worker;
CallConfig = callConfig;
_logger = logger;
IsEnabled = true;
IsExecuting = false;
if (CallConfig.Path is null)
{
throw new IndexerConfigurationException($"Path not set for a Call in Worker \"{Worker.Name}\"");
}
_watcher = new FileSystemWatcher(CallConfig.Path);
_watcher.Changed += OnFileChanged;
_watcher.Deleted += OnFileChanged;
_watcher.EnableRaisingEvents = true;
}
public void Start()
{
if (!IsEnabled)
{
IsEnabled = true;
_watcher.EnableRaisingEvents = true;
Index();
}
}
public void Stop()
{
if (IsEnabled)
{
IsEnabled = false;
_watcher.EnableRaisingEvents = false;
}
}
private void OnFileChanged(object sender, FileSystemEventArgs e)
{
if (!IsEnabled)
{
return;
}
Index(sender, e);
}
private void Index(object? sender, FileSystemEventArgs? e)
{
try
{
DateTime beforeExecution = DateTime.Now;
IsExecuting = true;
try
{
Worker.Scriptable.Update(new FileUpdateCallbackInfos() {sender = sender, e = e});
}
finally
{
IsExecuting = false;
LastExecution = beforeExecution;
Worker.LastExecution = beforeExecution;
}
DateTime afterExecution = DateTime.Now;
WorkerCollection.UpdateCallAndWorkerTimestamps(this, Worker, beforeExecution, afterExecution);
}
catch (Exception ex)
{
_logger.LogError("Exception occurred in a Call of Worker \"{name}\": \"{ex}\"", Worker.Name, ex.Message);
}
}
private void Index()
{
Index(null, null);
}
public HealthCheckResult HealthCheck()
{
return HealthCheckResult.Unhealthy(); // Not implemented yet
}
}

View File

@@ -0,0 +1,14 @@
namespace Indexer.Models;
public class WorkerCollectionConfig
{
public required List<WorkerConfig> Worker { get; set; }
}
public class WorkerConfig
{
public required string Name { get; set; }
public required List<string> Searchdomains { get; set; }
public required string Script { get; set; }
public required List<CallConfig> Calls { get; set; }
}

View File

@@ -22,7 +22,7 @@ builder.Logging.AddSerilog();
builder.Services.AddHttpContextAccessor(); builder.Services.AddHttpContextAccessor();
builder.Services.AddSingleton<IConfigurationRoot>(builder.Configuration); builder.Services.AddSingleton<IConfigurationRoot>(builder.Configuration);
builder.Services.AddSingleton<Client.Client>(); builder.Services.AddSingleton<Client.Client>();
builder.Services.AddSingleton<WorkerCollection>(); builder.Services.AddSingleton<WorkerManager>();
builder.Services.AddHostedService<IndexerService>(); builder.Services.AddHostedService<IndexerService>();
builder.Services.AddHealthChecks() builder.Services.AddHealthChecks()
.AddCheck<WorkerHealthCheck>("WorkerHealthCheck"); .AddCheck<WorkerHealthCheck>("WorkerHealthCheck");

View File

@@ -1,7 +1,6 @@
#load "../../Client/Client.cs" #load "../../Client/Client.cs"
#load "../Models/Script.cs" #load "../Models/ScriptModels.cs"
#load "../Models/Interfaces.cs" #load "../Models/WorkerResultModels.cs"
#load "../Models/WorkerResults.cs"
#load "../../Shared/Models/SearchdomainResults.cs" #load "../../Shared/Models/SearchdomainResults.cs"
#load "../../Shared/Models/JSONModels.cs" #load "../../Shared/Models/JSONModels.cs"
#load "../../Shared/Models/EntityResults.cs" #load "../../Shared/Models/EntityResults.cs"

View File

@@ -4,10 +4,10 @@ namespace Indexer.Services;
public class IndexerService : IHostedService public class IndexerService : IHostedService
{ {
public WorkerCollection workerCollection; public WorkerManager workerCollection;
public ILogger<IndexerService> _logger; public ILogger<IndexerService> _logger;
public IndexerService(WorkerCollection workerCollection, Client.Client client, ILogger<IndexerService> logger) public IndexerService(WorkerManager workerCollection, Client.Client client, ILogger<IndexerService> logger)
{ {
this.workerCollection = workerCollection; this.workerCollection = workerCollection;
_logger = logger; _logger = logger;

46
src/Indexer/Worker.cs Normal file
View File

@@ -0,0 +1,46 @@
using Microsoft.Extensions.Diagnostics.HealthChecks;
using Indexer.Models;
public class Worker
{
public string Name { get; set; }
public WorkerConfig Config { get; set; }
public IScriptable Scriptable { get; set; }
public List<ICall> Calls { get; set; }
public bool IsExecuting { get; set; }
public DateTime? LastExecution { get; set; }
public DateTime? LastSuccessfulExecution { get; set; }
public Worker(string name, WorkerConfig workerConfig, IScriptable scriptable)
{
Name = name;
Config = workerConfig;
Scriptable = scriptable;
IsExecuting = false;
Calls = [];
}
public HealthCheckResult HealthCheck()
{
bool hasDegraded = false;
bool hasUnhealthy = false;
foreach (ICall call in Calls)
{
HealthCheckResult callHealth = call.HealthCheck();
if (callHealth.Status != HealthStatus.Healthy)
{
hasDegraded |= callHealth.Status == HealthStatus.Degraded;
hasUnhealthy |= callHealth.Status == HealthStatus.Unhealthy;
}
}
if (hasUnhealthy)
{
return HealthCheckResult.Unhealthy(); // TODO: Retrieve and forward the error message for each call
}
else if (hasDegraded)
{
return HealthCheckResult.Degraded();
}
return HealthCheckResult.Healthy();
}
}

View File

@@ -0,0 +1,172 @@
using Indexer.Scriptables;
using Indexer.Exceptions;
using Indexer.Models;
public class WorkerManager
{
public Dictionary<string, Worker> Workers;
public List<Type> types;
private readonly ILogger _logger;
private readonly IConfiguration _configuration;
private readonly Client.Client client;
public WorkerManager(ILogger<WorkerManager> logger, IConfiguration configuration, Client.Client client)
{
Workers = [];
types = [typeof(PythonScriptable), typeof(CSharpScriptable)];
_logger = logger;
_configuration = configuration;
this.client = client;
}
public void InitializeWorkers()
{
_logger.LogInformation("Initializing workers");
// Load and configure all workers
var sectionMain = _configuration.GetSection("EmbeddingsearchIndexer");
if (!sectionMain.Exists())
{
_logger.LogCritical("Unable to load section \"EmbeddingsearchIndexer\"");
throw new IndexerConfigurationException("Unable to load section \"EmbeddingsearchIndexer\"");
}
WorkerCollectionConfig? sectionWorker = (WorkerCollectionConfig?)sectionMain.Get(typeof(WorkerCollectionConfig)); //GetValue<WorkerCollectionConfig>("Worker");
if (sectionWorker is not null)
{
foreach (WorkerConfig workerConfig in sectionWorker.Worker)
{
ScriptToolSet toolSet = new(workerConfig.Script, client, _logger, _configuration, workerConfig.Name);
InitializeWorker(toolSet, workerConfig);
}
}
else
{
_logger.LogCritical("Unable to load section \"Worker\"");
throw new IndexerConfigurationException("Unable to load section \"Worker\"");
}
_logger.LogInformation("Initialized workers");
}
public void InitializeWorker(ScriptToolSet toolSet, WorkerConfig workerConfig)
{
_logger.LogInformation("Initializing worker: {Name}", workerConfig.Name);
Worker worker = new(workerConfig.Name, workerConfig, GetScriptable(toolSet));
Workers[workerConfig.Name] = worker;
foreach (CallConfig callConfig in workerConfig.Calls)
{
_logger.LogInformation("Initializing call of type: {Type}", callConfig.Type);
switch (callConfig.Type)
{
case "runonce":
RunOnceCall runOnceCall = new(worker, _logger, callConfig);
worker.Calls.Add(runOnceCall);
break;
case "interval":
if (callConfig.Interval is null)
{
_logger.LogError("Interval not set for a Call in Worker \"{Name}\"", workerConfig.Name);
throw new IndexerConfigurationException($"Interval not set for a Call in Worker \"{workerConfig.Name}\"");
}
var timer = new System.Timers.Timer((double)callConfig.Interval);
timer.AutoReset = true;
timer.Enabled = true;
DateTime now = DateTime.Now;
IntervalCall intervallCall = new(timer, worker.Scriptable, _logger, callConfig)
{
LastExecution = now,
LastSuccessfulExecution = now
};
timer.Elapsed += (sender, e) =>
{
try
{
DateTime beforeExecution = DateTime.Now;
intervallCall.IsExecuting = true;
try
{
worker.Scriptable.Update(new IntervalCallbackInfos() { sender = sender, e = e });
}
finally
{
intervallCall.IsExecuting = false;
intervallCall.LastExecution = beforeExecution;
worker.LastExecution = beforeExecution;
}
DateTime afterExecution = DateTime.Now;
UpdateCallAndWorkerTimestamps(intervallCall, worker, beforeExecution, afterExecution);
}
catch (Exception ex)
{
_logger.LogError("Exception occurred in a Call of Worker \"{name}\": \"{ex}\"", worker.Name, ex.Message);
}
};
worker.Calls.Add(intervallCall);
break;
case "schedule": // TODO implement scheduled tasks using Quartz
ScheduleCall scheduleCall = new(worker, callConfig, _logger);
worker.Calls.Add(scheduleCall);
break;
case "fileupdate":
if (callConfig.Path is null)
{
_logger.LogError("Path not set for a Call in Worker \"{Name}\"", workerConfig.Name);
throw new IndexerConfigurationException($"Path not set for a Call in Worker \"{workerConfig.Name}\"");
}
FileUpdateCall fileUpdateCall = new(worker, callConfig, _logger);
worker.Calls.Add(fileUpdateCall);
break;
default:
throw new IndexerConfigurationException($"Unknown Type specified for a Call in Worker \"{workerConfig.Name}\"");
}
}
}
public static void UpdateCallAndWorkerTimestamps(ICall call, Worker worker, DateTime beforeExecution, DateTime afterExecution)
{
UpdateCallTimestamps(call, beforeExecution, afterExecution);
UpdateWorkerTimestamps(worker, beforeExecution, afterExecution);
}
public static void UpdateCallTimestamps(ICall call, DateTime beforeExecution, DateTime afterExecution)
{
call.LastSuccessfulExecution = GetNewestDateTime(call.LastSuccessfulExecution, afterExecution);
}
public static void UpdateWorkerTimestamps(Worker worker, DateTime beforeExecution, DateTime afterExecution)
{
worker.LastSuccessfulExecution = GetNewestDateTime(worker.LastSuccessfulExecution, afterExecution);
}
public static DateTime? GetNewestDateTime(DateTime? preexistingDateTime, DateTime incomingDateTime)
{
if (preexistingDateTime is null || preexistingDateTime.Value.CompareTo(incomingDateTime) < 0)
{
return incomingDateTime;
}
return preexistingDateTime;
}
public IScriptable GetScriptable(ScriptToolSet toolSet)
{
string fileName = toolSet.FilePath ?? throw new IndexerConfigurationException($"\"Script\" not set for Worker \"{toolSet.Name}\"");
foreach (Type type in types)
{
System.Reflection.MethodInfo? method = type.GetMethod("IsScript");
bool? isInstance = method is not null ? (bool?)method.Invoke(null, [fileName]) : null;
if (isInstance == true)
{
IScriptable? instance = (IScriptable?)Activator.CreateInstance(type, [toolSet, _logger]);
if (instance is null)
{
_logger.LogError("Unable to initialize script: \"{fileName}\"", fileName);
throw new Exception($"Unable to initialize script: \"{fileName}\"");
}
return instance;
}
}
_logger.LogError("Unable to determine the script's language: \"{fileName}\"", fileName);
throw new UnknownScriptLanguageException(fileName);
}
}