#!/usr/bin/env python3
"""
AnythingLLM to OpenAI API proxy (fixed version).

Exposes a minimal OpenAI-compatible HTTP API (``/v1/models`` and
``/v1/chat/completions``) and forwards chat requests to an AnythingLLM
workspace. Message ``content`` may be a plain string or a list of parts.
"""

import json
import os
import time
import traceback
import uuid
import logging
import threading
from typing import Any, Dict, Iterator, List, Optional, TypedDict, Union

import requests
from fastapi import FastAPI, HTTPException, Depends, Query, Request
from fastapi.concurrency import run_in_threadpool
from fastapi.responses import StreamingResponse, JSONResponse
from fastapi.security import HTTPBearer, HTTPAuthorizationCredentials
from fastapi.middleware.cors import CORSMiddleware
from pydantic import BaseModel, Field
import uvicorn

# Logging configuration
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)


class HFConfig:
    """Runtime configuration, read once from environment variables."""

    ANYTHINGLLM_BASE_URL = os.getenv("ANYTHINGLLM_BASE_URL", "https://ai.renpho.nl:3002")
    ANYTHINGLLM_WORKSPACE = os.getenv("ANYTHINGLLM_WORKSPACE", "liufuwei")
    # Token used to authenticate against AnythingLLM; empty means "not configured".
    BEARER_TOKEN = os.getenv("BEARER_TOKEN", "")
    # NOTE(review): the built-in default keys are publicly known; set
    # CLIENT_API_KEYS explicitly in any real deployment.
    CLIENT_API_KEYS = os.getenv("CLIENT_API_KEYS", "sk-hf-spaces-key1,sk-hf-spaces-key2").split(",")
    DEBUG = os.getenv("DEBUG", "false").lower() == "true"
    # Timeout (seconds) for the chat request sent to AnythingLLM.
    REQUEST_TIMEOUT = int(os.getenv("REQUEST_TIMEOUT", "120"))


# Module-level state.
# Keys are stripped so "a, b" in the environment works; blanks are dropped so
# an empty CLIENT_API_KEYS value means "no client authentication".
VALID_CLIENT_KEYS: set = {key.strip() for key in HFConfig.CLIENT_API_KEYS if key.strip()}
ANYTHINGLLM_MODELS: List[str] = ["claude-3-7-sonnet", "claude-sonnet-4", "deepseek-chat", "anythingllm"]
DEBUG_MODE = HFConfig.DEBUG


# Pydantic models
class ChatMessage(BaseModel):
    """One chat message; ``content`` is a string or a list of content parts."""

    role: str
    content: Union[str, List[Dict[str, Any]]]


class ChatCompletionRequest(BaseModel):
    """Subset of the OpenAI chat-completion request that this proxy reads."""

    model: str
    messages: List[ChatMessage]
    # NOTE: defaults to streaming when the client omits the field.
    stream: bool = True


class ModelInfo(BaseModel):
    """One entry of the model listing."""

    id: str
    object: str = "model"
    created: int
    owned_by: str


class ModelList(BaseModel):
    """Response body of the model-listing endpoints."""

    object: str = "list"
    data: List[ModelInfo]


# FastAPI app
app = FastAPI(
    title="AnythingLLM OpenAI API Adapter - Fixed",
    description="Fixed version supporting list content format",
    version="1.2.1-hf"
)

# CORS middleware
app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],
    allow_credentials=True,
    allow_methods=["GET", "POST", "PUT", "DELETE", "OPTIONS"],
    allow_headers=["*"],
)

security = HTTPBearer(auto_error=False)


def log_debug(message: str):
    """Log *message* at INFO level, but only while debug mode is enabled."""
    if DEBUG_MODE:
        logger.info(f"[DEBUG] {message}")


def _mask_secret(secret: str, visible: int = 4) -> str:
    """Return a log-safe preview exposing at most *visible* leading characters."""
    if not secret:
        return ""
    return f"{secret[:visible]}..."


def _format_header_for_log(name: str, value: str) -> str:
    """Return *value* unchanged unless *name* is Authorization, then mask the credential."""
    if name.lower() != "authorization":
        return value
    scheme, separator, credential = value.partition(" ")
    if not separator:
        # No "<scheme> <credential>" structure: treat the whole value as secret.
        return _mask_secret(value)
    return f"{scheme} {_mask_secret(credential)}"


def create_requests_session():
    """Create a ``requests`` session that ignores proxy settings from the environment."""
    session = requests.Session()
    session.proxies = {'http': None, 'https': None}
    session.trust_env = False
    session.verify = True
    return session


