using System;
using System.Threading;
using System.Threading.Tasks;
using LightlessSync.LightlessConfiguration;
using LightlessSync.Services.Mediator;
using Microsoft.Extensions.Logging;

namespace LightlessSync.Services;

/// <summary>
/// Limits how many pair-processing operations may run concurrently. Callers obtain a handle via
/// <see cref="AcquireAsync"/> and dispose it when done. The limit comes from configuration, is clamped to
/// 1..<see cref="HardLimit"/>, and is re-evaluated whenever the mediator subscription fires.
/// </summary>
// NOTE(review): AcquireAsync/WaitInternalAsync are declared as non-generic ValueTask yet return an
// IAsyncDisposable handle, and both the ILogger parameter and Mediator.Subscribe carry no type argument.
// Generic arguments appear to be missing from this copy of the file; they are reproduced here exactly
// as found — confirm against the original before merging.
public sealed class PairProcessingLimiter : DisposableMediatorSubscriberBase
{
    /// <summary>Upper bound for the limit and the maximum count of the underlying semaphore.</summary>
    private const int HardLimit = 32;

    private readonly LightlessConfigService _configService;

    /// <summary>Guards <see cref="_currentLimit"/>, <see cref="_pendingReductions"/> and <see cref="_limiterActive"/>.</summary>
    private readonly object _limitLock = new();

    private readonly SemaphoreSlim _semaphore;

    /// <summary>Limit most recently applied to the semaphore.</summary>
    private int _currentLimit;

    /// <summary>
    /// Permits that still have to be removed after a limit decrease; each one is consumed by a
    /// returning holder instead of handing its permit back to the semaphore.
    /// </summary>
    private int _pendingReductions;

    /// <summary>Number of callers currently awaiting a permit.</summary>
    private int _waiting;

    /// <summary>Number of real (non no-op) handles that have been issued and not yet disposed.</summary>
    private int _inFlight;

    /// <summary>
    /// <c>true</c> while the semaphore's permit count reflects <see cref="_currentLimit"/>;
    /// <c>false</c> once the disabled path has opened it up to <see cref="HardLimit"/> (or it was
    /// constructed that way), in which case it must be rebuilt before the limit is enforced again.
    /// </summary>
    private bool _limiterActive;

    /// <summary>
    /// Creates the limiter, sizes the semaphore from the current configuration and subscribes to the
    /// mediator so that the limit is re-applied when notified.
    /// </summary>
    public PairProcessingLimiter(ILogger logger, LightlessMediator mediator, LightlessConfigService configService)
        : base(logger, mediator)
    {
        _configService = configService;
        _currentLimit = CalculateLimit();
        _limiterActive = IsEnabled;

        // When the limiter starts disabled the semaphore is fully open; UpdateSemaphoreLimit rebuilds
        // it on the first transition to enabled.
        var initialCount = _limiterActive ? _currentLimit : HardLimit;
        _semaphore = new SemaphoreSlim(initialCount, HardLimit);

        Mediator.Subscribe(this, _ => UpdateSemaphoreLimit());
    }

    /// <summary>
    /// Waits for a processing slot. The returned handle gives the slot back when disposed; when the
    /// limiter is disabled a shared no-op handle is returned immediately.
    /// </summary>
    /// <param name="cancellationToken">Cancels the wait for a slot.</param>
    public ValueTask AcquireAsync(CancellationToken cancellationToken)
    {
        return WaitInternalAsync(cancellationToken);
    }

    /// <summary>Returns a point-in-time view of the limiter (enabled flag, limit, in-flight and waiting counts).</summary>
    public PairProcessingLimiterSnapshot GetSnapshot()
    {
        lock (_limitLock)
        {
            var enabled = IsEnabled;
            var limit = enabled ? _currentLimit : CalculateLimit();
            var waiting = Math.Max(0, Volatile.Read(ref _waiting));
            var inFlight = Math.Max(0, Volatile.Read(ref _inFlight));
            return new PairProcessingLimiterSnapshot(enabled, limit, inFlight, waiting);
        }
    }

