caarleexx commited on
Commit
f300ad2
·
verified ·
1 Parent(s): ecb0b2d

Update llm/metrics.py

Browse files
Files changed (1) hide show
  1. llm/metrics.py +102 -263
llm/metrics.py CHANGED
@@ -1,280 +1,119 @@
1
- """Coleta e análise de métricas de uso de LLM."""
 
 
 
2
 
3
- from typing import Dict, List, Optional, Tuple
4
- from dataclasses import dataclass, field, asdict
5
- from datetime import datetime, timedelta
6
- from collections import defaultdict
7
- import json
8
  import logging
 
 
 
9
 
10
  logger = logging.getLogger(__name__)
11
 
12
-
13
- @dataclass
14
- class RequestMetric:
15
- """Métrica de uma requisição individual."""
16
- timestamp: datetime
17
- provider: str
18
- model: str
19
- tokens_input: int
20
- tokens_output: int
21
- cost_usd: float
22
- latency_ms: float
23
- success: bool
24
- error_msg: Optional[str] = None
25
- user_id: Optional[str] = None
26
- request_id: Optional[str] = None
27
-
28
-
29
- @dataclass
30
- class ProviderStats:
31
- """Estatísticas de um provedor."""
32
- provider: str
33
- total_requests: int = 0
34
- successful_requests: int = 0
35
- failed_requests: int = 0
36
- total_tokens: int = 0
37
- total_cost: float = 0.0
38
- average_latency_ms: float = 0.0
39
- min_latency_ms: float = float('inf')
40
- max_latency_ms: float = 0.0
41
- average_tokens: float = 0.0
42
-
43
-
44
- class MetricsCollector:
45
- """Coletor centralizado de métricas."""
46
 
47
- def __init__(self, window_size: int = 10000):
48
- """
49
- Inicializa o coletor.
 
 
 
 
 
 
50
 
51
- Args:
52
- window_size: Número máximo de métricas em memória
53
- """
54
- self.window_size = window_size
55
- self.metrics: List[RequestMetric] = []
56
- self.aggregates: Dict[str, Dict] = defaultdict(dict)
57
 
58
- def record_request(self,
59
- provider: str,
60
- model: str,
61
- tokens_input: int,
62
- tokens_output: int,
63
- cost_usd: float,
64
- latency_ms: float,
65
- success: bool,
66
- error_msg: Optional[str] = None,
67
- user_id: Optional[str] = None,
68
- request_id: Optional[str] = None):
69
- """Registra métrica de requisição."""
70
- metric = RequestMetric(
71
- timestamp=datetime.now(),
72
- provider=provider,
73
- model=model,
74
- tokens_input=tokens_input,
75
- tokens_output=tokens_output,
76
- cost_usd=cost_usd,
77
- latency_ms=latency_ms,
78
- success=success,
79
- error_msg=error_msg,
80
- user_id=user_id,
81
- request_id=request_id,
82
  )
83
-
84
- self.metrics.append(metric)
85
-
86
- # Manter janela de tamanho máximo
87
- if len(self.metrics) > self.window_size:
88
- self.metrics.pop(0)
89
-
90
- logger.debug(f"Métrica registrada: {provider}/{model}")
91
 
92
- def get_provider_stats(self, provider: Optional[str] = None) -> Dict[str, ProviderStats]:
93
- """
94
- Retorna estatísticas por provedor.
95
-
96
- Args:
97
- provider: Se especificado, retorna apenas para este provedor
98
-
99
- Returns:
100
- Dict de estatísticas
101
- """
102
- stats = defaultdict(lambda: ProviderStats(provider=""))
103
-
104
- for metric in self.metrics:
105
- if provider and metric.provider != provider:
106
- continue
107
-
108
- stat = stats[metric.provider]
109
- if not stat.provider:
110
- stat.provider = metric.provider
111
-
112
- stat.total_requests += 1
113
- if metric.success:
114
- stat.successful_requests += 1
115
- else:
116
- stat.failed_requests += 1
117
-
118
- stat.total_tokens += metric.tokens_input + metric.tokens_output
119
- stat.total_cost += metric.cost_usd
120
- stat.min_latency_ms = min(stat.min_latency_ms, metric.latency_ms)
121
- stat.max_latency_ms = max(stat.max_latency_ms, metric.latency_ms)
122
-
123
- # Calcular médias
124
- for stat in stats.values():
125
- if stat.total_requests > 0:
126
- stat.average_latency_ms = sum(
127
- m.latency_ms for m in self.metrics
128
- if m.provider == stat.provider
129
- ) / stat.total_requests
130
- stat.average_tokens = stat.total_tokens / stat.total_requests
131
-
132
- return dict(stats)
133
 
