Instructions to use deathlegionteam/LEGION-Video-Gen with libraries, inference providers, notebooks, and local apps. Follow these links to get started.
- Libraries
- Diffusers
How to use deathlegionteam/LEGION-Video-Gen with Diffusers:
pip install -U diffusers transformers accelerate
import torch
from diffusers import DiffusionPipeline

# switch to "mps" for apple devices
pipe = DiffusionPipeline.from_pretrained("deathlegionteam/LEGION-Video-Gen", dtype=torch.bfloat16, device_map="cuda")

prompt = "Astronaut in a jungle, cold color palette, muted colors, detailed, 8k"
image = pipe(prompt).images[0]
- Notebooks
- Google Colab
- Kaggle
| import os | |
| import sys | |
| import time | |
| import logging | |
| import datetime | |
| from pathlib import Path | |
| from typing import Optional, Tuple, List, Union | |
| import warnings | |
| warnings.filterwarnings("ignore") | |
| import numpy as np | |
| from PIL import Image, ImageDraw, ImageFont | |
| import imageio | |
| import imageio_ffmpeg | |
# Configure logging at import time and create the logger used by this module.
logging.basicConfig(
    format='%(asctime)s [%(levelname)s] %(name)s: %(message)s',
    level=logging.INFO,
)
logger = logging.getLogger("LegionVideo")

# Directory that contains this module; every path below is anchored to it.
_MODULE_DIR = os.path.dirname(os.path.abspath(__file__))

# Output directory (created on import if it does not exist yet).
OUTPUT_DIR = os.path.join(_MODULE_DIR, "outputs")
os.makedirs(OUTPUT_DIR, exist_ok=True)

# Model directories: one sub-directory per pipeline flavour.
MODEL_DIR = os.path.join(_MODULE_DIR, "models")
T2V_MODEL_DIR = os.path.join(MODEL_DIR, "t2v")
I2V_MODEL_DIR = os.path.join(MODEL_DIR, "i2v")

# Constants
DEFAULT_NEGATIVE_PROMPT = ""
class MockVideoGenerator:
    """Stand-in generator that renders a synthetic test-pattern video.

    Each frame is a black canvas with a vertical colour bar that sweeps from
    left to right over the clip, plus the prompt and a frame counter drawn
    as text. No model is involved.
    """

    def __init__(self):
        self.device = "cpu"
        logger.info("MockVideoGenerator initialized - will create test pattern videos")

    def generate_video(self, prompt: str, num_frames: int, width: int, height: int) -> np.ndarray:
        """Render the test pattern.

        Args:
            prompt: Text drawn in the top-left corner of every frame.
            num_frames: Number of frames to render (must be >= 1).
            width: Frame width in pixels (must be >= 1).
            height: Frame height in pixels (must be >= 1).

        Returns:
            uint8 array of shape (num_frames, height, width, 3).

        Raises:
            ValueError: If ``num_frames``, ``width`` or ``height`` is < 1.
        """
        # Fail early with a clear message instead of an obscure np.stack /
        # PIL error deep inside the loop.
        if num_frames < 1:
            raise ValueError(f"num_frames must be >= 1, got {num_frames}")
        if width < 1 or height < 1:
            raise ValueError(f"width and height must be >= 1, got {width}x{height}")

        # Loop-invariant geometry of the moving bar.
        bar_width = width // 4
        travel = width - bar_width
        last_index = max(num_frames - 1, 1)  # avoids division by zero for a single frame

        frames = []
        for i in range(num_frames):
            progress = i / last_index
            frame = np.zeros((height, width, 3), dtype=np.uint8)

            # Moving color bar; the three channels are phase-shifted sines.
            bar_x = int(progress * travel)
            phase = progress * 4
            frame[:, bar_x:bar_x + bar_width] = [
                int(128 + 127 * np.sin(phase)),
                int(128 + 127 * np.sin(phase + 2)),
                int(128 + 127 * np.sin(phase + 4)),
            ]

            # Text overlay with prompt and frame counter.
            frame_pil = Image.fromarray(frame)
            draw = ImageDraw.Draw(frame_pil)
            draw.text((10, 10), prompt, fill=(255, 255, 255))
            draw.text((10, height - 30), f"LEGION AI | Frame {i+1}/{num_frames}", fill=(200, 200, 200))
            frames.append(np.array(frame_pil))
        return np.stack(frames)
