Spaces:
Sleeping
Sleeping
| import os | |
| import io | |
| import json | |
| import uuid | |
| import time | |
| import shutil | |
| import mimetypes | |
| import requests | |
| import subprocess | |
| from datetime import datetime | |
| from typing import List, Optional | |
| import gradio as gr | |
| from PIL import Image | |
| # ================================ | |
| # Config | |
| # ================================ | |
# Modal inference endpoint that receives the prompt (and optional seed frame).
ENDPOINT_URL = "https://moonmath-ai-dev--moonmath-i2v-backend-moonmathinference-run.modal.run"  # your backend

# OUT_DIR holds generated clips and archives; TMP_DIR holds scratch files.
OUT_DIR = "outputs"
TMP_DIR = "tmp"
for _work_dir in (OUT_DIR, TMP_DIR):
    os.makedirs(_work_dir, exist_ok=True)

# Executable used for every ffmpeg call. Replace with an absolute path if
# ffmpeg is not on PATH.
FFMPEG_BIN = "ffmpeg"
| # ================================ | |
| # Helpers | |
| # ================================ | |
def _ts() -> str:
    """Return the current UTC time as a compact ``YYYYMMDD_HHMMSS`` stamp.

    ``time.gmtime()`` is used rather than ``datetime.utcnow()``, which is
    deprecated since Python 3.12; the formatted result is the same.
    """
    return time.strftime("%Y%m%d_%H%M%S", time.gmtime())
def _safe_filename(prefix: str, ext: str) -> str:
    """Build a unique file name of the form ``<prefix>_<timestamp>_<8 hex chars>.<ext>``.

    ``ext`` is appended after a literal dot, so pass it without one.
    """
    stamp = _ts()
    unique_tag = uuid.uuid4().hex[:8]
    return "_".join((prefix, stamp, unique_tag)) + "." + ext
def _run_ffmpeg(args: List[str]) -> None:
    """Run ffmpeg with the given arguments; raise with a readable message on failure.

    Args:
        args: Command-line arguments placed after the ffmpeg executable.

    Raises:
        RuntimeError: If the executable cannot be started (e.g. ffmpeg is not
            installed), or if ffmpeg exits with a non-zero status. In the
            latter case the message carries the captured stdout and stderr.
    """
    cmd = [FFMPEG_BIN, *args]
    try:
        proc = subprocess.run(
            cmd,
            # ffmpeg reads stdin for interactive commands; detach it so a
            # server process can never block on, or be steered by, stdin.
            stdin=subprocess.DEVNULL,
            stdout=subprocess.PIPE,
            stderr=subprocess.PIPE,
        )
    except OSError as exc:
        raise RuntimeError(f"Could not start ffmpeg ({FFMPEG_BIN!r}): {exc}") from exc

    if proc.returncode != 0:
        raise RuntimeError(
            f"ffmpeg failed ({proc.returncode}).\nSTDOUT:\n{proc.stdout.decode(errors='ignore')}\n\nSTDERR:\n{proc.stderr.decode(errors='ignore')}"
        )
def extract_last_frame(video_path: str) -> str:
    """
    Extract the very last frame of a video as a PNG using ffmpeg.

    ffmpeg seeks to one second before the end of the input (``-sseof -1``)
    and decodes everything that remains. With ``-update 1`` each decoded
    frame overwrites the same output file, so the image left on disk is the
    final frame. No ``-frames:v 1`` limit is passed: it would stop after the
    first frame decoded after the seek, about one second before the end.

    Args:
        video_path: Path of the video to read.

    Returns:
        Path of the PNG written into TMP_DIR.

    Raises:
        RuntimeError: Propagated from ``_run_ffmpeg`` when ffmpeg fails.
    """
    out_path = os.path.join(TMP_DIR, _safe_filename("lastframe", "png"))
    _run_ffmpeg([
        "-sseof", "-1",
        "-i", video_path,
        "-update", "1",
        "-q:v", "2",
        out_path,
    ])
    return out_path
