Spaces:
Running
on
T4
Running
on
T4
Update app.py
Browse files
app.py
CHANGED
|
@@ -147,20 +147,39 @@ async def log_faceswap_hit(token: str, status: str = "success"):
|
|
| 147 |
# --------------------- Face Swap Pipeline ---------------------
|
| 148 |
swap_lock = threading.Lock()
|
| 149 |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
| 150 |
def face_swap_and_enhance(src_img, tgt_img, temp_dir="/tmp/faceswap_work"):
|
| 151 |
try:
|
| 152 |
with swap_lock:
|
| 153 |
-
#
|
| 154 |
if os.path.exists(temp_dir):
|
| 155 |
shutil.rmtree(temp_dir)
|
| 156 |
os.makedirs(temp_dir, exist_ok=True)
|
| 157 |
-
|
| 158 |
src_bgr = cv2.cvtColor(src_img, cv2.COLOR_RGB2BGR)
|
| 159 |
tgt_bgr_full = cv2.cvtColor(tgt_img, cv2.COLOR_RGB2BGR)
|
| 160 |
|
| 161 |
src_faces = face_analysis_app.get(src_bgr)
|
| 162 |
tgt_faces = face_analysis_app.get(tgt_bgr_full)
|
| 163 |
-
|
| 164 |
if not src_faces or not tgt_faces:
|
| 165 |
return None, None, "β Face not detected in source or target image"
|
| 166 |
|
|
@@ -179,7 +198,7 @@ def face_swap_and_enhance(src_img, tgt_img, temp_dir="/tmp/faceswap_work"):
|
|
| 179 |
src_face0 = src_faces[0]
|
| 180 |
tgt_face0 = tgt_faces[0]
|
| 181 |
|
| 182 |
-
#
|
| 183 |
s_x1, s_y1, s_x2, s_y2 = expand_bbox(src_face0.bbox, src_bgr.shape, scale=1.4)
|
| 184 |
src_crop = src_bgr[s_y1:s_y2, s_x1:s_x2]
|
| 185 |
src_crop_faces = face_analysis_app.get(src_crop)
|
|
@@ -190,65 +209,61 @@ def face_swap_and_enhance(src_img, tgt_img, temp_dir="/tmp/faceswap_work"):
|
|
| 190 |
src_for_swap = src_bgr
|
| 191 |
src_face_for_swap = src_face0
|
| 192 |
|
| 193 |
-
#
|
| 194 |
-
t_x1, t_y1, t_x2, t_y2 = expand_bbox(tgt_face0.bbox, tgt_bgr_full.shape, scale=1.
|
| 195 |
tgt_crop = tgt_bgr_full[t_y1:t_y2, t_x1:t_x2]
|
| 196 |
tgt_crop_faces = face_analysis_app.get(tgt_crop)
|
| 197 |
-
|
| 198 |
if tgt_crop_faces:
|
| 199 |
tgt_for_swap = tgt_crop
|
| 200 |
tgt_face_for_swap = tgt_crop_faces[0]
|
| 201 |
-
|
| 202 |
swapped_crop = swapper.get(tgt_for_swap, tgt_face_for_swap, src_face_for_swap)
|
| 203 |
if swapped_crop is None:
|
| 204 |
return None, None, "β Face swap failed on crop"
|
| 205 |
-
|
| 206 |
-
|
|
|
|
| 207 |
mask = cv2.cvtColor(swapped_crop, cv2.COLOR_BGR2GRAY)
|
| 208 |
-
|
| 209 |
-
|
| 210 |
-
|
| 211 |
-
|
|
|
|
|
|
|
|
|
|
| 212 |
try:
|
| 213 |
-
blended = cv2.seamlessClone(swapped_crop, tgt_bgr_full, mask, center, cv2.NORMAL_CLONE)
|
| 214 |
except Exception:
|
| 215 |
-
# Fallback to direct paste if seamlessClone fails
|
| 216 |
blended = tgt_bgr_full.copy()
|
| 217 |
h, w = swapped_crop.shape[:2]
|
| 218 |
blended[t_y1:t_y1+h, t_x1:t_x1+w] = swapped_crop
|
| 219 |
-
|
| 220 |
swapped_path = os.path.join(temp_dir, f"swapped_{uuid.uuid4().hex[:8]}.jpg")
|
| 221 |
cv2.imwrite(swapped_path, blended)
|
| 222 |
-
|
| 223 |
else:
|
| 224 |
-
# Fallback: swap on full image
|
| 225 |
swapped_bgr_full = swapper.get(tgt_bgr_full, tgt_face0, src_face0)
|
| 226 |
if swapped_bgr_full is None:
|
| 227 |
return None, None, "β Face swap failed on full image"
|
| 228 |
swapped_path = os.path.join(temp_dir, f"swapped_{uuid.uuid4().hex[:8]}.jpg")
|
| 229 |
cv2.imwrite(swapped_path, swapped_bgr_full)
|
| 230 |
-
|
| 231 |
-
# Run CodeFormer enhancement on the swapped image
|
| 232 |
cmd = f"python {CODEFORMER_PATH} -w 0.7 --input_path {swapped_path} --output_path {temp_dir} --bg_upsampler realesrgan --face_upsample"
|
| 233 |
result = subprocess.run(cmd, shell=True, capture_output=True, text=True)
|
| 234 |
if result.returncode != 0:
|
| 235 |
return None, None, f"β CodeFormer failed:\n{result.stderr}"
|
| 236 |
-
|
| 237 |
final_results_dir = os.path.join(temp_dir, "final_results")
|
| 238 |
final_files = [f for f in os.listdir(final_results_dir) if f.endswith(".png")]
|
| 239 |
if not final_files:
|
| 240 |
return None, None, "β No enhanced image found"
|
| 241 |
-
|
| 242 |
final_path = os.path.join(final_results_dir, final_files[0])
|
| 243 |
final_img = cv2.cvtColor(cv2.imread(final_path), cv2.COLOR_BGR2RGB)
|
| 244 |
-
|
| 245 |
return final_img, final_path, ""
|
| 246 |
-
|
| 247 |
except Exception as e:
|
| 248 |
return None, None, f"β Error: {str(e)}"
|
| 249 |
|
| 250 |
|
| 251 |
|
|
|
|
| 252 |
# --------------------- Gradio ---------------------
|
| 253 |
with gr.Blocks() as demo:
|
| 254 |
gr.Markdown("Face Swap")
|
|
|
|
| 147 |
# --------------------- Face Swap Pipeline ---------------------
|
| 148 |
swap_lock = threading.Lock()
|
| 149 |
|
| 150 |
+
def match_histogram(src_face, tgt_face):
|
| 151 |
+
"""Match the histogram of src_face to that of tgt_face for color harmony."""
|
| 152 |
+
matched = cv2.cvtColor(src_face, cv2.COLOR_BGR2LAB)
|
| 153 |
+
target_lab = cv2.cvtColor(tgt_face, cv2.COLOR_BGR2LAB)
|
| 154 |
+
for i in range(3):
|
| 155 |
+
matched[..., i] = cv2.equalizeHist(matched[..., i])
|
| 156 |
+
matched[..., i] = cv2.matchTemplate(matched[..., i], target_lab[..., i], cv2.TM_CCOEFF_NORMED)
|
| 157 |
+
return cv2.cvtColor(matched, cv2.COLOR_LAB2BGR)
|
| 158 |
+
|
| 159 |
+
def find_mask_center(mask):
|
| 160 |
+
"""Compute center for seamlessClone using mask moments."""
|
| 161 |
+
moments = cv2.moments(mask)
|
| 162 |
+
if moments["m00"] != 0:
|
| 163 |
+
center_x = int(moments["m10"] / moments["m00"])
|
| 164 |
+
center_y = int(moments["m01"] / moments["m00"])
|
| 165 |
+
return (center_x, center_y)
|
| 166 |
+
else:
|
| 167 |
+
# Fallback to geometric center
|
| 168 |
+
h, w = mask.shape
|
| 169 |
+
return (w // 2, h // 2)
|
| 170 |
+
|
| 171 |
def face_swap_and_enhance(src_img, tgt_img, temp_dir="/tmp/faceswap_work"):
|
| 172 |
try:
|
| 173 |
with swap_lock:
|
| 174 |
+
# Cleanup temp and prepare dirs
|
| 175 |
if os.path.exists(temp_dir):
|
| 176 |
shutil.rmtree(temp_dir)
|
| 177 |
os.makedirs(temp_dir, exist_ok=True)
|
|
|
|
| 178 |
src_bgr = cv2.cvtColor(src_img, cv2.COLOR_RGB2BGR)
|
| 179 |
tgt_bgr_full = cv2.cvtColor(tgt_img, cv2.COLOR_RGB2BGR)
|
| 180 |
|
| 181 |
src_faces = face_analysis_app.get(src_bgr)
|
| 182 |
tgt_faces = face_analysis_app.get(tgt_bgr_full)
|
|
|
|
| 183 |
if not src_faces or not tgt_faces:
|
| 184 |
return None, None, "β Face not detected in source or target image"
|
| 185 |
|
|
|
|
| 198 |
src_face0 = src_faces[0]
|
| 199 |
tgt_face0 = tgt_faces[0]
|
| 200 |
|
| 201 |
+
# Source crop for precise landmark
|
| 202 |
s_x1, s_y1, s_x2, s_y2 = expand_bbox(src_face0.bbox, src_bgr.shape, scale=1.4)
|
| 203 |
src_crop = src_bgr[s_y1:s_y2, s_x1:s_x2]
|
| 204 |
src_crop_faces = face_analysis_app.get(src_crop)
|
|
|
|
| 209 |
src_for_swap = src_bgr
|
| 210 |
src_face_for_swap = src_face0
|
| 211 |
|
| 212 |
+
# Target crop for even more precision
|
| 213 |
+
t_x1, t_y1, t_x2, t_y2 = expand_bbox(tgt_face0.bbox, tgt_bgr_full.shape, scale=1.7)
|
| 214 |
tgt_crop = tgt_bgr_full[t_y1:t_y2, t_x1:t_x2]
|
| 215 |
tgt_crop_faces = face_analysis_app.get(tgt_crop)
|
|
|
|
| 216 |
if tgt_crop_faces:
|
| 217 |
tgt_for_swap = tgt_crop
|
| 218 |
tgt_face_for_swap = tgt_crop_faces[0]
|
| 219 |
+
# Face swap
|
| 220 |
swapped_crop = swapper.get(tgt_for_swap, tgt_face_for_swap, src_face_for_swap)
|
| 221 |
if swapped_crop is None:
|
| 222 |
return None, None, "β Face swap failed on crop"
|
| 223 |
+
# Histogram matching for color realism
|
| 224 |
+
swapped_crop = match_histogram(swapped_crop, tgt_for_swap)
|
| 225 |
+
# Mask for seamless blending
|
| 226 |
mask = cv2.cvtColor(swapped_crop, cv2.COLOR_BGR2GRAY)
|
| 227 |
+
mask = cv2.threshold(mask, 1, 255, cv2.THRESH_BINARY)[1]
|
| 228 |
+
mask = cv2.GaussianBlur(mask, (15, 15), 8)
|
| 229 |
+
# Get the center using moments
|
| 230 |
+
center = find_mask_center(mask)
|
| 231 |
+
# Smoothing step to reduce artifacts
|
| 232 |
+
swapped_crop = cv2.bilateralFilter(swapped_crop, 9, 75, 75)
|
| 233 |
+
# Seamless clone for perfect embedding
|
| 234 |
try:
|
| 235 |
+
blended = cv2.seamlessClone(swapped_crop, tgt_bgr_full, mask, (t_x1 + center[0], t_y1 + center[1]), cv2.NORMAL_CLONE)
|
| 236 |
except Exception:
|
|
|
|
| 237 |
blended = tgt_bgr_full.copy()
|
| 238 |
h, w = swapped_crop.shape[:2]
|
| 239 |
blended[t_y1:t_y1+h, t_x1:t_x1+w] = swapped_crop
|
|
|
|
| 240 |
swapped_path = os.path.join(temp_dir, f"swapped_{uuid.uuid4().hex[:8]}.jpg")
|
| 241 |
cv2.imwrite(swapped_path, blended)
|
|
|
|
| 242 |
else:
|
| 243 |
+
# Fallback: swap on full image
|
| 244 |
swapped_bgr_full = swapper.get(tgt_bgr_full, tgt_face0, src_face0)
|
| 245 |
if swapped_bgr_full is None:
|
| 246 |
return None, None, "β Face swap failed on full image"
|
| 247 |
swapped_path = os.path.join(temp_dir, f"swapped_{uuid.uuid4().hex[:8]}.jpg")
|
| 248 |
cv2.imwrite(swapped_path, swapped_bgr_full)
|
| 249 |
+
# CodeFormer enhancement post-processing as last step
|
|
|
|
| 250 |
cmd = f"python {CODEFORMER_PATH} -w 0.7 --input_path {swapped_path} --output_path {temp_dir} --bg_upsampler realesrgan --face_upsample"
|
| 251 |
result = subprocess.run(cmd, shell=True, capture_output=True, text=True)
|
| 252 |
if result.returncode != 0:
|
| 253 |
return None, None, f"β CodeFormer failed:\n{result.stderr}"
|
|
|
|
| 254 |
final_results_dir = os.path.join(temp_dir, "final_results")
|
| 255 |
final_files = [f for f in os.listdir(final_results_dir) if f.endswith(".png")]
|
| 256 |
if not final_files:
|
| 257 |
return None, None, "β No enhanced image found"
|
|
|
|
| 258 |
final_path = os.path.join(final_results_dir, final_files[0])
|
| 259 |
final_img = cv2.cvtColor(cv2.imread(final_path), cv2.COLOR_BGR2RGB)
|
|
|
|
| 260 |
return final_img, final_path, ""
|
|
|
|
| 261 |
except Exception as e:
|
| 262 |
return None, None, f"β Error: {str(e)}"
|
| 263 |
|
| 264 |
|
| 265 |
|
| 266 |
+
|
| 267 |
# --------------------- Gradio ---------------------
|
| 268 |
with gr.Blocks() as demo:
|
| 269 |
gr.Markdown("Face Swap")
|