"""OpenAI-compatible HTTP proxy in front of the unofficial Gemini web API.

Endpoints:
  * ``/v1/models`` and ``/v1/chat/completions`` — OpenAI-style, usable by
    SillyTavern and similar clients (streaming is simulated by chunking a
    complete response).
  * ``/v1/gemini`` and ``/v1/gemini/chat`` — thin pass-through endpoints.

All configuration comes from CLI flags whose defaults are taken from
environment variables; see ``parse_args``.
"""

import asyncio
import os
import json
import logging
import uvicorn
import argparse
import time
import re
from dotenv import load_dotenv
from fastapi import FastAPI, HTTPException, Request, BackgroundTasks
from fastapi.middleware.cors import CORSMiddleware
from fastapi.responses import StreamingResponse
from pydantic import BaseModel
from typing import Optional, List, Union, Dict, Any
from gemini_webapi import GeminiClient, set_log_level
from httpx import RemoteProtocolError, ReadTimeout

load_dotenv()


def parse_args():
    """Parse command-line options.

    Every option's default is read from an environment variable, so the
    server can be configured either way.  Boolean flags accept the literal
    strings ``true``/``false`` (case-insensitive).
    """
    parser = argparse.ArgumentParser(description='Gemini API 代理服务器')
    parser.add_argument('--init-timeout', type=float,
                        default=float(os.environ.get('GEMINI_INIT_TIMEOUT', '180')),
                        help='客户端初始化超时时间(秒)')
    parser.add_argument('--request-timeout', type=float,
                        default=float(os.environ.get('GEMINI_REQUEST_TIMEOUT', '300')),
                        help='请求处理超时时间(秒)')
    parser.add_argument('--auto-close', type=lambda x: x.lower() == 'true',
                        default=os.environ.get('GEMINI_AUTO_CLOSE', 'false').lower() == 'true',
                        help='是否自动关闭客户端')
    parser.add_argument('--close-delay', type=float,
                        default=float(os.environ.get('GEMINI_CLOSE_DELAY', '300')),
                        help='自动关闭延迟时间(秒)')
    parser.add_argument('--auto-refresh', type=lambda x: x.lower() == 'true',
                        default=os.environ.get('GEMINI_AUTO_REFRESH', 'true').lower() == 'true',
                        help='是否自动刷新会话')
    parser.add_argument('--refresh-interval', type=float,
                        default=float(os.environ.get('GEMINI_REFRESH_INTERVAL', '540')),
                        help='刷新间隔(秒)')
    parser.add_argument('--host', type=str,
                        default=os.environ.get('GEMINI_HOST', '0.0.0.0'),
                        help='服务器监听地址')
    parser.add_argument('--port', type=int,
                        default=int(os.environ.get('GEMINI_PORT', '7860')),
                        help='服务器端口')
    # Retry-related options
    parser.add_argument('--max-retries', type=int,
                        default=int(os.environ.get('GEMINI_MAX_RETRIES', '3')),
                        help='最大重试次数')
    parser.add_argument('--retry-delay', type=float,
                        default=float(os.environ.get('GEMINI_RETRY_DELAY', '2')),
                        help='重试间隔时间(秒)')
    parser.add_argument('--retry-exceptions', type=str,
                        default=os.environ.get('GEMINI_RETRY_EXCEPTIONS', 'RemoteProtocolError,ReadTimeout'),
                        help='需要重试的异常类型,以逗号分隔')
    parser.add_argument('--long-response-mode', type=lambda x: x.lower() == 'true',
                        default=os.environ.get('GEMINI_LONG_RESPONSE_MODE', 'true').lower() == 'true',
                        help='是否启用长响应模式,在此模式下会等待更长时间而不是立即重试')
    parser.add_argument('--long-response-wait', type=float,
                        default=float(os.environ.get('GEMINI_LONG_RESPONSE_WAIT', '180')),
                        help='长响应模式下的等待时间(秒),超过此时间才会重试')
    parser.add_argument('--max-long-response-retries', type=int,
                        default=int(os.environ.get('GEMINI_MAX_LONG_RESPONSE_RETRIES', '5')),
                        help='长响应模式下的最大重试次数')
    parser.add_argument('--keep-conversation-history', type=lambda x: x.lower() == 'true',
                        default=os.environ.get('GEMINI_KEEP_CONVERSATION_HISTORY', 'true').lower() == 'true',
                        help='是否保存对话历史,在SillyTavern等应用中非常有用')
    parser.add_argument('--filter-thinking-vessel', type=lambda x: x.lower() == 'true',
                        default=os.environ.get('GEMINI_FILTER_THINKING_VESSEL', 'true').lower() == 'true',
                        help='是否过滤thinking和vessel标签')
    return parser.parse_args()


