The observability module provides a comprehensive metrics and monitoring framework for the Adaptive Rate Limiter. It supports both in-memory dictionary-based metrics (for JSON export) and optional Prometheus integration for production observability.
Quick Start
from adaptive_rate_limiter.observability import (
get_metrics_collector,
REQUESTS_SCHEDULED_TOTAL,
REQUESTS_COMPLETED_TOTAL,
ACTIVE_REQUESTS,
)
# Get the global metrics collector singleton
collector = get_metrics_collector()
# Increment a counter
collector.inc_counter(REQUESTS_SCHEDULED_TOTAL, labels={"bucket_id": "tier:1", "model_id": "gpt-5"})
# Update a gauge
collector.set_gauge(ACTIVE_REQUESTS, 5, labels={"bucket_id": "tier:1"})
# Get all metrics as a dictionary
metrics = collector.get_metrics()
print(metrics)
Installation
The base observability module works without any additional dependencies. For Prometheus support:
# With prometheus support
pip install adaptive-rate-limiter[metrics]
The library automatically detects if prometheus_client is available:
from adaptive_rate_limiter.observability import PROMETHEUS_AVAILABLE
if PROMETHEUS_AVAILABLE:
    print("Prometheus metrics enabled!")
else:
    print("Using dict-based metrics only")
Architecture Overview
The observability module consists of several components:
observability/
├── __init__.py # Public API exports (44 symbols)
├── collector.py # UnifiedMetricsCollector - core metrics engine
├── constants.py # 30+ metric name constants
├── metrics.py # StreamingMetrics and PrometheusStreamingMetrics
└── protocols.py # MetricsCollectorProtocol, PrometheusExporterProtocol
Key Design Principles:
- Dual-mode operation: All metrics work with or without Prometheus
- Protocol-based: Custom collectors can implement MetricsCollectorProtocol
- Thread-safe: All operations are safe for concurrent access
- Cardinality protection: Automatic limits prevent label explosion
- Prometheus naming conventions: All metric names use the adaptive_rl_ prefix
UnifiedMetricsCollector
The UnifiedMetricsCollector is the central metrics engine that supports both dict-based and Prometheus metrics simultaneously.
Initialization
from adaptive_rate_limiter.observability import (
UnifiedMetricsCollector,
get_metrics_collector,
reset_metrics_collector,
)
# Option 1: Use the global singleton (recommended)
collector = get_metrics_collector(enable_prometheus=True)
# Option 2: Create a standalone instance
collector = UnifiedMetricsCollector(enable_prometheus=True)
# For testing: reset the global singleton
reset_metrics_collector()
Counter Operations
Counters are monotonically increasing values, ideal for tracking totals such as scheduled requests and errors.
from adaptive_rate_limiter.observability import (
get_metrics_collector,
REQUESTS_SCHEDULED_TOTAL,
REQUESTS_COMPLETED_TOTAL,
REQUESTS_FAILED_TOTAL,
)
collector = get_metrics_collector()
# Basic increment (default: 1)
collector.inc_counter(REQUESTS_SCHEDULED_TOTAL)
# Increment by specific value
collector.inc_counter(REQUESTS_SCHEDULED_TOTAL, value=5)
# With labels for dimensional metrics
collector.inc_counter(
REQUESTS_COMPLETED_TOTAL,
labels={"bucket_id": "tier:1", "model_id": "gpt-5"}
)
# Track failures with reason
collector.inc_counter(
REQUESTS_FAILED_TOTAL,
labels={
"bucket_id": "tier:1",
"model_id": "gpt-5",
"reason": "timeout"
}
)
Gauge Operations
Gauges represent values that can go up or down, like queue depth or active connections.
from adaptive_rate_limiter.observability import (
get_metrics_collector,
ACTIVE_REQUESTS,
QUEUE_DEPTH,
RESERVATIONS_ACTIVE,
)
collector = get_metrics_collector()
# Set to a specific value
collector.set_gauge(ACTIVE_REQUESTS, 10, labels={"bucket_id": "tier:1"})
# Increment a gauge
collector.inc_gauge(ACTIVE_REQUESTS, value=1, labels={"bucket_id": "tier:1"})
# Decrement a gauge
collector.dec_gauge(ACTIVE_REQUESTS, value=1, labels={"bucket_id": "tier:1"})
# Queue depth tracking
collector.set_gauge(QUEUE_DEPTH, 25, labels={"bucket_id": "tier:1"})
Histogram Operations
Histograms track the distribution of values, allowing percentile calculations.
from adaptive_rate_limiter.observability import (
get_metrics_collector,
STREAMING_DURATION_SECONDS,
BACKEND_LATENCY_SECONDS,
)
collector = get_metrics_collector()
# Record a latency observation
collector.observe_histogram(
BACKEND_LATENCY_SECONDS,
0.045, # 45ms
labels={"operation": "read"}
)
# Track streaming duration
collector.observe_histogram(
STREAMING_DURATION_SECONDS,
125.5, # 125.5 seconds
labels={"bucket_id": "tier:1"}
)
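For latency histograms it is often convenient to wrap the timing boilerplate in a context manager. The timed helper below is not part of the library; it is a minimal sketch that only uses the collector and observe_histogram calls shown above.
import time
from contextlib import contextmanager
@contextmanager
def timed(metric_name, **labels):
    # Hypothetical helper: record elapsed wall-clock seconds as a histogram observation
    start = time.perf_counter()
    try:
        yield
    finally:
        collector.observe_histogram(metric_name, time.perf_counter() - start, labels=labels or None)
# Example: time a stand-in for a real backend call
with timed(BACKEND_LATENCY_SECONDS, operation="write"):
    time.sleep(0.05)  # placeholder for the actual operation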
Exporting Metrics
collector = get_metrics_collector()
# Structured dict format for JSON APIs
metrics = collector.get_metrics()
# Returns:
# {
# "counters": {
# "adaptive_rl_requests_scheduled_total": {
# "bucket_id=tier:1,model_id=gpt-5": 42,
# "": 100 # Unlabeled
# }
# },
# "gauges": {...},
# "histograms": {
# "adaptive_rl_backend_latency_seconds": {
# "operation=read": {
# "count": 150,
# "sum": 7.5,
# "avg": 0.05,
# "min": 0.01,
# "max": 0.2
# }
# }
# }
# }
# Flat dict format for simpler use cases
flat = collector.get_flat_metrics()
# Returns:
# {
# "adaptive_rl_requests_scheduled_total{bucket_id=tier:1,model_id=gpt-5}": 42,
# "adaptive_rl_active_requests{bucket_id=tier:1}": 10,
# ...
# }
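Since both formats are plain dictionaries of JSON-serializable values, they can be exposed directly from your own service. A minimal sketch using only the standard library (the handler below is illustrative, not part of the package):
import json
from http.server import BaseHTTPRequestHandler, HTTPServer
class MetricsJSONHandler(BaseHTTPRequestHandler):
    def do_GET(self):
        # Serialize the structured metrics dict and return it as JSON
        body = json.dumps(collector.get_metrics()).encode("utf-8")
        self.send_response(200)
        self.send_header("Content-Type", "application/json")
        self.send_header("Content-Length", str(len(body)))
        self.end_headers()
        self.wfile.write(body)
# HTTPServer(("127.0.0.1", 8000), MetricsJSONHandler).serve_forever()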
Prometheus Integration
Enabling Prometheus
When prometheus_client is installed, metrics are automatically registered with Prometheus:
from adaptive_rate_limiter.observability import (
PROMETHEUS_AVAILABLE,
get_metrics_collector,
)
# Check availability
if PROMETHEUS_AVAILABLE:
    # Prometheus metrics are automatically enabled
    collector = get_metrics_collector(enable_prometheus=True)
    # All inc_counter, set_gauge, observe_histogram calls
    # update both dict-based and Prometheus metrics
else:
    # Falls back to dict-based metrics only
    collector = get_metrics_collector(enable_prometheus=False)
HTTP Server for Scraping
Start an HTTP server for Prometheus to scrape:
collector = get_metrics_collector()
# Start the metrics server (default: localhost only for security)
success = collector.start_http_server(host="127.0.0.1", port=9090)
if success:
    print("Prometheus metrics available at http://127.0.0.1:9090/metrics")
# For container environments (Kubernetes, Docker), bind to all interfaces:
collector.start_http_server(host="0.0.0.0", port=9090)
# Check server status
if collector.server_running:
    print("Metrics server is running")
Security Note: By default, the server binds to 127.0.0.1 (localhost only). For container deployments where Prometheus scrapes from outside the container, bind to 0.0.0.0 and make sure network-level security controls are in place.
Using with Grafana
Example Prometheus queries for Grafana dashboards:
# Request rate (requests/second)
rate(adaptive_rl_requests_scheduled_total[5m])
# Success rate
sum(rate(adaptive_rl_requests_completed_total[5m])) /
sum(rate(adaptive_rl_requests_scheduled_total[5m]))
# Current queue depth
adaptive_rl_queue_depth
# P95 backend latency
histogram_quantile(0.95, rate(adaptive_rl_backend_latency_seconds_bucket[5m]))
# Streaming token refund rate
sum(adaptive_rl_streaming_tokens_refunded_total) /
sum(adaptive_rl_streaming_completions_total)
Streaming Metrics
StreamingMetrics Dataclass
The StreamingMetrics dataclass provides detailed tracking for streaming requests:
from adaptive_rate_limiter.observability import StreamingMetrics
metrics = StreamingMetrics()
# Record a successful streaming completion
metrics.record_completion(
reserved=4000, # Tokens reserved at start
actual=500, # Actual tokens consumed
extraction_succeeded=True, # Token extraction worked
bucket_id="tier:1" # Optional: per-bucket tracking
)
# Record errors and timeouts
metrics.record_error()
metrics.record_timeout()
# Record background cleanup of stale streams
metrics.record_stale_cleanup(bucket_id="tier:1")
# Record extraction fallback (used reserved as actual)
metrics.record_fallback()
# Get computed rates
print(f"Extraction success rate: {metrics.get_extraction_success_rate():.1%}")
print(f"Token refund rate: {metrics.get_refund_rate():.1%}")
# Export as JSON-serializable dict
stats = metrics.get_stats()
# {
# "streaming_completions": 1,
# "streaming_errors": 1,
# "streaming_timeouts": 1,
# "streaming_fallbacks": 0,
# "total_reserved_tokens": 4000,
# "total_actual_tokens": 500,
# "total_refunded_tokens": 3500,
# "extraction_success_rate": 1.0,
# "refund_rate": 0.875,
# "extraction_successes": 1,
# "extraction_failures": 0,
# "stale_streaming_cleanups": 1,
# "per_bucket_refunds": {"tier:1": 3500},
# "per_bucket_cleanups": {"tier:1": 1}
# }
# Reset for testing
metrics.reset()
LRU Eviction for Per-Bucket Tracking:
# Limit tracked buckets to prevent memory issues with many buckets
metrics = StreamingMetrics(max_tracked_buckets=100)
# When > 100 buckets, oldest (least recently used) are evicted
PrometheusStreamingMetrics
For Prometheus integration with streaming:
from adaptive_rate_limiter.observability import (
PROMETHEUS_AVAILABLE,
get_prometheus_streaming_metrics,
reset_prometheus_streaming_metrics,
)
if PROMETHEUS_AVAILABLE:
    # Get or create singleton
    prom_metrics = get_prometheus_streaming_metrics()
    if prom_metrics:
        # Record completion with Prometheus metrics
        prom_metrics.observe_completion(
            bucket_id="tier:1",
            reserved=4000,
            actual=500,
            duration_seconds=125.5,
            extraction_succeeded=True
        )
        # Record errors
        prom_metrics.observe_error(
            bucket_id="tier:1",
            error_type="timeout"  # or "exception", "unknown"
        )
        # Record stale cleanup
        prom_metrics.observe_stale_cleanup(bucket_id="tier:1")
# Reset for testing
reset_prometheus_streaming_metrics()
Metric Constants Reference
All metric names use the adaptive_rl_ prefix and follow Prometheus naming conventions.
Scheduling Metrics
| Constant | Metric Name | Type | Description |
|---|---|---|---|
| REQUESTS_SCHEDULED_TOTAL | adaptive_rl_requests_scheduled_total | Counter | Total requests scheduled |
| REQUESTS_COMPLETED_TOTAL | adaptive_rl_requests_completed_total | Counter | Total requests completed successfully |
| REQUESTS_FAILED_TOTAL | adaptive_rl_requests_failed_total | Counter | Total requests failed (timeout, error, rate limit) |
| QUEUE_OVERFLOWS_TOTAL | adaptive_rl_queue_overflows_total | Counter | Queue overflow events (rejected due to full queue) |
| SCHEDULER_LOOPS_TOTAL | adaptive_rl_scheduler_loops_total | Counter | Scheduler main loop iterations |
| CIRCUIT_BREAKER_REJECTIONS_TOTAL | adaptive_rl_circuit_breaker_rejections_total | Counter | Requests rejected by circuit breaker |
| REQUEST_TIMEOUTS_TOTAL | adaptive_rl_request_timeouts_total | Counter | Request timeouts |
Gauge Metrics
| Constant | Metric Name | Type | Description |
|---|---|---|---|
| ACTIVE_REQUESTS | adaptive_rl_active_requests | Gauge | Currently active (in-flight) requests |
| QUEUE_DEPTH | adaptive_rl_queue_depth | Gauge | Current queue depth |
| RESERVATIONS_ACTIVE | adaptive_rl_reservations_active | Gauge | Currently active reservations |
Streaming Metrics
| Constant | Metric Name | Type | Description |
|---|---|---|---|
| STREAMING_COMPLETIONS_TOTAL | adaptive_rl_streaming_completions_total | Counter | Streaming requests completed |
| STREAMING_ERRORS_TOTAL | adaptive_rl_streaming_errors_total | Counter | Streaming requests that errored |
| STREAMING_TIMEOUTS_TOTAL | adaptive_rl_streaming_timeouts_total | Counter | Streaming requests that timed out |
| STREAMING_STALE_CLEANUPS_TOTAL | adaptive_rl_streaming_stale_cleanups_total | Counter | Stale streaming entries cleaned up |
| STREAMING_TOKENS_REFUNDED_TOTAL | adaptive_rl_streaming_tokens_refunded_total | Counter | Tokens refunded (reserved - actual) |
| STREAMING_DURATION_SECONDS | adaptive_rl_streaming_duration_seconds | Histogram | Duration of streaming requests |
Cache Metrics
| Constant | Metric Name | Type | Description |
|---|---|---|---|
| CACHE_HITS_TOTAL | adaptive_rl_cache_hits_total | Counter | Cache hits |
| CACHE_MISSES_TOTAL | adaptive_rl_cache_misses_total | Counter | Cache misses |
| CACHE_EVICTIONS_TOTAL | adaptive_rl_cache_evictions_total | Counter | Cache evictions |
| BACKEND_WRITES_TOTAL | adaptive_rl_backend_writes_total | Counter | Backend writes |
| BACKEND_READS_TOTAL | adaptive_rl_backend_reads_total | Counter | Backend reads |
| VERSION_CONFLICTS_TOTAL | adaptive_rl_version_conflicts_total | Counter | Optimistic locking version conflicts |
Reservation Metrics
| Constant | Metric Name | Type | Description |
|---|---|---|---|
| RESERVATION_STALE_CLEANUPS_TOTAL | adaptive_rl_reservation_stale_cleanups_total | Counter | Stale reservation cleanups |
| RESERVATION_EMERGENCY_CLEANUPS_TOTAL | adaptive_rl_reservation_emergency_cleanups_total | Counter | Emergency cleanups (memory pressure) |
| RESERVATION_BACKPRESSURE_REJECTIONS_TOTAL | adaptive_rl_reservation_backpressure_total | Counter | Backpressure rejections |
Backend Metrics
| Constant | Metric Name | Type | Description |
|---|---|---|---|
| BACKEND_LUA_EXECUTIONS_TOTAL | adaptive_rl_backend_lua_executions_total | Counter | Lua script executions (Redis) |
| BACKEND_CONNECTION_ERRORS_TOTAL | adaptive_rl_backend_connection_errors_total | Counter | Backend connection errors |
| BACKEND_LATENCY_SECONDS | adaptive_rl_backend_latency_seconds | Histogram | Backend operation latency |
Histogram Buckets
from adaptive_rate_limiter.observability import (
LATENCY_BUCKETS,
STREAMING_DURATION_BUCKETS,
TOKEN_BUCKETS,
)
# Latency buckets (seconds): 5ms to 10s
LATENCY_BUCKETS = [0.005, 0.01, 0.025, 0.05, 0.1, 0.25, 0.5, 1.0, 2.5, 5.0, 10.0]
# Streaming duration buckets (seconds): 1s to 20min
STREAMING_DURATION_BUCKETS = [1.0, 5.0, 10.0, 30.0, 60.0, 120.0, 300.0, 600.0, 1200.0]
# Token count buckets: 100 to 128K
TOKEN_BUCKETS = [100, 500, 1000, 2000, 4000, 8000, 16000, 32000, 64000, 128000]
Protocols
The module defines protocols for creating custom metrics backends:
MetricsCollectorProtocol
from adaptive_rate_limiter.observability import MetricsCollectorProtocol
class MyCustomCollector:
    """Custom metrics collector implementing the protocol."""
    def inc_counter(self, name: str, value: int = 1, labels: dict | None = None) -> None:
        # Your implementation (e.g., send to StatsD)
        pass
    def set_gauge(self, name: str, value: float, labels: dict | None = None) -> None:
        pass
    def inc_gauge(self, name: str, value: float = 1.0, labels: dict | None = None) -> None:
        pass
    def dec_gauge(self, name: str, value: float = 1.0, labels: dict | None = None) -> None:
        pass
    def observe_histogram(self, name: str, value: float, labels: dict | None = None) -> None:
        pass
    def get_metrics(self) -> dict:
        return {"counters": {}, "gauges": {}, "histograms": {}}
    def reset(self) -> None:
        pass
# Duck typing works!
assert isinstance(MyCustomCollector(), MetricsCollectorProtocol)
PrometheusExporterProtocol
from adaptive_rate_limiter.observability import PrometheusExporterProtocol
class MyExporter:
    def start_server(self, host: str, port: int) -> None:
        pass
    def stop_server(self) -> None:
        pass
    @property
    def is_running(self) -> bool:
        return False
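A non-stub exporter can be built on prometheus_client directly. The sketch below assumes prometheus_client >= 0.17, where start_http_server returns the server and thread so the exporter can be stopped; treat it as an illustration rather than library code.
from prometheus_client import start_http_server
class PrometheusHTTPExporter:
    """Hypothetical exporter implementing PrometheusExporterProtocol."""
    def __init__(self) -> None:
        self._server = None
        self._thread = None
    def start_server(self, host: str, port: int) -> None:
        # Serves the default registry at /metrics on a daemon thread
        self._server, self._thread = start_http_server(port, addr=host)
    def stop_server(self) -> None:
        if self._server is not None:
            self._server.shutdown()
            self._server = None
            self._thread = None
    @property
    def is_running(self) -> bool:
        return self._server is not None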
Label Best Practices
DO use categorical labels:
# Good - finite set of values
labels = {
"bucket_id": "tier:1", # Categorical: tier:1, tier:2, tier:3
"model_id": "gpt-5", # Categorical: gpt-5, claude-haiku-4.5, venice-uncensored
"reason": "timeout", # Enum: timeout, rate_limit, error
"extraction_succeeded": "true", # Boolean: true, false
}
DON'T use unbounded labels:
# BAD - causes label cardinality explosion!
labels = {
"request_id": "abc-123-xyz", # Unique per request - unbounded!
"user_id": "user_42", # Unique per user - unbounded!
"timestamp": "1234567890", # Unique per second - unbounded!
}
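A common fix is to collapse unbounded values into a small, fixed vocabulary before recording. The normalize_reason helper below is hypothetical and only illustrates the pattern with the documented inc_counter API.
from adaptive_rate_limiter.observability import REQUESTS_FAILED_TOTAL, get_metrics_collector
collector = get_metrics_collector()
def normalize_reason(exc: Exception) -> str:
    # Map arbitrary exceptions onto a bounded set of reason labels
    if isinstance(exc, TimeoutError):
        return "timeout"
    if "rate limit" in str(exc).lower():
        return "rate_limit"
    return "error"
collector.inc_counter(
    REQUESTS_FAILED_TOTAL,
    labels={"bucket_id": "tier:1", "model_id": "gpt-5", "reason": normalize_reason(TimeoutError("read timed out"))},
)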
Cardinality Protection:
The collector automatically limits label combinations to prevent memory issues:
# Default limit: 1000 unique combinations per metric
UnifiedMetricsCollector.MAX_LABEL_COMBINATIONS = 1000
# When exceeded, new combinations are dropped with a warning
# Existing combinations continue to work
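To see the protection in action, the sketch below (assuming the default limit above and dict-only mode) records far more unique label combinations than allowed and checks that the exported series stay bounded:
from adaptive_rate_limiter.observability import REQUESTS_FAILED_TOTAL, UnifiedMetricsCollector
collector = UnifiedMetricsCollector(enable_prometheus=False)
for i in range(5000):
    # Deliberately unbounded label value - combinations past the limit are dropped
    collector.inc_counter(REQUESTS_FAILED_TOTAL, labels={"request_id": str(i)})
series = collector.get_metrics()["counters"][REQUESTS_FAILED_TOTAL]
assert len(series) <= UnifiedMetricsCollector.MAX_LABEL_COMBINATIONS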
Thread Safety
All components are thread-safe:
import threading
from adaptive_rate_limiter.observability import get_metrics_collector
collector = get_metrics_collector()
def worker():
    for _ in range(1000):
        collector.inc_counter("my_counter")
# Safe for concurrent access
threads = [threading.Thread(target=worker) for _ in range(10)]
for t in threads:
    t.start()
for t in threads:
    t.join()
# Counter will be exactly 10000
assert collector.get_metrics()["counters"]["my_counter"][""] == 10000
Advanced Usage
Custom Registry for Testing
from prometheus_client import CollectorRegistry
from adaptive_rate_limiter.observability import UnifiedMetricsCollector
# Create isolated registry for tests
test_registry = CollectorRegistry()
collector = UnifiedMetricsCollector(enable_prometheus=True, registry=test_registry)
# Use collector in tests without polluting global Prometheus registry
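In a test you can then assert against the isolated registry's exposition output, e.g. with prometheus_client.generate_latest. A minimal sketch, assuming the metrics extra is installed:
from prometheus_client import generate_latest
from adaptive_rate_limiter.observability import REQUESTS_SCHEDULED_TOTAL
collector.inc_counter(REQUESTS_SCHEDULED_TOTAL, labels={"bucket_id": "tier:1", "model_id": "gpt-5"})
exposition = generate_latest(test_registry).decode("utf-8")
assert "adaptive_rl_requests_scheduled_total" in exposition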
Legacy Metric Name Mapping
For backward compatibility:
from adaptive_rate_limiter.observability import (
LEGACY_METRIC_MAPPING,
legacy_to_prometheus_name,
)
# Convert old names to new Prometheus-style names
old_name = "requests_scheduled"
new_name = legacy_to_prometheus_name(old_name)
# Returns: "adaptive_rl_requests_scheduled_total"
# Full mapping dictionary
print(LEGACY_METRIC_MAPPING)
# {
# "requests_scheduled": "adaptive_rl_requests_scheduled_total",
# "requests_completed": "adaptive_rl_requests_completed_total",
# ...
# }
Metric Definitions
Access the pre-defined metric schemas:
from adaptive_rate_limiter.observability import MetricDefinition, METRIC_DEFINITIONS
# Get definition for a metric
defn = METRIC_DEFINITIONS["adaptive_rl_requests_scheduled_total"]
print(f"Name: {defn.name}")
print(f"Type: {defn.metric_type}") # counter, gauge, histogram
print(f"Description: {defn.description}")
print(f"Labels: {defn.label_names}") # ('bucket_id', 'model_id')
print(f"Buckets: {defn.buckets}") # For histograms only
Reset for Testing
from adaptive_rate_limiter.observability import (
reset_metrics_collector,
reset_prometheus_streaming_metrics,
)
def teardown():
# Reset singletons between tests
reset_metrics_collector()
reset_prometheus_streaming_metrics()
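With pytest, the same resets are commonly wired into an autouse fixture; a minimal sketch (the fixture name is illustrative):
import pytest
@pytest.fixture(autouse=True)
def _clean_metrics():
    yield
    # Reset singletons after each test so metric state never leaks between tests
    reset_metrics_collector()
    reset_prometheus_streaming_metrics()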