import os
import json
import time

import httpx
import uvicorn
import gradio as gr
from fastapi import FastAPI, Request, Header, HTTPException
from fastapi.responses import JSONResponse, StreamingResponse

# ---------------------------------------------------------------------
# Configuration
# ---------------------------------------------------------------------
BYTEZ_CHAT_URL = "https://api.bytez.com/models/v2/openai/v1/chat/completions"
BYTEZ_MODELS_URL = "https://api.bytez.com/models/v2/list/models"
BYTEZ_IMAGE_URL = "https://api.bytez.com/models/v2/openai/dall-e-3"

BYTEZ_AUTH = os.getenv("BYTEZ_API_KEY")      # your Bytez key
LOCAL_API_KEY = os.getenv("LOCAL_API_KEY")   # optional local guard
BYTEZ_AUTH_2 = os.getenv("BYTEZ_API_KEY_2")

# ---------------------------------------------------------------------
# FastAPI app
# ---------------------------------------------------------------------
api = FastAPI(title="Bytez → OpenAI Proxy (v1 + v2)")


def check_key(auth: str | None) -> None:
    """Validate the Bearer token (optional local key).

    Raises:
        HTTPException 401: header missing or not a Bearer scheme.
        HTTPException 403: LOCAL_API_KEY is set and the token mismatches.
    """
    if not auth or not auth.startswith("Bearer "):
        raise HTTPException(status_code=401, detail="Missing or invalid API key")
    user_key = auth.split("Bearer ")[1].strip()
    # Only enforce the local key when one is configured; otherwise any
    # well-formed Bearer token passes.
    if LOCAL_API_KEY and user_key != LOCAL_API_KEY:
        raise HTTPException(status_code=403, detail="Unauthorized API key")


# ---------------------------------------------------------------------
# Root / health
# ---------------------------------------------------------------------
@api.get("/")
def root():
    """Health-check endpoint."""
    return {"status": "ok", "message": "Bytez proxy (v1+v2) running"}


# ---------------------------------------------------------------------
# -------------------------- /v1 ------------------------------------
# ---------------------------------------------------------------------
@api.get("/v1/models")
async def v1_models(authorization: str = Header(None)):
    """List upstream models in OpenAI `/v1/models` list format."""
    check_key(authorization)
    if not BYTEZ_AUTH:
        raise HTTPException(status_code=500, detail="Server BYTEZ_API_KEY not configured")

    async with httpx.AsyncClient(timeout=30) as c:
        r = await c.get(BYTEZ_MODELS_URL, headers={"Authorization": BYTEZ_AUTH})

    try:
        data = r.json()
    except json.JSONDecodeError:
        raise HTTPException(status_code=502, detail="Upstream returned invalid JSON")

    # Transform Bytez → OpenAI list.  Bytez may return either a bare list
    # or an object with a "data" array.
    models_list = [
        {"id": m.get("id") or m.get("name"), "object": "model"}
        for m in (data if isinstance(data, list) else data.get("data", []))
    ]

    return JSONResponse(
        {"object": "list", "data": models_list},
        headers={"Access-Control-Allow-Origin": "*"},
    )


