# anything-llm / app.py
# kun2333's picture — Upload app.py (0b50475 verified)
#!/usr/bin/env python3
"""
AnythingLLM to OpenAI API Proxy - 修复版本
支持list格式的content
"""
import json
import os
import time
import uuid
import logging
import threading
from typing import Any, Dict, List, Optional, TypedDict, Union
import requests
from fastapi import FastAPI, HTTPException, Depends, Query, Request
from fastapi.responses import StreamingResponse, JSONResponse
from fastapi.security import HTTPBearer, HTTPAuthorizationCredentials
from fastapi.middleware.cors import CORSMiddleware
from pydantic import BaseModel, Field
import uvicorn
# 配置日志
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
# Configuration: all values are read from environment variables at import time.
class HFConfig:
    """Static proxy configuration sourced from the process environment."""

    # Upstream AnythingLLM server and workspace to proxy requests to.
    ANYTHINGLLM_BASE_URL = os.environ.get("ANYTHINGLLM_BASE_URL", "https://ai.renpho.nl:3002")
    ANYTHINGLLM_WORKSPACE = os.environ.get("ANYTHINGLLM_WORKSPACE", "liufuwei")
    # Token used to authenticate against the AnythingLLM backend (empty => unconfigured).
    BEARER_TOKEN = os.environ.get("BEARER_TOKEN", "")
    # Comma-separated API keys that clients may present to this proxy.
    CLIENT_API_KEYS = os.environ.get("CLIENT_API_KEYS", "sk-hf-spaces-key1,sk-hf-spaces-key2").split(",")
    # Debug logging is enabled only when DEBUG is literally "true" (case-insensitive).
    DEBUG = os.environ.get("DEBUG", "false").lower() == "true"
    # Timeout (seconds) for chat requests to the backend.
    REQUEST_TIMEOUT = int(os.environ.get("REQUEST_TIMEOUT", "120"))
# Global state derived from configuration.
VALID_CLIENT_KEYS: set = set(HFConfig.CLIENT_API_KEYS)  # keys accepted by authenticate_client
# Model ids advertised by the /models endpoints; requests for other ids get 404.
ANYTHINGLLM_MODELS: List[str] = ["claude-3-7-sonnet", "claude-sonnet-4", "deepseek-chat", "anythingllm"]
# Mutable at runtime via the GET /debug endpoint.
DEBUG_MODE: bool = HFConfig.DEBUG
# Pydantic Models
class ChatMessage(BaseModel):
    """One chat message; content may be a plain string or a list of typed parts."""
    # e.g. "user", "assistant", "system" (not validated here)
    role: str
    # Plain text, or OpenAI-style parts like [{"type": "text", "text": "..."}]
    content: Union[str, List[Dict[str, Any]]]
class ChatCompletionRequest(BaseModel):
    """Subset of the OpenAI chat-completions request body used by this proxy."""
    model: str
    messages: List[ChatMessage]
    # NOTE: defaults to True here, unlike the OpenAI API where stream defaults to False.
    stream: bool = True
class ModelInfo(BaseModel):
    """OpenAI-style model descriptor returned by the /models endpoints."""
    id: str
    object: str = "model"
    # Unix timestamp (seconds)
    created: int
    owned_by: str
class ModelList(BaseModel):
    """OpenAI-style list envelope wrapping ModelInfo entries."""
    object: str = "list"
    data: List[ModelInfo]
# FastAPI App
app = FastAPI(
    title="AnythingLLM OpenAI API Adapter - Fixed",
    description="Fixed version supporting list content format",
    version="1.2.1-hf"
)
# CORS middleware — wide open: any origin may call this proxy.
app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],
    allow_credentials=True,
    allow_methods=["GET", "POST", "PUT", "DELETE", "OPTIONS"],
    allow_headers=["*"],
)
# Bearer-token extractor; auto_error=False so requests with missing credentials
# still reach authenticate_client, which decides whether auth is required.
security = HTTPBearer(auto_error=False)
def log_debug(message: str) -> None:
    """Emit *message* at INFO level, but only when runtime debug mode is on.

    Uses lazy %-style arguments so the final string is built only when the
    record is actually emitted (logging best practice for hot paths).
    """
    if DEBUG_MODE:
        logger.info("[DEBUG] %s", message)
