| """ |
| Video processing utilities for EceMotion Pictures. |
| Enhanced text-to-video generation with robust error handling and fallbacks. |
| """ |
|
|
| import numpy as np |
| import logging |
| import os |
| import shutil |
| from typing import Optional, Tuple, List |
| from pathlib import Path |
|
|
| from config import ( |
| MODEL_VIDEO, MODEL_CONFIGS, get_device, VHS_INTENSITY, SCANLINE_OPACITY, |
| CHROMATIC_ABERRATION, FILM_GRAIN, get_safe_model_name |
| ) |
|
|
| logger = logging.getLogger(__name__) |
|
|
| |
# Module-level cache for the lazily-loaded text-to-video pipeline and the
# sanitized name of the model it currently holds (managed by get_t2v_pipe).
t2v_pipe = None
current_model = None
|
|
def get_t2v_pipe(device: str, model_name: str = MODEL_VIDEO):
    """Return a cached T2V pipeline, loading or switching models lazily.

    Args:
        device: Target device string ("cuda", "cpu", or "auto").
        model_name: Requested model id; sanitized via get_safe_model_name.

    Returns:
        The loaded pipeline, or None if every load attempt failed.

    Side effects:
        Updates the module-level ``t2v_pipe`` / ``current_model`` cache.
    """
    global t2v_pipe, current_model

    fallback_model = "damo-vilab/text-to-video-ms-1.7b"
    safe_model_name = get_safe_model_name(model_name, "video")

    if t2v_pipe is None or current_model != safe_model_name:
        logger.info(f"Loading T2V model: {safe_model_name}")

        try:
            # CogVideoX needs a dedicated pipeline class; everything else
            # goes through the standard TextToVideoSD loader.
            if "cogvideox" in safe_model_name.lower():
                t2v_pipe = _load_cogvideox(safe_model_name, device)
            else:
                t2v_pipe = _load_standard_t2v(safe_model_name, device)

            if t2v_pipe is not None:
                current_model = safe_model_name
                logger.info(f"T2V model {safe_model_name} loaded successfully")
            else:
                raise RuntimeError("Failed to load any T2V model")

        except Exception as e:
            logger.error(f"Failed to load {safe_model_name}: {e}")
            # Fall back to a known-good small model -- but only when the
            # requested model wasn't already the fallback, and only adopt it
            # as current_model if the load actually succeeded (previously the
            # fallback name was cached even when the load returned None).
            if safe_model_name != fallback_model:
                t2v_pipe = _load_standard_t2v(fallback_model, device)
                if t2v_pipe is not None:
                    current_model = fallback_model

    return t2v_pipe
|
|
def _load_cogvideox(model_name: str, device: str):
    """Load a CogVideoX pipeline.

    Args:
        model_name: HuggingFace model id for a CogVideoX checkpoint.
        device: "cuda", "cpu", or "auto" -- resolved here the same way
            _load_standard_t2v resolves it (the original ignored "auto",
            so an auto-detected GPU was never used for this loader).

    Returns:
        The pipeline on success, or None on any failure.
    """
    try:
        from diffusers import CogVideoXPipeline
        import torch

        # Mirror _load_standard_t2v's device resolution for consistency.
        if device == "auto":
            device = "cuda" if torch.cuda.is_available() else "cpu"

        pipe = CogVideoXPipeline.from_pretrained(
            model_name,
            torch_dtype="auto",
            trust_remote_code=True
        )

        # Only move to GPU explicitly; CPU is the from_pretrained default.
        if device == "cuda":
            pipe = pipe.to(device)

        return pipe

    except Exception as e:
        logger.error(f"Failed to load CogVideoX: {e}")
        return None
