Spaces:
Paused
Paused
| """ | |
| 🎭 Advanced Face Swap Studio - HuggingFace Spaces Optimized | |
| ========================================================= | |
| ✅ FEATURES: | |
| - Professional face swapping with GPU acceleration | |
| - Batch processing for multiple videos | |
| - Real-time processing monitor | |
| - Lip sync integration (beta) | |
| - Enhanced face detection and analysis | |
| 🚀 Optimized exclusively for HuggingFace Spaces environment | |
| """ | |
| import os | |
| import sys | |
| import tempfile | |
| import time | |
| import shutil | |
| import subprocess as sp | |
| import uuid | |
| import zipfile | |
| import gc | |
| from pathlib import Path | |
# Set up environment for HuggingFace Spaces.
# These are written before the heavy libraries below are imported.
os.environ.update(
    {
        "KMP_DUPLICATE_LIB_OK": "TRUE",
        "TF_FORCE_GPU_ALLOW_GROWTH": "TRUE",
        "PYTHONPATH": ".",
    }
)
| # Core imports | |
| import gradio as gr | |
| import torch | |
# Optional imports with graceful degradation: a missing package is reported
# on stdout and the application keeps starting.
try:
    import onnxruntime as ort
except ImportError as e:
    print(f"⚠️ ONNX Runtime not available: {e}")
else:
    print("✅ ONNX Runtime loaded successfully")

# MOVIEPY_AVAILABLE gates the video re-encode and audio extraction helpers.
try:
    from moviepy.editor import VideoFileClip
except ImportError as e:
    print(f"⚠️ MoviePy not available: {e}")
    MOVIEPY_AVAILABLE = False
else:
    MOVIEPY_AVAILABLE = True
    print("✅ MoviePy loaded successfully")
# Probe for the optional enhancement processors. ENHANCEMENT_AVAILABLE stays
# False unless both module files exist (relative to the current working
# directory) and both imports succeed.
ENHANCEMENT_AVAILABLE = False
try:
    import importlib.util

    face_enhancer_path = Path("SwitcherAI/processors/frame/modules/face_enhancer.py")
    frame_enhancer_path = Path("SwitcherAI/processors/frame/modules/frame_enhancer.py")

    if not (face_enhancer_path.exists() and frame_enhancer_path.exists()):
        print("⚠️ Enhancement module files not found")
    else:
        # Make the modules importable by their bare names.
        sys.path.insert(0, str(Path("SwitcherAI/processors/frame/modules").resolve()))
        import face_enhancer
        import frame_enhancer

        ENHANCEMENT_AVAILABLE = True
        print("✅ Enhancement modules loaded successfully")
except Exception as e:
    print(f"⚠️ Enhancement modules not available: {e}")
# Directory setup for HuggingFace Spaces: every working directory lives next
# to this file.
BASE_DIR = Path(__file__).parent.resolve()
TEMP_DIR = BASE_DIR / "temp_workspace"
OUTPUT_DIR = BASE_DIR / "outputs"
CONVERT_DIR = BASE_DIR / "Convert"
ASSETS_DIR = BASE_DIR / ".assets" / "models"

# Create directories; a failure is reported but does not stop startup.
for directory in (TEMP_DIR, OUTPUT_DIR, CONVERT_DIR, ASSETS_DIR):
    try:
        directory.mkdir(parents=True, exist_ok=True)
    except Exception as e:
        print(f"⚠️ Failed to create directory {directory}: {e}")
    else:
        print(f"📁 Directory ready: {directory}")

# Summary of the resolved locations.
print(
    "\n".join(
        [
            f"📁 Base directory: {BASE_DIR}",
            f"📂 Temp directory: {TEMP_DIR}",
            f"📤 Output directory: {OUTPUT_DIR}",
            f"🎯 Assets directory: {ASSETS_DIR}",
            f"📁 Convert directory: {CONVERT_DIR}",
        ]
    )
)
# Try to set up the SwitcherAI temp directory.
# The directory path is exported through the SWITCHER_TEMP_DIR environment
# variable. Both a missing SwitcherAI package and a filesystem failure fall
# back to the default behaviour instead of aborting startup.
try:
    sys.path.insert(0, str(BASE_DIR))
    from SwitcherAI.utilities import conditional_download

    temp_switcher_dir = TEMP_DIR / "switcher_temp"
    # parents=True: TEMP_DIR itself may be missing if its creation failed earlier.
    temp_switcher_dir.mkdir(parents=True, exist_ok=True)

    os.environ['SWITCHER_TEMP_DIR'] = str(temp_switcher_dir)
    print("🔧 SwitcherAI utilities loaded successfully")
except ImportError as e:
    print(f"⚠️ Could not import SwitcherAI utilities: {e}")
    print("🔄 Using default temp directory behavior")
except OSError as e:
    print(f"⚠️ Could not create SwitcherAI temp directory: {e}")
    print("🔄 Using default temp directory behavior")
