# Continue / app.py
# Shalmoni's picture
# Update app.py
# 62998c3 verified
import os, uuid, time, shutil, mimetypes, subprocess, requests, gradio as gr
from datetime import datetime
from typing import Optional, List
# --- config ---
# HTTP endpoint of the image-to-video backend that call_backend() posts to.
ENDPOINT = "https://moonmath-ai-dev--moonmath-i2v-backend-moonmathinference-run.modal.run"
# Name (or path) of the ffmpeg executable used by run_ffmpeg().
FFMPEG = "ffmpeg"
# OUT holds user-facing results; TMP holds intermediate files.
OUT, TMP = "outputs", "tmp"
os.makedirs(OUT, exist_ok=True)
os.makedirs(TMP, exist_ok=True)


def ts() -> str:
    """Return the current UTC time formatted as ``YYYYmmdd_HHMMSS``."""
    # Local import: the file-level import only brings in ``datetime``.
    from datetime import timezone

    # datetime.utcnow() is deprecated since Python 3.12; the aware
    # equivalent formats to exactly the same string.
    return datetime.now(timezone.utc).strftime("%Y%m%d_%H%M%S")


def fname(p: str, e: str) -> str:
    """Build a unique file name ``<p>_<timestamp>_<6 hex chars>.<e>``.

    Args:
        p: Prefix placed at the start of the name.
        e: File extension, without the leading dot.
    """
    return f"{p}_{ts()}_{uuid.uuid4().hex[:6]}.{e}"


def abspath(p: str) -> str:
    """Return the absolute form of path *p*."""
    return os.path.abspath(p)
def run_ffmpeg(args: List[str]):
    """Invoke ffmpeg with *args* and fail loudly on a non-zero exit code.

    Args:
        args: Command-line arguments placed after the ffmpeg executable.

    Raises:
        RuntimeError: If ffmpeg exits with a non-zero status; the message is
            ffmpeg's stderr, decoded with undecodable bytes dropped.
    """
    command = [FFMPEG, *args]
    # Both streams are captured so ffmpeg's output does not reach the console.
    result = subprocess.run(command, capture_output=True)
    if result.returncode != 0:
        raise RuntimeError(result.stderr.decode(errors="ignore"))
def extract_last_frame(video_path: str) -> str:
    """Save the final frame of *video_path* as a PNG in the TMP directory.

    Args:
        video_path: Path to the source video.

    Returns:
        Path of the PNG that was written.

    Raises:
        RuntimeError: Propagated from run_ffmpeg() when ffmpeg fails.
    """
    out = os.path.join(TMP, fname("last", "png"))
    # Seek to one second before the end, then let every remaining frame
    # overwrite the same output file ("-update 1"). The file left on disk is
    # the true last frame. With "-frames:v 1" ffmpeg would stop at the FIRST
    # frame after the seek point, i.e. roughly one second before the end.
    run_ffmpeg([
        "-sseof", "-1",
        "-i", video_path,
        "-update", "1",
        "-q:v", "2",
        out,
    ])
    return out
def concat_videos(paths: List[str]) -> str:
    """Join the given clips into one MP4 in the OUT directory.

    A single clip is simply copied. Several clips are joined with ffmpeg's
    concat demuxer using stream copy (no re-encode).

    Args:
        paths: Clip paths, in playback order.

    Returns:
        Path of the resulting ``continuous_*.mp4`` file.

    Raises:
        ValueError: If *paths* is empty.
        RuntimeError: Propagated from run_ffmpeg() when ffmpeg fails.
    """
    if not paths:
        raise ValueError("No videos selected.")
    if len(paths) == 1:
        dst = os.path.join(OUT, fname("continuous", "mp4"))
        shutil.copy(paths[0], dst)
        return dst

    listfile = os.path.join(TMP, f"concat_{uuid.uuid4().hex}.txt")
    out = os.path.join(OUT, fname("continuous", "mp4"))
    try:
        with open(listfile, "w", encoding="utf-8") as f:
            for p in paths:
                # Inside a single-quoted concat entry a literal quote must be
                # written as '\'' or the list file becomes unparsable.
                escaped = abspath(p).replace("'", "'\\''")
                f.write(f"file '{escaped}'\n")
        run_ffmpeg(["-f", "concat", "-safe", "0", "-i", listfile, "-c", "copy", out])
    finally:
        # Remove the list file even when ffmpeg fails, so TMP does not fill up.
        if os.path.exists(listfile):
            os.remove(listfile)
    return out
def zip_used(paths: List[str]) -> str:
    """Bundle the given files into ``used_<timestamp>.zip`` in the OUT directory.

    The files are first copied into a throw-away staging directory under TMP,
    which is then archived.

    Args:
        paths: Files to include in the archive.

    Returns:
        Path of the ZIP file that was written.
    """
    pack = os.path.join(TMP, f"pack_{uuid.uuid4().hex[:6]}")
    os.makedirs(pack, exist_ok=True)
    base = os.path.join(OUT, f"used_{ts()}")
    try:
        for p in paths:
            shutil.copy(p, pack)
        shutil.make_archive(base, "zip", pack)
    finally:
        # Always drop the staging copy, including when a copy or the archive
        # step raised; otherwise failed downloads leak directories in TMP.
        shutil.rmtree(pack, ignore_errors=True)
    return base + ".zip"
