using K4os.Compression.LZ4;
using K4os.Compression.LZ4.Legacy;
using LightlessSync.API.Data;
using LightlessSync.API.Dto.Files;
using LightlessSync.API.Routes;
using LightlessSync.FileCache;
using LightlessSync.LightlessConfiguration;
using LightlessSync.PlayerData.Handlers;
using LightlessSync.Services.Mediator;
using LightlessSync.Services.ModelDecimation;
using LightlessSync.Services.TextureCompression;
using LightlessSync.Utils;
using LightlessSync.WebAPI.Files.Models;
using Microsoft.Extensions.Logging;
using System.Buffers;
using System.Buffers.Binary;
using System.Collections.Concurrent;
using System.IO.MemoryMappedFiles;
using System.Net;
using System.Net.Http.Json;

namespace LightlessSync.WebAPI.Files;

// Coordinates file downloads per game object: requests are queued per handler, executed one
// session at a time for that handler, and the resulting files are decompressed into the cache.
// NOTE(review): generic type arguments are not visible in this view of the file; the
// descriptions below are derived from how each member is used further down.
public partial class FileDownloadManager : DisposableMediatorSubscriberBase
{
    private readonly FileCompactor _fileCompactor;
    private readonly FileCacheManager _fileDbManager;
    private readonly FileTransferOrchestrator _orchestrator;
    private readonly LightlessConfigService _configService;
    private readonly TextureDownscaleService _textureDownscaleService;
    private readonly ModelDecimationService _modelDecimationService;
    private readonly TextureMetadataHelper _textureMetadataHelper;
    private readonly FileDownloadDeduplicator _downloadDeduplicator;

    // Session currently executing for a handler (written in ExecuteDownloadRequestAsync).
    private readonly ConcurrentDictionary _activeSessions = new();

    // Pending requests per handler, drained in FIFO order by ProcessDownloadQueueAsync.
    private readonly ConcurrentDictionary> _downloadQueues = new();

    // Used via GetOrStart(handler, ...) to launch the queue processor for a handler.
    private readonly TaskRegistry _downloadQueueWaiters = new();

    // Throttled streams that are currently being read; used as a set (the value is always 0)
    // so a changed bandwidth limit can be pushed to running downloads.
    private readonly ConcurrentDictionary _activeDownloadStreams;

    // Sized to half the logical processor count, with a minimum of one permit.
    private readonly SemaphoreSlim _decompressGate = new(Math.Max(1, Environment.ProcessorCount / 2), Math.Max(1, Environment.ProcessorCount / 2));

    // Runtime kill-switch for direct downloads; combined with the config flag in ShouldUseDirectDownloads.
    private volatile bool _disableDirectDownloads;

    // Counter of direct-download failures; reset to 0 after a successful direct download.
    private int _consecutiveDirectDownloadFailures;

    // Last observed value of the EnableDirectDownloads setting, used to detect a config toggle.
    private bool _lastConfigDirectDownloadsState;

    // Stores the injected services and registers two mediator subscriptions
    // (their message types are not visible in this view).
    public FileDownloadManager(
        ILogger logger,
        LightlessMediator mediator,
        FileTransferOrchestrator orchestrator,
        FileCacheManager fileCacheManager,
        FileCompactor fileCompactor,
        LightlessConfigService configService,
        TextureDownscaleService textureDownscaleService,
        ModelDecimationService modelDecimationService,
        TextureMetadataHelper textureMetadataHelper,
        FileDownloadDeduplicator downloadDeduplicator) : base(logger, mediator)
    {
        _orchestrator = orchestrator;
        _fileDbManager = fileCacheManager;
        _fileCompactor = fileCompactor;
        _configService = configService;
        _textureDownscaleService = textureDownscaleService;
        _modelDecimationService = modelDecimationService;
        _textureMetadataHelper = textureMetadataHelper;
        _downloadDeduplicator = downloadDeduplicator;
        _activeDownloadStreams = new();

        // Remember the starting config value so a later toggle can be detected.
        _lastConfigDirectDownloadsState = _configService.Current.EnableDirectDownloads;

        // First subscription: re-apply the per-slot bandwidth limit to every stream in flight.
        Mediator.Subscribe(this, _ =>
        {
            if (_activeDownloadStreams.IsEmpty) return;
            var newLimit = _orchestrator.DownloadLimitPerSlot();
            Logger.LogTrace("Setting new Download Speed Limit to {newLimit}", newLimit);
            foreach (var stream in _activeDownloadStreams.Keys) stream.BandwidthLimit = newLimit;
        });

        // Second subscription: on disconnect, drop all sessions and complete every
        // deduplicated download as failed.
        Mediator.Subscribe(this, _ =>
        {
            Logger.LogDebug("Disconnected from server, clearing in-flight downloads");
            ClearDownload();
            _downloadDeduplicator.CompleteAll(false);
        });
    }

    // Flattened snapshot of the transfers of all active sessions (a new list on every access).
    public List CurrentDownloads => _activeSessions.Values.SelectMany(s => s.Downloads).ToList();

    // Pass-through to the orchestrator's forbidden-transfer list.
    public List ForbiddenTransfers => _orchestrator.ForbiddenTransfers;

    // True while any session is active or any per-handler queue still holds a request.
    public bool IsDownloading => !_activeSessions.IsEmpty || _downloadQueues.Any(kvp => !kvp.Value.IsEmpty);

    // True when the given handler has an active session or queued requests; false for null.
    public bool IsDownloadingFor(GameObjectHandler? handler)
    {
        if (handler is null) return false;
        return _activeSessions.ContainsKey(handler)
            || (_downloadQueues.TryGetValue(handler, out var queue) && !queue.IsEmpty);
    }

    // Number of transfers in the handler's active session plus those of every queued request;
    // 0 when the handler is null.
    public int GetPendingDownloadCount(GameObjectHandler?
handler)
    {
        if (handler is null)
        {
            return 0;
        }

        // Start with the active session (if any), then add everything still queued.
        int pending = _activeSessions.TryGetValue(handler, out var active)
            ? active.Downloads.Count
            : 0;

        if (_downloadQueues.TryGetValue(handler, out var waiting))
        {
            foreach (var queued in waiting)
            {
                pending += queued.Session.Downloads.Count;
            }
        }

        return pending;
    }

    // Direct downloads require both the user setting and the runtime kill-switch to allow them.
    private bool ShouldUseDirectDownloads()
    {
        if (!_configService.Current.EnableDirectDownloads)
        {
            return false;
        }

        return !_disableDirectDownloads;
    }

    // XORs every byte of the buffer with 42, in place. Applying it twice restores the input.
    public static void MungeBuffer(Span buffer)
    {
        foreach (ref var b in buffer)
        {
            b ^= 42;
        }
    }

    // Clears every active session. Works on a snapshot because clearing removes entries.
    public void ClearDownload()
    {
        var snapshot = _activeSessions.Values.ToList();
        foreach (var active in snapshot)
        {
            ClearDownload(active);
        }
    }

    // Fails all downloads the session still owns, wipes its bookkeeping and unregisters it.
    private void ClearDownload(DownloadSession session)
    {
        var ownedHashes = session.OwnedDownloads.Keys.ToList();
        foreach (var ownedHash in ownedHashes)
        {
            CompleteOwnedDownload(session, ownedHash, false);
        }

        session.Status.Clear();
        session.OwnedDownloads.Clear();
        session.Downloads.Clear();

        if (session.Handler is { } owner)
        {
            _activeSessions.TryRemove(owner, out _);
        }
    }

    // Convenience overload: resolves the transfer list first, then runs the download.
    public async Task DownloadFiles(GameObjectHandler? gameObject, List fileReplacementDto, CancellationToken ct, bool skipDownscale = false, bool skipDecimation = false)
    {
        var pendingTransfers = await InitiateDownloadList(gameObject, fileReplacementDto, ct).ConfigureAwait(false);

        await DownloadFiles(gameObject, fileReplacementDto, pendingTransfers, ct, skipDownscale, skipDecimation)
            .ConfigureAwait(false);
    }

