All checks were successful
Tag and Release Lightless / tag-and-release (push) Successful in 2m9s
# Patchnotes 2.1.0 The changes in this update are more than just "patches". With a new UI, a new feature, a bunch of bug fixes and improvements, and a new member on the dev team, we thought this was more of a minor update. We would like to introduce @tsubasahane of MareCN to the team! We’re happy to work with them to bring Lightless and its features to the CN client, as well as having another talented dev bring features and ideas to us. Speaking of which: # Location Sharing (Big shout out to @tsubasahane for bringing this feature) - Are you TIRED of scrambling to find the address of the venue you're in to share with your friends? We are introducing Location Sharing! An optional feature where you can share your location with direct pairs temporarily (for 30 minutes, 1 hour, or 3 hours) or until you turn it off for them. That's up to you! [#125](<#125>) [#49](<Lightless-Sync/LightlessServer#49>) - To share your location with a pair, click the three dots beside the pair and choose a duration to share with them. [#125](<#125>) [#49](<Lightless-Sync/LightlessServer#49>) - To view the location of someone who's shared with you, simply hover over the globe icon! [#125](<#125>) [#49](<Lightless-Sync/LightlessServer#49>) [1] # Model Optimization (Mesh Decimating) - This new option can automatically “simplify” incoming character meshes to help performance by reducing triangle counts. You choose how strong the reduction is (default/recommended is 80%). [#131](<#131>) - Decimation only kicks in when a mesh is above a certain triangle threshold, and only for the item types that qualify for it and that you have selected. [#131](<#131>) - Hair meshes are always excluded, since simplifying hair meshes is very prone to breaking. - You can find everything under Settings → Performance → Model Optimization. 
[#131](<#131>) + ** IF YOU HAVE USED DECIMATION IN TESTING, PLEASE CLEAR YOUR CACHE ❗ ** [2] # Animation (PAP) Validation (Safer animations) - Lightless now checks your current animations to see if they work with your local skeleton/bone mod. If an animation matches, it’s included in what gets sent to other players. If it doesn’t, Lightless will skip it and write a warning to your log showing how many were skipped due to skeleton changes. It defaults to Unsafe (off). Turn it on if you experience crashes from other users. [#131](<#131>) - Lightless also does the same kind of check for incoming animation files, to make sure they match the body/skeleton they were sent with. [#131](<#131>) - Because these checks can sometimes be a little picky, you can adjust how strict they are in Settings -> General -> Animation & Bones to reduce false positives. [#131](<#131>) # UI Changes (Thanks to @kyuwu for UI Changes) - The top part of the main screen has gotten a makeover. You can adjust the colors of the gradient in the Color settings of Lightless. [#127](<#127>) [3] - Settings have gotten some changes as well to make this change more universal, and will use the same color settings. [#127](<#127>) - The particle effects of the gradient are toggleable in 'Settings -> UI -> Behavior' [#127](<#127>) - Instead of showing download/upload on the bottom of the Main UI, it will show VRAM usage and triangles with their optimization options next to it [#138](<#138>) # LightFinder / ShellFinder - UI changes that follow our new design and use the same color settings for the gradient top as the main screen does. 
[#127](<#127>) [4] Co-authored-by: defnotken <itsdefnotken@gmail.com> Co-authored-by: azyges <aaaaaa@aaa.aaa> Co-authored-by: cake <admin@cakeandbanana.nl> Co-authored-by: Tsubasa <tsubasa@noreply.git.lightless-sync.org> Co-authored-by: choco <choco@patat.nl> Co-authored-by: celine <aaa@aaa.aaa> Co-authored-by: celine <celine@noreply.git.lightless-sync.org> Co-authored-by: Tsubasahane <wozaiha@gmail.com> Co-authored-by: cake <cake@noreply.git.lightless-sync.org> Reviewed-on: #123
1545 lines
60 KiB
C#
1545 lines
60 KiB
C#
using K4os.Compression.LZ4;
|
|
using K4os.Compression.LZ4.Legacy;
|
|
using LightlessSync.API.Data;
|
|
using LightlessSync.API.Dto.Files;
|
|
using LightlessSync.API.Routes;
|
|
using LightlessSync.FileCache;
|
|
using LightlessSync.LightlessConfiguration;
|
|
using LightlessSync.PlayerData.Handlers;
|
|
using LightlessSync.Services.Mediator;
|
|
using LightlessSync.Services.ModelDecimation;
|
|
using LightlessSync.Services.TextureCompression;
|
|
using LightlessSync.Utils;
|
|
using LightlessSync.WebAPI.Files.Models;
|
|
using Microsoft.Extensions.Logging;
|
|
using System.Buffers;
|
|
using System.Buffers.Binary;
|
|
using System.Collections.Concurrent;
|
|
using System.IO.MemoryMappedFiles;
|
|
using System.Net;
|
|
using System.Net.Http.Json;
|
|
|
|
namespace LightlessSync.WebAPI.Files;
|
|
|
|
public partial class FileDownloadManager : DisposableMediatorSubscriberBase
|
|
{
|
|
// Collaborators assigned from the constructor arguments.
private readonly FileCompactor _fileCompactor;
private readonly FileCacheManager _fileDbManager;
private readonly FileTransferOrchestrator _orchestrator;
private readonly LightlessConfigService _configService;
private readonly TextureDownscaleService _textureDownscaleService;
private readonly ModelDecimationService _modelDecimationService;
private readonly TextureMetadataHelper _textureMetadataHelper;
private readonly FileDownloadDeduplicator _downloadDeduplicator;

// Session currently executing for a handler, and the requests still queued behind it.
private readonly ConcurrentDictionary<GameObjectHandler, DownloadSession> _activeSessions =
    new ConcurrentDictionary<GameObjectHandler, DownloadSession>();
private readonly ConcurrentDictionary<GameObjectHandler, ConcurrentQueue<DownloadRequest>> _downloadQueues =
    new ConcurrentDictionary<GameObjectHandler, ConcurrentQueue<DownloadRequest>>();
private readonly TaskRegistry<GameObjectHandler> _downloadQueueWaiters = new TaskRegistry<GameObjectHandler>();

// Used as a set of open throttled streams (only the keys are read); created in the constructor.
private readonly ConcurrentDictionary<ThrottledStream, byte> _activeDownloadStreams;

// Semaphore with one permit per two logical processors, but never fewer than one.
private readonly SemaphoreSlim _decompressGate = new SemaphoreSlim(
    Math.Max(1, Environment.ProcessorCount / 2),
    Math.Max(1, Environment.ProcessorCount / 2));

// Direct-download state; combined with the config flag in ShouldUseDirectDownloads().
private volatile bool _disableDirectDownloads;
private int _consecutiveDirectDownloadFailures;
private bool _lastConfigDirectDownloadsState;
|
|
|
|
/// <summary>
/// Stores the injected collaborators, captures the current direct-download setting and
/// subscribes to the download-limit and disconnect mediator messages.
/// </summary>
public FileDownloadManager(
    ILogger<FileDownloadManager> logger,
    LightlessMediator mediator,
    FileTransferOrchestrator orchestrator,
    FileCacheManager fileCacheManager,
    FileCompactor fileCompactor,
    LightlessConfigService configService,
    TextureDownscaleService textureDownscaleService,
    ModelDecimationService modelDecimationService,
    TextureMetadataHelper textureMetadataHelper,
    FileDownloadDeduplicator downloadDeduplicator) : base(logger, mediator)
{
    _orchestrator = orchestrator;
    _fileDbManager = fileCacheManager;
    _fileCompactor = fileCompactor;
    _configService = configService;
    _textureDownscaleService = textureDownscaleService;
    _modelDecimationService = modelDecimationService;
    _textureMetadataHelper = textureMetadataHelper;
    _downloadDeduplicator = downloadDeduplicator;
    _activeDownloadStreams = new();
    _lastConfigDirectDownloadsState = _configService.Current.EnableDirectDownloads;

    Mediator.Subscribe<DownloadLimitChangedMessage>(this, _ => OnDownloadLimitChanged());
    Mediator.Subscribe<DisconnectedMessage>(this, _ => OnDisconnected());

    // Pushes the newly configured per-slot limit onto every stream that is currently open.
    void OnDownloadLimitChanged()
    {
        if (_activeDownloadStreams.IsEmpty)
        {
            return;
        }

        var newLimit = _orchestrator.DownloadLimitPerSlot();
        Logger.LogTrace("Setting new Download Speed Limit to {newLimit}", newLimit);

        foreach (var openStream in _activeDownloadStreams.Keys)
        {
            openStream.BandwidthLimit = newLimit;
        }
    }

    // Drops all sessions and completes every deduplicated download as failed.
    void OnDisconnected()
    {
        Logger.LogDebug("Disconnected from server, clearing in-flight downloads");
        ClearDownload();
        _downloadDeduplicator.CompleteAll(false);
    }
}
|
|
|
|
/// <summary>Snapshot list of the transfers of every active session.</summary>
public List<DownloadFileTransfer> CurrentDownloads
{
    get
    {
        var transfers = _activeSessions.Values.SelectMany(active => active.Downloads);
        return transfers.ToList();
    }
}

/// <summary>The orchestrator's list of forbidden transfers (the same list instance, not a copy).</summary>
public List<FileTransfer> ForbiddenTransfers
{
    get { return _orchestrator.ForbiddenTransfers; }
}

/// <summary>True while any session is active or any per-handler queue still holds a request.</summary>
public bool IsDownloading
{
    get
    {
        if (!_activeSessions.IsEmpty)
        {
            return true;
        }

        return _downloadQueues.Any(entry => !entry.Value.IsEmpty);
    }
}
|
|
|
|
/// <summary>
/// Reports whether the given handler has an active session or a non-empty request queue.
/// </summary>
/// <param name="handler">Handler to look up; <c>null</c> always yields <c>false</c>.</param>
public bool IsDownloadingFor(GameObjectHandler? handler)
{
    if (handler is null)
    {
        return false;
    }

    if (_activeSessions.ContainsKey(handler))
    {
        return true;
    }

    return _downloadQueues.TryGetValue(handler, out var pending) && !pending.IsEmpty;
}
|
|
|
|
/// <summary>
/// Counts the transfers of the handler's active session plus those of every queued request.
/// </summary>
/// <param name="handler">Handler to look up; <c>null</c> yields 0.</param>
/// <returns>Total number of transfers, active and queued, for the handler.</returns>
public int GetPendingDownloadCount(GameObjectHandler? handler)
{
    if (handler is null)
    {
        return 0;
    }

    int total = _activeSessions.TryGetValue(handler, out var active)
        ? active.Downloads.Count
        : 0;

    if (!_downloadQueues.TryGetValue(handler, out var pending))
    {
        return total;
    }

    foreach (var queued in pending)
    {
        total += queued.Session.Downloads.Count;
    }

    return total;
}
|
|
|
|
/// <summary>
/// Direct downloads are used only when the configuration enables them and they have not
/// been disabled through <c>_disableDirectDownloads</c>.
/// </summary>
private bool ShouldUseDirectDownloads()
{
    if (!_configService.Current.EnableDirectDownloads)
    {
        return false;
    }

    return !_disableDirectDownloads;
}
|
|
|
|
/// <summary>
/// XORs every byte of <paramref name="buffer"/> with 42, in place. Applying it twice
/// restores the original content.
/// </summary>
/// <param name="buffer">Bytes to transform; an empty span is left untouched.</param>
public static void MungeBuffer(Span<byte> buffer)
{
    foreach (ref byte current in buffer)
    {
        current ^= 42;
    }
}
|
|
|
|
/// <summary>
/// Clears every active session. Works on a snapshot because clearing a session removes it
/// from <c>_activeSessions</c>.
/// </summary>
public void ClearDownload()
{
    var snapshot = _activeSessions.Values.ToList();
    for (int i = 0; i < snapshot.Count; i++)
    {
        ClearDownload(snapshot[i]);
    }
}
|
|
|
|
/// <summary>
/// Completes every download the session still owns as failed, empties the session's status,
/// owned-download and transfer collections, and unregisters the session from its handler.
/// </summary>
/// <param name="session">Session to tear down.</param>
private void ClearDownload(DownloadSession session)
{
    var ownedHashes = session.OwnedDownloads.Keys.ToList();
    for (int i = 0; i < ownedHashes.Count; i++)
    {
        CompleteOwnedDownload(session, ownedHashes[i], false);
    }

    session.Status.Clear();
    session.OwnedDownloads.Clear();
    session.Downloads.Clear();

    var handler = session.Handler;
    if (handler is null)
    {
        return;
    }

    _activeSessions.TryRemove(handler, out _);
}
|
|
|
|
/// <summary>
/// Resolves the list of transfers for the given replacements via
/// <see cref="InitiateDownloadList"/> and then downloads them.
/// </summary>
/// <param name="gameObject">Handler the download belongs to; may be <c>null</c>.</param>
/// <param name="fileReplacementDto">Replacements whose files should be fetched.</param>
/// <param name="ct">Token used to cancel the operation.</param>
/// <param name="skipDownscale">Forwarded unchanged to the download request.</param>
/// <param name="skipDecimation">Forwarded unchanged to the download request.</param>
public async Task DownloadFiles(GameObjectHandler? gameObject, List<FileReplacementData> fileReplacementDto, CancellationToken ct, bool skipDownscale = false, bool skipDecimation = false)
{
    List<DownloadFileTransfer> resolved = await InitiateDownloadList(gameObject, fileReplacementDto, ct).ConfigureAwait(false);

    await DownloadFiles(gameObject, fileReplacementDto, resolved, ct, skipDownscale, skipDecimation)
        .ConfigureAwait(false);
}
|
|
|
|
/// <summary>
/// Wraps an already resolved transfer list in a session and request, then enqueues it.
/// </summary>
/// <returns>The request's completion task, as returned by <c>EnqueueDownloadAsync</c>.</returns>
public Task DownloadFiles(GameObjectHandler? gameObject, List<FileReplacementData> fileReplacementDto, List<DownloadFileTransfer> downloads, CancellationToken ct, bool skipDownscale = false, bool skipDecimation = false)
{
    var request = new DownloadRequest(
        new DownloadSession(gameObject, downloads),
        fileReplacementDto,
        ct,
        skipDownscale,
        skipDecimation,
        new TaskCompletionSource<bool>(TaskCreationOptions.RunContinuationsAsynchronously));

    return EnqueueDownloadAsync(request);
}
|
|
|
|
/// <summary>
/// Starts a handler-less request immediately. A request with a handler is appended to that
/// handler's queue and the queue processor is obtained or started through
/// <c>_downloadQueueWaiters</c>.
/// </summary>
/// <returns>The task of the request's completion source.</returns>
private Task EnqueueDownloadAsync(DownloadRequest request)
{
    if (request.Session.Handler is { } handler)
    {
        _downloadQueues
            .GetOrAdd(handler, _ => new ConcurrentQueue<DownloadRequest>())
            .Enqueue(request);

        _downloadQueueWaiters.GetOrStart(handler, () => ProcessDownloadQueueAsync(handler));
    }
    else
    {
        // No handler to serialize on: run right away without awaiting.
        _ = ExecuteDownloadRequestAsync(request);
    }

    return request.Completion.Task;
}
|
|
|
|
/// <summary>
/// Executes the handler's queued requests one after another. After draining the queue it
/// yields once and checks again, stopping only when the queue is still empty.
/// </summary>
/// <param name="handler">Handler whose queue is processed; nothing happens if it has none.</param>
private async Task ProcessDownloadQueueAsync(GameObjectHandler handler)
{
    if (!_downloadQueues.TryGetValue(handler, out var pending))
    {
        return;
    }

    do
    {
        while (pending.TryDequeue(out var next))
        {
            await ExecuteDownloadRequestAsync(next).ConfigureAwait(false);
        }

        await Task.Yield();
    }
    while (!pending.IsEmpty);
}
|
|
|
|
/// <summary>
/// Runs one download request and reports the outcome through its completion source:
/// result on success, cancelled when the request token was cancelled, exception otherwise.
/// Scan halt/resume messages are published around the download, and the session is
/// registered for its handler while the download runs.
/// </summary>
private async Task ExecuteDownloadRequestAsync(DownloadRequest request)
{
    var token = request.CancellationToken;
    var completion = request.Completion;

    // A request cancelled before it starts is completed without publishing anything.
    if (token.IsCancellationRequested)
    {
        completion.TrySetCanceled(token);
        return;
    }

    var session = request.Session;
    var handler = session.Handler;
    if (handler is not null)
    {
        _activeSessions[handler] = session;
    }

    Mediator.Publish(new HaltScanMessage(nameof(DownloadFiles)));
    try
    {
        await DownloadFilesInternal(session, request.Replacements, token, request.SkipDownscale, request.SkipDecimation)
            .ConfigureAwait(false);
        completion.TrySetResult(true);
    }
    catch (OperationCanceledException) when (token.IsCancellationRequested)
    {
        ClearDownload(session);
        completion.TrySetCanceled(token);
    }
    catch (Exception ex)
    {
        ClearDownload(session);
        completion.TrySetException(ex);
    }
    finally
    {
        if (handler is not null)
        {
            Mediator.Publish(new DownloadFinishedMessage(handler));
            _activeSessions.TryRemove(handler, out _);
        }

        Mediator.Publish(new ResumeScanMessage(nameof(DownloadFiles)));
    }
}
|
|
|
|
/// <summary>
/// Clears all sessions, disposes every throttled stream that is still registered
/// (ignoring errors from the individual disposals) and then disposes the base class.
/// </summary>
protected override void Dispose(bool disposing)
{
    ClearDownload();

    var openStreams = _activeDownloadStreams.Keys.ToList();
    for (int i = 0; i < openStreams.Count; i++)
    {
        var current = openStreams[i];
        try
        {
            current.Dispose();
        }
        catch
        {
            // ignore
        }
        finally
        {
            _activeDownloadStreams.TryRemove(current, out _);
        }
    }

    base.Dispose(disposing);
}
|
|
|
|
/// <summary>
/// State of one download run: the handler it belongs to (if any), its transfers, a status
/// entry per key and the set of hashes this session owns.
/// </summary>
private sealed class DownloadSession
{
    /// <summary>Status entries, keyed with ordinal (case-sensitive) comparison.</summary>
    public ConcurrentDictionary<string, FileDownloadStatus> Status { get; } =
        new ConcurrentDictionary<string, FileDownloadStatus>(StringComparer.Ordinal);

    /// <summary>Hashes owned by this session, keyed case-insensitively; only the keys are used.</summary>
    public ConcurrentDictionary<string, byte> OwnedDownloads { get; } =
        new ConcurrentDictionary<string, byte>(StringComparer.OrdinalIgnoreCase);

    /// <summary>Handler the session belongs to, or <c>null</c>.</summary>
    public GameObjectHandler? Handler { get; }

    /// <summary>The handler's name, or "Unknown" when there is no handler or it has no name.</summary>
    public string ObjectName { get; }

    /// <summary>Transfers of this session (the list passed to the constructor).</summary>
    public List<DownloadFileTransfer> Downloads { get; }

    public DownloadSession(GameObjectHandler? handler, List<DownloadFileTransfer> downloads)
    {
        Downloads = downloads;
        Handler = handler;

        var name = handler?.Name;
        ObjectName = name ?? "Unknown";
    }
}
|
|
|
|
/// <summary>
/// One queued download: the session to run, the replacements to resolve, the caller's
/// cancellation token, the two skip flags and the completion source that reports the outcome.
/// </summary>
private sealed record DownloadRequest(
    DownloadSession Session, List<FileReplacementData> Replacements, CancellationToken CancellationToken,
    bool SkipDownscale, bool SkipDecimation, TaskCompletionSource<bool> Completion);
|
|
|
|
/// <summary>
/// Disposable handle for a download slot: the first disposal calls
/// <c>ReleaseDownloadSlot</c> on the orchestrator, later disposals do nothing.
/// </summary>
private sealed class DownloadSlotLease : IAsyncDisposable
{
    private readonly FileTransferOrchestrator _orch;
    private bool _released;

    public DownloadSlotLease(FileTransferOrchestrator orch)
    {
        _orch = orch;
    }

    /// <summary>Releases the slot once; always completes synchronously.</summary>
    public ValueTask DisposeAsync()
    {
        if (_released)
        {
            return ValueTask.CompletedTask;
        }

        _released = true;
        _orch.ReleaseDownloadSlot();
        return ValueTask.CompletedTask;
    }
}
|
|
|
|
/// <summary>
/// Waits for the orchestrator to grant a download slot and returns a lease that releases
/// it when disposed.
/// </summary>
/// <param name="ct">Token passed to the orchestrator's wait.</param>
private async ValueTask<DownloadSlotLease> AcquireSlotAsync(CancellationToken ct)
{
    await _orchestrator.WaitForDownloadSlotAsync(ct).ConfigureAwait(false);

    var lease = new DownloadSlotLease(_orchestrator);
    return lease;
}
|
|
|
|
/// <summary>
/// Sets the download status of the session's entry for <paramref name="key"/>;
/// does nothing when the session has no such entry.
/// </summary>
private void SetStatus(DownloadSession session, string key, DownloadStatus status)
{
    if (!session.Status.TryGetValue(key, out var entry))
    {
        return;
    }

    entry.DownloadStatus = status;
}
|
|
|
|
/// <summary>
/// Adds <paramref name="delta"/> to the transferred bytes of the session's entry for
/// <paramref name="key"/>; does nothing when the session has no such entry.
/// </summary>
private void AddTransferredBytes(DownloadSession session, string key, long delta)
{
    if (!session.Status.TryGetValue(key, out var entry))
    {
        return;
    }

    entry.AddTransferredBytes(delta);
}
|
|
|
|
/// <summary>
/// Sets the transferred-file count of the session's entry for <paramref name="key"/>;
/// does nothing when the session has no such entry.
/// </summary>
private void MarkTransferredFiles(DownloadSession session, string key, int files)
{
    if (!session.Status.TryGetValue(key, out var entry))
    {
        return;
    }

    entry.SetTransferredFiles(files);
}
|
|
|
|
/// <summary>
/// If the session owns <paramref name="hash"/>, removes that ownership and reports the
/// outcome to the deduplicator. Hashes the session does not own are ignored.
/// </summary>
private void CompleteOwnedDownload(DownloadSession session, string hash, bool success)
{
    bool wasOwned = session.OwnedDownloads.TryRemove(hash, out _);
    if (!wasOwned)
    {
        return;
    }

    _downloadDeduplicator.Complete(hash, success);
}
|
|
|
|
/// <summary>
/// Converts the result of a <c>ReadByte()</c> call into its un-munged byte (XOR with 42).
/// </summary>
/// <param name="byteOrEof">Value returned by <c>ReadByte()</c>; -1 signals end of stream.</param>
/// <exception cref="EndOfStreamException">The value is -1.</exception>
private static byte MungeByte(int byteOrEof)
    => byteOrEof != -1
        ? (byte)(byteOrEof ^ 42)
        : throw new EndOfStreamException();
|
|
|
|
/// <summary>
/// Reads one munged entry header of the form <c>#hash:length#</c> from the block file and
/// leaves the stream positioned directly after the closing <c>#</c>.
/// </summary>
/// <param name="fileBlockStream">Block file stream positioned at the start of a header.</param>
/// <returns>The hash text and the entry length parsed from the header.</returns>
/// <exception cref="EndOfStreamException">The stream ends before the header is complete.</exception>
/// <exception cref="InvalidDataException">
/// The header does not start with <c>#</c>, or its length field is not a valid integer.
/// </exception>
private static (string fileHash, long fileLengthBytes) ReadBlockFileHeader(FileStream fileBlockStream)
{
    // MungeByte throws EndOfStreamException when ReadByte() reports end of stream (-1),
    // so no separate EOF check is needed here or in the loop below.
    var separator = (char)MungeByte(fileBlockStream.ReadByte());
    if (separator != '#') throw new InvalidDataException("Data is invalid, first char is not #");

    var hashName = new System.Text.StringBuilder();
    var fileLength = new System.Text.StringBuilder();
    bool readHash = false;

    while (true)
    {
        var readChar = (char)MungeByte(fileBlockStream.ReadByte());

        if (readChar == ':')
        {
            // Everything after the colon belongs to the length field.
            readHash = true;
            continue;
        }

        if (readChar == '#') break;

        (readHash ? fileLength : hashName).Append(readChar);
    }

    // The header is machine-written data: parse it culture-independently and report a
    // malformed length as invalid data instead of a FormatException/OverflowException.
    if (!long.TryParse(
            fileLength.ToString(),
            System.Globalization.NumberStyles.Integer,
            System.Globalization.CultureInfo.InvariantCulture,
            out var fileLengthBytes))
    {
        throw new InvalidDataException($"Data is invalid, block entry length '{fileLength}' is not a number");
    }

    return (hashName.ToString(), fileLengthBytes);
}
|
|
|
|
/// <summary>
/// Keeps reading from <paramref name="stream"/> until <paramref name="buffer"/> is full.
/// </summary>
/// <param name="stream">Stream to read from.</param>
/// <param name="buffer">Destination; an empty buffer results in no read at all.</param>
/// <param name="ct">Token passed to every read.</param>
/// <exception cref="EndOfStreamException">The stream ends before the buffer is filled.</exception>
private static async Task ReadExactlyAsync(FileStream stream, Memory<byte> buffer, CancellationToken ct)
{
    for (int filled = 0; filled < buffer.Length;)
    {
        int received = await stream.ReadAsync(buffer.Slice(filled), ct).ConfigureAwait(false);
        if (received == 0)
        {
            throw new EndOfStreamException();
        }

        filled += received;
    }
}
|
|
|
|
/// <summary>
/// Copies exactly <paramref name="bytesToCopy"/> bytes from <paramref name="source"/> to
/// <paramref name="destination"/> through a pooled buffer.
/// </summary>
/// <param name="bytesToCopy">Number of bytes to copy; zero or negative copies nothing.</param>
/// <exception cref="EndOfStreamException">The source ends before all bytes were copied.</exception>
private static async Task CopyExactlyAsync(Stream source, Stream destination, long bytesToCopy, CancellationToken ct)
{
    if (bytesToCopy <= 0)
    {
        return;
    }

    byte[] scratch = ArrayPool<byte>.Shared.Rent(81920);
    try
    {
        for (long left = bytesToCopy; left > 0;)
        {
            // Never request more than what is still owed, so the source is not over-read.
            int want = (int)Math.Min(scratch.Length, left);
            int got = await source.ReadAsync(scratch.AsMemory(0, want), ct).ConfigureAwait(false);
            if (got == 0)
            {
                throw new EndOfStreamException();
            }

            await destination.WriteAsync(scratch.AsMemory(0, got), ct).ConfigureAwait(false);
            left -= got;
        }
    }
    finally
    {
        ArrayPool<byte>.Shared.Return(scratch);
    }
}
|
|
|
|
/// <summary>
/// Unpacks a file that starts with an 8-byte header (little-endian int32 output length,
/// then little-endian int32 payload length) into <paramref name="outputPath"/>.
/// A payload at least as long as the output length is copied verbatim; otherwise the
/// payload is LZ4-decoded between two memory-mapped views.
/// </summary>
/// <param name="compressedPath">File holding the header and payload.</param>
/// <param name="outputPath">File to create; its directory is created when missing.</param>
/// <param name="ct">Token for the asynchronous reads, copies and flushes.</param>
/// <returns>The output length taken from the header.</returns>
/// <exception cref="InvalidDataException">
/// A header length is negative, the payload length exceeds the file, or the decoded size
/// differs from the header's output length.
/// </exception>
/// <exception cref="EndOfStreamException">The file is shorter than the header or payload.</exception>
private async Task<long> DecompressWrappedLz4ToFileAsync(string compressedPath, string outputPath, CancellationToken ct)
{
    await using var input = new FileStream(compressedPath, FileMode.Open, FileAccess.Read, FileShare.Read, 81920, useAsync: true);

    var header = new byte[8];
    await ReadExactlyAsync(input, header, ct).ConfigureAwait(false);

    int outputLength = BinaryPrimitives.ReadInt32LittleEndian(header.AsSpan(0, 4));
    int inputLength = BinaryPrimitives.ReadInt32LittleEndian(header.AsSpan(4, 4));

    if (outputLength < 0 || inputLength < 0)
    {
        throw new InvalidDataException("LZ4 header contained a negative length.");
    }

    long payloadAvailable = input.Length - 8;
    if (inputLength > payloadAvailable)
    {
        throw new InvalidDataException("LZ4 header length exceeds file size.");
    }

    var targetDirectory = Path.GetDirectoryName(outputPath);
    if (!string.IsNullOrEmpty(targetDirectory) && !Directory.Exists(targetDirectory))
    {
        Directory.CreateDirectory(targetDirectory);
    }

    // Empty output: just create (or truncate) the file.
    if (outputLength == 0)
    {
        await using var emptyFile = new FileStream(outputPath, FileMode.Create, FileAccess.Write, FileShare.None, 4096, useAsync: true);
        await emptyFile.FlushAsync(ct).ConfigureAwait(false);
        return 0;
    }

    // Payload not smaller than the output: copy it as-is, no decoding.
    if (inputLength >= outputLength)
    {
        await using var rawFile = new FileStream(outputPath, FileMode.Create, FileAccess.Write, FileShare.None, 81920, useAsync: true);
        await CopyExactlyAsync(input, rawFile, inputLength, ct).ConfigureAwait(false);
        await rawFile.FlushAsync(ct).ConfigureAwait(false);
        return outputLength;
    }

    // Compressed payload: size the output file up front, then decode view-to-view.
    await using var outputFile = new FileStream(outputPath, FileMode.Create, FileAccess.ReadWrite, FileShare.None, 4096, FileOptions.SequentialScan);
    outputFile.SetLength(outputLength);

    using var sourceMap = MemoryMappedFile.CreateFromFile(compressedPath, FileMode.Open, null, 0, MemoryMappedFileAccess.Read);
    using var sourceView = sourceMap.CreateViewAccessor(8, inputLength, MemoryMappedFileAccess.Read);
    using var targetMap = MemoryMappedFile.CreateFromFile(outputFile, null, outputLength, MemoryMappedFileAccess.ReadWrite, HandleInheritability.None, leaveOpen: true);
    using var targetView = targetMap.CreateViewAccessor(0, outputLength, MemoryMappedFileAccess.Write);

    unsafe
    {
        byte* src = null;
        byte* dst = null;
        try
        {
            sourceView.SafeMemoryMappedViewHandle.AcquirePointer(ref src);
            targetView.SafeMemoryMappedViewHandle.AcquirePointer(ref dst);

            // The acquired pointers are advanced by each view's PointerOffset before use.
            src += sourceView.PointerOffset;
            dst += targetView.PointerOffset;

            int produced = LZ4Codec.Decode(src, inputLength, dst, outputLength);
            if (produced != outputLength)
            {
                throw new InvalidDataException($"LZ4 decode length mismatch (expected {outputLength}, got {produced}).");
            }
        }
        finally
        {
            // Release only what was actually acquired.
            if (src != null)
            {
                sourceView.SafeMemoryMappedViewHandle.ReleasePointer();
            }

            if (dst != null)
            {
                targetView.SafeMemoryMappedViewHandle.ReleasePointer();
            }
        }
    }

    targetView.Flush();
    return outputLength;
}
|
|
|
|
/// <summary>
/// Builds a case-insensitive hash → (extension, game path) map. For each hash the first
/// replacement wins; entries that are null or have a blank hash are skipped. The game path
/// is the replacement's first game path (or empty), and the extension is taken from it
/// without the leading dot, falling back to "bin" when blank.
/// </summary>
private static Dictionary<string, (string Extension, string GamePath)> BuildReplacementLookup(List<FileReplacementData> fileReplacement)
{
    var lookup = new Dictionary<string, (string Extension, string GamePath)>(StringComparer.OrdinalIgnoreCase);

    foreach (var replacement in fileReplacement)
    {
        bool usable = replacement != null
            && !string.IsNullOrWhiteSpace(replacement.Hash)
            && !lookup.ContainsKey(replacement.Hash);
        if (!usable)
        {
            continue;
        }

        var gamePath = replacement.GamePaths?.FirstOrDefault() ?? string.Empty;
        var extension = ExtensionOrEmpty(gamePath);

        lookup[replacement.Hash] = (string.IsNullOrWhiteSpace(extension) ? "bin" : extension, gamePath);
    }

    return lookup;

    // Extension without the leading dot; any failure while extracting it yields "".
    static string ExtensionOrEmpty(string path)
    {
        try
        {
            return Path.GetExtension(path)?.TrimStart('.') ?? "";
        }
        catch
        {
            // ignore
            return "";
        }
    }
}
|
|
|
|
/// <summary>
/// Callback invoked by <c>DownloadFileThrottled</c> with each chunk read from the response
/// stream, before that chunk is written to the destination file. The span covers the
/// download buffer itself, so changes made by the callback are what gets written.
/// </summary>
private delegate void DownloadDataCallback(Span<byte> data);
|
|
|
|
/// <summary>
/// Requests <paramref name="requestUrl"/> and streams the response body into
/// <paramref name="destinationFilename"/> through a bandwidth-limited stream.
/// Timeouts and status-less request failures are retried up to three attempts with a
/// two second pause; other HTTP failures fail immediately. If the body transfer fails,
/// the partially written destination file is deleted before the exception is rethrown.
/// </summary>
/// <param name="requestUrl">Address to download from.</param>
/// <param name="destinationFilename">File to create or overwrite.</param>
/// <param name="progress">Receives the size of every chunk written.</param>
/// <param name="callback">Optional in-place transformation applied to each chunk before writing.</param>
/// <param name="ct">Token that cancels the request, the retry pause and the transfer.</param>
/// <param name="withToken">Passed through to the orchestrator's request call.</param>
/// <exception cref="InvalidDataException">The server answered 404 or 401.</exception>
private async Task DownloadFileThrottled(
    Uri requestUrl,
    string destinationFilename,
    IProgress<long> progress,
    DownloadDataCallback? callback,
    CancellationToken ct,
    bool withToken)
{
    const int maxRetries = 3;
    var retryDelay = TimeSpan.FromSeconds(2);
    var failedAttempts = 0;

    HttpResponseMessage? response = null;

    // Phase 1: obtain a successful response (headers only), retrying transient failures.
    while (true)
    {
        try
        {
            Logger.LogDebug("Attempt {attempt} - Downloading {requestUrl}", failedAttempts + 1, requestUrl);
            response = await _orchestrator
                .SendRequestAsync(HttpMethod.Get, requestUrl, ct, HttpCompletionOption.ResponseHeadersRead, withToken)
                .ConfigureAwait(false);

            response.EnsureSuccessStatusCode();
            break;
        }
        catch (HttpRequestException ex) when (ex.InnerException is TimeoutException || ex.StatusCode == null)
        {
            response?.Dispose();
            failedAttempts++;

            Logger.LogWarning(ex, "Timeout during download of {requestUrl}. Attempt {attempt} of {maxRetries}", requestUrl, failedAttempts, maxRetries);

            if (failedAttempts >= maxRetries || ct.IsCancellationRequested)
            {
                Logger.LogError("Max retries reached or cancelled. Failing download for {requestUrl}", requestUrl);
                throw;
            }

            await Task.Delay(retryDelay, ct).ConfigureAwait(false);
        }
        catch (TaskCanceledException ex) when (!ct.IsCancellationRequested)
        {
            // Cancellation that did not come from the caller's token is retried like a timeout.
            response?.Dispose();
            failedAttempts++;

            Logger.LogWarning(ex, "Cancellation/timeout during download of {requestUrl}. Attempt {attempt} of {maxRetries}", requestUrl, failedAttempts, maxRetries);

            if (failedAttempts >= maxRetries)
            {
                Logger.LogError("Max retries reached for {requestUrl} after TaskCanceledException", requestUrl);
                throw;
            }

            await Task.Delay(retryDelay, ct).ConfigureAwait(false);
        }
        catch (OperationCanceledException) when (ct.IsCancellationRequested)
        {
            response?.Dispose();
            throw;
        }
        catch (HttpRequestException ex)
        {
            response?.Dispose();
            Logger.LogWarning(ex, "Error during download of {requestUrl}, HttpStatusCode: {code}", requestUrl, ex.StatusCode);

            if (ex.StatusCode is HttpStatusCode.NotFound or HttpStatusCode.Unauthorized)
            {
                throw new InvalidDataException($"Http error {ex.StatusCode} (cancelled: {ct.IsCancellationRequested}): {requestUrl}", ex);
            }

            throw;
        }
    }

    // Phase 2: stream the body to disk.
    ThrottledStream? throttled = null;

    try
    {
        // Determine buffer size based on content length
        long announcedLength = response!.Content.Headers.ContentLength ?? 0;
        int chunkSize = announcedLength > 1024 * 1024 ? 65536 : 8196;
        byte[] chunk = new byte[chunkSize];

        // Create destination file stream
        var destination = new FileStream(
            destinationFilename,
            FileMode.Create,
            FileAccess.Write,
            FileShare.None,
            bufferSize: 64 * 1024,
            useAsync: true);

        // Download with throttling
        await using (destination.ConfigureAwait(false))
        {
            var limit = _orchestrator.DownloadLimitPerSlot();
            Logger.LogTrace("Starting Download with a speed limit of {limit} to {destination}", limit, destinationFilename);

            throttled = new ThrottledStream(await response.Content.ReadAsStreamAsync(ct).ConfigureAwait(false), limit);
            // Registered so that limit changes and Dispose can reach the open stream.
            _activeDownloadStreams.TryAdd(throttled, 0);

            while (true)
            {
                ct.ThrowIfCancellationRequested();

                int received = await throttled.ReadAsync(chunk.AsMemory(0, chunk.Length), ct).ConfigureAwait(false);
                if (received == 0)
                {
                    break;
                }

                callback?.Invoke(chunk.AsSpan(0, received));
                await destination.WriteAsync(chunk.AsMemory(0, received), ct).ConfigureAwait(false);

                progress.Report(received);
            }

            Logger.LogDebug("{requestUrl} downloaded to {destination}", requestUrl, destinationFilename);
        }
    }
    catch
    {
        // Best-effort removal of the incomplete file; the original failure is what gets rethrown.
        try
        {
            if (!string.IsNullOrEmpty(destinationFilename) && File.Exists(destinationFilename))
            {
                File.Delete(destinationFilename);
            }
        }
        catch
        {
            // ignore
        }

        throw;
    }
    finally
    {
        if (throttled != null)
        {
            _activeDownloadStreams.TryRemove(throttled, out _);
            await throttled.DisposeAsync().ConfigureAwait(false);
        }

        response?.Dispose();
    }
}
|
|
|
|
/// <summary>
/// Polls every 250 ms until the request is ready: either the orchestrator reports it as
/// ready, or the queue-check endpoint answers <c>true</c> or a body containing
/// <c>"ready":true</c>.
/// </summary>
/// <param name="downloadFileTransfer">Transfers of the request; the first entry supplies the server URI.</param>
/// <param name="requestId">Identifier of the queued request.</param>
/// <param name="downloadCt">Token that aborts the wait.</param>
/// <exception cref="OperationCanceledException"><paramref name="downloadCt"/> was cancelled.</exception>
/// <exception cref="HttpRequestException">The queue-check endpoint answered with a failure status.</exception>
private async Task WaitForDownloadReady(List<DownloadFileTransfer> downloadFileTransfer, Guid requestId, CancellationToken downloadCt)
{
    // The hash list does not change between polls, so build it once.
    var hashes = downloadFileTransfer.Select(t => t.Hash).ToList();

    try
    {
        while (true)
        {
            downloadCt.ThrowIfCancellationRequested();

            if (_orchestrator.IsDownloadReady(requestId))
                break;

            using var resp = await _orchestrator.SendRequestAsync(
                HttpMethod.Get,
                LightlessFiles.RequestCheckQueueFullPath(downloadFileTransfer[0].DownloadUri, requestId),
                hashes,
                downloadCt).ConfigureAwait(false);

            resp.EnsureSuccessStatusCode();

            var body = (await resp.Content.ReadAsStringAsync(downloadCt).ConfigureAwait(false)).Trim();
            if (string.Equals(body, "true", StringComparison.OrdinalIgnoreCase) ||
                body.Contains("\"ready\":true", StringComparison.OrdinalIgnoreCase))
            {
                break;
            }

            await Task.Delay(250, downloadCt).ConfigureAwait(false);
        }
    }
    finally
    {
        // Runs on every exit, including cancellation and HTTP failures, so the request is
        // always cleared (presumably this drops the orchestrator's bookkeeping for it —
        // confirm against FileTransferOrchestrator).
        _orchestrator.ClearDownloadRequest(requestId);
    }
}
|
|
|
|
/// <summary>
/// Downloads one queued block file to <paramref name="tempPath"/>: waits until the request
/// is ready, then acquires a download slot and fetches the file, un-munging the data while
/// it is written. The session status for <paramref name="statusKey"/> is updated at each step.
/// </summary>
/// <param name="transfers">Transfers of the request; the first entry supplies the server URI.</param>
/// <param name="progress">Receives the size of every chunk written.</param>
private async Task DownloadQueuedBlockFileAsync(
    DownloadSession session,
    string statusKey,
    Guid requestId,
    List<DownloadFileTransfer> transfers,
    string tempPath,
    IProgress<long> progress,
    CancellationToken ct)
{
    Logger.LogDebug("GUID {requestId} on server {uri} for files {files}",
        requestId, transfers[0].DownloadUri, string.Join(", ", transfers.Select(c => c.Hash)));

    // Wait for ready WITHOUT holding a slot
    SetStatus(session, statusKey, DownloadStatus.WaitingForQueue);
    await WaitForDownloadReady(transfers, requestId, ct).ConfigureAwait(false);

    // Hold slot ONLY for the GET
    SetStatus(session, statusKey, DownloadStatus.WaitingForSlot);
    var lease = await AcquireSlotAsync(ct).ConfigureAwait(false);
    await using (lease.ConfigureAwait(false))
    {
        SetStatus(session, statusKey, DownloadStatus.Downloading);

        Uri source = LightlessFiles.CacheGetFullPath(transfers[0].DownloadUri, requestId);
        await DownloadFileThrottled(source, tempPath, progress, MungeBuffer, ct, withToken: true).ConfigureAwait(false);
    }
}
|
|
|
|
/// <summary>
/// Walks a downloaded block file entry by entry: reads each header, un-munges and unwraps
/// the payload, writes it to its cache path and hands it to <c>PersistFileToStorage</c>.
/// A failing entry is logged and its owned download completed as failed; the method itself
/// does not rethrow.
/// </summary>
/// <param name="replacementLookup">Hash → (extension, game path); entries without a mapping are skipped.</param>
/// <param name="rawSizeLookup">Expected decompressed sizes; a positive value that differs rejects the entry.</param>
/// <param name="downloadLabel">Label used as prefix in log messages.</param>
private async Task DecompressBlockFileAsync(
    DownloadSession session,
    string downloadStatusKey,
    string blockFilePath,
    Dictionary<string, (string Extension, string GamePath)> replacementLookup,
    IReadOnlyDictionary<string, long> rawSizeLookup,
    string downloadLabel,
    CancellationToken ct,
    bool skipDownscale,
    bool skipDecimation)
{
    SetStatus(session, downloadStatusKey, DownloadStatus.Decompressing);
    MarkTransferredFiles(session, downloadStatusKey, 1);

    try
    {
        var blockStream = File.OpenRead(blockFilePath);
        await using (blockStream.ConfigureAwait(false))
        {
            while (blockStream.Position < blockStream.Length)
            {
                // A broken header is not handled per entry; it ends the whole walk via the outer catches.
                var (fileHash, fileLengthBytes) = ReadBlockFileHeader(blockStream);

                try
                {
                    await ExtractEntryAsync(blockStream, fileHash, fileLengthBytes).ConfigureAwait(false);
                }
                catch (EndOfStreamException)
                {
                    Logger.LogWarning("{dlName}: Failure to extract file {fileHash}, stream ended prematurely", downloadLabel, fileHash);
                    CompleteOwnedDownload(session, fileHash, false);
                }
                catch (Exception e)
                {
                    // NOTE(review): this also absorbs an OperationCanceledException raised
                    // through ct — confirm that treating cancellation as an entry failure is intended.
                    Logger.LogWarning(e, "{dlName}: Error during decompression", downloadLabel);
                    CompleteOwnedDownload(session, fileHash, false);
                }
            }
        }

        SetStatus(session, downloadStatusKey, DownloadStatus.Completed);
    }
    catch (EndOfStreamException)
    {
        Logger.LogDebug("{dlName}: Failure to extract file header data, stream ended", downloadLabel);
    }
    catch (Exception ex)
    {
        Logger.LogError(ex, "{dlName}: Error during block file read", downloadLabel);
    }

    // Handles the payload that follows one header; returning early corresponds to skipping the entry.
    async Task ExtractEntryAsync(FileStream stream, string fileHash, long fileLengthBytes)
    {
        if (fileLengthBytes < 0 || fileLengthBytes > int.MaxValue)
        {
            throw new InvalidDataException($"Invalid block entry length: {fileLengthBytes}");
        }

        int len = checked((int)fileLengthBytes);

        if (!replacementLookup.TryGetValue(fileHash, out var repl))
        {
            Logger.LogWarning("{dlName}: No replacement mapping for {fileHash}", downloadLabel, fileHash);
            CompleteOwnedDownload(session, fileHash, false);
            // still need to skip bytes:
            stream.Position += len;
            return;
        }

        var filePath = _fileDbManager.GetCacheFilePath(fileHash, repl.Extension);
        Logger.LogTrace("{dlName}: Decompressing {file}:{len} => {dest}", downloadLabel, fileHash, fileLengthBytes, filePath);

        var compressed = new byte[len];
        await ReadExactlyAsync(stream, compressed.AsMemory(0, len), ct).ConfigureAwait(false);

        MungeBuffer(compressed);
        var decompressed = LZ4Wrapper.Unwrap(compressed);

        bool sizeMismatch = rawSizeLookup.TryGetValue(fileHash, out var expectedRawSize)
            && expectedRawSize > 0
            && decompressed.LongLength != expectedRawSize;
        if (sizeMismatch)
        {
            Logger.LogWarning("{dlName}: Decompressed size mismatch for {fileHash} (expected {expected}, got {actual})",
                downloadLabel, fileHash, expectedRawSize, decompressed.LongLength);
            CompleteOwnedDownload(session, fileHash, false);
            return;
        }

        await _fileCompactor.WriteAllBytesAsync(filePath, decompressed, ct).ConfigureAwait(false);
        PersistFileToStorage(session, fileHash, filePath, repl.GamePath, skipDownscale, skipDecimation);
    }
}
|
|
|
|
/// <summary>
/// Determines which of the requested files actually have to be downloaded: collects the
/// distinct non-blank hashes, drops those already present in the file cache, asks the
/// service for information on the rest, records forbidden entries with the orchestrator and
/// returns the transfers that can be transferred.
/// </summary>
/// <param name="gameObjectHandler">Handler the download is for; only its name is used, for logging.</param>
/// <param name="fileReplacement">Requested replacements; null or empty yields an empty list.</param>
/// <param name="ct">Token passed to the size query.</param>
/// <param name="ownerToken">Currently unused.</param>
/// <returns>The transfers to download; empty when nothing is missing.</returns>
public async Task<List<DownloadFileTransfer>> InitiateDownloadList(
    GameObjectHandler? gameObjectHandler,
    List<FileReplacementData> fileReplacement,
    CancellationToken ct,
    Guid? ownerToken = null)
{
    _ = ownerToken;
    var objectName = gameObjectHandler?.Name ?? "Unknown";
    Logger.LogDebug("Download start: {id}", objectName);

    if (fileReplacement == null || fileReplacement.Count == 0)
    {
        Logger.LogDebug("{dlName}: No file replacements provided", objectName);
        return [];
    }

    var requested = fileReplacement
        .Where(entry => entry != null && !string.IsNullOrWhiteSpace(entry.Hash))
        .Select(entry => entry.Hash)
        .Distinct(StringComparer.Ordinal)
        .ToList();

    if (requested.Count == 0)
    {
        Logger.LogDebug("{dlName}: No valid hashes to download", objectName);
        return [];
    }

    // Only hashes without a cache entry need to be fetched.
    var missing = requested
        .Where(hash => _fileDbManager.GetFileCacheByHash(hash) is null)
        .ToList();

    if (missing.Count == 0)
    {
        Logger.LogDebug("{dlName}: All requested hashes already present in cache", objectName);
        return [];
    }

    int alreadyCached = requested.Count - missing.Count;
    if (alreadyCached > 0)
    {
        Logger.LogDebug("{dlName}: Skipping {count} hashes already present in cache", objectName, alreadyCached);
    }

    List<DownloadFileDto> serviceInfo =
    [
        .. await FilesGetSizes(missing, ct).ConfigureAwait(false),
    ];

    var nonPositive = serviceInfo.Where(info => info.Size <= 0).Select(info => info.Hash);
    Logger.LogDebug("Files with size 0 or less: {files}", string.Join(", ", nonPositive));

    // Remember forbidden files once per hash.
    foreach (var forbidden in serviceInfo.Where(info => info.IsForbidden))
    {
        bool known = _orchestrator.ForbiddenTransfers.Exists(f => string.Equals(f.Hash, forbidden.Hash, StringComparison.Ordinal));
        if (!known)
        {
            _orchestrator.ForbiddenTransfers.Add(new DownloadFileTransfer(forbidden));
        }
    }

    return serviceInfo
        .Distinct()
        .Select(info => new DownloadFileTransfer(info))
        .Where(transfer => transfer.CanBeTransferred)
        .ToList();
}
|
|
|
|
/// <summary>
/// A group of queued (non-direct) transfers that are requested from one host as a single unit.
/// As built by <c>DownloadFilesInternal</c>: <c>HostKey</c> is "host:port" of the download URI,
/// <c>StatusKey</c> is "host:port#N" (N = 1-based chunk index for that host) and is used as the
/// key into the session's status dictionary, and <c>Items</c> are the transfers in the chunk.
/// </summary>
private sealed record BatchChunk(string HostKey, string StatusKey, List<DownloadFileTransfer> Items);
|
|
|
|
/// <summary>
/// Splits <paramref name="items"/> into consecutive chunks of at most <paramref name="chunkSize"/>
/// elements. Order is preserved; the last chunk may be shorter. An empty list yields no chunks.
/// </summary>
/// <param name="items">List to split. Each chunk is a copy produced by <c>GetRange</c>.</param>
/// <param name="chunkSize">Maximum number of elements per chunk; must be greater than zero.</param>
/// <returns>A lazily evaluated sequence of chunks.</returns>
/// <exception cref="ArgumentNullException"><paramref name="items"/> is null.</exception>
/// <exception cref="ArgumentOutOfRangeException">
/// <paramref name="chunkSize"/> is zero or negative (which would otherwise never terminate or walk backwards).
/// </exception>
private static IEnumerable<List<T>> ChunkList<T>(List<T> items, int chunkSize)
{
    // Validate eagerly: an iterator body would only run (and throw) on first enumeration.
    ArgumentNullException.ThrowIfNull(items);
    if (chunkSize <= 0)
    {
        throw new ArgumentOutOfRangeException(nameof(chunkSize), chunkSize, "Chunk size must be greater than zero.");
    }

    return Iterate();

    IEnumerable<List<T>> Iterate()
    {
        for (var offset = 0; offset < items.Count; offset += chunkSize)
        {
            yield return items.GetRange(offset, Math.Min(chunkSize, items.Count - offset));
        }
    }
}
|
|
|
|
/// <summary>
/// Downloads every transfer of <paramref name="session"/>: transfers with a direct URL are fetched
/// individually (when direct downloads are allowed), the rest are grouped per host into queued batch
/// chunks. Waits for both download pipelines and for all decompression tasks they enqueue.
/// </summary>
/// <remarks>
/// Hashes for which <c>_downloadDeduplicator.Claim</c> reports another owner are not downloaded here;
/// the method awaits the other claim's completion instead and logs a warning if any of them reports failure.
/// </remarks>
/// <param name="session">Session whose <c>Downloads</c> are processed; its <c>OwnedDownloads</c> and <c>Status</c> are reset here.</param>
/// <param name="fileReplacement">Replacement data used to build the hash lookup passed to the workers.</param>
/// <param name="ct">Cancels the parallel download loops and is handed to the decompression tasks.</param>
/// <param name="skipDownscale">Forwarded to the workers (ends up in <c>PersistFileToStorage</c>).</param>
/// <param name="skipDecimation">Forwarded to the workers (ends up in <c>PersistFileToStorage</c>).</param>
private async Task DownloadFilesInternal(DownloadSession session, List<FileReplacementData> fileReplacement, CancellationToken ct, bool skipDownscale, bool skipDecimation)
{
    var objectName = session.ObjectName;

    // config toggles: when the config switch flips to "enabled", forget the automatic
    // disable and the failure streak accumulated earlier.
    var configAllowsDirect = _configService.Current.EnableDirectDownloads;
    if (configAllowsDirect != _lastConfigDirectDownloadsState)
    {
        _lastConfigDirectDownloadsState = configAllowsDirect;
        if (configAllowsDirect)
        {
            _disableDirectDownloads = false;
            _consecutiveDirectDownloadFailures = 0;
        }
    }

    var allowDirectDownloads = ShouldUseDirectDownloads();
    var replacementLookup = BuildReplacementLookup(fileReplacement);
    var rawSizeLookup = new Dictionary<string, long>(StringComparer.OrdinalIgnoreCase);

    // Expected raw size per hash; an already stored positive value is kept.
    foreach (var download in session.Downloads)
    {
        if (string.IsNullOrWhiteSpace(download.Hash))
        {
            continue;
        }

        if (!rawSizeLookup.TryGetValue(download.Hash, out var existing) || existing <= 0)
        {
            rawSizeLookup[download.Hash] = download.TotalRaw;
        }
    }

    var directDownloads = new List<DownloadFileTransfer>();
    var batchDownloads = new List<DownloadFileTransfer>();

    foreach (var download in session.Downloads)
    {
        if (!string.IsNullOrEmpty(download.DirectDownloadUrl) && allowDirectDownloads)
        {
            directDownloads.Add(download);
        }
        else
        {
            batchDownloads.Add(download);
        }
    }

    session.OwnedDownloads.Clear();
    var waitingHashes = new HashSet<string>(StringComparer.OrdinalIgnoreCase);
    var waitTasks = new List<Task<bool>>();
    var claims = new Dictionary<string, DownloadClaim>(StringComparer.OrdinalIgnoreCase);

    // Claims each hash at most once per call, even if it appears in several transfers.
    DownloadClaim GetClaim(string hash)
    {
        if (!claims.TryGetValue(hash, out var claim))
        {
            claim = _downloadDeduplicator.Claim(hash);
            claims[hash] = claim;
        }

        return claim;
    }

    // Keeps the transfers this session owns; for the others, remembers the owner's completion task
    // (once per hash). Transfers with a blank hash are dropped.
    List<DownloadFileTransfer> FilterOwned(List<DownloadFileTransfer> downloads)
    {
        if (downloads.Count == 0)
        {
            return downloads;
        }

        var owned = new List<DownloadFileTransfer>(downloads.Count);
        foreach (var download in downloads)
        {
            if (string.IsNullOrWhiteSpace(download.Hash))
            {
                continue;
            }

            var claim = GetClaim(download.Hash);
            if (claim.IsOwner)
            {
                session.OwnedDownloads.TryAdd(download.Hash, 0);
                owned.Add(download);
            }
            else if (waitingHashes.Add(download.Hash))
            {
                waitTasks.Add(claim.Completion);
            }
        }

        return owned;
    }

    directDownloads = FilterOwned(directDownloads);
    batchDownloads = FilterOwned(batchDownloads);

    if (waitTasks.Count > 0)
    {
        Logger.LogDebug("{dlName}: {count} files already downloading elsewhere; waiting for completion.", objectName, waitTasks.Count);
    }

    // Chunk per host so we can fill all slots
    var slots = Math.Max(1, _configService.Current.ParallelDownloads);

    var batchChunks = batchDownloads
        .GroupBy(f => $"{f.DownloadUri.Host}:{f.DownloadUri.Port}", StringComparer.Ordinal)
        .SelectMany(g =>
        {
            var list = g.ToList();
            var chunkCount = Math.Min(slots, Math.Max(1, list.Count));
            var chunkSize = (int)Math.Ceiling(list.Count / (double)chunkCount);

            return ChunkList(list, chunkSize)
                .Select((chunk, index) => new BatchChunk(g.Key, $"{g.Key}#{index + 1}", chunk));
        })
        .ToArray();

    // init statuses
    session.Status.Clear();

    // direct downloads and batch downloads tracked separately:
    // direct ones are keyed by their URL, batch chunks by their status key
    foreach (var d in directDownloads)
    {
        session.Status[d.DirectDownloadUrl!] = new FileDownloadStatus
        {
            DownloadStatus = DownloadStatus.WaitingForSlot,
            TotalBytes = d.Total,
            TotalFiles = 1,
            TransferredBytes = 0,
            TransferredFiles = 0
        };
    }

    foreach (var chunk in batchChunks)
    {
        session.Status[chunk.StatusKey] = new FileDownloadStatus
        {
            DownloadStatus = DownloadStatus.WaitingForQueue,
            TotalBytes = chunk.Items.Sum(x => x.Total),
            TotalFiles = 1,
            TransferredBytes = 0,
            TransferredFiles = 0
        };
    }

    if (directDownloads.Count > 0 || batchChunks.Length > 0)
    {
        Logger.LogInformation("Downloading {direct} files directly, and {batchtotal} queued in {chunks} chunks.",
            directDownloads.Count, batchDownloads.Count, batchChunks.Length);
    }

    if (session.Handler is not null)
    {
        Mediator.Publish(new DownloadStartedMessage(session.Handler, session.Status));
    }

    // allow some extra workers so downloads can continue while earlier items decompress.
    var workerDop = Math.Clamp(slots * 2, 2, 16);
    var decompressionTasks = new ConcurrentBag<Task>();
    using var decompressionLimiter = new SemaphoreSlim(CalculateDecompressionLimit(slots));

    // batch downloads
    Task batchTask = batchChunks.Length == 0
        ? Task.CompletedTask
        : Parallel.ForEachAsync(batchChunks, new ParallelOptions { MaxDegreeOfParallelism = workerDop, CancellationToken = ct },
            async (chunk, token) => await ProcessBatchChunkAsync(session, chunk, replacementLookup, rawSizeLookup, decompressionTasks, decompressionLimiter, token, skipDownscale, skipDecimation).ConfigureAwait(false));

    // direct downloads
    Task directTask = directDownloads.Count == 0
        ? Task.CompletedTask
        : Parallel.ForEachAsync(directDownloads, new ParallelOptions { MaxDegreeOfParallelism = workerDop, CancellationToken = ct },
            async (d, token) => await ProcessDirectAsync(session, d, replacementLookup, rawSizeLookup, decompressionTasks, decompressionLimiter, token, skipDownscale, skipDecimation).ConfigureAwait(false));

    Task<bool[]> dedupWaitTask = waitTasks.Count == 0
        ? Task.FromResult(Array.Empty<bool>())
        : Task.WhenAll(waitTasks);

    try
    {
        await Task.WhenAll(batchTask, directTask).ConfigureAwait(false);
    }
    finally
    {
        // Always drain decompression work before the limiter above is disposed.
        await WaitForAllTasksAsync(decompressionTasks).ConfigureAwait(false);
    }

    var dedupResults = await dedupWaitTask.ConfigureAwait(false);

    if (waitTasks.Count > 0 && dedupResults.Any(r => !r))
    {
        Logger.LogWarning("{dlName}: One or more shared downloads failed; missing files may remain.", objectName);
    }

    Logger.LogDebug("Download end: {id}", objectName);
    ClearDownload(session);
}
|
|
|
|
/// <summary>
/// Processes one queued batch chunk: enqueues the chunk's hashes on the server, downloads the resulting
/// block file and hands it to a limited decompression task.
/// </summary>
/// <remarks>
/// Every exit path calls <c>CompleteOwnedDownload(..., false)</c> for each hash of the chunk, including a
/// failure of the enqueue request itself. NOTE(review): this relies on <c>CompleteOwnedDownload</c> being
/// harmless for hashes that were already completed successfully, as the existing cleanup already assumes.
/// Enqueue failures are rethrown; failures while downloading are logged and swallowed.
/// </remarks>
/// <param name="session">Download session the chunk belongs to.</param>
/// <param name="chunk">Transfers to request together; <c>Items[0].DownloadUri</c> selects the server.</param>
/// <param name="replacementLookup">Hash lookup forwarded to block decompression.</param>
/// <param name="rawSizeLookup">Expected raw sizes forwarded to block decompression.</param>
/// <param name="decompressionTasks">Bag that receives the enqueued decompression task.</param>
/// <param name="decompressionLimiter">Limits concurrently running decompression tasks.</param>
/// <param name="ct">Cancellation token for the requests and the decompression task.</param>
/// <param name="skipDownscale">Forwarded to block decompression.</param>
/// <param name="skipDecimation">Forwarded to block decompression.</param>
private async Task ProcessBatchChunkAsync(
    DownloadSession session,
    BatchChunk chunk,
    Dictionary<string, (string Extension, string GamePath)> replacementLookup,
    IReadOnlyDictionary<string, long> rawSizeLookup,
    ConcurrentBag<Task> decompressionTasks,
    SemaphoreSlim decompressionLimiter,
    CancellationToken ct,
    bool skipDownscale,
    bool skipDecimation)
{
    var statusKey = chunk.StatusKey;

    // enqueue (no slot)
    SetStatus(session, statusKey, DownloadStatus.WaitingForQueue);

    Guid requestId;
    string blockFile;
    FileInfo fi;

    try
    {
        var requestIdResponse = await _orchestrator.SendRequestAsync(
            HttpMethod.Post,
            LightlessFiles.RequestEnqueueFullPath(chunk.Items[0].DownloadUri),
            chunk.Items.Select(c => c.Hash),
            ct).ConfigureAwait(false);

        requestId = Guid.Parse((await requestIdResponse.Content.ReadAsStringAsync(ct).ConfigureAwait(false)).Trim('"'));

        blockFile = _fileDbManager.GetCacheFilePath(requestId.ToString("N"), "blk");
        fi = new FileInfo(blockFile);
    }
    catch
    {
        // Nothing was downloaded yet, but the chunk's hashes must still be completed
        // before the failure propagates to the caller.
        FailOwnedItems();
        throw;
    }

    var decompressionQueued = false;

    try
    {
        // download (with slot)
        var progress = CreateInlineProgress(bytes => AddTransferredBytes(session, statusKey, bytes));

        // Download slot held on get
        await DownloadQueuedBlockFileAsync(session, statusKey, requestId, chunk.Items, blockFile, progress, ct).ConfigureAwait(false);

        // decompress if file exists
        if (!File.Exists(blockFile))
        {
            Logger.LogWarning("{dlName}: Block file missing before extraction, skipping", fi.Name);
            SetStatus(session, statusKey, DownloadStatus.Completed);
            return;
        }

        SetStatus(session, statusKey, DownloadStatus.Decompressing);

        _ = EnqueueLimitedTask(
            decompressionTasks,
            decompressionLimiter,
            async token =>
            {
                try
                {
                    await DecompressBlockFileAsync(session, statusKey, blockFile, replacementLookup, rawSizeLookup, fi.Name, token, skipDownscale, skipDecimation)
                        .ConfigureAwait(false);
                }
                finally
                {
                    DeleteBlockFile();
                    FailOwnedItems();
                }
            },
            ct);

        // From here on the decompression task owns the cleanup.
        decompressionQueued = true;
    }
    catch (OperationCanceledException)
    {
        Logger.LogDebug("{dlName}: Detected cancellation of download, partially extracting files for {id}", fi.Name, requestId);
    }
    catch (Exception ex)
    {
        Logger.LogError(ex, "{dlName}: Error during batch chunk processing", fi.Name);
        ClearDownload(session);
    }
    finally
    {
        if (!decompressionQueued)
        {
            DeleteBlockFile();
            FailOwnedItems();
        }
    }

    // Best-effort removal of the temporary block file; a leftover file must not fail the download.
    void DeleteBlockFile()
    {
        try { File.Delete(blockFile); } catch { /* ignore */ }
    }

    // Completes every non-blank hash of this chunk with success == false.
    void FailOwnedItems()
    {
        foreach (var item in chunk.Items)
        {
            if (!string.IsNullOrWhiteSpace(item.Hash))
            {
                CompleteOwnedDownload(session, item.Hash, false);
            }
        }
    }
}
|
|
|
|
/// <summary>
/// Downloads one file from its direct URL, then decompresses and persists it in a limited background task.
/// Falls back to the queued download path when the direct attempt (or its decompression) fails.
/// </summary>
/// <remarks>
/// <para>
/// A failure other than <see cref="InvalidDataException"/> increments the consecutive-failure counter;
/// once a fallback succeeds with that counter at 3 or more, direct downloads are disabled.
/// </para>
/// <para>
/// Caller cancellation during decompression is treated like cancellation during the download itself:
/// the owned download is completed as failed and the session is cleared, without counting a failure or
/// starting a fallback request on the cancelled token.
/// </para>
/// </remarks>
/// <param name="session">Download session the transfer belongs to.</param>
/// <param name="directDownload">Transfer to fetch; its <c>DirectDownloadUrl</c> is also the status key.</param>
/// <param name="replacementLookup">Maps hash to file extension and game path.</param>
/// <param name="rawSizeLookup">Expected raw sizes, forwarded to the queued fallback.</param>
/// <param name="decompressionTasks">Bag that receives enqueued decompression tasks.</param>
/// <param name="decompressionLimiter">Limits concurrently running decompression tasks.</param>
/// <param name="ct">Cancellation token for the download and the decompression task.</param>
/// <param name="skipDownscale">Forwarded to <c>PersistFileToStorage</c> / the fallback.</param>
/// <param name="skipDecimation">Forwarded to <c>PersistFileToStorage</c> / the fallback.</param>
private async Task ProcessDirectAsync(
    DownloadSession session,
    DownloadFileTransfer directDownload,
    Dictionary<string, (string Extension, string GamePath)> replacementLookup,
    IReadOnlyDictionary<string, long> rawSizeLookup,
    ConcurrentBag<Task> decompressionTasks,
    SemaphoreSlim decompressionLimiter,
    CancellationToken ct,
    bool skipDownscale,
    bool skipDecimation)
{
    var progress = CreateInlineProgress(bytes =>
    {
        if (!string.IsNullOrEmpty(directDownload.DirectDownloadUrl))
        {
            AddTransferredBytes(session, directDownload.DirectDownloadUrl!, bytes);
        }
    });

    // Direct downloads may have been disabled since this transfer was classified.
    if (!ShouldUseDirectDownloads() || string.IsNullOrEmpty(directDownload.DirectDownloadUrl))
    {
        try
        {
            await ProcessDirectAsQueuedFallbackAsync(session, directDownload, replacementLookup, rawSizeLookup, progress, ct, skipDownscale, skipDecimation, decompressionTasks, decompressionLimiter).ConfigureAwait(false);
        }
        catch (Exception ex)
        {
            Logger.LogError(ex, "{hash}: Error during direct download fallback.", directDownload.Hash);
            CompleteOwnedDownload(session, directDownload.Hash, false);
            throw;
        }

        return;
    }

    var tempFilename = _fileDbManager.GetCacheFilePath(directDownload.Hash, "bin");
    var decompressionQueued = false;

    try
    {
        // Download slot held on get
        SetStatus(session, directDownload.DirectDownloadUrl!, DownloadStatus.WaitingForSlot);

        await using ((await AcquireSlotAsync(ct).ConfigureAwait(false)).ConfigureAwait(false))
        {
            SetStatus(session, directDownload.DirectDownloadUrl!, DownloadStatus.Downloading);
            Logger.LogDebug("Beginning direct download of {hash} from {url}", directDownload.Hash, directDownload.DirectDownloadUrl);

            await DownloadFileThrottled(new Uri(directDownload.DirectDownloadUrl!), tempFilename, progress, callback: null, ct, withToken: false)
                .ConfigureAwait(false);
        }

        // A completed direct transfer ends the failure streak.
        Interlocked.Exchange(ref _consecutiveDirectDownloadFailures, 0);

        if (!replacementLookup.TryGetValue(directDownload.Hash, out var repl))
        {
            Logger.LogWarning("{hash}: No replacement data found for direct download.", directDownload.Hash);
            SetStatus(session, directDownload.DirectDownloadUrl!, DownloadStatus.Completed);
            CompleteOwnedDownload(session, directDownload.Hash, false);
            return;
        }

        var finalFilename = _fileDbManager.GetCacheFilePath(directDownload.Hash, repl.Extension);

        Logger.LogDebug("Decompressing direct download {hash} from {compressedFile} to {finalFile}",
            directDownload.Hash, tempFilename, finalFilename);

        SetStatus(session, directDownload.DirectDownloadUrl!, DownloadStatus.Decompressing);
        _ = EnqueueLimitedTask(
            decompressionTasks,
            decompressionLimiter,
            async token =>
            {
                try
                {
                    var decompressedLength = await DecompressWrappedLz4ToFileAsync(tempFilename, finalFilename, token).ConfigureAwait(false);

                    if (directDownload.TotalRaw > 0 && decompressedLength != directDownload.TotalRaw)
                    {
                        throw new InvalidDataException(
                            $"{directDownload.Hash}: Decompressed size mismatch (expected {directDownload.TotalRaw}, got {decompressedLength})");
                    }

                    _fileCompactor.NotifyFileWritten(finalFilename);
                    PersistFileToStorage(session, directDownload.Hash, finalFilename, repl.GamePath, skipDownscale, skipDecimation);

                    MarkTransferredFiles(session, directDownload.DirectDownloadUrl!, 1);
                    SetStatus(session, directDownload.DirectDownloadUrl!, DownloadStatus.Completed);
                    Logger.LogDebug("Finished direct download of {hash}.", directDownload.Hash);
                }
                catch (OperationCanceledException) when (token.IsCancellationRequested)
                {
                    // Cancellation is not a direct-download failure: do not touch the failure
                    // counter and do not start a fallback request on an already cancelled token.
                    Logger.LogDebug("{hash}: Direct download cancelled by caller, discarding file.", directDownload.Hash);
                    CompleteOwnedDownload(session, directDownload.Hash, false);
                    ClearDownload(session);
                }
                catch (Exception ex)
                {
                    await FallBackToQueuedAsync(ex, token, setCompletedOnFallbackFailure: true).ConfigureAwait(false);
                }
                finally
                {
                    DeleteTempFile();
                }
            },
            ct);

        // From here on the decompression task owns the temp file.
        decompressionQueued = true;
    }
    catch (OperationCanceledException ex)
    {
        if (ct.IsCancellationRequested)
        {
            Logger.LogDebug("{hash}: Direct download cancelled by caller, discarding file.", directDownload.Hash);
        }
        else
        {
            Logger.LogWarning(ex, "{hash}: Direct download cancelled unexpectedly.", directDownload.Hash);
        }

        CompleteOwnedDownload(session, directDownload.Hash, false);
        ClearDownload(session);
    }
    catch (Exception ex)
    {
        await FallBackToQueuedAsync(ex, ct, setCompletedOnFallbackFailure: false).ConfigureAwait(false);
    }
    finally
    {
        if (!decompressionQueued)
        {
            DeleteTempFile();
        }
    }

    // Best-effort removal of the downloaded (still compressed) temp file.
    void DeleteTempFile()
    {
        try
        {
            File.Delete(tempFilename);
        }
        catch
        {
            // ignore
        }
    }

    // Shared failure path: record the failure, retry through the queued download, and disable direct
    // downloads after 3 or more consecutive non-InvalidDataException failures. If the fallback itself
    // fails, the owned download is completed as failed and the session is cleared.
    // setCompletedOnFallbackFailure keeps the status handling of the two original call sites as-is.
    async Task FallBackToQueuedAsync(Exception cause, CancellationToken token, bool setCompletedOnFallbackFailure)
    {
        var expectedDirectDownloadFailure = cause is InvalidDataException;
        var failureCount = expectedDirectDownloadFailure ? 0 : Interlocked.Increment(ref _consecutiveDirectDownloadFailures);

        if (expectedDirectDownloadFailure)
        {
            Logger.LogInformation(cause, "{hash}: Direct download unavailable, attempting queued fallback.", directDownload.Hash);
        }
        else
        {
            Logger.LogWarning(cause, "{hash}: Direct download failed, attempting queued fallback.", directDownload.Hash);
        }

        try
        {
            await ProcessDirectAsQueuedFallbackAsync(session, directDownload, replacementLookup, rawSizeLookup, progress, token, skipDownscale, skipDecimation, decompressionTasks, decompressionLimiter).ConfigureAwait(false);

            if (!expectedDirectDownloadFailure && failureCount >= 3 && !_disableDirectDownloads)
            {
                _disableDirectDownloads = true;
                Logger.LogWarning("Disabling direct downloads for this session after {count} consecutive failures.", failureCount);
            }
        }
        catch (Exception fallbackEx)
        {
            Logger.LogError(fallbackEx, "{hash}: Error during direct download fallback.", directDownload.Hash);
            CompleteOwnedDownload(session, directDownload.Hash, false);
            if (setCompletedOnFallbackFailure)
            {
                SetStatus(session, directDownload.DirectDownloadUrl!, DownloadStatus.Completed);
            }

            ClearDownload(session);
        }
    }
}
|
|
|
|
/// <summary>
/// Fetches a single transfer through the queued download path: enqueues its hash, downloads the block
/// file and schedules its decompression. The transfer's direct URL is used as the status key.
/// </summary>
/// <param name="session">Download session the transfer belongs to.</param>
/// <param name="directDownload">Transfer to fetch; must carry a non-empty <c>DirectDownloadUrl</c>.</param>
/// <param name="replacementLookup">Hash lookup forwarded to block decompression.</param>
/// <param name="rawSizeLookup">Expected raw sizes forwarded to block decompression.</param>
/// <param name="progress">Receives transferred byte counts from the block download.</param>
/// <param name="ct">Cancellation token for the requests and the decompression task.</param>
/// <param name="skipDownscale">Forwarded to block decompression.</param>
/// <param name="skipDecimation">Forwarded to block decompression.</param>
/// <param name="decompressionTasks">Bag that receives the enqueued decompression task.</param>
/// <param name="decompressionLimiter">Limits concurrently running decompression tasks.</param>
/// <exception cref="InvalidOperationException">The transfer has no direct download URL.</exception>
private async Task ProcessDirectAsQueuedFallbackAsync(
    DownloadSession session,
    DownloadFileTransfer directDownload,
    Dictionary<string, (string Extension, string GamePath)> replacementLookup,
    IReadOnlyDictionary<string, long> rawSizeLookup,
    IProgress<long> progress,
    CancellationToken ct,
    bool skipDownscale,
    bool skipDecimation,
    ConcurrentBag<Task> decompressionTasks,
    SemaphoreSlim decompressionLimiter)
{
    if (string.IsNullOrEmpty(directDownload.DirectDownloadUrl))
    {
        throw new InvalidOperationException("Direct download fallback requested without a direct download URL.");
    }

    var statusKey = directDownload.DirectDownloadUrl!;
    SetStatus(session, statusKey, DownloadStatus.WaitingForQueue);

    // Ask the server to queue the file; the reply body is the (quoted) request id.
    var enqueueReply = await _orchestrator.SendRequestAsync(
        HttpMethod.Post,
        LightlessFiles.RequestEnqueueFullPath(directDownload.DownloadUri),
        new[] { directDownload.Hash },
        ct).ConfigureAwait(false);
    var replyText = await enqueueReply.Content.ReadAsStringAsync(ct).ConfigureAwait(false);
    var queueId = Guid.Parse(replyText.Trim('"'));

    var blockPath = _fileDbManager.GetCacheFilePath(queueId.ToString("N"), "blk");
    var blockInfo = new FileInfo(blockPath);
    var cleanupHandedOff = false;

    // Removes the block file (best effort) and completes the owned download as not successful.
    void CleanUp()
    {
        try { File.Delete(blockPath); } catch {}
        CompleteOwnedDownload(session, directDownload.Hash, false);
    }

    try
    {
        await DownloadQueuedBlockFileAsync(session, statusKey, queueId, [directDownload], blockPath, progress, ct).ConfigureAwait(false);

        if (File.Exists(blockPath))
        {
            SetStatus(session, statusKey, DownloadStatus.Decompressing);
            EnqueueLimitedTask(
                decompressionTasks,
                decompressionLimiter,
                async token =>
                {
                    try
                    {
                        await DecompressBlockFileAsync(session, statusKey, blockPath, replacementLookup, rawSizeLookup, $"fallback-{directDownload.Hash}", token, skipDownscale, skipDecimation)
                            .ConfigureAwait(false);
                    }
                    finally
                    {
                        CleanUp();
                    }
                },
                ct);

            // The decompression task now owns the cleanup.
            cleanupHandedOff = true;
        }
        else
        {
            Logger.LogWarning("{dlName}: Block file missing before extraction, skipping", blockInfo.Name);
            SetStatus(session, statusKey, DownloadStatus.Completed);
        }
    }
    finally
    {
        if (!cleanupHandedOff)
        {
            CleanUp();
        }
    }
}
|
|
|
|
/// <summary>
/// Requests size information for the given hashes from the files CDN.
/// </summary>
/// <param name="hashes">Hashes to query; sent as the request payload.</param>
/// <param name="ct">Cancellation token for the request and for reading the response.</param>
/// <returns>The deserialized entries, or an empty list when the response body deserializes to null.</returns>
/// <exception cref="InvalidOperationException">The orchestrator is not initialized.</exception>
private async Task<List<DownloadFileDto>> FilesGetSizes(List<string> hashes, CancellationToken ct)
{
    if (!_orchestrator.IsInitialized)
    {
        throw new InvalidOperationException("FileTransferManager is not initialized");
    }

    var sizesEndpoint = LightlessFiles.ServerFilesGetSizesFullPath(_orchestrator.FilesCdnUri!);
    var reply = await _orchestrator
        .SendRequestAsync(HttpMethod.Get, sizesEndpoint, hashes, ct)
        .ConfigureAwait(false);

    var entries = await reply.Content
        .ReadFromJsonAsync<List<DownloadFileDto>>(cancellationToken: ct)
        .ConfigureAwait(false);

    return entries ?? [];
}
|
|
|
|
/// <summary>
/// Registers a freshly written file in the file cache under <paramref name="fileHash"/> and, unless
/// skipped, schedules texture downscaling and model decimation for it. Always reports the outcome via
/// <c>CompleteOwnedDownload</c>.
/// </summary>
/// <remarks>
/// The file's creation and last-write timestamps are set to random days between 1995-01-01 and today,
/// and the last-access time to today, before the cache entry is created. If the created entry reports a
/// different hash, the file and the entry are removed and no post-processing is scheduled.
/// </remarks>
/// <param name="session">Session that owns the download.</param>
/// <param name="fileHash">Hash the file is expected to have.</param>
/// <param name="filePath">Path of the file on disk.</param>
/// <param name="gamePath">Game path, passed to the downscale/decimation services.</param>
/// <param name="skipDownscale">When true, no texture downscale is scheduled.</param>
/// <param name="skipDecimation">When true, no model decimation is scheduled.</param>
/// <returns>True when a cache entry with the expected hash was created; otherwise false.</returns>
private bool PersistFileToStorage(DownloadSession session, string fileHash, string filePath, string gamePath, bool skipDownscale, bool skipDecimation)
{
    var fi = new FileInfo(filePath);
    var persisted = false;

    // NOTE: kept outside the try block on purpose; callers handle exceptions thrown here.
    fi.CreationTime = RandomDayInThePast();
    fi.LastAccessTime = DateTime.Today;
    fi.LastWriteTime = RandomDayInThePast();

    try
    {
        var entry = _fileDbManager.CreateCacheEntryWithKnownHash(filePath, fileHash);

        // Check the hash first so no post-processing is scheduled for a file that is about to be deleted.
        if (entry != null && !string.Equals(entry.Hash, fileHash, StringComparison.OrdinalIgnoreCase))
        {
            Logger.LogError("Hash mismatch after extracting, got {hash}, expected {expectedHash}, deleting file",
                entry.Hash, fileHash);

            File.Delete(filePath);
            _fileDbManager.RemoveHashedFile(entry.Hash, entry.PrefixedFilePath);
            return false; // the finally block still reports persisted == false
        }

        persisted = entry != null;

        if (!skipDownscale && _textureDownscaleService.ShouldScheduleDownscale(filePath))
        {
            _textureDownscaleService.ScheduleDownscale(
                fileHash,
                filePath,
                () => _textureMetadataHelper.DetermineMapKind(gamePath, filePath));
        }

        if (!skipDecimation && _modelDecimationService.ShouldScheduleDecimation(fileHash, filePath, gamePath))
        {
            _modelDecimationService.ScheduleDecimation(fileHash, filePath, gamePath);
        }
    }
    catch (Exception ex)
    {
        Logger.LogWarning(ex, "Error creating cache entry");
    }
    finally
    {
        CompleteOwnedDownload(session, fileHash, persisted);
    }

    return persisted;

    // Random.Shared is thread-safe, which matters because this method runs on parallel decompression tasks.
    static DateTime RandomDayInThePast()
    {
        DateTime start = new(1995, 1, 1, 1, 1, 1, DateTimeKind.Local);
        int range = (DateTime.Today - start).Days;
        return start.AddDays(Random.Shared.Next(range));
    }
}
|
|
|
|
/// <summary>
/// Computes how many decompression tasks may run at once: the number of download slots, but never
/// fewer than 1 and never more than the processor count capped at 4.
/// </summary>
/// <param name="downloadSlots">Configured number of parallel download slots.</param>
/// <returns>A value between 1 and min(processor count, 4), with that upper bound itself at least 1.</returns>
private static int CalculateDecompressionLimit(int downloadSlots)
{
    var ceiling = Math.Clamp(Environment.ProcessorCount, 1, 4);

    if (downloadSlots < 1)
    {
        return 1;
    }

    return downloadSlots > ceiling ? ceiling : downloadSlots;
}
|
|
|
|
/// <summary>
/// Starts <paramref name="work"/> on the thread pool once a slot of <paramref name="limiter"/> is free,
/// and records the resulting task in <paramref name="tasks"/>.
/// </summary>
/// <remarks>
/// <paramref name="work"/> is always invoked, even when <paramref name="ct"/> is already cancelled:
/// callers put their cleanup inside <paramref name="work"/>, so skipping it would lose that cleanup.
/// For the same reason the wait for a limiter slot is not cancellable. If cancellation had been requested
/// before <paramref name="work"/> started, the returned task still ends as cancelled once
/// <paramref name="work"/> has run.
/// </remarks>
/// <param name="tasks">Bag the created task is added to.</param>
/// <param name="limiter">Semaphore bounding concurrently running work items; released when the work finishes.</param>
/// <param name="work">Work to run; receives <paramref name="ct"/> and is expected to honour it.</param>
/// <param name="ct">Cancellation token handed to <paramref name="work"/>.</param>
/// <returns>The task that was added to <paramref name="tasks"/>.</returns>
private static Task EnqueueLimitedTask(
    ConcurrentBag<Task> tasks,
    SemaphoreSlim limiter,
    Func<CancellationToken, Task> work,
    CancellationToken ct)
{
    var task = Task.Run(async () =>
    {
        // Not cancellable on purpose: every queued work item finishes, so a slot always frees up.
        await limiter.WaitAsync(CancellationToken.None).ConfigureAwait(false);
        try
        {
            var cancelledBeforeStart = ct.IsCancellationRequested;

            await work(ct).ConfigureAwait(false);

            if (cancelledBeforeStart)
            {
                // Keep reporting the task as cancelled, now that the work's cleanup has run.
                ct.ThrowIfCancellationRequested();
            }
        }
        finally
        {
            limiter.Release();
        }
    }, CancellationToken.None);

    tasks.Add(task);
    return task;
}
|
|
|
|
/// <summary>
/// Waits until every task in <paramref name="tasks"/> has finished, including tasks that are added to
/// the bag while waiting.
/// </summary>
/// <remarks>
/// Failures and cancellations are surfaced only after the bag has stopped growing, so a failed task
/// cannot cause later-added tasks to be left running unobserved.
/// </remarks>
/// <param name="tasks">Bag of tasks to wait for; may grow while this method is waiting.</param>
/// <returns>A task that completes when all tasks have finished; it rethrows as <c>Task.WhenAll</c> would.</returns>
private static async Task WaitForAllTasksAsync(ConcurrentBag<Task> tasks)
{
    Task[] snapshot;

    while (true)
    {
        snapshot = tasks.ToArray();
        if (snapshot.Length == 0)
        {
            return;
        }

        try
        {
            await Task.WhenAll(snapshot).ConfigureAwait(false);
        }
        catch (Exception)
        {
            // Deferred: keep waiting for tasks added in the meantime; the error is rethrown below.
        }

        if (tasks.Count == snapshot.Length)
        {
            break;
        }
    }

    // Every task in the snapshot has completed; this await only surfaces a failure or cancellation.
    await Task.WhenAll(snapshot).ConfigureAwait(false);
}
|
|
|
|
/// <summary>
/// Wraps <paramref name="callback"/> in an <see cref="IProgress{T}"/> that invokes it directly on the
/// reporting thread.
/// </summary>
/// <param name="callback">Invoked with each reported value.</param>
/// <returns>A progress reporter backed by <c>InlineProgress</c>.</returns>
private static IProgress<long> CreateInlineProgress(Action<long> callback)
{
    return new InlineProgress(callback);
}
|
|
|
|
/// <summary>
/// <see cref="IProgress{T}"/> implementation that calls its callback synchronously from
/// <see cref="Report"/>, on whatever thread reports the value.
/// </summary>
private sealed class InlineProgress : IProgress<long>
{
    private readonly Action<long> _callback;

    /// <summary>Creates the reporter.</summary>
    /// <param name="callback">Invoked with each reported value.</param>
    /// <exception cref="ArgumentNullException"><paramref name="callback"/> is null.</exception>
    public InlineProgress(Action<long> callback)
    {
        if (callback is null)
        {
            throw new ArgumentNullException(nameof(callback));
        }

        _callback = callback;
    }

    /// <summary>Passes <paramref name="value"/> straight to the callback.</summary>
    public void Report(long value)
    {
        _callback(value);
    }
}
|
|
}
|