# =========================
# app.py (Fixed Version - No More Lip Glitches)
# =========================
import os

# Streamlit server tweaks (safe on HF Spaces / containers)
os.environ["STREAMLIT_SERVER_ENABLECORS"] = "false"
os.environ["STREAMLIT_SERVER_ENABLEWEBSOCKETCOMPRESSION"] = "false"

import streamlit as st
import numpy as np
import cv2
import tempfile
import traceback
from PIL import Image
import io
import time
from pathlib import Path

# -------------------------
# VERY EARLY: initialize session state
# -------------------------
for key, default in {
    "uploaded_image": None,
    "uploaded_video": None,
    "uploaded_target_image": None,
    "output_video": None,
    "output_image": None,
    "mode": "video",
    "processed_files": {},
}.items():
    if key not in st.session_state:
        st.session_state[key] = default


# -------------------------
# GPU check
# -------------------------
def _has_cuda():
    """Return True when torch is importable and reports a usable CUDA device."""
    try:
        import torch

        return torch.cuda.is_available()
    except Exception:
        return False


# -----------------------------------
# Page & Sidebar
# -----------------------------------
st.set_page_config(page_title="Face Swapper", layout="wide")
st.title("🎭 Savvy Face Swapper")

# Create main columns for layout
main_col1, main_col2 = st.columns([1, 2])

with main_col1:
    st.sidebar.title("⚙️ Settings")

    # Mode selection in sidebar for better visibility
    mode = st.sidebar.radio("Select Mode:", ["Video", "Image"], horizontal=True)
    st.session_state.mode = mode.lower()

    # Processing options
    proc_res = st.sidebar.selectbox(
        "Processing Resolution",
        ["Original", "720p", "480p"],
        index=1,
        help="Frames are resized before detection/swap. Lower = faster.",
    )

    # Face blending percentage
    face_blend_percent = st.sidebar.slider(
        "Face Blending Percentage",
        min_value=0,
        max_value=100,
        value=100,
        help="Control how much the swapped face blends with the original",
    )

    # Face selection method
    face_selection_method = st.sidebar.selectbox(
        "Face Selection Method",
        ["Largest", "Most Central", "Highest Confidence"],
        index=0,
        help="Method for selecting which face to use from the source image",
    )

    # NEW: Lip-sync specific settings
    st.sidebar.markdown("---")
    st.sidebar.subheader("🎭 Lip-Sync Optimization")
    lip_sync_enabled = st.sidebar.checkbox(
        "Enable Lip-Sync Mode",
        value=True,
        help="Reduces glitches and blurriness in mouth movements",
    )
    mouth_mask_strength = st.sidebar.slider(
        "Mouth Mask Strength",
        min_value=0,
        max_value=100,
        value=80,
        help="How strongly to protect the mouth region from artifacts",
    )
    frame_consistency = st.sidebar.slider(
        "Frame Consistency",
        min_value=0,
        max_value=100,
        value=70,
        help="Maintain consistency between frames for smoother video",
    )

    # For video mode only
    if st.session_state.mode == "video":
        fps_cap = st.sidebar.selectbox(
            "Target FPS",
            ["Original", "24", "15", "10"],
            index=0,
            help="Lower target FPS drops frames during processing for speed.",
        )
        keep_original_res = st.sidebar.checkbox(
            "Keep original output resolution",
            value=False,
            help="If enabled, processed frames are upscaled back to the input size.",
        )
        output_quality = st.sidebar.selectbox(
            "Output Quality",
            ["High", "Medium", "Low"],
            index=0,
            help="Controls the video encoding quality",
        )

    # Limit faces per frame
    max_faces = st.sidebar.slider(
        "Max faces per frame",
        min_value=1,
        max_value=8,
        value=4,
        help="At most this many faces will be swapped per frame.",
    )


