File size: 8,696 Bytes
81e1375
 
 
 
 
 
 
 
 
 
a1bf7ef
81e1375
 
 
84a7681
81e1375
a1bf7ef
81e1375
 
 
 
 
84a7681
81e1375
 
 
 
 
 
 
a1bf7ef
81e1375
 
 
 
9ea4826
81e1375
 
0d65ddf
9ea4826
81e1375
 
0d65ddf
81e1375
 
 
0d65ddf
81e1375
 
a1bf7ef
81e1375
9ea4826
81e1375
 
0d65ddf
 
84a7681
a1bf7ef
 
 
84a7681
a1bf7ef
 
 
 
81e1375
a1bf7ef
9ea4826
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
0d65ddf
9ea4826
 
84a7681
9ea4826
 
 
 
 
 
 
0d65ddf
9ea4826
 
0d65ddf
9ea4826
 
81e1375
 
a1bf7ef
81e1375
9ea4826
a1bf7ef
9ea4826
 
 
 
 
a1bf7ef
9ea4826
 
a1bf7ef
0d65ddf
9ea4826
a1bf7ef
9ea4826
a1bf7ef
9ea4826
 
81e1375
 
001e0b0
9ea4826
 
 
 
a1bf7ef
9ea4826
81e1375
 
 
 
 
 
a1bf7ef
 
81e1375
a1bf7ef
9ea4826
81e1375
a1bf7ef
81e1375
 
 
 
 
 
 
 
 
 
 
 
a1bf7ef
81e1375
 
a1bf7ef
81e1375
a1bf7ef
 
81e1375
a1bf7ef
0d65ddf
 
a1bf7ef
 
 
 
 
 
 
 
0d65ddf
81e1375
 
001e0b0
81e1375
 
 
 
a1bf7ef
001e0b0
a1bf7ef
 
81e1375
a1bf7ef
 
 
 
 
 
 
 
 
0d65ddf
a1bf7ef
9ea4826
001e0b0
0d65ddf
81e1375
a1bf7ef
 
b3b68e4
a1bf7ef
81e1375
b2d663e
84a7681
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
import os
import time
import json
import pathlib
import requests
import gradio as gr

# Root of the Kie AI REST API; every endpoint in this module is built from it.
BASE = "https://api.kie.ai/api/v1"
# Endpoint that create_task() POSTs to in order to start a generation job.
CREATE_URL = BASE + "/jobs/createTask"

def make_headers(user_key: str | None):
    api_key = (user_key or os.getenv("KIE_API_KEY") or "").strip()
    if not api_key:
        raise gr.Error("Masukkan KIE API key atau set KIE_API_KEY di Secrets.")
    return {"Authorization": f"Bearer {api_key}", "Content-Type": "application/json"}

def create_task(headers, prompt: str, aspect_ratio: str, duration: str | None):
    """Submit a ``sora-2-text-to-video`` job and extract its identifiers.

    Args:
        headers: HTTP headers (including authorization) sent with the request.
        prompt: Text prompt forwarded as ``input.prompt``.
        aspect_ratio: Value forwarded as ``input.aspect_ratio``.
        duration: Optional duration; forwarded as ``input.duration`` only
            when it parses as a positive integer, otherwise omitted.

    Returns:
        A ``(task_id, status_url, raw_preview)`` tuple. ``status_url`` is
        ``None`` when the response does not carry one; ``raw_preview`` is the
        first 500 characters of the JSON response, for display/debugging.

    Raises:
        gr.Error: On an HTTP status >= 400, a body that is not a JSON
            object, or a response without any recognizable task id.
    """
    payload = {
        "model": "sora-2-text-to-video",
        "input": {
            "prompt": prompt,
            "aspect_ratio": aspect_ratio,
            "remove_watermark": False,
        },
    }
    if duration:
        # An unparsable duration is deliberately ignored so the request is
        # sent without the field rather than failing.
        try:
            seconds = int(duration)
        except (TypeError, ValueError):
            seconds = 0
        if seconds > 0:
            payload["input"]["duration"] = seconds

    r = requests.post(CREATE_URL, headers=headers, json=payload, timeout=60)
    if r.status_code >= 400:
        raise gr.Error(f"createTask gagal: {r.status_code} {r.text}")

    # Surface malformed bodies as a UI error instead of a raw traceback.
    try:
        data = r.json()
    except ValueError as err:
        raise gr.Error(f"createTask mengembalikan respons non-JSON: {r.text[:500]}") from err
    raw_preview = json.dumps(data)[:500]
    if not isinstance(data, dict):
        raise gr.Error(f"Respons createTask bukan objek JSON: {raw_preview}")

    # The identifiers may live at the top level or under "data"; tolerate a
    # missing or non-object "data" field.
    nested = data.get("data")
    if not isinstance(nested, dict):
        nested = {}

    task_id = (
        data.get("task_id") or data.get("taskId") or data.get("id") or
        nested.get("task_id") or nested.get("taskId") or nested.get("id")
    )
    status_url = (
        data.get("status_url") or data.get("statusUrl") or
        nested.get("status_url") or nested.get("statusUrl")
    )
    if not task_id:
        raise gr.Error(f"Tidak menemukan task_id: {raw_preview}")

    return task_id, status_url, raw_preview

