import os
import json
import time

import httpx
import uvicorn
import gradio as gr
from fastapi import FastAPI, Request, Header, HTTPException
from fastapi.responses import JSONResponse, StreamingResponse

# ---------------------------------------------------------------------
# Configuration
# ---------------------------------------------------------------------
BYTEZ_CHAT_URL = "https://api.bytez.com/models/v2/openai/v1/chat/completions"
BYTEZ_MODELS_URL = "https://api.bytez.com/models/v2/list/models"
BYTEZ_IMAGE_URL = "https://api.bytez.com/models/v2/openai/dall-e-3"

BYTEZ_AUTH = os.getenv("BYTEZ_API_KEY")      # your Bytez key (used by /v1)
LOCAL_API_KEY = os.getenv("LOCAL_API_KEY")   # optional local guard
BYTEZ_AUTH_2 = os.getenv("BYTEZ_API_KEY_2")  # separate Bytez key (used by /v2)

# ---------------------------------------------------------------------
# FastAPI app
# ---------------------------------------------------------------------
api = FastAPI(title="Bytez → OpenAI Proxy (v1 + v2)")


def check_key(auth: str | None) -> None:
    """Validate the ``Authorization: Bearer <key>`` header.

    Raises:
        HTTPException 401: header missing or not a Bearer token.
        HTTPException 403: LOCAL_API_KEY is set and the token doesn't match.
    """
    if not auth or not auth.startswith("Bearer "):
        raise HTTPException(status_code=401, detail="Missing or invalid API key")
    # Slice off the prefix instead of split("Bearer ")[1]: split() would
    # return "" if the token itself happened to contain "Bearer ".
    user_key = auth[len("Bearer "):].strip()
    if LOCAL_API_KEY and user_key != LOCAL_API_KEY:
        raise HTTPException(status_code=403, detail="Unauthorized API key")


# ---------------------------------------------------------------------
# Root / health
# ---------------------------------------------------------------------
@api.get("/")
def root():
    """Simple health-check endpoint."""
    return {"status": "ok", "message": "Bytez proxy (v1+v2) running"}


# ---------------------------------------------------------------------
# -------------------------- /v1 ------------------------------------
# ---------------------------------------------------------------------
@api.get("/v1/models")
async def v1_models(authorization: str = Header(None)):
    """List models, translating the Bytez list format to OpenAI's."""
    check_key(authorization)
    if not BYTEZ_AUTH:
        raise HTTPException(status_code=500, detail="Server BYTEZ_API_KEY not configured")

    async with httpx.AsyncClient(timeout=30) as c:
        r = await c.get(BYTEZ_MODELS_URL, headers={"Authorization": BYTEZ_AUTH})

    try:
        data = r.json()
    except json.JSONDecodeError:
        raise HTTPException(status_code=502, detail="Upstream returned invalid JSON")

    # Transform Bytez → OpenAI list. Bytez may return either a bare list or
    # an object with a "data" array; each entry may use "id" or "name".
    models_list = [
        {"id": m.get("id") or m.get("name"), "object": "model"}
        for m in (data if isinstance(data, list) else data.get("data", []))
    ]
    return JSONResponse(
        {"object": "list", "data": models_list},
        headers={"Access-Control-Allow-Origin": "*"},
    )


