""" CineGen AI — Utility Functions Prompt engineering, validation, and config management """ import re import yaml import random from pathlib import Path from typing import Optional, Dict, Any # ────────────────────────────────────────────────────────────────────────────── # Prompt Engineering # ────────────────────────────────────────────────────────────────────────────── QUALITY_BOOSTERS = [ "photorealistic", "8K ultra-detailed", "cinematic quality", "sharp focus", "professional photography", "award-winning photo", "masterpiece", "best quality", "highly detailed", ] NEGATIVE_DEFAULTS = [ "blurry", "low quality", "watermark", "text overlay", "distorted proportions", "artifacts", "pixelated", "static", "frozen frame", "duplicate frames", "bad anatomy", "deformed", "ugly", "oversaturated", ] CINEMATIC_TECHNIQUES = [ "anamorphic lens flare", "shallow depth of field", "bokeh background", "volumetric lighting", "dramatic shadows", "god rays", "color grading", "cinematic color grade", "lens distortion", ] MOTION_DESCRIPTORS = [ "smooth camera movement", "subtle camera parallax", "natural motion blur", "dynamic composition", "stabilized shot", "crane shot movement", ] def validate_prompt(prompt: str, max_length: int = 512) -> str: """ Clean and validate a generation prompt. - Strips extra whitespace - Removes potentially problematic characters - Truncates to max length """ # Strip and normalize whitespace prompt = " ".join(prompt.split()) # Remove special characters that may break tokenizers prompt = re.sub(r'[<>{}|\\^~\[\]]', '', prompt) # Truncate if too long if len(prompt) > max_length: prompt = prompt[:max_length].rsplit(' ', 1)[0] return prompt.strip() def enhance_prompt(prompt: str, style: str = "hyperrealism") -> str: """ Automatically enhance a prompt with cinematic quality boosters. Adds technical photography terms, quality descriptors, and motion characteristics appropriate for the selected style. Args: prompt: Original user prompt style: Selected style preset name Returns: Enhanced prompt with quality boosters """ # Don't add if already has quality terms prompt_lower = prompt.lower() additions = [] # Add quality boosters if not present quality_terms = ["photorealistic", "8k", "ultra-detailed", "cinematic"] if not any(term in prompt_lower for term in quality_terms): additions.append(random.choice(QUALITY_BOOSTERS[:4])) # Add cinematic technique based on style if style in ("cinematic_epic", "hyperrealism", "portrait_closeup"): tech = random.choice(CINEMATIC_TECHNIQUES) if tech.split()[0] not in prompt_lower: additions.append(tech) # Add motion descriptor if not any(m.split()[0] in prompt_lower for m in MOTION_DESCRIPTORS): additions.append(random.choice(MOTION_DESCRIPTORS)) if additions: return f"{prompt}, {', '.join(additions)}" return prompt def build_negative_prompt(base_negative: str = "", style: str = "") -> str: """Build a comprehensive negative prompt.""" all_negatives = list(NEGATIVE_DEFAULTS) if base_negative: all_negatives = [base_negative] + all_negatives # Style-specific negatives style_negatives = { "hyperrealism": ["cartoon", "anime", "painting", "illustration"], "documentary": ["studio lighting", "artificial", "staged"], "noir": ["color", "bright", "cheerful"], } all_negatives.extend(style_negatives.get(style, [])) # Deduplicate while preserving order seen = set() unique_negatives = [] for neg in all_negatives: if neg not in seen: seen.add(neg) unique_negatives.append(neg) return ", ".join(unique_negatives) # ────────────────────────────────────────────────────────────────────────────── # Config Management # ────────────────────────────────────────────────────────────────────────────── _config_cache: Dict[str, Any] = {} def load_config(config_path: str = "configs/generation_config.yaml") -> Dict[str, Any]: """Load and cache generation configuration.""" if config_path in _config_cache: return _config_cache[config_path] path = Path(config_path) if not path.exists(): return _get_default_config() with open(path, "r") as f: config = yaml.safe_load(f) _config_cache[config_path] = config return config def _get_default_config() -> Dict[str, Any]: """Return default configuration values.""" return { "model": { "primary": "Wan-AI/Wan2.1-T2V-14B-Diffusers", "refinement": "THUDM/CogVideoX-5b", "i2v": "stabilityai/stable-video-diffusion-img2vid-xt", }, "generation": { "default_fps": 24, "default_frames": 81, "default_resolution": "720p", "scheduler": "euler_ancestral", }, "quality": { "enable_temporal_smoothing": True, "enable_super_resolution": True, "upscale_factor": 2, }, "performance": { "enable_xformers": False, "enable_flash_attention": True, "torch_dtype": "bfloat16", }, } # ────────────────────────────────────────────────────────────────────────────── # Resolution Helpers # ────────────────────────────────────────────────────────────────────────────── RESOLUTION_MAP = { "256p": (256, 256), "360p": (640, 360), "480p": (854, 480), "720p": (1280, 720), "1080p": (1920, 1080), } def parse_resolution(resolution: str) -> tuple: """Parse resolution string to (width, height) tuple.""" return RESOLUTION_MAP.get(resolution, (1280, 720)) def get_vae_compatible_size(width: int, height: int, factor: int = 32) -> tuple: """Round dimensions to nearest VAE-compatible size.""" return ( (width // factor) * factor, (height // factor) * factor, ) def estimate_vram_gb(width: int, height: int, num_frames: int) -> float: """Rough VRAM estimation for a given generation config.""" pixels = width * height # ~0.8 bytes per pixel per frame for bfloat16 latents + overhead raw_gb = (pixels * num_frames * 0.8) / (1024 ** 3) # Model weights: ~28GB for Wan2.1-14B in bf16 model_gb = 28.0 return raw_gb + model_gb