Spaces:
Sleeping
Sleeping
| #####################FASTAPI___________________############## | |
| import os | |
| os.environ["OMP_NUM_THREADS"] = "1" | |
| import shutil | |
| import uuid | |
| import cv2 | |
| import numpy as np | |
| import threading | |
| import asyncio | |
| import subprocess | |
| import logging | |
| import tempfile | |
| import sys | |
| import time | |
| from datetime import datetime,timedelta | |
| import tempfile | |
| import insightface | |
| from insightface.app import FaceAnalysis | |
| from huggingface_hub import hf_hub_download | |
| from fastapi import FastAPI, UploadFile, File, HTTPException, Response, Depends, Security, Form | |
| from fastapi.responses import RedirectResponse | |
| from fastapi.security import HTTPBearer, HTTPAuthorizationCredentials | |
| from motor.motor_asyncio import AsyncIOMotorClient | |
| from bson import ObjectId | |
| from bson.errors import InvalidId | |
| import httpx | |
| import uvicorn | |
| from PIL import Image | |
| import io | |
| import requests | |
| # DigitalOcean Spaces | |
| import boto3 | |
| from botocore.client import Config | |
| from typing import Optional | |
# --------------------- Logging ---------------------
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

# --------------------- Secrets & Paths ---------------------
# Hugging Face repository that hosts the model files, and the local
# directory they are downloaded into (created on import if missing).
REPO_ID = "HariLogicgo/face_swap_models"
MODELS_DIR = "./models"
os.makedirs(MODELS_DIR, exist_ok=True)

# Tokens come from the environment; each one is None when the variable is unset.
HF_TOKEN = os.environ.get("HF_TOKEN")
API_SECRET_TOKEN = os.environ.get("API_SECRET_TOKEN")

# DigitalOcean Spaces settings. The region defaults to "blr1" and the
# endpoint URL is derived from it.
DO_SPACES_REGION = os.environ.get("DO_SPACES_REGION", "blr1")
DO_SPACES_ENDPOINT = "https://" + DO_SPACES_REGION + ".digitaloceanspaces.com"
DO_SPACES_KEY = os.environ.get("DO_SPACES_KEY")
DO_SPACES_SECRET = os.environ.get("DO_SPACES_SECRET")
DO_SPACES_BUCKET = os.environ.get("DO_SPACES_BUCKET")
def _connect_optional_db(url, db_name, label):
    """
    Build the (client, database, subcategories collection) triple for one
    optional MongoDB deployment.

    Returns (None, None, None) when `url` is empty or when creating the
    client raises; in the latter case a warning naming `label` is logged.
    """
    if not url:
        return None, None, None
    try:
        mongo_client = AsyncIOMotorClient(url)
        mongo_db = mongo_client[db_name]
        return mongo_client, mongo_db, mongo_db.subcategories
    except Exception as e:
        logger.warning(f"MongoDB {label} connection failed (optional): {e}")
        return None, None, None


# NEW admin DB (stays None when ADMIN_MONGO_URL is missing)
ADMIN_MONGO_URL = os.getenv("ADMIN_MONGO_URL")
admin_client, admin_db, subcategories_col = _connect_optional_db(
    ADMIN_MONGO_URL, "adminPanel", "admin"
)

# Collage Maker DB (optional)
COLLAGE_MAKER_DB_URL = os.getenv("COLLAGE_MAKER_DB_URL")
collage_maker_client, collage_maker_db, collage_subcategories_col = _connect_optional_db(
    COLLAGE_MAKER_DB_URL, "adminPanel", "collage-maker"
)

# AI Enhancer DB (optional) -- note: this one uses the database named "test"
AI_ENHANCER_DB_URL = os.getenv("AI_ENHANCER_DB_URL")
ai_enhancer_client, ai_enhancer_db, ai_enhancer_subcategories_col = _connect_optional_db(
    AI_ENHANCER_DB_URL, "test", "ai-enhancer"
)

# OLD logs DB -- only the URL is read here; `client` and `database` are
# populated later by startup_db().
MONGODB_URL = os.getenv("MONGODB_URL_LOGS")
client = None
database = None
# --------------------- Download Models ---------------------
def download_models():
    """
    Download the inswapper model and the buffalo_l model files from the
    Hugging Face repo REPO_ID into MODELS_DIR.

    Returns:
        The local path of "models/inswapper_128.onnx" as reported by
        hf_hub_download.

    Raises:
        Any exception raised during the download is logged and re-raised.
    """
    # Arguments shared by every download call.
    hub_options = {
        "repo_id": REPO_ID,
        "repo_type": "model",
        "local_dir": MODELS_DIR,
        "token": HF_TOKEN,
    }
    buffalo_model_names = (
        "1k3d68.onnx",
        "2d106det.onnx",
        "genderage.onnx",
        "det_10g.onnx",
        "w600k_r50.onnx",
    )
    try:
        logger.info("Downloading models...")
        inswapper_path = hf_hub_download(
            filename="models/inswapper_128.onnx", **hub_options
        )
        for model_name in buffalo_model_names:
            hf_hub_download(filename="models/buffalo_l/" + model_name, **hub_options)
        logger.info("Models downloaded successfully.")
        return inswapper_path
    except Exception as e:
        logger.error(f"Model download failed: {e}")
        raise
# --------------------- Face Analysis + Swapper ---------------------
# Download the models and load the face detector/analyser plus the swapper.
# Any failure is logged and leaves all three names set to None so that the
# module can still be imported.
try:
    inswapper_path = download_models()
    providers = ['CUDAExecutionProvider', 'CPUExecutionProvider']
    face_analysis_app = FaceAnalysis(
        name="buffalo_l",
        root=MODELS_DIR,
        providers=providers,
    )
    face_analysis_app.prepare(ctx_id=0, det_size=(640, 640))
    swapper = insightface.model_zoo.get_model(inswapper_path, providers=providers)
    logger.info("Face analysis models loaded successfully")
except Exception as e:
    logger.error(f"Failed to initialize face analysis models: {e}")
    # Set defaults to prevent crash
    inswapper_path = face_analysis_app = swapper = None
# --------------------- CodeFormer ---------------------
CODEFORMER_PATH = "CodeFormer/inference_codeformer.py"


