# NOTE: the three lines below were Hugging Face Spaces page residue
# ("Spaces: Sleeping") captured by the scrape — kept only as this comment
# so the file remains valid Python.
| import os | |
| import json | |
| import time | |
| import httpx | |
| import uvicorn | |
| import gradio as gr | |
| from fastapi import FastAPI, Request, Header, HTTPException | |
| from fastapi.responses import JSONResponse, StreamingResponse | |
# ---------------------------------------------------------------------
# Configuration
# ---------------------------------------------------------------------
# Upstream Bytez endpoints: OpenAI-compatible chat completions, model
# listing, and Google Imagen image generation.
BYTEZ_CHAT_URL = "https://api.bytez.com/models/v2/openai/v1/chat/completions"
BYTEZ_MODELS_URL = "https://api.bytez.com/models/v2/list/models"
BYTEZ_IMAGE_URL = "https://api.bytez.com/models/v2/google/imagen-4.0-ultra-generate-001"
BYTEZ_AUTH = os.getenv("BYTEZ_API_KEY")  # Bytez key used by the /v1 handlers
LOCAL_API_KEY = os.getenv("LOCAL_API_KEY")  # optional local guard; when unset, any Bearer token passes check_key
BYTEZ_AUTH_2 = os.getenv("BYTEZ_API_KEY_2")  # second Bytez key, used only by the /v2 handler
# ---------------------------------------------------------------------
# FastAPI app
# ---------------------------------------------------------------------
api = FastAPI(title="Bytez → OpenAI Proxy (v1 + v2)")
def check_key(auth: str | None):
    """Validate the incoming ``Authorization`` header.

    Args:
        auth: Raw ``Authorization`` header value (or ``None`` when absent).

    Raises:
        HTTPException: 401 when the header is missing or not a Bearer token;
            403 when ``LOCAL_API_KEY`` is configured and the token mismatches.

    When ``LOCAL_API_KEY`` is unset, any well-formed Bearer token passes.
    """
    if not auth or not auth.startswith("Bearer "):
        raise HTTPException(status_code=401, detail="Missing or invalid API key")
    # removeprefix() keeps the remainder intact even when the token itself
    # contains the substring "Bearer " (str.split("Bearer ")[1] would truncate
    # such a token at the second occurrence).
    user_key = auth.removeprefix("Bearer ").strip()
    if LOCAL_API_KEY and user_key != LOCAL_API_KEY:
        raise HTTPException(status_code=403, detail="Unauthorized API key")
# ---------------------------------------------------------------------
# Root / health
# ---------------------------------------------------------------------
# BUGFIX: this handler had no route decorator, so it was never registered
# with the app and was unreachable. It is registered at /health (not "/")
# because the Gradio UI is mounted at "/" further down the file.
@api.get("/health")
def root():
    """Return a static JSON health-check payload."""
    return {"status": "ok", "message": "Bytez proxy (v1+v2) running"}
# ---------------------------------------------------------------------
# -------------------------- /v1 ------------------------------------
# ---------------------------------------------------------------------
# BUGFIX: route decorator was missing — the handler was never registered.
@api.get("/v1/models")
async def v1_models(authorization: str = Header(None)):
    """List models in OpenAI's ``/v1/models`` response shape.

    Proxies the Bytez model listing and rewraps each entry as
    ``{"id": ..., "object": "model"}``.

    Raises:
        HTTPException: 401/403 from ``check_key``; 500 when the server key is
            unset; 502 when upstream returns non-JSON.
    """
    check_key(authorization)
    if not BYTEZ_AUTH:
        raise HTTPException(status_code=500, detail="Server BYTEZ_API_KEY not configured")
    async with httpx.AsyncClient(timeout=30) as c:
        # NOTE(review): this sends the raw key while the image endpoint sends
        # "Bearer <key>" — matching original behavior; confirm which form
        # Bytez actually expects.
        r = await c.get(BYTEZ_MODELS_URL, headers={"Authorization": BYTEZ_AUTH})
    try:
        data = r.json()
    except json.JSONDecodeError:
        raise HTTPException(status_code=502, detail="Upstream returned invalid JSON")
    # Bytez may return either a bare list or {"data": [...]}.
    models_list = [
        {"id": m.get("id") or m.get("name"), "object": "model"}
        for m in (data if isinstance(data, list) else data.get("data", []))
    ]
    return JSONResponse(
        {"object": "list", "data": models_list},
        headers={"Access-Control-Allow-Origin": "*"},
    )
