# replicate / app.py — OpenAI-compatible API proxy for Replicate-hosted models.
# (Uploaded by Samfy001, commit 32c3c8a. The original header lines were web-page
# scrape residue — not valid Python — and are preserved here as a comment.)
import base64 as _b64, json as _j, time as _t, uuid as _u, logging as _l, traceback as _tb, os as _o
from fastapi import FastAPI as _FA, HTTPException as _HE
from fastapi.responses import StreamingResponse as _SR, JSONResponse as _JR
from pydantic import BaseModel as _BM, Field as _F
from typing import List as _L, Optional as _O, Dict as _D, Any as _A, Union as _U
import replicate as _r
from contextlib import asynccontextmanager as _acm
# Obfuscated configuration
_l.basicConfig(level=_l.INFO)
_lg = _l.getLogger(__name__)
_TOKEN = _b64.b64decode(b'cjhfWDdxeVpLTkZLZlZpUWdRaDJJcUhIa1BmdkFqRGhqSzFBWVl0Yw==').decode('utf-8')
# Supported models configuration
_MODELS = {
# Anthropic Claude Models
"claude-4-sonnet": "anthropic/claude-4-sonnet",
"claude-3.7-sonnet": "anthropic/claude-3.7-sonnet",
"claude-3.5-sonnet": "anthropic/claude-3.5-sonnet",
"claude-3.5-haiku": "anthropic/claude-3.5-haiku",
# OpenAI GPT Models
"gpt-4.1": "openai/gpt-4.1",
"gpt-4.1-mini": "openai/gpt-4.1-mini",
"gpt-4.1-nano": "openai/gpt-4.1-nano",
"gpt-5": "openai/gpt-5",
"gpt-5-mini": "openai/gpt-5-mini",
"gpt-5-nano": "openai/gpt-5-nano",
# Alternative naming (with provider prefix)
"anthropic/claude-4-sonnet": "anthropic/claude-4-sonnet",
"anthropic/claude-3.7-sonnet": "anthropic/claude-3.7-sonnet",
"anthropic/claude-3.5-sonnet": "anthropic/claude-3.5-sonnet",
"anthropic/claude-3.5-haiku": "anthropic/claude-3.5-haiku",
"openai/gpt-4.1": "openai/gpt-4.1",
"openai/gpt-4.1-mini": "openai/gpt-4.1-mini",
"openai/gpt-4.1-nano": "openai/gpt-4.1-nano",
"openai/gpt-5": "openai/gpt-5",
"openai/gpt-5-mini": "openai/gpt-5-mini",
"openai/gpt-5-nano": "openai/gpt-5-nano"
}
# Model metadata for OpenAI compatibility
_MODEL_INFO = {
"claude-4-sonnet": {"owned_by": "anthropic", "context_length": 200000},
"claude-3.7-sonnet": {"owned_by": "anthropic", "context_length": 200000},
"claude-3.5-sonnet": {"owned_by": "anthropic", "context_length": 200000},
"claude-3.5-haiku": {"owned_by": "anthropic", "context_length": 200000},
"gpt-4.1": {"owned_by": "openai", "context_length": 128000},
"gpt-4.1-mini": {"owned_by": "openai", "context_length": 128000},
"gpt-4.1-nano": {"owned_by": "openai", "context_length": 128000},
"gpt-5": {"owned_by": "openai", "context_length": 400000},
"gpt-5-mini": {"owned_by": "openai", "context_length": 400000},
"gpt-5-nano": {"owned_by": "openai", "context_length": 400000}
}
# OpenAI Compatible Models
class _CM(_BM):
    """OpenAI-compatible chat message (used for both requests and responses)."""
    role: str = _F(..., description="Message role")
    # Content is either plain text or a list of content parts (multimodal form).
    content: _O[_U[str, _L[_D[str, _A]]]] = _F(None, description="Message content")
    name: _O[str] = _F(None, description="Message name")
    function_call: _O[_D[str, _A]] = _F(None, description="Function call")
    tool_calls: _O[_L[_D[str, _A]]] = _F(None, description="Tool calls")
    tool_call_id: _O[str] = _F(None, description="Tool call ID")