def create_requests_session():
    """Build a requests.Session that talks directly to the backend.

    Environment proxy settings (HTTP_PROXY etc.) are deliberately ignored,
    while TLS certificate verification stays enabled.
    """
    s = requests.Session()
    # Ignore proxy-related environment variables entirely.
    s.trust_env = False
    s.proxies = {'http': None, 'https': None}
    # Keep TLS certificate verification on.
    s.verify = True
    return s
def decode_text_content(text):
    """Repair mojibake in *text* (UTF-8 bytes mis-decoded as Latin-1).

    If the round trip latin-1 -> utf-8 succeeds, the repaired string is
    returned; otherwise the input is returned unchanged. Non-string and
    falsy inputs pass through untouched.

    Fix vs. original: the redundant outer try/except Exception (dead code —
    nothing outside the round trip can raise) has been removed, keeping only
    the narrow Unicode error handling.
    """
    if not text or not isinstance(text, str):
        return text
    try:
        return text.encode('latin-1').decode('utf-8')
    except (UnicodeEncodeError, UnicodeDecodeError):
        # Not mojibake (or not representable in latin-1) — keep as-is.
        return text
def extract_text_from_content(content):
    """Flatten a message `content` field into plain text.

    Strings pass through unchanged. Lists of parts (e.g.
    [{'type': 'text', 'text': '...'}]) are joined with single spaces.
    Anything else is stringified.
    """
    if isinstance(content, str):
        return content
    if not isinstance(content, list):
        # Unknown shape — fall back to its string form.
        return str(content)
    pieces = []
    for element in content:
        if not isinstance(element, dict):
            pieces.append(str(element))
        elif element.get('type') == 'text' and 'text' in element:
            pieces.append(element['text'])
        elif 'content' in element:
            # Some producers use a 'content' field instead of 'text'.
            pieces.append(str(element['content']))
        else:
            # Non-standard part — keep its string form rather than dropping it.
            pieces.append(str(element))
    return ' '.join(pieces)
def extract_last_user_message(messages):
    """Return the text of the most recent non-empty user message, or ''.

    Walks the message list from newest to oldest, skipping non-user and
    empty/whitespace-only user messages. Supports both str and list-of-parts
    content via extract_text_from_content.
    """
    log_debug(f"提取用户消息,总消息数: {len(messages)}")
    for i, msg in enumerate(reversed(messages)):
        log_debug(f"检查倒序消息 {i}: role='{msg.role}', content_type={type(msg.content)}")
        if msg.role != "user":
            continue
        text = extract_text_from_content(msg.content)
        log_debug(f"找到用户消息,提取的文本: '{text[:100]}...'")
        stripped = text.strip()
        if stripped:
            return stripped
        log_debug(f"用户消息内容为空,继续查找...")
    log_debug("❌ 没有找到有效的用户消息!")
    return ""
async def authenticate_client(
    auth: Optional[HTTPAuthorizationCredentials] = Depends(security),
):
    """Reject requests lacking a valid client API key.

    Authentication is a no-op when no non-empty keys are configured.
    Raises 401 when credentials are missing, 403 when the key is unknown.
    """
    # Disabled entirely unless at least one non-empty key is configured.
    if not (VALID_CLIENT_KEYS - {""}):
        log_debug("跳过客户端认证(未配置密钥)")
        return
    if auth is None or not auth.credentials:
        log_debug("❌ 缺少Authorization头")
        raise HTTPException(
            status_code=401,
            detail="API key required in Authorization header.",
            headers={"WWW-Authenticate": "Bearer"},
        )
    if auth.credentials not in VALID_CLIENT_KEYS:
        log_debug(f"❌ 无效的API密钥: {auth.credentials[:20]}...")
        raise HTTPException(status_code=403, detail="Invalid client API key.")
    log_debug(f"✅ 客户端认证成功: {auth.credentials[:20]}...")
