# radioflow/utils/metrics.py
# SamarpeetGarad: "Upload utils/metrics.py with huggingface_hub" (28be3d1, verified)
"""
RadioFlow Metrics Tracking
Performance monitoring and analytics
"""
from dataclasses import dataclass, field
from typing import Dict, List, Any, Optional
from datetime import datetime
import time
import json
@dataclass
class WorkflowMetrics:
    """Record of one workflow run: identity, timing, outcome and counters.

    Only ``workflow_id`` and ``start_time`` are required; every other field
    starts at a neutral default and is filled in while the run is tracked.
    """

    # Caller-supplied or auto-generated identifier of the run.
    workflow_id: str
    # Timestamp string for when tracking began (MetricsTracker stores
    # datetime.isoformat() output here).
    start_time: str
    # Timestamp string for when the run finished; None until then.
    end_time: Optional[str] = None
    # Elapsed time of the whole run, in milliseconds.
    total_duration_ms: float = 0
    # Per-agent duration in milliseconds, keyed by agent name. A factory is
    # used so each instance gets its own dict.
    agent_durations: Dict[str, float] = field(default_factory=dict)
    # Lifecycle/outcome label; "pending" until the run is completed.
    status: str = "pending"
    # Number of findings reported for the run.
    findings_count: int = 0
    # Priority score reported for the run.
    priority_score: float = 0
    # Number of agent executions recorded as unsuccessful.
    error_count: int = 0
