import os import time import json import pathlib import requests import gradio as gr BASE = "https://api.kie.ai/api/v1" CREATE_URL = f"{BASE}/jobs/createTask" def make_headers(user_key: str | None): api_key = (user_key or os.getenv("KIE_API_KEY") or "").strip() if not api_key: raise gr.Error("Masukkan KIE API key atau set KIE_API_KEY di Secrets.") return {"Authorization": f"Bearer {api_key}", "Content-Type": "application/json"} def create_task(headers, prompt: str, aspect_ratio: str, duration: str | None): payload = { "model": "sora-2-text-to-video", "input": { "prompt": prompt, "aspect_ratio": aspect_ratio, "remove_watermark": False, }, } if duration: try: d = int(duration) if d > 0: payload["input"]["duration"] = d except Exception: pass r = requests.post(CREATE_URL, headers=headers, json=payload, timeout=60) if r.status_code >= 400: raise gr.Error(f"createTask gagal: {r.status_code} {r.text}") data = r.json() raw_preview = json.dumps(data)[:500] task_id = ( data.get("task_id") or data.get("taskId") or data.get("id") or (data.get("data") or {}).get("taskId") or (data.get("data") or {}).get("id") ) status_url = ( data.get("status_url") or data.get("statusUrl") or (data.get("data") or {}).get("status_url") or (data.get("data") or {}).get("statusUrl") ) if not task_id: raise gr.Error(f"Tidak menemukan task_id: {raw_preview}") return task_id, status_url, raw_preview def infer_status_and_result(obj: dict): status = obj.get("status") or obj.get("state") or (obj.get("data") or {}).get("status") progress = obj.get("progress") or (obj.get("data") or {}).get("progress") for key in ["result_url","output_url","video_url","download_url"]: if key in obj and isinstance(obj[key], str) and obj[key].startswith("http"): return status, progress, obj[key] data = obj.get("data") or {} for key in ["result_url","output_url","video_url","download_url"]: v = data.get(key) if isinstance(v, str) and v.startswith("http"): return status, progress, v return status, progress, None def probe_status_url(headers, task_id: str): candidates = [ f"{BASE}/jobs/getTask?taskId={task_id}", f"{BASE}/jobs/get-task?taskId={task_id}", f"{BASE}/jobs/task?taskId={task_id}", f"{BASE}/jobs/{task_id}", f"{BASE}/jobs/status/{task_id}", f"{BASE}/jobs/getTask/{task_id}", f"{BASE}/jobs/task/{task_id}", ] for url in candidates: try: r = requests.get(url, headers=headers, timeout=30) if r.status_code < 400: try: data = r.json() except Exception: continue status, prog, _ = infer_status_and_result(data) if status is not None: return url except Exception: pass # POST variant post_url = f"{BASE}/jobs/getTask" try: r = requests.post(post_url, headers=headers, json={"taskId": task_id}, timeout=30) if r.status_code < 400: data = r.json() status, prog, _ = infer_status_and_result(data) if status is not None: return post_url + " (POST)" except Exception: pass return None def poll_until_done(headers, status_url: str, task_id: str, interval=5, max_wait=60*40): waited = 0 last_status = None used_post = status_url.endswith("(POST)") while waited <= max_wait: try: if used_post: r = requests.post(status_url.replace(" (POST)", ""), headers=headers, json={"taskId": task_id}, timeout=60) else: r = requests.get(status_url, headers=headers, timeout=60) except Exception as e: yield f"[WARN] Poll error: {e}", None time.sleep(interval); waited += interval; continue if r.status_code == 404: yield f"[WARN] Poll 404 di {status_url} — mencoba endpoint lain...", None new_url = probe_status_url(headers, task_id) if not new_url: raise gr.Error("Tidak menemukan endpoint status. Mohon cek dokumentasi Kie AI.") status_url = new_url used_post = status_url.endswith("(POST)") continue if r.status_code >= 400: yield f"[WARN] Poll failed: {r.status_code} {r.text}", None else: try: data = r.json() except Exception: data = {} yield "[WARN] Response status bukan JSON.", None status, progress, result_url = infer_status_and_result(data) if status != last_status: yield f"{status} - progress: {progress}", None last_status = status s = (str(status) or "").lower() if s in {"succeeded","completed","success","done"}: yield "Selesai di server — mengunduh...", result_url return if s in {"failed","error","canceled","cancelled"}: raise gr.Error(f"Task gagal:\n{json.dumps(data, indent=2)}") time.sleep(interval); waited += interval raise gr.Error("Timeout menunggu task selesai.") def download_result(url: str, task_id: str): if not url: raise gr.Error("Server tidak mengirimkan URL hasil.") out = pathlib.Path("/tmp") / f"{task_id}.mp4" out.parent.mkdir(parents=True, exist_ok=True) with requests.get(url, stream=True, timeout=300) as r: if r.status_code >= 400: raise gr.Error(f"Gagal download hasil: {r.status_code} {r.text}") with open(out, "wb") as f: for chunk in r.iter_content(chunk_size=1 << 20): if chunk: f.write(chunk) return str(out) def run_job(user_api_key, prompt, aspect_ratio, duration): headers = make_headers(user_api_key) task_id, status_url, preview = create_task(headers, prompt, aspect_ratio, duration) yield f"Task dibuat: {task_id}\nStatus URL (server): {status_url or '-'}\nCreate resp: {preview}", None if not status_url: found = probe_status_url(headers, task_id) if not found: raise gr.Error("Tidak bisa menentukan endpoint status. Kirim response createTask penuh agar saya mappingkan field yang benar.") status_url = found result_link = None for status_msg, maybe_url in poll_until_done(headers, status_url, task_id): yield status_msg, None if maybe_url: result_link = maybe_url path = download_result(result_link, task_id) yield f"Selesai ✅ (task {task_id})", path def runtime_info(user_api_key): key = (user_api_key or os.getenv("KIE_API_KEY") or "").strip() src = "BYOK (user input)" if user_api_key else ("ENV KIE_API_KEY" if os.getenv("KIE_API_KEY") else "NONE") masked = (key[:4] + "..." + key[-4:]) if key else "(none)" return f"Key source: {src}\nKey hint: {masked}\nBase: {BASE}" with gr.Blocks(title="Kie AI — Sora‑2 Text‑to‑Video (Polling)") as demo: gr.Markdown("## Kie AI — Sora‑2 Text‑to‑Video\n- Gunakan API key Kie AI (bukan OpenAI).\n- remove_watermark dimatikan (False).") api = gr.Textbox(label="KIE API Key (opsional, BYOK)", type="password", placeholder="kie-...") prompt = gr.Textbox( label="Prompt", lines=7, value=( "Photorealistic handheld POV at night in a foggy tropical forest; " "a large animal silhouette crosses the path, eyes reflecting light; " "volumetric fog, wet leaves; vertical 9:16; no blood, no gore, no text, no watermark." ), ) with gr.Row(): aspect = gr.Dropdown(["portrait", "landscape", "square", "9:16", "16:9"], value="portrait", label="Aspect ratio") duration = gr.Textbox(value="", label="Duration (detik, opsional)", placeholder="mis. 12 (kosongkan jika ragu)") run = gr.Button("Generate") status = gr.Textbox(label="Status", lines=6) video = gr.Video(label="Hasil video") check = gr.Button("Check runtime") info = gr.Textbox(label="Runtime info") run.click(run_job, inputs=[api, prompt, aspect, duration], outputs=[status, video]) check.click(runtime_info, inputs=[api], outputs=[info]) # Tambahkan launch jika Space type = Python if __name__ == "__main__": demo.queue().launch(server_name="0.0.0.0", server_port=int(os.getenv("PORT", "7860")))