def decode_text_content(text):
    """Repair UTF-8 text that was mis-decoded as Latin-1.

    Non-string or empty input is returned unchanged, as is any string that
    cannot be round-tripped (it is then assumed to be correctly decoded).

    NOTE(review): presumably needed because ``requests`` falls back to
    ISO-8859-1 for ``text/*`` responses without a charset - confirm.
    """
    if not text or not isinstance(text, str):
        return text
    try:
        return text.encode('latin-1').decode('utf-8')
    except (UnicodeEncodeError, UnicodeDecodeError):
        return text


def extract_text_from_content(content):
    """Flatten message content (a string or a list of parts) into one string.

    List items shaped like ``{'type': 'text', 'text': ...}`` contribute their
    text; dicts with a ``content`` key contribute that value; anything else is
    stringified. Parts are joined with a single space.
    """
    if isinstance(content, str):
        return content
    if not isinstance(content, list):
        return str(content)

    text_parts = []
    for item in content:
        if not isinstance(item, dict):
            text_parts.append(str(item))
        elif item.get('type') == 'text' and 'text' in item:
            # str() guards against a non-string "text" value breaking the join.
            text_parts.append(str(item['text']))
        elif 'content' in item:
            # Some clients put the text under a "content" key instead.
            text_parts.append(str(item['content']))
        else:
            # Unknown part shape: fall back to its string form.
            text_parts.append(str(item))
    return ' '.join(text_parts)


def extract_last_user_message(messages):
    """Return the stripped text of the last non-empty user message, or ``""``.

    Messages are scanned from newest to oldest; user messages whose text is
    blank are skipped.
    """
    log_debug(f"提取用户消息,总消息数: {len(messages)}")

    for i, message in enumerate(reversed(messages)):
        log_debug(f"检查倒序消息 {i}: role='{message.role}', content_type={type(message.content)}")
        if message.role != "user":
            continue
        text_content = extract_text_from_content(message.content)
        log_debug(f"找到用户消息,提取的文本: '{text_content[:100]}...'")
        stripped = text_content.strip()
        if stripped:
            return stripped
        log_debug("用户消息内容为空,继续查找...")

    log_debug("❌ 没有找到有效的用户消息!")
    return ""


async def authenticate_client(
    auth: Optional[HTTPAuthorizationCredentials] = Depends(security),
):
    """Authenticate the client using the API key in the Authorization header.

    Authentication is skipped entirely when no client keys are configured.

    Raises:
        HTTPException: 401 when the header is missing, 403 when the key is unknown.
    """
    if not VALID_CLIENT_KEYS or not VALID_CLIENT_KEYS - {""}:
        log_debug("跳过客户端认证(未配置密钥)")
        return

    if not auth or not auth.credentials:
        log_debug("❌ 缺少Authorization头")
        raise HTTPException(
            status_code=401,
            detail="API key required in Authorization header.",
            headers={"WWW-Authenticate": "Bearer"},
        )

    if auth.credentials not in VALID_CLIENT_KEYS:
        # Only a short prefix is logged: the first 20 characters can be the whole key.
        log_debug(f"❌ 无效的API密钥: {_mask_secret(auth.credentials)}")
        raise HTTPException(status_code=403, detail="Invalid client API key.")

    log_debug(f"✅ 客户端认证成功: {_mask_secret(auth.credentials)}")


def convert_anythingllm_to_openai_chunk(chunk_data, request_id, model: str = "anythingllm"):
    """Convert one AnythingLLM stream event into an OpenAI chunk JSON string.

    Args:
        chunk_data: JSON text of a single AnythingLLM event.
        request_id: Suffix used to build the ``chatcmpl-...`` id.
        model: Model name reported in the chunk (defaults to ``"anythingllm"``).

    Returns:
        The serialized OpenAI ``chat.completion.chunk``, or ``None`` when the
        event is not valid JSON or has a type other than ``textResponseChunk``
        / ``finalizeResponseStream``.
    """
    try:
        data = json.loads(chunk_data)
    except json.JSONDecodeError:
        return None
    if not isinstance(data, dict):
        return None

    event_type = data.get("type")
    if event_type == "textResponseChunk":
        content = decode_text_content(data.get("textResponse", ""))
        delta = {"content": content} if content else {}
        finish_reason = "stop" if data.get("close", False) else None
    elif event_type == "finalizeResponseStream":
        delta = {}
        finish_reason = "stop"
    else:
        return None

    openai_chunk = {
        "id": f"chatcmpl-{request_id}",
        "object": "chat.completion.chunk",
        "created": int(time.time()),
        "model": model,
        "choices": [{
            "index": 0,
            "delta": delta,
            "finish_reason": finish_reason
        }],
        "system_fingerprint": None
    }
    return json.dumps(openai_chunk, ensure_ascii=False)


