# Automatic Streaming Detection

When using `submit_request()` with intelligent mode, streaming responses are automatically detected and wrapped. You don't need to set any special flags; the scheduler handles everything transparently.
## Detection Mechanism
The scheduler detects streaming responses by checking if the response:

- Is an async generator
- Has a `stream=True` attribute
- Has an `__aiter__` method
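For illustration, the check might look roughly like this (a sketch; the function name `is_streaming_response` is an assumption, not the library's API):

```python
import inspect
from typing import Any

def is_streaming_response(response: Any) -> bool:
    """Sketch of the detection order described above."""
    if inspect.isasyncgen(response):        # async generator object
        return True
    if getattr(response, "stream", False):  # stream=True attribute
        return True
    return hasattr(response, "__aiter__")   # any async-iterable
```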
## How It Works
The library uses a Reservation and Refund model for streaming requests:

- Detection: Response is checked for async generator, `stream=True`, or `__aiter__`
- Wrapping: Iterator is wrapped with `RateLimitedAsyncIterator`
- Token Extraction: Extracts `usage.total_tokens` from the final chunk
- Capacity Release: Releases capacity with refund (reserved - actual tokens)
- Cleanup: Background task cleans stale streams after 5 minutes of inactivity
The 5-minute cleanup interval is configurable via `reservation_ttl` in `StateConfig` (default: 300 seconds). See Configuration for details.

Token extraction requires `stream_options.include_usage=True` in your API call (for OpenAI-compatible APIs).

## Components
### Imports
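The streaming components referenced on this page can be imported along these lines (a sketch; the module path `scheduler.streaming` is an assumption, so substitute your package's actual layout):

```python
# Hypothetical import path; adjust to your installation.
from scheduler.streaming import (
    RateLimitedAsyncIterator,
    StreamingInFlightTracker,
    StreamingReservationContext,
    StreamingResponseProtocol,
)
```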
### StreamingReservationContext

The `StreamingReservationContext` tracks the lifecycle of a streaming request. It holds:
| Field | Type | Description |
|---|---|---|
| `reservation_id` | `str` | Unique identifier for this reservation |
| `bucket_id` | `str` | The rate limit bucket this reservation belongs to |
| `request_id` | `str` | The original request ID |
| `reserved_tokens` | `int` | Number of tokens reserved at request start |
| `backend` | `BaseBackend` | Reference to the rate limit backend for release |
| `created_at` | `float` | Timestamp when the reservation was created (default: `time.time()`) |
| `final_tokens` | `Optional[int]` | Actual token count extracted from stream completion (runtime tracking) |
| `chunk_count` | `int` | Number of chunks received (runtime tracking, default: 0) |
| `last_chunk_at` | `Optional[float]` | Timestamp of the last chunk received (runtime tracking) |
| `metrics_callback` | `Optional[MetricsCallback]` | Optional callback to record completion metrics |
| `error_metrics_callback` | `Optional[ErrorMetricsCallback]` | Optional callback to record error metrics |
Methods and properties:

- `record_chunk()` - Record chunk receipt for activity tracking
- `set_final_tokens(tokens)` - Set final token count from usage field
- `actual_tokens_for_release` (property) - Tokens to use for release: actual if known, else reserved
- `duration_seconds` (property) - Stream duration from creation to now
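Put together, the fields and methods above correspond roughly to the following dataclass (a reconstruction for illustration, not the library's source; the backend and callback types are referenced by name only):

```python
import time
from dataclasses import dataclass, field
from typing import Optional

@dataclass
class StreamingReservationContext:
    """Sketch assembled from the documented fields and methods."""
    reservation_id: str
    bucket_id: str
    request_id: str
    reserved_tokens: int
    backend: "BaseBackend"
    created_at: float = field(default_factory=time.time)
    final_tokens: Optional[int] = None
    chunk_count: int = 0
    last_chunk_at: Optional[float] = None
    metrics_callback: Optional["MetricsCallback"] = None
    error_metrics_callback: Optional["ErrorMetricsCallback"] = None

    def record_chunk(self) -> None:
        # Track activity so stale-stream cleanup can spot hung streams.
        self.chunk_count += 1
        self.last_chunk_at = time.time()

    def set_final_tokens(self, tokens: int) -> None:
        # Called when usage.total_tokens is found in the final chunk.
        self.final_tokens = tokens

    @property
    def actual_tokens_for_release(self) -> int:
        # Actual tokens if known, else the conservative reserved estimate.
        return self.final_tokens if self.final_tokens is not None else self.reserved_tokens

    @property
    def duration_seconds(self) -> float:
        return time.time() - self.created_at
```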
### RateLimitedAsyncIterator

The `RateLimitedAsyncIterator` wraps the async iterator returned by your HTTP client. It intercepts the stream iteration to handle rate limit logic transparently.
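Conceptually, the wrapper records each chunk, captures `usage.total_tokens` when it appears, and releases capacity when the stream ends. The sketch below illustrates that shape, reusing the `StreamingReservationContext` sketch above; the internals and the `release_capacity` helper are assumptions:

```python
from typing import Any, AsyncIterator

async def release_capacity(ctx: "StreamingReservationContext") -> None:
    # Hypothetical stand-in for the backend release-with-refund call.
    print(f"releasing {ctx.actual_tokens_for_release} tokens for {ctx.reservation_id}")

class RateLimitedAsyncIteratorSketch:
    """Illustrative shape of RateLimitedAsyncIterator; internals may differ."""

    def __init__(self, inner: AsyncIterator[Any], ctx: "StreamingReservationContext"):
        self._inner = inner
        self._ctx = ctx

    def __aiter__(self) -> "RateLimitedAsyncIteratorSketch":
        return self

    async def __anext__(self) -> Any:
        try:
            chunk = await self._inner.__anext__()
        except StopAsyncIteration:
            # End of stream: release capacity, refunding reserved - actual.
            await release_capacity(self._ctx)
            raise
        self._ctx.record_chunk()
        # With stream_options.include_usage=True, OpenAI-compatible APIs
        # attach usage to the final chunk.
        usage = getattr(chunk, "usage", None)
        if usage is not None and getattr(usage, "total_tokens", None) is not None:
            self._ctx.set_final_tokens(usage.total_tokens)
        return chunk
```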
### StreamingInFlightTracker

The `StreamingInFlightTracker` runs a background cleanup task that releases capacity for hung or abandoned streams after a configurable timeout (default: 5 minutes via `StateConfig.reservation_ttl`). This prevents capacity leaks when streams fail unexpectedly.
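The cleanup pass amounts to periodically sweeping reservations with no recent activity. A sketch (the poll interval and the `release_capacity` helper from the previous sketch are assumptions):

```python
import asyncio
import time
from typing import Dict

async def cleanup_stale_streams(
    active: Dict[str, "StreamingReservationContext"],
    ttl_seconds: float = 300.0,   # StateConfig.reservation_ttl default
    poll_interval: float = 30.0,  # assumed; the real cadence may differ
) -> None:
    """Sketch of the background sweep StreamingInFlightTracker performs."""
    while True:
        now = time.time()
        for reservation_id, ctx in list(active.items()):
            last_activity = ctx.last_chunk_at or ctx.created_at
            if now - last_activity > ttl_seconds:
                # Hung or abandoned stream: release its reserved capacity.
                await release_capacity(ctx)
                del active[reservation_id]
        await asyncio.sleep(poll_interval)
```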
### StreamingResponseProtocol

`StreamingResponseProtocol` defines the interface for streaming responses. While this protocol is defined in the library, intelligent mode currently uses direct attribute checks (`_iterator`, `__aiter__`) for detection rather than protocol matching. The protocol exists for future use and type annotations.
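Its approximate shape, useful for type annotations, might look like this (a sketch; the exact members are assumptions):

```python
from typing import Any, AsyncIterator, Protocol, runtime_checkable

@runtime_checkable
class StreamingResponseSketch(Protocol):
    """Approximate shape of StreamingResponseProtocol."""

    def __aiter__(self) -> AsyncIterator[Any]: ...
```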
## Refund Mechanism

The refund logic ensures that you don't over-consume your rate limits due to conservative estimates.

### Refund Flow Example
- Reserve estimated tokens (e.g., 4000) at request start
- Stream completes; final chunk contains `usage.total_tokens: 42`
- Calculate refund: 4000 - 42 = 3958 tokens
- Return 3958 tokens to the capacity pool
### Refund Outcomes

- Actual < Reserved: The difference is added back to `remaining_tokens`.
- Actual >= Reserved: No refund is given (the library handles this gracefully).
### Fallback Behavior

If the stream fails or token usage information cannot be extracted from the response, the library uses a conservative fallback:

- Actual tokens are assumed to equal reserved tokens.
- Refund is 0.
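The flow, outcomes, and fallback reduce to a few lines of arithmetic. An illustrative helper (the function name is an assumption):

```python
from typing import Optional, Tuple

def tokens_and_refund(reserved: int, actual: Optional[int]) -> Tuple[int, int]:
    """Illustrative refund math, including the conservative fallback."""
    if actual is None:
        actual = reserved               # fallback: assume actual == reserved
    refund = max(reserved - actual, 0)  # no refund when actual >= reserved
    return actual, refund

assert tokens_and_refund(4000, 42) == (42, 3958)    # under-use: tokens refunded
assert tokens_and_refund(4000, 5000) == (5000, 0)   # over-use: no refund
assert tokens_and_refund(4000, None) == (4000, 0)   # extraction failed: refund 0
```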
## Usage

Streaming works automatically with intelligent mode. Simply use `submit_request()` with a streaming API call:
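For example (a sketch; `submit_request()`'s exact signature is not shown on this page, so the keyword arguments below are assumptions, and `scheduler` comes from your setup):

```python
import asyncio

async def main() -> None:
    # Assumed call shape; adjust to submit_request()'s real signature.
    stream = await scheduler.submit_request(
        model="gpt-4o-mini",
        messages=[{"role": "user", "content": "Write a haiku about rate limits."}],
        stream=True,
        stream_options={"include_usage": True},  # required for token extraction
    )
    # The scheduler detects the streaming response and wraps it automatically;
    # just iterate as usual.
    async for chunk in stream:
        print(chunk)

asyncio.run(main())
```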
### OpenAI Example
For OpenAI-compatible APIs, enable usage reporting in the stream.

The Scheduler requires a client that implements `ClientProtocol` (providing `base_url`, `timeout`, and `get_headers()`). Since the OpenAI SDK client doesn't implement this protocol directly, you need a simple wrapper:
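A minimal wrapper could look like this (a sketch; `base_url`, `timeout`, and `api_key` are real `AsyncOpenAI` attributes, but the header shape and `ClientProtocol`'s exact expectations are assumptions):

```python
from openai import AsyncOpenAI

class OpenAIClientWrapper:
    """Adapts an AsyncOpenAI client to the scheduler's ClientProtocol."""

    def __init__(self, client: AsyncOpenAI):
        self._client = client
        self.base_url = str(client.base_url)  # ClientProtocol: base_url
        self.timeout = client.timeout         # ClientProtocol: timeout

    def get_headers(self) -> dict:
        # ClientProtocol: get_headers(); header shape is an assumption.
        return {"Authorization": f"Bearer {self._client.api_key}"}
```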
### Using the Factory Function

Alternatively, use the `create_scheduler()` factory function for simpler setup:
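For instance (a sketch reusing the wrapper above; `create_scheduler()`'s parameters are assumptions):

```python
# Assumed parameter name; consult create_scheduler()'s actual signature.
scheduler = create_scheduler(client=OpenAIClientWrapper(AsyncOpenAI()))
```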
## Advanced Types

These types are exported for advanced use cases, but most users won't need them directly.

### RateLimitedAsyncIterator

An async iterator wrapper that tracks token usage from streaming responses.

### StreamingInFlightEntry

A dataclass representing an in-flight streaming request entry.

### StreamingInFlightTracker

Tracks streaming requests that are currently in progress. Used internally by the scheduler.

These types are primarily used internally by `IntelligentModeStrategy`. Most users should use `StreamingReservationContext` for streaming support.

## Best Practices
## Troubleshooting
### No Refund Occurring

If you're not seeing refunds:

- Ensure `stream_options.include_usage=True` is set in your API call
- Verify the final chunk contains `usage.total_tokens`
- Check that the stream completes normally (not cancelled or errored)
### Capacity Leaks

If capacity isn't being released:

- The `StreamingInFlightTracker` will clean up stale streams after the configured timeout (default: 5 minutes)
- Ensure you're consuming the entire stream (not abandoning it mid-stream)
- Check logs for cleanup activity