# BUGFIX: route decorator was missing — the handler was never registered.
@api.post("/v1/chat/completions")
async def v1_chat(request: Request, authorization: str = Header(None)):
    """OpenAI-compatible chat completions proxied to Bytez (v1 behavior).

    Honors the OpenAI ``stream`` flag: emits Server-Sent Events when set,
    otherwise returns one ``chat.completion`` JSON object. Bytez-native
    payload shapes (``token``/``text``/``output``/``response``/``message``)
    are adapted into OpenAI's structures.
    """
    check_key(authorization)
    if not BYTEZ_AUTH:
        raise HTTPException(status_code=500, detail="Server BYTEZ_API_KEY not configured")
    payload = await request.json()
    stream = payload.get("stream", False)
    headers = {"Authorization": BYTEZ_AUTH, "Content-Type": "application/json"}

    # ---------- streaming helper ----------
    async def v1_event_stream():
        async with httpx.AsyncClient(timeout=120) as client:
            async with client.stream("POST", BYTEZ_CHAT_URL, headers=headers, json=payload) as upstream:
                async for line in upstream.aiter_lines():
                    line = line.strip()
                    if not line:
                        continue
                    json_str = line[6:] if line.startswith("data: ") else line
                    # BUGFIX: check the "[DONE]" sentinel BEFORE json.loads().
                    # "[DONE]" is not valid JSON, so with the old order the
                    # decode failure hit `continue` and the sentinel branch was
                    # unreachable. `return` (not break) avoids emitting a
                    # duplicate [DONE] via the fallback yield below.
                    if json_str == "[DONE]":
                        yield "data: [DONE]\n\n"
                        return
                    try:
                        chunk = json.loads(json_str)
                    except json.JSONDecodeError:
                        continue
                    # ----- adapt Bytez chunk to an OpenAI delta -----
                    content = ""
                    if "token" in chunk:
                        content = chunk["token"]
                    elif "choices" in chunk and chunk["choices"]:
                        delta = chunk["choices"][0].get("delta", {})
                        content = delta.get("content", "")
                    elif "text" in chunk:
                        content = chunk["text"]
                    else:
                        content = str(chunk)
                    openai_chunk = {
                        "id": "chatcmpl-proxy-stream",
                        "object": "chat.completion.chunk",
                        "created": int(time.time()),
                        "model": payload.get("model", "unknown"),
                        "choices": [
                            {
                                "index": 0,
                                "delta": {"role": "assistant", "content": content},
                                "finish_reason": None,
                            }
                        ],
                    }
                    yield f"data: {json.dumps(openai_chunk)}\n\n"
        # Upstream closed without sending a sentinel — emit one ourselves so
        # OpenAI-style clients terminate cleanly.
        yield "data: [DONE]\n\n"

    # ---------- non-stream ----------
    if not stream:
        async with httpx.AsyncClient(timeout=120) as c:
            r = await c.post(BYTEZ_CHAT_URL, headers=headers, json=payload)
        try:
            data = r.json()
        except json.JSONDecodeError:
            raise HTTPException(status_code=502, detail="Upstream returned invalid JSON")
        if "choices" not in data:
            # Bytez-native answer → wrap it in an OpenAI chat.completion shell.
            content = data.get("output") or data.get("response") or data.get("message") or str(data)
            data = {
                "id": "chatcmpl-proxy",
                "object": "chat.completion",
                "choices": [{"index": 0, "message": {"role": "assistant", "content": content}}],
            }
        return JSONResponse(data, headers={"Access-Control-Allow-Origin": "*"})
    return StreamingResponse(
        v1_event_stream(),
        media_type="text/event-stream",
        headers={"Access-Control-Allow-Origin": "*"},
    )