134
- def get_model_stats(self) -> Dict[str, Dict[str, any]]:
135
- """Retorna estatísticas por modelo."""
136
- model_stats = defaultdict(lambda: {
137
- 'requests': 0,
138
- 'success': 0,
139
- 'cost': 0.0,
140
- 'tokens': 0,
141
- 'latency': 0.0,
142
- })
143
 
144
- for metric in self.metrics:
145
- key = f"{metric.provider}/{metric.model}"
146
- stats = model_stats[key]
147
 
148
- stats['requests'] += 1
149
- if metric.success:
150
- stats['success'] += 1
151
- stats['cost'] += metric.cost_usd
152
- stats['tokens'] += metric.tokens_input + metric.tokens_output
153
- stats['latency'] += metric.latency_ms
154
-
155
- # Calcular médias
156
- for stats in model_stats.values():
157
- if stats['requests'] > 0:
158
- stats['success_rate'] = stats['success'] / stats['requests']
159
- stats['avg_cost'] = stats['cost'] / stats['requests']
160
- stats['avg_tokens'] = stats['tokens'] / stats['requests']
161
- stats['avg_latency'] = stats['latency'] / stats['requests']
162
-
163
- return dict(model_stats)
164
-
165
- def get_time_series(self,
166
- minutes: int = 60,
167
- interval_seconds: int = 60) -> Dict[str, List[Tuple[datetime, float]]]:
168
- """
169
- Retorna série temporal de custos.
170
-
171
- Args:
172
- minutes: Janela de tempo em minutos
173
- interval_seconds: Intervalo de agregação
174
-
175
- Returns:
176
- Dict com séries temporais por provedor
177
- """
178
- cutoff = datetime.now() - timedelta(minutes=minutes)
179
- recent_metrics = [m for m in self.metrics if m.timestamp >= cutoff]
180
-
181
- time_buckets = defaultdict(lambda: defaultdict(float))
182
-
183
- for metric in recent_metrics:
184
- bucket_time = (metric.timestamp.replace(second=0, microsecond=0) -
185
- timedelta(seconds=metric.timestamp.second % interval_seconds))
186
- time_buckets[metric.provider][bucket_time] += metric.cost_usd
187
-
188
- result = {}
189
- for provider, buckets in time_buckets.items():
190
- result[provider] = sorted(buckets.items())
191
-
192
- return result
193
-
194
- def get_summary(self) -> Dict[str, any]:
195
- """Retorna resumo geral de métricas."""
196
- if not self.metrics:
197
- return {
198
- 'total_requests': 0,
199
- 'total_cost': 0.0,
200
- 'total_tokens': 0,
201
  }
202
 
203
- successful = sum(1 for m in self.metrics if m.success)
204
-
205
- return {
206
- 'total_requests': len(self.metrics),
207
- 'successful_requests': successful,
208
- 'failed_requests': len(self.metrics) - successful,
209
- 'success_rate': successful / len(self.metrics),
210
- 'total_cost': sum(m.cost_usd for m in self.metrics),
211
- 'total_tokens': sum(m.tokens_input + m.tokens_output for m in self.metrics),
212
- 'average_cost': sum(m.cost_usd for m in self.metrics) / len(self.metrics),
213
- 'average_latency_ms': sum(m.latency_ms for m in self.metrics) / len(self.metrics),
214
- 'min_latency_ms': min(m.latency_ms for m in self.metrics),
215
- 'max_latency_ms': max(m.latency_ms for m in self.metrics),
216
- 'providers': list(set(m.provider for m in self.metrics)),
217
- 'models': list(set(m.model for m in self.metrics)),
218
- }
219
 
