# semantic-scalpel-bsv / telemetry.py
# Uploaded by GotThatData via huggingface_hub (revision f57efb3, verified)
"""
Telemetry System for Semantic Scalpel
Tracks usage metrics for analytics and monitoring:
- Query counts and latency
- Example click rates
- Error rates and types
- BSV anchor statistics
Data is privacy-preserving (no raw text stored).
Created by Bryan Daugherty
SmartLedger Blockchain Solutions Inc
"""
import json
import time
import threading
from collections import defaultdict
from dataclasses import dataclass, field
from datetime import datetime, timedelta, timezone
from enum import Enum
from typing import Any, Dict, List, Optional

try:
    import httpx
    HAS_HTTPX = True
except ImportError:
    HAS_HTTPX = False

from config import get_config
class EventType(str, Enum):
    """Types of telemetry events.

    Subclasses ``str`` so member values serialize directly (e.g. in the
    JSON payload built by the flush code, which reads ``.value``).
    """
    QUERY = "query"                    # a prediction query was executed
    EXAMPLE_CLICK = "example_click"    # user clicked a preset example
    TAB_VIEW = "tab_view"              # user viewed a UI tab
    ERROR = "error"                    # an error was recorded
    BSV_ANCHOR = "bsv_anchor"          # a BSV anchor operation ran
    SHARE_CLICK = "share_click"        # user clicked a share button
    COST_CALC = "cost_calc"            # cost calculator was used
@dataclass
class TelemetryEvent:
    """A single telemetry event.

    Attributes:
        event_type: Category of the event (see ``EventType``).
        timestamp: UTC ISO-8601 creation time with a trailing ``"Z"``.
        properties: Privacy-preserving metadata for the event (no raw text).
        session_id: Optional opaque session identifier.
    """
    event_type: EventType
    # BUG FIX: datetime.utcnow() is deprecated (Python 3.12+) and returns a
    # naive datetime. Use an aware UTC datetime instead, while preserving the
    # original "Z"-suffixed format exactly (isoformat on aware UTC emits
    # "+00:00", which we rewrite to "Z").
    timestamp: str = field(
        default_factory=lambda: datetime.now(timezone.utc)
        .isoformat()
        .replace("+00:00", "Z")
    )
    properties: Dict[str, Any] = field(default_factory=dict)
    session_id: Optional[str] = None
@dataclass
class TelemetryStats:
    """Aggregated telemetry statistics.

    Counters, per-category breakdowns, latency figures, and first/last event
    timestamps. ``latency_samples`` is internal working state and is
    deliberately omitted from :meth:`to_dict`.
    """
    # Top-level counters
    total_queries: int = 0
    total_examples_clicked: int = 0
    total_errors: int = 0
    total_bsv_anchors: int = 0
    total_shares: int = 0
    # Per-category breakdowns (defaultdicts so callers can increment freely)
    queries_by_confidence: Dict[str, int] = field(default_factory=lambda: defaultdict(int))
    examples_by_name: Dict[str, int] = field(default_factory=lambda: defaultdict(int))
    errors_by_type: Dict[str, int] = field(default_factory=lambda: defaultdict(int))
    bsv_by_mode: Dict[str, int] = field(default_factory=lambda: defaultdict(int))
    # Latency figures (samples are the raw window the averages derive from)
    avg_latency_ms: float = 0.0
    p95_latency_ms: float = 0.0
    latency_samples: List[float] = field(default_factory=list)
    # ISO timestamps of the first and most recent event seen
    first_event: Optional[str] = None
    last_event: Optional[str] = None

    def to_dict(self) -> Dict[str, Any]:
        """Return a JSON-serializable snapshot (excludes raw latency samples)."""
        counters: Dict[str, Any] = {
            "total_queries": self.total_queries,
            "total_examples_clicked": self.total_examples_clicked,
            "total_errors": self.total_errors,
            "total_bsv_anchors": self.total_bsv_anchors,
            "total_shares": self.total_shares,
        }
        breakdowns: Dict[str, Any] = {
            "queries_by_confidence": dict(self.queries_by_confidence),
            "examples_by_name": dict(self.examples_by_name),
            "errors_by_type": dict(self.errors_by_type),
            "bsv_by_mode": dict(self.bsv_by_mode),
        }
        timing: Dict[str, Any] = {
            "avg_latency_ms": self.avg_latency_ms,
            "p95_latency_ms": self.p95_latency_ms,
            "first_event": self.first_event,
            "last_event": self.last_event,
        }
        return {**counters, **breakdowns, **timing}
