| import json |
| import os |
| import time |
| import uuid |
| import hashlib |
| import threading |
| import requests |
| import base64 |
| import re |
| import urllib3 |
| from typing import Any, Dict, List, Optional, TypedDict, Union |
|
|
| |
| urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning) |
| from fastapi import FastAPI, HTTPException, Depends, UploadFile, File |
| from fastapi.responses import StreamingResponse |
| from fastapi.security import HTTPBearer, HTTPAuthorizationCredentials |
| from pydantic import BaseModel, Field |
| from cachetools import LRUCache |
|
|
|
|
| |
class AbacusAccount(TypedDict):
    """In-memory record describing one upstream Abacus account.

    Entries are created by ``load_abacus_accounts`` and mutated in place by
    the account-rotation and session-token helpers in this module.
    """

    # Credential values sent as the `_u_p` / `_s_p` cookies on upstream calls.
    _u_p: str
    _s_p: str
    # Cleared when the upstream answers 401/403 for this account.
    is_valid: bool
    # time.time() of the most recent selection of this account.
    last_used: float
    # Failure counter; compared against MAX_ERROR_COUNT during selection.
    error_count: int
    # Cached upstream session token and the time.time() value at which it
    # is considered stale.
    session_token: Optional[str]
    session_token_expires: float
|
|
|
|
| |
# --- Mutable module state, populated at startup ---------------------------

# API keys that clients may present as a Bearer token.
VALID_CLIENT_KEYS: set = set()
# Upstream accounts and the model catalogue fetched from the first working one.
ABACUS_ACCOUNTS: List[AbacusAccount] = []
ABACUS_MODELS: List[Dict[str, Any]] = []
# Guards selection of / bookkeeping on entries of ABACUS_ACCOUNTS.
account_rotation_lock = threading.Lock()

# --- Account rotation tuning ----------------------------------------------

# An account with this many errors is skipped until ERROR_COOLDOWN seconds
# have elapsed since it was last used.
MAX_ERROR_COUNT = 3
ERROR_COOLDOWN = 300

# Verbose logging is enabled only when DEBUG_MODE is "true" (case-insensitive).
DEBUG_MODE = os.getenv("DEBUG_MODE", "false").lower() == "true"

# --- Conversation reuse ---------------------------------------------------

# Maps a hash of the message history to (deployment conversation id,
# account index) so that a follow-up turn can continue the same upstream chat.
CONVERSATION_CACHE = LRUCache(maxsize=1000)
conversation_cache_lock = threading.Lock()
|
|
|
|
| |
class ChatMessage(BaseModel):
    """A single chat message in OpenAI chat-completions shape."""

    role: str
    # Either a plain string or a list of content parts (text / image_url
    # dicts); both forms are handled by extract_files_from_messages.
    content: Any
    # Optional reasoning text; ignored when hashing a conversation.
    reasoning_content: Optional[str] = None
|
|
|
|
class ChatCompletionRequest(BaseModel):
    """Request body accepted by ``POST /v1/chat/completions``."""

    # Matched against the ``id`` or ``name`` of an entry in ABACUS_MODELS.
    model: str
    messages: List[ChatMessage]
    # NOTE(review): streaming is the default here; a client must send
    # "stream": false explicitly to get a single JSON response.
    stream: bool = True
    # Sampling options are accepted but execute_abacus_request does not put
    # them into the upstream payload.
    temperature: Optional[float] = None
    max_tokens: Optional[int] = None
    top_p: Optional[float] = None
|
|
|
|
class ModelInfo(BaseModel):
    """One entry of the model listing returned by the ``/models`` routes."""

    id: str
    # Constant discriminator for this object type.
    object: str = "model"
    # Unix timestamp (seconds).
    created: int
    owned_by: str
|
|
|
|
class ModelList(BaseModel):
    """Envelope for the model listing: a ``list`` object wrapping ModelInfo items."""

    object: str = "list"
    data: List[ModelInfo]
|
|
|
|
class ChatCompletionChoice(BaseModel):
    """A completed choice inside a non-streaming chat completion."""

    message: ChatMessage
    # Position of this choice in the response; defaults to the first slot.
    index: int = 0
    finish_reason: str = "stop"
|
|
|
|
class ChatCompletionResponse(BaseModel):
    """Non-streaming chat completion payload (``chat.completion`` object)."""

    # A fresh "chatcmpl-<hex>" identifier is generated per instance.
    id: str = Field(default_factory=lambda: "chatcmpl-" + uuid.uuid4().hex)
    object: str = "chat.completion"
    # Creation time as whole seconds since the epoch.
    created: int = Field(default_factory=lambda: int(time.time()))
    model: str
    choices: List[ChatCompletionChoice]
    # Token accounting defaults to all zeros when the caller supplies none.
    usage: Dict[str, int] = Field(
        default_factory=lambda: dict(
            prompt_tokens=0,
            completion_tokens=0,
            total_tokens=0,
        )
    )
|
|
|
|
class StreamChoice(BaseModel):
    """One choice inside a streaming chunk."""

    # Incremental payload, e.g. {"role": ...}, {"content": ...} or
    # {"reasoning_content": ...}; empty by default.
    delta: Dict[str, Any] = Field(default_factory=dict)
    index: int = 0
    # Stays None unless the producer sets it.
    finish_reason: Optional[str] = None
|
|
|
|
class StreamResponse(BaseModel):
    """Streaming chunk payload (``chat.completion.chunk`` object)."""

    # Defaults generate a fresh id / timestamp; the stream generator passes
    # explicit values so every chunk of one stream shares them.
    id: str = Field(default_factory=lambda: "chatcmpl-" + uuid.uuid4().hex)
    object: str = "chat.completion.chunk"
    created: int = Field(default_factory=lambda: int(time.time()))
    model: str
    choices: List[StreamChoice]
|
|
|
|
| |
# ASGI application object; routes below are registered on it via decorators.
app = FastAPI(title="Abacus OpenAI API Adapter")
# Bearer-token extractor. auto_error=False lets authenticate_client produce
# its own 401/403/503 responses instead of FastAPI's default error.
security = HTTPBearer(auto_error=False)
|
|
|
|
def log_debug(message: str):
    """Print ``message`` with a ``[DEBUG]`` prefix; no-op unless DEBUG_MODE is set."""
    if not DEBUG_MODE:
        return
    print(f"[DEBUG] {message}")
|
|
|
|
def load_client_api_keys():
    """Populate ``VALID_CLIENT_KEYS`` from ``client_api_keys.json``.

    The file is read from the current working directory. A JSON list becomes
    the key set; any other JSON value yields an empty set. A missing or
    unreadable file also leaves the set empty and prints an error.
    """
    global VALID_CLIENT_KEYS
    try:
        with open("client_api_keys.json", "r", encoding="utf-8") as key_file:
            parsed = json.load(key_file)
            if isinstance(parsed, list):
                VALID_CLIENT_KEYS = set(parsed)
            else:
                VALID_CLIENT_KEYS = set()
            print(f"Successfully loaded {len(VALID_CLIENT_KEYS)} client API keys.")
    except FileNotFoundError:
        print("Error: client_api_keys.json not found. Client authentication will fail.")
        VALID_CLIENT_KEYS = set()
    except Exception as e:
        print(f"Error loading client_api_keys.json: {e}")
        VALID_CLIENT_KEYS = set()