# Logging: console + file.
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
    handlers=[
        logging.StreamHandler(),
        logging.FileHandler("gemini_proxy.log"),
    ]
)
logger = logging.getLogger("gemini-proxy")

# Verbose logging from the gemini_webapi library itself.
set_log_level("DEBUG")

# Gemini web-session cookies (credentials for the unofficial API).
SECURE_1PSID = os.environ.get("GEMINI_SECURE_1PSID", "")
SECURE_1PSIDTS = os.environ.get("GEMINI_SECURE_1PSIDTS", "")


def _model_entry(model_id: str, created: int) -> Dict[str, Any]:
    """Build one OpenAI-style model descriptor for the /v1/models listing."""
    return {
        "id": model_id,
        "object": "model",
        "created": created,
        "owned_by": "google",
        "permission": [],
        "root": model_id,
        "parent": None,
    }


# Models advertised via /v1/models.  The "created" epochs are approximate
# release dates.
SUPPORTED_MODELS = [
    _model_entry("gemini-pro", 1678892800),
    _model_entry("gemini-2.5-flash-preview-04-17", 1713571200),  # ~2024-04-20 (estimated)
    _model_entry("gemini-2.5-exp-advanced", 1713571200),
    _model_entry("gemini-2.0-exp-advanced", 1713571200),
    _model_entry("gemini-2.5-pro", 1713571200),
    _model_entry("gemini-2.5-flash", 1713571200),
    _model_entry("gemini-2.0-flash-thinking", 1713571200),
    _model_entry("gemini-2.0-flash", 1713571200),
    _model_entry("gemini-2.5-pro-exp-03-25", 1711324800),  # 2024-03-25
    _model_entry("gemini-2.5-pro-preview-03-25", 1711324800),  # 2024-03-25
]

app = FastAPI(title="Gemini API Proxy")

# CORS: allow requests from any origin (this proxy is meant for local tools).
app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"],
)

# Shared Gemini client instance (lazily (re)initialized).
client = None
# session_id -> gemini_webapi chat-session object
sessions = {}
# session_id -> list of {"role": ..., "content": ...} dicts
conversation_history = {}


def filter_thinking_vessel(text):
    """Strip <thinking>...</thinking> and <vessel>...</vessel> blocks from *text*.

    Both the plain tags and backslash-escaped variants (``\\<thinking\\>``)
    are removed.  Returns *text* unchanged when it is falsy.
    """
    if not text:
        return text
    # Escaped \<thinking\>...\</thinking\> blocks
    text = re.sub(r'\\<thinking\\>[\s\S]*?\\</thinking\\>', '', text)
    # Plain <thinking>...</thinking> blocks
    text = re.sub(r'<thinking>[\s\S]*?</thinking>', '', text)
    # Escaped \<vessel\>...\</vessel\> blocks
    text = re.sub(r'\\<vessel\\>[\s\S]*?\\</vessel\\>', '', text)
    # Plain <vessel>...</vessel> blocks
    text = re.sub(r'<vessel>[\s\S]*?</vessel>', '', text)
    return text


