From 2a75daee07ec2052a08b1810addf0283ea634607 Mon Sep 17 00:00:00 2001 From: LD-Reborn Date: Sun, 31 Aug 2025 00:11:05 +0200 Subject: [PATCH] Refactored Indexer models --- src/Indexer/{Models => }/ActionJob.cs | 1 - src/Indexer/Calls.cs | 317 ++++++++++ src/Indexer/Controllers/CallsController.cs | 4 +- src/Indexer/Controllers/WorkerController.cs | 6 +- src/Indexer/IndexerHealthChecks.cs | 4 +- src/Indexer/Models/CallModels.cs | 20 + .../Models/{Script.cs => ScriptModels.cs} | 9 + .../{Interfaces.cs => ScriptableModels.cs} | 12 +- src/Indexer/Models/Worker.cs | 566 ------------------ src/Indexer/Models/WorkerModels.cs | 14 + ...WorkerResults.cs => WorkerResultModels.cs} | 0 src/Indexer/Program.cs | 2 +- src/Indexer/Scripts/example.csx | 5 +- src/Indexer/Services/IndexerService.cs | 4 +- src/Indexer/Worker.cs | 46 ++ src/Indexer/WorkerManager.cs | 172 ++++++ 16 files changed, 591 insertions(+), 591 deletions(-) rename src/Indexer/{Models => }/ActionJob.cs (91%) create mode 100644 src/Indexer/Calls.cs create mode 100644 src/Indexer/Models/CallModels.cs rename src/Indexer/Models/{Script.cs => ScriptModels.cs} (87%) rename src/Indexer/Models/{Interfaces.cs => ScriptableModels.cs} (64%) delete mode 100644 src/Indexer/Models/Worker.cs create mode 100644 src/Indexer/Models/WorkerModels.cs rename src/Indexer/Models/{WorkerResults.cs => WorkerResultModels.cs} (100%) create mode 100644 src/Indexer/Worker.cs create mode 100644 src/Indexer/WorkerManager.cs diff --git a/src/Indexer/Models/ActionJob.cs b/src/Indexer/ActionJob.cs similarity index 91% rename from src/Indexer/Models/ActionJob.cs rename to src/Indexer/ActionJob.cs index 68342a1..e8828e0 100644 --- a/src/Indexer/Models/ActionJob.cs +++ b/src/Indexer/ActionJob.cs @@ -1,4 +1,3 @@ -using Indexer.Models; using Quartz; public class ActionJob : IJob { diff --git a/src/Indexer/Calls.cs b/src/Indexer/Calls.cs new file mode 100644 index 0000000..76c9a4c --- /dev/null +++ b/src/Indexer/Calls.cs @@ -0,0 +1,317 @@ +using Microsoft.Extensions.Diagnostics.HealthChecks; +using Indexer.Models; +using Indexer.Exceptions; +using Quartz; +using Quartz.Impl; +public class RunOnceCall : ICall +{ + public ILogger _logger; + public bool IsEnabled { get; set; } + public bool IsExecuting { get; set; } + public Worker Worker { get; } + public CallConfig CallConfig { get; set; } + public DateTime? LastExecution { get; set; } + public DateTime? LastSuccessfulExecution { get; set; } + + public RunOnceCall(Worker worker, ILogger logger, CallConfig callConfig) + { + Worker = worker; + _logger = logger; + CallConfig = callConfig; + IsEnabled = true; + IsExecuting = false; + IndexAsync(); + } + + public void Start() + { + IndexAsync(); + IsEnabled = true; + } + + public void Stop() + { + IsEnabled = false; + } + + private async void IndexAsync() + { + try + { + DateTime beforeExecution = DateTime.Now; + IsExecuting = true; + try + { + await Task.Run(() => Worker.Scriptable.Update(new RunOnceCallbackInfos())); + } + finally + { + IsExecuting = false; + LastExecution = beforeExecution; + Worker.LastExecution = beforeExecution; + } + DateTime afterExecution = DateTime.Now; + WorkerManager.UpdateCallAndWorkerTimestamps(this, Worker, beforeExecution, afterExecution); + } + catch (Exception ex) + { + _logger.LogError("Exception occurred in a Call of Worker \"{name}\": \"{ex}\"", Worker.Name, ex.Message); + } + } + + public HealthCheckResult HealthCheck() + { + return HealthCheckResult.Healthy(); // TODO implement proper healthcheck + } + +} + +public class IntervalCall : ICall +{ + public System.Timers.Timer Timer; + public IScriptable Scriptable; + public ILogger _logger; + public bool IsEnabled { get; set; } + public bool IsExecuting { get; set; } + public CallConfig CallConfig { get; set; } + public DateTime? LastExecution { get; set; } + public DateTime? LastSuccessfulExecution { get; set; } + + public IntervalCall(System.Timers.Timer timer, IScriptable scriptable, ILogger logger, CallConfig callConfig) + { + Timer = timer; + Scriptable = scriptable; + _logger = logger; + CallConfig = callConfig; + IsEnabled = true; + IsExecuting = false; + } + + public void Start() + { + Timer.Start(); + IsEnabled = true; + } + + public void Stop() + { + Scriptable.Stop(); + Timer.Stop(); + IsEnabled = false; + } + + public HealthCheckResult HealthCheck() + { + if (!Scriptable.UpdateInfo.Successful) + { + _logger.LogWarning("HealthCheck revealed: The last execution of \"{name}\" was not successful", Scriptable.ToolSet.FilePath); + return HealthCheckResult.Unhealthy($"HealthCheck revealed: The last execution of \"{Scriptable.ToolSet.FilePath}\" was not successful"); + } + double timerInterval = Timer.Interval; // In ms + DateTime lastRunDateTime = Scriptable.UpdateInfo.DateTime; + DateTime now = DateTime.Now; + double millisecondsSinceLastExecution = now.Subtract(lastRunDateTime).TotalMilliseconds; + if (millisecondsSinceLastExecution >= 2 * timerInterval) + { + _logger.LogWarning("HealthCheck revealed: Since the last execution of \"{name}\" more than twice the interval has passed", Scriptable.ToolSet.FilePath); + return HealthCheckResult.Unhealthy($"HealthCheck revealed: Since the last execution of \"{Scriptable.ToolSet.FilePath}\" more than twice the interval has passed"); + } + return HealthCheckResult.Healthy(); + } + +} + +public class ScheduleCall : ICall +{ + public bool IsEnabled { get; set; } + public bool IsExecuting { get; set; } + public Worker Worker { get; } + public JobKey JobKey { get; } + public JobDataMap JobDataMap { get; } + public CallConfig CallConfig { get; set; } + private ILogger _logger { get; } + public DateTime? LastExecution { get; set; } + public DateTime? LastSuccessfulExecution { get; set; } + private StdSchedulerFactory SchedulerFactory { get; } + private IScheduler Scheduler { get; } + + public ScheduleCall(Worker worker, CallConfig callConfig, ILogger logger) + { + Worker = worker; + CallConfig = callConfig; + _logger = logger; + IsEnabled = false; + IsExecuting = false; + JobKey = new(worker.Name); + SchedulerFactory = new(); + Scheduler = SchedulerFactory.GetScheduler(CancellationToken.None).Result; + JobDataMap = []; + JobDataMap["action"] = () => + { + try + { + DateTime beforeExecution = DateTime.Now; + IsExecuting = true; + try + { + worker.Scriptable.Update(new ScheduleCallbackInfos()); + } + finally + { + IsExecuting = false; + LastExecution = beforeExecution; + worker.LastExecution = beforeExecution; + } + DateTime afterExecution = DateTime.Now; + WorkerManager.UpdateCallAndWorkerTimestamps(this, worker, beforeExecution, afterExecution); + } + catch (Exception ex) + { + _logger.LogError("Exception occurred in a Call of Worker \"{name}\": \"{ex}\"", worker.Name, ex.Message); + } + }; + CreateJob().Wait(); + Start(); + } + + public void Start() + { + if (!IsEnabled) + { + Scheduler.Start(CancellationToken.None).Wait(); + IsEnabled = true; + } + } + + public void Stop() + { + Scheduler.PauseAll(); + IsEnabled = false; + } + + + private async Task CreateJob() + { + if (CallConfig.Schedule is null) + { + throw new IndexerConfigurationException($"Interval not set for a Call in Worker \"{Worker.Name}\""); + } + try + { + + await Scheduler.ScheduleJob( + JobBuilder.Create() + .WithIdentity(JobKey) + .Build(), + TriggerBuilder.Create() + .ForJob(JobKey) + .WithIdentity(Worker.Name + "-trigger") + .UsingJobData(JobDataMap) + .WithCronSchedule(CallConfig.Schedule) + .Build(), + CancellationToken.None); + } + catch (FormatException) + { + throw new IndexerConfigurationException($"Quartz Cron expression invalid in Worker \"{Worker.Name}\" - Quartz syntax differs from classic cron"); + } + } + + public HealthCheckResult HealthCheck() + { + return HealthCheckResult.Unhealthy(); // Not implemented yet + } +} + +public class FileUpdateCall : ICall +{ + public bool IsEnabled { get; set; } + public bool IsExecuting { get; set; } + public Worker Worker { get; } + public CallConfig CallConfig { get; set; } + private ILogger _logger { get; } + private FileSystemWatcher _watcher { get; } + public DateTime? LastExecution { get; set; } + public DateTime? LastSuccessfulExecution { get; set; } + + public FileUpdateCall(Worker worker, CallConfig callConfig, ILogger logger) + { + Worker = worker; + CallConfig = callConfig; + _logger = logger; + IsEnabled = true; + IsExecuting = false; + if (CallConfig.Path is null) + { + throw new IndexerConfigurationException($"Path not set for a Call in Worker \"{Worker.Name}\""); + } + _watcher = new FileSystemWatcher(CallConfig.Path); + _watcher.Changed += OnFileChanged; + _watcher.Deleted += OnFileChanged; + _watcher.EnableRaisingEvents = true; + } + + public void Start() + { + if (!IsEnabled) + { + IsEnabled = true; + _watcher.EnableRaisingEvents = true; + Index(); + } + } + + public void Stop() + { + if (IsEnabled) + { + IsEnabled = false; + _watcher.EnableRaisingEvents = false; + } + } + + + private void OnFileChanged(object sender, FileSystemEventArgs e) + { + if (!IsEnabled) + { + return; + } + Index(sender, e); + } + + private void Index(object? sender, FileSystemEventArgs? e) + { + try + { + DateTime beforeExecution = DateTime.Now; + IsExecuting = true; + try + { + Worker.Scriptable.Update(new FileUpdateCallbackInfos() {sender = sender, e = e}); + } + finally + { + IsExecuting = false; + LastExecution = beforeExecution; + Worker.LastExecution = beforeExecution; + } + DateTime afterExecution = DateTime.Now; + WorkerManager.UpdateCallAndWorkerTimestamps(this, Worker, beforeExecution, afterExecution); + } + catch (Exception ex) + { + _logger.LogError("Exception occurred in a Call of Worker \"{name}\": \"{ex}\"", Worker.Name, ex.Message); + } + } + + private void Index() + { + Index(null, null); + } + + public HealthCheckResult HealthCheck() + { + return HealthCheckResult.Unhealthy(); // Not implemented yet + } +} \ No newline at end of file diff --git a/src/Indexer/Controllers/CallsController.cs b/src/Indexer/Controllers/CallsController.cs index 9155957..236bb39 100644 --- a/src/Indexer/Controllers/CallsController.cs +++ b/src/Indexer/Controllers/CallsController.cs @@ -11,9 +11,9 @@ public class CallsController : ControllerBase private readonly ILogger _logger; private readonly IConfiguration _config; private readonly IConfigurationRoot _configurationRoot; - private readonly WorkerCollection _workerCollection; + private readonly WorkerManager _workerCollection; - public CallsController(ILogger logger, IConfiguration config, IConfigurationRoot configurationRoot, WorkerCollection workerCollection) + public CallsController(ILogger logger, IConfiguration config, IConfigurationRoot configurationRoot, WorkerManager workerCollection) { _logger = logger; _config = config; diff --git a/src/Indexer/Controllers/WorkerController.cs b/src/Indexer/Controllers/WorkerController.cs index 3970254..dac57ab 100644 --- a/src/Indexer/Controllers/WorkerController.cs +++ b/src/Indexer/Controllers/WorkerController.cs @@ -11,9 +11,9 @@ public class WorkerController : ControllerBase private readonly ILogger _logger; private readonly IConfiguration _config; private readonly IConfigurationRoot _configurationRoot; - private readonly WorkerCollection _workerCollection; + private readonly WorkerManager _workerCollection; - public WorkerController(ILogger logger, IConfiguration config, IConfigurationRoot configurationRoot, WorkerCollection workerCollection) + public WorkerController(ILogger logger, IConfiguration config, IConfigurationRoot configurationRoot, WorkerManager workerCollection) { _logger = logger; _config = config; @@ -86,7 +86,7 @@ public class WorkerController : ControllerBase worker.LastExecution = beforeExecution; } DateTime afterExecution = DateTime.Now; - WorkerCollection.UpdateWorkerTimestamps(worker, beforeExecution, afterExecution); + WorkerManager.UpdateWorkerTimestamps(worker, beforeExecution, afterExecution); } _logger.LogInformation("triggered worker {name}.", [name]); return new WorkerTriggerUpdateResult { Success = true }; diff --git a/src/Indexer/IndexerHealthChecks.cs b/src/Indexer/IndexerHealthChecks.cs index 66d3c31..1db8041 100644 --- a/src/Indexer/IndexerHealthChecks.cs +++ b/src/Indexer/IndexerHealthChecks.cs @@ -6,8 +6,8 @@ namespace Indexer; public class WorkerHealthCheck : IHealthCheck { - private readonly WorkerCollection _workerCollection; - public WorkerHealthCheck(WorkerCollection workerCollection) + private readonly WorkerManager _workerCollection; + public WorkerHealthCheck(WorkerManager workerCollection) { _workerCollection = workerCollection; } diff --git a/src/Indexer/Models/CallModels.cs b/src/Indexer/Models/CallModels.cs new file mode 100644 index 0000000..77c1407 --- /dev/null +++ b/src/Indexer/Models/CallModels.cs @@ -0,0 +1,20 @@ +using Microsoft.Extensions.Diagnostics.HealthChecks; + +public class CallConfig +{ + public required string Type { get; set; } + public long? Interval { get; set; } // For Type: Interval + public string? Path { get; set; } // For Type: FileSystemWatcher + public string? Schedule { get; set; } // For Type: Schedule +} +public interface ICall +{ + public HealthCheckResult HealthCheck(); + public void Start(); + public void Stop(); + public bool IsEnabled { get; set; } + public bool IsExecuting { get; set; } + public CallConfig CallConfig { get; set; } + public DateTime? LastExecution { get; set; } + public DateTime? LastSuccessfulExecution { get; set; } +} \ No newline at end of file diff --git a/src/Indexer/Models/Script.cs b/src/Indexer/Models/ScriptModels.cs similarity index 87% rename from src/Indexer/Models/Script.cs rename to src/Indexer/Models/ScriptModels.cs index f114a9e..08aba2c 100644 --- a/src/Indexer/Models/Script.cs +++ b/src/Indexer/Models/ScriptModels.cs @@ -2,6 +2,13 @@ using System.Timers; namespace Indexer.Models; +public interface IScript +{ + int Init(ScriptToolSet toolSet); + int Update(ICallbackInfos callbackInfos); + int Stop(); +} + public class ScriptToolSet { public string FilePath; @@ -21,6 +28,8 @@ public class ScriptToolSet } } +public interface ICallbackInfos { } + public class RunOnceCallbackInfos : ICallbackInfos {} public class IntervalCallbackInfos : ICallbackInfos diff --git a/src/Indexer/Models/Interfaces.cs b/src/Indexer/Models/ScriptableModels.cs similarity index 64% rename from src/Indexer/Models/Interfaces.cs rename to src/Indexer/Models/ScriptableModels.cs index b4b8815..88f5a62 100644 --- a/src/Indexer/Models/Interfaces.cs +++ b/src/Indexer/Models/ScriptableModels.cs @@ -1,12 +1,5 @@ namespace Indexer.Models; -public interface IScript -{ - int Init(ScriptToolSet toolSet); - int Update(ICallbackInfos callbackInfos); - int Stop(); -} - public interface IScriptable { ScriptToolSet ToolSet { get; set; } @@ -17,7 +10,4 @@ public interface IScriptable int Stop(); abstract static bool IsScript(string filePath); -} - -public interface ICallbackInfos { } - +} \ No newline at end of file diff --git a/src/Indexer/Models/Worker.cs b/src/Indexer/Models/Worker.cs deleted file mode 100644 index df0b835..0000000 --- a/src/Indexer/Models/Worker.cs +++ /dev/null @@ -1,566 +0,0 @@ -using Microsoft.Extensions.Diagnostics.HealthChecks; -using Indexer.Scriptables; -using Indexer.Exceptions; -using Quartz; -using Quartz.Impl; - -namespace Indexer.Models; - -public class WorkerCollection -{ - public Dictionary Workers; - public List types; - private readonly ILogger _logger; - private readonly IConfiguration _configuration; - private readonly Client.Client client; - - public WorkerCollection(ILogger logger, IConfiguration configuration, Client.Client client) - { - Workers = []; - types = [typeof(PythonScriptable), typeof(CSharpScriptable)]; - _logger = logger; - _configuration = configuration; - this.client = client; - } - - public void InitializeWorkers() - { - _logger.LogInformation("Initializing workers"); - // Load and configure all workers - var sectionMain = _configuration.GetSection("EmbeddingsearchIndexer"); - if (!sectionMain.Exists()) - { - _logger.LogCritical("Unable to load section \"EmbeddingsearchIndexer\""); - throw new IndexerConfigurationException("Unable to load section \"EmbeddingsearchIndexer\""); - } - - WorkerCollectionConfig? sectionWorker = (WorkerCollectionConfig?)sectionMain.Get(typeof(WorkerCollectionConfig)); //GetValue("Worker"); - if (sectionWorker is not null) - { - foreach (WorkerConfig workerConfig in sectionWorker.Worker) - { - ScriptToolSet toolSet = new(workerConfig.Script, client, _logger, _configuration, workerConfig.Name); - InitializeWorker(toolSet, workerConfig); - } - } - else - { - _logger.LogCritical("Unable to load section \"Worker\""); - throw new IndexerConfigurationException("Unable to load section \"Worker\""); - } - _logger.LogInformation("Initialized workers"); - } - - public void InitializeWorker(ScriptToolSet toolSet, WorkerConfig workerConfig) - { - _logger.LogInformation("Initializing worker: {Name}", workerConfig.Name); - Worker worker = new(workerConfig.Name, workerConfig, GetScriptable(toolSet)); - Workers[workerConfig.Name] = worker; - foreach (CallConfig callConfig in workerConfig.Calls) - { - _logger.LogInformation("Initializing call of type: {Type}", callConfig.Type); - - switch (callConfig.Type) - { - case "runonce": - RunOnceCall runOnceCall = new(worker, _logger, callConfig); - worker.Calls.Add(runOnceCall); - break; - case "interval": - if (callConfig.Interval is null) - { - _logger.LogError("Interval not set for a Call in Worker \"{Name}\"", workerConfig.Name); - throw new IndexerConfigurationException($"Interval not set for a Call in Worker \"{workerConfig.Name}\""); - } - var timer = new System.Timers.Timer((double)callConfig.Interval); - timer.AutoReset = true; - timer.Enabled = true; - DateTime now = DateTime.Now; - IntervalCall intervallCall = new(timer, worker.Scriptable, _logger, callConfig) - { - LastExecution = now, - LastSuccessfulExecution = now - }; - timer.Elapsed += (sender, e) => - { - try - { - DateTime beforeExecution = DateTime.Now; - intervallCall.IsExecuting = true; - try - { - worker.Scriptable.Update(new IntervalCallbackInfos() { sender = sender, e = e }); - } - finally - { - intervallCall.IsExecuting = false; - intervallCall.LastExecution = beforeExecution; - worker.LastExecution = beforeExecution; - } - DateTime afterExecution = DateTime.Now; - UpdateCallAndWorkerTimestamps(intervallCall, worker, beforeExecution, afterExecution); - } - catch (Exception ex) - { - _logger.LogError("Exception occurred in a Call of Worker \"{name}\": \"{ex}\"", worker.Name, ex.Message); - } - }; - worker.Calls.Add(intervallCall); - break; - case "schedule": // TODO implement scheduled tasks using Quartz - ScheduleCall scheduleCall = new(worker, callConfig, _logger); - worker.Calls.Add(scheduleCall); - break; - case "fileupdate": - if (callConfig.Path is null) - { - _logger.LogError("Path not set for a Call in Worker \"{Name}\"", workerConfig.Name); - throw new IndexerConfigurationException($"Path not set for a Call in Worker \"{workerConfig.Name}\""); - } - FileUpdateCall fileUpdateCall = new(worker, callConfig, _logger); - worker.Calls.Add(fileUpdateCall); - break; - default: - throw new IndexerConfigurationException($"Unknown Type specified for a Call in Worker \"{workerConfig.Name}\""); - } - } - } - - public static void UpdateCallAndWorkerTimestamps(ICall call, Worker worker, DateTime beforeExecution, DateTime afterExecution) - { - UpdateCallTimestamps(call, beforeExecution, afterExecution); - UpdateWorkerTimestamps(worker, beforeExecution, afterExecution); - } - - public static void UpdateCallTimestamps(ICall call, DateTime beforeExecution, DateTime afterExecution) - { - call.LastSuccessfulExecution = GetNewestDateTime(call.LastSuccessfulExecution, afterExecution); - } - - public static void UpdateWorkerTimestamps(Worker worker, DateTime beforeExecution, DateTime afterExecution) - { - worker.LastSuccessfulExecution = GetNewestDateTime(worker.LastSuccessfulExecution, afterExecution); - } - - - public static DateTime? GetNewestDateTime(DateTime? preexistingDateTime, DateTime incomingDateTime) - { - if (preexistingDateTime is null || preexistingDateTime.Value.CompareTo(incomingDateTime) < 0) - { - return incomingDateTime; - } - return preexistingDateTime; - } - - public IScriptable GetScriptable(ScriptToolSet toolSet) - { - string fileName = toolSet.FilePath ?? throw new IndexerConfigurationException($"\"Script\" not set for Worker \"{toolSet.Name}\""); - foreach (Type type in types) - { - System.Reflection.MethodInfo? method = type.GetMethod("IsScript"); - bool? isInstance = method is not null ? (bool?)method.Invoke(null, [fileName]) : null; - if (isInstance == true) - { - IScriptable? instance = (IScriptable?)Activator.CreateInstance(type, [toolSet, _logger]); - if (instance is null) - { - _logger.LogError("Unable to initialize script: \"{fileName}\"", fileName); - throw new Exception($"Unable to initialize script: \"{fileName}\""); - } - return instance; - } - } - _logger.LogError("Unable to determine the script's language: \"{fileName}\"", fileName); - throw new UnknownScriptLanguageException(fileName); - } -} - -public class Worker -{ - public string Name { get; set; } - public WorkerConfig Config { get; set; } - public IScriptable Scriptable { get; set; } - public List Calls { get; set; } - public bool IsExecuting { get; set; } - public DateTime? LastExecution { get; set; } - public DateTime? LastSuccessfulExecution { get; set; } - - public Worker(string name, WorkerConfig workerConfig, IScriptable scriptable) - { - Name = name; - Config = workerConfig; - Scriptable = scriptable; - IsExecuting = false; - Calls = []; - } - - public HealthCheckResult HealthCheck() - { - bool hasDegraded = false; - bool hasUnhealthy = false; - foreach (ICall call in Calls) - { - HealthCheckResult callHealth = call.HealthCheck(); - if (callHealth.Status != HealthStatus.Healthy) - { - hasDegraded |= callHealth.Status == HealthStatus.Degraded; - hasUnhealthy |= callHealth.Status == HealthStatus.Unhealthy; - } - } - if (hasUnhealthy) - { - return HealthCheckResult.Unhealthy(); // TODO: Retrieve and forward the error message for each call - } - else if (hasDegraded) - { - return HealthCheckResult.Degraded(); - } - return HealthCheckResult.Healthy(); - } -} - -public class WorkerCollectionConfig -{ - public required List Worker { get; set; } -} - -public class WorkerConfig -{ - public required string Name { get; set; } - public required List Searchdomains { get; set; } - public required string Script { get; set; } - public required List Calls { get; set; } -} - -public class CallConfig -{ - public required string Type { get; set; } - public long? Interval { get; set; } // For Type: Interval - public string? Path { get; set; } // For Type: FileSystemWatcher - public string? Schedule { get; set; } // For Type: Schedule -} - -public interface ICall -{ - public HealthCheckResult HealthCheck(); - public void Start(); - public void Stop(); - public bool IsEnabled { get; set; } - public bool IsExecuting { get; set; } - public CallConfig CallConfig { get; set; } - public DateTime? LastExecution { get; set; } - public DateTime? LastSuccessfulExecution { get; set; } -} - -public class RunOnceCall : ICall -{ - public ILogger _logger; - public bool IsEnabled { get; set; } - public bool IsExecuting { get; set; } - public Worker Worker { get; } - public CallConfig CallConfig { get; set; } - public DateTime? LastExecution { get; set; } - public DateTime? LastSuccessfulExecution { get; set; } - - public RunOnceCall(Worker worker, ILogger logger, CallConfig callConfig) - { - Worker = worker; - _logger = logger; - CallConfig = callConfig; - IsEnabled = true; - IsExecuting = false; - IndexAsync(); - } - - public void Start() - { - IndexAsync(); - IsEnabled = true; - } - - public void Stop() - { - IsEnabled = false; - } - - private async void IndexAsync() - { - try - { - DateTime beforeExecution = DateTime.Now; - IsExecuting = true; - try - { - await Task.Run(() => Worker.Scriptable.Update(new RunOnceCallbackInfos())); - } - finally - { - IsExecuting = false; - LastExecution = beforeExecution; - Worker.LastExecution = beforeExecution; - } - DateTime afterExecution = DateTime.Now; - WorkerCollection.UpdateCallAndWorkerTimestamps(this, Worker, beforeExecution, afterExecution); - } - catch (Exception ex) - { - _logger.LogError("Exception occurred in a Call of Worker \"{name}\": \"{ex}\"", Worker.Name, ex.Message); - } - } - - public HealthCheckResult HealthCheck() - { - return HealthCheckResult.Healthy(); // TODO implement proper healthcheck - } - -} - -public class IntervalCall : ICall -{ - public System.Timers.Timer Timer; - public IScriptable Scriptable; - public ILogger _logger; - public bool IsEnabled { get; set; } - public bool IsExecuting { get; set; } - public CallConfig CallConfig { get; set; } - public DateTime? LastExecution { get; set; } - public DateTime? LastSuccessfulExecution { get; set; } - - public IntervalCall(System.Timers.Timer timer, IScriptable scriptable, ILogger logger, CallConfig callConfig) - { - Timer = timer; - Scriptable = scriptable; - _logger = logger; - CallConfig = callConfig; - IsEnabled = true; - IsExecuting = false; - } - - public void Start() - { - Timer.Start(); - IsEnabled = true; - } - - public void Stop() - { - Scriptable.Stop(); - Timer.Stop(); - IsEnabled = false; - } - - public HealthCheckResult HealthCheck() - { - if (!Scriptable.UpdateInfo.Successful) - { - _logger.LogWarning("HealthCheck revealed: The last execution of \"{name}\" was not successful", Scriptable.ToolSet.FilePath); - return HealthCheckResult.Unhealthy($"HealthCheck revealed: The last execution of \"{Scriptable.ToolSet.FilePath}\" was not successful"); - } - double timerInterval = Timer.Interval; // In ms - DateTime lastRunDateTime = Scriptable.UpdateInfo.DateTime; - DateTime now = DateTime.Now; - double millisecondsSinceLastExecution = now.Subtract(lastRunDateTime).TotalMilliseconds; - if (millisecondsSinceLastExecution >= 2 * timerInterval) - { - _logger.LogWarning("HealthCheck revealed: Since the last execution of \"{name}\" more than twice the interval has passed", Scriptable.ToolSet.FilePath); - return HealthCheckResult.Unhealthy($"HealthCheck revealed: Since the last execution of \"{Scriptable.ToolSet.FilePath}\" more than twice the interval has passed"); - } - return HealthCheckResult.Healthy(); - } - -} - -public class ScheduleCall : ICall -{ - public bool IsEnabled { get; set; } - public bool IsExecuting { get; set; } - public Worker Worker { get; } - public JobKey JobKey { get; } - public JobDataMap JobDataMap { get; } - public CallConfig CallConfig { get; set; } - private ILogger _logger { get; } - public DateTime? LastExecution { get; set; } - public DateTime? LastSuccessfulExecution { get; set; } - private StdSchedulerFactory SchedulerFactory { get; } - private IScheduler Scheduler { get; } - - public ScheduleCall(Worker worker, CallConfig callConfig, ILogger logger) - { - Worker = worker; - CallConfig = callConfig; - _logger = logger; - IsEnabled = false; - IsExecuting = false; - JobKey = new(worker.Name); - SchedulerFactory = new(); - Scheduler = SchedulerFactory.GetScheduler(CancellationToken.None).Result; - JobDataMap = []; - JobDataMap["action"] = () => - { - try - { - DateTime beforeExecution = DateTime.Now; - IsExecuting = true; - try - { - worker.Scriptable.Update(new ScheduleCallbackInfos()); - } - finally - { - IsExecuting = false; - LastExecution = beforeExecution; - worker.LastExecution = beforeExecution; - } - DateTime afterExecution = DateTime.Now; - WorkerCollection.UpdateCallAndWorkerTimestamps(this, worker, beforeExecution, afterExecution); - } - catch (Exception ex) - { - _logger.LogError("Exception occurred in a Call of Worker \"{name}\": \"{ex}\"", worker.Name, ex.Message); - } - }; - CreateJob().Wait(); - Start(); - } - - public void Start() - { - if (!IsEnabled) - { - Scheduler.Start(CancellationToken.None).Wait(); - IsEnabled = true; - } - } - - public void Stop() - { - Scheduler.PauseAll(); - IsEnabled = false; - } - - - private async Task CreateJob() - { - if (CallConfig.Schedule is null) - { - throw new IndexerConfigurationException($"Interval not set for a Call in Worker \"{Worker.Name}\""); - } - try - { - - await Scheduler.ScheduleJob( - JobBuilder.Create() - .WithIdentity(JobKey) - .Build(), - TriggerBuilder.Create() - .ForJob(JobKey) - .WithIdentity(Worker.Name + "-trigger") - .UsingJobData(JobDataMap) - .WithCronSchedule(CallConfig.Schedule) - .Build(), - CancellationToken.None); - } - catch (FormatException) - { - throw new IndexerConfigurationException($"Quartz Cron expression invalid in Worker \"{Worker.Name}\" - Quartz syntax differs from classic cron"); - } - } - - public HealthCheckResult HealthCheck() - { - return HealthCheckResult.Unhealthy(); // Not implemented yet - } -} - -public class FileUpdateCall : ICall -{ - public bool IsEnabled { get; set; } - public bool IsExecuting { get; set; } - public Worker Worker { get; } - public CallConfig CallConfig { get; set; } - private ILogger _logger { get; } - private FileSystemWatcher _watcher { get; } - public DateTime? LastExecution { get; set; } - public DateTime? LastSuccessfulExecution { get; set; } - - public FileUpdateCall(Worker worker, CallConfig callConfig, ILogger logger) - { - Worker = worker; - CallConfig = callConfig; - _logger = logger; - IsEnabled = true; - IsExecuting = false; - if (CallConfig.Path is null) - { - throw new IndexerConfigurationException($"Path not set for a Call in Worker \"{Worker.Name}\""); - } - _watcher = new FileSystemWatcher(CallConfig.Path); - _watcher.Changed += OnFileChanged; - _watcher.Deleted += OnFileChanged; - _watcher.EnableRaisingEvents = true; - } - - public void Start() - { - if (!IsEnabled) - { - IsEnabled = true; - _watcher.EnableRaisingEvents = true; - Index(); - } - } - - public void Stop() - { - if (IsEnabled) - { - IsEnabled = false; - _watcher.EnableRaisingEvents = false; - } - } - - - private void OnFileChanged(object sender, FileSystemEventArgs e) - { - if (!IsEnabled) - { - return; - } - Index(sender, e); - } - - private void Index(object? sender, FileSystemEventArgs? e) - { - try - { - DateTime beforeExecution = DateTime.Now; - IsExecuting = true; - try - { - Worker.Scriptable.Update(new FileUpdateCallbackInfos() {sender = sender, e = e}); - } - finally - { - IsExecuting = false; - LastExecution = beforeExecution; - Worker.LastExecution = beforeExecution; - } - DateTime afterExecution = DateTime.Now; - WorkerCollection.UpdateCallAndWorkerTimestamps(this, Worker, beforeExecution, afterExecution); - } - catch (Exception ex) - { - _logger.LogError("Exception occurred in a Call of Worker \"{name}\": \"{ex}\"", Worker.Name, ex.Message); - } - } - - private void Index() - { - Index(null, null); - } - - public HealthCheckResult HealthCheck() - { - return HealthCheckResult.Unhealthy(); // Not implemented yet - } -} \ No newline at end of file diff --git a/src/Indexer/Models/WorkerModels.cs b/src/Indexer/Models/WorkerModels.cs new file mode 100644 index 0000000..9fb58af --- /dev/null +++ b/src/Indexer/Models/WorkerModels.cs @@ -0,0 +1,14 @@ +namespace Indexer.Models; + +public class WorkerCollectionConfig +{ + public required List Worker { get; set; } +} + +public class WorkerConfig +{ + public required string Name { get; set; } + public required List Searchdomains { get; set; } + public required string Script { get; set; } + public required List Calls { get; set; } +} diff --git a/src/Indexer/Models/WorkerResults.cs b/src/Indexer/Models/WorkerResultModels.cs similarity index 100% rename from src/Indexer/Models/WorkerResults.cs rename to src/Indexer/Models/WorkerResultModels.cs diff --git a/src/Indexer/Program.cs b/src/Indexer/Program.cs index 0090eb9..a78af58 100644 --- a/src/Indexer/Program.cs +++ b/src/Indexer/Program.cs @@ -22,7 +22,7 @@ builder.Logging.AddSerilog(); builder.Services.AddHttpContextAccessor(); builder.Services.AddSingleton(builder.Configuration); builder.Services.AddSingleton(); -builder.Services.AddSingleton(); +builder.Services.AddSingleton(); builder.Services.AddHostedService(); builder.Services.AddHealthChecks() .AddCheck("WorkerHealthCheck"); diff --git a/src/Indexer/Scripts/example.csx b/src/Indexer/Scripts/example.csx index 33b30a6..9507b1c 100644 --- a/src/Indexer/Scripts/example.csx +++ b/src/Indexer/Scripts/example.csx @@ -1,7 +1,6 @@ #load "../../Client/Client.cs" -#load "../Models/Script.cs" -#load "../Models/Interfaces.cs" -#load "../Models/WorkerResults.cs" +#load "../Models/ScriptModels.cs" +#load "../Models/WorkerResultModels.cs" #load "../../Shared/Models/SearchdomainResults.cs" #load "../../Shared/Models/JSONModels.cs" #load "../../Shared/Models/EntityResults.cs" diff --git a/src/Indexer/Services/IndexerService.cs b/src/Indexer/Services/IndexerService.cs index a6a0bda..07541d8 100644 --- a/src/Indexer/Services/IndexerService.cs +++ b/src/Indexer/Services/IndexerService.cs @@ -4,10 +4,10 @@ namespace Indexer.Services; public class IndexerService : IHostedService { - public WorkerCollection workerCollection; + public WorkerManager workerCollection; public ILogger _logger; - public IndexerService(WorkerCollection workerCollection, Client.Client client, ILogger logger) + public IndexerService(WorkerManager workerCollection, Client.Client client, ILogger logger) { this.workerCollection = workerCollection; _logger = logger; diff --git a/src/Indexer/Worker.cs b/src/Indexer/Worker.cs new file mode 100644 index 0000000..c196484 --- /dev/null +++ b/src/Indexer/Worker.cs @@ -0,0 +1,46 @@ +using Microsoft.Extensions.Diagnostics.HealthChecks; +using Indexer.Models; + +public class Worker +{ + public string Name { get; set; } + public WorkerConfig Config { get; set; } + public IScriptable Scriptable { get; set; } + public List Calls { get; set; } + public bool IsExecuting { get; set; } + public DateTime? LastExecution { get; set; } + public DateTime? LastSuccessfulExecution { get; set; } + + public Worker(string name, WorkerConfig workerConfig, IScriptable scriptable) + { + Name = name; + Config = workerConfig; + Scriptable = scriptable; + IsExecuting = false; + Calls = []; + } + + public HealthCheckResult HealthCheck() + { + bool hasDegraded = false; + bool hasUnhealthy = false; + foreach (ICall call in Calls) + { + HealthCheckResult callHealth = call.HealthCheck(); + if (callHealth.Status != HealthStatus.Healthy) + { + hasDegraded |= callHealth.Status == HealthStatus.Degraded; + hasUnhealthy |= callHealth.Status == HealthStatus.Unhealthy; + } + } + if (hasUnhealthy) + { + return HealthCheckResult.Unhealthy(); // TODO: Retrieve and forward the error message for each call + } + else if (hasDegraded) + { + return HealthCheckResult.Degraded(); + } + return HealthCheckResult.Healthy(); + } +} \ No newline at end of file diff --git a/src/Indexer/WorkerManager.cs b/src/Indexer/WorkerManager.cs new file mode 100644 index 0000000..31bc883 --- /dev/null +++ b/src/Indexer/WorkerManager.cs @@ -0,0 +1,172 @@ +using Indexer.Scriptables; +using Indexer.Exceptions; +using Indexer.Models; + +public class WorkerManager +{ + public Dictionary Workers; + public List types; + private readonly ILogger _logger; + private readonly IConfiguration _configuration; + private readonly Client.Client client; + + public WorkerManager(ILogger logger, IConfiguration configuration, Client.Client client) + { + Workers = []; + types = [typeof(PythonScriptable), typeof(CSharpScriptable)]; + _logger = logger; + _configuration = configuration; + this.client = client; + } + + public void InitializeWorkers() + { + _logger.LogInformation("Initializing workers"); + // Load and configure all workers + var sectionMain = _configuration.GetSection("EmbeddingsearchIndexer"); + if (!sectionMain.Exists()) + { + _logger.LogCritical("Unable to load section \"EmbeddingsearchIndexer\""); + throw new IndexerConfigurationException("Unable to load section \"EmbeddingsearchIndexer\""); + } + + WorkerCollectionConfig? sectionWorker = (WorkerCollectionConfig?)sectionMain.Get(typeof(WorkerCollectionConfig)); //GetValue("Worker"); + if (sectionWorker is not null) + { + foreach (WorkerConfig workerConfig in sectionWorker.Worker) + { + ScriptToolSet toolSet = new(workerConfig.Script, client, _logger, _configuration, workerConfig.Name); + InitializeWorker(toolSet, workerConfig); + } + } + else + { + _logger.LogCritical("Unable to load section \"Worker\""); + throw new IndexerConfigurationException("Unable to load section \"Worker\""); + } + _logger.LogInformation("Initialized workers"); + } + + public void InitializeWorker(ScriptToolSet toolSet, WorkerConfig workerConfig) + { + _logger.LogInformation("Initializing worker: {Name}", workerConfig.Name); + Worker worker = new(workerConfig.Name, workerConfig, GetScriptable(toolSet)); + Workers[workerConfig.Name] = worker; + foreach (CallConfig callConfig in workerConfig.Calls) + { + _logger.LogInformation("Initializing call of type: {Type}", callConfig.Type); + + switch (callConfig.Type) + { + case "runonce": + RunOnceCall runOnceCall = new(worker, _logger, callConfig); + worker.Calls.Add(runOnceCall); + break; + case "interval": + if (callConfig.Interval is null) + { + _logger.LogError("Interval not set for a Call in Worker \"{Name}\"", workerConfig.Name); + throw new IndexerConfigurationException($"Interval not set for a Call in Worker \"{workerConfig.Name}\""); + } + var timer = new System.Timers.Timer((double)callConfig.Interval); + timer.AutoReset = true; + timer.Enabled = true; + DateTime now = DateTime.Now; + IntervalCall intervallCall = new(timer, worker.Scriptable, _logger, callConfig) + { + LastExecution = now, + LastSuccessfulExecution = now + }; + timer.Elapsed += (sender, e) => + { + try + { + DateTime beforeExecution = DateTime.Now; + intervallCall.IsExecuting = true; + try + { + worker.Scriptable.Update(new IntervalCallbackInfos() { sender = sender, e = e }); + } + finally + { + intervallCall.IsExecuting = false; + intervallCall.LastExecution = beforeExecution; + worker.LastExecution = beforeExecution; + } + DateTime afterExecution = DateTime.Now; + UpdateCallAndWorkerTimestamps(intervallCall, worker, beforeExecution, afterExecution); + } + catch (Exception ex) + { + _logger.LogError("Exception occurred in a Call of Worker \"{name}\": \"{ex}\"", worker.Name, ex.Message); + } + }; + worker.Calls.Add(intervallCall); + break; + case "schedule": // TODO implement scheduled tasks using Quartz + ScheduleCall scheduleCall = new(worker, callConfig, _logger); + worker.Calls.Add(scheduleCall); + break; + case "fileupdate": + if (callConfig.Path is null) + { + _logger.LogError("Path not set for a Call in Worker \"{Name}\"", workerConfig.Name); + throw new IndexerConfigurationException($"Path not set for a Call in Worker \"{workerConfig.Name}\""); + } + FileUpdateCall fileUpdateCall = new(worker, callConfig, _logger); + worker.Calls.Add(fileUpdateCall); + break; + default: + throw new IndexerConfigurationException($"Unknown Type specified for a Call in Worker \"{workerConfig.Name}\""); + } + } + } + + public static void UpdateCallAndWorkerTimestamps(ICall call, Worker worker, DateTime beforeExecution, DateTime afterExecution) + { + UpdateCallTimestamps(call, beforeExecution, afterExecution); + UpdateWorkerTimestamps(worker, beforeExecution, afterExecution); + } + + public static void UpdateCallTimestamps(ICall call, DateTime beforeExecution, DateTime afterExecution) + { + call.LastSuccessfulExecution = GetNewestDateTime(call.LastSuccessfulExecution, afterExecution); + } + + public static void UpdateWorkerTimestamps(Worker worker, DateTime beforeExecution, DateTime afterExecution) + { + worker.LastSuccessfulExecution = GetNewestDateTime(worker.LastSuccessfulExecution, afterExecution); + } + + + public static DateTime? GetNewestDateTime(DateTime? preexistingDateTime, DateTime incomingDateTime) + { + if (preexistingDateTime is null || preexistingDateTime.Value.CompareTo(incomingDateTime) < 0) + { + return incomingDateTime; + } + return preexistingDateTime; + } + + public IScriptable GetScriptable(ScriptToolSet toolSet) + { + string fileName = toolSet.FilePath ?? throw new IndexerConfigurationException($"\"Script\" not set for Worker \"{toolSet.Name}\""); + foreach (Type type in types) + { + System.Reflection.MethodInfo? method = type.GetMethod("IsScript"); + bool? isInstance = method is not null ? (bool?)method.Invoke(null, [fileName]) : null; + if (isInstance == true) + { + IScriptable? instance = (IScriptable?)Activator.CreateInstance(type, [toolSet, _logger]); + if (instance is null) + { + _logger.LogError("Unable to initialize script: \"{fileName}\"", fileName); + throw new Exception($"Unable to initialize script: \"{fileName}\""); + } + return instance; + } + } + _logger.LogError("Unable to determine the script's language: \"{fileName}\"", fileName); + throw new UnknownScriptLanguageException(fileName); + } +}