using System; using System.Threading; using System.Threading.Tasks; using LightlessSync.LightlessConfiguration; using LightlessSync.Services.Mediator; using Microsoft.Extensions.Logging; namespace LightlessSync.Services; public sealed class PairProcessingLimiter : DisposableMediatorSubscriberBase { private const int HardLimit = 32; private readonly LightlessConfigService _configService; private readonly object _limitLock = new(); private readonly SemaphoreSlim _semaphore; private int _currentLimit; private int _pendingReductions; private int _pendingIncrements; private int _waiting; private int _inFlight; public PairProcessingLimiter(ILogger logger, LightlessMediator mediator, LightlessConfigService configService) : base(logger, mediator) { _configService = configService; _currentLimit = CalculateLimit(); var initialCount = _configService.Current.EnablePairProcessingLimiter ? _currentLimit : HardLimit; _semaphore = new SemaphoreSlim(initialCount, HardLimit); Mediator.Subscribe(this, _ => UpdateSemaphoreLimit()); } public ValueTask AcquireAsync(CancellationToken cancellationToken) { return WaitInternalAsync(cancellationToken); } public PairProcessingLimiterSnapshot GetSnapshot() { lock (_limitLock) { var enabled = IsEnabled; var limit = enabled ? _currentLimit : CalculateLimit(); var waiting = Math.Max(0, Volatile.Read(ref _waiting)); var inFlight = Math.Max(0, Volatile.Read(ref _inFlight)); return new PairProcessingLimiterSnapshot(enabled, limit, inFlight, waiting); } } private bool IsEnabled => _configService.Current.EnablePairProcessingLimiter; private async ValueTask WaitInternalAsync(CancellationToken token) { if (!IsEnabled) { return NoopReleaser.Instance; } Interlocked.Increment(ref _waiting); try { await _semaphore.WaitAsync(token).ConfigureAwait(false); } catch { Interlocked.Decrement(ref _waiting); throw; } Interlocked.Decrement(ref _waiting); if (!IsEnabled) { TryReleaseSemaphore(); return NoopReleaser.Instance; } Interlocked.Increment(ref _inFlight); return new Releaser(this); } private void UpdateSemaphoreLimit() { lock (_limitLock) { var enabled = IsEnabled; var desiredLimit = CalculateLimit(); if (!enabled) { var releaseAmount = HardLimit - _semaphore.CurrentCount; if (releaseAmount > 0) { TryReleaseSemaphore(releaseAmount); } _currentLimit = desiredLimit; _pendingReductions = 0; _pendingIncrements = 0; return; } if (desiredLimit == _currentLimit) { return; } if (desiredLimit > _currentLimit) { var increment = desiredLimit - _currentLimit; _pendingIncrements += increment; var available = HardLimit - _semaphore.CurrentCount; var toRelease = Math.Min(_pendingIncrements, available); if (toRelease > 0 && TryReleaseSemaphore(toRelease)) { _pendingIncrements -= toRelease; } } else { var decrement = _currentLimit - desiredLimit; var removed = 0; while (removed < decrement && _semaphore.Wait(0)) { removed++; } var remaining = decrement - removed; if (remaining > 0) { _pendingReductions += remaining; } if (_pendingIncrements > 0) { var offset = Math.Min(_pendingIncrements, _pendingReductions); _pendingIncrements -= offset; _pendingReductions -= offset; } } _currentLimit = desiredLimit; Logger.LogDebug("Pair processing concurrency updated to {limit} (pending reductions: {pending})", _currentLimit, _pendingReductions); } } private int CalculateLimit() { var configured = _configService.Current.MaxConcurrentPairApplications; return Math.Clamp(configured, 1, HardLimit); } private bool TryReleaseSemaphore(int count = 1) { if (count <= 0) { return true; } try { _semaphore.Release(count); return true; } catch (SemaphoreFullException ex) { Logger.LogDebug(ex, "Attempted to release {count} pair processing slots but semaphore is already at the hard limit.", count); return false; } } private void ReleaseOne() { var inFlight = Interlocked.Decrement(ref _inFlight); if (inFlight < 0) { Interlocked.Exchange(ref _inFlight, 0); } if (!IsEnabled) { return; } lock (_limitLock) { if (_pendingReductions > 0) { _pendingReductions--; return; } if (_pendingIncrements > 0) { if (!TryReleaseSemaphore()) { return; } _pendingIncrements--; return; } } TryReleaseSemaphore(); } protected override void Dispose(bool disposing) { base.Dispose(disposing); if (!disposing) { return; } _semaphore.Dispose(); } private sealed class Releaser : IAsyncDisposable { private PairProcessingLimiter? _owner; public Releaser(PairProcessingLimiter owner) { _owner = owner; } public ValueTask DisposeAsync() { var owner = Interlocked.Exchange(ref _owner, null); owner?.ReleaseOne(); return ValueTask.CompletedTask; } } private sealed class NoopReleaser : IAsyncDisposable { public static readonly NoopReleaser Instance = new(); private NoopReleaser() { } public ValueTask DisposeAsync() { return ValueTask.CompletedTask; } } } public readonly record struct PairProcessingLimiterSnapshot(bool IsEnabled, int Limit, int InFlight, int Waiting) { public int Remaining => Math.Max(0, Limit - InFlight); }