using LightlessSync.LightlessConfiguration; using LightlessSync.Services.Mediator; using LightlessSync.WebAPI.Files.Models; using LightlessSync.WebAPI.SignalR; using Microsoft.Extensions.Logging; using System.Collections.Concurrent; using System.Net.Http; using System.Net.Http.Headers; using System.Net.Http.Json; using System.Net.Sockets; using System.Reflection; namespace LightlessSync.WebAPI.Files; public class FileTransferOrchestrator : DisposableMediatorSubscriberBase { private readonly ConcurrentDictionary _downloadReady = new(); private readonly HttpClient _httpClient; private readonly LightlessConfigService _lightlessConfig; private readonly object _semaphoreModificationLock = new(); private readonly TokenProvider _tokenProvider; private int _availableDownloadSlots; private SemaphoreSlim _downloadSemaphore; private int CurrentlyUsedDownloadSlots => _availableDownloadSlots - _downloadSemaphore.CurrentCount; public FileTransferOrchestrator(ILogger logger, LightlessConfigService lightlessConfig, LightlessMediator mediator, TokenProvider tokenProvider, HttpClient httpClient) : base(logger, mediator) { _lightlessConfig = lightlessConfig; _tokenProvider = tokenProvider; _httpClient = httpClient; var ver = Assembly.GetExecutingAssembly().GetName().Version; _httpClient.DefaultRequestHeaders.UserAgent.Add(new ProductInfoHeaderValue("LightlessSync", ver!.Major + "." + ver!.Minor + "." + ver!.Build)); _availableDownloadSlots = lightlessConfig.Current.ParallelDownloads; _downloadSemaphore = new(_availableDownloadSlots, _availableDownloadSlots); Mediator.Subscribe(this, (msg) => { FilesCdnUri = msg.Connection.ServerInfo.FileServerAddress; }); Mediator.Subscribe(this, (msg) => { FilesCdnUri = null; }); Mediator.Subscribe(this, (msg) => { _downloadReady[msg.RequestId] = true; }); } public Uri? FilesCdnUri { private set; get; } public List ForbiddenTransfers { get; } = []; public bool IsInitialized => FilesCdnUri != null; public void ClearDownloadRequest(Guid guid) { _downloadReady.Remove(guid, out _); } public bool IsDownloadReady(Guid guid) { if (_downloadReady.TryGetValue(guid, out bool isReady) && isReady) { return true; } return false; } public void ReleaseDownloadSlot() { try { _downloadSemaphore.Release(); Mediator.Publish(new DownloadLimitChangedMessage()); } catch (SemaphoreFullException) { // ignore } } public async Task SendRequestAsync(HttpMethod method, Uri uri, CancellationToken? ct = null, HttpCompletionOption httpCompletionOption = HttpCompletionOption.ResponseContentRead, bool withToken = true) { return await SendRequestInternalAsync(() => new HttpRequestMessage(method, uri), ct, httpCompletionOption, withToken, allowRetry: true).ConfigureAwait(false); } public async Task SendRequestAsync(HttpMethod method, Uri uri, T content, CancellationToken ct, bool withToken = true) where T : class { return await SendRequestInternalAsync(() => { var requestMessage = new HttpRequestMessage(method, uri); if (content is not ByteArrayContent byteArrayContent) { requestMessage.Content = JsonContent.Create(content); } else { var clonedContent = new ByteArrayContent(byteArrayContent.ReadAsByteArrayAsync().GetAwaiter().GetResult()); foreach (var header in byteArrayContent.Headers) { clonedContent.Headers.TryAddWithoutValidation(header.Key, header.Value); } requestMessage.Content = clonedContent; } return requestMessage; }, ct, HttpCompletionOption.ResponseContentRead, withToken, allowRetry: content is not HttpContent || content is ByteArrayContent).ConfigureAwait(false); } public async Task SendRequestStreamAsync(HttpMethod method, Uri uri, ProgressableStreamContent content, CancellationToken ct, bool withToken = true) { return await SendRequestInternalAsync(() => { var requestMessage = new HttpRequestMessage(method, uri) { Content = content }; return requestMessage; }, ct, HttpCompletionOption.ResponseContentRead, withToken, allowRetry: false).ConfigureAwait(false); } public async Task WaitForDownloadSlotAsync(CancellationToken token) { lock (_semaphoreModificationLock) { if (_availableDownloadSlots != _lightlessConfig.Current.ParallelDownloads && _availableDownloadSlots == _downloadSemaphore.CurrentCount) { _availableDownloadSlots = _lightlessConfig.Current.ParallelDownloads; _downloadSemaphore = new(_availableDownloadSlots, _availableDownloadSlots); } } await _downloadSemaphore.WaitAsync(token).ConfigureAwait(false); Mediator.Publish(new DownloadLimitChangedMessage()); } public long DownloadLimitPerSlot() { var limit = _lightlessConfig.Current.DownloadSpeedLimitInBytes; if (limit <= 0) return 0; limit = _lightlessConfig.Current.DownloadSpeedType switch { LightlessConfiguration.Models.DownloadSpeeds.Bps => limit, LightlessConfiguration.Models.DownloadSpeeds.KBps => limit * 1024, LightlessConfiguration.Models.DownloadSpeeds.MBps => limit * 1024 * 1024, _ => limit, }; var currentUsedDlSlots = CurrentlyUsedDownloadSlots; var avaialble = _availableDownloadSlots; var currentCount = _downloadSemaphore.CurrentCount; var dividedLimit = limit / (currentUsedDlSlots == 0 ? 1 : currentUsedDlSlots); if (dividedLimit < 0) { Logger.LogWarning("Calculated Bandwidth Limit is negative, returning Infinity: {value}, CurrentlyUsedDownloadSlots is {currentSlots}, " + "DownloadSpeedLimit is {limit}, available slots: {avail}, current count: {count}", dividedLimit, currentUsedDlSlots, limit, avaialble, currentCount); return long.MaxValue; } return Math.Clamp(dividedLimit, 1, long.MaxValue); } private async Task SendRequestInternalAsync(Func requestFactory, CancellationToken? ct = null, HttpCompletionOption httpCompletionOption = HttpCompletionOption.ResponseContentRead, bool withToken = true, bool allowRetry = true) { const int maxAttempts = 2; var attempt = 0; while (true) { attempt++; using var requestMessage = requestFactory(); if (withToken) { var token = await _tokenProvider.GetToken().ConfigureAwait(false); requestMessage.Headers.Authorization = new AuthenticationHeaderValue("Bearer", token); } if (requestMessage.Content != null && requestMessage.Content is not StreamContent && requestMessage.Content is not ByteArrayContent) { var content = await ((JsonContent)requestMessage.Content).ReadAsStringAsync().ConfigureAwait(false); Logger.LogDebug("Sending {method} to {uri} (Content: {content})", requestMessage.Method, requestMessage.RequestUri, content); } else { Logger.LogDebug("Sending {method} to {uri}", requestMessage.Method, requestMessage.RequestUri); } try { if (ct != null) return await _httpClient.SendAsync(requestMessage, httpCompletionOption, ct.Value).ConfigureAwait(false); return await _httpClient.SendAsync(requestMessage, httpCompletionOption).ConfigureAwait(false); } catch (TaskCanceledException) { throw; } catch (Exception ex) when (allowRetry && attempt < maxAttempts && IsTransientNetworkException(ex)) { Logger.LogWarning(ex, "Transient error during SendRequestInternal for {uri}, retrying attempt {attempt}/{maxAttempts}", requestMessage.RequestUri, attempt, maxAttempts); if (ct.HasValue) { await Task.Delay(TimeSpan.FromMilliseconds(200), ct.Value).ConfigureAwait(false); } else { await Task.Delay(TimeSpan.FromMilliseconds(200)).ConfigureAwait(false); } } catch (Exception ex) { Logger.LogWarning(ex, "Error during SendRequestInternal for {uri}", requestMessage.RequestUri); throw; } } } private static bool IsTransientNetworkException(Exception ex) { var current = ex; while (current != null) { if (current is SocketException socketEx) { return socketEx.SocketErrorCode is SocketError.ConnectionReset or SocketError.ConnectionAborted or SocketError.TimedOut; } current = current.InnerException; } return false; } }