def concat_videos_ffmpeg(video_paths: List[str]) -> str:
    """
    Concatenate MP4s with the ffmpeg concat demuxer (stream copy, no re-encode).

    A single input is simply copied to a new name so the caller always gets a
    fresh ``continuous_*`` file in OUT_DIR.

    Args:
        video_paths: Clips to join, in playback order.

    Returns:
        Path of the concatenated (or copied) MP4 in OUT_DIR.

    Raises:
        ValueError: If ``video_paths`` is empty.
        RuntimeError: Propagated from ``_run_ffmpeg`` when ffmpeg fails.
    """
    if not video_paths:
        raise ValueError("concat_videos_ffmpeg requires at least one video path")

    out_path = os.path.join(OUT_DIR, _safe_filename("continuous", "mp4"))
    if len(video_paths) == 1:
        # copy the single file to a new name to keep UX consistent
        shutil.copy(video_paths[0], out_path)
        return out_path

    list_file = os.path.join(TMP_DIR, f"concat_{uuid.uuid4().hex}.txt")
    try:
        with open(list_file, "w", encoding="utf-8") as f:
            for p in video_paths:
                # Paths are single-quoted for the concat demuxer; a literal
                # quote inside must be written as '\'' (close, escape, reopen).
                escaped = os.path.abspath(p).replace("'", "'\\''")
                f.write(f"file '{escaped}'\n")
        _run_ffmpeg([
            "-f", "concat",
            "-safe", "0",
            "-i", list_file,
            "-c", "copy",
            out_path,
        ])
    finally:
        # Remove the list file whether or not ffmpeg succeeded (best effort).
        try:
            os.remove(list_file)
        except OSError:
            pass
    return out_path
def zip_files(file_paths: List[str]) -> str:
    """Bundle exactly the given files into ``OUT_DIR/used_<timestamp>.zip``.

    The files are first copied into a private scratch directory under TMP_DIR
    so the archive contains only them, not the whole directory they live in.
    The scratch directory is removed afterwards, even if copying or archiving
    fails.

    Args:
        file_paths: Files to include; an empty list yields an empty archive.

    Returns:
        Path of the created ZIP file.
    """
    stamp = _ts()
    pack_dir = os.path.join(TMP_DIR, f"pack_{stamp}_{uuid.uuid4().hex[:6]}")
    zip_base = os.path.join(OUT_DIR, f"used_{stamp}")
    os.makedirs(pack_dir, exist_ok=True)
    try:
        for p in file_paths:
            shutil.copy(p, pack_dir)
        shutil.make_archive(zip_base, "zip", pack_dir)
    finally:
        shutil.rmtree(pack_dir, ignore_errors=True)
    return f"{zip_base}.zip"
def _guess_filename_from_response(resp: requests.Response, default_ext: str = "mp4") -> str:
    """Pick a local file name for a downloaded response.

    Preference order:
      1. The ``filename`` parameter of the Content-Disposition header, reduced
         to its final path component so a server-supplied value such as
         ``../../x`` cannot point outside the target directory.
      2. A generated ``gen_*`` name whose extension is derived from the
         Content-Type header, falling back to ``default_ext``.

    Args:
        resp: The HTTP response whose headers are inspected.
        default_ext: Extension (without dot) used when Content-Type is unknown.

    Returns:
        A bare file name (no directory part).
    """
    # Try Content-Disposition: examine each ";"-separated parameter on its own
    # so trailing parameters (e.g. filename*=...) do not leak into the name.
    disposition = resp.headers.get("Content-Disposition", "")
    for part in disposition.split(";"):
        key, sep, value = part.strip().partition("=")
        if not sep or key.strip().lower() != "filename":
            continue
        candidate = value.strip().strip('"').replace("\\", "/")
        name = os.path.basename(candidate)
        if name and name not in (".", ".."):
            return name

    # Otherwise derive the extension from the media type, ignoring parameters
    # such as "; charset=..." that mimetypes cannot match.
    media_type = resp.headers.get("Content-Type", "").split(";", 1)[0].strip().lower()
    ext = mimetypes.guess_extension(media_type) or f".{default_ext}"
    return _safe_filename("gen", ext.lstrip("."))