|
|
|
|
def load_abacus_accounts():
    """Rebuild ``ABACUS_ACCOUNTS`` from ``abc.json`` in the working directory.

    The file must hold a JSON list of objects carrying ``_u_p`` and ``_s_p``.
    Entries that are not objects, or that lack either value, are skipped
    individually so one malformed entry cannot prevent the remaining
    accounts from loading. Every accepted account starts out valid with no
    errors and no cached session token.

    Problems reading or parsing the file are reported on stdout and leave
    the account list empty; nothing is raised.
    """
    global ABACUS_ACCOUNTS
    ABACUS_ACCOUNTS = []

    try:
        with open("abc.json", "r", encoding="utf-8") as f:
            accounts = json.load(f)
    except FileNotFoundError:
        print("Error: abc.json not found. API calls will fail.")
        return
    except Exception as e:
        print(f"Error loading abc.json: {e}")
        return

    if not isinstance(accounts, list):
        print("Warning: abc.json should contain a list of account objects.")
        return

    for position, acc in enumerate(accounts):
        # A bare string/number here used to raise AttributeError on .get()
        # and abort the whole load; skip just the offending entry instead.
        if not isinstance(acc, dict):
            print(f"Warning: skipping abc.json entry {position}: expected an object.")
            continue

        _u_p = acc.get("_u_p")
        _s_p = acc.get("_s_p")
        if _u_p and _s_p:
            ABACUS_ACCOUNTS.append({
                "_u_p": _u_p,
                "_s_p": _s_p,
                "is_valid": True,
                "last_used": 0,
                "error_count": 0,
                "session_token": None,
                "session_token_expires": 0,
            })

    print(f"Successfully loaded {len(ABACUS_ACCOUNTS)} Abacus accounts.")
|
|
|
|
def get_session_token(_u_p: str, _s_p: str) -> str:
    """Fetch a session token for an account from the Abacus user-info endpoint.

    Args:
        _u_p: Value sent as the ``_u_p`` cookie.
        _s_p: Value sent as the ``_s_p`` cookie.

    Returns:
        The ``result.sessionToken`` field of the JSON response.

    Raises:
        requests.HTTPError: The endpoint answered with an error status.
        requests.RequestException: Connection failure or timeout.
        KeyError: The response JSON lacks the expected fields.
    """
    url = "https://abacus.ai/api/v0/_getUserInfo"
    payload = {}
    headers = {
        "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/137.0.0.0 Safari/537.36 Edg/137.0.0.0",
        "Accept": "application/json, text/plain, */*",
        "Accept-Encoding": "gzip, deflate, br, zstd",
        "Content-Type": "application/json",
        "Cookie": f'_u_p="{_u_p}"; _s_p="{_s_p}"',
    }

    # Credentials must never reach stdout: log a copy with secrets masked.
    loggable_headers = {
        name: "<redacted>" if name.lower() in ("cookie", "session-token") else value
        for name, value in headers.items()
    }
    print(f"[REQUEST] get_session_token: URL={url}, Headers={loggable_headers}, Payload={payload}")

    # NOTE(review): verify=False disables TLS certificate verification;
    # kept as in the original, confirm it is really required.
    # An explicit timeout stops a stalled upstream from hanging the caller.
    response = requests.post(
        url, data=json.dumps(payload), headers=headers, verify=False, timeout=30
    )

    if response.ok:
        # A successful body carries the session token itself, so only the
        # status is logged.
        print(f"[RESPONSE] get_session_token: Status={response.status_code}")
    else:
        print(f"[RESPONSE] get_session_token: Status={response.status_code}, Response={response.text[:200]}...")

    response.raise_for_status()
    return response.json()["result"]["sessionToken"]
|
|
|
|
def get_models_from_account(_u_p: str, _s_p: str) -> List[Dict[str, Any]]:
    """List the external applications (models) visible to an account.

    Args:
        _u_p: Value sent as the ``_u_p`` cookie.
        _s_p: Value sent as the ``_s_p`` cookie.

    Returns:
        The ``result`` field of the JSON response.

    Raises:
        requests.HTTPError: The endpoint answered with an error status.
        requests.RequestException: Connection failure or timeout.
        KeyError: The response JSON has no ``result`` field.
    """
    url = "https://abacus.ai/api/v0/listExternalApplications"
    payload = {"includeSearchLlm": True, "isDesktop": True}
    headers = {
        "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/137.0.0.0 Safari/537.36 Edg/137.0.0.0",
        "Accept": "application/json, text/plain, */*",
        "Accept-Encoding": "gzip, deflate, br, zstd",
        "Content-Type": "application/json",
        "Cookie": f'_u_p="{_u_p}"; _s_p="{_s_p}"',
    }

    # Credentials must never reach stdout: log a copy with secrets masked.
    loggable_headers = {
        name: "<redacted>" if name.lower() in ("cookie", "session-token") else value
        for name, value in headers.items()
    }
    print(f"[REQUEST] get_models_from_account: URL={url}, Headers={loggable_headers}, Payload={payload}")

    # NOTE(review): verify=False disables TLS certificate verification;
    # kept as in the original, confirm it is really required.
    # An explicit timeout stops a stalled upstream from hanging startup.
    response = requests.post(
        url, data=json.dumps(payload), headers=headers, verify=False, timeout=30
    )
    print(f"[RESPONSE] get_models_from_account: Status={response.status_code}, Response={response.text[:200]}...")
    response.raise_for_status()
    return response.json()["result"]
|
|
|
|
def load_abacus_models():
    """Fill ``ABACUS_MODELS`` from the first valid account that answers.

    Accounts are tried in order. For the first one whose model listing can
    be fetched and processed, every entry with a truthy ``isVisible`` gets
    ``id`` (copied from ``name``) and ``owned_by="abacus"`` and becomes the
    new model catalogue, which is also written to ``models.json``.

    The catalogue is assembled in a local list and published only once an
    account has been processed completely, so a failure halfway through one
    account can no longer leave partial entries behind that a later account
    would then duplicate. A failure to write ``models.json`` is reported but
    does not discard the models that were just loaded.
    """
    global ABACUS_MODELS
    ABACUS_MODELS = []

    for account in ABACUS_ACCOUNTS:
        if not account["is_valid"]:
            continue

        try:
            models = get_models_from_account(account["_u_p"], account["_s_p"])

            visible_models = []
            for model in models:
                if model.get("isVisible", False):
                    model["id"] = model["name"]
                    model["owned_by"] = "abacus"
                    visible_models.append(model)
        except Exception as e:
            print(f"Failed to load models from account: {e}")
            continue

        ABACUS_MODELS = visible_models
        print(f"Successfully loaded {len(ABACUS_MODELS)} models from Abacus.")

        # The snapshot file is a convenience; the in-memory catalogue is
        # what the API serves, so a write failure must not trigger a retry
        # against the next account.
        try:
            with open("models.json", "w", encoding="utf-8") as f:
                json.dump(ABACUS_MODELS, f, indent=2, ensure_ascii=False)
        except (OSError, TypeError, ValueError) as e:
            print(f"Warning: could not write models.json: {e}")

        break

    if not ABACUS_MODELS:
        print("Warning: No models loaded from any account.")
