Shalmoni commited on
Commit
c6dce8c
·
verified ·
1 Parent(s): adbfb86

Update app.py

Browse files
Files changed (1) hide show
  1. app.py +229 -160
app.py CHANGED
@@ -1,218 +1,286 @@
1
  import os
2
- import shutil
 
3
  import uuid
 
 
 
 
 
4
  from datetime import datetime
5
- import numpy as np
6
- from PIL import Image, ImageDraw, ImageFont
7
  import gradio as gr
8
- from moviepy.editor import (
9
- ImageClip,
10
- ColorClip,
11
- TextClip,
12
- CompositeVideoClip,
13
- concatenate_videoclips,
14
- VideoFileClip,
15
- )
16
-
17
- # ---------- Simple "video generator" stub ----------
18
- # Replace `generate_video_from_prompt` with your real model call.
19
- # It creates a short MP4 with a first frame (optionally from a previous video),
20
- # then overlays prompt text for a few seconds.
21
 
 
 
 
 
22
  OUT_DIR = "outputs"
23
- USED_DIR = os.path.join(OUT_DIR, "used")
24
  TMP_DIR = "tmp"
25
  os.makedirs(OUT_DIR, exist_ok=True)
26
- os.makedirs(USED_DIR, exist_ok=True)
27
  os.makedirs(TMP_DIR, exist_ok=True)
28
 
29
- FPS = 24
30
- W, H = 768, 432 # 16:9 HD-ish to keep files light
31
- DURATION = 3.0 # seconds per generated clip for demo
32
 
33
- def _solid_bg(color=(18, 18, 18)):
34
- return ColorClip(size=(W, H), color=color, duration=DURATION)
 
35
 
36
- def _text_overlay(txt: str):
37
- # Use TextClip if ImageMagick is available; otherwise fallback to PIL.
38
- try:
39
- return TextClip(
40
- txt,
41
- fontsize=48,
42
- color="white",
43
- font="Arial-Bold",
44
- size=(W - 80, None),
45
- method="caption",
46
- ).set_position(("center", "center")).set_duration(DURATION)
47
- except Exception:
48
- # PIL fallback
49
- img = Image.new("RGBA", (W, H), (0, 0, 0, 0))
50
- draw = ImageDraw.Draw(img)
51
- # Try to load a font; fallback to default if not available on HF
52
- try:
53
- font = ImageFont.truetype("DejaVuSans-Bold.ttf", 48)
54
- except Exception:
55
- font = ImageFont.load_default()
56
- # simple multiline center
57
- lines = []
58
- words = txt.split()
59
- line = ""
60
- for w in words:
61
- test = (line + " " + w).strip()
62
- if draw.textlength(test, font=font) > (W - 80):
63
- lines.append(line)
64
- line = w
65
- else:
66
- line = test
67
- lines.append(line)
68
-
69
- total_h = sum(font.getbbox(l)[3] for l in lines) + (len(lines)-1)*8
70
- y = (H - total_h)//2
71
- for l in lines:
72
- w_px = draw.textlength(l, font=font)
73
- x = (W - w_px)//2
74
- draw.text((x, y), l, fill=(255,255,255,255), font=font)
75
- y += font.getbbox(l)[3] + 8
76
- pil_path = os.path.join(TMP_DIR, f"txt_{uuid.uuid4().hex}.png")
77
- img.save(pil_path)
78
- return ImageClip(pil_path, duration=DURATION).set_position(("center","center"))
79
-
80
- def extract_last_frame_as_image(video_path: str) -> str:
81
- """Save last frame of video to an image file and return its path."""
82
- with VideoFileClip(video_path) as v:
83
- frame = v.get_frame(v.duration - 1.0 / max(1, v.fps))
84
- img = Image.fromarray(frame)
85
- frame_path = os.path.join(TMP_DIR, f"seed_{uuid.uuid4().hex}.png")
86
- img.save(frame_path)
87
- return frame_path
88
-
89
- def generate_video_from_prompt(prompt: str, seed_frame_path: str | None) -> str:
90
  """
91
- Make a short demo MP4 using:
92
- - If seed_frame_path: start 0.5s with that still frame
93
- - Then a solid background + prompt text
94
  """
