using Microsoft.Extensions.Logging; using System.Text.RegularExpressions; using System.Threading.Channels; namespace LightlessSync.Services.Compactor { /// /// This batch service is made for the File Frag command, because of each file needing to use this command. /// It's better to combine into one big command in batches then doing each command on each compressed call. /// public sealed partial class BatchFilefragService : IDisposable { private readonly Channel<(string path, TaskCompletionSource tcs)> _ch; private readonly Task _worker; private readonly bool _useShell; private readonly ILogger _log; private readonly int _batchSize; private readonly TimeSpan _flushDelay; private readonly CancellationTokenSource _cts = new(); public delegate (bool ok, string stdout, string stderr, int exitCode) RunDirect(string fileName, IEnumerable args, string? workingDir, int timeoutMs); private readonly RunDirect _runDirect; public delegate (bool ok, string stdout, string stderr, int exitCode) RunShell(string command, string? workingDir, int timeoutMs); private readonly RunShell _runShell; public BatchFilefragService(bool useShell, ILogger log, int batchSize = 128, int flushMs = 25, RunDirect? runDirect = null, RunShell? runShell = null) { _useShell = useShell; _log = log; _batchSize = Math.Max(8, batchSize); _flushDelay = TimeSpan.FromMilliseconds(Math.Max(5, flushMs)); _ch = Channel.CreateUnbounded<(string, TaskCompletionSource)>(new UnboundedChannelOptions { SingleReader = true, SingleWriter = false }); // require runners to be setup, wouldnt start otherwise if (runDirect is null || runShell is null) throw new ArgumentNullException(nameof(runDirect), "Provide process runners from FileCompactor"); _runDirect = runDirect; _runShell = runShell; _worker = Task.Run(ProcessAsync, _cts.Token); } /// /// Checks if the file is compressed using Btrfs using tasks /// /// Linux/Wine path given for the file. /// Cancellation Token /// If it was compressed or not public Task IsCompressedAsync(string linuxPath, CancellationToken ct = default) { var tcs = new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously); if (!_ch.Writer.TryWrite((linuxPath, tcs))) { tcs.TrySetResult(false); return tcs.Task; } if (ct.CanBeCanceled) { var reg = ct.Register(() => tcs.TrySetCanceled(ct)); _ = tcs.Task.ContinueWith(_ => reg.Dispose(), CancellationToken.None, TaskContinuationOptions.ExecuteSynchronously, TaskScheduler.Default); } return tcs.Task; } /// /// Process the pending compression tasks asynchronously /// /// Task private async Task ProcessAsync() { var reader = _ch.Reader; var pending = new List<(string path, TaskCompletionSource tcs)>(_batchSize); try { while (await reader.WaitToReadAsync(_cts.Token).ConfigureAwait(false)) { if (!reader.TryRead(out var first)) continue; pending.Add(first); var flushAt = DateTime.UtcNow + _flushDelay; while (pending.Count < _batchSize && DateTime.UtcNow < flushAt) { if (reader.TryRead(out var item)) { pending.Add(item); continue; } if ((flushAt - DateTime.UtcNow) <= TimeSpan.Zero) break; try { await Task.Delay(TimeSpan.FromMilliseconds(5), _cts.Token).ConfigureAwait(false); } catch { break; } } try { var map = RunBatch(pending.Select(p => p.path)); foreach (var (path, tcs) in pending) { tcs.TrySetResult(map.TryGetValue(path, out var c) && c); } } catch (Exception ex) { _log.LogDebug(ex, "filefrag batch failed. falling back to false"); foreach (var (_, tcs) in pending) { tcs.TrySetResult(false); } } finally { pending.Clear(); } } } catch (OperationCanceledException) { //Shutting down worker, exception called } } /// /// Running the batch of each file in the queue in one file frag command. /// /// Paths that are needed for the command building for the batch return /// Path of the file and if it went correctly /// Failing to start filefrag on the system if this exception is found private Dictionary RunBatch(IEnumerable paths) { var list = paths.Distinct(StringComparer.Ordinal).ToList(); var result = list.ToDictionary(p => p, _ => false, StringComparer.Ordinal); (bool ok, string stdout, string stderr, int code) res; if (_useShell) { var inner = "filefrag -v -- " + string.Join(' ', list.Select(QuoteSingle)); res = _runShell(inner, timeoutMs: 15000, workingDir: "/"); } else { var args = new List { "-v", "--" }; args.AddRange(list); res = _runDirect("filefrag", args, workingDir: "/", timeoutMs: 15000); } if (!string.IsNullOrWhiteSpace(res.stderr)) _log.LogTrace("filefrag stderr (batch): {err}", res.stderr.Trim()); ParseFilefrag(res.stdout, result); return result; } /// /// Parsing the string given from the File Frag command into mapping /// /// Output of the process from the File Frag /// Mapping of the processed files private static void ParseFilefrag(string output, Dictionary map) { var reHeaderColon = ColonRegex(); var reHeaderSize = SizeRegex(); string? current = null; using var sr = new StringReader(output); for (string? line = sr.ReadLine(); line != null; line = sr.ReadLine()) { var m1 = reHeaderColon.Match(line); if (m1.Success) { current = m1.Groups[1].Value; continue; } var m2 = reHeaderSize.Match(line); if (m2.Success) { current = m2.Groups[1].Value; continue; } if (current is not null && line.Contains("flags:", StringComparison.OrdinalIgnoreCase) && line.Contains("compressed", StringComparison.OrdinalIgnoreCase) && map.ContainsKey(current)) { map[current] = true; } } } private static string QuoteSingle(string s) => "'" + s.Replace("'", "'\\''", StringComparison.Ordinal) + "'"; /// /// Regex of the File Size return on the Linux/Wine systems, giving back the amount /// /// Regex of the File Size [GeneratedRegex(@"^File size of (/.+?) is ", RegexOptions.ExplicitCapture | RegexOptions.CultureInvariant, matchTimeoutMilliseconds: 500)] private static partial Regex SizeRegex(); /// /// Regex on colons return on the Linux/Wine systems /// /// Regex of the colons in the given path [GeneratedRegex(@"^(/.+?):\s", RegexOptions.ExplicitCapture | RegexOptions.CultureInvariant, matchTimeoutMilliseconds: 500)] private static partial Regex ColonRegex(); public void Dispose() { _ch.Writer.TryComplete(); _cts.Cancel(); try { _worker.Wait(TimeSpan.FromSeconds(2), _cts.Token); } catch { // Ignore the catch in dispose } _cts.Dispose(); } } }