976 lines
39 KiB
C#
976 lines
39 KiB
C#
using K4os.Compression.LZ4.Legacy;
|
|
using LightlessSync.API.Data;
|
|
using LightlessSync.API.Dto.Files;
|
|
using LightlessSync.API.Routes;
|
|
using LightlessSync.FileCache;
|
|
using LightlessSync.PlayerData.Handlers;
|
|
using LightlessSync.Services.Mediator;
|
|
using LightlessSync.Services.TextureCompression;
|
|
using LightlessSync.WebAPI.Files.Models;
|
|
using Microsoft.Extensions.Logging;
|
|
using System.Collections.Concurrent;
|
|
using System.Net;
|
|
using System.Net.Http.Json;
|
|
using LightlessSync.LightlessConfiguration;
|
|
using LightlessSync.Services.PairProcessing;
|
|
|
|
namespace LightlessSync.WebAPI.Files;
|
|
|
|
public partial class FileDownloadManager : DisposableMediatorSubscriberBase
|
|
{
|
|
// Progress trackers for the running download. DownloadFilesInternal keys them by the
// direct download URL (direct transfers) or by "host:port" (batched transfers).
// NOTE(review): this is a plain Dictionary that is read and cleared from
// Parallel.ForEachAsync bodies — confirm concurrent access is acceptable.
private readonly Dictionary<string, FileDownloadStatus> _downloadStatus;
|
|
// Writes decompressed file contents to their final cache location.
private readonly FileCompactor _fileCompactor;
|
|
// Resolves cache file paths and creates/removes cache entries for downloaded files.
private readonly FileCacheManager _fileDbManager;
|
|
// Sends the HTTP requests, hands out download slots and reports the per-slot bandwidth limit.
private readonly FileTransferOrchestrator _orchestrator;
|
|
// Its snapshot's Waiting count decides whether a stalled read is aborted or kept waiting.
private readonly PairProcessingLimiter _pairProcessingLimiter;
|
|
// Source of the EnableDirectDownloads setting.
private readonly LightlessConfigService _configService;
|
|
// Receives a downscale request for every persisted file unless downscaling is skipped.
private readonly TextureDownscaleService _textureDownscaleService;
|
|
// Determines the texture map kind that is handed to the downscale service.
private readonly TextureMetadataHelper _textureMetadataHelper;
|
|
// Streams currently being read; used as a set (the byte value is always 0) so a changed
// download limit can be applied to transfers that are already running.
private readonly ConcurrentDictionary<ThrottledStream, byte> _activeDownloadStreams;
|
|
// How long a single read may go without completing before the stall check runs.
private static readonly TimeSpan DownloadStallTimeout = TimeSpan.FromSeconds(30);
|
|
// Set once three consecutive direct downloads have failed; cleared again when the
// config option is observed switching back on.
private volatile bool _disableDirectDownloads;
|
|
// Number of consecutive unexpected direct download failures; updated through Interlocked
// from the parallel download bodies.
private int _consecutiveDirectDownloadFailures;
|
|
// Value of EnableDirectDownloads seen on the previous download run, used to detect toggles.
private bool _lastConfigDirectDownloadsState;
|
|
|
|
/// <summary>
/// Stores the injected collaborators, creates the status and active-stream collections and
/// subscribes to <see cref="DownloadLimitChangedMessage"/> so that a changed limit is pushed
/// to every stream that is currently downloading.
/// </summary>
public FileDownloadManager(
    ILogger<FileDownloadManager> logger,
    LightlessMediator mediator,
    FileTransferOrchestrator orchestrator,
    FileCacheManager fileCacheManager,
    FileCompactor fileCompactor,
    PairProcessingLimiter pairProcessingLimiter,
    LightlessConfigService configService,
    TextureDownscaleService textureDownscaleService,
    TextureMetadataHelper textureMetadataHelper)
    : base(logger, mediator)
{
    // Collections owned by this instance.
    _downloadStatus = new Dictionary<string, FileDownloadStatus>(StringComparer.Ordinal);
    _activeDownloadStreams = new ConcurrentDictionary<ThrottledStream, byte>();

    // Injected collaborators.
    _orchestrator = orchestrator;
    _fileDbManager = fileCacheManager;
    _fileCompactor = fileCompactor;
    _pairProcessingLimiter = pairProcessingLimiter;
    _configService = configService;
    _textureDownscaleService = textureDownscaleService;
    _textureMetadataHelper = textureMetadataHelper;

    // Remember the current setting so a later toggle can be detected.
    _lastConfigDirectDownloadsState = _configService.Current.EnableDirectDownloads;

    Mediator.Subscribe<DownloadLimitChangedMessage>(this, message =>
    {
        // Nothing is downloading, so there is nothing to re-throttle.
        if (!_activeDownloadStreams.IsEmpty)
        {
            var perSlotLimit = _orchestrator.DownloadLimitPerSlot();
            Logger.LogTrace("Setting new Download Speed Limit to {newLimit}", perSlotLimit);

            foreach (var activeStream in _activeDownloadStreams.Keys)
            {
                activeStream.BandwidthLimit = perSlotLimit;
            }
        }
    });
}
|
|
|
|
/// <summary>Transfers of the current download; replaced by <c>InitiateDownloadList</c> and emptied by <c>ClearDownload</c>.</summary>
public List<DownloadFileTransfer> CurrentDownloads { get; private set; } = new();

/// <summary>The orchestrator's list of forbidden transfers.</summary>
public List<FileTransfer> ForbiddenTransfers
{
    get { return _orchestrator.ForbiddenTransfers; }
}

/// <summary>Owner token passed to the last <c>InitiateDownloadList</c> call; null once the download is cleared.</summary>
public Guid? CurrentOwnerToken { get; private set; }

/// <summary>True while <see cref="CurrentDownloads"/> contains at least one transfer.</summary>
public bool IsDownloading
{
    get { return CurrentDownloads.Count > 0; }
}
|
|
|
|
/// <summary>
/// Direct downloads are used only when the config option is enabled and they have not been
/// switched off for this session.
/// </summary>
private bool ShouldUseDirectDownloads()
{
    if (!_configService.Current.EnableDirectDownloads)
    {
        return false;
    }

    return !_disableDirectDownloads;
}
|
|
|
|
/// <summary>
/// XORs every byte of <paramref name="buffer"/> with 42, in place. Applying it a second time
/// restores the original bytes.
/// </summary>
/// <param name="buffer">Bytes to transform; may be empty.</param>
public static void MungeBuffer(Span<byte> buffer)
{
    foreach (ref byte current in buffer)
    {
        current ^= 42;
    }
}
|
|
|
|
/// <summary>
/// Resets the download state: drops the owner token, all status trackers and the list of
/// current transfers.
/// </summary>
public void ClearDownload()
{
    CurrentOwnerToken = null;
    _downloadStatus.Clear();
    CurrentDownloads.Clear();
}
|
|
|
|
/// <summary>
/// Downloads the transfers in <see cref="CurrentDownloads"/>. File scanning is halted for the
/// duration of the download and resumed afterwards.
/// </summary>
/// <param name="gameObject">Object the download belongs to; when not null a <c>DownloadFinishedMessage</c> is published for it.</param>
/// <param name="fileReplacementDto">Replacement data used to resolve hashes to game paths.</param>
/// <param name="ct">Cancels the download.</param>
/// <param name="skipDownscale">When true, persisted files are not handed to the texture downscale service.</param>
/// <remarks>
/// This method never throws: cancellation and failures are logged, the download state is
/// cleared and the method returns normally.
/// </remarks>
public async Task DownloadFiles(GameObjectHandler? gameObject, List<FileReplacementData> fileReplacementDto, CancellationToken ct, bool skipDownscale = false)
{
    Mediator.Publish(new HaltScanMessage(nameof(DownloadFiles)));
    try
    {
        await DownloadFilesInternal(gameObject, fileReplacementDto, ct, skipDownscale).ConfigureAwait(false);
    }
    catch (OperationCanceledException)
    {
        // Cancellation is an expected outcome; keep it out of the warning log.
        Logger.LogDebug("Download cancelled for {id}", gameObject?.Name ?? "Unknown");
        ClearDownload();
    }
    catch (Exception ex)
    {
        // Still best-effort (callers are not expected to handle failures), but no longer silent.
        Logger.LogWarning(ex, "Download failed for {id}", gameObject?.Name ?? "Unknown");
        ClearDownload();
    }
    finally
    {
        if (gameObject is not null)
        {
            Mediator.Publish(new DownloadFinishedMessage(gameObject));
        }

        Mediator.Publish(new ResumeScanMessage(nameof(DownloadFiles)));
    }
}
|
|
|
|
/// <summary>
/// Clears the download state and disposes every stream that is still registered as active
/// before delegating to the base implementation.
/// </summary>
protected override void Dispose(bool disposing)
{
    ClearDownload();

    // Work on a copy so entries can be removed while iterating.
    var remainingStreams = _activeDownloadStreams.Keys.ToArray();
    foreach (var activeStream in remainingStreams)
    {
        try
        {
            activeStream.Dispose();
        }
        catch
        {
            // Best effort: a stream that fails to dispose must not stop the teardown.
        }
        finally
        {
            _activeDownloadStreams.TryRemove(activeStream, out _);
        }
    }

    base.Dispose(disposing);
}
|
|
|
|
/// <summary>
/// Converts the result of <c>Stream.ReadByte</c> into a de-munged byte (XOR with 42).
/// </summary>
/// <param name="byteOrEof">A byte value, or -1 to signal the end of the stream.</param>
/// <exception cref="EndOfStreamException">When <paramref name="byteOrEof"/> is -1.</exception>
private static byte MungeByte(int byteOrEof)
    => byteOrEof == -1
        ? throw new EndOfStreamException()
        : (byte)(byteOrEof ^ 42);
|
|
|
|
/// <summary>
/// Reads one munged entry header of the form <c>#hash:length#</c> from the block file and
/// leaves the stream positioned on the first byte after the closing <c>#</c>.
/// </summary>
/// <param name="fileBlockStream">Block file stream positioned at the start of a header.</param>
/// <returns>The hash text and the payload length parsed from the header.</returns>
/// <exception cref="InvalidDataException">The first character is not <c>#</c>.</exception>
/// <exception cref="EndOfStreamException">The stream ends before the header is complete.</exception>
/// <exception cref="FormatException">The length part is not a valid integer.</exception>
private static (string fileHash, long fileLengthBytes) ReadBlockFileHeader(FileStream fileBlockStream)
{
    // MungeByte throws EndOfStreamException on -1, so no separate EOF check is needed here.
    if ((char)MungeByte(fileBlockStream.ReadByte()) != '#')
    {
        throw new InvalidDataException("Data is invalid, first char is not #");
    }

    var hashText = new System.Text.StringBuilder();
    var lengthText = new System.Text.StringBuilder();
    var target = hashText;

    while (true)
    {
        var current = (char)MungeByte(fileBlockStream.ReadByte());
        if (current == '#')
        {
            break;
        }

        if (current == ':')
        {
            // Everything after the separator belongs to the length.
            target = lengthText;
            continue;
        }

        target.Append(current);
    }

    // The header is machine-written, so parse it independently of the user's culture.
    var length = long.Parse(
        lengthText.ToString(),
        System.Globalization.NumberStyles.Integer,
        System.Globalization.CultureInfo.InvariantCulture);

    return (hashText.ToString(), length);
}
|
|
|
|
/// <summary>
/// Waits until the server reports the queued request as ready, marks the group's tracker as
/// downloading and streams the block file to <paramref name="tempPath"/>, de-munging it on the fly.
/// </summary>
/// <param name="downloadGroup">Key of the status tracker for this transfer.</param>
/// <param name="requestId">Request id returned by the enqueue call.</param>
/// <param name="fileTransfer">Transfers covered by the request; the first entry supplies the server URI.</param>
/// <param name="tempPath">Destination of the downloaded block file.</param>
/// <param name="progress">Receives the number of bytes of each chunk written.</param>
/// <param name="ct">Cancels waiting and downloading.</param>
private async Task DownloadAndMungeFileHttpClient(string downloadGroup, Guid requestId, List<DownloadFileTransfer> fileTransfer, string tempPath, IProgress<long> progress, CancellationToken ct)
{
    var requestedHashes = string.Join(", ", fileTransfer.Select(transfer => transfer.Hash));
    Logger.LogDebug("GUID {requestId} on server {uri} for files {files}", requestId, fileTransfer[0].DownloadUri, requestedHashes);

    await WaitForDownloadReady(fileTransfer, requestId, ct).ConfigureAwait(false);

    if (!_downloadStatus.TryGetValue(downloadGroup, out var tracker))
    {
        Logger.LogWarning("Download status missing for {group} when starting download", downloadGroup);
    }
    else
    {
        tracker.DownloadStatus = DownloadStatus.Downloading;
    }

    await DownloadFileThrottled(
        LightlessFiles.CacheGetFullPath(fileTransfer[0].DownloadUri, requestId),
        tempPath,
        progress,
        MungeBuffer,
        ct,
        withToken: true).ConfigureAwait(false);
}
|
|
|
|
// Invoked by DownloadFileThrottled on every chunk it has read, before the chunk is written
// to disk, so the bytes can be transformed in place (MungeBuffer is passed for block files).
private delegate void DownloadDataCallback(Span<byte> data);
|
|
|
|
/// <summary>
/// Requests <paramref name="requestUrl"/> (retrying timeouts up to three times) and streams
/// the response body through a bandwidth-limited stream into <paramref name="destinationFilename"/>.
/// </summary>
/// <param name="requestUrl">URL to download.</param>
/// <param name="destinationFilename">File that is created/overwritten with the downloaded bytes.</param>
/// <param name="progress">Receives the size of every chunk after it has been written.</param>
/// <param name="callback">Optional in-place transformation applied to each chunk before it is written.</param>
/// <param name="ct">Cancels the request and the transfer.</param>
/// <param name="withToken">Forwarded to the orchestrator's request call.</param>
/// <exception cref="InvalidDataException">The server answered 404 or 401.</exception>
/// <exception cref="TimeoutException">
/// A read produced no data for <see cref="DownloadStallTimeout"/> while other pairs were waiting to be processed.
/// </exception>
/// <exception cref="OperationCanceledException">The transfer was cancelled.</exception>
/// <remarks>
/// On cancellation and on a stall timeout the partially written file is left on disk; for any
/// other failure it is deleted before the exception is rethrown.
/// </remarks>
private async Task DownloadFileThrottled(Uri requestUrl, string destinationFilename, IProgress<long> progress, DownloadDataCallback? callback, CancellationToken ct, bool withToken)
{
    const int maxRetries = 3;
    int retryCount = 0;
    TimeSpan retryDelay = TimeSpan.FromSeconds(2);
    HttpResponseMessage? response = null;

    // Phase 1: obtain a successful response, retrying timeouts and connection-level failures.
    while (true)
    {
        try
        {
            Logger.LogDebug("Attempt {attempt} - Downloading {requestUrl}", retryCount + 1, requestUrl);
            response = await _orchestrator.SendRequestAsync(HttpMethod.Get, requestUrl, ct, HttpCompletionOption.ResponseHeadersRead, withToken).ConfigureAwait(false);
            response.EnsureSuccessStatusCode();
            break;
        }
        catch (HttpRequestException ex) when (ex.InnerException is TimeoutException || ex.StatusCode == null)
        {
            response?.Dispose();
            retryCount++;

            Logger.LogWarning(ex, "Timeout during download of {requestUrl}. Attempt {attempt} of {maxRetries}", requestUrl, retryCount, maxRetries);

            if (retryCount >= maxRetries || ct.IsCancellationRequested)
            {
                Logger.LogError("Max retries reached or cancelled. Failing download for {requestUrl}", requestUrl);
                throw;
            }

            await Task.Delay(retryDelay, ct).ConfigureAwait(false);
        }
        catch (TaskCanceledException ex) when (!ct.IsCancellationRequested)
        {
            // Cancelled without the caller asking for it: treated like a timeout and retried.
            response?.Dispose();
            retryCount++;

            Logger.LogWarning(ex, "Cancellation/timeout during download of {requestUrl}. Attempt {attempt} of {maxRetries}", requestUrl, retryCount, maxRetries);

            if (retryCount >= maxRetries)
            {
                Logger.LogError("Max retries reached for {requestUrl} after TaskCanceledException", requestUrl);
                throw;
            }

            await Task.Delay(retryDelay, ct).ConfigureAwait(false);
        }
        catch (OperationCanceledException) when (ct.IsCancellationRequested)
        {
            response?.Dispose();
            throw;
        }
        catch (HttpRequestException ex)
        {
            response?.Dispose();
            Logger.LogWarning(ex, "Error during download of {requestUrl}, HttpStatusCode: {code}", requestUrl, ex.StatusCode);

            if (ex.StatusCode is HttpStatusCode.NotFound or HttpStatusCode.Unauthorized)
            {
                throw new InvalidDataException($"Http error {ex.StatusCode} (cancelled: {ct.IsCancellationRequested}): {requestUrl}", ex);
            }

            throw;
        }
    }

    ThrottledStream? stream = null;
    FileStream? fileStream = null;

    // Phase 2: copy the body to disk chunk by chunk.
    try
    {
        fileStream = File.Create(destinationFilename);
        await using (fileStream.ConfigureAwait(false))
        {
            // 64 KiB chunks for bodies over 1 MiB, 8 KiB otherwise (also when the length is unknown).
            var bufferSize = response!.Content.Headers.ContentLength > 1024 * 1024 ? 65536 : 8192;
            var buffer = new byte[bufferSize];

            var limit = _orchestrator.DownloadLimitPerSlot();
            Logger.LogTrace("Starting Download with a speed limit of {limit} to {destination}", limit, destinationFilename);

            stream = new(await response.Content.ReadAsStreamAsync(ct).ConfigureAwait(false), limit);
            // Registered so a DownloadLimitChangedMessage can adjust the limit mid-transfer.
            _activeDownloadStreams.TryAdd(stream, 0);

            while (true)
            {
                ct.ThrowIfCancellationRequested();
                int bytesRead;
                try
                {
                    using var readCancellation = CancellationTokenSource.CreateLinkedTokenSource(ct);
                    var readTask = stream.ReadAsync(buffer.AsMemory(0, buffer.Length), readCancellation.Token).AsTask();
                    while (!readTask.IsCompleted)
                    {
                        // The stall timer is cancellable so that it does not stay alive for the
                        // full timeout after every chunk that arrives promptly.
                        using var stallTimerCancellation = new CancellationTokenSource();
                        var stallTimer = Task.Delay(DownloadStallTimeout, stallTimerCancellation.Token);

                        var completedTask = await Task.WhenAny(readTask, stallTimer).ConfigureAwait(false);
                        if (completedTask == readTask)
                        {
                            stallTimerCancellation.Cancel();
                            break;
                        }

                        ct.ThrowIfCancellationRequested();

                        // Only give up on a stalled read when other pairs are waiting to be processed.
                        var snapshot = _pairProcessingLimiter.GetSnapshot();
                        if (snapshot.Waiting > 0)
                        {
                            readCancellation.Cancel();
                            try
                            {
                                await readTask.ConfigureAwait(false);
                            }
                            catch (OperationCanceledException)
                            {
                                // expected when cancelling the read due to timeout
                            }
                            catch (Exception ex)
                            {
                                Logger.LogDebug(ex, "Error finishing read task after stall detection for {requestUrl}", requestUrl);
                            }

                            throw new TimeoutException($"No data received for {DownloadStallTimeout.TotalSeconds} seconds while downloading {requestUrl} (waiting: {snapshot.Waiting})");
                        }

                        Logger.LogTrace("Download stalled for {requestUrl} but no queued pairs, continuing to wait", requestUrl);
                    }

                    bytesRead = await readTask.ConfigureAwait(false);
                }
                catch (OperationCanceledException ex)
                {
                    Logger.LogWarning(ex, "Request got cancelled : {url}", requestUrl);
                    throw;
                }

                if (bytesRead == 0)
                {
                    // End of the response body.
                    break;
                }

                callback?.Invoke(buffer.AsSpan(0, bytesRead));

                await fileStream.WriteAsync(buffer.AsMemory(0, bytesRead), ct).ConfigureAwait(false);

                progress.Report(bytesRead);
            }

            Logger.LogDebug("{requestUrl} downloaded to {destination}", requestUrl, destinationFilename);
        }
    }
    catch (TimeoutException ex)
    {
        Logger.LogWarning(ex, "Detected stalled download for {requestUrl}, aborting transfer", requestUrl);
        throw;
    }
    catch (OperationCanceledException)
    {
        // The partial file is intentionally kept; callers decide what to do with it.
        throw;
    }
    catch (Exception)
    {
        try
        {
            fileStream?.Close();

            if (!string.IsNullOrEmpty(destinationFilename) && File.Exists(destinationFilename))
            {
                File.Delete(destinationFilename);
            }
        }
        catch
        {
            // ignore cleanup errors
        }

        throw;
    }
    finally
    {
        if (stream != null)
        {
            _activeDownloadStreams.TryRemove(stream, out _);
            await stream.DisposeAsync().ConfigureAwait(false);
        }

        response?.Dispose();
    }
}
|
|
|
|
/// <summary>
/// Extracts every entry of a downloaded block file: reads the header and the munged,
/// LZ4-wrapped payload, writes the decompressed file into the cache and persists it.
/// </summary>
/// <param name="downloadStatusKey">Key of the tracker that is switched to the decompressing state.</param>
/// <param name="blockFilePath">Path of the block file to read.</param>
/// <param name="fileReplacement">Replacement data used to resolve each hash to a game path and extension.</param>
/// <param name="downloadLabel">Label used as prefix in log messages.</param>
/// <param name="skipDownscale">Forwarded to <c>PersistFileToStorage</c>.</param>
/// <remarks>
/// Failures are logged and never thrown. A failure while processing one entry does not stop
/// the extraction of the following entries.
/// </remarks>
private async Task DecompressBlockFileAsync(string downloadStatusKey, string blockFilePath, List<FileReplacementData> fileReplacement, string downloadLabel, bool skipDownscale)
{
    if (_downloadStatus.TryGetValue(downloadStatusKey, out var status))
    {
        status.TransferredFiles = 1;
        status.DownloadStatus = DownloadStatus.Decompressing;
    }

    FileStream? fileBlockStream = null;
    try
    {
        fileBlockStream = File.OpenRead(blockFilePath);
        while (fileBlockStream.Position < fileBlockStream.Length)
        {
            (string fileHash, long fileLengthBytes) = ReadBlockFileHeader(fileBlockStream);

            try
            {
                // Consume the payload before anything else can fail, so that the stream is
                // always positioned on the next header even when this entry cannot be used.
                byte[] compressedFileContent = new byte[fileLengthBytes];
                // ReadExactlyAsync loops until the buffer is full and throws
                // EndOfStreamException when the file ends early.
                await fileBlockStream.ReadExactlyAsync(compressedFileContent, CancellationToken.None).ConfigureAwait(false);

                var replacement = fileReplacement.First(f => string.Equals(f.Hash, fileHash, StringComparison.OrdinalIgnoreCase));
                var gamePath = replacement.GamePaths[0];
                var fileExtension = gamePath.Split(".")[^1];
                var filePath = _fileDbManager.GetCacheFilePath(fileHash, fileExtension);
                Logger.LogDebug("{dlName}: Decompressing {file}:{le} => {dest}", downloadLabel, fileHash, fileLengthBytes, filePath);

                MungeBuffer(compressedFileContent);

                var decompressedFile = LZ4Wrapper.Unwrap(compressedFileContent);
                await _fileCompactor.WriteAllBytesAsync(filePath, decompressedFile, CancellationToken.None).ConfigureAwait(false);

                PersistFileToStorage(fileHash, filePath, gamePath, skipDownscale);
            }
            catch (EndOfStreamException)
            {
                Logger.LogWarning("{dlName}: Failure to extract file {fileHash}, stream ended prematurely", downloadLabel, fileHash);
            }
            catch (Exception e)
            {
                Logger.LogWarning(e, "{dlName}: Error during decompression", downloadLabel);
            }
        }
    }
    catch (EndOfStreamException)
    {
        Logger.LogDebug("{dlName}: Failure to extract file header data, stream ended", downloadLabel);
    }
    catch (Exception ex)
    {
        Logger.LogError(ex, "{dlName}: Error during block file read", downloadLabel);
    }
    finally
    {
        if (fileBlockStream != null)
        {
            await fileBlockStream.DisposeAsync().ConfigureAwait(false);
        }
    }
}
|
|
|
|
/// <summary>
/// Downloads a single file through the queued (block file) route instead of its direct URL:
/// enqueues the hash, downloads the block file, extracts it and deletes it again.
/// </summary>
/// <param name="directDownload">Transfer to download; must carry a direct download URL, which is used as the status key.</param>
/// <param name="fileReplacement">Replacement data used during extraction.</param>
/// <param name="progress">Receives download progress.</param>
/// <param name="token">Cancels the operation.</param>
/// <param name="skipDownscale">Forwarded to the extraction step.</param>
/// <param name="slotAlreadyAcquired">
/// True when the caller already holds a download slot; otherwise a slot is acquired here and
/// released again before returning.
/// </param>
/// <exception cref="InvalidOperationException">The transfer has no direct download URL.</exception>
/// <exception cref="FileNotFoundException">The block file does not exist after the download.</exception>
private async Task PerformDirectDownloadFallbackAsync(DownloadFileTransfer directDownload, List<FileReplacementData> fileReplacement,
    IProgress<long> progress, CancellationToken token, bool skipDownscale, bool slotAlreadyAcquired)
{
    if (string.IsNullOrEmpty(directDownload.DirectDownloadUrl))
    {
        throw new InvalidOperationException("Direct download fallback requested without a direct download URL.");
    }

    var downloadKey = directDownload.DirectDownloadUrl!;
    bool slotAcquiredHere = false;
    string? blockFile = null;

    try
    {
        if (!slotAlreadyAcquired)
        {
            if (_downloadStatus.TryGetValue(downloadKey, out var tracker))
            {
                tracker.DownloadStatus = DownloadStatus.WaitingForSlot;
            }

            await _orchestrator.WaitForDownloadSlotAsync(token).ConfigureAwait(false);
            slotAcquiredHere = true;
        }

        if (_downloadStatus.TryGetValue(downloadKey, out var queueTracker))
        {
            queueTracker.DownloadStatus = DownloadStatus.WaitingForQueue;
        }

        Guid requestId;
        // The response is only needed for the request id, so release it right away.
        using (var requestIdResponse = await _orchestrator.SendRequestAsync(HttpMethod.Post, LightlessFiles.RequestEnqueueFullPath(directDownload.DownloadUri),
            new[] { directDownload.Hash }, token).ConfigureAwait(false))
        {
            // The body is a JSON string, hence the surrounding quotes.
            requestId = Guid.Parse((await requestIdResponse.Content.ReadAsStringAsync(token).ConfigureAwait(false)).Trim('"'));
        }

        blockFile = _fileDbManager.GetCacheFilePath(requestId.ToString("N"), "blk");

        await DownloadAndMungeFileHttpClient(downloadKey, requestId, [directDownload], blockFile, progress, token).ConfigureAwait(false);

        if (!File.Exists(blockFile))
        {
            throw new FileNotFoundException("Block file missing after direct download fallback.", blockFile);
        }

        await DecompressBlockFileAsync(downloadKey, blockFile, fileReplacement, $"fallback-{directDownload.Hash}", skipDownscale).ConfigureAwait(false);
    }
    finally
    {
        // Only release a slot this method acquired itself; the caller releases its own.
        if (slotAcquiredHere)
        {
            _orchestrator.ReleaseDownloadSlot();
        }

        if (!string.IsNullOrEmpty(blockFile))
        {
            try
            {
                File.Delete(blockFile);
            }
            catch
            {
                // ignore cleanup errors
            }
        }
    }
}
|
|
|
|
/// <summary>
/// Asks the server for size and availability of the given replacements and stores the
/// transferable ones in <see cref="CurrentDownloads"/>. Files reported as forbidden are added
/// to the orchestrator's forbidden list (once per hash).
/// </summary>
/// <param name="gameObjectHandler">Only used for its name in log messages; may be null.</param>
/// <param name="fileReplacement">Replacements whose hashes should be downloaded.</param>
/// <param name="ct">Cancels the size request.</param>
/// <param name="ownerToken">Stored in <see cref="CurrentOwnerToken"/>.</param>
/// <returns>The new content of <see cref="CurrentDownloads"/>; empty when there is nothing to download.</returns>
public async Task<List<DownloadFileTransfer>> InitiateDownloadList(GameObjectHandler? gameObjectHandler, List<FileReplacementData> fileReplacement, CancellationToken ct, Guid? ownerToken = null)
{
    CurrentOwnerToken = ownerToken;
    var objectName = gameObjectHandler?.Name ?? "Unknown";
    Logger.LogDebug("Download start: {id}", objectName);

    if (fileReplacement == null || fileReplacement.Count == 0)
    {
        Logger.LogDebug("{dlName}: No file replacements provided", objectName);
        CurrentDownloads = [];
        return CurrentDownloads;
    }

    // Usable hashes, de-duplicated while keeping the order of first appearance.
    var hashes = new List<string>();
    var seenHashes = new HashSet<string>(StringComparer.Ordinal);
    foreach (var replacement in fileReplacement)
    {
        if (replacement == null || string.IsNullOrWhiteSpace(replacement.Hash))
        {
            continue;
        }

        if (seenHashes.Add(replacement.Hash))
        {
            hashes.Add(replacement.Hash);
        }
    }

    if (hashes.Count == 0)
    {
        Logger.LogDebug("{dlName}: No valid hashes to download", objectName);
        CurrentDownloads = [];
        return CurrentDownloads;
    }

    var serverFileInfo = new List<DownloadFileDto>(await FilesGetSizes(hashes, ct).ConfigureAwait(false));

    var emptyFileHashes = serverFileInfo.Where(info => info.Size <= 0).Select(info => info.Hash);
    Logger.LogDebug("Files with size 0 or less: {files}", string.Join(", ", emptyFileHashes));

    foreach (var forbidden in serverFileInfo.Where(info => info.IsForbidden))
    {
        var alreadyListed = _orchestrator.ForbiddenTransfers.Exists(known => string.Equals(known.Hash, forbidden.Hash, StringComparison.Ordinal));
        if (alreadyListed)
        {
            continue;
        }

        _orchestrator.ForbiddenTransfers.Add(new DownloadFileTransfer(forbidden));
    }

    CurrentDownloads = serverFileInfo
        .Distinct()
        .Select(info => new DownloadFileTransfer(info))
        .Where(transfer => transfer.CanBeTransferred)
        .ToList();

    return CurrentDownloads;
}
|
|
|
|
/// <summary>
/// Downloads everything in <see cref="CurrentDownloads"/>. Transfers with a direct download
/// URL are fetched individually (when direct downloads are allowed); all others are grouped
/// per server ("host:port") and fetched as one block file per group. Both sets run in parallel.
/// </summary>
/// <param name="gameObjectHandler">Used for log output and the <c>DownloadStartedMessage</c>; may be null.</param>
/// <param name="fileReplacement">Replacement data used to resolve hashes to game paths.</param>
/// <param name="ct">Cancels all running transfers.</param>
/// <param name="skipDownscale">When true, persisted files are not handed to the texture downscale service.</param>
/// <remarks>
/// A failed direct download falls back to the queued route. After three consecutive
/// unexpected direct failures, direct downloads are disabled for the session.
/// </remarks>
private async Task DownloadFilesInternal(GameObjectHandler? gameObjectHandler, List<FileReplacementData> fileReplacement, CancellationToken ct, bool skipDownscale)
{
    var objectName = gameObjectHandler?.Name ?? "Unknown";

    // Switching the config option back on lifts a session-wide disable and resets the failure count.
    var configAllowsDirect = _configService.Current.EnableDirectDownloads;
    if (configAllowsDirect != _lastConfigDirectDownloadsState)
    {
        _lastConfigDirectDownloadsState = configAllowsDirect;
        if (configAllowsDirect)
        {
            _disableDirectDownloads = false;
            _consecutiveDirectDownloadFailures = 0;
        }
    }

    var allowDirectDownloads = ShouldUseDirectDownloads();

    var directDownloads = new List<DownloadFileTransfer>();
    var batchDownloads = new List<DownloadFileTransfer>();

    foreach (var download in CurrentDownloads)
    {
        if (!string.IsNullOrEmpty(download.DirectDownloadUrl) && allowDirectDownloads)
        {
            directDownloads.Add(download);
        }
        else
        {
            batchDownloads.Add(download);
        }
    }

    var downloadBatches = batchDownloads.GroupBy(f => f.DownloadUri.Host + ":" + f.DownloadUri.Port, StringComparer.Ordinal).ToArray();

    foreach (var directDownload in directDownloads)
    {
        _downloadStatus[directDownload.DirectDownloadUrl!] = new FileDownloadStatus()
        {
            DownloadStatus = DownloadStatus.Initializing,
            TotalBytes = directDownload.Total,
            TotalFiles = 1,
            TransferredBytes = 0,
            TransferredFiles = 0
        };
    }

    foreach (var downloadBatch in downloadBatches)
    {
        _downloadStatus[downloadBatch.Key] = new FileDownloadStatus()
        {
            DownloadStatus = DownloadStatus.Initializing,
            TotalBytes = downloadBatch.Sum(c => c.Total),
            TotalFiles = 1,
            TransferredBytes = 0,
            TransferredFiles = 0
        };
    }

    if (directDownloads.Count > 0 || downloadBatches.Length > 0)
    {
        Logger.LogWarning("Downloading {direct} files directly, and {batchtotal} in {batches} batches.", directDownloads.Count, batchDownloads.Count, downloadBatches.Length);
    }

    if (gameObjectHandler is not null)
    {
        Mediator.Publish(new DownloadStartedMessage(gameObjectHandler, _downloadStatus));
    }

    // Progress sink that adds the reported byte count to the tracker stored under statusKey.
    IProgress<long> CreateStatusProgress(string statusKey)
    {
        return CreateInlineProgress((bytesDownloaded) =>
        {
            try
            {
                if (_downloadStatus.TryGetValue(statusKey, out FileDownloadStatus? value))
                {
                    value.TransferredBytes += bytesDownloaded;
                }
            }
            catch (Exception ex)
            {
                Logger.LogWarning(ex, "Could not set download progress");
            }
        });
    }

    Task batchDownloadsTask = downloadBatches.Length == 0 ? Task.CompletedTask : Parallel.ForEachAsync(downloadBatches, new ParallelOptions()
    {
        MaxDegreeOfParallelism = downloadBatches.Length,
        CancellationToken = ct,
    },
    async (fileGroup, token) =>
    {
        Guid requestId;
        // The response is only needed for the request id, so it is read once and released.
        using (var requestIdResponse = await _orchestrator.SendRequestAsync(HttpMethod.Post, LightlessFiles.RequestEnqueueFullPath(fileGroup.First().DownloadUri),
            fileGroup.Select(c => c.Hash), token).ConfigureAwait(false))
        {
            var requestIdText = await requestIdResponse.Content.ReadAsStringAsync(token).ConfigureAwait(false);
            Logger.LogDebug("Sent request for {n} files on server {uri} with result {result}", fileGroup.Count(), fileGroup.First().DownloadUri,
                requestIdText);

            // The body is a JSON string, hence the surrounding quotes.
            requestId = Guid.Parse(requestIdText.Trim('"'));
        }

        Logger.LogDebug("GUID {requestId} for {n} files on server {uri}", requestId, fileGroup.Count(), fileGroup.First().DownloadUri);

        var blockFile = _fileDbManager.GetCacheFilePath(requestId.ToString("N"), "blk");
        FileInfo fi = new(blockFile);

        // Tracks whether this body holds a download slot, so that a slot is only released
        // when it was really acquired (cancellation or failure can happen while waiting for it).
        var slotAcquired = false;
        try
        {
            if (!_downloadStatus.TryGetValue(fileGroup.Key, out var downloadStatus))
            {
                Logger.LogWarning("Download status missing for {group}, aborting", fileGroup.Key);
                return;
            }

            downloadStatus.DownloadStatus = DownloadStatus.WaitingForSlot;
            await _orchestrator.WaitForDownloadSlotAsync(token).ConfigureAwait(false);
            slotAcquired = true;
            downloadStatus.DownloadStatus = DownloadStatus.WaitingForQueue;
            var progress = CreateStatusProgress(fileGroup.Key);
            await DownloadAndMungeFileHttpClient(fileGroup.Key, requestId, [.. fileGroup], blockFile, progress, token).ConfigureAwait(false);
        }
        catch (OperationCanceledException)
        {
            // Falls through on purpose: whatever was downloaded so far is still extracted below.
            Logger.LogDebug("{dlName}: Detected cancellation of download, partially extracting files for {id}", fi.Name, objectName);
        }
        catch (Exception ex)
        {
            if (slotAcquired)
            {
                _orchestrator.ReleaseDownloadSlot();
            }

            File.Delete(blockFile);
            Logger.LogError(ex, "{dlName}: Error during download of {id}", fi.Name, requestId);
            ClearDownload();
            return;
        }

        try
        {
            if (!File.Exists(blockFile))
            {
                Logger.LogWarning("{dlName}: Block file missing before extraction, skipping", fi.Name);
                return;
            }

            await DecompressBlockFileAsync(fileGroup.Key, blockFile, fileReplacement, fi.Name, skipDownscale).ConfigureAwait(false);
        }
        finally
        {
            if (slotAcquired)
            {
                _orchestrator.ReleaseDownloadSlot();
            }

            File.Delete(blockFile);
        }
    });

    Task directDownloadsTask = directDownloads.Count == 0 ? Task.CompletedTask : Parallel.ForEachAsync(directDownloads, new ParallelOptions()
    {
        MaxDegreeOfParallelism = directDownloads.Count,
        CancellationToken = ct,
    },
    async (directDownload, token) =>
    {
        if (!_downloadStatus.TryGetValue(directDownload.DirectDownloadUrl!, out var downloadTracker))
        {
            Logger.LogWarning("Download status missing for direct URL {url}", directDownload.DirectDownloadUrl);
            return;
        }

        var progress = CreateStatusProgress(directDownload.DirectDownloadUrl!);

        // Direct downloads may have been disabled by a parallel transfer since partitioning.
        if (!ShouldUseDirectDownloads())
        {
            await PerformDirectDownloadFallbackAsync(directDownload, fileReplacement, progress, token, skipDownscale, slotAlreadyAcquired: false).ConfigureAwait(false);
            return;
        }

        var tempFilename = _fileDbManager.GetCacheFilePath(directDownload.Hash, "bin");
        var slotAcquired = false;

        try
        {
            downloadTracker.DownloadStatus = DownloadStatus.WaitingForSlot;
            await _orchestrator.WaitForDownloadSlotAsync(token).ConfigureAwait(false);
            slotAcquired = true;

            downloadTracker.DownloadStatus = DownloadStatus.Downloading;
            Logger.LogDebug("Beginning direct download of {hash} from {url}", directDownload.Hash, directDownload.DirectDownloadUrl);
            await DownloadFileThrottled(new Uri(directDownload.DirectDownloadUrl!), tempFilename, progress, null, token, withToken: false).ConfigureAwait(false);

            // A successful transfer ends the current failure streak.
            Interlocked.Exchange(ref _consecutiveDirectDownloadFailures, 0);

            downloadTracker.DownloadStatus = DownloadStatus.Decompressing;

            try
            {
                var replacement = fileReplacement.FirstOrDefault(f => string.Equals(f.Hash, directDownload.Hash, StringComparison.OrdinalIgnoreCase));
                if (replacement == null || replacement.GamePaths.Length == 0)
                {
                    Logger.LogWarning("{hash}: No replacement data found for direct download.", directDownload.Hash);
                    return;
                }

                var fileExtension = replacement.GamePaths[0].Split(".")[^1];
                var finalFilename = _fileDbManager.GetCacheFilePath(directDownload.Hash, fileExtension);
                Logger.LogDebug("Decompressing direct download {hash} from {compressedFile} to {finalFile}", directDownload.Hash, tempFilename, finalFilename);
                byte[] compressedBytes = await File.ReadAllBytesAsync(tempFilename).ConfigureAwait(false);
                var decompressedBytes = LZ4Wrapper.Unwrap(compressedBytes);
                await _fileCompactor.WriteAllBytesAsync(finalFilename, decompressedBytes, CancellationToken.None).ConfigureAwait(false);
                PersistFileToStorage(directDownload.Hash, finalFilename, replacement.GamePaths[0], skipDownscale);

                downloadTracker.TransferredFiles = 1;
                Logger.LogDebug("Finished direct download of {hash}.", directDownload.Hash);
            }
            catch (Exception ex)
            {
                // Extraction problems are logged only; they do not trigger the queued fallback.
                Logger.LogError(ex, "Exception downloading {hash} from {url}", directDownload.Hash, directDownload.DirectDownloadUrl);
            }
        }
        catch (OperationCanceledException ex)
        {
            if (token.IsCancellationRequested)
            {
                Logger.LogDebug("{hash}: Direct download cancelled by caller, discarding file.", directDownload.Hash);
            }
            else
            {
                Logger.LogWarning(ex, "{hash}: Direct download cancelled unexpectedly.", directDownload.Hash);
            }

            ClearDownload();
            return;
        }
        catch (Exception ex)
        {
            // InvalidDataException is what DownloadFileThrottled raises for 404/401 and is
            // not counted as a failure of the direct download mechanism itself.
            var expectedDirectDownloadFailure = ex is InvalidDataException;
            var failureCount = 0;

            if (expectedDirectDownloadFailure)
            {
                Logger.LogInformation(ex, "{hash}: Direct download unavailable, attempting queued fallback.", directDownload.Hash);
            }
            else
            {
                failureCount = Interlocked.Increment(ref _consecutiveDirectDownloadFailures);
                Logger.LogWarning(ex, "{hash}: Direct download failed, attempting queued fallback.", directDownload.Hash);
            }

            try
            {
                downloadTracker.DownloadStatus = DownloadStatus.WaitingForQueue;
                await PerformDirectDownloadFallbackAsync(directDownload, fileReplacement, progress, token, skipDownscale, slotAcquired).ConfigureAwait(false);

                if (!expectedDirectDownloadFailure && failureCount >= 3 && !_disableDirectDownloads)
                {
                    _disableDirectDownloads = true;
                    Logger.LogWarning("Disabling direct downloads for this session after {count} consecutive failures.", failureCount);
                }
            }
            catch (Exception fallbackEx)
            {
                if (slotAcquired)
                {
                    _orchestrator.ReleaseDownloadSlot();
                    // Prevents the finally block below from releasing the slot a second time.
                    slotAcquired = false;
                }

                Logger.LogError(fallbackEx, "{hash}: Error during direct download fallback.", directDownload.Hash);
                ClearDownload();
                return;
            }
        }
        finally
        {
            if (slotAcquired)
            {
                _orchestrator.ReleaseDownloadSlot();
            }

            try
            {
                File.Delete(tempFilename);
            }
            catch
            {
                // ignore
            }
        }
    });

    await Task.WhenAll(batchDownloadsTask, directDownloadsTask).ConfigureAwait(false);

    Logger.LogDebug("Download end: {id}", objectName);

    ClearDownload();
}
|
|
|
|
/// <summary>
/// Requests the download information for the given hashes from the files CDN.
/// </summary>
/// <param name="hashes">Hashes to look up.</param>
/// <param name="ct">Cancels the request and the reading of the response.</param>
/// <returns>The deserialized list, or an empty list when the response body deserializes to null.</returns>
/// <exception cref="InvalidOperationException">The orchestrator is not initialized.</exception>
private async Task<List<DownloadFileDto>> FilesGetSizes(List<string> hashes, CancellationToken ct)
{
    if (!_orchestrator.IsInitialized)
    {
        throw new InvalidOperationException("FileTransferManager is not initialized");
    }

    // Disposed once the body has been deserialized.
    using var response = await _orchestrator.SendRequestAsync(HttpMethod.Get, LightlessFiles.ServerFilesGetSizesFullPath(_orchestrator.FilesCdnUri!), hashes, ct).ConfigureAwait(false);
    return await response.Content.ReadFromJsonAsync<List<DownloadFileDto>>(cancellationToken: ct).ConfigureAwait(false) ?? [];
}
|
|
|
|
/// <summary>
/// Finalizes a freshly written cache file: randomizes its creation and last-write timestamps,
/// registers it in the file cache and, unless skipped, schedules a texture downscale. A file
/// whose cache entry hash differs from <paramref name="fileHash"/> is deleted and unregistered
/// instead.
/// </summary>
/// <param name="fileHash">Hash the file is expected to have.</param>
/// <param name="filePath">Path of the file in the cache.</param>
/// <param name="gamePath">Game path of the file, used to determine the texture map kind.</param>
/// <param name="skipDownscale">When true, no downscale is scheduled.</param>
/// <remarks>
/// Errors while registering or scheduling are logged and not thrown. Errors while setting the
/// timestamps are not caught here.
/// </remarks>
private void PersistFileToStorage(string fileHash, string filePath, string gamePath, bool skipDownscale)
{
    var fi = new FileInfo(filePath);

    // Random date between 1995-01-01 and today.
    static DateTime RandomDayInThePast()
    {
        DateTime start = new(1995, 1, 1, 1, 1, 1, DateTimeKind.Local);
        int range = (DateTime.Today - start).Days;
        return start.AddDays(Random.Shared.Next(range));
    }

    fi.CreationTime = RandomDayInThePast();
    fi.LastAccessTime = DateTime.Today;
    fi.LastWriteTime = RandomDayInThePast();

    try
    {
        var entry = _fileDbManager.CreateCacheEntry(filePath);

        // Validate first, so a file that is about to be deleted is never queued for downscaling.
        if (entry != null && !string.Equals(entry.Hash, fileHash, StringComparison.OrdinalIgnoreCase))
        {
            Logger.LogError("Hash mismatch after extracting, got {hash}, expected {expectedHash}, deleting file", entry.Hash, fileHash);
            File.Delete(filePath);
            _fileDbManager.RemoveHashedFile(entry.Hash, entry.PrefixedFilePath);
            return;
        }

        var mapKind = _textureMetadataHelper.DetermineMapKind(gamePath, filePath);
        if (!skipDownscale)
        {
            _textureDownscaleService.ScheduleDownscale(fileHash, filePath, mapKind);
        }
    }
    catch (Exception ex)
    {
        Logger.LogWarning(ex, "Error creating cache entry");
    }
}
|
|
|
|
/// <summary>
/// Polls every 250 ms until the orchestrator reports the request as ready. Every 5 seconds
/// without success the server's queue is checked for the request before polling continues.
/// </summary>
/// <param name="downloadFileTransfer">Transfers of the request; the first entry supplies the server URI.</param>
/// <param name="requestId">Id of the queued request.</param>
/// <param name="downloadCt">Cancels the wait.</param>
/// <exception cref="TaskCanceledException">The wait was cancelled through <paramref name="downloadCt"/>.</exception>
/// <exception cref="HttpRequestException">The queue check returned a non-success status.</exception>
/// <remarks>
/// When the wait is cancelled, a cancel request is sent to the server (at most once, errors
/// ignored). The request is always cleared from the orchestrator before returning.
/// </remarks>
private async Task WaitForDownloadReady(List<DownloadFileTransfer> downloadFileTransfer, Guid requestId, CancellationToken downloadCt)
{
    bool alreadyCancelled = false;
    try
    {
        while (!_orchestrator.IsDownloadReady(requestId))
        {
            // One polling window: ends after 5 seconds or when the download is cancelled.
            // Both sources are disposed when the window ends, on every path.
            using var localTimeoutCts = new CancellationTokenSource(TimeSpan.FromSeconds(5));
            using var composite = CancellationTokenSource.CreateLinkedTokenSource(downloadCt, localTimeoutCts.Token);

            try
            {
                do
                {
                    await Task.Delay(250, composite.Token).ConfigureAwait(false);
                }
                while (!_orchestrator.IsDownloadReady(requestId));
            }
            catch (TaskCanceledException)
            {
                if (downloadCt.IsCancellationRequested) throw;

                // The window timed out: check the queue, then start a new window.
                using var req = await _orchestrator.SendRequestAsync(HttpMethod.Get, LightlessFiles.RequestCheckQueueFullPath(downloadFileTransfer[0].DownloadUri, requestId),
                    downloadFileTransfer.Select(c => c.Hash).ToList(), downloadCt).ConfigureAwait(false);
                req.EnsureSuccessStatusCode();
            }
        }

        Logger.LogDebug("Download {requestId} ready", requestId);
    }
    catch (TaskCanceledException)
    {
        try
        {
            await _orchestrator.SendRequestAsync(HttpMethod.Get, LightlessFiles.RequestCancelFullPath(downloadFileTransfer[0].DownloadUri, requestId)).ConfigureAwait(false);
            alreadyCancelled = true;
        }
        catch
        {
            // ignore whatever happens here
        }

        throw;
    }
    finally
    {
        // Covers cancellation that surfaced as a different exception type or arrived late.
        if (downloadCt.IsCancellationRequested && !alreadyCancelled)
        {
            try
            {
                await _orchestrator.SendRequestAsync(HttpMethod.Get, LightlessFiles.RequestCancelFullPath(downloadFileTransfer[0].DownloadUri, requestId)).ConfigureAwait(false);
            }
            catch
            {
                // ignore whatever happens here
            }
        }

        _orchestrator.ClearDownloadRequest(requestId);
    }
}
|
|
|
|
/// <summary>
/// Wraps <paramref name="callback"/> in an <see cref="IProgress{T}"/> whose <c>Report</c>
/// invokes the callback directly on the reporting thread.
/// </summary>
private static IProgress<long> CreateInlineProgress(Action<long> callback)
    => new InlineProgress(callback);
|
|
|
|
/// <summary>
/// <see cref="IProgress{T}"/> implementation that forwards each reported value straight to a
/// delegate, synchronously on the calling thread.
/// </summary>
private sealed class InlineProgress : IProgress<long>
{
    private readonly Action<long> _callback;

    /// <exception cref="ArgumentNullException"><paramref name="callback"/> is null.</exception>
    public InlineProgress(Action<long> callback)
    {
        ArgumentNullException.ThrowIfNull(callback);
        _callback = callback;
    }

    /// <summary>Passes <paramref name="value"/> to the wrapped callback.</summary>
    public void Report(long value) => _callback(value);
}
|
|
}
|