# app.py — OpenAI/Anthropic-compatible proxy in front of the Onyx chat API.
# (Imported from Hugging Face Space "pr-1", revision ad3b7a8.)
from flask import Flask, request, Response, jsonify, render_template_string
import requests
import json
import uuid
import time
import os
import re
import base64
import tempfile
import mimetypes
app = Flask(__name__)
# Configuration
ONYX_BASE_URL = os.environ.get("ONYX_BASE_URL", "https://cloud.onyx.app")
ONYX_API_TOKEN = os.environ.get("ONYX_SECRET", "<your-token-here>")
# Store chat sessions
chat_sessions_cache = {}
# Store uploaded files metadata
files_cache = {}
def get_headers():
"""Get authorization headers"""
return {
"Authorization": f"Bearer {ONYX_API_TOKEN}",
"Content-Type": "application/json"
}
def create_chat_session(persona_id=0):
"""Create a new chat session in Onyx"""
url = f"{ONYX_BASE_URL}/api/chat/create-chat-session"
payload = {
"persona_id": persona_id,
"description": "OpenAI Compatible API Session"
}
try:
response = requests.post(url, json=payload, headers=get_headers(), timeout=30)
if response.status_code == 200:
data = response.json()
session_id = data.get('chat_session_id') or data.get('id') or data
print(f"Created chat session: {session_id}")
return str(session_id)
else:
print(f"Failed to create chat session: {response.status_code} - {response.text}")
return None
except Exception as e:
print(f"Error creating chat session: {e}")
return None
def get_or_create_session(session_key="default", persona_id=0):
"""Get existing session or create new one"""
if session_key not in chat_sessions_cache:
session_id = create_chat_session(persona_id)
if session_id:
chat_sessions_cache[session_key] = {
"session_id": session_id,
"parent_message_id": None
}
return chat_sessions_cache.get(session_key)
def parse_model_string(model):
"""
Parse model string in format 'provider/model_version'
Examples:
- 'openai/gpt-4' -> ('openai', 'gpt-4')
- 'anthropic/claude-3-opus' -> ('anthropic', 'claude-3-opus')
- 'gpt-4' -> ('openai', 'gpt-4')
"""
if '/' in model:
parts = model.split('/', 1)
return parts[0], parts[1]
elif ':' in model:
parts = model.split(':', 1)
return parts[0], parts[1]
else:
return "openai", model
# Known provider name mappings
PROVIDER_ALIASES = {
"openai": "OpenAI",
"anthropic": "Anthropic",
"google": "Google",
"azure": "Azure",
"bedrock": "Bedrock",
"cohere": "Cohere",
"mistral": "Mistral",
}
def normalize_provider_name(provider):
"""Normalize provider name to match Onyx configuration."""
provider_lower = provider.lower().strip()
return PROVIDER_ALIASES.get(provider_lower, provider_lower)
# ============== Image Upload Support ==============
def extract_files_from_messages(messages, msg_format="openai"):
"""
Extract images and other files from message content blocks.
Supports:
- OpenAI image_url (data or http or file-id)
- Anthropic image (base64 or url or file-id)
- Custom type: "file" with "file_id"
- Automatic detection of file IDs in text if they exist in files_cache
Returns:
list of dicts: [{"base64_data": str, "media_type": str, "filename": str, "onyx_file_id": str, "type": str}, ...]
"""
files = []
for msg in messages:
content = msg.get('content', '')
# Handle string content: scan for file-ids
if isinstance(content, str):
# Simple regex for file- followed by hex characters
matches = re.findall(r'(file-[a-f0-9]{24})', content)
for file_id in matches:
if file_id in files_cache:
record = files_cache[file_id]
mime = record.get('content_type', 'application/octet-stream')
is_image = mime.startswith('image/')
files.append({
"base64_data": record.get('_data', ''),
"media_type": mime,
"filename": record.get('filename', f"file_{uuid.uuid4().hex[:8]}"),
"onyx_file_id": record.get('onyx_file_id'),
"type": "image" if is_image else "document"
})
continue
if not isinstance(content, list):
continue
for block in content:
if not isinstance(block, dict):
continue
block_type = block.get('type')
# OpenAI format: image_url
if block_type == 'image_url':
image_url_obj = block.get('image_url', {})
url = image_url_obj.get('url', '')
# Check if it's a file ID from our cache
if url in files_cache:
record = files_cache[url]
files.append({
"base64_data": record.get('_data', ''),
"media_type": record.get('content_type', 'image/png'),
"filename": record.get('filename', f"file_{uuid.uuid4().hex[:8]}"),
"onyx_file_id": record.get('onyx_file_id'),
"type": "image"
})
elif url.startswith('data:'):
try:
header, b64_data = url.split(',', 1)
media_type = header.split(':')[1].split(';')[0]
ext = media_type.split('/')[-1]
if ext == 'jpeg': ext = 'jpg'
files.append({
"base64_data": b64_data,
"media_type": media_type,
"filename": f"image_{uuid.uuid4().hex[:8]}.{ext}",
"type": "image"
})
except Exception as e:
print(f"Failed to parse data URL: {e}")
elif url.startswith('http'):
try:
resp = requests.get(url, timeout=30)
if resp.status_code == 200:
content_type = resp.headers.get('content-type', 'image/png')
media_type = content_type.split(';')[0].strip()
ext = media_type.split('/')[-1]
if ext == 'jpeg': ext = 'jpg'
b64_data = base64.b64encode(resp.content).decode('utf-8')
files.append({
"base64_data": b64_data,
"media_type": media_type,
"filename": f"image_{uuid.uuid4().hex[:8]}.{ext}",
"type": "image"
})
except Exception as e:
print(f"Failed to download image: {e}")
# Anthropic format: image
elif block_type == 'image':
source = block.get('source', {})
if source.get('type') == 'base64':
media_type = source.get('media_type', 'image/png')
b64_data = source.get('data', '')
ext = media_type.split('/')[-1]
if ext == 'jpeg': ext = 'jpg'
files.append({
"base64_data": b64_data,
"media_type": media_type,
"filename": f"image_{uuid.uuid4().hex[:8]}.{ext}",
"type": "image"
})
elif source.get('type') == 'url':
url = source.get('url', '')
if url in files_cache:
record = files_cache[url]
files.append({
"base64_data": record.get('_data', ''),
"media_type": record.get('content_type', 'image/png'),
"filename": record.get('filename', f"file_{uuid.uuid4().hex[:8]}"),
"onyx_file_id": record.get('onyx_file_id'),
"type": "image"
})
elif url.startswith('http'):
try:
resp = requests.get(url, timeout=30)
if resp.status_code == 200:
content_type = resp.headers.get('content-type', 'image/png')
media_type = content_type.split(';')[0].strip()
ext = media_type.split('/')[-1]
if ext == 'jpeg': ext = 'jpg'
b64_data = base64.b64encode(resp.content).decode('utf-8')
files.append({
"base64_data": b64_data,
"media_type": media_type,
"filename": f"image_{uuid.uuid4().hex[:8]}.{ext}",
"type": "image"
})
except Exception as e:
print(f"Failed to download image: {e}")
# Generic file/document type
elif block_type in ['file', 'document', 'attachment']:
file_id = block.get('file_id') or block.get('id')
if file_id and file_id in files_cache:
record = files_cache[file_id]
mime = record.get('content_type', 'application/octet-stream')
is_image = mime.startswith('image/')
files.append({
"base64_data": record.get('_data', ''),
"media_type": mime,
"filename": record.get('filename', f"file_{uuid.uuid4().hex[:8]}"),
"onyx_file_id": record.get('onyx_file_id'),
"type": "image" if is_image else "document"
})
# Support text scan inside content blocks too
elif block_type == 'text':
text = block.get('text', '')
matches = re.findall(r'(file-[a-f0-9]{24})', text)
for file_id in matches:
if file_id in files_cache:
record = files_cache[file_id]
mime = record.get('content_type', 'application/octet-stream')
is_image = mime.startswith('image/')
files.append({
"base64_data": record.get('_data', ''),
"media_type": mime,
"filename": record.get('filename', f"file_{uuid.uuid4().hex[:8]}"),
"onyx_file_id": record.get('onyx_file_id'),
"type": "image" if is_image else "document"
})
return files
def upload_file_to_onyx(file_data, chat_session_id):
"""
Upload an image or document to Onyx's file upload API.
Args:
file_data: dict with base64_data, media_type, filename, onyx_file_id, type
chat_session_id: The chat session to associate the file with
Returns:
dict: File descriptor from Onyx, or None on failure
"""
# If we already have an Onyx file ID, just return the descriptor
if file_data.get('onyx_file_id'):
print(f"Using existing Onyx file ID: {file_data['onyx_file_id']}")
return {
"id": file_data['onyx_file_id'],
"type": file_data.get('type', 'document'),
"name": file_data.get('filename', 'file'),
}
# Decode base64 to binary
try:
file_bytes = base64.b64decode(file_data['base64_data'])
except Exception as e:
print(f"Failed to decode base64 file: {e}")
return None
# Try multiple Onyx file upload endpoints
upload_endpoints = [
f"{ONYX_BASE_URL}/api/chat/file",
f"{ONYX_BASE_URL}/api/chat/upload-file",
f"{ONYX_BASE_URL}/api/manage/upload-file",
]
headers = {
"Authorization": f"Bearer {ONYX_API_TOKEN}",
}
for url in upload_endpoints:
try:
# Upload as multipart form data
files = {
'file': (file_data['filename'], file_bytes, file_data['media_type'])
}
form_data = {
'chat_session_id': chat_session_id
}
print(f"Uploading file to: {url}")
response = requests.post(
url,
files=files,
data=form_data,
headers=headers,
timeout=60
)
print(f"Upload response: {response.status_code}")
if response.status_code == 200:
result = response.json()
print(f"Upload success: {result}")
# Normalize the file descriptor format
f_id = result.get('file_id') or result.get('id') or result.get('document_id', '')
file_descriptor = {
"id": f_id,
"type": file_data.get('type', 'document'),
"name": file_data['filename'],
}
# If the response includes the full descriptor, use it
if isinstance(result, dict):
for key in ['file_id', 'id', 'document_id', 'name', 'type']:
if key in result and result[key]:
file_descriptor[key] = result[key]
return file_descriptor
elif response.status_code == 404:
continue # Try next endpoint
else:
print(f"Upload failed: {response.status_code} - {response.text}")
continue
except Exception as e:
print(f"Upload error at {url}: {e}")
continue
print("All upload endpoints failed")
return None
def upload_files_for_session(files, chat_session_id):
"""
Upload multiple files and return file descriptors.
"""
file_descriptors = []
# Remove duplicates
seen_ids = set()
unique_files = []
for f in files:
f_id = f.get('onyx_file_id') or f.get('base64_data')[:100]
if f_id not in seen_ids:
seen_ids.add(f_id)
unique_files.append(f)
for file_data in unique_files:
descriptor = upload_file_to_onyx(file_data, chat_session_id)
if descriptor:
file_descriptors.append(descriptor)
else:
print(f"Warning: Failed to upload file {file_data.get('filename', '?')}")
return file_descriptors
# ============== Tool Calling Support ==============
def build_tools_system_prompt(tools, tool_format="anthropic"):
"""
Build a system prompt injection that teaches the model how to call tools.
The model will output structured JSON when it wants to call a tool.
Args:
tools: List of tool definitions (Anthropic or OpenAI format)
tool_format: "anthropic" or "openai"
Returns:
str: System prompt text with tool instructions
"""
if not tools:
return ""
tools_list = []
for tool in tools:
if tool_format == "openai":
# OpenAI format: {"type": "function", "function": {"name": ..., "description": ..., "parameters": ...}}
func = tool.get('function', tool)
tools_list.append({
"name": func.get('name', ''),
"description": func.get('description', ''),
"parameters": func.get('parameters', func.get('input_schema', {}))
})
else:
# Anthropic format: {"name": ..., "description": ..., "input_schema": ...}
tools_list.append({
"name": tool.get('name', ''),
"description": tool.get('description', ''),
"parameters": tool.get('input_schema', tool.get('parameters', {}))
})
tools_json = json.dumps(tools_list, indent=2)
prompt = f"""You have access to the following tools/functions that you can call to help accomplish your task:
<available_tools>
{tools_json}
</available_tools>
CRITICAL INSTRUCTIONS FOR TOOL CALLING:
1. When you need to use a tool, you MUST output a JSON block wrapped in <tool_call> tags.
2. You can call MULTIPLE tools in a single response by outputting multiple <tool_call> blocks.
3. The format for each tool call MUST be exactly:
<tool_call>
{{"name": "tool_name", "id": "unique_call_id", "input": {{"param1": "value1", "param2": "value2"}}}}
</tool_call>
4. The "id" should be a unique identifier like "call_1", "call_2", etc.
5. The "input" must match the parameter schema defined for the tool.
6. You can mix text and tool calls in your response. Text before/after tool calls is fine.
7. After a tool is executed, you will receive the result and can continue your work.
8. You MUST use the tools when they are needed. Do NOT say you cannot access tools.
9. ALL listed tools are available to you and you MUST use them when needed.
10. Do NOT refuse to call a tool by saying you don't have access or capability.
IMPORTANT: You have FULL access to ALL the tools listed above. Use them freely whenever needed."""
return prompt
def parse_tool_calls_from_response(response_text):
"""
Parse tool call blocks from the model's response text.
Looks for patterns like:
<tool_call>
{"name": "...", "id": "...", "input": {...}}
</tool_call>
Also handles common variations the model might output.
Returns:
tuple: (text_content, list_of_tool_calls)
- text_content: The text parts with tool_call blocks removed
- list_of_tool_calls: List of parsed tool call dicts
"""
tool_calls = []
text_parts = []
# Pattern 1: <tool_call>...</tool_call>
pattern = r'<tool_call>\s*(.*?)\s*</tool_call>'
matches = list(re.finditer(pattern, response_text, re.DOTALL))
if matches:
last_end = 0
for match in matches:
# Capture text before this tool call
before_text = response_text[last_end:match.start()].strip()
if before_text:
text_parts.append(before_text)
last_end = match.end()
try:
tool_data = json.loads(match.group(1).strip())
tool_call = {
"name": tool_data.get("name", ""),
"id": tool_data.get("id", f"toolu_{uuid.uuid4().hex[:24]}"),
"input": tool_data.get("input", tool_data.get("arguments", tool_data.get("parameters", {})))
}
tool_calls.append(tool_call)
except json.JSONDecodeError as e:
print(f"Failed to parse tool call JSON: {e}")
text_parts.append(match.group(0)) # Keep unparseable block as text
# Capture text after last tool call
after_text = response_text[last_end:].strip()
if after_text:
text_parts.append(after_text)
else:
# Pattern 2: Try ```tool_call ... ``` or ```json with tool calling structure
code_pattern = r'```(?:tool_call|json)?\s*(\{[^`]*?"name"\s*:\s*"[^"]+?"[^`]*?\})\s*```'
code_matches = list(re.finditer(code_pattern, response_text, re.DOTALL))
if code_matches:
last_end = 0
for match in code_matches:
before_text = response_text[last_end:match.start()].strip()
if before_text:
text_parts.append(before_text)
last_end = match.end()
try:
tool_data = json.loads(match.group(1).strip())
if "name" in tool_data and ("input" in tool_data or "arguments" in tool_data or "parameters" in tool_data):
tool_call = {
"name": tool_data.get("name", ""),
"id": tool_data.get("id", f"toolu_{uuid.uuid4().hex[:24]}"),
"input": tool_data.get("input", tool_data.get("arguments", tool_data.get("parameters", {})))
}
tool_calls.append(tool_call)
else:
text_parts.append(match.group(0))
except json.JSONDecodeError:
text_parts.append(match.group(0))
after_text = response_text[last_end:].strip()
if after_text:
text_parts.append(after_text)
else:
# No tool calls found
text_parts.append(response_text)
clean_text = "\n\n".join(text_parts).strip()
return clean_text, tool_calls
def convert_tool_results_to_text(messages):
"""
Convert tool_result messages back into readable text for the model.
Anthropic sends tool results as:
{"role": "user", "content": [{"type": "tool_result", "tool_use_id": "...", "content": "..."}]}
OpenAI sends tool results as:
{"role": "tool", "tool_call_id": "...", "content": "..."}
We convert these into human-readable text the model can understand.
"""
converted = []
for msg in messages:
role = msg.get('role', '')
content = msg.get('content', '')
# Handle Anthropic tool_result
if role == 'user' and isinstance(content, list):
text_parts = []
tool_results = []
for block in content:
if isinstance(block, dict):
if block.get('type') == 'tool_result':
tool_use_id = block.get('tool_use_id', '')
result_content = block.get('content', '')
is_error = block.get('is_error', False)
# Handle content that is a list of blocks
if isinstance(result_content, list):
result_text = ""
for rc in result_content:
if isinstance(rc, dict) and rc.get('type') == 'text':
result_text += rc.get('text', '')
elif isinstance(rc, str):
result_text += rc
result_content = result_text
status = "ERROR" if is_error else "SUCCESS"
tool_results.append(f"<tool_result id=\"{tool_use_id}\" status=\"{status}\">\n{result_content}\n</tool_result>")
elif block.get('type') == 'text':
text_parts.append(block.get('text', ''))
elif isinstance(block, str):
text_parts.append(block)
if tool_results:
combined = "\n\n".join(tool_results)
if text_parts:
combined += "\n\n" + "\n".join(text_parts)
converted.append({
"role": "user",
"content": combined
})
else:
converted.append(msg)
# Handle OpenAI tool role
elif role == 'tool':
tool_call_id = msg.get('tool_call_id', '')
converted.append({
"role": "user",
"content": f"<tool_result id=\"{tool_call_id}\" status=\"SUCCESS\">\n{content}\n</tool_result>"
})
# Handle assistant messages with tool_calls (OpenAI format)
elif role == 'assistant' and msg.get('tool_calls'):
# Reconstruct what the assistant said + the tool calls it made
assistant_text = ""
if isinstance(content, str) and content:
assistant_text = content + "\n\n"
for tc in msg.get('tool_calls', []):
func = tc.get('function', {})
tc_id = tc.get('id', '')
try:
args = json.loads(func.get('arguments', '{}'))
except (json.JSONDecodeError, TypeError):
args = func.get('arguments', {})
tool_call_json = json.dumps({
"name": func.get('name', ''),
"id": tc_id,
"input": args
})
assistant_text += f"<tool_call>\n{tool_call_json}\n</tool_call>\n\n"
converted.append({
"role": "assistant",
"content": assistant_text.strip()
})
# Handle assistant messages with tool_use content blocks (Anthropic format)
elif role == 'assistant' and isinstance(content, list):
assistant_text = ""
for block in content:
if isinstance(block, dict):
if block.get('type') == 'text':
assistant_text += block.get('text', '') + "\n\n"
elif block.get('type') == 'tool_use':
tool_call_json = json.dumps({
"name": block.get('name', ''),
"id": block.get('id', ''),
"input": block.get('input', {})
})
assistant_text += f"<tool_call>\n{tool_call_json}\n</tool_call>\n\n"
converted.append({
"role": "assistant",
"content": assistant_text.strip()
})
else:
converted.append(msg)
return converted
# ============== Payload Builders ==============
def build_onyx_payload(messages, model_provider, model_version, temperature, chat_session_id, parent_message_id=None, stream=True, tools=None):
"""Convert OpenAI format to Onyx payload with tool + image support"""
# Extract files from messages BEFORE converting tool results
found_files = extract_files_from_messages(messages, msg_format="openai")
file_descriptors = []
if found_files:
print(f"Found {len(found_files)} file(s) in OpenAI messages, uploading...")
file_descriptors = upload_files_for_session(found_files, chat_session_id)
print(f"Successfully uploaded {len(file_descriptors)} file(s)")
# Convert tool results in message history to text format
processed_messages = convert_tool_results_to_text(messages)
# Extract the last user message
last_user_message = ""
for msg in reversed(processed_messages):
if msg.get('role') == 'user':
content = msg.get('content', '')
if isinstance(content, list):
text_parts = [p.get('text', '') for p in content if isinstance(p, dict) and p.get('type') == 'text']
last_user_message = ' '.join(text_parts)
else:
last_user_message = content
break
# Build system prompt from system messages
system_prompt = ""
for msg in processed_messages:
if msg.get('role') == 'system':
content = msg.get('content', '')
if isinstance(content, list):
text_parts = [p.get('text', '') for p in content if isinstance(p, dict) and p.get('type') == 'text']
system_prompt += ' '.join(text_parts) + "\n"
elif isinstance(content, str):
system_prompt += content + "\n"
# Build conversation history (all messages except last user message)
history_text = ""
for msg in processed_messages[:-1] if processed_messages else []:
role = msg.get('role', '')
content = msg.get('content', '')
if role == 'system':
continue # Already handled
if isinstance(content, list):
text_parts = []
for p in content:
if isinstance(p, dict) and p.get('type') == 'text':
text_parts.append(p.get('text', ''))
content = ' '.join(text_parts)
if content:
history_text += f"[{role}]: {content}\n\n"
# Inject tool definitions into the system prompt
tools_prompt = build_tools_system_prompt(tools, tool_format="openai") if tools else ""
# If files were uploaded, add a note to the message
file_note = ""
if file_descriptors:
file_names = ", ".join([f.get('name', 'file') for f in file_descriptors])
file_note = f"\n[Note: {len(file_descriptors)} file(s) ({file_names}) have been attached to this message. Please analyze them as requested.]\n"
# Construct the full message
full_message = last_user_message
prefix_parts = []
if system_prompt.strip():
prefix_parts.append(f"[System Instructions]\n{system_prompt.strip()}")
if tools_prompt:
prefix_parts.append(tools_prompt)
if history_text.strip():
prefix_parts.append(f"[Conversation History]\n{history_text.strip()}")
if prefix_parts:
full_message = "\n\n".join(prefix_parts) + f"\n\n[Current User Message]\n{last_user_message}"
if file_note:
full_message += file_note
payload = {
"message": full_message,
"chat_session_id": chat_session_id,
"parent_message_id": parent_message_id if parent_message_id else None,
"stream": stream,
"llm_override": {
"model_provider": model_provider,
"model_version": model_version,
"temperature": temperature
},
"file_descriptors": file_descriptors,
"include_citations": False
}
# Remove keys with None values — Onyx rejects/errors on null parent_message_id
payload = {k: v for k, v in payload.items() if v is not None}
return payload
def build_anthropic_payload_from_messages(messages, system_prompt, model_provider, model_version, temperature, chat_session_id, parent_message_id=None, stream=True, tools=None):
"""Convert Anthropic Messages API format to Onyx payload with full tool + image support"""
# Extract files from messages BEFORE converting tool results
found_files = extract_files_from_messages(messages, msg_format="anthropic")
file_descriptors = []
if found_files:
print(f"Found {len(found_files)} file(s) in Anthropic messages, uploading...")
file_descriptors = upload_files_for_session(found_files, chat_session_id)
print(f"Successfully uploaded {len(file_descriptors)} file(s)")
# Convert tool results in message history to text format
processed_messages = convert_tool_results_to_text(messages)
# Extract the last user message
last_user_message = ""
for msg in reversed(processed_messages):
if msg.get('role') == 'user':
content = msg.get('content', '')
if isinstance(content, list):
text_parts = [p.get('text', '') for p in content if isinstance(p, dict) and p.get('type') == 'text']
last_user_message = ' '.join(text_parts)
elif isinstance(content, str):
last_user_message = content
break
# Process system prompt
sys_text = ""
if system_prompt:
if isinstance(system_prompt, list):
sys_text = ' '.join([s.get('text', '') for s in system_prompt if isinstance(s, dict) and s.get('type') == 'text'])
else:
sys_text = system_prompt
# Build conversation history (all messages except last user)
history_text = ""
for msg in processed_messages[:-1] if processed_messages else []:
role = msg.get('role', '')
content = msg.get('content', '')
if isinstance(content, list):
text_parts = []
for p in content:
if isinstance(p, dict) and p.get('type') == 'text':
text_parts.append(p.get('text', ''))
content = ' '.join(text_parts)
if content:
history_text += f"[{role}]: {content}\n\n"
# Build tool prompt
tools_prompt = build_tools_system_prompt(tools, tool_format="anthropic") if tools else ""
# If files were uploaded, add a note
file_note = ""
if file_descriptors:
file_names = ", ".join([f.get('name', 'file') for f in file_descriptors])
file_note = f"\n[Note: {len(file_descriptors)} file(s) ({file_names}) have been attached to this message. Please analyze them as requested.]\n"
# Construct full message
full_message = last_user_message
prefix_parts = []
if sys_text.strip():
prefix_parts.append(f"[System Instructions]\n{sys_text.strip()}")
if tools_prompt:
prefix_parts.append(tools_prompt)
if history_text.strip():
prefix_parts.append(f"[Conversation History]\n{history_text.strip()}")
if prefix_parts:
full_message = "\n\n".join(prefix_parts) + f"\n\n[Current User Message]\n{last_user_message}"
if file_note:
full_message += file_note
payload = {
"message": full_message,
"chat_session_id": chat_session_id,
"parent_message_id": parent_message_id if parent_message_id else None,
"stream": stream,
"llm_override": {
"model_provider": model_provider,
"model_version": model_version,
"temperature": temperature
},
"file_descriptors": file_descriptors,
"include_citations": False
}
# Remove keys with None values — Onyx rejects/errors on null parent_message_id
payload = {k: v for k, v in payload.items() if v is not None}
return payload
# ============== Response Parsers ==============
def parse_onyx_stream_chunk(chunk_text):
"""Parse a chunk from Onyx stream and extract the text content.
Returns:
tuple: (content, message_id, packet_type)
"""
if not chunk_text or not chunk_text.strip():
return None, None, None
try:
data = json.loads(chunk_text)
if not isinstance(data, dict):
return None, None, None
# Handle new packet-based format
if 'obj' in data:
obj = data['obj']
packet_type = obj.get('type', '')
if packet_type == 'message_delta':
content = obj.get('content', '')
return content, None, 'content'
elif packet_type == 'message_start':
return None, None, 'message_start'
elif packet_type == 'stop':
return None, None, 'stop'
elif packet_type == 'error':
error_msg = obj.get('message', obj.get('error', 'Unknown error'))
return f"[Error: {error_msg}]", None, 'error'
elif packet_type == 'citation_delta':
return None, None, 'citation'
elif packet_type in ['reasoning_start', 'reasoning_delta', 'reasoning_done']:
return None, None, 'reasoning'
else:
return None, None, packet_type
# FALLBACK: Old format
message_id = data.get('message_id')
if 'answer_piece' in data:
return data['answer_piece'], message_id, 'legacy'
elif 'text' in data:
return data['text'], message_id, 'legacy'
elif 'content' in data and isinstance(data['content'], str):
return data['content'], message_id, 'legacy'
elif 'error' in data:
return f"[Error: {data['error']}]", message_id, 'error'
return None, None, None
except json.JSONDecodeError:
if chunk_text.strip() and not chunk_text.strip().startswith('{'):
return chunk_text.strip(), None, 'raw'
return None, None, None
# ============== OpenAI Format Streaming & Collection ==============
def generate_openai_stream_chunk(content, model, chunk_id, finish_reason=None, tool_calls=None):
"""Generate an OpenAI-compatible SSE chunk, with optional tool_calls"""
choice = {
"index": 0,
"delta": {},
"finish_reason": finish_reason
}
if content is not None:
choice["delta"]["content"] = content
if tool_calls is not None:
choice["delta"]["tool_calls"] = tool_calls
chunk = {
"id": chunk_id,
"object": "chat.completion.chunk",
"created": int(time.time()),
"model": model,
"choices": [choice]
}
return f"data: {json.dumps(chunk)}\n\n"
def stream_onyx_response(payload, model, session_key, has_tools=False):
"""Stream response from Onyx API in OpenAI SSE format"""
final_message_id = None
chunk_id = f"chatcmpl-{uuid.uuid4().hex[:24]}"
full_content_buffer = "" # Buffer to detect tool calls
endpoints = [
f"{ONYX_BASE_URL}/api/chat/send-chat-message",
f"{ONYX_BASE_URL}/api/chat/send-message",
]
# Initial assistant role chunk
initial_chunk = {
"id": chunk_id,
"object": "chat.completion.chunk",
"created": int(time.time()),
"model": model,
"choices": [{
"index": 0,
"delta": {"role": "assistant"},
"finish_reason": None
}]
}
if not has_tools:
# No tools = simple streaming
yield f"data: {json.dumps(initial_chunk)}\n\n"
last_message_id = None
for url in endpoints:
try:
print(f"Trying endpoint: {url}")
with requests.post(url, json=payload, headers=get_headers(), stream=True, timeout=120) as response:
print(f"Response status: {response.status_code}")
if response.status_code != 200:
continue
buffer = ""
for chunk in response.iter_content(decode_unicode=True):
if not chunk:
continue
buffer += chunk
while '\n' in buffer:
line, buffer = buffer.split('\n', 1)
line = line.strip()
if not line or line == "[DONE]":
continue
if line.startswith("data: "):
line = line[6:]
content, msg_id, packet_type = parse_onyx_stream_chunk(line)
if msg_id:
last_message_id = msg_id
if content and packet_type in ['content', 'legacy', 'raw', 'error']:
if has_tools:
# Buffer content for tool call detection
full_content_buffer += content
else:
yield generate_openai_stream_chunk(content, model, chunk_id)
if packet_type == "stop":
final_message_id = last_message_id
break
break
except Exception as e:
print("Stream error:", e)
continue
# Update session
if final_message_id and session_key in chat_sessions_cache:
chat_sessions_cache[session_key]["parent_message_id"] = final_message_id
# If we have tools, parse the buffered content for tool calls
if has_tools and full_content_buffer:
text_content, tool_calls = parse_tool_calls_from_response(full_content_buffer)
if tool_calls:
# Emit initial chunk
yield f"data: {json.dumps(initial_chunk)}\n\n"
# Emit text content if any
if text_content:
yield generate_openai_stream_chunk(text_content, model, chunk_id)
# Emit tool call chunks
for idx, tc in enumerate(tool_calls):
tc_chunk = [{
"index": idx,
"id": tc["id"],
"type": "function",
"function": {
"name": tc["name"],
"arguments": json.dumps(tc["input"])
}
}]
yield generate_openai_stream_chunk(None, model, chunk_id, tool_calls=tc_chunk)
yield generate_openai_stream_chunk("", model, chunk_id, "tool_calls")
else:
# No tool calls detected, emit as normal text
yield f"data: {json.dumps(initial_chunk)}\n\n"
yield generate_openai_stream_chunk(full_content_buffer, model, chunk_id)
yield generate_openai_stream_chunk("", model, chunk_id, "stop")
else:
yield generate_openai_stream_chunk("", model, chunk_id, "stop")
yield "data: [DONE]\n\n"
def collect_full_response(payload, model, session_key, has_tools=False):
    """Collect full streaming response and return as complete OpenAI response.

    Tries each known Onyx send endpoint in order until one answers.  Handles
    both plain-JSON (non-streaming) and SSE (streaming) upstream bodies, then
    wraps the accumulated answer text in an OpenAI ``chat.completion`` object.

    Args:
        payload: Onyx request body; its own 'stream' flag decides how the
            upstream response is read.
        model: Model string echoed back in the OpenAI response envelope.
        session_key: Key into chat_sessions_cache; the final message id is
            stored there so follow-up turns can thread correctly.
        has_tools: When True, scan the answer text for tool-call markup via
            parse_tool_calls_from_response.

    Returns:
        (response_dict, http_status) tuple; (error_dict, 500) when no
        endpoint produced any content.
    """
    full_content = ""
    last_message_id = None
    # Onyx has exposed the send endpoint under two paths; try both in order.
    endpoints = [
        f"{ONYX_BASE_URL}/api/chat/send-chat-message",
        f"{ONYX_BASE_URL}/api/chat/send-message",
    ]
    for url in endpoints:
        try:
            print(f"Trying endpoint: {url}")
            is_streaming_request = payload.get('stream', False)
            with requests.post(url, json=payload, headers=get_headers(), stream=is_streaming_request, timeout=120) as response:
                print(f"Response status: {response.status_code}")
                if response.status_code == 404:
                    # Endpoint doesn't exist on this deployment; try the next.
                    continue
                if response.status_code != 200:
                    error_text = response.text
                    print(f"Error response from {url}: {response.status_code} - {error_text}")
                    continue  # Try next endpoint
                if not is_streaming_request:
                    # Plain JSON body: take the first populated answer field.
                    try:
                        data = response.json()
                        full_content = data.get('answer') or data.get('message') or data.get('content') or ""
                        msg_id = data.get('message_id')
                        if session_key in chat_sessions_cache and msg_id:
                            chat_sessions_cache[session_key]['parent_message_id'] = msg_id
                        break
                    except json.JSONDecodeError:
                        # Not JSON after all — fall back to the raw body text.
                        full_content = response.text
                        break
                else:
                    # SSE body: accumulate raw chunks and feed each complete
                    # newline-terminated line through the Onyx packet parser.
                    buffer = ""
                    for chunk in response.iter_content(chunk_size=None, decode_unicode=True):
                        if chunk:
                            buffer += chunk
                            while '\n' in buffer:
                                line, buffer = buffer.split('\n', 1)
                                line = line.strip()
                                if not line:
                                    continue
                                if line.startswith('data: '):
                                    line = line[6:]  # strip the SSE "data: " prefix
                                if line == '[DONE]':
                                    continue
                                content, msg_id, packet_type = parse_onyx_stream_chunk(line)
                                if msg_id:
                                    last_message_id = msg_id
                                if packet_type == 'stop':
                                    # NOTE(review): this only exits the inner line
                                    # loop; iter_content keeps reading until the
                                    # upstream stream actually ends.
                                    break
                                if content and packet_type in ['content', 'legacy', 'raw', 'error']:
                                    full_content += content
                    # Flush any trailing partial line left in the buffer.
                    if buffer.strip():
                        if buffer.strip().startswith('data: '):
                            buffer = buffer.strip()[6:]
                        content, msg_id, packet_type = parse_onyx_stream_chunk(buffer.strip())
                        if msg_id:
                            last_message_id = msg_id
                        if content and packet_type in ['content', 'legacy', 'raw', 'error']:
                            full_content += content
                    if session_key in chat_sessions_cache and last_message_id:
                        chat_sessions_cache[session_key]['parent_message_id'] = last_message_id
                    break
        except requests.exceptions.RequestException as e:
            print(f"Request error: {e}")
            continue
    if not full_content:
        return {
            "error": {
                "message": "No response from Onyx API",
                "type": "api_error",
                "code": 500
            }
        }, 500
    # Parse for tool calls if tools were provided
    if has_tools:
        text_content, tool_calls = parse_tool_calls_from_response(full_content)
        if tool_calls:
            # Build response with tool_calls
            message = {
                "role": "assistant",
                "content": text_content if text_content else None,
                "tool_calls": []
            }
            for idx, tc in enumerate(tool_calls):
                message["tool_calls"].append({
                    "id": tc["id"],
                    "type": "function",
                    "function": {
                        "name": tc["name"],
                        "arguments": json.dumps(tc["input"])
                    }
                })
            response_data = {
                "id": f"chatcmpl-{uuid.uuid4().hex[:24]}",
                "object": "chat.completion",
                "created": int(time.time()),
                "model": model,
                "choices": [{
                    "index": 0,
                    "message": message,
                    "finish_reason": "tool_calls"
                }],
                # Onyx does not report token counts; -1 marks them as unknown.
                "usage": {
                    "prompt_tokens": -1,
                    "completion_tokens": -1,
                    "total_tokens": -1
                }
            }
            return response_data, 200
    # Standard text response
    response_data = {
        "id": f"chatcmpl-{uuid.uuid4().hex[:24]}",
        "object": "chat.completion",
        "created": int(time.time()),
        "model": model,
        "choices": [{
            "index": 0,
            "message": {
                "role": "assistant",
                "content": full_content
            },
            "finish_reason": "stop"
        }],
        "usage": {
            "prompt_tokens": -1,
            "completion_tokens": -1,
            "total_tokens": -1
        }
    }
    return response_data, 200