220
- def get_user_stats(self, user_id: str) -> Optional[Dict[str, any]]:
221
- """Retorna estatísticas para um usuário específico."""
222
- user_metrics = [m for m in self.metrics if m.user_id == user_id]
223
-
224
- if not user_metrics:
225
- return None
226
-
227
- return {
228
- 'user_id': user_id,
229
- 'total_requests': len(user_metrics),
230
- 'total_cost': sum(m.cost_usd for m in user_metrics),
231
- 'total_tokens': sum(m.tokens_input + m.tokens_output for m in user_metrics),
232
- 'providers_used': list(set(m.provider for m in user_metrics)),
233
- 'models_used': list(set(m.model for m in user_metrics)),
234
- }
235
-
236
- def get_cost_by_provider(self) -> Dict[str, float]:
237
- """Retorna custo total por provedor."""
238
- costs = defaultdict(float)
239
- for metric in self.metrics:
240
- costs[metric.provider] += metric.cost_usd
241
- return dict(costs)
242
 
243
- def export_metrics(self, filepath: str):
244
- """Exporta métricas para JSON."""
245
- try:
246
- data = {
247
- 'export_timestamp': datetime.now().isoformat(),
248
- 'metrics': [asdict(m) for m in self.metrics],
249
- 'summary': self.get_summary(),
250
- 'provider_stats': {
251
- k: asdict(v) for k, v in self.get_provider_stats().items()
252
- },
253
- 'model_stats': self.get_model_stats(),
254
- }
255
-
256
- with open(filepath, 'w') as f:
257
- json.dump(data, f, indent=2, default=str)
258
-
259
- logger.info(f"Métricas exportadas para {filepath}")
260
-
261
- except Exception as e:
262
- logger.error(f"Erro ao exportar métricas: {e}")
263
- raise
264
-
265
- def clear(self):
266
- """Limpa todas as métricas."""
267
- self.metrics.clear()
268
- logger.info("Métricas limpas")
269
-
270
-
271
- # Instância global
272
- _metrics_collector: Optional[MetricsCollector] = None
273
-
274
-
275
- def get_metrics_collector() -> MetricsCollector:
276
- """Retorna a instância global do coletor de métricas."""
277
- global _metrics_collector
278
- if _metrics_collector is None:
279
- _metrics_collector = MetricsCollector()
280
- return _metrics_collector
 
1
+ """
2
+ llm/metrics.py
3
+ Coleta e análise de métricas de uso de LLM
4
+ """
5
 
 
 
 
 
 
6
  import logging
7
+ from typing import Dict, Any
8
+ from datetime import datetime
9
+ from collections import defaultdict
10
 
11
  logger = logging.getLogger(__name__)
12
 
13
+ class LLMMetrics:
14
+ """
15
+ Coleta métricas de processamento LLM.
16
+
17
+ Rastreia:
18
+ - Taxa de sucesso/falha por provedor
19
+ - Tempo de processamento
20
+ - Tokens utilizados
21
+ - Custo estimado
22
+ """
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
23
 
24
+ def __init__(self):
25
+ self.metricas = defaultdict(lambda: {
26
+ "tentativas": 0,
27
+ "sucessos": 0,
28
+ "falhas": 0,
29
+ "tempo_total_segundos": 0.0,
30
+ "tokens_total": 0,
31
+ "custo_total_usd": 0.0,
32
+ })
33
 
34
+ # Preços aproximados por provedor (USD por 1M tokens)
35
+ self.precos = {
36
+ "groq": 0.000100, # Mais barato
37
+ "openai": 0.015000, # GPT-4 input
38
+ "anthropic": 0.010000, # Claude
39
+ }
40
 
