# app.py — Shot Grammar Adapter — Proof (clean + auth)
import os, json, tempfile, zipfile
from typing import List, Tuple

os.environ.setdefault("HF_HUB_ENABLE_HF_TRANSFER", "1")

import inspect
import numpy as np
import gradio as gr
import imageio
import torch

# Cap CPU threads: half the cores, clamped to [1, 4] (os.cpu_count() can return None).
CPU_THREADS = max(1, min(4, (os.cpu_count() or 2) // 2))
torch.set_num_threads(CPU_THREADS)
os.environ["OMP_NUM_THREADS"] = str(CPU_THREADS)
os.environ.setdefault("MKL_NUM_THREADS", str(CPU_THREADS))
print(f"[Threads] torch={torch.get_num_threads()} OMP={os.getenv('OMP_NUM_THREADS')}")
# -----------------------------
# Config
# -----------------------------
MODEL_ID = os.getenv("VIDEO_MODEL_ID", "damo-vilab/text-to-video-ms-1.7b")
DATA_PATH = os.getenv("SHOTS_JSONL", "shots_public_subset.jsonl")
assert os.path.exists(DATA_PATH), f"Missing data file: {DATA_PATH}"

DEFAULT_NUM_FRAMES = int(os.getenv("DEF_FRAMES", 12))
DEFAULT_GUIDANCE = float(os.getenv("DEF_GUIDANCE", 6.0))
DEFAULT_STEPS = int(os.getenv("DEF_STEPS", 10))
DEFAULT_SIZE = int(os.getenv("DEF_SIZE", 256))
DEFAULT_FPS = int(os.getenv("DEF_FPS", 8))
MAX_BATCH = int(os.getenv("MAX_BATCH", 30))

# -----------------------------
# Data load
# -----------------------------
ROWS = []
with open(DATA_PATH, "r", encoding="utf-8") as f:
    for line in f:
        s = line.strip()
        if not s:
            continue
        ROWS.append(json.loads(s))

INDEX = {r["shot_id"]: r for r in ROWS}
SHOT_IDS = list(INDEX.keys())
assert SHOT_IDS, "No shots found in JSONL."
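
# Illustrative row shape only; field names are inferred from the accessors
# below (pretty_features / camera_tokens), the values here are made up:
# {"ep_id": "ep01", "shot_id": "ep01_s0042",
#  "features": {"size": "MS", "angle": "eye", "relation": "single",
#               "motion": "static", "duration": 2.5},
#  "prompt": "two people argue across a kitchen table"}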
def pretty_features(row: dict) -> dict:
    feat = row.get("features", {})
    return {
        "ep_id": row.get("ep_id"),
        "shot_id": row.get("shot_id"),
        "size": feat.get("size"),
        "angle": feat.get("angle"),
        "motion": feat.get("motion"),
        "relation": feat.get("relation"),
        "duration": feat.get("duration"),
        "prompt": row.get("prompt"),
    }
# -----------------------------
# Pipeline (CPU-optimized)
# -----------------------------
from diffusers import DiffusionPipeline, DPMSolverMultistepScheduler

device = "cpu"
pipe = DiffusionPipeline.from_pretrained(MODEL_ID, torch_dtype=torch.float32)
pipe.to(device)
if hasattr(pipe, "enable_attention_slicing"):
    pipe.enable_attention_slicing()
if hasattr(pipe, "enable_vae_slicing"):
    pipe.enable_vae_slicing()
# Sequential offload only applies when a CUDA device exists; on a CPU-only
# Space this branch is intentionally a no-op.
if hasattr(pipe, "enable_sequential_cpu_offload") and torch.cuda.is_available():
    pipe.enable_sequential_cpu_offload()
pipe.scheduler = DPMSolverMultistepScheduler.from_config(pipe.scheduler.config)
if hasattr(pipe, "set_progress_bar_config"):
    pipe.set_progress_bar_config(disable=True)

CAMERA_LIB = {
    ("MS", "eye", "single", "static"):
        "medium shot, chest-up framing, eye-level, single subject, locked tripod, shallow depth of field, subject centered",
    ("MS", "eye", "ots", "dolly-in"):
        "over-the-shoulder framing, foreground shoulder soft-focus, background subject sharp, slow dolly-in, subtle parallax, conversational tension",
    ("WS", "slight_high", "group", "static"):
        "wide shot, slight high angle, group composition, static camera, staging clarity, environment emphasis",
    ("EWS", "eye", "group", "static"):
        "extreme wide establishing shot, horizon line at mid-height, environmental scale, small human figures",
}
def camera_tokens(row):
    f = row.get("features", {})
    key = (f.get("size"), f.get("angle"), f.get("relation"), f.get("motion"))
    return CAMERA_LIB.get(key, "")
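
# Example: a row whose features are {"size": "MS", "angle": "eye",
# "relation": "single", "motion": "static"} resolves to the "medium shot,
# chest-up framing, ..." prior above. Any feature tuple not present in
# CAMERA_LIB falls through to "", so unmatched shots simply get no
# [CAMERA PRIOR] block appended.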
# -----------------------------
# Prompt compose
# -----------------------------
def compose_prompt(shot_row, fixed_prompt, mode="Fixed + Shot"):
    shot_prompt = shot_row.get("prompt", "").strip()
    cam = camera_tokens(shot_row)
    blocks = []
    if mode in ("Fixed + Shot", "Fixed-only") and fixed_prompt.strip():
        blocks.append(fixed_prompt.strip())
    if mode in ("Fixed + Shot", "Shot-only") and shot_prompt:
        blocks.append(shot_prompt)
    if cam:
        blocks.append(f"[CAMERA PRIOR] {cam}")
    return "\n\n".join(blocks).strip()
def infer_one(shot_id: str, num_frames: int, guidance: float, steps: int, size: int,
              fixed_prompt: str, combine_mode: str, lock_duration: bool,
              fps: int = DEFAULT_FPS) -> Tuple[str, str]:
    row = INDEX[shot_id]
    # Duration lock: frames = duration(sec) × FPS, clamped to [4, 32].
    # `duration` may be missing or null in the JSONL, hence the `or 0`.
    duration = row.get("features", {}).get("duration") or 0
    frames_eff = int(duration * fps) if lock_duration else int(num_frames)
    frames_eff = max(4, min(32, frames_eff))
    final_prompt = compose_prompt(row, fixed_prompt, combine_mode)
    generator = torch.Generator(device="cpu").manual_seed(42)
    result = safe_pipe(
        final_prompt,
        num_frames=frames_eff,
        steps=steps,
        guidance=guidance,
        size=size,
        generator=generator,
    )
    frames = _result_to_frames(result)
    gif_path = save_gif(frames, fps=fps, shot_id=shot_id)
    meta = pretty_features(row)
    meta["final_prompt"] = final_prompt
    meta["frames_used"] = frames_eff
    meta_json = json.dumps(meta, ensure_ascii=False, indent=2)
    return meta_json, gif_path
def safe_pipe(prompt, num_frames, steps, guidance, size, generator=None, negative_prompt=None):
    kw = dict(
        prompt=prompt, num_frames=int(num_frames),
        num_inference_steps=int(steps), guidance_scale=float(guidance),
        height=int(size), width=int(size), generator=generator,
    )
    # Forward negative_prompt only if this pipeline's __call__ accepts it.
    if "negative_prompt" in inspect.signature(pipe.__call__).parameters:
        kw["negative_prompt"] = negative_prompt
    return pipe(**kw)
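
# Why inspect.signature rather than __call__.__code__.co_varnames: diffusers
# pipelines typically decorate __call__ (e.g. with torch.no_grad), and
# co_varnames on the wrapper only exposes (*args, **kwargs), so the original
# membership test could silently drop negative_prompt. signature() follows
# __wrapped__ and reports the real parameter list.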
# -----------------------------
# Utils
# -----------------------------
def _result_to_frames(result):
    import numpy as np
    from PIL import Image

    def _to_uint8(arr):
        arr = np.asarray(arr)
        if arr.dtype == np.uint8:
            return arr
        a_min, a_max = float(arr.min()), float(arr.max())
        # Check the [0, 1] range first: it is a subset of [-1, 1], so testing
        # [-1, 1] first would mis-scale [0, 1] floats.
        if a_min >= 0.0 and a_max <= 1.0:
            return (arr * 255.0).clip(0, 255).astype(np.uint8)
        if a_min >= -1.0 and a_max <= 1.0:
            return ((arr + 1.0) * 127.5).clip(0, 255).astype(np.uint8)
        return arr.clip(0, 255).astype(np.uint8)

    def _as_list_of_arrays(x):
        if isinstance(x, (list, tuple)):
            out = []
            for f in x:
                if isinstance(f, Image.Image):
                    out.append(np.array(f))
                elif isinstance(f, np.ndarray):
                    out.append(f)
                elif isinstance(f, (list, tuple)):
                    # Batched output such as result.frames == [[frame, ...]]:
                    # flatten one level.
                    out.extend(_as_list_of_arrays(f))
                else:
                    raise TypeError(f"Unsupported frame element type: {type(f)}")
            return out
        if isinstance(x, np.ndarray):
            # Accepted layouts:
            #   3D: (H, W, C)
            #   4D: (T, H, W, C) or (T, C, H, W)
            #   5D: (B, T, H, W, C) or (B, T, C, H, W)
            if x.ndim == 3:
                return [x]
            if x.ndim == 4:
                if x.shape[-1] in (3, 4):  # (T, H, W, C)
                    return [f for f in x]
                if x.shape[1] in (3, 4):  # (T, C, H, W)
                    x = np.transpose(x, (0, 2, 3, 1))
                    return [f for f in x]
                raise ValueError(f"Unexpected ndarray 4D shape: {x.shape}")
            if x.ndim == 5:
                if x.shape[-1] in (3, 4):  # (B, T, H, W, C) -> (B*T, H, W, C)
                    x = x.reshape(-1, x.shape[2], x.shape[3], x.shape[4])
                    return [f for f in x]
                if x.shape[2] in (3, 4):  # (B, T, C, H, W) -> (B*T, H, W, C)
                    x = np.transpose(x, (0, 1, 3, 4, 2)).reshape(-1, x.shape[3], x.shape[4], x.shape[2])
                    return [f for f in x]
                if x.shape[0] == 1:  # squeeze a leftover singleton batch dim
                    return _as_list_of_arrays(x[0])
                raise ValueError(f"Unexpected ndarray 5D shape: {x.shape}")
            raise ValueError(f"Unexpected ndarray shape: {x.shape}")
        try:
            import torch
            if isinstance(x, torch.Tensor):
                t = x.detach().cpu()
                # Accepted layouts:
                #   3D: (H, W, C) or (C, H, W)
                #   4D: (T, H, W, C) or (T, C, H, W)
                #   5D: (B, T, H, W, C) or (B, T, C, H, W)
                if t.ndim == 5:
                    if t.shape[2] in (1, 3, 4):  # (B, T, C, H, W) -> (B, T, H, W, C)
                        t = t.permute(0, 1, 3, 4, 2).contiguous()
                    t = t.reshape(-1, t.shape[2], t.shape[3], t.shape[4])  # (B*T, H, W, C)
                    return [f for f in t.numpy()]
                if t.ndim == 4:
                    if t.shape[1] in (1, 3, 4):  # (T, C, H, W) -> (T, H, W, C)
                        t = t.permute(0, 2, 3, 1).contiguous()
                    return [f for f in t.numpy()]
                if t.ndim == 3:
                    if t.shape[0] in (1, 3, 4):  # (C, H, W) -> (H, W, C)
                        t = t.permute(1, 2, 0).contiguous()
                    return [t.numpy()]  # a single 3D tensor is one frame
        except Exception:
            pass
        if isinstance(x, Image.Image):
            return [np.array(x)]
        raise TypeError(f"Unsupported result type: {type(x)}")

    # Pull the frame container out of the pipeline result, whatever its form.
    candidate = None
    if isinstance(result, dict):
        for k in ("frames", "images", "frames_list", "videos"):
            if k in result and result[k] is not None:
                candidate = result[k]
                break
    else:
        for attr in ("frames", "images", "videos"):
            if hasattr(result, attr):
                v = getattr(result, attr)
                if v is not None:
                    candidate = v
                    break
    if candidate is None:
        candidate = result

    out = []
    for arr in _as_list_of_arrays(candidate):
        arr = _to_uint8(arr)
        if arr.ndim == 2:  # grayscale -> RGB
            arr = np.stack([arr] * 3, axis=-1)
        if arr.ndim != 3 or arr.shape[2] not in (3, 4):
            raise gr.Error(f"Unexpected frame shape: {arr.shape}. Expected HxWx3(or 4).")
        out.append(Image.fromarray(arr))
    if not out:
        raise gr.Error("No frames generated. Try lowering Frames/Steps/Resolution or add content tokens.")
    return out
def save_gif(frames, fps: int = DEFAULT_FPS, shot_id: str = "clip") -> str:
    import numpy as np
    from PIL import Image
    frames_np = []
    for f in frames:
        if isinstance(f, Image.Image):
            f = np.array(f.convert("RGB"))
        elif isinstance(f, np.ndarray) and f.ndim == 2:
            f = np.stack([f] * 3, axis=-1)
        frames_np.append(f.astype(np.uint8))
    tmpdir = tempfile.mkdtemp()
    out_path = os.path.join(tmpdir, f"{shot_id}.gif")
    imageio.mimsave(out_path, frames_np, duration=1.0 / fps)
    return out_path
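
# Caveat (version-dependent): imageio's legacy v2 GIF writer reads `duration`
# as seconds per frame, while newer Pillow-plugin code paths read milliseconds.
# If the preview GIF plays far too fast on your imageio version, switch the
# call above to duration=1000.0 / fps.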
def save_mp4(frames, fps: int = DEFAULT_FPS, shot_id: str = "clip") -> str:
    import numpy as np
    from PIL import Image
    tmpdir = tempfile.mkdtemp()
    out_path = os.path.join(tmpdir, f"{shot_id}.mp4")
    # Try mpeg4 first; fall back to libx264 if the local ffmpeg build rejects it.
    try:
        writer = imageio.get_writer(out_path, format="FFMPEG", fps=fps, codec="mpeg4", quality=6)
    except Exception:
        writer = imageio.get_writer(out_path, format="FFMPEG", fps=fps, codec="libx264", quality=6)
    for f in frames:
        if isinstance(f, Image.Image):
            f = np.array(f.convert("RGB"))
        elif isinstance(f, np.ndarray) and f.ndim == 2:
            f = np.stack([f] * 3, axis=-1)
        writer.append_data(f.astype(np.uint8))
    writer.close()
    return out_path
# -----------------------------
# Inference
# -----------------------------
def infer_batch(shot_ids: List[str], num_frames: int, guidance: float, steps: int, size: int,
                fixed_prompt: str, combine_mode: str, lock_duration: bool,
                fps: int = DEFAULT_FPS) -> Tuple[str, str]:
    if not shot_ids:
        raise gr.Error("Pick at least one shot for batch.")
    outputs, metas = [], []
    for i, sid in enumerate(shot_ids):
        row = INDEX[sid]
        duration = row.get("features", {}).get("duration") or 0
        frames_eff = int(duration * fps) if lock_duration else int(num_frames)
        frames_eff = max(4, min(32, frames_eff))
        final_prompt = compose_prompt(row, fixed_prompt, combine_mode)
        generator = torch.Generator(device="cpu").manual_seed(42 + i)
        result = safe_pipe(
            final_prompt,
            num_frames=frames_eff,
            steps=steps,
            guidance=guidance,
            size=size,
            generator=generator,
        )
        frames = _result_to_frames(result)
        mp4_path = save_mp4(frames, fps=fps, shot_id=sid)
        outputs.append(mp4_path)
        m = pretty_features(row)
        m["final_prompt"] = final_prompt
        m["frames_used"] = frames_eff
        metas.append(m)
    preview_mp4 = outputs[0] if outputs else None
    # tempfile.mktemp is deprecated (race-prone); create a private dir instead.
    workdir = tempfile.mkdtemp()
    zip_path = os.path.join(workdir, "proof_reel.zip")
    meta_path = os.path.join(workdir, "metadata.json")
    with open(meta_path, "w", encoding="utf-8") as f:
        json.dump(metas, f, ensure_ascii=False, indent=2)
    with zipfile.ZipFile(zip_path, "w") as z:
        for p in outputs:
            z.write(p, arcname=os.path.basename(p))
        z.write(meta_path, arcname="metadata.json")
    return zip_path, preview_mp4
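
# The ZIP holds one <shot_id>.mp4 per selected shot plus a single
# metadata.json recording, per clip: the shot's features, the final composed
# prompt, and the frame count actually used.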
def self_test_io():
    import numpy as np, os
    H, W = 256, 256
    frames = []
    x = np.linspace(0, 255, W, dtype=np.uint8)[None, :].repeat(H, axis=0)
    for t in range(8):
        r = x
        g = np.roll(x, t * 12, axis=1)
        b = np.flipud(x)
        rgb = np.stack([r, g, b], axis=-1)
        frames.append(rgb)
    gif = save_gif(frames, fps=8, shot_id="selftest")
    mp4 = save_mp4(frames, fps=8, shot_id="selftest")
    return f"Self Test OK — GIF:{os.path.basename(gif)} MP4:{os.path.basename(mp4)}", gif, mp4
# -----------------------------
# UI
# -----------------------------
PRESET_MAP = {
    "Low • fastest (CPU)": dict(frames=8, steps=8, guidance=5.5, size=224),
    "Med • balanced": dict(frames=12, steps=10, guidance=6.5, size=256),
    "High • slower": dict(frames=16, steps=12, guidance=7.0, size=320),
}
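
# Rough guide, not a benchmark: CPU wall-clock cost grows roughly with
# frames × steps × resolution², so the "High" preset lands on the order of
# several times slower per clip than "Low".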
with gr.Blocks(title="Shot Grammar Adapter — Proof") as demo:
    gr.Markdown(
        "🔒 **Shot Grammar Adapter — Proof** \n"
        "Team-only proof page. If you need access, contact the owner.\n\n"
        "# Shot Grammar Adapter — Proof\n"
        "Turn shot grammar JSON into controllable video generations.\n"
        "**Preview = GIF (fast) · Batch = MP4 (ZIP)**\n\n"
        "**Duration lock**: Frames = duration(sec) × FPS (default FPS = 8)"
    )

    # ——— Global Prompt Controls ———
    fixed_prompt_tb = gr.Textbox(
        label="Fixed Content Prompt (one shared content line)",
        value="Show a lone traveler in a rain-soaked neon alley, cinematic rim light, moody, dusk.",
        lines=3,
        placeholder="Enter one line covering character / space / lighting / mood."
    )
    combine_mode = gr.Radio(
        choices=["Fixed + Shot", "Shot-only", "Fixed-only"],
        value="Fixed + Shot",
        label="Prompt Combine Mode"
    )
    lock_duration = gr.Checkbox(
        value=True,
        label="Lock Frames to duration × FPS"
    )
    final_prompt_view = gr.Code(
        label="Final Prompt (debug)",
        interactive=False,
        language="markdown"
    )

    with gr.Row():
        with gr.Column(scale=1):
            preset = gr.Dropdown(
                choices=list(PRESET_MAP.keys()),
                value="Low • fastest (CPU)",
                label="Preset"
            )
            shot_dropdown = gr.Dropdown(
                choices=SHOT_IDS,
                value=SHOT_IDS[0],
                label="Select Shot ID"
            )
            with gr.Row():
                num_frames = gr.Slider(8, 32, value=DEFAULT_NUM_FRAMES, step=1, label="Frames")
                steps = gr.Slider(8, 24, value=DEFAULT_STEPS, step=1, label="Steps")
            with gr.Row():
                guidance = gr.Slider(1.0, 12.0, value=DEFAULT_GUIDANCE, step=0.5, label="Guidance")
                size = gr.Slider(224, 384, value=DEFAULT_SIZE, step=32, label="Resolution (square)")
            run_btn = gr.Button("Generate (GIF Preview)")
            info_json = gr.Code(label="Selected Shot JSON (features + prompt)", interactive=False, language="json")
        with gr.Column(scale=1):
            gif_out = gr.Image(label="Generated GIF Preview", type="filepath", interactive=False)
            file_out = gr.File(label="Download GIF", interactive=False)

    # ——— Batch ———
    gr.Markdown("### Batch: Build a Proof Reel (MP4 + metadata.json → ZIP)")
    with gr.Row():
        batch_select = gr.CheckboxGroup(
            choices=SHOT_IDS[:MAX_BATCH],
            value=SHOT_IDS[: min(30, MAX_BATCH)],
            label=f"Pick up to {MAX_BATCH} shots"
        )
    build_btn = gr.Button("Build ZIP")
    zip_out = gr.File(label="Download Proof Reel (ZIP)", interactive=False)
    video_out = gr.Video(label="Preview MP4 (first clip)", interactive=False)

    # ——— Self Test ———
    gr.Markdown("### Self Test (IO only)")
    self_btn = gr.Button("Run Self Test")
    self_log = gr.Textbox(label="Self Test log", interactive=False)
    self_gif = gr.File(label="Test GIF", interactive=False)
    self_mp4 = gr.File(label="Test MP4", interactive=False)

    # ——— Wire up events ———
    def apply_preset(name):
        p = PRESET_MAP[name]
        return p["frames"], p["steps"], p["guidance"], p["size"]

    preset.change(apply_preset, inputs=[preset], outputs=[num_frames, steps, guidance, size])

    def _run_one(sid, nf, gs, st, sz, fxp, mode, lock):
        meta_json, gif_path = infer_one(sid, nf, gs, st, sz, fxp, mode, lock, DEFAULT_FPS)
        try:
            fp = json.loads(meta_json).get("final_prompt", "")
        except Exception:
            fp = ""
        return meta_json, gif_path, gif_path, fp

    run_btn.click(
        _run_one,
        inputs=[shot_dropdown, num_frames, guidance, steps, size, fixed_prompt_tb, combine_mode, lock_duration],
        outputs=[info_json, gif_out, file_out, final_prompt_view],
    )

    def _run_batch(sids, nf, gs, st, sz, fxp, mode, lock):
        return infer_batch(sids, nf, gs, st, sz, fxp, mode, lock, DEFAULT_FPS)

    build_btn.click(
        _run_batch,
        inputs=[batch_select, num_frames, guidance, steps, size, fixed_prompt_tb, combine_mode, lock_duration],
        outputs=[zip_out, video_out]
    )

    self_btn.click(self_test_io, inputs=[], outputs=[self_log, self_gif, self_mp4])

demo.queue(max_size=8)
demo.launch(show_api=False, ssr_mode=False)