# ---------------------------------------------------------------------
# --------------------- /v1/images/generations -----------------------
# ---------------------------------------------------------------------
# BUGFIX: route decorator was missing — the handler was never registered.
@api.post("/v1/images/generations")
async def v1_images_generations(request: Request, authorization: str = Header(None)):
    """OpenAI-compatible image generation endpoint → Bytez Google Imagen.

    (Docstring corrected: the upstream URL targets google/imagen-4.0-ultra,
    not DALL-E 3.) Accepts OpenAI's ``prompt``/``n``/``response_format``
    fields and returns ``{"created": ..., "data": [{"url" | "b64_json": ...}]}``.
    """
    check_key(authorization)
    if not BYTEZ_AUTH:
        raise HTTPException(status_code=500, detail="Server BYTEZ_API_KEY not configured")

    # -------- Parse request --------
    try:
        payload = await request.json()
    except json.JSONDecodeError:
        raise HTTPException(status_code=400, detail="Invalid JSON")
    prompt = payload.get("prompt")
    if not prompt:
        raise HTTPException(status_code=400, detail="Field 'prompt' is required")
    n = int(payload.get("n", 1))
    response_format = payload.get("response_format", "url")  # url | b64_json

    # -------- Bytez request format --------
    bytez_payload = {"text": prompt}
    # NOTE(review): this endpoint sends "Bearer <key>" while the chat/model
    # endpoints send the raw key — matching original behavior; confirm which
    # form Bytez expects.
    headers = {
        "Authorization": f"Bearer {BYTEZ_AUTH}",
        "Content-Type": "application/json",
    }

    # -------- Call Bytez --------
    async with httpx.AsyncClient(timeout=200) as client:
        try:
            resp = await client.post(BYTEZ_IMAGE_URL, json=bytez_payload, headers=headers)
            resp.raise_for_status()
        except httpx.HTTPStatusError as e:
            # Forward the upstream status and body; bare except replaced with
            # `except Exception` so KeyboardInterrupt/SystemExit propagate.
            try:
                detail = e.response.json()
            except Exception:
                detail = e.response.text
            raise HTTPException(status_code=e.response.status_code, detail={"upstream_error": detail})
    try:
        bytez_data = resp.json()
    except json.JSONDecodeError:
        raise HTTPException(status_code=502, detail="Bytez returned invalid JSON")

    # -------- Extract URL (bytez_data.output) --------
    url_output = None
    if isinstance(bytez_data, dict):
        url_output = bytez_data.get("output")

    # -------- Extract Base64 (provider.generatedImages[].imageBytes) --------
    b64_list = []
    try:
        generated = bytez_data["provider"]["generatedImages"]
        for g in generated:
            img = g.get("image", {})
            if "imageBytes" in img:
                b64_list.append(img["imageBytes"])
    except Exception:
        pass  # the provider/generatedImages block is optional

    # -------- Build OpenAI-compatible response --------
    openai_data = []
    # At most n entries, but at least one slot even when only a URL came back.
    count = min(max(len(b64_list), 1), n)
    for i in range(count):
        if response_format == "b64_json":
            if i < len(b64_list):
                openai_data.append({"b64_json": b64_list[i]})
            else:
                # fallback: pass the URL through in the b64_json slot
                openai_data.append({"b64_json": url_output})
        else:
            if url_output:
                openai_data.append({"url": url_output})
            elif i < len(b64_list):
                # fallback: convert base64 into a data URL
                openai_data.append({"url": f"data:image/png;base64,{b64_list[i]}"})
            else:
                openai_data.append({"url": ""})
    result = {
        "created": int(time.time()),
        "data": openai_data,
    }
    return JSONResponse(result, headers={"Access-Control-Allow-Origin": "*"})
