# puter_server.py #!/usr/bin/env python3 """ Puter.com Reverse OpenAI-Compatible API Server (edited for proper async streaming with httpx) """ import json import time import uuid import logging from typing import Any, Dict, List, Optional, Union, AsyncGenerator import httpx from fastapi import FastAPI, HTTPException, Request from fastapi.middleware.cors import CORSMiddleware from fastapi.responses import StreamingResponse, JSONResponse from pydantic import BaseModel, Field try: from .config import ( PUTER_HEADERS, PUTER_AUTH_BEARER, SERVER_CONFIG, MODEL_MAPPING, ) except ImportError: from config import ( PUTER_HEADERS, PUTER_AUTH_BEARER, SERVER_CONFIG, MODEL_MAPPING, ) logger = logging.getLogger(__name__) logging.basicConfig(level=logging.INFO) PUTER_URL = "https://api.puter.com/drivers/call" REQUEST_TIMEOUT = 120 # seconds # ===== OpenAI-compatible models (same as your original) ===== class OpenAIMessage(BaseModel): role: Optional[str] = Field(default=None, description="Role") content: Optional[Union[str, List[Dict[str, Any]]]] = None name: Optional[str] = None function_call: Optional[Dict[str, Any]] = None tool_calls: Optional[List[Dict[str, Any]]] = None tool_call_id: Optional[str] = None def get_text(self) -> str: if isinstance(self.content, str): return self.content if isinstance(self.content, list): parts: List[str] = [] for item in self.content: if isinstance(item, dict) and item.get("type") == "text": parts.append(item.get("text", "")) return "".join(parts) return str(self.content) if self.content is not None else "" class Config: extra = "allow" class OpenAIFunction(BaseModel): name: str description: Optional[str] = None parameters: Optional[Dict[str, Any]] = None class Config: extra = "allow" class OpenAITool(BaseModel): type: str = Field(default="function") function: Optional[OpenAIFunction] = None class Config: extra = "allow" class OpenAIChatRequest(BaseModel): model: str messages: List[OpenAIMessage] max_tokens: Optional[int] = None temperature: Optional[float] = None top_p: Optional[float] = None n: Optional[int] = 1 stream: Optional[bool] = False stop: Optional[Union[str, List[str]]] = None presence_penalty: Optional[float] = None frequency_penalty: Optional[float] = None logit_bias: Optional[Dict[str, float]] = None user: Optional[str] = None tools: Optional[List[OpenAITool]] = None tool_choice: Optional[Union[str, Dict[str, Any]]] = None functions: Optional[List[OpenAIFunction]] = None function_call: Optional[Union[str, Dict[str, Any]]] = None class Config: extra = "allow" class OpenAIChoice(BaseModel): index: int = 0 message: Dict[str, Any] finish_reason: Optional[str] = None class OpenAIChatResponse(BaseModel): id: str object: str = "chat.completion" created: int model: str choices: List[OpenAIChoice] usage: Optional[Dict[str, int]] = None class OpenAIStreamChoice(BaseModel): index: int = 0 delta: Dict[str, Any] finish_reason: Optional[str] = None class OpenAIStreamChunk(BaseModel): id: str object: str = "chat.completion.chunk" created: int model: str choices: List[OpenAIStreamChoice] def _build_puter_payload(openai_req: OpenAIChatRequest, stream_upstream: bool = True) -> Dict[str, Any]: mapped_messages: List[Dict[str, str]] = [] for m in openai_req.messages: txt = m.get_text() mapped_messages.append({"content": txt}) mapping = MODEL_MAPPING.get(openai_req.model) or MODEL_MAPPING.get("default") driver = mapping["driver"] puter_model = mapping["puter_model"] payload: Dict[str, Any] = { "interface": "puter-chat-completion", "driver": driver, "test_mode": False, "method": "complete", "args": { "messages": mapped_messages, "model": puter_model, "stream": stream_upstream, }, } return payload def _headers_with_auth() -> Dict[str, str]: h = dict(PUTER_HEADERS) h["authorization"] = f"Bearer {PuterAuth.token}" return h class PuterAuth: token: str = PUTER_AUTH_BEARER async def _stream_openai_chunks(openai_req: OpenAIChatRequest, request_id: str) -> AsyncGenerator[str, None]: """ Async stream from upstream Puter API and yield SSE-compatible chunks. """ headers = _headers_with_auth() payload = _build_puter_payload(openai_req, stream_upstream=True) timeout = httpx.Timeout(REQUEST_TIMEOUT) async with httpx.AsyncClient(timeout=timeout) as client: try: async with client.stream("POST", PUTER_URL, headers=headers, json=payload) as resp: if resp.status_code != 200: detail = (await resp.aread())[:500] raise HTTPException(status_code=502, detail=f"Upstream error {resp.status_code}: {detail}") created = int(time.time()) # initial role chunk initial = OpenAIStreamChunk( id=request_id, created=created, model=openai_req.model, choices=[OpenAIStreamChoice(index=0, delta={"role": "assistant"}, finish_reason=None)], ) yield f"data: {initial.model_dump_json()}\n\n" async for line in resp.aiter_lines(): if not line: continue text_piece: Optional[str] = None try: obj = json.loads(line) for k in ("delta", "text", "content", "output"): v = obj.get(k) if isinstance(v, str) and v: text_piece = v break except Exception: # fallback raw text if line and line != "[DONE]": text_piece = line if not text_piece: continue chunk = OpenAIStreamChunk( id=request_id, created=created, model=openai_req.model, choices=[OpenAIStreamChoice(index=0, delta={"content": text_piece}, finish_reason=None)], ) yield f"data: {chunk.model_dump_json()}\n\n" final = OpenAIStreamChunk( id=request_id, created=created, model=openai_req.model, choices=[OpenAIStreamChoice(index=0, delta={}, finish_reason="stop")], ) yield f"data: {final.model_dump_json()}\n\n" yield "data: [DONE]\n\n" except httpx.RequestError as e: raise HTTPException(status_code=502, detail=f"Upstream connection error: {e}") async def _complete_non_streaming(openai_req: OpenAIChatRequest) -> str: """ Request upstream without streaming and return full content as string. """ headers = _headers_with_auth() payload = _build_puter_payload(openai_req, stream_upstream=False) timeout = httpx.Timeout(REQUEST_TIMEOUT) async with httpx.AsyncClient(timeout=timeout) as client: try: resp = await client.post(PUTER_URL, headers=headers, json=payload) except httpx.RequestError as e: raise HTTPException(status_code=502, detail=f"Upstream connection error: {e}") if resp.status_code != 200: detail = (resp.text)[:500] raise HTTPException(status_code=502, detail=f"Upstream error {resp.status_code}: {detail}") # attempt to parse JSON with expected fields, fallback to raw text try: data = resp.json() # attempt common fields if isinstance(data, dict): # search for text-like fields for k in ("output", "content", "text", "message", "result"): v = data.get(k) if isinstance(v, str): return v # else try joining array of outputs if isinstance(data.get("choices"), list): parts = [] for c in data.get("choices"): if isinstance(c, dict): text = c.get("text") or (c.get("message") and c["message"].get("content")) if text: parts.append(text) if parts: return "".join(parts) # fallback to raw text body return resp.text except Exception: return resp.text # ===== FastAPI app ===== app = FastAPI( title="Puter Reverse OpenAI API", version="1.0.0", description="OpenAI-compatible API proxying to api.puter.com (async streaming enabled)" ) app.add_middleware( CORSMiddleware, allow_origins=["*"], allow_credentials=True, allow_methods=["*"], allow_headers=["*"], ) @app.get("/") async def root(): return {"message": "Puter Reverse OpenAI API", "status": "running", "version": "1.0.0"} @app.get("/health") async def health(): return {"status": "healthy", "timestamp": int(time.time())} @app.get("/v1/models") async def models(): created = int(time.time()) data = [] for key in [k for k in MODEL_MAPPING.keys() if k != "default"]: data.append({"id": key, "object": "model", "created": created, "owned_by": "puter"}) if not data: data.append({"id": "o3-mini", "object": "model", "created": created, "owned_by": "puter"}) return {"object": "list", "data": data} @app.post("/v1/chat/completions") async def chat(request: OpenAIChatRequest): req_id = f"chatcmpl-{uuid.uuid4().hex[:12]}" logger.info(f"[{req_id}] model={request.model}, stream={bool(request.stream)}") if bool(request.stream): return StreamingResponse( _stream_openai_chunks(request, req_id), media_type="text/event-stream", headers={ "Cache-Control": "no-cache", "Connection": "keep-alive", "X-Accel-Buffering": "no", "Access-Control-Allow-Origin": "*", "Access-Control-Allow-Headers": "*", }, ) content = await _complete_non_streaming(request) created = int(time.time()) response = OpenAIChatResponse( id=req_id, created=created, model=request.model, choices=[OpenAIChoice(index=0, message={"role": "assistant", "content": content}, finish_reason="stop")], usage={ "prompt_tokens": len(" ".join([m.get_text() for m in request.messages]).split()), "completion_tokens": len(content.split()), "total_tokens": len(" ".join([m.get_text() for m in request.messages]).split()) + len(content.split()), }, ) return response @app.post("/v1/chat/completions/raw") async def raw(req: Request): body = await req.body() try: obj = json.loads(body) _ = OpenAIChatRequest(**obj) return {"valid": True} except Exception as e: return JSONResponse(status_code=422, content={"valid": False, "error": str(e)}) if __name__ == "__main__": try: import uvicorn host = SERVER_CONFIG.get("host", "0.0.0.0") port = int(SERVER_CONFIG.get("port", 8781)) logger.info(f"Starting Puter Reverse API on {host}:{port}") uvicorn.run(app, host=host, port=port, log_level="info") except Exception as e: logger.error(f"Failed to start server: {e}")