Spaces:
Sleeping
Sleeping
| """ | |
| Hugging Face Spaces - Face Swap App | |
| Gradio interface for image and video face swapping | |
| Supports: ZeroGPU, T4 GPU, and CPU-only environments | |
| """ | |
| import os | |
| import cv2 | |
| import numpy as np | |
| import gradio as gr | |
| import urllib.request | |
| import insightface | |
| from insightface.app import FaceAnalysis | |
| import tempfile | |
| import subprocess | |
# Detect the optional `spaces` package, which exists only on HF ZeroGPU Spaces.
try:
    import spaces
except ImportError:
    SPACES_AVAILABLE = False
    print("βΉοΈ Running without ZeroGPU (dedicated GPU or CPU mode)")
else:
    SPACES_AVAILABLE = True
    print("β HF Spaces module available - ZeroGPU mode enabled")
# Remote weights for the InsightFace "inswapper" face-swap model and the
# local filename it is cached under (working directory of the Space).
MODEL_URL = "https://huggingface.co/ezioruan/inswapper_128.onnx/resolve/main/inswapper_128.onnx"
MODEL_PATH = "inswapper_128.onnx"
# Download model if needed (do this at startup, it's just a file download —
# no GPU is required, so it is safe outside any @spaces.GPU context)
if not os.path.exists(MODEL_PATH):
    print("Downloading model...")
    urllib.request.urlretrieve(MODEL_URL, MODEL_PATH)
    print("Model downloaded!")
# Global model cache - will be initialized lazily by get_models()
_face_app = None  # insightface FaceAnalysis instance (detector + embedder)
_swapper = None  # inswapper ONNX model handle
_models_initialized = False  # guards against re-initialising on every call
def get_models():
    """Return the cached (face analyzer, swapper) pair, loading on first call.

    Under ZeroGPU this must execute inside a @spaces.GPU-decorated function
    so that CUDA is visible and the models pick up GPU acceleration.
    """
    global _face_app, _swapper, _models_initialized

    if not _models_initialized:
        print("Initializing models...")

        # Query onnxruntime for what execution providers this process can use.
        import onnxruntime as ort
        available = ort.get_available_providers()
        print(f"Available ONNX providers: {available}")

        cuda_ok = 'CUDAExecutionProvider' in available
        if cuda_ok:
            print("β Using CUDA GPU acceleration")
        else:
            print("Using CPU")
        providers = (
            ['CUDAExecutionProvider', 'CPUExecutionProvider']
            if cuda_ok
            else ['CPUExecutionProvider']
        )
        ctx_id = 0 if cuda_ok else -1  # insightface convention: -1 == CPU

        # Face detector / embedder, then the actual swapper model.
        _face_app = FaceAnalysis(name='buffalo_l', providers=providers)
        _face_app.prepare(ctx_id=ctx_id, det_size=(640, 640))
        _swapper = insightface.model_zoo.get_model(MODEL_PATH, providers=providers)

        _models_initialized = True
        print("Models loaded!")

    return _face_app, _swapper
# For non-ZeroGPU environments, initialize models at startup: there is no
# per-call GPU attach, so eager loading just makes the first request faster.
# Under ZeroGPU this is skipped — loading must happen inside @spaces.GPU.
if not SPACES_AVAILABLE:
    print("Pre-loading models for non-ZeroGPU environment...")
    get_models()
def _swap_faces_impl(source_image, target_image):
    """Swap the first face of `source_image` onto every face in `target_image`.

    Both inputs are RGB numpy arrays (or None). Returns a tuple of
    (result RGB array or None, human-readable status string).
    """
    if source_image is None or target_image is None:
        return None, "Please upload both images"

    # Get models (lazy initialization - uses GPU inside @spaces.GPU)
    analyzer, face_swapper = get_models()

    # Gradio delivers RGB; OpenCV/insightface expect BGR.
    src_bgr = cv2.cvtColor(source_image, cv2.COLOR_RGB2BGR)
    dst_bgr = cv2.cvtColor(target_image, cv2.COLOR_RGB2BGR)

    src_faces = analyzer.get(src_bgr)
    if not src_faces:
        return None, "No face detected in source image"

    dst_faces = analyzer.get(dst_bgr)
    if not dst_faces:
        return None, "No face detected in target image"

    # Paste the first detected source face onto each target face in turn.
    swapped = dst_bgr.copy()
    for face in dst_faces:
        swapped = face_swapper.get(swapped, face, src_faces[0], paste_back=True)

    return cv2.cvtColor(swapped, cv2.COLOR_BGR2RGB), "Face swap completed!"