# Runtime configuration; overwritten from CLI args in __main__.
config = {
    "init_timeout": 180,
    "request_timeout": 300,
    "auto_close": False,
    "close_delay": 300,
    "auto_refresh": True,
    "refresh_interval": 540,
    # Retry configuration
    "max_retries": 3,
    "retry_delay": 2,
    "retry_exceptions": ["RemoteProtocolError", "ReadTimeout"],
    # Long-response mode: wait instead of retrying immediately when the
    # server disconnects mid-generation.
    "long_response_mode": True,
    "long_response_wait": 180,
    "max_long_response_retries": 5,
    # Conversation-history configuration
    "keep_conversation_history": True,
    "filter_thinking_vessel": True,
}


class ChatRequest(BaseModel):
    """Body of an OpenAI-style /v1/chat/completions request."""
    messages: List[Dict[str, Any]]
    model: Optional[str] = None
    stream: Optional[bool] = True
    temperature: Optional[float] = None   # accepted but currently unused
    max_tokens: Optional[int] = None      # accepted but currently unused
    session_id: Optional[str] = None


class Message(BaseModel):
    """A single chat message (role + plain-text content)."""
    role: str
    content: str


async def _ensure_client_initialized():
    """(Re)initialize the global Gemini client if it is missing or stopped.

    Raises HTTPException(500) when initialization fails.
    """
    global client
    if client and client.running:
        return
    try:
        logger.warning("客户端未初始化或已关闭,尝试重新初始化...")
        client = GeminiClient(SECURE_1PSID, SECURE_1PSIDTS)
        await client.init(
            timeout=config['init_timeout'],
            auto_close=config['auto_close'],
            close_delay=config['close_delay'],
            auto_refresh=config['auto_refresh'],
            refresh_interval=config['refresh_interval']
        )
        logger.info("重新初始化成功")
    except Exception as e:
        logger.error(f"Gemini客户端初始化失败: {e}", exc_info=True)
        raise HTTPException(status_code=500, detail=f"Gemini客户端初始化失败: {str(e)}")


def _raise_http_error(e: Exception, context: str) -> None:
    """Translate a low-level failure into an HTTPException with a hint.

    *context* is interpolated into the (Chinese) log messages, e.g.
    "聊天完成" -> "处理聊天完成请求时超时".  Always raises.
    """
    error_message = str(e)
    error_class = e.__class__.__name__
    if "timeout" in error_message.lower():
        logger.error(f"处理{context}请求时超时: {e}", exc_info=True)
        raise HTTPException(
            status_code=500,
            detail="请求处理超时。如果您正在生成长文本或使用高级模型(如带思维链的模型),"
                   "可能需要更长的处理时间。请稍后重试,或尝试使用不同的模型。"
        )
    elif error_class == "RemoteProtocolError":
        logger.error(f"服务器连接断开: {e}", exc_info=True)
        raise HTTPException(
            status_code=502,
            detail="Gemini服务器连接断开。这可能是由于服务器负载过高、网络不稳定或请求内容过于复杂。"
                   "您可以尝试简化请求、使用不同的模型,或稍后重试。"
        )
    else:
        logger.error(f"处理{context}请求时出错: {e}", exc_info=True)
        raise HTTPException(status_code=500, detail=f"处理请求时出错: {str(e)}")


def _extract_text(raw_content) -> str:
    """Collapse an OpenAI ``content`` field (str or list of parts) to text.

    List-style content keeps only ``{"type": "text"}`` parts, joined with
    newlines.  Any other shape yields an empty string.
    """
    if isinstance(raw_content, str):
        return raw_content
    if isinstance(raw_content, list):
        parts = [part.get("text", "") for part in raw_content
                 if isinstance(part, dict) and part.get("type") == "text"]
        return "\n".join(parts)
    return ""


def _is_effectively_empty(content) -> bool:
    """True when a message content carries no usable payload.

    A string is empty when blank; a list is empty when it has neither a
    non-blank text part nor any non-text part (e.g. image_url).  Other
    types are conservatively treated as non-empty.
    """
    if isinstance(content, str):
        return not content.strip()
    if isinstance(content, list):
        for part in content:
            if isinstance(part, dict):
                if part.get("type") == "text" and part.get("text", "").strip():
                    return False
                if part.get("type") != "text":  # e.g. image_url
                    return False
        return True
    return False


