The observability module provides a comprehensive metrics and monitoring framework for the Adaptive Rate Limiter. It supports both in-memory dictionary-based metrics (for JSON export) and optional Prometheus integration for production observability.

Quick Start

from adaptive_rate_limiter.observability import (
    get_metrics_collector,
    REQUESTS_SCHEDULED_TOTAL,
    REQUESTS_COMPLETED_TOTAL,
    ACTIVE_REQUESTS,
)

# Get the global metrics collector singleton
collector = get_metrics_collector()

# Increment a counter
collector.inc_counter(REQUESTS_SCHEDULED_TOTAL, labels={"bucket_id": "tier:1", "model_id": "gpt-5"})

# Update a gauge
collector.set_gauge(ACTIVE_REQUESTS, 5, labels={"bucket_id": "tier:1"})

# Get all metrics as a dictionary
metrics = collector.get_metrics()
print(metrics)

Installation

The base observability module works without any additional dependencies. For Prometheus support:
# With prometheus support
pip install adaptive-rate-limiter[metrics]

The library automatically detects if prometheus_client is available:
from adaptive_rate_limiter.observability import PROMETHEUS_AVAILABLE

if PROMETHEUS_AVAILABLE:
    print("Prometheus metrics enabled!")
else:
    print("Using dict-based metrics only")

Architecture Overview

The observability module consists of several components:
observability/
├── __init__.py      # Public API exports (44 symbols)
├── collector.py     # UnifiedMetricsCollector - core metrics engine
├── constants.py     # 30+ metric name constants
├── metrics.py       # StreamingMetrics and PrometheusStreamingMetrics
└── protocols.py     # MetricsCollectorProtocol, PrometheusExporterProtocol
Key Design Principles:
  1. Dual-mode operation: All metrics work with or without Prometheus
  2. Protocol-based: Custom collectors can implement MetricsCollectorProtocol
  3. Thread-safe: All operations are safe for concurrent access
  4. Cardinality protection: Automatic limits prevent label explosion
  5. Prometheus naming conventions: all metric names use the adaptive_rl_ prefix (see the example after this list)
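
As a quick illustration of the naming convention, the exported constants map to the prefixed names listed in the Metric Constants Reference below:
from adaptive_rate_limiter.observability import ACTIVE_REQUESTS, REQUESTS_SCHEDULED_TOTAL

# The constants resolve to the prefixed Prometheus metric names
print(REQUESTS_SCHEDULED_TOTAL)  # "adaptive_rl_requests_scheduled_total"
print(ACTIVE_REQUESTS)           # "adaptive_rl_active_requests"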

UnifiedMetricsCollector

The UnifiedMetricsCollector is the central metrics engine that supports both dict-based and Prometheus metrics simultaneously.

Initialization

from adaptive_rate_limiter.observability import (
    UnifiedMetricsCollector,
    get_metrics_collector,
    reset_metrics_collector,
)

# Option 1: Use the global singleton (recommended)
collector = get_metrics_collector(enable_prometheus=True)

# Option 2: Create a standalone instance
collector = UnifiedMetricsCollector(enable_prometheus=True)

# For testing: reset the global singleton
reset_metrics_collector()

Counter Operations

Counters are monotonically increasing values, ideal for tracking totals like requests, errors, etc.
from adaptive_rate_limiter.observability import (
    get_metrics_collector,
    REQUESTS_SCHEDULED_TOTAL,
    REQUESTS_COMPLETED_TOTAL,
    REQUESTS_FAILED_TOTAL,
)

collector = get_metrics_collector()

# Basic increment (default: 1)
collector.inc_counter(REQUESTS_SCHEDULED_TOTAL)

# Increment by specific value
collector.inc_counter(REQUESTS_SCHEDULED_TOTAL, value=5)

# With labels for dimensional metrics
collector.inc_counter(
    REQUESTS_COMPLETED_TOTAL,
    labels={"bucket_id": "tier:1", "model_id": "gpt-5"}
)

# Track failures with reason
collector.inc_counter(
    REQUESTS_FAILED_TOTAL,
    labels={
        "bucket_id": "tier:1",
        "model_id": "gpt-5",
        "reason": "timeout"
    }
)

Gauge Operations

Gauges represent values that can go up or down, like queue depth or active connections.
from adaptive_rate_limiter.observability import (
    get_metrics_collector,
    ACTIVE_REQUESTS,
    QUEUE_DEPTH,
    RESERVATIONS_ACTIVE,
)

collector = get_metrics_collector()

# Set to a specific value
collector.set_gauge(ACTIVE_REQUESTS, 10, labels={"bucket_id": "tier:1"})

# Increment a gauge
collector.inc_gauge(ACTIVE_REQUESTS, value=1, labels={"bucket_id": "tier:1"})

# Decrement a gauge
collector.dec_gauge(ACTIVE_REQUESTS, value=1, labels={"bucket_id": "tier:1"})

# Queue depth tracking
collector.set_gauge(QUEUE_DEPTH, 25, labels={"bucket_id": "tier:1"})
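
Continuing from the example above, a common pattern is to pair inc_gauge and dec_gauge around the lifetime of a request so the gauge stays accurate even when the work raises (handle_request() below is a hypothetical placeholder):
labels = {"bucket_id": "tier:1"}

collector.inc_gauge(ACTIVE_REQUESTS, labels=labels)
try:
    handle_request()  # hypothetical in-flight work being tracked
finally:
    # Always decrement, even if the request failed
    collector.dec_gauge(ACTIVE_REQUESTS, labels=labels)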

Histogram Operations

Histograms track the distribution of values, allowing percentile calculations.
from adaptive_rate_limiter.observability import (
    get_metrics_collector,
    STREAMING_DURATION_SECONDS,
    BACKEND_LATENCY_SECONDS,
)

collector = get_metrics_collector()

# Record a latency observation
collector.observe_histogram(
    BACKEND_LATENCY_SECONDS,
    0.045,  # 45ms
    labels={"operation": "read"}
)

# Track streaming duration
collector.observe_histogram(
    STREAMING_DURATION_SECONDS,
    125.5,  # 125.5 seconds
    labels={"bucket_id": "tier:1"}
)
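
Latency observations are typically taken with a monotonic clock around the operation being measured; continuing from the example above (read_from_backend() is a hypothetical call):
import time

start = time.monotonic()
read_from_backend()  # hypothetical backend operation being timed
collector.observe_histogram(
    BACKEND_LATENCY_SECONDS,
    time.monotonic() - start,
    labels={"operation": "read"},
)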

Exporting Metrics

collector = get_metrics_collector()

# Structured dict format for JSON APIs
metrics = collector.get_metrics()
# Returns:
# {
#     "counters": {
#         "adaptive_rl_requests_scheduled_total": {
#             "bucket_id=tier:1,model_id=gpt-5": 42,
#             "": 100  # Unlabeled
#         }
#     },
#     "gauges": {...},
#     "histograms": {
#         "adaptive_rl_backend_latency_seconds": {
#             "operation=read": {
#                 "count": 150,
#                 "sum": 7.5,
#                 "avg": 0.05,
#                 "min": 0.01,
#                 "max": 0.2
#             }
#         }
#     }
# }

# Flat dict format for simpler use cases
flat = collector.get_flat_metrics()
# Returns:
# {
#     "adaptive_rl_requests_scheduled_total{bucket_id=tier:1,model_id=gpt-5}": 42,
#     "adaptive_rl_active_requests{bucket_id=tier:1}": 10,
#     ...
# }
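
Both formats contain only strings and numbers, so they serialize directly to JSON, for example when dumping metrics to a log or a debug endpoint:
import json

# The structured dict from get_metrics() is JSON-serializable as-is
print(json.dumps(collector.get_metrics(), indent=2))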

Prometheus Integration

Enabling Prometheus

When prometheus_client is installed, metrics are automatically registered with Prometheus:
from adaptive_rate_limiter.observability import (
    PROMETHEUS_AVAILABLE,
    get_metrics_collector,
)

# Check availability
if PROMETHEUS_AVAILABLE:
    # Prometheus metrics are automatically enabled
    collector = get_metrics_collector(enable_prometheus=True)

    # All inc_counter, set_gauge, observe_histogram calls
    # update both dict-based and Prometheus metrics
else:
    # Falls back to dict-based metrics only
    collector = get_metrics_collector(enable_prometheus=False)

HTTP Server for Scraping

Start an HTTP server for Prometheus to scrape:
collector = get_metrics_collector()

# Start the metrics server (default: localhost only for security)
success = collector.start_http_server(host="127.0.0.1", port=9090)

if success:
    print("Prometheus metrics available at http://127.0.0.1:9090/metrics")

# For container environments (Kubernetes, Docker), bind to all interfaces:
collector.start_http_server(host="0.0.0.0", port=9090)

# Check server status
if collector.server_running:
    print("Metrics server is running")
Security Note: By default, the server binds to 127.0.0.1 (localhost only). For container deployments where Prometheus scrapes from outside the host, bind to 0.0.0.0 and make sure network-level security controls are in place.

Using with Grafana

Example Prometheus queries for Grafana dashboards:
# Request rate (requests/second)
rate(adaptive_rl_requests_scheduled_total[5m])

# Success rate
sum(rate(adaptive_rl_requests_completed_total[5m])) /
sum(rate(adaptive_rl_requests_scheduled_total[5m]))

# Current queue depth
adaptive_rl_queue_depth

# P95 backend latency
histogram_quantile(0.95, rate(adaptive_rl_backend_latency_seconds_bucket[5m]))

# Streaming token refund rate
sum(adaptive_rl_streaming_tokens_refunded_total) /
sum(adaptive_rl_streaming_completions_total)

Streaming Metrics

StreamingMetrics Dataclass

The StreamingMetrics dataclass provides detailed tracking for streaming requests:
from adaptive_rate_limiter.observability import StreamingMetrics

metrics = StreamingMetrics()

# Record a successful streaming completion
metrics.record_completion(
    reserved=4000,       # Tokens reserved at start
    actual=500,          # Actual tokens consumed
    extraction_succeeded=True,  # Token extraction worked
    bucket_id="tier:1"   # Optional: per-bucket tracking
)

# Record errors and timeouts
metrics.record_error()
metrics.record_timeout()

# Record background cleanup of stale streams
metrics.record_stale_cleanup(bucket_id="tier:1")

# Record extraction fallback (used reserved as actual)
metrics.record_fallback()

# Get computed rates
print(f"Extraction success rate: {metrics.get_extraction_success_rate():.1%}")
print(f"Token refund rate: {metrics.get_refund_rate():.1%}")

# Export as JSON-serializable dict
stats = metrics.get_stats()
# {
#     "streaming_completions": 1,
#     "streaming_errors": 1,
#     "streaming_timeouts": 1,
#     "streaming_fallbacks": 0,
#     "total_reserved_tokens": 4000,
#     "total_actual_tokens": 500,
#     "total_refunded_tokens": 3500,
#     "extraction_success_rate": 1.0,
#     "refund_rate": 0.875,
#     "extraction_successes": 1,
#     "extraction_failures": 0,
#     "stale_streaming_cleanups": 1,
#     "per_bucket_refunds": {"tier:1": 3500},
#     "per_bucket_cleanups": {"tier:1": 1}
# }

# Reset for testing
metrics.reset()
LRU Eviction for Per-Bucket Tracking:
# Limit tracked buckets to prevent memory issues with many buckets
metrics = StreamingMetrics(max_tracked_buckets=100)

# When > 100 buckets, oldest (least recently used) are evicted

PrometheusStreamingMetrics

For Prometheus integration with streaming:
from adaptive_rate_limiter.observability import (
    PROMETHEUS_AVAILABLE,
    get_prometheus_streaming_metrics,
    reset_prometheus_streaming_metrics,
)

if PROMETHEUS_AVAILABLE:
    # Get or create singleton
    prom_metrics = get_prometheus_streaming_metrics()

    if prom_metrics:
        # Record completion with Prometheus metrics
        prom_metrics.observe_completion(
            bucket_id="tier:1",
            reserved=4000,
            actual=500,
            duration_seconds=125.5,
            extraction_succeeded=True
        )

        # Record errors
        prom_metrics.observe_error(
            bucket_id="tier:1",
            error_type="timeout"  # or "exception", "unknown"
        )

        # Record stale cleanup
        prom_metrics.observe_stale_cleanup(bucket_id="tier:1")

# Reset for testing
reset_prometheus_streaming_metrics()

Metric Constants Reference

All metric names use the adaptive_rl_ prefix and follow Prometheus naming conventions.

Scheduling Metrics

| Constant | Metric Name | Type | Description |
|---|---|---|---|
| REQUESTS_SCHEDULED_TOTAL | adaptive_rl_requests_scheduled_total | Counter | Total requests scheduled |
| REQUESTS_COMPLETED_TOTAL | adaptive_rl_requests_completed_total | Counter | Total requests completed successfully |
| REQUESTS_FAILED_TOTAL | adaptive_rl_requests_failed_total | Counter | Total requests failed (timeout, error, rate limit) |
| QUEUE_OVERFLOWS_TOTAL | adaptive_rl_queue_overflows_total | Counter | Queue overflow events (rejected due to full queue) |
| SCHEDULER_LOOPS_TOTAL | adaptive_rl_scheduler_loops_total | Counter | Scheduler main loop iterations |
| CIRCUIT_BREAKER_REJECTIONS_TOTAL | adaptive_rl_circuit_breaker_rejections_total | Counter | Requests rejected by circuit breaker |
| REQUEST_TIMEOUTS_TOTAL | adaptive_rl_request_timeouts_total | Counter | Request timeouts |

Gauge Metrics

| Constant | Metric Name | Type | Description |
|---|---|---|---|
| ACTIVE_REQUESTS | adaptive_rl_active_requests | Gauge | Currently active (in-flight) requests |
| QUEUE_DEPTH | adaptive_rl_queue_depth | Gauge | Current queue depth |
| RESERVATIONS_ACTIVE | adaptive_rl_reservations_active | Gauge | Currently active reservations |

Streaming Metrics

| Constant | Metric Name | Type | Description |
|---|---|---|---|
| STREAMING_COMPLETIONS_TOTAL | adaptive_rl_streaming_completions_total | Counter | Streaming requests completed |
| STREAMING_ERRORS_TOTAL | adaptive_rl_streaming_errors_total | Counter | Streaming requests that errored |
| STREAMING_TIMEOUTS_TOTAL | adaptive_rl_streaming_timeouts_total | Counter | Streaming requests that timed out |
| STREAMING_STALE_CLEANUPS_TOTAL | adaptive_rl_streaming_stale_cleanups_total | Counter | Stale streaming entries cleaned up |
| STREAMING_TOKENS_REFUNDED_TOTAL | adaptive_rl_streaming_tokens_refunded_total | Counter | Tokens refunded (reserved - actual) |
| STREAMING_DURATION_SECONDS | adaptive_rl_streaming_duration_seconds | Histogram | Duration of streaming requests |

Cache Metrics

| Constant | Metric Name | Type | Description |
|---|---|---|---|
| CACHE_HITS_TOTAL | adaptive_rl_cache_hits_total | Counter | Cache hits |
| CACHE_MISSES_TOTAL | adaptive_rl_cache_misses_total | Counter | Cache misses |
| CACHE_EVICTIONS_TOTAL | adaptive_rl_cache_evictions_total | Counter | Cache evictions |
| BACKEND_WRITES_TOTAL | adaptive_rl_backend_writes_total | Counter | Backend writes |
| BACKEND_READS_TOTAL | adaptive_rl_backend_reads_total | Counter | Backend reads |
| VERSION_CONFLICTS_TOTAL | adaptive_rl_version_conflicts_total | Counter | Optimistic locking version conflicts |

Reservation Metrics

| Constant | Metric Name | Type | Description |
|---|---|---|---|
| RESERVATION_STALE_CLEANUPS_TOTAL | adaptive_rl_reservation_stale_cleanups_total | Counter | Stale reservation cleanups |
| RESERVATION_EMERGENCY_CLEANUPS_TOTAL | adaptive_rl_reservation_emergency_cleanups_total | Counter | Emergency cleanups (memory pressure) |
| RESERVATION_BACKPRESSURE_REJECTIONS_TOTAL | adaptive_rl_reservation_backpressure_total | Counter | Backpressure rejections |

Backend Metrics

| Constant | Metric Name | Type | Description |
|---|---|---|---|
| BACKEND_LUA_EXECUTIONS_TOTAL | adaptive_rl_backend_lua_executions_total | Counter | Lua script executions (Redis) |
| BACKEND_CONNECTION_ERRORS_TOTAL | adaptive_rl_backend_connection_errors_total | Counter | Backend connection errors |
| BACKEND_LATENCY_SECONDS | adaptive_rl_backend_latency_seconds | Histogram | Backend operation latency |

Histogram Buckets

from adaptive_rate_limiter.observability import (
    LATENCY_BUCKETS,
    STREAMING_DURATION_BUCKETS,
    TOKEN_BUCKETS,
)

# Latency buckets (seconds): 5ms to 10s
LATENCY_BUCKETS = [0.005, 0.01, 0.025, 0.05, 0.1, 0.25, 0.5, 1.0, 2.5, 5.0, 10.0]

# Streaming duration buckets (seconds): 1s to 20min
STREAMING_DURATION_BUCKETS = [1.0, 5.0, 10.0, 30.0, 60.0, 120.0, 300.0, 600.0, 1200.0]

# Token count buckets: 100 to 128K
TOKEN_BUCKETS = [100, 500, 1000, 2000, 4000, 8000, 16000, 32000, 64000, 128000]

Protocols

The module defines protocols for creating custom metrics backends:

MetricsCollectorProtocol

from adaptive_rate_limiter.observability import MetricsCollectorProtocol

class MyCustomCollector:
    """Custom metrics collector implementing the protocol."""

    def inc_counter(self, name: str, value: int = 1, labels: dict | None = None) -> None:
        # Your implementation (e.g., send to StatsD)
        pass

    def set_gauge(self, name: str, value: float, labels: dict | None = None) -> None:
        pass

    def inc_gauge(self, name: str, value: float = 1.0, labels: dict | None = None) -> None:
        pass

    def dec_gauge(self, name: str, value: float = 1.0, labels: dict | None = None) -> None:
        pass

    def observe_histogram(self, name: str, value: float, labels: dict | None = None) -> None:
        pass

    def get_metrics(self) -> dict:
        return {"counters": {}, "gauges": {}, "histograms": {}}

    def reset(self) -> None:
        pass

# Duck typing works!
assert isinstance(MyCustomCollector(), MetricsCollectorProtocol)

PrometheusExporterProtocol

from adaptive_rate_limiter.observability import PrometheusExporterProtocol

class MyExporter:
    def start_server(self, host: str, port: int) -> None:
        pass

    def stop_server(self) -> None:
        pass

    @property
    def is_running(self) -> bool:
        return False

Label Best Practices

DO use categorical labels:
# Good - finite set of values
labels = {
    "bucket_id": "tier:1",    # Categorical: tier:1, tier:2, tier:3
    "model_id": "gpt-5",      # Categorical: gpt-5, claude-haiku-4.5, venice-uncensored
    "reason": "timeout",      # Enum: timeout, rate_limit, error
    "extraction_succeeded": "true",  # Boolean: true, false
}
DON’T use unbounded labels:
# BAD - causes label cardinality explosion!
labels = {
    "request_id": "abc-123-xyz",  # Unique per request - unbounded!
    "user_id": "user_42",          # Unique per user - unbounded!
    "timestamp": "1234567890",     # Unique per second - unbounded!
}
Cardinality Protection: The collector automatically limits label combinations to prevent memory issues:
# Default limit: 1000 unique combinations per metric
UnifiedMetricsCollector.MAX_LABEL_COMBINATIONS = 1000

# When exceeded, new combinations are dropped with a warning
# Existing combinations continue to work
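
When the raw value is unbounded (an exception message, a user id), a common workaround is to collapse it into a small fixed vocabulary before recording. A minimal sketch, where failure_reason() is a hypothetical helper:
from adaptive_rate_limiter.observability import REQUESTS_FAILED_TOTAL, get_metrics_collector

collector = get_metrics_collector()

def failure_reason(exc: Exception) -> str:
    # Hypothetical helper: map arbitrary exceptions onto the bounded "reason" set
    if isinstance(exc, TimeoutError):
        return "timeout"
    return "error"

collector.inc_counter(
    REQUESTS_FAILED_TOTAL,
    labels={
        "bucket_id": "tier:1",
        "model_id": "gpt-5",
        "reason": failure_reason(TimeoutError()),
    },
)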

Thread Safety

All components are thread-safe:
import threading
from adaptive_rate_limiter.observability import get_metrics_collector

collector = get_metrics_collector()

def worker():
    for _ in range(1000):
        collector.inc_counter("my_counter")

# Safe for concurrent access
threads = [threading.Thread(target=worker) for _ in range(10)]
for t in threads:
    t.start()
for t in threads:
    t.join()

# Counter will be exactly 10000
assert collector._counters["my_counter"][""] == 10000

Advanced Usage

Custom Registry for Testing

from prometheus_client import CollectorRegistry
from adaptive_rate_limiter.observability import UnifiedMetricsCollector

# Create isolated registry for tests
test_registry = CollectorRegistry()
collector = UnifiedMetricsCollector(enable_prometheus=True, registry=test_registry)

# Use collector in tests without polluting global Prometheus registry

Legacy Metric Name Mapping

For backward compatibility:
from adaptive_rate_limiter.observability import (
    LEGACY_METRIC_MAPPING,
    legacy_to_prometheus_name,
)

# Convert old names to new Prometheus-style names
old_name = "requests_scheduled"
new_name = legacy_to_prometheus_name(old_name)
# Returns: "adaptive_rl_requests_scheduled_total"

# Full mapping dictionary
print(LEGACY_METRIC_MAPPING)
# {
#     "requests_scheduled": "adaptive_rl_requests_scheduled_total",
#     "requests_completed": "adaptive_rl_requests_completed_total",
#     ...
# }

Metric Definitions

Access the pre-defined metric schemas:
from adaptive_rate_limiter.observability import MetricDefinition, METRIC_DEFINITIONS

# Get definition for a metric
defn = METRIC_DEFINITIONS["adaptive_rl_requests_scheduled_total"]
print(f"Name: {defn.name}")
print(f"Type: {defn.metric_type}")  # counter, gauge, histogram
print(f"Description: {defn.description}")
print(f"Labels: {defn.label_names}")  # ('bucket_id', 'model_id')
print(f"Buckets: {defn.buckets}")  # For histograms only

Reset for Testing

from adaptive_rate_limiter.observability import (
    reset_metrics_collector,
    reset_prometheus_streaming_metrics,
)

def teardown():
    # Reset singletons between tests
    reset_metrics_collector()
    reset_prometheus_streaming_metrics()
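
In a pytest suite this is commonly wrapped in an autouse fixture so each test starts from a clean singleton; a minimal sketch (the fixture name is arbitrary):
import pytest

from adaptive_rate_limiter.observability import (
    reset_metrics_collector,
    reset_prometheus_streaming_metrics,
)

@pytest.fixture(autouse=True)
def _clean_observability_singletons():
    yield  # run the test
    reset_metrics_collector()
    reset_prometheus_streaming_metrics()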