import os import cv2 import uuid import time import numpy as np import insightface import concurrent.futures import traceback from fastapi import FastAPI, UploadFile, File, HTTPException, Form from fastapi.responses import HTMLResponse, StreamingResponse # HEIC SUPPORT try: import pillow_heif from PIL import Image pillow_heif.register_heif_opener() HEIC_SUPPORTED = True except: HEIC_SUPPORTED = False # ============================================================ # CONFIG # ============================================================ MAX_FILE_MB = 10 MAX_DIM = 640 MAX_WORKERS = 3 CLEANUP_TIME = 300 TASKS = {} executor = concurrent.futures.ThreadPoolExecutor(max_workers=MAX_WORKERS) # ============================================================ # LOAD MODELS # ============================================================ face_app = insightface.app.FaceAnalysis(name="buffalo_l") face_app.prepare(ctx_id=-1, det_size=(640, 640)) swapper = insightface.model_zoo.get_model("inswapper_128.onnx", root=".") # ============================================================ # IMAGE HELPERS # ============================================================ def decode_image(file_bytes): arr = np.frombuffer(file_bytes, np.uint8) img = cv2.imdecode(arr, cv2.IMREAD_COLOR) if img is None and HEIC_SUPPORTED: try: from PIL import Image import io pil_img = Image.open(io.BytesIO(file_bytes)).convert("RGB") img = cv2.cvtColor(np.array(pil_img), cv2.COLOR_RGB2BGR) except: pass if img is None: raise ValueError("Unsupported image format") return img def compress_and_resize(file_bytes): img = decode_image(file_bytes) size_mb = len(file_bytes) / (1024 * 1024) if size_mb > MAX_FILE_MB: img = cv2.resize(img, None, fx=0.6, fy=0.6, interpolation=cv2.INTER_AREA) h, w = img.shape[:2] if max(h, w) > MAX_DIM: scale = MAX_DIM / max(h, w) img = cv2.resize(img, (int(w * scale), int(h * scale))) return img def enhance(img): blur = cv2.GaussianBlur(img, (0, 0), 1.2) return cv2.addWeighted(img, 1.2, blur, -0.2, 0) def cleanup(): now = time.time() remove = [] for k, v in TASKS.items(): if "time" in v and now - v["time"] > CLEANUP_TIME: try: if "result" in v: os.remove(v["result"]) except: pass remove.append(k) for k in remove: TASKS.pop(k, None) # ============================================================ # WORKER # ============================================================ def run_task(tid, src_bytes, tgt_bytes, filename, face_index): TASKS[tid]["status"] = "processing" try: src = compress_and_resize(src_bytes) tgt = compress_and_resize(tgt_bytes) s_faces = face_app.get(src) t_faces = face_app.get(tgt) if not s_faces or not t_faces: raise ValueError("Face not detected") face_index = min(face_index, len(t_faces) - 1) result = swapper.get(tgt, t_faces[face_index], s_faces[0], paste_back=True) result = enhance(result) name = os.path.splitext(filename)[0] out_path = f"/tmp/{name}_{tid}.png" cv2.imwrite(out_path, result, [cv2.IMWRITE_PNG_COMPRESSION, 3]) TASKS[tid] = { "status": "done", "result": out_path, "filename": f"{name}.png", "time": time.time() } except Exception as e: TASKS[tid] = {"status": "failed", "error": str(e)} print(traceback.format_exc()) # ============================================================ # FASTAPI # ============================================================ app = FastAPI() # ============================================================ # UI PAGE # ============================================================ @app.get("/", response_class=HTMLResponse) def home(): return """ Face Swap API Test

⚡ Face Swap Test UI

Upload Source
Upload Target





""" # ============================================================ # API # ============================================================ @app.post("/swap") async def swap( source: UploadFile = File(...), target: UploadFile = File(...), face_index: int = Form(0) ): tid = str(uuid.uuid4()) TASKS[tid] = {"status": "queued", "time": time.time()} executor.submit( run_task, tid, await source.read(), await target.read(), source.filename, face_index ) return {"task_id": tid} @app.get("/status/{tid}") def status(tid: str): cleanup() if tid not in TASKS: raise HTTPException(404) return TASKS[tid] @app.get("/result/{tid}") def result(tid: str): task = TASKS.get(tid) if not task or task["status"] != "done": raise HTTPException(404) return StreamingResponse( open(task["result"], "rb"), media_type="image/png", headers={ "Content-Disposition": f'attachment; filename="{task["filename"]}"' } )