File size: 12,033 Bytes
17fba62
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
#!/usr/bin/env python3
"""
虫群智能体系统 — 性能监控模块
请求统计、模型延迟、成功率追踪、告警规则
"""

import json
import logging
import os
import threading
import time
from collections import defaultdict, deque
from dataclasses import dataclass, field
from datetime import datetime, timedelta
from typing import Dict, List, Optional

logger = logging.getLogger(__name__)

# 默认存储路径
DEFAULT_METRICS_PATH = "/home/admin/swarm/data/metrics.json"


@dataclass
class RequestMetric:
    """Metrics captured for a single request.

    Instances are created by ``PerformanceMonitor.record`` and kept in its
    bounded in-memory history.
    """
    # Identifier assigned by the monitor (formatted as "req_NNNNNN" by record()).
    request_id: str
    # Query text; record() stores only the first 100 characters.
    query: str
    # Identifier of the model that served the request.
    model_id: str
    # Request latency in milliseconds.
    latency_ms: float
    # Confidence value supplied by the caller for this request.
    confidence: float
    # Whether the request succeeded.
    success: bool
    # Creation time as returned by time.time(); filled in automatically.
    timestamp: float = field(default_factory=time.time)
    # Error text; empty by default (record() stores only the first 200 characters).
    error: str = ""