def ensure_codeformer():
    """
    Ensure CodeFormer's local basicsr + facelib are importable and
    pretrained weights are downloaded. No setup.py needed — we use
    sys.path / PYTHONPATH instead.

    Steps:
      1. Clone the CodeFormer repository and install its requirements when
         the "CodeFormer" directory does not exist.
      2. Put the CodeFormer root at the front of sys.path.
      3. Run CodeFormer's download script for the "facelib" and
         "CodeFormer" weights (each with a 300 s timeout).

    Every failure is logged and swallowed; the function never raises.
    """
    # Run child Python processes with the interpreter that is running this
    # module, so packages are installed into / imported from the same
    # environment (same choice as the subprocess fallback in
    # enhance_image_with_codeformer).
    python_cmd = sys.executable if sys.executable else "python3"
    try:
        if not os.path.exists("CodeFormer"):
            logger.info("CodeFormer not found, cloning repository...")
            subprocess.run(
                ["git", "clone", "https://github.com/sczhou/CodeFormer.git"],
                check=True,
            )
            # Best effort: a failing requirements install must not abort setup.
            pip_result = subprocess.run(
                [python_cmd, "-m", "pip", "install", "-r", "CodeFormer/requirements.txt"],
                check=False,
            )
            if pip_result.returncode != 0:
                logger.warning(
                    f"Installing CodeFormer requirements exited with code {pip_result.returncode} (continuing)"
                )

        # Add CodeFormer root to sys.path so `import basicsr` and
        # `import facelib` resolve to the local (compatible) versions
        # instead of the broken PyPI basicsr==1.4.2.
        codeformer_root = os.path.join(os.getcwd(), "CodeFormer")
        if codeformer_root not in sys.path:
            sys.path.insert(0, codeformer_root)
            logger.info(f"Added {codeformer_root} to sys.path for local basicsr/facelib")

        # NOTE: We do NOT need the PyPI 'realesrgan' package.
        # Both in-process and subprocess paths use CodeFormer's local
        # basicsr.utils.realesrgan_utils.RealESRGANer instead.
        # Installing PyPI realesrgan at runtime would re-install the
        # broken basicsr==1.4.2 and break everything.

        # Download pretrained weights if not already present
        if os.path.exists("CodeFormer"):
            for weights_name in ("facelib", "CodeFormer"):
                try:
                    # check=True so that a non-zero exit status actually
                    # reaches the warning below instead of being ignored.
                    subprocess.run(
                        [python_cmd, "CodeFormer/scripts/download_pretrained_models.py", weights_name],
                        check=True,
                        timeout=300,
                    )
                except (subprocess.TimeoutExpired, subprocess.CalledProcessError):
                    logger.warning(f"Failed to download {weights_name} models (optional)")
    except Exception as e:
        logger.error(f"CodeFormer setup failed: {e}")
        logger.warning("Continuing without CodeFormer features...")


ensure_codeformer()
# --------------------- In-Process CodeFormer (No Subprocess!) ---------------------
# Load CodeFormer models ONCE at startup instead of spawning a new Python process per request.
# This eliminates 15-40s of model loading overhead per request.
codeformer_net = None
codeformer_upsampler = None
codeformer_face_helper = None
codeformer_device = None


def init_codeformer_in_process():
    """
    Load the CodeFormer network and a reusable FaceRestoreHelper into the
    module-level `codeformer_*` globals.

    Returns:
        True when everything loaded; False when any step raised (the error
        is logged, and globals assigned before the failure keep their
        values).
    """
    global codeformer_net, codeformer_upsampler, codeformer_face_helper, codeformer_device
    try:
        import torch
        # Not used in this function; importing it here means a missing
        # torchvision makes this initialisation fail and return False.
        from torchvision.transforms.functional import normalize as torch_normalize

        # Make the local CodeFormer checkout importable.
        repo_dir = os.path.join(os.getcwd(), "CodeFormer")
        if repo_dir not in sys.path:
            sys.path.insert(0, repo_dir)
        from basicsr.utils.registry import ARCH_REGISTRY
        from basicsr.utils.download_util import load_file_from_url
        from facelib.utils.face_restoration_helper import FaceRestoreHelper

        device_name = 'cuda' if torch.cuda.is_available() else 'cpu'
        codeformer_device = torch.device(device_name)
        logger.info(f"Initializing CodeFormer on device: {codeformer_device}")

        # 1) Build the CodeFormer network and load its released weights.
        codeformer_cls = ARCH_REGISTRY.get('CodeFormer')
        model = codeformer_cls(
            dim_embd=512,
            codebook_size=1024,
            n_head=8,
            n_layers=9,
            connect_list=['32', '64', '128', '256'],
        ).to(codeformer_device)
        weights_path = load_file_from_url(
            url='https://github.com/sczhou/CodeFormer/releases/download/v0.1.0/codeformer.pth',
            model_dir='weights/CodeFormer',
            progress=True,
            file_name=None,
        )
        loaded = torch.load(weights_path, map_location=codeformer_device)
        model.load_state_dict(loaded['params_ema'])
        model.eval()
        codeformer_net = model

        # 2) RealESRGAN upsampler — SKIPPED for face swap
        # Background/face upsampling is the #1 bottleneck (~20s per image).
        # For face swap we only need CodeFormer face restoration, not super-resolution.
        # The upsampler is kept as None; we no longer download the 64MB model at startup.
        codeformer_upsampler = None

        # 3) Create FaceRestoreHelper (reused per request)
        # NOTE: local CodeFormer uses "upscale_factor" (not "upscale")
        # upscale_factor=1 → keep original resolution (no 2x upscale needed for face swap)
        helper_options = dict(
            upscale_factor=1,
            face_size=512,
            crop_ratio=(1, 1),
            det_model='retinaface_resnet50',
            save_ext='png',
            use_parse=True,
            device=codeformer_device,
        )
        codeformer_face_helper = FaceRestoreHelper(**helper_options)

        logger.info("✅ CodeFormer models loaded in-process successfully!")
        return True
    except Exception as e:
        logger.error(f"Failed to load CodeFormer in-process: {e}")
        logger.warning("CodeFormer enhancement will be unavailable.")
        return False


# Try to load CodeFormer models in-process
_codeformer_available = init_codeformer_in_process()
# --------------------- FastAPI ---------------------
fastapi_app = FastAPI()


async def startup_db():
    """
    Create the MongoDB client used for API-hit logging.

    Reads the module-level MONGODB_URL (taken from the MONGODB_URL_LOGS
    environment variable) and, when it is set, stores a client in the global
    `client` and its `logs` database in the global `database`. When the URL
    is missing or client creation raises, a warning is logged and both
    globals are left/reset to None; the function never raises.
    """
    global client, database
    if not MONGODB_URL:
        # Name the environment variable that is actually read, so the
        # message points operators at the right setting.
        logger.warning("MONGODB_URL_LOGS not set, skipping MongoDB initialization")
        return
    try:
        logger.info("Initializing MongoDB for API logs...")
        client = AsyncIOMotorClient(MONGODB_URL)
        database = client.logs
        logger.info("MongoDB initialized for API logs")
    except Exception as e:
        logger.warning(f"MongoDB connection failed (optional): {e}")
        client = None
        database = None
async def shutdown_db():
    """
    Close every MongoDB client this module may have opened.

    Covers the logs client, the admin client, the collage-maker client and
    the AI-enhancer client; clients that are None are skipped. A log line is
    written for each client that is closed.
    """
    open_clients = (
        (client, "MongoDB connection closed"),
        (admin_client, "Admin MongoDB connection closed"),
        (collage_maker_client, "Collage Maker MongoDB connection closed"),
        # The AI-enhancer client is created at import time like the others
        # and must be released here as well.
        (ai_enhancer_client, "AI Enhancer MongoDB connection closed"),
    )
    for mongo_client, message in open_clients:
        if mongo_client is not None:
            mongo_client.close()
            logger.info(message)
# --------------------- Auth ---------------------
security = HTTPBearer()


def verify_token(credentials: HTTPAuthorizationCredentials = Security(security)):
    """
    Validate the bearer token of a request against API_SECRET_TOKEN.

    Args:
        credentials: Bearer credentials extracted by the `security` scheme.

    Returns:
        The supplied token string when it matches the configured secret.

    Raises:
        HTTPException: 401 when the token does not match, or when no
            (non-empty) API_SECRET_TOKEN is configured.
    """
    import hmac

    supplied = credentials.credentials
    expected = API_SECRET_TOKEN
    # Fail closed: an unset or empty secret must never authenticate anyone.
    # compare_digest keeps the comparison time independent of how many
    # leading characters match; bytes are used so non-ASCII tokens are
    # rejected with 401 instead of raising TypeError.
    if not expected or not hmac.compare_digest(
        supplied.encode("utf-8"), expected.encode("utf-8")
    ):
        raise HTTPException(status_code=401, detail="Invalid or missing token")
    return supplied
