Dynamic Concurrency Adjustment¶
Standard Nalix concurrency limits are applied via static [PacketConcurrencyLimit] attributes. However, in high-load or multi-tenant environments, you may need to adjust these limits dynamically based on system health (CPU/Memory) or client-specific metrics.
Learning Signals
- Level: Advanced
- Time: 15 minutes
- Prerequisites: Concurrency Gate API
1. System Architecture Clarification¶
It is important to distinguish between the two concurrency systems in Nalix:
- TaskManager Concurrency: Manages the global background worker thread pool and recurring tasks. It uses its own internal gates to prevent thread starvation.
- Runtime Concurrency (ConcurrencyGate): Manages per-opcode handler execution limits. This is what ConcurrencyMiddleware uses to protect your business logic.
While these systems are independent, a Custom Transport Policy acts as a bridge, allowing the runtime to adjust its opcode-level limits based on the overall health of the system (monitored by TaskManager or other providers).
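To make the bridge concrete, here is a minimal sketch of what such a policy boundary might look like. The ISystemHealthProvider interface and its members are illustrative names, not part of the Nalix API; in practice the policy would consume whichever health source you wire in (TaskManager metrics, OS counters, etc.).

using System;

// Hypothetical bridge: ISystemHealthProvider and its members are
// illustrative names, not Nalix types.
public interface ISystemHealthProvider
{
    // Normalized CPU usage in the range 0..100.
    double CpuUsagePercent { get; }

    // Number of currently running background workers.
    int RunningWorkerCount { get; }
}

// A custom transport policy polls the provider and translates overall
// system health into per-opcode concurrency targets for the runtime.
public sealed class HealthBridgePolicy
{
    private readonly ISystemHealthProvider _health;

    public HealthBridgePolicy(ISystemHealthProvider health) => _health = health;

    // Maps system pressure to a target limit for a given opcode.
    public int TargetLimitFor(ushort opcode)
        => _health.CpuUsagePercent > 80 ? 5 : 50;
}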
2. The "Advance & Retreat" Pattern¶
Adjusting a SemaphoreSlim (the engine behind the Concurrency Gate) at runtime requires care to avoid deadlocks. Nalix uses a pattern called Advance & Retreat:
- Advance (Scaling Up): Call Release(n) to instantly add slots to the semaphore.
- Retreat (Scaling Down): Use a non-blocking Wait(0) loop to "capture" free slots. If slots are currently in use, we "retreat" partially and try again later, ensuring we never block the caller.
Core Logic Snippet¶
// Scaling Up (Advance): instantly add 'delta' free slots.
if (delta > 0)
{
    _semaphore.Release(delta);
}
// Scaling Down (Retreat): capture free slots without blocking.
else if (delta < 0)
{
    for (int i = 0; i < -delta; i++)
    {
        if (!_semaphore.Wait(0)) break; // No free slots left to capture; retry the remainder later
    }
}
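Wrapped in a small helper, the full pattern might look like the following. This is a minimal sketch, not Nalix's internal implementation; AdjustableGate and its members are illustrative names. The _pendingRetreat counter records slots we could not capture, so a later call settles the remainder, matching the "retreat partially and try again later" rule above.

using System.Threading;
using System.Threading.Tasks;

// Minimal sketch of an adjustable limiter built on SemaphoreSlim.
// 'AdjustableGate' is an illustrative name, not a Nalix type.
public sealed class AdjustableGate
{
    private readonly SemaphoreSlim _semaphore;
    private int _pendingRetreat; // Slots still owed from a previous scale-down.

    public AdjustableGate(int initialLimit)
        => _semaphore = new SemaphoreSlim(initialLimit, int.MaxValue);

    public Task WaitAsync(CancellationToken ct) => _semaphore.WaitAsync(ct);

    public void Release() => _semaphore.Release();

    // Advance & Retreat: apply 'delta' without ever blocking the caller.
    public void Adjust(int delta)
    {
        // Net the requested change against any slots still owed.
        int effective = delta - Interlocked.Exchange(ref _pendingRetreat, 0);
        if (effective > 0)
        {
            _semaphore.Release(effective); // Advance
        }
        else
        {
            int toCapture = -effective;
            int captured = 0;
            while (captured < toCapture && _semaphore.Wait(0))
            {
                captured++; // Retreat: capture only free slots
            }
            // Remember slots we could not capture; settle them next call.
            Interlocked.Add(ref _pendingRetreat, toCapture - captured);
        }
    }
}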
3. Designing a Custom Dynamic Policy¶
Here is a complete example of a middleware that lowers the concurrency limit of a specific opcode when system load (approximated below by TaskManager's running worker count) exceeds a threshold.
using System;
using System.Threading;
using System.Threading.Tasks;
using Nalix.Abstractions.Middleware;
using Nalix.Abstractions.Networking.Packets;
using Nalix.Framework.Tasks;
using Nalix.Runtime.Throttling;

public class CpuAdaptiveConcurrencyMiddleware : IPacketMiddleware<IPacket>
{
    private readonly TaskManager _taskManager;
    private readonly ConcurrencyGate _gate;
    private int _currentLimit = 100; // Last limit we applied; compared against the target below.

    public CpuAdaptiveConcurrencyMiddleware(TaskManager taskManager, ConcurrencyGate gate)
    {
        _taskManager = taskManager;
        _gate = gate;
    }

    public async ValueTask InvokeAsync(IPacketContext<IPacket> context, Func<CancellationToken, ValueTask> next)
    {
        // 1. Obtain system health metrics (CPU, ThreadPool load, etc.)
        //    TaskManager properties provide a direct source for these metrics.
        int running = _taskManager.PeakRunningWorkerCount;

        // 2. Determine if we are under heavy load
        bool isUnderPressure = running > 100; // Example metric

        // 3. Calculate dynamic limit
        int targetLimit = isUnderPressure ? 5 : 50;

        // 4. Apply adjustment (Advance/Retreat) to the specific opcode entry
        //    when the target differs from the last applied limit.
        //    This effectively "wraps" the ConcurrencyGate with dynamic intelligence.
        if (targetLimit != _currentLimit)
        {
            // ... adjustment logic ...
            _currentLimit = targetLimit;
        }

        await next(context.CancellationToken);
    }
}
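For reference, the elided step 4 could reuse the Advance & Retreat snippet from section 2. The sketch below is written as a member of the middleware above; GetSemaphore is a hypothetical accessor, since the extension surface ConcurrencyGate exposes for reaching an opcode's SemaphoreSlim may differ in your version.

// Hedged sketch of the elided adjustment step. 'GetSemaphore' is a
// hypothetical accessor, not a confirmed ConcurrencyGate API.
private void ApplyLimit(ushort opcode, int targetLimit)
{
    SemaphoreSlim semaphore = GetSemaphore(opcode); // hypothetical accessor
    int delta = targetLimit - _currentLimit;

    if (delta > 0)
    {
        semaphore.Release(delta); // Advance
    }
    else if (delta < 0)
    {
        for (int i = 0; i < -delta; i++)
        {
            if (!semaphore.Wait(0)) break; // Retreat partially; retry later
        }
    }

    _currentLimit = targetLimit;
}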
4. Integration with Concurrency Gates¶
Because Nalix's ConcurrencyGate caches entries by opcode, the most effective way to implement dynamic limits is to:
- Calculate the limit in your custom middleware.
- Modify the Attribute: Replace the PacketConcurrencyLimitAttribute in the context.Attributes collection.
- Trigger Re-creation: If the limit needs to change significantly, your policy can "evict" the old gate entry (if you have an extension point) or use the middleware to manually handle the SemaphoreSlim.
Example: Per-Client Dynamic Budget¶
You can also use this pattern to implement Fairness Policies, where "VIP" clients get a larger concurrency share than "Guest" clients, even for the same opcode.
public async ValueTask InvokeAsync(IPacketContext<IPacket> context, Func<CancellationToken, ValueTask> next)
{
    // Treat authenticated connections as "VIP" for this example.
    bool isVip = context.Connection.RemoteIdentity.IsAuthenticated;

    // Dynamically assign a limit based on identity
    context.Attributes.ConcurrencyLimit = new PacketConcurrencyLimitAttribute(
        max: isVip ? 50 : 5,
        queue: true,
        queueMax: 10
    );

    await next(context.CancellationToken);
}
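Note on ordering: for the replaced attribute to take effect, this middleware must run earlier in the pipeline than ConcurrencyMiddleware, so the gate resolves the rewritten PacketConcurrencyLimitAttribute instead of the static one declared on the handler. The exact registration mechanism depends on your pipeline builder.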
Best Practices¶
- Hysteresis: Don't adjust limits on every single packet. Use a "streak" or a "cool-down" period (e.g., only adjust once every 5 seconds) to avoid oscillation; see the sketch after this list.
- Graceful Retreat: When scaling down, never dispose the semaphore while requests are active. Use the Wait(0) pattern to gradually lower the ceiling.
- Observability: Always log when a dynamic policy triggers an adjustment so you can correlate it with performance spikes.
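As a minimal sketch of the hysteresis advice, the helper below throttles adjustments to at most one per cool-down window. The type and member names are illustrative, not Nalix APIs.

using System;
using System.Threading;

// Illustrative helper, not a Nalix type: gates limit adjustments
// behind a cool-down window to avoid oscillation.
public sealed class CooldownGate
{
    private readonly long _cooldownTicks;
    private long _lastAdjustmentTicks;

    public CooldownGate(TimeSpan cooldown)
        => _cooldownTicks = cooldown.Ticks;

    // Returns true at most once per cool-down window.
    public bool TryEnter()
    {
        long now = DateTime.UtcNow.Ticks;
        long last = Interlocked.Read(ref _lastAdjustmentTicks);

        if (now - last < _cooldownTicks)
        {
            return false; // Still cooling down; skip this adjustment.
        }

        // Only one caller wins the race to start a new window.
        return Interlocked.CompareExchange(ref _lastAdjustmentTicks, now, last) == last;
    }
}

In InvokeAsync, you would wrap step 4 in if (_cooldown.TryEnter()) { ... } so the limit is recomputed at most once every few seconds rather than on every packet.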