95
- # Clips to concatenate
96
- clips = []
97
-
98
- if seed_frame_path and os.path.exists(seed_frame_path):
99
- seed = ImageClip(seed_frame_path, duration=0.5).set_fps(FPS)
100
- clips.append(seed)
101
-
102
- bg = _solid_bg().set_fps(FPS)
103
- txt = _text_overlay(prompt)
104
- comp = CompositeVideoClip([bg, txt]).set_duration(DURATION).set_fps(FPS)
105
- clips.append(comp)
106
-
107
- final = concatenate_videoclips(clips, method="compose")
108
- out_name = f"gen_{datetime.utcnow().strftime('%Y%m%d_%H%M%S')}_{uuid.uuid4().hex[:8]}.mp4"
109
- out_path = os.path.join(OUT_DIR, out_name)
110
- final.write_videofile(out_path, fps=FPS, codec="libx264", audio=False, verbose=False, logger=None)
111
- final.close()
112
  return out_path
113
 
114
- def concat_used_videos(video_paths: list[str]) -> str:
115
- clips = [VideoFileClip(p) for p in video_paths]
116
- final = concatenate_videoclips(clips, method="compose")
117
- out_path = os.path.join(OUT_DIR, f"continuous_{datetime.utcnow().strftime('%Y%m%d_%H%M%S')}.mp4")
118
- final.write_videofile(out_path, fps=FPS, codec="libx264", audio=False, verbose=False, logger=None)
119
- for c in clips:
120
- c.close()
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
121
  return out_path
122
 
123
- def zip_used_videos(video_paths: list[str]) -> str:
124
- # Copy into a temp folder to zip cleanly
125
- stamp = datetime.utcnow().strftime('%Y%m%d_%H%M%S')
126
- pack_dir = os.path.join(TMP_DIR, f"used_{stamp}_{uuid.uuid4().hex[:6]}")
 
 
 
127
  os.makedirs(pack_dir, exist_ok=True)
128
- for p in video_paths:
129
  shutil.copy(p, pack_dir)
130
  zip_base = os.path.join(OUT_DIR, f"used_{stamp}")
131
  shutil.make_archive(zip_base, "zip", pack_dir)
132
  shutil.rmtree(pack_dir, ignore_errors=True)
133
  return f"{zip_base}.zip"
134
 
135
- # ---------- Gradio App ----------
136
- with gr.Blocks(css=".grow {flex: 1}") as demo:
137
- gr.Markdown("# Continuous Video Prompt → Use → Chain → Download")
 
 
 
 
 
 
 
 
 
 
 
 
 
 
138
 
139
- # Session state
140
- state_used_paths = gr.State([]) # list[str]
141
- state_seed_frame = gr.State(None) # str | None
142
- state_current_path = gr.State(None) # str | None
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
143
 
144
  with gr.Row():
145
  prompt = gr.Textbox(
146
  label="Prompt",
147
  placeholder="Describe your next shot…",
148
  lines=2,
149
- autofocus=True,
150
  )
151
- with gr.Row(equal_height=True):
152
- video_out = gr.Video(label="Video Output", interactive=False).style(height=360)
 
153
  with gr.Row():
154
  btn_generate = gr.Button("Generate", variant="primary")
155
- btn_use = gr.Button("Use (chain this)", variant="secondary")
156
- btn_download = gr.Button("Download (A+B+C & ZIP)", variant="secondary")
157
  btn_reset = gr.Button("Reset Session", variant="stop")
158
 
159
- files_out = gr.Files(label="Downloads (concatenated MP4 + ZIP of used clips)", height=100)
 
 
160
 
161
- # ---- Handlers ----
162
- def do_generate(prompt_text, seed_frame_path):
163
  if not prompt_text or not prompt_text.strip():
164
- return None, None
165
- out_path = generate_video_from_prompt(prompt_text.strip(), seed_frame_path)
166
- return out_path, out_path # gr.Video path AND state_current_path
 
 
 
 
 
 
 
167
 
168
  btn_generate.click(
169
- do_generate,
170
- inputs=[prompt, state_seed_frame],
171
- outputs=[video_out, state_current_path],
172
  )
173
 
174
- def do_use(current_path, used_paths):
175
- """
176
- Save current_path to used list, extract its last frame as the next seed.
177
- """
178
  if not current_path or not os.path.exists(current_path):
179
- # no-op if nothing to use
180
- return used_paths, gr.update(interactive=True), None
181
-
182
- # Append to used list
183
  new_used = list(used_paths)
184
  if current_path not in new_used:
185
  new_used.append(current_path)
186
-
187
- # Extract last frame for next generation seed
188
- next_seed = extract_last_frame_as_image(current_path)
189
- return new_used, gr.update(interactive=True), next_seed
 
 
 
190
 
191
  btn_use.click(
192
- do_use,
193
- inputs=[state_current_path, state_used_paths],
194
- outputs=[state_used_paths, prompt, state_seed_frame],
195
  )
196
 
