import os
import json
import time

import httpx
import uvicorn
import gradio as gr
from fastapi import FastAPI, Request, Header, HTTPException
from fastapi.responses import JSONResponse, StreamingResponse

# ---------------------------------------------------------------------
# Configuration
# ---------------------------------------------------------------------
BYTEZ_CHAT_URL = "https://api.bytez.com/models/v2/openai/v1/chat/completions"
BYTEZ_MODELS_URL = "https://api.bytez.com/models/v2/list/models"
BYTEZ_IMAGE_URL = "https://api.bytez.com/models/v2/google/imagen-4.0-ultra-generate-001"

BYTEZ_AUTH = os.getenv("BYTEZ_API_KEY")      # your Bytez key (used by /v1)
LOCAL_API_KEY = os.getenv("LOCAL_API_KEY")   # optional local guard
BYTEZ_AUTH_2 = os.getenv("BYTEZ_API_KEY_2")  # separate key used by /v2

# ---------------------------------------------------------------------
# FastAPI app
# ---------------------------------------------------------------------
api = FastAPI(title="Bytez → OpenAI Proxy (v1 + v2)")


def check_key(auth: str | None) -> None:
    """Validate the Bearer token (optional local key).

    Raises:
        HTTPException: 401 if the Authorization header is missing or not a
            Bearer token; 403 if LOCAL_API_KEY is set and the token differs.
    """
    if not auth or not auth.startswith("Bearer "):
        raise HTTPException(status_code=401, detail="Missing or invalid API key")
    user_key = auth.removeprefix("Bearer ").strip()
    if LOCAL_API_KEY and user_key != LOCAL_API_KEY:
        raise HTTPException(status_code=403, detail="Unauthorized API key")


# ---------------------------------------------------------------------
# Root / health
# ---------------------------------------------------------------------
@api.get("/")
def root():
    """Health-check endpoint."""
    return {"status": "ok", "message": "Bytez proxy (v1+v2) running"}


# ---------------------------------------------------------------------
# -------------------------- /v1 ------------------------------------
# ---------------------------------------------------------------------
@api.get("/v1/models")
async def v1_models(authorization: str = Header(None)):
    """List upstream Bytez models in OpenAI `/v1/models` format."""
    check_key(authorization)
    if not BYTEZ_AUTH:
        raise HTTPException(status_code=500, detail="Server BYTEZ_API_KEY not configured")

    async with httpx.AsyncClient(timeout=30) as c:
        r = await c.get(BYTEZ_MODELS_URL, headers={"Authorization": BYTEZ_AUTH})
    try:
        data = r.json()
    except json.JSONDecodeError:
        raise HTTPException(status_code=502, detail="Upstream returned invalid JSON")

    # Transform Bytez → OpenAI list
    models_list = [
        {"id": m.get("id") or m.get("name"), "object": "model"}
        for m in (data if isinstance(data, list) else data.get("data", []))
    ]
    return JSONResponse(
        {"object": "list", "data": models_list},
        headers={"Access-Control-Allow-Origin": "*"},
    )


@api.post("/v1/chat/completions")
async def v1_chat(request: Request, authorization: str = Header(None)):
    """OpenAI-compatible chat completions proxied to Bytez (stream + non-stream)."""
    check_key(authorization)
    if not BYTEZ_AUTH:
        raise HTTPException(status_code=500, detail="Server BYTEZ_API_KEY not configured")

    payload = await request.json()
    stream = payload.get("stream", False)
    headers = {"Authorization": BYTEZ_AUTH, "Content-Type": "application/json"}

    # ---------- streaming helper ----------
    async def v1_event_stream():
        async with httpx.AsyncClient(timeout=120) as client:
            async with client.stream(
                "POST", BYTEZ_CHAT_URL, headers=headers, json=payload
            ) as upstream:
                async for line in upstream.aiter_lines():
                    line = line.strip()
                    if not line:
                        continue
                    json_str = line[6:] if line.startswith("data: ") else line

                    # BUG FIX: check the [DONE] sentinel BEFORE json.loads.
                    # "[DONE]" is not valid JSON, so the original parse raised
                    # JSONDecodeError and `continue` skipped the termination
                    # branch entirely (v2's clean_stream already does it this way).
                    if json_str == "[DONE]":
                        yield "data: [DONE]\n\n"
                        return

                    try:
                        chunk = json.loads(json_str)
                    except json.JSONDecodeError:
                        continue

                    # ----- adapt Bytez chunk to OpenAI -----
                    content = ""
                    if "token" in chunk:
                        content = chunk["token"]
                    elif "choices" in chunk and chunk["choices"]:
                        delta = chunk["choices"][0].get("delta", {})
                        content = delta.get("content", "")
                    elif "text" in chunk:
                        content = chunk["text"]
                    else:
                        content = str(chunk)

                    openai_chunk = {
                        "id": "chatcmpl-proxy-stream",
                        "object": "chat.completion.chunk",
                        "created": int(time.time()),
                        "model": payload.get("model", "unknown"),
                        "choices": [
                            {
                                "index": 0,
                                "delta": {"role": "assistant", "content": content},
                                "finish_reason": None,
                            }
                        ],
                    }
                    yield f"data: {json.dumps(openai_chunk)}\n\n"
        # Upstream closed without sending [DONE] — terminate the SSE stream anyway.
        yield "data: [DONE]\n\n"

    # ---------- non-stream ----------
    if not stream:
        async with httpx.AsyncClient(timeout=120) as c:
            r = await c.post(BYTEZ_CHAT_URL, headers=headers, json=payload)
        try:
            data = r.json()
        except json.JSONDecodeError:
            raise HTTPException(status_code=502, detail="Upstream returned invalid JSON")

        if "choices" not in data:
            # Upstream is not OpenAI-shaped — wrap whatever text field it has.
            content = data.get("output") or data.get("response") or data.get("message") or str(data)
            data = {
                "id": "chatcmpl-proxy",
                "object": "chat.completion",
                "choices": [
                    {"index": 0, "message": {"role": "assistant", "content": content}}
                ],
            }
        return JSONResponse(data, headers={"Access-Control-Allow-Origin": "*"})

    return StreamingResponse(
        v1_event_stream(),
        media_type="text/event-stream",
        headers={"Access-Control-Allow-Origin": "*"},
    )