@api.post("/v1/chat/completions")
async def v1_chat(request: Request, authorization: str = Header(None)):
    """OpenAI-compatible chat completions, forwarded to Bytez.

    Supports both streaming (SSE) and non-streaming modes, adapting
    Bytez chunk shapes ("token"/"choices"/"text") to OpenAI chunks.
    """
    check_key(authorization)
    if not BYTEZ_AUTH:
        raise HTTPException(status_code=500, detail="Server BYTEZ_API_KEY not configured")

    payload = await request.json()
    stream = payload.get("stream", False)
    headers = {"Authorization": BYTEZ_AUTH, "Content-Type": "application/json"}

    # ---------- streaming helper ----------
    async def v1_event_stream():
        async with httpx.AsyncClient(timeout=120) as client:
            async with client.stream(
                "POST", BYTEZ_CHAT_URL, headers=headers, json=payload
            ) as upstream:
                async for line in upstream.aiter_lines():
                    line = line.strip()
                    if not line:
                        continue
                    json_str = line[6:] if line.startswith("data: ") else line

                    # BUGFIX: the sentinel must be checked BEFORE json.loads —
                    # "[DONE]" is not valid JSON, so parsing it first raised
                    # JSONDecodeError and the branch was unreachable. Also
                    # return (not break) so [DONE] is not emitted twice.
                    if json_str == "[DONE]":
                        yield "data: [DONE]\n\n"
                        return

                    try:
                        chunk = json.loads(json_str)
                    except json.JSONDecodeError:
                        continue

                    # ----- adapt Bytez chunk to OpenAI -----
                    content = ""
                    if "token" in chunk:
                        content = chunk["token"]
                    elif "choices" in chunk and chunk["choices"]:
                        delta = chunk["choices"][0].get("delta", {})
                        content = delta.get("content", "")
                    elif "text" in chunk:
                        content = chunk["text"]
                    else:
                        content = str(chunk)

                    openai_chunk = {
                        "id": "chatcmpl-proxy-stream",
                        "object": "chat.completion.chunk",
                        "created": int(time.time()),
                        "model": payload.get("model", "unknown"),
                        "choices": [
                            {
                                "index": 0,
                                "delta": {"role": "assistant", "content": content},
                                "finish_reason": None,
                            }
                        ],
                    }
                    yield f"data: {json.dumps(openai_chunk)}\n\n"
        # Upstream ended without sending [DONE]; terminate the SSE stream.
        yield "data: [DONE]\n\n"

    # ---------- non-stream ----------
    if not stream:
        async with httpx.AsyncClient(timeout=120) as c:
            r = await c.post(BYTEZ_CHAT_URL, headers=headers, json=payload)
        try:
            data = r.json()
        except json.JSONDecodeError:
            raise HTTPException(status_code=502, detail="Upstream returned invalid JSON")

        # If Bytez didn't return OpenAI shape, wrap whatever text we can find.
        if "choices" not in data:
            content = data.get("output") or data.get("response") or data.get("message") or str(data)
            data = {
                "id": "chatcmpl-proxy",
                "object": "chat.completion",
                "choices": [{"index": 0, "message": {"role": "assistant", "content": content}}],
            }
        return JSONResponse(data, headers={"Access-Control-Allow-Origin": "*"})

    return StreamingResponse(
        v1_event_stream(),
        media_type="text/event-stream",
        headers={"Access-Control-Allow-Origin": "*"},
    )


