# KiroProxy User
# chore: repo cleanup and maintenance
# 0edbd7b
"""全局状态管理"""
import time
from collections import deque
from dataclasses import dataclass
from typing import Optional, List, Dict
from pathlib import Path
from ..config import TOKEN_PATH
from ..credential import quota_manager, CredentialStatus
from .account import Account
from .persistence import load_accounts, save_accounts
from .quota_cache import get_quota_cache
from .account_selector import get_account_selector, SelectionStrategy
from .quota_scheduler import get_quota_scheduler
@dataclass
class RequestLog:
    """One request-log entry, as appended to ``ProxyState.request_logs``.

    The first eight fields are required and positional; the token counters
    and ``error`` are optional and default to "nothing recorded".
    """

    # --- identity / timing -------------------------------------------------
    id: str
    timestamp: float

    # --- request description -----------------------------------------------
    method: str
    path: str
    model: str
    account_id: Optional[str]  # None when no account was associated

    # --- outcome -----------------------------------------------------------
    status: int
    duration_ms: float

    # --- optional details --------------------------------------------------
    tokens_in: int = 0
    tokens_out: int = 0
    error: Optional[str] = None  # a truthy value makes ProxyState count an error
class ProxyState:
    """Global proxy state: accounts, request logs, counters and session stickiness.

    A single instance is created at module import time. Construction loads
    the persisted account list (see ``_load_accounts``).
    """

    # How long (seconds) a session stays pinned to the account that last
    # served it. A pin older than this is ignored and eventually pruned.
    SESSION_STICKY_TTL_SECONDS = 60

    def __init__(self):
        """Create empty state and load accounts from persisted configuration."""
        self.accounts: List[Account] = []
        # Bounded buffer: only the most recent 1000 log entries are retained.
        self.request_logs: deque = deque(maxlen=1000)
        self.total_requests: int = 0
        self.total_errors: int = 0
        # session_id -> id of the account the session is pinned to.
        self.session_locks: Dict[str, str] = {}
        # session_id -> time.time() of the last pin / sticky hit.
        self.session_timestamps: Dict[str, float] = {}
        self.start_time: float = time.time()
        self._load_accounts()

    # ------------------------------------------------------------------
    # Private helpers
    # ------------------------------------------------------------------

    @staticmethod
    def _token_path_exists(token_path) -> bool:
        """Return True only for a non-empty path that exists on disk."""
        # Path("") normalises to Path("."), which always exists, so an empty
        # or missing value must be rejected before asking the filesystem.
        if not token_path:
            return False
        return Path(token_path).exists()

    def _find_account(self, account_id: str) -> Optional["Account"]:
        """Return the first account whose id equals ``account_id``, else None."""
        return next((acc for acc in self.accounts if acc.id == account_id), None)

    @staticmethod
    def _mark_active_best_effort(account_id: str) -> None:
        """Tell the quota scheduler the account is active; never raise."""
        try:
            get_quota_scheduler().mark_active(account_id)
        except Exception:
            # Deliberately best-effort: a scheduler failure must not prevent
            # the caller from receiving the selected account.
            pass

    def _prune_expired_sessions(self, now: Optional[float] = None) -> None:
        """Drop session pins whose last activity is older than the sticky TTL.

        Expired pins are never honoured by ``get_available_account``, so
        removing them does not change selection; it only stops the two
        session dictionaries from growing without bound.
        """
        if now is None:
            now = time.time()
        ttl = self.SESSION_STICKY_TTL_SECONDS
        # A pin with no timestamp is treated as timestamp 0, i.e. expired,
        # matching the lookup in get_available_account.
        expired = [
            sid for sid in self.session_locks
            if now - self.session_timestamps.get(sid, 0) >= ttl
        ]
        for sid in expired:
            self.session_locks.pop(sid, None)
            self.session_timestamps.pop(sid, None)

    @staticmethod
    def _format_ago(seconds_ago: float) -> str:
        """Render an elapsed number of seconds as a coarse relative-time label."""
        if seconds_ago < 60:
            return f"{int(seconds_ago)}秒前"
        if seconds_ago < 3600:
            return f"{int(seconds_ago / 60)}分钟前"
        return f"{int(seconds_ago / 3600)}小时前"

    # ------------------------------------------------------------------
    # Persistence
    # ------------------------------------------------------------------

    def _load_accounts(self):
        """Load accounts from the saved configuration.

        Saved entries whose token path is empty, missing or not present on
        disk are skipped. If no account ends up loaded and the default
        ``TOKEN_PATH`` exists, a single default account is created and the
        account list is saved.
        """
        saved = load_accounts()
        if saved:
            for acc_data in saved:
                token_path = acc_data.get("token_path", "")
                # Only keep accounts whose token file is actually there.
                if not self._token_path_exists(token_path):
                    continue
                self.accounts.append(Account(
                    id=acc_data["id"],
                    name=acc_data["name"],
                    token_path=token_path,
                    enabled=acc_data.get("enabled", True),
                    auto_disabled=acc_data.get("auto_disabled", False),
                ))
            print(f"[State] 从配置加载 {len(self.accounts)} 个账号")

        # No usable account: fall back to the default token file, if any.
        if not self.accounts and TOKEN_PATH.exists():
            self.accounts.append(Account(
                id="default",
                name="默认账号",
                token_path=str(TOKEN_PATH),
            ))
            self._save_accounts()

    def _save_accounts(self):
        """Persist id, name, token path and enabled flags of every account."""
        accounts_data = [
            {
                "id": acc.id,
                "name": acc.name,
                "token_path": acc.token_path,
                "enabled": acc.enabled,
                # getattr: tolerate Account objects without this attribute.
                "auto_disabled": getattr(acc, "auto_disabled", False),
            }
            for acc in self.accounts
        ]
        save_accounts(accounts_data)

    # ------------------------------------------------------------------
    # Account selection
    # ------------------------------------------------------------------

    def get_available_account(self, session_id: Optional[str] = None) -> Optional["Account"]:
        """Pick an account for a request, honouring session stickiness.

        Stickiness applies only when a ``session_id`` is given, the selector
        has no priority accounts and its strategy is not ``RANDOM``. A sticky
        hit returns the pinned account if the pin is younger than
        ``SESSION_STICKY_TTL_SECONDS`` and the account is still available;
        otherwise the account selector decides and the session is (re)pinned.

        Returns:
            The chosen account, or whatever falsy value the selector returns
            when it selects nothing.
        """
        quota_manager.cleanup_expired()
        selector = get_account_selector()
        has_priority = bool(selector.get_priority_accounts())
        use_session_sticky = (
            bool(session_id)
            and not has_priority
            and selector.strategy != SelectionStrategy.RANDOM
        )

        # Sticky path: reuse the pinned account while the pin is fresh.
        if use_session_sticky and session_id in self.session_locks:
            account_id = self.session_locks[session_id]
            ts = self.session_timestamps.get(session_id, 0)
            if time.time() - ts < self.SESSION_STICKY_TTL_SECONDS:
                for acc in self.accounts:
                    if acc.id == account_id and acc.is_available():
                        self.session_timestamps[session_id] = time.time()
                        return acc

        # Otherwise let the selector choose.
        account = selector.select(self.accounts, session_id)
        if account and use_session_sticky:
            now = time.time()
            if session_id not in self.session_locks:
                # A new session is about to be tracked: sweep stale pins so
                # the maps stay bounded by recently active sessions.
                self._prune_expired_sessions(now)
            self.session_locks[session_id] = account.id
            self.session_timestamps[session_id] = now

        # Mark the account active so the quota scheduler keeps it refreshed.
        if account:
            self._mark_active_best_effort(account.id)
        return account

    def mark_account_used(self, account_id: str) -> None:
        """Mark the account active in the scheduler and stamp ``last_used``.

        Unlike the selection methods, a scheduler error propagates here.
        """
        scheduler = get_quota_scheduler()
        scheduler.mark_active(account_id)
        acc = self._find_account(account_id)
        if acc is not None:
            acc.last_used = time.time()

    def get_next_available_account(self, exclude_id: str) -> Optional["Account"]:
        """Return the available account with the lowest ``request_count``.

        The account whose id is ``exclude_id`` is never returned. Returns
        None when no other account is available.
        """
        available = [a for a in self.accounts if a.is_available() and a.id != exclude_id]
        if not available:
            return None
        account = min(available, key=lambda a: a.request_count)
        self._mark_active_best_effort(account.id)
        return account

    def mark_rate_limited(self, account_id: str, duration_seconds: int = 60):
        """Mark the account as quota-exceeded with reason ``"Rate limited"``.

        NOTE(review): ``duration_seconds`` is accepted but not used; only a
        reason is passed to ``Account.mark_quota_exceeded``. Confirm whether
        the cooldown length was meant to be forwarded.
        """
        acc = self._find_account(account_id)
        if acc is not None:
            acc.mark_quota_exceeded("Rate limited")

    def mark_quota_exceeded(self, account_id: str, reason: str = "Quota exceeded"):
        """Mark the account as quota-exceeded with the given reason."""
        acc = self._find_account(account_id)
        if acc is not None:
            acc.mark_quota_exceeded(reason)

    # ------------------------------------------------------------------
    # Token refresh
    # ------------------------------------------------------------------

    async def refresh_account_token(self, account_id: str) -> tuple:
        """Refresh one account's token.

        Returns:
            The result of ``Account.refresh_token()`` for a known account,
            or ``(False, "账号不存在")`` when no account has that id.
        """
        acc = self._find_account(account_id)
        if acc is None:
            return False, "账号不存在"
        return await acc.refresh_token()

    async def refresh_expiring_tokens(self) -> List[dict]:
        """Refresh every enabled account whose token is about to expire.

        Accounts are refreshed one after another. Returns one
        ``{"account_id", "success", "message"}`` dict per refreshed account.
        """
        results = []
        for acc in self.accounts:
            # The argument 10 is the look-ahead window passed to the account
            # (presumably minutes — TODO confirm against Account).
            if acc.enabled and acc.is_token_expiring_soon(10):
                success, msg = await acc.refresh_token()
                results.append({
                    "account_id": acc.id,
                    "success": success,
                    "message": msg,
                })
        return results

    # ------------------------------------------------------------------
    # Logging and statistics
    # ------------------------------------------------------------------

    def add_log(self, log: "RequestLog"):
        """Append a request log and update the request / error counters."""
        self.request_logs.append(log)
        self.total_requests += 1
        if log.error:
            self.total_errors += 1

    def get_stats(self) -> dict:
        """Return uptime, counters, account counts and subsystem status."""
        uptime = time.time() - self.start_time

        quota_summary = get_quota_cache().get_summary()
        selector_status = get_account_selector().get_status()
        scheduler_status = get_quota_scheduler().get_status()

        return {
            "uptime_seconds": int(uptime),
            "total_requests": self.total_requests,
            "total_errors": self.total_errors,
            # max(1, ...) avoids division by zero before the first request.
            "error_rate": f"{(self.total_errors / max(1, self.total_requests) * 100):.1f}%",
            "accounts_total": len(self.accounts),
            "accounts_available": sum(1 for a in self.accounts if a.is_available()),
            "accounts_cooldown": sum(
                1 for a in self.accounts if a.status == CredentialStatus.COOLDOWN
            ),
            "recent_logs": len(self.request_logs),
            "quota_summary": quota_summary,
            "selector": selector_status,
            "scheduler": scheduler_status,
        }

    def get_accounts_status(self) -> List[dict]:
        """Return ``get_status_info()`` for every account, in list order."""
        return [acc.get_status_info() for acc in self.accounts]

    def get_accounts_summary(self) -> dict:
        """Return aggregate account counts, quota totals and refresh info.

        Each account is counted in at most one bucket, checked in the order
        disabled, cooldown, unhealthy, available. Quota totals include every
        account that has a cached quota without an error.
        """
        quota_cache = get_quota_cache()
        selector = get_account_selector()
        scheduler = get_quota_scheduler()

        total_balance = 0.0
        total_usage = 0.0
        total_limit = 0.0
        available_count = 0
        cooldown_count = 0
        unhealthy_count = 0
        disabled_count = 0

        for acc in self.accounts:
            if not acc.enabled:
                disabled_count += 1
            elif acc.status == CredentialStatus.COOLDOWN:
                cooldown_count += 1
            elif acc.status == CredentialStatus.UNHEALTHY:
                unhealthy_count += 1
            elif acc.is_available():
                available_count += 1

            quota = quota_cache.get(acc.id)
            if quota and not quota.has_error():
                total_balance += quota.balance
                total_usage += quota.current_usage
                total_limit += quota.usage_limit

        last_refresh = scheduler.get_last_full_refresh()
        # A falsy value (no full refresh recorded) yields no label.
        last_refresh_ago = None
        if last_refresh:
            last_refresh_ago = self._format_ago(time.time() - last_refresh)

        return {
            "total_accounts": len(self.accounts),
            "available_accounts": available_count,
            "cooldown_accounts": cooldown_count,
            "unhealthy_accounts": unhealthy_count,
            "disabled_accounts": disabled_count,
            "total_balance": round(total_balance, 2),
            "total_usage": round(total_usage, 2),
            "total_limit": round(total_limit, 2),
            "last_refresh": last_refresh_ago,
            "last_refresh_timestamp": last_refresh,
            "strategy": selector.strategy.value,
            "priority_accounts": selector.get_priority_accounts(),
        }

    def get_valid_account_ids(self) -> set:
        """Return the ids of all enabled accounts as a set."""
        return {acc.id for acc in self.accounts if acc.enabled}
# Global state instance.
# Created at import time: ProxyState.__init__ loads the saved accounts as a
# side effect of importing this module.
state = ProxyState()