# SavvySwapper / app.py — commit f68e959 ("Update app.py" by savvy7007).
# (Hugging Face Spaces file-viewer header converted to a comment so the file parses.)
# =========================
# app.py (Fixed Version - No More Lip Glitches)
# =========================
import os
# Streamlit server tweaks (safe on HF Spaces / containers)
# These must be set before Streamlit starts serving.
os.environ["STREAMLIT_SERVER_ENABLECORS"] = "false"
os.environ["STREAMLIT_SERVER_ENABLEWEBSOCKETCOMPRESSION"] = "false"
import streamlit as st
import numpy as np
import cv2
import tempfile
import traceback
from PIL import Image
import io
import time
from pathlib import Path
# -------------------------
# VERY EARLY: initialize session state
# -------------------------
# Seed every key the app reads later, so reruns never hit a missing key.
_SESSION_DEFAULTS = {
    "uploaded_image": None,
    "uploaded_video": None,
    "uploaded_target_image": None,
    "output_video": None,
    "output_image": None,
    "mode": "video",
    "processed_files": {},
}
for _name, _fallback in _SESSION_DEFAULTS.items():
    if _name not in st.session_state:
        st.session_state[_name] = _fallback
# -------------------------
# GPU check
# -------------------------
def _has_cuda():
try:
import torch
return torch.cuda.is_available()
except Exception:
return False
# -----------------------------------
# Page & Sidebar
# -----------------------------------
st.set_page_config(page_title="Face Swapper", layout="wide")
st.title("🎭 Savvy Face Swapper")
# Create main columns for layout
main_col1, main_col2 = st.columns([1, 2])
with main_col1:
    # NOTE(review): every widget below targets st.sidebar, so the main_col1
    # container is not actually used for placement — confirm intent.
    st.sidebar.title("⚙️ Settings")
    # Mode selection in sidebar for better visibility
    mode = st.sidebar.radio("Select Mode:", ["Video", "Image"], horizontal=True)
    st.session_state.mode = mode.lower()
    # Processing options
    proc_res = st.sidebar.selectbox(
        "Processing Resolution",
        ["Original", "720p", "480p"],
        index=1,
        help="Frames are resized before detection/swap. Lower = faster."
    )
    # Face blending percentage (0 = keep original pixels, 100 = full swap)
    face_blend_percent = st.sidebar.slider(
        "Face Blending Percentage",
        min_value=0,
        max_value=100,
        value=100,
        help="Control how much the swapped face blends with the original"
    )
    # Face selection method
    face_selection_method = st.sidebar.selectbox(
        "Face Selection Method",
        ["Largest", "Most Central", "Highest Confidence"],
        index=0,
        help="Method for selecting which face to use from the source image"
    )
    # NEW: Lip-sync specific settings
    st.sidebar.markdown("---")
    st.sidebar.subheader("🎭 Lip-Sync Optimization")
    lip_sync_enabled = st.sidebar.checkbox(
        "Enable Lip-Sync Mode",
        value=True,
        help="Reduces glitches and blurriness in mouth movements"
    )
    mouth_mask_strength = st.sidebar.slider(
        "Mouth Mask Strength",
        min_value=0,
        max_value=100,
        value=80,
        help="How strongly to protect the mouth region from artifacts"
    )
    frame_consistency = st.sidebar.slider(
        "Frame Consistency",
        min_value=0,
        max_value=100,
        value=70,
        help="Maintain consistency between frames for smoother video"
    )
    # For video mode only — these names exist only when mode == "video";
    # the processing code below only reads them on the video path.
    if st.session_state.mode == "video":
        fps_cap = st.sidebar.selectbox(
            "Target FPS",
            ["Original", "24", "15", "10"],
            index=0,
            help="Lower target FPS drops frames during processing for speed."
        )
        keep_original_res = st.sidebar.checkbox(
            "Keep original output resolution",
            value=False,
            help="If enabled, processed frames are upscaled back to the input size."
        )
        output_quality = st.sidebar.selectbox(
            "Output Quality",
            ["High", "Medium", "Low"],
            index=0,
            help="Controls the video encoding quality"
        )
    # Limit faces per frame (used by both the image and video paths)
    max_faces = st.sidebar.slider(
        "Max faces per frame", min_value=1, max_value=8, value=4,
        help="At most this many faces will be swapped per frame."
    )
