""" Gemini OpenAI 兼容 API 服务 启动: python server.py 后台: http://localhost:8000/admin API: http://localhost:8000/v1 """ from fastapi import FastAPI, HTTPException, Header, Request from fastapi.middleware.cors import CORSMiddleware from fastapi.responses import HTMLResponse, RedirectResponse, StreamingResponse, JSONResponse from pydantic import BaseModel from typing import List, Dict, Any, Optional, Union import uvicorn import time import uuid import json import os import re import httpx import hashlib import secrets import asyncio from persistence import PersistenceManager # ============ 持久化配置 ============ # 如果要在 HF 上持久化,请设置 DATASET_REPO_ID 环境变量(例如: 'luoluoluo22/gemini-config') pm = PersistenceManager( db_path="data.sqlite", dataset_repo=os.getenv("DATASET_REPO_ID"), hf_token=os.getenv("HF_TOKEN") ) # ============ 配置 ============ API_KEY = os.getenv("API_KEY", "sk-geminixxxxx") HOST = "0.0.0.0" PORT = int(os.getenv("PORT", 7860)) CONFIG_FILE = "config_data.json" # 后台登录账号密码 ADMIN_USERNAME = os.getenv("ADMIN_USERNAME", "admin") ADMIN_PASSWORD = os.getenv("ADMIN_PASSWORD", "admin123") # Token 自动刷新配置 TOKEN_REFRESH_INTERVAL_MIN = 1800 # 刷新间隔最小秒数(默认 30 分钟) TOKEN_REFRESH_INTERVAL_MAX = 3600 # 刷新间隔最大秒数(默认 60 分钟) TOKEN_AUTO_REFRESH = True # 是否启用自动刷新 TOKEN_BACKGROUND_REFRESH = True # 是否启用后台定时刷新(防止长时间不用失效) # 媒体文件外网访问地址 # 在 Hugging Face Spaces 中,可以直接获取 SPACE_ID 来构造 URL SPACE_ID = os.getenv("SPACE_ID") if SPACE_ID: MEDIA_BASE_URL = f"https://{SPACE_ID.replace('/', '-')}.hf.space" else: MEDIA_BASE_URL = os.getenv("MEDIA_BASE_URL", "http://127.0.0.1:7860") # ============================== import random from datetime import datetime # 后台刷新任务控制 _background_refresh_task = None _background_refresh_stop = False app = FastAPI(title="Gemini OpenAI API", version="1.0.0") app.add_middleware( CORSMiddleware, allow_origins=["*"], allow_credentials=True, allow_methods=["*"], allow_headers=["*"], ) # 静态文件路由 (用于示例图片) from fastapi.responses import FileResponse # 生成的媒体文件缓存目录 MEDIA_CACHE_DIR = os.path.join(os.path.dirname(__file__), "media_cache") os.makedirs(MEDIA_CACHE_DIR, exist_ok=True) @app.get("/static/{filename}") async def serve_static(filename: str): """提供静态文件(示例图片等)""" file_path = os.path.join(os.path.dirname(__file__), filename) if os.path.exists(file_path): return FileResponse(file_path) raise HTTPException(status_code=404, detail="文件不存在") @app.get("/media/{media_filename}") async def serve_media(media_filename: str): """提供缓存的媒体文件""" # 安全检查:只允许字母数字、下划线、点和常见后缀 import re if not re.match(r'^[a-zA-Z0-9_-]+(\.(png|jpg|jpeg|gif|webp|mp4))?$', media_filename): raise HTTPException(status_code=400, detail="无效的媒体文件名") # 直接查找文件(带后缀名) file_path = os.path.join(MEDIA_CACHE_DIR, media_filename) if os.path.exists(file_path): return FileResponse(file_path) # 兼容旧版本:不带后缀名的请求,尝试查找匹配的文件 media_id = media_filename.rsplit('.', 1)[0] if '.' in media_filename else media_filename for ext in [".png", ".jpg", ".jpeg", ".gif", ".webp", ".mp4"]: file_path = os.path.join(MEDIA_CACHE_DIR, media_id + ext) if os.path.exists(file_path): return FileResponse(file_path) raise HTTPException(status_code=404, detail="媒体文件不存在") def cleanup_old_media(max_age_hours: int = 1): """清理过期的媒体缓存文件""" import time now = time.time() max_age_seconds = max_age_hours * 3600 try: for filename in os.listdir(MEDIA_CACHE_DIR): file_path = os.path.join(MEDIA_CACHE_DIR, filename) if os.path.isfile(file_path): file_age = now - os.path.getmtime(file_path) if file_age > max_age_seconds: os.remove(file_path) except Exception: pass # 存储有效的 session token _admin_sessions = set() def generate_session_token(): """生成随机 session token""" return secrets.token_hex(32) def verify_admin_session(request: Request): """验证管理员 session""" token = request.cookies.get("admin_session") # 增加调试日志 if not token or token not in _admin_sessions: return False return True # 默认可用模型列表 (Gemini 3 官网三个模型: 快速/思考/Pro) DEFAULT_MODELS = ["gemini-3.0-flash", "gemini-3.0-flash-thinking", "gemini-3.0-pro"] # 默认模型 ID (用于请求头选择模型) DEFAULT_MODEL_IDS = { "flash": "56fdd199312815e2", "pro": "e6fa609c3fa255c0", "thinking": "e051ce1aa80aa576", } # 配置存储 (内存中的镜像,初始为默认值) _config = { "SNLM0E": "", "SECURE_1PSID": "", "SECURE_1PSIDTS": "", "SAPISID": "", "SID": "", "HSID": "", "SSID": "", "APISID": "", "PUSH_ID": "", "FULL_COOKIE": "", # 存储完整cookie字符串 "API_KEY": os.getenv("API_KEY", "sk-geminixxxxx"), # 动态 API Key "MODELS": DEFAULT_MODELS.copy(), # 可用模型列表 "MODEL_IDS": DEFAULT_MODEL_IDS.copy(), # 模型 ID 映射 } # Cookie 字段映射 (浏览器cookie名 -> 配置字段名) COOKIE_FIELD_MAP = { "__Secure-1PSID": "SECURE_1PSID", "__Secure-1PSIDTS": "SECURE_1PSIDTS", "SAPISID": "SAPISID", "__Secure-1PAPISID": "SAPISID", # 也映射到 SAPISID "SID": "SID", "HSID": "HSID", "SSID": "SSID", "APISID": "APISID", } def parse_cookie_string(cookie_str: str) -> dict: """解析完整cookie字符串,提取所需字段""" result = {} if not cookie_str: return result for item in cookie_str.split(";"): item = item.strip() if "=" in item: eq_index = item.index("=") key = item[:eq_index].strip() value = item[eq_index + 1:].strip() if key in COOKIE_FIELD_MAP: result[COOKIE_FIELD_MAP[key]] = value return result def fetch_tokens_from_page(cookies_str: str) -> dict: """从 Gemini 页面自动获取 SNLM0E、PUSH_ID 和可用模型列表""" result = {"snlm0e": "", "push_id": "", "models": []} try: session = httpx.Client( timeout=30.0, follow_redirects=True, headers={ "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/131.0.0.0 Safari/537.36", "Accept": "text/html,application/xhtml+xml,application/xml;q=0.9,image/webp,*/*;q=0.8", "Accept-Language": "zh-CN,zh;q=0.9,en;q=0.8", } ) # 设置 cookies for item in cookies_str.split(";"): item = item.strip() if "=" in item: key, value = item.split("=", 1) session.cookies.set(key.strip(), value.strip(), domain=".google.com") resp = session.get("https://gemini.google.com") if resp.status_code != 200: return result html = resp.text # 获取 SNLM0E (AT Token) snlm0e_patterns = [ r'"SNlM0e":"([^"]+)"', r'SNlM0e["\s:]+["\']([^"\']+)["\']', r'"at":"([^"]+)"', ] for pattern in snlm0e_patterns: match = re.search(pattern, html) if match: result["snlm0e"] = match.group(1) break # 获取 PUSH_ID push_id_patterns = [ r'"push[_-]?id["\s:]+["\'](feeds/[a-z0-9]+)["\']', r'push[_-]?id["\s:=]+["\'](feeds/[a-z0-9]+)["\']', r'feedName["\s:]+["\'](feeds/[a-z0-9]+)["\']', r'clientId["\s:]+["\'](feeds/[a-z0-9]+)["\']', r'(feeds/[a-z0-9]{14,})', ] for pattern in push_id_patterns: matches = re.findall(pattern, html, re.IGNORECASE) if matches: result["push_id"] = matches[0] break # 获取可用模型列表 (从页面中提取 gemini 模型 ID) model_patterns = [ r'"(gemini-[a-z0-9\.\-]+)"', # 匹配 "gemini-xxx" 格式 r"'(gemini-[a-z0-9\.\-]+)'", # 匹配 'gemini-xxx' 格式 ] models_found = set() for pattern in model_patterns: matches = re.findall(pattern, html, re.IGNORECASE) for m in matches: # 过滤有效的模型名称 if any(x in m.lower() for x in ['flash', 'pro', 'ultra', 'nano']): models_found.add(m) if models_found: result["models"] = sorted(list(models_found)) # 获取模型 ID (用于 x-goog-ext-525001261-jspb 请求头) # 这些 ID 用于选择不同的模型版本 model_id_pattern = r'\["([a-f0-9]{16})","gemini[^"]*(?:flash|pro|thinking)[^"]*"\]' model_ids = re.findall(model_id_pattern, html, re.IGNORECASE) if model_ids: result["model_ids"] = list(set(model_ids)) #备用方案:直接搜索 16 位十六进制 ID(在模型配置附近) if not result.get("model_ids"): # 搜索类似 "56fdd199312815e2" 的模式 hex_id_pattern = r'"([a-f0-9]{16})"' # 在包含 gemini 或 model 的上下文中查找 context_pattern = r'.{0,100}(?:gemini|model|flash|pro|thinking).{0,100}' contexts = re.findall(context_pattern, html, re.IGNORECASE) hex_ids = set() for ctx in contexts: ids = re.findall(hex_id_pattern, ctx) hex_ids.update(ids) if hex_ids: result["model_ids"] = list(hex_ids) return result except Exception: return result _client = None _last_token_refresh = 0 # 上次 token 刷新时间 _token_refresh_count = 0 # token 刷新次数统计 def try_refresh_tokens(force: bool = False) -> dict: """ 尝试刷新 token Args: force: 是否强制刷新,忽略时间间隔 Returns: dict: {"success": bool, "message": str, "snlm0e": str, "push_id": str} """ global _client, _last_token_refresh, _token_refresh_count, _config result = {"success": False, "message": "", "snlm0e": "", "push_id": ""} if not TOKEN_AUTO_REFRESH and not force: result["message"] = "自动刷新已禁用" return result current_time = time.time() now_str = datetime.now().strftime("%Y-%m-%d %H:%M:%S") # 检查是否需要刷新(除非强制刷新) if not force and (current_time - _last_token_refresh) < TOKEN_REFRESH_INTERVAL_MIN: result["message"] = f"距离上次刷新不足 {TOKEN_REFRESH_INTERVAL_MIN} 秒" return result try: # 如果 client 存在,使用 client 的刷新方法 if _client is not None: refresh_result = _client.refresh_tokens() if refresh_result["success"]: # 更新配置 if refresh_result["snlm0e"]: _config["SNLM0E"] = refresh_result["snlm0e"] result["snlm0e"] = refresh_result["snlm0e"] if refresh_result["push_id"]: _config["PUSH_ID"] = refresh_result["push_id"] result["push_id"] = refresh_result["push_id"] # 保存配置 save_config() _last_token_refresh = current_time _token_refresh_count += 1 result["success"] = True result["message"] = f"Token 刷新成功 (第 {_token_refresh_count} 次)" print(f"✅ [{now_str}] Token 自动刷新成功 (第 {_token_refresh_count} 次)") else: result["message"] = refresh_result.get("error", "刷新失败") print(f"⚠️ [{now_str}] Token 刷新失败: {result['message']}") else: # client 不存在,使用 fetch_tokens_from_page cookies = _config.get("FULL_COOKIE", "") if not cookies: cookies = f"__Secure-1PSID={_config.get('SECURE_1PSID', '')}" if _config.get("SECURE_1PSIDTS"): cookies += f"; __Secure-1PSIDTS={_config['SECURE_1PSIDTS']}" tokens = fetch_tokens_from_page(cookies) if tokens.get("snlm0e"): _config["SNLM0E"] = tokens["snlm0e"] result["snlm0e"] = tokens["snlm0e"] if tokens.get("push_id"): _config["PUSH_ID"] = tokens["push_id"] result["push_id"] = tokens["push_id"] if tokens.get("snlm0e"): save_config() _last_token_refresh = current_time _token_refresh_count += 1 result["success"] = True result["message"] = f"Token 刷新成功 (第 {_token_refresh_count} 次)" print(f"✅ [{now_str}] Token 自动刷新成功 (第 {_token_refresh_count} 次)") else: result["message"] = "无法从页面获取新 token" return result except Exception as e: result["message"] = f"刷新异常: {str(e)}" print(f"❌ [{now_str}] Token 刷新异常: {e}") return result def reset_client(): """重置 client,下次请求时会重新创建""" global _client _client = None now_str = datetime.now().strftime("%Y-%m-%d %H:%M:%S") print(f"🔄 [{now_str}] Client 已重置,下次请求将重新创建") # ============ 后台定时刷新任务 ============ def get_current_time_str(): """获取当前时间字符串""" return datetime.now().strftime("%Y-%m-%d %H:%M:%S") def get_random_refresh_interval(): """获取随机刷新间隔""" return random.randint(TOKEN_REFRESH_INTERVAL_MIN, TOKEN_REFRESH_INTERVAL_MAX) async def background_token_refresh(): """后台定时刷新 token 任务""" global _background_refresh_stop print(f"🔄 [{get_current_time_str()}] 后台 Token 定时刷新任务已启动") while not _background_refresh_stop: try: # 随机等待间隔 interval = get_random_refresh_interval() print(f"⏳ [{get_current_time_str()}] 下次刷新将在 {interval} 秒后") await asyncio.sleep(interval) if _background_refresh_stop: break if not TOKEN_BACKGROUND_REFRESH: continue # 执行刷新 print(f"⏰ [{get_current_time_str()}] 后台定时刷新 Token...") result = try_refresh_tokens(force=True) if result["success"]: print(f"✅ [{get_current_time_str()}] 后台刷新成功: {result['message']}") else: print(f"⚠️ [{get_current_time_str()}] 后台刷新失败: {result['message']}") except asyncio.CancelledError: break except Exception as e: print(f"❌ [{get_current_time_str()}] 后台刷新异常: {e}") await asyncio.sleep(60) # 出错后等待 1 分钟再试 print(f"🛑 [{get_current_time_str()}] 后台 Token 定时刷新任务已停止") @app.on_event("startup") async def startup_event(): """应用启动时执行""" global _background_refresh_task, _background_refresh_stop load_config() _background_refresh_stop = False if TOKEN_BACKGROUND_REFRESH: _background_refresh_task = asyncio.create_task(background_token_refresh()) print(f"✅ [{get_current_time_str()}] 后台 Token 定时刷新已启用 (间隔: {TOKEN_REFRESH_INTERVAL_MIN}-{TOKEN_REFRESH_INTERVAL_MAX} 秒随机)") @app.on_event("shutdown") async def shutdown_event(): """应用关闭时执行""" global _background_refresh_task, _background_refresh_stop _background_refresh_stop = True if _background_refresh_task: _background_refresh_task.cancel() try: await _background_refresh_task except asyncio.CancelledError: pass print("🛑 后台任务已停止") # ============ Tools 支持 ============ def build_tools_prompt(tools: List[Dict]) -> str: """将 tools 定义转换为提示词""" if not tools: return "" tools_schema = json.dumps([{ "name": t["function"]["name"], "description": t["function"].get("description", ""), "parameters": t["function"].get("parameters", {}) } for t in tools if t.get("type") == "function"], ensure_ascii=False, indent=2) prompt = f"""[系统指令] 你必须作为函数调用代理。不要自己回答问题,必须调用函数。 可用函数: {tools_schema} 严格规则: 1. 你不能直接回答用户问题 2. 你必须选择一个函数并调用它 3. 只输出以下格式,不要有任何其他文字: ```tool_call {{"name": "函数名", "arguments": {{"参数": "值"}}}} ``` 用户请求: """ return prompt def parse_tool_calls(content: str) -> tuple: """ 解析响应中的工具调用 返回: (tool_calls列表, 剩余文本内容) """ tool_calls = [] # 多种匹配模式 patterns = [ r'```tool_call\s*\n?(.*?)\n?```', # ```tool_call ... ``` r'```json\s*\n?(.*?)\n?```', # ```json ... ``` (有时模型会用这个) r'```\s*\n?(\{[^`]*"name"[^`]*\})\n?```', # ``` {...} ``` ] matches = [] for pattern in patterns: found = re.findall(pattern, content, re.DOTALL) matches.extend(found) # 也尝试直接匹配 JSON 对象(没有代码块包裹的情况) if not matches: json_pattern = r'\{[^{}]*"name"\s*:\s*"[^"]+"\s*,\s*"arguments"\s*:\s*\{[^{}]*\}[^{}]*\}' matches = re.findall(json_pattern, content, re.DOTALL) for i, match in enumerate(matches): try: match = match.strip() # 尝试解析 JSON call_data = json.loads(match) if call_data.get("name"): tool_calls.append({ "id": f"call_{uuid.uuid4().hex[:8]}", "type": "function", "function": { "name": call_data.get("name", ""), "arguments": json.dumps(call_data.get("arguments", {}), ensure_ascii=False) } }) except json.JSONDecodeError: continue # 移除工具调用部分 remaining = content for pattern in patterns: remaining = re.sub(pattern, '', remaining, flags=re.DOTALL) remaining = remaining.strip() return tool_calls, remaining def load_config(): """从本地 SQLite 数据库加载配置""" global _config _config = pm.get_all_config(_config) def save_config(): """保存配置到 SQLite 并同步到云端""" pm.save_config(_config) def get_client(auto_refresh: bool = True): global _client, _last_token_refresh if not _config.get("SNLM0E") or not _config.get("SECURE_1PSID"): raise HTTPException(status_code=500, detail="请先在后台配置 Token 和 Cookie") # 检查是否需要自动刷新 token if auto_refresh and TOKEN_AUTO_REFRESH: current_time = time.time() if (current_time - _last_token_refresh) >= TOKEN_REFRESH_INTERVAL_MIN: try_refresh_tokens() # 如果 client 已存在,直接复用,保持会话上下文 if _client is not None: return _client cookies = f"__Secure-1PSID={_config['SECURE_1PSID']}" if _config.get("SECURE_1PSIDTS"): cookies += f"; __Secure-1PSIDTS={_config['SECURE_1PSIDTS']}" if _config.get("SAPISID"): cookies += f"; SAPISID={_config['SAPISID']}; __Secure-1PAPISID={_config['SAPISID']}" if _config.get("SID"): cookies += f"; SID={_config['SID']}" if _config.get("HSID"): cookies += f"; HSID={_config['HSID']}" if _config.get("SSID"): cookies += f"; SSID={_config['SSID']}" if _config.get("APISID"): cookies += f"; APISID={_config['APISID']}" # 构建媒体文件的基础 URL (优先使用配置的外网地址) media_base_url = MEDIA_BASE_URL if MEDIA_BASE_URL else f"http://localhost:{PORT}" from client import GeminiClient _client = GeminiClient( secure_1psid=_config["SECURE_1PSID"], snlm0e=_config["SNLM0E"], cookies_str=cookies, push_id=_config.get("PUSH_ID") or None, model_ids=_config.get("MODEL_IDS") or DEFAULT_MODEL_IDS, debug=False, media_base_url=media_base_url, ) return _client def get_login_html(): return ''' 登录 - Gemini API