|
|
def _load_standard_t2v(model_name: str, device: str):
    """Load a standard diffusers text-to-video pipeline.

    Args:
        model_name: HuggingFace model id.
        device: "cuda", "cpu", or "auto" (resolved to cuda when available).

    Returns:
        The pipeline on success, or None if loading fails for any reason.
    """
    try:
        from diffusers import TextToVideoSDPipeline
        import torch

        # "auto" resolves to CUDA when available, otherwise CPU.
        resolved = device
        if resolved == "auto":
            resolved = "cuda" if torch.cuda.is_available() else "cpu"

        # fp16 on GPU saves memory; CPU inference requires fp32.
        dtype = torch.float16 if resolved == "cuda" else torch.float32
        pipe = TextToVideoSDPipeline.from_pretrained(model_name, torch_dtype=dtype)

        # Only an explicit GPU move is needed; CPU is the default placement.
        return pipe.to(resolved) if resolved == "cuda" else pipe

    except Exception as e:
        logger.error(f"Failed to load standard T2V: {e}")
        return None
|
|
def synth_t2v(prompt: str, seed: int, num_frames: int = 16, fps: int = 8,
              device: Optional[str] = None, model_name: str = MODEL_VIDEO):
    """
    Generate a text-to-video clip.

    Args:
        prompt: Text prompt for the video model.
        seed: RNG seed for reproducible generation.
        num_frames: Requested frame count (clamped to the model's range and
            a 16-frame ZeroGPU ceiling).
        fps: Playback frame rate of the returned clip.
        device: Torch device; resolved via get_device() when None.
        model_name: Model identifier forwarded to get_t2v_pipe().

    Returns:
        A moviepy clip -- generated frames on success, or a plain color
        fallback clip if the pipeline is unavailable or generation fails.
    """
    if device is None:
        device = get_device()

    pipe = get_t2v_pipe(device, model_name)
    model_config = MODEL_CONFIGS.get(current_model, {})

    # Clamp the requested frame count to the model's supported range...
    max_frames = model_config.get("max_frames", 16)
    min_frames = model_config.get("min_frames", 8)
    num_frames = max(min_frames, min(num_frames, max_frames))

    # ...and cap at 16 regardless, for ZeroGPU compatibility.
    if num_frames > 16:
        num_frames = 16
        logger.info(f"Reduced frame count to {num_frames} for ZeroGPU compatibility")

    logger.info(f"Generating {num_frames} frames at {fps}fps with {current_model}")

    # If every model load failed there is nothing to run: go straight to the
    # fallback instead of raising TypeError by calling a None pipeline.
    if pipe is None:
        logger.error("No T2V pipeline available; using fallback clip")
        return _create_fallback_clip(prompt, num_frames, fps)

    try:
        import torch
        generator = torch.Generator(device=device).manual_seed(seed)

        if "cogvideox" in (current_model or "").lower():
            # CogVideoX benefits from explicit guidance/step settings.
            result = pipe(
                prompt=prompt,
                num_frames=num_frames,
                generator=generator,
                guidance_scale=5.0,
                num_inference_steps=10
            )
        else:
            result = pipe(
                prompt=prompt,
                num_frames=num_frames,
                generator=generator
            )
        frames = result.frames

        # Some diffusers pipelines (notably CogVideoX) return frames nested
        # per batch item ([[frame, ...]]); unwrap a single-video batch so we
        # always iterate over individual frames below.
        if len(frames) == 1 and isinstance(frames[0], (list, tuple)):
            frames = frames[0]

        frame_arrays = [np.array(frame) for frame in frames]

        from moviepy.editor import ImageSequenceClip
        clip = ImageSequenceClip(frame_arrays, fps=fps)

        logger.info(f"Generated video clip: {clip.duration:.2f}s, {len(frame_arrays)} frames")
        return clip

    except Exception as e:
        logger.error(f"Video generation failed: {e}")
        # Best-effort: never propagate generation errors to the caller.
        return _create_fallback_clip(prompt, num_frames, fps)