class _FC(_BM):
    """Function invocation payload inside a tool call (arguments are a JSON string)."""
    name: str = _F(..., description="Function name")
    arguments: str = _F(..., description="Function arguments")
class _TC(_BM):
    """A single tool call emitted by the model."""
    id: str = _F(..., description="Tool call ID")
    type: str = _F(default="function", description="Tool call type")
    function: _FC = _F(..., description="Function call")
class _FD(_BM):
    """Function definition supplied by the caller (JSON-schema parameters)."""
    name: str = _F(..., description="Function name")
    description: _O[str] = _F(None, description="Function description")
    parameters: _D[str, _A] = _F(..., description="Function parameters")
class _TD(_BM):
    """Tool definition wrapper (OpenAI `tools` array element)."""
    type: str = _F(default="function", description="Tool type")
    function: _FD = _F(..., description="Function definition")
class _CCR(_BM):
    """Chat completion request body (OpenAI `/v1/chat/completions` schema).

    Sampling fields beyond max_tokens/temperature/top_p are accepted for
    compatibility; downstream code forwards only the subset Replicate uses.
    """
    model: str = _F(..., description="Model name")
    messages: _L[_CM] = _F(..., description="Messages")
    max_tokens: _O[int] = _F(default=4096, description="Max tokens")
    temperature: _O[float] = _F(default=0.7, description="Temperature")
    top_p: _O[float] = _F(default=1.0, description="Top p")
    n: _O[int] = _F(default=1, description="Number of completions")
    stream: _O[bool] = _F(default=False, description="Stream response")
    stop: _O[_U[str, _L[str]]] = _F(None, description="Stop sequences")
    presence_penalty: _O[float] = _F(default=0.0, description="Presence penalty")
    frequency_penalty: _O[float] = _F(default=0.0, description="Frequency penalty")
    logit_bias: _O[_D[str, float]] = _F(None, description="Logit bias")
    user: _O[str] = _F(None, description="User ID")
    tools: _O[_L[_TD]] = _F(None, description="Available tools")
    tool_choice: _O[_U[str, _D[str, _A]]] = _F(None, description="Tool choice")
    # Legacy function-calling fields (pre-`tools` API); still accepted.
    functions: _O[_L[_FD]] = _F(None, description="Available functions")
    function_call: _O[_U[str, _D[str, _A]]] = _F(None, description="Function call")
class _CCC(_BM):
    """One choice in a non-streaming chat completion response."""
    index: int = _F(default=0, description="Choice index")
    message: _CM = _F(..., description="Message")
    finish_reason: _O[str] = _F(None, description="Finish reason")
class _CCSC(_BM):
    """One choice in a streaming chunk; `delta` carries the incremental fields."""
    index: int = _F(default=0, description="Choice index")
    delta: _D[str, _A] = _F(..., description="Delta")
    finish_reason: _O[str] = _F(None, description="Finish reason")
class _CCRes(_BM):
    """Non-streaming chat completion response envelope."""
    id: str = _F(..., description="Completion ID")
    object: str = _F(default="chat.completion", description="Object type")
    created: int = _F(..., description="Created timestamp")
    model: str = _F(..., description="Model name")
    choices: _L[_CCC] = _F(..., description="Choices")
    usage: _D[str, int] = _F(..., description="Usage stats")
    system_fingerprint: _O[str] = _F(None, description="System fingerprint")
class _CCSR(_BM):
    """Streaming chunk envelope (`chat.completion.chunk`) sent over SSE."""
    id: str = _F(..., description="Completion ID")
    object: str = _F(default="chat.completion.chunk", description="Object type")
    created: int = _F(..., description="Created timestamp")
    model: str = _F(..., description="Model name")
    choices: _L[_CCSC] = _F(..., description="Choices")
    system_fingerprint: _O[str] = _F(None, description="System fingerprint")
class _OM(_BM):
    """Model description entry for the `/v1/models` listing."""
    id: str = _F(..., description="Model ID")
    object: str = _F(default="model", description="Object type")
    created: int = _F(..., description="Created timestamp")
    owned_by: str = _F(..., description="Owner")
