Files
LightlessClient/LightlessSync/WebAPI/Files/FileUploadManager.cs
defnotken 835a0a637d
All checks were successful
Tag and Release Lightless / tag-and-release (push) Successful in 2m27s
2.0.0 (#92)
2.0.0 Changes:

- Reworked shell finder UI with compact or list view with profile tags showing with the listing, allowing moderators to broadcast the syncshell as well to have it be used more.
- Reworked user list in syncshell admin screen to have filter visible and moved away from table to its own thing, allowing to copy uid/note/alias when clicking on the name.
- Reworked download bars and download box to make it look more modern, removed the jitter around, so it shouldn't vibrate around much.
- Chat has been added to the top menu, working in Zone or in Syncshells to be used there.
- Paired system has been revamped to make pausing and unpausing faster, and loading people should be faster as well.
- Moved to the internal object table to have faster load times for users; people should load in faster
- Compactor is running on a multi-threaded level instead of single-threaded; this should increase the speed of compacting files
- Nameplate Service has been reworked so it wouldn't use the nameplate handler anymore.
- Files can be resized when downloading to reduce load on users if they aren't compressed. (can be toggled to resize all).
- Penumbra Collections are now only made when people are visible, reducing the load on boot-up when having many syncshells in your list.
- Lightfinder plates have been moved away from using Nameplates, but will use an overlay.
- Main UI has been changed a bit with a gradient, and on hover will glow up now.
- Reworked Profile UI for Syncshell and Users to be more user-facing with more customizable items.
- Reworked Settings UI to look more modern.
- Performance should be better due to new systems that would dispose of the collections and better caching of items.

Co-authored-by: defnotken <itsdefnotken@gmail.com>
Co-authored-by: azyges <aaaaaa@aaa.aaa>
Co-authored-by: choco <choco@patat.nl>
Co-authored-by: cake <admin@cakeandbanana.nl>
Co-authored-by: Minmoose <KennethBohr@outlook.com>
Reviewed-on: #92
2025-12-21 17:19:34 +00:00

405 lines
16 KiB
C#

using LightlessSync.API.Data;
using LightlessSync.API.Dto.Files;
using LightlessSync.API.Routes;
using LightlessSync.FileCache;
using LightlessSync.LightlessConfiguration;
using LightlessSync.Services.Mediator;
using LightlessSync.Services.ServerConfiguration;
using LightlessSync.UI;
using LightlessSync.WebAPI.Files.Models;
using Microsoft.Extensions.Logging;
using System.Net.Http.Headers;
using System.Net.Http.Json;
using System.Collections.Concurrent;
namespace LightlessSync.WebAPI.Files;
public sealed class FileUploadManager : DisposableMediatorSubscriberBase
{
// Local file cache; used to resolve hashes to cache entries and to obtain compressed upload payloads.
private readonly FileCacheManager _fileDbManager;
// Configuration source; this class reads ParallelUploads and UseAlternativeFileUpload from it.
private readonly LightlessConfigService _lightlessConfigService;
// Sends the HTTP requests and exposes IsInitialized, FilesCdnUri and ForbiddenTransfers.
private readonly FileTransferOrchestrator _orchestrator;
// In this file it is only read to log the current API URL.
private readonly ServerConfigurationManager _serverManager;
// Hash -> UTC time at which the hash was last uploaded or otherwise marked as verified (ordinal key comparison).
private readonly ConcurrentDictionary<string, DateTime> _verifiedUploadedHashes = new(StringComparer.Ordinal);
// Guards CurrentUploads and _currentUploadsByHash.
private readonly object _currentUploadsLock = new();
// Index over the transfers in CurrentUploads keyed by file hash; used to look up a transfer for progress and size updates.
private readonly Dictionary<string, FileTransfer> _currentUploadsByHash = new(StringComparer.Ordinal);
// Cancellation source for the character-data upload in flight; set to null by CancelUpload() and Reset().
private CancellationTokenSource? _uploadCancellationTokenSource = new();
/// <summary>
/// Creates the upload manager and subscribes to <see cref="DisconnectedMessage"/> so that a
/// disconnect cancels any running upload and clears all tracked state.
/// </summary>
public FileUploadManager(ILogger<FileUploadManager> logger, LightlessMediator mediator,
    LightlessConfigService lightlessConfigService,
    FileTransferOrchestrator orchestrator,
    FileCacheManager fileDbManager,
    ServerConfigurationManager serverManager) : base(logger, mediator)
{
    _serverManager = serverManager;
    _fileDbManager = fileDbManager;
    _orchestrator = orchestrator;
    _lightlessConfigService = lightlessConfigService;

    // The message payload is not needed; any disconnect resets the manager.
    Mediator.Subscribe<DisconnectedMessage>(this, _ => Reset());
}
/// <summary>
/// Transfers currently registered for upload. This class mutates the list only while holding
/// <c>_currentUploadsLock</c>; the list instance itself is exposed directly, so readers that need a
/// stable view can call <see cref="GetCurrentUploadsSnapshot"/>, which copies it under that lock.
/// </summary>
public List<FileTransfer> CurrentUploads { get; } = [];
/// <summary>True when the underlying <see cref="FileTransferOrchestrator"/> reports that it is initialized.</summary>
public bool IsReady => _orchestrator.IsInitialized;
/// <summary>True while at least one transfer is registered in <see cref="CurrentUploads"/>.</summary>
public bool IsUploading
{
    get
    {
        bool hasTrackedUploads;
        lock (_currentUploadsLock)
        {
            hasTrackedUploads = CurrentUploads.Count != 0;
        }

        return hasTrackedUploads;
    }
}
/// <summary>
/// Returns a copy of <see cref="CurrentUploads"/> taken while holding the uploads lock.
/// </summary>
/// <returns>A new list containing the transfers that were registered at the time of the call.</returns>
public List<FileTransfer> GetCurrentUploadsSnapshot()
{
    lock (_currentUploadsLock)
    {
        return [.. CurrentUploads];
    }
}
/// <summary>
/// Cancels the upload in flight, if any, and clears the tracked transfers.
/// </summary>
/// <returns><c>true</c> when an upload was being tracked and has been cancelled; otherwise <c>false</c>.</returns>
public bool CancelUpload()
{
    if (!IsUploading)
    {
        return false;
    }

    Logger.LogDebug("Cancelling current upload");

    // Signal cancellation first, then release the source and forget it.
    _uploadCancellationTokenSource?.Cancel();
    _uploadCancellationTokenSource?.Dispose();
    _uploadCancellationTokenSource = null;

    lock (_currentUploadsLock)
    {
        CurrentUploads.Clear();
        _currentUploadsByHash.Clear();
    }

    return true;
}
/// <summary>
/// Sends a POST to the server's "delete all files" endpoint.
/// </summary>
/// <exception cref="InvalidOperationException">The orchestrator is not initialized.</exception>
public async Task DeleteAllFiles()
{
    if (!_orchestrator.IsInitialized)
    {
        throw new InvalidOperationException("FileTransferManager is not initialized");
    }

    var deleteAllUri = LightlessFiles.ServerFilesDeleteAllFullPath(_orchestrator.FilesCdnUri!);
    await _orchestrator.SendRequestAsync(HttpMethod.Post, deleteAllUri).ConfigureAwait(false);
}
/// <summary>
/// Uploads the given hashes to the server, running up to the configured number of uploads in parallel.
/// </summary>
/// <param name="hashesToUpload">Hashes of the files to upload.</param>
/// <param name="progress">Receives human-readable status messages.</param>
/// <param name="ct">Optional cancellation token; <see cref="CancellationToken.None"/> is used when omitted.</param>
/// <returns>
/// The hashes that have no local cache entry (nothing is uploaded in that case), or the hashes the
/// server reported as forbidden (nothing is uploaded in that case either), or an empty list once all
/// upload tasks have finished.
/// </returns>
public async Task<List<string>> UploadFiles(List<string> hashesToUpload, IProgress<string> progress, CancellationToken? ct = null)
{
    Logger.LogDebug("Trying to upload files");

    // Resolve the optional token once so every step observes the same token.
    var cancellationToken = ct ?? CancellationToken.None;

    var filesPresentLocally = hashesToUpload.Where(h => _fileDbManager.GetFileCacheByHash(h) != null).ToHashSet(StringComparer.Ordinal);
    var locallyMissingFiles = hashesToUpload.Except(filesPresentLocally, StringComparer.Ordinal).ToList();
    if (locallyMissingFiles.Count != 0)
    {
        return locallyMissingFiles;
    }

    progress.Report($"Starting upload for {filesPresentLocally.Count} files");
    var filesToUpload = await FilesSend([.. filesPresentLocally], [], cancellationToken).ConfigureAwait(false);
    if (filesToUpload.Exists(f => f.IsForbidden))
    {
        return [.. filesToUpload.Where(f => f.IsForbidden).Select(f => f.Hash)];
    }

    var parallelUploads = Math.Clamp(_lightlessConfigService.Current.ParallelUploads, 1, 8);
    using SemaphoreSlim uploadSlots = new(parallelUploads, parallelUploads);

    // Number of uploads that have actually acquired a slot; incremented atomically because
    // several uploads run concurrently.
    int startedUploads = 0;

    List<Task> uploadTasks = new(filesToUpload.Count);
    foreach (var file in filesToUpload)
    {
        uploadTasks.Add(UploadSingleFileAsync(file, uploadSlots, cancellationToken));
    }

    await Task.WhenAll(uploadTasks).ConfigureAwait(false);
    return [];

    // Waits for a free upload slot, then compresses and uploads a single file.
    async Task UploadSingleFileAsync(UploadFileDto fileDto, SemaphoreSlim gate, CancellationToken token)
    {
        await gate.WaitAsync(token).ConfigureAwait(false);
        try
        {
            token.ThrowIfCancellationRequested();

            // Report when the upload really starts. Reporting while the tasks are merely queued
            // would emit all messages at once and leave the caller looking at "N/N" throughout.
            var position = Interlocked.Increment(ref startedUploads);
            progress.Report($"Uploading file {position}/{filesToUpload.Count}. Please wait until the upload is completed.");

            Logger.LogDebug("[{hash}] Compressing", fileDto.Hash);
            var data = await _fileDbManager.GetCompressedFileData(fileDto.Hash, token).ConfigureAwait(false);
            var cacheEntry = _fileDbManager.GetFileCacheByHash(data.Item1);
            if (cacheEntry != null)
            {
                Logger.LogDebug("[{hash}] Starting upload for {filePath}", data.Item1, cacheEntry.ResolvedFilepath);
            }

            await UploadFile(data.Item2, fileDto.Hash, postProgress: false, token).ConfigureAwait(false);
        }
        finally
        {
            gate.Release();
        }
    }
}
/// <summary>
/// Cancels any tracked upload, uploads the files referenced by <paramref name="data"/> that have not
/// been verified recently, and strips file replacements whose hash is a forbidden transfer.
/// </summary>
/// <param name="data">Character data whose file replacements should be available on the server; modified in place.</param>
/// <param name="visiblePlayers">Users passed along with the verification request.</param>
/// <returns>The same <paramref name="data"/> instance after forbidden replacements were removed.</returns>
public async Task<CharacterData> UploadFiles(CharacterData data, List<UserData> visiblePlayers)
{
    CancelUpload();

    // Take the token from a local before publishing the source to the field: Reset() (triggered by a
    // disconnect) may null or dispose the field at any time, and reading Token back from the field
    // could then fail. A token obtained up front simply becomes cancelled in that case.
    var uploadCts = new CancellationTokenSource();
    var uploadToken = uploadCts.Token;
    _uploadCancellationTokenSource = uploadCts;

    Logger.LogDebug("Sending Character data {hash} to service {url}", data.DataHash.Value, _serverManager.CurrentApiUrl);

    HashSet<string> unverifiedUploads = GetUnverifiedFiles(data);
    if (unverifiedUploads.Count > 0)
    {
        await UploadUnverifiedFiles(unverifiedUploads, visiblePlayers, uploadToken).ConfigureAwait(false);
        Logger.LogInformation("Upload complete for {hash}", data.DataHash.Value);
    }

    // Only the lists stored as values are modified, never the dictionary itself, so enumerating it here is safe.
    foreach (var kvp in data.FileReplacements)
    {
        kvp.Value.RemoveAll(i => _orchestrator.ForbiddenTransfers.Exists(f => string.Equals(f.Hash, i.Hash, StringComparison.OrdinalIgnoreCase)));
    }

    return data;
}
/// <summary>
/// Runs the base class disposal, then cancels any running upload and clears all tracked state via <see cref="Reset"/>.
/// </summary>
/// <param name="disposing">Forwarded to the base implementation; <see cref="Reset"/> runs regardless of its value.</param>
protected override void Dispose(bool disposing)
{
base.Dispose(disposing);
Reset();
}
/// <summary>
/// Posts the given hashes and UIDs to the server's "files send" endpoint and returns the server's answer.
/// </summary>
/// <param name="hashes">File hashes to announce.</param>
/// <param name="uids">UIDs sent along with the hashes.</param>
/// <param name="ct">Cancels the request and the reading of the response body.</param>
/// <returns>The deserialized response, or an empty list when the body deserializes to <c>null</c>.</returns>
/// <exception cref="InvalidOperationException">The orchestrator is not initialized.</exception>
private async Task<List<UploadFileDto>> FilesSend(List<string> hashes, List<string> uids, CancellationToken ct)
{
    if (!_orchestrator.IsInitialized) throw new InvalidOperationException("FileTransferManager is not initialized");

    FilesSendDto filesSendDto = new()
    {
        FileHashes = hashes,
        UIDs = uids
    };

    // Dispose the response once the body has been read so its content stream and connection are released promptly.
    using var response = await _orchestrator.SendRequestAsync(HttpMethod.Post, LightlessFiles.ServerFilesFilesSendFullPath(_orchestrator.FilesCdnUri!), filesSendDto, ct).ConfigureAwait(false);
    return await response.Content.ReadFromJsonAsync<List<UploadFileDto>>(cancellationToken: ct).ConfigureAwait(false) ?? [];
}
/// <summary>
/// Collects the hashes referenced by <paramref name="data"/> that were not marked as verified within the last 10 minutes.
/// </summary>
/// <param name="data">Character data whose file replacements are inspected; entries with a file swap path are ignored.</param>
/// <returns>The distinct hashes (ordinal comparison) that need to be verified again.</returns>
private HashSet<string> GetUnverifiedFiles(CharacterData data)
{
    // Flatten all replacement lists, keep only real files (no swap path) and de-duplicate the hashes.
    var candidateHashes = data.FileReplacements
        .SelectMany(entry => entry.Value)
        .Where(replacement => string.IsNullOrEmpty(replacement.FileSwapPath))
        .Select(replacement => replacement.Hash)
        .Distinct(StringComparer.Ordinal)
        .ToList();

    HashSet<string> staleHashes = new(StringComparer.Ordinal);
    foreach (var hash in candidateHashes)
    {
        // A hash that was never recorded counts as infinitely old.
        var lastVerified = _verifiedUploadedHashes.TryGetValue(hash, out var recorded) ? recorded : DateTime.MinValue;
        if (lastVerified >= DateTime.UtcNow.Subtract(TimeSpan.FromMinutes(10)))
        {
            continue;
        }

        Logger.LogTrace("Verifying {item}, last verified: {date}", hash, lastVerified);
        staleHashes.Add(hash);
    }

    return staleHashes;
}
/// <summary>
/// Cancels and releases the current upload's cancellation source, then clears the tracked uploads
/// and the record of verified hashes.
/// </summary>
private void Reset()
{
    if (_uploadCancellationTokenSource is { } activeSource)
    {
        activeSource.Cancel();
        activeSource.Dispose();
    }

    _uploadCancellationTokenSource = null;

    lock (_currentUploadsLock)
    {
        CurrentUploads.Clear();
        _currentUploadsByHash.Clear();
    }

    _verifiedUploadedHashes.Clear();
}
/// <summary>
/// Uploads one compressed file and records its hash as verified on success. When the regular upload
/// fails and the alternative upload is not already in use, the alternative ("munged") upload is tried once.
/// </summary>
/// <param name="compressedFile">Compressed payload; the alternative upload modifies this buffer in place.</param>
/// <param name="fileHash">Hash identifying the file on the server.</param>
/// <param name="postProgress">Whether upload progress is written to the tracked transfer for this hash.</param>
/// <param name="uploadToken">Cancels the upload. Cancellation and failures of the first attempt are logged, not rethrown.</param>
/// <exception cref="InvalidOperationException">The orchestrator is not initialized.</exception>
private async Task UploadFile(byte[] compressedFile, string fileHash, bool postProgress, CancellationToken uploadToken)
{
    if (!_orchestrator.IsInitialized) throw new InvalidOperationException("FileTransferManager is not initialized");

    Logger.LogInformation("[{hash}] Uploading {size}", fileHash, UiSharedService.ByteToString(compressedFile.Length));

    if (uploadToken.IsCancellationRequested) return;

    // Read the setting once so the attempt and the fallback decision agree even if the configuration changes meanwhile.
    var useAlternativeUpload = _lightlessConfigService.Current.UseAlternativeFileUpload;

    try
    {
        await UploadFileStream(compressedFile, fileHash, useAlternativeUpload, postProgress, uploadToken).ConfigureAwait(false);
        _verifiedUploadedHashes[fileHash] = DateTime.UtcNow;
    }
    catch (OperationCanceledException ex)
    {
        Logger.LogWarning(ex, "[{hash}] File upload cancelled", fileHash);
    }
    catch (Exception ex) when (!useAlternativeUpload)
    {
        Logger.LogWarning(ex, "[{hash}] Error during file upload, trying alternative file upload", fileHash);

        // An exception thrown by the fallback propagates to the caller, as before.
        await UploadFileStream(compressedFile, fileHash, munged: true, postProgress, uploadToken).ConfigureAwait(false);

        // The fallback completed without throwing, so the file is on the server just like after a
        // successful first attempt; record it to avoid a needless re-verification.
        _verifiedUploadedHashes[fileHash] = DateTime.UtcNow;
    }
    catch (Exception ex)
    {
        // The alternative upload was already in use and failed for a reason other than cancellation.
        Logger.LogWarning(ex, "[{hash}] Error during file upload", fileHash);
    }
}
/// <summary>
/// Streams the compressed file to the server's upload endpoint (or the munged upload endpoint) and logs the response status.
/// </summary>
/// <param name="compressedFile">Payload to send. When <paramref name="munged"/> is true the buffer is transformed in place before sending.</param>
/// <param name="fileHash">Hash identifying the file; part of the request URI and the key of the tracked transfer.</param>
/// <param name="munged">Selects the alternative upload endpoint and applies <c>FileDownloadManager.MungeBuffer</c> to the payload.</param>
/// <param name="postProgress">When true, reported progress is written to the tracked transfer for <paramref name="fileHash"/>.</param>
/// <param name="uploadToken">Cancels the request.</param>
private async Task UploadFileStream(byte[] compressedFile, string fileHash, bool munged, bool postProgress, CancellationToken uploadToken)
{
    if (munged)
    {
        FileDownloadManager.MungeBuffer(compressedFile.AsSpan());
    }

    using var ms = new MemoryStream(compressedFile);

    Progress<UploadProgress>? progressReporter = !postProgress ? null : new(update =>
    {
        try
        {
            lock (_currentUploadsLock)
            {
                if (_currentUploadsByHash.TryGetValue(fileHash, out var transfer))
                {
                    transfer.Transferred = update.Uploaded;
                }
                else
                {
                    // Happens when the tracked uploads were cleared while this upload was still reporting.
                    Logger.LogDebug("[{hash}] Could not find upload transfer during progress update", fileHash);
                }
            }
        }
        catch (Exception ex)
        {
            Logger.LogWarning(ex, "[{hash}] Could not set upload progress", fileHash);
        }
    });

    var streamContent = new ProgressableStreamContent(ms, progressReporter);
    streamContent.Headers.ContentType = new MediaTypeHeaderValue("application/octet-stream");

    // Dispose the response after logging so its resources are released promptly.
    using HttpResponseMessage response = munged
        ? await _orchestrator.SendRequestStreamAsync(HttpMethod.Post, LightlessFiles.ServerFilesUploadMunged(_orchestrator.FilesCdnUri!, fileHash), streamContent, uploadToken).ConfigureAwait(false)
        : await _orchestrator.SendRequestStreamAsync(HttpMethod.Post, LightlessFiles.ServerFilesUploadFullPath(_orchestrator.FilesCdnUri!, fileHash), streamContent, uploadToken).ConfigureAwait(false);

    Logger.LogDebug("[{hash}] Upload Status: {status}", fileHash, response.StatusCode);
}
/// <summary>
/// Asks the server which of the given hashes it wants, registers and uploads those in parallel, records
/// forbidden hashes, and finally marks every checked hash that did not need an upload as verified.
/// </summary>
/// <param name="unverifiedUploadHashes">Hashes to check; hashes without a local cache entry are dropped first.</param>
/// <param name="visiblePlayers">Users whose UIDs are sent along with the request.</param>
/// <param name="uploadToken">Cancels the request and the uploads.</param>
private async Task UploadUnverifiedFiles(HashSet<string> unverifiedUploadHashes, List<UserData> visiblePlayers, CancellationToken uploadToken)
{
    unverifiedUploadHashes = unverifiedUploadHashes.Where(h => _fileDbManager.GetFileCacheByHash(h) != null).ToHashSet(StringComparer.Ordinal);
    Logger.LogDebug("Verifying {count} files", unverifiedUploadHashes.Count);
    var filesToUpload = await FilesSend([.. unverifiedUploadHashes], visiblePlayers.Select(p => p.UID).ToList(), uploadToken).ConfigureAwait(false);

    // Register one tracked transfer per distinct, non-forbidden hash returned by the server.
    foreach (var file in filesToUpload.Where(f => !f.IsForbidden).DistinctBy(f => f.Hash))
    {
        try
        {
            var uploadTransfer = new UploadFileTransfer(file)
            {
                // Size of the file on disk for now; replaced by the compressed size once it is known.
                Total = new FileInfo(_fileDbManager.GetFileCacheByHash(file.Hash)!.ResolvedFilepath).Length,
            };
            lock (_currentUploadsLock)
            {
                CurrentUploads.Add(uploadTransfer);
                _currentUploadsByHash[file.Hash] = uploadTransfer;
            }
        }
        catch (Exception ex)
        {
            Logger.LogWarning(ex, "Tried to request file {hash} but file was not present", file.Hash);
        }
    }

    // Remember forbidden files and mark them as verified so they are not re-checked on every call.
    foreach (var file in filesToUpload.Where(c => c.IsForbidden))
    {
        if (_orchestrator.ForbiddenTransfers.TrueForAll(f => !string.Equals(f.Hash, file.Hash, StringComparison.Ordinal)))
        {
            _orchestrator.ForbiddenTransfers.Add(new UploadFileTransfer(file)
            {
                LocalFile = _fileDbManager.GetFileCacheByHash(file.Hash)?.ResolvedFilepath ?? string.Empty,
            });
        }

        _verifiedUploadedHashes[file.Hash] = DateTime.UtcNow;
    }

    long totalSize;
    List<FileTransfer> pendingUploads;
    lock (_currentUploadsLock)
    {
        totalSize = CurrentUploads.Sum(c => c.Total);
        pendingUploads = CurrentUploads.Where(f => f.CanBeTransferred && !f.IsTransferred).ToList();
    }

    var parallelUploads = Math.Clamp(_lightlessConfigService.Current.ParallelUploads, 1, 8);
    using SemaphoreSlim uploadSlots = new(parallelUploads, parallelUploads);
    Logger.LogDebug("Compressing and uploading files");
    List<Task> uploadTasks = new(pendingUploads.Count);
    foreach (var transfer in pendingUploads)
    {
        uploadTasks.Add(UploadPendingFileAsync(transfer, uploadSlots, uploadToken));
    }

    await Task.WhenAll(uploadTasks).ConfigureAwait(false);

    long compressedSize;
    HashSet<string> uploadedHashes;
    bool cancelled;
    lock (_currentUploadsLock)
    {
        // UploadFile logs and swallows cancellation, so the tasks above can all complete normally
        // after CancelUpload()/Reset(). In that case the tracked uploads were already cleared and may
        // even belong to a newer upload by now: do not mark anything as verified and do not clear
        // state this call no longer owns. CancelUpload()/Reset() cancel the token before taking this
        // lock, so checking the token while holding the lock is sufficient.
        cancelled = uploadToken.IsCancellationRequested;
        if (!cancelled)
        {
            compressedSize = CurrentUploads.Sum(c => c.Total);
            uploadedHashes = CurrentUploads.Select(u => u.Hash).ToHashSet(StringComparer.Ordinal);
            CurrentUploads.Clear();
            _currentUploadsByHash.Clear();
        }
        else
        {
            compressedSize = 0;
            uploadedHashes = new(StringComparer.Ordinal);
        }
    }

    if (cancelled)
    {
        Logger.LogDebug("Upload was cancelled, skipping verification bookkeeping");
        return;
    }

    Logger.LogDebug("Upload complete, compressed {size} to {compressed}", UiSharedService.ByteToString(totalSize), UiSharedService.ByteToString(compressedSize));

    // Hashes that were checked but never tracked as an upload are marked as verified here;
    // uploaded hashes are marked by UploadFile when the upload succeeds.
    foreach (var file in unverifiedUploadHashes.Where(c => !uploadedHashes.Contains(c)))
    {
        _verifiedUploadedHashes[file] = DateTime.UtcNow;
    }

    // Waits for a free upload slot, compresses the file, updates the tracked size and uploads it.
    async Task UploadPendingFileAsync(FileTransfer transfer, SemaphoreSlim gate, CancellationToken token)
    {
        await gate.WaitAsync(token).ConfigureAwait(false);
        try
        {
            token.ThrowIfCancellationRequested();
            Logger.LogDebug("[{hash}] Compressing", transfer.Hash);
            var data = await _fileDbManager.GetCompressedFileData(transfer.Hash, token).ConfigureAwait(false);
            lock (_currentUploadsLock)
            {
                if (_currentUploadsByHash.TryGetValue(data.Item1, out var trackedUpload))
                {
                    trackedUpload.Total = data.Item2.Length;
                }
            }

            var cacheEntry = _fileDbManager.GetFileCacheByHash(data.Item1);
            if (cacheEntry != null)
            {
                Logger.LogDebug("[{hash}] Starting upload for {filePath}", data.Item1, cacheEntry.ResolvedFilepath);
            }

            await UploadFile(data.Item2, transfer.Hash, true, token).ConfigureAwait(false);
        }
        finally
        {
            gate.Release();
        }
    }
}
}