| """ |
| Telemetry System for Semantic Scalpel |
| |
| Tracks usage metrics for analytics and monitoring: |
| - Query counts and latency |
| - Example click rates |
| - Error rates and types |
| - BSV anchor statistics |
| |
| Data is privacy-preserving (no raw text stored). |
| |
| Created by Bryan Daugherty |
| SmartLedger Blockchain Solutions Inc |
| """ |
|
|
import json
import threading
import time
from collections import defaultdict
from dataclasses import dataclass, field
from datetime import datetime, timedelta, timezone
from enum import Enum
from typing import Any, Dict, List, Optional

try:
    import httpx
    HAS_HTTPX = True
except ImportError:
    HAS_HTTPX = False

from config import get_config
|
|
|
|
class EventType(str, Enum):
    """Types of telemetry events.

    Subclasses ``str`` so members serialize directly as their string
    values (e.g. when embedded in JSON payloads).
    """
    QUERY = "query"                  # a prediction query was executed
    EXAMPLE_CLICK = "example_click"  # user clicked a canned example
    TAB_VIEW = "tab_view"            # user viewed a UI tab
    ERROR = "error"                  # an error occurred
    BSV_ANCHOR = "bsv_anchor"        # a BSV blockchain anchor operation
    SHARE_CLICK = "share_click"      # user clicked a share button
    COST_CALC = "cost_calc"          # cost calculator was used
|
|
|
|
@dataclass
class TelemetryEvent:
    """A single telemetry event.

    The ``properties`` dict must stay privacy-preserving per the module
    contract (no raw user text).
    """
    event_type: EventType
    # ISO-8601 UTC timestamp with a trailing "Z" (same wire format as before).
    # datetime.now(timezone.utc) replaces datetime.utcnow(), which is naive
    # and deprecated since Python 3.12.
    timestamp: str = field(
        default_factory=lambda: datetime.now(timezone.utc)
        .isoformat()
        .replace("+00:00", "Z")
    )
    # Structured, non-identifying attributes of the event.
    properties: Dict[str, Any] = field(default_factory=dict)
    # Optional opaque session identifier; not set by default.
    session_id: Optional[str] = None
|
|
|
|
@dataclass
class TelemetryStats:
    """Aggregated telemetry statistics.

    Plain counters plus per-key distributions (backed by defaultdicts),
    along with latency aggregates derived from a rolling sample window.
    """
    total_queries: int = 0
    total_examples_clicked: int = 0
    total_errors: int = 0
    total_bsv_anchors: int = 0
    total_shares: int = 0

    queries_by_confidence: Dict[str, int] = field(default_factory=lambda: defaultdict(int))
    examples_by_name: Dict[str, int] = field(default_factory=lambda: defaultdict(int))
    errors_by_type: Dict[str, int] = field(default_factory=lambda: defaultdict(int))
    bsv_by_mode: Dict[str, int] = field(default_factory=lambda: defaultdict(int))

    avg_latency_ms: float = 0.0
    p95_latency_ms: float = 0.0
    latency_samples: List[float] = field(default_factory=list)

    first_event: Optional[str] = None
    last_event: Optional[str] = None

    def to_dict(self) -> Dict[str, Any]:
        """Serialize to a plain dict.

        Defaultdicts are converted to ordinary dicts; the raw
        ``latency_samples`` list is intentionally omitted.
        """
        counter_fields = (
            "total_queries",
            "total_examples_clicked",
            "total_errors",
            "total_bsv_anchors",
            "total_shares",
        )
        distribution_fields = (
            "queries_by_confidence",
            "examples_by_name",
            "errors_by_type",
            "bsv_by_mode",
        )
        result: Dict[str, Any] = {name: getattr(self, name) for name in counter_fields}
        for name in distribution_fields:
            result[name] = dict(getattr(self, name))
        result["avg_latency_ms"] = self.avg_latency_ms
        result["p95_latency_ms"] = self.p95_latency_ms
        result["first_event"] = self.first_event
        result["last_event"] = self.last_event
        return result