def _build_model_list() -> ModelList:
    """Build the model listing shared by both model endpoints."""
    created = int(time.time())
    return ModelList(data=[
        ModelInfo(id=model, created=created, owned_by="anythingllm")
        for model in ANYTHINGLLM_MODELS
    ])


def _build_error_chunk(request_id: str, model: str, message: str) -> str:
    """Serialize an OpenAI-style chunk that carries *message* and ends the stream."""
    error_chunk = {
        "id": f"chatcmpl-{request_id}",
        "object": "chat.completion.chunk",
        "created": int(time.time()),
        "model": model,
        "choices": [{
            "index": 0,
            "delta": {"content": message},
            "finish_reason": "stop"
        }]
    }
    return json.dumps(error_chunk)


def _log_request_body(body: Dict[str, Any]) -> None:
    """Print a diagnostic dump of the parsed request body.

    Purely informational: unexpected shapes are reported, never raised, so a
    logging problem cannot be mistaken for a malformed request.
    """
    messages = body.get('messages')
    if not isinstance(messages, list):
        messages = []

    print("✅ 请求体解析成功")
    print(f"📊 消息数量: {len(messages)}")
    print(f"📊 模型: {body.get('model', 'N/A')}")
    # Default mirrors ChatCompletionRequest.stream so the log shows the effective value.
    print(f"📊 流式: {body.get('stream', True)}")

    print("📋 完整请求体:")
    try:
        print(json.dumps(body, indent=2, ensure_ascii=False))
    except (TypeError, ValueError) as e:
        print(f" 无法格式化请求体: {e}")
        print(f" 原始请求体: {body}")

    for i, msg in enumerate(messages):
        print(f"📝 消息 {i+1}:")
        if not isinstance(msg, dict):
            print(f" 内容类型: {type(msg).__name__}")
            print(f" 内容: {msg}")
            continue
        print(f" 角色: {msg.get('role', 'N/A')}")
        content = msg.get('content')
        if isinstance(content, str):
            print(" 内容类型: 字符串")
            print(f" 内容长度: {len(content)}")
            print(f" 内容预览: {content[:100]}...")
        elif isinstance(content, list):
            print(" 内容类型: 列表")
            print(f" 元素数量: {len(content)}")
            for j, item in enumerate(content):
                if not isinstance(item, dict):
                    continue
                # "text" may be absent or null for non-text parts.
                text = str(item.get('text') or '')
                print(f" 元素 {j+1}: type={item.get('type')}, text长度={len(text)}")
                if item.get('type') == 'text':
                    print(f" 文本预览: {text[:50]}...")
        else:
            print(f" 内容类型: {type(content).__name__}")
            print(f" 内容: {content}")

    log_debug(f"请求体解析成功,消息数量: {len(messages)}")


def _log_outgoing_request(chat_url: str, headers: Dict[str, str], payload: Dict[str, Any]) -> None:
    """Print a diagnostic dump of the request about to be sent to AnythingLLM."""
    message = payload["message"]
    print("🚀 准备发送到AnythingLLM:")
    print(f"📍 URL: {chat_url}")
    print("📍 请求头:")
    for key, value in headers.items():
        print(f" {key}: {_format_header_for_log(key, value)}")
    print("📍 Payload:")
    print(f" message长度: {len(message)}")
    print(f" message预览: {message[:200]}...")
    print(f" attachments: {payload['attachments']}")
    print("📍 完整Payload JSON:")
    try:
        print(json.dumps(payload, indent=2, ensure_ascii=False))
    except (TypeError, ValueError) as e:
        print(f" 无法格式化payload: {e}")
        print(f" 原始payload: {payload}")
    log_debug(f"发送聊天请求到: {chat_url}")


