Spaces:
Running
Running
| import os, uuid, time, shutil, mimetypes, subprocess, requests, gradio as gr | |
| from datetime import datetime | |
| from typing import Optional, List | |
# --- config ---
ENDPOINT = "https://moonmath-ai-dev--moonmath-i2v-backend-moonmathinference-run.modal.run"
FFMPEG = "ffmpeg"
OUT, TMP = "outputs", "tmp"

# Both working directories are created at import time so the helpers below
# can write into them without checking first.
os.makedirs(OUT, exist_ok=True)
os.makedirs(TMP, exist_ok=True)


def ts() -> str:
    """Return the current UTC time formatted as ``YYYYmmdd_HHMMSS``."""
    # time.gmtime() yields the same UTC fields as datetime.utcnow(), which is
    # deprecated since Python 3.12.
    return time.strftime("%Y%m%d_%H%M%S", time.gmtime())


def fname(p: str, e: str) -> str:
    """Build a file name of the form ``<p>_<timestamp>_<6 hex chars>.<e>``.

    The random suffix comes from ``uuid.uuid4()`` so two names generated in
    the same second still differ.
    """
    return f"{p}_{ts()}_{uuid.uuid4().hex[:6]}.{e}"


def abspath(p: str) -> str:
    """Return the absolute form of path *p*."""
    return os.path.abspath(p)
def run_ffmpeg(args: List[str]):
    """Run the ffmpeg binary named by ``FFMPEG`` with the given arguments.

    Both output streams are captured rather than echoed.

    Raises:
        RuntimeError: if the process exits with a non-zero status; the
            message is ffmpeg's stderr, decoded with undecodable bytes
            dropped.
    """
    command = [FFMPEG, *args]
    result = subprocess.run(command, capture_output=True)
    if result.returncode != 0:
        raise RuntimeError(result.stderr.decode(errors="ignore"))
def extract_last_frame(video_path: str) -> str:
    """Write the final frame of *video_path* to a new PNG and return its path.

    The PNG is created under ``TMP`` with a unique ``last_*`` name.

    Raises:
        RuntimeError: propagated from ``run_ffmpeg`` when ffmpeg fails.
    """
    out = os.path.join(TMP, fname("last", "png"))
    # "-sseof -1" starts reading one second before the end of the input.
    # Limiting the output to one frame ("-frames:v 1") would keep the FIRST
    # frame decoded from that point, i.e. a frame about one second before the
    # end. "-update 1" instead makes the image muxer rewrite the same file
    # for every frame, so the file left on disk holds the last frame.
    run_ffmpeg([
        "-sseof", "-1",
        "-i", video_path,
        "-update", "1",
        "-q:v", "2",
        out,
    ])
    return out
def concat_videos(paths: List[str]) -> str:
    """Join the given clips into one MP4 under ``OUT`` and return its path.

    A single clip is simply copied. Several clips are joined with ffmpeg's
    concat demuxer using stream copy (``-c copy``), i.e. without re-encoding.

    Raises:
        ValueError: if *paths* is empty.
        RuntimeError: propagated from ``run_ffmpeg`` when ffmpeg fails.
    """
    if not paths:
        raise ValueError("No videos selected.")

    out = os.path.join(OUT, fname("continuous", "mp4"))
    if len(paths) == 1:
        shutil.copy(paths[0], out)
        return out

    listfile = os.path.join(TMP, f"concat_{uuid.uuid4().hex}.txt")
    try:
        with open(listfile, "w", encoding="utf-8") as f:
            for p in paths:
                # Inside a single-quoted entry a literal quote must be
                # written as '\'' (close quote, escaped quote, reopen).
                escaped = abspath(p).replace("'", "'\\''")
                f.write(f"file '{escaped}'\n")
        run_ffmpeg(["-f", "concat", "-safe", "0", "-i", listfile, "-c", "copy", out])
    finally:
        # Remove the temporary list even when ffmpeg fails.
        if os.path.exists(listfile):
            os.remove(listfile)
    return out
def zip_used(paths: List[str]) -> str:
    """Bundle the given files into ``OUT/used_<timestamp>.zip``.

    The files are first copied into a throw-away directory under ``TMP``,
    which is archived and then removed.

    Returns:
        Path of the created zip archive.
    """
    pack = os.path.join(TMP, f"pack_{uuid.uuid4().hex[:6]}")
    os.makedirs(pack, exist_ok=True)
    try:
        for p in paths:
            shutil.copy(p, pack)
        base = os.path.join(OUT, f"used_{ts()}")
        shutil.make_archive(base, "zip", pack)
    finally:
        # Drop the staging directory even if a copy or the archiving failed.
        shutil.rmtree(pack, ignore_errors=True)
    return base + ".zip"
