# NOTE: Hugging Face Spaces page artifact ("Spaces: Sleeping") removed —
# it was status-page residue captured by the extraction, not part of the app.
| import os | |
| import json | |
| import time | |
| import httpx | |
| import uvicorn | |
| import gradio as gr | |
| from fastapi import FastAPI, Request, Header, HTTPException | |
| from fastapi.responses import JSONResponse, StreamingResponse | |
# ---------------------------------------------------------------------
# Configuration
# ---------------------------------------------------------------------
# Upstream Bytez endpoints: OpenAI-compatible chat, model listing, and the
# DALL-E-3 image model used by /v1/images/generations.
BYTEZ_CHAT_URL = "https://api.bytez.com/models/v2/openai/v1/chat/completions"
BYTEZ_MODELS_URL = "https://api.bytez.com/models/v2/list/models"
BYTEZ_IMAGE_URL = "https://api.bytez.com/models/v2/openai/dall-e-3"
BYTEZ_AUTH = os.getenv("BYTEZ_API_KEY")  # your Bytez key (used by the /v1 routes)
LOCAL_API_KEY = os.getenv("LOCAL_API_KEY")  # optional local guard; unset = accept any Bearer token
BYTEZ_AUTH_2 = os.getenv("BYTEZ_API_KEY_2")  # second Bytez key, used only by the /v2 route
# ---------------------------------------------------------------------
# FastAPI app
# ---------------------------------------------------------------------
api = FastAPI(title="Bytez β OpenAI Proxy (v1 + v2)")
def check_key(auth: str | None):
    """Validate an Authorization header carrying a Bearer token.

    Raises 401 when the header is absent or malformed, and 403 when a
    LOCAL_API_KEY is configured and the supplied key does not match it.
    When LOCAL_API_KEY is unset, any well-formed Bearer header passes.
    """
    bearer_ok = bool(auth) and auth.startswith("Bearer ")
    if not bearer_ok:
        raise HTTPException(status_code=401, detail="Missing or invalid API key")
    supplied_key = auth.split("Bearer ")[1].strip()
    if LOCAL_API_KEY and supplied_key != LOCAL_API_KEY:
        raise HTTPException(status_code=403, detail="Unauthorized API key")
# ---------------------------------------------------------------------
# Root / health
# ---------------------------------------------------------------------
@api.get("/health")
def root():
    """Health endpoint: confirms the proxy process is up.

    NOTE(review): the handler had no route decorator, so it was never
    registered with FastAPI (the app exposed no endpoint at all). Restored
    here at /health rather than / so it does not shadow the Gradio UI that
    is mounted at path="/" further down — confirm the intended path.
    """
    return {"status": "ok", "message": "Bytez proxy (v1+v2) running"}
# ---------------------------------------------------------------------
# -------------------------- /v1 ------------------------------------
# ---------------------------------------------------------------------
@api.get("/v1/models")
async def v1_models(authorization: str = Header(None)):
    """List models in OpenAI `/v1/models` format.

    Proxies the Bytez model list and reshapes each entry to
    ``{"id": ..., "object": "model"}``.

    Raises:
        HTTPException: 401/403 from check_key, 500 if the server-side
            Bytez key is missing, 502 if Bytez returns non-JSON.

    NOTE(review): the route decorator was missing, so this handler was
    never reachable; restored to match the path advertised in the UI.
    """
    check_key(authorization)
    if not BYTEZ_AUTH:
        raise HTTPException(status_code=500, detail="Server BYTEZ_API_KEY not configured")
    async with httpx.AsyncClient(timeout=30) as c:
        r = await c.get(BYTEZ_MODELS_URL, headers={"Authorization": BYTEZ_AUTH})
    try:
        data = r.json()
    except json.JSONDecodeError:
        raise HTTPException(status_code=502, detail="Upstream returned invalid JSON")
    # Transform Bytez → OpenAI list; Bytez may return either a bare list
    # or an object with a "data" array.
    entries = data if isinstance(data, list) else data.get("data", [])
    models_list = [
        {"id": m.get("id") or m.get("name"), "object": "model"}
        for m in entries
    ]
    return JSONResponse(
        {"object": "list", "data": models_list},
        headers={"Access-Control-Allow-Origin": "*"},
    )
