# gcli2api/src/usage_stats.py
"""
Usage statistics module for tracking API calls per credential file.
Uses simple reset logic: compare the current time with next_reset_time.
"""
import asyncio
import os
import time
from datetime import datetime, timezone, timedelta
from threading import Lock
from typing import Dict, Any, Optional
from config import get_credentials_dir, is_mongodb_mode
from log import log
from .state_manager import get_state_manager
from .storage_adapter import get_storage_adapter
def _get_next_utc_7am() -> datetime:
"""
Calculate the next UTC 07:00 time for quota reset.
"""
now = datetime.now(timezone.utc)
today_7am = now.replace(hour=7, minute=0, second=0, microsecond=0)
if now < today_7am:
return today_7am
else:
return today_7am + timedelta(days=1)
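# Illustrative boundary behavior (hypothetical timestamps, derived from the logic above):
#   now = 2024-01-01T06:59:00Z  ->  returns 2024-01-01T07:00:00Z (later today)
#   now = 2024-01-01T07:00:00Z  ->  rolls over to 2024-01-02T07:00:00Z (tomorrow)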
class UsageStats:
"""
Simplified usage statistics manager with clear reset logic.
"""
def __init__(self):
self._lock = Lock()
        # The state-file path is set asynchronously during initialize()
self._state_file = None
self._state_manager = None
self._storage_adapter = None
self._stats_cache: Dict[str, Dict[str, Any]] = {}
self._initialized = False
        self._cache_dirty = False  # dirty flag; avoids unnecessary writes
self._last_save_time = 0
        self._save_interval = 60  # save at most once per minute to reduce I/O
        self._max_cache_size = 100  # hard cap on the in-memory cache size
async def initialize(self):
"""Initialize the usage stats module."""
if self._initialized:
return
        # Initialize the storage adapter
self._storage_adapter = await get_storage_adapter()
        # Only create a local state file in file-based storage mode
if not await is_mongodb_mode():
credentials_dir = await get_credentials_dir()
self._state_file = os.path.join(credentials_dir, "creds_state.toml")
self._state_manager = get_state_manager(self._state_file)
await self._load_stats()
self._initialized = True
storage_type = "MongoDB" if await is_mongodb_mode() else "File"
log.debug(f"Usage statistics module initialized with {storage_type} storage backend")
def _normalize_filename(self, filename: str) -> str:
"""Normalize filename to relative path for consistent storage."""
if not filename:
return ""
if os.path.sep not in filename and "/" not in filename:
return filename
return os.path.basename(filename)
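    # Illustrative normalization (hypothetical paths):
    #   "creds.json"              -> "creds.json"  (already a bare name)
    #   "/abs/path/to/creds.json" -> "creds.json"  (reduced to the basename)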
def _is_gemini_2_5_pro(self, model_name: str) -> bool:
"""
Check if model is gemini-2.5-pro variant (including prefixes and suffixes).
"""
if not model_name:
return False
try:
from config import get_base_model_name, get_base_model_from_feature_model
# Remove feature prefixes (流式抗截断/, 假流式/)
base_with_suffix = get_base_model_from_feature_model(model_name)
# Remove thinking/search suffixes (-maxthinking, -nothinking, -search)
pure_base_model = get_base_model_name(base_with_suffix)
# Check if the pure base model is exactly "gemini-2.5-pro"
return pure_base_model == "gemini-2.5-pro"
except ImportError:
# Fallback logic if config import fails
clean_model = model_name
for prefix in ["流式抗截断/", "假流式/"]:
if clean_model.startswith(prefix):
clean_model = clean_model[len(prefix):]
break
for suffix in ["-maxthinking", "-nothinking", "-search"]:
if clean_model.endswith(suffix):
clean_model = clean_model[:-len(suffix)]
break
return clean_model == "gemini-2.5-pro"
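    # Illustrative matches (hypothetical model names, per the fallback logic above):
    #   "gemini-2.5-pro"              -> True
    #   "假流式/gemini-2.5-pro-search" -> True  (prefix and suffix stripped)
    #   "gemini-2.5-flash"            -> False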
async def _load_stats(self):
"""Load statistics from unified storage"""
try:
            # Fetch all usage stats from unified storage, with a timeout to avoid hanging
async def load_stats_with_timeout():
all_usage_stats = await self._storage_adapter.get_all_usage_stats()
log.debug(f"Processing {len(all_usage_stats)} usage statistics items...")
                # Process the stats data directly
stats_cache = {}
processed_count = 0
for filename, stats_data in all_usage_stats.items():
if isinstance(stats_data, dict):
normalized_filename = self._normalize_filename(filename)
                        # Extract the usage-stat fields
usage_data = {
"gemini_2_5_pro_calls": stats_data.get("gemini_2_5_pro_calls", 0),
"total_calls": stats_data.get("total_calls", 0),
"next_reset_time": stats_data.get("next_reset_time"),
"daily_limit_gemini_2_5_pro": stats_data.get("daily_limit_gemini_2_5_pro", 100),
"daily_limit_total": stats_data.get("daily_limit_total", 1000)
}
                        # Only load entries with actual usage data or a recorded reset time
if (usage_data.get("gemini_2_5_pro_calls", 0) > 0 or
usage_data.get("total_calls", 0) > 0 or
usage_data.get("next_reset_time")):
stats_cache[normalized_filename] = usage_data
processed_count += 1
return stats_cache, processed_count
            # 15-second timeout so a stuck backend cannot hang startup
try:
self._stats_cache, processed_count = await asyncio.wait_for(
load_stats_with_timeout(), timeout=15.0
)
log.debug(f"Loaded usage statistics for {processed_count} credential files")
except asyncio.TimeoutError:
log.error("Loading usage statistics timed out after 30 seconds, using empty cache")
self._stats_cache = {}
return
except Exception as e:
log.error(f"Failed to load usage statistics: {e}")
self._stats_cache = {}
async def _save_stats(self):
"""Save statistics to unified storage."""
current_time = time.time()
        # Skip the write unless the cache is dirty and the save interval has elapsed
if not self._cache_dirty or (current_time - self._last_save_time < self._save_interval):
return
try:
            # Batch-update usage stats through the storage adapter
log.debug(f"Saving {len(self._stats_cache)} usage statistics items...")
saved_count = 0
for filename, stats in self._stats_cache.items():
try:
stats_data = {
"gemini_2_5_pro_calls": stats.get("gemini_2_5_pro_calls", 0),
"total_calls": stats.get("total_calls", 0),
"next_reset_time": stats.get("next_reset_time"),
"daily_limit_gemini_2_5_pro": stats.get("daily_limit_gemini_2_5_pro", 100),
"daily_limit_total": stats.get("daily_limit_total", 1000)
}
success = await self._storage_adapter.update_usage_stats(filename, stats_data)
if success:
saved_count += 1
except Exception as e:
log.error(f"Failed to save stats for {filename}: {e}")
continue
            self._cache_dirty = False  # clear the dirty flag
self._last_save_time = current_time
log.debug(f"Successfully saved {saved_count}/{len(self._stats_cache)} usage statistics to unified storage")
except Exception as e:
log.error(f"Failed to save usage statistics: {e}")
def _get_or_create_stats(self, filename: str) -> Dict[str, Any]:
"""Get or create statistics entry for a credential file."""
normalized_filename = self._normalize_filename(filename)
if normalized_filename not in self._stats_cache:
            # Enforce the cache-size cap: evict the entry with the earliest
            # next_reset_time (entries missing the field sort first)
            if len(self._stats_cache) >= self._max_cache_size:
                oldest_key = min(self._stats_cache.keys(),
                                 key=lambda k: self._stats_cache[k].get('next_reset_time', ''))
del self._stats_cache[oldest_key]
self._cache_dirty = True
log.debug(f"Removed oldest usage stats cache entry: {oldest_key}")
next_reset = _get_next_utc_7am()
self._stats_cache[normalized_filename] = {
"gemini_2_5_pro_calls": 0,
"total_calls": 0,
"next_reset_time": next_reset.isoformat(),
"daily_limit_gemini_2_5_pro": 100,
"daily_limit_total": 1000
}
            self._cache_dirty = True  # mark the cache as modified
return self._stats_cache[normalized_filename]
def _check_and_reset_daily_quota(self, stats: Dict[str, Any]) -> bool:
"""
Simple reset logic: if current time >= next_reset_time, then reset.
"""
try:
next_reset_str = stats.get("next_reset_time")
if not next_reset_str:
# No next reset time recorded, set it up
next_reset = _get_next_utc_7am()
stats["next_reset_time"] = next_reset.isoformat()
return False
next_reset = datetime.fromisoformat(next_reset_str)
now = datetime.now(timezone.utc)
# Simple comparison: if current time >= next reset time, then reset
if now >= next_reset:
old_gemini_calls = stats.get("gemini_2_5_pro_calls", 0)
old_total_calls = stats.get("total_calls", 0)
# Reset counters and set new next reset time
new_next_reset = _get_next_utc_7am()
stats.update({
"gemini_2_5_pro_calls": 0,
"total_calls": 0,
"next_reset_time": new_next_reset.isoformat()
})
                self._cache_dirty = True  # mark the cache as modified
log.info(f"Daily quota reset performed. Previous stats - Gemini 2.5 Pro: {old_gemini_calls}, Total: {old_total_calls}")
return True
return False
except Exception as e:
log.error(f"Error in daily quota reset check: {e}")
return False
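    # Illustrative reset (hypothetical timestamps): with
    #   next_reset_time = "2024-01-01T07:00:00+00:00" and now = 2024-01-01T07:30Z,
    # both counters are zeroed and next_reset_time advances to 2024-01-02T07:00:00+00:00.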
async def record_successful_call(self, filename: str, model_name: str):
"""Record a successful API call for statistics."""
if not self._initialized:
await self.initialize()
with self._lock:
try:
normalized_filename = self._normalize_filename(filename)
stats = self._get_or_create_stats(normalized_filename)
# Check and perform daily reset if needed
reset_performed = self._check_and_reset_daily_quota(stats)
# Increment counters
is_gemini_2_5_pro = self._is_gemini_2_5_pro(model_name)
stats["total_calls"] += 1
if is_gemini_2_5_pro:
stats["gemini_2_5_pro_calls"] += 1
                self._cache_dirty = True  # mark the cache as modified
log.debug(f"Usage recorded - File: {normalized_filename}, Model: {model_name}, "
f"Gemini 2.5 Pro: {stats['gemini_2_5_pro_calls']}/{stats.get('daily_limit_gemini_2_5_pro', 100)}, "
f"Total: {stats['total_calls']}/{stats.get('daily_limit_total', 1000)}")
if reset_performed:
log.info(f"Daily quota was reset for {normalized_filename}")
except Exception as e:
log.error(f"Failed to record usage statistics: {e}")
# Save stats asynchronously
try:
await self._save_stats()
except Exception as e:
log.error(f"Failed to save usage statistics after recording: {e}")
    async def get_usage_stats(self, filename: Optional[str] = None) -> Dict[str, Any]:
"""Get usage statistics."""
if not self._initialized:
await self.initialize()
with self._lock:
if filename:
normalized_filename = self._normalize_filename(filename)
stats = self._get_or_create_stats(normalized_filename)
# Check for daily reset before returning stats
self._check_and_reset_daily_quota(stats)
return {
"filename": normalized_filename,
"gemini_2_5_pro_calls": stats.get("gemini_2_5_pro_calls", 0),
"total_calls": stats.get("total_calls", 0),
"daily_limit_gemini_2_5_pro": stats.get("daily_limit_gemini_2_5_pro", 100),
"daily_limit_total": stats.get("daily_limit_total", 1000),
"next_reset_time": stats.get("next_reset_time")
}
else:
                # Return all statistics
                all_stats = {}
                for fname, stats in self._stats_cache.items():
                    # Check for daily reset for each file
                    self._check_and_reset_daily_quota(stats)
                    all_stats[fname] = {
"gemini_2_5_pro_calls": stats.get("gemini_2_5_pro_calls", 0),
"total_calls": stats.get("total_calls", 0),
"daily_limit_gemini_2_5_pro": stats.get("daily_limit_gemini_2_5_pro", 100),
"daily_limit_total": stats.get("daily_limit_total", 1000),
"next_reset_time": stats.get("next_reset_time")
}
return all_stats
async def get_aggregated_stats(self) -> Dict[str, Any]:
"""Get aggregated statistics across all credential files."""
if not self._initialized:
await self.initialize()
all_stats = await self.get_usage_stats()
total_gemini_2_5_pro = 0
total_all_models = 0
total_files = len(all_stats)
for stats in all_stats.values():
total_gemini_2_5_pro += stats["gemini_2_5_pro_calls"]
total_all_models += stats["total_calls"]
return {
"total_files": total_files,
"total_gemini_2_5_pro_calls": total_gemini_2_5_pro,
"total_all_model_calls": total_all_models,
"avg_gemini_2_5_pro_per_file": total_gemini_2_5_pro / max(total_files, 1),
"avg_total_per_file": total_all_models / max(total_files, 1),
"next_reset_time": _get_next_utc_7am().isoformat()
}
    async def update_daily_limits(self, filename: str, gemini_2_5_pro_limit: Optional[int] = None,
                                  total_limit: Optional[int] = None):
"""Update daily limits for a specific credential file."""
if not self._initialized:
await self.initialize()
with self._lock:
try:
normalized_filename = self._normalize_filename(filename)
stats = self._get_or_create_stats(normalized_filename)
                if gemini_2_5_pro_limit is not None:
                    stats["daily_limit_gemini_2_5_pro"] = gemini_2_5_pro_limit
                if total_limit is not None:
                    stats["daily_limit_total"] = total_limit
                self._cache_dirty = True  # mark the cache as modified so the save goes through
log.info(f"Updated daily limits for {normalized_filename}: "
f"Gemini 2.5 Pro = {stats.get('daily_limit_gemini_2_5_pro', 100)}, "
f"Total = {stats.get('daily_limit_total', 1000)}")
except Exception as e:
log.error(f"Failed to update daily limits: {e}")
raise
await self._save_stats()
    async def reset_stats(self, filename: Optional[str] = None):
"""Reset usage statistics."""
if not self._initialized:
await self.initialize()
with self._lock:
if filename:
normalized_filename = self._normalize_filename(filename)
if normalized_filename in self._stats_cache:
# Manual reset: reset counters and set new next reset time
next_reset = _get_next_utc_7am()
self._stats_cache[normalized_filename].update({
"gemini_2_5_pro_calls": 0,
"total_calls": 0,
"next_reset_time": next_reset.isoformat()
                    })
                    self._cache_dirty = True  # mark the cache as modified
log.info(f"Reset usage statistics for {normalized_filename}")
else:
# Reset all statistics
next_reset = _get_next_utc_7am()
                for stats in self._stats_cache.values():
                    stats.update({
                        "gemini_2_5_pro_calls": 0,
                        "total_calls": 0,
                        "next_reset_time": next_reset.isoformat()
                    })
                self._cache_dirty = True  # mark the cache as modified
log.info("Reset usage statistics for all credential files")
await self._save_stats()
# Global instance
_usage_stats_instance: Optional[UsageStats] = None
async def get_usage_stats_instance() -> UsageStats:
"""Get the global usage statistics instance."""
global _usage_stats_instance
if _usage_stats_instance is None:
_usage_stats_instance = UsageStats()
await _usage_stats_instance.initialize()
return _usage_stats_instance
async def record_successful_call(filename: str, model_name: str):
"""Convenience function to record a successful API call."""
stats = await get_usage_stats_instance()
await stats.record_successful_call(filename, model_name)
async def get_usage_stats(filename: Optional[str] = None) -> Dict[str, Any]:
"""Convenience function to get usage statistics."""
stats = await get_usage_stats_instance()
return await stats.get_usage_stats(filename)
async def get_aggregated_stats() -> Dict[str, Any]:
"""Convenience function to get aggregated statistics."""
stats = await get_usage_stats_instance()
return await stats.get_aggregated_stats()
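# Minimal usage sketch, kept as a comment so importing this module stays side-effect
# free (assumes a configured storage backend; the credential filename is illustrative):
#
#   import asyncio
#
#   async def _demo():
#       await record_successful_call("example-cred.json", "gemini-2.5-pro")
#       print(await get_usage_stats("example-cred.json"))
#       print(await get_aggregated_stats())
#
#   asyncio.run(_demo())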