def save_bytes_to_file(content: bytes, ext: str = "mp4") -> str:
    """Write ``content`` to a newly named ``gen_*`` file in OUT_DIR and return its path."""
    file_name = _safe_filename("gen", ext)
    target = os.path.join(OUT_DIR, file_name)
    with open(target, "wb") as sink:
        sink.write(content)
    return target
| # ================================ | |
| # Backend call | |
| # ================================ | |
def _video_ext_from_content_type(content_type: str) -> str:
    """Map a Content-Type header value to an extension without the dot (default ``mp4``)."""
    # Drop parameters such as "; codecs=..." which mimetypes cannot match.
    media_type = content_type.split(";", 1)[0].strip().lower()
    ext = mimetypes.guess_extension(media_type) or ".mp4"
    return ext.lstrip(".")


def call_generator(prompt: str, seed_frame_path: Optional[str]) -> str:
    """
    Call the Modal endpoint and store the generated video locally.

    Supports two response modes:
      1) Direct video bytes (Content-Type like video/mp4)
      2) JSON with {"video_url": "..."} which is then downloaded.
    A 200 response that is neither is saved as-is and treated as raw MP4.

    The request carries:
      - form field: prompt
      - optional file: seed_frame (PNG/JPG), sent only if the path exists

    Args:
        prompt: Text prompt for the generator.
        seed_frame_path: Optional path to an image used as the seed frame.

    Returns:
        Path of the saved video inside OUT_DIR.

    Raises:
        RuntimeError: On a non-200 status from the endpoint or the video URL,
            or when the JSON payload has no usable ``video_url``.
        requests.RequestException: On network-level failures.
    """
    data = {"prompt": prompt}

    if seed_frame_path and os.path.exists(seed_frame_path):
        seed_mime = mimetypes.guess_type(seed_frame_path)[0] or "image/png"
        # The context manager closes the seed file even if the request raises.
        with open(seed_frame_path, "rb") as seed_file:
            files = {
                "seed_frame": (os.path.basename(seed_frame_path), seed_file, seed_mime),
            }
            resp = requests.post(ENDPOINT_URL, data=data, files=files, timeout=600)
    else:
        resp = requests.post(ENDPOINT_URL, data=data, timeout=600)

    if resp.status_code != 200:
        raise RuntimeError(f"Generator returned {resp.status_code}: {resp.text[:500]}")

    ctype = resp.headers.get("Content-Type", "")
    if ctype.strip().lower().startswith("video/"):
        # Direct bytes
        return save_bytes_to_file(resp.content, _video_ext_from_content_type(ctype))

    # Assume JSON with URL
    try:
        payload = resp.json()
    except ValueError:
        # fallback: treat body as raw mp4
        return save_bytes_to_file(resp.content, "mp4")

    video_url = payload.get("video_url") if isinstance(payload, dict) else None
    if not video_url:
        # Maybe it returned base64? If so, add handling here if needed.
        raise RuntimeError(f"Unexpected response: {json.dumps(payload)[:500]}")

    # Download the video; the context manager releases the streamed connection.
    with requests.get(video_url, stream=True, timeout=600) as r2:
        if r2.status_code != 200:
            raise RuntimeError(f"Failed to fetch video from URL: {r2.status_code}")
        ext = _video_ext_from_content_type(r2.headers.get("Content-Type", ""))
        out_path = os.path.join(OUT_DIR, _safe_filename("gen", ext))
        with open(out_path, "wb") as f:
            for chunk in r2.iter_content(chunk_size=1024 * 1024):
                if chunk:
                    f.write(chunk)
    return out_path
