Spaces:
Sleeping
Sleeping
v8.1.0: fix tool call cutoff - never emit incomplete tool calls, auto-continue properly
aa3153c verified | """ | |
| Haiku API - OpenAI-compatible proxy for chatgpt.org/claude/chat | |
| Deploy to Hugging Face Spaces (Docker SDK) | |
| Features: | |
| - Tool/function calling support (always detects tool call tags in output) | |
| - Auto-continues when upstream hits the ~1K token output limit | |
| - Rotating proxy with direct-connection fallback | |
| - SSE keep-alive comments during continuation gaps | |
| - Message normalization for Orchids.app compatibility | |
| - Robust error handling with proper JSON error responses | |
| """ | |
| import asyncio | |
| import json | |
| import os | |
| import re | |
| import time | |
| import uuid | |
| import traceback | |
| from typing import Optional | |
| from urllib.parse import unquote | |
| import httpx | |
| from fastapi import FastAPI, HTTPException, Request | |
| from fastapi.middleware.cors import CORSMiddleware | |
| from fastapi.responses import StreamingResponse, JSONResponse | |
app = FastAPI(title="Haiku API", version="8.1.0")

# ── CORS ────────────────────────────────────────────────────────────
# Permit browser clients on any origin to call the proxy with any method/header.
# NOTE(review): a "*" origin list combined with allow_credentials=True is a very
# permissive pairing; confirm credentialed cross-origin access is really intended.
app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],
    allow_methods=["*"],
    allow_headers=["*"],
    allow_credentials=True,
)
# ── Proxy Config ────────────────────────────────────────────────────
# Optional upstream proxy (meant to be a rotating one); empty string = connect directly.
PROXY_URL = os.environ.get("PROXY_URL", "")
# How many proxied attempts to make, and how long to pause between them (seconds).
PROXY_MAX_RETRIES = 4
PROXY_RETRY_DELAY = 1
# Timeouts (seconds) handed to httpx.Timeout when clients are built.
CONNECT_TIMEOUT = 10.0   # establishing the connection should be quick
READ_TIMEOUT = 120.0     # generous, because upstream chat replies can be slow
def _make_client(use_proxy: bool = True) -> httpx.AsyncClient:
    """Return a brand-new ``httpx.AsyncClient`` for upstream calls.

    Traffic goes through ``PROXY_URL`` only when *use_proxy* is true AND a proxy
    URL is configured; otherwise the client connects directly. ``READ_TIMEOUT`` is
    passed as the httpx default timeout, with ``CONNECT_TIMEOUT`` for connecting.
    """
    # NOTE(review): TLS certificate verification is disabled; confirm this is truly
    # required (e.g. by the proxy) since it permits man-in-the-middle interception.
    options = {
        "verify": False,
        "timeout": httpx.Timeout(READ_TIMEOUT, connect=CONNECT_TIMEOUT),
    }
    if PROXY_URL and use_proxy:
        options["proxy"] = PROXY_URL
    return httpx.AsyncClient(**options)
# ── Session State ───────────────────────────────────────────────────
class SessionState:
    """Cookies and anti-forgery tokens scraped from the chatgpt.org chat page.

    ``refresh`` re-scrapes them when they are missing or older than
    ``refresh_interval`` seconds; setting ``last_refresh = 0`` forces the next
    ``refresh`` call to fetch again.
    """

    def __init__(self):
        self.xsrf_token: Optional[str] = None  # URL-decoded value of the XSRF-TOKEN cookie
        self.csrf_token: Optional[str] = None  # content of the page's <meta name="csrf-token">
        self.cookies: Optional[httpx.Cookies] = None  # cookies replayed on API calls
        self.last_refresh: float = 0  # time.time() of the last successful scrape
        self.refresh_interval: float = 600  # seconds a scrape is considered fresh
        self._lock = asyncio.Lock()  # allows only one refresh at a time

    async def refresh(self, client: httpx.AsyncClient):
        """Re-scrape cookies/tokens unless the cached ones are still fresh.

        Attempts go through the proxy first (``PROXY_MAX_RETRIES`` tries, only if
        ``PROXY_URL`` is set), then directly (2 tries). Only the very first attempt
        uses *client* (and only if it is still open). Every retry, and the whole
        direct phase, uses a short-lived client from ``_make_client`` that is closed
        afterwards. *client* itself is never closed here, because it is the shared
        client that other requests may be using concurrently.

        Returns:
            *client* after a successful scrape, so callers that adopt the return
            value keep their client. Returns None if the cache was still fresh or
            every attempt failed; failures are logged, never raised.
        """
        async with self._lock:
            if self.cookies and (time.time() - self.last_refresh) < self.refresh_interval:
                return None

            modes = [True, False] if PROXY_URL else [False]
            for mode_index, use_proxy in enumerate(modes):
                for attempt in range(PROXY_MAX_RETRIES if use_proxy else 2):
                    reuse = (
                        client is not None
                        and not client.is_closed
                        and mode_index == 0
                        and attempt == 0
                    )
                    working_client = client if reuse else _make_client(use_proxy=use_proxy)
                    try:
                        if await self._scrape(working_client, use_proxy, attempt):
                            return client
                    except (httpx.ConnectError, httpx.ProxyError, httpx.TimeoutException) as e:
                        print(f"[Session] Connection error (proxy={use_proxy}, attempt {attempt+1}): {type(e).__name__}")
                    except Exception as e:
                        print(f"[Session] Error (proxy={use_proxy}, attempt {attempt+1}): {type(e).__name__}: {e}")
                    finally:
                        if not reuse:
                            await self._close_quietly(working_client)
                    await asyncio.sleep(PROXY_RETRY_DELAY)

            print("[Session] WARNING: All refresh attempts failed (both proxy and direct)")
            return None

    async def _scrape(self, http: httpx.AsyncClient, use_proxy: bool, attempt: int) -> bool:
        """GET the chat page once; on HTTP 200 store its cookies/tokens and return True."""
        resp = await http.get(
            "https://chatgpt.org/claude/chat",
            follow_redirects=True,
            headers={
                "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/148.0.0.0 Safari/537.36",
                "Accept": "text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8",
            },
            timeout=20.0,
        )
        if resp.status_code != 200:
            print(f"[Session] GET returned {resp.status_code} (proxy={use_proxy}, attempt {attempt+1})")
            return False

        # Collect cookies both from httpx's parsed jar and from raw Set-Cookie headers,
        # pinning them all to chatgpt.org so they are replayed on the API host.
        new_cookies = httpx.Cookies()
        for name, value in resp.cookies.items():
            new_cookies.set(name, value, domain="chatgpt.org")
        for header in resp.headers.get_list("set-cookie"):
            pair = header.split(";")[0]
            if "=" in pair:
                k, v = pair.split("=", 1)
                new_cookies.set(k.strip(), v.strip(), domain="chatgpt.org")

        xsrf = new_cookies.get("XSRF-TOKEN", domain="chatgpt.org")
        if xsrf:
            xsrf = unquote(xsrf)  # the cookie value is URL-encoded
        m = re.search(r'<meta\s+name="csrf-token"\s+content="([^"]+)"', resp.text)
        csrf = m.group(1) if m else None

        self.cookies = new_cookies
        self.xsrf_token = xsrf
        self.csrf_token = csrf
        self.last_refresh = time.time()
        mode = "proxy" if use_proxy else "direct"
        print(f"[Session] OK ({mode}) — CSRF:{bool(csrf)} XSRF:{bool(xsrf)} Cookies:{list(new_cookies.keys())}")
        return True

    @staticmethod
    async def _close_quietly(http: httpx.AsyncClient) -> None:
        """Close a temporary client, logging instead of raising if closing fails."""
        try:
            await http.aclose()
        except Exception as e:
            print(f"[Session] Error closing temporary client: {type(e).__name__}: {e}")
