Spaces:
No application file
No application file
| import os | |
| import json | |
| import httpx | |
| import gradio as gr | |
| import uvicorn | |
| from fastapi import FastAPI, Request, Header, HTTPException, File, UploadFile | |
| from fastapi.responses import JSONResponse, StreamingResponse, FileResponse | |
| import time | |
| # --------------------------------------------------------------------- | |
| # Configuration | |
| # --------------------------------------------------------------------- | |
# Upstream Bytez endpoints. The literals must not contain stray whitespace:
# whatever is inside the quotes is handed to the HTTP client as the URL.
BYTEZ_CHAT_URL = "https://api.bytez.com/models/v2/openai/v1/chat/completions"
BYTEZ_MODELS_URL = "https://api.bytez.com/models/v2/list/models"
BYTEZ_TTS_URL = "https://api.bytez.com/models/v2/openai/tts-1-hd"
BYTEZ_STT_URL = "https://api.bytez.com/models/v2/openai/whisper-1"
# Sent verbatim as the upstream `Authorization` header value.
BYTEZ_AUTH = os.getenv("BYTEZ_API_KEY")
# Optional key that clients must present; when unset, check_key accepts any
# bearer token.
LOCAL_API_KEY = os.getenv("LOCAL_API_KEY")
| # --------------------------------------------------------------------- | |
| # FastAPI backend | |
| # --------------------------------------------------------------------- | |
# The FastAPI application object; the Gradio UI is mounted onto it near the
# end of the file and the combined app is what uvicorn serves.
api = FastAPI(title="Bytez → OpenAI Proxy")
def check_key(auth: str | None):
    """Validate the client's ``Authorization`` header.

    Args:
        auth: Raw header value, expected in the form ``"Bearer <token>"``,
            or ``None`` when the header is absent.

    Raises:
        HTTPException: 401 when the header is missing or does not use the
            ``Bearer`` scheme; 403 when ``LOCAL_API_KEY`` is configured and
            the presented token does not match it.

    When ``LOCAL_API_KEY`` is not configured, any bearer token is accepted.
    """
    # Function-scope import so this block does not depend on the file-level
    # import list.
    import hmac

    scheme = "Bearer "
    if not auth or not auth.startswith(scheme):
        raise HTTPException(status_code=401, detail="Missing or invalid API key")

    # Strip only the leading scheme. Splitting on "Bearer " would truncate a
    # token that happens to contain that text.
    user_key = auth[len(scheme):].strip()

    # Constant-time comparison avoids leaking how many leading characters of
    # the key matched. Bytes are used because compare_digest rejects
    # non-ASCII str arguments.
    if LOCAL_API_KEY and not hmac.compare_digest(
        user_key.encode("utf-8"), LOCAL_API_KEY.encode("utf-8")
    ):
        raise HTTPException(status_code=403, detail="Unauthorized API key")
| # --------------------------------------------------------------------- | |
| # Root / health | |
| # --------------------------------------------------------------------- | |
def root():
    """Return the static health payload for the proxy."""
    # NOTE(review): no route decorator is visible on this handler here —
    # confirm how (and at which path) it is registered on the app.
    health = {}
    health["status"] = "ok"
    health["message"] = "Bytez proxy running"
    return health
