gcli2api / src /storage /sqlite_manager.py
a3216's picture
sync: github -> hf space
c50496f
"""
SQLite 存储管理器
"""
import asyncio
import json
import os
import time
from typing import Any, Dict, List, Optional, Tuple
import aiosqlite
from log import log
class SQLiteManager:
"""SQLite 数据库管理器"""
# 状态字段常量
STATE_FIELDS = {
"error_codes",
"error_messages",
"disabled",
"last_success",
"user_email",
"model_cooldowns",
"preview",
"tier",
"enable_credit",
}
# 所有必需的列定义(用于自动校验和修复)
REQUIRED_COLUMNS = {
"credentials": [
("disabled", "INTEGER DEFAULT 0"),
("error_codes", "TEXT DEFAULT '[]'"),
("error_messages", "TEXT DEFAULT '[]'"),
("last_success", "REAL"),
("user_email", "TEXT"),
("model_cooldowns", "TEXT DEFAULT '{}'"),
("preview", "INTEGER DEFAULT 1"),
("tier", "TEXT DEFAULT 'pro'"),
("rotation_order", "INTEGER DEFAULT 0"),
("call_count", "INTEGER DEFAULT 0"),
("created_at", "REAL DEFAULT (unixepoch())"),
("updated_at", "REAL DEFAULT (unixepoch())")
],
"antigravity_credentials": [
("disabled", "INTEGER DEFAULT 0"),
("error_codes", "TEXT DEFAULT '[]'"),
("error_messages", "TEXT DEFAULT '[]'"),
("last_success", "REAL"),
("user_email", "TEXT"),
("model_cooldowns", "TEXT DEFAULT '{}'"),
("tier", "TEXT DEFAULT 'pro'"),
("enable_credit", "INTEGER DEFAULT 0"),
("rotation_order", "INTEGER DEFAULT 0"),
("call_count", "INTEGER DEFAULT 0"),
("created_at", "REAL DEFAULT (unixepoch())"),
("updated_at", "REAL DEFAULT (unixepoch())")
]
}
def __init__(self):
self._db_path = None
self._credentials_dir = None
self._initialized = False
self._lock = asyncio.Lock()
# 内存配置缓存 - 初始化时加载一次
self._config_cache: Dict[str, Any] = {}
self._config_loaded = False
async def initialize(self) -> None:
"""初始化 SQLite 数据库"""
if self._initialized:
return
async with self._lock:
if self._initialized:
return
try:
# 获取凭证目录
self._credentials_dir = os.getenv("CREDENTIALS_DIR", "./creds")
self._db_path = os.path.join(self._credentials_dir, "credentials.db")
# 确保目录存在
os.makedirs(self._credentials_dir, exist_ok=True)
# 创建数据库和表
async with aiosqlite.connect(self._db_path) as db:
# 启用 WAL 模式(提升并发性能)
await db.execute("PRAGMA journal_mode=WAL")
await db.execute("PRAGMA foreign_keys=ON")
# 检查并自动修复数据库结构
await self._ensure_schema_compatibility(db)
# 创建表
await self._create_tables(db)
# 修复可能包含路径的凭证文件名
await self._repair_credential_filenames(db)
await db.commit()
# 加载配置到内存
await self._load_config_cache()
self._initialized = True
log.info(f"SQLite storage initialized at {self._db_path}")
except Exception as e:
log.error(f"Error initializing SQLite: {e}")
raise
async def _ensure_schema_compatibility(self, db: aiosqlite.Connection) -> None:
"""
确保数据库结构兼容,自动修复缺失的列
"""
try:
# 检查每个表
for table_name, columns in self.REQUIRED_COLUMNS.items():
# 检查表是否存在
async with db.execute(
"SELECT name FROM sqlite_master WHERE type='table' AND name=?",
(table_name,)
) as cursor:
if not await cursor.fetchone():
log.debug(f"Table {table_name} does not exist, will be created")
continue
# 获取现有列
async with db.execute(f"PRAGMA table_info({table_name})") as cursor:
existing_columns = {row[1] for row in await cursor.fetchall()}
# 添加缺失的列
added_count = 0
for col_name, col_def in columns:
if col_name not in existing_columns:
try:
await db.execute(f"ALTER TABLE {table_name} ADD COLUMN {col_name} {col_def}")
log.info(f"Added missing column {table_name}.{col_name}")
added_count += 1
except Exception as e:
log.error(f"Failed to add column {table_name}.{col_name}: {e}")
if added_count > 0:
log.info(f"Table {table_name}: added {added_count} missing column(s)")
except Exception as e:
log.error(f"Error ensuring schema compatibility: {e}")
# 不抛出异常,允许继续初始化
async def _create_tables(self, db: aiosqlite.Connection):
"""创建数据库表和索引"""
# 凭证表
await db.execute("""
CREATE TABLE IF NOT EXISTS credentials (
id INTEGER PRIMARY KEY AUTOINCREMENT,
filename TEXT UNIQUE NOT NULL,
credential_data TEXT NOT NULL,
-- 状态字段
disabled INTEGER DEFAULT 0,
error_codes TEXT DEFAULT '[]',
error_messages TEXT DEFAULT '[]',
last_success REAL,
user_email TEXT,
-- 模型级 CD 支持 (JSON: {model_name: cooldown_timestamp})
model_cooldowns TEXT DEFAULT '{}',
-- preview 状态 (只对 geminicli 有效,默认为 true)
preview INTEGER DEFAULT 1,
-- tier 状态 (只对 geminicli 有效,默认为 pro)
tier TEXT DEFAULT 'pro',
-- 轮换相关
rotation_order INTEGER DEFAULT 0,
call_count INTEGER DEFAULT 0,
-- 时间戳
created_at REAL DEFAULT (unixepoch()),
updated_at REAL DEFAULT (unixepoch())
)
""")
# Antigravity 凭证表(结构相同但独立存储)
await db.execute("""
CREATE TABLE IF NOT EXISTS antigravity_credentials (
id INTEGER PRIMARY KEY AUTOINCREMENT,
filename TEXT UNIQUE NOT NULL,
credential_data TEXT NOT NULL,
-- 状态字段
disabled INTEGER DEFAULT 0,
error_codes TEXT DEFAULT '[]',
error_messages TEXT DEFAULT '[]',
last_success REAL,
user_email TEXT,
-- 模型级 CD 支持 (JSON: {model_name: cooldown_timestamp})
model_cooldowns TEXT DEFAULT '{}',
-- tier 状态 (默认为 pro)
tier TEXT DEFAULT 'pro',
-- 是否启用信用额度模式(仅 antigravity,有效值 0/1)
enable_credit INTEGER DEFAULT 0,
-- 轮换相关
rotation_order INTEGER DEFAULT 0,
call_count INTEGER DEFAULT 0,
-- 时间戳
created_at REAL DEFAULT (unixepoch()),
updated_at REAL DEFAULT (unixepoch())
)
""")
# 创建索引 - 普通凭证表
await db.execute("""
CREATE INDEX IF NOT EXISTS idx_disabled
ON credentials(disabled)
""")
await db.execute("""
CREATE INDEX IF NOT EXISTS idx_rotation_order
ON credentials(rotation_order)
""")
# 创建索引 - Antigravity 凭证表
await db.execute("""
CREATE INDEX IF NOT EXISTS idx_ag_disabled
ON antigravity_credentials(disabled)
""")
await db.execute("""
CREATE INDEX IF NOT EXISTS idx_ag_rotation_order
ON antigravity_credentials(rotation_order)
""")
# 配置表
await db.execute("""
CREATE TABLE IF NOT EXISTS config (
key TEXT PRIMARY KEY,
value TEXT NOT NULL,
updated_at REAL DEFAULT (unixepoch())
)
""")
log.debug("SQLite tables and indexes created")
async def _repair_credential_filenames(self, db: aiosqlite.Connection):
"""
修复凭证数据库中可能包含路径的文件名,确保所有文件名都是 basename
"""
try:
repaired_count = 0
# 修复 credentials 表
async with db.execute("SELECT filename FROM credentials") as cursor:
rows = await cursor.fetchall()
for (filename,) in rows:
basename = os.path.basename(filename)
if basename != filename:
# 检查是否会产生冲突
async with db.execute(
"SELECT COUNT(*) FROM credentials WHERE filename = ?",
(basename,)
) as check_cursor:
count = (await check_cursor.fetchone())[0]
if count == 0:
# 无冲突,直接更新
await db.execute(
"UPDATE credentials SET filename = ? WHERE filename = ?",
(basename, filename)
)
repaired_count += 1
log.info(f"Repaired credential filename: {filename} -> {basename}")
else:
# 有冲突,删除带路径的旧记录(保留 basename 的记录)
await db.execute(
"DELETE FROM credentials WHERE filename = ?",
(filename,)
)
repaired_count += 1
log.warning(f"Removed duplicate credential with path: {filename} (kept {basename})")
# 修复 antigravity_credentials 表
async with db.execute("SELECT filename FROM antigravity_credentials") as cursor:
rows = await cursor.fetchall()
for (filename,) in rows:
basename = os.path.basename(filename)
if basename != filename:
# 检查是否会产生冲突
async with db.execute(
"SELECT COUNT(*) FROM antigravity_credentials WHERE filename = ?",
(basename,)
) as check_cursor:
count = (await check_cursor.fetchone())[0]
if count == 0:
# 无冲突,直接更新
await db.execute(
"UPDATE antigravity_credentials SET filename = ? WHERE filename = ?",
(basename, filename)
)
repaired_count += 1
log.info(f"Repaired antigravity credential filename: {filename} -> {basename}")
else:
# 有冲突,删除带路径的旧记录(保留 basename 的记录)
await db.execute(
"DELETE FROM antigravity_credentials WHERE filename = ?",
(filename,)
)
repaired_count += 1
log.warning(f"Removed duplicate antigravity credential with path: {filename} (kept {basename})")
if repaired_count > 0:
log.info(f"Repaired {repaired_count} credential filename(s)")
else:
log.debug("No credential filenames need repair")
except Exception as e:
log.error(f"Error repairing credential filenames: {e}")
# 不抛出异常,允许继续初始化
async def _load_config_cache(self):
"""加载配置到内存缓存(仅在初始化时调用一次)"""
if self._config_loaded:
return
try:
async with aiosqlite.connect(self._db_path) as db:
async with db.execute("SELECT key, value FROM config") as cursor:
rows = await cursor.fetchall()
for key, value in rows:
try:
self._config_cache[key] = json.loads(value)
except json.JSONDecodeError:
self._config_cache[key] = value
self._config_loaded = True
log.debug(f"Loaded {len(self._config_cache)} config items into cache")
except Exception as e:
log.error(f"Error loading config cache: {e}")
self._config_cache = {}
async def close(self) -> None:
"""关闭数据库连接"""
self._initialized = False
log.debug("SQLite storage closed")
def _ensure_initialized(self):
"""确保已初始化"""
if not self._initialized:
raise RuntimeError("SQLite manager not initialized")
def _get_table_name(self, mode: str) -> str:
"""根据 mode 获取对应的表名"""
if mode == "antigravity":
return "antigravity_credentials"
elif mode == "geminicli":
return "credentials"
else:
raise ValueError(f"Invalid mode: {mode}. Must be 'geminicli' or 'antigravity'")
# ============ SQL 方法 ============
async def get_next_available_credential(
self, mode: str = "geminicli", model_name: Optional[str] = None
) -> Optional[Tuple[str, Dict[str, Any]]]:
"""
随机获取一个可用凭证(负载均衡)
- 未禁用
- 如果提供了 model_name,还会检查模型级冷却和preview状态
- 随机选择
Args:
mode: 凭证模式 ("geminicli" 或 "antigravity")
model_name: 完整模型名(如 "gemini-2.0-flash-exp", "gemini-3-flash-preview")
"""
self._ensure_initialized()
try:
table_name = self._get_table_name(mode)
async with aiosqlite.connect(self._db_path) as db:
current_time = time.time()
if mode == "geminicli":
tier_clause = ""
if model_name and "pro" in model_name.lower():
tier_clause = "AND (tier IS NULL OR tier != 'free')"
async with db.execute(f"""
SELECT filename, credential_data, model_cooldowns, preview
FROM {table_name}
WHERE disabled = 0 {tier_clause}
ORDER BY RANDOM()
""") as cursor:
rows = await cursor.fetchall()
if not model_name:
if rows:
filename, credential_json, _, _ = rows[0]
credential_data = json.loads(credential_json)
return filename, credential_data
return None
is_preview_model = "preview" in model_name.lower()
non_preview_creds = []
preview_creds = []
for filename, credential_json, model_cooldowns_json, preview in rows:
model_cooldowns = json.loads(model_cooldowns_json or '{}')
model_cooldown = model_cooldowns.get(model_name)
if model_cooldown is None or current_time >= model_cooldown:
if preview:
preview_creds.append((filename, credential_json))
else:
non_preview_creds.append((filename, credential_json))
if is_preview_model:
if preview_creds:
filename, credential_json = preview_creds[0]
credential_data = json.loads(credential_json)
return filename, credential_data
else:
if non_preview_creds:
filename, credential_json = non_preview_creds[0]
credential_data = json.loads(credential_json)
return filename, credential_data
elif preview_creds:
filename, credential_json = preview_creds[0]
credential_data = json.loads(credential_json)
return filename, credential_data
return None
else:
async with db.execute(f"""
SELECT filename, credential_data, model_cooldowns, enable_credit
FROM {table_name}
WHERE disabled = 0
ORDER BY RANDOM()
""") as cursor:
rows = await cursor.fetchall()
if not model_name:
if rows:
filename, credential_json, _, enable_credit = rows[0]
credential_data = json.loads(credential_json)
credential_data["enable_credit"] = bool(enable_credit)
return filename, credential_data
return None
for filename, credential_json, model_cooldowns_json, enable_credit in rows:
model_cooldowns = json.loads(model_cooldowns_json or '{}')
model_cooldown = model_cooldowns.get(model_name)
if model_cooldown is None or current_time >= model_cooldown:
credential_data = json.loads(credential_json)
credential_data["enable_credit"] = bool(enable_credit)
return filename, credential_data
return None
except Exception as e:
log.error(f"Error getting next available credential (mode={mode}, model_name={model_name}): {e}")
return None
async def get_available_credentials_list(self) -> List[str]:
"""
获取所有可用凭证列表
- 未禁用
- 按轮换顺序排序
"""
self._ensure_initialized()
try:
async with aiosqlite.connect(self._db_path) as db:
async with db.execute("""
SELECT filename
FROM credentials
WHERE disabled = 0
ORDER BY rotation_order ASC
""") as cursor:
rows = await cursor.fetchall()
return [row[0] for row in rows]
except Exception as e:
log.error(f"Error getting available credentials list: {e}")
return []
# ============ StorageBackend 协议方法 ============
async def store_credential(self, filename: str, credential_data: Dict[str, Any], mode: str = "geminicli") -> bool:
"""存储或更新凭证"""
self._ensure_initialized()
# 统一使用 basename 处理文件名
filename = os.path.basename(filename)
try:
table_name = self._get_table_name(mode)
async with aiosqlite.connect(self._db_path) as db:
# 检查凭证是否存在
async with db.execute(f"""
SELECT disabled, error_codes, last_success, user_email,
rotation_order, call_count
FROM {table_name} WHERE filename = ?
""", (filename,)) as cursor:
existing = await cursor.fetchone()
if existing:
# 更新现有凭证(保留状态)
await db.execute(f"""
UPDATE {table_name}
SET credential_data = ?,
updated_at = unixepoch()
WHERE filename = ?
""", (json.dumps(credential_data), filename))
else:
# 插入新凭证
async with db.execute(f"""
SELECT COALESCE(MAX(rotation_order), -1) + 1 FROM {table_name}
""") as cursor:
row = await cursor.fetchone()
next_order = row[0]
await db.execute(f"""
INSERT INTO {table_name}
(filename, credential_data, rotation_order, last_success)
VALUES (?, ?, ?, ?)
""", (filename, json.dumps(credential_data), next_order, time.time()))
await db.commit()
log.debug(f"Stored credential: {filename} (mode={mode})")
return True
except Exception as e:
log.error(f"Error storing credential {filename}: {e}")
return False
async def get_credential(self, filename: str, mode: str = "geminicli") -> Optional[Dict[str, Any]]:
"""获取凭证数据"""
self._ensure_initialized()
# 统一使用 basename 处理文件名
filename = os.path.basename(filename)
try:
table_name = self._get_table_name(mode)
async with aiosqlite.connect(self._db_path) as db:
# 精确匹配
async with db.execute(f"""
SELECT credential_data FROM {table_name} WHERE filename = ?
""", (filename,)) as cursor:
row = await cursor.fetchone()
if row:
return json.loads(row[0])
return None
except Exception as e:
log.error(f"Error getting credential {filename}: {e}")
return None
async def list_credentials(self, mode: str = "geminicli") -> List[str]:
"""列出所有凭证文件名(包括禁用的)"""
self._ensure_initialized()
try:
table_name = self._get_table_name(mode)
async with aiosqlite.connect(self._db_path) as db:
async with db.execute(f"""
SELECT filename FROM {table_name} ORDER BY rotation_order
""") as cursor:
rows = await cursor.fetchall()
return [row[0] for row in rows]
except Exception as e:
log.error(f"Error listing credentials: {e}")
return []
async def delete_credential(self, filename: str, mode: str = "geminicli") -> bool:
"""删除凭证"""
self._ensure_initialized()
# 统一使用 basename 处理文件名
filename = os.path.basename(filename)
try:
table_name = self._get_table_name(mode)
async with aiosqlite.connect(self._db_path) as db:
# 精确匹配删除
result = await db.execute(f"""
DELETE FROM {table_name} WHERE filename = ?
""", (filename,))
deleted_count = result.rowcount
await db.commit()
if deleted_count > 0:
log.debug(f"Deleted {deleted_count} credential(s): {filename} (mode={mode})")
return True
else:
log.warning(f"No credential found to delete: {filename} (mode={mode})")
return False
except Exception as e:
log.error(f"Error deleting credential {filename}: {e}")
return False
async def update_credential_state(self, filename: str, state_updates: Dict[str, Any], mode: str = "geminicli") -> bool:
"""更新凭证状态"""
self._ensure_initialized()
# 统一使用 basename 处理文件名
filename = os.path.basename(filename)
try:
table_name = self._get_table_name(mode)
log.debug(f"[DB] update_credential_state 开始: filename={filename}, state_updates={state_updates}, mode={mode}, table={table_name}")
# 构建动态 SQL
set_clauses = []
values = []
for key, value in state_updates.items():
if key in self.STATE_FIELDS:
if key == "enable_credit" and mode != "antigravity":
continue
if key in ("error_codes", "error_messages", "model_cooldowns"):
# JSON 字段需要序列化
set_clauses.append(f"{key} = ?")
values.append(json.dumps(value))
else:
set_clauses.append(f"{key} = ?")
values.append(value)
if not set_clauses:
log.info(f"[DB] 没有需要更新的状态字段")
return True
set_clauses.append("updated_at = unixepoch()")
values.append(filename)
log.debug(f"[DB] SQL参数: set_clauses={set_clauses}, values={values}")
async with aiosqlite.connect(self._db_path) as db:
# 精确匹配更新
sql_exact = f"""
UPDATE {table_name}
SET {', '.join(set_clauses)}
WHERE filename = ?
"""
log.debug(f"[DB] 执行精确匹配SQL: {sql_exact}")
log.debug(f"[DB] SQL参数值: {values}")
result = await db.execute(sql_exact, values)
updated_count = result.rowcount
log.debug(f"[DB] 精确匹配 rowcount={updated_count}")
# 提交前检查
log.debug(f"[DB] 准备commit,总更新行数={updated_count}")
await db.commit()
log.debug(f"[DB] commit完成")
success = updated_count > 0
log.debug(f"[DB] update_credential_state 结束: success={success}, updated_count={updated_count}")
return success
except Exception as e:
log.error(f"[DB] Error updating credential state {filename}: {e}")
return False
async def get_credential_state(self, filename: str, mode: str = "geminicli") -> Dict[str, Any]:
"""获取凭证状态(不包含error_messages)"""
self._ensure_initialized()
# 统一使用 basename 处理文件名
filename = os.path.basename(filename)
try:
table_name = self._get_table_name(mode)
async with aiosqlite.connect(self._db_path) as db:
# 精确匹配
if mode == "geminicli":
async with db.execute(f"""
SELECT disabled, error_codes, last_success, user_email, model_cooldowns, preview, tier
FROM {table_name} WHERE filename = ?
""", (filename,)) as cursor:
row = await cursor.fetchone()
if row:
error_codes_json = row[1] or '[]'
model_cooldowns_json = row[4] or '{}'
return {
"disabled": bool(row[0]),
"error_codes": json.loads(error_codes_json),
"last_success": row[2] or time.time(),
"user_email": row[3],
"model_cooldowns": json.loads(model_cooldowns_json),
"preview": bool(row[5]) if row[5] is not None else True,
"tier": row[6] if row[6] is not None else "pro",
}
# 返回默认状态
return {
"disabled": False,
"error_codes": [],
"last_success": time.time(),
"user_email": None,
"model_cooldowns": {},
"preview": True,
"tier": "pro",
}
else:
# antigravity 模式
async with db.execute(f"""
SELECT disabled, error_codes, last_success, user_email, model_cooldowns, tier, enable_credit
FROM {table_name} WHERE filename = ?
""", (filename,)) as cursor:
row = await cursor.fetchone()
if row:
error_codes_json = row[1] or '[]'
model_cooldowns_json = row[4] or '{}'
return {
"disabled": bool(row[0]),
"error_codes": json.loads(error_codes_json),
"last_success": row[2] or time.time(),
"user_email": row[3],
"model_cooldowns": json.loads(model_cooldowns_json),
"tier": row[5] if row[5] is not None else "pro",
"enable_credit": bool(row[6]) if row[6] is not None else False,
}
# 返回默认状态
return {
"disabled": False,
"error_codes": [],
"last_success": time.time(),
"user_email": None,
"model_cooldowns": {},
"tier": "pro",
"enable_credit": False,
}
except Exception as e:
log.error(f"Error getting credential state {filename}: {e}")
return {}
async def get_all_credential_states(self, mode: str = "geminicli") -> Dict[str, Dict[str, Any]]:
"""获取所有凭证状态(不包含error_messages)"""
self._ensure_initialized()
try:
table_name = self._get_table_name(mode)
async with aiosqlite.connect(self._db_path) as db:
if mode == "geminicli":
async with db.execute(f"""
SELECT filename, disabled, error_codes, last_success,
user_email, model_cooldowns, preview, tier
FROM {table_name}
""") as cursor:
rows = await cursor.fetchall()
states = {}
current_time = time.time()
for row in rows:
filename = row[0]
error_codes_json = row[2] or '[]'
model_cooldowns_json = row[5] or '{}'
model_cooldowns = json.loads(model_cooldowns_json)
# 自动过滤掉已过期的模型CD
if model_cooldowns:
model_cooldowns = {
k: v for k, v in model_cooldowns.items()
if v > current_time
}
states[filename] = {
"disabled": bool(row[1]),
"error_codes": json.loads(error_codes_json),
"last_success": row[3] or time.time(),
"user_email": row[4],
"model_cooldowns": model_cooldowns,
"preview": bool(row[6]) if row[6] is not None else True,
"tier": row[7] if row[7] is not None else "pro",
}
return states
else:
# antigravity 模式
async with db.execute(f"""
SELECT filename, disabled, error_codes, last_success,
user_email, model_cooldowns, tier, enable_credit
FROM {table_name}
""") as cursor:
rows = await cursor.fetchall()
states = {}
current_time = time.time()
for row in rows:
filename = row[0]
error_codes_json = row[2] or '[]'
model_cooldowns_json = row[5] or '{}'
model_cooldowns = json.loads(model_cooldowns_json)
# 自动过滤掉已过期的模型CD
if model_cooldowns:
model_cooldowns = {
k: v for k, v in model_cooldowns.items()
if v > current_time
}
states[filename] = {
"disabled": bool(row[1]),
"error_codes": json.loads(error_codes_json),
"last_success": row[3] or time.time(),
"user_email": row[4],
"model_cooldowns": model_cooldowns,
"tier": row[6] if row[6] is not None else "pro",
"enable_credit": bool(row[7]) if row[7] is not None else False,
}
return states
except Exception as e:
log.error(f"Error getting all credential states: {e}")
return {}
async def get_credentials_summary(
self,
offset: int = 0,
limit: Optional[int] = None,
status_filter: str = "all",
mode: str = "geminicli",
error_code_filter: Optional[str] = None,
cooldown_filter: Optional[str] = None,
preview_filter: Optional[str] = None,
tier_filter: Optional[str] = None
) -> Dict[str, Any]:
"""
获取凭证的摘要信息(不包含完整凭证数据)- 支持分页和状态筛选
Args:
offset: 跳过的记录数(默认0)
limit: 返回的最大记录数(None表示返回所有)
status_filter: 状态筛选(all=全部, enabled=仅启用, disabled=仅禁用)
mode: 凭证模式 ("geminicli" 或 "antigravity")
error_code_filter: 错误码筛选(格式如"400"或"403",筛选包含该错误码的凭证)
cooldown_filter: 冷却状态筛选("in_cooldown"=冷却中, "no_cooldown"=未冷却)
preview_filter: Preview筛选("preview"=支持preview, "no_preview"=不支持preview,仅geminicli模式有效)
tier_filter: tier筛选("free", "pro", "ultra")
Returns:
包含 items(凭证列表)、total(总数)、offset、limit 的字典
"""
self._ensure_initialized()
try:
# 根据 mode 选择表名
table_name = self._get_table_name(mode)
async with aiosqlite.connect(self._db_path) as db:
# 先计算全局统计数据(不受筛选条件影响)
global_stats = {"total": 0, "normal": 0, "disabled": 0}
async with db.execute(f"""
SELECT disabled, COUNT(*) FROM {table_name} GROUP BY disabled
""") as stats_cursor:
stats_rows = await stats_cursor.fetchall()
for disabled, count in stats_rows:
global_stats["total"] += count
if disabled:
global_stats["disabled"] = count
else:
global_stats["normal"] = count
# 构建WHERE子句
where_clauses = []
count_params = []
if status_filter == "enabled":
where_clauses.append("disabled = 0")
elif status_filter == "disabled":
where_clauses.append("disabled = 1")
filter_value = None
filter_int = None
if error_code_filter and str(error_code_filter).strip().lower() != "all":
filter_value = str(error_code_filter).strip()
try:
filter_int = int(filter_value)
except ValueError:
filter_int = None
# 构建WHERE子句
where_clause = ""
if where_clauses:
where_clause = "WHERE " + " AND ".join(where_clauses)
# 先获取所有数据(用于冷却筛选,因为需要在Python中判断)
if mode == "geminicli":
all_query = f"""
SELECT filename, disabled, error_codes, last_success,
user_email, rotation_order, model_cooldowns, preview, tier
FROM {table_name}
{where_clause}
ORDER BY rotation_order
"""
else:
all_query = f"""
SELECT filename, disabled, error_codes, last_success,
user_email, rotation_order, model_cooldowns, tier, enable_credit
FROM {table_name}
{where_clause}
ORDER BY rotation_order
"""
async with db.execute(all_query, count_params) as cursor:
all_rows = await cursor.fetchall()
current_time = time.time()
all_summaries = []
for row in all_rows:
filename = row[0]
error_codes_json = row[2] or '[]'
model_cooldowns_json = row[6] or '{}'
model_cooldowns = json.loads(model_cooldowns_json)
# 自动过滤掉已过期的模型CD
active_cooldowns = {}
if model_cooldowns:
active_cooldowns = {
k: v for k, v in model_cooldowns.items()
if v > current_time
}
error_codes = json.loads(error_codes_json)
if filter_value:
match = False
for code in error_codes:
if code == filter_value or code == filter_int:
match = True
break
if isinstance(code, str) and filter_int is not None:
try:
if int(code) == filter_int:
match = True
break
except ValueError:
pass
if not match:
continue
summary = {
"filename": filename,
"disabled": bool(row[1]),
"error_codes": error_codes,
"last_success": row[3] or current_time,
"user_email": row[4],
"rotation_order": row[5],
"model_cooldowns": active_cooldowns,
"tier": row[8] if mode == "geminicli" and row[8] is not None else (
row[7] if mode != "geminicli" and row[7] is not None else "pro"
),
}
if mode != "geminicli":
summary["enable_credit"] = bool(row[8]) if row[8] is not None else False
if mode == "geminicli":
summary["preview"] = bool(row[7]) if row[7] is not None else True
if preview_filter:
preview_value = summary.get("preview", True)
if preview_filter == "preview" and not preview_value:
continue
elif preview_filter == "no_preview" and preview_value:
continue
# 应用tier筛选
if tier_filter and tier_filter in ("free", "pro", "ultra"):
if summary["tier"] != tier_filter:
continue
# 应用冷却筛选
if cooldown_filter == "in_cooldown":
# 只保留有冷却的凭证
if active_cooldowns:
all_summaries.append(summary)
elif cooldown_filter == "no_cooldown":
# 只保留没有冷却的凭证
if not active_cooldowns:
all_summaries.append(summary)
else:
# 不筛选冷却状态
all_summaries.append(summary)
# 应用分页
total_count = len(all_summaries)
if limit is not None:
summaries = all_summaries[offset:offset + limit]
else:
summaries = all_summaries[offset:]
return {
"items": summaries,
"total": total_count,
"offset": offset,
"limit": limit,
"stats": global_stats,
}
except Exception as e:
log.error(f"Error getting credentials summary: {e}")
return {
"items": [],
"total": 0,
"offset": offset,
"limit": limit,
"stats": {"total": 0, "normal": 0, "disabled": 0},
}
async def get_duplicate_credentials_by_email(self, mode: str = "geminicli") -> Dict[str, Any]:
"""
获取按邮箱分组的重复凭证信息(只查询邮箱和文件名,不加载完整凭证数据)
用于去重操作
Args:
mode: 凭证模式 ("geminicli" 或 "antigravity")
Returns:
包含 email_groups(邮箱分组)、duplicate_count(重复数量)、no_email_count(无邮箱数量)的字典
"""
self._ensure_initialized()
try:
# 根据 mode 选择表名
table_name = self._get_table_name(mode)
async with aiosqlite.connect(self._db_path) as db:
# 查询所有凭证的文件名和邮箱(不加载完整凭证数据)
query = f"""
SELECT filename, user_email
FROM {table_name}
ORDER BY filename
"""
async with db.execute(query) as cursor:
rows = await cursor.fetchall()
# 按邮箱分组
email_to_files = {}
no_email_files = []
for filename, user_email in rows:
if user_email:
if user_email not in email_to_files:
email_to_files[user_email] = []
email_to_files[user_email].append(filename)
else:
no_email_files.append(filename)
# 找出重复的邮箱组
duplicate_groups = []
total_duplicate_count = 0
for email, files in email_to_files.items():
if len(files) > 1:
# 保留第一个文件,其他为重复
duplicate_groups.append({
"email": email,
"kept_file": files[0],
"duplicate_files": files[1:],
"duplicate_count": len(files) - 1,
})
total_duplicate_count += len(files) - 1
return {
"email_groups": email_to_files,
"duplicate_groups": duplicate_groups,
"duplicate_count": total_duplicate_count,
"no_email_files": no_email_files,
"no_email_count": len(no_email_files),
"unique_email_count": len(email_to_files),
"total_count": len(rows),
}
except Exception as e:
log.error(f"Error getting duplicate credentials by email: {e}")
return {
"email_groups": {},
"duplicate_groups": [],
"duplicate_count": 0,
"no_email_files": [],
"no_email_count": 0,
"unique_email_count": 0,
"total_count": 0,
}
# ============ 配置管理(内存缓存)============
async def set_config(self, key: str, value: Any) -> bool:
"""设置配置(写入数据库 + 更新内存缓存)"""
self._ensure_initialized()
try:
async with aiosqlite.connect(self._db_path) as db:
await db.execute("""
INSERT INTO config (key, value, updated_at)
VALUES (?, ?, unixepoch())
ON CONFLICT(key) DO UPDATE SET
value = excluded.value,
updated_at = excluded.updated_at
""", (key, json.dumps(value)))
await db.commit()
# 更新内存缓存
self._config_cache[key] = value
return True
except Exception as e:
log.error(f"Error setting config {key}: {e}")
return False
async def reload_config_cache(self):
"""重新加载配置缓存(在批量修改配置后调用)"""
self._ensure_initialized()
self._config_loaded = False
await self._load_config_cache()
log.info("Config cache reloaded from database")
async def get_config(self, key: str, default: Any = None) -> Any:
"""获取配置(从内存缓存)"""
self._ensure_initialized()
return self._config_cache.get(key, default)
async def get_all_config(self) -> Dict[str, Any]:
"""获取所有配置(从内存缓存)"""
self._ensure_initialized()
return self._config_cache.copy()
async def delete_config(self, key: str) -> bool:
"""删除配置"""
self._ensure_initialized()
try:
async with aiosqlite.connect(self._db_path) as db:
await db.execute("DELETE FROM config WHERE key = ?", (key,))
await db.commit()
# 从内存缓存移除
self._config_cache.pop(key, None)
return True
except Exception as e:
log.error(f"Error deleting config {key}: {e}")
return False
async def get_credential_errors(self, filename: str, mode: str = "geminicli") -> Dict[str, Any]:
"""
专门获取凭证的错误信息(包含 error_codes 和 error_messages)
Args:
filename: 凭证文件名
mode: 凭证模式 ("geminicli" 或 "antigravity")
Returns:
包含 error_codes 和 error_messages 的字典
"""
self._ensure_initialized()
# 统一使用 basename 处理文件名
filename = os.path.basename(filename)
try:
table_name = self._get_table_name(mode)
async with aiosqlite.connect(self._db_path) as db:
# 精确匹配
async with db.execute(f"""
SELECT error_codes, error_messages FROM {table_name} WHERE filename = ?
""", (filename,)) as cursor:
row = await cursor.fetchone()
if row:
error_codes_json = row[0] or '[]'
error_messages_json = row[1] or '[]'
return {
"filename": filename,
"error_codes": json.loads(error_codes_json),
"error_messages": json.loads(error_messages_json),
}
# 凭证不存在,返回空错误信息
return {
"filename": filename,
"error_codes": [],
"error_messages": [],
}
except Exception as e:
log.error(f"Error getting credential errors {filename}: {e}")
return {
"filename": filename,
"error_codes": [],
"error_messages": [],
"error": str(e)
}
# ============ 模型级冷却管理 ============
async def set_model_cooldown(
self,
filename: str,
model_name: str,
cooldown_until: Optional[float],
mode: str = "geminicli"
) -> bool:
"""
设置特定模型的冷却时间
Args:
filename: 凭证文件名
model_name: 模型名(完整模型名,如 "gemini-2.0-flash-exp")
cooldown_until: 冷却截止时间戳(None 表示清除冷却)
mode: 凭证模式 ("geminicli" 或 "antigravity")
Returns:
是否成功
"""
self._ensure_initialized()
# 统一使用 basename 处理文件名
filename = os.path.basename(filename)
try:
table_name = self._get_table_name(mode)
async with aiosqlite.connect(self._db_path) as db:
# 获取当前的 model_cooldowns
async with db.execute(f"""
SELECT model_cooldowns FROM {table_name} WHERE filename = ?
""", (filename,)) as cursor:
row = await cursor.fetchone()
if not row:
log.warning(f"Credential {filename} not found")
return False
model_cooldowns = json.loads(row[0] or '{}')
# 更新或删除指定模型的冷却时间
if cooldown_until is None:
model_cooldowns.pop(model_name, None)
else:
model_cooldowns[model_name] = cooldown_until
# 写回数据库
await db.execute(f"""
UPDATE {table_name}
SET model_cooldowns = ?,
updated_at = unixepoch()
WHERE filename = ?
""", (json.dumps(model_cooldowns), filename))
await db.commit()
log.debug(f"Set model cooldown: {filename}, model_name={model_name}, cooldown_until={cooldown_until}")
return True
except Exception as e:
log.error(f"Error setting model cooldown for {filename}: {e}")
return False
async def clear_all_model_cooldowns(
self,
filename: str,
mode: str = "geminicli"
) -> bool:
"""清除某个凭证的所有模型冷却时间"""
self._ensure_initialized()
filename = os.path.basename(filename)
try:
table_name = self._get_table_name(mode)
async with aiosqlite.connect(self._db_path) as db:
result = await db.execute(f"""
UPDATE {table_name}
SET model_cooldowns = '{{}}',
updated_at = unixepoch()
WHERE filename = ?
""", (filename,))
updated_count = result.rowcount
await db.commit()
if updated_count == 0:
log.warning(f"Credential {filename} not found")
return False
log.debug(f"Cleared all model cooldowns: {filename} (mode={mode})")
return True
except Exception as e:
log.error(f"Error clearing all model cooldowns for {filename}: {e}")
return False
async def record_success(
self,
filename: str,
model_name: Optional[str] = None,
mode: str = "geminicli"
) -> None:
"""
成功调用后的条件写入:
- 只有当前 error_codes 非空时才清除错误并写 last_success
- 只有当前存在该模型的冷却键时才清除
通过 SQL WHERE 条件匹配实现
"""
self._ensure_initialized()
filename = os.path.basename(filename)
try:
table_name = self._get_table_name(mode)
async with aiosqlite.connect(self._db_path) as db:
# 条件写入:只有 error_codes 非空时才触发
await db.execute(f"""
UPDATE {table_name}
SET last_success = unixepoch(),
error_codes = '[]',
error_messages = '{{}}',
updated_at = unixepoch()
WHERE filename = ?
AND (error_codes IS NOT NULL AND error_codes != '[]' AND error_codes != '')
""", (filename,))
# 条件删除模型冷却:只有模型键存在时才写入
if model_name:
async with db.execute(f"""
SELECT model_cooldowns FROM {table_name} WHERE filename = ?
""", (filename,)) as cursor:
row = await cursor.fetchone()
if row:
cooldowns = json.loads(row[0] or '{}')
if model_name in cooldowns:
cooldowns.pop(model_name)
await db.execute(f"""
UPDATE {table_name}
SET model_cooldowns = ?, updated_at = unixepoch()
WHERE filename = ?
""", (json.dumps(cooldowns), filename))
await db.commit()
except Exception as e:
log.error(f"Error recording success for {filename}: {e}")