Refactored most of file download, redone it so correct usage of slots and better thread management. (#107)

Before: https://lightless.media/u/n5DhLTPR.mp4

After: https://lightless.media/u/sqvDR0Ho.mp4

Usage of the locks is way more optimized.

Co-authored-by: cake <admin@cakeandbanana.nl>
Reviewed-on: #107
Reviewed-by: defnotken <defnotken@noreply.git.lightless-sync.org>
This commit was merged in pull request #107.
This commit is contained in:
2025-12-26 20:42:43 +00:00
parent 1a2885fd74
commit 88cb778791
2 changed files with 866 additions and 713 deletions

File diff suppressed because it is too large Load Diff

View File

@@ -18,56 +18,72 @@ public class FileTransferOrchestrator : DisposableMediatorSubscriberBase
private readonly LightlessConfigService _lightlessConfig; private readonly LightlessConfigService _lightlessConfig;
private readonly object _semaphoreModificationLock = new(); private readonly object _semaphoreModificationLock = new();
private readonly TokenProvider _tokenProvider; private readonly TokenProvider _tokenProvider;
private int _availableDownloadSlots; private int _availableDownloadSlots;
private SemaphoreSlim _downloadSemaphore; private SemaphoreSlim _downloadSemaphore;
private int CurrentlyUsedDownloadSlots => _availableDownloadSlots - _downloadSemaphore.CurrentCount; private int CurrentlyUsedDownloadSlots => _availableDownloadSlots - _downloadSemaphore.CurrentCount;
public FileTransferOrchestrator(ILogger<FileTransferOrchestrator> logger, LightlessConfigService lightlessConfig, public FileTransferOrchestrator(
LightlessMediator mediator, TokenProvider tokenProvider, HttpClient httpClient) : base(logger, mediator) ILogger<FileTransferOrchestrator> logger,
LightlessConfigService lightlessConfig,
LightlessMediator mediator,
TokenProvider tokenProvider,
HttpClient httpClient) : base(logger, mediator)
{ {
_lightlessConfig = lightlessConfig; _lightlessConfig = lightlessConfig;
_tokenProvider = tokenProvider; _tokenProvider = tokenProvider;
_httpClient = httpClient; _httpClient = httpClient;
var ver = Assembly.GetExecutingAssembly().GetName().Version; var ver = Assembly.GetExecutingAssembly().GetName().Version;
_httpClient.DefaultRequestHeaders.UserAgent.Add(new ProductInfoHeaderValue("LightlessSync", ver!.Major + "." + ver!.Minor + "." + ver!.Build)); _httpClient.DefaultRequestHeaders.UserAgent.Add(
new ProductInfoHeaderValue("LightlessSync", $"{ver!.Major}.{ver.Minor}.{ver.Build}"));
_availableDownloadSlots = lightlessConfig.Current.ParallelDownloads; _availableDownloadSlots = Math.Max(1, lightlessConfig.Current.ParallelDownloads);
_downloadSemaphore = new(_availableDownloadSlots, _availableDownloadSlots); _downloadSemaphore = new SemaphoreSlim(_availableDownloadSlots, _availableDownloadSlots);
Mediator.Subscribe<ConnectedMessage>(this, (msg) => Mediator.Subscribe<ConnectedMessage>(this, msg => FilesCdnUri = msg.Connection.ServerInfo.FileServerAddress);
{ Mediator.Subscribe<DisconnectedMessage>(this, _ => FilesCdnUri = null);
FilesCdnUri = msg.Connection.ServerInfo.FileServerAddress; Mediator.Subscribe<DownloadReadyMessage>(this, msg => _downloadReady[msg.RequestId] = true);
});
Mediator.Subscribe<DisconnectedMessage>(this, (msg) =>
{
FilesCdnUri = null;
});
Mediator.Subscribe<DownloadReadyMessage>(this, (msg) =>
{
_downloadReady[msg.RequestId] = true;
});
} }
/// <summary>
/// Files CDN Uri from server
/// </summary>
public Uri? FilesCdnUri { private set; get; } public Uri? FilesCdnUri { private set; get; }
/// <summary>
/// Forbidden file transfers given by server
/// </summary>
public List<FileTransfer> ForbiddenTransfers { get; } = []; public List<FileTransfer> ForbiddenTransfers { get; } = [];
/// <summary>
/// Is the FileTransferOrchestrator initialized
/// </summary>
public bool IsInitialized => FilesCdnUri != null; public bool IsInitialized => FilesCdnUri != null;
public void ClearDownloadRequest(Guid guid) /// <summary>
{ /// Configured parallel downloads in settings (ParallelDownloads)
_downloadReady.Remove(guid, out _); /// </summary>
} public int ConfiguredParallelDownloads => Math.Max(1, _lightlessConfig.Current.ParallelDownloads);
/// <summary>
/// Clears the download request for the given guid
/// </summary>
/// <param name="guid">Guid of download request</param>
public void ClearDownloadRequest(Guid guid) => _downloadReady.Remove(guid, out _);
/// <summary>
/// Is the download ready for the given guid
/// </summary>
/// <param name="guid">Guid of download request</param>
/// <returns>Completion of the download</returns>
public bool IsDownloadReady(Guid guid) public bool IsDownloadReady(Guid guid)
{ => _downloadReady.TryGetValue(guid, out bool isReady) && isReady;
if (_downloadReady.TryGetValue(guid, out bool isReady) && isReady)
{
return true;
}
return false;
}
/// <summary>
/// Release a download slot after download is complete
/// </summary>
public void ReleaseDownloadSlot() public void ReleaseDownloadSlot()
{ {
try try
@@ -81,60 +97,26 @@ public class FileTransferOrchestrator : DisposableMediatorSubscriberBase
} }
} }
public async Task<HttpResponseMessage> SendRequestAsync(HttpMethod method, Uri uri, /// <summary>
CancellationToken? ct = null, HttpCompletionOption httpCompletionOption = HttpCompletionOption.ResponseContentRead, /// Wait for an available download slot asyncronously
bool withToken = true) /// </summary>
{ /// <param name="token">Cancellation Token</param>
return await SendRequestInternalAsync(() => new HttpRequestMessage(method, uri), /// <returns>Task of the slot</returns>
ct, httpCompletionOption, withToken, allowRetry: true).ConfigureAwait(false);
}
public async Task<HttpResponseMessage> SendRequestAsync<T>(HttpMethod method, Uri uri, T content, CancellationToken ct,
bool withToken = true) where T : class
{
return await SendRequestInternalAsync(() =>
{
var requestMessage = new HttpRequestMessage(method, uri);
if (content is not ByteArrayContent byteArrayContent)
{
requestMessage.Content = JsonContent.Create(content);
}
else
{
var clonedContent = new ByteArrayContent(byteArrayContent.ReadAsByteArrayAsync().GetAwaiter().GetResult());
foreach (var header in byteArrayContent.Headers)
{
clonedContent.Headers.TryAddWithoutValidation(header.Key, header.Value);
}
requestMessage.Content = clonedContent;
}
return requestMessage;
}, ct, HttpCompletionOption.ResponseContentRead, withToken,
allowRetry: content is not HttpContent || content is ByteArrayContent).ConfigureAwait(false);
}
public async Task<HttpResponseMessage> SendRequestStreamAsync(HttpMethod method, Uri uri, ProgressableStreamContent content,
CancellationToken ct, bool withToken = true)
{
return await SendRequestInternalAsync(() =>
{
var requestMessage = new HttpRequestMessage(method, uri)
{
Content = content
};
return requestMessage;
}, ct, HttpCompletionOption.ResponseContentRead, withToken, allowRetry: false).ConfigureAwait(false);
}
public async Task WaitForDownloadSlotAsync(CancellationToken token) public async Task WaitForDownloadSlotAsync(CancellationToken token)
{ {
lock (_semaphoreModificationLock) lock (_semaphoreModificationLock)
{ {
if (_availableDownloadSlots != _lightlessConfig.Current.ParallelDownloads && _availableDownloadSlots == _downloadSemaphore.CurrentCount) var desired = Math.Max(1, _lightlessConfig.Current.ParallelDownloads);
if (_availableDownloadSlots != desired &&
_availableDownloadSlots == _downloadSemaphore.CurrentCount)
{ {
_availableDownloadSlots = _lightlessConfig.Current.ParallelDownloads; _availableDownloadSlots = desired;
_downloadSemaphore = new(_availableDownloadSlots, _availableDownloadSlots);
var old = _downloadSemaphore;
_downloadSemaphore = new SemaphoreSlim(_availableDownloadSlots, _availableDownloadSlots);
try { old.Dispose(); } catch { /* ignore */ }
} }
} }
@@ -142,10 +124,15 @@ public class FileTransferOrchestrator : DisposableMediatorSubscriberBase
Mediator.Publish(new DownloadLimitChangedMessage()); Mediator.Publish(new DownloadLimitChangedMessage());
} }
/// <summary>
/// Download limit per slot in bytes
/// </summary>
/// <returns>Bytes of the download limit</returns>
public long DownloadLimitPerSlot() public long DownloadLimitPerSlot()
{ {
var limit = _lightlessConfig.Current.DownloadSpeedLimitInBytes; var limit = _lightlessConfig.Current.DownloadSpeedLimitInBytes;
if (limit <= 0) return 0; if (limit <= 0) return 0;
limit = _lightlessConfig.Current.DownloadSpeedType switch limit = _lightlessConfig.Current.DownloadSpeedType switch
{ {
LightlessConfiguration.Models.DownloadSpeeds.Bps => limit, LightlessConfiguration.Models.DownloadSpeeds.Bps => limit,
@@ -153,22 +140,113 @@ public class FileTransferOrchestrator : DisposableMediatorSubscriberBase
LightlessConfiguration.Models.DownloadSpeeds.MBps => limit * 1024 * 1024, LightlessConfiguration.Models.DownloadSpeeds.MBps => limit * 1024 * 1024,
_ => limit, _ => limit,
}; };
var currentUsedDlSlots = CurrentlyUsedDownloadSlots;
var avaialble = _availableDownloadSlots; var usedSlots = CurrentlyUsedDownloadSlots;
var currentCount = _downloadSemaphore.CurrentCount; var divided = limit / (usedSlots <= 0 ? 1 : usedSlots);
var dividedLimit = limit / (currentUsedDlSlots == 0 ? 1 : currentUsedDlSlots);
if (dividedLimit < 0) if (divided < 0)
{ {
Logger.LogWarning("Calculated Bandwidth Limit is negative, returning Infinity: {value}, CurrentlyUsedDownloadSlots is {currentSlots}, " + Logger.LogWarning(
"DownloadSpeedLimit is {limit}, available slots: {avail}, current count: {count}", dividedLimit, currentUsedDlSlots, limit, avaialble, currentCount); "Calculated Bandwidth Limit is negative, returning Infinity: {value}, usedSlots={usedSlots}, limit={limit}, avail={avail}, currentCount={count}",
divided, usedSlots, limit, _availableDownloadSlots, _downloadSemaphore.CurrentCount);
return long.MaxValue; return long.MaxValue;
} }
return Math.Clamp(dividedLimit, 1, long.MaxValue);
return Math.Clamp(divided, 1, long.MaxValue);
} }
private async Task<HttpResponseMessage> SendRequestInternalAsync(Func<HttpRequestMessage> requestFactory, /// <summary>
CancellationToken? ct = null, HttpCompletionOption httpCompletionOption = HttpCompletionOption.ResponseContentRead, /// sends an HTTP request without content serialization
bool withToken = true, bool allowRetry = true) /// </summary>
/// <param name="method">HttpMethod for the request</param>
/// <param name="uri">Uri for the request</param>
/// <param name="ct">Cancellation Token</param>
/// <param name="httpCompletionOption">Enum of HttpCollectionOption</param>
/// <param name="withToken">Include Cancellation Token</param>
/// <returns>Http response of the request</returns>
public async Task<HttpResponseMessage> SendRequestAsync(
HttpMethod method,
Uri uri,
CancellationToken? ct = null,
HttpCompletionOption httpCompletionOption = HttpCompletionOption.ResponseContentRead,
bool withToken = true)
{
return await SendRequestInternalAsync(
() => new HttpRequestMessage(method, uri),
ct,
httpCompletionOption,
withToken,
allowRetry: true).ConfigureAwait(false);
}
/// <summary>
/// Sends an HTTP request with JSON content serialization
/// </summary>
/// <typeparam name="T">HttpResponseMessage</typeparam>
/// <param name="method">Http method</param>
/// <param name="uri">Url of the direct download link</param>
/// <param name="content">content of the request</param>
/// <param name="ct">cancellation token</param>
/// <param name="withToken">include cancellation token</param>
/// <returns></returns>
public async Task<HttpResponseMessage> SendRequestAsync<T>(
HttpMethod method,
Uri uri,
T content,
CancellationToken ct,
bool withToken = true) where T : class
{
return await SendRequestInternalAsync(() =>
{
var requestMessage = new HttpRequestMessage(method, uri);
if (content is ByteArrayContent byteArrayContent)
{
var bytes = byteArrayContent.ReadAsByteArrayAsync(ct).GetAwaiter().GetResult();
var cloned = new ByteArrayContent(bytes);
foreach (var header in byteArrayContent.Headers)
cloned.Headers.TryAddWithoutValidation(header.Key, header.Value);
requestMessage.Content = cloned;
}
else
{
requestMessage.Content = JsonContent.Create(content);
}
return requestMessage;
}, ct, HttpCompletionOption.ResponseContentRead, withToken,
allowRetry: content is not HttpContent || content is ByteArrayContent).ConfigureAwait(false);
}
public async Task<HttpResponseMessage> SendRequestStreamAsync(
HttpMethod method,
Uri uri,
ProgressableStreamContent content,
CancellationToken ct,
bool withToken = true)
{
return await SendRequestInternalAsync(() =>
{
return new HttpRequestMessage(method, uri) { Content = content };
}, ct, HttpCompletionOption.ResponseContentRead, withToken, allowRetry: false).ConfigureAwait(false);
}
/// <summary>
/// sends an HTTP request with optional retry logic for transient network errors
/// </summary>
/// <param name="requestFactory">Request factory</param>
/// <param name="ct">Cancellation Token</param>
/// <param name="httpCompletionOption">Http Options</param>
/// <param name="withToken">With cancellation token</param>
/// <param name="allowRetry">Allows retry of request</param>
/// <returns>Response message of request</returns>
private async Task<HttpResponseMessage> SendRequestInternalAsync(
Func<HttpRequestMessage> requestFactory,
CancellationToken? ct = null,
HttpCompletionOption httpCompletionOption = HttpCompletionOption.ResponseContentRead,
bool withToken = true,
bool allowRetry = true)
{ {
const int maxAttempts = 2; const int maxAttempts = 2;
var attempt = 0; var attempt = 0;
@@ -184,8 +262,11 @@ public class FileTransferOrchestrator : DisposableMediatorSubscriberBase
requestMessage.Headers.Authorization = new AuthenticationHeaderValue("Bearer", token); requestMessage.Headers.Authorization = new AuthenticationHeaderValue("Bearer", token);
} }
if (requestMessage.Content != null && requestMessage.Content is not StreamContent && requestMessage.Content is not ByteArrayContent) if (requestMessage.Content != null &&
requestMessage.Content is not StreamContent &&
requestMessage.Content is not ByteArrayContent)
{ {
// log content for debugging
var content = await ((JsonContent)requestMessage.Content).ReadAsStringAsync().ConfigureAwait(false); var content = await ((JsonContent)requestMessage.Content).ReadAsStringAsync().ConfigureAwait(false);
Logger.LogDebug("Sending {method} to {uri} (Content: {content})", requestMessage.Method, requestMessage.RequestUri, content); Logger.LogDebug("Sending {method} to {uri} (Content: {content})", requestMessage.Method, requestMessage.RequestUri, content);
} }
@@ -196,9 +277,10 @@ public class FileTransferOrchestrator : DisposableMediatorSubscriberBase
try try
{ {
if (ct != null) // send request
return await _httpClient.SendAsync(requestMessage, httpCompletionOption, ct.Value).ConfigureAwait(false); return ct != null
return await _httpClient.SendAsync(requestMessage, httpCompletionOption).ConfigureAwait(false); ? await _httpClient.SendAsync(requestMessage, httpCompletionOption, ct.Value).ConfigureAwait(false)
: await _httpClient.SendAsync(requestMessage, httpCompletionOption).ConfigureAwait(false);
} }
catch (TaskCanceledException) catch (TaskCanceledException)
{ {
@@ -208,14 +290,11 @@ public class FileTransferOrchestrator : DisposableMediatorSubscriberBase
{ {
Logger.LogWarning(ex, "Transient error during SendRequestInternal for {uri}, retrying attempt {attempt}/{maxAttempts}", Logger.LogWarning(ex, "Transient error during SendRequestInternal for {uri}, retrying attempt {attempt}/{maxAttempts}",
requestMessage.RequestUri, attempt, maxAttempts); requestMessage.RequestUri, attempt, maxAttempts);
if (ct.HasValue) if (ct.HasValue)
{
await Task.Delay(TimeSpan.FromMilliseconds(200), ct.Value).ConfigureAwait(false); await Task.Delay(TimeSpan.FromMilliseconds(200), ct.Value).ConfigureAwait(false);
}
else else
{
await Task.Delay(TimeSpan.FromMilliseconds(200)).ConfigureAwait(false); await Task.Delay(TimeSpan.FromMilliseconds(200)).ConfigureAwait(false);
}
} }
catch (Exception ex) catch (Exception ex)
{ {
@@ -225,6 +304,11 @@ public class FileTransferOrchestrator : DisposableMediatorSubscriberBase
} }
} }
/// <summary>
/// Is the exception a transient network exception
/// </summary>
/// <param name="ex">expection</param>
/// <returns>Is transient network expection</returns>
private static bool IsTransientNetworkException(Exception ex) private static bool IsTransientNetworkException(Exception ex)
{ {
var current = ex; var current = ex;
@@ -232,12 +316,13 @@ public class FileTransferOrchestrator : DisposableMediatorSubscriberBase
{ {
if (current is SocketException socketEx) if (current is SocketException socketEx)
{ {
return socketEx.SocketErrorCode is SocketError.ConnectionReset or SocketError.ConnectionAborted or SocketError.TimedOut; return socketEx.SocketErrorCode is
SocketError.ConnectionReset or
SocketError.ConnectionAborted or
SocketError.TimedOut;
} }
current = current.InnerException; current = current.InnerException;
} }
return false; return false;
} }
} }