import os import sys import re import random import math # THIS IS THE FIX - PART 1 os.environ['GRADIO_SUPPRESS_PROGRESS'] = 'true' # THIS IS THE FIX - PART 2: Clean up console logs from Gradio import logging logging.getLogger('gradio').setLevel(logging.ERROR) import cv2 import numpy as np import gradio as gr from gradio import Progress import shutil import subprocess from PIL import Image, ImageDraw, ImageFont, ImageOps, ImageEnhance from datetime import datetime from threading import Lock import base64 import io # --- Dependency Check --- try: from controlnet_aux import ( CannyDetector, MLSDdetector, HEDdetector, LineartDetector, OpenposeDetector, NormalBaeDetector ) from gradio_client import Client from rembg import remove import librosa # NEW: Added for the audio chopping feature from pydub import AudioSegment from pydub.silence import split_on_silence except ImportError as e: print("="*80) print(f"ERROR: Missing dependency -> {e}") print("Please install all required packages by running:") print("pip install -r requirements.txt") print("(Note: The new feature requires 'pydub'. Make sure it's in your requirements file.)") print("="*80) sys.exit(1) # --- AI Model Dependency Check --- try: import whisper except ImportError: print("="*80) print("WARNING: 'openai-whisper' not installed. The Transcription tab will be disabled.") print("To enable it, run: pip install -U openai-whisper") print("="*80) whisper = None # --- Slo-Mo & Enhance AI Dependency Check (SIMPLIFIED) --- try: from rife_ncnn_vulkan_python import Rife ENHANCE_AI_AVAILABLE = True except ImportError: print("="*80) print("WARNING: 'rife-ncnn-vulkan-python' not found.") print("The AI-Enhanced option in 'Slo-Mo & Enhance' will be disabled.") print("To enable it, run: pip install rife-ncnn-vulkan-python") print("="*80) Rife = None ENHANCE_AI_AVAILABLE = False # --- Global Variables & Setup --- TEMP_DIR = "temp_gradio" os.makedirs(TEMP_DIR, exist_ok=True) model_load_lock = Lock() loaded_detectors = {} whisper_model = None whisper_model_name = "" rife_model = None # REMOVED realesrgan_model # --- Default Presets for Transfer Tab (Flat Dictionary) --- DEFAULT_LINK_PRESETS = { # Text To Image "FLUX.1-schnell (black-forest-labs)": "https://huggingface.co/spaces/black-forest-labs/FLUX.1-schnell", "FLUX.1-schnell (Rooc)": "https://huggingface.co/spaces/Rooc/FLUX.1-schnell", "FLUX.1-schnell (evalstate)": "https://huggingface.co/spaces/evalstate/flux1_schnell", "FLUX.1-schnell (hysts-mcp)": "https://huggingface.co/spaces/hysts-mcp/FLUX.1-schnell", "FLUX.1-schnell (cbensimon)": "https://huggingface.co/spaces/cbensimon/FLUX-1-schnell-mcp", "FLUX.1-dev": "https://huggingface.co/spaces/black-forest-labs/FLUX.1-dev", "FLUX.1-dev-quantized": "https://huggingface.co/spaces/multimodalart/FLUX.1-dev-quantized", "FLUX.1-dev_NotASI": "https://huggingface.co/spaces/NotASI/FLUX.1-dev", "FLUX.1-dev_hysts": "https://huggingface.co/spaces/hysts-mcp/FLUX.1-dev", "HiDream-I1-Dev": "https://huggingface.co/spaces/HiDream-ai/HiDream-I1-Dev", "UnfilteredAI-NSFW-gen-v2": "https://huggingface.co/spaces/armen425221356/UnfilteredAI-NSFW-gen-v2_self_parms", "InfiniteYou-FLUX": "https://huggingface.co/spaces/ByteDance/InfiniteYou-FLUX", "Stable Diffusion 3.5 Large (arad1367)": "https://huggingface.co/spaces/arad1367/Stable_Diffusion_3_5_Large_Customized", "Stable Diffusion 3.5 Large Turbo (doevent)": "https://huggingface.co/spaces/doevent/stable-diffusion-3.5-large-turbo", # Virtual Try-On & Character "OutfitAnyone": "https://huggingface.co/spaces/HumanAIGC/OutfitAnyone", "Kolors Virtual Try-On": "https://huggingface.co/spaces/Kwai-Kolors/Kolors-Virtual-Try-On", "Miragic Virtual Try-On": "https://huggingface.co/spaces/Miragic-AI/Miragic-Virtual-Try-On", "OutfitAnyway": "https://huggingface.co/spaces/selfit-camera/OutfitAnyway", "IDM-VTON": "https://huggingface.co/spaces/yisol/IDM-VTON", "InstantCharacter": "https://huggingface.co/spaces/InstantX/InstantCharacter", "InstantID": "https://huggingface.co/spaces/InstantX/InstantID", # AI Lip-Sync & Talking Avatars "LivePortrait": "https://huggingface.co/spaces/Han-123/LivePortrait", "LivePortrait (CPU)": "https://huggingface.co/spaces/K00B404/LivePortrait_cpu", "D-ID Live Portrait AI": "https://www.d-id.com/liveportrait-4/", "Synthesia Avatars": "https://www.synthesia.io/features/avatars", "Papercup": "https://www.papercup.com/", "Hedra": "https://www.hedra.com", "LemonSlice": "https://lemonslice.com", "Vozo AI": "https://www.vozo.ai/lip-sync", "Gooey AI Lipsync": "https://gooey.ai/Lipsync", "Sync.so": "https://sync.so", "LipDub AI": "https://www.lipdub.ai", "Magic Hour": "https://magichour.ai", "Lifelike AI": "https://www.lifelikeai.io", "DeepMotion": "https://www.deepmotion.com", "Elai.io": "https://elai.io", "Rephrase.ai": "https://www.rephrase.ai", "Colossyan": "https://www.colossyan.com", "HeyGen (Movio)": "https://www.heygen.com", "Murf Studio": "https://murf.ai", # Image Editing & Upscaling "FLUX Fill/Outpaint": "https://huggingface.co/spaces/multimodalart/flux-fill-outpaint", "ReSize Image Outpainting": "https://huggingface.co/spaces/VIDraft/ReSize-Image-Outpainting", "IC-Light (Relighting)": "https://huggingface.co/spaces/lllyasviel/IC-Light", "IC-Light v2-vary": "https://huggingface.co/spaces/lllyasviel/iclight-v2-vary", "Kontext Relight": "https://huggingface.co/spaces/kontext-community/kontext-relight", "SUPIR Upscaler": "https://huggingface.co/spaces/Fabrice-TIERCELIN/SUPIR", # Video Generation & FramePacks "Framepacks (atunc29)": "https://huggingface.co/spaces/atunc29/Framepacks", "Framepack i2v (ginigen)": "https://huggingface.co/spaces/ginigen/framepack-i2v", "Framepack i2v (beowcow)": "https://huggingface.co/spaces/beowcow/framepack-i2v", "Framepack i2v (lisonallen)": "https://huggingface.co/spaces/lisonallen/framepack-i2v", "FramePack F1 (Latyrine)": "https://huggingface.co/spaces/Latyrine/FramePack-F1", "FramePack F1 (linoyts)": "https://huggingface.co/spaces/linoyts/FramePack-F1", "FramePack Rotate (tori29umai)": "https://huggingface.co/spaces/tori29umai/FramePack_rotate_landscape", "FramePack Rotate (bep40)": "https://huggingface.co/spaces/bep40/FramePack_rotate_landscape", "FramePack Rotate (VIDraft)": "https://huggingface.co/spaces/VIDraft/FramePack_rotate_landscape", "Framepack-H111 (rahul7star)": "https://huggingface.co/spaces/rahul7star/Framepack-H111", "FLUX.1 Kontext Dev": "https://huggingface.co/spaces/black-forest-labs/FLUX.1-Kontext-Dev", "Wan2-1-fast": "https://huggingface.co/spaces/multimodalart/wan2-1-fast", "LTX-video-distilled": "https://huggingface.co/spaces/Lightricks/ltx-video-distilled", "RunwayML": "https://app.runwayml.com/video-tools/teams/rinaabdine1/ai-tools/generate", "Pika Labs": "https://pika.art/", "Kling AI": "https://app.klingai.com/global/image-to-video/frame-mode", # Video Interpolation & Slow Motion "RIFE (remzloev)": "https://huggingface.co/spaces/remzloev/Rife", "VFI Converter (Agung1453)": "https://huggingface.co/spaces/Agung1453/Video-Frame-Interpolation-Converter", "ZeroGPU Upscaler/Interpolation": "https://huggingface.co/spaces/inoculatemedia/zerogpu-upscaler-interpolation", "Frame Interpolation (meta-artem)": "https://huggingface.co/spaces/meta-artem/frame-interpolation", "Video Frame Interpolation (guardiancc)": "https://huggingface.co/spaces/guardiancc/video_frame_interpolation", "Video Frame Interpolation (freealise)": "https://huggingface.co/spaces/freealise/video_frame_interpolation", "Framer (wwen1997)": "https://huggingface.co/spaces/wwen1997/Framer", "Inter4k VideoInterpolator": "https://huggingface.co/spaces/vimleshc57/Inter4k_VideoInterpolator", # AnimateDiff & Advanced Animation "AnimateDiff Lightning (ByteDance)": "https://huggingface.co/spaces/ByteDance/AnimateDiff-Lightning", "AnimateDiff Lightning (SahaniJi)": "https://huggingface.co/spaces/SahaniJi/AnimateDiff-Lightning", "AnimateDiff (fatima14)": "https://huggingface.co/spaces/fatima14/AnimateDiff", "AnimateDiff Video Gen (faizanR)": "https://huggingface.co/spaces/faizanR/animatediff-video-generator", "Text-to-Animation Fast (MisterProton)": "https://huggingface.co/spaces/MisterProton/text-to-Animation-Fast-AnimateDiff", "Text-to-Animation Fast (Rowdy013)": "https://huggingface.co/spaces/Rowdy013/text-to-Animation-Fast", # StyleGAN & Portrait Motion "StyleGAN-Human Interpolation (hysts)": "https://huggingface.co/spaces/hysts/StyleGAN-Human-Interpolation", "StyleGAN-Human (Gradio-Blocks)": "https://huggingface.co/spaces/Gradio-Blocks/StyleGAN-Human", # Film & Style Models "MGM-Film-Diffusion (tonyassi)": "https://huggingface.co/spaces/tonyassi/MGM-Film-Diffusion", "CineDiffusion (takarajordan)": "https://huggingface.co/spaces/takarajordan/CineDiffusion", "FLUX Film Foto (MartsoBodziu1994)": "https://huggingface.co/spaces/MartsoBodziu1994/alvdansen-flux_film_foto", "FLUX Style Shaping": "https://huggingface.co/spaces/multimodalart/flux-style-shaping", "Film (Stijnijzelenberg)": "https://huggingface.co/spaces/Stijnijzelenberg/film", "Film Eras (abbiewoodbridge)": "https://huggingface.co/spaces/abbiewoodbridge/Film_Eras", "Film Genre Classifier (Rezuwan)": "https://huggingface.co/spaces/Rezuwan/film_genre_classifier", "RunwayML (Faizbulbul)": "https://huggingface.co/spaces/Faizbulbul/Runwaymlfaiz", # Text-to-3D "Step1X-3D": "https://huggingface.co/spaces/stepfun-ai/Step1X-3D", "TRELLIS TextTo3D (PUM4CH3N)": "https://huggingface.co/spaces/PUM4CH3N/TRELLIS_TextTo3D", "TRELLIS TextTo3D (cavargas10)": "https://huggingface.co/spaces/cavargas10/TRELLIS-Texto3D", "TRELLIS TextTo3D (dkatz2391)": "https://huggingface.co/spaces/dkatz2391/TRELLIS_TextTo3D_Try2", "Sparc3D": "https://huggingface.co/spaces/ilcve21/Sparc3D", "Hunyuan3D-2.1": "https://huggingface.co/spaces/tencent/Hunyuan3D-2.1", # Image Captioning & Interrogation "BLIP-2 (hysts)": "https://huggingface.co/spaces/hysts/BLIP2", "BLIP-3o": "https://huggingface.co/spaces/BLIP3o/blip-3o", "Blip-Dalle3 (DarwinAnim8or)": "https://huggingface.co/spaces/DarwinAnim8or/Blip-Dalle3", "BLIP API (Jonu1)": "https://huggingface.co/spaces/Jonu1/blip-image-captioning-api", "BLIP API (muxiddin19)": "https://huggingface.co/spaces/muxiddin19/blip-image-captioning-api", # Diffusion & Sketching Tools "DiffSketcher (SVGRender)": "https://huggingface.co/spaces/SVGRender/DiffSketcher", "Diffusion WikiArt (kaupane)": "https://huggingface.co/spaces/kaupane/diffusion-wikiart", "Diffusers Image Fill (OzzyGT)": "https://huggingface.co/spaces/OzzyGT/diffusers-image-fill", "Diffusers Fast Inpaint (OzzyGT)": "https://huggingface.co/spaces/OzzyGT/diffusers-fast-inpaint", # Audio & Voice Tools "ThinkSound (FunAudioLLM)": "https://huggingface.co/spaces/FunAudioLLM/ThinkSound", "TTS Unlimited (NihalGazi)": "https://huggingface.co/spaces/NihalGazi/Text-To-Speech-Unlimited", "Voice Clon (tonyassi)": "https://huggingface.co/spaces/tonyassi/voice-clon", # Scripting & Writing Tools "SKRIPTZ (skylinkd)": "https://huggingface.co/spaces/skylinkd/SKRIPTZ", # AI Frameworks & Platforms "Hugging Face Hub": "https://huggingface.co", "Hugging Face Transformers": "https://huggingface.co/docs/transformers/en/index", "Hugging Face Inference API": "https://huggingface.co/inference-api/", # Miscellaneous Video Tools "SpatialTrackerV2 (Yuxihenry)": "https://huggingface.co/spaces/Yuxihenry/SpatialTrackerV2", "MTVCraft (BAAI)": "https://huggingface.co/spaces/BAAI/MTVCraft", # Miscellaneous Tools "EBSynth (NihalGazi)": "https://huggingface.co/spaces/NihalGazi/EBSynth", "MoodSpace (huzey)": "https://huggingface.co/spaces/huzey/MoodSpace", "TR0N (Layer6)": "https://huggingface.co/spaces/Layer6/TR0N", "TUTOR (nathannarrik)": "https://huggingface.co/spaces/nathannarrik/TUTOR", "Sport Model 1 (CHEN11102)": "https://huggingface.co/spaces/CHEN11102/sportmodel1", "VBench Leaderboard (Vchitect)": "https://huggingface.co/spaces/Vchitect/VBench_Leaderboard", } # --- Model Loading --- DETECTOR_CONFIG = { "Canny": {"class": CannyDetector, "args": {}}, "Lineart": {"class": LineartDetector, "args": {"pretrained_model_or_path": "lllyasviel/Annotators"}}, "MLSD": {"class": MLSDdetector, "args": {"pretrained_model_or_path": "lllyasviel/Annotators"}}, "OpenPose": {"class": OpenposeDetector, "args": {"pretrained_model_or_path": "lllyasviel/Annotators"}}, "NormalBAE": {"class": NormalBaeDetector, "args": {"pretrained_model_or_path": "lllyasviel/Annotators"}}, "SoftEdge (HED)": {"class": HEDdetector, "args": {"pretrained_model_or_path": "lllyasviel/Annotators"}}, } def get_detector(name): with model_load_lock: if name not in loaded_detectors: print(f"Loading {name} model...") config = DETECTOR_CONFIG[name] if "pretrained_model_or_path" in config["args"]: detector_class = config["class"] loaded_detectors[name] = detector_class.from_pretrained(**config["args"]) else: loaded_detectors[name] = config["class"](**config["args"]) print(f"{name} model loaded.") return loaded_detectors[name] def load_whisper_model(model_name="base"): global whisper_model, whisper_model_name if whisper: with model_load_lock: if whisper_model is None or whisper_model_name != model_name: print(f"Loading Whisper model '{model_name}'... (This may download files on first run)") whisper_model = whisper.load_model(model_name, device="cpu") whisper_model_name = model_name print("Whisper model loaded.") return whisper_model return None def load_enhance_ai_models(): """Load RIFE model if it is not already loaded.""" global rife_model if not ENHANCE_AI_AVAILABLE: return with model_load_lock: if rife_model is None: print("Loading RIFE model for frame interpolation...") rife_model = Rife(gpuid=0, model="rife-v4.6", num_threads=4, tta_mode=False) print("RIFE model loaded.") get_detector("Canny") # Pre-load Canny detector # --- Utility Functions --- def parse_color(color_str): """ Parses a color string from Gradio's ColorPicker. It can handle hex strings ('#RRGGBB') or the problematic rgba float format ('rgba(r,g,b,a)'). Returns a tuple (r, g, b) for PIL. """ if not isinstance(color_str, str): return color_str # Should already be a tuple or other valid format if color_str.startswith('rgba'): parts = re.findall(r"[\d\.]+", color_str) if len(parts) >= 3: return (int(float(parts[0])), int(float(parts[1])), int(float(parts[2]))) # Handle standard hex '#RRGGBB' if color_str.startswith('#'): hex_color = color_str.lstrip('#') return tuple(int(hex_color[i:i+2], 16) for i in (0, 2, 4)) return color_str def rotate_image(image, rotation): if rotation == "90 Degrees Clockwise": return cv2.rotate(image, cv2.ROTATE_90_CLOCKWISE) elif rotation == "90 Degrees Counter-Clockwise": return cv2.rotate(image, cv2.ROTATE_90_COUNTERCLOCKWISE) elif rotation == "180 Degrees": return cv2.rotate(image, cv2.ROTATE_180) return image def manipulate_image(image, operation): if image is None: raise gr.Error("Please upload an image first.") if operation == "Invert Colors": return cv2.bitwise_not(image) elif operation == "Flip Horizontal": return cv2.flip(image, 1) elif operation == "Flip Vertical": return cv2.flip(image, 0) elif operation == "Rotate 90° Right": return cv2.rotate(image, cv2.ROTATE_90_CLOCKWISE) elif operation == "Rotate 90° Left": return cv2.rotate(image, cv2.ROTATE_90_COUNTERCLOCKWISE) else: return image def manipulate_video(video_path, operation): if not video_path: raise gr.Error("Please upload a video first.") timestamp = datetime.now().strftime("%Y%m%d_%H%M%S") output_video_path = os.path.join(TEMP_DIR, f"manipulated_video_{timestamp}.mp4") cap = cv2.VideoCapture(video_path) if not cap.isOpened(): raise gr.Error("Error opening video file.") width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH)) height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT)) fps = cap.get(cv2.CAP_PROP_FPS) if fps == 0: fps = 30 frame_count = int(cap.get(cv2.CAP_PROP_FRAME_COUNT)) fourcc = cv2.VideoWriter_fourcc(*'mp4v') out_width, out_height = width, height if operation in ["Rotate 90° Right", "Rotate 90° Left"]: out_width, out_height = height, width writer = cv2.VideoWriter(output_video_path, fourcc, fps, (out_width, out_height)) for _ in range(frame_count): ret, frame = cap.read() if not ret: break processed_frame = manipulate_image(frame, operation) writer.write(processed_frame) cap.release() writer.release() return output_video_path def get_media_duration(media_path): if not media_path or not os.path.exists(media_path): return 0.0 # --- METHOD 1: Fast Metadata Probe (for well-formed files) --- try: cmd = ["ffprobe", "-v", "error", "-show_entries", "format=duration", "-of", "default=noprint_wrappers=1:nokey=1", media_path] result = subprocess.run(cmd, capture_output=True, text=True, check=True, timeout=10) return float(result.stdout.strip()) except Exception: # This method failed, likely due to malformed metadata. Proceed to the robust method. pass # --- METHOD 2: Robust Full Scan (for problematic files) --- print(f"Warning: Fast duration check failed for {os.path.basename(media_path)}. Performing robust scan (this may take a moment)...") try: cmd = ["ffmpeg", "-i", media_path, "-f", "null", "-"] # We need to capture stderr, where ffmpeg writes its progress result = subprocess.run(cmd, capture_output=True, text=True, timeout=60) # Search for the final 'time=' stamp in ffmpeg's output matches = re.findall(r"time=(\d{2}):(\d{2}):(\d{2})\.(\d{2})", result.stderr) if matches: last_match = matches[-1] hours, minutes, seconds, hundredths = map(int, last_match) total_seconds = (hours * 3600) + (minutes * 60) + seconds + (hundredths / 100.0) print(f"Robust scan successful. Detected duration: {total_seconds:.2f}s") return total_seconds else: # If even this fails, the file is likely very corrupt print(f"Error: Robust duration scan also failed for {os.path.basename(media_path)}.") return 0.0 except Exception as e: print(f"An unexpected error occurred during robust scan for {media_path}: {e}") return 0.0 def get_video_dimensions(video_path): if not video_path: return 0, 0 try: cap = cv2.VideoCapture(video_path) if not cap.isOpened(): return 0, 0 width, height = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH)), int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT)) cap.release() return width, height except Exception: return 0, 0 def get_video_fps(video_path): if not video_path: return 24.0 try: cap = cv2.VideoCapture(video_path) if not cap.isOpened(): return 24.0 fps = cap.get(cv2.CAP_PROP_FPS) cap.release() return fps if fps > 0 else 24.0 except Exception: return 24.0 def has_audio_stream(video_path): """Checks if a video file has at least one audio stream.""" if not video_path: return False try: cmd = [ "ffprobe", "-v", "error", "-select_streams", "a", "-show_entries", "stream=codec_type", "-of", "default=noprint_wrappers=1:nokey=1", video_path ] result = subprocess.run(cmd, capture_output=True, text=True, check=True) return result.stdout.strip() != "" except (subprocess.CalledProcessError, FileNotFoundError): return False def run_ffmpeg_command(cmd, desc="Processing with FFMPEG..."): try: print(f"Running FFMPEG command: {' '.join(cmd)}") process = subprocess.run( cmd, capture_output=True, text=True, encoding='utf-8', check=False ) if process.returncode != 0: full_output = f"--- FFMPEG & GRADIO ERROR LOG ---\n\n" \ f"FFMPEG COMMAND:\n{' '.join(cmd)}\n\n" \ f"FFMPEG STDERR:\n{process.stderr}\n\n" \ f"FFMPEG STDOUT:\n{process.stdout}" raise subprocess.CalledProcessError(process.returncode, cmd, output=full_output) except subprocess.CalledProcessError as e: raise gr.Error(f"FFMPEG failed!\n\nDetails:\n{e.output}") except FileNotFoundError: raise gr.Error("FFMPEG not found. Please ensure ffmpeg is installed and in your system's PATH.") def batch_image_processor(files, processing_function, job_name, **kwargs): if not files: raise gr.Error("Please upload at least one image.") timestamp = datetime.now().strftime("%Y%m%d_%H%M%S") job_temp_dir = os.path.join(TEMP_DIR, f"{job_name}_{timestamp}"); os.makedirs(job_temp_dir, exist_ok=True) output_paths = [] for file_obj in files: try: base, _ = os.path.splitext(os.path.basename(file_obj.name)) if job_name == "zoom_videos": output_filename = f"{base}.mp4" elif job_name == "bg_removed": output_filename = f"{base}.png" elif job_name == "cropped": output_filename = f"{base}_cropped.png" else: output_filename = os.path.basename(file_obj.name) output_path = os.path.join(job_temp_dir, output_filename) processing_function(input_path=file_obj.name, output_path=output_path, **kwargs) output_paths.append(output_path) except Exception as e: print(f"Skipping file {file_obj.name} due to error: {e}") continue if not output_paths: shutil.rmtree(job_temp_dir) raise gr.Error("No images could be processed from the batch.") zip_base_name = os.path.join(TEMP_DIR, f"{job_name}_archive_{timestamp}") zip_path = shutil.make_archive(zip_base_name, 'zip', job_temp_dir) return output_paths, zip_path, job_temp_dir def process_batch_images_with_detector(files, detector_name): detector = get_detector(detector_name) def apply_detector(input_path, output_path, **kwargs): with Image.open(input_path).convert("RGB") as img: processed = detector(img, detect_resolution=512, image_resolution=1024) processed.save(output_path) output_paths, zip_path, _ = batch_image_processor(files, apply_detector, f"controlnet_{detector_name}") return output_paths, zip_path def process_video_with_detector(video_path, detector_name): if not video_path: raise gr.Error("Please upload a video first.") detector = get_detector(detector_name) timestamp = datetime.now().strftime("%Y%m%d_%H%M%S") job_temp_dir = os.path.join(TEMP_DIR, f"job_{timestamp}") input_frames_dir, output_frames_dir = os.path.join(job_temp_dir, "input_frames"), os.path.join(job_temp_dir, "output_frames") os.makedirs(input_frames_dir, exist_ok=True); os.makedirs(output_frames_dir, exist_ok=True) output_video_path = os.path.join(TEMP_DIR, f"{detector_name.lower()}_output_{timestamp}.mp4") cap = cv2.VideoCapture(video_path) frame_count, frame_rate = int(cap.get(cv2.CAP_PROP_FRAME_COUNT)), get_video_fps(video_path) for i in range(frame_count): success, frame = cap.read() if not success: break cv2.imwrite(os.path.join(input_frames_dir, f"frame_{i:05d}.png"), frame) cap.release() input_files = sorted(os.listdir(input_frames_dir)) for filename in input_files: with Image.open(os.path.join(input_frames_dir, filename)).convert("RGB") as image: result_pil = detector(image, detect_resolution=512, image_resolution=1024) result_np = cv2.cvtColor(np.array(result_pil), cv2.COLOR_RGB2BGR) cv2.imwrite(os.path.join(output_frames_dir, filename), result_np) cmd = ["ffmpeg", "-framerate", str(frame_rate), "-i", os.path.join(output_frames_dir, "frame_%05d.png"), "-c:v", "libx264", "-pix_fmt", "yuv420p", "-y", output_video_path] run_ffmpeg_command(cmd, "Compiling Video") shutil.rmtree(job_temp_dir) return output_video_path def extract_first_last_frame(video_path): if not video_path: raise gr.Error("Please upload a video first.") cap = cv2.VideoCapture(video_path) if not cap.isOpened(): raise gr.Error("Failed to open video file.") frame_count = int(cap.get(cv2.CAP_PROP_FRAME_COUNT)) if frame_count < 1: cap.release() raise gr.Error("Video has no frames.") # Set position to the first frame and read it cap.set(cv2.CAP_PROP_POS_FRAMES, 0) success, first_frame_img = cap.read() if not success: cap.release() raise gr.Error("Could not read the first frame.") # --- FIX for Last Frame (Robust Method) --- # Direct seeking to frame_count - 1 can be unreliable. # This method seeks near the end and then reads sequentially to find the true last frame. last_frame_img = None # Start checking from a few frames before the reported end to be safe. start_frame_for_last = max(1, frame_count - 10) cap.set(cv2.CAP_PROP_POS_FRAMES, start_frame_for_last) # Loop through the last few frames to ensure we get the very last one while True: success, frame = cap.read() if not success: break last_frame_img = frame cap.release() # If the loop fails (e.g., for very short videos), fall back to using the first frame as the last. if last_frame_img is None: last_frame_img = first_frame_img # --- FIX for saving with proper extension --- # The function now saves the images to temporary files with correct names (.png) and returns the paths. # Gradio's Gallery will display these files, and downloading them will use the correct filename. timestamp = datetime.now().strftime("%Y%m%d_%H%M%S") first_frame_path = os.path.join(TEMP_DIR, f"first_frame_{timestamp}.png") last_frame_path = os.path.join(TEMP_DIR, f"last_frame_{timestamp}.png") # Convert from OpenCV's BGR format to RGB before saving with the PIL library Image.fromarray(cv2.cvtColor(first_frame_img, cv2.COLOR_BGR2RGB)).save(first_frame_path) Image.fromarray(cv2.cvtColor(last_frame_img, cv2.COLOR_BGR2RGB)).save(last_frame_path) # Return the list of file paths to be displayed in the gallery return [first_frame_path, last_frame_path] # ### --- NEW FEATURE FUNCTION --- ### def batch_extract_first_last_frames(videos, progress=gr.Progress(track_tqdm=True)): if not videos: raise gr.Error("Please upload at least one video.") timestamp = datetime.now().strftime("%Y%m%d_%H%M%S") job_temp_dir = os.path.join(TEMP_DIR, f"batch_fl_frames_{timestamp}") os.makedirs(job_temp_dir, exist_ok=True) output_paths = [] for video_file in progress.tqdm(videos, desc="Processing videos"): try: video_path = video_file.name base_name = os.path.splitext(os.path.basename(video_path))[0] cap = cv2.VideoCapture(video_path) if not cap.isOpened(): gr.Warning(f"Skipping '{base_name}': could not open video file.") continue frame_count = int(cap.get(cv2.CAP_PROP_FRAME_COUNT)) if frame_count < 1: cap.release() gr.Warning(f"Skipping '{base_name}': video has no frames.") continue # First frame cap.set(cv2.CAP_PROP_POS_FRAMES, 0) success, first_frame_img = cap.read() if not success: cap.release() gr.Warning(f"Skipping '{base_name}': could not read the first frame.") continue # Last frame (robust method) last_frame_img = None start_frame_for_last = max(1, frame_count - 10) cap.set(cv2.CAP_PROP_POS_FRAMES, start_frame_for_last) while True: success, frame = cap.read() if not success: break last_frame_img = frame cap.release() if last_frame_img is None: last_frame_img = first_frame_img # Save frames first_frame_path = os.path.join(job_temp_dir, f"{base_name}_first.png") last_frame_path = os.path.join(job_temp_dir, f"{base_name}_last.png") Image.fromarray(cv2.cvtColor(first_frame_img, cv2.COLOR_BGR2RGB)).save(first_frame_path) Image.fromarray(cv2.cvtColor(last_frame_img, cv2.COLOR_BGR2RGB)).save(last_frame_path) output_paths.extend([first_frame_path, last_frame_path]) except Exception as e: gr.Warning(f"Skipping file {os.path.basename(video_file.name)} due to an error: {e}") if 'cap' in locals() and cap.isOpened(): cap.release() continue if not output_paths: shutil.rmtree(job_temp_dir) raise gr.Error("No frames could be extracted from the batch.") zip_base_name = os.path.join(TEMP_DIR, f"batch_fl_archive_{timestamp}") zip_path = shutil.make_archive(zip_base_name, 'zip', job_temp_dir) return output_paths, zip_path def video_to_frames_extractor(video_path, skip_rate, rotation, do_resize, out_w, out_h, out_format, jpg_quality): if not video_path: raise gr.Error("Please upload a video first.") if do_resize and (out_w <= 0 or out_h <= 0): raise gr.Error("If resizing, width and height must be positive.") cap = cv2.VideoCapture(video_path) if not cap.isOpened(): raise gr.Error("Failed to open video file.") frame_count = int(cap.get(cv2.CAP_PROP_FRAME_COUNT)) if frame_count < 1: cap.release(); raise gr.Error("Video appears to have no frames.") timestamp = datetime.now().strftime("%Y%m%d_%H%M%S") job_temp_dir = os.path.join(TEMP_DIR, f"v2f_{timestamp}"); os.makedirs(job_temp_dir, exist_ok=True) frame_paths = [] saved_count = 0 for i in range(frame_count): success, frame = cap.read() if not success: break if i % skip_rate != 0: continue frame = rotate_image(frame, rotation) if do_resize: frame = cv2.resize(frame, (out_w, out_h), interpolation=cv2.INTER_LANCZOS4) frame_pil = Image.fromarray(cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)) file_ext = out_format.lower() frame_path = os.path.join(job_temp_dir, f"frame_{saved_count:05d}.{file_ext}") if out_format == "JPG": frame_pil.save(frame_path, quality=jpg_quality) else: frame_pil.save(frame_path) frame_paths.append(frame_path) saved_count += 1 cap.release() if not frame_paths: shutil.rmtree(job_temp_dir); raise gr.Error("Could not extract any frames.") zip_base_name = os.path.join(TEMP_DIR, f"frames_archive_{timestamp}") zip_path = shutil.make_archive(zip_base_name, 'zip', job_temp_dir) return frame_paths[:100], zip_path def create_video_from_frames(files, fps, rotation, do_resize, out_w, out_h): if not files: raise gr.Error("Please upload frame images first.") if do_resize and (out_w <= 0 or out_h <= 0): raise gr.Error("If resizing, width and height must be positive.") timestamp = datetime.now().strftime("%Y%m%d_%H%M%S") job_temp_dir = os.path.join(TEMP_DIR, f"f2v_{timestamp}"); os.makedirs(job_temp_dir, exist_ok=True) filenames = [] for i, file in enumerate(files): ext = os.path.splitext(file.name)[1] temp_path = os.path.join(job_temp_dir, f"frame_{i:05d}{ext}") shutil.copy(file.name, temp_path); filenames.append(temp_path) output_video_path = os.path.join(TEMP_DIR, f"video_from_frames_{timestamp}.mp4") first_frame_img = rotate_image(cv2.imread(filenames[0]), rotation) h, w, _ = first_frame_img.shape if do_resize: w, h = out_w, out_h w -= w % 2; h -= h % 2 temp_processed_dir = os.path.join(job_temp_dir, "processed"); os.makedirs(temp_processed_dir, exist_ok=True) for i, filename in enumerate(filenames): frame = rotate_image(cv2.imread(filename), rotation) frame = cv2.resize(frame, (w, h), interpolation=cv2.INTER_LANCZOS4) cv2.imwrite(os.path.join(temp_processed_dir, f"pframe_{i:05d}.png"), frame) cmd = ["ffmpeg", "-framerate", str(fps), "-i", os.path.join(temp_processed_dir, "pframe_%05d.png"), "-c:v", "libx264", "-pix_fmt", "yuv420p", "-y", output_video_path] run_ffmpeg_command(cmd, "Compiling Video") shutil.rmtree(job_temp_dir) return output_video_path def image_to_looping_video(image_array, duration, audio_path=None): if image_array is None: raise gr.Error("Please upload an image first.") timestamp = datetime.now().strftime("%Y%m%d_%H%M%S") temp_image_path = os.path.join(TEMP_DIR, f"temp_image_{timestamp}.png") output_video_path = os.path.join(TEMP_DIR, f"looping_video_{timestamp}.mp4") img = Image.fromarray(image_array) img.save(temp_image_path) width, height = img.size width -= width % 2; height -= height % 2 cmd = ["ffmpeg", "-loop", "1", "-i", temp_image_path] if audio_path: cmd.extend(["-i", audio_path, "-c:a", "aac", "-shortest"]) cmd.extend(["-c:v", "libx264", "-t", str(duration), "-pix_fmt", "yuv420p", "-vf", f"scale={width}:{height}", "-y", output_video_path]) run_ffmpeg_command(cmd, "Creating Looping Video...") os.remove(temp_image_path) return output_video_path def create_zoom_videos(files, duration, zoom_ratio, zoom_direction, combine_videos, audio_path=None): if not files: raise gr.Error("Please upload at least one image.") fps = 30 total_frames = int(duration * fps) zoom_step = (zoom_ratio - 1.0) / total_frames zoom_coords = { "Center": "x=iw/2-(iw/zoom)/2:y=ih/2-(ih/zoom)/2", "Top": "x=iw/2-(iw/zoom)/2:y=0", "Bottom": "x=iw/2-(iw/zoom)/2:y=ih-(ih/zoom)", "Left": "x=0:y=ih/2-(ih/zoom)/2", "Right": "x=iw-(iw/zoom):y=ih/2-(ih/zoom)/2", "Top-Left": "x=0:y=0", "Top-Right": "x=iw-(iw/zoom):y=0", "Bottom-Left": "x=0:y=ih-(ih/zoom)", "Bottom-Right": "x=iw-(iw/zoom):y=ih-(ih/zoom)", } def process_single_image(input_path, output_path, **kwargs): audio_for_clip = kwargs.get('audio_for_clip') zoom_filter = (f"scale=3840:-1,zoompan=z='min(zoom+{zoom_step},{zoom_ratio})':{zoom_coords[zoom_direction]}:d={total_frames}:s=1920x1080:fps={fps}") cmd = ["ffmpeg", "-loop", "1", "-i", input_path] if audio_for_clip: cmd.extend(["-i", audio_for_clip, "-c:a", "aac", "-shortest"]) cmd.extend(["-vf", zoom_filter, "-c:v", "libx264", "-t", str(duration), "-pix_fmt", "yuv420p", "-b:v", "5M", "-y", output_path]) run_ffmpeg_command(cmd, f"Creating zoom video for {os.path.basename(input_path)}") batch_kwargs = {} if not combine_videos and audio_path: batch_kwargs['audio_for_clip'] = audio_path video_paths, zip_path, job_temp_dir = batch_image_processor(files, process_single_image, "zoom_videos", **batch_kwargs) if not combine_videos: return video_paths, None, zip_path if not video_paths: raise gr.Error("No videos were created to be combined.") timestamp = datetime.now().strftime("%Y%m%d_%H%M%S") silent_combined_path = os.path.join(job_temp_dir, f"combined_silent_{timestamp}.mp4") if len(video_paths) > 1: file_list_path = os.path.join(job_temp_dir, "files.txt") with open(file_list_path, 'w', encoding='utf-8') as f: for path in video_paths: f.write(f"file '{os.path.abspath(path)}'\n") run_ffmpeg_command(["ffmpeg", "-f", "concat", "-safe", "0", "-i", file_list_path, "-c", "copy", "-y", silent_combined_path], "Combining Videos") else: shutil.copy(video_paths[0], silent_combined_path) if audio_path: final_video_path = os.path.join(TEMP_DIR, f"combined_audio_{timestamp}.mp4") run_ffmpeg_command(["ffmpeg", "-i", silent_combined_path, "-i", audio_path, "-c:v", "copy", "-c:a", "aac", "-shortest", "-y", final_video_path], "Adding audio...") else: final_video_path = os.path.join(TEMP_DIR, f"combined_final_{timestamp}.mp4") shutil.move(silent_combined_path, final_video_path) return None, final_video_path, zip_path def change_video_speed(video_path, speed_multiplier): if not video_path: raise gr.Error("Please upload a video first.") if speed_multiplier <= 0: raise gr.Error("Speed multiplier must be positive.") timestamp = datetime.now().strftime("%Y%m%d_%H%M%S") output_video_path = os.path.join(TEMP_DIR, f"speed_change_{timestamp}.mp4") pts_value = 1 / speed_multiplier cmd = ["ffmpeg", "-i", video_path, "-filter:v", f"setpts={pts_value}*PTS", "-an", "-y", output_video_path] run_ffmpeg_command(cmd, "Changing Video Speed") return output_video_path def _get_atempo_filter_string(speed): """Helper function to create a chained atempo filter string for FFMPEG.""" filters = [] # 'atempo' is limited to [0.5, 100.0] if speed > 100.0: while speed > 100.0: filters.append("atempo=100.0") speed /= 100.0 elif speed < 0.5: while speed < 0.5: filters.append("atempo=0.5") speed /= 0.5 # Add the final filter for the remaining speed adjustment if speed != 1.0: # Avoid adding atempo=1.0 which does nothing filters.append(f"atempo={speed}") return ",".join(filters) if filters else None def process_slowmo_enhance_video(video_path, output_path, slowdown_factor, method, progress): """ Processes a single video for slow-motion and enhancement. """ if not video_path: raise gr.Error("Missing video path for processing.") timestamp = datetime.now().strftime("%Y%m%d_%H%M%S") job_temp_dir = os.path.join(TEMP_DIR, f"slowmo_{os.path.basename(video_path)}_{timestamp}") os.makedirs(job_temp_dir, exist_ok=True) original_fps = get_video_fps(video_path) final_fps = original_fps * slowdown_factor has_audio = has_audio_stream(video_path) try: if method == "AI-Enhanced (High Quality)": input_frames_dir = os.path.join(job_temp_dir, "input_frames") processed_frames_dir = os.path.join(job_temp_dir, "processed_frames") os.makedirs(input_frames_dir, exist_ok=True) os.makedirs(processed_frames_dir, exist_ok=True) load_enhance_ai_models() progress(0.1, desc="Extracting frames...") run_ffmpeg_command(["ffmpeg", "-i", video_path, os.path.join(input_frames_dir, "frame_%06d.png")]) input_frames = sorted([os.path.join(input_frames_dir, f) for f in os.listdir(input_frames_dir)]) if not input_frames: raise gr.Error("Could not extract any frames from the video.") progress(0.3, desc="AI Interpolating frames (This can be slow)...") for i in progress.tqdm(range(len(input_frames) - 1), unit="frame pairs"): frame0 = cv2.imread(input_frames[i]) frame1 = cv2.imread(input_frames[i+1]) shutil.copy(input_frames[i], os.path.join(processed_frames_dir, f"proc_{i:06d}_0.png")) interpolated_frames = rife_model.process(frame0, frame1, count=slowdown_factor-1) for j, int_frame in enumerate(interpolated_frames): cv2.imwrite(os.path.join(processed_frames_dir, f"proc_{i:06d}_{j+1}.png"), int_frame) shutil.copy(input_frames[-1], os.path.join(processed_frames_dir, f"proc_{len(input_frames)-1:06d}_0.png")) progress(0.8, desc="Compiling final video...") silent_video_path = os.path.join(job_temp_dir, "silent_video.mp4") cmd = ["ffmpeg", "-framerate", str(original_fps), "-pattern_type", "glob", "-i", os.path.join(processed_frames_dir, "*.png"), "-c:v", "libx264", "-crf", "18", "-pix_fmt", "yuv420p", "-y", silent_video_path] run_ffmpeg_command(cmd) if has_audio: progress(0.9, desc="Attaching slowed audio...") atempo_filter_str = _get_atempo_filter_string(1.0 / slowdown_factor) cmd_audio = ["ffmpeg", "-i", silent_video_path, "-i", video_path, "-filter:a", atempo_filter_str, "-c:v", "copy", "-map", "0:v:0", "-map", "1:a:0", "-y", output_path] run_ffmpeg_command(cmd_audio) else: shutil.move(silent_video_path, output_path) elif method == "Standard (Fast)": progress(0.5, desc="Processing with FFMPEG filter...") cmd = ["ffmpeg", "-y", "-i", video_path] vf_filter_chain = f"minterpolate=fps={final_fps}:mi_mode=mci,setpts={float(slowdown_factor)}*PTS" cmd.extend(["-vf", vf_filter_chain]) if has_audio: atempo_filter_str = _get_atempo_filter_string(1.0 / slowdown_factor) if atempo_filter_str: cmd.extend(["-af", atempo_filter_str]) else: cmd.append("-an") cmd.extend(["-r", str(original_fps)]) cmd.extend(["-c:v", "libx264", "-pix_fmt", "yuv420p", "-crf", "18", output_path]) run_ffmpeg_command(cmd) finally: if os.path.exists(job_temp_dir): shutil.rmtree(job_temp_dir) def batch_slowmo_enhance_videos(videos, slowdown_factor_str, method, progress=gr.Progress(track_tqdm=True)): if not videos: raise gr.Error("Please upload at least one video.") slowdown_factor = int(slowdown_factor_str.replace('x', '')) if "AI-Enhanced" in method and not ENHANCE_AI_AVAILABLE: raise gr.Error("AI-Enhanced method is not available. Please install 'rife-ncnn-vulkan-python' and restart the app.") timestamp = datetime.now().strftime("%Y%m%d_%H%M%S") job_temp_dir = os.path.join(TEMP_DIR, f"slowmo_batch_{timestamp}") os.makedirs(job_temp_dir, exist_ok=True) output_paths = [] for i, video_file in enumerate(videos): progress(i / len(videos), desc=f"Processing video {i+1}/{len(videos)}: {os.path.basename(video_file.name)}") base, _ = os.path.splitext(os.path.basename(video_file.name)) output_path = os.path.join(job_temp_dir, f"{base}_slowmo_{slowdown_factor}x.mp4") process_slowmo_enhance_video(video_file.name, output_path, slowdown_factor, method, progress) output_paths.append(output_path) if not output_paths: shutil.rmtree(job_temp_dir) raise gr.Error("No videos could be processed from the batch.") zip_base_name = os.path.join(TEMP_DIR, f"slowmo_archive_{timestamp}") zip_path = shutil.make_archive(zip_base_name, 'zip', job_temp_dir) return output_paths, zip_path def change_audio_speed(audio_path, speed_multiplier): if not audio_path: raise gr.Error("Please upload an audio file.") if speed_multiplier <= 0: raise gr.Error("Speed multiplier must be a positive number.") timestamp = datetime.now().strftime("%Y%m%d_%H%M%S") # Get original extension _, extension = os.path.splitext(os.path.basename(audio_path)) if not extension: extension = ".mp3" # Fallback output_audio_path = os.path.join(TEMP_DIR, f"audio_speed_{speed_multiplier}x_{timestamp}{extension}") atempo_filter_str = _get_atempo_filter_string(speed_multiplier) if not atempo_filter_str: # If no speed change, just copy the file to avoid processing gr.Info("No speed change applied (multiplier is 1.0).") shutil.copy(audio_path, output_audio_path) return output_audio_path cmd = ["ffmpeg", "-i", audio_path, "-filter:a", atempo_filter_str, "-y", output_audio_path] run_ffmpeg_command(cmd, "Changing audio speed...") return output_audio_path # ### --- NEW FEATURE FUNCTION --- ### def chop_audio_on_silence(audio_path, silence_thresh, min_silence_len, progress=gr.Progress(track_tqdm=True)): if not audio_path: raise gr.Error("Please upload an audio file to chop.") progress(0, desc="Loading audio file...") try: sound = AudioSegment.from_file(audio_path) except Exception as e: raise gr.Error(f"Could not read audio file. It may be corrupt or in an unsupported format. Details: {e}") progress(0.2, desc="Detecting non-silent chunks...") audio_chunks = split_on_silence( sound, min_silence_len=int(min_silence_len), silence_thresh=int(silence_thresh), keep_silence=200 # Keep a bit of silence at the start/end of each chunk ) if not audio_chunks: raise gr.Error("No audio chunks were found above the silence threshold. Try using a lower (more negative) threshold value.") timestamp = datetime.now().strftime("%Y%m%d_%H%M%S") job_temp_dir = os.path.join(TEMP_DIR, f"audio_chop_{timestamp}") os.makedirs(job_temp_dir, exist_ok=True) output_paths = [] for i, chunk in enumerate(progress.tqdm(audio_chunks, desc="Exporting chunks...")): output_path = os.path.join(job_temp_dir, f"chunk_{i:04d}.mp3") chunk.export(output_path, format="mp3") output_paths.append(output_path) if not output_paths: shutil.rmtree(job_temp_dir) raise gr.Error("Failed to export any audio chunks.") zip_base_name = os.path.join(TEMP_DIR, f"audio_chop_archive_{timestamp}") zip_path = shutil.make_archive(zip_base_name, 'zip', job_temp_dir) # Return a preview gallery and the zip file return output_paths, zip_path def reverse_video(video_path, audio_option): if not video_path: raise gr.Error("Please upload a video first.") timestamp = datetime.now().strftime("%Y%m%d_%H%M%S") output_video_path = os.path.join(TEMP_DIR, f"reversed_video_{timestamp}.mp4") filters = ["reverse"] if audio_option == "Reverse Audio": filters.append("areverse") cmd = ["ffmpeg", "-i", video_path, "-vf", filters[0]] if len(filters) > 1: cmd.extend(["-af", filters[1]]) if audio_option == "Remove Audio": cmd.append("-an") cmd.extend(["-c:v", "libx264", "-pix_fmt", "yuv420p", "-y", output_video_path]) run_ffmpeg_command(cmd, "Reversing video...") return output_video_path def add_audio_to_video(video_path, audio_path): if not video_path: raise gr.Error("Please upload a video.") if not audio_path: raise gr.Error("Please upload an audio file.") timestamp = datetime.now().strftime("%Y%m%d_%H%M%S") output_video_path = os.path.join(TEMP_DIR, f"video_with_audio_{timestamp}.mp4") cmd = ["ffmpeg", "-i", video_path, "-i", audio_path, "-c:v", "copy", "-c:a", "aac", "-shortest", "-y", output_video_path] run_ffmpeg_command(cmd, "Adding Audio to Video") return output_video_path def extract_audio(video_path, audio_format="mp3"): if not video_path: raise gr.Error("Please upload a video first.") timestamp = datetime.now().strftime("%Y%m%d_%H%M%S") output_audio_path = os.path.join(TEMP_DIR, f"extracted_audio_{timestamp}.{audio_format}") cmd = ["ffmpeg", "-i", video_path, "-vn"] # -vn strips video if audio_format == "mp3": cmd.extend(["-c:a", "libmp3lame", "-q:a", "2"]) # VBR quality elif audio_format == "aac": cmd.extend(["-c:a", "aac", "-b:a", "192k"]) elif audio_format == "wav": cmd.extend(["-c:a", "pcm_s16le"]) cmd.extend(["-y", output_audio_path]) run_ffmpeg_command(cmd, "Extracting audio...") return output_audio_path def create_gif_from_video(video_path, start_time, end_time): if not video_path: raise gr.Error("Please upload a video first.") timestamp = datetime.now().strftime("%Y%m%d_%H%M%S") output_gif_path, palette_path = os.path.join(TEMP_DIR, f"video_to_gif_{timestamp}.gif"), os.path.join(TEMP_DIR, f"palette_{timestamp}.png") duration_filter = [] if start_time > 0 or end_time > 0: if end_time > 0 and end_time <= start_time: raise gr.Error("End time must be after start time.") if start_time > 0: duration_filter.extend(["-ss", str(start_time)]) if end_time > 0: duration_filter.extend(["-to", str(end_time)]) run_ffmpeg_command(["ffmpeg", "-i", video_path] + duration_filter + ["-vf", "fps=15,scale=480:-1:flags=lanczos,palettegen", "-y", palette_path]) run_ffmpeg_command(["ffmpeg", "-i", video_path] + duration_filter + ["-i", palette_path, "-filter_complex", "fps=15,scale=480:-1:flags=lanczos[x];[x][1:v]paletteuse", "-y", output_gif_path]) os.remove(palette_path) return output_gif_path def get_frame_at_time(video_path, time_in_seconds=0): if not video_path: return None try: command = ['ffmpeg', '-ss', str(time_in_seconds), '-i', video_path, '-vframes', '1', '-f', 'image2pipe', '-c:v', 'png', '-'] pipe = subprocess.run(command, stdout=subprocess.PIPE, stderr=subprocess.PIPE, check=True) return Image.open(io.BytesIO(pipe.stdout)).convert("RGB") except Exception as e: print(f"Error extracting frame for crop preview: {e}") cap = cv2.VideoCapture(video_path); cap.set(cv2.CAP_PROP_POS_MSEC, time_in_seconds * 1000) success, frame = cap.read(); cap.release() if success: return Image.fromarray(cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)) return None def crop_video(video_path, x, y, w, h, do_resize, out_w, out_h): if not video_path: raise gr.Error("Please upload a video first.") w, h, x, y = int(w), int(h), int(x), int(y) w -= w % 2; h -= h % 2 if w <= 0 or h <= 0: raise gr.Error("Crop dimensions must be positive.") timestamp = datetime.now().strftime("%Y%m%d_%H%M%S") output_video_path = os.path.join(TEMP_DIR, f"cropped_video_{timestamp}.mp4") vf_filters = [f"crop={w}:{h}:{x}:{y}"] if do_resize: if out_w <= 0 or out_h <= 0: raise gr.Error("Resize dimensions must be positive.") out_w, out_h = int(out_w), int(out_h) out_w -= out_w % 2; out_h -= out_h % 2 vf_filters.append(f"scale={out_w}:{out_h}") cmd = ["ffmpeg", "-i", video_path, "-vf", ",".join(vf_filters), "-c:a", "copy", "-c:v", "libx264", "-pix_fmt", "yuv420p", "-y", output_video_path] run_ffmpeg_command(cmd, "Cropping video...") return output_video_path def trim_video(video_path, start_time, end_time): if not video_path: raise gr.Error("Please upload a video first.") if start_time < 0: start_time = 0 if end_time <= start_time: end_time = 0 timestamp = datetime.now().strftime("%Y%m%d_%H%M%S") output_video_path = os.path.join(TEMP_DIR, f"trimmed_video_{timestamp}.mp4") cmd = ["ffmpeg", "-i", video_path, "-ss", str(start_time)] if end_time > 0: cmd.extend(["-to", str(end_time)]) cmd.extend(["-c:v", "libx264", "-c:a", "copy", "-pix_fmt", "yuv420p", "-y", output_video_path]) run_ffmpeg_command(cmd, "Trimming Video") return output_video_path def apply_video_watermark(video_path, text, position, opacity, size_scale, color): if not video_path: raise gr.Error("Please upload a video first.") if not text: raise gr.Error("Watermark text cannot be empty.") timestamp = datetime.now().strftime("%Y%m%d_%H%M%S") output_video_path = os.path.join(TEMP_DIR, f"watermarked_video_{timestamp}.mp4") _ , video_h = get_video_dimensions(video_path) if video_h == 0: video_h = 720 # Fallback escaped_text = text.replace("'", r"'\''").replace(":", r"\:").replace(",", r"\,") pos_map = {"Top-Left": "x=20:y=20", "Top-Right": "x=w-tw-20:y=20", "Bottom-Left": "x=20:y=h-th-20", "Bottom-Right": "x=w-tw-20:y=h-th-20", "Center": "x=(w-tw)/2:y=(h-th)/2"} font_opacity = opacity / 100.0 font_size = int(video_h / (50 - (size_scale * 3.5))) drawtext_filter = ( f"drawtext=" f"text='{escaped_text}':" f"{pos_map[position]}:" f"fontsize={font_size}:" f"fontcolor={color}@{font_opacity}" ) cmd = [ "ffmpeg", "-i", video_path, "-vf", drawtext_filter, "-c:a", "copy", "-c:v", "libx264", "-pix_fmt", "yuv420p", "-y", output_video_path ] run_ffmpeg_command(cmd, "Applying text watermark...") return output_video_path def remove_video_background(video_path): if not video_path: raise gr.Error("Please upload a video first.") timestamp = datetime.now().strftime("%Y%m%d_%H%M%S") job_temp_dir = os.path.join(TEMP_DIR, f"bg_rem_job_{timestamp}"); input_frames_dir, output_frames_dir = os.path.join(job_temp_dir, "input_frames"), os.path.join(job_temp_dir, "output_frames") os.makedirs(input_frames_dir, exist_ok=True); os.makedirs(output_frames_dir, exist_ok=True) cap = cv2.VideoCapture(video_path); frame_count, fps = int(cap.get(cv2.CAP_PROP_FRAME_COUNT)), get_video_fps(video_path) for i in range(frame_count): success, frame = cap.read() if not success: break cv2.imwrite(os.path.join(input_frames_dir, f"frame_{i:05d}.png"), frame) cap.release() for filename in sorted(os.listdir(input_frames_dir)): with Image.open(os.path.join(input_frames_dir, filename)) as img: remove(img).save(os.path.join(output_frames_dir, filename)) output_video_path = os.path.join(TEMP_DIR, f"bg_removed_{timestamp}.webm") cmd = ["ffmpeg", "-framerate", str(fps), "-i", os.path.join(output_frames_dir, "frame_%05d.png"), "-c:v", "libvpx-vp9", "-pix_fmt", "yuva420p", "-auto-alt-ref", "0", "-b:v", "1M", "-y", output_video_path] run_ffmpeg_command(cmd, "Compiling transparent video...") shutil.rmtree(job_temp_dir) return output_video_path def generate_ass_from_whisper(result): """Generates an ASS subtitle file content from a Whisper result object with word timestamps.""" ass_content = [ "[Script Info]", "Title: Generated by Skriptz", "ScriptType: v4.00+", "WrapStyle: 0", "PlayResX: 1920", "PlayResY: 1080", "\n[V4+ Styles]", "Format: Name, Fontname, Fontsize, PrimaryColour, SecondaryColour, OutlineColour, BackColour, Bold, Italic, Underline, StrikeOut, ScaleX, ScaleY, Spacing, Angle, BorderStyle, Outline, Shadow, Alignment, MarginL, MarginR, MarginV, Encoding", "Style: Default,Arial,55,&H00FFFFFF,&H0000FFFF,&H00000000,&H00000000,0,0,0,0,100,100,0,0,1,2,1,2,10,10,25,1", "\n[Events]", "Format: Layer, Start, End, Style, Name, MarginL, MarginR, MarginV, Effect, Text" ] def format_time(s): h, r = divmod(s, 3600) m, s = divmod(r, 60) cs = int((s - int(s)) * 100) return f"{int(h)}:{int(m):02}:{int(s):02}.{cs:02}" for segment in result['segments']: start_time = format_time(segment['start']) end_time = format_time(segment['end']) karaoke_line = "" for word_info in segment['words']: word = word_info['word'].strip() duration_cs = int((word_info['end'] - word_info['start']) * 100) karaoke_line += f"{{\\k{duration_cs}}}{word} " dialogue_line = f"Dialogue: 0,{start_time},{end_time},Default,,0,0,0,,{karaoke_line.strip()}" ass_content.append(dialogue_line) return "\n".join(ass_content) def transcribe_media(media_path, model_name): if media_path is None: raise gr.Error("Please upload a video or audio file first.") model = load_whisper_model(model_name) if model is None: raise gr.Error("Whisper model is not available.") audio_path = media_path.name base_name = os.path.splitext(os.path.basename(media_path.name))[0] if audio_path.lower().endswith(('.mp4', '.mov', '.mkv', '.avi', '.webm')): audio_path_temp = os.path.join(TEMP_DIR, f"{base_name}.mp3") try: run_ffmpeg_command(["ffmpeg", "-y", "-i", audio_path, "-q:a", "0", "-map", "a", audio_path_temp]) audio_path = audio_path_temp except gr.Error as e: if "does not contain any stream" in str(e): raise gr.Error("The uploaded video has no audio track.") else: raise e result = model.transcribe(audio_path, word_timestamps=True, verbose=False) def format_ts(s, separator=','): h, r = divmod(s, 3600); m, s = divmod(r, 60) return f"{int(h):02}:{int(m):02}:{int(s):02}{separator}{int((s-int(s))*1000):03}" srt_path = os.path.join(TEMP_DIR, f"{base_name}.srt") vtt_path = os.path.join(TEMP_DIR, f"{base_name}.vtt") ass_path = os.path.join(TEMP_DIR, f"{base_name}.ass") with open(srt_path, "w", encoding="utf-8") as srt_f, open(vtt_path, "w", encoding="utf-8") as vtt_f: vtt_f.write("WEBVTT\n\n") for i, seg in enumerate(result["segments"]): start, end, text = seg['start'], seg['end'], seg['text'].strip() srt_f.write(f"{i + 1}\n{format_ts(start)} --> {format_ts(end)}\n{text}\n\n") vtt_f.write(f"{format_ts(start, '.')} --> {format_ts(end, '.')}\n{text}\n\n") ass_content = generate_ass_from_whisper(result) with open(ass_path, "w", encoding="utf-8") as ass_f: ass_f.write(ass_content) return result["text"], [srt_path, vtt_path, ass_path] def transcribe_and_prep_burn(media_file, model_name): if not media_file: raise gr.Error("Please upload a file first.") is_video = media_file.name.lower().endswith(('.mp4', '.mov', '.mkv', '.avi', '.webm')) text, files = transcribe_media(media_file, model_name) if is_video: return text, files, media_file.name, gr.update(visible=True) else: return text, files, None, gr.update(visible=False) def reformat_srt_for_word_wrap(original_srt_path, words_per_line): if not original_srt_path or not os.path.exists(original_srt_path): return None timestamp = datetime.now().strftime("%Y%m%d_%H%M%S") reformatted_path = os.path.join(TEMP_DIR, f"reformatted_{timestamp}.srt") with open(original_srt_path, 'r', encoding='utf-8') as f_in, \ open(reformatted_path, 'w', encoding='utf-8') as f_out: content = f_in.read().strip().split('\n\n') for block in content: lines = block.split('\n') if len(lines) < 3: f_out.write(block + '\n\n') continue text_lines = ' '.join(lines[2:]) words = text_lines.split() new_text_lines = [] current_line = [] for word in words: current_line.append(word) if len(current_line) >= words_per_line: new_text_lines.append(' '.join(current_line)) current_line = [] if current_line: new_text_lines.append(' '.join(current_line)) reformatted_text = '\n'.join(new_text_lines) f_out.write(f"{lines[0]}\n{lines[1]}\n{reformatted_text}\n\n") return reformatted_path def burn_block_subtitles(video_path, srt_file_obj, font_size_scale, font_color, words_per_line): original_srt_path = srt_file_obj[0].name reformatted_srt_path = None try: reformatted_srt_path = reformat_srt_for_word_wrap(original_srt_path, words_per_line) if not reformatted_srt_path: raise gr.Error("Failed to reformat subtitle file.") timestamp = datetime.now().strftime("%Y%m%d_%H%M%S") output_video_path = os.path.join(TEMP_DIR, f"subtitled_video_{timestamp}.mp4") _, video_h = get_video_dimensions(video_path) if video_h == 0: video_h = 720 divisor = 32 - (font_size_scale * 2) calculated_font_size = int(video_h / divisor) color_bgr = font_color[5:7] + font_color[3:5] + font_color[1:3] ffmpeg_color = f"&H00{color_bgr.upper()}" escaped_srt_path = reformatted_srt_path.replace('\\', '/').replace(':', r'\\:') vf_filter = f"subtitles='{escaped_srt_path}':force_style='Fontsize={calculated_font_size},PrimaryColour={ffmpeg_color},BorderStyle=1,Outline=1,Shadow=0.5,MarginV=25'" cmd = ["ffmpeg", "-y", "-i", video_path, "-vf", vf_filter, "-c:a", "copy", "-c:v", "libx264", "-pix_fmt", "yuv420p", output_video_path] run_ffmpeg_command(cmd, "Burning block subtitles into video...") return output_video_path finally: if reformatted_srt_path and os.path.exists(reformatted_srt_path): os.remove(reformatted_srt_path) def burn_karaoke_subtitles(video_path, subtitle_files, font_size_scale, base_color, highlight_color): ass_file_path = subtitle_files[2].name temp_ass_path = None try: timestamp = datetime.now().strftime("%Y%m%d_%H%M%S") temp_ass_path = os.path.join(TEMP_DIR, f"style_applied_{timestamp}.ass") output_video_path = os.path.join(TEMP_DIR, f"karaoke_video_{timestamp}.mp4") _, video_h = get_video_dimensions(video_path) if video_h == 0: video_h = 720 calculated_font_size = int((video_h / 20) * (font_size_scale / 5)) def format_ass_color(hex_color): if hex_color.startswith('#'): hex_color = hex_color[1:] r, g, b = hex_color[0:2], hex_color[2:4], hex_color[4:6] return f"&H00{b.upper()}{g.upper()}{r.upper()}" primary_color_ass = format_ass_color(highlight_color) secondary_color_ass = format_ass_color(base_color) with open(ass_file_path, 'r', encoding='utf-8') as f_in, open(temp_ass_path, 'w', encoding='utf-8') as f_out: for line in f_in: if line.startswith("Style:"): parts = line.split(',') parts[2] = str(calculated_font_size) # Fontsize parts[3] = secondary_color_ass # PrimaryColour (Base text) parts[4] = primary_color_ass # SecondaryColour (Karaoke fill) if len(parts) > 17: parts[16] = '0' # Outline width parts[17] = '0' # Shadow width f_out.write(','.join(parts)) else: f_out.write(line) escaped_ass_path = temp_ass_path.replace('\\', '/').replace(':', r'\\:') vf_filter = f"subtitles='{escaped_ass_path}'" cmd = ["ffmpeg", "-y", "-i", video_path, "-vf", vf_filter, "-c:a", "copy", "-c:v", "libx264", "-pix_fmt", "yuv420p", output_video_path] run_ffmpeg_command(cmd, "Burning karaoke subtitles into video...") return output_video_path finally: if temp_ass_path and os.path.exists(temp_ass_path): os.remove(temp_ass_path) def burn_subtitles_wrapper(video_path, subtitle_files, style, font_size_scale, block_font_color, block_words_per_line, kara_base_color, kara_highlight_color): if not video_path or not subtitle_files: raise gr.Error("Missing video or subtitle files. Please transcribe first.") if style == "Block": return burn_block_subtitles(video_path, subtitle_files, font_size_scale, block_font_color, block_words_per_line) elif style == "Karaoke": return burn_karaoke_subtitles(video_path, subtitle_files, font_size_scale, kara_base_color, kara_highlight_color) else: raise gr.Error("Invalid subtitle style selected.") def remove_background_single(input_path, output_path, **kwargs): with Image.open(input_path) as img: remove(img).save(output_path) def remove_background_batch(files): output_paths, zip_path, _ = batch_image_processor(files, remove_background_single, "bg_removed") return output_paths, zip_path def resize_convert_single_image(input_path, output_path, **kwargs): output_format = kwargs.get('output_format', 'JPG') quality = kwargs.get('quality', 95) enable_resize = kwargs.get('enable_resize', False) max_w = kwargs.get('max_w', 1024) max_h = kwargs.get('max_h', 1024) resize_mode = kwargs.get('resize_mode', "Fit (preserve aspect ratio)") with Image.open(input_path) as img: if output_format in ['JPG', 'WEBP'] and img.mode in ['RGBA', 'P', 'LA']: img = img.convert("RGB") if enable_resize: if resize_mode == "Fit (preserve aspect ratio)": img.thumbnail((max_w, max_h), Image.Resampling.LANCZOS) else: # Stretch img = img.resize((max_w, max_h), Image.Resampling.LANCZOS) save_kwargs = {} pil_format = 'JPEG' if output_format == 'JPG' else output_format if pil_format in ['JPEG', 'WEBP']: save_kwargs['quality'] = quality img.save(output_path, pil_format, **save_kwargs) def batch_resize_convert_images(files, output_format, quality, enable_resize, max_w, max_h, resize_mode): if not files: raise gr.Error("Please upload at least one image.") timestamp = datetime.now().strftime("%Y%m%d_%H%M%S") job_name = "resized_converted" job_temp_dir = os.path.join(TEMP_DIR, f"{job_name}_{timestamp}"); os.makedirs(job_temp_dir, exist_ok=True) output_paths = [] # Enable resizing if dimensions are provided, even if checkbox is somehow out of sync if max_w > 0 and max_h > 0: enable_resize = True processing_kwargs = { 'output_format': output_format, 'quality': quality, 'enable_resize': enable_resize, 'max_w': max_w, 'max_h': max_h, 'resize_mode': resize_mode } for file_obj in files: try: base, _ = os.path.splitext(os.path.basename(file_obj.name)) output_filename = f"{base}.{output_format.lower()}" output_path = os.path.join(job_temp_dir, output_filename) resize_convert_single_image(file_obj.name, output_path, **processing_kwargs) output_paths.append(output_path) except Exception as e: print(f"Skipping file {file_obj.name} due to error: {e}"); continue if not output_paths: shutil.rmtree(job_temp_dir); raise gr.Error("No images could be processed.") zip_base_name = os.path.join(TEMP_DIR, f"{job_name}_archive_{timestamp}") zip_path = shutil.make_archive(zip_base_name, 'zip', job_temp_dir) return output_paths[:100], zip_path def apply_watermark_single(input_path, output_path, watermark_text, position, opacity): with Image.open(input_path).convert("RGBA") as image: if not watermark_text: raise ValueError("Watermark text cannot be empty.") txt = Image.new("RGBA", image.size, (255, 255, 255, 0)) try: font = ImageFont.truetype("DejaVuSans.ttf", int(image.width / 20)) except IOError: font = ImageFont.load_default() d = ImageDraw.Draw(txt); bbox = d.textbbox((0, 0), watermark_text, font=font); w, h = bbox[2]-bbox[0], bbox[3]-bbox[1] pos_map = {"Top-Left":(10,10), "Top-Right":(image.width-w-10,10), "Bottom-Left":(10,image.height-h-10), "Bottom-Right":(image.width-w-10,image.height-h-10), "Center":((image.width-w)/2,(image.height-h)/2)} d.text(pos_map[position], watermark_text, font=font, fill=(255, 255, 255, int(255 * (opacity / 100)))) Image.alpha_composite(image, txt).convert("RGB").save(output_path) def apply_watermark_batch(files, watermark_text, position, opacity): if not watermark_text: raise gr.Error("Please provide watermark text.") processing_func = lambda input_path, output_path: apply_watermark_single( input_path, output_path, watermark_text=watermark_text, position=position, opacity=opacity ) output_paths, zip_path, _ = batch_image_processor(files, processing_func, "watermarked") return output_paths, zip_path # --- BATCH CONVERTER REPLACEMENT FUNCTIONS --- def convert_compress_video(video_path, out_format, v_codec, crf_value, scale_option, a_codec, a_bitrate, output_dir=None, base_name=None): if not video_path: raise gr.Error("Please upload a video to convert.") timestamp = datetime.now().strftime("%Y%m%d_%H%M%S") _output_dir = output_dir if output_dir else TEMP_DIR _base_name = base_name if base_name else f"converted_{timestamp}" output_filename = f"{_base_name}.{out_format.lower()}" output_path = os.path.join(_output_dir, output_filename) cmd = ["ffmpeg", "-i", video_path] vf_filters = [] if scale_option != "Original": w, h = get_video_dimensions(video_path) if w > 0 and h > 0: target_h = int(scale_option.replace('p', '')) target_w = round(w * target_h / h / 2) * 2 vf_filters.append(f"scale={target_w}:{target_h}") vf_filters.append("pad=ceil(iw/2)*2:ceil(ih/2)*2") vf_filters.append("setsar=1") if vf_filters: cmd.extend(["-vf", ",".join(vf_filters)]) cmd.extend(["-c:v", v_codec]) if v_codec in ["libx264", "libx265"]: cmd.extend(["-crf", str(crf_value)]) cmd.extend(["-pix_fmt", "yuv420p"]) if has_audio_stream(video_path): if a_codec == "copy": cmd.extend(["-c:a", "copy"]) else: cmd.extend(["-c:a", a_codec, "-b:a", f"{a_bitrate}k"]) else: cmd.append("-an") if out_format.lower() in ["mp4", "mov"]: cmd.extend(["-movflags", "+faststart"]) cmd.extend(["-y", output_path]) run_ffmpeg_command(cmd, f"Converting {os.path.basename(video_path)}.") return output_path def batch_convert_compress_videos(files, out_format, v_codec, crf_value, scale_option, a_codec, a_bitrate, progress=gr.Progress(track_tqdm=True)): if not files: raise gr.Error("Please upload at least one video to convert.") timestamp = datetime.now().strftime("%Y%m%d_%H%M%S") job_temp_dir = os.path.join(TEMP_DIR, f"batch_convert_{timestamp}") os.makedirs(job_temp_dir, exist_ok=True) output_paths = [] for video_file in progress.tqdm(files, desc="Converting videos"): try: base_name = os.path.splitext(os.path.basename(video_file.name))[0] output_path = convert_compress_video( video_path=video_file.name, out_format=out_format, v_codec=v_codec, crf_value=crf_value, scale_option=scale_option, a_codec=a_codec, a_bitrate=a_bitrate, output_dir=job_temp_dir, base_name=base_name ) output_paths.append(output_path) except Exception as e: gr.Warning(f"Skipping file {os.path.basename(video_file.name)} due to an error: {e}") continue if not output_paths: shutil.rmtree(job_temp_dir) raise gr.Error("No videos could be processed from the batch.") zip_base_name = os.path.join(TEMP_DIR, f"video_convert_archive_{timestamp}") zip_path = shutil.make_archive(zip_base_name, 'zip', job_temp_dir) return output_paths, zip_path def convert_audio(media_path, out_format, a_bitrate, output_dir=None, base_name=None): if not media_path: raise gr.Error("Please provide a media file to convert.") timestamp = datetime.now().strftime("%Y%m%d_%H%M%S") _output_dir = output_dir if output_dir else TEMP_DIR _base_name = base_name if base_name else f"audio_converted_{timestamp}" output_filename = f"{_base_name}.{out_format.lower()}" output_path = os.path.join(_output_dir, output_filename) cmd = ["ffmpeg", "-i", media_path, "-vn"] if out_format == "mp3": cmd.extend(["-c:a", "libmp3lame", "-b:a", f"{a_bitrate}k"]) elif out_format == "aac": cmd.extend(["-c:a", "aac", "-b:a", f"{a_bitrate}k"]) elif out_format == "ogg": cmd.extend(["-c:a", "libopus", "-b:a", f"{a_bitrate}k"]) elif out_format == "wav": cmd.extend(["-c:a", "pcm_s16le"]) elif out_format == "flac": cmd.extend(["-c:a", "flac"]) cmd.extend(["-y", output_path]) run_ffmpeg_command(cmd, f"Converting audio from {os.path.basename(media_path)}...") return output_path def batch_convert_audio(files, out_format, a_bitrate, progress=gr.Progress(track_tqdm=True)): if not files: raise gr.Error("Please upload at least one file to convert.") timestamp = datetime.now().strftime("%Y%m%d_%H%M%S") job_temp_dir = os.path.join(TEMP_DIR, f"batch_audio_convert_{timestamp}") os.makedirs(job_temp_dir, exist_ok=True) output_paths = [] for media_file in progress.tqdm(files, desc="Converting audio"): try: is_video = get_file_type(media_file.name) == 'video' if is_video and not has_audio_stream(media_file.name): gr.Warning(f"Skipping video '{os.path.basename(media_file.name)}' as it has no audio track.") continue base_name = os.path.splitext(os.path.basename(media_file.name))[0] output_path = convert_audio( media_path=media_file.name, out_format=out_format, a_bitrate=a_bitrate, output_dir=job_temp_dir, base_name=base_name ) output_paths.append(output_path) except Exception as e: gr.Warning(f"Skipping file {os.path.basename(media_file.name)} due to an error: {e}") continue if not output_paths: shutil.rmtree(job_temp_dir) raise gr.Error("No files could be processed from the batch.") zip_base_name = os.path.join(TEMP_DIR, f"audio_convert_archive_{timestamp}") zip_path = shutil.make_archive(zip_base_name, 'zip', job_temp_dir) return output_paths[0], zip_path # --- END BATCH CONVERTER REPLACEMENT FUNCTIONS --- def apply_video_fade(video_path, fade_in_duration, fade_out_duration): if not video_path: raise gr.Error("Please upload a video.") video_duration = get_media_duration(video_path) if fade_in_duration + fade_out_duration > video_duration: raise gr.Error("The sum of fade durations cannot be greater than the video duration.") timestamp = datetime.now().strftime("%Y%m%d_%H%M%S") output_video_path = os.path.join(TEMP_DIR, f"faded_video_{timestamp}.mp4") fade_filters = [] if fade_in_duration > 0: fade_filters.append(f"fade=t=in:st=0:d={fade_in_duration}") if fade_out_duration > 0: fade_out_start = video_duration - fade_out_duration; fade_filters.append(f"fade=t=out:st={fade_out_start}:d={fade_out_duration}") if not fade_filters: gr.Info("No fade applied."); return video_path cmd = ["ffmpeg", "-i", video_path, "-vf", ",".join(fade_filters), "-c:a", "copy", "-c:v", "libx264", "-pix_fmt", "yuv420p", "-y", output_video_path] run_ffmpeg_command(cmd, "Applying video fade...") return output_video_path # --- ACCURATE Color Grading Functions --- def preview_color_grading_ffmpeg(image_np, brightness, contrast, saturation, sharpness): """Applies color grading to a single frame using FFMPEG for an accurate preview.""" if image_np is None: return None timestamp = datetime.now().strftime("%Y%m%d_%H%M%S_%f") input_path = os.path.join(TEMP_DIR, f"cg_preview_in_{timestamp}.png") output_path = os.path.join(TEMP_DIR, f"cg_preview_out_{timestamp}.png") try: Image.fromarray(image_np).save(input_path) eq_filters, other_filters = [], [] if brightness != 0.0: eq_filters.append(f"brightness={brightness}") if contrast != 1.0: eq_filters.append(f"contrast={contrast}") if saturation != 1.0: eq_filters.append(f"saturation={saturation}") if sharpness > 0.0: other_filters.append(f"unsharp=5:5:{sharpness}") vf_parts = [] if eq_filters: vf_parts.append("eq=" + ":".join(eq_filters)) if other_filters: vf_parts.extend(other_filters) if not vf_parts: return Image.fromarray(image_np) vf_string = ",".join(vf_parts) cmd = ["ffmpeg", "-i", input_path, "-vf", vf_string, "-y", output_path] subprocess.run(cmd, capture_output=True, text=True, check=False) if os.path.exists(output_path): with Image.open(output_path) as img: return img.copy() else: return Image.fromarray(image_np) except Exception as e: print(f"Error in FFMPEG preview: {e}") return Image.fromarray(image_np) finally: if os.path.exists(input_path): os.remove(input_path) if os.path.exists(output_path): os.remove(output_path) def apply_color_grading(video_path, brightness, contrast, saturation, sharpness): """Applies color grading to a full video using FFMPEG.""" if not video_path: raise gr.Error("Please upload a video first.") timestamp = datetime.now().strftime("%Y%m%d_%H%M%S") output_video_path = os.path.join(TEMP_DIR, f"graded_video_{timestamp}.mp4") eq_filters, other_filters = [], [] if brightness != 0.0: eq_filters.append(f"brightness={brightness}") if contrast != 1.0: eq_filters.append(f"contrast={contrast}") if saturation != 1.0: eq_filters.append(f"saturation={saturation}") if sharpness > 0.0: other_filters.append(f"unsharp=5:5:{sharpness}") vf_parts = [] if eq_filters: vf_parts.append("eq=" + ":".join(eq_filters)) if other_filters: vf_parts.extend(other_filters) if not vf_parts: gr.Info("No adjustments made. Returning original video path.") return video_path vf_string = ",".join(vf_parts) cmd = ["ffmpeg", "-i", video_path, "-vf", vf_string, "-c:a", "copy", "-c:v", "libx264", "-pix_fmt", "yuv420p", "-y", output_video_path] run_ffmpeg_command(cmd, "Applying Color Grading...") return output_video_path # --- END ACCURATE Color Grading Functions --- def trim_and_fade_audio(audio_path, start_time, end_time, fade_in_duration, fade_out_duration): if not audio_path: raise gr.Error("Please upload an audio file.") audio_duration = get_media_duration(audio_path) if start_time < 0: start_time = 0 if end_time <= 0 or end_time > audio_duration: end_time = audio_duration if start_time >= end_time: raise gr.Error("Start time must be less than end time.") trimmed_duration = end_time - start_time if fade_in_duration + fade_out_duration > trimmed_duration: raise gr.Error("Sum of fade durations cannot be greater than the trimmed audio duration.") timestamp = datetime.now().strftime("%Y%m%d_%H%M%S") output_audio_path = os.path.join(TEMP_DIR, f"edited_audio_{timestamp}.mp3") af_filters = [] if fade_in_duration > 0: af_filters.append(f"afade=t=in:st=0:d={fade_in_duration}") if fade_out_duration > 0: fade_out_start = trimmed_duration - fade_out_duration; af_filters.append(f"afade=t=out:st={fade_out_start}:d={fade_out_duration}") cmd = ["ffmpeg", "-ss", str(start_time), "-to", str(end_time), "-i", audio_path] if af_filters: cmd.extend(["-af", ",".join(af_filters)]) cmd.extend(["-y", output_audio_path]) run_ffmpeg_command(cmd, "Trimming and fading audio...") return output_audio_path # In app.py, replace the existing create_gradual_ramp_video function with this one. def create_gradual_ramp_video(video_path, progress=gr.Progress(track_tqdm=True)): """ Creates a video with a gradual speed ramp: 1x -> 0.5x -> 1x. The effect is applied over the entire duration of the video. This uses a piecewise approximation with frame interpolation for a smooth result. --- ROBUSTNESS ENHANCEMENT --- A hybrid approach is used: 1. For videos < 2.0s: A simplified, robust method applies an *average* speed change across the whole clip. This avoids errors from creating many tiny, unstable segments. 2. For videos >= 2.0s: The original advanced segmentation logic is used to create a more detailed and noticeable ramp effect. """ if not video_path: raise gr.Error("Please upload a video to process.") progress(0, desc="Analyzing video properties...") duration = get_media_duration(video_path) if duration == 0: raise gr.Error("Could not determine video duration. The file may be corrupt.") fps = get_video_fps(video_path) has_audio = has_audio_stream(video_path) timestamp = datetime.now().strftime("%Y%m%d_%H%M%S") output_video_path = os.path.join(TEMP_DIR, f"gradual_ramp_{timestamp}.mp4") # --- THIS IS THE FIX: A DEDICATED PATH FOR SHORT VIDEOS --- # For very short videos, the complex ramp is barely noticeable and prone to ffmpeg errors. # We switch to a simpler, more stable method by applying an average speed change. if duration < 2.0: gr.Info("Video is short (< 2s). Applying a simplified, robust ramp effect.") progress(0.2, desc="Applying simplified ramp for short video...") # The integral of the speed curve from 1->0.5->1 gives a total duration multiplier of 1.5. # So, the average speed is original_duration / new_duration = 1 / 1.5 = 2/3. avg_speed = 2.0 / 3.0 # Calculate the target interpolated FPS to create new frames for the slowdown. interpolated_fps = fps / avg_speed filter_complex_parts = [] # Video filter: interpolate to the new framerate, then adjust timestamps to slow it down. video_filter = f"[0:v]minterpolate=fps={interpolated_fps}:mi_mode=mci,setpts=PTS/{avg_speed}[vout]" filter_complex_parts.append(video_filter) # Audio filter: apply the same speed change to the audio. if has_audio: atempo_str = _get_atempo_filter_string(avg_speed) audio_filter = f"[0:a]asetpts=PTS" if atempo_str: audio_filter += f",{atempo_str}" audio_filter += f"[aout]" filter_complex_parts.append(audio_filter) filter_complex_str = ";".join(filter_complex_parts) # Build the simplified ffmpeg command cmd = ["ffmpeg", "-y", "-i", video_path, "-filter_complex", filter_complex_str, "-map", "[vout]"] if has_audio: cmd.extend(["-map", "[aout]"]) cmd.extend(["-c:v", "libx264", "-pix_fmt", "yuv420p", "-crf", "18", output_video_path]) progress(0.6, desc="Executing simplified FFMPEG command...") run_ffmpeg_command(cmd, "Applying simplified speed ramp...") return output_video_path # --- Standard logic for videos >= 2.0 seconds --- progress(0.1, desc="Planning detailed speed ramp...") # Determine the number of segments to approximate the curve. # More segments = smoother, but more complex command. Capped at 60 for performance. target_segment_duration = 0.25 # Aim for segments of this length num_segments = int(duration / target_segment_duration) if num_segments % 2 != 0: num_segments += 1 # Ensure even number of segments for symmetry num_segments = max(10, min(num_segments, 60)) # Clamp between 10 and 60 min_speed = 0.5 half_segments = num_segments / 2.0 filter_complex_parts = [] video_outputs, audio_outputs = [], [] # Loop through each segment and build the corresponding filter chain for i in progress.tqdm(range(num_segments), desc="Building FFMPEG filter command..."): start_time = i * duration / num_segments end_time = (i + 1) * duration / num_segments # Parabolic speed calculation (y = ax^2 + c) for smooth ease-in/out x = (i - half_segments + 0.5) / half_segments speed = (1.0 - min_speed) * (x ** 2) + min_speed speed = max(0.01, speed) # Prevent speed from being zero # Video processing for this segment interpolated_fps_seg = fps / speed setpts_val_seg = 1.0 / speed video_filter = ( f"[0:v]trim=start={start_time}:end={end_time},setpts=PTS-STARTPTS," # Cut the segment f"minterpolate=fps={interpolated_fps_seg}:mi_mode=mci," # Interpolate frames for smoothness f"setpts={setpts_val_seg}*PTS[v{i}]" # Adjust speed ) filter_complex_parts.append(video_filter) video_outputs.append(f"[v{i}]") # Audio processing for this segment if has_audio: atempo_str_seg = _get_atempo_filter_string(speed) audio_filter = f"[0:a]atrim=start={start_time}:end={end_time},asetpts=PTS-STARTPTS" if atempo_str_seg: audio_filter += f",{atempo_str_seg}" audio_filter += f"[a{i}]" filter_complex_parts.append(audio_filter) audio_outputs.append(f"[a{i}]") progress(0.5, desc="Finalizing filter command...") # Concatenate all processed video and audio segments concat_filter_v = f"{''.join(video_outputs)}concat=n={num_segments}:v=1:a=0[vout]" filter_complex_parts.append(concat_filter_v) if has_audio and audio_outputs: concat_filter_a = f"{''.join(audio_outputs)}concat=n={num_segments}:v=0:a=1[aout]" filter_complex_parts.append(concat_filter_a) filter_complex_str = ";".join(filter_complex_parts) # Build the final complex ffmpeg command cmd = ["ffmpeg", "-y", "-i", video_path, "-filter_complex", filter_complex_str, "-map", "[vout]"] if has_audio and audio_outputs: cmd.extend(["-map", "[aout]"]) cmd.extend(["-c:v", "libx264", "-pix_fmt", "yuv420p", "-crf", "18", output_video_path]) progress(0.6, desc="Executing FFMPEG... This may take a while.") run_ffmpeg_command(cmd, "Applying gradual speed ramp...") return output_video_path # --- FLUX API --- FLUX_MODELS = {"FLUX.1-schnell (Fast)": "black-forest-labs/FLUX.1-schnell", "FLUX.1-dev (High Quality)": "black-forest-labs/FLUX.1-dev"} def call_flux_api(prompt, model_choice, width, height, hf_token): if not hf_token: raise gr.Error("Hugging Face User Access Token is required.") try: client = Client(FLUX_MODELS[model_choice], hf_token=hf_token) return client.predict(prompt=prompt, seed=0, randomize_seed=True, width=width, height=height, num_inference_steps=8 if "dev" in model_choice else 4, api_name="/infer")[0] except Exception as e: raise gr.Error(f"API call failed: {e}") def get_image_as_base64(path): try: with open(path, "rb") as f: return f"data:image/png;base64,{base64.b64encode(f.read()).decode('utf-8')}" except FileNotFoundError: return None # --- Transfer Tab Functions (Simplified) --- def filter_presets(query, all_presets): if not query: return gr.update(choices=sorted(list(all_presets.keys()))) filtered_keys = [key for key in all_presets.keys() if query.lower() in key.lower()] return gr.update(choices=sorted(filtered_keys)) def save_preset(presets, name, url): if not name or not name.strip(): gr.Warning("Preset name cannot be empty."); return presets, gr.update() if not url or not url.strip(): gr.Warning("Target URL cannot be empty."); return presets, gr.update() presets[name] = url gr.Info(f"Preset '{name}' saved!") return presets, gr.update(choices=sorted(list(presets.keys()))) def delete_preset(presets, name): if name in presets: del presets[name] gr.Info(f"Preset '{name}' deleted!") return presets, gr.update(choices=sorted(list(presets.keys())), value=None), "" gr.Warning(f"Preset '{name}' not found.") return presets, gr.update(), gr.update() def load_preset(presets, name): return presets.get(name, "") # --- Join/Beat-Sync/Etc Video Feature Functions --- def ping_pong_video(video_path, audio_option): if not video_path: raise gr.Error("Please upload a video.") timestamp = datetime.now().strftime("%Y%m%d_%H%M%S") job_temp_dir = os.path.join(TEMP_DIR, f"pingpong_{timestamp}"); os.makedirs(job_temp_dir, exist_ok=True) reversed_video_path = os.path.join(job_temp_dir, "reversed_temp.mp4") cmd_reverse = ["ffmpeg", "-i", video_path, "-vf", "reverse"] if audio_option == "Reverse Audio": cmd_reverse.extend(["-af", "areverse"]) else: cmd_reverse.append("-an") cmd_reverse.extend(["-c:v", "libx264", "-pix_fmt", "yuv420p", "-y", reversed_video_path]) run_ffmpeg_command(cmd_reverse) file_list_path = os.path.join(job_temp_dir, "files.txt") with open(file_list_path, 'w', encoding='utf-8') as f: f.write(f"file '{os.path.abspath(video_path)}'\n") f.write(f"file '{os.path.abspath(reversed_video_path)}'\n") output_video_path = os.path.join(TEMP_DIR, f"pingpong_video_{timestamp}.mp4") cmd_join = ["ffmpeg", "-f", "concat", "-safe", "0", "-i", file_list_path, "-c", "copy", "-y", output_video_path] if audio_option == "Original Audio Only": cmd_join = ["ffmpeg", "-i", video_path, "-i", reversed_video_path, "-filter_complex", "[0:v][1:v]concat=n=2:v=1[v]", "-map", "[v]", "-map", "0:a?", "-c:a", "copy", "-y", output_video_path] run_ffmpeg_command(cmd_join) shutil.rmtree(job_temp_dir) return output_video_path # ### --- NEW FEATURE: VIDEO STABILIZATION --- ### def stabilize_video(video_path, shakiness, smoothing): """ Stabilizes a video using a two-pass FFMPEG process. Pass 1: Detects motion vectors. Pass 2: Uses the motion vectors to smooth the video. """ if not video_path: raise gr.Error("Please upload a video to stabilize.") timestamp = datetime.now().strftime("%Y%m%d_%H%M%S") transforms_path = os.path.join(TEMP_DIR, f"transforms_{timestamp}.trf") output_video_path = os.path.join(TEMP_DIR, f"stabilized_{timestamp}.mp4") try: # Pass 1: Detect shakiness detect_cmd = [ "ffmpeg", "-i", video_path, "-vf", f"vidstabdetect=shakiness={shakiness}:result={transforms_path}", "-f", "null", "-" ] run_ffmpeg_command(detect_cmd, "Analyzing video for stabilization (Pass 1/2)...") # Pass 2: Apply stabilization transform_cmd = [ "ffmpeg", "-i", video_path, "-vf", f"vidstabtransform=input={transforms_path}:smoothing={smoothing}:optalgo=gauss", "-c:a", "copy", "-c:v", "libx264", "-pix_fmt", "yuv420p", "-y", output_video_path ] run_ffmpeg_command(transform_cmd, "Applying stabilization (Pass 2/2)...") finally: # Clean up the temporary transforms file if os.path.exists(transforms_path): os.remove(transforms_path) return output_video_path # ### --- NEW FEATURE: AUTO JUMP-CUT & WAVEFORM PREVIEW --- ### def generate_waveform_preview(video_path): """Generates a PNG image of the audio waveform.""" if not video_path or not has_audio_stream(video_path): return None # Return None if no video or no audio timestamp = datetime.now().strftime("%Y%m%d_%H%M%S") output_image_path = os.path.join(TEMP_DIR, f"waveform_{timestamp}.png") # FFMPEG command to generate a waveform picture cmd = [ "ffmpeg", "-i", video_path, "-filter_complex", "aformat=channel_layouts=mono,compand,showwavespic=s=1280x240:colors=#38bdf8", "-frames:v", "1", "-y", output_image_path ] try: # Use subprocess.run and check for errors, but don't raise gr.Error to avoid stopping the UI result = subprocess.run(cmd, capture_output=True, text=True, check=True) return output_image_path except (subprocess.CalledProcessError, FileNotFoundError) as e: print(f"--- WAVEFORM GENERATION ERROR ---\n{e}") return None def auto_jump_cut(video_path, silence_threshold, min_silence_duration, resolution_choice, custom_w, custom_h, progress=gr.Progress(track_tqdm=True)): """ Automatically removes silent parts from a video and stitches the remaining parts together. """ if not video_path: raise gr.Error("Please upload a video to process.") if not has_audio_stream(video_path): raise gr.Error("The uploaded video has no audio track. Cannot detect silence.") timestamp = datetime.now().strftime("%Y%m%d_%H%M%S") output_video_path = os.path.join(TEMP_DIR, f"jumpcut_{timestamp}.mp4") # --- 1. Detect Silence --- progress(0.1, desc="Analyzing for silent sections...") silence_cmd = [ "ffmpeg", "-i", video_path, "-af", f"silencedetect=noise={silence_threshold}dB:d={min_silence_duration}", "-f", "null", "-" ] print(f"Running silence detection: {' '.join(silence_cmd)}") result = subprocess.run(silence_cmd, capture_output=True, text=True, encoding='utf-8') silence_starts = [float(x) for x in re.findall(r'silence_start: (\d+\.?\d*)', result.stderr)] silence_ends = [float(x) for x in re.findall(r'silence_end: (\d+\.?\d*)', result.stderr)] if not silence_starts: gr.Info("No silence was detected with the current settings. Returning original video.") return video_path silences = list(zip(silence_starts, silence_ends)) # --- 2. Calculate Segments to Keep --- progress(0.3, desc="Calculating video cuts...") video_duration = get_media_duration(video_path) keep_segments = [] last_silence_end = 0.0 for start, end in silences: if start > last_silence_end: keep_segments.append((last_silence_end, start)) last_silence_end = end if last_silence_end < video_duration: keep_segments.append((last_silence_end, video_duration)) if not keep_segments: raise gr.Error("Failed to calculate any segments to keep. Try adjusting silence parameters.") # --- 3. Build the FFMPEG Filter Complex Command --- progress(0.5, desc="Building FFMPEG command...") scale_pad_filter = "" target_w, target_h = get_video_dimensions(video_path) if resolution_choice != "Keep Original": if resolution_choice == "1080p (1920x1080)": target_w, target_h = 1920, 1080 elif resolution_choice == "Portrait (1080x1920)": target_w, target_h = 1080, 1920 elif resolution_choice == "Custom": target_w, target_h = int(custom_w), int(custom_h) target_w -= target_w % 2 target_h -= target_h % 2 scale_pad_filter = f"scale={target_w}:{target_h}:force_original_aspect_ratio=decrease,pad={target_w}:{target_h}:(ow-iw)/2:(oh-ih)/2,setsar=1" filter_complex_parts = [] video_outputs = [] audio_outputs = [] for i, (start, end) in enumerate(keep_segments): filter_complex_parts.append(f"[0:v]trim=start={start}:end={end},setpts=PTS-STARTPTS[v{i}]") filter_complex_parts.append(f"[0:a]atrim=start={start}:end={end},asetpts=PTS-STARTPTS[a{i}]") if scale_pad_filter: filter_complex_parts.append(f"[v{i}]{scale_pad_filter}[scaled_v{i}]") video_outputs.append(f"[scaled_v{i}]") else: video_outputs.append(f"[v{i}]") audio_outputs.append(f"[a{i}]") filter_complex_parts.append(f"{''.join(video_outputs)}concat=n={len(keep_segments)}:v=1:a=0[vout]") filter_complex_parts.append(f"{''.join(audio_outputs)}concat=n={len(keep_segments)}:v=0:a=1[aout]") filter_complex_str = ";".join(filter_complex_parts) # --- 4. Execute the Final Command --- final_cmd = [ "ffmpeg", "-i", video_path, "-filter_complex", filter_complex_str, "-map", "[vout]", "-map", "[aout]", "-c:v", "libx264", "-pix_fmt", "yuv420p", "-c:a", "aac", "-y", output_video_path ] progress(0.7, desc="Generating final jump-cut video...") run_ffmpeg_command(final_cmd, desc="Stitching video segments...") return output_video_path # ### --- NEW FEATURE: VIDEO SILENCE CHOPPER --- ### def chop_video_on_silence(video_path, silence_threshold, min_silence_duration, resolution_choice, custom_w, custom_h, progress=gr.Progress(track_tqdm=True)): """ Splits a video into multiple clips, removing the silent parts. """ if not video_path: raise gr.Error("Please upload a video to process.") if not has_audio_stream(video_path): raise gr.Error("The uploaded video has no audio track. Cannot detect silence.") timestamp = datetime.now().strftime("%Y%m%d_%H%M%S") job_temp_dir = os.path.join(TEMP_DIR, f"video_chop_{timestamp}") os.makedirs(job_temp_dir, exist_ok=True) output_paths = [] progress(0.1, desc="Analyzing for silent sections...") silence_cmd = [ "ffmpeg", "-i", video_path, "-af", f"silencedetect=noise={silence_threshold}dB:d={min_silence_duration}", "-f", "null", "-" ] result = subprocess.run(silence_cmd, capture_output=True, text=True, encoding='utf-8') silence_starts = [float(x) for x in re.findall(r'silence_start: (\d+\.?\d*)', result.stderr)] silence_ends = [float(x) for x in re.findall(r'silence_end: (\d+\.?\d*)', result.stderr)] if not silence_starts: shutil.rmtree(job_temp_dir) raise gr.Error("No silence was detected with the current settings. Try adjusting the parameters.") silences = list(zip(silence_starts, silence_ends)) progress(0.3, desc="Calculating video cuts...") video_duration = get_media_duration(video_path) keep_segments = [] last_silence_end = 0.0 for start, end in silences: if start > last_silence_end: keep_segments.append((last_silence_end, start)) last_silence_end = end if last_silence_end < video_duration: keep_segments.append((last_silence_end, video_duration)) if not keep_segments: shutil.rmtree(job_temp_dir) raise gr.Error("Failed to calculate any segments to keep.") vf_filter = None if resolution_choice != "Keep Original": if resolution_choice == "1080p (1920x1080)": target_w, target_h = 1920, 1080 elif resolution_choice == "Portrait (1080x1920)": target_w, target_h = 1080, 1920 elif resolution_choice == "Custom": target_w, target_h = int(custom_w), int(custom_h) target_w -= target_w % 2 target_h -= target_h % 2 vf_filter = f"scale={target_w}:{target_h}:force_original_aspect_ratio=decrease,pad={target_w}:{target_h}:(ow-iw)/2:(oh-ih)/2,setsar=1" for i, (start, end) in enumerate(progress.tqdm(keep_segments, desc="Exporting video clips...")): output_clip_path = os.path.join(job_temp_dir, f"clip_{i:04d}.mp4") duration = end - start cmd = ["ffmpeg", "-y", "-ss", str(start), "-to", str(end), "-i", video_path] if vf_filter: # Re-encoding is necessary cmd.extend(["-vf", vf_filter, "-c:v", "libx264", "-pix_fmt", "yuv420p", "-c:a", "aac"]) else: # Can use fast stream copy cmd.extend(["-c", "copy"]) cmd.append(output_clip_path) try: run_ffmpeg_command(cmd) output_paths.append(output_clip_path) except Exception as e: gr.Warning(f"Skipping a clip due to an error: {e}") continue if not output_paths: shutil.rmtree(job_temp_dir) raise gr.Error("No video clips could be exported.") zip_base_name = os.path.join(TEMP_DIR, f"video_chop_archive_{timestamp}") zip_path = shutil.make_archive(zip_base_name, 'zip', job_temp_dir) return output_paths, zip_path # --- STORYBOARD / ANIMATIC CREATOR FUNCTIONS --- def get_file_type(file_path): if not file_path: return "unknown" image_exts = ['.png', '.jpg', '.jpeg', '.webp', '.bmp', '.gif'] video_exts = ['.mp4', '.mov', '.mkv', '.avi', '.webm'] ext = os.path.splitext(file_path.lower())[1] if ext in image_exts: return "image" if ext in video_exts: return "video" return "unknown" def add_assets_to_bin(files, current_assets): if not files: return current_assets, gr.update(value=[a['path'] for a in current_assets] if current_assets else None) session_id = f"storyboard_session_{datetime.now().strftime('%Y%m%d_%H%M%S')}" asset_session_dir = os.path.join(TEMP_DIR, session_id) os.makedirs(asset_session_dir, exist_ok=True) updated_asset_list = list(current_assets) for file_obj in files: try: file_type = get_file_type(file_obj.name) if file_type == "unknown": gr.Warning(f"Skipping unknown file type: {os.path.basename(file_obj.name)}") continue new_path = os.path.join(asset_session_dir, os.path.basename(file_obj.name)) shutil.copy(file_obj.name, new_path) updated_asset_list.append({"path": new_path, "name": os.path.basename(new_path), "type": file_type}) except Exception as e: gr.Warning(f"Error adding asset {os.path.basename(file_obj.name)}: {e}") return updated_asset_list, gr.update(value=[a['path'] for a in updated_asset_list]) def handle_asset_selection(evt: gr.SelectData, assets_state, timeline_state): if not evt.selected: return timeline_state, None selected_asset = assets_state[evt.index] new_timeline = list(timeline_state) item_to_add = { "path": selected_asset['path'], "name": selected_asset['name'], "type": selected_asset['type'], } if selected_asset['type'] == 'image': item_to_add.update({ "duration": 3.0, "start_time": 0, "original_duration": 0 }) else: # video original_duration = get_media_duration(selected_asset['path']) if original_duration <= 0: gr.Warning(f"Could not read duration for '{selected_asset['name']}'. Defaulting to 3.0 seconds. The file may be corrupt or in an unsupported format.") original_duration = 3.0 item_to_add.update({ "duration": round(original_duration, 2), "start_time": 0.0, "original_duration": round(original_duration, 2) }) new_timeline.append(item_to_add) gr.Info(f"Added '{selected_asset['name']}' to timeline.") preview_frames = None if selected_asset['type'] == 'video': try: preview_frames = extract_first_last_frame(selected_asset['path']) except Exception as e: print(f"Could not generate preview for {selected_asset['name']}: {e}") return new_timeline, preview_frames def add_all_assets_to_timeline(assets_state, timeline_state): if not assets_state: gr.Warning("Asset bin is empty.") return timeline_state new_timeline = list(timeline_state) for asset in assets_state: item_to_add = { "path": asset['path'], "name": asset['name'], "type": asset['type'], } if asset['type'] == 'image': item_to_add.update({ "duration": 3.0, "start_time": 0, "original_duration": 0 }) else: # video original_duration = get_media_duration(asset['path']) if original_duration <= 0: gr.Warning(f"Could not read duration for '{asset['name']}'. Defaulting to 3.0 seconds.") original_duration = 3.0 item_to_add.update({ "duration": round(original_duration, 2), "start_time": 0.0, "original_duration": round(original_duration, 2) }) new_timeline.append(item_to_add) gr.Info(f"Added {len(assets_state)} assets to the timeline.") return new_timeline def update_timeline_df(timeline_state): if not timeline_state: return gr.update(value=None) df_data = [[i + 1, item['name'], item['type'], item['duration']] for i, item in enumerate(timeline_state)] return gr.update(value=df_data) def handle_timeline_selection(timeline_state, evt: gr.SelectData): if not evt.selected: return -1, None, None, gr.update(interactive=False), gr.update(interactive=False), gr.update(interactive=False), gr.update(visible=False), 0, 0 index = evt.index[0] if not (0 <= index < len(timeline_state)): return -1, None, None, gr.update(interactive=False), gr.update(interactive=False), gr.update(interactive=False), gr.update(visible=False), 0, 0 selected_item = timeline_state[index] preview_val = selected_item['path'] duration_val = selected_item['duration'] can_move_up = index > 0 can_move_down = index < len(timeline_state) - 1 if selected_item['type'] == 'video': start_time = selected_item.get('start_time', 0.0) end_time = start_time + selected_item['duration'] return (index, preview_val, duration_val, gr.update(interactive=can_move_up), gr.update(interactive=can_move_down), gr.update(interactive=True), gr.update(visible=True), round(start_time, 2), round(end_time, 2)) else: # Image return (index, preview_val, duration_val, gr.update(interactive=can_move_up), gr.update(interactive=can_move_down), gr.update(interactive=True), gr.update(visible=False), 0, 0) def apply_trim_and_update(timeline_state, selected_index, new_start, new_end): if selected_index == -1 or not (0 <= selected_index < len(timeline_state)): gr.Warning("No clip selected in timeline.") return timeline_state, gr.update() item_to_update = timeline_state[selected_index] if item_to_update['type'] != 'video': gr.Warning("Trimming is only available for video clips.") return timeline_state, gr.update() original_duration = item_to_update.get('original_duration', 0) if not (0 <= new_start < new_end and new_end <= original_duration): gr.Warning(f"Invalid trim times. Must be between 0 and {original_duration:.2f}s, and start must be before end.") return timeline_state, gr.update() new_duration = new_end - new_start new_timeline = list(timeline_state) new_timeline[selected_index]['start_time'] = round(new_start, 2) new_timeline[selected_index]['duration'] = round(new_duration, 2) gr.Info(f"Clip '{item_to_update['name']}' trimmed. New duration is {new_duration:.2f}s.") return new_timeline, gr.update(value=round(new_duration, 2)) def update_clip_properties(timeline_state, selected_index, new_duration): if selected_index == -1 or not (0 <= selected_index < len(timeline_state)): gr.Warning("No clip selected in timeline.") return timeline_state if new_duration <= 0: gr.Warning("Duration must be a positive number.") return timeline_state new_timeline = list(timeline_state) item_to_update = new_timeline[selected_index] if item_to_update['type'] == 'video': start_time = item_to_update.get('start_time', 0.0) original_duration = item_to_update.get('original_duration', 0.0) max_possible_duration = original_duration - start_time if new_duration > max_possible_duration: gr.Warning(f"Duration cannot exceed available video length from start time ({max_possible_duration:.2f}s). Clamping value.") new_duration = max_possible_duration item_to_update['duration'] = round(new_duration, 2) gr.Info(f"Updated duration for '{item_to_update['name']}'.") return new_timeline def handle_timeline_action(timeline_state, selected_index, action): if selected_index == -1 or not (0 <= selected_index < len(timeline_state)): gr.Warning("Please select a clip from the timeline first.") return timeline_state, gr.update() new_list = list(timeline_state) new_index = selected_index if action == "up" and selected_index > 0: new_list.insert(selected_index - 1, new_list.pop(selected_index)) new_index = selected_index - 1 elif action == "down" and selected_index < len(new_list) - 1: new_list.insert(selected_index + 1, new_list.pop(selected_index)) new_index = selected_index + 1 elif action == "remove": new_list.pop(selected_index) new_index = -1 # Deselect after removing # Return the new list and tell the UI to select the new index return new_list, gr.update(selected_index=new_index if new_index != -1 else None) def set_resolution_from_first_asset(timeline_state): if not timeline_state: gr.Warning("Timeline is empty. Cannot determine resolution.") return gr.update(), gr.update() first_item = timeline_state[0] path = first_item['path'] item_type = first_item['type'] w, h = 0, 0 if item_type == 'video': w, h = get_video_dimensions(path) elif item_type == 'image': try: with Image.open(path) as img: w, h = img.size except Exception as e: print(f"Could not get image dimensions for {path}: {e}") if w > 0 and h > 0: gr.Info(f"Set resolution to {w}x{h} based on '{first_item['name']}'.") return w, h else: gr.Warning(f"Could not get dimensions for the first asset: '{first_item['name']}'.") return gr.update(), gr.update() def create_animatic(timeline_data, audio_path, out_w, out_h, keep_original_audio): if not timeline_data: raise gr.Error("Timeline is empty. Please add assets to the timeline.") out_w, out_h = int(out_w), int(out_h) if out_w <= 0 or out_h <= 0: raise gr.Error("Output width and height must be positive numbers.") out_w -= out_w % 2 out_h -= out_h % 2 timestamp = datetime.now().strftime("%Y%m%d_%H%M%S") job_temp_dir = os.path.join(TEMP_DIR, f"animatic_{timestamp}") os.makedirs(job_temp_dir, exist_ok=True) clip_paths = [] for i, item in enumerate(timeline_data): item_path, item_type, item_duration = item['path'], item['type'], item['duration'] start_time = item.get('start_time', 0) if item_duration <= 0: gr.Warning(f"Skipping clip '{item['name']}' because its duration is zero.") continue output_clip_path = os.path.join(job_temp_dir, f"clip_{i:04d}.mp4") cmd = ["ffmpeg", "-y"] vf_base_scale = f"scale={out_w}:{out_h}:force_original_aspect_ratio=decrease,pad={out_w}:{out_h}:(ow-iw)/2:(oh-ih)/2,setsar=1" if item_type == 'video': video_has_audio = has_audio_stream(item_path) if start_time > 0: cmd.extend(["-ss", str(start_time)]) cmd.extend(["-t", str(item_duration), "-i", item_path]) vf_filters = [f"setpts=PTS-STARTPTS", vf_base_scale] cmd.extend(["-vf", ",".join(vf_filters)]) if keep_original_audio: if video_has_audio: cmd.extend(["-af", "asetpts=PTS-STARTPTS"]) cmd.extend(["-c:a", "aac", "-ar", "44100"]) else: cmd.extend(["-f", "lavfi", "-t", str(item_duration), "-i", "anullsrc=channel_layout=stereo:sample_rate=44100"]) cmd.extend(["-map", "0:v:0", "-map", "1:a:0"]) cmd.extend(["-c:a", "aac", "-ar", "44100"]) else: cmd.append("-an") cmd.extend(["-c:v", "libx264", "-pix_fmt", "yuv420p"]) else: # item_type == 'image' cmd.extend(["-loop", "1", "-i", item_path, "-t", str(item_duration)]) vf_filter_img = f"{vf_base_scale},format=yuv420p" cmd.extend(["-vf", vf_filter_img]) if keep_original_audio: cmd.extend(["-f", "lavfi", "-t", str(item_duration), "-i", "anullsrc=channel_layout=stereo:sample_rate=44100", "-shortest"]) cmd.extend(["-c:a", "aac", "-ar", "44100"]) else: cmd.append("-an") cmd.append(output_clip_path) run_ffmpeg_command(cmd, f"Processing clip {i+1}/{len(timeline_data)}: {item['name']}") clip_paths.append(output_clip_path) if not clip_paths: shutil.rmtree(job_temp_dir) raise gr.Error("No valid clips were generated. Check clip durations and file integrity.") combined_video_path = os.path.join(job_temp_dir, "combined_video.mp4") if len(clip_paths) > 1: cmd_concat = ["ffmpeg", "-y"] video_inputs, audio_inputs = [], [] for i, path in enumerate(clip_paths): cmd_concat.extend(["-i", path]) video_inputs.append(f"[{i}:v]") if keep_original_audio: audio_inputs.append(f"[{i}:a]") filter_complex_str = "" if keep_original_audio: video_concat_str = "".join(video_inputs) + f"concat=n={len(clip_paths)}:v=1:a=0[v_out];" audio_concat_str = "".join(audio_inputs) + f"concat=n={len(clip_paths)}:v=0:a=1[a_out]" filter_complex_str = video_concat_str + audio_concat_str cmd_concat.extend(["-filter_complex", filter_complex_str, "-map", "[v_out]", "-map", "[a_out]"]) else: video_concat_str = "".join(video_inputs) + f"concat=n={len(clip_paths)}:v=1:a=0[v_out]" filter_complex_str = video_concat_str cmd_concat.extend(["-filter_complex", filter_complex_str, "-map", "[v_out]"]) cmd_concat.append(combined_video_path) run_ffmpeg_command(cmd_concat, "Joining and Finalizing Video (Robust Mode)...") else: if os.path.exists(clip_paths[0]): shutil.copy(clip_paths[0], combined_video_path) else: shutil.rmtree(job_temp_dir) raise gr.Error("The only clip in the timeline failed to process.") final_output_path = os.path.join(TEMP_DIR, f"animatic_final_{timestamp}.mp4") if not keep_original_audio and audio_path: run_ffmpeg_command(["ffmpeg", "-y", "-i", combined_video_path, "-i", audio_path, "-c:v", "copy", "-c:a", "aac", "-shortest", final_output_path], "Muxing audio") else: shutil.move(combined_video_path, final_output_path) shutil.rmtree(job_temp_dir) return final_output_path def detect_bpm(audio_path): if not audio_path: return "Please upload an audio track first." try: y, sr = librosa.load(audio_path) tempo_val, _ = librosa.beat.beat_track(y=y, sr=sr) if isinstance(tempo_val, np.ndarray): tempo = tempo_val.item() else: tempo = float(tempo_val) if tempo > 0: return f"Detected BPM: {tempo:.2f}" else: return "Could not detect BPM." except Exception as e: print(f"--- BPM DETECTION ERROR ---\n{e}") return "Error: Could not analyze audio file." def update_new_bpm_display(original_bpm_text, speed_multiplier): if not original_bpm_text or "Detected" not in original_bpm_text: return "---" try: bpm_match = re.search(r"(\d+\.\d+)", original_bpm_text) if bpm_match: original_bpm = float(bpm_match.group(1)) new_bpm = original_bpm * speed_multiplier return f"Estimated New BPM: {new_bpm:.2f}" else: return "---" except (ValueError, TypeError): return "---" def create_rhythmic_animatic(timeline_data, audio_path, measure_choice, out_w, out_h): if not timeline_data: raise gr.Error("Timeline is empty.") if not audio_path: raise gr.Error("An audio track is required for rhythmic editing.") try: y, sr = librosa.load(audio_path) tempo_val, _ = librosa.beat.beat_track(y=y, sr=sr) if isinstance(tempo_val, np.ndarray): tempo = tempo_val.item() else: tempo = float(tempo_val) if not tempo or tempo <= 0: raise gr.Error("Could not determine BPM from audio file.") except Exception as e: raise gr.Error(f"Audio analysis failed: {e}") seconds_per_beat = 60.0 / tempo seconds_per_measure = seconds_per_beat * 4.0 measure_multipliers = { "2 Measures": 2.0, "1 Measure": 1.0, "1/2 Measure": 0.5, "1/4 Measure (Beat)": 0.25 } clip_duration = seconds_per_measure * measure_multipliers[measure_choice] rhythmic_timeline = [] for item in timeline_data: new_item = item.copy() if new_item['type'] == 'video': start_time = new_item.get('start_time', 0) available_duration = new_item.get('original_duration', 0) - start_time new_item['duration'] = min(clip_duration, available_duration) else: new_item['duration'] = clip_duration rhythmic_timeline.append(new_item) gr.Info(f"Re-timed {len(rhythmic_timeline)} clips to ~{clip_duration:.2f}s each based on {tempo:.2f} BPM.") return create_animatic(rhythmic_timeline, audio_path, out_w, out_h, keep_original_audio=False) # --- NEW CREATIVE FUNCTIONS --- def _create_auto_trailer_impl(video_path, trailer_duration, clip_duration, analysis_method, transition_style, music_path, out_w, out_h, progress: Progress): """Internal implementation of the auto-trailer creator.""" if not video_path: raise gr.Error("Please upload a source video.") source_duration = get_media_duration(video_path) if source_duration < trailer_duration: gr.Warning(f"Source video is only {source_duration:.1f}s long. The trailer duration will be capped at the source video length.") trailer_duration = source_duration if clip_duration > trailer_duration: new_clip_duration = trailer_duration / 2 if trailer_duration > 2 else trailer_duration gr.Warning(f"Clip duration ({clip_duration}s) is longer than the trailer duration ({trailer_duration:.1f}s). Adjusting clip duration to {new_clip_duration:.1f}s.") clip_duration = new_clip_duration timestamp = datetime.now().strftime("%Y%m%d_%H%M%S") job_temp_dir = os.path.join(TEMP_DIR, f"trailer_{timestamp}") os.makedirs(job_temp_dir, exist_ok=True) try: progress(0, desc="Analyzing video for high-motion scenes...") cap = cv2.VideoCapture(video_path) fps = cap.get(cv2.CAP_PROP_FPS) if fps == 0: fps = 30 # fallback chunk_duration_frames = int(clip_duration * fps) video_total_frames = int(cap.get(cv2.CAP_PROP_FRAME_COUNT)) chunk_scores = [] prev_frame = None frame_num = 0 while cap.isOpened(): ret, frame = cap.read() if not ret: break frame_skip = max(1, int(fps / 5)) # Analyze ~5 frames per second if frame_num % frame_skip == 0: gray = cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY) gray = cv2.GaussianBlur(gray, (21, 21), 0) if prev_frame is not None: frame_delta = cv2.absdiff(prev_frame, gray) thresh = cv2.threshold(frame_delta, 25, 255, cv2.THRESH_BINARY)[1] motion_score = np.sum(thresh) chunk_index = frame_num // chunk_duration_frames while len(chunk_scores) <= chunk_index: chunk_scores.append({'start_time': (len(chunk_scores) * clip_duration), 'scores': []}) chunk_scores[chunk_index]['scores'].append(motion_score) prev_frame = gray frame_num += 1 if frame_num % 100 == 0: progress(0.2 * (frame_num / video_total_frames), desc=f"Analyzing frame {frame_num}/{video_total_frames}...") cap.release() final_chunks = [{'start_time': chunk['start_time'], 'score': sum(chunk['scores']) / len(chunk['scores'])} for chunk in chunk_scores if chunk['scores']] if not final_chunks: raise gr.Error("Could not analyze video for motion. Is the video very short or static?") progress(0.2, desc="Selecting the best clips...") num_clips_to_select = max(1, int(trailer_duration / clip_duration)) selected_clips_info = sorted(sorted(final_chunks, key=lambda x: x['score'], reverse=True)[:num_clips_to_select], key=lambda x: x['start_time']) extracted_clips, out_w, out_h = [], int(out_w) - (int(out_w) % 2), int(out_h) - (int(out_h) % 2) for i, clip_info in enumerate(selected_clips_info): progress(0.2 + (0.5 * (i / len(selected_clips_info))), desc=f"Extracting clip {i+1}/{len(selected_clips_info)}...") output_clip_path = os.path.join(job_temp_dir, f"clip_{i:03d}.mp4") vf_filter = f"scale={out_w}:{out_h}:force_original_aspect_ratio=decrease,pad={out_w}:{out_h}:(ow-iw)/2:(oh-ih)/2,setsar=1" cmd = ["ffmpeg", "-y", "-ss", str(clip_info['start_time']), "-i", video_path, "-t", str(clip_duration), "-vf", vf_filter, "-an", "-c:v", "libx264", "-pix_fmt", "yuv420p", output_clip_path] run_ffmpeg_command(cmd) extracted_clips.append(output_clip_path) if not extracted_clips: raise gr.Error("Failed to extract any clips.") progress(0.7, desc="Stitching clips together...") final_silent_path = os.path.join(job_temp_dir, "final_silent.mp4") if transition_style == "None" or len(extracted_clips) == 1: if len(extracted_clips) == 1: shutil.copy(extracted_clips[0], final_silent_path) else: file_list_path = os.path.join(job_temp_dir, "files.txt") with open(file_list_path, 'w', encoding='utf-8') as f: for path in extracted_clips: f.write(f"file '{os.path.abspath(path)}'\n") run_ffmpeg_command(["ffmpeg", "-y", "-f", "concat", "-safe", "0", "-i", file_list_path, "-c", "copy", final_silent_path], "Concatenating clips...") else: transition_duration = 0.5 cmd = ["ffmpeg", "-y"] filter_complex = [] running_duration = 0 for i, clip_path in enumerate(extracted_clips): cmd.extend(["-i", clip_path]) for i in range(len(extracted_clips) - 1): input1 = f"[{i}:v]" if i == 0 else f"[v{i-1}]" input2 = f"[{i+1}:v]" output = f"[v{i}]" offset = max(0, running_duration + clip_duration - transition_duration) filter_complex.append(f"{input1}{input2}xfade=transition={transition_style.lower()}:duration={transition_duration}:offset={offset}{output}") running_duration += clip_duration - transition_duration cmd.extend([ "-filter_complex", ";".join(filter_complex), "-map", f"[v{len(extracted_clips)-2}]", "-c:v", "libx264", "-pix_fmt", "yuv420p", final_silent_path ]) run_ffmpeg_command(cmd, "Applying transitions...") progress(0.95, desc="Adding background music...") final_output_path = os.path.join(TEMP_DIR, f"trailer_final_{timestamp}.mp4") if music_path: run_ffmpeg_command(["ffmpeg", "-y", "-i", final_silent_path, "-i", music_path, "-c:v", "copy", "-c:a", "aac", "-map", "0:v:0", "-map", "1:a:0", "-shortest", final_output_path], "Muxing audio") else: shutil.move(final_silent_path, final_output_path) return final_output_path finally: if os.path.exists(job_temp_dir): shutil.rmtree(job_temp_dir) def auto_trailer_wrapper(video_path, trailer_duration, clip_duration, analysis_method, transition_style, music_path, out_w, out_h, progress=gr.Progress(track_tqdm=True)): return _create_auto_trailer_impl(video_path, trailer_duration, clip_duration, analysis_method, transition_style, music_path, out_w, out_h, progress) def generate_waveform_video(video_path, style, size, position, color): if not video_path: raise gr.Error("Please upload a video first.") if not has_audio_stream(video_path): raise gr.Error("The uploaded video has no audio track. A waveform cannot be generated.") timestamp = datetime.now().strftime("%Y%m%d_%H%M%S") output_video_path = os.path.join(TEMP_DIR, f"waveform_video_{timestamp}.mp4") pos_map = { "Bottom": f"overlay=x=(W-w)/2:y=H-h-50", "Center": f"overlay=x=(W-w)/2:y=(H-h)/2", "Top": f"overlay=x=(W-w)/2:y=50" } safe_color = color.lstrip('#') filter_complex = ( f"[0:a]showwaves=s={size}:mode={style}:colors={safe_color}:rate=25[wave];" f"[0:v][wave]{pos_map[position]}" ) cmd = [ "ffmpeg", "-i", video_path, "-filter_complex", filter_complex, "-c:a", "copy", "-c:v", "libx264", "-pix_fmt", "yuv420p", "-y", output_video_path ] run_ffmpeg_command(cmd, "Generating Audio Waveform...") return output_video_path def create_pip_video(main_video, overlay_media, position, scale): if not main_video: raise gr.Error("Please upload a main video.") if not overlay_media: raise gr.Error("Please upload an overlay video or image.") timestamp = datetime.now().strftime("%Y%m%d_%H%M%S") output_video_path = os.path.join(TEMP_DIR, f"pip_video_{timestamp}.mp4") scale_filter = f"[1:v]scale=iw*{scale}:-1[scaled_overlay]" pos_map = { "Top-Left": "x=10:y=10", "Top-Center": "x=(W-w)/2:y=10", "Top-Right": "x=W-w-10:y=10", "Center-Left": "x=10:y=(H-h)/2", "Center": "x=(W-w)/2:y=(H-h)/2", "Center-Right": "x=W-w-10:y=(H-h)/2", "Bottom-Left": "x=10:y=H-h-10", "Bottom-Center": "x=(W-w)/2:y=H-h-10", "Bottom-Right": "x=W-w-10:y=H-h-10" } overlay_filter = f"[0:v][scaled_overlay]overlay={pos_map[position]}" cmd = ["ffmpeg", "-i", main_video, "-i", overlay_media.name] cmd.extend([ "-filter_complex", f"{scale_filter};{overlay_filter}", "-map", "0:a?", "-c:a", "copy", "-c:v", "libx264", "-pix_fmt", "yuv420p", "-y", output_video_path ]) run_ffmpeg_command(cmd, "Creating Picture-in-Picture video...") return output_video_path def create_meme(image, text_input, position, font_choice, font_size_scale, text_color, outline_color): if image is None: raise gr.Error("Please upload an image.") parsed_text_color = parse_color(text_color) parsed_outline_color = parse_color(outline_color) img = Image.fromarray(image).convert("RGB") draw = ImageDraw.Draw(img) FONT_MAP = { "Impact": "impact.ttf", "Arial": "arial.ttf", "Arial Black": "ariblk.ttf", "Comic Sans MS": "comic.ttf", "Courier New": "cour.ttf", "Georgia": "georgia.ttf", "Tahoma": "tahoma.ttf", "Times New Roman": "times.ttf", "Trebuchet MS": "trebuc.ttf", "Verdana": "verdana.ttf" } font_path = FONT_MAP.get(font_choice, "impact.ttf") try: font_size = int(img.width / 10 * (font_size_scale / 5)) font = ImageFont.truetype(font_path, font_size) except IOError: gr.Warning(f"{font_choice} font ('{font_path}') not found. Trying Arial.") try: font_path = FONT_MAP["Arial"] font = ImageFont.truetype(font_path, font_size) except IOError: gr.Warning("Arial font not found. Using default font.") font = ImageFont.load_default() def draw_text_with_outline(text, x, y): # Outline draw.text((x-2, y-2), text, font=font, fill=parsed_outline_color) draw.text((x+2, y-2), text, font=font, fill=parsed_outline_color) draw.text((x-2, y+2), text, font=font, fill=parsed_outline_color) draw.text((x+2, y+2), text, font=font, fill=parsed_outline_color) # Main Text draw.text((x, y), text, font=font, fill=parsed_text_color) if text_input: bbox = draw.textbbox((0, 0), text_input.upper(), font=font) text_width = bbox[2] - bbox[0] text_height = bbox[3] - bbox[1] x = (img.width - text_width) / 2 if position == "Top": y = 10 elif position == "Bottom": y = img.height - text_height - 10 else: # Center y = (img.height - text_height) / 2 draw_text_with_outline(text_input.upper(), x, y) return img def stitch_images_smartly(img1_np, img2_np, output_size, bg_color_hex): """ Stitches two images together into a square. - If input images are vertical (based on first image), they are placed side-by-side. - If input images are horizontal, they are stacked vertically. """ if img1_np is None or img2_np is None: raise gr.Error("Please upload two images.") # Convert inputs to PIL Images img1 = Image.fromarray(img1_np).convert("RGBA") img2 = Image.fromarray(img2_np).convert("RGBA") # Parse background color using the utility function bg_color = parse_color(bg_color_hex) # Create the final square canvas final_image = Image.new("RGB", (output_size, output_size), bg_color) # Determine orientation from the first image w1, h1 = img1.size is_vertical = h1 > w1 if is_vertical: # --- Place two vertical images side-by-side --- target_box_w = output_size // 2 target_box_h = output_size # Process image 1: resize and paste centered in the left box img1.thumbnail((target_box_w, target_box_h), Image.Resampling.LANCZOS) x1_offset = (target_box_w - img1.width) // 2 y1_offset = (target_box_h - img1.height) // 2 final_image.paste(img1, (x1_offset, y1_offset), img1) # Process image 2: resize and paste centered in the right box img2.thumbnail((target_box_w, target_box_h), Image.Resampling.LANCZOS) x2_offset = target_box_w + (target_box_w - img2.width) // 2 y2_offset = (target_box_h - img2.height) // 2 final_image.paste(img2, (x2_offset, y2_offset), img2) else: # --- Stack two horizontal images vertically --- target_box_w = output_size target_box_h = output_size // 2 # Process image 1: resize and paste centered in the top box img1.thumbnail((target_box_w, target_box_h), Image.Resampling.LANCZOS) x1_offset = (target_box_w - img1.width) // 2 y1_offset = (target_box_h - img1.height) // 2 final_image.paste(img1, (x1_offset, y1_offset), img1) # Process image 2: resize and paste centered in the bottom box img2.thumbnail((target_box_w, target_box_h), Image.Resampling.LANCZOS) x2_offset = (target_box_w - img2.width) // 2 y2_offset = target_box_h + (target_box_h - img2.height) // 2 final_image.paste(img2, (x2_offset, y2_offset), img2) return final_image def merge_videos(videos): if not videos or len(videos) < 2: raise gr.Error("Please upload at least two videos to merge.") timestamp = datetime.now().strftime("%Y%m%d_%H%M%S") job_temp_dir = os.path.join(TEMP_DIR, f"merge_{timestamp}") os.makedirs(job_temp_dir, exist_ok=True) first_video_path = videos[0].name w, h = get_video_dimensions(first_video_path) fps = get_video_fps(first_video_path) w -= w % 2 h -= h % 2 processed_clips = [] for i, video_file in enumerate(videos): clip_path = os.path.join(job_temp_dir, f"clip_{i}.mp4") cmd = [ "ffmpeg", "-i", video_file.name, "-vf", f"scale={w}:{h},setsar=1", "-r", str(fps), "-c:v", "libx264", "-pix_fmt", "yuv420p", "-c:a", "aac", "-ar", "44100", "-y", clip_path ] run_ffmpeg_command(cmd) processed_clips.append(clip_path) file_list_path = os.path.join(job_temp_dir, "files.txt") with open(file_list_path, 'w', encoding='utf-8') as f: for path in processed_clips: f.write(f"file '{os.path.abspath(path)}'\n") output_video_path = os.path.join(TEMP_DIR, f"merged_video_{timestamp}.mp4") cmd_merge = [ "ffmpeg", "-f", "concat", "-safe", "0", "-i", file_list_path, "-c", "copy", "-y", output_video_path ] run_ffmpeg_command(cmd_merge, "Merging videos...") shutil.rmtree(job_temp_dir) return output_video_path # ### --- NEW: BATCH IMAGE CROPPER --- ### def update_crop_preview(original_image, x, y, w, h): if original_image is None: return None # Create a copy to draw on preview_image = original_image.copy() draw = ImageDraw.Draw(preview_image) # Define the bounding box for the crop area box = (x, y, x + w, y + h) # Draw a rectangle outline draw.rectangle(box, outline="#38bdf8", width=3) return preview_image def crop_single_image(input_path, output_path, **kwargs): x = int(kwargs.get('x', 0)) y = int(kwargs.get('y', 0)) w = int(kwargs.get('w', 512)) h = int(kwargs.get('h', 512)) with Image.open(input_path) as img: cropped_img = img.crop((x, y, x + w, y + h)) cropped_img.save(output_path) def batch_crop_images(files, x, y, w, h): if not files: raise gr.Error("Please upload at least one image.") if w <= 0 or h <= 0: raise gr.Error("Width and Height must be positive.") processing_kwargs = {'x': x, 'y': y, 'w': w, 'h': h} output_paths, zip_path, _ = batch_image_processor( files, crop_single_image, "cropped", **processing_kwargs ) return output_paths, zip_path # ### --- NEW: COLLAGE MAKER --- ### def create_collage(files, layout, width, height, bg_color_hex): if not files: raise gr.Error("Please upload images to create a collage.") bg_color = parse_color(bg_color_hex) images = [Image.open(file.name).convert("RGBA") for file in files] n = len(images) if layout == "Grid": cols = int(math.ceil(math.sqrt(n))) rows = int(math.ceil(n / cols)) elif layout == "Horizontal": cols, rows = n, 1 else: # Vertical cols, rows = 1, n cell_w = width // cols cell_h = height // rows canvas = Image.new("RGB", (width, height), bg_color) for i, img in enumerate(images): row = i // cols col = i % cols img.thumbnail((cell_w, cell_h), Image.Resampling.LANCZOS) paste_x = (col * cell_w) + (cell_w - img.width) // 2 paste_y = (row * cell_h) + (cell_h - img.height) // 2 canvas.paste(img, (paste_x, paste_y), img) return canvas # ### --- NEW: VIDEO GRID COMPILER --- ### def update_audio_source_choices_for_grid(files): if not files: return gr.update(choices=["From Video 1", "None"], value="From Video 1") choices = [f"From Video {i+1}" for i in range(len(files))] choices.append("None") return gr.update(choices=choices, value=choices[0]) def compile_video_grid(videos, layout, width, height, bg_color, audio_choice, music_path): if not videos: raise gr.Error("Please upload videos to compile.") num_videos = len(videos) layout_map = { "2x1 (Side-by-Side)": 2, "1x2 (Stacked)": 2, "2x2 (Quad-View)": 4, "4x4 (16-View)": 16, "8x4 (32-View)": 32, } required_videos = layout_map.get(layout) if num_videos != required_videos: raise gr.Error(f"The '{layout}' layout requires exactly {required_videos} videos, but you uploaded {num_videos}.") timestamp = datetime.now().strftime("%Y%m%d_%H%M%S") output_path = os.path.join(TEMP_DIR, f"grid_video_{timestamp}.mp4") width, height = int(width) - (int(width) % 2), int(height) - (int(height) % 2) cmd = ["ffmpeg", "-y"] input_count = 0 for video_file in videos: cmd.extend(["-i", video_file.name]) input_count += 1 if music_path: cmd.extend(["-i", music_path]) filter_complex_parts = [] if layout in ["2x1 (Side-by-Side)", "1x2 (Stacked)", "2x2 (Quad-View)"]: if layout == "2x1 (Side-by-Side)": tile_w, tile_h = width // 2, height filter_complex_parts.append(f"[0:v]scale={tile_w}:{tile_h}:force_original_aspect_ratio=decrease,pad={tile_w}:{tile_h}:-1:-1:color={bg_color}[v0];[1:v]scale={tile_w}:{tile_h}:force_original_aspect_ratio=decrease,pad={tile_w}:{tile_h}:-1:-1:color={bg_color}[v1];[v0][v1]hstack=inputs=2[vout]") elif layout == "1x2 (Stacked)": tile_w, tile_h = width, height // 2 filter_complex_parts.append(f"[0:v]scale={tile_w}:{tile_h}:force_original_aspect_ratio=decrease,pad={tile_w}:{tile_h}:-1:-1:color={bg_color}[v0];[1:v]scale={tile_w}:{tile_h}:force_original_aspect_ratio=decrease,pad={tile_w}:{tile_h}:-1:-1:color={bg_color}[v1];[v0][v1]vstack=inputs=2[vout]") elif layout == "2x2 (Quad-View)": tile_w, tile_h = width // 2, height // 2 for i in range(4): filter_complex_parts.append(f"[{i}:v]scale={tile_w}:{tile_h}:force_original_aspect_ratio=decrease,pad={tile_w}:{tile_h}:-1:-1:color={bg_color}[v{i}]") filter_complex_parts.append("[v0][v1][v2][v3]xstack=inputs=4:layout=0_0|w0_0|0_h0|w0_h0[vout]") elif layout in ["4x4 (16-View)", "8x4 (32-View)"]: cols, rows = (4, 4) if layout == "4x4 (16-View)" else (8, 4) tile_w, tile_h = width // cols, height // rows # 1. Scale all inputs for i in range(required_videos): filter_complex_parts.append(f"[{i}:v]scale={tile_w}:{tile_h}:force_original_aspect_ratio=decrease,pad={tile_w}:{tile_h}:-1:-1:color={bg_color}[v{i}]") # 2. Horizontally stack videos for each row row_outputs = [] for r in range(rows): start_index = r * cols end_index = start_index + cols row_inputs = "".join([f"[v{i}]" for i in range(start_index, end_index)]) row_output_label = f"[row{r}]" filter_complex_parts.append(f"{row_inputs}hstack=inputs={cols}{row_output_label}") row_outputs.append(row_output_label) # 3. Vertically stack all the rows final_vstack_inputs = "".join(row_outputs) filter_complex_parts.append(f"{final_vstack_inputs}vstack=inputs={rows}[vout]") cmd.extend(["-filter_complex", ";".join(filter_complex_parts)]) cmd.extend(["-map", "[vout]"]) # Audio mapping logic if music_path: cmd.extend(["-map", f"{input_count}:a?"]) # Map the external audio track cmd.extend(["-c:a", "aac", "-shortest"]) elif audio_choice != "None": try: audio_idx_match = re.search(r'\d+', audio_choice) if audio_idx_match: audio_idx = int(audio_idx_match.group()) - 1 if 0 <= audio_idx < num_videos: cmd.extend(["-map", f"{audio_idx}:a?"]) cmd.extend(["-c:a", "aac", "-shortest"]) except (AttributeError, IndexError): raise gr.Error("Invalid audio source selected.") cmd.extend(["-c:v", "libx264", "-pix_fmt", "yuv420p", output_path]) run_ffmpeg_command(cmd, "Compiling video grid...") return output_path def _create_automated_slideshow_impl(images, audio_path, kb_effect_style, transition_style, rhythm_choice, out_w, out_h, progress: Progress): """Internal implementation of the slideshow creator with progress tracking.""" if not images: raise gr.Error("Please upload at least one image.") if not audio_path: raise gr.Error("Please upload an audio track for rhythmic editing.") timestamp = datetime.now().strftime("%Y%m%d_%H%M%S") job_temp_dir = os.path.join(TEMP_DIR, f"slideshow_{timestamp}") os.makedirs(job_temp_dir, exist_ok=True) try: out_w, out_h = int(out_w) - (int(out_w) % 2), int(out_h) - (int(out_h) % 2) output_res_str = f"{out_w}x{out_h}" fps = 30 transition_duration = 0.5 progress(0, desc="Analyzing audio track...") try: y, sr = librosa.load(audio_path) audio_duration = librosa.get_duration(y=y, sr=sr) _, beat_frames = librosa.beat.beat_track(y=y, sr=sr, units='frames') beat_times = librosa.frames_to_time(beat_frames, sr=sr) except Exception as e: raise gr.Error(f"Audio analysis failed: {e}") beats_per_clip = {"1 Image per Beat": 1, "1 Image every 2 Beats": 2, "1 Image per Measure (4 Beats)": 4}[rhythm_choice] clip_start_times = [0.0] + [beat_times[i] for i in range(beats_per_clip, len(beat_times), beats_per_clip)] MAX_CLIPS = 200 if len(clip_start_times) > MAX_CLIPS: gr.Warning(f"Audio beat detection resulted in {len(clip_start_times)} clips. Capping at {MAX_CLIPS} to ensure performance.") clip_start_times = clip_start_times[:MAX_CLIPS] num_clips = len(clip_start_times) image_paths = [img.name for img in images] looped_image_paths = [image_paths[i % len(image_paths)] for i in range(num_clips)] kb_clips = [] total_steps = num_clips + 1 current_step = 0 for i in range(num_clips): progress(current_step / total_steps, desc=f"Creating clip {i+1}/{num_clips}") start_time = clip_start_times[i] end_time = clip_start_times[i + 1] if i + 1 < len(clip_start_times) else audio_duration clip_duration = end_time - start_time if clip_duration <= transition_duration: continue total_frames = int(clip_duration * fps) if total_frames <= 0: continue output_clip_path = os.path.join(job_temp_dir, f"kb_clip_{i:04d}.mp4") with Image.open(looped_image_paths[i]) as img: iw, ih = img.size zoom_levels = {"Subtle": (1.1, 1.15), "Standard": (1.1, 1.25), "Dynamic": (1.2, 1.5)} start_zoom = 1.0 end_zoom = random.uniform(*zoom_levels[kb_effect_style]) directions = ['top_left', 'top_right', 'bottom_left', 'bottom_right', 'center'] start_pos_name, end_pos_name = random.sample(directions, 2) def get_xy(pos_name, zoom_val, img_w, img_h): if pos_name == 'center': return (img_w/2 - (img_w/zoom_val)/2, img_h/2 - (img_h/zoom_val)/2) if pos_name == 'top_left': return (0, 0) if pos_name == 'top_right': return (img_w - img_w/zoom_val, 0) if pos_name == 'bottom_left': return (0, img_h - img_h/zoom_val) if pos_name == 'bottom_right': return (img_w - img_w/zoom_val, img_h - img_h/zoom_val) return (0,0) start_x, start_y = get_xy(start_pos_name, start_zoom, iw, ih) end_x, end_y = get_xy(end_pos_name, end_zoom, iw, ih) x_expr = f"{start_x}+({end_x}-({start_x}))*on/({total_frames}-1)" y_expr = f"{start_y}+({end_y}-({start_y}))*on/({total_frames}-1)" z_expr = f"if(lte(on,0),{start_zoom},{start_zoom}+({end_zoom}-{start_zoom})*on/({total_frames}-1))" zoompan_filter = f"zoompan=z='{z_expr}':x='{x_expr}':y='{y_expr}':d={total_frames}:s={output_res_str}:fps={fps}" cmd = ["ffmpeg", "-y", "-loop", "1", "-i", looped_image_paths[i], "-vf", zoompan_filter, "-t", str(clip_duration), "-c:v", "libx264", "-pix_fmt", "yuv420p", output_clip_path] run_ffmpeg_command(cmd) kb_clips.append({"path": output_clip_path, "duration": clip_duration}) current_step += 1 if not kb_clips: raise gr.Error("No clips were generated. The audio may be too short or the rhythm settings too fast.") progress(current_step / total_steps, desc=f"Applying transitions...") final_silent_path = os.path.join(job_temp_dir, "final_silent.mp4") if len(kb_clips) == 1: shutil.copy(kb_clips[0]['path'], final_silent_path) else: all_transitions = ["fade", "wipeleft", "wiperight", "wipeup", "wipedown", "slideleft", "slideright", "slideup", "slidedown", "dissolve"] cmd = ["ffmpeg", "-y"] filter_complex = [] running_duration = 0 for i, clip in enumerate(kb_clips): cmd.extend(["-i", clip['path']]) for i in range(len(kb_clips) - 1): input1 = f"[{i}:v]" if i == 0 else f"[v{i-1}]" input2 = f"[{i+1}:v]" output = f"[v{i}]" transition = random.choice(all_transitions) if transition_style == "Random" else transition_style.lower() offset = running_duration + kb_clips[i]['duration'] - transition_duration filter_complex.append(f"{input1}{input2}xfade=transition={transition}:duration={transition_duration}:offset={offset}{output}") running_duration += kb_clips[i]['duration'] - transition_duration cmd.extend(["-filter_complex", ";".join(filter_complex), "-map", f"[v{len(kb_clips)-2}]", "-c:v", "libx264", "-pix_fmt", "yuv420p", final_silent_path]) run_ffmpeg_command(cmd) progress(0.98, desc="Muxing final audio...") final_output_path = os.path.join(TEMP_DIR, f"slideshow_final_{timestamp}.mp4") run_ffmpeg_command(["ffmpeg", "-y", "-i", final_silent_path, "-i", audio_path, "-c:v", "copy", "-c:a", "aac", "-shortest", final_output_path], "Muxing audio") return final_output_path finally: if os.path.exists(job_temp_dir): shutil.rmtree(job_temp_dir) def slideshow_wrapper(images, audio_path, kb_effect_style, transition_style, rhythm_choice, out_w, out_h, progress=gr.Progress(track_tqdm=True)): return _create_automated_slideshow_impl(images, audio_path, kb_effect_style, transition_style, rhythm_choice, out_w, out_h, progress) def _create_rhythmic_remix_impl(video_path, audio_path, cut_style, beat_sync, resolution_choice, custom_w, custom_h, progress: Progress): """Internal implementation of the auto-rhythmic video remixer.""" if not video_path: raise gr.Error("Please upload a source video.") if not audio_path: raise gr.Error("Please upload an audio track.") timestamp = datetime.now().strftime("%Y%m%d_%H%M%S") job_temp_dir = os.path.join(TEMP_DIR, f"remix_{timestamp}") os.makedirs(job_temp_dir, exist_ok=True) try: vf_filter = None if resolution_choice == "Match Source Video Dimensions": out_w, out_h = get_video_dimensions(video_path) if out_w == 0 or out_h == 0: gr.Warning("Could not read source video dimensions. Defaulting to 1080p.") out_w, out_h = 1920, 1080 vf_filter = f"scale={out_w}:{out_h}:force_original_aspect_ratio=decrease,pad={out_w}:{out_h}:(ow-iw)/2:(oh-ih)/2,setsar=1" elif resolution_choice == "1080p (1920x1080)": out_w, out_h = 1920, 1080 elif resolution_choice == "720p (1280x720)": out_w, out_h = 1280, 720 elif resolution_choice == "Custom": out_w, out_h = int(custom_w), int(custom_h) if out_w <= 0 or out_h <= 0: raise gr.Error("Custom width and height must be positive numbers.") out_w, out_h = out_w - (out_w % 2), out_h - (out_h % 2) if vf_filter is None and resolution_choice != "Match Source Video Dimensions": vf_filter = f"scale={out_w}:{out_h}:force_original_aspect_ratio=decrease,pad={out_w}:{out_h}:(ow-iw)/2:(oh-ih)/2,setsar=1" progress(0, desc="Analyzing audio track for beats...") try: y, sr = librosa.load(audio_path) audio_duration = librosa.get_duration(y=y, sr=sr) _, beat_frames = librosa.beat.beat_track(y=y, sr=sr, units='frames') beat_times = librosa.frames_to_time(beat_frames, sr=sr) if len(beat_times) < 2: raise ValueError("Not enough beats were detected in the audio.") except Exception as e: raise gr.Error(f"Audio analysis failed: {e}") beats_per_clip = {"On the Beat": 1, "Every 2 Beats": 2, "Every Measure (4 beats)": 4}[beat_sync] clip_definitions = [] clip_start_beat_indices = range(0, len(beat_times), beats_per_clip) for i, beat_index in enumerate(clip_start_beat_indices): start_beat_time = beat_times[beat_index] if i + 1 < len(clip_start_beat_indices): end_beat_time = beat_times[clip_start_beat_indices[i+1]] else: end_beat_time = audio_duration duration = end_beat_time - start_beat_time if duration > 0.1: clip_definitions.append({'duration': duration}) if not clip_definitions: raise gr.Error("Could not define any video clips based on the detected beats.") progress(0.1, desc="Planning video cuts...") source_duration = get_media_duration(video_path) current_time_in_source = 0 for clip in clip_definitions: if cut_style == "Sequential": clip['source_start'] = current_time_in_source current_time_in_source += clip['duration'] if current_time_in_source > source_duration: gr.Warning("Source video is shorter than the music. Looping video from the beginning.") current_time_in_source = 0 elif cut_style == "Random Shuffle": max_start_time = source_duration - clip['duration'] clip['source_start'] = random.uniform(0, max_start_time) if max_start_time > 0 else 0 extracted_clip_paths = [] for i, clip in enumerate(clip_definitions): progress(0.1 + (0.7 * (i / len(clip_definitions))), desc=f"Extracting clip {i+1}/{len(clip_definitions)}...") output_clip_path = os.path.join(job_temp_dir, f"clip_{i:04d}.mp4") cmd = [ "ffmpeg", "-y", "-ss", str(clip['source_start']), "-i", video_path, "-t", str(clip['duration']), "-an", "-c:v", "libx264", "-pix_fmt", "yuv420p" ] if vf_filter: cmd.extend(["-vf", vf_filter]) cmd.append(output_clip_path) run_ffmpeg_command(cmd) extracted_clip_paths.append(output_clip_path) progress(0.85, desc="Stitching clips together...") file_list_path = os.path.join(job_temp_dir, "files.txt") with open(file_list_path, 'w', encoding='utf-8') as f: for path in extracted_clip_paths: f.write(f"file '{os.path.abspath(path)}'\n") silent_final_path = os.path.join(job_temp_dir, "final_silent.mp4") run_ffmpeg_command(["ffmpeg", "-y", "-f", "concat", "-safe", "0", "-i", file_list_path, "-c", "copy", silent_final_path], "Concatenating clips...") progress(0.95, desc="Adding music...") final_output_path = os.path.join(TEMP_DIR, f"remix_final_{timestamp}.mp4") run_ffmpeg_command(["ffmpeg", "-y", "-i", silent_final_path, "-i", audio_path, "-c:v", "copy", "-c:a", "aac", "-map", "0:v:0", "-map", "1:a:0", "-shortest", final_output_path], "Muxing audio") return final_output_path finally: if os.path.exists(job_temp_dir): shutil.rmtree(job_temp_dir) def rhythmic_remix_wrapper(video_path, audio_path, cut_style, beat_sync, resolution_choice, custom_w, custom_h, progress=gr.Progress(track_tqdm=True)): return _create_rhythmic_remix_impl(video_path, audio_path, cut_style, beat_sync, resolution_choice, custom_w, custom_h, progress) # --- BLING --- CSS AND JS --- bling_css = """ @import url('https://fonts.googleapis.com/css2?family=Inter:wght@400;700&display=swap'); :root { --bling-main-font: 'Inter', sans-serif; --bling-gradient-start: #0f172a; --bling-gradient-mid: #1e293b; --bling-gradient-end: #334155; --bling-accent-color: #38bdf8; /* sky-400 */ } body, .gradio-container { font-family: var(--bling-main-font) !important; background: var(--bling-gradient-start); background: linear-gradient(135deg, var(--bling-gradient-start) 0%, var(--bling-gradient-mid) 50%, var(--bling-gradient-end) 100%); background-size: 200% 200%; animation: gradient-animation 15s ease infinite; } @keyframes gradient-animation { 0% { background-position: 0% 50%; } 50% { background-position: 100% 50%; } 100% { background-position: 0% 50%; } } /* Glassmorphism for containers */ .gradio-tabs, .gradio-accordion, .gradio-group { background: rgba(255, 255, 255, 0.05) !important; border: 1px solid rgba(255, 255, 255, 0.1) !important; border-radius: 12px !important; backdrop-filter: blur(10px) !important; -webkit-backdrop-filter: blur(10px) !important; box-shadow: 0 4px 30px rgba(0, 0, 0, 0.1) !important; } /* Button Bling */ .gradio-button { transition: all 0.2s ease-in-out !important; box-shadow: 0 2px 4px rgba(0,0,0,0.2) !important; } .gradio-button:hover { transform: translateY(-2px); box-shadow: 0 4px 8px rgba(0,0,0,0.3) !important; filter: brightness(1.1); } /* Custom Info/Warning Boxes */ .gradio-info { background: rgba(56, 189, 248, 0.1) !important; /* sky-400 with alpha */ color: #f0f9ff !important; /* sky-50 */ border-left: 4px solid var(--bling-accent-color) !important; border-radius: 8px !important; } .gradio-warning { background: rgba(251, 191, 36, 0.1) !important; /* amber-400 with alpha */ color: #fffbeb !important; /* amber-50 */ border-left: 4px solid #fbbf24 !important; border-radius: 8px !important; } /* Custom Scrollbars */ ::-webkit-scrollbar { width: 8px; } ::-webkit-scrollbar-track { background: rgba(255, 255, 255, 0.1); } ::-webkit-scrollbar-thumb { background-color: var(--bling-accent-color); border-radius: 4px; } ::-webkit-scrollbar-thumb:hover { background-color: #0ea5e9; } /* sky-500 */ #custom-footer { text-align: center !important; padding: 20px 0 5px 0 !important; font-size: .9em; color: #94a3b8; /* slate-400 */ } /* Loading Overlay CSS */ #loading-overlay { position: fixed; top: 0; left: 0; width: 100vw; height: 100vh; background-color: rgba(15, 23, 42, 0.8); z-index: 10000; display: flex; justify-content: center; align-items: center; flex-direction: column; color: white; font-size: 1.2em; backdrop-filter: blur(5px); -webkit-backdrop-filter: blur(5px); opacity: 0; visibility: hidden; transition: opacity 0.3s ease, visibility 0.3s ease; } #loading-overlay.visible { opacity: 1; visibility: visible; } .spinner { width: 60px; height: 60px; border: 5px solid rgba(255, 255, 255, 0.3); border-top-color: var(--bling-accent-color); border-radius: 50%; animation: spin 1s linear infinite; margin-bottom: 20px; } @keyframes spin { to { transform: rotate(360deg); } } """ bling_js = """ () => { // --- JKL Video Control --- let activeVideo = null; document.addEventListener('mouseover', (e) => { if (e.target.tagName === 'VIDEO') { activeVideo = e.target; } }); document.addEventListener('keydown', (e) => { const activeElement = document.activeElement; if (activeElement && (activeElement.tagName === 'INPUT' || activeElement.tagName === 'TEXTAREA')) { return; } if (!activeVideo) return; const frameTime = 1 / 30; let handled = false; switch (e.key.toLowerCase()) { case 'k': activeVideo.paused ? activeVideo.play() : activeVideo.pause(); handled = true; break; case 'j': activeVideo.currentTime = Math.max(0, activeVideo.currentTime - frameTime); handled = true; break; case 'l': activeVideo.currentTime += frameTime; handled = true; break; } if (handled) e.preventDefault(); }); // --- Loading Overlay --- function show_overlay(message = 'Processing... Please wait.') { let overlay = document.getElementById('loading-overlay'); if (!overlay) { overlay = document.createElement('div'); overlay.id = 'loading-overlay'; overlay.innerHTML = `
`; document.body.appendChild(overlay); } document.getElementById('loading-message').textContent = message; overlay.classList.add('visible'); } function hide_overlay() { const overlay = document.getElementById('loading-overlay'); if (overlay) { overlay.classList.remove('visible'); } } // --- Confetti --- function fire_confetti() { const a=document.createElement("script");a.setAttribute("src","https://cdn.jsdelivr.net/npm/canvas-confetti@1.9.2/dist/confetti.browser.min.js"),document.head.appendChild(a),a.onload=()=>{var e=confetti.create(null,{resize:!0,useWorker:!0});e({particleCount:150,spread:90,origin:{y:.6}})} } // --- Audio Feedback with Synthesized Whistle --- const skriptz_audio = { context: null, isInitialized: false, }; async function init_audio() { if (skriptz_audio.isInitialized) return; try { skriptz_audio.context = new (window.AudioContext || window.webkitAudioContext)(); if (skriptz_audio.context.state === 'suspended') { await skriptz_audio.context.resume(); } } catch (e) { console.error('Failed to initialize Web Audio API:', e); } skriptz_audio.isInitialized = true; } async function play_finish_sound() { if (!skriptz_audio.isInitialized) { await init_audio(); } const context = skriptz_audio.context; if (!context) return; if (context.state === 'suspended') { await context.resume(); } const now = context.currentTime; const delay = 0.2; const startTime = now + delay; const oscillator = context.createOscillator(); const gainNode = context.createGain(); oscillator.connect(gainNode); gainNode.connect(context.destination); oscillator.type = 'sine'; const startFreq = 2000; const endFreq = 1000; oscillator.frequency.setValueAtTime(startFreq, startTime); oscillator.frequency.exponentialRampToValueAtTime(endFreq, startTime + 0.15); gainNode.gain.setValueAtTime(0, startTime); gainNode.gain.linearRampToValueAtTime(0.4, startTime + 0.02); gainNode.gain.linearRampToValueAtTime(0, startTime + 0.15); oscillator.start(startTime); oscillator.stop(startTime + 0.2); } // --- Dynamic Page Title --- function update_title(tab_name) { if (tab_name) { const clean_name = tab_name.replace(/[\\u{1F600}-\\u{1F64F}\\u{1F300}-\\u{1F5FF}\\u{1F680}-\\u{1F6FF}\\u{1F700}-\\u{1F77F}\\u{1F780}-\\u{1F7FF}\\u{1F800}-\\u{1F8FF}\\u{1F900}-\\u{1F9FF}\\u{1FA00}-\\u{1FA6F}\\u{1FA70}-\\u{1FAFF}\\u{2600}-\\u{26FF}\\u{2700}-\\u{27BF}]/gu, '').trim(); document.title = `Skriptz - ${clean_name}`; } else { document.title = "Skriptz - Universal Tool"; } } // --- Copy to Clipboard --- function copy_to_clipboard(text_id) { const text_area = document.getElementById(text_id).querySelector('textarea'); if(text_area) { text_area.select(); document.execCommand('copy'); const original_button = this.event.target; const original_text = original_button.innerText; original_button.innerText = 'Copied!'; setTimeout(() => { original_button.innerText = original_text; }, 2000); } } // --- Storyboard Time Getter --- function storyboard_get_time(){ const e=document.querySelector('#storyboard_clip_preview video'); return e?e.currentTime:0 } // Make functions globally accessible for Gradio window.skriptz_bling = { show_overlay, hide_overlay, fire_confetti, play_finish_sound, update_title, copy_to_clipboard, storyboard_get_time }; } """ with gr.Blocks( title="Skriptz - Universal Tool", css=bling_css, js=bling_js ) as demo: gr.HTML("""Processing... Please wait.