# NOTE(review): removed stray VCS/deploy residue ("lijunke" /
# "deploy: clean start with hf metadata" / "18081cf") that sat above the
# module docstring — it made the file unparseable as Python and prevented
# the docstring below from being recognized as the module docstring.
"""
统计数据库操作 - 使用 storage.py 的统一数据库连接
"""
import asyncio
import time
from collections import defaultdict
from datetime import datetime
from typing import Dict, Optional, Tuple

from core.storage import _get_sqlite_conn, _sqlite_lock
class StatsDatabase:
    """Statistics manager backed by the unified ``data.db``.

    All SQLite access goes through :func:`core.storage._get_sqlite_conn`
    and is serialized with ``_sqlite_lock``; blocking work runs off the
    event loop via ``asyncio.to_thread``.
    """

    async def insert_request_log(
        self, timestamp: float, model: str, ttfb_ms: Optional[int] = None,
        total_ms: Optional[int] = None, status: str = "success",
        status_code: Optional[int] = None,
    ) -> None:
        """Insert one row into ``request_logs``.

        Args:
            timestamp: Unix time of the request; truncated to whole seconds.
            model: Name of the model that served the request.
            ttfb_ms: Time-to-first-byte in milliseconds, if measured.
            total_ms: Total response time in milliseconds, if measured.
            status: Outcome label; any value other than ``"success"`` is
                counted as a failure by the aggregation queries.
            status_code: HTTP status code, if known (429 is tracked
                separately as rate limiting).
        """
        def _insert():
            conn = _get_sqlite_conn()
            with _sqlite_lock:
                conn.execute(
                    """
                    INSERT INTO request_logs
                    (timestamp, model, ttfb_ms, total_ms, status, status_code)
                    VALUES (?, ?, ?, ?, ?, ?)
                    """,
                    (int(timestamp), model, ttfb_ms, total_ms, status, status_code)
                )
                conn.commit()

        await asyncio.to_thread(_insert)

    @staticmethod
    def _window_params(time_range: str, now: float) -> Tuple[float, int, str]:
        """Map a time-range keyword to (window start, bucket seconds, label fmt).

        Any unrecognized keyword falls back to the full "24h" configuration.
        (Previously an unknown keyword got the 24h window and hourly buckets
        but the day-granularity "%m-%d" label format, mislabeling buckets.)
        """
        if time_range == "7d":
            return now - 7 * 24 * 3600, 6 * 3600, "%m-%d %H:00"
        if time_range == "30d":
            return now - 30 * 24 * 3600, 24 * 3600, "%m-%d"
        # "24h" and every unrecognized value.
        return now - 24 * 3600, 3600, "%H:00"

    async def get_stats_by_time_range(self, time_range: str = "24h") -> Dict:
        """Aggregate request metrics into fixed-size time buckets.

        Args:
            time_range: ``"24h"`` (1h buckets), ``"7d"`` (6h buckets) or
                ``"30d"`` (24h buckets); anything else behaves like "24h".

        Returns:
            Dict of parallel, chronologically ordered (oldest -> newest)
            series: ``labels``, ``total_requests``, ``failed_requests``,
            ``rate_limited_requests``, plus per-model dicts
            ``model_requests``, ``model_ttfb_times`` and
            ``model_total_times`` (per-bucket averages in ms; 0 where a
            model has no successful samples in a bucket). Ordering matches
            ECharts' left-to-right rendering, so no reversal is needed.
        """
        def _query():
            now = time.time()
            start_time, bucket_size, label_fmt = self._window_params(time_range, now)

            conn = _get_sqlite_conn()
            # Hold the lock only for the SQLite read; the Python-level
            # aggregation below operates on the fetched rows and needs
            # no serialization with other DB users.
            with _sqlite_lock:
                rows = conn.execute(
                    """
                    SELECT timestamp, model, ttfb_ms, total_ms, status, status_code
                    FROM request_logs
                    WHERE timestamp >= ?
                    ORDER BY timestamp
                    """,
                    (int(start_time),)
                ).fetchall()

            # Group raw rows into fixed-size buckets keyed by bucket index.
            buckets = defaultdict(lambda: {
                "total": 0, "failed": 0, "rate_limited": 0,
                "models": defaultdict(int),
                "model_ttfb": defaultdict(list),
                "model_total": defaultdict(list)
            })
            for ts, model, ttfb, total, status, status_code in rows:
                bucket = buckets[int((ts - start_time) // bucket_size)]
                bucket["total"] += 1
                bucket["models"][model] += 1
                if status != "success":
                    bucket["failed"] += 1
                if status_code == 429:
                    bucket["rate_limited"] += 1
                # Latency averages only consider fully-measured successes.
                if status == "success" and ttfb is not None and total is not None:
                    bucket["model_ttfb"][model].append(ttfb)
                    bucket["model_total"][model].append(total)

            # Every model seen anywhere in the window gets a full-length series.
            all_models = set()
            for bucket in buckets.values():
                all_models.update(bucket["models"])
                all_models.update(bucket["model_ttfb"])
                all_models.update(bucket["model_total"])

            labels = []
            total_requests = []
            failed_requests = []
            rate_limited_requests = []
            model_requests = {m: [] for m in all_models}
            model_ttfb_times = {m: [] for m in all_models}
            model_total_times = {m: [] for m in all_models}

            # Emit one entry per bucket across the whole window, including
            # empty buckets, so all series share the same length as labels.
            num_buckets = int((now - start_time) // bucket_size) + 1
            for i in range(num_buckets):
                dt = datetime.fromtimestamp(start_time + i * bucket_size)
                labels.append(dt.strftime(label_fmt))

                bucket = buckets[i]
                total_requests.append(bucket["total"])
                failed_requests.append(bucket["failed"])
                rate_limited_requests.append(bucket["rate_limited"])
                for model in all_models:
                    model_requests[model].append(bucket["models"].get(model, 0))
                    ttfbs = bucket["model_ttfb"].get(model)
                    model_ttfb_times[model].append(
                        sum(ttfbs) / len(ttfbs) if ttfbs else 0
                    )
                    totals = bucket["model_total"].get(model)
                    model_total_times[model].append(
                        sum(totals) / len(totals) if totals else 0
                    )

            return {
                "labels": labels,
                "total_requests": total_requests,
                "failed_requests": failed_requests,
                "rate_limited_requests": rate_limited_requests,
                "model_requests": dict(model_requests),
                "model_ttfb_times": dict(model_ttfb_times),
                "model_total_times": dict(model_total_times)
            }

        return await asyncio.to_thread(_query)

    async def get_total_counts(self) -> Tuple[int, int]:
        """Return ``(success_count, failure_count)`` over all logged requests.

        A request is a failure whenever its ``status`` is not exactly
        ``'success'``.
        """
        def _query():
            conn = _get_sqlite_conn()
            with _sqlite_lock:
                success = conn.execute(
                    "SELECT COUNT(*) FROM request_logs WHERE status = 'success'"
                ).fetchone()[0]
                failed = conn.execute(
                    "SELECT COUNT(*) FROM request_logs WHERE status != 'success'"
                ).fetchone()[0]
            return success, failed

        return await asyncio.to_thread(_query)

    async def cleanup_old_data(self, days: int = 30) -> int:
        """Delete log rows older than *days* days (default 30).

        Returns:
            Number of rows deleted.
        """
        def _cleanup():
            cutoff_time = int(time.time() - days * 24 * 3600)
            conn = _get_sqlite_conn()
            with _sqlite_lock:
                cursor = conn.execute(
                    "DELETE FROM request_logs WHERE timestamp < ?",
                    (cutoff_time,)
                )
                conn.commit()
                return cursor.rowcount

        return await asyncio.to_thread(_cleanup)
# Module-level singleton; importers share this instance instead of
# constructing their own StatsDatabase.
stats_db = StatsDatabase()