"""In-memory analytics for simulation activity.""" from __future__ import annotations from threading import RLock from typing import Any, Mapping from core_engine.score_bounds import enforce_strict_score from core_engine.schemas import VALID_CATEGORIES, EvaluationRecord class AnalyticsTracker: """Track aggregate simulation metrics without storing unbounded records.""" def __init__(self) -> None: self._lock = RLock() self._total_emails_processed = 0 self._category_distribution = {category: 0 for category in sorted(VALID_CATEGORIES)} self._predicted_distribution = {category: 0 for category in sorted(VALID_CATEGORIES)} self._accuracy_sum = 0.0 self._weighted_score_sum = 0.0 def record_evaluation(self, record: EvaluationRecord | Mapping[str, Any]) -> None: """Update analytics counters for a processed email.""" if isinstance(record, EvaluationRecord): expected_category = record.expected_category predicted_category = record.predicted_category category_correct = record.category_correct step_score = record.step_score else: expected_category = str(record.get("expected_category", "general")) predicted_category = str(record.get("predicted_category", "general")) category_correct = bool(record.get("category_correct", False)) raw_step_score = float(record.get("step_score", 0.0)) step_score = raw_step_score / 100 if raw_step_score > 1 else raw_step_score with self._lock: self._total_emails_processed += 1 if expected_category in self._category_distribution: self._category_distribution[expected_category] += 1 if predicted_category in self._predicted_distribution: self._predicted_distribution[predicted_category] += 1 self._accuracy_sum += 1.0 if category_correct else 0.0 self._weighted_score_sum += max(min(step_score, 1.0), 0.0) def snapshot(self) -> dict[str, Any]: """Return analytics as a JSON-ready dictionary.""" with self._lock: total = self._total_emails_processed average_accuracy = self._accuracy_sum / total if total else 0.0 average_weighted_score = self._weighted_score_sum / total if total else 0.0 return { "total_emails_processed": total, "category_distribution": dict(self._category_distribution), "predicted_distribution": dict(self._predicted_distribution), "average_accuracy": enforce_strict_score(round(average_accuracy, 4)), "average_weighted_score": enforce_strict_score( round(average_weighted_score, 4) ), }