Spaces:
Build error
Build error
| # -*- coding: UTF-8 -*- | |
| #!/usr/bin/env python | |
| import os | |
| import tempfile | |
| import numpy as np | |
| import gradio as gr | |
| from PIL import Image | |
| import roop.globals | |
| from roop.core import ( | |
| start, | |
| decode_execution_providers, | |
| suggest_execution_threads, | |
| ) | |
| from roop.processors.frame.core import get_frame_processors_modules | |
| from roop.utilities import normalize_output_path | |
| import threading | |
| import time | |
| import traceback | |
| # Global lock to prevent concurrent processing | |
| processing_lock = threading.Lock() | |
| def optimize_memory_settings(): | |
| """Optimize memory usage for 14GB RAM environment""" | |
| # For 14GB RAM, allocate max 12GB to avoid OOM | |
| roop.globals.max_memory = 12 # Changed from 16 to 12GB for safety | |
| # For 2 vCPU, limit threads to 2 | |
| roop.globals.execution_threads = 2 | |
| # Optimize video encoding settings for faster processing | |
| roop.globals.video_encoder = "libx264" | |
| roop.globals.video_quality = 23 # Slightly lower quality for faster processing | |
| # Use CPU execution for stability on limited resources | |
| roop.globals.execution_providers = decode_execution_providers(['cpu']) | |
| # Optimize face detection settings | |
| roop.globals.similar_face_distance = 0.85 | |
| roop.globals.reference_face_position = 0 | |
| roop.globals.many_faces = False | |
| def validate_inputs(source_img, target_img): | |
| """Validate input images before processing""" | |
| if source_img is None or target_img is None: | |
| raise ValueError("Please provide both source and target images") | |
| # Convert to PIL Image if needed | |
| if isinstance(source_img, np.ndarray): | |
| source_img = Image.fromarray(source_img) | |
| if isinstance(target_img, np.ndarray): | |
| target_img = Image.fromarray(target_img) | |
| # Check image sizes and resize if too large (for memory optimization) | |
| max_size = (1024, 1024) # Maximum dimensions | |
| if source_img.size[0] > max_size[0] or source_img.size[1] > max_size[1]: | |
| source_img.thumbnail(max_size, Image.Resampling.LANCZOS) | |
| print(f"Resized source image to {source_img.size}") | |
| if target_img.size[0] > max_size[0] or target_img.size[1] > max_size[1]: | |
| target_img.thumbnail(max_size, Image.Resampling.LANCZOS) | |
| print(f"Resized target image to {target_img.size}") | |
| return source_img, target_img | |
| def swap_face(source_img, target_img, use_face_enhancer, progress=gr.Progress()): | |
| """ | |
| Main face swap function with progress tracking and memory optimization | |
| """ | |
| # Prevent concurrent processing | |
| if not processing_lock.acquire(blocking=False): | |
| raise RuntimeError("Another face swap is currently in progress. Please wait.") | |
| try: | |
| progress(0, desc="Validating inputs...") | |
| # Validate and preprocess inputs | |
| source_img, target_img = validate_inputs(source_img, target_img) | |
| # Create temporary directory for processing | |
| with tempfile.TemporaryDirectory() as temp_dir: | |
| source_path = os.path.join(temp_dir, "source.jpg") | |
| target_path = os.path.join(temp_dir, "target.jpg") | |
| output_path = os.path.join(temp_dir, "output.jpg") | |
| # Save images to temp files | |
| progress(0.1, desc="Saving images...") | |
| source_img.save(source_path, "JPEG", quality=95) | |
| target_img.save(target_path, "JPEG", quality=95) | |
| # Configure roop settings | |
| roop.globals.source_path = source_path | |
| roop.globals.target_path = target_path | |
| roop.globals.output_path = normalize_output_path( | |
| source_path, target_path, output_path | |
| ) | |
| # Set frame processors based on user selection | |
| if use_face_enhancer: | |
| roop.globals.frame_processors = ["face_swapper", "face_enhancer"] | |
| progress(0.2, desc="Configuring face swapper and enhancer...") | |
| else: | |
| roop.globals.frame_processors = ["face_swapper"] | |
| progress(0.2, desc="Configuring face swapper...") | |
| # Set optimization flags | |
| roop.globals.headless = True | |
| roop.globals.keep_fps = True | |
| roop.globals.keep_audio = True | |
| roop.globals.keep_frames = False | |
| # Apply memory-optimized settings | |
| optimize_memory_settings() | |
| # Pre-check frame processors | |
| progress(0.3, desc="Initializing processors...") | |
| for frame_processor in get_frame_processors_modules( | |
| roop.globals.frame_processors | |
| ): | |
| if not frame_processor.pre_check(): | |
| raise RuntimeError(f"Processor {frame_processor} failed pre-check") | |
| # Execute face swap | |
| progress(0.4, desc="Starting face swap...") | |
| # We need to simulate progress since roop doesn't have callbacks | |
| # Start processing in a separate thread to monitor progress | |
| def process_with_progress(): | |
| start() | |
| # Start processing | |
| process_thread = threading.Thread(target=process_with_progress) | |
| process_thread.start() | |
| # Simulate progress updates (since roop doesn't provide progress callbacks) | |
| for i in range(5): | |
| if process_thread.is_alive(): | |
| progress(0.5 + (i * 0.1), desc=f"Processing... step {i+1}/5") | |
| time.sleep(0.5) | |
| else: | |
| break | |
| # Wait for completion | |
| process_thread.join() | |
| progress(0.95, desc="Finalizing output...") | |
| # Check if output was created | |
| if not os.path.exists(roop.globals.output_path): | |
| # Try alternative output path | |
| output_path = os.path.join(os.path.dirname(target_path), | |
| "output", | |
| os.path.basename(target_path)) | |
| if not os.path.exists(output_path): | |
| raise FileNotFoundError("Output file was not created") | |
| # Load and return output image | |
| result_img = Image.open(roop.globals.output_path) | |
| # Convert to RGB if necessary | |
| if result_img.mode in ('RGBA', 'LA', 'P'): | |
| result_img = result_img.convert('RGB') | |
| progress(1.0, desc="Processing complete!") | |
| return result_img | |
| except Exception as e: | |
| error_msg = f"Error during face swap: {str(e)}\n\n{traceback.format_exc()}" | |
| print(error_msg) | |
| raise gr.Error(f"Processing failed: {str(e)}") | |
| finally: | |
| # Always release the lock | |
| processing_lock.release() | |
| # Create a more user-friendly UI | |
| css = """ | |
| .gradio-container { | |
| max-width: 1200px !important; | |
| } | |
| .thumbnail { | |
| border-radius: 10px; | |
| border: 2px solid #e0e0e0; | |
| } | |
| .thumbnail:hover { | |
| border-color: #4CAF50; | |
| } | |
| .output-image { | |
| border-radius: 10px; | |
| border: 3px solid #4CAF50; | |
| } | |
| """ | |
| with gr.Blocks(title="Face Swap Optimized", css=css) as app: | |
| gr.Markdown("# 🎭 Face Swap Optimized") | |
| gr.Markdown("### Optimized for 14GB RAM & 2 vCPU environment") | |
| with gr.Row(): | |
| with gr.Column(scale=1): | |
| gr.Markdown("### Source Face") | |
| source_image = gr.Image( | |
| label="Upload source face", | |
| type="numpy", | |
| elem_classes="thumbnail" | |
| ) | |
| with gr.Column(scale=1): | |
| gr.Markdown("### Target Image") | |
| target_image = gr.Image( | |
| label="Upload target image", | |
| type="numpy", | |
| elem_classes="thumbnail" | |
| ) | |
| with gr.Row(): | |
| with gr.Column(scale=1): | |
| gr.Markdown("### Processing Options") | |
| with gr.Group(): | |
| use_enhancer = gr.Checkbox( | |
| label="Enable Face Enhancer", | |
| value=False, | |
| info="Improves face quality but uses more resources" | |
| ) | |
| gr.Markdown("---") | |
| info_text = gr.Markdown(""" | |
| **Optimization Settings (Auto-configured):** | |
| - Max Memory: 12GB | |
| - Threads: 2 | |
| - Quality: Balanced | |
| - Processing: Single-face mode | |
| **Tips for best results:** | |
| 1. Use clear, front-facing face photos | |
| 2. Ensure good lighting in source image | |
| 3. Keep image sizes under 1024px | |
| 4. Process one image at a time | |
| """) | |
| with gr.Row(): | |
| process_btn = gr.Button( | |
| "🎭 Swap Faces", | |
| variant="primary", | |
| size="lg" | |
| ) | |
| clear_btn = gr.Button("Clear All", variant="secondary") | |
| with gr.Row(): | |
| gr.Markdown("### Result") | |
| output_image = gr.Image( | |
| label="Swapped Result", | |
| type="pil", | |
| elem_classes="output-image" | |
| ) | |
| # Examples for quick testing | |
| with gr.Accordion("📁 Example Images (Click to load)", open=False): | |
| gr.Markdown("Try these examples to test the system:") | |
| with gr.Row(): | |
| gr.Examples( | |
| examples=[ | |
| ["https://upload.wikimedia.org/wikipedia/commons/thumb/d/d8/Person_icon_BLACK-01.svg/1200px-Person_icon_BLACK-01.svg.png", | |
| "https://upload.wikimedia.org/wikipedia/commons/thumb/d/d8/Person_icon_BLACK-01.svg/1200px-Person_icon_BLACK-01.svg.png"], | |
| ], | |
| inputs=[source_image, target_image], | |
| label="Basic Examples" | |
| ) | |
| # Status indicator | |
| status = gr.Markdown("**Status:** Ready") | |
| # Process button click handler | |
| process_btn.click( | |
| fn=swap_face, | |
| inputs=[source_image, target_image, use_enhancer], | |
| outputs=output_image, | |
| api_name="swap_face" | |
| ).then( | |
| fn=lambda: "**Status:** Processing complete! ✅", | |
| outputs=status | |
| ) | |
| # Clear button functionality | |
| def clear_all(): | |
| return None, None, None, "**Status:** Cleared" | |
| clear_btn.click( | |
| fn=clear_all, | |
| outputs=[source_image, target_image, output_image, status] | |
| ) | |
| # Add loading state for process button | |
| process_btn.click( | |
| fn=lambda: "**Status:** Processing... ⏳", | |
| outputs=status | |
| ) | |
| if __name__ == "__main__": | |
| # Configure server for 2 vCPU | |
| app.queue( | |
| max_size=1, # Process one at a time due to memory constraints | |
| concurrency_count=1 # Only one concurrent process | |
| ).launch( | |
| server_name="0.0.0.0", | |
| server_port=7860, | |
| share=False, # Disable sharing to reduce load | |
| debug=False, # Disable debug for performance | |
| show_error=True, | |
| quiet=True # Reduce console output | |
| ) |