FWC / app.py
FLODARELTIH's picture
Update app.py
16ffe68 verified
# -*- coding: UTF-8 -*-
#!/usr/bin/env python
import os
import tempfile
import numpy as np
import gradio as gr
from PIL import Image
import roop.globals
from roop.core import (
start,
decode_execution_providers,
suggest_execution_threads,
)
from roop.processors.frame.core import get_frame_processors_modules
from roop.utilities import normalize_output_path
import threading
import time
import traceback
# Global lock to prevent concurrent processing.
# swap_face() acquires it without blocking and releases it in a `finally` block, so a
# request that arrives while another swap is running fails immediately instead of waiting.
processing_lock = threading.Lock()
def optimize_memory_settings():
    """Apply the fixed roop settings this app uses on its 14GB RAM / 2 vCPU host.

    Writes resource limits, encoder options, the CPU execution provider and the
    face-selection options onto ``roop.globals``. Takes no arguments and
    returns ``None``; the only effect is the mutation of ``roop.globals``.
    """
    settings = roop.globals

    # Resource ceilings and encoder choices picked by the author for the
    # target host: 12 (GB) of memory, 2 threads, x264 at quality 23.
    resource_and_encoding = (
        ("max_memory", 12),
        ("execution_threads", 2),
        ("video_encoder", "libx264"),
        ("video_quality", 23),
    )
    for name, value in resource_and_encoding:
        setattr(settings, name, value)

    # CPU-only execution; the provider list is resolved by roop's own helper.
    settings.execution_providers = decode_execution_providers(['cpu'])

    # Face selection: single-face mode using the reference face at position 0.
    face_selection = (
        ("similar_face_distance", 0.85),
        ("reference_face_position", 0),
        ("many_faces", False),
    )
    for name, value in face_selection:
        setattr(settings, name, value)
def validate_inputs(source_img, target_img):
    """Check that both images are present and bring them into a usable form.

    NumPy arrays are wrapped into PIL images. Any image wider or taller than
    1024 pixels is shrunk in place with ``thumbnail`` (LANCZOS resampling) and
    the new size is printed.

    Returns:
        A ``(source, target)`` tuple of the prepared images.

    Raises:
        ValueError: If either argument is ``None``.
    """
    if source_img is None or target_img is None:
        raise ValueError("Please provide both source and target images")

    # First pass: wrap raw arrays so that everything below can rely on the
    # PIL interface (``size`` / ``thumbnail``).
    prepared = [
        Image.fromarray(candidate) if isinstance(candidate, np.ndarray) else candidate
        for candidate in (source_img, target_img)
    ]

    # Second pass: cap the dimensions to keep memory usage bounded.
    size_limit = (1024, 1024)
    for label, picture in zip(("source", "target"), prepared):
        too_wide = picture.size[0] > size_limit[0]
        too_tall = picture.size[1] > size_limit[1]
        if too_wide or too_tall:
            picture.thumbnail(size_limit, Image.Resampling.LANCZOS)
            print(f"Resized {label} image to {picture.size}")

    return prepared[0], prepared[1]
