""" Gemini OpenAI 兼容 API 服务 启动: python server.py 后台: http://localhost:8000/admin API: http://localhost:8000/v1 """ from fastapi import FastAPI, HTTPException, Header, Request from fastapi.middleware.cors import CORSMiddleware from fastapi.responses import HTMLResponse, RedirectResponse, StreamingResponse, JSONResponse from pydantic import BaseModel from typing import List, Dict, Any, Optional, Union import uvicorn import time import uuid import json import os import re import httpx import hashlib import secrets import asyncio from persistence import PersistenceManager # ============ 持久化配置 ============ # 如果要在 HF 上持久化,请设置 DATASET_REPO_ID 环境变量(例如: 'luoluoluo22/gemini-config') pm = PersistenceManager( db_path="data.sqlite", dataset_repo=os.getenv("DATASET_REPO_ID"), hf_token=os.getenv("HF_TOKEN") ) # ============ 配置 ============ API_KEY = os.getenv("API_KEY", "sk-geminixxxxx") HOST = "0.0.0.0" PORT = int(os.getenv("PORT", 7860)) CONFIG_FILE = "config_data.json" # 后台登录账号密码 ADMIN_USERNAME = os.getenv("ADMIN_USERNAME", "admin") ADMIN_PASSWORD = os.getenv("ADMIN_PASSWORD", "admin123") # Token 自动刷新配置 TOKEN_REFRESH_INTERVAL_MIN = 1800 # 刷新间隔最小秒数(默认 30 分钟) TOKEN_REFRESH_INTERVAL_MAX = 3600 # 刷新间隔最大秒数(默认 60 分钟) TOKEN_AUTO_REFRESH = True # 是否启用自动刷新 TOKEN_BACKGROUND_REFRESH = True # 是否启用后台定时刷新(防止长时间不用失效) # 媒体文件外网访问地址 # 在 Hugging Face Spaces 中,可以直接获取 SPACE_ID 来构造 URL SPACE_ID = os.getenv("SPACE_ID") if SPACE_ID: MEDIA_BASE_URL = f"https://{SPACE_ID.replace('/', '-')}.hf.space" else: MEDIA_BASE_URL = os.getenv("MEDIA_BASE_URL", "http://127.0.0.1:7860") # ============================== import random from datetime import datetime # 后台刷新任务控制 _background_refresh_task = None _background_refresh_stop = False app = FastAPI(title="Gemini OpenAI API", version="1.0.0") app.add_middleware( CORSMiddleware, allow_origins=["*"], allow_credentials=True, allow_methods=["*"], allow_headers=["*"], ) # 静态文件路由 (用于示例图片) from fastapi.responses import FileResponse # 生成的媒体文件缓存目录 MEDIA_CACHE_DIR = os.path.join(os.path.dirname(__file__), "media_cache") os.makedirs(MEDIA_CACHE_DIR, exist_ok=True) @app.get("/static/{filename}") async def serve_static(filename: str): """提供静态文件(示例图片等)""" file_path = os.path.join(os.path.dirname(__file__), filename) if os.path.exists(file_path): return FileResponse(file_path) raise HTTPException(status_code=404, detail="文件不存在") @app.get("/media/{media_filename}") async def serve_media(media_filename: str): """提供缓存的媒体文件""" # 安全检查:只允许字母数字、下划线、点和常见后缀 import re if not re.match(r'^[a-zA-Z0-9_-]+(\.(png|jpg|jpeg|gif|webp|mp4))?$', media_filename): raise HTTPException(status_code=400, detail="无效的媒体文件名") # 直接查找文件(带后缀名) file_path = os.path.join(MEDIA_CACHE_DIR, media_filename) if os.path.exists(file_path): return FileResponse(file_path) # 兼容旧版本:不带后缀名的请求,尝试查找匹配的文件 media_id = media_filename.rsplit('.', 1)[0] if '.' in media_filename else media_filename for ext in [".png", ".jpg", ".jpeg", ".gif", ".webp", ".mp4"]: file_path = os.path.join(MEDIA_CACHE_DIR, media_id + ext) if os.path.exists(file_path): return FileResponse(file_path) raise HTTPException(status_code=404, detail="媒体文件不存在") def cleanup_old_media(max_age_hours: int = 1): """清理过期的媒体缓存文件""" import time now = time.time() max_age_seconds = max_age_hours * 3600 try: for filename in os.listdir(MEDIA_CACHE_DIR): file_path = os.path.join(MEDIA_CACHE_DIR, filename) if os.path.isfile(file_path): file_age = now - os.path.getmtime(file_path) if file_age > max_age_seconds: os.remove(file_path) except Exception: pass # 存储有效的 session token _admin_sessions = set() def generate_session_token(): """生成随机 session token""" return secrets.token_hex(32) def verify_admin_session(request: Request): """验证管理员 session""" token = request.cookies.get("admin_session") # 增加调试日志 if not token or token not in _admin_sessions: return False return True # 默认可用模型列表 (Gemini 3 官网三个模型: 快速/思考/Pro) DEFAULT_MODELS = ["gemini-3.0-flash", "gemini-3.0-flash-thinking", "gemini-3.0-pro"] # 默认模型 ID (用于请求头选择模型) DEFAULT_MODEL_IDS = { "flash": "56fdd199312815e2", "pro": "e6fa609c3fa255c0", "thinking": "e051ce1aa80aa576", } # 配置存储 (内存中的镜像,初始为默认值) _config = { "SNLM0E": "", "SECURE_1PSID": "", "SECURE_1PSIDTS": "", "SAPISID": "", "SID": "", "HSID": "", "SSID": "", "APISID": "", "PUSH_ID": "", "FULL_COOKIE": "", # 存储完整cookie字符串 "API_KEY": os.getenv("API_KEY", "sk-geminixxxxx"), # 动态 API Key "MODELS": DEFAULT_MODELS.copy(), # 可用模型列表 "MODEL_IDS": DEFAULT_MODEL_IDS.copy(), # 模型 ID 映射 } # Cookie 字段映射 (浏览器cookie名 -> 配置字段名) COOKIE_FIELD_MAP = { "__Secure-1PSID": "SECURE_1PSID", "__Secure-1PSIDTS": "SECURE_1PSIDTS", "SAPISID": "SAPISID", "__Secure-1PAPISID": "SAPISID", # 也映射到 SAPISID "SID": "SID", "HSID": "HSID", "SSID": "SSID", "APISID": "APISID", } def parse_cookie_string(cookie_str: str) -> dict: """解析完整cookie字符串,提取所需字段""" result = {} if not cookie_str: return result for item in cookie_str.split(";"): item = item.strip() if "=" in item: eq_index = item.index("=") key = item[:eq_index].strip() value = item[eq_index + 1:].strip() if key in COOKIE_FIELD_MAP: result[COOKIE_FIELD_MAP[key]] = value return result def fetch_tokens_from_page(cookies_str: str) -> dict: """从 Gemini 页面自动获取 SNLM0E、PUSH_ID 和可用模型列表""" result = {"snlm0e": "", "push_id": "", "models": []} try: session = httpx.Client( timeout=30.0, follow_redirects=True, headers={ "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/131.0.0.0 Safari/537.36", "Accept": "text/html,application/xhtml+xml,application/xml;q=0.9,image/webp,*/*;q=0.8", "Accept-Language": "zh-CN,zh;q=0.9,en;q=0.8", } ) # 设置 cookies for item in cookies_str.split(";"): item = item.strip() if "=" in item: key, value = item.split("=", 1) session.cookies.set(key.strip(), value.strip(), domain=".google.com") resp = session.get("https://gemini.google.com") if resp.status_code != 200: return result html = resp.text # 获取 SNLM0E (AT Token) snlm0e_patterns = [ r'"SNlM0e":"([^"]+)"', r'SNlM0e["\s:]+["\']([^"\']+)["\']', r'"at":"([^"]+)"', ] for pattern in snlm0e_patterns: match = re.search(pattern, html) if match: result["snlm0e"] = match.group(1) break # 获取 PUSH_ID push_id_patterns = [ r'"push[_-]?id["\s:]+["\'](feeds/[a-z0-9]+)["\']', r'push[_-]?id["\s:=]+["\'](feeds/[a-z0-9]+)["\']', r'feedName["\s:]+["\'](feeds/[a-z0-9]+)["\']', r'clientId["\s:]+["\'](feeds/[a-z0-9]+)["\']', r'(feeds/[a-z0-9]{14,})', ] for pattern in push_id_patterns: matches = re.findall(pattern, html, re.IGNORECASE) if matches: result["push_id"] = matches[0] break # 获取可用模型列表 (从页面中提取 gemini 模型 ID) model_patterns = [ r'"(gemini-[a-z0-9\.\-]+)"', # 匹配 "gemini-xxx" 格式 r"'(gemini-[a-z0-9\.\-]+)'", # 匹配 'gemini-xxx' 格式 ] models_found = set() for pattern in model_patterns: matches = re.findall(pattern, html, re.IGNORECASE) for m in matches: # 过滤有效的模型名称 if any(x in m.lower() for x in ['flash', 'pro', 'ultra', 'nano']): models_found.add(m) if models_found: result["models"] = sorted(list(models_found)) # 获取模型 ID (用于 x-goog-ext-525001261-jspb 请求头) # 这些 ID 用于选择不同的模型版本 model_id_pattern = r'\["([a-f0-9]{16})","gemini[^"]*(?:flash|pro|thinking)[^"]*"\]' model_ids = re.findall(model_id_pattern, html, re.IGNORECASE) if model_ids: result["model_ids"] = list(set(model_ids)) #备用方案:直接搜索 16 位十六进制 ID(在模型配置附近) if not result.get("model_ids"): # 搜索类似 "56fdd199312815e2" 的模式 hex_id_pattern = r'"([a-f0-9]{16})"' # 在包含 gemini 或 model 的上下文中查找 context_pattern = r'.{0,100}(?:gemini|model|flash|pro|thinking).{0,100}' contexts = re.findall(context_pattern, html, re.IGNORECASE) hex_ids = set() for ctx in contexts: ids = re.findall(hex_id_pattern, ctx) hex_ids.update(ids) if hex_ids: result["model_ids"] = list(hex_ids) return result except Exception: return result _client = None _last_token_refresh = 0 # 上次 token 刷新时间 _token_refresh_count = 0 # token 刷新次数统计 def try_refresh_tokens(force: bool = False) -> dict: """ 尝试刷新 token Args: force: 是否强制刷新,忽略时间间隔 Returns: dict: {"success": bool, "message": str, "snlm0e": str, "push_id": str} """ global _client, _last_token_refresh, _token_refresh_count, _config result = {"success": False, "message": "", "snlm0e": "", "push_id": ""} if not TOKEN_AUTO_REFRESH and not force: result["message"] = "自动刷新已禁用" return result current_time = time.time() now_str = datetime.now().strftime("%Y-%m-%d %H:%M:%S") # 检查是否需要刷新(除非强制刷新) if not force and (current_time - _last_token_refresh) < TOKEN_REFRESH_INTERVAL_MIN: result["message"] = f"距离上次刷新不足 {TOKEN_REFRESH_INTERVAL_MIN} 秒" return result try: # 如果 client 存在,使用 client 的刷新方法 if _client is not None: refresh_result = _client.refresh_tokens() if refresh_result["success"]: # 更新配置 if refresh_result["snlm0e"]: _config["SNLM0E"] = refresh_result["snlm0e"] result["snlm0e"] = refresh_result["snlm0e"] if refresh_result["push_id"]: _config["PUSH_ID"] = refresh_result["push_id"] result["push_id"] = refresh_result["push_id"] # 保存配置 save_config() _last_token_refresh = current_time _token_refresh_count += 1 result["success"] = True result["message"] = f"Token 刷新成功 (第 {_token_refresh_count} 次)" print(f"✅ [{now_str}] Token 自动刷新成功 (第 {_token_refresh_count} 次)") else: result["message"] = refresh_result.get("error", "刷新失败") print(f"⚠️ [{now_str}] Token 刷新失败: {result['message']}") else: # client 不存在,使用 fetch_tokens_from_page cookies = _config.get("FULL_COOKIE", "") if not cookies: cookies = f"__Secure-1PSID={_config.get('SECURE_1PSID', '')}" if _config.get("SECURE_1PSIDTS"): cookies += f"; __Secure-1PSIDTS={_config['SECURE_1PSIDTS']}" tokens = fetch_tokens_from_page(cookies) if tokens.get("snlm0e"): _config["SNLM0E"] = tokens["snlm0e"] result["snlm0e"] = tokens["snlm0e"] if tokens.get("push_id"): _config["PUSH_ID"] = tokens["push_id"] result["push_id"] = tokens["push_id"] if tokens.get("snlm0e"): save_config() _last_token_refresh = current_time _token_refresh_count += 1 result["success"] = True result["message"] = f"Token 刷新成功 (第 {_token_refresh_count} 次)" print(f"✅ [{now_str}] Token 自动刷新成功 (第 {_token_refresh_count} 次)") else: result["message"] = "无法从页面获取新 token" return result except Exception as e: result["message"] = f"刷新异常: {str(e)}" print(f"❌ [{now_str}] Token 刷新异常: {e}") return result def reset_client(): """重置 client,下次请求时会重新创建""" global _client _client = None now_str = datetime.now().strftime("%Y-%m-%d %H:%M:%S") print(f"🔄 [{now_str}] Client 已重置,下次请求将重新创建") # ============ 后台定时刷新任务 ============ def get_current_time_str(): """获取当前时间字符串""" return datetime.now().strftime("%Y-%m-%d %H:%M:%S") def get_random_refresh_interval(): """获取随机刷新间隔""" return random.randint(TOKEN_REFRESH_INTERVAL_MIN, TOKEN_REFRESH_INTERVAL_MAX) async def background_token_refresh(): """后台定时刷新 token 任务""" global _background_refresh_stop print(f"🔄 [{get_current_time_str()}] 后台 Token 定时刷新任务已启动") while not _background_refresh_stop: try: # 随机等待间隔 interval = get_random_refresh_interval() print(f"⏳ [{get_current_time_str()}] 下次刷新将在 {interval} 秒后") await asyncio.sleep(interval) if _background_refresh_stop: break if not TOKEN_BACKGROUND_REFRESH: continue # 执行刷新 print(f"⏰ [{get_current_time_str()}] 后台定时刷新 Token...") result = try_refresh_tokens(force=True) if result["success"]: print(f"✅ [{get_current_time_str()}] 后台刷新成功: {result['message']}") else: print(f"⚠️ [{get_current_time_str()}] 后台刷新失败: {result['message']}") except asyncio.CancelledError: break except Exception as e: print(f"❌ [{get_current_time_str()}] 后台刷新异常: {e}") await asyncio.sleep(60) # 出错后等待 1 分钟再试 print(f"🛑 [{get_current_time_str()}] 后台 Token 定时刷新任务已停止") @app.on_event("startup") async def startup_event(): """应用启动时执行""" global _background_refresh_task, _background_refresh_stop load_config() _background_refresh_stop = False if TOKEN_BACKGROUND_REFRESH: _background_refresh_task = asyncio.create_task(background_token_refresh()) print(f"✅ [{get_current_time_str()}] 后台 Token 定时刷新已启用 (间隔: {TOKEN_REFRESH_INTERVAL_MIN}-{TOKEN_REFRESH_INTERVAL_MAX} 秒随机)") @app.on_event("shutdown") async def shutdown_event(): """应用关闭时执行""" global _background_refresh_task, _background_refresh_stop _background_refresh_stop = True if _background_refresh_task: _background_refresh_task.cancel() try: await _background_refresh_task except asyncio.CancelledError: pass print("🛑 后台任务已停止") # ============ Tools 支持 ============ def build_tools_prompt(tools: List[Dict]) -> str: """将 tools 定义转换为提示词""" if not tools: return "" tools_schema = json.dumps([{ "name": t["function"]["name"], "description": t["function"].get("description", ""), "parameters": t["function"].get("parameters", {}) } for t in tools if t.get("type") == "function"], ensure_ascii=False, indent=2) prompt = f"""[系统指令] 你必须作为函数调用代理。不要自己回答问题,必须调用函数。 可用函数: {tools_schema} 严格规则: 1. 你不能直接回答用户问题 2. 你必须选择一个函数并调用它 3. 只输出以下格式,不要有任何其他文字: ```tool_call {{"name": "函数名", "arguments": {{"参数": "值"}}}} ``` 用户请求: """ return prompt def parse_tool_calls(content: str) -> tuple: """ 解析响应中的工具调用 返回: (tool_calls列表, 剩余文本内容) """ tool_calls = [] # 多种匹配模式 patterns = [ r'```tool_call\s*\n?(.*?)\n?```', # ```tool_call ... ``` r'```json\s*\n?(.*?)\n?```', # ```json ... ``` (有时模型会用这个) r'```\s*\n?(\{[^`]*"name"[^`]*\})\n?```', # ``` {...} ``` ] matches = [] for pattern in patterns: found = re.findall(pattern, content, re.DOTALL) matches.extend(found) # 也尝试直接匹配 JSON 对象(没有代码块包裹的情况) if not matches: json_pattern = r'\{[^{}]*"name"\s*:\s*"[^"]+"\s*,\s*"arguments"\s*:\s*\{[^{}]*\}[^{}]*\}' matches = re.findall(json_pattern, content, re.DOTALL) for i, match in enumerate(matches): try: match = match.strip() # 尝试解析 JSON call_data = json.loads(match) if call_data.get("name"): tool_calls.append({ "id": f"call_{uuid.uuid4().hex[:8]}", "type": "function", "function": { "name": call_data.get("name", ""), "arguments": json.dumps(call_data.get("arguments", {}), ensure_ascii=False) } }) except json.JSONDecodeError: continue # 移除工具调用部分 remaining = content for pattern in patterns: remaining = re.sub(pattern, '', remaining, flags=re.DOTALL) remaining = remaining.strip() return tool_calls, remaining def load_config(): """从本地 SQLite 数据库加载配置""" global _config _config = pm.get_all_config(_config) def save_config(): """保存配置到 SQLite 并同步到云端""" pm.save_config(_config) def get_client(auto_refresh: bool = True): global _client, _last_token_refresh if not _config.get("SNLM0E") or not _config.get("SECURE_1PSID"): raise HTTPException(status_code=500, detail="请先在后台配置 Token 和 Cookie") # 检查是否需要自动刷新 token if auto_refresh and TOKEN_AUTO_REFRESH: current_time = time.time() if (current_time - _last_token_refresh) >= TOKEN_REFRESH_INTERVAL_MIN: try_refresh_tokens() # 如果 client 已存在,直接复用,保持会话上下文 if _client is not None: return _client cookies = f"__Secure-1PSID={_config['SECURE_1PSID']}" if _config.get("SECURE_1PSIDTS"): cookies += f"; __Secure-1PSIDTS={_config['SECURE_1PSIDTS']}" if _config.get("SAPISID"): cookies += f"; SAPISID={_config['SAPISID']}; __Secure-1PAPISID={_config['SAPISID']}" if _config.get("SID"): cookies += f"; SID={_config['SID']}" if _config.get("HSID"): cookies += f"; HSID={_config['HSID']}" if _config.get("SSID"): cookies += f"; SSID={_config['SSID']}" if _config.get("APISID"): cookies += f"; APISID={_config['APISID']}" # 构建媒体文件的基础 URL (优先使用配置的外网地址) media_base_url = MEDIA_BASE_URL if MEDIA_BASE_URL else f"http://localhost:{PORT}" from client import GeminiClient _client = GeminiClient( secure_1psid=_config["SECURE_1PSID"], snlm0e=_config["SNLM0E"], cookies_str=cookies, push_id=_config.get("PUSH_ID") or None, model_ids=_config.get("MODEL_IDS") or DEFAULT_MODEL_IDS, debug=False, media_base_url=media_base_url, ) return _client def get_login_html(): return '''
请登录以访问后台管理
配置 Google Gemini 的认证信息,保存后即可调用 API 退出登录
Base URL:
API Key:
可用模型: gemini-3.0-flash | gemini-3.0-pro | gemini-3.0-flash-thinking
from openai import OpenAI
client = OpenAI(base_url="", api_key="")
response = client.chat.completions.create(
model="gemini-3.0-flash", # 或 gemini-3.0-pro / gemini-3.0-flash-thinking
messages=[{"role": "user", "content": "你好"}]
)
print(response.choices[0].message.content)
import base64
from openai import OpenAI
client = OpenAI(base_url="", api_key="")
# 读取本地图片
with open("image.png", "rb") as f:
img_b64 = base64.b64encode(f.read()).decode()
response = client.chat.completions.create(
model="gemini-3.0-flash",
messages=[{
"role": "user",
"content": [
{"type": "text", "text": "请描述这张图片"},
{"type": "image_url", "image_url": {"url": f"data:image/png;base64,{img_b64}"}}
]
}]
)
print(response.choices[0].message.content)
stream = client.chat.completions.create(
model="gemini-3.0-flash",
messages=[{"role": "user", "content": "写一首诗"}],
stream=True
)
for chunk in stream:
if chunk.choices[0].delta.content:
print(chunk.choices[0].delta.content, end="", flush=True)
以下是 image.png 示例图片,可用于测试图片识别功能(点击放大):