class TelemetryCollector:
    """
    Collects and aggregates telemetry data.

    Thread-safe: all mutation of the event buffer and the aggregated stats
    happens under a single lock. When the config enables telemetry AND
    provides an endpoint, a daemon thread periodically flushes buffered
    events to that endpoint.

    Usage:
        telemetry = TelemetryCollector()
        # Track a query
        telemetry.track_query(confidence=0.95, latency_ms=5.2, bsv_mode="async")
        # Track example click
        telemetry.track_example_click("garden_path_classic")
        # Get stats
        stats = telemetry.get_stats()
        print(f"Total queries: {stats.total_queries}")
    """

    # Rolling window size for latency avg/p95 computation.
    _MAX_LATENCY_SAMPLES = 1000

    def __init__(self, max_events: int = 10000, flush_interval_seconds: int = 300):
        """
        Args:
            max_events: Maximum number of events buffered in memory; events
                beyond this are dropped until a flush empties the buffer.
            flush_interval_seconds: Period of the background flush thread.
        """
        self.config = get_config()
        self.max_events = max_events
        self.flush_interval = flush_interval_seconds

        # Event storage (both guarded by self._lock)
        self._events: List[TelemetryEvent] = []
        self._stats = TelemetryStats()
        self._lock = threading.Lock()

        # Background flush thread
        self._flush_thread: Optional[threading.Thread] = None
        self._stop_flush = threading.Event()
        if self.config.telemetry_enabled and self.config.telemetry_endpoint:
            self._start_flush_thread()

    def _start_flush_thread(self):
        """Start background thread to periodically flush events."""
        def flush_loop():
            # Event.wait doubles as the sleep; it returns True (ending the
            # loop) once shutdown() sets the stop event.
            while not self._stop_flush.wait(self.flush_interval):
                self._flush_to_endpoint()

        self._flush_thread = threading.Thread(target=flush_loop, daemon=True)
        self._flush_thread.start()

    def _flush_to_endpoint(self):
        """Flush buffered events to the remote endpoint; re-buffer on failure."""
        if not self.config.telemetry_endpoint or not HAS_HTTPX:
            return

        with self._lock:
            if not self._events:
                return
            events_to_send = self._events
            self._events = []
            # BUG FIX: snapshot stats while still holding the lock so the
            # payload is internally consistent (previously read unlocked).
            stats_snapshot = self._stats.to_dict()

        try:
            with httpx.Client(timeout=10.0) as client:
                response = client.post(
                    self.config.telemetry_endpoint,
                    json={
                        "events": [
                            {
                                "type": e.event_type.value,
                                "timestamp": e.timestamp,
                                "properties": e.properties,
                            }
                            for e in events_to_send
                        ],
                        "stats": stats_snapshot,
                    },
                )
                # BUG FIX: httpx does not raise on 4xx/5xx responses by
                # itself; without this, failed deliveries were silently
                # dropped instead of being re-buffered below.
                response.raise_for_status()
        except Exception:
            # Re-add events on failure (up to max)
            with self._lock:
                remaining_capacity = self.max_events - len(self._events)
                if remaining_capacity > 0:
                    self._events = events_to_send[:remaining_capacity] + self._events

    def _record_event(self, event: TelemetryEvent):
        """Buffer an event (if telemetry is enabled) and track first/last times."""
        if not self.config.telemetry_enabled:
            return
        with self._lock:
            # Add to event list (with size limit); overflow events are dropped.
            if len(self._events) < self.max_events:
                self._events.append(event)
            # Update time tracking
            if self._stats.first_event is None:
                self._stats.first_event = event.timestamp
            self._stats.last_event = event.timestamp

    # -------------------------------------------------------------------------
    # Public Tracking Methods
    # -------------------------------------------------------------------------
    def track_query(
        self,
        confidence: float,
        latency_ms: float,
        bsv_mode: str = "async",
        bsv_success: bool = True,
        is_example: bool = False,
    ):
        """Track a prediction query.

        Only a confidence *bucket* is stored, never the raw value — part of
        the module's privacy-preserving design.

        NOTE(review): stats counters below update even when telemetry is
        disabled (only event buffering is gated in _record_event). This
        mirrors the original behavior; local stats remain available while
        remote reporting is off.
        """
        # Determine confidence bucket
        if confidence >= 0.85:
            bucket = "high"
        elif confidence >= 0.70:
            bucket = "medium"
        else:
            bucket = "low"

        event = TelemetryEvent(
            event_type=EventType.QUERY,
            properties={
                "confidence_bucket": bucket,
                "latency_ms": round(latency_ms, 1),
                "bsv_mode": bsv_mode,
                "bsv_success": bsv_success,
                "is_example": is_example,
            }
        )
        self._record_event(event)

        # Update stats
        with self._lock:
            self._stats.total_queries += 1
            self._stats.queries_by_confidence[bucket] += 1
            # Update latency stats over a bounded rolling window
            self._stats.latency_samples.append(latency_ms)
            if len(self._stats.latency_samples) > self._MAX_LATENCY_SAMPLES:
                self._stats.latency_samples = self._stats.latency_samples[-self._MAX_LATENCY_SAMPLES:]
            samples = self._stats.latency_samples
            self._stats.avg_latency_ms = sum(samples) / len(samples)
            sorted_samples = sorted(samples)
            # int(n * 0.95) is always a valid index for non-empty samples
            p95_idx = int(len(sorted_samples) * 0.95)
            self._stats.p95_latency_ms = sorted_samples[p95_idx] if sorted_samples else 0

    def track_example_click(self, example_name: str):
        """Track when a user clicks an example."""
        event = TelemetryEvent(
            event_type=EventType.EXAMPLE_CLICK,
            properties={"example_name": example_name}
        )
        self._record_event(event)
        with self._lock:
            self._stats.total_examples_clicked += 1
            self._stats.examples_by_name[example_name] += 1

    def track_tab_view(self, tab_name: str):
        """Track when a user views a tab."""
        event = TelemetryEvent(
            event_type=EventType.TAB_VIEW,
            properties={"tab_name": tab_name}
        )
        self._record_event(event)

    def track_error(self, error_type: str, error_message: str = ""):
        """Track an error (message truncated to 100 chars for privacy/size)."""
        event = TelemetryEvent(
            event_type=EventType.ERROR,
            properties={
                "error_type": error_type,
                "error_message": error_message[:100],  # Truncate
            }
        )
        self._record_event(event)
        with self._lock:
            self._stats.total_errors += 1
            self._stats.errors_by_type[error_type] += 1

    def track_bsv_anchor(self, mode: str, success: bool, is_simulated: bool):
        """Track a BSV anchor operation."""
        event = TelemetryEvent(
            event_type=EventType.BSV_ANCHOR,
            properties={
                "mode": mode,
                "success": success,
                "is_simulated": is_simulated,
            }
        )
        self._record_event(event)
        with self._lock:
            self._stats.total_bsv_anchors += 1
            mode_key = f"{mode}_{'simulated' if is_simulated else 'live'}"
            self._stats.bsv_by_mode[mode_key] += 1

    def track_share_click(self, platform: str):
        """Track when a user clicks share."""
        event = TelemetryEvent(
            event_type=EventType.SHARE_CLICK,
            properties={"platform": platform}
        )
        self._record_event(event)
        with self._lock:
            self._stats.total_shares += 1

    def track_cost_calculation(self, queries_per_month: int):
        """Track cost calculator usage."""
        event = TelemetryEvent(
            event_type=EventType.COST_CALC,
            properties={"queries_per_month": queries_per_month}
        )
        self._record_event(event)

    # -------------------------------------------------------------------------
    # Stats Retrieval
    # -------------------------------------------------------------------------
    def get_stats(self) -> TelemetryStats:
        """Return a consistent copy of the current telemetry statistics."""
        with self._lock:
            return TelemetryStats(
                total_queries=self._stats.total_queries,
                total_examples_clicked=self._stats.total_examples_clicked,
                total_errors=self._stats.total_errors,
                total_bsv_anchors=self._stats.total_bsv_anchors,
                total_shares=self._stats.total_shares,
                queries_by_confidence=dict(self._stats.queries_by_confidence),
                examples_by_name=dict(self._stats.examples_by_name),
                errors_by_type=dict(self._stats.errors_by_type),
                bsv_by_mode=dict(self._stats.bsv_by_mode),
                avg_latency_ms=self._stats.avg_latency_ms,
                p95_latency_ms=self._stats.p95_latency_ms,
                first_event=self._stats.first_event,
                last_event=self._stats.last_event,
            )

    def get_stats_summary(self) -> str:
        """Get a formatted, human-readable stats summary."""
        stats = self.get_stats()
        # Built outside the f-string: backslashes in f-string expressions are
        # only legal on Python 3.12+, and this keeps the template readable.
        top_examples = "\n".join(
            f"  {name}: {count}"
            for name, count in sorted(stats.examples_by_name.items(), key=lambda kv: -kv[1])[:5]
        )
        return f"""
Telemetry Summary
-----------------
Total Queries: {stats.total_queries}
Examples Clicked: {stats.total_examples_clicked}
BSV Anchors: {stats.total_bsv_anchors}
Errors: {stats.total_errors}
Shares: {stats.total_shares}
Performance:
  Avg Latency: {stats.avg_latency_ms:.1f}ms
  P95 Latency: {stats.p95_latency_ms:.1f}ms
Confidence Distribution:
  High (>=85%): {stats.queries_by_confidence.get('high', 0)}
  Medium (70-85%): {stats.queries_by_confidence.get('medium', 0)}
  Low (<70%): {stats.queries_by_confidence.get('low', 0)}
Top Examples:
{top_examples}
"""

    def shutdown(self):
        """Shutdown telemetry collector: stop the flush thread, then flush once."""
        if self._flush_thread:
            self._stop_flush.set()
            # BUG FIX: join the worker so a concurrent periodic flush cannot
            # race the final flush below (bounded wait; thread is a daemon).
            self._flush_thread.join(timeout=self.flush_interval + 1)
        self._flush_to_endpoint()
# Global telemetry instance (lazily constructed; guarded by _telemetry_lock)
_telemetry: Optional[TelemetryCollector] = None
_telemetry_lock = threading.Lock()


def get_telemetry() -> TelemetryCollector:
    """Get or create the global telemetry instance (thread-safe).

    BUG FIX: the unsynchronized check-then-create could construct two
    collectors (and two flush threads) under concurrent first calls, which
    matters in this explicitly multi-threaded module. Double-checked locking
    keeps the common path lock-free.
    """
    global _telemetry
    if _telemetry is None:
        with _telemetry_lock:
            if _telemetry is None:
                _telemetry = TelemetryCollector()
    return _telemetry