from flask import Flask, request, Response, jsonify, render_template_string import requests import json import uuid import time import os import re import base64 import tempfile import mimetypes app = Flask(__name__) # Configuration ONYX_BASE_URL = os.environ.get("ONYX_BASE_URL", "https://cloud.onyx.app") ONYX_API_TOKEN = os.environ.get("ONYX_SECRET", "") # Store chat sessions chat_sessions_cache = {} # Store uploaded files metadata files_cache = {} def get_headers(): """Get authorization headers""" return { "Authorization": f"Bearer {ONYX_API_TOKEN}", "Content-Type": "application/json" } def create_chat_session(persona_id=0): """Create a new chat session in Onyx""" url = f"{ONYX_BASE_URL}/api/chat/create-chat-session" payload = { "persona_id": persona_id, "description": "OpenAI Compatible API Session" } try: response = requests.post(url, json=payload, headers=get_headers(), timeout=30) if response.status_code == 200: data = response.json() session_id = data.get('chat_session_id') or data.get('id') or data print(f"Created chat session: {session_id}") return str(session_id) else: print(f"Failed to create chat session: {response.status_code} - {response.text}") return None except Exception as e: print(f"Error creating chat session: {e}") return None def get_or_create_session(session_key="default", persona_id=0): """Get existing session or create new one""" if session_key not in chat_sessions_cache: session_id = create_chat_session(persona_id) if session_id: chat_sessions_cache[session_key] = { "session_id": session_id, "parent_message_id": None } return chat_sessions_cache.get(session_key) def parse_model_string(model): """ Parse model string in format 'provider/model_version' Examples: - 'openai/gpt-4' -> ('openai', 'gpt-4') - 'anthropic/claude-3-opus' -> ('anthropic', 'claude-3-opus') - 'gpt-4' -> ('openai', 'gpt-4') """ if '/' in model: parts = model.split('/', 1) return parts[0], parts[1] elif ':' in model: parts = model.split(':', 1) return parts[0], parts[1] else: return "openai", model # Known provider name mappings PROVIDER_ALIASES = { "openai": "OpenAI", "anthropic": "Anthropic", "google": "Google", "azure": "Azure", "bedrock": "Bedrock", "cohere": "Cohere", "mistral": "Mistral", } def normalize_provider_name(provider): """Normalize provider name to match Onyx configuration.""" provider_lower = provider.lower().strip() return PROVIDER_ALIASES.get(provider_lower, provider_lower) # ============== Image Upload Support ============== def extract_files_from_messages(messages, msg_format="openai"): """ Extract images and other files from message content blocks. Supports: - OpenAI image_url (data or http or file-id) - Anthropic image (base64 or url or file-id) - Custom type: "file" with "file_id" - Automatic detection of file IDs in text if they exist in files_cache Returns: list of dicts: [{"base64_data": str, "media_type": str, "filename": str, "onyx_file_id": str, "type": str}, ...] """ files = [] for msg in messages: content = msg.get('content', '') # Handle string content: scan for file-ids if isinstance(content, str): # Simple regex for file- followed by hex characters matches = re.findall(r'(file-[a-f0-9]{24})', content) for file_id in matches: if file_id in files_cache: record = files_cache[file_id] mime = record.get('content_type', 'application/octet-stream') is_image = mime.startswith('image/') files.append({ "base64_data": record.get('_data', ''), "media_type": mime, "filename": record.get('filename', f"file_{uuid.uuid4().hex[:8]}"), "onyx_file_id": record.get('onyx_file_id'), "type": "image" if is_image else "document" }) continue if not isinstance(content, list): continue for block in content: if not isinstance(block, dict): continue block_type = block.get('type') # OpenAI format: image_url if block_type == 'image_url': image_url_obj = block.get('image_url', {}) url = image_url_obj.get('url', '') # Check if it's a file ID from our cache if url in files_cache: record = files_cache[url] files.append({ "base64_data": record.get('_data', ''), "media_type": record.get('content_type', 'image/png'), "filename": record.get('filename', f"file_{uuid.uuid4().hex[:8]}"), "onyx_file_id": record.get('onyx_file_id'), "type": "image" }) elif url.startswith('data:'): try: header, b64_data = url.split(',', 1) media_type = header.split(':')[1].split(';')[0] ext = media_type.split('/')[-1] if ext == 'jpeg': ext = 'jpg' files.append({ "base64_data": b64_data, "media_type": media_type, "filename": f"image_{uuid.uuid4().hex[:8]}.{ext}", "type": "image" }) except Exception as e: print(f"Failed to parse data URL: {e}") elif url.startswith('http'): try: resp = requests.get(url, timeout=30) if resp.status_code == 200: content_type = resp.headers.get('content-type', 'image/png') media_type = content_type.split(';')[0].strip() ext = media_type.split('/')[-1] if ext == 'jpeg': ext = 'jpg' b64_data = base64.b64encode(resp.content).decode('utf-8') files.append({ "base64_data": b64_data, "media_type": media_type, "filename": f"image_{uuid.uuid4().hex[:8]}.{ext}", "type": "image" }) except Exception as e: print(f"Failed to download image: {e}") # Anthropic format: image elif block_type == 'image': source = block.get('source', {}) if source.get('type') == 'base64': media_type = source.get('media_type', 'image/png') b64_data = source.get('data', '') ext = media_type.split('/')[-1] if ext == 'jpeg': ext = 'jpg' files.append({ "base64_data": b64_data, "media_type": media_type, "filename": f"image_{uuid.uuid4().hex[:8]}.{ext}", "type": "image" }) elif source.get('type') == 'url': url = source.get('url', '') if url in files_cache: record = files_cache[url] files.append({ "base64_data": record.get('_data', ''), "media_type": record.get('content_type', 'image/png'), "filename": record.get('filename', f"file_{uuid.uuid4().hex[:8]}"), "onyx_file_id": record.get('onyx_file_id'), "type": "image" }) elif url.startswith('http'): try: resp = requests.get(url, timeout=30) if resp.status_code == 200: content_type = resp.headers.get('content-type', 'image/png') media_type = content_type.split(';')[0].strip() ext = media_type.split('/')[-1] if ext == 'jpeg': ext = 'jpg' b64_data = base64.b64encode(resp.content).decode('utf-8') files.append({ "base64_data": b64_data, "media_type": media_type, "filename": f"image_{uuid.uuid4().hex[:8]}.{ext}", "type": "image" }) except Exception as e: print(f"Failed to download image: {e}") # Generic file/document type elif block_type in ['file', 'document', 'attachment']: file_id = block.get('file_id') or block.get('id') if file_id and file_id in files_cache: record = files_cache[file_id] mime = record.get('content_type', 'application/octet-stream') is_image = mime.startswith('image/') files.append({ "base64_data": record.get('_data', ''), "media_type": mime, "filename": record.get('filename', f"file_{uuid.uuid4().hex[:8]}"), "onyx_file_id": record.get('onyx_file_id'), "type": "image" if is_image else "document" }) # Support text scan inside content blocks too elif block_type == 'text': text = block.get('text', '') matches = re.findall(r'(file-[a-f0-9]{24})', text) for file_id in matches: if file_id in files_cache: record = files_cache[file_id] mime = record.get('content_type', 'application/octet-stream') is_image = mime.startswith('image/') files.append({ "base64_data": record.get('_data', ''), "media_type": mime, "filename": record.get('filename', f"file_{uuid.uuid4().hex[:8]}"), "onyx_file_id": record.get('onyx_file_id'), "type": "image" if is_image else "document" }) return files def upload_file_to_onyx(file_data, chat_session_id): """ Upload an image or document to Onyx's file upload API. Args: file_data: dict with base64_data, media_type, filename, onyx_file_id, type chat_session_id: The chat session to associate the file with Returns: dict: File descriptor from Onyx, or None on failure """ # If we already have an Onyx file ID, just return the descriptor if file_data.get('onyx_file_id'): print(f"Using existing Onyx file ID: {file_data['onyx_file_id']}") return { "id": file_data['onyx_file_id'], "type": file_data.get('type', 'document'), "name": file_data.get('filename', 'file'), } # Decode base64 to binary try: file_bytes = base64.b64decode(file_data['base64_data']) except Exception as e: print(f"Failed to decode base64 file: {e}") return None # Try multiple Onyx file upload endpoints upload_endpoints = [ f"{ONYX_BASE_URL}/api/chat/file", f"{ONYX_BASE_URL}/api/chat/upload-file", f"{ONYX_BASE_URL}/api/manage/upload-file", ] headers = { "Authorization": f"Bearer {ONYX_API_TOKEN}", } for url in upload_endpoints: try: # Upload as multipart form data files = { 'file': (file_data['filename'], file_bytes, file_data['media_type']) } form_data = { 'chat_session_id': chat_session_id } print(f"Uploading file to: {url}") response = requests.post( url, files=files, data=form_data, headers=headers, timeout=60 ) print(f"Upload response: {response.status_code}") if response.status_code == 200: result = response.json() print(f"Upload success: {result}") # Normalize the file descriptor format f_id = result.get('file_id') or result.get('id') or result.get('document_id', '') file_descriptor = { "id": f_id, "type": file_data.get('type', 'document'), "name": file_data['filename'], } # If the response includes the full descriptor, use it if isinstance(result, dict): for key in ['file_id', 'id', 'document_id', 'name', 'type']: if key in result and result[key]: file_descriptor[key] = result[key] return file_descriptor elif response.status_code == 404: continue # Try next endpoint else: print(f"Upload failed: {response.status_code} - {response.text}") continue except Exception as e: print(f"Upload error at {url}: {e}") continue print("All upload endpoints failed") return None def upload_files_for_session(files, chat_session_id): """ Upload multiple files and return file descriptors. """ file_descriptors = [] # Remove duplicates seen_ids = set() unique_files = [] for f in files: f_id = f.get('onyx_file_id') or f.get('base64_data')[:100] if f_id not in seen_ids: seen_ids.add(f_id) unique_files.append(f) for file_data in unique_files: descriptor = upload_file_to_onyx(file_data, chat_session_id) if descriptor: file_descriptors.append(descriptor) else: print(f"Warning: Failed to upload file {file_data.get('filename', '?')}") return file_descriptors # ============== Tool Calling Support ============== def build_tools_system_prompt(tools, tool_format="anthropic"): """ Build a system prompt injection that teaches the model how to call tools. The model will output structured JSON when it wants to call a tool. Args: tools: List of tool definitions (Anthropic or OpenAI format) tool_format: "anthropic" or "openai" Returns: str: System prompt text with tool instructions """ if not tools: return "" tools_list = [] for tool in tools: if tool_format == "openai": # OpenAI format: {"type": "function", "function": {"name": ..., "description": ..., "parameters": ...}} func = tool.get('function', tool) tools_list.append({ "name": func.get('name', ''), "description": func.get('description', ''), "parameters": func.get('parameters', func.get('input_schema', {})) }) else: # Anthropic format: {"name": ..., "description": ..., "input_schema": ...} tools_list.append({ "name": tool.get('name', ''), "description": tool.get('description', ''), "parameters": tool.get('input_schema', tool.get('parameters', {})) }) tools_json = json.dumps(tools_list, indent=2) prompt = f"""You have access to the following tools/functions that you can call to help accomplish your task: {tools_json} CRITICAL INSTRUCTIONS FOR TOOL CALLING: 1. When you need to use a tool, you MUST output a JSON block wrapped in tags. 2. You can call MULTIPLE tools in a single response by outputting multiple blocks. 3. The format for each tool call MUST be exactly: {{"name": "tool_name", "id": "unique_call_id", "input": {{"param1": "value1", "param2": "value2"}}}} 4. The "id" should be a unique identifier like "call_1", "call_2", etc. 5. The "input" must match the parameter schema defined for the tool. 6. You can mix text and tool calls in your response. Text before/after tool calls is fine. 7. After a tool is executed, you will receive the result and can continue your work. 8. You MUST use the tools when they are needed. Do NOT say you cannot access tools. 9. ALL listed tools are available to you and you MUST use them when needed. 10. Do NOT refuse to call a tool by saying you don't have access or capability. IMPORTANT: You have FULL access to ALL the tools listed above. Use them freely whenever needed.""" return prompt def parse_tool_calls_from_response(response_text): """ Parse tool call blocks from the model's response text. Looks for patterns like: {"name": "...", "id": "...", "input": {...}} Also handles common variations the model might output. Returns: tuple: (text_content, list_of_tool_calls) - text_content: The text parts with tool_call blocks removed - list_of_tool_calls: List of parsed tool call dicts """ tool_calls = [] text_parts = [] # Pattern 1: ... pattern = r'\s*(.*?)\s*' matches = list(re.finditer(pattern, response_text, re.DOTALL)) if matches: last_end = 0 for match in matches: # Capture text before this tool call before_text = response_text[last_end:match.start()].strip() if before_text: text_parts.append(before_text) last_end = match.end() try: tool_data = json.loads(match.group(1).strip()) tool_call = { "name": tool_data.get("name", ""), "id": tool_data.get("id", f"toolu_{uuid.uuid4().hex[:24]}"), "input": tool_data.get("input", tool_data.get("arguments", tool_data.get("parameters", {}))) } tool_calls.append(tool_call) except json.JSONDecodeError as e: print(f"Failed to parse tool call JSON: {e}") text_parts.append(match.group(0)) # Keep unparseable block as text # Capture text after last tool call after_text = response_text[last_end:].strip() if after_text: text_parts.append(after_text) else: # Pattern 2: Try ```tool_call ... ``` or ```json with tool calling structure code_pattern = r'```(?:tool_call|json)?\s*(\{[^`]*?"name"\s*:\s*"[^"]+?"[^`]*?\})\s*```' code_matches = list(re.finditer(code_pattern, response_text, re.DOTALL)) if code_matches: last_end = 0 for match in code_matches: before_text = response_text[last_end:match.start()].strip() if before_text: text_parts.append(before_text) last_end = match.end() try: tool_data = json.loads(match.group(1).strip()) if "name" in tool_data and ("input" in tool_data or "arguments" in tool_data or "parameters" in tool_data): tool_call = { "name": tool_data.get("name", ""), "id": tool_data.get("id", f"toolu_{uuid.uuid4().hex[:24]}"), "input": tool_data.get("input", tool_data.get("arguments", tool_data.get("parameters", {}))) } tool_calls.append(tool_call) else: text_parts.append(match.group(0)) except json.JSONDecodeError: text_parts.append(match.group(0)) after_text = response_text[last_end:].strip() if after_text: text_parts.append(after_text) else: # No tool calls found text_parts.append(response_text) clean_text = "\n\n".join(text_parts).strip() return clean_text, tool_calls def convert_tool_results_to_text(messages): """ Convert tool_result messages back into readable text for the model. Anthropic sends tool results as: {"role": "user", "content": [{"type": "tool_result", "tool_use_id": "...", "content": "..."}]} OpenAI sends tool results as: {"role": "tool", "tool_call_id": "...", "content": "..."} We convert these into human-readable text the model can understand. """ converted = [] for msg in messages: role = msg.get('role', '') content = msg.get('content', '') # Handle Anthropic tool_result if role == 'user' and isinstance(content, list): text_parts = [] tool_results = [] for block in content: if isinstance(block, dict): if block.get('type') == 'tool_result': tool_use_id = block.get('tool_use_id', '') result_content = block.get('content', '') is_error = block.get('is_error', False) # Handle content that is a list of blocks if isinstance(result_content, list): result_text = "" for rc in result_content: if isinstance(rc, dict) and rc.get('type') == 'text': result_text += rc.get('text', '') elif isinstance(rc, str): result_text += rc result_content = result_text status = "ERROR" if is_error else "SUCCESS" tool_results.append(f"\n{result_content}\n") elif block.get('type') == 'text': text_parts.append(block.get('text', '')) elif isinstance(block, str): text_parts.append(block) if tool_results: combined = "\n\n".join(tool_results) if text_parts: combined += "\n\n" + "\n".join(text_parts) converted.append({ "role": "user", "content": combined }) else: converted.append(msg) # Handle OpenAI tool role elif role == 'tool': tool_call_id = msg.get('tool_call_id', '') converted.append({ "role": "user", "content": f"\n{content}\n" }) # Handle assistant messages with tool_calls (OpenAI format) elif role == 'assistant' and msg.get('tool_calls'): # Reconstruct what the assistant said + the tool calls it made assistant_text = "" if isinstance(content, str) and content: assistant_text = content + "\n\n" for tc in msg.get('tool_calls', []): func = tc.get('function', {}) tc_id = tc.get('id', '') try: args = json.loads(func.get('arguments', '{}')) except (json.JSONDecodeError, TypeError): args = func.get('arguments', {}) tool_call_json = json.dumps({ "name": func.get('name', ''), "id": tc_id, "input": args }) assistant_text += f"\n{tool_call_json}\n\n\n" converted.append({ "role": "assistant", "content": assistant_text.strip() }) # Handle assistant messages with tool_use content blocks (Anthropic format) elif role == 'assistant' and isinstance(content, list): assistant_text = "" for block in content: if isinstance(block, dict): if block.get('type') == 'text': assistant_text += block.get('text', '') + "\n\n" elif block.get('type') == 'tool_use': tool_call_json = json.dumps({ "name": block.get('name', ''), "id": block.get('id', ''), "input": block.get('input', {}) }) assistant_text += f"\n{tool_call_json}\n\n\n" converted.append({ "role": "assistant", "content": assistant_text.strip() }) else: converted.append(msg) return converted # ============== Payload Builders ============== def build_onyx_payload(messages, model_provider, model_version, temperature, chat_session_id, parent_message_id=None, stream=True, tools=None): """Convert OpenAI format to Onyx payload with tool + image support""" # Extract files from messages BEFORE converting tool results found_files = extract_files_from_messages(messages, msg_format="openai") file_descriptors = [] if found_files: print(f"Found {len(found_files)} file(s) in OpenAI messages, uploading...") file_descriptors = upload_files_for_session(found_files, chat_session_id) print(f"Successfully uploaded {len(file_descriptors)} file(s)") # Convert tool results in message history to text format processed_messages = convert_tool_results_to_text(messages) # Extract the last user message last_user_message = "" for msg in reversed(processed_messages): if msg.get('role') == 'user': content = msg.get('content', '') if isinstance(content, list): text_parts = [p.get('text', '') for p in content if isinstance(p, dict) and p.get('type') == 'text'] last_user_message = ' '.join(text_parts) else: last_user_message = content break # Build system prompt from system messages system_prompt = "" for msg in processed_messages: if msg.get('role') == 'system': content = msg.get('content', '') if isinstance(content, list): text_parts = [p.get('text', '') for p in content if isinstance(p, dict) and p.get('type') == 'text'] system_prompt += ' '.join(text_parts) + "\n" elif isinstance(content, str): system_prompt += content + "\n" # Build conversation history (all messages except last user message) history_text = "" for msg in processed_messages[:-1] if processed_messages else []: role = msg.get('role', '') content = msg.get('content', '') if role == 'system': continue # Already handled if isinstance(content, list): text_parts = [] for p in content: if isinstance(p, dict) and p.get('type') == 'text': text_parts.append(p.get('text', '')) content = ' '.join(text_parts) if content: history_text += f"[{role}]: {content}\n\n" # Inject tool definitions into the system prompt tools_prompt = build_tools_system_prompt(tools, tool_format="openai") if tools else "" # If files were uploaded, add a note to the message file_note = "" if file_descriptors: file_names = ", ".join([f.get('name', 'file') for f in file_descriptors]) file_note = f"\n[Note: {len(file_descriptors)} file(s) ({file_names}) have been attached to this message. Please analyze them as requested.]\n" # Construct the full message full_message = last_user_message prefix_parts = [] if system_prompt.strip(): prefix_parts.append(f"[System Instructions]\n{system_prompt.strip()}") if tools_prompt: prefix_parts.append(tools_prompt) if history_text.strip(): prefix_parts.append(f"[Conversation History]\n{history_text.strip()}") if prefix_parts: full_message = "\n\n".join(prefix_parts) + f"\n\n[Current User Message]\n{last_user_message}" if file_note: full_message += file_note payload = { "message": full_message, "chat_session_id": chat_session_id, "parent_message_id": parent_message_id if parent_message_id else None, "stream": stream, "llm_override": { "model_provider": model_provider, "model_version": model_version, "temperature": temperature }, "file_descriptors": file_descriptors, "include_citations": False } # Remove keys with None values — Onyx rejects/errors on null parent_message_id payload = {k: v for k, v in payload.items() if v is not None} return payload def build_anthropic_payload_from_messages(messages, system_prompt, model_provider, model_version, temperature, chat_session_id, parent_message_id=None, stream=True, tools=None): """Convert Anthropic Messages API format to Onyx payload with full tool + image support""" # Extract files from messages BEFORE converting tool results found_files = extract_files_from_messages(messages, msg_format="anthropic") file_descriptors = [] if found_files: print(f"Found {len(found_files)} file(s) in Anthropic messages, uploading...") file_descriptors = upload_files_for_session(found_files, chat_session_id) print(f"Successfully uploaded {len(file_descriptors)} file(s)") # Convert tool results in message history to text format processed_messages = convert_tool_results_to_text(messages) # Extract the last user message last_user_message = "" for msg in reversed(processed_messages): if msg.get('role') == 'user': content = msg.get('content', '') if isinstance(content, list): text_parts = [p.get('text', '') for p in content if isinstance(p, dict) and p.get('type') == 'text'] last_user_message = ' '.join(text_parts) elif isinstance(content, str): last_user_message = content break # Process system prompt sys_text = "" if system_prompt: if isinstance(system_prompt, list): sys_text = ' '.join([s.get('text', '') for s in system_prompt if isinstance(s, dict) and s.get('type') == 'text']) else: sys_text = system_prompt # Build conversation history (all messages except last user) history_text = "" for msg in processed_messages[:-1] if processed_messages else []: role = msg.get('role', '') content = msg.get('content', '') if isinstance(content, list): text_parts = [] for p in content: if isinstance(p, dict) and p.get('type') == 'text': text_parts.append(p.get('text', '')) content = ' '.join(text_parts) if content: history_text += f"[{role}]: {content}\n\n" # Build tool prompt tools_prompt = build_tools_system_prompt(tools, tool_format="anthropic") if tools else "" # If files were uploaded, add a note file_note = "" if file_descriptors: file_names = ", ".join([f.get('name', 'file') for f in file_descriptors]) file_note = f"\n[Note: {len(file_descriptors)} file(s) ({file_names}) have been attached to this message. Please analyze them as requested.]\n" # Construct full message full_message = last_user_message prefix_parts = [] if sys_text.strip(): prefix_parts.append(f"[System Instructions]\n{sys_text.strip()}") if tools_prompt: prefix_parts.append(tools_prompt) if history_text.strip(): prefix_parts.append(f"[Conversation History]\n{history_text.strip()}") if prefix_parts: full_message = "\n\n".join(prefix_parts) + f"\n\n[Current User Message]\n{last_user_message}" if file_note: full_message += file_note payload = { "message": full_message, "chat_session_id": chat_session_id, "parent_message_id": parent_message_id if parent_message_id else None, "stream": stream, "llm_override": { "model_provider": model_provider, "model_version": model_version, "temperature": temperature }, "file_descriptors": file_descriptors, "include_citations": False } # Remove keys with None values — Onyx rejects/errors on null parent_message_id payload = {k: v for k, v in payload.items() if v is not None} return payload # ============== Response Parsers ============== def parse_onyx_stream_chunk(chunk_text): """Parse a chunk from Onyx stream and extract the text content. Returns: tuple: (content, message_id, packet_type) """ if not chunk_text or not chunk_text.strip(): return None, None, None try: data = json.loads(chunk_text) if not isinstance(data, dict): return None, None, None # Handle new packet-based format if 'obj' in data: obj = data['obj'] packet_type = obj.get('type', '') if packet_type == 'message_delta': content = obj.get('content', '') return content, None, 'content' elif packet_type == 'message_start': return None, None, 'message_start' elif packet_type == 'stop': return None, None, 'stop' elif packet_type == 'error': error_msg = obj.get('message', obj.get('error', 'Unknown error')) return f"[Error: {error_msg}]", None, 'error' elif packet_type == 'citation_delta': return None, None, 'citation' elif packet_type in ['reasoning_start', 'reasoning_delta', 'reasoning_done']: return None, None, 'reasoning' else: return None, None, packet_type # FALLBACK: Old format message_id = data.get('message_id') if 'answer_piece' in data: return data['answer_piece'], message_id, 'legacy' elif 'text' in data: return data['text'], message_id, 'legacy' elif 'content' in data and isinstance(data['content'], str): return data['content'], message_id, 'legacy' elif 'error' in data: return f"[Error: {data['error']}]", message_id, 'error' return None, None, None except json.JSONDecodeError: if chunk_text.strip() and not chunk_text.strip().startswith('{'): return chunk_text.strip(), None, 'raw' return None, None, None # ============== OpenAI Format Streaming & Collection ============== def generate_openai_stream_chunk(content, model, chunk_id, finish_reason=None, tool_calls=None): """Generate an OpenAI-compatible SSE chunk, with optional tool_calls""" choice = { "index": 0, "delta": {}, "finish_reason": finish_reason } if content is not None: choice["delta"]["content"] = content if tool_calls is not None: choice["delta"]["tool_calls"] = tool_calls chunk = { "id": chunk_id, "object": "chat.completion.chunk", "created": int(time.time()), "model": model, "choices": [choice] } return f"data: {json.dumps(chunk)}\n\n" def stream_onyx_response(payload, model, session_key, has_tools=False): """Stream response from Onyx API in OpenAI SSE format""" final_message_id = None chunk_id = f"chatcmpl-{uuid.uuid4().hex[:24]}" full_content_buffer = "" # Buffer to detect tool calls endpoints = [ f"{ONYX_BASE_URL}/api/chat/send-chat-message", f"{ONYX_BASE_URL}/api/chat/send-message", ] # Initial assistant role chunk initial_chunk = { "id": chunk_id, "object": "chat.completion.chunk", "created": int(time.time()), "model": model, "choices": [{ "index": 0, "delta": {"role": "assistant"}, "finish_reason": None }] } if not has_tools: # No tools = simple streaming yield f"data: {json.dumps(initial_chunk)}\n\n" last_message_id = None for url in endpoints: try: print(f"Trying endpoint: {url}") with requests.post(url, json=payload, headers=get_headers(), stream=True, timeout=120) as response: print(f"Response status: {response.status_code}") if response.status_code != 200: continue buffer = "" for chunk in response.iter_content(decode_unicode=True): if not chunk: continue buffer += chunk while '\n' in buffer: line, buffer = buffer.split('\n', 1) line = line.strip() if not line or line == "[DONE]": continue if line.startswith("data: "): line = line[6:] content, msg_id, packet_type = parse_onyx_stream_chunk(line) if msg_id: last_message_id = msg_id if content and packet_type in ['content', 'legacy', 'raw', 'error']: if has_tools: # Buffer content for tool call detection full_content_buffer += content else: yield generate_openai_stream_chunk(content, model, chunk_id) if packet_type == "stop": final_message_id = last_message_id break break except Exception as e: print("Stream error:", e) continue # Update session if final_message_id and session_key in chat_sessions_cache: chat_sessions_cache[session_key]["parent_message_id"] = final_message_id # If we have tools, parse the buffered content for tool calls if has_tools and full_content_buffer: text_content, tool_calls = parse_tool_calls_from_response(full_content_buffer) if tool_calls: # Emit initial chunk yield f"data: {json.dumps(initial_chunk)}\n\n" # Emit text content if any if text_content: yield generate_openai_stream_chunk(text_content, model, chunk_id) # Emit tool call chunks for idx, tc in enumerate(tool_calls): tc_chunk = [{ "index": idx, "id": tc["id"], "type": "function", "function": { "name": tc["name"], "arguments": json.dumps(tc["input"]) } }] yield generate_openai_stream_chunk(None, model, chunk_id, tool_calls=tc_chunk) yield generate_openai_stream_chunk("", model, chunk_id, "tool_calls") else: # No tool calls detected, emit as normal text yield f"data: {json.dumps(initial_chunk)}\n\n" yield generate_openai_stream_chunk(full_content_buffer, model, chunk_id) yield generate_openai_stream_chunk("", model, chunk_id, "stop") else: yield generate_openai_stream_chunk("", model, chunk_id, "stop") yield "data: [DONE]\n\n" def collect_full_response(payload, model, session_key, has_tools=False): """Collect full streaming response and return as complete OpenAI response""" full_content = "" last_message_id = None endpoints = [ f"{ONYX_BASE_URL}/api/chat/send-chat-message", f"{ONYX_BASE_URL}/api/chat/send-message", ] for url in endpoints: try: print(f"Trying endpoint: {url}") is_streaming_request = payload.get('stream', False) with requests.post(url, json=payload, headers=get_headers(), stream=is_streaming_request, timeout=120) as response: print(f"Response status: {response.status_code}") if response.status_code == 404: continue if response.status_code != 200: error_text = response.text print(f"Error response from {url}: {response.status_code} - {error_text}") continue # Try next endpoint if not is_streaming_request: try: data = response.json() full_content = data.get('answer') or data.get('message') or data.get('content') or "" msg_id = data.get('message_id') if session_key in chat_sessions_cache and msg_id: chat_sessions_cache[session_key]['parent_message_id'] = msg_id break except json.JSONDecodeError: full_content = response.text break else: buffer = "" for chunk in response.iter_content(chunk_size=None, decode_unicode=True): if chunk: buffer += chunk while '\n' in buffer: line, buffer = buffer.split('\n', 1) line = line.strip() if not line: continue if line.startswith('data: '): line = line[6:] if line == '[DONE]': continue content, msg_id, packet_type = parse_onyx_stream_chunk(line) if msg_id: last_message_id = msg_id if packet_type == 'stop': break if content and packet_type in ['content', 'legacy', 'raw', 'error']: full_content += content if buffer.strip(): if buffer.strip().startswith('data: '): buffer = buffer.strip()[6:] content, msg_id, packet_type = parse_onyx_stream_chunk(buffer.strip()) if msg_id: last_message_id = msg_id if content and packet_type in ['content', 'legacy', 'raw', 'error']: full_content += content if session_key in chat_sessions_cache and last_message_id: chat_sessions_cache[session_key]['parent_message_id'] = last_message_id break except requests.exceptions.RequestException as e: print(f"Request error: {e}") continue if not full_content: return { "error": { "message": "No response from Onyx API", "type": "api_error", "code": 500 } }, 500 # Parse for tool calls if tools were provided if has_tools: text_content, tool_calls = parse_tool_calls_from_response(full_content) if tool_calls: # Build response with tool_calls message = { "role": "assistant", "content": text_content if text_content else None, "tool_calls": [] } for idx, tc in enumerate(tool_calls): message["tool_calls"].append({ "id": tc["id"], "type": "function", "function": { "name": tc["name"], "arguments": json.dumps(tc["input"]) } }) response_data = { "id": f"chatcmpl-{uuid.uuid4().hex[:24]}", "object": "chat.completion", "created": int(time.time()), "model": model, "choices": [{ "index": 0, "message": message, "finish_reason": "tool_calls" }], "usage": { "prompt_tokens": -1, "completion_tokens": -1, "total_tokens": -1 } } return response_data, 200 # Standard text response response_data = { "id": f"chatcmpl-{uuid.uuid4().hex[:24]}", "object": "chat.completion", "created": int(time.time()), "model": model, "choices": [{ "index": 0, "message": { "role": "assistant", "content": full_content }, "finish_reason": "stop" }], "usage": { "prompt_tokens": -1, "completion_tokens": -1, "total_tokens": -1 } } return response_data, 200 # ============== Anthropic Format Streaming & Collection ============== def generate_anthropic_stream_events(payload, model, session_key, has_tools=False): """Stream response from Onyx in Anthropic Messages SSE format with tool support""" msg_id = f"msg_{uuid.uuid4().hex[:24]}" final_message_id = None full_content_buffer = "" endpoints = [ f"{ONYX_BASE_URL}/api/chat/send-chat-message", f"{ONYX_BASE_URL}/api/chat/send-message", ] last_msg_id = None for url in endpoints: try: with requests.post(url, json=payload, headers=get_headers(), stream=True, timeout=120) as response: if response.status_code != 200: continue buffer = "" for chunk in response.iter_content(decode_unicode=True): if not chunk: continue buffer += chunk while '\n' in buffer: line, buffer = buffer.split('\n', 1) line = line.strip() if not line or line == "[DONE]": continue if line.startswith("data: "): line = line[6:] content, m_id, packet_type = parse_onyx_stream_chunk(line) if m_id: last_msg_id = m_id if content and packet_type in ['content', 'legacy', 'raw', 'error']: full_content_buffer += content if packet_type == "stop": final_message_id = last_msg_id break break except Exception as e: print(f"Anthropic stream error: {e}") continue # Update session if final_message_id and session_key in chat_sessions_cache: chat_sessions_cache[session_key]["parent_message_id"] = final_message_id # Now build the proper Anthropic SSE response text_content, tool_calls = ("", []) if has_tools and full_content_buffer: text_content, tool_calls = parse_tool_calls_from_response(full_content_buffer) else: text_content = full_content_buffer # Determine stop reason stop_reason = "tool_use" if tool_calls else "end_turn" # Build content blocks content_blocks = [] if text_content: content_blocks.append({"type": "text", "text": text_content}) for tc in tool_calls: content_blocks.append({ "type": "tool_use", "id": tc["id"], "name": tc["name"], "input": tc["input"] }) # If no content at all, add empty text if not content_blocks: content_blocks.append({"type": "text", "text": ""}) # message_start msg_start = { "type": "message_start", "message": { "id": msg_id, "type": "message", "role": "assistant", "content": [], "model": model, "stop_reason": None, "stop_sequence": None, "usage": {"input_tokens": 0, "output_tokens": 0} } } yield f"event: message_start\ndata: {json.dumps(msg_start)}\n\n" # Ping yield f"event: ping\ndata: {json.dumps({'type': 'ping'})}\n\n" # Emit content blocks block_index = 0 for block in content_blocks: if block["type"] == "text": # content_block_start yield f"event: content_block_start\ndata: {json.dumps({'type': 'content_block_start', 'index': block_index, 'content_block': {'type': 'text', 'text': ''}})}\n\n" # Stream text in chunks for better UX text = block["text"] chunk_size = 50 # characters per chunk for i in range(0, len(text), chunk_size): text_chunk = text[i:i + chunk_size] delta_event = { "type": "content_block_delta", "index": block_index, "delta": {"type": "text_delta", "text": text_chunk} } yield f"event: content_block_delta\ndata: {json.dumps(delta_event)}\n\n" # content_block_stop yield f"event: content_block_stop\ndata: {json.dumps({'type': 'content_block_stop', 'index': block_index})}\n\n" block_index += 1 elif block["type"] == "tool_use": # content_block_start for tool_use yield f"event: content_block_start\ndata: {json.dumps({'type': 'content_block_start', 'index': block_index, 'content_block': {'type': 'tool_use', 'id': block['id'], 'name': block['name'], 'input': {}}})}\n\n" # Send tool input as delta input_json = json.dumps(block["input"]) delta_event = { "type": "content_block_delta", "index": block_index, "delta": {"type": "input_json_delta", "partial_json": input_json} } yield f"event: content_block_delta\ndata: {json.dumps(delta_event)}\n\n" # content_block_stop yield f"event: content_block_stop\ndata: {json.dumps({'type': 'content_block_stop', 'index': block_index})}\n\n" block_index += 1 # message_delta (stop reason) msg_delta = { "type": "message_delta", "delta": {"stop_reason": stop_reason, "stop_sequence": None}, "usage": {"output_tokens": 0} } yield f"event: message_delta\ndata: {json.dumps(msg_delta)}\n\n" # message_stop yield f"event: message_stop\ndata: {json.dumps({'type': 'message_stop'})}\n\n" def collect_anthropic_full_response(payload, model, session_key, has_tools=False): """Collect full response and return in Anthropic Messages format with tool support""" full_content = "" last_message_id = None endpoints = [ f"{ONYX_BASE_URL}/api/chat/send-chat-message", f"{ONYX_BASE_URL}/api/chat/send-message", ] for url in endpoints: try: is_streaming_request = payload.get('stream', False) with requests.post(url, json=payload, headers=get_headers(), stream=is_streaming_request, timeout=120) as response: if response.status_code == 404: continue if response.status_code != 200: print(f"Error response from {url}: {response.status_code} - {response.text}") continue # Try next endpoint if not is_streaming_request: try: data = response.json() full_content = data.get('answer') or data.get('message') or data.get('content') or "" msg_id = data.get('message_id') if session_key in chat_sessions_cache and msg_id: chat_sessions_cache[session_key]['parent_message_id'] = msg_id break except json.JSONDecodeError: full_content = response.text break else: buffer = "" for chunk in response.iter_content(chunk_size=None, decode_unicode=True): if chunk: buffer += chunk while '\n' in buffer: line, buffer = buffer.split('\n', 1) line = line.strip() if not line: continue if line.startswith('data: '): line = line[6:] if line == '[DONE]': continue content, msg_id, packet_type = parse_onyx_stream_chunk(line) if msg_id: last_message_id = msg_id if packet_type == 'stop': break if content and packet_type in ['content', 'legacy', 'raw', 'error']: full_content += content if session_key in chat_sessions_cache and last_message_id: chat_sessions_cache[session_key]['parent_message_id'] = last_message_id break except requests.exceptions.RequestException as e: print(f"Anthropic request error: {e}") continue if not full_content: return { "type": "error", "error": { "type": "api_error", "message": "No response from Onyx API" } }, 500 # Parse for tool calls content_blocks = [] stop_reason = "end_turn" if has_tools: text_content, tool_calls = parse_tool_calls_from_response(full_content) if text_content: content_blocks.append({"type": "text", "text": text_content}) if tool_calls: stop_reason = "tool_use" for tc in tool_calls: content_blocks.append({ "type": "tool_use", "id": tc["id"], "name": tc["name"], "input": tc["input"] }) else: content_blocks.append({"type": "text", "text": full_content}) if not content_blocks: content_blocks.append({"type": "text", "text": ""}) response_data = { "id": f"msg_{uuid.uuid4().hex[:24]}", "type": "message", "role": "assistant", "content": content_blocks, "model": model, "stop_reason": stop_reason, "stop_sequence": None, "usage": { "input_tokens": 0, "output_tokens": 0 } } return response_data, 200 # ============== API Routes ============== @app.route('/v1/chat/completions', methods=['POST']) def chat_completions(): """OpenAI-compatible chat completions endpoint with full tool/function calling support""" try: data = request.json print(f"Received request: {json.dumps(data, indent=2)[:1000]}") except Exception as e: return jsonify({"error": {"message": f"Invalid JSON: {e}", "type": "invalid_request_error"}}), 400 # Extract parameters model = data.get('model', 'openai/gpt-4') messages = data.get('messages', []) stream = data.get('stream', False) temperature = data.get('temperature', 0.7) tools = data.get('tools', None) functions = data.get('functions', None) # Legacy function calling # Convert legacy functions to tools format if functions and not tools: tools = [{"type": "function", "function": f} for f in functions] has_tools = bool(tools) session_key = data.get('session_id', 'default') if not messages: return jsonify({ "error": { "message": "messages is required", "type": "invalid_request_error" } }), 400 # Parse model string and normalize provider name model_provider, model_version = parse_model_string(model) model_provider = normalize_provider_name(model_provider) print(f"Model provider: {model_provider}, version: {model_version}") # Get or create chat session session_info = get_or_create_session(session_key) if not session_info: return jsonify({ "error": { "message": "Failed to create chat session with Onyx API", "type": "api_error" } }), 500 # Build Onyx payload payload = build_onyx_payload( messages=messages, model_provider=model_provider, model_version=model_version, temperature=temperature, chat_session_id=session_info['session_id'], parent_message_id=session_info.get('parent_message_id'), stream=stream, tools=tools ) if stream: return Response( stream_onyx_response(payload, model, session_key, has_tools=has_tools), content_type='text/event-stream', headers={ 'Cache-Control': 'no-cache', 'Connection': 'keep-alive', 'X-Accel-Buffering': 'no' } ) else: response_data, status_code = collect_full_response(payload, model, session_key, has_tools=has_tools) return jsonify(response_data), status_code @app.route('/v1/sessions', methods=['POST']) def create_new_session(): """Create a new chat session""" data = request.json or {} persona_id = data.get('persona_id', 0) session_id = create_chat_session(persona_id) if session_id: session_key = str(uuid.uuid4()) chat_sessions_cache[session_key] = { "session_id": session_id, "parent_message_id": None } return jsonify({ "session_key": session_key, "chat_session_id": session_id }) else: return jsonify({"error": "Failed to create session"}), 500 @app.route('/v1/sessions', methods=['DELETE']) def clear_sessions(): """Clear all cached sessions""" chat_sessions_cache.clear() return jsonify({"status": "cleared"}) @app.route('/v1/models', methods=['GET']) def list_models(): """OpenAI-compatible models listing endpoint""" models = [ {"id": "openai/gpt-5.2", "object": "model", "owned_by": "openai"}, {"id": "google/gemini-3-pro-preview", "object": "model", "owned_by": "openai"}, {"id": "openai/gpt-4o", "object": "model", "owned_by": "openai"}, {"id": "anthropic/claude-opus-4-6", "object": "model", "owned_by": "anthropic"}, {"id": "anthropic/claude-sonnet-4.5", "object": "model", "owned_by": "anthropic"}, {"id": "anthropic/claude-haiku-4-5", "object": "model", "owned_by": "anthropic"}, ] return jsonify({ "object": "list", "data": models }) @app.route('/v1/models/', methods=['GET']) def get_model(model_id): """OpenAI-compatible model details endpoint""" return jsonify({ "id": model_id, "object": "model", "owned_by": model_id.split('/')[0] if '/' in model_id else "unknown" }) @app.route('/health', methods=['GET']) def health_check(): """Health check endpoint""" return jsonify({ "status": "healthy", "timestamp": int(time.time()), "active_sessions": len(chat_sessions_cache) }) @app.route('/debug/test-onyx', methods=['GET']) def test_onyx_connection(): """Test connection to Onyx API""" results = {} session_id = create_chat_session() results['create_session'] = { "success": session_id is not None, "session_id": session_id } return jsonify(results) # ============== File Upload API ============== @app.route('/v1/files', methods=['POST']) def upload_file(): """OpenAI-compatible file upload endpoint. Accepts multipart form data with: - file: The file to upload - purpose: The purpose of the file (e.g., 'assistants', 'vision', 'fine-tune') Also accepts JSON with base64 data: - file_data: base64-encoded file content - filename: name of the file - purpose: purpose string """ try: file_id = f"file-{uuid.uuid4().hex[:24]}" # Check if it's a multipart upload if 'file' in request.files: uploaded_file = request.files['file'] purpose = request.form.get('purpose', 'assistants') filename = uploaded_file.filename or f"upload_{uuid.uuid4().hex[:8]}" file_bytes = uploaded_file.read() media_type = uploaded_file.content_type or mimetypes.guess_type(filename)[0] or 'application/octet-stream' # Check if it's a JSON upload with base64 elif request.is_json: data = request.json purpose = data.get('purpose', 'assistants') filename = data.get('filename', f"upload_{uuid.uuid4().hex[:8]}") file_data = data.get('file_data', '') media_type = data.get('content_type', mimetypes.guess_type(filename)[0] or 'application/octet-stream') try: file_bytes = base64.b64decode(file_data) except Exception as e: return jsonify({"error": {"message": f"Invalid base64 file_data: {e}", "type": "invalid_request_error"}}), 400 else: return jsonify({"error": {"message": "No file provided. Use multipart form with 'file' field or JSON with 'file_data' base64 field.", "type": "invalid_request_error"}}), 400 file_size = len(file_bytes) print(f"Uploading file: {filename} ({file_size} bytes, {media_type})") # Try to upload to Onyx onyx_file_id = None upload_endpoints = [ f"{ONYX_BASE_URL}/api/chat/file", f"{ONYX_BASE_URL}/api/chat/upload-file", f"{ONYX_BASE_URL}/api/manage/upload-file", ] headers = { "Authorization": f"Bearer {ONYX_API_TOKEN}", } for url in upload_endpoints: try: files = { 'file': (filename, file_bytes, media_type) } resp = requests.post(url, files=files, headers=headers, timeout=60) if resp.status_code == 200: result = resp.json() onyx_file_id = result.get('file_id') or result.get('id') or result.get('document_id') print(f"Uploaded to Onyx: {onyx_file_id}") break elif resp.status_code == 404: continue else: print(f"Upload to {url} failed: {resp.status_code} - {resp.text}") continue except Exception as e: print(f"Upload error at {url}: {e}") continue # Store in local cache file_record = { "id": file_id, "object": "file", "bytes": file_size, "created_at": int(time.time()), "filename": filename, "purpose": purpose, "status": "processed", "status_details": None, "content_type": media_type, "onyx_file_id": onyx_file_id, } # Store file bytes for retrieval file_record["_data"] = base64.b64encode(file_bytes).decode('utf-8') files_cache[file_id] = file_record # Return OpenAI-compatible response (without internal _data) response = {k: v for k, v in file_record.items() if not k.startswith('_')} return jsonify(response), 200 except Exception as e: print(f"File upload error: {e}") return jsonify({"error": {"message": f"File upload failed: {str(e)}", "type": "api_error"}}), 500 @app.route('/v1/files', methods=['GET']) def list_files(): """OpenAI-compatible list files endpoint""" purpose = request.args.get('purpose', None) files_list = [] for fid, record in files_cache.items(): if purpose and record.get('purpose') != purpose: continue files_list.append({k: v for k, v in record.items() if not k.startswith('_')}) return jsonify({ "object": "list", "data": files_list }) @app.route('/v1/files/', methods=['GET']) def retrieve_file(file_id): """OpenAI-compatible retrieve file endpoint""" if file_id not in files_cache: return jsonify({"error": {"message": f"No file with id '{file_id}' found", "type": "invalid_request_error"}}), 404 record = files_cache[file_id] response = {k: v for k, v in record.items() if not k.startswith('_')} return jsonify(response) @app.route('/v1/files/', methods=['DELETE']) def delete_file(file_id): """OpenAI-compatible delete file endpoint""" if file_id not in files_cache: return jsonify({"error": {"message": f"No file with id '{file_id}' found", "type": "invalid_request_error"}}), 404 del files_cache[file_id] return jsonify({ "id": file_id, "object": "file", "deleted": True }) @app.route('/v1/files//content', methods=['GET']) def retrieve_file_content(file_id): """OpenAI-compatible retrieve file content endpoint""" if file_id not in files_cache: return jsonify({"error": {"message": f"No file with id '{file_id}' found", "type": "invalid_request_error"}}), 404 record = files_cache[file_id] b64_data = record.get('_data', '') if not b64_data: return jsonify({"error": {"message": "File content not available", "type": "api_error"}}), 404 file_bytes = base64.b64decode(b64_data) return Response( file_bytes, mimetype=record.get('content_type', 'application/octet-stream'), headers={ 'Content-Disposition': f'attachment; filename="{record.get("filename", "file")}"' } ) @app.route('/upload', methods=['POST']) def simple_upload(): """Simple file upload endpoint (non-OpenAI format) for direct uploads. Accepts multipart form data with a 'file' field. Optionally include 'chat_session_id' to associate with a session. """ if 'file' not in request.files: return jsonify({"error": "No file provided"}), 400 uploaded_file = request.files['file'] filename = uploaded_file.filename or f"upload_{uuid.uuid4().hex[:8]}" file_bytes = uploaded_file.read() media_type = uploaded_file.content_type or mimetypes.guess_type(filename)[0] or 'application/octet-stream' chat_session_id = request.form.get('chat_session_id', None) # Upload to Onyx if session provided onyx_result = None if chat_session_id: file_data = { "base64_data": base64.b64encode(file_bytes).decode('utf-8'), "media_type": media_type, "filename": filename, "type": "image" if media_type.startswith('image/') else "document" } onyx_result = upload_file_to_onyx(file_data, chat_session_id) # Store locally file_id = f"file-{uuid.uuid4().hex[:24]}" file_record = { "id": file_id, "filename": filename, "size": len(file_bytes), "content_type": media_type, "uploaded_at": int(time.time()), "onyx_descriptor": onyx_result, "_data": base64.b64encode(file_bytes).decode('utf-8') } files_cache[file_id] = file_record response = {k: v for k, v in file_record.items() if not k.startswith('_')} return jsonify(response), 200 # ============== Anthropic Messages API ============== @app.route('/v1/messages', methods=['POST']) def anthropic_messages(): """Anthropic Messages API compatible endpoint — used by Claude Code, with full tool support""" try: data = request.json print(f"[Anthropic] Received request: {json.dumps(data, indent=2)[:1000]}") except Exception as e: return jsonify({ "type": "error", "error": {"type": "invalid_request_error", "message": f"Invalid JSON: {e}"} }), 400 # Extract Anthropic parameters model = data.get('model', 'claude-opus-4-6') messages = data.get('messages', []) system_prompt = data.get('system', '') stream = data.get('stream', False) temperature = data.get('temperature', 0.7) max_tokens = data.get('max_tokens', 4096) tools = data.get('tools', None) has_tools = bool(tools) session_key = f"anthropic_{model}" if not messages: return jsonify({ "type": "error", "error": {"type": "invalid_request_error", "message": "messages is required"} }), 400 # Parse model if '/' not in model: full_model = f"anthropic/{model}" else: full_model = model model_provider, model_version = parse_model_string(full_model) model_provider = normalize_provider_name(model_provider) print(f"[Anthropic] Provider: {model_provider}, Version: {model_version}") if has_tools: print(f"[Anthropic] Tools provided: {[t.get('name', '?') for t in tools]}") # Get or create session session_info = get_or_create_session(session_key) if not session_info: return jsonify({ "type": "error", "error": {"type": "api_error", "message": "Failed to create chat session"} }), 500 # Build Onyx payload payload = build_anthropic_payload_from_messages( messages=messages, system_prompt=system_prompt, model_provider=model_provider, model_version=model_version, temperature=temperature, chat_session_id=session_info['session_id'], parent_message_id=session_info.get('parent_message_id'), stream=stream, tools=tools ) if stream: return Response( generate_anthropic_stream_events(payload, model, session_key, has_tools=has_tools), content_type='text/event-stream', headers={ 'Cache-Control': 'no-cache', 'Connection': 'keep-alive', 'X-Accel-Buffering': 'no' } ) else: response_data, status_code = collect_anthropic_full_response(payload, model, session_key, has_tools=has_tools) return jsonify(response_data), status_code @app.route('/', methods=['GET']) def root(): """Root endpoint with a premium Chat UI""" html = """ Onyx AI Proxy | Premium Chat
System Active
Welcome to the Onyx edge proxy. Upload files or images and I'll analyze them for you. How can I help today?
""" return render_template_string(html) # ============== Error Handlers ============== @app.errorhandler(404) def not_found(e): return jsonify({ "error": { "message": "Endpoint not found", "type": "invalid_request_error", "code": 404 } }), 404 @app.errorhandler(500) def server_error(e): return jsonify({ "error": { "message": f"Internal server error: {str(e)}", "type": "server_error", "code": 500 } }), 500 if __name__ == '__main__': print("=" * 60) print("OpenAI + Anthropic Compatible Onyx API Proxy v3.1") print("Full Function/Tool Calling + File Upload Support") print("=" * 60) print(f"Onyx Base URL: {ONYX_BASE_URL}") print(f"Token configured: {'Yes' if ONYX_API_TOKEN != '' else 'No'}") print("=" * 60) app.run( host='0.0.0.0', port=7860, debug=True, threaded=True )