# tars-conversation-app / src/shared_state.py
# latishab's picture
# Update TARS Conversation App with TarsApp framework
# e8ed0e1 verified
"""Shared state for metrics and transcriptions between Pipecat pipeline and Gradio UI."""
from dataclasses import dataclass, field
from typing import List, Dict, Optional
from collections import deque
import threading
import time
@dataclass
class MetricEntry:
    """Measurements recorded for one conversation turn.

    ``turn_number`` and ``timestamp`` are required. Every remaining field is
    an optional float that defaults to ``None`` when no value was supplied.
    """

    # Required fields identifying the turn.
    turn_number: int
    timestamp: int

    # Optional per-stage values; the ``_ms`` suffix indicates milliseconds.
    stt_ttfb_ms: Optional[float] = None
    memory_latency_ms: Optional[float] = None
    llm_ttfb_ms: Optional[float] = None
    tts_ttfb_ms: Optional[float] = None
    vision_latency_ms: Optional[float] = None

    # Optional overall figure for the turn.
    total_ms: Optional[float] = None
@dataclass
class MetricsStore:
    """Lock-guarded storage for pipeline metrics and transcriptions.

    Every accessor acquires ``lock`` before reading or writing the stored
    data. Both histories are bounded deques: at most 100 metric entries and
    50 transcriptions are retained, and appending beyond that silently drops
    the oldest item.
    """

    metrics: deque = field(default_factory=lambda: deque(maxlen=100))
    service_info: Optional[Dict] = None
    transcriptions: deque = field(default_factory=lambda: deque(maxlen=50))
    # The lock is synchronization plumbing, not data. Keeping it out of the
    # generated __repr__/__eq__ lets two stores with the same contents compare
    # equal and keeps the lock object out of debug output. It remains an
    # __init__ parameter so existing constructor calls are unaffected.
    lock: threading.Lock = field(
        default_factory=threading.Lock, repr=False, compare=False
    )

    def add_metric(self, metric: dict):
        """Append a metric entry built from *metric*.

        Args:
            metric: Keyword arguments for ``MetricEntry``; must contain
                ``turn_number`` and ``timestamp``.

        Raises:
            TypeError: If a required key is missing or a key is not a
                ``MetricEntry`` field (raised by ``MetricEntry(**metric)``).
        """
        with self.lock:
            self.metrics.append(MetricEntry(**metric))

    def get_metrics(self) -> "List[MetricEntry]":
        """Return a new list holding the stored metric entries, oldest first."""
        with self.lock:
            # Copy while holding the lock so callers never iterate the live deque.
            return list(self.metrics)

    def set_service_info(self, info: dict):
        """Replace the stored service configuration with *info*."""
        with self.lock:
            self.service_info = info

    def get_service_info(self) -> Optional[Dict]:
        """Return the stored service configuration, or ``None`` if unset.

        The stored object itself is returned, not a copy.
        """
        with self.lock:
            return self.service_info

    def add_transcription(self, role: str, text: str):
        """Append a transcription record stamped with the current time.

        The record is a dict with keys ``"role"``, ``"text"`` and ``"time"``
        (the value of ``time.time()`` at insertion).
        """
        with self.lock:
            self.transcriptions.append({
                "role": role,
                "text": text,
                "time": time.time(),
            })

    def get_transcriptions(self) -> List[dict]:
        """Return a new list holding the stored transcription records, oldest first."""
        with self.lock:
            return list(self.transcriptions)

    def clear_metrics(self):
        """Remove all stored metric entries; transcriptions and service info are kept."""
        with self.lock:
            self.metrics.clear()
# Global instance: created once at import time, so every module that imports
# `metrics_store` from here works with the same MetricsStore object.
metrics_store = MetricsStore()