# --------------------- DB Selector ---------------------
def get_app_db_collections(appname: Optional[str] = None):
    """
    Pick the subcategories collection for the given app name.

    "collage-maker" and "ai-enhancer" (compared after strip + lower-casing)
    map to their own collections when those are configured; every other
    case, including an unconfigured app DB, returns the admin
    `subcategories_col`.
    """
    if not appname:
        return subcategories_col

    # Built per call so the current values of the module globals are used.
    per_app = {
        "collage-maker": (
            collage_subcategories_col,
            "Collage-maker DB not configured, falling back to admin",
        ),
        "ai-enhancer": (
            ai_enhancer_subcategories_col,
            "AI-Enhancer DB not configured, falling back to admin",
        ),
    }
    entry = per_app.get(appname.strip().lower())
    if entry is not None:
        collection, fallback_message = entry
        if collection is not None:
            return collection
        logger.warning(fallback_message)

    # default fallback
    return subcategories_col
# --------------------- Logging API Hits ---------------------
async def log_faceswap_hit(token: str, status: str = "success"):
    """
    Insert one record for a /faceswap call into the `faceswap` collection of
    the logs database. Does nothing when the logs database is not set up.

    Args:
        token: The token value stored with the record.
        status: Outcome label stored with the record.
    """
    if database is None:
        return
    hit_record = {
        "token": token,
        "endpoint": "/faceswap",
        "status": status,
        "timestamp": datetime.utcnow(),
    }
    await database.faceswap.insert_one(hit_record)
# --------------------- Face Swap Pipeline ---------------------
swap_lock = threading.Lock()


def enhance_image_with_codeformer(rgb_img, temp_dir=None, w=0.7):
    """
    Enhance face image using CodeFormer.
    Uses in-process models (fast) if available, falls back to subprocess (slow).

    Args:
        rgb_img: Image array in RGB channel order (it is converted with
            cv2.COLOR_RGB2BGR before processing).
        temp_dir: Working directory for the subprocess fallback. When None,
            a private directory is created under the system temp dir and
            removed again before returning.
        w: Value passed as `w` to the CodeFormer network / `-w` to the
            inference script.

    Returns:
        The enhanced image in RGB channel order.

    Raises:
        RuntimeError: Subprocess fallback only — non-zero exit status, no
            PNG produced, or the produced file could not be read.
        subprocess.TimeoutExpired: Subprocess fallback exceeded 120 s.
    """
    t0 = time.time()

    # ── FAST PATH: In-process CodeFormer (no subprocess!) ──
    if codeformer_net is not None and codeformer_face_helper is not None:
        import torch
        from torchvision.transforms.functional import normalize as torch_normalize
        from basicsr.utils import img2tensor, tensor2img
        from facelib.utils.misc import is_gray

        bgr_img = cv2.cvtColor(rgb_img, cv2.COLOR_RGB2BGR)

        # Reset face helper state
        codeformer_face_helper.clean_all()
        codeformer_face_helper.read_image(bgr_img)
        num_faces = codeformer_face_helper.get_face_landmarks_5(
            only_center_face=False, resize=640, eye_dist_threshold=5
        )
        logger.info(f"[CodeFormer] Detected {num_faces} faces in {time.time()-t0:.2f}s")
        codeformer_face_helper.align_warp_face()

        # Enhance each cropped face with CodeFormer neural net
        t_faces = time.time()
        for idx, cropped_face in enumerate(codeformer_face_helper.cropped_faces):
            cropped_face_t = img2tensor(cropped_face / 255., bgr2rgb=True, float32=True)
            torch_normalize(cropped_face_t, (0.5, 0.5, 0.5), (0.5, 0.5, 0.5), inplace=True)
            cropped_face_t = cropped_face_t.unsqueeze(0).to(codeformer_device)
            try:
                with torch.no_grad():
                    output = codeformer_net(cropped_face_t, w=w, adain=True)[0]
                    restored_face = tensor2img(output, rgb2bgr=True, min_max=(-1, 1))
                del output
                torch.cuda.empty_cache()
            except Exception as e:
                # Keep going with the unrestored crop for this face.
                logger.warning(f"[CodeFormer] Face {idx} inference failed: {e}")
                restored_face = tensor2img(cropped_face_t, rgb2bgr=True, min_max=(-1, 1))
            restored_face = restored_face.astype('uint8')
            codeformer_face_helper.add_restored_face(restored_face, cropped_face)
        logger.info(f"[CodeFormer] Face restoration ({num_faces} faces): {time.time()-t_faces:.2f}s")

        # Paste restored faces back onto original image
        # NOTE: We skip RealESRGAN background/face upsampling — it's the #1 bottleneck
        # (~20s) and unnecessary for face swap. We only need CodeFormer face restoration.
        t_paste = time.time()
        codeformer_face_helper.get_inverse_affine(None)
        restored_img = codeformer_face_helper.paste_faces_to_input_image(
            upsample_img=None, draw_box=False
        )
        logger.info(f"[CodeFormer] Paste back: {time.time()-t_paste:.2f}s")
        logger.info(f"[CodeFormer] In-process enhancement done in {time.time()-t0:.2f}s")
        return cv2.cvtColor(restored_img, cv2.COLOR_BGR2RGB)

    # ── SLOW FALLBACK: Subprocess CodeFormer (with timeout!) ──
    logger.warning("[CodeFormer] In-process models unavailable, falling back to subprocess")
    # Only a directory created here is removed afterwards; a caller-supplied
    # directory is left untouched.
    owns_temp_dir = temp_dir is None
    if owns_temp_dir:
        temp_dir = os.path.join(tempfile.gettempdir(), f"enhance_{uuid.uuid4().hex[:8]}")
    os.makedirs(temp_dir, exist_ok=True)
    try:
        input_path = os.path.join(temp_dir, "input.jpg")
        cv2.imwrite(input_path, cv2.cvtColor(rgb_img, cv2.COLOR_RGB2BGR))

        python_cmd = sys.executable if sys.executable else "python3"
        # Argument list (no shell) so paths containing spaces or shell
        # metacharacters are passed through unchanged.
        cmd = [
            python_cmd, CODEFORMER_PATH,
            "-w", str(w),
            "--input_path", input_path,
            "--output_path", temp_dir,
            "--bg_upsampler", "None",
            "--upscale", "1",
        ]
        result = subprocess.run(cmd, capture_output=True, text=True, timeout=120)
        if result.returncode != 0:
            raise RuntimeError(result.stderr)

        final_dir = os.path.join(temp_dir, "final_results")
        # Sorted so the chosen file does not depend on directory order.
        files = sorted(f for f in os.listdir(final_dir) if f.endswith(".png"))
        if not files:
            raise RuntimeError("No enhanced output")
        final_path = os.path.join(final_dir, files[0])
        enhanced = cv2.imread(final_path)
        if enhanced is None:
            raise RuntimeError(f"Enhanced output could not be read: {final_path}")
        logger.info(f"[CodeFormer] Subprocess enhancement done in {time.time()-t0:.2f}s")
        return cv2.cvtColor(enhanced, cv2.COLOR_BGR2RGB)
    finally:
        if owns_temp_dir:
            shutil.rmtree(temp_dir, ignore_errors=True)