# -------------------------
# Model loading (cached)
# -------------------------
@st.cache_resource(show_spinner=True)
def load_models():
    """Load the insightface detector and inswapper model once per process.

    Returns:
        (FaceAnalysis app, swapper model, ORT provider list, ctx_id) where
        ctx_id is 0 for GPU and -1 for CPU.

    Raises:
        RuntimeError: if the inswapper model cannot be loaded.
    """
    import insightface
    from insightface.app import FaceAnalysis

    wants_cuda = _has_cuda()
    providers = (
        ["CUDAExecutionProvider", "CPUExecutionProvider"]
        if wants_cuda
        else ["CPUExecutionProvider"]
    )
    ctx_id = 0 if wants_cuda else -1

    app = FaceAnalysis(name="buffalo_l")
    app.prepare(ctx_id=ctx_id, det_size=(640, 640))

    swapper = None
    try:
        swapper = insightface.model_zoo.get_model(
            "inswapper_128.onnx", download=True, download_zip=False, providers=providers
        )
    except TypeError:
        # Older insightface versions don't accept a `providers` kwarg.
        swapper = insightface.model_zoo.get_model(
            "inswapper_128.onnx", download=True, download_zip=False
        )
    except Exception as e:
        raise RuntimeError(f"Failed to load inswapper_128.onnx: {e}")
    return app, swapper, providers, ctx_id


# Initialize models
with st.spinner("Loading models…"):
    try:
        app, swapper, providers, ctx_id = load_models()
    except Exception as e:
        st.error("❌ Model loading failed. See logs for details.")
        st.error(str(e))
        st.stop()

st.caption(
    f"Device: {'GPU (CUDA)' if ctx_id == 0 else 'CPU'} • ORT Providers: {', '.join(providers)}"
)


# -------------------------
# Helpers
# -------------------------
def _target_size_for_height(width, height, target_h):
    """Scale (width, height) so height becomes target_h, preserving aspect ratio."""
    if target_h <= 0 or height == 0:
        return width, height
    scale = target_h / float(height)
    new_w = max(1, int(round(width * scale)))
    new_h = max(1, int(round(height * scale)))
    return new_w, new_h


def _get_proc_size_choice(orig_w, orig_h, choice):
    """Map the sidebar resolution choice to a concrete processing (w, h)."""
    if choice == "720p":
        return _target_size_for_height(orig_w, orig_h, 720)
    if choice == "480p":
        return _target_size_for_height(orig_w, orig_h, 480)
    return orig_w, orig_h


def _parse_fps_cap(original_fps, cap_choice):
    """Return (write_fps, frame_step) for the selected FPS cap.

    frame_step is how many source frames advance per written frame;
    write_fps is the resulting output FPS. Falls back to the original
    FPS (or 25.0 when unknown) with step 1 on any parse problem.
    """
    if not original_fps or original_fps <= 0:
        original_fps = 25.0
    if cap_choice == "Original":
        return max(1.0, original_fps), 1
    try:
        tgt = float(cap_choice)
        tgt = max(1.0, tgt)
        step = max(1, int(round(original_fps / tgt)))
        write_fps = max(1.0, original_fps / step)
        return write_fps, step
    except Exception:
        return max(1.0, original_fps), 1


def _safe_imdecode(file_bytes):
    """Decode raw image bytes to a BGR ndarray; returns None on failure."""
    arr = np.frombuffer(file_bytes, np.uint8)
    img = cv2.imdecode(arr, cv2.IMREAD_COLOR)
    return img


def _cv2_to_pil(image):
    """Convert a BGR OpenCV image to a PIL RGB image."""
    image_rgb = cv2.cvtColor(image, cv2.COLOR_BGR2RGB)
    return Image.fromarray(image_rgb)


def _pil_to_cv2(image):
    """Convert a PIL RGB image to a BGR OpenCV image."""
    return cv2.cvtColor(np.array(image), cv2.COLOR_RGB2BGR)