@api.post("/v1/chat/completions")
async def v1_chat(request: Request, authorization: str = Header(None)):
    """OpenAI-compatible chat completions, proxied to Bytez (stream + non-stream)."""
    check_key(authorization)
    if not BYTEZ_AUTH:
        raise HTTPException(status_code=500, detail="Server BYTEZ_API_KEY not configured")

    payload = await request.json()
    stream = payload.get("stream", False)
    headers = {"Authorization": BYTEZ_AUTH, "Content-Type": "application/json"}

    # ---------- streaming helper ----------
    async def v1_event_stream():
        async with httpx.AsyncClient(timeout=120) as client:
            async with client.stream(
                "POST", BYTEZ_CHAT_URL, headers=headers, json=payload
            ) as upstream:
                async for line in upstream.aiter_lines():
                    line = line.strip()
                    if not line:
                        continue
                    json_str = line[6:] if line.startswith("data: ") else line

                    # BUGFIX: check the SSE terminator BEFORE json.loads.
                    # "[DONE]" is not valid JSON, so the previous order
                    # hit JSONDecodeError and `continue`d, making the
                    # in-loop [DONE] branch unreachable.
                    if json_str == "[DONE]":
                        yield "data: [DONE]\n\n"
                        break

                    try:
                        chunk = json.loads(json_str)
                    except json.JSONDecodeError:
                        continue

                    # ----- adapt Bytez chunk to OpenAI -----
                    # Bytez may emit {"token": ...}, OpenAI-shaped chunks,
                    # or {"text": ...}; fall back to str() for anything else.
                    content = ""
                    if "token" in chunk:
                        content = chunk["token"]
                    elif "choices" in chunk and chunk["choices"]:
                        delta = chunk["choices"][0].get("delta", {})
                        content = delta.get("content", "")
                    elif "text" in chunk:
                        content = chunk["text"]
                    else:
                        content = str(chunk)

                    openai_chunk = {
                        "id": "chatcmpl-proxy-stream",
                        "object": "chat.completion.chunk",
                        "created": int(time.time()),
                        "model": payload.get("model", "unknown"),
                        "choices": [
                            {
                                "index": 0,
                                "delta": {"role": "assistant", "content": content},
                                "finish_reason": None,
                            }
                        ],
                    }
                    yield f"data: {json.dumps(openai_chunk)}\n\n"
        # Emit the terminator in case the upstream closed without one.
        yield "data: [DONE]\n\n"

    # ---------- non-stream ----------
    if not stream:
        async with httpx.AsyncClient(timeout=120) as c:
            r = await c.post(BYTEZ_CHAT_URL, headers=headers, json=payload)
        try:
            data = r.json()
        except json.JSONDecodeError:
            raise HTTPException(status_code=502, detail="Upstream returned invalid JSON")

        # If the upstream answer isn't already OpenAI-shaped, wrap it.
        if "choices" not in data:
            content = data.get("output") or data.get("response") or data.get("message") or str(data)
            data = {
                "id": "chatcmpl-proxy",
                "object": "chat.completion",
                "choices": [{"index": 0, "message": {"role": "assistant", "content": content}}],
            }
        return JSONResponse(data, headers={"Access-Control-Allow-Origin": "*"})

    return StreamingResponse(
        v1_event_stream(),
        media_type="text/event-stream",
        headers={"Access-Control-Allow-Origin": "*"},
    )