41
+ def registrar_sucesso(
42
+ self,
43
+ provider: str,
44
+ tempo_segundos: float,
45
+ tokens: int,
46
+ ) -> None:
47
+ """Registra processamento bem-sucedido"""
48
+ metrica = self.metricas[provider]
49
+ metrica["tentativas"] += 1
50
+ metrica["sucessos"] += 1
51
+ metrica["tempo_total_segundos"] += tempo_segundos
52
+ metrica["tokens_total"] += tokens
53
+
54
+ # Calcular custo
55
+ custo = (tokens / 1_000_000) * self.precos.get(provider, 0.0)
56
+ metrica["custo_total_usd"] += custo
57
+
58
+ logger.info(
59
+ f"Sucesso {provider}: {tempo_segundos:.2f}s, "
60
+ f"{tokens} tokens, ${custo:.6f}"
 
 
 
 
61
  )
 
 
 
 
 
 
 
 
62
 
63
+ def registrar_falha(self, provider: str) -> None:
64
+ """Registra processamento falhado"""
65
+ metrica = self.metricas[provider]
66
+ metrica["tentativas"] += 1
67
+ metrica["falhas"] += 1
68
+ logger.warning(f"Falha registrada para {provider}")
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
69
 
70
+ def obter_estatisticas(self) -> Dict[str, Dict[str, Any]]:
71
+ """Retorna estatísticas consolidadas por provedor"""
72
+ stats = {}
 
 
 
 
 
 
73
 
74
+ for provider, metrica in self.metricas.items():
75
+ tentativas = metrica["tentativas"]
76
+ sucessos = metrica["sucessos"]
77
 
78
+ stats[provider] = {
79
+ "tentativas": tentativas,
80
+ "sucessos": sucessos,
81
+ "falhas": metrica["falhas"],
82
+ "sucesso_rate": (sucessos / tentativas * 100) if tentativas > 0 else 0,
83
+ "tempo_medio_segundos": (
84
+ metrica["tempo_total_segundos"] / sucessos
85
+ if sucessos > 0 else 0
86
+ ),
87
+ "tokens_total": metrica["tokens_total"],
88
+ "custo_total_usd": round(metrica["custo_total_usd"], 6),
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
89
  }
90
 
91
+ return stats
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
92
 
93
+ def relatorio_consolidado(self) -> str:
94
+ """Gera relatório consolidado em texto"""
95
+ stats = self.obter_estatisticas()
96
+
97
+ relatorio = "\n" + "=" * 80 + "\n"
98
+ relatorio += "RELATÓRIO DE MÉTRICAS LLM\n"
99
+ relatorio += f"Data: {datetime.now().isoformat()}\n"
100
+ relatorio += "=" * 80 + "\n\n"
101
+
102
+ for provider, metrica in stats.items():
103
+ relatorio += f"PROVEDOR: {provider.upper()}\n"
104
+ relatorio += f" Tentativas: {metrica['tentativas']}\n"
105
+ relatorio += f" Sucessos: {metrica['sucessos']}\n"
106
+ relatorio += f" Falhas: {metrica['falhas']}\n"
107
+ relatorio += f" Taxa de sucesso: {metrica['sucesso_rate']:.1f}%\n"
108
+ relatorio += f" Tempo médio: {metrica['tempo_medio_segundos']:.2f}s\n"
109
+ relatorio += f" Tokens totais: {metrica['tokens_total']}\n"
110
+ relatorio += f" Custo total: ${metrica['custo_total_usd']:.6f}\n"
111
+ relatorio += "\n"
112
+
113
+ relatorio += "=" * 80 + "\n"
114
+ return relatorio
115
 
116
+ def resetar_metricas(self) -> None:
117
+ """Reseta todas as métricas"""
118
+ self.metricas.clear()
119
+ logger.info("Métricas foram resetadas")