# (HuggingFace Spaces page chrome — "Spaces: Running" status badges —
#  captured by the scrape; not part of the application source.)
| #!/usr/bin/env python3 | |
| """ | |
| Multi-Model AI API β HuggingFace Spaces Edition | |
| With load balancing, 10 req/s rate limiting, vision support, and multimodal fixes. | |
| """ | |
| import re, os, json, uuid, time, random, string, logging, threading, base64 | |
| from abc import ABC, abstractmethod | |
| from collections import deque | |
| from dataclasses import dataclass, field | |
| from typing import Any, Dict, Generator, List, Optional, Tuple, Union | |
| from io import BytesIO | |
| import requests | |
| from flask import Flask, request as freq, jsonify, Response, stream_with_context | |
| try: | |
| from gradio_client import Client as GradioClient, handle_file | |
| HAS_GRADIO_CLIENT = True | |
| except ImportError: | |
| HAS_GRADIO_CLIENT = False | |
# ---------------------------------------------------------------
# CONFIG & CONSTANTS
# ---------------------------------------------------------------

# Build identifier ("hf-lb" = HuggingFace Spaces edition with load balancing).
VERSION = "3.0.0-hf-lb"
APP_NAME = "Multi-Model-AI-API"
# Fallbacks applied when a request omits a system prompt or model id.
DEFAULT_SYSTEM_PROMPT = "You are a helpful, friendly AI assistant."
DEFAULT_MODEL = "gpt-oss-120b"

logging.basicConfig(level=logging.INFO, format="%(asctime)s %(levelname)s %(message)s")
log = logging.getLogger(APP_NAME)

# Browser User-Agent strings; GptOssProvider picks one at random per session
# to vary its outbound request fingerprint.
USER_AGENTS = [
    "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 Chrome/144.0.0.0 Safari/537.36",
    "Mozilla/5.0 (Macintosh; Intel Mac OS X 14_5) AppleWebKit/605.1.15 Safari/605.1.15",
    "Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 Chrome/143.0.0.0 Safari/537.36",
    "Mozilla/5.0 (Windows NT 10.0; Win64; x64; rv:128.0) Gecko/20100101 Firefox/128.0",
]
| # βββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ | |
| # MULTIMODAL HELPERS | |
| # βββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ | |
def extract_text_and_images(content: Any) -> Tuple[str, List[str]]:
    """
    Parse OpenAI-style multimodal content.
    Returns (text, [base64_or_url, ...])
    Handles: str, list of {type, text/image_url}
    """
    if content is None:
        return "", []
    if isinstance(content, str):
        return content.strip(), []
    text_parts: List[str] = []
    image_refs: List[str] = []
    if isinstance(content, list):
        for part in content:
            # Non-dict entries are treated as plain text.
            if not isinstance(part, dict):
                text_parts.append(str(part))
                continue
            kind = part.get("type", "")
            if kind == "text":
                chunk = part.get("text", "")
                if chunk:
                    text_parts.append(chunk)
            elif kind == "image_url":
                ref = part.get("image_url", {})
                target = ref.get("url", "") if isinstance(ref, dict) else str(ref)
                if target:
                    image_refs.append(target)
            elif kind == "image":
                # Alternative (Anthropic-style) block: {"source": {"data", "media_type"}}
                source = part.get("source", {})
                if isinstance(source, dict):
                    payload = source.get("data", "")
                    if payload:
                        mtype = source.get("media_type", "image/jpeg")
                        image_refs.append(f"data:{mtype};base64,{payload}")
    return " ".join(text_parts).strip(), image_refs
def decode_image_to_bytes(image_url: str) -> Optional[Tuple[bytes, str]]:
    """Convert image URL or data URI to (bytes, media_type)."""
    try:
        if not image_url.startswith("data:"):
            # Remote URL: download it and trust the server's content-type.
            resp = requests.get(image_url, timeout=15)
            resp.raise_for_status()
            mtype = resp.headers.get("content-type", "image/jpeg").split(";")[0]
            return resp.content, mtype
        # Data URI, e.g. data:image/jpeg;base64,/9j/...
        header, payload = image_url.split(",", 1)
        mtype = header.split(";")[0].split(":")[1]
        return base64.b64decode(payload), mtype
    except Exception as e:
        log.warning(f"Failed to decode image: {e}")
        return None
def save_image_temp(image_url: str) -> Optional[str]:
    """Save image to a temp file and return path (for gradio_client)."""
    import tempfile
    decoded = decode_image_to_bytes(image_url)
    if not decoded:
        return None
    payload, mtype = decoded
    # Derive a file suffix from the media type ("image/jpeg" -> ".jpg").
    suffix = "." + mtype.split("/")[-1].replace("jpeg", "jpg")
    with tempfile.NamedTemporaryFile(suffix=suffix, delete=False) as tmp:
        tmp.write(payload)
    # delete=False: the caller (gradio_client upload) needs the file to persist.
    return tmp.name
| # βββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ | |
| # MODEL REGISTRY | |
| # βββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ | |
@dataclass
class ModelDef:
    """Static description of one upstream model/Space.

    NOTE(review): the pasted source had lost all decorator lines; the
    `field(default_factory=...)` default and the keyword construction in
    _init_registry() require this class to be a @dataclass.
    """
    model_id: str                    # public id used in API requests
    display_name: str
    provider_type: str               # "gradio_sse" | "gradio_client" (see providers)
    space_id: str                    # HF Space id ("org/name") or full URL
    owned_by: str
    description: str = ""
    supports_system_prompt: bool = True
    supports_temperature: bool = True
    supports_streaming: bool = True
    supports_history: bool = True
    supports_vision: bool = False
    supports_thinking: bool = False
    thinking_default: bool = True
    max_tokens_default: int = 4096
    default_temperature: float = 0.7
    fn_index: Optional[int] = None   # raw Gradio fn index (SSE provider)
    api_name: Optional[str] = None   # gradio_client endpoint name
    extra_params: Dict[str, Any] = field(default_factory=dict)
    clean_analysis: bool = False     # strip GPT-OSS "analysis..." scaffolding
    lb_pool_size: int = 2            # provider instances in the LB pool
    lb_enabled: bool = True
    is_beta: bool = False
# Global id -> definition registry, populated by _init_registry().
MODEL_REGISTRY: Dict[str, ModelDef] = {}


def register_model(m: ModelDef) -> None:
    """Insert *m* into MODEL_REGISTRY, keyed by its model_id."""
    MODEL_REGISTRY[m.model_id] = m
def _init_registry():
    """Register every supported upstream Space with its capability flags."""
    # AMD GPT-OSS-120B: the only raw-SSE provider (fn_index), and the only
    # one supporting streaming + system prompt + temperature end-to-end.
    register_model(ModelDef(
        model_id="gpt-oss-120b", display_name="AMD GPT-OSS-120B",
        provider_type="gradio_sse", space_id="https://amd-gpt-oss-120b-chatbot.hf.space",
        owned_by="amd", description="AMD open-source 120B model",
        fn_index=8, clean_analysis=True, default_temperature=0.0,
        supports_vision=False, supports_thinking=False,
        lb_pool_size=3, lb_enabled=True,
    ))
    # Cohere vision model: stateless single-turn /chat with image support.
    register_model(ModelDef(
        model_id="command-a-vision", display_name="Cohere Command-A Vision",
        provider_type="gradio_client", space_id="CohereLabs/command-a-vision",
        owned_by="cohere", description="Cohere multimodal command model",
        api_name="/chat", supports_vision=True, supports_system_prompt=False,
        supports_temperature=False, supports_streaming=False, supports_history=False,
        supports_thinking=False, max_tokens_default=700,
        extra_params={"max_new_tokens": 700},
        lb_pool_size=2, lb_enabled=True,
    ))
    # Cohere translation model: load balancing disabled (single instance).
    register_model(ModelDef(
        model_id="command-a-translate", display_name="Cohere Command-A Translate",
        provider_type="gradio_client", space_id="CohereLabs/command-a-translate",
        owned_by="cohere", description="Cohere translation model",
        api_name="/chat", supports_vision=False, supports_system_prompt=False,
        supports_temperature=False, supports_streaming=False, supports_history=False,
        supports_thinking=False, max_tokens_default=700,
        extra_params={"max_new_tokens": 700},
        lb_pool_size=1, lb_enabled=False,
    ))
    # Cohere reasoning model: exposes a thinking budget instead of temperature.
    register_model(ModelDef(
        model_id="command-a-reasoning", display_name="Cohere Command-A Reasoning",
        provider_type="gradio_client", space_id="CohereLabs/command-a-reasoning",
        owned_by="cohere", description="Cohere reasoning model with thinking budget",
        api_name="/chat", supports_vision=False, supports_system_prompt=False,
        supports_temperature=False, supports_streaming=False, supports_history=False,
        supports_thinking=True, thinking_default=True, max_tokens_default=4096,
        extra_params={"thinking_budget": 500},
        lb_pool_size=2, lb_enabled=True,
    ))
    # MiniMax VL: vision model that does accept a temperature.
    register_model(ModelDef(
        model_id="minimax-vl-01", display_name="MiniMax VL-01",
        provider_type="gradio_client", space_id="MiniMaxAI/MiniMax-VL-01",
        owned_by="minimax", description="MiniMax vision-language model",
        api_name="/chat", supports_vision=True, supports_system_prompt=False,
        supports_temperature=True, supports_streaming=False, supports_history=False,
        supports_thinking=False, max_tokens_default=12800, default_temperature=0.1,
        extra_params={"max_tokens": 12800, "top_p": 0.9},
        lb_pool_size=2, lb_enabled=True,
    ))
    # GLM-4.5: supports system prompts and an optional thinking mode; its raw
    # HTML output is post-processed by ResponseCleaner.clean_glm().
    register_model(ModelDef(
        model_id="glm-4.5", display_name="GLM-4.5 (ZhipuAI)",
        provider_type="gradio_client", space_id="zai-org/GLM-4.5-Space",
        owned_by="zhipuai", description="ZhipuAI GLM-4.5 with thinking mode",
        api_name="/chat_wrapper", supports_vision=False, supports_system_prompt=True,
        supports_temperature=True, supports_streaming=False, supports_history=False,
        supports_thinking=True, thinking_default=True, default_temperature=1.0,
        extra_params={"thinking_enabled": True},
        lb_pool_size=2, lb_enabled=True,
    ))
    # Community-hosted ChatGPT proxy: the only gradio_client model that
    # accepts conversation history.
    register_model(ModelDef(
        model_id="chatgpt", display_name="ChatGPT (Community)",
        provider_type="gradio_client", space_id="yuntian-deng/ChatGPT",
        owned_by="community", description="ChatGPT via community Space",
        api_name="/predict", supports_vision=False, supports_system_prompt=False,
        supports_temperature=True, supports_streaming=False, supports_history=True,
        supports_thinking=False, default_temperature=1.0,
        extra_params={"top_p": 1.0},
        lb_pool_size=2, lb_enabled=True,
    ))
    # Qwen3-VL demo Space (vision).
    register_model(ModelDef(
        model_id="qwen3-vl", display_name="Qwen3-VL (Alibaba)",
        provider_type="gradio_client", space_id="Qwen/Qwen3-VL-Demo",
        owned_by="alibaba", description="Alibaba Qwen3 Vision-Language model",
        api_name="/add_message", supports_vision=True, supports_system_prompt=False,
        supports_temperature=False, supports_streaming=False, supports_history=False,
        supports_thinking=False, max_tokens_default=4096,
        lb_pool_size=2, lb_enabled=True,
    ))
    # Qwen2.5 Coder (BETA): the Space is tuned to emit HTML "artifacts", so a
    # system-prompt override steers it back to plain code answers.
    register_model(ModelDef(
        model_id="qwen2.5-coder", display_name="Qwen2.5-Coder Artifacts (BETA)",
        provider_type="gradio_client", space_id="Qwen/Qwen2.5-Coder-Artifacts",
        owned_by="alibaba", description="Alibaba Qwen2.5 Coder β code generation model (BETA)",
        api_name="/generation_code", supports_vision=False, supports_system_prompt=True,
        supports_temperature=False, supports_streaming=False, supports_history=False,
        supports_thinking=False, max_tokens_default=4096,
        extra_params={
            "system_prompt_override": (
                "You are a helpful assistant. You are a skilled programming assistant. "
                "You help users write, debug, and understand code across all languages. "
                "Respond with clear explanations and clean code. "
                "Do NOT generate HTML artifacts or web page previews. "
                "Do NOT wrap everything in a single HTML file. "
                "Just provide the code the user asks for with explanations."
            ),
        },
        lb_pool_size=2, lb_enabled=True,
        is_beta=True,
    ))