@api.post("/v1/chat/completions")
async def v1_chat(request: Request, authorization: str = Header(None)):
    """OpenAI-compatible chat completions, proxied to Bytez with BYTEZ_AUTH.

    Supports both streaming (SSE) and non-streaming modes, adapting the
    several Bytez response shapes seen in practice ("token", OpenAI-style
    "choices"/"delta", or "text") to the OpenAI schema.

    NOTE(review): the route decorator was missing, so this handler was
    never registered; restored to the path advertised in the UI.
    """
    check_key(authorization)
    if not BYTEZ_AUTH:
        raise HTTPException(status_code=500, detail="Server BYTEZ_API_KEY not configured")
    payload = await request.json()
    stream = payload.get("stream", False)
    headers = {"Authorization": BYTEZ_AUTH, "Content-Type": "application/json"}

    # ---------- streaming helper ----------
    async def v1_event_stream():
        async with httpx.AsyncClient(timeout=120) as client:
            async with client.stream("POST", BYTEZ_CHAT_URL, headers=headers, json=payload) as upstream:
                async for line in upstream.aiter_lines():
                    line = line.strip()
                    if not line:
                        continue
                    json_str = line[6:] if line.startswith("data: ") else line
                    # BUGFIX: the [DONE] sentinel must be checked BEFORE
                    # json.loads — "[DONE]" is not valid JSON, so the old
                    # order made the sentinel branch unreachable (the decode
                    # error skipped the line instead). Break here; the single
                    # trailing [DONE] below terminates the stream.
                    if json_str == "[DONE]":
                        break
                    try:
                        chunk = json.loads(json_str)
                    except json.JSONDecodeError:
                        continue
                    # ----- adapt Bytez chunk to OpenAI -----
                    content = ""
                    if "token" in chunk:
                        content = chunk["token"]
                    elif "choices" in chunk and chunk["choices"]:
                        delta = chunk["choices"][0].get("delta", {})
                        content = delta.get("content", "")
                    elif "text" in chunk:
                        content = chunk["text"]
                    else:
                        content = str(chunk)
                    openai_chunk = {
                        "id": "chatcmpl-proxy-stream",
                        "object": "chat.completion.chunk",
                        "created": int(time.time()),
                        "model": payload.get("model", "unknown"),
                        "choices": [
                            {
                                "index": 0,
                                "delta": {"role": "assistant", "content": content},
                                "finish_reason": None,
                            }
                        ],
                    }
                    yield f"data: {json.dumps(openai_chunk)}\n\n"
                # Exactly one terminating sentinel, whether the upstream sent
                # [DONE] or simply closed the stream.
                yield "data: [DONE]\n\n"

    # ---------- non-stream ----------
    if not stream:
        async with httpx.AsyncClient(timeout=120) as c:
            r = await c.post(BYTEZ_CHAT_URL, headers=headers, json=payload)
        try:
            data = r.json()
        except json.JSONDecodeError:
            raise HTTPException(status_code=502, detail="Upstream returned invalid JSON")
        if "choices" not in data:
            # Wrap a non-OpenAI-shaped Bytez answer in an OpenAI envelope.
            content = data.get("output") or data.get("response") or data.get("message") or str(data)
            data = {
                "id": "chatcmpl-proxy",
                "object": "chat.completion",
                "choices": [{"index": 0, "message": {"role": "assistant", "content": content}}],
            }
        return JSONResponse(data, headers={"Access-Control-Allow-Origin": "*"})

    return StreamingResponse(
        v1_event_stream(),
        media_type="text/event-stream",
        headers={"Access-Control-Allow-Origin": "*"},
    )