session = SessionState()

# ── HTTP Client ─────────────────────────────────────────────────────
# Shared upstream client: created by startup(), closed and cleared by shutdown().
http_client: Optional[httpx.AsyncClient] = None


async def startup():
    """Create the shared HTTP client and prime the upstream session.

    If ``session.refresh`` hands back a different client, adopt it and close the
    one it replaced, so the old connection pool is not leaked.
    """
    global http_client
    http_client = _make_client(use_proxy=bool(PROXY_URL))
    result = await session.refresh(http_client)
    if result is not None and result is not http_client:
        previous, http_client = http_client, result
        await previous.aclose()


async def shutdown():
    """Close the shared HTTP client, if one exists, and forget it."""
    global http_client
    if http_client:
        await http_client.aclose()
        http_client = None
| # ββ Tool Calling Support βββββββββββββββββββββββββββββββββββββββββ | |
| # We support TWO tool call formats that models may output: | |
| # | |
| # Format 1 β Inline JSON (simple): | |
| # <tool_call name="Write">{"file_path": "hello.js", "content": "hi"}</tool_call_> | |
| # <function_call name="Write">{"file_path": "hello.js"}</function_call> | |
| # | |
| # Format 2 β Anthropic XML (Claude's native format): | |
| # <function_calls> | |
| # <invoke name="Write"> | |
| # <parameter name="file_path">hello.js</parameter> | |
| # <parameter name="content">console.log("hi")</parameter> | |
| # </invoke> | |
| # </function_calls> | |
| # Regex for Format 1: inline JSON tool calls | |
| _TOOL_CALL_INLINE_RE = re.compile( | |
| r'<(?:function_call|tool_call)\s+name="([^"]+)">\s*(.*?)\s*</(?:function_call|tool_call)_?>', | |
| re.DOTALL | |
| ) | |
| # Regex for Format 2: Anthropic XML function_calls blocks | |
| _ANTHROPIC_FC_BLOCK_RE = re.compile( | |
| r'<function_calls>\s*(.*?)\s*</function_calls>', | |
| re.DOTALL | |
| ) | |
| # Within a block, match each <invoke name="...">...</invoke> | |
| _ANTHROPIC_INVOKE_RE = re.compile( | |
| r'<invoke\s+name="([^"]+)">\s*(.*?)\s*</invoke>', | |
| re.DOTALL | |
| ) | |
| # Within an invoke, match each <parameter name="...">value</parameter> | |
| _ANTHROPIC_PARAM_RE = re.compile( | |
| r'<parameter\s+name="([^"]+)">(.*?)</parameter>', | |
| re.DOTALL | |
| ) | |
| def _parse_json_args(args_str: str) -> str: | |
| """Try to parse arguments as JSON, with fallbacks. Returns JSON string.""" | |
| try: | |
| args_json = json.loads(args_str) | |
| return json.dumps(args_json) | |
| except json.JSONDecodeError: | |
| pass | |
| # Try to fix common issues | |
| args_cleaned = args_str.strip('`').strip() | |
| if args_cleaned.startswith('json'): | |
| args_cleaned = args_cleaned[4:].strip() | |
| try: | |
| args_json = json.loads(args_cleaned) | |
| return json.dumps(args_json) | |
| except json.JSONDecodeError: | |
| pass | |
| # Last resort: wrap the raw text as an argument | |
| return json.dumps({"raw_input": args_str}) | |
def _parse_tool_calls(text: str) -> tuple[list[dict], str]:
    """Extract tool calls from model output.

    Two formats are recognised:
      1. Inline JSON:   <tool_call name="X">{...}</tool_call_>
      2. Anthropic XML: <function_calls><invoke name="X"><parameter name="p">v</parameter></invoke></function_calls>

    Overlapping matches are resolved outermost-first. A match that starts inside
    an already-accepted match is treated as data, not as a separate call; an
    example is tool-call markup that appears inside a parameter value such as
    file content. Accepted calls are returned in the order they appear in *text*.

    Returns:
        (tool_calls, remaining_text). tool_calls are OpenAI-format dicts.
        remaining_text is the text outside the accepted matches, each piece
        stripped, joined with newlines. If no tool call is found, returns
        ([], text) with *text* unchanged.
    """
    # Every candidate match from both formats: (start, end, [(name, arguments_json), ...]).
    candidates = []

    for block_match in _ANTHROPIC_FC_BLOCK_RE.finditer(text):
        calls = []
        for invoke_match in _ANTHROPIC_INVOKE_RE.finditer(block_match.group(1)):
            params = {}
            for param_match in _ANTHROPIC_PARAM_RE.finditer(invoke_match.group(2)):
                param_name, param_value = param_match.group(1), param_match.group(2)
                # Values are written raw; decode JSON literals (numbers, booleans, objects) when possible.
                try:
                    params[param_name] = json.loads(param_value)
                except (json.JSONDecodeError, ValueError):
                    params[param_name] = param_value
            calls.append((invoke_match.group(1), json.dumps(params)))
        # A block with no <invoke> is still consumed (removed from the remaining text).
        candidates.append((block_match.start(), block_match.end(), calls))

    for inline_match in _TOOL_CALL_INLINE_RE.finditer(text):
        arguments = _parse_json_args(inline_match.group(2).strip())
        candidates.append((inline_match.start(), inline_match.end(), [(inline_match.group(1), arguments)]))

    # Outermost-first: sort by start (longer span first on ties) and skip any match
    # that begins before the previously accepted one ends.
    candidates.sort(key=lambda cand: (cand[0], -cand[1]))
    tool_calls = []
    consumed_spans = []
    covered_until = 0
    for start, end, calls in candidates:
        if start < covered_until:
            continue
        consumed_spans.append((start, end))
        covered_until = end
        for func_name, arguments in calls:
            tool_calls.append({
                "id": f"call_{uuid.uuid4().hex[:24]}",
                "type": "function",
                "function": {
                    "name": func_name,
                    "arguments": arguments,
                },
            })

    if not tool_calls:
        return [], text

    # consumed_spans is sorted and non-overlapping; keep the text between them.
    remaining_parts = []
    cursor = 0
    for start, end in consumed_spans:
        piece = text[cursor:start].strip()
        if piece:
            remaining_parts.append(piece)
        cursor = end
    tail = text[cursor:].strip()
    if tail:
        remaining_parts.append(tail)
    return tool_calls, "\n".join(remaining_parts)