# Replicate Client
class _RC:
    """Thin wrapper around the `replicate` SDK.

    Maps OpenAI-style model names and sampling parameters onto Replicate
    model IDs and inputs, and exposes streaming (`_stream_chat`) and
    blocking (`_complete_chat`) completion helpers.
    """

    def __init__(self, _tk=_TOKEN):
        # The replicate SDK reads its credential from this env var.
        _o.environ['REPLICATE_API_TOKEN'] = _tk
        self._client = _r
        self._models = _MODELS
        self._model_info = _MODEL_INFO

    def _get_replicate_model(self, _model_name):
        """Get the Replicate model ID from OpenAI model name"""
        # Unknown names pass through unchanged, so raw Replicate IDs also work.
        return self._models.get(_model_name, _model_name)

    def _validate_model(self, _model_name):
        """Validate if model is supported"""
        # Accept both alias keys and the target Replicate IDs themselves.
        return _model_name in self._models or _model_name in self._models.values()

    def _format_messages(self, _msgs):
        """Flatten OpenAI chat messages into one Human/Assistant transcript.

        Returns (prompt, system); `system` is the last system message seen.
        NOTE(review): list-form (multimodal) content would be interpolated
        via f-string as its repr — confirm callers only send text content.
        """
        _prompt = ""
        _system = ""
        for _msg in _msgs:
            _role = _msg.get('role', '')
            _content = _msg.get('content', '')
            if _role == 'system':
                _system = _content
            elif _role == 'user':
                _prompt += f"Human: {_content}\n\n"
            elif _role == 'assistant':
                _prompt += f"Assistant: {_content}\n\n"
        # Trailing cue so the model continues as the assistant.
        _prompt += "Assistant: "
        return _prompt, _system

    def _sanitize_params(self, **_kwargs):
        """Sanitize parameters and set proper defaults"""
        _params = {}
        # Handle max_tokens
        _max_tokens = _kwargs.get('max_tokens')
        if _max_tokens is not None and _max_tokens > 0:
            # Replicate Anthropic models often require >= 1024; clamp to avoid 422s
            try:
                _mt = int(_max_tokens)
            except Exception:
                _mt = 4096
            _params['max_tokens'] = max(1024, _mt)
        else:
            _params['max_tokens'] = 4096
        # Handle temperature (clamped to [0, 2] per OpenAI convention)
        _temperature = _kwargs.get('temperature')
        if _temperature is not None:
            _params['temperature'] = max(0.0, min(2.0, float(_temperature)))
        else:
            _params['temperature'] = 0.7
        # Handle top_p (clamped to [0, 1])
        _top_p = _kwargs.get('top_p')
        if _top_p is not None:
            _params['top_p'] = max(0.0, min(1.0, float(_top_p)))
        else:
            _params['top_p'] = 1.0
        # Handle presence_penalty (clamped to [-2, 2]; not forwarded to Replicate)
        _presence_penalty = _kwargs.get('presence_penalty')
        if _presence_penalty is not None:
            _params['presence_penalty'] = max(-2.0, min(2.0, float(_presence_penalty)))
        else:
            _params['presence_penalty'] = 0.0
        # Handle frequency_penalty (clamped to [-2, 2]; not forwarded to Replicate)
        _frequency_penalty = _kwargs.get('frequency_penalty')
        if _frequency_penalty is not None:
            _params['frequency_penalty'] = max(-2.0, min(2.0, float(_frequency_penalty)))
        else:
            _params['frequency_penalty'] = 0.0
        return _params

    def _create_prediction(self, _model_name, _prompt, _system="", **_kwargs):
        """Create a prediction using Replicate API"""
        _replicate_model = self._get_replicate_model(_model_name)
        _params = self._sanitize_params(**_kwargs)
        _input = {
            "prompt": _prompt,
            "system_prompt": _system,
            "max_tokens": _params['max_tokens'],
            "temperature": _params['temperature'],
            "top_p": _params['top_p']
        }
        try:
            _prediction = self._client.predictions.create(
                model=_replicate_model,
                input=_input
            )
            return _prediction
        except Exception as _e:
            # Returns None on failure; callers must handle the missing prediction.
            _lg.error(f"Prediction creation error for {_replicate_model}: {_e}")
            return None

    def _handle_tools(self, _tools, _tool_choice):
        """Render tool definitions into a prompt suffix describing the tools.

        NOTE(review): `_tool_choice` is accepted but ignored — forced or
        disabled tool choice is not enforced here; confirm that is intended.
        """
        if not _tools:
            return ""
        _tool_prompt = "\n\nYou have access to the following tools:\n"
        for _tool in _tools:
            _func = _tool.get('function', {})
            _name = _func.get('name', '')
            _desc = _func.get('description', '')
            _params = _func.get('parameters', {})
            _tool_prompt += f"- {_name}: {_desc}\n"
            _tool_prompt += f"  Parameters: {_j.dumps(_params)}\n"
        _tool_prompt += "\nTo use a tool, respond with JSON in this format:\n"
        _tool_prompt += '{"tool_calls": [{"id": "call_123", "type": "function", "function": {"name": "tool_name", "arguments": "{\\"param\\": \\"value\\"}"}}]}\n'
        return _tool_prompt

    def _stream_chat(self, _model_name, _prompt, _system="", **_kwargs):
        """Stream chat using Replicate's streaming API, yielding only text chunks."""
        _replicate_model = self._get_replicate_model(_model_name)
        _params = self._sanitize_params(**_kwargs)
        _input = {
            "prompt": _prompt,
            "system_prompt": _system,
            "max_tokens": _params['max_tokens'],
            "temperature": _params['temperature'],
            "top_p": _params['top_p']
        }
        # pass through stop sequences if provided
        if 'stop' in _kwargs and _kwargs['stop'] is not None:
            _input["stop"] = _kwargs['stop']
        try:
            for _event in self._client.stream(_replicate_model, input=_input):
                if not _event:
                    continue
                # Fast path: plain string/bytes token
                if isinstance(_event, (str, bytes)):
                    yield (_event.decode('utf-8', errors='ignore') if isinstance(_event, bytes) else _event)
                    continue
                # Normalize event interfaces (object, dict, or custom)
                _etype, _edata = None, None
                if isinstance(_event, dict):
                    _etype = _event.get('type') or _event.get('event')
                    _edata = _event.get('data') or _event.get('output') or _event.get('text')
                else:
                    _etype = getattr(_event, 'type', None) or getattr(_event, 'event', None)
                    _edata = getattr(_event, 'data', None)
                # Extract text payloads
                if _etype == "output" or _edata is not None:
                    if isinstance(_edata, (list, tuple)):
                        for _piece in _edata:
                            if isinstance(_piece, (str, bytes)):
                                yield (_piece.decode('utf-8', errors='ignore') if isinstance(_piece, bytes) else _piece)
                    elif isinstance(_edata, (str, bytes)):
                        yield (_edata.decode('utf-8', errors='ignore') if isinstance(_edata, bytes) else _edata)
                    elif isinstance(_edata, dict):
                        # Common nested keys; first match wins.
                        for _k in ("text", "output", "delta"):
                            if _k in _edata and isinstance(_edata[_k], (str, bytes)):
                                _v = _edata[_k]
                                yield (_v.decode('utf-8', errors='ignore') if isinstance(_v, bytes) else _v)
                                break
                    elif _etype in {"completed", "done", "end"}:
                        break
                    else:
                        # Fallback to string form (restore old working behavior)
                        try:
                            _s = str(_event)
                            if _s:
                                yield _s
                        except Exception:
                            pass
                elif _etype in {"error", "logs", "warning"}:
                    # Diagnostics are logged, never yielded to the client.
                    try:
                        _lg.warning(f"Replicate stream {_etype}: {_edata}")
                    except Exception:
                        pass
                elif _etype in {"completed", "done", "end"}:
                    break
                else:
                    # Unknown/eventless object; fallback to string form
                    try:
                        _s = str(_event)
                        if _s:
                            yield _s
                    except Exception:
                        pass
        except Exception as _e:
            _lg.error(f"Streaming error for {_replicate_model}: {_e}")
            # Surface a minimal safe error token
            yield ""

    def _stream_from_prediction(self, _prediction):
        """Stream from a prediction using the stream URL"""
        try:
            import requests
            _stream_url = _prediction.urls.get('stream')
            if not _stream_url:
                _lg.error("No stream URL available")
                return
            # NOTE(review): no timeout and the response is never closed —
            # a stalled SSE stream would block this generator indefinitely.
            _response = requests.get(
                _stream_url,
                headers={
                    "Accept": "text/event-stream",
                    "Cache-Control": "no-store"
                },
                stream=True
            )
            for _line in _response.iter_lines():
                if _line:
                    _line = _line.decode('utf-8')
                    if _line.startswith('data: '):
                        _data = _line[6:]
                        if _data != '[DONE]':
                            yield _data
                        else:
                            break
        except Exception as _e:
            _lg.error(f"Stream from prediction error: {_e}")
            yield f"Error: {_e}"

    def _complete_chat(self, _model_name, _prompt, _system="", **_kwargs):
        """Complete chat using Replicate's run method and coalesce into a single string."""
        _replicate_model = self._get_replicate_model(_model_name)
        _params = self._sanitize_params(**_kwargs)
        _input = {
            "prompt": _prompt,
            "system_prompt": _system,
            "max_tokens": _params['max_tokens'],
            "temperature": _params['temperature'],
            "top_p": _params['top_p']
        }
        if 'stop' in _kwargs and _kwargs['stop'] is not None:
            _input["stop"] = _kwargs['stop']
        try:
            _result = self._client.run(_replicate_model, input=_input)
            # If it's a list of strings or chunks, join
            if isinstance(_result, list):
                _joined = "".join([x.decode("utf-8", errors="ignore") if isinstance(x, bytes) else str(x) for x in _result])
                return _joined
            # Some models return generators/iterables; accumulate
            try:
                from collections.abc import Iterator, Iterable
                if isinstance(_result, Iterator) or (
                    isinstance(_result, Iterable) and not isinstance(_result, (str, bytes))
                ):
                    _buf = []
                    for _piece in _result:
                        if isinstance(_piece, (str, bytes)):
                            _buf.append(_piece.decode("utf-8", errors="ignore") if isinstance(_piece, bytes) else _piece)
                        else:
                            _buf.append(str(_piece))
                    _text = "".join(_buf)
                    if _text:
                        return _text
            except Exception:
                pass
            # FileOutput or scalar: cast to string; if empty, safe fallback
            _text = str(_result) if _result is not None else ""
            return _text
        except Exception as _e:
            _lg.error(f"Completion error for {_replicate_model}: {_e}")
            # Return empty to avoid leaking internals into user-visible content
            return ""