# Populate the registry at import time.
_init_registry()
| # βββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ | |
| # CONFIG | |
| # βββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ | |
@dataclass
class Config:
    """Runtime tunables, overridable via MMAI_* environment variables.

    NOTE(review): the pasted source had lost decorator lines; from_env()
    takes `cls`, so it must be a @classmethod to be callable as
    Config.from_env().
    """
    default_model: str = DEFAULT_MODEL
    default_system_prompt: str = DEFAULT_SYSTEM_PROMPT
    timeout_stream: int = 300          # seconds to wait on an SSE stream
    max_retries: int = 3
    retry_backoff_base: float = 1.5
    retry_jitter: float = 0.5
    rate_limit_rps: int = 10
    rate_limit_burst: int = 15
    pool_size: int = 2                 # presumably default LB pool size — confirm
    max_history_messages: int = 50
    max_message_length: int = 32000
    default_temperature: float = 0.7
    include_thinking: bool = True
    log_sse_raw: bool = False

    @classmethod
    def from_env(cls) -> "Config":
        """Build a Config, then apply any MMAI_* environment overrides.

        Values that fail conversion are silently ignored (default kept).
        """
        cfg = cls()
        env_map = {
            "MMAI_TIMEOUT": ("timeout_stream", int),
            "MMAI_MAX_RETRIES": ("max_retries", int),
            "MMAI_RATE_LIMIT_RPS": ("rate_limit_rps", int),
            "MMAI_RATE_LIMIT_BURST": ("rate_limit_burst", int),
            "MMAI_POOL_SIZE": ("pool_size", int),
            "MMAI_SYSTEM_PROMPT": ("default_system_prompt", str),
            "MMAI_TEMPERATURE": ("default_temperature", float),
            "MMAI_DEFAULT_MODEL": ("default_model", str),
            "MMAI_INCLUDE_THINKING": ("include_thinking",
                                      lambda x: x.lower() in ("1", "true")),
        }
        for env_key, (attr, conv) in env_map.items():
            val = os.environ.get(env_key)
            if val is not None:
                try:
                    setattr(cfg, attr, conv(val))
                except (ValueError, TypeError):
                    pass
        return cfg
| # βββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ | |
| # EXCEPTIONS | |
| # βββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ | |
class APIError(Exception):
    """Base API exception carrying a machine-readable code and HTTP status."""

    def __init__(self, message: str, code: str = "UNKNOWN", status: int = 500):
        super().__init__(message)
        self.code = code
        self.status = status

    def to_dict(self):
        """JSON-serializable form for error responses."""
        return dict(error=str(self), code=self.code)
class ModelNotFoundError(APIError):
    """Raised (HTTP 404) when a requested model id is not registered."""

    def __init__(self, model_id: str):
        known = list(MODEL_REGISTRY.keys())
        super().__init__(
            f"Model '{model_id}' not found. Available: {known}",
            "MODEL_NOT_FOUND", 404,
        )
| # βββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ | |
| # RESPONSE CLEANER | |
| # βββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ | |
class ResponseCleaner:
    """Model-specific post-processing of raw Space output.

    NOTE(review): every method takes `cls` and call sites invoke them on
    the class (e.g. ResponseCleaner.clean_analysis(text)), but the pasted
    source had lost the @classmethod decorators — without them the text
    argument lands in `cls`. They are restored here.
    """

    @classmethod
    def clean_analysis(cls, text: str) -> str:
        """Strip GPT-OSS 'analysis ... assistantfinal' scaffolding.

        Returns only the final response; "" for analysis-only chunks.
        """
        if not text:
            return text
        original = text.strip()
        # Prefer an explicit "Response:" marker when present.
        for pattern in [
            r'\*\*π¬\s*Response:\*\*\s*\n*(.*?)$',
            r'\*\*Response:\*\*\s*\n*(.*?)$',
            r'---+\s*\n*\*\*π¬\s*Response:\*\*\s*\n*(.*?)$',
        ]:
            match = re.search(pattern, original, re.DOTALL)
            if match:
                cleaned = match.group(1).strip()
                if cleaned:
                    return cleaned
        # Fallback: GPT-OSS's "assistantfinal" sentinel.
        for pattern in [r'assistantfinal\s*(.*?)$', r'assistant\s*final\s*(.*?)$']:
            match = re.search(pattern, original, re.DOTALL | re.IGNORECASE)
            if match:
                cleaned = match.group(1).strip()
                if cleaned:
                    return cleaned
        # Pure analysis chunk with no final answer yet: emit nothing.
        if re.match(r'^analysis', original, re.IGNORECASE):
            return ""
        return original

    @classmethod
    def _decode_html_entities(cls, text: str) -> str:
        """Decode the common named entities plus numeric char references."""
        entities = {
            '&#39;': "'", '&apos;': "'", '&rsquo;': "'",
            '&quot;': '"', '&ldquo;': '"', '&rdquo;': '"',
            '&amp;': '&', '&lt;': '<', '&gt;': '>',
            '&nbsp;': ' ', '&#8217;': '\u2019', '&#8216;': '\u2018',
            '&#8221;': '\u201d', '&#8220;': '\u201c',
            '&mdash;': 'β', '&ndash;': 'β', '&hellip;': 'β¦',
        }
        for entity, char in entities.items():
            text = text.replace(entity, char)
        text = re.sub(r'&#x([0-9a-fA-F]+);',
                      lambda m: chr(int(m.group(1), 16)), text)
        text = re.sub(r'&#(\d+);', lambda m: chr(int(m.group(1))), text)
        return text

    @classmethod
    def _strip_html(cls, text: str) -> str:
        """Drop tags (keeping <br> as newlines) and decode entities."""
        text = re.sub(r'<br\s*/?>', '\n', text, flags=re.IGNORECASE)
        text = re.sub(r'<[^>]+>', '', text)
        return cls._decode_html_entities(text).strip()

    @classmethod
    def clean_glm(cls, text: str, include_thinking: bool = True) -> str:
        """Unwrap GLM's HTML output: <details> holds the thinking trace,
        the remaining <div> holds the answer."""
        if not text:
            return text
        if '<details' not in text and '<div' not in text:
            return text.strip()
        thinking_text = ""
        thinking_match = re.search(
            r'<details[^>]*>.*?<div[^>]*>(.*?)</div>\s*</details>',
            text, re.DOTALL | re.IGNORECASE,
        )
        if thinking_match:
            thinking_text = cls._strip_html(thinking_match.group(1)).strip()
        text_without_details = re.sub(
            r'<details[^>]*>.*?</details>', '', text,
            flags=re.DOTALL | re.IGNORECASE,
        ).strip()
        div_match = re.search(
            r"<div[^>]*>\s*(.*?)\s*</div>",
            text_without_details, re.DOTALL | re.IGNORECASE,
        )
        response_text = (
            cls._strip_html(div_match.group(1)).strip()
            if div_match
            else cls._strip_html(text_without_details).strip()
        )
        if thinking_text and include_thinking:
            return f"<thinking>\n{thinking_text}\n</thinking>\n{response_text}"
        return response_text

    @classmethod
    def extract_qwen_text(cls, result: Any) -> str:
        """Pull the last assistant message out of Qwen's tuple-of-components
        result shape."""
        if result is None:
            return ""
        if isinstance(result, str):
            return result.strip()
        if isinstance(result, tuple):
            for el in result:
                if isinstance(el, dict):
                    value = el.get("value")
                    if isinstance(value, list):
                        # Scan newest-first for the assistant turn.
                        for msg in reversed(value):
                            if isinstance(msg, dict) and msg.get("role") == "assistant":
                                content = msg.get("content", "")
                                if isinstance(content, str):
                                    return content.strip()
                                if isinstance(content, list):
                                    texts = []
                                    for block in content:
                                        if isinstance(block, str):
                                            texts.append(block)
                                        elif isinstance(block, dict) and block.get("type") != "file":
                                            bc = block.get("content", "")
                                            if isinstance(bc, str) and bc.strip():
                                                texts.append(bc)
                                    return "\n".join(t for t in texts if t.strip()).strip()
                                return str(content)
        return str(result) if result else ""

    @classmethod
    def extract_chatgpt_text(cls, result: Any) -> str:
        """Pull the last assistant reply from the ChatGPT Space's
        (chatbot_history, ...) tuple result."""
        if isinstance(result, str):
            return result.strip()
        if isinstance(result, tuple) and len(result) >= 1:
            chatbot = result[0]
            if isinstance(chatbot, (list, tuple)) and chatbot:
                last = chatbot[-1]
                if isinstance(last, (list, tuple)) and len(last) >= 2:
                    msg = last[1]
                    if isinstance(msg, str):
                        return msg.strip()
                    if isinstance(msg, dict):
                        return str(msg.get("value", msg.get("content", ""))).strip()
                    return str(msg).strip() if msg else ""
            return str(chatbot).strip() if chatbot else ""
        return str(result)

    @classmethod
    def extract_qwen_coder_text(cls, result: Any) -> str:
        """Take the first non-empty string component of the coder Space's
        result tuple."""
        if result is None:
            return ""
        if isinstance(result, str):
            return result.strip()
        if isinstance(result, tuple):
            if len(result) >= 1 and isinstance(result[0], str):
                text = result[0].strip()
                if text:
                    return text
            if len(result) >= 2 and isinstance(result[1], str):
                return result[1].strip()
        if isinstance(result, (list, dict)):
            return str(result)
        return str(result) if result else ""

    @classmethod
    def clean(cls, text: str, model_id: str = "",
              include_thinking: bool = True) -> str:
        """Dispatch to the model-specific cleaner, then decode any
        remaining HTML entities."""
        if not text:
            return text
        text = text.strip()
        if model_id == "gpt-oss-120b":
            text = cls.clean_analysis(text)
        elif model_id == "glm-4.5":
            text = cls.clean_glm(text, include_thinking=include_thinking)
        # Cheap heuristic: only run entity decoding when both markers appear.
        if '&' in text and ';' in text:
            text = cls._decode_html_entities(text)
        return text.strip()
| # βββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ | |
| # THINKING PARSER | |
| # βββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ | |
class ThinkingParser:
    """Split/merge `<thinking>...</thinking>` blocks in model output.

    NOTE(review): the pasted source had lost decorator lines; both methods
    take no self/cls, so @staticmethod is restored (required for calls on
    an instance, and matches the signatures).
    """

    @staticmethod
    def split(text: str) -> Tuple[Optional[str], str]:
        """Return (thinking, response); thinking is None when absent/empty."""
        match = re.match(
            r'\s*<thinking>\s*\n?(.*?)\n?\s*</thinking>\s*\n?(.*)',
            text, re.DOTALL | re.IGNORECASE,
        )
        if match:
            thinking = match.group(1).strip()
            response = match.group(2).strip()
            return (thinking if thinking else None, response)
        return (None, text.strip())

    @staticmethod
    def format(thinking: Optional[str], response: str) -> str:
        """Inverse of split(): re-attach a thinking block when present."""
        if thinking:
            return f"<thinking>\n{thinking}\n</thinking>\n{response}"
        return response
| # βββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ | |
| # DATA MODELS | |
| # βββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ | |
@dataclass
class Message:
    """One chat message with identity and timing metadata.

    NOTE(review): field(default_factory=...) at class level requires the
    @dataclass decorator, which the pasted source had lost.
    """
    role: str                        # "system" | "user" | "assistant"
    content: str
    thinking: Optional[str] = None   # extracted <thinking> text, if any
    timestamp: float = field(default_factory=time.time)
    message_id: str = field(default_factory=lambda: str(uuid.uuid4()))
@dataclass
class Conversation:
    """A stored chat session: ordered messages plus bookkeeping metadata.

    NOTE(review): field(default_factory=...) at class level requires the
    @dataclass decorator, which the pasted source had lost.
    """
    conversation_id: str = field(default_factory=lambda: str(uuid.uuid4()))
    messages: List[Message] = field(default_factory=list)
    created_at: float = field(default_factory=time.time)
    updated_at: float = field(default_factory=time.time)
    title: Optional[str] = None      # auto-set from the first user message
    system_prompt: str = DEFAULT_SYSTEM_PROMPT
    model_id: str = DEFAULT_MODEL

    def add_message(self, role: str, content: str,
                    max_messages: int = 50,
                    thinking: Optional[str] = None) -> Message:
        """Append a message and return it.

        Auto-titles the conversation from the first user message and trims
        history to `max_messages`, always preserving system messages.
        """
        msg = Message(role=role, content=content, thinking=thinking)
        self.messages.append(msg)
        self.updated_at = time.time()
        if self.title is None and role == "user":
            self.title = content[:80]
        if len(self.messages) > max_messages:
            # Keep all system messages; drop the oldest non-system ones.
            system_msgs = [m for m in self.messages if m.role == "system"]
            other_msgs = [m for m in self.messages if m.role != "system"]
            self.messages = system_msgs + other_msgs[-(max_messages - len(system_msgs)):]
        return msg

    def build_gradio_history(self) -> List[List[str]]:
        """Pair consecutive (user, assistant) turns as [[user, assistant], ...].

        Unpaired messages (e.g. a trailing user turn) are skipped.
        """
        history = []
        non_system = [m for m in self.messages if m.role != "system"]
        i = 0
        while i < len(non_system) - 1:
            if (non_system[i].role == "user"
                    and i + 1 < len(non_system)
                    and non_system[i + 1].role == "assistant"):
                history.append([non_system[i].content, non_system[i + 1].content])
                i += 2
            else:
                i += 1
        return history

    def build_chatbot_tuples(self) -> List[List[str]]:
        """Alias for build_gradio_history() (chatbot-tuple format)."""
        return self.build_gradio_history()

    def to_dict(self) -> Dict:
        """Summary metadata (no message bodies) for listing endpoints."""
        return {
            "conversation_id": self.conversation_id,
            "title": self.title,
            "model": self.model_id,
            "message_count": len(self.messages),
            "created_at": self.created_at,
            "updated_at": self.updated_at,
        }
| # βββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ | |
| # METRICS | |
| # βββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ | |
@dataclass
class Metrics:
    """Thread-safe service-wide counters and a rolling latency window.

    NOTE(review): field(default_factory=...) at class level requires the
    @dataclass decorator, which the pasted source had lost.
    """
    _lock: threading.Lock = field(default_factory=threading.Lock, repr=False)
    total_requests: int = 0
    successful_requests: int = 0
    failed_requests: int = 0
    total_retries: int = 0
    total_chars_received: int = 0
    active_streams: int = 0
    requests_per_model: Dict[str, int] = field(default_factory=dict)
    # Rolling window of the last 1000 request latencies (ms).
    _latencies: deque = field(default_factory=lambda: deque(maxlen=1000), repr=False)
    started_at: float = field(default_factory=time.time)
    lb_total_dispatches: int = 0
    lb_failovers: int = 0

    def record_request(self, success: bool, duration_ms: float,
                       chars: int = 0, model: str = ""):
        """Record one finished request; chars only counted on success."""
        with self._lock:
            self.total_requests += 1
            if success:
                self.successful_requests += 1
                self.total_chars_received += chars
            else:
                self.failed_requests += 1
            self._latencies.append(duration_ms)
            if model:
                self.requests_per_model[model] = (
                    self.requests_per_model.get(model, 0) + 1
                )

    def record_retry(self):
        with self._lock:
            self.total_retries += 1

    def record_lb_dispatch(self, failover: bool = False):
        """Count one load-balancer dispatch (optionally a failover)."""
        with self._lock:
            self.lb_total_dispatches += 1
            if failover:
                self.lb_failovers += 1

    def to_dict(self) -> Dict:
        """Snapshot of all counters, e.g. for a metrics endpoint."""
        with self._lock:
            avg = (sum(self._latencies) / len(self._latencies)
                   if self._latencies else 0)
            # With zero requests, report a perfect success rate.
            rate = (self.successful_requests / self.total_requests
                    if self.total_requests else 1)
            return {
                "total_requests": self.total_requests,
                "successful": self.successful_requests,
                "failed": self.failed_requests,
                "success_rate": round(rate, 4),
                "retries": self.total_retries,
                "chars_received": self.total_chars_received,
                "avg_latency_ms": round(avg, 1),
                "active_streams": self.active_streams,
                "uptime_s": round(time.time() - self.started_at, 1),
                "per_model": dict(self.requests_per_model),
                "load_balancer": {
                    "total_dispatches": self.lb_total_dispatches,
                    "failovers": self.lb_failovers,
                },
            }


# Module-level singleton shared by providers and routes.
metrics = Metrics()
| # βββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ | |
| # RATE LIMITER β token bucket (10 req/s) | |
| # βββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ | |
class RateLimiter:
    """Token bucket: refills at `rps` tokens/second up to a `burst` cap."""

    def __init__(self, rps: int = 10, burst: int = 15):
        self.rate = float(rps)
        self.max_tokens = float(burst)
        self.tokens = float(burst)           # bucket starts full
        self.last_refill = time.monotonic()
        self._lock = threading.Lock()

    def acquire(self, timeout: float = 10.0) -> bool:
        """Block up to `timeout` seconds for one token; True if obtained."""
        deadline = time.monotonic() + timeout
        while True:
            with self._lock:
                now = time.monotonic()
                refill = (now - self.last_refill) * self.rate
                self.tokens = min(self.max_tokens, self.tokens + refill)
                self.last_refill = now
                if self.tokens >= 1.0:
                    self.tokens -= 1.0
                    return True
            # No token yet: give up at the deadline, otherwise poll.
            if time.monotonic() >= deadline:
                return False
            time.sleep(0.05)

    def get_info(self) -> Dict:
        """Snapshot of limiter configuration and remaining tokens."""
        with self._lock:
            return {
                "rate_rps": self.rate,
                "burst": self.max_tokens,
                "available_tokens": round(self.tokens, 2),
            }
| # βββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ | |
| # CIRCUIT BREAKER | |
| # βββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ | |
class CircuitBreaker:
    """Three-state breaker: closed -> open after `threshold` failures,
    half_open `recovery` seconds later, closed again after 2 probe successes."""

    def __init__(self, threshold: int = 5, recovery: int = 60):
        self.threshold = threshold
        self.recovery = recovery
        self.state = "closed"
        self.failures = 0
        self.successes = 0
        self.last_failure = 0.0
        self._lock = threading.Lock()

    def can_execute(self) -> bool:
        """Whether a request may be attempted right now."""
        with self._lock:
            if self.state == "closed":
                return True
            if self.state == "open":
                # Move to half_open once the recovery window has elapsed.
                if time.time() - self.last_failure >= self.recovery:
                    self.state = "half_open"
                    return True
                return False
            # half_open: allow only a limited number of probe requests.
            return self.successes < 2

    def record_success(self):
        """Note a success; two probe successes close a half-open breaker."""
        with self._lock:
            if self.state != "half_open":
                # While closed, gradually forgive accumulated failures.
                self.failures = max(0, self.failures - 1)
                return
            self.successes += 1
            if self.successes >= 2:
                self.state = "closed"
                self.failures = 0
                self.successes = 0

    def record_failure(self):
        """Note a failure; trips the breaker at the threshold, and any
        failure during half_open reopens it immediately."""
        with self._lock:
            self.failures += 1
            self.last_failure = time.time()
            if self.state == "half_open" or self.failures >= self.threshold:
                self.state = "open"
| # βββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ | |
| # SSE PARSER (for GPT-OSS) | |
| # βββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ | |
class GradioSSEParser:
    """Minimal parser for Gradio's queue SSE protocol (used by GPT-OSS).

    NOTE(review): the pasted source had lost decorator lines; both methods
    take no self/cls, so @staticmethod is restored. The `requests.Response`
    annotation is quoted so the class imports without requests installed.
    """

    @staticmethod
    def parse_sse(response: "requests.Response",
                  log_raw: bool = False) -> Generator[Dict, None, None]:
        """Yield each decoded JSON object from `data:` SSE lines.

        Non-data lines and malformed JSON are silently skipped.
        (log_raw is accepted for interface compatibility; it is unused here.)
        """
        buffer = ""
        for chunk in response.iter_content(chunk_size=None, decode_unicode=True):
            if chunk is None:
                continue
            buffer += chunk
            # Process every complete line; keep the partial tail in buffer.
            while "\n" in buffer:
                line, buffer = buffer.split("\n", 1)
                line = line.strip()
                if not line or not line.startswith("data:"):
                    continue
                data_str = line[5:].strip()
                if not data_str:
                    continue
                try:
                    yield json.loads(data_str)
                except json.JSONDecodeError:
                    continue

    @staticmethod
    def extract_text(output: Dict) -> str:
        """Pull the assistant text out of a queue event's `data` payload."""
        data = output.get("data", [])
        if not data:
            return ""
        first = data[0]
        if isinstance(first, str):
            return first
        if isinstance(first, list):
            # Chatbot-history shape [[user, assistant], ...]: take the last
            # element of the first pair.
            try:
                if first and isinstance(first[0], list):
                    return str(first[0][-1])
            except (IndexError, TypeError):
                pass
        return ""
| # βββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ | |
| # MODEL PROVIDERS | |
| # βββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ | |
class ModelProvider(ABC):
    """Abstract base for one instance of an upstream model connection.

    Tracks per-instance health statistics (failures, latency window) that
    the load balancer can use to score instances.

    NOTE(review): the pasted source had lost decorator lines. avg_latency
    and health_score are consumed attribute-style (including inside
    health_score itself and in get_instance_info), so they must be
    properties; initialize/generate had `...` bodies and `abstractmethod`
    is imported at module top, so those decorators are restored too.
    Forward-ref annotations are quoted to avoid import-order issues.
    """

    def __init__(self, model_def: "ModelDef", config: "Config", instance_id: int = 0):
        self.model_def = model_def
        self.config = config
        self.instance_id = instance_id
        self.ready = False
        self._lock = threading.Lock()
        self._consecutive_failures = 0
        self._last_success_time = 0.0
        self._last_failure_time = 0.0
        self._total_requests = 0
        self._total_failures = 0
        # Rolling window of recent request latencies (ms).
        self._latencies: deque = deque(maxlen=50)

    @abstractmethod
    def initialize(self) -> bool:
        """Establish the upstream connection; return True when ready."""

    @abstractmethod
    def generate(self, message: str, history=None, system_prompt=None,
                 temperature=None, max_tokens=None, images=None, **kwargs) -> str:
        """Return the complete response text for one request."""

    def generate_stream(self, message: str, **kwargs) -> Generator[str, None, None]:
        """Default pseudo-streaming: yield the full response in one chunk."""
        yield self.generate(message, **kwargs)

    def record_success(self, latency_ms: float):
        """Note a successful request and its latency."""
        self._consecutive_failures = 0
        self._last_success_time = time.time()
        self._total_requests += 1
        self._latencies.append(latency_ms)

    def record_failure(self):
        """Note a failed request."""
        self._consecutive_failures += 1
        self._last_failure_time = time.time()
        self._total_requests += 1
        self._total_failures += 1

    @property
    def avg_latency(self) -> float:
        """Mean latency (ms) over the rolling window; 0.0 when empty."""
        return sum(self._latencies) / len(self._latencies) if self._latencies else 0.0

    @property
    def health_score(self) -> float:
        """Heuristic health in [0, 1]; 0.0 when not ready.

        Penalizes consecutive failures (up to -0.8), high average latency
        (-0.15 above 5s, -0.3 above 10s) and, once there is enough data,
        the overall failure rate (up to -0.4).
        """
        if not self.ready:
            return 0.0
        score = 1.0
        score -= min(self._consecutive_failures * 0.2, 0.8)
        if self._latencies:
            avg = self.avg_latency
            if avg > 10000:
                score -= 0.3
            elif avg > 5000:
                score -= 0.15
        if self._total_requests > 5:
            fail_rate = self._total_failures / self._total_requests
            score -= fail_rate * 0.4
        return max(0.0, min(1.0, score))

    def get_instance_info(self) -> Dict:
        """Diagnostic snapshot of this instance's health counters."""
        return {
            "instance_id": self.instance_id,
            "ready": self.ready,
            "health_score": round(self.health_score, 3),
            "consecutive_failures": self._consecutive_failures,
            "total_requests": self._total_requests,
            "total_failures": self._total_failures,
            "avg_latency_ms": round(self.avg_latency, 1),
        }
class GptOssProvider(ModelProvider):
    """Talks to the AMD GPT-OSS-120B Space over Gradio's raw queue HTTP
    API: POST /queue/join to enqueue a job, then read results from the
    /queue/data SSE stream for the same session hash.
    """

    def __init__(self, model_def, config, instance_id=0):
        super().__init__(model_def, config, instance_id)
        self._session = requests.Session()
        self._rotate()

    def _rotate(self):
        # Refresh browser-like headers with a randomly chosen User-Agent.
        self._session.headers.update({
            "User-Agent": random.choice(USER_AGENTS),
            "Accept-Language": "fr-FR,fr;q=0.9",
            "Origin": "https://gptunlimited.org",
            "Referer": "https://gptunlimited.org/",
        })

    def _hash(self):
        # Random 12-char session hash identifying one Gradio queue session.
        return ''.join(random.choices(string.ascii_lowercase + string.digits, k=12))

    def initialize(self) -> bool:
        """Probe the Space's /gradio_api/info endpoint; sets self.ready."""
        with self._lock:
            if self.ready:
                return True
            self._rotate()
            try:
                r = self._session.get(
                    f"{self.model_def.space_id}/gradio_api/info", timeout=15,
                )
                self.ready = r.status_code == 200
                return self.ready
            except Exception:
                return False

    def generate(self, message, history=None, system_prompt=None,
                 temperature=None, max_tokens=None, images=None, **kw):
        """Blocking generation: join the queue, drain the SSE stream, and
        return the final (optionally analysis-cleaned) text.

        Raises APIError on a failed queue join, a missing event_id, an
        upstream Gradio error, or an empty final response.
        (max_tokens and images are accepted but unused by this Space.)
        """
        if not self.ready:
            self.initialize()
        sys_p = system_prompt or self.config.default_system_prompt
        temp = (temperature if temperature is not None
                else self.model_def.default_temperature)
        h = self._hash()
        # Positional inputs expected by the Space's fn_index endpoint:
        # [message, chat_history, system_prompt, temperature].
        payload = {
            "data": [message, history or [], sys_p, temp],
            "event_data": None,
            "fn_index": self.model_def.fn_index,
            "trigger_id": None,
            "session_hash": h,
        }
        r = self._session.post(
            f"{self.model_def.space_id}/gradio_api/queue/join?",
            json=payload,
            headers={"Content-Type": "application/json"},
            timeout=30,
        )
        if r.status_code != 200:
            raise APIError(f"Queue join failed: {r.status_code}")
        data = r.json()
        if not data.get("event_id"):
            raise APIError("No event_id")
        # Long-lived SSE stream carrying this session's queue events.
        resp = self._session.get(
            f"{self.model_def.space_id}/gradio_api/queue/data",
            params={"session_hash": h},
            headers={"Accept": "text/event-stream"},
            timeout=self.config.timeout_stream,
            stream=True,
        )
        full = ""
        for d in GradioSSEParser.parse_sse(resp):
            msg = d.get("msg", "")
            if msg in ("process_generating", "process_completed"):
                output = d.get("output", {})
                if not output.get("success", True):
                    raise APIError(f"Gradio error: {output.get('error')}")
                t = GradioSSEParser.extract_text(output)
                if t:
                    # Each event carries the full text so far; keep the latest.
                    full = t
                if msg == "process_completed":
                    break
            elif msg == "close_stream":
                break
        if not full.strip():
            raise APIError("Empty response", "EMPTY")
        return (ResponseCleaner.clean_analysis(full)
                if self.model_def.clean_analysis else full)

    def generate_stream(self, message, history=None, system_prompt=None,
                        temperature=None, max_tokens=None, images=None, **kw):
        """Streaming generation: same protocol as generate(), but yields
        only the newly appended suffix of the (cleaned) text per event."""
        if not self.ready:
            self.initialize()
        sys_p = system_prompt or self.config.default_system_prompt
        temp = (temperature if temperature is not None
                else self.model_def.default_temperature)
        h = self._hash()
        payload = {
            "data": [message, history or [], sys_p, temp],
            "event_data": None,
            "fn_index": self.model_def.fn_index,
            "trigger_id": None,
            "session_hash": h,
        }
        self._session.post(
            f"{self.model_def.space_id}/gradio_api/queue/join?",
            json=payload,
            headers={"Content-Type": "application/json"},
            timeout=30,
        )
        resp = self._session.get(
            f"{self.model_def.space_id}/gradio_api/queue/data",
            params={"session_hash": h},
            headers={"Accept": "text/event-stream"},
            timeout=self.config.timeout_stream,
            stream=True,
        )
        metrics.active_streams += 1
        last = ""  # text already emitted to the caller
        try:
            for d in GradioSSEParser.parse_sse(resp):
                msg = d.get("msg", "")
                if msg in ("process_generating", "process_completed"):
                    output = d.get("output", {})
                    if not output.get("success", True):
                        raise APIError("Gradio error")
                    raw = GradioSSEParser.extract_text(output)
                    if raw:
                        if self.model_def.clean_analysis:
                            # Clean first, then emit only the delta beyond
                            # what was already yielded.
                            cleaned = ResponseCleaner.clean_analysis(raw)
                            if cleaned and len(cleaned) > len(last):
                                yield cleaned[len(last):]
                                last = cleaned
                        else:
                            if len(raw) > len(last):
                                yield raw[len(last):]
                                last = raw
                    if msg == "process_completed":
                        return
                elif msg == "close_stream":
                    return
        finally:
            # Always release the active-stream count, even on error/break.
            metrics.active_streams = max(0, metrics.active_streams - 1)