| def _has_incomplete_tool_call(text: str) -> bool: | |
| """Check if text has an opening tool call tag without a matching close.""" | |
| # Inline format | |
| inline_opens = len(re.findall(r'<(?:function_call|tool_call)\s+name="[^"]+">', text)) | |
| inline_closes = len(re.findall(r'</(?:function_call|tool_call)_?>', text)) | |
| if inline_opens > inline_closes: | |
| return True | |
| # Anthropic XML format | |
| if text.count('<function_calls>') > text.count('</function_calls>'): | |
| return True | |
| invoke_opens = len(re.findall(r'<invoke\s+name="[^"]+">', text)) | |
| if invoke_opens > text.count('</invoke>'): | |
| return True | |
| return False | |
| def _strip_incomplete_tool_tags(text: str) -> str: | |
| """Remove incomplete tool call XML tags from text. | |
| This prevents raw XML tags from leaking into delta.content | |
| when auto-continue fails to complete a tool call.""" | |
| # Remove incomplete Anthropic XML blocks | |
| # e.g. "<function_calls>\n<invoke name="Write">\n<parameter name="content">some unfinished..." | |
| text = re.sub( | |
| r'<function_calls>\s*<invoke[^>]*>.*', | |
| '', text, flags=re.DOTALL | |
| ) | |
| # Remove incomplete inline JSON tool calls | |
| text = re.sub( | |
| r'<(?:function_call|tool_call)\s+name="[^"]+">.*', | |
| '', text, flags=re.DOTALL | |
| ) | |
| # Remove any stray opening/closing tags | |
| text = re.sub(r'</?function_calls>\s*', '', text) | |
| text = re.sub(r'</?invoke[^>]*>\s*', '', text) | |
| text = re.sub(r'</?parameter[^>]*>\s*', '', text) | |
| text = re.sub(r'</?(?:function_call|tool_call)_?>\s*', '', text) | |
| return text.strip() | |
| # ββ Tool System Prompt Builder ββββββββββββββββββββββββββββββββββ | |
| def _build_tool_system_prompt(tools: list[dict], tool_choice=None) -> str: | |
| """Convert OpenAI tools/functions format to a system prompt using | |
| Anthropic XML format β the format Claude natively understands.""" | |
| invoke_blocks = [] | |
| tool_names = [] | |
| for tool in tools: | |
| if "function" in tool: | |
| func = tool["function"] | |
| else: | |
| func = tool | |
| name = func.get("name", "unknown") | |
| desc = func.get("description", "No description") | |
| params = func.get("parameters", {}) | |
| tool_names.append(name) | |
| props = params.get("properties", {}) | |
| required = params.get("required", []) | |
| param_lines = [] | |
| for pname, pdef in props.items(): | |
| ptype = pdef.get("type", "any") | |
| pdesc = pdef.get("description", "") | |
| req = " (required)" if pname in required else "" | |
| param_lines.append(f'<parameter name="{pname}">{ptype}{req} β {pdesc}</parameter>') | |
| params_xml = '\n'.join(param_lines) if param_lines else '' | |
| invoke_blocks.append(f"""<tool_description name="{name}"> | |
| {desc} | |
| Parameters: | |
| {params_xml} | |
| </tool_description>""") | |
| tools_xml = '\n\n'.join(invoke_blocks) | |
| choice_instruction = "" | |
| if tool_choice == "required": | |
| choice_instruction = "\nIMPORTANT: You MUST call at least one tool." | |
| elif tool_choice == "none": | |
| return "" | |
| elif isinstance(tool_choice, dict) and tool_choice.get("type") == "function": | |
| fname = tool_choice.get("function", {}).get("name", "") | |
| choice_instruction = f"\nIMPORTANT: You MUST call the {fname} function." | |
| return f"""In this environment you have access to a set of tools you can use to answer the user's question. | |
| {tools_xml} | |
| ## Tool Call Format | |
| When you need to call a tool, use this EXACT XML format: | |
| <function_calls> | |
| <invoke name="FUNCTION_NAME"> | |
| <parameter name="param_name">value</parameter> | |
| </invoke> | |
| </function_calls> | |
| You may call multiple tools by using multiple <invoke> blocks inside a single <function_calls> block, or by using multiple <function_calls> blocks. | |
| - The parameter values should be the actual values, NOT JSON-encoded strings | |
| - Do NOT wrap tool calls in markdown code blocks | |
| - If you don't need to call any tools, just respond normally with text{choice_instruction}""" | |
| # ββ Message normalization ββββββββββββββββββββββββββββββββββββββββ | |
| def _flatten_content_array(content: list) -> str: | |
| """Convert a content array to plain text.""" | |
| text_parts = [] | |
| for part in content: | |
| if isinstance(part, str): | |
| text_parts.append(part) | |
| elif isinstance(part, dict): | |
| if part.get("type") == "text": | |
| text_parts.append(part.get("text", "")) | |
| return "\n".join(text_parts) | |
def normalize_messages(messages: list[dict], tools: list[dict] = None, tool_choice=None) -> list[dict]:
    """Convert OpenAI-style chat messages into the plain role/content list sent upstream.

    * Content arrays are flattened to text, and ``None`` content becomes "".
    * ``tool`` results (and legacy ``function`` results) become ``user`` messages of
      the form "[Tool Result for NAME (id: ID)]:\\n...". NAME is taken from the
      message's ``name``; failing that, from the earlier assistant tool call with
      the same id; failing that, it is "unknown_tool".
    * Assistant turns carrying ``tool_calls`` (or a legacy ``function_call``) are
      re-rendered as ``<function_calls>`` XML placed after any text. Arguments may
      be a JSON string or a dict. Arguments that do not decode to an object render
      as an empty ``<invoke>``. Non-string values are written as JSON (``true``,
      ``{"a": 1}``) so the ``json.loads`` applied to parameter values in
      ``_parse_tool_calls`` reads them back unchanged.
    * If *tools* are given and tool_choice is not "none", the tool prompt is
      appended to the first system message. If there is no system message, the
      prompt is inserted as a new leading one. Empty system messages are dropped.
    """
    tool_system = None
    if tools and tool_choice != "none":
        tool_system = _build_tool_system_prompt(tools, tool_choice)

    def decode_arguments(raw) -> dict:
        # Arguments normally arrive as a JSON string, but some clients send a dict.
        if isinstance(raw, dict):
            return raw
        try:
            parsed = json.loads(raw)
        except (json.JSONDecodeError, TypeError):
            return {}
        return parsed if isinstance(parsed, dict) else {}

    def render_value(value) -> str:
        return value if isinstance(value, str) else json.dumps(value, ensure_ascii=False)

    result = []
    system_injected = False
    tool_names_by_id = {}  # tool_call id -> function name, from earlier assistant turns

    for msg in messages:
        role = msg.get("role", "user")
        content = msg.get("content", "")
        if isinstance(content, list):
            content = _flatten_content_array(content)
        if content is None:
            content = ""
        content = str(content)

        # Tool results -> user messages (upstream has no tool role).
        if role in ("tool", "function"):
            tool_call_id = msg.get("tool_call_id", "")
            tool_name = msg.get("name") or tool_names_by_id.get(tool_call_id) or "unknown_tool"
            result.append({
                "role": "user",
                "content": f"[Tool Result for {tool_name} (id: {tool_call_id})]:\n{content}",
            })
            continue

        # Assistant tool calls -> Anthropic XML inside the assistant content.
        calls = msg.get("tool_calls")
        if not calls and isinstance(msg.get("function_call"), dict):
            calls = [{"function": msg["function_call"]}]
        if role == "assistant" and calls:
            invoke_parts = []
            for tc in calls:
                func = tc.get("function") or {}
                name = func.get("name", "unknown")
                if tc.get("id"):
                    tool_names_by_id[tc["id"]] = name
                invoke_lines = [f'<invoke name="{name}">']
                for k, v in decode_arguments(func.get("arguments", "{}")).items():
                    invoke_lines.append(f'<parameter name="{k}">{render_value(v)}</parameter>')
                invoke_lines.append("</invoke>")
                invoke_parts.append("\n".join(invoke_lines))
            fc_content = "<function_calls>\n" + "\n".join(invoke_parts) + "\n</function_calls>"
            regular_content = content if content.strip() else ""
            combined = regular_content + "\n\n" + fc_content if regular_content else fc_content
            result.append({"role": "assistant", "content": combined})
            continue

        # Inject the tool system prompt into the first system message.
        if role == "system" and not system_injected and tool_system:
            combined = content + "\n\n" + tool_system if content.strip() else tool_system
            result.append({"role": "system", "content": combined})
            system_injected = True
            continue

        if role == "system" and not content.strip():
            continue

        result.append({"role": role, "content": content})

    if tool_system and not system_injected:
        result.insert(0, {"role": "system", "content": tool_system})
    return result
