From 319e5b6b6d401559f8cd9436a749f90de99c0faa Mon Sep 17 00:00:00 2001 From: cake Date: Fri, 26 Dec 2025 17:46:02 +0100 Subject: [PATCH] Refactored most of file download, redone it so correct usage of slots and better thread management. --- .../WebAPI/Files/FileDownloadManager.cs | 1290 +++++++++-------- .../WebAPI/Files/FileTransferOrchestrator.cs | 289 ++-- 2 files changed, 866 insertions(+), 713 deletions(-) diff --git a/LightlessSync/WebAPI/Files/FileDownloadManager.cs b/LightlessSync/WebAPI/Files/FileDownloadManager.cs index 59a7e3d..47774f7 100644 --- a/LightlessSync/WebAPI/Files/FileDownloadManager.cs +++ b/LightlessSync/WebAPI/Files/FileDownloadManager.cs @@ -3,6 +3,7 @@ using LightlessSync.API.Data; using LightlessSync.API.Dto.Files; using LightlessSync.API.Routes; using LightlessSync.FileCache; +using LightlessSync.LightlessConfiguration; using LightlessSync.PlayerData.Handlers; using LightlessSync.Services.Mediator; using LightlessSync.Services.TextureCompression; @@ -11,20 +12,23 @@ using Microsoft.Extensions.Logging; using System.Collections.Concurrent; using System.Net; using System.Net.Http.Json; -using LightlessSync.LightlessConfiguration; namespace LightlessSync.WebAPI.Files; public partial class FileDownloadManager : DisposableMediatorSubscriberBase { private readonly Dictionary _downloadStatus; + private readonly object _downloadStatusLock = new(); + private readonly FileCompactor _fileCompactor; private readonly FileCacheManager _fileDbManager; private readonly FileTransferOrchestrator _orchestrator; private readonly LightlessConfigService _configService; private readonly TextureDownscaleService _textureDownscaleService; private readonly TextureMetadataHelper _textureMetadataHelper; + private readonly ConcurrentDictionary _activeDownloadStreams; + private volatile bool _disableDirectDownloads; private int _consecutiveDirectDownloadFailures; private bool _lastConfigDirectDownloadsState; @@ -36,7 +40,8 @@ public partial class FileDownloadManager : DisposableMediatorSubscriberBase FileCacheManager fileCacheManager, FileCompactor fileCompactor, LightlessConfigService configService, - TextureDownscaleService textureDownscaleService, TextureMetadataHelper textureMetadataHelper) : base(logger, mediator) + TextureDownscaleService textureDownscaleService, + TextureMetadataHelper textureMetadataHelper) : base(logger, mediator) { _downloadStatus = new Dictionary(StringComparer.Ordinal); _orchestrator = orchestrator; @@ -48,42 +53,39 @@ public partial class FileDownloadManager : DisposableMediatorSubscriberBase _activeDownloadStreams = new(); _lastConfigDirectDownloadsState = _configService.Current.EnableDirectDownloads; - Mediator.Subscribe(this, (msg) => + Mediator.Subscribe(this, _ => { if (_activeDownloadStreams.IsEmpty) return; + var newLimit = _orchestrator.DownloadLimitPerSlot(); Logger.LogTrace("Setting new Download Speed Limit to {newLimit}", newLimit); + foreach (var stream in _activeDownloadStreams.Keys) - { stream.BandwidthLimit = newLimit; - } }); } public List CurrentDownloads { get; private set; } = []; - public List ForbiddenTransfers => _orchestrator.ForbiddenTransfers; public Guid? CurrentOwnerToken { get; private set; } - - public bool IsDownloading => CurrentDownloads.Any(); + public bool IsDownloading => CurrentDownloads.Count != 0; private bool ShouldUseDirectDownloads() - { - return _configService.Current.EnableDirectDownloads && !_disableDirectDownloads; - } + => _configService.Current.EnableDirectDownloads && !_disableDirectDownloads; public static void MungeBuffer(Span buffer) { for (int i = 0; i < buffer.Length; ++i) - { buffer[i] ^= 42; - } } public void ClearDownload() { CurrentDownloads.Clear(); - _downloadStatus.Clear(); + lock (_downloadStatusLock) + { + _downloadStatus.Clear(); + } CurrentOwnerToken = null; } @@ -101,9 +103,8 @@ public partial class FileDownloadManager : DisposableMediatorSubscriberBase finally { if (gameObject is not null) - { Mediator.Publish(new DownloadFinishedMessage(gameObject)); - } + Mediator.Publish(new ResumeScanMessage(nameof(DownloadFiles))); } } @@ -111,32 +112,74 @@ public partial class FileDownloadManager : DisposableMediatorSubscriberBase protected override void Dispose(bool disposing) { ClearDownload(); + foreach (var stream in _activeDownloadStreams.Keys.ToList()) { - try - { - stream.Dispose(); - } + try { stream.Dispose(); } catch { - // do nothing - // - } - finally - { - _activeDownloadStreams.TryRemove(stream, out _); + // ignore } + finally { _activeDownloadStreams.TryRemove(stream, out _); } } + base.Dispose(disposing); } + private sealed class DownloadSlotLease : IAsyncDisposable + { + private readonly FileTransferOrchestrator _orch; + private bool _released; + + public DownloadSlotLease(FileTransferOrchestrator orch) => _orch = orch; + + public ValueTask DisposeAsync() + { + if (!_released) + { + _released = true; + _orch.ReleaseDownloadSlot(); + } + return ValueTask.CompletedTask; + } + } + + private async ValueTask AcquireSlotAsync(CancellationToken ct) + { + await _orchestrator.WaitForDownloadSlotAsync(ct).ConfigureAwait(false); + return new DownloadSlotLease(_orchestrator); + } + + private void SetStatus(string key, DownloadStatus status) + { + lock (_downloadStatusLock) + { + if (_downloadStatus.TryGetValue(key, out var st)) + st.DownloadStatus = status; + } + } + + private void AddTransferredBytes(string key, long delta) + { + lock (_downloadStatusLock) + { + if (_downloadStatus.TryGetValue(key, out var st)) + st.TransferredBytes += delta; + } + } + + private void MarkTransferredFiles(string key, int files) + { + lock (_downloadStatusLock) + { + if (_downloadStatus.TryGetValue(key, out var st)) + st.TransferredFiles = files; + } + } + private static byte MungeByte(int byteOrEof) { - if (byteOrEof == -1) - { - throw new EndOfStreamException(); - } - + if (byteOrEof == -1) throw new EndOfStreamException(); return (byte)(byteOrEof ^ 42); } @@ -144,6 +187,7 @@ public partial class FileDownloadManager : DisposableMediatorSubscriberBase { List hashName = []; List fileLength = []; + var separator = (char)MungeByte(fileBlockStream.ReadByte()); if (separator != '#') throw new InvalidDataException("Data is invalid, first char is not #"); @@ -151,8 +195,7 @@ public partial class FileDownloadManager : DisposableMediatorSubscriberBase while (true) { int readByte = fileBlockStream.ReadByte(); - if (readByte == -1) - throw new EndOfStreamException(); + if (readByte == -1) throw new EndOfStreamException(); var readChar = (char)MungeByte(readByte); if (readChar == ':') @@ -161,39 +204,69 @@ public partial class FileDownloadManager : DisposableMediatorSubscriberBase continue; } if (readChar == '#') break; + if (!readHash) hashName.Add(readChar); else fileLength.Add(readChar); } + return (string.Join("", hashName), long.Parse(string.Join("", fileLength))); } - private async Task DownloadAndMungeFileHttpClient(string downloadGroup, Guid requestId, List fileTransfer, string tempPath, IProgress progress, CancellationToken ct) + private static async Task ReadExactlyAsync(FileStream stream, Memory buffer, CancellationToken ct) { - Logger.LogDebug("GUID {requestId} on server {uri} for files {files}", requestId, fileTransfer[0].DownloadUri, string.Join(", ", fileTransfer.Select(c => c.Hash).ToList())); - - await WaitForDownloadReady(fileTransfer, requestId, ct).ConfigureAwait(false); - - if (_downloadStatus.TryGetValue(downloadGroup, out var downloadStatus)) + int offset = 0; + while (offset < buffer.Length) { - downloadStatus.DownloadStatus = DownloadStatus.Downloading; + int n = await stream.ReadAsync(buffer.Slice(offset), ct).ConfigureAwait(false); + if (n == 0) throw new EndOfStreamException(); + offset += n; } - else + } + + private static Dictionary BuildReplacementLookup(List fileReplacement) + { + var map = new Dictionary(StringComparer.OrdinalIgnoreCase); + + foreach (var r in fileReplacement) { - Logger.LogWarning("Download status missing for {group} when starting download", downloadGroup); + if (r == null || string.IsNullOrWhiteSpace(r.Hash)) continue; + if (map.ContainsKey(r.Hash)) continue; + + var gamePath = r.GamePaths?.FirstOrDefault() ?? string.Empty; + + string ext = ""; + try + { + ext = Path.GetExtension(gamePath)?.TrimStart('.') ?? ""; + } + catch + { + // ignore + } + + if (string.IsNullOrWhiteSpace(ext)) + ext = "bin"; + + map[r.Hash] = (ext, gamePath); } - var requestUrl = LightlessFiles.CacheGetFullPath(fileTransfer[0].DownloadUri, requestId); - - await DownloadFileThrottled(requestUrl, tempPath, progress, MungeBuffer, ct, withToken: true).ConfigureAwait(false); + return map; } private delegate void DownloadDataCallback(Span data); - private async Task DownloadFileThrottled(Uri requestUrl, string destinationFilename, IProgress progress, DownloadDataCallback? callback, CancellationToken ct, bool withToken) + private async Task DownloadFileThrottled( + Uri requestUrl, + string destinationFilename, + IProgress progress, + DownloadDataCallback? callback, + CancellationToken ct, + bool withToken) { const int maxRetries = 3; int retryCount = 0; TimeSpan retryDelay = TimeSpan.FromSeconds(2); + HttpResponseMessage? response = null; while (true) @@ -201,7 +274,14 @@ public partial class FileDownloadManager : DisposableMediatorSubscriberBase try { Logger.LogDebug("Attempt {attempt} - Downloading {requestUrl}", retryCount + 1, requestUrl); - response = await _orchestrator.SendRequestAsync(HttpMethod.Get, requestUrl, ct, HttpCompletionOption.ResponseHeadersRead, withToken).ConfigureAwait(false); + response = await _orchestrator.SendRequestAsync( + HttpMethod.Get, + requestUrl, + ct, + HttpCompletionOption.ResponseHeadersRead, + withToken) + .ConfigureAwait(false); + response.EnsureSuccessStatusCode(); break; } @@ -246,52 +326,47 @@ public partial class FileDownloadManager : DisposableMediatorSubscriberBase Logger.LogWarning(ex, "Error during download of {requestUrl}, HttpStatusCode: {code}", requestUrl, ex.StatusCode); if (ex.StatusCode is HttpStatusCode.NotFound or HttpStatusCode.Unauthorized) - { throw new InvalidDataException($"Http error {ex.StatusCode} (cancelled: {ct.IsCancellationRequested}): {requestUrl}", ex); - } throw; } } ThrottledStream? stream = null; - FileStream? fileStream = null; try { - fileStream = File.Create(destinationFilename); + // Determine buffer size based on content length + var contentLen = response!.Content.Headers.ContentLength ?? 0; + var bufferSize = contentLen > 1024 * 1024 ? 65536 : 8196; + var buffer = new byte[bufferSize]; + + // Create destination file stream + var fileStream = new FileStream( + destinationFilename, + FileMode.Create, + FileAccess.Write, + FileShare.None, + bufferSize: 64 * 1024, + useAsync: true); + + // Download with throttling await using (fileStream.ConfigureAwait(false)) { - var bufferSize = response!.Content.Headers.ContentLength > 1024 * 1024 ? 65536 : 8196; - var buffer = new byte[bufferSize]; - var limit = _orchestrator.DownloadLimitPerSlot(); Logger.LogTrace("Starting Download with a speed limit of {limit} to {destination}", limit, destinationFilename); - stream = new(await response.Content.ReadAsStreamAsync(ct).ConfigureAwait(false), limit); + stream = new ThrottledStream(await response.Content.ReadAsStreamAsync(ct).ConfigureAwait(false), limit); _activeDownloadStreams.TryAdd(stream, 0); while (true) { ct.ThrowIfCancellationRequested(); - int bytesRead; - try - { - bytesRead = await stream.ReadAsync(buffer.AsMemory(0, buffer.Length), ct).ConfigureAwait(false); - } - catch (OperationCanceledException ex) - { - Logger.LogWarning(ex, "Request got cancelled : {url}", requestUrl); - throw; - } - if (bytesRead == 0) - { - break; - } + int bytesRead = await stream.ReadAsync(buffer.AsMemory(0, buffer.Length), ct).ConfigureAwait(false); + if (bytesRead == 0) break; callback?.Invoke(buffer.AsSpan(0, bytesRead)); - await fileStream.WriteAsync(buffer.AsMemory(0, bytesRead), ct).ConfigureAwait(false); progress.Report(bytesRead); @@ -300,24 +375,16 @@ public partial class FileDownloadManager : DisposableMediatorSubscriberBase Logger.LogDebug("{requestUrl} downloaded to {destination}", requestUrl, destinationFilename); } } - catch (OperationCanceledException) - { - throw; - } - catch (Exception) + catch { try { - fileStream?.Close(); - if (!string.IsNullOrEmpty(destinationFilename) && File.Exists(destinationFilename)) - { File.Delete(destinationFilename); - } } - catch - { - // ignore cleanup errors + catch + { + // ignore } throw; } @@ -333,515 +400,6 @@ public partial class FileDownloadManager : DisposableMediatorSubscriberBase } } - private async Task DecompressBlockFileAsync(string downloadStatusKey, string blockFilePath, List fileReplacement, string downloadLabel, bool skipDownscale) - { - if (_downloadStatus.TryGetValue(downloadStatusKey, out var status)) - { - status.TransferredFiles = 1; - status.DownloadStatus = DownloadStatus.Decompressing; - } - - FileStream? fileBlockStream = null; - try - { - fileBlockStream = File.OpenRead(blockFilePath); - while (fileBlockStream.Position < fileBlockStream.Length) - { - (string fileHash, long fileLengthBytes) = ReadBlockFileHeader(fileBlockStream); - - try - { - var fileExtension = fileReplacement.First(f => string.Equals(f.Hash, fileHash, StringComparison.OrdinalIgnoreCase)).GamePaths[0].Split(".")[^1]; - var filePath = _fileDbManager.GetCacheFilePath(fileHash, fileExtension); - Logger.LogDebug("{dlName}: Decompressing {file}:{le} => {dest}", downloadLabel, fileHash, fileLengthBytes, filePath); - - byte[] compressedFileContent = new byte[fileLengthBytes]; - var readBytes = await fileBlockStream.ReadAsync(compressedFileContent, CancellationToken.None).ConfigureAwait(false); - if (readBytes != fileLengthBytes) - { - throw new EndOfStreamException(); - } - MungeBuffer(compressedFileContent); - - var decompressedFile = LZ4Wrapper.Unwrap(compressedFileContent); - await _fileCompactor.WriteAllBytesAsync(filePath, decompressedFile, CancellationToken.None).ConfigureAwait(false); - - var gamePath = fileReplacement.FirstOrDefault(f => string.Equals(f.Hash, fileHash, StringComparison.OrdinalIgnoreCase))?.GamePaths.FirstOrDefault() ?? string.Empty; - PersistFileToStorage(fileHash, filePath, gamePath, skipDownscale); - } - catch (EndOfStreamException) - { - Logger.LogWarning("{dlName}: Failure to extract file {fileHash}, stream ended prematurely", downloadLabel, fileHash); - } - catch (Exception e) - { - Logger.LogWarning(e, "{dlName}: Error during decompression", downloadLabel); - } - } - } - catch (EndOfStreamException) - { - Logger.LogDebug("{dlName}: Failure to extract file header data, stream ended", downloadLabel); - } - catch (Exception ex) - { - Logger.LogError(ex, "{dlName}: Error during block file read", downloadLabel); - } - finally - { - if (fileBlockStream != null) - await fileBlockStream.DisposeAsync().ConfigureAwait(false); - } - } - - private async Task PerformDirectDownloadFallbackAsync(DownloadFileTransfer directDownload, List fileReplacement, - IProgress progress, CancellationToken token, bool skipDownscale, bool slotAlreadyAcquired) - { - if (string.IsNullOrEmpty(directDownload.DirectDownloadUrl)) - { - throw new InvalidOperationException("Direct download fallback requested without a direct download URL."); - } - - var downloadKey = directDownload.DirectDownloadUrl!; - bool slotAcquiredHere = false; - string? blockFile = null; - - try - { - if (!slotAlreadyAcquired) - { - if (_downloadStatus.TryGetValue(downloadKey, out var tracker)) - { - tracker.DownloadStatus = DownloadStatus.WaitingForSlot; - } - - await _orchestrator.WaitForDownloadSlotAsync(token).ConfigureAwait(false); - slotAcquiredHere = true; - } - - if (_downloadStatus.TryGetValue(downloadKey, out var queueTracker)) - { - queueTracker.DownloadStatus = DownloadStatus.WaitingForQueue; - } - - var requestIdResponse = await _orchestrator.SendRequestAsync(HttpMethod.Post, LightlessFiles.RequestEnqueueFullPath(directDownload.DownloadUri), - new[] { directDownload.Hash }, token).ConfigureAwait(false); - var requestId = Guid.Parse((await requestIdResponse.Content.ReadAsStringAsync().ConfigureAwait(false)).Trim('"')); - - blockFile = _fileDbManager.GetCacheFilePath(requestId.ToString("N"), "blk"); - - await DownloadAndMungeFileHttpClient(downloadKey, requestId, [directDownload], blockFile, progress, token).ConfigureAwait(false); - - if (!File.Exists(blockFile)) - { - throw new FileNotFoundException("Block file missing after direct download fallback.", blockFile); - } - - await DecompressBlockFileAsync(downloadKey, blockFile, fileReplacement, $"fallback-{directDownload.Hash}", skipDownscale).ConfigureAwait(false); - } - finally - { - if (slotAcquiredHere) - { - _orchestrator.ReleaseDownloadSlot(); - } - - if (!string.IsNullOrEmpty(blockFile)) - { - try - { - File.Delete(blockFile); - } - catch - { - // ignore cleanup errors - } - } - } - } - - public async Task> InitiateDownloadList(GameObjectHandler? gameObjectHandler, List fileReplacement, CancellationToken ct, Guid? ownerToken = null) - { - CurrentOwnerToken = ownerToken; - var objectName = gameObjectHandler?.Name ?? "Unknown"; - Logger.LogDebug("Download start: {id}", objectName); - - if (fileReplacement == null || fileReplacement.Count == 0) - { - Logger.LogDebug("{dlName}: No file replacements provided", objectName); - CurrentDownloads = []; - return CurrentDownloads; - } - - var hashes = fileReplacement.Where(f => f != null && !string.IsNullOrWhiteSpace(f.Hash)).Select(f => f.Hash).Distinct(StringComparer.Ordinal).ToList(); - - if (hashes.Count == 0) - { - Logger.LogDebug("{dlName}: No valid hashes to download", objectName); - CurrentDownloads = []; - return CurrentDownloads; - } - - List downloadFileInfoFromService = - [ - .. await FilesGetSizes(hashes, ct).ConfigureAwait(false), - ]; - - Logger.LogDebug("Files with size 0 or less: {files}", string.Join(", ", downloadFileInfoFromService.Where(f => f.Size <= 0).Select(f => f.Hash))); - - foreach (var dto in downloadFileInfoFromService.Where(c => c.IsForbidden)) - { - if (!_orchestrator.ForbiddenTransfers.Exists(f => string.Equals(f.Hash, dto.Hash, StringComparison.Ordinal))) - { - _orchestrator.ForbiddenTransfers.Add(new DownloadFileTransfer(dto)); - } - } - - CurrentDownloads = downloadFileInfoFromService.Distinct().Select(d => new DownloadFileTransfer(d)) - .Where(d => d.CanBeTransferred).ToList(); - - return CurrentDownloads; - } - - private async Task DownloadFilesInternal(GameObjectHandler? gameObjectHandler, List fileReplacement, CancellationToken ct, bool skipDownscale) - { - var objectName = gameObjectHandler?.Name ?? "Unknown"; - - var configAllowsDirect = _configService.Current.EnableDirectDownloads; - if (configAllowsDirect != _lastConfigDirectDownloadsState) - { - _lastConfigDirectDownloadsState = configAllowsDirect; - if (configAllowsDirect) - { - _disableDirectDownloads = false; - _consecutiveDirectDownloadFailures = 0; - } - } - - var allowDirectDownloads = ShouldUseDirectDownloads(); - - var directDownloads = new List(); - var batchDownloads = new List(); - - foreach (var download in CurrentDownloads) - { - if (!string.IsNullOrEmpty(download.DirectDownloadUrl) && allowDirectDownloads) - { - directDownloads.Add(download); - } - else - { - batchDownloads.Add(download); - } - } - - var downloadBatches = batchDownloads.GroupBy(f => f.DownloadUri.Host + ":" + f.DownloadUri.Port, StringComparer.Ordinal).ToArray(); - - foreach (var directDownload in directDownloads) - { - _downloadStatus[directDownload.DirectDownloadUrl!] = new FileDownloadStatus() - { - DownloadStatus = DownloadStatus.Initializing, - TotalBytes = directDownload.Total, - TotalFiles = 1, - TransferredBytes = 0, - TransferredFiles = 0 - }; - } - - foreach (var downloadBatch in downloadBatches) - { - _downloadStatus[downloadBatch.Key] = new FileDownloadStatus() - { - DownloadStatus = DownloadStatus.Initializing, - TotalBytes = downloadBatch.Sum(c => c.Total), - TotalFiles = 1, - TransferredBytes = 0, - TransferredFiles = 0 - }; - } - - if (directDownloads.Count > 0 || downloadBatches.Length > 0) - { - Logger.LogInformation("Downloading {direct} files directly, and {batchtotal} in {batches} batches.", directDownloads.Count, batchDownloads.Count, downloadBatches.Length); - } - - if (gameObjectHandler is not null) - { - Mediator.Publish(new DownloadStartedMessage(gameObjectHandler, _downloadStatus)); - } - - Task batchDownloadsTask = downloadBatches.Length == 0 ? Task.CompletedTask : Parallel.ForEachAsync(downloadBatches, new ParallelOptions() - { - MaxDegreeOfParallelism = downloadBatches.Length, - CancellationToken = ct, - }, - async (fileGroup, token) => - { - var requestIdResponse = await _orchestrator.SendRequestAsync(HttpMethod.Post, LightlessFiles.RequestEnqueueFullPath(fileGroup.First().DownloadUri), - fileGroup.Select(c => c.Hash), token).ConfigureAwait(false); - Logger.LogDebug("Sent request for {n} files on server {uri} with result {result}", fileGroup.Count(), fileGroup.First().DownloadUri, - await requestIdResponse.Content.ReadAsStringAsync(token).ConfigureAwait(false)); - - Guid requestId = Guid.Parse((await requestIdResponse.Content.ReadAsStringAsync().ConfigureAwait(false)).Trim('"')); - - Logger.LogDebug("GUID {requestId} for {n} files on server {uri}", requestId, fileGroup.Count(), fileGroup.First().DownloadUri); - - var blockFile = _fileDbManager.GetCacheFilePath(requestId.ToString("N"), "blk"); - FileInfo fi = new(blockFile); - try - { - if (!_downloadStatus.TryGetValue(fileGroup.Key, out var downloadStatus)) - { - Logger.LogWarning("Download status missing for {group}, aborting", fileGroup.Key); - return; - } - - downloadStatus.DownloadStatus = DownloadStatus.WaitingForSlot; - await _orchestrator.WaitForDownloadSlotAsync(token).ConfigureAwait(false); - downloadStatus.DownloadStatus = DownloadStatus.WaitingForQueue; - var progress = CreateInlineProgress((bytesDownloaded) => - { - try - { - if (_downloadStatus.TryGetValue(fileGroup.Key, out FileDownloadStatus? value)) - { - value.TransferredBytes += bytesDownloaded; - } - } - catch (Exception ex) - { - Logger.LogWarning(ex, "Could not set download progress"); - } - }); - await DownloadAndMungeFileHttpClient(fileGroup.Key, requestId, [.. fileGroup], blockFile, progress, token).ConfigureAwait(false); - } - catch (OperationCanceledException) - { - Logger.LogDebug("{dlName}: Detected cancellation of download, partially extracting files for {id}", fi.Name, objectName); - } - catch (Exception ex) - { - _orchestrator.ReleaseDownloadSlot(); - File.Delete(blockFile); - Logger.LogError(ex, "{dlName}: Error during download of {id}", fi.Name, requestId); - ClearDownload(); - return; - } - - try - { - if (!File.Exists(blockFile)) - { - Logger.LogWarning("{dlName}: Block file missing before extraction, skipping", fi.Name); - return; - } - - await DecompressBlockFileAsync(fileGroup.Key, blockFile, fileReplacement, fi.Name, skipDownscale).ConfigureAwait(false); - } - finally - { - _orchestrator.ReleaseDownloadSlot(); - File.Delete(blockFile); - } - }); - - Task directDownloadsTask = directDownloads.Count == 0 ? Task.CompletedTask : Parallel.ForEachAsync(directDownloads, new ParallelOptions() - { - MaxDegreeOfParallelism = directDownloads.Count, - CancellationToken = ct, - }, - async (directDownload, token) => - { - if (!_downloadStatus.TryGetValue(directDownload.DirectDownloadUrl!, out var downloadTracker)) - { - Logger.LogWarning("Download status missing for direct URL {url}", directDownload.DirectDownloadUrl); - return; - } - - var progress = CreateInlineProgress((bytesDownloaded) => - { - try - { - if (_downloadStatus.TryGetValue(directDownload.DirectDownloadUrl!, out FileDownloadStatus? value)) - { - value.TransferredBytes += bytesDownloaded; - } - } - catch (Exception ex) - { - Logger.LogWarning(ex, "Could not set download progress"); - } - }); - - if (!ShouldUseDirectDownloads()) - { - await PerformDirectDownloadFallbackAsync(directDownload, fileReplacement, progress, token, skipDownscale, slotAlreadyAcquired: false).ConfigureAwait(false); - return; - } - - var tempFilename = _fileDbManager.GetCacheFilePath(directDownload.Hash, "bin"); - var slotAcquired = false; - - try - { - downloadTracker.DownloadStatus = DownloadStatus.WaitingForSlot; - await _orchestrator.WaitForDownloadSlotAsync(token).ConfigureAwait(false); - slotAcquired = true; - - downloadTracker.DownloadStatus = DownloadStatus.Downloading; - Logger.LogDebug("Beginning direct download of {hash} from {url}", directDownload.Hash, directDownload.DirectDownloadUrl); - await DownloadFileThrottled(new Uri(directDownload.DirectDownloadUrl!), tempFilename, progress, null, token, withToken: false).ConfigureAwait(false); - - Interlocked.Exchange(ref _consecutiveDirectDownloadFailures, 0); - - downloadTracker.DownloadStatus = DownloadStatus.Decompressing; - - try - { - var replacement = fileReplacement.FirstOrDefault(f => string.Equals(f.Hash, directDownload.Hash, StringComparison.OrdinalIgnoreCase)); - if (replacement == null || replacement.GamePaths.Length == 0) - { - Logger.LogWarning("{hash}: No replacement data found for direct download.", directDownload.Hash); - return; - } - - var fileExtension = replacement.GamePaths[0].Split(".")[^1]; - var finalFilename = _fileDbManager.GetCacheFilePath(directDownload.Hash, fileExtension); - Logger.LogDebug("Decompressing direct download {hash} from {compressedFile} to {finalFile}", directDownload.Hash, tempFilename, finalFilename); - byte[] compressedBytes = await File.ReadAllBytesAsync(tempFilename).ConfigureAwait(false); - var decompressedBytes = LZ4Wrapper.Unwrap(compressedBytes); - await _fileCompactor.WriteAllBytesAsync(finalFilename, decompressedBytes, CancellationToken.None).ConfigureAwait(false); - PersistFileToStorage(directDownload.Hash, finalFilename, replacement.GamePaths[0], skipDownscale); - - downloadTracker.TransferredFiles = 1; - Logger.LogDebug("Finished direct download of {hash}.", directDownload.Hash); - } - catch (Exception ex) - { - Logger.LogError(ex, "Exception downloading {hash} from {url}", directDownload.Hash, directDownload.DirectDownloadUrl); - } - } - catch (OperationCanceledException ex) - { - if (token.IsCancellationRequested) - { - Logger.LogDebug("{hash}: Direct download cancelled by caller, discarding file.", directDownload.Hash); - } - else - { - Logger.LogWarning(ex, "{hash}: Direct download cancelled unexpectedly.", directDownload.Hash); - } - - ClearDownload(); - return; - } - catch (Exception ex) - { - var expectedDirectDownloadFailure = ex is InvalidDataException; - var failureCount = 0; - - if (expectedDirectDownloadFailure) - { - Logger.LogInformation(ex, "{hash}: Direct download unavailable, attempting queued fallback.", directDownload.Hash); - } - else - { - failureCount = Interlocked.Increment(ref _consecutiveDirectDownloadFailures); - Logger.LogWarning(ex, "{hash}: Direct download failed, attempting queued fallback.", directDownload.Hash); - } - - try - { - downloadTracker.DownloadStatus = DownloadStatus.WaitingForQueue; - await PerformDirectDownloadFallbackAsync(directDownload, fileReplacement, progress, token, skipDownscale, slotAcquired).ConfigureAwait(false); - - if (!expectedDirectDownloadFailure && failureCount >= 3 && !_disableDirectDownloads) - { - _disableDirectDownloads = true; - Logger.LogWarning("Disabling direct downloads for this session after {count} consecutive failures.", failureCount); - } - } - catch (Exception fallbackEx) - { - if (slotAcquired) - { - _orchestrator.ReleaseDownloadSlot(); - slotAcquired = false; - } - - Logger.LogError(fallbackEx, "{hash}: Error during direct download fallback.", directDownload.Hash); - ClearDownload(); - return; - } - } - finally - { - if (slotAcquired) - { - _orchestrator.ReleaseDownloadSlot(); - } - - try - { - File.Delete(tempFilename); - } - catch - { - // ignore - } - } - }); - - await Task.WhenAll(batchDownloadsTask, directDownloadsTask).ConfigureAwait(false); - - Logger.LogDebug("Download end: {id}", objectName); - - ClearDownload(); - } - - private async Task> FilesGetSizes(List hashes, CancellationToken ct) - { - if (!_orchestrator.IsInitialized) throw new InvalidOperationException("FileTransferManager is not initialized"); - var response = await _orchestrator.SendRequestAsync(HttpMethod.Get, LightlessFiles.ServerFilesGetSizesFullPath(_orchestrator.FilesCdnUri!), hashes, ct).ConfigureAwait(false); - return await response.Content.ReadFromJsonAsync>(cancellationToken: ct).ConfigureAwait(false) ?? []; - } - - private void PersistFileToStorage(string fileHash, string filePath, string gamePath, bool skipDownscale) - { - var fi = new FileInfo(filePath); - Func RandomDayInThePast() - { - DateTime start = new(1995, 1, 1, 1, 1, 1, DateTimeKind.Local); - Random gen = new(); - int range = (DateTime.Today - start).Days; - return () => start.AddDays(gen.Next(range)); - } - - fi.CreationTime = RandomDayInThePast().Invoke(); - fi.LastAccessTime = DateTime.Today; - fi.LastWriteTime = RandomDayInThePast().Invoke(); - try - { - var entry = _fileDbManager.CreateCacheEntry(filePath); - var mapKind = _textureMetadataHelper.DetermineMapKind(gamePath, filePath); - if (!skipDownscale) - { - _textureDownscaleService.ScheduleDownscale(fileHash, filePath, mapKind); - } - if (entry != null && !string.Equals(entry.Hash, fileHash, StringComparison.OrdinalIgnoreCase)) - { - Logger.LogError("Hash mismatch after extracting, got {hash}, expected {expectedHash}, deleting file", entry.Hash, fileHash); - File.Delete(filePath); - _fileDbManager.RemoveHashedFile(entry.Hash, entry.PrefixedFilePath); - } - } - catch (Exception ex) - { - Logger.LogWarning(ex, "Error creating cache entry"); - } - } - private async Task WaitForDownloadReady(List downloadFileTransfer, Guid requestId, CancellationToken downloadCt) { bool alreadyCancelled = false; @@ -861,11 +419,17 @@ public partial class FileDownloadManager : DisposableMediatorSubscriberBase { if (downloadCt.IsCancellationRequested) throw; - var req = await _orchestrator.SendRequestAsync(HttpMethod.Get, LightlessFiles.RequestCheckQueueFullPath(downloadFileTransfer[0].DownloadUri, requestId), - downloadFileTransfer.Select(c => c.Hash).ToList(), downloadCt).ConfigureAwait(false); + var req = await _orchestrator.SendRequestAsync( + HttpMethod.Get, + LightlessFiles.RequestCheckQueueFullPath(downloadFileTransfer[0].DownloadUri, requestId), + downloadFileTransfer.Select(c => c.Hash).ToList(), + downloadCt).ConfigureAwait(false); + req.EnsureSuccessStatusCode(); + localTimeoutCts.Dispose(); composite.Dispose(); + localTimeoutCts = new(); localTimeoutCts.CancelAfter(TimeSpan.FromSeconds(5)); composite = CancellationTokenSource.CreateLinkedTokenSource(downloadCt, localTimeoutCts.Token); @@ -881,12 +445,13 @@ public partial class FileDownloadManager : DisposableMediatorSubscriberBase { try { - await _orchestrator.SendRequestAsync(HttpMethod.Get, LightlessFiles.RequestCancelFullPath(downloadFileTransfer[0].DownloadUri, requestId)).ConfigureAwait(false); + await _orchestrator.SendRequestAsync(HttpMethod.Get, LightlessFiles.RequestCancelFullPath(downloadFileTransfer[0].DownloadUri, requestId)) + .ConfigureAwait(false); alreadyCancelled = true; } - catch + catch { - // ignore whatever happens here + // ignore } throw; @@ -897,34 +462,537 @@ public partial class FileDownloadManager : DisposableMediatorSubscriberBase { try { - await _orchestrator.SendRequestAsync(HttpMethod.Get, LightlessFiles.RequestCancelFullPath(downloadFileTransfer[0].DownloadUri, requestId)).ConfigureAwait(false); + await _orchestrator.SendRequestAsync(HttpMethod.Get, LightlessFiles.RequestCancelFullPath(downloadFileTransfer[0].DownloadUri, requestId)) + .ConfigureAwait(false); } - catch + catch { - // ignore whatever happens here + // ignore } } _orchestrator.ClearDownloadRequest(requestId); } } - private static IProgress CreateInlineProgress(Action callback) + private async Task DownloadQueuedBlockFileAsync( + string statusKey, + Guid requestId, + List transfers, + string tempPath, + IProgress progress, + CancellationToken ct) { - return new InlineProgress(callback); + Logger.LogDebug("GUID {requestId} on server {uri} for files {files}", + requestId, transfers[0].DownloadUri, string.Join(", ", transfers.Select(c => c.Hash))); + + // Wait for ready WITHOUT holding a slot + SetStatus(statusKey, DownloadStatus.WaitingForQueue); + await WaitForDownloadReady(transfers, requestId, ct).ConfigureAwait(false); + + // Hold slot ONLY for the GET + SetStatus(statusKey, DownloadStatus.WaitingForSlot); + await using ((await AcquireSlotAsync(ct).ConfigureAwait(false)).ConfigureAwait(false)) + { + SetStatus(statusKey, DownloadStatus.Downloading); + + var requestUrl = LightlessFiles.CacheGetFullPath(transfers[0].DownloadUri, requestId); + await DownloadFileThrottled(requestUrl, tempPath, progress, MungeBuffer, ct, withToken: true).ConfigureAwait(false); + } } + private async Task DecompressBlockFileAsync( + string downloadStatusKey, + string blockFilePath, + Dictionary replacementLookup, + string downloadLabel, + CancellationToken ct, + bool skipDownscale) + { + SetStatus(downloadStatusKey, DownloadStatus.Decompressing); + MarkTransferredFiles(downloadStatusKey, 1); + + try + { + var fileBlockStream = File.OpenRead(blockFilePath); + await using (fileBlockStream.ConfigureAwait(false)) + { + while (fileBlockStream.Position < fileBlockStream.Length) + { + (string fileHash, long fileLengthBytes) = ReadBlockFileHeader(fileBlockStream); + + try + { + if (fileLengthBytes < 0 || fileLengthBytes > int.MaxValue) + throw new InvalidDataException($"Invalid block entry length: {fileLengthBytes}"); + + if (!replacementLookup.TryGetValue(fileHash, out var repl)) + { + Logger.LogWarning("{dlName}: No replacement mapping for {fileHash}", downloadLabel, fileHash); + // still need to skip bytes: + var skip = checked((int)fileLengthBytes); + fileBlockStream.Position += skip; + continue; + } + + var filePath = _fileDbManager.GetCacheFilePath(fileHash, repl.Extension); + + Logger.LogDebug("{dlName}: Decompressing {file}:{len} => {dest}", downloadLabel, fileHash, fileLengthBytes, filePath); + + var len = checked((int)fileLengthBytes); + var compressed = new byte[len]; + + await ReadExactlyAsync(fileBlockStream, compressed.AsMemory(0, len), ct).ConfigureAwait(false); + + MungeBuffer(compressed); + var decompressed = LZ4Wrapper.Unwrap(compressed); + + await _fileCompactor.WriteAllBytesAsync(filePath, decompressed, ct).ConfigureAwait(false); + PersistFileToStorage(fileHash, filePath, repl.GamePath, skipDownscale); + } + catch (EndOfStreamException) + { + Logger.LogWarning("{dlName}: Failure to extract file {fileHash}, stream ended prematurely", downloadLabel, fileHash); + } + catch (Exception e) + { + Logger.LogWarning(e, "{dlName}: Error during decompression", downloadLabel); + } + } + } + } + catch (EndOfStreamException) + { + Logger.LogDebug("{dlName}: Failure to extract file header data, stream ended", downloadLabel); + } + catch (Exception ex) + { + Logger.LogError(ex, "{dlName}: Error during block file read", downloadLabel); + } + } + + public async Task> InitiateDownloadList( + GameObjectHandler? gameObjectHandler, + List fileReplacement, + CancellationToken ct, + Guid? ownerToken = null) + { + CurrentOwnerToken = ownerToken; + var objectName = gameObjectHandler?.Name ?? "Unknown"; + Logger.LogDebug("Download start: {id}", objectName); + + if (fileReplacement == null || fileReplacement.Count == 0) + { + Logger.LogDebug("{dlName}: No file replacements provided", objectName); + CurrentDownloads = []; + return CurrentDownloads; + } + + var hashes = fileReplacement + .Where(f => f != null && !string.IsNullOrWhiteSpace(f.Hash)) + .Select(f => f.Hash) + .Distinct(StringComparer.Ordinal) + .ToList(); + + if (hashes.Count == 0) + { + Logger.LogDebug("{dlName}: No valid hashes to download", objectName); + CurrentDownloads = []; + return CurrentDownloads; + } + + List downloadFileInfoFromService = + [ + .. await FilesGetSizes(hashes, ct).ConfigureAwait(false), + ]; + + Logger.LogDebug("Files with size 0 or less: {files}", + string.Join(", ", downloadFileInfoFromService.Where(f => f.Size <= 0).Select(f => f.Hash))); + + foreach (var dto in downloadFileInfoFromService.Where(c => c.IsForbidden)) + { + if (!_orchestrator.ForbiddenTransfers.Exists(f => string.Equals(f.Hash, dto.Hash, StringComparison.Ordinal))) + _orchestrator.ForbiddenTransfers.Add(new DownloadFileTransfer(dto)); + } + + CurrentDownloads = downloadFileInfoFromService + .Distinct() + .Select(d => new DownloadFileTransfer(d)) + .Where(d => d.CanBeTransferred) + .ToList(); + + return CurrentDownloads; + } + + private sealed record BatchChunk(string Key, List Items); + + private static IEnumerable> ChunkList(List items, int chunkSize) + { + for (int i = 0; i < items.Count; i += chunkSize) + yield return items.GetRange(i, Math.Min(chunkSize, items.Count - i)); + } + + private async Task DownloadFilesInternal(GameObjectHandler? gameObjectHandler, List fileReplacement, CancellationToken ct, bool skipDownscale) + { + var objectName = gameObjectHandler?.Name ?? "Unknown"; + + // config toggles + var configAllowsDirect = _configService.Current.EnableDirectDownloads; + if (configAllowsDirect != _lastConfigDirectDownloadsState) + { + _lastConfigDirectDownloadsState = configAllowsDirect; + if (configAllowsDirect) + { + _disableDirectDownloads = false; + _consecutiveDirectDownloadFailures = 0; + } + } + + var allowDirectDownloads = ShouldUseDirectDownloads(); + var replacementLookup = BuildReplacementLookup(fileReplacement); + + var directDownloads = new List(); + var batchDownloads = new List(); + + foreach (var download in CurrentDownloads) + { + if (!string.IsNullOrEmpty(download.DirectDownloadUrl) && allowDirectDownloads) + directDownloads.Add(download); + else + batchDownloads.Add(download); + } + + // Chunk per host so we can fill all slots + var slots = Math.Max(1, _configService.Current.ParallelDownloads); + + var batchChunks = batchDownloads + .GroupBy(f => $"{f.DownloadUri.Host}:{f.DownloadUri.Port}", StringComparer.Ordinal) + .SelectMany(g => + { + var list = g.ToList(); + var chunkCount = Math.Min(slots, Math.Max(1, list.Count)); + var chunkSize = (int)Math.Ceiling(list.Count / (double)chunkCount); + + return ChunkList(list, chunkSize) + .Select(chunk => new BatchChunk(g.Key, chunk)); + }) + .ToArray(); + + // init statuses + lock (_downloadStatusLock) + { + _downloadStatus.Clear(); + + // direct downloads and batch downloads tracked separately + foreach (var d in directDownloads) + { + _downloadStatus[d.DirectDownloadUrl!] = new FileDownloadStatus + { + DownloadStatus = DownloadStatus.Initializing, + TotalBytes = d.Total, + TotalFiles = 1, + TransferredBytes = 0, + TransferredFiles = 0 + }; + } + + foreach (var g in batchChunks.GroupBy(c => c.Key, StringComparer.Ordinal)) + { + _downloadStatus[g.Key] = new FileDownloadStatus + { + DownloadStatus = DownloadStatus.Initializing, + TotalBytes = g.SelectMany(x => x.Items).Sum(x => x.Total), + TotalFiles = 1, + TransferredBytes = 0, + TransferredFiles = 0 + }; + } + } + + if (directDownloads.Count > 0 || batchChunks.Length > 0) + { + Logger.LogInformation("Downloading {direct} files directly, and {batchtotal} queued in {chunks} chunks.", + directDownloads.Count, batchDownloads.Count, batchChunks.Length); + } + + if (gameObjectHandler is not null) + Mediator.Publish(new DownloadStartedMessage(gameObjectHandler, _downloadStatus)); + + // allow some extra workers so downloads can continue while earlier items decompress. + var workerDop = Math.Clamp(slots * 2, 2, 16); + + // batch downloads + Task batchTask = batchChunks.Length == 0 + ? Task.CompletedTask + : Parallel.ForEachAsync(batchChunks, new ParallelOptions { MaxDegreeOfParallelism = workerDop, CancellationToken = ct }, + async (chunk, token) => await ProcessBatchChunkAsync(chunk, replacementLookup, token, skipDownscale).ConfigureAwait(false)); + + // direct downloads + Task directTask = directDownloads.Count == 0 + ? Task.CompletedTask + : Parallel.ForEachAsync(directDownloads, new ParallelOptions { MaxDegreeOfParallelism = workerDop, CancellationToken = ct }, + async (d, token) => await ProcessDirectAsync(d, replacementLookup, token, skipDownscale).ConfigureAwait(false)); + + await Task.WhenAll(batchTask, directTask).ConfigureAwait(false); + + Logger.LogDebug("Download end: {id}", objectName); + ClearDownload(); + } + + private async Task ProcessBatchChunkAsync(BatchChunk chunk, Dictionary replacementLookup, CancellationToken ct, bool skipDownscale) + { + var statusKey = chunk.Key; + + // enqueue (no slot) + SetStatus(statusKey, DownloadStatus.WaitingForQueue); + + var requestIdResponse = await _orchestrator.SendRequestAsync( + HttpMethod.Post, + LightlessFiles.RequestEnqueueFullPath(chunk.Items[0].DownloadUri), + chunk.Items.Select(c => c.Hash), + ct).ConfigureAwait(false); + + var requestId = Guid.Parse((await requestIdResponse.Content.ReadAsStringAsync(ct).ConfigureAwait(false)).Trim('"')); + + var blockFile = _fileDbManager.GetCacheFilePath(requestId.ToString("N"), "blk"); + var fi = new FileInfo(blockFile); + + try + { + // download (with slot) + var progress = CreateInlineProgress(bytes => AddTransferredBytes(statusKey, bytes)); + + // Download slot held on get + await DownloadQueuedBlockFileAsync(statusKey, requestId, chunk.Items, blockFile, progress, ct).ConfigureAwait(false); + + // decompress if file exists + if (!File.Exists(blockFile)) + { + Logger.LogWarning("{dlName}: Block file missing before extraction, skipping", fi.Name); + return; + } + + await DecompressBlockFileAsync(statusKey, blockFile, replacementLookup, fi.Name, ct, skipDownscale).ConfigureAwait(false); + } + catch (OperationCanceledException) + { + Logger.LogDebug("{dlName}: Detected cancellation of download, partially extracting files for {id}", fi.Name, requestId); + } + catch (Exception ex) + { + Logger.LogError(ex, "{dlName}: Error during batch chunk processing", fi.Name); + ClearDownload(); + } + finally + { + try { File.Delete(blockFile); } catch { /* ignore */ } + } + } + + private async Task ProcessDirectAsync(DownloadFileTransfer directDownload, Dictionary replacementLookup, CancellationToken ct, bool skipDownscale) + { + var progress = CreateInlineProgress(bytes => + { + if (!string.IsNullOrEmpty(directDownload.DirectDownloadUrl)) + AddTransferredBytes(directDownload.DirectDownloadUrl!, bytes); + }); + + if (!ShouldUseDirectDownloads() || string.IsNullOrEmpty(directDownload.DirectDownloadUrl)) + { + await ProcessDirectAsQueuedFallbackAsync(directDownload, replacementLookup, progress, ct, skipDownscale).ConfigureAwait(false); + return; + } + + var tempFilename = _fileDbManager.GetCacheFilePath(directDownload.Hash, "bin"); + + try + { + // Download slot held on get + SetStatus(directDownload.DirectDownloadUrl!, DownloadStatus.WaitingForSlot); + + await using ((await AcquireSlotAsync(ct).ConfigureAwait(false)).ConfigureAwait(false)) + { + SetStatus(directDownload.DirectDownloadUrl!, DownloadStatus.Downloading); + Logger.LogDebug("Beginning direct download of {hash} from {url}", directDownload.Hash, directDownload.DirectDownloadUrl); + + await DownloadFileThrottled(new Uri(directDownload.DirectDownloadUrl!), tempFilename, progress, callback: null, ct, withToken: false) + .ConfigureAwait(false); + } + + Interlocked.Exchange(ref _consecutiveDirectDownloadFailures, 0); + + // Decompress/write + SetStatus(directDownload.DirectDownloadUrl!, DownloadStatus.Decompressing); + + if (!replacementLookup.TryGetValue(directDownload.Hash, out var repl)) + { + Logger.LogWarning("{hash}: No replacement data found for direct download.", directDownload.Hash); + return; + } + + var finalFilename = _fileDbManager.GetCacheFilePath(directDownload.Hash, repl.Extension); + + Logger.LogDebug("Decompressing direct download {hash} from {compressedFile} to {finalFile}", + directDownload.Hash, tempFilename, finalFilename); + + // Read compressed bytes and decompress in memory + byte[] compressedBytes = await File.ReadAllBytesAsync(tempFilename, ct).ConfigureAwait(false); + var decompressedBytes = LZ4Wrapper.Unwrap(compressedBytes); + + await _fileCompactor.WriteAllBytesAsync(finalFilename, decompressedBytes, ct).ConfigureAwait(false); + PersistFileToStorage(directDownload.Hash, finalFilename, repl.GamePath, skipDownscale); + + MarkTransferredFiles(directDownload.DirectDownloadUrl!, 1); + Logger.LogDebug("Finished direct download of {hash}.", directDownload.Hash); + } + catch (OperationCanceledException ex) + { + if (ct.IsCancellationRequested) + Logger.LogDebug("{hash}: Direct download cancelled by caller, discarding file.", directDownload.Hash); + else + Logger.LogWarning(ex, "{hash}: Direct download cancelled unexpectedly.", directDownload.Hash); + + ClearDownload(); + } + catch (Exception ex) + { + var expectedDirectDownloadFailure = ex is InvalidDataException; + var failureCount = expectedDirectDownloadFailure ? 0 : Interlocked.Increment(ref _consecutiveDirectDownloadFailures); + + if (expectedDirectDownloadFailure) + Logger.LogInformation(ex, "{hash}: Direct download unavailable, attempting queued fallback.", directDownload.Hash); + else + Logger.LogWarning(ex, "{hash}: Direct download failed, attempting queued fallback.", directDownload.Hash); + + try + { + await ProcessDirectAsQueuedFallbackAsync(directDownload, replacementLookup, progress, ct, skipDownscale).ConfigureAwait(false); + + if (!expectedDirectDownloadFailure && failureCount >= 3 && !_disableDirectDownloads) + { + _disableDirectDownloads = true; + Logger.LogWarning("Disabling direct downloads for this session after {count} consecutive failures.", failureCount); + } + } + catch (Exception fallbackEx) + { + Logger.LogError(fallbackEx, "{hash}: Error during direct download fallback.", directDownload.Hash); + ClearDownload(); + } + } + finally + { + try { File.Delete(tempFilename); } + catch + { + // ignore + } + } + } + + private async Task ProcessDirectAsQueuedFallbackAsync( + DownloadFileTransfer directDownload, + Dictionary replacementLookup, + IProgress progress, + CancellationToken ct, + bool skipDownscale) + { + if (string.IsNullOrEmpty(directDownload.DirectDownloadUrl)) + throw new InvalidOperationException("Direct download fallback requested without a direct download URL."); + + var statusKey = directDownload.DirectDownloadUrl!; + + SetStatus(statusKey, DownloadStatus.WaitingForQueue); + + var requestIdResponse = await _orchestrator.SendRequestAsync( + HttpMethod.Post, + LightlessFiles.RequestEnqueueFullPath(directDownload.DownloadUri), + new[] { directDownload.Hash }, + ct).ConfigureAwait(false); + + var requestId = Guid.Parse((await requestIdResponse.Content.ReadAsStringAsync(ct).ConfigureAwait(false)).Trim('"')); + var blockFile = _fileDbManager.GetCacheFilePath(requestId.ToString("N"), "blk"); + + try + { + await DownloadQueuedBlockFileAsync(statusKey, requestId, [directDownload], blockFile, progress, ct).ConfigureAwait(false); + + if (!File.Exists(blockFile)) + throw new FileNotFoundException("Block file missing after direct download fallback.", blockFile); + + await DecompressBlockFileAsync(statusKey, blockFile, replacementLookup, $"fallback-{directDownload.Hash}", ct, skipDownscale) + .ConfigureAwait(false); + } + finally + { + try { File.Delete(blockFile); } + catch + { + // ignore + } + } + } + + private async Task> FilesGetSizes(List hashes, CancellationToken ct) + { + if (!_orchestrator.IsInitialized) + throw new InvalidOperationException("FileTransferManager is not initialized"); + + // batch request + var response = await _orchestrator.SendRequestAsync( + HttpMethod.Get, + LightlessFiles.ServerFilesGetSizesFullPath(_orchestrator.FilesCdnUri!), + hashes, + ct).ConfigureAwait(false); + + // ensure success + return await response.Content.ReadFromJsonAsync>(cancellationToken: ct).ConfigureAwait(false) ?? []; + } + + private void PersistFileToStorage(string fileHash, string filePath, string gamePath, bool skipDownscale) + { + var fi = new FileInfo(filePath); + + Func RandomDayInThePast() + { + DateTime start = new(1995, 1, 1, 1, 1, 1, DateTimeKind.Local); + Random gen = new(); + int range = (DateTime.Today - start).Days; + return () => start.AddDays(gen.Next(range)); + } + + fi.CreationTime = RandomDayInThePast().Invoke(); + fi.LastAccessTime = DateTime.Today; + fi.LastWriteTime = RandomDayInThePast().Invoke(); + + try + { + var entry = _fileDbManager.CreateCacheEntry(filePath); + var mapKind = _textureMetadataHelper.DetermineMapKind(gamePath, filePath); + + if (!skipDownscale) + _textureDownscaleService.ScheduleDownscale(fileHash, filePath, mapKind); + + if (entry != null && !string.Equals(entry.Hash, fileHash, StringComparison.OrdinalIgnoreCase)) + { + Logger.LogError("Hash mismatch after extracting, got {hash}, expected {expectedHash}, deleting file", + entry.Hash, fileHash); + + File.Delete(filePath); + _fileDbManager.RemoveHashedFile(entry.Hash, entry.PrefixedFilePath); + } + } + catch (Exception ex) + { + Logger.LogWarning(ex, "Error creating cache entry"); + } + } + + private static IProgress CreateInlineProgress(Action callback) => new InlineProgress(callback); + private sealed class InlineProgress : IProgress { private readonly Action _callback; - - public InlineProgress(Action callback) - { - _callback = callback ?? throw new ArgumentNullException(nameof(callback)); - } - - public void Report(long value) - { - _callback(value); - } + public InlineProgress(Action callback) => _callback = callback ?? throw new ArgumentNullException(nameof(callback)); + public void Report(long value) => _callback(value); } } diff --git a/LightlessSync/WebAPI/Files/FileTransferOrchestrator.cs b/LightlessSync/WebAPI/Files/FileTransferOrchestrator.cs index ac77b23..9586577 100644 --- a/LightlessSync/WebAPI/Files/FileTransferOrchestrator.cs +++ b/LightlessSync/WebAPI/Files/FileTransferOrchestrator.cs @@ -18,56 +18,72 @@ public class FileTransferOrchestrator : DisposableMediatorSubscriberBase private readonly LightlessConfigService _lightlessConfig; private readonly object _semaphoreModificationLock = new(); private readonly TokenProvider _tokenProvider; + private int _availableDownloadSlots; private SemaphoreSlim _downloadSemaphore; + private int CurrentlyUsedDownloadSlots => _availableDownloadSlots - _downloadSemaphore.CurrentCount; - public FileTransferOrchestrator(ILogger logger, LightlessConfigService lightlessConfig, - LightlessMediator mediator, TokenProvider tokenProvider, HttpClient httpClient) : base(logger, mediator) + public FileTransferOrchestrator( + ILogger logger, + LightlessConfigService lightlessConfig, + LightlessMediator mediator, + TokenProvider tokenProvider, + HttpClient httpClient) : base(logger, mediator) { _lightlessConfig = lightlessConfig; _tokenProvider = tokenProvider; _httpClient = httpClient; + var ver = Assembly.GetExecutingAssembly().GetName().Version; - _httpClient.DefaultRequestHeaders.UserAgent.Add(new ProductInfoHeaderValue("LightlessSync", ver!.Major + "." + ver!.Minor + "." + ver!.Build)); + _httpClient.DefaultRequestHeaders.UserAgent.Add( + new ProductInfoHeaderValue("LightlessSync", $"{ver!.Major}.{ver.Minor}.{ver.Build}")); - _availableDownloadSlots = lightlessConfig.Current.ParallelDownloads; - _downloadSemaphore = new(_availableDownloadSlots, _availableDownloadSlots); + _availableDownloadSlots = Math.Max(1, lightlessConfig.Current.ParallelDownloads); + _downloadSemaphore = new SemaphoreSlim(_availableDownloadSlots, _availableDownloadSlots); - Mediator.Subscribe(this, (msg) => - { - FilesCdnUri = msg.Connection.ServerInfo.FileServerAddress; - }); - - Mediator.Subscribe(this, (msg) => - { - FilesCdnUri = null; - }); - Mediator.Subscribe(this, (msg) => - { - _downloadReady[msg.RequestId] = true; - }); + Mediator.Subscribe(this, msg => FilesCdnUri = msg.Connection.ServerInfo.FileServerAddress); + Mediator.Subscribe(this, _ => FilesCdnUri = null); + Mediator.Subscribe(this, msg => _downloadReady[msg.RequestId] = true); } + /// + /// Files CDN Uri from server + /// public Uri? FilesCdnUri { private set; get; } + + /// + /// Forbidden file transfers given by server + /// public List ForbiddenTransfers { get; } = []; + + /// + /// Is the FileTransferOrchestrator initialized + /// public bool IsInitialized => FilesCdnUri != null; - public void ClearDownloadRequest(Guid guid) - { - _downloadReady.Remove(guid, out _); - } + /// + /// Configured parallel downloads in settings (ParallelDownloads) + /// + public int ConfiguredParallelDownloads => Math.Max(1, _lightlessConfig.Current.ParallelDownloads); + /// + /// Clears the download request for the given guid + /// + /// Guid of download request + public void ClearDownloadRequest(Guid guid) => _downloadReady.Remove(guid, out _); + + /// + /// Is the download ready for the given guid + /// + /// Guid of download request + /// Completion of the download public bool IsDownloadReady(Guid guid) - { - if (_downloadReady.TryGetValue(guid, out bool isReady) && isReady) - { - return true; - } - - return false; - } + => _downloadReady.TryGetValue(guid, out bool isReady) && isReady; + /// + /// Release a download slot after download is complete + /// public void ReleaseDownloadSlot() { try @@ -81,60 +97,26 @@ public class FileTransferOrchestrator : DisposableMediatorSubscriberBase } } - public async Task SendRequestAsync(HttpMethod method, Uri uri, - CancellationToken? ct = null, HttpCompletionOption httpCompletionOption = HttpCompletionOption.ResponseContentRead, - bool withToken = true) - { - return await SendRequestInternalAsync(() => new HttpRequestMessage(method, uri), - ct, httpCompletionOption, withToken, allowRetry: true).ConfigureAwait(false); - } - - public async Task SendRequestAsync(HttpMethod method, Uri uri, T content, CancellationToken ct, - bool withToken = true) where T : class - { - return await SendRequestInternalAsync(() => - { - var requestMessage = new HttpRequestMessage(method, uri); - if (content is not ByteArrayContent byteArrayContent) - { - requestMessage.Content = JsonContent.Create(content); - } - else - { - var clonedContent = new ByteArrayContent(byteArrayContent.ReadAsByteArrayAsync().GetAwaiter().GetResult()); - foreach (var header in byteArrayContent.Headers) - { - clonedContent.Headers.TryAddWithoutValidation(header.Key, header.Value); - } - requestMessage.Content = clonedContent; - } - - return requestMessage; - }, ct, HttpCompletionOption.ResponseContentRead, withToken, - allowRetry: content is not HttpContent || content is ByteArrayContent).ConfigureAwait(false); - } - - public async Task SendRequestStreamAsync(HttpMethod method, Uri uri, ProgressableStreamContent content, - CancellationToken ct, bool withToken = true) - { - return await SendRequestInternalAsync(() => - { - var requestMessage = new HttpRequestMessage(method, uri) - { - Content = content - }; - return requestMessage; - }, ct, HttpCompletionOption.ResponseContentRead, withToken, allowRetry: false).ConfigureAwait(false); - } - + /// + /// Wait for an available download slot asyncronously + /// + /// Cancellation Token + /// Task of the slot public async Task WaitForDownloadSlotAsync(CancellationToken token) { lock (_semaphoreModificationLock) { - if (_availableDownloadSlots != _lightlessConfig.Current.ParallelDownloads && _availableDownloadSlots == _downloadSemaphore.CurrentCount) + var desired = Math.Max(1, _lightlessConfig.Current.ParallelDownloads); + + if (_availableDownloadSlots != desired && + _availableDownloadSlots == _downloadSemaphore.CurrentCount) { - _availableDownloadSlots = _lightlessConfig.Current.ParallelDownloads; - _downloadSemaphore = new(_availableDownloadSlots, _availableDownloadSlots); + _availableDownloadSlots = desired; + + var old = _downloadSemaphore; + _downloadSemaphore = new SemaphoreSlim(_availableDownloadSlots, _availableDownloadSlots); + + try { old.Dispose(); } catch { /* ignore */ } } } @@ -142,10 +124,15 @@ public class FileTransferOrchestrator : DisposableMediatorSubscriberBase Mediator.Publish(new DownloadLimitChangedMessage()); } + /// + /// Download limit per slot in bytes + /// + /// Bytes of the download limit public long DownloadLimitPerSlot() { var limit = _lightlessConfig.Current.DownloadSpeedLimitInBytes; if (limit <= 0) return 0; + limit = _lightlessConfig.Current.DownloadSpeedType switch { LightlessConfiguration.Models.DownloadSpeeds.Bps => limit, @@ -153,22 +140,113 @@ public class FileTransferOrchestrator : DisposableMediatorSubscriberBase LightlessConfiguration.Models.DownloadSpeeds.MBps => limit * 1024 * 1024, _ => limit, }; - var currentUsedDlSlots = CurrentlyUsedDownloadSlots; - var avaialble = _availableDownloadSlots; - var currentCount = _downloadSemaphore.CurrentCount; - var dividedLimit = limit / (currentUsedDlSlots == 0 ? 1 : currentUsedDlSlots); - if (dividedLimit < 0) + + var usedSlots = CurrentlyUsedDownloadSlots; + var divided = limit / (usedSlots <= 0 ? 1 : usedSlots); + + if (divided < 0) { - Logger.LogWarning("Calculated Bandwidth Limit is negative, returning Infinity: {value}, CurrentlyUsedDownloadSlots is {currentSlots}, " + - "DownloadSpeedLimit is {limit}, available slots: {avail}, current count: {count}", dividedLimit, currentUsedDlSlots, limit, avaialble, currentCount); + Logger.LogWarning( + "Calculated Bandwidth Limit is negative, returning Infinity: {value}, usedSlots={usedSlots}, limit={limit}, avail={avail}, currentCount={count}", + divided, usedSlots, limit, _availableDownloadSlots, _downloadSemaphore.CurrentCount); return long.MaxValue; } - return Math.Clamp(dividedLimit, 1, long.MaxValue); + + return Math.Clamp(divided, 1, long.MaxValue); } - private async Task SendRequestInternalAsync(Func requestFactory, - CancellationToken? ct = null, HttpCompletionOption httpCompletionOption = HttpCompletionOption.ResponseContentRead, - bool withToken = true, bool allowRetry = true) + /// + /// sends an HTTP request without content serialization + /// + /// HttpMethod for the request + /// Uri for the request + /// Cancellation Token + /// Enum of HttpCollectionOption + /// Include Cancellation Token + /// Http response of the request + public async Task SendRequestAsync( + HttpMethod method, + Uri uri, + CancellationToken? ct = null, + HttpCompletionOption httpCompletionOption = HttpCompletionOption.ResponseContentRead, + bool withToken = true) + { + return await SendRequestInternalAsync( + () => new HttpRequestMessage(method, uri), + ct, + httpCompletionOption, + withToken, + allowRetry: true).ConfigureAwait(false); + } + + /// + /// Sends an HTTP request with JSON content serialization + /// + /// HttpResponseMessage + /// Http method + /// Url of the direct download link + /// content of the request + /// cancellation token + /// include cancellation token + /// + public async Task SendRequestAsync( + HttpMethod method, + Uri uri, + T content, + CancellationToken ct, + bool withToken = true) where T : class + { + return await SendRequestInternalAsync(() => + { + var requestMessage = new HttpRequestMessage(method, uri); + + if (content is ByteArrayContent byteArrayContent) + { + var bytes = byteArrayContent.ReadAsByteArrayAsync(ct).GetAwaiter().GetResult(); + var cloned = new ByteArrayContent(bytes); + foreach (var header in byteArrayContent.Headers) + cloned.Headers.TryAddWithoutValidation(header.Key, header.Value); + + requestMessage.Content = cloned; + } + else + { + requestMessage.Content = JsonContent.Create(content); + } + + return requestMessage; + }, ct, HttpCompletionOption.ResponseContentRead, withToken, + allowRetry: content is not HttpContent || content is ByteArrayContent).ConfigureAwait(false); + } + + public async Task SendRequestStreamAsync( + HttpMethod method, + Uri uri, + ProgressableStreamContent content, + CancellationToken ct, + bool withToken = true) + { + return await SendRequestInternalAsync(() => + { + return new HttpRequestMessage(method, uri) { Content = content }; + }, ct, HttpCompletionOption.ResponseContentRead, withToken, allowRetry: false).ConfigureAwait(false); + } + + /// + /// sends an HTTP request with optional retry logic for transient network errors + /// + /// Request factory + /// Cancellation Token + /// Http Options + /// With cancellation token + /// Allows retry of request + /// Response message of request + private async Task SendRequestInternalAsync( + Func requestFactory, + CancellationToken? ct = null, + HttpCompletionOption httpCompletionOption = HttpCompletionOption.ResponseContentRead, + bool withToken = true, + bool allowRetry = true) { const int maxAttempts = 2; var attempt = 0; @@ -184,8 +262,11 @@ public class FileTransferOrchestrator : DisposableMediatorSubscriberBase requestMessage.Headers.Authorization = new AuthenticationHeaderValue("Bearer", token); } - if (requestMessage.Content != null && requestMessage.Content is not StreamContent && requestMessage.Content is not ByteArrayContent) + if (requestMessage.Content != null && + requestMessage.Content is not StreamContent && + requestMessage.Content is not ByteArrayContent) { + // log content for debugging var content = await ((JsonContent)requestMessage.Content).ReadAsStringAsync().ConfigureAwait(false); Logger.LogDebug("Sending {method} to {uri} (Content: {content})", requestMessage.Method, requestMessage.RequestUri, content); } @@ -196,9 +277,10 @@ public class FileTransferOrchestrator : DisposableMediatorSubscriberBase try { - if (ct != null) - return await _httpClient.SendAsync(requestMessage, httpCompletionOption, ct.Value).ConfigureAwait(false); - return await _httpClient.SendAsync(requestMessage, httpCompletionOption).ConfigureAwait(false); + // send request + return ct != null + ? await _httpClient.SendAsync(requestMessage, httpCompletionOption, ct.Value).ConfigureAwait(false) + : await _httpClient.SendAsync(requestMessage, httpCompletionOption).ConfigureAwait(false); } catch (TaskCanceledException) { @@ -208,14 +290,11 @@ public class FileTransferOrchestrator : DisposableMediatorSubscriberBase { Logger.LogWarning(ex, "Transient error during SendRequestInternal for {uri}, retrying attempt {attempt}/{maxAttempts}", requestMessage.RequestUri, attempt, maxAttempts); + if (ct.HasValue) - { await Task.Delay(TimeSpan.FromMilliseconds(200), ct.Value).ConfigureAwait(false); - } else - { await Task.Delay(TimeSpan.FromMilliseconds(200)).ConfigureAwait(false); - } } catch (Exception ex) { @@ -225,6 +304,11 @@ public class FileTransferOrchestrator : DisposableMediatorSubscriberBase } } + /// + /// Is the exception a transient network exception + /// + /// expection + /// Is transient network expection private static bool IsTransientNetworkException(Exception ex) { var current = ex; @@ -232,12 +316,13 @@ public class FileTransferOrchestrator : DisposableMediatorSubscriberBase { if (current is SocketException socketEx) { - return socketEx.SocketErrorCode is SocketError.ConnectionReset or SocketError.ConnectionAborted or SocketError.TimedOut; + return socketEx.SocketErrorCode is + SocketError.ConnectionReset or + SocketError.ConnectionAborted or + SocketError.TimedOut; } - current = current.InnerException; } - return false; } -} \ No newline at end of file +}