def multi_face_swap(src_img, tgt_img):
    """
    Swap several faces from a source image onto a target image.

    Faces are detected in both images, split by the detector's `gender`
    attribute (1 / 0) and ordered by descending bounding-box area, then by
    horizontal centre. Source and target faces are paired per gender in that
    order; when no same-gender pair exists, faces are paired regardless of
    gender. Each source face is swapped onto its paired target face.

    Args:
        src_img: Source image array in RGB channel order.
        tgt_img: Target image array in RGB channel order.

    Returns:
        The target image with swapped faces, in RGB channel order.

    Raises:
        ValueError: Models are not initialised, or no face was detected in
            one of the images.
    """
    # Check the models before first use, so an uninitialised model surfaces
    # as the intended ValueError.
    if face_analysis_app is None:
        raise ValueError("Face analysis models not initialized.")
    if swapper is None:
        raise ValueError("Face swap models not initialized.")

    pipeline_start = time.time()
    src_bgr = cv2.cvtColor(src_img, cv2.COLOR_RGB2BGR)
    tgt_bgr = cv2.cvtColor(tgt_img, cv2.COLOR_RGB2BGR)

    t0 = time.time()
    src_faces = face_analysis_app.get(src_bgr)
    tgt_faces = face_analysis_app.get(tgt_bgr)
    logger.info(f"[Pipeline] Multi-face detection: {time.time()-t0:.2f}s")
    if not src_faces or not tgt_faces:
        raise ValueError("No faces detected")

    def face_sort_key(face):
        # Largest face first; ties broken left-to-right.
        x1, y1, x2, y2 = face.bbox
        area = (x2 - x1) * (y2 - y1)
        cx = (x1 + x2) / 2
        return (-area, cx)

    def faces_of_gender(faces, gender):
        return sorted((f for f in faces if f.gender == gender), key=face_sort_key)

    pairs = list(zip(faces_of_gender(src_faces, 1), faces_of_gender(tgt_faces, 1)))
    pairs += zip(faces_of_gender(src_faces, 0), faces_of_gender(tgt_faces, 0))
    if not pairs:
        pairs = list(zip(
            sorted(src_faces, key=face_sort_key),
            sorted(tgt_faces, key=face_sort_key),
        ))

    t0 = time.time()
    result_img = tgt_bgr.copy()
    for src_face, tgt_face in pairs:
        # Swap onto the paired target face. Always picking the largest
        # same-gender face would send every same-gender source to one
        # target, each swap overwriting the previous one.
        result_img = swapper.get(result_img, tgt_face, src_face, paste_back=True)
    logger.info(f"[Pipeline] Multi-face swap ({len(pairs)} pairs): {time.time()-t0:.2f}s")
    logger.info(f"[Pipeline] TOTAL multi_face_swap: {time.time()-pipeline_start:.2f}s")
    return cv2.cvtColor(result_img, cv2.COLOR_BGR2RGB)
def face_swap_and_enhance(src_img, tgt_img, temp_dir=None):
    """
    Swap the first detected source face onto the first detected target face,
    enhance the result with CodeFormer and write it to disk as a PNG.

    The whole pipeline runs while holding `swap_lock`.

    Args:
        src_img: Source image array in RGB channel order.
        tgt_img: Target image array in RGB channel order.
        temp_dir: Directory for the result file. When None, a fresh
            directory under the system temp dir is used. An existing
            directory at this path is deleted and recreated before the
            result is written.

    Returns:
        (final_img, final_path, error): on success the RGB result image, the
        path of the written PNG and an empty string; on failure
        (None, None, message). The function does not raise.
    """
    try:
        with swap_lock:
            pipeline_start = time.time()

            if face_analysis_app is None:
                return None, None, "❌ Face analysis models not initialized."
            if swapper is None:
                return None, None, "❌ Face swap models not initialized."

            src_bgr = cv2.cvtColor(src_img, cv2.COLOR_RGB2BGR)
            tgt_bgr = cv2.cvtColor(tgt_img, cv2.COLOR_RGB2BGR)

            t0 = time.time()
            src_faces = face_analysis_app.get(src_bgr)
            tgt_faces = face_analysis_app.get(tgt_bgr)
            logger.info(f"[Pipeline] Face detection: {time.time()-t0:.2f}s")
            if not src_faces or not tgt_faces:
                return None, None, "❌ Face not detected in one of the images"

            t0 = time.time()
            swapped_bgr = swapper.get(tgt_bgr, tgt_faces[0], src_faces[0])
            logger.info(f"[Pipeline] Face swap: {time.time()-t0:.2f}s")
            if swapped_bgr is None:
                return None, None, "❌ Face swap failed"

            # Use in-process CodeFormer enhancement (fast path); on failure
            # fall back to the unenhanced swap instead of failing the request.
            t0 = time.time()
            swapped_rgb = cv2.cvtColor(swapped_bgr, cv2.COLOR_BGR2RGB)
            try:
                enhanced_rgb = enhance_image_with_codeformer(swapped_rgb)
                enhanced_bgr = cv2.cvtColor(enhanced_rgb, cv2.COLOR_RGB2BGR)
            except Exception as e:
                logger.error(f"[Pipeline] CodeFormer failed, using raw swap: {e}")
                enhanced_bgr = swapped_bgr
            logger.info(f"[Pipeline] Enhancement: {time.time()-t0:.2f}s")

            # Prepare the output directory only once there is a result to
            # write, so the early-return paths above leave nothing on disk.
            if temp_dir is None:
                temp_dir = os.path.join(
                    tempfile.gettempdir(), f"faceswap_work_{uuid.uuid4().hex[:8]}"
                )
            if os.path.exists(temp_dir):
                shutil.rmtree(temp_dir)
            os.makedirs(temp_dir, exist_ok=True)

            final_path = os.path.join(temp_dir, f"result_{uuid.uuid4().hex[:8]}.png")
            # Callers open final_path right away, so a failed write must be
            # reported here rather than as a missing file later.
            if not cv2.imwrite(final_path, enhanced_bgr):
                return None, None, f"❌ Error: could not write result image to {final_path}"

            final_img = cv2.cvtColor(enhanced_bgr, cv2.COLOR_BGR2RGB)
            logger.info(f"[Pipeline] TOTAL face_swap_and_enhance: {time.time()-pipeline_start:.2f}s")
            return final_img, final_path, ""
    except Exception as e:
        # Keep the traceback in the logs; the caller only gets the message.
        logger.exception("[Pipeline] face_swap_and_enhance failed")
        return None, None, f"❌ Error: {str(e)}"
def compress_image(
    image_bytes: bytes,
    max_size=(1280, 1280),  # max width/height
    quality=75,  # JPEG quality (60–80 is ideal)
) -> bytes:
    """
    Re-encode an image as a smaller JPEG.

    The image is converted to RGB, shrunk in place with `thumbnail` so it
    fits within `max_size`, and saved as an optimised progressive JPEG at
    the given `quality`.

    Returns:
        The JPEG-encoded bytes.
    """
    picture = Image.open(io.BytesIO(image_bytes)).convert("RGB")

    # Resize while maintaining aspect ratio
    picture.thumbnail(max_size, Image.LANCZOS)

    jpeg_options = {
        "format": "JPEG",
        "quality": quality,
        "optimize": True,
        "progressive": True,
    }
    buffer = io.BytesIO()
    picture.save(buffer, **jpeg_options)
    return buffer.getvalue()
# --------------------- DigitalOcean Spaces Helper ---------------------
def get_spaces_client():
    """
    Create an S3 client for DigitalOcean Spaces from the DO_SPACES_*
    settings, using signature version "s3v4". A new session and client are
    created on every call.
    """
    connection_options = dict(
        region_name=DO_SPACES_REGION,
        endpoint_url=DO_SPACES_ENDPOINT,
        aws_access_key_id=DO_SPACES_KEY,
        aws_secret_access_key=DO_SPACES_SECRET,
        config=Config(signature_version='s3v4'),
    )
    return boto3.session.Session().client('s3', **connection_options)