| # Download required model files with better error handling | |
def download_required_models():
    """Download required model files if not present.

    Each model is streamed to a sibling ``<name>.part`` file and only renamed
    onto its final path once the transfer finished and the size checks passed.
    An interrupted download therefore never leaves a truncated file that a
    later start would mistake for a valid model.

    Failures are printed and never raised; the remaining models are still
    attempted.
    """
    import urllib.request
    import urllib.error

    min_valid_size = 1024            # anything at or below 1KB is treated as a failed download
    chunk_size = 8192
    progress_step = chunk_size * 100  # report roughly every 800KB

    models_to_download = [
        {
            'name': 'inswapper_128_fp16.onnx',
            'url': 'https://huggingface.co/ninjawick/webui-faceswap-unlocked/resolve/main/inswapper_128_fp16.onnx',
            'path': ASSETS_DIR / 'inswapper_128_fp16.onnx',
            'description': 'InSwapper FP16 face swap model'
        },
        {
            'name': 'inswapper_128.onnx',
            'url': 'https://huggingface.co/xingren23/comfyflow-models/resolve/main/insightface/inswapper_128.onnx',
            'path': ASSETS_DIR / 'inswapper_128.onnx',
            'description': 'InSwapper face swap model'
        },
        {
            'name': 'GFPGANv1.4.pth',
            'url': 'https://huggingface.co/gmk123/GFPGAN/resolve/main/GFPGANv1.4.pth',
            'path': ASSETS_DIR / 'GFPGANv1.4.pth',
            'description': 'GFPGAN face enhancement model'
        }
    ]

    for model in models_to_download:
        model_path = model['path']
        model_url = model['url']
        model_name = model['name']

        # Skip models that are already present.
        try:
            if model_path.exists() and model_path.stat().st_size > min_valid_size:
                file_size = model_path.stat().st_size / (1024 * 1024)  # MB
                print(f"✅ {model_name} already exists ({file_size:.1f}MB)")
                continue
        except Exception as e:
            print(f"⚠️ Error checking {model_name}: {e}")

        partial_path = model_path.with_name(model_path.name + ".part")
        try:
            print(f"📥 Downloading {model_name}...")
            print(f" Description: {model['description']}")
            print(f" URL: {model_url}")
            print(f" Path: {model_path}")

            model_path.parent.mkdir(parents=True, exist_ok=True)

            req = urllib.request.Request(model_url)
            req.add_header('User-Agent', 'Mozilla/5.0 (compatible; FaceSwapStudio/1.0)')

            with urllib.request.urlopen(req) as response:
                total_size = int(response.headers.get('Content-Length', 0))
                downloaded = 0
                next_report = progress_step
                with open(partial_path, 'wb') as f:
                    while True:
                        chunk = response.read(chunk_size)
                        if not chunk:
                            break
                        f.write(chunk)
                        downloaded += len(chunk)
                        # Threshold-based so short reads cannot skip a report.
                        if total_size > 0 and downloaded >= next_report:
                            percent = (downloaded * 100) / total_size
                            print(f" Progress: {percent:.1f}%")
                            while next_report <= downloaded:
                                next_report += progress_step

            # Verify the download before publishing it under the final name.
            actual_size = partial_path.stat().st_size if partial_path.exists() else 0
            if actual_size <= min_valid_size:
                print(f"❌ {model_name} download failed - file not created or too small")
            elif total_size > 0 and actual_size != total_size:
                print(f"❌ {model_name} download failed - incomplete ({actual_size} of {total_size} bytes)")
            else:
                partial_path.replace(model_path)
                file_size = actual_size / (1024 * 1024)  # MB
                print(f"✅ {model_name} downloaded successfully ({file_size:.1f}MB)")
        except urllib.error.URLError as e:
            print(f"❌ Network error downloading {model_name}: {e}")
        except Exception as e:
            print(f"❌ Error downloading {model_name}: {e}")
        finally:
            # Remove leftovers of a failed or rejected download.
            try:
                if partial_path.exists():
                    partial_path.unlink()
            except OSError as e:
                print(f"⚠️ Could not remove partial download {partial_path.name}: {e}")
# Download models at startup - BEFORE web interface.
# Any unexpected error is reported and startup continues.
print("\n🔄 Checking required model files...")
try:
    download_required_models()
except Exception as e:
    print(f"⚠️ Model download failed: {e}\n")
else:
    print("✅ Model check complete\n")
# Global variables (shared, mutable state of the running app)
current_process = None    # subprocess.Popen of the running run.py job, if any
last_output_path = None   # string path of the most recent result
last_batch_mode = False   # True when the most recent run was a batch run
def get_available_gpus():
    """Return display labels for the CUDA devices visible to torch.

    Returns ``["CPU Only"]`` when CUDA is unavailable. Otherwise each device
    gets either a ``"GPU <i>: <name> (<mem>GB)"`` label (after a small test
    allocation on it succeeded) or ``"GPU <i>: Error"``. If enumeration
    itself fails, ``"CPU Only"`` is appended to whatever was collected.
    """
    print("🔍 Detecting GPU devices...")

    if not torch.cuda.is_available():
        print("❌ CUDA not available")
        return ["CPU Only"]

    labels = []
    try:
        total = torch.cuda.device_count()
        print(f"🔢 CUDA devices detected: {total}")

        for index in range(total):
            try:
                info = torch.cuda.get_device_properties(index)
                memory_gb = info.total_memory / (1024**3)  # GB

                # Test device accessibility with a one-element tensor.
                torch.cuda.set_device(index)
                probe = torch.tensor([1.0], device=f'cuda:{index}')

                label = f"GPU {index}: {info.name} ({memory_gb:.1f}GB)"
                labels.append(label)
                print(f"✅ {label}")

                del probe
                torch.cuda.empty_cache()
            except Exception as e:
                print(f"❌ Error with GPU {index}: {e}")
                labels.append(f"GPU {index}: Error")
    except Exception as e:
        print(f"❌ GPU detection failed: {e}")
        labels.append("CPU Only")

    return labels
def set_gpu_device(gpu_selection):
    """Point CUDA_VISIBLE_DEVICES at the selected device.

    ``gpu_selection`` is a label such as ``"GPU 0: <name> (..GB)"``. Labels
    that do not start with ``"GPU"`` or that contain ``"Error"`` select CPU
    mode (empty CUDA_VISIBLE_DEVICES).

    Returns the GPU index as a string, or ``"cpu"``. Any error while
    interpreting the label also falls back to ``"cpu"``.
    """
    try:
        wants_gpu = gpu_selection.startswith("GPU") and "Error" not in gpu_selection
        if not wants_gpu:
            os.environ["CUDA_VISIBLE_DEVICES"] = ""
            print("🖥️ Using CPU mode")
            return "cpu"

        # "GPU 3: name" -> "GPU 3" -> "3"
        prefix = gpu_selection.split(":")[0]
        gpu_id = prefix.split(" ")[1]
        os.environ["CUDA_VISIBLE_DEVICES"] = gpu_id
        print(f"🖥️ Using GPU {gpu_id}")
        return gpu_id
    except Exception as e:
        print(f"⚠️ Error setting GPU device: {e}")
        os.environ["CUDA_VISIBLE_DEVICES"] = ""
        return "cpu"
