# app.py — Shot Grammar Adapter — Proof (clean + auth)
import os, json, inspect, tempfile, zipfile
from typing import List, Tuple
os.environ.setdefault("HF_HUB_ENABLE_HF_TRANSFER", "1")
import numpy as np
import gradio as gr
import imageio
import torch
CPU_THREADS = max(1, min(4, (os.cpu_count() or 2) // 2))  # os.cpu_count() can return None
torch.set_num_threads(CPU_THREADS)
os.environ["OMP_NUM_THREADS"] = str(CPU_THREADS)
os.environ.setdefault("MKL_NUM_THREADS", str(CPU_THREADS))
print(f"[Threads] torch={torch.get_num_threads()} OMP={os.getenv('OMP_NUM_THREADS')}")
# -----------------------------
# Config
# -----------------------------
MODEL_ID = os.getenv("VIDEO_MODEL_ID", "damo-vilab/text-to-video-ms-1.7b")
DATA_PATH = os.getenv("SHOTS_JSONL", "shots_public_subset.jsonl")
assert os.path.exists(DATA_PATH), f"Missing data file: {DATA_PATH}"
DEFAULT_NUM_FRAMES = int(os.getenv("DEF_FRAMES", 12))
DEFAULT_GUIDANCE = float(os.getenv("DEF_GUIDANCE", 6.0))
DEFAULT_STEPS = int(os.getenv("DEF_STEPS", 10))
DEFAULT_SIZE = int(os.getenv("DEF_SIZE", 256))
DEFAULT_FPS = int(os.getenv("DEF_FPS", 8))
MAX_BATCH = int(os.getenv("MAX_BATCH", 30))
# -----------------------------
# Data load
# -----------------------------
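# Each JSONL row is expected to carry ep_id, shot_id, prompt, and a "features"
# dict with size / angle / motion / relation / duration (see pretty_features below).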
ROWS = []
with open(DATA_PATH, "r", encoding="utf-8") as f:
for line in f:
s = line.strip()
if not s:
continue
ROWS.append(json.loads(s))
INDEX = {r["shot_id"]: r for r in ROWS}
SHOT_IDS = list(INDEX.keys())
assert SHOT_IDS, "No shots found in JSONL."
def pretty_features(row: dict) -> dict:
feat = row.get("features", {})
return {
"ep_id": row.get("ep_id"),
"shot_id": row.get("shot_id"),
"size": feat.get("size"),
"angle": feat.get("angle"),
"motion": feat.get("motion"),
"relation": feat.get("relation"),
"duration": feat.get("duration"),
"prompt": row.get("prompt"),
}
# -----------------------------
# Pipeline (CPU-optimized)
# -----------------------------
from diffusers import DiffusionPipeline
from diffusers import DPMSolverMultistepScheduler
device = "cpu"
# `torch_dtype` is the broadly supported diffusers kwarg; float32 is the safe dtype on CPU.
pipe = DiffusionPipeline.from_pretrained(MODEL_ID, torch_dtype=torch.float32)
pipe.to(device)
if hasattr(pipe, "enable_attention_slicing"):
pipe.enable_attention_slicing()
if hasattr(pipe, "enable_vae_slicing"):
pipe.enable_vae_slicing()
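# Sequential CPU offload streams submodules to the GPU per forward pass,
# so it is only meaningful when CUDA is available, hence the guard below.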
if hasattr(pipe, "enable_sequential_cpu_offload") and torch.cuda.is_available():
pipe.enable_sequential_cpu_offload()
pipe.scheduler = DPMSolverMultistepScheduler.from_config(pipe.scheduler.config)
if hasattr(pipe, "set_progress_bar_config"):
pipe.set_progress_bar_config(disable=True)
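# Camera-grammar priors keyed by (size, angle, relation, motion);
# combinations not listed here fall back to an empty string in camera_tokens().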
CAMERA_LIB = {
("MS","eye","single","static"):
"medium shot, chest-up framing, eye-level, single subject, locked tripod, shallow depth of field, subject centered",
("MS","eye","ots","dolly-in"):
"over-the-shoulder framing, foreground shoulder soft-focus, background subject sharp, slow dolly-in, subtle parallax, conversational tension",
("WS","slight_high","group","static"):
"wide shot, slight high angle, group composition, static camera, staging clarity, environment emphasis",
("EWS","eye","group","static"):
"extreme wide establishing shot, horizon line at mid-height, environmental scale, small human figures",
}
def camera_tokens(row):
f = row.get("features", {})
key = (f.get("size"), f.get("angle"), f.get("relation"), f.get("motion"))
return CAMERA_LIB.get(key, "")
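# e.g. camera_tokens({"features": {"size": "MS", "angle": "eye",
#      "relation": "single", "motion": "static"}}) -> "medium shot, chest-up framing, ..."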
# -----------------------------
# Prompt compose
# -----------------------------
def compose_prompt(shot_row, fixed_prompt, mode="Fixed + Shot"):
shot_prompt = shot_row.get("prompt","").strip()
cam = camera_tokens(shot_row)
blocks = []
if mode in ("Fixed + Shot","Fixed-only") and fixed_prompt.strip():
blocks.append(fixed_prompt.strip())
if mode in ("Fixed + Shot","Shot-only") and shot_prompt:
blocks.append(shot_prompt)
if cam:
blocks.append(f"[CAMERA PRIOR] {cam}")
return "\n\n".join(blocks).strip()
def infer_one(shot_id: str, num_frames: int, guidance: float, steps: int, size: int,
fixed_prompt: str, combine_mode: str, lock_duration: bool, fps: int = DEFAULT_FPS) -> Tuple[str, str]:
row = INDEX[shot_id]
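    # Duration lock: frames = duration(sec) x FPS, clamped to [4, 32] to bound CPU runtime.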
frames_eff = int(row.get("features", {}).get("duration", 0) * fps) if lock_duration else int(num_frames)
frames_eff = max(4, min(32, frames_eff))
final_prompt = compose_prompt(row, fixed_prompt, combine_mode)
generator = torch.Generator(device="cpu").manual_seed(42)
result = safe_pipe(
final_prompt,
num_frames=frames_eff,
steps=steps,
guidance=guidance,
size=size,
generator=generator,
)
frames = _result_to_frames(result)
gif_path = save_gif(frames, fps=fps, shot_id=shot_id)
meta = pretty_features(row)
meta["final_prompt"] = final_prompt
meta["frames_used"] = frames_eff
meta_json = json.dumps(meta, ensure_ascii=False, indent=2)
return meta_json, gif_path
def safe_pipe(prompt, num_frames, steps, guidance, size, generator=None, negative_prompt=None):
kw = dict(
prompt=prompt, num_frames=int(num_frames),
num_inference_steps=int(steps), guidance_scale=float(guidance),
height=int(size), width=int(size), generator=generator
)
if "negative_prompt" in pipe.__call__.__code__.co_varnames:
kw["negative_prompt"] = negative_prompt
return pipe(**kw)
# -----------------------------
# Utils
# -----------------------------
def _result_to_frames(result):
import numpy as np
from PIL import Image
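    # Heuristic range normalization: diffusion frames commonly arrive in [-1, 1] or [0, 1];
    # anything outside those ranges is assumed to already be 0-255.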
def _to_uint8(arr):
arr = np.asarray(arr)
if arr.dtype == np.uint8:
return arr
a_min, a_max = float(arr.min()), float(arr.max())
if a_min >= -1.0 and a_max <= 1.0:
arr = ((arr + 1.0) * 127.5).clip(0, 255).astype(np.uint8)
return arr
if a_min >= 0.0 and a_max <= 1.0:
arr = (arr * 255.0).clip(0, 255).astype(np.uint8)
return arr
return arr.clip(0, 255).astype(np.uint8)
def _as_list_of_arrays(x):
if isinstance(x, (list, tuple)):
out = []
for f in x:
if isinstance(f, Image.Image):
out.append(np.array(f))
elif isinstance(f, np.ndarray):
out.append(f)
else:
raise TypeError(f"Unsupported frame element type: {type(f)}")
return out
if isinstance(x, np.ndarray):
            # Accepted shapes:
            #   3D: (H, W, C)
            #   4D: (T, H, W, C) or (T, C, H, W)
            #   5D: (B, T, H, W, C) or (B, T, C, H, W)
if x.ndim == 3:
return [x]
if x.ndim == 4:
if x.shape[-1] in (3, 4): # (T, H, W, C)
return [f for f in x]
if x.shape[1] in (3, 4): # (T, C, H, W)
x = np.transpose(x, (0, 2, 3, 1))
return [f for f in x]
raise ValueError(f"Unexpected ndarray 4D shape: {x.shape}")
if x.ndim == 5:
# (B, T, H, W, C) → (B*T, H, W, C)
if x.shape[-1] in (3, 4) and x.shape[2] == x.shape[3]:
x = x.reshape(-1, x.shape[2], x.shape[3], x.shape[4])
return [f for f in x]
# (B, T, C, H, W) → (B, T, H, W, C) → (B*T, H, W, C)
if x.shape[2] in (3, 4):
x = np.transpose(x, (0, 1, 3, 4, 2)).reshape(-1, x.shape[3], x.shape[4], x.shape[2])
return [f for f in x]
                # (1, T, H, W, C) is covered above; squeeze a stray singleton batch just in case.
if x.shape[0] == 1:
return _as_list_of_arrays(x[0])
raise ValueError(f"Unexpected ndarray 5D shape: {x.shape}")
raise ValueError(f"Unexpected ndarray shape: {x.shape}")
try:
import torch
if isinstance(x, torch.Tensor):
t = x.detach().cpu()
                # Accepted shapes:
                #   3D: (H, W, C) or (C, H, W)
                #   4D: (T, H, W, C) / (T, C, H, W)
                #   5D: (B, T, H, W, C) / (B, T, C, H, W)
if t.ndim == 5:
# (B, T, C, H, W) → (B, T, H, W, C)
if t.shape[2] in (1, 3, 4):
t = t.permute(0, 1, 3, 4, 2).contiguous()
                    # Assume (B, T, H, W, C) → flatten to (B*T, H, W, C)
t = t.reshape(-1, t.shape[2], t.shape[3], t.shape[4])
arr = t.numpy()
return [f for f in arr]
if t.ndim == 4:
# (T, C, H, W) → (T, H, W, C)
if t.shape[1] in (1, 3, 4):
t = t.permute(0, 2, 3, 1).contiguous()
arr = t.numpy()
return [f for f in arr]
if t.ndim == 3:
# (C, H, W) → (H, W, C)
if t.shape[0] in (1, 3, 4):
t = t.permute(1, 2, 0).contiguous()
else:
t = t.unsqueeze(0) # (H, W, C?) → (1, H, W, C?)
arr = t.numpy()
return [f for f in arr]
except Exception:
pass
if isinstance(x, Image.Image):
return [np.array(x)]
        raise TypeError(f"Unsupported frame container type: {type(x)}")
candidate = None
if isinstance(result, dict):
for k in ("frames", "images", "frames_list", "videos"):
if k in result and result[k] is not None:
candidate = result[k]; break
else:
for attr in ("frames", "images", "videos"):
if hasattr(result, attr):
v = getattr(result, attr)
if v is not None:
candidate = v; break
if candidate is None:
candidate = result
arr_list = _as_list_of_arrays(candidate)
out = []
for arr in arr_list:
arr = _to_uint8(arr)
if arr.ndim == 2:
arr = np.stack([arr] * 3, axis=-1)
if arr.ndim != 3 or arr.shape[2] not in (3, 4):
raise gr.Error(f"Unexpected frame shape: {arr.shape}. Expected HxWx3(or 4).")
out.append(Image.fromarray(arr))
if len(out) == 0:
raise gr.Error("No frames generated. Try lowering Frames/Steps/Resolution or add content tokens.")
return out
def save_gif(frames, fps: int = DEFAULT_FPS, shot_id: str = "clip") -> str:
import numpy as np
from PIL import Image
frames_np = []
for f in frames:
if isinstance(f, Image.Image):
f = np.array(f.convert("RGB"))
elif isinstance(f, np.ndarray) and f.ndim == 2:
f = np.stack([f]*3, axis=-1)
frames_np.append(f.astype(np.uint8))
tmpdir = tempfile.mkdtemp()
out_path = os.path.join(tmpdir, f"{shot_id}.gif")
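    # With imageio's legacy v2 API used here, GIF `duration` is seconds per frame
    # (newer v3-style plugins expect milliseconds).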
imageio.mimsave(out_path, frames_np, duration=1.0 / fps)
return out_path
def save_mp4(frames, fps: int = DEFAULT_FPS, shot_id: str = "clip") -> str:
import numpy as np
from PIL import Image
tmpdir = tempfile.mkdtemp()
out_path = os.path.join(tmpdir, f"{shot_id}.mp4")
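    # Prefer mpeg4, falling back to libx264 when the local FFmpeg build lacks it.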
try:
writer = imageio.get_writer(out_path, format="FFMPEG", fps=fps, codec="mpeg4", quality=6)
except Exception:
writer = imageio.get_writer(out_path, format="FFMPEG", fps=fps, codec="libx264", quality=6)
for f in frames:
if isinstance(f, Image.Image):
f = np.array(f.convert("RGB"))
elif isinstance(f, np.ndarray) and f.ndim == 2:
f = np.stack([f]*3, axis=-1)
writer.append_data(f.astype(np.uint8))
writer.close()
return out_path
# -----------------------------
# Inference
# -----------------------------
def infer_batch(shot_ids: List[str], num_frames: int, guidance: float, steps: int, size: int,
                fixed_prompt: str, combine_mode: str, lock_duration: bool, fps: int = DEFAULT_FPS) -> Tuple[str, str]:
if not shot_ids:
raise gr.Error("Pick at least one shot for batch.")
outputs, metas = [], []
for i, sid in enumerate(shot_ids):
row = INDEX[sid]
frames_eff = int(row.get("features", {}).get("duration", 0) * fps) if lock_duration else int(num_frames)
frames_eff = max(4, min(32, frames_eff))
final_prompt = compose_prompt(row, fixed_prompt, combine_mode)
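        # Deterministic per-shot seeds: base 42 offset by batch index.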
generator = torch.Generator(device="cpu").manual_seed(42 + i)
result = safe_pipe(
final_prompt,
num_frames=frames_eff,
steps=steps,
guidance=guidance,
size=size,
generator=generator,
)
frames = _result_to_frames(result)
mp4_path = save_mp4(frames, fps=fps, shot_id=sid)
outputs.append(mp4_path)
m = pretty_features(row)
m["final_prompt"] = final_prompt
m["frames_used"] = frames_eff
metas.append(m)
preview_mp4 = outputs[0] if outputs else None
    # Write metadata to a closed file before zipping so its contents are fully flushed.
    meta_path = tempfile.mktemp(suffix=".json")
    with open(meta_path, "w", encoding="utf-8") as f:
        json.dump(metas, f, ensure_ascii=False, indent=2)
    zip_path = tempfile.mktemp(suffix=".zip")
    with zipfile.ZipFile(zip_path, "w") as z:
        for p in outputs:
            z.write(p, arcname=os.path.basename(p))
        z.write(meta_path, arcname="metadata.json")
    return zip_path, preview_mp4
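# IO-only smoke test: writes a synthetic gradient through the GIF/MP4 paths without touching the model.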
def self_test_io():
import numpy as np, os
H, W = 256, 256
frames = []
x = np.linspace(0, 255, W, dtype=np.uint8)[None, :].repeat(H, axis=0)
for t in range(8):
r = x
g = np.roll(x, t*12, axis=1)
b = np.flipud(x)
rgb = np.stack([r, g, b], axis=-1)
frames.append(rgb)
gif = save_gif(frames, fps=8, shot_id="selftest")
mp4 = save_mp4(frames, fps=8, shot_id="selftest")
return f"Self Test OK — GIF:{os.path.basename(gif)} MP4:{os.path.basename(mp4)}", gif, mp4
# -----------------------------
# UI
# -----------------------------
PRESET_MAP = {
"Low • fastest (CPU)": dict(frames=8, steps=8, guidance=5.5, size=224),
"Med • balanced": dict(frames=12, steps=10, guidance=6.5, size=256),
"High • slower": dict(frames=16, steps=12, guidance=7.0, size=320),
}
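# Presets trade output quality against CPU latency; applied to the sliders via apply_preset below.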
with gr.Blocks(title="Shot Grammar Adapter — Proof") as demo:
gr.Markdown(
"🔒 **Shot Grammar Adapter — Proof** \n"
"Team-only proof page. If you need access, contact the owner.\n\n"
"# Shot Grammar Adapter — Proof\n"
"Turn shot grammar JSON into controllable video generations.\n"
"**Preview = GIF (fast) · Batch = MP4 (ZIP)**\n\n"
"**Duration lock**: Frames = duration(sec) × FPS (default FPS = 8)"
)
# ——— Global Prompt Controls ———
fixed_prompt_tb = gr.Textbox(
label="Fixed Content Prompt (공통 내용 한 줄)",
value="Show a lone traveler in a rain-soaked neon alley, cinematic rim light, moody, dusk.",
lines=3,
        placeholder="Enter one line describing character / setting / lighting / mood."
)
combine_mode = gr.Radio(
choices=["Fixed + Shot", "Shot-only", "Fixed-only"],
value="Fixed + Shot",
label="Prompt Combine Mode"
)
lock_duration = gr.Checkbox(
value=True,
label="Lock Frames to duration × FPS"
)
final_prompt_view = gr.Code(
label="Final Prompt (debug)",
interactive=False,
language="markdown"
)
with gr.Row():
with gr.Column(scale=1):
preset = gr.Dropdown(
choices=list(PRESET_MAP.keys()),
value="Low • fastest (CPU)",
label="Preset"
)
shot_dropdown = gr.Dropdown(
choices=SHOT_IDS,
value=SHOT_IDS[0],
label="Select Shot ID"
)
with gr.Row():
num_frames = gr.Slider(8, 32, value=DEFAULT_NUM_FRAMES, step=1, label="Frames")
steps = gr.Slider(8, 24, value=DEFAULT_STEPS, step=1, label="Steps")
with gr.Row():
guidance = gr.Slider(1.0, 12.0, value=DEFAULT_GUIDANCE, step=0.5, label="Guidance")
size = gr.Slider(224, 384, value=DEFAULT_SIZE, step=32, label="Resolution (square)")
run_btn = gr.Button("Generate (GIF Preview)")
info_json = gr.Code(label="Selected Shot JSON (features + prompt)", interactive=False, language="json")
with gr.Column(scale=1):
gif_out = gr.Image(label="Generated GIF Preview", type="filepath", interactive=False)
file_out = gr.File(label="Download GIF", interactive=False)
# ——— Batch ———
gr.Markdown("### Batch: Build a Proof Reel (MP4 + metadata.json → ZIP)")
with gr.Row():
batch_select = gr.CheckboxGroup(
choices=SHOT_IDS[:MAX_BATCH],
value=SHOT_IDS[: min(30, MAX_BATCH)],
label=f"Pick up to {MAX_BATCH} shots"
)
build_btn = gr.Button("Build ZIP")
zip_out = gr.File(label="Download Proof Reel (ZIP)", interactive=False)
video_out = gr.Video(label="Preview MP4 (latest)", interactive=False)
# ——— Self Test ———
gr.Markdown("### Self Test (IO only)")
self_btn = gr.Button("Run Self Test")
self_log = gr.Textbox(label="Self Test log", interactive=False)
self_gif = gr.File(label="Test GIF", interactive=False)
self_mp4 = gr.File(label="Test MP4", interactive=False)
# ——— Wire up events ———
def apply_preset(name):
p = PRESET_MAP[name]
return p["frames"], p["steps"], p["guidance"], p["size"]
preset.change(apply_preset, inputs=[preset], outputs=[num_frames, steps, guidance, size])
def _run_one(sid, nf, gs, st, sz, fxp, mode, lock):
meta_json, gif_path = infer_one(sid, nf, gs, st, sz, fxp, mode, lock, DEFAULT_FPS)
try:
fp = json.loads(meta_json).get("final_prompt", "")
except Exception:
fp = ""
return meta_json, gif_path, gif_path, fp
run_btn.click(
_run_one,
inputs=[shot_dropdown, num_frames, guidance, steps, size, fixed_prompt_tb, combine_mode, lock_duration],
outputs=[info_json, gif_out, file_out, final_prompt_view],
)
def _run_batch(sids, nf, gs, st, sz, fxp, mode, lock):
return infer_batch(sids, nf, gs, st, sz, fxp, mode, lock, DEFAULT_FPS)
build_btn.click(
_run_batch,
inputs=[batch_select, num_frames, guidance, steps, size, fixed_prompt_tb, combine_mode, lock_duration],
outputs=[zip_out, video_out]
)
self_btn.click(self_test_io, inputs=[], outputs=[self_log, self_gif, self_mp4])
demo.queue(max_size=8)
demo.launch(show_api=False, ssr_mode=False)