Spaces:
Sleeping
Sleeping
File size: 8,429 Bytes
62998c3 0c80388 62998c3 c6dce8c 62998c3 0c80388 62998c3 0c80388 62998c3 c6dce8c 62998c3 c6dce8c 0c80388 62998c3 c6dce8c 62998c3 c6dce8c 62998c3 c6dce8c 62998c3 c6dce8c 62998c3 c6dce8c 62998c3 c6dce8c 62998c3 c6dce8c 62998c3 c6dce8c 62998c3 0c80388 62998c3 0c80388 c6dce8c 62998c3 0c80388 62998c3 0c80388 62998c3 c6dce8c 62998c3 c6dce8c 62998c3 0c80388 62998c3 c6dce8c 62998c3 0c80388 62998c3 0c80388 62998c3 0c80388 62998c3 c6dce8c 62998c3 c6dce8c 62998c3 0c80388 62998c3 0c80388 62998c3 0c80388 |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 |
import os, uuid, time, shutil, mimetypes, subprocess, requests, gradio as gr
from datetime import datetime
from typing import Optional, List
# --- config ---
ENDPOINT = "https://moonmath-ai-dev--moonmath-i2v-backend-moonmathinference-run.modal.run"
FFMPEG = "ffmpeg"
OUT, TMP = "outputs", "tmp"
os.makedirs(OUT, exist_ok=True); os.makedirs(TMP, exist_ok=True)
ts = lambda: datetime.utcnow().strftime("%Y%m%d_%H%M%S")
fname = lambda p,e: f"{p}_{ts()}_{uuid.uuid4().hex[:6]}.{e}"
abspath= lambda p: os.path.abspath(p)
def run_ffmpeg(args: List[str]):
p = subprocess.run([FFMPEG]+args, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
if p.returncode:
raise RuntimeError(p.stderr.decode(errors="ignore"))
def extract_last_frame(video_path: str) -> str:
out = os.path.join(TMP, fname("last", "png"))
run_ffmpeg(["-sseof","-1","-i",video_path,"-frames:v","1","-q:v","2",out])
return out
def concat_videos(paths: List[str]) -> str:
if not paths: raise ValueError("No videos selected.")
if len(paths)==1:
dst = os.path.join(OUT, fname("continuous","mp4")); shutil.copy(paths[0], dst); return dst
listfile = os.path.join(TMP, f"concat_{uuid.uuid4().hex}.txt")
with open(listfile,"w") as f:
for p in paths: f.write(f"file '{abspath(p)}'\n")
out = os.path.join(OUT, fname("continuous","mp4"))
run_ffmpeg(["-f","concat","-safe","0","-i",listfile,"-c","copy",out])
os.remove(listfile)
return out
def zip_used(paths: List[str]) -> str:
pack = os.path.join(TMP, f"pack_{uuid.uuid4().hex[:6]}")
os.makedirs(pack, exist_ok=True)
for p in paths: shutil.copy(p, pack)
base = os.path.join(OUT, f"used_{ts()}")
shutil.make_archive(base, "zip", pack)
shutil.rmtree(pack, ignore_errors=True)
return base + ".zip"
def save_video_bytes(content: bytes, content_type: str) -> str:
ext = (mimetypes.guess_extension(content_type) or ".mp4").lstrip(".")
path = os.path.join(OUT, fname("gen", ext))
with open(path,"wb") as f: f.write(content)
return path
def call_backend(
prompt: str,
image_bytes_path: str,
negative_prompt: Optional[str],
fps: int,
vlen: int,
steps: Optional[int],
seed: Optional[int]
) -> str:
params = {
"prompt": prompt,
"frames_per_second": str(fps),
"video_length": str(vlen),
}
if negative_prompt: params["negative_prompt"] = negative_prompt
if steps is not None: params["num_inference_steps"] = str(steps)
if seed is None: seed = int(time.time())
params["seed"] = str(seed)
files = {"image_bytes": (os.path.basename(image_bytes_path), open(image_bytes_path,"rb"),
"application/octet-stream")}
try:
r = requests.post(ENDPOINT, params=params, files=files, headers={"accept":"application/json"}, timeout=600)
finally:
try: files["image_bytes"][1].close()
except: pass
if r.status_code != 200:
raise RuntimeError(f"Backend {r.status_code}: {r.text[:500]}")
ctype = r.headers.get("Content-Type","")
if ctype.startswith("video/"): # raw video bytes
return save_video_bytes(r.content, ctype)
# expect JSON with { "video_url": ... }
try:
payload = r.json()
url = payload.get("video_url")
if not url: raise ValueError("no video_url in response")
r2 = requests.get(url, stream=True, timeout=600)
if r2.status_code != 200: raise RuntimeError(f"fetch video {r2.status_code}")
path = os.path.join(OUT, fname("gen","mp4"))
with open(path,"wb") as f:
for chunk in r2.iter_content(1<<20):
if chunk: f.write(chunk)
return path
except Exception:
# fallback if backend mislabels content
return save_video_bytes(r.content, ctype or "video/mp4")
# ================= UI =================
with gr.Blocks() as demo:
gr.Markdown("## Continuous Video — chain last frame → next first frame")
# state
used_videos = gr.State([]) # list[str]
last_seed_img = gr.State(None) # PNG path (becomes image_bytes)
current_video = gr.State(None) # latest generated video
with gr.Row():
prompt = gr.Textbox(label="Prompt", placeholder="Describe your shot…", lines=2)
with gr.Row():
start_file = gr.File(
label="Initial start image or video (only for the very first clip)",
file_types=["image","video"]
)
with gr.Row():
negative = gr.Textbox(label="Negative prompt (optional)", placeholder="What to avoid…")
with gr.Row():
fps = gr.Slider(1,60, value=24, step=1, label="Frames per second")
vlen = gr.Slider(1,12, value=4, step=1, label="Video length (seconds)")
steps = gr.Slider(1,100, value=30, step=1, label="Num inference steps (optional)")
seed = gr.Number(label="Seed (optional)", precision=0)
video_out = gr.Video(label="Output")
status_md = gr.Markdown("") # shows ✅ Saved… after Chain
with gr.Row():
gen_btn = gr.Button("Generate", variant="primary")
chain_btn= gr.Button("Chain (save & clear)")
dl_btn = gr.Button("Download (concatenate & ZIP)")
files_out = gr.Files(label="Downloads")
# ---- handlers ----
def on_generate(prompt_txt, start_file_obj, neg, fps_val, vlen_val, steps_val, seed_val, seed_img):
if not prompt_txt or not prompt_txt.strip():
raise gr.Error("Prompt is required.")
# choose image_bytes source: last stolen frame > uploaded start file
image_path = seed_img
if not image_path:
if not start_file_obj: raise gr.Error("First generation requires an initial image OR video.")
image_path = start_file_obj.name
try:
out = call_backend(
prompt=prompt_txt.strip(),
image_bytes_path=image_path,
negative_prompt=(neg.strip() if (neg and neg.strip()) else None),
fps=int(fps_val),
vlen=int(vlen_val),
steps=int(steps_val) if steps_val else None,
seed=int(seed_val) if (seed_val not in [None,""]) else None
)
except Exception as e:
raise gr.Error(str(e))
return out, out, gr.update(value="") # show video, set state, clear status
gen_btn.click(
on_generate,
inputs=[prompt, start_file, negative, fps, vlen, steps, seed, last_seed_img],
outputs=[video_out, current_video, status_md]
)
def on_chain(curr, used):
"""
Save current video, steal last frame for next seed,
and CLEAR: preview, prompt, negative, start_file.
Also show a visible 'saved' banner.
"""
if not curr or not os.path.exists(curr):
raise gr.Error("Generate a video first.")
new_used = used[:] if used else []
if curr not in new_used: new_used.append(curr)
# last frame for next seed
seed_img = extract_last_frame(curr)
saved_idx = len(new_used)
saved_msg = f"✅ **Saved clip #{saved_idx}** — last frame captured. Ready for your next prompt."
return (
new_used, # used_videos
seed_img, # last_seed_img (image_bytes for next gen)
gr.update(value=None), # CLEAR prompt
gr.update(value=None), # CLEAR negative prompt
gr.update(value=None), # CLEAR start_file
gr.update(value=None), # CLEAR video preview
gr.update(value=saved_msg) # status banner
)
chain_btn.click(
on_chain,
inputs=[current_video, used_videos],
outputs=[used_videos, last_seed_img, prompt, negative, start_file, video_out, status_md]
)
def on_download(used):
if not used: raise gr.Error("Nothing to download. Click Chain after generating clips.")
files = []
try:
merged = concat_videos(used); files.append(merged)
except Exception as e:
print("concat error:", e)
try:
zipped = zip_used(used); files.append(zipped)
except Exception as e:
print("zip error:", e)
return files
dl_btn.click(on_download, inputs=[used_videos], outputs=[files_out])
if __name__ == "__main__":
demo.launch()
|