def safe_copy_file(source, destination):
    """Copy ``source`` to ``destination`` and confirm the result.

    Both arguments may be ``str`` or ``Path``. The destination's parent
    directory is created when needed. Returns True only if the source exists,
    is non-empty, and the destination exists and is non-empty afterwards.
    Every error is printed and reported as False.
    """
    try:
        src = Path(source) if isinstance(source, str) else source
        dst = Path(destination) if isinstance(destination, str) else destination

        dst.parent.mkdir(parents=True, exist_ok=True)

        # Refuse missing or empty sources.
        if not src.exists():
            print(f"❌ Source file does not exist: {src}")
            return False
        if src.stat().st_size == 0:
            print(f"❌ Source file is empty: {src}")
            return False

        shutil.copy2(src, dst)

        # Verify copy
        copied_ok = dst.exists() and dst.stat().st_size > 0
        if not copied_ok:
            print(f"❌ Copy verification failed: {dst.name}")
            return False

        print(f"✅ File copied: {dst.name}")
        return True
    except Exception as e:
        print(f"❌ Copy error: {e}")
        return False
def handle_batch_file_upload(files):
    """Handle multiple file uploads for batch mode.

    Clears the Convert directory, copies every uploaded file into it and
    returns a human-readable status message.

    ``files`` is an iterable of uploads. Each item may be a path (``str`` or
    ``os.PathLike``) or an object exposing the path through ``.name``. The
    path is resolved here before copying, because ``safe_copy_file`` only
    understands ``str`` and ``Path`` sources.
    """
    if not files:
        return "📁 No files uploaded"

    # Clear existing files in convert directory
    try:
        for existing_file in CONVERT_DIR.glob("*"):
            if existing_file.is_file():
                existing_file.unlink()
    except Exception as e:
        print(f"⚠️ Error cleaning convert directory: {e}")

    uploaded_count = 0
    failed_count = 0
    for file in files:
        try:
            if file is None:
                continue

            # Resolve the upload to a filesystem path. The str/PathLike check
            # comes first: a Path's own ``.name`` is only the basename.
            if isinstance(file, (str, os.PathLike)):
                source_path = Path(file)
            elif hasattr(file, 'name'):
                source_path = Path(file.name)
            else:
                raise TypeError(f"unsupported upload object: {type(file).__name__}")

            original_name = source_path.name or f"video_{uploaded_count}.mp4"
            dest_path = CONVERT_DIR / original_name

            if safe_copy_file(source_path, dest_path):
                file_size = dest_path.stat().st_size / (1024 * 1024)  # MB
                print(f"✅ Uploaded: {original_name} ({file_size:.1f}MB)")
                uploaded_count += 1
            else:
                print(f"❌ Failed to upload: {original_name}")
                failed_count += 1
        except Exception as e:
            print(f"❌ Error uploading file: {e}")
            failed_count += 1

    status_msg = f"📦 Batch Upload Complete:\n✅ Uploaded: {uploaded_count} files\n"
    if failed_count > 0:
        status_msg += f"❌ Failed: {failed_count} files\n"

    # List uploaded files, using the same patterns the batch processor scans.
    try:
        uploaded_files = [
            f.name
            for pattern in ("*.mp4", "*.avi", "*.mov", "*.mkv")
            for f in CONVERT_DIR.glob(pattern)
        ]
        if uploaded_files:
            status_msg += "📁 Files ready for processing:\n" + "\n".join(f" • {f}" for f in uploaded_files[:10])
            if len(uploaded_files) > 10:
                status_msg += f"\n ... and {len(uploaded_files) - 10} more"
    except Exception as e:
        print(f"⚠️ Error listing files: {e}")

    return status_msg
def resize_video(input_path, output_path, fps=30):
    """Re-encode ``input_path`` to ``output_path`` at ``fps``, with fallbacks.

    Without MoviePy the file is copied unchanged. If re-encoding raises, a
    plain copy is attempted instead. The clip is always closed, including on
    failure, so no reader handle is left open.

    Returns True when an output file was written (re-encoded or copied),
    False when the fallback copy failed too.
    """
    try:
        if not MOVIEPY_AVAILABLE:
            print("⚠️ MoviePy not available - copying video directly")
            shutil.copy2(input_path, output_path)
            return True

        # Path(...) so that a plain string path is accepted as well.
        print(f"🎬 Processing video: {Path(input_path).name}")
        clip = VideoFileClip(str(input_path))
        try:
            clip.write_videofile(str(output_path), fps=fps, audio_codec='aac', verbose=False, logger=None)
        finally:
            clip.close()
        print("✅ Video processed successfully")
        return True
    except Exception as e:
        print(f"❌ Video processing failed: {e}")
        try:
            shutil.copy2(input_path, output_path)
            return True
        except Exception as e2:
            print(f"❌ Fallback copy failed: {e2}")
            return False
def extract_audio(video_path, audio_path):
    """Extract the audio track of ``video_path`` into ``audio_path``.

    Returns True when audio was written. Returns False when MoviePy is
    unavailable, the video has no audio track, or extraction raised (the
    error is printed). The clip is closed on every path.
    """
    try:
        if not MOVIEPY_AVAILABLE:
            print("⚠️ MoviePy not available - cannot extract audio")
            return False

        clip = VideoFileClip(str(video_path))
        try:
            if clip.audio is None:
                return False
            clip.audio.write_audiofile(str(audio_path), logger=None, verbose=False)
            return True
        finally:
            clip.close()
    except Exception as e:
        print(f"❌ Audio extraction failed: {e}")
        return False
