using LightlessSync.LightlessConfiguration;
using LightlessSync.Services.Mediator;
using LightlessSync.WebAPI.Files.Models;
using LightlessSync.WebAPI.SignalR;
using Microsoft.Extensions.Logging;
using System.Collections.Concurrent;
using System.Net.Http.Headers;
using System.Net.Http.Json;
using System.Net.Sockets;
using System.Reflection;

namespace LightlessSync.WebAPI.Files;

/// <summary>
/// Coordinates file transfers: limits concurrent downloads through a semaphore sized by the
/// ParallelDownloads setting, records which download requests have been flagged as ready, and
/// sends HTTP requests (optionally with a bearer token) with one retry on transient socket errors.
/// </summary>
public class FileTransferOrchestrator : DisposableMediatorSubscriberBase
{
    private readonly ConcurrentDictionary<Guid, bool> _downloadReady = new();
    private readonly HttpClient _httpClient;
    private readonly LightlessConfigService _lightlessConfig;
    private readonly object _semaphoreModificationLock = new();
    private readonly TokenProvider _tokenProvider;
    private int _availableDownloadSlots;
    private SemaphoreSlim _downloadSemaphore;

    /// <summary>
    /// Creates the orchestrator, sets the User-Agent on the supplied <see cref="HttpClient"/>,
    /// sizes the download semaphore from configuration and registers the mediator subscriptions.
    /// </summary>
    /// <param name="logger">Logger handed to the base class</param>
    /// <param name="lightlessConfig">Configuration service providing ParallelDownloads and speed limits</param>
    /// <param name="mediator">Mediator used for subscriptions and for publishing slot changes</param>
    /// <param name="tokenProvider">Source of the bearer token attached to requests</param>
    /// <param name="httpClient">Client used for every request sent by this class</param>
    public FileTransferOrchestrator(
        ILogger<FileTransferOrchestrator> logger,
        LightlessConfigService lightlessConfig,
        LightlessMediator mediator,
        TokenProvider tokenProvider,
        HttpClient httpClient)
        : base(logger, mediator)
    {
        _lightlessConfig = lightlessConfig;
        _tokenProvider = tokenProvider;
        _httpClient = httpClient;

        var ver = Assembly.GetExecutingAssembly().GetName().Version;
        _httpClient.DefaultRequestHeaders.UserAgent.Add(
            new ProductInfoHeaderValue("LightlessSync", $"{ver!.Major}.{ver.Minor}.{ver.Build}"));

        _availableDownloadSlots = ConfiguredParallelDownloads;
        _downloadSemaphore = new SemaphoreSlim(_availableDownloadSlots, _availableDownloadSlots);

        // NOTE(review): the message types below are inferred from how each handler uses its
        // argument (msg.Connection, msg.RequestId) - confirm against the mediator message records.
        Mediator.Subscribe<ConnectedMessage>(this, msg => FilesCdnUri = msg.Connection.ServerInfo.FileServerAddress);
        Mediator.Subscribe<DisconnectedMessage>(this, _ => FilesCdnUri = null);
        Mediator.Subscribe<DownloadReadyMessage>(this, msg => _downloadReady[msg.RequestId] = true);
    }

    /// <summary>
    /// Files CDN Uri from server; null until a connection message supplies it and again after a disconnect
    /// </summary>
    public Uri? FilesCdnUri { get; private set; }

    /// <summary>
    /// Forbidden file transfers given by server
    /// </summary>
    // NOTE(review): element type assumed to be FileTransfer (WebAPI.Files.Models) - confirm.
    public List<FileTransfer> ForbiddenTransfers { get; } = [];

    /// <summary>
    /// Whether the orchestrator is initialized, i.e. a files CDN Uri is currently known
    /// </summary>
    public bool IsInitialized => FilesCdnUri != null;

    /// <summary>
    /// Configured parallel downloads in settings (ParallelDownloads), never less than 1
    /// </summary>
    public int ConfiguredParallelDownloads => Math.Max(1, _lightlessConfig.Current.ParallelDownloads);

    /// <summary>
    /// Slots currently taken: the semaphore's capacity minus its remaining count
    /// </summary>
    private int CurrentlyUsedDownloadSlots => _availableDownloadSlots - _downloadSemaphore.CurrentCount;

    /// <summary>
    /// Clears the download request for the given guid
    /// </summary>
    /// <param name="guid">Guid of download request</param>
    public void ClearDownloadRequest(Guid guid) => _downloadReady.TryRemove(guid, out _);

    /// <summary>
    /// Is the download ready for the given guid
    /// </summary>
    /// <param name="guid">Guid of download request</param>
    /// <returns>True when the request is known and has been flagged ready</returns>
    public bool IsDownloadReady(Guid guid) => _downloadReady.TryGetValue(guid, out bool isReady) && isReady;

    /// <summary>
    /// Releases a download slot after a download is complete and publishes a
    /// <see cref="DownloadLimitChangedMessage"/>.
    /// </summary>
    public void ReleaseDownloadSlot()
    {
        try
        {
            _downloadSemaphore.Release();
            Mediator.Publish(new DownloadLimitChangedMessage());
        }
        catch (SemaphoreFullException)
        {
            // Deliberately ignored: the semaphore is already at full capacity, so there is
            // nothing to release and no change to announce.
        }
    }