|
|
def _create_fallback_clip(prompt: str, num_frames: int, fps: int):
    """Create a plain color clip used when video generation fails.

    Args:
        prompt: Unused here; kept for interface parity with synth_t2v.
        num_frames: Frame count used to derive the clip duration.
        fps: Frame rate used to derive the clip duration.

    Returns:
        A 640x480 purple moviepy ColorClip. Duration is num_frames/fps when
        fps is positive; otherwise (or if the sized attempt fails) a fixed
        5-second clip.
    """
    from moviepy.editor import ColorClip

    fallback_color = (100, 50, 200)
    try:
        # Guard fps <= 0, which previously raised ZeroDivisionError and was
        # only rescued by a duplicated construction in the except path.
        duration = num_frames / fps if fps > 0 else 5.0
        clip = ColorClip(size=(640, 480), color=fallback_color, duration=duration)

        logger.info(f"Created fallback clip: {clip.duration:.2f}s")
        return clip

    except Exception as e:
        logger.error(f"Failed to create fallback clip: {e}")
        # Last resort: a fixed 5-second clip.
        return ColorClip(size=(640, 480), color=fallback_color, duration=5.0)
|
|
def apply_retro_filters(input_path: str, output_path: str, intensity: float = VHS_INTENSITY):
    """
    Apply authentic VHS/CRT effects to a video file via ffmpeg.

    Args:
        input_path: Source video path.
        output_path: Destination path for the filtered video.
        intensity: Effect strength (roughly 0..1); scales the color shift,
            noise, film grain, and artifact thresholds.

    Falls back to _apply_simple_retro_filters when ffmpeg is unavailable or
    the full filter chain fails.
    """
    logger.info(f"Applying retro filters with intensity {intensity}")

    if not _check_ffmpeg():
        logger.warning("ffmpeg not available, using simple filter")
        _apply_simple_retro_filters(input_path, output_path)
        return

    try:
        import ffmpeg

        stream = ffmpeg.input(input_path)

        # Build the filter chain directly instead of serializing filters to
        # strings and re-parsing them. The old string round-trip crashed on
        # float("PI/4") for the vignette angle (so this path always fell
        # back), silently replaced tblend with a second 'format', and parsed
        # then dropped the eq gamma value.
        stream = stream.filter('format', 'yuv420p')

        # Washed-out VHS color: slight desaturation plus brightness /
        # contrast / saturation / gamma shifts scaled by intensity.
        stream = stream.filter('hue', s=0.8 + 0.2 * intensity)
        stream = stream.filter(
            'eq',
            brightness=0.02 * intensity,
            contrast=1.0 + 0.1 * intensity,
            saturation=1.0 + 0.2 * intensity,
            gamma=1.0 - 0.05 * intensity,
        )

        # Heavier artifacts (frame blending + temporal noise) only kick in
        # at higher intensities.
        if intensity > 0.3:
            stream = stream.filter('tblend', all_mode='difference',
                                   all_opacity=0.05 * intensity)
            stream = stream.filter('noise', alls=int(20 * intensity), allf='t')

        # Film grain: uniform noise scaled by the configured grain level.
        if FILM_GRAIN > 0:
            grain = FILM_GRAIN * intensity
            stream = stream.filter('noise', alls=int(15 * grain), allf='u')

        # CRT-style darkened corners. ffmpeg's vignette filter takes an
        # angle expression and has no 'strength' option (the old code passed
        # one, which ffmpeg rejected).
        stream = stream.filter('vignette', angle='PI/4')

        stream = stream.output(
            output_path,
            vcodec='libx264',
            pix_fmt='yuv420p',
            crf=20,
            preset='medium',
            movflags='+faststart'
        )

        stream.overwrite_output().run(quiet=True)
        logger.info("Retro filters applied successfully")

    except Exception as e:
        logger.error(f"Failed to apply retro filters: {e}")
        # Best-effort degradation to the simple fixed-strength filter.
        _apply_simple_retro_filters(input_path, output_path)