def convert_anythingllm_to_openai_chunk(chunk_data, request_id):
    """Translate one AnythingLLM SSE payload into an OpenAI chunk JSON string.

    Returns a serialized `chat.completion.chunk` for "textResponseChunk" and
    "finalizeResponseStream" events, or None for unparseable / unknown input.

    Fix vs. original: the two nearly identical chunk dictionaries are built by
    a single shared helper instead of being duplicated.
    """
    def _envelope(delta, finish_reason):
        # Shared OpenAI chat.completion.chunk envelope.
        return json.dumps({
            "id": f"chatcmpl-{request_id}",
            "object": "chat.completion.chunk",
            "created": int(time.time()),
            "model": "anythingllm",
            "choices": [{
                "index": 0,
                "delta": delta,
                "finish_reason": finish_reason
            }],
            "system_fingerprint": None
        }, ensure_ascii=False)

    try:
        data = json.loads(chunk_data)
    except json.JSONDecodeError:
        return None
    kind = data.get("type")
    if kind == "textResponseChunk":
        content = decode_text_content(data.get("textResponse", ""))
        finish_reason = "stop" if data.get("close", False) else None
        return _envelope({"content": content} if content else {}, finish_reason)
    if kind == "finalizeResponseStream":
        return _envelope({}, "stop")
    return None
@app.on_event("startup")
async def startup():
    """Log the effective configuration once when the server boots."""
    logger.info("Starting AnythingLLM OpenAI API Adapter (Fixed Version)...")
    # Lazy %-args: strings are only built when the record is emitted.
    logger.info("Bearer Token configured: %s", bool(HFConfig.BEARER_TOKEN))
    logger.info("Base URL: %s", HFConfig.ANYTHINGLLM_BASE_URL)
    logger.info("Workspace: %s", HFConfig.ANYTHINGLLM_WORKSPACE)
    logger.info("Server initialization completed.")
@app.get("/v1/models", response_model=ModelList)
async def list_v1_models(_: None = Depends(authenticate_client)):
    """List available models - authenticated"""
    return ModelList(
        data=[
            ModelInfo(id=name, created=int(time.time()), owned_by="anythingllm")
            for name in ANYTHINGLLM_MODELS
        ]
    )
@app.get("/models", response_model=ModelList)
async def list_models_no_auth():
    """List available models without authentication"""
    entries = []
    for name in ANYTHINGLLM_MODELS:
        entries.append(
            ModelInfo(id=name, created=int(time.time()), owned_by="anythingllm")
        )
    return ModelList(data=entries)
@app.get("/debug")
async def toggle_debug(enable: bool = Query(None)):
    """Read or set the runtime debug flag; always returns the current state."""
    global DEBUG_MODE
    if enable is None:
        # No parameter: report only.
        return {"debug_mode": DEBUG_MODE}
    DEBUG_MODE = enable
    return {"debug_mode": DEBUG_MODE}
