LogicGoInfotechSpaces commited on
Commit
13e7046
·
verified ·
1 Parent(s): 5fc122d

Update app.py

Browse files
Files changed (1) hide show
  1. app.py +383 -168
app.py CHANGED
@@ -162,7 +162,7 @@ def enhance_image_with_codeformer(rgb_img, temp_dir=None):
162
  python_cmd = sys.executable if sys.executable else "python3"
163
  cmd = (
164
  f"{python_cmd} {CODEFORMER_PATH} "
165
- f"-w 0.4 "
166
  f"--input_path {input_path} "
167
  f"--output_path {temp_dir} "
168
  f"--bg_upsampler realesrgan "
@@ -182,168 +182,73 @@ def enhance_image_with_codeformer(rgb_img, temp_dir=None):
182
  enhanced = cv2.imread(final_path)
183
  return cv2.cvtColor(enhanced, cv2.COLOR_BGR2RGB)
184
 
185
- import numpy as np
186
- from sklearn.metrics.pairwise import cosine_similarity
187
-
188
- SIM_THRESHOLD = 0.35 # strict
189
- MAX_YAW = 35
190
- MAX_PITCH = 25
191
- MIN_FACE_AREA = 10000
192
-
193
-
194
- def face_ok(face):
195
- yaw, pitch, roll = face.pose
196
- x1, y1, x2, y2 = face.bbox
197
- area = (x2 - x1) * (y2 - y1)
198
-
199
- return (
200
- abs(yaw) <= MAX_YAW and
201
- abs(pitch) <= MAX_PITCH and
202
- area >= MIN_FACE_AREA
203
- )
204
-
205
-
206
- def quality_score(face):
207
- yaw, pitch, roll = face.pose
208
- x1, y1, x2, y2 = face.bbox
209
- area = (x2 - x1) * (y2 - y1)
210
- return area - abs(yaw)*200 - abs(pitch)*150
211
-
212
-
213
- def multi_face_swap_industry(src_img, tgt_img):
214
- """
215
- Industry-grade 2-face swap
216
- Returns swapped RGB image or raises error
217
- """
218
-
219
  src_bgr = cv2.cvtColor(src_img, cv2.COLOR_RGB2BGR)
220
  tgt_bgr = cv2.cvtColor(tgt_img, cv2.COLOR_RGB2BGR)
221
 
222
  src_faces = face_analysis_app.get(src_bgr)
223
  tgt_faces = face_analysis_app.get(tgt_bgr)
224
 
225
- if len(src_faces) < 1 or len(tgt_faces) < 1:
226
  raise ValueError("No faces detected")
227
 
228
- if len(src_faces) > 2 or len(tgt_faces) > 2:
229
- raise ValueError("More than 2 faces detected (blocked)")
230
-
231
- # ---- Quality filtering ----
232
- src_faces = [f for f in src_faces if face_ok(f)]
233
- tgt_faces = [f for f in tgt_faces if face_ok(f)]
234
 
235
- if not src_faces or not tgt_faces:
236
- raise ValueError("Faces rejected due to pose/quality")
 
237
 
238
- # ---- Sort by quality (best first) ----
239
- src_faces = sorted(src_faces, key=quality_score, reverse=True)
240
- tgt_faces = sorted(tgt_faces, key=quality_score, reverse=True)
241
 
242
- # ---- Extract embeddings ----
243
- src_embs = np.array([f.normed_embedding for f in src_faces])
244
- tgt_embs = np.array([f.normed_embedding for f in tgt_faces])
245
 
246
- sim = cosine_similarity(src_embs, tgt_embs)
 
247
 
248
- used_tgts = set()
249
  pairs = []
250
 
251
- for i in range(len(src_faces)):
252
- j = np.argmax(sim[i])
253
- score = sim[i][j]
254
 
255
- if score < SIM_THRESHOLD:
256
- raise ValueError(f"Low identity similarity ({score:.2f})")
257
 
258
- if j in used_tgts:
259
- continue
 
 
 
260
 
261
- pairs.append((src_faces[i], tgt_faces[j], score))
262
- used_tgts.add(j)
263
 
264
- if not pairs:
265
- raise ValueError("No valid face pairs")
 
 
266
 
267
- # ---- Swap WITHOUT re-detection ----
268
- result = tgt_bgr.copy()
 
 
269
 
270
- for src_face, tgt_face, score in pairs:
271
- result = swapper.get(
272
- result,
273
- tgt_face,
 
274
  src_face,
275
  paste_back=True
276
  )
277
 
278
- return cv2.cvtColor(result, cv2.COLOR_BGR2RGB)
279
-
280
- # def multi_face_swap(src_img, tgt_img):
281
- # src_bgr = cv2.cvtColor(src_img, cv2.COLOR_RGB2BGR)
282
- # tgt_bgr = cv2.cvtColor(tgt_img, cv2.COLOR_RGB2BGR)
283
-
284
- # src_faces = face_analysis_app.get(src_bgr)
285
- # tgt_faces = face_analysis_app.get(tgt_bgr)
286
-
287
- # if not src_faces or not tgt_faces:
288
- # raise ValueError("No faces detected")
289
-
290
- # def face_sort_key(face):
291
- # x1, y1, x2, y2 = face.bbox
292
- # area = (x2 - x1) * (y2 - y1)
293
- # cx = (x1 + x2) / 2
294
- # return (-area, cx)
295
-
296
- # # Split by gender
297
- # src_male = [f for f in src_faces if f.gender == 1]
298
- # src_female = [f for f in src_faces if f.gender == 0]
299
-
300
- # tgt_male = [f for f in tgt_faces if f.gender == 1]
301
- # tgt_female = [f for f in tgt_faces if f.gender == 0]
302
-
303
- # # Sort inside gender groups
304
- # src_male = sorted(src_male, key=face_sort_key)
305
- # src_female = sorted(src_female, key=face_sort_key)
306
-
307
- # tgt_male = sorted(tgt_male, key=face_sort_key)
308
- # tgt_female = sorted(tgt_female, key=face_sort_key)
309
-
310
- # # Build final swap pairs
311
- # pairs = []
312
-
313
- # for s, t in zip(src_male, tgt_male):
314
- # pairs.append((s, t))
315
-
316
- # for s, t in zip(src_female, tgt_female):
317
- # pairs.append((s, t))
318
-
319
- # # Fallback if gender mismatch
320
- # if not pairs:
321
- # src_faces = sorted(src_faces, key=face_sort_key)
322
- # tgt_faces = sorted(tgt_faces, key=face_sort_key)
323
- # pairs = list(zip(src_faces, tgt_faces))
324
-
325
- # result_img = tgt_bgr.copy()
326
-
327
- # for src_face, _ in pairs:
328
- # # 🔁 re-detect current target faces
329
- # current_faces = face_analysis_app.get(result_img)
330
- # current_faces = sorted(current_faces, key=face_sort_key)
331
-
332
- # # choose best matching gender
333
- # candidates = [
334
- # f for f in current_faces if f.gender == src_face.gender
335
- # ] or current_faces
336
-
337
- # target_face = candidates[0]
338
-
339
- # result_img = swapper.get(
340
- # result_img,
341
- # target_face,
342
- # src_face,
343
- # paste_back=True
344
- # )
345
-
346
- # return cv2.cvtColor(result_img, cv2.COLOR_BGR2RGB)
347
 
348
 
349
 
@@ -366,16 +271,14 @@ def face_swap_and_enhance(src_img, tgt_img, temp_dir=None):
366
  return None, None, "❌ Face not detected in one of the images"
367
 
368
  swapped_path = os.path.join(temp_dir, f"swapped_{uuid.uuid4().hex[:8]}.jpg")
369
- swapped_rgb = multi_face_swap_industry(src_img, tgt_img)
370
- swapped_bgr = cv2.cvtColor(swapped_rgb, cv2.COLOR_RGB2BGR)
371
-
372
  if swapped_bgr is None:
373
  return None, None, "❌ Face swap failed"
374
 
375
  cv2.imwrite(swapped_path, swapped_bgr)
376
 
377
  python_cmd = sys.executable if sys.executable else "python3"
378
- cmd = f"{python_cmd} {CODEFORMER_PATH} -w 0.4 --input_path {swapped_path} --output_path {temp_dir} --bg_upsampler realesrgan --face_upsample"
379
  result = subprocess.run(cmd, shell=True, capture_output=True, text=True)
380
  if result.returncode != 0:
381
  return None, None, f"❌ CodeFormer failed:\n{result.stderr}"
@@ -476,7 +379,7 @@ def build_multi_faceswap_gradio():
476
 
477
  def process(src_img, tgt_img):
478
  try:
479
- swapped = multi_face_swap_industry(src_img, tgt_img)
480
  enhanced = enhance_image_with_codeformer(swapped)
481
  return enhanced, ""
482
  except Exception as e:
@@ -896,7 +799,6 @@ fastapi_app = mount_gradio_app(
896
  if __name__ == "__main__":
897
  uvicorn.run(fastapi_app, host="0.0.0.0", port=7860)
898
 
899
-
900
  # # --------------------- List Images Endpoint ---------------------
901
  # import os
902
  # os.environ["OMP_NUM_THREADS"] = "1"
@@ -907,6 +809,8 @@ if __name__ == "__main__":
907
  # import threading
908
  # import subprocess
909
  # import logging
 
 
910
  # from datetime import datetime,timedelta
911
 
912
  # import insightface
@@ -917,10 +821,13 @@ if __name__ == "__main__":
917
  # from fastapi.security import HTTPBearer, HTTPAuthorizationCredentials
918
  # from motor.motor_asyncio import AsyncIOMotorClient
919
  # from bson import ObjectId
920
- # import requests
 
921
  # import uvicorn
922
  # import gradio as gr
923
  # from gradio import mount_gradio_app
 
 
924
 
925
  # # DigitalOcean Spaces
926
  # import boto3
@@ -1045,10 +952,208 @@ if __name__ == "__main__":
1045
  # # --------------------- Face Swap Pipeline ---------------------
1046
  # swap_lock = threading.Lock()
1047
 
1048
- # def face_swap_and_enhance(src_img, tgt_img, temp_dir="/tmp/faceswap_work"):
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1049
  # try:
1050
  # with swap_lock:
1051
  # # Use a temp dir for intermediate files
 
 
1052
  # if os.path.exists(temp_dir):
1053
  # shutil.rmtree(temp_dir)
1054
  # os.makedirs(temp_dir, exist_ok=True)
@@ -1062,13 +1167,16 @@ if __name__ == "__main__":
1062
  # return None, None, "❌ Face not detected in one of the images"
1063
 
1064
  # swapped_path = os.path.join(temp_dir, f"swapped_{uuid.uuid4().hex[:8]}.jpg")
1065
- # swapped_bgr = swapper.get(tgt_bgr, tgt_faces[0], src_faces[0])
 
 
1066
  # if swapped_bgr is None:
1067
  # return None, None, "❌ Face swap failed"
1068
 
1069
  # cv2.imwrite(swapped_path, swapped_bgr)
1070
 
1071
- # cmd = f"python {CODEFORMER_PATH} -w 0.7 --input_path {swapped_path} --output_path {temp_dir} --bg_upsampler realesrgan --face_upsample"
 
1072
  # result = subprocess.run(cmd, shell=True, capture_output=True, text=True)
1073
  # if result.returncode != 0:
1074
  # return None, None, f"❌ CodeFormer failed:\n{result.stderr}"
@@ -1079,31 +1187,59 @@ if __name__ == "__main__":
1079
  # return None, None, "❌ No enhanced image found"
1080
 
1081
  # final_path = os.path.join(final_results_dir, final_files[0])
1082
- # final_img = cv2.cvtColor(cv2.imread(final_path), cv2.COLOR_BGR2RGB)
 
 
 
1083
 
1084
  # return final_img, final_path, ""
1085
 
1086
  # except Exception as e:
1087
  # return None, None, f"❌ Error: {str(e)}"
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1088
 
1089
  # # --------------------- Gradio ---------------------
1090
- # with gr.Blocks() as demo:
1091
- # gr.Markdown("Face Swap")
1092
 
1093
- # with gr.Row():
1094
- # src_input = gr.Image(type="numpy", label="Upload Your Face")
1095
- # tgt_input = gr.Image(type="numpy", label="Upload Target Image")
1096
 
1097
- # btn = gr.Button("Swap Face")
1098
- # output_img = gr.Image(type="numpy", label="Enhanced Output")
1099
- # download = gr.File(label="⬇️ Download Enhanced Image")
1100
- # error_box = gr.Textbox(label="Logs / Errors", interactive=False)
1101
 
1102
- # def process(src, tgt):
1103
- # img, path, err = face_swap_and_enhance(src, tgt)
1104
- # return img, path, err
1105
 
1106
- # btn.click(process, [src_input, tgt_input], [output_img, download, error_box])
1107
 
1108
  # # --------------------- DigitalOcean Spaces Helper ---------------------
1109
  # def get_spaces_client():
@@ -1128,10 +1264,34 @@ if __name__ == "__main__":
1128
  # obj = client.get_object(Bucket=DO_SPACES_BUCKET, Key=key)
1129
  # return obj['Body'].read()
1130
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1131
  # # --------------------- API Endpoints ---------------------
1132
  # @fastapi_app.get("/")
1133
  # def root():
1134
- # return RedirectResponse("/gradio")
1135
 
1136
  # @fastapi_app.get("/health")
1137
  # async def health():
@@ -1221,12 +1381,19 @@ if __name__ == "__main__":
1221
  # # ------------------------------------------------------------------#
1222
  # if user_id:
1223
  # try:
1224
- # user_oid = ObjectId(user_id.strip())
 
 
 
 
 
 
 
 
1225
  # now = datetime.utcnow()
1226
 
1227
  # # Normalize dates (UTC midnight)
1228
  # today_date = datetime(now.year, now.month, now.day)
1229
- # yesterday_date = today_date - timedelta(days=1)
1230
 
1231
  # # -------------------------------------------------
1232
  # # STEP 1: Ensure root document exists
@@ -1340,6 +1507,29 @@ if __name__ == "__main__":
1340
  # }
1341
  # }
1342
  # )
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1343
 
1344
  # logger.info(
1345
  # "[MEDIA_CLICK] user=%s subCategory=%s ai_edit_complete++ daily_tracked",
@@ -1410,7 +1600,10 @@ if __name__ == "__main__":
1410
  # # # ------------------------------------------------------------------
1411
  # # # DOWNLOAD TARGET IMAGE
1412
  # # # ------------------------------------------------------------------
1413
- # tgt_bytes = requests.get(target_url).content
 
 
 
1414
 
1415
  # src_bgr = cv2.imdecode(np.frombuffer(src_bytes, np.uint8), cv2.IMREAD_COLOR)
1416
  # tgt_bgr = cv2.imdecode(np.frombuffer(tgt_bytes, np.uint8), cv2.IMREAD_COLOR)
@@ -1433,6 +1626,21 @@ if __name__ == "__main__":
1433
 
1434
  # result_key = f"faceswap/result/{uuid.uuid4().hex}_enhanced.png"
1435
  # result_url = upload_to_spaces(result_bytes, result_key)
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1436
  # end_time = datetime.utcnow()
1437
  # response_time_ms = (end_time - start_time).total_seconds() * 1000
1438
 
@@ -1447,7 +1655,8 @@ if __name__ == "__main__":
1447
 
1448
  # return {
1449
  # "result_key": result_key,
1450
- # "result_url": result_url
 
1451
  # }
1452
 
1453
  # except Exception as e:
@@ -1477,7 +1686,13 @@ if __name__ == "__main__":
1477
  # headers={"Content-Disposition": "inline; filename=result.png"}
1478
  # )
1479
  # # --------------------- Mount Gradio ---------------------
1480
- # fastapi_app = mount_gradio_app(fastapi_app, demo, path="/gradio")
 
 
 
 
 
 
1481
 
1482
  # if __name__ == "__main__":
1483
  # uvicorn.run(fastapi_app, host="0.0.0.0", port=7860)
 
162
  python_cmd = sys.executable if sys.executable else "python3"
163
  cmd = (
164
  f"{python_cmd} {CODEFORMER_PATH} "
165
+ f"-w 0.7 "
166
  f"--input_path {input_path} "
167
  f"--output_path {temp_dir} "
168
  f"--bg_upsampler realesrgan "
 
182
  enhanced = cv2.imread(final_path)
183
  return cv2.cvtColor(enhanced, cv2.COLOR_BGR2RGB)
184
 
185
+ def multi_face_swap(src_img, tgt_img):
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
186
  src_bgr = cv2.cvtColor(src_img, cv2.COLOR_RGB2BGR)
187
  tgt_bgr = cv2.cvtColor(tgt_img, cv2.COLOR_RGB2BGR)
188
 
189
  src_faces = face_analysis_app.get(src_bgr)
190
  tgt_faces = face_analysis_app.get(tgt_bgr)
191
 
192
+ if not src_faces or not tgt_faces:
193
  raise ValueError("No faces detected")
194
 
195
+ def face_sort_key(face):
196
+ x1, y1, x2, y2 = face.bbox
197
+ area = (x2 - x1) * (y2 - y1)
198
+ cx = (x1 + x2) / 2
199
+ return (-area, cx)
 
200
 
201
+ # Split by gender
202
+ src_male = [f for f in src_faces if f.gender == 1]
203
+ src_female = [f for f in src_faces if f.gender == 0]
204
 
205
+ tgt_male = [f for f in tgt_faces if f.gender == 1]
206
+ tgt_female = [f for f in tgt_faces if f.gender == 0]
 
207
 
208
+ # Sort inside gender groups
209
+ src_male = sorted(src_male, key=face_sort_key)
210
+ src_female = sorted(src_female, key=face_sort_key)
211
 
212
+ tgt_male = sorted(tgt_male, key=face_sort_key)
213
+ tgt_female = sorted(tgt_female, key=face_sort_key)
214
 
215
+ # Build final swap pairs
216
  pairs = []
217
 
218
+ for s, t in zip(src_male, tgt_male):
219
+ pairs.append((s, t))
 
220
 
221
+ for s, t in zip(src_female, tgt_female):
222
+ pairs.append((s, t))
223
 
224
+ # Fallback if gender mismatch
225
+ if not pairs:
226
+ src_faces = sorted(src_faces, key=face_sort_key)
227
+ tgt_faces = sorted(tgt_faces, key=face_sort_key)
228
+ pairs = list(zip(src_faces, tgt_faces))
229
 
230
+ result_img = tgt_bgr.copy()
 
231
 
232
+ for src_face, _ in pairs:
233
+ # 🔁 re-detect current target faces
234
+ current_faces = face_analysis_app.get(result_img)
235
+ current_faces = sorted(current_faces, key=face_sort_key)
236
 
237
+ # choose best matching gender
238
+ candidates = [
239
+ f for f in current_faces if f.gender == src_face.gender
240
+ ] or current_faces
241
 
242
+ target_face = candidates[0]
243
+
244
+ result_img = swapper.get(
245
+ result_img,
246
+ target_face,
247
  src_face,
248
  paste_back=True
249
  )
250
 
251
+ return cv2.cvtColor(result_img, cv2.COLOR_BGR2RGB)
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
252
 
253
 
254
 
 
271
  return None, None, "❌ Face not detected in one of the images"
272
 
273
  swapped_path = os.path.join(temp_dir, f"swapped_{uuid.uuid4().hex[:8]}.jpg")
274
+ swapped_bgr = swapper.get(tgt_bgr, tgt_faces[0], src_faces[0])
 
 
275
  if swapped_bgr is None:
276
  return None, None, "❌ Face swap failed"
277
 
278
  cv2.imwrite(swapped_path, swapped_bgr)
279
 
280
  python_cmd = sys.executable if sys.executable else "python3"
281
+ cmd = f"{python_cmd} {CODEFORMER_PATH} -w 0.7 --input_path {swapped_path} --output_path {temp_dir} --bg_upsampler realesrgan --face_upsample"
282
  result = subprocess.run(cmd, shell=True, capture_output=True, text=True)
283
  if result.returncode != 0:
284
  return None, None, f"❌ CodeFormer failed:\n{result.stderr}"
 
379
 
380
  def process(src_img, tgt_img):
381
  try:
382
+ swapped = multi_face_swap(src_img, tgt_img)
383
  enhanced = enhance_image_with_codeformer(swapped)
384
  return enhanced, ""
385
  except Exception as e:
 
799
  if __name__ == "__main__":
800
  uvicorn.run(fastapi_app, host="0.0.0.0", port=7860)
801
 
 
802
  # # --------------------- List Images Endpoint ---------------------
803
  # import os
804
  # os.environ["OMP_NUM_THREADS"] = "1"
 
809
  # import threading
810
  # import subprocess
811
  # import logging
812
+ # import tempfile
813
+ # import sys
814
  # from datetime import datetime,timedelta
815
 
816
  # import insightface
 
821
  # from fastapi.security import HTTPBearer, HTTPAuthorizationCredentials
822
  # from motor.motor_asyncio import AsyncIOMotorClient
823
  # from bson import ObjectId
824
+ # from bson.errors import InvalidId
825
+ # import httpx
826
  # import uvicorn
827
  # import gradio as gr
828
  # from gradio import mount_gradio_app
829
+ # from PIL import Image
830
+ # import io
831
 
832
  # # DigitalOcean Spaces
833
  # import boto3
 
952
  # # --------------------- Face Swap Pipeline ---------------------
953
  # swap_lock = threading.Lock()
954
 
955
+ # def enhance_image_with_codeformer(rgb_img, temp_dir=None):
956
+ # if temp_dir is None:
957
+ # temp_dir = os.path.join(tempfile.gettempdir(), f"enhance_{uuid.uuid4().hex[:8]}")
958
+ # os.makedirs(temp_dir, exist_ok=True)
959
+
960
+ # input_path = os.path.join(temp_dir, "input.jpg")
961
+ # cv2.imwrite(input_path, cv2.cvtColor(rgb_img, cv2.COLOR_RGB2BGR))
962
+
963
+ # python_cmd = sys.executable if sys.executable else "python3"
964
+ # cmd = (
965
+ # f"{python_cmd} {CODEFORMER_PATH} "
966
+ # f"-w 0.4 "
967
+ # f"--input_path {input_path} "
968
+ # f"--output_path {temp_dir} "
969
+ # f"--bg_upsampler realesrgan "
970
+ # f"--face_upsample"
971
+ # )
972
+
973
+ # result = subprocess.run(cmd, shell=True, capture_output=True, text=True)
974
+ # if result.returncode != 0:
975
+ # raise RuntimeError(result.stderr)
976
+
977
+ # final_dir = os.path.join(temp_dir, "final_results")
978
+ # files = [f for f in os.listdir(final_dir) if f.endswith(".png")]
979
+ # if not files:
980
+ # raise RuntimeError("No enhanced output")
981
+
982
+ # final_path = os.path.join(final_dir, files[0])
983
+ # enhanced = cv2.imread(final_path)
984
+ # return cv2.cvtColor(enhanced, cv2.COLOR_BGR2RGB)
985
+
986
+ # import numpy as np
987
+ # from sklearn.metrics.pairwise import cosine_similarity
988
+
989
+ # SIM_THRESHOLD = 0.35 # strict
990
+ # MAX_YAW = 35
991
+ # MAX_PITCH = 25
992
+ # MIN_FACE_AREA = 10000
993
+
994
+
995
+ # def face_ok(face):
996
+ # yaw, pitch, roll = face.pose
997
+ # x1, y1, x2, y2 = face.bbox
998
+ # area = (x2 - x1) * (y2 - y1)
999
+
1000
+ # return (
1001
+ # abs(yaw) <= MAX_YAW and
1002
+ # abs(pitch) <= MAX_PITCH and
1003
+ # area >= MIN_FACE_AREA
1004
+ # )
1005
+
1006
+
1007
+ # def quality_score(face):
1008
+ # yaw, pitch, roll = face.pose
1009
+ # x1, y1, x2, y2 = face.bbox
1010
+ # area = (x2 - x1) * (y2 - y1)
1011
+ # return area - abs(yaw)*200 - abs(pitch)*150
1012
+
1013
+
1014
+ # def multi_face_swap_industry(src_img, tgt_img):
1015
+ # """
1016
+ # Industry-grade 2-face swap
1017
+ # Returns swapped RGB image or raises error
1018
+ # """
1019
+
1020
+ # src_bgr = cv2.cvtColor(src_img, cv2.COLOR_RGB2BGR)
1021
+ # tgt_bgr = cv2.cvtColor(tgt_img, cv2.COLOR_RGB2BGR)
1022
+
1023
+ # src_faces = face_analysis_app.get(src_bgr)
1024
+ # tgt_faces = face_analysis_app.get(tgt_bgr)
1025
+
1026
+ # if len(src_faces) < 1 or len(tgt_faces) < 1:
1027
+ # raise ValueError("No faces detected")
1028
+
1029
+ # if len(src_faces) > 2 or len(tgt_faces) > 2:
1030
+ # raise ValueError("More than 2 faces detected (blocked)")
1031
+
1032
+ # # ---- Quality filtering ----
1033
+ # src_faces = [f for f in src_faces if face_ok(f)]
1034
+ # tgt_faces = [f for f in tgt_faces if face_ok(f)]
1035
+
1036
+ # if not src_faces or not tgt_faces:
1037
+ # raise ValueError("Faces rejected due to pose/quality")
1038
+
1039
+ # # ---- Sort by quality (best first) ----
1040
+ # src_faces = sorted(src_faces, key=quality_score, reverse=True)
1041
+ # tgt_faces = sorted(tgt_faces, key=quality_score, reverse=True)
1042
+
1043
+ # # ---- Extract embeddings ----
1044
+ # src_embs = np.array([f.normed_embedding for f in src_faces])
1045
+ # tgt_embs = np.array([f.normed_embedding for f in tgt_faces])
1046
+
1047
+ # sim = cosine_similarity(src_embs, tgt_embs)
1048
+
1049
+ # used_tgts = set()
1050
+ # pairs = []
1051
+
1052
+ # for i in range(len(src_faces)):
1053
+ # j = np.argmax(sim[i])
1054
+ # score = sim[i][j]
1055
+
1056
+ # if score < SIM_THRESHOLD:
1057
+ # raise ValueError(f"Low identity similarity ({score:.2f})")
1058
+
1059
+ # if j in used_tgts:
1060
+ # continue
1061
+
1062
+ # pairs.append((src_faces[i], tgt_faces[j], score))
1063
+ # used_tgts.add(j)
1064
+
1065
+ # if not pairs:
1066
+ # raise ValueError("No valid face pairs")
1067
+
1068
+ # # ---- Swap WITHOUT re-detection ----
1069
+ # result = tgt_bgr.copy()
1070
+
1071
+ # for src_face, tgt_face, score in pairs:
1072
+ # result = swapper.get(
1073
+ # result,
1074
+ # tgt_face,
1075
+ # src_face,
1076
+ # paste_back=True
1077
+ # )
1078
+
1079
+ # return cv2.cvtColor(result, cv2.COLOR_BGR2RGB)
1080
+
1081
+ # # def multi_face_swap(src_img, tgt_img):
1082
+ # # src_bgr = cv2.cvtColor(src_img, cv2.COLOR_RGB2BGR)
1083
+ # # tgt_bgr = cv2.cvtColor(tgt_img, cv2.COLOR_RGB2BGR)
1084
+
1085
+ # # src_faces = face_analysis_app.get(src_bgr)
1086
+ # # tgt_faces = face_analysis_app.get(tgt_bgr)
1087
+
1088
+ # # if not src_faces or not tgt_faces:
1089
+ # # raise ValueError("No faces detected")
1090
+
1091
+ # # def face_sort_key(face):
1092
+ # # x1, y1, x2, y2 = face.bbox
1093
+ # # area = (x2 - x1) * (y2 - y1)
1094
+ # # cx = (x1 + x2) / 2
1095
+ # # return (-area, cx)
1096
+
1097
+ # # # Split by gender
1098
+ # # src_male = [f for f in src_faces if f.gender == 1]
1099
+ # # src_female = [f for f in src_faces if f.gender == 0]
1100
+
1101
+ # # tgt_male = [f for f in tgt_faces if f.gender == 1]
1102
+ # # tgt_female = [f for f in tgt_faces if f.gender == 0]
1103
+
1104
+ # # # Sort inside gender groups
1105
+ # # src_male = sorted(src_male, key=face_sort_key)
1106
+ # # src_female = sorted(src_female, key=face_sort_key)
1107
+
1108
+ # # tgt_male = sorted(tgt_male, key=face_sort_key)
1109
+ # # tgt_female = sorted(tgt_female, key=face_sort_key)
1110
+
1111
+ # # # Build final swap pairs
1112
+ # # pairs = []
1113
+
1114
+ # # for s, t in zip(src_male, tgt_male):
1115
+ # # pairs.append((s, t))
1116
+
1117
+ # # for s, t in zip(src_female, tgt_female):
1118
+ # # pairs.append((s, t))
1119
+
1120
+ # # # Fallback if gender mismatch
1121
+ # # if not pairs:
1122
+ # # src_faces = sorted(src_faces, key=face_sort_key)
1123
+ # # tgt_faces = sorted(tgt_faces, key=face_sort_key)
1124
+ # # pairs = list(zip(src_faces, tgt_faces))
1125
+
1126
+ # # result_img = tgt_bgr.copy()
1127
+
1128
+ # # for src_face, _ in pairs:
1129
+ # # # 🔁 re-detect current target faces
1130
+ # # current_faces = face_analysis_app.get(result_img)
1131
+ # # current_faces = sorted(current_faces, key=face_sort_key)
1132
+
1133
+ # # # choose best matching gender
1134
+ # # candidates = [
1135
+ # # f for f in current_faces if f.gender == src_face.gender
1136
+ # # ] or current_faces
1137
+
1138
+ # # target_face = candidates[0]
1139
+
1140
+ # # result_img = swapper.get(
1141
+ # # result_img,
1142
+ # # target_face,
1143
+ # # src_face,
1144
+ # # paste_back=True
1145
+ # # )
1146
+
1147
+ # # return cv2.cvtColor(result_img, cv2.COLOR_BGR2RGB)
1148
+
1149
+
1150
+
1151
+ # def face_swap_and_enhance(src_img, tgt_img, temp_dir=None):
1152
  # try:
1153
  # with swap_lock:
1154
  # # Use a temp dir for intermediate files
1155
+ # if temp_dir is None:
1156
+ # temp_dir = os.path.join(tempfile.gettempdir(), f"faceswap_work_{uuid.uuid4().hex[:8]}")
1157
  # if os.path.exists(temp_dir):
1158
  # shutil.rmtree(temp_dir)
1159
  # os.makedirs(temp_dir, exist_ok=True)
 
1167
  # return None, None, "❌ Face not detected in one of the images"
1168
 
1169
  # swapped_path = os.path.join(temp_dir, f"swapped_{uuid.uuid4().hex[:8]}.jpg")
1170
+ # swapped_rgb = multi_face_swap_industry(src_img, tgt_img)
1171
+ # swapped_bgr = cv2.cvtColor(swapped_rgb, cv2.COLOR_RGB2BGR)
1172
+
1173
  # if swapped_bgr is None:
1174
  # return None, None, "❌ Face swap failed"
1175
 
1176
  # cv2.imwrite(swapped_path, swapped_bgr)
1177
 
1178
+ # python_cmd = sys.executable if sys.executable else "python3"
1179
+ # cmd = f"{python_cmd} {CODEFORMER_PATH} -w 0.4 --input_path {swapped_path} --output_path {temp_dir} --bg_upsampler realesrgan --face_upsample"
1180
  # result = subprocess.run(cmd, shell=True, capture_output=True, text=True)
1181
  # if result.returncode != 0:
1182
  # return None, None, f"❌ CodeFormer failed:\n{result.stderr}"
 
1187
  # return None, None, "❌ No enhanced image found"
1188
 
1189
  # final_path = os.path.join(final_results_dir, final_files[0])
1190
+ # final_img_bgr = cv2.imread(final_path)
1191
+ # if final_img_bgr is None:
1192
+ # return None, None, "❌ Failed to read enhanced image file"
1193
+ # final_img = cv2.cvtColor(final_img_bgr, cv2.COLOR_BGR2RGB)
1194
 
1195
  # return final_img, final_path, ""
1196
 
1197
  # except Exception as e:
1198
  # return None, None, f"❌ Error: {str(e)}"
1199
+
1200
+ # def compress_image(
1201
+ # image_bytes: bytes,
1202
+ # max_size=(1280, 1280), # max width/height
1203
+ # quality=75 # JPEG quality (60–80 is ideal)
1204
+ # ) -> bytes:
1205
+ # """
1206
+ # Compress image by resizing and lowering quality.
1207
+ # Returns compressed image bytes.
1208
+ # """
1209
+ # img = Image.open(io.BytesIO(image_bytes)).convert("RGB")
1210
+
1211
+ # # Resize while maintaining aspect ratio
1212
+ # img.thumbnail(max_size, Image.LANCZOS)
1213
+
1214
+ # output = io.BytesIO()
1215
+ # img.save(
1216
+ # output,
1217
+ # format="JPEG",
1218
+ # quality=quality,
1219
+ # optimize=True,
1220
+ # progressive=True
1221
+ # )
1222
+
1223
+ # return output.getvalue()
1224
 
1225
  # # --------------------- Gradio ---------------------
1226
+ # # with gr.Blocks() as demo:
1227
+ # # gr.Markdown("Face Swap")
1228
 
1229
+ # # with gr.Row():
1230
+ # # src_input = gr.Image(type="numpy", label="Upload Your Face")
1231
+ # # tgt_input = gr.Image(type="numpy", label="Upload Target Image")
1232
 
1233
+ # # btn = gr.Button("Swap Face")
1234
+ # # output_img = gr.Image(type="numpy", label="Enhanced Output")
1235
+ # # download = gr.File(label="⬇️ Download Enhanced Image")
1236
+ # # error_box = gr.Textbox(label="Logs / Errors", interactive=False)
1237
 
1238
+ # # def process(src, tgt):
1239
+ # # img, path, err = face_swap_and_enhance(src, tgt)
1240
+ # # return img, path, err
1241
 
1242
+ # # btn.click(process, [src_input, tgt_input], [output_img, download, error_box])
1243
 
1244
  # # --------------------- DigitalOcean Spaces Helper ---------------------
1245
  # def get_spaces_client():
 
1264
  # obj = client.get_object(Bucket=DO_SPACES_BUCKET, Key=key)
1265
  # return obj['Body'].read()
1266
 
1267
+ # def build_multi_faceswap_gradio():
1268
+ # with gr.Blocks() as demo:
1269
+ # gr.Markdown("## 👩‍❤️‍👨 Multi Face Swap (Couple → Couple)")
1270
+
1271
+ # with gr.Row():
1272
+ # src = gr.Image(type="numpy", label="Source Image (2 Faces)")
1273
+ # tgt = gr.Image(type="numpy", label="Target Image (2 Faces)")
1274
+
1275
+ # out = gr.Image(type="numpy", label="Swapped Result")
1276
+ # error = gr.Textbox(label="Logs", interactive=False)
1277
+
1278
+ # def process(src_img, tgt_img):
1279
+ # try:
1280
+ # swapped = multi_face_swap_industry(src_img, tgt_img)
1281
+ # enhanced = enhance_image_with_codeformer(swapped)
1282
+ # return enhanced, ""
1283
+ # except Exception as e:
1284
+ # return None, str(e)
1285
+
1286
+ # btn = gr.Button("Swap Faces")
1287
+ # btn.click(process, [src, tgt], [out, error])
1288
+
1289
+ # return demo
1290
+
1291
  # # --------------------- API Endpoints ---------------------
1292
  # @fastapi_app.get("/")
1293
  # def root():
1294
+ # return {"status": "healthy"}
1295
 
1296
  # @fastapi_app.get("/health")
1297
  # async def health():
 
1381
  # # ------------------------------------------------------------------#
1382
  # if user_id:
1383
  # try:
1384
+ # user_id_clean = user_id.strip()
1385
+ # if not user_id_clean:
1386
+ # raise ValueError("user_id cannot be empty")
1387
+ # try:
1388
+ # user_oid = ObjectId(user_id_clean)
1389
+ # except (InvalidId, ValueError) as e:
1390
+ # logger.error(f"Invalid user_id format: {user_id_clean}")
1391
+ # raise ValueError(f"Invalid user_id format: {user_id_clean}")
1392
+
1393
  # now = datetime.utcnow()
1394
 
1395
  # # Normalize dates (UTC midnight)
1396
  # today_date = datetime(now.year, now.month, now.day)
 
1397
 
1398
  # # -------------------------------------------------
1399
  # # STEP 1: Ensure root document exists
 
1507
  # }
1508
  # }
1509
  # )
1510
+
1511
+ # # -------------------------------------------------
1512
+ # # STEP 5: Sort subCategories by lastClickedAt (ascending - oldest first)
1513
+ # # -------------------------------------------------
1514
+ # user_doc = await media_clicks_col.find_one({"userId": user_oid})
1515
+ # if user_doc and "subCategories" in user_doc:
1516
+ # subcategories = user_doc["subCategories"]
1517
+ # # Sort by lastClickedAt in ascending order (oldest first)
1518
+ # # Handle missing or None dates by using datetime.min
1519
+ # subcategories_sorted = sorted(
1520
+ # subcategories,
1521
+ # key=lambda x: x.get("lastClickedAt") if x.get("lastClickedAt") is not None else datetime.min
1522
+ # )
1523
+ # # Update with sorted array
1524
+ # await media_clicks_col.update_one(
1525
+ # {"userId": user_oid},
1526
+ # {
1527
+ # "$set": {
1528
+ # "subCategories": subcategories_sorted,
1529
+ # "updatedAt": now
1530
+ # }
1531
+ # }
1532
+ # )
1533
 
1534
  # logger.info(
1535
  # "[MEDIA_CLICK] user=%s subCategory=%s ai_edit_complete++ daily_tracked",
 
1600
  # # # ------------------------------------------------------------------
1601
  # # # DOWNLOAD TARGET IMAGE
1602
  # # # ------------------------------------------------------------------
1603
+ # async with httpx.AsyncClient(timeout=30.0) as client:
1604
+ # response = await client.get(target_url)
1605
+ # response.raise_for_status()
1606
+ # tgt_bytes = response.content
1607
 
1608
  # src_bgr = cv2.imdecode(np.frombuffer(src_bytes, np.uint8), cv2.IMREAD_COLOR)
1609
  # tgt_bgr = cv2.imdecode(np.frombuffer(tgt_bytes, np.uint8), cv2.IMREAD_COLOR)
 
1626
 
1627
  # result_key = f"faceswap/result/{uuid.uuid4().hex}_enhanced.png"
1628
  # result_url = upload_to_spaces(result_bytes, result_key)
1629
+ # # -------------------------------------------------
1630
+ # # COMPRESS IMAGE (2–3 MB target)
1631
+ # # -------------------------------------------------
1632
+ # compressed_bytes = compress_image(
1633
+ # image_bytes=result_bytes,
1634
+ # max_size=(1280, 1280),
1635
+ # quality=72
1636
+ # )
1637
+
1638
+ # compressed_key = f"faceswap/result/{uuid.uuid4().hex}_enhanced_compressed.jpg"
1639
+ # compressed_url = upload_to_spaces(
1640
+ # compressed_bytes,
1641
+ # compressed_key,
1642
+ # content_type="image/jpeg"
1643
+ # )
1644
  # end_time = datetime.utcnow()
1645
  # response_time_ms = (end_time - start_time).total_seconds() * 1000
1646
 
 
1655
 
1656
  # return {
1657
  # "result_key": result_key,
1658
+ # "result_url": result_url,
1659
+ # "Compressed_Image_URL": compressed_url
1660
  # }
1661
 
1662
  # except Exception as e:
 
1686
  # headers={"Content-Disposition": "inline; filename=result.png"}
1687
  # )
1688
  # # --------------------- Mount Gradio ---------------------
1689
+
1690
+ # multi_faceswap_app = build_multi_faceswap_gradio()
1691
+ # fastapi_app = mount_gradio_app(
1692
+ # fastapi_app,
1693
+ # multi_faceswap_app,
1694
+ # path="/gradio-multi"
1695
+ # )
1696
 
1697
  # if __name__ == "__main__":
1698
  # uvicorn.run(fastapi_app, host="0.0.0.0", port=7860)