adds the ability for shards to sync with the main server regarding their caches, when pulling files from main server the shard can stream it to the client directly while downloading and add info for server to report to client regarding file locations across shards
This commit is contained in:
Submodule LightlessAPI updated: 4ecd5375e6...89bcc242cf
@@ -0,0 +1,14 @@
|
||||
namespace LightlessSyncShared.Models;
|
||||
|
||||
public sealed record ShardFileInventoryUpdateDto
|
||||
{
|
||||
public long Sequence { get; init; }
|
||||
public bool IsFullSnapshot { get; init; }
|
||||
public List<string> Added { get; init; } = new();
|
||||
public List<string> Removed { get; init; } = new();
|
||||
}
|
||||
|
||||
public sealed record ShardFileInventoryUpdateAckDto
|
||||
{
|
||||
public long AppliedSequence { get; init; }
|
||||
}
|
||||
@@ -1,6 +1,7 @@
|
||||
using LightlessSync.API.Routes;
|
||||
using LightlessSyncShared.Utils.Configuration;
|
||||
using LightlessSyncStaticFilesServer.Services;
|
||||
using LightlessSyncShared.Models;
|
||||
using Microsoft.AspNetCore.Authorization;
|
||||
using Microsoft.AspNetCore.Mvc;
|
||||
|
||||
@@ -63,4 +64,19 @@ public class MainController : ControllerBase
|
||||
return BadRequest();
|
||||
}
|
||||
}
|
||||
|
||||
[HttpPost(LightlessFiles.Main_ShardFiles)]
|
||||
public IActionResult ShardFilesUpdate([FromBody] ShardFileInventoryUpdateDto update)
|
||||
{
|
||||
try
|
||||
{
|
||||
var applied = _shardRegistrationService.ApplyFileInventoryUpdate(LightlessUser, update);
|
||||
return Ok(new ShardFileInventoryUpdateAckDto { AppliedSequence = applied });
|
||||
}
|
||||
catch (Exception ex)
|
||||
{
|
||||
_logger.LogWarning(ex, "Shard file inventory update failed: {shard}", LightlessUser);
|
||||
return BadRequest();
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -82,7 +82,9 @@ public class ServerFilesController : ControllerBase
|
||||
}
|
||||
|
||||
[HttpGet(LightlessFiles.ServerFiles_GetSizes)]
|
||||
public async Task<IActionResult> FilesGetSizes([FromBody] List<string> hashes)
|
||||
public async Task<IActionResult> FilesGetSizes(
|
||||
[FromBody] List<string> hashes,
|
||||
[FromQuery(Name = "avoidHost")] List<string>? avoidHosts = null)
|
||||
{
|
||||
using var dbContext = await _lightlessDbContext.CreateDbContextAsync();
|
||||
var forbiddenFiles = await dbContext.ForbiddenUploadEntries.
|
||||
@@ -94,27 +96,97 @@ public class ServerFilesController : ControllerBase
|
||||
.Select(k => new { k.Hash, k.Size, k.RawSize })
|
||||
.ToListAsync().ConfigureAwait(false);
|
||||
|
||||
var allFileShards = _shardRegistrationService.GetConfigurationsByContinent(Continent);
|
||||
var avoidHostSet = new HashSet<string>(StringComparer.OrdinalIgnoreCase);
|
||||
if (avoidHosts != null)
|
||||
{
|
||||
foreach (var host in avoidHosts)
|
||||
{
|
||||
if (!string.IsNullOrWhiteSpace(host))
|
||||
{
|
||||
avoidHostSet.Add(host);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
var allFileShards = _shardRegistrationService.GetShardEntriesByContinent(Continent);
|
||||
var shardContexts = new List<ShardSelectionContext>(allFileShards.Count);
|
||||
foreach (var shard in allFileShards)
|
||||
{
|
||||
shardContexts.Add(new ShardSelectionContext(
|
||||
shard.ShardName,
|
||||
shard.Config,
|
||||
new Regex(shard.Config.FileMatch, RegexOptions.Compiled)));
|
||||
}
|
||||
|
||||
foreach (var file in cacheFile)
|
||||
{
|
||||
var forbiddenFile = forbiddenFiles.SingleOrDefault(f => string.Equals(f.Hash, file.Hash, StringComparison.OrdinalIgnoreCase));
|
||||
Uri? baseUrl = null;
|
||||
Uri? queuedBaseUrl = null;
|
||||
Uri? directBaseUrl = null;
|
||||
var queuedUrls = new List<string>();
|
||||
var hasFileUrls = new List<string>();
|
||||
var hasFileDirectUrls = new List<string>();
|
||||
var pullThroughUrls = new List<string>();
|
||||
var pullThroughDirectUrls = new List<string>();
|
||||
|
||||
if (forbiddenFile == null)
|
||||
{
|
||||
var matchingShards = allFileShards.Where(f => new Regex(f.FileMatch).IsMatch(file.Hash)).ToList();
|
||||
var matchingShards = shardContexts
|
||||
.Where(f => f.FileMatchRegex.IsMatch(file.Hash))
|
||||
.ToList();
|
||||
|
||||
var shard = matchingShards.SelectMany(g => g.RegionUris)
|
||||
.OrderBy(g => Guid.NewGuid()).FirstOrDefault();
|
||||
foreach (var shardEntry in matchingShards)
|
||||
{
|
||||
var regionUris = shardEntry.GetRegionUris(avoidHostSet);
|
||||
|
||||
baseUrl = shard.Value ?? _configuration.GetValue<Uri>(nameof(StaticFilesServerConfiguration.CdnFullUrl));
|
||||
if (regionUris.Count == 0)
|
||||
{
|
||||
continue;
|
||||
}
|
||||
|
||||
foreach (var uri in regionUris)
|
||||
{
|
||||
AddBaseUrl(queuedUrls, uri);
|
||||
}
|
||||
|
||||
var hasFile = !string.IsNullOrEmpty(shardEntry.ShardName)
|
||||
&& _shardRegistrationService.ShardHasFile(shardEntry.ShardName, file.Hash);
|
||||
|
||||
var baseList = hasFile ? hasFileUrls : pullThroughUrls;
|
||||
var directList = hasFile ? hasFileDirectUrls : pullThroughDirectUrls;
|
||||
|
||||
foreach (var uri in regionUris)
|
||||
{
|
||||
AddCandidate(baseList, directList, uri, file.Hash);
|
||||
}
|
||||
}
|
||||
|
||||
if (queuedUrls.Count == 0)
|
||||
{
|
||||
var fallback = _configuration.GetValue<Uri>(nameof(StaticFilesServerConfiguration.CdnFullUrl));
|
||||
if (fallback != null && (avoidHostSet.Count == 0 || !IsAvoidedHost(fallback, avoidHostSet)))
|
||||
{
|
||||
AddBaseUrl(queuedUrls, fallback);
|
||||
}
|
||||
}
|
||||
|
||||
if (hasFileUrls.Count == 0 && pullThroughUrls.Count == 0)
|
||||
{
|
||||
var fallback = _configuration.GetValue<Uri>(nameof(StaticFilesServerConfiguration.CdnFullUrl));
|
||||
if (fallback != null && (avoidHostSet.Count == 0 || !IsAvoidedHost(fallback, avoidHostSet)))
|
||||
{
|
||||
AddCandidate(pullThroughUrls, pullThroughDirectUrls, fallback, file.Hash);
|
||||
}
|
||||
}
|
||||
|
||||
queuedBaseUrl = SelectPreferredBase(queuedUrls);
|
||||
directBaseUrl = SelectPreferredBase(hasFileUrls, pullThroughUrls);
|
||||
}
|
||||
|
||||
var cdnDownloadUrl = string.Empty;
|
||||
if (forbiddenFile == null)
|
||||
{
|
||||
var directUri = _cdnDownloadUrlService.TryCreateDirectDownloadUri(baseUrl, file.Hash);
|
||||
var directUri = _cdnDownloadUrlService.TryCreateDirectDownloadUri(directBaseUrl, file.Hash);
|
||||
if (directUri != null)
|
||||
{
|
||||
cdnDownloadUrl = directUri.ToString();
|
||||
@@ -128,8 +200,10 @@ public class ServerFilesController : ControllerBase
|
||||
IsForbidden = forbiddenFile != null,
|
||||
Hash = file.Hash,
|
||||
Size = file.Size,
|
||||
Url = baseUrl?.ToString() ?? string.Empty,
|
||||
Url = queuedBaseUrl?.ToString() ?? string.Empty,
|
||||
CDNDownloadUrl = cdnDownloadUrl,
|
||||
HasFileDirectUrls = hasFileDirectUrls,
|
||||
PullThroughDirectUrls = pullThroughDirectUrls,
|
||||
RawSize = file.RawSize
|
||||
});
|
||||
}
|
||||
@@ -144,22 +218,127 @@ public class ServerFilesController : ControllerBase
|
||||
return Ok(JsonSerializer.Serialize(allFileShards.SelectMany(t => t.RegionUris.Select(v => v.Value.ToString()))));
|
||||
}
|
||||
|
||||
private static bool IsAvoidedHost(Uri uri, HashSet<string> avoidHosts)
|
||||
{
|
||||
if (avoidHosts.Count == 0)
|
||||
return false;
|
||||
|
||||
var host = uri.Host;
|
||||
if (!string.IsNullOrWhiteSpace(host) && avoidHosts.Contains(host))
|
||||
return true;
|
||||
|
||||
var authority = uri.Authority;
|
||||
if (!string.IsNullOrWhiteSpace(authority) && avoidHosts.Contains(authority))
|
||||
return true;
|
||||
|
||||
var absolute = uri.ToString().TrimEnd('/');
|
||||
return avoidHosts.Contains(absolute);
|
||||
}
|
||||
|
||||
private sealed class ShardSelectionContext
|
||||
{
|
||||
private List<Uri>? _cachedUris;
|
||||
private List<Uri>? _cachedAvoidedUris;
|
||||
|
||||
public ShardSelectionContext(string shardName, ShardConfiguration config, Regex fileMatchRegex)
|
||||
{
|
||||
ShardName = shardName;
|
||||
Config = config;
|
||||
FileMatchRegex = fileMatchRegex;
|
||||
}
|
||||
|
||||
public string ShardName { get; }
|
||||
public ShardConfiguration Config { get; }
|
||||
public Regex FileMatchRegex { get; }
|
||||
|
||||
public List<Uri> GetRegionUris(HashSet<string> avoidHosts)
|
||||
{
|
||||
if (_cachedUris == null)
|
||||
{
|
||||
_cachedUris = Config.RegionUris.Values.ToList();
|
||||
}
|
||||
|
||||
if (avoidHosts.Count == 0)
|
||||
{
|
||||
return _cachedUris;
|
||||
}
|
||||
|
||||
_cachedAvoidedUris ??= _cachedUris.Where(u => !IsAvoidedHost(u, avoidHosts)).ToList();
|
||||
return _cachedAvoidedUris.Count > 0 ? _cachedAvoidedUris : _cachedUris;
|
||||
}
|
||||
}
|
||||
|
||||
private void AddCandidate(List<string> baseUrls, List<string> directUrls, Uri baseUri, string hash)
|
||||
{
|
||||
var baseUrl = baseUri.ToString();
|
||||
if (baseUrls.Any(u => string.Equals(u, baseUrl, StringComparison.OrdinalIgnoreCase)))
|
||||
return;
|
||||
|
||||
baseUrls.Add(baseUrl);
|
||||
|
||||
var direct = _cdnDownloadUrlService.TryCreateDirectDownloadUri(baseUri, hash);
|
||||
directUrls.Add(direct?.ToString() ?? string.Empty);
|
||||
}
|
||||
|
||||
private static void AddBaseUrl(List<string> baseUrls, Uri baseUri)
|
||||
{
|
||||
var baseUrl = baseUri.ToString();
|
||||
if (baseUrls.Any(u => string.Equals(u, baseUrl, StringComparison.OrdinalIgnoreCase)))
|
||||
return;
|
||||
|
||||
baseUrls.Add(baseUrl);
|
||||
}
|
||||
|
||||
private static Uri? SelectPreferredBase(List<string> urls)
|
||||
{
|
||||
if (urls.Count == 0)
|
||||
return null;
|
||||
|
||||
var selected = urls[Random.Shared.Next(urls.Count)];
|
||||
return Uri.TryCreate(selected, UriKind.Absolute, out var uri) ? uri : null;
|
||||
}
|
||||
|
||||
private static Uri? SelectPreferredBase(List<string> hasFileUrls, List<string> pullThroughUrls)
|
||||
{
|
||||
var list = hasFileUrls.Count > 0 ? hasFileUrls : pullThroughUrls;
|
||||
if (list.Count == 0)
|
||||
return null;
|
||||
|
||||
var selected = list[Random.Shared.Next(list.Count)];
|
||||
return Uri.TryCreate(selected, UriKind.Absolute, out var uri) ? uri : null;
|
||||
}
|
||||
|
||||
[HttpGet(LightlessFiles.ServerFiles_DirectDownload + "/{hash}")]
|
||||
[AllowAnonymous]
|
||||
public async Task<IActionResult> DownloadFileDirect(string hash, [FromQuery] long expires, [FromQuery] string signature)
|
||||
{
|
||||
var result = await _cdnDownloadsService.GetDownloadAsync(hash, expires, signature).ConfigureAwait(false);
|
||||
var result = await _cdnDownloadsService.GetDownloadAsync(hash, expires, signature, HttpContext.RequestAborted).ConfigureAwait(false);
|
||||
|
||||
return result.Status switch
|
||||
{
|
||||
CDNDownloadsService.ResultStatus.Disabled => NotFound(),
|
||||
CDNDownloadsService.ResultStatus.Unauthorized => Unauthorized(),
|
||||
CDNDownloadsService.ResultStatus.NotFound => NotFound(),
|
||||
CDNDownloadsService.ResultStatus.Success => PhysicalFile(result.File!.FullName, "application/octet-stream"),
|
||||
CDNDownloadsService.ResultStatus.Success => BuildDirectDownloadResult(result),
|
||||
_ => NotFound()
|
||||
};
|
||||
}
|
||||
|
||||
private IActionResult BuildDirectDownloadResult(CDNDownloadsService.Result result)
|
||||
{
|
||||
if (result.Stream != null)
|
||||
{
|
||||
if (result.ContentLength.HasValue)
|
||||
{
|
||||
Response.ContentLength = result.ContentLength.Value;
|
||||
}
|
||||
|
||||
return new FileStreamResult(result.Stream, "application/octet-stream");
|
||||
}
|
||||
|
||||
return PhysicalFile(result.File!.FullName, "application/octet-stream");
|
||||
}
|
||||
|
||||
[HttpPost(LightlessFiles.ServerFiles_FilesSend)]
|
||||
public async Task<IActionResult> FilesSend([FromBody] FilesSendDto filesSendDto)
|
||||
{
|
||||
|
||||
@@ -20,15 +20,30 @@ public class ShardServerFilesController : ControllerBase
|
||||
[AllowAnonymous]
|
||||
public async Task<IActionResult> DownloadFileDirect(string hash, [FromQuery] long expires, [FromQuery] string signature)
|
||||
{
|
||||
var result = await _cdnDownloadsService.GetDownloadAsync(hash, expires, signature).ConfigureAwait(false);
|
||||
var result = await _cdnDownloadsService.GetDownloadAsync(hash, expires, signature, HttpContext.RequestAborted).ConfigureAwait(false);
|
||||
|
||||
return result.Status switch
|
||||
{
|
||||
CDNDownloadsService.ResultStatus.Disabled => NotFound(),
|
||||
CDNDownloadsService.ResultStatus.Unauthorized => Unauthorized(),
|
||||
CDNDownloadsService.ResultStatus.NotFound => NotFound(),
|
||||
CDNDownloadsService.ResultStatus.Success => PhysicalFile(result.File!.FullName, "application/octet-stream"),
|
||||
CDNDownloadsService.ResultStatus.Success => BuildDirectDownloadResult(result),
|
||||
_ => NotFound()
|
||||
};
|
||||
}
|
||||
|
||||
private IActionResult BuildDirectDownloadResult(CDNDownloadsService.Result result)
|
||||
{
|
||||
if (result.Stream != null)
|
||||
{
|
||||
if (result.ContentLength.HasValue)
|
||||
{
|
||||
Response.ContentLength = result.ContentLength.Value;
|
||||
}
|
||||
|
||||
return new FileStreamResult(result.Stream, "application/octet-stream");
|
||||
}
|
||||
|
||||
return PhysicalFile(result.File!.FullName, "application/octet-stream");
|
||||
}
|
||||
}
|
||||
|
||||
@@ -1,4 +1,5 @@
|
||||
using System.IO;
|
||||
using System.Threading;
|
||||
using System.Threading.Tasks;
|
||||
|
||||
namespace LightlessSyncStaticFilesServer.Services;
|
||||
@@ -13,7 +14,7 @@ public class CDNDownloadsService
|
||||
Success
|
||||
}
|
||||
|
||||
public readonly record struct Result(ResultStatus Status, FileInfo? File);
|
||||
public readonly record struct Result(ResultStatus Status, FileInfo? File, Stream? Stream, long? ContentLength);
|
||||
|
||||
private readonly CDNDownloadUrlService _cdnDownloadUrlService;
|
||||
private readonly CachedFileProvider _cachedFileProvider;
|
||||
@@ -26,31 +27,31 @@ public class CDNDownloadsService
|
||||
|
||||
public bool DownloadsEnabled => _cdnDownloadUrlService.DirectDownloadsEnabled;
|
||||
|
||||
public async Task<Result> GetDownloadAsync(string hash, long expiresUnixSeconds, string signature)
|
||||
public async Task<Result> GetDownloadAsync(string hash, long expiresUnixSeconds, string signature, CancellationToken ct)
|
||||
{
|
||||
if (!_cdnDownloadUrlService.DirectDownloadsEnabled)
|
||||
{
|
||||
return new Result(ResultStatus.Disabled, null);
|
||||
return new Result(ResultStatus.Disabled, null, null, null);
|
||||
}
|
||||
|
||||
if (string.IsNullOrEmpty(signature) || string.IsNullOrEmpty(hash))
|
||||
{
|
||||
return new Result(ResultStatus.Unauthorized, null);
|
||||
return new Result(ResultStatus.Unauthorized, null, null, null);
|
||||
}
|
||||
|
||||
hash = hash.ToUpperInvariant();
|
||||
|
||||
if (!_cdnDownloadUrlService.TryValidateSignature(hash, expiresUnixSeconds, signature))
|
||||
{
|
||||
return new Result(ResultStatus.Unauthorized, null);
|
||||
return new Result(ResultStatus.Unauthorized, null, null, null);
|
||||
}
|
||||
|
||||
var fileInfo = await _cachedFileProvider.DownloadAndGetLocalFileInfo(hash).ConfigureAwait(false);
|
||||
if (fileInfo == null)
|
||||
var fileResult = await _cachedFileProvider.GetFileStreamForDirectDownloadAsync(hash, ct).ConfigureAwait(false);
|
||||
if (fileResult == null)
|
||||
{
|
||||
return new Result(ResultStatus.NotFound, null);
|
||||
return new Result(ResultStatus.NotFound, null, null, null);
|
||||
}
|
||||
|
||||
return new Result(ResultStatus.Success, fileInfo);
|
||||
return new Result(ResultStatus.Success, fileResult.Value.File, fileResult.Value.Stream, fileResult.Value.ContentLength);
|
||||
}
|
||||
}
|
||||
|
||||
@@ -16,6 +16,7 @@ public sealed class CachedFileProvider : IDisposable
|
||||
private readonly FileStatisticsService _fileStatisticsService;
|
||||
private readonly LightlessMetrics _metrics;
|
||||
private readonly ServerTokenGenerator _generator;
|
||||
private readonly IShardFileInventoryReporter _inventoryReporter;
|
||||
private readonly Uri _remoteCacheSourceUri;
|
||||
private readonly string _hotStoragePath;
|
||||
private readonly ConcurrentDictionary<string, Task> _currentTransfers = new(StringComparer.Ordinal);
|
||||
@@ -27,13 +28,15 @@ public sealed class CachedFileProvider : IDisposable
|
||||
private bool _isDistributionServer;
|
||||
|
||||
public CachedFileProvider(IConfigurationService<StaticFilesServerConfiguration> configuration, ILogger<CachedFileProvider> logger,
|
||||
FileStatisticsService fileStatisticsService, LightlessMetrics metrics, ServerTokenGenerator generator)
|
||||
FileStatisticsService fileStatisticsService, LightlessMetrics metrics, ServerTokenGenerator generator,
|
||||
IShardFileInventoryReporter inventoryReporter)
|
||||
{
|
||||
_configuration = configuration;
|
||||
_logger = logger;
|
||||
_fileStatisticsService = fileStatisticsService;
|
||||
_metrics = metrics;
|
||||
_generator = generator;
|
||||
_inventoryReporter = inventoryReporter;
|
||||
_remoteCacheSourceUri = configuration.GetValueOrDefault<Uri>(nameof(StaticFilesServerConfiguration.DistributionFileServerAddress), null);
|
||||
_isDistributionServer = configuration.GetValueOrDefault(nameof(StaticFilesServerConfiguration.IsDistributionNode), false);
|
||||
_hotStoragePath = configuration.GetValue<string>(nameof(StaticFilesServerConfiguration.CacheDirectory));
|
||||
@@ -97,10 +100,11 @@ public sealed class CachedFileProvider : IDisposable
|
||||
|
||||
_metrics.IncGauge(MetricsAPI.GaugeFilesTotal);
|
||||
_metrics.IncGauge(MetricsAPI.GaugeFilesTotalSize, FilePathUtil.GetFileInfoForHash(_hotStoragePath, hash).Length);
|
||||
_inventoryReporter.ReportAdded(hash);
|
||||
response.Dispose();
|
||||
}
|
||||
|
||||
private bool TryCopyFromColdStorage(string hash, string destinationFilePath)
|
||||
private bool TryCopyFromColdStorage(string hash, string destinationFilePath)
|
||||
{
|
||||
if (!_configuration.GetValueOrDefault(nameof(StaticFilesServerConfiguration.UseColdStorage), false)) return false;
|
||||
|
||||
@@ -124,6 +128,7 @@ public sealed class CachedFileProvider : IDisposable
|
||||
|
||||
_metrics.IncGauge(MetricsAPI.GaugeFilesTotal);
|
||||
_metrics.IncGauge(MetricsAPI.GaugeFilesTotalSize, new FileInfo(destinationFilePath).Length);
|
||||
_inventoryReporter.ReportAdded(hash);
|
||||
return true;
|
||||
}
|
||||
catch (Exception ex)
|
||||
@@ -134,6 +139,93 @@ public sealed class CachedFileProvider : IDisposable
|
||||
return false;
|
||||
}
|
||||
|
||||
public async Task<CachedFileStreamResult?> GetFileStreamForDirectDownloadAsync(string hash, CancellationToken ct)
|
||||
{
|
||||
var destinationFilePath = FilePathUtil.GetFilePath(_hotStoragePath, hash);
|
||||
|
||||
var existing = GetLocalFilePath(hash);
|
||||
if (existing != null)
|
||||
{
|
||||
return new CachedFileStreamResult(existing, null, existing.Length);
|
||||
}
|
||||
|
||||
if (TryCopyFromColdStorage(hash, destinationFilePath))
|
||||
{
|
||||
var coldFile = GetLocalFilePath(hash);
|
||||
if (coldFile != null)
|
||||
{
|
||||
return new CachedFileStreamResult(coldFile, null, coldFile.Length);
|
||||
}
|
||||
}
|
||||
|
||||
if (_remoteCacheSourceUri == null)
|
||||
{
|
||||
return null;
|
||||
}
|
||||
|
||||
TaskCompletionSource<bool>? completion = null;
|
||||
|
||||
await _downloadSemaphore.WaitAsync(ct).ConfigureAwait(false);
|
||||
try
|
||||
{
|
||||
if (_currentTransfers.TryGetValue(hash, out var downloadTask)
|
||||
&& !(downloadTask?.IsCompleted ?? true))
|
||||
{
|
||||
completion = null;
|
||||
}
|
||||
else
|
||||
{
|
||||
completion = new TaskCompletionSource<bool>(TaskCreationOptions.RunContinuationsAsynchronously);
|
||||
_currentTransfers[hash] = completion.Task;
|
||||
_metrics.IncGauge(MetricsAPI.GaugeFilesDownloadingFromCache);
|
||||
}
|
||||
}
|
||||
finally
|
||||
{
|
||||
_downloadSemaphore.Release();
|
||||
}
|
||||
|
||||
if (completion == null)
|
||||
{
|
||||
var waited = await DownloadAndGetLocalFileInfo(hash).ConfigureAwait(false);
|
||||
if (waited == null) return null;
|
||||
return new CachedFileStreamResult(waited, null, waited.Length);
|
||||
}
|
||||
|
||||
var downloadUrl = LightlessFiles.DistributionGetFullPath(_remoteCacheSourceUri, hash);
|
||||
_logger.LogInformation("Did not find {hash}, streaming from {server}", hash, downloadUrl);
|
||||
|
||||
using var requestMessage = new HttpRequestMessage(HttpMethod.Get, downloadUrl);
|
||||
requestMessage.Headers.Authorization = new AuthenticationHeaderValue("Bearer", _generator.Token);
|
||||
|
||||
HttpResponseMessage response;
|
||||
try
|
||||
{
|
||||
response = await _httpClient.SendAsync(requestMessage, HttpCompletionOption.ResponseHeadersRead, ct).ConfigureAwait(false);
|
||||
response.EnsureSuccessStatusCode();
|
||||
}
|
||||
catch (Exception ex)
|
||||
{
|
||||
_logger.LogWarning(ex, "Failed to stream {url}", downloadUrl);
|
||||
FinalizeStreamingDownload(hash, null, destinationFilePath, completion, false, 0);
|
||||
return null;
|
||||
}
|
||||
|
||||
var tempFileName = destinationFilePath + ".dl";
|
||||
var fileStream = new FileStream(tempFileName, FileMode.Create, FileAccess.Write, FileShare.Read, bufferSize: 64 * 1024, useAsync: true);
|
||||
var sourceStream = await response.Content.ReadAsStreamAsync(ct).ConfigureAwait(false);
|
||||
|
||||
var stream = new StreamingCacheWriteStream(
|
||||
sourceStream,
|
||||
fileStream,
|
||||
response,
|
||||
bytesWritten => FinalizeStreamingDownload(hash, tempFileName, destinationFilePath, completion, true, bytesWritten),
|
||||
bytesWritten => FinalizeStreamingDownload(hash, tempFileName, destinationFilePath, completion, false, bytesWritten),
|
||||
_logger);
|
||||
|
||||
return new CachedFileStreamResult(null, stream, response.Content.Headers.ContentLength);
|
||||
}
|
||||
|
||||
public async Task DownloadFileWhenRequired(string hash)
|
||||
{
|
||||
var fi = FilePathUtil.GetFileInfoForHash(_hotStoragePath, hash);
|
||||
@@ -219,4 +311,174 @@ public sealed class CachedFileProvider : IDisposable
|
||||
{
|
||||
return hashes.Exists(_currentTransfers.Keys.Contains);
|
||||
}
|
||||
}
|
||||
|
||||
private void FinalizeStreamingDownload(string hash, string? tempFileName, string destinationFilePath,
|
||||
TaskCompletionSource<bool> completion, bool success, long bytesWritten)
|
||||
{
|
||||
try
|
||||
{
|
||||
if (success)
|
||||
{
|
||||
if (!string.IsNullOrEmpty(tempFileName))
|
||||
{
|
||||
File.Move(tempFileName, destinationFilePath, true);
|
||||
}
|
||||
|
||||
var fi = FilePathUtil.GetFileInfoForHash(_hotStoragePath, hash);
|
||||
if (fi != null)
|
||||
{
|
||||
_metrics.IncGauge(MetricsAPI.GaugeFilesTotal);
|
||||
_metrics.IncGauge(MetricsAPI.GaugeFilesTotalSize, fi.Length);
|
||||
_fileStatisticsService.LogFile(hash, fi.Length);
|
||||
_inventoryReporter.ReportAdded(hash);
|
||||
}
|
||||
}
|
||||
else if (!string.IsNullOrEmpty(tempFileName))
|
||||
{
|
||||
try { File.Delete(tempFileName); } catch { /* ignore */ }
|
||||
}
|
||||
}
|
||||
catch (Exception ex)
|
||||
{
|
||||
_logger.LogWarning(ex, "Failed to finalize streaming download for {hash} after {bytes} bytes", hash, bytesWritten);
|
||||
}
|
||||
finally
|
||||
{
|
||||
_metrics.DecGauge(MetricsAPI.GaugeFilesDownloadingFromCache);
|
||||
_currentTransfers.Remove(hash, out _);
|
||||
completion.TrySetResult(success);
|
||||
}
|
||||
}
|
||||
|
||||
private sealed class StreamingCacheWriteStream : Stream
|
||||
{
|
||||
private readonly Stream _source;
|
||||
private readonly FileStream _cacheStream;
|
||||
private readonly HttpResponseMessage _response;
|
||||
private readonly Action<long> _onSuccess;
|
||||
private readonly Action<long> _onFailure;
|
||||
private readonly ILogger _logger;
|
||||
private long _bytesWritten;
|
||||
private int _completed;
|
||||
|
||||
public StreamingCacheWriteStream(Stream source, FileStream cacheStream, HttpResponseMessage response,
|
||||
Action<long> onSuccess, Action<long> onFailure, ILogger logger)
|
||||
{
|
||||
_source = source;
|
||||
_cacheStream = cacheStream;
|
||||
_response = response;
|
||||
_onSuccess = onSuccess;
|
||||
_onFailure = onFailure;
|
||||
_logger = logger;
|
||||
}
|
||||
|
||||
public override bool CanRead => true;
|
||||
public override bool CanSeek => false;
|
||||
public override bool CanWrite => false;
|
||||
public override long Length => _bytesWritten;
|
||||
public override long Position
|
||||
{
|
||||
get => _bytesWritten;
|
||||
set => throw new NotSupportedException();
|
||||
}
|
||||
|
||||
public override int Read(byte[] buffer, int offset, int count)
|
||||
{
|
||||
if (Volatile.Read(ref _completed) != 0)
|
||||
{
|
||||
return 0;
|
||||
}
|
||||
|
||||
try
|
||||
{
|
||||
int bytesRead = _source.Read(buffer, offset, count);
|
||||
if (bytesRead > 0)
|
||||
{
|
||||
_cacheStream.Write(buffer, offset, bytesRead);
|
||||
_bytesWritten += bytesRead;
|
||||
return bytesRead;
|
||||
}
|
||||
|
||||
Complete(true);
|
||||
return 0;
|
||||
}
|
||||
catch (Exception ex)
|
||||
{
|
||||
_logger.LogWarning(ex, "Streaming download failed while reading");
|
||||
Complete(false);
|
||||
throw;
|
||||
}
|
||||
}
|
||||
|
||||
public override async ValueTask<int> ReadAsync(Memory<byte> buffer, CancellationToken cancellationToken = default)
|
||||
{
|
||||
if (Volatile.Read(ref _completed) != 0)
|
||||
{
|
||||
return 0;
|
||||
}
|
||||
|
||||
try
|
||||
{
|
||||
int bytesRead = await _source.ReadAsync(buffer, cancellationToken).ConfigureAwait(false);
|
||||
if (bytesRead > 0)
|
||||
{
|
||||
await _cacheStream.WriteAsync(buffer.Slice(0, bytesRead), cancellationToken).ConfigureAwait(false);
|
||||
_bytesWritten += bytesRead;
|
||||
return bytesRead;
|
||||
}
|
||||
|
||||
Complete(true);
|
||||
return 0;
|
||||
}
|
||||
catch (Exception ex)
|
||||
{
|
||||
_logger.LogWarning(ex, "Streaming download failed while reading");
|
||||
Complete(false);
|
||||
throw;
|
||||
}
|
||||
}
|
||||
|
||||
public override void Flush()
|
||||
{
|
||||
// no-op
|
||||
}
|
||||
|
||||
protected override void Dispose(bool disposing)
|
||||
{
|
||||
if (disposing)
|
||||
{
|
||||
Complete(false);
|
||||
}
|
||||
|
||||
base.Dispose(disposing);
|
||||
}
|
||||
|
||||
private void Complete(bool success)
|
||||
{
|
||||
if (Interlocked.Exchange(ref _completed, 1) != 0)
|
||||
{
|
||||
return;
|
||||
}
|
||||
|
||||
try { _cacheStream.Flush(); } catch { /* ignore */ }
|
||||
try { _cacheStream.Dispose(); } catch { /* ignore */ }
|
||||
try { _source.Dispose(); } catch { /* ignore */ }
|
||||
try { _response.Dispose(); } catch { /* ignore */ }
|
||||
|
||||
if (success)
|
||||
{
|
||||
_onSuccess(_bytesWritten);
|
||||
}
|
||||
else
|
||||
{
|
||||
_onFailure(_bytesWritten);
|
||||
}
|
||||
}
|
||||
|
||||
public override long Seek(long offset, SeekOrigin origin) => throw new NotSupportedException();
|
||||
public override void SetLength(long value) => throw new NotSupportedException();
|
||||
public override void Write(byte[] buffer, int offset, int count) => throw new NotSupportedException();
|
||||
}
|
||||
}
|
||||
|
||||
public readonly record struct CachedFileStreamResult(FileInfo? File, Stream? Stream, long? ContentLength);
|
||||
@@ -1,5 +1,6 @@
|
||||
using LightlessSyncShared.Services;
|
||||
using LightlessSyncShared.Utils.Configuration;
|
||||
using LightlessSyncShared.Models;
|
||||
using System.Collections.Concurrent;
|
||||
using System.Collections.Frozen;
|
||||
|
||||
@@ -11,8 +12,16 @@ public class MainServerShardRegistrationService : IHostedService
|
||||
private readonly IConfigurationService<StaticFilesServerConfiguration> _configurationService;
|
||||
private readonly ConcurrentDictionary<string, ShardConfiguration> _shardConfigs = new(StringComparer.Ordinal);
|
||||
private readonly ConcurrentDictionary<string, DateTime> _shardHeartbeats = new(StringComparer.Ordinal);
|
||||
private readonly ConcurrentDictionary<string, ShardFileInventory> _shardFileInventory = new(StringComparer.Ordinal);
|
||||
private readonly CancellationTokenSource _periodicCheckCts = new();
|
||||
|
||||
private sealed class ShardFileInventory
|
||||
{
|
||||
public long Sequence { get; set; }
|
||||
public HashSet<string> Files { get; set; } = new(StringComparer.OrdinalIgnoreCase);
|
||||
public object SyncRoot { get; } = new();
|
||||
}
|
||||
|
||||
public MainServerShardRegistrationService(ILogger<MainServerShardRegistrationService> logger,
|
||||
IConfigurationService<StaticFilesServerConfiguration> configurationService)
|
||||
{
|
||||
@@ -32,6 +41,7 @@ public class MainServerShardRegistrationService : IHostedService
|
||||
|
||||
_shardHeartbeats[shardName] = DateTime.UtcNow;
|
||||
_shardConfigs[shardName] = shardConfiguration;
|
||||
_shardFileInventory.TryAdd(shardName, new ShardFileInventory());
|
||||
}
|
||||
|
||||
public void UnregisterShard(string shardName)
|
||||
@@ -40,6 +50,7 @@ public class MainServerShardRegistrationService : IHostedService
|
||||
|
||||
_shardHeartbeats.TryRemove(shardName, out _);
|
||||
_shardConfigs.TryRemove(shardName, out _);
|
||||
_shardFileInventory.TryRemove(shardName, out _);
|
||||
}
|
||||
|
||||
public List<ShardConfiguration> GetConfigurationsByContinent(string continent)
|
||||
@@ -56,6 +67,94 @@ public class MainServerShardRegistrationService : IHostedService
|
||||
} }];
|
||||
}
|
||||
|
||||
public List<(string ShardName, ShardConfiguration Config)> GetShardEntriesByContinent(string continent)
|
||||
{
|
||||
var shardConfigs = _shardConfigs
|
||||
.Where(v => v.Value.Continents.Contains(continent, StringComparer.OrdinalIgnoreCase))
|
||||
.Select(kvp => (kvp.Key, kvp.Value))
|
||||
.ToList();
|
||||
if (shardConfigs.Any()) return shardConfigs;
|
||||
|
||||
shardConfigs = _shardConfigs
|
||||
.Where(v => v.Value.Continents.Contains("*", StringComparer.OrdinalIgnoreCase))
|
||||
.Select(kvp => (kvp.Key, kvp.Value))
|
||||
.ToList();
|
||||
if (shardConfigs.Any()) return shardConfigs;
|
||||
|
||||
var fallback = new ShardConfiguration()
|
||||
{
|
||||
Continents = ["*"],
|
||||
FileMatch = ".*",
|
||||
RegionUris = new(StringComparer.Ordinal)
|
||||
{
|
||||
{ "Central", _configurationService.GetValue<Uri>(nameof(StaticFilesServerConfiguration.CdnFullUrl)) }
|
||||
}
|
||||
};
|
||||
|
||||
return [(string.Empty, fallback)];
|
||||
}
|
||||
|
||||
public long ApplyFileInventoryUpdate(string shardName, ShardFileInventoryUpdateDto update)
|
||||
{
|
||||
if (!_shardConfigs.ContainsKey(shardName))
|
||||
throw new InvalidOperationException("Shard not registered");
|
||||
|
||||
var inventory = _shardFileInventory.GetOrAdd(shardName, _ => new ShardFileInventory());
|
||||
lock (inventory.SyncRoot)
|
||||
{
|
||||
if (update.IsFullSnapshot && update.Sequence <= inventory.Sequence)
|
||||
{
|
||||
inventory.Files = new HashSet<string>(update.Added ?? [], StringComparer.OrdinalIgnoreCase);
|
||||
inventory.Sequence = update.Sequence;
|
||||
return inventory.Sequence;
|
||||
}
|
||||
|
||||
if (update.Sequence <= inventory.Sequence)
|
||||
{
|
||||
return inventory.Sequence;
|
||||
}
|
||||
|
||||
if (update.IsFullSnapshot)
|
||||
{
|
||||
inventory.Files = new HashSet<string>(update.Added ?? [], StringComparer.OrdinalIgnoreCase);
|
||||
}
|
||||
else
|
||||
{
|
||||
if (update.Added != null)
|
||||
{
|
||||
foreach (var hash in update.Added)
|
||||
{
|
||||
if (!string.IsNullOrWhiteSpace(hash))
|
||||
inventory.Files.Add(hash);
|
||||
}
|
||||
}
|
||||
|
||||
if (update.Removed != null)
|
||||
{
|
||||
foreach (var hash in update.Removed)
|
||||
{
|
||||
if (!string.IsNullOrWhiteSpace(hash))
|
||||
inventory.Files.Remove(hash);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
inventory.Sequence = update.Sequence;
|
||||
return inventory.Sequence;
|
||||
}
|
||||
}
|
||||
|
||||
public bool ShardHasFile(string shardName, string hash)
|
||||
{
|
||||
if (!_shardFileInventory.TryGetValue(shardName, out var inventory))
|
||||
return false;
|
||||
|
||||
lock (inventory.SyncRoot)
|
||||
{
|
||||
return inventory.Files.Contains(hash);
|
||||
}
|
||||
}
|
||||
|
||||
public void ShardHeartbeat(string shardName)
|
||||
{
|
||||
if (!_shardConfigs.ContainsKey(shardName))
|
||||
@@ -87,6 +186,7 @@ public class MainServerShardRegistrationService : IHostedService
|
||||
{
|
||||
_shardHeartbeats.TryRemove(kvp.Key, out _);
|
||||
_shardConfigs.TryRemove(kvp.Key, out _);
|
||||
_shardFileInventory.TryRemove(kvp.Key, out _);
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -12,13 +12,16 @@ public class ShardFileCleanupService : IHostedService
|
||||
private readonly IConfigurationService<StaticFilesServerConfiguration> _configuration;
|
||||
private readonly ILogger<MainFileCleanupService> _logger;
|
||||
private readonly LightlessMetrics _metrics;
|
||||
private readonly IShardFileInventoryReporter _inventoryReporter;
|
||||
private CancellationTokenSource _cleanupCts;
|
||||
|
||||
public ShardFileCleanupService(LightlessMetrics metrics, ILogger<MainFileCleanupService> logger, IConfigurationService<StaticFilesServerConfiguration> configuration)
|
||||
public ShardFileCleanupService(LightlessMetrics metrics, ILogger<MainFileCleanupService> logger, IConfigurationService<StaticFilesServerConfiguration> configuration,
|
||||
IShardFileInventoryReporter inventoryReporter)
|
||||
{
|
||||
_metrics = metrics;
|
||||
_logger = logger;
|
||||
_configuration = configuration;
|
||||
_inventoryReporter = inventoryReporter;
|
||||
_cacheDir = _configuration.GetValue<string>(nameof(StaticFilesServerConfiguration.CacheDirectory));
|
||||
}
|
||||
|
||||
@@ -99,6 +102,7 @@ public class ShardFileCleanupService : IHostedService
|
||||
_metrics.DecGauge(MetricsAPI.GaugeFilesTotalSize, oldestFile.Length);
|
||||
_metrics.DecGauge(MetricsAPI.GaugeFilesTotal);
|
||||
_logger.LogInformation("Deleting {oldestFile} with size {size}MiB", oldestFile.FullName, ByteSize.FromBytes(oldestFile.Length).MebiBytes);
|
||||
ReportRemoval(oldestFile.Name);
|
||||
oldestFile.Delete();
|
||||
}
|
||||
}
|
||||
@@ -135,6 +139,7 @@ public class ShardFileCleanupService : IHostedService
|
||||
_metrics.DecGauge(MetricsAPI.GaugeFilesTotalSize, file.Length);
|
||||
_metrics.DecGauge(MetricsAPI.GaugeFilesTotal);
|
||||
_logger.LogInformation("File outdated: {fileName}, {fileSize}MiB", file.Name, ByteSize.FromBytes(file.Length).MebiBytes);
|
||||
ReportRemoval(file.Name);
|
||||
file.Delete();
|
||||
}
|
||||
else if (forcedDeletionAfterHours > 0 && file.LastWriteTime < prevTimeForcedDeletion)
|
||||
@@ -142,6 +147,7 @@ public class ShardFileCleanupService : IHostedService
|
||||
_metrics.DecGauge(MetricsAPI.GaugeFilesTotalSize, file.Length);
|
||||
_metrics.DecGauge(MetricsAPI.GaugeFilesTotal);
|
||||
_logger.LogInformation("File forcefully deleted: {fileName}, {fileSize}MiB", file.Name, ByteSize.FromBytes(file.Length).MebiBytes);
|
||||
ReportRemoval(file.Name);
|
||||
file.Delete();
|
||||
}
|
||||
else if (file.Length == 0 && !string.Equals(file.Extension, ".dl", StringComparison.OrdinalIgnoreCase))
|
||||
@@ -149,6 +155,7 @@ public class ShardFileCleanupService : IHostedService
|
||||
_metrics.DecGauge(MetricsAPI.GaugeFilesTotalSize, file.Length);
|
||||
_metrics.DecGauge(MetricsAPI.GaugeFilesTotal);
|
||||
_logger.LogInformation("File with size 0 deleted: {filename}", file.Name);
|
||||
ReportRemoval(file.Name);
|
||||
file.Delete();
|
||||
}
|
||||
|
||||
@@ -160,4 +167,12 @@ public class ShardFileCleanupService : IHostedService
|
||||
_logger.LogWarning(ex, "Error during file cleanup of old files");
|
||||
}
|
||||
}
|
||||
|
||||
private void ReportRemoval(string fileName)
|
||||
{
|
||||
if (fileName.Length == 40 && fileName.All(char.IsAsciiLetterOrDigit))
|
||||
{
|
||||
_inventoryReporter.ReportRemoved(fileName);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -0,0 +1,282 @@
|
||||
using LightlessSync.API.Routes;
|
||||
using LightlessSyncShared.Models;
|
||||
using LightlessSyncShared.Services;
|
||||
using LightlessSyncShared.Utils;
|
||||
using LightlessSyncShared.Utils.Configuration;
|
||||
using System.Net.Http.Json;
|
||||
using System.Linq;
|
||||
|
||||
namespace LightlessSyncStaticFilesServer.Services;
|
||||
|
||||
public interface IShardFileInventoryReporter
|
||||
{
|
||||
void ReportAdded(string hash);
|
||||
void ReportRemoved(string hash);
|
||||
}
|
||||
|
||||
public sealed class NullShardFileInventoryReporter : IShardFileInventoryReporter
|
||||
{
|
||||
public void ReportAdded(string hash) { }
|
||||
public void ReportRemoved(string hash) { }
|
||||
}
|
||||
|
||||
public sealed class ShardFileInventoryReporter : IHostedService, IShardFileInventoryReporter
|
||||
{
|
||||
private static readonly TimeSpan ResyncInterval = TimeSpan.FromMinutes(30);
|
||||
private static readonly TimeSpan RetryDelay = TimeSpan.FromSeconds(10);
|
||||
private static readonly TimeSpan BatchDelay = TimeSpan.FromSeconds(2);
|
||||
|
||||
private readonly IConfigurationService<StaticFilesServerConfiguration> _configurationService;
|
||||
private readonly ILogger<ShardFileInventoryReporter> _logger;
|
||||
private readonly HttpClient _httpClient = new();
|
||||
private readonly string _cacheDir;
|
||||
private readonly object _pendingLock = new();
|
||||
private readonly SemaphoreSlim _signal = new(0);
|
||||
|
||||
private HashSet<string> _pendingAdds = new(StringComparer.OrdinalIgnoreCase);
|
||||
private HashSet<string> _pendingRemoves = new(StringComparer.OrdinalIgnoreCase);
|
||||
private CancellationTokenSource? _cts;
|
||||
private Task? _processTask;
|
||||
private Task? _resyncTask;
|
||||
private long _sequence;
|
||||
private bool _resyncRequested;
|
||||
|
||||
public ShardFileInventoryReporter(
|
||||
IConfigurationService<StaticFilesServerConfiguration> configurationService,
|
||||
ILogger<ShardFileInventoryReporter> logger,
|
||||
ServerTokenGenerator serverTokenGenerator)
|
||||
{
|
||||
_configurationService = configurationService;
|
||||
_logger = logger;
|
||||
_cacheDir = configurationService.GetValue<string>(nameof(StaticFilesServerConfiguration.CacheDirectory));
|
||||
_httpClient.DefaultRequestHeaders.Authorization =
|
||||
new System.Net.Http.Headers.AuthenticationHeaderValue("Bearer", serverTokenGenerator.Token);
|
||||
}
|
||||
|
||||
public Task StartAsync(CancellationToken cancellationToken)
|
||||
{
|
||||
_cts = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken);
|
||||
_resyncRequested = true;
|
||||
_signal.Release();
|
||||
|
||||
_processTask = Task.Run(() => ProcessUpdatesAsync(_cts.Token), _cts.Token);
|
||||
_resyncTask = Task.Run(() => ResyncLoopAsync(_cts.Token), _cts.Token);
|
||||
return Task.CompletedTask;
|
||||
}
|
||||
|
||||
public async Task StopAsync(CancellationToken cancellationToken)
|
||||
{
|
||||
if (_cts == null)
|
||||
return;
|
||||
|
||||
_cts.Cancel();
|
||||
|
||||
try
|
||||
{
|
||||
if (_processTask != null) await _processTask.ConfigureAwait(false);
|
||||
}
|
||||
catch
|
||||
{
|
||||
// ignore
|
||||
}
|
||||
|
||||
try
|
||||
{
|
||||
if (_resyncTask != null) await _resyncTask.ConfigureAwait(false);
|
||||
}
|
||||
catch
|
||||
{
|
||||
// ignore
|
||||
}
|
||||
|
||||
_httpClient.Dispose();
|
||||
}
|
||||
|
||||
public void ReportAdded(string hash)
|
||||
{
|
||||
if (!IsValidHash(hash))
|
||||
return;
|
||||
|
||||
lock (_pendingLock)
|
||||
{
|
||||
_pendingAdds.Add(hash);
|
||||
_pendingRemoves.Remove(hash);
|
||||
}
|
||||
|
||||
_signal.Release();
|
||||
}
|
||||
|
||||
public void ReportRemoved(string hash)
|
||||
{
|
||||
if (!IsValidHash(hash))
|
||||
return;
|
||||
|
||||
lock (_pendingLock)
|
||||
{
|
||||
_pendingRemoves.Add(hash);
|
||||
_pendingAdds.Remove(hash);
|
||||
}
|
||||
|
||||
_signal.Release();
|
||||
}
|
||||
|
||||
private async Task ResyncLoopAsync(CancellationToken ct)
|
||||
{
|
||||
while (!ct.IsCancellationRequested)
|
||||
{
|
||||
try
|
||||
{
|
||||
await Task.Delay(ResyncInterval, ct).ConfigureAwait(false);
|
||||
}
|
||||
catch (OperationCanceledException)
|
||||
{
|
||||
break;
|
||||
}
|
||||
|
||||
_resyncRequested = true;
|
||||
_signal.Release();
|
||||
}
|
||||
}
|
||||
|
||||
private async Task ProcessUpdatesAsync(CancellationToken ct)
|
||||
{
|
||||
while (!ct.IsCancellationRequested)
|
||||
{
|
||||
try
|
||||
{
|
||||
await _signal.WaitAsync(BatchDelay, ct).ConfigureAwait(false);
|
||||
}
|
||||
catch (OperationCanceledException)
|
||||
{
|
||||
break;
|
||||
}
|
||||
|
||||
while (!ct.IsCancellationRequested)
|
||||
{
|
||||
ShardFileInventoryUpdateDto? update = null;
|
||||
|
||||
if (_resyncRequested)
|
||||
{
|
||||
var snapshot = BuildSnapshot();
|
||||
update = new ShardFileInventoryUpdateDto
|
||||
{
|
||||
Sequence = Interlocked.Increment(ref _sequence),
|
||||
IsFullSnapshot = true,
|
||||
Added = snapshot
|
||||
};
|
||||
}
|
||||
else
|
||||
{
|
||||
HashSet<string> adds;
|
||||
HashSet<string> removes;
|
||||
|
||||
lock (_pendingLock)
|
||||
{
|
||||
if (_pendingAdds.Count == 0 && _pendingRemoves.Count == 0)
|
||||
break;
|
||||
|
||||
adds = _pendingAdds;
|
||||
removes = _pendingRemoves;
|
||||
_pendingAdds = new HashSet<string>(StringComparer.OrdinalIgnoreCase);
|
||||
_pendingRemoves = new HashSet<string>(StringComparer.OrdinalIgnoreCase);
|
||||
}
|
||||
|
||||
update = new ShardFileInventoryUpdateDto
|
||||
{
|
||||
Sequence = Interlocked.Increment(ref _sequence),
|
||||
Added = adds.ToList(),
|
||||
Removed = removes.ToList()
|
||||
};
|
||||
}
|
||||
|
||||
if (update == null)
|
||||
break;
|
||||
|
||||
await SendUpdateWithRetryAsync(update, ct).ConfigureAwait(false);
|
||||
|
||||
if (update.IsFullSnapshot)
|
||||
{
|
||||
lock (_pendingLock)
|
||||
{
|
||||
_pendingAdds.Clear();
|
||||
_pendingRemoves.Clear();
|
||||
_resyncRequested = false;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private async Task SendUpdateWithRetryAsync(ShardFileInventoryUpdateDto update, CancellationToken ct)
|
||||
{
|
||||
while (!ct.IsCancellationRequested)
|
||||
{
|
||||
try
|
||||
{
|
||||
await SendUpdateAsync(update, ct).ConfigureAwait(false);
|
||||
return;
|
||||
}
|
||||
catch (OperationCanceledException) when (ct.IsCancellationRequested)
|
||||
{
|
||||
throw;
|
||||
}
|
||||
catch (Exception ex)
|
||||
{
|
||||
_logger.LogWarning(ex, "Failed to send shard file inventory update (seq {seq})", update.Sequence);
|
||||
try
|
||||
{
|
||||
await Task.Delay(RetryDelay, ct).ConfigureAwait(false);
|
||||
}
|
||||
catch (OperationCanceledException)
|
||||
{
|
||||
throw;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private async Task SendUpdateAsync(ShardFileInventoryUpdateDto update, CancellationToken ct)
|
||||
{
|
||||
var mainServer = _configurationService.GetValue<Uri>(nameof(StaticFilesServerConfiguration.MainFileServerAddress));
|
||||
if (mainServer == null)
|
||||
throw new InvalidOperationException("Main server address is not configured.");
|
||||
|
||||
using var response = await _httpClient.PostAsJsonAsync(
|
||||
LightlessFiles.MainShardFilesFullPath(mainServer),
|
||||
update,
|
||||
ct).ConfigureAwait(false);
|
||||
|
||||
response.EnsureSuccessStatusCode();
|
||||
|
||||
var ack = await response.Content.ReadFromJsonAsync<ShardFileInventoryUpdateAckDto>(cancellationToken: ct)
|
||||
.ConfigureAwait(false);
|
||||
|
||||
if (ack == null || ack.AppliedSequence < update.Sequence)
|
||||
throw new InvalidOperationException($"Main server did not apply update {update.Sequence}.");
|
||||
}
|
||||
|
||||
private List<string> BuildSnapshot()
|
||||
{
|
||||
var hashes = new List<string>();
|
||||
|
||||
if (string.IsNullOrWhiteSpace(_cacheDir) || !Directory.Exists(_cacheDir))
|
||||
return hashes;
|
||||
|
||||
foreach (var file in Directory.EnumerateFiles(_cacheDir, "*", SearchOption.AllDirectories))
|
||||
{
|
||||
var name = Path.GetFileName(file);
|
||||
if (name.EndsWith(".dl", StringComparison.OrdinalIgnoreCase))
|
||||
continue;
|
||||
|
||||
if (IsValidHash(name))
|
||||
hashes.Add(name);
|
||||
}
|
||||
|
||||
return hashes;
|
||||
}
|
||||
|
||||
private static bool IsValidHash(string hash)
|
||||
{
|
||||
return hash.Length == 40 && hash.All(char.IsAsciiLetterOrDigit);
|
||||
}
|
||||
}
|
||||
@@ -97,6 +97,7 @@ public class Startup
|
||||
// specific services
|
||||
if (_isMain)
|
||||
{
|
||||
services.AddSingleton<IShardFileInventoryReporter, NullShardFileInventoryReporter>();
|
||||
services.AddSingleton<IClientReadyMessageService, MainClientReadyMessageService>();
|
||||
services.AddHostedService<MainFileCleanupService>();
|
||||
services.AddSingleton<IConfigurationService<StaticFilesServerConfiguration>, LightlessConfigurationServiceServer<StaticFilesServerConfiguration>>();
|
||||
@@ -184,6 +185,9 @@ public class Startup
|
||||
}
|
||||
else
|
||||
{
|
||||
services.AddSingleton<ShardFileInventoryReporter>();
|
||||
services.AddSingleton<IShardFileInventoryReporter>(sp => sp.GetRequiredService<ShardFileInventoryReporter>());
|
||||
services.AddHostedService(sp => sp.GetRequiredService<ShardFileInventoryReporter>());
|
||||
services.AddSingleton<ShardRegistrationService>();
|
||||
services.AddHostedService(s => s.GetRequiredService<ShardRegistrationService>());
|
||||
services.AddSingleton<IClientReadyMessageService, ShardClientReadyMessageService>();
|
||||
|
||||
Reference in New Issue
Block a user