Merged Cake and Abel branched into 2.0.3 (#131)
Co-authored-by: azyges <aaaaaa@aaa.aaa> Co-authored-by: cake <admin@cakeandbanana.nl> Co-authored-by: defnotken <itsdefnotken@gmail.com> Reviewed-on: #131
This commit was merged in pull request #131.
This commit is contained in:
@@ -6,6 +6,7 @@ using LightlessSync.FileCache;
|
||||
using LightlessSync.LightlessConfiguration;
|
||||
using LightlessSync.PlayerData.Handlers;
|
||||
using LightlessSync.Services.Mediator;
|
||||
using LightlessSync.Services.ModelDecimation;
|
||||
using LightlessSync.Services.TextureCompression;
|
||||
using LightlessSync.WebAPI.Files.Models;
|
||||
using Microsoft.Extensions.Logging;
|
||||
@@ -17,19 +18,21 @@ namespace LightlessSync.WebAPI.Files;
|
||||
|
||||
public partial class FileDownloadManager : DisposableMediatorSubscriberBase
|
||||
{
|
||||
private readonly Dictionary<string, FileDownloadStatus> _downloadStatus;
|
||||
private readonly object _downloadStatusLock = new();
|
||||
private readonly ConcurrentDictionary<string, FileDownloadStatus> _downloadStatus;
|
||||
|
||||
private readonly FileCompactor _fileCompactor;
|
||||
private readonly FileCacheManager _fileDbManager;
|
||||
private readonly FileTransferOrchestrator _orchestrator;
|
||||
private readonly LightlessConfigService _configService;
|
||||
private readonly TextureDownscaleService _textureDownscaleService;
|
||||
private readonly ModelDecimationService _modelDecimationService;
|
||||
private readonly TextureMetadataHelper _textureMetadataHelper;
|
||||
|
||||
private readonly ConcurrentDictionary<ThrottledStream, byte> _activeDownloadStreams;
|
||||
private readonly SemaphoreSlim _decompressGate =
|
||||
new(Math.Max(1, Environment.ProcessorCount / 2), Math.Max(1, Environment.ProcessorCount / 2));
|
||||
|
||||
private readonly ConcurrentQueue<string> _deferredCompressionQueue = new();
|
||||
|
||||
private volatile bool _disableDirectDownloads;
|
||||
private int _consecutiveDirectDownloadFailures;
|
||||
@@ -43,14 +46,16 @@ public partial class FileDownloadManager : DisposableMediatorSubscriberBase
|
||||
FileCompactor fileCompactor,
|
||||
LightlessConfigService configService,
|
||||
TextureDownscaleService textureDownscaleService,
|
||||
ModelDecimationService modelDecimationService,
|
||||
TextureMetadataHelper textureMetadataHelper) : base(logger, mediator)
|
||||
{
|
||||
_downloadStatus = new Dictionary<string, FileDownloadStatus>(StringComparer.Ordinal);
|
||||
_downloadStatus = new ConcurrentDictionary<string, FileDownloadStatus>(StringComparer.Ordinal);
|
||||
_orchestrator = orchestrator;
|
||||
_fileDbManager = fileCacheManager;
|
||||
_fileCompactor = fileCompactor;
|
||||
_configService = configService;
|
||||
_textureDownscaleService = textureDownscaleService;
|
||||
_modelDecimationService = modelDecimationService;
|
||||
_textureMetadataHelper = textureMetadataHelper;
|
||||
_activeDownloadStreams = new();
|
||||
_lastConfigDirectDownloadsState = _configService.Current.EnableDirectDownloads;
|
||||
@@ -84,19 +89,16 @@ public partial class FileDownloadManager : DisposableMediatorSubscriberBase
|
||||
public void ClearDownload()
|
||||
{
|
||||
CurrentDownloads.Clear();
|
||||
lock (_downloadStatusLock)
|
||||
{
|
||||
_downloadStatus.Clear();
|
||||
}
|
||||
_downloadStatus.Clear();
|
||||
CurrentOwnerToken = null;
|
||||
}
|
||||
|
||||
public async Task DownloadFiles(GameObjectHandler? gameObject, List<FileReplacementData> fileReplacementDto, CancellationToken ct, bool skipDownscale = false)
|
||||
public async Task DownloadFiles(GameObjectHandler? gameObject, List<FileReplacementData> fileReplacementDto, CancellationToken ct, bool skipDownscale = false, bool skipDecimation = false)
|
||||
{
|
||||
Mediator.Publish(new HaltScanMessage(nameof(DownloadFiles)));
|
||||
try
|
||||
{
|
||||
await DownloadFilesInternal(gameObject, fileReplacementDto, ct, skipDownscale).ConfigureAwait(false);
|
||||
await DownloadFilesInternal(gameObject, fileReplacementDto, ct, skipDownscale, skipDecimation).ConfigureAwait(false);
|
||||
}
|
||||
catch
|
||||
{
|
||||
@@ -154,29 +156,20 @@ public partial class FileDownloadManager : DisposableMediatorSubscriberBase
|
||||
|
||||
private void SetStatus(string key, DownloadStatus status)
|
||||
{
|
||||
lock (_downloadStatusLock)
|
||||
{
|
||||
if (_downloadStatus.TryGetValue(key, out var st))
|
||||
st.DownloadStatus = status;
|
||||
}
|
||||
if (_downloadStatus.TryGetValue(key, out var st))
|
||||
st.DownloadStatus = status;
|
||||
}
|
||||
|
||||
private void AddTransferredBytes(string key, long delta)
|
||||
{
|
||||
lock (_downloadStatusLock)
|
||||
{
|
||||
if (_downloadStatus.TryGetValue(key, out var st))
|
||||
st.TransferredBytes += delta;
|
||||
}
|
||||
if (_downloadStatus.TryGetValue(key, out var st))
|
||||
st.AddTransferredBytes(delta);
|
||||
}
|
||||
|
||||
private void MarkTransferredFiles(string key, int files)
|
||||
{
|
||||
lock (_downloadStatusLock)
|
||||
{
|
||||
if (_downloadStatus.TryGetValue(key, out var st))
|
||||
st.TransferredFiles = files;
|
||||
}
|
||||
if (_downloadStatus.TryGetValue(key, out var st))
|
||||
st.SetTransferredFiles(files);
|
||||
}
|
||||
|
||||
private static byte MungeByte(int byteOrEof)
|
||||
@@ -404,76 +397,32 @@ public partial class FileDownloadManager : DisposableMediatorSubscriberBase
|
||||
|
||||
private async Task WaitForDownloadReady(List<DownloadFileTransfer> downloadFileTransfer, Guid requestId, CancellationToken downloadCt)
|
||||
{
|
||||
bool alreadyCancelled = false;
|
||||
try
|
||||
while (true)
|
||||
{
|
||||
CancellationTokenSource localTimeoutCts = new();
|
||||
localTimeoutCts.CancelAfter(TimeSpan.FromSeconds(5));
|
||||
CancellationTokenSource composite = CancellationTokenSource.CreateLinkedTokenSource(downloadCt, localTimeoutCts.Token);
|
||||
downloadCt.ThrowIfCancellationRequested();
|
||||
|
||||
while (!_orchestrator.IsDownloadReady(requestId))
|
||||
if (_orchestrator.IsDownloadReady(requestId))
|
||||
break;
|
||||
|
||||
using var resp = await _orchestrator.SendRequestAsync(
|
||||
HttpMethod.Get,
|
||||
LightlessFiles.RequestCheckQueueFullPath(downloadFileTransfer[0].DownloadUri, requestId),
|
||||
downloadFileTransfer.Select(t => t.Hash).ToList(),
|
||||
downloadCt).ConfigureAwait(false);
|
||||
|
||||
resp.EnsureSuccessStatusCode();
|
||||
|
||||
var body = (await resp.Content.ReadAsStringAsync(downloadCt).ConfigureAwait(false)).Trim();
|
||||
if (string.Equals(body, "true", StringComparison.OrdinalIgnoreCase) ||
|
||||
body.Contains("\"ready\":true", StringComparison.OrdinalIgnoreCase))
|
||||
{
|
||||
try
|
||||
{
|
||||
await Task.Delay(250, composite.Token).ConfigureAwait(false);
|
||||
}
|
||||
catch (TaskCanceledException)
|
||||
{
|
||||
if (downloadCt.IsCancellationRequested) throw;
|
||||
|
||||
var req = await _orchestrator.SendRequestAsync(
|
||||
HttpMethod.Get,
|
||||
LightlessFiles.RequestCheckQueueFullPath(downloadFileTransfer[0].DownloadUri, requestId),
|
||||
downloadFileTransfer.Select(c => c.Hash).ToList(),
|
||||
downloadCt).ConfigureAwait(false);
|
||||
|
||||
req.EnsureSuccessStatusCode();
|
||||
|
||||
localTimeoutCts.Dispose();
|
||||
composite.Dispose();
|
||||
|
||||
localTimeoutCts = new();
|
||||
localTimeoutCts.CancelAfter(TimeSpan.FromSeconds(5));
|
||||
composite = CancellationTokenSource.CreateLinkedTokenSource(downloadCt, localTimeoutCts.Token);
|
||||
}
|
||||
break;
|
||||
}
|
||||
|
||||
localTimeoutCts.Dispose();
|
||||
composite.Dispose();
|
||||
|
||||
Logger.LogDebug("Download {requestId} ready", requestId);
|
||||
await Task.Delay(250, downloadCt).ConfigureAwait(false);
|
||||
}
|
||||
catch (TaskCanceledException)
|
||||
{
|
||||
try
|
||||
{
|
||||
await _orchestrator.SendRequestAsync(HttpMethod.Get, LightlessFiles.RequestCancelFullPath(downloadFileTransfer[0].DownloadUri, requestId))
|
||||
.ConfigureAwait(false);
|
||||
alreadyCancelled = true;
|
||||
}
|
||||
catch
|
||||
{
|
||||
// ignore
|
||||
}
|
||||
|
||||
throw;
|
||||
}
|
||||
finally
|
||||
{
|
||||
if (downloadCt.IsCancellationRequested && !alreadyCancelled)
|
||||
{
|
||||
try
|
||||
{
|
||||
await _orchestrator.SendRequestAsync(HttpMethod.Get, LightlessFiles.RequestCancelFullPath(downloadFileTransfer[0].DownloadUri, requestId))
|
||||
.ConfigureAwait(false);
|
||||
}
|
||||
catch
|
||||
{
|
||||
// ignore
|
||||
}
|
||||
}
|
||||
_orchestrator.ClearDownloadRequest(requestId);
|
||||
}
|
||||
_orchestrator.ClearDownloadRequest(requestId);
|
||||
}
|
||||
|
||||
private async Task DownloadQueuedBlockFileAsync(
|
||||
@@ -502,21 +451,15 @@ public partial class FileDownloadManager : DisposableMediatorSubscriberBase
|
||||
}
|
||||
}
|
||||
|
||||
private void RemoveStatus(string key)
|
||||
{
|
||||
lock (_downloadStatusLock)
|
||||
{
|
||||
_downloadStatus.Remove(key);
|
||||
}
|
||||
}
|
||||
|
||||
private async Task DecompressBlockFileAsync(
|
||||
string downloadStatusKey,
|
||||
string blockFilePath,
|
||||
Dictionary<string, (string Extension, string GamePath)> replacementLookup,
|
||||
IReadOnlyDictionary<string, long> rawSizeLookup,
|
||||
string downloadLabel,
|
||||
CancellationToken ct,
|
||||
bool skipDownscale)
|
||||
bool skipDownscale,
|
||||
bool skipDecimation)
|
||||
{
|
||||
SetStatus(downloadStatusKey, DownloadStatus.Decompressing);
|
||||
MarkTransferredFiles(downloadStatusKey, 1);
|
||||
@@ -532,29 +475,33 @@ public partial class FileDownloadManager : DisposableMediatorSubscriberBase
|
||||
|
||||
try
|
||||
{
|
||||
// sanity check length
|
||||
if (fileLengthBytes < 0 || fileLengthBytes > int.MaxValue)
|
||||
throw new InvalidDataException($"Invalid block entry length: {fileLengthBytes}");
|
||||
|
||||
// safe cast after check
|
||||
var len = checked((int)fileLengthBytes);
|
||||
|
||||
if (!replacementLookup.TryGetValue(fileHash, out var repl))
|
||||
{
|
||||
Logger.LogWarning("{dlName}: No replacement mapping for {fileHash}", downloadLabel, fileHash);
|
||||
fileBlockStream.Seek(len, SeekOrigin.Current);
|
||||
// still need to skip bytes:
|
||||
var skip = checked((int)fileLengthBytes);
|
||||
fileBlockStream.Position += skip;
|
||||
continue;
|
||||
}
|
||||
|
||||
// decompress
|
||||
var filePath = _fileDbManager.GetCacheFilePath(fileHash, repl.Extension);
|
||||
Logger.LogTrace("{dlName}: Decompressing {file}:{len} => {dest}", downloadLabel, fileHash, fileLengthBytes, filePath);
|
||||
|
||||
// read compressed data
|
||||
var compressed = new byte[len];
|
||||
|
||||
await ReadExactlyAsync(fileBlockStream, compressed.AsMemory(0, len), ct).ConfigureAwait(false);
|
||||
|
||||
if (len == 0)
|
||||
MungeBuffer(compressed);
|
||||
var decompressed = LZ4Wrapper.Unwrap(compressed);
|
||||
|
||||
if (rawSizeLookup.TryGetValue(fileHash, out var expectedRawSize)
|
||||
&& expectedRawSize > 0
|
||||
&& decompressed.LongLength != expectedRawSize)
|
||||
{
|
||||
await _fileCompactor.WriteAllBytesAsync(filePath, Array.Empty<byte>(), ct).ConfigureAwait(false);
|
||||
PersistFileToStorage(fileHash, filePath, repl.GamePath, skipDownscale);
|
||||
@@ -563,21 +510,24 @@ public partial class FileDownloadManager : DisposableMediatorSubscriberBase
|
||||
|
||||
MungeBuffer(compressed);
|
||||
|
||||
// limit concurrent decompressions
|
||||
await _decompressGate.WaitAsync(ct).ConfigureAwait(false);
|
||||
try
|
||||
{
|
||||
var sw = System.Diagnostics.Stopwatch.StartNew();
|
||||
// offload CPU-intensive decompression to threadpool to free up worker
|
||||
await Task.Run(async () =>
|
||||
{
|
||||
var sw = System.Diagnostics.Stopwatch.StartNew();
|
||||
|
||||
// decompress
|
||||
var decompressed = LZ4Wrapper.Unwrap(compressed);
|
||||
// decompress
|
||||
var decompressed = LZ4Wrapper.Unwrap(compressed);
|
||||
|
||||
Logger.LogTrace("{dlName}: Unwrap {fileHash} took {ms}ms (compressed {c} bytes, decompressed {d} bytes)",
|
||||
downloadLabel, fileHash, sw.ElapsedMilliseconds, compressed.Length, decompressed?.Length ?? -1);
|
||||
Logger.LogTrace("{dlName}: Unwrap {fileHash} took {ms}ms (compressed {c} bytes, decompressed {d} bytes)",
|
||||
downloadLabel, fileHash, sw.ElapsedMilliseconds, compressed.Length, decompressed?.Length ?? -1);
|
||||
|
||||
// write to file
|
||||
await _fileCompactor.WriteAllBytesAsync(filePath, decompressed, ct).ConfigureAwait(false);
|
||||
PersistFileToStorage(fileHash, filePath, repl.GamePath, skipDownscale);
|
||||
// write to file without compacting during download
|
||||
await File.WriteAllBytesAsync(filePath, decompressed, ct).ConfigureAwait(false);
|
||||
PersistFileToStorage(fileHash, filePath, repl.GamePath, skipDownscale);
|
||||
}, ct).ConfigureAwait(false);
|
||||
}
|
||||
finally
|
||||
{
|
||||
@@ -594,6 +544,8 @@ public partial class FileDownloadManager : DisposableMediatorSubscriberBase
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
SetStatus(downloadStatusKey, DownloadStatus.Completed);
|
||||
}
|
||||
catch (EndOfStreamException)
|
||||
{
|
||||
@@ -603,10 +555,6 @@ public partial class FileDownloadManager : DisposableMediatorSubscriberBase
|
||||
{
|
||||
Logger.LogError(ex, "{dlName}: Error during block file read", downloadLabel);
|
||||
}
|
||||
finally
|
||||
{
|
||||
RemoveStatus(downloadStatusKey);
|
||||
}
|
||||
}
|
||||
|
||||
public async Task<List<DownloadFileTransfer>> InitiateDownloadList(
|
||||
@@ -644,21 +592,25 @@ public partial class FileDownloadManager : DisposableMediatorSubscriberBase
|
||||
.. await FilesGetSizes(hashes, ct).ConfigureAwait(false),
|
||||
];
|
||||
|
||||
Logger.LogDebug("Files with size 0 or less: {files}",
|
||||
string.Join(", ", downloadFileInfoFromService.Where(f => f.Size <= 0).Select(f => f.Hash)));
|
||||
|
||||
foreach (var dto in downloadFileInfoFromService.Where(c => c.IsForbidden))
|
||||
{
|
||||
if (!_orchestrator.ForbiddenTransfers.Exists(f => string.Equals(f.Hash, dto.Hash, StringComparison.Ordinal)))
|
||||
_orchestrator.ForbiddenTransfers.Add(new DownloadFileTransfer(dto));
|
||||
}
|
||||
|
||||
CurrentDownloads = [.. downloadFileInfoFromService
|
||||
CurrentDownloads = downloadFileInfoFromService
|
||||
.Distinct()
|
||||
.Select(d => new DownloadFileTransfer(d))
|
||||
.Where(d => d.CanBeTransferred)];
|
||||
.Where(d => d.CanBeTransferred)
|
||||
.ToList();
|
||||
|
||||
return CurrentDownloads;
|
||||
}
|
||||
|
||||
private sealed record BatchChunk(string Key, List<DownloadFileTransfer> Items);
|
||||
private sealed record BatchChunk(string HostKey, string StatusKey, List<DownloadFileTransfer> Items);
|
||||
|
||||
private static IEnumerable<List<T>> ChunkList<T>(List<T> items, int chunkSize)
|
||||
{
|
||||
@@ -666,7 +618,7 @@ public partial class FileDownloadManager : DisposableMediatorSubscriberBase
|
||||
yield return items.GetRange(i, Math.Min(chunkSize, items.Count - i));
|
||||
}
|
||||
|
||||
private async Task DownloadFilesInternal(GameObjectHandler? gameObjectHandler, List<FileReplacementData> fileReplacement, CancellationToken ct, bool skipDownscale)
|
||||
private async Task DownloadFilesInternal(GameObjectHandler? gameObjectHandler, List<FileReplacementData> fileReplacement, CancellationToken ct, bool skipDownscale, bool skipDecimation)
|
||||
{
|
||||
var objectName = gameObjectHandler?.Name ?? "Unknown";
|
||||
|
||||
@@ -684,6 +636,20 @@ public partial class FileDownloadManager : DisposableMediatorSubscriberBase
|
||||
|
||||
var allowDirectDownloads = ShouldUseDirectDownloads();
|
||||
var replacementLookup = BuildReplacementLookup(fileReplacement);
|
||||
var rawSizeLookup = new Dictionary<string, long>(StringComparer.OrdinalIgnoreCase);
|
||||
|
||||
foreach (var download in CurrentDownloads)
|
||||
{
|
||||
if (string.IsNullOrWhiteSpace(download.Hash))
|
||||
{
|
||||
continue;
|
||||
}
|
||||
|
||||
if (!rawSizeLookup.TryGetValue(download.Hash, out var existing) || existing <= 0)
|
||||
{
|
||||
rawSizeLookup[download.Hash] = download.TotalRaw;
|
||||
}
|
||||
}
|
||||
|
||||
var directDownloads = new List<DownloadFileTransfer>();
|
||||
var batchDownloads = new List<DownloadFileTransfer>();
|
||||
@@ -708,39 +674,36 @@ public partial class FileDownloadManager : DisposableMediatorSubscriberBase
|
||||
var chunkSize = (int)Math.Ceiling(list.Count / (double)chunkCount);
|
||||
|
||||
return ChunkList(list, chunkSize)
|
||||
.Select(chunk => new BatchChunk(g.Key, chunk));
|
||||
.Select((chunk, index) => new BatchChunk(g.Key, $"{g.Key}#{index + 1}", chunk));
|
||||
})
|
||||
.ToArray();
|
||||
|
||||
// init statuses
|
||||
lock (_downloadStatusLock)
|
||||
_downloadStatus.Clear();
|
||||
|
||||
// direct downloads and batch downloads tracked separately
|
||||
foreach (var d in directDownloads)
|
||||
{
|
||||
_downloadStatus.Clear();
|
||||
|
||||
// direct downloads and batch downloads tracked separately
|
||||
foreach (var d in directDownloads)
|
||||
_downloadStatus[d.DirectDownloadUrl!] = new FileDownloadStatus
|
||||
{
|
||||
_downloadStatus[d.DirectDownloadUrl!] = new FileDownloadStatus
|
||||
{
|
||||
DownloadStatus = DownloadStatus.Initializing,
|
||||
TotalBytes = d.Total,
|
||||
TotalFiles = 1,
|
||||
TransferredBytes = 0,
|
||||
TransferredFiles = 0
|
||||
};
|
||||
}
|
||||
DownloadStatus = DownloadStatus.WaitingForSlot,
|
||||
TotalBytes = d.Total,
|
||||
TotalFiles = 1,
|
||||
TransferredBytes = 0,
|
||||
TransferredFiles = 0
|
||||
};
|
||||
}
|
||||
|
||||
foreach (var g in batchChunks.GroupBy(c => c.Key, StringComparer.Ordinal))
|
||||
foreach (var chunk in batchChunks)
|
||||
{
|
||||
_downloadStatus[chunk.StatusKey] = new FileDownloadStatus
|
||||
{
|
||||
_downloadStatus[g.Key] = new FileDownloadStatus
|
||||
{
|
||||
DownloadStatus = DownloadStatus.Initializing,
|
||||
TotalBytes = g.SelectMany(x => x.Items).Sum(x => x.Total),
|
||||
TotalFiles = 1,
|
||||
TransferredBytes = 0,
|
||||
TransferredFiles = 0
|
||||
};
|
||||
}
|
||||
DownloadStatus = DownloadStatus.WaitingForQueue,
|
||||
TotalBytes = chunk.Items.Sum(x => x.Total),
|
||||
TotalFiles = 1,
|
||||
TransferredBytes = 0,
|
||||
TransferredFiles = 0
|
||||
};
|
||||
}
|
||||
|
||||
if (directDownloads.Count > 0 || batchChunks.Length > 0)
|
||||
@@ -752,30 +715,47 @@ public partial class FileDownloadManager : DisposableMediatorSubscriberBase
|
||||
if (gameObjectHandler is not null)
|
||||
Mediator.Publish(new DownloadStartedMessage(gameObjectHandler, _downloadStatus));
|
||||
|
||||
// work based on cpu count and slots
|
||||
var coreCount = Environment.ProcessorCount;
|
||||
var baseWorkers = Math.Min(slots, coreCount);
|
||||
|
||||
// only add buffer if decompression has capacity AND we have cores to spare
|
||||
var availableDecompressSlots = _decompressGate.CurrentCount;
|
||||
var extraWorkers = (availableDecompressSlots > 0 && coreCount >= 6) ? 2 : 0;
|
||||
|
||||
// allow some extra workers so downloads can continue while earlier items decompress.
|
||||
var workerDop = Math.Clamp(slots * 2, 2, 16);
|
||||
var workerDop = Math.Clamp(baseWorkers + extraWorkers, 2, coreCount);
|
||||
|
||||
// batch downloads
|
||||
Task batchTask = batchChunks.Length == 0
|
||||
? Task.CompletedTask
|
||||
: Parallel.ForEachAsync(batchChunks, new ParallelOptions { MaxDegreeOfParallelism = workerDop, CancellationToken = ct },
|
||||
async (chunk, token) => await ProcessBatchChunkAsync(chunk, replacementLookup, token, skipDownscale).ConfigureAwait(false));
|
||||
async (chunk, token) => await ProcessBatchChunkAsync(chunk, replacementLookup, rawSizeLookup, token, skipDownscale, skipDecimation).ConfigureAwait(false));
|
||||
|
||||
// direct downloads
|
||||
Task directTask = directDownloads.Count == 0
|
||||
? Task.CompletedTask
|
||||
: Parallel.ForEachAsync(directDownloads, new ParallelOptions { MaxDegreeOfParallelism = workerDop, CancellationToken = ct },
|
||||
async (d, token) => await ProcessDirectAsync(d, replacementLookup, token, skipDownscale).ConfigureAwait(false));
|
||||
async (d, token) => await ProcessDirectAsync(d, replacementLookup, rawSizeLookup, token, skipDownscale, skipDecimation).ConfigureAwait(false));
|
||||
|
||||
await Task.WhenAll(batchTask, directTask).ConfigureAwait(false);
|
||||
|
||||
// process deferred compressions after all downloads complete
|
||||
await ProcessDeferredCompressionsAsync(ct).ConfigureAwait(false);
|
||||
|
||||
Logger.LogDebug("Download end: {id}", objectName);
|
||||
ClearDownload();
|
||||
}
|
||||
|
||||
private async Task ProcessBatchChunkAsync(BatchChunk chunk, Dictionary<string, (string Extension, string GamePath)> replacementLookup, CancellationToken ct, bool skipDownscale)
|
||||
private async Task ProcessBatchChunkAsync(
|
||||
BatchChunk chunk,
|
||||
Dictionary<string, (string Extension, string GamePath)> replacementLookup,
|
||||
IReadOnlyDictionary<string, long> rawSizeLookup,
|
||||
CancellationToken ct,
|
||||
bool skipDownscale,
|
||||
bool skipDecimation)
|
||||
{
|
||||
var statusKey = chunk.Key;
|
||||
var statusKey = chunk.StatusKey;
|
||||
|
||||
// enqueue (no slot)
|
||||
SetStatus(statusKey, DownloadStatus.WaitingForQueue);
|
||||
@@ -793,7 +773,6 @@ public partial class FileDownloadManager : DisposableMediatorSubscriberBase
|
||||
|
||||
try
|
||||
{
|
||||
// download (with slot)
|
||||
var progress = CreateInlineProgress(bytes => AddTransferredBytes(statusKey, bytes));
|
||||
|
||||
// Download slot held on get
|
||||
@@ -803,10 +782,11 @@ public partial class FileDownloadManager : DisposableMediatorSubscriberBase
|
||||
if (!File.Exists(blockFile))
|
||||
{
|
||||
Logger.LogWarning("{dlName}: Block file missing before extraction, skipping", fi.Name);
|
||||
SetStatus(statusKey, DownloadStatus.Completed);
|
||||
return;
|
||||
}
|
||||
|
||||
await DecompressBlockFileAsync(statusKey, blockFile, replacementLookup, fi.Name, ct, skipDownscale).ConfigureAwait(false);
|
||||
await DecompressBlockFileAsync(statusKey, blockFile, replacementLookup, rawSizeLookup, fi.Name, ct, skipDownscale, skipDecimation).ConfigureAwait(false);
|
||||
}
|
||||
catch (OperationCanceledException)
|
||||
{
|
||||
@@ -823,7 +803,13 @@ public partial class FileDownloadManager : DisposableMediatorSubscriberBase
|
||||
}
|
||||
}
|
||||
|
||||
private async Task ProcessDirectAsync(DownloadFileTransfer directDownload, Dictionary<string, (string Extension, string GamePath)> replacementLookup, CancellationToken ct, bool skipDownscale)
|
||||
private async Task ProcessDirectAsync(
|
||||
DownloadFileTransfer directDownload,
|
||||
Dictionary<string, (string Extension, string GamePath)> replacementLookup,
|
||||
IReadOnlyDictionary<string, long> rawSizeLookup,
|
||||
CancellationToken ct,
|
||||
bool skipDownscale,
|
||||
bool skipDecimation)
|
||||
{
|
||||
var progress = CreateInlineProgress(bytes =>
|
||||
{
|
||||
@@ -833,7 +819,7 @@ public partial class FileDownloadManager : DisposableMediatorSubscriberBase
|
||||
|
||||
if (!ShouldUseDirectDownloads() || string.IsNullOrEmpty(directDownload.DirectDownloadUrl))
|
||||
{
|
||||
await ProcessDirectAsQueuedFallbackAsync(directDownload, replacementLookup, progress, ct, skipDownscale).ConfigureAwait(false);
|
||||
await ProcessDirectAsQueuedFallbackAsync(directDownload, replacementLookup, rawSizeLookup, progress, ct, skipDownscale, skipDecimation).ConfigureAwait(false);
|
||||
return;
|
||||
}
|
||||
|
||||
@@ -861,6 +847,7 @@ public partial class FileDownloadManager : DisposableMediatorSubscriberBase
|
||||
if (!replacementLookup.TryGetValue(directDownload.Hash, out var repl))
|
||||
{
|
||||
Logger.LogWarning("{hash}: No replacement data found for direct download.", directDownload.Hash);
|
||||
SetStatus(directDownload.DirectDownloadUrl!, DownloadStatus.Completed);
|
||||
return;
|
||||
}
|
||||
|
||||
@@ -873,13 +860,18 @@ public partial class FileDownloadManager : DisposableMediatorSubscriberBase
|
||||
byte[] compressedBytes = await File.ReadAllBytesAsync(tempFilename, ct).ConfigureAwait(false);
|
||||
var decompressedBytes = LZ4Wrapper.Unwrap(compressedBytes);
|
||||
|
||||
if (directDownload.TotalRaw > 0 && decompressedBytes.LongLength != directDownload.TotalRaw)
|
||||
{
|
||||
throw new InvalidDataException(
|
||||
$"{directDownload.Hash}: Decompressed size mismatch (expected {directDownload.TotalRaw}, got {decompressedBytes.LongLength})");
|
||||
}
|
||||
|
||||
await _fileCompactor.WriteAllBytesAsync(finalFilename, decompressedBytes, ct).ConfigureAwait(false);
|
||||
PersistFileToStorage(directDownload.Hash, finalFilename, repl.GamePath, skipDownscale);
|
||||
PersistFileToStorage(directDownload.Hash, finalFilename, repl.GamePath, skipDownscale, skipDecimation);
|
||||
|
||||
MarkTransferredFiles(directDownload.DirectDownloadUrl!, 1);
|
||||
SetStatus(directDownload.DirectDownloadUrl!, DownloadStatus.Completed);
|
||||
Logger.LogDebug("Finished direct download of {hash}.", directDownload.Hash);
|
||||
|
||||
RemoveStatus(directDownload.DirectDownloadUrl!);
|
||||
}
|
||||
catch (OperationCanceledException ex)
|
||||
{
|
||||
@@ -902,7 +894,7 @@ public partial class FileDownloadManager : DisposableMediatorSubscriberBase
|
||||
|
||||
try
|
||||
{
|
||||
await ProcessDirectAsQueuedFallbackAsync(directDownload, replacementLookup, progress, ct, skipDownscale).ConfigureAwait(false);
|
||||
await ProcessDirectAsQueuedFallbackAsync(directDownload, replacementLookup, rawSizeLookup, progress, ct, skipDownscale, skipDecimation).ConfigureAwait(false);
|
||||
|
||||
if (!expectedDirectDownloadFailure && failureCount >= 3 && !_disableDirectDownloads)
|
||||
{
|
||||
@@ -929,9 +921,11 @@ public partial class FileDownloadManager : DisposableMediatorSubscriberBase
|
||||
private async Task ProcessDirectAsQueuedFallbackAsync(
|
||||
DownloadFileTransfer directDownload,
|
||||
Dictionary<string, (string Extension, string GamePath)> replacementLookup,
|
||||
IReadOnlyDictionary<string, long> rawSizeLookup,
|
||||
IProgress<long> progress,
|
||||
CancellationToken ct,
|
||||
bool skipDownscale)
|
||||
bool skipDownscale,
|
||||
bool skipDecimation)
|
||||
{
|
||||
if (string.IsNullOrEmpty(directDownload.DirectDownloadUrl))
|
||||
throw new InvalidOperationException("Direct download fallback requested without a direct download URL.");
|
||||
@@ -956,7 +950,7 @@ public partial class FileDownloadManager : DisposableMediatorSubscriberBase
|
||||
if (!File.Exists(blockFile))
|
||||
throw new FileNotFoundException("Block file missing after direct download fallback.", blockFile);
|
||||
|
||||
await DecompressBlockFileAsync(statusKey, blockFile, replacementLookup, $"fallback-{directDownload.Hash}", ct, skipDownscale)
|
||||
await DecompressBlockFileAsync(statusKey, blockFile, replacementLookup, rawSizeLookup, $"fallback-{directDownload.Hash}", ct, skipDownscale, skipDecimation)
|
||||
.ConfigureAwait(false);
|
||||
}
|
||||
finally
|
||||
@@ -974,18 +968,16 @@ public partial class FileDownloadManager : DisposableMediatorSubscriberBase
|
||||
if (!_orchestrator.IsInitialized)
|
||||
throw new InvalidOperationException("FileTransferManager is not initialized");
|
||||
|
||||
// batch request
|
||||
var response = await _orchestrator.SendRequestAsync(
|
||||
HttpMethod.Get,
|
||||
LightlessFiles.ServerFilesGetSizesFullPath(_orchestrator.FilesCdnUri!),
|
||||
hashes,
|
||||
ct).ConfigureAwait(false);
|
||||
|
||||
// ensure success
|
||||
return await response.Content.ReadFromJsonAsync<List<DownloadFileDto>>(cancellationToken: ct).ConfigureAwait(false) ?? [];
|
||||
}
|
||||
|
||||
private void PersistFileToStorage(string fileHash, string filePath, string gamePath, bool skipDownscale)
|
||||
private void PersistFileToStorage(string fileHash, string filePath, string gamePath, bool skipDownscale, bool skipDecimation)
|
||||
{
|
||||
var fi = new FileInfo(filePath);
|
||||
|
||||
@@ -1001,13 +993,26 @@ public partial class FileDownloadManager : DisposableMediatorSubscriberBase
|
||||
fi.LastAccessTime = DateTime.Today;
|
||||
fi.LastWriteTime = RandomDayInThePast().Invoke();
|
||||
|
||||
// queue file for deferred compression instead of compressing immediately
|
||||
if (_configService.Current.UseCompactor)
|
||||
_deferredCompressionQueue.Enqueue(filePath);
|
||||
|
||||
try
|
||||
{
|
||||
var entry = _fileDbManager.CreateCacheEntry(filePath);
|
||||
var mapKind = _textureMetadataHelper.DetermineMapKind(gamePath, filePath);
|
||||
var entry = _fileDbManager.CreateCacheEntryWithKnownHash(filePath, fileHash);
|
||||
|
||||
if (!skipDownscale)
|
||||
_textureDownscaleService.ScheduleDownscale(fileHash, filePath, mapKind);
|
||||
if (!skipDownscale && _textureDownscaleService.ShouldScheduleDownscale(filePath))
|
||||
{
|
||||
_textureDownscaleService.ScheduleDownscale(
|
||||
fileHash,
|
||||
filePath,
|
||||
() => _textureMetadataHelper.DetermineMapKind(gamePath, filePath));
|
||||
}
|
||||
|
||||
if (!skipDecimation && _modelDecimationService.ShouldScheduleDecimation(fileHash, filePath, gamePath))
|
||||
{
|
||||
_modelDecimationService.ScheduleDecimation(fileHash, filePath, gamePath);
|
||||
}
|
||||
|
||||
if (entry != null && !string.Equals(entry.Hash, fileHash, StringComparison.OrdinalIgnoreCase))
|
||||
{
|
||||
@@ -1026,6 +1031,52 @@ public partial class FileDownloadManager : DisposableMediatorSubscriberBase
|
||||
|
||||
private static IProgress<long> CreateInlineProgress(Action<long> callback) => new InlineProgress(callback);
|
||||
|
||||
private async Task ProcessDeferredCompressionsAsync(CancellationToken ct)
|
||||
{
|
||||
if (_deferredCompressionQueue.IsEmpty)
|
||||
return;
|
||||
|
||||
var filesToCompress = new List<string>();
|
||||
while (_deferredCompressionQueue.TryDequeue(out var filePath))
|
||||
{
|
||||
if (File.Exists(filePath))
|
||||
filesToCompress.Add(filePath);
|
||||
}
|
||||
|
||||
if (filesToCompress.Count == 0)
|
||||
return;
|
||||
|
||||
Logger.LogDebug("Starting deferred compression of {count} files", filesToCompress.Count);
|
||||
|
||||
var compressionWorkers = Math.Clamp(Environment.ProcessorCount / 4, 2, 4);
|
||||
|
||||
await Parallel.ForEachAsync(filesToCompress,
|
||||
new ParallelOptions
|
||||
{
|
||||
MaxDegreeOfParallelism = compressionWorkers,
|
||||
CancellationToken = ct
|
||||
},
|
||||
async (filePath, token) =>
|
||||
{
|
||||
try
|
||||
{
|
||||
await Task.Yield();
|
||||
if (_configService.Current.UseCompactor && File.Exists(filePath))
|
||||
{
|
||||
var bytes = await File.ReadAllBytesAsync(filePath, token).ConfigureAwait(false);
|
||||
await _fileCompactor.WriteAllBytesAsync(filePath, bytes, token).ConfigureAwait(false);
|
||||
Logger.LogTrace("Compressed file: {filePath}", filePath);
|
||||
}
|
||||
}
|
||||
catch (Exception ex)
|
||||
{
|
||||
Logger.LogWarning(ex, "Failed to compress file: {filePath}", filePath);
|
||||
}
|
||||
}).ConfigureAwait(false);
|
||||
|
||||
Logger.LogDebug("Completed deferred compression of {count} files", filesToCompress.Count);
|
||||
}
|
||||
|
||||
private sealed class InlineProgress : IProgress<long>
|
||||
{
|
||||
private readonly Action<long> _callback;
|
||||
|
||||
@@ -6,5 +6,6 @@ public enum DownloadStatus
|
||||
WaitingForSlot,
|
||||
WaitingForQueue,
|
||||
Downloading,
|
||||
Decompressing
|
||||
Decompressing,
|
||||
Completed
|
||||
}
|
||||
@@ -1,10 +1,46 @@
|
||||
namespace LightlessSync.WebAPI.Files.Models;
|
||||
using System.Threading;
|
||||
|
||||
namespace LightlessSync.WebAPI.Files.Models;
|
||||
|
||||
public class FileDownloadStatus
|
||||
{
|
||||
public DownloadStatus DownloadStatus { get; set; }
|
||||
public long TotalBytes { get; set; }
|
||||
public int TotalFiles { get; set; }
|
||||
public long TransferredBytes { get; set; }
|
||||
public int TransferredFiles { get; set; }
|
||||
}
|
||||
private int _downloadStatus;
|
||||
private long _totalBytes;
|
||||
private int _totalFiles;
|
||||
private long _transferredBytes;
|
||||
private int _transferredFiles;
|
||||
|
||||
public DownloadStatus DownloadStatus
|
||||
{
|
||||
get => (DownloadStatus)Volatile.Read(ref _downloadStatus);
|
||||
set => Volatile.Write(ref _downloadStatus, (int)value);
|
||||
}
|
||||
|
||||
public long TotalBytes
|
||||
{
|
||||
get => Interlocked.Read(ref _totalBytes);
|
||||
set => Interlocked.Exchange(ref _totalBytes, value);
|
||||
}
|
||||
|
||||
public int TotalFiles
|
||||
{
|
||||
get => Volatile.Read(ref _totalFiles);
|
||||
set => Volatile.Write(ref _totalFiles, value);
|
||||
}
|
||||
|
||||
public long TransferredBytes
|
||||
{
|
||||
get => Interlocked.Read(ref _transferredBytes);
|
||||
set => Interlocked.Exchange(ref _transferredBytes, value);
|
||||
}
|
||||
|
||||
public int TransferredFiles
|
||||
{
|
||||
get => Volatile.Read(ref _transferredFiles);
|
||||
set => Volatile.Write(ref _transferredFiles, value);
|
||||
}
|
||||
|
||||
public void AddTransferredBytes(long delta) => Interlocked.Add(ref _transferredBytes, delta);
|
||||
|
||||
public void SetTransferredFiles(int files) => Volatile.Write(ref _transferredFiles, files);
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user