Queue File compacting to let workers download as priority, Offload decompression task
This commit is contained in:
@@ -30,6 +30,8 @@ public partial class FileDownloadManager : DisposableMediatorSubscriberBase
|
|||||||
private readonly ConcurrentDictionary<ThrottledStream, byte> _activeDownloadStreams;
|
private readonly ConcurrentDictionary<ThrottledStream, byte> _activeDownloadStreams;
|
||||||
private readonly SemaphoreSlim _decompressGate =
|
private readonly SemaphoreSlim _decompressGate =
|
||||||
new(Math.Max(1, Environment.ProcessorCount / 2), Math.Max(1, Environment.ProcessorCount / 2));
|
new(Math.Max(1, Environment.ProcessorCount / 2), Math.Max(1, Environment.ProcessorCount / 2));
|
||||||
|
|
||||||
|
private readonly ConcurrentQueue<string> _deferredCompressionQueue = new();
|
||||||
|
|
||||||
private volatile bool _disableDirectDownloads;
|
private volatile bool _disableDirectDownloads;
|
||||||
private int _consecutiveDirectDownloadFailures;
|
private int _consecutiveDirectDownloadFailures;
|
||||||
@@ -556,7 +558,7 @@ public partial class FileDownloadManager : DisposableMediatorSubscriberBase
|
|||||||
|
|
||||||
if (len == 0)
|
if (len == 0)
|
||||||
{
|
{
|
||||||
await _fileCompactor.WriteAllBytesAsync(filePath, Array.Empty<byte>(), ct).ConfigureAwait(false);
|
await File.WriteAllBytesAsync(filePath, Array.Empty<byte>(), ct).ConfigureAwait(false);
|
||||||
PersistFileToStorage(fileHash, filePath, repl.GamePath, skipDownscale);
|
PersistFileToStorage(fileHash, filePath, repl.GamePath, skipDownscale);
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
@@ -567,17 +569,21 @@ public partial class FileDownloadManager : DisposableMediatorSubscriberBase
|
|||||||
await _decompressGate.WaitAsync(ct).ConfigureAwait(false);
|
await _decompressGate.WaitAsync(ct).ConfigureAwait(false);
|
||||||
try
|
try
|
||||||
{
|
{
|
||||||
var sw = System.Diagnostics.Stopwatch.StartNew();
|
// offload CPU-intensive decompression to threadpool to free up worker
|
||||||
|
await Task.Run(async () =>
|
||||||
|
{
|
||||||
|
var sw = System.Diagnostics.Stopwatch.StartNew();
|
||||||
|
|
||||||
// decompress
|
// decompress
|
||||||
var decompressed = LZ4Wrapper.Unwrap(compressed);
|
var decompressed = LZ4Wrapper.Unwrap(compressed);
|
||||||
|
|
||||||
Logger.LogTrace("{dlName}: Unwrap {fileHash} took {ms}ms (compressed {c} bytes, decompressed {d} bytes)",
|
Logger.LogTrace("{dlName}: Unwrap {fileHash} took {ms}ms (compressed {c} bytes, decompressed {d} bytes)",
|
||||||
downloadLabel, fileHash, sw.ElapsedMilliseconds, compressed.Length, decompressed?.Length ?? -1);
|
downloadLabel, fileHash, sw.ElapsedMilliseconds, compressed.Length, decompressed?.Length ?? -1);
|
||||||
|
|
||||||
// write to file
|
// write to file without compacting during download
|
||||||
await _fileCompactor.WriteAllBytesAsync(filePath, decompressed, ct).ConfigureAwait(false);
|
await File.WriteAllBytesAsync(filePath, decompressed, ct).ConfigureAwait(false);
|
||||||
PersistFileToStorage(fileHash, filePath, repl.GamePath, skipDownscale);
|
PersistFileToStorage(fileHash, filePath, repl.GamePath, skipDownscale);
|
||||||
|
}, ct).ConfigureAwait(false);
|
||||||
}
|
}
|
||||||
finally
|
finally
|
||||||
{
|
{
|
||||||
@@ -752,8 +758,16 @@ public partial class FileDownloadManager : DisposableMediatorSubscriberBase
|
|||||||
if (gameObjectHandler is not null)
|
if (gameObjectHandler is not null)
|
||||||
Mediator.Publish(new DownloadStartedMessage(gameObjectHandler, _downloadStatus));
|
Mediator.Publish(new DownloadStartedMessage(gameObjectHandler, _downloadStatus));
|
||||||
|
|
||||||
|
// work based on cpu count and slots
|
||||||
|
var coreCount = Environment.ProcessorCount;
|
||||||
|
var baseWorkers = Math.Min(slots, coreCount);
|
||||||
|
|
||||||
|
// only add buffer if decompression has capacity AND we have cores to spare
|
||||||
|
var availableDecompressSlots = _decompressGate.CurrentCount;
|
||||||
|
var extraWorkers = (availableDecompressSlots > 0 && coreCount >= 6) ? 2 : 0;
|
||||||
|
|
||||||
// allow some extra workers so downloads can continue while earlier items decompress.
|
// allow some extra workers so downloads can continue while earlier items decompress.
|
||||||
var workerDop = Math.Clamp(slots * 2, 2, 16);
|
var workerDop = Math.Clamp(baseWorkers + extraWorkers, 2, coreCount);
|
||||||
|
|
||||||
// batch downloads
|
// batch downloads
|
||||||
Task batchTask = batchChunks.Length == 0
|
Task batchTask = batchChunks.Length == 0
|
||||||
@@ -769,6 +783,9 @@ public partial class FileDownloadManager : DisposableMediatorSubscriberBase
|
|||||||
|
|
||||||
await Task.WhenAll(batchTask, directTask).ConfigureAwait(false);
|
await Task.WhenAll(batchTask, directTask).ConfigureAwait(false);
|
||||||
|
|
||||||
|
// process deferred compressions after all downloads complete
|
||||||
|
await ProcessDeferredCompressionsAsync(ct).ConfigureAwait(false);
|
||||||
|
|
||||||
Logger.LogDebug("Download end: {id}", objectName);
|
Logger.LogDebug("Download end: {id}", objectName);
|
||||||
ClearDownload();
|
ClearDownload();
|
||||||
}
|
}
|
||||||
@@ -873,7 +890,7 @@ public partial class FileDownloadManager : DisposableMediatorSubscriberBase
|
|||||||
byte[] compressedBytes = await File.ReadAllBytesAsync(tempFilename, ct).ConfigureAwait(false);
|
byte[] compressedBytes = await File.ReadAllBytesAsync(tempFilename, ct).ConfigureAwait(false);
|
||||||
var decompressedBytes = LZ4Wrapper.Unwrap(compressedBytes);
|
var decompressedBytes = LZ4Wrapper.Unwrap(compressedBytes);
|
||||||
|
|
||||||
await _fileCompactor.WriteAllBytesAsync(finalFilename, decompressedBytes, ct).ConfigureAwait(false);
|
await File.WriteAllBytesAsync(finalFilename, decompressedBytes, ct).ConfigureAwait(false);
|
||||||
PersistFileToStorage(directDownload.Hash, finalFilename, repl.GamePath, skipDownscale);
|
PersistFileToStorage(directDownload.Hash, finalFilename, repl.GamePath, skipDownscale);
|
||||||
|
|
||||||
MarkTransferredFiles(directDownload.DirectDownloadUrl!, 1);
|
MarkTransferredFiles(directDownload.DirectDownloadUrl!, 1);
|
||||||
@@ -1001,6 +1018,10 @@ public partial class FileDownloadManager : DisposableMediatorSubscriberBase
|
|||||||
fi.LastAccessTime = DateTime.Today;
|
fi.LastAccessTime = DateTime.Today;
|
||||||
fi.LastWriteTime = RandomDayInThePast().Invoke();
|
fi.LastWriteTime = RandomDayInThePast().Invoke();
|
||||||
|
|
||||||
|
// queue file for deferred compression instead of compressing immediately
|
||||||
|
if (_configService.Current.UseCompactor)
|
||||||
|
_deferredCompressionQueue.Enqueue(filePath);
|
||||||
|
|
||||||
try
|
try
|
||||||
{
|
{
|
||||||
var entry = _fileDbManager.CreateCacheEntry(filePath);
|
var entry = _fileDbManager.CreateCacheEntry(filePath);
|
||||||
@@ -1026,6 +1047,52 @@ public partial class FileDownloadManager : DisposableMediatorSubscriberBase
|
|||||||
|
|
||||||
private static IProgress<long> CreateInlineProgress(Action<long> callback) => new InlineProgress(callback);
|
private static IProgress<long> CreateInlineProgress(Action<long> callback) => new InlineProgress(callback);
|
||||||
|
|
||||||
|
private async Task ProcessDeferredCompressionsAsync(CancellationToken ct)
|
||||||
|
{
|
||||||
|
if (_deferredCompressionQueue.IsEmpty)
|
||||||
|
return;
|
||||||
|
|
||||||
|
var filesToCompress = new List<string>();
|
||||||
|
while (_deferredCompressionQueue.TryDequeue(out var filePath))
|
||||||
|
{
|
||||||
|
if (File.Exists(filePath))
|
||||||
|
filesToCompress.Add(filePath);
|
||||||
|
}
|
||||||
|
|
||||||
|
if (filesToCompress.Count == 0)
|
||||||
|
return;
|
||||||
|
|
||||||
|
Logger.LogDebug("Starting deferred compression of {count} files", filesToCompress.Count);
|
||||||
|
|
||||||
|
var compressionWorkers = Math.Clamp(Environment.ProcessorCount / 4, 2, 4);
|
||||||
|
|
||||||
|
await Parallel.ForEachAsync(filesToCompress,
|
||||||
|
new ParallelOptions
|
||||||
|
{
|
||||||
|
MaxDegreeOfParallelism = compressionWorkers,
|
||||||
|
CancellationToken = ct
|
||||||
|
},
|
||||||
|
async (filePath, token) =>
|
||||||
|
{
|
||||||
|
try
|
||||||
|
{
|
||||||
|
await Task.Yield();
|
||||||
|
if (_configService.Current.UseCompactor && File.Exists(filePath))
|
||||||
|
{
|
||||||
|
var bytes = await File.ReadAllBytesAsync(filePath, token).ConfigureAwait(false);
|
||||||
|
await _fileCompactor.WriteAllBytesAsync(filePath, bytes, token).ConfigureAwait(false);
|
||||||
|
Logger.LogTrace("Compressed file: {filePath}", filePath);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
catch (Exception ex)
|
||||||
|
{
|
||||||
|
Logger.LogWarning(ex, "Failed to compress file: {filePath}", filePath);
|
||||||
|
}
|
||||||
|
}).ConfigureAwait(false);
|
||||||
|
|
||||||
|
Logger.LogDebug("Completed deferred compression of {count} files", filesToCompress.Count);
|
||||||
|
}
|
||||||
|
|
||||||
private sealed class InlineProgress : IProgress<long>
|
private sealed class InlineProgress : IProgress<long>
|
||||||
{
|
{
|
||||||
private readonly Action<long> _callback;
|
private readonly Action<long> _callback;
|
||||||
|
|||||||
Reference in New Issue
Block a user