# Global variables
_client = None             # _RC instance, created in _lifespan; None => endpoints return 503
_startup_time = _t.time()  # process start time, used for /health uptime
_request_count = 0         # total chat-completion requests (all outcomes)
_error_count = 0           # chat-completion requests that failed
@_acm
async def _lifespan(_app: _FA):
    """FastAPI lifespan hook: build the Replicate client on startup.

    If construction fails the app still serves; `_client` stays None and
    the chat endpoint responds 503 until a restart.
    """
    global _client
    try:
        _lg.info("Initializing Replicate client...")
        _client = _RC()
        _lg.info("Replicate client initialized successfully")
    except Exception as _e:
        _lg.error(f"Failed to initialize client: {_e}")
        _client = None
    yield
    _lg.info("Shutting down Replicate client...")
# FastAPI App
_app = _FA(
    title="Replicate Claude-4-Sonnet OpenAI API",
    version="1.0.0",
    description="OpenAI-compatible API for Claude-4-Sonnet via Replicate",
    lifespan=_lifespan
)
# CORS — best effort: skipped entirely if the middleware is unavailable.
try:
    from fastapi.middleware.cors import CORSMiddleware as _CORS
    # NOTE(review): wildcard origins combined with allow_credentials=True is
    # very permissive; acceptable for a public demo, tighten for production.
    _app.add_middleware(
        _CORS,
        allow_origins=["*"],
        allow_credentials=True,
        allow_methods=["*"],
        allow_headers=["*"],
    )