def save_video_bytes(content: bytes, content_type: str) -> str:
    """Write raw video bytes to a new ``gen_*`` file in the OUT directory.

    Args:
        content: The bytes to store.
        content_type: Content-Type header value used to pick the extension.

    Returns:
        Path of the file that was written.
    """
    # Drop header parameters such as "; codecs=..." before the lookup;
    # mimetypes only recognises the bare media type.
    media_type = (content_type or "").split(";", 1)[0].strip().lower()
    guessed = None
    # Only trust the header when it actually names a video type; anything else
    # (e.g. application/json, application/octet-stream) would give the saved
    # clip an extension a video player will not recognise.
    if media_type.startswith("video/"):
        guessed = mimetypes.guess_extension(media_type)
    ext = (guessed or ".mp4").lstrip(".")
    path = os.path.join(OUT, fname("gen", ext))
    with open(path, "wb") as f:
        f.write(content)
    return path
def call_backend(
    prompt: str,
    image_bytes_path: str,
    negative_prompt: Optional[str],
    fps: int,
    vlen: int,
    steps: Optional[int],
    seed: Optional[int]
) -> str:
    """Request one clip from the backend and store it locally.

    The conditioning image is uploaded as the multipart field ``image_bytes``;
    all other settings travel as query parameters.

    Args:
        prompt: Text prompt for the clip.
        image_bytes_path: Path of the file uploaded as ``image_bytes``.
        negative_prompt: Optional negative prompt; omitted when empty.
        fps: Sent as ``frames_per_second``.
        vlen: Sent as ``video_length``.
        steps: Sent as ``num_inference_steps`` when not None.
        seed: Seed to send; the current Unix time is used when None.

    Returns:
        Path of the saved video file.

    Raises:
        RuntimeError: If the backend answers with a non-200 status, the JSON
            reply has no ``video_url``, or fetching that URL fails.
    """
    params = {
        "prompt": prompt,
        "frames_per_second": str(fps),
        "video_length": str(vlen),
    }
    if negative_prompt:
        params["negative_prompt"] = negative_prompt
    if steps is not None:
        params["num_inference_steps"] = str(steps)
    if seed is None:
        seed = int(time.time())
    params["seed"] = str(seed)

    # The context manager closes the upload handle on every path, including
    # when the request itself raises.
    with open(image_bytes_path, "rb") as image_file:
        files = {
            "image_bytes": (
                os.path.basename(image_bytes_path),
                image_file,
                "application/octet-stream",
            )
        }
        r = requests.post(
            ENDPOINT,
            params=params,
            files=files,
            headers={"accept": "application/json"},
            timeout=600,
        )

    if r.status_code != 200:
        raise RuntimeError(f"Backend {r.status_code}: {r.text[:500]}")

    ctype = r.headers.get("Content-Type", "")
    if ctype.startswith("video/"):  # raw video bytes
        return save_video_bytes(r.content, ctype)

    # expect JSON with { "video_url": ... }
    try:
        payload = r.json()
    except ValueError:
        # Body is not JSON: assume the backend mislabelled raw video bytes.
        # The header is untrusted at this point, so store the body as MP4.
        return save_video_bytes(r.content, "video/mp4")

    # A well-formed JSON reply without a usable URL is a real error; saving
    # the JSON text as a "video" would only hide it from the user.
    url = payload.get("video_url") if isinstance(payload, dict) else None
    if not url:
        raise RuntimeError("Backend response contained no video_url")

    path = os.path.join(OUT, fname("gen", "mp4"))
    # Streamed responses hold their connection until closed explicitly.
    with requests.get(url, stream=True, timeout=600) as r2:
        if r2.status_code != 200:
            raise RuntimeError(f"fetch video {r2.status_code}")
        with open(path, "wb") as f:
            for chunk in r2.iter_content(1 << 20):
                if chunk:
                    f.write(chunk)
    return path
