The Adaptive Rate Limiter uses a reservation system to manage concurrency and rate limits. This document covers the local storage layer for reservation state.

Imports

from adaptive_rate_limiter.reservation import ReservationContext, ReservationTracker

Architecture

The reservation system uses a Two-Layer Architecture:
  1. Local Layer (ReservationTracker):
    • In-memory tracking within the application process.
    • Fast O(1) lookups.
    • LOCAL-ONLY: Does NOT call backends directly.
    • Manages the lifecycle of local request objects.
    • Handles cleanup of stale local state via a background task.
  2. Distributed Layer (Backend):
    • Redis-based tracking (when using RedisBackend).
    • Manages global concurrency and token quotas.
    • Handles “Orphan Recovery” for crashed instances.
    • Strategy layer (e.g., IntelligentModeStrategy) handles backend release calls.
The IntelligentModeStrategy creates its own internal ReservationTracker using a composition pattern. The tracker is a pure local storage layer.
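
To make the composition concrete, the sketch below shows a hypothetical strategy that creates its own ReservationTracker and keeps every backend call on the strategy side. SketchStrategy and the backend.release() call are assumptions for illustration, not the actual IntelligentModeStrategy.
from adaptive_rate_limiter.reservation import ReservationTracker

class SketchStrategy:
    """Illustrative composition: local tracker inside, backend calls stay here."""

    def __init__(self, backend):
        self._backend = backend                # distributed layer (e.g., Redis-backed)
        self._tracker = ReservationTracker()   # local layer, composed internally
        # (tracker.start()/stop() lifecycle omitted for brevity)

    async def release(self, request_id: str, bucket_id: str) -> None:
        # Local O(1) lookup first, then the distributed release (assumed API).
        context = await self._tracker.get_and_clear(request_id, bucket_id)
        if context is not None:
            await self._backend.release(context.reservation_id)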

ReservationContext

The ReservationContext dataclass holds reservation metadata:
from adaptive_rate_limiter.reservation import ReservationContext

context = ReservationContext(
    reservation_id="res_abc123",    # Required - unique identifier
    bucket_id="shared_tier:chat",   # Required - rate limit bucket
    estimated_tokens=500,           # Required - tokens reserved
    # created_at auto-populated with time.time()
)
Field             Type   Description
reservation_id    str    Unique identifier for this reservation
bucket_id         str    Rate limit bucket (e.g., "shared_tier:chat")
estimated_tokens  int    Number of tokens reserved
created_at        float  Timestamp (auto-populated via time.time())
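
Because created_at is a plain time.time() timestamp, a reservation's age falls out of simple subtraction. The 240-second threshold below only mirrors the tracker's default max_reservation_age and is illustrative.
import time

from adaptive_rate_limiter.reservation import ReservationContext

context = ReservationContext(
    reservation_id="res_abc123",
    bucket_id="shared_tier:chat",
    estimated_tokens=500,
)

# created_at was auto-populated at construction time.
age_seconds = time.time() - context.created_at
is_stale = age_seconds > 240  # same idea as the tracker's max_reservation_age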

ReservationTracker

The ReservationTracker class provides local storage for reservation state. It does not interact with backends—that responsibility belongs to the strategy layer.

Configuration

from adaptive_rate_limiter.reservation import ReservationTracker

tracker = ReservationTracker(
    max_reservations=10000,         # Maximum stored reservations
    max_reservation_age=240,        # Seconds before stale (default: 240s / 4 min)
    stale_cleanup_interval=60,      # Background cleanup interval (default: 60s)
)

Lifecycle

The ReservationTracker runs a background cleanup task that must be explicitly started and stopped. Failure to call start() will prevent stale reservation cleanup, causing memory leaks in long-running applications.

async start()

Starts the background cleanup task. This method is idempotent: once the task is running, subsequent calls have no effect.
await tracker.start()  # Start background cleanup

async stop()

Stops the background cleanup task. Call this during application shutdown to ensure clean termination.
await tracker.stop()  # Stop background cleanup

Proper Usage Pattern

from adaptive_rate_limiter.reservation import ReservationTracker

tracker = ReservationTracker()

# Start the background cleanup task
await tracker.start()

try:
    # Use the tracker...
    await tracker.store(
        request_id="req_123",
        bucket_id="shared_tier:chat",
        reservation_id="res_abc",
        estimated_tokens=500
    )
    # ... application logic ...
finally:
    # Always stop on shutdown
    await tracker.stop()
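
If you prefer not to wire up start()/stop() by hand, a small helper built on contextlib.asynccontextmanager can guarantee shutdown. This wrapper is a convenience sketch, not part of the library.
from contextlib import asynccontextmanager

from adaptive_rate_limiter.reservation import ReservationTracker

@asynccontextmanager
async def reservation_tracker(**kwargs):
    """Yield a started tracker and always stop it on exit (illustrative helper)."""
    tracker = ReservationTracker(**kwargs)
    await tracker.start()
    try:
        yield tracker
    finally:
        await tracker.stop()

# Usage:
# async with reservation_tracker(max_reservations=10000) as tracker:
#     await tracker.store(...)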

Key Features

  • Compound Key Indexing: Stores reservations using (request_id, bucket_id) tuples for precise lookups (see the sketch after this list).
  • Secondary Index: Maintains a mapping of request_id -> Set[(request_id, bucket_id)] to allow clearing all reservations for a single request (e.g., on timeout).
  • Stale Cleanup: A background task periodically removes reservations that have exceeded their max_reservation_age to prevent memory leaks.
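
A minimal sketch of the indexing scheme, using plain dictionaries. It shows only the shape of the compound key and the secondary index; it is not the tracker's actual internal code.
from adaptive_rate_limiter.reservation import ReservationContext

primary: dict[tuple[str, str], ReservationContext] = {}  # (request_id, bucket_id) -> context
by_request: dict[str, set[tuple[str, str]]] = {}          # request_id -> compound keys

def store(request_id: str, context: ReservationContext) -> None:
    key = (request_id, context.bucket_id)
    primary[key] = context
    by_request.setdefault(request_id, set()).add(key)

def clear_all_for_request(request_id: str) -> list[ReservationContext]:
    # The secondary index turns "clear everything for this request" into a simple walk.
    keys = by_request.pop(request_id, set())
    return [primary.pop(key) for key in keys if key in primary]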

Methods

Store a Reservation

await tracker.store(request_id, bucket_id, reservation_id, estimated_tokens)
Stores a reservation context for later retrieval. Raises ReservationCapacityError if the tracker is at maximum capacity (as configured by max_reservations).
import logging

from adaptive_rate_limiter.exceptions import ReservationCapacityError

logger = logging.getLogger(__name__)

try:
    await tracker.store(
        request_id="req_123",
        bucket_id="shared_tier:chat",
        reservation_id="res_abc",
        estimated_tokens=500
    )
except ReservationCapacityError as e:
    # Handle capacity limit - the tracker is full
    # Consider: increase max_reservations, reduce reservation age, or implement backpressure
    logger.warning("Reservation tracker at capacity: %s", e)
Before raising the error, store() attempts to clean up stale reservations. The error is only raised if cleanup doesn’t free sufficient capacity.

Get Without Clearing (for Streaming)

context = await tracker.get(request_id, bucket_id)
Retrieves the reservation context without removing it. Use this when a streaming wrapper takes ownership and needs the context to remain available.
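
For example, a streaming wrapper might read the context up front and clear the reservation only once the stream is exhausted. The wrapper below is a hypothetical sketch, not a class or function provided by the library.
async def stream_with_reservation(tracker, request_id, bucket_id, chunks):
    # Read without clearing: the reservation stays tracked while streaming.
    context = await tracker.get(request_id, bucket_id)
    try:
        async for chunk in chunks:
            yield chunk
    finally:
        # Ownership ends here: drop the local state once the stream is done.
        await tracker.get_and_clear(request_id, bucket_id)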

Get and Clear Atomically (for Completion)

# With bucket_id (precise lookup)
context = await tracker.get_and_clear(request_id, bucket_id)

# Without bucket_id (optional)
context = await tracker.get_and_clear(request_id)
Retrieves and atomically removes the reservation context. Use this when a request completes and the reservation should be released.
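
A typical completion path clears the local entry and hands the context to whatever layer performs the distributed release. The backend.release() call below is a placeholder for that layer, and the sketch assumes get_and_clear() returns None when no entry exists.
async def on_request_complete(tracker, backend, request_id, bucket_id=None):
    if bucket_id is not None:
        context = await tracker.get_and_clear(request_id, bucket_id)
    else:
        context = await tracker.get_and_clear(request_id)
    if context is None:
        return  # already cleared (e.g., by stale cleanup or error recovery)
    # The strategy layer, not the tracker, talks to the backend (placeholder call).
    await backend.release(context.reservation_id)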

Clear All for Error Recovery

contexts = await tracker.clear_all_for_request(request_id)
Clears all reservations associated with a request_id. Returns a list of all cleared ReservationContext objects. Use this for error recovery scenarios where all reservations for a failed request must be released.
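
For instance, a timeout handler can clear everything the failed request still holds and release each context through the distributed layer. The backend.release() call is a placeholder for that layer, not a tracker method.
import asyncio

async def run_with_timeout(tracker, backend, request_id, coro, timeout=30.0):
    try:
        return await asyncio.wait_for(coro, timeout)
    except asyncio.TimeoutError:
        # Release every reservation the timed-out request still holds.
        for context in await tracker.clear_all_for_request(request_id):
            await backend.release(context.reservation_id)  # placeholder release call
        raise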

Maintenance Methods

get_and_clear_stale()

Retrieves all stale reservation entries and removes them from tracking. Returns a list of stale entries that exceeded their TTL.

compact_heap()

Optimizes the internal min-heap by removing stale entries. Call this periodically in long-running applications to prevent memory growth.

stale_entry_ratio (property)

Returns the current ratio of stale entries to total entries in the heap. Useful for monitoring and deciding when to trigger compaction.
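
A periodic maintenance task can combine these: watch stale_entry_ratio and compact the heap when it crosses a threshold. The 25% threshold and 5-minute interval below are arbitrary illustrative choices, and the sketch assumes compact_heap() is a synchronous call.
import asyncio

async def heap_maintenance(tracker, interval=300.0, threshold=0.25):
    while True:
        # Compact only when a sizeable fraction of heap entries are stale.
        if tracker.stale_entry_ratio > threshold:
            tracker.compact_heap()
        await asyncio.sleep(interval)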

Monitoring

The ReservationTracker provides properties to monitor its current state:
Property           Type  Description
reservation_count  int   Current number of tracked reservations
request_count      int   Current number of unique requests with reservations

# Check current tracker state
print(f"Reservations: {tracker.reservation_count}")
print(f"Unique requests: {tracker.request_count}")

# Monitor for capacity issues (store max_reservations when creating the tracker)
MAX_RESERVATIONS = 10000
tracker = ReservationTracker(max_reservations=MAX_RESERVATIONS)

if tracker.reservation_count > MAX_RESERVATIONS * 0.9:
    logger.warning("Reservation tracker at 90%% capacity")

Usage Pattern

from adaptive_rate_limiter.reservation import ReservationTracker
from adaptive_rate_limiter.exceptions import ReservationCapacityError

tracker = ReservationTracker(max_reservations=10000)

# Start the background cleanup task
await tracker.start()

try:
    # Store when capacity is reserved
    await tracker.store(
        request_id="req_123",
        bucket_id="shared_tier:chat",
        reservation_id="res_abc",
        estimated_tokens=500
    )

    # For streaming: get without clearing (wrapper takes ownership)
    context = await tracker.get("req_123", "shared_tier:chat")

    # For completion: get and clear atomically
    context = await tracker.get_and_clear("req_123", "shared_tier:chat")

    # For error recovery: clear all reservations for a request
    contexts = await tracker.clear_all_for_request("req_123")
finally:
    # Always stop on shutdown
    await tracker.stop()

Distributed Tracking & Orphan Recovery

In a distributed system, a worker might crash after reserving capacity but before releasing it. This creates “orphaned” reservations that permanently consume quota. The RedisBackend implements an Orphan Recovery mechanism:
  1. In-Flight Tracking: Every reservation is recorded in a “pending” set in Redis with a timestamp.
  2. Recovery Task: A background task runs on every instance (leader-elected or randomized) to scan for pending reservations older than the max_request_timeout.
  3. Reclamation: Expired reservations are assumed to be from crashed workers and are automatically released, returning their capacity to the pool.
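
The flow can be pictured with a Redis sorted set scored by reservation start time. This is a conceptual sketch of the three steps above, not the RedisBackend's actual code; the key name and the release() stub are made up for illustration.
import time

from redis.asyncio import Redis

PENDING_KEY = "arl:pending"  # made-up key name for this sketch

async def release(reservation_id: str) -> None:
    """Stub standing in for the real 'return capacity to the pool' path."""

async def record_pending(r: Redis, reservation_id: str) -> None:
    # 1. In-flight tracking: score each pending reservation by its start time.
    await r.zadd(PENDING_KEY, {reservation_id: time.time()})

async def recover_orphans(r: Redis, max_request_timeout: float) -> None:
    # 2. Recovery task: find pending reservations older than the timeout.
    cutoff = time.time() - max_request_timeout
    orphans = await r.zrangebyscore(PENDING_KEY, "-inf", cutoff)
    for reservation_id in orphans:
        # 3. Reclamation: assume the worker crashed; release and drop the entry.
        await release(reservation_id)
        await r.zrem(PENDING_KEY, reservation_id)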

Drift Correction

The system also handles Clock Drift and State Erasure:
  • Drift Correction: Uses sequence numbers to order requests and updates.
  • State Erasure Prevention: When updating limits from API headers, the system accounts for “in-flight” requests that haven’t yet been reflected in the server’s response headers. This prevents the local state from “jumping back” and ignoring recent consumption.
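
The in-flight adjustment boils down to simple arithmetic: any remaining value reported by the server's headers is reduced by tokens already reserved locally but not yet visible to the server. A conceptual sketch, not the library's update logic:
def adjusted_remaining(header_remaining: int, in_flight_estimated_tokens: int) -> int:
    # Trusting the header directly would "jump back" and erase recent local consumption,
    # because the server has not yet counted requests that are still in flight.
    return max(0, header_remaining - in_flight_estimated_tokens)

# Example: headers report 8,000 tokens remaining while 1,500 tokens are reserved
# locally for requests the server has not yet seen.
print(adjusted_remaining(8000, 1500))  # -> 6500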