Spaces:
Running
Running
| #!/usr/bin/env python3 | |
| """ | |
| 虫群智能体系统 — 性能监控模块 | |
| 请求统计、模型延迟、成功率追踪、告警规则 | |
| """ | |
| import json | |
| import logging | |
| import os | |
| import threading | |
| import time | |
| from collections import defaultdict, deque | |
| from dataclasses import dataclass, field | |
| from datetime import datetime, timedelta | |
| from typing import Dict, List, Optional | |
# Module-scoped logger, named after this module.
logger = logging.getLogger(name=__name__)

# Default on-disk location used by PerformanceMonitor.save()/load().
DEFAULT_METRICS_PATH = "/home/admin/swarm/data/metrics.json"
@dataclass
class RequestMetric:
    """Metrics captured for a single request.

    The ``@dataclass`` decorator is required: instances are built with keyword
    arguments, and ``timestamp`` relies on ``field(default_factory=...)``.
    Without the decorator the class has no generated ``__init__``.

    Attributes:
        request_id: Identifier assigned by the monitor (e.g. ``req_000001``).
        query: Request text (the monitor stores at most 100 characters).
        model_id: Model that handled the request.
        latency_ms: Latency in milliseconds.
        confidence: Confidence score reported for the answer.
        success: Whether the request succeeded.
        timestamp: Creation time as returned by ``time.time()`` (epoch seconds).
        error: Error message, empty on success.
    """
    request_id: str
    query: str
    model_id: str
    latency_ms: float
    confidence: float
    success: bool
    timestamp: float = field(default_factory=time.time)
    error: str = ""
