Added separate collections for other states; moved cleanup of the Penumbra collection out of config. Safe read of pointer on process; fixed notifications on popup and notifications with flags.

cake
2026-01-13 17:45:32 +01:00
parent 4502cadaeb
commit 73dee6d9a5
22 changed files with 1528 additions and 753 deletions


@@ -436,11 +436,9 @@ public partial class FileDownloadManager : DisposableMediatorSubscriberBase
Logger.LogDebug("GUID {requestId} on server {uri} for files {files}",
requestId, transfers[0].DownloadUri, string.Join(", ", transfers.Select(c => c.Hash)));
// Wait for ready WITHOUT holding a slot
SetStatus(statusKey, DownloadStatus.WaitingForQueue);
await WaitForDownloadReady(transfers, requestId, ct).ConfigureAwait(false);
// Hold slot ONLY for the GET
SetStatus(statusKey, DownloadStatus.WaitingForSlot);
await using ((await AcquireSlotAsync(ct).ConfigureAwait(false)).ConfigureAwait(false))
{
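Reviewer note: the hunk above moves slot acquisition so the queue wait happens without a slot and the slot is held only for the actual GET. AcquireSlotAsync is not shown in this diff; the following is a minimal sketch, assuming it wraps a SemaphoreSlim and returns an IAsyncDisposable releaser so the `await using` frees the slot when the request completes. Names here (DownloadSlotPool, Releaser) are illustrative, not the project's.

using System;
using System.Threading;
using System.Threading.Tasks;

public sealed class DownloadSlotPool
{
    private readonly SemaphoreSlim _slots;

    public DownloadSlotPool(int maxParallelDownloads) =>
        _slots = new SemaphoreSlim(maxParallelDownloads, maxParallelDownloads);

    // Waits for a free slot, then hands back a releaser suitable for `await using`.
    public async Task<IAsyncDisposable> AcquireSlotAsync(CancellationToken ct)
    {
        await _slots.WaitAsync(ct).ConfigureAwait(false);
        return new Releaser(_slots);
    }

    private sealed class Releaser(SemaphoreSlim slots) : IAsyncDisposable
    {
        public ValueTask DisposeAsync()
        {
            slots.Release(); // give the slot back when the GET scope ends
            return ValueTask.CompletedTask;
        }
    }
}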
@@ -462,7 +460,8 @@ public partial class FileDownloadManager : DisposableMediatorSubscriberBase
bool skipDecimation)
{
SetStatus(downloadStatusKey, DownloadStatus.Decompressing);
MarkTransferredFiles(downloadStatusKey, 1);
var extracted = 0;
try
{
@@ -471,6 +470,8 @@ public partial class FileDownloadManager : DisposableMediatorSubscriberBase
{
while (fileBlockStream.Position < fileBlockStream.Length)
{
ct.ThrowIfCancellationRequested();
(string fileHash, long fileLengthBytes) = ReadBlockFileHeader(fileBlockStream);
try
@@ -480,72 +481,69 @@ public partial class FileDownloadManager : DisposableMediatorSubscriberBase
var len = checked((int)fileLengthBytes);
if (fileBlockStream.Position + len > fileBlockStream.Length)
throw new EndOfStreamException();
if (!replacementLookup.TryGetValue(fileHash, out var repl))
{
Logger.LogWarning("{dlName}: No replacement mapping for {fileHash}", downloadLabel, fileHash);
// still need to skip bytes:
var skip = checked((int)fileLengthBytes);
fileBlockStream.Position += skip;
Logger.LogWarning("{dlName}: No replacement mapping for {fileHash}, skipping {len} bytes",
downloadLabel, fileHash, len);
fileBlockStream.Seek(len, SeekOrigin.Current);
continue;
}
var filePath = _fileDbManager.GetCacheFilePath(fileHash, repl.Extension);
Logger.LogTrace("{dlName}: Decompressing {file}:{len} => {dest}", downloadLabel, fileHash, fileLengthBytes, filePath);
Logger.LogTrace("{dlName}: Extracting {fileHash}:{len} => {dest}",
downloadLabel, fileHash, len, filePath);
var compressed = new byte[len];
await ReadExactlyAsync(fileBlockStream, compressed.AsMemory(0, len), ct).ConfigureAwait(false);
MungeBuffer(compressed);
var decompressed = LZ4Wrapper.Unwrap(compressed);
if (rawSizeLookup.TryGetValue(fileHash, out var expectedRawSize)
&& expectedRawSize > 0
&& decompressed.LongLength != expectedRawSize)
{
await _fileCompactor.WriteAllBytesAsync(filePath, Array.Empty<byte>(), ct).ConfigureAwait(false);
PersistFileToStorage(fileHash, filePath, repl.GamePath, skipDownscale, skipDecimation);
continue;
}
MungeBuffer(compressed);
await _decompressGate.WaitAsync(ct).ConfigureAwait(false);
byte[] decompressed;
try
{
// offload CPU-intensive decompression to threadpool to free up worker
await Task.Run(async () =>
{
var sw = System.Diagnostics.Stopwatch.StartNew();
// decompress
var decompressed = LZ4Wrapper.Unwrap(compressed);
Logger.LogTrace("{dlName}: Unwrap {fileHash} took {ms}ms (compressed {c} bytes, decompressed {d} bytes)",
downloadLabel, fileHash, sw.ElapsedMilliseconds, compressed.Length, decompressed?.Length ?? -1);
// write to file without compacting during download
await File.WriteAllBytesAsync(filePath, decompressed, ct).ConfigureAwait(false);
PersistFileToStorage(fileHash, filePath, repl.GamePath, skipDownscale, skipDecimation);
}, ct).ConfigureAwait(false);
decompressed = await Task.Run(() => LZ4Wrapper.Unwrap(compressed), ct).ConfigureAwait(false);
}
finally
{
_decompressGate.Release();
}
if (rawSizeLookup.TryGetValue(fileHash, out var expectedRawSize)
&& expectedRawSize > 0
&& decompressed.LongLength != expectedRawSize)
{
Logger.LogWarning(
"{dlName}: Size mismatch for {fileHash} (expected {expected}, got {actual}). Treating as corrupt.",
downloadLabel, fileHash, expectedRawSize, decompressed.LongLength);
try { if (File.Exists(filePath)) File.Delete(filePath); } catch { /* ignore */ }
continue;
}
await _fileCompactor.WriteAllBytesAsync(filePath, decompressed, ct, enqueueCompaction: false).ConfigureAwait(false);
PersistFileToStorage(fileHash, filePath, repl.GamePath, skipDownscale, skipDecimation);
extracted++;
MarkTransferredFiles(downloadStatusKey, extracted);
}
catch (EndOfStreamException)
{
Logger.LogWarning("{dlName}: Failure to extract file {fileHash}, stream ended prematurely", downloadLabel, fileHash);
Logger.LogWarning("{dlName}: Block ended mid-entry while extracting {fileHash}", downloadLabel, fileHash);
break;
}
catch (Exception e)
catch (Exception ex)
{
Logger.LogWarning(e, "{dlName}: Error during decompression", downloadLabel);
Logger.LogWarning(ex, "{dlName}: Error extracting {fileHash} from block", downloadLabel, fileHash);
}
}
}
SetStatus(downloadStatusKey, DownloadStatus.Completed);
SetStatus(downloadStatusKey, DownloadStatus.Completed);
}
}
catch (EndOfStreamException)
{
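Reviewer note: the extraction hunk offloads LZ4 unwrapping to the thread pool while holding _decompressGate, then verifies the decompressed length against rawSizeLookup before writing. A minimal sketch of that gating pattern follows, assuming _decompressGate is a SemaphoreSlim sized to the allowed number of concurrent decompressions; the helper name is illustrative.

using System;
using System.Threading;
using System.Threading.Tasks;

public static class GatedWork
{
    // Runs a CPU-bound function on the thread pool while holding a semaphore slot,
    // so many concurrent downloads cannot all decompress at the same time.
    public static async Task<T> RunGatedAsync<T>(
        SemaphoreSlim gate, Func<T> cpuBoundWork, CancellationToken ct)
    {
        await gate.WaitAsync(ct).ConfigureAwait(false);
        try
        {
            return await Task.Run(cpuBoundWork, ct).ConfigureAwait(false);
        }
        finally
        {
            gate.Release();
        }
    }
}

// Usage mirroring the diff (assumed names):
//   var decompressed = await GatedWork.RunGatedAsync(
//       _decompressGate, () => LZ4Wrapper.Unwrap(compressed), ct);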
@@ -601,11 +599,10 @@ public partial class FileDownloadManager : DisposableMediatorSubscriberBase
_orchestrator.ForbiddenTransfers.Add(new DownloadFileTransfer(dto));
}
CurrentDownloads = downloadFileInfoFromService
CurrentDownloads = [.. downloadFileInfoFromService
.Distinct()
.Select(d => new DownloadFileTransfer(d))
.Where(d => d.CanBeTransferred)
.ToList();
.Where(d => d.CanBeTransferred)];
return CurrentDownloads;
}
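Reviewer note: the CurrentDownloads change swaps an explicit .ToList() for a C# 12 collection expression with a spread. A small self-contained example of the equivalence, with illustrative names:

using System.Collections.Generic;
using System.Linq;

class CollectionExpressionDemo
{
    static void Main()
    {
        IEnumerable<int> source = Enumerable.Range(1, 5).Where(n => n % 2 == 1);
        List<int> viaToList = source.ToList(); // pre-C# 12 style
        List<int> viaSpread = [.. source];     // equivalent result via spread into the target type
        System.Console.WriteLine(viaSpread.Count == viaToList.Count); // True
    }
}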
@@ -1033,48 +1030,58 @@ public partial class FileDownloadManager : DisposableMediatorSubscriberBase
private async Task ProcessDeferredCompressionsAsync(CancellationToken ct)
{
if (_deferredCompressionQueue.IsEmpty)
if (_deferredCompressionQueue.IsEmpty || !_configService.Current.UseCompactor)
return;
var filesToCompress = new List<string>();
// Drain queue into a unique set (same file can be enqueued multiple times)
var filesToCompact = new HashSet<string>(StringComparer.OrdinalIgnoreCase);
while (_deferredCompressionQueue.TryDequeue(out var filePath))
{
if (File.Exists(filePath))
filesToCompress.Add(filePath);
if (!string.IsNullOrWhiteSpace(filePath))
filesToCompact.Add(filePath);
}
if (filesToCompress.Count == 0)
if (filesToCompact.Count == 0)
return;
Logger.LogDebug("Starting deferred compression of {count} files", filesToCompress.Count);
Logger.LogDebug("Starting deferred compaction of {count} files", filesToCompact.Count);
var compressionWorkers = Math.Clamp(Environment.ProcessorCount / 4, 2, 4);
var enqueueWorkers = Math.Clamp(Environment.ProcessorCount / 4, 1, 2);
await Parallel.ForEachAsync(filesToCompress,
new ParallelOptions
{
MaxDegreeOfParallelism = compressionWorkers,
CancellationToken = ct
await Parallel.ForEachAsync(
filesToCompact,
new ParallelOptions
{
MaxDegreeOfParallelism = enqueueWorkers,
CancellationToken = ct
},
async (filePath, token) =>
{
try
{
try
{
token.ThrowIfCancellationRequested();
if (!File.Exists(filePath))
return;
await Task.Yield();
if (_configService.Current.UseCompactor && File.Exists(filePath))
{
var bytes = await File.ReadAllBytesAsync(filePath, token).ConfigureAwait(false);
await _fileCompactor.WriteAllBytesAsync(filePath, bytes, token).ConfigureAwait(false);
Logger.LogTrace("Compressed file: {filePath}", filePath);
}
_fileCompactor.RequestCompaction(filePath);
Logger.LogTrace("Deferred compaction queued: {filePath}", filePath);
}
catch (OperationCanceledException)
{
Logger.LogTrace("Deferred compaction cancelled for file: {filePath}", filePath);
throw;
}
catch (Exception ex)
{
Logger.LogWarning(ex, "Failed to compress file: {filePath}", filePath);
Logger.LogWarning(ex, "Failed to queue deferred compaction for file: {filePath}", filePath);
}
}).ConfigureAwait(false);
Logger.LogDebug("Completed deferred compression of {count} files", filesToCompress.Count);
Logger.LogDebug("Completed queuing deferred compaction of {count} files", filesToCompact.Count);
}
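Reviewer note: the deferred path now only drains the queue, dedupes paths, and hands each file to the compactor instead of re-reading and re-writing bytes inline. A minimal sketch of that drain-and-enqueue pattern, assuming the queue is a ConcurrentQueue<string> and that RequestCompaction(string) exists on the compactor; the wrapper name is illustrative.

using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.IO;
using System.Threading;
using System.Threading.Tasks;

public static class DeferredCompactionDemo
{
    public static async Task DrainAsync(
        ConcurrentQueue<string> queue,
        Action<string> requestCompaction,
        CancellationToken ct)
    {
        // The same file can be enqueued multiple times; dedupe case-insensitively (Windows paths).
        var unique = new HashSet<string>(StringComparer.OrdinalIgnoreCase);
        while (queue.TryDequeue(out var path))
        {
            if (!string.IsNullOrWhiteSpace(path))
                unique.Add(path);
        }

        if (unique.Count == 0)
            return;

        // Few workers: queuing compaction requests is cheap; the heavy work happens later in the compactor.
        var workers = Math.Clamp(Environment.ProcessorCount / 4, 1, 2);
        await Parallel.ForEachAsync(
            unique,
            new ParallelOptions { MaxDegreeOfParallelism = workers, CancellationToken = ct },
            (path, token) =>
            {
                token.ThrowIfCancellationRequested();
                if (File.Exists(path))
                    requestCompaction(path);
                return ValueTask.CompletedTask;
            }).ConfigureAwait(false);
    }
}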
private sealed class InlineProgress : IProgress<long>