def _blend_faces(original_face, swapped_face, blend_percent, mouth_mask=None):
    """Blend between original and swapped faces with optional mouth protection.

    NOTE(review): when blend_percent == 100 the mouth mask is bypassed
    entirely (the raw swapped face is returned) — kept as-is since 100 is
    the slider default; confirm this is the intended semantics.
    """
    if blend_percent == 100:
        return swapped_face

    # Ensure both images have the same dimensions before mixing.
    if original_face.shape != swapped_face.shape:
        swapped_face = cv2.resize(
            swapped_face, (original_face.shape[1], original_face.shape[0])
        )

    alpha = blend_percent / 100.0

    if mouth_mask is not None:
        # Ensure mask matches dimensions
        if mouth_mask.shape[:2] != original_face.shape[:2]:
            mouth_mask = cv2.resize(
                mouth_mask, (original_face.shape[1], original_face.shape[0])
            )
        # Normalize mask to 0-1 and broadcast to 3 channels.
        mouth_mask_float = mouth_mask.astype(np.float32) / 255.0
        if len(mouth_mask_float.shape) == 2:
            mouth_mask_float = np.repeat(mouth_mask_float[:, :, np.newaxis], 3, axis=2)
        # Masked mix first, then global alpha blend toward the original.
        blended = swapped_face * mouth_mask_float + original_face * (1 - mouth_mask_float)
        blended = blended * alpha + original_face * (1 - alpha)
        return blended.astype(np.uint8)
    else:
        # Standard blending if no mouth mask
        return cv2.addWeighted(swapped_face, alpha, original_face, 1 - alpha, 0)


def _create_mouth_mask(face_landmarks, image_shape, strength=80, offset=(0, 0)):
    """Create a soft 8-bit mask over the mouth region of a detected face.

    Args:
        face_landmarks: insightface face object exposing `landmark_2d_106`
            (coordinates in the FULL frame the face was detected on).
        image_shape: shape of the image/crop the mask is built for.
        strength: 0-100 scale factor applied to the mask intensity.
        offset: (x, y) subtracted from the landmark coordinates so the mask
            can be built in a cropped region's coordinate frame. Defaults to
            (0, 0), i.e. full-frame coordinates (backward compatible).

    Returns:
        uint8 mask of image_shape[:2], or None when landmarks are unavailable.
    """
    if not hasattr(face_landmarks, 'landmark_2d_106'):
        return None
    landmarks = face_landmarks.landmark_2d_106
    if landmarks is None or len(landmarks) < 106:
        return None

    # NOTE(review): indices 48-67 are the lip outline in the classic 68-point
    # convention; the 106-point model's lips live at different indices —
    # confirm against the insightface 2d106det landmark map.
    mouth_indices = list(range(48, 68))  # Lips outline
    if len(landmarks) < 68:
        return None

    mask = np.zeros(image_shape[:2], dtype=np.uint8)

    # FIX: translate landmarks into the target (possibly cropped) frame.
    # Previously full-frame coordinates were drawn into a crop-sized mask,
    # placing the hull in the wrong spot (often outside the crop).
    ox, oy = offset
    mouth_points = np.array(
        [(landmarks[i][0] - ox, landmarks[i][1] - oy) for i in mouth_indices],
        dtype=np.int32,
    )
    if len(mouth_points) > 2:
        hull = cv2.convexHull(mouth_points)
        cv2.fillPoly(mask, [hull], 255)

    # Apply Gaussian blur for smooth edges
    mask = cv2.GaussianBlur(mask, (21, 21), 0)
    # Adjust based on strength parameter
    mask = np.clip(mask * (strength / 100.0), 0, 255).astype(np.uint8)
    return mask


def _select_face(faces, method, image_shape=None):
    """Pick one face from a detection list by the configured strategy."""
    if not faces:
        return None
    if method == "Largest":
        return max(
            faces,
            key=lambda f: (f.bbox[2] - f.bbox[0]) * (f.bbox[3] - f.bbox[1]),
        )
    elif method == "Most Central":
        if image_shape is None:
            return faces[0]
        h, w = image_shape[:2]
        center_x, center_y = w / 2, h / 2
        return min(
            faces,
            key=lambda f: ((f.bbox[0] + f.bbox[2]) / 2 - center_x) ** 2
            + ((f.bbox[1] + f.bbox[3]) / 2 - center_y) ** 2,
        )
    elif method == "Highest Confidence":
        return max(faces, key=lambda f: f.det_score)
    else:
        return faces[0]