def upload_to_spaces(file_bytes, key, content_type="image/png"):
    """
    Upload `file_bytes` to the configured Spaces bucket under `key` with a
    'public-read' ACL and the given content type.

    Returns:
        The object's URL built as "<endpoint>/<bucket>/<key>".
    """
    spaces = get_spaces_client()
    spaces.put_object(
        Bucket=DO_SPACES_BUCKET,
        Key=key,
        Body=file_bytes,
        ContentType=content_type,
        ACL='public-read',
    )
    return "/".join((DO_SPACES_ENDPOINT, DO_SPACES_BUCKET, key))
def download_from_spaces(key):
    """Fetch the object stored under `key` in the configured Spaces bucket and return its bytes."""
    response = get_spaces_client().get_object(Bucket=DO_SPACES_BUCKET, Key=key)
    body = response['Body']
    return body.read()
def mandatory_enhancement(rgb_img):
    """
    Run CodeFormer enhancement on the final image.

    Fail-safe: when enhancement raises, the error is logged and the input
    image is returned unchanged.
    """
    try:
        enhanced = enhance_image_with_codeformer(rgb_img)
    except Exception as e:
        logger.error(f"CodeFormer failed, returning original: {e}")
        return rgb_img
    return enhanced
# --------------------- API Endpoints ---------------------
async def root():
    """Root endpoint: return a static payload with the API name, version and product details."""
    product_details = {
        "version": "1.0.0",
        "Product Name": "Beauty Camera - GlowCam AI Studio",
        "Released By": "LogicGo Infotech",
    }
    payload = {"success": True, "message": "FaceSwap API"}
    payload["data"] = product_details
    return payload
async def health():
    """Health-check endpoint: always reports a "healthy" status."""
    return dict(status="healthy")
async def test_admin_db():
    """
    Diagnostic endpoint: list the collection names of the admin database.

    Returns:
        {"ok": True, "collections": [...]} on success, otherwise
        {"ok": False, "error": <message>, "url": <redacted URL>}. The "url"
        value contains only the scheme and host part of ADMIN_MONGO_URL —
        never the user name, password, path or query string.
    """
    from urllib.parse import urlsplit

    def _redacted(url):
        # A MongoDB connection string can embed credentials
        # (scheme://user:password@host/...); expose scheme and host only.
        if not url:
            return url
        parts = urlsplit(url)
        host = parts.netloc.rpartition("@")[2]
        if not host:
            return "<redacted>"
        return f"{parts.scheme}://{host}"

    if admin_db is None:
        return {
            "ok": False,
            "error": "Admin MongoDB is not configured",
            "url": _redacted(ADMIN_MONGO_URL),
        }
    try:
        doc = await admin_db.list_collection_names()
        return {"ok": True, "collections": doc}
    except Exception as e:
        return {"ok": False, "error": str(e), "url": _redacted(ADMIN_MONGO_URL)}
