File size: 10,952 Bytes
"""Routing Metrics — Observability for Adaptive Router (Phase 5)

Tracks adapter routing decisions, memory boost application, and performance
metrics to enable monitoring and fine-tuning of the Phase 5 integration.

Exposes metrics for:
- Adapter selection frequency and confidence
- Memory boost hit rate (% of queries with memory boost applied)
- Router strategy selection
- Confidence distribution before/after memory boost
"""
import time
from dataclasses import dataclass, field
from typing import Dict, List, Optional
@dataclass
class AdapterSelectionRecord:
    """Snapshot of one routing decision."""

    timestamp: float
    query_preview: str  # Truncated query text (first 60 chars)
    primary_adapter: str
    secondary_adapters: List[str]
    strategy: str  # e.g. "keyword", "llm", "hybrid"
    confidence_before_boost: float  # Base score from keyword/llm routing
    confidence_after_boost: float  # Score once memory weighting is applied
    memory_boost_applied: bool
    boost_magnitude: float = 0.0  # Size of the confidence change

    def to_dict(self) -> Dict:
        """Build a plain dict for JSON export.

        Returns:
            Dict keyed by field name, in declaration order. The three float
            score fields are rounded to 3 decimal places; every other value
            is passed through unchanged.
        """
        # Field names in the order they must appear in the exported dict.
        export_order = (
            "timestamp",
            "query_preview",
            "primary_adapter",
            "secondary_adapters",
            "strategy",
            "confidence_before_boost",
            "confidence_after_boost",
            "memory_boost_applied",
            "boost_magnitude",
        )
        # Scores are rounded so exported payloads stay compact and stable.
        rounded_fields = (
            "confidence_before_boost",
            "confidence_after_boost",
            "boost_magnitude",
        )

        payload: Dict = {}
        for field_name in export_order:
            value = getattr(self, field_name)
            if field_name in rounded_fields:
                value = round(value, 3)
            payload[field_name] = value
        return payload