class GradioClientProvider(ModelProvider):
    """Generic provider for all gradio_client based models.

    Dispatches on ``model_def.model_id`` because every Space exposes a
    different ``predict`` signature (keyword names, ``api_name``, extra
    parameters). The kwarg names in each branch are remote-API contracts
    and must not be renamed.
    """
    def __init__(self, model_def, config, instance_id=0):
        super().__init__(model_def, config, instance_id)
        self._client = None       # lazily created GradioClient
        self._chat_counter = 0    # turn counter required by the chatgpt Space
    def initialize(self) -> bool:
        """Connect to the Space; returns True when the client is ready.

        Raises:
            APIError: when the ``gradio_client`` package is not installed.
        """
        if not HAS_GRADIO_CLIENT:
            raise APIError("gradio_client not installed", "MISSING_DEP")
        with self._lock:
            if self.ready:
                return True
            try:
                log.info(
                    f"[Instance {self.instance_id}] Connecting to "
                    f"{self.model_def.space_id}..."
                )
                self._client = GradioClient(self.model_def.space_id)
                self.ready = True
                return True
            except Exception as e:
                log.error(
                    f"[Instance {self.instance_id}] Init failed for "
                    f"{self.model_def.model_id}: {e}"
                )
                return False
    def generate(self, message, history=None, system_prompt=None,
                 temperature=None, max_tokens=None, images=None, **kw):
        """Blocking generation through the model-specific predict call.

        Branches ending in an explicit ``return`` do their own result
        extraction; the remaining branches fall through to the generic
        str/dict/sequence normalization at the bottom.

        Raises:
            APIError: for an unknown model id or any provider failure
                (wrapped as ``PROVIDER_ERROR``).
        """
        if not self.ready:
            self.initialize()
        if not self._client:
            raise APIError(f"{self.model_def.model_id} not initialized")
        mid = self.model_def.model_id
        images = images or []
        try:
            if mid == "command-a-vision":
                max_new = (max_tokens
                           or self.model_def.extra_params.get("max_new_tokens", 700))
                # Build multimodal message; only the first image is sent.
                msg_payload: Any
                if images:
                    img_path = save_image_temp(images[0])
                    if img_path:
                        msg_payload = {"text": message, "files": [handle_file(img_path)]}
                    else:
                        # Image could not be materialized — degrade to text-only.
                        msg_payload = {"text": message, "files": []}
                else:
                    msg_payload = {"text": message, "files": []}
                result = self._client.predict(
                    message=msg_payload,
                    max_new_tokens=max_new,
                    api_name=self.model_def.api_name,
                )
            elif mid == "command-a-translate":
                max_new = (max_tokens
                           or self.model_def.extra_params.get("max_new_tokens", 700))
                result = self._client.predict(
                    message=message,
                    max_new_tokens=max_new,
                    api_name=self.model_def.api_name,
                )
            elif mid == "command-a-reasoning":
                thinking_budget = kw.get(
                    "thinking_budget",
                    self.model_def.extra_params.get("thinking_budget", 500),
                )
                result = self._client.predict(
                    message=message,
                    thinking_budget=thinking_budget,
                    api_name=self.model_def.api_name,
                )
                return self._extract_reasoning(result)
            elif mid == "minimax-vl-01":
                temp = (temperature if temperature is not None
                        else self.model_def.default_temperature)
                max_tok = (max_tokens
                           or self.model_def.extra_params.get("max_tokens", 12800))
                top_p = kw.get("top_p",
                               self.model_def.extra_params.get("top_p", 0.9))
                # Vision support (first image only)
                if images:
                    img_path = save_image_temp(images[0])
                    files = [handle_file(img_path)] if img_path else []
                else:
                    files = []
                result = self._client.predict(
                    message={"text": message, "files": files},
                    max_tokens=max_tok, temperature=temp, top_p=top_p,
                    api_name=self.model_def.api_name,
                )
            elif mid == "glm-4.5":
                sys_p = system_prompt or self.config.default_system_prompt
                temp = (temperature if temperature is not None
                        else self.model_def.default_temperature)
                thinking = kw.get("thinking_enabled",
                                  self.model_def.thinking_default)
                include = kw.get("include_thinking",
                                 self.config.include_thinking)
                result = self._client.predict(
                    msg=message, sys_prompt=sys_p,
                    thinking_enabled=thinking, temperature=temp,
                    api_name=self.model_def.api_name,
                )
                return self._extract_glm(result, include)
            elif mid == "chatgpt":
                temp = (temperature if temperature is not None
                        else self.model_def.default_temperature)
                top_p = kw.get("top_p",
                               self.model_def.extra_params.get("top_p", 1.0))
                # The Space expects history as [[user, assistant], ...] pairs.
                chat_hist = []
                if history:
                    for pair in history:
                        if isinstance(pair, (list, tuple)) and len(pair) == 2:
                            chat_hist.append([str(pair[0]), str(pair[1])])
                result = self._client.predict(
                    inputs=message, top_p=top_p, temperature=temp,
                    chat_counter=self._chat_counter, chatbot=chat_hist,
                    api_name=self.model_def.api_name,
                )
                # NOTE(review): counter increment is not lock-protected;
                # assumed single-threaded per instance — confirm.
                self._chat_counter += 1
                return ResponseCleaner.extract_chatgpt_text(result)
            elif mid == "qwen3-vl":
                # Vision support (first image only)
                if images:
                    img_path = save_image_temp(images[0])
                    files = [handle_file(img_path)] if img_path else []
                    result = self._client.predict(
                        input_value={"files": files, "text": message},
                        api_name="/add_message",
                    )
                else:
                    result = self._client.predict(
                        input_value={"files": None, "text": message},
                        api_name="/add_message",
                    )
                return ResponseCleaner.extract_qwen_text(result)
            elif mid == "qwen2.5-coder":
                # Optionally push a system-prompt override to the Space
                # first; failure here is non-fatal (best effort).
                sys_override = self.model_def.extra_params.get(
                    "system_prompt_override", ""
                )
                if sys_override:
                    try:
                        self._client.predict(
                            input=sys_override,
                            api_name="/lambda_1",
                        )
                    except Exception as e:
                        log.warning(f"[qwen2.5-coder] Failed to set system prompt: {e}")
                result = self._client.predict(
                    query=message,
                    api_name="/generation_code",
                )
                return ResponseCleaner.extract_qwen_coder_text(result)
            else:
                raise APIError(f"Unknown model handler: {mid}")
            # Generic normalization for branches that did not return above.
            if isinstance(result, str):
                return result.strip()
            if isinstance(result, dict):
                return json.dumps(result, ensure_ascii=False)
            if isinstance(result, (list, tuple)):
                return str(result[0]).strip() if result else ""
            return str(result)
        except APIError:
            raise
        except Exception as e:
            raise APIError(f"{mid} error: {e}", "PROVIDER_ERROR")
    def _extract_reasoning(self, result: Any) -> str:
        """Normalize the reasoning Space's result into a plain string.

        Dicts are probed for common answer keys first; a thinking/response
        pair is merged into a ``<thinking>``-tagged string; anything else
        falls back to JSON or ``str``.
        """
        if result is None:
            return ""
        if isinstance(result, str):
            return result.strip()
        if isinstance(result, dict):
            for key in ("response", "output", "answer", "text", "content", "result"):
                if key in result:
                    val = result[key]
                    if isinstance(val, str):
                        return val.strip()
                    return str(val)
            thinking = result.get("thinking", "")
            response = result.get("response", result.get("output", ""))
            if thinking and response:
                return f"<thinking>\n{thinking}\n</thinking>\n{response}"
            if response:
                return str(response).strip()
            return json.dumps(result, ensure_ascii=False, indent=2)
        if isinstance(result, (list, tuple)):
            if len(result) == 1:
                return str(result[0]).strip()
            texts = []
            for item in result:
                if isinstance(item, str) and item.strip():
                    texts.append(item.strip())
            if texts:
                return "\n".join(texts)
            return json.dumps(result, ensure_ascii=False)
        if isinstance(result, (int, float, bool)):
            return str(result)
        return str(result)
    def _extract_glm(self, result, include_thinking: bool = True) -> str:
        """Pull the latest assistant message out of GLM's chatbot tuple.

        The Space returns a tuple whose first element is a chatbot message
        list; we prefer the last entry with role == "assistant", then the
        last entry of any role, then a string fallback.
        """
        if isinstance(result, tuple) and len(result) >= 1:
            chatbot = result[0]
            if isinstance(chatbot, list) and chatbot:
                for msg in reversed(chatbot):
                    if isinstance(msg, dict) and msg.get("role") == "assistant":
                        content = msg.get("content", "")
                        raw = content if isinstance(content, str) else str(content)
                        return ResponseCleaner.clean_glm(raw, include_thinking)
                last = chatbot[-1]
                if isinstance(last, dict):
                    raw = last.get("content", "")
                    raw = raw if isinstance(raw, str) else str(raw)
                    return ResponseCleaner.clean_glm(raw, include_thinking)
            return ResponseCleaner.clean_glm(str(chatbot), include_thinking)
        if isinstance(result, str):
            return ResponseCleaner.clean_glm(result, include_thinking)
        return ResponseCleaner.clean_glm(str(result), include_thinking)
def create_provider(model_id: str, config: Config,
                    instance_id: int = 0) -> ModelProvider:
    """Instantiate the provider implementation registered for *model_id*.

    Raises ModelNotFoundError for ids absent from MODEL_REGISTRY.
    """
    try:
        definition = MODEL_REGISTRY[model_id]
    except KeyError:
        raise ModelNotFoundError(model_id) from None
    provider_cls = (GptOssProvider if model_id == "gpt-oss-120b"
                    else GradioClientProvider)
    return provider_cls(definition, config, instance_id)
| # βββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ | |
| # LOAD BALANCER | |
| # βββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ | |
class LoadBalancedProviderPool:
    """Pool of provider instances for one model, with health-weighted
    selection and automatic failover.

    BUG FIX vs. original: streaming failures were double-counted — once
    inside ``_call_provider_stream`` and again in ``execute_stream``'s
    except blocks — skewing health scores against instances that had a
    single bad stream. Accounting now happens exactly once, inside
    ``_call_provider_stream`` (which also covers lazy-init failures).
    """
    def __init__(self, model_id: str, config: Config):
        self.model_id = model_id
        self.config = config
        self.mdef = MODEL_REGISTRY[model_id]
        # With load balancing disabled the pool degenerates to one instance.
        pool_size = self.mdef.lb_pool_size if self.mdef.lb_enabled else 1
        self._instances: List[ModelProvider] = []
        self._rr_index = 0  # round-robin cursor (defensive fallback only)
        self._lock = threading.Lock()
        for i in range(pool_size):
            self._instances.append(create_provider(model_id, config, instance_id=i))
        log.info(
            f"[LB] Created pool for '{model_id}' with {len(self._instances)} "
            f"instance(s), lb_enabled={self.mdef.lb_enabled}"
        )
    def pool_size(self) -> int:
        """Number of provider instances in this pool."""
        return len(self._instances)
    def initialize_all(self) -> int:
        """Try to initialize every instance; return how many succeeded."""
        ok = 0
        for inst in self._instances:
            try:
                if inst.initialize():
                    ok += 1
            except Exception as e:
                log.warning(
                    f"[LB] Failed to init {self.model_id} "
                    f"instance {inst.instance_id}: {e}"
                )
        return ok
    def initialize_one(self) -> bool:
        """Initialize instances in order until one succeeds."""
        for inst in self._instances:
            try:
                if inst.initialize():
                    return True
            except Exception:
                continue
        return False
    def _select_instance(self) -> ModelProvider:
        """Pick an instance by health-score-weighted random choice."""
        if len(self._instances) == 1:
            return self._instances[0]
        with self._lock:
            scored = []
            for inst in self._instances:
                score = inst.health_score
                # Floor at 0.05 so unhealthy instances still get a trickle
                # of traffic and a chance to recover.
                scored.append((inst, max(score, 0.05)))
            total_weight = sum(s for _, s in scored)
            if total_weight <= 0:
                # Defensive: unreachable given the 0.05 floor, kept as a
                # round-robin safety net.
                inst = self._instances[self._rr_index % len(self._instances)]
                self._rr_index += 1
                return inst
            r = random.uniform(0, total_weight)
            cumulative = 0.0
            for inst, weight in scored:
                cumulative += weight
                if r <= cumulative:
                    return inst
            return scored[-1][0]
    def _get_ordered_instances(self) -> List[ModelProvider]:
        """Instances sorted healthiest-first (failover order)."""
        return sorted(self._instances, key=lambda p: p.health_score, reverse=True)
    def execute(self, fn_name: str, **kwargs) -> Any:
        """Run provider method *fn_name* on the selected instance, failing
        over through the remaining instances in health order on error.

        Raises:
            APIError("ALL_INSTANCES_FAILED"): when every instance fails.
        """
        primary = self._select_instance()
        metrics.record_lb_dispatch()
        if not primary.ready:
            try:
                primary.initialize()
            except Exception:
                pass
        start = time.monotonic()
        try:
            result = self._call_provider(primary, fn_name, **kwargs)
            latency = (time.monotonic() - start) * 1000
            primary.record_success(latency)
            return result
        except Exception as primary_err:
            primary.record_failure()
            log.warning(
                f"[LB] Primary instance {primary.instance_id} for "
                f"'{self.model_id}' failed: {primary_err}"
            )
        for inst in self._get_ordered_instances():
            if inst is primary:
                continue
            if not inst.ready:
                try:
                    inst.initialize()
                except Exception:
                    continue
            metrics.record_lb_dispatch(failover=True)
            start = time.monotonic()
            try:
                result = self._call_provider(inst, fn_name, **kwargs)
                latency = (time.monotonic() - start) * 1000
                inst.record_success(latency)
                log.info(
                    f"[LB] Failover to instance {inst.instance_id} "
                    f"for '{self.model_id}' succeeded"
                )
                return result
            except Exception as e:
                inst.record_failure()
                log.warning(
                    f"[LB] Failover instance {inst.instance_id} "
                    f"for '{self.model_id}' failed: {e}"
                )
        raise APIError(
            f"All {len(self._instances)} instances for '{self.model_id}' failed",
            "ALL_INSTANCES_FAILED",
        )
    def execute_stream(self, **kwargs) -> Generator[str, None, None]:
        """Stream chunks from the selected instance, failing over on error.

        Note: if a stream fails after yielding some chunks, the failover
        instance restarts from the beginning — already-yielded chunks are
        not rolled back (pre-existing behavior).
        """
        primary = self._select_instance()
        metrics.record_lb_dispatch()
        if not primary.ready:
            try:
                primary.initialize()
            except Exception:
                pass
        try:
            yield from self._call_provider_stream(primary, **kwargs)
            return
        except Exception as primary_err:
            # Failure already recorded inside _call_provider_stream.
            log.warning(
                f"[LB] Stream primary instance {primary.instance_id} "
                f"for '{self.model_id}' failed: {primary_err}"
            )
        for inst in self._get_ordered_instances():
            if inst is primary:
                continue
            if not inst.ready:
                try:
                    inst.initialize()
                except Exception:
                    continue
            metrics.record_lb_dispatch(failover=True)
            try:
                yield from self._call_provider_stream(inst, **kwargs)
                return
            except Exception as e:
                # Failure already recorded inside _call_provider_stream.
                log.warning(
                    f"[LB] Stream failover instance {inst.instance_id} "
                    f"for '{self.model_id}' failed: {e}"
                )
        raise APIError(
            f"All streaming instances for '{self.model_id}' failed",
            "ALL_INSTANCES_FAILED",
        )
    def _call_provider(self, provider: ModelProvider, fn_name: str,
                       **kwargs) -> Any:
        """Invoke *fn_name* on *provider*, initializing it on demand.

        Success/failure accounting for the blocking path is done by the
        caller (``execute``).
        """
        if not provider.ready:
            provider.initialize()
        fn = getattr(provider, fn_name)
        return fn(**kwargs)
    def _call_provider_stream(self, provider: ModelProvider,
                              **kwargs) -> Generator[str, None, None]:
        """Stream from *provider*, recording success/failure exactly once.

        The try now also wraps lazy initialization so an init error counts
        as a failure for this instance too.
        """
        try:
            if not provider.ready:
                provider.initialize()
            start = time.monotonic()
            yield from provider.generate_stream(**kwargs)
        except Exception:
            provider.record_failure()
            raise
        else:
            latency = (time.monotonic() - start) * 1000
            provider.record_success(latency)
    def get_pool_info(self) -> Dict:
        """Diagnostic snapshot of the pool and each instance."""
        return {
            "model_id": self.model_id,
            "lb_enabled": self.mdef.lb_enabled,
            "pool_size": len(self._instances),
            "is_beta": self.mdef.is_beta,
            "instances": [inst.get_instance_info() for inst in self._instances],
        }
| # βββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ | |
| # MULTI-MODEL CLIENT | |
| # βββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ | |
class MultiModelClient:
    """Per-session facade tying together model pools, conversations,
    rate limiting and circuit breaking.

    BUG FIX vs. original: ``current_model`` was defined twice without
    decorators (the getter was shadowed by the setter-shaped second def),
    and ``active_conversation`` was a plain method although other code
    accesses it as an attribute (e.g. ``client.active_conversation
    .conversation_id`` in the /chat handler, and ``conv.model_id = ...``
    in ``send_message``). The evidently stripped ``@property`` /
    ``@current_model.setter`` decorators are restored here; behavior is
    otherwise unchanged.
    """
    def __init__(self, config: Config):
        self.config = config
        self._lb_pools: Dict[str, LoadBalancedProviderPool] = {}
        self._lock = threading.Lock()
        self._conversations: Dict[str, Conversation] = {}
        self._active_conv_id: Optional[str] = None
        self._current_model = config.default_model
        self.rate_limiter = RateLimiter(config.rate_limit_rps, config.rate_limit_burst)
        self.circuit_breaker = CircuitBreaker()
    @property
    def current_model(self):
        """Currently selected model id."""
        return self._current_model
    @current_model.setter
    def current_model(self, m):
        if m not in MODEL_REGISTRY:
            raise ModelNotFoundError(m)
        self._current_model = m
    def _get_lb_pool(self, model_id: str) -> LoadBalancedProviderPool:
        """Return (lazily creating) the load-balanced pool for *model_id*."""
        if model_id not in self._lb_pools:
            with self._lock:
                # Double-checked locking: re-test under the lock.
                if model_id not in self._lb_pools:
                    self._lb_pools[model_id] = LoadBalancedProviderPool(
                        model_id, self.config
                    )
        return self._lb_pools[model_id]
    def _ensure_ready(self, model_id: str) -> LoadBalancedProviderPool:
        """Return the pool for *model_id* with at least one ready instance.

        Raises:
            APIError("INIT_FAILED"): when no instance can be initialized.
        """
        lb_pool = self._get_lb_pool(model_id)
        has_ready = any(inst.ready for inst in lb_pool._instances)
        if not has_ready:
            if not lb_pool.initialize_one():
                raise APIError(f"Cannot init any instance for {model_id}",
                               "INIT_FAILED")
        return lb_pool
    @property
    def active_conversation(self) -> Conversation:
        """The active conversation, creating a fresh one if none exists."""
        if self._active_conv_id not in self._conversations:
            conv = Conversation(
                system_prompt=self.config.default_system_prompt,
                model_id=self._current_model,
            )
            self._conversations[conv.conversation_id] = conv
            self._active_conv_id = conv.conversation_id
        return self._conversations[self._active_conv_id]
    def new_conversation(self, system_prompt=None,
                         model_id=None) -> Conversation:
        """Start and activate a new conversation."""
        conv = Conversation(
            system_prompt=system_prompt or self.config.default_system_prompt,
            model_id=model_id or self._current_model,
        )
        self._conversations[conv.conversation_id] = conv
        self._active_conv_id = conv.conversation_id
        return conv
    def init_model(self, model_id: str) -> bool:
        """Best-effort init of one pool instance; False on any error."""
        try:
            lb_pool = self._get_lb_pool(model_id)
            return lb_pool.initialize_one()
        except Exception:
            return False
    def init_model_all(self, model_id: str) -> int:
        """Best-effort init of all pool instances; returns count readied."""
        try:
            lb_pool = self._get_lb_pool(model_id)
            return lb_pool.initialize_all()
        except Exception:
            return 0
    def send_message(
        self,
        message: Any,  # str OR list (multimodal)
        *,
        stream: bool = False,
        model: Optional[str] = None,
        conversation_id: Optional[str] = None,
        system_prompt: Optional[str] = None,
        temperature: Optional[float] = None,
        max_tokens: Optional[int] = None,
        include_thinking: Optional[bool] = None,
        images: Optional[List[str]] = None,
        **kwargs,
    ) -> Union[str, Generator]:
        """Send one user message through rate limit, circuit breaker and
        the model's load-balanced pool, with retry/backoff.

        Returns the full response string, or a generator of text chunks
        when ``stream=True`` and the model supports streaming.

        Raises:
            ModelNotFoundError: unknown model id.
            APIError: invalid input (400), circuit open (503), rate limit
                (429), or provider failure after all retries.
        """
        model_id = model or self._current_model
        if model_id not in MODEL_REGISTRY:
            raise ModelNotFoundError(model_id)
        mdef = MODEL_REGISTRY[model_id]
        # ── Normalise multimodal content ──────────────────────
        if isinstance(message, list):
            text, extracted_images = extract_text_and_images(message)
            if not images:
                images = extracted_images
            message = text
        if isinstance(message, str):
            message = message.strip()
        else:
            message = str(message).strip()
        if not message and not images:
            raise APIError("Empty message", "INVALID_INPUT", 400)
        if len(message) > self.config.max_message_length:
            raise APIError("Message too long", "INVALID_INPUT", 400)
        if not self.circuit_breaker.can_execute():
            raise APIError("Circuit breaker open", "CIRCUIT_OPEN", 503)
        if not self.rate_limiter.acquire(timeout=10.0):
            raise APIError("Rate limited (10 req/s max)", "RATE_LIMITED", 429)
        conv = (self._conversations.get(conversation_id, self.active_conversation)
                if conversation_id else self.active_conversation)
        conv.model_id = model_id
        if system_prompt:
            conv.system_prompt = system_prompt
        # History is captured BEFORE the new user turn is appended.
        history = conv.build_gradio_history() if mdef.supports_history else None
        conv.add_message("user", message, self.config.max_history_messages)
        eff_temp = (temperature if temperature is not None
                    else mdef.default_temperature)
        eff_sys = conv.system_prompt if mdef.supports_system_prompt else None
        eff_thinking = (include_thinking if include_thinking is not None
                        else self.config.include_thinking)
        extra = dict(kwargs)
        if mdef.supports_thinking:
            extra["include_thinking"] = eff_thinking
        start = time.monotonic()
        for attempt in range(self.config.max_retries + 1):
            try:
                if attempt > 0:
                    # Exponential backoff with jitter between retries.
                    time.sleep(
                        self.config.retry_backoff_base ** attempt
                        + random.uniform(0, self.config.retry_jitter)
                    )
                    metrics.record_retry()
                lb_pool = self._ensure_ready(model_id)
                if stream and mdef.supports_streaming:
                    gen = lb_pool.execute_stream(
                        message=message,
                        history=history,
                        system_prompt=eff_sys,
                        temperature=eff_temp,
                        max_tokens=max_tokens,
                        images=images,
                        **extra,
                    )
                    # Stream errors surface through _wrap_stream, outside
                    # this retry loop.
                    return self._wrap_stream(gen, conv, start, model_id)
                result = lb_pool.execute(
                    "generate",
                    message=message,
                    history=history,
                    system_prompt=eff_sys,
                    temperature=eff_temp,
                    max_tokens=max_tokens,
                    images=images,
                    **extra,
                )
                dur = (time.monotonic() - start) * 1000
                thinking, response = ThinkingParser.split(result)
                conv.add_message("assistant", response,
                                 self.config.max_history_messages,
                                 thinking=thinking)
                metrics.record_request(True, dur, len(result), model_id)
                self.circuit_breaker.record_success()
                return result
            except APIError:
                self.circuit_breaker.record_failure()
                if attempt == self.config.max_retries:
                    dur = (time.monotonic() - start) * 1000
                    metrics.record_request(False, dur, model=model_id)
                    raise
            except Exception as e:
                self.circuit_breaker.record_failure()
                if attempt == self.config.max_retries:
                    dur = (time.monotonic() - start) * 1000
                    metrics.record_request(False, dur, model=model_id)
                    raise APIError(str(e))
    def _wrap_stream(self, gen, conv, start, model_id):
        """Relay stream chunks while accumulating the full reply for the
        conversation history and metrics."""
        full = ""
        try:
            for chunk in gen:
                full += chunk
                yield chunk
            thinking, response = ThinkingParser.split(full)
            conv.add_message("assistant", response,
                             self.config.max_history_messages,
                             thinking=thinking)
            metrics.record_request(
                True, (time.monotonic() - start) * 1000,
                len(full), model_id,
            )
            self.circuit_breaker.record_success()
        except Exception:
            metrics.record_request(
                False, (time.monotonic() - start) * 1000, model=model_id,
            )
            self.circuit_breaker.record_failure()
            raise
    def get_status(self) -> Dict:
        """Diagnostic snapshot of this client session."""
        lb_info = {}
        for model_id, lb_pool in self._lb_pools.items():
            lb_info[model_id] = lb_pool.get_pool_info()
        return {
            "version": VERSION,
            "current_model": self._current_model,
            "models": list(MODEL_REGISTRY.keys()),
            "load_balancer": lb_info,
            "conversations": len(self._conversations),
            "circuit_breaker": self.circuit_breaker.state,
            "rate_limiter": self.rate_limiter.get_info(),
        }
| # βββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ | |
| # SESSION POOL | |
| # βββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ | |
class SessionPool:
    """Fixed-size pool of ``MultiModelClient`` sessions handed out
    round-robin, so concurrent HTTP requests share the load."""

    def __init__(self, config: Config):
        self.config = config
        self._clients = [MultiModelClient(config)
                         for _ in range(config.pool_size)]
        self._idx = 0
        self._lock = threading.Lock()

    def init_default(self):
        """Warm the default model on every pooled client (best effort)."""
        for client in self._clients:
            client.init_model(self.config.default_model)

    def init_model(self, model_id: str) -> int:
        """Initialize *model_id* on all clients; return total instances readied."""
        return sum(client.init_model_all(model_id) for client in self._clients)

    def acquire(self) -> MultiModelClient:
        """Return the next client in round-robin order (thread-safe)."""
        with self._lock:
            client = self._clients[self._idx % len(self._clients)]
            self._idx += 1
            return client
| # βββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ | |
| # ALIAS RESOLVER | |
| # βββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ | |
# Friendly/short model names accepted by the API, mapped to canonical
# MODEL_REGISTRY ids. Lookup is case-insensitive (see resolve_alias).
ALIASES = {
    "gpt-oss": "gpt-oss-120b", "gptoss": "gpt-oss-120b", "amd": "gpt-oss-120b",
    "command-a": "command-a-vision", "command-vision": "command-a-vision",
    "cohere-vision": "command-a-vision",
    "command-translate": "command-a-translate",
    "cohere-translate": "command-a-translate", "translate": "command-a-translate",
    "command-reasoning": "command-a-reasoning", "reasoning": "command-a-reasoning",
    "cohere-reasoning": "command-a-reasoning", "command-r": "command-a-reasoning",
    "minimax": "minimax-vl-01", "minimax-vl": "minimax-vl-01",
    "glm": "glm-4.5", "glm4": "glm-4.5", "glm-4": "glm-4.5", "zhipu": "glm-4.5",
    "gpt": "chatgpt", "gpt-3.5": "chatgpt", "gpt3": "chatgpt", "openai": "chatgpt",
    "qwen": "qwen3-vl", "qwen3": "qwen3-vl", "qwen-vl": "qwen3-vl",
    "qwen-coder": "qwen2.5-coder", "qwen2.5": "qwen2.5-coder",
    "qwen25-coder": "qwen2.5-coder", "coder": "qwen2.5-coder",
}
def resolve_alias(model_id: str) -> str:
    """Map a user-supplied model name to its canonical registry id.

    Empty/None falls back to the configured default model; unknown names
    are returned unchanged (validated downstream against MODEL_REGISTRY).
    """
    if model_id:
        return ALIASES.get(model_id.lower(), model_id)
    return config.default_model
| # βββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ | |
| # FLASK APP | |
| # βββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ | |
# Module-level singletons: environment-driven configuration, the client
# session pool (warmed for the default model at import time), and the
# Flask application object.
config = Config.from_env()
pool = SessionPool(config)
pool.init_default()  # eagerly connect the default model on every client
app = Flask(APP_NAME)
@app.after_request
def cors(response):
    """Attach permissive CORS headers to every outgoing response.

    NOTE(review): the original text registered this hook nowhere —
    decorator lines appear to have been stripped from the file, so
    ``@app.after_request`` is restored here; confirm against the
    deployed app.
    """
    response.headers["Access-Control-Allow-Origin"] = "*"
    response.headers["Access-Control-Allow-Headers"] = "Content-Type, Authorization"
    response.headers["Access-Control-Allow-Methods"] = "GET, POST, OPTIONS"
    return response
@app.errorhandler(APIError)
def handle_api_error(e: APIError):
    """Serialize an APIError raised by any handler into a JSON error
    response carrying its status code.

    NOTE(review): ``@app.errorhandler(APIError)`` restored — decorator
    lines appear stripped from the original file; confirm.
    """
    return jsonify({"ok": False, **e.to_dict()}), e.status
@app.route("/", methods=["GET"])
def index():
    """Service banner: version, feature list, models and endpoint catalogue.

    NOTE(review): route decorators were absent in the original text
    (evidently stripped); the path is restored from the endpoint
    catalogue this handler itself returns.
    """
    return jsonify({
        "name": APP_NAME,
        "version": VERSION,
        "default_model": config.default_model,
        "features": ["load_balancing", "10_req_per_second_limit", "failover", "vision"],
        "models": list(MODEL_REGISTRY.keys()),
        "beta_models": [mid for mid, mdef in MODEL_REGISTRY.items() if mdef.is_beta],
        "vision_models": [mid for mid, mdef in MODEL_REGISTRY.items() if mdef.supports_vision],
        "endpoints": {
            "POST /chat": "Chat with any model",
            "POST /chat/stream": "Streaming chat",
            "POST /v1/chat/completions": "OpenAI-compatible (supports vision)",
            "GET /v1/models": "List models",
            "POST /models/init": "Init a model",
            "GET /health": "Health check",
            "GET /metrics": "Metrics",
            "GET /lb/status": "Load balancer status",
        },
    })
@app.route("/chat", methods=["POST"])
def chat():
    """Blocking chat endpoint: JSON body in, JSON reply out.

    Accepts ``message`` as a string or an OpenAI-style multimodal list,
    optional ``images``, ``model`` (aliases allowed), ``system_prompt``,
    ``temperature``, ``max_tokens``, ``include_thinking``,
    ``new_conversation`` and ``thinking_budget``.

    NOTE(review): ``@app.route`` restored — decorator lines appear
    stripped from the original file; the path matches the endpoint
    catalogue served by the index handler.
    """
    data = freq.get_json(force=True, silent=True) or {}
    raw_message = data.get("message", "")
    images = data.get("images", [])
    # Support multimodal content directly in message field
    if isinstance(raw_message, list):
        text, extracted = extract_text_and_images(raw_message)
        images = images or extracted
        message = text
    else:
        message = str(raw_message).strip()
    if not message and not images:
        return jsonify({"ok": False, "error": "'message' required"}), 400
    model_id = resolve_alias(data.get("model", config.default_model))
    include_thinking = data.get("include_thinking", config.include_thinking)
    client = pool.acquire()
    if data.get("new_conversation"):
        client.new_conversation(data.get("system_prompt"), model_id)
    extra = {}
    # thinking_budget is only meaningful for the reasoning model.
    if model_id == "command-a-reasoning" and "thinking_budget" in data:
        extra["thinking_budget"] = data["thinking_budget"]
    result = client.send_message(
        message, model=model_id,
        system_prompt=data.get("system_prompt"),
        temperature=data.get("temperature"),
        max_tokens=data.get("max_tokens"),
        include_thinking=include_thinking,
        images=images or None,
        **extra,
    )
    thinking, clean = ThinkingParser.split(result)
    mdef = MODEL_REGISTRY.get(model_id)
    resp = {
        "ok": True,
        "response": clean,
        "model": model_id,
        "conversation_id": client.active_conversation.conversation_id,
        "history_size": len(client.active_conversation.messages),
    }
    if thinking:
        resp["thinking"] = thinking
    if mdef and mdef.is_beta:
        resp["beta"] = True
    return jsonify(resp)
@app.route("/chat/stream", methods=["POST"])
def chat_stream():
    """SSE chat endpoint: emits ``data: {"chunk": ...}`` events followed by
    ``data: [DONE]``; errors are emitted as a final APIError payload.

    Models without streaming support are emulated with a single chunk.

    NOTE(review): ``@app.route`` restored — decorator lines appear
    stripped from the original file; the path matches the endpoint
    catalogue served by the index handler.
    """
    data = freq.get_json(force=True, silent=True) or {}
    raw_message = data.get("message", "")
    images = data.get("images", [])
    if isinstance(raw_message, list):
        text, extracted = extract_text_and_images(raw_message)
        images = images or extracted
        message = text
    else:
        message = str(raw_message).strip()
    if not message and not images:
        return jsonify({"ok": False, "error": "'message' required"}), 400
    model_id = resolve_alias(data.get("model", config.default_model))
    include_thinking = data.get("include_thinking", config.include_thinking)
    client = pool.acquire()
    if data.get("new_conversation"):
        client.new_conversation(data.get("system_prompt"), model_id)
    mdef = MODEL_REGISTRY.get(model_id)
    use_stream = mdef.supports_streaming if mdef else False
    extra = {}
    if model_id == "command-a-reasoning" and "thinking_budget" in data:
        extra["thinking_budget"] = data["thinking_budget"]
    def generate():
        try:
            if use_stream:
                for chunk in client.send_message(
                    message, stream=True, model=model_id,
                    system_prompt=data.get("system_prompt"),
                    temperature=data.get("temperature"),
                    max_tokens=data.get("max_tokens"),
                    include_thinking=include_thinking,
                    images=images or None,
                    **extra,
                ):
                    yield f"data: {json.dumps({'chunk': chunk})}\n\n"
            else:
                # Non-streaming model: send the whole reply as one chunk.
                result = client.send_message(
                    message, model=model_id,
                    system_prompt=data.get("system_prompt"),
                    temperature=data.get("temperature"),
                    max_tokens=data.get("max_tokens"),
                    include_thinking=include_thinking,
                    images=images or None,
                    **extra,
                )
                yield f"data: {json.dumps({'chunk': result})}\n\n"
            yield "data: [DONE]\n\n"
        except APIError as e:
            yield f"data: {json.dumps(e.to_dict())}\n\n"
    return Response(stream_with_context(generate()),
                    content_type="text/event-stream")