# -------------------------------------
# Core: FIXED face swap functions
# -------------------------------------
def _swap_faces_on_frame(
    frame_bgr,
    target_faces,
    source_face,
    blend_percent,
    lip_sync_enabled,
    mouth_mask_strength,
    pad=10,
    report_errors=False,
):
    """Swap `source_face` onto each face in `target_faces`; return a new frame.

    FIX: the swapper is run on the FULL frame with paste_back=True. The
    previous code passed a cropped region together with a face object whose
    bbox/keypoints were full-frame coordinates (breaking alignment) and used
    paste_back=False, which returns the raw 128x128 aligned face — at the
    default 100% blend that was assigned unresized into the region slice and
    raised a shape-mismatch error for every face.

    Args:
        frame_bgr: BGR frame to process (not mutated).
        target_faces: detected face objects in frame coordinates.
        source_face: face object supplying the identity.
        blend_percent: 0-100 mix between original and swapped region.
        lip_sync_enabled: build a mouth mask when landmarks are available.
        mouth_mask_strength: 0-100 intensity of the mouth mask.
        pad: pixels of padding added around each bbox region.
        report_errors: surface per-face failures via st.error (image path);
            the video path keeps failures silent as before.
    """
    result = frame_bgr.copy()
    for tface in target_faces:
        try:
            # Padded, clamped bounding box for region blending.
            x1, y1, x2, y2 = [int(coord) for coord in tface.bbox]
            x1, y1 = max(0, x1 - pad), max(0, y1 - pad)
            x2 = min(result.shape[1], x2 + pad)
            y2 = min(result.shape[0], y2 + pad)
            if x2 <= x1 or y2 <= y1:
                continue

            original_region = result[y1:y2, x1:x2].copy()

            # Mouth mask is built in region coordinates via the offset.
            mouth_mask = None
            if lip_sync_enabled and hasattr(tface, 'landmark_2d_106'):
                mouth_mask = _create_mouth_mask(
                    tface, original_region.shape, mouth_mask_strength, offset=(x1, y1)
                )

            # Swap on the full frame so keypoint alignment is correct.
            swapped_full = swapper.get(result, tface, source_face, paste_back=True)
            swapped_region = swapped_full[y1:y2, x1:x2]

            blended = _blend_faces(
                original_region, swapped_region, blend_percent, mouth_mask
            )
            result[y1:y2, x1:x2] = blended
        except Exception as swap_e:
            if report_errors:
                st.error(f"Face swap error: {swap_e}")
            continue
    return result


def swap_faces_in_image(
    source_image_bgr,
    target_image_bgr,
    proc_res,
    max_faces,
    blend_percent=100,
    face_selection="Largest",
    lip_sync_enabled=True,
    mouth_mask_strength=80,
):
    """Swap the selected source face onto every detected face of a still image.

    Returns a PIL image on success, the unmodified target (as PIL) when no
    target faces are found, or None when the source face cannot be obtained.
    """
    # Get source face
    try:
        source_faces = app.get(source_image_bgr)
    except Exception as e:
        st.error(f"❌ FaceAnalysis failed on source image: {e}")
        return None
    if not source_faces:
        st.error("❌ No face detected in the source image.")
        return None
    source_face = _select_face(source_faces, face_selection, source_image_bgr.shape)
    if source_face is None:
        st.error("❌ Could not select a face from the source image.")
        return None

    # Resize target image for processing
    orig_h, orig_w = target_image_bgr.shape[:2]
    proc_w, proc_h = _get_proc_size_choice(orig_w, orig_h, proc_res)
    if (proc_w, proc_h) != (orig_w, orig_h):
        target_image_proc = cv2.resize(
            target_image_bgr, (proc_w, proc_h), interpolation=cv2.INTER_AREA
        )
    else:
        target_image_proc = target_image_bgr.copy()

    try:
        # Detect faces on target image
        try:
            target_faces = app.get(target_image_proc)
        except Exception as det_e:
            st.error(f"[ERROR] Detection failed on target image: {det_e}")
            target_faces = []
        if not target_faces:
            st.warning("⚠️ No faces detected in the target image.")
            return _cv2_to_pil(target_image_bgr)

        # Largest first, drop low-confidence detections, cap at max_faces.
        target_faces = sorted(
            target_faces,
            key=lambda f: (f.bbox[2] - f.bbox[0]) * (f.bbox[3] - f.bbox[1]),
            reverse=True,
        )
        target_faces = [f for f in target_faces if f.det_score > 0.5][:max_faces]

        result_image = _swap_faces_on_frame(
            target_image_proc,
            target_faces,
            source_face,
            blend_percent,
            lip_sync_enabled,
            mouth_mask_strength,
            pad=10,
            report_errors=True,
        )

        # Resize back to original if needed
        if (proc_w, proc_h) != (orig_w, orig_h):
            result_image = cv2.resize(
                result_image, (orig_w, orig_h), interpolation=cv2.INTER_CUBIC
            )
        return _cv2_to_pil(result_image)
    except Exception as e:
        st.error(f"❌ Error processing image: {e}")
        traceback.print_exc()
        return _cv2_to_pil(target_image_bgr)