# ---------------------------------------------------------------------
# --------------------- /v1/images/generations -----------------------
# ---------------------------------------------------------------------
@api.post("/v1/images/generations")
async def v1_images_generations(request: Request, authorization: str = Header(None)):
    """
    OpenAI-compatible image generation endpoint that forwards to:
        https://api.bytez.com/models/v2/openai/dall-e-3

    It accepts the usual OpenAI body:
        {
          "model": "dall-e-3",        # REQUIRED by clients, but ignored
          "prompt": "text",           # REQUIRED
          "n": 1,
          "size": "1024x1024",
          "response_format": "url" | "b64_json",
          "quality": "standard" | "hd",
          "style": "vivid" | "natural"
        }
    and converts it to the minimal Bytez format: { "text": "<prompt>" }.

    NOTE(review): the route decorator was missing, so this handler was
    never registered; restored to the conventional OpenAI path.
    """
    check_key(authorization)
    # Uses the SAME key as chat (BYTEZ_AUTH), not BYTEZ_AUTH_2.
    if not BYTEZ_AUTH:
        raise HTTPException(status_code=500, detail="Server BYTEZ_API_KEY not configured")

    # ---------------- parse request ----------------
    try:
        payload = await request.json()
    except json.JSONDecodeError:
        raise HTTPException(status_code=400, detail="Invalid JSON")
    prompt = payload.get("prompt") or payload.get("text")
    if not prompt or not str(prompt).strip():
        raise HTTPException(status_code=400, detail="Field 'prompt' is required")

    # Accepted only to satisfy OpenAI-compatible clients.
    _model = payload.get("model", "dall-e-3")  # ignored
    # BUGFIX: a non-numeric "n" used to raise ValueError -> opaque 500;
    # reject it explicitly as a client error instead.
    try:
        n = int(payload.get("n", 1))
    except (TypeError, ValueError):
        raise HTTPException(status_code=400, detail="Field 'n' must be an integer")
    response_format = payload.get("response_format", "url")  # "url" | "b64_json"
    size = payload.get("size", "1024x1024")
    quality = payload.get("quality")
    style = payload.get("style")

    # ---------------- build Bytez request ----------------
    # Minimal payload known to work against the Bytez DALL-E-3 endpoint.
    bytez_payload = {
        "text": prompt,
    }
    # Only add extras if Bytez actually supports them (left disabled until
    # confirmed against the Bytez docs):
    # bytez_payload["num_outputs"] = n
    # bytez_payload["size"] = size
    # if quality in ["standard", "hd"]:
    #     bytez_payload["quality"] = quality
    # if style in ["vivid", "natural"]:
    #     bytez_payload["style"] = style
    headers = {
        "Authorization": BYTEZ_AUTH,  # << important
        "Content-Type": "application/json",
    }

    # ---------------- call Bytez ----------------
    async with httpx.AsyncClient(timeout=200) as client:
        try:
            resp = await client.post(
                BYTEZ_IMAGE_URL,
                json=bytez_payload,
                headers=headers,
            )
            resp.raise_for_status()
        except httpx.HTTPStatusError as e:
            # Surface the Bytez error message directly to the client.
            detail = None
            try:
                detail = e.response.json()
            except Exception:
                detail = e.response.text
            raise HTTPException(
                status_code=e.response.status_code,
                detail={"upstream_error": detail},
            )
        except Exception as e:
            raise HTTPException(status_code=502, detail=f"Bytez unreachable: {str(e)}")
    try:
        bytez_data = resp.json()
    except json.JSONDecodeError:
        raise HTTPException(status_code=502, detail="Bytez returned invalid JSON")

    # ---------------- map Bytez → OpenAI image response ----------------
    # The exact upstream shape is not documented here, so handle several
    # possibilities.
    images = []
    # Case 1: { "images": ["url_or_b64", ...] }
    if isinstance(bytez_data, dict) and "images" in bytez_data and isinstance(bytez_data["images"], list):
        images = bytez_data["images"]
    # Case 2: { "data": [ { "url": "..." }, ... ] }
    elif isinstance(bytez_data, dict) and "data" in bytez_data and isinstance(bytez_data["data"], list):
        for item in bytez_data["data"]:
            # BUGFIX: guard against non-dict list items — `"url" in item`
            # on a string would match substrings, and item["url"] would crash.
            if isinstance(item, dict) and "url" in item:
                images.append(item["url"])
            elif isinstance(item, dict) and "b64_json" in item:
                images.append(item["b64_json"])
            else:
                images.append(str(item))
    # Case 3: single string
    elif isinstance(bytez_data, str):
        images = [bytez_data]
    # Fallback: treat the whole response as one string
    else:
        images = [str(bytez_data)]
    if not images:
        raise HTTPException(status_code=500, detail={"error": "No images returned from Bytez", "raw": bytez_data})
    # Truncate to n images.
    images = images[:n]

    openai_data = []
    for img in images:
        if isinstance(img, str) and img.startswith("data:image"):
            # Already a data URL; extract the base64 payload if needed.
            b64_part = img.split("base64,", 1)[-1]
            if response_format == "b64_json":
                openai_data.append({"b64_json": b64_part})
            else:
                openai_data.append({"url": img})
        else:
            # Raw URL or bare base64 — we cannot tell which, so trust the
            # client's requested response_format.
            if response_format == "b64_json":
                openai_data.append({"b64_json": str(img)})
            else:
                openai_data.append({"url": str(img)})
    result = {
        "created": int(time.time()),
        "data": openai_data,
    }
    return JSONResponse(result, headers={"Access-Control-Allow-Origin": "*"})