    /// <summary>
    /// Waits asynchronously for an available download slot.
    /// </summary>
    /// <remarks>
    /// Before waiting, the semaphore is rebuilt to match the configured parallel download count,
    /// but only while no slot is in use (capacity equals the semaphore's current count).
    /// </remarks>
    /// <param name="token">Cancellation Token</param>
    /// <returns>Task that completes once a slot has been acquired</returns>
    public async Task WaitForDownloadSlotAsync(CancellationToken token)
    {
        lock (_semaphoreModificationLock)
        {
            var desired = ConfiguredParallelDownloads;
            var noSlotInUse = _availableDownloadSlots == _downloadSemaphore.CurrentCount;
            if (_availableDownloadSlots != desired && noSlotInUse)
            {
                _availableDownloadSlots = desired;
                var old = _downloadSemaphore;
                _downloadSemaphore = new SemaphoreSlim(_availableDownloadSlots, _availableDownloadSlots);
                try
                {
                    old.Dispose();
                }
                catch
                {
                    /* ignore */
                }
            }
        }

        await _downloadSemaphore.WaitAsync(token).ConfigureAwait(false);
        Mediator.Publish(new DownloadLimitChangedMessage());
    }

    /// <summary>
    /// Download limit per slot in bytes
    /// </summary>
    /// <returns>
    /// 0 when no limit is configured; otherwise the configured limit converted to bytes and divided
    /// by the number of slots in use (at least 1). <see cref="long.MaxValue"/> if the result is negative.
    /// </returns>
    public long DownloadLimitPerSlot()
    {
        // Widen to long before scaling: multiplying a 32-bit setting by 1024 * 1024 would
        // overflow and yield a negative or garbage limit for MBps values.
        long limit = _lightlessConfig.Current.DownloadSpeedLimitInBytes;
        if (limit <= 0)
        {
            return 0;
        }

        limit = _lightlessConfig.Current.DownloadSpeedType switch
        {
            LightlessConfiguration.Models.DownloadSpeeds.Bps => limit,
            LightlessConfiguration.Models.DownloadSpeeds.KBps => limit * 1024,
            LightlessConfiguration.Models.DownloadSpeeds.MBps => limit * 1024 * 1024,
            _ => limit,
        };

        var usedSlots = CurrentlyUsedDownloadSlots;
        var divided = limit / Math.Max(1, usedSlots);
        if (divided < 0)
        {
            Logger.LogWarning(
                "Calculated Bandwidth Limit is negative, returning Infinity: {value}, usedSlots={usedSlots}, limit={limit}, avail={avail}, currentCount={count}",
                divided, usedSlots, limit, _availableDownloadSlots, _downloadSemaphore.CurrentCount);
            return long.MaxValue;
        }

        // Never report less than 1 byte once a limit is configured.
        return Math.Max(1L, divided);
    }

    /// <summary>
    /// Sends an HTTP request without content
    /// </summary>
    /// <param name="method">HttpMethod for the request</param>
    /// <param name="uri">Uri for the request</param>
    /// <param name="ct">Optional cancellation token</param>
    /// <param name="httpCompletionOption">When the send operation is considered complete</param>
    /// <param name="withToken">Attach a bearer authorization header obtained from the token provider</param>
    /// <returns>Http response of the request</returns>
    public async Task<HttpResponseMessage> SendRequestAsync(
        HttpMethod method,
        Uri uri,
        CancellationToken? ct = null,
        HttpCompletionOption httpCompletionOption = HttpCompletionOption.ResponseContentRead,
        bool withToken = true)
    {
        return await SendRequestInternalAsync(
            () => new HttpRequestMessage(method, uri),
            ct,
            httpCompletionOption,
            withToken,
            allowRetry: true).ConfigureAwait(false);
    }

    /// <summary>
    /// Sends an HTTP request with a body. A <see cref="ByteArrayContent"/> is copied (bytes and
    /// headers) into a fresh content object per attempt; anything else is serialized as JSON.
    /// </summary>
    /// <typeparam name="T">Type of the content</typeparam>
    /// <param name="method">Http method</param>
    /// <param name="uri">Uri for the request</param>
    /// <param name="content">Content of the request</param>
    /// <param name="ct">Cancellation token</param>
    /// <param name="withToken">Attach a bearer authorization header obtained from the token provider</param>
    /// <returns>Http response of the request</returns>
    public async Task<HttpResponseMessage> SendRequestAsync<T>(
        HttpMethod method,
        Uri uri,
        T content,
        CancellationToken ct,
        bool withToken = true)
        where T : class
    {
        Func<HttpRequestMessage> requestFactory;
        if (content is ByteArrayContent byteArrayContent)
        {
            // Read the payload once, asynchronously, instead of blocking inside the factory on
            // every attempt; each attempt still gets its own content instance.
            var payload = await byteArrayContent.ReadAsByteArrayAsync(ct).ConfigureAwait(false);
            requestFactory = () =>
            {
                var cloned = new ByteArrayContent(payload);
                foreach (var header in byteArrayContent.Headers)
                {
                    cloned.Headers.TryAddWithoutValidation(header.Key, header.Value);
                }

                return new HttpRequestMessage(method, uri) { Content = cloned };
            };
        }
        else
        {
            requestFactory = () => new HttpRequestMessage(method, uri) { Content = JsonContent.Create(content) };
        }

        return await SendRequestInternalAsync(
            requestFactory,
            ct,
            HttpCompletionOption.ResponseContentRead,
            withToken,
            allowRetry: content is not HttpContent || content is ByteArrayContent).ConfigureAwait(false);
    }

