| | |
| | |
| | """ |
| | Puter.com Reverse OpenAI-Compatible API Server |
| | (edited for proper async streaming with httpx) |
| | """ |
| | import json |
| | import time |
| | import uuid |
| | import logging |
| | from typing import Any, Dict, List, Optional, Union, AsyncGenerator |
| |
|
| | import httpx |
| | from fastapi import FastAPI, HTTPException, Request |
| | from fastapi.middleware.cors import CORSMiddleware |
| | from fastapi.responses import StreamingResponse, JSONResponse |
| | from pydantic import BaseModel, Field |
| |
|
| | try: |
| | from .config import ( |
| | PUTER_HEADERS, |
| | PUTER_AUTH_BEARER, |
| | SERVER_CONFIG, |
| | MODEL_MAPPING, |
| | ) |
| | except ImportError: |
| | from config import ( |
| | PUTER_HEADERS, |
| | PUTER_AUTH_BEARER, |
| | SERVER_CONFIG, |
| | MODEL_MAPPING, |
| | ) |
| |
|
# Upstream endpoint that every chat request is posted to.
PUTER_URL = "https://api.puter.com/drivers/call"
# Passed to httpx.Timeout() for each upstream call.
REQUEST_TIMEOUT = 120

# Set up root logging at INFO, then obtain this module's logger.
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
| |
|
| |
|
| | |
class OpenAIMessage(BaseModel):
    """One chat message in the OpenAI request format.

    Every declared field is optional, and keys that are not declared here are
    kept on the model instead of being rejected.
    """

    # Pydantic v2 configuration. The nested ``class Config`` this replaces is
    # deprecated in v2 (this file already relies on v2's ``model_dump_json``).
    model_config = {"extra": "allow"}

    role: Optional[str] = Field(default=None, description="Role")
    content: Optional[Union[str, List[Dict[str, Any]]]] = None
    name: Optional[str] = None
    function_call: Optional[Dict[str, Any]] = None
    tool_calls: Optional[List[Dict[str, Any]]] = None
    tool_call_id: Optional[str] = None

    def get_text(self) -> str:
        """Return the message content flattened to plain text.

        Returns:
            ``content`` itself when it is a string; the concatenated ``text``
            of every ``{"type": "text"}`` part when it is a list; ``""`` when
            it is ``None``; ``str(content)`` for anything else.
        """
        content = self.content
        if content is None:
            return ""
        if isinstance(content, str):
            return content
        if isinstance(content, list):
            # Skip parts whose "text" is missing or not a string so that a
            # malformed part (e.g. {"type": "text", "text": None}) cannot make
            # "".join() raise TypeError.
            return "".join(
                part["text"]
                for part in content
                if isinstance(part, dict)
                and part.get("type") == "text"
                and isinstance(part.get("text"), str)
            )
        return str(content)
| |
|
| |
|
class OpenAIFunction(BaseModel):
    """Function definition with a required name and optional description/schema.

    Keys that are not declared here are kept on the model instead of being
    rejected.
    """

    # Pydantic v2 configuration. The nested ``class Config`` this replaces is
    # deprecated in v2 (this file already relies on v2's ``model_dump_json``).
    model_config = {"extra": "allow"}

    name: str
    description: Optional[str] = None
    parameters: Optional[Dict[str, Any]] = None
| |
|
| |
|
class OpenAITool(BaseModel):
    """Tool entry: a ``type`` tag (default ``"function"``) plus an optional function.

    Keys that are not declared here are kept on the model instead of being
    rejected.
    """

    # Pydantic v2 configuration. The nested ``class Config`` this replaces is
    # deprecated in v2 (this file already relies on v2's ``model_dump_json``).
    model_config = {"extra": "allow"}

    type: str = Field(default="function")
    function: Optional[OpenAIFunction] = None