class RoutingMetrics:
    """Track and report on adapter routing decisions.

    Maintains statistics on:
    - Which adapters are selected (frequency, as primary vs secondary)
    - Confidence scores (average, distribution)
    - Memory boost effectiveness (hit rate, average boost amount)
    - Router strategy usage
    - Cold start scenarios (no memory for adapter)

    All counters are cumulative since construction or the last ``reset()``.
    Only ``selection_records`` is a rolling window, capped at
    ``MAX_SELECTION_RECORDS`` entries.

    Note: ``cold_start_queries`` is reported by ``get_summary()`` but is not
    updated by any method of this class; callers that want it populated must
    increment it themselves.
    """

    # Maximum records to retain (rolling window to prevent memory bloat)
    MAX_SELECTION_RECORDS = 1000

    def __init__(self):
        """Start with empty counters and an empty record window."""
        self.total_queries: int = 0

        # Bounded deque: once full, appending drops the oldest record.
        from collections import deque
        self.selection_records: deque = deque(maxlen=self.MAX_SELECTION_RECORDS)

        # Per-adapter metrics
        self.adapter_selection_counts: Dict[str, int] = {}  # primary + secondary
        self.adapter_primary_count: Dict[str, int] = {}
        self.adapter_secondary_count: Dict[str, int] = {}
        self.adapter_avg_confidence: Dict[str, float] = {}  # primary selections only
        self.adapter_boost_hits: Dict[str, int] = {}
        self.adapter_avg_boost_magnitude: Dict[str, float] = {}

        # Strategy metrics (strategies not listed here are added on first use)
        self.strategy_usage: Dict[str, int] = {
            "keyword": 0,
            "llm": 0,
            "hybrid": 0,
            "forced": 0,
        }

        # Memory metrics
        self.memory_boost_count: int = 0
        self.cold_start_queries: int = 0

    @staticmethod
    def _increment(counts: Dict[str, int], key: str) -> int:
        """Add one to ``counts[key]`` (starting from 0) and return the new value."""
        updated = counts.get(key, 0) + 1
        counts[key] = updated
        return updated

    def record_route(self, record: "AdapterSelectionRecord") -> None:
        """Record a routing decision.

        Args:
            record: AdapterSelectionRecord with all routing details
        """
        self.total_queries += 1
        self.selection_records.append(record)

        # Update adapter selection counts
        primary = record.primary_adapter
        self._increment(self.adapter_selection_counts, primary)
        self._increment(self.adapter_primary_count, primary)
        for secondary in record.secondary_adapters:
            self._increment(self.adapter_selection_counts, secondary)
            self._increment(self.adapter_secondary_count, secondary)

        # Update confidence metrics (must run after the primary count bump,
        # because the running mean uses that count as its sample size).
        self._update_adapter_confidence(primary, record.confidence_after_boost)

        # Update memory boost metrics
        if record.memory_boost_applied:
            self.memory_boost_count += 1
            hits = self._increment(self.adapter_boost_hits, primary)
            # Running mean over boosted routes, so the stored value is a true
            # average rather than just the most recent magnitude.
            previous = self.adapter_avg_boost_magnitude.get(primary, 0.0)
            self.adapter_avg_boost_magnitude[primary] = (
                previous + (record.boost_magnitude - previous) / hits
            )

        # Update strategy metrics
        self._increment(self.strategy_usage, record.strategy)

    def _update_adapter_confidence(self, adapter: str, confidence: float) -> None:
        """Update running average confidence for adapter.

        Confidence is only sampled when the adapter is the primary choice, so
        the sample size is the primary-selection count, not the total count
        (which also includes secondary selections).

        Args:
            adapter: Adapter whose average is being updated
            confidence: Newly observed confidence value
        """
        if adapter not in self.adapter_avg_confidence:
            self.adapter_avg_confidence[adapter] = confidence
            return

        # Guard against a zero count if this helper is called out of band.
        samples = max(self.adapter_primary_count.get(adapter, 1), 1)
        previous = self.adapter_avg_confidence[adapter]
        self.adapter_avg_confidence[adapter] = (
            previous + (confidence - previous) / samples
        )

    def get_adapter_stats(self, adapter: str) -> Dict:
        """Get comprehensive stats for a single adapter.

        Args:
            adapter: Adapter name; unknown adapters yield all-zero stats

        Returns:
            Dict with selection counts, average confidence, boost hits, boost
            rate and average boost magnitude. ``memory_boost_rate`` is boost
            hits divided by total selections (primary and secondary).
        """
        selections = self.adapter_selection_counts.get(adapter, 0)
        boosts = self.adapter_boost_hits.get(adapter, 0)

        return {
            "adapter": adapter,
            "total_selections": selections,
            "primary_selections": self.adapter_primary_count.get(adapter, 0),
            "secondary_selections": self.adapter_secondary_count.get(adapter, 0),
            "avg_confidence": round(self.adapter_avg_confidence.get(adapter, 0.0), 3),
            "memory_boost_hits": boosts,
            # max(..., 1) avoids division by zero for never-selected adapters.
            "memory_boost_rate": round(boosts / max(selections, 1), 3),
            "avg_boost_magnitude": round(
                self.adapter_avg_boost_magnitude.get(adapter, 0.0), 3
            ),
        }

    def get_summary(self) -> Dict:
        """Return comprehensive summary of routing metrics.

        ``avg_confidence`` and ``confidence_range`` are computed from the
        retained record window only; all other figures are cumulative.

        Returns:
            Dict with overall statistics and per-adapter breakdown, or
            ``{"total_queries": 0, "status": "no data"}`` when nothing has
            been recorded.
        """
        if self.total_queries == 0:
            return {"total_queries": 0, "status": "no data"}

        # Compute averages
        total_selections = sum(self.adapter_selection_counts.values())
        all_confidences = [r.confidence_after_boost for r in self.selection_records]
        avg_confidence = (
            sum(all_confidences) / len(all_confidences) if all_confidences else 0.0
        )

        # Top adapters
        top_adapters = sorted(
            self.adapter_selection_counts.items(),
            key=lambda x: x[1],
            reverse=True,
        )[:5]

        # Memory boost rate
        memory_boost_rate = self.memory_boost_count / max(self.total_queries, 1)

        # Most used strategy
        top_strategy = max(self.strategy_usage.items(), key=lambda x: x[1])[0]

        return {
            "total_queries": self.total_queries,
            "total_adapter_selections": total_selections,
            "avg_confidence": round(avg_confidence, 3),
            "confidence_range": (
                round(min(all_confidences), 3) if all_confidences else 0.0,
                round(max(all_confidences), 3) if all_confidences else 1.0,
            ),
            "top_adapters": [
                {
                    "adapter": name,
                    "count": count,
                    "percentage": round(count / max(total_selections, 1), 3),
                }
                for name, count in top_adapters
            ],
            "memory_boost_rate": round(memory_boost_rate, 3),
            "memory_boosts_applied": self.memory_boost_count,
            "strategy_distribution": dict(self.strategy_usage),
            "primary_strategy": top_strategy,
            "cold_start_queries": self.cold_start_queries,
            "adapter_stats": {
                adapter: self.get_adapter_stats(adapter)
                for adapter in self.adapter_selection_counts
            },
        }

    def get_recent_routes(self, limit: int = 10) -> List[Dict]:
        """Return recent routing decisions for debugging.

        Args:
            limit: Max records to return; zero or negative returns nothing

        Returns:
            List of recent routing records (most recent first)
        """
        # A slice of [-0:] would return the whole window, so handle
        # non-positive limits explicitly.
        if limit <= 0:
            return []

        # Deques do not support slicing; copy to a list first.
        recent = list(self.selection_records)[-limit:]
        recent.reverse()  # Most recent first
        return [
            {
                "timestamp": r.timestamp,
                "query": r.query_preview,
                "primary": r.primary_adapter,
                "secondary": r.secondary_adapters,
                "confidence": round(r.confidence_after_boost, 3),
                "strategy": r.strategy,
                "boost_applied": r.memory_boost_applied,
            }
            for r in recent
        ]

    def reset(self) -> None:
        """Clear all metrics (for testing or new session)."""
        self.__init__()

    @staticmethod
    def create_record(
        query: str,
        primary_adapter: str,
        secondary_adapters: List[str],
        strategy: str,
        confidence_before_boost: float,
        confidence_after_boost: float,
        memory_boost_applied: bool,
    ) -> "AdapterSelectionRecord":
        """Factory method to create a routing record.

        Args:
            query: The user's query (will be truncated to first 60 chars,
                with "..." appended when truncated)
            primary_adapter: Selected primary adapter name
            secondary_adapters: List of secondary adapters (copied, so later
                changes to the caller's list do not alter the record)
            strategy: Routing strategy used
            confidence_before_boost: Base confidence score
            confidence_after_boost: Confidence after memory boost (if applied)
            memory_boost_applied: Whether memory weighting was applied

        Returns:
            AdapterSelectionRecord ready to log, timestamped with the current
            time and with boost_magnitude set to the confidence difference
        """
        boost_magnitude = confidence_after_boost - confidence_before_boost

        return AdapterSelectionRecord(
            timestamp=time.time(),
            query_preview=query[:60] + ("..." if len(query) > 60 else ""),
            primary_adapter=primary_adapter,
            # Defensive copy; a missing list is treated as "no secondaries".
            secondary_adapters=list(secondary_adapters or ()),
            strategy=strategy,
            confidence_before_boost=confidence_before_boost,
            confidence_after_boost=confidence_after_boost,
            memory_boost_applied=memory_boost_applied,
            boost_magnitude=boost_magnitude,
        )
|