# ============== Anthropic Format Streaming & Collection ==============
def generate_anthropic_stream_events(payload, model, session_key, has_tools=False):
    """Stream response from Onyx in Anthropic Messages SSE format with tool support.

    Strategy: the Onyx stream is fully consumed into a local buffer first,
    then replayed to the client as a well-formed Anthropic SSE sequence
    (message_start, ping, content_block_start/delta/stop per block,
    message_delta, message_stop).  Text blocks are re-chunked locally so the
    client still sees incremental deltas.

    Args:
        payload: Onyx request body (sent to each known send endpoint in turn).
        model: Model name echoed in the message_start event.
        session_key: chat_sessions_cache key updated with the final message id.
        has_tools: When True, parse the buffered answer for tool-call markup.

    Yields:
        SSE event strings ("event: ...\\ndata: ...\\n\\n").
    """
    msg_id = f"msg_{uuid.uuid4().hex[:24]}"
    final_message_id = None
    full_content_buffer = ""
    # Onyx has exposed the send endpoint under two paths; try both in order.
    endpoints = [
        f"{ONYX_BASE_URL}/api/chat/send-chat-message",
        f"{ONYX_BASE_URL}/api/chat/send-message",
    ]
    last_msg_id = None
    for url in endpoints:
        try:
            with requests.post(url, json=payload, headers=get_headers(), stream=True, timeout=120) as response:
                if response.status_code != 200:
                    continue
                # Accumulate raw chunks; process each complete line through
                # the Onyx packet parser.
                buffer = ""
                for chunk in response.iter_content(decode_unicode=True):
                    if not chunk:
                        continue
                    buffer += chunk
                    while '\n' in buffer:
                        line, buffer = buffer.split('\n', 1)
                        line = line.strip()
                        if not line or line == "[DONE]":
                            continue
                        if line.startswith("data: "):
                            line = line[6:]  # strip the SSE "data: " prefix
                        content, m_id, packet_type = parse_onyx_stream_chunk(line)
                        if m_id:
                            last_msg_id = m_id
                        if content and packet_type in ['content', 'legacy', 'raw', 'error']:
                            full_content_buffer += content
                        if packet_type == "stop":
                            final_message_id = last_msg_id
                            # NOTE(review): this only exits the inner line loop;
                            # reading continues until the upstream stream ends.
                            break
                break
        except Exception as e:
            print(f"Anthropic stream error: {e}")
            continue
    # Update session
    if final_message_id and session_key in chat_sessions_cache:
        chat_sessions_cache[session_key]["parent_message_id"] = final_message_id
    # Now build the proper Anthropic SSE response
    text_content, tool_calls = ("", [])
    if has_tools and full_content_buffer:
        text_content, tool_calls = parse_tool_calls_from_response(full_content_buffer)
    else:
        text_content = full_content_buffer
    # Determine stop reason
    stop_reason = "tool_use" if tool_calls else "end_turn"
    # Build content blocks
    content_blocks = []
    if text_content:
        content_blocks.append({"type": "text", "text": text_content})
    for tc in tool_calls:
        content_blocks.append({
            "type": "tool_use",
            "id": tc["id"],
            "name": tc["name"],
            "input": tc["input"]
        })
    # If no content at all, add empty text
    if not content_blocks:
        content_blocks.append({"type": "text", "text": ""})
    # message_start
    msg_start = {
        "type": "message_start",
        "message": {
            "id": msg_id,
            "type": "message",
            "role": "assistant",
            "content": [],
            "model": model,
            "stop_reason": None,
            "stop_sequence": None,
            "usage": {"input_tokens": 0, "output_tokens": 0}
        }
    }
    yield f"event: message_start\ndata: {json.dumps(msg_start)}\n\n"
    # Ping
    yield f"event: ping\ndata: {json.dumps({'type': 'ping'})}\n\n"
    # Emit content blocks
    block_index = 0
    for block in content_blocks:
        if block["type"] == "text":
            # content_block_start
            yield f"event: content_block_start\ndata: {json.dumps({'type': 'content_block_start', 'index': block_index, 'content_block': {'type': 'text', 'text': ''}})}\n\n"
            # Stream text in chunks for better UX
            text = block["text"]
            chunk_size = 50  # characters per chunk
            for i in range(0, len(text), chunk_size):
                text_chunk = text[i:i + chunk_size]
                delta_event = {
                    "type": "content_block_delta",
                    "index": block_index,
                    "delta": {"type": "text_delta", "text": text_chunk}
                }
                yield f"event: content_block_delta\ndata: {json.dumps(delta_event)}\n\n"
            # content_block_stop
            yield f"event: content_block_stop\ndata: {json.dumps({'type': 'content_block_stop', 'index': block_index})}\n\n"
            block_index += 1
        elif block["type"] == "tool_use":
            # content_block_start for tool_use
            yield f"event: content_block_start\ndata: {json.dumps({'type': 'content_block_start', 'index': block_index, 'content_block': {'type': 'tool_use', 'id': block['id'], 'name': block['name'], 'input': {}}})}\n\n"
            # Send tool input as delta (entire input as one partial_json delta)
            input_json = json.dumps(block["input"])
            delta_event = {
                "type": "content_block_delta",
                "index": block_index,
                "delta": {"type": "input_json_delta", "partial_json": input_json}
            }
            yield f"event: content_block_delta\ndata: {json.dumps(delta_event)}\n\n"
            # content_block_stop
            yield f"event: content_block_stop\ndata: {json.dumps({'type': 'content_block_stop', 'index': block_index})}\n\n"
            block_index += 1
    # message_delta (stop reason)
    msg_delta = {
        "type": "message_delta",
        "delta": {"stop_reason": stop_reason, "stop_sequence": None},
        "usage": {"output_tokens": 0}
    }
    yield f"event: message_delta\ndata: {json.dumps(msg_delta)}\n\n"
    # message_stop
    yield f"event: message_stop\ndata: {json.dumps({'type': 'message_stop'})}\n\n"