| |
|
| |
|
class OpenAIChatRequest(BaseModel):
    """Request body accepted by ``POST /v1/chat/completions``.

    Only ``model`` and ``messages`` are required. Keys that are not declared
    here are kept on the model instead of being rejected.
    """

    # Pydantic v2 configuration. The nested ``class Config`` this replaces is
    # deprecated in v2 (this file already relies on v2's ``model_dump_json``).
    model_config = {"extra": "allow"}

    model: str
    messages: List[OpenAIMessage]
    max_tokens: Optional[int] = None
    temperature: Optional[float] = None
    top_p: Optional[float] = None
    n: Optional[int] = 1
    stream: Optional[bool] = False
    stop: Optional[Union[str, List[str]]] = None
    presence_penalty: Optional[float] = None
    frequency_penalty: Optional[float] = None
    logit_bias: Optional[Dict[str, float]] = None
    user: Optional[str] = None
    tools: Optional[List[OpenAITool]] = None
    tool_choice: Optional[Union[str, Dict[str, Any]]] = None
    functions: Optional[List[OpenAIFunction]] = None
    function_call: Optional[Union[str, Dict[str, Any]]] = None
| |
|
| |
|
# One entry of ``choices`` in a non-streaming chat completion response.
class OpenAIChoice(BaseModel):
    # Position of this choice in the response; defaults to the first slot.
    index: int = Field(default=0)
    # The assistant message as a plain dict.
    message: Dict[str, Any]
    # Why generation ended, when known.
    finish_reason: Optional[str] = Field(default=None)
| |
|
| |
|
# Envelope returned for a non-streaming chat completion.
class OpenAIChatResponse(BaseModel):
    # Request identifier supplied by the caller that builds the response.
    id: str
    # Object tag; fixed to the chat-completion value unless overridden.
    object: str = Field(default="chat.completion")
    # Integer timestamp supplied by the caller that builds the response.
    created: int
    model: str
    choices: List[OpenAIChoice]
    # Optional name -> count mapping for token usage.
    usage: Optional[Dict[str, int]] = Field(default=None)
| |
|
| |
|
# One entry of ``choices`` inside a streamed chunk.
class OpenAIStreamChoice(BaseModel):
    # Position of this choice in the chunk; defaults to the first slot.
    index: int = Field(default=0)
    # Incremental payload for this chunk as a plain dict.
    delta: Dict[str, Any]
    # Why generation ended, when known.
    finish_reason: Optional[str] = Field(default=None)
| |
|
| |
|
# Envelope for a single server-sent chunk of a streamed completion.
class OpenAIStreamChunk(BaseModel):
    # Request identifier supplied by the caller that builds the chunk.
    id: str
    # Object tag; fixed to the chunk value unless overridden.
    object: str = Field(default="chat.completion.chunk")
    # Integer timestamp supplied by the caller that builds the chunk.
    created: int
    model: str
    choices: List[OpenAIStreamChoice]
| |
|
| |
|
# Roles that are forwarded upstream. A message with any other role (or none)
# is sent as a bare ``{"content": ...}`` entry, exactly as before.
_FORWARDED_ROLES = frozenset({"system", "user", "assistant"})


def _build_puter_payload(openai_req: OpenAIChatRequest, stream_upstream: bool = True) -> Dict[str, Any]:
    """Translate an OpenAI-style chat request into a Puter driver-call payload.

    Args:
        openai_req: The validated incoming request.
        stream_upstream: Value placed in ``args.stream`` of the payload.

    Returns:
        The JSON-serialisable body to POST to the upstream endpoint.

    Raises:
        HTTPException: 400 when the requested model has no entry in
            ``MODEL_MAPPING`` and there is no ``"default"`` entry either.
    """
    mapped_messages: List[Dict[str, str]] = []
    for message in openai_req.messages:
        entry: Dict[str, str] = {}
        # Keep the speaker role so system prompts and earlier assistant turns
        # are not flattened into unlabeled messages.
        if message.role in _FORWARDED_ROLES:
            entry["role"] = message.role
        entry["content"] = message.get_text()
        mapped_messages.append(entry)

    mapping = MODEL_MAPPING.get(openai_req.model) or MODEL_MAPPING.get("default")
    if not mapping:
        # Previously this fell through to ``None["driver"]`` and surfaced as
        # an opaque TypeError.
        raise HTTPException(
            status_code=400,
            detail=f"Unsupported model: {openai_req.model}",
        )

    return {
        "interface": "puter-chat-completion",
        "driver": mapping["driver"],
        "test_mode": False,
        "method": "complete",
        "args": {
            "messages": mapped_messages,
            "model": mapping["puter_model"],
            "stream": stream_upstream,
        },
    }
