Spaces:
Sleeping
Sleeping
| import os | |
| import time | |
| import json | |
| import pathlib | |
| import requests | |
| import gradio as gr | |
| BASE = "https://api.kie.ai/api/v1" | |
| CREATE_URL = f"{BASE}/jobs/createTask" | |
| def make_headers(user_key: str | None): | |
| api_key = (user_key or os.getenv("KIE_API_KEY") or "").strip() | |
| if not api_key: | |
| raise gr.Error("Masukkan KIE API key atau set KIE_API_KEY di Secrets.") | |
| return {"Authorization": f"Bearer {api_key}", "Content-Type": "application/json"} | |
| def create_task(headers, prompt: str, aspect_ratio: str, duration: str | None): | |
| payload = { | |
| "model": "sora-2-text-to-video", | |
| "input": { | |
| "prompt": prompt, | |
| "aspect_ratio": aspect_ratio, | |
| "remove_watermark": False, | |
| }, | |
| } | |
| if duration: | |
| try: | |
| d = int(duration) | |
| if d > 0: | |
| payload["input"]["duration"] = d | |
| except Exception: | |
| pass | |
| r = requests.post(CREATE_URL, headers=headers, json=payload, timeout=60) | |
| if r.status_code >= 400: | |
| raise gr.Error(f"createTask gagal: {r.status_code} {r.text}") | |
| data = r.json() | |
| raw_preview = json.dumps(data)[:500] | |
| task_id = ( | |
| data.get("task_id") or data.get("taskId") or data.get("id") or | |
| (data.get("data") or {}).get("taskId") or (data.get("data") or {}).get("id") | |
| ) | |
| status_url = ( | |
| data.get("status_url") or data.get("statusUrl") or | |
| (data.get("data") or {}).get("status_url") or (data.get("data") or {}).get("statusUrl") | |
| ) | |
| if not task_id: | |
| raise gr.Error(f"Tidak menemukan task_id: {raw_preview}") | |
| return task_id, status_url, raw_preview | |
| def infer_status_and_result(obj: dict): | |
| status = obj.get("status") or obj.get("state") or (obj.get("data") or {}).get("status") | |
| progress = obj.get("progress") or (obj.get("data") or {}).get("progress") | |
| for key in ["result_url","output_url","video_url","download_url"]: | |
| if key in obj and isinstance(obj[key], str) and obj[key].startswith("http"): | |
| return status, progress, obj[key] | |
| data = obj.get("data") or {} | |
| for key in ["result_url","output_url","video_url","download_url"]: | |
| v = data.get(key) | |
| if isinstance(v, str) and v.startswith("http"): | |
| return status, progress, v | |
| return status, progress, None | |
| def probe_status_url(headers, task_id: str): | |
| candidates = [ | |
| f"{BASE}/jobs/getTask?taskId={task_id}", | |
| f"{BASE}/jobs/get-task?taskId={task_id}", | |
| f"{BASE}/jobs/task?taskId={task_id}", | |
| f"{BASE}/jobs/{task_id}", | |
| f"{BASE}/jobs/status/{task_id}", | |
| f"{BASE}/jobs/getTask/{task_id}", | |
| f"{BASE}/jobs/task/{task_id}", | |
| ] | |
| for url in candidates: | |
| try: | |
| r = requests.get(url, headers=headers, timeout=30) | |
| if r.status_code < 400: | |
| try: | |
| data = r.json() | |
| except Exception: | |
| continue | |
| status, prog, _ = infer_status_and_result(data) | |
| if status is not None: | |
| return url | |
| except Exception: | |
| pass | |
| # POST variant | |
| post_url = f"{BASE}/jobs/getTask" | |
| try: | |
| r = requests.post(post_url, headers=headers, json={"taskId": task_id}, timeout=30) | |
| if r.status_code < 400: | |
| data = r.json() | |
| status, prog, _ = infer_status_and_result(data) | |
| if status is not None: | |
| return post_url + " (POST)" | |
| except Exception: | |
| pass | |
| return None | |
| def poll_until_done(headers, status_url: str, task_id: str, interval=5, max_wait=60*40): | |
| waited = 0 | |
| last_status = None | |
| used_post = status_url.endswith("(POST)") | |
| while waited <= max_wait: | |
| try: | |
| if used_post: | |
| r = requests.post(status_url.replace(" (POST)", ""), headers=headers, json={"taskId": task_id}, timeout=60) | |
| else: | |
| r = requests.get(status_url, headers=headers, timeout=60) | |
| except Exception as e: | |
| yield f"[WARN] Poll error: {e}", None | |
| time.sleep(interval); waited += interval; continue | |
| if r.status_code == 404: | |
| yield f"[WARN] Poll 404 di {status_url} — mencoba endpoint lain...", None | |
| new_url = probe_status_url(headers, task_id) | |
| if not new_url: | |
| raise gr.Error("Tidak menemukan endpoint status. Mohon cek dokumentasi Kie AI.") | |
| status_url = new_url | |
| used_post = status_url.endswith("(POST)") | |
| continue | |
| if r.status_code >= 400: | |
| yield f"[WARN] Poll failed: {r.status_code} {r.text}", None | |
| else: | |
| try: | |
| data = r.json() | |
| except Exception: | |
| data = {} | |
| yield "[WARN] Response status bukan JSON.", None | |
| status, progress, result_url = infer_status_and_result(data) | |
| if status != last_status: | |
| yield f"{status} - progress: {progress}", None | |
| last_status = status | |
| s = (str(status) or "").lower() | |
| if s in {"succeeded","completed","success","done"}: | |
| yield "Selesai di server — mengunduh...", result_url | |
| return | |
| if s in {"failed","error","canceled","cancelled"}: | |
| raise gr.Error(f"Task gagal:\n{json.dumps(data, indent=2)}") | |
| time.sleep(interval); waited += interval | |
| raise gr.Error("Timeout menunggu task selesai.") | |
| def download_result(url: str, task_id: str): | |
| if not url: | |
| raise gr.Error("Server tidak mengirimkan URL hasil.") | |
| out = pathlib.Path("/tmp") / f"{task_id}.mp4" | |
| out.parent.mkdir(parents=True, exist_ok=True) | |
| with requests.get(url, stream=True, timeout=300) as r: | |
| if r.status_code >= 400: | |
| raise gr.Error(f"Gagal download hasil: {r.status_code} {r.text}") | |
| with open(out, "wb") as f: | |
| for chunk in r.iter_content(chunk_size=1 << 20): | |
| if chunk: f.write(chunk) | |
| return str(out) | |
| def run_job(user_api_key, prompt, aspect_ratio, duration): | |
| headers = make_headers(user_api_key) | |
| task_id, status_url, preview = create_task(headers, prompt, aspect_ratio, duration) | |
| yield f"Task dibuat: {task_id}\nStatus URL (server): {status_url or '-'}\nCreate resp: {preview}", None | |
| if not status_url: | |
| found = probe_status_url(headers, task_id) | |
| if not found: | |
| raise gr.Error("Tidak bisa menentukan endpoint status. Kirim response createTask penuh agar saya mappingkan field yang benar.") | |
| status_url = found | |
| result_link = None | |
| for status_msg, maybe_url in poll_until_done(headers, status_url, task_id): | |
| yield status_msg, None | |
| if maybe_url: | |
| result_link = maybe_url | |
| path = download_result(result_link, task_id) | |
| yield f"Selesai ✅ (task {task_id})", path | |
| def runtime_info(user_api_key): | |
| key = (user_api_key or os.getenv("KIE_API_KEY") or "").strip() | |
| src = "BYOK (user input)" if user_api_key else ("ENV KIE_API_KEY" if os.getenv("KIE_API_KEY") else "NONE") | |
| masked = (key[:4] + "..." + key[-4:]) if key else "(none)" | |
| return f"Key source: {src}\nKey hint: {masked}\nBase: {BASE}" | |
| with gr.Blocks(title="Kie AI — Sora‑2 Text‑to‑Video (Polling)") as demo: | |
| gr.Markdown("## Kie AI — Sora‑2 Text‑to‑Video\n- Gunakan API key Kie AI (bukan OpenAI).\n- remove_watermark dimatikan (False).") | |
| api = gr.Textbox(label="KIE API Key (opsional, BYOK)", type="password", placeholder="kie-...") | |
| prompt = gr.Textbox( | |
| label="Prompt", | |
| lines=7, | |
| value=( | |
| "Photorealistic handheld POV at night in a foggy tropical forest; " | |
| "a large animal silhouette crosses the path, eyes reflecting light; " | |
| "volumetric fog, wet leaves; vertical 9:16; no blood, no gore, no text, no watermark." | |
| ), | |
| ) | |
| with gr.Row(): | |
| aspect = gr.Dropdown(["portrait", "landscape", "square", "9:16", "16:9"], value="portrait", label="Aspect ratio") | |
| duration = gr.Textbox(value="", label="Duration (detik, opsional)", placeholder="mis. 12 (kosongkan jika ragu)") | |
| run = gr.Button("Generate") | |
| status = gr.Textbox(label="Status", lines=6) | |
| video = gr.Video(label="Hasil video") | |
| check = gr.Button("Check runtime") | |
| info = gr.Textbox(label="Runtime info") | |
| run.click(run_job, inputs=[api, prompt, aspect, duration], outputs=[status, video]) | |
| check.click(runtime_info, inputs=[api], outputs=[info]) | |
| # Tambahkan launch jika Space type = Python | |
| if __name__ == "__main__": | |
| demo.queue().launch(server_name="0.0.0.0", server_port=int(os.getenv("PORT", "7860"))) |