| # --------------------------------------------------------------------- | |
| # /v1/models → must look OpenAI-style | |
| # --------------------------------------------------------------------- | |
@api.get("/v1/models")
async def models(authorization: str = Header(None)):
    """List upstream models in an OpenAI-style ``{"object": "list"}`` envelope.

    Args:
        authorization: Client ``Authorization`` header, validated by
            ``check_key``.

    Returns:
        JSONResponse whose ``data`` is a list of ``{"id", "object"}`` entries,
        with a permissive CORS header.

    Raises:
        HTTPException: 401/403 from ``check_key``; 500 when the server has no
            ``BYTEZ_API_KEY``; 502 when the upstream is unreachable or
            returns a non-JSON body; the upstream status code when the
            upstream answers with an error status.
    """
    check_key(authorization)
    if not BYTEZ_AUTH:
        raise HTTPException(status_code=500, detail="Server BYTEZ_API_KEY not configured")

    try:
        async with httpx.AsyncClient(timeout=30) as c:
            r = await c.get(BYTEZ_MODELS_URL, headers={"Authorization": BYTEZ_AUTH})
    except httpx.HTTPError as exc:
        # Connection/timeout problems are a gateway failure, not a server bug.
        raise HTTPException(status_code=502, detail="Upstream request failed") from exc

    # Surface upstream failures instead of masking them as an empty list.
    if r.status_code >= 400:
        raise HTTPException(status_code=r.status_code, detail="Models request failed")

    try:
        data = r.json()
    except ValueError as exc:  # json.JSONDecodeError is a ValueError
        raise HTTPException(status_code=502, detail="Upstream returned invalid JSON") from exc

    # Accept either a bare list or a {"data": [...]} wrapper; anything else
    # is treated as "no models" rather than crashing.
    if isinstance(data, list):
        entries = data
    elif isinstance(data, dict):
        entries = data.get("data", [])
    else:
        entries = []
    if not isinstance(entries, list):
        entries = []

    # Transform Bytez format → OpenAI format, skipping malformed entries.
    models_list = [
        {"id": m.get("id") or m.get("name"), "object": "model"}
        for m in entries
        if isinstance(m, dict)
    ]
    return JSONResponse(
        {"object": "list", "data": models_list},
        headers={"Access-Control-Allow-Origin": "*"},
    )
| # --------------------------------------------------------------------- | |
| # /v1/chat/completions | |
| # --------------------------------------------------------------------- | |
def _chat_chunk_text(chunk):
    """Return the text fragment carried by one parsed upstream stream chunk."""
    if not isinstance(chunk, dict):
        return str(chunk)
    choices = chunk.get("choices")
    if "token" in chunk:
        text = chunk["token"]
    elif isinstance(choices, list) and choices:
        first = choices[0]
        delta = first.get("delta") if isinstance(first, dict) else None
        text = delta.get("content") if isinstance(delta, dict) else ""
    elif "text" in chunk:
        text = chunk["text"]
    else:
        # Unknown shape: forward its textual form rather than dropping it.
        text = str(chunk)
    if text is None:
        return ""
    return text if isinstance(text, str) else str(text)


def _chat_chunk_finish_reason(chunk):
    """Return the upstream ``finish_reason`` of a chunk, or ``None`` if absent."""
    choices = chunk.get("choices") if isinstance(chunk, dict) else None
    if isinstance(choices, list) and choices and isinstance(choices[0], dict):
        return choices[0].get("finish_reason")
    return None


def _chat_sse(obj):
    """Serialize *obj* as one server-sent-event ``data:`` frame."""
    return f"data: {json.dumps(obj)}\n\n"