# ── Headers ─────────────────────────────────────────────────────────
def _headers() -> dict:
    """Build the headers for POSTs to chatgpt.org's chat API.

    The fixed part mimics the site's own browser XHR. ``X-CSRF-TOKEN`` is taken
    from the page's CSRF meta token, falling back to the XSRF cookie value, and is
    omitted entirely when neither is known.
    """
    token = session.csrf_token or session.xsrf_token
    browser_headers = {
        "Accept": "*/*",
        "Content-Type": "application/json",
        "Origin": "https://chatgpt.org",
        "Referer": "https://chatgpt.org/claude/chat",
        "X-Requested-With": "XMLHttpRequest",
        "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/148.0.0.0 Safari/537.36",
    }
    token_header = {"X-CSRF-TOKEN": token} if token else {}
    return {**browser_headers, **token_header}
# ── Proxy-aware request with retry + direct fallback ────────────────
async def _proxy_post(url: str, **kwargs) -> httpx.Response:
    """POST *url* through the proxy with retries, falling back to a direct connection.

    The shared module-level ``http_client`` is never closed or replaced here,
    because concurrent requests may be using it. It serves the first proxied
    attempt (or every request when no proxy is configured, since it is then a
    direct client; see ``_make_client``). Each retry uses a short-lived client, and
    therefore a fresh connection, which is closed once ``post`` has returned the
    fully read response.

    Raises:
        Whatever the final direct attempt raises (e.g. ``httpx.ConnectError``).
    """
    shared = http_client if http_client is not None and not http_client.is_closed else None

    if PROXY_URL:
        for attempt in range(PROXY_MAX_RETRIES):
            client = shared if (attempt == 0 and shared is not None) else _make_client(use_proxy=True)
            try:
                return await client.post(url, **kwargs)
            except (httpx.ConnectError, httpx.ProxyError, httpx.TimeoutException) as e:
                print(f"[Proxy] Connection error #{attempt+1}: {type(e).__name__}")
            finally:
                if client is not shared:
                    await client.aclose()
            await asyncio.sleep(PROXY_RETRY_DELAY)
        print("[Proxy] Falling back to direct connection")
    elif shared is not None:
        # No proxy configured: the shared client already connects directly.
        return await shared.post(url, **kwargs)

    direct_client = _make_client(use_proxy=False)
    try:
        return await direct_client.post(url, **kwargs)
    finally:
        await direct_client.aclose()