def _create_thread(session) -> str:
    """Create a new AnythingLLM thread in the configured workspace and return its slug.

    Blocking; call it from a worker thread.

    NOTE(review): a thread is created for every request and never deleted
    here - confirm whether server-side cleanup is needed.

    Raises:
        requests.RequestException: on transport errors or a non-2xx status.
        RuntimeError: when the response contains no thread slug.
    """
    thread_url = f"{HFConfig.ANYTHINGLLM_BASE_URL}/api/workspace/{HFConfig.ANYTHINGLLM_WORKSPACE}/thread/new"
    thread_headers = {
        "Authorization": f"Bearer {HFConfig.BEARER_TOKEN}",
        "Accept": "*/*",
        "Content-Type": "application/json",
    }
    thread_response = session.post(thread_url, headers=thread_headers, timeout=30)
    thread_response.raise_for_status()
    thread_data = thread_response.json()
    thread_slug = (thread_data.get("thread") or {}).get("slug")
    if not thread_slug:
        raise RuntimeError("Failed to create thread")
    return thread_slug


def _stream_openai_chunks(session, chat_url, payload, headers, model) -> Iterator[str]:
    """Yield OpenAI-style SSE lines translated from the AnythingLLM stream.

    This is deliberately a plain (synchronous) generator: ``StreamingResponse``
    iterates sync generators in a threadpool, so the blocking ``requests`` I/O
    does not stall the event loop. The generator owns *session* and closes it
    when the stream ends. Failures are reported in-band as an error chunk
    followed by ``[DONE]``.
    """
    request_id = str(uuid.uuid4())[:8]
    try:
        print("📤 发送POST请求到AnythingLLM...")
        with session.post(
            chat_url,
            data=json.dumps(payload),
            headers=headers,
            stream=True,
            timeout=HFConfig.REQUEST_TIMEOUT
        ) as response:
            print("📥 收到AnythingLLM响应:")
            print(f" 状态码: {response.status_code}")
            print(" 响应头:")
            for key, value in response.headers.items():
                print(f" {key}: {value}")

            if response.status_code != 200:
                print(f"❌ AnythingLLM返回错误状态码: {response.status_code}")
                try:
                    print(f"❌ 错误响应内容: {response.text}")
                except Exception:
                    print("❌ 无法读取错误响应内容")
                message = f"Error: AnythingLLM API returned {response.status_code}"
                yield f"data: {_build_error_chunk(request_id, model, message)}\n\n"
                yield "data: [DONE]\n\n"
                return

            # Without a known encoding iter_lines() yields bytes, which would
            # break the str prefix test below; assume UTF-8 in that case.
            if response.encoding is None:
                response.encoding = "utf-8"

            print("✅ 开始处理流式响应...")
            line_count = 0
            for line in response.iter_lines(decode_unicode=True):
                line_count += 1
                if not line:
                    continue
                if not line.startswith("data: "):
                    print(f"📦 收到非data行 {line_count}: {line[:100]}...")
                    continue
                chunk_data = line[6:]
                print(f"📦 收到数据块 {line_count}: {chunk_data[:100]}...")
                if not chunk_data.strip():
                    continue
                openai_chunk = convert_anythingllm_to_openai_chunk(chunk_data, request_id)
                if openai_chunk:
                    print(f"✅ 转换为OpenAI格式: {openai_chunk[:100]}...")
                    yield f"data: {openai_chunk}\n\n"
                else:
                    print("⚠️ 转换失败,跳过此数据块")

            print(f"✅ 流式响应处理完成,共处理 {line_count} 行")
            yield "data: [DONE]\n\n"
    except Exception as e:
        # Headers are already sent, so the error has to travel inside the stream.
        print(f"❌ 流式处理异常: {type(e).__name__}: {str(e)}")
        print(f"❌ 异常堆栈: {traceback.format_exc()}")
        logger.error(f"Stream error: {e}")
        yield f"data: {_build_error_chunk(request_id, model, f'Error: {str(e)}')}\n\n"
        yield "data: [DONE]\n\n"
    finally:
        session.close()


