LogicGoInfotechSpaces commited on
Commit
b04dbb5
·
1 Parent(s): 5f12d8e

Add comprehensive FastAPI endpoints for all face swap operations

Browse files
Files changed (1) hide show
  1. fastapi_app.py +414 -9
fastapi_app.py CHANGED
@@ -1,7 +1,9 @@
1
  import logging
2
  import os
3
  import shutil
 
4
  import tempfile
 
5
  from io import BytesIO
6
 
7
  import cv2
@@ -35,14 +37,44 @@ def decode_image(upload: UploadFile) -> np.ndarray:
35
  return bgr
36
 
37
 
38
- def swap_video_file(src_bgr: np.ndarray, src_idx: int, video_path: str, dst_idx: int) -> tuple[str, str]:
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
39
  """Run face swap on a video file. Returns (output_video_path, workdir)."""
40
- workdir = tempfile.mkdtemp(prefix="swap_video_")
 
41
  src_path = os.path.join(workdir, "data_src.jpg")
42
  dst_video_path = os.path.join(workdir, "data_dst.mp4")
43
  frames_dir = os.path.join(workdir, "video_frames")
44
  swapped_dir = os.path.join(workdir, "swapped_frames")
45
- output_video_path = os.path.join(workdir, "output_swapped_video.mp4")
 
46
 
47
  os.makedirs(frames_dir, exist_ok=True)
48
  os.makedirs(swapped_dir, exist_ok=True)