@app.on_event("startup")
async def startup_event():
    """Initialize the Gemini client once at server startup."""
    global client
    try:
        logger.info("正在初始化Gemini客户端...")
        logger.info(f"使用初始化超时: {config['init_timeout']}秒")
        client = GeminiClient(SECURE_1PSID, SECURE_1PSIDTS)
        await client.init(
            timeout=config['init_timeout'],
            auto_close=config['auto_close'],
            close_delay=config['close_delay'],
            auto_refresh=config['auto_refresh'],
            refresh_interval=config['refresh_interval']
        )
        logger.info("Gemini客户端初始化成功")
        print("Gemini客户端初始化成功")
    except Exception as e:
        logger.error(f"Gemini客户端初始化失败: {e}", exc_info=True)
        raise e


@app.on_event("shutdown")
async def shutdown_event():
    """Close the Gemini client on server shutdown (best effort)."""
    global client
    if client:
        try:
            logger.info("正在关闭Gemini客户端...")
            await client.close()
            logger.info("Gemini客户端已关闭")
            print("Gemini客户端已关闭")
        except Exception as e:
            logger.error(f"关闭Gemini客户端时出错: {e}", exc_info=True)


@app.get("/")
async def root():
    """Health-check / landing endpoint."""
    logger.info("收到根路径请求")
    return {"message": "Gemini API Proxy服务正在运行"}


@app.get("/v1/models")
async def list_models():
    """
    列出支持的模型。
    这是一个OpenAI API兼容端点,返回支持的模型列表。
    """
    logger.info("收到列出模型请求")
    return {
        "object": "list",
        "data": SUPPORTED_MODELS
    }