# ---------------------------------------------------------------------
# --------------------- /v1/images/generations -----------------------
# ---------------------------------------------------------------------
@api.post("/v1/images/generations")
async def v1_images_generations(request: Request, authorization: str = Header(None)):
    """
    OpenAI-compatible DALL·E-3 image generation via Bytez.

    → Accepts a `model` field (required by Continue.dev, Cursor, etc.)
      but ignores it, since Bytez selects the model from the URL path.
    → Returns the OpenAI response format with url / b64_json items.
    """
    check_key(authorization)
    if not BYTEZ_AUTH_2:
        raise HTTPException(status_code=500, detail="Server BYTEZ_API_KEY_2 not configured")

    try:
        payload = await request.json()
    except json.JSONDecodeError:
        raise HTTPException(status_code=400, detail="Invalid JSON")

    prompt = payload.get("prompt")
    # Reject non-string prompts too (a JSON number here used to crash
    # with AttributeError → 500 instead of a clean 400).
    if not isinstance(prompt, str) or not prompt.strip():
        raise HTTPException(status_code=400, detail="Field 'prompt' is required and cannot be empty")

    # `model` is accepted for client compatibility but deliberately unused.
    n = payload.get("n", 1)
    size = payload.get("size", "1024x1024")
    quality = payload.get("quality", "standard")
    style = payload.get("style")  # vivid or natural
    response_format = payload.get("response_format", "url")  # url or b64_json

    # Bytez accepts the same size strings as OpenAI; anything else falls
    # back to the square default.
    if size not in ["1024x1024", "1024x1792", "1792x1024"]:
        size = "1024x1024"

    bytez_payload = {
        "text": prompt,
        "num_outputs": n,
        "size": size,
    }
    if quality in ["standard", "hd"]:
        bytez_payload["quality"] = quality
    if style in ["vivid", "natural"]:
        bytez_payload["style"] = style

    headers = {
        "Authorization": BYTEZ_AUTH_2,
        "Content-Type": "application/json",
    }

    async with httpx.AsyncClient(timeout=200) as client:
        try:
            # Use the module-level constant instead of re-hardcoding the URL.
            resp = await client.post(BYTEZ_IMAGE_URL, json=bytez_payload, headers=headers)
            resp.raise_for_status()
        except httpx.HTTPStatusError as e:
            # Forward the upstream error body when it is JSON, raw text otherwise.
            try:
                error_detail = e.response.json()
            except ValueError:
                error_detail = e.response.text
            raise HTTPException(status_code=e.response.status_code, detail=error_detail)
        except Exception as e:
            raise HTTPException(status_code=502, detail=f"Bytez unreachable: {str(e)}")

    try:
        bytez_data = resp.json()
    except json.JSONDecodeError:
        raise HTTPException(status_code=502, detail="Bytez returned invalid JSON")

    # Handle different possible response shapes from Bytez.
    images = bytez_data.get("images") or bytez_data.get("data") or []
    if isinstance(images, str):
        images = [images]
    if not images:
        raise HTTPException(status_code=500, detail="No images returned from Bytez")

    # Build a proper OpenAI response item for each image.
    openai_images = []
    for img_data in images:
        if img_data.startswith("data:image"):
            b64 = img_data.split("base64,")[-1]
            url = img_data
        else:
            b64 = img_data
            url = f"data:image/png;base64,{img_data}"

        item = {}
        if response_format == "b64_json" or response_format is None:
            item["b64_json"] = b64
        else:
            item["url"] = url

        # Optional: include revised_prompt if Bytez returns it.
        if "revised_prompt" in bytez_data:
            item["revised_prompt"] = bytez_data["revised_prompt"]

        openai_images.append(item)

    final_response = {
        "created": int(time.time()),
        "data": openai_images,
    }
    return JSONResponse(final_response, headers={"Access-Control-Allow-Origin": "*"})


