1545 lines
60 KiB
C#
1545 lines
60 KiB
C#
using K4os.Compression.LZ4;
|
|
using K4os.Compression.LZ4.Legacy;
|
|
using LightlessSync.API.Data;
|
|
using LightlessSync.API.Dto.Files;
|
|
using LightlessSync.API.Routes;
|
|
using LightlessSync.FileCache;
|
|
using LightlessSync.LightlessConfiguration;
|
|
using LightlessSync.PlayerData.Handlers;
|
|
using LightlessSync.Services.Mediator;
|
|
using LightlessSync.Services.ModelDecimation;
|
|
using LightlessSync.Services.TextureCompression;
|
|
using LightlessSync.Utils;
|
|
using LightlessSync.WebAPI.Files.Models;
|
|
using Microsoft.Extensions.Logging;
|
|
using System.Buffers;
|
|
using System.Buffers.Binary;
|
|
using System.Collections.Concurrent;
|
|
using System.IO.MemoryMappedFiles;
|
|
using System.Net;
|
|
using System.Net.Http.Json;
|
|
|
|
namespace LightlessSync.WebAPI.Files;
|
|
|
|
public partial class FileDownloadManager : DisposableMediatorSubscriberBase
|
|
{
|
|
// Collaborators handed in through the constructor.
private readonly FileCompactor _fileCompactor;
private readonly FileCacheManager _fileDbManager;
private readonly FileTransferOrchestrator _orchestrator;
private readonly LightlessConfigService _configService;
private readonly TextureDownscaleService _textureDownscaleService;
private readonly ModelDecimationService _modelDecimationService;
private readonly TextureMetadataHelper _textureMetadataHelper;
private readonly FileDownloadDeduplicator _downloadDeduplicator;

// Per-handler bookkeeping: the session currently running, the requests still queued,
// and the registry used to start the queue processor for a handler.
private readonly ConcurrentDictionary<GameObjectHandler, DownloadSession> _activeSessions = new();
private readonly ConcurrentDictionary<GameObjectHandler, ConcurrentQueue<DownloadRequest>> _downloadQueues = new();
private readonly TaskRegistry<GameObjectHandler> _downloadQueueWaiters = new();

// Streams currently being read, kept so their bandwidth limit can be adjusted while in flight.
private readonly ConcurrentDictionary<ThrottledStream, byte> _activeDownloadStreams;

// Gate sized to half the logical processors (at least one).
private readonly SemaphoreSlim _decompressGate =
    new(initialCount: Math.Max(1, Environment.ProcessorCount / 2), maxCount: Math.Max(1, Environment.ProcessorCount / 2));

// Direct-download state.
private volatile bool _disableDirectDownloads;
private int _consecutiveDirectDownloadFailures;
private bool _lastConfigDirectDownloadsState;
|
|
|
|
/// <summary>
/// Stores the injected collaborators, remembers the current "direct downloads" configuration value,
/// and subscribes to the download-limit and disconnect mediator messages.
/// </summary>
public FileDownloadManager(
    ILogger<FileDownloadManager> logger,
    LightlessMediator mediator,
    FileTransferOrchestrator orchestrator,
    FileCacheManager fileCacheManager,
    FileCompactor fileCompactor,
    LightlessConfigService configService,
    TextureDownscaleService textureDownscaleService,
    ModelDecimationService modelDecimationService,
    TextureMetadataHelper textureMetadataHelper,
    FileDownloadDeduplicator downloadDeduplicator) : base(logger, mediator)
{
    (_orchestrator, _fileDbManager, _fileCompactor, _configService) =
        (orchestrator, fileCacheManager, fileCompactor, configService);
    (_textureDownscaleService, _modelDecimationService, _textureMetadataHelper, _downloadDeduplicator) =
        (textureDownscaleService, modelDecimationService, textureMetadataHelper, downloadDeduplicator);

    _activeDownloadStreams = new ConcurrentDictionary<ThrottledStream, byte>();
    _lastConfigDirectDownloadsState = _configService.Current.EnableDirectDownloads;

    // Push a changed per-slot limit to every stream that is currently downloading.
    Mediator.Subscribe<DownloadLimitChangedMessage>(this, _ =>
    {
        if (!_activeDownloadStreams.IsEmpty)
        {
            var perSlotLimit = _orchestrator.DownloadLimitPerSlot();
            Logger.LogTrace("Setting new Download Speed Limit to {newLimit}", perSlotLimit);

            foreach (var activeStream in _activeDownloadStreams.Keys)
            {
                activeStream.BandwidthLimit = perSlotLimit;
            }
        }
    });

    // On disconnect, drop every active session and fail all deduplicated downloads.
    Mediator.Subscribe<DisconnectedMessage>(this, _ =>
    {
        Logger.LogDebug("Disconnected from server, clearing in-flight downloads");
        ClearDownload();
        _downloadDeduplicator.CompleteAll(false);
    });
}
|
|
|
|
/// <summary>A new list containing the transfers of every active session.</summary>
public List<DownloadFileTransfer> CurrentDownloads
{
    get
    {
        return _activeSessions.Values
            .SelectMany(activeSession => activeSession.Downloads)
            .ToList();
    }
}

/// <summary>The orchestrator's list of forbidden transfers.</summary>
public List<FileTransfer> ForbiddenTransfers
{
    get { return _orchestrator.ForbiddenTransfers; }
}

/// <summary>True while any session is active or any per-handler queue still holds a request.</summary>
public bool IsDownloading
{
    get
    {
        if (!_activeSessions.IsEmpty)
        {
            return true;
        }

        return _downloadQueues.Any(entry => !entry.Value.IsEmpty);
    }
}
|
|
|
|
/// <summary>
/// Reports whether the given handler has an active session or at least one queued request.
/// </summary>
/// <param name="handler">Handler to look up; <c>null</c> always yields <c>false</c>.</param>
public bool IsDownloadingFor(GameObjectHandler? handler)
{
    if (handler is null)
    {
        return false;
    }

    if (_activeSessions.ContainsKey(handler))
    {
        return true;
    }

    return _downloadQueues.TryGetValue(handler, out var pending) && !pending.IsEmpty;
}
|
|
|
|
/// <summary>
/// Counts the transfers belonging to the handler's active session plus those of every request
/// still waiting in its queue.
/// </summary>
/// <param name="handler">Handler to look up; <c>null</c> yields 0.</param>
public int GetPendingDownloadCount(GameObjectHandler? handler)
{
    if (handler is null)
    {
        return 0;
    }

    int pending = _activeSessions.TryGetValue(handler, out var active)
        ? active.Downloads.Count
        : 0;

    if (!_downloadQueues.TryGetValue(handler, out var queued))
    {
        return pending;
    }

    foreach (var waiting in queued)
    {
        pending += waiting.Session.Downloads.Count;
    }

    return pending;
}
|
|
|
|
/// <summary>
/// Direct downloads are used only when the configuration enables them and they have not been
/// switched off at runtime via <c>_disableDirectDownloads</c>.
/// </summary>
private bool ShouldUseDirectDownloads()
{
    if (!_configService.Current.EnableDirectDownloads)
    {
        return false;
    }

    return !_disableDirectDownloads;
}
|
|
|
|
/// <summary>
/// XORs every byte of <paramref name="buffer"/> with 42, in place. Applying it twice restores the input.
/// </summary>
public static void MungeBuffer(Span<byte> buffer)
{
    foreach (ref byte current in buffer)
    {
        current ^= 42;
    }
}
|
|
|
|
/// <summary>Clears every session that is active at the time of the call.</summary>
public void ClearDownload()
{
    // Work on a copy: clearing a session removes it from _activeSessions.
    var snapshot = _activeSessions.Values.ToArray();
    foreach (var activeSession in snapshot)
    {
        ClearDownload(activeSession);
    }
}
|
|
|
|
/// <summary>
/// Marks every download the session owns as failed, empties the session's status, ownership and
/// transfer collections, and removes the session's handler entry from the active set.
/// </summary>
private void ClearDownload(DownloadSession session)
{
    var ownedHashes = session.OwnedDownloads.Keys.ToArray();
    foreach (var ownedHash in ownedHashes)
    {
        CompleteOwnedDownload(session, ownedHash, false);
    }

    session.Status.Clear();
    session.OwnedDownloads.Clear();
    session.Downloads.Clear();

    if (session.Handler is { } handler)
    {
        _activeSessions.TryRemove(handler, out _);
    }
}
|
|
|
|
/// <summary>
/// Resolves the transfer list for <paramref name="fileReplacementDto"/> and then downloads it.
/// </summary>
public async Task DownloadFiles(GameObjectHandler? gameObject, List<FileReplacementData> fileReplacementDto, CancellationToken ct, bool skipDownscale = false, bool skipDecimation = false)
{
    List<DownloadFileTransfer> resolved = await InitiateDownloadList(gameObject, fileReplacementDto, ct).ConfigureAwait(false);

    await DownloadFiles(gameObject, fileReplacementDto, resolved, ct, skipDownscale, skipDecimation)
        .ConfigureAwait(false);
}
|
|
|
|
/// <summary>
/// Wraps an already-resolved transfer list in a session and request, and enqueues the request.
/// </summary>
/// <returns>A task that completes when the enqueued request has been processed.</returns>
public Task DownloadFiles(GameObjectHandler? gameObject, List<FileReplacementData> fileReplacementDto, List<DownloadFileTransfer> downloads, CancellationToken ct, bool skipDownscale = false, bool skipDecimation = false)
{
    return EnqueueDownloadAsync(new DownloadRequest(
        new DownloadSession(gameObject, downloads),
        fileReplacementDto,
        ct,
        skipDownscale,
        skipDecimation,
        new TaskCompletionSource<bool>(TaskCreationOptions.RunContinuationsAsynchronously)));
}
|
|
|
|
/// <summary>
/// Starts a handler-less request immediately; otherwise appends it to the handler's queue and
/// asks the task registry to get-or-start the processor for that queue.
/// </summary>
/// <returns>The request's completion task.</returns>
private Task EnqueueDownloadAsync(DownloadRequest request)
{
    if (request.Session.Handler is not { } handler)
    {
        // No handler to queue under: fire and forget, the outcome is reported via Completion.
        _ = ExecuteDownloadRequestAsync(request);
        return request.Completion.Task;
    }

    _downloadQueues
        .GetOrAdd(handler, _ => new ConcurrentQueue<DownloadRequest>())
        .Enqueue(request);

    _downloadQueueWaiters.GetOrStart(handler, () => ProcessDownloadQueueAsync(handler));

    return request.Completion.Task;
}
|
|
|
|
/// <summary>
/// Executes the handler's queued requests one after another until the queue is observed empty
/// after a yield.
/// </summary>
private async Task ProcessDownloadQueueAsync(GameObjectHandler handler)
{
    if (!_downloadQueues.TryGetValue(handler, out var pending))
    {
        return;
    }

    do
    {
        while (pending.TryDequeue(out var next))
        {
            await ExecuteDownloadRequestAsync(next).ConfigureAwait(false);
        }

        // Yield once so requests enqueued in the meantime are seen by the emptiness check below.
        await Task.Yield();
    }
    while (!pending.IsEmpty);
}
|
|
|
|
/// <summary>
/// Runs one download request and reports the outcome through <c>request.Completion</c>:
/// result on success, cancelled when the request token fired, faulted on any other exception.
/// Scanning is halted for the duration of the download and resumed afterwards.
/// </summary>
private async Task ExecuteDownloadRequestAsync(DownloadRequest request)
{
    var token = request.CancellationToken;
    var completion = request.Completion;

    if (token.IsCancellationRequested)
    {
        completion.TrySetCanceled(token);
        return;
    }

    var session = request.Session;
    var handler = session.Handler;

    if (handler is not null)
    {
        _activeSessions[handler] = session;
    }

    Mediator.Publish(new HaltScanMessage(nameof(DownloadFiles)));

    try
    {
        await DownloadFilesInternal(session, request.Replacements, token, request.SkipDownscale, request.SkipDecimation)
            .ConfigureAwait(false);
        completion.TrySetResult(true);
    }
    catch (OperationCanceledException) when (token.IsCancellationRequested)
    {
        ClearDownload(session);
        completion.TrySetCanceled(token);
    }
    catch (Exception ex)
    {
        ClearDownload(session);
        completion.TrySetException(ex);
    }
    finally
    {
        if (handler is not null)
        {
            Mediator.Publish(new DownloadFinishedMessage(handler));
            _activeSessions.TryRemove(handler, out _);
        }

        Mediator.Publish(new ResumeScanMessage(nameof(DownloadFiles)));
    }
}
|
|
|
|
/// <summary>
/// Clears all active sessions and disposes every download stream that is still registered,
/// then forwards to the base implementation.
/// </summary>
/// <param name="disposing">
/// <c>true</c> when called from <c>Dispose()</c>. Managed state is only touched in that case;
/// on any other path only the base implementation runs.
/// </param>
protected override void Dispose(bool disposing)
{
    if (disposing)
    {
        ClearDownload();

        // Snapshot the keys: entries are removed while iterating.
        foreach (var stream in _activeDownloadStreams.Keys.ToList())
        {
            try
            {
                stream.Dispose();
            }
            catch (Exception ex)
            {
                // Best effort: a stream failing to dispose must not abort teardown, but leave a trace.
                Logger.LogTrace(ex, "Failed to dispose an active download stream during teardown");
            }
            finally
            {
                _activeDownloadStreams.TryRemove(stream, out _);
            }
        }
    }

    base.Dispose(disposing);
}
|
|
|
|
/// <summary>
/// State of one download run: the handler it belongs to (if any), the transfers to fetch,
/// per-key progress, and the hashes this session is responsible for completing.
/// </summary>
private sealed class DownloadSession
{
    public DownloadSession(GameObjectHandler? handler, List<DownloadFileTransfer> downloads)
        => (Handler, ObjectName, Downloads) = (handler, handler?.Name ?? "Unknown", downloads);

    /// <summary>Owning handler, or <c>null</c> for a handler-less download.</summary>
    public GameObjectHandler? Handler { get; }

    /// <summary>The handler's name, or "Unknown" when there is no handler.</summary>
    public string ObjectName { get; }

    /// <summary>Transfers belonging to this session.</summary>
    public List<DownloadFileTransfer> Downloads { get; }

    /// <summary>Progress entries keyed by status key (case-sensitive).</summary>
    public ConcurrentDictionary<string, FileDownloadStatus> Status { get; } = new(StringComparer.Ordinal);

    /// <summary>Hashes owned by this session (case-insensitive); the value is unused.</summary>
    public ConcurrentDictionary<string, byte> OwnedDownloads { get; } = new(StringComparer.OrdinalIgnoreCase);
}
|
|
|
|
/// <summary>
/// One queued download: the session to run, the replacement data it was created for, the caller's
/// cancellation token, the two skip flags, and the completion source used to report the outcome.
/// </summary>
private sealed record DownloadRequest(
    DownloadSession Session, List<FileReplacementData> Replacements, CancellationToken CancellationToken,
    bool SkipDownscale, bool SkipDecimation, TaskCompletionSource<bool> Completion);
|
|
|
|
/// <summary>
/// Releases one orchestrator download slot when disposed. Repeated disposal releases only once.
/// </summary>
private sealed class DownloadSlotLease : IAsyncDisposable
{
    private readonly FileTransferOrchestrator _orch;
    private bool _released;

    public DownloadSlotLease(FileTransferOrchestrator orch)
    {
        _orch = orch;
    }

    public ValueTask DisposeAsync()
    {
        if (_released)
        {
            return ValueTask.CompletedTask;
        }

        _released = true;
        _orch.ReleaseDownloadSlot();
        return ValueTask.CompletedTask;
    }
}
|
|
|
|
/// <summary>
/// Waits for a download slot from the orchestrator and returns a lease that gives it back on disposal.
/// </summary>
private async ValueTask<DownloadSlotLease> AcquireSlotAsync(CancellationToken ct)
{
    await _orchestrator.WaitForDownloadSlotAsync(ct).ConfigureAwait(false);

    var lease = new DownloadSlotLease(_orchestrator);
    return lease;
}
|
|
|
|
/// <summary>Sets the download status of the entry stored under <paramref name="key"/>; unknown keys are ignored.</summary>
private void SetStatus(DownloadSession session, string key, DownloadStatus status)
{
    if (!session.Status.TryGetValue(key, out var entry))
    {
        return;
    }

    entry.DownloadStatus = status;
}
|
|
|
|
/// <summary>Adds <paramref name="delta"/> transferred bytes to the entry stored under <paramref name="key"/>; unknown keys are ignored.</summary>
private void AddTransferredBytes(DownloadSession session, string key, long delta)
{
    if (!session.Status.TryGetValue(key, out var entry))
    {
        return;
    }

    entry.AddTransferredBytes(delta);
}
|
|
|
|
/// <summary>Sets the transferred-file count of the entry stored under <paramref name="key"/>; unknown keys are ignored.</summary>
private void MarkTransferredFiles(DownloadSession session, string key, int files)
{
    if (!session.Status.TryGetValue(key, out var entry))
    {
        return;
    }

    entry.SetTransferredFiles(files);
}
|
|
|
|
/// <summary>
/// If the session still owns <paramref name="hash"/>, gives up ownership and reports the outcome to
/// the deduplicator. Hashes the session does not own are left alone.
/// </summary>
private void CompleteOwnedDownload(DownloadSession session, string hash, bool success)
{
    bool wasOwned = session.OwnedDownloads.TryRemove(hash, out _);
    if (!wasOwned)
    {
        return;
    }

    _downloadDeduplicator.Complete(hash, success);
}
|
|
|
|
/// <summary>
/// Un-munges a single value returned by <see cref="Stream.ReadByte"/> by XOR-ing it with 42.
/// </summary>
/// <param name="byteOrEof">The byte read, or -1 for end of stream.</param>
/// <exception cref="EndOfStreamException">The value is -1.</exception>
private static byte MungeByte(int byteOrEof)
{
    if (byteOrEof == -1) throw new EndOfStreamException();
    return (byte)(byteOrEof ^ 42);
}

/// <summary>
/// Reads one munged entry header of the form <c>#&lt;hash&gt;:&lt;length&gt;#</c> from the current
/// position and leaves the stream positioned on the first byte after the closing <c>#</c>.
/// </summary>
/// <param name="fileBlockStream">Block file stream positioned at the start of a header.</param>
/// <returns>The hash text and the entry length parsed from the header.</returns>
/// <exception cref="EndOfStreamException">The stream ends before the header is complete.</exception>
/// <exception cref="InvalidDataException">
/// The header does not start with <c>#</c>, or its length field is not a valid integer.
/// </exception>
private static (string fileHash, long fileLengthBytes) ReadBlockFileHeader(FileStream fileBlockStream)
{
    if ((char)MungeByte(fileBlockStream.ReadByte()) != '#')
        throw new InvalidDataException("Data is invalid, first char is not #");

    var hashText = new System.Text.StringBuilder();
    var lengthText = new System.Text.StringBuilder();
    bool pastColon = false;

    while (true)
    {
        // MungeByte throws EndOfStreamException on -1, so no separate EOF check is needed.
        char current = (char)MungeByte(fileBlockStream.ReadByte());

        if (current == ':')
        {
            pastColon = true;
            continue;
        }

        if (current == '#') break;

        (pastColon ? lengthText : hashText).Append(current);
    }

    // The length is machine-written, so parse it independently of the user's culture and report
    // a malformed value the same way as the other header errors.
    if (!long.TryParse(
            lengthText.ToString(),
            System.Globalization.NumberStyles.Integer,
            System.Globalization.CultureInfo.InvariantCulture,
            out long fileLengthBytes))
    {
        throw new InvalidDataException($"Data is invalid, entry length '{lengthText}' is not a number");
    }

    return (hashText.ToString(), fileLengthBytes);
}
|
|
|
|
/// <summary>
/// Reads from <paramref name="stream"/> until <paramref name="buffer"/> is completely filled.
/// </summary>
/// <exception cref="EndOfStreamException">The stream ends before the buffer is full.</exception>
private static async Task ReadExactlyAsync(FileStream stream, Memory<byte> buffer, CancellationToken ct)
{
    for (int filled = 0; filled < buffer.Length;)
    {
        int received = await stream.ReadAsync(buffer[filled..], ct).ConfigureAwait(false);
        if (received == 0)
        {
            throw new EndOfStreamException();
        }

        filled += received;
    }
}
|
|
|
|
/// <summary>
/// Copies exactly <paramref name="bytesToCopy"/> bytes from <paramref name="source"/> to
/// <paramref name="destination"/> through a pooled buffer. Non-positive counts copy nothing.
/// </summary>
/// <exception cref="EndOfStreamException">The source ends before the requested count was copied.</exception>
private static async Task CopyExactlyAsync(Stream source, Stream destination, long bytesToCopy, CancellationToken ct)
{
    if (bytesToCopy <= 0)
    {
        return;
    }

    byte[] rented = ArrayPool<byte>.Shared.Rent(81920);
    try
    {
        for (long left = bytesToCopy; left > 0;)
        {
            // Never request more than what is still owed, so the source is not over-read.
            int wanted = (int)Math.Min(rented.Length, left);

            int received = await source.ReadAsync(rented.AsMemory(0, wanted), ct).ConfigureAwait(false);
            if (received == 0)
            {
                throw new EndOfStreamException();
            }

            await destination.WriteAsync(rented.AsMemory(0, received), ct).ConfigureAwait(false);
            left -= received;
        }
    }
    finally
    {
        ArrayPool<byte>.Shared.Return(rented);
    }
}
|
|
|
|
/// <summary>
/// Expands a file consisting of an 8-byte header (two little-endian Int32 values: decoded length,
/// then payload length) followed by the payload, and writes the result to <paramref name="outputPath"/>.
/// A payload at least as long as the decoded length is copied verbatim; otherwise it is LZ4-decoded
/// through memory-mapped views of both files.
/// </summary>
/// <returns>The decoded length announced by the header.</returns>
/// <exception cref="InvalidDataException">
/// A header length is negative, the payload length exceeds the file, or the decoder produced a
/// different number of bytes than announced.
/// </exception>
private async Task<long> DecompressWrappedLz4ToFileAsync(string compressedPath, string outputPath, CancellationToken ct)
{
    const int HeaderSize = 8;

    await using var input = new FileStream(compressedPath, FileMode.Open, FileAccess.Read, FileShare.Read, 81920, useAsync: true);

    var header = new byte[HeaderSize];
    await ReadExactlyAsync(input, header, ct).ConfigureAwait(false);

    int outputLength = BinaryPrimitives.ReadInt32LittleEndian(header.AsSpan(0, 4));
    int inputLength = BinaryPrimitives.ReadInt32LittleEndian(header.AsSpan(4, 4));

    if (outputLength < 0 || inputLength < 0)
    {
        throw new InvalidDataException("LZ4 header contained a negative length.");
    }

    if (inputLength > input.Length - HeaderSize)
    {
        throw new InvalidDataException("LZ4 header length exceeds file size.");
    }

    var targetDirectory = Path.GetDirectoryName(outputPath);
    if (!string.IsNullOrEmpty(targetDirectory) && !Directory.Exists(targetDirectory))
    {
        Directory.CreateDirectory(targetDirectory);
    }

    // Nothing to decode: just create an empty output file.
    if (outputLength == 0)
    {
        await using var emptyStream = new FileStream(outputPath, FileMode.Create, FileAccess.Write, FileShare.None, 4096, useAsync: true);
        await emptyStream.FlushAsync(ct).ConfigureAwait(false);
        return 0;
    }

    // Payload is not smaller than the decoded size: copy it as-is.
    if (inputLength >= outputLength)
    {
        await using var rawOutput = new FileStream(outputPath, FileMode.Create, FileAccess.Write, FileShare.None, 81920, useAsync: true);
        await CopyExactlyAsync(input, rawOutput, inputLength, ct).ConfigureAwait(false);
        await rawOutput.FlushAsync(ct).ConfigureAwait(false);
        return outputLength;
    }

    // Compressed payload: pre-size the output and decode straight between the two mappings.
    await using var mappedOutputStream = new FileStream(outputPath, FileMode.Create, FileAccess.ReadWrite, FileShare.None, 4096, FileOptions.SequentialScan);
    mappedOutputStream.SetLength(outputLength);

    using var inputMap = MemoryMappedFile.CreateFromFile(compressedPath, FileMode.Open, null, 0, MemoryMappedFileAccess.Read);
    using var inputView = inputMap.CreateViewAccessor(HeaderSize, inputLength, MemoryMappedFileAccess.Read);
    using var outputMap = MemoryMappedFile.CreateFromFile(mappedOutputStream, null, outputLength, MemoryMappedFileAccess.ReadWrite, HandleInheritability.None, leaveOpen: true);
    using var outputView = outputMap.CreateViewAccessor(0, outputLength, MemoryMappedFileAccess.Write);

    unsafe
    {
        byte* source = null;
        byte* target = null;
        try
        {
            inputView.SafeMemoryMappedViewHandle.AcquirePointer(ref source);
            outputView.SafeMemoryMappedViewHandle.AcquirePointer(ref target);

            // The acquired pointers address the start of the mapping; step forward to the view itself.
            source += inputView.PointerOffset;
            target += outputView.PointerOffset;

            int decoded = LZ4Codec.Decode(source, inputLength, target, outputLength);
            if (decoded != outputLength)
            {
                throw new InvalidDataException($"LZ4 decode length mismatch (expected {outputLength}, got {decoded}).");
            }
        }
        finally
        {
            // A pointer is non-null only if its AcquirePointer call succeeded.
            if (source != null)
            {
                inputView.SafeMemoryMappedViewHandle.ReleasePointer();
            }

            if (target != null)
            {
                outputView.SafeMemoryMappedViewHandle.ReleasePointer();
            }
        }
    }

    outputView.Flush();
    return outputLength;
}
|
|
|
|
/// <summary>
/// Builds a case-insensitive map from file hash to the extension and game path of the first
/// replacement carrying that hash. Entries that are null or have a blank hash are skipped.
/// The extension is taken from the first game path (without the dot) and falls back to "bin".
/// </summary>
private static Dictionary<string, (string Extension, string GamePath)> BuildReplacementLookup(List<FileReplacementData> fileReplacement)
{
    var lookup = new Dictionary<string, (string Extension, string GamePath)>(StringComparer.OrdinalIgnoreCase);

    foreach (var replacement in fileReplacement)
    {
        // First occurrence of a hash wins; later duplicates are ignored.
        if (replacement == null
            || string.IsNullOrWhiteSpace(replacement.Hash)
            || lookup.ContainsKey(replacement.Hash))
        {
            continue;
        }

        var gamePath = replacement.GamePaths?.FirstOrDefault() ?? string.Empty;

        string extension = "";
        try
        {
            extension = Path.GetExtension(gamePath)?.TrimStart('.') ?? "";
        }
        catch
        {
            // ignore
        }

        lookup[replacement.Hash] = (string.IsNullOrWhiteSpace(extension) ? "bin" : extension, gamePath);
    }

    return lookup;
}
|
|
|
|
// Callback given each chunk read from a download stream before that chunk is written to disk.
// It receives the live read buffer, so it may transform the bytes in place.
private delegate void DownloadDataCallback(Span<byte> data);
|
|
|
|
/// <summary>
/// Downloads <paramref name="requestUrl"/> into <paramref name="destinationFilename"/> through a
/// bandwidth-limited stream.
/// </summary>
/// <remarks>
/// Obtaining the response headers is retried up to three times (two seconds apart) for request
/// failures without a status code and for timeouts that were not caused by <paramref name="ct"/>.
/// If anything fails after the response was obtained, the partially written destination file is deleted.
/// </remarks>
/// <param name="requestUrl">Address to download from.</param>
/// <param name="destinationFilename">File to create (an existing file is overwritten).</param>
/// <param name="progress">Receives the number of bytes of each chunk after it has been written.</param>
/// <param name="callback">Optional in-place transformation applied to each chunk before it is written.</param>
/// <param name="ct">Cancels the request, the retry delay, and the copy loop.</param>
/// <param name="withToken">Forwarded to the orchestrator's request call.</param>
/// <exception cref="InvalidDataException">The server answered 404 or 401.</exception>
/// <exception cref="HttpRequestException">Any other request failure, or retries were exhausted.</exception>
private async Task DownloadFileThrottled(
    Uri requestUrl,
    string destinationFilename,
    IProgress<long> progress,
    DownloadDataCallback? callback,
    CancellationToken ct,
    bool withToken)
{
    const int maxRetries = 3;
    const long largeContentThreshold = 1024 * 1024;
    const int largeReadBufferSize = 64 * 1024;
    const int smallReadBufferSize = 8 * 1024; // was 8196, an apparent typo for 8 KiB
    const int fileWriteBufferSize = 64 * 1024;

    int retryCount = 0;
    TimeSpan retryDelay = TimeSpan.FromSeconds(2);

    HttpResponseMessage? response = null;

    while (true)
    {
        try
        {
            Logger.LogDebug("Attempt {attempt} - Downloading {requestUrl}", retryCount + 1, requestUrl);
            response = await _orchestrator.SendRequestAsync(
                    HttpMethod.Get,
                    requestUrl,
                    ct,
                    HttpCompletionOption.ResponseHeadersRead,
                    withToken)
                .ConfigureAwait(false);

            response.EnsureSuccessStatusCode();
            break;
        }
        catch (HttpRequestException ex) when (ex.InnerException is TimeoutException || ex.StatusCode == null)
        {
            // No status code means the request never got an answer: treat as transient.
            response?.Dispose();
            retryCount++;

            Logger.LogWarning(ex, "Timeout during download of {requestUrl}. Attempt {attempt} of {maxRetries}", requestUrl, retryCount, maxRetries);

            if (retryCount >= maxRetries || ct.IsCancellationRequested)
            {
                Logger.LogError("Max retries reached or cancelled. Failing download for {requestUrl}", requestUrl);
                throw;
            }

            await Task.Delay(retryDelay, ct).ConfigureAwait(false);
        }
        catch (TaskCanceledException ex) when (!ct.IsCancellationRequested)
        {
            // Cancelled without our token firing, i.e. a timeout inside the HTTP stack.
            response?.Dispose();
            retryCount++;

            Logger.LogWarning(ex, "Cancellation/timeout during download of {requestUrl}. Attempt {attempt} of {maxRetries}", requestUrl, retryCount, maxRetries);

            if (retryCount >= maxRetries)
            {
                Logger.LogError("Max retries reached for {requestUrl} after TaskCanceledException", requestUrl);
                throw;
            }

            await Task.Delay(retryDelay, ct).ConfigureAwait(false);
        }
        catch (OperationCanceledException) when (ct.IsCancellationRequested)
        {
            response?.Dispose();
            throw;
        }
        catch (HttpRequestException ex)
        {
            // The server answered with an error status: not retried.
            response?.Dispose();
            Logger.LogWarning(ex, "Error during download of {requestUrl}, HttpStatusCode: {code}", requestUrl, ex.StatusCode);

            if (ex.StatusCode is HttpStatusCode.NotFound or HttpStatusCode.Unauthorized)
                throw new InvalidDataException($"Http error {ex.StatusCode} (cancelled: {ct.IsCancellationRequested}): {requestUrl}", ex);

            throw;
        }
    }

    ThrottledStream? stream = null;

    try
    {
        // Larger read buffer for bodies announced as bigger than 1 MiB.
        var contentLen = response!.Content.Headers.ContentLength ?? 0;
        var bufferSize = contentLen > largeContentThreshold ? largeReadBufferSize : smallReadBufferSize;
        var buffer = new byte[bufferSize];

        var fileStream = new FileStream(
            destinationFilename,
            FileMode.Create,
            FileAccess.Write,
            FileShare.None,
            bufferSize: fileWriteBufferSize,
            useAsync: true);

        await using (fileStream.ConfigureAwait(false))
        {
            var limit = _orchestrator.DownloadLimitPerSlot();
            Logger.LogTrace("Starting Download with a speed limit of {limit} to {destination}", limit, destinationFilename);

            stream = new ThrottledStream(await response.Content.ReadAsStreamAsync(ct).ConfigureAwait(false), limit);

            // Registered so a limit change can be applied while the download is running.
            _activeDownloadStreams.TryAdd(stream, 0);

            while (true)
            {
                ct.ThrowIfCancellationRequested();

                int bytesRead = await stream.ReadAsync(buffer.AsMemory(0, buffer.Length), ct).ConfigureAwait(false);
                if (bytesRead == 0) break;

                callback?.Invoke(buffer.AsSpan(0, bytesRead));
                await fileStream.WriteAsync(buffer.AsMemory(0, bytesRead), ct).ConfigureAwait(false);

                progress.Report(bytesRead);
            }

            Logger.LogDebug("{requestUrl} downloaded to {destination}", requestUrl, destinationFilename);
        }
    }
    catch
    {
        // The file stream is already closed here; remove the partial file on a best-effort basis.
        try
        {
            if (!string.IsNullOrEmpty(destinationFilename) && File.Exists(destinationFilename))
                File.Delete(destinationFilename);
        }
        catch
        {
            // ignore
        }

        throw;
    }
    finally
    {
        if (stream != null)
        {
            _activeDownloadStreams.TryRemove(stream, out _);
            await stream.DisposeAsync().ConfigureAwait(false);
        }

        response?.Dispose();
    }
}
|
|
|
|
/// <summary>
/// Waits until the queued request identified by <paramref name="requestId"/> is ready, either because
/// the orchestrator already reports it as ready or because the queue-check endpoint answers
/// <c>true</c> / <c>"ready":true</c>. The endpoint is polled every 250 ms.
/// </summary>
/// <remarks>
/// The orchestrator's entry for the request is cleared when the wait ends for any reason, so a
/// cancelled or failed wait does not leave the request registered.
/// NOTE(review): clearing on failure assumes no caller reuses <paramref name="requestId"/> afterwards — confirm.
/// </remarks>
/// <param name="downloadFileTransfer">Transfers of the request; the first one supplies the server address.</param>
/// <param name="requestId">Identifier of the queued request.</param>
/// <param name="downloadCt">Cancels the wait.</param>
private async Task WaitForDownloadReady(List<DownloadFileTransfer> downloadFileTransfer, Guid requestId, CancellationToken downloadCt)
{
    try
    {
        while (true)
        {
            downloadCt.ThrowIfCancellationRequested();

            if (_orchestrator.IsDownloadReady(requestId))
                break;

            using var resp = await _orchestrator.SendRequestAsync(
                HttpMethod.Get,
                LightlessFiles.RequestCheckQueueFullPath(downloadFileTransfer[0].DownloadUri, requestId),
                downloadFileTransfer.Select(t => t.Hash).ToList(),
                downloadCt).ConfigureAwait(false);

            resp.EnsureSuccessStatusCode();

            var body = (await resp.Content.ReadAsStringAsync(downloadCt).ConfigureAwait(false)).Trim();
            if (string.Equals(body, "true", StringComparison.OrdinalIgnoreCase) ||
                body.Contains("\"ready\":true", StringComparison.OrdinalIgnoreCase))
            {
                break;
            }

            await Task.Delay(250, downloadCt).ConfigureAwait(false);
        }
    }
    finally
    {
        _orchestrator.ClearDownloadRequest(requestId);
    }
}
|
|
|
|
/// <summary>
/// Downloads one queued block file: waits for the request to become ready, then takes a download
/// slot and fetches the block into <paramref name="tempPath"/>, un-munging the bytes as they arrive.
/// The status entry under <paramref name="statusKey"/> is updated at each stage.
/// </summary>
private async Task DownloadQueuedBlockFileAsync(
    DownloadSession session,
    string statusKey,
    Guid requestId,
    List<DownloadFileTransfer> transfers,
    string tempPath,
    IProgress<long> progress,
    CancellationToken ct)
{
    var serverUri = transfers[0].DownloadUri;

    Logger.LogDebug("GUID {requestId} on server {uri} for files {files}",
        requestId, serverUri, string.Join(", ", transfers.Select(c => c.Hash)));

    // The readiness wait deliberately happens before a slot is taken.
    SetStatus(session, statusKey, DownloadStatus.WaitingForQueue);
    await WaitForDownloadReady(transfers, requestId, ct).ConfigureAwait(false);

    // The slot is held only while the GET is running.
    SetStatus(session, statusKey, DownloadStatus.WaitingForSlot);
    var lease = await AcquireSlotAsync(ct).ConfigureAwait(false);
    await using (lease.ConfigureAwait(false))
    {
        SetStatus(session, statusKey, DownloadStatus.Downloading);

        var blockUrl = LightlessFiles.CacheGetFullPath(transfers[0].DownloadUri, requestId);
        await DownloadFileThrottled(blockUrl, tempPath, progress, MungeBuffer, ct, withToken: true).ConfigureAwait(false);
    }
}
|
|
|
|
/// <summary>
/// Walks a downloaded block file entry by entry (header, then munged LZ4-wrapped payload),
/// writes each decoded entry to its cache path and hands it to storage persistence.
/// </summary>
/// <remarks>
/// Failures of a single entry are logged and reported to the deduplicator as failed; the loop then
/// moves on to the next header. Failures while reading a header end the whole pass and are only logged.
/// The status is set to Completed only when the loop finished without such a failure.
/// </remarks>
private async Task DecompressBlockFileAsync(
    DownloadSession session,
    string downloadStatusKey,
    string blockFilePath,
    Dictionary<string, (string Extension, string GamePath)> replacementLookup,
    IReadOnlyDictionary<string, long> rawSizeLookup,
    string downloadLabel,
    CancellationToken ct,
    bool skipDownscale,
    bool skipDecimation)
{
    SetStatus(session, downloadStatusKey, DownloadStatus.Decompressing);
    MarkTransferredFiles(session, downloadStatusKey, 1);

    try
    {
        var blockStream = File.OpenRead(blockFilePath);
        await using (blockStream.ConfigureAwait(false))
        {
            while (blockStream.Position < blockStream.Length)
            {
                var (entryHash, entryLength) = ReadBlockFileHeader(blockStream);

                try
                {
                    if (entryLength is < 0 or > int.MaxValue)
                    {
                        throw new InvalidDataException($"Invalid block entry length: {entryLength}");
                    }

                    int payloadSize = checked((int)entryLength);

                    if (!replacementLookup.TryGetValue(entryHash, out var replacement))
                    {
                        Logger.LogWarning("{dlName}: No replacement mapping for {fileHash}", downloadLabel, entryHash);
                        CompleteOwnedDownload(session, entryHash, false);

                        // The payload is unwanted but still has to be stepped over.
                        blockStream.Position += payloadSize;
                        continue;
                    }

                    var cachePath = _fileDbManager.GetCacheFilePath(entryHash, replacement.Extension);
                    Logger.LogTrace("{dlName}: Decompressing {file}:{len} => {dest}", downloadLabel, entryHash, entryLength, cachePath);

                    var payload = new byte[payloadSize];
                    await ReadExactlyAsync(blockStream, payload.AsMemory(0, payloadSize), ct).ConfigureAwait(false);

                    MungeBuffer(payload);
                    var decoded = LZ4Wrapper.Unwrap(payload);

                    // Only a positive expected size is checked.
                    bool sizeKnown = rawSizeLookup.TryGetValue(entryHash, out var expectedRawSize) && expectedRawSize > 0;
                    if (sizeKnown && decoded.LongLength != expectedRawSize)
                    {
                        Logger.LogWarning("{dlName}: Decompressed size mismatch for {fileHash} (expected {expected}, got {actual})",
                            downloadLabel, entryHash, expectedRawSize, decoded.LongLength);
                        CompleteOwnedDownload(session, entryHash, false);
                        continue;
                    }

                    await _fileCompactor.WriteAllBytesAsync(cachePath, decoded, ct).ConfigureAwait(false);
                    PersistFileToStorage(session, entryHash, cachePath, replacement.GamePath, skipDownscale, skipDecimation);
                }
                catch (EndOfStreamException)
                {
                    Logger.LogWarning("{dlName}: Failure to extract file {fileHash}, stream ended prematurely", downloadLabel, entryHash);
                    CompleteOwnedDownload(session, entryHash, false);
                }
                catch (Exception e)
                {
                    Logger.LogWarning(e, "{dlName}: Error during decompression", downloadLabel);
                    CompleteOwnedDownload(session, entryHash, false);
                }
            }
        }

        SetStatus(session, downloadStatusKey, DownloadStatus.Completed);
    }
    catch (EndOfStreamException)
    {
        Logger.LogDebug("{dlName}: Failure to extract file header data, stream ended", downloadLabel);
    }
    catch (Exception ex)
    {
        Logger.LogError(ex, "{dlName}: Error during block file read", downloadLabel);
    }
}
|
|
|
|
/// <summary>
/// Works out which of the requested replacement files actually have to be downloaded.
/// </summary>
/// <remarks>
/// Blank hashes and duplicates are dropped, hashes already present in the file cache are skipped,
/// and the remaining ones are sent to <c>FilesGetSizes</c>. Entries reported as forbidden are added
/// to the orchestrator's forbidden list (once per hash); only transferable entries are returned.
/// </remarks>
/// <param name="gameObjectHandler">Handler the download is for; only used for log output.</param>
/// <param name="fileReplacement">Requested replacements; null or empty yields an empty list.</param>
/// <param name="ct">Cancels the size lookup.</param>
/// <param name="ownerToken">Currently unused.</param>
/// <returns>The transfers that can be downloaded; empty when nothing is missing.</returns>
public async Task<List<DownloadFileTransfer>> InitiateDownloadList(
    GameObjectHandler? gameObjectHandler,
    List<FileReplacementData> fileReplacement,
    CancellationToken ct,
    Guid? ownerToken = null)
{
    _ = ownerToken;
    var objectName = gameObjectHandler?.Name ?? "Unknown";
    Logger.LogDebug("Download start: {id}", objectName);

    if (fileReplacement == null || fileReplacement.Count == 0)
    {
        Logger.LogDebug("{dlName}: No file replacements provided", objectName);
        return [];
    }

    // Unique, non-blank hashes in their original order.
    var seenHashes = new HashSet<string>(StringComparer.Ordinal);
    var hashes = new List<string>();
    foreach (var replacement in fileReplacement)
    {
        if (replacement != null && !string.IsNullOrWhiteSpace(replacement.Hash) && seenHashes.Add(replacement.Hash))
        {
            hashes.Add(replacement.Hash);
        }
    }

    if (hashes.Count == 0)
    {
        Logger.LogDebug("{dlName}: No valid hashes to download", objectName);
        return [];
    }

    var missingHashes = hashes
        .Where(hash => _fileDbManager.GetFileCacheByHash(hash) is null)
        .ToList();

    if (missingHashes.Count == 0)
    {
        Logger.LogDebug("{dlName}: All requested hashes already present in cache", objectName);
        return [];
    }

    var alreadyCached = hashes.Count - missingHashes.Count;
    if (alreadyCached > 0)
    {
        Logger.LogDebug("{dlName}: Skipping {count} hashes already present in cache", objectName, alreadyCached);
    }

    List<DownloadFileDto> downloadFileInfoFromService =
    [
        .. await FilesGetSizes(missingHashes, ct).ConfigureAwait(false),
    ];

    Logger.LogDebug("Files with size 0 or less: {files}",
        string.Join(", ", downloadFileInfoFromService.Where(f => f.Size <= 0).Select(f => f.Hash)));

    foreach (var dto in downloadFileInfoFromService)
    {
        if (!dto.IsForbidden)
        {
            continue;
        }

        bool alreadyListed = _orchestrator.ForbiddenTransfers.Exists(f => string.Equals(f.Hash, dto.Hash, StringComparison.Ordinal));
        if (!alreadyListed)
        {
            _orchestrator.ForbiddenTransfers.Add(new DownloadFileTransfer(dto));
        }
    }

    return downloadFileInfoFromService
        .Distinct()
        .Select(d => new DownloadFileTransfer(d))
        .Where(d => d.CanBeTransferred)
        .ToList();
}
|
|
|
|
// One slice of a host's batch downloads: the host key it was grouped under, the key of its
// status entry, and the transfers contained in the slice.
private sealed record BatchChunk(string HostKey, string StatusKey, List<DownloadFileTransfer> Items);
|
|
|
|
/// <summary>
/// Splits <paramref name="items"/> into consecutive lists of at most <paramref name="chunkSize"/>
/// elements; only the last list may be shorter. An empty input yields no lists.
/// </summary>
/// <param name="items">List to split.</param>
/// <param name="chunkSize">Maximum number of elements per chunk; must be positive.</param>
/// <exception cref="ArgumentNullException"><paramref name="items"/> is null.</exception>
/// <exception cref="ArgumentOutOfRangeException">
/// <paramref name="chunkSize"/> is zero or negative (zero would otherwise never advance).
/// </exception>
private static IEnumerable<List<T>> ChunkList<T>(List<T> items, int chunkSize)
{
    // Validate here rather than in the iterator so bad arguments fail at the call site,
    // not on first enumeration.
    ArgumentNullException.ThrowIfNull(items);
    if (chunkSize <= 0)
        throw new ArgumentOutOfRangeException(nameof(chunkSize), chunkSize, "Chunk size must be positive.");

    return Iterate(items, chunkSize);

    static IEnumerable<List<T>> Iterate(List<T> source, int size)
    {
        for (int i = 0; i < source.Count; i += size)
            yield return source.GetRange(i, Math.Min(size, source.Count - i));
    }
}
|
|
|
|
private async Task DownloadFilesInternal(DownloadSession session, List<FileReplacementData> fileReplacement, CancellationToken ct, bool skipDownscale, bool skipDecimation)
|
|
{
|
|
var objectName = session.ObjectName;
|
|
|
|
// config toggles
|
|
var configAllowsDirect = _configService.Current.EnableDirectDownloads;
|
|
if (configAllowsDirect != _lastConfigDirectDownloadsState)
|
|
{
|
|
_lastConfigDirectDownloadsState = configAllowsDirect;
|
|
if (configAllowsDirect)
|
|
{
|
|
_disableDirectDownloads = false;
|
|
_consecutiveDirectDownloadFailures = 0;
|
|
}
|
|
}
|
|
|
|
var allowDirectDownloads = ShouldUseDirectDownloads();
|
|
var replacementLookup = BuildReplacementLookup(fileReplacement);
|
|
var rawSizeLookup = new Dictionary<string, long>(StringComparer.OrdinalIgnoreCase);
|
|
|
|
foreach (var download in session.Downloads)
|
|
{
|
|
if (string.IsNullOrWhiteSpace(download.Hash))
|
|
{
|
|
continue;
|
|
}
|
|
|
|
if (!rawSizeLookup.TryGetValue(download.Hash, out var existing) || existing <= 0)
|
|
{
|
|
rawSizeLookup[download.Hash] = download.TotalRaw;
|
|
}
|
|
}
|
|
|
|
var directDownloads = new List<DownloadFileTransfer>();
|
|
var batchDownloads = new List<DownloadFileTransfer>();
|
|
|
|
foreach (var download in session.Downloads)
|
|
{
|
|
if (!string.IsNullOrEmpty(download.DirectDownloadUrl) && allowDirectDownloads)
|
|
directDownloads.Add(download);
|
|
else
|
|
batchDownloads.Add(download);
|
|
}
|
|
|
|
session.OwnedDownloads.Clear();
|
|
var waitingHashes = new HashSet<string>(StringComparer.OrdinalIgnoreCase);
|
|
var waitTasks = new List<Task<bool>>();
|
|
var claims = new Dictionary<string, DownloadClaim>(StringComparer.OrdinalIgnoreCase);
|
|
|
|
DownloadClaim GetClaim(string hash)
|
|
{
|
|
if (!claims.TryGetValue(hash, out var claim))
|
|
{
|
|
claim = _downloadDeduplicator.Claim(hash);
|
|
claims[hash] = claim;
|
|
}
|
|
|
|
return claim;
|
|
}
|
|
|
|
List<DownloadFileTransfer> FilterOwned(List<DownloadFileTransfer> downloads)
|
|
{
|
|
if (downloads.Count == 0)
|
|
{
|
|
return downloads;
|
|
}
|
|
|
|
var owned = new List<DownloadFileTransfer>(downloads.Count);
|
|
foreach (var download in downloads)
|
|
{
|
|
if (string.IsNullOrWhiteSpace(download.Hash))
|
|
{
|
|
continue;
|
|
}
|
|
|
|
var claim = GetClaim(download.Hash);
|
|
if (claim.IsOwner)
|
|
{
|
|
session.OwnedDownloads.TryAdd(download.Hash, 0);
|
|
owned.Add(download);
|
|
}
|
|
else if (waitingHashes.Add(download.Hash))
|
|
{
|
|
waitTasks.Add(claim.Completion);
|
|
}
|
|
}
|
|
|
|
return owned;
|
|
}
|
|
|
|
directDownloads = FilterOwned(directDownloads);
|
|
batchDownloads = FilterOwned(batchDownloads);
|
|
|
|
if (waitTasks.Count > 0)
|
|
{
|
|
Logger.LogDebug("{dlName}: {count} files already downloading elsewhere; waiting for completion.", objectName, waitTasks.Count);
|
|
}
|
|
|
|
// Chunk per host so we can fill all slots
|
|
var slots = Math.Max(1, _configService.Current.ParallelDownloads);
|
|
|
|
var batchChunks = batchDownloads
|
|
.GroupBy(f => $"{f.DownloadUri.Host}:{f.DownloadUri.Port}", StringComparer.Ordinal)
|
|
.SelectMany(g =>
|
|
{
|
|
var list = g.ToList();
|
|
var chunkCount = Math.Min(slots, Math.Max(1, list.Count));
|
|
var chunkSize = (int)Math.Ceiling(list.Count / (double)chunkCount);
|
|
|
|
return ChunkList(list, chunkSize)
|
|
.Select((chunk, index) => new BatchChunk(g.Key, $"{g.Key}#{index + 1}", chunk));
|
|
})
|
|
.ToArray();
|
|
|
|
// init statuses
|
|
session.Status.Clear();
|
|
|
|
// direct downloads and batch downloads tracked separately
|
|
foreach (var d in directDownloads)
|
|
{
|
|
session.Status[d.DirectDownloadUrl!] = new FileDownloadStatus
|
|
{
|
|
DownloadStatus = DownloadStatus.WaitingForSlot,
|
|
TotalBytes = d.Total,
|
|
TotalFiles = 1,
|
|
TransferredBytes = 0,
|
|
TransferredFiles = 0
|
|
};
|
|
}
|
|
|
|
foreach (var chunk in batchChunks)
|
|
{
|
|
session.Status[chunk.StatusKey] = new FileDownloadStatus
|
|
{
|
|
DownloadStatus = DownloadStatus.WaitingForQueue,
|
|
TotalBytes = chunk.Items.Sum(x => x.Total),
|
|
TotalFiles = 1,
|
|
TransferredBytes = 0,
|
|
TransferredFiles = 0
|
|
};
|
|
}
|
|
|
|
if (directDownloads.Count > 0 || batchChunks.Length > 0)
|
|
{
|
|
Logger.LogInformation("Downloading {direct} files directly, and {batchtotal} queued in {chunks} chunks.",
|
|
directDownloads.Count, batchDownloads.Count, batchChunks.Length);
|
|
}
|
|
|
|
if (session.Handler is not null)
|
|
Mediator.Publish(new DownloadStartedMessage(session.Handler, session.Status));
|
|
|
|
// work based on cpu count and slots
|
|
var coreCount = Environment.ProcessorCount;
|
|
var baseWorkers = Math.Min(slots, coreCount);
|
|
|
|
// only add buffer if decompression has capacity AND we have cores to spare
|
|
var availableDecompressSlots = _decompressGate.CurrentCount;
|
|
var extraWorkers = (availableDecompressSlots > 0 && coreCount >= 6) ? 2 : 0;
|
|
|
|
// allow some extra workers so downloads can continue while earlier items decompress.
|
|
var workerDop = Math.Clamp(slots * 2, 2, 16);
|
|
var decompressionTasks = new ConcurrentBag<Task>();
|
|
using var decompressionLimiter = new SemaphoreSlim(CalculateDecompressionLimit(slots));
|
|
|
|
// batch downloads
|
|
Task batchTask = batchChunks.Length == 0
|
|
? Task.CompletedTask
|
|
: Parallel.ForEachAsync(batchChunks, new ParallelOptions { MaxDegreeOfParallelism = workerDop, CancellationToken = ct },
|
|
async (chunk, token) => await ProcessBatchChunkAsync(session, chunk, replacementLookup, rawSizeLookup, decompressionTasks, decompressionLimiter, token, skipDownscale, skipDecimation).ConfigureAwait(false));
|
|
|
|
// direct downloads
|
|
Task directTask = directDownloads.Count == 0
|
|
? Task.CompletedTask
|
|
: Parallel.ForEachAsync(directDownloads, new ParallelOptions { MaxDegreeOfParallelism = workerDop, CancellationToken = ct },
|
|
async (d, token) => await ProcessDirectAsync(session, d, replacementLookup, rawSizeLookup, decompressionTasks, decompressionLimiter, token, skipDownscale, skipDecimation).ConfigureAwait(false));
|
|
|
|
Task<bool[]> dedupWaitTask = waitTasks.Count == 0
|
|
? Task.FromResult(Array.Empty<bool>())
|
|
: Task.WhenAll(waitTasks);
|
|
|
|
try
|
|
{
|
|
await Task.WhenAll(batchTask, directTask).ConfigureAwait(false);
|
|
}
|
|
finally
|
|
{
|
|
await WaitForAllTasksAsync(decompressionTasks).ConfigureAwait(false);
|
|
}
|
|
|
|
var dedupResults = await dedupWaitTask.ConfigureAwait(false);
|
|
|
|
if (waitTasks.Count > 0 && dedupResults.Any(r => !r))
|
|
{
|
|
Logger.LogWarning("{dlName}: One or more shared downloads failed; missing files may remain.", objectName);
|
|
}
|
|
|
|
Logger.LogDebug("Download end: {id}", objectName);
|
|
ClearDownload(session);
|
|
}
|
|
|
|
/// <summary>
/// Requests one batch chunk from the file server, downloads the resulting block file and
/// schedules its extraction on the shared decompression queue.
/// </summary>
/// <param name="session">Download session whose status entries and owned hashes are updated.</param>
/// <param name="chunk">Files to request together; <c>chunk.StatusKey</c> identifies the status entry.</param>
/// <param name="replacementLookup">Per-hash extension and game path, forwarded to the block extractor.</param>
/// <param name="rawSizeLookup">Per-hash raw size, forwarded to the block extractor.</param>
/// <param name="decompressionTasks">Bag that collects the scheduled extraction task.</param>
/// <param name="decompressionLimiter">Semaphore bounding concurrent extraction work.</param>
/// <param name="ct">Cancels the enqueue request, the download and the scheduled extraction.</param>
/// <param name="skipDownscale">Forwarded to the block extractor.</param>
/// <param name="skipDecimation">Forwarded to the block extractor.</param>
/// <remarks>
/// Failures of the enqueue request are rethrown to the caller. Failures after that point are
/// logged and swallowed here. In every case the chunk's hashes are reported through
/// <c>CompleteOwnedDownload(..., false)</c> once this method or its extraction task is done with them.
/// </remarks>
private async Task ProcessBatchChunkAsync(
    DownloadSession session,
    BatchChunk chunk,
    Dictionary<string, (string Extension, string GamePath)> replacementLookup,
    IReadOnlyDictionary<string, long> rawSizeLookup,
    ConcurrentBag<Task> decompressionTasks,
    SemaphoreSlim decompressionLimiter,
    CancellationToken ct,
    bool skipDownscale,
    bool skipDecimation)
{
    var statusKey = chunk.StatusKey;

    // Shared cleanup: best-effort delete of the block file (when one exists yet), then report every
    // hash of this chunk as finished. NOTE(review): this relies on CompleteOwnedDownload ignoring
    // hashes that were already completed (e.g. by PersistFileToStorage) - confirm.
    void ReleaseChunk(string? blockFilePath)
    {
        if (blockFilePath is not null)
        {
            try { File.Delete(blockFilePath); } catch { /* best effort */ }
        }

        foreach (var item in chunk.Items)
        {
            if (!string.IsNullOrWhiteSpace(item.Hash))
            {
                CompleteOwnedDownload(session, item.Hash, false);
            }
        }
    }

    // enqueue (no slot)
    SetStatus(session, statusKey, DownloadStatus.WaitingForQueue);

    Guid requestId;
    try
    {
        var requestIdResponse = await _orchestrator.SendRequestAsync(
            HttpMethod.Post,
            LightlessFiles.RequestEnqueueFullPath(chunk.Items[0].DownloadUri),
            chunk.Items.Select(c => c.Hash),
            ct).ConfigureAwait(false);

        // The response body is the request id, possibly wrapped in JSON quotes.
        requestId = Guid.Parse((await requestIdResponse.Content.ReadAsStringAsync(ct).ConfigureAwait(false)).Trim('"'));
    }
    catch
    {
        // The enqueue failed before any block file existed. Still release the chunk's hashes so the
        // failure is reported for them, then let the caller observe the original exception.
        ReleaseChunk(blockFilePath: null);
        throw;
    }

    var blockFile = _fileDbManager.GetCacheFilePath(requestId.ToString("N"), "blk");
    var fi = new FileInfo(blockFile);

    // True once the extraction task owns the cleanup of the block file and the chunk's hashes.
    var decompressionQueued = false;

    try
    {
        // download (with slot)
        var progress = CreateInlineProgress(bytes => AddTransferredBytes(session, statusKey, bytes));

        // Download slot held on get
        await DownloadQueuedBlockFileAsync(session, statusKey, requestId, chunk.Items, blockFile, progress, ct).ConfigureAwait(false);

        // decompress if file exists
        if (!File.Exists(blockFile))
        {
            Logger.LogWarning("{dlName}: Block file missing before extraction, skipping", fi.Name);
            SetStatus(session, statusKey, DownloadStatus.Completed);
            return;
        }

        SetStatus(session, statusKey, DownloadStatus.Decompressing);

        EnqueueLimitedTask(
            decompressionTasks,
            decompressionLimiter,
            async token =>
            {
                try
                {
                    await DecompressBlockFileAsync(session, statusKey, blockFile, replacementLookup, rawSizeLookup, fi.Name, token, skipDownscale, skipDecimation)
                        .ConfigureAwait(false);
                }
                finally
                {
                    ReleaseChunk(blockFile);
                }
            },
            ct);
        decompressionQueued = true;
    }
    catch (OperationCanceledException)
    {
        Logger.LogDebug("{dlName}: Detected cancellation of download, partially extracting files for {id}", fi.Name, requestId);
    }
    catch (Exception ex)
    {
        Logger.LogError(ex, "{dlName}: Error during batch chunk processing", fi.Name);
        ClearDownload(session);
    }
    finally
    {
        // Nothing was handed to the decompression queue, so the cleanup falls to this method.
        if (!decompressionQueued)
        {
            ReleaseChunk(blockFile);
        }
    }
}
|
|
|
|
/// <summary>
/// Downloads a single file from its direct download URL and schedules its decompression,
/// using the queued download path when direct downloads are unavailable or fail.
/// </summary>
/// <param name="session">Download session whose status entries and owned hashes are updated.</param>
/// <param name="directDownload">Transfer to download; its <c>DirectDownloadUrl</c> is also the status key.</param>
/// <param name="replacementLookup">Per-hash extension and game path used to name and persist the result.</param>
/// <param name="rawSizeLookup">Per-hash raw size, forwarded to the queued fallback.</param>
/// <param name="decompressionTasks">Bag that collects scheduled decompression tasks.</param>
/// <param name="decompressionLimiter">Semaphore bounding concurrent decompression work.</param>
/// <param name="ct">Cancels the download and the scheduled decompression.</param>
/// <param name="skipDownscale">Forwarded to <c>PersistFileToStorage</c> and the queued fallback.</param>
/// <param name="skipDecimation">Forwarded to <c>PersistFileToStorage</c> and the queued fallback.</param>
/// <remarks>
/// Only the up-front fallback (direct downloads disabled, or no URL) rethrows its failure. Every
/// other failure is logged and handled here or inside the scheduled decompression task.
/// </remarks>
private async Task ProcessDirectAsync(
    DownloadSession session,
    DownloadFileTransfer directDownload,
    Dictionary<string, (string Extension, string GamePath)> replacementLookup,
    IReadOnlyDictionary<string, long> rawSizeLookup,
    ConcurrentBag<Task> decompressionTasks,
    SemaphoreSlim decompressionLimiter,
    CancellationToken ct,
    bool skipDownscale,
    bool skipDecimation)
{
    var progress = CreateInlineProgress(bytes =>
    {
        if (!string.IsNullOrEmpty(directDownload.DirectDownloadUrl))
        {
            AddTransferredBytes(session, directDownload.DirectDownloadUrl!, bytes);
        }
    });

    // Shared reaction to a failed direct download: count the failure (unless it is the expected
    // InvalidDataException), retry through the queue, and switch direct downloads off after three
    // consecutive unexpected failures whose fallback succeeded.
    async Task FallBackToQueueAsync(Exception cause, CancellationToken fallbackToken, bool markCompletedOnFailure)
    {
        var expectedDirectDownloadFailure = cause is InvalidDataException;
        var failureCount = expectedDirectDownloadFailure ? 0 : Interlocked.Increment(ref _consecutiveDirectDownloadFailures);

        if (expectedDirectDownloadFailure)
        {
            Logger.LogInformation(cause, "{hash}: Direct download unavailable, attempting queued fallback.", directDownload.Hash);
        }
        else
        {
            Logger.LogWarning(cause, "{hash}: Direct download failed, attempting queued fallback.", directDownload.Hash);
        }

        try
        {
            await ProcessDirectAsQueuedFallbackAsync(session, directDownload, replacementLookup, rawSizeLookup, progress, fallbackToken, skipDownscale, skipDecimation, decompressionTasks, decompressionLimiter).ConfigureAwait(false);

            if (!expectedDirectDownloadFailure && failureCount >= 3 && !_disableDirectDownloads)
            {
                _disableDirectDownloads = true;
                Logger.LogWarning("Disabling direct downloads for this session after {count} consecutive failures.", failureCount);
            }
        }
        catch (Exception fallbackEx)
        {
            Logger.LogError(fallbackEx, "{hash}: Error during direct download fallback.", directDownload.Hash);
            CompleteOwnedDownload(session, directDownload.Hash, false);

            // Only the decompression-stage caller also flips the status entry to Completed.
            if (markCompletedOnFailure)
            {
                SetStatus(session, directDownload.DirectDownloadUrl!, DownloadStatus.Completed);
            }

            ClearDownload(session);
        }
    }

    if (!ShouldUseDirectDownloads() || string.IsNullOrEmpty(directDownload.DirectDownloadUrl))
    {
        try
        {
            await ProcessDirectAsQueuedFallbackAsync(session, directDownload, replacementLookup, rawSizeLookup, progress, ct, skipDownscale, skipDecimation, decompressionTasks, decompressionLimiter).ConfigureAwait(false);
        }
        catch (Exception ex)
        {
            Logger.LogError(ex, "{hash}: Error during direct download fallback.", directDownload.Hash);
            CompleteOwnedDownload(session, directDownload.Hash, false);
            throw;
        }

        return;
    }

    var tempFilename = _fileDbManager.GetCacheFilePath(directDownload.Hash, "bin");

    // True once the decompression task owns the deletion of the temp file.
    var decompressionQueued = false;

    try
    {
        // Download slot held on get
        SetStatus(session, directDownload.DirectDownloadUrl!, DownloadStatus.WaitingForSlot);

        await using ((await AcquireSlotAsync(ct).ConfigureAwait(false)).ConfigureAwait(false))
        {
            SetStatus(session, directDownload.DirectDownloadUrl!, DownloadStatus.Downloading);
            Logger.LogDebug("Beginning direct download of {hash} from {url}", directDownload.Hash, directDownload.DirectDownloadUrl);

            await DownloadFileThrottled(new Uri(directDownload.DirectDownloadUrl!), tempFilename, progress, callback: null, ct, withToken: false)
                .ConfigureAwait(false);
        }

        // A completed transfer resets the consecutive-failure streak.
        Interlocked.Exchange(ref _consecutiveDirectDownloadFailures, 0);

        if (!replacementLookup.TryGetValue(directDownload.Hash, out var repl))
        {
            Logger.LogWarning("{hash}: No replacement data found for direct download.", directDownload.Hash);
            SetStatus(session, directDownload.DirectDownloadUrl!, DownloadStatus.Completed);
            CompleteOwnedDownload(session, directDownload.Hash, false);
            return;
        }

        var finalFilename = _fileDbManager.GetCacheFilePath(directDownload.Hash, repl.Extension);

        Logger.LogDebug("Decompressing direct download {hash} from {compressedFile} to {finalFile}",
            directDownload.Hash, tempFilename, finalFilename);

        SetStatus(session, directDownload.DirectDownloadUrl!, DownloadStatus.Decompressing);
        EnqueueLimitedTask(
            decompressionTasks,
            decompressionLimiter,
            async token =>
            {
                try
                {
                    var decompressedLength = await DecompressWrappedLz4ToFileAsync(tempFilename, finalFilename, token).ConfigureAwait(false);

                    // TotalRaw <= 0 is treated as "size unknown", so the check is skipped.
                    if (directDownload.TotalRaw > 0 && decompressedLength != directDownload.TotalRaw)
                    {
                        throw new InvalidDataException(
                            $"{directDownload.Hash}: Decompressed size mismatch (expected {directDownload.TotalRaw}, got {decompressedLength})");
                    }

                    _fileCompactor.NotifyFileWritten(finalFilename);
                    PersistFileToStorage(session, directDownload.Hash, finalFilename, repl.GamePath, skipDownscale, skipDecimation);

                    MarkTransferredFiles(session, directDownload.DirectDownloadUrl!, 1);
                    SetStatus(session, directDownload.DirectDownloadUrl!, DownloadStatus.Completed);
                    Logger.LogDebug("Finished direct download of {hash}.", directDownload.Hash);
                }
                catch (OperationCanceledException) when (token.IsCancellationRequested)
                {
                    // Caller cancellation is not a direct-download failure: do not count it towards
                    // disabling direct downloads and do not start a fallback that cannot succeed.
                    Logger.LogDebug("{hash}: Direct download cancelled during decompression, discarding file.", directDownload.Hash);
                    CompleteOwnedDownload(session, directDownload.Hash, false);
                    ClearDownload(session);
                }
                catch (Exception ex)
                {
                    await FallBackToQueueAsync(ex, token, markCompletedOnFailure: true).ConfigureAwait(false);
                }
                finally
                {
                    try { File.Delete(tempFilename); }
                    catch
                    {
                        // ignore
                    }
                }
            },
            ct);
        decompressionQueued = true;
    }
    catch (OperationCanceledException ex)
    {
        if (ct.IsCancellationRequested)
        {
            Logger.LogDebug("{hash}: Direct download cancelled by caller, discarding file.", directDownload.Hash);
        }
        else
        {
            Logger.LogWarning(ex, "{hash}: Direct download cancelled unexpectedly.", directDownload.Hash);
        }

        CompleteOwnedDownload(session, directDownload.Hash, false);
        ClearDownload(session);
    }
    catch (Exception ex)
    {
        await FallBackToQueueAsync(ex, ct, markCompletedOnFailure: false).ConfigureAwait(false);
    }
    finally
    {
        // Nothing was handed to the decompression queue, so the temp file is removed here.
        if (!decompressionQueued)
        {
            try { File.Delete(tempFilename); }
            catch
            {
                // ignore
            }
        }
    }
}
|
|
|
|
/// <summary>
/// Fetches a single direct-download file through the queued route instead: enqueues its hash,
/// downloads the resulting block file and schedules the extraction.
/// </summary>
/// <param name="session">Download session whose status entry and owned hash are updated.</param>
/// <param name="directDownload">Transfer to fetch; its <c>DirectDownloadUrl</c> is used as the status key.</param>
/// <param name="replacementLookup">Per-hash extension and game path, forwarded to the block extractor.</param>
/// <param name="rawSizeLookup">Per-hash raw size, forwarded to the block extractor.</param>
/// <param name="progress">Progress sink forwarded to the block file download.</param>
/// <param name="ct">Cancels the enqueue request, the download and the scheduled extraction.</param>
/// <param name="skipDownscale">Forwarded to the block extractor.</param>
/// <param name="skipDecimation">Forwarded to the block extractor.</param>
/// <param name="decompressionTasks">Bag that collects the scheduled extraction task.</param>
/// <param name="decompressionLimiter">Semaphore bounding concurrent extraction work.</param>
/// <exception cref="InvalidOperationException">The transfer has no direct download URL.</exception>
private async Task ProcessDirectAsQueuedFallbackAsync(
    DownloadSession session,
    DownloadFileTransfer directDownload,
    Dictionary<string, (string Extension, string GamePath)> replacementLookup,
    IReadOnlyDictionary<string, long> rawSizeLookup,
    IProgress<long> progress,
    CancellationToken ct,
    bool skipDownscale,
    bool skipDecimation,
    ConcurrentBag<Task> decompressionTasks,
    SemaphoreSlim decompressionLimiter)
{
    var statusKey = directDownload.DirectDownloadUrl;
    if (string.IsNullOrEmpty(statusKey))
    {
        throw new InvalidOperationException("Direct download fallback requested without a direct download URL.");
    }

    SetStatus(session, statusKey, DownloadStatus.WaitingForQueue);

    var enqueueResponse = await _orchestrator.SendRequestAsync(
        HttpMethod.Post,
        LightlessFiles.RequestEnqueueFullPath(directDownload.DownloadUri),
        new[] { directDownload.Hash },
        ct).ConfigureAwait(false);

    // The response body is the request id, possibly wrapped in JSON quotes.
    var rawRequestId = await enqueueResponse.Content.ReadAsStringAsync(ct).ConfigureAwait(false);
    var requestId = Guid.Parse(rawRequestId.Trim('"'));

    var blockFile = _fileDbManager.GetCacheFilePath(requestId.ToString("N"), "blk");
    var blockFileName = new FileInfo(blockFile).Name;

    // Becomes true when the extraction task has taken over the cleanup duties.
    var handedOff = false;

    // Best-effort removal of the block file, then report the hash as finished.
    void ReleaseBlock()
    {
        try { File.Delete(blockFile); } catch {}
        CompleteOwnedDownload(session, directDownload.Hash, false);
    }

    // Work item executed by the decompression queue.
    async Task ExtractBlockAsync(CancellationToken token)
    {
        try
        {
            await DecompressBlockFileAsync(session, statusKey, blockFile, replacementLookup, rawSizeLookup, $"fallback-{directDownload.Hash}", token, skipDownscale, skipDecimation)
                .ConfigureAwait(false);
        }
        finally
        {
            ReleaseBlock();
        }
    }

    try
    {
        await DownloadQueuedBlockFileAsync(session, statusKey, requestId, [directDownload], blockFile, progress, ct).ConfigureAwait(false);

        if (File.Exists(blockFile))
        {
            SetStatus(session, statusKey, DownloadStatus.Decompressing);
            EnqueueLimitedTask(decompressionTasks, decompressionLimiter, token => ExtractBlockAsync(token), ct);
            handedOff = true;
        }
        else
        {
            Logger.LogWarning("{dlName}: Block file missing before extraction, skipping", blockFileName);
            SetStatus(session, statusKey, DownloadStatus.Completed);
        }
    }
    finally
    {
        // Download failed or produced no file: nothing was queued, so clean up here.
        if (!handedOff)
        {
            ReleaseBlock();
        }
    }
}
|
|
|
|
/// <summary>
/// Asks the file server for the size information of the given hashes.
/// </summary>
/// <param name="hashes">Hashes sent as the request payload.</param>
/// <param name="ct">Cancels the request and the response deserialization.</param>
/// <returns>The deserialized entries, or an empty list when the response body deserializes to null.</returns>
/// <exception cref="InvalidOperationException">The orchestrator has not been initialized.</exception>
private async Task<List<DownloadFileDto>> FilesGetSizes(List<string> hashes, CancellationToken ct)
{
    if (!_orchestrator.IsInitialized)
    {
        throw new InvalidOperationException("FileTransferManager is not initialized");
    }

    var sizesEndpoint = LightlessFiles.ServerFilesGetSizesFullPath(_orchestrator.FilesCdnUri!);
    var response = await _orchestrator
        .SendRequestAsync(HttpMethod.Get, sizesEndpoint, hashes, ct)
        .ConfigureAwait(false);

    var sizes = await response.Content
        .ReadFromJsonAsync<List<DownloadFileDto>>(cancellationToken: ct)
        .ConfigureAwait(false);

    return sizes ?? [];
}
|
|
|
|
/// <summary>
/// Registers a freshly written file in the file cache under its known hash and, when the entry is
/// valid, schedules the optional texture downscale and model decimation passes.
/// </summary>
/// <param name="session">Download session that owns <paramref name="fileHash"/>.</param>
/// <param name="fileHash">Expected hash of the file.</param>
/// <param name="filePath">Path of the file on disk.</param>
/// <param name="gamePath">Game path of the file, forwarded to the downscale and decimation services.</param>
/// <param name="skipDownscale">When true, no texture downscale is scheduled.</param>
/// <param name="skipDecimation">When true, no model decimation is scheduled.</param>
/// <returns>True when a cache entry with the expected hash was created; otherwise false.</returns>
/// <remarks>
/// Errors while creating the cache entry are logged and reported as <c>false</c>. Setting the file
/// timestamps happens before that guarded section, so I/O errors from it propagate to the caller.
/// The owned download is always completed with the final result once the guarded section is reached.
/// </remarks>
private bool PersistFileToStorage(DownloadSession session, string fileHash, string filePath, string gamePath, bool skipDownscale, bool skipDecimation)
{
    var fi = new FileInfo(filePath);
    var persisted = false;

    // Picks a date between 1995-01-01 and today.
    static DateTime RandomDayInThePast()
    {
        DateTime start = new(1995, 1, 1, 1, 1, 1, DateTimeKind.Local);
        int range = (DateTime.Today - start).Days;
        return start.AddDays(Random.Shared.Next(range));
    }

    // Creation and write times are randomized; the reason is not visible in this part of the file.
    fi.CreationTime = RandomDayInThePast();
    fi.LastAccessTime = DateTime.Today;
    fi.LastWriteTime = RandomDayInThePast();

    try
    {
        var entry = _fileDbManager.CreateCacheEntryWithKnownHash(filePath, fileHash);

        // Validate first: a mismatching file is deleted, so no follow-up work may be scheduled for it.
        if (entry != null && !string.Equals(entry.Hash, fileHash, StringComparison.OrdinalIgnoreCase))
        {
            Logger.LogError("Hash mismatch after extracting, got {hash}, expected {expectedHash}, deleting file",
                entry.Hash, fileHash);

            File.Delete(filePath);
            _fileDbManager.RemoveHashedFile(entry.Hash, entry.PrefixedFilePath);
            return false; // the finally block still reports the failure
        }

        persisted = entry != null;

        if (!skipDownscale && _textureDownscaleService.ShouldScheduleDownscale(filePath))
        {
            _textureDownscaleService.ScheduleDownscale(
                fileHash,
                filePath,
                () => _textureMetadataHelper.DetermineMapKind(gamePath, filePath));
        }

        if (!skipDecimation && _modelDecimationService.ShouldScheduleDecimation(fileHash, filePath, gamePath))
        {
            _modelDecimationService.ScheduleDecimation(fileHash, filePath, gamePath);
        }
    }
    catch (Exception ex)
    {
        Logger.LogWarning(ex, "Error creating cache entry");
    }
    finally
    {
        CompleteOwnedDownload(session, fileHash, persisted);
    }

    return persisted;
}
|
|
|
|
/// <summary>
/// Derives how many decompression jobs may run at once from the number of download slots.
/// </summary>
/// <param name="downloadSlots">Configured number of parallel download slots.</param>
/// <returns>
/// <paramref name="downloadSlots"/> limited to the range from 1 to the processor count, where the
/// processor count itself is capped at 4 and never treated as less than 1.
/// </returns>
private static int CalculateDecompressionLimit(int downloadSlots)
{
    const int MaxConcurrentDecompressions = 4;

    var upperBound = Math.Clamp(Environment.ProcessorCount, 1, MaxConcurrentDecompressions);

    if (downloadSlots < 1)
    {
        return 1;
    }

    return downloadSlots > upperBound ? upperBound : downloadSlots;
}
|
|
|
|
/// <summary>
/// Starts <paramref name="work"/> on the thread pool once a slot of <paramref name="limiter"/> is
/// free, and records the resulting task in <paramref name="tasks"/>.
/// </summary>
/// <param name="tasks">Bag the scheduled task is added to.</param>
/// <param name="limiter">Semaphore acquired before, and released after, the work runs.</param>
/// <param name="work">Work to execute; receives <paramref name="ct"/>.</param>
/// <param name="ct">Cancels the scheduling, the wait for a slot, and is passed on to the work.</param>
/// <returns>The scheduled task.</returns>
/// <remarks>
/// If <paramref name="ct"/> is cancelled before a slot is acquired, <paramref name="work"/> is
/// never invoked, so any cleanup placed inside it does not run.
/// </remarks>
private static Task EnqueueLimitedTask(
    ConcurrentBag<Task> tasks,
    SemaphoreSlim limiter,
    Func<CancellationToken, Task> work,
    CancellationToken ct)
{
    async Task RunUnderLimiterAsync()
    {
        await limiter.WaitAsync(ct).ConfigureAwait(false);

        try
        {
            await work(ct).ConfigureAwait(false);
        }
        finally
        {
            limiter.Release();
        }
    }

    var scheduled = Task.Run(() => RunUnderLimiterAsync(), ct);
    tasks.Add(scheduled);

    return scheduled;
}
|
|
|
|
/// <summary>
/// Waits until every task in <paramref name="tasks"/> has finished, including tasks that are
/// added to the bag while the wait is in progress.
/// </summary>
/// <param name="tasks">Bag of tasks; it may grow while this method runs.</param>
/// <returns>A task that completes once the bag stopped growing and all of its tasks have finished.</returns>
/// <remarks>
/// A faulted or cancelled task does not cut the wait short. The failure is surfaced only after all
/// tasks have settled, so no task is still running when the caller continues.
/// </remarks>
private static async Task WaitForAllTasksAsync(ConcurrentBag<Task> tasks)
{
    while (true)
    {
        var snapshot = tasks.ToArray();
        if (snapshot.Length == 0)
        {
            return;
        }

        try
        {
            await Task.WhenAll(snapshot).ConfigureAwait(false);
        }
        catch
        {
            // Deliberately deferred: tasks added after the snapshot must be awaited too before the
            // failure is reported. The exception is rethrown below from the final snapshot.
        }

        // The bag only grows, so an unchanged count means the snapshot covered every task.
        if (tasks.Count == snapshot.Length)
        {
            // All tasks are complete; awaiting the aggregate again returns immediately and rethrows
            // the failure (if any) exactly as a direct await would.
            await Task.WhenAll(snapshot).ConfigureAwait(false);
            return;
        }
    }
}
|
|
|
|
/// <summary>
/// Creates a progress reporter that forwards each reported value to <paramref name="callback"/>.
/// </summary>
/// <param name="callback">Delegate invoked for every reported value.</param>
/// <returns>A new <c>InlineProgress</c> wrapping <paramref name="callback"/>.</returns>
private static IProgress<long> CreateInlineProgress(Action<long> callback)
{
    return new InlineProgress(callback);
}
|
|
|
|
/// <summary>
/// <see cref="IProgress{T}"/> implementation that calls its callback directly inside
/// <see cref="Report"/>, on the thread that reports the value.
/// </summary>
private sealed class InlineProgress : IProgress<long>
{
    private readonly Action<long> _callback;

    /// <summary>Creates the reporter.</summary>
    /// <param name="callback">Delegate invoked for every reported value.</param>
    /// <exception cref="ArgumentNullException"><paramref name="callback"/> is null.</exception>
    public InlineProgress(Action<long> callback)
    {
        if (callback is null)
        {
            throw new ArgumentNullException(nameof(callback));
        }

        _callback = callback;
    }

    /// <summary>Passes <paramref name="value"/> to the callback.</summary>
    public void Report(long value)
    {
        _callback(value);
    }
}
|
|
}
|