except ImportError:
    pass
# Error handlers
@_app.exception_handler(_HE)
async def _http_exception_handler(_request, _exc: _HE):
    """Translate HTTPExceptions into an OpenAI-style error envelope."""
    _lg.error(f"HTTP error: {_exc.status_code} - {_exc.detail}")
    _payload = {
        "error": {
            "message": _exc.detail,
            "type": "api_error",
            "code": _exc.status_code,
        }
    }
    return _JR(status_code=_exc.status_code, content=_payload)
@_app.exception_handler(Exception)
async def _global_exception_handler(_request, _exc):
    """Catch-all handler: log the full traceback, return a generic 500 envelope."""
    _lg.error(f"Unexpected error: {_exc}\n{_tb.format_exc()}")
    _body = {
        "error": {
            "message": "Internal server error",
            "type": "server_error",
            "code": 500,
        }
    }
    return _JR(status_code=500, content=_body)
@_app.get("/")
async def _root():
    """Service banner with the count of canonical (unprefixed) model names."""
    _model_count = sum(
        1 for _name in _MODELS if not _name.startswith(("anthropic/", "openai/"))
    )
    return {
        "message": "Replicate Multi-Model OpenAI API",
        "version": "1.0.0",
        "status": "running",
        "supported_models": _model_count,
        "providers": ["anthropic", "openai"],
    }
