Files
LightlessClient/LightlessSync/Services/AssetProcessingQueue.cs

94 lines
2.6 KiB
C#

using Microsoft.Extensions.Logging;
using System.Collections.Concurrent;
namespace LightlessSync.Services;
/// <summary>
/// Runs queued asynchronous jobs one at a time on a single dedicated background thread.
/// </summary>
/// <remarks>
/// Each job is executed to completion (the worker blocks on the returned task) before the
/// next one is taken from the queue. The outcome of every job is reported through the task
/// returned by <see cref="Enqueue"/>.
/// </remarks>
public sealed class AssetProcessingQueue : IDisposable
{
    /// <summary>Upper bound on how long <see cref="Dispose"/> waits for the worker to drain.</summary>
    private static readonly TimeSpan ShutdownTimeout = TimeSpan.FromSeconds(2);

    private readonly BlockingCollection<WorkItem> _queue = new();
    private readonly Thread _worker;
    private readonly ILogger _logger;

    // 0 = live, 1 = disposed. Kept as an int so Dispose can claim the transition atomically
    // and Enqueue can read it with a volatile read from any thread.
    private int _disposed;

    /// <summary>
    /// Creates the queue and immediately starts its background worker thread.
    /// </summary>
    /// <param name="logger">Logger used to report jobs that fail with an exception.</param>
    /// <param name="name">
    /// Name for the worker thread; when null, empty or whitespace,
    /// "LightlessSync.AssetProcessing" is used instead.
    /// </param>
    public AssetProcessingQueue(ILogger logger, string name)
    {
        _logger = logger;
        _worker = new Thread(Run)
        {
            IsBackground = true,
            Name = string.IsNullOrWhiteSpace(name) ? "LightlessSync.AssetProcessing" : name
        };
        _worker.Start();
    }

    /// <summary>
    /// Schedules <paramref name="work"/> to run on the worker thread.
    /// </summary>
    /// <param name="work">The job to run; it receives <paramref name="token"/>.</param>
    /// <param name="token">Token that cancels the job before or while it runs.</param>
    /// <returns>
    /// A task that completes when the job finishes, is canceled when the token is already
    /// canceled or the job throws <see cref="OperationCanceledException"/>, faults with the
    /// job's exception, or faults with <see cref="ObjectDisposedException"/> when the queue
    /// no longer accepts work.
    /// </returns>
    /// <exception cref="ArgumentNullException"><paramref name="work"/> is null.</exception>
    public Task Enqueue(Func<CancellationToken, Task> work, CancellationToken token = default)
    {
        if (work is null)
        {
            throw new ArgumentNullException(nameof(work));
        }

        var completion = new TaskCompletionSource<object?>(TaskCreationOptions.RunContinuationsAsynchronously);
        if (token.IsCancellationRequested)
        {
            completion.TrySetCanceled(token);
            return completion.Task;
        }

        // Check our own flag first: BlockingCollection property getters throw once the
        // collection has been disposed, so they must not be touched after shutdown.
        if (Volatile.Read(ref _disposed) == 0)
        {
            try
            {
                _queue.Add(new WorkItem(work, token, completion));
                return completion.Task;
            }
            catch (InvalidOperationException)
            {
                // Dispose raced with us: adding was completed (InvalidOperationException) or
                // the collection was torn down (ObjectDisposedException, a subclass) between
                // the flag check and Add. Report it through the task like any late enqueue.
            }
        }

        completion.TrySetException(new ObjectDisposedException(nameof(AssetProcessingQueue)));
        return completion.Task;
    }

    /// <summary>
    /// Worker loop: consumes items until adding is completed and the queue is empty,
    /// then releases the queue.
    /// </summary>
    private void Run()
    {
        try
        {
            foreach (var item in _queue.GetConsumingEnumerable())
            {
                Execute(item);
            }
        }
        finally
        {
            // The worker is the only consumer, so it is the only place where the collection
            // can be disposed without pulling it out from under an active enumeration.
            _queue.Dispose();
        }
    }

    /// <summary>
    /// Runs a single job synchronously and transfers its outcome to the job's completion source.
    /// </summary>
    private void Execute(WorkItem item)
    {
        // Skip jobs whose token was canceled while they were waiting in the queue.
        if (item.Token.IsCancellationRequested)
        {
            item.Completion.TrySetCanceled(item.Token);
            return;
        }

        try
        {
            // Blocking here is deliberate: it is what serializes the jobs on this thread.
            item.Work(item.Token).GetAwaiter().GetResult();
            item.Completion.TrySetResult(null);
        }
        catch (OperationCanceledException ex)
        {
            // Prefer the token that actually triggered the cancellation, if it is known.
            var token = ex.CancellationToken.IsCancellationRequested ? ex.CancellationToken : item.Token;
            item.Completion.TrySetCanceled(token);
        }
        catch (Exception ex)
        {
            _logger.LogWarning(ex, "Asset processing job failed.");
            item.Completion.TrySetException(ex);
        }
    }

    /// <summary>
    /// Stops accepting new work and waits up to two seconds for the worker thread to finish
    /// the jobs that are already queued. Safe to call more than once and from any thread.
    /// </summary>
    public void Dispose()
    {
        // Atomically claim the disposal so concurrent or repeated calls do nothing.
        if (Interlocked.Exchange(ref _disposed, 1) != 0)
        {
            return;
        }

        _queue.CompleteAdding();

        // A job may dispose the queue from the worker thread itself; joining our own thread
        // would only burn the whole timeout, so skip it in that case.
        if (Thread.CurrentThread != _worker)
        {
            // If the timeout elapses the worker keeps draining in the background and
            // disposes the collection itself when it exits.
            _worker.Join(ShutdownTimeout);
        }
    }

    /// <summary>
    /// A queued job together with its cancellation token and the completion source that
    /// backs the task handed to the caller.
    /// </summary>
    private readonly record struct WorkItem(
        Func<CancellationToken, Task> Work,
        CancellationToken Token,
        TaskCompletionSource<object?> Completion);
}