|
|
| def _check_ffmpeg() -> bool: |
| """Check if ffmpeg is available.""" |
| try: |
| import subprocess |
| subprocess.run(['ffmpeg', '-version'], capture_output=True, check=True) |
| return True |
| except (subprocess.CalledProcessError, FileNotFoundError): |
| return False |
|
|
def _apply_simple_retro_filters(input_path: str, output_path: str):
    """Apply a minimal fixed-strength retro look.

    Used when the full filter chain is unavailable or failed; if even this
    fails, the input video is copied through unmodified.
    """
    try:
        import ffmpeg

        chain = ffmpeg.input(input_path)
        chain = chain.filter('format', 'yuv420p')
        chain = chain.filter('tblend', all_mode='difference', all_opacity=0.05)
        chain = chain.filter('hue', s=0.9)
        chain = chain.filter('eq', brightness=0.02, contrast=1.05,
                             saturation=1.1, gamma=0.98)
        chain = chain.filter('noise', alls=10)
        sink = chain.output(output_path, vcodec='libx264', pix_fmt='yuv420p',
                            crf=20, movflags='+faststart')
        sink.overwrite_output().run(quiet=True)

        logger.info("Simple retro filters applied as fallback")
    except Exception as e:
        logger.error(f"Even simple retro filters failed: {e}")
        # Last resort: pass the video through untouched.
        shutil.copy2(input_path, output_path)
|
|
def mux_audio(video_in: str, audio_in: str, out_path: str):
    """Combine a video file and an audio file into out_path.

    Prefers ffmpeg when available, falls back to moviepy, and on total
    failure ships the video without audio rather than raising.
    """
    try:
        mux = _mux_with_ffmpeg if _check_ffmpeg() else _mux_with_moviepy
        mux(video_in, audio_in, out_path)
    except Exception as e:
        logger.error(f"Audio muxing failed: {e}")
        # Last resort: silent video is better than no output at all.
        shutil.copy2(video_in, out_path)
|
|
def _mux_with_ffmpeg(video_in: str, audio_in: str, out_path: str):
    """Mux video and audio using ffmpeg (video stream copied, audio -> AAC).

    Raises whatever ffmpeg-python raises on failure; mux_audio handles it.
    """
    import ffmpeg

    # ffmpeg-python streams have no .input() method, so the original chained
    # ffmpeg.input(video_in).input(audio_in) raised AttributeError. Open the
    # two inputs separately and pass both streams to output().
    video = ffmpeg.input(video_in)
    audio = ffmpeg.input(audio_in)

    (
        ffmpeg
        .output(
            video.video,
            audio.audio,
            out_path,
            vcodec='copy',
            acodec='aac',
            audio_bitrate='128k',
            movflags='+faststart'
        )
        .overwrite_output()
        .run(quiet=True)
    )
|
|
def _mux_with_moviepy(video_in: str, audio_in: str, out_path: str):
    """Mux video and audio using moviepy (fallback when ffmpeg is absent).

    The audio track is trimmed or padded with silence to exactly match the
    video duration before writing the final file.
    """
    from moviepy.editor import VideoFileClip, AudioFileClip, concatenate_audioclips

    video = VideoFileClip(video_in)
    audio = AudioFileClip(audio_in)

    # Match audio length to the video: trim if too long, pad if too short.
    if audio.duration > video.duration:
        audio = audio.subclip(0, video.duration)
    elif audio.duration < video.duration:
        from moviepy.audio.AudioClip import AudioClip
        silence = AudioClip(lambda t: 0, duration=video.duration - audio.duration)
        # concatenate_audioclips is a module-level moviepy function, not a
        # method on AudioFileClip -- the original call
        # audio.concatenate_audioclips([...]) raised AttributeError.
        audio = concatenate_audioclips([audio, silence])

    final_video = video.set_audio(audio)
    final_video.write_videofile(
        out_path,
        codec='libx264',
        audio_codec='aac',
        temp_audiofile='temp-audio.m4a',
        remove_temp=True,
        verbose=False,
        logger=None
    )

    # Release the underlying file handles.
    video.close()
    audio.close()
    final_video.close()
|
|
|
|