Spaces:
Paused
Paused
| import asyncio | |
| from datetime import datetime, timedelta | |
| from app.utils.logging import log | |
| import app.config.settings as settings | |
| from collections import defaultdict, Counter | |
| import time | |
| import threading | |
| import queue | |
| import functools | |
class ApiStatsManager:
    """In-memory, lock-protected statistics for API calls and token usage.

    Three kinds of data are kept:

    * cumulative counters per API key, per model and per (key, model) pair,
      for both call counts and token totals;
    * per-minute buckets (``time_buckets``) used by the time-window queries;
    * a short list of the most recent calls (``recent_calls``).

    When ``enable_background`` is true, counter updates are queued and applied
    in batches by a daemon worker thread, so counter reads may lag behind
    ``update_stats`` by roughly ``batch_interval`` seconds.  The per-minute
    buckets and the recent-call list are always updated synchronously.
    """

    # Longest time the worker blocks on the queue before it re-checks the stop
    # event and the batch deadline.
    _POLL_SECONDS = 0.05

    def __init__(self, enable_background=True, batch_interval=1.0):
        """Create an empty manager.

        Args:
            enable_background: If true, start a daemon thread that applies
                counter updates in batches; otherwise counters are updated
                inline by ``update_stats``.
            batch_interval: Minimum number of seconds between two batch
                flushes performed by the worker thread.
        """
        # Call counts per API key, per model, and per (API key, model).
        self.api_key_counts = Counter()
        self.model_counts = Counter()
        self.api_model_counts = defaultdict(Counter)

        # Token totals per API key, per model, and per (API key, model).
        self.api_key_tokens = Counter()
        self.model_tokens = Counter()
        self.api_model_tokens = defaultdict(Counter)

        # Time series: {minute_timestamp: {"calls": int, "tokens": int}}.
        self.time_buckets = {}

        # Small rolling log of the latest calls (kept for front-end display).
        self.recent_calls = []
        self.max_recent_calls = 100

        # Minute-aligned timestamp of the bucket written most recently.
        self.current_minute = self._get_minute_timestamp(datetime.now())

        # Cleanup interval in hours, and the time of the last cleanup.
        self.cleanup_interval = 1
        self.last_cleanup = time.time()

        # Thread locks (not asyncio locks): the worker thread shares this state.
        self._counters_lock = threading.Lock()
        self._time_series_lock = threading.Lock()
        self._recent_calls_lock = threading.Lock()

        # Background batching machinery.
        self.enable_background = enable_background
        self.batch_interval = batch_interval
        self._update_queue = queue.Queue()
        self._worker_thread = None
        self._stop_event = threading.Event()
        if enable_background:
            self._start_worker()

    def _start_worker(self):
        """Start the background worker thread unless one is already alive."""
        if self._worker_thread is None or not self._worker_thread.is_alive():
            self._stop_event.clear()
            self._worker_thread = threading.Thread(
                target=self._worker_loop,
                daemon=True
            )
            self._worker_thread.start()

    def stop(self, timeout=5.0):
        """Stop the background worker and apply every update still pending.

        After this call ``update_stats`` updates the counters inline until
        ``_start_worker`` is invoked again.

        Args:
            timeout: Maximum number of seconds to wait for the worker thread
                to exit.
        """
        self._stop_event.set()
        worker = self._worker_thread
        if worker is not None and worker.is_alive():
            worker.join(timeout)
        # Pick up anything enqueued after the worker performed its last drain.
        leftovers = self._drain_queue()
        if leftovers:
            self._process_batch(leftovers)

    def _drain_queue(self):
        """Return the updates currently waiting in the queue without blocking."""
        drained = []
        # Bounded by a size snapshot so a busy producer cannot keep us here.
        for _ in range(self._update_queue.qsize()):
            try:
                drained.append(self._update_queue.get_nowait())
            except queue.Empty:
                break
        return drained

    def _worker_loop(self):
        """Collect queued updates and apply them in batches until stopped."""
        batch = []
        last_process = time.time()
        while not self._stop_event.is_set():
            try:
                try:
                    # Block briefly for the first item instead of sleep-polling,
                    # then take everything else already queued so throughput is
                    # not capped at one update per wake-up.
                    batch.append(
                        self._update_queue.get(timeout=self._POLL_SECONDS)
                    )
                    batch.extend(self._drain_queue())
                except queue.Empty:
                    pass

                current_time = time.time()
                if batch and (current_time - last_process >= self.batch_interval):
                    # Detach the batch before applying it: a batch that raises
                    # is dropped instead of being re-applied on every retry.
                    pending, batch = batch, []
                    last_process = current_time
                    self._process_batch(pending)
            except Exception as e:
                log('error', f"后台处理线程错误: {str(e)}")
                # Back off after an error, but wake up immediately on stop.
                self._stop_event.wait(1)

        # Flush whatever is left so a stop request does not lose updates.
        batch.extend(self._drain_queue())
        if batch:
            try:
                self._process_batch(batch)
            except Exception as e:
                log('error', f"后台处理线程错误: {str(e)}")

    def _apply_update(self, api_key, model, tokens):
        """Add one call to every counter; caller must hold ``_counters_lock``."""
        self.api_key_counts[api_key] += 1
        self.model_counts[model] += 1
        self.api_model_counts[api_key][model] += 1
        self.api_key_tokens[api_key] += tokens
        self.model_tokens[model] += tokens
        self.api_model_tokens[api_key][model] += tokens

    def _process_batch(self, batch):
        """Apply a batch of ``(api_key, model, tokens)`` updates to the counters."""
        with self._counters_lock:
            for api_key, model, tokens in batch:
                self._apply_update(api_key, model, tokens)

    async def update_stats(self, api_key, model, tokens=0):
        """Record one API call.

        Args:
            api_key: The API key that served the call.
            model: Name of the model that was called.
            tokens: Tokens consumed by the call; ``None`` is treated as 0.
        """
        if tokens is None:
            # Counter arithmetic below cannot add None.
            tokens = 0

        if self.enable_background and not self._stop_event.is_set():
            # Hand the counter update to the worker thread.
            self._update_queue.put((api_key, model, tokens))
        else:
            # No running worker: update the counters inline.
            with self._counters_lock:
                self._apply_update(api_key, model, tokens)

        # Time-series bucket for the current minute.
        now = datetime.now()
        minute_ts = self._get_minute_timestamp(now)
        with self._time_series_lock:
            bucket = self.time_buckets.setdefault(
                minute_ts, {"calls": 0, "tokens": 0}
            )
            bucket["calls"] += 1
            bucket["tokens"] += tokens
            self.current_minute = minute_ts

        # Rolling list of recent calls, trimmed to ``max_recent_calls``.
        with self._recent_calls_lock:
            self.recent_calls.append({
                'api_key': api_key,
                'model': model,
                'timestamp': now,
                'tokens': tokens
            })
            excess = len(self.recent_calls) - self.max_recent_calls
            if excess > 0:
                del self.recent_calls[:excess]

        log_message = f"API调用已记录: 秘钥 '{api_key[:8]}', 模型 '{model}', 令牌: {tokens}"
        log('info', log_message)

    async def cleanup(self):
        """Delete time buckets older than 24 hours and record the cleanup time."""
        cutoff_ts = self._get_minute_timestamp(datetime.now() - timedelta(days=1))
        with self._time_series_lock:
            expired = [ts for ts in self.time_buckets if ts < cutoff_ts]
            for ts in expired:
                del self.time_buckets[ts]
        self.last_cleanup = time.time()

    async def maybe_cleanup(self, force=False):
        """Run ``cleanup`` if forced or if ``cleanup_interval`` hours have passed."""
        elapsed = time.time() - self.last_cleanup
        if force or elapsed > self.cleanup_interval * 3600:
            # cleanup() itself refreshes last_cleanup.
            await self.cleanup()

    async def get_api_key_usage(self, api_key, model=None):
        """Return the call count of ``api_key``, optionally for one ``model``.

        Unknown keys and models yield 0 and are not inserted into the
        counters as a side effect of the lookup.
        """
        with self._counters_lock:
            if model:
                per_model = self.api_model_counts.get(api_key)
                return per_model[model] if per_model is not None else 0
            return self.api_key_counts[api_key]

    def get_calls_last_24h(self):
        """Return the total number of calls held in the per-key counters.

        NOTE(review): these counters are only cleared by ``reset``; nothing in
        this class expires them after 24 hours, so the figure matches the name
        only if ``reset`` is scheduled externally -- confirm against callers.
        """
        with self._counters_lock:
            return sum(self.api_key_counts.values())

    def get_calls_last_hour(self, now=None):
        """Return the calls in buckets at or after the minute one hour before ``now``."""
        if now is None:
            now = datetime.now()
        hour_ago_ts = self._get_minute_timestamp(now - timedelta(hours=1))
        with self._time_series_lock:
            return sum(
                data["calls"]
                for ts, data in self.time_buckets.items()
                if ts >= hour_ago_ts
            )

    def get_calls_last_minute(self, now=None):
        """Return the calls in buckets at or after the minute one minute before ``now``."""
        if now is None:
            now = datetime.now()
        minute_ago_ts = self._get_minute_timestamp(now - timedelta(minutes=1))
        with self._time_series_lock:
            return sum(
                data["calls"]
                for ts, data in self.time_buckets.items()
                if ts >= minute_ago_ts
            )

    def get_time_series_data(self, minutes=30, now=None):
        """Return per-minute call and token series ending at ``now``.

        Args:
            minutes: How far back to go; the result has ``minutes + 1`` points
                (oldest first, the current minute last).
            now: Reference time; defaults to ``datetime.now()``.

        Returns:
            A ``(calls_series, tokens_series)`` tuple of lists whose items are
            ``{'time': 'HH:MM', 'value': int}`` dicts.
        """
        if now is None:
            now = datetime.now()
        empty_bucket = {"calls": 0, "tokens": 0}
        calls_series = []
        tokens_series = []
        with self._time_series_lock:
            for offset in range(minutes, -1, -1):
                minute_dt = now - timedelta(minutes=offset)
                bucket = self.time_buckets.get(
                    self._get_minute_timestamp(minute_dt), empty_bucket
                )
                label = minute_dt.strftime('%H:%M')
                calls_series.append({'time': label, 'value': bucket["calls"]})
                tokens_series.append({'time': label, 'value': bucket["tokens"]})
        return calls_series, tokens_series

    def get_api_key_stats(self, api_keys):
        """Return detailed statistics for each key in ``api_keys``.

        Each entry holds the first 8 characters of the key, its call and token
        totals, the configured daily limit, the usage percentage and a
        per-model breakdown.  Entries are sorted by usage percentage,
        highest first.  Keys without recorded calls are reported with zeros
        and are not inserted into the counters.
        """
        limit = settings.API_KEY_DAILY_LIMIT
        stats = []
        with self._counters_lock:
            for api_key in api_keys:
                calls_24h = self.api_key_counts[api_key]
                # .get() avoids creating defaultdict entries for unseen keys.
                per_model_calls = self.api_model_counts.get(api_key, {})
                per_model_tokens = self.api_model_tokens.get(api_key, {})
                model_stats = {
                    model: {
                        'calls': count,
                        'tokens': per_model_tokens.get(model, 0)
                    }
                    for model, count in per_model_calls.items()
                }
                usage_percent = (calls_24h / limit) * 100 if limit > 0 else 0
                stats.append({
                    'api_key': api_key[:8],
                    'calls_24h': calls_24h,
                    'total_tokens': self.api_key_tokens[api_key],
                    'limit': limit,
                    'usage_percent': round(usage_percent, 2),
                    'model_stats': model_stats
                })
        stats.sort(key=lambda entry: entry['usage_percent'], reverse=True)
        return stats

    async def reset(self):
        """Clear every counter, time bucket and recent-call record.

        In background mode, updates that are still queued when this runs are
        applied by the worker afterwards.
        """
        with self._counters_lock:
            self.api_key_counts.clear()
            self.model_counts.clear()
            self.api_model_counts.clear()
            self.api_key_tokens.clear()
            self.model_tokens.clear()
            self.api_model_tokens.clear()
        with self._time_series_lock:
            self.time_buckets.clear()
        with self._recent_calls_lock:
            self.recent_calls.clear()
        self.current_minute = self._get_minute_timestamp(datetime.now())
        self.last_cleanup = time.time()

    def _get_minute_timestamp(self, dt):
        """Return the POSIX timestamp of ``dt`` rounded down to a whole minute."""
        return int(dt.timestamp() // 60 * 60)
# Module-wide singleton instance. Constructed with the default arguments, so
# its background worker thread is started when this module is imported.
api_stats_manager = ApiStatsManager()
| # 兼容现有代码的函数 | |
# Strong references to in-flight cleanup tasks; the event loop itself only
# keeps weak references to tasks.
_pending_cleanup_tasks = set()


def clean_expired_stats(api_call_stats):
    """Trigger cleanup of expired statistics (legacy-compatible entry point).

    Args:
        api_call_stats: Ignored; accepted only so existing callers keep working.

    When called from a running event loop the cleanup is scheduled as a task
    on that loop.  When no loop is running in the current thread, the cleanup
    is run to completion before returning instead of raising ``RuntimeError``.
    """
    try:
        loop = asyncio.get_running_loop()
    except RuntimeError:
        # No running loop in this thread: run the coroutine synchronously.
        asyncio.run(api_stats_manager.cleanup())
        return

    task = loop.create_task(api_stats_manager.cleanup())
    _pending_cleanup_tasks.add(task)
    task.add_done_callback(_pending_cleanup_tasks.discard)
async def update_api_call_stats(api_call_stats, endpoint=None, model=None, token=None):
    """Record one API call through the shared manager (legacy-compatible).

    Args:
        api_call_stats: Ignored; accepted only for interface compatibility.
        endpoint: Passed to the manager as the API key of the call.
        model: Name of the model that was called.
        token: Tokens consumed by the call; ``None`` is recorded as 0.

    Nothing is recorded unless both ``endpoint`` and ``model`` are truthy.
    """
    if not (endpoint and model):
        return
    token_count = 0 if token is None else token
    await api_stats_manager.update_stats(endpoint, model, token_count)
async def get_api_key_usage(api_call_stats, api_key, model=None):
    """Return the call count for ``api_key`` (legacy-compatible).

    Args:
        api_call_stats: Ignored; accepted only for interface compatibility.
        api_key: The API key to look up.
        model: Optional model name to restrict the count to.
    """
    usage = await api_stats_manager.get_api_key_usage(api_key, model)
    return usage