| |
|
| |
|
def _headers_with_auth() -> Dict[str, str]:
    """Return a copy of ``PUTER_HEADERS`` with the bearer ``authorization`` set.

    The token is read from ``PuterAuth.token`` on every call.
    """
    merged = dict(PUTER_HEADERS)
    merged.update({"authorization": f"Bearer {PuterAuth.token}"})
    return merged
| |
|
| |
|
class PuterAuth:
    """Holder for the bearer token used when calling the upstream API."""

    # Initialised from the configured token; read by _headers_with_auth().
    token: str = PUTER_AUTH_BEARER
| |
|
| |
|
def _sse_error_event(message: str) -> str:
    """Format *message* as an OpenAI-style ``error`` server-sent event."""
    body = {"error": {"message": message, "type": "upstream_error", "code": 502}}
    return f"data: {json.dumps(body)}\n\n"


def _extract_stream_text(line: str) -> Optional[str]:
    """Return the text carried by one upstream line, or ``None`` if there is none."""
    if not line:
        return None
    try:
        parsed = json.loads(line)
    except ValueError:
        parsed = None
    if isinstance(parsed, dict):
        # First non-empty string among the known text-bearing keys wins.
        for key in ("delta", "text", "content", "output"):
            value = parsed.get(key)
            if isinstance(value, str) and value:
                return value
        return None
    # Not a JSON object: forward the raw line, except the end-of-stream marker.
    return None if line == "[DONE]" else line


async def _stream_openai_chunks(openai_req: OpenAIChatRequest, request_id: str) -> AsyncGenerator[str, None]:
    """Stream a completion from the upstream API as OpenAI-style SSE events.

    Yields an initial chunk announcing the assistant role, one chunk per text
    piece received, a final chunk with ``finish_reason="stop"`` and then
    ``data: [DONE]``.

    Upstream failures are reported in-band as an ``error`` event followed by
    ``data: [DONE]``. This generator is consumed by a StreamingResponse, so
    the response has already started by the time it runs; raising
    HTTPException here could no longer produce a 502 and would only break the
    connection.

    Args:
        openai_req: The validated incoming request.
        request_id: Identifier stamped on every emitted chunk.
    """
    headers = _headers_with_auth()
    payload = _build_puter_payload(openai_req, stream_upstream=True)
    created = int(time.time())

    def make_event(delta: Dict[str, Any], finish_reason: Optional[str] = None) -> str:
        chunk = OpenAIStreamChunk(
            id=request_id,
            created=created,
            model=openai_req.model,
            choices=[OpenAIStreamChoice(index=0, delta=delta, finish_reason=finish_reason)],
        )
        return f"data: {chunk.model_dump_json()}\n\n"

    timeout = httpx.Timeout(REQUEST_TIMEOUT)
    try:
        async with httpx.AsyncClient(timeout=timeout) as client:
            async with client.stream("POST", PUTER_URL, headers=headers, json=payload) as resp:
                if resp.status_code != 200:
                    # Decode the body so the message holds text, not a b'...' repr.
                    detail = (await resp.aread())[:500].decode("utf-8", errors="replace")
                    logger.error("[%s] upstream error %s: %s", request_id, resp.status_code, detail)
                    yield _sse_error_event(f"Upstream error {resp.status_code}: {detail}")
                    yield "data: [DONE]\n\n"
                    return

                yield make_event({"role": "assistant"})

                async for line in resp.aiter_lines():
                    text_piece = _extract_stream_text(line)
                    if text_piece:
                        yield make_event({"content": text_piece})

                yield make_event({}, finish_reason="stop")
                yield "data: [DONE]\n\n"
    except httpx.RequestError as exc:
        # Covers connection failures and read errors in the middle of a stream.
        logger.error("[%s] upstream connection error: %s", request_id, exc)
        yield _sse_error_event(f"Upstream connection error: {exc}")
        yield "data: [DONE]\n\n"