@app.post("/v1/chat/completions")
async def chat_completions(request: ChatRequest):
    """
    兼容OpenAI风格的聊天完成API。
    这个接口可以被SillyTavern等应用使用。
    """
    global client, sessions, conversation_history
    logger.info(f"收到聊天完成请求, 模型: {request.model}, 会话ID: {request.session_id}, 流式: {request.stream}")
    await _ensure_client_initialized()
    try:
        session_id = request.session_id
        system_message = ""
        conversation_prompt = ""
        # Messages (original content, str or list) to be mirrored into the
        # per-session conversation history.
        processed_messages_for_history = []

        for message in request.messages:
            role = message.get("role", "")
            raw_content = message.get("content", "")  # str or list of parts
            text_for_prompt = _extract_text(raw_content)
            if config['filter_thinking_vessel']:
                text_for_prompt = filter_thinking_vessel(text_for_prompt)

            # Keep the original content for the history unless it is empty.
            if not _is_effectively_empty(raw_content):
                processed_messages_for_history.append({"role": role, "content": raw_content})

            # System messages become a preamble instead of a dialogue turn.
            if role == "system":
                system_message = text_for_prompt
                continue

            if text_for_prompt.strip():
                role_prefix = "User: " if role == "user" else "Assistant: "
                conversation_prompt += f"{role_prefix}{text_for_prompt}\n\n"

        # Assemble the final prompt string.
        if system_message and conversation_prompt:
            prompt = f"{system_message}\n\n{conversation_prompt}"
        elif conversation_prompt:
            prompt = conversation_prompt
        else:
            prompt = system_message

        if not prompt:
            logger.error("请求中没有找到有效消息 (after processing for prompt)")
            raise HTTPException(status_code=400, detail="没有找到有效消息 (after processing for prompt)")

        # Mirror the incoming messages into the conversation history before
        # sending the request.
        if config['keep_conversation_history'] and session_id:
            history = conversation_history.setdefault(session_id, [])
            for entry in processed_messages_for_history:
                hist_role = entry["role"]
                content = entry["content"]  # str or list
                # Only non-assistant messages are filtered here; assistant
                # replies are filtered when they are received.
                if config['filter_thinking_vessel'] and hist_role != "assistant":
                    if isinstance(content, str):
                        content = filter_thinking_vessel(content)
                    elif isinstance(content, list):
                        filtered_parts = []
                        for part in content:
                            if isinstance(part, dict) and part.get("type") == "text":
                                filtered_parts.append(
                                    {"type": "text", "text": filter_thinking_vessel(part.get("text", ""))})
                            else:
                                filtered_parts.append(part)  # keep non-text parts
                        content = filtered_parts
                # Skip exact duplicates of the most recent history entry.
                if history:
                    last = history[-1]
                    if (last["role"] == hist_role
                            and isinstance(last["content"], type(content))
                            and last["content"] == content):
                        continue
                # Filtering may have emptied the content; re-check.
                if not _is_effectively_empty(content):
                    history.append({"role": hist_role, "content": content})

        # Reuse or create the chat session.
        if session_id and session_id in sessions:
            logger.info(f"使用现有会话 {session_id}")
            chat_session = sessions[session_id]
            if request.model:
                chat_session.model = request.model
        else:
            logger.info("创建新会话")
            chat_session = client.start_chat(model=request.model or "gemini-pro")
            if session_id:
                logger.info(f"将新会话存储为ID {session_id}")
                sessions[session_id] = chat_session

        request_model_id = request.model or chat_session.model or "gemini-pro"

        if request.stream:
            # Streaming is simulated: fetch the whole response, then emit it
            # in small SSE chunks.
            logger.info(f"向会话 {chat_session.cid} 发送流式消息 (模拟): {prompt[:50]}...")

            async def stream_generator():
                full_text = ""
                try:
                    async def get_full_gemini_response():
                        return await chat_session.send_message(prompt, timeout=config['request_timeout'])

                    try:
                        gemini_response = await retry_request(get_full_gemini_response)
                        full_text = gemini_response.text
                        logger.info(f"已获取完整响应用于模拟流,长度: {len(full_text)}")
                    except Exception as e_fetch:
                        logger.error(f"获取完整响应以进行模拟流式处理时出错: {e_fetch}", exc_info=True)
                        error_chunk = {
                            "id": f"chatcmpl-error-{chat_session.cid}",
                            "object": "chat.completion.chunk",
                            "created": int(time.time()),
                            "model": request_model_id,
                            "choices": [{
                                "index": 0,
                                "delta": {"role": "assistant",
                                          "content": f"Error fetching response: {str(e_fetch)}"},
                                "finish_reason": "error",
                            }],
                        }
                        yield f"data: {json.dumps(error_chunk)}\n\n"
                        yield "data: [DONE]\n\n"
                        # Record the failure in the history as well.
                        if config['keep_conversation_history'] and session_id:
                            err_text = f"Error fetching response: {str(e_fetch)}"
                            if config['filter_thinking_vessel']:
                                err_text = filter_thinking_vessel(err_text)
                            conversation_history.setdefault(session_id, []).append(
                                {"role": "assistant", "content": err_text})
                        return

                    # Emit the response in fixed-size chunks as SSE events.
                    chunk_size = 20  # characters per chunk
                    for i in range(0, len(full_text), chunk_size):
                        chunk = {
                            "id": f"chatcmpl-{chat_session.cid}",
                            "object": "chat.completion.chunk",
                            "created": int(time.time()),
                            "model": request_model_id,
                            "choices": [{
                                "index": 0,
                                "delta": {"content": full_text[i:i + chunk_size]},
                                "finish_reason": None,
                            }],
                        }
                        # Only the first chunk carries the role.
                        if i == 0:
                            chunk["choices"][0]["delta"]["role"] = "assistant"
                        yield f"data: {json.dumps(chunk)}\n\n"
                        await asyncio.sleep(0.02)  # brief delay to mimic a real stream

                    # Terminal chunk with finish_reason and (character-based) usage.
                    final_chunk = {
                        "id": f"chatcmpl-{chat_session.cid}",
                        "object": "chat.completion.chunk",
                        "created": int(time.time()),
                        "model": request_model_id,
                        "choices": [{"index": 0, "delta": {}, "finish_reason": "stop"}],
                        "usage": {
                            # NOTE: these are character counts, not real tokens.
                            "prompt_tokens": len(prompt),
                            "completion_tokens": len(full_text),
                            "total_tokens": len(prompt) + len(full_text),
                        },
                    }
                    yield f"data: {json.dumps(final_chunk)}\n\n"
                    yield "data: [DONE]\n\n"

                    # Persist the assistant reply after streaming finishes.
                    if config['keep_conversation_history'] and session_id:
                        reply = full_text
                        if config['filter_thinking_vessel']:
                            reply = filter_thinking_vessel(reply)
                        if reply.strip():
                            conversation_history.setdefault(session_id, []).append(
                                {"role": "assistant", "content": reply})
                    logger.info(f"成功完成模拟流式响应,总长度: {len(full_text)}")
                except Exception as e_outer:
                    # Catch-all for unexpected failures inside the generator.
                    logger.error(f"模拟流式处理中发生意外错误: {e_outer}", exc_info=True)
                    error_chunk = {
                        "id": f"chatcmpl-error-outer-{chat_session.cid}",
                        "object": "chat.completion.chunk",
                        "created": int(time.time()),
                        "model": request_model_id,
                        "choices": [{
                            "index": 0,
                            "delta": {"role": "assistant",
                                      "content": f"Unexpected error during streaming: {str(e_outer)}"},
                            "finish_reason": "error",
                        }],
                    }
                    yield f"data: {json.dumps(error_chunk)}\n\n"
                    yield "data: [DONE]\n\n"
                    if config['keep_conversation_history'] and session_id:
                        err_text = f"Unexpected error during streaming: {str(e_outer)}"
                        if config['filter_thinking_vessel']:
                            err_text = filter_thinking_vessel(err_text)
                        conversation_history.setdefault(session_id, []).append(
                            {"role": "assistant", "content": err_text})

            return StreamingResponse(stream_generator(), media_type="text/event-stream")
        else:
            # Non-streaming path.
            logger.info(f"向会话 {chat_session.cid} 发送非流式消息: {prompt[:50]}...")

            async def send_message_with_session_logic():
                return await chat_session.send_message(prompt, timeout=config['request_timeout'])

            response = await retry_request(send_message_with_session_logic)
            assistant_text = response.text

            if config['filter_thinking_vessel']:
                assistant_text_for_completion = filter_thinking_vessel(assistant_text)
            else:
                assistant_text_for_completion = assistant_text

            # Persist the (filtered) assistant reply.
            if config['keep_conversation_history'] and session_id:
                reply = assistant_text
                if config['filter_thinking_vessel']:
                    reply = filter_thinking_vessel(reply)
                if reply.strip():
                    conversation_history.setdefault(session_id, []).append(
                        {"role": "assistant", "content": reply})

            completion_response = {
                "id": f"chatcmpl-{chat_session.cid}",
                "object": "chat.completion",
                "created": int(time.time()),
                "model": request_model_id,
                "choices": [{
                    "index": 0,
                    "message": {
                        "role": "assistant",
                        "content": assistant_text_for_completion,
                    },
                    "finish_reason": "stop",
                }],
                "usage": {
                    # NOTE: these are character counts, not real tokens.
                    "prompt_tokens": len(prompt),
                    "completion_tokens": len(assistant_text_for_completion),
                    "total_tokens": len(prompt) + len(assistant_text_for_completion),
                },
            }
            logger.info(f"成功生成响应,长度: {len(assistant_text_for_completion)}")
            return completion_response
    except HTTPException:
        # Preserve deliberate status codes (e.g. 400) instead of remapping to 500.
        raise
    except Exception as e:
        _raise_http_error(e, "聊天完成")