def swap_faces_in_video(
    image_bgr,
    video_path,
    proc_res,
    fps_cap,
    keep_original_res,
    max_faces,
    blend_percent,
    face_selection,
    output_quality,
    progress,
    lip_sync_enabled=True,
    mouth_mask_strength=80,
    frame_consistency=70,
):
    """Swap the source face onto every (kept) frame of a video.

    Writes an mp4 to a temp file and returns its path, or None on failure.
    `progress` is a st.progress widget updated per source frame.

    NOTE(review): `output_quality` and `frame_consistency` are accepted for
    interface stability but are not currently applied to encoding/processing
    (same as the original implementation).
    """
    # Get source face
    try:
        source_faces = app.get(image_bgr)
    except Exception as e:
        st.error(f"❌ FaceAnalysis failed on source image: {e}")
        return None
    if not source_faces:
        st.error("❌ No face detected in the source image.")
        return None
    source_face = _select_face(source_faces, face_selection, image_bgr.shape)
    if source_face is None:
        st.error("❌ Could not select a face from the source image.")
        return None

    # Open video
    cap = cv2.VideoCapture(video_path)
    if not cap.isOpened():
        st.error("❌ Could not open the uploaded video.")
        return None

    # Read properties (FPS can be 0/NaN on some containers; default to 25).
    frame_count = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))
    orig_w = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
    orig_h = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
    orig_fps = float(cap.get(cv2.CAP_PROP_FPS))
    if orig_fps <= 0 or np.isnan(orig_fps):
        orig_fps = 25.0

    # Decide processing size & FPS
    proc_w, proc_h = _get_proc_size_choice(orig_w, orig_h, proc_res)
    write_fps, frame_step = _parse_fps_cap(orig_fps, fps_cap)
    out_w, out_h = (orig_w, orig_h) if keep_original_res else (proc_w, proc_h)

    # Prepare output writer
    with tempfile.NamedTemporaryFile(delete=False, suffix=".mp4") as tmp_out:
        output_path = tmp_out.name
    fourcc = cv2.VideoWriter_fourcc(*"mp4v")
    out = cv2.VideoWriter(output_path, fourcc, write_fps, (out_w, out_h))
    if not out.isOpened():
        cap.release()
        st.error("❌ Failed to open VideoWriter.")
        return None

    st.info(
        f"Processing: {proc_w}×{proc_h} | Output: {out_w}×{out_h} @ {write_fps:.2f} fps | "
        f"Frame step: {frame_step} | Blend: {blend_percent}% | Lip-sync: {'ON' if lip_sync_enabled else 'OFF'}"
    )

    # Process loop
    read_idx = 0
    processed_frames = 0
    try:
        while True:
            ret, frame = cap.read()
            if not ret:
                break

            # Skip frames for FPS cap
            if frame_step > 1 and (read_idx % frame_step != 0):
                read_idx += 1
                if frame_count > 0:
                    progress.progress(min(1.0, read_idx / frame_count))
                continue

            # Resize for processing
            if (proc_w, proc_h) != (orig_w, orig_h):
                proc_frame = cv2.resize(
                    frame, (proc_w, proc_h), interpolation=cv2.INTER_AREA
                )
            else:
                proc_frame = frame

            try:
                # Detect faces (failure = treat frame as face-free)
                try:
                    target_faces = app.get(proc_frame)
                except Exception:
                    target_faces = []

                # Quality filtering (stricter threshold than image mode)
                target_faces = [f for f in target_faces if f.det_score > 0.6]
                # Limit faces, largest first
                if target_faces:
                    target_faces = sorted(
                        target_faces,
                        key=lambda f: (f.bbox[2] - f.bbox[0]) * (f.bbox[3] - f.bbox[1]),
                        reverse=True,
                    )[:max_faces]

                result_frame = _swap_faces_on_frame(
                    proc_frame,
                    target_faces,
                    source_face,
                    blend_percent,
                    lip_sync_enabled,
                    mouth_mask_strength,
                    pad=15,
                    report_errors=False,
                )

                # Upscale if needed
                if keep_original_res and (proc_w, proc_h) != (orig_w, orig_h):
                    result_frame = cv2.resize(
                        result_frame, (orig_w, orig_h), interpolation=cv2.INTER_CUBIC
                    )
                out.write(result_frame)
            except Exception:
                # Fallback to original frame so the output stays in sync.
                fallback = proc_frame
                if keep_original_res and (proc_w, proc_h) != (orig_w, orig_h):
                    fallback = cv2.resize(
                        proc_frame, (orig_w, orig_h), interpolation=cv2.INTER_CUBIC
                    )
                out.write(fallback)

            read_idx += 1
            processed_frames += 1
            # Update progress
            if frame_count > 0:
                progress.progress(min(1.0, read_idx / frame_count))
    except Exception as e:
        st.error(f"❌ Error during video processing: {e}")
        traceback.print_exc()
    finally:
        cap.release()
        out.release()

    return output_path


