From 557121a9b7461e71c02fa345ac1f5362fabb2ac6 Mon Sep 17 00:00:00 2001 From: cake Date: Fri, 7 Nov 2025 05:27:58 +0100 Subject: [PATCH 1/2] Added batching for the File Frag command for the iscompressed calls. --- LightlessSync/FileCache/FileCompactor.cs | 491 +++++++++++------- .../Compression/BatchFileFragService.cs | 245 +++++++++ 2 files changed, 546 insertions(+), 190 deletions(-) create mode 100644 LightlessSync/Services/Compression/BatchFileFragService.cs diff --git a/LightlessSync/FileCache/FileCompactor.cs b/LightlessSync/FileCache/FileCompactor.cs index 4722b1f..e6505b9 100644 --- a/LightlessSync/FileCache/FileCompactor.cs +++ b/LightlessSync/FileCache/FileCompactor.cs @@ -1,5 +1,6 @@ using LightlessSync.LightlessConfiguration; using LightlessSync.Services; +using LightlessSync.Services.Compression; using Microsoft.Extensions.Logging; using Microsoft.Win32.SafeHandles; using System.Collections.Concurrent; @@ -23,8 +24,12 @@ public sealed class FileCompactor : IDisposable private readonly Channel _compactionQueue; private readonly CancellationTokenSource _compactionCts = new(); - private readonly Task _compactionWorker; - + + private readonly List _workers = []; + private readonly SemaphoreSlim _globalGate; + private static readonly SemaphoreSlim _btrfsGate = new(4, 4); + private readonly BatchFilefragService _fragBatch; + private readonly WOF_FILE_COMPRESSION_INFO_V1 _efInfo = new() { Algorithm = (int)CompressionAlgorithm.XPRESS8K, @@ -57,11 +62,30 @@ public sealed class FileCompactor : IDisposable _compactionQueue = Channel.CreateUnbounded(new UnboundedChannelOptions { - SingleReader = true, + SingleReader = false, SingleWriter = false }); - _compactionWorker = Task.Factory.StartNew(() => ProcessQueueAsync(_compactionCts.Token), _compactionCts.Token, TaskCreationOptions.LongRunning,TaskScheduler.Default).Unwrap(); + int workers = Math.Clamp(Math.Min(Environment.ProcessorCount / 2, 4), 1, 8); + _globalGate = new SemaphoreSlim(workers, workers); + int workerCount = Math.Max(workers * 2, workers); + + for (int i = 0; i < workerCount; i++) + { + _workers.Add(Task.Factory.StartNew( + () => ProcessQueueWorkerAsync(_compactionCts.Token), + _compactionCts.Token, + TaskCreationOptions.LongRunning, + TaskScheduler.Default).Unwrap()); + } + + _fragBatch = new BatchFilefragService( + useShell: _dalamudUtilService.IsWine, + log: _logger, + batchSize: 128, + flushMs: 25); + + _logger.LogInformation("FileCompactor started with {workers} workers", workerCount); } public bool MassCompactRunning { get; private set; } @@ -171,18 +195,12 @@ public sealed class FileCompactor : IDisposable bool isWine = _dalamudUtilService?.IsWine ?? false; string realPath = isWine ? ToLinuxPathIfWine(fileInfo.FullName, isWine) : fileInfo.FullName; - var fileName = "stat"; - var arguments = $"-c %b \"{realPath}\""; + (bool ok, string stdout, string stderr, int code) = + RunProcessDirect("stat", ["-c", "%b", realPath]); - (bool processControl, bool success) = StartProcessInfo(realPath, fileName, arguments, out Process? proc, out string stdout); + if (!ok || !long.TryParse(stdout.Trim(), out var blocks)) + throw new InvalidOperationException($"stat failed (exit {code}): {stderr}"); - if (!processControl && !success) - throw new InvalidOperationException($"stat failed: {proc}"); - - if (!long.TryParse(stdout.Trim(), out var blocks)) - throw new InvalidOperationException($"invalid stat output: {stdout}"); - - // st_blocks are always 512-byte on Linux enviroment. return (flowControl: false, value: blocks * 512L); } catch (Exception ex) @@ -224,18 +242,22 @@ public sealed class FileCompactor : IDisposable var fi = new FileInfo(filePath); if (!fi.Exists) { - _logger.LogTrace("Skip compact: missing {file}", filePath); + _logger.LogTrace("Skip compaction: missing {file}", filePath); return; } var fsType = GetFilesystemType(filePath, _dalamudUtilService.IsWine); - _logger.LogTrace("Detected filesystem {fs} for {file} (isWine={wine})", fsType, filePath, _dalamudUtilService.IsWine); var oldSize = fi.Length; - int blockSize = GetBlockSizeForPath(fi.FullName, _logger, _dalamudUtilService.IsWine); - if (oldSize < Math.Max(blockSize, 8 * 1024)) + + // We skipping small files (128KiB) as they slow down the system a lot for BTRFS. as BTRFS has a different blocksize it requires an different calculation. + long minSizeBytes = fsType == FilesystemType.Btrfs + ? Math.Max(blockSize * 2L, 128 * 1024L) + : Math.Max(blockSize, 8 * 1024L); + + if (oldSize < minSizeBytes) { - _logger.LogTrace("Skip compact: {file} < block {block}", filePath, blockSize); + _logger.LogTrace("Skip compaction: {file} ({size} B) < threshold ({th} B)", filePath, oldSize, minSizeBytes); return; } @@ -243,7 +265,7 @@ public sealed class FileCompactor : IDisposable { if (!IsWOFCompactedFile(filePath)) { - _logger.LogDebug("NTFS compact XPRESS8K: {file}", filePath); + _logger.LogDebug("NTFS compaction XPRESS8K: {file}", filePath); if (WOFCompressFile(filePath)) { var newSize = GetFileSizeOnDisk(fi); @@ -265,7 +287,7 @@ public sealed class FileCompactor : IDisposable { if (!IsBtrfsCompressedFile(filePath)) { - _logger.LogDebug("Btrfs compress zstd: {file}", filePath); + _logger.LogDebug("Btrfs compression zstd: {file}", filePath); if (BtrfsCompressFile(filePath)) { var newSize = GetFileSizeOnDisk(fi); @@ -299,7 +321,7 @@ public sealed class FileCompactor : IDisposable { try { - bool flowControl = DecompressWOFFile(path, out FileStream fs); + bool flowControl = DecompressWOFFile(path); if (!flowControl) { return; @@ -335,10 +357,9 @@ public sealed class FileCompactor : IDisposable /// Decompressing state private bool DecompressBtrfsFile(string path) { - var fs = new FileStream(path, FileMode.Open, FileAccess.ReadWrite, FileShare.Read); - try { + _btrfsGate.Wait(_compactionCts.Token); bool isWine = _dalamudUtilService?.IsWine ?? false; string realPath = isWine ? ToLinuxPathIfWine(path, isWine) : path; @@ -358,27 +379,25 @@ public sealed class FileCompactor : IDisposable return true; } - (bool flowControl, bool value) = FileStreamOpening(realPath, ref fs); + if (!ProbeFileReadable(realPath)) + return false; - if (!flowControl) + (bool ok, string stdout, string stderr, int code) = + isWine + ? RunProcessShell($"btrfs filesystem defragment -- {QuoteSingle(realPath)}") + : RunProcessDirect("btrfs", ["filesystem", "defragment", "--", realPath]); + + if (!ok) { - return value; - } - - string fileName = isWine ? "/bin/bash" : "btrfs"; - string command = isWine ? $"-c \"filesystem defragment -- \"{realPath}\"\"" : $"filesystem defragment -- \"{realPath}\""; - - (bool processControl, bool success) = StartProcessInfo(realPath, fileName, command, out Process? proc, out string stdout); - if (!processControl && !success) - { - return value; + _logger.LogWarning("btrfs defragment (decompress) failed for {file} (exit {code}): {stderr}", + realPath, code, stderr); + return false; } if (!string.IsNullOrWhiteSpace(stdout)) _logger.LogTrace("btrfs defragment output for {file}: {stdout}", realPath, stdout.Trim()); - _logger.LogInformation("Decompressed btrfs file successfully: {file}", realPath); - + _logger.LogInformation("Decompressed (rewritten) Btrfs file: {file}", realPath); return true; } catch (Exception ex) @@ -386,6 +405,11 @@ public sealed class FileCompactor : IDisposable _logger.LogWarning(ex, "Error rewriting {file} for Btrfs decompression", path); return false; } + finally + { + if (_btrfsGate.CurrentCount < 4) + _btrfsGate.Release(); + } } /// @@ -393,38 +417,40 @@ public sealed class FileCompactor : IDisposable /// /// Path of the compressed file /// Decompressing state - private bool DecompressWOFFile(string path, out FileStream fs) + private bool DecompressWOFFile(string path) { - fs = new FileStream(path, FileMode.Open, FileAccess.ReadWrite, FileShare.Read); - var handle = fs.SafeFileHandle; - - if (handle.IsInvalid) + if (TryIsWofExternal(path, out bool isExternal, out int algo)) { - _logger.LogWarning("Invalid handle: {file}", path); - return false; + if (!isExternal) + { + _logger.LogTrace("Already decompressed file: {file}", path); + return true; + } + var compressString = ((CompressionAlgorithm)algo).ToString(); + _logger.LogTrace("WOF compression (algo={algo}) detected for {file}", compressString, path); } - if (!DeviceIoControl(handle, FSCTL_DELETE_EXTERNAL_BACKING, - IntPtr.Zero, 0, IntPtr.Zero, 0, - out _, IntPtr.Zero)) + return WithFileHandleForWOF(path, FileAccess.ReadWrite, h => { - int err = Marshal.GetLastWin32Error(); + if (!DeviceIoControl(h, FSCTL_DELETE_EXTERNAL_BACKING, + IntPtr.Zero, 0, IntPtr.Zero, 0, + out uint _, IntPtr.Zero)) + { + int err = Marshal.GetLastWin32Error(); + // 342 error code means its been decompressed after the control, we handle it as it succesfully been decompressed. + if (err == 342) + { + _logger.LogTrace("Successfully decompressed NTFS file {file}", path); + return true; + } - if (err == 342) - { - _logger.LogTrace("File {file} not externally backed (already decompressed)", path); - } - else - { _logger.LogWarning("DeviceIoControl failed for {file} with Win32 error {err}", path, err); + return false; } - } - else - { - _logger.LogTrace("Successfully decompressed NTFS file {file}", path); - } - return true; + _logger.LogTrace("Successfully decompressed NTFS file {file}", path); + return true; + }); } /// @@ -455,7 +481,6 @@ public sealed class FileCompactor : IDisposable /// Compessing state private bool WOFCompressFile(string path) { - FileStream? fs = null; int size = Marshal.SizeOf(); IntPtr efInfoPtr = Marshal.AllocHGlobal(size); @@ -464,46 +489,28 @@ public sealed class FileCompactor : IDisposable Marshal.StructureToPtr(_efInfo, efInfoPtr, fDeleteOld: false); ulong length = (ulong)size; - (bool flowControl, bool value) = FileStreamOpening(path, ref fs); - - if (!flowControl) + return WithFileHandleForWOF(path, FileAccess.ReadWrite, h => { - return value; - } + int ret = WofSetFileDataLocation(h, WOF_PROVIDER_FILE, efInfoPtr, length); - if (fs == null) - { - _logger.LogWarning("Failed to open {file} for compression; skipping", path); - return false; - } + // 0x80070158 is the benign "already compressed/unsupported" style return + if (ret != 0 && ret != unchecked((int)0x80070158)) + { + _logger.LogWarning("Failed to compact {file}: {ret}", path, ret.ToString("X")); + return false; + } - var handle = fs.SafeFileHandle; - - if (handle.IsInvalid) - { - _logger.LogWarning("Invalid file handle for {file}", path); - return false; - } - - int ret = WofSetFileDataLocation(handle, WOF_PROVIDER_FILE, efInfoPtr, length); - - // 0x80070158 is WOF error whenever compression fails in an non-fatal way. - if (ret != 0 && ret != unchecked((int)0x80070158)) - { - _logger.LogWarning("Failed to compact {file}: {ret}", path, ret.ToString("X")); - return false; - } - - return true; + return true; + }); } catch (DllNotFoundException ex) { - _logger.LogTrace(ex, "WofUtil.dll not available, this DLL is needed for compression; skipping NTFS compaction for {file}", path); + _logger.LogTrace(ex, "WofUtil not available; skipping NTFS compaction for {file}", path); return false; } catch (EntryPointNotFoundException ex) { - _logger.LogTrace(ex, "WOF entrypoint missing (Wine/older OS); skipping NTFS compaction for {file}", path); + _logger.LogTrace(ex, "WOF entrypoint missing on this system (Wine/older OS); skipping NTFS compaction for {file}", path); return false; } catch (Exception ex) @@ -513,12 +520,8 @@ public sealed class FileCompactor : IDisposable } finally { - fs?.Dispose(); - if (efInfoPtr != IntPtr.Zero) - { Marshal.FreeHGlobal(efInfoPtr); - } } } @@ -549,6 +552,36 @@ public sealed class FileCompactor : IDisposable } } + /// + /// Checks if an File is compacted any WOF compression with an WOF backing + /// + /// Path of the file + /// State of the file, if its an external (no backing) and which algorithm if detected + private static bool TryIsWofExternal(string path, out bool isExternal, out int algorithm) + { + isExternal = false; + algorithm = 0; + try + { + uint buf = (uint)Marshal.SizeOf(); + int hr = WofIsExternalFile(path, out int ext, out _, out var info, ref buf); + if (hr == 0 && ext != 0) + { + isExternal = true; + algorithm = info.Algorithm; + } + return true; + } + catch (DllNotFoundException) + { + return false; + } + catch (EntryPointNotFoundException) + { + return false; + } + } + /// /// Checks if an File is compacted with Btrfs compression /// @@ -558,34 +591,23 @@ public sealed class FileCompactor : IDisposable { try { + _btrfsGate.Wait(_compactionCts.Token); + bool isWine = _dalamudUtilService?.IsWine ?? false; string realPath = isWine ? ToLinuxPathIfWine(path, isWine) : path; - var fi = new FileInfo(realPath); - - if (fi == null) - { - _logger.LogWarning("Failed to open {file} for checking on compression; skipping", realPath); - return false; - } - - string fileName = isWine ? "/bin/bash" : "filefrag"; - string command = isWine ? $"-c \"filefrag -v '{EscapeSingle(realPath)}'\"" : $"-v \"{realPath}\""; - (bool processControl, bool success) = StartProcessInfo(realPath, fileName, command, out Process? proc, out string stdout); - if (!processControl && !success) - { - return success; - } - - bool compressed = stdout.Contains("flags: compressed", StringComparison.OrdinalIgnoreCase); - _logger.LogTrace("Btrfs compression check for {file}: {compressed}", realPath, compressed); - return compressed; + return _fragBatch.IsCompressedAsync(realPath, _compactionCts.Token).GetAwaiter().GetResult(); } catch (Exception ex) { _logger.LogDebug(ex, "Failed to detect Btrfs compression for {file}", path); return false; } + finally + { + if (_btrfsGate.CurrentCount < 4) + _btrfsGate.Release(); + } } /// @@ -595,8 +617,6 @@ public sealed class FileCompactor : IDisposable /// Compessing state private bool BtrfsCompressFile(string path) { - FileStream? fs = null; - try { bool isWine = _dalamudUtilService?.IsWine ?? false; @@ -616,21 +636,22 @@ public sealed class FileCompactor : IDisposable return true; } - (bool flowControl, bool value) = FileStreamOpening(realPath, ref fs); + if (!ProbeFileReadable(realPath)) + return false; - if (!flowControl) + (bool ok, string stdout, string stderr, int code) = + isWine + ? RunProcessShell($"btrfs filesystem defragment -clzo -- {QuoteSingle(realPath)}") + : RunProcessDirect("btrfs", ["filesystem", "defragment", "-clzo", "--", realPath]); + + if (!ok) { - return value; + _logger.LogWarning("btrfs defragment failed for {file} (exit {code}): {stderr}", realPath, code, stderr); + return false; } - string fileName = isWine ? "/bin/bash" : "btrfs"; - string command = isWine ? $"-c \"btrfs filesystem defragment -czstd:1 -- \"{realPath}\"\"" : $"btrfs filesystem defragment -czstd:1 -- \"{realPath}\""; - - (bool processControl, bool success) = StartProcessInfo(realPath, fileName, command, out Process? proc, out string stdout); - if (!processControl && !success) - { - return value; - } + if (!string.IsNullOrWhiteSpace(stdout)) + _logger.LogTrace("btrfs output for {file}: {stdout}", realPath, stdout.Trim()); if (!string.IsNullOrWhiteSpace(stdout)) _logger.LogTrace("btrfs defragment output for {file}: {stdout}", realPath, stdout.Trim()); @@ -648,82 +669,171 @@ public sealed class FileCompactor : IDisposable /// - /// Trying opening file stream in certain amount of tries. + /// Probe file if its readable for certain amount of tries. /// /// Path where the file is located /// Filestream used for the function /// State of the filestream opening - private (bool flowControl, bool value) FileStreamOpening(string path, ref FileStream? fs) + private bool ProbeFileReadable(string path) { for (int attempt = 0; attempt < _maxRetries; attempt++) { try { - fs = new FileStream(path, FileMode.Open, FileAccess.ReadWrite, FileShare.Read); - break; + using var _ = new FileStream(path, FileMode.Open, FileAccess.Read, FileShare.ReadWrite | FileShare.Delete); + return true; } - catch (IOException) + catch (IOException ex) { if (attempt == _maxRetries - 1) { - _logger.LogWarning("File still in use after {attempts} attempts, skipping compression for {file}", _maxRetries, path); - return (flowControl: false, value: false); + _logger.LogWarning(ex, "File still in use after {attempts} attempts, skipping {file}", _maxRetries, path); + return false; + } + int delay = 150 * (attempt + 1); + _logger.LogTrace(ex, "File busy, retrying in {delay}ms for {file}", delay, path); + Thread.Sleep(delay); + } + } + return false; + } + + /// + /// Attempt opening file stream for WOF functions + /// + /// File that has to be accessed + /// Permissions for the file + /// Access of the file stream for the WOF function to handle. + /// State of the attempt for the file + private bool WithFileHandleForWOF(string path, FileAccess access, Func body) + { + const FileShare share = FileShare.ReadWrite | FileShare.Delete; + + for (int attempt = 0; attempt < _maxRetries; attempt++) + { + try + { + using var fs = new FileStream(path, FileMode.Open, access, share); + + var handle = fs.SafeFileHandle; + if (handle.IsInvalid) + { + _logger.LogWarning("Invalid file handle for {file}", path); + return false; + } + + return body(handle); + } + catch (IOException ex) + { + if (attempt == _maxRetries - 1) + { + _logger.LogWarning(ex, "File still in use after {attempts} attempts, skipping {file}", _maxRetries, path); + return false; } int delay = 150 * (attempt + 1); - _logger.LogTrace("File in use, retrying in {delay}ms for {file}", delay, path); + _logger.LogTrace(ex, "File busy, retrying in {delay}ms for {file}", delay, path); Thread.Sleep(delay); } } - return (flowControl: true, value: default); + return false; } /// - /// Starts an process with given Filename and Arguments + /// Runs an nonshell process meant for Linux/Wine enviroments /// - /// Path you want to use for the process (Compression is using these) - /// File of the command - /// Arguments used for the command - /// Returns process of the given command - /// Returns output of the given command - /// Returns if the process been done succesfully or not - private (bool processControl, bool success) StartProcessInfo(string path, string fileName, string arguments, out Process? proc, out string stdout) + /// File that has to be excuted + /// Arguments meant for the file/command + /// Working directory used to execute the file with/without arguments + /// Timeout timer for the process + /// State of the process, output of the process and error with exit code + private (bool ok, string stdout, string stderr, int exitCode) RunProcessDirect(string fileName, IEnumerable args, string? workingDir = null, int timeoutMs = 60000) { - var psi = new ProcessStartInfo + var psi = new ProcessStartInfo(fileName) { - FileName = fileName, - Arguments = arguments, RedirectStandardOutput = true, RedirectStandardError = true, UseShellExecute = false, - CreateNoWindow = true, - WorkingDirectory = "/" + CreateNoWindow = true }; - proc = Process.Start(psi); + if (!string.IsNullOrEmpty(workingDir)) psi.WorkingDirectory = workingDir; - if (proc == null) + foreach (var a in args) psi.ArgumentList.Add(a); + + using var proc = Process.Start(psi); + if (proc is null) return (false, "", "failed to start process", -1); + + var outTask = proc.StandardOutput.ReadToEndAsync(_compactionCts.Token); + var errTask = proc.StandardError.ReadToEndAsync(_compactionCts.Token); + + if (!proc.WaitForExit(timeoutMs)) { - _logger.LogWarning("Failed to start {arguments} for {file}", arguments, path); - stdout = string.Empty; - return (processControl: false, success: false); + try + { + proc.Kill(entireProcessTree: true); + } + catch + { + // Ignore this catch on the dispose + } + + Task.WaitAll([outTask, errTask], 1000, _compactionCts.Token); + return (false, outTask.Result, "timeout", -1); } - stdout = proc.StandardOutput.ReadToEnd(); - string stderr = proc.StandardError.ReadToEnd(); - proc.WaitForExit(); - - if (proc.ExitCode != 0 && !string.IsNullOrWhiteSpace(stderr)) - { - _logger.LogTrace("{arguments} exited with code {code}: {stderr}", arguments, proc.ExitCode, stderr); - return (processControl: false, success: false); - } - - return (processControl: true, success: default); + Task.WaitAll(outTask, errTask); + return (proc.ExitCode == 0, outTask.Result, errTask.Result, proc.ExitCode); } - private static string EscapeSingle(string p) => p.Replace("'", "'\\'", StringComparison.Ordinal); + /// + /// Runs an shell using '/bin/bash'/ command meant for Linux/Wine enviroments + /// + /// Command that has to be excuted + /// Timeout timer for the process + /// State of the process, output of the process and error with exit code + private (bool ok, string stdout, string stderr, int exitCode) RunProcessShell(string command, int timeoutMs = 60000) + { + var psi = new ProcessStartInfo("/bin/bash") + { + RedirectStandardOutput = true, + RedirectStandardError = true, + UseShellExecute = false, + CreateNoWindow = true + }; + psi.ArgumentList.Add("-c"); + psi.ArgumentList.Add(command); + using var proc = Process.Start(psi); + if (proc is null) return (false, "", "failed to start /bin/bash", -1); + + var outTask = proc.StandardOutput.ReadToEndAsync(_compactionCts.Token); + var errTask = proc.StandardError.ReadToEndAsync(_compactionCts.Token); + + if (!proc.WaitForExit(timeoutMs)) + { + try + { + proc.Kill(entireProcessTree: true); + } + catch + { + // Ignore this catch on the dispose + } + + Task.WaitAll([outTask, errTask], 1000, _compactionCts.Token); + return (false, outTask.Result, "timeout", -1); + } + + Task.WaitAll(outTask, errTask); + return (proc.ExitCode == 0, outTask.Result, errTask.Result, proc.ExitCode); + } + + /// + /// Enqueues the compaction/decompation of an filepath. + /// + /// Filepath that will be enqueued private void EnqueueCompaction(string filePath) { // Safe-checks @@ -759,9 +869,10 @@ public sealed class FileCompactor : IDisposable return; } + // Channel got closed, skip enqueue on file if (!_compactionQueue.Writer.TryWrite(filePath)) { - _logger.LogTrace("Skip enqueue: compaction channel is closed {file}", filePath); + _logger.LogTrace("Skip enqueue: compaction channel is/got closed {file}", filePath); return; } @@ -775,7 +886,11 @@ public sealed class FileCompactor : IDisposable } } - private async Task ProcessQueueAsync(CancellationToken token) + /// + /// Process the queue with, meant for a worker/thread + /// + /// Cancellation token for the worker whenever it needs to be stopped + private async Task ProcessQueueWorkerAsync(CancellationToken token) { try { @@ -785,28 +900,20 @@ public sealed class FileCompactor : IDisposable { try { - if (token.IsCancellationRequested) - { - return; - } + token.ThrowIfCancellationRequested(); + await _globalGate.WaitAsync(token).ConfigureAwait(false); - if (!_lightlessConfigService.Current.UseCompactor) + try { - continue; + if (_lightlessConfigService.Current.UseCompactor && File.Exists(filePath)) + CompactFile(filePath); } - - if (!File.Exists(filePath)) + finally { - _logger.LogTrace("Skip compact (missing) {file}", filePath); - continue; + _globalGate.Release(); } - - CompactFile(filePath); - } - catch (OperationCanceledException) - { - return; } + catch (OperationCanceledException) { return; } catch (Exception ex) { _logger.LogWarning(ex, "Error compacting file {file}", filePath); @@ -818,9 +925,9 @@ public sealed class FileCompactor : IDisposable } } } - catch (OperationCanceledException) - { - _logger.LogDebug("Compaction queue cancelled"); + catch (OperationCanceledException) + { + // Shutting down worker, this exception is expected } } @@ -836,17 +943,21 @@ public sealed class FileCompactor : IDisposable [DllImport("WofUtil.dll", SetLastError = true)] private static extern int WofSetFileDataLocation(SafeFileHandle FileHandle, ulong Provider, IntPtr ExternalFileInfo, ulong Length); + private static string QuoteSingle(string s) => "'" + s.Replace("'", "'\\''", StringComparison.Ordinal) + "'"; + public void Dispose() { + _fragBatch?.Dispose() _compactionQueue.Writer.TryComplete(); _compactionCts.Cancel(); + try { - _compactionWorker.Wait(TimeSpan.FromSeconds(5)); + Task.WaitAll([.. _workers.Where(t => t != null)], TimeSpan.FromSeconds(5)); } catch { - //ignore on catch ^^ + // Ignore this catch on the dispose } finally { diff --git a/LightlessSync/Services/Compression/BatchFileFragService.cs b/LightlessSync/Services/Compression/BatchFileFragService.cs new file mode 100644 index 0000000..ae5bb71 --- /dev/null +++ b/LightlessSync/Services/Compression/BatchFileFragService.cs @@ -0,0 +1,245 @@ +using Microsoft.Extensions.Logging; +using System.Diagnostics; +using System.Text.RegularExpressions; +using System.Threading.Channels; + +namespace LightlessSync.Services.Compression +{ + /// + /// This batch service is made for the File Frag command, because of each file needing to use this command. + /// It's better to combine into one big command in batches then doing each command on each compressed call. + /// + public sealed partial class BatchFilefragService : IDisposable + { + private readonly Channel<(string path, TaskCompletionSource tcs)> _ch; + private readonly Task _worker; + private readonly bool _useShell; + private readonly ILogger _log; + private readonly int _batchSize; + private readonly TimeSpan _flushDelay; + private readonly CancellationTokenSource _cts = new(); + + public BatchFilefragService(bool useShell, ILogger log, int batchSize = 128, int flushMs = 25) + { + _useShell = useShell; + _log = log; + _batchSize = Math.Max(8, batchSize); + _flushDelay = TimeSpan.FromMilliseconds(Math.Max(5, flushMs)); + _ch = Channel.CreateUnbounded<(string, TaskCompletionSource)>(new UnboundedChannelOptions { SingleReader = true, SingleWriter = false }); + _worker = Task.Run(ProcessAsync, _cts.Token); + } + + /// + /// Checks if the file is compressed using Btrfs using tasks + /// + /// Linux/Wine path given for the file. + /// Cancellation Token + /// If it was compressed or not + public Task IsCompressedAsync(string linuxPath, CancellationToken ct = default) + { + var tcs = new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously); + + if (!_ch.Writer.TryWrite((linuxPath, tcs))) + { + tcs.TrySetResult(false); + return tcs.Task; + } + + if (ct.CanBeCanceled) + { + var reg = ct.Register(() => tcs.TrySetCanceled(ct)); + _ = tcs.Task.ContinueWith(_ => reg.Dispose(), CancellationToken.None, TaskContinuationOptions.ExecuteSynchronously, TaskScheduler.Default); + } + + return tcs.Task; + } + + /// + /// Process the pending compression tasks asynchronously + /// + /// Task + private async Task ProcessAsync() + { + var reader = _ch.Reader; + var pending = new List<(string path, TaskCompletionSource tcs)>(_batchSize); + + try + { + while (await reader.WaitToReadAsync(_cts.Token).ConfigureAwait(false)) + { + if (!reader.TryRead(out var first)) continue; + pending.Add(first); + + var flushAt = DateTime.UtcNow + _flushDelay; + while (pending.Count < _batchSize && DateTime.UtcNow < flushAt) + { + if (reader.TryRead(out var item)) + { + pending.Add(item); + continue; + } + + if ((flushAt - DateTime.UtcNow) <= TimeSpan.Zero) break; + try + { + await Task.Delay(TimeSpan.FromMilliseconds(5), _cts.Token).ConfigureAwait(false); + } + catch + { + break; + } + } + + try + { + var map = await RunBatchAsync(pending.Select(p => p.path)).ConfigureAwait(false); + foreach (var (path, tcs) in pending) + { + tcs.TrySetResult(map.TryGetValue(path, out var c) && c); + } + } + catch (Exception ex) + { + _log.LogDebug(ex, "filefrag batch failed. falling back to false"); + foreach (var (_, tcs) in pending) + { + tcs.TrySetResult(false); + } + } + finally + { + pending.Clear(); + } + } + } + catch (OperationCanceledException) + { + //Shutting down worker, exception called + } + } + + /// + /// Running the batch of each file in the queue in one file frag command. + /// + /// Paths that are needed for the command building for the batch return + /// Path of the file and if it went correctly + /// Failing to start filefrag on the system if this exception is found + private async Task> RunBatchAsync(IEnumerable paths) + { + var list = paths.Distinct(StringComparer.Ordinal).ToList(); + var result = list.ToDictionary(p => p, _ => false, StringComparer.Ordinal); + + ProcessStartInfo psi; + if (_useShell) + { + var inner = "filefrag -v -- " + string.Join(' ', list.Select(QuoteSingle)); + psi = new ProcessStartInfo + { + FileName = "/bin/bash", + Arguments = "-c " + QuoteDouble(inner), + RedirectStandardOutput = true, + RedirectStandardError = true, + UseShellExecute = false, + CreateNoWindow = true, + WorkingDirectory = "/" + }; + } + else + { + psi = new ProcessStartInfo + { + FileName = "filefrag", + RedirectStandardOutput = true, + RedirectStandardError = true, + UseShellExecute = false, + CreateNoWindow = true + }; + psi.ArgumentList.Add("-v"); + psi.ArgumentList.Add("--"); + foreach (var p in list) psi.ArgumentList.Add(p); + } + + using var proc = Process.Start(psi) ?? throw new InvalidOperationException("Failed to start filefrag"); + var stdoutTask = proc.StandardOutput.ReadToEndAsync(_cts.Token); + var stderrTask = proc.StandardError.ReadToEndAsync(_cts.Token); + await Task.WhenAll(stdoutTask, stderrTask).ConfigureAwait(false); + try + { + await proc.WaitForExitAsync(_cts.Token).ConfigureAwait(false); + } + catch (Exception ex) + { + _log.LogWarning(ex, "Error in the batch frag service. proc = {proc}", proc); + } + + var stdout = await stdoutTask.ConfigureAwait(false); + var stderr = await stderrTask.ConfigureAwait(false); + + if (proc.ExitCode != 0 && !string.IsNullOrWhiteSpace(stderr)) + _log.LogTrace("filefrag exited {code}: {err}", proc.ExitCode, stderr.Trim()); + + ParseFilefrag(stdout, result); + return result; + } + + /// + /// Parsing the string given from the File Frag command into mapping + /// + /// Output of the process from the File Frag + /// Mapping of the processed files + private static void ParseFilefrag(string output, Dictionary map) + { + var reHeaderColon = ColonRegex(); + var reHeaderSize = SizeRegex(); + + string? current = null; + using var sr = new StringReader(output); + for (string? line = sr.ReadLine(); line != null; line = sr.ReadLine()) + { + var m1 = reHeaderColon.Match(line); + if (m1.Success) { current = m1.Groups[1].Value; continue; } + + var m2 = reHeaderSize.Match(line); + if (m2.Success) { current = m2.Groups[1].Value; continue; } + + if (current is not null && line.Contains("flags:", StringComparison.OrdinalIgnoreCase) && + line.Contains("compressed", StringComparison.OrdinalIgnoreCase) && map.ContainsKey(current)) + { + map[current] = true; + } + } + } + + private static string QuoteSingle(string s) => "'" + s.Replace("'", "'\\''", StringComparison.Ordinal) + "'"; + private static string QuoteDouble(string s) => "\"" + s.Replace("\\", "\\\\", StringComparison.Ordinal).Replace("\"", "\\\"", StringComparison.Ordinal).Replace("$", "\\$", StringComparison.Ordinal).Replace("`", "\\`", StringComparison.Ordinal) + "\""; + + /// + /// Regex of the File Size return on the Linux/Wine systems, giving back the amount + /// + /// Regex of the File Size + [GeneratedRegex(@"^File size of (/.+?) is ", RegexOptions.ExplicitCapture | RegexOptions.CultureInvariant,matchTimeoutMilliseconds: 500)] + private static partial Regex SizeRegex(); + + /// + /// Regex on colons return on the Linux/Wine systems + /// + /// Regex of the colons in the given path + [GeneratedRegex(@"^(/.+?):\s", RegexOptions.ExplicitCapture | RegexOptions.CultureInvariant, matchTimeoutMilliseconds: 500)] + private static partial Regex ColonRegex(); + + public void Dispose() + { + _ch.Writer.TryComplete(); + _cts.Cancel(); + try + { + _worker.Wait(TimeSpan.FromSeconds(2), _cts.Token); + } + catch + { + // Ignore the catch in dispose + } + _cts.Dispose(); + } + } +} \ No newline at end of file From e9082ab8d0c5f79323eed1037ee396e05ab9c410 Mon Sep 17 00:00:00 2001 From: cake Date: Fri, 7 Nov 2025 06:07:34 +0100 Subject: [PATCH 2/2] forget semicolomn.. --- LightlessSync/FileCache/FileCompactor.cs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/LightlessSync/FileCache/FileCompactor.cs b/LightlessSync/FileCache/FileCompactor.cs index e6505b9..be89c1f 100644 --- a/LightlessSync/FileCache/FileCompactor.cs +++ b/LightlessSync/FileCache/FileCompactor.cs @@ -947,7 +947,7 @@ public sealed class FileCompactor : IDisposable public void Dispose() { - _fragBatch?.Dispose() + _fragBatch?.Dispose(); _compactionQueue.Writer.TryComplete(); _compactionCts.Cancel();