# ── Raw call with retries ───────────────────────────────────────────
async def _raw_call(messages: list[dict], model: str) -> httpx.Response:
    """POST one chat request to chatgpt.org/api/chat and return the 200 response.

    Retry policy:
      * HTTP 419 (stale CSRF) on the first pass: force a session refresh and retry once.
      * HTTP 429: refresh the session and retry up to three times, waiting 10s then 20s.
      * Connection failure: HTTPException 502. Any other non-200: HTTPException with
        the upstream status and the first 300 characters of its body.
    """
    await session.refresh(http_client)
    body = {"model": model, "messages": messages}

    for csrf_try in range(2):
        for rate_try in range(3):
            try:
                upstream = await _proxy_post(
                    "https://chatgpt.org/api/chat",
                    json=body,
                    headers=_headers(),
                    cookies=session.cookies,
                )
            except (httpx.ConnectError, httpx.ProxyError, httpx.TimeoutException) as exc:
                print(f"[Chat] Connection failed: {type(exc).__name__}")
                session.last_refresh = 0
                raise HTTPException(502, f"Cannot reach upstream: {type(exc).__name__}")

            status = upstream.status_code
            if status == 419 and csrf_try == 0:
                print("[Chat] 419 -> refreshing session...")
                session.last_refresh = 0
                await session.refresh(http_client)
                break  # start the outer (CSRF) loop's second pass

            if status != 429:
                if status != 200:
                    session.last_refresh = 0
                    raise HTTPException(status, f"Upstream {status}: {upstream.text[:300]}")
                return upstream

            delay = 10 * (rate_try + 1)
            print(f"[Chat] 429 rate limited, waiting {delay}s (attempt {rate_try+1}/3)...")
            session.last_refresh = 0
            await session.refresh(http_client)
            if rate_try >= 2:
                raise HTTPException(429, f"Rate limited by upstream after {rate_try+1} retries")
            await asyncio.sleep(delay)

    raise HTTPException(500, "Failed after retry")
| async def _stream_one_response(resp): | |
| """Stream a single upstream SSE response in real-time. | |
| Yields (text, finish_reason) tuples. finish_reason is None for text chunks.""" | |
| finish_reason = None | |
| try: | |
| async for raw_line in resp.aiter_lines(): | |
| line = raw_line.strip() | |
| if not line or line.startswith(":"): | |
| continue | |
| if not line.startswith("data: "): | |
| continue | |
| payload_str = line[6:] | |
| if payload_str.strip() == "[DONE]": | |
| break | |
| try: | |
| chunk = json.loads(payload_str) | |
| except json.JSONDecodeError: | |
| continue | |
| for choice in chunk.get("choices", []): | |
| delta = choice.get("delta", {}) | |
| c = delta.get("content", "") | |
| if c: | |
| yield c, None | |
| fr = choice.get("finish_reason") | |
| if fr: | |
| if fr in ("stop", "end_turn"): | |
| finish_reason = "stop" | |
| elif fr in ("length", "max_tokens"): | |
| finish_reason = "length" | |
| except (httpx.ReadError, httpx.RemoteProtocolError) as e: | |
| print(f"[Stream] Connection lost during streaming: {type(e).__name__}") | |
| except Exception as e: | |
| print(f"[Stream] Error during streaming: {type(e).__name__}: {e}") | |
| yield "", finish_reason | |
# ── Streaming with auto-continue ────────────────────────────────────
# Upper bound on upstream requests per completion (first call plus continuations).
MAX_CONTINUATIONS = 20


async def _raw_call_streaming(messages: list[dict], model: str):
    """Streaming-friendly counterpart of ``_raw_call``.

    Async generator. While the request is made and retried it yields SSE comment
    strings (": thinking...", and ": retrying..." once per second of 429 back-off)
    so the downstream connection stays alive. Its last item is the successful
    ``httpx.Response``. Retry/error policy matches ``_raw_call``: one refresh and
    retry on 419, up to three attempts on 429 (10s/20s back-off), HTTPException
    otherwise.
    """
    await session.refresh(http_client)
    body = {"model": model, "messages": messages}

    for csrf_try in range(2):
        for rate_try in range(3):
            yield ": thinking...\n\n"
            try:
                upstream = await _proxy_post(
                    "https://chatgpt.org/api/chat",
                    json=body,
                    headers=_headers(),
                    cookies=session.cookies,
                )
            except (httpx.ConnectError, httpx.ProxyError, httpx.TimeoutException) as exc:
                print(f"[Chat] Connection failed: {type(exc).__name__}")
                session.last_refresh = 0
                raise HTTPException(502, f"Cannot reach upstream: {type(exc).__name__}")

            status = upstream.status_code
            if status == 419 and csrf_try == 0:
                print("[Chat] 419 -> refreshing session...")
                session.last_refresh = 0
                await session.refresh(http_client)
                break  # start the outer (CSRF) loop's second pass

            if status != 429:
                if status != 200:
                    session.last_refresh = 0
                    raise HTTPException(status, f"Upstream {status}: {upstream.text[:300]}")
                yield upstream
                return

            delay = 10 * (rate_try + 1)
            print(f"[Chat] 429 rate limited, waiting {delay}s (attempt {rate_try+1}/3)...")
            session.last_refresh = 0
            await session.refresh(http_client)
            if rate_try >= 2:
                raise HTTPException(429, f"Rate limited after {rate_try+1} retries")
            for _ in range(delay):
                yield ": retrying...\n\n"
                await asyncio.sleep(1)

    raise HTTPException(500, "Failed after retry")