# ---------------------------------------------------------------------
# --------------------- /v1/images/generations -----------------------
# ---------------------------------------------------------------------
@api.post("/v1/images/generations")
async def v1_images_generations(request: Request, authorization: str = Header(None)):
    """OpenAI-compatible image generation endpoint → Bytez Imagen."""
    check_key(authorization)
    if not BYTEZ_AUTH:
        raise HTTPException(status_code=500, detail="Server BYTEZ_API_KEY not configured")

    # -------- Parse request --------
    try:
        payload = await request.json()
    except json.JSONDecodeError:
        raise HTTPException(status_code=400, detail="Invalid JSON")

    prompt = payload.get("prompt")
    if not prompt:
        raise HTTPException(status_code=400, detail="Field 'prompt' is required")

    # BUG FIX: a non-numeric "n" previously raised ValueError → HTTP 500;
    # reject bad client input with 400 instead.
    try:
        n = int(payload.get("n", 1))
    except (TypeError, ValueError):
        raise HTTPException(status_code=400, detail="Field 'n' must be an integer")
    response_format = payload.get("response_format", "url")  # url | b64_json

    # -------- Bytez request format --------
    bytez_payload = {"text": prompt}
    headers = {
        "Authorization": f"Bearer {BYTEZ_AUTH}",
        "Content-Type": "application/json",
    }

    # -------- Call Bytez --------
    async with httpx.AsyncClient(timeout=200) as client:
        try:
            resp = await client.post(BYTEZ_IMAGE_URL, json=bytez_payload, headers=headers)
            resp.raise_for_status()
        except httpx.HTTPStatusError as e:
            # Forward upstream error body (JSON if possible, raw text otherwise).
            try:
                detail = e.response.json()
            except Exception:  # was a bare except: — narrowed
                detail = e.response.text
            raise HTTPException(status_code=e.response.status_code, detail={"upstream_error": detail})

        try:
            bytez_data = resp.json()
        except json.JSONDecodeError:  # was a bare except: — narrowed
            raise HTTPException(status_code=502, detail="Bytez returned invalid JSON")

    # -------- Extract URL (bytez_data.output) --------
    url_output = None
    if isinstance(bytez_data, dict):
        url_output = bytez_data.get("output")

    # -------- Extract Base64 (provider.generatedImages[].imageBytes) --
    b64_list = []
    try:
        generated = bytez_data["provider"]["generatedImages"]
        for g in generated:
            img = g.get("image", {})
            if "imageBytes" in img:
                b64_list.append(img["imageBytes"])
    except Exception:
        pass  # ignore if provider/generatedImages is missing

    # -------- Build OpenAI-compatible response ------------------------
    openai_data = []
    count = min(max(len(b64_list), 1), n)
    for i in range(count):
        if response_format == "b64_json":
            # --- Base64 Output ---
            if i < len(b64_list):
                openai_data.append({"b64_json": b64_list[i]})
            else:
                # fallback: turn URL into base64-json payload (OpenAI accepts raw string)
                openai_data.append({"b64_json": url_output})
        else:
            # --- URL Output ---
            if url_output:
                openai_data.append({"url": url_output})
            elif i < len(b64_list):
                # fallback: convert base64 into a data URL
                openai_data.append({"url": f"data:image/png;base64,{b64_list[i]}"})
            else:
                openai_data.append({"url": ""})

    result = {
        "created": int(time.time()),
        "data": openai_data,
    }
    return JSONResponse(result, headers={"Access-Control-Allow-Origin": "*"})