# Apply @spaces.GPU decorator if available (for ZeroGPU).
# BUG FIX: previously both branches defined identical undecorated functions,
# so on ZeroGPU no GPU was ever attached and inference silently ran on CPU.
if SPACES_AVAILABLE:
    @spaces.GPU
    def swap_faces(source_image, target_image):
        """Swap face from source onto target image (GPU accelerated)."""
        return _swap_faces_impl(source_image, target_image)
else:
    def swap_faces(source_image, target_image):
        """Swap face from source onto target image (CPU / dedicated GPU)."""
        return _swap_faces_impl(source_image, target_image)
def _swap_faces_video_impl(source_image, target_video, progress_callback=None):
    """Swap the source face onto every face in each frame of a video.

    Args:
        source_image: RGB numpy array holding the face to transplant.
        target_video: path to the input video file.
        progress_callback: optional callable(fraction, desc=...) used to
            drive a UI progress bar; ignored when None.

    Returns:
        (output video path or None, human-readable status string).
    """
    if source_image is None:
        return None, "Please upload a source face image"
    if target_video is None:
        return None, "Please upload a target video"

    # Get models (lazy initialization - uses GPU inside @spaces.GPU)
    face_app, swapper = get_models()

    # Convert source to BGR and locate the face to transplant
    source_img = cv2.cvtColor(source_image, cv2.COLOR_RGB2BGR)
    source_faces = face_app.get(source_img)
    if not source_faces:
        return None, "No face detected in source image"
    source_face = source_faces[0]

    # Open video
    cap = cv2.VideoCapture(target_video)
    if not cap.isOpened():
        return None, "Could not open video file"

    fps = cap.get(cv2.CAP_PROP_FPS)
    if not fps or fps <= 0:
        # Some containers/streams report 0 (or NaN) fps; a VideoWriter built
        # with that produces a broken file. Fall back to a sane default.
        fps = 30.0
    width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
    height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
    total_frames = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))

    # Limit video length (prevent timeout)
    max_frames = 300  # ~10 seconds at 30fps
    if total_frames > max_frames:
        cap.release()
        return None, f"Video too long! Max {max_frames} frames (~10 seconds). Your video has {total_frames} frames."

    # Create temp output files (frames-only pass, then audio-muxed final)
    temp_dir = tempfile.mkdtemp()
    temp_output = os.path.join(temp_dir, "temp_output.mp4")
    final_output = os.path.join(temp_dir, "output.mp4")

    # Write video without audio first
    fourcc = cv2.VideoWriter_fourcc(*'mp4v')
    out = cv2.VideoWriter(temp_output, fourcc, fps, (width, height))
    if not out.isOpened():
        # Previously a failed writer was never detected and an empty/broken
        # file was returned; fail fast instead.
        cap.release()
        return None, "Could not create output video file"

    frame_count = 0
    faces_swapped = 0
    if progress_callback:
        progress_callback(0, desc="Processing video...")

    while cap.isOpened():
        ret, frame = cap.read()
        if not ret:
            break
        frame_count += 1

        # Detect faces in frame and swap each one
        target_faces = face_app.get(frame)
        if target_faces:
            result_frame = frame.copy()
            for target_face in target_faces:
                result_frame = swapper.get(result_frame, target_face, source_face, paste_back=True)
            faces_swapped += 1
        else:
            result_frame = frame
        out.write(result_frame)

        # Update progress (guard against a reported frame count of 0)
        if progress_callback:
            progress_callback(frame_count / max(total_frames, 1), desc=f"Frame {frame_count}/{total_frames}")

    cap.release()
    out.release()

    # Try to add audio back using ffmpeg
    if progress_callback:
        progress_callback(0.95, desc="Adding audio...")
    try:
        ffmpeg_cmd = [
            'ffmpeg', '-y',
            '-i', temp_output,   # Video without audio
            '-i', target_video,  # Original video (for audio)
            '-c:v', 'libx264',   # H.264 codec
            '-preset', 'fast',
            '-crf', '23',
            '-c:a', 'aac',       # AAC audio
            '-map', '0:v:0',     # Video from first input
            '-map', '1:a:0?',    # Audio from second input (optional)
            '-shortest',
            '-pix_fmt', 'yuv420p',
            '-movflags', '+faststart',
            final_output
        ]
        proc = subprocess.run(ffmpeg_cmd, capture_output=True, timeout=30)
        # Only trust the muxed file if ffmpeg actually succeeded: a failed run
        # can still leave a partial output.mp4 behind (the old existence-only
        # check would then return a corrupt video).
        if proc.returncode == 0 and os.path.exists(final_output):
            output_path = final_output
        else:
            output_path = temp_output
    except Exception as e:
        # Best-effort mux: fall back to the silent video on any ffmpeg failure.
        print(f"FFmpeg error: {e}")
        output_path = temp_output

    if progress_callback:
        progress_callback(1.0, desc="Complete!")
    return output_path, f"β Video processed! {faces_swapped}/{frame_count} frames had faces swapped."