Gemini API

请登录以访问后台管理

''' def get_admin_html(): return ''' Gemini API 配置
🔄 检查中...

🤖 Gemini API 配置

配置 Google Gemini 的认证信息,保存后即可调用 API 退出登录

获取方法:
1. 打开 gemini.google.com 并登录
2. F12 → 网络 → 发送内容到聊天 → 点击任意请求 → Copy 请求头内完整cookie
🔑 Cookie 配置

✅ 已解析的字段:

🎯 模型 ID 配置 (可选,如果模型切换失效请更新)
获取方法:F12 → Network → 在 Gemini 中切换模型发送消息 → 找到请求头 x-goog-ext-525001261-jspb → 复制整个数组值粘贴到下方输入框

✅ 已提取的模型 ID:

🔐 安全配置

📡 API 调用信息

Base URL:

API Key:

可用模型: gemini-3.0-flash | gemini-3.0-pro | gemini-3.0-flash-thinking

💬 文本对话

from openai import OpenAI
client = OpenAI(base_url="", api_key="")

response = client.chat.completions.create(
    model="gemini-3.0-flash",  # 或 gemini-3.0-pro / gemini-3.0-flash-thinking
    messages=[{"role": "user", "content": "你好"}]
)
print(response.choices[0].message.content)

🖼️ 图片识别