def collect_anthropic_full_response(payload, model, session_key, has_tools=False):
    """Collect full response and return in Anthropic Messages format with tool support.

    Mirrors collect_full_response but produces an Anthropic ``message``
    envelope (content blocks + stop_reason) instead of an OpenAI completion.

    Args:
        payload: Onyx request body; its own 'stream' flag decides how the
            upstream response is read.
        model: Model string echoed back in the response envelope.
        session_key: chat_sessions_cache key updated with the final message id.
        has_tools: When True, parse the answer text for tool-call markup.

    Returns:
        (response_dict, http_status) tuple; (error_dict, 500) when no
        endpoint produced any content.
    """
    full_content = ""
    last_message_id = None
    # Onyx has exposed the send endpoint under two paths; try both in order.
    endpoints = [
        f"{ONYX_BASE_URL}/api/chat/send-chat-message",
        f"{ONYX_BASE_URL}/api/chat/send-message",
    ]
    for url in endpoints:
        try:
            is_streaming_request = payload.get('stream', False)
            with requests.post(url, json=payload, headers=get_headers(), stream=is_streaming_request, timeout=120) as response:
                if response.status_code == 404:
                    # Endpoint doesn't exist on this deployment; try the next.
                    continue
                if response.status_code != 200:
                    print(f"Error response from {url}: {response.status_code} - {response.text}")
                    continue  # Try next endpoint
                if not is_streaming_request:
                    # Plain JSON body: take the first populated answer field.
                    try:
                        data = response.json()
                        full_content = data.get('answer') or data.get('message') or data.get('content') or ""
                        msg_id = data.get('message_id')
                        if session_key in chat_sessions_cache and msg_id:
                            chat_sessions_cache[session_key]['parent_message_id'] = msg_id
                        break
                    except json.JSONDecodeError:
                        # Not JSON after all — fall back to the raw body text.
                        full_content = response.text
                        break
                else:
                    # SSE body: accumulate raw chunks and feed each complete
                    # line through the Onyx packet parser.
                    buffer = ""
                    for chunk in response.iter_content(chunk_size=None, decode_unicode=True):
                        if chunk:
                            buffer += chunk
                            while '\n' in buffer:
                                line, buffer = buffer.split('\n', 1)
                                line = line.strip()
                                if not line:
                                    continue
                                if line.startswith('data: '):
                                    line = line[6:]  # strip the SSE "data: " prefix
                                if line == '[DONE]':
                                    continue
                                content, msg_id, packet_type = parse_onyx_stream_chunk(line)
                                if msg_id:
                                    last_message_id = msg_id
                                if packet_type == 'stop':
                                    # NOTE(review): only exits the inner line loop;
                                    # reading continues until the stream ends.
                                    break
                                if content and packet_type in ['content', 'legacy', 'raw', 'error']:
                                    full_content += content
                    if session_key in chat_sessions_cache and last_message_id:
                        chat_sessions_cache[session_key]['parent_message_id'] = last_message_id
                    break
        except requests.exceptions.RequestException as e:
            print(f"Anthropic request error: {e}")
            continue
    if not full_content:
        return {
            "type": "error",
            "error": {
                "type": "api_error",
                "message": "No response from Onyx API"
            }
        }, 500
    # Parse for tool calls
    content_blocks = []
    stop_reason = "end_turn"
    if has_tools:
        text_content, tool_calls = parse_tool_calls_from_response(full_content)
        if text_content:
            content_blocks.append({"type": "text", "text": text_content})
        if tool_calls:
            stop_reason = "tool_use"
            for tc in tool_calls:
                content_blocks.append({
                    "type": "tool_use",
                    "id": tc["id"],
                    "name": tc["name"],
                    "input": tc["input"]
                })
    else:
        content_blocks.append({"type": "text", "text": full_content})
    # Anthropic requires at least one content block; fall back to empty text.
    if not content_blocks:
        content_blocks.append({"type": "text", "text": ""})
    response_data = {
        "id": f"msg_{uuid.uuid4().hex[:24]}",
        "type": "message",
        "role": "assistant",
        "content": content_blocks,
        "model": model,
        "stop_reason": stop_reason,
        "stop_sequence": None,
        # Token counts are not reported by Onyx; zeros are placeholders.
        "usage": {
            "input_tokens": 0,
            "output_tokens": 0
        }
    }
    return response_data, 200