@app.post("/v1/gemini")
async def gemini_direct(request: Request):
    """
    直接向Gemini API发送请求,不做任何格式转换。
    """
    global client
    logger.info("收到直接Gemini API请求")
    await _ensure_client_initialized()
    try:
        body = await request.json()
        prompt = body.get("prompt", "")
        model = body.get("model", "")  # optional model override

        if not prompt:
            logger.error("请求中缺少'prompt'字段")
            raise HTTPException(status_code=400, detail="请求中缺少'prompt'字段")

        logger.info(f"向Gemini发送直接请求: {prompt[:50]}...")

        async def generate_content_with_retry():
            if model:
                return await client.generate_content(prompt, model=model, timeout=config['request_timeout'])
            return await client.generate_content(prompt, timeout=config['request_timeout'])

        response = await retry_request(generate_content_with_retry)

        result = {
            "response": response.text,
            "images": [{"url": img.url, "title": img.title} for img in response.images],
            "thoughts": response.thoughts,
        }
        logger.info(f"成功生成响应,长度: {len(response.text)}, 图片数量: {len(response.images)}")
        return result
    except HTTPException:
        raise
    except Exception as e:
        _raise_http_error(e, "直接Gemini")


@app.post("/v1/gemini/chat")
async def gemini_chat(request: Request):
    """
    创建或使用一个Gemini聊天会话。
    """
    global client, sessions
    logger.info("收到Gemini聊天请求")
    await _ensure_client_initialized()
    try:
        body = await request.json()
        prompt = body.get("prompt", "")
        session_id = body.get("session_id", "")
        model = body.get("model", "")  # optional model override

        if not prompt:
            logger.error("请求中缺少'prompt'字段")
            raise HTTPException(status_code=400, detail="请求中缺少'prompt'字段")

        # Reuse or create the chat session.
        chat_session = None
        if session_id and session_id in sessions:
            logger.info(f"使用现有会话 {session_id}")
            chat_session = sessions[session_id]
        if not chat_session:
            logger.info("创建新会话")
            chat_session = client.start_chat(model=model or "gemini-pro")
            if session_id:
                logger.info(f"将新会话存储为ID {session_id}")
                sessions[session_id] = chat_session

        logger.info(f"向会话发送消息: {prompt[:50]}...")
        if model:
            # A model in the request overrides the session's current model.
            chat_session.model = model

        async def send_message_with_retry():
            return await chat_session.send_message(prompt, timeout=config['request_timeout'])

        response = await retry_request(send_message_with_retry)

        result = {
            "session_id": chat_session.cid,
            "response": response.text,
            "images": [{"url": img.url, "title": img.title} for img in response.images],
            "thoughts": response.thoughts,
        }
        logger.info(f"成功生成响应,长度: {len(response.text)}, 图片数量: {len(response.images)}")
        return result
    except HTTPException:
        raise
    except Exception as e:
        _raise_http_error(e, "Gemini聊天")