| def _emit_tool_call_chunks(chunk_id: str, created: int, model: str, tool_calls: list[dict], remaining_text: str): | |
| """Generate OpenAI streaming chunks for tool calls. Returns list of SSE strings.""" | |
| chunks = [] | |
| for i, tc in enumerate(tool_calls): | |
| # First chunk: role + tool_call with id, name, and start of arguments | |
| sse_start = json.dumps({ | |
| "id": chunk_id, | |
| "object": "chat.completion.chunk", | |
| "created": created, | |
| "model": model, | |
| "choices": [{ | |
| "index": 0, | |
| "delta": { | |
| "role": "assistant", | |
| "tool_calls": [{ | |
| "index": i, | |
| "id": tc["id"], | |
| "type": "function", | |
| "function": { | |
| "name": tc["function"]["name"], | |
| "arguments": "", | |
| } | |
| }] | |
| }, | |
| "finish_reason": None, | |
| }], | |
| }) | |
| chunks.append(f"data: {sse_start}\n\n") | |
| # Argument chunks | |
| args = tc["function"]["arguments"] | |
| chunk_size = max(1, len(args) // 3) | |
| for offset in range(0, len(args), chunk_size): | |
| arg_piece = args[offset:offset + chunk_size] | |
| sse_arg = json.dumps({ | |
| "id": chunk_id, | |
| "object": "chat.completion.chunk", | |
| "created": created, | |
| "model": model, | |
| "choices": [{ | |
| "index": 0, | |
| "delta": { | |
| "tool_calls": [{ | |
| "index": i, | |
| "function": { | |
| "arguments": arg_piece, | |
| } | |
| }] | |
| }, | |
| "finish_reason": None, | |
| }], | |
| }) | |
| chunks.append(f"data: {sse_arg}\n\n") | |
| # Remaining text alongside tool calls | |
| if remaining_text.strip(): | |
| sse_text = json.dumps({ | |
| "id": chunk_id, | |
| "object": "chat.completion.chunk", | |
| "created": created, | |
| "model": model, | |
| "choices": [{ | |
| "index": 0, | |
| "delta": {"content": remaining_text}, | |
| "finish_reason": None, | |
| }], | |
| }) | |
| chunks.append(f"data: {sse_text}\n\n") | |
| # Final chunk with finish_reason | |
| sse_done = json.dumps({ | |
| "id": chunk_id, | |
| "object": "chat.completion.chunk", | |
| "created": created, | |
| "model": model, | |
| "choices": [{ | |
| "index": 0, | |
| "delta": {}, | |
| "finish_reason": "tool_calls", | |
| }], | |
| }) | |
| chunks.append(f"data: {sse_done}\n\n") | |
| chunks.append("data: [DONE]\n\n") | |
| return chunks | |
async def _stream_with_auto_continue(messages: list[dict], model: str):
    """Produce an OpenAI-compatible SSE stream for one chat completion.

    Each upstream reply is buffered in full, with only ": ..." keep-alive comments
    sent meanwhile, so tool-call markup can be inspected before anything is
    emitted. After each reply:

      * complete tool calls and no unfinished markup: emit them as ``tool_calls``
        deltas (plus any surrounding text) and finish;
      * unfinished tool-call markup: ask the model to continue the call;
      * plain text with finish "stop": emit the text and finish;
      * plain text with finish "length": ask the model to continue.

    After ``MAX_CONTINUATIONS`` upstream calls, whatever was collected is emitted
    best-effort, with unfinished tool-call markup stripped from any content.
    If an upstream call fails, the text collected so far (unfinished markup
    stripped) is emitted, followed by an "[Error: ...]" content delta and a
    normal stop, so the client always receives ``data: [DONE]``.
    """
    chunk_id = f"chatcmpl-{uuid.uuid4().hex[:12]}"
    created = int(time.time())
    conversation = list(messages)
    total_content = ""

    def sse(delta: dict, finish_reason=None) -> str:
        # One chat.completion.chunk frame sharing this completion's id/created/model.
        payload = json.dumps({
            "id": chunk_id,
            "object": "chat.completion.chunk",
            "created": created,
            "model": model,
            "choices": [{
                "index": 0,
                "delta": delta,
                "finish_reason": finish_reason,
            }],
        })
        return f"data: {payload}\n\n"

    def text_frames(text: str) -> list:
        # Re-chunk buffered text into 50-character content deltas.
        return [sse({"content": text[i:i + 50]}) for i in range(0, len(text), 50)]

    def closing_frames() -> list:
        return [sse({}, "stop"), "data: [DONE]\n\n"]

    def error_frames(message: str) -> list:
        # Keep text already collected from earlier continuations instead of dropping it.
        partial = _strip_incomplete_tool_tags(total_content) if total_content else ""
        return text_frames(partial) + [sse({"content": message})] + closing_frames()

    for cont_num in range(MAX_CONTINUATIONS):
        yield ": thinking...\n\n"
        resp = None
        try:
            async for item in _raw_call_streaming(conversation, model):
                if isinstance(item, str):
                    yield item  # keep-alive comment
                else:
                    resp = item
        except HTTPException as e:
            for frame in error_frames(f"\n\n[Error: {e.detail}]"):
                yield frame
            return
        except Exception as e:
            # Anything unexpected must still end the stream cleanly for the client.
            traceback.print_exc()
            for frame in error_frames(f"\n\n[Error: {type(e).__name__}: {e}]"):
                yield frame
            return

        if resp is None:
            for frame in error_frames("[Error: No response from upstream]"):
                yield frame
            return

        # Buffer this upstream reply completely.
        finish_reason = "stop"
        chunk_content = ""
        async for text, fr in _stream_one_response(resp):
            if fr is not None:
                finish_reason = fr
                continue
            if text:
                chunk_content += text
                total_content += text
                yield ": streaming...\n\n"
        print(f"[Chat] Chunk #{cont_num+1}: {len(chunk_content)} chars, finish={finish_reason}")

        tool_calls, remaining_text = _parse_tool_calls(total_content)
        has_incomplete = _has_incomplete_tool_call(total_content)
        print(f"[Chat] tool_calls={len(tool_calls)} incomplete={has_incomplete} finish={finish_reason}")

        # 1. Complete tool calls and nothing left open: emit and finish.
        if tool_calls and not has_incomplete:
            print(f"[Chat] Emitting {len(tool_calls)} complete tool call(s)")
            for frame in _emit_tool_call_chunks(chunk_id, created, model, tool_calls, remaining_text):
                yield frame
            return

        # 2. Unfinished tool-call markup, whatever finish_reason says (upstream may
        #    report "stop" even when cut off mid-tag): continue the call.
        if has_incomplete:
            print("[Chat] Incomplete tool call detected, auto-continuing...")
            yield ": continuing...\n\n"
            conversation.append({"role": "assistant", "content": chunk_content})
            conversation.append({"role": "user", "content": "Continue the tool call exactly from where you left off. Do not repeat the opening tag or any arguments you already wrote. Just continue outputting the parameter values from where you stopped."})
            print(f"[Chat] Auto-continue (incomplete) #{cont_num+1}, total so far: {len(total_content)} chars")
            continue

        # 3. Plain text that finished normally: emit and finish.
        if finish_reason == "stop":
            for frame in text_frames(total_content) + closing_frames():
                yield frame
            return

        # 4. Plain text cut off by the output limit: continue it.
        yield ": continuing...\n\n"
        conversation.append({"role": "assistant", "content": chunk_content})
        conversation.append({"role": "user", "content": "Continue exactly from where you left off. Do not repeat any text you already wrote."})
        print(f"[Chat] Auto-continue (length) #{cont_num+1}, total so far: {len(total_content)} chars")

    # Continuation budget exhausted: emit whatever is usable.
    tool_calls, remaining_text = _parse_tool_calls(total_content)
    if tool_calls:
        print(f"[Chat] Max continuations reached, emitting {len(tool_calls)} partial tool call(s)")
        # remaining_text can still contain the unfinished call's raw markup; strip it
        # so no XML leaks into delta.content.
        clean_remaining = _strip_incomplete_tool_tags(remaining_text)
        for frame in _emit_tool_call_chunks(chunk_id, created, model, tool_calls, clean_remaining):
            yield frame
        return

    clean_content = _strip_incomplete_tool_tags(total_content)
    for frame in text_frames(clean_content) + closing_frames():
        yield frame
# ── Non-streaming with auto-continue ────────────────────────────
async def _collect_with_auto_continue(messages: list[dict], model: str) -> dict:
    """Collect the full (non-streaming) response, auto-continuing if cut off.

    Each upstream round is read to completion via ``_stream_one_response``.
    After every round the *accumulated* text is inspected:

    * tool call(s) parsed and not still mid-call after a ``"length"`` cutoff
      -> return the tool calls (plus any surrounding text);
    * upstream finished with ``"stop"`` -> return the accumulated text;
    * otherwise the partial reply is appended to the conversation with a
      "continue" prompt and another round is requested.

    When ``MAX_CONTINUATIONS`` rounds are exhausted the result mirrors the
    streaming fallback: parsed tool calls are returned if there are any,
    otherwise the text is returned with incomplete tool-call tags stripped.

    Args:
        messages: Normalized conversation to send upstream (not mutated).
        model: Model identifier forwarded to ``_raw_call``.

    Returns:
        ``{"content": str | None, "tool_calls": list | None}``.
    """
    conversation = list(messages)  # copy so the caller's list is never mutated
    full_content = ""
    for cont_num in range(MAX_CONTINUATIONS):
        resp = await _raw_call(conversation, model)
        content = ""
        finish_reason = "stop"  # assumed if upstream never reports one
        async for text, fr in _stream_one_response(resp):
            if fr is not None:
                finish_reason = fr
                continue
            if text:
                content += text
        full_content += content
        print(f"[Chat] Collect #{cont_num+1}: {len(content)} chars, finish={finish_reason}")

        # Judge tool-call state on the accumulated text: a call opened in an
        # earlier round may still be open (or only be closed) in this one.
        mid_tool_call = _has_incomplete_tool_call(full_content)

        # Always check for tool calls
        tool_calls, remaining_text = _parse_tool_calls(full_content)
        if tool_calls and not (mid_tool_call and finish_reason == "length"):
            return {
                "tool_calls": tool_calls,
                "content": remaining_text if remaining_text.strip() else None,
            }
        if finish_reason == "stop":
            return {"content": full_content, "tool_calls": None}

        # Auto-continue: pick the prompt that matches where the cutoff landed.
        if mid_tool_call:
            prompt = "Continue the tool call exactly from where you left off. Do not repeat the opening tag or any arguments you already wrote."
        else:
            prompt = "Continue exactly from where you left off. Do not repeat any text you already wrote."
        conversation.append({"role": "assistant", "content": content})
        conversation.append({"role": "user", "content": prompt})

    # Max continuations reached: mirror the streaming fallback so clients
    # never receive raw, half-written tool-call tags as message content.
    tool_calls, remaining_text = _parse_tool_calls(full_content)
    if tool_calls:
        print(f"[Chat] Max continuations reached, returning {len(tool_calls)} partial tool call(s)")
        return {
            "tool_calls": tool_calls,
            "content": remaining_text if remaining_text.strip() else None,
        }
    return {"content": _strip_incomplete_tool_tags(full_content), "tool_calls": None}
# ── OpenAI-compatible endpoint ──────────────────────────────────
async def chat_completions(request: Request):
    """Handle an OpenAI-compatible chat completion request.

    Validates the JSON body, converts the legacy ``functions`` list into
    ``tools`` entries, normalizes the messages, and then:

    * when ``stream`` is truthy, returns an SSE ``StreamingResponse`` driven
      by ``_stream_with_auto_continue``;
    * otherwise collects the whole reply and returns a ``chat.completion``
      object whose ``finish_reason`` is ``"tool_calls"`` when tool calls were
      parsed, else ``"stop"``.

    Raises:
        HTTPException: 400 for a malformed body, ``messages`` or ``tools``;
            500 for unexpected errors raised before the response is returned
            (errors inside the stream generator occur later and are not
            caught here).
    """
    try:
        body = await request.json()
    except Exception:
        raise HTTPException(400, "Invalid JSON") from None
    if not isinstance(body, dict):
        raise HTTPException(400, "Body must be a JSON object")
    model = body.get("model", "anthropic/claude-haiku-4-5")
    messages_raw = body.get("messages", [])
    stream = body.get("stream", False)

    # Validate before logging: len() on a null/non-list "messages" would
    # otherwise raise TypeError and surface as a 500 instead of a 400.
    if not messages_raw or not isinstance(messages_raw, list):
        raise HTTPException(400, "messages must be a non-empty array")

    # Extract tools
    tools = body.get("tools") or body.get("functions") or None
    tool_choice = body.get("tool_choice", "auto")
    if tools is not None and (
        not isinstance(tools, list) or not all(isinstance(t, dict) for t in tools)
    ):
        raise HTTPException(400, "tools must be an array of objects")
    # Convert old 'functions' format
    if tools and "function" not in tools[0] and "name" in tools[0]:
        tools = [{"type": "function", "function": f} for f in tools]
    print(f"[Request] model={model} stream={stream} tools={bool(tools)} tool_choice={tool_choice} msgs={len(messages_raw)}")

    messages = normalize_messages(messages_raw, tools=tools, tool_choice=tool_choice)
    if not messages:
        raise HTTPException(400, "No valid messages after normalization")

    try:
        if stream:
            return StreamingResponse(
                _stream_with_auto_continue(messages, model),
                media_type="text/event-stream",
                headers={
                    "Cache-Control": "no-cache",
                    "Connection": "keep-alive",
                    "X-Accel-Buffering": "no",
                },
            )

        result = await _collect_with_auto_continue(messages, model)
        tool_calls = result.get("tool_calls")
        content = result.get("content")
        if tool_calls:
            message = {
                "role": "assistant",
                "content": content,
                "tool_calls": tool_calls,
            }
            finish_reason = "tool_calls"
        else:
            message = {"role": "assistant", "content": content or ""}
            finish_reason = "stop"
        return JSONResponse({
            # uuid4 keeps ids unique even for requests within the same second.
            "id": f"chatcmpl-{uuid.uuid4().hex}",
            "object": "chat.completion",
            "created": int(time.time()),
            "model": model,
            "choices": [{
                "index": 0,
                "message": message,
                "finish_reason": finish_reason,
            }],
            # Token counts are not tracked; zeros are placeholders.
            "usage": {"prompt_tokens": 0, "completion_tokens": 0, "total_tokens": 0},
        })
    except HTTPException:
        raise
    except Exception as e:
        print(f"[Request] Unhandled error: {type(e).__name__}: {e}")
        print(traceback.format_exc())
        raise HTTPException(500, f"Internal error: {type(e).__name__}") from e
# ── Models / Health ─────────────────────────────────────────────
async def list_models():
    """Return the OpenAI-style model list, which advertises a single Haiku model."""
    haiku_entry = {
        "id": "anthropic/claude-haiku-4-5",
        "object": "model",
        "owned_by": "anthropic",
    }
    payload = {"object": "list", "data": [haiku_entry]}
    return JSONResponse(payload)
async def root():
    """Report service status, version, proxy usage and the advertised endpoints.

    The version is read from ``app.version`` so that bumping the FastAPI app
    version is enough; a second hard-coded copy here would drift.
    """
    return {
        "status": "ok",
        "version": app.version,
        "proxy": bool(PROXY_URL),
        "tool_calling": True,
        "endpoints": ["/v1/chat/completions", "/v1/models"],
    }
async def health():
    """Health probe: report whether session cookies are held and a proxy is configured."""
    has_session = bool(session.cookies)
    proxy_enabled = bool(PROXY_URL)
    return {"status": "ok", "session_active": has_session, "proxy": proxy_enabled}
async def force_refresh():
    """Force an immediate session refresh and report the resulting session state.

    ``session.last_refresh`` is zeroed before calling ``session.refresh``
    (presumably so any refresh throttling is bypassed; confirm in the session
    class). If the refresh returns a client, that client replaces the
    module-level ``http_client``.
    """
    global http_client
    session.last_refresh = 0
    refreshed_client = await session.refresh(http_client)
    if refreshed_client is not None:
        http_client = refreshed_client
    return dict(
        refreshed=True,
        has_cookies=bool(session.cookies),
        has_csrf=bool(session.csrf_token),
        proxy=bool(PROXY_URL),
    )
async def debug_session():
    """Expose session diagnostics: flags, cookie names (no values) and refresh age.

    ``last_refresh_ago`` is whole seconds since ``session.last_refresh``, or
    ``None`` when no refresh timestamp is recorded (falsy value).
    """
    cookies = session.cookies
    cookie_names = list(cookies.keys()) if cookies else []
    last_refresh = session.last_refresh
    refresh_age = int(time.time() - last_refresh) if last_refresh else None
    return dict(
        has_cookies=bool(cookies),
        cookie_names=cookie_names,
        has_csrf=bool(session.csrf_token),
        has_xsrf=bool(session.xsrf_token),
        last_refresh_ago=refresh_age,
        proxy=bool(PROXY_URL),
    )
if __name__ == "__main__":
    # Direct-run entry point: serve on all interfaces, port 7860
    # (presumably the Hugging Face Spaces app port; confirm in Space config).
    import uvicorn

    uvicorn.run(app, port=7860, host="0.0.0.0")