diff --git a/LightlessAPI b/LightlessAPI index 4ecd537..89bcc24 160000 --- a/LightlessAPI +++ b/LightlessAPI @@ -1 +1 @@ -Subproject commit 4ecd5375e63082f44b841bcba38d5dd3f4a2a79b +Subproject commit 89bcc242cf7c5a65f3b0addd5347c7eddecbdae1 diff --git a/LightlessSyncServer/LightlessSyncShared/Models/ShardFileInventoryUpdateDto.cs b/LightlessSyncServer/LightlessSyncShared/Models/ShardFileInventoryUpdateDto.cs new file mode 100644 index 0000000..4429e68 --- /dev/null +++ b/LightlessSyncServer/LightlessSyncShared/Models/ShardFileInventoryUpdateDto.cs @@ -0,0 +1,14 @@ +namespace LightlessSyncShared.Models; + +public sealed record ShardFileInventoryUpdateDto +{ + public long Sequence { get; init; } + public bool IsFullSnapshot { get; init; } + public List Added { get; init; } = new(); + public List Removed { get; init; } = new(); +} + +public sealed record ShardFileInventoryUpdateAckDto +{ + public long AppliedSequence { get; init; } +} diff --git a/LightlessSyncServer/LightlessSyncStaticFilesServer/Controllers/MainController.cs b/LightlessSyncServer/LightlessSyncStaticFilesServer/Controllers/MainController.cs index 2c8a2be..7bd4719 100644 --- a/LightlessSyncServer/LightlessSyncStaticFilesServer/Controllers/MainController.cs +++ b/LightlessSyncServer/LightlessSyncStaticFilesServer/Controllers/MainController.cs @@ -1,6 +1,7 @@ using LightlessSync.API.Routes; using LightlessSyncShared.Utils.Configuration; using LightlessSyncStaticFilesServer.Services; +using LightlessSyncShared.Models; using Microsoft.AspNetCore.Authorization; using Microsoft.AspNetCore.Mvc; @@ -63,4 +64,19 @@ public class MainController : ControllerBase return BadRequest(); } } + + [HttpPost(LightlessFiles.Main_ShardFiles)] + public IActionResult ShardFilesUpdate([FromBody] ShardFileInventoryUpdateDto update) + { + try + { + var applied = _shardRegistrationService.ApplyFileInventoryUpdate(LightlessUser, update); + return Ok(new ShardFileInventoryUpdateAckDto { AppliedSequence = applied }); + } + catch (Exception ex) + { + _logger.LogWarning(ex, "Shard file inventory update failed: {shard}", LightlessUser); + return BadRequest(); + } + } } \ No newline at end of file diff --git a/LightlessSyncServer/LightlessSyncStaticFilesServer/Controllers/ServerFilesController.cs b/LightlessSyncServer/LightlessSyncStaticFilesServer/Controllers/ServerFilesController.cs index d0b5d95..d5af8e9 100644 --- a/LightlessSyncServer/LightlessSyncStaticFilesServer/Controllers/ServerFilesController.cs +++ b/LightlessSyncServer/LightlessSyncStaticFilesServer/Controllers/ServerFilesController.cs @@ -82,7 +82,9 @@ public class ServerFilesController : ControllerBase } [HttpGet(LightlessFiles.ServerFiles_GetSizes)] - public async Task FilesGetSizes([FromBody] List hashes) + public async Task FilesGetSizes( + [FromBody] List hashes, + [FromQuery(Name = "avoidHost")] List? avoidHosts = null) { using var dbContext = await _lightlessDbContext.CreateDbContextAsync(); var forbiddenFiles = await dbContext.ForbiddenUploadEntries. @@ -94,27 +96,97 @@ public class ServerFilesController : ControllerBase .Select(k => new { k.Hash, k.Size, k.RawSize }) .ToListAsync().ConfigureAwait(false); - var allFileShards = _shardRegistrationService.GetConfigurationsByContinent(Continent); + var avoidHostSet = new HashSet(StringComparer.OrdinalIgnoreCase); + if (avoidHosts != null) + { + foreach (var host in avoidHosts) + { + if (!string.IsNullOrWhiteSpace(host)) + { + avoidHostSet.Add(host); + } + } + } + + var allFileShards = _shardRegistrationService.GetShardEntriesByContinent(Continent); + var shardContexts = new List(allFileShards.Count); + foreach (var shard in allFileShards) + { + shardContexts.Add(new ShardSelectionContext( + shard.ShardName, + shard.Config, + new Regex(shard.Config.FileMatch, RegexOptions.Compiled))); + } foreach (var file in cacheFile) { var forbiddenFile = forbiddenFiles.SingleOrDefault(f => string.Equals(f.Hash, file.Hash, StringComparison.OrdinalIgnoreCase)); - Uri? baseUrl = null; + Uri? queuedBaseUrl = null; + Uri? directBaseUrl = null; + var queuedUrls = new List(); + var hasFileUrls = new List(); + var hasFileDirectUrls = new List(); + var pullThroughUrls = new List(); + var pullThroughDirectUrls = new List(); if (forbiddenFile == null) { - var matchingShards = allFileShards.Where(f => new Regex(f.FileMatch).IsMatch(file.Hash)).ToList(); + var matchingShards = shardContexts + .Where(f => f.FileMatchRegex.IsMatch(file.Hash)) + .ToList(); - var shard = matchingShards.SelectMany(g => g.RegionUris) - .OrderBy(g => Guid.NewGuid()).FirstOrDefault(); + foreach (var shardEntry in matchingShards) + { + var regionUris = shardEntry.GetRegionUris(avoidHostSet); - baseUrl = shard.Value ?? _configuration.GetValue(nameof(StaticFilesServerConfiguration.CdnFullUrl)); + if (regionUris.Count == 0) + { + continue; + } + + foreach (var uri in regionUris) + { + AddBaseUrl(queuedUrls, uri); + } + + var hasFile = !string.IsNullOrEmpty(shardEntry.ShardName) + && _shardRegistrationService.ShardHasFile(shardEntry.ShardName, file.Hash); + + var baseList = hasFile ? hasFileUrls : pullThroughUrls; + var directList = hasFile ? hasFileDirectUrls : pullThroughDirectUrls; + + foreach (var uri in regionUris) + { + AddCandidate(baseList, directList, uri, file.Hash); + } + } + + if (queuedUrls.Count == 0) + { + var fallback = _configuration.GetValue(nameof(StaticFilesServerConfiguration.CdnFullUrl)); + if (fallback != null && (avoidHostSet.Count == 0 || !IsAvoidedHost(fallback, avoidHostSet))) + { + AddBaseUrl(queuedUrls, fallback); + } + } + + if (hasFileUrls.Count == 0 && pullThroughUrls.Count == 0) + { + var fallback = _configuration.GetValue(nameof(StaticFilesServerConfiguration.CdnFullUrl)); + if (fallback != null && (avoidHostSet.Count == 0 || !IsAvoidedHost(fallback, avoidHostSet))) + { + AddCandidate(pullThroughUrls, pullThroughDirectUrls, fallback, file.Hash); + } + } + + queuedBaseUrl = SelectPreferredBase(queuedUrls); + directBaseUrl = SelectPreferredBase(hasFileUrls, pullThroughUrls); } var cdnDownloadUrl = string.Empty; if (forbiddenFile == null) { - var directUri = _cdnDownloadUrlService.TryCreateDirectDownloadUri(baseUrl, file.Hash); + var directUri = _cdnDownloadUrlService.TryCreateDirectDownloadUri(directBaseUrl, file.Hash); if (directUri != null) { cdnDownloadUrl = directUri.ToString(); @@ -128,8 +200,10 @@ public class ServerFilesController : ControllerBase IsForbidden = forbiddenFile != null, Hash = file.Hash, Size = file.Size, - Url = baseUrl?.ToString() ?? string.Empty, + Url = queuedBaseUrl?.ToString() ?? string.Empty, CDNDownloadUrl = cdnDownloadUrl, + HasFileDirectUrls = hasFileDirectUrls, + PullThroughDirectUrls = pullThroughDirectUrls, RawSize = file.RawSize }); } @@ -144,22 +218,127 @@ public class ServerFilesController : ControllerBase return Ok(JsonSerializer.Serialize(allFileShards.SelectMany(t => t.RegionUris.Select(v => v.Value.ToString())))); } + private static bool IsAvoidedHost(Uri uri, HashSet avoidHosts) + { + if (avoidHosts.Count == 0) + return false; + + var host = uri.Host; + if (!string.IsNullOrWhiteSpace(host) && avoidHosts.Contains(host)) + return true; + + var authority = uri.Authority; + if (!string.IsNullOrWhiteSpace(authority) && avoidHosts.Contains(authority)) + return true; + + var absolute = uri.ToString().TrimEnd('/'); + return avoidHosts.Contains(absolute); + } + + private sealed class ShardSelectionContext + { + private List? _cachedUris; + private List? _cachedAvoidedUris; + + public ShardSelectionContext(string shardName, ShardConfiguration config, Regex fileMatchRegex) + { + ShardName = shardName; + Config = config; + FileMatchRegex = fileMatchRegex; + } + + public string ShardName { get; } + public ShardConfiguration Config { get; } + public Regex FileMatchRegex { get; } + + public List GetRegionUris(HashSet avoidHosts) + { + if (_cachedUris == null) + { + _cachedUris = Config.RegionUris.Values.ToList(); + } + + if (avoidHosts.Count == 0) + { + return _cachedUris; + } + + _cachedAvoidedUris ??= _cachedUris.Where(u => !IsAvoidedHost(u, avoidHosts)).ToList(); + return _cachedAvoidedUris.Count > 0 ? _cachedAvoidedUris : _cachedUris; + } + } + + private void AddCandidate(List baseUrls, List directUrls, Uri baseUri, string hash) + { + var baseUrl = baseUri.ToString(); + if (baseUrls.Any(u => string.Equals(u, baseUrl, StringComparison.OrdinalIgnoreCase))) + return; + + baseUrls.Add(baseUrl); + + var direct = _cdnDownloadUrlService.TryCreateDirectDownloadUri(baseUri, hash); + directUrls.Add(direct?.ToString() ?? string.Empty); + } + + private static void AddBaseUrl(List baseUrls, Uri baseUri) + { + var baseUrl = baseUri.ToString(); + if (baseUrls.Any(u => string.Equals(u, baseUrl, StringComparison.OrdinalIgnoreCase))) + return; + + baseUrls.Add(baseUrl); + } + + private static Uri? SelectPreferredBase(List urls) + { + if (urls.Count == 0) + return null; + + var selected = urls[Random.Shared.Next(urls.Count)]; + return Uri.TryCreate(selected, UriKind.Absolute, out var uri) ? uri : null; + } + + private static Uri? SelectPreferredBase(List hasFileUrls, List pullThroughUrls) + { + var list = hasFileUrls.Count > 0 ? hasFileUrls : pullThroughUrls; + if (list.Count == 0) + return null; + + var selected = list[Random.Shared.Next(list.Count)]; + return Uri.TryCreate(selected, UriKind.Absolute, out var uri) ? uri : null; + } + [HttpGet(LightlessFiles.ServerFiles_DirectDownload + "/{hash}")] [AllowAnonymous] public async Task DownloadFileDirect(string hash, [FromQuery] long expires, [FromQuery] string signature) { - var result = await _cdnDownloadsService.GetDownloadAsync(hash, expires, signature).ConfigureAwait(false); + var result = await _cdnDownloadsService.GetDownloadAsync(hash, expires, signature, HttpContext.RequestAborted).ConfigureAwait(false); return result.Status switch { CDNDownloadsService.ResultStatus.Disabled => NotFound(), CDNDownloadsService.ResultStatus.Unauthorized => Unauthorized(), CDNDownloadsService.ResultStatus.NotFound => NotFound(), - CDNDownloadsService.ResultStatus.Success => PhysicalFile(result.File!.FullName, "application/octet-stream"), + CDNDownloadsService.ResultStatus.Success => BuildDirectDownloadResult(result), _ => NotFound() }; } + private IActionResult BuildDirectDownloadResult(CDNDownloadsService.Result result) + { + if (result.Stream != null) + { + if (result.ContentLength.HasValue) + { + Response.ContentLength = result.ContentLength.Value; + } + + return new FileStreamResult(result.Stream, "application/octet-stream"); + } + + return PhysicalFile(result.File!.FullName, "application/octet-stream"); + } + [HttpPost(LightlessFiles.ServerFiles_FilesSend)] public async Task FilesSend([FromBody] FilesSendDto filesSendDto) { diff --git a/LightlessSyncServer/LightlessSyncStaticFilesServer/Controllers/ShardServerFilesController.cs b/LightlessSyncServer/LightlessSyncStaticFilesServer/Controllers/ShardServerFilesController.cs index 819597e..29eb041 100644 --- a/LightlessSyncServer/LightlessSyncStaticFilesServer/Controllers/ShardServerFilesController.cs +++ b/LightlessSyncServer/LightlessSyncStaticFilesServer/Controllers/ShardServerFilesController.cs @@ -20,15 +20,30 @@ public class ShardServerFilesController : ControllerBase [AllowAnonymous] public async Task DownloadFileDirect(string hash, [FromQuery] long expires, [FromQuery] string signature) { - var result = await _cdnDownloadsService.GetDownloadAsync(hash, expires, signature).ConfigureAwait(false); + var result = await _cdnDownloadsService.GetDownloadAsync(hash, expires, signature, HttpContext.RequestAborted).ConfigureAwait(false); return result.Status switch { CDNDownloadsService.ResultStatus.Disabled => NotFound(), CDNDownloadsService.ResultStatus.Unauthorized => Unauthorized(), CDNDownloadsService.ResultStatus.NotFound => NotFound(), - CDNDownloadsService.ResultStatus.Success => PhysicalFile(result.File!.FullName, "application/octet-stream"), + CDNDownloadsService.ResultStatus.Success => BuildDirectDownloadResult(result), _ => NotFound() }; } + + private IActionResult BuildDirectDownloadResult(CDNDownloadsService.Result result) + { + if (result.Stream != null) + { + if (result.ContentLength.HasValue) + { + Response.ContentLength = result.ContentLength.Value; + } + + return new FileStreamResult(result.Stream, "application/octet-stream"); + } + + return PhysicalFile(result.File!.FullName, "application/octet-stream"); + } } diff --git a/LightlessSyncServer/LightlessSyncStaticFilesServer/Services/CDNDownloadsService.cs b/LightlessSyncServer/LightlessSyncStaticFilesServer/Services/CDNDownloadsService.cs index 3cbd661..5495a9a 100644 --- a/LightlessSyncServer/LightlessSyncStaticFilesServer/Services/CDNDownloadsService.cs +++ b/LightlessSyncServer/LightlessSyncStaticFilesServer/Services/CDNDownloadsService.cs @@ -1,4 +1,5 @@ using System.IO; +using System.Threading; using System.Threading.Tasks; namespace LightlessSyncStaticFilesServer.Services; @@ -13,7 +14,7 @@ public class CDNDownloadsService Success } - public readonly record struct Result(ResultStatus Status, FileInfo? File); + public readonly record struct Result(ResultStatus Status, FileInfo? File, Stream? Stream, long? ContentLength); private readonly CDNDownloadUrlService _cdnDownloadUrlService; private readonly CachedFileProvider _cachedFileProvider; @@ -26,31 +27,31 @@ public class CDNDownloadsService public bool DownloadsEnabled => _cdnDownloadUrlService.DirectDownloadsEnabled; - public async Task GetDownloadAsync(string hash, long expiresUnixSeconds, string signature) + public async Task GetDownloadAsync(string hash, long expiresUnixSeconds, string signature, CancellationToken ct) { if (!_cdnDownloadUrlService.DirectDownloadsEnabled) { - return new Result(ResultStatus.Disabled, null); + return new Result(ResultStatus.Disabled, null, null, null); } if (string.IsNullOrEmpty(signature) || string.IsNullOrEmpty(hash)) { - return new Result(ResultStatus.Unauthorized, null); + return new Result(ResultStatus.Unauthorized, null, null, null); } hash = hash.ToUpperInvariant(); if (!_cdnDownloadUrlService.TryValidateSignature(hash, expiresUnixSeconds, signature)) { - return new Result(ResultStatus.Unauthorized, null); + return new Result(ResultStatus.Unauthorized, null, null, null); } - var fileInfo = await _cachedFileProvider.DownloadAndGetLocalFileInfo(hash).ConfigureAwait(false); - if (fileInfo == null) + var fileResult = await _cachedFileProvider.GetFileStreamForDirectDownloadAsync(hash, ct).ConfigureAwait(false); + if (fileResult == null) { - return new Result(ResultStatus.NotFound, null); + return new Result(ResultStatus.NotFound, null, null, null); } - return new Result(ResultStatus.Success, fileInfo); + return new Result(ResultStatus.Success, fileResult.Value.File, fileResult.Value.Stream, fileResult.Value.ContentLength); } } diff --git a/LightlessSyncServer/LightlessSyncStaticFilesServer/Services/CachedFileProvider.cs b/LightlessSyncServer/LightlessSyncStaticFilesServer/Services/CachedFileProvider.cs index fbc4ce6..2d0b320 100644 --- a/LightlessSyncServer/LightlessSyncStaticFilesServer/Services/CachedFileProvider.cs +++ b/LightlessSyncServer/LightlessSyncStaticFilesServer/Services/CachedFileProvider.cs @@ -16,6 +16,7 @@ public sealed class CachedFileProvider : IDisposable private readonly FileStatisticsService _fileStatisticsService; private readonly LightlessMetrics _metrics; private readonly ServerTokenGenerator _generator; + private readonly IShardFileInventoryReporter _inventoryReporter; private readonly Uri _remoteCacheSourceUri; private readonly string _hotStoragePath; private readonly ConcurrentDictionary _currentTransfers = new(StringComparer.Ordinal); @@ -27,13 +28,15 @@ public sealed class CachedFileProvider : IDisposable private bool _isDistributionServer; public CachedFileProvider(IConfigurationService configuration, ILogger logger, - FileStatisticsService fileStatisticsService, LightlessMetrics metrics, ServerTokenGenerator generator) + FileStatisticsService fileStatisticsService, LightlessMetrics metrics, ServerTokenGenerator generator, + IShardFileInventoryReporter inventoryReporter) { _configuration = configuration; _logger = logger; _fileStatisticsService = fileStatisticsService; _metrics = metrics; _generator = generator; + _inventoryReporter = inventoryReporter; _remoteCacheSourceUri = configuration.GetValueOrDefault(nameof(StaticFilesServerConfiguration.DistributionFileServerAddress), null); _isDistributionServer = configuration.GetValueOrDefault(nameof(StaticFilesServerConfiguration.IsDistributionNode), false); _hotStoragePath = configuration.GetValue(nameof(StaticFilesServerConfiguration.CacheDirectory)); @@ -97,10 +100,11 @@ public sealed class CachedFileProvider : IDisposable _metrics.IncGauge(MetricsAPI.GaugeFilesTotal); _metrics.IncGauge(MetricsAPI.GaugeFilesTotalSize, FilePathUtil.GetFileInfoForHash(_hotStoragePath, hash).Length); + _inventoryReporter.ReportAdded(hash); response.Dispose(); } - private bool TryCopyFromColdStorage(string hash, string destinationFilePath) + private bool TryCopyFromColdStorage(string hash, string destinationFilePath) { if (!_configuration.GetValueOrDefault(nameof(StaticFilesServerConfiguration.UseColdStorage), false)) return false; @@ -124,6 +128,7 @@ public sealed class CachedFileProvider : IDisposable _metrics.IncGauge(MetricsAPI.GaugeFilesTotal); _metrics.IncGauge(MetricsAPI.GaugeFilesTotalSize, new FileInfo(destinationFilePath).Length); + _inventoryReporter.ReportAdded(hash); return true; } catch (Exception ex) @@ -134,6 +139,93 @@ public sealed class CachedFileProvider : IDisposable return false; } + public async Task GetFileStreamForDirectDownloadAsync(string hash, CancellationToken ct) + { + var destinationFilePath = FilePathUtil.GetFilePath(_hotStoragePath, hash); + + var existing = GetLocalFilePath(hash); + if (existing != null) + { + return new CachedFileStreamResult(existing, null, existing.Length); + } + + if (TryCopyFromColdStorage(hash, destinationFilePath)) + { + var coldFile = GetLocalFilePath(hash); + if (coldFile != null) + { + return new CachedFileStreamResult(coldFile, null, coldFile.Length); + } + } + + if (_remoteCacheSourceUri == null) + { + return null; + } + + TaskCompletionSource? completion = null; + + await _downloadSemaphore.WaitAsync(ct).ConfigureAwait(false); + try + { + if (_currentTransfers.TryGetValue(hash, out var downloadTask) + && !(downloadTask?.IsCompleted ?? true)) + { + completion = null; + } + else + { + completion = new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously); + _currentTransfers[hash] = completion.Task; + _metrics.IncGauge(MetricsAPI.GaugeFilesDownloadingFromCache); + } + } + finally + { + _downloadSemaphore.Release(); + } + + if (completion == null) + { + var waited = await DownloadAndGetLocalFileInfo(hash).ConfigureAwait(false); + if (waited == null) return null; + return new CachedFileStreamResult(waited, null, waited.Length); + } + + var downloadUrl = LightlessFiles.DistributionGetFullPath(_remoteCacheSourceUri, hash); + _logger.LogInformation("Did not find {hash}, streaming from {server}", hash, downloadUrl); + + using var requestMessage = new HttpRequestMessage(HttpMethod.Get, downloadUrl); + requestMessage.Headers.Authorization = new AuthenticationHeaderValue("Bearer", _generator.Token); + + HttpResponseMessage response; + try + { + response = await _httpClient.SendAsync(requestMessage, HttpCompletionOption.ResponseHeadersRead, ct).ConfigureAwait(false); + response.EnsureSuccessStatusCode(); + } + catch (Exception ex) + { + _logger.LogWarning(ex, "Failed to stream {url}", downloadUrl); + FinalizeStreamingDownload(hash, null, destinationFilePath, completion, false, 0); + return null; + } + + var tempFileName = destinationFilePath + ".dl"; + var fileStream = new FileStream(tempFileName, FileMode.Create, FileAccess.Write, FileShare.Read, bufferSize: 64 * 1024, useAsync: true); + var sourceStream = await response.Content.ReadAsStreamAsync(ct).ConfigureAwait(false); + + var stream = new StreamingCacheWriteStream( + sourceStream, + fileStream, + response, + bytesWritten => FinalizeStreamingDownload(hash, tempFileName, destinationFilePath, completion, true, bytesWritten), + bytesWritten => FinalizeStreamingDownload(hash, tempFileName, destinationFilePath, completion, false, bytesWritten), + _logger); + + return new CachedFileStreamResult(null, stream, response.Content.Headers.ContentLength); + } + public async Task DownloadFileWhenRequired(string hash) { var fi = FilePathUtil.GetFileInfoForHash(_hotStoragePath, hash); @@ -219,4 +311,174 @@ public sealed class CachedFileProvider : IDisposable { return hashes.Exists(_currentTransfers.Keys.Contains); } -} \ No newline at end of file + + private void FinalizeStreamingDownload(string hash, string? tempFileName, string destinationFilePath, + TaskCompletionSource completion, bool success, long bytesWritten) + { + try + { + if (success) + { + if (!string.IsNullOrEmpty(tempFileName)) + { + File.Move(tempFileName, destinationFilePath, true); + } + + var fi = FilePathUtil.GetFileInfoForHash(_hotStoragePath, hash); + if (fi != null) + { + _metrics.IncGauge(MetricsAPI.GaugeFilesTotal); + _metrics.IncGauge(MetricsAPI.GaugeFilesTotalSize, fi.Length); + _fileStatisticsService.LogFile(hash, fi.Length); + _inventoryReporter.ReportAdded(hash); + } + } + else if (!string.IsNullOrEmpty(tempFileName)) + { + try { File.Delete(tempFileName); } catch { /* ignore */ } + } + } + catch (Exception ex) + { + _logger.LogWarning(ex, "Failed to finalize streaming download for {hash} after {bytes} bytes", hash, bytesWritten); + } + finally + { + _metrics.DecGauge(MetricsAPI.GaugeFilesDownloadingFromCache); + _currentTransfers.Remove(hash, out _); + completion.TrySetResult(success); + } + } + + private sealed class StreamingCacheWriteStream : Stream + { + private readonly Stream _source; + private readonly FileStream _cacheStream; + private readonly HttpResponseMessage _response; + private readonly Action _onSuccess; + private readonly Action _onFailure; + private readonly ILogger _logger; + private long _bytesWritten; + private int _completed; + + public StreamingCacheWriteStream(Stream source, FileStream cacheStream, HttpResponseMessage response, + Action onSuccess, Action onFailure, ILogger logger) + { + _source = source; + _cacheStream = cacheStream; + _response = response; + _onSuccess = onSuccess; + _onFailure = onFailure; + _logger = logger; + } + + public override bool CanRead => true; + public override bool CanSeek => false; + public override bool CanWrite => false; + public override long Length => _bytesWritten; + public override long Position + { + get => _bytesWritten; + set => throw new NotSupportedException(); + } + + public override int Read(byte[] buffer, int offset, int count) + { + if (Volatile.Read(ref _completed) != 0) + { + return 0; + } + + try + { + int bytesRead = _source.Read(buffer, offset, count); + if (bytesRead > 0) + { + _cacheStream.Write(buffer, offset, bytesRead); + _bytesWritten += bytesRead; + return bytesRead; + } + + Complete(true); + return 0; + } + catch (Exception ex) + { + _logger.LogWarning(ex, "Streaming download failed while reading"); + Complete(false); + throw; + } + } + + public override async ValueTask ReadAsync(Memory buffer, CancellationToken cancellationToken = default) + { + if (Volatile.Read(ref _completed) != 0) + { + return 0; + } + + try + { + int bytesRead = await _source.ReadAsync(buffer, cancellationToken).ConfigureAwait(false); + if (bytesRead > 0) + { + await _cacheStream.WriteAsync(buffer.Slice(0, bytesRead), cancellationToken).ConfigureAwait(false); + _bytesWritten += bytesRead; + return bytesRead; + } + + Complete(true); + return 0; + } + catch (Exception ex) + { + _logger.LogWarning(ex, "Streaming download failed while reading"); + Complete(false); + throw; + } + } + + public override void Flush() + { + // no-op + } + + protected override void Dispose(bool disposing) + { + if (disposing) + { + Complete(false); + } + + base.Dispose(disposing); + } + + private void Complete(bool success) + { + if (Interlocked.Exchange(ref _completed, 1) != 0) + { + return; + } + + try { _cacheStream.Flush(); } catch { /* ignore */ } + try { _cacheStream.Dispose(); } catch { /* ignore */ } + try { _source.Dispose(); } catch { /* ignore */ } + try { _response.Dispose(); } catch { /* ignore */ } + + if (success) + { + _onSuccess(_bytesWritten); + } + else + { + _onFailure(_bytesWritten); + } + } + + public override long Seek(long offset, SeekOrigin origin) => throw new NotSupportedException(); + public override void SetLength(long value) => throw new NotSupportedException(); + public override void Write(byte[] buffer, int offset, int count) => throw new NotSupportedException(); + } +} + +public readonly record struct CachedFileStreamResult(FileInfo? File, Stream? Stream, long? ContentLength); \ No newline at end of file diff --git a/LightlessSyncServer/LightlessSyncStaticFilesServer/Services/MainServerShardRegistrationService.cs b/LightlessSyncServer/LightlessSyncStaticFilesServer/Services/MainServerShardRegistrationService.cs index 7f31c29..127f491 100644 --- a/LightlessSyncServer/LightlessSyncStaticFilesServer/Services/MainServerShardRegistrationService.cs +++ b/LightlessSyncServer/LightlessSyncStaticFilesServer/Services/MainServerShardRegistrationService.cs @@ -1,5 +1,6 @@ using LightlessSyncShared.Services; using LightlessSyncShared.Utils.Configuration; +using LightlessSyncShared.Models; using System.Collections.Concurrent; using System.Collections.Frozen; @@ -11,8 +12,16 @@ public class MainServerShardRegistrationService : IHostedService private readonly IConfigurationService _configurationService; private readonly ConcurrentDictionary _shardConfigs = new(StringComparer.Ordinal); private readonly ConcurrentDictionary _shardHeartbeats = new(StringComparer.Ordinal); + private readonly ConcurrentDictionary _shardFileInventory = new(StringComparer.Ordinal); private readonly CancellationTokenSource _periodicCheckCts = new(); + private sealed class ShardFileInventory + { + public long Sequence { get; set; } + public HashSet Files { get; set; } = new(StringComparer.OrdinalIgnoreCase); + public object SyncRoot { get; } = new(); + } + public MainServerShardRegistrationService(ILogger logger, IConfigurationService configurationService) { @@ -32,6 +41,7 @@ public class MainServerShardRegistrationService : IHostedService _shardHeartbeats[shardName] = DateTime.UtcNow; _shardConfigs[shardName] = shardConfiguration; + _shardFileInventory.TryAdd(shardName, new ShardFileInventory()); } public void UnregisterShard(string shardName) @@ -40,6 +50,7 @@ public class MainServerShardRegistrationService : IHostedService _shardHeartbeats.TryRemove(shardName, out _); _shardConfigs.TryRemove(shardName, out _); + _shardFileInventory.TryRemove(shardName, out _); } public List GetConfigurationsByContinent(string continent) @@ -56,6 +67,94 @@ public class MainServerShardRegistrationService : IHostedService } }]; } + public List<(string ShardName, ShardConfiguration Config)> GetShardEntriesByContinent(string continent) + { + var shardConfigs = _shardConfigs + .Where(v => v.Value.Continents.Contains(continent, StringComparer.OrdinalIgnoreCase)) + .Select(kvp => (kvp.Key, kvp.Value)) + .ToList(); + if (shardConfigs.Any()) return shardConfigs; + + shardConfigs = _shardConfigs + .Where(v => v.Value.Continents.Contains("*", StringComparer.OrdinalIgnoreCase)) + .Select(kvp => (kvp.Key, kvp.Value)) + .ToList(); + if (shardConfigs.Any()) return shardConfigs; + + var fallback = new ShardConfiguration() + { + Continents = ["*"], + FileMatch = ".*", + RegionUris = new(StringComparer.Ordinal) + { + { "Central", _configurationService.GetValue(nameof(StaticFilesServerConfiguration.CdnFullUrl)) } + } + }; + + return [(string.Empty, fallback)]; + } + + public long ApplyFileInventoryUpdate(string shardName, ShardFileInventoryUpdateDto update) + { + if (!_shardConfigs.ContainsKey(shardName)) + throw new InvalidOperationException("Shard not registered"); + + var inventory = _shardFileInventory.GetOrAdd(shardName, _ => new ShardFileInventory()); + lock (inventory.SyncRoot) + { + if (update.IsFullSnapshot && update.Sequence <= inventory.Sequence) + { + inventory.Files = new HashSet(update.Added ?? [], StringComparer.OrdinalIgnoreCase); + inventory.Sequence = update.Sequence; + return inventory.Sequence; + } + + if (update.Sequence <= inventory.Sequence) + { + return inventory.Sequence; + } + + if (update.IsFullSnapshot) + { + inventory.Files = new HashSet(update.Added ?? [], StringComparer.OrdinalIgnoreCase); + } + else + { + if (update.Added != null) + { + foreach (var hash in update.Added) + { + if (!string.IsNullOrWhiteSpace(hash)) + inventory.Files.Add(hash); + } + } + + if (update.Removed != null) + { + foreach (var hash in update.Removed) + { + if (!string.IsNullOrWhiteSpace(hash)) + inventory.Files.Remove(hash); + } + } + } + + inventory.Sequence = update.Sequence; + return inventory.Sequence; + } + } + + public bool ShardHasFile(string shardName, string hash) + { + if (!_shardFileInventory.TryGetValue(shardName, out var inventory)) + return false; + + lock (inventory.SyncRoot) + { + return inventory.Files.Contains(hash); + } + } + public void ShardHeartbeat(string shardName) { if (!_shardConfigs.ContainsKey(shardName)) @@ -87,6 +186,7 @@ public class MainServerShardRegistrationService : IHostedService { _shardHeartbeats.TryRemove(kvp.Key, out _); _shardConfigs.TryRemove(kvp.Key, out _); + _shardFileInventory.TryRemove(kvp.Key, out _); } } diff --git a/LightlessSyncServer/LightlessSyncStaticFilesServer/Services/ShardFileCleanupService.cs b/LightlessSyncServer/LightlessSyncStaticFilesServer/Services/ShardFileCleanupService.cs index 9615978..e6e6698 100644 --- a/LightlessSyncServer/LightlessSyncStaticFilesServer/Services/ShardFileCleanupService.cs +++ b/LightlessSyncServer/LightlessSyncStaticFilesServer/Services/ShardFileCleanupService.cs @@ -12,13 +12,16 @@ public class ShardFileCleanupService : IHostedService private readonly IConfigurationService _configuration; private readonly ILogger _logger; private readonly LightlessMetrics _metrics; + private readonly IShardFileInventoryReporter _inventoryReporter; private CancellationTokenSource _cleanupCts; - public ShardFileCleanupService(LightlessMetrics metrics, ILogger logger, IConfigurationService configuration) + public ShardFileCleanupService(LightlessMetrics metrics, ILogger logger, IConfigurationService configuration, + IShardFileInventoryReporter inventoryReporter) { _metrics = metrics; _logger = logger; _configuration = configuration; + _inventoryReporter = inventoryReporter; _cacheDir = _configuration.GetValue(nameof(StaticFilesServerConfiguration.CacheDirectory)); } @@ -99,6 +102,7 @@ public class ShardFileCleanupService : IHostedService _metrics.DecGauge(MetricsAPI.GaugeFilesTotalSize, oldestFile.Length); _metrics.DecGauge(MetricsAPI.GaugeFilesTotal); _logger.LogInformation("Deleting {oldestFile} with size {size}MiB", oldestFile.FullName, ByteSize.FromBytes(oldestFile.Length).MebiBytes); + ReportRemoval(oldestFile.Name); oldestFile.Delete(); } } @@ -135,6 +139,7 @@ public class ShardFileCleanupService : IHostedService _metrics.DecGauge(MetricsAPI.GaugeFilesTotalSize, file.Length); _metrics.DecGauge(MetricsAPI.GaugeFilesTotal); _logger.LogInformation("File outdated: {fileName}, {fileSize}MiB", file.Name, ByteSize.FromBytes(file.Length).MebiBytes); + ReportRemoval(file.Name); file.Delete(); } else if (forcedDeletionAfterHours > 0 && file.LastWriteTime < prevTimeForcedDeletion) @@ -142,6 +147,7 @@ public class ShardFileCleanupService : IHostedService _metrics.DecGauge(MetricsAPI.GaugeFilesTotalSize, file.Length); _metrics.DecGauge(MetricsAPI.GaugeFilesTotal); _logger.LogInformation("File forcefully deleted: {fileName}, {fileSize}MiB", file.Name, ByteSize.FromBytes(file.Length).MebiBytes); + ReportRemoval(file.Name); file.Delete(); } else if (file.Length == 0 && !string.Equals(file.Extension, ".dl", StringComparison.OrdinalIgnoreCase)) @@ -149,6 +155,7 @@ public class ShardFileCleanupService : IHostedService _metrics.DecGauge(MetricsAPI.GaugeFilesTotalSize, file.Length); _metrics.DecGauge(MetricsAPI.GaugeFilesTotal); _logger.LogInformation("File with size 0 deleted: {filename}", file.Name); + ReportRemoval(file.Name); file.Delete(); } @@ -160,4 +167,12 @@ public class ShardFileCleanupService : IHostedService _logger.LogWarning(ex, "Error during file cleanup of old files"); } } + + private void ReportRemoval(string fileName) + { + if (fileName.Length == 40 && fileName.All(char.IsAsciiLetterOrDigit)) + { + _inventoryReporter.ReportRemoved(fileName); + } + } } diff --git a/LightlessSyncServer/LightlessSyncStaticFilesServer/Services/ShardFileInventoryReporter.cs b/LightlessSyncServer/LightlessSyncStaticFilesServer/Services/ShardFileInventoryReporter.cs new file mode 100644 index 0000000..e6745b8 --- /dev/null +++ b/LightlessSyncServer/LightlessSyncStaticFilesServer/Services/ShardFileInventoryReporter.cs @@ -0,0 +1,282 @@ +using LightlessSync.API.Routes; +using LightlessSyncShared.Models; +using LightlessSyncShared.Services; +using LightlessSyncShared.Utils; +using LightlessSyncShared.Utils.Configuration; +using System.Net.Http.Json; +using System.Linq; + +namespace LightlessSyncStaticFilesServer.Services; + +public interface IShardFileInventoryReporter +{ + void ReportAdded(string hash); + void ReportRemoved(string hash); +} + +public sealed class NullShardFileInventoryReporter : IShardFileInventoryReporter +{ + public void ReportAdded(string hash) { } + public void ReportRemoved(string hash) { } +} + +public sealed class ShardFileInventoryReporter : IHostedService, IShardFileInventoryReporter +{ + private static readonly TimeSpan ResyncInterval = TimeSpan.FromMinutes(30); + private static readonly TimeSpan RetryDelay = TimeSpan.FromSeconds(10); + private static readonly TimeSpan BatchDelay = TimeSpan.FromSeconds(2); + + private readonly IConfigurationService _configurationService; + private readonly ILogger _logger; + private readonly HttpClient _httpClient = new(); + private readonly string _cacheDir; + private readonly object _pendingLock = new(); + private readonly SemaphoreSlim _signal = new(0); + + private HashSet _pendingAdds = new(StringComparer.OrdinalIgnoreCase); + private HashSet _pendingRemoves = new(StringComparer.OrdinalIgnoreCase); + private CancellationTokenSource? _cts; + private Task? _processTask; + private Task? _resyncTask; + private long _sequence; + private bool _resyncRequested; + + public ShardFileInventoryReporter( + IConfigurationService configurationService, + ILogger logger, + ServerTokenGenerator serverTokenGenerator) + { + _configurationService = configurationService; + _logger = logger; + _cacheDir = configurationService.GetValue(nameof(StaticFilesServerConfiguration.CacheDirectory)); + _httpClient.DefaultRequestHeaders.Authorization = + new System.Net.Http.Headers.AuthenticationHeaderValue("Bearer", serverTokenGenerator.Token); + } + + public Task StartAsync(CancellationToken cancellationToken) + { + _cts = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken); + _resyncRequested = true; + _signal.Release(); + + _processTask = Task.Run(() => ProcessUpdatesAsync(_cts.Token), _cts.Token); + _resyncTask = Task.Run(() => ResyncLoopAsync(_cts.Token), _cts.Token); + return Task.CompletedTask; + } + + public async Task StopAsync(CancellationToken cancellationToken) + { + if (_cts == null) + return; + + _cts.Cancel(); + + try + { + if (_processTask != null) await _processTask.ConfigureAwait(false); + } + catch + { + // ignore + } + + try + { + if (_resyncTask != null) await _resyncTask.ConfigureAwait(false); + } + catch + { + // ignore + } + + _httpClient.Dispose(); + } + + public void ReportAdded(string hash) + { + if (!IsValidHash(hash)) + return; + + lock (_pendingLock) + { + _pendingAdds.Add(hash); + _pendingRemoves.Remove(hash); + } + + _signal.Release(); + } + + public void ReportRemoved(string hash) + { + if (!IsValidHash(hash)) + return; + + lock (_pendingLock) + { + _pendingRemoves.Add(hash); + _pendingAdds.Remove(hash); + } + + _signal.Release(); + } + + private async Task ResyncLoopAsync(CancellationToken ct) + { + while (!ct.IsCancellationRequested) + { + try + { + await Task.Delay(ResyncInterval, ct).ConfigureAwait(false); + } + catch (OperationCanceledException) + { + break; + } + + _resyncRequested = true; + _signal.Release(); + } + } + + private async Task ProcessUpdatesAsync(CancellationToken ct) + { + while (!ct.IsCancellationRequested) + { + try + { + await _signal.WaitAsync(BatchDelay, ct).ConfigureAwait(false); + } + catch (OperationCanceledException) + { + break; + } + + while (!ct.IsCancellationRequested) + { + ShardFileInventoryUpdateDto? update = null; + + if (_resyncRequested) + { + var snapshot = BuildSnapshot(); + update = new ShardFileInventoryUpdateDto + { + Sequence = Interlocked.Increment(ref _sequence), + IsFullSnapshot = true, + Added = snapshot + }; + } + else + { + HashSet adds; + HashSet removes; + + lock (_pendingLock) + { + if (_pendingAdds.Count == 0 && _pendingRemoves.Count == 0) + break; + + adds = _pendingAdds; + removes = _pendingRemoves; + _pendingAdds = new HashSet(StringComparer.OrdinalIgnoreCase); + _pendingRemoves = new HashSet(StringComparer.OrdinalIgnoreCase); + } + + update = new ShardFileInventoryUpdateDto + { + Sequence = Interlocked.Increment(ref _sequence), + Added = adds.ToList(), + Removed = removes.ToList() + }; + } + + if (update == null) + break; + + await SendUpdateWithRetryAsync(update, ct).ConfigureAwait(false); + + if (update.IsFullSnapshot) + { + lock (_pendingLock) + { + _pendingAdds.Clear(); + _pendingRemoves.Clear(); + _resyncRequested = false; + } + } + } + } + } + + private async Task SendUpdateWithRetryAsync(ShardFileInventoryUpdateDto update, CancellationToken ct) + { + while (!ct.IsCancellationRequested) + { + try + { + await SendUpdateAsync(update, ct).ConfigureAwait(false); + return; + } + catch (OperationCanceledException) when (ct.IsCancellationRequested) + { + throw; + } + catch (Exception ex) + { + _logger.LogWarning(ex, "Failed to send shard file inventory update (seq {seq})", update.Sequence); + try + { + await Task.Delay(RetryDelay, ct).ConfigureAwait(false); + } + catch (OperationCanceledException) + { + throw; + } + } + } + } + + private async Task SendUpdateAsync(ShardFileInventoryUpdateDto update, CancellationToken ct) + { + var mainServer = _configurationService.GetValue(nameof(StaticFilesServerConfiguration.MainFileServerAddress)); + if (mainServer == null) + throw new InvalidOperationException("Main server address is not configured."); + + using var response = await _httpClient.PostAsJsonAsync( + LightlessFiles.MainShardFilesFullPath(mainServer), + update, + ct).ConfigureAwait(false); + + response.EnsureSuccessStatusCode(); + + var ack = await response.Content.ReadFromJsonAsync(cancellationToken: ct) + .ConfigureAwait(false); + + if (ack == null || ack.AppliedSequence < update.Sequence) + throw new InvalidOperationException($"Main server did not apply update {update.Sequence}."); + } + + private List BuildSnapshot() + { + var hashes = new List(); + + if (string.IsNullOrWhiteSpace(_cacheDir) || !Directory.Exists(_cacheDir)) + return hashes; + + foreach (var file in Directory.EnumerateFiles(_cacheDir, "*", SearchOption.AllDirectories)) + { + var name = Path.GetFileName(file); + if (name.EndsWith(".dl", StringComparison.OrdinalIgnoreCase)) + continue; + + if (IsValidHash(name)) + hashes.Add(name); + } + + return hashes; + } + + private static bool IsValidHash(string hash) + { + return hash.Length == 40 && hash.All(char.IsAsciiLetterOrDigit); + } +} diff --git a/LightlessSyncServer/LightlessSyncStaticFilesServer/Startup.cs b/LightlessSyncServer/LightlessSyncStaticFilesServer/Startup.cs index baa21d5..559e365 100644 --- a/LightlessSyncServer/LightlessSyncStaticFilesServer/Startup.cs +++ b/LightlessSyncServer/LightlessSyncStaticFilesServer/Startup.cs @@ -97,6 +97,7 @@ public class Startup // specific services if (_isMain) { + services.AddSingleton(); services.AddSingleton(); services.AddHostedService(); services.AddSingleton, LightlessConfigurationServiceServer>(); @@ -184,6 +185,9 @@ public class Startup } else { + services.AddSingleton(); + services.AddSingleton(sp => sp.GetRequiredService()); + services.AddHostedService(sp => sp.GetRequiredService()); services.AddSingleton(); services.AddHostedService(s => s.GetRequiredService()); services.AddSingleton();