class MetricsTracker:
    """
    Track and analyze RadioFlow performance metrics.

    Useful for demo and competition presentation.

    At most one workflow is tracked at a time: ``start_workflow`` opens it,
    ``record_agent`` adds per-agent timings to it, and ``end_workflow``
    closes it and appends it to ``workflows``.
    """

    def __init__(self):
        """Create a tracker with no completed workflows and none in progress."""
        # Completed workflows, in the order they were ended.
        self.workflows: List["WorkflowMetrics"] = []
        # Workflow currently being tracked, or None when idle.
        self.current_workflow: Optional["WorkflowMetrics"] = None
        # time.perf_counter() reading taken when tracking started; None when idle.
        self._start_time: Optional[float] = None

    def start_workflow(self, workflow_id: Optional[str] = None) -> str:
        """Start tracking a new workflow.

        Args:
            workflow_id: Identifier to use. When None, an ID of the form
                ``wf_YYYYMMDD_HHMMSS`` is generated from the current time.

        Returns:
            The ID of the workflow now being tracked.

        Note:
            A workflow that is already in progress is replaced without being
            added to ``workflows``.
        """
        # One clock reading feeds both the generated ID and start_time so the
        # two can never disagree across a second boundary.
        started_at = datetime.now()
        if workflow_id is None:
            # Second resolution: auto-generated IDs repeat when two workflows
            # are started within the same second.
            workflow_id = f"wf_{started_at.strftime('%Y%m%d_%H%M%S')}"
        self.current_workflow = WorkflowMetrics(
            workflow_id=workflow_id,
            start_time=started_at.isoformat(),
        )
        # perf_counter is monotonic, so the measured duration cannot go
        # negative or jump when the system clock is adjusted.
        self._start_time = time.perf_counter()
        return workflow_id

    def record_agent(self, agent_name: str, duration_ms: float, success: bool = True):
        """Record an agent's execution in the current workflow.

        Does nothing when no workflow is in progress. Recording the same
        agent name again overwrites its earlier duration.

        Args:
            agent_name: Key under which the duration is stored.
            duration_ms: Agent run time in milliseconds.
            success: When False, the workflow's ``error_count`` is incremented.
        """
        workflow = self.current_workflow
        if workflow is None:
            return
        workflow.agent_durations[agent_name] = duration_ms
        if not success:
            workflow.error_count += 1

    def end_workflow(
        self,
        findings_count: int = 0,
        priority_score: float = 0,
        status: str = "success",
    ):
        """Complete the current workflow tracking.

        Stamps the end time and elapsed duration, stores the supplied outcome
        values, moves the workflow into ``workflows`` and resets the tracker
        to idle. Does nothing when no workflow is in progress.

        Args:
            findings_count: Number of findings to store on the workflow.
            priority_score: Priority score to store on the workflow.
            status: Final status label; ``"success"`` counts toward the
                success rate in ``get_summary_stats``.
        """
        workflow = self.current_workflow
        # Compare against None explicitly: a start reading of 0.0 is valid
        # and must not be mistaken for "not started".
        if workflow is None or self._start_time is None:
            return

        workflow.end_time = datetime.now().isoformat()
        workflow.total_duration_ms = (time.perf_counter() - self._start_time) * 1000
        workflow.findings_count = findings_count
        workflow.priority_score = priority_score
        workflow.status = status

        self.workflows.append(workflow)
        self.current_workflow = None
        self._start_time = None

    def get_summary_stats(self) -> Dict[str, Any]:
        """Get summary statistics across all completed workflows.

        Returns:
            A dict with the keys ``total_workflows``, ``avg_duration_ms``,
            ``success_rate`` (percentage, 0-100), ``avg_findings``,
            ``avg_priority`` and ``agent_avg_times`` (agent name -> mean
            duration in ms). The same keys are present, with zero/empty
            values, when no workflow has completed yet.
        """
        if not self.workflows:
            return {
                "total_workflows": 0,
                "avg_duration_ms": 0,
                "success_rate": 0,
                "avg_findings": 0,
                "avg_priority": 0,
                "agent_avg_times": {},
            }

        total = len(self.workflows)
        successful = sum(1 for w in self.workflows if w.status == "success")

        # Group durations per agent; an agent only contributes to its own
        # average for the workflows in which it was recorded.
        agent_times: Dict[str, List[float]] = {}
        for workflow in self.workflows:
            for agent, duration in workflow.agent_durations.items():
                agent_times.setdefault(agent, []).append(duration)

        agent_avg = {
            agent: sum(times) / len(times)
            for agent, times in agent_times.items()
        }

        return {
            "total_workflows": total,
            "avg_duration_ms": sum(w.total_duration_ms for w in self.workflows) / total,
            "success_rate": successful / total * 100,
            "avg_findings": sum(w.findings_count for w in self.workflows) / total,
            "avg_priority": sum(w.priority_score for w in self.workflows) / total,
            "agent_avg_times": agent_avg,
        }

    def get_latest_workflow(self) -> Optional["WorkflowMetrics"]:
        """Get the most recent completed workflow, or None if there is none."""
        return self.workflows[-1] if self.workflows else None

    def export_metrics(self) -> str:
        """Export all metrics as JSON.

        Returns:
            A JSON document (2-space indent) with a ``summary`` object from
            ``get_summary_stats`` and a ``workflows`` list holding one object
            per completed workflow, including its ``error_count``.
        """
        data = {
            "summary": self.get_summary_stats(),
            "workflows": [
                {
                    "workflow_id": w.workflow_id,
                    "start_time": w.start_time,
                    "end_time": w.end_time,
                    "total_duration_ms": w.total_duration_ms,
                    "agent_durations": w.agent_durations,
                    "status": w.status,
                    "findings_count": w.findings_count,
                    "priority_score": w.priority_score,
                    "error_count": w.error_count,
                }
                for w in self.workflows
            ],
        }
        return json.dumps(data, indent=2)

    def format_for_display(self) -> str:
        """Format metrics for UI display.

        Returns:
            A newline-joined, Markdown-style text block with the summary
            figures followed by one bullet per agent.
        """
        stats = self.get_summary_stats()
        lines = [
            "📊 **RadioFlow Performance Metrics**",
            "",
            f"**Total Analyses:** {stats['total_workflows']}",
            f"**Success Rate:** {stats['success_rate']:.1f}%",
            f"**Avg Processing Time:** {stats['avg_duration_ms']:.0f}ms",
            f"**Avg Findings per Study:** {stats['avg_findings']:.1f}",
            "",
            "**Agent Performance:**",
        ]
        lines.extend(
            f" • {agent}: {avg_time:.0f}ms avg"
            for agent, avg_time in stats.get('agent_avg_times', {}).items()
        )
        return "\n".join(lines)