class PerformanceMonitor:
    """Singleton performance monitor that collects and analyses request metrics.

    Every ``PerformanceMonitor()`` call returns the same instance. Shared
    state is guarded by a single non-reentrant ``threading.Lock``; code that
    runs while the lock is held must therefore only use the private,
    lock-free helpers (``_build_summary`` etc.) and never call a public
    method that acquires the lock again.
    """

    _instance = None
    _lock = threading.Lock()

    def __new__(cls):
        with cls._lock:
            if cls._instance is None:
                cls._instance = super().__new__(cls)
                cls._instance._initialized = False
            return cls._instance

    def __init__(self):
        # Initialise under the lock and flip ``_initialized`` last, so a
        # second thread constructing the singleton concurrently can never
        # observe a half-built instance.
        with self._lock:
            if self._initialized:
                return

            # Bounded history of the most recent request metrics.
            self._max_history = 1000
            self._metrics: deque = deque(maxlen=self._max_history)

            # Cumulative per-model aggregates.
            self._model_stats: Dict[str, dict] = defaultdict(self._new_model_stats)

            # Per-model (timestamp, latency_ms, success) samples used for the
            # last-hour and last-24-hours windows.
            self._hourly: Dict[str, deque] = defaultdict(lambda: deque(maxlen=3600))
            self._daily: Dict[str, deque] = defaultdict(lambda: deque(maxlen=86400))

            # Alert rules and the callbacks notified when a rule fires.
            self._alerts: List[dict] = []
            self._alert_callbacks = []

            # Default persistence location.
            self._metrics_path = DEFAULT_METRICS_PATH

            # Monotonic counter used to build request ids.
            self._request_counter = 0

            self._initialized = True

    # ============================================================
    # Internal lock-free helpers
    # ============================================================

    @staticmethod
    def _new_model_stats() -> dict:
        """Return a fresh, zeroed per-model aggregate record."""
        return {
            "total": 0, "success": 0, "fail": 0,
            "total_latency_ms": 0.0, "total_confidence": 0.0,
            "min_latency_ms": float("inf"), "max_latency_ms": 0.0,
        }

    @staticmethod
    def _build_summary(model_id: str, stats: Optional[dict]) -> Optional[dict]:
        """Turn an aggregate record into a summary dict (no locking).

        Returns ``None`` when there is no record or it holds no requests.
        """
        if not stats or stats["total"] == 0:
            return None
        total = stats["total"]
        min_latency = stats["min_latency_ms"]
        return {
            "model_id": model_id,
            "total_requests": total,
            "success_count": stats["success"],
            "fail_count": stats["fail"],
            "success_rate": stats["success"] / total,
            "avg_latency_ms": stats["total_latency_ms"] / total,
            "min_latency_ms": min_latency if min_latency != float("inf") else 0,
            "max_latency_ms": stats["max_latency_ms"],
            "avg_confidence": stats["total_confidence"] / total,
        }

    # ============================================================
    # Recording
    # ============================================================

    def record(self, model_id: str, latency_ms: float,
               confidence: float, success: bool,
               query: str = "", error: str = "") -> str:
        """Record the metrics of one request and return its request id.

        Args:
            model_id: Identifier of the model that served the request.
            latency_ms: Request latency in milliseconds.
            confidence: Confidence value of the result.
            success: Whether the request succeeded.
            query: Query text; only the first 100 characters are stored.
            error: Error text; only the first 200 characters are stored.

        Returns:
            The generated request id, formatted as ``req_NNNNNN``.
        """
        with self._lock:
            # The counter is bumped under the lock so concurrent callers can
            # never receive the same request id.
            self._request_counter += 1
            request_id = f"req_{self._request_counter:06d}"

            metric = RequestMetric(
                request_id=request_id,
                query=query[:100],
                model_id=model_id,
                latency_ms=latency_ms,
                confidence=confidence,
                success=success,
                error=error[:200],
            )
            self._metrics.append(metric)

            # Update the cumulative per-model aggregates.
            stats = self._model_stats[model_id]
            stats["total"] += 1
            stats["success" if success else "fail"] += 1
            stats["total_latency_ms"] += latency_ms
            stats["total_confidence"] += confidence
            stats["min_latency_ms"] = min(stats["min_latency_ms"], latency_ms)
            stats["max_latency_ms"] = max(stats["max_latency_ms"], latency_ms)

            # Feed both time windows with the metric's own timestamp.
            sample = (metric.timestamp, latency_ms, success)
            self._hourly[model_id].append(sample)
            self._daily[model_id].append(sample)

        # Alert evaluation re-acquires the lock, so it must run after release.
        self._check_alerts(model_id, latency_ms, success, confidence)

        return request_id

    # ============================================================
    # Queries
    # ============================================================

    def get_model_summary(self, model_id: str) -> Optional[dict]:
        """Return the cumulative summary of one model, or ``None`` if unknown."""
        with self._lock:
            return self._build_summary(model_id, self._model_stats.get(model_id))

    def get_all_summaries(self) -> Dict[str, dict]:
        """Return the cumulative summaries of all models, keyed by model id."""
        with self._lock:
            # Use the lock-free helper: calling get_model_summary() here would
            # try to re-acquire the non-reentrant lock and deadlock.
            return {
                model_id: self._build_summary(model_id, stats)
                for model_id, stats in self._model_stats.items()
            }

    def get_recent_metrics(self, limit: int = 50) -> List[dict]:
        """Return up to ``limit`` of the latest request metrics, newest first.

        A non-positive ``limit`` yields an empty list.
        """
        if limit <= 0:
            # ``seq[-0:]`` would return the whole history instead of nothing.
            return []
        with self._lock:
            metrics = list(self._metrics)[-limit:]
        return [
            {
                "request_id": m.request_id,
                "model_id": m.model_id,
                "query": m.query,
                "latency_ms": round(m.latency_ms, 1),
                "confidence": round(m.confidence, 3),
                "success": m.success,
                "timestamp": datetime.fromtimestamp(m.timestamp).isoformat(),
                "error": m.error,
            }
            for m in reversed(metrics)
        ]

    def get_hourly_stats(self, model_id: str = "") -> dict:
        """Return statistics for the last hour (all models when ``model_id`` is empty)."""
        cutoff = time.time() - 3600
        return self._window_stats(model_id, cutoff, self._hourly)

    def get_daily_stats(self, model_id: str = "") -> dict:
        """Return statistics for the last 24 hours (all models when ``model_id`` is empty)."""
        cutoff = time.time() - 86400
        # Must read the daily buffers; the hourly ones keep at most 3600
        # samples per model and would truncate a 24-hour window.
        return self._window_stats(model_id, cutoff, self._daily)

    def _window_stats(self, model_id: str, cutoff: float,
                      windows: Optional[Dict[str, deque]] = None) -> dict:
        """Aggregate the samples newer than ``cutoff``.

        Args:
            model_id: Restrict to this model; empty string means all models.
            cutoff: Unix timestamp; older samples are ignored.
            windows: Per-model sample buffers to read. Defaults to the hourly
                buffers.

        Returns:
            Dict with ``total``, ``success_rate``, ``avg_latency_ms``,
            ``p50_latency_ms`` and ``p99_latency_ms``; all zero when the
            window holds no samples.
        """
        if windows is None:
            windows = self._hourly

        with self._lock:
            if model_id:
                # .get() avoids creating an empty buffer for unknown models.
                buckets = [windows.get(model_id, ())]
            else:
                buckets = list(windows.values())
            samples = [
                (lat, ok)
                for entries in buckets
                for ts, lat, ok in entries
                if ts >= cutoff
            ]

        total = len(samples)
        if total == 0:
            return {
                "total": 0, "success_rate": 0, "avg_latency_ms": 0,
                "p50_latency_ms": 0, "p99_latency_ms": 0,
            }

        latencies = sorted(lat for lat, _ in samples)  # sort once for both percentiles
        success = sum(1 for _, ok in samples if ok)
        return {
            "total": total,
            "success_rate": success / total,
            "avg_latency_ms": sum(latencies) / total,
            "p50_latency_ms": latencies[total // 2],
            # With few samples a p99 index is meaningless; report the maximum.
            "p99_latency_ms": latencies[int(total * 0.99)] if total > 10 else latencies[-1],
        }

    # ============================================================
    # Alerts
    # ============================================================

    def add_alert_rule(self, name: str, model_id: str = "",
                       min_latency_ms: float = 0,
                       max_success_rate: float = 1.0,
                       min_confidence: float = 0.0):
        """Register an alert rule.

        Args:
            name: Rule name reported in alert messages.
            model_id: Restrict the rule to one model; empty matches all.
            min_latency_ms: Alert when a request's latency exceeds this value;
                0 (default) disables the latency check.
            max_success_rate: Alert when the model's cumulative success rate
                is below this value; 1.0 (default) disables the check.
            min_confidence: Alert when a request's confidence is below this
                value.
        """
        self._alerts.append({
            "name": name,
            "model_id": model_id,
            "min_latency_ms": min_latency_ms,
            "max_success_rate": max_success_rate,
            "min_confidence": min_confidence,
        })

    def on_alert(self, callback):
        """Register a callback invoked with the alert message dict when a rule fires."""
        self._alert_callbacks.append(callback)

    def _check_alerts(self, model_id: str, latency_ms: float,
                      success: bool, confidence: float):
        """Evaluate all alert rules against the request that was just recorded.

        Must be called without the lock held, because it reads the model
        summary through ``get_model_summary``.
        """
        summary = self.get_model_summary(model_id)
        if not summary:
            return

        # Iterate over snapshots so rules/callbacks registered concurrently
        # do not affect this evaluation.
        for rule in tuple(self._alerts):
            # Skip rules bound to a different model.
            if rule["model_id"] and rule["model_id"] != model_id:
                continue

            reasons = []

            if rule["min_latency_ms"] > 0 and latency_ms > rule["min_latency_ms"]:
                reasons.append(
                    f"延迟过高: {latency_ms:.0f}ms > {rule['min_latency_ms']:.0f}ms"
                )

            # A threshold of 1.0 is the "not configured" default. Without this
            # guard every rule would fire on every request once a single
            # failure had been recorded.
            if (rule["max_success_rate"] < 1.0
                    and summary["success_rate"] < rule["max_success_rate"]):
                reasons.append(
                    f"成功率过低: {summary['success_rate']:.0%} < {rule['max_success_rate']:.0%}"
                )

            if confidence < rule["min_confidence"]:
                reasons.append(
                    f"置信度过低: {confidence:.2f} < {rule['min_confidence']:.2f}"
                )

            if not reasons:
                continue

            alert_msg = {
                "rule": rule["name"],
                "model_id": model_id,
                # Report every violated threshold instead of only the last one.
                "reason": "; ".join(reasons),
                "timestamp": datetime.now().isoformat(),
            }
            logger.warning(f"性能告警: {alert_msg}")
            for cb in tuple(self._alert_callbacks):
                try:
                    cb(alert_msg)
                except Exception:
                    # A faulty callback must not break recording, but the
                    # failure should be visible in the logs.
                    logger.exception("告警回调执行失败: %s", rule["name"])

    # ============================================================
    # Persistence
    # ============================================================

    def save(self, path: str = ""):
        """Write the request counter and per-model aggregates to a JSON file.

        Args:
            path: Target file; defaults to the monitor's configured path.

        Raises:
            OSError: If the directory or file cannot be created or written.
        """
        path = path or self._metrics_path
        directory = os.path.dirname(path)
        if directory:
            # os.makedirs("") raises, so only create a directory when the
            # path actually has one.
            os.makedirs(directory, exist_ok=True)

        # Snapshot under the lock so a concurrent record() cannot mutate the
        # dictionaries while they are being copied.
        with self._lock:
            data = {
                "request_counter": self._request_counter,
                "model_stats": {
                    mid: dict(stats) for mid, stats in self._model_stats.items()
                },
                "saved_at": datetime.now().isoformat(),
            }

        # ensure_ascii=False emits raw non-ASCII text, so pin the encoding.
        with open(path, "w", encoding="utf-8") as f:
            json.dump(data, f, ensure_ascii=False, indent=2)

    def load(self, path: str = ""):
        """Load the request counter and per-model aggregates from a JSON file.

        Loading is best-effort: a missing file is ignored and any failure is
        logged as a warning, leaving the current state untouched.

        Args:
            path: Source file; defaults to the monitor's configured path.
        """
        path = path or self._metrics_path
        if not os.path.exists(path):
            return
        try:
            with open(path, encoding="utf-8") as f:
                data = json.load(f)

            counter = data.get("request_counter", 0)
            loaded = {}
            for mid, stats in data.get("model_stats", {}).items():
                # Start from a zeroed record so files missing some keys do not
                # cause KeyError later in record()/summaries.
                merged = self._new_model_stats()
                merged.update(stats)
                loaded[mid] = merged

            # Apply everything at once, only after the file parsed cleanly.
            with self._lock:
                self._request_counter = counter
                self._model_stats.update(loaded)
            logger.info(f"性能指标已加载: {path}")
        except Exception as e:
            # Deliberately broad: a corrupt metrics file must never prevent
            # the monitor from starting.
            logger.warning(f"加载性能指标失败: {e}")

    # ============================================================
    # Reporting
    # ============================================================

    def get_report(self) -> dict:
        """Build a full performance report.

        Returns:
            Dict with the generation time, total request count, per-model
            summaries, hourly and daily window statistics, and the failed
            requests among the 20 most recent metrics.
        """
        summaries = self.get_all_summaries()
        hourly = self.get_hourly_stats()
        daily = self.get_daily_stats()
        return {
            "generated_at": datetime.now().isoformat(),
            "total_requests": self._request_counter,
            "models": summaries,
            "hourly": hourly,
            "daily": daily,
            "recent_errors": [
                m for m in self.get_recent_metrics(20)
                if not m["success"]
            ],
        }


# ============================================================
# 便捷函数
# ============================================================

_monitor = None

def get_monitor() -> PerformanceMonitor:
    """Return the module-wide monitor, creating and configuring it on first use.

    On the first call the monitor is created, the three default alert rules
    are registered, and previously persisted metrics are loaded. Later calls
    return the cached instance unchanged.
    """
    global _monitor
    if _monitor is not None:
        return _monitor

    _monitor = PerformanceMonitor()

    # Default alert rules: (rule name, threshold keyword arguments).
    default_rules = (
        ("高延迟", {"min_latency_ms": 5000}),
        ("低成功率", {"max_success_rate": 0.8}),
        ("低置信度", {"min_confidence": 0.1}),
    )
    for rule_name, thresholds in default_rules:
        _monitor.add_alert_rule(rule_name, **thresholds)

    # Try to restore metrics persisted by an earlier run.
    _monitor.load()
    return _monitor