""" Usage statistics module for tracking API calls per credential file. Uses the simpler logic: compare current time with next_reset_time. """ import os import time from datetime import datetime, timezone, timedelta from threading import Lock from typing import Dict, Any, Optional from config import get_credentials_dir, is_mongodb_mode from log import log from .state_manager import get_state_manager from .storage_adapter import get_storage_adapter def _get_next_utc_7am() -> datetime: """ Calculate the next UTC 07:00 time for quota reset. """ now = datetime.now(timezone.utc) today_7am = now.replace(hour=7, minute=0, second=0, microsecond=0) if now < today_7am: return today_7am else: return today_7am + timedelta(days=1) class UsageStats: """ Simplified usage statistics manager with clear reset logic. """ def __init__(self): self._lock = Lock() # 状态文件路径将在初始化时异步设置 self._state_file = None self._state_manager = None self._storage_adapter = None self._stats_cache: Dict[str, Dict[str, Any]] = {} self._initialized = False self._cache_dirty = False # 缓存脏标记,减少不必要的写入 self._last_save_time = 0 self._save_interval = 60 # 最多每分钟保存一次,减少I/O self._max_cache_size = 100 # 严格限制缓存大小 async def initialize(self): """Initialize the usage stats module.""" if self._initialized: return # 初始化存储适配器 self._storage_adapter = await get_storage_adapter() # 只在文件模式下创建本地状态文件 if not await is_mongodb_mode(): credentials_dir = await get_credentials_dir() self._state_file = os.path.join(credentials_dir, "creds_state.toml") self._state_manager = get_state_manager(self._state_file) await self._load_stats() self._initialized = True storage_type = "MongoDB" if await is_mongodb_mode() else "File" log.debug(f"Usage statistics module initialized with {storage_type} storage backend") def _normalize_filename(self, filename: str) -> str: """Normalize filename to relative path for consistent storage.""" if not filename: return "" if os.path.sep not in filename and "/" not in filename: return filename return os.path.basename(filename) def _is_gemini_2_5_pro(self, model_name: str) -> bool: """ Check if model is gemini-2.5-pro variant (including prefixes and suffixes). """ if not model_name: return False try: from config import get_base_model_name, get_base_model_from_feature_model # Remove feature prefixes (流式抗截断/, 假流式/) base_with_suffix = get_base_model_from_feature_model(model_name) # Remove thinking/search suffixes (-maxthinking, -nothinking, -search) pure_base_model = get_base_model_name(base_with_suffix) # Check if the pure base model is exactly "gemini-2.5-pro" return pure_base_model == "gemini-2.5-pro" except ImportError: # Fallback logic if config import fails clean_model = model_name for prefix in ["流式抗截断/", "假流式/"]: if clean_model.startswith(prefix): clean_model = clean_model[len(prefix):] break for suffix in ["-maxthinking", "-nothinking", "-search"]: if clean_model.endswith(suffix): clean_model = clean_model[:-len(suffix)] break return clean_model == "gemini-2.5-pro" async def _load_stats(self): """Load statistics from unified storage""" try: # 从统一存储获取所有使用统计,添加超时机制防止卡死 import asyncio async def load_stats_with_timeout(): all_usage_stats = await self._storage_adapter.get_all_usage_stats() log.debug(f"Processing {len(all_usage_stats)} usage statistics items...") # 直接处理统计数据 stats_cache = {} processed_count = 0 for filename, stats_data in all_usage_stats.items(): if isinstance(stats_data, dict): normalized_filename = self._normalize_filename(filename) # 提取使用统计字段 usage_data = { "gemini_2_5_pro_calls": stats_data.get("gemini_2_5_pro_calls", 0), "total_calls": stats_data.get("total_calls", 0), "next_reset_time": stats_data.get("next_reset_time"), "daily_limit_gemini_2_5_pro": stats_data.get("daily_limit_gemini_2_5_pro", 100), "daily_limit_total": stats_data.get("daily_limit_total", 1000) } # 只加载有实际使用数据的统计,或者有reset时间的 if (usage_data.get("gemini_2_5_pro_calls", 0) > 0 or usage_data.get("total_calls", 0) > 0 or usage_data.get("next_reset_time")): stats_cache[normalized_filename] = usage_data processed_count += 1 return stats_cache, processed_count # 设置15秒超时防止卡死 try: self._stats_cache, processed_count = await asyncio.wait_for( load_stats_with_timeout(), timeout=15.0 ) log.debug(f"Loaded usage statistics for {processed_count} credential files") except asyncio.TimeoutError: log.error("Loading usage statistics timed out after 30 seconds, using empty cache") self._stats_cache = {} return except Exception as e: log.error(f"Failed to load usage statistics: {e}") self._stats_cache = {} async def _save_stats(self): """Save statistics to unified storage.""" current_time = time.time() # 使用脏标记和时间间隔控制,减少不必要的写入 if not self._cache_dirty or (current_time - self._last_save_time < self._save_interval): return try: # 批量更新使用统计到存储适配器 log.debug(f"Saving {len(self._stats_cache)} usage statistics items...") saved_count = 0 for filename, stats in self._stats_cache.items(): try: stats_data = { "gemini_2_5_pro_calls": stats.get("gemini_2_5_pro_calls", 0), "total_calls": stats.get("total_calls", 0), "next_reset_time": stats.get("next_reset_time"), "daily_limit_gemini_2_5_pro": stats.get("daily_limit_gemini_2_5_pro", 100), "daily_limit_total": stats.get("daily_limit_total", 1000) } success = await self._storage_adapter.update_usage_stats(filename, stats_data) if success: saved_count += 1 except Exception as e: log.error(f"Failed to save stats for {filename}: {e}") continue self._cache_dirty = False # 清除脏标记 self._last_save_time = current_time log.debug(f"Successfully saved {saved_count}/{len(self._stats_cache)} usage statistics to unified storage") except Exception as e: log.error(f"Failed to save usage statistics: {e}") def _get_or_create_stats(self, filename: str) -> Dict[str, Any]: """Get or create statistics entry for a credential file.""" normalized_filename = self._normalize_filename(filename) if normalized_filename not in self._stats_cache: # 严格控制缓存大小 - 超过限制时删除最旧的条目 if len(self._stats_cache) >= self._max_cache_size: # 删除最旧的统计数据(基于next_reset_time或没有该字段的) oldest_key = min(self._stats_cache.keys(), key=lambda k: self._stats_cache[k].get('next_reset_time', '')) del self._stats_cache[oldest_key] self._cache_dirty = True log.debug(f"Removed oldest usage stats cache entry: {oldest_key}") next_reset = _get_next_utc_7am() self._stats_cache[normalized_filename] = { "gemini_2_5_pro_calls": 0, "total_calls": 0, "next_reset_time": next_reset.isoformat(), "daily_limit_gemini_2_5_pro": 100, "daily_limit_total": 1000 } self._cache_dirty = True # 标记缓存已修改 return self._stats_cache[normalized_filename] def _check_and_reset_daily_quota(self, stats: Dict[str, Any]) -> bool: """ Simple reset logic: if current time >= next_reset_time, then reset. """ try: next_reset_str = stats.get("next_reset_time") if not next_reset_str: # No next reset time recorded, set it up next_reset = _get_next_utc_7am() stats["next_reset_time"] = next_reset.isoformat() return False next_reset = datetime.fromisoformat(next_reset_str) now = datetime.now(timezone.utc) # Simple comparison: if current time >= next reset time, then reset if now >= next_reset: old_gemini_calls = stats.get("gemini_2_5_pro_calls", 0) old_total_calls = stats.get("total_calls", 0) # Reset counters and set new next reset time new_next_reset = _get_next_utc_7am() stats.update({ "gemini_2_5_pro_calls": 0, "total_calls": 0, "next_reset_time": new_next_reset.isoformat() }) self._cache_dirty = True # 标记缓存已修改 log.info(f"Daily quota reset performed. Previous stats - Gemini 2.5 Pro: {old_gemini_calls}, Total: {old_total_calls}") return True return False except Exception as e: log.error(f"Error in daily quota reset check: {e}") return False async def record_successful_call(self, filename: str, model_name: str): """Record a successful API call for statistics.""" if not self._initialized: await self.initialize() with self._lock: try: normalized_filename = self._normalize_filename(filename) stats = self._get_or_create_stats(normalized_filename) # Check and perform daily reset if needed reset_performed = self._check_and_reset_daily_quota(stats) # Increment counters is_gemini_2_5_pro = self._is_gemini_2_5_pro(model_name) stats["total_calls"] += 1 if is_gemini_2_5_pro: stats["gemini_2_5_pro_calls"] += 1 self._cache_dirty = True # 标记缓存已修改 log.debug(f"Usage recorded - File: {normalized_filename}, Model: {model_name}, " f"Gemini 2.5 Pro: {stats['gemini_2_5_pro_calls']}/{stats.get('daily_limit_gemini_2_5_pro', 100)}, " f"Total: {stats['total_calls']}/{stats.get('daily_limit_total', 1000)}") if reset_performed: log.info(f"Daily quota was reset for {normalized_filename}") except Exception as e: log.error(f"Failed to record usage statistics: {e}") # Save stats asynchronously try: await self._save_stats() except Exception as e: log.error(f"Failed to save usage statistics after recording: {e}") async def get_usage_stats(self, filename: str = None) -> Dict[str, Any]: """Get usage statistics.""" if not self._initialized: await self.initialize() with self._lock: if filename: normalized_filename = self._normalize_filename(filename) stats = self._get_or_create_stats(normalized_filename) # Check for daily reset before returning stats self._check_and_reset_daily_quota(stats) return { "filename": normalized_filename, "gemini_2_5_pro_calls": stats.get("gemini_2_5_pro_calls", 0), "total_calls": stats.get("total_calls", 0), "daily_limit_gemini_2_5_pro": stats.get("daily_limit_gemini_2_5_pro", 100), "daily_limit_total": stats.get("daily_limit_total", 1000), "next_reset_time": stats.get("next_reset_time") } else: # Return all statistics all_stats = {} for filename, stats in self._stats_cache.items(): # Check for daily reset for each file self._check_and_reset_daily_quota(stats) all_stats[filename] = { "gemini_2_5_pro_calls": stats.get("gemini_2_5_pro_calls", 0), "total_calls": stats.get("total_calls", 0), "daily_limit_gemini_2_5_pro": stats.get("daily_limit_gemini_2_5_pro", 100), "daily_limit_total": stats.get("daily_limit_total", 1000), "next_reset_time": stats.get("next_reset_time") } return all_stats async def get_aggregated_stats(self) -> Dict[str, Any]: """Get aggregated statistics across all credential files.""" if not self._initialized: await self.initialize() all_stats = await self.get_usage_stats() total_gemini_2_5_pro = 0 total_all_models = 0 total_files = len(all_stats) for stats in all_stats.values(): total_gemini_2_5_pro += stats["gemini_2_5_pro_calls"] total_all_models += stats["total_calls"] return { "total_files": total_files, "total_gemini_2_5_pro_calls": total_gemini_2_5_pro, "total_all_model_calls": total_all_models, "avg_gemini_2_5_pro_per_file": total_gemini_2_5_pro / max(total_files, 1), "avg_total_per_file": total_all_models / max(total_files, 1), "next_reset_time": _get_next_utc_7am().isoformat() } async def update_daily_limits(self, filename: str, gemini_2_5_pro_limit: int = None, total_limit: int = None): """Update daily limits for a specific credential file.""" if not self._initialized: await self.initialize() with self._lock: try: normalized_filename = self._normalize_filename(filename) stats = self._get_or_create_stats(normalized_filename) if gemini_2_5_pro_limit is not None: stats["daily_limit_gemini_2_5_pro"] = gemini_2_5_pro_limit if total_limit is not None: stats["daily_limit_total"] = total_limit log.info(f"Updated daily limits for {normalized_filename}: " f"Gemini 2.5 Pro = {stats.get('daily_limit_gemini_2_5_pro', 100)}, " f"Total = {stats.get('daily_limit_total', 1000)}") except Exception as e: log.error(f"Failed to update daily limits: {e}") raise await self._save_stats() async def reset_stats(self, filename: str = None): """Reset usage statistics.""" if not self._initialized: await self.initialize() with self._lock: if filename: normalized_filename = self._normalize_filename(filename) if normalized_filename in self._stats_cache: # Manual reset: reset counters and set new next reset time next_reset = _get_next_utc_7am() self._stats_cache[normalized_filename].update({ "gemini_2_5_pro_calls": 0, "total_calls": 0, "next_reset_time": next_reset.isoformat() }) log.info(f"Reset usage statistics for {normalized_filename}") else: # Reset all statistics next_reset = _get_next_utc_7am() for filename, stats in self._stats_cache.items(): stats.update({ "gemini_2_5_pro_calls": 0, "total_calls": 0, "next_reset_time": next_reset.isoformat() }) log.info("Reset usage statistics for all credential files") await self._save_stats() # Global instance _usage_stats_instance: Optional[UsageStats] = None async def get_usage_stats_instance() -> UsageStats: """Get the global usage statistics instance.""" global _usage_stats_instance if _usage_stats_instance is None: _usage_stats_instance = UsageStats() await _usage_stats_instance.initialize() return _usage_stats_instance async def record_successful_call(filename: str, model_name: str): """Convenience function to record a successful API call.""" stats = await get_usage_stats_instance() await stats.record_successful_call(filename, model_name) async def get_usage_stats(filename: str = None) -> Dict[str, Any]: """Convenience function to get usage statistics.""" stats = await get_usage_stats_instance() return await stats.get_usage_stats(filename) async def get_aggregated_stats() -> Dict[str, Any]: """Convenience function to get aggregated statistics.""" stats = await get_usage_stats_instance() return await stats.get_aggregated_stats()