# -------------------------
# Model loading (cached)
# -------------------------
@st.cache_resource(show_spinner=True)
def load_models():
    """Load the InsightFace detector and the inswapper model once per process.

    Returns:
        (app, swapper, providers, ctx_id): a prepared FaceAnalysis instance,
        the inswapper_128 model, the requested ONNX Runtime providers, and
        the context id (0 = GPU, -1 = CPU).

    Raises:
        RuntimeError: if inswapper_128.onnx cannot be loaded.
    """
    import insightface
    from insightface.app import FaceAnalysis
    wants_cuda = _has_cuda()
    providers = ["CUDAExecutionProvider", "CPUExecutionProvider"] if wants_cuda else ["CPUExecutionProvider"]
    ctx_id = 0 if wants_cuda else -1
    # FIX: pass the providers to the detector too, so detection runs on the
    # same device as the swapper. Fall back for older insightface versions
    # whose FaceAnalysis() does not accept a ``providers`` keyword.
    try:
        app = FaceAnalysis(name="buffalo_l", providers=providers)
    except TypeError:
        app = FaceAnalysis(name="buffalo_l")
    app.prepare(ctx_id=ctx_id, det_size=(640, 640))
    swapper = None
    try:
        swapper = insightface.model_zoo.get_model(
            "inswapper_128.onnx",
            download=True,
            download_zip=False,
            providers=providers
        )
    except TypeError:
        # Older insightface: get_model() has no ``providers`` argument.
        swapper = insightface.model_zoo.get_model(
            "inswapper_128.onnx",
            download=True,
            download_zip=False
        )
    except Exception as e:
        raise RuntimeError(f"Failed to load inswapper_128.onnx: {e}") from e
    return app, swapper, providers, ctx_id
# Initialize models (st.stop() halts the script if loading fails)
with st.spinner("Loading models…"):
    try:
        app, swapper, providers, ctx_id = load_models()
    except Exception as e:
        st.error("❌ Model loading failed. See logs for details.")
        st.error(str(e))
        st.stop()
# Show which device / ONNX Runtime providers the models ended up with.
st.caption(
    f"Device: {'GPU (CUDA)' if ctx_id == 0 else 'CPU'} • ORT Providers: {', '.join(providers)}"
)
# -------------------------
# Helpers
# -------------------------
def _target_size_for_height(width, height, target_h):
if target_h <= 0 or height == 0:
return width, height
scale = target_h / float(height)
new_w = max(1, int(round(width * scale)))
new_h = max(1, int(round(height * scale)))
return new_w, new_h
def _get_proc_size_choice(orig_w, orig_h, choice):
if choice == "720p":
return _target_size_for_height(orig_w, orig_h, 720)
if choice == "480p":
return _target_size_for_height(orig_w, orig_h, 480)
return orig_w, orig_h
def _parse_fps_cap(original_fps, cap_choice):
if not original_fps or original_fps <= 0:
original_fps = 25.0
if cap_choice == "Original":
return max(1.0, original_fps), 1
try:
tgt = float(cap_choice)
tgt = max(1.0, tgt)
step = max(1, int(round(original_fps / tgt)))
write_fps = max(1.0, original_fps / step)
return write_fps, step
except Exception:
return max(1.0, original_fps), 1
def _safe_imdecode(file_bytes):
    """Decode raw image bytes into a BGR array; cv2 returns None when undecodable."""
    buffer = np.frombuffer(file_bytes, np.uint8)
    return cv2.imdecode(buffer, cv2.IMREAD_COLOR)
def _cv2_to_pil(image):
    """Convert a BGR OpenCV image into an RGB PIL Image."""
    return Image.fromarray(cv2.cvtColor(image, cv2.COLOR_BGR2RGB))