def _fetch_full_response(session, chat_url, payload, headers) -> str:
    """Send the chat request and return the complete assistant text.

    Blocking; call it from a worker thread. The ``textResponse`` of every
    ``textResponseChunk`` event is concatenated, matching the streaming path,
    which forwards the same field as incremental deltas.

    Raises:
        HTTPException: 500 when AnythingLLM answers with a non-200 status.
    """
    print("📤 发送非流式POST请求到AnythingLLM...")
    response = session.post(
        chat_url,
        data=json.dumps(payload),
        headers=headers,
        timeout=HFConfig.REQUEST_TIMEOUT
    )

    print("📥 收到非流式响应:")
    print(f" 状态码: {response.status_code}")
    print(f" 响应长度: {len(response.text)}")

    if response.status_code != 200:
        print(f"❌ 非流式请求失败: {response.status_code}")
        print(f"❌ 响应内容: {response.text}")
        raise HTTPException(status_code=500, detail=f"AnythingLLM API returned {response.status_code}")

    print("✅ 开始解析非流式响应...")
    text_parts = []
    for line_count, line in enumerate(response.text.splitlines(), start=1):
        if not line.startswith("data: "):
            continue
        chunk_data = line[6:]
        print(f"📦 解析数据行 {line_count}: {chunk_data[:100]}...")
        try:
            data = json.loads(chunk_data)
        except json.JSONDecodeError as e:
            print(f"⚠️ JSON解析失败: {e}")
            continue
        if isinstance(data, dict) and data.get("type") == "textResponseChunk":
            decoded_text = decode_text_content(data.get("textResponse", "")) or ""
            text_parts.append(str(decoded_text))
            print(f"✅ 提取文本: {decoded_text[:100]}...")

    full_response = "".join(text_parts)
    print(f"✅ 非流式响应解析完成,最终响应长度: {len(full_response)}")
    return full_response


@app.on_event("startup")
async def startup():
    """Log the effective configuration when the application starts."""
    logger.info("Starting AnythingLLM OpenAI API Adapter (Fixed Version)...")
    logger.info(f"Bearer Token configured: {bool(HFConfig.BEARER_TOKEN)}")
    logger.info(f"Base URL: {HFConfig.ANYTHINGLLM_BASE_URL}")
    logger.info(f"Workspace: {HFConfig.ANYTHINGLLM_WORKSPACE}")
    logger.info("Server initialization completed.")


@app.get("/v1/models", response_model=ModelList)
async def list_v1_models(_: None = Depends(authenticate_client)):
    """List available models - authenticated"""
    return _build_model_list()


@app.get("/models", response_model=ModelList)
async def list_models_no_auth():
    """List available models without authentication"""
    return _build_model_list()


@app.get("/debug")
async def toggle_debug(enable: Optional[bool] = Query(None)):
    """Report the debug flag, updating it first when ``enable`` is supplied.

    NOTE(review): this endpoint is unauthenticated, so any caller can switch
    verbose logging on or off - consider requiring ``authenticate_client``.
    """
    global DEBUG_MODE
    if enable is not None:
        DEBUG_MODE = enable
    return {"debug_mode": DEBUG_MODE}