def infer_status_and_result(obj: dict):
    """Extract status, progress and a result URL from a status response.

    Fields are looked up first at the top level of ``obj`` and then inside
    its ``"data"`` object. The status is read from ``"status"`` or
    ``"state"`` at either level.

    Args:
        obj: Decoded JSON response. Anything that is not a dict is treated
            as carrying no information.

    Returns:
        A ``(status, progress, result_url)`` tuple; each element is ``None``
        when it cannot be found. ``result_url`` is the first string starting
        with ``"http"`` found under one of the known result keys.
    """
    if not isinstance(obj, dict):
        return None, None, None

    nested = obj.get("data")
    if not isinstance(nested, dict):
        nested = {}

    # Accept both spellings at both levels; previously the nested object
    # was only checked for "status".
    status = (
        obj.get("status") or obj.get("state")
        or nested.get("status") or nested.get("state")
    )

    # Use an explicit None check so a legitimate progress of 0 is kept.
    progress = obj.get("progress")
    if progress is None:
        progress = nested.get("progress")

    result_keys = ("result_url", "output_url", "video_url", "download_url")
    # Top-level keys win over nested ones, matching the lookup order above.
    for container in (obj, nested):
        for key in result_keys:
            value = container.get(key)
            if isinstance(value, str) and value.startswith("http"):
                return status, progress, value
    return status, progress, None

def probe_status_url(headers, task_id: str):
    """Guess which endpoint reports the status of ``task_id``.

    A fixed list of GET endpoints is tried in order, followed by one POST
    endpoint. The first endpoint that answers with an HTTP status below 400
    and a JSON body from which a status can be inferred wins. Any error
    raised while trying an endpoint is ignored and the next one is tried.

    Returns:
        The matching URL, with the suffix ``" (POST)"`` appended when the
        POST variant matched, or ``None`` when nothing matched.
    """
    get_endpoints = (
        f"{BASE}/jobs/getTask?taskId={task_id}",
        f"{BASE}/jobs/get-task?taskId={task_id}",
        f"{BASE}/jobs/task?taskId={task_id}",
        f"{BASE}/jobs/{task_id}",
        f"{BASE}/jobs/status/{task_id}",
        f"{BASE}/jobs/getTask/{task_id}",
        f"{BASE}/jobs/task/{task_id}",
    )
    # Each attempt is (endpoint, via_post); the POST variant goes last.
    attempts = [(endpoint, False) for endpoint in get_endpoints]
    attempts.append((f"{BASE}/jobs/getTask", True))

    for endpoint, via_post in attempts:
        try:
            if via_post:
                resp = requests.post(endpoint, headers=headers, json={"taskId": task_id}, timeout=30)
            else:
                resp = requests.get(endpoint, headers=headers, timeout=30)
            if resp.status_code >= 400:
                continue
            found_status, _, _ = infer_status_and_result(resp.json())
        except Exception:
            # Best-effort probing: a failing candidate just means "not this one".
            continue
        if found_status is not None:
            return (endpoint + " (POST)") if via_post else endpoint
    return None