    private bool IsEnabled => _configService.Current.EnablePairProcessingLimiter;

    /// <summary>Implements <see cref="AcquireAsync"/>: waits on the semaphore and tracks the waiting/in-flight counters.</summary>
    private async ValueTask WaitInternalAsync(CancellationToken token)
    {
        if (!IsEnabled)
        {
            return NoopReleaser.Instance;
        }

        Interlocked.Increment(ref _waiting);
        try
        {
            await _semaphore.WaitAsync(token).ConfigureAwait(false);
        }
        catch
        {
            Interlocked.Decrement(ref _waiting);
            throw;
        }

        Interlocked.Decrement(ref _waiting);

        if (!IsEnabled)
        {
            // The limiter was switched off while we were waiting: hand the permit straight back.
            // The disable path may already have filled the semaphore, so a full semaphore is tolerated.
            TryReleasePermit();
            return NoopReleaser.Instance;
        }

        Interlocked.Increment(ref _inFlight);
        return new Releaser(this);
    }

    /// <summary>
    /// Re-reads the configuration and adjusts the semaphore: opens it completely when the limiter is
    /// disabled, rebuilds it when the limiter has just been re-enabled, and otherwise grows or shrinks
    /// the permit pool by the difference between the old and new limit.
    /// </summary>
    private void UpdateSemaphoreLimit()
    {
        lock (_limitLock)
        {
            var enabled = IsEnabled;
            var desiredLimit = CalculateLimit();

            if (!enabled)
            {
                // Open the semaphore fully so that every waiter wakes up and falls through to the
                // no-op handle in WaitInternalAsync.
                var releaseAmount = HardLimit - _semaphore.CurrentCount;
                if (releaseAmount > 0)
                {
                    try
                    {
                        _semaphore.Release(releaseAmount);
                    }
                    catch (SemaphoreFullException)
                    {
                        // ignore, already at max
                    }
                }

                _currentLimit = desiredLimit;
                _pendingReductions = 0;
                _limiterActive = false;
                return;
            }

            if (!_limiterActive)
            {
                // Disabled -> enabled: the semaphore still holds the surplus permits added while
                // disabled, so relative adjustments would leave the limit unenforced. Rebuild instead.
                ResynchronizePermits(desiredLimit);
                _limiterActive = true;
            }
            else if (desiredLimit == _currentLimit)
            {
                return;
            }
            else if (desiredLimit > _currentLimit)
            {
                var increment = desiredLimit - _currentLimit;

                // Permits still owed from an earlier decrease are cancelled first; releasing new
                // permits while that debt is outstanding would temporarily exceed the new limit.
                var cancelled = Math.Min(increment, _pendingReductions);
                _pendingReductions -= cancelled;

                var allowed = Math.Min(increment - cancelled, HardLimit - _semaphore.CurrentCount);
                if (allowed > 0)
                {
                    _semaphore.Release(allowed);
                }
            }
            else
            {
                var decrement = _currentLimit - desiredLimit;

                // Take back as many idle permits as are available right now...
                var removed = 0;
                while (removed < decrement && _semaphore.Wait(0))
                {
                    removed++;
                }

                // ...and let returning holders absorb the rest (see ReleaseOne).
                var remaining = decrement - removed;
                if (remaining > 0)
                {
                    _pendingReductions += remaining;
                }
            }

            _currentLimit = desiredLimit;
            Logger.LogDebug("Pair processing concurrency updated to {limit} (pending reductions: {pending})", _currentLimit, _pendingReductions);
        }
    }