# ============== API Routes ==============
@app.route('/v1/chat/completions', methods=['POST'])
def chat_completions():
    """OpenAI-compatible chat completions endpoint with full tool/function calling support"""
    try:
        data = request.json
        print(f"Received request: {json.dumps(data, indent=2)[:1000]}")
    except Exception as e:
        return jsonify({"error": {"message": f"Invalid JSON: {e}", "type": "invalid_request_error"}}), 400

    # Pull out the request parameters we forward to Onyx.
    model = data.get('model', 'openai/gpt-4')
    messages = data.get('messages', [])
    stream = data.get('stream', False)
    temperature = data.get('temperature', 0.7)
    session_key = data.get('session_id', 'default')

    # Accept both the modern `tools` field and the legacy `functions` field;
    # legacy entries are wrapped into the tools shape.
    tools = data.get('tools', None)
    legacy_functions = data.get('functions', None)
    if legacy_functions and not tools:
        tools = [{"type": "function", "function": f} for f in legacy_functions]
    has_tools = bool(tools)

    if not messages:
        return jsonify({
            "error": {
                "message": "messages is required",
                "type": "invalid_request_error"
            }
        }), 400

    # Split 'provider/model' and normalize the provider name for Onyx.
    provider, version = parse_model_string(model)
    provider = normalize_provider_name(provider)
    print(f"Model provider: {provider}, version: {version}")

    session_info = get_or_create_session(session_key)
    if not session_info:
        return jsonify({
            "error": {
                "message": "Failed to create chat session with Onyx API",
                "type": "api_error"
            }
        }), 500

    payload = build_onyx_payload(
        messages=messages,
        model_provider=provider,
        model_version=version,
        temperature=temperature,
        chat_session_id=session_info['session_id'],
        parent_message_id=session_info.get('parent_message_id'),
        stream=stream,
        tools=tools,
    )

    # Non-streaming: collect the whole answer and return one JSON body.
    if not stream:
        body, status = collect_full_response(payload, model, session_key, has_tools=has_tools)
        return jsonify(body), status

    # Streaming: proxy as server-sent events, with buffering disabled.
    return Response(
        stream_onyx_response(payload, model, session_key, has_tools=has_tools),
        content_type='text/event-stream',
        headers={
            'Cache-Control': 'no-cache',
            'Connection': 'keep-alive',
            'X-Accel-Buffering': 'no',
        },
    )
