Added decompression gate to decompress files
This commit is contained in:
@@ -28,6 +28,8 @@ public partial class FileDownloadManager : DisposableMediatorSubscriberBase
|
|||||||
private readonly TextureMetadataHelper _textureMetadataHelper;
|
private readonly TextureMetadataHelper _textureMetadataHelper;
|
||||||
|
|
||||||
private readonly ConcurrentDictionary<ThrottledStream, byte> _activeDownloadStreams;
|
private readonly ConcurrentDictionary<ThrottledStream, byte> _activeDownloadStreams;
|
||||||
|
private readonly SemaphoreSlim _decompressGate =
|
||||||
|
new(Math.Max(1, Environment.ProcessorCount / 2), Math.Max(1, Environment.ProcessorCount / 2));
|
||||||
|
|
||||||
private volatile bool _disableDirectDownloads;
|
private volatile bool _disableDirectDownloads;
|
||||||
private int _consecutiveDirectDownloadFailures;
|
private int _consecutiveDirectDownloadFailures;
|
||||||
@@ -522,32 +524,57 @@ public partial class FileDownloadManager : DisposableMediatorSubscriberBase
|
|||||||
|
|
||||||
try
|
try
|
||||||
{
|
{
|
||||||
|
// sanity check length
|
||||||
if (fileLengthBytes < 0 || fileLengthBytes > int.MaxValue)
|
if (fileLengthBytes < 0 || fileLengthBytes > int.MaxValue)
|
||||||
throw new InvalidDataException($"Invalid block entry length: {fileLengthBytes}");
|
throw new InvalidDataException($"Invalid block entry length: {fileLengthBytes}");
|
||||||
|
|
||||||
|
// safe cast after check
|
||||||
|
var len = checked((int)fileLengthBytes);
|
||||||
|
|
||||||
if (!replacementLookup.TryGetValue(fileHash, out var repl))
|
if (!replacementLookup.TryGetValue(fileHash, out var repl))
|
||||||
{
|
{
|
||||||
Logger.LogWarning("{dlName}: No replacement mapping for {fileHash}", downloadLabel, fileHash);
|
Logger.LogWarning("{dlName}: No replacement mapping for {fileHash}", downloadLabel, fileHash);
|
||||||
// still need to skip bytes:
|
fileBlockStream.Seek(len, SeekOrigin.Current);
|
||||||
var skip = checked((int)fileLengthBytes);
|
|
||||||
fileBlockStream.Position += skip;
|
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// decompress
|
||||||
var filePath = _fileDbManager.GetCacheFilePath(fileHash, repl.Extension);
|
var filePath = _fileDbManager.GetCacheFilePath(fileHash, repl.Extension);
|
||||||
|
Logger.LogTrace("{dlName}: Decompressing {file}:{len} => {dest}", downloadLabel, fileHash, fileLengthBytes, filePath);
|
||||||
|
|
||||||
Logger.LogDebug("{dlName}: Decompressing {file}:{len} => {dest}", downloadLabel, fileHash, fileLengthBytes, filePath);
|
// read compressed data
|
||||||
|
|
||||||
var len = checked((int)fileLengthBytes);
|
|
||||||
var compressed = new byte[len];
|
var compressed = new byte[len];
|
||||||
|
|
||||||
await ReadExactlyAsync(fileBlockStream, compressed.AsMemory(0, len), ct).ConfigureAwait(false);
|
await ReadExactlyAsync(fileBlockStream, compressed.AsMemory(0, len), ct).ConfigureAwait(false);
|
||||||
|
|
||||||
MungeBuffer(compressed);
|
if (len == 0)
|
||||||
var decompressed = LZ4Wrapper.Unwrap(compressed);
|
{
|
||||||
|
await _fileCompactor.WriteAllBytesAsync(filePath, Array.Empty<byte>(), ct).ConfigureAwait(false);
|
||||||
|
PersistFileToStorage(fileHash, filePath, repl.GamePath, skipDownscale);
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
|
||||||
await _fileCompactor.WriteAllBytesAsync(filePath, decompressed, ct).ConfigureAwait(false);
|
MungeBuffer(compressed);
|
||||||
PersistFileToStorage(fileHash, filePath, repl.GamePath, skipDownscale);
|
|
||||||
|
// limit concurrent decompressions
|
||||||
|
await _decompressGate.WaitAsync(ct).ConfigureAwait(false);
|
||||||
|
try
|
||||||
|
{
|
||||||
|
var sw = System.Diagnostics.Stopwatch.StartNew();
|
||||||
|
|
||||||
|
// decompress
|
||||||
|
var decompressed = LZ4Wrapper.Unwrap(compressed);
|
||||||
|
|
||||||
|
Logger.LogTrace("{dlName}: Unwrap {fileHash} took {ms}ms (compressed {c} bytes, decompressed {d} bytes)",
|
||||||
|
downloadLabel, fileHash, sw.ElapsedMilliseconds, compressed.Length, decompressed?.Length ?? -1);
|
||||||
|
|
||||||
|
// write to file
|
||||||
|
await _fileCompactor.WriteAllBytesAsync(filePath, decompressed, ct).ConfigureAwait(false);
|
||||||
|
PersistFileToStorage(fileHash, filePath, repl.GamePath, skipDownscale);
|
||||||
|
}
|
||||||
|
finally
|
||||||
|
{
|
||||||
|
_decompressGate.Release();
|
||||||
|
}
|
||||||
}
|
}
|
||||||
catch (EndOfStreamException)
|
catch (EndOfStreamException)
|
||||||
{
|
{
|
||||||
@@ -605,20 +632,16 @@ public partial class FileDownloadManager : DisposableMediatorSubscriberBase
|
|||||||
.. await FilesGetSizes(hashes, ct).ConfigureAwait(false),
|
.. await FilesGetSizes(hashes, ct).ConfigureAwait(false),
|
||||||
];
|
];
|
||||||
|
|
||||||
Logger.LogDebug("Files with size 0 or less: {files}",
|
|
||||||
string.Join(", ", downloadFileInfoFromService.Where(f => f.Size <= 0).Select(f => f.Hash)));
|
|
||||||
|
|
||||||
foreach (var dto in downloadFileInfoFromService.Where(c => c.IsForbidden))
|
foreach (var dto in downloadFileInfoFromService.Where(c => c.IsForbidden))
|
||||||
{
|
{
|
||||||
if (!_orchestrator.ForbiddenTransfers.Exists(f => string.Equals(f.Hash, dto.Hash, StringComparison.Ordinal)))
|
if (!_orchestrator.ForbiddenTransfers.Exists(f => string.Equals(f.Hash, dto.Hash, StringComparison.Ordinal)))
|
||||||
_orchestrator.ForbiddenTransfers.Add(new DownloadFileTransfer(dto));
|
_orchestrator.ForbiddenTransfers.Add(new DownloadFileTransfer(dto));
|
||||||
}
|
}
|
||||||
|
|
||||||
CurrentDownloads = downloadFileInfoFromService
|
CurrentDownloads = [.. downloadFileInfoFromService
|
||||||
.Distinct()
|
.Distinct()
|
||||||
.Select(d => new DownloadFileTransfer(d))
|
.Select(d => new DownloadFileTransfer(d))
|
||||||
.Where(d => d.CanBeTransferred)
|
.Where(d => d.CanBeTransferred)];
|
||||||
.ToList();
|
|
||||||
|
|
||||||
return CurrentDownloads;
|
return CurrentDownloads;
|
||||||
}
|
}
|
||||||
|
|||||||
Reference in New Issue
Block a user