blackbox2api / api /utils.py
dan92's picture
Update api/utils.py
fd49f49 verified
from datetime import datetime
import json
from typing import Any, Dict, Optional
import uuid
import asyncio
import httpx
from api import validate
from api.config import MODEL_MAPPING, headers
from fastapi import Depends, security, HTTPException
from fastapi.security import HTTPAuthorizationCredentials
from api.config import APP_SECRET, BASE_URL
from api.models import ChatRequest
from api.logger import setup_logger
logger = setup_logger(__name__)
def create_chat_completion_data(
content: str, model: str, timestamp: int, finish_reason: Optional[str] = None
) -> Dict[str, Any]:
return {
"id": f"chatcmpl-{uuid.uuid4()}",
"object": "chat.completion.chunk",
"created": timestamp,
"model": model,
"choices": [
{
"index": 0,
"delta": {"content": content, "role": "assistant"},
"finish_reason": finish_reason,
}
],
"usage": None,
}
def verify_app_secret(credentials: HTTPAuthorizationCredentials = Depends(security)):
if credentials.credentials != APP_SECRET:
raise HTTPException(status_code=403, detail="Invalid APP_SECRET")
return credentials.credentials
def message_to_dict(message):
if isinstance(message.content, str):
return {"role": message.role, "content": message.content}
elif isinstance(message.content, list) and len(message.content) == 2:
if "image_url" in message.content[1]:
return {
"role": message.role,
"content": message.content[0]["text"],
"data": {
"imageBase64": message.content[1]["image_url"]["url"],
"fileText": "",
"title": "snapshoot",
},
}
else:
return {
"role": message.role,
"content": message.content[0]["text"],
"data": {
"imageBase64": "",
"fileText": "",
"title": "snapshoot",
},
}
else:
return {"role": message.role, "content": message.content}
async def process_streaming_response(request: ChatRequest):
max_retries = 3
retry_delay = 1
json_data = {
"messages": [message_to_dict(msg) for msg in request.messages],
"previewToken": None,
"userId": None,
"codeModelMode": True,
"agentMode": {},
"trendingAgentMode": {},
"isMicMode": False,
"userSystemPrompt": None,
"maxTokens": request.max_tokens,
"playgroundTopP": request.top_p,
"playgroundTemperature": request.temperature,
"isChromeExt": False,
"githubToken": None,
"clickedAnswer2": False,
"clickedAnswer3": False,
"clickedForceWebSearch": False,
"visitFromDelta": False,
"mobileClient": False,
"userSelectedModel": MODEL_MAPPING.get(request.model),
"validated": validate.getHid()
}
for attempt in range(max_retries):
try:
async with httpx.AsyncClient(timeout=120.0) as client:
async with client.stream(
"POST",
f"{BASE_URL}/api/chat",
headers=headers,
json=json_data,
) as response:
if response.status_code == 500:
# 如果是500错误,尝试刷新hid
validate.getHid(True)
# 更新json_data中的validated值
json_data["validated"] = validate.getHid()
if attempt < max_retries - 1:
await asyncio.sleep(retry_delay)
continue
response.raise_for_status()
async for line in response.aiter_lines():
timestamp = int(datetime.now().timestamp())
if line:
try:
content = line + "\n"
if "https://www.blackbox.ai" in content:
validate.getHid(True)
content = "正在刷新会话,请重试\n"
yield f"data: {json.dumps(create_chat_completion_data(content, request.model, timestamp))}\n\n"
yield f"data: {json.dumps(create_chat_completion_data('', request.model, timestamp, 'stop'))}\n\n"
yield "data: [DONE]\n\n"
return
if content.startswith("$@$v=undefined-rv1$@$"):
content = content[21:]
yield f"data: {json.dumps(create_chat_completion_data(content, request.model, timestamp))}\n\n"
except Exception as e:
logger.error(f"Error processing line: {e}")
continue
yield f"data: {json.dumps(create_chat_completion_data('', request.model, timestamp, 'stop'))}\n\n"
yield "data: [DONE]\n\n"
return
except httpx.HTTPStatusError as e:
logger.error(f"HTTP error occurred: {e}")
if attempt == max_retries - 1:
error_message = "服务暂时不可用,请稍后重试"
timestamp = int(datetime.now().timestamp())
yield f"data: {json.dumps(create_chat_completion_data(error_message, request.model, timestamp))}\n\n"
yield f"data: {json.dumps(create_chat_completion_data('', request.model, timestamp, 'stop'))}\n\n"
yield "data: [DONE]\n\n"
return
await asyncio.sleep(retry_delay)
except httpx.RequestError as e:
logger.error(f"Request error occurred: {e}")
if attempt == max_retries - 1:
error_message = "网络连接错误,请检查网络后重试"
timestamp = int(datetime.now().timestamp())
yield f"data: {json.dumps(create_chat_completion_data(error_message, request.model, timestamp))}\n\n"
yield f"data: {json.dumps(create_chat_completion_data('', request.model, timestamp, 'stop'))}\n\n"
yield "data: [DONE]\n\n"
return
await asyncio.sleep(retry_delay)
except Exception as e:
logger.error(f"Unexpected error: {e}")
if attempt == max_retries - 1:
error_message = "发生未知错误,请重试"
timestamp = int(datetime.now().timestamp())
yield f"data: {json.dumps(create_chat_completion_data(error_message, request.model, timestamp))}\n\n"
yield f"data: {json.dumps(create_chat_completion_data('', request.model, timestamp, 'stop'))}\n\n"
yield "data: [DONE]\n\n"
return
await asyncio.sleep(retry_delay)
async def process_non_streaming_response(request: ChatRequest):
json_data = {
"messages": [message_to_dict(msg) for msg in request.messages],
"previewToken": None,
"userId": None,
"codeModelMode": True,
"agentMode": {},
"trendingAgentMode": {},
"isMicMode": False,
"userSystemPrompt": None,
"maxTokens": request.max_tokens,
"playgroundTopP": request.top_p,
"playgroundTemperature": request.temperature,
"isChromeExt": False,
"githubToken": None,
"clickedAnswer2": False,
"clickedAnswer3": False,
"clickedForceWebSearch": False,
"visitFromDelta": False,
"mobileClient": False,
"userSelectedModel": MODEL_MAPPING.get(request.model),
"validated": validate.getHid()
}
full_response = ""
async with httpx.AsyncClient() as client:
async with client.stream(
method="POST", url=f"{BASE_URL}/api/chat", headers=headers, json=json_data
) as response:
async for chunk in response.aiter_text():
full_response += chunk
if "https://www.blackbox.ai" in full_response:
validate.getHid(True)
full_response = "hid已刷新,重新对话即可"
if full_response.startswith("$@$v=undefined-rv1$@$"):
full_response = full_response[21:]
return {
"id": f"chatcmpl-{uuid.uuid4()}",
"object": "chat.completion",
"created": int(datetime.now().timestamp()),
"model": request.model,
"choices": [
{
"index": 0,
"message": {"role": "assistant", "content": full_response},
"finish_reason": "stop",
}
],
"usage": None,
}