| |
| """ |
| 全局跟踪模块。 |
| 定义用于在内存中跟踪 API 使用情况、速率限制、缓存统计、Key 分数等的全局变量和线程锁。 |
| 提供更新这些统计数据的辅助函数。 |
| 注意:这些数据是内存中的,应用重启后会丢失(除非有持久化机制)。 |
| """ |
| import logging |
| import threading |
| import time |
| from collections import defaultdict, Counter |
| from typing import Dict, Any, Optional, List |
|
|
| logger = logging.getLogger(__name__) |
|
|
| |
|
|
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| usage_data: Dict[str, Dict[str, Dict[str, Any]]] = defaultdict( |
| lambda: defaultdict(lambda: { |
| 'rpm_count': 0, |
| 'rpm_timestamp': 0.0, |
| 'rpd_count': 0, |
| 'tpd_input_count': 0, |
| 'tpm_input_count': 0, |
| 'tpm_input_timestamp': 0.0, |
| 'last_request_timestamp': 0.0 |
| }) |
| ) |
| |
| usage_lock = threading.Lock() |
|
|
|
|
| |
|
|
| |
| |
| key_scores_cache: Dict[str, Dict[str, float]] = defaultdict(lambda: defaultdict(float)) |
| |
| cache_lock = threading.Lock() |
| |
| |
| cache_last_updated: Dict[str, float] = defaultdict(float) |
|
|
| |
|
|
| |
| |
| daily_rpd_totals: Dict[str, int] = defaultdict(int) |
| |
| daily_totals_lock = threading.Lock() |
|
|
| |
|
|
| |
| |
| ip_daily_input_token_counts: Dict[str, Counter[str]] = defaultdict(Counter) |
| |
| ip_input_token_counts_lock = threading.Lock() |
|
|
| |
|
|
| |
| cache_hit_count: int = 0 |
| |
| cache_miss_count: int = 0 |
| |
| total_tokens_saved: int = 0 |
| |
| cache_tracking_lock = threading.Lock() |
|
|
| |
| |
| key_selection_total_attempts: int = 0 |
| key_selection_successful_selections: int = 0 |
| key_selection_failed_selections: int = 0 |
| key_selection_failure_reasons: Dict[str, int] = defaultdict(int) |
| |
|
|
| |
|
|
| def increment_cache_hit_count(): |
| """(线程安全) 增加缓存命中计数。""" |
| with cache_tracking_lock: |
| global cache_hit_count |
| cache_hit_count += 1 |
| logger.debug(f"缓存命中计数: {cache_hit_count}") |
|
|
| def increment_cache_miss_count(): |
| """(线程安全) 增加缓存未命中计数。""" |
| with cache_tracking_lock: |
| global cache_miss_count |
| cache_miss_count += 1 |
| logger.debug(f"缓存未命中计数: {cache_miss_count}") |
|
|
| def add_tokens_saved(tokens: int): |
| """(线程安全) 增加通过缓存节省的总 Token 数。""" |
| if tokens > 0: |
| with cache_tracking_lock: |
| global total_tokens_saved |
| total_tokens_saved += tokens |
| logger.debug(f"节省的总 token 数: {total_tokens_saved}") |
|
|
| def track_cache_hit(request_id: str, cache_id: str, tokens_saved: int): |
| """ |
| (线程安全) 记录一次缓存命中事件,并更新相关统计数据。 |
| |
| Args: |
| request_id (str): 相关的请求 ID。 |
| cache_id (str): 命中的缓存条目的 ID (Gemini Cache ID 或数据库 ID)。 |
| tokens_saved (int): 本次命中估算节省的 Token 数量。 |
| """ |
| with cache_tracking_lock: |
| global cache_hit_count, total_tokens_saved |
| cache_hit_count += 1 |
| total_tokens_saved += tokens_saved |
| |
| logger.info(f"缓存命中: Request ID: {request_id}, Cache ID: {cache_id}, 节省 Token: {tokens_saved}") |
|
|
| def track_cache_miss(request_id: str, content_hash: str): |
| """ |
| (线程安全) 记录一次缓存未命中事件,并更新相关统计数据。 |
| |
| Args: |
| request_id (str): 相关的请求 ID。 |
| content_hash (str): 未命中内容的哈希值。 |
| """ |
| with cache_tracking_lock: |
| global cache_miss_count |
| cache_miss_count += 1 |
| |
| logger.info(f"缓存未命中: Request ID: {request_id}, Content Hash: {content_hash[:10]}...") |
|
|
| |
| RPM_WINDOW_SECONDS = 60 |
| TPM_WINDOW_SECONDS = 60 |
| CACHE_REFRESH_INTERVAL_SECONDS = 300 |
|
|
| |
| def update_cache_timestamp(model_name: str): |
| """ |
| (线程安全) 更新指定模型的分数缓存的最后更新时间戳为当前时间。 |
| |
| Args: |
| model_name (str): 需要更新时间戳的模型名称。 |
| """ |
| global cache_last_updated |
| with cache_lock: |
| cache_last_updated[model_name] = time.time() |
|
|