def cleanup_temp_files():
    """Delete the regular files directly inside TEMP_DIR.

    Subdirectories are left untouched. Errors are printed, not raised.
    """
    try:
        stale_files = (entry for entry in TEMP_DIR.glob("*") if entry.is_file())
        for entry in stale_files:
            entry.unlink()
        print("🧹 Temp files cleaned")
    except Exception as e:
        print(f"⚠️ Cleanup error: {e}")
def cleanup_convert_files():
    """Delete the regular files directly inside CONVERT_DIR.

    Subdirectories are left untouched. Errors are printed, not raised.
    """
    try:
        stale_files = (entry for entry in CONVERT_DIR.glob("*") if entry.is_file())
        for entry in stale_files:
            entry.unlink()
        print("🧹 Convert directory cleaned")
    except Exception as e:
        print(f"⚠️ Convert cleanup error: {e}")
def create_batch_zip():
    """Bundle every .mp4 and .avi file in OUTPUT_DIR into a timestamped zip.

    Returns the Path of the new archive, or None when there is nothing to
    bundle or archive creation failed (the error is printed).
    """
    try:
        videos = [*OUTPUT_DIR.glob("*.mp4"), *OUTPUT_DIR.glob("*.avi")]
        if len(videos) == 0:
            return None

        archive_path = OUTPUT_DIR / f"batch_results_{int(time.time())}.zip"
        with zipfile.ZipFile(archive_path, 'w', zipfile.ZIP_DEFLATED) as archive:
            for video in videos:
                # Stored flat: only the file name is kept inside the archive.
                archive.write(video, video.name)
                print(f"📦 Added to zip: {video.name}")

        print(f"✅ Batch zip created: {archive_path.name}")
        return archive_path
    except Exception as e:
        print(f"❌ Zip creation failed: {e}")
        return None
def get_download_file():
    """Locate the newest downloadable result in OUTPUT_DIR.

    Considers .mp4, .avi and .zip files and picks the one with the greatest
    ``st_ctime``. Returns ``(path_string, status_message)``; the path is None
    when nothing was found or an error occurred.
    """
    try:
        candidates = [
            found
            for pattern in ("*.mp4", "*.avi", "*.zip")
            for found in OUTPUT_DIR.glob(pattern)
        ]
        if len(candidates) == 0:
            return None, "📁 No output files found"

        def changed_at(path):
            return path.stat().st_ctime

        newest = max(candidates, key=changed_at)
        size_mb = newest.stat().st_size / (1024 * 1024)  # MB
        return str(newest), f"📥 Ready: {newest.name} ({size_mb:.1f}MB)"
    except Exception as e:
        return None, f"❌ Error: {e}"
def run_single_video(source_image, target_video, frame_processor, face_analyser_direction,
                     face_recognition, face_analyser_gender, face_analyser_age, skip_audio,
                     keep_fps, lip_syncer_model, enable_lip_sync, gpu_selection):
    """Process a single video, streaming progress.

    This is a generator. It yields ``(video_result, cli_output)`` tuples:

    * while ``run.py`` is running: ``(None, <recent log lines>)``
    * exactly once at the end: ``(<output path>, <log>)`` on success, or
      ``(<message starting with "❌">, <log>)`` on failure.

    The final status is *yielded* rather than returned. A ``return value``
    inside a generator is invisible to a ``for`` loop, so callers would never
    see the result or the error message.
    """
    global last_output_path, last_batch_mode, current_process
    last_batch_mode = False
    max_log_lines = 50  # keep the streamed output manageable

    try:
        set_gpu_device(gpu_selection)

        # Setup temp files
        temp_source = TEMP_DIR / 'source-image.jpg'
        temp_target = TEMP_DIR / 'resize-vid.mp4'

        # Copy and process files
        if not safe_copy_file(Path(source_image), temp_source):
            yield "❌ Failed to copy source image", ""
            return
        if not resize_video(Path(target_video), temp_target):
            yield "❌ Video processing failed", ""
            return

        # Generate output filename
        source_name = Path(source_image).stem
        target_name = Path(target_video).stem
        suffix = "_lipsynced" if enable_lip_sync else ""
        output_filename = f"{source_name}_{target_name}{suffix}.mp4"
        output_path = OUTPUT_DIR / output_filename

        # Handle lip sync: fall back to a plain swap if no audio can be extracted.
        audio_path = None
        if enable_lip_sync:
            audio_path = TEMP_DIR / 'target-audio.wav'
            if not extract_audio(temp_target, audio_path):
                print("⚠️ Lip sync disabled - audio extraction failed")
                enable_lip_sync = False

        # Build command
        execution_provider = "cuda" if gpu_selection.startswith("GPU") and "Error" not in gpu_selection else "cpu"
        cmd = [
            sys.executable, "run.py",
            "--execution-providers", execution_provider,
            "--execution-thread-count", "8",
            "--reference-face-distance", "1.5",
            "-s", str(temp_source),
            "-t", str(temp_target),
            "-o", str(output_path),
            "--frame-processors"] + frame_processor + [
            "--face-analyser-direction", face_analyser_direction,
            "--face-analyser-age", face_analyser_age
        ]
        if enable_lip_sync and audio_path:
            cmd.extend(["--source-paths", str(audio_path)])
            cmd.extend(["--lip-syncer-model", lip_syncer_model])
            if 'lip_syncer' not in frame_processor:
                # Insert right after the flag so it joins the processor list.
                idx = cmd.index("--frame-processors") + 1
                cmd[idx:idx] = ['lip_syncer']
        if face_recognition != 'none':
            cmd.extend(["--face-recognition", face_recognition])
        if face_analyser_gender != 'none':
            cmd.extend(["--face-analyser-gender", face_analyser_gender])
        if skip_audio and not enable_lip_sync:
            cmd.append("--skip-audio")
        if keep_fps:
            cmd.append("--keep-fps")

        print("🚀 Starting face swap processing...")
        print(f"📋 Command: {' '.join(cmd)}")

        start_time = time.time()
        current_process = sp.Popen(
            cmd,
            stdout=sp.PIPE,
            stderr=sp.STDOUT,
            text=True,
            bufsize=1,
            cwd=str(BASE_DIR)
        )

        recent_lines = []
        cli_output = ""
        # Iterating the pipe blocks until the next line or EOF, which avoids
        # spinning on readline()/poll().
        for raw_line in current_process.stdout:
            line = raw_line.strip()
            print(line)
            recent_lines.append(line)
            del recent_lines[:-max_log_lines]
            cli_output = "\n".join(recent_lines) + "\n"
            yield None, cli_output

        rc = current_process.wait()
        execution_time = time.time() - start_time

        # Cleanup runs on success and failure alike.
        try:
            if torch.cuda.is_available():
                torch.cuda.empty_cache()
            gc.collect()
            if audio_path and audio_path.exists():
                audio_path.unlink()
        except Exception as e:
            print(f"⚠️ Cleanup error: {e}")

        if rc != 0:
            yield "❌ Processing failed", cli_output + f"\n\n⏱️ Time: {execution_time:.2f}s"
            return

        last_output_path = str(output_path)
        yield str(output_path), cli_output + f"\n\n✅ Completed in {execution_time:.2f}s"
    except Exception as e:
        yield f"❌ Error: {e}", ""