|
|
|
|
class TelemetryCollector:
    """
    Collects and aggregates telemetry data.

    Thread-safe: the event buffer and stats are only mutated under an
    internal lock. Events are buffered in memory (bounded by
    ``max_events``) and, when a telemetry endpoint is configured,
    periodically flushed to it by a background daemon thread.

    NOTE(review): stats counters (via track_* methods) still update when
    ``telemetry_enabled`` is False — only event buffering is gated in
    ``_record_event``. This preserves the original behavior; confirm it
    is intended.

    Usage:
        telemetry = TelemetryCollector()

        # Track a query
        telemetry.track_query(confidence=0.95, latency_ms=5.2, bsv_mode="async")

        # Track example click
        telemetry.track_example_click("garden_path_classic")

        # Get stats
        stats = telemetry.get_stats()
        print(f"Total queries: {stats.total_queries}")
    """

    # Rolling window size used for avg/p95 latency computation.
    _MAX_LATENCY_SAMPLES = 1000

    def __init__(self, max_events: int = 10000, flush_interval_seconds: int = 300):
        """
        Args:
            max_events: Maximum number of events kept in the in-memory
                buffer; events beyond this are dropped (stats still update).
            flush_interval_seconds: Interval at which the background
                thread flushes buffered events to the remote endpoint.
        """
        self.config = get_config()
        self.max_events = max_events
        self.flush_interval = flush_interval_seconds

        self._events: List[TelemetryEvent] = []
        self._stats = TelemetryStats()
        self._lock = threading.Lock()

        self._flush_thread: Optional[threading.Thread] = None
        self._stop_flush = threading.Event()

        # Only start the flusher when there is somewhere to send data.
        if self.config.telemetry_enabled and self.config.telemetry_endpoint:
            self._start_flush_thread()

    def _start_flush_thread(self):
        """Start a daemon thread that flushes events every ``flush_interval`` seconds."""
        def flush_loop():
            # Event.wait doubles as an interruptible sleep: it returns True
            # (ending the loop) as soon as shutdown() sets the stop event.
            while not self._stop_flush.wait(self.flush_interval):
                self._flush_to_endpoint()

        self._flush_thread = threading.Thread(target=flush_loop, daemon=True)
        self._flush_thread.start()

    def _flush_to_endpoint(self):
        """Flush buffered events to the remote endpoint (best-effort).

        On failure the unsent events are requeued, oldest first, ahead of
        anything recorded since the flush began, up to ``max_events``.
        """
        if not self.config.telemetry_endpoint or not HAS_HTTPX:
            return

        with self._lock:
            if not self._events:
                return
            events_to_send = self._events
            self._events = []
            # Snapshot stats while holding the lock: to_dict() iterates
            # mutable dicts that other threads may be updating concurrently.
            stats_snapshot = self._stats.to_dict()

        try:
            with httpx.Client(timeout=10.0) as client:
                client.post(
                    self.config.telemetry_endpoint,
                    json={
                        "events": [
                            {
                                "type": e.event_type.value,
                                "timestamp": e.timestamp,
                                "properties": e.properties,
                            }
                            for e in events_to_send
                        ],
                        "stats": stats_snapshot,
                    }
                )
        except Exception:
            # Delivery is best-effort; requeue whatever still fits.
            with self._lock:
                remaining_capacity = self.max_events - len(self._events)
                self._events = events_to_send[:remaining_capacity] + self._events

    def _record_event(self, event: TelemetryEvent):
        """Append an event to the bounded buffer and update first/last timestamps.

        No-op when telemetry is disabled in config.
        """
        if not self.config.telemetry_enabled:
            return

        with self._lock:
            # Bounded buffer: silently drop new events once full
            # (aggregate counters maintained by callers still update).
            if len(self._events) < self.max_events:
                self._events.append(event)

            if self._stats.first_event is None:
                self._stats.first_event = event.timestamp
            self._stats.last_event = event.timestamp

    def track_query(
        self,
        confidence: float,
        latency_ms: float,
        bsv_mode: str = "async",
        bsv_success: bool = True,
        is_example: bool = False,
    ):
        """Track a prediction query.

        Args:
            confidence: Model confidence; bucketed into high/medium/low so
                no precise per-query value is retained.
            latency_ms: End-to-end query latency in milliseconds.
            bsv_mode: BSV anchoring mode used for this query.
            bsv_success: Whether the BSV anchor succeeded.
            is_example: Whether the query came from a canned example.
        """
        # Privacy-preserving bucketing of the raw confidence value.
        if confidence >= 0.85:
            bucket = "high"
        elif confidence >= 0.70:
            bucket = "medium"
        else:
            bucket = "low"

        event = TelemetryEvent(
            event_type=EventType.QUERY,
            properties={
                "confidence_bucket": bucket,
                "latency_ms": round(latency_ms, 1),
                "bsv_mode": bsv_mode,
                "bsv_success": bsv_success,
                "is_example": is_example,
            }
        )
        self._record_event(event)

        with self._lock:
            self._stats.total_queries += 1
            self._stats.queries_by_confidence[bucket] += 1

            # Keep only a rolling window of recent samples.
            self._stats.latency_samples.append(latency_ms)
            if len(self._stats.latency_samples) > self._MAX_LATENCY_SAMPLES:
                self._stats.latency_samples = (
                    self._stats.latency_samples[-self._MAX_LATENCY_SAMPLES:]
                )

            samples = self._stats.latency_samples
            self._stats.avg_latency_ms = sum(samples) / len(samples)
            sorted_samples = sorted(samples)
            # Clamp the index so it is valid for any sample count
            # (samples is never empty here — we just appended).
            p95_idx = min(int(len(sorted_samples) * 0.95), len(sorted_samples) - 1)
            self._stats.p95_latency_ms = sorted_samples[p95_idx]

    def track_example_click(self, example_name: str):
        """Track when a user clicks an example."""
        event = TelemetryEvent(
            event_type=EventType.EXAMPLE_CLICK,
            properties={"example_name": example_name}
        )
        self._record_event(event)

        with self._lock:
            self._stats.total_examples_clicked += 1
            self._stats.examples_by_name[example_name] += 1

    def track_tab_view(self, tab_name: str):
        """Track when a user views a tab (event only; no aggregate counter)."""
        event = TelemetryEvent(
            event_type=EventType.TAB_VIEW,
            properties={"tab_name": tab_name}
        )
        self._record_event(event)

    def track_error(self, error_type: str, error_message: str = ""):
        """Track an error.

        Args:
            error_type: Short classification key used for aggregation.
            error_message: Optional detail; truncated to 100 chars to keep
                payloads small and limit leakage of raw text.
        """
        event = TelemetryEvent(
            event_type=EventType.ERROR,
            properties={
                "error_type": error_type,
                "error_message": error_message[:100],
            }
        )
        self._record_event(event)

        with self._lock:
            self._stats.total_errors += 1
            self._stats.errors_by_type[error_type] += 1

    def track_bsv_anchor(self, mode: str, success: bool, is_simulated: bool):
        """Track a BSV anchor operation.

        Args:
            mode: Anchoring mode (e.g. "async").
            success: Whether the anchor operation succeeded.
            is_simulated: Whether the anchor was simulated rather than live.
        """
        event = TelemetryEvent(
            event_type=EventType.BSV_ANCHOR,
            properties={
                "mode": mode,
                "success": success,
                "is_simulated": is_simulated,
            }
        )
        self._record_event(event)

        with self._lock:
            self._stats.total_bsv_anchors += 1
            # Aggregate key distinguishes simulated from live anchors.
            mode_key = f"{mode}_{'simulated' if is_simulated else 'live'}"
            self._stats.bsv_by_mode[mode_key] += 1

    def track_share_click(self, platform: str):
        """Track when a user clicks share."""
        event = TelemetryEvent(
            event_type=EventType.SHARE_CLICK,
            properties={"platform": platform}
        )
        self._record_event(event)

        with self._lock:
            self._stats.total_shares += 1

    def track_cost_calculation(self, queries_per_month: int):
        """Track cost calculator usage (event only; no aggregate counter)."""
        event = TelemetryEvent(
            event_type=EventType.COST_CALC,
            properties={"queries_per_month": queries_per_month}
        )
        self._record_event(event)

    def get_stats(self) -> TelemetryStats:
        """Get a consistent snapshot of current telemetry statistics.

        Returns a copy so callers can read it without holding the lock.
        Note: ``latency_samples`` is not copied into the snapshot.
        """
        with self._lock:
            return TelemetryStats(
                total_queries=self._stats.total_queries,
                total_examples_clicked=self._stats.total_examples_clicked,
                total_errors=self._stats.total_errors,
                total_bsv_anchors=self._stats.total_bsv_anchors,
                total_shares=self._stats.total_shares,
                queries_by_confidence=dict(self._stats.queries_by_confidence),
                examples_by_name=dict(self._stats.examples_by_name),
                errors_by_type=dict(self._stats.errors_by_type),
                bsv_by_mode=dict(self._stats.bsv_by_mode),
                avg_latency_ms=self._stats.avg_latency_ms,
                p95_latency_ms=self._stats.p95_latency_ms,
                first_event=self._stats.first_event,
                last_event=self._stats.last_event,
            )

    def get_stats_summary(self) -> str:
        """Get a human-readable, formatted stats summary."""
        stats = self.get_stats()
        return f"""
Telemetry Summary
-----------------
Total Queries: {stats.total_queries}
Examples Clicked: {stats.total_examples_clicked}
BSV Anchors: {stats.total_bsv_anchors}
Errors: {stats.total_errors}
Shares: {stats.total_shares}

Performance:
  Avg Latency: {stats.avg_latency_ms:.1f}ms
  P95 Latency: {stats.p95_latency_ms:.1f}ms

Confidence Distribution:
  High (>=85%): {stats.queries_by_confidence.get('high', 0)}
  Medium (70-85%): {stats.queries_by_confidence.get('medium', 0)}
  Low (<70%): {stats.queries_by_confidence.get('low', 0)}

Top Examples:
{chr(10).join(f'  {k}: {v}' for k, v in sorted(stats.examples_by_name.items(), key=lambda x: -x[1])[:5])}
"""

    def shutdown(self):
        """Shutdown telemetry collector: stop the flusher and do a final flush."""
        if self._flush_thread:
            self._stop_flush.set()
            # Join so the final flush below cannot race an in-flight
            # periodic flush. Bounded timeout; the thread is a daemon,
            # so a hung HTTP call cannot block interpreter exit.
            self._flush_thread.join(timeout=5.0)
            self._flush_to_endpoint()
|
|
|
|
| |
# Process-wide singleton, created lazily by get_telemetry().
_telemetry: Optional[TelemetryCollector] = None
|
|
|
|
def get_telemetry() -> TelemetryCollector:
    """Return the process-wide TelemetryCollector, creating it on first use."""
    global _telemetry
    collector = _telemetry
    if collector is None:
        collector = TelemetryCollector()
        _telemetry = collector
    return collector
|
|