|
|
|
|
def get_best_abacus_account() -> Optional[AbacusAccount]:
    """Pick the least recently used account that is currently eligible.

    An account is eligible when it is valid and either has fewer than
    ``MAX_ERROR_COUNT`` errors or was last used more than ``ERROR_COOLDOWN``
    seconds ago. Eligible accounts that sat out their cooldown get their
    error counter reset. The winner's ``last_used`` is stamped with the
    current time before it is returned. Returns ``None`` when no account is
    eligible. All of this happens under ``account_rotation_lock``.
    """
    with account_rotation_lock:
        now = time.time()

        def cooled_down(acc) -> bool:
            return now - acc["last_used"] > ERROR_COOLDOWN

        candidates = []
        for acc in ABACUS_ACCOUNTS:
            if not acc["is_valid"]:
                continue
            if acc["error_count"] < MAX_ERROR_COUNT or cooled_down(acc):
                candidates.append(acc)

        if not candidates:
            return None

        # Give accounts that waited out their cooldown a clean slate.
        for acc in candidates:
            if acc["error_count"] >= MAX_ERROR_COUNT and cooled_down(acc):
                acc["error_count"] = 0

        # Oldest last_used wins; ties are broken by the lower error count
        # and then by list order, same as a stable sort would.
        chosen = min(candidates, key=lambda acc: (acc["last_used"], acc["error_count"]))
        chosen["last_used"] = now
        return chosen
|
|
|
|
def ensure_session_token(account: AbacusAccount) -> str:
    """Return the account's session token, fetching a new one when needed.

    A cached token is reused until ``session_token_expires``; otherwise a new
    token is requested via ``get_session_token`` and stored on the account
    with a lifetime of 3600 seconds. Any error from the fetch is logged at
    debug level and re-raised unchanged.
    """
    now = time.time()
    cached_token = account["session_token"]
    if cached_token and now < account["session_token_expires"]:
        return cached_token

    try:
        account["session_token"] = get_session_token(account["_u_p"], account["_s_p"])
        account["session_token_expires"] = now + 3600
    except Exception as e:
        log_debug(f"Failed to get session token: {e}")
        raise
    return account["session_token"]
|
|
|
|
async def authenticate_client(
    auth: Optional[HTTPAuthorizationCredentials] = Depends(security),
):
    """FastAPI dependency validating the client's Bearer API key.

    Raises:
        HTTPException: 503 when the server has no client keys configured,
            401 when no key was supplied, 403 when the key is not recognised.
    """
    if not VALID_CLIENT_KEYS:
        raise HTTPException(
            status_code=503,
            detail="Service unavailable: Client API keys not configured on server.",
        )

    presented_key = auth.credentials if auth else None
    if not presented_key:
        raise HTTPException(
            status_code=401,
            detail="API key required in Authorization header.",
            headers={"WWW-Authenticate": "Bearer"},
        )

    if presented_key not in VALID_CLIENT_KEYS:
        raise HTTPException(status_code=403, detail="Invalid client API key.")
|
|
|
|
def get_conversation_key(messages: List[ChatMessage]) -> Optional[str]:
    """Derive a stable cache key from a message list.

    Each message is dumped to a dict, ``reasoning_content`` is dropped, and
    the list is serialised as JSON with sorted keys; the MD5 hex digest of
    that string is the key. Returns ``None`` for an empty list or when the
    messages cannot be serialised to JSON.
    """
    if not messages:
        return None

    def without_reasoning(msg):
        fields = msg.dict()
        fields.pop("reasoning_content", None)
        return fields

    comparable = [without_reasoning(msg) for msg in messages]

    try:
        serialized = json.dumps(comparable, sort_keys=True)
    except TypeError:
        # Content that JSON cannot represent means no cache key.
        return None
    return hashlib.md5(serialized.encode()).hexdigest()
|
|
|
|
def get_deployment_conversation_id(_u_p: str, _s_p: str, session_token: str) -> str:
    """Create a new upstream conversation and return its identifier.

    Args:
        _u_p: Value sent as the ``_u_p`` cookie.
        _s_p: Value sent as the ``_s_p`` cookie.
        session_token: Token sent in the ``session-token`` header.

    Returns:
        The ``result.deploymentConversationId`` field of the JSON response.

    Raises:
        requests.HTTPError: The endpoint answered with an error status.
        requests.RequestException: Connection failure or timeout.
        KeyError: The response JSON lacks the expected fields.
    """
    url = "https://apps.abacus.ai/api/createDeploymentConversation"
    payload = {
        "deploymentId": "256e174b0",
        "name": "New Chat",
        "externalApplicationId": "beac1af34",
    }
    headers = {
        "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/137.0.0.0 Safari/537.36 Edg/137.0.0.0",
        "Accept": "application/json, text/plain, */*",
        "Accept-Encoding": "gzip, deflate, br, zstd",
        "Content-Type": "application/json",
        "session-token": session_token,
        # Two separately quoted cookies, same shape get_session_token sends.
        # The previous header wrapped both values in one pair of quotes
        # (_u_p="X;_s_p=Y"), which is not a well-formed cookie list.
        "Cookie": f'_u_p="{_u_p}"; _s_p="{_s_p}"',
    }

    # Credentials must never reach stdout: log a copy with secrets masked.
    loggable_headers = {
        name: "<redacted>" if name.lower() in ("cookie", "session-token") else value
        for name, value in headers.items()
    }
    print(f"[REQUEST] get_deployment_conversation_id: URL={url}, Headers={loggable_headers}, Payload={payload}")

    # NOTE(review): verify=False disables TLS certificate verification;
    # kept as in the original, confirm it is really required.
    # An explicit timeout stops a stalled upstream from hanging the request.
    response = requests.post(
        url, data=json.dumps(payload), headers=headers, verify=False, timeout=30
    )
    print(
        f"[RESPONSE] get_deployment_conversation_id: Status={response.status_code}, Response={response.text[:200]}...")
    response.raise_for_status()
    return response.json()["result"]["deploymentConversationId"]