def run_batch_processing(source_image, frame_processor, face_analyser_direction, face_recognition,
                         face_analyser_gender, skip_audio, keep_fps, lip_syncer_model, enable_lip_sync, gpu_selection):
    """Process all videos in the Convert folder, streaming a running log.

    This is a generator yielding ``(None, cli_output)`` after each step. The
    last message contains a ``=== BATCH COMPLETE ===`` summary when the batch
    ran to the end.

    Lip sync is decided per video. A video whose audio cannot be extracted is
    processed without lip sync, and the following videos still get it. The
    output file name keeps the ``_lipsynced`` suffix whenever lip sync was
    requested, as in single-video mode.

    ``last_output_path`` is updated only when a result zip was actually
    created.
    """
    global last_output_path, last_batch_mode, current_process
    last_batch_mode = True

    try:
        set_gpu_device(gpu_selection)

        video_files = []
        for ext in ('*.mp4', '*.avi', '*.mov', '*.mkv'):
            video_files.extend(CONVERT_DIR.glob(ext))
        video_files.sort()  # deterministic processing order

        if not video_files:
            yield None, "📁 No video files found in Convert folder.\nPlease upload videos using the file input above."
            return

        temp_source = TEMP_DIR / 'source-image.jpg'
        if not safe_copy_file(Path(source_image), temp_source):
            yield None, "❌ Failed to copy source image"
            return

        source_name = Path(source_image).stem
        execution_provider = "cuda" if gpu_selection.startswith("GPU") and "Error" not in gpu_selection else "cpu"
        suffix = "_lipsynced" if enable_lip_sync else ""

        cli_output = f"📊 Processing {len(video_files)} videos in batch mode\n🎯 Source: {source_name}\n\n"
        yield None, cli_output

        successful = 0
        failed = 0

        for i, video_file in enumerate(video_files, 1):
            cli_output += f"[{i}/{len(video_files)}] 🎬 {video_file.name}\n"
            yield None, cli_output

            temp_target = TEMP_DIR / 'resize-vid.mp4'
            if not resize_video(video_file, temp_target):
                cli_output += "❌ Video resize failed\n"
                failed += 1
                yield None, cli_output
                continue

            output_path = OUTPUT_DIR / f"{source_name}_{video_file.stem}{suffix}.mp4"

            # Per-video flag: never mutate enable_lip_sync, or one bad file
            # would disable lip sync for the rest of the batch.
            use_lip_sync = bool(enable_lip_sync)
            audio_path = None
            if use_lip_sync:
                audio_path = TEMP_DIR / 'target-audio.wav'
                if not extract_audio(temp_target, audio_path):
                    cli_output += "⚠️ Lip sync disabled for this video - audio extraction failed\n"
                    use_lip_sync = False

            # Build command
            cmd = [
                sys.executable, "run.py",
                "--execution-providers", execution_provider,
                "--execution-thread-count", "8",
                "--reference-face-distance", "1.5",
                "-s", str(temp_source),
                "-t", str(temp_target),
                "-o", str(output_path),
                "--frame-processors"] + frame_processor + [
                "--face-analyser-direction", face_analyser_direction
            ]
            if use_lip_sync and audio_path:
                cmd.extend(["--source-paths", str(audio_path)])
                cmd.extend(["--lip-syncer-model", lip_syncer_model])
                if 'lip_syncer' not in frame_processor:
                    idx = cmd.index("--frame-processors") + 1
                    cmd[idx:idx] = ['lip_syncer']
            if face_recognition != 'none':
                cmd.extend(["--face-recognition", face_recognition])
            if face_analyser_gender != 'none':
                cmd.extend(["--face-analyser-gender", face_analyser_gender])
            if skip_audio and not use_lip_sync:
                cmd.append("--skip-audio")
            if keep_fps:
                cmd.append("--keep-fps")

            try:
                start_time = time.time()
                current_process = sp.Popen(
                    cmd,
                    stdout=sp.PIPE,
                    stderr=sp.STDOUT,
                    text=True,
                    bufsize=1,
                    cwd=str(BASE_DIR)
                )
                # Drain the pipe so the child never blocks on a full buffer.
                for raw_line in current_process.stdout:
                    print(raw_line.strip())
                rc = current_process.wait()
                execution_time = time.time() - start_time

                if rc == 0:
                    cli_output += f"✅ Completed in {execution_time:.2f}s\n\n"
                    successful += 1
                else:
                    cli_output += "❌ Processing failed\n\n"
                    failed += 1
            except Exception as e:
                cli_output += f"❌ Error: {e}\n\n"
                failed += 1
            finally:
                # Cleanup after every video, whether it succeeded or not.
                try:
                    if torch.cuda.is_available():
                        torch.cuda.empty_cache()
                    gc.collect()
                    if audio_path and audio_path.exists():
                        audio_path.unlink()
                except Exception as e:
                    print(f"⚠️ Cleanup error: {e}")

            yield None, cli_output

        # Final summary
        cli_output += f"\n=== BATCH COMPLETE ===\n✅ Successful: {successful}\n❌ Failed: {failed}\n"
        if successful > 0:
            zip_path = create_batch_zip()
            if zip_path is not None:
                last_output_path = str(zip_path)
        yield None, cli_output
    except Exception as e:
        yield None, f"❌ Batch processing error: {e}"