| async def face_swap_api( | |
| source: UploadFile = File(...), | |
| image2: Optional[UploadFile] = File(None), | |
| target_category_id: str = Form(None), | |
| new_category_id: str = Form(None), | |
| user_id: Optional[str] = Form(None), | |
| appname: Optional[str] = Form(None), | |
| credentials: HTTPAuthorizationCredentials = Security(security) | |
| ): | |
| start_time = datetime.utcnow() | |
| try: | |
| # ------------------------------------------------------------------ | |
| # VALIDATION | |
| # ------------------------------------------------------------------ | |
| # -------------------------------------------------------------- | |
| # BACKWARD COMPATIBILITY FOR OLD ANDROID VERSIONS | |
| # -------------------------------------------------------------- | |
| if target_category_id == "": | |
| target_category_id = None | |
| if new_category_id == "": | |
| new_category_id = None | |
| if user_id == "": | |
| user_id = None | |
| subcategories_collection = get_app_db_collections(appname) | |
| logger.info(f"[FaceSwap] Incoming request → target_category_id={target_category_id}, new_category_id={new_category_id}, user_id={user_id}") | |
| if target_category_id and new_category_id: | |
| raise HTTPException(400, "Provide only one of new_category_id or target_category_id.") | |
| if not target_category_id and not new_category_id: | |
| raise HTTPException(400, "Either new_category_id or target_category_id is required.") | |
| # ------------------------------------------------------------------ | |
| # READ SOURCE IMAGE | |
| # ------------------------------------------------------------------ | |
| src_bytes = await source.read() | |
| src_key = f"faceswap/source/{uuid.uuid4().hex}_{source.filename}" | |
| upload_to_spaces(src_bytes, src_key, content_type=source.content_type) | |
| # ------------------------------------------------------------------ | |
| # CASE 1 : new_category_id → MongoDB lookup | |
| # ------------------------------------------------------------------ | |
| if new_category_id: | |
| # doc = await subcategories_col.find_one({ | |
| # "asset_images._id": ObjectId(new_category_id) | |
| # }) | |
| doc = await subcategories_collection.find_one({ | |
| "asset_images._id": ObjectId(new_category_id) | |
| }) | |
| if not doc: | |
| raise HTTPException(404, "Asset image not found in database") | |
| # extract correct asset | |
| asset = next( | |
| (img for img in doc["asset_images"] if str(img["_id"]) == new_category_id), | |
| None | |
| ) | |
| if not asset: | |
| raise HTTPException(404, "Asset image URL not found") | |
| # correct URL | |
| target_url = asset["url"] | |
| # correct categoryId (ObjectId) | |
| #category_oid = doc["categoryId"] # <-- DO NOT CONVERT TO STRING | |
| subcategory_oid = doc["_id"] | |
| # # ------------------------------------------------------------------ | |
| # # CASE 2 : target_category_id → DigitalOcean path (unchanged logic) | |
| # # ------------------------------------------------------------------ | |
| if target_category_id: | |
| client = get_spaces_client() | |
| base_prefix = "faceswap/target/" | |
| resp = client.list_objects_v2( | |
| Bucket=DO_SPACES_BUCKET, Prefix=base_prefix, Delimiter="/" | |
| ) | |
| # Extract categories from the CommonPrefixes | |
| categories = [p["Prefix"].split("/")[2] for p in resp.get("CommonPrefixes", [])] | |
| target_url = None | |
| # --- FIX STARTS HERE --- | |
| for category in categories: | |
| original_prefix = f"faceswap/target/{category}/original/" | |
| thumb_prefix = f"faceswap/target/{category}/thumb/" # Keep for file list check (optional but safe) | |
| # List objects in original/ | |
| original_objects = client.list_objects_v2( | |
| Bucket=DO_SPACES_BUCKET, Prefix=original_prefix | |
| ).get("Contents", []) | |
| # List objects in thumb/ (optional: for the old code's extra check) | |
| thumb_objects = client.list_objects_v2( | |
| Bucket=DO_SPACES_BUCKET, Prefix=thumb_prefix | |
| ).get("Contents", []) | |
| # Extract only the filenames and filter for .png | |
| original_filenames = sorted([ | |
| obj["Key"].split("/")[-1] for obj in original_objects | |
| if obj["Key"].split("/")[-1].endswith(".png") | |
| ]) | |
| thumb_filenames = [ | |
| obj["Key"].split("/")[-1] for obj in thumb_objects | |
| ] | |
| # Replicate the old indexing logic based on sorted filenames | |
| for idx, filename in enumerate(original_filenames, start=1): | |
| cid = f"{category.lower()}image_{idx}" | |
| # Optional: Replicate the thumb file check for 100% parity | |
| # if filename in thumb_filenames and cid == target_category_id: | |
| # Simpler check just on the ID, assuming thumb files are present | |
| if cid == target_category_id: | |
| # Construct the final target URL using the full prefix and the filename | |
| target_url = f"{DO_SPACES_ENDPOINT}/{DO_SPACES_BUCKET}/{original_prefix}{filename}" | |
| break | |
| if target_url: | |
| break | |
| # --- FIX ENDS HERE --- | |
| if not target_url: | |
| raise HTTPException(404, "Target categoryId not found") | |
| # # ------------------------------------------------------------------ | |
| # # DOWNLOAD TARGET IMAGE | |
| # # ------------------------------------------------------------------ | |
| async with httpx.AsyncClient(timeout=30.0) as client: | |
| response = await client.get(target_url) | |
| response.raise_for_status() | |
| tgt_bytes = response.content | |
| src_bgr = cv2.imdecode(np.frombuffer(src_bytes, np.uint8), cv2.IMREAD_COLOR) | |
| tgt_bgr = cv2.imdecode(np.frombuffer(tgt_bytes, np.uint8), cv2.IMREAD_COLOR) | |
| if src_bgr is None or tgt_bgr is None: | |
| raise HTTPException(400, "Invalid image data") | |
| src_rgb = cv2.cvtColor(src_bgr, cv2.COLOR_BGR2RGB) | |
| tgt_rgb = cv2.cvtColor(tgt_bgr, cv2.COLOR_BGR2RGB) | |
| # ------------------------------------------------------------------ | |
| # READ OPTIONAL IMAGE2 | |
| # ------------------------------------------------------------------ | |
| img2_rgb = None | |
| if image2: | |
| img2_bytes = await image2.read() | |
| img2_bgr = cv2.imdecode(np.frombuffer(img2_bytes, np.uint8), cv2.IMREAD_COLOR) | |
| if img2_bgr is not None: | |
| img2_rgb = cv2.cvtColor(img2_bgr, cv2.COLOR_BGR2RGB) | |
| # ------------------------------------------------------------------ | |
| # FACE SWAP EXECUTION (run in thread to not block event loop) | |
| # ------------------------------------------------------------------ | |
| if img2_rgb is not None: | |
| def _couple_swap(): | |
| pipeline_start = time.time() | |
| src_images = [src_rgb, img2_rgb] | |
| all_src_faces = [] | |
| t0 = time.time() | |
| for img in src_images: | |
| faces = face_analysis_app.get(cv2.cvtColor(img, cv2.COLOR_RGB2BGR)) | |
| all_src_faces.extend(faces) | |
| tgt_faces = face_analysis_app.get(cv2.cvtColor(tgt_rgb, cv2.COLOR_RGB2BGR)) | |
| logger.info(f"[Pipeline] Couple face detection: {time.time()-t0:.2f}s") | |
| if not all_src_faces: | |
| raise ValueError("No faces detected in source images") | |
| if not tgt_faces: | |
| raise ValueError("No faces detected in target image") | |
| def face_sort_key(face): | |
| x1, y1, x2, y2 = face.bbox | |
| area = (x2 - x1) * (y2 - y1) | |
| cx = (x1 + x2) / 2 | |
| return (-area, cx) | |
| src_male = sorted([f for f in all_src_faces if f.gender == 1], key=face_sort_key) | |
| src_female = sorted([f for f in all_src_faces if f.gender == 0], key=face_sort_key) | |
| tgt_male = sorted([f for f in tgt_faces if f.gender == 1], key=face_sort_key) | |
| tgt_female = sorted([f for f in tgt_faces if f.gender == 0], key=face_sort_key) | |
| pairs = [] | |
| for s, t in zip(src_male, tgt_male): | |
| pairs.append((s, t)) | |
| for s, t in zip(src_female, tgt_female): | |
| pairs.append((s, t)) | |
| if not pairs: | |
| src_all = sorted(all_src_faces, key=face_sort_key) | |
| tgt_all = sorted(tgt_faces, key=face_sort_key) | |
| pairs = list(zip(src_all, tgt_all)) | |
| t0 = time.time() | |
| with swap_lock: | |
| result_img = cv2.cvtColor(tgt_rgb, cv2.COLOR_RGB2BGR) | |
| for src_face, _ in pairs: | |
| current_faces = sorted(face_analysis_app.get(result_img), key=face_sort_key) | |
| candidates = [f for f in current_faces if f.gender == src_face.gender] or current_faces | |
| target_face = candidates[0] | |
| result_img = swapper.get(result_img, target_face, src_face, paste_back=True) | |
| logger.info(f"[Pipeline] Couple face swap: {time.time()-t0:.2f}s") | |
| result_rgb_out = cv2.cvtColor(result_img, cv2.COLOR_BGR2RGB) | |
| t0 = time.time() | |
| enhanced_rgb = mandatory_enhancement(result_rgb_out) | |
| logger.info(f"[Pipeline] Couple enhancement: {time.time()-t0:.2f}s") | |
| enhanced_bgr = cv2.cvtColor(enhanced_rgb, cv2.COLOR_RGB2BGR) | |
| temp_dir = tempfile.mkdtemp(prefix="faceswap_") | |
| final_path = os.path.join(temp_dir, "result.png") | |
| cv2.imwrite(final_path, enhanced_bgr) | |
| with open(final_path, "rb") as f: | |
| result_bytes = f.read() | |
| logger.info(f"[Pipeline] TOTAL couple swap: {time.time()-pipeline_start:.2f}s") | |
| return result_bytes | |
| try: | |
| result_bytes = await asyncio.to_thread(_couple_swap) | |
| except ValueError as ve: | |
| raise HTTPException(400, str(ve)) | |
| else: | |
| # ----- SINGLE SOURCE SWAP (run in thread) ----- | |
| def _single_swap(): | |
| return face_swap_and_enhance(src_rgb, tgt_rgb) | |
| final_img, final_path, err = await asyncio.to_thread(_single_swap) | |
| if err: | |
| raise HTTPException(500, err) | |
| with open(final_path, "rb") as f: | |
| result_bytes = f.read() | |
| result_key = f"faceswap/result/{uuid.uuid4().hex}_enhanced.png" | |
| result_url = upload_to_spaces(result_bytes, result_key) | |
| # ------------------------------------------------- | |
| # COMPRESS IMAGE (2–3 MB target) | |
| # ------------------------------------------------- | |
| compressed_bytes = compress_image( | |
| image_bytes=result_bytes, | |
| max_size=(1280, 1280), | |
| quality=72 | |
| ) | |
| compressed_key = f"faceswap/result/{uuid.uuid4().hex}_enhanced_compressed.jpg" | |
| compressed_url = upload_to_spaces( | |
| compressed_bytes, | |
| compressed_key, | |
| content_type="image/jpeg" | |
| ) | |
| end_time = datetime.utcnow() | |
| response_time_ms = (end_time - start_time).total_seconds() * 1000 | |
| if database is not None: | |
| log_entry = { | |
| "endpoint": "/face-swap", | |
| "status": "success", | |
| "response_time_ms": response_time_ms, | |
| "timestamp": end_time | |
| } | |
| if appname: | |
| log_entry["appname"] = appname | |
| await database.faceswap.insert_one(log_entry) | |
| return { | |
| "result_key": result_key, | |
| "result_url": result_url, | |
| "Compressed_Image_URL": compressed_url | |
| } | |
| except Exception as e: | |
| end_time = datetime.utcnow() | |
| response_time_ms = (end_time - start_time).total_seconds() * 1000 | |
| if database is not None: | |
| log_entry = { | |
| "endpoint": "/face-swap", | |
| "status": "fail", | |
| "response_time_ms": response_time_ms, | |
| "timestamp": end_time, | |
| "error": str(e) | |
| } | |
| if appname: | |
| log_entry["appname"] = appname | |
| await database.faceswap.insert_one(log_entry) | |
| raise HTTPException(500, f"Face swap failed: {str(e)}") | |
async def preview_result(result_key: str):
    """Return a previously stored result image as an inline PNG response.

    Args:
        result_key: Object key forwarded as-is to ``download_from_spaces``.

    Returns:
        A ``Response`` carrying the downloaded bytes, labelled ``image/png``
        with an inline ``result.png`` content disposition.

    Raises:
        HTTPException: 404 when ``download_from_spaces`` raises for any reason.
    """
    # NOTE(review): result_key is passed through unvalidated; confirm that the
    # route is only expected to expose objects under the results prefix.
    try:
        # download_from_spaces is a plain synchronous call (presumably network
        # I/O against Spaces), so run it in a worker thread instead of stalling
        # the event loop for every other request.
        img_bytes = await asyncio.to_thread(download_from_spaces, result_key)
    except Exception as exc:
        # Every download failure is reported as "not found"; keep the original
        # error chained so it still shows up in tracebacks.
        raise HTTPException(status_code=404, detail="Result not found") from exc

    return Response(
        content=img_bytes,
        media_type="image/png",
        headers={"Content-Disposition": "inline; filename=result.png"},
    )