@app.post("/v1/chat/completions")
async def chat_completions(
    request: Request, _: None = Depends(authenticate_client)
):
    """Create a chat completion via the AnythingLLM backend.

    Flow: parse/validate the OpenAI-style body, extract the LAST non-empty
    user message (conversation history is discarded), create a fresh
    AnythingLLM thread, forward the message to the workspace's stream-chat
    endpoint, and return either an SSE stream of OpenAI chunks or one JSON
    completion, depending on ``stream``.
    """
    # --- Verbose request logging (always on; not gated by DEBUG_MODE) ---
    print("=" * 80)
    print("🔥 收到新的聊天完成请求")
    print(f"📍 请求URL: {request.url}")
    print(f"📍 请求方法: {request.method}")
    print(f"📍 请求头:")
    for key, value in request.headers.items():
        if key.lower() == 'authorization':
            # Only a prefix of the credential is logged to avoid leaking the key.
            print(f" {key}: {value[:20]}...")
        else:
            print(f" {key}: {value}")
    # Parse the raw request body.
    try:
        body = await request.json()
        print(f"✅ 请求体解析成功")
        print(f"📊 消息数量: {len(body.get('messages', []))}")
        print(f"📊 模型: {body.get('model', 'N/A')}")
        print(f"📊 流式: {body.get('stream', False)}")
        # Dump the full request body for debugging.
        print(f"📋 完整请求体:")
        try:
            import json  # NOTE(review): redundant — json is already imported at module level
            formatted_body = json.dumps(body, indent=2, ensure_ascii=False)
            print(formatted_body)
        except Exception as e:
            print(f" 无法格式化请求体: {e}")
            print(f" 原始请求体: {body}")
        # Log each message's role and content shape (str vs list-of-parts).
        messages = body.get('messages', [])
        for i, msg in enumerate(messages):
            print(f"📝 消息 {i+1}:")
            print(f" 角色: {msg.get('role', 'N/A')}")
            content = msg.get('content')
            if isinstance(content, str):
                print(f" 内容类型: 字符串")
                print(f" 内容长度: {len(content)}")
                print(f" 内容预览: {content[:100]}...")
            elif isinstance(content, list):
                print(f" 内容类型: 列表")
                print(f" 元素数量: {len(content)}")
                for j, item in enumerate(content):
                    if isinstance(item, dict):
                        print(f" 元素 {j+1}: type={item.get('type')}, text长度={len(item.get('text', ''))}")
                        if item.get('type') == 'text':
                            print(f" 文本预览: {item.get('text', '')[:50]}...")
            else:
                print(f" 内容类型: {type(content).__name__}")
                print(f" 内容: {content}")
        log_debug(f"请求体解析成功,消息数量: {len(body.get('messages', []))}")
    except Exception as e:
        # NOTE(review): this also catches failures from the debug prints above,
        # which would then be reported as "Invalid JSON".
        print(f"❌ JSON解析失败: {e}")
        log_debug(f"❌ JSON解析失败: {e}")
        raise HTTPException(status_code=400, detail=f"Invalid JSON: {str(e)}")
    # Validate the body against the pydantic request model.
    try:
        chat_request = ChatCompletionRequest(**body)
    except Exception as e:
        log_debug(f"❌ 请求参数解析失败: {e}")
        raise HTTPException(status_code=400, detail=f"Invalid request format: {str(e)}")
    # Validate model name and message presence.
    if chat_request.model not in ANYTHINGLLM_MODELS:
        log_debug(f"❌ 不支持的模型: {chat_request.model}")
        raise HTTPException(status_code=404, detail=f"Model '{chat_request.model}' not found.")
    if not chat_request.messages:
        log_debug("❌ 没有提供消息")
        raise HTTPException(status_code=400, detail="No messages provided in the request.")
    # The proxy cannot reach AnythingLLM without a backend bearer token.
    if not HFConfig.BEARER_TOKEN:
        log_debug("❌ 没有配置Bearer Token")
        raise HTTPException(status_code=503, detail="No valid AnythingLLM tokens available.")
    # Only the latest non-empty user message is forwarded.
    user_message = extract_last_user_message(chat_request.messages)
    if not user_message:
        log_debug("❌ 没有找到用户消息")
        raise HTTPException(status_code=400, detail="No user message found")
    log_debug(f"✅ 提取到用户消息: '{user_message[:100]}...'")
    # Create a new AnythingLLM thread, then send the chat request to it.
    # NOTE(review): HTTPExceptions raised inside this try (e.g. the non-stream
    # 500 below) are swallowed by the broad `except Exception` at the bottom
    # and re-raised as 503 — confirm whether that is intended.
    try:
        session = create_requests_session()
        # 1. Create a fresh thread for this one exchange.
        thread_url = f"{HFConfig.ANYTHINGLLM_BASE_URL}/api/workspace/{HFConfig.ANYTHINGLLM_WORKSPACE}/thread/new"
        thread_headers = {
            "Authorization": f"Bearer {HFConfig.BEARER_TOKEN}",
            "Accept": "*/*",
            "Content-Type": "application/json",
        }
        thread_response = session.post(thread_url, headers=thread_headers, timeout=30)
        thread_response.raise_for_status()
        thread_data = thread_response.json()
        thread_slug = thread_data.get("thread", {}).get("slug")
        if not thread_slug:
            raise Exception("Failed to create thread")
        log_debug(f"创建线程成功: {thread_slug}")
        # 2. Send the chat message to the thread's streaming endpoint.
        chat_url = f"{HFConfig.ANYTHINGLLM_BASE_URL}/api/workspace/{HFConfig.ANYTHINGLLM_WORKSPACE}/thread/{thread_slug}/stream-chat"
        payload = {
            "message": user_message,
            "attachments": []
        }
        headers = {
            "Authorization": f"Bearer {HFConfig.BEARER_TOKEN}",
            "Content-Type": "text/plain;charset=UTF-8",
            "Accept": "text/event-stream"
        }
        print("🚀 准备发送到AnythingLLM:")
        print(f"📍 URL: {chat_url}")
        print(f"📍 请求头:")
        for key, value in headers.items():
            if key.lower() == 'authorization':
                print(f" {key}: {value[:20]}...")
            else:
                print(f" {key}: {value}")
        print(f"📍 Payload:")
        print(f" message长度: {len(user_message)}")
        print(f" message预览: {user_message[:200]}...")
        print(f" attachments: {payload['attachments']}")
        print(f"📍 完整Payload JSON:")
        try:
            formatted_payload = json.dumps(payload, indent=2, ensure_ascii=False)
            print(formatted_payload)
        except Exception as e:
            print(f" 无法格式化payload: {e}")
            print(f" 原始payload: {payload}")
        log_debug(f"发送聊天请求到: {chat_url}")
        if chat_request.stream:
            # Streaming mode: relay AnythingLLM SSE events as OpenAI chunks.
            # NOTE(review): blocking `requests` calls inside this async
            # generator run on the event loop thread — confirm acceptable.
            async def generate_stream():
                request_id = str(uuid.uuid4())[:8]
                try:
                    print(f"📤 发送POST请求到AnythingLLM...")
                    with session.post(
                        chat_url,
                        data=json.dumps(payload),
                        headers=headers,
                        stream=True,
                        timeout=HFConfig.REQUEST_TIMEOUT
                    ) as response:
                        print(f"📥 收到AnythingLLM响应:")
                        print(f" 状态码: {response.status_code}")
                        print(f" 响应头:")
                        for key, value in response.headers.items():
                            print(f" {key}: {value}")
                        if response.status_code != 200:
                            # Upstream error: emit one error chunk, then terminate the stream.
                            print(f"❌ AnythingLLM返回错误状态码: {response.status_code}")
                            try:
                                error_text = response.text
                                print(f"❌ 错误响应内容: {error_text}")
                            except:  # NOTE(review): bare except — should be `except Exception:`
                                print("❌ 无法读取错误响应内容")
                            error_chunk = {
                                "id": f"chatcmpl-{request_id}",
                                "object": "chat.completion.chunk",
                                "created": int(time.time()),
                                "model": chat_request.model,
                                "choices": [{
                                    "index": 0,
                                    "delta": {"content": f"Error: AnythingLLM API returned {response.status_code}"},
                                    "finish_reason": "stop"
                                }]
                            }
                            yield f"data: {json.dumps(error_chunk)}\n\n"
                            yield "data: [DONE]\n\n"
                            return
                        print(f"✅ 开始处理流式响应...")
                        line_count = 0
                        for line in response.iter_lines(decode_unicode=True):
                            line_count += 1
                            if line and line.startswith("data: "):
                                chunk_data = line[6:]  # strip the SSE "data: " prefix
                                print(f"📦 收到数据块 {line_count}: {chunk_data[:100]}...")
                                if chunk_data.strip():
                                    openai_chunk = convert_anythingllm_to_openai_chunk(chunk_data, request_id)
                                    if openai_chunk:
                                        print(f"✅ 转换为OpenAI格式: {openai_chunk[:100]}...")
                                        yield f"data: {openai_chunk}\n\n"
                                    else:
                                        # Unknown/unparseable event type — dropped silently.
                                        print(f"⚠️ 转换失败,跳过此数据块")
                            elif line:
                                print(f"📦 收到非data行 {line_count}: {line[:100]}...")
                        print(f"✅ 流式响应处理完成,共处理 {line_count} 行")
                        yield "data: [DONE]\n\n"
                except Exception as e:
                    # Mid-stream failures become a final error chunk instead
                    # of an abruptly broken connection.
                    print(f"❌ 流式处理异常: {type(e).__name__}: {str(e)}")
                    import traceback
                    print(f"❌ 异常堆栈: {traceback.format_exc()}")
                    logger.error(f"Stream error: {e}")
                    error_chunk = {
                        "id": f"chatcmpl-{request_id}",
                        "object": "chat.completion.chunk",
                        "created": int(time.time()),
                        "model": chat_request.model,
                        "choices": [{
                            "index": 0,
                            "delta": {"content": f"Error: {str(e)}"},
                            "finish_reason": "stop"
                        }]
                    }
                    yield f"data: {json.dumps(error_chunk)}\n\n"
                    yield "data: [DONE]\n\n"
            print(f"🎯 返回流式响应")
            print("=" * 80)
            return StreamingResponse(
                generate_stream(),
                media_type='text/event-stream',
                headers={
                    'Cache-Control': 'no-cache',
                    'Connection': 'keep-alive'
                }
            )
        else:
            # Non-streaming mode: consume the whole SSE body, then build a
            # single OpenAI chat.completion response.
            print(f"📤 发送非流式POST请求到AnythingLLM...")
            response = session.post(
                chat_url,
                data=json.dumps(payload),
                headers=headers,
                timeout=HFConfig.REQUEST_TIMEOUT
            )
            print(f"📥 收到非流式响应:")
            print(f" 状态码: {response.status_code}")
            print(f" 响应长度: {len(response.text)}")
            if response.status_code != 200:
                print(f"❌ 非流式请求失败: {response.status_code}")
                print(f"❌ 响应内容: {response.text}")
                raise HTTPException(status_code=500, detail=f"AnythingLLM API returned {response.status_code}")
            print(f"✅ 开始解析非流式响应...")
            full_response = ""
            line_count = 0
            for line in response.text.split('\n'):
                line_count += 1
                if line.startswith("data: "):
                    chunk_data = line[6:]
                    print(f"📦 解析数据行 {line_count}: {chunk_data[:100]}...")
                    try:
                        data = json.loads(chunk_data)
                        if data.get("type") == "textResponseChunk":
                            raw_text = data.get("textResponse", "")
                            decoded_text = decode_text_content(raw_text)
                            # NOTE(review): each chunk REPLACES full_response
                            # rather than appending — assumes the backend sends
                            # cumulative text per chunk; confirm.
                            full_response = decoded_text
                            print(f"✅ 提取文本: {decoded_text[:100]}...")
                    except json.JSONDecodeError as e:
                        print(f"⚠️ JSON解析失败: {e}")
                        continue
            print(f"✅ 非流式响应解析完成,最终响应长度: {len(full_response)}")
            # Token accounting is not available from AnythingLLM; usage is zeroed.
            openai_response = {
                "id": f"chatcmpl-{str(uuid.uuid4())[:8]}",
                "object": "chat.completion",
                "created": int(time.time()),
                "model": chat_request.model,
                "choices": [{
                    "index": 0,
                    "message": {
                        "role": "assistant",
                        "content": full_response
                    },
                    "finish_reason": "stop"
                }],
                "usage": {
                    "prompt_tokens": 0,
                    "completion_tokens": 0,
                    "total_tokens": 0
                },
                "system_fingerprint": None
            }
            print(f"🎯 返回非流式响应")
            print("=" * 80)
            return JSONResponse(content=openai_response)
    except Exception as e:
        # Catch-all boundary: every failure in the backend exchange surfaces
        # to the client as a 503.
        print(f"❌ 请求处理异常: {type(e).__name__}: {str(e)}")
        import traceback
        print(f"❌ 异常堆栈: {traceback.format_exc()}")
        print("=" * 80)
        logger.error(f"Request processing error: {e}")
        raise HTTPException(status_code=503, detail=f"Service temporarily unavailable: {str(e)}")