async def retry_request(func, *args, **kwargs):
    """Execute *func* with retry and "long response" recovery.

    Parameters
    ----------
    func : async callable
        The coroutine function to execute.
    *args, **kwargs :
        Forwarded to *func*.

    Returns
    -------
    Whatever *func* returns.

    Raises
    ------
    The last exception when retries are exhausted or the exception type is
    not listed in ``config["retry_exceptions"]``.

    Notes
    -----
    When ``long_response_mode`` is on and a ``RemoteProtocolError`` with
    "Server disconnected" occurs, the function waits ``long_response_wait``
    seconds and retries — tracked on a separate counter (stored as an
    attribute on *func*) so these waits do not consume ordinary retries.
    """
    max_retries = config["max_retries"]
    retry_delay = config["retry_delay"]
    retry_exceptions = config["retry_exceptions"]
    long_response_mode = config["long_response_mode"]
    long_response_wait = config["long_response_wait"]

    last_exception = None
    retry_count = 0

    while retry_count <= max_retries:
        try:
            if retry_count > 0:
                logger.info(f"第{retry_count}次重试...")
            return await func(*args, **kwargs)
        except Exception as e:
            exception_name = e.__class__.__name__
            error_message = str(e)
            logger.warning(f"请求异常: {exception_name} - {error_message}")
            last_exception = e

            # A server disconnect may simply mean the response took too long;
            # in long-response mode wait and try again on a separate counter.
            if exception_name == "RemoteProtocolError" and "Server disconnected" in error_message and long_response_mode:
                long_retry_count = getattr(func, '_long_retry_count', 0) + 1
                setattr(func, '_long_retry_count', long_retry_count)
                max_long_retries = config.get("max_long_response_retries", 3)
                if long_retry_count <= max_long_retries:
                    logger.info(f"检测到服务器断开连接,可能是长响应导致。启用长响应模式,等待 {long_response_wait} 秒... (第 {long_retry_count}/{max_long_retries} 次尝试)")
                    await asyncio.sleep(long_response_wait)
                    logger.info("尝试重新连接并获取响应...")
                    try:
                        # Re-invoke the original coroutine rather than
                        # building a fresh request.
                        return await func(*args, **kwargs)
                    except Exception as retry_error:
                        logger.warning(f"重新连接失败: {str(retry_error)}")
                        continue
                else:
                    # Long-response retries exhausted: fall back to the
                    # ordinary retry path below.
                    logger.warning(f"长响应模式重试次数已达到最大值 {max_long_retries},切换回普通重试模式")
                    setattr(func, '_long_retry_count', 0)

            if exception_name in retry_exceptions and retry_count < max_retries:
                retry_count += 1
                wait_time = retry_delay * (2 ** (retry_count - 1))  # exponential backoff
                logger.info(f"等待 {wait_time} 秒后重试...")
                await asyncio.sleep(wait_time)
            else:
                # Not retryable, or retries exhausted.
                raise

    # Unreachable in practice: the final failure raises inside the loop.
    raise last_exception