# ---------------------------------------------------------------------
# --------------------- /v1/images/generations -----------------------
# ---------------------------------------------------------------------
@api.post("/v1/images/generations")
async def v1_images_generations(request: Request, authorization: str = Header(None)):
    """
    OpenAI-compatible image generation endpoint that forwards to:
        https://api.bytez.com/models/v2/openai/dall-e-3

    It accepts the usual OpenAI body:

        {
          "model": "dall-e-3",        # REQUIRED by clients, but ignored
          "prompt": "text",           # REQUIRED
          "n": 1,
          "size": "1024x1024",
          "response_format": "url" | "b64_json",
          "quality": "standard" | "hd",
          "style": "vivid" | "natural"
        }

    and converts it to the Bytez format: { "text": "<prompt>" }
    """
    check_key(authorization)

    # Uses the SAME key as chat.
    if not BYTEZ_AUTH:
        raise HTTPException(status_code=500, detail="Server BYTEZ_API_KEY not configured")

    # ---------------- parse request ----------------
    try:
        payload = await request.json()
    except json.JSONDecodeError:
        raise HTTPException(status_code=400, detail="Invalid JSON")

    prompt = payload.get("prompt") or payload.get("text")
    if not prompt or not str(prompt).strip():
        raise HTTPException(status_code=400, detail="Field 'prompt' is required")

    # These exist only to satisfy OpenAI-compatible clients.
    _model = payload.get("model", "dall-e-3")  # ignored
    n = int(payload.get("n", 1))
    response_format = payload.get("response_format", "url")  # "url" | "b64_json"
    size = payload.get("size", "1024x1024")
    quality = payload.get("quality")
    style = payload.get("style")

    # ---------------- build Bytez request ----------------
    # Minimal payload known to work; extras (n/size/quality/style) are NOT
    # forwarded because Bytez support for them is unconfirmed.
    bytez_payload = {"text": prompt}

    headers = {
        "Authorization": BYTEZ_AUTH,  # << important
        "Content-Type": "application/json",
    }

    # ---------------- call Bytez ----------------
    async with httpx.AsyncClient(timeout=200) as client:
        try:
            resp = await client.post(BYTEZ_IMAGE_URL, json=bytez_payload, headers=headers)
            resp.raise_for_status()
        except httpx.HTTPStatusError as e:
            # Surface the Bytez error body directly to the client.
            try:
                detail = e.response.json()
            except Exception:
                detail = e.response.text
            raise HTTPException(
                status_code=e.response.status_code,
                detail={"upstream_error": detail},
            )
        except Exception as e:
            raise HTTPException(status_code=502, detail=f"Bytez unreachable: {str(e)}")

    try:
        bytez_data = resp.json()
    except json.JSONDecodeError:
        raise HTTPException(status_code=502, detail="Bytez returned invalid JSON")

    # ---------------- map Bytez → OpenAI image response ----------------
    # The exact upstream shape is not documented here, so several
    # possibilities are handled.
    images = []

    # Case 1: { "images": ["url_or_b64", ...] }
    if isinstance(bytez_data, dict) and isinstance(bytez_data.get("images"), list):
        images = bytez_data["images"]
    # Case 2: { "data": [ { "url": "..." }, ... ] }
    elif isinstance(bytez_data, dict) and isinstance(bytez_data.get("data"), list):
        for item in bytez_data["data"]:
            if "url" in item:
                images.append(item["url"])
            elif "b64_json" in item:
                images.append(item["b64_json"])
            else:
                images.append(str(item))
    # Case 3: single string
    elif isinstance(bytez_data, str):
        images = [bytez_data]
    # Fallback: treat the whole response as one string
    else:
        images = [str(bytez_data)]

    if not images:
        raise HTTPException(
            status_code=500,
            detail={"error": "No images returned from Bytez", "raw": bytez_data},
        )

    # Honor the client's requested count.
    images = images[:n]

    openai_data = []
    for img in images:
        if isinstance(img, str) and img.startswith("data:image"):
            # Already a data URL; extract the base64 payload if requested.
            b64_part = img.split("base64,", 1)[-1]
            if response_format == "b64_json":
                openai_data.append({"b64_json": b64_part})
            else:
                openai_data.append({"url": img})
        else:
            # Raw URL or bare base64 — we can't distinguish, so trust the
            # client's requested response_format.
            if response_format == "b64_json":
                openai_data.append({"b64_json": str(img)})
            else:
                openai_data.append({"url": str(img)})

    result = {
        "created": int(time.time()),
        "data": openai_data,
    }
    return JSONResponse(result, headers={"Access-Control-Allow-Origin": "*"})


