File size: 2,170 Bytes
e8ed0e1
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
"""Shared state for metrics and transcriptions between Pipecat pipeline and Gradio UI."""

from dataclasses import dataclass, field
from typing import List, Dict, Optional
from collections import deque
import threading
import time


@dataclass
class MetricEntry:
    """Metrics recorded for a single conversation turn.

    Only ``turn_number`` and ``timestamp`` are required; every latency field
    defaults to ``None`` and can be omitted when constructing an entry.
    The dataclass performs no validation or unit conversion on the values.
    """
    # Index of the turn this entry describes.
    turn_number: int
    # NOTE(review): epoch and unit (seconds vs. milliseconds) are not
    # established in this file - confirm against the producer.
    timestamp: int
    # The fields below are presumably latencies in milliseconds (per the
    # ``_ms`` suffix); "ttfb" presumably means time-to-first-byte - TODO confirm.
    stt_ttfb_ms: Optional[float] = None
    memory_latency_ms: Optional[float] = None
    llm_ttfb_ms: Optional[float] = None
    tts_ttfb_ms: Optional[float] = None
    vision_latency_ms: Optional[float] = None
    # NOTE(review): presumably the end-to-end figure for the turn; it is not
    # computed here, so the caller must supply it.
    total_ms: Optional[float] = None


@dataclass
class MetricsStore:
    """Lock-guarded storage for pipeline metrics, service info and transcriptions.

    Every accessor acquires ``self.lock`` and returns a snapshot instead of
    the internal container. Service info and transcription dicts are
    shallow-copied on the way in and out, so a caller mutating the object it
    passed in or got back cannot change the stored state behind the lock.
    Copies are shallow: nested mutable values are still shared.

    Attributes:
        metrics: Bounded deque keeping at most the 100 most recently added
            ``MetricEntry`` objects; older entries are dropped automatically.
        service_info: Last value given to ``set_service_info``, or ``None``.
        transcriptions: Bounded deque keeping at most the 50 most recently
            added transcription dicts (keys ``"role"``, ``"text"``, ``"time"``).
        lock: Mutex guarding all of the above.
    """
    metrics: deque = field(default_factory=lambda: deque(maxlen=100))
    service_info: Optional[Dict] = None
    transcriptions: deque = field(default_factory=lambda: deque(maxlen=50))
    lock: threading.Lock = field(default_factory=threading.Lock)

    def add_metric(self, metric: dict):
        """Append a metric entry built from ``metric``.

        Args:
            metric: Keyword arguments for ``MetricEntry``.

        Raises:
            TypeError: If ``metric`` lacks a required ``MetricEntry`` field
                or contains a key that is not a ``MetricEntry`` field.
        """
        # Build the entry first so the lock is held only for the append.
        entry = MetricEntry(**metric)
        with self.lock:
            self.metrics.append(entry)

    def get_metrics(self) -> List[MetricEntry]:
        """Return a new list of the stored entries, oldest first.

        The list is a snapshot, but the ``MetricEntry`` objects in it are the
        stored instances, not copies.
        """
        with self.lock:
            return list(self.metrics)

    def set_service_info(self, info: dict):
        """Store a shallow copy of the service configuration info.

        Args:
            info: Mapping describing the services, or ``None`` to reset.
        """
        # Copy so later mutation of the caller's dict cannot alter the
        # stored state without going through the lock.
        snapshot = dict(info) if info is not None else None
        with self.lock:
            self.service_info = snapshot

    def get_service_info(self) -> Optional[Dict]:
        """Return a shallow copy of the stored service info, or ``None`` if unset."""
        with self.lock:
            if self.service_info is None:
                return None
            # Never hand out the internal dict; see set_service_info.
            return dict(self.service_info)

    def add_transcription(self, role: str, text: str):
        """Append a transcription entry stamped with the current ``time.time()``.

        Args:
            role: Stored under the ``"role"`` key.
            text: Stored under the ``"text"`` key.
        """
        with self.lock:
            self.transcriptions.append({
                "role": role,
                "text": text,
                "time": time.time()
            })

    def get_transcriptions(self) -> List[dict]:
        """Return copies of all stored transcription entries, oldest first."""
        with self.lock:
            # Copy each entry so callers cannot edit stored dicts in place.
            return [dict(entry) for entry in self.transcriptions]

    def clear_metrics(self):
        """Remove all stored metrics; service info and transcriptions are kept."""
        with self.lock:
            self.metrics.clear()


# Global instance: a single MetricsStore created once at import time.
# Per the module docstring, this is the state shared between the Pipecat
# pipeline and the Gradio UI.
metrics_store = MetricsStore()