    /// <summary>
    /// Sends an HTTP request whose body is the given stream content. Retries are disabled because
    /// the same content instance is attached to the request.
    /// </summary>
    /// <param name="method">Http method</param>
    /// <param name="uri">Uri for the request</param>
    /// <param name="content">Stream content used as the request body</param>
    /// <param name="ct">Cancellation token</param>
    /// <param name="withToken">Attach a bearer authorization header obtained from the token provider</param>
    /// <returns>Http response of the request</returns>
    public async Task<HttpResponseMessage> SendRequestStreamAsync(
        HttpMethod method,
        Uri uri,
        ProgressableStreamContent content,
        CancellationToken ct,
        bool withToken = true)
    {
        return await SendRequestInternalAsync(
            () => new HttpRequestMessage(method, uri) { Content = content },
            ct,
            HttpCompletionOption.ResponseContentRead,
            withToken,
            allowRetry: false).ConfigureAwait(false);
    }

    /// <summary>
    /// Sends an HTTP request with optional retry logic for transient network errors
    /// </summary>
    /// <param name="requestFactory">Creates a fresh request message for every attempt</param>
    /// <param name="ct">Optional cancellation token</param>
    /// <param name="httpCompletionOption">When the send operation is considered complete</param>
    /// <param name="withToken">Attach a bearer authorization header obtained from the token provider</param>
    /// <param name="allowRetry">Allow one retry after a transient network exception</param>
    /// <returns>Response message of request</returns>
    private async Task<HttpResponseMessage> SendRequestInternalAsync(
        Func<HttpRequestMessage> requestFactory,
        CancellationToken? ct = null,
        HttpCompletionOption httpCompletionOption = HttpCompletionOption.ResponseContentRead,
        bool withToken = true,
        bool allowRetry = true)
    {
        const int maxAttempts = 2;
        var cancellationToken = ct ?? CancellationToken.None;
        var attempt = 0;

        while (true)
        {
            attempt++;
            using var requestMessage = requestFactory();

            if (withToken)
            {
                var token = await _tokenProvider.GetToken().ConfigureAwait(false);
                requestMessage.Headers.Authorization = new AuthenticationHeaderValue("Bearer", token);
            }

            // Only JSON bodies are logged. Pattern matching (rather than a hard cast) keeps any
            // other content type from failing with an InvalidCastException.
            if (requestMessage.Content is JsonContent jsonContent)
            {
                var content = await jsonContent.ReadAsStringAsync().ConfigureAwait(false);
                Logger.LogDebug("Sending {method} to {uri} (Content: {content})", requestMessage.Method, requestMessage.RequestUri, content);
            }
            else
            {
                Logger.LogDebug("Sending {method} to {uri}", requestMessage.Method, requestMessage.RequestUri);
            }

            try
            {
                return await _httpClient.SendAsync(requestMessage, httpCompletionOption, cancellationToken).ConfigureAwait(false);
            }
            catch (TaskCanceledException)
            {
                // Cancellation is propagated untouched and is not logged as an error.
                throw;
            }
            catch (Exception ex) when (allowRetry && attempt < maxAttempts && IsTransientNetworkException(ex))
            {
                Logger.LogWarning(ex, "Transient error during SendRequestInternal for {uri}, retrying attempt {attempt}/{maxAttempts}", requestMessage.RequestUri, attempt, maxAttempts);
                await Task.Delay(TimeSpan.FromMilliseconds(200), cancellationToken).ConfigureAwait(false);
            }
            catch (Exception ex)
            {
                Logger.LogWarning(ex, "Error during SendRequestInternal for {uri}", requestMessage.RequestUri);
                throw;
            }
        }
    }

    /// <summary>
    /// Is the exception a transient network exception
    /// </summary>
    /// <param name="ex">Exception whose inner-exception chain is inspected</param>
    /// <returns>
    /// True when the first <see cref="SocketException"/> in the chain reports a connection reset,
    /// connection abort or timeout; false otherwise.
    /// </returns>
    private static bool IsTransientNetworkException(Exception ex)
    {
        for (Exception? current = ex; current is not null; current = current.InnerException)
        {
            if (current is SocketException socketEx)
            {
                return socketEx.SocketErrorCode is SocketError.ConnectionReset
                    or SocketError.ConnectionAborted
                    or SocketError.TimedOut;
            }
        }

        return false;
    }
}