@_app.get("/health")
async def _health_check():
    """Liveness/readiness probe with uptime and basic request statistics."""
    global _client, _startup_time, _request_count, _error_count
    _uptime = _t.time() - _startup_time
    # Degraded when the Replicate client never came up.
    if _client is None:
        _status, _client_status = "degraded", "not_initialized"
    else:
        _status, _client_status = "healthy", "ready"
    return {
        "status": _status,
        "timestamp": int(_t.time()),
        "uptime_seconds": int(_uptime),
        "client_status": _client_status,
        "stats": {
            "total_requests": _request_count,
            "total_errors": _error_count,
            # max(...) guards the division before any request has arrived.
            "error_rate": _error_count / max(_request_count, 1),
        },
    }
@_app.get("/v1/models")
async def _list_models():
    """List all supported models in the OpenAI `/v1/models` format."""
    _created_time = int(_t.time())
    # Canonical names only — the provider-prefixed aliases are duplicates.
    _unique_models = {
        _name for _name in _MODELS
        if not _name.startswith(("anthropic/", "openai/"))
    }
    _models_list = [
        _OM(
            id=_name,
            created=_created_time,
            owned_by=_MODEL_INFO.get(
                _name, {"owned_by": "unknown", "context_length": 4096}
            )["owned_by"],
        )
        for _name in sorted(_unique_models)
    ]
    return {"object": "list", "data": _models_list}
@_app.get("/models")
async def _list_models_alt():
    """Alias for `/v1/models` (some clients omit the `/v1` prefix)."""
    return await _list_models()
