| import logging |
| import json |
| import time |
| import asyncio |
| import os |
| import traceback |
| import sys |
| from contextlib import asynccontextmanager |
| import random |
|
|
| import uvicorn |
| from fastapi import FastAPI, Request, HTTPException |
| from fastapi.responses import StreamingResponse |
| from fastapi.middleware.cors import CORSMiddleware |
| from fastapi.responses import JSONResponse |
| from google import genai |
| from google.genai import types |
| from typing import Optional, List, Dict, Any |
|
|
| |
| logging.basicConfig( |
| level=logging.INFO, |
| format='%(asctime)s [%(levelname)s]: %(message)s', |
| datefmt='%Y-%m-%d %H:%M:%S' |
| ) |
| logger = logging.getLogger(__name__) |
|
|
| |
|
|
| |
| GEMINI_MODELS = { |
| "gemini-2.0-flash-exp": "gemini-2.0-flash-exp", |
| "gemini-2.5-flash-preview-05-20": "gemini-2.5-flash-preview-05-20", |
| "gemini-2.5-flash": "gemini-2.5-flash", |
| "gemini-2.5-flash-preview-04-17": "gemini-2.5-flash-preview-04-17", |
| "gemini-2.5-pro": "gemini-2.5-pro" |
| } |
|
|
| |
| SUPPORTED_MODELS = [ |
| { |
| "id": "gemini-2.5-flash-preview-05-20", |
| "object": "model", |
| "created": int(time.time()), |
| "owned_by": "google", |
| "permission": [], |
| "root": "gemini-2.5-flash-preview-05-20", |
| "parent": None, |
| "description": "Gemini 2.5 Flash Preview - 最新实验性模型" |
| }, |
| { |
| "id": "gemini-2.5-flash-preview-04-17", |
| "object": "model", |
| "created": int(time.time()), |
| "owned_by": "google", |
| "permission": [], |
| "root": "gemini-2.5-flash-preview-04-17", |
| "parent": None, |
| "description": "gemini-2.5-flash-preview-04-17- 经典专业模型" |
| }, |
| { |
| "id": "gemini-2.5-flash", |
| "object": "model", |
| "created": int(time.time()), |
| "owned_by": "google", |
| "permission": [], |
| "root": "gemini-2.5-flash", |
| "parent": None, |
| "description": "gemini-2.5-flash稳定经典专业模型" |
| }, |
| { |
| "id": "gemini-2.5-pro", |
| "object": "model", |
| "created": int(time.time()), |
| "owned_by": "google", |
| "permission": [], |
| "root": "gemini-2.5-pro", |
| "parent": None, |
| "description": "gemini-2.5-pro稳定经典专业模型" |
| } |
| ] |
|
|
|
|
| def get_model_name(requested_model: str) -> str: |
| """获取实际的Gemini模型名称""" |
| print(f"实际模型名称:{GEMINI_MODELS.get(requested_model)}") |
| return GEMINI_MODELS.get(requested_model, "gemini-2.5-flash") |
|
|
|
|
|
|
| def convert_messages(messages): |
| content_parts = [] |
| system_instruction = None |
|
|
| for message in messages: |
| role = message.get("role", "user") |
| content = message.get("content", "") |
|
|
| if role == "system": |
| system_instruction = content |
| elif role == "assistant": |
| content_parts.append({ |
| "role": "model", |
| "parts": [{"text": content}] |
| }) |
| elif role == "user": |
| content_parts.append({ |
| "role": "user", |
| "parts": [{"text": content}] |
| }) |
|
|
| return content_parts, system_instruction |
|
|
|
|
| def handle_error(error): |
| """简化的错误处理""" |
| error_str = str(error).lower() |
|
|
| if "prompt_feedback" in error_str: |
| if "other" in error_str: |
| return "您的输入内容可能过长或触发了安全策略。请尝试缩短您的问题。", "length" |
| elif "safety" in error_str: |
| return "您的请求被安全策略阻止。请尝试修改您的问题。", "content_filter" |
| elif "safety" in error_str: |
| return "您的请求被安全策略过滤。请尝试修改您的问题。", "content_filter" |
|
|
| return "生成内容时遇到错误。请稍后重试。", "stop" |
|
|
|
|
| @asynccontextmanager |
| async def lifespan(app: FastAPI): |
| try: |
| setup_gemini() |
| logger.info("应用启动完成") |
| yield |
| except Exception as e: |
| logger.error(f"应用启动失败: {str(e)}") |
| raise |
| finally: |
| logger.info("应用关闭") |
|
|
|
|
| |
| app = FastAPI( |
| lifespan=lifespan, |
| title="Gemini Official API (ap3)", |
| version="1.3.0" |
| ) |
|
|
| |
| app.add_middleware( |
| CORSMiddleware, |
| allow_origins=["*"], |
| allow_credentials=True, |
| allow_methods=["*"], |
| allow_headers=["*"], |
| ) |
|
|
|
|
|
|
|
|
| |
| API_KEYS = [ |
| 'AIzaSyDy9CHEMt0-MLbByITwKBwVaf3M0PJOSEc', |
| 'AIzaSyC3sG_4yjXf1KVrB6Sgh4omk9TdhSSJgSk' |
| ] |
|
|
|
|
| def get_random_api_key(): |
| """获取随机API密钥""" |
| return random.choice(API_KEYS) |
|
|
|
|
| def setup_gemini(api_key=None): |
| """配置Gemini API""" |
| if not api_key: |
| api_key = get_random_api_key() |
|
|
| if not API_KEYS: |
| logger.error("请设置有效的API密钥列表") |
| raise ValueError("API_KEYS未设置") |
|
|
| client = genai.Client(api_key=api_key) |
| return client, api_key |
|
|
|
|
| |
| SAFETY_SETTINGS = [ |
| types.SafetySetting( |
| category=types.HarmCategory.HARM_CATEGORY_HARASSMENT, |
| threshold=types.HarmBlockThreshold.BLOCK_NONE, |
| ), |
| types.SafetySetting( |
| category=types.HarmCategory.HARM_CATEGORY_HATE_SPEECH, |
| threshold=types.HarmBlockThreshold.BLOCK_NONE, |
| ), |
| types.SafetySetting( |
| category=types.HarmCategory.HARM_CATEGORY_SEXUALLY_EXPLICIT, |
| threshold=types.HarmBlockThreshold.BLOCK_NONE, |
| ), |
| types.SafetySetting( |
| category=types.HarmCategory.HARM_CATEGORY_DANGEROUS_CONTENT, |
| threshold=types.HarmBlockThreshold.BLOCK_NONE, |
| ), |
| types.SafetySetting( |
| category=types.HarmCategory.HARM_CATEGORY_CIVIC_INTEGRITY, |
| threshold=types.HarmBlockThreshold.BLOCK_NONE, |
| ), |
| ] |
|
|
|
|
| async def try_generate_content(model_name, content_parts, config, max_retries=3): |
| """带重试机制的内容生成""" |
| last_error = None |
| used_keys = set() |
|
|
| for attempt in range(max_retries): |
| try: |
| available_keys = [key for key in API_KEYS if key not in used_keys] |
| if not available_keys: |
| used_keys.clear() |
| available_keys = API_KEYS |
|
|
| api_key = random.choice(available_keys) |
| used_keys.add(api_key) |
|
|
| client, current_key = setup_gemini(api_key) |
| logger.info(f"尝试第 {attempt + 1} 次,使用密钥: {current_key[:20]}...") |
|
|
| response = client.models.generate_content( |
| model=model_name, |
| contents=content_parts, |
| config=config |
| ) |
|
|
| return response, current_key |
|
|
| except Exception as e: |
| last_error = e |
| error_str = str(e).lower() |
|
|
| if any(code in error_str for code in ['400', '401', '403', '429', '500', '502', '503', '504']): |
| logger.warning(f"第 {attempt + 1} 次尝试失败: {str(e)}") |
| if attempt < max_retries - 1: |
| await asyncio.sleep(1) |
| continue |
| else: |
| raise e |
|
|
| raise last_error |
|
|
|
|
| @app.post("/v1/chat/completions") |
| async def chat_completions(request: Request): |
| """聊天对话接口""" |
| try: |
| body = await request.json() |
| messages = body.get('messages', []) |
| stream = body.get('stream', False) |
| max_tokens = body.get('max_tokens', 65536) |
| temperature = body.get('temperature', 1.2) |
| top_p = body.get('top_p', 0.0) |
| requested_model = body.get('model', 'gemini-2.5-flash') |
|
|
| model_name = get_model_name(requested_model) |
| content_parts, system_instruction = convert_messages(messages) |
|
|
| config = types.GenerateContentConfig( |
| max_output_tokens=max_tokens, |
| temperature=temperature, |
| top_p=top_p, |
| system_instruction=system_instruction, |
| safety_settings=SAFETY_SETTINGS, |
| ) |
|
|
| if stream: |
| client, api_key = setup_gemini() |
| return StreamingResponse( |
| stream_response_with_retry(client, model_name, content_parts, config), |
| media_type='text/event-stream' |
| ) |
| else: |
| response, used_key = await try_generate_content(model_name, content_parts, config) |
| response_text = response.text if response else "" |
| finish_reason = "stop" |
| if not response_text: |
| response_text = "无法生成回复。请尝试修改您的问题。" |
|
|
| logger.info(f"成功生成回复,使用密钥: {used_key[:20]}...") |
| return { |
| 'id': f'chatcmpl-{int(time.time())}-{random.randint(1000, 9999)}', |
| 'object': 'chat.completion', |
| 'created': int(time.time()), |
| 'model': requested_model, |
| 'choices': [{'index': 0, 'message': {'role': 'assistant', 'content': response_text}, 'finish_reason': finish_reason}], |
| 'usage': {'prompt_tokens': len(content_parts), 'completion_tokens': len(response_text.split()), 'total_tokens': len(content_parts) + len(response_text.split())} |
| } |
| except Exception as e: |
| logger.error(f"处理聊天请求出错: {str(e)}") |
| error_message, finish_reason = handle_error(e) |
| raise HTTPException(status_code=500, detail=str(e)) |
|
|
|
|
| async def stream_response_with_retry(client, model_name, content_parts, config, max_retries=3): |
| """带重试机制的流式响应生成器""" |
| last_error = None |
| used_keys = set() |
| for attempt in range(max_retries): |
| try: |
| available_keys = [key for key in API_KEYS if key not in used_keys] |
| if not available_keys: |
| used_keys.clear() |
| available_keys = API_KEYS |
| api_key = random.choice(available_keys) |
| used_keys.add(api_key) |
|
|
| current_client, current_key = setup_gemini(api_key) |
| logger.info(f"流式响应尝试第 {attempt + 1} 次,使用密钥: {current_key[:20]}...") |
|
|
| for chunk in current_client.models.generate_content_stream(model=model_name, contents=content_parts, config=config): |
| if chunk and hasattr(chunk, 'text') and chunk.text: |
| data = {'id': f'chatcmpl-{int(time.time())}-{random.randint(1000, 9999)}', 'object': 'chat.completion.chunk', 'created': int(time.time()), 'model': model_name, 'choices': [{'index': 0, 'delta': {'role': 'assistant', 'content': chunk.text}, 'finish_reason': None}]} |
| yield f'data: {json.dumps(data, ensure_ascii=False)}\n\n' |
| await asyncio.sleep(0.01) |
|
|
| final_data = {'id': f'chatcmpl-{int(time.time())}-{random.randint(1000, 9999)}', 'object': 'chat.completion.chunk', 'created': int(time.time()), 'model': model_name, 'choices': [{'index': 0, 'delta': {}, 'finish_reason': 'stop'}]} |
| yield f'data: {json.dumps(final_data, ensure_ascii=False)}\n\n' |
| yield 'data: [DONE]\n\n' |
|
|
| logger.info(f"流式响应成功,使用密钥: {current_key[:20]}...") |
| return |
| except Exception as e: |
| last_error = e |
| error_str = str(e).lower() |
| if any(code in error_str for code in ['400', '401', '403', '429', '500', '502', '503', '504']): |
| logger.warning(f"流式响应第 {attempt + 1} 次尝试失败: {str(e)}") |
| if attempt < max_retries - 1: |
| await asyncio.sleep(1) |
| continue |
| else: |
| break |
| logger.error(f"流式响应所有重试失败: {str(last_error)}") |
| error_message, finish_reason = handle_error(last_error) |
| error_data = {'id': f'chatcmpl-{int(time.time())}-error', 'object': 'chat.completion.chunk', 'created': int(time.time()), 'model': model_name, 'choices': [{'index': 0, 'delta': {'role': 'assistant', 'content': error_message}, 'finish_reason': finish_reason}]} |
| yield f'data: {json.dumps(error_data, ensure_ascii=False)}\n\n' |
| yield 'data: [DONE]\n\n' |
|
|
| @app.get("/v1/models") |
| async def list_models(): |
| """获取可用模型列表""" |
| try: |
| return {"object": "list", "data": SUPPORTED_MODELS} |
| except Exception as e: |
| logger.error(f"获取模型列表出错: {str(e)}") |
| raise HTTPException(status_code=500, detail=str(e)) |
|
|
| @app.get("/v1/models/{model_id}") |
| async def get_model_info(model_id: str): |
| """获取特定模型信息""" |
| try: |
| for model in SUPPORTED_MODELS: |
| if model["id"] == model_id: |
| return model |
| raise HTTPException(status_code=404, detail=f"模型 {model_id} 未找到") |
| except HTTPException: |
| raise |
| except Exception as e: |
| logger.error(f"获取模型信息出错: {str(e)}") |
| raise HTTPException(status_code=500, detail=str(e)) |
|
|
| @app.get("/v1/chat/completions/v1/models") |
| async def list_models_alternative(): |
| """获取可用模型列表 - 兼容路径""" |
| try: |
| return {"object": "list", "data": SUPPORTED_MODELS} |
| except Exception as e: |
| logger.error(f"获取模型列表出错: {str(e)}") |
| raise HTTPException(status_code=500, detail=str(e)) |
|
|
| @app.get("/health") |
| async def health_check(): |
| """健康检查端点""" |
| try: |
| return {"status": "healthy", "timestamp": int(time.time()), "api": "gemini-official", "available_models": [model["id"] for model in SUPPORTED_MODELS], "version": "1.3.0"} |
| except Exception as e: |
| logger.error(f"健康检查失败: {str(e)}") |
| return {"status": "unhealthy", "timestamp": int(time.time()), "error": str(e)} |
|
|
| @app.get("/") |
| async def root(): |
| """根路径信息""" |
| return {"name": "Gemini Official API (ap3)", "version": "1.3.0", "description": "Google Gemini官方API接口服务", "endpoints": {"models": "/v1/models", "models_alt": "/v1/chat/completions/v1/models", "chat": "/v1/chat/completions", "health": "/health"}} |
|
|
| @app.exception_handler(404) |
| async def not_found_handler(request: Request, exc: HTTPException): |
| """处理404错误""" |
| return {"error": "未找到", "requested_path": str(request.url.path), "message": "请求的路径不存在", "available_endpoints": {"models": "/v1/models", "models_alt": "/v1/chat/completions/v1/models", "chat": "/v1/chat/completions", "health": "/health", "info": "/"}} |
|
|
| if __name__ == "__main__": |
| port = int(os.environ.get("PORT", 7862)) |
| print(f"🚀 启动Gemini官方API服务器于端口 {port}") |
| print(f"📊 支持的模型: {[model['id'] for model in SUPPORTED_MODELS]}") |
| print(f"🔑 已配置 {len(API_KEYS)} 个API密钥") |
| print("🔄 支持自动重试和密钥轮换") |
| uvicorn.run(app, host="0.0.0.0", port=port) |