From 7d2a914c8418f7a63d5e8da87a95e74254f2af8f Mon Sep 17 00:00:00 2001 From: defnotken Date: Thu, 1 Jan 2026 20:57:37 -0600 Subject: [PATCH] Queue File compacting to let workers download as priority, Offload decompression task --- .../WebAPI/Files/FileDownloadManager.cs | 89 ++++++++++++++++--- 1 file changed, 78 insertions(+), 11 deletions(-) diff --git a/LightlessSync/WebAPI/Files/FileDownloadManager.cs b/LightlessSync/WebAPI/Files/FileDownloadManager.cs index 8aa2b0b..28614a1 100644 --- a/LightlessSync/WebAPI/Files/FileDownloadManager.cs +++ b/LightlessSync/WebAPI/Files/FileDownloadManager.cs @@ -30,6 +30,8 @@ public partial class FileDownloadManager : DisposableMediatorSubscriberBase private readonly ConcurrentDictionary _activeDownloadStreams; private readonly SemaphoreSlim _decompressGate = new(Math.Max(1, Environment.ProcessorCount / 2), Math.Max(1, Environment.ProcessorCount / 2)); + + private readonly ConcurrentQueue _deferredCompressionQueue = new(); private volatile bool _disableDirectDownloads; private int _consecutiveDirectDownloadFailures; @@ -556,7 +558,7 @@ public partial class FileDownloadManager : DisposableMediatorSubscriberBase if (len == 0) { - await _fileCompactor.WriteAllBytesAsync(filePath, Array.Empty(), ct).ConfigureAwait(false); + await File.WriteAllBytesAsync(filePath, Array.Empty(), ct).ConfigureAwait(false); PersistFileToStorage(fileHash, filePath, repl.GamePath, skipDownscale); continue; } @@ -567,17 +569,21 @@ public partial class FileDownloadManager : DisposableMediatorSubscriberBase await _decompressGate.WaitAsync(ct).ConfigureAwait(false); try { - var sw = System.Diagnostics.Stopwatch.StartNew(); + // offload CPU-intensive decompression to threadpool to free up worker + await Task.Run(async () => + { + var sw = System.Diagnostics.Stopwatch.StartNew(); - // decompress - var decompressed = LZ4Wrapper.Unwrap(compressed); + // decompress + var decompressed = LZ4Wrapper.Unwrap(compressed); - Logger.LogTrace("{dlName}: Unwrap {fileHash} took {ms}ms (compressed {c} bytes, decompressed {d} bytes)", - downloadLabel, fileHash, sw.ElapsedMilliseconds, compressed.Length, decompressed?.Length ?? -1); + Logger.LogTrace("{dlName}: Unwrap {fileHash} took {ms}ms (compressed {c} bytes, decompressed {d} bytes)", + downloadLabel, fileHash, sw.ElapsedMilliseconds, compressed.Length, decompressed?.Length ?? -1); - // write to file - await _fileCompactor.WriteAllBytesAsync(filePath, decompressed, ct).ConfigureAwait(false); - PersistFileToStorage(fileHash, filePath, repl.GamePath, skipDownscale); + // write to file without compacting during download + await File.WriteAllBytesAsync(filePath, decompressed, ct).ConfigureAwait(false); + PersistFileToStorage(fileHash, filePath, repl.GamePath, skipDownscale); + }, ct).ConfigureAwait(false); } finally { @@ -752,8 +758,16 @@ public partial class FileDownloadManager : DisposableMediatorSubscriberBase if (gameObjectHandler is not null) Mediator.Publish(new DownloadStartedMessage(gameObjectHandler, _downloadStatus)); + // work based on cpu count and slots + var coreCount = Environment.ProcessorCount; + var baseWorkers = Math.Min(slots, coreCount); + + // only add buffer if decompression has capacity AND we have cores to spare + var availableDecompressSlots = _decompressGate.CurrentCount; + var extraWorkers = (availableDecompressSlots > 0 && coreCount >= 6) ? 2 : 0; + // allow some extra workers so downloads can continue while earlier items decompress. - var workerDop = Math.Clamp(slots * 2, 2, 16); + var workerDop = Math.Clamp(baseWorkers + extraWorkers, 2, coreCount); // batch downloads Task batchTask = batchChunks.Length == 0 @@ -769,6 +783,9 @@ public partial class FileDownloadManager : DisposableMediatorSubscriberBase await Task.WhenAll(batchTask, directTask).ConfigureAwait(false); + // process deferred compressions after all downloads complete + await ProcessDeferredCompressionsAsync(ct).ConfigureAwait(false); + Logger.LogDebug("Download end: {id}", objectName); ClearDownload(); } @@ -873,7 +890,7 @@ public partial class FileDownloadManager : DisposableMediatorSubscriberBase byte[] compressedBytes = await File.ReadAllBytesAsync(tempFilename, ct).ConfigureAwait(false); var decompressedBytes = LZ4Wrapper.Unwrap(compressedBytes); - await _fileCompactor.WriteAllBytesAsync(finalFilename, decompressedBytes, ct).ConfigureAwait(false); + await File.WriteAllBytesAsync(finalFilename, decompressedBytes, ct).ConfigureAwait(false); PersistFileToStorage(directDownload.Hash, finalFilename, repl.GamePath, skipDownscale); MarkTransferredFiles(directDownload.DirectDownloadUrl!, 1); @@ -1001,6 +1018,10 @@ public partial class FileDownloadManager : DisposableMediatorSubscriberBase fi.LastAccessTime = DateTime.Today; fi.LastWriteTime = RandomDayInThePast().Invoke(); + // queue file for deferred compression instead of compressing immediately + if (_configService.Current.UseCompactor) + _deferredCompressionQueue.Enqueue(filePath); + try { var entry = _fileDbManager.CreateCacheEntry(filePath); @@ -1026,6 +1047,52 @@ public partial class FileDownloadManager : DisposableMediatorSubscriberBase private static IProgress CreateInlineProgress(Action callback) => new InlineProgress(callback); + private async Task ProcessDeferredCompressionsAsync(CancellationToken ct) + { + if (_deferredCompressionQueue.IsEmpty) + return; + + var filesToCompress = new List(); + while (_deferredCompressionQueue.TryDequeue(out var filePath)) + { + if (File.Exists(filePath)) + filesToCompress.Add(filePath); + } + + if (filesToCompress.Count == 0) + return; + + Logger.LogDebug("Starting deferred compression of {count} files", filesToCompress.Count); + + var compressionWorkers = Math.Clamp(Environment.ProcessorCount / 4, 2, 4); + + await Parallel.ForEachAsync(filesToCompress, + new ParallelOptions + { + MaxDegreeOfParallelism = compressionWorkers, + CancellationToken = ct + }, + async (filePath, token) => + { + try + { + await Task.Yield(); + if (_configService.Current.UseCompactor && File.Exists(filePath)) + { + var bytes = await File.ReadAllBytesAsync(filePath, token).ConfigureAwait(false); + await _fileCompactor.WriteAllBytesAsync(filePath, bytes, token).ConfigureAwait(false); + Logger.LogTrace("Compressed file: {filePath}", filePath); + } + } + catch (Exception ex) + { + Logger.LogWarning(ex, "Failed to compress file: {filePath}", filePath); + } + }).ConfigureAwait(false); + + Logger.LogDebug("Completed deferred compression of {count} files", filesToCompress.Count); + } + private sealed class InlineProgress : IProgress { private readonly Action _callback;