@api.post("/v1/chat/completions")
async def chat(request: Request, authorization: str = Header(None)):
    """Proxy a chat-completion request to Bytez and answer in OpenAI format.

    The JSON request body is forwarded unchanged. When it sets ``stream`` to a
    truthy value the reply is a ``text/event-stream`` of
    ``chat.completion.chunk`` frames terminated by a single ``[DONE]`` frame;
    otherwise it is one JSON document.

    Args:
        request: Incoming request whose body is the JSON payload to forward.
        authorization: Client ``Authorization`` header, validated by
            ``check_key``.

    Returns:
        StreamingResponse (streaming) or JSONResponse (non-streaming), both
        with a permissive CORS header.

    Raises:
        HTTPException: 401/403 from ``check_key``; 400 when the body is not a
            JSON object; 500 when the server has no ``BYTEZ_API_KEY``; 502
            when the upstream is unreachable or returns non-JSON; the
            upstream status code when a non-streaming call fails upstream.
    """
    check_key(authorization)
    if not BYTEZ_AUTH:
        raise HTTPException(status_code=500, detail="Server BYTEZ_API_KEY not configured")

    try:
        payload = await request.json()
    except ValueError as exc:  # json.JSONDecodeError is a ValueError
        raise HTTPException(status_code=400, detail="Request body must be valid JSON") from exc
    if not isinstance(payload, dict):
        raise HTTPException(status_code=400, detail="Request body must be a JSON object")

    stream = payload.get("stream", False)
    model_name = payload.get("model", "unknown")
    headers = {
        "Authorization": BYTEZ_AUTH,
        "Content-Type": "application/json",
    }

    async def event_stream():
        try:
            async with httpx.AsyncClient(timeout=120) as client:
                async with client.stream(
                    "POST", BYTEZ_CHAT_URL, headers=headers, json=payload
                ) as upstream:
                    if upstream.status_code >= 400:
                        # The response headers are already sent, so report
                        # the failure in-band as an error frame.
                        await upstream.aread()
                        yield _chat_sse(
                            {
                                "error": {
                                    "message": "Upstream request failed",
                                    "type": "upstream_error",
                                    "code": upstream.status_code,
                                }
                            }
                        )
                    else:
                        async for line in upstream.aiter_lines():
                            line = line.strip()
                            if not line:
                                continue
                            # Accept both "data: {...}" and "data:{...}".
                            if line.startswith("data:"):
                                json_str = line[5:].strip()
                            else:
                                json_str = line
                            # The sentinel is not valid JSON, so it must be
                            # recognised before parsing.
                            if json_str == "[DONE]":
                                break
                            try:
                                chunk = json.loads(json_str)
                            except json.JSONDecodeError:
                                # Non-JSON lines (SSE comments, event names)
                                # carry no content; skip them.
                                continue
                            yield _chat_sse(
                                {
                                    "id": "chatcmpl-proxy-stream",
                                    "object": "chat.completion.chunk",
                                    "created": int(time.time()),
                                    "model": model_name,
                                    "choices": [
                                        {
                                            "index": 0,
                                            "delta": {
                                                "role": "assistant",
                                                "content": _chat_chunk_text(chunk),
                                            },
                                            "finish_reason": _chat_chunk_finish_reason(chunk),
                                        }
                                    ],
                                }
                            )
        except httpx.HTTPError:
            yield _chat_sse(
                {"error": {"message": "Upstream request failed", "type": "upstream_error"}}
            )
        # Exactly one terminator, whatever path ended the stream.
        yield "data: [DONE]\n\n"

    if stream:
        return StreamingResponse(
            event_stream(),
            media_type="text/event-stream",
            headers={"Access-Control-Allow-Origin": "*"},
        )

    try:
        async with httpx.AsyncClient(timeout=120) as c:
            r = await c.post(BYTEZ_CHAT_URL, headers=headers, json=payload)
    except httpx.HTTPError as exc:
        raise HTTPException(status_code=502, detail="Upstream request failed") from exc

    try:
        data = r.json()
    except ValueError as exc:
        raise HTTPException(status_code=502, detail="Upstream returned invalid JSON") from exc

    # Propagate upstream failures with their body instead of presenting the
    # error text as an assistant message with status 200.
    if r.status_code >= 400:
        raise HTTPException(status_code=r.status_code, detail=data)

    if not isinstance(data, dict) or "choices" not in data:
        if isinstance(data, dict):
            content = (
                data.get("output")
                or data.get("response")
                or data.get("message")
                or str(data)
            )
        else:
            content = str(data)
        # Message content must be text; serialise structured values.
        if not isinstance(content, str):
            content = json.dumps(content, ensure_ascii=False)
        data = {
            "id": "chatcmpl-proxy",
            "object": "chat.completion",
            "created": int(time.time()),
            "model": model_name,
            "choices": [
                {
                    "index": 0,
                    "message": {"role": "assistant", "content": content},
                    "finish_reason": "stop",
                }
            ],
        }
    return JSONResponse(data, headers={"Access-Control-Allow-Origin": "*"})