@app.route('/v1/sessions', methods=['POST'])
def create_new_session():
    """Create a new Onyx chat session and cache it under a fresh session key."""
    body = request.json or {}
    session_id = create_chat_session(body.get('persona_id', 0))
    if not session_id:
        return jsonify({"error": "Failed to create session"}), 500
    # Cache the session under a random key the client can reuse later.
    session_key = str(uuid.uuid4())
    chat_sessions_cache[session_key] = {
        "session_id": session_id,
        "parent_message_id": None,
    }
    return jsonify({
        "session_key": session_key,
        "chat_session_id": session_id,
    })
@app.route('/v1/sessions', methods=['DELETE'])
def clear_sessions():
    """Drop every cached chat session mapping and confirm to the caller."""
    chat_sessions_cache.clear()
    return jsonify({"status": "cleared"})
@app.route('/v1/models', methods=['GET'])
def list_models():
    """OpenAI-compatible models listing endpoint.

    Returns the static catalogue of provider-prefixed model ids this proxy
    accepts.  ``owned_by`` mirrors the provider prefix of each id.
    """
    models = [
        {"id": "openai/gpt-5.2", "object": "model", "owned_by": "openai"},
        # Fixed: owned_by was "openai" here, inconsistent with the provider
        # prefix convention every other entry follows.
        {"id": "google/gemini-3-pro-preview", "object": "model", "owned_by": "google"},
        {"id": "openai/gpt-4o", "object": "model", "owned_by": "openai"},
        {"id": "anthropic/claude-opus-4-6", "object": "model", "owned_by": "anthropic"},
        {"id": "anthropic/claude-sonnet-4.5", "object": "model", "owned_by": "anthropic"},
        {"id": "anthropic/claude-haiku-4-5", "object": "model", "owned_by": "anthropic"},
    ]
    return jsonify({
        "object": "list",
        "data": models
    })
