Fix connection reset: Improve video streaming, increase timeouts, and add better error handling
33da647
| import logging | |
| import os | |
| import shutil | |
| import subprocess | |
| import tempfile | |
| import zipfile | |
| from io import BytesIO | |
| import cv2 | |
| import numpy as np | |
| from fastapi import BackgroundTasks, Depends, FastAPI, File, Form, Header, HTTPException, UploadFile | |
| from fastapi.middleware.cors import CORSMiddleware | |
| from fastapi.responses import JSONResponse, StreamingResponse | |
| from SinglePhoto import FaceSwapper | |
| from VideoSwapping import extract_frames, frames_to_video | |
| logger = logging.getLogger(__name__) | |
| API_KEY = os.getenv("FACE_SWAP_API_KEY") | |
| face_swapper = FaceSwapper() | |
| app = FastAPI(title="FaceSwapAll API", version="1.0.0", docs_url="/docs", redoc_url="/redoc") | |
| # Add CORS middleware | |
| app.add_middleware( | |
| CORSMiddleware, | |
| allow_origins=["*"], | |
| allow_credentials=True, | |
| allow_methods=["*"], | |
| allow_headers=["*"], | |
| ) | |
| def verify_api_key(x_api_key: str | None = Header(default=None)) -> None: | |
| """Simple header-based API key guard.""" | |
| if API_KEY and x_api_key != API_KEY: | |
| logger.warning("Unauthorized request with invalid API key") | |
| raise HTTPException(status_code=401, detail="Invalid API key") | |
| def decode_image(upload: UploadFile) -> np.ndarray: | |
| data = upload.file.read() | |
| image_array = np.frombuffer(data, dtype=np.uint8) | |
| bgr = cv2.imdecode(image_array, cv2.IMREAD_COLOR) | |
| if bgr is None: | |
| raise HTTPException(status_code=400, detail=f"Unable to decode image: {upload.filename}") | |
| return bgr | |
| def add_audio_to_video(original_video_path: str, video_no_audio_path: str, output_path: str) -> tuple[bool, str]: | |
| """Add audio from original video to swapped video.""" | |
| cmd = [ | |
| "ffmpeg", | |
| "-y", | |
| "-i", video_no_audio_path, | |
| "-i", original_video_path, | |
| "-c:v", "copy", | |
| "-c:a", "aac", | |
| "-map", "0:v:0", | |
| "-map", "1:a:0?", | |
| "-shortest", | |
| output_path | |
| ] | |
| try: | |
| subprocess.run(cmd, check=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE) | |
| return True, "" | |
| except subprocess.CalledProcessError as e: | |
| return False, e.stderr.decode() if e.stderr else "Unknown error" | |
| def swap_video_file( | |
| src_bgr: np.ndarray, | |
| src_idx: int, | |
| video_path: str, | |
| dst_idx: int, | |
| add_audio: bool = True, | |
| workdir: str | None = None | |
| ) -> tuple[str, str]: | |
| """Run face swap on a video file. Returns (output_video_path, workdir).""" | |
| if workdir is None: | |
| workdir = tempfile.mkdtemp(prefix="swap_video_") | |
| src_path = os.path.join(workdir, "data_src.jpg") | |
| dst_video_path = os.path.join(workdir, "data_dst.mp4") | |
| frames_dir = os.path.join(workdir, "video_frames") | |
| swapped_dir = os.path.join(workdir, "swapped_frames") | |
| output_video_path = os.path.join(workdir, "output_tmp_output_video.mp4") | |
| final_output_path = os.path.join(workdir, "output_with_audio.mp4") | |
| os.makedirs(frames_dir, exist_ok=True) | |
| os.makedirs(swapped_dir, exist_ok=True) | |
| cv2.imwrite(src_path, src_bgr) | |
| shutil.copy(video_path, dst_video_path) | |
| frame_paths = extract_frames(dst_video_path, frames_dir) | |
| total_frames = len(frame_paths) | |
| logger.info("Extracted %s frames to %s", total_frames, frames_dir) | |
| for idx, frame_path in enumerate(frame_paths): | |
| if idx % 50 == 0 or idx == total_frames - 1: | |
| logger.info("Processing frame %s/%s (%.1f%%)", idx + 1, total_frames, (idx + 1) / total_frames * 100) | |
| swapped_name = f"swapped_{idx:05d}.jpg" | |
| out_path = os.path.join(swapped_dir, swapped_name) | |
| try: | |
| try: | |
| swapped = face_swapper.swap_faces(src_path, src_idx, frame_path, dst_idx) | |
| cv2.imwrite(out_path, swapped) | |
| except ValueError as err: | |
| if "Target image contains" in str(err) or "Source image contains" in str(err): | |
| # No face detected, use original frame | |
| original_frame = cv2.imread(frame_path) | |
| if original_frame is not None: | |
| cv2.imwrite(out_path, original_frame) | |
| logger.debug("Frame %s: No face detected, using original", idx) | |
| else: | |
| logger.warning("Frame %s: Could not read original frame", idx) | |
| elif dst_idx != 1 and "Target image contains" in str(err): | |
| swapped = face_swapper.swap_faces(src_path, src_idx, frame_path, 1) | |
| cv2.imwrite(out_path, swapped) | |
| logger.info("Frame %s fallback to dst_idx 1", idx) | |
| else: | |
| raise | |
| except Exception as exc: | |
| logger.warning("Failed to swap frame %s: %s", idx, exc) | |
| # Always write original frame as fallback | |
| original_frame = cv2.imread(frame_path) | |
| if original_frame is not None: | |
| cv2.imwrite(out_path, original_frame) | |
| else: | |
| logger.error("Frame %s: Could not read original frame for fallback", idx) | |
| cap = cv2.VideoCapture(dst_video_path) | |
| fps = cap.get(cv2.CAP_PROP_FPS) or 25.0 | |
| width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH)) | |
| height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT)) | |
| cap.release() | |
| # Use ffmpeg for more reliable video encoding | |
| frame_pattern = os.path.join(swapped_dir, "swapped_%05d.jpg") | |
| cmd = [ | |
| "ffmpeg", | |
| "-y", | |
| "-framerate", str(fps), | |
| "-i", frame_pattern, | |
| "-c:v", "libx264", | |
| "-pix_fmt", "yuv420p", | |
| "-crf", "23", | |
| output_video_path | |
| ] | |
| try: | |
| result = subprocess.run(cmd, check=True, capture_output=True, text=True) | |
| logger.info("Created swapped video at %s using ffmpeg", output_video_path) | |
| except subprocess.CalledProcessError as e: | |
| logger.error("FFmpeg encoding failed: %s, %s", e.stderr, e.stdout) | |
| # Fallback to OpenCV method | |
| frames_to_video(swapped_dir, output_video_path, fps) | |
| logger.info("Created swapped video at %s using OpenCV fallback", output_video_path) | |
| if add_audio: | |
| ok, audio_log = add_audio_to_video(dst_video_path, output_video_path, final_output_path) | |
| if ok: | |
| logger.info("Added audio to %s", final_output_path) | |
| return final_output_path, workdir | |
| else: | |
| logger.warning("Audio muxing failed: %s", audio_log) | |
| return output_video_path, workdir | |
| return output_video_path, workdir | |
| def root() -> JSONResponse: | |
| """Root endpoint with API information.""" | |
| return JSONResponse({ | |
| "name": "FaceSwapAll API", | |
| "version": "1.0.0", | |
| "status": "running", | |
| "endpoints": { | |
| "health": "/health", | |
| "docs": "/docs", | |
| "photo_swap": "/swap/photo", | |
| "video_swap": "/swap/video", | |
| "photo_single_src_multi_dst": "/swap/photo/single-src-multi-dst", | |
| "photo_multi_src_single_dst": "/swap/photo/multi-src-single-dst", | |
| "photo_multi_src_multi_dst": "/swap/photo/multi-src-multi-dst", | |
| "photo_custom_mapping": "/swap/photo/custom-mapping", | |
| "video_all_faces": "/swap/video/all-faces", | |
| "video_custom_mapping": "/swap/video/custom-mapping", | |
| "video_single_src_multi_video": "/swap/video/single-src-multi-video" | |
| } | |
| }) | |
| def health() -> JSONResponse: | |
| """Health check endpoint.""" | |
| return JSONResponse({ | |
| "status": "ok", | |
| "service": "FaceSwapAll API", | |
| "version": "1.0.0" | |
| }) | |
| async def swap_photo( | |
| source_image: UploadFile = File(..., description="Source face image"), | |
| destination_image: UploadFile = File(..., description="Destination image"), | |
| source_face_idx: int = Form(1, ge=1), | |
| destination_face_idx: int = Form(1, ge=1), | |
| _: None = Depends(verify_api_key), | |
| ) -> StreamingResponse: | |
| """Swap a single face from source image to destination image.""" | |
| src_bgr = decode_image(source_image) | |
| dst_bgr = decode_image(destination_image) | |
| with tempfile.TemporaryDirectory(prefix="swap_photo_") as workdir: | |
| src_path = os.path.join(workdir, "src.jpg") | |
| dst_path = os.path.join(workdir, "dst.jpg") | |
| cv2.imwrite(src_path, src_bgr) | |
| cv2.imwrite(dst_path, dst_bgr) | |
| try: | |
| swapped = face_swapper.swap_faces(src_path, source_face_idx, dst_path, destination_face_idx) | |
| except ValueError as err: | |
| if destination_face_idx != 1 and "Target image contains" in str(err): | |
| swapped = face_swapper.swap_faces(src_path, source_face_idx, dst_path, 1) | |
| logger.info("Fallback to destination_face_idx 1 after missing face") | |
| else: | |
| raise HTTPException(status_code=400, detail=str(err)) | |
| ok, buffer = cv2.imencode(".jpg", swapped) | |
| if not ok: | |
| raise HTTPException(status_code=500, detail="Failed to encode swapped image") | |
| return StreamingResponse(BytesIO(buffer.tobytes()), media_type="image/jpeg") | |
| async def swap_single_src_multi_dst( | |
| source_image: UploadFile = File(..., description="Source face image"), | |
| destination_images: list[UploadFile] = File(..., description="Destination images"), | |
| destination_indices: str = Form(..., description="Comma-separated destination face indices (e.g., '1,1,2')"), | |
| _: None = Depends(verify_api_key), | |
| ) -> StreamingResponse: | |
| """Swap a single source face onto multiple destination images.""" | |
| src_bgr = decode_image(source_image) | |
| dst_indices_list = [int(idx.strip()) for idx in destination_indices.split(",") if idx.strip().isdigit()] | |
| with tempfile.TemporaryDirectory(prefix="single_src_multi_dst_") as workdir: | |
| src_path = os.path.join(workdir, "src.jpg") | |
| cv2.imwrite(src_path, src_bgr) | |
| zip_buffer = BytesIO() | |
| with zipfile.ZipFile(zip_buffer, "w", zipfile.ZIP_DEFLATED) as zip_file: | |
| for j, dst_file in enumerate(destination_images): | |
| dst_bgr = decode_image(dst_file) | |
| dst_path = os.path.join(workdir, f"dst_{j}.jpg") | |
| output_path = os.path.join(workdir, f"output_{j}.jpg") | |
| cv2.imwrite(dst_path, dst_bgr) | |
| try: | |
| dst_idx = dst_indices_list[j] if j < len(dst_indices_list) else 1 | |
| swapped = face_swapper.swap_faces(src_path, 1, dst_path, dst_idx) | |
| cv2.imwrite(output_path, swapped) | |
| zip_file.write(output_path, f"swapped_{j}.jpg") | |
| except Exception as exc: | |
| logger.warning("Failed to swap destination %s: %s", j, exc) | |
| zip_buffer.seek(0) | |
| return StreamingResponse(zip_buffer, media_type="application/zip", headers={"Content-Disposition": "attachment; filename=swapped_images.zip"}) | |
| async def swap_multi_src_single_dst( | |
| source_images: list[UploadFile] = File(..., description="Source face images"), | |
| destination_image: UploadFile = File(..., description="Destination image"), | |
| destination_face_idx: int = Form(1, ge=1), | |
| _: None = Depends(verify_api_key), | |
| ) -> StreamingResponse: | |
| """Swap multiple source faces onto a single destination image.""" | |
| dst_bgr = decode_image(destination_image) | |
| with tempfile.TemporaryDirectory(prefix="multi_src_single_dst_") as workdir: | |
| dst_path = os.path.join(workdir, "dst.jpg") | |
| cv2.imwrite(dst_path, dst_bgr) | |
| zip_buffer = BytesIO() | |
| with zipfile.ZipFile(zip_buffer, "w", zipfile.ZIP_DEFLATED) as zip_file: | |
| for i, src_file in enumerate(source_images): | |
| src_bgr = decode_image(src_file) | |
| src_path = os.path.join(workdir, f"src_{i}.jpg") | |
| output_path = os.path.join(workdir, f"output_{i}.jpg") | |
| cv2.imwrite(src_path, src_bgr) | |
| try: | |
| swapped = face_swapper.swap_faces(src_path, 1, dst_path, destination_face_idx) | |
| cv2.imwrite(output_path, swapped) | |
| zip_file.write(output_path, f"swapped_{i}.jpg") | |
| except Exception as exc: | |
| logger.warning("Failed to swap source %s: %s", i, exc) | |
| zip_buffer.seek(0) | |
| return StreamingResponse(zip_buffer, media_type="application/zip", headers={"Content-Disposition": "attachment; filename=swapped_images.zip"}) | |
| async def swap_multi_src_multi_dst( | |
| source_images: list[UploadFile] = File(..., description="Source face images"), | |
| destination_images: list[UploadFile] = File(..., description="Destination images"), | |
| destination_indices: str = Form(..., description="Comma-separated destination face indices (e.g., '1,1,2')"), | |
| _: None = Depends(verify_api_key), | |
| ) -> StreamingResponse: | |
| """Swap multiple source faces onto multiple destination images.""" | |
| dst_indices_list = [int(idx.strip()) for idx in destination_indices.split(",") if idx.strip().isdigit()] | |
| with tempfile.TemporaryDirectory(prefix="multi_src_multi_dst_") as workdir: | |
| zip_buffer = BytesIO() | |
| with zipfile.ZipFile(zip_buffer, "w", zipfile.ZIP_DEFLATED) as zip_file: | |
| for i, src_file in enumerate(source_images): | |
| src_bgr = decode_image(src_file) | |
| src_path = os.path.join(workdir, f"src_{i}.jpg") | |
| cv2.imwrite(src_path, src_bgr) | |
| for j, dst_file in enumerate(destination_images): | |
| dst_bgr = decode_image(dst_file) | |
| dst_path = os.path.join(workdir, f"dst_{j}.jpg") | |
| output_path = os.path.join(workdir, f"output_{i}_{j}.jpg") | |
| try: | |
| dst_idx = dst_indices_list[j] if j < len(dst_indices_list) else 1 | |
| swapped = face_swapper.swap_faces(src_path, 1, dst_path, dst_idx) | |
| cv2.imwrite(output_path, swapped) | |
| zip_file.write(output_path, f"swapped_src{i}_dst{j}.jpg") | |
| except Exception as exc: | |
| logger.warning("Failed to swap src %s with dst %s: %s", i, j, exc) | |
| zip_buffer.seek(0) | |
| return StreamingResponse(zip_buffer, media_type="application/zip", headers={"Content-Disposition": "attachment; filename=swapped_images.zip"}) | |
| async def swap_faces_custom( | |
| source_images: list[UploadFile] = File(..., description="Source face images"), | |
| destination_image: UploadFile = File(..., description="Destination image"), | |
| mapping: str = Form(..., description="Comma-separated mapping (e.g., '2,1,3' means face1->src2, face2->src1, face3->src3)"), | |
| _: None = Depends(verify_api_key), | |
| ) -> StreamingResponse: | |
| """Swap faces with custom mapping: map destination faces to specific source images.""" | |
| dst_bgr = decode_image(destination_image) | |
| mapping_list = [int(x.strip()) for x in mapping.split(",") if x.strip().isdigit()] | |
| with tempfile.TemporaryDirectory(prefix="custom_swap_") as workdir: | |
| dst_path = os.path.join(workdir, "dst.jpg") | |
| temp_path = os.path.join(workdir, "temp.jpg") | |
| output_path = os.path.join(workdir, "output.jpg") | |
| cv2.imwrite(dst_path, dst_bgr) | |
| shutil.copy(dst_path, temp_path) | |
| src_paths = [] | |
| for i, src_file in enumerate(source_images): | |
| src_bgr = decode_image(src_file) | |
| src_path = os.path.join(workdir, f"src_{i+1}.jpg") | |
| cv2.imwrite(src_path, src_bgr) | |
| src_paths.append(src_path) | |
| for face_idx, src_idx in enumerate(mapping_list, start=1): | |
| if src_idx < 1 or src_idx > len(src_paths): | |
| logger.warning("Invalid source index %s for face %s", src_idx, face_idx) | |
| continue | |
| try: | |
| swapped_img = face_swapper.swap_faces(src_paths[src_idx-1], 1, temp_path, face_idx) | |
| cv2.imwrite(temp_path, swapped_img) | |
| except Exception as exc: | |
| logger.warning("Failed to swap face %s with source %s: %s", face_idx, src_idx, exc) | |
| shutil.copy(temp_path, output_path) | |
| ok, buffer = cv2.imencode(".jpg", cv2.imread(output_path)) | |
| if not ok: | |
| raise HTTPException(status_code=500, detail="Failed to encode swapped image") | |
| return StreamingResponse(BytesIO(buffer.tobytes()), media_type="image/jpeg") | |
| async def swap_video( | |
| background_tasks: BackgroundTasks, | |
| source_image: UploadFile = File(..., description="Source face image"), | |
| target_video: UploadFile = File(..., description="Target video (mp4)"), | |
| source_face_idx: int = Form(1, ge=1), | |
| destination_face_idx: int = Form(1, ge=1), | |
| add_audio: bool = Form(True, description="Add audio from original video"), | |
| _: None = Depends(verify_api_key), | |
| ) -> StreamingResponse: | |
| """Swap a single face from source image onto all frames of target video.""" | |
| src_bgr = decode_image(source_image) | |
| workdir = tempfile.mkdtemp(prefix="swap_video_") | |
| with tempfile.NamedTemporaryFile(delete=False, suffix=".mp4", dir=workdir) as tmp_video: | |
| video_bytes = await target_video.read() | |
| tmp_video.write(video_bytes) | |
| video_path = tmp_video.name | |
| try: | |
| logger.info("Starting video processing...") | |
| output_path, _ = swap_video_file(src_bgr, source_face_idx, video_path, destination_face_idx, add_audio, workdir) | |
| logger.info("Video processing completed: %s", output_path) | |
| if not os.path.exists(output_path): | |
| raise HTTPException(status_code=500, detail="Output video file was not created") | |
| file_size = os.path.getsize(output_path) | |
| logger.info("Output video size: %s bytes", file_size) | |
| except Exception as exc: | |
| logger.exception("Video swap failed: %s", exc) | |
| shutil.rmtree(workdir, ignore_errors=True) | |
| raise HTTPException(status_code=500, detail=f"Video swap failed: {str(exc)}") from exc | |
| def cleanup() -> None: | |
| shutil.rmtree(workdir, ignore_errors=True) | |
| logger.info("Cleaned up workdir %s", workdir) | |
| background_tasks.add_task(cleanup) | |
| def iterfile(): | |
| try: | |
| with open(output_path, "rb") as f: | |
| while True: | |
| chunk = f.read(8192) # Read in 8KB chunks | |
| if not chunk: | |
| break | |
| yield chunk | |
| except Exception as e: | |
| logger.error("Error streaming video file: %s", e) | |
| raise | |
| file_size = os.path.getsize(output_path) | |
| logger.info("Streaming video file: %s bytes", file_size) | |
| return StreamingResponse( | |
| iterfile(), | |
| media_type="video/mp4", | |
| headers={ | |
| "Content-Disposition": "attachment; filename=swapped_video.mp4", | |
| "Content-Length": str(file_size), | |
| "Accept-Ranges": "bytes" | |
| } | |
| ) | |
| async def swap_video_all_faces( | |
| background_tasks: BackgroundTasks, | |
| source_image: UploadFile = File(..., description="Source face image"), | |
| target_video: UploadFile = File(..., description="Target video (mp4)"), | |
| num_faces_to_swap: int = Form(1, ge=1, description="Number of faces to swap"), | |
| add_audio: bool = Form(True, description="Add audio from original video"), | |
| _: None = Depends(verify_api_key), | |
| ) -> StreamingResponse: | |
| """Swap all faces in video with the same source face.""" | |
| src_bgr = decode_image(source_image) | |
| workdir = tempfile.mkdtemp(prefix="swap_video_all_") | |
| src_path = os.path.join(workdir, "data_src.jpg") | |
| cv2.imwrite(src_path, src_bgr) | |
| with tempfile.NamedTemporaryFile(delete=False, suffix=".mp4", dir=workdir) as tmp_video: | |
| video_bytes = await target_video.read() | |
| tmp_video.write(video_bytes) | |
| video_path = tmp_video.name | |
| try: | |
| dst_video_path = os.path.join(workdir, "data_dst.mp4") | |
| frames_dir = os.path.join(workdir, "video_frames") | |
| swapped_dir = os.path.join(workdir, "swapped_frames") | |
| temp_dir = os.path.join(swapped_dir, "temp_swap") | |
| output_video_path = os.path.join(workdir, "output_tmp_output_video.mp4") | |
| final_output_path = os.path.join(workdir, "output_with_audio.mp4") | |
| os.makedirs(frames_dir, exist_ok=True) | |
| os.makedirs(swapped_dir, exist_ok=True) | |
| os.makedirs(temp_dir, exist_ok=True) | |
| shutil.copy(video_path, dst_video_path) | |
| frame_paths = extract_frames(dst_video_path, frames_dir) | |
| logger.info("Extracted %s frames", len(frame_paths)) | |
| temp_frame_path = os.path.join(temp_dir, "temp.jpg") | |
| for idx, frame_path in enumerate(frame_paths): | |
| swapped_name = f"swapped_{idx:05d}.jpg" | |
| out_path = os.path.join(swapped_dir, swapped_name) | |
| try: | |
| shutil.copy(frame_path, temp_frame_path) | |
| for face_idx in range(1, num_faces_to_swap + 1): | |
| try: | |
| swapped_img = face_swapper.swap_faces(src_path, 1, temp_frame_path, face_idx) | |
| cv2.imwrite(temp_frame_path, swapped_img) | |
| except Exception as exc: | |
| logger.warning("Failed to swap face %s in frame %s: %s", face_idx, idx, exc) | |
| shutil.copy(temp_frame_path, out_path) | |
| except Exception as exc: | |
| logger.warning("Failed to swap frame %s: %s", idx, exc) | |
| cv2.imwrite(out_path, cv2.imread(frame_path)) | |
| cap = cv2.VideoCapture(dst_video_path) | |
| fps = cap.get(cv2.CAP_PROP_FPS) or 25.0 | |
| cap.release() | |
| frames_to_video(swapped_dir, output_video_path, fps) | |
| if add_audio: | |
| ok, audio_log = add_audio_to_video(dst_video_path, output_video_path, final_output_path) | |
| if ok: | |
| output_path = final_output_path | |
| else: | |
| logger.warning("Audio muxing failed: %s", audio_log) | |
| output_path = output_video_path | |
| else: | |
| output_path = output_video_path | |
| except Exception as exc: | |
| logger.exception("Video swap all faces failed: %s", exc) | |
| shutil.rmtree(workdir, ignore_errors=True) | |
| raise HTTPException(status_code=500, detail="Video swap failed") from exc | |
| def cleanup() -> None: | |
| shutil.rmtree(workdir, ignore_errors=True) | |
| logger.info("Cleaned up workdir %s", workdir) | |
| background_tasks.add_task(cleanup) | |
| def iterfile(): | |
| try: | |
| with open(output_path, "rb") as f: | |
| while True: | |
| chunk = f.read(8192) | |
| if not chunk: | |
| break | |
| yield chunk | |
| except Exception as e: | |
| logger.error("Error streaming video file: %s", e) | |
| raise | |
| file_size = os.path.getsize(output_path) | |
| return StreamingResponse( | |
| iterfile(), | |
| media_type="video/mp4", | |
| headers={ | |
| "Content-Disposition": "attachment; filename=swapped_video.mp4", | |
| "Content-Length": str(file_size), | |
| "Accept-Ranges": "bytes" | |
| } | |
| ) | |
| async def swap_video_custom_mapping( | |
| background_tasks: BackgroundTasks, | |
| source_images: list[UploadFile] = File(..., description="Source face images"), | |
| target_video: UploadFile = File(..., description="Target video (mp4)"), | |
| mapping: str = Form(..., description="Comma-separated mapping (e.g., '2,1,3')"), | |
| add_audio: bool = Form(True, description="Add audio from original video"), | |
| _: None = Depends(verify_api_key), | |
| ) -> StreamingResponse: | |
| """Swap faces in video with custom mapping: map destination faces to specific source images.""" | |
| workdir = tempfile.mkdtemp(prefix="swap_video_custom_") | |
| mapping_list = [int(x.strip()) for x in mapping.split(",") if x.strip().isdigit()] | |
| src_paths = [] | |
| for i, src_file in enumerate(source_images): | |
| src_bgr = decode_image(src_file) | |
| src_path = os.path.join(workdir, f"src_{i+1}.jpg") | |
| cv2.imwrite(src_path, src_bgr) | |
| src_paths.append(src_path) | |
| with tempfile.NamedTemporaryFile(delete=False, suffix=".mp4", dir=workdir) as tmp_video: | |
| video_bytes = await target_video.read() | |
| tmp_video.write(video_bytes) | |
| video_path = tmp_video.name | |
| try: | |
| dst_video_path = os.path.join(workdir, "data_dst.mp4") | |
| temp_dir = os.path.join(workdir, "temp") | |
| frames_dir = os.path.join(workdir, "frames") | |
| swapped_dir = os.path.join(workdir, "swapped_frames") | |
| output_video_path = os.path.join(workdir, "output_tmp_output_video.mp4") | |
| final_output_path = os.path.join(workdir, "output_with_audio.mp4") | |
| os.makedirs(temp_dir, exist_ok=True) | |
| os.makedirs(frames_dir, exist_ok=True) | |
| os.makedirs(swapped_dir, exist_ok=True) | |
| shutil.copy(video_path, dst_video_path) | |
| frame_paths = extract_frames(dst_video_path, frames_dir) | |
| logger.info("Extracted %s frames", len(frame_paths)) | |
| temp_frame_path = os.path.join(temp_dir, "temp.jpg") | |
| for idx, frame_path in enumerate(frame_paths): | |
| swapped_name = f"swapped_{idx:05d}.jpg" | |
| out_path = os.path.join(swapped_dir, swapped_name) | |
| try: | |
| shutil.copy(frame_path, temp_frame_path) | |
| for face_idx, src_idx in enumerate(mapping_list, start=1): | |
| if src_idx < 1 or src_idx > len(src_paths): | |
| logger.warning("Invalid source index %s for face %s in frame %s", src_idx, face_idx, idx) | |
| continue | |
| try: | |
| swapped_img = face_swapper.swap_faces(src_paths[src_idx-1], 1, temp_frame_path, face_idx) | |
| cv2.imwrite(temp_frame_path, swapped_img) | |
| except Exception as exc: | |
| logger.warning("Failed to swap face %s with source %s in frame %s: %s", face_idx, src_idx, idx, exc) | |
| shutil.copy(temp_frame_path, out_path) | |
| except Exception as exc: | |
| logger.warning("Failed to swap frame %s: %s", idx, exc) | |
| cv2.imwrite(out_path, cv2.imread(frame_path)) | |
| cap = cv2.VideoCapture(dst_video_path) | |
| fps = cap.get(cv2.CAP_PROP_FPS) or 25.0 | |
| cap.release() | |
| frames_to_video(swapped_dir, output_video_path, fps) | |
| if add_audio: | |
| ok, audio_log = add_audio_to_video(dst_video_path, output_video_path, final_output_path) | |
| if ok: | |
| output_path = final_output_path | |
| else: | |
| logger.warning("Audio muxing failed: %s", audio_log) | |
| output_path = output_video_path | |
| else: | |
| output_path = output_video_path | |
| except Exception as exc: | |
| logger.exception("Video swap custom mapping failed: %s", exc) | |
| shutil.rmtree(workdir, ignore_errors=True) | |
| raise HTTPException(status_code=500, detail="Video swap failed") from exc | |
| def cleanup() -> None: | |
| shutil.rmtree(workdir, ignore_errors=True) | |
| logger.info("Cleaned up workdir %s", workdir) | |
| background_tasks.add_task(cleanup) | |
| def iterfile(): | |
| try: | |
| with open(output_path, "rb") as f: | |
| while True: | |
| chunk = f.read(8192) | |
| if not chunk: | |
| break | |
| yield chunk | |
| except Exception as e: | |
| logger.error("Error streaming video file: %s", e) | |
| raise | |
| file_size = os.path.getsize(output_path) | |
| return StreamingResponse( | |
| iterfile(), | |
| media_type="video/mp4", | |
| headers={ | |
| "Content-Disposition": "attachment; filename=swapped_video.mp4", | |
| "Content-Length": str(file_size), | |
| "Accept-Ranges": "bytes" | |
| } | |
| ) | |
| async def swap_single_src_multi_video( | |
| background_tasks: BackgroundTasks, | |
| source_image: UploadFile = File(..., description="Source face image"), | |
| target_videos: list[UploadFile] = File(..., description="Target videos (mp4)"), | |
| destination_indices: str = Form(..., description="Comma-separated destination face indices (e.g., '1,2,1')"), | |
| add_audio: bool = Form(True, description="Add audio from original video"), | |
| _: None = Depends(verify_api_key), | |
| ) -> StreamingResponse: | |
| """Swap a single source face onto multiple videos.""" | |
| src_bgr = decode_image(source_image) | |
| dst_indices_list = [int(idx.strip()) for idx in destination_indices.split(",") if idx.strip().isdigit()] | |
| workdir = tempfile.mkdtemp(prefix="single_src_multi_video_") | |
| src_path = os.path.join(workdir, "data_src.jpg") | |
| cv2.imwrite(src_path, src_bgr) | |
| video_paths = [] | |
| for i, video_file in enumerate(target_videos): | |
| video_bytes = await video_file.read() | |
| video_path = os.path.join(workdir, f"video_{i}.mp4") | |
| with open(video_path, "wb") as f: | |
| f.write(video_bytes) | |
| video_paths.append((video_path, dst_indices_list[i] if i < len(dst_indices_list) else 1)) | |
| try: | |
| zip_buffer = BytesIO() | |
| with zipfile.ZipFile(zip_buffer, "w", zipfile.ZIP_DEFLATED) as zip_file: | |
| for i, (video_path, dst_idx) in enumerate(video_paths): | |
| try: | |
| output_path, _ = swap_video_file(src_bgr, 1, video_path, dst_idx, add_audio, workdir) | |
| zip_file.write(output_path, f"swapped_video_{i}.mp4") | |
| except Exception as exc: | |
| logger.warning("Failed to swap video %s: %s", i, exc) | |
| zip_buffer.seek(0) | |
| except Exception as exc: | |
| logger.exception("Single src multi video swap failed: %s", exc) | |
| shutil.rmtree(workdir, ignore_errors=True) | |
| raise HTTPException(status_code=500, detail="Video swap failed") from exc | |
| def cleanup() -> None: | |
| shutil.rmtree(workdir, ignore_errors=True) | |
| logger.info("Cleaned up workdir %s", workdir) | |
| background_tasks.add_task(cleanup) | |
| return StreamingResponse(zip_buffer, media_type="application/zip", headers={"Content-Disposition": "attachment; filename=swapped_videos.zip"}) | |