| # --------------------------------------------------------------------- | |
| # /v1/tts (Text-to-Speech) | |
| # --------------------------------------------------------------------- | |
@api.post("/v1/tts")
async def tts(request: Request, authorization: str = Header(None)):
    """Convert text to speech through the Bytez TTS endpoint.

    Args:
        request: Incoming request whose JSON body carries a non-empty
            ``"text"`` field.
        authorization: Client ``Authorization`` header, validated by
            ``check_key``.

    Returns:
        StreamingResponse carrying the upstream response body, labelled
        ``audio/wav``.

    Raises:
        HTTPException: 401/403 from ``check_key``; 400 when the body is not a
            JSON object or has no text; 500 when the server has no
            ``BYTEZ_API_KEY``; 502 when the upstream is unreachable; the
            upstream status code when it is not 200.
    """
    # `io` is not in the file-level imports, so bring it in here; without it
    # the final line raises NameError on every successful request.
    import io

    check_key(authorization)
    if not BYTEZ_AUTH:
        raise HTTPException(status_code=500, detail="Server BYTEZ_API_KEY not configured")

    try:
        payload = await request.json()
    except ValueError as exc:  # json.JSONDecodeError is a ValueError
        raise HTTPException(status_code=400, detail="Request body must be valid JSON") from exc
    if not isinstance(payload, dict):
        raise HTTPException(status_code=400, detail="Request body must be a JSON object")

    text = payload.get("text", "")
    if not text:
        raise HTTPException(status_code=400, detail="Text not provided for TTS")

    headers = {
        "Authorization": BYTEZ_AUTH,
        "Content-Type": "application/json",
    }
    try:
        async with httpx.AsyncClient(timeout=120) as c:
            r = await c.post(BYTEZ_TTS_URL, headers=headers, json={"text": text})
    except httpx.HTTPError as exc:
        raise HTTPException(status_code=502, detail="Upstream request failed") from exc

    if r.status_code != 200:
        raise HTTPException(status_code=r.status_code, detail="TTS request failed")

    audio_data = r.content
    return StreamingResponse(
        io.BytesIO(audio_data),
        media_type="audio/wav",
        # Same CORS header the JSON endpoints in this file send.
        headers={"Access-Control-Allow-Origin": "*"},
    )
| # --------------------------------------------------------------------- | |
| # /v1/stt (Speech-to-Text) | |
| # --------------------------------------------------------------------- | |
@api.post("/v1/stt")
async def stt(file: UploadFile = File(...), authorization: str = Header(None)):
    """Transcribe an uploaded audio file through the Bytez STT endpoint.

    Args:
        file: Uploaded audio; forwarded upstream as the multipart ``file``
            field with its original filename and content type.
        authorization: Client ``Authorization`` header, validated by
            ``check_key``.

    Returns:
        ``{"transcription": <text>}`` where the text is the upstream
        ``"transcription"`` field, or ``""`` when that field is absent.

    Raises:
        HTTPException: 401/403 from ``check_key``; 500 when the server has no
            ``BYTEZ_API_KEY``; 502 when the upstream is unreachable or
            returns non-JSON; the upstream status code when it is not 200.
    """
    check_key(authorization)
    if not BYTEZ_AUTH:
        raise HTTPException(status_code=500, detail="Server BYTEZ_API_KEY not configured")

    headers = {"Authorization": BYTEZ_AUTH}
    file_content = await file.read()
    files = {"file": (file.filename, file_content, file.content_type)}

    try:
        async with httpx.AsyncClient(timeout=120) as c:
            r = await c.post(BYTEZ_STT_URL, headers=headers, files=files)
    except httpx.HTTPError as exc:
        raise HTTPException(status_code=502, detail="Upstream request failed") from exc

    # Mirror the TTS handler: report upstream failures instead of answering
    # 200 with an empty transcription.
    if r.status_code != 200:
        raise HTTPException(status_code=r.status_code, detail="STT request failed")

    try:
        data = r.json()
    except ValueError as exc:  # json.JSONDecodeError is a ValueError
        raise HTTPException(status_code=502, detail="Upstream returned invalid JSON") from exc

    # A non-object body carries no transcription field.
    transcript = data.get("transcription", "") if isinstance(data, dict) else ""
    return {"transcription": transcript}
| # --------------------------------------------------------------------- | |
| # Minimal Gradio UI (to make HF Space start) | |
| # --------------------------------------------------------------------- | |
# Landing text rendered by the Gradio page.
_UI_MARKDOWN = (
    "### ✅ Jwero Bytez → OpenAI Proxy\n"
    "Endpoints: `/v1/models`, `/v1/chat/completions`, `/v1/tts`, `/v1/stt`"
)

with gr.Blocks() as ui:
    gr.Markdown(_UI_MARKDOWN)

# Attach the Gradio page to the FastAPI app at the site root; the returned
# object is what uvicorn serves below.
demo = gr.mount_gradio_app(api, ui, path="/")

if __name__ == "__main__":
    uvicorn.run(demo, port=7860, host="0.0.0.0")