changes
This commit is contained in:
220
LightlessSync/Services/PairProcessingLimiter.cs
Normal file
220
LightlessSync/Services/PairProcessingLimiter.cs
Normal file
@@ -0,0 +1,220 @@
|
||||
using System;
|
||||
using System.Threading;
|
||||
using System.Threading.Tasks;
|
||||
using LightlessSync.LightlessConfiguration;
|
||||
using LightlessSync.Services.Mediator;
|
||||
using Microsoft.Extensions.Logging;
|
||||
|
||||
namespace LightlessSync.Services;
|
||||
|
||||
public sealed class PairProcessingLimiter : DisposableMediatorSubscriberBase
|
||||
{
|
||||
private const int HardLimit = 32;
|
||||
private readonly LightlessConfigService _configService;
|
||||
private readonly object _limitLock = new();
|
||||
private readonly SemaphoreSlim _semaphore;
|
||||
private int _currentLimit;
|
||||
private int _pendingReductions;
|
||||
private int _waiting;
|
||||
private int _inFlight;
|
||||
|
||||
public PairProcessingLimiter(ILogger<PairProcessingLimiter> logger, LightlessMediator mediator, LightlessConfigService configService)
|
||||
: base(logger, mediator)
|
||||
{
|
||||
_configService = configService;
|
||||
_currentLimit = CalculateLimit();
|
||||
var initialCount = _configService.Current.EnablePairProcessingLimiter ? _currentLimit : HardLimit;
|
||||
_semaphore = new SemaphoreSlim(initialCount, HardLimit);
|
||||
|
||||
Mediator.Subscribe<PairProcessingLimitChangedMessage>(this, _ => UpdateSemaphoreLimit());
|
||||
}
|
||||
|
||||
public ValueTask<IAsyncDisposable> AcquireAsync(CancellationToken cancellationToken)
|
||||
{
|
||||
return WaitInternalAsync(cancellationToken);
|
||||
}
|
||||
|
||||
public PairProcessingLimiterSnapshot GetSnapshot()
|
||||
{
|
||||
lock (_limitLock)
|
||||
{
|
||||
var enabled = IsEnabled;
|
||||
var limit = enabled ? _currentLimit : CalculateLimit();
|
||||
var waiting = Math.Max(0, Volatile.Read(ref _waiting));
|
||||
var inFlight = Math.Max(0, Volatile.Read(ref _inFlight));
|
||||
return new PairProcessingLimiterSnapshot(enabled, limit, inFlight, waiting);
|
||||
}
|
||||
}
|
||||
|
||||
private bool IsEnabled => _configService.Current.EnablePairProcessingLimiter;
|
||||
|
||||
private async ValueTask<IAsyncDisposable> WaitInternalAsync(CancellationToken token)
|
||||
{
|
||||
if (!IsEnabled)
|
||||
{
|
||||
return NoopReleaser.Instance;
|
||||
}
|
||||
|
||||
Interlocked.Increment(ref _waiting);
|
||||
try
|
||||
{
|
||||
await _semaphore.WaitAsync(token).ConfigureAwait(false);
|
||||
}
|
||||
catch
|
||||
{
|
||||
Interlocked.Decrement(ref _waiting);
|
||||
throw;
|
||||
}
|
||||
|
||||
Interlocked.Decrement(ref _waiting);
|
||||
|
||||
if (!IsEnabled)
|
||||
{
|
||||
_semaphore.Release();
|
||||
return NoopReleaser.Instance;
|
||||
}
|
||||
|
||||
Interlocked.Increment(ref _inFlight);
|
||||
return new Releaser(this);
|
||||
}
|
||||
|
||||
private void UpdateSemaphoreLimit()
|
||||
{
|
||||
lock (_limitLock)
|
||||
{
|
||||
var enabled = IsEnabled;
|
||||
var desiredLimit = CalculateLimit();
|
||||
|
||||
if (!enabled)
|
||||
{
|
||||
var releaseAmount = HardLimit - _semaphore.CurrentCount;
|
||||
if (releaseAmount > 0)
|
||||
{
|
||||
try
|
||||
{
|
||||
_semaphore.Release(releaseAmount);
|
||||
}
|
||||
catch (SemaphoreFullException)
|
||||
{
|
||||
// ignore, already at max
|
||||
}
|
||||
}
|
||||
|
||||
_currentLimit = desiredLimit;
|
||||
_pendingReductions = 0;
|
||||
return;
|
||||
}
|
||||
|
||||
if (desiredLimit == _currentLimit)
|
||||
{
|
||||
return;
|
||||
}
|
||||
|
||||
if (desiredLimit > _currentLimit)
|
||||
{
|
||||
var increment = desiredLimit - _currentLimit;
|
||||
var allowed = Math.Min(increment, HardLimit - _semaphore.CurrentCount);
|
||||
if (allowed > 0)
|
||||
{
|
||||
_semaphore.Release(allowed);
|
||||
}
|
||||
}
|
||||
else
|
||||
{
|
||||
var decrement = _currentLimit - desiredLimit;
|
||||
var removed = 0;
|
||||
while (removed < decrement && _semaphore.Wait(0))
|
||||
{
|
||||
removed++;
|
||||
}
|
||||
|
||||
var remaining = decrement - removed;
|
||||
if (remaining > 0)
|
||||
{
|
||||
_pendingReductions += remaining;
|
||||
}
|
||||
}
|
||||
|
||||
_currentLimit = desiredLimit;
|
||||
Logger.LogDebug("Pair processing concurrency updated to {limit} (pending reductions: {pending})", _currentLimit, _pendingReductions);
|
||||
}
|
||||
}
|
||||
|
||||
private int CalculateLimit()
|
||||
{
|
||||
var configured = _configService.Current.MaxConcurrentPairApplications;
|
||||
return Math.Clamp(configured, 1, HardLimit);
|
||||
}
|
||||
|
||||
private void ReleaseOne()
|
||||
{
|
||||
var inFlight = Interlocked.Decrement(ref _inFlight);
|
||||
if (inFlight < 0)
|
||||
{
|
||||
Interlocked.Exchange(ref _inFlight, 0);
|
||||
}
|
||||
|
||||
if (!IsEnabled)
|
||||
{
|
||||
return;
|
||||
}
|
||||
|
||||
lock (_limitLock)
|
||||
{
|
||||
if (_pendingReductions > 0)
|
||||
{
|
||||
_pendingReductions--;
|
||||
return;
|
||||
}
|
||||
}
|
||||
|
||||
_semaphore.Release();
|
||||
}
|
||||
|
||||
protected override void Dispose(bool disposing)
|
||||
{
|
||||
base.Dispose(disposing);
|
||||
if (!disposing)
|
||||
{
|
||||
return;
|
||||
}
|
||||
|
||||
_semaphore.Dispose();
|
||||
}
|
||||
|
||||
private sealed class Releaser : IAsyncDisposable
|
||||
{
|
||||
private PairProcessingLimiter? _owner;
|
||||
|
||||
public Releaser(PairProcessingLimiter owner)
|
||||
{
|
||||
_owner = owner;
|
||||
}
|
||||
|
||||
public ValueTask DisposeAsync()
|
||||
{
|
||||
var owner = Interlocked.Exchange(ref _owner, null);
|
||||
owner?.ReleaseOne();
|
||||
return ValueTask.CompletedTask;
|
||||
}
|
||||
}
|
||||
|
||||
private sealed class NoopReleaser : IAsyncDisposable
|
||||
{
|
||||
public static readonly NoopReleaser Instance = new();
|
||||
|
||||
private NoopReleaser()
|
||||
{
|
||||
}
|
||||
|
||||
public ValueTask DisposeAsync()
|
||||
{
|
||||
return ValueTask.CompletedTask;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
public readonly record struct PairProcessingLimiterSnapshot(bool IsEnabled, int Limit, int InFlight, int Waiting)
|
||||
{
|
||||
public int Remaining => Math.Max(0, Limit - InFlight);
|
||||
}
|
||||
Reference in New Issue
Block a user