197
- def do_download(used_paths):
198
- """
199
- Build concatenated video (A+B+C) and a ZIP of used clips.
200
- Returns list of two files for the Files component.
201
- """
202
  if not used_paths:
203
  return []
204
- concat_path = concat_used_videos(used_paths)
205
- zip_path = zip_used_videos(used_paths)
206
- return [concat_path, zip_path]
 
 
 
 
 
 
 
207
 
208
  btn_download.click(
209
- do_download,
210
- inputs=[state_used_paths],
211
  outputs=[files_out],
212
  )
213
 
214
- def do_reset():
215
- # Clear session state and temp
216
  try:
217
  for f in os.listdir(TMP_DIR):
218
  fp = os.path.join(TMP_DIR, f)
@@ -220,13 +288,14 @@ with gr.Blocks(css=".grow {flex: 1}") as demo:
220
  os.remove(fp)
221
  except Exception:
222
  pass
223
- return None, [], None, None, gr.update(value=None), gr.update(value=[])
224
 
225
  btn_reset.click(
226
- do_reset,
227
  inputs=None,
228
- outputs=[state_seed_frame, state_used_paths, state_current_path, prompt, video_out, files_out],
229
  )
230
 
231
  if __name__ == "__main__":
232
  demo.launch()
 
 
1
  import os
2
+ import io
3
+ import json
4
  import uuid
5
+ import time
6
+ import shutil
7
+ import mimetypes
8
+ import requests
9
+ import subprocess
10
  from datetime import datetime
11
+ from typing import List, Optional
12
+
13
  import gradio as gr
14
+ from PIL import Image
 
 
 
 
 
 
 
 
 
 
 
 
15
 
16
+ # ================================
17
+ # Config
18
+ # ================================
19
+ ENDPOINT_URL = "https://moonmath-ai-dev--moonmath-i2v-backend-moonmathinference-run.modal.run" # your backend
20
  OUT_DIR = "outputs"
 
21
  TMP_DIR = "tmp"
22
  os.makedirs(OUT_DIR, exist_ok=True)
 
23
  os.makedirs(TMP_DIR, exist_ok=True)
24
 
25
+ # If ffmpeg is not on PATH for any reason, set absolute path here
26
+ FFMPEG_BIN = "ffmpeg"
 
27
 
28
+ # ================================
29
+ # Helpers
30
+ # ================================
31
 