def handle_processing(source_image, target_video, frame_processor, face_analyser_direction, face_recognition,
                      face_analyser_gender, face_analyser_age, skip_audio, keep_fps,
                      lip_syncer_model, enable_lip_sync, use_folder_mode, gpu_selection):
    """Main processing handler.

    Generator yielding ``(cli_output, action_button_text)``. While work is in
    progress the button text is "⏹️ CANCEL"; the final yield switches it to
    "📥 DOWNLOAD" on success or "🔄 RESET" on failure.

    In single-video mode the worker's final status is taken from its last
    yielded tuple or, if the worker ended with ``return value``, from
    ``StopIteration.value``. An error status is appended to the displayed log
    instead of being dropped.
    """
    try:
        if use_folder_mode:
            cli_output = ""
            for _, cli_output in run_batch_processing(
                source_image, frame_processor, face_analyser_direction, face_recognition,
                face_analyser_gender, skip_audio, keep_fps, lip_syncer_model, enable_lip_sync, gpu_selection
            ):
                yield cli_output, "⏹️ CANCEL"

            # Only claim completion when the batch reached its summary.
            if "=== BATCH COMPLETE ===" in cli_output:
                yield cli_output + "\n🎉 Batch processing complete!", "📥 DOWNLOAD"
            else:
                yield cli_output, "🔄 RESET"
        else:
            video_result, cli_output = None, ""
            progress = run_single_video(
                source_image, target_video, frame_processor, face_analyser_direction, face_recognition,
                face_analyser_gender, face_analyser_age, skip_audio, keep_fps,
                lip_syncer_model, enable_lip_sync, gpu_selection
            )
            while True:
                try:
                    video_result, cli_output = next(progress)
                except StopIteration as finished:
                    # A generator's ``return value`` surfaces here.
                    if finished.value is not None:
                        video_result, cli_output = finished.value
                    break
                yield cli_output, "⏹️ CANCEL"

            if video_result and not video_result.startswith("❌"):
                yield cli_output + "\n🎉 Processing complete!", "📥 DOWNLOAD"
            else:
                message = cli_output
                if video_result:
                    message = f"{cli_output}\n{video_result}".strip()
                yield message, "🔄 RESET"
    except Exception as e:
        yield f"❌ Processing error: {e}", "🔄 RESET"
def cancel_processing():
    """Cancel the currently running subprocess, if any.

    Sends ``terminate()`` and waits up to 10 seconds. If that raises
    (including the wait timing out) the process is killed. Returns a status
    message and never raises.
    """
    global current_process
    try:
        if current_process and current_process.poll() is None:
            current_process.terminate()
            current_process.wait(timeout=10)
            return "⏹️ Processing cancelled"
        return "⚠️ No active processing"
    except Exception as e:
        # Graceful termination failed or timed out: escalate to kill().
        try:
            if current_process:
                current_process.kill()
                current_process.wait()
            return f"⏹️ Processing force-cancelled: {e}"
        except Exception:
            # Not a bare except: KeyboardInterrupt/SystemExit must propagate.
            return f"❌ Cancel failed: {e}"
def reset_interface():
    """Clean the working directories and return default values for the UI.

    Returns a 15-tuple: the 13 input values in the order of the inline
    comments below, then the monitor text and the action button text.
    """
    try:
        cleanup_temp_files()
        cleanup_convert_files()

        default_processors = ['face_swapper']
        if ENHANCEMENT_AVAILABLE:
            default_processors = default_processors + ['face_enhancer']
        default_device = AVAILABLE_GPUS[0] if AVAILABLE_GPUS else "CPU Only"

        return (
            None,                # source_image
            None,                # target_video
            default_processors,  # frame_processor
            'top-bottom',        # face_analyser_direction
            'reference',         # face_recognition
            'female',            # face_analyser_gender
            'adult',             # face_analyser_age
            False,               # skip_audio
            True,                # keep_fps
            'wav2lip_gan_96',    # lip_syncer_model
            False,               # enable_lip_sync
            False,               # use_folder_mode
            default_device,      # gpu_selection
            "🔧 Interface reset. Ready for new session!",  # cli_output
            "🚀 START PROCESSING",                         # button text
        )
    except Exception as e:
        # Conservative defaults that depend on no module state.
        return (None, None, ['face_swapper'], 'top-bottom', 'reference', 'female', 'adult',
                False, True, 'wav2lip_gan_96', False, False, "CPU Only",
                f"⚠️ Reset error: {e}", "🚀 START PROCESSING")
def handle_download():
    """Handle download button click.

    Returns ``(file_path_or_None, status_text, update_a, update_b)``. The two
    updates toggle visibility in opposite directions: the first is shown and
    the second hidden when a file is available, and the reverse otherwise.
    """
    try:
        download_path, status = get_download_file()
        found = bool(download_path)
        return (
            download_path if found else None,
            status,
            gr.update(visible=found),
            gr.update(visible=not found),
        )
    except Exception as e:
        return None, f"❌ Download error: {e}", gr.update(visible=False), gr.update(visible=True)