class PerformanceMonitor:
    """Performance monitor: thread-safe collection and analysis of request metrics.

    The class is a process-wide singleton. Every ``PerformanceMonitor()`` call
    returns the same instance, and the state in ``__init__`` is initialised
    only once.

    All shared state is guarded by one *re-entrant* lock. Public query
    methods call each other (e.g. ``get_all_summaries`` calls
    ``get_model_summary``), which would deadlock on a plain ``Lock``.
    """

    _instance = None
    # Re-entrant: guards singleton creation and all instance state; nested
    # acquisition from the same thread must not deadlock.
    _lock = threading.RLock()

    def __new__(cls):
        with cls._lock:
            if cls._instance is None:
                cls._instance = super().__new__(cls)
                cls._instance._initialized = False
        return cls._instance

    def __init__(self):
        # Hold the lock so two threads constructing the singleton at the same
        # time cannot both run the initialisation below.
        with self._lock:
            if self._initialized:
                return
            self._initialized = True
            # Most recent N request metrics; the oldest are evicted first.
            self._max_history = 1000
            self._metrics: deque = deque(maxlen=self._max_history)
            # Per-model running aggregates.
            self._model_stats: Dict[str, dict] = defaultdict(lambda: {
                "total": 0, "success": 0, "fail": 0,
                "total_latency_ms": 0.0, "total_confidence": 0.0,
                "min_latency_ms": float("inf"), "max_latency_ms": 0.0,
            })
            # Per-model (timestamp, latency_ms, success) samples for the
            # 1-hour / 24-hour windows. NOTE: maxlen caps the sample COUNT,
            # not the time span, so at more than one request per second per
            # model the oldest in-window samples are evicted early.
            self._hourly: Dict[str, deque] = defaultdict(lambda: deque(maxlen=3600))
            self._daily: Dict[str, deque] = defaultdict(lambda: deque(maxlen=86400))
            # Alert rules and alert callbacks.
            self._alerts: List[dict] = []
            self._alert_callbacks: List = []
            # Default persistence path.
            self._metrics_path = DEFAULT_METRICS_PATH
            # Counter used to build request ids.
            self._request_counter = 0

    # ============================================================
    # Recording
    # ============================================================
    def record(self, model_id: str, latency_ms: float,
               confidence: float, success: bool,
               query: str = "", error: str = "") -> str:
        """Record one request's metrics and return its generated request id.

        Args:
            model_id: Model that served the request.
            latency_ms: Request latency in milliseconds.
            confidence: Confidence score of the answer.
            success: Whether the request succeeded.
            query: Request text; only the first 100 characters are stored.
            error: Error message; only the first 200 characters are stored.

        Returns:
            A request id of the form ``req_000001``.
        """
        with self._lock:
            # Increment under the lock so concurrent callers never share an id.
            self._request_counter += 1
            request_id = f"req_{self._request_counter:06d}"
            metric = RequestMetric(
                request_id=request_id,
                query=query[:100],
                model_id=model_id,
                latency_ms=latency_ms,
                confidence=confidence,
                success=success,
                error=error[:200],
            )
            self._metrics.append(metric)

            # Update the per-model aggregates.
            stats = self._model_stats[model_id]
            stats["total"] += 1
            if success:
                stats["success"] += 1
            else:
                stats["fail"] += 1
            stats["total_latency_ms"] += latency_ms
            stats["total_confidence"] += confidence
            stats["min_latency_ms"] = min(stats["min_latency_ms"], latency_ms)
            stats["max_latency_ms"] = max(stats["max_latency_ms"], latency_ms)

            # Time-window samples.
            now = time.time()
            self._hourly[model_id].append((now, latency_ms, success))
            self._daily[model_id].append((now, latency_ms, success))

        # Evaluate alerts after releasing the lock so user callbacks never run
        # while the monitor's state is locked.
        self._check_alerts(model_id, latency_ms, success, confidence)
        return request_id

    # ============================================================
    # Queries
    # ============================================================
    def get_model_summary(self, model_id: str) -> Optional[dict]:
        """Return aggregate statistics for one model, or None if it has no requests."""
        with self._lock:
            stats = self._model_stats.get(model_id)
            if not stats or stats["total"] == 0:
                return None
            total = stats["total"]
            return {
                "model_id": model_id,
                "total_requests": total,
                "success_count": stats["success"],
                "fail_count": stats["fail"],
                "success_rate": stats["success"] / total,
                "avg_latency_ms": stats["total_latency_ms"] / total,
                # inf is the "no sample yet" sentinel from the default stats.
                "min_latency_ms": stats["min_latency_ms"] if stats["min_latency_ms"] != float("inf") else 0,
                "max_latency_ms": stats["max_latency_ms"],
                "avg_confidence": stats["total_confidence"] / total,
            }

    def get_all_summaries(self) -> Dict[str, dict]:
        """Return ``{model_id: summary}`` for every model with recorded stats.

        ``get_model_summary`` re-acquires ``_lock`` while this method holds
        it, which is safe only because the lock is re-entrant.
        """
        with self._lock:
            return {mid: self.get_model_summary(mid) for mid in self._model_stats}

    def get_recent_metrics(self, limit: int = 50) -> List[dict]:
        """Return up to ``limit`` most recent request metrics, newest first.

        A non-positive ``limit`` returns an empty list. A plain ``[-0:]``
        slice would return the whole history.
        """
        if limit <= 0:
            return []
        with self._lock:
            metrics = list(self._metrics)[-limit:]
        return [
            {
                "request_id": m.request_id,
                "model_id": m.model_id,
                "query": m.query,
                "latency_ms": round(m.latency_ms, 1),
                "confidence": round(m.confidence, 3),
                "success": m.success,
                "timestamp": datetime.fromtimestamp(m.timestamp).isoformat(),
                "error": m.error,
            }
            for m in reversed(metrics)
        ]

    def get_hourly_stats(self, model_id: str = "") -> dict:
        """Return stats for the last hour (all models when ``model_id`` is empty)."""
        cutoff = time.time() - 3600
        return self._window_stats(model_id, cutoff, self._hourly)

    def get_daily_stats(self, model_id: str = "") -> dict:
        """Return stats for the last 24 hours (all models when ``model_id`` is empty).

        Reads the daily sample buffer. The hourly buffer only keeps 3600
        samples per model, so it cannot cover a full day.
        """
        cutoff = time.time() - 86400
        return self._window_stats(model_id, cutoff, self._daily)

    def _window_stats(self, model_id: str, cutoff: float,
                      windows: Optional[Dict[str, deque]] = None) -> dict:
        """Aggregate samples with ``timestamp >= cutoff``.

        Args:
            model_id: Restrict to this model; empty string means all models.
            cutoff: Epoch-seconds lower bound (inclusive).
            windows: Per-model sample buffers to read; defaults to the hourly
                buffers for backward compatibility.

        Returns:
            ``total``, ``success_rate``, ``avg_latency_ms``, ``p50_latency_ms``
            and ``p99_latency_ms``. All are 0 when no sample is in the window.
        """
        with self._lock:
            if windows is None:
                windows = self._hourly
            if model_id:
                sources = {model_id: windows.get(model_id, deque())}
            else:
                sources = windows
            latencies = []
            success = 0
            for entries in sources.values():
                for ts, lat, ok in entries:
                    if ts >= cutoff:
                        latencies.append(lat)
                        if ok:
                            success += 1

        total = len(latencies)
        if total == 0:
            return {"total": 0, "success_rate": 0, "avg_latency_ms": 0,
                    "p50_latency_ms": 0, "p99_latency_ms": 0}
        latencies.sort()
        return {
            "total": total,
            "success_rate": success / total,
            "avg_latency_ms": sum(latencies) / total,
            "p50_latency_ms": latencies[total // 2],
            # With few samples a percentile index is meaningless; report the max.
            "p99_latency_ms": latencies[int(total * 0.99)] if total > 10 else latencies[-1],
        }

    # ============================================================
    # Alerts
    # ============================================================
    def add_alert_rule(self, name: str, model_id: str = "",
                       min_latency_ms: float = 0,
                       max_success_rate: float = 1.0,
                       min_confidence: float = 0.0):
        """Register an alert rule.

        The parameter names are historical. Each one is a *threshold* that
        triggers the rule:

        Args:
            name: Rule name reported in alert messages.
            model_id: Only apply to this model; empty string applies to all.
            min_latency_ms: Fire when a request's latency exceeds this value
                (ms). 0 disables the check.
            max_success_rate: Fire when the model's cumulative success rate
                drops below this value. 1.0 (the default) disables the check.
            min_confidence: Fire when a request's confidence is below this
                value. The 0.0 default only fires for negative confidences.
        """
        with self._lock:
            self._alerts.append({
                "name": name,
                "model_id": model_id,
                "min_latency_ms": min_latency_ms,
                "max_success_rate": max_success_rate,
                "min_confidence": min_confidence,
            })

    def on_alert(self, callback):
        """Register a callback invoked with the alert dict whenever a rule fires."""
        with self._lock:
            self._alert_callbacks.append(callback)

    def _check_alerts(self, model_id: str, latency_ms: float,
                      success: bool, confidence: float):
        """Evaluate every matching rule against the latest request.

        All triggered conditions of a rule are joined into its ``reason``.
        A failing callback is logged and does not stop the other callbacks.
        """
        summary = self.get_model_summary(model_id)
        if not summary:
            return
        # Snapshot so rules/callbacks added concurrently don't affect this pass.
        with self._lock:
            rules = list(self._alerts)
            callbacks = list(self._alert_callbacks)

        for rule in rules:
            # Match the model.
            if rule["model_id"] and rule["model_id"] != model_id:
                continue
            reasons = []
            if rule["min_latency_ms"] > 0 and latency_ms > rule["min_latency_ms"]:
                reasons.append(f"延迟过高: {latency_ms:.0f}ms > {rule['min_latency_ms']:.0f}ms")
            # A threshold of 1.0 (the default) means "not configured", like the
            # 0 defaults of the other checks. Otherwise every rule would fire
            # after a single failure.
            if rule["max_success_rate"] < 1.0 and summary["success_rate"] < rule["max_success_rate"]:
                reasons.append(f"成功率过低: {summary['success_rate']:.0%} < {rule['max_success_rate']:.0%}")
            if confidence < rule["min_confidence"]:
                reasons.append(f"置信度过低: {confidence:.2f} < {rule['min_confidence']:.2f}")
            if not reasons:
                continue

            alert_msg = {
                "rule": rule["name"],
                "model_id": model_id,
                "reason": "; ".join(reasons),
                "timestamp": datetime.now().isoformat(),
            }
            logger.warning(f"性能告警: {alert_msg}")
            for cb in callbacks:
                try:
                    cb(alert_msg)
                except Exception:
                    # Best-effort delivery, but never silently.
                    logger.exception("告警回调执行失败: %r", cb)

    # ============================================================
    # Persistence
    # ============================================================
    def save(self, path: str = ""):
        """Write the request counter and per-model aggregates to a JSON file.

        Args:
            path: Target file; defaults to the monitor's metrics path. Parent
                directories are created when the path has any.
        """
        path = path or self._metrics_path
        directory = os.path.dirname(path)
        if directory:
            # os.makedirs("") raises, so only create when there is a directory part.
            os.makedirs(directory, exist_ok=True)
        with self._lock:
            # Copy under the lock to get a consistent snapshot.
            data = {
                "request_counter": self._request_counter,
                "model_stats": {mid: dict(stats) for mid, stats in self._model_stats.items()},
                "saved_at": datetime.now().isoformat(),
            }
        # Explicit UTF-8: ensure_ascii=False may emit non-ASCII model ids.
        with open(path, "w", encoding="utf-8") as f:
            json.dump(data, f, ensure_ascii=False, indent=2)

    def load(self, path: str = ""):
        """Load the counter and per-model aggregates written by ``save``.

        A missing file is ignored. Any read/parse error is logged and
        otherwise ignored (best effort).
        """
        path = path or self._metrics_path
        if not os.path.exists(path):
            return
        try:
            with open(path, encoding="utf-8") as f:
                data = json.load(f)
            with self._lock:
                self._request_counter = data.get("request_counter", 0)
                for mid, stats in data.get("model_stats", {}).items():
                    # Start from a default entry so keys missing from the file
                    # don't cause KeyErrors in record()/get_model_summary().
                    merged = self._model_stats.default_factory()
                    merged.update(stats)
                    self._model_stats[mid] = merged
            logger.info(f"性能指标已加载: {path}")
        except Exception as e:
            logger.warning(f"加载性能指标失败: {e}")

    # ============================================================
    # Reporting
    # ============================================================
    def get_report(self) -> dict:
        """Build a full report: per-model summaries, 1h/24h windows and recent errors."""
        summaries = self.get_all_summaries()
        hourly = self.get_hourly_stats()
        daily = self.get_daily_stats()
        return {
            "generated_at": datetime.now().isoformat(),
            "total_requests": self._request_counter,
            "models": summaries,
            "hourly": hourly,
            "daily": daily,
            # Failures among the 20 most recent requests.
            "recent_errors": [
                m for m in self.get_recent_metrics(20)
                if not m["success"]
            ],
        }
# ============================================================
# Convenience functions
# ============================================================
_monitor = None  # configured PerformanceMonitor, set on the first get_monitor() call
_monitor_lock = threading.Lock()


def get_monitor() -> PerformanceMonitor:
    """Return the global monitor, creating and configuring it on first use.

    On first call this registers the default alert rules ("高延迟" with
    min_latency_ms=5000, "低成功率" with max_success_rate=0.8, "低置信度"
    with min_confidence=0.1). It then tries to load persisted metrics from
    the monitor's default path.

    Setup runs under a lock, so concurrent first calls cannot register the
    default rules twice.
    """
    global _monitor
    if _monitor is None:
        with _monitor_lock:
            # Re-check: another thread may have completed setup while we waited.
            if _monitor is None:
                monitor = PerformanceMonitor()
                # Register the default alert rules.
                monitor.add_alert_rule("高延迟", min_latency_ms=5000)
                monitor.add_alert_rule("低成功率", max_success_rate=0.8)
                monitor.add_alert_rule("低置信度", min_confidence=0.1)
                # Try to load historical metrics.
                monitor.load()
                # Publish only after setup so no caller sees a half-configured monitor.
                _monitor = monitor
    return _monitor