def _pil_to_cv2(image):
    """Convert an RGB PIL Image into a BGR OpenCV array."""
    rgb_array = np.array(image)
    return cv2.cvtColor(rgb_array, cv2.COLOR_RGB2BGR)
# Fixed face blending function
def _blend_faces(original_face, swapped_face, blend_percent, mouth_mask=None):
"""Blend between original and swapped faces with optional mouth protection"""
if blend_percent == 100:
return swapped_face
# Ensure both images have the same dimensions
if original_face.shape != swapped_face.shape:
swapped_face = cv2.resize(swapped_face, (original_face.shape[1], original_face.shape[0]))
alpha = blend_percent / 100.0
if mouth_mask is not None:
# Ensure mask matches dimensions
if mouth_mask.shape[:2] != original_face.shape[:2]:
mouth_mask = cv2.resize(mouth_mask, (original_face.shape[1], original_face.shape[0]))
# Normalize mask to 0-1 range
mouth_mask_float = mouth_mask.astype(np.float32) / 255.0
if len(mouth_mask_float.shape) == 2:
mouth_mask_float = np.repeat(mouth_mask_float[:, :, np.newaxis], 3, axis=2)
# Apply blending with mask
blended = swapped_face * mouth_mask_float + original_face * (1 - mouth_mask_float)
blended = blended * alpha + original_face * (1 - alpha)
return blended.astype(np.uint8)
else:
# Standard blending if no mouth mask
return cv2.addWeighted(swapped_face, alpha, original_face, 1 - alpha, 0)
# Create mouth mask from facial landmarks
def _create_mouth_mask(face_landmarks, image_shape, strength=80):
"""Create a mask focusing on the mouth region"""
if not hasattr(face_landmarks, 'landmark_2d_106'):
return None
landmarks = face_landmarks.landmark_2d_106
if landmarks is None or len(landmarks) < 106:
return None
# Mouth landmark indices (approximate for 106-point model)
mouth_indices = list(range(48, 68)) # Lips outline
if len(landmarks) < 68:
return None
mask = np.zeros(image_shape[:2], dtype=np.uint8)
# Create convex hull around mouth
mouth_points = np.array([landmarks[i] for i in mouth_indices], dtype=np.int32)
if len(mouth_points) > 2:
hull = cv2.convexHull(mouth_points)
cv2.fillPoly(mask, [hull], 255)
# Apply Gaussian blur for smooth edges
mask = cv2.GaussianBlur(mask, (21, 21), 0)
# Adjust based on strength parameter
mask = np.clip(mask * (strength / 100.0), 0, 255).astype(np.uint8)
return mask
# Face selection methods
def _select_face(faces, method, image_shape=None):
if not faces:
return None
if method == "Largest":
return max(faces, key=lambda f: (f.bbox[2]-f.bbox[0])*(f.bbox[3]-f.bbox[1]))
elif method == "Most Central":
if image_shape is None:
return faces[0]
h, w = image_shape[:2]
center_x, center_y = w / 2, h / 2
return min(faces, key=lambda f: ((f.bbox[0]+f.bbox[2])/2 - center_x)**2 +
((f.bbox[1]+f.bbox[3])/2 - center_y)**2)
elif method == "Highest Confidence":
return max(faces, key=lambda f: f.det_score)
else:
return faces[0]
# -------------------------------------
# Core: FIXED face swap functions
# -------------------------------------
def swap_faces_in_image(
    source_image_bgr, target_image_bgr, proc_res, max_faces,
    blend_percent=100, face_selection="Largest", lip_sync_enabled=True,
    mouth_mask_strength=80
):
    """Swap the selected source face onto detected faces in a target image.

    Returns a PIL Image (the unmodified target on soft failures such as "no
    target faces"), or None when no usable source face exists. Errors are
    reported through st.error/st.warning.
    """
    # Detect and select the face to copy from the source image.
    try:
        source_faces = app.get(source_image_bgr)
    except Exception as e:
        st.error(f"❌ FaceAnalysis failed on source image: {e}")
        return None
    if not source_faces:
        st.error("❌ No face detected in the source image.")
        return None
    source_face = _select_face(source_faces, face_selection, source_image_bgr.shape)
    if source_face is None:
        st.error("❌ Could not select a face from the source image.")
        return None
    # Downscale the target to the user-selected processing resolution.
    orig_h, orig_w = target_image_bgr.shape[:2]
    proc_w, proc_h = _get_proc_size_choice(orig_w, orig_h, proc_res)
    if (proc_w, proc_h) != (orig_w, orig_h):
        target_image_proc = cv2.resize(target_image_bgr, (proc_w, proc_h), interpolation=cv2.INTER_AREA)
    else:
        target_image_proc = target_image_bgr.copy()
    try:
        # Detect faces on target image
        try:
            target_faces = app.get(target_image_proc)
        except Exception as det_e:
            st.error(f"[ERROR] Detection failed on target image: {det_e}")
            target_faces = []
        if not target_faces:
            st.warning("⚠️ No faces detected in the target image.")
            return _cv2_to_pil(target_image_bgr)
        # Keep only the largest max_faces detections above a 0.5 score floor.
        target_faces = sorted(
            target_faces,
            key=lambda f: (f.bbox[2]-f.bbox[0])*(f.bbox[3]-f.bbox[1]),
            reverse=True
        )
        target_faces = [f for f in target_faces if f.det_score > 0.5][:max_faces]
        # Swap each selected face in place on a copy of the processed frame.
        result_image = target_image_proc.copy()
        for tface in target_faces:
            try:
                # Bounding box expanded by a 10px margin, clamped to the frame.
                x1, y1, x2, y2 = [int(coord) for coord in tface.bbox]
                x1, y1 = max(0, x1-10), max(0, y1-10)  # Add padding
                x2, y2 = min(result_image.shape[1], x2+10), min(result_image.shape[0], y2+10)
                # Skip degenerate boxes.
                if x2 <= x1 or y2 <= y1:
                    continue
                # Extract the face region
                face_region = result_image[y1:y2, x1:x2].copy()
                # Create mouth mask if lip-sync is enabled
                # NOTE(review): tface.landmark_2d_106 is in full-frame
                # coordinates but the mask canvas is face_region-sized, so the
                # hull likely lands at the wrong offset — verify alignment.
                mouth_mask = None
                if lip_sync_enabled and hasattr(tface, 'landmark_2d_106'):
                    mouth_mask = _create_mouth_mask(tface, face_region.shape, mouth_mask_strength)
                # NOTE(review): tface's bbox/keypoints are full-frame
                # coordinates while only the crop is passed here; also
                # paste_back=False makes inswapper return the aligned swapped
                # crop, not a region-sized image — confirm against the
                # insightface INSwapper.get API.
                swapped_face_region = swapper.get(face_region, tface, source_face, paste_back=False)
                # Apply blending with mouth protection
                blended_face = _blend_faces(face_region, swapped_face_region, blend_percent, mouth_mask)
                result_image[y1:y2, x1:x2] = blended_face
            except Exception as swap_e:
                st.error(f"Face swap error: {swap_e}")
                continue
        # Upscale back to the original resolution if we processed smaller.
        if (proc_w, proc_h) != (orig_w, orig_h):
            result_image = cv2.resize(result_image, (orig_w, orig_h), interpolation=cv2.INTER_CUBIC)
        return _cv2_to_pil(result_image)
    except Exception as e:
        st.error(f"❌ Error processing image: {e}")
        traceback.print_exc()
        return _cv2_to_pil(target_image_bgr)
