move texture and model processing out of download thread and fix some normalization for weights
This commit is contained in:
93
LightlessSync/Services/AssetProcessingQueue.cs
Normal file
93
LightlessSync/Services/AssetProcessingQueue.cs
Normal file
@@ -0,0 +1,93 @@
|
||||
using Microsoft.Extensions.Logging;
|
||||
using System.Collections.Concurrent;
|
||||
|
||||
namespace LightlessSync.Services;
|
||||
|
||||
public sealed class AssetProcessingQueue : IDisposable
|
||||
{
|
||||
private readonly BlockingCollection<WorkItem> _queue = new();
|
||||
private readonly Thread _worker;
|
||||
private readonly ILogger _logger;
|
||||
private bool _disposed;
|
||||
|
||||
public AssetProcessingQueue(ILogger logger, string name)
|
||||
{
|
||||
_logger = logger;
|
||||
_worker = new Thread(Run)
|
||||
{
|
||||
IsBackground = true,
|
||||
Name = string.IsNullOrWhiteSpace(name) ? "LightlessSync.AssetProcessing" : name
|
||||
};
|
||||
_worker.Start();
|
||||
}
|
||||
|
||||
public Task Enqueue(Func<CancellationToken, Task> work, CancellationToken token = default)
|
||||
{
|
||||
if (work is null)
|
||||
{
|
||||
throw new ArgumentNullException(nameof(work));
|
||||
}
|
||||
|
||||
var completion = new TaskCompletionSource<object?>(TaskCreationOptions.RunContinuationsAsynchronously);
|
||||
if (token.IsCancellationRequested)
|
||||
{
|
||||
completion.TrySetCanceled(token);
|
||||
return completion.Task;
|
||||
}
|
||||
|
||||
if (_queue.IsAddingCompleted || _disposed)
|
||||
{
|
||||
completion.TrySetException(new ObjectDisposedException(nameof(AssetProcessingQueue)));
|
||||
return completion.Task;
|
||||
}
|
||||
|
||||
_queue.Add(new WorkItem(work, token, completion));
|
||||
return completion.Task;
|
||||
}
|
||||
|
||||
private void Run()
|
||||
{
|
||||
foreach (var item in _queue.GetConsumingEnumerable())
|
||||
{
|
||||
if (item.Token.IsCancellationRequested)
|
||||
{
|
||||
item.Completion.TrySetCanceled(item.Token);
|
||||
continue;
|
||||
}
|
||||
|
||||
try
|
||||
{
|
||||
item.Work(item.Token).GetAwaiter().GetResult();
|
||||
item.Completion.TrySetResult(null);
|
||||
}
|
||||
catch (OperationCanceledException ex)
|
||||
{
|
||||
var token = ex.CancellationToken.IsCancellationRequested ? ex.CancellationToken : item.Token;
|
||||
item.Completion.TrySetCanceled(token);
|
||||
}
|
||||
catch (Exception ex)
|
||||
{
|
||||
_logger.LogWarning(ex, "Asset processing job failed.");
|
||||
item.Completion.TrySetException(ex);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
public void Dispose()
|
||||
{
|
||||
if (_disposed)
|
||||
{
|
||||
return;
|
||||
}
|
||||
|
||||
_disposed = true;
|
||||
_queue.CompleteAdding();
|
||||
_worker.Join(TimeSpan.FromSeconds(2));
|
||||
_queue.Dispose();
|
||||
}
|
||||
|
||||
private readonly record struct WorkItem(
|
||||
Func<CancellationToken, Task> Work,
|
||||
CancellationToken Token,
|
||||
TaskCompletionSource<object?> Completion);
|
||||
}
|
||||
Reference in New Issue
Block a user