# ---------------------------------------------------------------------
# -------------------------- /v2 ------------------------------------
# ---------------------------------------------------------------------
@api.post("/v2/chat/completions")
async def v2_chat_completions(request: Request, authorization: str = Header(None)):
    """
    v2 – clean OpenAI-compatible streaming.

    * First chunk includes role=assistant (required by Continue.dev)
    * Later chunks send only delta.content
    * No usage events
    """
    check_key(authorization)
    if not BYTEZ_AUTH_2:
        # BUG FIX: error message previously said "BYTEZ_API_2"; the env var
        # actually read above is BYTEZ_API_KEY_2.
        raise HTTPException(status_code=500, detail="Server BYTEZ_API_KEY_2 not configured")

    try:
        body = await request.body()
        payload = json.loads(body.decode("utf-8"))
    except json.JSONDecodeError as e:
        raise HTTPException(status_code=400, detail=f"Invalid JSON: {e}")

    stream = payload.get("stream", False)
    upstream_headers = {
        "Authorization": BYTEZ_AUTH_2,
        "Content-Type": "application/json",
    }

    # Normal content chunk (NO ROLE)
    def make_openai_delta(content: str):
        return {
            "id": f"chatcmpl-v2-{int(time.time())}",
            "object": "chat.completion.chunk",
            "created": int(time.time()),
            "model": payload.get("model", "unknown"),
            "choices": [
                {
                    "index": 0,
                    "delta": {"content": content},
                    "finish_reason": None,
                }
            ],
        }

    async def clean_stream():
        # FIRST CHUNK MUST SET THE ROLE → REQUIRED by Continue.dev
        first_chunk = {
            "id": f"chatcmpl-v2-{int(time.time())}",
            "object": "chat.completion.chunk",
            "created": int(time.time()),
            "model": payload.get("model", "unknown"),
            "choices": [
                {
                    "index": 0,
                    "delta": {"role": "assistant", "content": ""},
                    "finish_reason": None,
                }
            ],
        }
        # Send first role-setting chunk
        yield f"data: {json.dumps(first_chunk)}\n\n"

        async with httpx.AsyncClient(timeout=180) as client:
            try:
                async with client.stream(
                    "POST", BYTEZ_CHAT_URL, headers=upstream_headers, json=payload
                ) as upstream:
                    async for line in upstream.aiter_lines():
                        line = line.strip()
                        if not line:
                            continue
                        json_str = line[6:] if line.startswith("data: ") else line

                        # Skip usage events
                        if "usage" in json_str.lower():
                            continue
                        if json_str == "[DONE]":
                            yield "data: [DONE]\n\n"
                            return

                        try:
                            chunk = json.loads(json_str)
                        except json.JSONDecodeError:
                            continue

                        text = ""
                        if isinstance(chunk, dict):
                            if "token" in chunk:
                                text = chunk["token"]
                            elif "choices" in chunk and chunk["choices"]:
                                delta = chunk["choices"][0].get("delta", {})
                                text = delta.get("content", "")
                            elif "text" in chunk:
                                text = chunk["text"]
                            else:
                                text = str(chunk)

                        if text:
                            yield f"data: {json.dumps(make_openai_delta(text))}\n\n"
                yield "data: [DONE]\n\n"
            except Exception as e:
                # Surface the failure to the client as a final content chunk
                # rather than silently dropping the stream.
                error_chunk = make_openai_delta(f"Error: {str(e)}")
                yield f"data: {json.dumps(error_chunk)}\n\n"
                yield "data: [DONE]\n\n"

    # Non-streaming mode
    if not stream:
        async with httpx.AsyncClient(timeout=120) as c:
            r = await c.post(BYTEZ_CHAT_URL, headers=upstream_headers, json=payload)
            r.raise_for_status()
            data = r.json()

        if "choices" not in data:
            content = (
                data.get("output")
                or data.get("response")
                or data.get("message")
                or str(data)
            )
            data = {
                "id": "chatcmpl-v2",
                "object": "chat.completion",
                "choices": [
                    {"index": 0, "message": {"role": "assistant", "content": content}}
                ],
            }
        return JSONResponse(data, headers={"Access-Control-Allow-Origin": "*"})

    # Streaming mode
    return StreamingResponse(
        clean_stream(),
        media_type="text/event-stream",
        headers={
            "Access-Control-Allow-Origin": "*",
            "Access-Control-Allow-Headers": "*",
            "Cache-Control": "no-cache",
            "Connection": "keep-alive",
            "X-Accel-Buffering": "no",
        },
    )


# ---------------------------------------------------------------------
# Minimal Gradio UI (required for HF Space to start)
# ---------------------------------------------------------------------
with gr.Blocks() as ui:
    gr.Markdown(
        "### Bytez → OpenAI Proxy (v1 + **v2**)\n"
        "- `/v1/models` \n"
        "- `/v1/chat/completions` (unchanged) \n"
        "- **`/v2/chat/completions`** – clean streaming, no usage chunk"
    )

demo = gr.mount_gradio_app(api, ui, path="/")

# This makes it work on Render, Railway, Fly.io, etc.
app = api

if __name__ == "__main__":
    # Only for local testing with Gradio
    uvicorn.run(demo, host="0.0.0.0", port=7860)