""" 统计数据库操作 - 使用 storage.py 的统一数据库连接 """ import time from datetime import datetime from typing import Dict, Tuple import asyncio from collections import defaultdict from core.storage import _get_sqlite_conn, _sqlite_lock class StatsDatabase: """统计数据库管理类 - 使用统一的 data.db""" async def insert_request_log( self, timestamp: float, model: str, ttfb_ms: int = None, total_ms: int = None, status: str = "success", status_code: int = None ): """插入请求记录""" def _insert(): conn = _get_sqlite_conn() with _sqlite_lock: conn.execute( """ INSERT INTO request_logs (timestamp, model, ttfb_ms, total_ms, status, status_code) VALUES (?, ?, ?, ?, ?, ?) """, (int(timestamp), model, ttfb_ms, total_ms, status, status_code) ) conn.commit() await asyncio.to_thread(_insert) async def get_stats_by_time_range(self, time_range: str = "24h") -> Dict: """按时间范围获取统计数据""" def _query(): now = time.time() if time_range == "24h": start_time = now - 24 * 3600 bucket_size = 3600 elif time_range == "7d": start_time = now - 7 * 24 * 3600 bucket_size = 6 * 3600 elif time_range == "30d": start_time = now - 30 * 24 * 3600 bucket_size = 24 * 3600 else: start_time = now - 24 * 3600 bucket_size = 3600 conn = _get_sqlite_conn() with _sqlite_lock: rows = conn.execute( """ SELECT timestamp, model, ttfb_ms, total_ms, status, status_code FROM request_logs WHERE timestamp >= ? ORDER BY timestamp """, (int(start_time),) ).fetchall() # 数据分桶 buckets = defaultdict(lambda: { "total": 0, "failed": 0, "rate_limited": 0, "models": defaultdict(int), "model_ttfb": defaultdict(list), "model_total": defaultdict(list) }) for row in rows: ts, model, ttfb, total, status, status_code = row bucket_key = int((ts - start_time) // bucket_size) bucket = buckets[bucket_key] bucket["total"] += 1 bucket["models"][model] += 1 if status != "success": bucket["failed"] += 1 if status_code == 429: bucket["rate_limited"] += 1 if status == "success" and ttfb is not None and total is not None: bucket["model_ttfb"][model].append(ttfb) bucket["model_total"][model].append(total) # 生成结果 num_buckets = int((now - start_time) // bucket_size) + 1 labels = [] total_requests = [] failed_requests = [] rate_limited_requests = [] # 先收集所有出现过的模型 all_models = set() for bucket in buckets.values(): all_models.update(bucket["models"].keys()) all_models.update(bucket["model_ttfb"].keys()) all_models.update(bucket["model_total"].keys()) # 初始化每个模型的数据列表 model_requests = {model: [] for model in all_models} model_ttfb_times = {model: [] for model in all_models} model_total_times = {model: [] for model in all_models} # 遍历每个时间桶 for i in range(num_buckets): bucket_time = start_time + i * bucket_size dt = datetime.fromtimestamp(bucket_time) if time_range == "24h": labels.append(dt.strftime("%H:00")) elif time_range == "7d": labels.append(dt.strftime("%m-%d %H:00")) else: labels.append(dt.strftime("%m-%d")) bucket = buckets[i] total_requests.append(bucket["total"]) failed_requests.append(bucket["failed"]) rate_limited_requests.append(bucket["rate_limited"]) # 为每个模型添加数据(存在则添加实际值,不存在则添加0) for model in all_models: # 请求数 model_requests[model].append(bucket["models"].get(model, 0)) # TTFB平均时间 if model in bucket["model_ttfb"] and bucket["model_ttfb"][model]: avg_ttfb = sum(bucket["model_ttfb"][model]) / len(bucket["model_ttfb"][model]) model_ttfb_times[model].append(avg_ttfb) else: model_ttfb_times[model].append(0) # 总响应平均时间 if model in bucket["model_total"] and bucket["model_total"][model]: avg_total = sum(bucket["model_total"][model]) / len(bucket["model_total"][model]) model_total_times[model].append(avg_total) else: model_total_times[model].append(0) # 数据已经是按时间顺序(旧→新),不需要反转 # ECharts 从左到右渲染,所以最旧的在左边,最新的在右边 return { "labels": labels, "total_requests": total_requests, "failed_requests": failed_requests, "rate_limited_requests": rate_limited_requests, "model_requests": dict(model_requests), "model_ttfb_times": dict(model_ttfb_times), "model_total_times": dict(model_total_times) } return await asyncio.to_thread(_query) async def get_total_counts(self) -> Tuple[int, int]: """获取总成功和失败次数""" def _query(): conn = _get_sqlite_conn() with _sqlite_lock: success = conn.execute( "SELECT COUNT(*) FROM request_logs WHERE status = 'success'" ).fetchone()[0] failed = conn.execute( "SELECT COUNT(*) FROM request_logs WHERE status != 'success'" ).fetchone()[0] return success, failed return await asyncio.to_thread(_query) async def cleanup_old_data(self, days: int = 30): """清理过期数据 - 默认保留30天""" def _cleanup(): cutoff_time = int(time.time() - days * 24 * 3600) conn = _get_sqlite_conn() with _sqlite_lock: cursor = conn.execute( "DELETE FROM request_logs WHERE timestamp < ?", (cutoff_time,) ) conn.commit() return cursor.rowcount return await asyncio.to_thread(_cleanup) # 全局实例 stats_db = StatsDatabase()