# ---------------------------------------------------------------------
# -------------------------- /v2 ------------------------------------
# ---------------------------------------------------------------------
@api.post("/v2/chat/completions")
async def v2_chat_completions(request: Request, authorization: str = Header(None)):
    """
    v2 – clean OpenAI-compatible streaming.

    * First chunk includes role=assistant (required by Continue.dev)
    * Later chunks send only delta.content
    * No usage events
    """
    check_key(authorization)
    if not BYTEZ_AUTH_2:
        # BUGFIX: error message previously named a nonexistent env var
        # ("BYTEZ_API_2"); the variable actually read is BYTEZ_API_KEY_2.
        raise HTTPException(status_code=500, detail="Server BYTEZ_API_KEY_2 not configured")

    try:
        body = await request.body()
        payload = json.loads(body.decode("utf-8"))
    except json.JSONDecodeError as e:
        raise HTTPException(status_code=400, detail=f"Invalid JSON: {e}")

    stream = payload.get("stream", False)
    upstream_headers = {
        "Authorization": BYTEZ_AUTH_2,
        "Content-Type": "application/json",
    }

    # Normal content chunk (NO ROLE)
    def make_openai_delta(content: str):
        return {
            "id": f"chatcmpl-v2-{int(time.time())}",
            "object": "chat.completion.chunk",
            "created": int(time.time()),
            "model": payload.get("model", "unknown"),
            "choices": [
                {
                    "index": 0,
                    "delta": {"content": content},
                    "finish_reason": None,
                }
            ],
        }

    async def clean_stream():
        # FIRST CHUNK MUST SET THE ROLE → REQUIRED by Continue.dev
        first_chunk = {
            "id": f"chatcmpl-v2-{int(time.time())}",
            "object": "chat.completion.chunk",
            "created": int(time.time()),
            "model": payload.get("model", "unknown"),
            "choices": [
                {
                    "index": 0,
                    "delta": {"role": "assistant", "content": ""},
                    "finish_reason": None,
                }
            ],
        }
        yield f"data: {json.dumps(first_chunk)}\n\n"

        async with httpx.AsyncClient(timeout=180) as client:
            try:
                async with client.stream(
                    "POST", BYTEZ_CHAT_URL, headers=upstream_headers, json=payload
                ) as upstream:
                    async for line in upstream.aiter_lines():
                        line = line.strip()
                        if not line:
                            continue
                        json_str = line[6:] if line.startswith("data: ") else line

                        # Skip usage events. NOTE(review): this substring test
                        # also drops any content chunk whose text contains
                        # "usage" — intentional per the docstring, but crude.
                        if "usage" in json_str.lower():
                            continue

                        if json_str == "[DONE]":
                            yield "data: [DONE]\n\n"
                            return

                        try:
                            chunk = json.loads(json_str)
                        except json.JSONDecodeError:
                            continue

                        text = ""
                        if isinstance(chunk, dict):
                            if "token" in chunk:
                                text = chunk["token"]
                            elif "choices" in chunk and chunk["choices"]:
                                delta = chunk["choices"][0].get("delta", {})
                                text = delta.get("content", "")
                            elif "text" in chunk:
                                text = chunk["text"]
                            else:
                                text = str(chunk)

                        if text:
                            yield f"data: {json.dumps(make_openai_delta(text))}\n\n"

                yield "data: [DONE]\n\n"
            except Exception as e:
                # Report the failure in-band as a final content chunk, then
                # close the SSE stream cleanly.
                error_chunk = make_openai_delta(f"Error: {str(e)}")
                yield f"data: {json.dumps(error_chunk)}\n\n"
                yield "data: [DONE]\n\n"

    # Non-streaming mode
    if not stream:
        async with httpx.AsyncClient(timeout=120) as c:
            r = await c.post(BYTEZ_CHAT_URL, headers=upstream_headers, json=payload)
            r.raise_for_status()
            data = r.json()

        if "choices" not in data:
            content = (
                data.get("output")
                or data.get("response")
                or data.get("message")
                or str(data)
            )
            data = {
                "id": "chatcmpl-v2",
                "object": "chat.completion",
                "choices": [
                    {"index": 0, "message": {"role": "assistant", "content": content}}
                ],
            }
        return JSONResponse(data, headers={"Access-Control-Allow-Origin": "*"})

    # Streaming mode
    return StreamingResponse(
        clean_stream(),
        media_type="text/event-stream",
        headers={
            "Access-Control-Allow-Origin": "*",
            "Access-Control-Allow-Headers": "*",
            "Cache-Control": "no-cache",
            "Connection": "keep-alive",
            "X-Accel-Buffering": "no",
        },
    )


# ---------------------------------------------------------------------
# Minimal Gradio UI (required for HF Space to start)
# ---------------------------------------------------------------------
with gr.Blocks() as ui:
    gr.Markdown(
        "### Bytez → OpenAI Proxy (v1 + **v2**)\n"
        "- `/v1/models` \n"
        "- `/v1/chat/completions` (unchanged) \n"
        "- **`/v2/chat/completions`** – clean streaming, no usage chunk"
    )

demo = gr.mount_gradio_app(api, ui, path="/")

# This makes it work on Render, Railway, Fly.io, etc.
app = api

if __name__ == "__main__":
    # Only for local testing with Gradio
    uvicorn.run(demo, host="0.0.0.0", port=7860)