| from datetime import datetime | |
| from http.client import HTTPException | |
| import json | |
| from typing import Any, Dict, Optional | |
| import uuid | |
| import httpx | |
| from api import validate | |
| from api.config import MODEL_MAPPING, headers | |
| from fastapi import Depends, security | |
| from fastapi.security import HTTPAuthorizationCredentials | |
| from api.config import APP_SECRET, BASE_URL | |
| from api.models import ChatRequest | |
| from api.logger import setup_logger | |
| logger = setup_logger(__name__) | |
| def create_chat_completion_data( | |
| content: str, model: str, timestamp: int, finish_reason: Optional[str] = None | |
| ) -> Dict[str, Any]: | |
| return { | |
| "id": f"chatcmpl-{uuid.uuid4()}", | |
| "object": "chat.completion.chunk", | |
| "created": timestamp, | |
| "model": model, | |
| "choices": [ | |
| { | |
| "index": 0, | |
| "delta": {"content": content, "role": "assistant"}, | |
| "finish_reason": finish_reason, | |
| } | |
| ], | |
| "usage": None, | |
| } | |
| def verify_app_secret(credentials: HTTPAuthorizationCredentials = Depends(security)): | |
| if credentials.credentials != APP_SECRET: | |
| raise HTTPException(status_code=403, detail="Invalid APP_SECRET") | |
| return credentials.credentials | |
| def message_to_dict(message): | |
| if isinstance(message.content, str): | |
| return {"role": message.role, "content": message.content} | |
| elif isinstance(message.content, list) and len(message.content) == 2: | |
| return { | |
| "role": message.role, | |
| "content": message.content[0]["text"], | |
| "data": { | |
| "imageBase64": message.content[1]["image_url"]["url"], | |
| "fileText": "", | |
| "title": "snapshoot", | |
| }, | |
| } | |
| else: | |
| return {"role": message.role, "content": message.content} | |
| async def process_streaming_response(request: ChatRequest): | |
| json_data = { | |
| "messages": [message_to_dict(msg) for msg in request.messages], | |
| "previewToken": None, | |
| "userId": None, | |
| "codeModelMode": True, | |
| "agentMode": {}, | |
| "trendingAgentMode": {}, | |
| "isMicMode": False, | |
| "userSystemPrompt": None, | |
| "maxTokens": request.max_tokens, | |
| "playgroundTopP": request.top_p, | |
| "playgroundTemperature": request.temperature, | |
| "isChromeExt": False, | |
| "githubToken": None, | |
| "clickedAnswer2": False, | |
| "clickedAnswer3": False, | |
| "clickedForceWebSearch": False, | |
| "visitFromDelta": False, | |
| "mobileClient": False, | |
| "userSelectedModel": MODEL_MAPPING.get(request.model), | |
| "validated": validate.getHid() | |
| } | |
| async with httpx.AsyncClient() as client: | |
| try: | |
| async with client.stream( | |
| "POST", | |
| f"{BASE_URL}/api/chat", | |
| headers=headers, | |
| json=json_data, | |
| timeout=100, | |
| ) as response: | |
| response.raise_for_status() | |
| async for line in response.aiter_lines(): | |
| timestamp = int(datetime.now().timestamp()) | |
| if line: | |
| content = line + "\n" | |
| if "https://www.blackbox.ai" in content: | |
| validate.getHid(True) | |
| content = "hid已刷新,重新对话即可\n" | |
| yield f"data: {json.dumps(create_chat_completion_data(content, request.model, timestamp))}\n\n" | |
| break | |
| if content.startswith("$@$v=undefined-rv1$@$"): | |
| yield f"data: {json.dumps(create_chat_completion_data(content[21:], request.model, timestamp))}\n\n" | |
| else: | |
| yield f"data: {json.dumps(create_chat_completion_data(content, request.model, timestamp))}\n\n" | |
| yield f"data: {json.dumps(create_chat_completion_data('', request.model, timestamp, 'stop'))}\n\n" | |
| yield "data: [DONE]\n\n" | |
| except httpx.HTTPStatusError as e: | |
| logger.error(f"HTTP error occurred: {e}") | |
| raise HTTPException(status_code=e.response.status_code, detail=str(e)) | |
| except httpx.RequestError as e: | |
| logger.error(f"Error occurred during request: {e}") | |
| raise HTTPException(status_code=500, detail=str(e)) | |
| async def process_non_streaming_response(request: ChatRequest): | |
| json_data = { | |
| "messages": [message_to_dict(msg) for msg in request.messages], | |
| "previewToken": None, | |
| "userId": None, | |
| "codeModelMode": True, | |
| "agentMode": {}, | |
| "trendingAgentMode": {}, | |
| "isMicMode": False, | |
| "userSystemPrompt": None, | |
| "maxTokens": request.max_tokens, | |
| "playgroundTopP": request.top_p, | |
| "playgroundTemperature": request.temperature, | |
| "isChromeExt": False, | |
| "githubToken": None, | |
| "clickedAnswer2": False, | |
| "clickedAnswer3": False, | |
| "clickedForceWebSearch": False, | |
| "visitFromDelta": False, | |
| "mobileClient": False, | |
| "userSelectedModel": MODEL_MAPPING.get(request.model), | |
| "validated": validate.getHid() | |
| } | |
| full_response = "" | |
| async with httpx.AsyncClient() as client: | |
| async with client.stream( | |
| method="POST", url=f"{BASE_URL}/api/chat", headers=headers, json=json_data | |
| ) as response: | |
| async for chunk in response.aiter_text(): | |
| full_response += chunk | |
| if "https://www.blackbox.ai" in full_response: | |
| validate.getHid(True) | |
| full_response = "hid已刷新,重新对话即可" | |
| if full_response.startswith("$@$v=undefined-rv1$@$"): | |
| full_response = full_response[21:] | |
| return { | |
| "id": f"chatcmpl-{uuid.uuid4()}", | |
| "object": "chat.completion", | |
| "created": int(datetime.now().timestamp()), | |
| "model": request.model, | |
| "choices": [ | |
| { | |
| "index": 0, | |
| "message": {"role": "assistant", "content": full_response}, | |
| "finish_reason": "stop", | |
| } | |
| ], | |
| "usage": None, | |
| } | |