def swap_face(source_img, target_img, use_face_enhancer, progress=gr.Progress()):
    """
    Swap the face from ``source_img`` onto ``target_img`` using roop.

    The inputs are validated, written to a temporary directory as JPEG files,
    and roop's globals are pointed at those files. ``start()`` then runs on a
    worker thread while this function reports coarse, simulated progress.

    Args:
        source_img: Source face image (NumPy array or PIL image).
        target_img: Image whose face is replaced (NumPy array or PIL image).
        use_face_enhancer: When truthy, the "face_enhancer" processor runs
            after "face_swapper".
        progress: Gradio progress tracker used for status updates.

    Returns:
        The processed PIL image, fully loaded into memory.

    Raises:
        RuntimeError: If another swap currently holds ``processing_lock``.
        gr.Error: If validation, processing or loading the result fails.
    """
    # Non-blocking acquire: reject overlapping requests instead of queueing
    # them, because roop is configured through process-wide globals.
    if not processing_lock.acquire(blocking=False):
        raise RuntimeError("Another face swap is currently in progress. Please wait.")
    try:
        progress(0, desc="Validating inputs...")
        source_img, target_img = validate_inputs(source_img, target_img)

        with tempfile.TemporaryDirectory() as temp_dir:
            source_path = os.path.join(temp_dir, "source.jpg")
            target_path = os.path.join(temp_dir, "target.jpg")
            output_path = os.path.join(temp_dir, "output.jpg")

            progress(0.1, desc="Saving images...")
            _save_as_jpeg(source_img, source_path)
            _save_as_jpeg(target_img, target_path)

            # Point roop at the temporary files.
            roop.globals.source_path = source_path
            roop.globals.target_path = target_path
            roop.globals.output_path = normalize_output_path(
                source_path, target_path, output_path
            )

            # Select frame processors based on the user's choice.
            if use_face_enhancer:
                roop.globals.frame_processors = ["face_swapper", "face_enhancer"]
                progress(0.2, desc="Configuring face swapper and enhancer...")
            else:
                roop.globals.frame_processors = ["face_swapper"]
                progress(0.2, desc="Configuring face swapper...")

            roop.globals.headless = True
            roop.globals.keep_fps = True
            roop.globals.keep_audio = True
            roop.globals.keep_frames = False
            optimize_memory_settings()

            progress(0.3, desc="Initializing processors...")
            for frame_processor in get_frame_processors_modules(
                roop.globals.frame_processors
            ):
                if not frame_processor.pre_check():
                    raise RuntimeError(f"Processor {frame_processor} failed pre-check")

            progress(0.4, desc="Starting face swap...")
            _run_roop_with_progress(progress)

            progress(0.95, desc="Finalizing output...")
            # Must happen inside the `with` block: the file disappears together
            # with the temporary directory.
            result_img = _load_result_image(target_path)

        progress(1.0, desc="Processing complete!")
        return result_img
    except Exception as e:
        error_msg = f"Error during face swap: {str(e)}\n\n{traceback.format_exc()}"
        print(error_msg)
        # Chain the cause so the original traceback is not lost.
        raise gr.Error(f"Processing failed: {str(e)}") from e
    finally:
        # Always release the lock
        processing_lock.release()


def _save_as_jpeg(image, path):
    """Write ``image`` to ``path`` as a quality-95 JPEG."""
    # JPEG cannot store alpha or palette data; saving such an image would
    # raise, so convert it to RGB first. JPEG-compatible modes are untouched.
    if image.mode not in ("RGB", "L", "CMYK"):
        image = image.convert("RGB")
    image.save(path, "JPEG", quality=95)


def _run_roop_with_progress(progress):
    """Run roop's ``start()`` on a worker thread and emit simulated progress.

    roop exposes no progress callback here, so up to five coarse steps are
    reported while the worker is alive. An exception raised by ``start()`` is
    re-raised in the calling thread instead of being lost with the worker.
    """
    failures = []

    def worker():
        try:
            start()
        except Exception as exc:
            failures.append(exc)

    worker_thread = threading.Thread(target=worker)
    worker_thread.start()

    for step in range(5):
        if not worker_thread.is_alive():
            break
        progress(0.5 + (step * 0.1), desc=f"Processing... step {step + 1}/5")
        # join() with a timeout paces the updates like a sleep would, but
        # returns as soon as the worker finishes.
        worker_thread.join(timeout=0.5)

    # Wait for completion
    worker_thread.join()
    if failures:
        raise failures[0]


def _load_result_image(target_path):
    """Locate roop's output file and return it as an in-memory PIL image.

    Checks ``roop.globals.output_path`` first, then an ``output/`` folder next
    to ``target_path``.

    Raises:
        FileNotFoundError: If neither location contains a file.
    """
    result_path = roop.globals.output_path
    if not os.path.exists(result_path):
        # Try alternative output path
        result_path = os.path.join(
            os.path.dirname(target_path), "output", os.path.basename(target_path)
        )
        if not os.path.exists(result_path):
            raise FileNotFoundError("Output file was not created")

    # Image.open is lazy. convert()/copy() force the pixel data to be read and
    # return an image that no longer depends on the soon-to-be-deleted file.
    with Image.open(result_path) as opened:
        if opened.mode in ('RGBA', 'LA', 'P'):
            return opened.convert('RGB')
        return opened.copy()