def save_video_bytes(content: bytes, content_type: str) -> str:
    """Write *content* to a new ``gen_*`` file under ``OUT`` and return its path.

    The file extension is guessed from *content_type*; ``mp4`` is used when
    no extension can be guessed.
    """
    # A Content-Type header may carry parameters ("video/webm; codecs=vp9").
    # mimetypes only recognises the bare media type, so strip them first.
    media_type = (content_type or "").split(";", 1)[0].strip().lower()
    ext = (mimetypes.guess_extension(media_type) or ".mp4").lstrip(".")
    path = os.path.join(OUT, fname("gen", ext))
    with open(path, "wb") as f:
        f.write(content)
    return path
def call_backend(
    prompt: str,
    image_bytes_path: str,
    negative_prompt: Optional[str],
    fps: int,
    vlen: int,
    steps: Optional[int],
    seed: Optional[int]
) -> str:
    """POST one generation request to ``ENDPOINT`` and save the resulting video.

    The file at *image_bytes_path* is uploaded as the ``image_bytes`` form
    field; all other settings are sent as query parameters. When *seed* is
    ``None`` the current Unix time is used.

    The response is handled in this order:
      1. ``Content-Type: video/*`` -> the body is saved as the video.
      2. JSON body with ``video_url`` -> the video is downloaded from it.
      3. Body that is not JSON -> treated as mislabelled raw video bytes.

    Returns:
        Path of the saved video file under ``OUT``.

    Raises:
        RuntimeError: on a non-200 backend status, a JSON body without
            ``video_url``, or a non-200 status while fetching the video.
    """
    params = {
        "prompt": prompt,
        "frames_per_second": str(fps),
        "video_length": str(vlen),
    }
    if negative_prompt:
        params["negative_prompt"] = negative_prompt
    if steps is not None:
        params["num_inference_steps"] = str(steps)
    if seed is None:
        seed = int(time.time())
    params["seed"] = str(seed)

    # The context manager closes the upload handle whether or not the
    # request succeeds.
    with open(image_bytes_path, "rb") as image_file:
        files = {
            "image_bytes": (
                os.path.basename(image_bytes_path),
                image_file,
                "application/octet-stream",
            )
        }
        r = requests.post(
            ENDPOINT,
            params=params,
            files=files,
            headers={"accept": "application/json"},
            timeout=600,
        )

    if r.status_code != 200:
        raise RuntimeError(f"Backend {r.status_code}: {r.text[:500]}")

    ctype = r.headers.get("Content-Type", "")
    if ctype.startswith("video/"):  # raw video bytes
        return save_video_bytes(r.content, ctype)

    # expect JSON with { "video_url": ... }
    try:
        payload = r.json()
    except ValueError:
        # fallback if backend mislabels content: the body is not JSON, so
        # treat it as the video itself
        return save_video_bytes(r.content, ctype or "video/mp4")

    # From here on the body is known to be JSON, so it must never be written
    # to disk as a video; report the problem instead.
    url = payload.get("video_url") if isinstance(payload, dict) else None
    if not url:
        raise RuntimeError(f"no video_url in response: {r.text[:500]}")

    path = os.path.join(OUT, fname("gen", "mp4"))
    # The context manager releases the streamed connection when done.
    with requests.get(url, stream=True, timeout=600) as r2:
        if r2.status_code != 200:
            raise RuntimeError(f"fetch video {r2.status_code}")
        with open(path, "wb") as f:
            for chunk in r2.iter_content(1 << 20):
                if chunk:
                    f.write(chunk)
    return path