# ================= UI =================
with gr.Blocks() as demo:
    gr.Markdown("## Continuous Video — chain last frame → next first frame")

    # state
    used_videos = gr.State([])       # list[str]: clips saved via "Chain"
    last_seed_img = gr.State(None)   # PNG path (becomes image_bytes)
    current_video = gr.State(None)   # latest generated video

    with gr.Row():
        prompt = gr.Textbox(label="Prompt", placeholder="Describe your shot…", lines=2)
    with gr.Row():
        start_file = gr.File(
            label="Initial start image or video (only for the very first clip)",
            file_types=["image", "video"]
        )
    with gr.Row():
        negative = gr.Textbox(label="Negative prompt (optional)", placeholder="What to avoid…")
    with gr.Row():
        fps = gr.Slider(1, 60, value=24, step=1, label="Frames per second")
        vlen = gr.Slider(1, 12, value=4, step=1, label="Video length (seconds)")
        steps = gr.Slider(1, 100, value=30, step=1, label="Num inference steps (optional)")
        seed = gr.Number(label="Seed (optional)", precision=0)
    video_out = gr.Video(label="Output")
    status_md = gr.Markdown("")  # shows ✅ Saved… after Chain
    with gr.Row():
        gen_btn = gr.Button("Generate", variant="primary")
        chain_btn = gr.Button("Chain (save & clear)")
        dl_btn = gr.Button("Download (concatenate & ZIP)")
    files_out = gr.Files(label="Downloads")

    # ---- handlers ----
    def on_generate(prompt_txt, start_file_obj, neg, fps_val, vlen_val, steps_val, seed_val, seed_img):
        """Generate one clip and show it.

        The conditioning image is the frame captured by the last "Chain"
        click when there is one, otherwise the uploaded start file.

        Returns:
            (video for the preview, video path for state, status reset).

        Raises:
            gr.Error: On missing prompt / start file, or any backend failure.
        """
        if not prompt_txt or not prompt_txt.strip():
            raise gr.Error("Prompt is required.")

        # choose image_bytes source: last stolen frame > uploaded start file
        image_path = seed_img
        start_is_video = False
        if not image_path:
            if not start_file_obj:
                raise gr.Error("First generation requires an initial image OR video.")
            # Works for an upload object exposing .name and for a plain path.
            image_path = getattr(start_file_obj, "name", start_file_obj)
            mime, _ = mimetypes.guess_type(str(image_path))
            start_is_video = bool(mime) and mime.startswith("video/")

        try:
            if start_is_video:
                # The upload widget accepts videos, but the request carries a
                # single image: continue from the video's final frame instead
                # of uploading the raw video bytes as image_bytes.
                image_path = extract_last_frame(image_path)
            out = call_backend(
                prompt=prompt_txt.strip(),
                image_bytes_path=image_path,
                negative_prompt=(neg.strip() if (neg and neg.strip()) else None),
                fps=int(fps_val),
                vlen=int(vlen_val),
                steps=int(steps_val) if steps_val else None,
                seed=int(seed_val) if (seed_val not in [None, ""]) else None
            )
        except Exception as e:
            # Surface the failure in the UI; keep the cause for server logs.
            raise gr.Error(str(e)) from e
        return out, out, gr.update(value="")  # show video, set state, clear status

    gen_btn.click(
        on_generate,
        inputs=[prompt, start_file, negative, fps, vlen, steps, seed, last_seed_img],
        outputs=[video_out, current_video, status_md]
    )

    def on_chain(curr, used):
        """
        Save current video, steal last frame for next seed,
        and CLEAR: preview, prompt, negative, start_file.
        Also show a visible 'saved' banner.
        """
        if not curr or not os.path.exists(curr):
            raise gr.Error("Generate a video first.")
        # Copy so the list held in gr.State is never mutated in place.
        new_used = list(used) if used else []
        if curr not in new_used:
            new_used.append(curr)

        # last frame for next seed
        try:
            seed_img = extract_last_frame(curr)
        except Exception as e:
            # Report ffmpeg problems through the UI like on_generate does.
            raise gr.Error(str(e)) from e

        saved_idx = len(new_used)
        saved_msg = f"✅ **Saved clip #{saved_idx}** — last frame captured. Ready for your next prompt."
        return (
            new_used,                    # used_videos
            seed_img,                    # last_seed_img (image_bytes for next gen)
            gr.update(value=None),       # CLEAR prompt
            gr.update(value=None),       # CLEAR negative prompt
            gr.update(value=None),       # CLEAR start_file
            gr.update(value=None),       # CLEAR video preview
            gr.update(value=saved_msg)   # status banner
        )

    chain_btn.click(
        on_chain,
        inputs=[current_video, used_videos],
        outputs=[used_videos, last_seed_img, prompt, negative, start_file, video_out, status_md]
    )

    def on_download(used):
        """Offer the merged video and a ZIP of the individual clips.

        Each artifact is best-effort: if one fails the other is still
        returned. Only when both fail is an error raised.

        Raises:
            gr.Error: If no clip was chained yet, or nothing could be built.
        """
        if not used:
            raise gr.Error("Nothing to download. Click Chain after generating clips.")
        files, errors = [], []
        try:
            files.append(concat_videos(used))
        except Exception as e:
            print("concat error:", e)
            errors.append(f"concat: {e}")
        try:
            files.append(zip_used(used))
        except Exception as e:
            print("zip error:", e)
            errors.append(f"zip: {e}")
        if not files:
            # Returning an empty list would leave the user with no feedback.
            raise gr.Error("Download failed — " + "; ".join(errors))
        return files

    dl_btn.click(on_download, inputs=[used_videos], outputs=[files_out])
if __name__ == "__main__":
    # Launch the Gradio app only when this file is run as a script,
    # not when it is imported as a module.
    demo.launch()