@app.post("/v1/chat/completions")
async def chat_completions(
    request: Request,
    _: None = Depends(authenticate_client)
):
    """Create a chat completion using the AnythingLLM backend.

    Only the last non-empty user message is forwarded, in a freshly created
    AnythingLLM thread. Depending on ``stream`` the reply is an SSE stream of
    OpenAI chunks or a single OpenAI ``chat.completion`` JSON object.

    Raises:
        HTTPException: 400 for an invalid body or missing user message, 404 for
            an unknown model, 500 when AnythingLLM rejects the non-streaming
            request, 503 when no token is configured or the upstream call fails.
    """
    print("=" * 80)
    print("🔥 收到新的聊天完成请求")
    print(f"📍 请求URL: {request.url}")
    print(f"📍 请求方法: {request.method}")
    print("📍 请求头:")
    for key, value in request.headers.items():
        print(f" {key}: {_format_header_for_log(key, value)}")

    # Parse the body; only the parse itself is guarded so that logging
    # problems are never reported as "Invalid JSON".
    try:
        body = await request.json()
    except ValueError as e:
        print(f"❌ JSON解析失败: {e}")
        log_debug(f"❌ JSON解析失败: {e}")
        raise HTTPException(status_code=400, detail=f"Invalid JSON: {str(e)}") from e
    if not isinstance(body, dict):
        log_debug("❌ JSON解析失败: request body is not a JSON object")
        raise HTTPException(status_code=400, detail="Invalid JSON: request body must be a JSON object")
    _log_request_body(body)

    # Validate the request shape.
    try:
        chat_request = ChatCompletionRequest(**body)
    except Exception as e:
        log_debug(f"❌ 请求参数解析失败: {e}")
        raise HTTPException(status_code=400, detail=f"Invalid request format: {str(e)}") from e

    if chat_request.model not in ANYTHINGLLM_MODELS:
        log_debug(f"❌ 不支持的模型: {chat_request.model}")
        raise HTTPException(status_code=404, detail=f"Model '{chat_request.model}' not found.")

    if not chat_request.messages:
        log_debug("❌ 没有提供消息")
        raise HTTPException(status_code=400, detail="No messages provided in the request.")

    if not HFConfig.BEARER_TOKEN:
        log_debug("❌ 没有配置Bearer Token")
        raise HTTPException(status_code=503, detail="No valid AnythingLLM tokens available.")

    user_message = extract_last_user_message(chat_request.messages)
    if not user_message:
        log_debug("❌ 没有找到用户消息")
        raise HTTPException(status_code=400, detail="No user message found")

    log_debug(f"✅ 提取到用户消息: '{user_message[:100]}...'")

    session = create_requests_session()
    try:
        # 1. Create a new thread (blocking I/O, so run it off the event loop).
        thread_slug = await run_in_threadpool(_create_thread, session)
        log_debug(f"创建线程成功: {thread_slug}")

        # 2. Prepare the chat request.
        chat_url = f"{HFConfig.ANYTHINGLLM_BASE_URL}/api/workspace/{HFConfig.ANYTHINGLLM_WORKSPACE}/thread/{thread_slug}/stream-chat"
        payload = {
            "message": user_message,
            "attachments": []
        }
        headers = {
            "Authorization": f"Bearer {HFConfig.BEARER_TOKEN}",
            "Content-Type": "text/plain;charset=UTF-8",
            "Accept": "text/event-stream"
        }
        _log_outgoing_request(chat_url, headers, payload)

        if chat_request.stream:
            chunk_stream = _stream_openai_chunks(
                session, chat_url, payload, headers, chat_request.model
            )
            # The generator now owns the session and closes it when the stream
            # ends; clearing the local keeps the finally below from closing it early.
            session = None
            print("🎯 返回流式响应")
            print("=" * 80)
            return StreamingResponse(
                chunk_stream,
                media_type='text/event-stream',
                headers={
                    'Cache-Control': 'no-cache',
                    'Connection': 'keep-alive'
                }
            )

        full_response = await run_in_threadpool(
            _fetch_full_response, session, chat_url, payload, headers
        )
        openai_response = {
            "id": f"chatcmpl-{str(uuid.uuid4())[:8]}",
            "object": "chat.completion",
            "created": int(time.time()),
            "model": chat_request.model,
            "choices": [{
                "index": 0,
                "message": {
                    "role": "assistant",
                    "content": full_response
                },
                "finish_reason": "stop"
            }],
            "usage": {
                "prompt_tokens": 0,
                "completion_tokens": 0,
                "total_tokens": 0
            },
            "system_fingerprint": None
        }

        print("🎯 返回非流式响应")
        print("=" * 80)
        return JSONResponse(content=openai_response)

    except HTTPException:
        # Keep deliberately chosen status codes instead of rewrapping them as 503.
        raise
    except Exception as e:
        print(f"❌ 请求处理异常: {type(e).__name__}: {str(e)}")
        print(f"❌ 异常堆栈: {traceback.format_exc()}")
        print("=" * 80)
        logger.error(f"Request processing error: {e}")
        raise HTTPException(status_code=503, detail=f"Service temporarily unavailable: {str(e)}") from e
    finally:
        if session is not None:
            session.close()


@app.get("/health")
async def health_check():
    """Health check: report service status and the non-secret configuration."""
    return JSONResponse(content={
        "status": "healthy",
        "service": "anythingllm-to-openai-proxy-fixed",
        "version": "1.2.1-hf",
        "timestamp": int(time.time()),
        "config": {
            "base_url": HFConfig.ANYTHINGLLM_BASE_URL,
            "workspace": HFConfig.ANYTHINGLLM_WORKSPACE,
            "bearer_token_configured": bool(HFConfig.BEARER_TOKEN),
            "debug": DEBUG_MODE,
            "models": ANYTHINGLLM_MODELS
        }
    })


@app.get("/")
async def root():
    """API root: describe the service and list its endpoints."""
    return JSONResponse(content={
        "service": "AnythingLLM to OpenAI API Proxy (Fixed)",
        "version": "1.2.1-hf",
        "platform": "Hugging Face Spaces",
        "fix": "Support for list format content in messages",
        "endpoints": {
            "chat_completions": "/v1/chat/completions",
            "models": "/v1/models",
            "health": "/health",
            "debug": "/debug"
        }
    })


if __name__ == "__main__":
    uvicorn.run(
        app,
        host="0.0.0.0",
        port=7860,
        log_level="info"
    )