Spaces:
Running
on
T4
Running
on
T4
Update app.py
Browse files
app.py
CHANGED
|
@@ -147,123 +147,116 @@ async def log_faceswap_hit(token: str, status: str = "success"):
|
|
| 147 |
# --------------------- Face Swap Pipeline ---------------------
|
| 148 |
swap_lock = threading.Lock()
|
| 149 |
|
| 150 |
-
def match_histogram(src_face, tgt_face):
|
| 151 |
-
"""Match the histogram of src_face to that of tgt_face for color harmony."""
|
| 152 |
-
matched = cv2.cvtColor(src_face, cv2.COLOR_BGR2LAB)
|
| 153 |
-
target_lab = cv2.cvtColor(tgt_face, cv2.COLOR_BGR2LAB)
|
| 154 |
-
for i in range(3):
|
| 155 |
-
matched[..., i] = cv2.equalizeHist(matched[..., i])
|
| 156 |
-
matched[..., i] = cv2.matchTemplate(matched[..., i], target_lab[..., i], cv2.TM_CCOEFF_NORMED)
|
| 157 |
-
return cv2.cvtColor(matched, cv2.COLOR_LAB2BGR)
|
| 158 |
-
|
| 159 |
-
def find_mask_center(mask):
|
| 160 |
-
"""Compute center for seamlessClone using mask moments."""
|
| 161 |
-
moments = cv2.moments(mask)
|
| 162 |
-
if moments["m00"] != 0:
|
| 163 |
-
center_x = int(moments["m10"] / moments["m00"])
|
| 164 |
-
center_y = int(moments["m01"] / moments["m00"])
|
| 165 |
-
return (center_x, center_y)
|
| 166 |
-
else:
|
| 167 |
-
# Fallback to geometric center
|
| 168 |
-
h, w = mask.shape
|
| 169 |
-
return (w // 2, h // 2)
|
| 170 |
-
|
| 171 |
def face_swap_and_enhance(src_img, tgt_img, temp_dir="/tmp/faceswap_work"):
|
| 172 |
try:
|
| 173 |
with swap_lock:
|
| 174 |
-
#
|
| 175 |
if os.path.exists(temp_dir):
|
| 176 |
shutil.rmtree(temp_dir)
|
| 177 |
os.makedirs(temp_dir, exist_ok=True)
|
|
|
|
| 178 |
src_bgr = cv2.cvtColor(src_img, cv2.COLOR_RGB2BGR)
|
| 179 |
-
|
| 180 |
|
| 181 |
src_faces = face_analysis_app.get(src_bgr)
|
| 182 |
-
tgt_faces = face_analysis_app.get(
|
|
|
|
| 183 |
if not src_faces or not tgt_faces:
|
| 184 |
return None, None, "❌ Face not detected in source or target image"
|
| 185 |
|
| 186 |
-
|
| 187 |
-
|
|
|
|
|
|
|
|
|
|
|
|
|
| 188 |
x1, y1, x2, y2 = map(int, bbox)
|
| 189 |
-
|
| 190 |
-
cx, cy = x1 +
|
| 191 |
-
new_w, new_h = int(
|
| 192 |
-
nx1 = max(0, cx - new_w // 2)
|
| 193 |
-
|
| 194 |
-
nx2 = min(iw, cx + new_w // 2)
|
| 195 |
-
ny2 = min(ih, cy + new_h // 2)
|
| 196 |
return nx1, ny1, nx2, ny2
|
| 197 |
|
| 198 |
-
|
| 199 |
-
|
| 200 |
-
|
| 201 |
-
|
| 202 |
-
|
| 203 |
-
|
| 204 |
-
|
| 205 |
-
|
| 206 |
-
|
| 207 |
-
|
| 208 |
-
|
| 209 |
-
|
| 210 |
-
|
| 211 |
-
|
| 212 |
-
#
|
| 213 |
-
|
| 214 |
-
|
| 215 |
-
|
| 216 |
-
|
| 217 |
-
|
| 218 |
-
|
| 219 |
-
|
| 220 |
-
|
| 221 |
-
|
| 222 |
-
|
| 223 |
-
|
| 224 |
-
|
| 225 |
-
|
| 226 |
-
|
| 227 |
-
|
| 228 |
-
|
| 229 |
-
|
| 230 |
-
|
| 231 |
-
|
| 232 |
-
|
| 233 |
-
|
| 234 |
-
|
| 235 |
-
|
| 236 |
-
|
| 237 |
-
|
| 238 |
-
|
| 239 |
-
|
| 240 |
-
|
| 241 |
-
|
| 242 |
-
|
| 243 |
-
|
| 244 |
-
|
| 245 |
-
|
| 246 |
-
|
| 247 |
-
|
| 248 |
-
|
| 249 |
-
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
| 250 |
cmd = f"python {CODEFORMER_PATH} -w 0.7 --input_path {swapped_path} --output_path {temp_dir} --bg_upsampler realesrgan --face_upsample"
|
| 251 |
result = subprocess.run(cmd, shell=True, capture_output=True, text=True)
|
| 252 |
if result.returncode != 0:
|
| 253 |
return None, None, f"❌ CodeFormer failed:\n{result.stderr}"
|
| 254 |
-
|
| 255 |
-
|
|
|
|
| 256 |
if not final_files:
|
| 257 |
-
return None, None, "❌ No enhanced
|
| 258 |
-
final_path = os.path.join(
|
|
|
|
| 259 |
final_img = cv2.cvtColor(cv2.imread(final_path), cv2.COLOR_BGR2RGB)
|
| 260 |
return final_img, final_path, ""
|
|
|
|
| 261 |
except Exception as e:
|
| 262 |
return None, None, f"❌ Error: {str(e)}"
|
| 263 |
|
| 264 |
|
| 265 |
|
| 266 |
|
|
|
|
| 267 |
# --------------------- Gradio ---------------------
|
| 268 |
with gr.Blocks() as demo:
|
| 269 |
gr.Markdown("Face Swap")
|
|
|
|
| 147 |
# --------------------- Face Swap Pipeline ---------------------
|
| 148 |
swap_lock = threading.Lock()
|
| 149 |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
| 150 |
def face_swap_and_enhance(src_img, tgt_img, temp_dir="/tmp/faceswap_work"):
|
| 151 |
try:
|
| 152 |
with swap_lock:
|
| 153 |
+
# Clean workspace
|
| 154 |
if os.path.exists(temp_dir):
|
| 155 |
shutil.rmtree(temp_dir)
|
| 156 |
os.makedirs(temp_dir, exist_ok=True)
|
| 157 |
+
|
| 158 |
src_bgr = cv2.cvtColor(src_img, cv2.COLOR_RGB2BGR)
|
| 159 |
+
tgt_bgr = cv2.cvtColor(tgt_img, cv2.COLOR_RGB2BGR)
|
| 160 |
|
| 161 |
src_faces = face_analysis_app.get(src_bgr)
|
| 162 |
+
tgt_faces = face_analysis_app.get(tgt_bgr)
|
| 163 |
+
|
| 164 |
if not src_faces or not tgt_faces:
|
| 165 |
return None, None, "❌ Face not detected in source or target image"
|
| 166 |
|
| 167 |
+
src_face = src_faces[0]
|
| 168 |
+
tgt_face = tgt_faces[0]
|
| 169 |
+
|
| 170 |
+
# --- Adaptive expansion based on face size ---
|
| 171 |
+
def adaptive_bbox(bbox, shape, factor=1.7):
|
| 172 |
+
h, w = shape[:2]
|
| 173 |
x1, y1, x2, y2 = map(int, bbox)
|
| 174 |
+
bw, bh = x2 - x1, y2 - y1
|
| 175 |
+
cx, cy = x1 + bw // 2, y1 + bh // 2
|
| 176 |
+
new_w, new_h = int(bw * factor), int(bh * factor)
|
| 177 |
+
nx1, ny1 = max(0, cx - new_w // 2), max(0, cy - new_h // 2)
|
| 178 |
+
nx2, ny2 = min(w, cx + new_w // 2), min(h, cy + new_h // 2)
|
|
|
|
|
|
|
| 179 |
return nx1, ny1, nx2, ny2
|
| 180 |
|
| 181 |
+
sx1, sy1, sx2, sy2 = adaptive_bbox(src_face.bbox, src_bgr.shape)
|
| 182 |
+
tx1, ty1, tx2, ty2 = adaptive_bbox(tgt_face.bbox, tgt_bgr.shape)
|
| 183 |
+
|
| 184 |
+
src_crop = src_bgr[sy1:sy2, sx1:sx2]
|
| 185 |
+
tgt_crop = tgt_bgr[ty1:ty2, tx1:tx2]
|
| 186 |
+
|
| 187 |
+
src_face_crop = face_analysis_app.get(src_crop)
|
| 188 |
+
tgt_face_crop = face_analysis_app.get(tgt_crop)
|
| 189 |
+
if not src_face_crop or not tgt_face_crop:
|
| 190 |
+
return None, None, "❌ Re-detection failed in cropped region"
|
| 191 |
+
|
| 192 |
+
src_face_refined = src_face_crop[0]
|
| 193 |
+
tgt_face_refined = tgt_face_crop[0]
|
| 194 |
+
|
| 195 |
+
# --- Alignment correction using landmarks (similarity transform) ---
|
| 196 |
+
def align_face(img, src_pts, tgt_pts):
|
| 197 |
+
M, _ = cv2.estimateAffinePartial2D(src_pts, tgt_pts)
|
| 198 |
+
if M is not None:
|
| 199 |
+
aligned = cv2.warpAffine(img, M, (img.shape[1], img.shape[0]), flags=cv2.INTER_LINEAR)
|
| 200 |
+
return aligned
|
| 201 |
+
return img
|
| 202 |
+
|
| 203 |
+
src_pts = src_face_refined.landmark_2d_106[:68].astype(np.float32)
|
| 204 |
+
tgt_pts = tgt_face_refined.landmark_2d_106[:68].astype(np.float32)
|
| 205 |
+
src_aligned = align_face(src_crop, src_pts, tgt_pts)
|
| 206 |
+
|
| 207 |
+
# --- Perform swap on aligned faces ---
|
| 208 |
+
swapped_crop = swapper.get(tgt_crop, tgt_face_refined, src_face_refined)
|
| 209 |
+
if swapped_crop is None:
|
| 210 |
+
return None, None, "❌ Swap failed"
|
| 211 |
+
|
| 212 |
+
# --- Feathered mask for smooth blending ---
|
| 213 |
+
gray = cv2.cvtColor(swapped_crop, cv2.COLOR_BGR2GRAY)
|
| 214 |
+
mask = np.where(gray > 5, 255, 0).astype(np.uint8)
|
| 215 |
+
mask = cv2.GaussianBlur(mask, (41, 41), 15)
|
| 216 |
+
|
| 217 |
+
# --- Tone normalization between faces ---
|
| 218 |
+
def match_tone(src, ref):
|
| 219 |
+
src_mean, src_std = cv2.meanStdDev(src)
|
| 220 |
+
ref_mean, ref_std = cv2.meanStdDev(ref)
|
| 221 |
+
adj = (src - src_mean) * (ref_std / (src_std + 1e-6)) + ref_mean
|
| 222 |
+
return np.clip(adj, 0, 255).astype(np.uint8)
|
| 223 |
+
|
| 224 |
+
swapped_tone = match_tone(swapped_crop, tgt_crop)
|
| 225 |
+
|
| 226 |
+
# --- Seamless composite ---
|
| 227 |
+
center = (tx1 + (tx2 - tx1)//2, ty1 + (ty2 - ty1)//2)
|
| 228 |
+
try:
|
| 229 |
+
blended = cv2.seamlessClone(swapped_tone, tgt_bgr, mask, center, cv2.NORMAL_CLONE)
|
| 230 |
+
except Exception:
|
| 231 |
+
blended = tgt_bgr.copy()
|
| 232 |
+
h, w = swapped_tone.shape[:2]
|
| 233 |
+
blended[ty1:ty1+h, tx1:tx1+w] = swapped_tone
|
| 234 |
+
|
| 235 |
+
swapped_path = os.path.join(temp_dir, f"swapped_{uuid.uuid4().hex}.jpg")
|
| 236 |
+
cv2.imwrite(swapped_path, blended)
|
| 237 |
+
|
| 238 |
+
# --- CodeFormer enhancement ---
|
| 239 |
cmd = f"python {CODEFORMER_PATH} -w 0.7 --input_path {swapped_path} --output_path {temp_dir} --bg_upsampler realesrgan --face_upsample"
|
| 240 |
result = subprocess.run(cmd, shell=True, capture_output=True, text=True)
|
| 241 |
if result.returncode != 0:
|
| 242 |
return None, None, f"❌ CodeFormer failed:\n{result.stderr}"
|
| 243 |
+
|
| 244 |
+
final_dir = os.path.join(temp_dir, "final_results")
|
| 245 |
+
final_files = [f for f in os.listdir(final_dir) if f.endswith(".png")]
|
| 246 |
if not final_files:
|
| 247 |
+
return None, None, "❌ No enhanced output found"
|
| 248 |
+
final_path = os.path.join(final_dir, final_files[0])
|
| 249 |
+
|
| 250 |
final_img = cv2.cvtColor(cv2.imread(final_path), cv2.COLOR_BGR2RGB)
|
| 251 |
return final_img, final_path, ""
|
| 252 |
+
|
| 253 |
except Exception as e:
|
| 254 |
return None, None, f"❌ Error: {str(e)}"
|
| 255 |
|
| 256 |
|
| 257 |
|
| 258 |
|
| 259 |
+
|
| 260 |
# --------------------- Gradio ---------------------
|
| 261 |
with gr.Blocks() as demo:
|
| 262 |
gr.Markdown("Face Swap")
|