# ---------------------------------------------------------------------
# -------------------------- /v2 ------------------------------------
# ---------------------------------------------------------------------
@api.post("/v2/chat/completions")
async def v2_chat_completions(request: Request, authorization: str = Header(None)):
    """
    v2 – clean OpenAI-compatible streaming.

    * First chunk includes role=assistant (required by Continue.dev)
    * Later chunks send only delta.content
    * No usage events
    """
    check_key(authorization)
    if not BYTEZ_AUTH_2:
        # BUGFIX: message previously said "BYTEZ_API_2", which is not the
        # env var actually read above.
        raise HTTPException(status_code=500, detail="Server BYTEZ_API_KEY_2 not configured")

    try:
        body = await request.body()
        payload = json.loads(body.decode("utf-8"))
    except (json.JSONDecodeError, UnicodeDecodeError) as e:
        # UnicodeDecodeError previously escaped as a 500.
        raise HTTPException(status_code=400, detail=f"Invalid JSON: {e}")

    stream = payload.get("stream", False)
    upstream_headers = {
        "Authorization": BYTEZ_AUTH_2,
        "Content-Type": "application/json",
    }

    # Normal content chunk (NO ROLE).
    def make_openai_delta(content: str):
        return {
            "id": f"chatcmpl-v2-{int(time.time())}",
            "object": "chat.completion.chunk",
            "created": int(time.time()),
            "model": payload.get("model", "unknown"),
            "choices": [
                {
                    "index": 0,
                    "delta": {"content": content},
                    "finish_reason": None,
                }
            ],
        }

    async def clean_stream():
        # FIRST CHUNK MUST SET THE ROLE → REQUIRED by Continue.dev
        first_chunk = {
            "id": f"chatcmpl-v2-{int(time.time())}",
            "object": "chat.completion.chunk",
            "created": int(time.time()),
            "model": payload.get("model", "unknown"),
            "choices": [
                {
                    "index": 0,
                    "delta": {"role": "assistant", "content": ""},
                    "finish_reason": None,
                }
            ],
        }
        yield f"data: {json.dumps(first_chunk)}\n\n"

        async with httpx.AsyncClient(timeout=180) as client:
            try:
                async with client.stream(
                    "POST", BYTEZ_CHAT_URL, headers=upstream_headers, json=payload
                ) as upstream:
                    async for line in upstream.aiter_lines():
                        line = line.strip()
                        if not line:
                            continue
                        json_str = line[6:] if line.startswith("data: ") else line

                        # Skip usage events (best-effort substring match).
                        if "usage" in json_str.lower():
                            continue
                        if json_str == "[DONE]":
                            yield "data: [DONE]\n\n"
                            return

                        try:
                            chunk = json.loads(json_str)
                        except json.JSONDecodeError:
                            continue

                        # Normalize the various Bytez chunk shapes to text.
                        text = ""
                        if isinstance(chunk, dict):
                            if "token" in chunk:
                                text = chunk["token"]
                            elif "choices" in chunk and chunk["choices"]:
                                delta = chunk["choices"][0].get("delta", {})
                                text = delta.get("content", "")
                            elif "text" in chunk:
                                text = chunk["text"]
                        else:
                            text = str(chunk)

                        if text:
                            yield f"data: {json.dumps(make_openai_delta(text))}\n\n"

                yield "data: [DONE]\n\n"
            except Exception as e:
                # Surface errors as a final content chunk so clients
                # terminate cleanly instead of hanging.
                error_chunk = make_openai_delta(f"Error: {str(e)}")
                yield f"data: {json.dumps(error_chunk)}\n\n"
                yield "data: [DONE]\n\n"

    # Non-streaming mode
    if not stream:
        async with httpx.AsyncClient(timeout=120) as c:
            r = await c.post(BYTEZ_CHAT_URL, headers=upstream_headers, json=payload)
            r.raise_for_status()
            data = r.json()

        if "choices" not in data:
            content = (
                data.get("output")
                or data.get("response")
                or data.get("message")
                or str(data)
            )
            data = {
                "id": "chatcmpl-v2",
                "object": "chat.completion",
                "choices": [
                    {"index": 0, "message": {"role": "assistant", "content": content}}
                ],
            }
        return JSONResponse(data, headers={"Access-Control-Allow-Origin": "*"})

    # Streaming mode
    return StreamingResponse(
        clean_stream(),
        media_type="text/event-stream",
        headers={
            "Access-Control-Allow-Origin": "*",
            "Access-Control-Allow-Headers": "*",
            "Cache-Control": "no-cache",
            "Connection": "keep-alive",
            "X-Accel-Buffering": "no",
        },
    )


# ---------------------------------------------------------------------
# Minimal Gradio UI (required for HF Space to start)
# ---------------------------------------------------------------------
with gr.Blocks() as ui:
    gr.Markdown(
        "### Bytez → OpenAI Proxy (v1 + **v2**)\n"
        "- `/v1/models` \n"
        "- `/v1/chat/completions` (unchanged) \n"
        "- **`/v2/chat/completions`** – clean streaming, no usage chunk"
    )

demo = gr.mount_gradio_app(api, ui, path="/")

# ---------------------------------------------------------------------
# Local dev entrypoint
# ---------------------------------------------------------------------
if __name__ == "__main__":
    uvicorn.run(demo, host="0.0.0.0", port=7860)