def handle_action_button(button_text, *inputs):
    """Handle multi-purpose action button.

    Dispatches on the button's current label:

    * contains "RESET": returns the full default tuple from ``reset_interface``
    * contains "CANCEL": cancels the job and returns ``inputs`` plus the
      cancel message and a "🔄 RESET" label
    * anything else: returns ``inputs`` plus an empty message and the
      unchanged label
    """
    try:
        if "RESET" in button_text:
            return reset_interface()
        if "CANCEL" in button_text:
            message = cancel_processing()
            return (*inputs, message, "🔄 RESET")
        return (*inputs, "", button_text)
    except Exception as e:
        return (*inputs, f"❌ Action error: {e}", "🔄 RESET")
def toggle_batch_mode(use_folder_mode):
    """Handle batch mode toggle.

    Returns a ``gr.update`` that relabels the target-video input and switches
    its ``file_count`` between "multiple" (batch) and "single".
    """
    try:
        if use_folder_mode:
            label, count = "📁 Target Videos (Drag multiple files here)", "multiple"
        else:
            label, count = "Target Video (Video to modify)", "single"
        return gr.update(label=label, file_count=count, file_types=["video"])
    except Exception as e:
        print(f"⚠️ Toggle batch mode error: {e}")
        return gr.update(label="Target Video")
def handle_file_upload(files, use_folder_mode):
    """Handle file uploads - single or multiple.

    In batch mode the files are handed to ``handle_batch_file_upload``. In
    single mode only a status line naming the upload is produced. Returns the
    status text; errors are reported in the text, not raised.
    """
    try:
        if not files:
            return "📁 No files uploaded"
        if use_folder_mode:
            return handle_batch_file_upload(files)

        # Single file mode - just report what was uploaded.
        shown_name = Path(files.name).name if hasattr(files, 'name') else 'video file'
        return f"✅ Single video uploaded: {shown_name}"
    except Exception as e:
        return f"❌ Upload error: {e}"
# Initialize GPU detection once at import time; fall back to CPU on any error.
try:
    AVAILABLE_GPUS = get_available_gpus()
except Exception as e:
    print(f"⚠️ GPU detection failed: {e}")
    AVAILABLE_GPUS = ["CPU Only"]
else:
    print(f"🖥️ Available GPUs: {AVAILABLE_GPUS}")
