import os
import sys
import time
import logging
import datetime
from pathlib import Path
from typing import Optional, Tuple, List, Union
import warnings

warnings.filterwarnings("ignore")

import numpy as np
from PIL import Image, ImageDraw, ImageFont
import imageio
import imageio_ffmpeg

logging.basicConfig(level=logging.INFO, format='%(asctime)s [%(levelname)s] %(name)s: %(message)s')
logger = logging.getLogger("LegionVideo")

# Output directory
OUTPUT_DIR = os.path.join(os.path.dirname(os.path.abspath(__file__)), "outputs")
os.makedirs(OUTPUT_DIR, exist_ok=True)

# Model directories
MODEL_DIR = os.path.join(os.path.dirname(os.path.abspath(__file__)), "models")
T2V_MODEL_DIR = os.path.join(MODEL_DIR, "t2v")
I2V_MODEL_DIR = os.path.join(MODEL_DIR, "i2v")

# Constants
DEFAULT_NEGATIVE_PROMPT = ""


def _as_uint8(arr: np.ndarray) -> np.ndarray:
    """Return *arr* as a uint8 array suitable for PIL / video encoding.

    uint8 input is returned unchanged. Floating-point input is treated as
    being in the [0, 1] range (clipped, then scaled by 255). Any other dtype
    is clipped to [0, 255] and cast.
    """
    if arr.dtype == np.uint8:
        return arr
    if np.issubdtype(arr.dtype, np.floating):
        return (np.clip(arr, 0, 1) * 255).astype(np.uint8)
    return np.clip(arr, 0, 255).astype(np.uint8)


class MockVideoGenerator:
    """Test-pattern generator used when no real model can be loaded."""

    def __init__(self):
        self.device = "cpu"
        logger.info("MockVideoGenerator initialized - will create test pattern videos")

    def generate_video(self, prompt: str, num_frames: int, width: int, height: int) -> np.ndarray:
        """Render a moving colour bar with the prompt drawn on top.

        Args:
            prompt: Text drawn in the top-left corner of every frame.
            num_frames: Number of frames to render (must be >= 1).
            width, height: Frame size in pixels.

        Returns:
            uint8 array of shape (num_frames, height, width, 3).

        Raises:
            ValueError: If ``num_frames`` is less than 1.
        """
        if num_frames < 1:
            # np.stack([]) would raise ValueError anyway; fail with a clear message.
            raise ValueError(f"num_frames must be >= 1, got {num_frames}")

        bar_width = width // 4
        frames = []
        for i in range(num_frames):
            frame = np.zeros((height, width, 3), dtype=np.uint8)
            # max(..., 1) avoids division by zero for a single-frame video.
            progress = i / max(num_frames - 1, 1)

            # Moving color bar
            bar_x = int(progress * (width - bar_width))
            frame[:, bar_x:bar_x + bar_width] = [
                int(128 + 127 * np.sin(progress * 4)),
                int(128 + 127 * np.sin(progress * 4 + 2)),
                int(128 + 127 * np.sin(progress * 4 + 4)),
            ]

            # Text overlay with prompt
            frame_pil = Image.fromarray(frame)
            draw = ImageDraw.Draw(frame_pil)
            draw.text((10, 10), prompt, fill=(255, 255, 255))
            draw.text((10, height - 30), f"LEGION AI | Frame {i+1}/{num_frames}", fill=(200, 200, 200))
            frames.append(np.array(frame_pil))
        return np.stack(frames)