if __name__ == "__main__":
    # Parse CLI args and fold them into the runtime configuration.
    args = parse_args()
    config.update({
        "init_timeout": args.init_timeout,
        "request_timeout": args.request_timeout,
        "auto_close": args.auto_close,
        "close_delay": args.close_delay,
        "auto_refresh": args.auto_refresh,
        "refresh_interval": args.refresh_interval,
        "max_retries": args.max_retries,
        "retry_delay": args.retry_delay,
        "retry_exceptions": args.retry_exceptions.split(','),
        "long_response_mode": args.long_response_mode,
        "long_response_wait": args.long_response_wait,
        "max_long_response_retries": args.max_long_response_retries,
        "keep_conversation_history": args.keep_conversation_history,
        "filter_thinking_vessel": args.filter_thinking_vessel,
    })

    logger.info(f"启动Gemini API代理服务器,监听地址: {args.host}:{args.port}")
    logger.info(f"客户端配置: 初始化超时={config['init_timeout']}秒, 请求超时={config['request_timeout']}秒")
    logger.info(f"自动关闭={config['auto_close']}, 关闭延迟={config['close_delay']}秒")
    logger.info(f"自动刷新={config['auto_refresh']}, 刷新间隔={config['refresh_interval']}秒")
    logger.info(f"重试配置: 最大重试次数={config['max_retries']}, 重试间隔={config['retry_delay']}秒")
    logger.info(f"重试异常类型: {', '.join(config['retry_exceptions'])}")
    logger.info(f"长响应模式: 启用={config['long_response_mode']}, 等待时间={config['long_response_wait']}秒, 最大重试次数={config['max_long_response_retries']}")
    logger.info(f"对话历史配置: 保存历史={config['keep_conversation_history']}, 过滤thinking和vessel={config['filter_thinking_vessel']}")

    # NOTE(review): the import string assumes this file is named
    # gemini_proxy_server.py — confirm against the actual filename.
    uvicorn.run("gemini_proxy_server:app", host=args.host, port=args.port)