@@ -67,7 +99,6 @@ def swap_video_file(src_bgr: np.ndarray, src_idx: int, video_path: str, dst_idx:
67
  cv2.imwrite(out_path, swapped)
68
  except Exception as exc:
69
  logger.warning("Failed to swap frame %s: %s", idx, exc)
70
- # Copy original frame so the video remains playable
71
  cv2.imwrite(out_path, cv2.imread(frame_path))
72
 
73
  cap = cv2.VideoCapture(dst_video_path)
@@ -75,6 +106,15 @@ def swap_video_file(src_bgr: np.ndarray, src_idx: int, video_path: str, dst_idx:
75
  cap.release()
76
  frames_to_video(swapped_dir, output_video_path, fps)
77
  logger.info("Created swapped video at %s", output_video_path)
 
 
 
 
 
 
 
 
 
78
  return output_video_path, workdir
79
 
80
 
@@ -91,6 +131,7 @@ async def swap_photo(
91
  destination_face_idx: int = Form(1, ge=1),
92
  _: None = Depends(verify_api_key),
93
  ) -> StreamingResponse:
 
94
  src_bgr = decode_image(source_image)
95
  dst_bgr = decode_image(destination_image)
96
 
@@ -106,7 +147,7 @@ async def swap_photo(
106
  swapped = face_swapper.swap_faces(src_path, source_face_idx, dst_path, 1)
107
  logger.info("Fallback to destination_face_idx 1 after missing face")
108
  else:
109
- raise
110
 
111
  ok, buffer = cv2.imencode(".jpg", swapped)
112
  if not ok:
@@ -114,6 +155,151 @@ async def swap_photo(
114
  return StreamingResponse(BytesIO(buffer.tobytes()), media_type="image/jpeg")
115
 
116
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
117
  @app.post("/swap/video")
118
  async def swap_video(
119
  background_tasks: BackgroundTasks,
@@ -121,26 +307,245 @@ async def swap_video(
121
  target_video: UploadFile = File(..., description="Target video (mp4)"),
122
  source_face_idx: int = Form(1, ge=1),
123
  destination_face_idx: int = Form(1, ge=1),
 
124
  _: None = Depends(verify_api_key),
125
  ) -> StreamingResponse:
 
126
  src_bgr = decode_image(source_image)
127
- with tempfile.NamedTemporaryFile(delete=False, suffix=".mp4") as tmp_video:
 
128
  video_bytes = await target_video.read()
129
  tmp_video.write(video_bytes)
130
  video_path = tmp_video.name
131
 
132
  try:
133
- output_path, workdir = swap_video_file(src_bgr, source_face_idx, video_path, destination_face_idx)
134
  except Exception as exc:
135
  logger.exception("Video swap failed: %s", exc)
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
136
  raise HTTPException(status_code=500, detail="Video swap failed") from exc
137
 
138
  def cleanup() -> None:
139
  shutil.rmtree(workdir, ignore_errors=True)
140
- if os.path.exists(video_path):
141
- os.remove(video_path)
142
  logger.info("Cleaned up workdir %s", workdir)
143
 
144
  background_tasks.add_task(cleanup)
145
  return StreamingResponse(open(output_path, "rb"), media_type="video/mp4")
146
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
  import logging
2
  import os
3
  import shutil
4
+ import subprocess
5
  import tempfile
6
+ import zipfile
7
  from io import BytesIO
8
 
9
  import cv2
 
37
  return bgr
38
 
39
 
40
+ def add_audio_to_video(original_video_path: str, video_no_audio_path: str, output_path: str) -> tuple[bool, str]:
41
+ """Add audio from original video to swapped video."""
42
+ cmd = [
43
+ "ffmpeg",
44
+ "-y",
45
+ "-i", video_no_audio_path,
46
+ "-i", original_video_path,
47
+ "-c:v", "copy",
48
+ "-c:a", "aac",
49
+ "-map", "0:v:0",
50
+ "-map", "1:a:0?",
51
+ "-shortest",
52
+ output_path
53
+ ]
54
+ try:
55
+ subprocess.run(cmd, check=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
56
+ return True, ""
57
+ except subprocess.CalledProcessError as e:
58
+ return False, e.stderr.decode() if e.stderr else "Unknown error"
59
+
60
+
61
+ def swap_video_file(
62
+ src_bgr: np.ndarray,
63
+ src_idx: int,
64
+ video_path: str,
65
+ dst_idx: int,
66
+ add_audio: bool = True,
67
+ workdir: str | None = None
68
+ ) -> tuple[str, str]:
69
  """Run face swap on a video file. Returns (output_video_path, workdir)."""
70
+ if workdir is None:
71
+ workdir = tempfile.mkdtemp(prefix="swap_video_")
72
  src_path = os.path.join(workdir, "data_src.jpg")
73
  dst_video_path = os.path.join(workdir, "data_dst.mp4")
74
  frames_dir = os.path.join(workdir, "video_frames")
75
  swapped_dir = os.path.join(workdir, "swapped_frames")
76
+ output_video_path = os.path.join(workdir, "output_tmp_output_video.mp4")
77
+ final_output_path = os.path.join(workdir, "output_with_audio.mp4")
78
 
79
  os.makedirs(frames_dir, exist_ok=True)
80
  os.makedirs(swapped_dir, exist_ok=True)
 
99
  cv2.imwrite(out_path, swapped)
100
  except Exception as exc:
101
  logger.warning("Failed to swap frame %s: %s", idx, exc)
 
102
  cv2.imwrite(out_path, cv2.imread(frame_path))
103
 
104
  cap = cv2.VideoCapture(dst_video_path)
 
106
  cap.release()
107
  frames_to_video(swapped_dir, output_video_path, fps)
108
  logger.info("Created swapped video at %s", output_video_path)
109
+
110
+ if add_audio:
111
+ ok, audio_log = add_audio_to_video(dst_video_path, output_video_path, final_output_path)
112
+ if ok:
113
+ logger.info("Added audio to %s", final_output_path)
114
+ return final_output_path, workdir
115
+ else:
116
+ logger.warning("Audio muxing failed: %s", audio_log)
117
+ return output_video_path, workdir
118
  return output_video_path, workdir
119
 
120
 
 
131
  destination_face_idx: int = Form(1, ge=1),
132
  _: None = Depends(verify_api_key),
133
  ) -> StreamingResponse:
134
+ """Swap a single face from source image to destination image."""
135
  src_bgr = decode_image(source_image)
136
  dst_bgr = decode_image(destination_image)
137
 
 
147
  swapped = face_swapper.swap_faces(src_path, source_face_idx, dst_path, 1)
148
  logger.info("Fallback to destination_face_idx 1 after missing face")
149
  else:
150
+ raise HTTPException(status_code=400, detail=str(err))
151
 
152
  ok, buffer = cv2.imencode(".jpg", swapped)
153
  if not ok:
 
155
  return StreamingResponse(BytesIO(buffer.tobytes()), media_type="image/jpeg")
156
 
157
 
158
+ @app.post("/swap/photo/single-src-multi-dst")
159
+ async def swap_single_src_multi_dst(
160
+ source_image: UploadFile = File(..., description="Source face image"),
161
+ destination_images: list[UploadFile] = File(..., description="Destination images"),
162
+ destination_indices: str = Form(..., description="Comma-separated destination face indices (e.g., '1,1,2')"),
163
+ _: None = Depends(verify_api_key),
164
+ ) -> StreamingResponse:
165
+ """Swap a single source face onto multiple destination images."""
166
+ src_bgr = decode_image(source_image)
167
+ dst_indices_list = [int(idx.strip()) for idx in destination_indices.split(",") if idx.strip().isdigit()]
168
+
169
+ with tempfile.TemporaryDirectory(prefix="single_src_multi_dst_") as workdir:
170
+ src_path = os.path.join(workdir, "src.jpg")
171
+ cv2.imwrite(src_path, src_bgr)
172
+
173
+ zip_buffer = BytesIO()
174
+ with zipfile.ZipFile(zip_buffer, "w", zipfile.ZIP_DEFLATED) as zip_file:
175
+ for j, dst_file in enumerate(destination_images):
176
+ dst_bgr = decode_image(dst_file)
177
+ dst_path = os.path.join(workdir, f"dst_{j}.jpg")
178
+ output_path = os.path.join(workdir, f"output_{j}.jpg")
179
+ cv2.imwrite(dst_path, dst_bgr)
180
+
181
+ try:
182
+ dst_idx = dst_indices_list[j] if j < len(dst_indices_list) else 1
183
+ swapped = face_swapper.swap_faces(src_path, 1, dst_path, dst_idx)
184
+ cv2.imwrite(output_path, swapped)
185
+ zip_file.write(output_path, f"swapped_{j}.jpg")
186
+ except Exception as exc:
187
+ logger.warning("Failed to swap destination %s: %s", j, exc)
188
+
189
+ zip_buffer.seek(0)
190
+ return StreamingResponse(zip_buffer, media_type="application/zip", headers={"Content-Disposition": "attachment; filename=swapped_images.zip"})
191
+
192
+
193
+ @app.post("/swap/photo/multi-src-single-dst")
194
+ async def swap_multi_src_single_dst(
195
+ source_images: list[UploadFile] = File(..., description="Source face images"),
196
+ destination_image: UploadFile = File(..., description="Destination image"),
197
+ destination_face_idx: int = Form(1, ge=1),
198
+ _: None = Depends(verify_api_key),
199
+ ) -> StreamingResponse:
200
+ """Swap multiple source faces onto a single destination image."""
201
+ dst_bgr = decode_image(destination_image)
202
+
203
+ with tempfile.TemporaryDirectory(prefix="multi_src_single_dst_") as workdir:
204
+ dst_path = os.path.join(workdir, "dst.jpg")
205
+ cv2.imwrite(dst_path, dst_bgr)
206
+
207
+ zip_buffer = BytesIO()
208
+ with zipfile.ZipFile(zip_buffer, "w", zipfile.ZIP_DEFLATED) as zip_file:
209
+ for i, src_file in enumerate(source_images):
210
+ src_bgr = decode_image(src_file)
211
+ src_path = os.path.join(workdir, f"src_{i}.jpg")
212
+ output_path = os.path.join(workdir, f"output_{i}.jpg")
213
+ cv2.imwrite(src_path, src_bgr)
214
+
215
+ try:
216
+ swapped = face_swapper.swap_faces(src_path, 1, dst_path, destination_face_idx)
217
+ cv2.imwrite(output_path, swapped)
218
+ zip_file.write(output_path, f"swapped_{i}.jpg")
219
+ except Exception as exc:
220
+ logger.warning("Failed to swap source %s: %s", i, exc)
221
+
222
+ zip_buffer.seek(0)
223
+ return StreamingResponse(zip_buffer, media_type="application/zip", headers={"Content-Disposition": "attachment; filename=swapped_images.zip"})
224
+
225
+
226
+ @app.post("/swap/photo/multi-src-multi-dst")
227
+ async def swap_multi_src_multi_dst(
228
+ source_images: list[UploadFile] = File(..., description="Source face images"),
229
+ destination_images: list[UploadFile] = File(..., description="Destination images"),
230
+ destination_indices: str = Form(..., description="Comma-separated destination face indices (e.g., '1,1,2')"),
231
+ _: None = Depends(verify_api_key),
232
+ ) -> StreamingResponse:
233
+ """Swap multiple source faces onto multiple destination images."""
234
+ dst_indices_list = [int(idx.strip()) for idx in destination_indices.split(",") if idx.strip().isdigit()]
235
+
236
+ with tempfile.TemporaryDirectory(prefix="multi_src_multi_dst_") as workdir:
237
+ zip_buffer = BytesIO()
238
+ with zipfile.ZipFile(zip_buffer, "w", zipfile.ZIP_DEFLATED) as zip_file:
239
+ for i, src_file in enumerate(source_images):
240
+ src_bgr = decode_image(src_file)
241
+ src_path = os.path.join(workdir, f"src_{i}.jpg")
242
+ cv2.imwrite(src_path, src_bgr)
243
+
244
+ for j, dst_file in enumerate(destination_images):
245
+ dst_bgr = decode_image(dst_file)
246
+ dst_path = os.path.join(workdir, f"dst_{j}.jpg")
247
+ output_path = os.path.join(workdir, f"output_{i}_{j}.jpg")
248
+
249
+ try:
250
+ dst_idx = dst_indices_list[j] if j < len(dst_indices_list) else 1
251
+ swapped = face_swapper.swap_faces(src_path, 1, dst_path, dst_idx)
252
+ cv2.imwrite(output_path, swapped)
253
+ zip_file.write(output_path, f"swapped_src{i}_dst{j}.jpg")
254
+ except Exception as exc:
255
+ logger.warning("Failed to swap src %s with dst %s: %s", i, j, exc)
256
+
257
+ zip_buffer.seek(0)
258
+ return StreamingResponse(zip_buffer, media_type="application/zip", headers={"Content-Disposition": "attachment; filename=swapped_images.zip"})
259
+
260
+
261
+ @app.post("/swap/photo/custom-mapping")
262
+ async def swap_faces_custom(
263
+ source_images: list[UploadFile] = File(..., description="Source face images"),
264
+ destination_image: UploadFile = File(..., description="Destination image"),
265
+ mapping: str = Form(..., description="Comma-separated mapping (e.g., '2,1,3' means face1->src2, face2->src1, face3->src3)"),
266
+ _: None = Depends(verify_api_key),
267
+ ) -> StreamingResponse:
268
+ """Swap faces with custom mapping: map destination faces to specific source images."""
269
+ dst_bgr = decode_image(destination_image)
270
+ mapping_list = [int(x.strip()) for x in mapping.split(",") if x.strip().isdigit()]
271
+
272
+ with tempfile.TemporaryDirectory(prefix="custom_swap_") as workdir:
273
+ dst_path = os.path.join(workdir, "dst.jpg")
274
+ temp_path = os.path.join(workdir, "temp.jpg")
275
+ output_path = os.path.join(workdir, "output.jpg")
276
+ cv2.imwrite(dst_path, dst_bgr)
277
+ shutil.copy(dst_path, temp_path)
278
+
279
+ src_paths = []
280
+ for i, src_file in enumerate(source_images):
281
+ src_bgr = decode_image(src_file)
282
+ src_path = os.path.join(workdir, f"src_{i+1}.jpg")
283
+ cv2.imwrite(src_path, src_bgr)
284
+ src_paths.append(src_path)
285
+
286
+ for face_idx, src_idx in enumerate(mapping_list, start=1):
287
+ if src_idx < 1 or src_idx > len(src_paths):
288
+ logger.warning("Invalid source index %s for face %s", src_idx, face_idx)
289
+ continue
290
+ try:
291
+ swapped_img = face_swapper.swap_faces(src_paths[src_idx-1], 1, temp_path, face_idx)
292
+ cv2.imwrite(temp_path, swapped_img)
293
+ except Exception as exc:
294
+ logger.warning("Failed to swap face %s with source %s: %s", face_idx, src_idx, exc)
295
+
296
+ shutil.copy(temp_path, output_path)
297
+ ok, buffer = cv2.imencode(".jpg", cv2.imread(output_path))
298
+ if not ok:
299
+ raise HTTPException(status_code=500, detail="Failed to encode swapped image")
300
+ return StreamingResponse(BytesIO(buffer.tobytes()), media_type="image/jpeg")
301
+
302
+
303
  @app.post("/swap/video")
304
  async def swap_video(
305
  background_tasks: BackgroundTasks,
 
307
  target_video: UploadFile = File(..., description="Target video (mp4)"),
308
  source_face_idx: int = Form(1, ge=1),
309
  destination_face_idx: int = Form(1, ge=1),
310
+ add_audio: bool = Form(True, description="Add audio from original video"),
311
  _: None = Depends(verify_api_key),
312
  ) -> StreamingResponse:
313
+ """Swap a single face from source image onto all frames of target video."""
314
  src_bgr = decode_image(source_image)
315
+ workdir = tempfile.mkdtemp(prefix="swap_video_")
316
+ with tempfile.NamedTemporaryFile(delete=False, suffix=".mp4", dir=workdir) as tmp_video:
317
  video_bytes = await target_video.read()
318
  tmp_video.write(video_bytes)
319
  video_path = tmp_video.name
320
 
321
  try:
322
+ output_path, _ = swap_video_file(src_bgr, source_face_idx, video_path, destination_face_idx, add_audio, workdir)
323
  except Exception as exc:
324
  logger.exception("Video swap failed: %s", exc)
325
+ shutil.rmtree(workdir, ignore_errors=True)
326
+ raise HTTPException(status_code=500, detail="Video swap failed") from exc
327
+
328
+ def cleanup() -> None:
329
+ shutil.rmtree(workdir, ignore_errors=True)
330
+ logger.info("Cleaned up workdir %s", workdir)
331
+
332
+ background_tasks.add_task(cleanup)
333
+ return StreamingResponse(open(output_path, "rb"), media_type="video/mp4")
334
+
335
+
336
+ @app.post("/swap/video/all-faces")
337
+ async def swap_video_all_faces(
338
+ background_tasks: BackgroundTasks,
339
+ source_image: UploadFile = File(..., description="Source face image"),
340
+ target_video: UploadFile = File(..., description="Target video (mp4)"),
341
+ num_faces_to_swap: int = Form(1, ge=1, description="Number of faces to swap"),
342
+ add_audio: bool = Form(True, description="Add audio from original video"),
343
+ _: None = Depends(verify_api_key),
344
+ ) -> StreamingResponse:
345
+ """Swap all faces in video with the same source face."""
346
+ src_bgr = decode_image(source_image)
347
+ workdir = tempfile.mkdtemp(prefix="swap_video_all_")
348
+ src_path = os.path.join(workdir, "data_src.jpg")
349
+ cv2.imwrite(src_path, src_bgr)
350
+
351
+ with tempfile.NamedTemporaryFile(delete=False, suffix=".mp4", dir=workdir) as tmp_video:
352
+ video_bytes = await target_video.read()
353
+ tmp_video.write(video_bytes)
354
+ video_path = tmp_video.name
355
+
356
+ try:
357
+ dst_video_path = os.path.join(workdir, "data_dst.mp4")
358
+ frames_dir = os.path.join(workdir, "video_frames")
359
+ swapped_dir = os.path.join(workdir, "swapped_frames")
360
+ temp_dir = os.path.join(swapped_dir, "temp_swap")
361
+ output_video_path = os.path.join(workdir, "output_tmp_output_video.mp4")
362
+ final_output_path = os.path.join(workdir, "output_with_audio.mp4")
363
+
364
+ os.makedirs(frames_dir, exist_ok=True)
365
+ os.makedirs(swapped_dir, exist_ok=True)
366
+ os.makedirs(temp_dir, exist_ok=True)
367
+
368
+ shutil.copy(video_path, dst_video_path)
369
+ frame_paths = extract_frames(dst_video_path, frames_dir)
370
+ logger.info("Extracted %s frames", len(frame_paths))
371
+
372
+ temp_frame_path = os.path.join(temp_dir, "temp.jpg")
373
+ for idx, frame_path in enumerate(frame_paths):
374
+ swapped_name = f"swapped_{idx:05d}.jpg"
375
+ out_path = os.path.join(swapped_dir, swapped_name)
376
+ try:
377
+ shutil.copy(frame_path, temp_frame_path)
378
+ for face_idx in range(1, num_faces_to_swap + 1):
379
+ try:
380
+ swapped_img = face_swapper.swap_faces(src_path, 1, temp_frame_path, face_idx)
381
+ cv2.imwrite(temp_frame_path, swapped_img)
382
+ except Exception as exc:
383
+ logger.warning("Failed to swap face %s in frame %s: %s", face_idx, idx, exc)
384
+ shutil.copy(temp_frame_path, out_path)
385
+ except Exception as exc:
386
+ logger.warning("Failed to swap frame %s: %s", idx, exc)
387
+ cv2.imwrite(out_path, cv2.imread(frame_path))
388
+
389
+ cap = cv2.VideoCapture(dst_video_path)
390
+ fps = cap.get(cv2.CAP_PROP_FPS) or 25.0
391
+ cap.release()
392
+ frames_to_video(swapped_dir, output_video_path, fps)
393
+
394
+ if add_audio:
395
+ ok, audio_log = add_audio_to_video(dst_video_path, output_video_path, final_output_path)
396
+ if ok:
397
+ output_path = final_output_path
398
+ else:
399
+ logger.warning("Audio muxing failed: %s", audio_log)
400
+ output_path = output_video_path
401
+ else:
402
+ output_path = output_video_path
403
+
404
+ except Exception as exc:
405
+ logger.exception("Video swap all faces failed: %s", exc)
406
+ shutil.rmtree(workdir, ignore_errors=True)
407
+ raise HTTPException(status_code=500, detail="Video swap failed") from exc
408
+
409
+ def cleanup() -> None:
410
+ shutil.rmtree(workdir, ignore_errors=True)
411
+ logger.info("Cleaned up workdir %s", workdir)
412
+
413
+ background_tasks.add_task(cleanup)
414
+ return StreamingResponse(open(output_path, "rb"), media_type="video/mp4")
415
+
416
+
417
+ @app.post("/swap/video/custom-mapping")
418
+ async def swap_video_custom_mapping(
419
+ background_tasks: BackgroundTasks,
420
+ source_images: list[UploadFile] = File(..., description="Source face images"),
421
+ target_video: UploadFile = File(..., description="Target video (mp4)"),
422
+ mapping: str = Form(..., description="Comma-separated mapping (e.g., '2,1,3')"),
423
+ add_audio: bool = Form(True, description="Add audio from original video"),
424
+ _: None = Depends(verify_api_key),
425
+ ) -> StreamingResponse:
426
+ """Swap faces in video with custom mapping: map destination faces to specific source images."""
427
+ workdir = tempfile.mkdtemp(prefix="swap_video_custom_")
428
+ mapping_list = [int(x.strip()) for x in mapping.split(",") if x.strip().isdigit()]
429
+
430
+ src_paths = []
431
+ for i, src_file in enumerate(source_images):
432
+ src_bgr = decode_image(src_file)
433
+ src_path = os.path.join(workdir, f"src_{i+1}.jpg")
434
+ cv2.imwrite(src_path, src_bgr)
435
+ src_paths.append(src_path)
436
+
437
+ with tempfile.NamedTemporaryFile(delete=False, suffix=".mp4", dir=workdir) as tmp_video:
438
+ video_bytes = await target_video.read()
439
+ tmp_video.write(video_bytes)
440
+ video_path = tmp_video.name
441
+
442
+ try:
443
+ dst_video_path = os.path.join(workdir, "data_dst.mp4")
444
+ temp_dir = os.path.join(workdir, "temp")
445
+ frames_dir = os.path.join(workdir, "frames")
446
+ swapped_dir = os.path.join(workdir, "swapped_frames")
447
+ output_video_path = os.path.join(workdir, "output_tmp_output_video.mp4")
448
+ final_output_path = os.path.join(workdir, "output_with_audio.mp4")
449
+
450
+ os.makedirs(temp_dir, exist_ok=True)
451
+ os.makedirs(frames_dir, exist_ok=True)
452
+ os.makedirs(swapped_dir, exist_ok=True)
453
+
454
+ shutil.copy(video_path, dst_video_path)
455
+ frame_paths = extract_frames(dst_video_path, frames_dir)
456
+ logger.info("Extracted %s frames", len(frame_paths))
457
+
458
+ temp_frame_path = os.path.join(temp_dir, "temp.jpg")
459
+ for idx, frame_path in enumerate(frame_paths):
460
+ swapped_name = f"swapped_{idx:05d}.jpg"
461
+ out_path = os.path.join(swapped_dir, swapped_name)
462
+ try:
463
+ shutil.copy(frame_path, temp_frame_path)
464
+ for face_idx, src_idx in enumerate(mapping_list, start=1):
465
+ if src_idx < 1 or src_idx > len(src_paths):
466
+ logger.warning("Invalid source index %s for face %s in frame %s", src_idx, face_idx, idx)
467
+ continue
468
+ try:
469
+ swapped_img = face_swapper.swap_faces(src_paths[src_idx-1], 1, temp_frame_path, face_idx)
470
+ cv2.imwrite(temp_frame_path, swapped_img)
471
+ except Exception as exc:
472
+ logger.warning("Failed to swap face %s with source %s in frame %s: %s", face_idx, src_idx, idx, exc)
473
+ shutil.copy(temp_frame_path, out_path)
474
+ except Exception as exc:
475
+ logger.warning("Failed to swap frame %s: %s", idx, exc)
476
+ cv2.imwrite(out_path, cv2.imread(frame_path))
477
+
478
+ cap = cv2.VideoCapture(dst_video_path)
479
+ fps = cap.get(cv2.CAP_PROP_FPS) or 25.0
480
+ cap.release()
481
+ frames_to_video(swapped_dir, output_video_path, fps)
482
+
483
+ if add_audio:
484
+ ok, audio_log = add_audio_to_video(dst_video_path, output_video_path, final_output_path)
485
+ if ok:
486
+ output_path = final_output_path
487
+ else:
488
+ logger.warning("Audio muxing failed: %s", audio_log)
489
+ output_path = output_video_path
490
+ else:
491
+ output_path = output_video_path
492
+
493
+ except Exception as exc:
494
+ logger.exception("Video swap custom mapping failed: %s", exc)
495
+ shutil.rmtree(workdir, ignore_errors=True)
496
  raise HTTPException(status_code=500, detail="Video swap failed") from exc
497
 
498
  def cleanup() -> None:
499
  shutil.rmtree(workdir, ignore_errors=True)
 
 
500
  logger.info("Cleaned up workdir %s", workdir)
501
 
502
  background_tasks.add_task(cleanup)
503
  return StreamingResponse(open(output_path, "rb"), media_type="video/mp4")
504
 
505
+
506
+ @app.post("/swap/video/single-src-multi-video")
507
+ async def swap_single_src_multi_video(
508
+ background_tasks: BackgroundTasks,
509
+ source_image: UploadFile = File(..., description="Source face image"),
510
+ target_videos: list[UploadFile] = File(..., description="Target videos (mp4)"),
511
+ destination_indices: str = Form(..., description="Comma-separated destination face indices (e.g., '1,2,1')"),
512
+ add_audio: bool = Form(True, description="Add audio from original video"),
513
+ _: None = Depends(verify_api_key),
514
+ ) -> StreamingResponse:
515
+ """Swap a single source face onto multiple videos."""
516
+ src_bgr = decode_image(source_image)
517
+ dst_indices_list = [int(idx.strip()) for idx in destination_indices.split(",") if idx.strip().isdigit()]
518
+ workdir = tempfile.mkdtemp(prefix="single_src_multi_video_")
519
+ src_path = os.path.join(workdir, "data_src.jpg")
520
+ cv2.imwrite(src_path, src_bgr)
521
+
522
+ video_paths = []
523
+ for i, video_file in enumerate(target_videos):
524
+ video_bytes = await video_file.read()
525
+ video_path = os.path.join(workdir, f"video_{i}.mp4")
526
+ with open(video_path, "wb") as f:
527
+ f.write(video_bytes)
528
+ video_paths.append((video_path, dst_indices_list[i] if i < len(dst_indices_list) else 1))
529
+
530
+ try:
531
+ zip_buffer = BytesIO()
532
+ with zipfile.ZipFile(zip_buffer, "w", zipfile.ZIP_DEFLATED) as zip_file:
533
+ for i, (video_path, dst_idx) in enumerate(video_paths):
534
+ try:
535
+ output_path, _ = swap_video_file(src_bgr, 1, video_path, dst_idx, add_audio, workdir)
536
+ zip_file.write(output_path, f"swapped_video_{i}.mp4")
537
+ except Exception as exc:
538
+ logger.warning("Failed to swap video %s: %s", i, exc)
539
+
540
+ zip_buffer.seek(0)
541
+ except Exception as exc:
542
+ logger.exception("Single src multi video swap failed: %s", exc)
543
+ shutil.rmtree(workdir, ignore_errors=True)
544
+ raise HTTPException(status_code=500, detail="Video swap failed") from exc
545
+
546
+ def cleanup() -> None:
547
+ shutil.rmtree(workdir, ignore_errors=True)
548
+ logger.info("Cleaned up workdir %s", workdir)
549
+
550
+ background_tasks.add_task(cleanup)
551
+ return StreamingResponse(zip_buffer, media_type="application/zip", headers={"Content-Disposition": "attachment; filename=swapped_videos.zip"})