async def _generate_stream_response(_request: _CCR, _prompt: str, _system: str, _request_id: _O[str] = None):
    """SSE generator producing OpenAI-style `chat.completion.chunk` events.

    Emission order: an initial role delta, one content delta per streamed
    chunk, then — from `finally`, so the stream always terminates cleanly —
    a final empty delta with finish_reason="stop" and the `[DONE]` sentinel.
    """
    _completion_id = f"chatcmpl-{_u.uuid4().hex}"
    _created_time = int(_t.time())
    _request_id = _request_id or f"req-{_u.uuid4().hex[:8]}"
    _lg.info(f"[{_request_id}] Starting stream generation")
    try:
        # Send initial chunk with role
        _initial_chunk = {
            "id": _completion_id,
            "object": "chat.completion.chunk",
            "created": _created_time,
            "model": _request.model,
            "choices": [{
                "index": 0,
                "delta": {"role": "assistant"},
                "finish_reason": None
            }]
        }
        yield f"data: {_j.dumps(_initial_chunk)}\n\n"
        # Stream content chunks using Replicate's streaming
        _chunk_count = 0
        _total_content = ""
        try:
            # Extract only relevant parameters for Replicate API
            _api_params = {
                'max_tokens': _request.max_tokens,
                'temperature': _request.temperature,
                'top_p': _request.top_p,
                'presence_penalty': _request.presence_penalty,
                'frequency_penalty': _request.frequency_penalty,
                'stop': _request.stop
            }
            # Use Replicate's direct streaming method with model parameter
            for _chunk in _client._stream_chat(_request.model, _prompt, _system, **_api_params):
                if _chunk and isinstance(_chunk, str):
                    _chunk_count += 1
                    _total_content += _chunk
                    _stream_response = _CCSR(
                        id=_completion_id,
                        created=_created_time,
                        model=_request.model,
                        choices=[_CCSC(
                            delta={"content": _chunk},
                            finish_reason=None
                        )]
                    )
                    try:
                        _chunk_json = _j.dumps(_stream_response.model_dump())
                        yield f"data: {_chunk_json}\n\n"
                    except Exception as _json_error:
                        # Drop only the unserializable chunk; keep streaming.
                        _lg.error(f"[{_request_id}] JSON serialization error: {_json_error}")
                        continue
        except Exception as _stream_error:
            _lg.error(f"[{_request_id}] Streaming error after {_chunk_count} chunks: {_stream_error}")
            # Only surface an apology if the client received nothing at all.
            if _chunk_count == 0:
                _error_content = "I apologize, but I encountered an error while generating the response. Please try again."
                _error_response = _CCSR(
                    id=_completion_id,
                    created=_created_time,
                    model=_request.model,
                    choices=[_CCSC(
                        delta={"content": _error_content},
                        finish_reason=None
                    )]
                )
                yield f"data: {_j.dumps(_error_response.model_dump())}\n\n"
        _lg.info(f"[{_request_id}] Stream completed: {_chunk_count} chunks, {len(_total_content)} characters")
    except Exception as _e:
        _lg.error(f"[{_request_id}] Critical streaming error: {_e}")
        _error_chunk = {
            "id": _completion_id,
            "object": "chat.completion.chunk",
            "created": _created_time,
            "model": _request.model,
            "choices": [{
                "index": 0,
                "delta": {"content": "Error occurred while streaming response."},
                "finish_reason": "stop"
            }]
        }
        yield f"data: {_j.dumps(_error_chunk)}\n\n"
    finally:
        # Always close the stream with a stop chunk and the [DONE] sentinel.
        try:
            _final_chunk = {
                "id": _completion_id,
                "object": "chat.completion.chunk",
                "created": _created_time,
                "model": _request.model,
                "choices": [{
                    "index": 0,
                    "delta": {},
                    "finish_reason": "stop"
                }]
            }
            yield f"data: {_j.dumps(_final_chunk)}\n\n"
            yield "data: [DONE]\n\n"
            _lg.info(f"[{_request_id}] Stream finalized")
        except Exception as _final_error:
            _lg.error(f"[{_request_id}] Error sending final chunk: {_final_error}")
            yield "data: [DONE]\n\n"