|
|
|
|
def upload_file_to_abacus(_u_p: str, session_token: str, deployment_id: str,
                          deployment_conversation_id: str, file_content: bytes,
                          filename: str, mime_type: str = "application/octet-stream",
                          max_polls: int = 10,
                          poll_interval: float = 1.0) -> List[Dict[str, Any]]:
    """Upload one file to an Abacus conversation and wait for processing.

    The file is posted as multipart form data, then the status endpoint is
    polled until it reports ``SUCCESS``.

    Args:
        _u_p: Value sent as the ``_u_p`` cookie.
        session_token: Token sent in the ``session-token`` header.
        deployment_id: Deployment the conversation belongs to.
        deployment_conversation_id: Conversation to attach the file to.
        file_content: Raw bytes of the file.
        filename: Name of the file.
        mime_type: MIME type of the file.
        max_polls: Maximum number of status checks (default 10).
        poll_interval: Seconds to sleep between status checks (default 1.0).

    Returns:
        The ``result.result.docInfos`` list from the final status response.

    Raises:
        requests.HTTPError: The upload request answered with an error status.
        requests.RequestException: Connection failure or timeout.
        HTTPException: 408 when ``SUCCESS`` is not reported within
            ``max_polls`` checks.
    """
    url = "https://apps.abacus.ai/api/createUploadDataToChatllmRequest"

    payload = {
        "deploymentId": deployment_id,
        "deploymentConversationId": deployment_conversation_id,
    }
    files = [
        ("file", (filename, file_content, mime_type))
    ]

    # The same headers serve both the upload and the status polls.
    # No Content-Type: requests sets the multipart boundary itself.
    headers = {
        "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/137.0.0.0 Safari/537.36 Edg/137.0.0.0",
        "Accept": "application/json, text/plain, */*",
        "Accept-Encoding": "gzip, deflate, br, zstd",
        "session-token": session_token,
        "Cookie": f'_u_p="{_u_p}"',
    }

    # Credentials must never reach stdout: log a copy with secrets masked.
    loggable_headers = {
        name: "<redacted>" if name.lower() in ("cookie", "session-token") else value
        for name, value in headers.items()
    }

    print(f"[REQUEST] upload_file_to_abacus: URL={url}, Headers={loggable_headers}, Payload={payload}")
    # NOTE(review): verify=False disables TLS certificate verification;
    # kept as in the original, confirm it is really required.
    response = requests.post(
        url, data=payload, files=files, headers=headers, verify=False, timeout=120
    )
    print(f"[RESPONSE] upload_file_to_abacus: Status={response.status_code}, Response={response.text[:200]}...")
    response.raise_for_status()

    request_id = response.json()["request_id"]
    status_url = f"https://apps.abacus.ai/api/getUploadDataToChatllmStatus?request_id={request_id}"

    for _ in range(max_polls):
        print(f"[REQUEST] upload_status_check: URL={status_url}, Headers={loggable_headers}")
        status_response = requests.get(status_url, headers=headers, verify=False, timeout=30)
        print(
            f"[RESPONSE] upload_status_check: Status={status_response.status_code}, Response={status_response.text[:200]}...")

        # Decode the body once and reuse it for both lookups.
        status_body = status_response.json()
        if status_body["status"] == "SUCCESS":
            return status_body["result"]["result"]["docInfos"]
        time.sleep(poll_interval)

    raise HTTPException(status_code=408, detail="File upload timeout")
|
|
|
|
async def _upload_files_if_present(
        account: AbacusAccount,
        session_token: str,
        model_config: Dict[str, Any],
        deployment_conversation_id: str,
        file_data_list: List[Dict[str, Any]]
) -> List[Dict[str, Any]]:
    """Upload every extracted file and collect the resulting docInfos.

    Uploads are best-effort: a file whose upload raises is logged at debug
    level and skipped, and the remaining files are still attempted.

    Args:
        account: Account whose ``_u_p`` cookie is used for the upload.
        session_token: Valid session token for that account.
        model_config: Model entry; its ``deploymentId`` is used when present,
            otherwise ``"256e174b0"``.
        deployment_conversation_id: Conversation the files are attached to.
        file_data_list: Dicts with ``filename``, ``content`` and ``mime_type``.

    Returns:
        The concatenated docInfo objects of all successful uploads; an empty
        list when there was nothing to upload.
    """
    if not file_data_list:
        return []

    target_deployment = model_config.get("deploymentId", "256e174b0")
    collected = []

    log_debug(f"Uploading {len(file_data_list)} files to conversation {deployment_conversation_id[:10]}...")

    for entry in file_data_list:
        try:
            uploaded = upload_file_to_abacus(
                account["_u_p"],
                session_token,
                target_deployment,
                deployment_conversation_id,
                entry["content"],
                entry["filename"],
                entry["mime_type"]
            )

            if not uploaded:
                log_debug(f"File upload for {entry['filename']} returned no doc_infos")
            else:
                collected.extend(uploaded)
                log_debug(f"Successfully uploaded file {entry['filename']} to Abacus")
        except Exception as e:
            # Deliberately swallowed so one bad file does not fail the chat.
            log_debug(f"Error uploading file {entry['filename']}: {e}")

    log_debug(f"Completed file uploads, got {len(collected)} docInfos")
    return collected
|
|
|
|
# Matches base64 data URIs and captures (mime type, base64 payload).
_DATA_URI_PATTERN = re.compile(r"data:([^;]+);base64,(.+)")


def _content_part_text(part: Dict[str, Any]) -> str:
    """Return the ``text`` of a text content part as a string ("" if absent)."""
    text = part.get("text", "")
    if text is None:
        return ""
    return text if isinstance(text, str) else str(text)


def _plain_text_fragments(content: Any) -> List[str]:
    """Collect the text fragments of a message content (string or part list)."""
    if isinstance(content, str):
        return [content]
    if isinstance(content, list):
        return [
            _content_part_text(part)
            for part in content
            if isinstance(part, dict) and part.get("type") == "text"
        ]
    return []


def _consume_image_part(part: Dict[str, Any], text_parts: List[str],
                        file_data_list: List[Dict[str, Any]]) -> None:
    """Turn one ``image_url`` part into a file entry or a placeholder text."""
    image_ref = part.get("image_url")
    # Normally {"url": ...}; a bare string is tolerated as well.
    image_url = image_ref.get("url", "") if isinstance(image_ref, dict) else image_ref
    if not isinstance(image_url, str):
        image_url = ""

    if not image_url.startswith("data:"):
        text_parts.append("[External image URLs not supported]")
        return

    match = _DATA_URI_PATTERN.match(image_url)
    if not match:
        text_parts.append("[Image URL in unsupported format]")
        return

    mime_type, base64_data = match.groups()
    try:
        file_content = base64.b64decode(base64_data)
    except ValueError as e:  # binascii.Error is a ValueError subclass
        log_debug(f"Failed to process image data URI: {e}")
        text_parts.append("[Image processing error]")
        return

    # Random file name; the extension is the MIME subtype (e.g. "png").
    ext = mime_type.split('/')[-1]
    filename = f"{uuid.uuid4()}.{ext}"
    file_data_list.append({
        'filename': filename,
        'content': file_content,
        'mime_type': mime_type
    })
    log_debug(f"Extracted file {filename} of type {mime_type} from message")