| |
|
| |
|
def _extract_completion_text(data: Any) -> Optional[str]:
    """Return the assistant text found in a decoded upstream reply, if any.

    Lookup order for a dict: a direct string under a known key, then
    OpenAI-style ``choices``, then the same search inside nested
    ``result`` / ``message`` / ``content`` / ``output`` containers. A list is
    treated as content parts whose string ``text`` values are concatenated.
    """
    if isinstance(data, list):
        texts = [
            part["text"]
            for part in data
            if isinstance(part, dict) and isinstance(part.get("text"), str)
        ]
        return "".join(texts) if texts else None
    if not isinstance(data, dict):
        return None

    for key in ("output", "content", "text", "message", "result"):
        value = data.get(key)
        if isinstance(value, str):
            return value

    choices = data.get("choices")
    if isinstance(choices, list):
        pieces: List[str] = []
        for choice in choices:
            if not isinstance(choice, dict):
                continue
            text = choice.get("text")
            if not text:
                message = choice.get("message")
                text = message.get("content") if isinstance(message, dict) else None
            if isinstance(text, str) and text:
                pieces.append(text)
        if pieces:
            return "".join(pieces)

    # NOTE(review): assumes replies may nest the text, e.g.
    # {"result": {"message": {"content": ...}}} - confirm against the
    # upstream API's actual response shape.
    for key in ("result", "message", "content", "output"):
        value = data.get(key)
        if isinstance(value, (dict, list)):
            found = _extract_completion_text(value)
            if found is not None:
                return found
    return None


async def _complete_non_streaming(openai_req: OpenAIChatRequest) -> str:
    """Request a completion upstream without streaming and return its text.

    Args:
        openai_req: The validated incoming request.

    Returns:
        The extracted assistant text, or the raw response body when the reply
        is not JSON or no text can be located in it.

    Raises:
        HTTPException: 502 when the upstream call fails to connect or answers
            with a non-200 status.
    """
    headers = _headers_with_auth()
    payload = _build_puter_payload(openai_req, stream_upstream=False)

    timeout = httpx.Timeout(REQUEST_TIMEOUT)
    async with httpx.AsyncClient(timeout=timeout) as client:
        try:
            resp = await client.post(PUTER_URL, headers=headers, json=payload)
        except httpx.RequestError as e:
            raise HTTPException(status_code=502, detail=f"Upstream connection error: {e}") from e

    if resp.status_code != 200:
        detail = resp.text[:500]
        raise HTTPException(status_code=502, detail=f"Upstream error {resp.status_code}: {detail}")

    try:
        data = resp.json()
    except ValueError:
        # Body is not valid JSON: hand it back verbatim.
        return resp.text

    # Only a JSON object is searched; any other top-level value falls back to
    # the raw body, as before.
    text = _extract_completion_text(data) if isinstance(data, dict) else None
    return text if text is not None else resp.text
| |
|
| |
|
| | |
# The ASGI application object that all routes below attach to.
app = FastAPI(
    description="OpenAI-compatible API proxying to api.puter.com (async streaming enabled)",
    version="1.0.0",
    title="Puter Reverse OpenAI API",
)

