The Adaptive Rate Limiter provides first-class support for streaming responses, which are common in AI/LLM applications. Streaming presents a unique challenge for rate limiting because the final token usage is not known until the stream completes.

Automatic Streaming Detection

When using submit_request() with intelligent mode, streaming responses are automatically detected and wrapped. You don’t need to set any special flags—the scheduler handles everything transparently.

Detection Mechanism

The scheduler detects streaming responses by checking if the response:
  1. Is an async generator
  2. Has stream=True attribute
  3. Has an __aiter__ method
When any of these conditions are met, the response is automatically wrapped with rate-limit-aware tracking.
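
As a rough illustration of that check (a sketch only; looks_like_stream is a hypothetical helper, not part of the library's API):

import inspect

def looks_like_stream(response: object) -> bool:
    # Mirrors the three conditions above; any one of them marks the response as streaming.
    return (
        inspect.isasyncgen(response)                    # 1. async generator object
        or getattr(response, "stream", None) is True    # 2. stream=True attribute
        or hasattr(response, "__aiter__")               # 3. async-iterable
    )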

How It Works

The library uses a Reservation and Refund model for streaming requests:
  1. Detection: Response is checked for async generator, stream=True, or __aiter__
  2. Wrapping: Iterator is wrapped with RateLimitedAsyncIterator
  3. Token Extraction: Extracts usage.total_tokens from the final chunk
  4. Capacity Release: Releases capacity with refund (reserved - actual tokens)
  5. Cleanup: Background task cleans stale streams after 5 minutes of inactivity
The 5-minute cleanup interval is configurable via reservation_ttl in StateConfig (default: 300 seconds). See Configuration for details.
Token extraction requires stream_options.include_usage=True in your API call (for OpenAI-compatible APIs).
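
The flow can be pictured with a minimal sketch (illustrative only; bucket.reserve, bucket.release, and make_stream are hypothetical stand-ins for the library's internals):

async def stream_with_reservation(bucket, reserved_tokens, make_stream):
    bucket.reserve(reserved_tokens)                     # reserve estimated capacity up front
    actual_tokens = None
    try:
        async for chunk in await make_stream():         # step 2: wrap the underlying stream
            usage = getattr(chunk, "usage", None)
            if usage is not None:                       # step 3: final chunk carries usage
                actual_tokens = usage.total_tokens
            yield chunk                                 # caller consumes chunks as normal
    finally:
        if actual_tokens is None:                       # fallback: assume the full reservation was used
            actual_tokens = reserved_tokens
        bucket.release(reserved_tokens, actual_tokens)  # step 4: refund reserved - actual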

Components

Imports

from adaptive_rate_limiter import StreamingResponseProtocol, RequestMetadata
from adaptive_rate_limiter.scheduler import Scheduler, RateLimiterConfig, SchedulerMode
from adaptive_rate_limiter.streaming import (
    StreamingReservationContext,
    RateLimitedAsyncIterator,
    StreamingInFlightEntry,
    StreamingInFlightTracker,
)

StreamingReservationContext

The StreamingReservationContext tracks the lifecycle of a streaming request. It holds:
  • reservation_id (str): Unique identifier for this reservation
  • bucket_id (str): The rate limit bucket this reservation belongs to
  • request_id (str): The original request ID
  • reserved_tokens (int): Number of tokens reserved at request start
  • backend (BaseBackend): Reference to the rate limit backend for release
  • created_at (float): Timestamp when the reservation was created (default: time.time())
  • final_tokens (Optional[int]): Actual token count extracted from stream completion (runtime tracking)
  • chunk_count (int): Number of chunks received (runtime tracking, default: 0)
  • last_chunk_at (Optional[float]): Timestamp of the last chunk received (runtime tracking)
  • metrics_callback (Optional[MetricsCallback]): Optional callback to record completion metrics
  • error_metrics_callback (Optional[ErrorMetricsCallback]): Optional callback to record error metrics
Error Callback Signature:
ErrorMetricsCallback = Callable[[Optional[str]], None]
# Called when streaming errors occur
# Parameter: bucket_id (Optional[str]) - the rate limit bucket, or None if unknown
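
For example, a callback matching that signature might look like this (the body is illustrative; only the signature is prescribed):

from typing import Optional

def on_stream_error(bucket_id: Optional[str]) -> None:
    # Record or log the failure; fall back to "unknown" when no bucket is reported.
    print(f"streaming error in bucket {bucket_id or 'unknown'}")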

RateLimitedAsyncIterator

The RateLimitedAsyncIterator wraps the async iterator returned by your HTTP client. It intercepts the stream iteration to handle rate limit logic transparently.

StreamingInFlightTracker

The StreamingInFlightTracker runs a background cleanup task that releases capacity for hung or abandoned streams after a configurable timeout (default: 5 minutes via StateConfig.reservation_ttl). This prevents capacity leaks when streams fail unexpectedly.
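
A rough sketch of that cleanup idea (not the library's implementation; the inflight mapping and release callable are assumptions, while created_at and last_chunk_at mirror the context fields above):

import asyncio
import time

async def cleanup_stale_streams(inflight: dict, release, ttl: float = 300.0) -> None:
    # Periodically release reservations for streams with no recent activity.
    while True:
        await asyncio.sleep(60)
        now = time.time()
        for reservation_id, ctx in list(inflight.items()):
            last_activity = ctx.last_chunk_at or ctx.created_at
            if now - last_activity > ttl:
                release(ctx)                  # give the reserved capacity back
                del inflight[reservation_id]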

StreamingResponseProtocol

StreamingResponseProtocol defines the interface for streaming responses. While this protocol is defined in the library, intelligent mode currently uses direct attribute checks (_iterator, __aiter__) for detection rather than protocol matching. The protocol exists for future use and type annotations.
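
In practice that means the protocol is mainly useful as an annotation, for example (assuming it describes an async-iterable response, as the detection checks above suggest):

from adaptive_rate_limiter import StreamingResponseProtocol

async def consume_stream(stream: StreamingResponseProtocol) -> int:
    # The annotation documents intent; runtime detection still relies on the attribute checks above.
    chunks = 0
    async for _ in stream:
        chunks += 1
    return chunks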

Refund Mechanism

The refund logic ensures that conservative estimates don't permanently count more against your rate limit than a request actually used.
Refund = Reserved Tokens - Actual Tokens

Refund Flow Example

  1. Reserve estimated tokens (e.g., 4000) at request start
  2. Stream completes, final chunk contains usage.total_tokens: 42
  3. Calculate refund: 4000 - 42 = 3958 tokens
  4. Return 3958 tokens to the capacity pool

Refund Outcomes

  • Actual < Reserved: The difference is added back to remaining_tokens.
  • Actual >= Reserved: No refund is given (the library handles this gracefully).

Fallback Behavior

If the stream fails or if token usage information cannot be extracted from the response, the library uses a conservative fallback:
  • Actual Tokens is assumed to be equal to Reserved Tokens.
  • Refund is 0.
This prevents accidental over-consumption by assuming the worst-case scenario (that the request used the full reservation).
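
A minimal sketch of the refund calculation, including the fallback (compute_refund is a hypothetical helper, not a library function):

from typing import Optional

def compute_refund(reserved_tokens: int, actual_tokens: Optional[int]) -> int:
    # Unknown usage (failed stream, missing usage field): assume the full reservation was used.
    if actual_tokens is None:
        return 0
    # Otherwise refund the unused portion, never a negative amount.
    return max(0, reserved_tokens - actual_tokens)

compute_refund(4000, 42)    # 3958 tokens returned to the pool
compute_refund(4000, None)  # 0 (worst case assumed)
compute_refund(4000, 5000)  # 0 (actual >= reserved)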

Usage

Streaming works automatically with intelligent mode. Simply use submit_request() with a streaming API call:
from adaptive_rate_limiter import RequestMetadata
from adaptive_rate_limiter.scheduler import Scheduler, RateLimiterConfig, SchedulerMode

# Define your streaming function
async def my_streaming_fn():
    return await your_client.create_stream(...)

# Create a scheduler with intelligent mode
config = RateLimiterConfig(mode=SchedulerMode.INTELLIGENT)

async with Scheduler(client=my_client, config=config) as scheduler:
    # Create request metadata
    metadata = RequestMetadata(
        request_id="stream-1",
        model_id="my-model",
        resource_type="text",
        estimated_tokens=1000  # Initial reservation estimate
    )

    # Submit the request - streaming is detected automatically
    stream = await scheduler.submit_request(
        metadata=metadata,
        request_func=my_streaming_fn
    )

    # Consume the stream normally
    async for chunk in stream:
        print(chunk)

    # BEHIND THE SCENES (automatic):
    # 1. Stream completes
    # 2. Wrapper extracts usage from final chunk
    # 3. Wrapper calls backend.release_streaming_reservation()
    # 4. Unused tokens are returned to the pool

OpenAI Example

For OpenAI-compatible APIs, enable usage reporting in the stream.
The Scheduler requires a client that implements ClientProtocol (providing base_url, timeout, and get_headers()). Since the OpenAI SDK client doesn’t implement this protocol directly, you need a simple wrapper:
from typing import Dict
from adaptive_rate_limiter import ClientProtocol, RequestMetadata
from adaptive_rate_limiter.scheduler import Scheduler, RateLimiterConfig, SchedulerMode
from openai import AsyncOpenAI


class OpenAIClientWrapper:
    """Wrapper to make OpenAI client compatible with ClientProtocol."""

    def __init__(self, client: AsyncOpenAI, timeout: float = 60.0):
        self._client = client
        self._timeout = timeout

    @property
    def base_url(self) -> str:
        return str(self._client.base_url)

    @property
    def timeout(self) -> float:
        return self._timeout

    def get_headers(self) -> Dict[str, str]:
        return {"Authorization": "Bearer ***"}  # Masked for debugging


# Create the OpenAI client and wrap it
openai_client = AsyncOpenAI()
wrapped_client = OpenAIClientWrapper(openai_client)


async def my_streaming_fn():
    return await openai_client.chat.completions.create(
        model="gpt-5",
        messages=[{"role": "user", "content": "Hello!"}],
        stream=True,
        stream_options={"include_usage": True}  # Required for token extraction
    )


# Create scheduler with intelligent mode
config = RateLimiterConfig(mode=SchedulerMode.INTELLIGENT)

async with Scheduler(client=wrapped_client, config=config) as scheduler:
    # Create request metadata
    metadata = RequestMetadata(
        request_id="openai-stream-1",
        model_id="gpt-5",
        resource_type="text",
        estimated_tokens=4000
    )

    # Submit the streaming request
    stream = await scheduler.submit_request(
        metadata=metadata,
        request_func=my_streaming_fn
    )

    async for chunk in stream:
        # The final usage-only chunk has an empty choices list, so guard before indexing
        if chunk.choices and chunk.choices[0].delta.content:
            print(chunk.choices[0].delta.content, end="")

Using the Factory Function

Alternatively, use the create_scheduler() factory function for simpler setup:
from adaptive_rate_limiter import RequestMetadata
from adaptive_rate_limiter.scheduler import create_scheduler

# Create scheduler using factory function
scheduler = create_scheduler(
    client=my_client,
    mode="intelligent"  # Accepts string: "basic", "intelligent", or "account"
)

async with scheduler:
    metadata = RequestMetadata(
        request_id="factory-stream-1",
        model_id="gpt-5",
        resource_type="text",
        estimated_tokens=2000
    )

    stream = await scheduler.submit_request(
        metadata=metadata,
        request_func=my_streaming_fn
    )

    async for chunk in stream:
        process(chunk)

Advanced Types

These types are exported for advanced use cases but most users won’t need them directly.

RateLimitedAsyncIterator

An async iterator wrapper that tracks token usage from streaming responses.
from adaptive_rate_limiter.streaming import RateLimitedAsyncIterator

StreamingInFlightEntry

A dataclass representing an in-flight streaming request entry.
from adaptive_rate_limiter.streaming import StreamingInFlightEntry

StreamingInFlightTracker

Tracks streaming requests that are currently in progress. Used internally by the scheduler.
from adaptive_rate_limiter.streaming import StreamingInFlightTracker
These types are primarily used internally by IntelligentModeStrategy. Most users should use StreamingReservationContext for streaming support.

Best Practices

  1. Set stream_options.include_usage=True for accurate token accounting
  2. Estimate conservatively with estimated_tokens to avoid exceeding rate limits (see the sketch after this list)
  3. Let streams complete naturally to trigger the refund mechanism
  4. Use intelligent mode for automatic streaming support
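
For the second point, a rough prompt-size heuristic can seed estimated_tokens (purely illustrative; roughly four characters per token is a common rule of thumb for English text, so tune the padding for your models):

def rough_token_estimate(prompt: str, max_output_tokens: int) -> int:
    # Over-estimate slightly so the reservation errs on the safe side;
    # unused tokens are refunded when the stream completes.
    prompt_tokens = len(prompt) // 4 + 1
    return int((prompt_tokens + max_output_tokens) * 1.2)

# metadata = RequestMetadata(..., estimated_tokens=rough_token_estimate(prompt, 1024))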

Troubleshooting

No Refund Occurring

If you’re not seeing refunds:
  • Ensure stream_options.include_usage=True is set in your API call
  • Verify the final chunk contains usage.total_tokens
  • Check that the stream completes normally (not cancelled or errored)

Capacity Leaks

If capacity isn’t being released:
  • The StreamingInFlightTracker will clean up stale streams after the configured timeout (default: 5 minutes)
  • Ensure you’re consuming the entire stream (not abandoning it mid-stream)
  • Check logs for cleanup activity