Update app.py
Browse files
app.py
CHANGED
|
@@ -8,7 +8,8 @@ from PIL import Image
|
|
| 8 |
import tempfile
|
| 9 |
import logging
|
| 10 |
import onnxruntime
|
| 11 |
-
from typing import List, Optional, Tuple # For improved type hinting
|
|
|
|
| 12 |
|
| 13 |
# --- Configuration & Setup ---
|
| 14 |
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
|
|
@@ -40,7 +41,7 @@ logging.info(f"Using Execution Providers: {EXECUTION_PROVIDERS}")
|
|
| 40 |
logging.info(f"InsightFace Context ID for preparation: {INSIGHTFACE_CTX_ID}")
|
| 41 |
|
| 42 |
face_analyzer: Optional[FaceAnalysis] = None
|
| 43 |
-
swapper = None
|
| 44 |
face_restorer: Optional[onnxruntime.InferenceSession] = None
|
| 45 |
|
| 46 |
def initialize_models():
|
|
@@ -58,10 +59,11 @@ def initialize_models():
|
|
| 58 |
else:
|
| 59 |
logging.info(f"Loading swapper model from: {SWAPPER_MODEL_PATH} with providers: {EXECUTION_PROVIDERS}")
|
| 60 |
try:
|
|
|
|
| 61 |
swapper = get_model(SWAPPER_MODEL_PATH, download=False, providers=EXECUTION_PROVIDERS)
|
| 62 |
-
except TypeError:
|
| 63 |
logging.warning(f"Failed to pass 'providers' argument to swapper model {SWAPPER_MODEL_PATH}. Retrying without 'providers' argument. Model will use its default execution provider.")
|
| 64 |
-
swapper = get_model(SWAPPER_MODEL_PATH, download=False)
|
| 65 |
logging.info("Swapper model loaded successfully.")
|
| 66 |
|
| 67 |
if face_restorer is None:
|
|
@@ -82,6 +84,7 @@ initialize_models()
|
|
| 82 |
core_models_loaded_successfully = face_analyzer is not None and swapper is not None
|
| 83 |
restoration_model_loaded_successfully = face_restorer is not None
|
| 84 |
|
|
|
|
| 85 |
def convert_pil_to_cv2(pil_image: Optional[Image.Image]) -> Optional[np.ndarray]:
|
| 86 |
if pil_image is None: return None
|
| 87 |
try:
|
|
@@ -98,7 +101,7 @@ def convert_cv2_to_pil(cv2_image: Optional[np.ndarray]) -> Optional[Image.Image]
|
|
| 98 |
logging.error(f"Error converting CV2 to PIL: {e}")
|
| 99 |
return None
|
| 100 |
|
| 101 |
-
def get_faces_from_image(img_np: np.ndarray) -> List:
|
| 102 |
if not core_models_loaded_successfully or face_analyzer is None:
|
| 103 |
logging.error("Face analyzer not available for get_faces_from_image. Core models might have failed to load.")
|
| 104 |
return []
|
|
@@ -112,7 +115,7 @@ def get_faces_from_image(img_np: np.ndarray) -> List:
|
|
| 112 |
logging.error(f"Error during face detection: {e}", exc_info=True)
|
| 113 |
return []
|
| 114 |
|
| 115 |
-
def draw_detected_faces(img_np: np.ndarray, faces: List) -> np.ndarray:
|
| 116 |
img_with_boxes = img_np.copy()
|
| 117 |
for i, face in enumerate(faces):
|
| 118 |
box = face.bbox.astype(int)
|
|
@@ -120,9 +123,10 @@ def draw_detected_faces(img_np: np.ndarray, faces: List) -> np.ndarray:
|
|
| 120 |
cv2.rectangle(img_with_boxes, (x1, y1), (x2, y2), (0, 255, 0), 2)
|
| 121 |
label_position = (x1, max(0, y1 - 10))
|
| 122 |
cv2.putText(img_with_boxes, f"Face {i}", label_position,
|
| 123 |
-
|
| 124 |
return img_with_boxes
|
| 125 |
|
|
|
|
| 126 |
def enhance_cropped_face(face_crop_bgr: np.ndarray) -> np.ndarray:
|
| 127 |
if not restoration_model_loaded_successfully or face_restorer is None:
|
| 128 |
logging.warning("Face restorer model not available. Skipping enhancement for crop.")
|
|
@@ -131,7 +135,7 @@ def enhance_cropped_face(face_crop_bgr: np.ndarray) -> np.ndarray:
|
|
| 131 |
logging.warning("Received empty or invalid face crop for enhancement.")
|
| 132 |
return face_crop_bgr if face_crop_bgr is not None else np.array([])
|
| 133 |
|
| 134 |
-
logging.
|
| 135 |
original_crop_height, original_crop_width = face_crop_bgr.shape[:2]
|
| 136 |
|
| 137 |
try:
|
|
@@ -139,16 +143,16 @@ def enhance_cropped_face(face_crop_bgr: np.ndarray) -> np.ndarray:
|
|
| 139 |
restorer_input_node = face_restorer.get_inputs()[0]
|
| 140 |
try:
|
| 141 |
shape = restorer_input_node.shape
|
| 142 |
-
if isinstance(shape[2], int) and isinstance(shape[3], int):
|
| 143 |
-
|
| 144 |
-
|
| 145 |
-
else:
|
| 146 |
restorer_input_size_hw = (512, 512)
|
| 147 |
-
logging.warning(f"Could not determine restorer input size from model, defaulting to {restorer_input_size_hw}.")
|
| 148 |
-
except:
|
| 149 |
restorer_input_size_hw = (512, 512)
|
| 150 |
-
logging.warning(f"Error determining restorer input size, defaulting to {restorer_input_size_hw}.")
|
| 151 |
-
|
|
|
|
| 152 |
img_resized_for_model = cv2.resize(img_rgb, (restorer_input_size_hw[1], restorer_input_size_hw[0]), interpolation=cv2.INTER_AREA)
|
| 153 |
img_normalized = (img_resized_for_model / 255.0).astype(np.float32)
|
| 154 |
img_chw = np.transpose(img_normalized, (2, 0, 1))
|
|
@@ -161,7 +165,7 @@ def enhance_cropped_face(face_crop_bgr: np.ndarray) -> np.ndarray:
|
|
| 161 |
restored_img_uint8_model_size = np.clip(restored_img_hwc_model_size * 255.0, 0, 255).astype(np.uint8)
|
| 162 |
restored_crop_rgb = cv2.resize(restored_img_uint8_model_size, (original_crop_width, original_crop_height), interpolation=cv2.INTER_LANCZOS4)
|
| 163 |
restored_crop_bgr = cv2.cvtColor(restored_crop_rgb, cv2.COLOR_RGB2BGR)
|
| 164 |
-
logging.
|
| 165 |
return restored_crop_bgr
|
| 166 |
except Exception as e:
|
| 167 |
logging.error(f"Error during face restoration for crop: {e}", exc_info=True)
|
|
@@ -171,129 +175,137 @@ def histogram_match_channel(source_channel: np.ndarray, target_channel: np.ndarr
|
|
| 171 |
source_shape = source_channel.shape
|
| 172 |
source_channel_flat = source_channel.flatten()
|
| 173 |
target_channel_flat = target_channel.flatten()
|
|
|
|
| 174 |
source_hist, bins = np.histogram(source_channel_flat, 256, [0, 256])
|
| 175 |
source_cdf = source_hist.cumsum()
|
| 176 |
if source_cdf[-1] == 0:
|
| 177 |
-
logging.warning("Source channel for histogram matching is effectively empty
|
| 178 |
return source_channel
|
|
|
|
| 179 |
target_hist, bins = np.histogram(target_channel_flat, 256, [0, 256])
|
| 180 |
target_cdf = target_hist.cumsum()
|
| 181 |
if target_cdf[-1] == 0:
|
| 182 |
logging.warning("Target channel for histogram matching is effectively empty. No matching possible. Returning original source channel.")
|
| 183 |
return source_channel
|
|
|
|
|
|
|
| 184 |
source_cdf_normalized = (source_cdf * 255 / source_cdf[-1]).astype(np.uint8)
|
| 185 |
target_cdf_normalized = (target_cdf * 255 / target_cdf[-1]).astype(np.uint8)
|
|
|
|
| 186 |
lookup_table = np.zeros(256, dtype=np.uint8)
|
| 187 |
gj = 0
|
| 188 |
for gi in range(256):
|
| 189 |
while gj < 255 and target_cdf_normalized[gj] < source_cdf_normalized[gi]:
|
| 190 |
gj += 1
|
| 191 |
lookup_table[gi] = gj
|
| 192 |
-
|
| 193 |
-
|
|
|
|
|
|
|
| 194 |
|
| 195 |
def histogram_match_color(source_img_bgr: np.ndarray, target_img_bgr: np.ndarray) -> np.ndarray:
|
| 196 |
if source_img_bgr is None or target_img_bgr is None or source_img_bgr.size == 0 or target_img_bgr.size == 0:
|
| 197 |
logging.warning("Cannot perform histogram matching on empty or None images.")
|
| 198 |
return source_img_bgr if source_img_bgr is not None else np.array([])
|
|
|
|
| 199 |
matched_img = np.zeros_like(source_img_bgr)
|
| 200 |
try:
|
| 201 |
-
for i in range(source_img_bgr.shape[2]):
|
| 202 |
matched_img[:, :, i] = histogram_match_channel(source_img_bgr[:, :, i], target_img_bgr[:, :, i])
|
| 203 |
return matched_img
|
| 204 |
except Exception as e:
|
| 205 |
logging.error(f"Error during color histogram matching: {e}", exc_info=True)
|
| 206 |
return source_img_bgr
|
| 207 |
|
|
|
|
| 208 |
def process_face_swap(source_pil_img: Optional[Image.Image], target_pil_img: Optional[Image.Image],
|
| 209 |
target_face_index: int, apply_enhancement: bool, apply_color_correction: bool,
|
| 210 |
progress=gr.Progress(track_tqdm=True)):
|
| 211 |
-
progress(0, desc="Initializing
|
| 212 |
-
if not core_models_loaded_successfully:
|
| 213 |
-
gr.Error("CRITICAL: Core models (Face Analyzer or Swapper) not loaded. Cannot proceed.")
|
| 214 |
return Image.new('RGB', (100, 100), color='lightgrey'), None, "Core models failed to load."
|
| 215 |
-
if source_pil_img is None: raise gr.Error("Source image not provided.
|
| 216 |
-
if target_pil_img is None: raise gr.Error("Target image not provided.
|
| 217 |
|
| 218 |
-
progress(0.05, desc="Converting images
|
| 219 |
source_np = convert_pil_to_cv2(source_pil_img)
|
| 220 |
target_np = convert_pil_to_cv2(target_pil_img)
|
| 221 |
if source_np is None or target_np is None:
|
| 222 |
-
raise gr.Error("Image conversion failed.
|
| 223 |
target_h, target_w = target_np.shape[:2]
|
| 224 |
|
| 225 |
-
progress(0.15, desc="Detecting
|
| 226 |
source_faces = get_faces_from_image(source_np)
|
| 227 |
if not source_faces:
|
| 228 |
-
raise gr.Error("No face found in
|
| 229 |
source_face = source_faces[0]
|
| 230 |
|
| 231 |
-
progress(0.25, desc="Detecting
|
| 232 |
target_faces = get_faces_from_image(target_np)
|
| 233 |
if not target_faces:
|
| 234 |
-
raise gr.Error("No faces found in
|
| 235 |
if not (0 <= target_face_index < len(target_faces)):
|
| 236 |
-
|
| 237 |
-
|
| 238 |
-
raise gr.Error(err_msg)
|
| 239 |
target_face_to_swap_info = target_faces[int(target_face_index)]
|
| 240 |
swapped_bgr_img = target_np.copy()
|
|
|
|
| 241 |
try:
|
| 242 |
progress(0.4, desc="Performing face swap...")
|
|
|
|
|
|
|
| 243 |
swapped_bgr_img = swapper.get(swapped_bgr_img, target_face_to_swap_info, source_face, paste_back=True)
|
| 244 |
bbox_coords = target_face_to_swap_info.bbox.astype(int)
|
| 245 |
-
|
| 246 |
if apply_enhancement:
|
| 247 |
if restoration_model_loaded_successfully and face_restorer is not None:
|
| 248 |
-
progress(0.6, desc="Applying
|
| 249 |
-
padding_enh = 20
|
| 250 |
enh_x1 = max(0, bbox_coords[0] - padding_enh)
|
| 251 |
enh_y1 = max(0, bbox_coords[1] - padding_enh)
|
| 252 |
enh_x2 = min(target_w, bbox_coords[2] + padding_enh)
|
| 253 |
enh_y2 = min(target_h, bbox_coords[3] + padding_enh)
|
| 254 |
if enh_x1 < enh_x2 and enh_y1 < enh_y2:
|
| 255 |
face_crop_to_enhance = swapped_bgr_img[enh_y1:enh_y2, enh_x1:enh_x2]
|
| 256 |
-
enhanced_crop = enhance_cropped_face(face_crop_to_enhance.copy())
|
| 257 |
if enhanced_crop.shape == face_crop_to_enhance.shape:
|
| 258 |
swapped_bgr_img[enh_y1:enh_y2, enh_x1:enh_x2] = enhanced_crop
|
| 259 |
-
else:
|
| 260 |
-
|
| 261 |
-
else:
|
| 262 |
-
logging.warning("Skipping enhancement due to invalid crop dimensions after padding.")
|
| 263 |
else:
|
| 264 |
-
logging.warning("Enhancement requested but
|
| 265 |
-
gr.Info("Face restoration model not available, enhancement
|
| 266 |
|
| 267 |
if apply_color_correction:
|
| 268 |
-
progress(0.75, desc="Applying color correction
|
| 269 |
cc_x1, cc_y1, cc_x2, cc_y2 = bbox_coords[0], bbox_coords[1], bbox_coords[2], bbox_coords[3]
|
| 270 |
if cc_x1 < cc_x2 and cc_y1 < cc_y2:
|
| 271 |
original_target_face_region = target_np[cc_y1:cc_y2, cc_x1:cc_x2]
|
| 272 |
swapped_face_region_to_correct = swapped_bgr_img[cc_y1:cc_y2, cc_x1:cc_x2]
|
| 273 |
if original_target_face_region.size > 0 and swapped_face_region_to_correct.size > 0:
|
| 274 |
corrected_swapped_region = histogram_match_color(
|
| 275 |
-
swapped_face_region_to_correct.copy(),
|
| 276 |
-
original_target_face_region.copy()
|
| 277 |
)
|
| 278 |
if corrected_swapped_region.shape == swapped_face_region_to_correct.shape:
|
| 279 |
-
|
| 280 |
-
else:
|
| 281 |
-
|
| 282 |
-
|
| 283 |
-
|
| 284 |
-
else:
|
| 285 |
-
logging.warning("Skipping color correction due to invalid bounding box for region extraction.")
|
| 286 |
except Exception as e:
|
| 287 |
-
logging.error(f"Error during face swapping or post-processing: {e}", exc_info=True)
|
|
|
|
| 288 |
swapped_pil_img_on_error = convert_cv2_to_pil(swapped_bgr_img)
|
| 289 |
-
if swapped_pil_img_on_error is None:
|
| 290 |
-
|
| 291 |
-
raise gr.Error(f"An error occurred during processing: {str(e)}.
|
| 292 |
|
| 293 |
-
progress(0.9, desc="Finalizing image
|
| 294 |
swapped_pil_img = convert_cv2_to_pil(swapped_bgr_img)
|
| 295 |
if swapped_pil_img is None:
|
| 296 |
-
gr.Error("Failed to convert final image
|
| 297 |
return Image.new('RGB', (target_w, target_h), color='darkred'), None, "Failed to convert final image."
|
| 298 |
|
| 299 |
temp_file_path = None
|
|
@@ -301,57 +313,316 @@ def process_face_swap(source_pil_img: Optional[Image.Image], target_pil_img: Opt
|
|
| 301 |
with tempfile.NamedTemporaryFile(suffix=".jpg", delete=False, mode="wb") as tmp_file:
|
| 302 |
swapped_pil_img.save(tmp_file, format="JPEG", quality=95)
|
| 303 |
temp_file_path = tmp_file.name
|
| 304 |
-
logging.info(f"Swapped image temporarily saved to: {temp_file_path} for download.")
|
| 305 |
except Exception as e:
|
| 306 |
logging.error(f"Error saving swapped image to temporary file: {e}", exc_info=True)
|
| 307 |
-
gr.Warning("Could not save
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
| 308 |
|
| 309 |
-
|
| 310 |
-
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
| 311 |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
| 312 |
def preview_target_faces(target_pil_img: Optional[Image.Image]):
|
| 313 |
blank_preview_placeholder = Image.new('RGB', (DETECTION_SIZE[0], DETECTION_SIZE[1]), color='lightgray')
|
| 314 |
reset_slider = gr.Slider(minimum=0, maximum=0, value=0, step=1, interactive=False, label="π― Select Target Face")
|
| 315 |
-
|
|
|
|
|
|
|
| 316 |
gr.Warning("Face detection models not loaded. Preview cannot be generated.")
|
| 317 |
-
return blank_preview_placeholder, reset_slider
|
| 318 |
if target_pil_img is None:
|
| 319 |
-
return blank_preview_placeholder, reset_slider
|
|
|
|
| 320 |
target_np = convert_pil_to_cv2(target_pil_img)
|
| 321 |
if target_np is None:
|
| 322 |
-
gr.Warning("Could not process target image for preview.
|
| 323 |
-
return blank_preview_placeholder, reset_slider
|
|
|
|
| 324 |
faces = get_faces_from_image(target_np)
|
| 325 |
if not faces:
|
| 326 |
-
|
|
|
|
| 327 |
h, w = target_np.shape[:2]
|
|
|
|
| 328 |
if h > max_h or w > max_w:
|
| 329 |
scale = min(max_h/h, max_w/w)
|
| 330 |
-
|
| 331 |
-
|
| 332 |
-
|
| 333 |
-
preview_pil_img = convert_cv2_to_pil(target_np)
|
| 334 |
if preview_pil_img is None: preview_pil_img = blank_preview_placeholder
|
| 335 |
gr.Info("No faces were detected in the target image.")
|
| 336 |
-
return preview_pil_img, reset_slider
|
| 337 |
-
|
|
|
|
| 338 |
preview_pil_img_with_boxes = convert_cv2_to_pil(preview_np_with_boxes)
|
| 339 |
if preview_pil_img_with_boxes is None:
|
| 340 |
preview_pil_img_with_boxes = blank_preview_placeholder
|
| 341 |
gr.Warning("Error generating preview image with face boxes.")
|
|
|
|
|
|
|
| 342 |
num_faces = len(faces)
|
| 343 |
slider_update = gr.Slider(minimum=0, maximum=max(0, num_faces - 1), value=0, step=1,
|
| 344 |
label=f"π― Select Target Face (0 to {num_faces-1})", interactive=(num_faces > 0))
|
| 345 |
-
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
| 346 |
|
| 347 |
# --- Gradio UI Definition ---
|
| 348 |
-
with gr.Blocks(title="Ultimate Face Swap AI π v3.
|
| 349 |
gr.Markdown(
|
| 350 |
"""
|
| 351 |
<div style="text-align: center; border-bottom: 1px solid #eee; padding-bottom:10px;">
|
| 352 |
-
<h1>π Ultimate Face Swap AI <span style="font-size:0.8em; color: #555;">v3.
|
| 353 |
-
<p>
|
| 354 |
-
<p style="font-size:0.9em;">Optionally enhance with <strong>face restoration</strong> and <strong>color correction</strong
|
| 355 |
</div>
|
| 356 |
"""
|
| 357 |
)
|
|
@@ -360,137 +631,181 @@ with gr.Blocks(title="Ultimate Face Swap AI π v3.2 Lite", theme=gr.themes.Sof
|
|
| 360 |
gr.Error(
|
| 361 |
"π΄ CRITICAL ERROR: Core models (Face Analyzer or Swapper) failed to load. "
|
| 362 |
"The application UI will load but WILL NOT FUNCTION correctly. "
|
| 363 |
-
"Please check
|
| 364 |
-
"and restart the application after fixing the underlying problem."
|
| 365 |
)
|
| 366 |
|
| 367 |
-
with gr.
|
| 368 |
-
|
| 369 |
-
|
| 370 |
-
|
| 371 |
-
|
| 372 |
-
|
| 373 |
-
|
| 374 |
-
|
| 375 |
-
with gr.Row(equal_height=False):
|
| 376 |
-
with gr.Column(scale=2):
|
| 377 |
-
target_faces_preview_output = gr.Image(label="π Target Image Preview & Detected Faces", interactive=False, height=380, show_download_button=False)
|
| 378 |
-
with gr.Column(scale=1, min_width=250):
|
| 379 |
-
preview_button = gr.Button("π Preview & Select Target Face", variant="secondary")
|
| 380 |
-
face_index_slider = gr.Slider(
|
| 381 |
-
label="π― Select Target Face (0-indexed)",
|
| 382 |
-
minimum=0, maximum=0, step=1, value=0, interactive=False
|
| 383 |
-
)
|
| 384 |
-
gr.Markdown("<p style='font-size:0.8em; color:#555;'>After uploading target, click 'Preview'. Then use slider to select face.</p>")
|
| 385 |
|
| 386 |
-
|
| 387 |
-
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
| 388 |
|
| 389 |
-
|
| 390 |
-
|
| 391 |
-
|
| 392 |
-
|
| 393 |
-
|
| 394 |
-
|
| 395 |
-
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
| 396 |
)
|
| 397 |
-
|
| 398 |
-
|
| 399 |
-
|
| 400 |
-
|
| 401 |
-
|
| 402 |
-
|
| 403 |
-
|
| 404 |
-
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
| 405 |
)
|
| 406 |
-
gr.Markdown("<p style='font-size:0.8em; color:#555;'>Matches color tone of swapped face to the target scene.</p>")
|
| 407 |
-
|
| 408 |
-
|
| 409 |
-
gr.HTML("<hr style='margin-top: 15px; margin-bottom: 15px;'>")
|
| 410 |
-
|
| 411 |
-
with gr.Row():
|
| 412 |
-
swap_button = gr.Button("π GENERATE SWAP!", variant="primary", scale=3, interactive=core_models_loaded_successfully)
|
| 413 |
-
clear_button = gr.Button("π§Ή Clear All", variant="stop", scale=1)
|
| 414 |
-
|
| 415 |
-
with gr.Row():
|
| 416 |
-
with gr.Column(scale=2):
|
| 417 |
-
swapped_image_output = gr.Image(label="β¨ Swapped Result", interactive=False, height=480, show_download_button=False)
|
| 418 |
-
with gr.Column(scale=1):
|
| 419 |
-
download_output_file = gr.File(label="β¬οΈ Download Swapped Image", interactive=True)
|
| 420 |
-
status_message_output = gr.Markdown(value="<p style='color: #555; font-style: italic;'>Status messages will appear here.</p>")
|
| 421 |
-
|
| 422 |
-
def on_target_image_upload_or_clear(target_img_pil: Optional[Image.Image]):
|
| 423 |
-
blank_preview = Image.new('RGB', (DETECTION_SIZE[0], DETECTION_SIZE[1]), color='whitesmoke')
|
| 424 |
-
reset_slider = gr.Slider(minimum=0, maximum=0, value=0, step=1, interactive=False, label="π― Select Target Face")
|
| 425 |
-
status_update = "Target image changed. Click 'Preview & Select Target Face' to proceed." if target_img_pil else "Target image cleared."
|
| 426 |
-
return blank_preview, reset_slider, status_update
|
| 427 |
-
|
| 428 |
-
target_image_input.change(
|
| 429 |
-
fn=on_target_image_upload_or_clear,
|
| 430 |
-
inputs=[target_image_input],
|
| 431 |
-
outputs=[target_faces_preview_output, face_index_slider, status_message_output],
|
| 432 |
-
queue=False
|
| 433 |
-
)
|
| 434 |
-
|
| 435 |
-
preview_button.click(
|
| 436 |
-
fn=preview_target_faces,
|
| 437 |
-
inputs=[target_image_input],
|
| 438 |
-
outputs=[target_faces_preview_output, face_index_slider],
|
| 439 |
-
show_progress="full"
|
| 440 |
-
).then(lambda: "Target preview updated. Select a face if multiple are shown.", outputs=[status_message_output])
|
| 441 |
-
|
| 442 |
-
swap_button.click(
|
| 443 |
-
fn=process_face_swap,
|
| 444 |
-
inputs=[source_image_input, target_image_input, face_index_slider, enhance_checkbox, color_correction_checkbox],
|
| 445 |
-
outputs=[swapped_image_output, download_output_file, status_message_output]
|
| 446 |
-
)
|
| 447 |
|
| 448 |
-
|
| 449 |
-
|
| 450 |
-
|
| 451 |
-
|
| 452 |
-
|
| 453 |
-
|
| 454 |
-
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
| 455 |
|
| 456 |
-
clear_button.click(
|
| 457 |
-
fn=clear_all_inputs_outputs,
|
| 458 |
-
inputs=None,
|
| 459 |
-
outputs=[
|
| 460 |
-
source_image_input, target_image_input,
|
| 461 |
-
target_faces_preview_output, face_index_slider,
|
| 462 |
-
swapped_image_output, download_output_file,
|
| 463 |
-
status_message_output
|
| 464 |
-
],
|
| 465 |
-
queue=False
|
| 466 |
-
)
|
| 467 |
-
|
| 468 |
gr.HTML("<hr style='margin-top: 20px; margin-bottom: 10px;'>")
|
| 469 |
-
gr.Markdown("### π Example Usage")
|
| 470 |
-
|
| 471 |
-
def process_face_swap_for_examples(src_img, tgt_img, idx, enhance, color_correct, progress=gr.Progress(track_tqdm=True)):
|
| 472 |
-
img, file_path, status = process_face_swap(src_img, tgt_img, idx, enhance, color_correct, progress)
|
| 473 |
-
return img, file_path, status
|
| 474 |
-
|
| 475 |
gr.Examples(
|
| 476 |
examples=[
|
| 477 |
["examples/source_example_1.jpg", "examples/target_example_1.jpg", 0, True, True],
|
| 478 |
["examples/source_example_2.png", "examples/target_example_2.png", 1, True, False],
|
| 479 |
-
["examples/source_example_1.jpg", "examples/target_example_1.jpg", 0, False, True],
|
| 480 |
],
|
| 481 |
-
inputs=[
|
| 482 |
-
outputs=[
|
| 483 |
-
fn=
|
| 484 |
-
cache_examples=False,
|
| 485 |
-
label="
|
| 486 |
)
|
|
|
|
|
|
|
| 487 |
|
| 488 |
if __name__ == "__main__":
|
| 489 |
os.makedirs("models", exist_ok=True)
|
| 490 |
os.makedirs("examples", exist_ok=True)
|
|
|
|
| 491 |
|
| 492 |
print("\n" + "="*70)
|
| 493 |
-
print("π ULTIMATE FACE SWAP AI - v3.
|
| 494 |
print("="*70)
|
| 495 |
print(f"Execution Providers Selected: {EXECUTION_PROVIDERS}")
|
| 496 |
|
|
@@ -498,18 +813,25 @@ if __name__ == "__main__":
|
|
| 498 |
print("\nπ΄ CRITICAL ERROR: CORE MODELS FAILED TO LOAD.")
|
| 499 |
print(f" - Face Analyzer ('{FACE_ANALYZER_NAME}'): {'Loaded' if face_analyzer else 'FAILED'}")
|
| 500 |
print(f" - Swapper Model ('{SWAPPER_MODEL_PATH}'): {'Loaded' if swapper else 'FAILED'}")
|
| 501 |
-
print(" The application UI will be severely limited or non-functional.")
|
| 502 |
else:
|
| 503 |
print("\nπ’ Core models (Face Analyzer & Swapper) loaded successfully.")
|
|
|
|
| 504 |
if not restoration_model_loaded_successfully:
|
| 505 |
-
print(f"\nπ‘ INFO: Face Restoration model ('{RESTORATION_MODEL_PATH}') NOT loaded.")
|
| 506 |
-
print(" The 'Apply Selective Face Restoration' feature will be DISABLED.")
|
| 507 |
else:
|
| 508 |
print("\nπ’ Face Restoration model loaded successfully.")
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
| 509 |
print("="*70)
|
|
|
|
| 510 |
if not core_models_loaded_successfully:
|
| 511 |
-
print("\nπ The Gradio interface will launch, but swapping functionality will be
|
| 512 |
else:
|
| 513 |
-
print("\nπ Launching Gradio Interface... Access it in your browser (usually at http://127.0.0.1:7860).")
|
| 514 |
|
| 515 |
demo.launch()
|
|
|
|
| 8 |
import tempfile
|
| 9 |
import logging
|
| 10 |
import onnxruntime
|
| 11 |
+
from typing import List, Optional, Tuple, Any # For improved type hinting
|
| 12 |
+
import shutil # For cleaning up temporary directories
|
| 13 |
|
| 14 |
# --- Configuration & Setup ---
|
| 15 |
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
|
|
|
|
| 41 |
logging.info(f"InsightFace Context ID for preparation: {INSIGHTFACE_CTX_ID}")
|
| 42 |
|
| 43 |
face_analyzer: Optional[FaceAnalysis] = None
|
| 44 |
+
swapper: Optional[Any] = None # Type Any because insightface.model_zoo.get_model doesn't have precise type hints readily available
|
| 45 |
face_restorer: Optional[onnxruntime.InferenceSession] = None
|
| 46 |
|
| 47 |
def initialize_models():
|
|
|
|
| 59 |
else:
|
| 60 |
logging.info(f"Loading swapper model from: {SWAPPER_MODEL_PATH} with providers: {EXECUTION_PROVIDERS}")
|
| 61 |
try:
|
| 62 |
+
# Attempt to pass providers if the model supports it
|
| 63 |
swapper = get_model(SWAPPER_MODEL_PATH, download=False, providers=EXECUTION_PROVIDERS)
|
| 64 |
+
except TypeError: # Some versions of get_model might not accept 'providers'
|
| 65 |
logging.warning(f"Failed to pass 'providers' argument to swapper model {SWAPPER_MODEL_PATH}. Retrying without 'providers' argument. Model will use its default execution provider.")
|
| 66 |
+
swapper = get_model(SWAPPER_MODEL_PATH, download=False) # Fallback
|
| 67 |
logging.info("Swapper model loaded successfully.")
|
| 68 |
|
| 69 |
if face_restorer is None:
|
|
|
|
| 84 |
core_models_loaded_successfully = face_analyzer is not None and swapper is not None
|
| 85 |
restoration_model_loaded_successfully = face_restorer is not None
|
| 86 |
|
| 87 |
+
# --- Utility Functions (Image Conversion, Face Detection, Drawing) ---
|
| 88 |
def convert_pil_to_cv2(pil_image: Optional[Image.Image]) -> Optional[np.ndarray]:
|
| 89 |
if pil_image is None: return None
|
| 90 |
try:
|
|
|
|
| 101 |
logging.error(f"Error converting CV2 to PIL: {e}")
|
| 102 |
return None
|
| 103 |
|
| 104 |
+
def get_faces_from_image(img_np: np.ndarray) -> List[Any]: # Using Any for insightface.app.common.Face
|
| 105 |
if not core_models_loaded_successfully or face_analyzer is None:
|
| 106 |
logging.error("Face analyzer not available for get_faces_from_image. Core models might have failed to load.")
|
| 107 |
return []
|
|
|
|
| 115 |
logging.error(f"Error during face detection: {e}", exc_info=True)
|
| 116 |
return []
|
| 117 |
|
| 118 |
+
def draw_detected_faces(img_np: np.ndarray, faces: List[Any]) -> np.ndarray:
|
| 119 |
img_with_boxes = img_np.copy()
|
| 120 |
for i, face in enumerate(faces):
|
| 121 |
box = face.bbox.astype(int)
|
|
|
|
| 123 |
cv2.rectangle(img_with_boxes, (x1, y1), (x2, y2), (0, 255, 0), 2)
|
| 124 |
label_position = (x1, max(0, y1 - 10))
|
| 125 |
cv2.putText(img_with_boxes, f"Face {i}", label_position,
|
| 126 |
+
cv2.FONT_HERSHEY_SIMPLEX, 0.7, (36, 255, 12), 2, cv2.LINE_AA)
|
| 127 |
return img_with_boxes
|
| 128 |
|
| 129 |
+
# --- Enhancement & Color Correction ---
|
| 130 |
def enhance_cropped_face(face_crop_bgr: np.ndarray) -> np.ndarray:
|
| 131 |
if not restoration_model_loaded_successfully or face_restorer is None:
|
| 132 |
logging.warning("Face restorer model not available. Skipping enhancement for crop.")
|
|
|
|
| 135 |
logging.warning("Received empty or invalid face crop for enhancement.")
|
| 136 |
return face_crop_bgr if face_crop_bgr is not None else np.array([])
|
| 137 |
|
| 138 |
+
logging.debug(f"Applying face restoration to crop of size {face_crop_bgr.shape[:2]}...")
|
| 139 |
original_crop_height, original_crop_width = face_crop_bgr.shape[:2]
|
| 140 |
|
| 141 |
try:
|
|
|
|
| 143 |
restorer_input_node = face_restorer.get_inputs()[0]
|
| 144 |
try:
|
| 145 |
shape = restorer_input_node.shape
|
| 146 |
+
if isinstance(shape[2], int) and isinstance(shape[3], int) and shape[2] > 0 and shape[3] > 0:
|
| 147 |
+
restorer_input_size_hw = (shape[2], shape[3])
|
| 148 |
+
else: # Handle dynamic axes or unexpected shapes
|
|
|
|
| 149 |
restorer_input_size_hw = (512, 512)
|
| 150 |
+
logging.warning(f"Could not reliably determine restorer input size from model shape {shape}, defaulting to {restorer_input_size_hw}.")
|
| 151 |
+
except Exception as e_shape:
|
| 152 |
restorer_input_size_hw = (512, 512)
|
| 153 |
+
logging.warning(f"Error determining restorer input size (error: {e_shape}), defaulting to {restorer_input_size_hw}.")
|
| 154 |
+
|
| 155 |
+
logging.debug(f"Restoration model expects input HxW: {restorer_input_size_hw}")
|
| 156 |
img_resized_for_model = cv2.resize(img_rgb, (restorer_input_size_hw[1], restorer_input_size_hw[0]), interpolation=cv2.INTER_AREA)
|
| 157 |
img_normalized = (img_resized_for_model / 255.0).astype(np.float32)
|
| 158 |
img_chw = np.transpose(img_normalized, (2, 0, 1))
|
|
|
|
| 165 |
restored_img_uint8_model_size = np.clip(restored_img_hwc_model_size * 255.0, 0, 255).astype(np.uint8)
|
| 166 |
restored_crop_rgb = cv2.resize(restored_img_uint8_model_size, (original_crop_width, original_crop_height), interpolation=cv2.INTER_LANCZOS4)
|
| 167 |
restored_crop_bgr = cv2.cvtColor(restored_crop_rgb, cv2.COLOR_RGB2BGR)
|
| 168 |
+
logging.debug("Cropped face restoration complete.")
|
| 169 |
return restored_crop_bgr
|
| 170 |
except Exception as e:
|
| 171 |
logging.error(f"Error during face restoration for crop: {e}", exc_info=True)
|
|
|
|
| 175 |
source_shape = source_channel.shape
|
| 176 |
source_channel_flat = source_channel.flatten()
|
| 177 |
target_channel_flat = target_channel.flatten()
|
| 178 |
+
|
| 179 |
source_hist, bins = np.histogram(source_channel_flat, 256, [0, 256])
|
| 180 |
source_cdf = source_hist.cumsum()
|
| 181 |
if source_cdf[-1] == 0:
|
| 182 |
+
logging.warning("Source channel for histogram matching is effectively empty. Returning original channel.")
|
| 183 |
return source_channel
|
| 184 |
+
|
| 185 |
target_hist, bins = np.histogram(target_channel_flat, 256, [0, 256])
|
| 186 |
target_cdf = target_hist.cumsum()
|
| 187 |
if target_cdf[-1] == 0:
|
| 188 |
logging.warning("Target channel for histogram matching is effectively empty. No matching possible. Returning original source channel.")
|
| 189 |
return source_channel
|
| 190 |
+
|
| 191 |
+
# Normalize CDFs
|
| 192 |
source_cdf_normalized = (source_cdf * 255 / source_cdf[-1]).astype(np.uint8)
|
| 193 |
target_cdf_normalized = (target_cdf * 255 / target_cdf[-1]).astype(np.uint8)
|
| 194 |
+
|
| 195 |
lookup_table = np.zeros(256, dtype=np.uint8)
|
| 196 |
gj = 0
|
| 197 |
for gi in range(256):
|
| 198 |
while gj < 255 and target_cdf_normalized[gj] < source_cdf_normalized[gi]:
|
| 199 |
gj += 1
|
| 200 |
lookup_table[gi] = gj
|
| 201 |
+
|
| 202 |
+
matched_channel = cv2.LUT(source_channel, lookup_table)
|
| 203 |
+
return matched_channel.reshape(source_shape)
|
| 204 |
+
|
| 205 |
|
| 206 |
def histogram_match_color(source_img_bgr: np.ndarray, target_img_bgr: np.ndarray) -> np.ndarray:
|
| 207 |
if source_img_bgr is None or target_img_bgr is None or source_img_bgr.size == 0 or target_img_bgr.size == 0:
|
| 208 |
logging.warning("Cannot perform histogram matching on empty or None images.")
|
| 209 |
return source_img_bgr if source_img_bgr is not None else np.array([])
|
| 210 |
+
|
| 211 |
matched_img = np.zeros_like(source_img_bgr)
|
| 212 |
try:
|
| 213 |
+
for i in range(source_img_bgr.shape[2]): # For each channel (B, G, R)
|
| 214 |
matched_img[:, :, i] = histogram_match_channel(source_img_bgr[:, :, i], target_img_bgr[:, :, i])
|
| 215 |
return matched_img
|
| 216 |
except Exception as e:
|
| 217 |
logging.error(f"Error during color histogram matching: {e}", exc_info=True)
|
| 218 |
return source_img_bgr
|
| 219 |
|
| 220 |
+
# --- Image Face Swap Processing ---
|
| 221 |
def process_face_swap(source_pil_img: Optional[Image.Image], target_pil_img: Optional[Image.Image],
                      target_face_index: int, apply_enhancement: bool, apply_color_correction: bool,
                      progress=gr.Progress(track_tqdm=True)):
    """Swap the first face of *source_pil_img* onto the selected face of *target_pil_img*.

    Optionally applies face restoration and color correction to the swapped region.

    Args:
        source_pil_img: Image containing the face to transplant (first face is used).
        target_pil_img: Scene image receiving the face.
        target_face_index: 0-based index into the faces detected in the target.
        apply_enhancement: Run the face-restoration model on the swapped region.
        apply_color_correction: Histogram-match the swapped region to the original.
        progress: Gradio progress tracker (injected by Gradio).

    Returns:
        (result PIL image, download file path or None, status message string).

    Raises:
        gr.Error: on missing inputs, failed detection, or a processing failure.
    """
    progress(0, desc="Initializing image swap...")
    if not core_models_loaded_successfully or face_analyzer is None or swapper is None:
        gr.Error("CRITICAL: Core models (Face Analyzer or Swapper) not loaded. Cannot proceed with image swap.")
        return Image.new('RGB', (100, 100), color='lightgrey'), None, "Core models failed to load."
    if source_pil_img is None: raise gr.Error("Source image not provided.")
    if target_pil_img is None: raise gr.Error("Target image not provided.")

    progress(0.05, desc="Converting images...")
    source_np = convert_pil_to_cv2(source_pil_img)
    target_np = convert_pil_to_cv2(target_pil_img)
    if source_np is None or target_np is None:
        raise gr.Error("Image conversion failed.")
    target_h, target_w = target_np.shape[:2]

    progress(0.15, desc="Detecting source face...")
    source_faces = get_faces_from_image(source_np)
    if not source_faces:
        raise gr.Error("No face found in source image.")
    source_face = source_faces[0]

    progress(0.25, desc="Detecting target faces...")
    target_faces = get_faces_from_image(target_np)
    if not target_faces:
        raise gr.Error("No faces found in target image.")
    if not (0 <= target_face_index < len(target_faces)):
        raise gr.Error(f"Target face index {target_face_index} out of range ({len(target_faces)} faces found).")

    target_face_to_swap_info = target_faces[int(target_face_index)]
    swapped_bgr_img = target_np.copy()

    try:
        progress(0.4, desc="Performing face swap...")
        # insightface's swapper returns the composited image when paste_back=True.
        swapped_bgr_img = swapper.get(swapped_bgr_img, target_face_to_swap_info, source_face, paste_back=True)
        bbox_coords = target_face_to_swap_info.bbox.astype(int)

        if apply_enhancement:
            if restoration_model_loaded_successfully and face_restorer is not None:
                progress(0.6, desc="Applying face enhancement...")
                # Pad the bbox a little so restoration covers the blend boundary,
                # clamped to image bounds.
                padding_enh = 20
                enh_x1 = max(0, bbox_coords[0] - padding_enh)
                enh_y1 = max(0, bbox_coords[1] - padding_enh)
                enh_x2 = min(target_w, bbox_coords[2] + padding_enh)
                enh_y2 = min(target_h, bbox_coords[3] + padding_enh)
                if enh_x1 < enh_x2 and enh_y1 < enh_y2:
                    face_crop_to_enhance = swapped_bgr_img[enh_y1:enh_y2, enh_x1:enh_x2]
                    enhanced_crop = enhance_cropped_face(face_crop_to_enhance.copy())  # pass a copy
                    if enhanced_crop.shape == face_crop_to_enhance.shape:
                        swapped_bgr_img[enh_y1:enh_y2, enh_x1:enh_x2] = enhanced_crop
                    else:
                        logging.warning("Enhanced crop size mismatch.")
                else:
                    logging.warning("Invalid crop for enhancement.")
            else:
                logging.warning("Enhancement requested but model not available.")
                gr.Info("Face restoration model not available, enhancement skipped.")

        if apply_color_correction:
            progress(0.75, desc="Applying color correction...")
            cc_x1, cc_y1, cc_x2, cc_y2 = bbox_coords[0], bbox_coords[1], bbox_coords[2], bbox_coords[3]
            if cc_x1 < cc_x2 and cc_y1 < cc_y2:
                original_target_face_region = target_np[cc_y1:cc_y2, cc_x1:cc_x2]
                swapped_face_region_to_correct = swapped_bgr_img[cc_y1:cc_y2, cc_x1:cc_x2]
                if original_target_face_region.size > 0 and swapped_face_region_to_correct.size > 0:
                    corrected_swapped_region = histogram_match_color(
                        swapped_face_region_to_correct.copy(),  # pass copies: helper must not
                        original_target_face_region.copy()      # mutate the working buffers
                    )
                    if corrected_swapped_region.shape == swapped_face_region_to_correct.shape:
                        swapped_bgr_img[cc_y1:cc_y2, cc_x1:cc_x2] = corrected_swapped_region
                    else:
                        logging.warning("Color corrected region size mismatch.")
                else:
                    logging.warning("Empty regions for color correction.")
            else:
                logging.warning("Invalid bbox for color correction.")
    except Exception as e:
        logging.error(f"Error during image face swapping or post-processing: {e}", exc_info=True)
        # NOTE(review): the original built a fallback preview image here but then
        # unconditionally raised, so the fallback was dead code; the raise is kept.
        raise gr.Error(f"An error occurred during image processing: {str(e)}.")

    progress(0.9, desc="Finalizing image...")
    swapped_pil_img = convert_cv2_to_pil(swapped_bgr_img)
    if swapped_pil_img is None:
        gr.Error("Failed to convert final image.")
        return Image.new('RGB', (target_w, target_h), color='darkred'), None, "Failed to convert final image."

    # Save a copy for the download widget; failure here is non-fatal.
    temp_file_path = None
    try:
        with tempfile.NamedTemporaryFile(suffix=".jpg", delete=False, mode="wb") as tmp_file:
            swapped_pil_img.save(tmp_file, format="JPEG", quality=95)
            temp_file_path = tmp_file.name
    except Exception as e:
        logging.error(f"Error saving swapped image to temporary file: {e}", exc_info=True)
        gr.Warning("Could not save image for download.")

    # NOTE(review): the "π " prefixes below look like mojibake'd emoji in the
    # rendered diff — confirm the intended characters against the repository.
    progress(1.0, desc="π Image swap complete!")
    return swapped_pil_img, temp_file_path, "π Image swap complete!"
|
| 322 |
+
|
| 323 |
+
# --- Video Face Swap Processing ---
|
| 324 |
+
def process_video_face_swap(
    source_pil_img: Optional[Image.Image],
    target_video_path: Optional[str],  # path from gr.Video component
    target_face_index_hint: int,
    apply_enhancement: bool,
    apply_color_correction: bool,
    progress=gr.Progress(track_tqdm=True)
):
    """Swap the source face onto a face in every frame of *target_video_path*.

    Audio is preserved when moviepy is available; otherwise the result is
    silent. The final video is copied into a Gradio-servable temp file and the
    working directory is always cleaned up.

    Args:
        source_pil_img: Image containing the face to transplant (first face used).
        target_video_path: Path to the target video file.
        target_face_index_hint: Preferred face index per frame; falls back to
            the first detected face when the hint is out of range.
        apply_enhancement: Run face restoration on each swapped region.
        apply_color_correction: Histogram-match each swapped region.
        progress: Gradio progress tracker (injected by Gradio).

    Returns:
        (path to the final video file, status message string).

    Raises:
        gr.Error: on missing inputs or an unrecoverable processing failure.
    """
    progress(0, desc="Initializing video swap...")
    if not core_models_loaded_successfully or face_analyzer is None or swapper is None:
        gr.Error("CRITICAL: Core models not loaded. Cannot proceed with video swap.")
        return None, "Core models failed."
    if source_pil_img is None: raise gr.Error("Source image for video swap not provided.")
    if target_video_path is None: raise gr.Error("Target video file not provided.")

    source_np = convert_pil_to_cv2(source_pil_img)
    if source_np is None: raise gr.Error("Source image conversion failed (video swap).")

    source_faces = get_faces_from_image(source_np)
    if not source_faces: raise gr.Error("No face found in source image (video swap).")
    source_face = source_faces[0]

    cap = cv2.VideoCapture(target_video_path)
    if not cap.isOpened(): raise gr.Error(f"Could not open video file: {target_video_path}")

    fps = cap.get(cv2.CAP_PROP_FPS)
    frame_width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
    frame_height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
    total_frames = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))
    if total_frames <= 0:  # some containers do not report a frame count
        logging.warning(f"Could not accurately determine total frames for {target_video_path}. Processing may not show correct progress percentage.")
        total_frames = 1  # sentinel: loop until cap.read() fails

    temp_process_dir = tempfile.mkdtemp()  # holds every intermediate file
    output_video_basename = f"swapped_{os.path.splitext(os.path.basename(target_video_path))[0]}"
    output_video_path_nosound = os.path.join(temp_process_dir, output_video_basename + "_nosound.mp4")
    final_output_video_path = os.path.join(temp_process_dir, output_video_basename + ".mp4")

    # 'mp4v' is broadly available; proper H.264 + audio muxing is delegated to moviepy below.
    fourcc = cv2.VideoWriter_fourcc(*'mp4v')
    out_nosound = cv2.VideoWriter(output_video_path_nosound, fourcc, fps if fps > 0 else 30.0, (frame_width, frame_height))

    logging.info(f"Processing video: {target_video_path} ({total_frames if total_frames > 1 else 'unknown'} frames, {fps} FPS)")
    frames_processed_count = 0
    audio_extracted_path: Optional[str] = None

    try:
        # --- Audio extraction (best effort; never fatal) ---
        progress(0.01, desc="Extracting audio (if any)...")
        try:
            from moviepy.editor import VideoFileClip
            with VideoFileClip(target_video_path) as video_clip:
                if video_clip.audio:
                    audio_extracted_path = os.path.join(temp_process_dir, "original_audio.mp3")
                    video_clip.audio.write_audiofile(audio_extracted_path, logger=None)  # logger=None silences moviepy stdout
                    logging.info(f"Original audio extracted to: {audio_extracted_path}")
                else:
                    logging.info("No audio track found in the original video.")
        except ImportError:
            logging.warning("moviepy library not found. Video will be processed without audio. Install with 'pip install moviepy'.")
            gr.Info("moviepy not found. Video will be silent.")
        except Exception as e_audio_extract:
            logging.warning(f"Could not extract audio using moviepy: {e_audio_extract}. Proceeding without audio.", exc_info=True)
            gr.Warning(f"Audio extraction failed: {e_audio_extract}. Video will be silent.")

        # --- Frame-by-frame swap ---
        # When the frame count is unknown, iterate an explicit infinite counter;
        # the inner `break` on a failed read() terminates the loop. (The original
        # used iter(lambda: cap.isOpened(), False), an accidental infinite
        # iterator of True that only worked because of the same break.)
        import itertools
        frame_num_iterator = range(total_frames) if total_frames > 1 else itertools.count()

        for _frame_idx in progress.tqdm(frame_num_iterator, desc="Swapping video frames", total=total_frames if total_frames > 1 else None):
            ret, frame = cap.read()
            if not ret:
                logging.info(f"End of video or cannot read frame at index {frames_processed_count}.")
                break

            current_frame_h, current_frame_w = frame.shape[:2]
            target_faces_in_frame = get_faces_from_image(frame)
            processed_frame = frame.copy()

            if target_faces_in_frame and swapper:
                target_face_to_swap_info_frame = None
                if 0 <= target_face_index_hint < len(target_faces_in_frame):
                    target_face_to_swap_info_frame = target_faces_in_frame[target_face_index_hint]
                elif target_faces_in_frame:  # fallback: use the first detected face
                    target_face_to_swap_info_frame = target_faces_in_frame[0]

                if target_face_to_swap_info_frame:
                    try:
                        processed_frame = swapper.get(processed_frame, target_face_to_swap_info_frame, source_face, paste_back=True)
                        bbox_coords_frame = target_face_to_swap_info_frame.bbox.astype(int)

                        if apply_enhancement and restoration_model_loaded_successfully and face_restorer:
                            padding_enh = 20
                            enh_x1 = max(0, bbox_coords_frame[0] - padding_enh)
                            enh_y1 = max(0, bbox_coords_frame[1] - padding_enh)
                            enh_x2 = min(current_frame_w, bbox_coords_frame[2] + padding_enh)
                            enh_y2 = min(current_frame_h, bbox_coords_frame[3] + padding_enh)
                            if enh_x1 < enh_x2 and enh_y1 < enh_y2:
                                crop = processed_frame[enh_y1:enh_y2, enh_x1:enh_x2]
                                enhanced = enhance_cropped_face(crop.copy())
                                if enhanced.shape == crop.shape:
                                    processed_frame[enh_y1:enh_y2, enh_x1:enh_x2] = enhanced

                        if apply_color_correction:
                            cc_x1, cc_y1, cc_x2, cc_y2 = bbox_coords_frame
                            if cc_x1 < cc_x2 and cc_y1 < cc_y2:
                                original_region = frame[cc_y1:cc_y2, cc_x1:cc_x2]
                                swapped_region = processed_frame[cc_y1:cc_y2, cc_x1:cc_x2]
                                if original_region.size > 0 and swapped_region.size > 0:
                                    corrected = histogram_match_color(swapped_region.copy(), original_region.copy())
                                    if corrected.shape == swapped_region.shape:
                                        processed_frame[cc_y1:cc_y2, cc_x1:cc_x2] = corrected
                    except Exception as e_frame_swap:
                        # Per-frame failures are non-fatal: write the frame as-is.
                        logging.warning(f"Error swapping face in frame {frames_processed_count}: {e_frame_swap}", exc_info=False)

            out_nosound.write(processed_frame)
            frames_processed_count += 1
            # Manual progress pulse when the total frame count is unknown.
            if total_frames <= 1 and frames_processed_count % int(fps if fps > 0 else 30) == 0:
                progress(frames_processed_count / (frames_processed_count + 1), desc=f"Swapping video frames ({frames_processed_count} so far)")

        cap.release()
        out_nosound.release()
        logging.info(f"Finished processing {frames_processed_count} frames. Silent video at: {output_video_path_nosound}")

        # --- Re-mux audio (best effort) ---
        if audio_extracted_path and os.path.exists(output_video_path_nosound):
            progress(0.95, desc="Combining video with audio...")
            try:
                from moviepy.editor import VideoFileClip, AudioFileClip
                video_clip_nosound = VideoFileClip(output_video_path_nosound)
                audio_clip = AudioFileClip(audio_extracted_path)
                final_video_clip = video_clip_nosound.set_audio(audio_clip)
                # moviepy 1.x (the moviepy.editor API used here) takes `temp_audiofile`
                # (a file path); the original passed an unsupported `temp_audiofile_path`
                # directory, which raised TypeError on this path.
                final_video_clip.write_videofile(
                    final_output_video_path, codec="libx264", audio_codec="aac",
                    temp_audiofile=os.path.join(temp_process_dir, "temp_mux_audio.m4a"),
                    logger=None)
                video_clip_nosound.close()
                audio_clip.close()
                logging.info(f"Video with audio successfully saved to: {final_output_video_path}")
            except ImportError:
                logging.warning("moviepy not found, cannot add audio back. Returning silent video.")
                if os.path.exists(output_video_path_nosound) and not os.path.exists(final_output_video_path):
                    os.rename(output_video_path_nosound, final_output_video_path)
            except Exception as e_audio_mux:
                logging.error(f"Error adding audio back to video: {e_audio_mux}. Returning silent video.", exc_info=True)
                gr.Warning(f"Audio muxing failed: {e_audio_mux}. Resulting video may be silent.")
                if os.path.exists(output_video_path_nosound) and not os.path.exists(final_output_video_path):
                    os.rename(output_video_path_nosound, final_output_video_path)
        elif os.path.exists(output_video_path_nosound) and not os.path.exists(final_output_video_path):
            # No audio to begin with, or extraction failed: promote the silent file.
            os.rename(output_video_path_nosound, final_output_video_path)
            logging.info(f"No audio to add or audio extraction failed. Final video (silent) at: {final_output_video_path}")

        if not os.path.exists(final_output_video_path):
            raise gr.Error("Video processing failed to produce a final output file.")

        # Copy the result OUT of temp_process_dir so the cleanup in `finally`
        # cannot delete the file Gradio serves. (The original performed this
        # copy and RETURNED inside `finally`, which silently swallowed every
        # exception raised above — including its own gr.Error messages.)
        final_gradio_video_path = tempfile.NamedTemporaryFile(delete=False, suffix=".mp4").name
        shutil.copy2(final_output_video_path, final_gradio_video_path)
        # NOTE(review): "π " below looks like mojibake'd emoji in the rendered diff.
        progress(1.0, desc="π Video swap complete!")
        return final_gradio_video_path, f"π Video processing complete! ({frames_processed_count} frames)"

    except gr.Error:
        raise  # already a user-facing error; do not re-wrap
    except Exception as e_video_proc:
        logging.error(f"Unhandled error during video processing: {e_video_proc}", exc_info=True)
        raise gr.Error(f"Video processing error: {str(e_video_proc)}")
    finally:
        # Cleanup only — never `return` from finally, it would swallow exceptions.
        cap.release()  # safe to call twice
        if out_nosound.isOpened():
            out_nosound.release()
        if os.path.exists(temp_process_dir):
            shutil.rmtree(temp_process_dir, ignore_errors=True)
            logging.info(f"Temporary processing directory {temp_process_dir} cleaned up.")
|
| 519 |
+
|
| 520 |
+
|
| 521 |
+
# --- Gradio UI Functions ---
|
| 522 |
def preview_target_faces(target_pil_img: Optional[Image.Image]):
    """Detect faces in the target image and build the preview UI state.

    Args:
        target_pil_img: The uploaded target image, or None.

    Returns:
        (preview PIL image, gr.Slider update for face selection, status message).
        The preview shows detection boxes when faces are found, a possibly
        downscaled plain image when none are, and a gray placeholder on errors.
    """
    blank_preview_placeholder = Image.new('RGB', (DETECTION_SIZE[0], DETECTION_SIZE[1]), color='lightgray')
    # NOTE(review): "π―" in the labels below looks like mojibake'd emoji in the rendered diff.
    reset_slider = gr.Slider(minimum=0, maximum=0, value=0, step=1, interactive=False, label="π― Select Target Face")
    status_msg = ""

    if not core_models_loaded_successfully or face_analyzer is None:
        gr.Warning("Face detection models not loaded. Preview cannot be generated.")
        return blank_preview_placeholder, reset_slider, "Core models not loaded."
    if target_pil_img is None:
        return blank_preview_placeholder, reset_slider, "Upload target image first."

    target_np = convert_pil_to_cv2(target_pil_img)
    if target_np is None:
        gr.Warning("Could not process target image for preview.")
        return blank_preview_placeholder, reset_slider, "Error processing image."

    faces = get_faces_from_image(target_np)
    if not faces:
        # Still show the image (downscaled if large) even with zero detections.
        max_h, max_w = DETECTION_SIZE[1], DETECTION_SIZE[0]  # assumes DETECTION_SIZE is (width, height) — TODO confirm
        h, w = target_np.shape[:2]
        preview_display_np = target_np
        if h > max_h or w > max_w:
            scale = min(max_h / h, max_w / w)
            preview_display_np = cv2.resize(target_np, (int(w * scale), int(h * scale)))

        preview_pil_img = convert_cv2_to_pil(preview_display_np)
        if preview_pil_img is None: preview_pil_img = blank_preview_placeholder
        gr.Info("No faces were detected in the target image.")
        return preview_pil_img, reset_slider, "No faces detected in image."

    preview_np_with_boxes = draw_detected_faces(target_np.copy(), faces)
    preview_pil_img_with_boxes = convert_cv2_to_pil(preview_np_with_boxes)
    if preview_pil_img_with_boxes is None:
        preview_pil_img_with_boxes = blank_preview_placeholder
        gr.Warning("Error generating preview image with face boxes.")
        status_msg = "Error generating preview."

    num_faces = len(faces)
    slider_update = gr.Slider(minimum=0, maximum=max(0, num_faces - 1), value=0, step=1,
                              label=f"π― Select Target Face (0 to {num_faces-1})", interactive=(num_faces > 0))
    status_msg = status_msg if status_msg else f"Target preview updated. {num_faces} face(s) found."
    return preview_pil_img_with_boxes, slider_update, status_msg
|
| 565 |
+
|
| 566 |
+
|
| 567 |
+
def preview_video_first_frame(target_video_path: Optional[str]):  # path from gr.Video
    """Detect faces in the first frame of the target video and build preview state.

    Args:
        target_video_path: Filesystem path of the uploaded video, or None.

    Returns:
        (preview PIL image, gr.Slider update for face selection, status message).
        Mirrors preview_target_faces() but operates on the video's first frame;
        the VideoCapture handle is always released before returning.
    """
    blank_preview_placeholder = Image.new('RGB', (DETECTION_SIZE[0], DETECTION_SIZE[1]), color='lightgray')
    # NOTE(review): "π―" in the labels below looks like mojibake'd emoji in the rendered diff.
    reset_slider = gr.Slider(minimum=0, maximum=0, value=0, step=1, interactive=False, label="π― Select Target Face")
    status_msg = ""

    if not core_models_loaded_successfully or face_analyzer is None:
        gr.Warning("Face detection models not loaded. Video preview cannot be generated.")
        return blank_preview_placeholder, reset_slider, "Core models not loaded."
    if target_video_path is None:
        return blank_preview_placeholder, reset_slider, "Upload target video first."

    cap = cv2.VideoCapture(target_video_path)
    if not cap.isOpened():
        gr.Warning("Could not open video file for preview.")
        return blank_preview_placeholder, reset_slider, "Error opening video."

    ret, first_frame_np = cap.read()
    cap.release()  # only the first frame is needed; release immediately

    if not ret or first_frame_np is None:
        gr.Warning("Could not read the first frame from the video.")
        return blank_preview_placeholder, reset_slider, "Error reading first frame."

    faces = get_faces_from_image(first_frame_np)
    if not faces:
        # Still show the frame (downscaled if large) even with zero detections.
        max_h, max_w = DETECTION_SIZE[1], DETECTION_SIZE[0]  # assumes DETECTION_SIZE is (width, height) — TODO confirm
        h, w = first_frame_np.shape[:2]
        preview_display_np = first_frame_np
        if h > max_h or w > max_w:  # resize for display
            scale = min(max_h / h, max_w / w)
            preview_display_np = cv2.resize(first_frame_np, (int(w * scale), int(h * scale)))

        preview_pil_img = convert_cv2_to_pil(preview_display_np)
        if preview_pil_img is None: preview_pil_img = blank_preview_placeholder
        gr.Info("No faces detected in the first frame of the video.")
        return preview_pil_img, reset_slider, "No faces in first frame of video."

    preview_np_with_boxes = draw_detected_faces(first_frame_np.copy(), faces)
    preview_pil_img_with_boxes = convert_cv2_to_pil(preview_np_with_boxes)
    if preview_pil_img_with_boxes is None:
        preview_pil_img_with_boxes = blank_preview_placeholder
        gr.Warning("Error generating preview image for video's first frame.")
        status_msg = "Error generating video preview."

    num_faces = len(faces)
    slider_update = gr.Slider(minimum=0, maximum=max(0, num_faces - 1), value=0, step=1,
                              label=f"π― Select Target Face in First Frame (0 to {num_faces-1})", interactive=(num_faces > 0))
    status_msg = status_msg if status_msg else f"Video first frame preview updated. {num_faces} face(s) found."
    return preview_pil_img_with_boxes, slider_update, status_msg
|
| 616 |
+
|
| 617 |
|
| 618 |
# --- Gradio UI Definition ---
|
| 619 |
+
with gr.Blocks(title="Ultimate Face Swap AI π v3.3 Video", theme=gr.themes.Soft(primary_hue=gr.themes.colors.blue, secondary_hue=gr.themes.colors.sky)) as demo:
|
| 620 |
gr.Markdown(
|
| 621 |
"""
|
| 622 |
<div style="text-align: center; border-bottom: 1px solid #eee; padding-bottom:10px;">
|
| 623 |
+
<h1>π Ultimate Face Swap AI <span style="font-size:0.8em; color: #555;">v3.3 Video</span> β¨</h1>
|
| 624 |
+
<p>Swap faces in <strong>images</strong> or <strong>videos</strong>!</p>
|
| 625 |
+
<p style="font-size:0.9em;">Optionally enhance with <strong>face restoration</strong> and <strong>color correction</strong>.</p>
|
| 626 |
</div>
|
| 627 |
"""
|
| 628 |
)
|
|
|
|
| 631 |
gr.Error(
|
| 632 |
"π΄ CRITICAL ERROR: Core models (Face Analyzer or Swapper) failed to load. "
|
| 633 |
"The application UI will load but WILL NOT FUNCTION correctly. "
|
| 634 |
+
"Please check console logs and restart after fixing."
|
|
|
|
| 635 |
)
|
| 636 |
|
| 637 |
+
with gr.Tabs() as tabs:
|
| 638 |
+
# --- IMAGE SWAP TAB ---
|
| 639 |
+
with gr.TabItem("πΌοΈ Image Face Swap", id="image_tab"):
|
| 640 |
+
with gr.Row():
|
| 641 |
+
with gr.Column(scale=1, min_width=300):
|
| 642 |
+
img_source_image_input = gr.Image(label="π€ Source Face Image (Clear, single face)", type="pil", sources=["upload", "clipboard"], height=380)
|
| 643 |
+
with gr.Column(scale=1, min_width=300):
|
| 644 |
+
img_target_image_input = gr.Image(label="πΌοΈ Target Scene Image", type="pil", sources=["upload", "clipboard"], height=380)
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
| 645 |
|
| 646 |
+
gr.HTML("<hr style='margin-top: 10px; margin-bottom: 15px; border-style: dashed; border-color: #ccc;'>")
|
| 647 |
+
|
| 648 |
+
with gr.Row(equal_height=False):
|
| 649 |
+
with gr.Column(scale=2):
|
| 650 |
+
img_target_faces_preview_output = gr.Image(label="π Target Image Preview & Detected Faces", interactive=False, height=380, show_download_button=False)
|
| 651 |
+
with gr.Column(scale=1, min_width=250):
|
| 652 |
+
img_preview_button = gr.Button("π Preview & Select Target Face (Image)", variant="secondary")
|
| 653 |
+
img_face_index_slider = gr.Slider(
|
| 654 |
+
label="π― Select Target Face (0-indexed)", minimum=0, maximum=0, step=1, value=0, interactive=False
|
| 655 |
+
)
|
| 656 |
+
gr.Markdown("<p style='font-size:0.8em; color:#555;'>Upload target, click 'Preview', then use slider.</p>")
|
| 657 |
+
gr.HTML("<hr style='margin-top: 5px; margin-bottom: 5px; border-style: solid; border-width: thin; border-color: #ddd;'>")
|
| 658 |
+
|
| 659 |
+
img_enhance_checkbox_label = "β¨ Apply Face Restoration"
|
| 660 |
+
if not restoration_model_loaded_successfully: img_enhance_checkbox_label += " (Model N/A)"
|
| 661 |
+
img_enhance_checkbox = gr.Checkbox(label=img_enhance_checkbox_label, value=restoration_model_loaded_successfully, interactive=restoration_model_loaded_successfully)
|
| 662 |
+
if not restoration_model_loaded_successfully: gr.Markdown("<p style='color: orange; font-size:0.8em;'>β οΈ Face restoration model N/A.</p>")
|
| 663 |
+
else: gr.Markdown("<p style='font-size:0.8em; color:#555;'>Improves swapped face quality.</p>")
|
| 664 |
+
|
| 665 |
+
img_color_correction_checkbox = gr.Checkbox(label="π¨ Apply Color Correction", value=True)
|
| 666 |
+
gr.Markdown("<p style='font-size:0.8em; color:#555;'>Matches swapped face color to scene.</p>")
|
| 667 |
+
|
| 668 |
+
gr.HTML("<hr style='margin-top: 15px; margin-bottom: 15px;'>")
|
| 669 |
+
with gr.Row():
|
| 670 |
+
img_swap_button = gr.Button("π GENERATE IMAGE SWAP!", variant="primary", scale=3, interactive=core_models_loaded_successfully)
|
| 671 |
+
img_clear_button = gr.Button("π§Ή Clear Image Inputs", variant="stop", scale=1)
|
| 672 |
|
| 673 |
+
with gr.Row():
|
| 674 |
+
with gr.Column(scale=2):
|
| 675 |
+
img_swapped_image_output = gr.Image(label="β¨ Swapped Image Result", interactive=False, height=480, show_download_button=False)
|
| 676 |
+
with gr.Column(scale=1):
|
| 677 |
+
img_download_output_file = gr.File(label="β¬οΈ Download Swapped Image", interactive=True)
|
| 678 |
+
img_status_message_output = gr.Markdown(value="<p style='color: #555; font-style: italic;'>Image status messages.</p>")
|
| 679 |
+
|
| 680 |
+
# Image Tab Event Handlers
|
| 681 |
+
def on_img_target_image_upload_or_clear(target_img_pil: Optional[Image.Image]):
|
| 682 |
+
blank_preview = Image.new('RGB', (DETECTION_SIZE[0], DETECTION_SIZE[1]), color='whitesmoke')
|
| 683 |
+
reset_slider = gr.Slider(minimum=0, maximum=0, value=0, step=1, interactive=False, label="π― Select Target Face")
|
| 684 |
+
status = "Target image changed. Click 'Preview' to update." if target_img_pil else "Target image cleared."
|
| 685 |
+
return blank_preview, reset_slider, status
|
| 686 |
+
|
| 687 |
+
img_target_image_input.change(
|
| 688 |
+
fn=on_img_target_image_upload_or_clear,
|
| 689 |
+
inputs=[img_target_image_input],
|
| 690 |
+
outputs=[img_target_faces_preview_output, img_face_index_slider, img_status_message_output],
|
| 691 |
+
queue=False
|
| 692 |
)
|
| 693 |
+
img_preview_button.click(
|
| 694 |
+
fn=preview_target_faces, inputs=[img_target_image_input],
|
| 695 |
+
outputs=[img_target_faces_preview_output, img_face_index_slider, img_status_message_output],
|
| 696 |
+
show_progress="full"
|
| 697 |
+
)
|
| 698 |
+
img_swap_button.click(
|
| 699 |
+
fn=process_face_swap,
|
| 700 |
+
inputs=[img_source_image_input, img_target_image_input, img_face_index_slider, img_enhance_checkbox, img_color_correction_checkbox],
|
| 701 |
+
outputs=[img_swapped_image_output, img_download_output_file, img_status_message_output]
|
| 702 |
+
)
|
| 703 |
+
def clear_image_tab():
|
| 704 |
+
blank_preview = Image.new('RGB', (DETECTION_SIZE[0], DETECTION_SIZE[1]), color='lightgray')
|
| 705 |
+
reset_slider = gr.Slider(minimum=0, maximum=0, value=0, step=1, interactive=False)
|
| 706 |
+
return None, None, blank_preview, reset_slider, None, None, "<p style='color: #555; font-style: italic;'>Image fields cleared.</p>"
|
| 707 |
+
img_clear_button.click(
|
| 708 |
+
fn=clear_image_tab, inputs=None,
|
| 709 |
+
outputs=[img_source_image_input, img_target_image_input, img_target_faces_preview_output, img_face_index_slider, img_swapped_image_output, img_download_output_file, img_status_message_output],
|
| 710 |
+
queue=False
|
| 711 |
)
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
| 712 |
|
| 713 |
+
# --- VIDEO SWAP TAB ---
|
| 714 |
+
with gr.TabItem("π¬ Video Face Swap", id="video_tab"):
|
| 715 |
+
gr.Markdown("<p style='font-size:0.9em; color:orange;'>β οΈ Video processing can be time-consuming. Audio is preserved if <strong>moviepy</strong> is installed (`pip install moviepy`).</p>")
|
| 716 |
+
with gr.Row():
|
| 717 |
+
with gr.Column(scale=1, min_width=300):
|
| 718 |
+
vid_source_image_input = gr.Image(label="π€ Source Face Image (for video)", type="pil", sources=["upload", "clipboard"], height=380)
|
| 719 |
+
with gr.Column(scale=1, min_width=300):
|
| 720 |
+
vid_target_video_input = gr.Video(label="π¬ Target Video File", sources=["upload"], height=380)
|
| 721 |
+
|
| 722 |
+
gr.HTML("<hr style='margin-top: 10px; margin-bottom: 15px; border-style: dashed; border-color: #ccc;'>")
|
| 723 |
+
|
| 724 |
+
with gr.Row(equal_height=False):
|
| 725 |
+
with gr.Column(scale=2):
|
| 726 |
+
vid_target_preview_output = gr.Image(label="π Target Video First Frame Preview", interactive=False, height=380, show_download_button=False)
|
| 727 |
+
with gr.Column(scale=1, min_width=250):
|
| 728 |
+
vid_preview_button = gr.Button("π Preview First Frame & Select Target Face (Video)", variant="secondary")
|
| 729 |
+
vid_face_index_slider = gr.Slider(
|
| 730 |
+
label="π― Select Target Face in First Frame (0-indexed)", minimum=0, maximum=0, step=1, value=0, interactive=False
|
| 731 |
+
)
|
| 732 |
+
gr.Markdown("<p style='font-size:0.8em; color:#555;'>Upload video, click 'Preview', then use slider for first frame.</p>")
|
| 733 |
+
gr.HTML("<hr style='margin-top: 5px; margin-bottom: 5px; border-style: solid; border-width: thin; border-color: #ddd;'>")
|
| 734 |
+
|
| 735 |
+
vid_enhance_checkbox_label = "β¨ Apply Face Restoration (Video)"
|
| 736 |
+
if not restoration_model_loaded_successfully: vid_enhance_checkbox_label += " (Model N/A)"
|
| 737 |
+
vid_enhance_checkbox = gr.Checkbox(label=vid_enhance_checkbox_label, value=restoration_model_loaded_successfully, interactive=restoration_model_loaded_successfully)
|
| 738 |
+
if not restoration_model_loaded_successfully: gr.Markdown("<p style='color: orange; font-size:0.8em;'>β οΈ Face restoration model N/A.</p>")
|
| 739 |
+
|
| 740 |
+
vid_color_correction_checkbox = gr.Checkbox(label="π¨ Apply Color Correction (Video)", value=True)
|
| 741 |
+
|
| 742 |
+
gr.HTML("<hr style='margin-top: 15px; margin-bottom: 15px;'>")
|
| 743 |
+
with gr.Row():
|
| 744 |
+
vid_swap_button = gr.Button("π GENERATE VIDEO SWAP!", variant="primary", scale=3, interactive=core_models_loaded_successfully)
|
| 745 |
+
vid_clear_button = gr.Button("π§Ή Clear Video Inputs", variant="stop", scale=1)
|
| 746 |
+
|
| 747 |
+
with gr.Row():
|
| 748 |
+
with gr.Column(scale=2): # Changed scale for better layout
|
| 749 |
+
vid_swapped_video_output = gr.Video(label="β¨ Swapped Video Result", interactive=False, height=480, show_download_button=True)
|
| 750 |
+
with gr.Column(scale=1):
|
| 751 |
+
vid_status_message_output = gr.Markdown(value="<p style='color: #555; font-style: italic;'>Video status messages.</p>")
|
| 752 |
+
|
| 753 |
+
# Video Tab Event Handlers
|
| 754 |
+
def on_vid_target_video_upload_or_clear(target_video_file: Optional[str]):
    """React to the target-video input changing (upload or clear).

    Returns a neutral placeholder image for the first-frame preview, a
    disabled/zeroed face-index slider, and a status message telling the
    user what to do next.
    """
    # Status text depends only on whether a video is currently present.
    if target_video_file:
        status = "Target video changed. Click 'Preview First Frame' to update."
    else:
        status = "Target video cleared."
    # Blank preview canvas sized to the detector's expected resolution.
    placeholder = Image.new('RGB', (DETECTION_SIZE[0], DETECTION_SIZE[1]), color='whitesmoke')
    # Slider is reset and disabled until a preview re-detects faces.
    slider_reset = gr.Slider(minimum=0, maximum=0, value=0, step=1, interactive=False, label="π― Select Target Face")
    return placeholder, slider_reset, status
|
| 759 |
+
|
| 760 |
+
vid_target_video_input.change(
|
| 761 |
+
fn=on_vid_target_video_upload_or_clear,
|
| 762 |
+
inputs=[vid_target_video_input],
|
| 763 |
+
outputs=[vid_target_preview_output, vid_face_index_slider, vid_status_message_output],
|
| 764 |
+
queue=False # No need to queue for simple UI updates
|
| 765 |
+
)
|
| 766 |
+
vid_preview_button.click(
|
| 767 |
+
fn=preview_video_first_frame, inputs=[vid_target_video_input],
|
| 768 |
+
outputs=[vid_target_preview_output, vid_face_index_slider, vid_status_message_output],
|
| 769 |
+
show_progress="full"
|
| 770 |
+
)
|
| 771 |
+
vid_swap_button.click(
|
| 772 |
+
fn=process_video_face_swap,
|
| 773 |
+
inputs=[vid_source_image_input, vid_target_video_input, vid_face_index_slider, vid_enhance_checkbox, vid_color_correction_checkbox],
|
| 774 |
+
outputs=[vid_swapped_video_output, vid_status_message_output] # Output is video path and status
|
| 775 |
+
)
|
| 776 |
+
def clear_video_tab():
    """Reset every control on the video tab to its initial state.

    Returns values for, in order: source image, target video, first-frame
    preview, face-index slider, swapped-video output, and status markdown.
    """
    # Neutral gray canvas replaces the first-frame preview.
    preview_placeholder = Image.new('RGB', (DETECTION_SIZE[0], DETECTION_SIZE[1]), color='lightgray')
    # Face-index slider goes back to a single disabled position.
    slider_reset = gr.Slider(minimum=0, maximum=0, value=0, step=1, interactive=False)
    cleared_status = "<p style='color: #555; font-style: italic;'>Video fields cleared.</p>"
    return None, None, preview_placeholder, slider_reset, None, cleared_status
|
| 780 |
+
vid_clear_button.click(
|
| 781 |
+
fn=clear_video_tab, inputs=None,
|
| 782 |
+
outputs=[vid_source_image_input, vid_target_video_input, vid_target_preview_output, vid_face_index_slider, vid_swapped_video_output, vid_status_message_output],
|
| 783 |
+
queue=False
|
| 784 |
+
)
|
| 785 |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
| 786 |
gr.HTML("<hr style='margin-top: 20px; margin-bottom: 10px;'>")
|
| 787 |
+
gr.Markdown("### π Example Usage (Image Tab)")
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
| 788 |
gr.Examples(
|
| 789 |
examples=[
|
| 790 |
["examples/source_example_1.jpg", "examples/target_example_1.jpg", 0, True, True],
|
| 791 |
["examples/source_example_2.png", "examples/target_example_2.png", 1, True, False],
|
|
|
|
| 792 |
],
|
| 793 |
+
inputs=[img_source_image_input, img_target_image_input, img_face_index_slider, img_enhance_checkbox, img_color_correction_checkbox],
|
| 794 |
+
outputs=[img_swapped_image_output, img_download_output_file, img_status_message_output],
|
| 795 |
+
fn=process_face_swap, # Using the image swap function
|
| 796 |
+
cache_examples=False, # Re-run for dynamic results, set to True if examples are static
|
| 797 |
+
label="Image Swap Examples (Ensure 'examples' folder and images exist)"
|
| 798 |
)
|
| 799 |
+
# Note: gr.Examples for video is more complex due to large file inputs.
|
| 800 |
+
# It's often omitted or handled by providing downloadable example video files and instructions.
|
| 801 |
|
| 802 |
if __name__ == "__main__":
|
| 803 |
os.makedirs("models", exist_ok=True)
|
| 804 |
os.makedirs("examples", exist_ok=True)
|
| 805 |
+
# Consider adding example images if they don't exist for gr.Examples to work out of the box.
|
| 806 |
|
| 807 |
print("\n" + "="*70)
|
| 808 |
+
print("π ULTIMATE FACE SWAP AI - v3.3 Video STARTUP STATUS π")
|
| 809 |
print("="*70)
|
| 810 |
print(f"Execution Providers Selected: {EXECUTION_PROVIDERS}")
|
| 811 |
|
|
|
|
| 813 |
print("\nπ΄ CRITICAL ERROR: CORE MODELS FAILED TO LOAD.")
|
| 814 |
print(f" - Face Analyzer ('{FACE_ANALYZER_NAME}'): {'Loaded' if face_analyzer else 'FAILED'}")
|
| 815 |
print(f" - Swapper Model ('{SWAPPER_MODEL_PATH}'): {'Loaded' if swapper else 'FAILED'}")
|
|
|
|
| 816 |
else:
|
| 817 |
print("\nπ’ Core models (Face Analyzer & Swapper) loaded successfully.")
|
| 818 |
+
|
| 819 |
if not restoration_model_loaded_successfully:
|
| 820 |
+
print(f"\nπ‘ INFO: Face Restoration model ('{RESTORATION_MODEL_PATH}') NOT loaded. Enhancement will be disabled.")
|
|
|
|
| 821 |
else:
|
| 822 |
print("\nπ’ Face Restoration model loaded successfully.")
|
| 823 |
+
|
| 824 |
+
try:
|
| 825 |
+
import moviepy.editor
|
| 826 |
+
print("\nπ’ moviepy library found. Audio processing for videos is ENABLED.")
|
| 827 |
+
except ImportError:
|
| 828 |
+
print("\nπ‘ WARNING: moviepy library NOT found. Audio will NOT be processed for videos.")
|
| 829 |
+
print(" To enable audio in video swaps, please install it: pip install moviepy")
|
| 830 |
print("="*70)
|
| 831 |
+
|
| 832 |
if not core_models_loaded_successfully:
|
| 833 |
+
print("\nπ The Gradio interface will launch, but swapping functionality will be BROKEN. Please address errors.")
|
| 834 |
else:
|
| 835 |
+
print("\nπ Launching Gradio Interface... Access it in your browser (usually at http://127.0.0.1:7860 or a public link if shared).")
|
| 836 |
|
| 837 |
demo.launch()
|