    // Overload taking an already resolved transfer list; wraps it in a request and enqueues it.
    public Task DownloadFiles(GameObjectHandler?
gameObject, List fileReplacementDto, List downloads, CancellationToken ct, bool skipDownscale = false, bool skipDecimation = false)
    {
        var session = new DownloadSession(gameObject, downloads);
        var completion = new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously);
        var request = new DownloadRequest(session, fileReplacementDto, ct, skipDownscale, skipDecimation, completion);
        return EnqueueDownloadAsync(request);
    }

    // Requests without a handler run immediately (fire-and-forget, observed through the
    // completion task); requests with a handler are serialized through that handler's queue.
    private Task EnqueueDownloadAsync(DownloadRequest request)
    {
        var handler = request.Session.Handler;
        if (handler is null)
        {
            _ = ExecuteDownloadRequestAsync(request);
            return request.Completion.Task;
        }

        var queue = _downloadQueues.GetOrAdd(handler, _ => new ConcurrentQueue());
        queue.Enqueue(request);
        _downloadQueueWaiters.GetOrStart(handler, () => ProcessDownloadQueueAsync(handler));
        return request.Completion.Task;
    }

    // Drains the handler's queue one request at a time. After the queue runs dry it yields
    // once and re-checks, so a request enqueued during the last execution is still picked up.
    private async Task ProcessDownloadQueueAsync(GameObjectHandler handler)
    {
        if (!_downloadQueues.TryGetValue(handler, out var queue)) return;

        while (true)
        {
            while (queue.TryDequeue(out var request))
            {
                await ExecuteDownloadRequestAsync(request).ConfigureAwait(false);
            }

            await Task.Yield();
            if (queue.IsEmpty) return;
        }
    }

    // Runs one request and always resolves its completion source:
    //   result    -> DownloadFilesInternal finished,
    //   cancelled -> the request token was cancelled,
    //   faulted   -> any other exception.
    // This method itself does not let exceptions from the download escape, which keeps the
    // queue pump and the fire-and-forget path safe.
    private async Task ExecuteDownloadRequestAsync(DownloadRequest request)
    {
        if (request.CancellationToken.IsCancellationRequested)
        {
            request.Completion.TrySetCanceled(request.CancellationToken);
            return;
        }

        var session = request.Session;
        if (session.Handler is not null)
        {
            _activeSessions[session.Handler] = session;
        }

        try
        {
            // Published inside the try: if a subscriber throws here, the request must still be
            // completed (as faulted) and the finally block must still unregister the session.
            // Previously a throw at this point left the completion task pending forever.
            Mediator.Publish(new HaltScanMessage(nameof(DownloadFiles)));

            await DownloadFilesInternal(session, request.Replacements, request.CancellationToken, request.SkipDownscale, request.SkipDecimation).ConfigureAwait(false);
            request.Completion.TrySetResult(true);
        }
        catch (OperationCanceledException) when (request.CancellationToken.IsCancellationRequested)
        {
            ClearDownload(session);
            request.Completion.TrySetCanceled(request.CancellationToken);
        }
        catch (Exception ex)
        {
            ClearDownload(session);
            request.Completion.TrySetException(ex);
        }
        finally
        {
            if (session.Handler is not null)
            {
                Mediator.Publish(new DownloadFinishedMessage(session.Handler));
                _activeSessions.TryRemove(session.Handler, out _);
            }

            Mediator.Publish(new ResumeScanMessage(nameof(DownloadFiles)));
        }
    }

    // Clears all sessions and disposes every stream that is still registered as active.
    protected override void Dispose(bool disposing)
    {
        ClearDownload();

        foreach (var stream in _activeDownloadStreams.Keys.ToList())
        {
            try
            {
                stream.Dispose();
            }
            catch
            {
                // ignore
            }
            finally
            {
                _activeDownloadStreams.TryRemove(stream, out _);
            }
        }

        base.Dispose(disposing);
    }

    // Mutable bookkeeping for one download run of one (optional) handler.
    private sealed class DownloadSession
    {
        public DownloadSession(GameObjectHandler? handler, List downloads)
        {
            Handler = handler;
            ObjectName = handler?.Name ?? "Unknown";
            Downloads = downloads;
        }

        public GameObjectHandler? Handler { get; }
        public string ObjectName { get; }
        public List Downloads { get; }

        // Progress entries keyed by direct-download URL or batch chunk key (ordinal).
        public ConcurrentDictionary Status { get; } = new(StringComparer.Ordinal);

        // Hashes this session is responsible for completing in the deduplicator (case-insensitive).
        public ConcurrentDictionary OwnedDownloads { get; } = new(StringComparer.OrdinalIgnoreCase);
    }

    // Immutable description of one queued download plus the source used to signal its outcome.
    private sealed record DownloadRequest(
        DownloadSession Session,
        List Replacements,
        CancellationToken CancellationToken,
        bool SkipDownscale,
        bool SkipDecimation,
        TaskCompletionSource Completion);

    // Releases one orchestrator download slot on first disposal; later disposals are no-ops.
    private sealed class DownloadSlotLease : IAsyncDisposable
    {
        private readonly FileTransferOrchestrator _orch;
        private bool _released;

        public DownloadSlotLease(FileTransferOrchestrator orch) => _orch = orch;

        public ValueTask DisposeAsync()
        {
            if (!_released)
            {
                _released = true;
                _orch.ReleaseDownloadSlot();
            }

            return ValueTask.CompletedTask;
        }
    }

    // Waits for a free download slot and returns a lease that gives it back when disposed.
    private async ValueTask AcquireSlotAsync(CancellationToken ct)
    {
        await _orchestrator.WaitForDownloadSlotAsync(ct).ConfigureAwait(false);
        return new DownloadSlotLease(_orchestrator);
    }

    // Updates the status entry for the key, if one exists; unknown keys are ignored.
    private void SetStatus(DownloadSession session, string key, DownloadStatus status)
    {
        if (session.Status.TryGetValue(key, out var st)) st.DownloadStatus = status;
    }

    // Adds to the transferred-byte counter of the key's status entry, if one exists.
    private void AddTransferredBytes(DownloadSession session, string key, long delta)
    {
        if
(session.Status.TryGetValue(key, out var st)) st.AddTransferredBytes(delta);
    }

    // Sets the transferred-file counter of the key's status entry, if one exists.
    private void MarkTransferredFiles(DownloadSession session, string key, int files)
    {
        if (session.Status.TryGetValue(key, out var st)) st.SetTransferredFiles(files);
    }

    // Reports the outcome to the deduplicator exactly once per owned hash: only the call that
    // actually removes the hash from the session's owned set forwards the result.
    private void CompleteOwnedDownload(DownloadSession session, string hash, bool success)
    {
        if (session.OwnedDownloads.TryRemove(hash, out _))
        {
            _downloadDeduplicator.Complete(hash, success);
        }
    }

    // Un-munges a single value returned by Stream.ReadByte (XOR 42).
    // Throws EndOfStreamException when the value is the end-of-stream marker (-1).
    private static byte MungeByte(int byteOrEof)
    {
        if (byteOrEof == -1) throw new EndOfStreamException();
        return (byte)(byteOrEof ^ 42);
    }

    // Reads one munged entry header of the form "#<hash>:<length>#" from the current stream
    // position and leaves the stream positioned on the first payload byte.
    // Throws InvalidDataException when the header does not start with '#' or the length is
    // not an integer, and EndOfStreamException when the stream ends inside the header.
    private static (string fileHash, long fileLengthBytes) ReadBlockFileHeader(FileStream fileBlockStream)
    {
        var hashName = new System.Text.StringBuilder();
        var fileLength = new System.Text.StringBuilder();

        var separator = (char)MungeByte(fileBlockStream.ReadByte());
        if (separator != '#') throw new InvalidDataException("Data is invalid, first char is not #");

        bool readHash = false;
        while (true)
        {
            // MungeByte already throws EndOfStreamException on -1, so no separate check is needed.
            var readChar = (char)MungeByte(fileBlockStream.ReadByte());
            if (readChar == ':')
            {
                readHash = true;
                continue;
            }

            if (readChar == '#') break;

            if (!readHash) hashName.Append(readChar);
            else fileLength.Append(readChar);
        }

        // The header is machine-written: parse with the invariant culture so the result does
        // not depend on the user's regional settings, and report corrupt data as such instead
        // of leaking FormatException/OverflowException.
        var lengthText = fileLength.ToString();
        if (!long.TryParse(
                lengthText,
                System.Globalization.NumberStyles.Integer,
                System.Globalization.CultureInfo.InvariantCulture,
                out var fileLengthBytes))
        {
            throw new InvalidDataException($"Data is invalid, entry length '{lengthText}' is not a number");
        }

        return (hashName.ToString(), fileLengthBytes);
    }

    // Fills the whole buffer from the stream, looping over short reads.
    // Throws EndOfStreamException if the stream ends before the buffer is full.
    private static async Task ReadExactlyAsync(FileStream stream, Memory buffer, CancellationToken ct)
    {
        int offset = 0;
        while (offset < buffer.Length)
        {
            int n = await stream.ReadAsync(buffer.Slice(offset), ct).ConfigureAwait(false);
            if (n == 0) throw new EndOfStreamException();
            offset += n;
        }
    }

    // Copies exactly bytesToCopy bytes from source to destination through a pooled buffer.
    // Does nothing for non-positive counts; throws EndOfStreamException on a short source.
    private static async Task CopyExactlyAsync(Stream source, Stream destination, long bytesToCopy, CancellationToken ct)
    {
        if (bytesToCopy <= 0) return;

        var buffer = ArrayPool.Shared.Rent(81920);
        try
        {
            long remaining = bytesToCopy;
            while (remaining > 0)
            {
                // Never read past the requested byte count, even if the buffer is larger.
                int read = await source.ReadAsync(buffer.AsMemory(0, (int)Math.Min(buffer.Length, remaining)), ct).ConfigureAwait(false);
                if (read == 0) throw new EndOfStreamException();
await destination.WriteAsync(buffer.AsMemory(0, read), ct).ConfigureAwait(false);
                remaining -= read;
            }
        }
        finally
        {
            ArrayPool.Shared.Return(buffer);
        }
    }

    // Decompresses a file that starts with an 8-byte header
    // (int32 LE output length, int32 LE payload length) followed by the payload, and writes
    // the result to outputPath. Returns the number of bytes written.
    //   - output length 0            -> creates an empty file,
    //   - payload >= output length   -> payload is copied as-is (treated as stored data),
    //   - otherwise                  -> payload is LZ4-decoded through memory-mapped views.
    // Throws InvalidDataException for negative or inconsistent header values or when the
    // decoder does not produce exactly the announced number of bytes. On any failure the
    // partially written output file is removed before the exception is rethrown.
    private async Task DecompressWrappedLz4ToFileAsync(string compressedPath, string outputPath, CancellationToken ct)
    {
        await using var input = new FileStream(compressedPath, FileMode.Open, FileAccess.Read, FileShare.Read, 81920, useAsync: true);

        byte[] header = new byte[8];
        await ReadExactlyAsync(input, header, ct).ConfigureAwait(false);

        int outputLength = BinaryPrimitives.ReadInt32LittleEndian(header.AsSpan(0, 4));
        int inputLength = BinaryPrimitives.ReadInt32LittleEndian(header.AsSpan(4, 4));

        if (outputLength < 0 || inputLength < 0)
            throw new InvalidDataException("LZ4 header contained a negative length.");

        long remainingLength = input.Length - 8;
        if (inputLength > remainingLength)
            throw new InvalidDataException("LZ4 header length exceeds file size.");

        var dir = Path.GetDirectoryName(outputPath);
        if (!string.IsNullOrEmpty(dir) && !Directory.Exists(dir))
            Directory.CreateDirectory(dir);

        try
        {
            if (outputLength == 0)
            {
                await using var emptyStream = new FileStream(outputPath, FileMode.Create, FileAccess.Write, FileShare.None, 4096, useAsync: true);
                await emptyStream.FlushAsync(ct).ConfigureAwait(false);
                return 0;
            }

            if (inputLength >= outputLength)
            {
                await using var outputStream = new FileStream(outputPath, FileMode.Create, FileAccess.Write, FileShare.None, 81920, useAsync: true);
                await CopyExactlyAsync(input, outputStream, inputLength, ct).ConfigureAwait(false);
                await outputStream.FlushAsync(ct).ConfigureAwait(false);

                // Report what was actually written (inputLength bytes) so the caller's size
                // check sees the real file size even when the two header fields disagree.
                return inputLength;
            }

            // The output file is pre-sized so it can be mapped and written by the decoder directly.
            await using var mappedOutputStream = new FileStream(outputPath, FileMode.Create, FileAccess.ReadWrite, FileShare.None, 4096, FileOptions.SequentialScan);
            mappedOutputStream.SetLength(outputLength);

            using var inputMap = MemoryMappedFile.CreateFromFile(compressedPath, FileMode.Open, null, 0, MemoryMappedFileAccess.Read);
            using var inputView = inputMap.CreateViewAccessor(8, inputLength, MemoryMappedFileAccess.Read);
            using var outputMap = MemoryMappedFile.CreateFromFile(mappedOutputStream, null, outputLength, MemoryMappedFileAccess.ReadWrite, HandleInheritability.None, leaveOpen: true);
            using var outputView = outputMap.CreateViewAccessor(0, outputLength, MemoryMappedFileAccess.Write);

            unsafe
            {
                byte* inputPtr = null;
                byte* outputPtr = null;
                try
                {
                    inputView.SafeMemoryMappedViewHandle.AcquirePointer(ref inputPtr);
                    outputView.SafeMemoryMappedViewHandle.AcquirePointer(ref outputPtr);

                    // Skip the padding the accessor reports between the mapped base and the view start.
                    inputPtr += inputView.PointerOffset;
                    outputPtr += outputView.PointerOffset;

                    int decoded = LZ4Codec.Decode(inputPtr, inputLength, outputPtr, outputLength);
                    if (decoded != outputLength)
                        throw new InvalidDataException($"LZ4 decode length mismatch (expected {outputLength}, got {decoded}).");
                }
                finally
                {
                    // A non-null pointer means the matching AcquirePointer call succeeded.
                    if (inputPtr != null) inputView.SafeMemoryMappedViewHandle.ReleasePointer();
                    if (outputPtr != null) outputView.SafeMemoryMappedViewHandle.ReleasePointer();
                }
            }

            outputView.Flush();
            return outputLength;
        }
        catch
        {
            // The streams and views declared inside the try block are already disposed here.
            // Remove the partial (possibly zero-filled) file so a failed decode never leaves
            // a corrupt file at the final cache path.
            try
            {
                File.Delete(outputPath);
            }
            catch
            {
                // best effort; the original failure is the one worth reporting
            }

            throw;
        }
    }

    // Builds a case-insensitive hash -> (extension, first game path) lookup. Entries without
    // a hash are skipped, the first entry for a hash wins, and a missing extension becomes "bin".
    private static Dictionary BuildReplacementLookup(List fileReplacement)
    {
        var map = new Dictionary(StringComparer.OrdinalIgnoreCase);

        foreach (var r in fileReplacement)
        {
            if (r == null || string.IsNullOrWhiteSpace(r.Hash)) continue;

            var gamePath = r.GamePaths?.FirstOrDefault() ?? string.Empty;

            string ext = "";
            try
            {
                ext = Path.GetExtension(gamePath)?.TrimStart('.') ?? "";
            }
            catch
            {
                // ignore
            }

            if (string.IsNullOrWhiteSpace(ext)) ext = "bin";

            // TryAdd keeps the first entry for a hash with a single lookup.
            map.TryAdd(r.Hash, (ext, gamePath));
        }

        return map;
    }

    // Invoked with every chunk read from the network before it is written to disk;
    // the callback may modify the chunk in place.
    private delegate void DownloadDataCallback(Span data);

    // Downloads requestUrl to destinationFilename through a bandwidth-throttled stream.
    // The initial request is attempted up to three times for timeouts / status-less failures.
    private async Task DownloadFileThrottled(
        Uri requestUrl,
        string destinationFilename,
        IProgress progress,
        DownloadDataCallback? callback,
        CancellationToken ct,
        bool withToken)
    {
        const int maxRetries = 3;
        int retryCount = 0;
        TimeSpan retryDelay = TimeSpan.FromSeconds(2);
        HttpResponseMessage?
response = null;

        // Phase 1: obtain a successful response (headers only), retrying transient failures.
        while (true)
        {
            try
            {
                Logger.LogDebug("Attempt {attempt} - Downloading {requestUrl}", retryCount + 1, requestUrl);
                response = await _orchestrator.SendRequestAsync(
                        HttpMethod.Get,
                        requestUrl,
                        ct,
                        HttpCompletionOption.ResponseHeadersRead,
                        withToken)
                    .ConfigureAwait(false);
                response.EnsureSuccessStatusCode();
                break;
            }
            catch (HttpRequestException ex) when (ex.InnerException is TimeoutException || ex.StatusCode == null)
            {
                // No HTTP status at all (connection-level failure) or an explicit timeout: retry.
                response?.Dispose();
                retryCount++;
                Logger.LogWarning(ex, "Timeout during download of {requestUrl}. Attempt {attempt} of {maxRetries}", requestUrl, retryCount, maxRetries);
                if (retryCount >= maxRetries || ct.IsCancellationRequested)
                {
                    Logger.LogError("Max retries reached or cancelled. Failing download for {requestUrl}", requestUrl);
                    throw;
                }

                await Task.Delay(retryDelay, ct).ConfigureAwait(false);
            }
            catch (TaskCanceledException ex) when (!ct.IsCancellationRequested)
            {
                // Cancelled without the caller asking for it: treated as a timeout and retried.
                response?.Dispose();
                retryCount++;
                Logger.LogWarning(ex, "Cancellation/timeout during download of {requestUrl}. Attempt {attempt} of {maxRetries}", requestUrl, retryCount, maxRetries);
                if (retryCount >= maxRetries)
                {
                    Logger.LogError("Max retries reached for {requestUrl} after TaskCanceledException", requestUrl);
                    throw;
                }

                await Task.Delay(retryDelay, ct).ConfigureAwait(false);
            }
            catch (OperationCanceledException) when (ct.IsCancellationRequested)
            {
                response?.Dispose();
                throw;
            }
            catch (HttpRequestException ex)
            {
                // A real HTTP error status: not retried. 404/401 are reported as InvalidDataException.
                response?.Dispose();
                Logger.LogWarning(ex, "Error during download of {requestUrl}, HttpStatusCode: {code}", requestUrl, ex.StatusCode);
                if (ex.StatusCode is HttpStatusCode.NotFound or HttpStatusCode.Unauthorized)
                    throw new InvalidDataException($"Http error {ex.StatusCode} (cancelled: {ct.IsCancellationRequested}): {requestUrl}", ex);
                throw;
            }
        }

        // Phase 2: stream the body to disk.
        ThrottledStream? stream = null;
        try
        {
            // Determine buffer size based on content length
            var contentLen = response!.Content.Headers.ContentLength ?? 0;
            // 64 KiB for bodies above 1 MiB, otherwise 8 KiB (was 8196, a typo for 8192).
            var bufferSize = contentLen > 1024 * 1024 ? 65536 : 8192;
            var buffer = new byte[bufferSize];

            // Create destination file stream
            var fileStream = new FileStream(
                destinationFilename,
                FileMode.Create,
                FileAccess.Write,
                FileShare.None,
                bufferSize: 64 * 1024,
                useAsync: true);

            // Download with throttling
            await using (fileStream.ConfigureAwait(false))
            {
                var limit = _orchestrator.DownloadLimitPerSlot();
                Logger.LogTrace("Starting Download with a speed limit of {limit} to {destination}", limit, destinationFilename);

                stream = new ThrottledStream(await response.Content.ReadAsStreamAsync(ct).ConfigureAwait(false), limit);

                // Registered so a changed speed limit can be applied while the download runs.
                _activeDownloadStreams.TryAdd(stream, 0);

                while (true)
                {
                    ct.ThrowIfCancellationRequested();
                    int bytesRead = await stream.ReadAsync(buffer.AsMemory(0, buffer.Length), ct).ConfigureAwait(false);
                    if (bytesRead == 0) break;

                    // The callback may transform the chunk in place before it is written.
                    callback?.Invoke(buffer.AsSpan(0, bytesRead));

                    await fileStream.WriteAsync(buffer.AsMemory(0, bytesRead), ct).ConfigureAwait(false);
                    progress.Report(bytesRead);
                }

                Logger.LogDebug("{requestUrl} downloaded to {destination}", requestUrl, destinationFilename);
            }
        }
        catch
        {
            // Never leave a partial download behind.
            try
            {
                if (!string.IsNullOrEmpty(destinationFilename) && File.Exists(destinationFilename))
                    File.Delete(destinationFilename);
            }
            catch
            {
                // ignore
            }

            throw;
        }
        finally
        {
            try
            {
                if (stream != null)
                {
                    _activeDownloadStreams.TryRemove(stream, out _);
                    await stream.DisposeAsync().ConfigureAwait(false);
                }
            }
            finally
            {
                // Nested so the response is released even if disposing the stream throws.
                response?.Dispose();
            }
        }
    }

    // Polls until the request is ready: either the orchestrator already reports it as ready,
    // or the queue-check endpoint answers "true" / a body containing "ready":true.
    // Polls every 250 ms and clears the orchestrator's entry for the request once ready.
    private async Task WaitForDownloadReady(List downloadFileTransfer, Guid requestId, CancellationToken downloadCt)
    {
        while (true)
        {
            downloadCt.ThrowIfCancellationRequested();

            if (_orchestrator.IsDownloadReady(requestId))
                break;

            using var resp = await _orchestrator.SendRequestAsync(
                HttpMethod.Get,
                LightlessFiles.RequestCheckQueueFullPath(downloadFileTransfer[0].DownloadUri, requestId),
                downloadFileTransfer.Select(t => t.Hash).ToList(),
                downloadCt).ConfigureAwait(false);

            resp.EnsureSuccessStatusCode();

            var body = (await resp.Content.ReadAsStringAsync(downloadCt).ConfigureAwait(false)).Trim();
            if (string.Equals(body, "true", StringComparison.OrdinalIgnoreCase) ||
                body.Contains("\"ready\":true", StringComparison.OrdinalIgnoreCase))
            {
                break;
            }

            await Task.Delay(250, downloadCt).ConfigureAwait(false);
        }

        _orchestrator.ClearDownloadRequest(requestId);
    }

    // Downloads the block file of an already enqueued request to tempPath. The download slot
    // is held only for the GET itself, not while waiting for the server-side queue.
    private async Task DownloadQueuedBlockFileAsync(
        DownloadSession session,
        string statusKey,
        Guid requestId,
        List transfers,
        string tempPath,
        IProgress progress,
        CancellationToken ct)
    {
        Logger.LogDebug("GUID {requestId} on server {uri} for files {files}",
            requestId, transfers[0].DownloadUri, string.Join(", ", transfers.Select(c => c.Hash)));

        // Wait for ready WITHOUT holding a slot
        SetStatus(session, statusKey, DownloadStatus.WaitingForQueue);
        await WaitForDownloadReady(transfers, requestId, ct).ConfigureAwait(false);

        // Hold slot ONLY for the GET
        SetStatus(session, statusKey, DownloadStatus.WaitingForSlot);
        await using ((await AcquireSlotAsync(ct).ConfigureAwait(false)).ConfigureAwait(false))
        {
            SetStatus(session, statusKey, DownloadStatus.Downloading);

            var requestUrl = LightlessFiles.CacheGetFullPath(transfers[0].DownloadUri, requestId);

            // MungeBuffer is applied to every chunk before it is written, so the block file on
            // disk is stored XOR-42 and has to be munged again when it is read back.
            await DownloadFileThrottled(requestUrl, tempPath, progress, MungeBuffer, ct, withToken: true).ConfigureAwait(false);
        }
    }

    // Extracts every entry of a downloaded block file into the cache. Failures of individual
    // entries are logged and reported to the deduplicator; this method itself does not throw.
    private async Task DecompressBlockFileAsync(
        DownloadSession session,
        string downloadStatusKey,
        string blockFilePath,
        Dictionary replacementLookup,
        IReadOnlyDictionary rawSizeLookup,
        string downloadLabel,
        CancellationToken ct,
        bool skipDownscale,
        bool skipDecimation)
    {
        SetStatus(session, downloadStatusKey, DownloadStatus.Decompressing);
        MarkTransferredFiles(session, downloadStatusKey, 1);

        try
        {
            var fileBlockStream = File.OpenRead(blockFilePath);
            await using (fileBlockStream.ConfigureAwait(false))
            {
                while (fileBlockStream.Position < fileBlockStream.Length)
                {
                    (string fileHash, long fileLengthBytes) = ReadBlockFileHeader(fileBlockStream);

                    try
                    {
                        // Entries are read into a single array, so the length must fit an int.
                        if (fileLengthBytes < 0 || fileLengthBytes > int.MaxValue)
                            throw new InvalidDataException($"Invalid block entry length: {fileLengthBytes}");
var len = checked((int)fileLengthBytes);

                        if (!replacementLookup.TryGetValue(fileHash, out var repl))
                        {
                            Logger.LogWarning("{dlName}: No replacement mapping for {fileHash}", downloadLabel, fileHash);
                            CompleteOwnedDownload(session, fileHash, false);

                            // still need to skip bytes:
                            fileBlockStream.Position += len;
                            continue;
                        }

                        var filePath = _fileDbManager.GetCacheFilePath(fileHash, repl.Extension);
                        Logger.LogTrace("{dlName}: Decompressing {file}:{len} => {dest}", downloadLabel, fileHash, fileLengthBytes, filePath);

                        var compressed = new byte[len];
                        await ReadExactlyAsync(fileBlockStream, compressed.AsMemory(0, len), ct).ConfigureAwait(false);

                        // The block file is stored munged on disk; undo that before unwrapping.
                        MungeBuffer(compressed);
                        var decompressed = LZ4Wrapper.Unwrap(compressed);

                        // Only enforce the size when a positive expected size is known for the hash.
                        if (rawSizeLookup.TryGetValue(fileHash, out var expectedRawSize)
                            && expectedRawSize > 0
                            && decompressed.LongLength != expectedRawSize)
                        {
                            Logger.LogWarning(
                                "{dlName}: Decompressed size mismatch for {fileHash} (expected {expected}, got {actual})",
                                downloadLabel, fileHash, expectedRawSize, decompressed.LongLength);
                            CompleteOwnedDownload(session, fileHash, false);
                            continue;
                        }

                        await _fileCompactor.WriteAllBytesAsync(filePath, decompressed, ct).ConfigureAwait(false);
                        PersistFileToStorage(session, fileHash, filePath, repl.GamePath, skipDownscale, skipDecimation);
                    }
                    catch (OperationCanceledException) when (ct.IsCancellationRequested)
                    {
                        // Stop here instead of falling into the generic handler below: once the
                        // token is cancelled every further read fails without consuming the
                        // payload, so the loop would parse payload bytes as the next header and
                        // log spurious errors. The status is intentionally not set to Completed.
                        Logger.LogDebug("{dlName}: Decompression cancelled while extracting {fileHash}", downloadLabel, fileHash);
                        CompleteOwnedDownload(session, fileHash, false);
                        return;
                    }
                    catch (EndOfStreamException)
                    {
                        Logger.LogWarning("{dlName}: Failure to extract file {fileHash}, stream ended prematurely", downloadLabel, fileHash);
                        CompleteOwnedDownload(session, fileHash, false);
                    }
                    catch (Exception e)
                    {
                        Logger.LogWarning(e, "{dlName}: Error during decompression", downloadLabel);
                        CompleteOwnedDownload(session, fileHash, false);
                    }
                }
            }

            SetStatus(session, downloadStatusKey, DownloadStatus.Completed);
        }
        catch (EndOfStreamException)
        {
            // Thrown by the header reader when the file ends inside a header.
            Logger.LogDebug("{dlName}: Failure to extract file header data, stream ended", downloadLabel);
        }
        catch (Exception ex)
        {
            Logger.LogError(ex, "{dlName}: Error during block file read", downloadLabel);
        }
    }

    // Determines which of the requested hashes still have to be downloaded and returns the
    // transferable ones; forbidden files are recorded on the orchestrator instead.
    public async Task>