# CORS is left fully open: every origin, method and header is allowed.
# NOTE(review): wildcard origins combined with allow_credentials=True is very
# permissive - confirm this is intended for the deployment.
app.add_middleware(
    CORSMiddleware,
    allow_headers=["*"],
    allow_methods=["*"],
    allow_credentials=True,
    allow_origins=["*"],
)
| |
|
| |
|
@app.get("/")
async def root():
    """Return a static banner with the service name, status and version."""
    banner = {
        "message": "Puter Reverse OpenAI API",
        "status": "running",
        "version": "1.0.0",
    }
    return banner
| |
|
| |
|
@app.get("/health")
async def health():
    """Report a fixed healthy status together with the current Unix time."""
    now = int(time.time())
    return {"status": "healthy", "timestamp": now}
| |
|
| |
|
@app.get("/v1/models")
async def models():
    """List every configured model id except the ``"default"`` entry.

    When the mapping yields no ids, a single ``o3-mini`` entry is returned
    instead.
    """
    created = int(time.time())

    def describe(model_id):
        return {"id": model_id, "object": "model", "created": created, "owned_by": "puter"}

    data = [describe(key) for key in MODEL_MAPPING.keys() if key != "default"]
    if len(data) == 0:
        data = [describe("o3-mini")]
    return {"object": "list", "data": data}
| |
|
| |
|
@app.post("/v1/chat/completions")
async def chat(request: OpenAIChatRequest):
    """Serve a chat completion, streamed as SSE or returned as one JSON body.

    Usage figures in the non-streaming response are whitespace-separated word
    counts of the prompt and completion text, not tokenizer counts.
    """
    req_id = f"chatcmpl-{uuid.uuid4().hex[:12]}"
    logger.info(f"[{req_id}] model={request.model}, stream={bool(request.stream)}")

    if not bool(request.stream):
        content = await _complete_non_streaming(request)
        created = int(time.time())

        # Word counts stand in for token counts; the prompt is counted once
        # and reused for the total.
        prompt_words = len(" ".join([m.get_text() for m in request.messages]).split())
        completion_words = len(content.split())

        assistant_message = {"role": "assistant", "content": content}
        return OpenAIChatResponse(
            id=req_id,
            created=created,
            model=request.model,
            choices=[OpenAIChoice(index=0, message=assistant_message, finish_reason="stop")],
            usage={
                "prompt_tokens": prompt_words,
                "completion_tokens": completion_words,
                "total_tokens": prompt_words + completion_words,
            },
        )

    # Streaming path: hand the async generator straight to Starlette.
    sse_headers = {
        "Cache-Control": "no-cache",
        "Connection": "keep-alive",
        "X-Accel-Buffering": "no",
        "Access-Control-Allow-Origin": "*",
        "Access-Control-Allow-Headers": "*",
    }
    return StreamingResponse(
        _stream_openai_chunks(request, req_id),
        media_type="text/event-stream",
        headers=sse_headers,
    )
| |
|
| |
|
@app.post("/v1/chat/completions/raw")
async def raw(req: Request):
    """Check whether the raw request body parses as a valid chat request.

    Returns ``{"valid": True}`` on success; otherwise a 422 response carrying
    ``valid: False`` and the error text. Nothing is sent upstream.
    """
    raw_body = await req.body()
    try:
        OpenAIChatRequest(**json.loads(raw_body))
    except Exception as exc:
        failure = {"valid": False, "error": str(exc)}
        return JSONResponse(status_code=422, content=failure)
    return {"valid": True}
| |
|
| |
|
# Script entry point: serve the app with uvicorn using SERVER_CONFIG.
if __name__ == "__main__":
    try:
        import uvicorn

        # Fall back to all interfaces / port 8781 when the config omits them.
        port = int(SERVER_CONFIG.get("port", 8781))
        host = SERVER_CONFIG.get("host", "0.0.0.0")
        logger.info(f"Starting Puter Reverse API on {host}:{port}")
        uvicorn.run(app, log_level="info", port=port, host=host)
    except Exception as startup_error:
        # Any failure, including a missing uvicorn, is logged rather than raised.
        logger.error(f"Failed to start server: {startup_error}")