# ---------------------------------------------------------------------
# -------------------------- /v2 ------------------------------------
# ---------------------------------------------------------------------
# BUGFIX: route decorator was missing — the handler was never registered.
@api.post("/v2/chat/completions")
async def v2_chat_completions(request: Request, authorization: str = Header(None)):
    """
    v2 – clean OpenAI-compatible streaming.
    * First chunk includes role=assistant (required by Continue.dev)
    * Later chunks send only delta.content
    * No usage events
    """
    check_key(authorization)
    if not BYTEZ_AUTH_2:
        # BUGFIX: the message previously named "BYTEZ_API_2"; the env var
        # actually read above is BYTEZ_API_KEY_2.
        raise HTTPException(status_code=500, detail="Server BYTEZ_API_KEY_2 not configured")
    try:
        body = await request.body()
        payload = json.loads(body.decode("utf-8"))
    except json.JSONDecodeError as e:
        raise HTTPException(status_code=400, detail=f"Invalid JSON: {e}")
    stream = payload.get("stream", False)
    upstream_headers = {
        "Authorization": BYTEZ_AUTH_2,
        "Content-Type": "application/json",
    }

    # Normal content chunk (NO ROLE)
    def make_openai_delta(content: str):
        return {
            "id": f"chatcmpl-v2-{int(time.time())}",
            "object": "chat.completion.chunk",
            "created": int(time.time()),
            "model": payload.get("model", "unknown"),
            "choices": [
                {
                    "index": 0,
                    "delta": {"content": content},
                    "finish_reason": None,
                }
            ],
        }

    async def clean_stream():
        # FIRST CHUNK MUST SET THE ROLE → REQUIRED by Continue.dev
        first_chunk = {
            "id": f"chatcmpl-v2-{int(time.time())}",
            "object": "chat.completion.chunk",
            "created": int(time.time()),
            "model": payload.get("model", "unknown"),
            "choices": [
                {
                    "index": 0,
                    "delta": {"role": "assistant", "content": ""},
                    "finish_reason": None,
                }
            ],
        }
        # Send first role-setting chunk
        yield f"data: {json.dumps(first_chunk)}\n\n"
        async with httpx.AsyncClient(timeout=180) as client:
            try:
                async with client.stream(
                    "POST", BYTEZ_CHAT_URL, headers=upstream_headers, json=payload
                ) as upstream:
                    async for line in upstream.aiter_lines():
                        line = line.strip()
                        if not line:
                            continue
                        json_str = line[6:] if line.startswith("data: ") else line
                        if json_str == "[DONE]":
                            yield "data: [DONE]\n\n"
                            return
                        try:
                            chunk = json.loads(json_str)
                        except json.JSONDecodeError:
                            continue
                        # BUGFIX: skip usage events by inspecting the PARSED
                        # chunk. The previous substring test
                        # (`"usage" in json_str.lower()`) silently dropped any
                        # content chunk that merely contained the word "usage".
                        if isinstance(chunk, dict) and "usage" in chunk and not chunk.get("choices"):
                            continue
                        text = ""
                        if isinstance(chunk, dict):
                            if "token" in chunk:
                                text = chunk["token"]
                            elif "choices" in chunk and chunk["choices"]:
                                delta = chunk["choices"][0].get("delta", {})
                                text = delta.get("content", "")
                            elif "text" in chunk:
                                text = chunk["text"]
                            else:
                                text = str(chunk)
                        if text:
                            yield f"data: {json.dumps(make_openai_delta(text))}\n\n"
                yield "data: [DONE]\n\n"
            except Exception as e:
                # Surface upstream failures as a final content chunk so the
                # client's stream still terminates cleanly.
                error_chunk = make_openai_delta(f"Error: {str(e)}")
                yield f"data: {json.dumps(error_chunk)}\n\n"
                yield "data: [DONE]\n\n"

    # Non-streaming mode
    if not stream:
        async with httpx.AsyncClient(timeout=120) as c:
            r = await c.post(BYTEZ_CHAT_URL, headers=upstream_headers, json=payload)
            r.raise_for_status()
            data = r.json()
        if "choices" not in data:
            # Bytez-native answer → wrap it in an OpenAI chat.completion shell.
            content = (
                data.get("output")
                or data.get("response")
                or data.get("message")
                or str(data)
            )
            data = {
                "id": "chatcmpl-v2",
                "object": "chat.completion",
                "choices": [
                    {"index": 0, "message": {"role": "assistant", "content": content}}
                ],
            }
        return JSONResponse(data, headers={"Access-Control-Allow-Origin": "*"})

    # Streaming mode
    return StreamingResponse(
        clean_stream(),
        media_type="text/event-stream",
        headers={
            "Access-Control-Allow-Origin": "*",
            "Access-Control-Allow-Headers": "*",
            "Cache-Control": "no-cache",
            "Connection": "keep-alive",
            "X-Accel-Buffering": "no",
        },
    )
# ---------------------------------------------------------------------
# Minimal Gradio UI (required for HF Space to start)
# ---------------------------------------------------------------------
# A static landing page listing the proxy's endpoints.
with gr.Blocks() as ui:
    gr.Markdown(
        "### Bytez → OpenAI Proxy (v1 + **v2**)\n"
        "- `/v1/models` \n"
        "- `/v1/chat/completions` (unchanged) \n"
        "- **`/v2/chat/completions`** – clean streaming, no usage chunk"
    )
# Mount the Gradio UI at "/" on the FastAPI app; mount_gradio_app returns the
# combined ASGI application. API routes registered before this call take
# precedence over the mount.
demo = gr.mount_gradio_app(api, ui, path="/")
# This makes it work on Render, Railway, Fly.io, etc.
# Expose the ASGI app under the conventional name for `uvicorn module:app`.
app = api
if __name__ == "__main__":
    # Only for local testing with Gradio; 7860 is the standard HF Spaces port.
    uvicorn.run(demo, host="0.0.0.0", port=7860)