Spaces:
Sleeping
Sleeping
| #####################FASTAPI___________________############## | |
| import os | |
| os.environ["OMP_NUM_THREADS"] = "1" | |
| import shutil | |
| import uuid | |
| import cv2 | |
| import numpy as np | |
| import threading | |
| import asyncio | |
| import subprocess | |
| import logging | |
| import tempfile | |
| import sys | |
| import time | |
| from datetime import datetime,timedelta | |
| import tempfile | |
| import insightface | |
| from insightface.app import FaceAnalysis | |
| from huggingface_hub import hf_hub_download | |
| from fastapi import FastAPI, UploadFile, File, HTTPException, Response, Depends, Security, Form | |
| from fastapi.responses import RedirectResponse | |
| from fastapi.security import HTTPBearer, HTTPAuthorizationCredentials | |
| from motor.motor_asyncio import AsyncIOMotorClient | |
| from bson import ObjectId | |
| from bson.errors import InvalidId | |
| import httpx | |
| import uvicorn | |
| from PIL import Image | |
| import io | |
| import requests | |
| # DigitalOcean Spaces | |
| import boto3 | |
| from botocore.client import Config | |
| from typing import Optional | |
| # --------------------- Logging --------------------- | |
| logging.basicConfig(level=logging.INFO) | |
| logger = logging.getLogger(__name__) | |
| # --------------------- Secrets & Paths --------------------- | |
| REPO_ID = "HariLogicgo/face_swap_models" | |
| MODELS_DIR = "./models" | |
| os.makedirs(MODELS_DIR, exist_ok=True) | |
| HF_TOKEN = os.getenv("HF_TOKEN") | |
| API_SECRET_TOKEN = os.getenv("API_SECRET_TOKEN") | |
| DO_SPACES_REGION = os.getenv("DO_SPACES_REGION", "blr1") | |
| DO_SPACES_ENDPOINT = f"https://{DO_SPACES_REGION}.digitaloceanspaces.com" | |
| DO_SPACES_KEY = os.getenv("DO_SPACES_KEY") | |
| DO_SPACES_SECRET = os.getenv("DO_SPACES_SECRET") | |
| DO_SPACES_BUCKET = os.getenv("DO_SPACES_BUCKET") | |
| # NEW admin DB (with error handling for missing env vars) | |
| ADMIN_MONGO_URL = os.getenv("ADMIN_MONGO_URL") | |
| admin_client = None | |
| admin_db = None | |
| subcategories_col = None | |
| media_clicks_col = None | |
| if ADMIN_MONGO_URL: | |
| try: | |
| admin_client = AsyncIOMotorClient(ADMIN_MONGO_URL) | |
| admin_db = admin_client.adminPanel | |
| subcategories_col = admin_db.subcategories | |
| media_clicks_col = admin_db.media_clicks | |
| except Exception as e: | |
| logger.warning(f"MongoDB admin connection failed (optional): {e}") | |
| # Collage Maker DB (optional) | |
| COLLAGE_MAKER_DB_URL = os.getenv("COLLAGE_MAKER_DB_URL") | |
| collage_maker_client = None | |
| collage_maker_db = None | |
| collage_media_clicks_col = None | |
| collage_subcategories_col = None | |
| if COLLAGE_MAKER_DB_URL: | |
| try: | |
| collage_maker_client = AsyncIOMotorClient(COLLAGE_MAKER_DB_URL) | |
| collage_maker_db = collage_maker_client.adminPanel | |
| collage_media_clicks_col = collage_maker_db.media_clicks | |
| collage_subcategories_col = collage_maker_db.subcategories | |
| except Exception as e: | |
| logger.warning(f"MongoDB collage-maker connection failed (optional): {e}") | |
| # AI Enhancer DB (optional) | |
| AI_ENHANCER_DB_URL = os.getenv("AI_ENHANCER_DB_URL") | |
| ai_enhancer_client = None | |
| ai_enhancer_db = None | |
| ai_enhancer_media_clicks_col = None | |
| ai_enhancer_subcategories_col = None | |
| if AI_ENHANCER_DB_URL: | |
| try: | |
| ai_enhancer_client = AsyncIOMotorClient(AI_ENHANCER_DB_URL) | |
| ai_enhancer_db = ai_enhancer_client.test # 🔴 test database | |
| ai_enhancer_media_clicks_col = ai_enhancer_db.media_clicks | |
| ai_enhancer_subcategories_col = ai_enhancer_db.subcategories | |
| except Exception as e: | |
| logger.warning(f"MongoDB ai-enhancer connection failed (optional): {e}") | |
| def get_media_clicks_collection(appname: Optional[str] = None): | |
| """Return the media clicks collection for the given app (default: main admin).""" | |
| if appname and str(appname).strip().lower() == "collage-maker": | |
| return collage_media_clicks_col | |
| return media_clicks_col | |
| # OLD logs DB | |
| MONGODB_URL = os.getenv("MONGODB_URL") | |
| client = None | |
| database = None | |
| # --------------------- Download Models --------------------- | |
| def download_models(): | |
| try: | |
| logger.info("Downloading models...") | |
| inswapper_path = hf_hub_download( | |
| repo_id=REPO_ID, | |
| filename="models/inswapper_128.onnx", | |
| repo_type="model", | |
| local_dir=MODELS_DIR, | |
| token=HF_TOKEN | |
| ) | |
| buffalo_files = ["1k3d68.onnx", "2d106det.onnx", "genderage.onnx", "det_10g.onnx", "w600k_r50.onnx"] | |
| for f in buffalo_files: | |
| hf_hub_download( | |
| repo_id=REPO_ID, | |
| filename=f"models/buffalo_l/" + f, | |
| repo_type="model", | |
| local_dir=MODELS_DIR, | |
| token=HF_TOKEN | |
| ) | |
| logger.info("Models downloaded successfully.") | |
| return inswapper_path | |
| except Exception as e: | |
| logger.error(f"Model download failed: {e}") | |
| raise | |
| try: | |
| inswapper_path = download_models() | |
| # --------------------- Face Analysis + Swapper --------------------- | |
| providers = ['CUDAExecutionProvider', 'CPUExecutionProvider'] | |
| face_analysis_app = FaceAnalysis(name="buffalo_l", root=MODELS_DIR, providers=providers) | |
| face_analysis_app.prepare(ctx_id=0, det_size=(640, 640)) | |
| swapper = insightface.model_zoo.get_model(inswapper_path, providers=providers) | |
| logger.info("Face analysis models loaded successfully") | |
| except Exception as e: | |
| logger.error(f"Failed to initialize face analysis models: {e}") | |
| # Set defaults to prevent crash | |
| inswapper_path = None | |
| face_analysis_app = None | |
| swapper = None | |
| # --------------------- CodeFormer --------------------- | |
| CODEFORMER_PATH = "CodeFormer/inference_codeformer.py" | |
| def ensure_codeformer(): | |
| """ | |
| Ensure CodeFormer's local basicsr + facelib are importable and | |
| pretrained weights are downloaded. No setup.py needed — we use | |
| sys.path / PYTHONPATH instead. | |
| """ | |
| try: | |
| if not os.path.exists("CodeFormer"): | |
| logger.info("CodeFormer not found, cloning repository...") | |
| subprocess.run("git clone https://github.com/sczhou/CodeFormer.git", shell=True, check=True) | |
| subprocess.run("pip install -r CodeFormer/requirements.txt", shell=True, check=False) | |
| # Add CodeFormer root to sys.path so `import basicsr` and | |
| # `import facelib` resolve to the local (compatible) versions | |
| # instead of the broken PyPI basicsr==1.4.2. | |
| codeformer_root = os.path.join(os.getcwd(), "CodeFormer") | |
| if codeformer_root not in sys.path: | |
| sys.path.insert(0, codeformer_root) | |
| logger.info(f"Added {codeformer_root} to sys.path for local basicsr/facelib") | |
| # NOTE: We do NOT need the PyPI 'realesrgan' package. | |
| # Both in-process and subprocess paths use CodeFormer's local | |
| # basicsr.utils.realesrgan_utils.RealESRGANer instead. | |
| # Installing PyPI realesrgan at runtime would re-install the | |
| # broken basicsr==1.4.2 and break everything. | |
| # Download pretrained weights if not already present | |
| if os.path.exists("CodeFormer"): | |
| try: | |
| subprocess.run("python CodeFormer/scripts/download_pretrained_models.py facelib", shell=True, check=False, timeout=300) | |
| except (subprocess.TimeoutExpired, subprocess.CalledProcessError): | |
| logger.warning("Failed to download facelib models (optional)") | |
| try: | |
| subprocess.run("python CodeFormer/scripts/download_pretrained_models.py CodeFormer", shell=True, check=False, timeout=300) | |
| except (subprocess.TimeoutExpired, subprocess.CalledProcessError): | |
| logger.warning("Failed to download CodeFormer models (optional)") | |
| except Exception as e: | |
| logger.error(f"CodeFormer setup failed: {e}") | |
| logger.warning("Continuing without CodeFormer features...") | |
| ensure_codeformer() | |
| # --------------------- In-Process CodeFormer (No Subprocess!) --------------------- | |
| # Load CodeFormer models ONCE at startup instead of spawning a new Python process per request. | |
| # This eliminates 15-40s of model loading overhead per request. | |
| codeformer_net = None | |
| codeformer_upsampler = None | |
| codeformer_face_helper = None | |
| codeformer_device = None | |
| def init_codeformer_in_process(): | |
| """Load CodeFormer models once into memory for fast per-request inference.""" | |
| global codeformer_net, codeformer_upsampler, codeformer_face_helper, codeformer_device | |
| try: | |
| import torch | |
| from torchvision.transforms.functional import normalize as torch_normalize | |
| # Add CodeFormer to Python path | |
| codeformer_root = os.path.join(os.getcwd(), "CodeFormer") | |
| if codeformer_root not in sys.path: | |
| sys.path.insert(0, codeformer_root) | |
| from basicsr.utils.registry import ARCH_REGISTRY | |
| from basicsr.utils.download_util import load_file_from_url | |
| from facelib.utils.face_restoration_helper import FaceRestoreHelper | |
| codeformer_device = torch.device('cuda' if torch.cuda.is_available() else 'cpu') | |
| logger.info(f"Initializing CodeFormer on device: {codeformer_device}") | |
| # 1) Load CodeFormer network | |
| net = ARCH_REGISTRY.get('CodeFormer')( | |
| dim_embd=512, codebook_size=1024, n_head=8, n_layers=9, | |
| connect_list=['32', '64', '128', '256'] | |
| ).to(codeformer_device) | |
| ckpt_path = load_file_from_url( | |
| url='https://github.com/sczhou/CodeFormer/releases/download/v0.1.0/codeformer.pth', | |
| model_dir='weights/CodeFormer', progress=True, file_name=None | |
| ) | |
| checkpoint = torch.load(ckpt_path, map_location=codeformer_device)['params_ema'] | |
| net.load_state_dict(checkpoint) | |
| net.eval() | |
| codeformer_net = net | |
| # 2) RealESRGAN upsampler — SKIPPED for face swap | |
| # Background/face upsampling is the #1 bottleneck (~20s per image). | |
| # For face swap we only need CodeFormer face restoration, not super-resolution. | |
| # The upsampler is kept as None; we no longer download the 64MB model at startup. | |
| codeformer_upsampler = None | |
| # 3) Create FaceRestoreHelper (reused per request) | |
| # NOTE: local CodeFormer uses "upscale_factor" (not "upscale") | |
| # upscale_factor=1 → keep original resolution (no 2x upscale needed for face swap) | |
| codeformer_face_helper = FaceRestoreHelper( | |
| upscale_factor=1, | |
| face_size=512, | |
| crop_ratio=(1, 1), | |
| det_model='retinaface_resnet50', | |
| save_ext='png', | |
| use_parse=True, | |
| device=codeformer_device | |
| ) | |
| logger.info("✅ CodeFormer models loaded in-process successfully!") | |
| return True | |
| except Exception as e: | |
| logger.error(f"Failed to load CodeFormer in-process: {e}") | |
| logger.warning("CodeFormer enhancement will be unavailable.") | |
| return False | |
| # Try to load CodeFormer models in-process | |
| _codeformer_available = init_codeformer_in_process() | |
| # --------------------- FastAPI --------------------- | |
| fastapi_app = FastAPI() | |
| async def startup_db(): | |
| global client, database | |
| if MONGODB_URL: | |
| try: | |
| logger.info("Initializing MongoDB for API logs...") | |
| client = AsyncIOMotorClient(MONGODB_URL) | |
| database = client.FaceSwap | |
| logger.info("MongoDB initialized for API logs") | |
| except Exception as e: | |
| logger.warning(f"MongoDB connection failed (optional): {e}") | |
| client = None | |
| database = None | |
| else: | |
| logger.warning("MONGODB_URL not set, skipping MongoDB initialization") | |
| async def shutdown_db(): | |
| global client, admin_client, collage_maker_client | |
| if client is not None: | |
| client.close() | |
| logger.info("MongoDB connection closed") | |
| if admin_client is not None: | |
| admin_client.close() | |
| logger.info("Admin MongoDB connection closed") | |
| if collage_maker_client is not None: | |
| collage_maker_client.close() | |
| logger.info("Collage Maker MongoDB connection closed") | |
| # --------------------- Auth --------------------- | |
| security = HTTPBearer() | |
| def verify_token(credentials: HTTPAuthorizationCredentials = Security(security)): | |
| if credentials.credentials != API_SECRET_TOKEN: | |
| raise HTTPException(status_code=401, detail="Invalid or missing token") | |
| return credentials.credentials | |
| # --------------------- DB Selector --------------------- | |
| def get_app_db_collections(appname: Optional[str] = None): | |
| """ | |
| Returns (media_clicks_collection, subcategories_collection) | |
| based on appname. | |
| """ | |
| if appname: | |
| app = appname.strip().lower() | |
| if app == "collage-maker": | |
| if collage_media_clicks_col is not None and collage_subcategories_col is not None: | |
| return collage_media_clicks_col, collage_subcategories_col | |
| logger.warning("Collage-maker DB not configured, falling back to admin") | |
| elif app == "ai-enhancer": | |
| if ai_enhancer_media_clicks_col is not None and ai_enhancer_subcategories_col is not None: | |
| return ai_enhancer_media_clicks_col, ai_enhancer_subcategories_col | |
| logger.warning("AI-Enhancer DB not configured, falling back to admin") | |
| # default fallback | |
| return media_clicks_col, subcategories_col | |
| # --------------------- Logging API Hits --------------------- | |
| async def log_faceswap_hit(token: str, status: str = "success"): | |
| global database | |
| if database is None: | |
| return | |
| await database.api_logs.insert_one({ | |
| "token": token, | |
| "endpoint": "/faceswap", | |
| "status": status, | |
| "timestamp": datetime.utcnow() | |
| }) | |
| # --------------------- Face Swap Pipeline --------------------- | |
| swap_lock = threading.Lock() | |
| def enhance_image_with_codeformer(rgb_img, temp_dir=None, w=0.7): | |
| """ | |
| Enhance face image using CodeFormer. | |
| Uses in-process models (fast) if available, falls back to subprocess (slow). | |
| """ | |
| global codeformer_net, codeformer_upsampler, codeformer_face_helper, codeformer_device | |
| t0 = time.time() | |
| # ── FAST PATH: In-process CodeFormer (no subprocess!) ── | |
| if codeformer_net is not None and codeformer_face_helper is not None: | |
| import torch | |
| from torchvision.transforms.functional import normalize as torch_normalize | |
| from basicsr.utils import img2tensor, tensor2img | |
| from facelib.utils.misc import is_gray | |
| bgr_img = cv2.cvtColor(rgb_img, cv2.COLOR_RGB2BGR) | |
| # Reset face helper state | |
| codeformer_face_helper.clean_all() | |
| codeformer_face_helper.read_image(bgr_img) | |
| num_faces = codeformer_face_helper.get_face_landmarks_5( | |
| only_center_face=False, resize=640, eye_dist_threshold=5 | |
| ) | |
| logger.info(f"[CodeFormer] Detected {num_faces} faces in {time.time()-t0:.2f}s") | |
| codeformer_face_helper.align_warp_face() | |
| # Enhance each cropped face with CodeFormer neural net | |
| t_faces = time.time() | |
| for idx, cropped_face in enumerate(codeformer_face_helper.cropped_faces): | |
| cropped_face_t = img2tensor(cropped_face / 255., bgr2rgb=True, float32=True) | |
| torch_normalize(cropped_face_t, (0.5, 0.5, 0.5), (0.5, 0.5, 0.5), inplace=True) | |
| cropped_face_t = cropped_face_t.unsqueeze(0).to(codeformer_device) | |
| try: | |
| with torch.no_grad(): | |
| output = codeformer_net(cropped_face_t, w=w, adain=True)[0] | |
| restored_face = tensor2img(output, rgb2bgr=True, min_max=(-1, 1)) | |
| del output | |
| torch.cuda.empty_cache() | |
| except Exception as e: | |
| logger.warning(f"[CodeFormer] Face {idx} inference failed: {e}") | |
| restored_face = tensor2img(cropped_face_t, rgb2bgr=True, min_max=(-1, 1)) | |
| restored_face = restored_face.astype('uint8') | |
| codeformer_face_helper.add_restored_face(restored_face, cropped_face) | |
| logger.info(f"[CodeFormer] Face restoration ({num_faces} faces): {time.time()-t_faces:.2f}s") | |
| # Paste restored faces back onto original image | |
| # NOTE: We skip RealESRGAN background/face upsampling — it's the #1 bottleneck | |
| # (~20s) and unnecessary for face swap. We only need CodeFormer face restoration. | |
| t_paste = time.time() | |
| codeformer_face_helper.get_inverse_affine(None) | |
| restored_img = codeformer_face_helper.paste_faces_to_input_image( | |
| upsample_img=None, draw_box=False | |
| ) | |
| logger.info(f"[CodeFormer] Paste back: {time.time()-t_paste:.2f}s") | |
| logger.info(f"[CodeFormer] In-process enhancement done in {time.time()-t0:.2f}s") | |
| return cv2.cvtColor(restored_img, cv2.COLOR_BGR2RGB) | |
| # ── SLOW FALLBACK: Subprocess CodeFormer (with timeout!) ── | |
| logger.warning("[CodeFormer] In-process models unavailable, falling back to subprocess") | |
| if temp_dir is None: | |
| temp_dir = os.path.join(tempfile.gettempdir(), f"enhance_{uuid.uuid4().hex[:8]}") | |
| os.makedirs(temp_dir, exist_ok=True) | |
| input_path = os.path.join(temp_dir, "input.jpg") | |
| cv2.imwrite(input_path, cv2.cvtColor(rgb_img, cv2.COLOR_RGB2BGR)) | |
| python_cmd = sys.executable if sys.executable else "python3" | |
| cmd = ( | |
| f"{python_cmd} {CODEFORMER_PATH} " | |
| f"-w {w} " | |
| f"--input_path {input_path} " | |
| f"--output_path {temp_dir} " | |
| f"--bg_upsampler None " | |
| f"--upscale 1" | |
| ) | |
| result = subprocess.run(cmd, shell=True, capture_output=True, text=True, timeout=120) | |
| if result.returncode != 0: | |
| raise RuntimeError(result.stderr) | |
| final_dir = os.path.join(temp_dir, "final_results") | |
| files = [f for f in os.listdir(final_dir) if f.endswith(".png")] | |
| if not files: | |
| raise RuntimeError("No enhanced output") | |
| final_path = os.path.join(final_dir, files[0]) | |
| enhanced = cv2.imread(final_path) | |
| logger.info(f"[CodeFormer] Subprocess enhancement done in {time.time()-t0:.2f}s") | |
| return cv2.cvtColor(enhanced, cv2.COLOR_BGR2RGB) | |
| def multi_face_swap(src_img, tgt_img): | |
| pipeline_start = time.time() | |
| src_bgr = cv2.cvtColor(src_img, cv2.COLOR_RGB2BGR) | |
| tgt_bgr = cv2.cvtColor(tgt_img, cv2.COLOR_RGB2BGR) | |
| t0 = time.time() | |
| src_faces = face_analysis_app.get(src_bgr) | |
| tgt_faces = face_analysis_app.get(tgt_bgr) | |
| logger.info(f"[Pipeline] Multi-face detection: {time.time()-t0:.2f}s") | |
| if not src_faces or not tgt_faces: | |
| raise ValueError("No faces detected") | |
| def face_sort_key(face): | |
| x1, y1, x2, y2 = face.bbox | |
| area = (x2 - x1) * (y2 - y1) | |
| cx = (x1 + x2) / 2 | |
| return (-area, cx) | |
| src_male = sorted([f for f in src_faces if f.gender == 1], key=face_sort_key) | |
| src_female = sorted([f for f in src_faces if f.gender == 0], key=face_sort_key) | |
| tgt_male = sorted([f for f in tgt_faces if f.gender == 1], key=face_sort_key) | |
| tgt_female = sorted([f for f in tgt_faces if f.gender == 0], key=face_sort_key) | |
| pairs = [] | |
| for s, t in zip(src_male, tgt_male): | |
| pairs.append((s, t)) | |
| for s, t in zip(src_female, tgt_female): | |
| pairs.append((s, t)) | |
| if not pairs: | |
| src_faces = sorted(src_faces, key=face_sort_key) | |
| tgt_faces = sorted(tgt_faces, key=face_sort_key) | |
| pairs = list(zip(src_faces, tgt_faces)) | |
| t0 = time.time() | |
| result_img = tgt_bgr.copy() | |
| for src_face, _ in pairs: | |
| if face_analysis_app is None: | |
| raise ValueError("Face analysis models not initialized.") | |
| current_faces = sorted(face_analysis_app.get(result_img), key=face_sort_key) | |
| candidates = [f for f in current_faces if f.gender == src_face.gender] or current_faces | |
| target_face = candidates[0] | |
| if swapper is None: | |
| raise ValueError("Face swap models not initialized.") | |
| result_img = swapper.get(result_img, target_face, src_face, paste_back=True) | |
| logger.info(f"[Pipeline] Multi-face swap ({len(pairs)} pairs): {time.time()-t0:.2f}s") | |
| logger.info(f"[Pipeline] TOTAL multi_face_swap: {time.time()-pipeline_start:.2f}s") | |
| return cv2.cvtColor(result_img, cv2.COLOR_BGR2RGB) | |
| def face_swap_and_enhance(src_img, tgt_img, temp_dir=None): | |
| try: | |
| with swap_lock: | |
| pipeline_start = time.time() | |
| if temp_dir is None: | |
| temp_dir = os.path.join(tempfile.gettempdir(), f"faceswap_work_{uuid.uuid4().hex[:8]}") | |
| if os.path.exists(temp_dir): | |
| shutil.rmtree(temp_dir) | |
| os.makedirs(temp_dir, exist_ok=True) | |
| if face_analysis_app is None: | |
| return None, None, "❌ Face analysis models not initialized." | |
| if swapper is None: | |
| return None, None, "❌ Face swap models not initialized." | |
| src_bgr = cv2.cvtColor(src_img, cv2.COLOR_RGB2BGR) | |
| tgt_bgr = cv2.cvtColor(tgt_img, cv2.COLOR_RGB2BGR) | |
| t0 = time.time() | |
| src_faces = face_analysis_app.get(src_bgr) | |
| tgt_faces = face_analysis_app.get(tgt_bgr) | |
| logger.info(f"[Pipeline] Face detection: {time.time()-t0:.2f}s") | |
| if not src_faces or not tgt_faces: | |
| return None, None, "❌ Face not detected in one of the images" | |
| t0 = time.time() | |
| swapped_bgr = swapper.get(tgt_bgr, tgt_faces[0], src_faces[0]) | |
| logger.info(f"[Pipeline] Face swap: {time.time()-t0:.2f}s") | |
| if swapped_bgr is None: | |
| return None, None, "❌ Face swap failed" | |
| # Use in-process CodeFormer enhancement (fast path) | |
| t0 = time.time() | |
| swapped_rgb = cv2.cvtColor(swapped_bgr, cv2.COLOR_BGR2RGB) | |
| try: | |
| enhanced_rgb = enhance_image_with_codeformer(swapped_rgb) | |
| enhanced_bgr = cv2.cvtColor(enhanced_rgb, cv2.COLOR_RGB2BGR) | |
| except Exception as e: | |
| logger.error(f"[Pipeline] CodeFormer failed, using raw swap: {e}") | |
| enhanced_bgr = swapped_bgr | |
| logger.info(f"[Pipeline] Enhancement: {time.time()-t0:.2f}s") | |
| final_path = os.path.join(temp_dir, f"result_{uuid.uuid4().hex[:8]}.png") | |
| cv2.imwrite(final_path, enhanced_bgr) | |
| final_img = cv2.cvtColor(enhanced_bgr, cv2.COLOR_BGR2RGB) | |
| logger.info(f"[Pipeline] TOTAL face_swap_and_enhance: {time.time()-pipeline_start:.2f}s") | |
| return final_img, final_path, "" | |
| except Exception as e: | |
| return None, None, f"❌ Error: {str(e)}" | |
| def compress_image( | |
| image_bytes: bytes, | |
| max_size=(1280, 1280), # max width/height | |
| quality=75 # JPEG quality (60–80 is ideal) | |
| ) -> bytes: | |
| """ | |
| Compress image by resizing and lowering quality. | |
| Returns compressed image bytes. | |
| """ | |
| img = Image.open(io.BytesIO(image_bytes)).convert("RGB") | |
| # Resize while maintaining aspect ratio | |
| img.thumbnail(max_size, Image.LANCZOS) | |
| output = io.BytesIO() | |
| img.save( | |
| output, | |
| format="JPEG", | |
| quality=quality, | |
| optimize=True, | |
| progressive=True | |
| ) | |
| return output.getvalue() | |
| # --------------------- DigitalOcean Spaces Helper --------------------- | |
| def get_spaces_client(): | |
| session = boto3.session.Session() | |
| client = session.client( | |
| 's3', | |
| region_name=DO_SPACES_REGION, | |
| endpoint_url=DO_SPACES_ENDPOINT, | |
| aws_access_key_id=DO_SPACES_KEY, | |
| aws_secret_access_key=DO_SPACES_SECRET, | |
| config=Config(signature_version='s3v4') | |
| ) | |
| return client | |
| def upload_to_spaces(file_bytes, key, content_type="image/png"): | |
| client = get_spaces_client() | |
| client.put_object(Bucket=DO_SPACES_BUCKET, Key=key, Body=file_bytes, ContentType=content_type, ACL='public-read') | |
| return f"{DO_SPACES_ENDPOINT}/{DO_SPACES_BUCKET}/{key}" | |
| def download_from_spaces(key): | |
| client = get_spaces_client() | |
| obj = client.get_object(Bucket=DO_SPACES_BUCKET, Key=key) | |
| return obj['Body'].read() | |
| def mandatory_enhancement(rgb_img): | |
| """ | |
| Always runs CodeFormer on the final image. | |
| Fail-safe: returns original if enhancement fails. | |
| """ | |
| try: | |
| return enhance_image_with_codeformer(rgb_img) | |
| except Exception as e: | |
| logger.error(f"CodeFormer failed, returning original: {e}") | |
| return rgb_img | |
| # --------------------- API Endpoints --------------------- | |
| async def root(): | |
| """Root endpoint""" | |
| return { | |
| "success": True, | |
| "message": "FaceSwap API", | |
| "data": { | |
| "version": "1.0.0", | |
| "Product Name":"Beauty Camera - GlowCam AI Studio", | |
| "Released By" : "LogicGo Infotech" | |
| } | |
| } | |
| async def health(): | |
| return {"status": "healthy"} | |
| async def test_admin_db(): | |
| try: | |
| doc = await admin_db.list_collection_names() | |
| return {"ok": True, "collections": doc} | |
| except Exception as e: | |
| return {"ok": False, "error": str(e), "url": ADMIN_MONGO_URL} | |
| async def face_swap_api( | |
| source: UploadFile = File(...), | |
| image2: Optional[UploadFile] = File(None), | |
| target_category_id: str = Form(None), | |
| new_category_id: str = Form(None), | |
| user_id: Optional[str] = Form(None), | |
| appname: Optional[str] = Form(None), | |
| credentials: HTTPAuthorizationCredentials = Security(security) | |
| ): | |
| start_time = datetime.utcnow() | |
| try: | |
| # ------------------------------------------------------------------ | |
| # VALIDATION | |
| # ------------------------------------------------------------------ | |
| # -------------------------------------------------------------- | |
| # BACKWARD COMPATIBILITY FOR OLD ANDROID VERSIONS | |
| # -------------------------------------------------------------- | |
| if target_category_id == "": | |
| target_category_id = None | |
| if new_category_id == "": | |
| new_category_id = None | |
| if user_id == "": | |
| user_id = None | |
| # media_clicks_collection = get_media_clicks_collection(appname) | |
| media_clicks_collection, subcategories_collection = get_app_db_collections(appname) | |
| logger.info(f"[FaceSwap] Incoming request → target_category_id={target_category_id}, new_category_id={new_category_id}, user_id={user_id}") | |
| if target_category_id and new_category_id: | |
| raise HTTPException(400, "Provide only one of new_category_id or target_category_id.") | |
| if not target_category_id and not new_category_id: | |
| raise HTTPException(400, "Either new_category_id or target_category_id is required.") | |
| # ------------------------------------------------------------------ | |
| # READ SOURCE IMAGE | |
| # ------------------------------------------------------------------ | |
| src_bytes = await source.read() | |
| src_key = f"faceswap/source/{uuid.uuid4().hex}_{source.filename}" | |
| upload_to_spaces(src_bytes, src_key, content_type=source.content_type) | |
| # ------------------------------------------------------------------ | |
| # CASE 1 : new_category_id → MongoDB lookup | |
| # ------------------------------------------------------------------ | |
| if new_category_id: | |
| # doc = await subcategories_col.find_one({ | |
| # "asset_images._id": ObjectId(new_category_id) | |
| # }) | |
| doc = await subcategories_collection.find_one({ | |
| "asset_images._id": ObjectId(new_category_id) | |
| }) | |
| if not doc: | |
| raise HTTPException(404, "Asset image not found in database") | |
| # extract correct asset | |
| asset = next( | |
| (img for img in doc["asset_images"] if str(img["_id"]) == new_category_id), | |
| None | |
| ) | |
| if not asset: | |
| raise HTTPException(404, "Asset image URL not found") | |
| # correct URL | |
| target_url = asset["url"] | |
| # correct categoryId (ObjectId) | |
| #category_oid = doc["categoryId"] # <-- DO NOT CONVERT TO STRING | |
| subcategory_oid = doc["_id"] | |
| # ------------------------------------------------------------------# | |
| # # MEDIA_CLICKS (ONLY IF user_id PRESENT) | |
| # ------------------------------------------------------------------# | |
| if user_id and media_clicks_collection is not None: | |
| try: | |
| user_id_clean = user_id.strip() | |
| if not user_id_clean: | |
| raise ValueError("user_id cannot be empty") | |
| try: | |
| user_oid = ObjectId(user_id_clean) | |
| except (InvalidId, ValueError) as e: | |
| logger.error(f"Invalid user_id format: {user_id_clean}") | |
| raise ValueError(f"Invalid user_id format: {user_id_clean}") | |
| now = datetime.utcnow() | |
| # Normalize dates (UTC midnight) | |
| today_date = datetime(now.year, now.month, now.day) | |
| # ------------------------------------------------- | |
| # STEP 1: Ensure root document exists | |
| # ------------------------------------------------- | |
| await media_clicks_collection.update_one( | |
| {"userId": user_oid}, | |
| { | |
| "$setOnInsert": { | |
| "userId": user_oid, | |
| "createdAt": now, | |
| "ai_edit_complete": 0, | |
| "ai_edit_daily_count": [] | |
| } | |
| }, | |
| upsert=True | |
| ) | |
| # ------------------------------------------------- | |
| # STEP 2: Handle DAILY USAGE (BINARY, NO DUPLICATES) | |
| # ------------------------------------------------- | |
| doc = await media_clicks_collection.find_one( | |
| {"userId": user_oid}, | |
| {"ai_edit_daily_count": 1} | |
| ) | |
| daily_entries = doc.get("ai_edit_daily_count", []) if doc else [] | |
| # Normalize today to UTC midnight | |
| today_date = datetime(now.year, now.month, now.day) | |
| # Build normalized date → count map (THIS ENFORCES UNIQUENESS) | |
| daily_map = {} | |
| for entry in daily_entries: | |
| d = entry["date"] | |
| if isinstance(d, datetime): | |
| d = datetime(d.year, d.month, d.day) | |
| daily_map[d] = entry["count"] # overwrite = no duplicates | |
| # Determine last recorded date | |
| last_date = max(daily_map.keys()) if daily_map else today_date | |
| # Fill ALL missing days with count = 0 | |
| next_day = last_date + timedelta(days=1) | |
| while next_day < today_date: | |
| daily_map.setdefault(next_day, 0) | |
| next_day += timedelta(days=1) | |
| # Mark today as used (binary) | |
| daily_map[today_date] = 1 | |
| # Rebuild list: OLDEST → NEWEST | |
| final_daily_entries = [ | |
| {"date": d, "count": daily_map[d]} | |
| for d in sorted(daily_map.keys()) | |
| ] | |
| # Keep only last 32 days | |
| final_daily_entries = final_daily_entries[-32:] | |
| # Atomic replace | |
| await media_clicks_collection.update_one( | |
| {"userId": user_oid}, | |
| { | |
| "$set": { | |
| "ai_edit_daily_count": final_daily_entries, | |
| "updatedAt": now | |
| } | |
| } | |
| ) | |
| # ------------------------------------------------- | |
| # STEP 3: Try updating existing subCategory | |
| # ------------------------------------------------- | |
| update_result = await media_clicks_collection.update_one( | |
| { | |
| "userId": user_oid, | |
| "subCategories.subCategoryId": subcategory_oid | |
| }, | |
| { | |
| "$inc": { | |
| "subCategories.$.click_count": 1, | |
| "ai_edit_complete": 1 | |
| }, | |
| "$set": { | |
| "subCategories.$.lastClickedAt": now, | |
| "ai_edit_last_date": now, | |
| "updatedAt": now | |
| } | |
| } | |
| ) | |
| # ------------------------------------------------- | |
| # STEP 4: Push subCategory if missing | |
| # ------------------------------------------------- | |
| if update_result.matched_count == 0: | |
| await media_clicks_collection.update_one( | |
| {"userId": user_oid}, | |
| { | |
| "$inc": { | |
| "ai_edit_complete": 1 | |
| }, | |
| "$set": { | |
| "ai_edit_last_date": now, | |
| "updatedAt": now | |
| }, | |
| "$push": { | |
| "subCategories": { | |
| "subCategoryId": subcategory_oid, | |
| "click_count": 1, | |
| "lastClickedAt": now | |
| } | |
| } | |
| } | |
| ) | |
| # ------------------------------------------------- | |
| # STEP 5: Sort subCategories by lastClickedAt (ascending - oldest first) | |
| # ------------------------------------------------- | |
| user_doc = await media_clicks_collection.find_one({"userId": user_oid}) | |
| if user_doc and "subCategories" in user_doc: | |
| subcategories = user_doc["subCategories"] | |
| # Sort by lastClickedAt in ascending order (oldest first) | |
| # Handle missing or None dates by using datetime.min | |
| subcategories_sorted = sorted( | |
| subcategories, | |
| key=lambda x: x.get("lastClickedAt") if x.get("lastClickedAt") is not None else datetime.min | |
| ) | |
| # Update with sorted array | |
| await media_clicks_collection.update_one( | |
| {"userId": user_oid}, | |
| { | |
| "$set": { | |
| "subCategories": subcategories_sorted, | |
| "updatedAt": now | |
| } | |
| } | |
| ) | |
| logger.info( | |
| "[MEDIA_CLICK] user=%s subCategory=%s ai_edit_complete++ daily_tracked", | |
| user_id, | |
| str(subcategory_oid) | |
| ) | |
| except Exception as media_err: | |
| logger.error(f"MEDIA_CLICK ERROR: {media_err}") | |
| elif user_id and media_clicks_collection is None: | |
| logger.warning("Media clicks collection unavailable; skipping media click tracking") | |
| # # ------------------------------------------------------------------ | |
| # # CASE 2 : target_category_id → DigitalOcean path (unchanged logic) | |
| # # ------------------------------------------------------------------ | |
| if target_category_id: | |
| client = get_spaces_client() | |
| base_prefix = "faceswap/target/" | |
| resp = client.list_objects_v2( | |
| Bucket=DO_SPACES_BUCKET, Prefix=base_prefix, Delimiter="/" | |
| ) | |
| # Extract categories from the CommonPrefixes | |
| categories = [p["Prefix"].split("/")[2] for p in resp.get("CommonPrefixes", [])] | |
| target_url = None | |
| # --- FIX STARTS HERE --- | |
| for category in categories: | |
| original_prefix = f"faceswap/target/{category}/original/" | |
| thumb_prefix = f"faceswap/target/{category}/thumb/" # Keep for file list check (optional but safe) | |
| # List objects in original/ | |
| original_objects = client.list_objects_v2( | |
| Bucket=DO_SPACES_BUCKET, Prefix=original_prefix | |
| ).get("Contents", []) | |
| # List objects in thumb/ (optional: for the old code's extra check) | |
| thumb_objects = client.list_objects_v2( | |
| Bucket=DO_SPACES_BUCKET, Prefix=thumb_prefix | |
| ).get("Contents", []) | |
| # Extract only the filenames and filter for .png | |
| original_filenames = sorted([ | |
| obj["Key"].split("/")[-1] for obj in original_objects | |
| if obj["Key"].split("/")[-1].endswith(".png") | |
| ]) | |
| thumb_filenames = [ | |
| obj["Key"].split("/")[-1] for obj in thumb_objects | |
| ] | |
| # Replicate the old indexing logic based on sorted filenames | |
| for idx, filename in enumerate(original_filenames, start=1): | |
| cid = f"{category.lower()}image_{idx}" | |
| # Optional: Replicate the thumb file check for 100% parity | |
| # if filename in thumb_filenames and cid == target_category_id: | |
| # Simpler check just on the ID, assuming thumb files are present | |
| if cid == target_category_id: | |
| # Construct the final target URL using the full prefix and the filename | |
| target_url = f"{DO_SPACES_ENDPOINT}/{DO_SPACES_BUCKET}/{original_prefix}{filename}" | |
| break | |
| if target_url: | |
| break | |
| # --- FIX ENDS HERE --- | |
| if not target_url: | |
| raise HTTPException(404, "Target categoryId not found") | |
| # # ------------------------------------------------------------------ | |
| # # DOWNLOAD TARGET IMAGE | |
| # # ------------------------------------------------------------------ | |
| async with httpx.AsyncClient(timeout=30.0) as client: | |
| response = await client.get(target_url) | |
| response.raise_for_status() | |
| tgt_bytes = response.content | |
| src_bgr = cv2.imdecode(np.frombuffer(src_bytes, np.uint8), cv2.IMREAD_COLOR) | |
| tgt_bgr = cv2.imdecode(np.frombuffer(tgt_bytes, np.uint8), cv2.IMREAD_COLOR) | |
| if src_bgr is None or tgt_bgr is None: | |
| raise HTTPException(400, "Invalid image data") | |
| src_rgb = cv2.cvtColor(src_bgr, cv2.COLOR_BGR2RGB) | |
| tgt_rgb = cv2.cvtColor(tgt_bgr, cv2.COLOR_BGR2RGB) | |
| # ------------------------------------------------------------------ | |
| # READ OPTIONAL IMAGE2 | |
| # ------------------------------------------------------------------ | |
| img2_rgb = None | |
| if image2: | |
| img2_bytes = await image2.read() | |
| img2_bgr = cv2.imdecode(np.frombuffer(img2_bytes, np.uint8), cv2.IMREAD_COLOR) | |
| if img2_bgr is not None: | |
| img2_rgb = cv2.cvtColor(img2_bgr, cv2.COLOR_BGR2RGB) | |
| # ------------------------------------------------------------------ | |
| # FACE SWAP EXECUTION (run in thread to not block event loop) | |
| # ------------------------------------------------------------------ | |
| if img2_rgb is not None: | |
| def _couple_swap(): | |
| pipeline_start = time.time() | |
| src_images = [src_rgb, img2_rgb] | |
| all_src_faces = [] | |
| t0 = time.time() | |
| for img in src_images: | |
| faces = face_analysis_app.get(cv2.cvtColor(img, cv2.COLOR_RGB2BGR)) | |
| all_src_faces.extend(faces) | |
| tgt_faces = face_analysis_app.get(cv2.cvtColor(tgt_rgb, cv2.COLOR_RGB2BGR)) | |
| logger.info(f"[Pipeline] Couple face detection: {time.time()-t0:.2f}s") | |
| if not all_src_faces: | |
| raise ValueError("No faces detected in source images") | |
| if not tgt_faces: | |
| raise ValueError("No faces detected in target image") | |
| def face_sort_key(face): | |
| x1, y1, x2, y2 = face.bbox | |
| area = (x2 - x1) * (y2 - y1) | |
| cx = (x1 + x2) / 2 | |
| return (-area, cx) | |
| src_male = sorted([f for f in all_src_faces if f.gender == 1], key=face_sort_key) | |
| src_female = sorted([f for f in all_src_faces if f.gender == 0], key=face_sort_key) | |
| tgt_male = sorted([f for f in tgt_faces if f.gender == 1], key=face_sort_key) | |
| tgt_female = sorted([f for f in tgt_faces if f.gender == 0], key=face_sort_key) | |
| pairs = [] | |
| for s, t in zip(src_male, tgt_male): | |
| pairs.append((s, t)) | |
| for s, t in zip(src_female, tgt_female): | |
| pairs.append((s, t)) | |
| if not pairs: | |
| src_all = sorted(all_src_faces, key=face_sort_key) | |
| tgt_all = sorted(tgt_faces, key=face_sort_key) | |
| pairs = list(zip(src_all, tgt_all)) | |
| t0 = time.time() | |
| with swap_lock: | |
| result_img = cv2.cvtColor(tgt_rgb, cv2.COLOR_RGB2BGR) | |
| for src_face, _ in pairs: | |
| current_faces = sorted(face_analysis_app.get(result_img), key=face_sort_key) | |
| candidates = [f for f in current_faces if f.gender == src_face.gender] or current_faces | |
| target_face = candidates[0] | |
| result_img = swapper.get(result_img, target_face, src_face, paste_back=True) | |
| logger.info(f"[Pipeline] Couple face swap: {time.time()-t0:.2f}s") | |
| result_rgb_out = cv2.cvtColor(result_img, cv2.COLOR_BGR2RGB) | |
| t0 = time.time() | |
| enhanced_rgb = mandatory_enhancement(result_rgb_out) | |
| logger.info(f"[Pipeline] Couple enhancement: {time.time()-t0:.2f}s") | |
| enhanced_bgr = cv2.cvtColor(enhanced_rgb, cv2.COLOR_RGB2BGR) | |
| temp_dir = tempfile.mkdtemp(prefix="faceswap_") | |
| final_path = os.path.join(temp_dir, "result.png") | |
| cv2.imwrite(final_path, enhanced_bgr) | |
| with open(final_path, "rb") as f: | |
| result_bytes = f.read() | |
| logger.info(f"[Pipeline] TOTAL couple swap: {time.time()-pipeline_start:.2f}s") | |
| return result_bytes | |
| try: | |
| result_bytes = await asyncio.to_thread(_couple_swap) | |
| except ValueError as ve: | |
| raise HTTPException(400, str(ve)) | |
| else: | |
| # ----- SINGLE SOURCE SWAP (run in thread) ----- | |
| def _single_swap(): | |
| return face_swap_and_enhance(src_rgb, tgt_rgb) | |
| final_img, final_path, err = await asyncio.to_thread(_single_swap) | |
| if err: | |
| raise HTTPException(500, err) | |
| with open(final_path, "rb") as f: | |
| result_bytes = f.read() | |
| result_key = f"faceswap/result/{uuid.uuid4().hex}_enhanced.png" | |
| result_url = upload_to_spaces(result_bytes, result_key) | |
| # ------------------------------------------------- | |
| # COMPRESS IMAGE (2–3 MB target) | |
| # ------------------------------------------------- | |
| compressed_bytes = compress_image( | |
| image_bytes=result_bytes, | |
| max_size=(1280, 1280), | |
| quality=72 | |
| ) | |
| compressed_key = f"faceswap/result/{uuid.uuid4().hex}_enhanced_compressed.jpg" | |
| compressed_url = upload_to_spaces( | |
| compressed_bytes, | |
| compressed_key, | |
| content_type="image/jpeg" | |
| ) | |
| end_time = datetime.utcnow() | |
| response_time_ms = (end_time - start_time).total_seconds() * 1000 | |
| if database is not None: | |
| log_entry = { | |
| "endpoint": "/face-swap", | |
| "status": "success", | |
| "response_time_ms": response_time_ms, | |
| "timestamp": end_time | |
| } | |
| if appname: | |
| log_entry["appname"] = appname | |
| await database.api_logs.insert_one(log_entry) | |
| return { | |
| "result_key": result_key, | |
| "result_url": result_url, | |
| "Compressed_Image_URL": compressed_url | |
| } | |
| except Exception as e: | |
| end_time = datetime.utcnow() | |
| response_time_ms = (end_time - start_time).total_seconds() * 1000 | |
| if database is not None: | |
| log_entry = { | |
| "endpoint": "/face-swap", | |
| "status": "fail", | |
| "response_time_ms": response_time_ms, | |
| "timestamp": end_time, | |
| "error": str(e) | |
| } | |
| if appname: | |
| log_entry["appname"] = appname | |
| await database.api_logs.insert_one(log_entry) | |
| raise HTTPException(500, f"Face swap failed: {str(e)}") | |
| async def preview_result(result_key: str): | |
| try: | |
| img_bytes = download_from_spaces(result_key) | |
| except Exception: | |
| raise HTTPException(status_code=404, detail="Result not found") | |
| return Response( | |
| content=img_bytes, | |
| media_type="image/png", | |
| headers={"Content-Disposition": "inline; filename=result.png"} | |
| ) | |
| async def multi_face_swap_api( | |
| source_image: UploadFile = File(...), | |
| target_image: UploadFile = File(...) | |
| ): | |
| start_time = datetime.utcnow() | |
| try: | |
| # ----------------------------- | |
| # Read images | |
| # ----------------------------- | |
| src_bytes = await source_image.read() | |
| tgt_bytes = await target_image.read() | |
| src_bgr = cv2.imdecode(np.frombuffer(src_bytes, np.uint8), cv2.IMREAD_COLOR) | |
| tgt_bgr = cv2.imdecode(np.frombuffer(tgt_bytes, np.uint8), cv2.IMREAD_COLOR) | |
| if src_bgr is None or tgt_bgr is None: | |
| raise HTTPException(400, "Invalid image data") | |
| src_rgb = cv2.cvtColor(src_bgr, cv2.COLOR_BGR2RGB) | |
| tgt_rgb = cv2.cvtColor(tgt_bgr, cv2.COLOR_BGR2RGB) | |
| # ----------------------------- | |
| # Multi-face swap (run in thread to not block event loop) | |
| # ----------------------------- | |
| def _multi_swap_and_enhance(): | |
| swapped_rgb = multi_face_swap(src_rgb, tgt_rgb) | |
| return mandatory_enhancement(swapped_rgb) | |
| final_rgb = await asyncio.to_thread(_multi_swap_and_enhance) | |
| final_bgr = cv2.cvtColor(final_rgb, cv2.COLOR_RGB2BGR) | |
| # ----------------------------- | |
| # Save temp result | |
| # ----------------------------- | |
| temp_dir = tempfile.mkdtemp(prefix="multi_faceswap_") | |
| result_path = os.path.join(temp_dir, "result.png") | |
| cv2.imwrite(result_path, final_bgr) | |
| with open(result_path, "rb") as f: | |
| result_bytes = f.read() | |
| # ----------------------------- | |
| # Upload | |
| # ----------------------------- | |
| result_key = f"faceswap/multi/{uuid.uuid4().hex}.png" | |
| result_url = upload_to_spaces( | |
| result_bytes, | |
| result_key, | |
| content_type="image/png" | |
| ) | |
| return { | |
| "result_key": result_key, | |
| "result_url": result_url | |
| } | |
| except Exception as e: | |
| raise HTTPException(status_code=500, detail=str(e)) | |
| async def face_swap_couple_api( | |
| image1: UploadFile = File(...), | |
| image2: Optional[UploadFile] = File(None), | |
| target_category_id: str = Form(None), | |
| new_category_id: str = Form(None), | |
| user_id: Optional[str] = Form(None), | |
| appname: Optional[str] = Form(None), | |
| credentials: HTTPAuthorizationCredentials = Security(security) | |
| ): | |
| """ | |
| Production-ready face swap endpoint supporting: | |
| - Multiple source images (image1 + optional image2) | |
| - Gender-based pairing | |
| - Merged faces from multiple sources | |
| - Mandatory CodeFormer enhancement | |
| """ | |
| start_time = datetime.utcnow() | |
| try: | |
| # ----------------------------- | |
| # Validate input | |
| # ----------------------------- | |
| if target_category_id == "": | |
| target_category_id = None | |
| if new_category_id == "": | |
| new_category_id = None | |
| if user_id == "": | |
| user_id = None | |
| media_clicks_collection = get_media_clicks_collection(appname) | |
| if target_category_id and new_category_id: | |
| raise HTTPException(400, "Provide only one of new_category_id or target_category_id.") | |
| if not target_category_id and not new_category_id: | |
| raise HTTPException(400, "Either new_category_id or target_category_id is required.") | |
| logger.info(f"[FaceSwap] Incoming request → target_category_id={target_category_id}, new_category_id={new_category_id}, user_id={user_id}") | |
| # ----------------------------- | |
| # Read source images | |
| # ----------------------------- | |
| src_images = [] | |
| img1_bytes = await image1.read() | |
| src1 = cv2.imdecode(np.frombuffer(img1_bytes, np.uint8), cv2.IMREAD_COLOR) | |
| if src1 is None: | |
| raise HTTPException(400, "Invalid image1 data") | |
| src_images.append(cv2.cvtColor(src1, cv2.COLOR_BGR2RGB)) | |
| if image2: | |
| img2_bytes = await image2.read() | |
| src2 = cv2.imdecode(np.frombuffer(img2_bytes, np.uint8), cv2.IMREAD_COLOR) | |
| if src2 is not None: | |
| src_images.append(cv2.cvtColor(src2, cv2.COLOR_BGR2RGB)) | |
| # ----------------------------- | |
| # Resolve target image | |
| # ----------------------------- | |
| target_url = None | |
| if new_category_id: | |
| doc = await subcategories_col.find_one({ | |
| "asset_images._id": ObjectId(new_category_id) | |
| }) | |
| if not doc: | |
| raise HTTPException(404, "Asset image not found in database") | |
| asset = next( | |
| (img for img in doc["asset_images"] if str(img["_id"]) == new_category_id), | |
| None | |
| ) | |
| if not asset: | |
| raise HTTPException(404, "Asset image URL not found") | |
| target_url = asset["url"] | |
| subcategory_oid = doc["_id"] | |
| if user_id and media_clicks_collection is not None: | |
| try: | |
| user_id_clean = user_id.strip() | |
| if not user_id_clean: | |
| raise ValueError("user_id cannot be empty") | |
| try: | |
| user_oid = ObjectId(user_id_clean) | |
| except (InvalidId, ValueError): | |
| logger.error(f"Invalid user_id format: {user_id_clean}") | |
| raise ValueError(f"Invalid user_id format: {user_id_clean}") | |
| now = datetime.utcnow() | |
| # Step 1: ensure root document exists | |
| await media_clicks_collection.update_one( | |
| {"userId": user_oid}, | |
| { | |
| "$setOnInsert": { | |
| "userId": user_oid, | |
| "createdAt": now, | |
| "ai_edit_complete": 0, | |
| "ai_edit_daily_count": [] | |
| } | |
| }, | |
| upsert=True | |
| ) | |
| # Step 2: handle daily usage (binary, no duplicates) | |
| doc = await media_clicks_collection.find_one( | |
| {"userId": user_oid}, | |
| {"ai_edit_daily_count": 1} | |
| ) | |
| daily_entries = doc.get("ai_edit_daily_count", []) if doc else [] | |
| today_date = datetime(now.year, now.month, now.day) | |
| daily_map = {} | |
| for entry in daily_entries: | |
| d = entry["date"] | |
| if isinstance(d, datetime): | |
| d = datetime(d.year, d.month, d.day) | |
| daily_map[d] = entry["count"] | |
| last_date = max(daily_map.keys()) if daily_map else None | |
| if last_date != today_date: | |
| daily_map[today_date] = 1 | |
| final_daily_entries = [ | |
| {"date": d, "count": daily_map[d]} | |
| for d in sorted(daily_map.keys()) | |
| ] | |
| final_daily_entries = final_daily_entries[-32:] | |
| await media_clicks_collection.update_one( | |
| {"userId": user_oid}, | |
| { | |
| "$set": { | |
| "ai_edit_daily_count": final_daily_entries, | |
| "updatedAt": now | |
| } | |
| } | |
| ) | |
| # Step 3: try updating existing subCategory | |
| update_result = await media_clicks_collection.update_one( | |
| { | |
| "userId": user_oid, | |
| "subCategories.subCategoryId": subcategory_oid | |
| }, | |
| { | |
| "$inc": { | |
| "subCategories.$.click_count": 1, | |
| "ai_edit_complete": 1 | |
| }, | |
| "$set": { | |
| "subCategories.$.lastClickedAt": now, | |
| "ai_edit_last_date": now, | |
| "updatedAt": now | |
| } | |
| } | |
| ) | |
| # Step 4: push subCategory if missing | |
| if update_result.matched_count == 0: | |
| await media_clicks_collection.update_one( | |
| {"userId": user_oid}, | |
| { | |
| "$inc": { | |
| "ai_edit_complete": 1 | |
| }, | |
| "$set": { | |
| "ai_edit_last_date": now, | |
| "updatedAt": now | |
| }, | |
| "$push": { | |
| "subCategories": { | |
| "subCategoryId": subcategory_oid, | |
| "click_count": 1, | |
| "lastClickedAt": now | |
| } | |
| } | |
| } | |
| ) | |
| # Step 5: sort subCategories by lastClickedAt (ascending) | |
| user_doc = await media_clicks_collection.find_one({"userId": user_oid}) | |
| if user_doc and "subCategories" in user_doc: | |
| subcategories = user_doc["subCategories"] | |
| subcategories_sorted = sorted( | |
| subcategories, | |
| key=lambda x: x.get("lastClickedAt") if x.get("lastClickedAt") is not None else datetime.min | |
| ) | |
| await media_clicks_collection.update_one( | |
| {"userId": user_oid}, | |
| { | |
| "$set": { | |
| "subCategories": subcategories_sorted, | |
| "updatedAt": now | |
| } | |
| } | |
| ) | |
| logger.info( | |
| "[MEDIA_CLICK] user=%s subCategory=%s ai_edit_complete++ daily_tracked", | |
| user_id, | |
| str(subcategory_oid) | |
| ) | |
| except Exception as media_err: | |
| logger.error(f"MEDIA_CLICK ERROR: {media_err}") | |
| elif user_id and media_clicks_collection is None: | |
| logger.warning("Media clicks collection unavailable; skipping media click tracking") | |
| if target_category_id: | |
| client = get_spaces_client() | |
| base_prefix = "faceswap/target/" | |
| resp = client.list_objects_v2( | |
| Bucket=DO_SPACES_BUCKET, Prefix=base_prefix, Delimiter="/" | |
| ) | |
| categories = [p["Prefix"].split("/")[2] for p in resp.get("CommonPrefixes", [])] | |
| for category in categories: | |
| original_prefix = f"faceswap/target/{category}/original/" | |
| thumb_prefix = f"faceswap/target/{category}/thumb/" | |
| original_objects = client.list_objects_v2( | |
| Bucket=DO_SPACES_BUCKET, Prefix=original_prefix | |
| ).get("Contents", []) | |
| thumb_objects = client.list_objects_v2( | |
| Bucket=DO_SPACES_BUCKET, Prefix=thumb_prefix | |
| ).get("Contents", []) | |
| original_filenames = sorted([ | |
| obj["Key"].split("/")[-1] for obj in original_objects | |
| if obj["Key"].split("/")[-1].endswith(".png") | |
| ]) | |
| for idx, filename in enumerate(original_filenames, start=1): | |
| cid = f"{category.lower()}image_{idx}" | |
| if cid == target_category_id: | |
| target_url = f"{DO_SPACES_ENDPOINT}/{DO_SPACES_BUCKET}/{original_prefix}{filename}" | |
| break | |
| if target_url: | |
| break | |
| if not target_url: | |
| raise HTTPException(404, "Target categoryId not found") | |
| async with httpx.AsyncClient(timeout=30.0) as client: | |
| response = await client.get(target_url) | |
| response.raise_for_status() | |
| tgt_bytes = response.content | |
| tgt_bgr = cv2.imdecode(np.frombuffer(tgt_bytes, np.uint8), cv2.IMREAD_COLOR) | |
| if tgt_bgr is None: | |
| raise HTTPException(400, "Invalid target image data") | |
| # ----------------------------- | |
| # Couple face swap + enhance (run in thread) | |
| # ----------------------------- | |
| def _couple_face_swap_and_enhance(): | |
| pipeline_start = time.time() | |
| all_src_faces = [] | |
| t0 = time.time() | |
| for img in src_images: | |
| faces = face_analysis_app.get(cv2.cvtColor(img, cv2.COLOR_RGB2BGR)) | |
| all_src_faces.extend(faces) | |
| tgt_faces = face_analysis_app.get(tgt_bgr) | |
| logger.info(f"[Pipeline] Couple-ep face detection: {time.time()-t0:.2f}s") | |
| if not all_src_faces: | |
| raise ValueError("No faces detected in source images") | |
| if not tgt_faces: | |
| raise ValueError("No faces detected in target image") | |
| def face_sort_key(face): | |
| x1, y1, x2, y2 = face.bbox | |
| area = (x2 - x1) * (y2 - y1) | |
| cx = (x1 + x2) / 2 | |
| return (-area, cx) | |
| src_male = sorted([f for f in all_src_faces if f.gender == 1], key=face_sort_key) | |
| src_female = sorted([f for f in all_src_faces if f.gender == 0], key=face_sort_key) | |
| tgt_male = sorted([f for f in tgt_faces if f.gender == 1], key=face_sort_key) | |
| tgt_female = sorted([f for f in tgt_faces if f.gender == 0], key=face_sort_key) | |
| pairs = [] | |
| for s, t in zip(src_male, tgt_male): | |
| pairs.append((s, t)) | |
| for s, t in zip(src_female, tgt_female): | |
| pairs.append((s, t)) | |
| if not pairs: | |
| src_all = sorted(all_src_faces, key=face_sort_key) | |
| tgt_all = sorted(tgt_faces, key=face_sort_key) | |
| pairs = list(zip(src_all, tgt_all)) | |
| t0 = time.time() | |
| with swap_lock: | |
| result_img = tgt_bgr.copy() | |
| for src_face, _ in pairs: | |
| current_faces = sorted(face_analysis_app.get(result_img), key=face_sort_key) | |
| candidates = [f for f in current_faces if f.gender == src_face.gender] or current_faces | |
| target_face = candidates[0] | |
| result_img = swapper.get(result_img, target_face, src_face, paste_back=True) | |
| logger.info(f"[Pipeline] Couple-ep face swap: {time.time()-t0:.2f}s") | |
| result_rgb = cv2.cvtColor(result_img, cv2.COLOR_BGR2RGB) | |
| t0 = time.time() | |
| enhanced_rgb = mandatory_enhancement(result_rgb) | |
| logger.info(f"[Pipeline] Couple-ep enhancement: {time.time()-t0:.2f}s") | |
| enhanced_bgr = cv2.cvtColor(enhanced_rgb, cv2.COLOR_RGB2BGR) | |
| temp_dir = tempfile.mkdtemp(prefix="faceswap_") | |
| final_path = os.path.join(temp_dir, "result.png") | |
| cv2.imwrite(final_path, enhanced_bgr) | |
| with open(final_path, "rb") as f: | |
| result_bytes = f.read() | |
| logger.info(f"[Pipeline] TOTAL couple-ep swap: {time.time()-pipeline_start:.2f}s") | |
| return result_bytes | |
| try: | |
| result_bytes = await asyncio.to_thread(_couple_face_swap_and_enhance) | |
| except ValueError as ve: | |
| raise HTTPException(400, str(ve)) | |
| result_key = f"faceswap/result/{uuid.uuid4().hex}_enhanced.png" | |
| result_url = upload_to_spaces(result_bytes, result_key) | |
| compressed_bytes = compress_image(result_bytes, max_size=(1280, 1280), quality=72) | |
| compressed_key = f"faceswap/result/{uuid.uuid4().hex}_enhanced_compressed.jpg" | |
| compressed_url = upload_to_spaces(compressed_bytes, compressed_key, content_type="image/jpeg") | |
| # ----------------------------- | |
| # Log API usage | |
| # ----------------------------- | |
| end_time = datetime.utcnow() | |
| response_time_ms = (end_time - start_time).total_seconds() * 1000 | |
| if database is not None: | |
| log_entry = { | |
| "endpoint": "/face-swap-couple", | |
| "status": "success", | |
| "response_time_ms": response_time_ms, | |
| "timestamp": end_time | |
| } | |
| if appname: | |
| log_entry["appname"] = appname | |
| await database.api_logs.insert_one(log_entry) | |
| return { | |
| "result_key": result_key, | |
| "result_url": result_url, | |
| "compressed_url": compressed_url | |
| } | |
| except Exception as e: | |
| end_time = datetime.utcnow() | |
| response_time_ms = (end_time - start_time).total_seconds() * 1000 | |
| if database is not None: | |
| log_entry = { | |
| "endpoint": "/face-swap-couple", | |
| "status": "fail", | |
| "response_time_ms": response_time_ms, | |
| "timestamp": end_time, | |
| "error": str(e) | |
| } | |
| if appname: | |
| log_entry["appname"] = appname | |
| await database.api_logs.insert_one(log_entry) | |
| raise HTTPException(500, f"Face swap failed: {str(e)}") | |
| if __name__ == "__main__": | |
| uvicorn.run(fastapi_app, host="0.0.0.0", port=7860) | |
| # # # --------------------- List Images Endpoint --------------------- | |
| # # import os | |
| # # os.environ["OMP_NUM_THREADS"] = "1" | |
| # # import shutil | |
| # # import uuid | |
| # # import cv2 | |
| # # import numpy as np | |
| # # import threading | |
| # # import subprocess | |
| # # import logging | |
| # # import tempfile | |
| # # import sys | |
| # # from datetime import datetime,timedelta | |
| # # import tempfile | |
| # # import insightface | |
| # # from insightface.app import FaceAnalysis | |
| # # from huggingface_hub import hf_hub_download | |
| # # from fastapi import FastAPI, UploadFile, File, HTTPException, Response, Depends, Security, Form | |
| # # from fastapi.responses import RedirectResponse | |
| # # from fastapi.security import HTTPBearer, HTTPAuthorizationCredentials | |
| # # from motor.motor_asyncio import AsyncIOMotorClient | |
| # # from bson import ObjectId | |
| # # from bson.errors import InvalidId | |
| # # import httpx | |
| # # import uvicorn | |
| # # import gradio as gr | |
| # # from gradio import mount_gradio_app | |
| # # from PIL import Image | |
| # # import io | |
| # # # from scipy import ndimage | |
| # # # DigitalOcean Spaces | |
| # # import boto3 | |
| # # from botocore.client import Config | |
| # # from typing import Optional | |
| # # # --------------------- Logging --------------------- | |
| # # logging.basicConfig(level=logging.INFO) | |
| # # logger = logging.getLogger(__name__) | |
| # # # --------------------- Secrets & Paths --------------------- | |
| # # REPO_ID = "HariLogicgo/face_swap_models" | |
| # # MODELS_DIR = "./models" | |
| # # os.makedirs(MODELS_DIR, exist_ok=True) | |
| # # HF_TOKEN = os.getenv("HF_TOKEN") | |
| # # API_SECRET_TOKEN = os.getenv("API_SECRET_TOKEN") | |
| # # DO_SPACES_REGION = os.getenv("DO_SPACES_REGION", "blr1") | |
| # # DO_SPACES_ENDPOINT = f"https://{DO_SPACES_REGION}.digitaloceanspaces.com" | |
| # # DO_SPACES_KEY = os.getenv("DO_SPACES_KEY") | |
| # # DO_SPACES_SECRET = os.getenv("DO_SPACES_SECRET") | |
| # # DO_SPACES_BUCKET = os.getenv("DO_SPACES_BUCKET") | |
| # # # NEW admin DB (with error handling for missing env vars) | |
| # # ADMIN_MONGO_URL = os.getenv("ADMIN_MONGO_URL") | |
| # # admin_client = None | |
| # # admin_db = None | |
| # # subcategories_col = None | |
| # # media_clicks_col = None | |
| # # if ADMIN_MONGO_URL: | |
| # # try: | |
| # # admin_client = AsyncIOMotorClient(ADMIN_MONGO_URL) | |
| # # admin_db = admin_client.adminPanel | |
| # # subcategories_col = admin_db.subcategories | |
| # # media_clicks_col = admin_db.media_clicks | |
| # # except Exception as e: | |
| # # logger.warning(f"MongoDB admin connection failed (optional): {e}") | |
| # # # Collage Maker DB (optional) | |
| # # COLLAGE_MAKER_DB_URL = os.getenv("COLLAGE_MAKER_DB_URL") | |
| # # collage_maker_client = None | |
| # # collage_maker_db = None | |
| # # collage_media_clicks_col = None | |
| # # if COLLAGE_MAKER_DB_URL: | |
| # # try: | |
| # # collage_maker_client = AsyncIOMotorClient(COLLAGE_MAKER_DB_URL) | |
| # # collage_maker_db = collage_maker_client.adminPanel | |
| # # collage_media_clicks_col = collage_maker_db.media_clicks | |
| # # except Exception as e: | |
| # # logger.warning(f"MongoDB ai-enhancer connection failed (optional): {e}") | |
| # # # AI Enhancer DB (optional) | |
| # # AI_ENHANCER_DB_URL = os.getenv("AI_ENHANCER_DB_URL") | |
| # # ai_enhancer_client = None | |
| # # ai_enhancer_db = None | |
| # # ai_enhancer_media_clicks_col = None | |
| # # ai_enhancer_subcategories_col = None | |
| # # if AI_ENHANCER_DB_URL: | |
| # # try: | |
| # # ai_enhancer_client = AsyncIOMotorClient(AI_ENHANCER_DB_URL) | |
| # # ai_enhancer_db = ai_enhancer_client.test # 🔴 test database | |
| # # ai_enhancer_media_clicks_col = ai_enhancer_db.media_clicks | |
| # # ai_enhancer_subcategories_col = ai_enhancer_db.subcategories | |
| # # except Exception as e: | |
| # # logger.warning(f"MongoDB ai-enhancer connection failed (optional): {e}") | |
| # # # OLD logs DB | |
| # # MONGODB_URL = os.getenv("MONGODB_URL") | |
| # # client = None | |
| # # database = None | |
| # # # --------------------- Download Models --------------------- | |
| # # def download_models(): | |
| # # try: | |
| # # logger.info("Downloading models...") | |
| # # inswapper_path = hf_hub_download( | |
| # # repo_id=REPO_ID, | |
| # # filename="models/inswapper_128.onnx", | |
| # # repo_type="model", | |
| # # local_dir=MODELS_DIR, | |
| # # token=HF_TOKEN | |
| # # ) | |
| # # buffalo_files = ["1k3d68.onnx", "2d106det.onnx", "genderage.onnx", "det_10g.onnx", "w600k_r50.onnx"] | |
| # # for f in buffalo_files: | |
| # # hf_hub_download( | |
| # # repo_id=REPO_ID, | |
| # # filename=f"models/buffalo_l/" + f, | |
| # # repo_type="model", | |
| # # local_dir=MODELS_DIR, | |
| # # token=HF_TOKEN | |
| # # ) | |
| # # logger.info("Models downloaded successfully.") | |
| # # return inswapper_path | |
| # # except Exception as e: | |
| # # logger.error(f"Model download failed: {e}") | |
| # # raise | |
| # # try: | |
| # # inswapper_path = download_models() | |
| # # # --------------------- Face Analysis + Swapper --------------------- | |
| # # providers = ['CUDAExecutionProvider', 'CPUExecutionProvider'] | |
| # # face_analysis_app = FaceAnalysis(name="buffalo_l", root=MODELS_DIR, providers=providers) | |
| # # face_analysis_app.prepare(ctx_id=0, det_size=(640, 640)) | |
| # # swapper = insightface.model_zoo.get_model(inswapper_path, providers=providers) | |
| # # logger.info("Face analysis models loaded successfully") | |
| # # except Exception as e: | |
| # # logger.error(f"Failed to initialize face analysis models: {e}") | |
| # # # Set defaults to prevent crash | |
| # # inswapper_path = None | |
| # # face_analysis_app = None | |
| # # swapper = None | |
| # # # --------------------- CodeFormer --------------------- | |
| # # CODEFORMER_PATH = "CodeFormer/inference_codeformer.py" | |
| # # def ensure_codeformer(): | |
| # # try: | |
| # # if not os.path.exists("CodeFormer"): | |
| # # logger.info("CodeFormer not found, cloning repository...") | |
| # # subprocess.run("git clone https://github.com/sczhou/CodeFormer.git", shell=True, check=True) | |
| # # subprocess.run("pip install -r CodeFormer/requirements.txt", shell=True, check=False) # Non-critical deps | |
| # # # Always ensure BasicSR is installed from local directory | |
| # # # This is needed for Hugging Face Spaces where BasicSR can't be installed from GitHub | |
| # # if os.path.exists("CodeFormer/basicsr/setup.py"): | |
| # # logger.info("Installing BasicSR from local directory...") | |
| # # subprocess.run("python CodeFormer/basicsr/setup.py develop", shell=True, check=True) | |
| # # logger.info("BasicSR installed successfully") | |
| # # # Install realesrgan after BasicSR is installed (realesrgan depends on BasicSR) | |
| # # # This must be done after BasicSR installation to avoid PyPI install issues | |
| # # try: | |
| # # import realesrgan | |
| # # logger.info("RealESRGAN already installed") | |
| # # except ImportError: | |
| # # logger.info("Installing RealESRGAN...") | |
| # # subprocess.run("pip install --no-cache-dir realesrgan", shell=True, check=True) | |
| # # logger.info("RealESRGAN installed successfully") | |
| # # # Download models if CodeFormer exists (fixed logic) | |
| # # if os.path.exists("CodeFormer"): | |
| # # try: | |
| # # subprocess.run("python CodeFormer/scripts/download_pretrained_models.py facelib", shell=True, check=False, timeout=300) | |
| # # except (subprocess.TimeoutExpired, subprocess.CalledProcessError): | |
| # # logger.warning("Failed to download facelib models (optional)") | |
| # # try: | |
| # # subprocess.run("python CodeFormer/scripts/download_pretrained_models.py CodeFormer", shell=True, check=False, timeout=300) | |
| # # except (subprocess.TimeoutExpired, subprocess.CalledProcessError): | |
| # # logger.warning("Failed to download CodeFormer models (optional)") | |
| # # except Exception as e: | |
| # # logger.error(f"CodeFormer setup failed: {e}") | |
| # # logger.warning("Continuing without CodeFormer features...") | |
| # # ensure_codeformer() | |
| # # # --------------------- FastAPI --------------------- | |
| # # fastapi_app = FastAPI() | |
| # # @fastapi_app.on_event("startup") | |
| # # async def startup_db(): | |
| # # global client, database | |
| # # if MONGODB_URL: | |
| # # try: | |
| # # logger.info("Initializing MongoDB for API logs...") | |
| # # client = AsyncIOMotorClient(MONGODB_URL) | |
| # # database = client.FaceSwap | |
| # # logger.info("MongoDB initialized for API logs") | |
| # # except Exception as e: | |
| # # logger.warning(f"MongoDB connection failed (optional): {e}") | |
| # # client = None | |
| # # database = None | |
| # # else: | |
| # # logger.warning("MONGODB_URL not set, skipping MongoDB initialization") | |
| # # @fastapi_app.on_event("shutdown") | |
| # # async def shutdown_db(): | |
| # # global client, admin_client, collage_maker_client | |
| # # if client is not None: | |
| # # client.close() | |
| # # logger.info("MongoDB connection closed") | |
| # # if admin_client is not None: | |
| # # admin_client.close() | |
| # # logger.info("Admin MongoDB connection closed") | |
| # # if collage_maker_client is not None: | |
| # # collage_maker_client.close() | |
| # # logger.info("Collage Maker MongoDB connection closed") | |
| # # # --------------------- Auth --------------------- | |
| # # security = HTTPBearer() | |
| # # def verify_token(credentials: HTTPAuthorizationCredentials = Security(security)): | |
| # # if credentials.credentials != API_SECRET_TOKEN: | |
| # # raise HTTPException(status_code=401, detail="Invalid or missing token") | |
| # # return credentials.credentials | |
| # # # --------------------- DB Selector --------------------- | |
| # # # def get_media_clicks_collection(appname: Optional[str] = None): | |
| # # # """ | |
| # # # Returns the correct media_clicks collection based on appname. | |
| # # # Defaults to the primary admin database when no appname is provided | |
| # # # or when the requested database is unavailable. | |
| # # # """ | |
| # # # if appname: | |
| # # # normalized = appname.strip().lower() | |
| # # # if normalized == "collage-maker": | |
| # # # if collage_media_clicks_col is not None: | |
| # # # return collage_media_clicks_col | |
| # # # logger.warning("COLLAGE_MAKER_DB_URL not configured; falling back to default media_clicks collection") | |
| # # # return media_clicks_col | |
| # # def get_app_db_collections(appname: Optional[str] = None): | |
| # # """ | |
| # # Returns (media_clicks_collection, subcategories_collection) | |
| # # based on appname. | |
| # # """ | |
| # # if appname: | |
| # # app = appname.strip().lower() | |
| # # if app == "collage-maker": | |
| # # if collage_media_clicks_col is not None and subcategories_col is not None: | |
| # # return collage_media_clicks_col, subcategories_col | |
| # # logger.warning("Collage-maker DB not configured, falling back to admin") | |
| # # elif app == "ai-enhancer": | |
| # # if ai_enhancer_media_clicks_col is not None and ai_enhancer_subcategories_col is not None: | |
| # # return ai_enhancer_media_clicks_col, ai_enhancer_subcategories_col | |
| # # logger.warning("AI-Enhancer DB not configured, falling back to admin") | |
| # # # default fallback | |
| # # return media_clicks_col, subcategories_col | |
| # # # --------------------- Logging API Hits --------------------- | |
| # # async def log_faceswap_hit(token: str, status: str = "success"): | |
| # # global database | |
| # # if database is None: | |
| # # return | |
| # # await database.api_logs.insert_one({ | |
| # # "token": token, | |
| # # "endpoint": "/faceswap", | |
| # # "status": status, | |
| # # "timestamp": datetime.utcnow() | |
| # # }) | |
| # # # --------------------- Face Swap Pipeline --------------------- | |
| # # swap_lock = threading.Lock() | |
| # # def enhance_image_with_codeformer(rgb_img, temp_dir=None): | |
| # # if temp_dir is None: | |
| # # temp_dir = os.path.join(tempfile.gettempdir(), f"enhance_{uuid.uuid4().hex[:8]}") | |
| # # os.makedirs(temp_dir, exist_ok=True) | |
| # # input_path = os.path.join(temp_dir, "input.jpg") | |
| # # cv2.imwrite(input_path, cv2.cvtColor(rgb_img, cv2.COLOR_RGB2BGR)) | |
| # # python_cmd = sys.executable if sys.executable else "python3" | |
| # # cmd = ( | |
| # # f"{python_cmd} {CODEFORMER_PATH} " | |
| # # f"-w 0.7 " | |
| # # f"--input_path {input_path} " | |
| # # f"--output_path {temp_dir} " | |
| # # f"--bg_upsampler realesrgan " | |
| # # f"--face_upsample" | |
| # # ) | |
| # # result = subprocess.run(cmd, shell=True, capture_output=True, text=True) | |
| # # if result.returncode != 0: | |
| # # raise RuntimeError(result.stderr) | |
| # # final_dir = os.path.join(temp_dir, "final_results") | |
| # # files = [f for f in os.listdir(final_dir) if f.endswith(".png")] | |
| # # if not files: | |
| # # raise RuntimeError("No enhanced output") | |
| # # final_path = os.path.join(final_dir, files[0]) | |
| # # enhanced = cv2.imread(final_path) | |
| # # return cv2.cvtColor(enhanced, cv2.COLOR_BGR2RGB) | |
| # # def multi_face_swap(src_img, tgt_img): | |
| # # src_bgr = cv2.cvtColor(src_img, cv2.COLOR_RGB2BGR) | |
| # # tgt_bgr = cv2.cvtColor(tgt_img, cv2.COLOR_RGB2BGR) | |
| # # src_faces = face_analysis_app.get(src_bgr) | |
| # # tgt_faces = face_analysis_app.get(tgt_bgr) | |
| # # if not src_faces or not tgt_faces: | |
| # # raise ValueError("No faces detected") | |
| # # def face_sort_key(face): | |
| # # x1, y1, x2, y2 = face.bbox | |
| # # area = (x2 - x1) * (y2 - y1) | |
| # # cx = (x1 + x2) / 2 | |
| # # return (-area, cx) | |
| # # # Split by gender | |
| # # src_male = [f for f in src_faces if f.gender == 1] | |
| # # src_female = [f for f in src_faces if f.gender == 0] | |
| # # tgt_male = [f for f in tgt_faces if f.gender == 1] | |
| # # tgt_female = [f for f in tgt_faces if f.gender == 0] | |
| # # # Sort inside gender groups | |
| # # src_male = sorted(src_male, key=face_sort_key) | |
| # # src_female = sorted(src_female, key=face_sort_key) | |
| # # tgt_male = sorted(tgt_male, key=face_sort_key) | |
| # # tgt_female = sorted(tgt_female, key=face_sort_key) | |
| # # # Build final swap pairs | |
| # # pairs = [] | |
| # # for s, t in zip(src_male, tgt_male): | |
| # # pairs.append((s, t)) | |
| # # for s, t in zip(src_female, tgt_female): | |
| # # pairs.append((s, t)) | |
| # # # Fallback if gender mismatch | |
| # # if not pairs: | |
| # # src_faces = sorted(src_faces, key=face_sort_key) | |
| # # tgt_faces = sorted(tgt_faces, key=face_sort_key) | |
| # # pairs = list(zip(src_faces, tgt_faces)) | |
| # # result_img = tgt_bgr.copy() | |
| # # for src_face, _ in pairs: | |
| # # # 🔁 re-detect current target faces | |
| # # if face_analysis_app is None: | |
| # # raise ValueError("Face analysis models not initialized. Please ensure models are downloaded.") | |
| # # current_faces = face_analysis_app.get(result_img) | |
| # # current_faces = sorted(current_faces, key=face_sort_key) | |
| # # # choose best matching gender | |
| # # candidates = [ | |
| # # f for f in current_faces if f.gender == src_face.gender | |
| # # ] or current_faces | |
| # # target_face = candidates[0] | |
| # # if swapper is None: | |
| # # raise ValueError("Face swap models not initialized. Please ensure models are downloaded.") | |
| # # result_img = swapper.get( | |
| # # result_img, | |
| # # target_face, | |
| # # src_face, | |
| # # paste_back=True | |
| # # ) | |
| # # return cv2.cvtColor(result_img, cv2.COLOR_BGR2RGB) | |
| # # def face_swap_and_enhance(src_img, tgt_img, temp_dir=None): | |
| # # try: | |
| # # with swap_lock: | |
| # # # Use a temp dir for intermediate files | |
| # # if temp_dir is None: | |
| # # temp_dir = os.path.join(tempfile.gettempdir(), f"faceswap_work_{uuid.uuid4().hex[:8]}") | |
| # # if os.path.exists(temp_dir): | |
| # # shutil.rmtree(temp_dir) | |
| # # os.makedirs(temp_dir, exist_ok=True) | |
| # # src_bgr = cv2.cvtColor(src_img, cv2.COLOR_RGB2BGR) | |
| # # tgt_bgr = cv2.cvtColor(tgt_img, cv2.COLOR_RGB2BGR) | |
| # # src_faces = face_analysis_app.get(src_bgr) | |
| # # tgt_faces = face_analysis_app.get(tgt_bgr) | |
| # # if face_analysis_app is None: | |
| # # return None, None, "❌ Face analysis models not initialized. Please ensure models are downloaded." | |
| # # if not src_faces or not tgt_faces: | |
| # # return None, None, "❌ Face not detected in one of the images" | |
| # # swapped_path = os.path.join(temp_dir, f"swapped_{uuid.uuid4().hex[:8]}.jpg") | |
| # # if swapper is None: | |
| # # return None, None, "❌ Face swap models not initialized. Please ensure models are downloaded." | |
| # # swapped_bgr = swapper.get(tgt_bgr, tgt_faces[0], src_faces[0]) | |
| # # if swapped_bgr is None: | |
| # # return None, None, "❌ Face swap failed" | |
| # # cv2.imwrite(swapped_path, swapped_bgr) | |
| # # python_cmd = sys.executable if sys.executable else "python3" | |
| # # cmd = f"{python_cmd} {CODEFORMER_PATH} -w 0.7 --input_path {swapped_path} --output_path {temp_dir} --bg_upsampler realesrgan --face_upsample" | |
| # # result = subprocess.run(cmd, shell=True, capture_output=True, text=True) | |
| # # if result.returncode != 0: | |
| # # return None, None, f"❌ CodeFormer failed:\n{result.stderr}" | |
| # # final_results_dir = os.path.join(temp_dir, "final_results") | |
| # # final_files = [f for f in os.listdir(final_results_dir) if f.endswith(".png")] | |
| # # if not final_files: | |
| # # return None, None, "❌ No enhanced image found" | |
| # # final_path = os.path.join(final_results_dir, final_files[0]) | |
| # # final_img_bgr = cv2.imread(final_path) | |
| # # if final_img_bgr is None: | |
| # # return None, None, "❌ Failed to read enhanced image file" | |
| # # final_img = cv2.cvtColor(final_img_bgr, cv2.COLOR_BGR2RGB) | |
| # # return final_img, final_path, "" | |
| # # except Exception as e: | |
| # # return None, None, f"❌ Error: {str(e)}" | |
| # # def compress_image( | |
| # # image_bytes: bytes, | |
| # # max_size=(1280, 1280), # max width/height | |
| # # quality=75 # JPEG quality (60–80 is ideal) | |
| # # ) -> bytes: | |
| # # """ | |
| # # Compress image by resizing and lowering quality. | |
| # # Returns compressed image bytes. | |
| # # """ | |
| # # img = Image.open(io.BytesIO(image_bytes)).convert("RGB") | |
| # # # Resize while maintaining aspect ratio | |
| # # img.thumbnail(max_size, Image.LANCZOS) | |
| # # output = io.BytesIO() | |
| # # img.save( | |
| # # output, | |
| # # format="JPEG", | |
| # # quality=quality, | |
| # # optimize=True, | |
| # # progressive=True | |
| # # ) | |
| # # return output.getvalue() | |
| # # # --------------------- DigitalOcean Spaces Helper --------------------- | |
| # # def get_spaces_client(): | |
| # # session = boto3.session.Session() | |
| # # client = session.client( | |
| # # 's3', | |
| # # region_name=DO_SPACES_REGION, | |
| # # endpoint_url=DO_SPACES_ENDPOINT, | |
| # # aws_access_key_id=DO_SPACES_KEY, | |
| # # aws_secret_access_key=DO_SPACES_SECRET, | |
| # # config=Config(signature_version='s3v4') | |
| # # ) | |
| # # return client | |
| # # def upload_to_spaces(file_bytes, key, content_type="image/png"): | |
| # # client = get_spaces_client() | |
| # # client.put_object(Bucket=DO_SPACES_BUCKET, Key=key, Body=file_bytes, ContentType=content_type, ACL='public-read') | |
| # # return f"{DO_SPACES_ENDPOINT}/{DO_SPACES_BUCKET}/{key}" | |
| # # def download_from_spaces(key): | |
| # # client = get_spaces_client() | |
| # # obj = client.get_object(Bucket=DO_SPACES_BUCKET, Key=key) | |
| # # return obj['Body'].read() | |
| # # def build_multi_faceswap_gradio(): | |
| # # with gr.Blocks() as demo: | |
| # # gr.Markdown("## 👩❤️👨 Multi Face Swap (Couple → Couple)") | |
| # # with gr.Row(): | |
| # # src = gr.Image(type="numpy", label="Source Image (2 Faces)") | |
| # # tgt = gr.Image(type="numpy", label="Target Image (2 Faces)") | |
| # # out = gr.Image(type="numpy", label="Swapped Result") | |
| # # error = gr.Textbox(label="Logs", interactive=False) | |
| # # def process(src_img, tgt_img): | |
| # # try: | |
| # # swapped = multi_face_swap(src_img, tgt_img) | |
| # # enhanced = enhance_image_with_codeformer(swapped) | |
| # # return enhanced, "" | |
| # # except Exception as e: | |
| # # return None, str(e) | |
| # # btn = gr.Button("Swap Faces") | |
| # # btn.click(process, [src, tgt], [out, error]) | |
| # # return demo | |
| # # def mandatory_enhancement(rgb_img): | |
| # # """ | |
| # # Always runs CodeFormer on the final image. | |
| # # Fail-safe: returns original if enhancement fails. | |
| # # """ | |
| # # try: | |
| # # return enhance_image_with_codeformer(rgb_img) | |
| # # except Exception as e: | |
| # # logger.error(f"CodeFormer failed, returning original: {e}") | |
| # # return rgb_img | |
| # # # --------------------- API Endpoints --------------------- | |
| # # @fastapi_app.get("/") | |
| # # async def root(): | |
| # # """Root endpoint""" | |
| # # return { | |
| # # "success": True, | |
| # # "message": "FaceSwap API", | |
| # # "data": { | |
| # # "version": "1.0.0", | |
| # # "Product Name":"Beauty Camera - GlowCam AI Studio", | |
| # # "Released By" : "LogicGo Infotech" | |
| # # } | |
| # # } | |
| # # @fastapi_app.get("/health") | |
| # # async def health(): | |
| # # return {"status": "healthy"} | |
| # # from fastapi import Form | |
| # # import requests | |
| # # @fastapi_app.get("/test-admin-db") | |
| # # async def test_admin_db(): | |
| # # try: | |
| # # doc = await admin_db.list_collection_names() | |
| # # return {"ok": True, "collections": doc} | |
| # # except Exception as e: | |
| # # return {"ok": False, "error": str(e), "url": ADMIN_MONGO_URL} | |
| # # @fastapi_app.post("/face-swap", dependencies=[Depends(verify_token)]) | |
| # # async def face_swap_api( | |
| # # source: UploadFile = File(...), | |
| # # target_category_id: str = Form(None), | |
| # # new_category_id: str = Form(None), | |
| # # user_id: Optional[str] = Form(None), | |
| # # appname: Optional[str] = Form(None), | |
| # # credentials: HTTPAuthorizationCredentials = Security(security) | |
| # # ): | |
| # # start_time = datetime.utcnow() | |
| # # try: | |
| # # # ------------------------------------------------------------------ | |
| # # # VALIDATION | |
| # # # ------------------------------------------------------------------ | |
| # # # -------------------------------------------------------------- | |
| # # # BACKWARD COMPATIBILITY FOR OLD ANDROID VERSIONS | |
| # # # -------------------------------------------------------------- | |
| # # if target_category_id == "": | |
| # # target_category_id = None | |
| # # if new_category_id == "": | |
| # # new_category_id = None | |
| # # if user_id == "": | |
| # # user_id = None | |
| # # # media_clicks_collection = get_media_clicks_collection(appname) | |
| # # media_clicks_collection, subcategories_collection = get_app_db_collections(appname) | |
| # # logger.info(f"[FaceSwap] Incoming request → target_category_id={target_category_id}, new_category_id={new_category_id}, user_id={user_id}") | |
| # # if target_category_id and new_category_id: | |
| # # raise HTTPException(400, "Provide only one of new_category_id or target_category_id.") | |
| # # if not target_category_id and not new_category_id: | |
| # # raise HTTPException(400, "Either new_category_id or target_category_id is required.") | |
| # # # ------------------------------------------------------------------ | |
| # # # READ SOURCE IMAGE | |
| # # # ------------------------------------------------------------------ | |
| # # src_bytes = await source.read() | |
| # # src_key = f"faceswap/source/{uuid.uuid4().hex}_{source.filename}" | |
| # # upload_to_spaces(src_bytes, src_key, content_type=source.content_type) | |
| # # # ------------------------------------------------------------------ | |
| # # # CASE 1 : new_category_id → MongoDB lookup | |
| # # # ------------------------------------------------------------------ | |
| # # if new_category_id: | |
| # # # doc = await subcategories_col.find_one({ | |
| # # # "asset_images._id": ObjectId(new_category_id) | |
| # # # }) | |
| # # doc = await subcategories_collection.find_one({ | |
| # # "asset_images._id": ObjectId(new_category_id) | |
| # # }) | |
| # # if not doc: | |
| # # raise HTTPException(404, "Asset image not found in database") | |
| # # # extract correct asset | |
| # # asset = next( | |
| # # (img for img in doc["asset_images"] if str(img["_id"]) == new_category_id), | |
| # # None | |
| # # ) | |
| # # if not asset: | |
| # # raise HTTPException(404, "Asset image URL not found") | |
| # # # correct URL | |
| # # target_url = asset["url"] | |
| # # # correct categoryId (ObjectId) | |
| # # #category_oid = doc["categoryId"] # <-- DO NOT CONVERT TO STRING | |
| # # subcategory_oid = doc["_id"] | |
| # # # ------------------------------------------------------------------# | |
| # # # # MEDIA_CLICKS (ONLY IF user_id PRESENT) | |
| # # # ------------------------------------------------------------------# | |
| # # if user_id and media_clicks_collection is not None: | |
| # # try: | |
| # # user_id_clean = user_id.strip() | |
| # # if not user_id_clean: | |
| # # raise ValueError("user_id cannot be empty") | |
| # # try: | |
| # # user_oid = ObjectId(user_id_clean) | |
| # # except (InvalidId, ValueError) as e: | |
| # # logger.error(f"Invalid user_id format: {user_id_clean}") | |
| # # raise ValueError(f"Invalid user_id format: {user_id_clean}") | |
| # # now = datetime.utcnow() | |
| # # # Normalize dates (UTC midnight) | |
| # # today_date = datetime(now.year, now.month, now.day) | |
| # # # ------------------------------------------------- | |
| # # # STEP 1: Ensure root document exists | |
| # # # ------------------------------------------------- | |
| # # await media_clicks_collection.update_one( | |
| # # {"userId": user_oid}, | |
| # # { | |
| # # "$setOnInsert": { | |
| # # "userId": user_oid, | |
| # # "createdAt": now, | |
| # # "ai_edit_complete": 0, | |
| # # "ai_edit_daily_count": [] | |
| # # } | |
| # # }, | |
| # # upsert=True | |
| # # ) | |
| # # # ------------------------------------------------- | |
| # # # STEP 2: Handle DAILY USAGE (BINARY, NO DUPLICATES) | |
| # # # ------------------------------------------------- | |
| # # doc = await media_clicks_collection.find_one( | |
| # # {"userId": user_oid}, | |
| # # {"ai_edit_daily_count": 1} | |
| # # ) | |
| # # daily_entries = doc.get("ai_edit_daily_count", []) if doc else [] | |
| # # # Normalize today to UTC midnight | |
| # # today_date = datetime(now.year, now.month, now.day) | |
| # # # Build normalized date → count map (THIS ENFORCES UNIQUENESS) | |
| # # daily_map = {} | |
| # # for entry in daily_entries: | |
| # # d = entry["date"] | |
| # # if isinstance(d, datetime): | |
| # # d = datetime(d.year, d.month, d.day) | |
| # # daily_map[d] = entry["count"] # overwrite = no duplicates | |
| # # # Determine last recorded date | |
| # # last_date = max(daily_map.keys()) if daily_map else today_date | |
| # # # Fill ALL missing days with count = 0 | |
| # # next_day = last_date + timedelta(days=1) | |
| # # while next_day < today_date: | |
| # # daily_map.setdefault(next_day, 0) | |
| # # next_day += timedelta(days=1) | |
| # # # Mark today as used (binary) | |
| # # daily_map[today_date] = 1 | |
| # # # Rebuild list: OLDEST → NEWEST | |
| # # final_daily_entries = [ | |
| # # {"date": d, "count": daily_map[d]} | |
| # # for d in sorted(daily_map.keys()) | |
| # # ] | |
| # # # Keep only last 32 days | |
| # # final_daily_entries = final_daily_entries[-32:] | |
| # # # Atomic replace | |
| # # await media_clicks_collection.update_one( | |
| # # {"userId": user_oid}, | |
| # # { | |
| # # "$set": { | |
| # # "ai_edit_daily_count": final_daily_entries, | |
| # # "updatedAt": now | |
| # # } | |
| # # } | |
| # # ) | |
| # # # ------------------------------------------------- | |
| # # # STEP 3: Try updating existing subCategory | |
| # # # ------------------------------------------------- | |
| # # update_result = await media_clicks_collection.update_one( | |
| # # { | |
| # # "userId": user_oid, | |
| # # "subCategories.subCategoryId": subcategory_oid | |
| # # }, | |
| # # { | |
| # # "$inc": { | |
| # # "subCategories.$.click_count": 1, | |
| # # "ai_edit_complete": 1 | |
| # # }, | |
| # # "$set": { | |
| # # "subCategories.$.lastClickedAt": now, | |
| # # "ai_edit_last_date": now, | |
| # # "updatedAt": now | |
| # # } | |
| # # } | |
| # # ) | |
| # # # ------------------------------------------------- | |
| # # # STEP 4: Push subCategory if missing | |
| # # # ------------------------------------------------- | |
| # # if update_result.matched_count == 0: | |
| # # await media_clicks_collection.update_one( | |
| # # {"userId": user_oid}, | |
| # # { | |
| # # "$inc": { | |
| # # "ai_edit_complete": 1 | |
| # # }, | |
| # # "$set": { | |
| # # "ai_edit_last_date": now, | |
| # # "updatedAt": now | |
| # # }, | |
| # # "$push": { | |
| # # "subCategories": { | |
| # # "subCategoryId": subcategory_oid, | |
| # # "click_count": 1, | |
| # # "lastClickedAt": now | |
| # # } | |
| # # } | |
| # # } | |
| # # ) | |
| # # # ------------------------------------------------- | |
| # # # STEP 5: Sort subCategories by lastClickedAt (ascending - oldest first) | |
| # # # ------------------------------------------------- | |
| # # user_doc = await media_clicks_collection.find_one({"userId": user_oid}) | |
| # # if user_doc and "subCategories" in user_doc: | |
| # # subcategories = user_doc["subCategories"] | |
| # # # Sort by lastClickedAt in ascending order (oldest first) | |
| # # # Handle missing or None dates by using datetime.min | |
| # # subcategories_sorted = sorted( | |
| # # subcategories, | |
| # # key=lambda x: x.get("lastClickedAt") if x.get("lastClickedAt") is not None else datetime.min | |
| # # ) | |
| # # # Update with sorted array | |
| # # await media_clicks_collection.update_one( | |
| # # {"userId": user_oid}, | |
| # # { | |
| # # "$set": { | |
| # # "subCategories": subcategories_sorted, | |
| # # "updatedAt": now | |
| # # } | |
| # # } | |
| # # ) | |
| # # logger.info( | |
| # # "[MEDIA_CLICK] user=%s subCategory=%s ai_edit_complete++ daily_tracked", | |
| # # user_id, | |
| # # str(subcategory_oid) | |
| # # ) | |
| # # except Exception as media_err: | |
| # # logger.error(f"MEDIA_CLICK ERROR: {media_err}") | |
| # # elif user_id and media_clicks_collection is None: | |
| # # logger.warning("Media clicks collection unavailable; skipping media click tracking") | |
| # # # # ------------------------------------------------------------------ | |
| # # # # CASE 2 : target_category_id → DigitalOcean path (unchanged logic) | |
| # # # # ------------------------------------------------------------------ | |
| # # if target_category_id: | |
| # # client = get_spaces_client() | |
| # # base_prefix = "faceswap/target/" | |
| # # resp = client.list_objects_v2( | |
| # # Bucket=DO_SPACES_BUCKET, Prefix=base_prefix, Delimiter="/" | |
| # # ) | |
| # # # Extract categories from the CommonPrefixes | |
| # # categories = [p["Prefix"].split("/")[2] for p in resp.get("CommonPrefixes", [])] | |
| # # target_url = None | |
| # # # --- FIX STARTS HERE --- | |
| # # for category in categories: | |
| # # original_prefix = f"faceswap/target/{category}/original/" | |
| # # thumb_prefix = f"faceswap/target/{category}/thumb/" # Keep for file list check (optional but safe) | |
| # # # List objects in original/ | |
| # # original_objects = client.list_objects_v2( | |
| # # Bucket=DO_SPACES_BUCKET, Prefix=original_prefix | |
| # # ).get("Contents", []) | |
| # # # List objects in thumb/ (optional: for the old code's extra check) | |
| # # thumb_objects = client.list_objects_v2( | |
| # # Bucket=DO_SPACES_BUCKET, Prefix=thumb_prefix | |
| # # ).get("Contents", []) | |
| # # # Extract only the filenames and filter for .png | |
| # # original_filenames = sorted([ | |
| # # obj["Key"].split("/")[-1] for obj in original_objects | |
| # # if obj["Key"].split("/")[-1].endswith(".png") | |
| # # ]) | |
| # # thumb_filenames = [ | |
| # # obj["Key"].split("/")[-1] for obj in thumb_objects | |
| # # ] | |
| # # # Replicate the old indexing logic based on sorted filenames | |
| # # for idx, filename in enumerate(original_filenames, start=1): | |
| # # cid = f"{category.lower()}image_{idx}" | |
| # # # Optional: Replicate the thumb file check for 100% parity | |
| # # # if filename in thumb_filenames and cid == target_category_id: | |
| # # # Simpler check just on the ID, assuming thumb files are present | |
| # # if cid == target_category_id: | |
| # # # Construct the final target URL using the full prefix and the filename | |
| # # target_url = f"{DO_SPACES_ENDPOINT}/{DO_SPACES_BUCKET}/{original_prefix}{filename}" | |
| # # break | |
| # # if target_url: | |
| # # break | |
| # # # --- FIX ENDS HERE --- | |
| # # if not target_url: | |
| # # raise HTTPException(404, "Target categoryId not found") | |
| # # # # ------------------------------------------------------------------ | |
| # # # # DOWNLOAD TARGET IMAGE | |
| # # # # ------------------------------------------------------------------ | |
| # # async with httpx.AsyncClient(timeout=30.0) as client: | |
| # # response = await client.get(target_url) | |
| # # response.raise_for_status() | |
| # # tgt_bytes = response.content | |
| # # src_bgr = cv2.imdecode(np.frombuffer(src_bytes, np.uint8), cv2.IMREAD_COLOR) | |
| # # tgt_bgr = cv2.imdecode(np.frombuffer(tgt_bytes, np.uint8), cv2.IMREAD_COLOR) | |
| # # if src_bgr is None or tgt_bgr is None: | |
| # # raise HTTPException(400, "Invalid image data") | |
| # # src_rgb = cv2.cvtColor(src_bgr, cv2.COLOR_BGR2RGB) | |
| # # tgt_rgb = cv2.cvtColor(tgt_bgr, cv2.COLOR_BGR2RGB) | |
| # # # ------------------------------------------------------------------ | |
| # # # FACE SWAP EXECUTION | |
| # # # ------------------------------------------------------------------ | |
| # # final_img, final_path, err = face_swap_and_enhance(src_rgb, tgt_rgb) | |
| # # # #--------------------Version 2.0 ----------------------------------------# | |
| # # # final_img, final_path, err = enhanced_face_swap_and_enhance(src_rgb, tgt_rgb) | |
| # # # #--------------------Version 2.0 ----------------------------------------# | |
| # # if err: | |
| # # raise HTTPException(500, err) | |
| # # with open(final_path, "rb") as f: | |
| # # result_bytes = f.read() | |
| # # result_key = f"faceswap/result/{uuid.uuid4().hex}_enhanced.png" | |
| # # result_url = upload_to_spaces(result_bytes, result_key) | |
| # # # ------------------------------------------------- | |
| # # # COMPRESS IMAGE (2–3 MB target) | |
| # # # ------------------------------------------------- | |
| # # compressed_bytes = compress_image( | |
| # # image_bytes=result_bytes, | |
| # # max_size=(1280, 1280), | |
| # # quality=72 | |
| # # ) | |
| # # compressed_key = f"faceswap/result/{uuid.uuid4().hex}_enhanced_compressed.jpg" | |
| # # compressed_url = upload_to_spaces( | |
| # # compressed_bytes, | |
| # # compressed_key, | |
| # # content_type="image/jpeg" | |
| # # ) | |
| # # end_time = datetime.utcnow() | |
| # # response_time_ms = (end_time - start_time).total_seconds() * 1000 | |
| # # if database is not None: | |
| # # log_entry = { | |
| # # "endpoint": "/face-swap", | |
| # # "status": "success", | |
| # # "response_time_ms": response_time_ms, | |
| # # "timestamp": end_time | |
| # # } | |
| # # if appname: | |
| # # log_entry["appname"] = appname | |
| # # await database.api_logs.insert_one(log_entry) | |
| # # return { | |
| # # "result_key": result_key, | |
| # # "result_url": result_url, | |
| # # "Compressed_Image_URL": compressed_url | |
| # # } | |
| # # except Exception as e: | |
| # # end_time = datetime.utcnow() | |
| # # response_time_ms = (end_time - start_time).total_seconds() * 1000 | |
| # # if database is not None: | |
| # # log_entry = { | |
| # # "endpoint": "/face-swap", | |
| # # "status": "fail", | |
| # # "response_time_ms": response_time_ms, | |
| # # "timestamp": end_time, | |
| # # "error": str(e) | |
| # # } | |
| # # if appname: | |
| # # log_entry["appname"] = appname | |
| # # await database.api_logs.insert_one(log_entry) | |
| # # raise HTTPException(500, f"Face swap failed: {str(e)}") | |
| # # @fastapi_app.get("/preview/{result_key:path}") | |
| # # async def preview_result(result_key: str): | |
| # # try: | |
| # # img_bytes = download_from_spaces(result_key) | |
| # # except Exception: | |
| # # raise HTTPException(status_code=404, detail="Result not found") | |
| # # return Response( | |
| # # content=img_bytes, | |
| # # media_type="image/png", | |
| # # headers={"Content-Disposition": "inline; filename=result.png"} | |
| # # ) | |
| # # @fastapi_app.post("/multi-face-swap", dependencies=[Depends(verify_token)]) | |
| # # async def multi_face_swap_api( | |
| # # source_image: UploadFile = File(...), | |
| # # target_image: UploadFile = File(...) | |
| # # ): | |
| # # start_time = datetime.utcnow() | |
| # # try: | |
| # # # ----------------------------- | |
| # # # Read images | |
| # # # ----------------------------- | |
| # # src_bytes = await source_image.read() | |
| # # tgt_bytes = await target_image.read() | |
| # # src_bgr = cv2.imdecode(np.frombuffer(src_bytes, np.uint8), cv2.IMREAD_COLOR) | |
| # # tgt_bgr = cv2.imdecode(np.frombuffer(tgt_bytes, np.uint8), cv2.IMREAD_COLOR) | |
| # # if src_bgr is None or tgt_bgr is None: | |
| # # raise HTTPException(400, "Invalid image data") | |
| # # src_rgb = cv2.cvtColor(src_bgr, cv2.COLOR_BGR2RGB) | |
| # # tgt_rgb = cv2.cvtColor(tgt_bgr, cv2.COLOR_BGR2RGB) | |
| # # # ----------------------------- | |
| # # # Multi-face swap | |
| # # # ----------------------------- | |
| # # swapped_rgb = multi_face_swap(src_rgb, tgt_rgb) | |
| # # # ----------------------------- | |
| # # # 🔥 MANDATORY ENHANCEMENT | |
| # # # ----------------------------- | |
| # # final_rgb = mandatory_enhancement(swapped_rgb) | |
| # # final_bgr = cv2.cvtColor(final_rgb, cv2.COLOR_RGB2BGR) | |
| # # # ----------------------------- | |
| # # # Save temp result | |
| # # # ----------------------------- | |
| # # temp_dir = tempfile.mkdtemp(prefix="multi_faceswap_") | |
| # # result_path = os.path.join(temp_dir, "result.png") | |
| # # cv2.imwrite(result_path, final_bgr) | |
| # # with open(result_path, "rb") as f: | |
| # # result_bytes = f.read() | |
| # # # ----------------------------- | |
| # # # Upload | |
| # # # ----------------------------- | |
| # # result_key = f"faceswap/multi/{uuid.uuid4().hex}.png" | |
| # # result_url = upload_to_spaces( | |
| # # result_bytes, | |
| # # result_key, | |
| # # content_type="image/png" | |
| # # ) | |
| # # return { | |
| # # "result_key": result_key, | |
| # # "result_url": result_url | |
| # # } | |
| # # except Exception as e: | |
| # # raise HTTPException(status_code=500, detail=str(e)) | |
| # # @fastapi_app.post("/face-swap-couple", dependencies=[Depends(verify_token)]) | |
| # # async def face_swap_api( | |
| # # image1: UploadFile = File(...), | |
| # # image2: Optional[UploadFile] = File(None), | |
| # # target_category_id: str = Form(None), | |
| # # new_category_id: str = Form(None), | |
| # # user_id: Optional[str] = Form(None), | |
| # # appname: Optional[str] = Form(None), | |
| # # credentials: HTTPAuthorizationCredentials = Security(security) | |
| # # ): | |
| # # """ | |
| # # Production-ready face swap endpoint supporting: | |
| # # - Multiple source images (image1 + optional image2) | |
| # # - Gender-based pairing | |
| # # - Merged faces from multiple sources | |
| # # - Mandatory CodeFormer enhancement | |
| # # """ | |
| # # start_time = datetime.utcnow() | |
| # # try: | |
| # # # ----------------------------- | |
| # # # Validate input | |
| # # # ----------------------------- | |
| # # if target_category_id == "": | |
| # # target_category_id = None | |
| # # if new_category_id == "": | |
| # # new_category_id = None | |
| # # if user_id == "": | |
| # # user_id = None | |
| # # media_clicks_collection = get_media_clicks_collection(appname) | |
| # # if target_category_id and new_category_id: | |
| # # raise HTTPException(400, "Provide only one of new_category_id or target_category_id.") | |
| # # if not target_category_id and not new_category_id: | |
| # # raise HTTPException(400, "Either new_category_id or target_category_id is required.") | |
| # # logger.info(f"[FaceSwap] Incoming request → target_category_id={target_category_id}, new_category_id={new_category_id}, user_id={user_id}") | |
| # # # ----------------------------- | |
| # # # Read source images | |
| # # # ----------------------------- | |
| # # src_images = [] | |
| # # img1_bytes = await image1.read() | |
| # # src1 = cv2.imdecode(np.frombuffer(img1_bytes, np.uint8), cv2.IMREAD_COLOR) | |
| # # if src1 is None: | |
| # # raise HTTPException(400, "Invalid image1 data") | |
| # # src_images.append(cv2.cvtColor(src1, cv2.COLOR_BGR2RGB)) | |
| # # if image2: | |
| # # img2_bytes = await image2.read() | |
| # # src2 = cv2.imdecode(np.frombuffer(img2_bytes, np.uint8), cv2.IMREAD_COLOR) | |
| # # if src2 is not None: | |
| # # src_images.append(cv2.cvtColor(src2, cv2.COLOR_BGR2RGB)) | |
| # # # ----------------------------- | |
| # # # Resolve target image | |
| # # # ----------------------------- | |
| # # target_url = None | |
| # # if new_category_id: | |
| # # doc = await subcategories_col.find_one({ | |
| # # "asset_images._id": ObjectId(new_category_id) | |
| # # }) | |
| # # if not doc: | |
| # # raise HTTPException(404, "Asset image not found in database") | |
| # # asset = next( | |
| # # (img for img in doc["asset_images"] if str(img["_id"]) == new_category_id), | |
| # # None | |
| # # ) | |
| # # if not asset: | |
| # # raise HTTPException(404, "Asset image URL not found") | |
| # # target_url = asset["url"] | |
| # # subcategory_oid = doc["_id"] | |
| # # if user_id and media_clicks_collection is not None: | |
| # # try: | |
| # # user_id_clean = user_id.strip() | |
| # # if not user_id_clean: | |
| # # raise ValueError("user_id cannot be empty") | |
| # # try: | |
| # # user_oid = ObjectId(user_id_clean) | |
| # # except (InvalidId, ValueError): | |
| # # logger.error(f"Invalid user_id format: {user_id_clean}") | |
| # # raise ValueError(f"Invalid user_id format: {user_id_clean}") | |
| # # now = datetime.utcnow() | |
| # # # Step 1: ensure root document exists | |
| # # await media_clicks_collection.update_one( | |
| # # {"userId": user_oid}, | |
| # # { | |
| # # "$setOnInsert": { | |
| # # "userId": user_oid, | |
| # # "createdAt": now, | |
| # # "ai_edit_complete": 0, | |
| # # "ai_edit_daily_count": [] | |
| # # } | |
| # # }, | |
| # # upsert=True | |
| # # ) | |
| # # # Step 2: handle daily usage (binary, no duplicates) | |
| # # doc = await media_clicks_collection.find_one( | |
| # # {"userId": user_oid}, | |
| # # {"ai_edit_daily_count": 1} | |
| # # ) | |
| # # daily_entries = doc.get("ai_edit_daily_count", []) if doc else [] | |
| # # today_date = datetime(now.year, now.month, now.day) | |
| # # daily_map = {} | |
| # # for entry in daily_entries: | |
| # # d = entry["date"] | |
| # # if isinstance(d, datetime): | |
| # # d = datetime(d.year, d.month, d.day) | |
| # # daily_map[d] = entry["count"] | |
| # # last_date = max(daily_map.keys()) if daily_map else None | |
| # # if last_date != today_date: | |
| # # daily_map[today_date] = 1 | |
| # # final_daily_entries = [ | |
| # # {"date": d, "count": daily_map[d]} | |
| # # for d in sorted(daily_map.keys()) | |
| # # ] | |
| # # final_daily_entries = final_daily_entries[-32:] | |
| # # await media_clicks_collection.update_one( | |
| # # {"userId": user_oid}, | |
| # # { | |
| # # "$set": { | |
| # # "ai_edit_daily_count": final_daily_entries, | |
| # # "updatedAt": now | |
| # # } | |
| # # } | |
| # # ) | |
| # # # Step 3: try updating existing subCategory | |
| # # update_result = await media_clicks_collection.update_one( | |
| # # { | |
| # # "userId": user_oid, | |
| # # "subCategories.subCategoryId": subcategory_oid | |
| # # }, | |
| # # { | |
| # # "$inc": { | |
| # # "subCategories.$.click_count": 1, | |
| # # "ai_edit_complete": 1 | |
| # # }, | |
| # # "$set": { | |
| # # "subCategories.$.lastClickedAt": now, | |
| # # "ai_edit_last_date": now, | |
| # # "updatedAt": now | |
| # # } | |
| # # } | |
| # # ) | |
| # # # Step 4: push subCategory if missing | |
| # # if update_result.matched_count == 0: | |
| # # await media_clicks_collection.update_one( | |
| # # {"userId": user_oid}, | |
| # # { | |
| # # "$inc": { | |
| # # "ai_edit_complete": 1 | |
| # # }, | |
| # # "$set": { | |
| # # "ai_edit_last_date": now, | |
| # # "updatedAt": now | |
| # # }, | |
| # # "$push": { | |
| # # "subCategories": { | |
| # # "subCategoryId": subcategory_oid, | |
| # # "click_count": 1, | |
| # # "lastClickedAt": now | |
| # # } | |
| # # } | |
| # # } | |
| # # ) | |
| # # # Step 5: sort subCategories by lastClickedAt (ascending) | |
| # # user_doc = await media_clicks_collection.find_one({"userId": user_oid}) | |
| # # if user_doc and "subCategories" in user_doc: | |
| # # subcategories = user_doc["subCategories"] | |
| # # subcategories_sorted = sorted( | |
| # # subcategories, | |
| # # key=lambda x: x.get("lastClickedAt") if x.get("lastClickedAt") is not None else datetime.min | |
| # # ) | |
| # # await media_clicks_collection.update_one( | |
| # # {"userId": user_oid}, | |
| # # { | |
| # # "$set": { | |
| # # "subCategories": subcategories_sorted, | |
| # # "updatedAt": now | |
| # # } | |
| # # } | |
| # # ) | |
| # # logger.info( | |
| # # "[MEDIA_CLICK] user=%s subCategory=%s ai_edit_complete++ daily_tracked", | |
| # # user_id, | |
| # # str(subcategory_oid) | |
| # # ) | |
| # # except Exception as media_err: | |
| # # logger.error(f"MEDIA_CLICK ERROR: {media_err}") | |
| # # elif user_id and media_clicks_collection is None: | |
| # # logger.warning("Media clicks collection unavailable; skipping media click tracking") | |
| # # if target_category_id: | |
| # # client = get_spaces_client() | |
| # # base_prefix = "faceswap/target/" | |
| # # resp = client.list_objects_v2( | |
| # # Bucket=DO_SPACES_BUCKET, Prefix=base_prefix, Delimiter="/" | |
| # # ) | |
| # # categories = [p["Prefix"].split("/")[2] for p in resp.get("CommonPrefixes", [])] | |
| # # for category in categories: | |
| # # original_prefix = f"faceswap/target/{category}/original/" | |
| # # thumb_prefix = f"faceswap/target/{category}/thumb/" | |
| # # original_objects = client.list_objects_v2( | |
| # # Bucket=DO_SPACES_BUCKET, Prefix=original_prefix | |
| # # ).get("Contents", []) | |
| # # thumb_objects = client.list_objects_v2( | |
| # # Bucket=DO_SPACES_BUCKET, Prefix=thumb_prefix | |
| # # ).get("Contents", []) | |
| # # original_filenames = sorted([ | |
| # # obj["Key"].split("/")[-1] for obj in original_objects | |
| # # if obj["Key"].split("/")[-1].endswith(".png") | |
| # # ]) | |
| # # for idx, filename in enumerate(original_filenames, start=1): | |
| # # cid = f"{category.lower()}image_{idx}" | |
| # # if cid == target_category_id: | |
| # # target_url = f"{DO_SPACES_ENDPOINT}/{DO_SPACES_BUCKET}/{original_prefix}{filename}" | |
| # # break | |
| # # if target_url: | |
| # # break | |
| # # if not target_url: | |
| # # raise HTTPException(404, "Target categoryId not found") | |
| # # async with httpx.AsyncClient(timeout=30.0) as client: | |
| # # response = await client.get(target_url) | |
| # # response.raise_for_status() | |
| # # tgt_bytes = response.content | |
| # # tgt_bgr = cv2.imdecode(np.frombuffer(tgt_bytes, np.uint8), cv2.IMREAD_COLOR) | |
| # # if tgt_bgr is None: | |
| # # raise HTTPException(400, "Invalid target image data") | |
| # # # ----------------------------- | |
| # # # Merge all source faces | |
| # # # ----------------------------- | |
| # # all_src_faces = [] | |
| # # for img in src_images: | |
| # # faces = face_analysis_app.get(cv2.cvtColor(img, cv2.COLOR_RGB2BGR)) | |
| # # all_src_faces.extend(faces) | |
| # # if not all_src_faces: | |
| # # raise HTTPException(400, "No faces detected in source images") | |
| # # tgt_faces = face_analysis_app.get(tgt_bgr) | |
| # # if not tgt_faces: | |
| # # raise HTTPException(400, "No faces detected in target image") | |
| # # # ----------------------------- | |
| # # # Gender-based pairing | |
| # # # ----------------------------- | |
| # # def face_sort_key(face): | |
| # # x1, y1, x2, y2 = face.bbox | |
| # # area = (x2 - x1) * (y2 - y1) | |
| # # cx = (x1 + x2) / 2 | |
| # # return (-area, cx) | |
| # # # Separate by gender | |
| # # src_male = sorted([f for f in all_src_faces if f.gender == 1], key=face_sort_key) | |
| # # src_female = sorted([f for f in all_src_faces if f.gender == 0], key=face_sort_key) | |
| # # tgt_male = sorted([f for f in tgt_faces if f.gender == 1], key=face_sort_key) | |
| # # tgt_female = sorted([f for f in tgt_faces if f.gender == 0], key=face_sort_key) | |
| # # pairs = [] | |
| # # for s, t in zip(src_male, tgt_male): | |
| # # pairs.append((s, t)) | |
| # # for s, t in zip(src_female, tgt_female): | |
| # # pairs.append((s, t)) | |
| # # # fallback if gender mismatch | |
| # # if not pairs: | |
| # # src_all = sorted(all_src_faces, key=face_sort_key) | |
| # # tgt_all = sorted(tgt_faces, key=face_sort_key) | |
| # # pairs = list(zip(src_all, tgt_all)) | |
| # # # ----------------------------- | |
| # # # Perform face swap | |
| # # # ----------------------------- | |
| # # with swap_lock: | |
| # # result_img = tgt_bgr.copy() | |
| # # for src_face, _ in pairs: | |
| # # if face_analysis_app is None: | |
| # # raise HTTPException(status_code=500, detail="Face analysis models not initialized. Please ensure models are downloaded.") | |
| # # current_faces = sorted(face_analysis_app.get(result_img), key=face_sort_key) | |
| # # candidates = [f for f in current_faces if f.gender == src_face.gender] or current_faces | |
| # # target_face = candidates[0] | |
| # # if swapper is None: | |
| # # raise HTTPException(status_code=500, detail="Face swap models not initialized. Please ensure models are downloaded.") | |
| # # result_img = swapper.get(result_img, target_face, src_face, paste_back=True) | |
| # # result_rgb = cv2.cvtColor(result_img, cv2.COLOR_BGR2RGB) | |
| # # # ----------------------------- | |
| # # # Mandatory enhancement | |
| # # # ----------------------------- | |
| # # enhanced_rgb = mandatory_enhancement(result_rgb) | |
| # # enhanced_bgr = cv2.cvtColor(enhanced_rgb, cv2.COLOR_RGB2BGR) | |
| # # # ----------------------------- | |
| # # # Save, upload, compress | |
| # # # ----------------------------- | |
| # # temp_dir = tempfile.mkdtemp(prefix="faceswap_") | |
| # # final_path = os.path.join(temp_dir, "result.png") | |
| # # cv2.imwrite(final_path, enhanced_bgr) | |
| # # with open(final_path, "rb") as f: | |
| # # result_bytes = f.read() | |
| # # result_key = f"faceswap/result/{uuid.uuid4().hex}_enhanced.png" | |
| # # result_url = upload_to_spaces(result_bytes, result_key) | |
| # # compressed_bytes = compress_image(result_bytes, max_size=(1280, 1280), quality=72) | |
| # # compressed_key = f"faceswap/result/{uuid.uuid4().hex}_enhanced_compressed.jpg" | |
| # # compressed_url = upload_to_spaces(compressed_bytes, compressed_key, content_type="image/jpeg") | |
| # # # ----------------------------- | |
| # # # Log API usage | |
| # # # ----------------------------- | |
| # # end_time = datetime.utcnow() | |
| # # response_time_ms = (end_time - start_time).total_seconds() * 1000 | |
| # # if database is not None: | |
| # # log_entry = { | |
| # # "endpoint": "/face-swap-couple", | |
| # # "status": "success", | |
| # # "response_time_ms": response_time_ms, | |
| # # "timestamp": end_time | |
| # # } | |
| # # if appname: | |
| # # log_entry["appname"] = appname | |
| # # await database.api_logs.insert_one(log_entry) | |
| # # return { | |
| # # "result_key": result_key, | |
| # # "result_url": result_url, | |
| # # "compressed_url": compressed_url | |
| # # } | |
| # # except Exception as e: | |
| # # end_time = datetime.utcnow() | |
| # # response_time_ms = (end_time - start_time).total_seconds() * 1000 | |
| # # if database is not None: | |
| # # log_entry = { | |
| # # "endpoint": "/face-swap-couple", | |
| # # "status": "fail", | |
| # # "response_time_ms": response_time_ms, | |
| # # "timestamp": end_time, | |
| # # "error": str(e) | |
| # # } | |
| # # if appname: | |
| # # log_entry["appname"] = appname | |
| # # await database.api_logs.insert_one(log_entry) | |
| # # raise HTTPException(500, f"Face swap failed: {str(e)}") | |
| # # # --------------------- Mount Gradio --------------------- | |
| # # multi_faceswap_app = build_multi_faceswap_gradio() | |
| # # fastapi_app = mount_gradio_app( | |
| # # fastapi_app, | |
| # # multi_faceswap_app, | |
| # # path="/gradio-couple-faceswap" | |
| # # ) | |
| # # if __name__ == "__main__": | |
| # # uvicorn.run(fastapi_app, host="0.0.0.0", port=7860) | |
| # # --------------------- List Images Endpoint --------------------- | |
| # # import os | |
| # # os.environ["OMP_NUM_THREADS"] = "1" | |
| # # import shutil | |
| # # import uuid | |
| # # import cv2 | |
| # # import numpy as np | |
| # # import threading | |
| # # import subprocess | |
| # # import logging | |
| # # import tempfile | |
| # # import sys | |
| # # from datetime import datetime,timedelta | |
| # # import tempfile | |
| # # import insightface | |
| # # from insightface.app import FaceAnalysis | |
| # # from huggingface_hub import hf_hub_download | |
| # # from fastapi import FastAPI, UploadFile, File, HTTPException, Response, Depends, Security, Form | |
| # # from fastapi.responses import RedirectResponse | |
| # # from fastapi.security import HTTPBearer, HTTPAuthorizationCredentials | |
| # # from motor.motor_asyncio import AsyncIOMotorClient | |
| # # from bson import ObjectId | |
| # # from bson.errors import InvalidId | |
| # # import httpx | |
| # # import uvicorn | |
| # # import gradio as gr | |
| # # from gradio import mount_gradio_app | |
| # # from PIL import Image | |
| # # import io | |
| # # # from scipy import ndimage | |
| # # # DigitalOcean Spaces | |
| # # import boto3 | |
| # # from botocore.client import Config | |
| # # from typing import Optional | |
| # # # --------------------- Logging --------------------- | |
| # # logging.basicConfig(level=logging.INFO) | |
| # # logger = logging.getLogger(__name__) | |
| # # # --------------------- Secrets & Paths --------------------- | |
| # # REPO_ID = "HariLogicgo/face_swap_models" | |
| # # MODELS_DIR = "./models" | |
| # # os.makedirs(MODELS_DIR, exist_ok=True) | |
| # # HF_TOKEN = os.getenv("HF_TOKEN") | |
| # # API_SECRET_TOKEN = os.getenv("API_SECRET_TOKEN") | |
| # # DO_SPACES_REGION = os.getenv("DO_SPACES_REGION", "blr1") | |
| # # DO_SPACES_ENDPOINT = f"https://{DO_SPACES_REGION}.digitaloceanspaces.com" | |
| # # DO_SPACES_KEY = os.getenv("DO_SPACES_KEY") | |
| # # DO_SPACES_SECRET = os.getenv("DO_SPACES_SECRET") | |
| # # DO_SPACES_BUCKET = os.getenv("DO_SPACES_BUCKET") | |
| # # # NEW admin DB (with error handling for missing env vars) | |
| # # ADMIN_MONGO_URL = os.getenv("ADMIN_MONGO_URL") | |
| # # admin_client = None | |
| # # admin_db = None | |
| # # subcategories_col = None | |
| # # media_clicks_col = None | |
| # # if ADMIN_MONGO_URL: | |
| # # try: | |
| # # admin_client = AsyncIOMotorClient(ADMIN_MONGO_URL) | |
| # # admin_db = admin_client.adminPanel | |
| # # subcategories_col = admin_db.subcategories | |
| # # media_clicks_col = admin_db.media_clicks | |
| # # except Exception as e: | |
| # # logger.warning(f"MongoDB admin connection failed (optional): {e}") | |
| # # # Collage Maker DB (optional) | |
| # # COLLAGE_MAKER_DB_URL = os.getenv("COLLAGE_MAKER_DB_URL") | |
| # # collage_maker_client = None | |
| # # collage_maker_db = None | |
| # # collage_media_clicks_col = None | |
| # # if COLLAGE_MAKER_DB_URL: | |
| # # try: | |
| # # collage_maker_client = AsyncIOMotorClient(COLLAGE_MAKER_DB_URL) | |
| # # collage_maker_db = collage_maker_client.adminPanel | |
| # # collage_media_clicks_col = collage_maker_db.media_clicks | |
| # # except Exception as e: | |
| # # logger.warning(f"MongoDB ai-enhancer connection failed (optional): {e}") | |
| # # # AI Enhancer DB (optional) | |
| # # AI_ENHANCER_DB_URL = os.getenv("AI_ENHANCER_DB_URL") | |
| # # ai_enhancer_client = None | |
| # # ai_enhancer_db = None | |
| # # ai_enhancer_media_clicks_col = None | |
| # # ai_enhancer_subcategories_col = None | |
| # # if AI_ENHANCER_DB_URL: | |
| # # try: | |
| # # ai_enhancer_client = AsyncIOMotorClient(AI_ENHANCER_DB_URL) | |
| # # ai_enhancer_db = ai_enhancer_client.test # 🔴 test database | |
| # # ai_enhancer_media_clicks_col = ai_enhancer_db.media_clicks | |
| # # ai_enhancer_subcategories_col = ai_enhancer_db.subcategories | |
| # # except Exception as e: | |
| # # logger.warning(f"MongoDB ai-enhancer connection failed (optional): {e}") | |
| # # def get_media_clicks_collection(appname: Optional[str] = None): | |
| # # """Return the media clicks collection for the given app (default: main admin).""" | |
| # # if appname and str(appname).strip().lower() == "collage-maker": | |
| # # return collage_media_clicks_col | |
| # # return media_clicks_col | |
| # # # OLD logs DB | |
| # # MONGODB_URL = os.getenv("MONGODB_URL") | |
| # # client = None | |
| # # database = None | |
| # # # --------------------- Download Models --------------------- | |
| # # def download_models(): | |
| # # try: | |
| # # logger.info("Downloading models...") | |
| # # inswapper_path = hf_hub_download( | |
| # # repo_id=REPO_ID, | |
| # # filename="models/inswapper_128.onnx", | |
| # # repo_type="model", | |
| # # local_dir=MODELS_DIR, | |
| # # token=HF_TOKEN | |
| # # ) | |
| # # buffalo_files = ["1k3d68.onnx", "2d106det.onnx", "genderage.onnx", "det_10g.onnx", "w600k_r50.onnx"] | |
| # # for f in buffalo_files: | |
| # # hf_hub_download( | |
| # # repo_id=REPO_ID, | |
| # # filename=f"models/buffalo_l/" + f, | |
| # # repo_type="model", | |
| # # local_dir=MODELS_DIR, | |
| # # token=HF_TOKEN | |
| # # ) | |
| # # logger.info("Models downloaded successfully.") | |
| # # return inswapper_path | |
| # # except Exception as e: | |
| # # logger.error(f"Model download failed: {e}") | |
| # # raise | |
| # # try: | |
| # # inswapper_path = download_models() | |
| # # # --------------------- Face Analysis + Swapper --------------------- | |
| # # providers = ['CUDAExecutionProvider', 'CPUExecutionProvider'] | |
| # # face_analysis_app = FaceAnalysis(name="buffalo_l", root=MODELS_DIR, providers=providers) | |
| # # face_analysis_app.prepare(ctx_id=0, det_size=(640, 640)) | |
| # # swapper = insightface.model_zoo.get_model(inswapper_path, providers=providers) | |
| # # logger.info("Face analysis models loaded successfully") | |
| # # except Exception as e: | |
| # # logger.error(f"Failed to initialize face analysis models: {e}") | |
| # # # Set defaults to prevent crash | |
| # # inswapper_path = None | |
| # # face_analysis_app = None | |
| # # swapper = None | |
| # # # --------------------- CodeFormer --------------------- | |
| # # CODEFORMER_PATH = "CodeFormer/inference_codeformer.py" | |
| # # def ensure_codeformer(): | |
| # # try: | |
| # # if not os.path.exists("CodeFormer"): | |
| # # logger.info("CodeFormer not found, cloning repository...") | |
| # # subprocess.run("git clone https://github.com/sczhou/CodeFormer.git", shell=True, check=True) | |
| # # subprocess.run("pip install -r CodeFormer/requirements.txt", shell=True, check=False) # Non-critical deps | |
| # # # Always ensure BasicSR is installed from local directory | |
| # # # This is needed for Hugging Face Spaces where BasicSR can't be installed from GitHub | |
| # # if os.path.exists("CodeFormer/basicsr/setup.py"): | |
| # # logger.info("Installing BasicSR from local directory...") | |
| # # subprocess.run("python CodeFormer/basicsr/setup.py develop", shell=True, check=True) | |
| # # logger.info("BasicSR installed successfully") | |
| # # # Install realesrgan after BasicSR is installed (realesrgan depends on BasicSR) | |
| # # # This must be done after BasicSR installation to avoid PyPI install issues | |
| # # try: | |
| # # import realesrgan | |
| # # logger.info("RealESRGAN already installed") | |
| # # except ImportError: | |
| # # logger.info("Installing RealESRGAN...") | |
| # # subprocess.run("pip install --no-cache-dir realesrgan", shell=True, check=True) | |
| # # logger.info("RealESRGAN installed successfully") | |
| # # # Download models if CodeFormer exists (fixed logic) | |
| # # if os.path.exists("CodeFormer"): | |
| # # try: | |
| # # subprocess.run("python CodeFormer/scripts/download_pretrained_models.py facelib", shell=True, check=False, timeout=300) | |
| # # except (subprocess.TimeoutExpired, subprocess.CalledProcessError): | |
| # # logger.warning("Failed to download facelib models (optional)") | |
| # # try: | |
| # # subprocess.run("python CodeFormer/scripts/download_pretrained_models.py CodeFormer", shell=True, check=False, timeout=300) | |
| # # except (subprocess.TimeoutExpired, subprocess.CalledProcessError): | |
| # # logger.warning("Failed to download CodeFormer models (optional)") | |
| # # except Exception as e: | |
| # # logger.error(f"CodeFormer setup failed: {e}") | |
| # # logger.warning("Continuing without CodeFormer features...") | |
| # # ensure_codeformer() | |
| # # # --------------------- FastAPI --------------------- | |
| # # fastapi_app = FastAPI() | |
| # # @fastapi_app.on_event("startup") | |
| # # async def startup_db(): | |
| # # global client, database | |
| # # if MONGODB_URL: | |
| # # try: | |
| # # logger.info("Initializing MongoDB for API logs...") | |
| # # client = AsyncIOMotorClient(MONGODB_URL) | |
| # # database = client.FaceSwap | |
| # # logger.info("MongoDB initialized for API logs") | |
| # # except Exception as e: | |
| # # logger.warning(f"MongoDB connection failed (optional): {e}") | |
| # # client = None | |
| # # database = None | |
| # # else: | |
| # # logger.warning("MONGODB_URL not set, skipping MongoDB initialization") | |
| # # @fastapi_app.on_event("shutdown") | |
| # # async def shutdown_db(): | |
| # # global client, admin_client, collage_maker_client | |
| # # if client is not None: | |
| # # client.close() | |
| # # logger.info("MongoDB connection closed") | |
| # # if admin_client is not None: | |
| # # admin_client.close() | |
| # # logger.info("Admin MongoDB connection closed") | |
| # # if collage_maker_client is not None: | |
| # # collage_maker_client.close() | |
| # # logger.info("Collage Maker MongoDB connection closed") | |
| # # # --------------------- Auth --------------------- | |
| # # security = HTTPBearer() | |
| # # def verify_token(credentials: HTTPAuthorizationCredentials = Security(security)): | |
| # # if credentials.credentials != API_SECRET_TOKEN: | |
| # # raise HTTPException(status_code=401, detail="Invalid or missing token") | |
| # # return credentials.credentials | |
| # # # --------------------- DB Selector --------------------- | |
| # # # def get_media_clicks_collection(appname: Optional[str] = None): | |
| # # # """ | |
| # # # Returns the correct media_clicks collection based on appname. | |
| # # # Defaults to the primary admin database when no appname is provided | |
| # # # or when the requested database is unavailable. | |
| # # # """ | |
| # # # if appname: | |
| # # # normalized = appname.strip().lower() | |
| # # # if normalized == "collage-maker": | |
| # # # if collage_media_clicks_col is not None: | |
| # # # return collage_media_clicks_col | |
| # # # logger.warning("COLLAGE_MAKER_DB_URL not configured; falling back to default media_clicks collection") | |
| # # # return media_clicks_col | |
| # # def get_app_db_collections(appname: Optional[str] = None): | |
| # # """ | |
| # # Returns (media_clicks_collection, subcategories_collection) | |
| # # based on appname. | |
| # # """ | |
| # # if appname: | |
| # # app = appname.strip().lower() | |
| # # if app == "collage-maker": | |
| # # if collage_media_clicks_col is not None and subcategories_col is not None: | |
| # # return collage_media_clicks_col, subcategories_col | |
| # # logger.warning("Collage-maker DB not configured, falling back to admin") | |
| # # elif app == "ai-enhancer": | |
| # # if ai_enhancer_media_clicks_col is not None and ai_enhancer_subcategories_col is not None: | |
| # # return ai_enhancer_media_clicks_col, ai_enhancer_subcategories_col | |
| # # logger.warning("AI-Enhancer DB not configured, falling back to admin") | |
| # # # default fallback | |
| # # return media_clicks_col, subcategories_col | |
| # # # --------------------- Logging API Hits --------------------- | |
| # # async def log_faceswap_hit(token: str, status: str = "success"): | |
| # # global database | |
| # # if database is None: | |
| # # return | |
| # # await database.api_logs.insert_one({ | |
| # # "token": token, | |
| # # "endpoint": "/faceswap", | |
| # # "status": status, | |
| # # "timestamp": datetime.utcnow() | |
| # # }) | |
| # # # --------------------- Face Swap Pipeline --------------------- | |
| # # swap_lock = threading.Lock() | |
| # # def enhance_image_with_codeformer(rgb_img, temp_dir=None): | |
| # # if temp_dir is None: | |
| # # temp_dir = os.path.join(tempfile.gettempdir(), f"enhance_{uuid.uuid4().hex[:8]}") | |
| # # os.makedirs(temp_dir, exist_ok=True) | |
| # # input_path = os.path.join(temp_dir, "input.jpg") | |
| # # cv2.imwrite(input_path, cv2.cvtColor(rgb_img, cv2.COLOR_RGB2BGR)) | |
| # # python_cmd = sys.executable if sys.executable else "python3" | |
| # # cmd = ( | |
| # # f"{python_cmd} {CODEFORMER_PATH} " | |
| # # f"-w 0.7 " | |
| # # f"--input_path {input_path} " | |
| # # f"--output_path {temp_dir} " | |
| # # f"--bg_upsampler realesrgan " | |
| # # f"--face_upsample" | |
| # # ) | |
| # # result = subprocess.run(cmd, shell=True, capture_output=True, text=True) | |
| # # if result.returncode != 0: | |
| # # raise RuntimeError(result.stderr) | |
| # # final_dir = os.path.join(temp_dir, "final_results") | |
| # # files = [f for f in os.listdir(final_dir) if f.endswith(".png")] | |
| # # if not files: | |
| # # raise RuntimeError("No enhanced output") | |
| # # final_path = os.path.join(final_dir, files[0]) | |
| # # enhanced = cv2.imread(final_path) | |
| # # return cv2.cvtColor(enhanced, cv2.COLOR_BGR2RGB) | |
| # # def multi_face_swap(src_img, tgt_img): | |
| # # src_bgr = cv2.cvtColor(src_img, cv2.COLOR_RGB2BGR) | |
| # # tgt_bgr = cv2.cvtColor(tgt_img, cv2.COLOR_RGB2BGR) | |
| # # src_faces = face_analysis_app.get(src_bgr) | |
| # # tgt_faces = face_analysis_app.get(tgt_bgr) | |
| # # if not src_faces or not tgt_faces: | |
| # # raise ValueError("No faces detected") | |
| # # def face_sort_key(face): | |
| # # x1, y1, x2, y2 = face.bbox | |
| # # area = (x2 - x1) * (y2 - y1) | |
| # # cx = (x1 + x2) / 2 | |
| # # return (-area, cx) | |
| # # # Split by gender | |
| # # src_male = [f for f in src_faces if f.gender == 1] | |
| # # src_female = [f for f in src_faces if f.gender == 0] | |
| # # tgt_male = [f for f in tgt_faces if f.gender == 1] | |
| # # tgt_female = [f for f in tgt_faces if f.gender == 0] | |
| # # # Sort inside gender groups | |
| # # src_male = sorted(src_male, key=face_sort_key) | |
| # # src_female = sorted(src_female, key=face_sort_key) | |
| # # tgt_male = sorted(tgt_male, key=face_sort_key) | |
| # # tgt_female = sorted(tgt_female, key=face_sort_key) | |
| # # # Build final swap pairs | |
| # # pairs = [] | |
| # # for s, t in zip(src_male, tgt_male): | |
| # # pairs.append((s, t)) | |
| # # for s, t in zip(src_female, tgt_female): | |
| # # pairs.append((s, t)) | |
| # # # Fallback if gender mismatch | |
| # # if not pairs: | |
| # # src_faces = sorted(src_faces, key=face_sort_key) | |
| # # tgt_faces = sorted(tgt_faces, key=face_sort_key) | |
| # # pairs = list(zip(src_faces, tgt_faces)) | |
| # # result_img = tgt_bgr.copy() | |
| # # for src_face, _ in pairs: | |
| # # # 🔁 re-detect current target faces | |
| # # if face_analysis_app is None: | |
| # # raise ValueError("Face analysis models not initialized. Please ensure models are downloaded.") | |
| # # current_faces = face_analysis_app.get(result_img) | |
| # # current_faces = sorted(current_faces, key=face_sort_key) | |
| # # # choose best matching gender | |
| # # candidates = [ | |
| # # f for f in current_faces if f.gender == src_face.gender | |
| # # ] or current_faces | |
| # # target_face = candidates[0] | |
| # # if swapper is None: | |
| # # raise ValueError("Face swap models not initialized. Please ensure models are downloaded.") | |
| # # result_img = swapper.get( | |
| # # result_img, | |
| # # target_face, | |
| # # src_face, | |
| # # paste_back=True | |
| # # ) | |
| # # return cv2.cvtColor(result_img, cv2.COLOR_BGR2RGB) | |
| # # def face_swap_and_enhance(src_img, tgt_img, temp_dir=None): | |
| # # try: | |
| # # with swap_lock: | |
| # # # Use a temp dir for intermediate files | |
| # # if temp_dir is None: | |
| # # temp_dir = os.path.join(tempfile.gettempdir(), f"faceswap_work_{uuid.uuid4().hex[:8]}") | |
| # # if os.path.exists(temp_dir): | |
| # # shutil.rmtree(temp_dir) | |
| # # os.makedirs(temp_dir, exist_ok=True) | |
| # # src_bgr = cv2.cvtColor(src_img, cv2.COLOR_RGB2BGR) | |
| # # tgt_bgr = cv2.cvtColor(tgt_img, cv2.COLOR_RGB2BGR) | |
| # # src_faces = face_analysis_app.get(src_bgr) | |
| # # tgt_faces = face_analysis_app.get(tgt_bgr) | |
| # # if face_analysis_app is None: | |
| # # return None, None, "❌ Face analysis models not initialized. Please ensure models are downloaded." | |
| # # if not src_faces or not tgt_faces: | |
| # # return None, None, "❌ Face not detected in one of the images" | |
| # # swapped_path = os.path.join(temp_dir, f"swapped_{uuid.uuid4().hex[:8]}.jpg") | |
| # # if swapper is None: | |
| # # return None, None, "❌ Face swap models not initialized. Please ensure models are downloaded." | |
| # # swapped_bgr = swapper.get(tgt_bgr, tgt_faces[0], src_faces[0]) | |
| # # if swapped_bgr is None: | |
| # # return None, None, "❌ Face swap failed" | |
| # # cv2.imwrite(swapped_path, swapped_bgr) | |
| # # python_cmd = sys.executable if sys.executable else "python3" | |
| # # cmd = f"{python_cmd} {CODEFORMER_PATH} -w 0.7 --input_path {swapped_path} --output_path {temp_dir} --bg_upsampler realesrgan --face_upsample" | |
| # # result = subprocess.run(cmd, shell=True, capture_output=True, text=True) | |
| # # if result.returncode != 0: | |
| # # return None, None, f"❌ CodeFormer failed:\n{result.stderr}" | |
| # # final_results_dir = os.path.join(temp_dir, "final_results") | |
| # # final_files = [f for f in os.listdir(final_results_dir) if f.endswith(".png")] | |
| # # if not final_files: | |
| # # return None, None, "❌ No enhanced image found" | |
| # # final_path = os.path.join(final_results_dir, final_files[0]) | |
| # # final_img_bgr = cv2.imread(final_path) | |
| # # if final_img_bgr is None: | |
| # # return None, None, "❌ Failed to read enhanced image file" | |
| # # final_img = cv2.cvtColor(final_img_bgr, cv2.COLOR_BGR2RGB) | |
| # # return final_img, final_path, "" | |
| # # except Exception as e: | |
| # # return None, None, f"❌ Error: {str(e)}" | |
| # # def compress_image( | |
| # # image_bytes: bytes, | |
| # # max_size=(1280, 1280), # max width/height | |
| # # quality=75 # JPEG quality (60–80 is ideal) | |
| # # ) -> bytes: | |
| # # """ | |
| # # Compress image by resizing and lowering quality. | |
| # # Returns compressed image bytes. | |
| # # """ | |
| # # img = Image.open(io.BytesIO(image_bytes)).convert("RGB") | |
| # # # Resize while maintaining aspect ratio | |
| # # img.thumbnail(max_size, Image.LANCZOS) | |
| # # output = io.BytesIO() | |
| # # img.save( | |
| # # output, | |
| # # format="JPEG", | |
| # # quality=quality, | |
| # # optimize=True, | |
| # # progressive=True | |
| # # ) | |
| # # return output.getvalue() | |
| # # # --------------------- DigitalOcean Spaces Helper --------------------- | |
| # # def get_spaces_client(): | |
| # # session = boto3.session.Session() | |
| # # client = session.client( | |
| # # 's3', | |
| # # region_name=DO_SPACES_REGION, | |
| # # endpoint_url=DO_SPACES_ENDPOINT, | |
| # # aws_access_key_id=DO_SPACES_KEY, | |
| # # aws_secret_access_key=DO_SPACES_SECRET, | |
| # # config=Config(signature_version='s3v4') | |
| # # ) | |
| # # return client | |
| # # def upload_to_spaces(file_bytes, key, content_type="image/png"): | |
| # # client = get_spaces_client() | |
| # # client.put_object(Bucket=DO_SPACES_BUCKET, Key=key, Body=file_bytes, ContentType=content_type, ACL='public-read') | |
| # # return f"{DO_SPACES_ENDPOINT}/{DO_SPACES_BUCKET}/{key}" | |
| # # def download_from_spaces(key): | |
| # # client = get_spaces_client() | |
| # # obj = client.get_object(Bucket=DO_SPACES_BUCKET, Key=key) | |
| # # return obj['Body'].read() | |
| # # def build_multi_faceswap_gradio(): | |
| # # with gr.Blocks() as demo: | |
| # # gr.Markdown("## 👩❤️👨 Multi Face Swap (Couple → Couple)") | |
| # # with gr.Row(): | |
| # # src = gr.Image(type="numpy", label="Source Image (2 Faces)") | |
| # # tgt = gr.Image(type="numpy", label="Target Image (2 Faces)") | |
| # # out = gr.Image(type="numpy", label="Swapped Result") | |
| # # error = gr.Textbox(label="Logs", interactive=False) | |
| # # def process(src_img, tgt_img): | |
| # # try: | |
| # # swapped = multi_face_swap(src_img, tgt_img) | |
| # # enhanced = enhance_image_with_codeformer(swapped) | |
| # # return enhanced, "" | |
| # # except Exception as e: | |
| # # return None, str(e) | |
| # # btn = gr.Button("Swap Faces") | |
| # # btn.click(process, [src, tgt], [out, error]) | |
| # # return demo | |
| # # def mandatory_enhancement(rgb_img): | |
| # # """ | |
| # # Always runs CodeFormer on the final image. | |
| # # Fail-safe: returns original if enhancement fails. | |
| # # """ | |
| # # try: | |
| # # return enhance_image_with_codeformer(rgb_img) | |
| # # except Exception as e: | |
| # # logger.error(f"CodeFormer failed, returning original: {e}") | |
| # # return rgb_img | |
| # # # --------------------- API Endpoints --------------------- | |
| # # @fastapi_app.get("/") | |
| # # async def root(): | |
| # # """Root endpoint""" | |
| # # return { | |
| # # "success": True, | |
| # # "message": "FaceSwap API", | |
| # # "data": { | |
| # # "version": "1.0.0", | |
| # # "Product Name":"Beauty Camera - GlowCam AI Studio", | |
| # # "Released By" : "LogicGo Infotech" | |
| # # } | |
| # # } | |
| # # @fastapi_app.get("/health") | |
| # # async def health(): | |
| # # return {"status": "healthy"} | |
| # # from fastapi import Form | |
| # # import requests | |
| # # @fastapi_app.get("/test-admin-db") | |
| # # async def test_admin_db(): | |
| # # try: | |
| # # doc = await admin_db.list_collection_names() | |
| # # return {"ok": True, "collections": doc} | |
| # # except Exception as e: | |
| # # return {"ok": False, "error": str(e), "url": ADMIN_MONGO_URL} | |
| # # @fastapi_app.post("/face-swap", dependencies=[Depends(verify_token)]) | |
| # # async def face_swap_api( | |
| # # source: UploadFile = File(...), | |
| # # target_category_id: str = Form(None), | |
| # # new_category_id: str = Form(None), | |
| # # user_id: Optional[str] = Form(None), | |
| # # appname: Optional[str] = Form(None), | |
| # # credentials: HTTPAuthorizationCredentials = Security(security) | |
| # # ): | |
| # # start_time = datetime.utcnow() | |
| # # try: | |
| # # # ------------------------------------------------------------------ | |
| # # # VALIDATION | |
| # # # ------------------------------------------------------------------ | |
| # # # -------------------------------------------------------------- | |
| # # # BACKWARD COMPATIBILITY FOR OLD ANDROID VERSIONS | |
| # # # -------------------------------------------------------------- | |
| # # if target_category_id == "": | |
| # # target_category_id = None | |
| # # if new_category_id == "": | |
| # # new_category_id = None | |
| # # if user_id == "": | |
| # # user_id = None | |
| # # # media_clicks_collection = get_media_clicks_collection(appname) | |
| # # media_clicks_collection, subcategories_collection = get_app_db_collections(appname) | |
| # # logger.info(f"[FaceSwap] Incoming request → target_category_id={target_category_id}, new_category_id={new_category_id}, user_id={user_id}") | |
| # # if target_category_id and new_category_id: | |
| # # raise HTTPException(400, "Provide only one of new_category_id or target_category_id.") | |
| # # if not target_category_id and not new_category_id: | |
| # # raise HTTPException(400, "Either new_category_id or target_category_id is required.") | |
| # # # ------------------------------------------------------------------ | |
| # # # READ SOURCE IMAGE | |
| # # # ------------------------------------------------------------------ | |
| # # src_bytes = await source.read() | |
| # # src_key = f"faceswap/source/{uuid.uuid4().hex}_{source.filename}" | |
| # # upload_to_spaces(src_bytes, src_key, content_type=source.content_type) | |
| # # # ------------------------------------------------------------------ | |
| # # # CASE 1 : new_category_id → MongoDB lookup | |
| # # # ------------------------------------------------------------------ | |
| # # if new_category_id: | |
| # # # doc = await subcategories_col.find_one({ | |
| # # # "asset_images._id": ObjectId(new_category_id) | |
| # # # }) | |
| # # doc = await subcategories_collection.find_one({ | |
| # # "asset_images._id": ObjectId(new_category_id) | |
| # # }) | |
| # # if not doc: | |
| # # raise HTTPException(404, "Asset image not found in database") | |
| # # # extract correct asset | |
| # # asset = next( | |
| # # (img for img in doc["asset_images"] if str(img["_id"]) == new_category_id), | |
| # # None | |
| # # ) | |
| # # if not asset: | |
| # # raise HTTPException(404, "Asset image URL not found") | |
| # # # correct URL | |
| # # target_url = asset["url"] | |
| # # # correct categoryId (ObjectId) | |
| # # #category_oid = doc["categoryId"] # <-- DO NOT CONVERT TO STRING | |
| # # subcategory_oid = doc["_id"] | |
| # # # ------------------------------------------------------------------# | |
| # # # # MEDIA_CLICKS (ONLY IF user_id PRESENT) | |
| # # # ------------------------------------------------------------------# | |
| # # if user_id and media_clicks_collection is not None: | |
| # # try: | |
| # # user_id_clean = user_id.strip() | |
| # # if not user_id_clean: | |
| # # raise ValueError("user_id cannot be empty") | |
| # # try: | |
| # # user_oid = ObjectId(user_id_clean) | |
| # # except (InvalidId, ValueError) as e: | |
| # # logger.error(f"Invalid user_id format: {user_id_clean}") | |
| # # raise ValueError(f"Invalid user_id format: {user_id_clean}") | |
| # # now = datetime.utcnow() | |
| # # # Normalize dates (UTC midnight) | |
| # # today_date = datetime(now.year, now.month, now.day) | |
| # # # ------------------------------------------------- | |
| # # # STEP 1: Ensure root document exists | |
| # # # ------------------------------------------------- | |
| # # await media_clicks_collection.update_one( | |
| # # {"userId": user_oid}, | |
| # # { | |
| # # "$setOnInsert": { | |
| # # "userId": user_oid, | |
| # # "createdAt": now, | |
| # # "ai_edit_complete": 0, | |
| # # "ai_edit_daily_count": [] | |
| # # } | |
| # # }, | |
| # # upsert=True | |
| # # ) | |
| # # # ------------------------------------------------- | |
| # # # STEP 2: Handle DAILY USAGE (BINARY, NO DUPLICATES) | |
| # # # ------------------------------------------------- | |
| # # doc = await media_clicks_collection.find_one( | |
| # # {"userId": user_oid}, | |
| # # {"ai_edit_daily_count": 1} | |
| # # ) | |
| # # daily_entries = doc.get("ai_edit_daily_count", []) if doc else [] | |
| # # # Normalize today to UTC midnight | |
| # # today_date = datetime(now.year, now.month, now.day) | |
| # # # Build normalized date → count map (THIS ENFORCES UNIQUENESS) | |
| # # daily_map = {} | |
| # # for entry in daily_entries: | |
| # # d = entry["date"] | |
| # # if isinstance(d, datetime): | |
| # # d = datetime(d.year, d.month, d.day) | |
| # # daily_map[d] = entry["count"] # overwrite = no duplicates | |
| # # # Determine last recorded date | |
| # # last_date = max(daily_map.keys()) if daily_map else today_date | |
| # # # Fill ALL missing days with count = 0 | |
| # # next_day = last_date + timedelta(days=1) | |
| # # while next_day < today_date: | |
| # # daily_map.setdefault(next_day, 0) | |
| # # next_day += timedelta(days=1) | |
| # # # Mark today as used (binary) | |
| # # daily_map[today_date] = 1 | |
| # # # Rebuild list: OLDEST → NEWEST | |
| # # final_daily_entries = [ | |
| # # {"date": d, "count": daily_map[d]} | |
| # # for d in sorted(daily_map.keys()) | |
| # # ] | |
| # # # Keep only last 32 days | |
| # # final_daily_entries = final_daily_entries[-32:] | |
| # # # Atomic replace | |
| # # await media_clicks_collection.update_one( | |
| # # {"userId": user_oid}, | |
| # # { | |
| # # "$set": { | |
| # # "ai_edit_daily_count": final_daily_entries, | |
| # # "updatedAt": now | |
| # # } | |
| # # } | |
| # # ) | |
| # # # ------------------------------------------------- | |
| # # # STEP 3: Try updating existing subCategory | |
| # # # ------------------------------------------------- | |
| # # update_result = await media_clicks_collection.update_one( | |
| # # { | |
| # # "userId": user_oid, | |
| # # "subCategories.subCategoryId": subcategory_oid | |
| # # }, | |
| # # { | |
| # # "$inc": { | |
| # # "subCategories.$.click_count": 1, | |
| # # "ai_edit_complete": 1 | |
| # # }, | |
| # # "$set": { | |
| # # "subCategories.$.lastClickedAt": now, | |
| # # "ai_edit_last_date": now, | |
| # # "updatedAt": now | |
| # # } | |
| # # } | |
| # # ) | |
| # # # ------------------------------------------------- | |
| # # # STEP 4: Push subCategory if missing | |
| # # # ------------------------------------------------- | |
| # # if update_result.matched_count == 0: | |
| # # await media_clicks_collection.update_one( | |
| # # {"userId": user_oid}, | |
| # # { | |
| # # "$inc": { | |
| # # "ai_edit_complete": 1 | |
| # # }, | |
| # # "$set": { | |
| # # "ai_edit_last_date": now, | |
| # # "updatedAt": now | |
| # # }, | |
| # # "$push": { | |
| # # "subCategories": { | |
| # # "subCategoryId": subcategory_oid, | |
| # # "click_count": 1, | |
| # # "lastClickedAt": now | |
| # # } | |
| # # } | |
| # # } | |
| # # ) | |
| # # # ------------------------------------------------- | |
| # # # STEP 5: Sort subCategories by lastClickedAt (ascending - oldest first) | |
| # # # ------------------------------------------------- | |
| # # user_doc = await media_clicks_collection.find_one({"userId": user_oid}) | |
| # # if user_doc and "subCategories" in user_doc: | |
| # # subcategories = user_doc["subCategories"] | |
| # # # Sort by lastClickedAt in ascending order (oldest first) | |
| # # # Handle missing or None dates by using datetime.min | |
| # # subcategories_sorted = sorted( | |
| # # subcategories, | |
| # # key=lambda x: x.get("lastClickedAt") if x.get("lastClickedAt") is not None else datetime.min | |
| # # ) | |
| # # # Update with sorted array | |
| # # await media_clicks_collection.update_one( | |
| # # {"userId": user_oid}, | |
| # # { | |
| # # "$set": { | |
| # # "subCategories": subcategories_sorted, | |
| # # "updatedAt": now | |
| # # } | |
| # # } | |
| # # ) | |
| # # logger.info( | |
| # # "[MEDIA_CLICK] user=%s subCategory=%s ai_edit_complete++ daily_tracked", | |
| # # user_id, | |
| # # str(subcategory_oid) | |
| # # ) | |
| # # except Exception as media_err: | |
| # # logger.error(f"MEDIA_CLICK ERROR: {media_err}") | |
| # # elif user_id and media_clicks_collection is None: | |
| # # logger.warning("Media clicks collection unavailable; skipping media click tracking") | |
| # # # # ------------------------------------------------------------------ | |
| # # # # CASE 2 : target_category_id → DigitalOcean path (unchanged logic) | |
| # # # # ------------------------------------------------------------------ | |
| # # if target_category_id: | |
| # # client = get_spaces_client() | |
| # # base_prefix = "faceswap/target/" | |
| # # resp = client.list_objects_v2( | |
| # # Bucket=DO_SPACES_BUCKET, Prefix=base_prefix, Delimiter="/" | |
| # # ) | |
| # # # Extract categories from the CommonPrefixes | |
| # # categories = [p["Prefix"].split("/")[2] for p in resp.get("CommonPrefixes", [])] | |
| # # target_url = None | |
| # # # --- FIX STARTS HERE --- | |
| # # for category in categories: | |
| # # original_prefix = f"faceswap/target/{category}/original/" | |
| # # thumb_prefix = f"faceswap/target/{category}/thumb/" # Keep for file list check (optional but safe) | |
| # # # List objects in original/ | |
| # # original_objects = client.list_objects_v2( | |
| # # Bucket=DO_SPACES_BUCKET, Prefix=original_prefix | |
| # # ).get("Contents", []) | |
| # # # List objects in thumb/ (optional: for the old code's extra check) | |
| # # thumb_objects = client.list_objects_v2( | |
| # # Bucket=DO_SPACES_BUCKET, Prefix=thumb_prefix | |
| # # ).get("Contents", []) | |
| # # # Extract only the filenames and filter for .png | |
| # # original_filenames = sorted([ | |
| # # obj["Key"].split("/")[-1] for obj in original_objects | |
| # # if obj["Key"].split("/")[-1].endswith(".png") | |
| # # ]) | |
| # # thumb_filenames = [ | |
| # # obj["Key"].split("/")[-1] for obj in thumb_objects | |
| # # ] | |
| # # # Replicate the old indexing logic based on sorted filenames | |
| # # for idx, filename in enumerate(original_filenames, start=1): | |
| # # cid = f"{category.lower()}image_{idx}" | |
| # # # Optional: Replicate the thumb file check for 100% parity | |
| # # # if filename in thumb_filenames and cid == target_category_id: | |
| # # # Simpler check just on the ID, assuming thumb files are present | |
| # # if cid == target_category_id: | |
| # # # Construct the final target URL using the full prefix and the filename | |
| # # target_url = f"{DO_SPACES_ENDPOINT}/{DO_SPACES_BUCKET}/{original_prefix}{filename}" | |
| # # break | |
| # # if target_url: | |
| # # break | |
| # # # --- FIX ENDS HERE --- | |
| # # if not target_url: | |
| # # raise HTTPException(404, "Target categoryId not found") | |
| # # # # ------------------------------------------------------------------ | |
| # # # # DOWNLOAD TARGET IMAGE | |
| # # # # ------------------------------------------------------------------ | |
| # # async with httpx.AsyncClient(timeout=30.0) as client: | |
| # # response = await client.get(target_url) | |
| # # response.raise_for_status() | |
| # # tgt_bytes = response.content | |
| # # src_bgr = cv2.imdecode(np.frombuffer(src_bytes, np.uint8), cv2.IMREAD_COLOR) | |
| # # tgt_bgr = cv2.imdecode(np.frombuffer(tgt_bytes, np.uint8), cv2.IMREAD_COLOR) | |
| # # if src_bgr is None or tgt_bgr is None: | |
| # # raise HTTPException(400, "Invalid image data") | |
| # # src_rgb = cv2.cvtColor(src_bgr, cv2.COLOR_BGR2RGB) | |
| # # tgt_rgb = cv2.cvtColor(tgt_bgr, cv2.COLOR_BGR2RGB) | |
| # # # ------------------------------------------------------------------ | |
| # # # FACE SWAP EXECUTION | |
| # # # ------------------------------------------------------------------ | |
| # # final_img, final_path, err = face_swap_and_enhance(src_rgb, tgt_rgb) | |
| # # # #--------------------Version 2.0 ----------------------------------------# | |
| # # # final_img, final_path, err = enhanced_face_swap_and_enhance(src_rgb, tgt_rgb) | |
| # # # #--------------------Version 2.0 ----------------------------------------# | |
| # # if err: | |
| # # raise HTTPException(500, err) | |
| # # with open(final_path, "rb") as f: | |
| # # result_bytes = f.read() | |
| # # result_key = f"faceswap/result/{uuid.uuid4().hex}_enhanced.png" | |
| # # result_url = upload_to_spaces(result_bytes, result_key) | |
| # # # ------------------------------------------------- | |
| # # # COMPRESS IMAGE (2–3 MB target) | |
| # # # ------------------------------------------------- | |
| # # compressed_bytes = compress_image( | |
| # # image_bytes=result_bytes, | |
| # # max_size=(1280, 1280), | |
| # # quality=72 | |
| # # ) | |
| # # compressed_key = f"faceswap/result/{uuid.uuid4().hex}_enhanced_compressed.jpg" | |
| # # compressed_url = upload_to_spaces( | |
| # # compressed_bytes, | |
| # # compressed_key, | |
| # # content_type="image/jpeg" | |
| # # ) | |
| # # end_time = datetime.utcnow() | |
| # # response_time_ms = (end_time - start_time).total_seconds() * 1000 | |
| # # if database is not None: | |
| # # log_entry = { | |
| # # "endpoint": "/face-swap", | |
| # # "status": "success", | |
| # # "response_time_ms": response_time_ms, | |
| # # "timestamp": end_time | |
| # # } | |
| # # if appname: | |
| # # log_entry["appname"] = appname | |
| # # await database.api_logs.insert_one(log_entry) | |
| # # return { | |
| # # "result_key": result_key, | |
| # # "result_url": result_url, | |
| # # "Compressed_Image_URL": compressed_url | |
| # # } | |
| # # except Exception as e: | |
| # # end_time = datetime.utcnow() | |
| # # response_time_ms = (end_time - start_time).total_seconds() * 1000 | |
| # # if database is not None: | |
| # # log_entry = { | |
| # # "endpoint": "/face-swap", | |
| # # "status": "fail", | |
| # # "response_time_ms": response_time_ms, | |
| # # "timestamp": end_time, | |
| # # "error": str(e) | |
| # # } | |
| # # if appname: | |
| # # log_entry["appname"] = appname | |
| # # await database.api_logs.insert_one(log_entry) | |
| # # raise HTTPException(500, f"Face swap failed: {str(e)}") | |
| # # @fastapi_app.get("/preview/{result_key:path}") | |
| # # async def preview_result(result_key: str): | |
| # # try: | |
| # # img_bytes = download_from_spaces(result_key) | |
| # # except Exception: | |
| # # raise HTTPException(status_code=404, detail="Result not found") | |
| # # return Response( | |
| # # content=img_bytes, | |
| # # media_type="image/png", | |
| # # headers={"Content-Disposition": "inline; filename=result.png"} | |
| # # ) | |
| # # @fastapi_app.post("/multi-face-swap", dependencies=[Depends(verify_token)]) | |
| # # async def multi_face_swap_api( | |
| # # source_image: UploadFile = File(...), | |
| # # target_image: UploadFile = File(...) | |
| # # ): | |
| # # start_time = datetime.utcnow() | |
| # # try: | |
| # # # ----------------------------- | |
| # # # Read images | |
| # # # ----------------------------- | |
| # # src_bytes = await source_image.read() | |
| # # tgt_bytes = await target_image.read() | |
| # # src_bgr = cv2.imdecode(np.frombuffer(src_bytes, np.uint8), cv2.IMREAD_COLOR) | |
| # # tgt_bgr = cv2.imdecode(np.frombuffer(tgt_bytes, np.uint8), cv2.IMREAD_COLOR) | |
| # # if src_bgr is None or tgt_bgr is None: | |
| # # raise HTTPException(400, "Invalid image data") | |
| # # src_rgb = cv2.cvtColor(src_bgr, cv2.COLOR_BGR2RGB) | |
| # # tgt_rgb = cv2.cvtColor(tgt_bgr, cv2.COLOR_BGR2RGB) | |
| # # # ----------------------------- | |
| # # # Multi-face swap | |
| # # # ----------------------------- | |
| # # swapped_rgb = multi_face_swap(src_rgb, tgt_rgb) | |
| # # # ----------------------------- | |
| # # # 🔥 MANDATORY ENHANCEMENT | |
| # # # ----------------------------- | |
| # # final_rgb = mandatory_enhancement(swapped_rgb) | |
| # # final_bgr = cv2.cvtColor(final_rgb, cv2.COLOR_RGB2BGR) | |
| # # # ----------------------------- | |
| # # # Save temp result | |
| # # # ----------------------------- | |
| # # temp_dir = tempfile.mkdtemp(prefix="multi_faceswap_") | |
| # # result_path = os.path.join(temp_dir, "result.png") | |
| # # cv2.imwrite(result_path, final_bgr) | |
| # # with open(result_path, "rb") as f: | |
| # # result_bytes = f.read() | |
| # # # ----------------------------- | |
| # # # Upload | |
| # # # ----------------------------- | |
| # # result_key = f"faceswap/multi/{uuid.uuid4().hex}.png" | |
| # # result_url = upload_to_spaces( | |
| # # result_bytes, | |
| # # result_key, | |
| # # content_type="image/png" | |
| # # ) | |
| # # return { | |
| # # "result_key": result_key, | |
| # # "result_url": result_url | |
| # # } | |
| # # except Exception as e: | |
| # # raise HTTPException(status_code=500, detail=str(e)) | |
| # # @fastapi_app.post("/face-swap-couple", dependencies=[Depends(verify_token)]) | |
| # # async def face_swap_api( | |
| # # image1: UploadFile = File(...), | |
| # # image2: Optional[UploadFile] = File(None), | |
| # # target_category_id: str = Form(None), | |
| # # new_category_id: str = Form(None), | |
| # # user_id: Optional[str] = Form(None), | |
| # # appname: Optional[str] = Form(None), | |
| # # credentials: HTTPAuthorizationCredentials = Security(security) | |
| # # ): | |
| # # """ | |
| # # Production-ready face swap endpoint supporting: | |
| # # - Multiple source images (image1 + optional image2) | |
| # # - Gender-based pairing | |
| # # - Merged faces from multiple sources | |
| # # - Mandatory CodeFormer enhancement | |
| # # """ | |
| # # start_time = datetime.utcnow() | |
| # # try: | |
| # # # ----------------------------- | |
| # # # Validate input | |
| # # # ----------------------------- | |
| # # if target_category_id == "": | |
| # # target_category_id = None | |
| # # if new_category_id == "": | |
| # # new_category_id = None | |
| # # if user_id == "": | |
| # # user_id = None | |
| # # media_clicks_collection = get_media_clicks_collection(appname) | |
| # # if target_category_id and new_category_id: | |
| # # raise HTTPException(400, "Provide only one of new_category_id or target_category_id.") | |
| # # if not target_category_id and not new_category_id: | |
| # # raise HTTPException(400, "Either new_category_id or target_category_id is required.") | |
| # # logger.info(f"[FaceSwap] Incoming request → target_category_id={target_category_id}, new_category_id={new_category_id}, user_id={user_id}") | |
| # # # ----------------------------- | |
| # # # Read source images | |
| # # # ----------------------------- | |
| # # src_images = [] | |
| # # img1_bytes = await image1.read() | |
| # # src1 = cv2.imdecode(np.frombuffer(img1_bytes, np.uint8), cv2.IMREAD_COLOR) | |
| # # if src1 is None: | |
| # # raise HTTPException(400, "Invalid image1 data") | |
| # # src_images.append(cv2.cvtColor(src1, cv2.COLOR_BGR2RGB)) | |
| # # if image2: | |
| # # img2_bytes = await image2.read() | |
| # # src2 = cv2.imdecode(np.frombuffer(img2_bytes, np.uint8), cv2.IMREAD_COLOR) | |
| # # if src2 is not None: | |
| # # src_images.append(cv2.cvtColor(src2, cv2.COLOR_BGR2RGB)) | |
| # # # ----------------------------- | |
| # # # Resolve target image | |
| # # # ----------------------------- | |
| # # target_url = None | |
| # # if new_category_id: | |
| # # doc = await subcategories_col.find_one({ | |
| # # "asset_images._id": ObjectId(new_category_id) | |
| # # }) | |
| # # if not doc: | |
| # # raise HTTPException(404, "Asset image not found in database") | |
| # # asset = next( | |
| # # (img for img in doc["asset_images"] if str(img["_id"]) == new_category_id), | |
| # # None | |
| # # ) | |
| # # if not asset: | |
| # # raise HTTPException(404, "Asset image URL not found") | |
| # # target_url = asset["url"] | |
| # # subcategory_oid = doc["_id"] | |
| # # if user_id and media_clicks_collection is not None: | |
| # # try: | |
| # # user_id_clean = user_id.strip() | |
| # # if not user_id_clean: | |
| # # raise ValueError("user_id cannot be empty") | |
| # # try: | |
| # # user_oid = ObjectId(user_id_clean) | |
| # # except (InvalidId, ValueError): | |
| # # logger.error(f"Invalid user_id format: {user_id_clean}") | |
| # # raise ValueError(f"Invalid user_id format: {user_id_clean}") | |
| # # now = datetime.utcnow() | |
| # # # Step 1: ensure root document exists | |
| # # await media_clicks_collection.update_one( | |
| # # {"userId": user_oid}, | |
| # # { | |
| # # "$setOnInsert": { | |
| # # "userId": user_oid, | |
| # # "createdAt": now, | |
| # # "ai_edit_complete": 0, | |
| # # "ai_edit_daily_count": [] | |
| # # } | |
| # # }, | |
| # # upsert=True | |
| # # ) | |
| # # # Step 2: handle daily usage (binary, no duplicates) | |
| # # doc = await media_clicks_collection.find_one( | |
| # # {"userId": user_oid}, | |
| # # {"ai_edit_daily_count": 1} | |
| # # ) | |
| # # daily_entries = doc.get("ai_edit_daily_count", []) if doc else [] | |
| # # today_date = datetime(now.year, now.month, now.day) | |
| # # daily_map = {} | |
| # # for entry in daily_entries: | |
| # # d = entry["date"] | |
| # # if isinstance(d, datetime): | |
| # # d = datetime(d.year, d.month, d.day) | |
| # # daily_map[d] = entry["count"] | |
| # # last_date = max(daily_map.keys()) if daily_map else None | |
| # # if last_date != today_date: | |
| # # daily_map[today_date] = 1 | |
| # # final_daily_entries = [ | |
| # # {"date": d, "count": daily_map[d]} | |
| # # for d in sorted(daily_map.keys()) | |
| # # ] | |
| # # final_daily_entries = final_daily_entries[-32:] | |
| # # await media_clicks_collection.update_one( | |
| # # {"userId": user_oid}, | |
| # # { | |
| # # "$set": { | |
| # # "ai_edit_daily_count": final_daily_entries, | |
| # # "updatedAt": now | |
| # # } | |
| # # } | |
| # # ) | |
| # # # Step 3: try updating existing subCategory | |
| # # update_result = await media_clicks_collection.update_one( | |
| # # { | |
| # # "userId": user_oid, | |
| # # "subCategories.subCategoryId": subcategory_oid | |
| # # }, | |
| # # { | |
| # # "$inc": { | |
| # # "subCategories.$.click_count": 1, | |
| # # "ai_edit_complete": 1 | |
| # # }, | |
| # # "$set": { | |
| # # "subCategories.$.lastClickedAt": now, | |
| # # "ai_edit_last_date": now, | |
| # # "updatedAt": now | |
| # # } | |
| # # } | |
| # # ) | |
| # # # Step 4: push subCategory if missing | |
| # # if update_result.matched_count == 0: | |
| # # await media_clicks_collection.update_one( | |
| # # {"userId": user_oid}, | |
| # # { | |
| # # "$inc": { | |
| # # "ai_edit_complete": 1 | |
| # # }, | |
| # # "$set": { | |
| # # "ai_edit_last_date": now, | |
| # # "updatedAt": now | |
| # # }, | |
| # # "$push": { | |
| # # "subCategories": { | |
| # # "subCategoryId": subcategory_oid, | |
| # # "click_count": 1, | |
| # # "lastClickedAt": now | |
| # # } | |
| # # } | |
| # # } | |
| # # ) | |
| # # # Step 5: sort subCategories by lastClickedAt (ascending) | |
| # # user_doc = await media_clicks_collection.find_one({"userId": user_oid}) | |
| # # if user_doc and "subCategories" in user_doc: | |
| # # subcategories = user_doc["subCategories"] | |
| # # subcategories_sorted = sorted( | |
| # # subcategories, | |
| # # key=lambda x: x.get("lastClickedAt") if x.get("lastClickedAt") is not None else datetime.min | |
| # # ) | |
| # # await media_clicks_collection.update_one( | |
| # # {"userId": user_oid}, | |
| # # { | |
| # # "$set": { | |
| # # "subCategories": subcategories_sorted, | |
| # # "updatedAt": now | |
| # # } | |
| # # } | |
| # # ) | |
| # # logger.info( | |
| # # "[MEDIA_CLICK] user=%s subCategory=%s ai_edit_complete++ daily_tracked", | |
| # # user_id, | |
| # # str(subcategory_oid) | |
| # # ) | |
| # # except Exception as media_err: | |
| # # logger.error(f"MEDIA_CLICK ERROR: {media_err}") | |
| # # elif user_id and media_clicks_collection is None: | |
| # # logger.warning("Media clicks collection unavailable; skipping media click tracking") | |
| # # if target_category_id: | |
| # # client = get_spaces_client() | |
| # # base_prefix = "faceswap/target/" | |
| # # resp = client.list_objects_v2( | |
| # # Bucket=DO_SPACES_BUCKET, Prefix=base_prefix, Delimiter="/" | |
| # # ) | |
| # # categories = [p["Prefix"].split("/")[2] for p in resp.get("CommonPrefixes", [])] | |
| # # for category in categories: | |
| # # original_prefix = f"faceswap/target/{category}/original/" | |
| # # thumb_prefix = f"faceswap/target/{category}/thumb/" | |
| # # original_objects = client.list_objects_v2( | |
| # # Bucket=DO_SPACES_BUCKET, Prefix=original_prefix | |
| # # ).get("Contents", []) | |
| # # thumb_objects = client.list_objects_v2( | |
| # # Bucket=DO_SPACES_BUCKET, Prefix=thumb_prefix | |
| # # ).get("Contents", []) | |
| # # original_filenames = sorted([ | |
| # # obj["Key"].split("/")[-1] for obj in original_objects | |
| # # if obj["Key"].split("/")[-1].endswith(".png") | |
| # # ]) | |
| # # for idx, filename in enumerate(original_filenames, start=1): | |
| # # cid = f"{category.lower()}image_{idx}" | |
| # # if cid == target_category_id: | |
| # # target_url = f"{DO_SPACES_ENDPOINT}/{DO_SPACES_BUCKET}/{original_prefix}{filename}" | |
| # # break | |
| # # if target_url: | |
| # # break | |
| # # if not target_url: | |
| # # raise HTTPException(404, "Target categoryId not found") | |
| # # async with httpx.AsyncClient(timeout=30.0) as client: | |
| # # response = await client.get(target_url) | |
| # # response.raise_for_status() | |
| # # tgt_bytes = response.content | |
| # # tgt_bgr = cv2.imdecode(np.frombuffer(tgt_bytes, np.uint8), cv2.IMREAD_COLOR) | |
| # # if tgt_bgr is None: | |
| # # raise HTTPException(400, "Invalid target image data") | |
| # # # ----------------------------- | |
| # # # Merge all source faces | |
| # # # ----------------------------- | |
| # # all_src_faces = [] | |
| # # for img in src_images: | |
| # # faces = face_analysis_app.get(cv2.cvtColor(img, cv2.COLOR_RGB2BGR)) | |
| # # all_src_faces.extend(faces) | |
| # # if not all_src_faces: | |
| # # raise HTTPException(400, "No faces detected in source images") | |
| # # tgt_faces = face_analysis_app.get(tgt_bgr) | |
| # # if not tgt_faces: | |
| # # raise HTTPException(400, "No faces detected in target image") | |
| # # # ----------------------------- | |
| # # # Gender-based pairing | |
| # # # ----------------------------- | |
| # # def face_sort_key(face): | |
| # # x1, y1, x2, y2 = face.bbox | |
| # # area = (x2 - x1) * (y2 - y1) | |
| # # cx = (x1 + x2) / 2 | |
| # # return (-area, cx) | |
| # # # Separate by gender | |
| # # src_male = sorted([f for f in all_src_faces if f.gender == 1], key=face_sort_key) | |
| # # src_female = sorted([f for f in all_src_faces if f.gender == 0], key=face_sort_key) | |
| # # tgt_male = sorted([f for f in tgt_faces if f.gender == 1], key=face_sort_key) | |
| # # tgt_female = sorted([f for f in tgt_faces if f.gender == 0], key=face_sort_key) | |
| # # pairs = [] | |
| # # for s, t in zip(src_male, tgt_male): | |
| # # pairs.append((s, t)) | |
| # # for s, t in zip(src_female, tgt_female): | |
| # # pairs.append((s, t)) | |
| # # # fallback if gender mismatch | |
| # # if not pairs: | |
| # # src_all = sorted(all_src_faces, key=face_sort_key) | |
| # # tgt_all = sorted(tgt_faces, key=face_sort_key) | |
| # # pairs = list(zip(src_all, tgt_all)) | |
| # # # ----------------------------- | |
| # # # Perform face swap | |
| # # # ----------------------------- | |
| # # with swap_lock: | |
| # # result_img = tgt_bgr.copy() | |
| # # for src_face, _ in pairs: | |
| # # if face_analysis_app is None: | |
| # # raise HTTPException(status_code=500, detail="Face analysis models not initialized. Please ensure models are downloaded.") | |
| # # current_faces = sorted(face_analysis_app.get(result_img), key=face_sort_key) | |
| # # candidates = [f for f in current_faces if f.gender == src_face.gender] or current_faces | |
| # # target_face = candidates[0] | |
| # # if swapper is None: | |
| # # raise HTTPException(status_code=500, detail="Face swap models not initialized. Please ensure models are downloaded.") | |
| # # result_img = swapper.get(result_img, target_face, src_face, paste_back=True) | |
| # # result_rgb = cv2.cvtColor(result_img, cv2.COLOR_BGR2RGB) | |
| # # # ----------------------------- | |
| # # # Mandatory enhancement | |
| # # # ----------------------------- | |
| # # enhanced_rgb = mandatory_enhancement(result_rgb) | |
| # # enhanced_bgr = cv2.cvtColor(enhanced_rgb, cv2.COLOR_RGB2BGR) | |
| # # # ----------------------------- | |
| # # # Save, upload, compress | |
| # # # ----------------------------- | |
| # # temp_dir = tempfile.mkdtemp(prefix="faceswap_") | |
| # # final_path = os.path.join(temp_dir, "result.png") | |
| # # cv2.imwrite(final_path, enhanced_bgr) | |
| # # with open(final_path, "rb") as f: | |
| # # result_bytes = f.read() | |
| # # result_key = f"faceswap/result/{uuid.uuid4().hex}_enhanced.png" | |
| # # result_url = upload_to_spaces(result_bytes, result_key) | |
| # # compressed_bytes = compress_image(result_bytes, max_size=(1280, 1280), quality=72) | |
| # # compressed_key = f"faceswap/result/{uuid.uuid4().hex}_enhanced_compressed.jpg" | |
| # # compressed_url = upload_to_spaces(compressed_bytes, compressed_key, content_type="image/jpeg") | |
| # # # ----------------------------- | |
| # # # Log API usage | |
| # # # ----------------------------- | |
| # # end_time = datetime.utcnow() | |
| # # response_time_ms = (end_time - start_time).total_seconds() * 1000 | |
| # # if database is not None: | |
| # # log_entry = { | |
| # # "endpoint": "/face-swap-couple", | |
| # # "status": "success", | |
| # # "response_time_ms": response_time_ms, | |
| # # "timestamp": end_time | |
| # # } | |
| # # if appname: | |
| # # log_entry["appname"] = appname | |
| # # await database.api_logs.insert_one(log_entry) | |
| # # return { | |
| # # "result_key": result_key, | |
| # # "result_url": result_url, | |
| # # "compressed_url": compressed_url | |
| # # } | |
| # # except Exception as e: | |
| # # end_time = datetime.utcnow() | |
| # # response_time_ms = (end_time - start_time).total_seconds() * 1000 | |
| # # if database is not None: | |
| # # log_entry = { | |
| # # "endpoint": "/face-swap-couple", | |
| # # "status": "fail", | |
| # # "response_time_ms": response_time_ms, | |
| # # "timestamp": end_time, | |
| # # "error": str(e) | |
| # # } | |
| # # if appname: | |
| # # log_entry["appname"] = appname | |
| # # await database.api_logs.insert_one(log_entry) | |
| # # raise HTTPException(500, f"Face swap failed: {str(e)}") | |
| # # # --------------------- Mount Gradio --------------------- | |
| # # multi_faceswap_app = build_multi_faceswap_gradio() | |
| # # fastapi_app = mount_gradio_app( | |
| # # fastapi_app, | |
| # # multi_faceswap_app, | |
| # # path="/gradio-couple-faceswap" | |
| # # ) | |
| # # if __name__ == "__main__": | |
| # # uvicorn.run(fastapi_app, host="0.0.0.0", port=7860) | |
| # import os | |
| # os.environ["OMP_NUM_THREADS"] = "1" | |
| # import shutil | |
| # import uuid | |
| # import cv2 | |
| # import numpy as np | |
| # import threading | |
| # import subprocess | |
| # import logging | |
| # import tempfile | |
| # import sys | |
| # from datetime import datetime,timedelta | |
| # import tempfile | |
| # import insightface | |
| # from insightface.app import FaceAnalysis | |
| # from huggingface_hub import hf_hub_download | |
| # from fastapi import FastAPI, UploadFile, File, HTTPException, Response, Depends, Security, Form | |
| # from fastapi.responses import RedirectResponse | |
| # from fastapi.security import HTTPBearer, HTTPAuthorizationCredentials | |
| # from motor.motor_asyncio import AsyncIOMotorClient | |
| # from bson import ObjectId | |
| # from bson.errors import InvalidId | |
| # import httpx | |
| # import uvicorn | |
| # import gradio as gr | |
| # from gradio import mount_gradio_app | |
| # from PIL import Image | |
| # import io | |
| # # from scipy import ndimage | |
| # # DigitalOcean Spaces | |
| # import boto3 | |
| # from botocore.client import Config | |
| # from typing import Optional | |
| # # --------------------- Logging --------------------- | |
| # logging.basicConfig(level=logging.INFO) | |
| # logger = logging.getLogger(__name__) | |
| # # --------------------- Secrets & Paths --------------------- | |
| # REPO_ID = "HariLogicgo/face_swap_models" | |
| # MODELS_DIR = "./models" | |
| # os.makedirs(MODELS_DIR, exist_ok=True) | |
| # HF_TOKEN = os.getenv("HF_TOKEN") | |
| # API_SECRET_TOKEN = os.getenv("API_SECRET_TOKEN") | |
| # DO_SPACES_REGION = os.getenv("DO_SPACES_REGION", "blr1") | |
| # DO_SPACES_ENDPOINT = f"https://{DO_SPACES_REGION}.digitaloceanspaces.com" | |
| # DO_SPACES_KEY = os.getenv("DO_SPACES_KEY") | |
| # DO_SPACES_SECRET = os.getenv("DO_SPACES_SECRET") | |
| # DO_SPACES_BUCKET = os.getenv("DO_SPACES_BUCKET") | |
| # # NEW admin DB (with error handling for missing env vars) | |
| # ADMIN_MONGO_URL = os.getenv("ADMIN_MONGO_URL") | |
| # admin_client = None | |
| # admin_db = None | |
| # subcategories_col = None | |
| # media_clicks_col = None | |
| # if ADMIN_MONGO_URL: | |
| # try: | |
| # admin_client = AsyncIOMotorClient(ADMIN_MONGO_URL) | |
| # admin_db = admin_client.adminPanel | |
| # subcategories_col = admin_db.subcategories | |
| # media_clicks_col = admin_db.media_clicks | |
| # except Exception as e: | |
| # logger.warning(f"MongoDB admin connection failed (optional): {e}") | |
| # # Collage Maker DB (optional) | |
| # COLLAGE_MAKER_DB_URL = os.getenv("COLLAGE_MAKER_DB_URL") | |
| # collage_maker_client = None | |
| # collage_maker_db = None | |
| # collage_media_clicks_col = None | |
| # collage_subcategories_col = None | |
| # if COLLAGE_MAKER_DB_URL: | |
| # try: | |
| # collage_maker_client = AsyncIOMotorClient(COLLAGE_MAKER_DB_URL) | |
| # collage_maker_db = collage_maker_client.adminPanel | |
| # collage_media_clicks_col = collage_maker_db.media_clicks | |
| # collage_subcategories_col = collage_maker_db.subcategories | |
| # except Exception as e: | |
| # logger.warning(f"MongoDB collage-maker connection failed (optional): {e}") | |
| # # AI Enhancer DB (optional) | |
| # AI_ENHANCER_DB_URL = os.getenv("AI_ENHANCER_DB_URL") | |
| # ai_enhancer_client = None | |
| # ai_enhancer_db = None | |
| # ai_enhancer_media_clicks_col = None | |
| # ai_enhancer_subcategories_col = None | |
| # if AI_ENHANCER_DB_URL: | |
| # try: | |
| # ai_enhancer_client = AsyncIOMotorClient(AI_ENHANCER_DB_URL) | |
| # ai_enhancer_db = ai_enhancer_client.test # 🔴 test database | |
| # ai_enhancer_media_clicks_col = ai_enhancer_db.media_clicks | |
| # ai_enhancer_subcategories_col = ai_enhancer_db.subcategories | |
| # except Exception as e: | |
| # logger.warning(f"MongoDB ai-enhancer connection failed (optional): {e}") | |
| # def get_media_clicks_collection(appname: Optional[str] = None): | |
| # """Return the media clicks collection for the given app (default: main admin).""" | |
| # if appname and str(appname).strip().lower() == "collage-maker": | |
| # return collage_media_clicks_col | |
| # return media_clicks_col | |
| # # OLD logs DB | |
| # MONGODB_URL = os.getenv("MONGODB_URL") | |
| # client = None | |
| # database = None | |
| # # --------------------- Download Models --------------------- | |
| # def download_models(): | |
| # try: | |
| # logger.info("Downloading models...") | |
| # inswapper_path = hf_hub_download( | |
| # repo_id=REPO_ID, | |
| # filename="models/inswapper_128.onnx", | |
| # repo_type="model", | |
| # local_dir=MODELS_DIR, | |
| # token=HF_TOKEN | |
| # ) | |
| # buffalo_files = ["1k3d68.onnx", "2d106det.onnx", "genderage.onnx", "det_10g.onnx", "w600k_r50.onnx"] | |
| # for f in buffalo_files: | |
| # hf_hub_download( | |
| # repo_id=REPO_ID, | |
| # filename=f"models/buffalo_l/" + f, | |
| # repo_type="model", | |
| # local_dir=MODELS_DIR, | |
| # token=HF_TOKEN | |
| # ) | |
| # logger.info("Models downloaded successfully.") | |
| # return inswapper_path | |
| # except Exception as e: | |
| # logger.error(f"Model download failed: {e}") | |
| # raise | |
| # try: | |
| # inswapper_path = download_models() | |
| # # --------------------- Face Analysis + Swapper --------------------- | |
| # providers = ['CUDAExecutionProvider', 'CPUExecutionProvider'] | |
| # face_analysis_app = FaceAnalysis(name="buffalo_l", root=MODELS_DIR, providers=providers) | |
| # face_analysis_app.prepare(ctx_id=0, det_size=(640, 640)) | |
| # swapper = insightface.model_zoo.get_model(inswapper_path, providers=providers) | |
| # logger.info("Face analysis models loaded successfully") | |
| # except Exception as e: | |
| # logger.error(f"Failed to initialize face analysis models: {e}") | |
| # # Set defaults to prevent crash | |
| # inswapper_path = None | |
| # face_analysis_app = None | |
| # swapper = None | |
| # # --------------------- CodeFormer --------------------- | |
| # CODEFORMER_PATH = "CodeFormer/inference_codeformer.py" | |
| # def ensure_codeformer(): | |
| # try: | |
| # if not os.path.exists("CodeFormer"): | |
| # logger.info("CodeFormer not found, cloning repository...") | |
| # subprocess.run("git clone https://github.com/sczhou/CodeFormer.git", shell=True, check=True) | |
| # subprocess.run("pip install -r CodeFormer/requirements.txt", shell=True, check=False) # Non-critical deps | |
| # # Always ensure BasicSR is installed from local directory | |
| # # This is needed for Hugging Face Spaces where BasicSR can't be installed from GitHub | |
| # if os.path.exists("CodeFormer/basicsr/setup.py"): | |
| # logger.info("Installing BasicSR from local directory...") | |
| # subprocess.run("python CodeFormer/basicsr/setup.py develop", shell=True, check=True) | |
| # logger.info("BasicSR installed successfully") | |
| # # Install realesrgan after BasicSR is installed (realesrgan depends on BasicSR) | |
| # # This must be done after BasicSR installation to avoid PyPI install issues | |
| # try: | |
| # import realesrgan | |
| # logger.info("RealESRGAN already installed") | |
| # except ImportError: | |
| # logger.info("Installing RealESRGAN...") | |
| # subprocess.run("pip install --no-cache-dir realesrgan", shell=True, check=True) | |
| # logger.info("RealESRGAN installed successfully") | |
| # # Download models if CodeFormer exists (fixed logic) | |
| # if os.path.exists("CodeFormer"): | |
| # try: | |
| # subprocess.run("python CodeFormer/scripts/download_pretrained_models.py facelib", shell=True, check=False, timeout=300) | |
| # except (subprocess.TimeoutExpired, subprocess.CalledProcessError): | |
| # logger.warning("Failed to download facelib models (optional)") | |
| # try: | |
| # subprocess.run("python CodeFormer/scripts/download_pretrained_models.py CodeFormer", shell=True, check=False, timeout=300) | |
| # except (subprocess.TimeoutExpired, subprocess.CalledProcessError): | |
| # logger.warning("Failed to download CodeFormer models (optional)") | |
| # except Exception as e: | |
| # logger.error(f"CodeFormer setup failed: {e}") | |
| # logger.warning("Continuing without CodeFormer features...") | |
| # ensure_codeformer() | |
| # # --------------------- FastAPI --------------------- | |
| # fastapi_app = FastAPI() | |
| # @fastapi_app.on_event("startup") | |
| # async def startup_db(): | |
| # global client, database | |
| # if MONGODB_URL: | |
| # try: | |
| # logger.info("Initializing MongoDB for API logs...") | |
| # client = AsyncIOMotorClient(MONGODB_URL) | |
| # database = client.FaceSwap | |
| # logger.info("MongoDB initialized for API logs") | |
| # except Exception as e: | |
| # logger.warning(f"MongoDB connection failed (optional): {e}") | |
| # client = None | |
| # database = None | |
| # else: | |
| # logger.warning("MONGODB_URL not set, skipping MongoDB initialization") | |
| # @fastapi_app.on_event("shutdown") | |
| # async def shutdown_db(): | |
| # global client, admin_client, collage_maker_client | |
| # if client is not None: | |
| # client.close() | |
| # logger.info("MongoDB connection closed") | |
| # if admin_client is not None: | |
| # admin_client.close() | |
| # logger.info("Admin MongoDB connection closed") | |
| # if collage_maker_client is not None: | |
| # collage_maker_client.close() | |
| # logger.info("Collage Maker MongoDB connection closed") | |
| # # --------------------- Auth --------------------- | |
| # security = HTTPBearer() | |
| # def verify_token(credentials: HTTPAuthorizationCredentials = Security(security)): | |
| # if credentials.credentials != API_SECRET_TOKEN: | |
| # raise HTTPException(status_code=401, detail="Invalid or missing token") | |
| # return credentials.credentials | |
| # # --------------------- DB Selector --------------------- | |
| # # def get_media_clicks_collection(appname: Optional[str] = None): | |
| # # """ | |
| # # Returns the correct media_clicks collection based on appname. | |
| # # Defaults to the primary admin database when no appname is provided | |
| # # or when the requested database is unavailable. | |
| # # """ | |
| # # if appname: | |
| # # normalized = appname.strip().lower() | |
| # # if normalized == "collage-maker": | |
| # # if collage_media_clicks_col is not None: | |
| # # return collage_media_clicks_col | |
| # # logger.warning("COLLAGE_MAKER_DB_URL not configured; falling back to default media_clicks collection") | |
| # # return media_clicks_col | |
| # def get_app_db_collections(appname: Optional[str] = None): | |
| # """ | |
| # Returns (media_clicks_collection, subcategories_collection) | |
| # based on appname. | |
| # """ | |
| # if appname: | |
| # app = appname.strip().lower() | |
| # if app == "collage-maker": | |
| # if collage_media_clicks_col is not None and collage_subcategories_col is not None: | |
| # return collage_media_clicks_col, collage_subcategories_col | |
| # logger.warning("Collage-maker DB not configured, falling back to admin") | |
| # elif app == "ai-enhancer": | |
| # if ai_enhancer_media_clicks_col is not None and ai_enhancer_subcategories_col is not None: | |
| # return ai_enhancer_media_clicks_col, ai_enhancer_subcategories_col | |
| # logger.warning("AI-Enhancer DB not configured, falling back to admin") | |
| # # default fallback | |
| # return media_clicks_col, subcategories_col | |
| # # --------------------- Logging API Hits --------------------- | |
| # async def log_faceswap_hit(token: str, status: str = "success"): | |
| # global database | |
| # if database is None: | |
| # return | |
| # await database.api_logs.insert_one({ | |
| # "token": token, | |
| # "endpoint": "/faceswap", | |
| # "status": status, | |
| # "timestamp": datetime.utcnow() | |
| # }) | |
| # # --------------------- Face Swap Pipeline --------------------- | |
| # swap_lock = threading.Lock() | |
| # def enhance_image_with_codeformer(rgb_img, temp_dir=None): | |
| # if temp_dir is None: | |
| # temp_dir = os.path.join(tempfile.gettempdir(), f"enhance_{uuid.uuid4().hex[:8]}") | |
| # os.makedirs(temp_dir, exist_ok=True) | |
| # input_path = os.path.join(temp_dir, "input.jpg") | |
| # cv2.imwrite(input_path, cv2.cvtColor(rgb_img, cv2.COLOR_RGB2BGR)) | |
| # python_cmd = sys.executable if sys.executable else "python3" | |
| # cmd = ( | |
| # f"{python_cmd} {CODEFORMER_PATH} " | |
| # f"-w 0.7 " | |
| # f"--input_path {input_path} " | |
| # f"--output_path {temp_dir} " | |
| # f"--bg_upsampler realesrgan " | |
| # f"--face_upsample" | |
| # ) | |
| # result = subprocess.run(cmd, shell=True, capture_output=True, text=True) | |
| # if result.returncode != 0: | |
| # raise RuntimeError(result.stderr) | |
| # final_dir = os.path.join(temp_dir, "final_results") | |
| # files = [f for f in os.listdir(final_dir) if f.endswith(".png")] | |
| # if not files: | |
| # raise RuntimeError("No enhanced output") | |
| # final_path = os.path.join(final_dir, files[0]) | |
| # enhanced = cv2.imread(final_path) | |
| # return cv2.cvtColor(enhanced, cv2.COLOR_BGR2RGB) | |
| # def multi_face_swap(src_img, tgt_img): | |
| # src_bgr = cv2.cvtColor(src_img, cv2.COLOR_RGB2BGR) | |
| # tgt_bgr = cv2.cvtColor(tgt_img, cv2.COLOR_RGB2BGR) | |
| # src_faces = face_analysis_app.get(src_bgr) | |
| # tgt_faces = face_analysis_app.get(tgt_bgr) | |
| # if not src_faces or not tgt_faces: | |
| # raise ValueError("No faces detected") | |
| # def face_sort_key(face): | |
| # x1, y1, x2, y2 = face.bbox | |
| # area = (x2 - x1) * (y2 - y1) | |
| # cx = (x1 + x2) / 2 | |
| # return (-area, cx) | |
| # # Split by gender | |
| # src_male = [f for f in src_faces if f.gender == 1] | |
| # src_female = [f for f in src_faces if f.gender == 0] | |
| # tgt_male = [f for f in tgt_faces if f.gender == 1] | |
| # tgt_female = [f for f in tgt_faces if f.gender == 0] | |
| # # Sort inside gender groups | |
| # src_male = sorted(src_male, key=face_sort_key) | |
| # src_female = sorted(src_female, key=face_sort_key) | |
| # tgt_male = sorted(tgt_male, key=face_sort_key) | |
| # tgt_female = sorted(tgt_female, key=face_sort_key) | |
| # # Build final swap pairs | |
| # pairs = [] | |
| # for s, t in zip(src_male, tgt_male): | |
| # pairs.append((s, t)) | |
| # for s, t in zip(src_female, tgt_female): | |
| # pairs.append((s, t)) | |
| # # Fallback if gender mismatch | |
| # if not pairs: | |
| # src_faces = sorted(src_faces, key=face_sort_key) | |
| # tgt_faces = sorted(tgt_faces, key=face_sort_key) | |
| # pairs = list(zip(src_faces, tgt_faces)) | |
| # result_img = tgt_bgr.copy() | |
| # for src_face, _ in pairs: | |
| # # 🔁 re-detect current target faces | |
| # if face_analysis_app is None: | |
| # raise ValueError("Face analysis models not initialized. Please ensure models are downloaded.") | |
| # current_faces = face_analysis_app.get(result_img) | |
| # current_faces = sorted(current_faces, key=face_sort_key) | |
| # # choose best matching gender | |
| # candidates = [ | |
| # f for f in current_faces if f.gender == src_face.gender | |
| # ] or current_faces | |
| # target_face = candidates[0] | |
| # if swapper is None: | |
| # raise ValueError("Face swap models not initialized. Please ensure models are downloaded.") | |
| # result_img = swapper.get( | |
| # result_img, | |
| # target_face, | |
| # src_face, | |
| # paste_back=True | |
| # ) | |
| # return cv2.cvtColor(result_img, cv2.COLOR_BGR2RGB) | |
| # def face_swap_and_enhance(src_img, tgt_img, temp_dir=None): | |
| # try: | |
| # with swap_lock: | |
| # # Use a temp dir for intermediate files | |
| # if temp_dir is None: | |
| # temp_dir = os.path.join(tempfile.gettempdir(), f"faceswap_work_{uuid.uuid4().hex[:8]}") | |
| # if os.path.exists(temp_dir): | |
| # shutil.rmtree(temp_dir) | |
| # os.makedirs(temp_dir, exist_ok=True) | |
| # src_bgr = cv2.cvtColor(src_img, cv2.COLOR_RGB2BGR) | |
| # tgt_bgr = cv2.cvtColor(tgt_img, cv2.COLOR_RGB2BGR) | |
| # src_faces = face_analysis_app.get(src_bgr) | |
| # tgt_faces = face_analysis_app.get(tgt_bgr) | |
| # if face_analysis_app is None: | |
| # return None, None, "❌ Face analysis models not initialized. Please ensure models are downloaded." | |
| # if not src_faces or not tgt_faces: | |
| # return None, None, "❌ Face not detected in one of the images" | |
| # swapped_path = os.path.join(temp_dir, f"swapped_{uuid.uuid4().hex[:8]}.jpg") | |
| # if swapper is None: | |
| # return None, None, "❌ Face swap models not initialized. Please ensure models are downloaded." | |
| # swapped_bgr = swapper.get(tgt_bgr, tgt_faces[0], src_faces[0]) | |
| # if swapped_bgr is None: | |
| # return None, None, "❌ Face swap failed" | |
| # cv2.imwrite(swapped_path, swapped_bgr) | |
| # python_cmd = sys.executable if sys.executable else "python3" | |
| # cmd = f"{python_cmd} {CODEFORMER_PATH} -w 0.7 --input_path {swapped_path} --output_path {temp_dir} --bg_upsampler realesrgan --face_upsample" | |
| # result = subprocess.run(cmd, shell=True, capture_output=True, text=True) | |
| # if result.returncode != 0: | |
| # return None, None, f"❌ CodeFormer failed:\n{result.stderr}" | |
| # final_results_dir = os.path.join(temp_dir, "final_results") | |
| # final_files = [f for f in os.listdir(final_results_dir) if f.endswith(".png")] | |
| # if not final_files: | |
| # return None, None, "❌ No enhanced image found" | |
| # final_path = os.path.join(final_results_dir, final_files[0]) | |
| # final_img_bgr = cv2.imread(final_path) | |
| # if final_img_bgr is None: | |
| # return None, None, "❌ Failed to read enhanced image file" | |
| # final_img = cv2.cvtColor(final_img_bgr, cv2.COLOR_BGR2RGB) | |
| # return final_img, final_path, "" | |
| # except Exception as e: | |
| # return None, None, f"❌ Error: {str(e)}" | |
| # def compress_image( | |
| # image_bytes: bytes, | |
| # max_size=(1280, 1280), # max width/height | |
| # quality=75 # JPEG quality (60–80 is ideal) | |
| # ) -> bytes: | |
| # """ | |
| # Compress image by resizing and lowering quality. | |
| # Returns compressed image bytes. | |
| # """ | |
| # img = Image.open(io.BytesIO(image_bytes)).convert("RGB") | |
| # # Resize while maintaining aspect ratio | |
| # img.thumbnail(max_size, Image.LANCZOS) | |
| # output = io.BytesIO() | |
| # img.save( | |
| # output, | |
| # format="JPEG", | |
| # quality=quality, | |
| # optimize=True, | |
| # progressive=True | |
| # ) | |
| # return output.getvalue() | |
| # # --------------------- DigitalOcean Spaces Helper --------------------- | |
| # def get_spaces_client(): | |
| # session = boto3.session.Session() | |
| # client = session.client( | |
| # 's3', | |
| # region_name=DO_SPACES_REGION, | |
| # endpoint_url=DO_SPACES_ENDPOINT, | |
| # aws_access_key_id=DO_SPACES_KEY, | |
| # aws_secret_access_key=DO_SPACES_SECRET, | |
| # config=Config(signature_version='s3v4') | |
| # ) | |
| # return client | |
| # def upload_to_spaces(file_bytes, key, content_type="image/png"): | |
| # client = get_spaces_client() | |
| # client.put_object(Bucket=DO_SPACES_BUCKET, Key=key, Body=file_bytes, ContentType=content_type, ACL='public-read') | |
| # return f"{DO_SPACES_ENDPOINT}/{DO_SPACES_BUCKET}/{key}" | |
| # def download_from_spaces(key): | |
| # client = get_spaces_client() | |
| # obj = client.get_object(Bucket=DO_SPACES_BUCKET, Key=key) | |
| # return obj['Body'].read() | |
| # def build_multi_faceswap_gradio(): | |
| # with gr.Blocks() as demo: | |
| # gr.Markdown("## 👩❤️👨 Multi Face Swap (Couple → Couple)") | |
| # with gr.Row(): | |
| # src = gr.Image(type="numpy", label="Source Image (2 Faces)") | |
| # tgt = gr.Image(type="numpy", label="Target Image (2 Faces)") | |
| # out = gr.Image(type="numpy", label="Swapped Result") | |
| # error = gr.Textbox(label="Logs", interactive=False) | |
| # def process(src_img, tgt_img): | |
| # try: | |
| # swapped = multi_face_swap(src_img, tgt_img) | |
| # enhanced = enhance_image_with_codeformer(swapped) | |
| # return enhanced, "" | |
| # except Exception as e: | |
| # return None, str(e) | |
| # btn = gr.Button("Swap Faces") | |
| # btn.click(process, [src, tgt], [out, error]) | |
| # return demo | |
| # def mandatory_enhancement(rgb_img): | |
| # """ | |
| # Always runs CodeFormer on the final image. | |
| # Fail-safe: returns original if enhancement fails. | |
| # """ | |
| # try: | |
| # return enhance_image_with_codeformer(rgb_img) | |
| # except Exception as e: | |
| # logger.error(f"CodeFormer failed, returning original: {e}") | |
| # return rgb_img | |
| # # --------------------- API Endpoints --------------------- | |
| # @fastapi_app.get("/") | |
| # async def root(): | |
| # """Root endpoint""" | |
| # return { | |
| # "success": True, | |
| # "message": "FaceSwap API", | |
| # "data": { | |
| # "version": "1.0.0", | |
| # "Product Name":"Collage Maker , AI Enhancer App - GlowCam AI Studio", | |
| # "Released By" : "LogicGo Infotech" | |
| # } | |
| # } | |
| # @fastapi_app.get("/health") | |
| # async def health(): | |
| # return {"status": "healthy"} | |
| # from fastapi import Form | |
| # import requests | |
| # @fastapi_app.get("/test-admin-db") | |
| # async def test_admin_db(): | |
| # try: | |
| # doc = await admin_db.list_collection_names() | |
| # return {"ok": True, "collections": doc} | |
| # except Exception as e: | |
| # return {"ok": False, "error": str(e), "url": ADMIN_MONGO_URL} | |
| # @fastapi_app.post("/face-swap", dependencies=[Depends(verify_token)]) | |
| # async def face_swap_api( | |
| # source: UploadFile = File(...), | |
| # target_category_id: str = Form(None), | |
| # new_category_id: str = Form(None), | |
| # user_id: Optional[str] = Form(None), | |
| # appname: Optional[str] = Form(None), | |
| # credentials: HTTPAuthorizationCredentials = Security(security) | |
| # ): | |
| # start_time = datetime.utcnow() | |
| # try: | |
| # # ------------------------------------------------------------------ | |
| # # VALIDATION | |
| # # ------------------------------------------------------------------ | |
| # # -------------------------------------------------------------- | |
| # # BACKWARD COMPATIBILITY FOR OLD ANDROID VERSIONS | |
| # # -------------------------------------------------------------- | |
| # if target_category_id == "": | |
| # target_category_id = None | |
| # if new_category_id == "": | |
| # new_category_id = None | |
| # if user_id == "": | |
| # user_id = None | |
| # # media_clicks_collection = get_media_clicks_collection(appname) | |
| # media_clicks_collection, subcategories_collection = get_app_db_collections(appname) | |
| # logger.info(f"[FaceSwap] Incoming request → target_category_id={target_category_id}, new_category_id={new_category_id}, user_id={user_id}") | |
| # if target_category_id and new_category_id: | |
| # raise HTTPException(400, "Provide only one of new_category_id or target_category_id.") | |
| # if not target_category_id and not new_category_id: | |
| # raise HTTPException(400, "Either new_category_id or target_category_id is required.") | |
| # # ------------------------------------------------------------------ | |
| # # READ SOURCE IMAGE | |
| # # ------------------------------------------------------------------ | |
| # src_bytes = await source.read() | |
| # src_key = f"faceswap/source/{uuid.uuid4().hex}_{source.filename}" | |
| # upload_to_spaces(src_bytes, src_key, content_type=source.content_type) | |
| # # ------------------------------------------------------------------ | |
| # # CASE 1 : new_category_id → MongoDB lookup | |
| # # ------------------------------------------------------------------ | |
| # if new_category_id: | |
| # # doc = await subcategories_col.find_one({ | |
| # # "asset_images._id": ObjectId(new_category_id) | |
| # # }) | |
| # doc = await subcategories_collection.find_one({ | |
| # "asset_images._id": ObjectId(new_category_id) | |
| # }) | |
| # if not doc: | |
| # raise HTTPException(404, "Asset image not found in database") | |
| # # extract correct asset | |
| # asset = next( | |
| # (img for img in doc["asset_images"] if str(img["_id"]) == new_category_id), | |
| # None | |
| # ) | |
| # if not asset: | |
| # raise HTTPException(404, "Asset image URL not found") | |
| # # correct URL | |
| # target_url = asset["url"] | |
| # # correct categoryId (ObjectId) | |
| # #category_oid = doc["categoryId"] # <-- DO NOT CONVERT TO STRING | |
| # subcategory_oid = doc["_id"] | |
| # # ------------------------------------------------------------------# | |
| # # # MEDIA_CLICKS (ONLY IF user_id PRESENT) | |
| # # ------------------------------------------------------------------# | |
| # if user_id and media_clicks_collection is not None: | |
| # try: | |
| # user_id_clean = user_id.strip() | |
| # if not user_id_clean: | |
| # raise ValueError("user_id cannot be empty") | |
| # try: | |
| # user_oid = ObjectId(user_id_clean) | |
| # except (InvalidId, ValueError) as e: | |
| # logger.error(f"Invalid user_id format: {user_id_clean}") | |
| # raise ValueError(f"Invalid user_id format: {user_id_clean}") | |
| # now = datetime.utcnow() | |
| # # Normalize dates (UTC midnight) | |
| # today_date = datetime(now.year, now.month, now.day) | |
| # # ------------------------------------------------- | |
| # # STEP 1: Ensure root document exists | |
| # # ------------------------------------------------- | |
| # await media_clicks_collection.update_one( | |
| # {"userId": user_oid}, | |
| # { | |
| # "$setOnInsert": { | |
| # "userId": user_oid, | |
| # "createdAt": now, | |
| # "ai_edit_complete": 0, | |
| # "ai_edit_daily_count": [] | |
| # } | |
| # }, | |
| # upsert=True | |
| # ) | |
| # # ------------------------------------------------- | |
| # # STEP 2: Handle DAILY USAGE (BINARY, NO DUPLICATES) | |
| # # ------------------------------------------------- | |
| # doc = await media_clicks_collection.find_one( | |
| # {"userId": user_oid}, | |
| # {"ai_edit_daily_count": 1} | |
| # ) | |
| # daily_entries = doc.get("ai_edit_daily_count", []) if doc else [] | |
| # # Normalize today to UTC midnight | |
| # today_date = datetime(now.year, now.month, now.day) | |
| # # Build normalized date → count map (THIS ENFORCES UNIQUENESS) | |
| # daily_map = {} | |
| # for entry in daily_entries: | |
| # d = entry["date"] | |
| # if isinstance(d, datetime): | |
| # d = datetime(d.year, d.month, d.day) | |
| # daily_map[d] = entry["count"] # overwrite = no duplicates | |
| # # Determine last recorded date | |
| # last_date = max(daily_map.keys()) if daily_map else today_date | |
| # # Fill ALL missing days with count = 0 | |
| # next_day = last_date + timedelta(days=1) | |
| # while next_day < today_date: | |
| # daily_map.setdefault(next_day, 0) | |
| # next_day += timedelta(days=1) | |
| # # Mark today as used (binary) | |
| # daily_map[today_date] = 1 | |
| # # Rebuild list: OLDEST → NEWEST | |
| # final_daily_entries = [ | |
| # {"date": d, "count": daily_map[d]} | |
| # for d in sorted(daily_map.keys()) | |
| # ] | |
| # # Keep only last 32 days | |
| # final_daily_entries = final_daily_entries[-32:] | |
| # # Atomic replace | |
| # await media_clicks_collection.update_one( | |
| # {"userId": user_oid}, | |
| # { | |
| # "$set": { | |
| # "ai_edit_daily_count": final_daily_entries, | |
| # "updatedAt": now | |
| # } | |
| # } | |
| # ) | |
| # # ------------------------------------------------- | |
| # # STEP 3: Try updating existing subCategory | |
| # # ------------------------------------------------- | |
| # update_result = await media_clicks_collection.update_one( | |
| # { | |
| # "userId": user_oid, | |
| # "subCategories.subCategoryId": subcategory_oid | |
| # }, | |
| # { | |
| # "$inc": { | |
| # "subCategories.$.click_count": 1, | |
| # "ai_edit_complete": 1 | |
| # }, | |
| # "$set": { | |
| # "subCategories.$.lastClickedAt": now, | |
| # "ai_edit_last_date": now, | |
| # "updatedAt": now | |
| # } | |
| # } | |
| # ) | |
| # # ------------------------------------------------- | |
| # # STEP 4: Push subCategory if missing | |
| # # ------------------------------------------------- | |
| # if update_result.matched_count == 0: | |
| # await media_clicks_collection.update_one( | |
| # {"userId": user_oid}, | |
| # { | |
| # "$inc": { | |
| # "ai_edit_complete": 1 | |
| # }, | |
| # "$set": { | |
| # "ai_edit_last_date": now, | |
| # "updatedAt": now | |
| # }, | |
| # "$push": { | |
| # "subCategories": { | |
| # "subCategoryId": subcategory_oid, | |
| # "click_count": 1, | |
| # "lastClickedAt": now | |
| # } | |
| # } | |
| # } | |
| # ) | |
| # # ------------------------------------------------- | |
| # # STEP 5: Sort subCategories by lastClickedAt (ascending - oldest first) | |
| # # ------------------------------------------------- | |
| # user_doc = await media_clicks_collection.find_one({"userId": user_oid}) | |
| # if user_doc and "subCategories" in user_doc: | |
| # subcategories = user_doc["subCategories"] | |
| # # Sort by lastClickedAt in ascending order (oldest first) | |
| # # Handle missing or None dates by using datetime.min | |
| # subcategories_sorted = sorted( | |
| # subcategories, | |
| # key=lambda x: x.get("lastClickedAt") if x.get("lastClickedAt") is not None else datetime.min | |
| # ) | |
| # # Update with sorted array | |
| # await media_clicks_collection.update_one( | |
| # {"userId": user_oid}, | |
| # { | |
| # "$set": { | |
| # "subCategories": subcategories_sorted, | |
| # "updatedAt": now | |
| # } | |
| # } | |
| # ) | |
| # logger.info( | |
| # "[MEDIA_CLICK] user=%s subCategory=%s ai_edit_complete++ daily_tracked", | |
| # user_id, | |
| # str(subcategory_oid) | |
| # ) | |
| # except Exception as media_err: | |
| # logger.error(f"MEDIA_CLICK ERROR: {media_err}") | |
| # elif user_id and media_clicks_collection is None: | |
| # logger.warning("Media clicks collection unavailable; skipping media click tracking") | |
| # # # ------------------------------------------------------------------ | |
| # # # CASE 2 : target_category_id → DigitalOcean path (unchanged logic) | |
| # # # ------------------------------------------------------------------ | |
| # if target_category_id: | |
| # client = get_spaces_client() | |
| # base_prefix = "faceswap/target/" | |
| # resp = client.list_objects_v2( | |
| # Bucket=DO_SPACES_BUCKET, Prefix=base_prefix, Delimiter="/" | |
| # ) | |
| # # Extract categories from the CommonPrefixes | |
| # categories = [p["Prefix"].split("/")[2] for p in resp.get("CommonPrefixes", [])] | |
| # target_url = None | |
| # # --- FIX STARTS HERE --- | |
| # for category in categories: | |
| # original_prefix = f"faceswap/target/{category}/original/" | |
| # thumb_prefix = f"faceswap/target/{category}/thumb/" # Keep for file list check (optional but safe) | |
| # # List objects in original/ | |
| # original_objects = client.list_objects_v2( | |
| # Bucket=DO_SPACES_BUCKET, Prefix=original_prefix | |
| # ).get("Contents", []) | |
| # # List objects in thumb/ (optional: for the old code's extra check) | |
| # thumb_objects = client.list_objects_v2( | |
| # Bucket=DO_SPACES_BUCKET, Prefix=thumb_prefix | |
| # ).get("Contents", []) | |
| # # Extract only the filenames and filter for .png | |
| # original_filenames = sorted([ | |
| # obj["Key"].split("/")[-1] for obj in original_objects | |
| # if obj["Key"].split("/")[-1].endswith(".png") | |
| # ]) | |
| # thumb_filenames = [ | |
| # obj["Key"].split("/")[-1] for obj in thumb_objects | |
| # ] | |
| # # Replicate the old indexing logic based on sorted filenames | |
| # for idx, filename in enumerate(original_filenames, start=1): | |
| # cid = f"{category.lower()}image_{idx}" | |
| # # Optional: Replicate the thumb file check for 100% parity | |
| # # if filename in thumb_filenames and cid == target_category_id: | |
| # # Simpler check just on the ID, assuming thumb files are present | |
| # if cid == target_category_id: | |
| # # Construct the final target URL using the full prefix and the filename | |
| # target_url = f"{DO_SPACES_ENDPOINT}/{DO_SPACES_BUCKET}/{original_prefix}{filename}" | |
| # break | |
| # if target_url: | |
| # break | |
| # # --- FIX ENDS HERE --- | |
| # if not target_url: | |
| # raise HTTPException(404, "Target categoryId not found") | |
| # # # ------------------------------------------------------------------ | |
| # # # DOWNLOAD TARGET IMAGE | |
| # # # ------------------------------------------------------------------ | |
| # async with httpx.AsyncClient(timeout=30.0) as client: | |
| # response = await client.get(target_url) | |
| # response.raise_for_status() | |
| # tgt_bytes = response.content | |
| # src_bgr = cv2.imdecode(np.frombuffer(src_bytes, np.uint8), cv2.IMREAD_COLOR) | |
| # tgt_bgr = cv2.imdecode(np.frombuffer(tgt_bytes, np.uint8), cv2.IMREAD_COLOR) | |
| # if src_bgr is None or tgt_bgr is None: | |
| # raise HTTPException(400, "Invalid image data") | |
| # src_rgb = cv2.cvtColor(src_bgr, cv2.COLOR_BGR2RGB) | |
| # tgt_rgb = cv2.cvtColor(tgt_bgr, cv2.COLOR_BGR2RGB) | |
| # # ------------------------------------------------------------------ | |
| # # FACE SWAP EXECUTION | |
| # # ------------------------------------------------------------------ | |
| # final_img, final_path, err = face_swap_and_enhance(src_rgb, tgt_rgb) | |
| # # #--------------------Version 2.0 ----------------------------------------# | |
| # # final_img, final_path, err = enhanced_face_swap_and_enhance(src_rgb, tgt_rgb) | |
| # # #--------------------Version 2.0 ----------------------------------------# | |
| # if err: | |
| # raise HTTPException(500, err) | |
| # with open(final_path, "rb") as f: | |
| # result_bytes = f.read() | |
| # result_key = f"faceswap/result/{uuid.uuid4().hex}_enhanced.png" | |
| # result_url = upload_to_spaces(result_bytes, result_key) | |
| # # ------------------------------------------------- | |
| # # COMPRESS IMAGE (2–3 MB target) | |
| # # ------------------------------------------------- | |
| # compressed_bytes = compress_image( | |
| # image_bytes=result_bytes, | |
| # max_size=(1280, 1280), | |
| # quality=72 | |
| # ) | |
| # compressed_key = f"faceswap/result/{uuid.uuid4().hex}_enhanced_compressed.jpg" | |
| # compressed_url = upload_to_spaces( | |
| # compressed_bytes, | |
| # compressed_key, | |
| # content_type="image/jpeg" | |
| # ) | |
| # end_time = datetime.utcnow() | |
| # response_time_ms = (end_time - start_time).total_seconds() * 1000 | |
| # if database is not None: | |
| # log_entry = { | |
| # "endpoint": "/face-swap", | |
| # "status": "success", | |
| # "response_time_ms": response_time_ms, | |
| # "timestamp": end_time | |
| # } | |
| # if appname: | |
| # log_entry["appname"] = appname | |
| # await database.api_logs.insert_one(log_entry) | |
| # return { | |
| # "result_key": result_key, | |
| # "result_url": result_url, | |
| # "Compressed_Image_URL": compressed_url | |
| # } | |
| # except Exception as e: | |
| # end_time = datetime.utcnow() | |
| # response_time_ms = (end_time - start_time).total_seconds() * 1000 | |
| # if database is not None: | |
| # log_entry = { | |
| # "endpoint": "/face-swap", | |
| # "status": "fail", | |
| # "response_time_ms": response_time_ms, | |
| # "timestamp": end_time, | |
| # "error": str(e) | |
| # } | |
| # if appname: | |
| # log_entry["appname"] = appname | |
| # await database.api_logs.insert_one(log_entry) | |
| # raise HTTPException(500, f"Face swap failed: {str(e)}") | |
| # @fastapi_app.get("/preview/{result_key:path}") | |
| # async def preview_result(result_key: str): | |
| # try: | |
| # img_bytes = download_from_spaces(result_key) | |
| # except Exception: | |
| # raise HTTPException(status_code=404, detail="Result not found") | |
| # return Response( | |
| # content=img_bytes, | |
| # media_type="image/png", | |
| # headers={"Content-Disposition": "inline; filename=result.png"} | |
| # ) | |
| # @fastapi_app.post("/multi-face-swap", dependencies=[Depends(verify_token)]) | |
| # async def multi_face_swap_api( | |
| # source_image: UploadFile = File(...), | |
| # target_image: UploadFile = File(...) | |
| # ): | |
| # start_time = datetime.utcnow() | |
| # try: | |
| # # ----------------------------- | |
| # # Read images | |
| # # ----------------------------- | |
| # src_bytes = await source_image.read() | |
| # tgt_bytes = await target_image.read() | |
| # src_bgr = cv2.imdecode(np.frombuffer(src_bytes, np.uint8), cv2.IMREAD_COLOR) | |
| # tgt_bgr = cv2.imdecode(np.frombuffer(tgt_bytes, np.uint8), cv2.IMREAD_COLOR) | |
| # if src_bgr is None or tgt_bgr is None: | |
| # raise HTTPException(400, "Invalid image data") | |
| # src_rgb = cv2.cvtColor(src_bgr, cv2.COLOR_BGR2RGB) | |
| # tgt_rgb = cv2.cvtColor(tgt_bgr, cv2.COLOR_BGR2RGB) | |
| # # ----------------------------- | |
| # # Multi-face swap | |
| # # ----------------------------- | |
| # swapped_rgb = multi_face_swap(src_rgb, tgt_rgb) | |
| # # ----------------------------- | |
| # # 🔥 MANDATORY ENHANCEMENT | |
| # # ----------------------------- | |
| # final_rgb = mandatory_enhancement(swapped_rgb) | |
| # final_bgr = cv2.cvtColor(final_rgb, cv2.COLOR_RGB2BGR) | |
| # # ----------------------------- | |
| # # Save temp result | |
| # # ----------------------------- | |
| # temp_dir = tempfile.mkdtemp(prefix="multi_faceswap_") | |
| # result_path = os.path.join(temp_dir, "result.png") | |
| # cv2.imwrite(result_path, final_bgr) | |
| # with open(result_path, "rb") as f: | |
| # result_bytes = f.read() | |
| # # ----------------------------- | |
| # # Upload | |
| # # ----------------------------- | |
| # result_key = f"faceswap/multi/{uuid.uuid4().hex}.png" | |
| # result_url = upload_to_spaces( | |
| # result_bytes, | |
| # result_key, | |
| # content_type="image/png" | |
| # ) | |
| # return { | |
| # "result_key": result_key, | |
| # "result_url": result_url | |
| # } | |
| # except Exception as e: | |
| # raise HTTPException(status_code=500, detail=str(e)) | |
| # @fastapi_app.post("/face-swap-couple", dependencies=[Depends(verify_token)]) | |
| # async def face_swap_api( | |
| # image1: UploadFile = File(...), | |
| # image2: Optional[UploadFile] = File(None), | |
| # target_category_id: str = Form(None), | |
| # new_category_id: str = Form(None), | |
| # user_id: Optional[str] = Form(None), | |
| # appname: Optional[str] = Form(None), | |
| # credentials: HTTPAuthorizationCredentials = Security(security) | |
| # ): | |
| # """ | |
| # Production-ready face swap endpoint supporting: | |
| # - Multiple source images (image1 + optional image2) | |
| # - Gender-based pairing | |
| # - Merged faces from multiple sources | |
| # - Mandatory CodeFormer enhancement | |
| # """ | |
| # start_time = datetime.utcnow() | |
| # try: | |
| # # ----------------------------- | |
| # # Validate input | |
| # # ----------------------------- | |
| # if target_category_id == "": | |
| # target_category_id = None | |
| # if new_category_id == "": | |
| # new_category_id = None | |
| # if user_id == "": | |
| # user_id = None | |
| # media_clicks_collection = get_media_clicks_collection(appname) | |
| # if target_category_id and new_category_id: | |
| # raise HTTPException(400, "Provide only one of new_category_id or target_category_id.") | |
| # if not target_category_id and not new_category_id: | |
| # raise HTTPException(400, "Either new_category_id or target_category_id is required.") | |
| # logger.info(f"[FaceSwap] Incoming request → target_category_id={target_category_id}, new_category_id={new_category_id}, user_id={user_id}") | |
| # # ----------------------------- | |
| # # Read source images | |
| # # ----------------------------- | |
| # src_images = [] | |
| # img1_bytes = await image1.read() | |
| # src1 = cv2.imdecode(np.frombuffer(img1_bytes, np.uint8), cv2.IMREAD_COLOR) | |
| # if src1 is None: | |
| # raise HTTPException(400, "Invalid image1 data") | |
| # src_images.append(cv2.cvtColor(src1, cv2.COLOR_BGR2RGB)) | |
| # if image2: | |
| # img2_bytes = await image2.read() | |
| # src2 = cv2.imdecode(np.frombuffer(img2_bytes, np.uint8), cv2.IMREAD_COLOR) | |
| # if src2 is not None: | |
| # src_images.append(cv2.cvtColor(src2, cv2.COLOR_BGR2RGB)) | |
| # # ----------------------------- | |
| # # Resolve target image | |
| # # ----------------------------- | |
| # target_url = None | |
| # if new_category_id: | |
| # doc = await subcategories_col.find_one({ | |
| # "asset_images._id": ObjectId(new_category_id) | |
| # }) | |
| # if not doc: | |
| # raise HTTPException(404, "Asset image not found in database") | |
| # asset = next( | |
| # (img for img in doc["asset_images"] if str(img["_id"]) == new_category_id), | |
| # None | |
| # ) | |
| # if not asset: | |
| # raise HTTPException(404, "Asset image URL not found") | |
| # target_url = asset["url"] | |
| # subcategory_oid = doc["_id"] | |
| # if user_id and media_clicks_collection is not None: | |
| # try: | |
| # user_id_clean = user_id.strip() | |
| # if not user_id_clean: | |
| # raise ValueError("user_id cannot be empty") | |
| # try: | |
| # user_oid = ObjectId(user_id_clean) | |
| # except (InvalidId, ValueError): | |
| # logger.error(f"Invalid user_id format: {user_id_clean}") | |
| # raise ValueError(f"Invalid user_id format: {user_id_clean}") | |
| # now = datetime.utcnow() | |
| # # Step 1: ensure root document exists | |
| # await media_clicks_collection.update_one( | |
| # {"userId": user_oid}, | |
| # { | |
| # "$setOnInsert": { | |
| # "userId": user_oid, | |
| # "createdAt": now, | |
| # "ai_edit_complete": 0, | |
| # "ai_edit_daily_count": [] | |
| # } | |
| # }, | |
| # upsert=True | |
| # ) | |
| # # Step 2: handle daily usage (binary, no duplicates) | |
| # doc = await media_clicks_collection.find_one( | |
| # {"userId": user_oid}, | |
| # {"ai_edit_daily_count": 1} | |
| # ) | |
| # daily_entries = doc.get("ai_edit_daily_count", []) if doc else [] | |
| # today_date = datetime(now.year, now.month, now.day) | |
| # daily_map = {} | |
| # for entry in daily_entries: | |
| # d = entry["date"] | |
| # if isinstance(d, datetime): | |
| # d = datetime(d.year, d.month, d.day) | |
| # daily_map[d] = entry["count"] | |
| # last_date = max(daily_map.keys()) if daily_map else None | |
| # if last_date != today_date: | |
| # daily_map[today_date] = 1 | |
| # final_daily_entries = [ | |
| # {"date": d, "count": daily_map[d]} | |
| # for d in sorted(daily_map.keys()) | |
| # ] | |
| # final_daily_entries = final_daily_entries[-32:] | |
| # await media_clicks_collection.update_one( | |
| # {"userId": user_oid}, | |
| # { | |
| # "$set": { | |
| # "ai_edit_daily_count": final_daily_entries, | |
| # "updatedAt": now | |
| # } | |
| # } | |
| # ) | |
| # # Step 3: try updating existing subCategory | |
| # update_result = await media_clicks_collection.update_one( | |
| # { | |
| # "userId": user_oid, | |
| # "subCategories.subCategoryId": subcategory_oid | |
| # }, | |
| # { | |
| # "$inc": { | |
| # "subCategories.$.click_count": 1, | |
| # "ai_edit_complete": 1 | |
| # }, | |
| # "$set": { | |
| # "subCategories.$.lastClickedAt": now, | |
| # "ai_edit_last_date": now, | |
| # "updatedAt": now | |
| # } | |
| # } | |
| # ) | |
| # # Step 4: push subCategory if missing | |
| # if update_result.matched_count == 0: | |
| # await media_clicks_collection.update_one( | |
| # {"userId": user_oid}, | |
| # { | |
| # "$inc": { | |
| # "ai_edit_complete": 1 | |
| # }, | |
| # "$set": { | |
| # "ai_edit_last_date": now, | |
| # "updatedAt": now | |
| # }, | |
| # "$push": { | |
| # "subCategories": { | |
| # "subCategoryId": subcategory_oid, | |
| # "click_count": 1, | |
| # "lastClickedAt": now | |
| # } | |
| # } | |
| # } | |
| # ) | |
| # # Step 5: sort subCategories by lastClickedAt (ascending) | |
| # user_doc = await media_clicks_collection.find_one({"userId": user_oid}) | |
| # if user_doc and "subCategories" in user_doc: | |
| # subcategories = user_doc["subCategories"] | |
| # subcategories_sorted = sorted( | |
| # subcategories, | |
| # key=lambda x: x.get("lastClickedAt") if x.get("lastClickedAt") is not None else datetime.min | |
| # ) | |
| # await media_clicks_collection.update_one( | |
| # {"userId": user_oid}, | |
| # { | |
| # "$set": { | |
| # "subCategories": subcategories_sorted, | |
| # "updatedAt": now | |
| # } | |
| # } | |
| # ) | |
| # logger.info( | |
| # "[MEDIA_CLICK] user=%s subCategory=%s ai_edit_complete++ daily_tracked", | |
| # user_id, | |
| # str(subcategory_oid) | |
| # ) | |
| # except Exception as media_err: | |
| # logger.error(f"MEDIA_CLICK ERROR: {media_err}") | |
| # elif user_id and media_clicks_collection is None: | |
| # logger.warning("Media clicks collection unavailable; skipping media click tracking") | |
| # if target_category_id: | |
| # client = get_spaces_client() | |
| # base_prefix = "faceswap/target/" | |
| # resp = client.list_objects_v2( | |
| # Bucket=DO_SPACES_BUCKET, Prefix=base_prefix, Delimiter="/" | |
| # ) | |
| # categories = [p["Prefix"].split("/")[2] for p in resp.get("CommonPrefixes", [])] | |
| # for category in categories: | |
| # original_prefix = f"faceswap/target/{category}/original/" | |
| # thumb_prefix = f"faceswap/target/{category}/thumb/" | |
| # original_objects = client.list_objects_v2( | |
| # Bucket=DO_SPACES_BUCKET, Prefix=original_prefix | |
| # ).get("Contents", []) | |
| # thumb_objects = client.list_objects_v2( | |
| # Bucket=DO_SPACES_BUCKET, Prefix=thumb_prefix | |
| # ).get("Contents", []) | |
| # original_filenames = sorted([ | |
| # obj["Key"].split("/")[-1] for obj in original_objects | |
| # if obj["Key"].split("/")[-1].endswith(".png") | |
| # ]) | |
| # for idx, filename in enumerate(original_filenames, start=1): | |
| # cid = f"{category.lower()}image_{idx}" | |
| # if cid == target_category_id: | |
| # target_url = f"{DO_SPACES_ENDPOINT}/{DO_SPACES_BUCKET}/{original_prefix}{filename}" | |
| # break | |
| # if target_url: | |
| # break | |
| # if not target_url: | |
| # raise HTTPException(404, "Target categoryId not found") | |
| # async with httpx.AsyncClient(timeout=30.0) as client: | |
| # response = await client.get(target_url) | |
| # response.raise_for_status() | |
| # tgt_bytes = response.content | |
| # tgt_bgr = cv2.imdecode(np.frombuffer(tgt_bytes, np.uint8), cv2.IMREAD_COLOR) | |
| # if tgt_bgr is None: | |
| # raise HTTPException(400, "Invalid target image data") | |
| # # ----------------------------- | |
| # # Merge all source faces | |
| # # ----------------------------- | |
| # all_src_faces = [] | |
| # for img in src_images: | |
| # faces = face_analysis_app.get(cv2.cvtColor(img, cv2.COLOR_RGB2BGR)) | |
| # all_src_faces.extend(faces) | |
| # if not all_src_faces: | |
| # raise HTTPException(400, "No faces detected in source images") | |
| # tgt_faces = face_analysis_app.get(tgt_bgr) | |
| # if not tgt_faces: | |
| # raise HTTPException(400, "No faces detected in target image") | |
| # # ----------------------------- | |
| # # Gender-based pairing | |
| # # ----------------------------- | |
| # def face_sort_key(face): | |
| # x1, y1, x2, y2 = face.bbox | |
| # area = (x2 - x1) * (y2 - y1) | |
| # cx = (x1 + x2) / 2 | |
| # return (-area, cx) | |
| # # Separate by gender | |
| # src_male = sorted([f for f in all_src_faces if f.gender == 1], key=face_sort_key) | |
| # src_female = sorted([f for f in all_src_faces if f.gender == 0], key=face_sort_key) | |
| # tgt_male = sorted([f for f in tgt_faces if f.gender == 1], key=face_sort_key) | |
| # tgt_female = sorted([f for f in tgt_faces if f.gender == 0], key=face_sort_key) | |
| # pairs = [] | |
| # for s, t in zip(src_male, tgt_male): | |
| # pairs.append((s, t)) | |
| # for s, t in zip(src_female, tgt_female): | |
| # pairs.append((s, t)) | |
| # # fallback if gender mismatch | |
| # if not pairs: | |
| # src_all = sorted(all_src_faces, key=face_sort_key) | |
| # tgt_all = sorted(tgt_faces, key=face_sort_key) | |
| # pairs = list(zip(src_all, tgt_all)) | |
| # # ----------------------------- | |
| # # Perform face swap | |
| # # ----------------------------- | |
| # with swap_lock: | |
| # result_img = tgt_bgr.copy() | |
| # for src_face, _ in pairs: | |
| # if face_analysis_app is None: | |
| # raise HTTPException(status_code=500, detail="Face analysis models not initialized. Please ensure models are downloaded.") | |
| # current_faces = sorted(face_analysis_app.get(result_img), key=face_sort_key) | |
| # candidates = [f for f in current_faces if f.gender == src_face.gender] or current_faces | |
| # target_face = candidates[0] | |
| # if swapper is None: | |
| # raise HTTPException(status_code=500, detail="Face swap models not initialized. Please ensure models are downloaded.") | |
| # result_img = swapper.get(result_img, target_face, src_face, paste_back=True) | |
| # result_rgb = cv2.cvtColor(result_img, cv2.COLOR_BGR2RGB) | |
| # # ----------------------------- | |
| # # Mandatory enhancement | |
| # # ----------------------------- | |
| # enhanced_rgb = mandatory_enhancement(result_rgb) | |
| # enhanced_bgr = cv2.cvtColor(enhanced_rgb, cv2.COLOR_RGB2BGR) | |
| # # ----------------------------- | |
| # # Save, upload, compress | |
| # # ----------------------------- | |
| # temp_dir = tempfile.mkdtemp(prefix="faceswap_") | |
| # final_path = os.path.join(temp_dir, "result.png") | |
| # cv2.imwrite(final_path, enhanced_bgr) | |
| # with open(final_path, "rb") as f: | |
| # result_bytes = f.read() | |
| # result_key = f"faceswap/result/{uuid.uuid4().hex}_enhanced.png" | |
| # result_url = upload_to_spaces(result_bytes, result_key) | |
| # compressed_bytes = compress_image(result_bytes, max_size=(1280, 1280), quality=72) | |
| # compressed_key = f"faceswap/result/{uuid.uuid4().hex}_enhanced_compressed.jpg" | |
| # compressed_url = upload_to_spaces(compressed_bytes, compressed_key, content_type="image/jpeg") | |
| # # ----------------------------- | |
| # # Log API usage | |
| # # ----------------------------- | |
| # end_time = datetime.utcnow() | |
| # response_time_ms = (end_time - start_time).total_seconds() * 1000 | |
| # if database is not None: | |
| # log_entry = { | |
| # "endpoint": "/face-swap-couple", | |
| # "status": "success", | |
| # "response_time_ms": response_time_ms, | |
| # "timestamp": end_time | |
| # } | |
| # if appname: | |
| # log_entry["appname"] = appname | |
| # await database.api_logs.insert_one(log_entry) | |
| # return { | |
| # "result_key": result_key, | |
| # "result_url": result_url, | |
| # "compressed_url": compressed_url | |
| # } | |
| # except Exception as e: | |
| # end_time = datetime.utcnow() | |
| # response_time_ms = (end_time - start_time).total_seconds() * 1000 | |
| # if database is not None: | |
| # log_entry = { | |
| # "endpoint": "/face-swap-couple", | |
| # "status": "fail", | |
| # "response_time_ms": response_time_ms, | |
| # "timestamp": end_time, | |
| # "error": str(e) | |
| # } | |
| # if appname: | |
| # log_entry["appname"] = appname | |
| # await database.api_logs.insert_one(log_entry) | |
| # raise HTTPException(500, f"Face swap failed: {str(e)}") | |
| # # --------------------- Mount Gradio --------------------- | |
| # multi_faceswap_app = build_multi_faceswap_gradio() | |
| # fastapi_app = mount_gradio_app( | |
| # fastapi_app, | |
| # multi_faceswap_app, | |
| # path="/gradio-couple-faceswap" | |
| # ) | |
| # if __name__ == "__main__": | |
| # uvicorn.run(fastapi_app, host="0.0.0.0", port=7860) | |