# Create a more user-friendly UI
# Custom stylesheet passed to gr.Blocks(css=...) below. `.gradio-container` caps the
# page width; `.thumbnail` and `.output-image` are the class names attached through
# `elem_classes` to the two input images and the result image respectively.
css = """
.gradio-container {
max-width: 1200px !important;
}
.thumbnail {
border-radius: 10px;
border: 2px solid #e0e0e0;
}
.thumbnail:hover {
border-color: #4CAF50;
}
.output-image {
border-radius: 10px;
border: 3px solid #4CAF50;
}
"""
# Build the Blocks UI: two input images, processing options, action buttons,
# the result image, an examples accordion and a status line.
with gr.Blocks(title="Face Swap Optimized", css=css) as app:
    gr.Markdown("# 🎭 Face Swap Optimized")
    gr.Markdown("### Optimized for 14GB RAM & 2 vCPU environment")
    with gr.Row():
        with gr.Column(scale=1):
            gr.Markdown("### Source Face")
            source_image = gr.Image(
                label="Upload source face",
                type="numpy",
                elem_classes="thumbnail"
            )
        with gr.Column(scale=1):
            gr.Markdown("### Target Image")
            target_image = gr.Image(
                label="Upload target image",
                type="numpy",
                elem_classes="thumbnail"
            )
    with gr.Row():
        with gr.Column(scale=1):
            gr.Markdown("### Processing Options")
            with gr.Group():
                use_enhancer = gr.Checkbox(
                    label="Enable Face Enhancer",
                    value=False,
                    info="Improves face quality but uses more resources"
                )
            gr.Markdown("---")
            info_text = gr.Markdown("""
**Optimization Settings (Auto-configured):**
- Max Memory: 12GB
- Threads: 2
- Quality: Balanced
- Processing: Single-face mode
**Tips for best results:**
1. Use clear, front-facing face photos
2. Ensure good lighting in source image
3. Keep image sizes under 1024px
4. Process one image at a time
""")
    with gr.Row():
        process_btn = gr.Button(
            "🎭 Swap Faces",
            variant="primary",
            size="lg"
        )
        clear_btn = gr.Button("Clear All", variant="secondary")
    with gr.Row():
        gr.Markdown("### Result")
        output_image = gr.Image(
            label="Swapped Result",
            type="pil",
            elem_classes="output-image"
        )
    # Examples for quick testing
    with gr.Accordion("📁 Example Images (Click to load)", open=False):
        gr.Markdown("Try these examples to test the system:")
        with gr.Row():
            gr.Examples(
                examples=[
                    ["https://upload.wikimedia.org/wikipedia/commons/thumb/d/d8/Person_icon_BLACK-01.svg/1200px-Person_icon_BLACK-01.svg.png",
                     "https://upload.wikimedia.org/wikipedia/commons/thumb/d/d8/Person_icon_BLACK-01.svg/1200px-Person_icon_BLACK-01.svg.png"],
                ],
                inputs=[source_image, target_image],
                label="Basic Examples"
            )
    # Status indicator
    status = gr.Markdown("**Status:** Ready")

    # Process button: one ordered chain instead of two independent click
    # listeners, so the "Processing..." status is always written before the
    # swap starts and can never overwrite the final status afterwards.
    process_btn.click(
        fn=lambda: "**Status:** Processing... ⏳",
        outputs=status
    ).then(
        fn=swap_face,
        inputs=[source_image, target_image, use_enhancer],
        outputs=output_image,
        api_name="swap_face"
    ).success(
        # .success() only fires when swap_face did not raise, so a failed swap
        # is no longer reported as complete; the failure itself is surfaced
        # through the gr.Error raised by swap_face.
        fn=lambda: "**Status:** Processing complete! ✅",
        outputs=status
    )

    # Clear button functionality
    def clear_all():
        """Reset both inputs, the result image and the status line."""
        return None, None, None, "**Status:** Cleared"

    clear_btn.click(
        fn=clear_all,
        outputs=[source_image, target_image, output_image, status]
    )
if __name__ == "__main__":
    # Queue limits for the 2 vCPU host: a single queued request and a single
    # worker, so only one swap is ever in flight.
    queue_options = {
        "max_size": 1,
        "concurrency_count": 1,
    }
    # Server options: listen on all interfaces on port 7860, no public share
    # link, debug mode off, errors shown to the user, reduced console output.
    launch_options = {
        "server_name": "0.0.0.0",
        "server_port": 7860,
        "share": False,
        "debug": False,
        "show_error": True,
        "quiet": True,
    }
    app.queue(**queue_options).launch(**launch_options)