GAP / app /core /tracking.py
misonL's picture
Upload 52 files
e82bac2 verified
Raw
History Blame Contribute Delete
7.62 kB
# -*- coding: utf-8 -*-
"""
全局跟踪模块。
定义用于在内存中跟踪 API 使用情况、速率限制、缓存统计、Key 分数等的全局变量和线程锁。
提供更新这些统计数据的辅助函数。
注意:这些数据是内存中的,应用重启后会丢失(除非有持久化机制)。
"""
import logging # 导入日志模块
import threading # 导入线程模块,用于创建锁保护共享数据
import time # 导入时间模块,用于时间戳
from collections import defaultdict, Counter # 导入 defaultdict 和 Counter,方便数据统计
from typing import Dict, Any, Optional, List # 导入类型提示
logger = logging.getLogger(__name__) # 获取当前模块的 logger 实例
# --- 使用情况跟踪数据结构 ---
# `usage_data`: 存储每个 API Key 对每个模型的使用情况统计。
# 结构: {api_key: {model_name: {统计项: 值}}}
# 统计项包括:
# - 'rpm_count': 当前 RPM (每分钟请求数) 窗口内的请求计数。
# - 'rpm_timestamp': 当前 RPM 窗口的开始时间戳。
# - 'rpd_count': 当日 (太平洋时间) 的总请求计数。
# - 'tpd_input_count': 当日 (太平洋时间) 的总输入 Token 计数。
# - 'tpm_input_count': 当前 TPM (每分钟输入 Token 数) 窗口内的输入 Token 计数。
# - 'tpm_input_timestamp': 当前 TPM 输入窗口的开始时间戳。
# - 'last_request_timestamp': 此 Key-模型组合最后一次被请求的时间戳 (用于 Key 选择策略)。
usage_data: Dict[str, Dict[str, Dict[str, Any]]] = defaultdict(
lambda: defaultdict(lambda: { # 使用嵌套 defaultdict 简化初始化
'rpm_count': 0,
'rpm_timestamp': 0.0,
'rpd_count': 0,
'tpd_input_count': 0,
'tpm_input_count': 0,
'tpm_input_timestamp': 0.0,
'last_request_timestamp': 0.0
})
)
# `usage_lock`: 用于保护对 `usage_data` 并发访问的线程锁。
usage_lock = threading.Lock()
# --- Key 健康度评分缓存 ---
# `key_scores_cache`: 存储每个 API Key 对每个模型的健康度评分(或其他评分)。
# 结构: {model_name: {api_key: score}}
key_scores_cache: Dict[str, Dict[str, float]] = defaultdict(lambda: defaultdict(float))
# `cache_lock`: 用于保护对 `key_scores_cache` 和 `cache_last_updated` 并发访问的线程锁。
cache_lock = threading.Lock()
# `cache_last_updated`: 记录每个模型的分数缓存上次更新的时间戳。
# 结构: {model_name: timestamp}
cache_last_updated: Dict[str, float] = defaultdict(float)
# --- 每日 RPD 总量跟踪 ---
# `daily_rpd_totals`: 存储每天 (太平洋时间) 所有 Key 的总 RPD (每日请求数)。
# 结构: {'YYYY-MM-DD': total_rpd}
daily_rpd_totals: Dict[str, int] = defaultdict(int)
# `daily_totals_lock`: 用于保护对 `daily_rpd_totals` 并发访问的线程锁。
daily_totals_lock = threading.Lock()
# --- 每日 IP 输入 Token 消耗计数 ---
# `ip_daily_input_token_counts`: 存储每个 IP 地址每天 (太平洋时间) 的总输入 Token 消耗。
# 结构: {'YYYY-MM-DD': Counter({'ip_address': total_input_tokens})}
ip_daily_input_token_counts: Dict[str, Counter[str]] = defaultdict(Counter) # 使用 Counter 更方便计数
# `ip_input_token_counts_lock`: 用于保护对 `ip_daily_input_token_counts` 并发访问的线程锁。
ip_input_token_counts_lock = threading.Lock()
# --- 缓存使用情况跟踪 ---
# `cache_hit_count`: 记录原生缓存的总命中次数。
cache_hit_count: int = 0
# `cache_miss_count`: 记录原生缓存的总未命中次数。
cache_miss_count: int = 0
# `total_tokens_saved`: 记录通过缓存命估算节省的总 Token 数量。
total_tokens_saved: int = 0
# `cache_tracking_lock`: 用于保护以上缓存跟踪变量并发访问的线程锁。
cache_tracking_lock = threading.Lock()
# --- Key 筛选跟踪 ---
# 这些变量用于统计 Key 选择策略的执行情况和效果。
key_selection_total_attempts: int = 0 # Key 选择的总尝试次数
key_selection_successful_selections: int = 0 # 成功选定 Key 的次数
key_selection_failed_selections: int = 0 # 未能选定 Key 的次数
key_selection_failure_reasons: Dict[str, int] = defaultdict(int) # 按原因统计选择失败的次数
# key_selection_lock = threading.Lock() # 注意:原代码中定义了锁但未使用,如果需要并发更新这些统计量,应使用锁
# --- 缓存统计更新函数 ---
def increment_cache_hit_count():
"""(线程安全) 增加缓存命中计数。"""
with cache_tracking_lock: # 获取锁
global cache_hit_count # 声明修改全局变量
cache_hit_count += 1 # 计数加 1
logger.debug(f"缓存命中计数: {cache_hit_count}") # 记录调试日志
def increment_cache_miss_count():
"""(线程安全) 增加缓存未命中计数。"""
with cache_tracking_lock: # 获取锁
global cache_miss_count # 声明修改全局变量
cache_miss_count += 1 # 计数加 1
logger.debug(f"缓存未命中计数: {cache_miss_count}") # 记录调试日志
def add_tokens_saved(tokens: int):
"""(线程安全) 增加通过缓存节省的总 Token 数。"""
if tokens > 0: # 只有当节省的 Token 数大于 0 时才更新
with cache_tracking_lock: # 获取锁
global total_tokens_saved # 声明修改全局变量
total_tokens_saved += tokens # 累加 Token 数
logger.debug(f"节省的总 token 数: {total_tokens_saved}") # 记录调试日志
def track_cache_hit(request_id: str, cache_id: str, tokens_saved: int):
"""
(线程安全) 记录一次缓存命中事件,并更新相关统计数据。
Args:
request_id (str): 相关的请求 ID。
cache_id (str): 命中的缓存条目的 ID (Gemini Cache ID 或数据库 ID)。
tokens_saved (int): 本次命中估算节省的 Token 数量。
"""
with cache_tracking_lock: # 获取锁
global cache_hit_count, total_tokens_saved # 声明修改全局变量
cache_hit_count += 1 # 命中计数加 1
total_tokens_saved += tokens_saved # 累加节省的 Token 数
# 记录详细的命中日志
logger.info(f"缓存命中: Request ID: {request_id}, Cache ID: {cache_id}, 节省 Token: {tokens_saved}")
def track_cache_miss(request_id: str, content_hash: str):
"""
(线程安全) 记录一次缓存未命中事件,并更新相关统计数据。
Args:
request_id (str): 相关的请求 ID。
content_hash (str): 未命中内容的哈希值。
"""
with cache_tracking_lock: # 获取锁
global cache_miss_count # 声明修改全局变量
cache_miss_count += 1 # 未命中计数加 1
# 记录详细的未命中日志
logger.info(f"缓存未命中: Request ID: {request_id}, Content Hash: {content_hash[:10]}...") # 只记录哈希前缀
# --- 常量定义 ---
RPM_WINDOW_SECONDS = 60 # RPM (每分钟请求数) 计算的时间窗口(秒)
TPM_WINDOW_SECONDS = 60 # TPM (每分钟 Token 数) 计算的时间窗口(秒)
CACHE_REFRESH_INTERVAL_SECONDS = 300 # Key 分数缓存的刷新间隔 (秒,例如 300 秒 = 5 分钟)
# --- Key 分数缓存更新函数 ---
def update_cache_timestamp(model_name: str):
"""
(线程安全) 更新指定模型的分数缓存的最后更新时间戳为当前时间。
Args:
model_name (str): 需要更新时间戳的模型名称。
"""
global cache_last_updated # 声明修改全局变量
with cache_lock: # 获取分数缓存锁
cache_last_updated[model_name] = time.time() # 更新时间戳