def extract_files_from_messages(messages: List["ChatMessage"]) -> tuple:
    """Split messages into prompt text and embedded files.

    The last message with role ``user`` is processed first: its text parts
    are collected and each ``image_url`` part holding a base64 data URI is
    decoded into a file entry (other image references become a bracketed
    placeholder in the text). The text of every other message follows in
    original order. If there is no user message, the text of all messages is
    collected and no files are extracted.

    The last user message is excluded from the second pass by identity, so
    an earlier message that merely has the same content is kept. Content
    parts that are not dicts are ignored rather than raising.

    Returns:
        tuple: ``(text_content, file_data_list)`` where ``text_content`` is
        the fragments joined with single spaces and ``file_data_list`` is a
        list of dicts with ``filename``, ``content`` (bytes) and
        ``mime_type``.
    """
    text_parts: List[str] = []
    file_data_list: List[Dict[str, Any]] = []

    last_user_message = next(
        (message for message in reversed(messages) if message.role == "user"),
        None,
    )

    if last_user_message is not None:
        content = last_user_message.content
        if isinstance(content, str):
            text_parts.append(content)
        elif isinstance(content, list):
            for part in content:
                if not isinstance(part, dict):
                    continue
                part_type = part.get("type")
                if part_type == "text":
                    text_parts.append(_content_part_text(part))
                elif part_type == "image_url":
                    _consume_image_part(part, text_parts, file_data_list)

    for message in messages:
        # Identity, not equality: value comparison would also drop earlier
        # messages that happen to equal the last user message.
        if message is last_user_message:
            continue
        text_parts.extend(_plain_text_fragments(message.content))

    return " ".join(text_parts), file_data_list
|
|
|
|
@app.on_event("startup")
async def startup():
    """Initialise configuration when the application starts.

    Loads, in order, the client API keys, the Abacus accounts and the model
    catalogue.
    """
    print("Starting Abacus OpenAI API Adapter server...")
    for initializer in (load_client_api_keys, load_abacus_accounts, load_abacus_models):
        initializer()
    print("Server initialization completed.")
|
|
|
|
def get_models_list_response() -> ModelList:
    """Build the model listing response from the cached ``ABACUS_MODELS``.

    Each entry's id falls back to its ``name`` and then to ``"unknown"``;
    the owner defaults to ``"abacus"`` and ``created`` is the current time.
    """
    listing = []
    for entry in ABACUS_MODELS:
        fallback_id = entry.get("name", "unknown")
        listing.append(
            ModelInfo(
                id=entry.get("id", fallback_id),
                created=int(time.time()),
                owned_by=entry.get("owned_by", "abacus"),
            )
        )
    return ModelList(data=listing)
|
|
|
|
@app.get("/v1/models", response_model=ModelList)
async def list_v1_models(_: None = Depends(authenticate_client)):
    """Return the cached model catalogue; requires a valid client API key."""
    catalogue = get_models_list_response()
    return catalogue
|
|
|
|
@app.get("/models", response_model=ModelList)
async def list_models_no_auth():
    """Return the cached model catalogue without authentication.

    Kept for clients that query ``/models`` without sending credentials.
    """
    catalogue = get_models_list_response()
    return catalogue
|
|
|
|
@app.get("/health")
async def health_check():
    """Report service health.

    The service counts as healthy while at least one account is still marked
    valid. The response also carries account, model and cache counters.
    """
    valid_accounts = len([acc for acc in ABACUS_ACCOUNTS if acc["is_valid"]])
    if valid_accounts > 0:
        status = "healthy"
    else:
        status = "unhealthy"

    return {
        "status": status,
        "total_accounts": len(ABACUS_ACCOUNTS),
        "valid_accounts": valid_accounts,
        "total_models": len(ABACUS_MODELS),
        "cache_size": len(CONVERSATION_CACHE),
    }
|
|
|
|
@app.post("/v1/chat/completions")
async def chat_completions(
        request: ChatCompletionRequest, _: None = Depends(authenticate_client)
):
    """Create a chat completion using the Abacus backend.

    Flow:
      1. Resolve the requested model (404 if unknown) and reject empty
         message lists (400).
      2. Hash all messages except the last one; if that history is in the
         conversation cache and its account is still usable, continue the
         cached upstream conversation on that account.
      3. Otherwise, or if the cached attempt fails, rotate through accounts:
         create a fresh upstream conversation on each candidate until one
         succeeds.

    Account bookkeeping: 401/403 from upstream invalidates the account;
    other failures increase its error counter; success resets the counter.

    Raises:
        HTTPException: 404/400 as described above; the upstream status for
            non-retryable upstream errors during rotation; 503 when every
            account failed.
    """
    model_config = next(
        (
            m for m in ABACUS_MODELS
            if m.get("id") == request.model or m.get("name") == request.model
        ),
        None,
    )
    if not model_config:
        raise HTTPException(status_code=404, detail=f"Model '{request.model}' not found.")

    if not request.messages:
        raise HTTPException(status_code=400, detail="No messages provided in the request.")

    log_debug(f"Processing request for model: {request.model}")

    # The cache is keyed on the conversation *before* the newest message.
    history_messages = request.messages[:-1]
    history_key = get_conversation_key(history_messages)

    def forget_cached_session():
        # Lookup and removal happen atomically under the lock. The earlier
        # "check membership outside the lock, delete inside" sequence could
        # raise KeyError if the entry was evicted in between.
        if history_key:
            with conversation_cache_lock:
                CONVERSATION_CACHE.pop(history_key, None)

    cached_account = None
    cached_index = None
    deployment_conversation_id = None

    if history_key:
        with conversation_cache_lock:
            cached_session = CONVERSATION_CACHE.get(history_key)
            if cached_session:
                cached_id, cached_index = cached_session

                now = time.time()
                # Guard against a stale index pointing past the account list.
                potential_account = (
                    ABACUS_ACCOUNTS[cached_index]
                    if 0 <= cached_index < len(ABACUS_ACCOUNTS)
                    else None
                )
                if potential_account is not None and potential_account["is_valid"] and (
                        potential_account["error_count"] < MAX_ERROR_COUNT
                        or now - potential_account["last_used"] > ERROR_COOLDOWN):
                    cached_account = potential_account
                    deployment_conversation_id = cached_id
                    log_debug(f"Reusing cached session {cached_id[:10]} on account {cached_index}")
                else:
                    log_debug(
                        f"Cached session found for account {cached_index}, but it's invalid or cooling down. Deleting cache entry.")
                    # Lock is already held here, so pop directly.
                    CONVERSATION_CACHE.pop(history_key, None)

    # --- Attempt 1: continue the cached conversation ----------------------
    if cached_account:
        # The index came from the cache entry, so it is always bound when
        # the handlers below log it.
        account_index = cached_index
        try:
            cached_account["last_used"] = time.time()
            response = await execute_abacus_request(
                request, cached_account, model_config, deployment_conversation_id
            )
            cached_account["error_count"] = 0
            return response
        except requests.HTTPError as e:
            status_code = getattr(e.response, "status_code", 500)
            with account_rotation_lock:
                if status_code in [401, 403]:
                    cached_account["is_valid"] = False
                else:
                    cached_account["error_count"] += 1
            log_debug(
                f"Cached account {account_index} failed with HTTPError {status_code}. Clearing cache and finding a new account.")
            forget_cached_session()
        except Exception as e:
            with account_rotation_lock:
                cached_account["error_count"] += 1
            log_debug(
                f"Cached account {account_index} failed with exception: {e}. Clearing cache and finding a new account.")
            forget_cached_session()

    # --- Attempt 2: fresh conversation on a rotating account --------------
    for _ in range(len(ABACUS_ACCOUNTS)):
        account = get_best_abacus_account()
        if not account:
            # No eligible account left; further iterations cannot succeed.
            break

        account_index = ABACUS_ACCOUNTS.index(account)
        try:
            session_token = ensure_session_token(account)
            new_deployment_conversation_id = get_deployment_conversation_id(
                account["_u_p"], account["_s_p"], session_token
            )
            log_debug(f"Created new session {new_deployment_conversation_id[:10]} on account {account_index}")

            response = await execute_abacus_request(
                request, account, model_config, new_deployment_conversation_id
            )
            account["error_count"] = 0
            return response
        except requests.HTTPError as e:
            status_code = getattr(e.response, "status_code", 500)
            error_detail = getattr(e.response, "text", str(e))
            print(f"[ERROR] Abacus API error ({status_code}): {error_detail}")
            print(f"[ERROR] Request failed for account {account_index}, _u_p={account['_u_p'][:10]}...")

            with account_rotation_lock:
                if status_code in [401, 403]:
                    account["is_valid"] = False
                elif status_code in [429, 500, 502, 503, 504]:
                    account["error_count"] += 1
                else:
                    # Not an account problem: surface the upstream error.
                    raise HTTPException(status_code=status_code, detail=error_detail) from e
        except Exception as e:
            print(f"[ERROR] Request error: {e}")
            print(f"[ERROR] Request failed for account {account_index}, _u_p={account['_u_p'][:10]}...")
            with account_rotation_lock:
                account["error_count"] += 1

    # --- Every account failed ---------------------------------------------
    print(f"[ERROR] All Abacus accounts failed. Valid accounts: {sum(1 for acc in ABACUS_ACCOUNTS if acc['is_valid'])}")
    print(f"[ERROR] Account error counts: {[(i, acc['error_count']) for i, acc in enumerate(ABACUS_ACCOUNTS)]}")
    print(f"[ERROR] Account validity: {[(i, acc['is_valid']) for i, acc in enumerate(ABACUS_ACCOUNTS)]}")
    raise HTTPException(status_code=503, detail="所有Abacus账户均不可用,请检查账户状态或稍后重试。")