class LegionVideoGenerator:
    """LEGION Video Generator - High-quality video generation system.

    Features:
    - Text-to-Video generation
    - Image-to-Video generation
    - Temporal enhancement for smooth frame transitions
    - QWatermark system (configurable quality watermark overlay)
    - CPU fallback with mock generation when GPU/model unavailable
    """

    def __init__(self, model_path: Optional[str] = None):
        """Detect the device and load models, falling back to mock mode.

        Args:
            model_path: Optional local directory of the T2V model; defaults
                to ``T2V_MODEL_DIR`` when omitted.
        """
        self.device = self._detect_device()
        self.pipe_t2v = None
        self.pipe_i2v = None
        self.mock_mode = False
        self.mock_gen = None
        logger.info(f"LEGION Video Generator initializing (device: {self.device})")

        # Try loading real models
        if not self._load_models(model_path):
            logger.warning("Real model loading failed - using mock generator fallback")
            self.mock_mode = True
            self.mock_gen = MockVideoGenerator()
        logger.info("LEGION Video Generator initialized successfully")

    def _detect_device(self) -> str:
        """Return ``"cuda"`` when torch reports a CUDA device, else ``"cpu"``."""
        try:
            import torch
            if torch.cuda.is_available():
                logger.info(f"GPU detected: {torch.cuda.get_device_name(0)}")
                return "cuda"
        except Exception as e:
            # Best-effort probe: a missing or broken torch simply means CPU.
            logger.debug(f"GPU detection failed: {e}")
        logger.info("No GPU detected - using CPU")
        return "cpu"

    def _check_memory_sufficient(self) -> bool:
        """Return True when at least 20 GB of system RAM is available.

        Uses ``psutil`` when importable, otherwise parses ``/proc/meminfo``.
        Returns False when the amount of RAM cannot be determined.
        """
        try:
            import psutil
            available_gb = psutil.virtual_memory().available / (1024 ** 3)
            logger.info(f"Available system RAM: {available_gb:.1f} GB")
            if available_gb < 20.0:
                logger.warning(
                    f"Insufficient RAM ({available_gb:.1f} GB < 20 GB required) "
                    f"to load 8.3B parameter model - using mock fallback"
                )
                return False
            return True
        except ImportError:
            try:
                with open('/proc/meminfo', 'r') as f:
                    for line in f:
                        if 'MemAvailable' in line:
                            available_kb = int(line.split()[1])
                            available_gb = available_kb / (1024 * 1024)
                            logger.info(f"Available system RAM: {available_gb:.1f} GB")
                            if available_gb < 20.0:
                                logger.warning(
                                    f"Insufficient RAM ({available_gb:.1f} GB < 20 GB) - using mock"
                                )
                                return False
                            return True
            except (OSError, ValueError, IndexError) as e:
                # Missing file (non-Linux) or an unexpected line format.
                logger.warning(f"Cannot check RAM: {e}")
        logger.warning("Cannot check RAM - defaulting to mock mode on CPU")
        return False

    @staticmethod
    def _apply_memory_optimizations(pipe, label: str) -> None:
        """Enable CPU offload, VAE tiling and attention slicing on *pipe*.

        Each optimization is attempted independently so that one failing
        (for example CPU offload on a machine without an accelerator) does
        not prevent the others from being applied.
        """
        steps = [
            ("model CPU offload", lambda: pipe.enable_model_cpu_offload()),
            ("attention slicing", lambda: pipe.enable_attention_slicing()),
        ]
        if hasattr(pipe, 'vae') and hasattr(pipe.vae, 'enable_tiling'):
            steps.insert(1, ("VAE tiling", lambda: pipe.vae.enable_tiling()))

        for description, enable in steps:
            try:
                enable()
            except Exception as e:
                logger.warning(f"Could not enable {description} on {label} pipeline: {e}")

    def _load_pipeline(self, pipeline_cls, path: str, label: str, dtype):
        """Load one pipeline from *path*; return it, or None on failure."""
        logger.info(f"Loading {label} model from local path: {path}")
        try:
            pipe = pipeline_cls.from_pretrained(path, torch_dtype=dtype)
        except Exception as e:
            logger.warning(f"Could not load {label} model: {e}")
            return None
        if pipe is not None:
            self._apply_memory_optimizations(pipe, label)
        return pipe

    def _load_models(self, model_path: Optional[str] = None) -> bool:
        """Load the T2V and I2V pipelines from local directories.

        Args:
            model_path: Optional T2V model directory (defaults to
                ``T2V_MODEL_DIR``). The I2V model is always read from
                ``I2V_MODEL_DIR``.

        Returns:
            True when at least one pipeline was loaded, False otherwise
            (missing dependencies, insufficient RAM on CPU, or no T2V model
            directory).
        """
        try:
            # torch is needed for the dtype passed to from_pretrained; the
            # original code referenced it without importing it.
            import torch
            from diffusers import HunyuanVideo15Pipeline
        except ImportError as e:
            logger.warning(f"Required modules not available: {e}")
            return False

        # On CPU, check if we have enough memory first
        if self.device == "cpu" and not self._check_memory_sufficient():
            return False

        # Try T2V model from local path only
        t2v_path = model_path or T2V_MODEL_DIR
        if not os.path.exists(os.path.join(t2v_path, "model_index.json")):
            # NOTE: a missing T2V model aborts loading entirely (I2V is not
            # attempted); kept as-is for backward compatibility.
            logger.warning(f"T2V model not found at {t2v_path}")
            return False
        self.pipe_t2v = self._load_pipeline(
            HunyuanVideo15Pipeline, t2v_path, "T2V", torch.float32
        )

        # Try I2V model from local path only
        i2v_path = I2V_MODEL_DIR
        if os.path.exists(os.path.join(i2v_path, "model_index.json")):
            self.pipe_i2v = self._load_pipeline(
                HunyuanVideo15Pipeline, i2v_path, "I2V", torch.float32
            )

        return self.pipe_t2v is not None or self.pipe_i2v is not None

    def _make_generator(self, seed: Optional[int]):
        """Return a seeded ``torch.Generator`` on ``self.device``, or None."""
        if seed is None:
            return None
        import torch
        return torch.Generator(device=self.device).manual_seed(seed)

    def _run_pipeline(self, pipe, label: str, seed: Optional[int], **pipe_kwargs):
        """Call *pipe* and return the frames of the first generated video.

        Any exception is logged as ``"<label> generation failed"`` and
        re-raised unchanged.
        """
        try:
            output = pipe(generator=self._make_generator(seed), **pipe_kwargs)
            return output.frames[0]
        except Exception as e:
            logger.error(f"{label} generation failed: {e}")
            raise

    def generate_from_text(
        self,
        prompt: str,
        negative_prompt: str = DEFAULT_NEGATIVE_PROMPT,
        num_frames: int = 49,
        width: int = 480,
        height: int = 480,
        num_inference_steps: int = 50,
        guidance_scale: float = 6.0,
        watermark_strength: float = 0.0,
        seed: Optional[int] = None,
    ) -> str:
        """Generate a video from a text prompt.

        Args:
            prompt: Text description of the video to generate
            negative_prompt: Things to avoid in the video
            num_frames: Number of frames to generate (1-129)
            width, height: Video resolution
            num_inference_steps: Diffusion inference steps
            guidance_scale: Classifier-free guidance scale
            watermark_strength: QWatermark opacity (0.0 = none, 1.0 = full)
            seed: Random seed for reproducibility

        Returns:
            Path to the generated MP4 file

        Raises:
            RuntimeError: If not in mock mode and no T2V pipeline is loaded.
        """
        logger.info(f"T2V: '{prompt[:60]}...' ({num_frames}f, {width}x{height}, {num_inference_steps}steps)")
        if self.mock_mode:
            return self._generate_mock_video(prompt, num_frames, width, height, watermark_strength, "t2v")
        if self.pipe_t2v is None:
            raise RuntimeError("T2V pipeline not available")

        frames = self._run_pipeline(
            self.pipe_t2v,
            "T2V",
            seed,
            prompt=prompt,
            negative_prompt=negative_prompt,
            num_frames=num_frames,
            width=width,
            height=height,
            num_inference_steps=num_inference_steps,
            guidance_scale=guidance_scale,
        )
        return self._export_video(frames, prompt, watermark_strength, "t2v")

    def generate_from_image(
        self,
        image_path: str,
        prompt: str = "",
        negative_prompt: str = DEFAULT_NEGATIVE_PROMPT,
        num_frames: int = 49,
        width: int = 480,
        height: int = 480,
        num_inference_steps: int = 50,
        guidance_scale: float = 6.0,
        watermark_strength: float = 0.0,
        seed: Optional[int] = None,
    ) -> str:
        """Generate a video from an input image + text prompt.

        Args:
            image_path: Path to the conditioning image
            prompt: Text description of motion/action
            negative_prompt: Things to avoid
            num_frames, width, height, num_inference_steps, guidance_scale: Generation params
            watermark_strength: QWatermark opacity
            seed: Random seed

        Returns:
            Path to the generated MP4 file

        Raises:
            FileNotFoundError: If not in mock mode and ``image_path`` does not exist.
            RuntimeError: If neither an I2V nor a T2V pipeline is loaded.
        """
        logger.info(f"I2V from '{image_path}': '{prompt[:60]}...'")
        if self.mock_mode:
            # Mock mode ignores the image entirely (it is not even opened).
            return self._generate_mock_video(prompt, num_frames, width, height, watermark_strength, "i2v")

        if not os.path.exists(image_path):
            raise FileNotFoundError(f"Image not found: {image_path}")
        # convert() returns a fully loaded copy, so the file can be closed here.
        with Image.open(image_path) as source_image:
            input_image = source_image.convert("RGB")

        common_kwargs = dict(
            negative_prompt=negative_prompt,
            num_frames=num_frames,
            width=width,
            height=height,
            num_inference_steps=num_inference_steps,
            guidance_scale=guidance_scale,
        )

        if self.pipe_i2v is not None:
            frames = self._run_pipeline(
                self.pipe_i2v, "I2V", seed,
                image=input_image, prompt=prompt, **common_kwargs,
            )
        elif self.pipe_t2v is not None:
            # Use T2V pipeline as fallback
            logger.warning("I2V pipeline not available, falling back to T2V with prompt style")
            enhanced_prompt = prompt + ", based on the provided image style"
            frames = self._run_pipeline(
                self.pipe_t2v, "T2V fallback", seed,
                prompt=enhanced_prompt, **common_kwargs,
            )
        else:
            raise RuntimeError("No video generation pipeline available")

        return self._export_video(frames, prompt, watermark_strength, "i2v")

    def _generate_mock_video(
        self,
        prompt: str,
        num_frames: int,
        width: int,
        height: int,
        watermark_strength: float,
        mode: str
    ) -> str:
        """Render a test-pattern video with the mock generator and export it."""
        logger.info("Using mock generator (model unavailable)")
        frames = self.mock_gen.generate_video(prompt, num_frames, width, height)
        return self._export_video(frames, prompt, watermark_strength, mode)

    def _temporal_enhancement(self, frames: np.ndarray, strength: float = 0.5) -> np.ndarray:
        """Apply temporal smoothing to reduce frame-to-frame artifacts.

        Every interior frame is replaced by the unweighted mean of the frames
        in a window centred on it; the first and last frames are left
        untouched.

        Args:
            frames: Video frames as numpy array with time on axis 0,
                e.g. (T, H, W, C). Non-array input is returned unchanged.
            strength: Smoothing intensity (0.0 = none, 1.0 = maximum)

        Returns:
            Temporally smoothed frames (same shape and dtype as the input)
        """
        if not isinstance(frames, np.ndarray):
            return frames
        if strength <= 0 or frames.ndim < 1:
            return frames  # Documented contract: 0.0 means no smoothing
        num_frames = frames.shape[0]
        if num_frames < 3:
            return frames  # Not enough frames to smooth

        # Window size grows with strength; always odd and at least 3.
        kernel_size = max(3, int(5 * strength))
        if kernel_size % 2 == 0:
            kernel_size += 1
        half_k = min(kernel_size // 2, num_frames // 2)

        # Integer frames must be rounded, otherwise the float mean is
        # truncated on assignment and the video is biased darker.
        round_result = np.issubdtype(frames.dtype, np.integer)

        smoothed = frames.copy()
        for t in range(1, num_frames - 1):
            left = max(0, t - half_k)
            right = min(num_frames, t + half_k + 1)
            window_mean = np.mean(frames[left:right], axis=0)
            if round_result:
                window_mean = np.rint(window_mean)
            smoothed[t] = window_mean
        return smoothed

    @staticmethod
    def _frames_to_uint8(frames):
        """Normalize frames to uint8.

        An ndarray stays an ndarray; any other iterable (PIL images or
        arrays) becomes a list of uint8 arrays, with PIL images converted
        to RGB first.
        """
        if isinstance(frames, np.ndarray):
            return _as_uint8(frames)
        frame_list = []
        for f in frames:
            if hasattr(f, 'mode'):
                frame_list.append(_as_uint8(np.array(f.convert("RGB"))))
            else:
                frame_list.append(_as_uint8(np.array(f)))
        return frame_list

    def _export_video(
        self,
        frames,
        prompt: str,
        watermark_strength: float,
        mode: str
    ) -> str:
        """Post-process *frames* and write them to an MP4 in ``OUTPUT_DIR``.

        Args:
            frames: numpy array of frames or an iterable of PIL images/arrays.
            prompt: Used (sanitized, first 30 chars) as part of the filename.
            watermark_strength: Watermark is applied only when > 0.
            mode: Short tag placed in the filename (e.g. "t2v", "i2v").

        Returns:
            Path to the written MP4 file.
        """
        # Apply temporal enhancement
        frames = self._temporal_enhancement(frames)

        # Normalize to uint8 *before* watermarking: PIL cannot build RGB
        # images from float arrays.
        frames = self._frames_to_uint8(frames)

        # Apply QWatermark
        if watermark_strength > 0:
            frames = self.apply_qwatermark(frames, strength=watermark_strength)

        # Generate filename
        timestamp = datetime.datetime.now().strftime("%Y%m%d_%H%M%S")
        safe_prompt = "".join(c if c.isalnum() or c in " _-" else "_" for c in prompt[:30])
        filename = f"legion_{mode}_{timestamp}_{safe_prompt}.mp4"
        output_path = os.path.join(OUTPUT_DIR, filename)

        # Export frames to MP4
        imageio.mimsave(output_path, frames, fps=8, codec='libx264', quality=8, pixelformat='yuv420p')

        file_size = os.path.getsize(output_path)
        logger.info(f"Video exported: {output_path} ({file_size / 1024:.1f} KB)")
        return output_path

    @staticmethod
    def _load_watermark_font(paths, size: int):
        """Return the first TrueType font in *paths* that loads, else PIL's default."""
        for path in paths:
            try:
                return ImageFont.truetype(path, size)
            except (IOError, OSError):
                continue
        return ImageFont.load_default()

    @staticmethod
    def _build_watermark_overlay(size, text, position, font, small_font, alpha_bg, alpha_text):
        """Draw the watermark text and the corner badge on a transparent RGBA layer."""
        overlay = Image.new("RGBA", size, (0, 0, 0, 0))
        draw = ImageDraw.Draw(overlay)

        bbox = draw.textbbox((0, 0), text, font=font)
        text_w = bbox[2] - bbox[0]
        text_h = bbox[3] - bbox[1]
        padding = 10
        margin = 15
        w, h = size
        pos_map = {
            "top-left": (margin, margin),
            "top-right": (w - text_w - margin, margin),
            "bottom-left": (margin, h - text_h - margin),
            "center": ((w - text_w) // 2, (h - text_h) // 2),
            "bottom-right": (w - text_w - margin, h - text_h - margin),
        }
        # Unknown position names fall back to bottom-right.
        x, y = pos_map.get(position, pos_map["bottom-right"])

        draw.rectangle(
            [x - padding, y - padding, x + text_w + padding, y + text_h + padding],
            fill=(0, 0, 0, alpha_bg)
        )
        draw.text((x, y), text, font=font, fill=(255, 255, 255, alpha_text))

        badge_text = "LEGION AI"
        bbox_badge = draw.textbbox((0, 0), badge_text, font=small_font)
        badge_w = bbox_badge[2] - bbox_badge[0]
        badge_h = bbox_badge[3] - bbox_badge[1]
        draw.rectangle([5, 5, 5 + badge_w + 8, 5 + badge_h + 4], fill=(0, 0, 0, alpha_bg))
        draw.text((9, 7), badge_text, font=small_font, fill=(200, 200, 200, alpha_text))
        return overlay

    def apply_qwatermark(
        self,
        frames,
        strength: float = 0.3,
        text: str = "LEGION",
        position: str = "bottom-right",
        font_size: int = 36,
        opacity: float = 0.3,
    ) -> np.ndarray:
        """Apply LEGION QWatermark to video frames.

        Draws ``text`` at ``position`` plus a small "LEGION AI" badge in the
        top-left corner, both on a translucent black box, and composites
        them over every frame.

        Args:
            frames: Video frames (numpy array or list of PIL Images/arrays)
            strength: Overall watermark intensity (0.0-1.0)
            text: Watermark text
            position: Position on frame ("top-left", "top-right",
                "bottom-left", "center", "bottom-right"; anything else is
                treated as "bottom-right")
            font_size: Font size for watermark text
            opacity: Text opacity (0.0-1.0); multiplied by ``strength``

        Returns:
            Watermarked frames as a uint8 RGB numpy array
        """
        # Clamp the final alpha values so out-of-range inputs cannot yield
        # invalid colour components.
        alpha_text = min(255, max(0, int(255 * opacity * strength)))
        alpha_bg = min(255, max(0, int(40 * strength)))

        pil_frames = [
            f if isinstance(f, Image.Image) else Image.fromarray(_as_uint8(np.array(f)))
            for f in frames
        ]
        if not pil_frames:
            return np.empty((0, 0, 0, 3), dtype=np.uint8)

        # Fonts do not depend on the frame: load them once, not per frame.
        font = self._load_watermark_font(
            [
                "/usr/share/fonts/truetype/dejavu/DejaVuSans-Bold.ttf",
                "/usr/share/fonts/truetype/dejavu/DejaVuSans.ttf",
            ],
            font_size,
        )
        small_font = self._load_watermark_font(
            ["/usr/share/fonts/truetype/dejavu/DejaVuSans.ttf"], 12
        )

        # The overlay only depends on the frame size, so build it once per size.
        overlays = {}
        watermarked = []
        for frame in pil_frames:
            frame = frame.convert("RGBA")
            overlay = overlays.get(frame.size)
            if overlay is None:
                overlay = self._build_watermark_overlay(
                    frame.size, text, position, font, small_font, alpha_bg, alpha_text
                )
                overlays[frame.size] = overlay
            watermarked_frame = Image.alpha_composite(frame, overlay)
            watermarked.append(np.array(watermarked_frame.convert("RGB")))
        return np.stack(watermarked)