def swap_faces_in_video(
    image_bgr, video_path, proc_res, fps_cap, keep_original_res,
    max_faces, blend_percent, face_selection, output_quality, progress,
    lip_sync_enabled=True, mouth_mask_strength=80, frame_consistency=70
):
    """Swap the source face onto every frame of the video at video_path.

    Writes an mp4 to a temp file and returns its path, or None on setup
    failure (no source face, unreadable video, writer failure).
    ``progress`` is a Streamlit progress bar updated per frame.

    NOTE(review): ``output_quality`` and ``frame_consistency`` are accepted
    but never read in this body, and ``previous_faces`` below is populated
    nowhere — the frame-consistency feature appears unimplemented.
    """
    # Detect and select the face to copy from the source image.
    try:
        source_faces = app.get(image_bgr)
    except Exception as e:
        st.error(f"❌ FaceAnalysis failed on source image: {e}")
        return None
    if not source_faces:
        st.error("❌ No face detected in the source image.")
        return None
    source_face = _select_face(source_faces, face_selection, image_bgr.shape)
    if source_face is None:
        st.error("❌ Could not select a face from the source image.")
        return None
    # Open video
    cap = cv2.VideoCapture(video_path)
    if not cap.isOpened():
        st.error("❌ Could not open the uploaded video.")
        return None
    # Read properties
    frame_count = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))
    orig_w = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
    orig_h = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
    orig_fps = float(cap.get(cv2.CAP_PROP_FPS))
    # Some containers report 0/NaN fps; fall back to 25.
    if orig_fps <= 0 or np.isnan(orig_fps):
        orig_fps = 25.0
    # Decide processing size & FPS
    proc_w, proc_h = _get_proc_size_choice(orig_w, orig_h, proc_res)
    write_fps, frame_step = _parse_fps_cap(orig_fps, fps_cap)
    out_w, out_h = (orig_w, orig_h) if keep_original_res else (proc_w, proc_h)
    # Prepare output writer (temp file is only used to reserve a path).
    with tempfile.NamedTemporaryFile(delete=False, suffix=".mp4") as tmp_out:
        output_path = tmp_out.name
    fourcc = cv2.VideoWriter_fourcc(*"mp4v")
    out = cv2.VideoWriter(output_path, fourcc, write_fps, (out_w, out_h))
    if not out.isOpened():
        cap.release()
        st.error("❌ Failed to open VideoWriter.")
        return None
    st.info(
        f"Processing: {proc_w}×{proc_h} | Output: {out_w}×{out_h} @ {write_fps:.2f} fps | "
        f"Frame step: {frame_step} | Blend: {blend_percent}% | Lip-sync: {'ON' if lip_sync_enabled else 'OFF'}"
    )
    # Process loop
    read_idx = 0
    processed_frames = 0
    previous_faces = {}  # For frame consistency (never written — see docstring)
    try:
        while True:
            ret, frame = cap.read()
            if not ret:
                break
            # Skip frames for FPS cap
            if frame_step > 1 and (read_idx % frame_step != 0):
                read_idx += 1
                if frame_count > 0:
                    progress.progress(min(1.0, read_idx / frame_count))
                continue
            # Resize for processing
            if (proc_w, proc_h) != (orig_w, orig_h):
                proc_frame = cv2.resize(frame, (proc_w, proc_h), interpolation=cv2.INTER_AREA)
            else:
                proc_frame = frame
            try:
                # Detect faces (silently degrade to an empty list on failure)
                try:
                    target_faces = app.get(proc_frame)
                except Exception as det_e:
                    target_faces = []
                # Quality filtering (stricter 0.6 floor than the image path)
                target_faces = [f for f in target_faces if f.det_score > 0.6]
                # Keep only the largest max_faces detections
                if target_faces:
                    target_faces = sorted(
                        target_faces,
                        key=lambda f: (f.bbox[2]-f.bbox[0])*(f.bbox[3]-f.bbox[1]),
                        reverse=True
                    )[:max_faces]
                # Swap each face in place on a copy of the processed frame
                result_frame = proc_frame.copy()
                for tface in target_faces:
                    try:
                        # Bounding box expanded by a 15px margin, clamped.
                        x1, y1, x2, y2 = [int(coord) for coord in tface.bbox]
                        x1, y1 = max(0, x1-15), max(0, y1-15)
                        x2, y2 = min(result_frame.shape[1], x2+15), min(result_frame.shape[0], y2+15)
                        # Skip degenerate boxes
                        if x2 <= x1 or y2 <= y1:
                            continue
                        # Extract the face region
                        face_region = result_frame[y1:y2, x1:x2].copy()
                        # Create mouth mask if lip-sync is enabled
                        # NOTE(review): landmarks are full-frame coordinates but
                        # the mask canvas is region-sized — verify alignment.
                        mouth_mask = None
                        if lip_sync_enabled and hasattr(tface, 'landmark_2d_106'):
                            mouth_mask = _create_mouth_mask(tface, face_region.shape, mouth_mask_strength)
                        # NOTE(review): tface coordinates refer to the full
                        # frame while only the crop is passed; paste_back=False
                        # returns the aligned swapped crop — confirm against
                        # the insightface INSwapper.get API.
                        swapped_face_region = swapper.get(face_region, tface, source_face, paste_back=False)
                        # Apply blending with mouth protection
                        blended_face = _blend_faces(face_region, swapped_face_region, blend_percent, mouth_mask)
                        result_frame[y1:y2, x1:x2] = blended_face
                    except Exception as swap_e:
                        continue
                # Upscale if needed
                if keep_original_res and (proc_w, proc_h) != (orig_w, orig_h):
                    result_frame = cv2.resize(result_frame, (orig_w, orig_h), interpolation=cv2.INTER_CUBIC)
                out.write(result_frame)
            except Exception as e:
                # Fallback: write the unswapped frame so the video keeps timing
                fallback = proc_frame
                if keep_original_res and (proc_w, proc_h) != (orig_w, orig_h):
                    fallback = cv2.resize(proc_frame, (orig_w, orig_h), interpolation=cv2.INTER_CUBIC)
                out.write(fallback)
            read_idx += 1
            processed_frames += 1
            # Update progress
            if frame_count > 0:
                progress.progress(min(1.0, read_idx / frame_count))
    except Exception as e:
        st.error(f"❌ Error during video processing: {e}")
        traceback.print_exc()
    finally:
        cap.release()
        out.release()
    return output_path