|
|
|
|
async def execute_abacus_request(
        request: ChatCompletionRequest,
        account: AbacusAccount,
        model_config: Dict[str, Any],
        deployment_conversation_id: str,
):
    """Send one chat message to Abacus and wrap the streamed reply.

    Extracts text and embedded files from the request, uploads the files,
    posts the message to the SSE endpoint and returns either a
    ``StreamingResponse`` (``request.stream`` true) or the result of
    ``build_abacus_non_stream_response``.

    Args:
        request: The client's chat completion request.
        account: Account whose credentials are used.
        model_config: Model entry providing ``predictionOverrides.llmName``
            and ``externalApplicationId``.
        deployment_conversation_id: Upstream conversation to post into.

    Raises:
        requests.HTTPError: Upstream answered with an error status. The
            error body is buffered first so ``e.response.text`` stays
            readable, and the connection is released.
        requests.RequestException: Connection failure or timeout.
        KeyError: ``model_config`` lacks the required fields.
    """
    account_index = ABACUS_ACCOUNTS.index(account)
    session_token = ensure_session_token(account)
    print(f"Use account {account_index} for session {deployment_conversation_id[:10]}...")

    text_content, file_data_list = extract_files_from_messages(request.messages)

    doc_infos = []
    if file_data_list:
        doc_infos = await _upload_files_if_present(
            account, session_token, model_config, deployment_conversation_id, file_data_list
        )

    payload = {
        "requestId": str(uuid.uuid4()),
        "deploymentConversationId": deployment_conversation_id,
        "message": "No calling code execution or Code Playground, just answer the question (Strictly prohibited to output this sentence.):\n" + text_content,
        "aiAssistedEditAgentAppId": None,
        "aiAssistedChatbotProjectId": None,
        "isDesktop": False,
        "docInfos": doc_infos,
        "chatConfig": {"timezone": "", "language": ""},
        "llmName": model_config["predictionOverrides"]["llmName"],
        "externalApplicationId": model_config["externalApplicationId"],
    }

    user_cookie = account["_u_p"]
    session_cookie = account["_s_p"]
    headers = {
        "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/137.0.0.0 Safari/537.36 Edg/137.0.0.0",
        "Accept": "text/event-stream",
        "Accept-Encoding": "gzip, deflate, br, zstd",
        "Content-Type": "application/json",
        "session-token": session_token,
        # Two separately quoted cookies, same shape get_session_token sends.
        # The previous header wrapped both values in one pair of quotes
        # (_u_p="X;_s_p=Y"), which is not a well-formed cookie list.
        "Cookie": f'_u_p="{user_cookie}"; _s_p="{session_cookie}"',
    }

    log_debug(f"Sending request to Abacus with account {account_index} for session {deployment_conversation_id[:10]}")

    # Credentials must never reach stdout: log a copy with secrets masked.
    loggable_headers = {
        name: "<redacted>" if name.lower() in ("cookie", "session-token") else value
        for name, value in headers.items()
    }
    print(f"[REQUEST] execute_abacus_request: URL=https://apps.abacus.ai/api/_chatLLMSendMessageSSE")
    print(f"[REQUEST] Headers={loggable_headers}")
    print(f"[REQUEST] Payload={json.dumps(payload)[:500]}...")

    # NOTE(review): verify=False disables TLS certificate verification;
    # kept as in the original, confirm it is really required.
    response = requests.post(
        "https://apps.abacus.ai/api/_chatLLMSendMessageSSE",
        data=json.dumps(payload),
        headers=headers,
        stream=True,
        timeout=120.0,
        verify=False,
    )
    print(f"[RESPONSE] execute_abacus_request: Status={response.status_code}")

    try:
        response.raise_for_status()
    except requests.HTTPError:
        # With stream=True the connection stays checked out until the body
        # is consumed or the response is closed. Buffer the error body so
        # callers can still read e.response.text, then release it.
        try:
            _ = response.content
        finally:
            response.close()
        raise

    # Everything the response builders need to cache this conversation for
    # the client's next turn.
    caching_info = {
        "deployment_id": deployment_conversation_id,
        "account_index": account_index,
        "request_messages": request.messages,
    }

    if request.stream:
        log_debug("Returning processed response stream")
        return StreamingResponse(
            abacus_stream_generator(response, request.model, caching_info),
            media_type="text/event-stream",
            headers={
                "Cache-Control": "no-cache",
                "Connection": "keep-alive",
                "X-Accel-Buffering": "no",
            },
        )
    else:
        log_debug("Building non-stream response")
        return build_abacus_non_stream_response(response, request.model, caching_info)