@_app.post("/v1/chat/completions")
async def _create_chat_completion(_request: _CCR):
    """OpenAI-compatible chat completions endpoint.

    Validates the model, flattens the messages into a prompt, optionally
    appends a tool-usage prompt, then either streams SSE chunks or returns
    a single `chat.completion` response. Raises 400 for unknown models,
    503 when the Replicate client is unavailable, 500 on unexpected errors.
    """
    global _request_count, _error_count, _client
    _request_count += 1
    _request_id = f"req-{_u.uuid4().hex[:8]}"
    _lg.info(f"[{_request_id}] Chat completion request: model={_request.model}, stream={_request.stream}")
    if _client is None:
        _error_count += 1
        _lg.error(f"[{_request_id}] Client not initialized")
        raise _HE(status_code=503, detail="Service temporarily unavailable")
    try:
        # Validate model
        if not _client._validate_model(_request.model):
            _supported_models = list(_MODELS.keys())
            raise _HE(status_code=400, detail=f"Model '{_request.model}' not supported. Supported models: {_supported_models}")
        # Format messages
        _prompt, _system = _client._format_messages([_msg.model_dump() for _msg in _request.messages])
        # Handle tools/functions (legacy `functions` is wrapped into tool defs)
        if _request.tools or _request.functions:
            _tools = _request.tools or [_TD(function=_func) for _func in (_request.functions or [])]
            _tool_prompt = _client._handle_tools([_tool.model_dump() for _tool in _tools], _request.tool_choice)
            _prompt += _tool_prompt
        _lg.info(f"[{_request_id}] Formatted prompt length: {len(_prompt)}")
        # Extract only relevant parameters for Replicate API
        _api_params = {
            'max_tokens': _request.max_tokens,
            'temperature': _request.temperature,
            'top_p': _request.top_p,
            'presence_penalty': _request.presence_penalty,
            'frequency_penalty': _request.frequency_penalty
        }
        _lg.info(f"[{_request_id}] API parameters: {_api_params}")
        # Stream or complete
        if _request.stream:
            _lg.info(f"[{_request_id}] Starting streaming response")
            return _SR(
                _generate_stream_response(_request, _prompt, _system, _request_id),
                media_type="text/event-stream",
                headers={
                    "Cache-Control": "no-cache",
                    "Connection": "keep-alive"
                }
            )
        else:
            # Non-streaming completion
            _lg.info(f"[{_request_id}] Starting non-streaming completion")
            _content = _client._complete_chat(_request.model, _prompt, _system, **_api_params)
            _completion_id = f"chatcmpl-{_u.uuid4().hex}"
            _created_time = int(_t.time())
            # Check for tool calls in response: the tool prompt asks the model
            # to answer with a {"tool_calls": [...]} JSON object.
            _tool_calls = None
            _finish_reason = "stop"
            _stripped = _content.strip() if _content else ""
            if _stripped.startswith('{"tool_calls"'):
                # Narrow handler (was a bare `except:`): a malformed tool-call
                # JSON falls back to returning the raw text as content.
                try:
                    _tool_data = _j.loads(_stripped)
                    if "tool_calls" in _tool_data:
                        _tool_calls = [_TC(**_tc) for _tc in _tool_data["tool_calls"]]
                        _finish_reason = "tool_calls"
                        _content = None
                except Exception as _parse_error:
                    _lg.warning(f"[{_request_id}] Tool-call parse failed: {_parse_error}")
            _response = _CCRes(
                id=_completion_id,
                created=_created_time,
                model=_request.model,
                choices=[_CCC(
                    message=_CM(
                        role="assistant",
                        content=_content,
                        tool_calls=[_tc.model_dump() for _tc in _tool_calls] if _tool_calls else None
                    ),
                    finish_reason=_finish_reason
                )],
                # Rough whitespace-token counts; no tokenizer is available here.
                usage={
                    "prompt_tokens": len(_prompt.split()),
                    "completion_tokens": len(_content.split()) if _content else 0,
                    "total_tokens": len(_prompt.split()) + (len(_content.split()) if _content else 0)
                }
            )
            _lg.info(f"[{_request_id}] Non-streaming completion finished")
            return _response
    except _HE:
        _error_count += 1
        raise
    except Exception as _e:
        _error_count += 1
        _lg.error(f"[{_request_id}] Unexpected error: {_e}\n{_tb.format_exc()}")
        raise _HE(status_code=500, detail="Internal server error occurred")
@_app.post("/chat/completions")
async def _create_chat_completion_alt(_request: _CCR):
    """Alias for `/v1/chat/completions` (some clients omit the `/v1` prefix)."""
    return await _create_chat_completion(_request)
if __name__ == "__main__":
    """Local/standalone entry point (Hugging Face Spaces defaults)."""
    try:
        import uvicorn as _uv
        # Hugging Face Spaces exposes port 7860 unless PORT says otherwise.
        _host = _o.getenv("HOST", "0.0.0.0")
        _port = int(_o.getenv("PORT", 7860))
        _lg.info(f"Starting Replicate Multi-Model server on {_host}:{_port}")
        # Log a sample of the model table (first 7 entries).
        _lg.info(f"Supported models: {list(_MODELS.keys())[:7]}")
        _uv.run(
            _app,
            host=_host,
            port=_port,
            reload=False,
            log_level="info",
            access_log=True,
        )
    except ImportError:
        _lg.error("uvicorn not installed. Install with: pip install uvicorn")
    except Exception as _e:
        _lg.error(f"Failed to start server: {_e}")