async def multi_face_swap_api(
    source_image: UploadFile = File(...),
    target_image: UploadFile = File(...)
):
    """Run a multi-face swap between two uploaded images and upload the result.

    Args:
        source_image: Uploaded image providing the faces to apply.
        target_image: Uploaded image whose faces are replaced.

    Returns:
        A dict with ``result_key`` (object key under ``faceswap/multi/``) and
        ``result_url`` (value returned by ``upload_to_spaces``).

    Raises:
        HTTPException: 400 when either upload is empty or cannot be decoded as
            an image; 500 for any other failure, with the error text as detail.
    """
    try:
        # -----------------------------
        # Read images
        # -----------------------------
        src_bytes = await source_image.read()
        tgt_bytes = await target_image.read()

        # cv2.imdecode can raise on an empty buffer instead of returning None,
        # which would otherwise surface as a 500 for a plain client error.
        if not src_bytes or not tgt_bytes:
            raise HTTPException(400, "Invalid image data")

        src_bgr = cv2.imdecode(np.frombuffer(src_bytes, np.uint8), cv2.IMREAD_COLOR)
        tgt_bgr = cv2.imdecode(np.frombuffer(tgt_bytes, np.uint8), cv2.IMREAD_COLOR)
        if src_bgr is None or tgt_bgr is None:
            raise HTTPException(400, "Invalid image data")

        src_rgb = cv2.cvtColor(src_bgr, cv2.COLOR_BGR2RGB)
        tgt_rgb = cv2.cvtColor(tgt_bgr, cv2.COLOR_BGR2RGB)

        # -----------------------------
        # Multi-face swap + enhancement + PNG encoding
        # (run in a thread to not block the event loop)
        # -----------------------------
        def _swap_enhance_encode():
            swapped_rgb = multi_face_swap(src_rgb, tgt_rgb)
            final_rgb = mandatory_enhancement(swapped_rgb)
            final_bgr = cv2.cvtColor(final_rgb, cv2.COLOR_RGB2BGR)
            # Encode in memory: the previous mkdtemp + imwrite + read-back
            # round trip left one temp directory behind per request.
            ok, png_buf = cv2.imencode(".png", final_bgr)
            if not ok:
                raise RuntimeError("Failed to encode result image as PNG")
            return png_buf.tobytes()

        result_bytes = await asyncio.to_thread(_swap_enhance_encode)

        # -----------------------------
        # Upload (synchronous helper, so keep it off the event loop too)
        # -----------------------------
        result_key = f"faceswap/multi/{uuid.uuid4().hex}.png"
        result_url = await asyncio.to_thread(
            upload_to_spaces,
            result_bytes,
            result_key,
            content_type="image/png",
        )

        return {
            "result_key": result_key,
            "result_url": result_url
        }
    except HTTPException:
        # Deliberate client errors (400) must keep their status code instead of
        # being re-wrapped as a 500 by the generic handler below.
        raise
    except Exception as e:
        raise HTTPException(status_code=500, detail=str(e)) from e