@app.route('/v1/models/<path:model_id>', methods=['GET'])
def get_model(model_id):
    """OpenAI-compatible model details endpoint."""
    # Derive the owner from the provider prefix when the id is 'provider/name'.
    provider, sep, _ = model_id.partition('/')
    owner = provider if sep else "unknown"
    return jsonify({
        "id": model_id,
        "object": "model",
        "owned_by": owner,
    })
@app.route('/health', methods=['GET'])
def health_check():
    """Liveness probe: report status, current time, and cached session count."""
    status_payload = {
        "status": "healthy",
        "timestamp": int(time.time()),
        "active_sessions": len(chat_sessions_cache),
    }
    return jsonify(status_payload)
@app.route('/debug/test-onyx', methods=['GET'])
def test_onyx_connection():
    """Smoke-test the Onyx API by attempting to create a chat session."""
    session_id = create_chat_session()
    return jsonify({
        'create_session': {
            "success": session_id is not None,
            "session_id": session_id,
        }
    })
# ============== File Upload API ==============
@app.route('/v1/files', methods=['POST'])
def upload_file():
    """OpenAI-compatible file upload endpoint.

    Accepts multipart form data with:
    - file: The file to upload
    - purpose: The purpose of the file (e.g., 'assistants', 'vision', 'fine-tune')

    Also accepts JSON with base64 data:
    - file_data: base64-encoded file content
    - filename: name of the file
    - purpose: purpose string

    The file is best-effort mirrored to Onyx (several candidate upload
    endpoints are tried) and always cached locally so that
    /v1/files/<id>/content can serve the bytes back.
    """
    try:
        file_id = f"file-{uuid.uuid4().hex[:24]}"
        # Branch 1: standard multipart form upload.
        if 'file' in request.files:
            uploaded_file = request.files['file']
            purpose = request.form.get('purpose', 'assistants')
            filename = uploaded_file.filename or f"upload_{uuid.uuid4().hex[:8]}"
            file_bytes = uploaded_file.read()
            media_type = uploaded_file.content_type or mimetypes.guess_type(filename)[0] or 'application/octet-stream'
        # Branch 2: JSON body carrying base64-encoded content.
        elif request.is_json:
            data = request.json
            purpose = data.get('purpose', 'assistants')
            filename = data.get('filename', f"upload_{uuid.uuid4().hex[:8]}")
            file_data = data.get('file_data', '')
            media_type = data.get('content_type', mimetypes.guess_type(filename)[0] or 'application/octet-stream')
            try:
                file_bytes = base64.b64decode(file_data)
            except Exception as e:
                return jsonify({"error": {"message": f"Invalid base64 file_data: {e}", "type": "invalid_request_error"}}), 400
        else:
            return jsonify({"error": {"message": "No file provided. Use multipart form with 'file' field or JSON with 'file_data' base64 field.", "type": "invalid_request_error"}}), 400
        file_size = len(file_bytes)
        # Fixed: previously printed the literal placeholder "(unknown)" instead
        # of the actual filename.
        print(f"Uploading file: {filename} ({file_size} bytes, {media_type})")
        # Try to upload to Onyx — best effort; local caching below always runs.
        onyx_file_id = None
        upload_endpoints = [
            f"{ONYX_BASE_URL}/api/chat/file",
            f"{ONYX_BASE_URL}/api/chat/upload-file",
            f"{ONYX_BASE_URL}/api/manage/upload-file",
        ]
        # No Content-Type header here: requests sets the multipart boundary.
        headers = {
            "Authorization": f"Bearer {ONYX_API_TOKEN}",
        }
        for url in upload_endpoints:
            try:
                files = {
                    'file': (filename, file_bytes, media_type)
                }
                resp = requests.post(url, files=files, headers=headers, timeout=60)
                if resp.status_code == 200:
                    result = resp.json()
                    # Different Onyx versions use different id field names.
                    onyx_file_id = result.get('file_id') or result.get('id') or result.get('document_id')
                    print(f"Uploaded to Onyx: {onyx_file_id}")
                    break
                elif resp.status_code == 404:
                    continue
                else:
                    print(f"Upload to {url} failed: {resp.status_code} - {resp.text}")
                    continue
            except Exception as e:
                print(f"Upload error at {url}: {e}")
                continue
        # Store in local cache
        file_record = {
            "id": file_id,
            "object": "file",
            "bytes": file_size,
            "created_at": int(time.time()),
            "filename": filename,
            "purpose": purpose,
            "status": "processed",
            "status_details": None,
            "content_type": media_type,
            "onyx_file_id": onyx_file_id,
        }
        # Store file bytes for retrieval via /v1/files/<id>/content.
        file_record["_data"] = base64.b64encode(file_bytes).decode('utf-8')
        files_cache[file_id] = file_record
        # Return OpenAI-compatible response (without internal _data)
        response = {k: v for k, v in file_record.items() if not k.startswith('_')}
        return jsonify(response), 200
    except Exception as e:
        print(f"File upload error: {e}")
        return jsonify({"error": {"message": f"File upload failed: {str(e)}", "type": "api_error"}}), 500
@app.route('/v1/files', methods=['GET'])
def list_files():
    """OpenAI-compatible list files endpoint (optionally filtered by purpose)."""
    wanted_purpose = request.args.get('purpose', None)
    # Drop internal fields (leading underscore, e.g. cached bytes) per record.
    visible = [
        {k: v for k, v in record.items() if not k.startswith('_')}
        for record in files_cache.values()
        if not (wanted_purpose and record.get('purpose') != wanted_purpose)
    ]
    return jsonify({
        "object": "list",
        "data": visible
    })
@app.route('/v1/files/<file_id>', methods=['GET'])
def retrieve_file(file_id):
    """OpenAI-compatible retrieve file metadata endpoint."""
    record = files_cache.get(file_id)
    if record is None:
        return jsonify({"error": {"message": f"No file with id '{file_id}' found", "type": "invalid_request_error"}}), 404
    # Strip internal fields (e.g. the cached base64 payload) before returning.
    return jsonify({k: v for k, v in record.items() if not k.startswith('_')})
@app.route('/v1/files/<file_id>', methods=['DELETE'])
def delete_file(file_id):
    """OpenAI-compatible delete file endpoint."""
    # pop() both checks existence and removes the record in one step.
    if files_cache.pop(file_id, None) is None:
        return jsonify({"error": {"message": f"No file with id '{file_id}' found", "type": "invalid_request_error"}}), 404
    return jsonify({
        "id": file_id,
        "object": "file",
        "deleted": True,
    })