# Apply @spaces.GPU decorator if available (for ZeroGPU).
# BUG FIX: as with swap_faces, the decorator was never actually applied, so
# ZeroGPU video runs stayed on CPU despite the "GPU accelerated" docstring.
if SPACES_AVAILABLE:
    @spaces.GPU
    def swap_faces_video(source_image, target_video, progress=gr.Progress()):
        """Swap faces in video (GPU accelerated)."""
        return _swap_faces_video_impl(source_image, target_video, progress)
else:
    def swap_faces_video(source_image, target_video, progress=gr.Progress()):
        """Swap faces in video (CPU / dedicated GPU)."""
        return _swap_faces_video_impl(source_image, target_video, progress)
# Create tabbed Gradio interface: one tab for still-image swaps, one for video.
# NOTE: component creation order defines the on-screen layout — do not reorder.
with gr.Blocks(title="π MVR Face Swap") as demo:
    gr.Markdown("# π MVR Face Swap\nSwap faces in images or videos using AI")
    with gr.Tabs():
        # Image Tab
        with gr.TabItem("π· Image Swap"):
            with gr.Row():
                with gr.Column():
                    # type="numpy" so the handlers receive RGB ndarrays directly.
                    img_source = gr.Image(label="Your Face (Source)", type="numpy")
                    img_target = gr.Image(label="Target Image", type="numpy")
                    img_btn = gr.Button("π Swap Faces", variant="primary")
                with gr.Column():
                    img_output = gr.Image(label="Result")
                    img_status = gr.Textbox(label="Status")
            # Wire the button to the (possibly GPU-decorated) image handler.
            img_btn.click(
                fn=swap_faces,
                inputs=[img_source, img_target],
                outputs=[img_output, img_status]
            )
        # Video Tab
        with gr.TabItem("π¬ Video Swap"):
            gr.Markdown("β οΈ **Note:** Videos are limited to ~10 seconds to avoid timeout. For longer videos, use the local Python app.")
            with gr.Row():
                with gr.Column():
                    vid_source = gr.Image(label="Your Face (Source)", type="numpy")
                    vid_target = gr.Video(label="Target Video")
                    vid_btn = gr.Button("π Swap Faces in Video", variant="primary")
                with gr.Column():
                    vid_output = gr.Video(label="Result Video")
                    vid_status = gr.Textbox(label="Status")
            # gr.Progress() on the handler surfaces the per-frame progress bar.
            vid_btn.click(
                fn=swap_faces_video,
                inputs=[vid_source, vid_target],
                outputs=[vid_output, vid_status]
            )
    gr.Markdown("---\n*Powered by InsightFace & Gradio*")

# Script guard: importing this module builds `demo` without starting a server.
if __name__ == "__main__":
    demo.launch()