async def face_swap_couple_api(
    image1: UploadFile = File(...),
    image2: Optional[UploadFile] = File(None),
    target_category_id: str = Form(None),
    new_category_id: str = Form(None),
    user_id: Optional[str] = Form(None),
    appname: Optional[str] = Form(None),
    credentials: HTTPAuthorizationCredentials = Security(security)
):
    """
    Production-ready face swap endpoint supporting:
    - Multiple source images (image1 + optional image2)
    - Gender-based pairing
    - Merged faces from multiple sources
    - Mandatory CodeFormer enhancement

    Args:
        image1: Required source image.
        image2: Optional second source image; silently ignored when it is
            empty or cannot be decoded.
        target_category_id: Identifier of the form ``<category>image_<n>``
            resolved against the ``faceswap/target/`` objects in Spaces.
        new_category_id: ObjectId (as a string) of an ``asset_images`` entry
            in the app's subcategories collection.
        user_id: Only written to the request log line here.
        appname: Selects the app database collection and tags usage logs.
        credentials: Bearer credentials injected by ``Security(security)``;
            not read inside this function body.

    Exactly one of ``target_category_id`` / ``new_category_id`` must be given.

    Returns:
        A dict with ``result_key``, ``result_url`` and ``compressed_url``.

    Raises:
        HTTPException: 400 for invalid input (bad ids, undecodable images, no
            faces detected), 404 when the target cannot be resolved, 500 for
            any other failure.
    """
    start_time = datetime.utcnow()

    async def _log_usage(status, error=None):
        """Insert one usage record for this request when a log DB is configured."""
        end_time = datetime.utcnow()
        if database is None:
            return
        log_entry = {
            "endpoint": "/face-swap-couple",
            "status": status,
            "response_time_ms": (end_time - start_time).total_seconds() * 1000,
            "timestamp": end_time,
        }
        if error is not None:
            log_entry["error"] = error
        if appname:
            log_entry["appname"] = appname
        await database.faceswap.insert_one(log_entry)

    def _find_target_url_in_spaces(category_id):
        """Scan ``faceswap/target/<category>/original/`` for the matching PNG.

        Synchronous (boto3-style client calls), so it is meant to be run via
        ``asyncio.to_thread``. Returns the object URL, or None if not found.
        """
        spaces = get_spaces_client()
        listing = spaces.list_objects_v2(
            Bucket=DO_SPACES_BUCKET, Prefix="faceswap/target/", Delimiter="/"
        )
        categories = [
            p["Prefix"].split("/")[2] for p in listing.get("CommonPrefixes", [])
        ]
        for category in categories:
            original_prefix = f"faceswap/target/{category}/original/"
            original_objects = spaces.list_objects_v2(
                Bucket=DO_SPACES_BUCKET, Prefix=original_prefix
            ).get("Contents", [])
            # Ids are positional: the n-th PNG (sorted by filename) of a
            # category is "<category lowercased>image_<n>".
            original_filenames = sorted(
                name
                for name in (obj["Key"].split("/")[-1] for obj in original_objects)
                if name.endswith(".png")
            )
            for idx, filename in enumerate(original_filenames, start=1):
                if f"{category.lower()}image_{idx}" == category_id:
                    return f"{DO_SPACES_ENDPOINT}/{DO_SPACES_BUCKET}/{original_prefix}{filename}"
        return None

    try:
        # -----------------------------
        # Validate input
        # -----------------------------
        # Empty form fields arrive as "" and are treated as "not provided".
        target_category_id = target_category_id or None
        new_category_id = new_category_id or None
        user_id = user_id or None

        subcategories_collection = get_app_db_collections(appname)

        if target_category_id and new_category_id:
            raise HTTPException(400, "Provide only one of new_category_id or target_category_id.")
        if not target_category_id and not new_category_id:
            raise HTTPException(400, "Either new_category_id or target_category_id is required.")

        logger.info(f"[FaceSwap] Incoming request → target_category_id={target_category_id}, new_category_id={new_category_id}, user_id={user_id}")

        # -----------------------------
        # Read source images
        # -----------------------------
        src_images = []

        img1_bytes = await image1.read()
        # Guard against empty uploads: cv2.imdecode can raise on an empty
        # buffer instead of returning None.
        src1 = (
            cv2.imdecode(np.frombuffer(img1_bytes, np.uint8), cv2.IMREAD_COLOR)
            if img1_bytes
            else None
        )
        if src1 is None:
            raise HTTPException(400, "Invalid image1 data")
        src_images.append(cv2.cvtColor(src1, cv2.COLOR_BGR2RGB))

        if image2:
            img2_bytes = await image2.read()
            # image2 is best-effort: an empty or undecodable file is skipped.
            if img2_bytes:
                src2 = cv2.imdecode(np.frombuffer(img2_bytes, np.uint8), cv2.IMREAD_COLOR)
                if src2 is not None:
                    src_images.append(cv2.cvtColor(src2, cv2.COLOR_BGR2RGB))

        # -----------------------------
        # Resolve target image
        # -----------------------------
        if new_category_id:
            try:
                asset_oid = ObjectId(new_category_id)
            except InvalidId:
                # A malformed id is a client error, not a server failure.
                raise HTTPException(400, "Invalid new_category_id") from None

            doc = await subcategories_collection.find_one({
                "asset_images._id": asset_oid
            })
            if not doc:
                raise HTTPException(404, "Asset image not found in database")

            asset = next(
                (img for img in doc["asset_images"] if str(img["_id"]) == new_category_id),
                None
            )
            if not asset:
                raise HTTPException(404, "Asset image URL not found")
            target_url = asset["url"]
        else:
            # Several blocking list calls: keep them off the event loop.
            target_url = await asyncio.to_thread(
                _find_target_url_in_spaces, target_category_id
            )
            if not target_url:
                raise HTTPException(404, "Target categoryId not found")

        async with httpx.AsyncClient(timeout=30.0) as http_client:
            response = await http_client.get(target_url)
            response.raise_for_status()
            tgt_bytes = response.content

        tgt_bgr = (
            cv2.imdecode(np.frombuffer(tgt_bytes, np.uint8), cv2.IMREAD_COLOR)
            if tgt_bytes
            else None
        )
        if tgt_bgr is None:
            raise HTTPException(400, "Invalid target image data")

        # -----------------------------
        # Couple face swap + enhance (run in thread)
        # -----------------------------
        def _couple_face_swap_and_enhance():
            pipeline_start = time.time()

            def face_sort_key(face):
                # Largest face first; ties broken left-to-right by centre x.
                x1, y1, x2, y2 = face.bbox
                area = (x2 - x1) * (y2 - y1)
                cx = (x1 + x2) / 2
                return (-area, cx)

            def faces_of_gender(faces, gender):
                return sorted((f for f in faces if f.gender == gender), key=face_sort_key)

            t0 = time.time()
            all_src_faces = []
            for img in src_images:
                all_src_faces.extend(
                    face_analysis_app.get(cv2.cvtColor(img, cv2.COLOR_RGB2BGR))
                )
            tgt_faces = face_analysis_app.get(tgt_bgr)
            logger.info(f"[Pipeline] Couple-ep face detection: {time.time()-t0:.2f}s")

            if not all_src_faces:
                raise ValueError("No faces detected in source images")
            if not tgt_faces:
                raise ValueError("No faces detected in target image")

            # Pair same-gender faces by size rank (gender == 1 first, then 0).
            pairs = [
                *zip(faces_of_gender(all_src_faces, 1), faces_of_gender(tgt_faces, 1)),
                *zip(faces_of_gender(all_src_faces, 0), faces_of_gender(tgt_faces, 0)),
            ]
            if not pairs:
                # No gender overlap at all: fall back to pairing purely by size.
                pairs = list(zip(
                    sorted(all_src_faces, key=face_sort_key),
                    sorted(tgt_faces, key=face_sort_key),
                ))

            t0 = time.time()
            with swap_lock:
                result_img = tgt_bgr.copy()
                for src_face, tgt_face in pairs:
                    # Swap onto the paired target face. Re-detecting and always
                    # taking the largest same-gender face sent every
                    # same-gender source to one target face, so the second
                    # source overwrote the first.
                    result_img = swapper.get(result_img, tgt_face, src_face, paste_back=True)
            logger.info(f"[Pipeline] Couple-ep face swap: {time.time()-t0:.2f}s")

            result_rgb = cv2.cvtColor(result_img, cv2.COLOR_BGR2RGB)

            t0 = time.time()
            enhanced_rgb = mandatory_enhancement(result_rgb)
            logger.info(f"[Pipeline] Couple-ep enhancement: {time.time()-t0:.2f}s")

            enhanced_bgr = cv2.cvtColor(enhanced_rgb, cv2.COLOR_RGB2BGR)

            # Encode in memory rather than via mkdtemp + imwrite + read-back,
            # which leaked one temp directory per request.
            ok, png_buf = cv2.imencode(".png", enhanced_bgr)
            if not ok:
                raise RuntimeError("Failed to encode result image as PNG")

            logger.info(f"[Pipeline] TOTAL couple-ep swap: {time.time()-pipeline_start:.2f}s")
            return png_buf.tobytes()

        try:
            result_bytes = await asyncio.to_thread(_couple_face_swap_and_enhance)
        except ValueError as ve:
            raise HTTPException(400, str(ve)) from ve

        # -----------------------------
        # Upload full-size + compressed variants (blocking helpers -> threads)
        # -----------------------------
        result_key = f"faceswap/result/{uuid.uuid4().hex}_enhanced.png"
        result_url = await asyncio.to_thread(upload_to_spaces, result_bytes, result_key)

        compressed_bytes = await asyncio.to_thread(
            compress_image, result_bytes, max_size=(1280, 1280), quality=72
        )
        compressed_key = f"faceswap/result/{uuid.uuid4().hex}_enhanced_compressed.jpg"
        compressed_url = await asyncio.to_thread(
            upload_to_spaces, compressed_bytes, compressed_key, content_type="image/jpeg"
        )

        # -----------------------------
        # Log API usage
        # -----------------------------
        await _log_usage("success")

        return {
            "result_key": result_key,
            "result_url": result_url,
            "compressed_url": compressed_url
        }

    except HTTPException as http_exc:
        # Still record the failure, but keep the intended 4xx status instead of
        # collapsing every client error into a 500.
        await _log_usage("fail", error=str(http_exc))
        raise
    except Exception as e:
        await _log_usage("fail", error=str(e))
        raise HTTPException(500, f"Face swap failed: {str(e)}") from e
if __name__ == "__main__":
    # Serve the FastAPI app directly when this module is executed as a script.
    uvicorn.run(fastapi_app, host="0.0.0.0", port=7860)