test-abel-cake-changes

This commit is contained in:
cake
2026-01-03 22:45:55 +01:00
57 changed files with 11420 additions and 357 deletions

View File

@@ -6,6 +6,7 @@ using LightlessSync.FileCache;
using LightlessSync.LightlessConfiguration;
using LightlessSync.PlayerData.Handlers;
using LightlessSync.Services.Mediator;
using LightlessSync.Services.ModelDecimation;
using LightlessSync.Services.TextureCompression;
using LightlessSync.WebAPI.Files.Models;
using Microsoft.Extensions.Logging;
@@ -17,14 +18,14 @@ namespace LightlessSync.WebAPI.Files;
public partial class FileDownloadManager : DisposableMediatorSubscriberBase
{
private readonly Dictionary<string, FileDownloadStatus> _downloadStatus;
private readonly object _downloadStatusLock = new();
private readonly ConcurrentDictionary<string, FileDownloadStatus> _downloadStatus;
private readonly FileCompactor _fileCompactor;
private readonly FileCacheManager _fileDbManager;
private readonly FileTransferOrchestrator _orchestrator;
private readonly LightlessConfigService _configService;
private readonly TextureDownscaleService _textureDownscaleService;
private readonly ModelDecimationService _modelDecimationService;
private readonly TextureMetadataHelper _textureMetadataHelper;
private readonly ConcurrentDictionary<ThrottledStream, byte> _activeDownloadStreams;
@@ -45,14 +46,16 @@ public partial class FileDownloadManager : DisposableMediatorSubscriberBase
FileCompactor fileCompactor,
LightlessConfigService configService,
TextureDownscaleService textureDownscaleService,
ModelDecimationService modelDecimationService,
TextureMetadataHelper textureMetadataHelper) : base(logger, mediator)
{
_downloadStatus = new Dictionary<string, FileDownloadStatus>(StringComparer.Ordinal);
_downloadStatus = new ConcurrentDictionary<string, FileDownloadStatus>(StringComparer.Ordinal);
_orchestrator = orchestrator;
_fileDbManager = fileCacheManager;
_fileCompactor = fileCompactor;
_configService = configService;
_textureDownscaleService = textureDownscaleService;
_modelDecimationService = modelDecimationService;
_textureMetadataHelper = textureMetadataHelper;
_activeDownloadStreams = new();
_lastConfigDirectDownloadsState = _configService.Current.EnableDirectDownloads;
@@ -86,10 +89,7 @@ public partial class FileDownloadManager : DisposableMediatorSubscriberBase
public void ClearDownload()
{
CurrentDownloads.Clear();
lock (_downloadStatusLock)
{
_downloadStatus.Clear();
}
_downloadStatus.Clear();
CurrentOwnerToken = null;
}
@@ -156,29 +156,20 @@ public partial class FileDownloadManager : DisposableMediatorSubscriberBase
private void SetStatus(string key, DownloadStatus status)
{
lock (_downloadStatusLock)
{
if (_downloadStatus.TryGetValue(key, out var st))
st.DownloadStatus = status;
}
if (_downloadStatus.TryGetValue(key, out var st))
st.DownloadStatus = status;
}
private void AddTransferredBytes(string key, long delta)
{
lock (_downloadStatusLock)
{
if (_downloadStatus.TryGetValue(key, out var st))
st.TransferredBytes += delta;
}
if (_downloadStatus.TryGetValue(key, out var st))
st.AddTransferredBytes(delta);
}
private void MarkTransferredFiles(string key, int files)
{
lock (_downloadStatusLock)
{
if (_downloadStatus.TryGetValue(key, out var st))
st.TransferredFiles = files;
}
if (_downloadStatus.TryGetValue(key, out var st))
st.SetTransferredFiles(files);
}
private static byte MungeByte(int byteOrEof)
@@ -460,18 +451,11 @@ public partial class FileDownloadManager : DisposableMediatorSubscriberBase
}
}
private void RemoveStatus(string key)
{
lock (_downloadStatusLock)
{
_downloadStatus.Remove(key);
}
}
private async Task DecompressBlockFileAsync(
string downloadStatusKey,
string blockFilePath,
Dictionary<string, (string Extension, string GamePath)> replacementLookup,
IReadOnlyDictionary<string, long> rawSizeLookup,
string downloadLabel,
CancellationToken ct,
bool skipDownscale)
@@ -498,7 +482,9 @@ public partial class FileDownloadManager : DisposableMediatorSubscriberBase
if (!replacementLookup.TryGetValue(fileHash, out var repl))
{
Logger.LogWarning("{dlName}: No replacement mapping for {fileHash}", downloadLabel, fileHash);
fileBlockStream.Seek(len, SeekOrigin.Current);
// still need to skip bytes:
var skip = checked((int)fileLengthBytes);
fileBlockStream.Position += skip;
continue;
}
@@ -506,11 +492,17 @@ public partial class FileDownloadManager : DisposableMediatorSubscriberBase
Logger.LogTrace("{dlName}: Decompressing {file}:{len} => {dest}", downloadLabel, fileHash, fileLengthBytes, filePath);
var compressed = new byte[len];
await ReadExactlyAsync(fileBlockStream, compressed.AsMemory(0, len), ct).ConfigureAwait(false);
if (len == 0)
MungeBuffer(compressed);
var decompressed = LZ4Wrapper.Unwrap(compressed);
if (rawSizeLookup.TryGetValue(fileHash, out var expectedRawSize)
&& expectedRawSize > 0
&& decompressed.LongLength != expectedRawSize)
{
await File.WriteAllBytesAsync(filePath, Array.Empty<byte>(), ct).ConfigureAwait(false);
await _fileCompactor.WriteAllBytesAsync(filePath, Array.Empty<byte>(), ct).ConfigureAwait(false);
PersistFileToStorage(fileHash, filePath, repl.GamePath, skipDownscale);
continue;
}
@@ -551,6 +543,8 @@ public partial class FileDownloadManager : DisposableMediatorSubscriberBase
}
}
}
SetStatus(downloadStatusKey, DownloadStatus.Completed);
}
catch (EndOfStreamException)
{
@@ -560,10 +554,6 @@ public partial class FileDownloadManager : DisposableMediatorSubscriberBase
{
Logger.LogError(ex, "{dlName}: Error during block file read", downloadLabel);
}
finally
{
RemoveStatus(downloadStatusKey);
}
}
public async Task<List<DownloadFileTransfer>> InitiateDownloadList(
@@ -601,21 +591,25 @@ public partial class FileDownloadManager : DisposableMediatorSubscriberBase
.. await FilesGetSizes(hashes, ct).ConfigureAwait(false),
];
Logger.LogDebug("Files with size 0 or less: {files}",
string.Join(", ", downloadFileInfoFromService.Where(f => f.Size <= 0).Select(f => f.Hash)));
foreach (var dto in downloadFileInfoFromService.Where(c => c.IsForbidden))
{
if (!_orchestrator.ForbiddenTransfers.Exists(f => string.Equals(f.Hash, dto.Hash, StringComparison.Ordinal)))
_orchestrator.ForbiddenTransfers.Add(new DownloadFileTransfer(dto));
}
CurrentDownloads = [.. downloadFileInfoFromService
CurrentDownloads = downloadFileInfoFromService
.Distinct()
.Select(d => new DownloadFileTransfer(d))
.Where(d => d.CanBeTransferred)];
.Where(d => d.CanBeTransferred)
.ToList();
return CurrentDownloads;
}
private sealed record BatchChunk(string Key, List<DownloadFileTransfer> Items);
private sealed record BatchChunk(string HostKey, string StatusKey, List<DownloadFileTransfer> Items);
private static IEnumerable<List<T>> ChunkList<T>(List<T> items, int chunkSize)
{
@@ -641,6 +635,20 @@ public partial class FileDownloadManager : DisposableMediatorSubscriberBase
var allowDirectDownloads = ShouldUseDirectDownloads();
var replacementLookup = BuildReplacementLookup(fileReplacement);
var rawSizeLookup = new Dictionary<string, long>(StringComparer.OrdinalIgnoreCase);
foreach (var download in CurrentDownloads)
{
if (string.IsNullOrWhiteSpace(download.Hash))
{
continue;
}
if (!rawSizeLookup.TryGetValue(download.Hash, out var existing) || existing <= 0)
{
rawSizeLookup[download.Hash] = download.TotalRaw;
}
}
var directDownloads = new List<DownloadFileTransfer>();
var batchDownloads = new List<DownloadFileTransfer>();
@@ -665,39 +673,36 @@ public partial class FileDownloadManager : DisposableMediatorSubscriberBase
var chunkSize = (int)Math.Ceiling(list.Count / (double)chunkCount);
return ChunkList(list, chunkSize)
.Select(chunk => new BatchChunk(g.Key, chunk));
.Select((chunk, index) => new BatchChunk(g.Key, $"{g.Key}#{index + 1}", chunk));
})
.ToArray();
// init statuses
lock (_downloadStatusLock)
_downloadStatus.Clear();
// direct downloads and batch downloads tracked separately
foreach (var d in directDownloads)
{
_downloadStatus.Clear();
// direct downloads and batch downloads tracked separately
foreach (var d in directDownloads)
_downloadStatus[d.DirectDownloadUrl!] = new FileDownloadStatus
{
_downloadStatus[d.DirectDownloadUrl!] = new FileDownloadStatus
{
DownloadStatus = DownloadStatus.Initializing,
TotalBytes = d.Total,
TotalFiles = 1,
TransferredBytes = 0,
TransferredFiles = 0
};
}
DownloadStatus = DownloadStatus.WaitingForSlot,
TotalBytes = d.Total,
TotalFiles = 1,
TransferredBytes = 0,
TransferredFiles = 0
};
}
foreach (var g in batchChunks.GroupBy(c => c.Key, StringComparer.Ordinal))
foreach (var chunk in batchChunks)
{
_downloadStatus[chunk.StatusKey] = new FileDownloadStatus
{
_downloadStatus[g.Key] = new FileDownloadStatus
{
DownloadStatus = DownloadStatus.Initializing,
TotalBytes = g.SelectMany(x => x.Items).Sum(x => x.Total),
TotalFiles = 1,
TransferredBytes = 0,
TransferredFiles = 0
};
}
DownloadStatus = DownloadStatus.WaitingForQueue,
TotalBytes = chunk.Items.Sum(x => x.Total),
TotalFiles = 1,
TransferredBytes = 0,
TransferredFiles = 0
};
}
if (directDownloads.Count > 0 || batchChunks.Length > 0)
@@ -724,13 +729,13 @@ public partial class FileDownloadManager : DisposableMediatorSubscriberBase
Task batchTask = batchChunks.Length == 0
? Task.CompletedTask
: Parallel.ForEachAsync(batchChunks, new ParallelOptions { MaxDegreeOfParallelism = workerDop, CancellationToken = ct },
async (chunk, token) => await ProcessBatchChunkAsync(chunk, replacementLookup, token, skipDownscale).ConfigureAwait(false));
async (chunk, token) => await ProcessBatchChunkAsync(chunk, replacementLookup, rawSizeLookup, token, skipDownscale).ConfigureAwait(false));
// direct downloads
Task directTask = directDownloads.Count == 0
? Task.CompletedTask
: Parallel.ForEachAsync(directDownloads, new ParallelOptions { MaxDegreeOfParallelism = workerDop, CancellationToken = ct },
async (d, token) => await ProcessDirectAsync(d, replacementLookup, token, skipDownscale).ConfigureAwait(false));
async (d, token) => await ProcessDirectAsync(d, replacementLookup, rawSizeLookup, token, skipDownscale).ConfigureAwait(false));
await Task.WhenAll(batchTask, directTask).ConfigureAwait(false);
@@ -741,9 +746,14 @@ public partial class FileDownloadManager : DisposableMediatorSubscriberBase
ClearDownload();
}
private async Task ProcessBatchChunkAsync(BatchChunk chunk, Dictionary<string, (string Extension, string GamePath)> replacementLookup, CancellationToken ct, bool skipDownscale)
private async Task ProcessBatchChunkAsync(
BatchChunk chunk,
Dictionary<string, (string Extension, string GamePath)> replacementLookup,
IReadOnlyDictionary<string, long> rawSizeLookup,
CancellationToken ct,
bool skipDownscale)
{
var statusKey = chunk.Key;
var statusKey = chunk.StatusKey;
// enqueue (no slot)
SetStatus(statusKey, DownloadStatus.WaitingForQueue);
@@ -770,10 +780,11 @@ public partial class FileDownloadManager : DisposableMediatorSubscriberBase
if (!File.Exists(blockFile))
{
Logger.LogWarning("{dlName}: Block file missing before extraction, skipping", fi.Name);
SetStatus(statusKey, DownloadStatus.Completed);
return;
}
await DecompressBlockFileAsync(statusKey, blockFile, replacementLookup, fi.Name, ct, skipDownscale).ConfigureAwait(false);
await DecompressBlockFileAsync(statusKey, blockFile, replacementLookup, rawSizeLookup, fi.Name, ct, skipDownscale).ConfigureAwait(false);
}
catch (OperationCanceledException)
{
@@ -790,7 +801,12 @@ public partial class FileDownloadManager : DisposableMediatorSubscriberBase
}
}
private async Task ProcessDirectAsync(DownloadFileTransfer directDownload, Dictionary<string, (string Extension, string GamePath)> replacementLookup, CancellationToken ct, bool skipDownscale)
private async Task ProcessDirectAsync(
DownloadFileTransfer directDownload,
Dictionary<string, (string Extension, string GamePath)> replacementLookup,
IReadOnlyDictionary<string, long> rawSizeLookup,
CancellationToken ct,
bool skipDownscale)
{
var progress = CreateInlineProgress(bytes =>
{
@@ -800,7 +816,7 @@ public partial class FileDownloadManager : DisposableMediatorSubscriberBase
if (!ShouldUseDirectDownloads() || string.IsNullOrEmpty(directDownload.DirectDownloadUrl))
{
await ProcessDirectAsQueuedFallbackAsync(directDownload, replacementLookup, progress, ct, skipDownscale).ConfigureAwait(false);
await ProcessDirectAsQueuedFallbackAsync(directDownload, replacementLookup, rawSizeLookup, progress, ct, skipDownscale).ConfigureAwait(false);
return;
}
@@ -828,6 +844,7 @@ public partial class FileDownloadManager : DisposableMediatorSubscriberBase
if (!replacementLookup.TryGetValue(directDownload.Hash, out var repl))
{
Logger.LogWarning("{hash}: No replacement data found for direct download.", directDownload.Hash);
SetStatus(directDownload.DirectDownloadUrl!, DownloadStatus.Completed);
return;
}
@@ -840,13 +857,18 @@ public partial class FileDownloadManager : DisposableMediatorSubscriberBase
byte[] compressedBytes = await File.ReadAllBytesAsync(tempFilename, ct).ConfigureAwait(false);
var decompressedBytes = LZ4Wrapper.Unwrap(compressedBytes);
await File.WriteAllBytesAsync(finalFilename, decompressedBytes, ct).ConfigureAwait(false);
if (directDownload.TotalRaw > 0 && decompressedBytes.LongLength != directDownload.TotalRaw)
{
throw new InvalidDataException(
$"{directDownload.Hash}: Decompressed size mismatch (expected {directDownload.TotalRaw}, got {decompressedBytes.LongLength})");
}
await _fileCompactor.WriteAllBytesAsync(finalFilename, decompressedBytes, ct).ConfigureAwait(false);
PersistFileToStorage(directDownload.Hash, finalFilename, repl.GamePath, skipDownscale);
MarkTransferredFiles(directDownload.DirectDownloadUrl!, 1);
SetStatus(directDownload.DirectDownloadUrl!, DownloadStatus.Completed);
Logger.LogDebug("Finished direct download of {hash}.", directDownload.Hash);
RemoveStatus(directDownload.DirectDownloadUrl!);
}
catch (OperationCanceledException ex)
{
@@ -869,7 +891,7 @@ public partial class FileDownloadManager : DisposableMediatorSubscriberBase
try
{
await ProcessDirectAsQueuedFallbackAsync(directDownload, replacementLookup, progress, ct, skipDownscale).ConfigureAwait(false);
await ProcessDirectAsQueuedFallbackAsync(directDownload, replacementLookup, rawSizeLookup, progress, ct, skipDownscale).ConfigureAwait(false);
if (!expectedDirectDownloadFailure && failureCount >= 3 && !_disableDirectDownloads)
{
@@ -896,6 +918,7 @@ public partial class FileDownloadManager : DisposableMediatorSubscriberBase
private async Task ProcessDirectAsQueuedFallbackAsync(
DownloadFileTransfer directDownload,
Dictionary<string, (string Extension, string GamePath)> replacementLookup,
IReadOnlyDictionary<string, long> rawSizeLookup,
IProgress<long> progress,
CancellationToken ct,
bool skipDownscale)
@@ -923,7 +946,7 @@ public partial class FileDownloadManager : DisposableMediatorSubscriberBase
if (!File.Exists(blockFile))
throw new FileNotFoundException("Block file missing after direct download fallback.", blockFile);
await DecompressBlockFileAsync(statusKey, blockFile, replacementLookup, $"fallback-{directDownload.Hash}", ct, skipDownscale)
await DecompressBlockFileAsync(statusKey, blockFile, replacementLookup, rawSizeLookup, $"fallback-{directDownload.Hash}", ct, skipDownscale)
.ConfigureAwait(false);
}
finally
@@ -972,11 +995,20 @@ public partial class FileDownloadManager : DisposableMediatorSubscriberBase
try
{
var entry = _fileDbManager.CreateCacheEntry(filePath);
var mapKind = _textureMetadataHelper.DetermineMapKind(gamePath, filePath);
var entry = _fileDbManager.CreateCacheEntryWithKnownHash(filePath, fileHash);
if (!skipDownscale)
_textureDownscaleService.ScheduleDownscale(fileHash, filePath, mapKind);
if (!skipDownscale && _textureDownscaleService.ShouldScheduleDownscale(filePath))
{
_textureDownscaleService.ScheduleDownscale(
fileHash,
filePath,
() => _textureMetadataHelper.DetermineMapKind(gamePath, filePath));
}
if (!skipDownscale && _modelDecimationService.ShouldScheduleDecimation(fileHash, filePath, gamePath))
{
_modelDecimationService.ScheduleDecimation(fileHash, filePath, gamePath);
}
if (entry != null && !string.Equals(entry.Hash, fileHash, StringComparison.OrdinalIgnoreCase))
{

View File

@@ -6,5 +6,6 @@ public enum DownloadStatus
WaitingForSlot,
WaitingForQueue,
Downloading,
Decompressing
Decompressing,
Completed
}

View File

@@ -1,10 +1,46 @@
namespace LightlessSync.WebAPI.Files.Models;
using System.Threading;
namespace LightlessSync.WebAPI.Files.Models;
public class FileDownloadStatus
{
public DownloadStatus DownloadStatus { get; set; }
public long TotalBytes { get; set; }
public int TotalFiles { get; set; }
public long TransferredBytes { get; set; }
public int TransferredFiles { get; set; }
}
private int _downloadStatus;
private long _totalBytes;
private int _totalFiles;
private long _transferredBytes;
private int _transferredFiles;
public DownloadStatus DownloadStatus
{
get => (DownloadStatus)Volatile.Read(ref _downloadStatus);
set => Volatile.Write(ref _downloadStatus, (int)value);
}
public long TotalBytes
{
get => Interlocked.Read(ref _totalBytes);
set => Interlocked.Exchange(ref _totalBytes, value);
}
public int TotalFiles
{
get => Volatile.Read(ref _totalFiles);
set => Volatile.Write(ref _totalFiles, value);
}
public long TransferredBytes
{
get => Interlocked.Read(ref _transferredBytes);
set => Interlocked.Exchange(ref _transferredBytes, value);
}
public int TransferredFiles
{
get => Volatile.Read(ref _transferredFiles);
set => Volatile.Write(ref _transferredFiles, value);
}
public void AddTransferredBytes(long delta) => Interlocked.Add(ref _transferredBytes, delta);
public void SetTransferredFiles(int files) => Volatile.Write(ref _transferredFiles, files);
}