@app.route('/v1/files/<file_id>/content', methods=['GET'])
def retrieve_file_content(file_id):
    """OpenAI-compatible retrieve file content endpoint (raw bytes download)."""
    record = files_cache.get(file_id)
    if record is None:
        return jsonify({"error": {"message": f"No file with id '{file_id}' found", "type": "invalid_request_error"}}), 404
    encoded = record.get('_data', '')
    if not encoded:
        return jsonify({"error": {"message": "File content not available", "type": "api_error"}}), 404
    download_name = record.get("filename", "file")
    return Response(
        base64.b64decode(encoded),
        mimetype=record.get('content_type', 'application/octet-stream'),
        headers={
            'Content-Disposition': f'attachment; filename="{download_name}"'
        },
    )
@app.route('/upload', methods=['POST'])
def simple_upload():
    """Simple file upload endpoint (non-OpenAI format) for direct uploads.

    Accepts multipart form data with a 'file' field.
    Optionally include 'chat_session_id' to associate with a session.
    """
    if 'file' not in request.files:
        return jsonify({"error": "No file provided"}), 400

    incoming = request.files['file']
    name = incoming.filename or f"upload_{uuid.uuid4().hex[:8]}"
    raw = incoming.read()
    mime = incoming.content_type or mimetypes.guess_type(name)[0] or 'application/octet-stream'
    encoded = base64.b64encode(raw).decode('utf-8')

    # Forward to Onyx only when the caller supplied a chat session to attach to.
    onyx_result = None
    session_id = request.form.get('chat_session_id', None)
    if session_id:
        descriptor = {
            "base64_data": encoded,
            "media_type": mime,
            "filename": name,
            "type": "image" if mime.startswith('image/') else "document",
        }
        onyx_result = upload_file_to_onyx(descriptor, session_id)

    # Cache locally; '_data' holds the bytes for later retrieval and is
    # filtered out of the JSON response below.
    record = {
        "id": f"file-{uuid.uuid4().hex[:24]}",
        "filename": name,
        "size": len(raw),
        "content_type": mime,
        "uploaded_at": int(time.time()),
        "onyx_descriptor": onyx_result,
        "_data": encoded,
    }
    files_cache[record["id"]] = record
    return jsonify({k: v for k, v in record.items() if not k.startswith('_')}), 200
# ============== Anthropic Messages API ==============
@app.route('/v1/messages', methods=['POST'])
def anthropic_messages():
    """Anthropic Messages API compatible endpoint — used by Claude Code, with full tool support"""
    try:
        data = request.json
        print(f"[Anthropic] Received request: {json.dumps(data, indent=2)[:1000]}")
    except Exception as e:
        return jsonify({
            "type": "error",
            "error": {"type": "invalid_request_error", "message": f"Invalid JSON: {e}"}
        }), 400

    # Pull out the Anthropic-shaped request parameters.
    model = data.get('model', 'claude-opus-4-6')
    messages = data.get('messages', [])
    system_prompt = data.get('system', '')
    stream = data.get('stream', False)
    temperature = data.get('temperature', 0.7)
    max_tokens = data.get('max_tokens', 4096)  # accepted but not forwarded to Onyx
    tools = data.get('tools', None)
    has_tools = bool(tools)
    # One cached Onyx session per model name on this endpoint.
    session_key = f"anthropic_{model}"

    if not messages:
        return jsonify({
            "type": "error",
            "error": {"type": "invalid_request_error", "message": "messages is required"}
        }), 400

    # Bare Anthropic model names get the provider prefix added for parsing.
    full_model = model if '/' in model else f"anthropic/{model}"
    provider, version = parse_model_string(full_model)
    provider = normalize_provider_name(provider)
    print(f"[Anthropic] Provider: {provider}, Version: {version}")
    if has_tools:
        print(f"[Anthropic] Tools provided: {[t.get('name', '?') for t in tools]}")

    session_info = get_or_create_session(session_key)
    if not session_info:
        return jsonify({
            "type": "error",
            "error": {"type": "api_error", "message": "Failed to create chat session"}
        }), 500

    payload = build_anthropic_payload_from_messages(
        messages=messages,
        system_prompt=system_prompt,
        model_provider=provider,
        model_version=version,
        temperature=temperature,
        chat_session_id=session_info['session_id'],
        parent_message_id=session_info.get('parent_message_id'),
        stream=stream,
        tools=tools,
    )

    # Non-streaming: collect the whole answer into one Anthropic message.
    if not stream:
        body, status = collect_anthropic_full_response(payload, model, session_key, has_tools=has_tools)
        return jsonify(body), status

    # Streaming: replay as Anthropic SSE events, with buffering disabled.
    return Response(
        generate_anthropic_stream_events(payload, model, session_key, has_tools=has_tools),
        content_type='text/event-stream',
        headers={
            'Cache-Control': 'no-cache',
            'Connection': 'keep-alive',
            'X-Accel-Buffering': 'no',
        },
    )