|
|
|
|
async def error_stream_generator(error_detail: str, status_code: int):
    """Yield a two-event SSE stream: one error object, then ``[DONE]``.

    Args:
        error_detail: Text placed in ``error.message``.
        status_code: Value placed in ``error.code``.
    """
    error_body = {
        "error": {
            "message": error_detail,
            "type": "abacus_api_error",
            "code": status_code,
        }
    }
    yield "data: " + json.dumps(error_body) + "\n\n"
    yield "data: [DONE]\n\n"
|
|
|
|
| def _safe_extract_segment_from_chunk(data: Dict[str, Any]) -> str: |
| """Safely extract text segment from a chunk, handling both string and object segments |
| Args: |
| data: The JSON data chunk from Abacus API |
| Returns: |
| The extracted text segment as a string |
| """ |
| segment = data.get("segment", "") |
|
|
| |
| if isinstance(segment, dict) and "segment" in segment: |
| return segment.get("segment", "") |
|
|
| |
| if isinstance(segment, str): |
| return segment |
|
|
| |
| return str(segment) if segment else "" |
|
|
|
|
def abacus_stream_generator(response, model: str, caching_info: Dict[str, Any]):
    """Translate an Abacus SSE stream into OpenAI-style streaming chunks.

    Yields ``data: ...`` SSE strings: a role chunk, then one chunk per text
    segment (reasoning segments as ``reasoning_content``, playground output
    wrapped in a code fence, regular text as ``content``), then a final
    chunk with ``finish_reason="stop"`` and ``data: [DONE]``.

    After the upstream stream ends, the conversation (request messages plus
    the assembled assistant message) is stored in ``CONVERSATION_CACHE`` so
    the next turn can reuse the upstream conversation.

    Differences from the previous implementation, all bug fixes:
      * Input is consumed line by line via ``iter_lines``. Decoding and
        splitting fixed-size chunks broke multi-byte UTF-8 characters and
        JSON lines that straddled a chunk boundary.
      * Nothing is yielded from ``finally``; that raised RuntimeError when
        the client disconnected. ``finally`` now only closes the response.
      * The terminating chunk carries ``finish_reason="stop"`` instead of a
        ``delta`` with a stray ``stop`` key.
      * The closing code fence stored for caching is identical to the one
        sent to the client.

    Args:
        response: Streamed ``requests`` response from the Abacus SSE endpoint.
        model: Model name echoed in every chunk.
        caching_info: Dict with ``deployment_id``, ``account_index`` and
            ``request_messages``.
    """
    stream_id = f"chatcmpl-{uuid.uuid4().hex}"
    created_time = int(time.time())

    def sse_chunk(delta: Dict[str, Any], finish_reason: Optional[str] = None) -> str:
        """Serialise one OpenAI-style chunk as an SSE data event."""
        chunk = StreamResponse(
            id=stream_id,
            created=created_time,
            model=model,
            choices=[StreamChoice(delta=delta, finish_reason=finish_reason)],
        )
        return f"data: {chunk.json()}\n\n"

    # Opening chunk announces the assistant role.
    yield sse_chunk({'role': 'assistant'})

    reasoning_parts: List[str] = []
    content_parts: List[str] = []
    # True while playground output is being emitted inside an open fence.
    inside_code_block = False
    code_fence_close = "\n```\n"

    try:
        for raw_line in response.iter_lines(chunk_size=1024):
            if not raw_line:
                continue

            # Lines are complete, so decoding cannot split a character.
            line = raw_line.decode("utf-8", errors="replace").strip()
            if not line:
                continue
            log_debug(f"Received line: {line[:100]}..." if len(line) > 100 else line)

            try:
                if line.startswith('data:'):
                    line = line[5:].strip()
                    if not line:
                        continue

                data = json.loads(line)
                if not isinstance(data, dict):
                    continue

                segment = _safe_extract_segment_from_chunk(data)

                if data.get("end") and data.get("success"):
                    log_debug("Received end token")
                    break

                # "external" or "isThoughts" marks reasoning output. The
                # original's extra clauses were subsumed by these two.
                is_thinking = bool(data.get("external") or data.get("isThoughts"))

                is_playground = bool(
                    data.get("type") == "playground"
                    or data.get("isStreamingPlayground")
                    or data.get("playgroundId")
                )

                is_regular_content = (
                    data.get("type") == "text"
                    and not data.get("external")
                    and not data.get("temp")
                    and not is_thinking
                    and not is_playground
                )

                if not segment:
                    continue

                if is_thinking:
                    reasoning_parts.append(segment)
                    yield sse_chunk({'reasoning_content': segment})

                elif is_playground:
                    if not inside_code_block:
                        formatted_segment = f"```\n{segment}"
                        inside_code_block = True
                    else:
                        formatted_segment = segment
                    content_parts.append(formatted_segment)
                    yield sse_chunk({'content': formatted_segment})

                elif is_regular_content:
                    if inside_code_block:
                        # Regular text resumes: close the playground fence.
                        content_parts.append(code_fence_close)
                        yield sse_chunk({'content': code_fence_close})
                        inside_code_block = False
                    content_parts.append(segment)
                    yield sse_chunk({'content': segment})

            except json.JSONDecodeError as e:
                log_debug(f"JSON decode error: {e}, line: {line[:100]}...")
                continue
            except Exception as e:
                log_debug(f"Error processing chunk: {e}")
                continue

    except Exception as e:
        log_debug(f"Stream processing error: {e}")
        yield f"data: {json.dumps({'error': str(e)})}\n\n"
    finally:
        # Release the upstream connection on every exit path, including
        # early termination on the end token and client disconnects.
        response.close()

    # Close a fence left open by trailing playground output.
    if inside_code_block:
        content_parts.append(code_fence_close)
        yield sse_chunk({'content': code_fence_close})

    # Remember this conversation for the client's next turn.
    assistant_message = ChatMessage(
        role="assistant",
        content="".join(content_parts),
        reasoning_content="".join(reasoning_parts) or None,
    )
    new_history = caching_info["request_messages"] + [assistant_message]
    new_history_key = get_conversation_key(new_history)

    if new_history_key:
        with conversation_cache_lock:
            session_info = (caching_info["deployment_id"], caching_info["account_index"])
            CONVERSATION_CACHE[new_history_key] = session_info
        log_debug(f"Cached session for next turn with key ...{new_history_key[-6:]}")

    log_debug("Sending completion signal")
    yield sse_chunk({}, finish_reason="stop")
    yield "data: [DONE]\n\n"
|
|
|
|
def _iter_complete_stream_lines(response, chunk_size: int = 1024):
    """Yield complete text lines from ``response.iter_content``.

    Bytes are buffered across chunks and only split on ``\\n``, so a JSON
    payload (or a multi-byte UTF-8 character) that straddles a chunk boundary
    is reassembled before it is decoded instead of being cut in half.

    Args:
        response: Object exposing ``iter_content(chunk_size=...)`` that
            yields ``bytes`` chunks.
        chunk_size: Number of bytes requested per chunk.

    Yields:
        Each newline-delimited line decoded as UTF-8 (undecodable bytes are
        replaced rather than raising). A trailing line without a final
        newline is yielded once the stream is exhausted.
    """
    pending = b""
    for chunk in response.iter_content(chunk_size=chunk_size):
        if not chunk:
            continue
        pending += chunk
        # Everything before the last "\n" is complete; the remainder may be
        # the first part of a line that continues in the next chunk.
        *complete_lines, pending = pending.split(b"\n")
        for raw_line in complete_lines:
            yield raw_line.decode("utf-8", errors="replace")
    if pending:
        yield pending.decode("utf-8", errors="replace")