@app.get("/health")
async def health_check():
    """Liveness probe reporting service status and effective configuration."""
    config_snapshot = {
        "base_url": HFConfig.ANYTHINGLLM_BASE_URL,
        "workspace": HFConfig.ANYTHINGLLM_WORKSPACE,
        "bearer_token_configured": bool(HFConfig.BEARER_TOKEN),
        "debug": DEBUG_MODE,
        "models": ANYTHINGLLM_MODELS
    }
    body = {
        "status": "healthy",
        "service": "anythingllm-to-openai-proxy-fixed",
        "version": "1.2.1-hf",
        "timestamp": int(time.time()),
        "config": config_snapshot
    }
    return JSONResponse(content=body)
@app.get("/")
async def root():
    """Service description and endpoint index at the API root."""
    endpoints = {
        "chat_completions": "/v1/chat/completions",
        "models": "/v1/models",
        "health": "/health",
        "debug": "/debug"
    }
    body = {
        "service": "AnythingLLM to OpenAI API Proxy (Fixed)",
        "version": "1.2.1-hf",
        "platform": "Hugging Face Spaces",
        "fix": "Support for list format content in messages",
        "endpoints": endpoints
    }
    return JSONResponse(content=body)
if __name__ == "__main__":
    # Port 7860 is the port Hugging Face Spaces expects an app to listen on.
    uvicorn.run(
        app,
        host="0.0.0.0",
        port=7860,
        log_level="info"
    )