# ---------------------------------------------------------------------
# -------------------------- /v2 ------------------------------------
# ---------------------------------------------------------------------
@api.post("/v2/chat/completions")
async def v2_chat_completions(request: Request, authorization: str = Header(None)):
    """
    v2 — clean OpenAI-compatible streaming.

    * First chunk includes role=assistant (required by Continue.dev)
    * Later chunks send only delta.content
    * Usage-accounting events are filtered out
    * Uses BYTEZ_AUTH_2 (a separate key from /v1)

    NOTE(review): the route decorator was missing, so this handler was
    never registered; restored to the path advertised in the UI.
    """
    check_key(authorization)
    if not BYTEZ_AUTH_2:
        # BUGFIX: the message previously said "BYTEZ_API_2", which is not
        # the env var actually read (BYTEZ_API_KEY_2) — misleading to operators.
        raise HTTPException(status_code=500, detail="Server BYTEZ_API_KEY_2 not configured")
    try:
        body = await request.body()
        payload = json.loads(body.decode("utf-8"))
    except json.JSONDecodeError as e:
        raise HTTPException(status_code=400, detail=f"Invalid JSON: {e}")
    stream = payload.get("stream", False)
    upstream_headers = {
        "Authorization": BYTEZ_AUTH_2,
        "Content-Type": "application/json",
    }

    # Normal content chunk (NO ROLE).
    def make_openai_delta(content: str):
        return {
            "id": f"chatcmpl-v2-{int(time.time())}",
            "object": "chat.completion.chunk",
            "created": int(time.time()),
            "model": payload.get("model", "unknown"),
            "choices": [
                {
                    "index": 0,
                    "delta": {"content": content},
                    "finish_reason": None,
                }
            ],
        }

    async def clean_stream():
        # FIRST CHUNK MUST SET THE ROLE — required by Continue.dev.
        first_chunk = {
            "id": f"chatcmpl-v2-{int(time.time())}",
            "object": "chat.completion.chunk",
            "created": int(time.time()),
            "model": payload.get("model", "unknown"),
            "choices": [
                {
                    "index": 0,
                    "delta": {"role": "assistant", "content": ""},
                    "finish_reason": None,
                }
            ],
        }
        yield f"data: {json.dumps(first_chunk)}\n\n"
        async with httpx.AsyncClient(timeout=180) as client:
            try:
                async with client.stream(
                    "POST", BYTEZ_CHAT_URL, headers=upstream_headers, json=payload
                ) as upstream:
                    async for line in upstream.aiter_lines():
                        line = line.strip()
                        if not line:
                            continue
                        json_str = line[6:] if line.startswith("data: ") else line
                        if json_str == "[DONE]":
                            yield "data: [DONE]\n\n"
                            return
                        try:
                            chunk = json.loads(json_str)
                        except json.JSONDecodeError:
                            continue
                        # BUGFIX: skip usage-accounting events structurally.
                        # The old substring test (`"usage" in json_str.lower()`)
                        # also dropped any CONTENT chunk whose text merely
                        # contained the word "usage".
                        if isinstance(chunk, dict) and "usage" in chunk and not chunk.get("choices"):
                            continue
                        text = ""
                        if isinstance(chunk, dict):
                            if "token" in chunk:
                                text = chunk["token"]
                            elif "choices" in chunk and chunk["choices"]:
                                delta = chunk["choices"][0].get("delta", {})
                                text = delta.get("content", "")
                            elif "text" in chunk:
                                text = chunk["text"]
                        else:
                            text = str(chunk)
                        if text:
                            yield f"data: {json.dumps(make_openai_delta(text))}\n\n"
                    yield "data: [DONE]\n\n"
            except Exception as e:
                # Surface the failure inside the SSE stream, then terminate it
                # cleanly so clients do not hang.
                error_chunk = make_openai_delta(f"Error: {str(e)}")
                yield f"data: {json.dumps(error_chunk)}\n\n"
                yield "data: [DONE]\n\n"

    # Non-streaming mode
    if not stream:
        async with httpx.AsyncClient(timeout=120) as c:
            r = await c.post(BYTEZ_CHAT_URL, headers=upstream_headers, json=payload)
            r.raise_for_status()
        try:
            data = r.json()
        except json.JSONDecodeError:
            # Consistency with /v1: report bad upstream JSON as 502.
            raise HTTPException(status_code=502, detail="Upstream returned invalid JSON")
        if "choices" not in data:
            content = (
                data.get("output")
                or data.get("response")
                or data.get("message")
                or str(data)
            )
            data = {
                "id": "chatcmpl-v2",
                "object": "chat.completion",
                "choices": [
                    {"index": 0, "message": {"role": "assistant", "content": content}}
                ],
            }
        return JSONResponse(data, headers={"Access-Control-Allow-Origin": "*"})

    # Streaming mode
    return StreamingResponse(
        clean_stream(),
        media_type="text/event-stream",
        headers={
            "Access-Control-Allow-Origin": "*",
            "Access-Control-Allow-Headers": "*",
            "Cache-Control": "no-cache",
            "Connection": "keep-alive",
            "X-Accel-Buffering": "no",
        },
    )
# ---------------------------------------------------------------------
# Minimal Gradio UI (required for HF Space to start)
# ---------------------------------------------------------------------
with gr.Blocks() as ui:
    gr.Markdown(
        "### Bytez β OpenAI Proxy (v1 + **v2**)\n"
        "- `/v1/models` \n"
        "- `/v1/chat/completions` (unchanged) \n"
        "- **`/v2/chat/completions`** β clean streaming, no usage chunk"
    )
# Mount the Gradio UI at "/" on top of the FastAPI app.
# NOTE(review): `demo` is whatever gr.mount_gradio_app returns — presumably
# the combined FastAPI app; confirm against the installed Gradio version.
demo = gr.mount_gradio_app(api, ui, path="/")
# This makes it work on Render, Railway, Fly.io, etc.
# (`app` is the conventional ASGI entry-point name that uvicorn/gunicorn
# discover with "module:app".)
app = api
if __name__ == "__main__":
    # Only for local testing with Gradio; port 7860 is the HF Spaces default.
    uvicorn.run(demo, host="0.0.0.0", port=7860)