def poll_until_done(headers, status_url: str, task_id: str, interval=5, max_wait=60*40):
    """Poll the status endpoint until the task finishes, fails or times out.

    This is a generator. It yields ``(message, result_url)`` tuples:
    progress and warning messages carry ``None`` as the URL, and the final
    success message carries the result URL inferred from the response
    (which may itself be ``None``).

    Args:
        headers: HTTP headers sent with every poll request.
        status_url: Endpoint to poll. A trailing ``" (POST)"`` marker means
            the endpoint is queried with POST and a ``{"taskId": ...}`` body.
        task_id: Task identifier, used for POST bodies and for re-probing.
        interval: Seconds to sleep between polls.
        max_wait: Wall-clock budget in seconds for the whole polling phase.

    Raises:
        gr.Error: If the task reports a failure state, no status endpoint
            can be found after a 404, or ``max_wait`` is exceeded.
    """
    success_states = {"succeeded", "completed", "success", "done"}
    # "fail"/"failure" are included so a failure spelled that way ends the
    # wait immediately instead of being polled until the timeout.
    failure_states = {"failed", "fail", "failure", "error", "canceled", "cancelled"}

    # Measure real elapsed time: request latency and endpoint re-probing
    # count toward max_wait, not only the sleeps.
    started = time.monotonic()
    elapsed = 0.0
    last_status = None
    used_post = status_url.endswith("(POST)")

    while elapsed <= max_wait:
        try:
            if used_post:
                r = requests.post(status_url.replace(" (POST)", ""), headers=headers, json={"taskId": task_id}, timeout=60)
            else:
                r = requests.get(status_url, headers=headers, timeout=60)
        except requests.RequestException as e:
            yield f"[WARN] Poll error: {e}", None
        else:
            if r.status_code == 404:
                yield f"[WARN] Poll 404 di {status_url} — mencoba endpoint lain...", None
                new_url = probe_status_url(headers, task_id)
                if not new_url:
                    raise gr.Error("Tidak menemukan endpoint status. Mohon cek dokumentasi Kie AI.")
                status_url = new_url
                used_post = status_url.endswith("(POST)")
                # Fall through to the sleep below so repeated 404s cannot
                # turn into an unbounded tight loop.
            elif r.status_code >= 400:
                yield f"[WARN] Poll failed: {r.status_code} {r.text}", None
            else:
                try:
                    data = r.json()
                except ValueError:
                    data = None
                if not isinstance(data, dict):
                    # Non-JSON and non-object bodies carry no usable status.
                    data = {}
                    yield "[WARN] Response status bukan JSON.", None

                status, progress, result_url = infer_status_and_result(data)
                if status != last_status:
                    # Only report when the status changes, to keep the log short.
                    yield f"{status} - progress: {progress}", None
                    last_status = status

                s = str(status or "").lower()
                if s in success_states:
                    yield "Selesai di server — mengunduh...", result_url
                    return
                if s in failure_states:
                    raise gr.Error(f"Task gagal:\n{json.dumps(data, indent=2)}")

        time.sleep(interval)
        elapsed = time.monotonic() - started
    raise gr.Error("Timeout menunggu task selesai.")

def download_result(url: str, task_id: str):
    """Download the finished video to ``/tmp`` and return its local path.

    Args:
        url: Result URL reported by the server.
        task_id: Task identifier, used to name the output file.

    Returns:
        The path of the downloaded ``.mp4`` file, as a string.

    Raises:
        gr.Error: If ``url`` is empty or the download answers with an HTTP
            status >= 400.
    """
    if not url:
        raise gr.Error("Server tidak mengirimkan URL hasil.")
    # task_id comes from the remote server. Joining it unfiltered would let
    # a value containing "/" (or an absolute path) escape /tmp, so reduce
    # it to a plain file name. Ids made of letters, digits, ".", "_" and
    # "-" are left unchanged.
    safe_name = "".join(
        ch if (ch.isalnum() or ch in "._-") else "_" for ch in str(task_id)
    ) or "result"
    out = pathlib.Path("/tmp") / f"{safe_name}.mp4"
    out.parent.mkdir(parents=True, exist_ok=True)
    with requests.get(url, stream=True, timeout=300) as r:
        if r.status_code >= 400:
            raise gr.Error(f"Gagal download hasil: {r.status_code} {r.text}")
        with open(out, "wb") as f:
            # Stream in 1 MiB chunks so the whole video is never held in memory.
            for chunk in r.iter_content(chunk_size=1 << 20):
                if chunk:
                    f.write(chunk)
    return str(out)

