diff --git a/LightlessAPI b/LightlessAPI index 44fbe10..0bc7abb 160000 --- a/LightlessAPI +++ b/LightlessAPI @@ -1 +1 @@ -Subproject commit 44fbe1045872fcae4df45e43625a9ff1a79bc2ef +Subproject commit 0bc7abb274548bcde36c65ef1cf9f1a143d6492c diff --git a/LightlessSync/FileCache/FileCacheManager.cs b/LightlessSync/FileCache/FileCacheManager.cs index ed57656..972c4d9 100644 --- a/LightlessSync/FileCache/FileCacheManager.cs +++ b/LightlessSync/FileCache/FileCacheManager.cs @@ -27,6 +27,7 @@ public sealed class FileCacheManager : IHostedService private readonly Lock _fileWriteLock = new(); private readonly IpcManager _ipcManager; private readonly ILogger _logger; + private bool _csvHeaderEnsured; public string CacheFolder => _configService.Current.CacheFolder; public FileCacheManager(ILogger logger, IpcManager ipcManager, LightlessConfigService configService, LightlessMediator lightlessMediator) @@ -462,6 +463,7 @@ public sealed class FileCacheManager : IHostedService string[] existingLines = File.ReadAllLines(_csvPath); if (existingLines.Length > 0 && TryParseVersionHeader(existingLines[0], out var existingVersion) && existingVersion == FileCacheVersion) { + _csvHeaderEnsured = true; return; } @@ -481,6 +483,18 @@ public sealed class FileCacheManager : IHostedService } File.WriteAllText(_csvPath, rebuilt.ToString()); + _csvHeaderEnsured = true; + } + + private void EnsureCsvHeaderLockedCached() + { + if (_csvHeaderEnsured) + { + return; + } + + EnsureCsvHeaderLocked(); + _csvHeaderEnsured = true; } private void BackupUnsupportedCache(string suffix) @@ -540,10 +554,11 @@ public sealed class FileCacheManager : IHostedService if (!File.Exists(_csvPath)) { File.WriteAllLines(_csvPath, new[] { BuildVersionHeader(), entity.CsvEntry }); + _csvHeaderEnsured = true; } else { - EnsureCsvHeaderLocked(); + EnsureCsvHeaderLockedCached(); File.AppendAllLines(_csvPath, new[] { entity.CsvEntry }); } } diff --git a/LightlessSync/FileCache/FileCompactor.cs b/LightlessSync/FileCache/FileCompactor.cs index 737c1f0..1a35ad6 100644 --- a/LightlessSync/FileCache/FileCompactor.cs +++ b/LightlessSync/FileCache/FileCompactor.cs @@ -2,25 +2,33 @@ using LightlessSync.Services; using Microsoft.Extensions.Logging; using System.Runtime.InteropServices; +using System.Collections.Concurrent; +using System.Threading; +using System.Threading.Channels; +using System.Threading.Tasks; namespace LightlessSync.FileCache; -public sealed class FileCompactor +public sealed class FileCompactor : IDisposable { public const uint FSCTL_DELETE_EXTERNAL_BACKING = 0x90314U; public const ulong WOF_PROVIDER_FILE = 2UL; private readonly Dictionary _clusterSizes; - + private readonly ConcurrentDictionary _pendingCompactions; private readonly WOF_FILE_COMPRESSION_INFO_V1 _efInfo; private readonly ILogger _logger; private readonly LightlessConfigService _lightlessConfigService; private readonly DalamudUtilService _dalamudUtilService; + private readonly Channel _compactionQueue; + private readonly CancellationTokenSource _compactionCts = new(); + private readonly Task _compactionWorker; public FileCompactor(ILogger logger, LightlessConfigService lightlessConfigService, DalamudUtilService dalamudUtilService) { _clusterSizes = new(StringComparer.Ordinal); + _pendingCompactions = new(StringComparer.OrdinalIgnoreCase); _logger = logger; _lightlessConfigService = lightlessConfigService; _dalamudUtilService = dalamudUtilService; @@ -29,6 +37,18 @@ public sealed class FileCompactor Algorithm = CompressionAlgorithm.XPRESS8K, Flags = 0 }; + + _compactionQueue = Channel.CreateUnbounded(new UnboundedChannelOptions + { + SingleReader = true, + SingleWriter = false + }); + _compactionWorker = Task.Factory.StartNew( + () => ProcessQueueAsync(_compactionCts.Token), + _compactionCts.Token, + TaskCreationOptions.LongRunning, + TaskScheduler.Default) + .Unwrap(); } private enum CompressionAlgorithm @@ -87,7 +107,30 @@ public sealed class FileCompactor return; } - CompactFile(filePath); + EnqueueCompaction(filePath); + } + + public void Dispose() + { + _compactionQueue.Writer.TryComplete(); + _compactionCts.Cancel(); + try + { + if (!_compactionWorker.Wait(TimeSpan.FromSeconds(5))) + { + _logger.LogDebug("Compaction worker did not shut down within timeout"); + } + } + catch (Exception ex) when (ex is not OperationCanceledException) + { + _logger.LogDebug(ex, "Error shutting down compaction worker"); + } + finally + { + _compactionCts.Dispose(); + } + + GC.SuppressFinalize(this); } [DllImport("kernel32.dll")] @@ -226,4 +269,67 @@ public sealed class FileCompactor public CompressionAlgorithm Algorithm; public ulong Flags; } -} \ No newline at end of file + + private void EnqueueCompaction(string filePath) + { + if (!_pendingCompactions.TryAdd(filePath, 0)) + { + return; + } + + if (!_compactionQueue.Writer.TryWrite(filePath)) + { + _pendingCompactions.TryRemove(filePath, out _); + _logger.LogDebug("Failed to enqueue compaction job for {file}", filePath); + } + } + + private async Task ProcessQueueAsync(CancellationToken token) + { + try + { + while (await _compactionQueue.Reader.WaitToReadAsync(token).ConfigureAwait(false)) + { + while (_compactionQueue.Reader.TryRead(out var filePath)) + { + try + { + if (token.IsCancellationRequested) + { + return; + } + + if (_dalamudUtilService.IsWine || !_lightlessConfigService.Current.UseCompactor) + { + continue; + } + + if (!File.Exists(filePath)) + { + _logger.LogTrace("Skipping compaction for missing file {file}", filePath); + continue; + } + + CompactFile(filePath); + } + catch (OperationCanceledException) + { + return; + } + catch (Exception ex) + { + _logger.LogWarning(ex, "Error compacting file {file}", filePath); + } + finally + { + _pendingCompactions.TryRemove(filePath, out _); + } + } + } + } + catch (OperationCanceledException) + { + // expected during shutdown + } + } +} diff --git a/LightlessSync/LightlessConfiguration/Configurations/LightlessConfig.cs b/LightlessSync/LightlessConfiguration/Configurations/LightlessConfig.cs index d66e956..f849c87 100644 --- a/LightlessSync/LightlessConfiguration/Configurations/LightlessConfig.cs +++ b/LightlessSync/LightlessConfiguration/Configurations/LightlessConfig.cs @@ -67,6 +67,7 @@ public class LightlessConfig : ILightlessConfiguration public bool ShowUploading { get; set; } = true; public bool ShowUploadingBigText { get; set; } = true; public bool ShowVisibleUsersSeparately { get; set; } = true; + public bool EnableDirectDownloads { get; set; } = true; public int TimeSpanBetweenScansInSeconds { get; set; } = 30; public int TransferBarsHeight { get; set; } = 12; public bool TransferBarsShowText { get; set; } = true; diff --git a/LightlessSync/PlayerData/Factories/FileDownloadManagerFactory.cs b/LightlessSync/PlayerData/Factories/FileDownloadManagerFactory.cs index eea3ea6..231ded3 100644 --- a/LightlessSync/PlayerData/Factories/FileDownloadManagerFactory.cs +++ b/LightlessSync/PlayerData/Factories/FileDownloadManagerFactory.cs @@ -1,4 +1,6 @@ -using LightlessSync.FileCache; +using LightlessSync.FileCache; +using LightlessSync.LightlessConfiguration; +using LightlessSync.Services; using LightlessSync.Services.Mediator; using LightlessSync.WebAPI.Files; using Microsoft.Extensions.Logging; @@ -10,21 +12,38 @@ public class FileDownloadManagerFactory private readonly FileCacheManager _fileCacheManager; private readonly FileCompactor _fileCompactor; private readonly FileTransferOrchestrator _fileTransferOrchestrator; + private readonly PairProcessingLimiter _pairProcessingLimiter; private readonly ILoggerFactory _loggerFactory; private readonly LightlessMediator _lightlessMediator; + private readonly LightlessConfigService _configService; - public FileDownloadManagerFactory(ILoggerFactory loggerFactory, LightlessMediator lightlessMediator, FileTransferOrchestrator fileTransferOrchestrator, - FileCacheManager fileCacheManager, FileCompactor fileCompactor) + public FileDownloadManagerFactory( + ILoggerFactory loggerFactory, + LightlessMediator lightlessMediator, + FileTransferOrchestrator fileTransferOrchestrator, + FileCacheManager fileCacheManager, + FileCompactor fileCompactor, + PairProcessingLimiter pairProcessingLimiter, + LightlessConfigService configService) { _loggerFactory = loggerFactory; _lightlessMediator = lightlessMediator; _fileTransferOrchestrator = fileTransferOrchestrator; _fileCacheManager = fileCacheManager; _fileCompactor = fileCompactor; + _pairProcessingLimiter = pairProcessingLimiter; + _configService = configService; } public FileDownloadManager Create() { - return new FileDownloadManager(_loggerFactory.CreateLogger(), _lightlessMediator, _fileTransferOrchestrator, _fileCacheManager, _fileCompactor); + return new FileDownloadManager( + _loggerFactory.CreateLogger(), + _lightlessMediator, + _fileTransferOrchestrator, + _fileCacheManager, + _fileCompactor, + _pairProcessingLimiter, + _configService); } -} \ No newline at end of file +} diff --git a/LightlessSync/Services/NameplateHandler.cs b/LightlessSync/Services/NameplateHandler.cs index dc761bb..a28be5f 100644 --- a/LightlessSync/Services/NameplateHandler.cs +++ b/LightlessSync/Services/NameplateHandler.cs @@ -208,7 +208,13 @@ public unsafe class NameplateHandler : IMediatorSubscriber for (int i = 0; i < ui3DModule->NamePlateObjectInfoCount; ++i) { - var objectInfo = ui3DModule->NamePlateObjectInfoPointers[i].Value; + if (ui3DModule->NamePlateObjectInfoPointers.IsEmpty) continue; + + var objectInfoPtr = ui3DModule->NamePlateObjectInfoPointers[i]; + + if (objectInfoPtr == null) continue; + + var objectInfo = objectInfoPtr.Value; if (objectInfo == null || objectInfo->GameObject == null) continue; diff --git a/LightlessSync/Services/PairProcessingLimiter.cs b/LightlessSync/Services/PairProcessingLimiter.cs index 0e75d28..239ba75 100644 --- a/LightlessSync/Services/PairProcessingLimiter.cs +++ b/LightlessSync/Services/PairProcessingLimiter.cs @@ -15,6 +15,7 @@ public sealed class PairProcessingLimiter : DisposableMediatorSubscriberBase private readonly SemaphoreSlim _semaphore; private int _currentLimit; private int _pendingReductions; + private int _pendingIncrements; private int _waiting; private int _inFlight; @@ -70,7 +71,7 @@ public sealed class PairProcessingLimiter : DisposableMediatorSubscriberBase if (!IsEnabled) { - _semaphore.Release(); + TryReleaseSemaphore(); return NoopReleaser.Instance; } @@ -90,18 +91,12 @@ public sealed class PairProcessingLimiter : DisposableMediatorSubscriberBase var releaseAmount = HardLimit - _semaphore.CurrentCount; if (releaseAmount > 0) { - try - { - _semaphore.Release(releaseAmount); - } - catch (SemaphoreFullException) - { - // ignore, already at max - } + TryReleaseSemaphore(releaseAmount); } _currentLimit = desiredLimit; _pendingReductions = 0; + _pendingIncrements = 0; return; } @@ -113,10 +108,13 @@ public sealed class PairProcessingLimiter : DisposableMediatorSubscriberBase if (desiredLimit > _currentLimit) { var increment = desiredLimit - _currentLimit; - var allowed = Math.Min(increment, HardLimit - _semaphore.CurrentCount); - if (allowed > 0) + _pendingIncrements += increment; + + var available = HardLimit - _semaphore.CurrentCount; + var toRelease = Math.Min(_pendingIncrements, available); + if (toRelease > 0 && TryReleaseSemaphore(toRelease)) { - _semaphore.Release(allowed); + _pendingIncrements -= toRelease; } } else @@ -133,6 +131,13 @@ public sealed class PairProcessingLimiter : DisposableMediatorSubscriberBase { _pendingReductions += remaining; } + + if (_pendingIncrements > 0) + { + var offset = Math.Min(_pendingIncrements, _pendingReductions); + _pendingIncrements -= offset; + _pendingReductions -= offset; + } } _currentLimit = desiredLimit; @@ -146,6 +151,25 @@ public sealed class PairProcessingLimiter : DisposableMediatorSubscriberBase return Math.Clamp(configured, 1, HardLimit); } + private bool TryReleaseSemaphore(int count = 1) + { + if (count <= 0) + { + return true; + } + + try + { + _semaphore.Release(count); + return true; + } + catch (SemaphoreFullException ex) + { + Logger.LogDebug(ex, "Attempted to release {count} pair processing slots but semaphore is already at the hard limit.", count); + return false; + } + } + private void ReleaseOne() { var inFlight = Interlocked.Decrement(ref _inFlight); @@ -166,9 +190,20 @@ public sealed class PairProcessingLimiter : DisposableMediatorSubscriberBase _pendingReductions--; return; } + + if (_pendingIncrements > 0) + { + if (!TryReleaseSemaphore()) + { + return; + } + + _pendingIncrements--; + return; + } } - _semaphore.Release(); + TryReleaseSemaphore(); } protected override void Dispose(bool disposing) diff --git a/LightlessSync/UI/SettingsUi.cs b/LightlessSync/UI/SettingsUi.cs index dd7ee84..6154ac7 100644 --- a/LightlessSync/UI/SettingsUi.cs +++ b/LightlessSync/UI/SettingsUi.cs @@ -591,6 +591,7 @@ public class SettingsUi : WindowMediatorSubscriberBase bool limitPairApplications = _configService.Current.EnablePairProcessingLimiter; bool useAlternativeUpload = _configService.Current.UseAlternativeFileUpload; int downloadSpeedLimit = _configService.Current.DownloadSpeedLimitInBytes; + bool enableDirectDownloads = _configService.Current.EnableDirectDownloads; ImGui.AlignTextToFramePadding(); ImGui.TextUnformatted("Global Download Speed Limit"); @@ -622,6 +623,13 @@ public class SettingsUi : WindowMediatorSubscriberBase ImGui.AlignTextToFramePadding(); ImGui.TextUnformatted("0 = No limit/infinite"); + if (ImGui.Checkbox("[BETA] Enable Lightspeed Downloads", ref enableDirectDownloads)) + { + _configService.Current.EnableDirectDownloads = enableDirectDownloads; + _configService.Save(); + } + _uiShared.DrawHelpText("Uses signed CDN links when available. Disable to force the legacy queued download flow."); + if (ImGui.SliderInt("Maximum Parallel Downloads", ref maxParallelDownloads, 1, 10)) { _configService.Current.ParallelDownloads = maxParallelDownloads; diff --git a/LightlessSync/UI/SyncshellFinderUI.cs b/LightlessSync/UI/SyncshellFinderUI.cs index 6cd6935..d7f5605 100644 --- a/LightlessSync/UI/SyncshellFinderUI.cs +++ b/LightlessSync/UI/SyncshellFinderUI.cs @@ -288,8 +288,6 @@ public class SyncshellFinderUI : WindowMediatorSubscriberBase return; } - var currentGids = _nearbySyncshells.Select(s => s.Group.GID).ToHashSet(StringComparer.Ordinal); - if (updatedList != null) { var previousGid = GetSelectedGid(); diff --git a/LightlessSync/Utils/Crypto.cs b/LightlessSync/Utils/Crypto.cs index 8ed6ecb..de04d26 100644 --- a/LightlessSync/Utils/Crypto.cs +++ b/LightlessSync/Utils/Crypto.cs @@ -1,4 +1,7 @@ -using System.Security.Cryptography; +using System; +using System.Collections.Generic; +using System.IO; +using System.Security.Cryptography; using System.Text; namespace LightlessSync.Utils; @@ -13,8 +16,9 @@ public static class Crypto public static string GetFileHash(this string filePath) { - using SHA1CryptoServiceProvider cryptoProvider = new(); - return BitConverter.ToString(cryptoProvider.ComputeHash(File.ReadAllBytes(filePath))).Replace("-", "", StringComparison.Ordinal); + using SHA1 sha1 = SHA1.Create(); + using FileStream stream = File.Open(filePath, FileMode.Open, FileAccess.Read, FileShare.ReadWrite | FileShare.Delete); + return BitConverter.ToString(sha1.ComputeHash(stream)).Replace("-", "", StringComparison.Ordinal); } public static string GetHash256(this (string, ushort) playerToHash) diff --git a/LightlessSync/WebAPI/Files/FileDownloadManager.cs b/LightlessSync/WebAPI/Files/FileDownloadManager.cs index 3f48af2..cc82d04 100644 --- a/LightlessSync/WebAPI/Files/FileDownloadManager.cs +++ b/LightlessSync/WebAPI/Files/FileDownloadManager.cs @@ -5,12 +5,18 @@ using LightlessSync.API.Dto.Files; using LightlessSync.API.Routes; using LightlessSync.FileCache; using LightlessSync.PlayerData.Handlers; +using LightlessSync.Services; using LightlessSync.Services.Mediator; using LightlessSync.WebAPI.Files.Models; using Microsoft.Extensions.Logging; +using System; using System.Collections.Concurrent; +using System.IO; using System.Net; using System.Net.Http.Json; +using System.Threading; +using System.Threading.Tasks; +using LightlessSync.LightlessConfiguration; namespace LightlessSync.WebAPI.Files; @@ -20,17 +26,27 @@ public partial class FileDownloadManager : DisposableMediatorSubscriberBase private readonly FileCompactor _fileCompactor; private readonly FileCacheManager _fileDbManager; private readonly FileTransferOrchestrator _orchestrator; + private readonly PairProcessingLimiter _pairProcessingLimiter; + private readonly LightlessConfigService _configService; private readonly ConcurrentDictionary _activeDownloadStreams; + private static readonly TimeSpan DownloadStallTimeout = TimeSpan.FromSeconds(30); + private volatile bool _disableDirectDownloads; + private int _consecutiveDirectDownloadFailures; + private bool _lastConfigDirectDownloadsState; public FileDownloadManager(ILogger logger, LightlessMediator mediator, FileTransferOrchestrator orchestrator, - FileCacheManager fileCacheManager, FileCompactor fileCompactor) : base(logger, mediator) + FileCacheManager fileCacheManager, FileCompactor fileCompactor, + PairProcessingLimiter pairProcessingLimiter, LightlessConfigService configService) : base(logger, mediator) { _downloadStatus = new Dictionary(StringComparer.Ordinal); _orchestrator = orchestrator; _fileDbManager = fileCacheManager; _fileCompactor = fileCompactor; + _pairProcessingLimiter = pairProcessingLimiter; + _configService = configService; _activeDownloadStreams = new(); + _lastConfigDirectDownloadsState = _configService.Current.EnableDirectDownloads; Mediator.Subscribe(this, (msg) => { @@ -50,6 +66,11 @@ public partial class FileDownloadManager : DisposableMediatorSubscriberBase public bool IsDownloading => CurrentDownloads.Any(); + private bool ShouldUseDirectDownloads() + { + return _configService.Current.EnableDirectDownloads && !_disableDirectDownloads; + } + public static void MungeBuffer(Span buffer) { for (int i = 0; i < buffer.Length; ++i) @@ -156,39 +177,47 @@ public partial class FileDownloadManager : DisposableMediatorSubscriberBase Logger.LogWarning("Download status missing for {group} when starting download", downloadGroup); } + var requestUrl = LightlessFiles.CacheGetFullPath(fileTransfer[0].DownloadUri, requestId); + + await DownloadFileThrottled(requestUrl, tempPath, progress, MungeBuffer, ct, withToken: true).ConfigureAwait(false); + } + + private delegate void DownloadDataCallback(Span data); + + private async Task DownloadFileThrottled(Uri requestUrl, string destinationFilename, IProgress progress, DownloadDataCallback? callback, CancellationToken ct, bool withToken) + { const int maxRetries = 3; int retryCount = 0; TimeSpan retryDelay = TimeSpan.FromSeconds(2); - - HttpResponseMessage response = null!; - var requestUrl = LightlessFiles.CacheGetFullPath(fileTransfer[0].DownloadUri, requestId); + HttpResponseMessage? response = null; while (true) { try { - Logger.LogDebug("Attempt {attempt} - Downloading {requestUrl} for request {id}", retryCount + 1, requestUrl, requestId); - - response = await _orchestrator.SendRequestAsync(HttpMethod.Get, requestUrl, ct, HttpCompletionOption.ResponseHeadersRead).ConfigureAwait(false); + Logger.LogDebug("Attempt {attempt} - Downloading {requestUrl}", retryCount + 1, requestUrl); + response = await _orchestrator.SendRequestAsync(HttpMethod.Get, requestUrl, ct, HttpCompletionOption.ResponseHeadersRead, withToken).ConfigureAwait(false); response.EnsureSuccessStatusCode(); break; } catch (HttpRequestException ex) when (ex.InnerException is TimeoutException || ex.StatusCode == null) { + response?.Dispose(); retryCount++; Logger.LogWarning(ex, "Timeout during download of {requestUrl}. Attempt {attempt} of {maxRetries}", requestUrl, retryCount, maxRetries); if (retryCount >= maxRetries || ct.IsCancellationRequested) { - Logger.LogError($"Max retries reached or cancelled. Failing download for {requestUrl}"); + Logger.LogError("Max retries reached or cancelled. Failing download for {requestUrl}", requestUrl); throw; } - await Task.Delay(retryDelay, ct).ConfigureAwait(false); // Wait before retrying + await Task.Delay(retryDelay, ct).ConfigureAwait(false); } catch (HttpRequestException ex) { + response?.Dispose(); Logger.LogWarning(ex, "Error during download of {requestUrl}, HttpStatusCode: {code}", requestUrl, ex.StatusCode); if (ex.StatusCode is HttpStatusCode.NotFound or HttpStatusCode.Unauthorized) @@ -196,42 +225,80 @@ public partial class FileDownloadManager : DisposableMediatorSubscriberBase throw new InvalidDataException($"Http error {ex.StatusCode} (cancelled: {ct.IsCancellationRequested}): {requestUrl}", ex); } - throw; + throw; } } + ThrottledStream? stream = null; FileStream? fileStream = null; try { - fileStream = File.Create(tempPath); + fileStream = File.Create(destinationFilename); await using (fileStream.ConfigureAwait(false)) { - var bufferSize = response.Content.Headers.ContentLength > 1024 * 1024 ? 65536 : 8196; + var bufferSize = response!.Content.Headers.ContentLength > 1024 * 1024 ? 65536 : 8196; var buffer = new byte[bufferSize]; - var bytesRead = 0; var limit = _orchestrator.DownloadLimitPerSlot(); - Logger.LogTrace("Starting Download of {id} with a speed limit of {limit} to {tempPath}", requestId, limit, tempPath); + Logger.LogTrace("Starting Download with a speed limit of {limit} to {destination}", limit, destinationFilename); stream = new(await response.Content.ReadAsStreamAsync(ct).ConfigureAwait(false), limit); - _activeDownloadStreams.TryAdd(stream, 0); - while ((bytesRead = await stream.ReadAsync(buffer, ct).ConfigureAwait(false)) > 0) + while (true) { ct.ThrowIfCancellationRequested(); + int bytesRead; + try + { + var readTask = stream.ReadAsync(buffer.AsMemory(0, buffer.Length), ct).AsTask(); + while (!readTask.IsCompleted) + { + var completedTask = await Task.WhenAny(readTask, Task.Delay(DownloadStallTimeout)).ConfigureAwait(false); + if (completedTask == readTask) + { + break; + } - MungeBuffer(buffer.AsSpan(0, bytesRead)); + ct.ThrowIfCancellationRequested(); + + var snapshot = _pairProcessingLimiter.GetSnapshot(); + if (snapshot.Waiting > 0) + { + throw new TimeoutException($"No data received for {DownloadStallTimeout.TotalSeconds} seconds while downloading {requestUrl} (waiting: {snapshot.Waiting})"); + } + + Logger.LogTrace("Download stalled for {requestUrl} but no queued pairs, continuing to wait", requestUrl); + } + + bytesRead = await readTask.ConfigureAwait(false); + } + catch (OperationCanceledException) + { + throw; + } + + if (bytesRead == 0) + { + break; + } + + callback?.Invoke(buffer.AsSpan(0, bytesRead)); await fileStream.WriteAsync(buffer.AsMemory(0, bytesRead), ct).ConfigureAwait(false); progress.Report(bytesRead); } - Logger.LogDebug("{requestUrl} downloaded to {tempPath}", requestUrl, tempPath); + Logger.LogDebug("{requestUrl} downloaded to {destination}", requestUrl, destinationFilename); } } + catch (TimeoutException ex) + { + Logger.LogWarning(ex, "Detected stalled download for {requestUrl}, aborting transfer", requestUrl); + throw; + } catch (OperationCanceledException) { throw; @@ -240,18 +307,18 @@ public partial class FileDownloadManager : DisposableMediatorSubscriberBase { try { - fileStream?.Close(); + fileStream?.Close(); - if (!string.IsNullOrEmpty(tempPath) && File.Exists(tempPath)) + if (!string.IsNullOrEmpty(destinationFilename) && File.Exists(destinationFilename)) { - File.Delete(tempPath); + File.Delete(destinationFilename); } } catch { - // Ignore errors during cleanup + // ignore cleanup errors } - throw; + throw; } finally { @@ -260,6 +327,134 @@ public partial class FileDownloadManager : DisposableMediatorSubscriberBase _activeDownloadStreams.TryRemove(stream, out _); await stream.DisposeAsync().ConfigureAwait(false); } + + response?.Dispose(); + } + } + + private async Task DecompressBlockFileAsync(string downloadStatusKey, string blockFilePath, List fileReplacement, string downloadLabel) + { + if (_downloadStatus.TryGetValue(downloadStatusKey, out var status)) + { + status.TransferredFiles = 1; + status.DownloadStatus = DownloadStatus.Decompressing; + } + + FileStream? fileBlockStream = null; + try + { + fileBlockStream = File.OpenRead(blockFilePath); + while (fileBlockStream.Position < fileBlockStream.Length) + { + (string fileHash, long fileLengthBytes) = ReadBlockFileHeader(fileBlockStream); + + try + { + var fileExtension = fileReplacement.First(f => string.Equals(f.Hash, fileHash, StringComparison.OrdinalIgnoreCase)).GamePaths[0].Split(".")[^1]; + var filePath = _fileDbManager.GetCacheFilePath(fileHash, fileExtension); + Logger.LogDebug("{dlName}: Decompressing {file}:{le} => {dest}", downloadLabel, fileHash, fileLengthBytes, filePath); + + byte[] compressedFileContent = new byte[fileLengthBytes]; + var readBytes = await fileBlockStream.ReadAsync(compressedFileContent, CancellationToken.None).ConfigureAwait(false); + if (readBytes != fileLengthBytes) + { + throw new EndOfStreamException(); + } + MungeBuffer(compressedFileContent); + + var decompressedFile = LZ4Wrapper.Unwrap(compressedFileContent); + await _fileCompactor.WriteAllBytesAsync(filePath, decompressedFile, CancellationToken.None).ConfigureAwait(false); + + PersistFileToStorage(fileHash, filePath); + } + catch (EndOfStreamException) + { + Logger.LogWarning("{dlName}: Failure to extract file {fileHash}, stream ended prematurely", downloadLabel, fileHash); + } + catch (Exception e) + { + Logger.LogWarning(e, "{dlName}: Error during decompression", downloadLabel); + } + } + } + catch (EndOfStreamException) + { + Logger.LogDebug("{dlName}: Failure to extract file header data, stream ended", downloadLabel); + } + catch (Exception ex) + { + Logger.LogError(ex, "{dlName}: Error during block file read", downloadLabel); + } + finally + { + if (fileBlockStream != null) + await fileBlockStream.DisposeAsync().ConfigureAwait(false); + } + } + + private async Task PerformDirectDownloadFallbackAsync(DownloadFileTransfer directDownload, List fileReplacement, + IProgress progress, CancellationToken token, bool slotAlreadyAcquired) + { + if (string.IsNullOrEmpty(directDownload.DirectDownloadUrl)) + { + throw new InvalidOperationException("Direct download fallback requested without a direct download URL."); + } + + var downloadKey = directDownload.DirectDownloadUrl!; + bool slotAcquiredHere = false; + string? blockFile = null; + + try + { + if (!slotAlreadyAcquired) + { + if (_downloadStatus.TryGetValue(downloadKey, out var tracker)) + { + tracker.DownloadStatus = DownloadStatus.WaitingForSlot; + } + + await _orchestrator.WaitForDownloadSlotAsync(token).ConfigureAwait(false); + slotAcquiredHere = true; + } + + if (_downloadStatus.TryGetValue(downloadKey, out var queueTracker)) + { + queueTracker.DownloadStatus = DownloadStatus.WaitingForQueue; + } + + var requestIdResponse = await _orchestrator.SendRequestAsync(HttpMethod.Post, LightlessFiles.RequestEnqueueFullPath(directDownload.DownloadUri), + new[] { directDownload.Hash }, token).ConfigureAwait(false); + var requestId = Guid.Parse((await requestIdResponse.Content.ReadAsStringAsync().ConfigureAwait(false)).Trim('"')); + + blockFile = _fileDbManager.GetCacheFilePath(requestId.ToString("N"), "blk"); + + await DownloadAndMungeFileHttpClient(downloadKey, requestId, [directDownload], blockFile, progress, token).ConfigureAwait(false); + + if (!File.Exists(blockFile)) + { + throw new FileNotFoundException("Block file missing after direct download fallback.", blockFile); + } + + await DecompressBlockFileAsync(downloadKey, blockFile, fileReplacement, $"fallback-{directDownload.Hash}").ConfigureAwait(false); + } + finally + { + if (slotAcquiredHere) + { + _orchestrator.ReleaseDownloadSlot(); + } + + if (!string.IsNullOrEmpty(blockFile)) + { + try + { + File.Delete(blockFile); + } + catch + { + // ignore cleanup errors + } + } } } @@ -307,30 +502,76 @@ public partial class FileDownloadManager : DisposableMediatorSubscriberBase private async Task DownloadFilesInternal(GameObjectHandler gameObjectHandler, List fileReplacement, CancellationToken ct) { - var downloadGroups = CurrentDownloads.GroupBy(f => f.DownloadUri.Host + ":" + f.DownloadUri.Port, StringComparer.Ordinal); + var objectName = gameObjectHandler?.Name ?? "Unknown"; - foreach (var downloadGroup in downloadGroups) + var configAllowsDirect = _configService.Current.EnableDirectDownloads; + if (configAllowsDirect != _lastConfigDirectDownloadsState) { - _downloadStatus[downloadGroup.Key] = new FileDownloadStatus() + _lastConfigDirectDownloadsState = configAllowsDirect; + if (configAllowsDirect) + { + _disableDirectDownloads = false; + _consecutiveDirectDownloadFailures = 0; + } + } + + var allowDirectDownloads = ShouldUseDirectDownloads(); + + var directDownloads = new List(); + var batchDownloads = new List(); + + foreach (var download in CurrentDownloads) + { + if (!string.IsNullOrEmpty(download.DirectDownloadUrl) && allowDirectDownloads) + { + directDownloads.Add(download); + } + else + { + batchDownloads.Add(download); + } + } + + var downloadBatches = batchDownloads.GroupBy(f => f.DownloadUri.Host + ":" + f.DownloadUri.Port, StringComparer.Ordinal).ToArray(); + + foreach (var directDownload in directDownloads) + { + _downloadStatus[directDownload.DirectDownloadUrl!] = new FileDownloadStatus() { DownloadStatus = DownloadStatus.Initializing, - TotalBytes = downloadGroup.Sum(c => c.Total), + TotalBytes = directDownload.Total, TotalFiles = 1, TransferredBytes = 0, TransferredFiles = 0 }; } + foreach (var downloadBatch in downloadBatches) + { + _downloadStatus[downloadBatch.Key] = new FileDownloadStatus() + { + DownloadStatus = DownloadStatus.Initializing, + TotalBytes = downloadBatch.Sum(c => c.Total), + TotalFiles = 1, + TransferredBytes = 0, + TransferredFiles = 0 + }; + } + + if (directDownloads.Count > 0 || downloadBatches.Length > 0) + { + Logger.LogWarning("Downloading {direct} files directly, and {batchtotal} in {batches} batches.", directDownloads.Count, batchDownloads.Count, downloadBatches.Length); + } + Mediator.Publish(new DownloadStartedMessage(gameObjectHandler, _downloadStatus)); - await Parallel.ForEachAsync(downloadGroups, new ParallelOptions() + Task batchDownloadsTask = downloadBatches.Length == 0 ? Task.CompletedTask : Parallel.ForEachAsync(downloadBatches, new ParallelOptions() { - MaxDegreeOfParallelism = downloadGroups.Count(), + MaxDegreeOfParallelism = downloadBatches.Length, CancellationToken = ct, }, async (fileGroup, token) => { - // let server predownload files var requestIdResponse = await _orchestrator.SendRequestAsync(HttpMethod.Post, LightlessFiles.RequestEnqueueFullPath(fileGroup.First().DownloadUri), fileGroup.Select(c => c.Hash), token).ConfigureAwait(false); Logger.LogDebug("Sent request for {n} files on server {uri} with result {result}", fileGroup.Count(), fileGroup.First().DownloadUri, @@ -353,7 +594,7 @@ public partial class FileDownloadManager : DisposableMediatorSubscriberBase downloadStatus.DownloadStatus = DownloadStatus.WaitingForSlot; await _orchestrator.WaitForDownloadSlotAsync(token).ConfigureAwait(false); downloadStatus.DownloadStatus = DownloadStatus.WaitingForQueue; - Progress progress = new((bytesDownloaded) => + var progress = CreateInlineProgress((bytesDownloaded) => { try { @@ -371,7 +612,7 @@ public partial class FileDownloadManager : DisposableMediatorSubscriberBase } catch (OperationCanceledException) { - Logger.LogDebug("{dlName}: Detected cancellation of download, partially extracting files for {id}", fi.Name, gameObjectHandler); + Logger.LogDebug("{dlName}: Detected cancellation of download, partially extracting files for {id}", fi.Name, objectName); } catch (Exception ex) { @@ -382,72 +623,167 @@ public partial class FileDownloadManager : DisposableMediatorSubscriberBase return; } - FileStream? fileBlockStream = null; try { - if (_downloadStatus.TryGetValue(fileGroup.Key, out var status)) - { - status.TransferredFiles = 1; - status.DownloadStatus = DownloadStatus.Decompressing; - } if (!File.Exists(blockFile)) { Logger.LogWarning("{dlName}: Block file missing before extraction, skipping", fi.Name); return; } - fileBlockStream = File.OpenRead(blockFile); - while (fileBlockStream.Position < fileBlockStream.Length) - { - (string fileHash, long fileLengthBytes) = ReadBlockFileHeader(fileBlockStream); - - try - { - var fileExtension = fileReplacement.First(f => string.Equals(f.Hash, fileHash, StringComparison.OrdinalIgnoreCase)).GamePaths[0].Split(".")[^1]; - var filePath = _fileDbManager.GetCacheFilePath(fileHash, fileExtension); - Logger.LogDebug("{dlName}: Decompressing {file}:{le} => {dest}", fi.Name, fileHash, fileLengthBytes, filePath); - - byte[] compressedFileContent = new byte[fileLengthBytes]; - var readBytes = await fileBlockStream.ReadAsync(compressedFileContent, CancellationToken.None).ConfigureAwait(false); - if (readBytes != fileLengthBytes) - { - throw new EndOfStreamException(); - } - MungeBuffer(compressedFileContent); - - var decompressedFile = LZ4Wrapper.Unwrap(compressedFileContent); - await _fileCompactor.WriteAllBytesAsync(filePath, decompressedFile, CancellationToken.None).ConfigureAwait(false); - - PersistFileToStorage(fileHash, filePath); - } - catch (EndOfStreamException) - { - Logger.LogWarning("{dlName}: Failure to extract file {fileHash}, stream ended prematurely", fi.Name, fileHash); - } - catch (Exception e) - { - Logger.LogWarning(e, "{dlName}: Error during decompression", fi.Name); - } - } - } - catch (EndOfStreamException) - { - Logger.LogDebug("{dlName}: Failure to extract file header data, stream ended", fi.Name); - } - catch (Exception ex) - { - Logger.LogError(ex, "{dlName}: Error during block file read", fi.Name); + await DecompressBlockFileAsync(fileGroup.Key, blockFile, fileReplacement, fi.Name).ConfigureAwait(false); } finally { _orchestrator.ReleaseDownloadSlot(); - if (fileBlockStream != null) - await fileBlockStream.DisposeAsync().ConfigureAwait(false); File.Delete(blockFile); } - }).ConfigureAwait(false); + }); - Logger.LogDebug("Download end: {id}", gameObjectHandler); + Task directDownloadsTask = directDownloads.Count == 0 ? Task.CompletedTask : Parallel.ForEachAsync(directDownloads, new ParallelOptions() + { + MaxDegreeOfParallelism = directDownloads.Count, + CancellationToken = ct, + }, + async (directDownload, token) => + { + if (!_downloadStatus.TryGetValue(directDownload.DirectDownloadUrl!, out var downloadTracker)) + { + Logger.LogWarning("Download status missing for direct URL {url}", directDownload.DirectDownloadUrl); + return; + } + + var progress = CreateInlineProgress((bytesDownloaded) => + { + try + { + if (_downloadStatus.TryGetValue(directDownload.DirectDownloadUrl!, out FileDownloadStatus? value)) + { + value.TransferredBytes += bytesDownloaded; + } + } + catch (Exception ex) + { + Logger.LogWarning(ex, "Could not set download progress"); + } + }); + + if (!ShouldUseDirectDownloads()) + { + await PerformDirectDownloadFallbackAsync(directDownload, fileReplacement, progress, token, slotAlreadyAcquired: false).ConfigureAwait(false); + return; + } + + var tempFilename = _fileDbManager.GetCacheFilePath(directDownload.Hash, "bin"); + var slotAcquired = false; + + + try + { + downloadTracker.DownloadStatus = DownloadStatus.WaitingForSlot; + await _orchestrator.WaitForDownloadSlotAsync(token).ConfigureAwait(false); + slotAcquired = true; + + downloadTracker.DownloadStatus = DownloadStatus.Downloading; + Logger.LogDebug("Beginning direct download of {hash} from {url}", directDownload.Hash, directDownload.DirectDownloadUrl); + await DownloadFileThrottled(new Uri(directDownload.DirectDownloadUrl!), tempFilename, progress, null, token, withToken: false).ConfigureAwait(false); + + Interlocked.Exchange(ref _consecutiveDirectDownloadFailures, 0); + + downloadTracker.DownloadStatus = DownloadStatus.Decompressing; + + try + { + var replacement = fileReplacement.FirstOrDefault(f => string.Equals(f.Hash, directDownload.Hash, StringComparison.OrdinalIgnoreCase)); + if (replacement == null || replacement.GamePaths.Length == 0) + { + Logger.LogWarning("{hash}: No replacement data found for direct download.", directDownload.Hash); + return; + } + + var fileExtension = replacement.GamePaths[0].Split(".")[^1]; + var finalFilename = _fileDbManager.GetCacheFilePath(directDownload.Hash, fileExtension); + Logger.LogDebug("Decompressing direct download {hash} from {compressedFile} to {finalFile}", directDownload.Hash, tempFilename, finalFilename); + byte[] compressedBytes = await File.ReadAllBytesAsync(tempFilename).ConfigureAwait(false); + var decompressedBytes = LZ4Wrapper.Unwrap(compressedBytes); + await _fileCompactor.WriteAllBytesAsync(finalFilename, decompressedBytes, CancellationToken.None).ConfigureAwait(false); + PersistFileToStorage(directDownload.Hash, finalFilename); + + downloadTracker.TransferredFiles = 1; + Logger.LogDebug("Finished direct download of {hash}.", directDownload.Hash); + } + catch (Exception ex) + { + Logger.LogError(ex, "Exception downloading {hash} from {url}", directDownload.Hash, directDownload.DirectDownloadUrl); + } + } + catch (OperationCanceledException ex) + { + Logger.LogDebug("{hash}: Detected cancellation of direct download, discarding file.", directDownload.Hash); + Logger.LogError(ex, "{hash}: Error during direct download.", directDownload.Hash); + ClearDownload(); + return; + } + catch (Exception ex) + { + var expectedDirectDownloadFailure = ex is InvalidDataException; + var failureCount = 0; + + if (expectedDirectDownloadFailure) + { + Logger.LogInformation(ex, "{hash}: Direct download unavailable, attempting queued fallback.", directDownload.Hash); + } + else + { + failureCount = Interlocked.Increment(ref _consecutiveDirectDownloadFailures); + Logger.LogWarning(ex, "{hash}: Direct download failed, attempting queued fallback.", directDownload.Hash); + } + + try + { + downloadTracker.DownloadStatus = DownloadStatus.WaitingForQueue; + await PerformDirectDownloadFallbackAsync(directDownload, fileReplacement, progress, token, slotAcquired).ConfigureAwait(false); + + if (!expectedDirectDownloadFailure && failureCount >= 3 && !_disableDirectDownloads) + { + _disableDirectDownloads = true; + Logger.LogWarning("Disabling direct downloads for this session after {count} consecutive failures.", failureCount); + } + } + catch (Exception fallbackEx) + { + if (slotAcquired) + { + _orchestrator.ReleaseDownloadSlot(); + slotAcquired = false; + } + + Logger.LogError(fallbackEx, "{hash}: Error during direct download fallback.", directDownload.Hash); + ClearDownload(); + return; + } + } + finally + { + if (slotAcquired) + { + _orchestrator.ReleaseDownloadSlot(); + } + + try + { + File.Delete(tempFilename); + } + catch + { + // ignore + } + } + }); + + await Task.WhenAll(batchDownloadsTask, directDownloadsTask).ConfigureAwait(false); + + Logger.LogDebug("Download end: {id}", objectName); ClearDownload(); } @@ -554,4 +890,24 @@ public partial class FileDownloadManager : DisposableMediatorSubscriberBase _orchestrator.ClearDownloadRequest(requestId); } } -} \ No newline at end of file + + private static IProgress CreateInlineProgress(Action callback) + { + return new InlineProgress(callback); + } + + private sealed class InlineProgress : IProgress + { + private readonly Action _callback; + + public InlineProgress(Action callback) + { + _callback = callback ?? throw new ArgumentNullException(nameof(callback)); + } + + public void Report(long value) + { + _callback(value); + } + } +} diff --git a/LightlessSync/WebAPI/Files/FileTransferOrchestrator.cs b/LightlessSync/WebAPI/Files/FileTransferOrchestrator.cs index 690ea79..de84a81 100644 --- a/LightlessSync/WebAPI/Files/FileTransferOrchestrator.cs +++ b/LightlessSync/WebAPI/Files/FileTransferOrchestrator.cs @@ -81,27 +81,30 @@ public class FileTransferOrchestrator : DisposableMediatorSubscriberBase } public async Task SendRequestAsync(HttpMethod method, Uri uri, - CancellationToken? ct = null, HttpCompletionOption httpCompletionOption = HttpCompletionOption.ResponseContentRead) + CancellationToken? ct = null, HttpCompletionOption httpCompletionOption = HttpCompletionOption.ResponseContentRead, + bool withToken = true) { using var requestMessage = new HttpRequestMessage(method, uri); - return await SendRequestInternalAsync(requestMessage, ct, httpCompletionOption).ConfigureAwait(false); + return await SendRequestInternalAsync(requestMessage, ct, httpCompletionOption, withToken).ConfigureAwait(false); } - public async Task SendRequestAsync(HttpMethod method, Uri uri, T content, CancellationToken ct) where T : class + public async Task SendRequestAsync(HttpMethod method, Uri uri, T content, CancellationToken ct, + bool withToken = true) where T : class { using var requestMessage = new HttpRequestMessage(method, uri); if (content is not ByteArrayContent) requestMessage.Content = JsonContent.Create(content); else requestMessage.Content = content as ByteArrayContent; - return await SendRequestInternalAsync(requestMessage, ct).ConfigureAwait(false); + return await SendRequestInternalAsync(requestMessage, ct, withToken: withToken).ConfigureAwait(false); } - public async Task SendRequestStreamAsync(HttpMethod method, Uri uri, ProgressableStreamContent content, CancellationToken ct) + public async Task SendRequestStreamAsync(HttpMethod method, Uri uri, ProgressableStreamContent content, + CancellationToken ct, bool withToken = true) { using var requestMessage = new HttpRequestMessage(method, uri); requestMessage.Content = content; - return await SendRequestInternalAsync(requestMessage, ct).ConfigureAwait(false); + return await SendRequestInternalAsync(requestMessage, ct, withToken: withToken).ConfigureAwait(false); } public async Task WaitForDownloadSlotAsync(CancellationToken token) @@ -144,10 +147,13 @@ public class FileTransferOrchestrator : DisposableMediatorSubscriberBase } private async Task SendRequestInternalAsync(HttpRequestMessage requestMessage, - CancellationToken? ct = null, HttpCompletionOption httpCompletionOption = HttpCompletionOption.ResponseContentRead) + CancellationToken? ct = null, HttpCompletionOption httpCompletionOption = HttpCompletionOption.ResponseContentRead, bool withToken = true) { - var token = await _tokenProvider.GetToken().ConfigureAwait(false); - requestMessage.Headers.Authorization = new AuthenticationHeaderValue("Bearer", token); + if (withToken) + { + var token = await _tokenProvider.GetToken().ConfigureAwait(false); + requestMessage.Headers.Authorization = new AuthenticationHeaderValue("Bearer", token); + } if (requestMessage.Content != null && requestMessage.Content is not StreamContent && requestMessage.Content is not ByteArrayContent) { diff --git a/LightlessSync/WebAPI/Files/Models/DownloadFileTransfer.cs b/LightlessSync/WebAPI/Files/Models/DownloadFileTransfer.cs index effb461..d0e9fd4 100644 --- a/LightlessSync/WebAPI/Files/Models/DownloadFileTransfer.cs +++ b/LightlessSync/WebAPI/Files/Models/DownloadFileTransfer.cs @@ -18,6 +18,7 @@ public class DownloadFileTransfer : FileTransfer } get => Dto.Size; } + public string? DirectDownloadUrl => ((DownloadFileDto)TransferDto).CDNDownloadUrl; public long TotalRaw => Dto.RawSize; private DownloadFileDto Dto => (DownloadFileDto)TransferDto;