SamarpeetGarad commited on
Commit
28be3d1
·
verified ·
1 Parent(s): 02315c0

Upload utils/metrics.py with huggingface_hub

Browse files
Files changed (1) hide show
  1. utils/metrics.py +154 -0
utils/metrics.py ADDED
@@ -0,0 +1,154 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ """
2
+ RadioFlow Metrics Tracking
3
+ Performance monitoring and analytics
4
+ """
5
+
6
+ from dataclasses import dataclass, field
7
+ from typing import Dict, List, Any, Optional
8
+ from datetime import datetime
9
+ import time
10
+ import json
11
+
12
+
13
+ @dataclass
14
+ class WorkflowMetrics:
15
+ """Metrics for a single workflow execution"""
16
+ workflow_id: str
17
+ start_time: str
18
+ end_time: Optional[str] = None
19
+ total_duration_ms: float = 0
20
+ agent_durations: Dict[str, float] = field(default_factory=dict)
21
+ status: str = "pending"
22
+ findings_count: int = 0
23
+ priority_score: float = 0
24
+ error_count: int = 0
25
+
26
+
27
+ class MetricsTracker:
28
+ """
29
+ Track and analyze RadioFlow performance metrics.
30
+ Useful for demo and competition presentation.
31
+ """
32
+
33
+ def __init__(self):
34
+ self.workflows: List[WorkflowMetrics] = []
35
+ self.current_workflow: Optional[WorkflowMetrics] = None
36
+ self._start_time: Optional[float] = None
37
+
38
+ def start_workflow(self, workflow_id: Optional[str] = None) -> str:
39
+ """Start tracking a new workflow."""
40
+ if workflow_id is None:
41
+ workflow_id = f"wf_{datetime.now().strftime('%Y%m%d_%H%M%S')}"
42
+
43
+ self.current_workflow = WorkflowMetrics(
44
+ workflow_id=workflow_id,
45
+ start_time=datetime.now().isoformat()
46
+ )
47
+ self._start_time = time.time()
48
+
49
+ return workflow_id
50
+
51
+ def record_agent(self, agent_name: str, duration_ms: float, success: bool = True):
52
+ """Record an agent's execution."""
53
+ if self.current_workflow:
54
+ self.current_workflow.agent_durations[agent_name] = duration_ms
55
+ if not success:
56
+ self.current_workflow.error_count += 1
57
+
58
+ def end_workflow(
59
+ self,
60
+ findings_count: int = 0,
61
+ priority_score: float = 0,
62
+ status: str = "success"
63
+ ):
64
+ """Complete the current workflow tracking."""
65
+ if self.current_workflow and self._start_time:
66
+ self.current_workflow.end_time = datetime.now().isoformat()
67
+ self.current_workflow.total_duration_ms = (time.time() - self._start_time) * 1000
68
+ self.current_workflow.findings_count = findings_count
69
+ self.current_workflow.priority_score = priority_score
70
+ self.current_workflow.status = status
71
+
72
+ self.workflows.append(self.current_workflow)
73
+ self.current_workflow = None
74
+ self._start_time = None
75
+
76
+ def get_summary_stats(self) -> Dict[str, Any]:
77
+ """Get summary statistics across all workflows."""
78
+ if not self.workflows:
79
+ return {
80
+ "total_workflows": 0,
81
+ "avg_duration_ms": 0,
82
+ "success_rate": 0,
83
+ "avg_findings": 0,
84
+ "agent_avg_times": {}
85
+ }
86
+
87
+ total = len(self.workflows)
88
+ successful = sum(1 for w in self.workflows if w.status == "success")
89
+
90
+ # Calculate agent average times
91
+ agent_times: Dict[str, List[float]] = {}
92
+ for workflow in self.workflows:
93
+ for agent, duration in workflow.agent_durations.items():
94
+ if agent not in agent_times:
95
+ agent_times[agent] = []
96
+ agent_times[agent].append(duration)
97
+
98
+ agent_avg = {
99
+ agent: sum(times) / len(times)
100
+ for agent, times in agent_times.items()
101
+ }
102
+
103
+ return {
104
+ "total_workflows": total,
105
+ "avg_duration_ms": sum(w.total_duration_ms for w in self.workflows) / total,
106
+ "success_rate": successful / total * 100,
107
+ "avg_findings": sum(w.findings_count for w in self.workflows) / total,
108
+ "avg_priority": sum(w.priority_score for w in self.workflows) / total,
109
+ "agent_avg_times": agent_avg
110
+ }
111
+
112
+ def get_latest_workflow(self) -> Optional[WorkflowMetrics]:
113
+ """Get the most recent completed workflow."""
114
+ return self.workflows[-1] if self.workflows else None
115
+
116
+ def export_metrics(self) -> str:
117
+ """Export all metrics as JSON."""
118
+ data = {
119
+ "summary": self.get_summary_stats(),
120
+ "workflows": [
121
+ {
122
+ "workflow_id": w.workflow_id,
123
+ "start_time": w.start_time,
124
+ "end_time": w.end_time,
125
+ "total_duration_ms": w.total_duration_ms,
126
+ "agent_durations": w.agent_durations,
127
+ "status": w.status,
128
+ "findings_count": w.findings_count,
129
+ "priority_score": w.priority_score
130
+ }
131
+ for w in self.workflows
132
+ ]
133
+ }
134
+ return json.dumps(data, indent=2)
135
+
136
+ def format_for_display(self) -> str:
137
+ """Format metrics for UI display."""
138
+ stats = self.get_summary_stats()
139
+
140
+ lines = [
141
+ "📊 **RadioFlow Performance Metrics**",
142
+ "",
143
+ f"**Total Analyses:** {stats['total_workflows']}",
144
+ f"**Success Rate:** {stats['success_rate']:.1f}%",
145
+ f"**Avg Processing Time:** {stats['avg_duration_ms']:.0f}ms",
146
+ f"**Avg Findings per Study:** {stats['avg_findings']:.1f}",
147
+ "",
148
+ "**Agent Performance:**"
149
+ ]
150
+
151
+ for agent, avg_time in stats.get('agent_avg_times', {}).items():
152
+ lines.append(f" • {agent}: {avg_time:.0f}ms avg")
153
+
154
+ return "\n".join(lines)