import base64
from openai import OpenAI
client = OpenAI(base_url="", api_key="")

# 读取本地图片
with open("image.png", "rb") as f:
    img_b64 = base64.b64encode(f.read()).decode()

response = client.chat.completions.create(
    model="gemini-3.0-flash",
    messages=[{
        "role": "user",
        "content": [
            {"type": "text", "text": "请描述这张图片"},
            {"type": "image_url", "image_url": {"url": f"data:image/png;base64,{img_b64}"}}
        ]
    }]
)
print(response.choices[0].message.content)

🌊 流式响应

stream = client.chat.completions.create(
    model="gemini-3.0-flash",
    messages=[{"role": "user", "content": "写一首诗"}],
    stream=True
)
for chunk in stream:
    if chunk.choices[0].delta.content:
        print(chunk.choices[0].delta.content, end="", flush=True)

📷 示例图片

以下是 image.png 示例图片,可用于测试图片识别功能(点击放大):

示例图片

(示例图片不可用,请确保 image.png 文件存在)

''' @app.get("/admin/login", response_class=HTMLResponse) async def admin_login_page(): return get_login_html() @app.post("/admin/login") async def admin_login(request: Request): data = await request.json() username = data.get("username", "") password = data.get("password", "") if username == ADMIN_USERNAME and password == ADMIN_PASSWORD: token = generate_session_token() _admin_sessions.add(token) response = JSONResponse({"success": True, "message": "登录成功"}) # 设置长期有效的 Cookie (10年) response.set_cookie(key="admin_session", value=token, httponly=True, max_age=315360000, samesite="lax") return response else: return {"success": False, "message": "用户名或密码错误"} @app.get("/admin/logout") async def admin_logout(request: Request): token = request.cookies.get("admin_session") if token and token in _admin_sessions: _admin_sessions.discard(token) response = RedirectResponse(url="/admin/login", status_code=302) response.delete_cookie("admin_session") return response @app.get("/admin", response_class=HTMLResponse) async def admin_page(request: Request): if not verify_admin_session(request): return RedirectResponse(url="/admin/login", status_code=302) return get_admin_html() @app.post("/admin/save") async def admin_save(request: Request): if not verify_admin_session(request): raise HTTPException(status_code=401, detail="未登录") global _client data = await request.json() # 处理完整 Cookie 字符串,去除前后空格 full_cookie = data.get("FULL_COOKIE", "").strip() if not full_cookie: return {"success": False, "message": "Cookie 是必填项"} # 解析 Cookie 字符串 parsed = parse_cookie_string(full_cookie) if not parsed.get("SECURE_1PSID"): return {"success": False, "message": "Cookie 中未找到 __Secure-1PSID 字段,请确保复制了完整的 Cookie"} # 从页面自动获取 SNLM0E 和 PUSH_ID tokens = fetch_tokens_from_page(full_cookie) if not tokens.get("snlm0e"): return {"success": False, "message": "无法自动获取 AT Token,请检查 Cookie 是否有效或已过期"} # 更新配置 _config["FULL_COOKIE"] = full_cookie _config["SNLM0E"] = tokens["snlm0e"] _config["PUSH_ID"] = tokens.get("push_id", "") # 从解析结果更新各字段 for field in ["SECURE_1PSID", "SECURE_1PSIDTS", "SAPISID", "SID", "HSID", "SSID", "APISID"]: _config[field] = parsed.get(field, "") # 使用自动获取的模型列表,如果获取失败则使用默认值 if tokens.get("models"): _config["MODELS"] = tokens["models"] else: _config["MODELS"] = DEFAULT_MODELS.copy() # 处理模型 ID 配置 model_ids = data.get("MODEL_IDS", {}) if model_ids: # 只更新非空的值 if model_ids.get("flash"): _config["MODEL_IDS"]["flash"] = model_ids["flash"] if model_ids.get("pro"): _config["MODEL_IDS"]["pro"] = model_ids["pro"] if model_ids.get("thinking"): _config["MODEL_IDS"]["thinking"] = model_ids["thinking"] # 更新 API Key if data.get("API_KEY"): _config["API_KEY"] = data["API_KEY"] save_config() _client = None # 构建结果信息 parsed_fields = [k for k in ["SECURE_1PSID", "SECURE_1PSIDTS", "SAPISID", "SID", "HSID", "SSID", "APISID"] if parsed.get(k)] push_id_msg = f",PUSH_ID ✓" if tokens.get("push_id") else ",PUSH_ID ✗ (图片功能不可用)" models_msg = f",{len(_config['MODELS'])} 个模型" if _config.get("MODELS") else "" try: get_client() return { "success": True, "message": f"配置已保存并验证成功!AT Token ✓{push_id_msg}{models_msg}", "need_restart": False } except Exception as e: return { "success": True, "message": f"配置已保存,但连接测试失败: {str(e)[:50]}", "need_restart": False } @app.get("/admin/config") async def admin_get_config(request: Request): if not verify_admin_session(request): raise HTTPException(status_code=401, detail="未登录") return _config # ============ API 路由 ============ class ChatMessage(BaseModel): role: str content: Union[str, List[Dict[str, Any]]] name: Optional[str] = None class Config: extra = "ignore" class FunctionDefinition(BaseModel): name: str description: Optional[str] = None parameters: Optional[Dict[str, Any]] = None class ToolDefinition(BaseModel): type: str = "function" function: FunctionDefinition class ChatCompletionRequest(BaseModel): model: str = "gemini" messages: List[ChatMessage] stream: Optional[bool] = False # Tools 支持 tools: Optional[List[ToolDefinition]] = None tool_choice: Optional[Union[str, Dict[str, Any]]] = None # OpenAI SDK 可能发送的额外字段 temperature: Optional[float] = None max_tokens: Optional[int] = None top_p: Optional[float] = None frequency_penalty: Optional[float] = None presence_penalty: Optional[float] = None stop: Optional[Union[str, List[str]]] = None n: Optional[int] = None user: Optional[str] = None class Config: extra = "ignore" # 忽略未定义的额外字段 class ChatCompletionChoice(BaseModel): index: int message: Dict[str, Any] finish_reason: str class Usage(BaseModel): prompt_tokens: int completion_tokens: int total_tokens: int class ChatCompletionResponse(BaseModel): id: str object: str = "chat.completion" created: int model: str choices: List[ChatCompletionChoice] usage: Usage def verify_api_key(authorization: str = Header(None)): # 优先使用配置中的 API_KEY current_key = _config.get("API_KEY") or API_KEY if not current_key: return True if not authorization or not authorization.startswith("Bearer ") or authorization[7:] != current_key: raise HTTPException(status_code=401, detail="Invalid API key") return True @app.get("/") async def root(): return RedirectResponse(url="/admin") @app.get("/v1/models") async def list_models(authorization: str = Header(None)): verify_api_key(authorization) models = _config.get("MODELS", DEFAULT_MODELS) created = int(time.time()) return { "object": "list", "data": [{"id": m, "object": "model", "created": created, "owned_by": "google"} for m in models] } @app.post("/v1/token/refresh") async def refresh_token_api(authorization: str = Header(None)): """手动刷新 token API""" verify_api_key(authorization) result = try_refresh_tokens(force=True) return { "success": result["success"], "message": result["message"], "snlm0e_updated": bool(result.get("snlm0e")), "push_id_updated": bool(result.get("push_id")), "refresh_count": _token_refresh_count, } @app.get("/v1/token/status") async def token_status_api(authorization: str = Header(None)): """查看 token 状态 API""" verify_api_key(authorization) current_time = time.time() time_since_refresh = int(current_time - _last_token_refresh) if _last_token_refresh > 0 else -1 return { "auto_refresh_enabled": TOKEN_AUTO_REFRESH, "background_refresh_enabled": TOKEN_BACKGROUND_REFRESH, "refresh_interval_range": f"{TOKEN_REFRESH_INTERVAL_MIN}-{TOKEN_REFRESH_INTERVAL_MAX}", "last_refresh_seconds_ago": time_since_refresh, "total_refresh_count": _token_refresh_count, "has_snlm0e": bool(_config.get("SNLM0E")), "has_push_id": bool(_config.get("PUSH_ID")), "client_active": _client is not None, } @app.post("/v1/client/reset") async def reset_client_api(authorization: str = Header(None)): """重置 client API,用于 token 更新后强制重新创建 client""" verify_api_key(authorization) reset_client() return {"success": True, "message": "Client 已重置,下次请求将使用新配置"} def log_api_call(request_data: dict, response_data: dict, error: str = None): """记录 API 调用日志到文件""" import datetime log_entry = { "timestamp": datetime.datetime.now().isoformat(), "request": request_data, "response": response_data, "error": error } try: with open("api_logs.json", "a", encoding="utf-8") as f: f.write(json.dumps(log_entry, ensure_ascii=False, indent=2) + "\n---\n") except Exception as e: print(f"[LOG ERROR] 写入日志失败: {e}") # 用于追踪会话:保存上次请求的所有用户消息内容 _last_user_messages_hash = "" def get_user_messages_hash(messages: list) -> str: """计算所有用户消息的 hash,用于判断是否是同一会话""" content_str = "" for m in messages: role = m.role if hasattr(m, 'role') else m.get('role', '') if role != "user": continue content = m.content if hasattr(m, 'content') else m.get('content', '') if isinstance(content, list): # 对于包含图片的消息,只取文本部分 text_parts = [item.get('text', '') for item in content if item.get('type') == 'text'] content_str += f"{' '.join(text_parts)}|" else: content_str += f"{content}|" return hashlib.md5(content_str.encode()).hexdigest() def is_continuation(current_messages: list, last_hash: str) -> bool: """ 判断当前请求是否是上一次对话的延续 逻辑:如果当前消息去掉最后一条用户消息后的 hash 等于上次的 hash, 说明是同一对话的延续 """ if not last_hash: return False # 找到所有用户消息 user_indices = [i for i, m in enumerate(current_messages) if (m.role if hasattr(m, 'role') else m.get('role', '')) == "user"] if len(user_indices) <= 1: # 只有一条用户消息,无法判断是否延续,视为新对话 return False # 去掉最后一条用户消息,计算剩余消息的 hash last_user_idx = user_indices[-1] prev_messages = current_messages[:last_user_idx] prev_hash = get_user_messages_hash(prev_messages) return prev_hash == last_hash @app.post("/v1/chat/completions") async def chat_completions(request: ChatCompletionRequest, authorization: str = Header(None)): global _last_user_messages_hash verify_api_key(authorization) # 记录请求入参 (图片内容截断显示) request_log = { "model": request.model, "stream": request.stream, "messages": [], "tools": [t.model_dump() for t in request.tools] if request.tools else None } image_count = 0 for m in request.messages: msg_log = {"role": m.role} if isinstance(m.content, list): content_log = [] for item in m.content: if item.get("type") == "image_url": image_count += 1 img_url = item.get("image_url", {}) if isinstance(img_url, dict): url = img_url.get("url", "") else: url = str(img_url) # 判断图片格式 if url.startswith("data:"): img_format = "base64" elif url.startswith("http://") or url.startswith("https://"): img_format = "url" else: img_format = "unknown" content_log.append({ "type": "image_url", "format": img_format, "url_preview": url[:100] + "..." if len(url) > 100 else url }) else: content_log.append(item) msg_log["content"] = content_log else: msg_log["content"] = m.content request_log["messages"].append(msg_log) # 打印图片接收情况 if image_count > 0: print(f"📷 收到 {image_count} 张图片") try: client = get_client() if not is_continuation(request.messages, _last_user_messages_hash): client.reset() # 处理消息 messages = [] for m in request.messages: content = m.content if isinstance(content, list): messages.append({"role": m.role, "content": content}) else: messages.append({"role": m.role, "content": content}) # 如果有 tools,把工具提示词直接加到用户消息前面 if request.tools and len(messages) > 0: tools_prompt = build_tools_prompt([t.model_dump() for t in request.tools]) for i in range(len(messages) - 1, -1, -1): if messages[i]["role"] == "user": original = messages[i]["content"] if isinstance(original, str): messages[i]["content"] = tools_prompt + original break if request.stream: _last_user_messages_hash = get_user_messages_hash(request.messages) stream_iter = client.chat(messages=messages, model=request.model, stream=True) def generate_real_stream(): streamed_text_parts = [] last_chunk = None for chunk in stream_iter: if not chunk: continue last_chunk = chunk try: choices = chunk.get("choices", []) if choices: delta = choices[0].get("delta", {}) or {} piece = delta.get("content") if isinstance(piece, str) and piece: streamed_text_parts.append(piece) except Exception: pass yield f"data: {json.dumps(chunk, ensure_ascii=False)}\n\n" # 流式也记录最终响应摘要,便于日志排查 try: full_stream_text = "".join(streamed_text_parts) completion_id_log = (last_chunk or {}).get("id", f"chatcmpl-{uuid.uuid4().hex[:8]}") created_time_log = (last_chunk or {}).get("created", int(time.time())) response_log = { "id": completion_id_log, "object": "chat.completion", "created": created_time_log, "model": request.model, "choices": [{ "index": 0, "message": {"role": "assistant", "content": full_stream_text}, "finish_reason": "stop" }], "usage": {"prompt_tokens": 0, "completion_tokens": 0, "total_tokens": 0} } log_api_call(request_log, response_log) except Exception: pass yield "data: [DONE]\n\n" return StreamingResponse( generate_real_stream(), media_type="text/event-stream", headers={ "Cache-Control": "no-cache", "Connection": "keep-alive", "X-Accel-Buffering": "no", } ) response = client.chat(messages=messages, model=request.model) _last_user_messages_hash = get_user_messages_hash(request.messages) reply_content = response.choices[0].message.content completion_id = f"chatcmpl-{uuid.uuid4().hex[:8]}" created_time = int(time.time()) # 解析工具调用 tool_calls = [] final_content = reply_content if request.tools: tool_calls, final_content = parse_tool_calls(reply_content) # 处理流式响应 if request.stream: async def generate_stream(): chunk_data = { "id": completion_id, "object": "chat.completion.chunk", "created": created_time, "model": request.model, "choices": [{ "index": 0, "delta": {"role": "assistant"}, "finish_reason": None }] } yield f"data: {json.dumps(chunk_data)}\n\n" if tool_calls: # 流式返回工具调用 for tc in tool_calls: chunk_data = { "id": completion_id, "object": "chat.completion.chunk", "created": created_time, "model": request.model, "choices": [{ "index": 0, "delta": {"tool_calls": [tc]}, "finish_reason": None }] } yield f"data: {json.dumps(chunk_data)}\n\n" else: chunk_data = { "id": completion_id, "object": "chat.completion.chunk", "created": created_time, "model": request.model, "choices": [{ "index": 0, "delta": {"content": final_content}, "finish_reason": None }] } yield f"data: {json.dumps(chunk_data)}\n\n" chunk_data = { "id": completion_id, "object": "chat.completion.chunk", "created": created_time, "model": request.model, "choices": [{ "index": 0, "delta": {}, "finish_reason": "tool_calls" if tool_calls else "stop" }] } yield f"data: {json.dumps(chunk_data)}\n\n" yield "data: [DONE]\n\n" return StreamingResponse( generate_stream(), media_type="text/event-stream", headers={ "Cache-Control": "no-cache", "Connection": "keep-alive", "X-Accel-Buffering": "no", } ) # 构建响应消息 response_message = {"role": "assistant"} if tool_calls: response_message["content"] = final_content if final_content else None response_message["tool_calls"] = tool_calls finish_reason = "tool_calls" else: response_message["content"] = final_content finish_reason = "stop" response_data = ChatCompletionResponse( id=completion_id, created=created_time, model=request.model, choices=[ChatCompletionChoice(index=0, message=response_message, finish_reason=finish_reason)], usage=Usage(prompt_tokens=response.usage.prompt_tokens, completion_tokens=response.usage.completion_tokens, total_tokens=response.usage.total_tokens) ) log_api_call(request_log, response_data.model_dump()) return JSONResponse( content=response_data.model_dump(), headers={ "Cache-Control": "no-cache", "X-Request-Id": completion_id, } ) except HTTPException: raise except Exception as e: import traceback error_msg = str(e) # 检测是否是 token 过期错误 is_token_error = any(keyword in error_msg.lower() for keyword in [ 'cookie', 'expired', '过期', '401', '403', 'unauthorized', 'push_id', 'snlm0e', 'upload_id', '认证失败' ]) if is_token_error: print(f"[WARN] 检测到 token 可能过期,尝试自动刷新...") refresh_result = try_refresh_tokens(force=True) if refresh_result["success"]: # 刷新成功,重置 client 并提示用户重试 reset_client() error_msg = f"Token 已自动刷新,请重试请求。原错误: {error_msg}" else: error_msg = f"Token 刷新失败 ({refresh_result['message']}),请手动更新 Cookie。原错误: {error_msg}" print(f"[ERROR] Chat error: {error_msg}") traceback.print_exc() log_api_call(request_log, None, error=error_msg) raise HTTPException(status_code=500, detail=error_msg) @app.post("/v1/chat/completions/reset") async def reset_context(authorization: str = Header(None)): verify_api_key(authorization) global _client if _client: _client.reset() return {"status": "ok"} # 注意: load_config() 已在 startup_event 中调用,这里保留是为了兼容直接导入模块的情况 load_config() if __name__ == "__main__": print(f""" ╔══════════════════════════════════════════════════════════╗ ║ Gemini OpenAI Compatible API Server ║ ╠══════════════════════════════════════════════════════════╣ ║ 后台配置: http://localhost:{PORT}/admin ║ ║ API 地址: http://localhost:{PORT}/v1 ║ ║ API Key: {API_KEY} ║ ║ Token 自动刷新: {"开启" if TOKEN_BACKGROUND_REFRESH else "关闭"} ({TOKEN_REFRESH_INTERVAL_MIN}-{TOKEN_REFRESH_INTERVAL_MAX}秒随机) ║ ╚══════════════════════════════════════════════════════════╝ """) uvicorn.run(app, host=HOST, port=PORT)