# -------------------------
# UI: Improved layout
# -------------------------
with main_col2:
    st.header("Upload Files")

    # Create two columns for uploaders
    upload_col1, upload_col2 = st.columns(2)
    with upload_col1:
        st.subheader("Source Image")
        image_file = st.file_uploader(
            "Upload face image",
            type=["jpg", "jpeg", "png"],
            label_visibility="collapsed",
        )
    with upload_col2:
        st.subheader("Target Content")
        if st.session_state.mode == "video":
            target_file = st.file_uploader(
                "Upload video",
                type=["mp4", "mov", "mkv", "avi"],
                label_visibility="collapsed",
            )
        else:
            target_file = st.file_uploader(
                "Upload image",
                type=["jpg", "jpeg", "png"],
                label_visibility="collapsed",
            )

    # Preview section
    if image_file or target_file:
        st.header("Preview")
        preview_col1, preview_col2 = st.columns(2)
        with preview_col1:
            if image_file:
                st.image(image_file, caption="Source Image", use_container_width=True)
        with preview_col2:
            if target_file:
                if st.session_state.mode == "video":
                    st.video(target_file)
                else:
                    st.image(
                        target_file, caption="Target Image", use_container_width=True
                    )

    # Process button - larger and more prominent
    if image_file and target_file:
        st.markdown("---")
        if st.button("🚀 START FACE SWAPPING", use_container_width=True, type="primary"):
            # Read source image
            try:
                image_bytes = image_file.getvalue()
                source_image = _safe_imdecode(image_bytes)
                if source_image is None:
                    st.error("❌ Failed to decode source image.")
                    st.stop()
            except Exception as e:
                st.error(f"❌ Failed to read source image: {e}")
                st.stop()

            if st.session_state.mode == "video":
                # Process video: stage the upload to a temp file for OpenCV.
                try:
                    video_bytes = target_file.getvalue()
                    with tempfile.NamedTemporaryFile(
                        delete=False, suffix=".mp4"
                    ) as tmp_video:
                        tmp_video.write(video_bytes)
                        tmp_video_path = tmp_video.name
                except Exception as e:
                    st.error(f"❌ Failed to save video: {e}")
                    st.stop()

                with st.spinner("Processing video… This may take several minutes ⏳"):
                    progress_bar = st.progress(0)
                    output_path = swap_faces_in_video(
                        source_image,
                        tmp_video_path,
                        proc_res,
                        fps_cap,
                        keep_original_res,
                        max_faces,
                        face_blend_percent,
                        face_selection_method,
                        output_quality,
                        progress_bar,
                        lip_sync_enabled,
                        mouth_mask_strength,
                        frame_consistency,
                    )

                if output_path:
                    st.success("✅ Face swapping completed!")
                    # Store output so it survives reruns
                    file_id = f"video_{int(time.time())}"
                    st.session_state.processed_files[file_id] = output_path

                    st.header("Output Video")
                    st.video(output_path)

                    # Download button
                    try:
                        with open(output_path, "rb") as f:
                            st.download_button(
                                label="⬇️ DOWNLOAD VIDEO",
                                data=f,
                                file_name="swapped_video.mp4",
                                mime="video/mp4",
                                use_container_width=True,
                            )
                    except Exception as e:
                        st.warning(f"⚠️ Download error: {e}")

                # Cleanup the staged upload (best effort)
                try:
                    os.remove(tmp_video_path)
                except Exception:
                    pass
            else:
                # Process image
                try:
                    target_bytes = target_file.getvalue()
                    target_image = _safe_imdecode(target_bytes)
                    if target_image is None:
                        st.error("❌ Failed to decode target image.")
                        st.stop()
                except Exception as e:
                    st.error(f"❌ Failed to read target image: {e}")
                    st.stop()

                with st.spinner("Processing image…"):
                    result_image = swap_faces_in_image(
                        source_image,
                        target_image,
                        proc_res,
                        max_faces,
                        face_blend_percent,
                        face_selection_method,
                        lip_sync_enabled,
                        mouth_mask_strength,
                    )

                if result_image:
                    st.success("✅ Face swapping completed!")
                    # Store output bytes so it survives reruns
                    buf = io.BytesIO()
                    result_image.save(buf, format="JPEG")
                    byte_im = buf.getvalue()
                    file_id = f"image_{int(time.time())}"
                    st.session_state.processed_files[file_id] = byte_im

                    st.header("Output Image")
                    st.image(result_image, caption="Result", use_container_width=True)

                    # Download button
                    st.download_button(
                        label="⬇️ DOWNLOAD IMAGE",
                        data=byte_im,
                        file_name="swapped_image.jpg",
                        mime="image/jpeg",
                        use_container_width=True,
                    )

