GotThatData commited on
Commit
f57efb3
·
verified ·
1 Parent(s): db7610c

Upload telemetry.py with huggingface_hub

Browse files
Files changed (1) hide show
  1. telemetry.py +373 -0
telemetry.py ADDED
@@ -0,0 +1,373 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ """
2
+ Telemetry System for Semantic Scalpel
3
+
4
+ Tracks usage metrics for analytics and monitoring:
5
+ - Query counts and latency
6
+ - Example click rates
7
+ - Error rates and types
8
+ - BSV anchor statistics
9
+
10
+ Data is privacy-preserving (no raw text stored).
11
+
12
+ Created by Bryan Daugherty
13
+ SmartLedger Blockchain Solutions Inc
14
+ """
15
+
16
+ import json
17
+ import time
18
+ import threading
19
+ from collections import defaultdict
20
+ from dataclasses import dataclass, field
21
+ from datetime import datetime, timedelta
22
+ from typing import Any, Dict, List, Optional
23
+ from enum import Enum
24
+
25
+ try:
26
+ import httpx
27
+ HAS_HTTPX = True
28
+ except ImportError:
29
+ HAS_HTTPX = False
30
+
31
+ from config import get_config
32
+
33
+
34
+ class EventType(str, Enum):
35
+ """Types of telemetry events."""
36
+ QUERY = "query"
37
+ EXAMPLE_CLICK = "example_click"
38
+ TAB_VIEW = "tab_view"
39
+ ERROR = "error"
40
+ BSV_ANCHOR = "bsv_anchor"
41
+ SHARE_CLICK = "share_click"
42
+ COST_CALC = "cost_calc"
43
+
44
+
45
+ @dataclass
46
+ class TelemetryEvent:
47
+ """A single telemetry event."""
48
+ event_type: EventType
49
+ timestamp: str = field(default_factory=lambda: datetime.utcnow().isoformat() + "Z")
50
+ properties: Dict[str, Any] = field(default_factory=dict)
51
+ session_id: Optional[str] = None
52
+
53
+
54
+ @dataclass
55
+ class TelemetryStats:
56
+ """Aggregated telemetry statistics."""
57
+ total_queries: int = 0
58
+ total_examples_clicked: int = 0
59
+ total_errors: int = 0
60
+ total_bsv_anchors: int = 0
61
+ total_shares: int = 0
62
+
63
+ # By category
64
+ queries_by_confidence: Dict[str, int] = field(default_factory=lambda: defaultdict(int))
65
+ examples_by_name: Dict[str, int] = field(default_factory=lambda: defaultdict(int))
66
+ errors_by_type: Dict[str, int] = field(default_factory=lambda: defaultdict(int))
67
+ bsv_by_mode: Dict[str, int] = field(default_factory=lambda: defaultdict(int))
68
+
69
+ # Performance
70
+ avg_latency_ms: float = 0.0
71
+ p95_latency_ms: float = 0.0
72
+ latency_samples: List[float] = field(default_factory=list)
73
+
74
+ # Time tracking
75
+ first_event: Optional[str] = None
76
+ last_event: Optional[str] = None
77
+
78
+ def to_dict(self) -> Dict[str, Any]:
79
+ return {
80
+ "total_queries": self.total_queries,
81
+ "total_examples_clicked": self.total_examples_clicked,
82
+ "total_errors": self.total_errors,
83
+ "total_bsv_anchors": self.total_bsv_anchors,
84
+ "total_shares": self.total_shares,
85
+ "queries_by_confidence": dict(self.queries_by_confidence),
86
+ "examples_by_name": dict(self.examples_by_name),
87
+ "errors_by_type": dict(self.errors_by_type),
88
+ "bsv_by_mode": dict(self.bsv_by_mode),
89
+ "avg_latency_ms": self.avg_latency_ms,
90
+ "p95_latency_ms": self.p95_latency_ms,
91
+ "first_event": self.first_event,
92
+ "last_event": self.last_event,
93
+ }
94
+
95
+
96
+ class TelemetryCollector:
97
+ """
98
+ Collects and aggregates telemetry data.
99
+
100
+ Usage:
101
+ telemetry = TelemetryCollector()
102
+
103
+ # Track a query
104
+ telemetry.track_query(confidence=0.95, latency_ms=5.2, bsv_mode="async")
105
+
106
+ # Track example click
107
+ telemetry.track_example_click("garden_path_classic")
108
+
109
+ # Get stats
110
+ stats = telemetry.get_stats()
111
+ print(f"Total queries: {stats.total_queries}")
112
+ """
113
+
114
+ def __init__(self, max_events: int = 10000, flush_interval_seconds: int = 300):
115
+ self.config = get_config()
116
+ self.max_events = max_events
117
+ self.flush_interval = flush_interval_seconds
118
+
119
+ # Event storage
120
+ self._events: List[TelemetryEvent] = []
121
+ self._stats = TelemetryStats()
122
+ self._lock = threading.Lock()
123
+
124
+ # Background flush thread
125
+ self._flush_thread: Optional[threading.Thread] = None
126
+ self._stop_flush = threading.Event()
127
+
128
+ if self.config.telemetry_enabled and self.config.telemetry_endpoint:
129
+ self._start_flush_thread()
130
+
131
+ def _start_flush_thread(self):
132
+ """Start background thread to periodically flush events."""
133
+ def flush_loop():
134
+ while not self._stop_flush.wait(self.flush_interval):
135
+ self._flush_to_endpoint()
136
+
137
+ self._flush_thread = threading.Thread(target=flush_loop, daemon=True)
138
+ self._flush_thread.start()
139
+
140
+ def _flush_to_endpoint(self):
141
+ """Flush events to remote endpoint."""
142
+ if not self.config.telemetry_endpoint or not HAS_HTTPX:
143
+ return
144
+
145
+ with self._lock:
146
+ if not self._events:
147
+ return
148
+
149
+ events_to_send = self._events.copy()
150
+ self._events = []
151
+
152
+ try:
153
+ with httpx.Client(timeout=10.0) as client:
154
+ client.post(
155
+ self.config.telemetry_endpoint,
156
+ json={
157
+ "events": [
158
+ {
159
+ "type": e.event_type.value,
160
+ "timestamp": e.timestamp,
161
+ "properties": e.properties,
162
+ }
163
+ for e in events_to_send
164
+ ],
165
+ "stats": self._stats.to_dict(),
166
+ }
167
+ )
168
+ except Exception:
169
+ # Re-add events on failure (up to max)
170
+ with self._lock:
171
+ remaining_capacity = self.max_events - len(self._events)
172
+ self._events = events_to_send[:remaining_capacity] + self._events
173
+
174
+ def _record_event(self, event: TelemetryEvent):
175
+ """Record an event with stats update."""
176
+ if not self.config.telemetry_enabled:
177
+ return
178
+
179
+ with self._lock:
180
+ # Add to event list (with size limit)
181
+ if len(self._events) < self.max_events:
182
+ self._events.append(event)
183
+
184
+ # Update time tracking
185
+ if self._stats.first_event is None:
186
+ self._stats.first_event = event.timestamp
187
+ self._stats.last_event = event.timestamp
188
+
189
+ # -------------------------------------------------------------------------
190
+ # Public Tracking Methods
191
+ # -------------------------------------------------------------------------
192
+
193
+ def track_query(
194
+ self,
195
+ confidence: float,
196
+ latency_ms: float,
197
+ bsv_mode: str = "async",
198
+ bsv_success: bool = True,
199
+ is_example: bool = False,
200
+ ):
201
+ """Track a prediction query."""
202
+ # Determine confidence bucket
203
+ if confidence >= 0.85:
204
+ bucket = "high"
205
+ elif confidence >= 0.70:
206
+ bucket = "medium"
207
+ else:
208
+ bucket = "low"
209
+
210
+ event = TelemetryEvent(
211
+ event_type=EventType.QUERY,
212
+ properties={
213
+ "confidence_bucket": bucket,
214
+ "latency_ms": round(latency_ms, 1),
215
+ "bsv_mode": bsv_mode,
216
+ "bsv_success": bsv_success,
217
+ "is_example": is_example,
218
+ }
219
+ )
220
+ self._record_event(event)
221
+
222
+ # Update stats
223
+ with self._lock:
224
+ self._stats.total_queries += 1
225
+ self._stats.queries_by_confidence[bucket] += 1
226
+
227
+ # Update latency stats
228
+ self._stats.latency_samples.append(latency_ms)
229
+ if len(self._stats.latency_samples) > 1000:
230
+ self._stats.latency_samples = self._stats.latency_samples[-1000:]
231
+
232
+ samples = self._stats.latency_samples
233
+ self._stats.avg_latency_ms = sum(samples) / len(samples)
234
+ sorted_samples = sorted(samples)
235
+ p95_idx = int(len(sorted_samples) * 0.95)
236
+ self._stats.p95_latency_ms = sorted_samples[p95_idx] if sorted_samples else 0
237
+
238
+ def track_example_click(self, example_name: str):
239
+ """Track when a user clicks an example."""
240
+ event = TelemetryEvent(
241
+ event_type=EventType.EXAMPLE_CLICK,
242
+ properties={"example_name": example_name}
243
+ )
244
+ self._record_event(event)
245
+
246
+ with self._lock:
247
+ self._stats.total_examples_clicked += 1
248
+ self._stats.examples_by_name[example_name] += 1
249
+
250
+ def track_tab_view(self, tab_name: str):
251
+ """Track when a user views a tab."""
252
+ event = TelemetryEvent(
253
+ event_type=EventType.TAB_VIEW,
254
+ properties={"tab_name": tab_name}
255
+ )
256
+ self._record_event(event)
257
+
258
+ def track_error(self, error_type: str, error_message: str = ""):
259
+ """Track an error."""
260
+ event = TelemetryEvent(
261
+ event_type=EventType.ERROR,
262
+ properties={
263
+ "error_type": error_type,
264
+ "error_message": error_message[:100], # Truncate
265
+ }
266
+ )
267
+ self._record_event(event)
268
+
269
+ with self._lock:
270
+ self._stats.total_errors += 1
271
+ self._stats.errors_by_type[error_type] += 1
272
+
273
+ def track_bsv_anchor(self, mode: str, success: bool, is_simulated: bool):
274
+ """Track a BSV anchor operation."""
275
+ event = TelemetryEvent(
276
+ event_type=EventType.BSV_ANCHOR,
277
+ properties={
278
+ "mode": mode,
279
+ "success": success,
280
+ "is_simulated": is_simulated,
281
+ }
282
+ )
283
+ self._record_event(event)
284
+
285
+ with self._lock:
286
+ self._stats.total_bsv_anchors += 1
287
+ mode_key = f"{mode}_{'simulated' if is_simulated else 'live'}"
288
+ self._stats.bsv_by_mode[mode_key] += 1
289
+
290
+ def track_share_click(self, platform: str):
291
+ """Track when a user clicks share."""
292
+ event = TelemetryEvent(
293
+ event_type=EventType.SHARE_CLICK,
294
+ properties={"platform": platform}
295
+ )
296
+ self._record_event(event)
297
+
298
+ with self._lock:
299
+ self._stats.total_shares += 1
300
+
301
+ def track_cost_calculation(self, queries_per_month: int):
302
+ """Track cost calculator usage."""
303
+ event = TelemetryEvent(
304
+ event_type=EventType.COST_CALC,
305
+ properties={"queries_per_month": queries_per_month}
306
+ )
307
+ self._record_event(event)
308
+
309
+ # -------------------------------------------------------------------------
310
+ # Stats Retrieval
311
+ # -------------------------------------------------------------------------
312
+
313
+ def get_stats(self) -> TelemetryStats:
314
+ """Get current telemetry statistics."""
315
+ with self._lock:
316
+ return TelemetryStats(
317
+ total_queries=self._stats.total_queries,
318
+ total_examples_clicked=self._stats.total_examples_clicked,
319
+ total_errors=self._stats.total_errors,
320
+ total_bsv_anchors=self._stats.total_bsv_anchors,
321
+ total_shares=self._stats.total_shares,
322
+ queries_by_confidence=dict(self._stats.queries_by_confidence),
323
+ examples_by_name=dict(self._stats.examples_by_name),
324
+ errors_by_type=dict(self._stats.errors_by_type),
325
+ bsv_by_mode=dict(self._stats.bsv_by_mode),
326
+ avg_latency_ms=self._stats.avg_latency_ms,
327
+ p95_latency_ms=self._stats.p95_latency_ms,
328
+ first_event=self._stats.first_event,
329
+ last_event=self._stats.last_event,
330
+ )
331
+
332
+ def get_stats_summary(self) -> str:
333
+ """Get a formatted stats summary."""
334
+ stats = self.get_stats()
335
+ return f"""
336
+ Telemetry Summary
337
+ -----------------
338
+ Total Queries: {stats.total_queries}
339
+ Examples Clicked: {stats.total_examples_clicked}
340
+ BSV Anchors: {stats.total_bsv_anchors}
341
+ Errors: {stats.total_errors}
342
+ Shares: {stats.total_shares}
343
+
344
+ Performance:
345
+ Avg Latency: {stats.avg_latency_ms:.1f}ms
346
+ P95 Latency: {stats.p95_latency_ms:.1f}ms
347
+
348
+ Confidence Distribution:
349
+ High (>=85%): {stats.queries_by_confidence.get('high', 0)}
350
+ Medium (70-85%): {stats.queries_by_confidence.get('medium', 0)}
351
+ Low (<70%): {stats.queries_by_confidence.get('low', 0)}
352
+
353
+ Top Examples:
354
+ {chr(10).join(f' {k}: {v}' for k, v in sorted(stats.examples_by_name.items(), key=lambda x: -x[1])[:5])}
355
+ """
356
+
357
+ def shutdown(self):
358
+ """Shutdown telemetry collector."""
359
+ if self._flush_thread:
360
+ self._stop_flush.set()
361
+ self._flush_to_endpoint()
362
+
363
+
364
+ # Global telemetry instance
365
+ _telemetry: Optional[TelemetryCollector] = None
366
+
367
+
368
+ def get_telemetry() -> TelemetryCollector:
369
+ """Get or create the global telemetry instance."""
370
+ global _telemetry
371
+ if _telemetry is None:
372
+ _telemetry = TelemetryCollector()
373
+ return _telemetry