def run_job(user_api_key, prompt, aspect_ratio, duration):
    """Run one generation job end to end, streaming progress to the UI.

    This is a generator yielding ``(status_text, video_path)`` tuples.
    ``video_path`` is ``None`` for every intermediate update and is set
    only on the final yield, after the result has been downloaded.

    Raises:
        gr.Error: If no status endpoint can be determined; errors raised by
            the helper functions propagate unchanged.
    """
    auth_headers = make_headers(user_api_key)
    task_id, status_url, preview = create_task(auth_headers, prompt, aspect_ratio, duration)
    shown_url = status_url or '-'
    yield f"Task dibuat: {task_id}\nStatus URL (server): {shown_url}\nCreate resp: {preview}", None

    # The create response did not name a status endpoint: try to discover one.
    if not status_url:
        status_url = probe_status_url(auth_headers, task_id)
        if not status_url:
            raise gr.Error("Tidak bisa menentukan endpoint status. Kirim response createTask penuh agar saya mappingkan field yang benar.")

    # Relay every progress message and remember the last non-empty URL.
    final_url = None
    for message, candidate in poll_until_done(auth_headers, status_url, task_id):
        yield message, None
        final_url = candidate or final_url

    local_path = download_result(final_url, task_id)
    yield f"Selesai ✅ (task {task_id})", local_path

def runtime_info(user_api_key):
    """Describe which API key is in effect without revealing it.

    Args:
        user_api_key: Key typed into the UI, or ``None``/empty.

    Returns:
        A three-line string with the key source (user input, environment
        variable, or none), a masked hint of the key, and the API base URL.
    """
    user_key = (user_api_key or "").strip()
    env_key = (os.getenv("KIE_API_KEY") or "").strip()
    key = user_key or env_key

    # Report the source of the key that is actually used; a whitespace-only
    # user entry does not count as user input.
    if user_key:
        src = "BYOK (user input)"
    elif env_key:
        src = "ENV KIE_API_KEY"
    else:
        src = "NONE"

    if not key:
        masked = "(none)"
    elif len(key) > 12:
        masked = key[:4] + "..." + key[-4:]
    else:
        # For short keys the first/last four characters would overlap or
        # expose nearly the whole secret, so hide every character.
        masked = "*" * len(key)
    return f"Key source: {src}\nKey hint: {masked}\nBase: {BASE}"

# Build the Gradio UI: input widgets, action buttons and output widgets are
# declared first, then the buttons are wired to their handler functions.
with gr.Blocks(title="Kie AI — Sora‑2 Text‑to‑Video (Polling)") as demo:
    gr.Markdown("## Kie AI — Sora‑2 Text‑to‑Video\n- Gunakan API key Kie AI (bukan OpenAI).\n- remove_watermark dimatikan (False).")
    # Optional per-user key; make_headers() falls back to the KIE_API_KEY
    # environment variable when this field is left empty.
    api = gr.Textbox(label="KIE API Key (opsional, BYOK)", type="password", placeholder="kie-...")
    # Prompt box pre-filled with an example prompt.
    prompt = gr.Textbox(
        label="Prompt",
        lines=7,
        value=(
            "Photorealistic handheld POV at night in a foggy tropical forest; "
            "a large animal silhouette crosses the path, eyes reflecting light; "
            "volumetric fog, wet leaves; vertical 9:16; no blood, no gore, no text, no watermark."
        ),
    )
    with gr.Row():
        aspect = gr.Dropdown(["portrait", "landscape", "square", "9:16", "16:9"], value="portrait", label="Aspect ratio")
        # Free-text duration; create_task() only forwards it when it parses
        # as a positive integer.
        duration = gr.Textbox(value="", label="Duration (detik, opsional)", placeholder="mis. 12 (kosongkan jika ragu)")
    run = gr.Button("Generate")
    status = gr.Textbox(label="Status", lines=6)
    video = gr.Video(label="Hasil video")
    check = gr.Button("Check runtime")
    info = gr.Textbox(label="Runtime info")

    # run_job yields (status_text, video_path) tuples, matching the two outputs.
    run.click(run_job, inputs=[api, prompt, aspect, duration], outputs=[status, video])
    # runtime_info returns a single string shown in the info box.
    check.click(runtime_info, inputs=[api], outputs=[info])

# Launch the app when this file is executed directly (e.g. a Python-type
# Space); nothing is started when the module is merely imported.
if __name__ == "__main__":
    # The listening port comes from the PORT environment variable, default 7860.
    listen_port = int(os.getenv("PORT", "7860"))
    demo.queue().launch(server_name="0.0.0.0", server_port=listen_port)