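"""Render a Storyboard into a short film via hosted text-to-video models.

Each scene beat is composed into a prompt and sent to a sequence of
Hugging Face Inference API models; the first model that succeeds wins,
and a locally generated placeholder clip is used when none do. The
per-scene clips are then concatenated into a single MP4 with MoviePy.
"""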
from __future__ import annotations

import os
import tempfile
from typing import Dict, List, Optional, Sequence, Tuple

from huggingface_hub import InferenceClient

from .models import SceneBeat, Storyboard
from .placeholders import create_placeholder_video

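# Candidate hosted models, tried in order until one returns a clip.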
DEFAULT_VIDEO_MODELS = [
    "Wan-AI/Wan2.2-TI2V-5B",
    "Lightricks/LTX-Video-0.9.7-distilled",
    "tencent/HunyuanVideo-1.5",
    "THUDM/CogVideoX-5b",
]

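# Some models are only served by a specific inference provider.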
MODEL_PROVIDER_OVERRIDES: Dict[str, Optional[str]] = {
    "Wan-AI/Wan2.2-TI2V-5B": "fal-ai",
}

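# Clip length in frames: scene duration (seconds) * fps, clamped to a safe range.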
MIN_FRAMES = 16
MAX_FRAMES = 240
FRAMES_PER_SECOND = 8


class VideoDirector:
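    """Generates one clip per storyboard scene and merges them into a final cut."""
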
    def __init__(
        self,
        token: Optional[str] = None,
        models: Optional[Sequence[str]] = None,
    ):
        # Prefer an explicit token, then the common HF environment variables.
        self.token = (
            token
            or os.environ.get("HF_TOKEN")
            or os.environ.get("HUGGINGFACEHUB_API_TOKEN")
            or os.environ.get("HUGGING_FACE_HUB_TOKEN")
        )
        self.models = list(models or DEFAULT_VIDEO_MODELS)

    def render(self, storyboard: Storyboard) -> Tuple[str, List[str]]:
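        """Render each scene in order, merge the clips, and return (final_path, logs)."""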
        logs: List[str] = []
        clip_paths: List[str] = []
        for scene in storyboard.scenes:
            video = self._produce_scene(storyboard, scene, logs)
            clip_paths.append(video)
        final_cut = self._merge_clips(clip_paths, logs)
        return final_cut, logs

    def _produce_scene(self, storyboard: Storyboard, scene: SceneBeat, logs: List[str]) -> str:
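        """Try each configured model for one scene, falling back to a placeholder clip."""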
        composed_prompt = self._compose_prompt(storyboard, scene)
        if self.token:
            for model in self.models:
                try:
                    clip = self._call_hf_inference(composed_prompt, model, scene.duration)
                    logs.append(f"Scene {scene.scene_id}: generated via {model}")
                    return clip
                except Exception as exc:
                    logs.append(f"Scene {scene.scene_id}: {model} failed ({exc})")
        clip = create_placeholder_video(scene, storyboard.style)
        logs.append(f"Scene {scene.scene_id}: fallback placeholder clip used.")
        return clip

    def _call_hf_inference(self, prompt: str, model_id: str, duration: int) -> str:
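        """Request one clip from the hosted model and write the bytes to a temp file."""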
        if not self.token:
            raise RuntimeError("Missing Hugging Face token")
        client = self._build_client(model_id)
        frames = max(MIN_FRAMES, min(MAX_FRAMES, int(duration * FRAMES_PER_SECOND)))
        video_bytes = client.text_to_video(
            prompt,
            model=model_id,
            num_frames=frames,
        )
        tmp_dir = tempfile.mkdtemp(prefix="cinegen-video-")
        path = os.path.join(tmp_dir, f"{model_id.split('/')[-1]}.mp4")
        with open(path, "wb") as handle:
            handle.write(video_bytes)
        return path

    def _build_client(self, model_id: str) -> InferenceClient:
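        """Build an InferenceClient, pinning the provider when the model requires one."""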
        provider = MODEL_PROVIDER_OVERRIDES.get(model_id)
        kwargs = {"token": self.token}
        if provider:
            kwargs["provider"] = provider
        return InferenceClient(**kwargs)

    @staticmethod
    def _compose_prompt(storyboard: Storyboard, scene: SceneBeat) -> str:
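        """Flatten storyboard and scene metadata into one text-to-video prompt."""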
        characters = "; ".join(scene.characters)
        return (
            f"Title: {storyboard.title}. Style: {storyboard.style}. "
            f"Scene {scene.scene_id} - {scene.title}: {scene.action} "
            f"Visual cues: {scene.visuals}. Mood: {scene.mood}. "
            f"Camera: {scene.camera}. Characters: {characters or 'solo sequence'}."
        )

    def _merge_clips(self, clip_paths: Sequence[str], logs: List[str]) -> str:
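        """Concatenate the clips into one MP4, or return the first clip if MoviePy is missing."""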
        if not clip_paths:
            raise RuntimeError("No clips to merge")
        try:
            from moviepy.editor import VideoFileClip, concatenate_videoclips  # type: ignore
        except Exception as exc:
            logs.append(f"MoviePy unavailable ({exc}); returning first clip only.")
            return clip_paths[0]

        clips = []
        for path in clip_paths:
            try:
                clip = VideoFileClip(path)
                clips.append(clip)
            except Exception as exc:
                logs.append(f"Failed to read clip {path}: {exc}")
        if not clips:
            raise RuntimeError("No clips to merge")
        final = concatenate_videoclips(clips, method="compose")
        tmp_dir = tempfile.mkdtemp(prefix="cinegen-final-")
        final_path = os.path.join(tmp_dir, "cinegen_short.mp4")
        final.write_videofile(
            final_path,
            fps=clips[0].fps,
            codec="libx264",
            audio=False,
            verbose=False,  # MoviePy 1.x API (matches the moviepy.editor import); v2 drops `verbose`
            logger=None,
        )
        for clip in clips:
            clip.close()
        logs.append(f"Merged {len(clips)} clips into final cut.")
        return final_path
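

if __name__ == "__main__":
    # Hedged usage sketch, not part of the pipeline. Storyboard and SceneBeat
    # come from .models; the keyword arguments below mirror the attributes
    # this module reads (title, style, scenes; scene_id, title, action,
    # visuals, mood, camera, characters, duration) and are an assumption
    # about the real constructors. Because of the relative imports, run it
    # as a module, e.g. `python -m <package>.video_director` (module name assumed).
    demo = Storyboard(
        title="Harbor Lights",
        style="hand-painted animation, warm palette",
        scenes=[
            SceneBeat(
                scene_id=1,
                title="Arrival",
                action="A courier steps off the night ferry.",
                visuals="rain-slick docks, flickering lanterns",
                mood="quiet anticipation",
                camera="slow dolly-in",
                characters=["Courier"],
                duration=4,
            )
        ],
    )
    final_cut, logs = VideoDirector().render(demo)  # placeholder clips if no HF token is set
    print(final_cut)
    print("\n".join(logs))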