InitiateDownloadList(
        GameObjectHandler? gameObjectHandler,
        List fileReplacement,
        CancellationToken ct,
        Guid? ownerToken = null)
    {
        // ownerToken is accepted but deliberately unused.
        _ = ownerToken;

        string objectName = gameObjectHandler?.Name ?? "Unknown";
        Logger.LogDebug("Download start: {id}", objectName);

        if (fileReplacement is null || fileReplacement.Count == 0)
        {
            Logger.LogDebug("{dlName}: No file replacements provided", objectName);
            return [];
        }

        // Collect the distinct, non-blank hashes in first-seen order (ordinal comparison).
        var seenHashes = new HashSet<string>(StringComparer.Ordinal);
        var hashes = new List<string>();
        foreach (var replacement in fileReplacement)
        {
            if (replacement is null || string.IsNullOrWhiteSpace(replacement.Hash))
            {
                continue;
            }

            if (seenHashes.Add(replacement.Hash))
            {
                hashes.Add(replacement.Hash);
            }
        }

        if (hashes.Count == 0)
        {
            Logger.LogDebug("{dlName}: No valid hashes to download", objectName);
            return [];
        }

        // Keep only the hashes the local file cache does not know yet.
        var missingHashes = hashes.FindAll(candidate => _fileDbManager.GetFileCacheByHash(candidate) is null);

        if (missingHashes.Count == 0)
        {
            Logger.LogDebug("{dlName}: All requested hashes already present in cache", objectName);
            return [];
        }

        int alreadyCached = hashes.Count - missingHashes.Count;
        if (alreadyCached > 0)
        {
            Logger.LogDebug("{dlName}: Skipping {count} hashes already present in cache", objectName, alreadyCached);
        }

        // Ask the server for size / availability information of everything still missing.
        List downloadFileInfoFromService =
        [
            ..
await FilesGetSizes(missingHashes, ct).ConfigureAwait(false),
        ];

        Logger.LogDebug("Files with size 0 or less: {files}",
            string.Join(", ", downloadFileInfoFromService.Where(f => f.Size <= 0).Select(f => f.Hash)));

        // Remember forbidden files on the orchestrator, once per hash (ordinal comparison).
        foreach (var dto in downloadFileInfoFromService.Where(c => c.IsForbidden))
        {
            if (!_orchestrator.ForbiddenTransfers.Exists(f => string.Equals(f.Hash, dto.Hash, StringComparison.Ordinal)))
                _orchestrator.ForbiddenTransfers.Add(new DownloadFileTransfer(dto));
        }

        var downloads = downloadFileInfoFromService
            .Distinct()
            .Select(d => new DownloadFileTransfer(d))
            .Where(d => d.CanBeTransferred)
            .ToList();

        return downloads;
    }

    // One batch request: the host it goes to, its key in the session status map, and its files.
    private sealed record BatchChunk(string HostKey, string StatusKey, List Items);

    // Splits the list into consecutive chunks of at most chunkSize items; the last chunk may
    // be shorter. chunkSize must be positive (the only caller passes a value of at least 1).
    private static IEnumerable> ChunkList(List items, int chunkSize)
    {
        for (int i = 0; i < items.Count; i += chunkSize)
            yield return items.GetRange(i, Math.Min(chunkSize, items.Count - i));
    }

    // Runs one download session: splits the transfers into direct and batched downloads,
    // claims them in the deduplicator, downloads in parallel and waits for decompression.
    private async Task DownloadFilesInternal(DownloadSession session, List fileReplacement, CancellationToken ct, bool skipDownscale, bool skipDecimation)
    {
        var objectName = session.ObjectName;

        // config toggles
        // When the user switches direct downloads back on, forget earlier failures.
        var configAllowsDirect = _configService.Current.EnableDirectDownloads;
        if (configAllowsDirect != _lastConfigDirectDownloadsState)
        {
            _lastConfigDirectDownloadsState = configAllowsDirect;
            if (configAllowsDirect)
            {
                _disableDirectDownloads = false;
                _consecutiveDirectDownloadFailures = 0;
            }
        }

        var allowDirectDownloads = ShouldUseDirectDownloads();
        var replacementLookup = BuildReplacementLookup(fileReplacement);

        // Expected raw size per hash; a non-positive value is replaced by a later entry.
        var rawSizeLookup = new Dictionary(StringComparer.OrdinalIgnoreCase);
        foreach (var download in session.Downloads)
        {
            if (string.IsNullOrWhiteSpace(download.Hash))
            {
                continue;
            }

            if (!rawSizeLookup.TryGetValue(download.Hash, out var existing) || existing <= 0)
            {
                rawSizeLookup[download.Hash] = download.TotalRaw;
            }
        }

        // A transfer goes direct only if it has a URL and direct downloads are currently allowed.
        var directDownloads = new List();
        var batchDownloads = new List();
        foreach (var download in session.Downloads)
        {
            if (!string.IsNullOrEmpty(download.DirectDownloadUrl) && allowDirectDownloads)
                directDownloads.Add(download);
            else
                batchDownloads.Add(download);
        }

        session.OwnedDownloads.Clear();

        var waitingHashes = new HashSet(StringComparer.OrdinalIgnoreCase);
        var waitTasks = new List>();
        var claims = new Dictionary(StringComparer.OrdinalIgnoreCase);

        // Claims each hash at most once per session run.
        DownloadClaim GetClaim(string hash)
        {
            if (!claims.TryGetValue(hash, out var claim))
            {
                claim = _downloadDeduplicator.Claim(hash);
                claims[hash] = claim;
            }

            return claim;
        }

        // Keeps the downloads this session owns; for the others, collects the completion task
        // of whoever owns them (once per hash) so the session can wait for it.
        List FilterOwned(List downloads)
        {
            if (downloads.Count == 0)
            {
                return downloads;
            }

            var owned = new List(downloads.Count);
            foreach (var download in downloads)
            {
                if (string.IsNullOrWhiteSpace(download.Hash))
                {
                    continue;
                }

                var claim = GetClaim(download.Hash);
                if (claim.IsOwner)
                {
                    session.OwnedDownloads.TryAdd(download.Hash, 0);
                    owned.Add(download);
                }
                else if (waitingHashes.Add(download.Hash))
                {
                    waitTasks.Add(claim.Completion);
                }
            }

            return owned;
        }

        directDownloads = FilterOwned(directDownloads);
        batchDownloads = FilterOwned(batchDownloads);

        if (waitTasks.Count > 0)
        {
            Logger.LogDebug("{dlName}: {count} files already downloading elsewhere; waiting for completion.", objectName, waitTasks.Count);
        }

        // Chunk per host so we can fill all slots
        var slots = Math.Max(1, _configService.Current.ParallelDownloads);

        var batchChunks = batchDownloads
            .GroupBy(f => $"{f.DownloadUri.Host}:{f.DownloadUri.Port}", StringComparer.Ordinal)
            .SelectMany(g =>
            {
                var list = g.ToList();

                // At most one chunk per slot and never more chunks than files.
                var chunkCount = Math.Min(slots, Math.Max(1, list.Count));
                var chunkSize = (int)Math.Ceiling(list.Count / (double)chunkCount);

                return ChunkList(list, chunkSize)
                    .Select((chunk, index) => new BatchChunk(g.Key, $"{g.Key}#{index + 1}", chunk));
            })
            .ToArray();

        // init statuses
        session.Status.Clear();

        // direct downloads and batch downloads tracked separately
        foreach (var d in directDownloads)
        {
            session.Status[d.DirectDownloadUrl!] = new FileDownloadStatus
            {
                DownloadStatus = DownloadStatus.WaitingForSlot,
                TotalBytes = d.Total,
                TotalFiles = 1,
                TransferredBytes = 0,
                TransferredFiles = 0
            };
        }

        foreach (var chunk in batchChunks)
        {
            // A chunk is reported as a single file; its size is the sum of its items.
            session.Status[chunk.StatusKey] = new FileDownloadStatus
            {
                DownloadStatus = DownloadStatus.WaitingForQueue,
                TotalBytes = chunk.Items.Sum(x => x.Total),
                TotalFiles = 1,
                TransferredBytes = 0,
                TransferredFiles = 0
            };
        }

        if (directDownloads.Count > 0 || batchChunks.Length > 0)
        {
            Logger.LogInformation("Downloading {direct} files directly, and {batchtotal} queued in {chunks} chunks.",
                directDownloads.Count, batchDownloads.Count, batchChunks.Length);
        }

        if (session.Handler is not null)
            Mediator.Publish(new DownloadStartedMessage(session.Handler, session.Status));

        // allow some extra workers so downloads can continue while earlier items decompress.
        var workerDop = Math.Clamp(slots * 2, 2, 16);

        var decompressionTasks = new ConcurrentBag();
        using var decompressionLimiter = new SemaphoreSlim(CalculateDecompressionLimit(slots));

        // batch downloads
        Task batchTask = batchChunks.Length == 0
            ? Task.CompletedTask
            : Parallel.ForEachAsync(batchChunks, new ParallelOptions { MaxDegreeOfParallelism = workerDop, CancellationToken = ct },
                async (chunk, token) => await ProcessBatchChunkAsync(session, chunk, replacementLookup, rawSizeLookup, decompressionTasks, decompressionLimiter, token, skipDownscale, skipDecimation).ConfigureAwait(false));

        // direct downloads
        Task directTask = directDownloads.Count == 0 ?
Task.CompletedTask
            : Parallel.ForEachAsync(directDownloads, new ParallelOptions { MaxDegreeOfParallelism = workerDop, CancellationToken = ct },
                async (d, token) => await ProcessDirectAsync(session, d, replacementLookup, rawSizeLookup, decompressionTasks, decompressionLimiter, token, skipDownscale, skipDecimation).ConfigureAwait(false));

        // Results of downloads that another session owns.
        Task dedupWaitTask = waitTasks.Count == 0
            ? Task.FromResult(Array.Empty())
            : Task.WhenAll(waitTasks);

        try
        {
            await Task.WhenAll(batchTask, directTask).ConfigureAwait(false);
        }
        finally
        {
            // Decompression work already queued must finish even if a download failed.
            await WaitForAllTasksAsync(decompressionTasks).ConfigureAwait(false);
        }

        var dedupResults = await dedupWaitTask.ConfigureAwait(false);

        if (waitTasks.Count > 0 && dedupResults.Any(r => !r))
        {
            Logger.LogWarning("{dlName}: One or more shared downloads failed; missing files may remain.", objectName);
        }

        Logger.LogDebug("Download end: {id}", objectName);
        ClearDownload(session);
    }

    // Handles one batch chunk: enqueues it on the server, downloads the resulting block file
    // and queues its decompression. Exceptions from the download phase are logged here;
    // exceptions from the enqueue request propagate to the caller.
    private async Task ProcessBatchChunkAsync(
        DownloadSession session,
        BatchChunk chunk,
        Dictionary replacementLookup,
        IReadOnlyDictionary rawSizeLookup,
        ConcurrentBag decompressionTasks,
        SemaphoreSlim decompressionLimiter,
        CancellationToken ct,
        bool skipDownscale,
        bool skipDecimation)
    {
        var statusKey = chunk.StatusKey;

        // enqueue (no slot)
        SetStatus(session, statusKey, DownloadStatus.WaitingForQueue);

        Guid requestId;

        // The response is only needed for the request id: dispose it right away instead of
        // leaking it, and check the status like WaitForDownloadReady does so an error response
        // surfaces as an HTTP failure rather than a GUID parse error.
        using (var requestIdResponse = await _orchestrator.SendRequestAsync(
                   HttpMethod.Post,
                   LightlessFiles.RequestEnqueueFullPath(chunk.Items[0].DownloadUri),
                   chunk.Items.Select(c => c.Hash),
                   ct).ConfigureAwait(false))
        {
            requestIdResponse.EnsureSuccessStatusCode();
            requestId = Guid.Parse((await requestIdResponse.Content.ReadAsStringAsync(ct).ConfigureAwait(false)).Trim('"'));
        }

        var blockFile = _fileDbManager.GetCacheFilePath(requestId.ToString("N"), "blk");
        var fi = new FileInfo(blockFile);

        // Once decompression is queued, the queued task owns cleanup of the block file.
        var decompressionQueued = false;

        try
        {
            // download (with slot)
            var progress = CreateInlineProgress(bytes => AddTransferredBytes(session, statusKey, bytes));

            // Download slot held on get
            await DownloadQueuedBlockFileAsync(session, statusKey, requestId, chunk.Items, blockFile, progress, ct).ConfigureAwait(false);

            // decompress if file exists
            if (!File.Exists(blockFile))
            {
                Logger.LogWarning("{dlName}: Block file missing before extraction, skipping", fi.Name);
                SetStatus(session, statusKey, DownloadStatus.Completed);
                return;
            }

            SetStatus(session, statusKey, DownloadStatus.Decompressing);

            EnqueueLimitedTask(
                decompressionTasks,
                decompressionLimiter,
                async token =>
                {
                    try
                    {
                        await DecompressBlockFileAsync(session, statusKey, blockFile, replacementLookup, rawSizeLookup, fi.Name, token, skipDownscale, skipDecimation)
                            .ConfigureAwait(false);
                    }
                    finally
                    {
                        try { File.Delete(blockFile); } catch {}

                        // Anything of this chunk that is still owned at this point was not
                        // completed during extraction and is reported as failed.
                        foreach (var item in chunk.Items)
                        {
                            if (!string.IsNullOrWhiteSpace(item.Hash))
                            {
                                CompleteOwnedDownload(session, item.Hash, false);
                            }
                        }
                    }
                },
                ct);

            decompressionQueued = true;
        }
        catch (OperationCanceledException)
        {
            Logger.LogDebug("{dlName}: Detected cancellation of download, partially extracting files for {id}", fi.Name, requestId);
        }
        catch (Exception ex)
        {
            Logger.LogError(ex, "{dlName}: Error during batch chunk processing", fi.Name);
            ClearDownload(session);
        }
        finally
        {
            // Decompression was never queued: clean up the block file and fail the chunk here.
            if (!decompressionQueued)
            {
                try { File.Delete(blockFile); } catch { /* ignore */ }

                foreach (var item in chunk.Items)
                {
                    if (!string.IsNullOrWhiteSpace(item.Hash))
                    {
                        CompleteOwnedDownload(session, item.Hash, false);
                    }
                }
            }
        }
    }

    private async Task ProcessDirectAsync(
        DownloadSession session,
        DownloadFileTransfer directDownload,
        Dictionary replacementLookup,
        IReadOnlyDictionary rawSizeLookup,
        ConcurrentBag decompressionTasks,
        SemaphoreSlim decompressionLimiter,
        CancellationToken ct,
        bool skipDownscale,
        bool skipDecimation)
    {
        var progress = CreateInlineProgress(bytes =>
        {
            if (!string.IsNullOrEmpty(directDownload.DirectDownloadUrl))
                AddTransferredBytes(session, directDownload.DirectDownloadUrl!, bytes);
        });

        if (!ShouldUseDirectDownloads() || string.IsNullOrEmpty(directDownload.DirectDownloadUrl))
        {
            try
            {
                await ProcessDirectAsQueuedFallbackAsync(session, directDownload,
replacementLookup, rawSizeLookup, progress, ct, skipDownscale, skipDecimation, decompressionTasks, decompressionLimiter).ConfigureAwait(false); } catch (Exception ex) { Logger.LogError(ex, "{hash}: Error during direct download fallback.", directDownload.Hash); CompleteOwnedDownload(session, directDownload.Hash, false); throw; } return; } var tempFilename = _fileDbManager.GetCacheFilePath(directDownload.Hash, "bin"); var decompressionQueued = false; try { // Download slot held on get SetStatus(session, directDownload.DirectDownloadUrl!, DownloadStatus.WaitingForSlot); await using ((await AcquireSlotAsync(ct).ConfigureAwait(false)).ConfigureAwait(false)) { SetStatus(session, directDownload.DirectDownloadUrl!, DownloadStatus.Downloading); Logger.LogDebug("Beginning direct download of {hash} from {url}", directDownload.Hash, directDownload.DirectDownloadUrl); await DownloadFileThrottled(new Uri(directDownload.DirectDownloadUrl!), tempFilename, progress, callback: null, ct, withToken: false) .ConfigureAwait(false); } Interlocked.Exchange(ref _consecutiveDirectDownloadFailures, 0); if (!replacementLookup.TryGetValue(directDownload.Hash, out var repl)) { Logger.LogWarning("{hash}: No replacement data found for direct download.", directDownload.Hash); SetStatus(session, directDownload.DirectDownloadUrl!, DownloadStatus.Completed); CompleteOwnedDownload(session, directDownload.Hash, false); return; } var finalFilename = _fileDbManager.GetCacheFilePath(directDownload.Hash, repl.Extension); Logger.LogDebug("Decompressing direct download {hash} from {compressedFile} to {finalFile}", directDownload.Hash, tempFilename, finalFilename); SetStatus(session, directDownload.DirectDownloadUrl!, DownloadStatus.Decompressing); EnqueueLimitedTask( decompressionTasks, decompressionLimiter, async token => { try { var decompressedLength = await DecompressWrappedLz4ToFileAsync(tempFilename, finalFilename, token).ConfigureAwait(false); if (directDownload.TotalRaw > 0 && decompressedLength != 
directDownload.TotalRaw) { throw new InvalidDataException( $"{directDownload.Hash}: Decompressed size mismatch (expected {directDownload.TotalRaw}, got {decompressedLength})"); } _fileCompactor.NotifyFileWritten(finalFilename); PersistFileToStorage(session, directDownload.Hash, finalFilename, repl.GamePath, skipDownscale, skipDecimation); MarkTransferredFiles(session, directDownload.DirectDownloadUrl!, 1); SetStatus(session, directDownload.DirectDownloadUrl!, DownloadStatus.Completed); Logger.LogDebug("Finished direct download of {hash}.", directDownload.Hash); } catch (Exception ex) { var expectedDirectDownloadFailure = ex is InvalidDataException; var failureCount = expectedDirectDownloadFailure ? 0 : Interlocked.Increment(ref _consecutiveDirectDownloadFailures); if (expectedDirectDownloadFailure) Logger.LogInformation(ex, "{hash}: Direct download unavailable, attempting queued fallback.", directDownload.Hash); else Logger.LogWarning(ex, "{hash}: Direct download failed, attempting queued fallback.", directDownload.Hash); try { await ProcessDirectAsQueuedFallbackAsync(session, directDownload, replacementLookup, rawSizeLookup, progress, token, skipDownscale, skipDecimation, decompressionTasks, decompressionLimiter).ConfigureAwait(false); if (!expectedDirectDownloadFailure && failureCount >= 3 && !_disableDirectDownloads) { _disableDirectDownloads = true; Logger.LogWarning("Disabling direct downloads for this session after {count} consecutive failures.", failureCount); } } catch (Exception fallbackEx) { Logger.LogError(fallbackEx, "{hash}: Error during direct download fallback.", directDownload.Hash); CompleteOwnedDownload(session, directDownload.Hash, false); SetStatus(session, directDownload.DirectDownloadUrl!, DownloadStatus.Completed); ClearDownload(session); } } finally { try { File.Delete(tempFilename); } catch { // ignore } } }, ct); decompressionQueued = true; } catch (OperationCanceledException ex) { if (ct.IsCancellationRequested) Logger.LogDebug("{hash}: 
Direct download cancelled by caller, discarding file.", directDownload.Hash); else Logger.LogWarning(ex, "{hash}: Direct download cancelled unexpectedly.", directDownload.Hash); CompleteOwnedDownload(session, directDownload.Hash, false); ClearDownload(session); } catch (Exception ex) { var expectedDirectDownloadFailure = ex is InvalidDataException; var failureCount = expectedDirectDownloadFailure ? 0 : Interlocked.Increment(ref _consecutiveDirectDownloadFailures); if (expectedDirectDownloadFailure) Logger.LogInformation(ex, "{hash}: Direct download unavailable, attempting queued fallback.", directDownload.Hash); else Logger.LogWarning(ex, "{hash}: Direct download failed, attempting queued fallback.", directDownload.Hash); try { await ProcessDirectAsQueuedFallbackAsync(session, directDownload, replacementLookup, rawSizeLookup, progress, ct, skipDownscale, skipDecimation, decompressionTasks, decompressionLimiter).ConfigureAwait(false); if (!expectedDirectDownloadFailure && failureCount >= 3 && !_disableDirectDownloads) { _disableDirectDownloads = true; Logger.LogWarning("Disabling direct downloads for this session after {count} consecutive failures.", failureCount); } } catch (Exception fallbackEx) { Logger.LogError(fallbackEx, "{hash}: Error during direct download fallback.", directDownload.Hash); CompleteOwnedDownload(session, directDownload.Hash, false); ClearDownload(session); } } finally { if (!decompressionQueued) { try { File.Delete(tempFilename); } catch { // ignore } } } } private async Task ProcessDirectAsQueuedFallbackAsync( DownloadSession session, DownloadFileTransfer directDownload, Dictionary replacementLookup, IReadOnlyDictionary rawSizeLookup, IProgress progress, CancellationToken ct, bool skipDownscale, bool skipDecimation, ConcurrentBag decompressionTasks, SemaphoreSlim decompressionLimiter) { if (string.IsNullOrEmpty(directDownload.DirectDownloadUrl)) throw new InvalidOperationException("Direct download fallback requested without a direct download 
URL."); var statusKey = directDownload.DirectDownloadUrl!; SetStatus(session, statusKey, DownloadStatus.WaitingForQueue); var requestIdResponse = await _orchestrator.SendRequestAsync( HttpMethod.Post, LightlessFiles.RequestEnqueueFullPath(directDownload.DownloadUri), new[] { directDownload.Hash }, ct).ConfigureAwait(false); var requestId = Guid.Parse((await requestIdResponse.Content.ReadAsStringAsync(ct).ConfigureAwait(false)).Trim('"')); var blockFile = _fileDbManager.GetCacheFilePath(requestId.ToString("N"), "blk"); var fi = new FileInfo(blockFile); var decompressionQueued = false; try { await DownloadQueuedBlockFileAsync(session, statusKey, requestId, [directDownload], blockFile, progress, ct).ConfigureAwait(false); if (!File.Exists(blockFile)) { Logger.LogWarning("{dlName}: Block file missing before extraction, skipping", fi.Name); SetStatus(session, statusKey, DownloadStatus.Completed); return; } SetStatus(session, statusKey, DownloadStatus.Decompressing); EnqueueLimitedTask( decompressionTasks, decompressionLimiter, async token => { try { await DecompressBlockFileAsync(session, statusKey, blockFile, replacementLookup, rawSizeLookup, $"fallback-{directDownload.Hash}", token, skipDownscale, skipDecimation) .ConfigureAwait(false); } finally { try { File.Delete(blockFile); } catch {} CompleteOwnedDownload(session, directDownload.Hash, false); } }, ct); decompressionQueued = true; } finally { if (!decompressionQueued) { try { File.Delete(blockFile); } catch {} CompleteOwnedDownload(session, directDownload.Hash, false); } } } private async Task> FilesGetSizes(List hashes, CancellationToken ct) { if (!_orchestrator.IsInitialized) throw new InvalidOperationException("FileTransferManager is not initialized"); var response = await _orchestrator.SendRequestAsync( HttpMethod.Get, LightlessFiles.ServerFilesGetSizesFullPath(_orchestrator.FilesCdnUri!), hashes, ct).ConfigureAwait(false); return await response.Content.ReadFromJsonAsync>(cancellationToken: 
ct).ConfigureAwait(false) ?? [];
    }

    /// <summary>
    /// Registers a freshly written file in the file cache and, unless skipped, schedules
    /// texture downscaling and model decimation for it.
    /// </summary>
    /// <param name="session">Download session whose owned download is completed.</param>
    /// <param name="fileHash">Expected hash of the file.</param>
    /// <param name="filePath">Path of the file on disk.</param>
    /// <param name="gamePath">Game path used for map-kind detection and decimation decisions.</param>
    /// <param name="skipDownscale">When true, no texture downscale is scheduled.</param>
    /// <param name="skipDecimation">When true, no model decimation is scheduled.</param>
    /// <returns>
    /// True when a cache entry with the expected hash was created; otherwise false.
    /// </returns>
    /// <remarks>
    /// The owned download is always completed with the returned value once the cache-entry
    /// stage has been reached. Errors in that stage are logged and swallowed; errors while
    /// setting the file timestamps propagate.
    /// </remarks>
    private bool PersistFileToStorage(DownloadSession session, string fileHash, string filePath, string gamePath, bool skipDownscale, bool skipDecimation)
    {
        var fi = new FileInfo(filePath);
        var persisted = false;

        // Picks a random date between 1995-01-01 and today.
        static DateTime RandomDayInThePast()
        {
            DateTime start = new(1995, 1, 1, 1, 1, 1, DateTimeKind.Local);
            int range = (DateTime.Today - start).Days;
            return start.AddDays(Random.Shared.Next(range));
        }

        fi.CreationTime = RandomDayInThePast();
        fi.LastAccessTime = DateTime.Today;
        fi.LastWriteTime = RandomDayInThePast();

        try
        {
            var entry = _fileDbManager.CreateCacheEntryWithKnownHash(filePath, fileHash);

            // Validate first: a mismatching file is deleted, so no follow-up processing
            // may be scheduled for it.
            if (entry is not null && !string.Equals(entry.Hash, fileHash, StringComparison.OrdinalIgnoreCase))
            {
                Logger.LogError("Hash mismatch after extracting, got {hash}, expected {expectedHash}, deleting file", entry.Hash, fileHash);
                File.Delete(filePath);
                _fileDbManager.RemoveHashedFile(entry.Hash, entry.PrefixedFilePath);
                return false;
            }

            persisted = entry is not null;

            if (!skipDownscale && _textureDownscaleService.ShouldScheduleDownscale(filePath))
            {
                _textureDownscaleService.ScheduleDownscale(
                    fileHash,
                    filePath,
                    () => _textureMetadataHelper.DetermineMapKind(gamePath, filePath));
            }

            if (!skipDecimation && _modelDecimationService.ShouldScheduleDecimation(fileHash, filePath, gamePath))
            {
                _modelDecimationService.ScheduleDecimation(fileHash, filePath, gamePath);
            }
        }
        catch (Exception ex)
        {
            Logger.LogWarning(ex, "Error creating cache entry");
        }
        finally
        {
            CompleteOwnedDownload(session, fileHash, persisted);
        }

        return persisted;
    }

    /// <summary>
    /// Derives the number of concurrent decompression tasks from the number of download slots:
    /// at least 1, at most the processor count, and never more than 4.
    /// </summary>
    private static int CalculateDecompressionLimit(int downloadSlots)
    {
        var cpuBound = Math.Clamp(Environment.ProcessorCount, 1, 4);
        return Math.Clamp(downloadSlots, 1, cpuBound);
    }

    /// <summary>
    /// Starts <paramref name="work"/> on the thread pool once a slot of
    /// <paramref name="limiter"/> is available, and records the task in <paramref name="tasks"/>.
    /// </summary>
    /// <returns>The started task; the slot is released when the work finishes or throws.</returns>
    /// <remarks>
    /// NOTE(review): when <paramref name="ct"/> is cancelled before the slot is acquired
    /// (or before the task starts), <paramref name="work"/> is never invoked. Callers that
    /// place cleanup inside <paramref name="work"/> do not get that cleanup in this case;
    /// confirm this is acceptable.
    /// </remarks>
    private static Task EnqueueLimitedTask(
        ConcurrentBag tasks,
        SemaphoreSlim limiter,
        Func work,
        CancellationToken ct)
    {
        var task = Task.Run(async () =>
        {
            await limiter.WaitAsync(ct).ConfigureAwait(false);
            try
            {
                await work(ct).ConfigureAwait(false);
            }
            finally
            {
                limiter.Release();
            }
        }, ct);

        tasks.Add(task);
        return task;
    }

    /// <summary>
    /// Awaits every task in the bag, repeating while the bag grew during the wait.
    /// Any failure of an awaited task propagates immediately.
    /// </summary>
    private static async Task WaitForAllTasksAsync(ConcurrentBag tasks)
    {
        while (true)
        {
            var snapshot = tasks.ToArray();
            if (snapshot.Length == 0)
                return;

            await Task.WhenAll(snapshot).ConfigureAwait(false);

            // An unchanged count means nothing was added while waiting.
            if (tasks.Count == snapshot.Length)
                return;
        }
    }

    /// <summary>Creates a progress reporter that invokes <paramref name="callback"/> on every report.</summary>
    private static IProgress CreateInlineProgress(Action callback)
        => new InlineProgress(callback);

    /// <summary>
    /// Progress implementation that calls its callback directly from <see cref="Report"/>,
    /// on the reporting thread.
    /// </summary>
    private sealed class InlineProgress : IProgress
    {
        private readonly Action _callback;

        public InlineProgress(Action callback)
            => _callback = callback ?? throw new ArgumentNullException(nameof(callback));

        public void Report(long value)
            => _callback(value);
    }
}