File size: 2,841 Bytes
a1933cb
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
"""In-memory analytics for simulation activity."""

from __future__ import annotations

from threading import RLock
from typing import Any, Mapping

from core_engine.score_bounds import enforce_strict_score
from core_engine.schemas import VALID_CATEGORIES, EvaluationRecord


class AnalyticsTracker:
    """Track aggregate simulation metrics without storing unbounded records."""

    def __init__(self) -> None:
        self._lock = RLock()
        self._total_emails_processed = 0
        self._category_distribution = {category: 0 for category in sorted(VALID_CATEGORIES)}
        self._predicted_distribution = {category: 0 for category in sorted(VALID_CATEGORIES)}
        self._accuracy_sum = 0.0
        self._weighted_score_sum = 0.0

    def record_evaluation(self, record: EvaluationRecord | Mapping[str, Any]) -> None:
        """Update analytics counters for a processed email."""
        if isinstance(record, EvaluationRecord):
            expected_category = record.expected_category
            predicted_category = record.predicted_category
            category_correct = record.category_correct
            step_score = record.step_score
        else:
            expected_category = str(record.get("expected_category", "general"))
            predicted_category = str(record.get("predicted_category", "general"))
            category_correct = bool(record.get("category_correct", False))
            raw_step_score = float(record.get("step_score", 0.0))
            step_score = raw_step_score / 100 if raw_step_score > 1 else raw_step_score

        with self._lock:
            self._total_emails_processed += 1
            if expected_category in self._category_distribution:
                self._category_distribution[expected_category] += 1
            if predicted_category in self._predicted_distribution:
                self._predicted_distribution[predicted_category] += 1
            self._accuracy_sum += 1.0 if category_correct else 0.0
            self._weighted_score_sum += max(min(step_score, 1.0), 0.0)

    def snapshot(self) -> dict[str, Any]:
        """Return analytics as a JSON-ready dictionary."""
        with self._lock:
            total = self._total_emails_processed
            average_accuracy = self._accuracy_sum / total if total else 0.0
            average_weighted_score = self._weighted_score_sum / total if total else 0.0
            return {
                "total_emails_processed": total,
                "category_distribution": dict(self._category_distribution),
                "predicted_distribution": dict(self._predicted_distribution),
                "average_accuracy": enforce_strict_score(round(average_accuracy, 4)),
                "average_weighted_score": enforce_strict_score(
                    round(average_weighted_score, 4)
                ),
            }