// File: LightlessClient/LightlessSync/WebAPI/Files/FileTransferOrchestrator.cs
// (repository viewer metadata: 329 lines, 12 KiB, C#)

using LightlessSync.LightlessConfiguration;
using LightlessSync.Services.Mediator;
using LightlessSync.WebAPI.Files.Models;
using LightlessSync.WebAPI.SignalR;
using Microsoft.Extensions.Logging;
using System.Collections.Concurrent;
using System.Net.Http.Headers;
using System.Net.Http.Json;
using System.Net.Sockets;
using System.Reflection;
namespace LightlessSync.WebAPI.Files;
public class FileTransferOrchestrator : DisposableMediatorSubscriberBase
{
// Per-request "ready" flags keyed by request id; set to true when a DownloadReadyMessage
// arrives and removed again through ClearDownloadRequest.
private readonly ConcurrentDictionary<Guid, bool> _downloadReady = new();
// HTTP client used for every request sent by this class.
private readonly HttpClient _httpClient;
// Configuration service; read for ParallelDownloads and the download speed limit settings.
private readonly LightlessConfigService _lightlessConfig;
// Guards the replacement of _downloadSemaphore when the configured slot count changes.
private readonly object _semaphoreModificationLock = new();
// Supplies the bearer token attached to requests that are sent with withToken = true.
private readonly TokenProvider _tokenProvider;
// Capacity the current _downloadSemaphore was created with.
private int _availableDownloadSlots;
// Limits concurrent downloads; it is replaced (not resized) when the configured limit changes.
private SemaphoreSlim _downloadSemaphore;
// Slots currently held: semaphore capacity minus its remaining count.
private int CurrentlyUsedDownloadSlots => _availableDownloadSlots - _downloadSemaphore.CurrentCount;
/// <summary>
/// Stores the injected services, adds a "LightlessSync/&lt;major&gt;.&lt;minor&gt;.&lt;build&gt;"
/// User-Agent entry to the HTTP client, creates the download semaphore from the configured
/// parallel download count (at least 1) and subscribes to the connection and
/// download-ready messages.
/// </summary>
/// <param name="logger">Logger passed to the mediator subscriber base class</param>
/// <param name="lightlessConfig">Configuration service</param>
/// <param name="mediator">Mediator passed to the base class and used for subscriptions</param>
/// <param name="tokenProvider">Provider of the bearer token used for authorized requests</param>
/// <param name="httpClient">HTTP client used for all requests</param>
public FileTransferOrchestrator(
    ILogger<FileTransferOrchestrator> logger,
    LightlessConfigService lightlessConfig,
    LightlessMediator mediator,
    TokenProvider tokenProvider,
    HttpClient httpClient) : base(logger, mediator)
{
    _httpClient = httpClient;
    _tokenProvider = tokenProvider;
    _lightlessConfig = lightlessConfig;

    // Identify this client by the version of the executing assembly.
    var assemblyVersion = Assembly.GetExecutingAssembly().GetName().Version!;
    var productVersion = $"{assemblyVersion.Major}.{assemblyVersion.Minor}.{assemblyVersion.Build}";
    _httpClient.DefaultRequestHeaders.UserAgent.Add(new ProductInfoHeaderValue("LightlessSync", productVersion));

    // A configured value below 1 still yields a single usable slot.
    var slotCount = Math.Max(1, lightlessConfig.Current.ParallelDownloads);
    _availableDownloadSlots = slotCount;
    _downloadSemaphore = new SemaphoreSlim(slotCount, slotCount);

    // The CDN address follows the connection state; ready flags are recorded per request id.
    Mediator.Subscribe<ConnectedMessage>(this, connected => FilesCdnUri = connected.Connection.ServerInfo.FileServerAddress);
    Mediator.Subscribe<DisconnectedMessage>(this, _ => FilesCdnUri = null);
    Mediator.Subscribe<DownloadReadyMessage>(this, ready => _downloadReady[ready.RequestId] = true);
}
/// <summary>
/// Address of the file server (CDN) taken from the server info of the current connection;
/// <c>null</c> until connected and again after a disconnect.
/// </summary>
public Uri? FilesCdnUri { get; private set; }
/// <summary>
/// Transfers the server has marked as forbidden (per the original description);
/// the list is created empty.
/// </summary>
public List<FileTransfer> ForbiddenTransfers { get; } = new();
/// <summary>
/// True while a files CDN address is known, i.e. <see cref="FilesCdnUri"/> is not null.
/// </summary>
public bool IsInitialized => FilesCdnUri is not null;
/// <summary>
/// Number of parallel downloads from the settings (ParallelDownloads), never less than 1.
/// </summary>
public int ConfiguredParallelDownloads
{
    get
    {
        // Read the setting once so the comparison and the returned value agree.
        var configured = _lightlessConfig.Current.ParallelDownloads;
        return configured < 1 ? 1 : configured;
    }
}
/// <summary>
/// Forgets the ready flag recorded for the given download request, if there is one.
/// </summary>
/// <param name="guid">Guid of download request</param>
// TryRemove removes the entry in one atomic step; the IDictionary extension Remove(key, out value)
// used before performs a separate lookup and removal.
public void ClearDownloadRequest(Guid guid) => _downloadReady.TryRemove(guid, out _);
/// <summary>
/// Tells whether a ready flag has been recorded for the given download request
/// </summary>
/// <param name="guid">Guid of download request</param>
/// <returns>True when an entry for the guid exists and its flag is set; false otherwise</returns>
public bool IsDownloadReady(Guid guid)
{
    if (!_downloadReady.TryGetValue(guid, out var ready))
    {
        // No entry recorded (or it was already cleared).
        return false;
    }

    return ready;
}
/// <summary>
/// Gives a download slot back to the semaphore and announces the change through the mediator
/// </summary>
public void ReleaseDownloadSlot()
{
    var semaphore = _downloadSemaphore;
    try
    {
        semaphore.Release();
        Mediator.Publish(new DownloadLimitChangedMessage());
    }
    catch (SemaphoreFullException)
    {
        // Deliberately swallowed: the semaphore is already at full capacity,
        // so there is nothing to release and no change to announce.
    }
}
/// <summary>
/// Waits asynchronously for an available download slot.
/// Before waiting, the semaphore is recreated with the configured slot count if that count
/// changed and no slot is currently in use.
/// </summary>
/// <param name="token">Cancellation Token</param>
/// <returns>Task that completes once a slot has been acquired</returns>
public async Task WaitForDownloadSlotAsync(CancellationToken token)
{
    while (true)
    {
        SemaphoreSlim semaphore;
        lock (_semaphoreModificationLock)
        {
            var desired = Math.Max(1, _lightlessConfig.Current.ParallelDownloads);

            // Only swap while every slot is free (remaining count equals capacity).
            if (_availableDownloadSlots != desired &&
                _availableDownloadSlots == _downloadSemaphore.CurrentCount)
            {
                var old = _downloadSemaphore;
                _availableDownloadSlots = desired;
                _downloadSemaphore = new SemaphoreSlim(desired, desired);
                try { old.Dispose(); } catch { /* ignore */ }
            }

            // Capture the instance while holding the lock instead of re-reading the field later.
            semaphore = _downloadSemaphore;
        }

        try
        {
            await semaphore.WaitAsync(token).ConfigureAwait(false);
            break;
        }
        catch (ObjectDisposedException) when (!ReferenceEquals(semaphore, Volatile.Read(ref _downloadSemaphore)))
        {
            // Another caller replaced and disposed the captured semaphore between the lock
            // and the wait; loop to pick up the current instance. The filter keeps any other
            // ObjectDisposedException propagating to the caller.
        }
    }

    // NOTE(review): ReleaseDownloadSlot releases whatever semaphore is current at that time,
    // which may differ from the one acquired here if a swap raced with this wait - confirm
    // this is acceptable.
    Mediator.Publish(new DownloadLimitChangedMessage());
}
/// <summary>
/// Download limit per used slot in bytes: the configured speed limit converted to bytes
/// and divided by the number of slots currently in use.
/// </summary>
/// <returns>
/// 0 when no limit is configured (value of 0 or less); otherwise the per-slot limit, at least 1.
/// <see cref="long.MaxValue"/> is returned if the calculation ends up negative.
/// </returns>
public long DownloadLimitPerSlot()
{
    // Widen to 64 bit before multiplying so the unit conversion cannot wrap around
    // (assumes the configured value is a 32-bit integer - TODO confirm against the config model).
    long limit = _lightlessConfig.Current.DownloadSpeedLimitInBytes;
    if (limit <= 0) return 0;

    limit = _lightlessConfig.Current.DownloadSpeedType switch
    {
        LightlessConfiguration.Models.DownloadSpeeds.Bps => limit,
        LightlessConfiguration.Models.DownloadSpeeds.KBps => limit * 1024,
        LightlessConfiguration.Models.DownloadSpeeds.MBps => limit * 1024 * 1024,
        _ => limit,
    };

    // Treat "no slot in use" (or a negative reading) as a single slot to avoid dividing by zero.
    var usedSlots = CurrentlyUsedDownloadSlots;
    var divided = limit / (usedSlots <= 0 ? 1 : usedSlots);
    if (divided < 0)
    {
        Logger.LogWarning(
            "Calculated Bandwidth Limit is negative, returning Infinity: {value}, usedSlots={usedSlots}, limit={limit}, avail={avail}, currentCount={count}",
            divided, usedSlots, limit, _availableDownloadSlots, _downloadSemaphore.CurrentCount);
        return long.MaxValue;
    }

    // A configured limit never rounds down to 0, which callers would read as "no limit".
    return Math.Max(divided, 1);
}
/// <summary>
/// Sends an HTTP request that carries no body; retrying is enabled for this kind of request
/// </summary>
/// <param name="method">HttpMethod for the request</param>
/// <param name="uri">Uri for the request</param>
/// <param name="ct">Optional cancellation token</param>
/// <param name="httpCompletionOption">When the send operation is considered complete</param>
/// <param name="withToken">Whether to attach the bearer authorization token</param>
/// <returns>Http response of the request</returns>
public async Task<HttpResponseMessage> SendRequestAsync(
    HttpMethod method,
    Uri uri,
    CancellationToken? ct = null,
    HttpCompletionOption httpCompletionOption = HttpCompletionOption.ResponseContentRead,
    bool withToken = true)
{
    // A fresh message is built on every call because the internal sender may invoke this twice.
    HttpRequestMessage BuildRequest() => new(method, uri);

    var response = await SendRequestInternalAsync(
        BuildRequest,
        ct,
        httpCompletionOption,
        withToken,
        allowRetry: true).ConfigureAwait(false);
    return response;
}
/// <summary>
/// Sends an HTTP request with a body. A <see cref="ByteArrayContent"/> is copied (bytes and
/// headers) into a fresh content object per attempt; anything else is serialized as JSON.
/// </summary>
/// <typeparam name="T">Type of the content to send</typeparam>
/// <param name="method">Http method</param>
/// <param name="uri">Uri for the request</param>
/// <param name="content">Content of the request</param>
/// <param name="ct">Cancellation token</param>
/// <param name="withToken">Whether to attach the bearer authorization token</param>
/// <returns>Http response of the request</returns>
public async Task<HttpResponseMessage> SendRequestAsync<T>(
    HttpMethod method,
    Uri uri,
    T content,
    CancellationToken ct,
    bool withToken = true) where T : class
{
    if (content is ByteArrayContent byteArrayContent)
    {
        // Read the payload once, asynchronously, rather than blocking inside the request
        // factory on every attempt; each attempt still gets its own content object.
        var bytes = await byteArrayContent.ReadAsByteArrayAsync(ct).ConfigureAwait(false);

        return await SendRequestInternalAsync(() =>
        {
            var cloned = new ByteArrayContent(bytes);
            foreach (var header in byteArrayContent.Headers)
                cloned.Headers.TryAddWithoutValidation(header.Key, header.Value);
            return new HttpRequestMessage(method, uri) { Content = cloned };
        }, ct, HttpCompletionOption.ResponseContentRead, withToken, allowRetry: true).ConfigureAwait(false);
    }

    // NOTE(review): an HttpContent that is not a ByteArrayContent ends up JSON-serialized here
    // (with retry disabled) - confirm that callers never pass one.
    return await SendRequestInternalAsync(
        () => new HttpRequestMessage(method, uri) { Content = JsonContent.Create(content) },
        ct,
        HttpCompletionOption.ResponseContentRead,
        withToken,
        allowRetry: content is not HttpContent).ConfigureAwait(false);
}
/// <summary>
/// Sends an HTTP request whose body is the given stream content; retrying is disabled
/// because the same content object would be attached to every attempt
/// </summary>
/// <param name="method">Http method</param>
/// <param name="uri">Uri for the request</param>
/// <param name="content">Stream content to send</param>
/// <param name="ct">Cancellation token</param>
/// <param name="withToken">Whether to attach the bearer authorization token</param>
/// <returns>Http response of the request</returns>
public async Task<HttpResponseMessage> SendRequestStreamAsync(
    HttpMethod method,
    Uri uri,
    ProgressableStreamContent content,
    CancellationToken ct,
    bool withToken = true)
{
    HttpRequestMessage BuildRequest()
    {
        var request = new HttpRequestMessage(method, uri);
        request.Content = content;
        return request;
    }

    var response = await SendRequestInternalAsync(
        BuildRequest,
        ct,
        HttpCompletionOption.ResponseContentRead,
        withToken,
        allowRetry: false).ConfigureAwait(false);
    return response;
}
/// <summary>
/// Sends an HTTP request, optionally retrying once after a transient network error.
/// A new request message is obtained from the factory for every attempt.
/// </summary>
/// <param name="requestFactory">Creates the request message for one attempt</param>
/// <param name="ct">Optional cancellation token; no cancellation when null</param>
/// <param name="httpCompletionOption">When the send operation is considered complete</param>
/// <param name="withToken">Whether to attach the bearer authorization token</param>
/// <param name="allowRetry">Whether a transient network error may trigger a second attempt</param>
/// <returns>Response message of request</returns>
private async Task<HttpResponseMessage> SendRequestInternalAsync(
    Func<HttpRequestMessage> requestFactory,
    CancellationToken? ct = null,
    HttpCompletionOption httpCompletionOption = HttpCompletionOption.ResponseContentRead,
    bool withToken = true,
    bool allowRetry = true)
{
    const int maxAttempts = 2;
    var retryDelay = TimeSpan.FromMilliseconds(200);

    // A missing token means "not cancellable"; this replaces the duplicated
    // with-token / without-token call sites.
    var cancellation = ct ?? CancellationToken.None;

    for (var attempt = 1; ; attempt++)
    {
        using var requestMessage = requestFactory();

        if (withToken)
        {
            var token = await _tokenProvider.GetToken().ConfigureAwait(false);
            requestMessage.Headers.Authorization = new AuthenticationHeaderValue("Bearer", token);
        }

        // Only JSON bodies are logged. Matching on the type (instead of casting everything
        // that is neither stream nor byte array content) cannot throw InvalidCastException
        // for other content types.
        if (requestMessage.Content is JsonContent jsonContent)
        {
            var content = await jsonContent.ReadAsStringAsync().ConfigureAwait(false);
            Logger.LogDebug("Sending {method} to {uri} (Content: {content})", requestMessage.Method, requestMessage.RequestUri, content);
        }
        else
        {
            Logger.LogDebug("Sending {method} to {uri}", requestMessage.Method, requestMessage.RequestUri);
        }

        try
        {
            return await _httpClient.SendAsync(requestMessage, httpCompletionOption, cancellation).ConfigureAwait(false);
        }
        catch (OperationCanceledException)
        {
            // Cancellation (including TaskCanceledException) is passed on without logging or retry.
            throw;
        }
        catch (Exception ex) when (allowRetry && attempt < maxAttempts && IsTransientNetworkException(ex))
        {
            Logger.LogWarning(ex, "Transient error during SendRequestInternal for {uri}, retrying attempt {attempt}/{maxAttempts}",
                requestMessage.RequestUri, attempt, maxAttempts);
            await Task.Delay(retryDelay, cancellation).ConfigureAwait(false);
        }
        catch (Exception ex)
        {
            Logger.LogWarning(ex, "Error during SendRequestInternal for {uri}", requestMessage.RequestUri);
            throw;
        }
    }
}
/// <summary>
/// Decides whether an exception represents a transient network error by looking at the
/// first <see cref="SocketException"/> found in the exception and its inner exceptions
/// </summary>
/// <param name="ex">Exception to inspect</param>
/// <returns>
/// True when that socket exception reports a connection reset, connection abort or timeout;
/// false for any other socket error or when no socket exception is present
/// </returns>
private static bool IsTransientNetworkException(Exception ex)
{
    for (Exception? cursor = ex; cursor is not null; cursor = cursor.InnerException)
    {
        if (cursor is not SocketException socketException)
        {
            continue;
        }

        // The first socket exception decides; deeper inner exceptions are not consulted.
        return socketException.SocketErrorCode switch
        {
            SocketError.ConnectionReset => true,
            SocketError.ConnectionAborted => true,
            SocketError.TimedOut => true,
            _ => false,
        };
    }

    return false;
}
}