@app.route('/', methods=['GET'])
def root():
"""Root endpoint with a premium Chat UI"""
html = """
<!DOCTYPE html>
<html lang="en">
<head>
<meta charset="UTF-8">
<meta name="viewport" content="width=device-width, initial-scale=1.0">
<title>Onyx AI Proxy | Premium Chat</title>
<link href="https://fonts.googleapis.com/css2?family=Outfit:wght@300;400;600&family=Inter:wght@400;500&display=swap" rel="stylesheet">
<style>
:root {
--bg: #0f172a;
--panel: #1e293b;
--accent: #38bdf8;
--accent-glow: rgba(56, 189, 248, 0.3);
--text: #f8fafc;
--text-muted: #94a3b8;
--user-msg: #334155;
--ai-msg: #1e293b;
--border: rgba(255, 255, 255, 0.1);
}
* { box-sizing: border-box; margin: 0; padding: 0; }
body {
font-family: 'Inter', sans-serif;
background: var(--bg);
color: var(--text);
height: 100vh;
display: flex;
flex-direction: column;
overflow: hidden;
}
h1, h2, h3 { font-family: 'Outfit', sans-serif; }
/* --- Header --- */
header {
padding: 1.5rem 2rem;
display: flex;
justify-content: space-between;
align-items: center;
border-bottom: 1px solid var(--border);
background: rgba(15, 23, 42, 0.8);
backdrop-filter: blur(12px);
z-index: 100;
}
.logo {
font-size: 1.5rem;
font-weight: 600;
display: flex;
align-items: center;
gap: 0.75rem;
color: var(--accent);
}
.status {
font-size: 0.85rem;
color: var(--text-muted);
display: flex;
align-items: center;
gap: 0.5rem;
}
.status-dot {
width: 8px;
height: 8px;
background: #22c55e;
border-radius: 50%;
box-shadow: 0 0 8px #22c55e;
}
/* --- Chat Container --- */
#chat-container {
flex: 1;
overflow-y: auto;
padding: 2rem;
display: flex;
flex-direction: column;
gap: 1.5rem;
scroll-behavior: smooth;
}
.message {
max-width: 80%;
padding: 1rem 1.25rem;
border-radius: 1rem;
line-height: 1.6;
position: relative;
animation: fadeIn 0.3s ease-out;
}
@keyframes fadeIn {
from { opacity: 0; transform: translateY(10px); }
to { opacity: 1; transform: translateY(0); }
}
.user-message {
align-self: flex-end;
background: var(--user-msg);
border-bottom-right-radius: 0.25rem;
}
.ai-message {
align-self: flex-start;
background: var(--ai-msg);
border: 1px solid var(--border);
border-bottom-left-radius: 0.25rem;
}
.message-content {
white-space: pre-wrap;
}
.attachment-preview {
display: flex;
flex-wrap: wrap;
gap: 0.5rem;
margin-top: 0.75rem;
}
.preview-item {
width: 100px;
height: 100px;
border-radius: 0.5rem;
object-fit: cover;
border: 1px solid var(--border);
}
.file-pill {
background: rgba(255, 255, 255, 0.05);
padding: 0.35rem 0.75rem;
border-radius: 2rem;
font-size: 0.8rem;
display: flex;
align-items: center;
gap: 0.5rem;
border: 1px solid var(--border);
}
/* --- Input Area --- */
.input-wrapper {
padding: 2rem;
background: linear-gradient(to top, var(--bg), transparent);
}
.input-panel {
max-width: 900px;
margin: 0 auto;
background: var(--panel);
border: 1px solid var(--border);
border-radius: 1.25rem;
padding: 0.75rem;
box-shadow: 0 20px 25px -5px rgba(0, 0, 0, 0.3);
display: flex;
flex-direction: column;
gap: 0.75rem;
}
.attachments-bar {
display: flex;
gap: 0.5rem;
flex-wrap: wrap;
padding: 0 0.5rem;
}
.attachment-tag {
background: var(--bg);
border: 1px solid var(--border);
padding: 0.25rem 0.6rem;
border-radius: 0.5rem;
font-size: 0.75rem;
display: flex;
align-items: center;
gap: 0.4rem;
}
.attachment-tag button {
background: none;
border: none;
color: var(--text-muted);
cursor: pointer;
font-size: 1rem;
}
.input-controls {
display: flex;
align-items: center;
gap: 0.75rem;
}
textarea {
flex: 1;
background: none;
border: none;
color: var(--text);
font-family: inherit;
font-size: 1rem;
padding: 0.5rem;
resize: none;
max-height: 200px;
outline: none;
}
.btn-icon {
width: 40px;
height: 40px;
display: flex;
align-items: center;
justify-content: center;
border-radius: 0.75rem;
cursor: pointer;
transition: all 0.2s;
color: var(--text-muted);
background: rgba(255, 255, 255, 0.05);
border: 1px solid var(--border);
}
.btn-icon:hover {
color: var(--text);
background: rgba(255, 255, 255, 0.1);
}
#send-btn {
background: var(--accent);
color: var(--bg);
border: none;
}
#send-btn:hover {
box-shadow: 0 0 15px var(--accent-glow);
transform: scale(1.05);
}
/* --- Scrollbar --- */
::-webkit-scrollbar { width: 6px; }
::-webkit-scrollbar-track { background: transparent; }
::-webkit-scrollbar-thumb { background: var(--border); border-radius: 10px; }
</style>
</head>
<body>
<header>
<div class="logo">
<svg width="24" height="24" viewBox="0 0 24 24" fill="none" stroke="currentColor" stroke-width="2.5" stroke-linecap="round" stroke-linejoin="round"><path d="m12 14 4-4"/><path d="M3.34 19a10 10 0 1 1 17.32 0"/><path d="m9.05 14.87 2.22.45c.42.08.86-.14 1-.55l2.46-7.09a.48.48 0 0 0-.54-.64l-7.09 2.46a.48.48 0 0 0-.1 0 1 1 0 0 0-.45 1l.45 2.22"/></svg>
<span>Onyx Edge</span>
</div>
<div class="status">
<div class="status-dot"></div>
<span>System Active</span>
</div>
</header>
<main id="chat-container">
<div class="message ai-message">
<div class="message-content">Welcome to the Onyx edge proxy. Upload files or images and I'll analyze them for you. How can I help today?</div>
</div>
</main>
<div class="input-wrapper">
<div class="input-panel">
<div id="attachments-bar" class="attachments-bar"></div>
<div class="input-controls">
<label class="btn-icon" title="Upload Files">
<svg width="20" height="20" viewBox="0 0 24 24" fill="none" stroke="currentColor" stroke-width="2" stroke-linecap="round" stroke-linejoin="round"><path d="m21.44 11.05-9.19 9.19a6 6 0 0 1-8.49-8.49l8.57-8.57A4 4 0 1 1 18 8.84l-8.59 8.51a2 2 0 0 1-2.83-2.83l8.49-8.48"/></svg>
<input type="file" id="file-input" multiple hidden>
</label>
<textarea id="user-input" placeholder="Type a message or drop files..." rows="1"></textarea>
<button id="send-btn" class="btn-icon">
<svg width="20" height="20" viewBox="0 0 24 24" fill="none" stroke="currentColor" stroke-width="2.5" stroke-linecap="round" stroke-linejoin="round"><line x1="22" y1="2" x2="11" y2="13"/><polyline points="22 2 15 22 11 13 2 9 22 2"/></svg>
</button>
</div>
</div>
</div>
<script>
const chatContainer = document.getElementById('chat-container');
const userInput = document.getElementById('user-input');
const sendBtn = document.getElementById('send-btn');
const fileInput = document.getElementById('file-input');
const attachmentsBar = document.getElementById('attachments-bar');
let attachments = [];
// Auto-expand textarea
userInput.addEventListener('input', () => {
userInput.style.height = 'auto';
userInput.style.height = userInput.scrollHeight + 'px';
});
// Handle file selection
fileInput.addEventListener('change', async (e) => {
const files = Array.from(e.target.files);
for (const file of files) {
await uploadFile(file);
}
fileInput.value = '';
});
async function uploadFile(file) {
const formData = new FormData();
formData.append('file', file);
// Show loading placeholder if possible
const tag = createAttachmentTag(file.name, true);
attachmentsBar.appendChild(tag);
try {
const resp = await fetch('/v1/files', {
method: 'POST',
body: formData
});
const data = await resp.json();
if (data.id) {
attachments.push({
id: data.id,
name: file.name,
type: file.type,
tag: tag
});
tag.classList.remove('loading');
tag.innerHTML = `<span>${file.name}</span><button onclick="removeAttachment('${data.id}')">&times;</button>`;
}
} catch (err) {
console.error('Upload failed', err);
tag.remove();
}
}
function createAttachmentTag(name, loading) {
const div = document.createElement('div');
div.className = 'attachment-tag' + (loading ? ' loading' : '');
div.innerHTML = `<span>${name}</span>`;
return div;
}
window.removeAttachment = (id) => {
const idx = attachments.findIndex(a => a.id === id);
if (idx > -1) {
attachments[idx].tag.remove();
attachments.splice(idx, 1);
}
};
// Send Message: post the current input (plus any attached files) to the
// chat completions endpoint and stream the assistant's reply into the UI.
async function sendMessage() {
    const text = userInput.value.trim();
    if (!text && attachments.length === 0) return;
    // Add user message to UI
    const userMsgDiv = addMessage(text, 'user', attachments);
    // Clear input
    userInput.value = '';
    userInput.style.height = 'auto';
    const currentAttachments = [...attachments];
    attachments = [];
    attachmentsBar.innerHTML = '';
    // AI placeholder
    const aiMsgDiv = addMessage('Processing...', 'ai');
    const aiContent = aiMsgDiv.querySelector('.message-content');
    try {
        // Build OpenAI-style content blocks: the text first, then one
        // block per attachment (image_url for images, file otherwise).
        const contentBlocks = [{ type: 'text', text: text }];
        currentAttachments.forEach(att => {
            if (att.type.startsWith('image/')) {
                contentBlocks.push({
                    type: 'image_url',
                    image_url: { url: att.id }
                });
            } else {
                contentBlocks.push({
                    type: 'file',
                    file_id: att.id
                });
            }
        });
        const response = await fetch('/v1/chat/completions', {
            method: 'POST',
            headers: { 'Content-Type': 'application/json' },
            body: JSON.stringify({
                model: 'openai/gpt-4o',
                messages: [{ role: 'user', content: contentBlocks }],
                stream: true
            })
        });
        const reader = response.body.getReader();
        const decoder = new TextDecoder();
        aiContent.innerText = '';
        let fullText = '';
        // SSE "data:" lines can be split across network chunks; keep the
        // incomplete tail in a buffer so no line is dropped or corrupted.
        let buffer = '';
        while (true) {
            const { done, value } = await reader.read();
            if (done) break;
            // stream:true prevents multi-byte UTF-8 sequences that span
            // chunk boundaries from being decoded as replacement chars.
            buffer += decoder.decode(value, { stream: true });
            const lines = buffer.split('\\n');
            buffer = lines.pop(); // last element may be a partial line
            for (const line of lines) {
                if (line.startsWith('data: ')) {
                    const dataStr = line.slice(6);
                    if (dataStr === '[DONE]') continue;
                    try {
                        const json = JSON.parse(dataStr);
                        const delta = json.choices[0].delta.content;
                        if (delta) {
                            fullText += delta;
                            aiContent.innerText = fullText;
                            chatContainer.scrollTop = chatContainer.scrollHeight;
                        }
                    } catch (e) {
                        // Ignore keep-alives and non-JSON frames.
                    }
                }
            }
        }
    } catch (err) {
        aiContent.innerText = 'Error: ' + err.message;
    }
}
// Append a chat bubble for `role` ('user' or 'ai') containing `text`
// and optional attachment pills; returns the created element so the
// caller can stream further content into it.
function addMessage(text, role, attachments = []) {
    const div = document.createElement('div');
    div.className = `message ${role}-message`;
    // Build with DOM APIs + textContent instead of innerHTML so that
    // user-supplied message text and file names cannot inject HTML (XSS).
    const content = document.createElement('div');
    content.className = 'message-content';
    content.textContent = text;
    div.appendChild(content);
    if (attachments.length > 0) {
        const preview = document.createElement('div');
        preview.className = 'attachment-preview';
        attachments.forEach(att => {
            const pill = document.createElement('div');
            pill.className = 'file-pill';
            // We'd need a temp URL for a real image preview; for now an
            // icon + filename is shown for both kinds.
            const icon = att.type.startsWith('image/') ? '🖼️ ' : '📄 ';
            pill.textContent = icon + att.name;
            preview.appendChild(pill);
        });
        div.appendChild(preview);
    }
    chatContainer.appendChild(div);
    chatContainer.scrollTop = chatContainer.scrollHeight;
    return div;
}
// Wire up the send button and Enter-to-send (Shift+Enter inserts a newline).
sendBtn.addEventListener('click', sendMessage);
userInput.addEventListener('keydown', (event) => {
    const plainEnter = event.key === 'Enter' && !event.shiftKey;
    if (!plainEnter) return;
    event.preventDefault();
    sendMessage();
});
</script>
</body>
</html>
"""
return render_template_string(html)
# ============== Error Handlers ==============
@app.errorhandler(404)
def not_found(e):
    """Return an OpenAI-style JSON error body for unknown endpoints."""
    body = {
        "error": {
            "message": "Endpoint not found",
            "type": "invalid_request_error",
            "code": 404,
        }
    }
    return jsonify(body), 404
@app.errorhandler(500)
def server_error(e):
    """Return an OpenAI-style JSON error body for unhandled exceptions."""
    detail = f"Internal server error: {str(e)}"
    payload = {
        "error": {
            "message": detail,
            "type": "server_error",
            "code": 500,
        }
    }
    return jsonify(payload), 500
if __name__ == '__main__':
    print("=" * 60)
    print("OpenAI + Anthropic Compatible Onyx API Proxy v3.1")
    print("Full Function/Tool Calling + File Upload Support")
    print("=" * 60)
    print(f"Onyx Base URL: {ONYX_BASE_URL}")
    print(f"Token configured: {'Yes' if ONYX_API_TOKEN != '<your-token-here>' else 'No'}")
    print("=" * 60)
    # SECURITY: Flask/Werkzeug debug mode exposes an interactive debugger
    # that allows arbitrary code execution. Never enable it by default on a
    # server bound to 0.0.0.0; opt in explicitly with FLASK_DEBUG=1 locally.
    debug_mode = os.environ.get("FLASK_DEBUG", "").lower() in ("1", "true", "yes")
    app.run(
        host='0.0.0.0',
        port=7860,
        debug=debug_mode,
        threaded=True
    )