    /// <summary>
    /// Rebuilds the permit pool for <paramref name="desiredLimit"/> after the semaphore was fully open.
    /// Must be called with <see cref="_limitLock"/> held.
    /// </summary>
    /// <remarks>
    /// Best effort: a caller that has already taken a permit but has not yet been counted in
    /// <see cref="_inFlight"/> at the moment this runs is not accounted for.
    /// </remarks>
    private void ResynchronizePermits(int desiredLimit)
    {
        // Handles issued before the limiter was disabled are still counted here and will come back
        // through ReleaseOne, so they occupy part of the new limit.
        var outstanding = Math.Max(0, Volatile.Read(ref _inFlight));
        var target = Math.Max(0, desiredLimit - outstanding);

        // Drain the surplus permits left over from the disabled state.
        while (_semaphore.CurrentCount > target && _semaphore.Wait(0))
        {
        }

        // More handles outstanding than the limit allows: the excess is absorbed as they return.
        _pendingReductions = Math.Max(0, outstanding - desiredLimit);
    }

    /// <summary>Reads the configured concurrency and clamps it to 1..<see cref="HardLimit"/>.</summary>
    private int CalculateLimit()
    {
        var configured = _configService.Current.MaxConcurrentPairApplications;
        return Math.Clamp(configured, 1, HardLimit);
    }

    /// <summary>
    /// Called when a real handle is disposed: updates the in-flight counter and either returns the
    /// permit to the semaphore or uses it to settle a pending reduction.
    /// </summary>
    private void ReleaseOne()
    {
        var inFlight = Interlocked.Decrement(ref _inFlight);
        if (inFlight < 0)
        {
            // Defensive clamp so the counter never stays negative.
            Interlocked.Exchange(ref _inFlight, 0);
        }

        if (!IsEnabled)
        {
            // While disabled the semaphore is (or is about to be) fully open; nothing to give back.
            return;
        }

        lock (_limitLock)
        {
            if (_pendingReductions > 0)
            {
                _pendingReductions--;
                return;
            }
        }

        TryReleasePermit();
    }

    /// <summary>
    /// Returns one permit to the semaphore. A semaphore that is already at its maximum is tolerated,
    /// because this runs on disposal paths that must not throw and the condition only arises when
    /// the disable path has already opened the semaphore completely.
    /// </summary>
    private void TryReleasePermit()
    {
        try
        {
            _semaphore.Release();
        }
        catch (SemaphoreFullException)
        {
            Logger.LogDebug("Pair processing permit release skipped; semaphore already at maximum");
        }
    }

    /// <summary>Disposes the base subscriber and, when disposing managed state, the semaphore.</summary>
    protected override void Dispose(bool disposing)
    {
        base.Dispose(disposing);
        if (!disposing)
        {
            return;
        }

        _semaphore.Dispose();
    }

    /// <summary>Handle for an acquired slot; gives the slot back exactly once when disposed.</summary>
    private sealed class Releaser : IAsyncDisposable
    {
        private PairProcessingLimiter? _owner;

        public Releaser(PairProcessingLimiter owner)
        {
            _owner = owner;
        }

        public ValueTask DisposeAsync()
        {
            // Swapping the owner out atomically makes repeated disposal a no-op.
            var owner = Interlocked.Exchange(ref _owner, null);
            owner?.ReleaseOne();
            return ValueTask.CompletedTask;
        }
    }

    /// <summary>Shared handle returned when the limiter is disabled; disposing it does nothing.</summary>
    private sealed class NoopReleaser : IAsyncDisposable
    {
        public static readonly NoopReleaser Instance = new();

        private NoopReleaser()
        {
        }

        public ValueTask DisposeAsync()
        {
            return ValueTask.CompletedTask;
        }
    }
}

/// <summary>Point-in-time view of a <see cref="PairProcessingLimiter"/>.</summary>
/// <param name="IsEnabled">Whether the limiter was enabled when the snapshot was taken.</param>
/// <param name="Limit">Concurrency limit in effect.</param>
/// <param name="InFlight">Number of handles issued and not yet disposed.</param>
/// <param name="Waiting">Number of callers waiting for a slot.</param>
public readonly record struct PairProcessingLimiterSnapshot(bool IsEnabled, int Limit, int InFlight, int Waiting)
{
    /// <summary>Slots still available under <see cref="Limit"/>, never negative.</summary>
    public int Remaining => Math.Max(0, Limit - InFlight);
}