diff --git "a/app.py" "b/app.py"
--- "a/app.py"
+++ "b/app.py"
@@ -1,1859 +1,1878 @@
-"""
-🎭 Advanced Face Swap Studio - Cross-Environment Compatible Version
-================================================================
-
-✅ FEATURES:
-- Professional 3-column layout optimized for all screen sizes
-- GPU/CPU processing with automatic detection
-- Batch processing mode for multiple videos
-- Lip sync integration (beta)
-- Real-time processing monitor
-- Cross-platform compatibility (Windows, Linux, macOS)
-
-🔧 COMPATIBILITY:
-- HuggingFace Spaces ready
-- Google Colab compatible
-- Local development friendly
-- Graceful degradation for missing dependencies
-- Automatic environment detection and configuration
-
-📋 REQUIREMENTS:
-- gradio, torch, onnxruntime (core)
-- moviepy (optional - for video processing)
-- SwitcherAI modules (optional - for enhancement)
-
-🚀 USAGE:
-- Local: python app.py
-- HuggingFace: Upload and run as Space
-- Colab: Upload and execute in notebook
-"""
-
-import os
-import sys
-import zipfile
-import time
-
-# Get the directory where app.py is located
-BASE_DIR = os.path.dirname(os.path.abspath(__file__))
-
-# Create unique instance ID for multi-instance support
-INSTANCE_ID = f"instance_{os.getpid()}_{int(time.time() % 10000)}"
-INSTANCE_TEMP_DIR = os.path.join(BASE_DIR, "Temp", INSTANCE_ID)
-INSTANCE_OUTPUT_DIR = os.path.join(BASE_DIR, "output", INSTANCE_ID)
-
-# Create instance-specific directories
-os.makedirs(INSTANCE_TEMP_DIR, exist_ok=True)
-os.makedirs(INSTANCE_OUTPUT_DIR, exist_ok=True)
-
-# Set up environment variables using relative paths (removed Conda and CUDA paths)
-os.environ["KMP_DUPLICATE_LIB_OK"] = "TRUE"
-os.environ["TF_FORCE_GPU_ALLOW_GROWTH"] = "TRUE"
-os.environ["GRADIO_TEMP_DIR"] = INSTANCE_TEMP_DIR
-
-import gradio as gr
-import subprocess as sp
-import uuid
-import time
-import shutil
-try:
- from moviepy.editor import *
- MOVIEPY_AVAILABLE = True
- print("โ
MoviePy loaded successfully")
-except ImportError as e:
- print(f"โ ๏ธ MoviePy not available: {e}")
- print("๐ Some video processing features may be limited")
- MOVIEPY_AVAILABLE = False
-import gc # Import garbage collector
-
-# Add relative paths to sys.path
-sys.path.append(os.path.join(BASE_DIR, "SwitcherAI", "processors", "frame", "modules"))
-
-# Try to import optional enhancement modules (graceful degradation)
-try:
- import face_enhancer
- import frame_enhancer
- ENHANCEMENT_AVAILABLE = True
- print("โ
Enhancement modules loaded successfully")
-except ImportError as e:
- print(f"โ ๏ธ Enhancement modules not available: {e}")
- print("๐ App will run in basic mode without enhancement features")
- ENHANCEMENT_AVAILABLE = False
-
-sys.path.append(BASE_DIR) # Add base directory
-
-import onnxruntime as ort
-import torch
-import shlex
-
-def find_available_port(start_port=7860, max_attempts=10):
- """Find an available port starting from start_port"""
- import socket
-
- for i in range(max_attempts):
- port = start_port + i
- try:
- with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
- s.bind(('localhost', port))
- print(f"๐ Found available port: {port}")
- return port
- except OSError:
- continue
-
- print(f"โ ๏ธ Could not find available port in range {start_port}-{start_port + max_attempts}")
- return start_port # Fallback to original port
-
-def get_instance_file_path(filename):
- """Get instance-specific file path to avoid conflicts"""
- return os.path.join(INSTANCE_TEMP_DIR, filename)
-
-def get_available_gpus():
- """Get list of available CUDA devices with enhanced debugging"""
- print("\n๐ Starting GPU Detection...")
- available_gpus = []
-
- # Check CUDA availability first
- cuda_available = torch.cuda.is_available()
- print(f"๐ CUDA Available: {cuda_available}")
-
- if not cuda_available:
- print("โ CUDA not available - returning CPU only")
- return ["CPU Only"]
-
- # Get device count
- device_count = torch.cuda.device_count()
- print(f"๐ข Total CUDA devices detected: {device_count}")
-
- if device_count == 0:
- print("โ No CUDA devices found despite CUDA being available")
- return ["CPU Only"]
-
- # Check environment variables that might limit GPU visibility
- cuda_visible = os.environ.get('CUDA_VISIBLE_DEVICES')
- if cuda_visible is not None:
- print(f"๐ CUDA_VISIBLE_DEVICES: {cuda_visible}")
-
- # Detect each GPU
- for i in range(device_count):
- try:
- print(f"\n--- Checking GPU {i} ---")
-
- # Get device properties
- props = torch.cuda.get_device_properties(i)
- gpu_name = props.name
- gpu_memory = props.total_memory / (1024**3) # GB
-
- print(f" Name: {gpu_name}")
- print(f" Memory: {gpu_memory:.1f}GB")
- print(f" Compute Capability: {props.major}.{props.minor}")
-
- # Test device accessibility
- try:
- # Save current device
- current_device = torch.cuda.current_device() if torch.cuda.is_available() else None
-
- # Test the device
- torch.cuda.set_device(i)
- test_tensor = torch.tensor([1.0], device=f'cuda:{i}')
-
- # Successful - add to list
- gpu_entry = f"GPU {i}: {gpu_name} ({gpu_memory:.1f}GB)"
- available_gpus.append(gpu_entry)
- print(f" Status: โ
Accessible")
- print(f" Added: {gpu_entry}")
-
- # Cleanup
- del test_tensor
-
- # Restore previous device if it existed
- if current_device is not None:
- torch.cuda.set_device(current_device)
-
- except Exception as device_error:
- print(f" Status: โ Not accessible - {device_error}")
- # Still add it to the list but mark as problematic
- gpu_entry = f"GPU {i}: {gpu_name} (โ ๏ธ Issues)"
- available_gpus.append(gpu_entry)
- print(f" Added with warning: {gpu_entry}")
-
- except Exception as e:
- print(f" โ Error detecting GPU {i}: {e}")
- # Add as unknown GPU
- available_gpus.append(f"GPU {i}: Unknown GPU (Error)")
-
- # Always add CPU option
- available_gpus.append("CPU Only")
-
- # Final summary
- print(f"\n๐ GPU Detection Summary:")
- print(f" Total devices found: {len(available_gpus)}")
- for i, gpu in enumerate(available_gpus):
- print(f" {i+1}. {gpu}")
-
- print("โ
GPU detection complete\n")
- return available_gpus
-
-def set_gpu_device(gpu_selection):
- """Set the CUDA device based on user selection"""
- if gpu_selection.startswith("GPU"):
- try:
- gpu_id = gpu_selection.split(":")[0].split(" ")[1]
- os.environ["CUDA_VISIBLE_DEVICES"] = gpu_id
- print(f"Set CUDA_VISIBLE_DEVICES to: {gpu_id}")
- return gpu_id
- except (IndexError, ValueError) as e:
- print(f"โ ๏ธ Error parsing GPU selection '{gpu_selection}': {e}")
- os.environ["CUDA_VISIBLE_DEVICES"] = ""
- print("Falling back to CPU mode")
- return "cpu"
- else:
- os.environ["CUDA_VISIBLE_DEVICES"] = ""
- print("Using CPU mode")
- return "cpu"
-
-# Get the port early for display in UI
-INSTANCE_PORT = find_available_port(7860) if not os.getenv('SPACE_ID') else 7860
-
-def check_environment():
- """Check environment and display compatibility status"""
- print("\n๐ ENVIRONMENT CHECK:")
- print("=" * 50)
-
- # Check Python version
- import sys
- python_version = sys.version_info
- print(f"๐ Python: {python_version.major}.{python_version.minor}.{python_version.micro}")
-
- # Check key dependencies
- dependencies = {
- 'gradio': '✅ Available',
- 'torch': '✅ Available' if torch.__version__ else '❌ Not Available',
- 'onnxruntime': '✅ Available',
- 'moviepy': '✅ Available' if MOVIEPY_AVAILABLE else '⚠️ Limited functionality',
- 'enhancement_modules': '✅ Available' if ENHANCEMENT_AVAILABLE else '⚠️ Basic mode only'
- }
-
- for dep, status in dependencies.items():
- print(f"๐ฆ {dep}: {status}")
-
- # Check CUDA availability
- if torch.cuda.is_available():
- print(f"๐ CUDA: โ
Available ({torch.cuda.device_count()} device(s))")
- for i in range(torch.cuda.device_count()):
- try:
- print(f" โโ GPU {i}: {torch.cuda.get_device_name(i)}")
- except:
- print(f" โโ GPU {i}: Unknown GPU")
- else:
- print("๐ CUDA: โ ๏ธ Not available (CPU mode only)")
-
- print("=" * 50)
- return True
-
-def create_requirements_file():
- """Create a requirements.txt file for easy deployment"""
- requirements = [
- "gradio>=4.0.0",
- "torch>=2.0.0",
- "onnxruntime>=1.15.0",
- "moviepy>=1.0.3",
- "opencv-python>=4.8.0",
- "numpy>=1.24.0",
- "Pillow>=9.5.0"
- ]
-
- req_path = os.path.join(BASE_DIR, "requirements.txt")
- try:
- with open(req_path, 'w') as f:
- f.write('\n'.join(requirements))
- print(f"๐ Created requirements.txt at: {req_path}")
- except Exception as e:
- print(f"โ ๏ธ Could not create requirements.txt: {e}")
-
-def create_multi_instance_scripts():
- """Create helper scripts for running multiple instances"""
-
- # Windows batch script
- batch_script = """@echo off
-echo Starting Face Swap Studio Instance...
-echo Instance will auto-detect available port starting from 7860
-echo.
-python app.py
-pause
-"""
-
- # Linux/Mac shell script
- shell_script = """#!/bin/bash
-echo "Starting Face Swap Studio Instance..."
-echo "Instance will auto-detect available port starting from 7860"
-echo ""
-python3 app.py
-"""
-
- try:
- # Create Windows script
- with open(os.path.join(BASE_DIR, "launch_instance.bat"), 'w') as f:
- f.write(batch_script)
-
- # Create Linux/Mac script
- script_path = os.path.join(BASE_DIR, "launch_instance.sh")
- with open(script_path, 'w') as f:
- f.write(shell_script)
-
- # Make shell script executable
- try:
- os.chmod(script_path, 0o755)
- except Exception:
- pass # Windows doesn't support chmod
-
- print("๐ Created multi-instance launch scripts:")
- print(" - launch_instance.bat (Windows)")
- print(" - launch_instance.sh (Linux/Mac)")
-
- except Exception as e:
- print(f"โ ๏ธ Could not create launch scripts: {e}")
-
-# Create launch scripts
-create_multi_instance_scripts()
-
-# Create requirements file for deployment
-create_requirements_file()
-
-# Run environment check
-check_environment()
-
-# Get available GPUs for the dropdown
-AVAILABLE_GPUS = get_available_gpus()
-
-# Print available GPUs to console for debugging
-print("\n" + "="*60)
-print("๐ฅ๏ธ GPU INITIALIZATION FOR DROPDOWN")
-print("="*60)
-print(f"๐ Final GPU List for Dropdown ({len(AVAILABLE_GPUS)} items):")
-for i, gpu in enumerate(AVAILABLE_GPUS):
- print(f" [{i}] {gpu}")
-print(f"๐ฏ Default selection: {AVAILABLE_GPUS[0] if AVAILABLE_GPUS else 'None'}")
-print(f"๐ List contents: {AVAILABLE_GPUS}")
-print(f"๐ข Total choices for dropdown: {len(AVAILABLE_GPUS)}")
-
-# Verify CUDA one more time
-print(f"\n๐ CUDA Status:")
-print(f" Available: {torch.cuda.is_available()}")
-if torch.cuda.is_available():
- print(f" Device count: {torch.cuda.device_count()}")
- for i in range(torch.cuda.device_count()):
- try:
- name = torch.cuda.get_device_name(i)
- print(f" GPU {i}: {name}")
- except Exception:
- print(f" GPU {i}: Error getting name")
-print("="*60 + "\n")
-
-# Create a simple GPU test function
-def debug_gpu_choices():
- """Debug function to show what GPUs are available"""
- print("๐ Debug GPU Choices Called:")
- print(f" AVAILABLE_GPUS: {AVAILABLE_GPUS}")
- print(f" Length: {len(AVAILABLE_GPUS)}")
- return AVAILABLE_GPUS
-
-def on_gpu_selection_change(selected_gpu):
- """Handle GPU selection change - for debugging"""
- print(f"๐ฅ๏ธ GPU Selection Changed: {selected_gpu}")
- return selected_gpu
-
-def refresh_gpu_list():
- """Refresh the GPU list and return updated choices"""
- global AVAILABLE_GPUS
- print("๐ Refreshing GPU list...")
- AVAILABLE_GPUS = get_available_gpus()
-
- return gr.update(
- choices=AVAILABLE_GPUS,
- value=AVAILABLE_GPUS[0] if AVAILABLE_GPUS else "CPU Only",
- interactive=True,
- allow_custom_value=False
- )
-
-def debug_gpu_dropdown():
- """Debug the GPU dropdown and return status"""
- global AVAILABLE_GPUS
- print("๐ GPU Debug Button Clicked")
- print(f" Current AVAILABLE_GPUS: {AVAILABLE_GPUS}")
- print(f" Length: {len(AVAILABLE_GPUS)}")
-
- # Force complete refresh of GPU detection
- print("๐ Force refreshing GPU detection...")
- AVAILABLE_GPUS = get_available_gpus()
-
- debug_info = f"โ
DEBUG RESULTS:\n"
- debug_info += f"โข CUDA Available: {torch.cuda.is_available()}\n"
- debug_info += f"โข Device Count: {torch.cuda.device_count() if torch.cuda.is_available() else 0}\n"
- debug_info += f"โข Detected Options: {len(AVAILABLE_GPUS)}\n"
-
- for i, gpu in enumerate(AVAILABLE_GPUS):
- debug_info += f" [{i}] {gpu}\n"
-
- # Create completely new dropdown configuration
- dropdown_update = gr.update(
- choices=AVAILABLE_GPUS,
- value=AVAILABLE_GPUS[0] if AVAILABLE_GPUS else "CPU Only",
- interactive=True,
- visible=True
- )
-
- print(f"๐ Updated dropdown with {len(AVAILABLE_GPUS)} choices")
- return dropdown_update, debug_info
-
-# Global variable to track current process for cancellation
-current_process = None
-last_output_path = None
-last_batch_mode = False
-
-def create_batch_zip():
- """Create a zip file of all output files"""
- try:
- if not os.path.exists(INSTANCE_OUTPUT_DIR):
- print(f"โ Output directory does not exist: {INSTANCE_OUTPUT_DIR}")
- return None
-
- files = os.listdir(INSTANCE_OUTPUT_DIR)
- if not files:
- print("โ No files found in output directory")
- return None
-
- zip_path = os.path.join(INSTANCE_OUTPUT_DIR, f"batch_results_{INSTANCE_ID}.zip")
-
- # Remove old zip if exists
- if os.path.exists(zip_path):
- os.remove(zip_path)
- print("๐๏ธ Removed old zip file")
-
- with zipfile.ZipFile(zip_path, 'w', zipfile.ZIP_DEFLATED) as zipf:
- for file in files:
- if not file.endswith('.zip'): # Don't zip existing zips
- file_path = os.path.join(INSTANCE_OUTPUT_DIR, file)
- if os.path.isfile(file_path):
- zipf.write(file_path, file)
- print(f"๐ฆ Added to zip: {file}")
-
- zip_size = os.path.getsize(zip_path) / (1024 * 1024) # MB
- print(f"โ
Batch zip created: {zip_path} ({zip_size:.1f}MB)")
- return zip_path
-
- except Exception as e:
- print(f"โ Error creating batch zip: {e}")
- return None
-
-def get_instance_downloads():
- """Get download file(s) from the current instance output directory"""
- try:
- print(f"๐ Checking downloads in: {INSTANCE_OUTPUT_DIR}") # Debug
-
- if not os.path.exists(INSTANCE_OUTPUT_DIR):
- print(f"โ Output directory does not exist: {INSTANCE_OUTPUT_DIR}")
- return None, "๐ No output directory found for this instance"
-
- # Get all video and zip files from the instance output directory
- files = []
- all_files = os.listdir(INSTANCE_OUTPUT_DIR)
- print(f"๐ Files in output directory: {all_files}") # Debug
-
- for file in all_files:
- if file.lower().endswith(('.mp4', '.avi', '.mov', '.mkv', '.zip')):
- full_path = os.path.join(INSTANCE_OUTPUT_DIR, file)
- files.append(full_path)
- print(f"โ
Found downloadable file: {file}") # Debug
-
- if not files:
- print("โ No downloadable files found") # Debug
- return None, "๐ No completed files found in output directory"
-
- # If only one file, return it directly
- if len(files) == 1:
- file_name = os.path.basename(files[0])
- file_size = os.path.getsize(files[0]) / (1024 * 1024) # MB
- # Normalize path for current OS
- normalized_path = os.path.normpath(files[0])
- print(f"๐ฅ Returning single file: {normalized_path}") # Debug
- return normalized_path, f"๐ฅ Ready to download: {file_name} ({file_size:.1f}MB)"
-
- # If multiple files, create a zip
- zip_path = os.path.join(INSTANCE_OUTPUT_DIR, f"all_results_{INSTANCE_ID}.zip")
- print(f"๐ฆ Creating zip file: {zip_path}") # Debug
-
- # Remove old zip if exists
- if os.path.exists(zip_path):
- os.remove(zip_path)
- print("๐๏ธ Removed old zip file") # Debug
-
- with zipfile.ZipFile(zip_path, 'w', zipfile.ZIP_DEFLATED) as zipf:
- for file_path in files:
- if not file_path.endswith('.zip'): # Don't zip existing zips
- file_name = os.path.basename(file_path)
- zipf.write(file_path, file_name)
- print(f"๐ฆ Added to zip: {file_name}") # Debug
-
- zip_size = os.path.getsize(zip_path) / (1024 * 1024) # MB
- print(f"โ
Zip created successfully: {zip_size:.1f}MB") # Debug
- # Normalize path for current OS
- normalized_zip_path = os.path.normpath(zip_path)
- return normalized_zip_path, f"📦 Ready to download: {len(files)} files ({zip_size:.1f}MB total)"
-
- except Exception as e:
- error_msg = f"โ Error accessing downloads: {str(e)}"
- print(error_msg) # Debug
- return None, error_msg
-
-def handle_download_click():
- """Handle download button click and return file + status"""
- download_file_path, status_message = get_instance_downloads()
- print(f"๐ Download click - File: {download_file_path}, Status: {status_message}")
-
- if download_file_path and os.path.exists(download_file_path):
- # Show download component and hide download button temporarily
- return (
- download_file_path, # Set file for download
- status_message, # Update status
- gr.update(visible=True), # Show download file component
- gr.update(visible=False) # Hide download button temporarily
- )
- else:
- return (
- None,
- status_message,
- gr.update(visible=False), # Keep download component hidden
- gr.update(visible=True) # Keep download button visible
- )
-
-def reset_download_ui():
- """Reset download UI after download completes"""
- # Called when download file component changes (indicating download started)
- return (
- gr.update(visible=False), # Hide download file component
- gr.update(visible=True), # Show download button again
- "๐ฅ Download completed! Ready for next download."
- )
-
-def check_downloads_status():
- """Check and return download status for the UI"""
- download_file, status_message = get_instance_downloads()
- return status_message
-
-def reset_to_defaults():
- """Reset all settings to their default values"""
- return (
- None, # source_image
- None, # target_video
- ['face_swapper', 'face_enhancer'], # frame_processor
- 'top-bottom', # face_analyser_direction
- 'reference', # face_recognition
- 'female', # face_analyser_gender
- 'adult', # face_analyser_age
- False, # skip_audio
- True, # keep_fps
- False, # keep_temp
- 'wav2lip_gan_96', # lip_syncer_model
- False, # enable_lip_sync
- False, # use_folder_mode
- AVAILABLE_GPUS[0] if AVAILABLE_GPUS else "CPU Only", # gpu_selection
- "๐ง Configuration reset to defaults. Ready for new processing session!\n", # cli_output
- "๐ RESET CONFIGURATION" # button text
- )
-
-def cancel_processing():
- """Cancel the current processing operation"""
- global current_process
- if current_process and current_process.poll() is None:
- try:
- current_process.terminate()
- current_process.wait(timeout=10) # Wait up to 10 seconds
- except sp.TimeoutExpired: # subprocess is imported as sp
- current_process.kill() # Force kill if it doesn't terminate
- current_process.wait()
- except Exception as e:
- print(f"โ ๏ธ Error cancelling process: {e}")
- return "โน๏ธ Processing operation cancelled by user request.\n๐ System ready for new configuration.\n"
- return "โ ๏ธ No active processing session found.\n"
-
-def compress_video_if_needed(input_path, max_size_mb=1000):
- """Compress video if it exceeds the size limit"""
- if not MOVIEPY_AVAILABLE:
- print("โ ๏ธ MoviePy not available - skipping compression")
- return input_path
-
- try:
- file_size_mb = os.path.getsize(input_path) / (1024 * 1024)
-
- if file_size_mb > max_size_mb:
- print(f"Video size ({file_size_mb:.1f}MB) exceeds limit. Compressing...")
- compressed_path = input_path.replace('.mp4', '_compressed.mp4')
-
- clip = VideoFileClip(input_path)
- # Reduce quality for compression
- clip.write_videofile(
- compressed_path,
- fps=24, # Reduce FPS
- bitrate="2000k", # Reduce bitrate
- audio_codec='aac'
- )
- clip.close()
- return compressed_path
- except Exception as e:
- print(f"โ Compression failed: {e}")
-
- return input_path
-
-def resize_video(file, export, fps):
- """Resize video with fallback if MoviePy unavailable"""
- if not MOVIEPY_AVAILABLE:
- print("โ ๏ธ MoviePy not available - copying video without resizing")
- shutil.copy(file, export)
- return
-
- # Get the Convert directory path
- convert_dir = os.path.join(BASE_DIR, "Convert")
-
- # Compress if needed before processing (only for single video mode)
- if not file.startswith(convert_dir):
- file = compress_video_if_needed(file)
-
- try:
- # Load the video without applying crossfade blending to maintain sharpness in frames
- clip = VideoFileClip(file)
- # Write the video at the original resolution and fps, without blending
- clip.write_videofile(export, fps=fps, audio_codec='aac')
- clip.close()
- except Exception as e:
- print(f"โ Video processing failed: {e}")
- shutil.copy(file, export)
-
-def extract_audio(video_path, audio_path):
- """Extract audio from video file"""
- if not MOVIEPY_AVAILABLE:
- print("โ ๏ธ MoviePy not available - cannot extract audio")
- return False
-
- try:
- clip = VideoFileClip(video_path)
- if clip.audio is not None:
- clip.audio.write_audiofile(audio_path, logger=None)
- clip.close()
- return True
- else:
- clip.close()
- return False
- except Exception as e:
- print(f"โ Error extracting audio: {e}")
- return False
-
-def cleanup_temp_files():
- """Clean up temporary files from previous runs"""
- temp_files = [
- get_instance_file_path('source-image.jpg'),
- get_instance_file_path('resize-vid.mp4'),
- get_instance_file_path('target-audio.wav')
- ]
-
- cleanup_count = 0
-
- # Remove specific temp files for this instance
- for temp_file in temp_files:
- if os.path.exists(temp_file):
- try:
- os.remove(temp_file)
- print(f"๐งน Cleaned up: {temp_file}")
- cleanup_count += 1
- except Exception as e:
- print(f"โ ๏ธ Could not remove {temp_file}: {e}")
-
- # Clean up old instance directories (older than 1 hour)
- temp_base_dir = os.path.join(BASE_DIR, "Temp")
- if os.path.exists(temp_base_dir):
- try:
- for instance_dir in os.listdir(temp_base_dir):
- instance_path = os.path.join(temp_base_dir, instance_dir)
- if os.path.isdir(instance_path) and instance_dir.startswith('instance_'):
- # Check if directory is old enough to clean up
- if time.time() - os.path.getctime(instance_path) > 3600: # 1 hour
- try:
- shutil.rmtree(instance_path)
- print(f"๐งน Cleaned up old instance: {instance_path}")
- cleanup_count += 1
- except Exception as e:
- print(f"โ ๏ธ Could not remove {instance_path}: {e}")
- except Exception as e:
- print(f"โ ๏ธ Could not access temp directory: {e}")
-
- if cleanup_count > 0:
- print(f"โ
Startup cleanup completed: {cleanup_count} items removed")
- else:
- print("โจ Startup cleanup: No temp files found to remove")
-
-# Run cleanup on startup
-cleanup_temp_files()
-
-os.makedirs(INSTANCE_OUTPUT_DIR, exist_ok=True)
-
-def run_single_video(source_image, target_video, frame_processor, face_analyser_direction,
- face_recognition, face_analyser_gender, face_analyser_age, skip_audio,
- keep_fps, keep_temp, lip_syncer_model, enable_lip_sync, gpu_selection):
- """Process a single uploaded video"""
- global last_output_path, last_batch_mode
- last_batch_mode = False
-
- # Set GPU device
- set_gpu_device(gpu_selection)
-
- print(f'🎬 Processing target video: {target_video}')
-
- # Saving the uploaded image and video with instance-specific paths
- new_source = get_instance_file_path('source-image.jpg')
- new_target = get_instance_file_path('resize-vid.mp4')
-
- # Copy the files locally
- shutil.copy(source_image, new_source)
- resize_video(file=target_video, export=new_target, fps=30)
-
- # This function is a generator, so errors must be yielded, not returned
- if not os.path.exists(new_source):
- yield None, "❌ Source image file does not exist"
- return
- if not os.path.exists(new_target):
- yield None, "❌ Target video file does not exist"
- return
-
- # Extract the original filenames of the source image and target video
- source_image_name = os.path.splitext(os.path.basename(source_image))[0]
- target_video_name = os.path.splitext(os.path.basename(target_video))[0]
-
- selected_frame_processors = ' '.join(frame_processor)
-
- # Handle audio extraction for lip sync from the TARGET video itself
- audio_source_path = None
- if enable_lip_sync:
- audio_source_path = get_instance_file_path('target-audio.wav')
- if not extract_audio(new_target, audio_source_path):
- print(f"โ ๏ธ Warning: Could not extract audio from {target_video}. Skipping lip sync.")
- enable_lip_sync = False
-
- # Add lip sync suffix to filename if enabled
- suffix = "_lipsynced" if enable_lip_sync else ""
- output_filename = f"{source_image_name}_{target_video_name}{suffix}.mp4"
- output_path = os.path.join(INSTANCE_OUTPUT_DIR, output_filename)
-
- os.makedirs(INSTANCE_OUTPUT_DIR, exist_ok=True)
-
- # Determine execution provider based on GPU selection
- if gpu_selection.startswith("GPU"):
- execution_provider = "cuda"
- else:
- execution_provider = "cpu"
-
- # Construct command as a single string and use shlex.split to handle it
- cmd = (
- f"python run.py --execution-providers {execution_provider} "
- f"--execution-thread-count 8 " # Changed from 16 to 8
- f"--reference-face-distance 1.5 "
- f"-s {shlex.quote(new_source)} -t {shlex.quote(new_target)} -o {shlex.quote(output_path)} "
- f"--frame-processors {selected_frame_processors} "
- f"--face-analyser-direction {face_analyser_direction} "
- )
-
- # Add lip sync parameters if enabled
- if enable_lip_sync and audio_source_path:
- cmd += f"--source-paths {shlex.quote(audio_source_path)} "
- cmd += f"--lip-syncer-model {lip_syncer_model} "
- # Ensure lip_syncer is in frame processors
- if 'lip_syncer' not in frame_processor:
- frame_processor_with_lip = list(frame_processor) + ['lip_syncer']
- cmd = cmd.replace(f"--frame-processors {selected_frame_processors}",
- f"--frame-processors {' '.join(frame_processor_with_lip)}")
-
- if face_recognition != 'none':
- cmd += f"--face-recognition {face_recognition} "
- if face_analyser_gender != 'none':
- cmd += f"--face-analyser-gender {face_analyser_gender} "
-
- # Add the face_analyser_age parameter
- cmd += f"--face-analyser-age {face_analyser_age} "
-
- if skip_audio and not enable_lip_sync: # Don't skip audio if lip syncing
- cmd += "--skip-audio "
- if keep_fps:
- cmd += "--keep-fps "
- if keep_temp:
- cmd += "--keep-temp "
-
- cli_output = "" # Initialize before try so the except handler can reference it
- try:
- print("Started command...", cmd)
- start_time = time.time()
-
- # Use shlex.split(cmd) to safely handle spaces in paths
- global current_process
- current_process = sp.Popen(shlex.split(cmd), stdout=sp.PIPE, stderr=sp.STDOUT, text=True, bufsize=1, universal_newlines=True)
- process = current_process
-
- output_lines = []
- cli_output = ""
-
- while True:
- output = process.stdout.readline()
- if output == '' and process.poll() is not None:
- break
- if output:
- line = output.strip()
- print(line)
- output_lines.append(line)
-
- # Build up CLI output for display
- cli_output += line + "\n"
-
- # Keep only last 50 lines to prevent memory issues
- if len(output_lines) > 50:
- output_lines = output_lines[-50:]
- cli_output = "\n".join(output_lines[-50:]) + "\n"
-
- # Yield intermediate results to update the interface
- yield None, cli_output
-
- rc = process.poll()
- end_time = time.time()
- execution_time = end_time - start_time
-
- final_output = cli_output + f"\n\nCommand execution time: {execution_time:.2f} seconds"
-
- if rc != 0:
- # Yield (not return) so the caller actually receives the error tuple
- yield f"An error occurred during command execution.", final_output
- return
-
- # Clean up to free GPU memory
- del process
- if torch.cuda.is_available():
- torch.cuda.empty_cache()
- gc.collect()
-
- # Clean up temporary audio file
- if audio_source_path and os.path.exists(audio_source_path):
- os.remove(audio_source_path)
-
- # Set the last output path for downloading
- last_output_path = output_path
- yield output_path, final_output
-
- except Exception as e:
- # Clean up temporary audio file in case of error
- if audio_source_path and os.path.exists(audio_source_path):
- os.remove(audio_source_path)
- return f"An error occurred: {str(e)}", cli_output
-
-def run_folder_batch(source_image, frame_processor, face_analyser_direction, face_recognition,
- face_analyser_gender, skip_audio, keep_fps, keep_temp, lip_syncer_model, enable_lip_sync, gpu_selection):
- """Process all videos in Convert folder"""
- global last_output_path, last_batch_mode
- last_batch_mode = True
-
- # Set GPU device
- set_gpu_device(gpu_selection)
-
- video_directory = os.path.join(BASE_DIR, "Convert")
-
- # Create Convert directory if it doesn't exist
- os.makedirs(video_directory, exist_ok=True)
-
- video_files = [os.path.join(video_directory, f) for f in os.listdir(video_directory) if f.lower().endswith(('.mp4', '.avi', '.mov', '.mkv'))]
-
- if not video_files:
- yield None, f"๐ No video files found in the directory: {video_directory}"
- return
-
- new_source = get_instance_file_path('source-image.jpg')
- shutil.copy(source_image, new_source)
-
- if not os.path.exists(new_source):
- yield None, "โ Source image file does not exist"
- return
-
- # Extract the original filename of the source image
- source_image_name = os.path.splitext(os.path.basename(source_image))[0]
-
- cli_output = f"๐ Found {len(video_files)} videos to process in {video_directory}\n"
- cli_output += f"๐ฏ Source image: {source_image_name}\n"
- cli_output += f"๐ฅ๏ธ GPU Selection: {gpu_selection}\n"
- cli_output += f"๐ Instance Output: {INSTANCE_OUTPUT_DIR}\n\n"
- yield None, cli_output
-
- successful_videos = 0
- failed_videos = 0
-
- for i, target_video in enumerate(video_files, 1):
- current_video_output = f"[{i}/{len(video_files)}] ๐ฌ Processing: {os.path.basename(target_video)}\n"
- cli_output += current_video_output
- print(f"[{i}/{len(video_files)}] Processing: {os.path.basename(target_video)}") # Console output
- yield None, cli_output
-
- new_target = get_instance_file_path('resize-vid.mp4')
-
- try:
- resize_video(file=target_video, export=new_target, fps=30)
- except Exception as e:
- error_msg = f"โ Error resizing video {target_video}: {e}\n"
- cli_output += error_msg
- print(error_msg.strip()) # Console output
- failed_videos += 1
- yield None, cli_output
- continue # Proceed to next video
-
- if not os.path.exists(new_target):
- error_msg = f"โ Target video file {target_video} does not exist after resizing.\n"
- cli_output += error_msg
- print(error_msg.strip()) # Console output
- failed_videos += 1
- yield None, cli_output
- continue # Proceed to next video
-
- target_video_name = os.path.splitext(os.path.basename(target_video))[0]
-
- # Handle audio extraction for lip sync from the TARGET video itself
- audio_source_path = None
- if enable_lip_sync:
- audio_source_path = get_instance_file_path('target-audio.wav')
- if not extract_audio(new_target, audio_source_path):
- warning_msg = f"โ ๏ธ Warning: Could not extract audio from {target_video}. Skipping lip sync.\n"
- cli_output += warning_msg
- print(warning_msg.strip()) # Console output
- yield None, cli_output
- enable_lip_sync = False
-
- # Add lip sync suffix to filename if enabled
- suffix = "_lipsynced" if enable_lip_sync else ""
- output_filename = f"{source_image_name}_{target_video_name}{suffix}.mp4"
- output_path = os.path.join(INSTANCE_OUTPUT_DIR, output_filename)
-
- os.makedirs(INSTANCE_OUTPUT_DIR, exist_ok=True)
-
- # Determine execution provider based on GPU selection
- if gpu_selection.startswith("GPU"):
- execution_provider = "cuda"
- else:
- execution_provider = "cpu"
-
- # Construct command as a single string and use shlex.split to handle it
- cmd = (
- f"python run.py --execution-providers {execution_provider} "
- f"--execution-thread-count 8 " # Changed from 16 to 8
- f"--reference-face-distance 1.5 "
- f"-s {shlex.quote(new_source)} -t {shlex.quote(new_target)} -o {shlex.quote(output_path)} "
- f"--frame-processors {' '.join(frame_processor)} "
- f"--face-analyser-direction {face_analyser_direction} "
- )
-
- # Add lip sync parameters if enabled
- if enable_lip_sync and audio_source_path:
- cmd += f"--source-paths {shlex.quote(audio_source_path)} "
- cmd += f"--lip-syncer-model {lip_syncer_model} "
- # Ensure lip_syncer is in frame processors
- if 'lip_syncer' not in frame_processor:
- frame_processor_with_lip = list(frame_processor) + ['lip_syncer']
- else:
- frame_processor_with_lip = frame_processor
- # Update the command with the new frame processors
- cmd = cmd.replace(f"--frame-processors {' '.join(frame_processor)}",
- f"--frame-processors {' '.join(frame_processor_with_lip)}")
-
- if face_recognition != 'none':
- cmd += f"--face-recognition {face_recognition} "
- if face_analyser_gender != 'none':
- cmd += f"--face-analyser-gender {face_analyser_gender} "
-
- if skip_audio and not enable_lip_sync: # Don't skip audio if lip syncing
- cmd += "--skip-audio "
- if keep_fps:
- cmd += "--keep-fps "
- if keep_temp:
- cmd += "--keep-temp "
-
- try:
- cmd_msg = f"Starting processing with command...\n"
- cli_output += cmd_msg
- print("Starting processing...") # Console output
- yield None, cli_output
-
- start_time = time.time()
- # Use shlex.split(cmd) to safely handle spaces in paths
- global current_process
- current_process = sp.Popen(shlex.split(cmd), stdout=sp.PIPE, stderr=sp.STDOUT, text=True, bufsize=1, universal_newlines=True)
- process = current_process
-
- line_count = 0
- last_update_time = time.time()
-
- while True:
- output = process.stdout.readline()
- if output == '' and process.poll() is not None:
- break
- if output:
- line = output.strip()
- print(line) # Always show in console
-
- # Only update web interface every 10 lines or every 2 seconds to prevent slowdown
- line_count += 1
- current_time = time.time()
-
- if line_count % 10 == 0 or current_time - last_update_time > 2:
- cli_output += line + "\n"
- # Keep only last 50 lines to prevent memory issues
- lines = cli_output.split('\n')
- if len(lines) > 50:
- cli_output = '\n'.join(lines[-50:])
- yield None, cli_output
- last_update_time = current_time
-
- rc = process.poll()
- end_time = time.time()
- execution_time = end_time - start_time
-
- if rc != 0:
- error_msg = f"An error occurred during command execution for {target_video}.\n"
- cli_output += error_msg
- print(error_msg.strip()) # Console output
- failed_videos += 1
- yield None, cli_output
- # Ensure the process is terminated
- try:
- process.kill()
- process.wait()
- except Exception:
- pass
- continue # Proceed to next video
- else:
- success_msg = f"Processing completed for {target_video} in {execution_time:.2f} seconds.\n\n"
- cli_output += success_msg
- print(f"Processing completed for {os.path.basename(target_video)} in {execution_time:.2f} seconds.") # Console output
- successful_videos += 1
- yield None, cli_output
-
- # Clean up to free GPU memory
- del process
- if torch.cuda.is_available():
- torch.cuda.empty_cache()
- gc.collect()
-
- except Exception as e:
- error_msg = f"An error occurred while processing {target_video}: {str(e)}\n"
- cli_output += error_msg
- print(error_msg.strip()) # Console output
- failed_videos += 1
- yield None, cli_output
- continue # Proceed to next video
-
- # Clean up temporary audio file
- if audio_source_path and os.path.exists(audio_source_path):
- try:
- os.remove(audio_source_path)
- except Exception as e:
- print(f"โ ๏ธ Could not remove audio file: {e}")
-
- final_msg = f"\n=== BATCH PROCESSING COMPLETE ===\n"
- final_msg += f"โ
Successfully processed: {successful_videos} videos\n"
- final_msg += f"โ Failed: {failed_videos} videos\n"
- final_msg += f"๐ Total videos: {len(video_files)}\n"
- final_msg += f"๐๏ธ Check the output folder for results: {INSTANCE_OUTPUT_DIR}"
- cli_output += final_msg
- print(f"=== BATCH PROCESSING COMPLETE === Successfully processed: {successful_videos}/{len(video_files)} videos") # Console output
-
- # Set up for batch download
- if successful_videos > 0:
- last_output_path = create_batch_zip()
- if last_output_path:
- cli_output += f"\n๐ฆ Batch zip created: {os.path.basename(last_output_path)}"
- else:
- cli_output += f"\nโ ๏ธ Warning: Could not create batch zip file"
-
- yield None, cli_output
-
-def handle_button_action(button_text, source_image, target_video, frame_processor, face_analyser_direction,
- face_recognition, face_analyser_gender, face_analyser_age, skip_audio,
- keep_fps, keep_temp, lip_syncer_model, enable_lip_sync, use_folder_mode, gpu_selection):
- """Handle the multi-purpose button actions"""
- global last_output_path, last_batch_mode
-
- if "RESET" in button_text:
- # Reset to defaults
- return reset_to_defaults()
- elif "CANCEL" in button_text:
- # Cancel processing
- cancel_msg = cancel_processing()
- return (
- source_image, target_video, frame_processor, face_analyser_direction, face_recognition,
- face_analyser_gender, face_analyser_age, skip_audio, keep_fps, keep_temp,
- lip_syncer_model, enable_lip_sync, use_folder_mode, gpu_selection,
- cancel_msg, "๐ RESET CONFIGURATION"
- )
- elif "DOWNLOAD" in button_text:
- # Download results
- if last_batch_mode and last_output_path:
- # Return the zip file for batch download
- return (
- source_image, target_video, frame_processor, face_analyser_direction, face_recognition,
- face_analyser_gender, face_analyser_age, skip_audio, keep_fps, keep_temp,
- lip_syncer_model, enable_lip_sync, use_folder_mode, gpu_selection,
- "๐ฅ Batch processing complete! Click download to get your zipped results!", "๐ฅ DOWNLOAD BATCH RESULTS"
- )
- elif not last_batch_mode and last_output_path:
- # Return the single file for download
- return (
- source_image, target_video, frame_processor, face_analyser_direction, face_recognition,
- face_analyser_gender, face_analyser_age, skip_audio, keep_fps, keep_temp,
- lip_syncer_model, enable_lip_sync, use_folder_mode, gpu_selection,
- "๐ฅ Processing complete! Click download to get your enhanced video!", "๐ฅ DOWNLOAD VIDEO"
- )
-
- # Default return (shouldn't reach here normally)
- return (
- source_image, target_video, frame_processor, face_analyser_direction, face_recognition,
- face_analyser_gender, face_analyser_age, skip_audio, keep_fps, keep_temp,
- lip_syncer_model, enable_lip_sync, use_folder_mode, gpu_selection,
- "", "๐ RESET CONFIGURATION"
- )
-
-def run_processing(source_image, target_video, frame_processor, face_analyser_direction, face_recognition,
- face_analyser_gender, face_analyser_age, skip_audio, keep_fps, keep_temp,
- lip_syncer_model, enable_lip_sync, use_folder_mode, gpu_selection):
- """Main processing function"""
-
- if use_folder_mode:
- # Folder batch mode
- for _, cli_output in run_folder_batch(
- source_image, frame_processor, face_analyser_direction, face_recognition,
- face_analyser_gender, skip_audio, keep_fps, keep_temp, lip_syncer_model, enable_lip_sync, gpu_selection
- ):
- yield cli_output, "โน๏ธ CANCEL PROCESSING"
- # Processing complete
- yield cli_output + "\n\nโ
Batch processing completed successfully!", "๐ฅ DOWNLOAD BATCH RESULTS"
- else:
- # Single video mode
- output_path = None
- for video_result, cli_output in run_single_video(
- source_image, target_video, frame_processor, face_analyser_direction, face_recognition,
- face_analyser_gender, face_analyser_age, skip_audio, keep_fps, keep_temp,
- lip_syncer_model, enable_lip_sync, gpu_selection
- ):
- if video_result and not video_result.startswith("An error occurred"):
- output_path = video_result
- yield cli_output, "โน๏ธ CANCEL PROCESSING"
-
- # Processing complete
- if output_path and os.path.exists(output_path):
- yield cli_output + "\n\n๐ Video processing completed successfully!", "๐ฅ DOWNLOAD VIDEO"
- else:
- yield cli_output + "\n\nโ
I did what I was told!", "๐ RESET CONFIGURATION"
-
-def get_download_file():
- """Get the appropriate file for download"""
- global last_output_path, last_batch_mode
- if last_output_path and os.path.exists(last_output_path):
- return last_output_path
- return None
-
-def get_theme() -> gr.Theme:
- return gr.themes.Monochrome(
- primary_hue=gr.themes.colors.teal,
- secondary_hue=gr.themes.colors.gray,
- font=gr.themes.GoogleFont('Inter')
- ).set(
- background_fill_primary="#1f1f1f",
- background_fill_secondary="#2d2d2d",
- block_label_text_size="*text_sm",
- block_title_text_size="*text_md"
- )
-
-def toggle_lip_sync_visibility(enable_lip_sync):
- """Toggle visibility of lip sync related components"""
- return {
- lip_syncer_model_dropdown: gr.update(visible=enable_lip_sync)
- }
-
-def toggle_folder_mode(use_folder_mode):
- """Toggle visibility of target video upload based on folder mode"""
- return {
- target_video: gr.update(visible=not use_folder_mode),
- face_analyser_age_dropdown: gr.update(visible=not use_folder_mode)
- }
-
-with gr.Blocks(theme=get_theme(), css="""
- .gradio-container {
- max-width: 1800px !important;
- margin: 0 auto !important;
- background: linear-gradient(135deg, #0f0f23 0%, #1a1a2e 50%, #16213e 100%);
- min-height: 100vh;
- padding: 0.3rem !important;
- }
- .main-header {
- text-align: center;
- margin-bottom: 0.6rem;
- padding: 0.8rem;
- background: linear-gradient(135deg, #667eea 0%, #764ba2 100%);
- border-radius: 12px;
- color: white;
- box-shadow: 0 10px 20px rgba(102, 126, 234, 0.3);
- position: relative;
- overflow: hidden;
- }
- .main-header h1 {
- font-size: 1.8rem !important;
- margin: 0 !important;
- }
- .main-header p {
- font-size: 0.9rem !important;
- margin: 0.2rem 0 0 0 !important;
- }
- .section-header {
- font-weight: 600;
- font-size: 0.95rem;
- margin-bottom: 0.6rem;
- color: #667eea;
- background: linear-gradient(90deg, #667eea, #764ba2);
- -webkit-background-clip: text;
- -webkit-text-fill-color: transparent;
- background-clip: text;
- border-bottom: 2px solid #667eea;
- padding-bottom: 0.3rem;
- position: relative;
- }
- .section-header::after {
- content: '';
- position: absolute;
- bottom: -2px;
- left: 0;
- width: 30px;
- height: 2px;
- background: linear-gradient(90deg, #667eea, #764ba2);
- border-radius: 2px;
- }
- .control-panel {
- background: linear-gradient(135deg, rgba(102, 126, 234, 0.1) 0%, rgba(118, 75, 162, 0.1) 100%);
- border-radius: 12px;
- padding: 0.8rem;
- margin-bottom: 0.5rem;
- border: 2px solid rgba(102, 126, 234, 0.2);
- box-shadow: 0 8px 20px rgba(102, 126, 234, 0.1);
- backdrop-filter: blur(10px);
- position: relative;
- overflow: hidden;
- height: fit-content;
- }
- .control-panel::before {
- content: '';
- position: absolute;
- top: 0;
- left: 0;
- right: 0;
- height: 1px;
- background: linear-gradient(90deg, transparent, rgba(255,255,255,0.3), transparent);
- }
- .button-row {
- display: flex;
- gap: 1rem;
- justify-content: center;
- margin: 0.8rem 0;
- }
- #action-buttons {
- margin-top: 0.5rem !important;
- }
- #action-buttons .gr-button {
- width: 100% !important;
- margin: 0.2rem 0 !important;
- }
- #download-btn {
- background: linear-gradient(135deg, #10b981, #059669) !important;
- border: none !important;
- color: white !important;
- font-weight: 600 !important;
- transition: all 0.3s ease !important;
- }
- #download-btn:hover {
- background: linear-gradient(135deg, #059669, #047857) !important;
- transform: translateY(-1px) !important;
- box-shadow: 0 4px 12px rgba(16, 185, 129, 0.4) !important;
- }
-
- /* Download status styling */
- .download-status {
- font-size: 0.8rem !important;
- background: rgba(16, 185, 129, 0.1) !important;
- border: 1px solid rgba(16, 185, 129, 0.3) !important;
- border-radius: 6px !important;
- margin-top: 0.3rem !important;
- }
-
- /* Download file component styling when visible */
- .download-component {
- background: rgba(16, 185, 129, 0.1) !important;
- border: 2px solid rgba(16, 185, 129, 0.4) !important;
- border-radius: 8px !important;
- padding: 0.5rem !important;
- margin-top: 0.3rem !important;
- }
-
- .processing-log {
- margin-top: 0.5rem;
- background: linear-gradient(135deg, rgba(15, 15, 35, 0.9) 0%, rgba(26, 26, 46, 0.9) 100%);
- border-radius: 12px;
- padding: 0.8rem;
- border: 2px solid rgba(102, 126, 234, 0.3);
- box-shadow: inset 0 2px 10px rgba(0,0,0,0.3), 0 8px 20px rgba(102, 126, 234, 0.1);
- height: fit-content;
- }
- /* Enhanced form styling */
- .gr-form {
- background: transparent !important;
- }
- .gr-box {
- border-radius: 8px !important;
- border: 1px solid rgba(102, 126, 234, 0.2) !important;
- background: rgba(255, 255, 255, 0.02) !important;
- margin: 0.2rem 0 !important;
- padding: 0.3rem !important;
- }
- .gr-button {
- border-radius: 10px !important;
- font-weight: 600 !important;
- text-transform: uppercase !important;
- letter-spacing: 0.5px !important;
- transition: all 0.3s ease !important;
- box-shadow: 0 4px 15px rgba(0,0,0,0.3) !important;
- padding: 0.5rem 1.2rem !important;
- font-size: 0.85rem !important;
- }
- .gr-button:hover {
- transform: translateY(-1px) !important;
- box-shadow: 0 6px 20px rgba(0,0,0,0.4) !important;
- }
- .gr-button-primary {
- background: linear-gradient(135deg, #667eea 0%, #764ba2 100%) !important;
- border: none !important;
- }
- .gr-button-secondary {
- background: linear-gradient(135deg, #434343 0%, #000000 100%) !important;
- border: 1px solid rgba(102, 126, 234, 0.3) !important;
- color: white !important;
- }
- /* Configuration grid styling */
- .config-section {
- background: rgba(255, 255, 255, 0.03);
- border-radius: 10px;
- padding: 0.6rem;
- border: 1px solid rgba(102, 126, 234, 0.15);
- margin: 0.2rem;
- }
- .config-section h4 {
- font-size: 0.85rem !important;
- margin: 0 0 0.4rem 0 !important;
- }
- /* Compact textbox styling */
- .gr-textbox {
- background: rgba(15, 15, 35, 0.8) !important;
- border: 1px solid rgba(102, 126, 234, 0.3) !important;
- border-radius: 8px !important;
- color: #e2e8f0 !important;
- font-size: 0.8rem !important;
- }
- /* Compact file upload styling */
- .gr-file {
- border: 2px dashed rgba(102, 126, 234, 0.4) !important;
- border-radius: 10px !important;
- background: rgba(102, 126, 234, 0.05) !important;
- transition: all 0.3s ease !important;
- min-height: 85px !important;
- max-height: 90px !important;
- }
- .gr-file:hover {
- border-color: rgba(102, 126, 234, 0.6) !important;
- background: rgba(102, 126, 234, 0.1) !important;
- }
- .gr-file .gr-file-label {
- font-size: 0.75rem !important;
- line-height: 1.2 !important;
- }
- .gr-file .upload-container {
- padding: 0.8rem !important;
- }
- /* Simplified dropdown styling - remove complex CSS that might interfere */
- .gr-dropdown {
- background: rgba(255, 255, 255, 0.05) !important;
- border: 1px solid rgba(102, 126, 234, 0.3) !important;
- border-radius: 6px !important;
- font-size: 0.8rem !important;
- }
-
- /* Let Gradio handle dropdown positioning naturally */
- #main-gpu-dropdown {
- position: relative !important;
- }
-
- /* GPU debug styling */
- .gpu-debug {
- font-size: 0.75rem !important;
- background: rgba(255, 165, 0, 0.1) !important;
- border: 1px solid rgba(255, 165, 0, 0.3) !important;
- border-radius: 6px !important;
- margin-top: 0.3rem !important;
- }
- /* Compact checkbox styling */
- .gr-checkbox {
- font-size: 0.8rem !important;
- }
- /* Make everything more compact */
- .gr-group {
- gap: 0.3rem !important;
- }
- .gr-column {
- gap: 0.3rem !important;
- }
- .gr-row {
- gap: 0.5rem !important;
- }
- /* Text size adjustments */
- .gr-label {
- font-size: 0.8rem !important;
- }
- .gr-info {
- font-size: 0.7rem !important;
- }
- /* Ultra compact mode for smaller screens */
- @media (max-height: 900px) {
- .processing-log .gr-textbox {
- max-height: 200px !important;
- }
- .main-header {
- padding: 0.6rem !important;
- margin-bottom: 0.4rem !important;
- }
- .control-panel {
- padding: 0.6rem !important;
- margin-bottom: 0.3rem !important;
- }
- .gr-file {
- min-height: 70px !important;
- max-height: 75px !important;
- }
- }
- @media (max-height: 800px) {
- .processing-log .gr-textbox {
- max-height: 150px !important;
- }
- .main-header h1 {
- font-size: 1.5rem !important;
- }
- .gr-file {
- min-height: 65px !important;
- max-height: 70px !important;
- }
- }
- @media (max-height: 700px) {
- .gr-file {
- min-height: 60px !important;
- max-height: 65px !important;
- }
- .control-panel {
- padding: 0.4rem !important;
- }
- }
-""") as ui:
-
- with gr.Column(elem_classes="main-header"):
- gr.Markdown(f"""
- # 🎭 Advanced Face Swap Studio
- **Professional-grade AI face swapping technology**
- *Instance: {INSTANCE_ID} | Port: {INSTANCE_PORT}*
- """, elem_classes="main-header")
-
- with gr.Tabs():
- # Main processing tab
- with gr.Tab("๐ญ Face Swap", id="main"):
- with gr.Row():
- # Left Column - Source Input + Action Buttons
- with gr.Column(scale=2):
- with gr.Group(elem_classes="control-panel"):
- gr.HTML('')
-
- gr.HTML('🎯 Face Source')
- source_image = gr.File(
- label="Upload Source Image",
- file_types=["image"],
- file_count="single",
- height=85
- )
- gr.HTML('Clear image with the face to use')
-
- # Target video upload (visible by default)
- gr.HTML('🎬 Target Video')
- target_video = gr.File(
- label="Upload Target Video",
- file_types=["video"],
- file_count="single",
- visible=True,
- height=85
- )
- gr.HTML('Video where faces will be replaced')
-
- # Action Buttons moved to left column
- with gr.Group(elem_classes="control-panel", elem_id="action-buttons"):
- gr.HTML('')
- with gr.Row():
- start_button = gr.Button(
- "๐ LAUNCH PROCESSING",
- variant="primary",
- size="lg",
- elem_id="start-btn"
- )
- with gr.Row():
- action_button = gr.Button(
- "๐ RESET CONFIGURATION",
- variant="secondary",
- size="lg",
- elem_id="action-btn"
- )
- with gr.Row():
- download_button = gr.Button(
- "๐ฅ DOWNLOAD RESULTS",
- variant="secondary",
- size="lg",
- elem_id="download-btn",
- visible=True
- )
- with gr.Row():
- download_status = gr.Textbox(
- label="๐ฅ Download Status",
- value="startup_status", # Show startup status
- interactive=False,
- visible=True,
- lines=2,
- elem_classes="download-status"
- )
-
- # Download file component - now visible when needed
- download_file = gr.File(
- label="๐ฅ Click to Download",
- visible=False,
- file_count="single",
- file_types=None, # Allow all file types
- interactive=False,
- elem_classes="download-component"
- )
-
- # Middle Column - Core Processing Configuration
- with gr.Column(scale=3):
- with gr.Group(elem_classes="control-panel"):
- gr.HTML('')
-
- # Main configuration in a clean grid layout
- with gr.Row():
- with gr.Column(scale=1, elem_classes="config-section"):
- gr.HTML('🎭 Frame Processing')
- # Get available frame processors based on what's installed
- available_processors = ['face_swapper']
- if ENHANCEMENT_AVAILABLE:
- available_processors.extend(['face_enhancer', 'frame_enhancer'])
-
- frame_processor_checkbox = gr.CheckboxGroup(
- choices=available_processors,
- label='Active Processors',
- value=['face_swapper'] + (['face_enhancer'] if ENHANCEMENT_AVAILABLE else []),
- visible=True,
- info="โ ๏ธ frame_enhancer increases processing time" if ENHANCEMENT_AVAILABLE else "๐ง Basic mode - enhancement modules not available"
- )
-
- # Lip sync controls
- enable_lip_sync = gr.Checkbox(
- label="๐ต Enable Lip Sync",
- value=False,
- info="โ ๏ธ Beta feature"
- )
-
- lip_syncer_model_dropdown = gr.Dropdown(
- label='Lip Sync Model',
- choices=['wav2lip_96', 'wav2lip_gan_96'],
- value='wav2lip_gan_96',
- visible=False,
- scale=1
- )
-
- with gr.Column(scale=1, elem_classes="config-section"):
- gr.HTML('🔍 Face Analysis')
-
- face_recognition_dropdown = gr.Dropdown(
- label='Recognition Mode',
- choices=['none', 'reference', 'many'],
- value='reference',
- visible=True
- )
-
- face_analyser_direction_dropdown = gr.Dropdown(
- label='Analysis Direction',
- choices=['left-right', 'right-left', 'top-bottom', 'bottom-top', 'small-large', 'large-small'],
- value='top-bottom',
- visible=True
- )
-
- face_analyser_gender_dropdown = gr.Dropdown(
- label='Target Gender',
- choices=['none', 'male', 'female'],
- value='female',
- visible=True
- )
-
- face_analyser_age_dropdown = gr.Dropdown(
- label='Target Age Group',
- choices=['child', 'teen', 'adult', 'senior'],
- value='adult',
- visible=True
- )
-
- # Hidden option
- keep_temp = gr.Checkbox(
- label="๐๏ธ Keep Temp Files",
- value=False,
- visible=False
- )
-
- # Right Column - Processing Log + Processing Options
- with gr.Column(scale=3):
- with gr.Group(elem_classes="processing-log"):
- gr.HTML('')
-
- cli_output = gr.Textbox(
- label="๐ Live Processing Output",
- lines=12,
- max_lines=15,
- interactive=False,
- show_copy_button=True,
- container=True,
- placeholder=f"๐ง System ready. Configure settings and click 'Launch Processing'...\n\nโก Real-time progress updates\n๐ Performance metrics\n๐ฏ Processing logs\nโจ Completion notifications\n\n๐ Instance: {INSTANCE_ID}\n๐ Output: {INSTANCE_OUTPUT_DIR}\n๐ฅ Download button scans output folder automatically",
- elem_id="processing-monitor"
- )
-
- # Processing options moved to right column
- with gr.Group(elem_classes="control-panel"):
- gr.HTML('')
- with gr.Row():
- with gr.Column():
- # Simple GPU selection - mirroring working test
- gpu_selection_dropdown = gr.Dropdown(
- label="๐ฅ๏ธ Compute Device",
- choices=AVAILABLE_GPUS,
- value=AVAILABLE_GPUS[0] if AVAILABLE_GPUS else "CPU Only",
- info="Select your GPU or CPU for processing",
- interactive=True,
- allow_custom_value=False,
- elem_id="main-gpu-dropdown"
- )
-
- gpu_debug_btn = gr.Button("๐ Debug GPU", size="sm")
-
- gpu_debug_output = gr.Textbox(
- label="๐ GPU Status",
- value=f"Detected: {len(AVAILABLE_GPUS)} options โ {', '.join(AVAILABLE_GPUS)}",
- interactive=False,
- lines=2,
- elem_classes="gpu-debug"
- )
-
- skip_audio = gr.Checkbox(
- label="๐ Skip Audio",
- value=False,
- info="Video only processing"
- )
- with gr.Column():
- use_folder_mode = gr.Checkbox(
- label="๐ Batch Mode",
- value=False,
- info="Process ./Convert folder"
- )
- keep_fps = gr.Checkbox(
- label="๐ฌ Preserve FPS",
- value=True,
- info="Keep original frame rate"
- )
-
- # GPU Test Tab
- with gr.Tab("๐ง GPU Test", id="test"):
- gr.Markdown("## GPU Dropdown Test")
- gr.Markdown("This tab tests if the GPU dropdown works correctly")
-
- test_gpu_dropdown = gr.Dropdown(
- label="Test GPU Selection",
- choices=AVAILABLE_GPUS,
- value=AVAILABLE_GPUS[0] if AVAILABLE_GPUS else "CPU Only",
- interactive=True,
- allow_custom_value=False,
- info="This should show all your GPUs as a proper dropdown"
- )
-
- test_output = gr.Textbox(
- label="Selected GPU",
- value=f"Current: {AVAILABLE_GPUS[0] if AVAILABLE_GPUS else 'None'}",
- interactive=False
- )
-
- test_status = gr.Textbox(
- label="GPU Detection Status",
- value=f"Detected {len(AVAILABLE_GPUS)} options: {', '.join(AVAILABLE_GPUS)}",
- interactive=False,
- lines=3
- )
-
- def test_gpu_change(selected):
- print(f"๐งช Test GPU Selected: {selected}")
- return f"You selected: {selected}"
-
- test_gpu_dropdown.change(
- test_gpu_change,
- inputs=[test_gpu_dropdown],
- outputs=[test_output]
- )
-
- # Toggle lip sync components visibility
- enable_lip_sync.change(
- toggle_lip_sync_visibility,
- inputs=[enable_lip_sync],
- outputs=[lip_syncer_model_dropdown]
- )
-
- # Toggle folder mode visibility
- use_folder_mode.change(
- toggle_folder_mode,
- inputs=[use_folder_mode],
- outputs=[target_video, face_analyser_age_dropdown]
- )
-
- # GPU selection change handler for debugging
- gpu_selection_dropdown.change(
- on_gpu_selection_change,
- inputs=[gpu_selection_dropdown],
- outputs=[]
- )
-
- # GPU debug button
- gpu_debug_btn.click(
- debug_gpu_dropdown,
- inputs=[],
- outputs=[gpu_selection_dropdown, gpu_debug_output]
- )
-
- # Main processing button
- start_button.click(
- run_processing,
- inputs=[
- source_image,
- target_video,
- frame_processor_checkbox,
- face_analyser_direction_dropdown,
- face_recognition_dropdown,
- face_analyser_gender_dropdown,
- face_analyser_age_dropdown,
- skip_audio,
- keep_fps,
- keep_temp,
- lip_syncer_model_dropdown,
- enable_lip_sync,
- use_folder_mode,
- gpu_selection_dropdown
- ],
- outputs=[cli_output, action_button]
- )
-
- # Multi-purpose action button
- action_button.click(
- handle_button_action,
- inputs=[
- action_button,
- source_image,
- target_video,
- frame_processor_checkbox,
- face_analyser_direction_dropdown,
- face_recognition_dropdown,
- face_analyser_gender_dropdown,
- face_analyser_age_dropdown,
- skip_audio,
- keep_fps,
- keep_temp,
- lip_syncer_model_dropdown,
- enable_lip_sync,
- use_folder_mode,
- gpu_selection_dropdown
- ],
- outputs=[
- source_image,
- target_video,
- frame_processor_checkbox,
- face_analyser_direction_dropdown,
- face_recognition_dropdown,
- face_analyser_gender_dropdown,
- face_analyser_age_dropdown,
- skip_audio,
- keep_fps,
- keep_temp,
- lip_syncer_model_dropdown,
- enable_lip_sync,
- use_folder_mode,
- gpu_selection_dropdown,
- cli_output,
- action_button
- ]
- )
-
- # FIXED: Download button functionality with proper download component management
- download_button.click(
- handle_download_click,
- inputs=[],
- outputs=[download_file, download_status, download_file, download_button],
- show_progress=True
- )
-
- # FIXED: Reset download UI when download component changes (download completes)
- download_file.change(
- reset_download_ui,
- inputs=[],
- outputs=[download_file, download_button, download_status]
- )
-
-# Print system information for verification
-print(f"๐ Base directory: {BASE_DIR}")
-print(f"๐ Instance ID: {INSTANCE_ID}")
-print(f"๐ Convert directory: {os.path.join(BASE_DIR, 'Convert')}")
-print(f"๐๏ธ Instance temp: {INSTANCE_TEMP_DIR}")
-print(f"๐ค Instance output: {INSTANCE_OUTPUT_DIR}")
-print(f"๐ฅ๏ธ Available GPUs: {AVAILABLE_GPUS}")
-print(f"๐ง Enhancement modules: {'โ
Available' if ENHANCEMENT_AVAILABLE else 'โ Not Available'}")
-
-# Determine if running in HuggingFace Spaces or similar cloud environment
-def get_launch_config():
- """Get appropriate launch configuration based on environment"""
- if os.getenv('SPACE_ID'): # HuggingFace Spaces
- return {
- 'server_name': "0.0.0.0",
- 'server_port': 7860, # HF Spaces always use 7860
- 'share': False,
- 'debug': False
- }
- elif os.getenv('COLAB_GPU'): # Google Colab
- return {
- 'server_name': "127.0.0.1",
- 'server_port': INSTANCE_PORT,
- 'share': True,
- 'debug': False
- }
- else: # Local development - use pre-determined port
- return {
- 'server_name': "127.0.0.1",
- 'server_port': INSTANCE_PORT,
- 'share': False,
- 'debug': True
- }
-
-# Launch configuration for better cross-platform compatibility
-launch_config = get_launch_config()
-print(f"๐ Launching on port: {launch_config['server_port']}")
-print(f"๐ Access URL: http://localhost:{launch_config['server_port']}")
-
-ui.launch(
- max_file_size="2100mb",
- **launch_config
+import os
+import urllib.request
+
+# Target path (relative to your repo root)
+MODEL_DIR = './.assets/models'
+MODEL_PATH = os.path.join(MODEL_DIR, 'GFPGANv1.4.pth')
+MODEL_URL = 'https://huggingface.co/talhaty/GFPGANv1.4/resolve/main/GFPGANv1.4.pth'
+
+# Create directory if it does not exist
+os.makedirs(MODEL_DIR, exist_ok=True)
+
+# Download if not already present
+if not os.path.isfile(MODEL_PATH):
+ print(f'Downloading GFPGANv1.4.pth to {MODEL_PATH}...')
+ urllib.request.urlretrieve(MODEL_URL, MODEL_PATH)
+ print('Download complete.')
+else:
+ print(f'GFPGANv1.4.pth already exists at {MODEL_PATH}, skipping download.')
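+# Note: urllib.request.urlretrieve performs no integrity check, so an interrupted
+# download leaves a partial file that the existence check above will reuse; delete
+# the .pth manually (or add a size/hash check) if a download was cut short.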
+
+"""
+๐ญ Advanced Face Swap Studio - Cross-Environment Compatible Version
+================================================================
+
+✅ FEATURES:
+- Professional 3-column layout optimized for all screen sizes
+- GPU/CPU processing with automatic detection
+- Batch processing mode for multiple videos
+- Lip sync integration (beta)
+- Real-time processing monitor
+- Cross-platform compatibility (Windows, Linux, macOS)
+
+๐ง COMPATIBILITY:
+- HuggingFace Spaces ready
+- Google Colab compatible
+- Local development friendly
+- Graceful degradation for missing dependencies
+- Automatic environment detection and configuration
+
+๐ REQUIREMENTS:
+- gradio, torch, onnxruntime (core)
+- moviepy (optional - for video processing)
+- SwitcherAI modules (optional - for enhancement)
+
+๐ USAGE:
+- Local: python app.py
+- HuggingFace: Upload and run as Space
+- Colab: Upload and execute in notebook
+"""
+
+import os
+import sys
+import zipfile
+import time
+
+# Get the directory where app.py is located
+BASE_DIR = os.path.dirname(os.path.abspath(__file__))
+
+# Create unique instance ID for multi-instance support
+INSTANCE_ID = f"instance_{os.getpid()}_{int(time.time() % 10000)}"
+INSTANCE_TEMP_DIR = os.path.join(BASE_DIR, "Temp", INSTANCE_ID)
+INSTANCE_OUTPUT_DIR = os.path.join(BASE_DIR, "output", INSTANCE_ID)
+
+# Create instance-specific directories
+os.makedirs(INSTANCE_TEMP_DIR, exist_ok=True)
+os.makedirs(INSTANCE_OUTPUT_DIR, exist_ok=True)
+
+# Set up environment variables using relative paths (removed Conda and CUDA paths)
+os.environ["KMP_DUPLICATE_LIB_OK"] = "TRUE"
+os.environ["TF_FORCE_GPU_ALLOW_GROWTH"] = "TRUE"
+os.environ["GRADIO_TEMP_DIR"] = INSTANCE_TEMP_DIR
+
+import gradio as gr
+import subprocess as sp
+import uuid
+import time
+import shutil
+try:
+ from moviepy.editor import *
+ MOVIEPY_AVAILABLE = True
+ print("โ
MoviePy loaded successfully")
+except ImportError as e:
+ print(f"โ ๏ธ MoviePy not available: {e}")
+ print("๐ Some video processing features may be limited")
+ MOVIEPY_AVAILABLE = False
+import gc # Import garbage collector
+
+# Add relative paths to sys.path
+sys.path.append(os.path.join(BASE_DIR, "SwitcherAI", "processors", "frame", "modules"))
+
+# Try to import optional enhancement modules (graceful degradation)
+try:
+ import face_enhancer
+ import frame_enhancer
+ ENHANCEMENT_AVAILABLE = True
+ print("โ
Enhancement modules loaded successfully")
+except ImportError as e:
+ print(f"โ ๏ธ Enhancement modules not available: {e}")
+ print("๐ App will run in basic mode without enhancement features")
+ ENHANCEMENT_AVAILABLE = False
+
+sys.path.append(BASE_DIR) # Add base directory
+
+import onnxruntime as ort
+import torch
+import shlex
+
+def find_available_port(start_port=7860, max_attempts=10):
+ """Find an available port starting from start_port"""
+ import socket
+
+ for i in range(max_attempts):
+ port = start_port + i
+ try:
+ with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
+ s.bind(('localhost', port))
+ print(f"๐ Found available port: {port}")
+ return port
+ except OSError:
+ continue
+
+ print(f"โ ๏ธ Could not find available port in range {start_port}-{start_port + max_attempts}")
+ return start_port # Fallback to original port
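+
+# Caveat: binding to 'localhost' only tests availability at probe time and cannot
+# reserve the port, so another process may still claim it before ui.launch() runs;
+# if every probe fails, the start port is returned and the launch surfaces the error.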
+
+def get_instance_file_path(filename):
+ """Get instance-specific file path to avoid conflicts"""
+ return os.path.join(INSTANCE_TEMP_DIR, filename)
+
+def get_available_gpus():
+ """Get list of available CUDA devices with enhanced debugging"""
+ print("\n๐ Starting GPU Detection...")
+ available_gpus = []
+
+ # Check CUDA availability first
+ cuda_available = torch.cuda.is_available()
+ print(f"๐ CUDA Available: {cuda_available}")
+
+ if not cuda_available:
+ print("โ CUDA not available - returning CPU only")
+ return ["CPU Only"]
+
+ # Get device count
+ device_count = torch.cuda.device_count()
+ print(f"๐ข Total CUDA devices detected: {device_count}")
+
+ if device_count == 0:
+ print("โ No CUDA devices found despite CUDA being available")
+ return ["CPU Only"]
+
+ # Check environment variables that might limit GPU visibility
+ cuda_visible = os.environ.get('CUDA_VISIBLE_DEVICES')
+ if cuda_visible is not None:
+ print(f"๐ CUDA_VISIBLE_DEVICES: {cuda_visible}")
+
+ # Detect each GPU
+ for i in range(device_count):
+ try:
+ print(f"\n--- Checking GPU {i} ---")
+
+ # Get device properties
+ props = torch.cuda.get_device_properties(i)
+ gpu_name = props.name
+ gpu_memory = props.total_memory / (1024**3) # GB
+
+ print(f" Name: {gpu_name}")
+ print(f" Memory: {gpu_memory:.1f}GB")
+ print(f" Compute Capability: {props.major}.{props.minor}")
+
+ # Test device accessibility
+ try:
+ # Save current device
+ current_device = torch.cuda.current_device() if torch.cuda.is_available() else None
+
+ # Test the device
+ torch.cuda.set_device(i)
+ test_tensor = torch.tensor([1.0], device=f'cuda:{i}')
+
+ # Successful - add to list
+ gpu_entry = f"GPU {i}: {gpu_name} ({gpu_memory:.1f}GB)"
+ available_gpus.append(gpu_entry)
+ print(f" Status: โ
Accessible")
+ print(f" Added: {gpu_entry}")
+
+ # Cleanup
+ del test_tensor
+
+ # Restore previous device if it existed
+ if current_device is not None:
+ torch.cuda.set_device(current_device)
+
+ except Exception as device_error:
+ print(f" Status: โ Not accessible - {device_error}")
+ # Still add it to the list but mark as problematic
+ gpu_entry = f"GPU {i}: {gpu_name} (โ ๏ธ Issues)"
+ available_gpus.append(gpu_entry)
+ print(f" Added with warning: {gpu_entry}")
+
+ except Exception as e:
+ print(f" โ Error detecting GPU {i}: {e}")
+ # Add as unknown GPU
+ available_gpus.append(f"GPU {i}: Unknown GPU (Error)")
+
+ # Always add CPU option
+ available_gpus.append("CPU Only")
+
+ # Final summary
+ print(f"\n๐ GPU Detection Summary:")
+ print(f" Total devices found: {len(available_gpus)}")
+ for i, gpu in enumerate(available_gpus):
+ print(f" {i+1}. {gpu}")
+
+ print("โ
GPU detection complete\n")
+ return available_gpus
+
+def set_gpu_device(gpu_selection):
+ """Set the CUDA device based on user selection"""
+ if gpu_selection.startswith("GPU"):
+ try:
+ gpu_id = gpu_selection.split(":")[0].split(" ")[1]
+ os.environ["CUDA_VISIBLE_DEVICES"] = gpu_id
+ print(f"Set CUDA_VISIBLE_DEVICES to: {gpu_id}")
+ return gpu_id
+ except (IndexError, ValueError) as e:
+ print(f"โ ๏ธ Error parsing GPU selection '{gpu_selection}': {e}")
+ os.environ["CUDA_VISIBLE_DEVICES"] = ""
+ print("Falling back to CPU mode")
+ return "cpu"
+ else:
+ os.environ["CUDA_VISIBLE_DEVICES"] = ""
+ print("Using CPU mode")
+ return "cpu"
+
+# Get the port early for display in UI
+INSTANCE_PORT = find_available_port(7860) if not os.getenv('SPACE_ID') else 7860
+
+def check_environment():
+ """Check environment and display compatibility status"""
+ print("\n๐ ENVIRONMENT CHECK:")
+ print("=" * 50)
+
+ # Check Python version
+ import sys
+ python_version = sys.version_info
+ print(f"๐ Python: {python_version.major}.{python_version.minor}.{python_version.micro}")
+
+ # Check key dependencies
+    dependencies = {
+        'gradio': '✅ Available',
+        'torch': '✅ Available' if torch.__version__ else '❌ Not Available',
+        'onnxruntime': '✅ Available',
+        'moviepy': '✅ Available' if MOVIEPY_AVAILABLE else '⚠️ Limited functionality',
+        'enhancement_modules': '✅ Available' if ENHANCEMENT_AVAILABLE else '⚠️ Basic mode only'
+    }
+
+ for dep, status in dependencies.items():
+ print(f"๐ฆ {dep}: {status}")
+
+ # Check CUDA availability
+ if torch.cuda.is_available():
+ print(f"๐ CUDA: โ
Available ({torch.cuda.device_count()} device(s))")
+ for i in range(torch.cuda.device_count()):
+ try:
+ print(f" โโ GPU {i}: {torch.cuda.get_device_name(i)}")
+ except:
+ print(f" โโ GPU {i}: Unknown GPU")
+ else:
+ print("๐ CUDA: โ ๏ธ Not available (CPU mode only)")
+
+ print("=" * 50)
+ return True
+
+def create_requirements_file():
+ """Create a requirements.txt file for easy deployment"""
+ requirements = [
+ "gradio>=4.0.0",
+ "torch>=2.0.0",
+ "onnxruntime>=1.15.0",
+ "moviepy>=1.0.3",
+ "opencv-python>=4.8.0",
+ "numpy>=1.24.0",
+ "Pillow>=9.5.0"
+ ]
+
+ req_path = os.path.join(BASE_DIR, "requirements.txt")
+ try:
+ with open(req_path, 'w') as f:
+ f.write('\n'.join(requirements))
+ print(f"๐ Created requirements.txt at: {req_path}")
+ except Exception as e:
+ print(f"โ ๏ธ Could not create requirements.txt: {e}")
+
+def create_multi_instance_scripts():
+ """Create helper scripts for running multiple instances"""
+
+ # Windows batch script
+ batch_script = """@echo off
+echo Starting Face Swap Studio Instance...
+echo Instance will auto-detect available port starting from 7860
+echo.
+python app.py
+pause
+"""
+
+ # Linux/Mac shell script
+ shell_script = """#!/bin/bash
+echo "Starting Face Swap Studio Instance..."
+echo "Instance will auto-detect available port starting from 7860"
+echo ""
+python3 app.py
+"""
+
+ try:
+ # Create Windows script
+ with open(os.path.join(BASE_DIR, "launch_instance.bat"), 'w') as f:
+ f.write(batch_script)
+
+ # Create Linux/Mac script
+ script_path = os.path.join(BASE_DIR, "launch_instance.sh")
+ with open(script_path, 'w') as f:
+ f.write(shell_script)
+
+ # Make shell script executable
+ try:
+ os.chmod(script_path, 0o755)
+ except:
+ pass # Windows doesn't support chmod
+
+ print("๐ Created multi-instance launch scripts:")
+ print(" - launch_instance.bat (Windows)")
+ print(" - launch_instance.sh (Linux/Mac)")
+
+ except Exception as e:
+ print(f"โ ๏ธ Could not create launch scripts: {e}")
+
+# Create launch scripts
+create_multi_instance_scripts()
+
+# Create requirements file for deployment
+create_requirements_file()
+
+# Run environment check
+check_environment()
+
+# Get available GPUs for the dropdown
+AVAILABLE_GPUS = get_available_gpus()
+
+# Print available GPUs to console for debugging
+print("\n" + "="*60)
+print("๐ฅ๏ธ GPU INITIALIZATION FOR DROPDOWN")
+print("="*60)
+print(f"๐ Final GPU List for Dropdown ({len(AVAILABLE_GPUS)} items):")
+for i, gpu in enumerate(AVAILABLE_GPUS):
+ print(f" [{i}] {gpu}")
+print(f"๐ฏ Default selection: {AVAILABLE_GPUS[0] if AVAILABLE_GPUS else 'None'}")
+print(f"๐ List contents: {AVAILABLE_GPUS}")
+print(f"๐ข Total choices for dropdown: {len(AVAILABLE_GPUS)}")
+
+# Verify CUDA one more time
+print(f"\n๐ CUDA Status:")
+print(f" Available: {torch.cuda.is_available()}")
+if torch.cuda.is_available():
+ print(f" Device count: {torch.cuda.device_count()}")
+ for i in range(torch.cuda.device_count()):
+ try:
+ name = torch.cuda.get_device_name(i)
+ print(f" GPU {i}: {name}")
+ except:
+ print(f" GPU {i}: Error getting name")
+print("="*60 + "\n")
+
+# Create a simple GPU test function
+def debug_gpu_choices():
+ """Debug function to show what GPUs are available"""
+ print("๐ Debug GPU Choices Called:")
+ print(f" AVAILABLE_GPUS: {AVAILABLE_GPUS}")
+ print(f" Length: {len(AVAILABLE_GPUS)}")
+ return AVAILABLE_GPUS
+
+def on_gpu_selection_change(selected_gpu):
+ """Handle GPU selection change - for debugging"""
+ print(f"๐ฅ๏ธ GPU Selection Changed: {selected_gpu}")
+ return selected_gpu
+
+def refresh_gpu_list():
+ """Refresh the GPU list and return updated choices"""
+ global AVAILABLE_GPUS
+ print("๐ Refreshing GPU list...")
+ AVAILABLE_GPUS = get_available_gpus()
+
+ return gr.update(
+ choices=AVAILABLE_GPUS,
+ value=AVAILABLE_GPUS[0] if AVAILABLE_GPUS else "CPU Only",
+ interactive=True,
+ allow_custom_value=False
+ )
+
+def debug_gpu_dropdown():
+ """Debug the GPU dropdown and return status"""
+ global AVAILABLE_GPUS
+ print("๐ GPU Debug Button Clicked")
+ print(f" Current AVAILABLE_GPUS: {AVAILABLE_GPUS}")
+ print(f" Length: {len(AVAILABLE_GPUS)}")
+
+ # Force complete refresh of GPU detection
+ print("๐ Force refreshing GPU detection...")
+ AVAILABLE_GPUS = get_available_gpus()
+
+    debug_info = f"✅ DEBUG RESULTS:\n"
+ debug_info += f"โข CUDA Available: {torch.cuda.is_available()}\n"
+ debug_info += f"โข Device Count: {torch.cuda.device_count() if torch.cuda.is_available() else 0}\n"
+ debug_info += f"โข Detected Options: {len(AVAILABLE_GPUS)}\n"
+
+ for i, gpu in enumerate(AVAILABLE_GPUS):
+ debug_info += f" [{i}] {gpu}\n"
+
+ # Create completely new dropdown configuration
+ dropdown_update = gr.update(
+ choices=AVAILABLE_GPUS,
+ value=AVAILABLE_GPUS[0] if AVAILABLE_GPUS else "CPU Only",
+ interactive=True,
+ visible=True
+ )
+
+ print(f"๐ Updated dropdown with {len(AVAILABLE_GPUS)} choices")
+ return dropdown_update, debug_info
+
+# Global variable to track current process for cancellation
+current_process = None
+last_output_path = None
+last_batch_mode = False
+
+def create_batch_zip():
+ """Create a zip file of all output files"""
+ try:
+ if not os.path.exists(INSTANCE_OUTPUT_DIR):
+ print(f"โ Output directory does not exist: {INSTANCE_OUTPUT_DIR}")
+ return None
+
+ files = os.listdir(INSTANCE_OUTPUT_DIR)
+ if not files:
+ print("โ No files found in output directory")
+ return None
+
+ zip_path = os.path.join(INSTANCE_OUTPUT_DIR, f"batch_results_{INSTANCE_ID}.zip")
+
+ # Remove old zip if exists
+ if os.path.exists(zip_path):
+ os.remove(zip_path)
+ print("๐๏ธ Removed old zip file")
+
+ with zipfile.ZipFile(zip_path, 'w', zipfile.ZIP_DEFLATED) as zipf:
+ for file in files:
+ if not file.endswith('.zip'): # Don't zip existing zips
+ file_path = os.path.join(INSTANCE_OUTPUT_DIR, file)
+ if os.path.isfile(file_path):
+ zipf.write(file_path, file)
+ print(f"๐ฆ Added to zip: {file}")
+
+ zip_size = os.path.getsize(zip_path) / (1024 * 1024) # MB
+ print(f"โ
Batch zip created: {zip_path} ({zip_size:.1f}MB)")
+ return zip_path
+
+ except Exception as e:
+ print(f"โ Error creating batch zip: {e}")
+ return None
+
+def get_instance_downloads():
+ """Get download file(s) from the current instance output directory"""
+ try:
+ print(f"๐ Checking downloads in: {INSTANCE_OUTPUT_DIR}") # Debug
+
+ if not os.path.exists(INSTANCE_OUTPUT_DIR):
+ print(f"โ Output directory does not exist: {INSTANCE_OUTPUT_DIR}")
+ return None, "๐ No output directory found for this instance"
+
+ # Get all video and zip files from the instance output directory
+ files = []
+ all_files = os.listdir(INSTANCE_OUTPUT_DIR)
+ print(f"๐ Files in output directory: {all_files}") # Debug
+
+ for file in all_files:
+ if file.lower().endswith(('.mp4', '.avi', '.mov', '.mkv', '.zip')):
+ full_path = os.path.join(INSTANCE_OUTPUT_DIR, file)
+ files.append(full_path)
+ print(f"โ
Found downloadable file: {file}") # Debug
+
+ if not files:
+ print("โ No downloadable files found") # Debug
+ return None, "๐ No completed files found in output directory"
+
+ # If only one file, return it directly
+ if len(files) == 1:
+ file_name = os.path.basename(files[0])
+ file_size = os.path.getsize(files[0]) / (1024 * 1024) # MB
+ # Normalize path for current OS
+ normalized_path = os.path.normpath(files[0])
+ print(f"๐ฅ Returning single file: {normalized_path}") # Debug
+ return normalized_path, f"๐ฅ Ready to download: {file_name} ({file_size:.1f}MB)"
+
+ # If multiple files, create a zip
+ zip_path = os.path.join(INSTANCE_OUTPUT_DIR, f"all_results_{INSTANCE_ID}.zip")
+ print(f"๐ฆ Creating zip file: {zip_path}") # Debug
+
+ # Remove old zip if exists
+ if os.path.exists(zip_path):
+ os.remove(zip_path)
+ print("๐๏ธ Removed old zip file") # Debug
+
+ with zipfile.ZipFile(zip_path, 'w', zipfile.ZIP_DEFLATED) as zipf:
+ for file_path in files:
+ if not file_path.endswith('.zip'): # Don't zip existing zips
+ file_name = os.path.basename(file_path)
+ zipf.write(file_path, file_name)
+ print(f"๐ฆ Added to zip: {file_name}") # Debug
+
+ zip_size = os.path.getsize(zip_path) / (1024 * 1024) # MB
+ print(f"โ
Zip created successfully: {zip_size:.1f}MB") # Debug
+ # Normalize path for current OS
+ normalized_zip_path = os.path.normpath(zip_path)
+ return normalized_zip_path, f"๐ฆ Ready to download: {len(files)} files ({zip_size:.1f}MB total)"
+
+ except Exception as e:
+ error_msg = f"โ Error accessing downloads: {str(e)}"
+ print(error_msg) # Debug
+ return None, error_msg
+
+def handle_download_click():
+ """Handle download button click and return file + status"""
+ download_file_path, status_message = get_instance_downloads()
+ print(f"๐ Download click - File: {download_file_path}, Status: {status_message}")
+
+ if download_file_path and os.path.exists(download_file_path):
+ # Show download component and hide download button temporarily
+ return (
+ download_file_path, # Set file for download
+ status_message, # Update status
+ gr.update(visible=True), # Show download file component
+ gr.update(visible=False) # Hide download button temporarily
+ )
+ else:
+ return (
+ None,
+ status_message,
+ gr.update(visible=False), # Keep download component hidden
+ gr.update(visible=True) # Keep download button visible
+ )
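+
+# The 4-tuple above maps positionally onto outputs=[download_file, download_status,
+# download_file, download_button] in the wiring further down: the first download_file
+# slot receives the file value and the second receives its visibility update.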
+
+def reset_download_ui():
+ """Reset download UI after download completes"""
+ # Called when download file component changes (indicating download started)
+ return (
+ gr.update(visible=False), # Hide download file component
+ gr.update(visible=True), # Show download button again
+ "๐ฅ Download completed! Ready for next download."
+ )
+
+def check_downloads_status():
+ """Check and return download status for the UI"""
+ download_file, status_message = get_instance_downloads()
+ return status_message
+
+def reset_to_defaults():
+ """Reset all settings to their default values"""
+ return (
+ None, # source_image
+ None, # target_video
+ ['face_swapper', 'face_enhancer'], # frame_processor
+ 'top-bottom', # face_analyser_direction
+ 'reference', # face_recognition
+ 'female', # face_analyser_gender
+ 'adult', # face_analyser_age
+ False, # skip_audio
+ True, # keep_fps
+ False, # keep_temp
+ 'wav2lip_gan_96', # lip_syncer_model
+ False, # enable_lip_sync
+ False, # use_folder_mode
+ AVAILABLE_GPUS[0] if AVAILABLE_GPUS else "CPU Only", # gpu_selection
+ "๐ง Configuration reset to defaults. Ready for new processing session!\n", # cli_output
+ "๐ RESET CONFIGURATION" # button text
+ )
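+
+# NOTE: Gradio maps these return values onto the `outputs=` list of action_button.click()
+# positionally, so the tuple above must stay in the same order as that outputs list below.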
+
+def cancel_processing():
+ """Cancel the current processing operation"""
+ global current_process
+ if current_process and current_process.poll() is None:
+ try:
+ current_process.terminate()
+ current_process.wait(timeout=10) # Wait up to 10 seconds
+        except sp.TimeoutExpired:  # subprocess was imported as sp above
+ current_process.kill() # Force kill if it doesn't terminate
+ current_process.wait()
+ except Exception as e:
+ print(f"โ ๏ธ Error cancelling process: {e}")
+ return "โน๏ธ Processing operation cancelled by user request.\n๐ System ready for new configuration.\n"
+ return "โ ๏ธ No active processing session found.\n"
+
+def compress_video_if_needed(input_path, max_size_mb=1000):
+ """Compress video if it exceeds the size limit"""
+ if not MOVIEPY_AVAILABLE:
+ print("โ ๏ธ MoviePy not available - skipping compression")
+ return input_path
+
+ try:
+ file_size_mb = os.path.getsize(input_path) / (1024 * 1024)
+
+ if file_size_mb > max_size_mb:
+ print(f"Video size ({file_size_mb:.1f}MB) exceeds limit. Compressing...")
+ compressed_path = input_path.replace('.mp4', '_compressed.mp4')
+
+ clip = VideoFileClip(input_path)
+ # Reduce quality for compression
+ clip.write_videofile(
+ compressed_path,
+ fps=24, # Reduce FPS
+ bitrate="2000k", # Reduce bitrate
+ audio_codec='aac'
+ )
+ clip.close()
+ return compressed_path
+ except Exception as e:
+ print(f"โ Compression failed: {e}")
+
+ return input_path
+
+def resize_video(file, export, fps):
+ """Resize video with fallback if MoviePy unavailable"""
+ if not MOVIEPY_AVAILABLE:
+ print("โ ๏ธ MoviePy not available - copying video without resizing")
+ shutil.copy(file, export)
+ return
+
+ # Get the Convert directory path
+ convert_dir = os.path.join(BASE_DIR, "Convert")
+
+ # Compress if needed before processing (only for single video mode)
+ if not file.startswith(convert_dir):
+ file = compress_video_if_needed(file)
+
+ try:
+ # Load the video without applying crossfade blending to maintain sharpness in frames
+ clip = VideoFileClip(file)
+ # Write the video at the original resolution and fps, without blending
+ clip.write_videofile(export, fps=fps, audio_codec='aac')
+ clip.close()
+ except Exception as e:
+ print(f"โ Video processing failed: {e}")
+ shutil.copy(file, export)
+
+def extract_audio(video_path, audio_path):
+ """Extract audio from video file"""
+ if not MOVIEPY_AVAILABLE:
+ print("โ ๏ธ MoviePy not available - cannot extract audio")
+ return False
+
+ try:
+ clip = VideoFileClip(video_path)
+ if clip.audio is not None:
+ clip.audio.write_audiofile(audio_path, logger=None)
+ clip.close()
+ return True
+ else:
+ clip.close()
+ return False
+ except Exception as e:
+ print(f"โ Error extracting audio: {e}")
+ return False
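+
+# extract_audio() returns True only when the clip actually carries an audio track;
+# the lip sync paths below disable themselves when it returns False.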
+
+def cleanup_temp_files():
+ """Clean up temporary files from previous runs"""
+ temp_files = [
+ get_instance_file_path('source-image.jpg'),
+ get_instance_file_path('resize-vid.mp4'),
+ get_instance_file_path('target-audio.wav')
+ ]
+
+ cleanup_count = 0
+
+ # Remove specific temp files for this instance
+ for temp_file in temp_files:
+ if os.path.exists(temp_file):
+ try:
+ os.remove(temp_file)
+ print(f"๐งน Cleaned up: {temp_file}")
+ cleanup_count += 1
+ except Exception as e:
+ print(f"โ ๏ธ Could not remove {temp_file}: {e}")
+
+ # Clean up old instance directories (older than 1 hour)
+ temp_base_dir = os.path.join(BASE_DIR, "Temp")
+ if os.path.exists(temp_base_dir):
+ try:
+ for instance_dir in os.listdir(temp_base_dir):
+ instance_path = os.path.join(temp_base_dir, instance_dir)
+ if os.path.isdir(instance_path) and instance_dir.startswith('instance_'):
+ # Check if directory is old enough to clean up
+ if time.time() - os.path.getctime(instance_path) > 3600: # 1 hour
+ try:
+ shutil.rmtree(instance_path)
+ print(f"๐งน Cleaned up old instance: {instance_path}")
+ cleanup_count += 1
+ except Exception as e:
+ print(f"โ ๏ธ Could not remove {instance_path}: {e}")
+ except Exception as e:
+ print(f"โ ๏ธ Could not access temp directory: {e}")
+
+ if cleanup_count > 0:
+ print(f"โ
Startup cleanup completed: {cleanup_count} items removed")
+ else:
+ print("โจ Startup cleanup: No temp files found to remove")
+
+# Run cleanup on startup
+cleanup_temp_files()
+
+os.makedirs(INSTANCE_OUTPUT_DIR, exist_ok=True)
+
+def run_single_video(source_image, target_video, frame_processor, face_analyser_direction,
+ face_recognition, face_analyser_gender, face_analyser_age, skip_audio,
+ keep_fps, keep_temp, lip_syncer_model, enable_lip_sync, gpu_selection):
+ """Process a single uploaded video"""
+ global last_output_path, last_batch_mode
+ last_batch_mode = False
+
+ # Set GPU device
+ set_gpu_device(gpu_selection)
+
+ print(f'๐ฌ Processing target video: {target_video}')
+
+ # Saving the uploaded image and video with instance-specific paths
+ new_source = get_instance_file_path('source-image.jpg')
+ new_target = get_instance_file_path('resize-vid.mp4')
+
+ # Copy the files locally
+ shutil.copy(source_image, new_source)
+ resize_video(file=target_video, export=new_target, fps=30)
+
+    if not os.path.exists(new_source):
+        yield "❌ Source image file does not exist", ""
+        return
+    if not os.path.exists(new_target):
+        yield "❌ Target video file does not exist", ""
+        return
+
+ # Extract the original filenames of the source image and target video
+ source_image_name = os.path.splitext(os.path.basename(source_image))[0]
+ target_video_name = os.path.splitext(os.path.basename(target_video))[0]
+
+ selected_frame_processors = ' '.join(frame_processor)
+
+ # Handle audio extraction for lip sync from the TARGET video itself
+ audio_source_path = None
+ if enable_lip_sync:
+ audio_source_path = get_instance_file_path('target-audio.wav')
+ if not extract_audio(new_target, audio_source_path):
+ print(f"โ ๏ธ Warning: Could not extract audio from {target_video}. Skipping lip sync.")
+ enable_lip_sync = False
+
+ # Add lip sync suffix to filename if enabled
+ suffix = "_lipsynced" if enable_lip_sync else ""
+ output_filename = f"{source_image_name}_{target_video_name}{suffix}.mp4"
+ output_path = os.path.join(INSTANCE_OUTPUT_DIR, output_filename)
+
+ os.makedirs(INSTANCE_OUTPUT_DIR, exist_ok=True)
+
+ # Determine execution provider based on GPU selection
+ if gpu_selection.startswith("GPU"):
+ execution_provider = "cuda"
+ else:
+ execution_provider = "cpu"
+
+ # Construct command as a single string and use shlex.split to handle it
+ cmd = (
+ f"python run.py --execution-providers {execution_provider} "
+ f"--execution-thread-count 8 " # Changed from 16 to 8
+ f"--reference-face-distance 1.5 "
+ f"-s {shlex.quote(new_source)} -t {shlex.quote(new_target)} -o {shlex.quote(output_path)} "
+ f"--frame-processors {selected_frame_processors} "
+ f"--face-analyser-direction {face_analyser_direction} "
+ )
+
+ # Add lip sync parameters if enabled
+ if enable_lip_sync and audio_source_path:
+ cmd += f"--source-paths {shlex.quote(audio_source_path)} "
+ cmd += f"--lip-syncer-model {lip_syncer_model} "
+ # Ensure lip_syncer is in frame processors
+ if 'lip_syncer' not in frame_processor:
+ frame_processor_with_lip = list(frame_processor) + ['lip_syncer']
+ cmd = cmd.replace(f"--frame-processors {selected_frame_processors}",
+ f"--frame-processors {' '.join(frame_processor_with_lip)}")
+
+ if face_recognition != 'none':
+ cmd += f"--face-recognition {face_recognition} "
+ if face_analyser_gender != 'none':
+ cmd += f"--face-analyser-gender {face_analyser_gender} "
+
+ # Add the face_analyser_age parameter
+ cmd += f"--face-analyser-age {face_analyser_age} "
+
+ if skip_audio and not enable_lip_sync: # Don't skip audio if lip syncing
+ cmd += "--skip-audio "
+ if keep_fps:
+ cmd += "--keep-fps "
+ if keep_temp:
+ cmd += "--keep-temp "
+
+    cli_output = ""  # Defined before the try so the except handler below can reference it
+    try:
+ print("Started command...", cmd)
+ start_time = time.time()
+
+ # Use shlex.split(cmd) to safely handle spaces in paths
+ global current_process
+ current_process = sp.Popen(shlex.split(cmd), stdout=sp.PIPE, stderr=sp.STDOUT, text=True, bufsize=1, universal_newlines=True)
+ process = current_process
+
+ output_lines = []
+ cli_output = ""
+
+ while True:
+ output = process.stdout.readline()
+ if output == '' and process.poll() is not None:
+ break
+ if output:
+ line = output.strip()
+ print(line)
+ output_lines.append(line)
+
+ # Build up CLI output for display
+ cli_output += line + "\n"
+
+ # Keep only last 50 lines to prevent memory issues
+ if len(output_lines) > 50:
+ output_lines = output_lines[-50:]
+ cli_output = "\n".join(output_lines[-50:]) + "\n"
+
+ # Yield intermediate results to update the interface
+ yield None, cli_output
+
+ rc = process.poll()
+ end_time = time.time()
+ execution_time = end_time - start_time
+
+ final_output = cli_output + f"\n\nCommand execution time: {execution_time:.2f} seconds"
+
+        if rc != 0:
+            yield f"An error occurred during command execution. (exit code {rc})", final_output
+            return
+
+ # Clean up to free GPU memory
+ del process
+ if torch.cuda.is_available():
+ torch.cuda.empty_cache()
+ gc.collect()
+
+ # Clean up temporary audio file
+ if audio_source_path and os.path.exists(audio_source_path):
+ os.remove(audio_source_path)
+
+        # Set the last output path for downloading
+        last_output_path = output_path
+        yield output_path, final_output
+
+ except Exception as e:
+ # Clean up temporary audio file in case of error
+ if audio_source_path and os.path.exists(audio_source_path):
+ os.remove(audio_source_path)
+ return f"An error occurred: {str(e)}", cli_output
+
+def run_folder_batch(source_image, frame_processor, face_analyser_direction, face_recognition,
+ face_analyser_gender, skip_audio, keep_fps, keep_temp, lip_syncer_model, enable_lip_sync, gpu_selection):
+ """Process all videos in Convert folder"""
+ global last_output_path, last_batch_mode
+ last_batch_mode = True
+
+ # Set GPU device
+ set_gpu_device(gpu_selection)
+
+ video_directory = os.path.join(BASE_DIR, "Convert")
+
+ # Create Convert directory if it doesn't exist
+ os.makedirs(video_directory, exist_ok=True)
+
+ video_files = [os.path.join(video_directory, f) for f in os.listdir(video_directory) if f.lower().endswith(('.mp4', '.avi', '.mov', '.mkv'))]
+
+ if not video_files:
+ yield None, f"๐ No video files found in the directory: {video_directory}"
+ return
+
+ new_source = get_instance_file_path('source-image.jpg')
+ shutil.copy(source_image, new_source)
+
+ if not os.path.exists(new_source):
+ yield None, "โ Source image file does not exist"
+ return
+
+ # Extract the original filename of the source image
+ source_image_name = os.path.splitext(os.path.basename(source_image))[0]
+
+ cli_output = f"๐ Found {len(video_files)} videos to process in {video_directory}\n"
+ cli_output += f"๐ฏ Source image: {source_image_name}\n"
+ cli_output += f"๐ฅ๏ธ GPU Selection: {gpu_selection}\n"
+ cli_output += f"๐ Instance Output: {INSTANCE_OUTPUT_DIR}\n\n"
+ yield None, cli_output
+
+ successful_videos = 0
+ failed_videos = 0
+
+ for i, target_video in enumerate(video_files, 1):
+ current_video_output = f"[{i}/{len(video_files)}] ๐ฌ Processing: {os.path.basename(target_video)}\n"
+ cli_output += current_video_output
+ print(f"[{i}/{len(video_files)}] Processing: {os.path.basename(target_video)}") # Console output
+ yield None, cli_output
+
+ new_target = get_instance_file_path('resize-vid.mp4')
+
+ try:
+ resize_video(file=target_video, export=new_target, fps=30)
+ except Exception as e:
+ error_msg = f"โ Error resizing video {target_video}: {e}\n"
+ cli_output += error_msg
+ print(error_msg.strip()) # Console output
+ failed_videos += 1
+ yield None, cli_output
+ continue # Proceed to next video
+
+ if not os.path.exists(new_target):
+ error_msg = f"โ Target video file {target_video} does not exist after resizing.\n"
+ cli_output += error_msg
+ print(error_msg.strip()) # Console output
+ failed_videos += 1
+ yield None, cli_output
+ continue # Proceed to next video
+
+ target_video_name = os.path.splitext(os.path.basename(target_video))[0]
+
+ # Handle audio extraction for lip sync from the TARGET video itself
+ audio_source_path = None
+ if enable_lip_sync:
+ audio_source_path = get_instance_file_path('target-audio.wav')
+ if not extract_audio(new_target, audio_source_path):
+ warning_msg = f"โ ๏ธ Warning: Could not extract audio from {target_video}. Skipping lip sync.\n"
+ cli_output += warning_msg
+ print(warning_msg.strip()) # Console output
+ yield None, cli_output
+ enable_lip_sync = False
+
+ # Add lip sync suffix to filename if enabled
+ suffix = "_lipsynced" if enable_lip_sync else ""
+ output_filename = f"{source_image_name}_{target_video_name}{suffix}.mp4"
+ output_path = os.path.join(INSTANCE_OUTPUT_DIR, output_filename)
+
+ os.makedirs(INSTANCE_OUTPUT_DIR, exist_ok=True)
+
+ # Determine execution provider based on GPU selection
+ if gpu_selection.startswith("GPU"):
+ execution_provider = "cuda"
+ else:
+ execution_provider = "cpu"
+
+ # Construct command as a single string and use shlex.split to handle it
+ cmd = (
+ f"python run.py --execution-providers {execution_provider} "
+ f"--execution-thread-count 8 " # Changed from 16 to 8
+ f"--reference-face-distance 1.5 "
+ f"-s {shlex.quote(new_source)} -t {shlex.quote(new_target)} -o {shlex.quote(output_path)} "
+ f"--frame-processors {' '.join(frame_processor)} "
+ f"--face-analyser-direction {face_analyser_direction} "
+ )
+
+ # Add lip sync parameters if enabled
+ if enable_lip_sync and audio_source_path:
+ cmd += f"--source-paths {shlex.quote(audio_source_path)} "
+ cmd += f"--lip-syncer-model {lip_syncer_model} "
+ # Ensure lip_syncer is in frame processors
+ if 'lip_syncer' not in frame_processor:
+ frame_processor_with_lip = list(frame_processor) + ['lip_syncer']
+ else:
+ frame_processor_with_lip = frame_processor
+ # Update the command with the new frame processors
+ cmd = cmd.replace(f"--frame-processors {' '.join(frame_processor)}",
+ f"--frame-processors {' '.join(frame_processor_with_lip)}")
+
+ if face_recognition != 'none':
+ cmd += f"--face-recognition {face_recognition} "
+ if face_analyser_gender != 'none':
+ cmd += f"--face-analyser-gender {face_analyser_gender} "
+
+ if skip_audio and not enable_lip_sync: # Don't skip audio if lip syncing
+ cmd += "--skip-audio "
+ if keep_fps:
+ cmd += "--keep-fps "
+ if keep_temp:
+ cmd += "--keep-temp "
+
+ try:
+ cmd_msg = f"Starting processing with command...\n"
+ cli_output += cmd_msg
+ print("Starting processing...") # Console output
+ yield None, cli_output
+
+ start_time = time.time()
+ # Use shlex.split(cmd) to safely handle spaces in paths
+ global current_process
+ current_process = sp.Popen(shlex.split(cmd), stdout=sp.PIPE, stderr=sp.STDOUT, text=True, bufsize=1, universal_newlines=True)
+ process = current_process
+
+ line_count = 0
+ last_update_time = time.time()
+
+ while True:
+ output = process.stdout.readline()
+ if output == '' and process.poll() is not None:
+ break
+ if output:
+ line = output.strip()
+ print(line) # Always show in console
+
+ # Only update web interface every 10 lines or every 2 seconds to prevent slowdown
+ line_count += 1
+ current_time = time.time()
+
+ if line_count % 10 == 0 or current_time - last_update_time > 2:
+ cli_output += line + "\n"
+ # Keep only last 50 lines to prevent memory issues
+ lines = cli_output.split('\n')
+ if len(lines) > 50:
+ cli_output = '\n'.join(lines[-50:])
+ yield None, cli_output
+ last_update_time = current_time
+
+ rc = process.poll()
+ end_time = time.time()
+ execution_time = end_time - start_time
+
+ if rc != 0:
+ error_msg = f"An error occurred during command execution for {target_video}.\n"
+ cli_output += error_msg
+ print(error_msg.strip()) # Console output
+ failed_videos += 1
+ yield None, cli_output
+ # Ensure the process is terminated
+ try:
+ process.kill()
+ process.wait()
+ except:
+ pass
+ continue # Proceed to next video
+ else:
+ success_msg = f"Processing completed for {target_video} in {execution_time:.2f} seconds.\n\n"
+ cli_output += success_msg
+ print(f"Processing completed for {os.path.basename(target_video)} in {execution_time:.2f} seconds.") # Console output
+ successful_videos += 1
+ yield None, cli_output
+
+ # Clean up to free GPU memory
+ del process
+ if torch.cuda.is_available():
+ torch.cuda.empty_cache()
+ gc.collect()
+
+ except Exception as e:
+ error_msg = f"An error occurred while processing {target_video}: {str(e)}\n"
+ cli_output += error_msg
+ print(error_msg.strip()) # Console output
+ failed_videos += 1
+ yield None, cli_output
+ continue # Proceed to next video
+
+ # Clean up temporary audio file
+ if audio_source_path and os.path.exists(audio_source_path):
+ try:
+ os.remove(audio_source_path)
+ except Exception as e:
+ print(f"โ ๏ธ Could not remove audio file: {e}")
+
+ final_msg = f"\n=== BATCH PROCESSING COMPLETE ===\n"
+    final_msg += f"✅ Successfully processed: {successful_videos} videos\n"
+ final_msg += f"โ Failed: {failed_videos} videos\n"
+ final_msg += f"๐ Total videos: {len(video_files)}\n"
+ final_msg += f"๐๏ธ Check the output folder for results: {INSTANCE_OUTPUT_DIR}"
+ cli_output += final_msg
+ print(f"=== BATCH PROCESSING COMPLETE === Successfully processed: {successful_videos}/{len(video_files)} videos") # Console output
+
+ # Set up for batch download
+ if successful_videos > 0:
+ last_output_path = create_batch_zip()
+ if last_output_path:
+ cli_output += f"\n๐ฆ Batch zip created: {os.path.basename(last_output_path)}"
+ else:
+ cli_output += f"\nโ ๏ธ Warning: Could not create batch zip file"
+
+ yield None, cli_output
+
+def handle_button_action(button_text, source_image, target_video, frame_processor, face_analyser_direction,
+ face_recognition, face_analyser_gender, face_analyser_age, skip_audio,
+ keep_fps, keep_temp, lip_syncer_model, enable_lip_sync, use_folder_mode, gpu_selection):
+ """Handle the multi-purpose button actions"""
+ global last_output_path, last_batch_mode
+
+ if "RESET" in button_text:
+ # Reset to defaults
+ return reset_to_defaults()
+ elif "CANCEL" in button_text:
+ # Cancel processing
+ cancel_msg = cancel_processing()
+ return (
+ source_image, target_video, frame_processor, face_analyser_direction, face_recognition,
+ face_analyser_gender, face_analyser_age, skip_audio, keep_fps, keep_temp,
+ lip_syncer_model, enable_lip_sync, use_folder_mode, gpu_selection,
+ cancel_msg, "๐ RESET CONFIGURATION"
+ )
+ elif "DOWNLOAD" in button_text:
+ # Download results
+ if last_batch_mode and last_output_path:
+ # Return the zip file for batch download
+ return (
+ source_image, target_video, frame_processor, face_analyser_direction, face_recognition,
+ face_analyser_gender, face_analyser_age, skip_audio, keep_fps, keep_temp,
+ lip_syncer_model, enable_lip_sync, use_folder_mode, gpu_selection,
+ "๐ฅ Batch processing complete! Click download to get your zipped results!", "๐ฅ DOWNLOAD BATCH RESULTS"
+ )
+ elif not last_batch_mode and last_output_path:
+ # Return the single file for download
+ return (
+ source_image, target_video, frame_processor, face_analyser_direction, face_recognition,
+ face_analyser_gender, face_analyser_age, skip_audio, keep_fps, keep_temp,
+ lip_syncer_model, enable_lip_sync, use_folder_mode, gpu_selection,
+ "๐ฅ Processing complete! Click download to get your enhanced video!", "๐ฅ DOWNLOAD VIDEO"
+ )
+
+ # Default return (shouldn't reach here normally)
+ return (
+ source_image, target_video, frame_processor, face_analyser_direction, face_recognition,
+ face_analyser_gender, face_analyser_age, skip_audio, keep_fps, keep_temp,
+ lip_syncer_model, enable_lip_sync, use_folder_mode, gpu_selection,
+ "", "๐ RESET CONFIGURATION"
+ )
+
+def run_processing(source_image, target_video, frame_processor, face_analyser_direction, face_recognition,
+ face_analyser_gender, face_analyser_age, skip_audio, keep_fps, keep_temp,
+ lip_syncer_model, enable_lip_sync, use_folder_mode, gpu_selection):
+ """Main processing function"""
+
+ if use_folder_mode:
+ # Folder batch mode
+ for _, cli_output in run_folder_batch(
+ source_image, frame_processor, face_analyser_direction, face_recognition,
+ face_analyser_gender, skip_audio, keep_fps, keep_temp, lip_syncer_model, enable_lip_sync, gpu_selection
+ ):
+ yield cli_output, "โน๏ธ CANCEL PROCESSING"
+ # Processing complete
+        yield cli_output + "\n\n✅ Batch processing completed successfully!", "๐ฅ DOWNLOAD BATCH RESULTS"
+ else:
+ # Single video mode
+ output_path = None
+ for video_result, cli_output in run_single_video(
+ source_image, target_video, frame_processor, face_analyser_direction, face_recognition,
+ face_analyser_gender, face_analyser_age, skip_audio, keep_fps, keep_temp,
+ lip_syncer_model, enable_lip_sync, gpu_selection
+ ):
+ if video_result and not video_result.startswith("An error occurred"):
+ output_path = video_result
+ yield cli_output, "โน๏ธ CANCEL PROCESSING"
+
+ # Processing complete
+ if output_path and os.path.exists(output_path):
+ yield cli_output + "\n\n๐ Video processing completed successfully!", "๐ฅ DOWNLOAD VIDEO"
+ else:
+        yield cli_output + "\n\n⚠️ Processing finished, but no output file was found.", "๐ RESET CONFIGURATION"
+
+def get_download_file():
+ """Get the appropriate file for download"""
+ global last_output_path, last_batch_mode
+ if last_output_path and os.path.exists(last_output_path):
+ return last_output_path
+ return None
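+
+# get_download_file() is kept as a convenience helper; the download flow wired up
+# below actually goes through handle_download_click() / get_instance_downloads().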
+
+def get_theme() -> gr.Theme:
+ return gr.themes.Monochrome(
+ primary_hue=gr.themes.colors.teal,
+ secondary_hue=gr.themes.colors.gray,
+ font=gr.themes.GoogleFont('Inter')
+ ).set(
+ background_fill_primary="#1f1f1f",
+ background_fill_secondary="#2d2d2d",
+ block_label_text_size="*text_sm",
+ block_title_text_size="*text_md"
+ )
+
+def toggle_lip_sync_visibility(enable_lip_sync):
+ """Toggle visibility of lip sync related components"""
+ return {
+ lip_syncer_model_dropdown: gr.update(visible=enable_lip_sync)
+ }
+
+def toggle_folder_mode(use_folder_mode):
+ """Toggle visibility of target video upload based on folder mode"""
+ return {
+ target_video: gr.update(visible=not use_folder_mode),
+ face_analyser_age_dropdown: gr.update(visible=not use_folder_mode)
+ }
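+
+# Both toggle handlers return {component: gr.update(...)} dicts; Gradio matches the
+# dict keys against the components listed in `outputs=` when the .change() events
+# are wired up below.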
+
+with gr.Blocks(theme=get_theme(), css="""
+ .gradio-container {
+ max-width: 1800px !important;
+ margin: 0 auto !important;
+ background: linear-gradient(135deg, #0f0f23 0%, #1a1a2e 50%, #16213e 100%);
+ min-height: 100vh;
+ padding: 0.3rem !important;
+ }
+ .main-header {
+ text-align: center;
+ margin-bottom: 0.6rem;
+ padding: 0.8rem;
+ background: linear-gradient(135deg, #667eea 0%, #764ba2 100%);
+ border-radius: 12px;
+ color: white;
+ box-shadow: 0 10px 20px rgba(102, 126, 234, 0.3);
+ position: relative;
+ overflow: hidden;
+ }
+ .main-header h1 {
+ font-size: 1.8rem !important;
+ margin: 0 !important;
+ }
+ .main-header p {
+ font-size: 0.9rem !important;
+ margin: 0.2rem 0 0 0 !important;
+ }
+ .section-header {
+ font-weight: 600;
+ font-size: 0.95rem;
+ margin-bottom: 0.6rem;
+ color: #667eea;
+ background: linear-gradient(90deg, #667eea, #764ba2);
+ -webkit-background-clip: text;
+ -webkit-text-fill-color: transparent;
+ background-clip: text;
+ border-bottom: 2px solid #667eea;
+ padding-bottom: 0.3rem;
+ position: relative;
+ }
+ .section-header::after {
+ content: '';
+ position: absolute;
+ bottom: -2px;
+ left: 0;
+ width: 30px;
+ height: 2px;
+ background: linear-gradient(90deg, #667eea, #764ba2);
+ border-radius: 2px;
+ }
+ .control-panel {
+ background: linear-gradient(135deg, rgba(102, 126, 234, 0.1) 0%, rgba(118, 75, 162, 0.1) 100%);
+ border-radius: 12px;
+ padding: 0.8rem;
+ margin-bottom: 0.5rem;
+ border: 2px solid rgba(102, 126, 234, 0.2);
+ box-shadow: 0 8px 20px rgba(102, 126, 234, 0.1);
+ backdrop-filter: blur(10px);
+ position: relative;
+ overflow: hidden;
+ height: fit-content;
+ }
+ .control-panel::before {
+ content: '';
+ position: absolute;
+ top: 0;
+ left: 0;
+ right: 0;
+ height: 1px;
+ background: linear-gradient(90deg, transparent, rgba(255,255,255,0.3), transparent);
+ }
+ .button-row {
+ display: flex;
+ gap: 1rem;
+ justify-content: center;
+ margin: 0.8rem 0;
+ }
+ #action-buttons {
+ margin-top: 0.5rem !important;
+ }
+ #action-buttons .gr-button {
+ width: 100% !important;
+ margin: 0.2rem 0 !important;
+ }
+ #download-btn {
+ background: linear-gradient(135deg, #10b981, #059669) !important;
+ border: none !important;
+ color: white !important;
+ font-weight: 600 !important;
+ transition: all 0.3s ease !important;
+ }
+ #download-btn:hover {
+ background: linear-gradient(135deg, #059669, #047857) !important;
+ transform: translateY(-1px) !important;
+ box-shadow: 0 4px 12px rgba(16, 185, 129, 0.4) !important;
+ }
+
+ /* Download status styling */
+ .download-status {
+ font-size: 0.8rem !important;
+ background: rgba(16, 185, 129, 0.1) !important;
+ border: 1px solid rgba(16, 185, 129, 0.3) !important;
+ border-radius: 6px !important;
+ margin-top: 0.3rem !important;
+ }
+
+ /* Download file component styling when visible */
+ .download-component {
+ background: rgba(16, 185, 129, 0.1) !important;
+ border: 2px solid rgba(16, 185, 129, 0.4) !important;
+ border-radius: 8px !important;
+ padding: 0.5rem !important;
+ margin-top: 0.3rem !important;
+ }
+
+ .processing-log {
+ margin-top: 0.5rem;
+ background: linear-gradient(135deg, rgba(15, 15, 35, 0.9) 0%, rgba(26, 26, 46, 0.9) 100%);
+ border-radius: 12px;
+ padding: 0.8rem;
+ border: 2px solid rgba(102, 126, 234, 0.3);
+ box-shadow: inset 0 2px 10px rgba(0,0,0,0.3), 0 8px 20px rgba(102, 126, 234, 0.1);
+ height: fit-content;
+ }
+ /* Enhanced form styling */
+ .gr-form {
+ background: transparent !important;
+ }
+ .gr-box {
+ border-radius: 8px !important;
+ border: 1px solid rgba(102, 126, 234, 0.2) !important;
+ background: rgba(255, 255, 255, 0.02) !important;
+ margin: 0.2rem 0 !important;
+ padding: 0.3rem !important;
+ }
+ .gr-button {
+ border-radius: 10px !important;
+ font-weight: 600 !important;
+ text-transform: uppercase !important;
+ letter-spacing: 0.5px !important;
+ transition: all 0.3s ease !important;
+ box-shadow: 0 4px 15px rgba(0,0,0,0.3) !important;
+ padding: 0.5rem 1.2rem !important;
+ font-size: 0.85rem !important;
+ }
+ .gr-button:hover {
+ transform: translateY(-1px) !important;
+ box-shadow: 0 6px 20px rgba(0,0,0,0.4) !important;
+ }
+ .gr-button-primary {
+ background: linear-gradient(135deg, #667eea 0%, #764ba2 100%) !important;
+ border: none !important;
+ }
+ .gr-button-secondary {
+ background: linear-gradient(135deg, #434343 0%, #000000 100%) !important;
+ border: 1px solid rgba(102, 126, 234, 0.3) !important;
+ color: white !important;
+ }
+ /* Configuration grid styling */
+ .config-section {
+ background: rgba(255, 255, 255, 0.03);
+ border-radius: 10px;
+ padding: 0.6rem;
+ border: 1px solid rgba(102, 126, 234, 0.15);
+ margin: 0.2rem;
+ }
+ .config-section h4 {
+ font-size: 0.85rem !important;
+ margin: 0 0 0.4rem 0 !important;
+ }
+ /* Compact textbox styling */
+ .gr-textbox {
+ background: rgba(15, 15, 35, 0.8) !important;
+ border: 1px solid rgba(102, 126, 234, 0.3) !important;
+ border-radius: 8px !important;
+ color: #e2e8f0 !important;
+ font-size: 0.8rem !important;
+ }
+ /* Compact file upload styling */
+ .gr-file {
+ border: 2px dashed rgba(102, 126, 234, 0.4) !important;
+ border-radius: 10px !important;
+ background: rgba(102, 126, 234, 0.05) !important;
+ transition: all 0.3s ease !important;
+ min-height: 85px !important;
+ max-height: 90px !important;
+ }
+ .gr-file:hover {
+ border-color: rgba(102, 126, 234, 0.6) !important;
+ background: rgba(102, 126, 234, 0.1) !important;
+ }
+ .gr-file .gr-file-label {
+ font-size: 0.75rem !important;
+ line-height: 1.2 !important;
+ }
+ .gr-file .upload-container {
+ padding: 0.8rem !important;
+ }
+ /* Simplified dropdown styling - remove complex CSS that might interfere */
+ .gr-dropdown {
+ background: rgba(255, 255, 255, 0.05) !important;
+ border: 1px solid rgba(102, 126, 234, 0.3) !important;
+ border-radius: 6px !important;
+ font-size: 0.8rem !important;
+ }
+
+ /* Let Gradio handle dropdown positioning naturally */
+ #main-gpu-dropdown {
+ position: relative !important;
+ }
+
+ /* GPU debug styling */
+ .gpu-debug {
+ font-size: 0.75rem !important;
+ background: rgba(255, 165, 0, 0.1) !important;
+ border: 1px solid rgba(255, 165, 0, 0.3) !important;
+ border-radius: 6px !important;
+ margin-top: 0.3rem !important;
+ }
+ /* Compact checkbox styling */
+ .gr-checkbox {
+ font-size: 0.8rem !important;
+ }
+ /* Make everything more compact */
+ .gr-group {
+ gap: 0.3rem !important;
+ }
+ .gr-column {
+ gap: 0.3rem !important;
+ }
+ .gr-row {
+ gap: 0.5rem !important;
+ }
+ /* Text size adjustments */
+ .gr-label {
+ font-size: 0.8rem !important;
+ }
+ .gr-info {
+ font-size: 0.7rem !important;
+ }
+ /* Ultra compact mode for smaller screens */
+ @media (max-height: 900px) {
+ .processing-log .gr-textbox {
+ max-height: 200px !important;
+ }
+ .main-header {
+ padding: 0.6rem !important;
+ margin-bottom: 0.4rem !important;
+ }
+ .control-panel {
+ padding: 0.6rem !important;
+ margin-bottom: 0.3rem !important;
+ }
+ .gr-file {
+ min-height: 70px !important;
+ max-height: 75px !important;
+ }
+ }
+ @media (max-height: 800px) {
+ .processing-log .gr-textbox {
+ max-height: 150px !important;
+ }
+ .main-header h1 {
+ font-size: 1.5rem !important;
+ }
+ .gr-file {
+ min-height: 65px !important;
+ max-height: 70px !important;
+ }
+ }
+ @media (max-height: 700px) {
+ .gr-file {
+ min-height: 60px !important;
+ max-height: 65px !important;
+ }
+ .control-panel {
+ padding: 0.4rem !important;
+ }
+ }
+""") as ui:
+
+ with gr.Column(elem_classes="main-header"):
+ gr.Markdown(f"""
+ # ๐ญ Advanced Face Swap Studio
+ **Professional-grade AI face swapping technology**
+ *Instance: {INSTANCE_ID} | Port: {INSTANCE_PORT}*
+ """, elem_classes="main-header")
+
+ with gr.Tabs():
+ # Main processing tab
+ with gr.Tab("๐ญ Face Swap", id="main"):
+ with gr.Row():
+ # Left Column - Source Input + Action Buttons
+ with gr.Column(scale=2):
+ with gr.Group(elem_classes="control-panel"):
+ gr.HTML('')
+
+                    gr.HTML('🎯 Face Source')
+ source_image = gr.File(
+ label="Upload Source Image",
+ file_types=["image"],
+ file_count="single",
+ height=85
+ )
+                    gr.HTML('Clear image with the face to use')
+
+ # Target video upload (visible by default)
+                    gr.HTML('🎬 Target Video')
+ target_video = gr.File(
+ label="Upload Target Video",
+ file_types=["video"],
+ file_count="single",
+ visible=True,
+ height=85
+ )
+                    gr.HTML('Video where faces will be replaced')
+
+ # Action Buttons moved to left column
+ with gr.Group(elem_classes="control-panel", elem_id="action-buttons"):
+ gr.HTML('')
+ with gr.Row():
+ start_button = gr.Button(
+ "๐ LAUNCH PROCESSING",
+ variant="primary",
+ size="lg",
+ elem_id="start-btn"
+ )
+ with gr.Row():
+ action_button = gr.Button(
+ "๐ RESET CONFIGURATION",
+ variant="secondary",
+ size="lg",
+ elem_id="action-btn"
+ )
+ with gr.Row():
+ download_button = gr.Button(
+ "๐ฅ DOWNLOAD RESULTS",
+ variant="secondary",
+ size="lg",
+ elem_id="download-btn",
+ visible=True
+ )
+ with gr.Row():
+ download_status = gr.Textbox(
+ label="๐ฅ Download Status",
+ value="startup_status", # Show startup status
+ interactive=False,
+ visible=True,
+ lines=2,
+ elem_classes="download-status"
+ )
+
+ # Download file component - now visible when needed
+ download_file = gr.File(
+ label="๐ฅ Click to Download",
+ visible=False,
+ file_count="single",
+ file_types=None, # Allow all file types
+ interactive=False,
+ elem_classes="download-component"
+ )
+
+ # Middle Column - Core Processing Configuration
+ with gr.Column(scale=3):
+ with gr.Group(elem_classes="control-panel"):
+ gr.HTML('')
+
+ # Main configuration in a clean grid layout
+ with gr.Row():
+ with gr.Column(scale=1, elem_classes="config-section"):
+                            gr.HTML('🎭 Frame Processing')
+ # Get available frame processors based on what's installed
+ available_processors = ['face_swapper']
+ if ENHANCEMENT_AVAILABLE:
+ available_processors.extend(['face_enhancer', 'frame_enhancer'])
+
+ frame_processor_checkbox = gr.CheckboxGroup(
+ choices=available_processors,
+ label='Active Processors',
+ value=['face_swapper'] + (['face_enhancer'] if ENHANCEMENT_AVAILABLE else []),
+ visible=True,
+ info="โ ๏ธ frame_enhancer increases processing time" if ENHANCEMENT_AVAILABLE else "๐ง Basic mode - enhancement modules not available"
+ )
+
+ # Lip sync controls
+ enable_lip_sync = gr.Checkbox(
+ label="๐ต Enable Lip Sync",
+ value=False,
+ info="โ ๏ธ Beta feature"
+ )
+
+ lip_syncer_model_dropdown = gr.Dropdown(
+ label='Lip Sync Model',
+ choices=['wav2lip_96', 'wav2lip_gan_96'],
+ value='wav2lip_gan_96',
+ visible=False,
+ scale=1
+ )
+
+ with gr.Column(scale=1, elem_classes="config-section"):
+                            gr.HTML('๐ Face Analysis')
+
+ face_recognition_dropdown = gr.Dropdown(
+ label='Recognition Mode',
+ choices=['none', 'reference', 'many'],
+ value='reference',
+ visible=True
+ )
+
+ face_analyser_direction_dropdown = gr.Dropdown(
+ label='Analysis Direction',
+ choices=['left-right', 'right-left', 'top-bottom', 'bottom-top', 'small-large', 'large-small'],
+ value='top-bottom',
+ visible=True
+ )
+
+ face_analyser_gender_dropdown = gr.Dropdown(
+ label='Target Gender',
+ choices=['none', 'male', 'female'],
+ value='female',
+ visible=True
+ )
+
+ face_analyser_age_dropdown = gr.Dropdown(
+ label='Target Age Group',
+ choices=['child', 'teen', 'adult', 'senior'],
+ value='adult',
+ visible=True
+ )
+
+ # Hidden option
+ keep_temp = gr.Checkbox(
+ label="๐๏ธ Keep Temp Files",
+ value=False,
+ visible=False
+ )
+
+ # Right Column - Processing Log + Processing Options
+ with gr.Column(scale=3):
+ with gr.Group(elem_classes="processing-log"):
+ gr.HTML('')
+
+ cli_output = gr.Textbox(
+ label="๐ Live Processing Output",
+ lines=12,
+ max_lines=15,
+ interactive=False,
+ show_copy_button=True,
+ container=True,
+ placeholder=f"๐ง System ready. Configure settings and click 'Launch Processing'...\n\nโก Real-time progress updates\n๐ Performance metrics\n๐ฏ Processing logs\nโจ Completion notifications\n\n๐ Instance: {INSTANCE_ID}\n๐ Output: {INSTANCE_OUTPUT_DIR}\n๐ฅ Download button scans output folder automatically",
+ elem_id="processing-monitor"
+ )
+
+ # Processing options moved to right column
+ with gr.Group(elem_classes="control-panel"):
+ gr.HTML('')
+ with gr.Row():
+ with gr.Column():
+ # Simple GPU selection - mirroring working test
+ gpu_selection_dropdown = gr.Dropdown(
+ label="๐ฅ๏ธ Compute Device",
+ choices=AVAILABLE_GPUS,
+ value=AVAILABLE_GPUS[0] if AVAILABLE_GPUS else "CPU Only",
+ info="Select your GPU or CPU for processing",
+ interactive=True,
+ allow_custom_value=False,
+ elem_id="main-gpu-dropdown"
+ )
+
+ gpu_debug_btn = gr.Button("๐ Debug GPU", size="sm")
+
+ gpu_debug_output = gr.Textbox(
+ label="๐ GPU Status",
+ value=f"Detected: {len(AVAILABLE_GPUS)} options โ {', '.join(AVAILABLE_GPUS)}",
+ interactive=False,
+ lines=2,
+ elem_classes="gpu-debug"
+ )
+
+ skip_audio = gr.Checkbox(
+ label="๐ Skip Audio",
+ value=False,
+ info="Video only processing"
+ )
+ with gr.Column():
+ use_folder_mode = gr.Checkbox(
+ label="๐ Batch Mode",
+ value=False,
+ info="Process ./Convert folder"
+ )
+ keep_fps = gr.Checkbox(
+ label="๐ฌ Preserve FPS",
+ value=True,
+ info="Keep original frame rate"
+ )
+
+ # GPU Test Tab
+ with gr.Tab("๐ง GPU Test", id="test"):
+ gr.Markdown("## GPU Dropdown Test")
+ gr.Markdown("This tab tests if the GPU dropdown works correctly")
+
+ test_gpu_dropdown = gr.Dropdown(
+ label="Test GPU Selection",
+ choices=AVAILABLE_GPUS,
+ value=AVAILABLE_GPUS[0] if AVAILABLE_GPUS else "CPU Only",
+ interactive=True,
+ allow_custom_value=False,
+ info="This should show all your GPUs as a proper dropdown"
+ )
+
+ test_output = gr.Textbox(
+ label="Selected GPU",
+ value=f"Current: {AVAILABLE_GPUS[0] if AVAILABLE_GPUS else 'None'}",
+ interactive=False
+ )
+
+ test_status = gr.Textbox(
+ label="GPU Detection Status",
+ value=f"Detected {len(AVAILABLE_GPUS)} options: {', '.join(AVAILABLE_GPUS)}",
+ interactive=False,
+ lines=3
+ )
+
+ def test_gpu_change(selected):
+ print(f"๐งช Test GPU Selected: {selected}")
+ return f"You selected: {selected}"
+
+ test_gpu_dropdown.change(
+ test_gpu_change,
+ inputs=[test_gpu_dropdown],
+ outputs=[test_output]
+ )
+
+ # Toggle lip sync components visibility
+ enable_lip_sync.change(
+ toggle_lip_sync_visibility,
+ inputs=[enable_lip_sync],
+ outputs=[lip_syncer_model_dropdown]
+ )
+
+ # Toggle folder mode visibility
+ use_folder_mode.change(
+ toggle_folder_mode,
+ inputs=[use_folder_mode],
+ outputs=[target_video, face_analyser_age_dropdown]
+ )
+
+ # GPU selection change handler for debugging
+ gpu_selection_dropdown.change(
+ on_gpu_selection_change,
+ inputs=[gpu_selection_dropdown],
+ outputs=[]
+ )
+
+ # GPU debug button
+ gpu_debug_btn.click(
+ debug_gpu_dropdown,
+ inputs=[],
+ outputs=[gpu_selection_dropdown, gpu_debug_output]
+ )
+
+ # Main processing button
+ start_button.click(
+ run_processing,
+ inputs=[
+ source_image,
+ target_video,
+ frame_processor_checkbox,
+ face_analyser_direction_dropdown,
+ face_recognition_dropdown,
+ face_analyser_gender_dropdown,
+ face_analyser_age_dropdown,
+ skip_audio,
+ keep_fps,
+ keep_temp,
+ lip_syncer_model_dropdown,
+ enable_lip_sync,
+ use_folder_mode,
+ gpu_selection_dropdown
+ ],
+ outputs=[cli_output, action_button]
+ )
+
+ # Multi-purpose action button
+ action_button.click(
+ handle_button_action,
+ inputs=[
+ action_button,
+ source_image,
+ target_video,
+ frame_processor_checkbox,
+ face_analyser_direction_dropdown,
+ face_recognition_dropdown,
+ face_analyser_gender_dropdown,
+ face_analyser_age_dropdown,
+ skip_audio,
+ keep_fps,
+ keep_temp,
+ lip_syncer_model_dropdown,
+ enable_lip_sync,
+ use_folder_mode,
+ gpu_selection_dropdown
+ ],
+ outputs=[
+ source_image,
+ target_video,
+ frame_processor_checkbox,
+ face_analyser_direction_dropdown,
+ face_recognition_dropdown,
+ face_analyser_gender_dropdown,
+ face_analyser_age_dropdown,
+ skip_audio,
+ keep_fps,
+ keep_temp,
+ lip_syncer_model_dropdown,
+ enable_lip_sync,
+ use_folder_mode,
+ gpu_selection_dropdown,
+ cli_output,
+ action_button
+ ]
+ )
+
+ # FIXED: Download button functionality with proper download component management
+ download_button.click(
+ handle_download_click,
+ inputs=[],
+ outputs=[download_file, download_status, download_file, download_button],
+ show_progress=True
+ )
+
+ # FIXED: Reset download UI when download component changes (download completes)
+ download_file.change(
+ reset_download_ui,
+ inputs=[],
+ outputs=[download_file, download_button, download_status]
+ )
+
+# Print system information for verification
+print(f"๐ Base directory: {BASE_DIR}")
+print(f"๐ Instance ID: {INSTANCE_ID}")
+print(f"๐ Convert directory: {os.path.join(BASE_DIR, 'Convert')}")
+print(f"๐๏ธ Instance temp: {INSTANCE_TEMP_DIR}")
+print(f"๐ค Instance output: {INSTANCE_OUTPUT_DIR}")
+print(f"๐ฅ๏ธ Available GPUs: {AVAILABLE_GPUS}")
+print(f"๐ง Enhancement modules: {'โ
Available' if ENHANCEMENT_AVAILABLE else 'โ Not Available'}")
+
+# Determine if running in HuggingFace Spaces or similar cloud environment
+def get_launch_config():
+ """Get appropriate launch configuration based on environment"""
+ if os.getenv('SPACE_ID'): # HuggingFace Spaces
+ return {
+ 'server_name': "0.0.0.0",
+ 'server_port': 7860, # HF Spaces always use 7860
+ 'share': False,
+ 'debug': False
+ }
+ elif os.getenv('COLAB_GPU'): # Google Colab
+ return {
+ 'server_name': "127.0.0.1",
+ 'server_port': INSTANCE_PORT,
+ 'share': True,
+ 'debug': False
+ }
+ else: # Local development - use pre-determined port
+ return {
+ 'server_name': "127.0.0.1",
+ 'server_port': INSTANCE_PORT,
+ 'share': False,
+ 'debug': True
+ }
+
+# Launch configuration for better cross-platform compatibility
+launch_config = get_launch_config()
+print(f"๏ฟฝ๏ฟฝ๏ฟฝ Launching on port: {launch_config['server_port']}")
+print(f"๐ Access URL: http://localhost:{launch_config['server_port']}")
+
+ui.launch(
+ max_file_size="2100mb",
+ **launch_config
)
\ No newline at end of file