Merge pull request 'Some changes on the file compression for linux and windows regards threading.' (#83) from linux-improvements into 1.12.4

Reviewed-on: #83
This commit was merged in pull request #83.
This commit is contained in:
2025-11-09 06:11:34 +01:00
2 changed files with 546 additions and 190 deletions

View File

@@ -1,5 +1,6 @@
using LightlessSync.LightlessConfiguration; using LightlessSync.LightlessConfiguration;
using LightlessSync.Services; using LightlessSync.Services;
using LightlessSync.Services.Compression;
using Microsoft.Extensions.Logging; using Microsoft.Extensions.Logging;
using Microsoft.Win32.SafeHandles; using Microsoft.Win32.SafeHandles;
using System.Collections.Concurrent; using System.Collections.Concurrent;
@@ -23,8 +24,12 @@ public sealed class FileCompactor : IDisposable
private readonly Channel<string> _compactionQueue; private readonly Channel<string> _compactionQueue;
private readonly CancellationTokenSource _compactionCts = new(); private readonly CancellationTokenSource _compactionCts = new();
private readonly Task _compactionWorker;
private readonly List<Task> _workers = [];
private readonly SemaphoreSlim _globalGate;
private static readonly SemaphoreSlim _btrfsGate = new(4, 4);
private readonly BatchFilefragService _fragBatch;
private readonly WOF_FILE_COMPRESSION_INFO_V1 _efInfo = new() private readonly WOF_FILE_COMPRESSION_INFO_V1 _efInfo = new()
{ {
Algorithm = (int)CompressionAlgorithm.XPRESS8K, Algorithm = (int)CompressionAlgorithm.XPRESS8K,
@@ -57,11 +62,30 @@ public sealed class FileCompactor : IDisposable
_compactionQueue = Channel.CreateUnbounded<string>(new UnboundedChannelOptions _compactionQueue = Channel.CreateUnbounded<string>(new UnboundedChannelOptions
{ {
SingleReader = true, SingleReader = false,
SingleWriter = false SingleWriter = false
}); });
_compactionWorker = Task.Factory.StartNew(() => ProcessQueueAsync(_compactionCts.Token), _compactionCts.Token, TaskCreationOptions.LongRunning,TaskScheduler.Default).Unwrap(); int workers = Math.Clamp(Math.Min(Environment.ProcessorCount / 2, 4), 1, 8);
_globalGate = new SemaphoreSlim(workers, workers);
int workerCount = Math.Max(workers * 2, workers);
for (int i = 0; i < workerCount; i++)
{
_workers.Add(Task.Factory.StartNew(
() => ProcessQueueWorkerAsync(_compactionCts.Token),
_compactionCts.Token,
TaskCreationOptions.LongRunning,
TaskScheduler.Default).Unwrap());
}
_fragBatch = new BatchFilefragService(
useShell: _dalamudUtilService.IsWine,
log: _logger,
batchSize: 128,
flushMs: 25);
_logger.LogInformation("FileCompactor started with {workers} workers", workerCount);
} }
public bool MassCompactRunning { get; private set; } public bool MassCompactRunning { get; private set; }
@@ -171,18 +195,12 @@ public sealed class FileCompactor : IDisposable
bool isWine = _dalamudUtilService?.IsWine ?? false; bool isWine = _dalamudUtilService?.IsWine ?? false;
string realPath = isWine ? ToLinuxPathIfWine(fileInfo.FullName, isWine) : fileInfo.FullName; string realPath = isWine ? ToLinuxPathIfWine(fileInfo.FullName, isWine) : fileInfo.FullName;
var fileName = "stat"; (bool ok, string stdout, string stderr, int code) =
var arguments = $"-c %b \"{realPath}\""; RunProcessDirect("stat", ["-c", "%b", realPath]);
(bool processControl, bool success) = StartProcessInfo(realPath, fileName, arguments, out Process? proc, out string stdout); if (!ok || !long.TryParse(stdout.Trim(), out var blocks))
throw new InvalidOperationException($"stat failed (exit {code}): {stderr}");
if (!processControl && !success)
throw new InvalidOperationException($"stat failed: {proc}");
if (!long.TryParse(stdout.Trim(), out var blocks))
throw new InvalidOperationException($"invalid stat output: {stdout}");
// st_blocks are always 512-byte on Linux enviroment.
return (flowControl: false, value: blocks * 512L); return (flowControl: false, value: blocks * 512L);
} }
catch (Exception ex) catch (Exception ex)
@@ -224,18 +242,22 @@ public sealed class FileCompactor : IDisposable
var fi = new FileInfo(filePath); var fi = new FileInfo(filePath);
if (!fi.Exists) if (!fi.Exists)
{ {
_logger.LogTrace("Skip compact: missing {file}", filePath); _logger.LogTrace("Skip compaction: missing {file}", filePath);
return; return;
} }
var fsType = GetFilesystemType(filePath, _dalamudUtilService.IsWine); var fsType = GetFilesystemType(filePath, _dalamudUtilService.IsWine);
_logger.LogTrace("Detected filesystem {fs} for {file} (isWine={wine})", fsType, filePath, _dalamudUtilService.IsWine);
var oldSize = fi.Length; var oldSize = fi.Length;
int blockSize = GetBlockSizeForPath(fi.FullName, _logger, _dalamudUtilService.IsWine); int blockSize = GetBlockSizeForPath(fi.FullName, _logger, _dalamudUtilService.IsWine);
if (oldSize < Math.Max(blockSize, 8 * 1024))
// We skipping small files (128KiB) as they slow down the system a lot for BTRFS. as BTRFS has a different blocksize it requires an different calculation.
long minSizeBytes = fsType == FilesystemType.Btrfs
? Math.Max(blockSize * 2L, 128 * 1024L)
: Math.Max(blockSize, 8 * 1024L);
if (oldSize < minSizeBytes)
{ {
_logger.LogTrace("Skip compact: {file} < block {block}", filePath, blockSize); _logger.LogTrace("Skip compaction: {file} ({size} B) < threshold ({th} B)", filePath, oldSize, minSizeBytes);
return; return;
} }
@@ -243,7 +265,7 @@ public sealed class FileCompactor : IDisposable
{ {
if (!IsWOFCompactedFile(filePath)) if (!IsWOFCompactedFile(filePath))
{ {
_logger.LogDebug("NTFS compact XPRESS8K: {file}", filePath); _logger.LogDebug("NTFS compaction XPRESS8K: {file}", filePath);
if (WOFCompressFile(filePath)) if (WOFCompressFile(filePath))
{ {
var newSize = GetFileSizeOnDisk(fi); var newSize = GetFileSizeOnDisk(fi);
@@ -265,7 +287,7 @@ public sealed class FileCompactor : IDisposable
{ {
if (!IsBtrfsCompressedFile(filePath)) if (!IsBtrfsCompressedFile(filePath))
{ {
_logger.LogDebug("Btrfs compress zstd: {file}", filePath); _logger.LogDebug("Btrfs compression zstd: {file}", filePath);
if (BtrfsCompressFile(filePath)) if (BtrfsCompressFile(filePath))
{ {
var newSize = GetFileSizeOnDisk(fi); var newSize = GetFileSizeOnDisk(fi);
@@ -299,7 +321,7 @@ public sealed class FileCompactor : IDisposable
{ {
try try
{ {
bool flowControl = DecompressWOFFile(path, out FileStream fs); bool flowControl = DecompressWOFFile(path);
if (!flowControl) if (!flowControl)
{ {
return; return;
@@ -335,10 +357,9 @@ public sealed class FileCompactor : IDisposable
/// <returns>Decompressing state</returns> /// <returns>Decompressing state</returns>
private bool DecompressBtrfsFile(string path) private bool DecompressBtrfsFile(string path)
{ {
var fs = new FileStream(path, FileMode.Open, FileAccess.ReadWrite, FileShare.Read);
try try
{ {
_btrfsGate.Wait(_compactionCts.Token);
bool isWine = _dalamudUtilService?.IsWine ?? false; bool isWine = _dalamudUtilService?.IsWine ?? false;
string realPath = isWine ? ToLinuxPathIfWine(path, isWine) : path; string realPath = isWine ? ToLinuxPathIfWine(path, isWine) : path;
@@ -358,27 +379,25 @@ public sealed class FileCompactor : IDisposable
return true; return true;
} }
(bool flowControl, bool value) = FileStreamOpening(realPath, ref fs); if (!ProbeFileReadable(realPath))
return false;
if (!flowControl) (bool ok, string stdout, string stderr, int code) =
isWine
? RunProcessShell($"btrfs filesystem defragment -- {QuoteSingle(realPath)}")
: RunProcessDirect("btrfs", ["filesystem", "defragment", "--", realPath]);
if (!ok)
{ {
return value; _logger.LogWarning("btrfs defragment (decompress) failed for {file} (exit {code}): {stderr}",
} realPath, code, stderr);
return false;
string fileName = isWine ? "/bin/bash" : "btrfs";
string command = isWine ? $"-c \"filesystem defragment -- \"{realPath}\"\"" : $"filesystem defragment -- \"{realPath}\"";
(bool processControl, bool success) = StartProcessInfo(realPath, fileName, command, out Process? proc, out string stdout);
if (!processControl && !success)
{
return value;
} }
if (!string.IsNullOrWhiteSpace(stdout)) if (!string.IsNullOrWhiteSpace(stdout))
_logger.LogTrace("btrfs defragment output for {file}: {stdout}", realPath, stdout.Trim()); _logger.LogTrace("btrfs defragment output for {file}: {stdout}", realPath, stdout.Trim());
_logger.LogInformation("Decompressed btrfs file successfully: {file}", realPath); _logger.LogInformation("Decompressed (rewritten) Btrfs file: {file}", realPath);
return true; return true;
} }
catch (Exception ex) catch (Exception ex)
@@ -386,6 +405,11 @@ public sealed class FileCompactor : IDisposable
_logger.LogWarning(ex, "Error rewriting {file} for Btrfs decompression", path); _logger.LogWarning(ex, "Error rewriting {file} for Btrfs decompression", path);
return false; return false;
} }
finally
{
if (_btrfsGate.CurrentCount < 4)
_btrfsGate.Release();
}
} }
/// <summary> /// <summary>
@@ -393,38 +417,40 @@ public sealed class FileCompactor : IDisposable
/// </summary> /// </summary>
/// <param name="path">Path of the compressed file</param> /// <param name="path">Path of the compressed file</param>
/// <returns>Decompressing state</returns> /// <returns>Decompressing state</returns>
private bool DecompressWOFFile(string path, out FileStream fs) private bool DecompressWOFFile(string path)
{ {
fs = new FileStream(path, FileMode.Open, FileAccess.ReadWrite, FileShare.Read); if (TryIsWofExternal(path, out bool isExternal, out int algo))
var handle = fs.SafeFileHandle;
if (handle.IsInvalid)
{ {
_logger.LogWarning("Invalid handle: {file}", path); if (!isExternal)
return false; {
_logger.LogTrace("Already decompressed file: {file}", path);
return true;
}
var compressString = ((CompressionAlgorithm)algo).ToString();
_logger.LogTrace("WOF compression (algo={algo}) detected for {file}", compressString, path);
} }
if (!DeviceIoControl(handle, FSCTL_DELETE_EXTERNAL_BACKING, return WithFileHandleForWOF(path, FileAccess.ReadWrite, h =>
IntPtr.Zero, 0, IntPtr.Zero, 0,
out _, IntPtr.Zero))
{ {
int err = Marshal.GetLastWin32Error(); if (!DeviceIoControl(h, FSCTL_DELETE_EXTERNAL_BACKING,
IntPtr.Zero, 0, IntPtr.Zero, 0,
out uint _, IntPtr.Zero))
{
int err = Marshal.GetLastWin32Error();
// 342 error code means its been decompressed after the control, we handle it as it succesfully been decompressed.
if (err == 342)
{
_logger.LogTrace("Successfully decompressed NTFS file {file}", path);
return true;
}
if (err == 342)
{
_logger.LogTrace("File {file} not externally backed (already decompressed)", path);
}
else
{
_logger.LogWarning("DeviceIoControl failed for {file} with Win32 error {err}", path, err); _logger.LogWarning("DeviceIoControl failed for {file} with Win32 error {err}", path, err);
return false;
} }
}
else
{
_logger.LogTrace("Successfully decompressed NTFS file {file}", path);
}
return true; _logger.LogTrace("Successfully decompressed NTFS file {file}", path);
return true;
});
} }
/// <summary> /// <summary>
@@ -455,7 +481,6 @@ public sealed class FileCompactor : IDisposable
/// <returns>Compessing state</returns> /// <returns>Compessing state</returns>
private bool WOFCompressFile(string path) private bool WOFCompressFile(string path)
{ {
FileStream? fs = null;
int size = Marshal.SizeOf<WOF_FILE_COMPRESSION_INFO_V1>(); int size = Marshal.SizeOf<WOF_FILE_COMPRESSION_INFO_V1>();
IntPtr efInfoPtr = Marshal.AllocHGlobal(size); IntPtr efInfoPtr = Marshal.AllocHGlobal(size);
@@ -464,46 +489,28 @@ public sealed class FileCompactor : IDisposable
Marshal.StructureToPtr(_efInfo, efInfoPtr, fDeleteOld: false); Marshal.StructureToPtr(_efInfo, efInfoPtr, fDeleteOld: false);
ulong length = (ulong)size; ulong length = (ulong)size;
(bool flowControl, bool value) = FileStreamOpening(path, ref fs); return WithFileHandleForWOF(path, FileAccess.ReadWrite, h =>
if (!flowControl)
{ {
return value; int ret = WofSetFileDataLocation(h, WOF_PROVIDER_FILE, efInfoPtr, length);
}
if (fs == null) // 0x80070158 is the benign "already compressed/unsupported" style return
{ if (ret != 0 && ret != unchecked((int)0x80070158))
_logger.LogWarning("Failed to open {file} for compression; skipping", path); {
return false; _logger.LogWarning("Failed to compact {file}: {ret}", path, ret.ToString("X"));
} return false;
}
var handle = fs.SafeFileHandle; return true;
});
if (handle.IsInvalid)
{
_logger.LogWarning("Invalid file handle for {file}", path);
return false;
}
int ret = WofSetFileDataLocation(handle, WOF_PROVIDER_FILE, efInfoPtr, length);
// 0x80070158 is WOF error whenever compression fails in an non-fatal way.
if (ret != 0 && ret != unchecked((int)0x80070158))
{
_logger.LogWarning("Failed to compact {file}: {ret}", path, ret.ToString("X"));
return false;
}
return true;
} }
catch (DllNotFoundException ex) catch (DllNotFoundException ex)
{ {
_logger.LogTrace(ex, "WofUtil.dll not available, this DLL is needed for compression; skipping NTFS compaction for {file}", path); _logger.LogTrace(ex, "WofUtil not available; skipping NTFS compaction for {file}", path);
return false; return false;
} }
catch (EntryPointNotFoundException ex) catch (EntryPointNotFoundException ex)
{ {
_logger.LogTrace(ex, "WOF entrypoint missing (Wine/older OS); skipping NTFS compaction for {file}", path); _logger.LogTrace(ex, "WOF entrypoint missing on this system (Wine/older OS); skipping NTFS compaction for {file}", path);
return false; return false;
} }
catch (Exception ex) catch (Exception ex)
@@ -513,12 +520,8 @@ public sealed class FileCompactor : IDisposable
} }
finally finally
{ {
fs?.Dispose();
if (efInfoPtr != IntPtr.Zero) if (efInfoPtr != IntPtr.Zero)
{
Marshal.FreeHGlobal(efInfoPtr); Marshal.FreeHGlobal(efInfoPtr);
}
} }
} }
@@ -549,6 +552,36 @@ public sealed class FileCompactor : IDisposable
} }
} }
/// <summary>
/// Checks if an File is compacted any WOF compression with an WOF backing
/// </summary>
/// <param name="path">Path of the file</param>
/// <returns>State of the file, if its an external (no backing) and which algorithm if detected</returns>
private static bool TryIsWofExternal(string path, out bool isExternal, out int algorithm)
{
isExternal = false;
algorithm = 0;
try
{
uint buf = (uint)Marshal.SizeOf<WOF_FILE_COMPRESSION_INFO_V1>();
int hr = WofIsExternalFile(path, out int ext, out _, out var info, ref buf);
if (hr == 0 && ext != 0)
{
isExternal = true;
algorithm = info.Algorithm;
}
return true;
}
catch (DllNotFoundException)
{
return false;
}
catch (EntryPointNotFoundException)
{
return false;
}
}
/// <summary> /// <summary>
/// Checks if an File is compacted with Btrfs compression /// Checks if an File is compacted with Btrfs compression
/// </summary> /// </summary>
@@ -558,34 +591,23 @@ public sealed class FileCompactor : IDisposable
{ {
try try
{ {
_btrfsGate.Wait(_compactionCts.Token);
bool isWine = _dalamudUtilService?.IsWine ?? false; bool isWine = _dalamudUtilService?.IsWine ?? false;
string realPath = isWine ? ToLinuxPathIfWine(path, isWine) : path; string realPath = isWine ? ToLinuxPathIfWine(path, isWine) : path;
var fi = new FileInfo(realPath); return _fragBatch.IsCompressedAsync(realPath, _compactionCts.Token).GetAwaiter().GetResult();
if (fi == null)
{
_logger.LogWarning("Failed to open {file} for checking on compression; skipping", realPath);
return false;
}
string fileName = isWine ? "/bin/bash" : "filefrag";
string command = isWine ? $"-c \"filefrag -v '{EscapeSingle(realPath)}'\"" : $"-v \"{realPath}\"";
(bool processControl, bool success) = StartProcessInfo(realPath, fileName, command, out Process? proc, out string stdout);
if (!processControl && !success)
{
return success;
}
bool compressed = stdout.Contains("flags: compressed", StringComparison.OrdinalIgnoreCase);
_logger.LogTrace("Btrfs compression check for {file}: {compressed}", realPath, compressed);
return compressed;
} }
catch (Exception ex) catch (Exception ex)
{ {
_logger.LogDebug(ex, "Failed to detect Btrfs compression for {file}", path); _logger.LogDebug(ex, "Failed to detect Btrfs compression for {file}", path);
return false; return false;
} }
finally
{
if (_btrfsGate.CurrentCount < 4)
_btrfsGate.Release();
}
} }
/// <summary> /// <summary>
@@ -595,8 +617,6 @@ public sealed class FileCompactor : IDisposable
/// <returns>Compessing state</returns> /// <returns>Compessing state</returns>
private bool BtrfsCompressFile(string path) private bool BtrfsCompressFile(string path)
{ {
FileStream? fs = null;
try try
{ {
bool isWine = _dalamudUtilService?.IsWine ?? false; bool isWine = _dalamudUtilService?.IsWine ?? false;
@@ -616,21 +636,22 @@ public sealed class FileCompactor : IDisposable
return true; return true;
} }
(bool flowControl, bool value) = FileStreamOpening(realPath, ref fs); if (!ProbeFileReadable(realPath))
return false;
if (!flowControl) (bool ok, string stdout, string stderr, int code) =
isWine
? RunProcessShell($"btrfs filesystem defragment -clzo -- {QuoteSingle(realPath)}")
: RunProcessDirect("btrfs", ["filesystem", "defragment", "-clzo", "--", realPath]);
if (!ok)
{ {
return value; _logger.LogWarning("btrfs defragment failed for {file} (exit {code}): {stderr}", realPath, code, stderr);
return false;
} }
string fileName = isWine ? "/bin/bash" : "btrfs"; if (!string.IsNullOrWhiteSpace(stdout))
string command = isWine ? $"-c \"btrfs filesystem defragment -czstd:1 -- \"{realPath}\"\"" : $"btrfs filesystem defragment -czstd:1 -- \"{realPath}\""; _logger.LogTrace("btrfs output for {file}: {stdout}", realPath, stdout.Trim());
(bool processControl, bool success) = StartProcessInfo(realPath, fileName, command, out Process? proc, out string stdout);
if (!processControl && !success)
{
return value;
}
if (!string.IsNullOrWhiteSpace(stdout)) if (!string.IsNullOrWhiteSpace(stdout))
_logger.LogTrace("btrfs defragment output for {file}: {stdout}", realPath, stdout.Trim()); _logger.LogTrace("btrfs defragment output for {file}: {stdout}", realPath, stdout.Trim());
@@ -648,82 +669,171 @@ public sealed class FileCompactor : IDisposable
/// <summary> /// <summary>
/// Trying opening file stream in certain amount of tries. /// Probe file if its readable for certain amount of tries.
/// </summary> /// </summary>
/// <param name="path">Path where the file is located</param> /// <param name="path">Path where the file is located</param>
/// <param name="fs">Filestream used for the function</param> /// <param name="fs">Filestream used for the function</param>
/// <returns>State of the filestream opening</returns> /// <returns>State of the filestream opening</returns>
private (bool flowControl, bool value) FileStreamOpening(string path, ref FileStream? fs) private bool ProbeFileReadable(string path)
{ {
for (int attempt = 0; attempt < _maxRetries; attempt++) for (int attempt = 0; attempt < _maxRetries; attempt++)
{ {
try try
{ {
fs = new FileStream(path, FileMode.Open, FileAccess.ReadWrite, FileShare.Read); using var _ = new FileStream(path, FileMode.Open, FileAccess.Read, FileShare.ReadWrite | FileShare.Delete);
break; return true;
} }
catch (IOException) catch (IOException ex)
{ {
if (attempt == _maxRetries - 1) if (attempt == _maxRetries - 1)
{ {
_logger.LogWarning("File still in use after {attempts} attempts, skipping compression for {file}", _maxRetries, path); _logger.LogWarning(ex, "File still in use after {attempts} attempts, skipping {file}", _maxRetries, path);
return (flowControl: false, value: false); return false;
}
int delay = 150 * (attempt + 1);
_logger.LogTrace(ex, "File busy, retrying in {delay}ms for {file}", delay, path);
Thread.Sleep(delay);
}
}
return false;
}
/// <summary>
/// Attempt opening file stream for WOF functions
/// </summary>
/// <param name="path">File that has to be accessed</param>
/// <param name="access">Permissions for the file</param>
/// <param name="body">Access of the file stream for the WOF function to handle.</param>
/// <returns>State of the attempt for the file</returns>
private bool WithFileHandleForWOF(string path, FileAccess access, Func<SafeFileHandle, bool> body)
{
const FileShare share = FileShare.ReadWrite | FileShare.Delete;
for (int attempt = 0; attempt < _maxRetries; attempt++)
{
try
{
using var fs = new FileStream(path, FileMode.Open, access, share);
var handle = fs.SafeFileHandle;
if (handle.IsInvalid)
{
_logger.LogWarning("Invalid file handle for {file}", path);
return false;
}
return body(handle);
}
catch (IOException ex)
{
if (attempt == _maxRetries - 1)
{
_logger.LogWarning(ex, "File still in use after {attempts} attempts, skipping {file}", _maxRetries, path);
return false;
} }
int delay = 150 * (attempt + 1); int delay = 150 * (attempt + 1);
_logger.LogTrace("File in use, retrying in {delay}ms for {file}", delay, path); _logger.LogTrace(ex, "File busy, retrying in {delay}ms for {file}", delay, path);
Thread.Sleep(delay); Thread.Sleep(delay);
} }
} }
return (flowControl: true, value: default); return false;
} }
/// <summary> /// <summary>
/// Starts an process with given Filename and Arguments /// Runs an nonshell process meant for Linux/Wine enviroments
/// </summary> /// </summary>
/// <param name="path">Path you want to use for the process (Compression is using these)</param> /// <param name="fileName">File that has to be excuted</param>
/// <param name="fileName">File of the command</param> /// <param name="args">Arguments meant for the file/command</param>
/// <param name="arguments">Arguments used for the command</param> /// <param name="workingDir">Working directory used to execute the file with/without arguments</param>
/// <param name="proc">Returns process of the given command</param> /// <param name="timeoutMs">Timeout timer for the process</param>
/// <param name="stdout">Returns output of the given command</param> /// <returns>State of the process, output of the process and error with exit code</returns>
/// <returns>Returns if the process been done succesfully or not</returns> private (bool ok, string stdout, string stderr, int exitCode) RunProcessDirect(string fileName, IEnumerable<string> args, string? workingDir = null, int timeoutMs = 60000)
private (bool processControl, bool success) StartProcessInfo(string path, string fileName, string arguments, out Process? proc, out string stdout)
{ {
var psi = new ProcessStartInfo var psi = new ProcessStartInfo(fileName)
{ {
FileName = fileName,
Arguments = arguments,
RedirectStandardOutput = true, RedirectStandardOutput = true,
RedirectStandardError = true, RedirectStandardError = true,
UseShellExecute = false, UseShellExecute = false,
CreateNoWindow = true, CreateNoWindow = true
WorkingDirectory = "/"
}; };
proc = Process.Start(psi); if (!string.IsNullOrEmpty(workingDir)) psi.WorkingDirectory = workingDir;
if (proc == null) foreach (var a in args) psi.ArgumentList.Add(a);
using var proc = Process.Start(psi);
if (proc is null) return (false, "", "failed to start process", -1);
var outTask = proc.StandardOutput.ReadToEndAsync(_compactionCts.Token);
var errTask = proc.StandardError.ReadToEndAsync(_compactionCts.Token);
if (!proc.WaitForExit(timeoutMs))
{ {
_logger.LogWarning("Failed to start {arguments} for {file}", arguments, path); try
stdout = string.Empty; {
return (processControl: false, success: false); proc.Kill(entireProcessTree: true);
}
catch
{
// Ignore this catch on the dispose
}
Task.WaitAll([outTask, errTask], 1000, _compactionCts.Token);
return (false, outTask.Result, "timeout", -1);
} }
stdout = proc.StandardOutput.ReadToEnd(); Task.WaitAll(outTask, errTask);
string stderr = proc.StandardError.ReadToEnd(); return (proc.ExitCode == 0, outTask.Result, errTask.Result, proc.ExitCode);
proc.WaitForExit();
if (proc.ExitCode != 0 && !string.IsNullOrWhiteSpace(stderr))
{
_logger.LogTrace("{arguments} exited with code {code}: {stderr}", arguments, proc.ExitCode, stderr);
return (processControl: false, success: false);
}
return (processControl: true, success: default);
} }
private static string EscapeSingle(string p) => p.Replace("'", "'\\'", StringComparison.Ordinal); /// <summary>
/// Runs an shell using '/bin/bash'/ command meant for Linux/Wine enviroments
/// </summary>
/// <param name="command">Command that has to be excuted</param>
/// <param name="timeoutMs">Timeout timer for the process</param>
/// <returns>State of the process, output of the process and error with exit code</returns>
private (bool ok, string stdout, string stderr, int exitCode) RunProcessShell(string command, int timeoutMs = 60000)
{
var psi = new ProcessStartInfo("/bin/bash")
{
RedirectStandardOutput = true,
RedirectStandardError = true,
UseShellExecute = false,
CreateNoWindow = true
};
psi.ArgumentList.Add("-c");
psi.ArgumentList.Add(command);
using var proc = Process.Start(psi);
if (proc is null) return (false, "", "failed to start /bin/bash", -1);
var outTask = proc.StandardOutput.ReadToEndAsync(_compactionCts.Token);
var errTask = proc.StandardError.ReadToEndAsync(_compactionCts.Token);
if (!proc.WaitForExit(timeoutMs))
{
try
{
proc.Kill(entireProcessTree: true);
}
catch
{
// Ignore this catch on the dispose
}
Task.WaitAll([outTask, errTask], 1000, _compactionCts.Token);
return (false, outTask.Result, "timeout", -1);
}
Task.WaitAll(outTask, errTask);
return (proc.ExitCode == 0, outTask.Result, errTask.Result, proc.ExitCode);
}
/// <summary>
/// Enqueues the compaction/decompation of an filepath.
/// </summary>
/// <param name="filePath">Filepath that will be enqueued</param>
private void EnqueueCompaction(string filePath) private void EnqueueCompaction(string filePath)
{ {
// Safe-checks // Safe-checks
@@ -759,9 +869,10 @@ public sealed class FileCompactor : IDisposable
return; return;
} }
// Channel got closed, skip enqueue on file
if (!_compactionQueue.Writer.TryWrite(filePath)) if (!_compactionQueue.Writer.TryWrite(filePath))
{ {
_logger.LogTrace("Skip enqueue: compaction channel is closed {file}", filePath); _logger.LogTrace("Skip enqueue: compaction channel is/got closed {file}", filePath);
return; return;
} }
@@ -775,7 +886,11 @@ public sealed class FileCompactor : IDisposable
} }
} }
private async Task ProcessQueueAsync(CancellationToken token) /// <summary>
/// Process the queue with, meant for a worker/thread
/// </summary>
/// <param name="token">Cancellation token for the worker whenever it needs to be stopped</param>
private async Task ProcessQueueWorkerAsync(CancellationToken token)
{ {
try try
{ {
@@ -785,28 +900,20 @@ public sealed class FileCompactor : IDisposable
{ {
try try
{ {
if (token.IsCancellationRequested) token.ThrowIfCancellationRequested();
{ await _globalGate.WaitAsync(token).ConfigureAwait(false);
return;
}
if (!_lightlessConfigService.Current.UseCompactor) try
{ {
continue; if (_lightlessConfigService.Current.UseCompactor && File.Exists(filePath))
CompactFile(filePath);
} }
finally
if (!File.Exists(filePath))
{ {
_logger.LogTrace("Skip compact (missing) {file}", filePath); _globalGate.Release();
continue;
} }
CompactFile(filePath);
}
catch (OperationCanceledException)
{
return;
} }
catch (OperationCanceledException) { return; }
catch (Exception ex) catch (Exception ex)
{ {
_logger.LogWarning(ex, "Error compacting file {file}", filePath); _logger.LogWarning(ex, "Error compacting file {file}", filePath);
@@ -818,9 +925,9 @@ public sealed class FileCompactor : IDisposable
} }
} }
} }
catch (OperationCanceledException) catch (OperationCanceledException)
{ {
_logger.LogDebug("Compaction queue cancelled"); // Shutting down worker, this exception is expected
} }
} }
@@ -836,17 +943,21 @@ public sealed class FileCompactor : IDisposable
[DllImport("WofUtil.dll", SetLastError = true)] [DllImport("WofUtil.dll", SetLastError = true)]
private static extern int WofSetFileDataLocation(SafeFileHandle FileHandle, ulong Provider, IntPtr ExternalFileInfo, ulong Length); private static extern int WofSetFileDataLocation(SafeFileHandle FileHandle, ulong Provider, IntPtr ExternalFileInfo, ulong Length);
private static string QuoteSingle(string s) => "'" + s.Replace("'", "'\\''", StringComparison.Ordinal) + "'";
public void Dispose() public void Dispose()
{ {
_fragBatch?.Dispose();
_compactionQueue.Writer.TryComplete(); _compactionQueue.Writer.TryComplete();
_compactionCts.Cancel(); _compactionCts.Cancel();
try try
{ {
_compactionWorker.Wait(TimeSpan.FromSeconds(5)); Task.WaitAll([.. _workers.Where(t => t != null)], TimeSpan.FromSeconds(5));
} }
catch catch
{ {
//ignore on catch ^^ // Ignore this catch on the dispose
} }
finally finally
{ {

View File

@@ -0,0 +1,245 @@
using Microsoft.Extensions.Logging;
using System.Diagnostics;
using System.Text.RegularExpressions;
using System.Threading.Channels;
namespace LightlessSync.Services.Compression
{
/// <summary>
/// This batch service is made for the File Frag command, because of each file needing to use this command.
/// It's better to combine into one big command in batches then doing each command on each compressed call.
/// </summary>
public sealed partial class BatchFilefragService : IDisposable
{
private readonly Channel<(string path, TaskCompletionSource<bool> tcs)> _ch;
private readonly Task _worker;
private readonly bool _useShell;
private readonly ILogger _log;
private readonly int _batchSize;
private readonly TimeSpan _flushDelay;
private readonly CancellationTokenSource _cts = new();
public BatchFilefragService(bool useShell, ILogger log, int batchSize = 128, int flushMs = 25)
{
_useShell = useShell;
_log = log;
_batchSize = Math.Max(8, batchSize);
_flushDelay = TimeSpan.FromMilliseconds(Math.Max(5, flushMs));
_ch = Channel.CreateUnbounded<(string, TaskCompletionSource<bool>)>(new UnboundedChannelOptions { SingleReader = true, SingleWriter = false });
_worker = Task.Run(ProcessAsync, _cts.Token);
}
/// <summary>
/// Checks if the file is compressed using Btrfs using tasks
/// </summary>
/// <param name="linuxPath">Linux/Wine path given for the file.</param>
/// <param name="ct">Cancellation Token</param>
/// <returns>If it was compressed or not</returns>
public Task<bool> IsCompressedAsync(string linuxPath, CancellationToken ct = default)
{
var tcs = new TaskCompletionSource<bool>(TaskCreationOptions.RunContinuationsAsynchronously);
if (!_ch.Writer.TryWrite((linuxPath, tcs)))
{
tcs.TrySetResult(false);
return tcs.Task;
}
if (ct.CanBeCanceled)
{
var reg = ct.Register(() => tcs.TrySetCanceled(ct));
_ = tcs.Task.ContinueWith(_ => reg.Dispose(), CancellationToken.None, TaskContinuationOptions.ExecuteSynchronously, TaskScheduler.Default);
}
return tcs.Task;
}
/// <summary>
/// Process the pending compression tasks asynchronously
/// </summary>
/// <returns>Task</returns>
private async Task ProcessAsync()
{
var reader = _ch.Reader;
var pending = new List<(string path, TaskCompletionSource<bool> tcs)>(_batchSize);
try
{
while (await reader.WaitToReadAsync(_cts.Token).ConfigureAwait(false))
{
if (!reader.TryRead(out var first)) continue;
pending.Add(first);
var flushAt = DateTime.UtcNow + _flushDelay;
while (pending.Count < _batchSize && DateTime.UtcNow < flushAt)
{
if (reader.TryRead(out var item))
{
pending.Add(item);
continue;
}
if ((flushAt - DateTime.UtcNow) <= TimeSpan.Zero) break;
try
{
await Task.Delay(TimeSpan.FromMilliseconds(5), _cts.Token).ConfigureAwait(false);
}
catch
{
break;
}
}
try
{
var map = await RunBatchAsync(pending.Select(p => p.path)).ConfigureAwait(false);
foreach (var (path, tcs) in pending)
{
tcs.TrySetResult(map.TryGetValue(path, out var c) && c);
}
}
catch (Exception ex)
{
_log.LogDebug(ex, "filefrag batch failed. falling back to false");
foreach (var (_, tcs) in pending)
{
tcs.TrySetResult(false);
}
}
finally
{
pending.Clear();
}
}
}
catch (OperationCanceledException)
{
//Shutting down worker, exception called
}
}
/// <summary>
/// Running the batch of each file in the queue in one file frag command.
/// </summary>
/// <param name="paths">Paths that are needed for the command building for the batch return</param>
/// <returns>Path of the file and if it went correctly</returns>
/// <exception cref="InvalidOperationException">Failing to start filefrag on the system if this exception is found</exception>
private async Task<Dictionary<string, bool>> RunBatchAsync(IEnumerable<string> paths)
{
var list = paths.Distinct(StringComparer.Ordinal).ToList();
var result = list.ToDictionary(p => p, _ => false, StringComparer.Ordinal);
ProcessStartInfo psi;
if (_useShell)
{
var inner = "filefrag -v -- " + string.Join(' ', list.Select(QuoteSingle));
psi = new ProcessStartInfo
{
FileName = "/bin/bash",
Arguments = "-c " + QuoteDouble(inner),
RedirectStandardOutput = true,
RedirectStandardError = true,
UseShellExecute = false,
CreateNoWindow = true,
WorkingDirectory = "/"
};
}
else
{
psi = new ProcessStartInfo
{
FileName = "filefrag",
RedirectStandardOutput = true,
RedirectStandardError = true,
UseShellExecute = false,
CreateNoWindow = true
};
psi.ArgumentList.Add("-v");
psi.ArgumentList.Add("--");
foreach (var p in list) psi.ArgumentList.Add(p);
}
using var proc = Process.Start(psi) ?? throw new InvalidOperationException("Failed to start filefrag");
var stdoutTask = proc.StandardOutput.ReadToEndAsync(_cts.Token);
var stderrTask = proc.StandardError.ReadToEndAsync(_cts.Token);
await Task.WhenAll(stdoutTask, stderrTask).ConfigureAwait(false);
try
{
await proc.WaitForExitAsync(_cts.Token).ConfigureAwait(false);
}
catch (Exception ex)
{
_log.LogWarning(ex, "Error in the batch frag service. proc = {proc}", proc);
}
var stdout = await stdoutTask.ConfigureAwait(false);
var stderr = await stderrTask.ConfigureAwait(false);
if (proc.ExitCode != 0 && !string.IsNullOrWhiteSpace(stderr))
_log.LogTrace("filefrag exited {code}: {err}", proc.ExitCode, stderr.Trim());
ParseFilefrag(stdout, result);
return result;
}
/// <summary>
/// Parsing the string given from the File Frag command into mapping
/// </summary>
/// <param name="output">Output of the process from the File Frag</param>
/// <param name="map">Mapping of the processed files</param>
private static void ParseFilefrag(string output, Dictionary<string, bool> map)
{
var reHeaderColon = ColonRegex();
var reHeaderSize = SizeRegex();
string? current = null;
using var sr = new StringReader(output);
for (string? line = sr.ReadLine(); line != null; line = sr.ReadLine())
{
var m1 = reHeaderColon.Match(line);
if (m1.Success) { current = m1.Groups[1].Value; continue; }
var m2 = reHeaderSize.Match(line);
if (m2.Success) { current = m2.Groups[1].Value; continue; }
if (current is not null && line.Contains("flags:", StringComparison.OrdinalIgnoreCase) &&
line.Contains("compressed", StringComparison.OrdinalIgnoreCase) && map.ContainsKey(current))
{
map[current] = true;
}
}
}
private static string QuoteSingle(string s) => "'" + s.Replace("'", "'\\''", StringComparison.Ordinal) + "'";
private static string QuoteDouble(string s) => "\"" + s.Replace("\\", "\\\\", StringComparison.Ordinal).Replace("\"", "\\\"", StringComparison.Ordinal).Replace("$", "\\$", StringComparison.Ordinal).Replace("`", "\\`", StringComparison.Ordinal) + "\"";
/// <summary>
/// Regex of the File Size return on the Linux/Wine systems, giving back the amount
/// </summary>
/// <returns>Regex of the File Size</returns>
[GeneratedRegex(@"^File size of (/.+?) is ", RegexOptions.ExplicitCapture | RegexOptions.CultureInvariant,matchTimeoutMilliseconds: 500)]
private static partial Regex SizeRegex();
/// <summary>
/// Regex on colons return on the Linux/Wine systems
/// </summary>
/// <returns>Regex of the colons in the given path</returns>
[GeneratedRegex(@"^(/.+?):\s", RegexOptions.ExplicitCapture | RegexOptions.CultureInvariant, matchTimeoutMilliseconds: 500)]
private static partial Regex ColonRegex();
public void Dispose()
{
_ch.Writer.TryComplete();
_cts.Cancel();
try
{
_worker.Wait(TimeSpan.FromSeconds(2), _cts.Token);
}
catch
{
// Ignore the catch in dispose
}
_cts.Dispose();
}
}
}