From 0eea339e4129a7220dc764e58ee755462d3ed7ab Mon Sep 17 00:00:00 2001 From: Abelfreyja <96177659+Abelfreyja@users.noreply.github.com> Date: Wed, 14 Jan 2026 16:52:06 +0900 Subject: [PATCH] adds the ability for shards to sync with the main server regarding their caches, when pulling files from main server the shard can stream it to the client directly while downloading and add info for server to report to client regarding file locations across shards --- LightlessAPI | 2 +- .../Models/ShardFileInventoryUpdateDto.cs | 14 + .../Controllers/MainController.cs | 16 + .../Controllers/ServerFilesController.cs | 201 ++++++++++++- .../Controllers/ShardServerFilesController.cs | 19 +- .../Services/CDNDownloadsService.cs | 19 +- .../Services/CachedFileProvider.cs | 268 ++++++++++++++++- .../MainServerShardRegistrationService.cs | 100 +++++++ .../Services/ShardFileCleanupService.cs | 17 +- .../Services/ShardFileInventoryReporter.cs | 282 ++++++++++++++++++ .../LightlessSyncStaticFilesServer/Startup.cs | 4 + 11 files changed, 915 insertions(+), 27 deletions(-) create mode 100644 LightlessSyncServer/LightlessSyncShared/Models/ShardFileInventoryUpdateDto.cs create mode 100644 LightlessSyncServer/LightlessSyncStaticFilesServer/Services/ShardFileInventoryReporter.cs diff --git a/LightlessAPI b/LightlessAPI index 4ecd537..89bcc24 160000 --- a/LightlessAPI +++ b/LightlessAPI @@ -1 +1 @@ -Subproject commit 4ecd5375e63082f44b841bcba38d5dd3f4a2a79b +Subproject commit 89bcc242cf7c5a65f3b0addd5347c7eddecbdae1 diff --git a/LightlessSyncServer/LightlessSyncShared/Models/ShardFileInventoryUpdateDto.cs b/LightlessSyncServer/LightlessSyncShared/Models/ShardFileInventoryUpdateDto.cs new file mode 100644 index 0000000..4429e68 --- /dev/null +++ b/LightlessSyncServer/LightlessSyncShared/Models/ShardFileInventoryUpdateDto.cs @@ -0,0 +1,14 @@ +namespace LightlessSyncShared.Models; + +public sealed record ShardFileInventoryUpdateDto +{ + public long Sequence { get; init; } + public bool IsFullSnapshot { get; init; } + public List Added { get; init; } = new(); + public List Removed { get; init; } = new(); +} + +public sealed record ShardFileInventoryUpdateAckDto +{ + public long AppliedSequence { get; init; } +} diff --git a/LightlessSyncServer/LightlessSyncStaticFilesServer/Controllers/MainController.cs b/LightlessSyncServer/LightlessSyncStaticFilesServer/Controllers/MainController.cs index 2c8a2be..7bd4719 100644 --- a/LightlessSyncServer/LightlessSyncStaticFilesServer/Controllers/MainController.cs +++ b/LightlessSyncServer/LightlessSyncStaticFilesServer/Controllers/MainController.cs @@ -1,6 +1,7 @@ using LightlessSync.API.Routes; using LightlessSyncShared.Utils.Configuration; using LightlessSyncStaticFilesServer.Services; +using LightlessSyncShared.Models; using Microsoft.AspNetCore.Authorization; using Microsoft.AspNetCore.Mvc; @@ -63,4 +64,19 @@ public class MainController : ControllerBase return BadRequest(); } } + + [HttpPost(LightlessFiles.Main_ShardFiles)] + public IActionResult ShardFilesUpdate([FromBody] ShardFileInventoryUpdateDto update) + { + try + { + var applied = _shardRegistrationService.ApplyFileInventoryUpdate(LightlessUser, update); + return Ok(new ShardFileInventoryUpdateAckDto { AppliedSequence = applied }); + } + catch (Exception ex) + { + _logger.LogWarning(ex, "Shard file inventory update failed: {shard}", LightlessUser); + return BadRequest(); + } + } } \ No newline at end of file diff --git a/LightlessSyncServer/LightlessSyncStaticFilesServer/Controllers/ServerFilesController.cs b/LightlessSyncServer/LightlessSyncStaticFilesServer/Controllers/ServerFilesController.cs index d0b5d95..d5af8e9 100644 --- a/LightlessSyncServer/LightlessSyncStaticFilesServer/Controllers/ServerFilesController.cs +++ b/LightlessSyncServer/LightlessSyncStaticFilesServer/Controllers/ServerFilesController.cs @@ -82,7 +82,9 @@ public class ServerFilesController : ControllerBase } [HttpGet(LightlessFiles.ServerFiles_GetSizes)] - public async Task FilesGetSizes([FromBody] List hashes) + public async Task FilesGetSizes( + [FromBody] List hashes, + [FromQuery(Name = "avoidHost")] List? avoidHosts = null) { using var dbContext = await _lightlessDbContext.CreateDbContextAsync(); var forbiddenFiles = await dbContext.ForbiddenUploadEntries. @@ -94,27 +96,97 @@ public class ServerFilesController : ControllerBase .Select(k => new { k.Hash, k.Size, k.RawSize }) .ToListAsync().ConfigureAwait(false); - var allFileShards = _shardRegistrationService.GetConfigurationsByContinent(Continent); + var avoidHostSet = new HashSet(StringComparer.OrdinalIgnoreCase); + if (avoidHosts != null) + { + foreach (var host in avoidHosts) + { + if (!string.IsNullOrWhiteSpace(host)) + { + avoidHostSet.Add(host); + } + } + } + + var allFileShards = _shardRegistrationService.GetShardEntriesByContinent(Continent); + var shardContexts = new List(allFileShards.Count); + foreach (var shard in allFileShards) + { + shardContexts.Add(new ShardSelectionContext( + shard.ShardName, + shard.Config, + new Regex(shard.Config.FileMatch, RegexOptions.Compiled))); + } foreach (var file in cacheFile) { var forbiddenFile = forbiddenFiles.SingleOrDefault(f => string.Equals(f.Hash, file.Hash, StringComparison.OrdinalIgnoreCase)); - Uri? baseUrl = null; + Uri? queuedBaseUrl = null; + Uri? directBaseUrl = null; + var queuedUrls = new List(); + var hasFileUrls = new List(); + var hasFileDirectUrls = new List(); + var pullThroughUrls = new List(); + var pullThroughDirectUrls = new List(); if (forbiddenFile == null) { - var matchingShards = allFileShards.Where(f => new Regex(f.FileMatch).IsMatch(file.Hash)).ToList(); + var matchingShards = shardContexts + .Where(f => f.FileMatchRegex.IsMatch(file.Hash)) + .ToList(); - var shard = matchingShards.SelectMany(g => g.RegionUris) - .OrderBy(g => Guid.NewGuid()).FirstOrDefault(); + foreach (var shardEntry in matchingShards) + { + var regionUris = shardEntry.GetRegionUris(avoidHostSet); - baseUrl = shard.Value ?? _configuration.GetValue(nameof(StaticFilesServerConfiguration.CdnFullUrl)); + if (regionUris.Count == 0) + { + continue; + } + + foreach (var uri in regionUris) + { + AddBaseUrl(queuedUrls, uri); + } + + var hasFile = !string.IsNullOrEmpty(shardEntry.ShardName) + && _shardRegistrationService.ShardHasFile(shardEntry.ShardName, file.Hash); + + var baseList = hasFile ? hasFileUrls : pullThroughUrls; + var directList = hasFile ? hasFileDirectUrls : pullThroughDirectUrls; + + foreach (var uri in regionUris) + { + AddCandidate(baseList, directList, uri, file.Hash); + } + } + + if (queuedUrls.Count == 0) + { + var fallback = _configuration.GetValue(nameof(StaticFilesServerConfiguration.CdnFullUrl)); + if (fallback != null && (avoidHostSet.Count == 0 || !IsAvoidedHost(fallback, avoidHostSet))) + { + AddBaseUrl(queuedUrls, fallback); + } + } + + if (hasFileUrls.Count == 0 && pullThroughUrls.Count == 0) + { + var fallback = _configuration.GetValue(nameof(StaticFilesServerConfiguration.CdnFullUrl)); + if (fallback != null && (avoidHostSet.Count == 0 || !IsAvoidedHost(fallback, avoidHostSet))) + { + AddCandidate(pullThroughUrls, pullThroughDirectUrls, fallback, file.Hash); + } + } + + queuedBaseUrl = SelectPreferredBase(queuedUrls); + directBaseUrl = SelectPreferredBase(hasFileUrls, pullThroughUrls); } var cdnDownloadUrl = string.Empty; if (forbiddenFile == null) { - var directUri = _cdnDownloadUrlService.TryCreateDirectDownloadUri(baseUrl, file.Hash); + var directUri = _cdnDownloadUrlService.TryCreateDirectDownloadUri(directBaseUrl, file.Hash); if (directUri != null) { cdnDownloadUrl = directUri.ToString(); @@ -128,8 +200,10 @@ public class ServerFilesController : ControllerBase IsForbidden = forbiddenFile != null, Hash = file.Hash, Size = file.Size, - Url = baseUrl?.ToString() ?? string.Empty, + Url = queuedBaseUrl?.ToString() ?? string.Empty, CDNDownloadUrl = cdnDownloadUrl, + HasFileDirectUrls = hasFileDirectUrls, + PullThroughDirectUrls = pullThroughDirectUrls, RawSize = file.RawSize }); } @@ -144,22 +218,127 @@ public class ServerFilesController : ControllerBase return Ok(JsonSerializer.Serialize(allFileShards.SelectMany(t => t.RegionUris.Select(v => v.Value.ToString())))); } + private static bool IsAvoidedHost(Uri uri, HashSet avoidHosts) + { + if (avoidHosts.Count == 0) + return false; + + var host = uri.Host; + if (!string.IsNullOrWhiteSpace(host) && avoidHosts.Contains(host)) + return true; + + var authority = uri.Authority; + if (!string.IsNullOrWhiteSpace(authority) && avoidHosts.Contains(authority)) + return true; + + var absolute = uri.ToString().TrimEnd('/'); + return avoidHosts.Contains(absolute); + } + + private sealed class ShardSelectionContext + { + private List? _cachedUris; + private List? _cachedAvoidedUris; + + public ShardSelectionContext(string shardName, ShardConfiguration config, Regex fileMatchRegex) + { + ShardName = shardName; + Config = config; + FileMatchRegex = fileMatchRegex; + } + + public string ShardName { get; } + public ShardConfiguration Config { get; } + public Regex FileMatchRegex { get; } + + public List GetRegionUris(HashSet avoidHosts) + { + if (_cachedUris == null) + { + _cachedUris = Config.RegionUris.Values.ToList(); + } + + if (avoidHosts.Count == 0) + { + return _cachedUris; + } + + _cachedAvoidedUris ??= _cachedUris.Where(u => !IsAvoidedHost(u, avoidHosts)).ToList(); + return _cachedAvoidedUris.Count > 0 ? _cachedAvoidedUris : _cachedUris; + } + } + + private void AddCandidate(List baseUrls, List directUrls, Uri baseUri, string hash) + { + var baseUrl = baseUri.ToString(); + if (baseUrls.Any(u => string.Equals(u, baseUrl, StringComparison.OrdinalIgnoreCase))) + return; + + baseUrls.Add(baseUrl); + + var direct = _cdnDownloadUrlService.TryCreateDirectDownloadUri(baseUri, hash); + directUrls.Add(direct?.ToString() ?? string.Empty); + } + + private static void AddBaseUrl(List baseUrls, Uri baseUri) + { + var baseUrl = baseUri.ToString(); + if (baseUrls.Any(u => string.Equals(u, baseUrl, StringComparison.OrdinalIgnoreCase))) + return; + + baseUrls.Add(baseUrl); + } + + private static Uri? SelectPreferredBase(List urls) + { + if (urls.Count == 0) + return null; + + var selected = urls[Random.Shared.Next(urls.Count)]; + return Uri.TryCreate(selected, UriKind.Absolute, out var uri) ? uri : null; + } + + private static Uri? SelectPreferredBase(List hasFileUrls, List pullThroughUrls) + { + var list = hasFileUrls.Count > 0 ? hasFileUrls : pullThroughUrls; + if (list.Count == 0) + return null; + + var selected = list[Random.Shared.Next(list.Count)]; + return Uri.TryCreate(selected, UriKind.Absolute, out var uri) ? uri : null; + } + [HttpGet(LightlessFiles.ServerFiles_DirectDownload + "/{hash}")] [AllowAnonymous] public async Task DownloadFileDirect(string hash, [FromQuery] long expires, [FromQuery] string signature) { - var result = await _cdnDownloadsService.GetDownloadAsync(hash, expires, signature).ConfigureAwait(false); + var result = await _cdnDownloadsService.GetDownloadAsync(hash, expires, signature, HttpContext.RequestAborted).ConfigureAwait(false); return result.Status switch { CDNDownloadsService.ResultStatus.Disabled => NotFound(), CDNDownloadsService.ResultStatus.Unauthorized => Unauthorized(), CDNDownloadsService.ResultStatus.NotFound => NotFound(), - CDNDownloadsService.ResultStatus.Success => PhysicalFile(result.File!.FullName, "application/octet-stream"), + CDNDownloadsService.ResultStatus.Success => BuildDirectDownloadResult(result), _ => NotFound() }; } + private IActionResult BuildDirectDownloadResult(CDNDownloadsService.Result result) + { + if (result.Stream != null) + { + if (result.ContentLength.HasValue) + { + Response.ContentLength = result.ContentLength.Value; + } + + return new FileStreamResult(result.Stream, "application/octet-stream"); + } + + return PhysicalFile(result.File!.FullName, "application/octet-stream"); + } + [HttpPost(LightlessFiles.ServerFiles_FilesSend)] public async Task FilesSend([FromBody] FilesSendDto filesSendDto) { diff --git a/LightlessSyncServer/LightlessSyncStaticFilesServer/Controllers/ShardServerFilesController.cs b/LightlessSyncServer/LightlessSyncStaticFilesServer/Controllers/ShardServerFilesController.cs index 819597e..29eb041 100644 --- a/LightlessSyncServer/LightlessSyncStaticFilesServer/Controllers/ShardServerFilesController.cs +++ b/LightlessSyncServer/LightlessSyncStaticFilesServer/Controllers/ShardServerFilesController.cs @@ -20,15 +20,30 @@ public class ShardServerFilesController : ControllerBase [AllowAnonymous] public async Task DownloadFileDirect(string hash, [FromQuery] long expires, [FromQuery] string signature) { - var result = await _cdnDownloadsService.GetDownloadAsync(hash, expires, signature).ConfigureAwait(false); + var result = await _cdnDownloadsService.GetDownloadAsync(hash, expires, signature, HttpContext.RequestAborted).ConfigureAwait(false); return result.Status switch { CDNDownloadsService.ResultStatus.Disabled => NotFound(), CDNDownloadsService.ResultStatus.Unauthorized => Unauthorized(), CDNDownloadsService.ResultStatus.NotFound => NotFound(), - CDNDownloadsService.ResultStatus.Success => PhysicalFile(result.File!.FullName, "application/octet-stream"), + CDNDownloadsService.ResultStatus.Success => BuildDirectDownloadResult(result), _ => NotFound() }; } + + private IActionResult BuildDirectDownloadResult(CDNDownloadsService.Result result) + { + if (result.Stream != null) + { + if (result.ContentLength.HasValue) + { + Response.ContentLength = result.ContentLength.Value; + } + + return new FileStreamResult(result.Stream, "application/octet-stream"); + } + + return PhysicalFile(result.File!.FullName, "application/octet-stream"); + } } diff --git a/LightlessSyncServer/LightlessSyncStaticFilesServer/Services/CDNDownloadsService.cs b/LightlessSyncServer/LightlessSyncStaticFilesServer/Services/CDNDownloadsService.cs index 3cbd661..5495a9a 100644 --- a/LightlessSyncServer/LightlessSyncStaticFilesServer/Services/CDNDownloadsService.cs +++ b/LightlessSyncServer/LightlessSyncStaticFilesServer/Services/CDNDownloadsService.cs @@ -1,4 +1,5 @@ using System.IO; +using System.Threading; using System.Threading.Tasks; namespace LightlessSyncStaticFilesServer.Services; @@ -13,7 +14,7 @@ public class CDNDownloadsService Success } - public readonly record struct Result(ResultStatus Status, FileInfo? File); + public readonly record struct Result(ResultStatus Status, FileInfo? File, Stream? Stream, long? ContentLength); private readonly CDNDownloadUrlService _cdnDownloadUrlService; private readonly CachedFileProvider _cachedFileProvider; @@ -26,31 +27,31 @@ public class CDNDownloadsService public bool DownloadsEnabled => _cdnDownloadUrlService.DirectDownloadsEnabled; - public async Task GetDownloadAsync(string hash, long expiresUnixSeconds, string signature) + public async Task GetDownloadAsync(string hash, long expiresUnixSeconds, string signature, CancellationToken ct) { if (!_cdnDownloadUrlService.DirectDownloadsEnabled) { - return new Result(ResultStatus.Disabled, null); + return new Result(ResultStatus.Disabled, null, null, null); } if (string.IsNullOrEmpty(signature) || string.IsNullOrEmpty(hash)) { - return new Result(ResultStatus.Unauthorized, null); + return new Result(ResultStatus.Unauthorized, null, null, null); } hash = hash.ToUpperInvariant(); if (!_cdnDownloadUrlService.TryValidateSignature(hash, expiresUnixSeconds, signature)) { - return new Result(ResultStatus.Unauthorized, null); + return new Result(ResultStatus.Unauthorized, null, null, null); } - var fileInfo = await _cachedFileProvider.DownloadAndGetLocalFileInfo(hash).ConfigureAwait(false); - if (fileInfo == null) + var fileResult = await _cachedFileProvider.GetFileStreamForDirectDownloadAsync(hash, ct).ConfigureAwait(false); + if (fileResult == null) { - return new Result(ResultStatus.NotFound, null); + return new Result(ResultStatus.NotFound, null, null, null); } - return new Result(ResultStatus.Success, fileInfo); + return new Result(ResultStatus.Success, fileResult.Value.File, fileResult.Value.Stream, fileResult.Value.ContentLength); } } diff --git a/LightlessSyncServer/LightlessSyncStaticFilesServer/Services/CachedFileProvider.cs b/LightlessSyncServer/LightlessSyncStaticFilesServer/Services/CachedFileProvider.cs index fbc4ce6..2d0b320 100644 --- a/LightlessSyncServer/LightlessSyncStaticFilesServer/Services/CachedFileProvider.cs +++ b/LightlessSyncServer/LightlessSyncStaticFilesServer/Services/CachedFileProvider.cs @@ -16,6 +16,7 @@ public sealed class CachedFileProvider : IDisposable private readonly FileStatisticsService _fileStatisticsService; private readonly LightlessMetrics _metrics; private readonly ServerTokenGenerator _generator; + private readonly IShardFileInventoryReporter _inventoryReporter; private readonly Uri _remoteCacheSourceUri; private readonly string _hotStoragePath; private readonly ConcurrentDictionary _currentTransfers = new(StringComparer.Ordinal); @@ -27,13 +28,15 @@ public sealed class CachedFileProvider : IDisposable private bool _isDistributionServer; public CachedFileProvider(IConfigurationService configuration, ILogger logger, - FileStatisticsService fileStatisticsService, LightlessMetrics metrics, ServerTokenGenerator generator) + FileStatisticsService fileStatisticsService, LightlessMetrics metrics, ServerTokenGenerator generator, + IShardFileInventoryReporter inventoryReporter) { _configuration = configuration; _logger = logger; _fileStatisticsService = fileStatisticsService; _metrics = metrics; _generator = generator; + _inventoryReporter = inventoryReporter; _remoteCacheSourceUri = configuration.GetValueOrDefault(nameof(StaticFilesServerConfiguration.DistributionFileServerAddress), null); _isDistributionServer = configuration.GetValueOrDefault(nameof(StaticFilesServerConfiguration.IsDistributionNode), false); _hotStoragePath = configuration.GetValue(nameof(StaticFilesServerConfiguration.CacheDirectory)); @@ -97,10 +100,11 @@ public sealed class CachedFileProvider : IDisposable _metrics.IncGauge(MetricsAPI.GaugeFilesTotal); _metrics.IncGauge(MetricsAPI.GaugeFilesTotalSize, FilePathUtil.GetFileInfoForHash(_hotStoragePath, hash).Length); + _inventoryReporter.ReportAdded(hash); response.Dispose(); } - private bool TryCopyFromColdStorage(string hash, string destinationFilePath) + private bool TryCopyFromColdStorage(string hash, string destinationFilePath) { if (!_configuration.GetValueOrDefault(nameof(StaticFilesServerConfiguration.UseColdStorage), false)) return false; @@ -124,6 +128,7 @@ public sealed class CachedFileProvider : IDisposable _metrics.IncGauge(MetricsAPI.GaugeFilesTotal); _metrics.IncGauge(MetricsAPI.GaugeFilesTotalSize, new FileInfo(destinationFilePath).Length); + _inventoryReporter.ReportAdded(hash); return true; } catch (Exception ex) @@ -134,6 +139,93 @@ public sealed class CachedFileProvider : IDisposable return false; } + public async Task GetFileStreamForDirectDownloadAsync(string hash, CancellationToken ct) + { + var destinationFilePath = FilePathUtil.GetFilePath(_hotStoragePath, hash); + + var existing = GetLocalFilePath(hash); + if (existing != null) + { + return new CachedFileStreamResult(existing, null, existing.Length); + } + + if (TryCopyFromColdStorage(hash, destinationFilePath)) + { + var coldFile = GetLocalFilePath(hash); + if (coldFile != null) + { + return new CachedFileStreamResult(coldFile, null, coldFile.Length); + } + } + + if (_remoteCacheSourceUri == null) + { + return null; + } + + TaskCompletionSource? completion = null; + + await _downloadSemaphore.WaitAsync(ct).ConfigureAwait(false); + try + { + if (_currentTransfers.TryGetValue(hash, out var downloadTask) + && !(downloadTask?.IsCompleted ?? true)) + { + completion = null; + } + else + { + completion = new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously); + _currentTransfers[hash] = completion.Task; + _metrics.IncGauge(MetricsAPI.GaugeFilesDownloadingFromCache); + } + } + finally + { + _downloadSemaphore.Release(); + } + + if (completion == null) + { + var waited = await DownloadAndGetLocalFileInfo(hash).ConfigureAwait(false); + if (waited == null) return null; + return new CachedFileStreamResult(waited, null, waited.Length); + } + + var downloadUrl = LightlessFiles.DistributionGetFullPath(_remoteCacheSourceUri, hash); + _logger.LogInformation("Did not find {hash}, streaming from {server}", hash, downloadUrl); + + using var requestMessage = new HttpRequestMessage(HttpMethod.Get, downloadUrl); + requestMessage.Headers.Authorization = new AuthenticationHeaderValue("Bearer", _generator.Token); + + HttpResponseMessage response; + try + { + response = await _httpClient.SendAsync(requestMessage, HttpCompletionOption.ResponseHeadersRead, ct).ConfigureAwait(false); + response.EnsureSuccessStatusCode(); + } + catch (Exception ex) + { + _logger.LogWarning(ex, "Failed to stream {url}", downloadUrl); + FinalizeStreamingDownload(hash, null, destinationFilePath, completion, false, 0); + return null; + } + + var tempFileName = destinationFilePath + ".dl"; + var fileStream = new FileStream(tempFileName, FileMode.Create, FileAccess.Write, FileShare.Read, bufferSize: 64 * 1024, useAsync: true); + var sourceStream = await response.Content.ReadAsStreamAsync(ct).ConfigureAwait(false); + + var stream = new StreamingCacheWriteStream( + sourceStream, + fileStream, + response, + bytesWritten => FinalizeStreamingDownload(hash, tempFileName, destinationFilePath, completion, true, bytesWritten), + bytesWritten => FinalizeStreamingDownload(hash, tempFileName, destinationFilePath, completion, false, bytesWritten), + _logger); + + return new CachedFileStreamResult(null, stream, response.Content.Headers.ContentLength); + } + public async Task DownloadFileWhenRequired(string hash) { var fi = FilePathUtil.GetFileInfoForHash(_hotStoragePath, hash); @@ -219,4 +311,174 @@ public sealed class CachedFileProvider : IDisposable { return hashes.Exists(_currentTransfers.Keys.Contains); } -} \ No newline at end of file + + private void FinalizeStreamingDownload(string hash, string? tempFileName, string destinationFilePath, + TaskCompletionSource completion, bool success, long bytesWritten) + { + try + { + if (success) + { + if (!string.IsNullOrEmpty(tempFileName)) + { + File.Move(tempFileName, destinationFilePath, true); + } + + var fi = FilePathUtil.GetFileInfoForHash(_hotStoragePath, hash); + if (fi != null) + { + _metrics.IncGauge(MetricsAPI.GaugeFilesTotal); + _metrics.IncGauge(MetricsAPI.GaugeFilesTotalSize, fi.Length); + _fileStatisticsService.LogFile(hash, fi.Length); + _inventoryReporter.ReportAdded(hash); + } + } + else if (!string.IsNullOrEmpty(tempFileName)) + { + try { File.Delete(tempFileName); } catch { /* ignore */ } + } + } + catch (Exception ex) + { + _logger.LogWarning(ex, "Failed to finalize streaming download for {hash} after {bytes} bytes", hash, bytesWritten); + } + finally + { + _metrics.DecGauge(MetricsAPI.GaugeFilesDownloadingFromCache); + _currentTransfers.Remove(hash, out _); + completion.TrySetResult(success); + } + } + + private sealed class StreamingCacheWriteStream : Stream + { + private readonly Stream _source; + private readonly FileStream _cacheStream; + private readonly HttpResponseMessage _response; + private readonly Action _onSuccess; + private readonly Action _onFailure; + private readonly ILogger _logger; + private long _bytesWritten; + private int _completed; + + public StreamingCacheWriteStream(Stream source, FileStream cacheStream, HttpResponseMessage response, + Action onSuccess, Action onFailure, ILogger logger) + { + _source = source; + _cacheStream = cacheStream; + _response = response; + _onSuccess = onSuccess; + _onFailure = onFailure; + _logger = logger; + } + + public override bool CanRead => true; + public override bool CanSeek => false; + public override bool CanWrite => false; + public override long Length => _bytesWritten; + public override long Position + { + get => _bytesWritten; + set => throw new NotSupportedException(); + } + + public override int Read(byte[] buffer, int offset, int count) + { + if (Volatile.Read(ref _completed) != 0) + { + return 0; + } + + try + { + int bytesRead = _source.Read(buffer, offset, count); + if (bytesRead > 0) + { + _cacheStream.Write(buffer, offset, bytesRead); + _bytesWritten += bytesRead; + return bytesRead; + } + + Complete(true); + return 0; + } + catch (Exception ex) + { + _logger.LogWarning(ex, "Streaming download failed while reading"); + Complete(false); + throw; + } + } + + public override async ValueTask ReadAsync(Memory buffer, CancellationToken cancellationToken = default) + { + if (Volatile.Read(ref _completed) != 0) + { + return 0; + } + + try + { + int bytesRead = await _source.ReadAsync(buffer, cancellationToken).ConfigureAwait(false); + if (bytesRead > 0) + { + await _cacheStream.WriteAsync(buffer.Slice(0, bytesRead), cancellationToken).ConfigureAwait(false); + _bytesWritten += bytesRead; + return bytesRead; + } + + Complete(true); + return 0; + } + catch (Exception ex) + { + _logger.LogWarning(ex, "Streaming download failed while reading"); + Complete(false); + throw; + } + } + + public override void Flush() + { + // no-op + } + + protected override void Dispose(bool disposing) + { + if (disposing) + { + Complete(false); + } + + base.Dispose(disposing); + } + + private void Complete(bool success) + { + if (Interlocked.Exchange(ref _completed, 1) != 0) + { + return; + } + + try { _cacheStream.Flush(); } catch { /* ignore */ } + try { _cacheStream.Dispose(); } catch { /* ignore */ } + try { _source.Dispose(); } catch { /* ignore */ } + try { _response.Dispose(); } catch { /* ignore */ } + + if (success) + { + _onSuccess(_bytesWritten); + } + else + { + _onFailure(_bytesWritten); + } + } + + public override long Seek(long offset, SeekOrigin origin) => throw new NotSupportedException(); + public override void SetLength(long value) => throw new NotSupportedException(); + public override void Write(byte[] buffer, int offset, int count) => throw new NotSupportedException(); + } +} + +public readonly record struct CachedFileStreamResult(FileInfo? File, Stream? Stream, long? ContentLength); \ No newline at end of file diff --git a/LightlessSyncServer/LightlessSyncStaticFilesServer/Services/MainServerShardRegistrationService.cs b/LightlessSyncServer/LightlessSyncStaticFilesServer/Services/MainServerShardRegistrationService.cs index 7f31c29..127f491 100644 --- a/LightlessSyncServer/LightlessSyncStaticFilesServer/Services/MainServerShardRegistrationService.cs +++ b/LightlessSyncServer/LightlessSyncStaticFilesServer/Services/MainServerShardRegistrationService.cs @@ -1,5 +1,6 @@ using LightlessSyncShared.Services; using LightlessSyncShared.Utils.Configuration; +using LightlessSyncShared.Models; using System.Collections.Concurrent; using System.Collections.Frozen; @@ -11,8 +12,16 @@ public class MainServerShardRegistrationService : IHostedService private readonly IConfigurationService _configurationService; private readonly ConcurrentDictionary _shardConfigs = new(StringComparer.Ordinal); private readonly ConcurrentDictionary _shardHeartbeats = new(StringComparer.Ordinal); + private readonly ConcurrentDictionary _shardFileInventory = new(StringComparer.Ordinal); private readonly CancellationTokenSource _periodicCheckCts = new(); + private sealed class ShardFileInventory + { + public long Sequence { get; set; } + public HashSet Files { get; set; } = new(StringComparer.OrdinalIgnoreCase); + public object SyncRoot { get; } = new(); + } + public MainServerShardRegistrationService(ILogger logger, IConfigurationService configurationService) { @@ -32,6 +41,7 @@ public class MainServerShardRegistrationService : IHostedService _shardHeartbeats[shardName] = DateTime.UtcNow; _shardConfigs[shardName] = shardConfiguration; + _shardFileInventory.TryAdd(shardName, new ShardFileInventory()); } public void UnregisterShard(string shardName) @@ -40,6 +50,7 @@ public class MainServerShardRegistrationService : IHostedService _shardHeartbeats.TryRemove(shardName, out _); _shardConfigs.TryRemove(shardName, out _); + _shardFileInventory.TryRemove(shardName, out _); } public List GetConfigurationsByContinent(string continent) @@ -56,6 +67,94 @@ public class MainServerShardRegistrationService : IHostedService } }]; } + public List<(string ShardName, ShardConfiguration Config)> GetShardEntriesByContinent(string continent) + { + var shardConfigs = _shardConfigs + .Where(v => v.Value.Continents.Contains(continent, StringComparer.OrdinalIgnoreCase)) + .Select(kvp => (kvp.Key, kvp.Value)) + .ToList(); + if (shardConfigs.Any()) return shardConfigs; + + shardConfigs = _shardConfigs + .Where(v => v.Value.Continents.Contains("*", StringComparer.OrdinalIgnoreCase)) + .Select(kvp => (kvp.Key, kvp.Value)) + .ToList(); + if (shardConfigs.Any()) return shardConfigs; + + var fallback = new ShardConfiguration() + { + Continents = ["*"], + FileMatch = ".*", + RegionUris = new(StringComparer.Ordinal) + { + { "Central", _configurationService.GetValue(nameof(StaticFilesServerConfiguration.CdnFullUrl)) } + } + }; + + return [(string.Empty, fallback)]; + } + + public long ApplyFileInventoryUpdate(string shardName, ShardFileInventoryUpdateDto update) + { + if (!_shardConfigs.ContainsKey(shardName)) + throw new InvalidOperationException("Shard not registered"); + + var inventory = _shardFileInventory.GetOrAdd(shardName, _ => new ShardFileInventory()); + lock (inventory.SyncRoot) + { + if (update.IsFullSnapshot && update.Sequence <= inventory.Sequence) + { + inventory.Files = new HashSet(update.Added ?? [], StringComparer.OrdinalIgnoreCase); + inventory.Sequence = update.Sequence; + return inventory.Sequence; + } + + if (update.Sequence <= inventory.Sequence) + { + return inventory.Sequence; + } + + if (update.IsFullSnapshot) + { + inventory.Files = new HashSet(update.Added ?? [], StringComparer.OrdinalIgnoreCase); + } + else + { + if (update.Added != null) + { + foreach (var hash in update.Added) + { + if (!string.IsNullOrWhiteSpace(hash)) + inventory.Files.Add(hash); + } + } + + if (update.Removed != null) + { + foreach (var hash in update.Removed) + { + if (!string.IsNullOrWhiteSpace(hash)) + inventory.Files.Remove(hash); + } + } + } + + inventory.Sequence = update.Sequence; + return inventory.Sequence; + } + } + + public bool ShardHasFile(string shardName, string hash) + { + if (!_shardFileInventory.TryGetValue(shardName, out var inventory)) + return false; + + lock (inventory.SyncRoot) + { + return inventory.Files.Contains(hash); + } + } + public void ShardHeartbeat(string shardName) { if (!_shardConfigs.ContainsKey(shardName)) @@ -87,6 +186,7 @@ public class MainServerShardRegistrationService : IHostedService { _shardHeartbeats.TryRemove(kvp.Key, out _); _shardConfigs.TryRemove(kvp.Key, out _); + _shardFileInventory.TryRemove(kvp.Key, out _); } } diff --git a/LightlessSyncServer/LightlessSyncStaticFilesServer/Services/ShardFileCleanupService.cs b/LightlessSyncServer/LightlessSyncStaticFilesServer/Services/ShardFileCleanupService.cs index 9615978..e6e6698 100644 --- a/LightlessSyncServer/LightlessSyncStaticFilesServer/Services/ShardFileCleanupService.cs +++ b/LightlessSyncServer/LightlessSyncStaticFilesServer/Services/ShardFileCleanupService.cs @@ -12,13 +12,16 @@ public class ShardFileCleanupService : IHostedService private readonly IConfigurationService _configuration; private readonly ILogger _logger; private readonly LightlessMetrics _metrics; + private readonly IShardFileInventoryReporter _inventoryReporter; private CancellationTokenSource _cleanupCts; - public ShardFileCleanupService(LightlessMetrics metrics, ILogger logger, IConfigurationService configuration) + public ShardFileCleanupService(LightlessMetrics metrics, ILogger logger, IConfigurationService configuration, + IShardFileInventoryReporter inventoryReporter) { _metrics = metrics; _logger = logger; _configuration = configuration; + _inventoryReporter = inventoryReporter; _cacheDir = _configuration.GetValue(nameof(StaticFilesServerConfiguration.CacheDirectory)); } @@ -99,6 +102,7 @@ public class ShardFileCleanupService : IHostedService _metrics.DecGauge(MetricsAPI.GaugeFilesTotalSize, oldestFile.Length); _metrics.DecGauge(MetricsAPI.GaugeFilesTotal); _logger.LogInformation("Deleting {oldestFile} with size {size}MiB", oldestFile.FullName, ByteSize.FromBytes(oldestFile.Length).MebiBytes); + ReportRemoval(oldestFile.Name); oldestFile.Delete(); } } @@ -135,6 +139,7 @@ public class ShardFileCleanupService : IHostedService _metrics.DecGauge(MetricsAPI.GaugeFilesTotalSize, file.Length); _metrics.DecGauge(MetricsAPI.GaugeFilesTotal); _logger.LogInformation("File outdated: {fileName}, {fileSize}MiB", file.Name, ByteSize.FromBytes(file.Length).MebiBytes); + ReportRemoval(file.Name); file.Delete(); } else if (forcedDeletionAfterHours > 0 && file.LastWriteTime < prevTimeForcedDeletion) @@ -142,6 +147,7 @@ public class ShardFileCleanupService : IHostedService _metrics.DecGauge(MetricsAPI.GaugeFilesTotalSize, file.Length); _metrics.DecGauge(MetricsAPI.GaugeFilesTotal); _logger.LogInformation("File forcefully deleted: {fileName}, {fileSize}MiB", file.Name, ByteSize.FromBytes(file.Length).MebiBytes); + ReportRemoval(file.Name); file.Delete(); } else if (file.Length == 0 && !string.Equals(file.Extension, ".dl", StringComparison.OrdinalIgnoreCase)) @@ -149,6 +155,7 @@ public class ShardFileCleanupService : IHostedService _metrics.DecGauge(MetricsAPI.GaugeFilesTotalSize, file.Length); _metrics.DecGauge(MetricsAPI.GaugeFilesTotal); _logger.LogInformation("File with size 0 deleted: {filename}", file.Name); + ReportRemoval(file.Name); file.Delete(); } @@ -160,4 +167,12 @@ public class ShardFileCleanupService : IHostedService _logger.LogWarning(ex, "Error during file cleanup of old files"); } } + + private void ReportRemoval(string fileName) + { + if (fileName.Length == 40 && fileName.All(char.IsAsciiLetterOrDigit)) + { + _inventoryReporter.ReportRemoved(fileName); + } + } } diff --git a/LightlessSyncServer/LightlessSyncStaticFilesServer/Services/ShardFileInventoryReporter.cs b/LightlessSyncServer/LightlessSyncStaticFilesServer/Services/ShardFileInventoryReporter.cs new file mode 100644 index 0000000..e6745b8 --- /dev/null +++ b/LightlessSyncServer/LightlessSyncStaticFilesServer/Services/ShardFileInventoryReporter.cs @@ -0,0 +1,282 @@ +using LightlessSync.API.Routes; +using LightlessSyncShared.Models; +using LightlessSyncShared.Services; +using LightlessSyncShared.Utils; +using LightlessSyncShared.Utils.Configuration; +using System.Net.Http.Json; +using System.Linq; + +namespace LightlessSyncStaticFilesServer.Services; + +public interface IShardFileInventoryReporter +{ + void ReportAdded(string hash); + void ReportRemoved(string hash); +} + +public sealed class NullShardFileInventoryReporter : IShardFileInventoryReporter +{ + public void ReportAdded(string hash) { } + public void ReportRemoved(string hash) { } +} + +public sealed class ShardFileInventoryReporter : IHostedService, IShardFileInventoryReporter +{ + private static readonly TimeSpan ResyncInterval = TimeSpan.FromMinutes(30); + private static readonly TimeSpan RetryDelay = TimeSpan.FromSeconds(10); + private static readonly TimeSpan BatchDelay = TimeSpan.FromSeconds(2); + + private readonly IConfigurationService _configurationService; + private readonly ILogger _logger; + private readonly HttpClient _httpClient = new(); + private readonly string _cacheDir; + private readonly object _pendingLock = new(); + private readonly SemaphoreSlim _signal = new(0); + + private HashSet _pendingAdds = new(StringComparer.OrdinalIgnoreCase); + private HashSet _pendingRemoves = new(StringComparer.OrdinalIgnoreCase); + private CancellationTokenSource? _cts; + private Task? _processTask; + private Task? _resyncTask; + private long _sequence; + private bool _resyncRequested; + + public ShardFileInventoryReporter( + IConfigurationService configurationService, + ILogger logger, + ServerTokenGenerator serverTokenGenerator) + { + _configurationService = configurationService; + _logger = logger; + _cacheDir = configurationService.GetValue(nameof(StaticFilesServerConfiguration.CacheDirectory)); + _httpClient.DefaultRequestHeaders.Authorization = + new System.Net.Http.Headers.AuthenticationHeaderValue("Bearer", serverTokenGenerator.Token); + } + + public Task StartAsync(CancellationToken cancellationToken) + { + _cts = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken); + _resyncRequested = true; + _signal.Release(); + + _processTask = Task.Run(() => ProcessUpdatesAsync(_cts.Token), _cts.Token); + _resyncTask = Task.Run(() => ResyncLoopAsync(_cts.Token), _cts.Token); + return Task.CompletedTask; + } + + public async Task StopAsync(CancellationToken cancellationToken) + { + if (_cts == null) + return; + + _cts.Cancel(); + + try + { + if (_processTask != null) await _processTask.ConfigureAwait(false); + } + catch + { + // ignore + } + + try + { + if (_resyncTask != null) await _resyncTask.ConfigureAwait(false); + } + catch + { + // ignore + } + + _httpClient.Dispose(); + } + + public void ReportAdded(string hash) + { + if (!IsValidHash(hash)) + return; + + lock (_pendingLock) + { + _pendingAdds.Add(hash); + _pendingRemoves.Remove(hash); + } + + _signal.Release(); + } + + public void ReportRemoved(string hash) + { + if (!IsValidHash(hash)) + return; + + lock (_pendingLock) + { + _pendingRemoves.Add(hash); + _pendingAdds.Remove(hash); + } + + _signal.Release(); + } + + private async Task ResyncLoopAsync(CancellationToken ct) + { + while (!ct.IsCancellationRequested) + { + try + { + await Task.Delay(ResyncInterval, ct).ConfigureAwait(false); + } + catch (OperationCanceledException) + { + break; + } + + _resyncRequested = true; + _signal.Release(); + } + } + + private async Task ProcessUpdatesAsync(CancellationToken ct) + { + while (!ct.IsCancellationRequested) + { + try + { + await _signal.WaitAsync(BatchDelay, ct).ConfigureAwait(false); + } + catch (OperationCanceledException) + { + break; + } + + while (!ct.IsCancellationRequested) + { + ShardFileInventoryUpdateDto? update = null; + + if (_resyncRequested) + { + var snapshot = BuildSnapshot(); + update = new ShardFileInventoryUpdateDto + { + Sequence = Interlocked.Increment(ref _sequence), + IsFullSnapshot = true, + Added = snapshot + }; + } + else + { + HashSet adds; + HashSet removes; + + lock (_pendingLock) + { + if (_pendingAdds.Count == 0 && _pendingRemoves.Count == 0) + break; + + adds = _pendingAdds; + removes = _pendingRemoves; + _pendingAdds = new HashSet(StringComparer.OrdinalIgnoreCase); + _pendingRemoves = new HashSet(StringComparer.OrdinalIgnoreCase); + } + + update = new ShardFileInventoryUpdateDto + { + Sequence = Interlocked.Increment(ref _sequence), + Added = adds.ToList(), + Removed = removes.ToList() + }; + } + + if (update == null) + break; + + await SendUpdateWithRetryAsync(update, ct).ConfigureAwait(false); + + if (update.IsFullSnapshot) + { + lock (_pendingLock) + { + _pendingAdds.Clear(); + _pendingRemoves.Clear(); + _resyncRequested = false; + } + } + } + } + } + + private async Task SendUpdateWithRetryAsync(ShardFileInventoryUpdateDto update, CancellationToken ct) + { + while (!ct.IsCancellationRequested) + { + try + { + await SendUpdateAsync(update, ct).ConfigureAwait(false); + return; + } + catch (OperationCanceledException) when (ct.IsCancellationRequested) + { + throw; + } + catch (Exception ex) + { + _logger.LogWarning(ex, "Failed to send shard file inventory update (seq {seq})", update.Sequence); + try + { + await Task.Delay(RetryDelay, ct).ConfigureAwait(false); + } + catch (OperationCanceledException) + { + throw; + } + } + } + } + + private async Task SendUpdateAsync(ShardFileInventoryUpdateDto update, CancellationToken ct) + { + var mainServer = _configurationService.GetValue(nameof(StaticFilesServerConfiguration.MainFileServerAddress)); + if (mainServer == null) + throw new InvalidOperationException("Main server address is not configured."); + + using var response = await _httpClient.PostAsJsonAsync( + LightlessFiles.MainShardFilesFullPath(mainServer), + update, + ct).ConfigureAwait(false); + + response.EnsureSuccessStatusCode(); + + var ack = await response.Content.ReadFromJsonAsync(cancellationToken: ct) + .ConfigureAwait(false); + + if (ack == null || ack.AppliedSequence < update.Sequence) + throw new InvalidOperationException($"Main server did not apply update {update.Sequence}."); + } + + private List BuildSnapshot() + { + var hashes = new List(); + + if (string.IsNullOrWhiteSpace(_cacheDir) || !Directory.Exists(_cacheDir)) + return hashes; + + foreach (var file in Directory.EnumerateFiles(_cacheDir, "*", SearchOption.AllDirectories)) + { + var name = Path.GetFileName(file); + if (name.EndsWith(".dl", StringComparison.OrdinalIgnoreCase)) + continue; + + if (IsValidHash(name)) + hashes.Add(name); + } + + return hashes; + } + + private static bool IsValidHash(string hash) + { + return hash.Length == 40 && hash.All(char.IsAsciiLetterOrDigit); + } +} diff --git a/LightlessSyncServer/LightlessSyncStaticFilesServer/Startup.cs b/LightlessSyncServer/LightlessSyncStaticFilesServer/Startup.cs index baa21d5..559e365 100644 --- a/LightlessSyncServer/LightlessSyncStaticFilesServer/Startup.cs +++ b/LightlessSyncServer/LightlessSyncStaticFilesServer/Startup.cs @@ -97,6 +97,7 @@ public class Startup // specific services if (_isMain) { + services.AddSingleton(); services.AddSingleton(); services.AddHostedService(); services.AddSingleton, LightlessConfigurationServiceServer>(); @@ -184,6 +185,9 @@ public class Startup } else { + services.AddSingleton(); + services.AddSingleton(sp => sp.GetRequiredService()); + services.AddHostedService(sp => sp.GetRequiredService()); services.AddSingleton(); services.AddHostedService(s => s.GetRequiredService()); services.AddSingleton();