# -------------------------
# UI: Improved layout
# -------------------------
with main_col2:
    st.header("Upload Files")
    # Create two columns for uploaders
    upload_col1, upload_col2 = st.columns(2)
    with upload_col1:
        st.subheader("Source Image")
        image_file = st.file_uploader("Upload face image", type=["jpg", "jpeg", "png"],
                                      label_visibility="collapsed")
    with upload_col2:
        st.subheader("Target Content")
        # The accepted target type follows the sidebar mode selection.
        if st.session_state.mode == "video":
            target_file = st.file_uploader("Upload video", type=["mp4", "mov", "mkv", "avi"],
                                           label_visibility="collapsed")
        else:
            target_file = st.file_uploader("Upload image", type=["jpg", "jpeg", "png"],
                                           label_visibility="collapsed")
    # Preview section
    if image_file or target_file:
        st.header("Preview")
        preview_col1, preview_col2 = st.columns(2)
        with preview_col1:
            if image_file:
                st.image(image_file, caption="Source Image", use_container_width=True)
        with preview_col2:
            if target_file:
                if st.session_state.mode == "video":
                    st.video(target_file)
                else:
                    st.image(target_file, caption="Target Image", use_container_width=True)
    # Process button - larger and more prominent
    if image_file and target_file:
        st.markdown("---")
        if st.button("🚀 START FACE SWAPPING", use_container_width=True, type="primary"):
            # Read and decode the source image; st.stop() aborts the rerun on failure.
            try:
                image_bytes = image_file.getvalue()
                source_image = _safe_imdecode(image_bytes)
                if source_image is None:
                    st.error("❌ Failed to decode source image.")
                    st.stop()
            except Exception as e:
                st.error(f"❌ Failed to read source image: {e}")
                st.stop()
            if st.session_state.mode == "video":
                # Spool the uploaded video to disk for cv2.VideoCapture.
                try:
                    video_bytes = target_file.getvalue()
                    with tempfile.NamedTemporaryFile(delete=False, suffix=".mp4") as tmp_video:
                        tmp_video.write(video_bytes)
                        tmp_video_path = tmp_video.name
                except Exception as e:
                    st.error(f"❌ Failed to save video: {e}")
                    st.stop()
                with st.spinner("Processing video… This may take several minutes ⏳"):
                    progress_bar = st.progress(0)
                    output_path = swap_faces_in_video(
                        source_image, tmp_video_path, proc_res, fps_cap,
                        keep_original_res, max_faces, face_blend_percent,
                        face_selection_method, output_quality, progress_bar,
                        lip_sync_enabled, mouth_mask_strength, frame_consistency
                    )
                if output_path:
                    st.success("✅ Face swapping completed!")
                    # Remember the output path so it survives reruns.
                    file_id = f"video_{int(time.time())}"
                    st.session_state.processed_files[file_id] = output_path
                    st.header("Output Video")
                    st.video(output_path)
                    # Download button
                    try:
                        with open(output_path, "rb") as f:
                            st.download_button(
                                label="⬇️ DOWNLOAD VIDEO",
                                data=f,
                                file_name="swapped_video.mp4",
                                mime="video/mp4",
                                use_container_width=True
                            )
                    except Exception as e:
                        st.warning(f"⚠️ Download error: {e}")
                # Best-effort cleanup of the spooled input video.
                try:
                    os.remove(tmp_video_path)
                except Exception:
                    pass
            else:
                # Image mode: decode the target image.
                try:
                    target_bytes = target_file.getvalue()
                    target_image = _safe_imdecode(target_bytes)
                    if target_image is None:
                        st.error("❌ Failed to decode target image.")
                        st.stop()
                except Exception as e:
                    st.error(f"❌ Failed to read target image: {e}")
                    st.stop()
                with st.spinner("Processing image…"):
                    result_image = swap_faces_in_image(
                        source_image, target_image, proc_res, max_faces,
                        face_blend_percent, face_selection_method,
                        lip_sync_enabled, mouth_mask_strength
                    )
                if result_image:
                    st.success("✅ Face swapping completed!")
                    # Encode the PIL result to JPEG bytes and keep them
                    # in session state so the download survives reruns.
                    buf = io.BytesIO()
                    result_image.save(buf, format="JPEG")
                    byte_im = buf.getvalue()
                    file_id = f"image_{int(time.time())}"
                    st.session_state.processed_files[file_id] = byte_im
                    st.header("Output Image")
                    st.image(result_image, caption="Result", use_container_width=True)
                    # Download button
                    st.download_button(
                        label="⬇️ DOWNLOAD IMAGE",
                        data=byte_im,
                        file_name="swapped_image.jpg",
                        mime="image/jpeg",
                        use_container_width=True
                    )