32
+ def _ts() -> str:
33
+ return datetime.utcnow().strftime("%Y%m%d_%H%M%S")
34
+
35
+ def _safe_filename(prefix: str, ext: str) -> str:
36
+ return f"{prefix}_{_ts()}_{uuid.uuid4().hex[:8]}.{ext}"
37
+
38
+ def _run_ffmpeg(args: List[str]) -> None:
39
+ """Run ffmpeg with given args; raise on failure with readable message."""
40
+ cmd = [FFMPEG_BIN] + args
41
+ proc = subprocess.run(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
42
+ if proc.returncode != 0:
43
+ raise RuntimeError(
44
+ f"ffmpeg failed ({proc.returncode}).\nSTDOUT:\n{proc.stdout.decode(errors='ignore')}\n\nSTDERR:\n{proc.stderr.decode(errors='ignore')}"
45
+ )
46
+
47
+ def extract_last_frame(video_path: str) -> str:
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
48
  """
49
+ Extract the very last frame as PNG using ffmpeg.
50
+ Uses -sseof -1 to seek to the last second.
 
51
  """
52
+ out_path = os.path.join(TMP_DIR, _safe_filename("lastframe", "png"))
53
+ # Seek to last second and output a single frame
54
+ _run_ffmpeg([
55
+ "-sseof", "-1",
56
+ "-i", video_path,
57
+ "-update", "1",
58
+ "-frames:v", "1",
59
+ "-q:v", "2",
60
+ out_path
61
+ ])
 
 
 
 
 
 
 
62
  return out_path
63
 
64
+ def concat_videos_ffmpeg(video_paths: List[str]) -> str:
65
+ """
66
+ Concatenate MP4s using ffmpeg concat demuxer.
67
+ """
68
+ if len(video_paths) == 1:
69
+ # copy the single file to a new name to keep UX consistent
70
+ dst = os.path.join(OUT_DIR, _safe_filename("continuous", "mp4"))
71
+ shutil.copy(video_paths[0], dst)
72
+ return dst
73
+
74
+ list_file = os.path.join(TMP_DIR, f"concat_{uuid.uuid4().hex}.txt")
75
+ with open(list_file, "w") as f:
76
+ for p in video_paths:
77
+ # paths must be quoted if they may contain spaces
78
+ f.write(f"file '{os.path.abspath(p)}'\n")
79
+
80
+ out_path = os.path.join(OUT_DIR, _safe_filename("continuous", "mp4"))
81
+ _run_ffmpeg([
82
+ "-f", "concat",
83
+ "-safe", "0",
84
+ "-i", list_file,
85
+ "-c", "copy",
86
+ out_path
87
+ ])
88
+ try:
89
+ os.remove(list_file)
90
+ except Exception:
91
+ pass
92
  return out_path
93
 
94
+ def zip_files(file_paths: List[str]) -> str:
95
+ base = os.path.join(OUT_DIR, f"used_{_ts()}")
96
+ shutil.make_archive(base, "zip", root_dir=os.path.dirname(file_paths[0]) if file_paths else ".", base_dir=".")
97
+ # The above zips the entire directory — better: copy into temp pack dir first
98
+ # For precision, we’ll do that instead:
99
+ stamp = _ts()
100
+ pack_dir = os.path.join(TMP_DIR, f"pack_{stamp}_{uuid.uuid4().hex[:6]}")
101
  os.makedirs(pack_dir, exist_ok=True)
102
+ for p in file_paths:
103
  shutil.copy(p, pack_dir)
104
  zip_base = os.path.join(OUT_DIR, f"used_{stamp}")
105
  shutil.make_archive(zip_base, "zip", pack_dir)
106
  shutil.rmtree(pack_dir, ignore_errors=True)
107
  return f"{zip_base}.zip"
108
 
109
+ def _guess_filename_from_response(resp: requests.Response, default_ext: str = "mp4") -> str:
110
+ # Try Content-Disposition
111
+ cd = resp.headers.get("Content-Disposition", "")
112
+ if "filename=" in cd:
113
+ name = cd.split("filename=")[-1].strip('"; ')
114
+ if name:
115
+ return name
116
+ # Otherwise, attempt from content-type
117
+ ctype = resp.headers.get("Content-Type", "")
118
+ ext = mimetypes.guess_extension(ctype) or f".{default_ext}"
119
+ return _safe_filename("gen", ext.lstrip("."))
120
+
121
+ def save_bytes_to_file(content: bytes, ext: str = "mp4") -> str:
122
+ path = os.path.join(OUT_DIR, _safe_filename("gen", ext))
123
+ with open(path, "wb") as f:
124
+ f.write(content)
125
+ return path
126
 
127
+ # ================================
128
+ # Backend call
129
+ # ================================
130
+ def call_generator(prompt: str, seed_frame_path: Optional[str]) -> str:
131
+ """
132
+ Calls your Modal endpoint. Supports two response modes:
133
+ 1) Direct video bytes (Content-Type like video/mp4)
134
+ 2) JSON with {"video_url": "..."} which we then download.
135
+
136
+ We send:
137
+ - form field: prompt
138
+ - optional file: seed_frame (PNG/JPG)
139
+ """
140
+ files = {}
141
+ data = {"prompt": prompt}
142
+
143
+ if seed_frame_path and os.path.exists(seed_frame_path):
144
+ files["seed_frame"] = (os.path.basename(seed_frame_path), open(seed_frame_path, "rb"), "image/png")
145
+
146
+ # Make the request
147
+ resp = requests.post(ENDPOINT_URL, data=data, files=files, timeout=600)
148
+ if files:
149
+ # close file handles
150
+ for _, f in files.items():
151
+ # f is a tuple (name, fileobj, mimetype)
152
+ try:
153
+ f[1].close()
154
+ except Exception:
155
+ pass
156
+
157
+ if resp.status_code != 200:
158
+ raise RuntimeError(f"Generator returned {resp.status_code}: {resp.text[:500]}")
159
+
160
+ ctype = resp.headers.get("Content-Type", "")
161
+ if ctype.startswith("video/"):
162
+ # Direct bytes
163
+ ext = mimetypes.guess_extension(ctype) or ".mp4"
164
+ out_path = os.path.join(OUT_DIR, _safe_filename("gen", ext.lstrip(".")))
165
+ with open(out_path, "wb") as f:
166
+ f.write(resp.content)
167
+ return out_path
168
+
169
+ # Assume JSON with URL
170
+ try:
171
+ payload = resp.json()
172
+ except Exception:
173
+ # fallback: treat body as raw mp4
174
+ return save_bytes_to_file(resp.content, "mp4")
175
+
176
+ video_url = payload.get("video_url")
177
+ if not video_url:
178
+ # Maybe it returned base64? If so, add handling here if needed.
179
+ raise RuntimeError(f"Unexpected response: {json.dumps(payload)[:500]}")
180
+
181
+ # Download the video
182
+ r2 = requests.get(video_url, stream=True, timeout=600)
183
+ if r2.status_code != 200:
184
+ raise RuntimeError(f"Failed to fetch video from URL: {r2.status_code}")
185
+ ext = mimetypes.guess_extension(r2.headers.get("Content-Type", "")) or ".mp4"
186
+ out_path = os.path.join(OUT_DIR, _safe_filename("gen", ext.lstrip(".")))
187
+ with open(out_path, "wb") as f:
188
+ for chunk in r2.iter_content(chunk_size=1024 * 1024):
189
+ if chunk:
190
+ f.write(chunk)
191
+ return out_path
192
+
193
+ # ================================
194
+ # Gradio UI
195
+ # ================================
196
+ with gr.Blocks() as demo:
197
+ gr.Markdown("## Continuous Video (Prompt → Use → Chain → Download)")
198
+
199
+ state_used = gr.State([]) # list[str] of used clip paths
200
+ state_seed = gr.State(None) # path to last-frame image
201
+ state_current = gr.State(None) # most recent generated video path
202
 
203
  with gr.Row():
204
  prompt = gr.Textbox(
205
  label="Prompt",
206
  placeholder="Describe your next shot…",
207
  lines=2,
208
+ autofocus=True
209
  )
210
+ with gr.Row():
211
+ video_out = gr.Video(label="Video Output", interactive=False)
212
+
213
  with gr.Row():
214
  btn_generate = gr.Button("Generate", variant="primary")
215
+ btn_use = gr.Button("Use (chain this)")
216
+ btn_download = gr.Button("Download (A+B+C & ZIP)")
217
  btn_reset = gr.Button("Reset Session", variant="stop")
218
 
219
+ files_out = gr.Files(label="Downloads", height=100)
220
+
221
+ # ---------- Handlers ----------
222
 
223
+ def on_generate(prompt_text, seed_frame):
 
224
  if not prompt_text or not prompt_text.strip():
225
+ return gr.update(value=None), None
226
+ try:
227
+ vid_path = call_generator(prompt_text.strip(), seed_frame)
228
+ except Exception as e:
229
+ # Surface the error in the UI
230
+ err_mp4 = os.path.join(OUT_DIR, _safe_filename("error", "txt"))
231
+ with open(err_mp4, "w") as f:
232
+ f.write(str(e))
233
+ return gr.update(value=None), None
234
+ return vid_path, vid_path
235
 
236
  btn_generate.click(
237
+ on_generate,
238
+ inputs=[prompt, state_seed],
239
+ outputs=[video_out, state_current],
240
  )
241
 
242
+ def on_use(current_path, used_paths):
 
 
 
243
  if not current_path or not os.path.exists(current_path):
244
+ return used_paths, None
 
 
 
245
  new_used = list(used_paths)
246
  if current_path not in new_used:
247
  new_used.append(current_path)
248
+ # Extract last frame -> seed for next gen
249
+ try:
250
+ seed_img = extract_last_frame(current_path)
251
+ except Exception as e:
252
+ # If extraction fails, keep chaining logic but without seed
253
+ seed_img = None
254
+ return new_used, seed_img
255
 
256
  btn_use.click(
257
+ on_use,
258
+ inputs=[state_current, state_used],
259
+ outputs=[state_used, state_seed],
260
  )
261
 
262
+ def on_download(used_paths):
 
 
 
 
263
  if not used_paths:
264
  return []
265
+ try:
266
+ concat_path = concat_videos_ffmpeg(used_paths)
267
+ except Exception as e:
268
+ # If concat fails, skip it but still offer the ZIP
269
+ concat_path = None
270
+ zip_path = zip_files(used_paths)
271
+ files = [zip_path]
272
+ if concat_path:
273
+ files.insert(0, concat_path)
274
+ return files
275
 
276
  btn_download.click(
277
+ on_download,
278
+ inputs=[state_used],
279
  outputs=[files_out],
280
  )
281
 
282
+ def on_reset():
283
+ # Clear temp files and state
284
  try:
285
  for f in os.listdir(TMP_DIR):
286
  fp = os.path.join(TMP_DIR, f)
 
288
  os.remove(fp)
289
  except Exception:
290
  pass
291
+ return None, [], None, gr.update(value=None), gr.update(value=None), gr.update(value=[])
292
 
293
  btn_reset.click(
294
+ on_reset,
295
  inputs=None,
296
+ outputs=[state_seed, state_used, state_current, prompt, video_out, files_out],
297
  )
298
 
299
  if __name__ == "__main__":
300
  demo.launch()
301
+