using Microsoft.Extensions.Logging; using System.Collections.Concurrent; namespace LightlessSync.Services; public sealed class AssetProcessingQueue : IDisposable { private readonly BlockingCollection _queue = new(); private readonly Thread _worker; private readonly ILogger _logger; private bool _disposed; public AssetProcessingQueue(ILogger logger, string name) { _logger = logger; _worker = new Thread(Run) { IsBackground = true, Name = string.IsNullOrWhiteSpace(name) ? "LightlessSync.AssetProcessing" : name }; _worker.Start(); } public Task Enqueue(Func work, CancellationToken token = default) { if (work is null) { throw new ArgumentNullException(nameof(work)); } var completion = new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously); if (token.IsCancellationRequested) { completion.TrySetCanceled(token); return completion.Task; } if (_queue.IsAddingCompleted || _disposed) { completion.TrySetException(new ObjectDisposedException(nameof(AssetProcessingQueue))); return completion.Task; } _queue.Add(new WorkItem(work, token, completion)); return completion.Task; } private void Run() { foreach (var item in _queue.GetConsumingEnumerable()) { if (item.Token.IsCancellationRequested) { item.Completion.TrySetCanceled(item.Token); continue; } try { item.Work(item.Token).GetAwaiter().GetResult(); item.Completion.TrySetResult(null); } catch (OperationCanceledException ex) { var token = ex.CancellationToken.IsCancellationRequested ? ex.CancellationToken : item.Token; item.Completion.TrySetCanceled(token); } catch (Exception ex) { _logger.LogWarning(ex, "Asset processing job failed."); item.Completion.TrySetException(ex); } } } public void Dispose() { if (_disposed) { return; } _disposed = true; _queue.CompleteAdding(); _worker.Join(TimeSpan.FromSeconds(2)); _queue.Dispose(); } private readonly record struct WorkItem( Func Work, CancellationToken Token, TaskCompletionSource Completion); }