| # ================= UI ================= | |
with gr.Blocks() as demo:
    gr.Markdown("## Continuous Video — chain last frame → next first frame")

    # state
    used_videos = gr.State([])      # list[str]
    last_seed_img = gr.State(None)  # PNG path (becomes image_bytes)
    current_video = gr.State(None)  # latest generated video

    with gr.Row():
        prompt = gr.Textbox(label="Prompt", placeholder="Describe your shot…", lines=2)
    with gr.Row():
        start_file = gr.File(
            label="Initial start image or video (only for the very first clip)",
            file_types=["image", "video"]
        )
    with gr.Row():
        negative = gr.Textbox(label="Negative prompt (optional)", placeholder="What to avoid…")
    with gr.Row():
        fps = gr.Slider(1, 60, value=24, step=1, label="Frames per second")
        vlen = gr.Slider(1, 12, value=4, step=1, label="Video length (seconds)")
        steps = gr.Slider(1, 100, value=30, step=1, label="Num inference steps (optional)")
        seed = gr.Number(label="Seed (optional)", precision=0)

    video_out = gr.Video(label="Output")
    status_md = gr.Markdown("")  # shows ✅ Saved… after Chain

    with gr.Row():
        gen_btn = gr.Button("Generate", variant="primary")
        chain_btn = gr.Button("Chain (save & clear)")
        dl_btn = gr.Button("Download (concatenate & ZIP)")
    files_out = gr.Files(label="Downloads")

    # ---- handlers ----
    def on_generate(prompt_txt, start_file_obj, neg, fps_val, vlen_val, steps_val, seed_val, seed_img):
        """Generate one clip and show it.

        The seed image is the frame captured by the last Chain if there is
        one, otherwise the uploaded start file. Returns the new video path
        twice (preview and state) plus an update that clears the status line.
        """
        if not prompt_txt or not prompt_txt.strip():
            raise gr.Error("Prompt is required.")

        # choose image_bytes source: last stolen frame > uploaded start file
        image_path = seed_img
        from_upload = False
        if not image_path:
            if not start_file_obj:
                raise gr.Error("First generation requires an initial image OR video.")
            # Accept both an object exposing .name and a plain path string.
            image_path = getattr(start_file_obj, "name", start_file_obj)
            from_upload = True

        try:
            if from_upload:
                # The upload widget also accepts videos, but the value is
                # sent as "image_bytes"; reduce a video to a single frame.
                # NOTE(review): assumes the backend wants an image - confirm.
                guessed = mimetypes.guess_type(image_path)[0] or ""
                if guessed.startswith("video/"):
                    image_path = extract_last_frame(image_path)
            out = call_backend(
                prompt=prompt_txt.strip(),
                image_bytes_path=image_path,
                negative_prompt=(neg.strip() if (neg and neg.strip()) else None),
                fps=int(fps_val),
                vlen=int(vlen_val),
                steps=int(steps_val) if steps_val else None,
                seed=int(seed_val) if (seed_val not in [None, ""]) else None
            )
        except Exception as e:
            # Surface any failure in the UI instead of a bare traceback.
            raise gr.Error(str(e)) from e
        return out, out, gr.update(value="")  # show video, set state, clear status

    gen_btn.click(
        on_generate,
        inputs=[prompt, start_file, negative, fps, vlen, steps, seed, last_seed_img],
        outputs=[video_out, current_video, status_md]
    )

    def on_chain(curr, used):
        """
        Save current video, steal last frame for next seed,
        and CLEAR: preview, prompt, negative, start_file.
        Also show a visible 'saved' banner.
        """
        if not curr or not os.path.exists(curr):
            raise gr.Error("Generate a video first.")

        # Work on a copy so the list held in state is not mutated in place.
        new_used = list(used) if used else []
        if curr not in new_used:
            new_used.append(curr)

        # last frame for next seed
        try:
            seed_img = extract_last_frame(curr)
        except Exception as e:
            raise gr.Error(f"Could not capture the last frame: {e}") from e

        saved_idx = len(new_used)
        saved_msg = f"✅ **Saved clip #{saved_idx}** — last frame captured. Ready for your next prompt."
        return (
            new_used,                   # used_videos
            seed_img,                   # last_seed_img (image_bytes for next gen)
            gr.update(value=None),      # CLEAR prompt
            gr.update(value=None),      # CLEAR negative prompt
            gr.update(value=None),      # CLEAR start_file
            gr.update(value=None),      # CLEAR video preview
            gr.update(value=saved_msg)  # status banner
        )

    chain_btn.click(
        on_chain,
        inputs=[current_video, used_videos],
        outputs=[used_videos, last_seed_img, prompt, negative, start_file, video_out, status_md]
    )

    def on_download(used):
        """Return the merged video and a ZIP of the individual clips.

        Each artefact is produced independently, so one failing does not
        prevent the other from being offered. An error is raised only when
        neither could be produced.
        """
        if not used:
            raise gr.Error("Nothing to download. Click Chain after generating clips.")

        files = []
        problems = []
        try:
            files.append(concat_videos(used))
        except Exception as e:
            print("concat error:", e)
            problems.append(f"concat: {e}")
        try:
            files.append(zip_used(used))
        except Exception as e:
            print("zip error:", e)
            problems.append(f"zip: {e}")

        if not files:
            # Nothing to offer: tell the user instead of returning an empty list.
            raise gr.Error("Download failed. " + "; ".join(problems))
        return files

    dl_btn.click(on_download, inputs=[used_videos], outputs=[files_out])
# Launch the Gradio app only when this file is run as a script, not when it
# is imported as a module.
if __name__ == "__main__":
    demo.launch()