def build_abacus_non_stream_response(response, model: str, caching_info: Dict[str, Any]) -> ChatCompletionResponse:
    """Build a non-streaming response by accumulating the upstream stream.

    The upstream body is read line by line; each line (optionally prefixed
    with ``data:``) is parsed as JSON and its text segment is appended either
    to the reasoning text or to the answer text. Playground segments are
    wrapped in a fenced code block inside the answer text.

    After the stream is consumed, the (deployment_id, account_index) pair from
    ``caching_info`` is stored in ``CONVERSATION_CACHE`` under the key derived
    from the request messages plus the new assistant message.

    Args:
        response: Upstream response object exposing ``iter_content``.
        model: Model name echoed back in the returned response.
        caching_info: Mapping that provides ``request_messages``,
            ``deployment_id`` and ``account_index``.

    Returns:
        A ``ChatCompletionResponse`` with a single assistant choice. If
        reading the stream fails part-way, the content collected so far is
        returned (best effort) and the error is only logged.
    """
    full_content = ""
    full_reasoning_content = ""
    inside_code_block = False

    try:
        for raw_line in _iter_complete_stream_lines(response):
            line = raw_line.strip()
            if line.startswith("data:"):
                line = line[5:].strip()
            if not line:
                continue

            try:
                data = json.loads(line)
                segment = _safe_extract_segment_from_chunk(data)

                # Upstream marks the final payload with both flags set.
                if data.get("end") and data.get("success"):
                    break

                # Any "external" or "isThoughts" payload is treated as
                # reasoning output.
                is_thinking = data.get("external") or data.get("isThoughts")

                is_playground = (
                    data.get("type") == "playground"
                    or data.get("isStreamingPlayground")
                    or data.get("playgroundId")
                )

                is_regular_content = (
                    data.get("type") == "text"
                    and not data.get("external")
                    and not data.get("temp")
                    and not is_thinking
                    and not is_playground
                )

                if is_thinking and segment:
                    full_reasoning_content += segment
                elif is_playground and segment:
                    if not inside_code_block:
                        # Open a fence in front of the first playground segment.
                        full_content += f"\n```\n{segment}"
                        inside_code_block = True
                    else:
                        full_content += segment
                elif is_regular_content and segment:
                    if inside_code_block:
                        # Regular text ends the playground fence.
                        full_content += "\n```\n"
                        inside_code_block = False
                    full_content += segment

            except json.JSONDecodeError as e:
                log_debug(f"JSON decode error: {e}, line: {line[:100]}...")
                continue
            except Exception as e:
                log_debug(f"Error processing non-stream chunk: {e}")
                continue
    except Exception as e:
        # Best effort: keep whatever was accumulated before the failure.
        log_debug(f"Non-stream processing error: {e}")

    # Close a fence that was still open when the stream ended.
    if inside_code_block:
        full_content += "\n```"

    assistant_message = ChatMessage(
        role="assistant",
        content=full_content,
        reasoning_content=full_reasoning_content if full_reasoning_content else None,
    )
    new_history = caching_info["request_messages"] + [assistant_message]
    new_history_key = get_conversation_key(new_history)

    if new_history_key:
        with conversation_cache_lock:
            session_info = (caching_info["deployment_id"], caching_info["account_index"])
            CONVERSATION_CACHE[new_history_key] = session_info
            log_debug(f"Cached session for next turn with key ...{new_history_key[-6:]}")

    return ChatCompletionResponse(
        model=model,
        choices=[
            ChatCompletionChoice(
                message=assistant_message
            )
        ],
    )
|
|
|
|
if __name__ == "__main__":
    import uvicorn

    # Allow debug logging to be switched on through the environment.
    debug_requested = os.environ.get("DEBUG_MODE", "").lower() == "true"
    if debug_requested:
        DEBUG_MODE = True
        print("Debug mode enabled via environment variable")

    # Seed a placeholder accounts file when none is present.
    accounts_path = "abc.json"
    if not os.path.exists(accounts_path):
        print("Warning: abc.json not found. Creating a dummy file.")
        placeholder_accounts = [
            {"_u_p": "your_u_p_here", "_s_p": "your_s_p_here"},
        ]
        with open(accounts_path, "w", encoding="utf-8") as accounts_file:
            json.dump(placeholder_accounts, accounts_file, indent=4)
        print("Created dummy abc.json. Please replace with valid Abacus data.")

    # Seed a client-key file holding one freshly generated key.
    client_keys_path = "client_api_keys.json"
    if not os.path.exists(client_keys_path):
        print("Warning: client_api_keys.json not found. Creating a dummy file.")
        generated_key = f"sk-dummy-{uuid.uuid4().hex}"
        with open(client_keys_path, "w", encoding="utf-8") as keys_file:
            json.dump([generated_key], keys_file, indent=2)
        print(f"Created dummy client_api_keys.json with key: {generated_key}")

    # Load configuration in the same order as before: keys, accounts, models.
    load_client_api_keys()
    load_abacus_accounts()
    load_abacus_models()

    # Startup banner.
    banner_lines = (
        "\n--- Abacus OpenAI API Adapter ---",
        f"Debug Mode: {DEBUG_MODE}",
        "Endpoints:",
        " GET /v1/models (Client API Key Auth)",
        " GET /models (No Auth)",
        " POST /v1/chat/completions (Client API Key Auth)",
        " GET /health (Health Check)",
        f"\nClient API Keys: {len(VALID_CLIENT_KEYS)}",
    )
    for banner_line in banner_lines:
        print(banner_line)

    if not ABACUS_ACCOUNTS:
        print("Abacus Accounts: None loaded. Check abc.json.")
    else:
        print(f"Abacus Accounts: {len(ABACUS_ACCOUNTS)}")
        usable_count = len([entry for entry in ABACUS_ACCOUNTS if entry["is_valid"]])
        print(f"Valid Accounts: {usable_count}")

    if not ABACUS_MODELS:
        print("Abacus Models: None loaded. Check account validity.")
    else:
        model_names = sorted(
            entry.get("id", entry.get("name", "unknown")) for entry in ABACUS_MODELS
        )
        print(f"Abacus Models: {len(ABACUS_MODELS)}")
        # Show at most five names; an ellipsis marks a truncated list.
        preview = ", ".join(model_names[:5])
        ellipsis = "..." if len(model_names) > 5 else ""
        print(f"Available models: {preview}{ellipsis}")
    print("------------------------------------")

    uvicorn.run(app, host="0.0.0.0", port=7860)