# Previous results section — video entries store a temp-file path,
# image entries store the encoded JPEG bytes directly.
if st.session_state.processed_files:
    st.markdown("---")
    st.header("Previous Results")
    for file_id, file_data in list(st.session_state.processed_files.items()):
        if file_id.startswith("video_") and os.path.exists(file_data):
            try:
                st.video(file_data)
                with open(file_data, "rb") as f:
                    st.download_button(
                        label="⬇️ Download Previous Video",
                        data=f,
                        file_name="previous_swapped_video.mp4",
                        mime="video/mp4",
                        key=f"prev_vid_{file_id}"
                    )
            except Exception as e:
                st.warning(f"Could not load previous video: {e}")
        elif file_id.startswith("image_"):
            try:
                st.image(file_data, caption="Previous Result", use_container_width=True)
                st.download_button(
                    label="⬇️ Download Previous Image",
                    data=file_data,
                    file_name="previous_swapped_image.jpg",
                    mime="image/jpeg",
                    key=f"prev_img_{file_id}"
                )
            except Exception as e:
                st.warning(f"Could not load previous image: {e}")
# -------------
# Diagnostics
# -------------
# Static troubleshooting tips shown in a collapsible panel.
with st.expander("🩺 Diagnostics"):
    st.write(
        "- If you see errors: try different source/target images with clear faces\n"
        "- For better results: use high-quality images with front-facing faces\n"
        "- If processing is slow: reduce resolution or target FPS\n"
        "- For videos: use MP4 format with H.264 encoding\n"
        "- For best lip-sync: enable lip-sync mode and adjust mouth mask strength"
    )