@app.route("/v1/models", methods=["GET"])
def list_models():
    """OpenAI-style model listing with per-model capability and
    load-balancing flags.

    NOTE(review): ``@app.route`` restored — decorator lines appear
    stripped from the original file; the path matches the endpoint
    catalogue served by the index handler.
    """
    models = []
    for mid, mdef in MODEL_REGISTRY.items():
        model_info = {
            "id": mid,
            "object": "model",
            "owned_by": mdef.owned_by,
            "created": 0,
            "description": mdef.description,
            "capabilities": {
                "vision": mdef.supports_vision,
                "streaming": mdef.supports_streaming,
                "system_prompt": mdef.supports_system_prompt,
                "temperature": mdef.supports_temperature,
                "history": mdef.supports_history,
                "thinking": mdef.supports_thinking,
            },
            "load_balancing": {
                "enabled": mdef.lb_enabled,
                "pool_size": mdef.lb_pool_size,
            },
        }
        if mdef.is_beta:
            model_info["beta"] = True
        models.append(model_info)
    return jsonify({"object": "list", "data": models})
def openai_compat():
    """OpenAI-compatible chat-completions handler.

    Accepts an OpenAI-style request body (``model``, ``messages``,
    ``stream``, ``temperature``, ``max_tokens``, plus the non-standard
    ``include_thinking`` and ``thinking_budget``) and answers either a
    single ``chat.completion`` JSON object or an SSE stream of
    ``chat.completion.chunk`` events terminated by ``data: [DONE]``.
    """
    if freq.method == "OPTIONS":
        # CORS preflight — reply immediately with an empty 200.
        return "", 200
    data = freq.get_json(force=True, silent=True) or {}
    messages = data.get("messages", [])
    do_stream = data.get("stream", False)
    temperature = data.get("temperature")
    max_tokens = data.get("max_tokens")
    model_id = resolve_alias(data.get("model", config.default_model))
    include_thinking = data.get("include_thinking", config.include_thinking)
    if model_id not in MODEL_REGISTRY:
        # Unknown model → OpenAI-style invalid_request_error with the catalogue.
        return jsonify({
            "error": {
                "message": f"Model '{model_id}' not found. Available: {list(MODEL_REGISTRY.keys())}",
                "type": "invalid_request_error",
                "available_models": list(MODEL_REGISTRY.keys()),
            }
        }), 404
    if not messages:
        return jsonify({"error": {"message": "messages required"}}), 400
    # ── Extract user message, system prompt, and images ───────
    user_msg: str = ""
    system_prompt: Optional[str] = None
    images: List[str] = []
    for msg in messages:
        role = msg.get("role", "")
        content = msg.get("content", "")
        if role == "system":
            # Last system message wins if several are supplied.
            system_prompt = content if isinstance(content, str) else str(content)
        if role == "user":
            if isinstance(content, list):
                # Multimodal content part list → split into text + images.
                text, imgs = extract_text_and_images(content)
                user_msg = text
                # NOTE(review): user_msg keeps only the LAST user turn, but
                # images accumulate across ALL user turns — confirm intended.
                images.extend(imgs)
            elif isinstance(content, str):
                user_msg = content
            else:
                user_msg = str(content)
    if not user_msg and not images:
        return jsonify({"error": {"message": "No user message"}}), 400
    # OpenAI-style completion id and creation timestamp.
    rid = f"chatcmpl-{uuid.uuid4().hex[:29]}"
    created = int(time.time())
    client = pool.acquire()  # presumably a pooled upstream client — see pool impl
    client.new_conversation(system_prompt, model_id)
    # Replay history (all but the last user message)
    for msg in messages[:-1]:
        role = msg.get("role")
        content = msg.get("content", "")
        if role in ("user", "assistant") and content:
            # Historical multimodal parts are reduced to their text only.
            text = (
                extract_text_and_images(content)[0]
                if isinstance(content, list)
                else str(content)
            )
            if text:
                client.active_conversation.add_message(role, text)
    mdef = MODEL_REGISTRY[model_id]
    extra = {}
    if model_id == "command-a-reasoning" and "thinking_budget" in data:
        # Model-specific knob passed straight through to the backend.
        extra["thinking_budget"] = data["thinking_budget"]
    if do_stream:
        def generate():
            try:
                # First chunk announces the assistant role, per the OpenAI SSE contract.
                yield f"data: {json.dumps({'id': rid, 'object': 'chat.completion.chunk', 'created': created, 'model': model_id, 'choices': [{'index': 0, 'delta': {'role': 'assistant'}, 'finish_reason': None}]})}\n\n"
                if mdef.supports_streaming:
                    for chunk in client.send_message(
                        user_msg, stream=True, model=model_id,
                        temperature=temperature, max_tokens=max_tokens,
                        include_thinking=include_thinking,
                        images=images or None, **extra,
                    ):
                        yield f"data: {json.dumps({'id': rid, 'object': 'chat.completion.chunk', 'created': created, 'model': model_id, 'choices': [{'index': 0, 'delta': {'content': chunk}, 'finish_reason': None}]})}\n\n"
                else:
                    # Backend can't stream: fetch the full reply, emit it as one chunk.
                    result = client.send_message(
                        user_msg, model=model_id, temperature=temperature,
                        max_tokens=max_tokens, include_thinking=include_thinking,
                        images=images or None, **extra,
                    )
                    yield f"data: {json.dumps({'id': rid, 'object': 'chat.completion.chunk', 'created': created, 'model': model_id, 'choices': [{'index': 0, 'delta': {'content': result}, 'finish_reason': None}]})}\n\n"
                # Terminal chunk with finish_reason, then the [DONE] sentinel.
                yield f"data: {json.dumps({'id': rid, 'object': 'chat.completion.chunk', 'created': created, 'model': model_id, 'choices': [{'index': 0, 'delta': {}, 'finish_reason': 'stop'}]})}\n\n"
                yield "data: [DONE]\n\n"
            except Exception as e:
                # Errors mid-stream are reported in-band; HTTP status is already 200.
                yield f"data: {json.dumps({'error': {'message': str(e)}})}\n\n"
        return Response(stream_with_context(generate()),
                        content_type="text/event-stream")
    result = client.send_message(
        user_msg, model=model_id, temperature=temperature,
        max_tokens=max_tokens, include_thinking=include_thinking,
        images=images or None, **extra,
    )
    return jsonify({
        "id": rid,
        "object": "chat.completion",
        "created": created,
        "model": model_id,
        "choices": [{
            "index": 0,
            "message": {"role": "assistant", "content": result},
            "finish_reason": "stop",
        }],
        "usage": {
            # Rough heuristic: ~4 characters per token; no real tokenizer here.
            "prompt_tokens": len(user_msg) // 4,
            "completion_tokens": len(result) // 4,
            "total_tokens": (len(user_msg) + len(result)) // 4,
        },
    })
def new_conv():
    """Start a fresh conversation on a pooled client and report its id."""
    payload = freq.get_json(force=True, silent=True) or {}
    model_id = resolve_alias(payload.get("model", config.default_model))
    conv = pool.acquire().new_conversation(payload.get("system_prompt"), model_id)
    body = {
        "ok": True,
        "conversation_id": conv.conversation_id,
        "model": model_id,
    }
    return jsonify(body)
def health():
    """Health probe: report the status of one pooled client as JSON."""
    return jsonify(pool.acquire().get_status())
def metrics_endpoint():
    """Expose the in-process metrics counters as a JSON document."""
    snapshot = metrics.to_dict()
    return jsonify(snapshot)
def lb_status():
    """Summarize load-balancer pools per model across all pooled clients.

    Walks every client in the pool, groups each client's LB pool info
    under its model id, and returns the grouping together with the
    service version and the configured rate limit.
    """
    all_pools: Dict[str, List[Any]] = {}
    for client in pool._clients:
        for model_id, lb_pool in client._lb_pools.items():
            # setdefault replaces the manual "key in dict" check and the
            # redundant `key = model_id` alias of the original.
            all_pools.setdefault(model_id, []).append(lb_pool.get_pool_info())
    return jsonify({
        "ok": True,
        "version": VERSION,
        "rate_limit": f"{config.rate_limit_rps} req/s",
        "models": all_pools,
    })
def conversations():
    """List every conversation held by one pooled client."""
    holder = pool.acquire()
    convs = [conv.to_dict() for conv in holder._conversations.values()]
    return jsonify({"conversations": convs})
def init_model_ep():
    """Eagerly initialize pool instances for one model.

    Responds 400 with the available-model list when the requested model
    is missing or unknown; otherwise reports how many instances were
    initialized plus the model's LB configuration.
    """
    payload = freq.get_json(force=True, silent=True) or {}
    model_id = resolve_alias(payload.get("model", ""))
    # Guard clause: bail out early on an unknown or empty model id.
    if not model_id or model_id not in MODEL_REGISTRY:
        err = {
            "ok": False,
            "error": f"Unknown model. Available: {list(MODEL_REGISTRY.keys())}",
        }
        return jsonify(err), 400
    mdef = MODEL_REGISTRY[model_id]
    body = {
        "ok": True,
        "model": model_id,
        "initialized_instances": pool.init_model(model_id),
        "lb_enabled": mdef.lb_enabled,
        "pool_size_per_client": mdef.lb_pool_size,
    }
    if mdef.is_beta:
        body["beta"] = True
    return jsonify(body)
| # βββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ | |
| # ENTRY POINT | |
| # βββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ | |
if __name__ == "__main__":
    # Script entry point: log the configuration summary, then serve.
    port = int(os.environ.get("PORT", 7860))
    log.info(f"Starting {APP_NAME} v{VERSION} on port {port}")
    log.info(f"Models: {list(MODEL_REGISTRY.keys())}")
    log.info(f"Rate limit: {config.rate_limit_rps} req/s (burst: {config.rate_limit_burst})")
    # One summary line per registered model: LB mode plus capability tags.
    for mid, mdef in MODEL_REGISTRY.items():
        if mdef.lb_enabled:
            lb_str = f"LB ON (pool={mdef.lb_pool_size})"
        else:
            lb_str = "LB OFF (single instance)"
        tags = ""
        if mdef.supports_vision:
            tags += " [VISION]"
        if mdef.is_beta:
            tags += " [BETA]"
        log.info(f"  {mid}: {lb_str}{tags}")
    app.run(host="0.0.0.0", port=port, threaded=True)