# Previous results section
if st.session_state.processed_files:
    st.markdown("---")
    st.header("Previous Results")
    for file_id, file_data in list(st.session_state.processed_files.items()):
        if file_id.startswith("video_") and os.path.exists(file_data):
            try:
                st.video(file_data)
                with open(file_data, "rb") as f:
                    st.download_button(
                        label="⬇️ Download Previous Video",
                        data=f,
                        file_name="previous_swapped_video.mp4",
                        mime="video/mp4",
                        key=f"prev_vid_{file_id}",
                    )
            except Exception as e:
                st.warning(f"Could not load previous video: {e}")
        elif file_id.startswith("image_"):
            try:
                st.image(file_data, caption="Previous Result", use_container_width=True)
                st.download_button(
                    label="⬇️ Download Previous Image",
                    data=file_data,
                    file_name="previous_swapped_image.jpg",
                    mime="image/jpeg",
                    key=f"prev_img_{file_id}",
                )
            except Exception as e:
                st.warning(f"Could not load previous image: {e}")

# -------------
# Diagnostics
# -------------
with st.expander("🩺 Diagnostics"):
    st.write(
        "- If you see errors: try different source/target images with clear faces\n"
        "- For better results: use high-quality images with front-facing faces\n"
        "- If processing is slow: reduce resolution or target FPS\n"
        "- For videos: use MP4 format with H.264 encoding\n"
        "- For best lip-sync: enable lip-sync mode and adjust mouth mask strength"
    )