| # Gradio Interface | |
| def create_interface(): | |
| with gr.Blocks( | |
| theme=gr.themes.Monochrome( | |
| primary_hue=gr.themes.colors.teal, | |
| secondary_hue=gr.themes.colors.gray, | |
| font=gr.themes.GoogleFont('Inter') | |
| ).set( | |
| background_fill_primary="#1f1f1f", | |
| background_fill_secondary="#2d2d2d" | |
| ), | |
| css=""" | |
| .gradio-container { max-width: 1400px !important; margin: 0 auto !important; } | |
| .main-header { text-align: center; padding: 1rem; background: linear-gradient(135deg, #667eea 0%, #764ba2 100%); border-radius: 12px; color: white; margin-bottom: 1rem; } | |
| .control-panel { background: rgba(102, 126, 234, 0.1); border-radius: 12px; padding: 1rem; margin-bottom: 1rem; border: 2px solid rgba(102, 126, 234, 0.2); } | |
| .section-header { font-weight: 600; color: #667eea; margin-bottom: 1rem; border-bottom: 2px solid #667eea; padding-bottom: 0.5rem; } | |
| """ | |
| ) as interface: | |
| # Header | |
| with gr.Column(elem_classes="main-header"): | |
| gr.Markdown("# 🎭 Advanced Face Swap Studio\n**HuggingFace Spaces Optimized**") | |
| with gr.Row(): | |
| # Left Column - Input & Controls | |
| with gr.Column(scale=2): | |
| with gr.Group(elem_classes="control-panel"): | |
| gr.HTML('<div class="section-header">📸 Input Files</div>') | |
| source_image = gr.File( | |
| label="Source Image (Face to use)", | |
| file_types=["image"], | |
| file_count="single" | |
| ) | |
| # Batch mode toggle | |
| use_folder_mode = gr.Checkbox( | |
| label="📁 Batch Mode (Process multiple videos)", | |
| value=False | |
| ) | |
| target_video = gr.File( | |
| label="Target Video (Video to modify)", | |
| file_types=["video"], | |
| file_count="single" | |
| ) | |
| # Upload status display | |
| upload_status = gr.Textbox( | |
| label="Upload Status", | |
| value="Ready to upload files...", | |
| interactive=False, | |
| lines=3 | |
| ) | |
| with gr.Group(elem_classes="control-panel"): | |
| gr.HTML('<div class="section-header">🎮 Controls</div>') | |
| start_button = gr.Button("🚀 START PROCESSING", variant="primary", size="lg") | |
| action_button = gr.Button("🔄 RESET", variant="secondary", size="lg") | |
| download_button = gr.Button("📥 DOWNLOAD", variant="secondary", size="lg") | |
| download_status = gr.Textbox( | |
| label="Download Status", | |
| value="Ready for processing...", | |
| interactive=False, | |
| lines=2 | |
| ) | |
| download_file = gr.File( | |
| label="Download File", | |
| visible=False, | |
| interactive=False | |
| ) | |
| # Middle Column - Configuration | |
| with gr.Column(scale=3): | |
| with gr.Group(elem_classes="control-panel"): | |
| gr.HTML('<div class="section-header">⚙️ Processing Configuration</div>') | |
| with gr.Row(): | |
| with gr.Column(): | |
| # Frame processing | |
| available_processors = ['face_swapper'] | |
| if ENHANCEMENT_AVAILABLE: | |
| available_processors.extend(['face_enhancer', 'frame_enhancer']) | |
| frame_processor = gr.CheckboxGroup( | |
| choices=available_processors, | |
| label='Frame Processors', | |
| value=['face_swapper'] + (['face_enhancer'] if ENHANCEMENT_AVAILABLE else []) | |
| ) | |
| enable_lip_sync = gr.Checkbox(label="🎵 Enable Lip Sync (Beta)", value=False) | |
| lip_syncer_model = gr.Dropdown( | |
| label='Lip Sync Model', | |
| choices=['wav2lip_96', 'wav2lip_gan_96'], | |
| value='wav2lip_gan_96', | |
| visible=False | |
| ) | |
| with gr.Column(): | |
| # Face analysis | |
| face_recognition = gr.Dropdown( | |
| label='Recognition Mode', | |
| choices=['none', 'reference', 'many'], | |
| value='reference' | |
| ) | |
| face_analyser_direction = gr.Dropdown( | |
| label='Analysis Direction', | |
| choices=['left-right', 'right-left', 'top-bottom', 'bottom-top', 'small-large', 'large-small'], | |
| value='top-bottom' | |
| ) | |
| face_analyser_gender = gr.Dropdown( | |
| label='Target Gender', | |
| choices=['none', 'male', 'female'], | |
| value='female' | |
| ) | |
| face_analyser_age = gr.Dropdown( | |
| label='Target Age Group', | |
| choices=['child', 'teen', 'adult', 'senior'], | |
| value='adult' | |
| ) | |
| # Right Column - Monitor & Options | |
| with gr.Column(scale=3): | |
| with gr.Group(elem_classes="control-panel"): | |
| gr.HTML('<div class="section-header">🖥️ Processing Monitor</div>') | |
| cli_output = gr.Textbox( | |
| label="Live Processing Output", | |
| lines=15, | |
| interactive=False, | |
| show_copy_button=True, | |
| placeholder="🔧 System ready. Configure settings and start processing..." | |
| ) | |
| with gr.Group(elem_classes="control-panel"): | |
| gr.HTML('<div class="section-header">🛠️ Processing Options</div>') | |
| with gr.Row(): | |
| with gr.Column(): | |
| gpu_selection = gr.Dropdown( | |
| label="🖥️ Compute Device", | |
| choices=AVAILABLE_GPUS, | |
| value=AVAILABLE_GPUS[0] if AVAILABLE_GPUS else "CPU Only" | |
| ) | |
| skip_audio = gr.Checkbox(label="🔇 Skip Audio", value=False) | |
| with gr.Column(): | |
| keep_fps = gr.Checkbox(label="🎬 Keep Original FPS", value=True) | |
| # Event handlers with error handling | |
| try: | |
| enable_lip_sync.change( | |
| lambda x: gr.update(visible=x), | |
| inputs=[enable_lip_sync], | |
| outputs=[lip_syncer_model] | |
| ) | |
| use_folder_mode.change( | |
| toggle_batch_mode, | |
| inputs=[use_folder_mode], | |
| outputs=[target_video] | |
| ) | |
| target_video.upload( | |
| handle_file_upload, | |
| inputs=[target_video, use_folder_mode], | |
| outputs=[upload_status] | |
| ) | |
| start_button.click( | |
| handle_processing, | |
| inputs=[ | |
| source_image, target_video, frame_processor, face_analyser_direction, | |
| face_recognition, face_analyser_gender, face_analyser_age, | |
| skip_audio, keep_fps, lip_syncer_model, enable_lip_sync, | |
| use_folder_mode, gpu_selection | |
| ], | |
| outputs=[cli_output, action_button] | |
| ) | |
| action_button.click( | |
| handle_action_button, | |
| inputs=[ | |
| action_button, source_image, target_video, frame_processor, | |
| face_analyser_direction, face_recognition, face_analyser_gender, | |
| face_analyser_age, skip_audio, keep_fps, lip_syncer_model, | |
| enable_lip_sync, use_folder_mode, gpu_selection | |
| ], | |
| outputs=[ | |
| source_image, target_video, frame_processor, face_analyser_direction, | |
| face_recognition, face_analyser_gender, face_analyser_age, | |
| skip_audio, keep_fps, lip_syncer_model, enable_lip_sync, | |
| use_folder_mode, gpu_selection, cli_output, action_button | |
| ] | |
| ) | |
| download_button.click( | |
| handle_download, | |
| outputs=[download_file, download_status, download_file, download_button] | |
| ) | |
| download_file.change( | |
| lambda: (gr.update(visible=False), gr.update(visible=True), "Ready for next download"), | |
| outputs=[download_file, download_button, download_status] | |
| ) | |
| except Exception as e: | |
| print(f"⚠️ Error setting up event handlers: {e}") | |
| return interface | |
| # Launch application | |
if __name__ == "__main__":
    # Script entry point: print a startup banner, clear leftover temp
    # files, then build the Gradio interface and start serving it.
    rule = "=" * 60
    moviepy_mark = '✅' if MOVIEPY_AVAILABLE else '❌'
    enhancement_mark = '✅' if ENHANCEMENT_AVAILABLE else '❌'

    # Each entry is emitted on its own line, in this order.
    banner = (
        "\n" + rule,
        "🎭 Advanced Face Swap Studio - HuggingFace Spaces",
        rule,
        "📁 Directories configured:",
        f" - Base: {BASE_DIR}",
        f" - Temp: {TEMP_DIR}",
        f" - Output: {OUTPUT_DIR}",
        f" - Convert: {CONVERT_DIR}",
        f"🖥️ GPU Support: {torch.cuda.is_available()}",
        f"🎬 MoviePy: {moviepy_mark}",
        f"✨ Enhancement: {enhancement_mark}",
        rule,
    )
    print(*banner, sep="\n")

    # Clean startup
    cleanup_temp_files()

    # Server settings passed to launch(): bind on all interfaces at port
    # 7860, no public share link, debug mode off, errors surfaced in the UI,
    # and a "1500mb" file size limit.
    launch_options = {
        "server_name": "0.0.0.0",
        "server_port": 7860,
        "share": False,
        "debug": False,
        "show_error": True,
        "max_file_size": "1500mb",
    }

    # Interface construction and launch share one guard so that a failure
    # in either step is reported with the same message instead of a raw
    # traceback.
    try:
        create_interface().launch(**launch_options)
    except Exception as launch_error:
        print(f"❌ Failed to launch application: {launch_error}")
        print("🔄 Please check your dependencies and try again")