class LegionVideoGenerator:
    """LEGION Video Generator - High-quality video generation system.

    Features:
    - Text-to-Video generation
    - Image-to-Video generation
    - Temporal enhancement for smooth frame transitions
    - QWatermark system (configurable quality watermark overlay)
    - CPU fallback with mock generation when GPU/model unavailable

    Attributes are set in ``__init__``: ``device`` ("cuda" or "cpu"),
    ``pipe_t2v`` / ``pipe_i2v`` (loaded pipelines or ``None``), ``mock_mode``
    (``True`` when no real model could be loaded) and ``mock_gen``.
    """

    # TrueType fonts tried in order for the main watermark text.
    _WATERMARK_FONTS = (
        "/usr/share/fonts/truetype/dejavu/DejaVuSans-Bold.ttf",
        "/usr/share/fonts/truetype/dejavu/DejaVuSans.ttf",
    )
    # TrueType fonts tried in order for the small corner badge.
    _BADGE_FONTS = ("/usr/share/fonts/truetype/dejavu/DejaVuSans.ttf",)

    def __init__(self, model_path: Optional[str] = None):
        """Detect the device and load models, falling back to the mock generator.

        Args:
            model_path: Optional directory of the T2V checkpoint; defaults to
                ``T2V_MODEL_DIR`` when omitted.
        """
        self.device = self._detect_device()
        self.pipe_t2v = None
        self.pipe_i2v = None
        self.mock_mode = False
        self.mock_gen = None
        logger.info(f"LEGION Video Generator initializing (device: {self.device})")
        # Try loading real models
        if not self._load_models(model_path):
            logger.warning("Real model loading failed - using mock generator fallback")
            self.mock_mode = True
            self.mock_gen = MockVideoGenerator()
        logger.info("LEGION Video Generator initialized successfully")

    def _detect_device(self) -> str:
        """Return "cuda" when torch reports a CUDA device, otherwise "cpu"."""
        try:
            import torch
            if torch.cuda.is_available():
                logger.info(f"GPU detected: {torch.cuda.get_device_name(0)}")
                return "cuda"
        except Exception as e:
            # Best-effort probe: a missing or broken torch install simply
            # means we run on CPU, but keep a trace for debugging.
            logger.debug(f"GPU probe failed: {e}")
        logger.info("No GPU detected - using CPU")
        return "cpu"

    def _check_memory_sufficient(self) -> bool:
        """Return True when at least 20 GB of system RAM is available.

        Uses ``psutil`` when it is importable, otherwise parses
        ``/proc/meminfo``. Returns False when the amount cannot be
        determined.
        """
        try:
            import psutil
            available_gb = psutil.virtual_memory().available / (1024 ** 3)
            logger.info(f"Available system RAM: {available_gb:.1f} GB")
            if available_gb < 20.0:
                logger.warning(
                    f"Insufficient RAM ({available_gb:.1f} GB < 20 GB required) "
                    f"to load 8.3B parameter model - using mock fallback"
                )
                return False
            return True
        except ImportError:
            try:
                with open('/proc/meminfo', 'r') as f:
                    for line in f:
                        if 'MemAvailable' in line:
                            # Second whitespace-separated field is the value in kB.
                            available_kb = int(line.split()[1])
                            available_gb = available_kb / (1024 * 1024)
                            logger.info(f"Available system RAM: {available_gb:.1f} GB")
                            if available_gb < 20.0:
                                logger.warning(
                                    f"Insufficient RAM ({available_gb:.1f} GB < 20 GB) - using mock"
                                )
                                return False
                            return True
            except Exception as e:
                logger.warning(f"Cannot check RAM: {e}")
        logger.warning("Cannot check RAM - defaulting to mock mode on CPU")
        return False

    def _apply_memory_optimizations(self, pipe) -> None:
        """Enable offload, VAE tiling and attention slicing on ``pipe``."""
        # Model CPU offload shuttles sub-models between an accelerator and
        # host RAM, so it is only requested when an accelerator is in use;
        # skipping it on CPU lets the remaining optimizations still apply.
        if self.device != "cpu":
            pipe.enable_model_cpu_offload()
        vae = getattr(pipe, "vae", None)
        if vae is not None and hasattr(vae, "enable_tiling"):
            vae.enable_tiling()
        pipe.enable_attention_slicing()

    def _load_pipeline(self, pipeline_cls, path: str, dtype, label: str):
        """Load one pipeline from ``path``; return it, or None if loading fails."""
        logger.info(f"Loading {label} model from local path: {path}")
        try:
            pipe = pipeline_cls.from_pretrained(path, torch_dtype=dtype)
        except Exception as e:
            logger.warning(f"Could not load {label} model: {e}")
            return None
        try:
            self._apply_memory_optimizations(pipe)
        except Exception as e:
            # The pipeline itself loaded fine; keep it and only report that
            # the optional optimizations could not be enabled.
            logger.warning(f"{label} memory optimizations unavailable: {e}")
        return pipe

    def _load_models(self, model_path: Optional[str] = None) -> bool:
        """Load the T2V (and, if present, I2V) pipelines from local directories.

        Args:
            model_path: Optional T2V checkpoint directory; defaults to
                ``T2V_MODEL_DIR``.

        Returns:
            True when at least one pipeline was loaded. False when the
            required libraries are missing, RAM is insufficient on CPU, or
            the T2V checkpoint (``model_index.json``) is not found.
        """
        try:
            # torch is needed here for the dtype constant; without this import
            # every load attempt died with a swallowed NameError.
            import torch
            from diffusers import HunyuanVideo15Pipeline
        except ImportError as e:
            logger.warning(f"Required modules not available: {e}")
            return False

        # On CPU, check if we have enough memory first
        if self.device == "cpu" and not self._check_memory_sufficient():
            return False

        # Try T2V model from local path only
        t2v_path = model_path or T2V_MODEL_DIR
        if not os.path.exists(os.path.join(t2v_path, "model_index.json")):
            logger.warning(f"T2V model not found at {t2v_path}")
            return False
        self.pipe_t2v = self._load_pipeline(HunyuanVideo15Pipeline, t2v_path, torch.float32, "T2V")

        # Try I2V model from local path only
        # NOTE(review): the I2V checkpoint is loaded with the same pipeline
        # class as T2V and later called with ``image=`` - confirm this class
        # accepts an image argument.
        i2v_path = I2V_MODEL_DIR
        if os.path.exists(os.path.join(i2v_path, "model_index.json")):
            self.pipe_i2v = self._load_pipeline(HunyuanVideo15Pipeline, i2v_path, torch.float32, "I2V")

        return self.pipe_t2v is not None or self.pipe_i2v is not None

    def _run_pipeline(self, pipe, failure_label: str, seed: Optional[int], **call_kwargs):
        """Invoke ``pipe`` with an optional seeded generator; return ``output.frames[0]``.

        Any exception is logged as ``"<failure_label>: <error>"`` and re-raised.
        """
        try:
            import torch
            generator = None
            if seed is not None:
                generator = torch.Generator(device=self.device).manual_seed(seed)
            output = pipe(generator=generator, **call_kwargs)
            return output.frames[0]
        except Exception as e:
            logger.error(f"{failure_label}: {e}")
            raise

    def generate_from_text(
        self,
        prompt: str,
        negative_prompt: str = DEFAULT_NEGATIVE_PROMPT,
        num_frames: int = 49,
        width: int = 480,
        height: int = 480,
        num_inference_steps: int = 50,
        guidance_scale: float = 6.0,
        watermark_strength: float = 0.0,
        seed: Optional[int] = None,
    ) -> str:
        """Generate a video from a text prompt.

        Args:
            prompt: Text description of the video to generate
            negative_prompt: Things to avoid in the video
            num_frames: Number of frames to generate (1-129)
            width, height: Video resolution
            num_inference_steps: Diffusion inference steps
            guidance_scale: Classifier-free guidance scale
            watermark_strength: QWatermark opacity (0.0 = none, 1.0 = full)
            seed: Random seed for reproducibility

        Returns:
            Path to the generated MP4 file

        Raises:
            RuntimeError: If not in mock mode and no T2V pipeline is loaded.
        """
        logger.info(f"T2V: '{prompt[:60]}...' ({num_frames}f, {width}x{height}, {num_inference_steps}steps)")
        if self.mock_mode:
            return self._generate_mock_video(prompt, num_frames, width, height, watermark_strength, "t2v")
        if self.pipe_t2v is None:
            raise RuntimeError("T2V pipeline not available")
        frames = self._run_pipeline(
            self.pipe_t2v,
            "T2V generation failed",
            seed,
            prompt=prompt,
            negative_prompt=negative_prompt,
            num_frames=num_frames,
            width=width,
            height=height,
            num_inference_steps=num_inference_steps,
            guidance_scale=guidance_scale,
        )
        return self._export_video(frames, prompt, watermark_strength, "t2v")

    def generate_from_image(
        self,
        image_path: str,
        prompt: str = "",
        negative_prompt: str = DEFAULT_NEGATIVE_PROMPT,
        num_frames: int = 49,
        width: int = 480,
        height: int = 480,
        num_inference_steps: int = 50,
        guidance_scale: float = 6.0,
        watermark_strength: float = 0.0,
        seed: Optional[int] = None,
    ) -> str:
        """Generate a video from an input image + text prompt.

        Args:
            image_path: Path to the conditioning image
            prompt: Text description of motion/action
            negative_prompt: Things to avoid
            num_frames, width, height, num_inference_steps, guidance_scale: Generation params
            watermark_strength: QWatermark opacity
            seed: Random seed

        Returns:
            Path to the generated MP4 file

        Raises:
            FileNotFoundError: If ``image_path`` does not exist (not checked
                in mock mode, where the image is not read).
            RuntimeError: If neither an I2V nor a T2V pipeline is loaded.
        """
        logger.info(f"I2V from '{image_path}': '{prompt[:60]}...'")
        if self.mock_mode:
            return self._generate_mock_video(prompt, num_frames, width, height, watermark_strength, "i2v")

        if not os.path.exists(image_path):
            raise FileNotFoundError(f"Image not found: {image_path}")
        # convert() returns an independent in-memory copy, so the file handle
        # can be closed as soon as the with-block ends.
        with Image.open(image_path) as source_image:
            input_image = source_image.convert("RGB")

        shared_kwargs = dict(
            negative_prompt=negative_prompt,
            num_frames=num_frames,
            width=width,
            height=height,
            num_inference_steps=num_inference_steps,
            guidance_scale=guidance_scale,
        )
        if self.pipe_i2v is not None:
            frames = self._run_pipeline(
                self.pipe_i2v,
                "I2V generation failed",
                seed,
                image=input_image,
                prompt=prompt,
                **shared_kwargs,
            )
        elif self.pipe_t2v is not None:
            # Use T2V pipeline as fallback; the image itself is not passed on.
            logger.warning("I2V pipeline not available, falling back to T2V with prompt style")
            enhanced_prompt = prompt + ", based on the provided image style"
            frames = self._run_pipeline(
                self.pipe_t2v,
                "T2V fallback generation failed",
                seed,
                prompt=enhanced_prompt,
                **shared_kwargs,
            )
        else:
            raise RuntimeError("No video generation pipeline available")
        return self._export_video(frames, prompt, watermark_strength, "i2v")

    def _generate_mock_video(
        self, prompt: str, num_frames: int, width: int, height: int,
        watermark_strength: float, mode: str
    ) -> str:
        """Render a test-pattern clip with the mock generator and export it."""
        logger.info("Using mock generator (model unavailable)")
        frames = self.mock_gen.generate_video(prompt, num_frames, width, height)
        return self._export_video(frames, prompt, watermark_strength, mode)

    def _temporal_enhancement(self, frames: np.ndarray, strength: float = 0.5) -> np.ndarray:
        """Apply temporal smoothing to reduce frame-to-frame artifacts.

        Each interior frame is replaced by the unweighted mean of a small
        window of neighbouring frames (a moving average, not a Gaussian).
        The first and last frames are left untouched.

        Args:
            frames: Video frames as numpy array (T, H, W, C)
            strength: Smoothing intensity (0.0 = none, 1.0 = maximum)

        Returns:
            Temporally smoothed frames. The input is returned unchanged when
            it is not a 4-D numpy array, when ``strength`` is not positive,
            or when there are fewer than three frames.
        """
        if not isinstance(frames, np.ndarray) or frames.ndim != 4:
            return frames
        if strength <= 0:
            return frames  # documented "0.0 = none"
        num_frames = frames.shape[0]
        if num_frames < 3:
            return frames  # Not enough frames to smooth

        # Odd window size of at least 3, capped by the clip length.
        kernel_size = max(3, int(5 * strength))
        if kernel_size % 2 == 0:
            kernel_size += 1
        half_k = min(kernel_size // 2, num_frames // 2)

        # Simple temporal blur: average adjacent frames. Means are written
        # back into a copy with the input dtype.
        smoothed = frames.copy()
        for t in range(1, num_frames - 1):
            left = max(0, t - half_k)
            right = min(num_frames, t + half_k + 1)
            smoothed[t] = np.mean(frames[left:right], axis=0)
        return smoothed

    @staticmethod
    def _to_uint8(array: np.ndarray) -> np.ndarray:
        """Return ``array`` as uint8; non-uint8 data is treated as [0, 1] and scaled."""
        if array.dtype != np.uint8:
            array = (np.clip(array, 0, 1) * 255).astype(np.uint8)
        return array

    def _export_video(
        self, frames, prompt: str, watermark_strength: float, mode: str
    ) -> str:
        """Post-process ``frames`` and write them to an MP4 in ``OUTPUT_DIR``.

        Args:
            frames: Numpy array of frames, or an iterable of PIL images /
                array-likes.
            prompt: Used (sanitised, first 30 characters) in the file name.
            watermark_strength: QWatermark strength; the watermark is applied
                only when this is greater than 0.
            mode: Tag embedded in the file name (e.g. "t2v" or "i2v").

        Returns:
            Path to the written MP4 file.
        """
        # Apply temporal enhancement
        frames = self._temporal_enhancement(frames)
        # Apply QWatermark
        if watermark_strength > 0:
            frames = self.apply_qwatermark(frames, strength=watermark_strength)

        # Generate filename
        timestamp = datetime.datetime.now().strftime("%Y%m%d_%H%M%S")
        safe_prompt = "".join(c if c.isalnum() or c in " _-" else "_" for c in prompt[:30])
        filename = f"legion_{mode}_{timestamp}_{safe_prompt}.mp4"
        output_path = os.path.join(OUTPUT_DIR, filename)

        # Normalise everything to uint8 frames before encoding.
        if isinstance(frames, np.ndarray):
            video = self._to_uint8(frames)
        else:
            video = [
                np.array(f.convert("RGB")) if hasattr(f, 'mode') else self._to_uint8(np.array(f))
                for f in frames
            ]

        # Export frames to MP4
        imageio.mimsave(output_path, video, fps=8, codec='libx264',
                        quality=8, pixelformat='yuv420p')
        file_size = os.path.getsize(output_path)
        logger.info(f"Video exported: {output_path} ({file_size / 1024:.1f} KB)")
        return output_path

    @staticmethod
    def _load_font(size: int, candidates):
        """Return the first loadable TrueType font, else PIL's default font."""
        for path in candidates:
            try:
                return ImageFont.truetype(path, size)
            except (IOError, OSError):
                continue
        return ImageFont.load_default()

    @staticmethod
    def _build_watermark_overlay(size, text, position, font, small_font, alpha_bg, alpha_text):
        """Draw the watermark text and corner badge on a transparent RGBA layer."""
        overlay = Image.new("RGBA", size, (0, 0, 0, 0))
        draw = ImageDraw.Draw(overlay)

        bbox = draw.textbbox((0, 0), text, font=font)
        text_w = bbox[2] - bbox[0]
        text_h = bbox[3] - bbox[1]
        padding = 10
        margin = 15
        w, h = size
        pos_map = {
            "top-left": (margin, margin),
            "top-right": (w - text_w - margin, margin),
            "bottom-left": (margin, h - text_h - margin),
            "center": ((w - text_w) // 2, (h - text_h) // 2),
            "bottom-right": (w - text_w - margin, h - text_h - margin),
        }
        # Unknown positions fall back to bottom-right.
        x, y = pos_map.get(position, pos_map["bottom-right"])

        draw.rectangle(
            [x - padding, y - padding, x + text_w + padding, y + text_h + padding],
            fill=(0, 0, 0, alpha_bg)
        )
        draw.text((x, y), text, font=font, fill=(255, 255, 255, alpha_text))

        # Small fixed badge in the top-left corner.
        badge_text = "LEGION AI"
        bbox_badge = draw.textbbox((0, 0), badge_text, font=small_font)
        badge_w = bbox_badge[2] - bbox_badge[0]
        badge_h = bbox_badge[3] - bbox_badge[1]
        draw.rectangle([5, 5, 5 + badge_w + 8, 5 + badge_h + 4], fill=(0, 0, 0, alpha_bg))
        draw.text((9, 7), badge_text, font=small_font, fill=(200, 200, 200, alpha_text))
        return overlay

    def apply_qwatermark(
        self,
        frames,
        strength: float = 0.3,
        text: str = "LEGION",
        position: str = "bottom-right",
        font_size: int = 36,
        opacity: float = 0.3,
    ) -> np.ndarray:
        """Apply LEGION QWatermark to video frames.

        The QWatermark is a semi-transparent quality assurance marker
        that indicates the video was generated by the LEGION system.

        Args:
            frames: Video frames (numpy array or list of PIL Images)
            strength: Overall watermark intensity (0.0-1.0)
            text: Watermark text
            position: Position on frame ("top-left", "top-right",
                "bottom-left", "center" or "bottom-right"; anything else is
                treated as "bottom-right")
            font_size: Font size for watermark text
            opacity: Text opacity (0.0-1.0), multiplied by ``strength``

        Returns:
            Watermarked frames as a uint8 RGB numpy array
        """
        opacity = opacity * strength
        # Clamp to the valid 8-bit alpha range so out-of-range inputs cannot
        # produce invalid colour tuples.
        alpha_bg = max(0, min(255, int(40 * strength)))
        alpha_text = max(0, min(255, int(255 * opacity)))

        # Fonts do not depend on the frame, so load them once.
        font = self._load_font(font_size, self._WATERMARK_FONTS)
        small_font = self._load_font(12, self._BADGE_FONTS)

        overlays = {}  # frame size -> overlay, so same-sized frames share one layer
        watermarked = []
        for raw in frames:
            if hasattr(raw, "mode"):
                frame = raw  # already a PIL image
            else:
                # Float frames must become uint8 before PIL can wrap them.
                frame = Image.fromarray(self._to_uint8(np.asarray(raw)))
            frame = frame.convert("RGBA")

            overlay = overlays.get(frame.size)
            if overlay is None:
                overlay = self._build_watermark_overlay(
                    frame.size, text, position, font, small_font, alpha_bg, alpha_text
                )
                overlays[frame.size] = overlay

            watermarked_frame = Image.alpha_composite(frame, overlay)
            watermarked.append(np.array(watermarked_frame.convert("RGB")))
        return np.stack(watermarked)