| # ================================ | |
| # Gradio UI | |
| # ================================ | |
with gr.Blocks() as demo:
    gr.Markdown("## Continuous Video (Prompt → Use → Chain → Download)")

    # Per-session state carried between button clicks.
    state_used = gr.State([])       # list[str] of used clip paths
    state_seed = gr.State(None)     # path to last-frame image
    state_current = gr.State(None)  # most recent generated video path

    with gr.Row():
        prompt = gr.Textbox(
            label="Prompt",
            placeholder="Describe your next shot…",
            lines=2,
            autofocus=True,
        )
    with gr.Row():
        video_out = gr.Video(label="Video Output", interactive=False)
    with gr.Row():
        btn_generate = gr.Button("Generate", variant="primary")
        btn_use = gr.Button("Use (chain this)")
        btn_download = gr.Button("Download (A+B+C & ZIP)")
        btn_reset = gr.Button("Reset Session", variant="stop")
    files_out = gr.Files(label="Downloads", height=100)

    # ---------- Handlers ----------
    def on_generate(prompt_text, seed_frame):
        """Generate a clip for the prompt; returns (video value, current-clip state)."""
        if not prompt_text or not prompt_text.strip():
            return gr.update(value=None), None
        try:
            vid_path = call_generator(prompt_text.strip(), seed_frame)
        except Exception as e:
            # Keep a copy of the error on disk (best effort) ...
            err_log = os.path.join(OUT_DIR, _safe_filename("error", "txt"))
            try:
                with open(err_log, "w") as f:
                    f.write(str(e))
            except OSError:
                pass
            # ... and actually surface it in the UI as a toast.
            gr.Warning(f"Generation failed: {e}")
            return gr.update(value=None), None
        return vid_path, vid_path

    btn_generate.click(
        on_generate,
        inputs=[prompt, state_seed],
        outputs=[video_out, state_current],
    )

    def on_use(current_path, used_paths):
        """Add the current clip to the chain and derive the next seed frame from it."""
        if not current_path or not os.path.exists(current_path):
            return used_paths, None
        new_used = list(used_paths or [])
        if current_path not in new_used:
            new_used.append(current_path)
        # Extract last frame -> seed for next gen
        try:
            seed_img = extract_last_frame(current_path)
        except Exception as e:
            # If extraction fails, keep chaining logic but without seed
            gr.Warning(f"Could not extract the last frame; next clip will be unseeded: {e}")
            seed_img = None
        return new_used, seed_img

    btn_use.click(
        on_use,
        inputs=[state_current, state_used],
        outputs=[state_used, state_seed],
    )

    def on_download(used_paths):
        """Return the downloadable files: the concatenated video (if built) and a ZIP of the clips."""
        if not used_paths:
            return []
        try:
            concat_path = concat_videos_ffmpeg(used_paths)
        except Exception as e:
            # If concat fails, skip it but still offer the ZIP
            gr.Warning(f"Could not concatenate clips; offering the ZIP only: {e}")
            concat_path = None
        zip_path = zip_files(used_paths)
        files = [zip_path]
        if concat_path:
            files.insert(0, concat_path)
        return files

    btn_download.click(
        on_download,
        inputs=[state_used],
        outputs=[files_out],
    )

    def on_reset():
        """Delete plain files in TMP_DIR and clear all state and widgets."""
        # Clear temp files and state; cleanup is best effort and must not block the reset.
        try:
            for f in os.listdir(TMP_DIR):
                fp = os.path.join(TMP_DIR, f)
                if os.path.isfile(fp):
                    os.remove(fp)
        except OSError:
            pass
        return None, [], None, gr.update(value=None), gr.update(value=None), gr.update(value=[])

    btn_reset.click(
        on_reset,
        inputs=None,
        outputs=[state_seed, state_used, state_current, prompt, video_out, files_out],
    )

if __name__ == "__main__":
    demo.launch()