Spaces:
Running
on
T4
Running
on
T4
| # --------------------- List Images Endpoint --------------------- | |
| import os | |
| os.environ["OMP_NUM_THREADS"] = "1" | |
| import shutil | |
| import uuid | |
| import cv2 | |
| import numpy as np | |
| import threading | |
| import subprocess | |
| import logging | |
| import tempfile | |
| import sys | |
| from datetime import datetime,timedelta | |
| import insightface | |
| from insightface.app import FaceAnalysis | |
| from huggingface_hub import hf_hub_download | |
| from fastapi import FastAPI, UploadFile, File, HTTPException, Response, Depends, Security, Form | |
| from fastapi.responses import RedirectResponse | |
| from fastapi.security import HTTPBearer, HTTPAuthorizationCredentials | |
| from motor.motor_asyncio import AsyncIOMotorClient | |
| from bson import ObjectId | |
| from bson.errors import InvalidId | |
| import httpx | |
| import uvicorn | |
| import gradio as gr | |
| from gradio import mount_gradio_app | |
| from PIL import Image | |
| import io | |
| # DigitalOcean Spaces | |
| import boto3 | |
| from botocore.client import Config | |
| from typing import Optional | |
| # --------------------- Logging --------------------- | |
| logging.basicConfig(level=logging.INFO) | |
| logger = logging.getLogger(__name__) | |
| # --------------------- Secrets & Paths --------------------- | |
| REPO_ID = "HariLogicgo/face_swap_models" | |
| MODELS_DIR = "./models" | |
| os.makedirs(MODELS_DIR, exist_ok=True) | |
| HF_TOKEN = os.getenv("HF_TOKEN") | |
| API_SECRET_TOKEN = os.getenv("API_SECRET_TOKEN") | |
| DO_SPACES_REGION = os.getenv("DO_SPACES_REGION", "blr1") | |
| DO_SPACES_ENDPOINT = f"https://{DO_SPACES_REGION}.digitaloceanspaces.com" | |
| DO_SPACES_KEY = os.getenv("DO_SPACES_KEY") | |
| DO_SPACES_SECRET = os.getenv("DO_SPACES_SECRET") | |
| DO_SPACES_BUCKET = os.getenv("DO_SPACES_BUCKET") | |
| # NEW admin DB | |
| ADMIN_MONGO_URL = os.getenv("ADMIN_MONGO_URL") | |
| admin_client = AsyncIOMotorClient(ADMIN_MONGO_URL) | |
| admin_db = admin_client.adminPanel | |
| subcategories_col = admin_db.subcategories | |
| media_clicks_col = admin_db.media_clicks | |
| # OLD logs DB | |
| MONGODB_URL = os.getenv("MONGODB_URL") | |
| client = None | |
| database = None | |
| # --------------------- Download Models --------------------- | |
| def download_models(): | |
| logger.info("Downloading models...") | |
| inswapper_path = hf_hub_download( | |
| repo_id=REPO_ID, | |
| filename="models/inswapper_128.onnx", | |
| repo_type="model", | |
| local_dir=MODELS_DIR, | |
| token=HF_TOKEN | |
| ) | |
| buffalo_files = ["1k3d68.onnx", "2d106det.onnx", "genderage.onnx", "det_10g.onnx", "w600k_r50.onnx"] | |
| for f in buffalo_files: | |
| hf_hub_download( | |
| repo_id=REPO_ID, | |
| filename=f"models/buffalo_l/" + f, | |
| repo_type="model", | |
| local_dir=MODELS_DIR, | |
| token=HF_TOKEN | |
| ) | |
| logger.info("Models downloaded.") | |
| return inswapper_path | |
| inswapper_path = download_models() | |
| # --------------------- Face Analysis + Swapper --------------------- | |
| providers = ['CUDAExecutionProvider', 'CPUExecutionProvider'] | |
| face_analysis_app = FaceAnalysis(name="buffalo_l", root=MODELS_DIR, providers=providers) | |
| face_analysis_app.prepare(ctx_id=0, det_size=(640, 640)) | |
| swapper = insightface.model_zoo.get_model(inswapper_path, providers=providers) | |
| # --------------------- CodeFormer --------------------- | |
| CODEFORMER_PATH = "CodeFormer/inference_codeformer.py" | |
| def ensure_codeformer(): | |
| if not os.path.exists("CodeFormer"): | |
| subprocess.run("git clone https://github.com/sczhou/CodeFormer.git", shell=True, check=True) | |
| subprocess.run("pip install -r CodeFormer/requirements.txt", shell=True, check=True) | |
| subprocess.run("python CodeFormer/basicsr/setup.py develop", shell=True, check=True) | |
| subprocess.run("python CodeFormer/scripts/download_pretrained_models.py facelib", shell=True, check=True) | |
| subprocess.run("python CodeFormer/scripts/download_pretrained_models.py CodeFormer", shell=True, check=True) | |
| ensure_codeformer() | |
| # --------------------- FastAPI --------------------- | |
| fastapi_app = FastAPI() | |
| async def startup_db(): | |
| global client, database | |
| logger.info("Initializing MongoDB for API logs...") | |
| client = AsyncIOMotorClient(MONGODB_URL) | |
| database = client.FaceSwap | |
| logger.info("MongoDB initialized for API logs") | |
| async def shutdown_db(): | |
| global client | |
| if client: | |
| client.close() | |
| logger.info("MongoDB connection closed") | |
| # --------------------- Auth --------------------- | |
| security = HTTPBearer() | |
| def verify_token(credentials: HTTPAuthorizationCredentials = Security(security)): | |
| if credentials.credentials != API_SECRET_TOKEN: | |
| raise HTTPException(status_code=401, detail="Invalid or missing token") | |
| return credentials.credentials | |
| # --------------------- Logging API Hits --------------------- | |
| async def log_faceswap_hit(token: str, status: str = "success"): | |
| global database | |
| if database is None: | |
| return | |
| await database.api_logs.insert_one({ | |
| "token": token, | |
| "endpoint": "/faceswap", | |
| "status": status, | |
| "timestamp": datetime.utcnow() | |
| }) | |
| # --------------------- Face Swap Pipeline --------------------- | |
| swap_lock = threading.Lock() | |
| def face_swap_and_enhance(src_img, tgt_img, temp_dir=None): | |
| try: | |
| with swap_lock: | |
| # Use a temp dir for intermediate files | |
| if temp_dir is None: | |
| temp_dir = os.path.join(tempfile.gettempdir(), f"faceswap_work_{uuid.uuid4().hex[:8]}") | |
| if os.path.exists(temp_dir): | |
| shutil.rmtree(temp_dir) | |
| os.makedirs(temp_dir, exist_ok=True) | |
| src_bgr = cv2.cvtColor(src_img, cv2.COLOR_RGB2BGR) | |
| tgt_bgr = cv2.cvtColor(tgt_img, cv2.COLOR_RGB2BGR) | |
| src_faces = face_analysis_app.get(src_bgr) | |
| tgt_faces = face_analysis_app.get(tgt_bgr) | |
| if not src_faces or not tgt_faces: | |
| return None, None, "❌ Face not detected in one of the images" | |
| swapped_path = os.path.join(temp_dir, f"swapped_{uuid.uuid4().hex[:8]}.jpg") | |
| swapped_bgr = swapper.get(tgt_bgr, tgt_faces[0], src_faces[0]) | |
| if swapped_bgr is None: | |
| return None, None, "❌ Face swap failed" | |
| cv2.imwrite(swapped_path, swapped_bgr) | |
| python_cmd = sys.executable if sys.executable else "python3" | |
| cmd = f"{python_cmd} {CODEFORMER_PATH} -w 0.7 --input_path {swapped_path} --output_path {temp_dir} --bg_upsampler realesrgan --face_upsample" | |
| result = subprocess.run(cmd, shell=True, capture_output=True, text=True) | |
| if result.returncode != 0: | |
| return None, None, f"❌ CodeFormer failed:\n{result.stderr}" | |
| final_results_dir = os.path.join(temp_dir, "final_results") | |
| final_files = [f for f in os.listdir(final_results_dir) if f.endswith(".png")] | |
| if not final_files: | |
| return None, None, "❌ No enhanced image found" | |
| final_path = os.path.join(final_results_dir, final_files[0]) | |
| final_img_bgr = cv2.imread(final_path) | |
| if final_img_bgr is None: | |
| return None, None, "❌ Failed to read enhanced image file" | |
| final_img = cv2.cvtColor(final_img_bgr, cv2.COLOR_BGR2RGB) | |
| return final_img, final_path, "" | |
| except Exception as e: | |
| return None, None, f"❌ Error: {str(e)}" | |
| def compress_image( | |
| image_bytes: bytes, | |
| max_size=(1280, 1280), # max width/height | |
| quality=75 # JPEG quality (60–80 is ideal) | |
| ) -> bytes: | |
| """ | |
| Compress image by resizing and lowering quality. | |
| Returns compressed image bytes. | |
| """ | |
| img = Image.open(io.BytesIO(image_bytes)).convert("RGB") | |
| # Resize while maintaining aspect ratio | |
| img.thumbnail(max_size, Image.LANCZOS) | |
| output = io.BytesIO() | |
| img.save( | |
| output, | |
| format="JPEG", | |
| quality=quality, | |
| optimize=True, | |
| progressive=True | |
| ) | |
| return output.getvalue() | |
| # --------------------- Gradio --------------------- | |
| # with gr.Blocks() as demo: | |
| # gr.Markdown("Face Swap") | |
| # with gr.Row(): | |
| # src_input = gr.Image(type="numpy", label="Upload Your Face") | |
| # tgt_input = gr.Image(type="numpy", label="Upload Target Image") | |
| # btn = gr.Button("Swap Face") | |
| # output_img = gr.Image(type="numpy", label="Enhanced Output") | |
| # download = gr.File(label="⬇️ Download Enhanced Image") | |
| # error_box = gr.Textbox(label="Logs / Errors", interactive=False) | |
| # def process(src, tgt): | |
| # img, path, err = face_swap_and_enhance(src, tgt) | |
| # return img, path, err | |
| # btn.click(process, [src_input, tgt_input], [output_img, download, error_box]) | |
| # --------------------- DigitalOcean Spaces Helper --------------------- | |
| def get_spaces_client(): | |
| session = boto3.session.Session() | |
| client = session.client( | |
| 's3', | |
| region_name=DO_SPACES_REGION, | |
| endpoint_url=DO_SPACES_ENDPOINT, | |
| aws_access_key_id=DO_SPACES_KEY, | |
| aws_secret_access_key=DO_SPACES_SECRET, | |
| config=Config(signature_version='s3v4') | |
| ) | |
| return client | |
| def upload_to_spaces(file_bytes, key, content_type="image/png"): | |
| client = get_spaces_client() | |
| client.put_object(Bucket=DO_SPACES_BUCKET, Key=key, Body=file_bytes, ContentType=content_type, ACL='public-read') | |
| return f"{DO_SPACES_ENDPOINT}/{DO_SPACES_BUCKET}/{key}" | |
| def download_from_spaces(key): | |
| client = get_spaces_client() | |
| obj = client.get_object(Bucket=DO_SPACES_BUCKET, Key=key) | |
| return obj['Body'].read() | |
| # --------------------- API Endpoints --------------------- | |
| def root(): | |
| return {"status": "healthy"} | |
| async def health(): | |
| return {"status": "healthy"} | |
| from fastapi import Form | |
| import requests | |
| async def test_admin_db(): | |
| try: | |
| doc = await admin_db.list_collection_names() | |
| return {"ok": True, "collections": doc} | |
| except Exception as e: | |
| return {"ok": False, "error": str(e), "url": ADMIN_MONGO_URL} | |
| async def face_swap_api( | |
| source: UploadFile = File(...), | |
| target_category_id: str = Form(None), | |
| new_category_id: str = Form(None), | |
| user_id: Optional[str] = Form(None), | |
| credentials: HTTPAuthorizationCredentials = Security(security) | |
| ): | |
| start_time = datetime.utcnow() | |
| try: | |
| # ------------------------------------------------------------------ | |
| # VALIDATION | |
| # ------------------------------------------------------------------ | |
| # -------------------------------------------------------------- | |
| # BACKWARD COMPATIBILITY FOR OLD ANDROID VERSIONS | |
| # -------------------------------------------------------------- | |
| if target_category_id == "": | |
| target_category_id = None | |
| if new_category_id == "": | |
| new_category_id = None | |
| if user_id == "": | |
| user_id = None | |
| logger.info(f"[FaceSwap] Incoming request → target_category_id={target_category_id}, new_category_id={new_category_id}, user_id={user_id}") | |
| if target_category_id and new_category_id: | |
| raise HTTPException(400, "Provide only one of new_category_id or target_category_id.") | |
| if not target_category_id and not new_category_id: | |
| raise HTTPException(400, "Either new_category_id or target_category_id is required.") | |
| # ------------------------------------------------------------------ | |
| # READ SOURCE IMAGE | |
| # ------------------------------------------------------------------ | |
| src_bytes = await source.read() | |
| src_key = f"faceswap/source/{uuid.uuid4().hex}_{source.filename}" | |
| upload_to_spaces(src_bytes, src_key, content_type=source.content_type) | |
| # ------------------------------------------------------------------ | |
| # CASE 1 : new_category_id → MongoDB lookup | |
| # ------------------------------------------------------------------ | |
| if new_category_id: | |
| doc = await subcategories_col.find_one({ | |
| "asset_images._id": ObjectId(new_category_id) | |
| }) | |
| if not doc: | |
| raise HTTPException(404, "Asset image not found in database") | |
| # extract correct asset | |
| asset = next( | |
| (img for img in doc["asset_images"] if str(img["_id"]) == new_category_id), | |
| None | |
| ) | |
| if not asset: | |
| raise HTTPException(404, "Asset image URL not found") | |
| # correct URL | |
| target_url = asset["url"] | |
| # correct categoryId (ObjectId) | |
| #category_oid = doc["categoryId"] # <-- DO NOT CONVERT TO STRING | |
| subcategory_oid = doc["_id"] | |
| # ------------------------------------------------------------------# | |
| # # MEDIA_CLICKS (ONLY IF user_id PRESENT) | |
| # ------------------------------------------------------------------# | |
| if user_id: | |
| try: | |
| user_id_clean = user_id.strip() | |
| if not user_id_clean: | |
| raise ValueError("user_id cannot be empty") | |
| try: | |
| user_oid = ObjectId(user_id_clean) | |
| except (InvalidId, ValueError) as e: | |
| logger.error(f"Invalid user_id format: {user_id_clean}") | |
| raise ValueError(f"Invalid user_id format: {user_id_clean}") | |
| now = datetime.utcnow() | |
| # Normalize dates (UTC midnight) | |
| today_date = datetime(now.year, now.month, now.day) | |
| # ------------------------------------------------- | |
| # STEP 1: Ensure root document exists | |
| # ------------------------------------------------- | |
| await media_clicks_col.update_one( | |
| {"userId": user_oid}, | |
| { | |
| "$setOnInsert": { | |
| "userId": user_oid, | |
| "createdAt": now, | |
| "ai_edit_complete": 0, | |
| "ai_edit_daily_count": [] | |
| } | |
| }, | |
| upsert=True | |
| ) | |
| # ------------------------------------------------- | |
| # STEP 2: Handle DAILY USAGE (BINARY, NO DUPLICATES) | |
| # ------------------------------------------------- | |
| doc = await media_clicks_col.find_one( | |
| {"userId": user_oid}, | |
| {"ai_edit_daily_count": 1} | |
| ) | |
| daily_entries = doc.get("ai_edit_daily_count", []) if doc else [] | |
| # Normalize today to UTC midnight | |
| today_date = datetime(now.year, now.month, now.day) | |
| # Build normalized date → count map (THIS ENFORCES UNIQUENESS) | |
| daily_map = {} | |
| for entry in daily_entries: | |
| d = entry["date"] | |
| if isinstance(d, datetime): | |
| d = datetime(d.year, d.month, d.day) | |
| daily_map[d] = entry["count"] # overwrite = no duplicates | |
| # Determine last recorded date | |
| last_date = max(daily_map.keys()) if daily_map else today_date | |
| # Fill ALL missing days with count = 0 | |
| next_day = last_date + timedelta(days=1) | |
| while next_day < today_date: | |
| daily_map.setdefault(next_day, 0) | |
| next_day += timedelta(days=1) | |
| # Mark today as used (binary) | |
| daily_map[today_date] = 1 | |
| # Rebuild list: OLDEST → NEWEST | |
| final_daily_entries = [ | |
| {"date": d, "count": daily_map[d]} | |
| for d in sorted(daily_map.keys()) | |
| ] | |
| # Keep only last 32 days | |
| final_daily_entries = final_daily_entries[-32:] | |
| # Atomic replace | |
| await media_clicks_col.update_one( | |
| {"userId": user_oid}, | |
| { | |
| "$set": { | |
| "ai_edit_daily_count": final_daily_entries, | |
| "updatedAt": now | |
| } | |
| } | |
| ) | |
| # ------------------------------------------------- | |
| # STEP 3: Try updating existing subCategory | |
| # ------------------------------------------------- | |
| update_result = await media_clicks_col.update_one( | |
| { | |
| "userId": user_oid, | |
| "subCategories.subCategoryId": subcategory_oid | |
| }, | |
| { | |
| "$inc": { | |
| "subCategories.$.click_count": 1, | |
| "ai_edit_complete": 1 | |
| }, | |
| "$set": { | |
| "subCategories.$.lastClickedAt": now, | |
| "ai_edit_last_date": now, | |
| "updatedAt": now | |
| } | |
| } | |
| ) | |
| # ------------------------------------------------- | |
| # STEP 4: Push subCategory if missing | |
| # ------------------------------------------------- | |
| if update_result.matched_count == 0: | |
| await media_clicks_col.update_one( | |
| {"userId": user_oid}, | |
| { | |
| "$inc": { | |
| "ai_edit_complete": 1 | |
| }, | |
| "$set": { | |
| "ai_edit_last_date": now, | |
| "updatedAt": now | |
| }, | |
| "$push": { | |
| "subCategories": { | |
| "subCategoryId": subcategory_oid, | |
| "click_count": 1, | |
| "lastClickedAt": now | |
| } | |
| } | |
| } | |
| ) | |
| # ------------------------------------------------- | |
| # STEP 5: Sort subCategories by lastClickedAt (ascending - oldest first) | |
| # ------------------------------------------------- | |
| user_doc = await media_clicks_col.find_one({"userId": user_oid}) | |
| if user_doc and "subCategories" in user_doc: | |
| subcategories = user_doc["subCategories"] | |
| # Sort by lastClickedAt in ascending order (oldest first) | |
| # Handle missing or None dates by using datetime.min | |
| subcategories_sorted = sorted( | |
| subcategories, | |
| key=lambda x: x.get("lastClickedAt") if x.get("lastClickedAt") is not None else datetime.min | |
| ) | |
| # Update with sorted array | |
| await media_clicks_col.update_one( | |
| {"userId": user_oid}, | |
| { | |
| "$set": { | |
| "subCategories": subcategories_sorted, | |
| "updatedAt": now | |
| } | |
| } | |
| ) | |
| logger.info( | |
| "[MEDIA_CLICK] user=%s subCategory=%s ai_edit_complete++ daily_tracked", | |
| user_id, | |
| str(subcategory_oid) | |
| ) | |
| except Exception as media_err: | |
| logger.error(f"MEDIA_CLICK ERROR: {media_err}") | |
| # # ------------------------------------------------------------------ | |
| # # CASE 2 : target_category_id → DigitalOcean path (unchanged logic) | |
| # # ------------------------------------------------------------------ | |
| if target_category_id: | |
| client = get_spaces_client() | |
| base_prefix = "faceswap/target/" | |
| resp = client.list_objects_v2( | |
| Bucket=DO_SPACES_BUCKET, Prefix=base_prefix, Delimiter="/" | |
| ) | |
| # Extract categories from the CommonPrefixes | |
| categories = [p["Prefix"].split("/")[2] for p in resp.get("CommonPrefixes", [])] | |
| target_url = None | |
| # --- FIX STARTS HERE --- | |
| for category in categories: | |
| original_prefix = f"faceswap/target/{category}/original/" | |
| thumb_prefix = f"faceswap/target/{category}/thumb/" # Keep for file list check (optional but safe) | |
| # List objects in original/ | |
| original_objects = client.list_objects_v2( | |
| Bucket=DO_SPACES_BUCKET, Prefix=original_prefix | |
| ).get("Contents", []) | |
| # List objects in thumb/ (optional: for the old code's extra check) | |
| thumb_objects = client.list_objects_v2( | |
| Bucket=DO_SPACES_BUCKET, Prefix=thumb_prefix | |
| ).get("Contents", []) | |
| # Extract only the filenames and filter for .png | |
| original_filenames = sorted([ | |
| obj["Key"].split("/")[-1] for obj in original_objects | |
| if obj["Key"].split("/")[-1].endswith(".png") | |
| ]) | |
| thumb_filenames = [ | |
| obj["Key"].split("/")[-1] for obj in thumb_objects | |
| ] | |
| # Replicate the old indexing logic based on sorted filenames | |
| for idx, filename in enumerate(original_filenames, start=1): | |
| cid = f"{category.lower()}image_{idx}" | |
| # Optional: Replicate the thumb file check for 100% parity | |
| # if filename in thumb_filenames and cid == target_category_id: | |
| # Simpler check just on the ID, assuming thumb files are present | |
| if cid == target_category_id: | |
| # Construct the final target URL using the full prefix and the filename | |
| target_url = f"{DO_SPACES_ENDPOINT}/{DO_SPACES_BUCKET}/{original_prefix}{filename}" | |
| break | |
| if target_url: | |
| break | |
| # --- FIX ENDS HERE --- | |
| if not target_url: | |
| raise HTTPException(404, "Target categoryId not found") | |
| # # ------------------------------------------------------------------ | |
| # # DOWNLOAD TARGET IMAGE | |
| # # ------------------------------------------------------------------ | |
| async with httpx.AsyncClient(timeout=30.0) as client: | |
| response = await client.get(target_url) | |
| response.raise_for_status() | |
| tgt_bytes = response.content | |
| src_bgr = cv2.imdecode(np.frombuffer(src_bytes, np.uint8), cv2.IMREAD_COLOR) | |
| tgt_bgr = cv2.imdecode(np.frombuffer(tgt_bytes, np.uint8), cv2.IMREAD_COLOR) | |
| if src_bgr is None or tgt_bgr is None: | |
| raise HTTPException(400, "Invalid image data") | |
| src_rgb = cv2.cvtColor(src_bgr, cv2.COLOR_BGR2RGB) | |
| tgt_rgb = cv2.cvtColor(tgt_bgr, cv2.COLOR_BGR2RGB) | |
| # ------------------------------------------------------------------ | |
| # FACE SWAP EXECUTION | |
| # ------------------------------------------------------------------ | |
| final_img, final_path, err = face_swap_and_enhance(src_rgb, tgt_rgb) | |
| if err: | |
| raise HTTPException(500, err) | |
| with open(final_path, "rb") as f: | |
| result_bytes = f.read() | |
| result_key = f"faceswap/result/{uuid.uuid4().hex}_enhanced.png" | |
| result_url = upload_to_spaces(result_bytes, result_key) | |
| # ------------------------------------------------- | |
| # COMPRESS IMAGE (2–3 MB target) | |
| # ------------------------------------------------- | |
| compressed_bytes = compress_image( | |
| image_bytes=result_bytes, | |
| max_size=(1280, 1280), | |
| quality=72 | |
| ) | |
| compressed_key = f"faceswap/result/{uuid.uuid4().hex}_enhanced_compressed.jpg" | |
| compressed_url = upload_to_spaces( | |
| compressed_bytes, | |
| compressed_key, | |
| content_type="image/jpeg" | |
| ) | |
| end_time = datetime.utcnow() | |
| response_time_ms = (end_time - start_time).total_seconds() * 1000 | |
| if database is not None: | |
| await database.api_logs.insert_one({ | |
| "endpoint": "/face-swap", | |
| "status": "success", | |
| "response_time_ms": response_time_ms, | |
| "timestamp": end_time | |
| }) | |
| return { | |
| "result_key": result_key, | |
| "result_url": result_url, | |
| "Compressed_Image_URL": compressed_url | |
| } | |
| except Exception as e: | |
| end_time = datetime.utcnow() | |
| response_time_ms = (end_time - start_time).total_seconds() * 1000 | |
| if database is not None: | |
| await database.api_logs.insert_one({ | |
| "endpoint": "/face-swap", | |
| "status": "fail", | |
| "response_time_ms": response_time_ms, | |
| "timestamp": end_time, | |
| "error": str(e) | |
| }) | |
| raise HTTPException(500, f"Face swap failed: {str(e)}") | |
| async def preview_result(result_key: str): | |
| try: | |
| img_bytes = download_from_spaces(result_key) | |
| except Exception: | |
| raise HTTPException(status_code=404, detail="Result not found") | |
| return Response( | |
| content=img_bytes, | |
| media_type="image/png", | |
| headers={"Content-Disposition": "inline; filename=result.png"} | |
| ) | |
| # --------------------- Mount Gradio --------------------- | |
| if __name__ == "__main__": | |
| uvicorn.run(fastapi_app, host="0.0.0.0", port=7860) | |
| # # --------------------- List Images Endpoint --------------------- | |
| # import os | |
| # os.environ["OMP_NUM_THREADS"] = "1" | |
| # import shutil | |
| # import uuid | |
| # import cv2 | |
| # import numpy as np | |
| # import threading | |
| # import subprocess | |
| # import logging | |
| # from datetime import datetime,timedelta | |
| # import insightface | |
| # from insightface.app import FaceAnalysis | |
| # from huggingface_hub import hf_hub_download | |
| # from fastapi import FastAPI, UploadFile, File, HTTPException, Response, Depends, Security, Form | |
| # from fastapi.responses import RedirectResponse | |
| # from fastapi.security import HTTPBearer, HTTPAuthorizationCredentials | |
| # from motor.motor_asyncio import AsyncIOMotorClient | |
| # from bson import ObjectId | |
| # import requests | |
| # import uvicorn | |
| # import gradio as gr | |
| # from gradio import mount_gradio_app | |
| # # DigitalOcean Spaces | |
| # import boto3 | |
| # from botocore.client import Config | |
| # from typing import Optional | |
| # # --------------------- Logging --------------------- | |
| # logging.basicConfig(level=logging.INFO) | |
| # logger = logging.getLogger(__name__) | |
| # # --------------------- Secrets & Paths --------------------- | |
| # REPO_ID = "HariLogicgo/face_swap_models" | |
| # MODELS_DIR = "./models" | |
| # os.makedirs(MODELS_DIR, exist_ok=True) | |
| # HF_TOKEN = os.getenv("HF_TOKEN") | |
| # API_SECRET_TOKEN = os.getenv("API_SECRET_TOKEN") | |
| # DO_SPACES_REGION = os.getenv("DO_SPACES_REGION", "blr1") | |
| # DO_SPACES_ENDPOINT = f"https://{DO_SPACES_REGION}.digitaloceanspaces.com" | |
| # DO_SPACES_KEY = os.getenv("DO_SPACES_KEY") | |
| # DO_SPACES_SECRET = os.getenv("DO_SPACES_SECRET") | |
| # DO_SPACES_BUCKET = os.getenv("DO_SPACES_BUCKET") | |
| # # NEW admin DB | |
| # ADMIN_MONGO_URL = os.getenv("ADMIN_MONGO_URL") | |
| # admin_client = AsyncIOMotorClient(ADMIN_MONGO_URL) | |
| # admin_db = admin_client.adminPanel | |
| # subcategories_col = admin_db.subcategories | |
| # media_clicks_col = admin_db.media_clicks | |
| # # OLD logs DB | |
| # MONGODB_URL = os.getenv("MONGODB_URL") | |
| # client = None | |
| # database = None | |
| # # --------------------- Download Models --------------------- | |
| # def download_models(): | |
| # logger.info("Downloading models...") | |
| # inswapper_path = hf_hub_download( | |
| # repo_id=REPO_ID, | |
| # filename="models/inswapper_128.onnx", | |
| # repo_type="model", | |
| # local_dir=MODELS_DIR, | |
| # token=HF_TOKEN | |
| # ) | |
| # buffalo_files = ["1k3d68.onnx", "2d106det.onnx", "genderage.onnx", "det_10g.onnx", "w600k_r50.onnx"] | |
| # for f in buffalo_files: | |
| # hf_hub_download( | |
| # repo_id=REPO_ID, | |
| # filename=f"models/buffalo_l/" + f, | |
| # repo_type="model", | |
| # local_dir=MODELS_DIR, | |
| # token=HF_TOKEN | |
| # ) | |
| # logger.info("Models downloaded.") | |
| # return inswapper_path | |
| # inswapper_path = download_models() | |
| # # --------------------- Face Analysis + Swapper --------------------- | |
| # providers = ['CUDAExecutionProvider', 'CPUExecutionProvider'] | |
| # face_analysis_app = FaceAnalysis(name="buffalo_l", root=MODELS_DIR, providers=providers) | |
| # face_analysis_app.prepare(ctx_id=0, det_size=(640, 640)) | |
| # swapper = insightface.model_zoo.get_model(inswapper_path, providers=providers) | |
| # # --------------------- CodeFormer --------------------- | |
| # CODEFORMER_PATH = "CodeFormer/inference_codeformer.py" | |
| # def ensure_codeformer(): | |
| # if not os.path.exists("CodeFormer"): | |
| # subprocess.run("git clone https://github.com/sczhou/CodeFormer.git", shell=True, check=True) | |
| # subprocess.run("pip install -r CodeFormer/requirements.txt", shell=True, check=True) | |
| # subprocess.run("python CodeFormer/basicsr/setup.py develop", shell=True, check=True) | |
| # subprocess.run("python CodeFormer/scripts/download_pretrained_models.py facelib", shell=True, check=True) | |
| # subprocess.run("python CodeFormer/scripts/download_pretrained_models.py CodeFormer", shell=True, check=True) | |
| # ensure_codeformer() | |
| # # --------------------- FastAPI --------------------- | |
| # fastapi_app = FastAPI() | |
| # @fastapi_app.on_event("startup") | |
| # async def startup_db(): | |
| # global client, database | |
| # logger.info("Initializing MongoDB for API logs...") | |
| # client = AsyncIOMotorClient(MONGODB_URL) | |
| # database = client.FaceSwap | |
| # logger.info("MongoDB initialized for API logs") | |
| # @fastapi_app.on_event("shutdown") | |
| # async def shutdown_db(): | |
| # global client | |
| # if client: | |
| # client.close() | |
| # logger.info("MongoDB connection closed") | |
| # # --------------------- Auth --------------------- | |
| # security = HTTPBearer() | |
| # def verify_token(credentials: HTTPAuthorizationCredentials = Security(security)): | |
| # if credentials.credentials != API_SECRET_TOKEN: | |
| # raise HTTPException(status_code=401, detail="Invalid or missing token") | |
| # return credentials.credentials | |
| # # --------------------- Logging API Hits --------------------- | |
| # async def log_faceswap_hit(token: str, status: str = "success"): | |
| # global database | |
| # if database is None: | |
| # return | |
| # await database.api_logs.insert_one({ | |
| # "token": token, | |
| # "endpoint": "/faceswap", | |
| # "status": status, | |
| # "timestamp": datetime.utcnow() | |
| # }) | |
| # # --------------------- Face Swap Pipeline --------------------- | |
| # swap_lock = threading.Lock() | |
| # def face_swap_and_enhance(src_img, tgt_img, temp_dir="/tmp/faceswap_work"): | |
| # try: | |
| # with swap_lock: | |
| # # Use a temp dir for intermediate files | |
| # if os.path.exists(temp_dir): | |
| # shutil.rmtree(temp_dir) | |
| # os.makedirs(temp_dir, exist_ok=True) | |
| # src_bgr = cv2.cvtColor(src_img, cv2.COLOR_RGB2BGR) | |
| # tgt_bgr = cv2.cvtColor(tgt_img, cv2.COLOR_RGB2BGR) | |
| # src_faces = face_analysis_app.get(src_bgr) | |
| # tgt_faces = face_analysis_app.get(tgt_bgr) | |
| # if not src_faces or not tgt_faces: | |
| # return None, None, "❌ Face not detected in one of the images" | |
| # swapped_path = os.path.join(temp_dir, f"swapped_{uuid.uuid4().hex[:8]}.jpg") | |
| # swapped_bgr = swapper.get(tgt_bgr, tgt_faces[0], src_faces[0]) | |
| # if swapped_bgr is None: | |
| # return None, None, "❌ Face swap failed" | |
| # cv2.imwrite(swapped_path, swapped_bgr) | |
| # cmd = f"python {CODEFORMER_PATH} -w 0.7 --input_path {swapped_path} --output_path {temp_dir} --bg_upsampler realesrgan --face_upsample" | |
| # result = subprocess.run(cmd, shell=True, capture_output=True, text=True) | |
| # if result.returncode != 0: | |
| # return None, None, f"❌ CodeFormer failed:\n{result.stderr}" | |
| # final_results_dir = os.path.join(temp_dir, "final_results") | |
| # final_files = [f for f in os.listdir(final_results_dir) if f.endswith(".png")] | |
| # if not final_files: | |
| # return None, None, "❌ No enhanced image found" | |
| # final_path = os.path.join(final_results_dir, final_files[0]) | |
| # final_img = cv2.cvtColor(cv2.imread(final_path), cv2.COLOR_BGR2RGB) | |
| # return final_img, final_path, "" | |
| # except Exception as e: | |
| # return None, None, f"❌ Error: {str(e)}" | |
| # # --------------------- Gradio --------------------- | |
| # with gr.Blocks() as demo: | |
| # gr.Markdown("Face Swap") | |
| # with gr.Row(): | |
| # src_input = gr.Image(type="numpy", label="Upload Your Face") | |
| # tgt_input = gr.Image(type="numpy", label="Upload Target Image") | |
| # btn = gr.Button("Swap Face") | |
| # output_img = gr.Image(type="numpy", label="Enhanced Output") | |
| # download = gr.File(label="⬇️ Download Enhanced Image") | |
| # error_box = gr.Textbox(label="Logs / Errors", interactive=False) | |
| # def process(src, tgt): | |
| # img, path, err = face_swap_and_enhance(src, tgt) | |
| # return img, path, err | |
| # btn.click(process, [src_input, tgt_input], [output_img, download, error_box]) | |
| # # --------------------- DigitalOcean Spaces Helper --------------------- | |
| # def get_spaces_client(): | |
| # session = boto3.session.Session() | |
| # client = session.client( | |
| # 's3', | |
| # region_name=DO_SPACES_REGION, | |
| # endpoint_url=DO_SPACES_ENDPOINT, | |
| # aws_access_key_id=DO_SPACES_KEY, | |
| # aws_secret_access_key=DO_SPACES_SECRET, | |
| # config=Config(signature_version='s3v4') | |
| # ) | |
| # return client | |
| # def upload_to_spaces(file_bytes, key, content_type="image/png"): | |
| # client = get_spaces_client() | |
| # client.put_object(Bucket=DO_SPACES_BUCKET, Key=key, Body=file_bytes, ContentType=content_type, ACL='public-read') | |
| # return f"{DO_SPACES_ENDPOINT}/{DO_SPACES_BUCKET}/{key}" | |
| # def download_from_spaces(key): | |
| # client = get_spaces_client() | |
| # obj = client.get_object(Bucket=DO_SPACES_BUCKET, Key=key) | |
| # return obj['Body'].read() | |
| # # --------------------- API Endpoints --------------------- | |
| # @fastapi_app.get("/") | |
| # def root(): | |
| # return RedirectResponse("/gradio") | |
| # @fastapi_app.get("/health") | |
| # async def health(): | |
| # return {"status": "healthy"} | |
| # from fastapi import Form | |
| # import requests | |
| # @fastapi_app.get("/test-admin-db") | |
| # async def test_admin_db(): | |
| # try: | |
| # doc = await admin_db.list_collection_names() | |
| # return {"ok": True, "collections": doc} | |
| # except Exception as e: | |
| # return {"ok": False, "error": str(e), "url": ADMIN_MONGO_URL} | |
| # @fastapi_app.post("/face-swap", dependencies=[Depends(verify_token)]) | |
| # async def face_swap_api( | |
| # source: UploadFile = File(...), | |
| # target_category_id: str = Form(None), | |
| # new_category_id: str = Form(None), | |
| # user_id: Optional[str] = Form(None), | |
| # credentials: HTTPAuthorizationCredentials = Security(security) | |
| # ): | |
| # start_time = datetime.utcnow() | |
| # try: | |
| # # ------------------------------------------------------------------ | |
| # # VALIDATION | |
| # # ------------------------------------------------------------------ | |
| # # -------------------------------------------------------------- | |
| # # BACKWARD COMPATIBILITY FOR OLD ANDROID VERSIONS | |
| # # -------------------------------------------------------------- | |
| # if target_category_id == "": | |
| # target_category_id = None | |
| # if new_category_id == "": | |
| # new_category_id = None | |
| # if user_id == "": | |
| # user_id = None | |
| # logger.info(f"[FaceSwap] Incoming request → target_category_id={target_category_id}, new_category_id={new_category_id}, user_id={user_id}") | |
| # if target_category_id and new_category_id: | |
| # raise HTTPException(400, "Provide only one of new_category_id or target_category_id.") | |
| # if not target_category_id and not new_category_id: | |
| # raise HTTPException(400, "Either new_category_id or target_category_id is required.") | |
| # # ------------------------------------------------------------------ | |
| # # READ SOURCE IMAGE | |
| # # ------------------------------------------------------------------ | |
| # src_bytes = await source.read() | |
| # src_key = f"faceswap/source/{uuid.uuid4().hex}_{source.filename}" | |
| # upload_to_spaces(src_bytes, src_key, content_type=source.content_type) | |
| # # ------------------------------------------------------------------ | |
| # # CASE 1 : new_category_id → MongoDB lookup | |
| # # ------------------------------------------------------------------ | |
| # if new_category_id: | |
| # doc = await subcategories_col.find_one({ | |
| # "asset_images._id": ObjectId(new_category_id) | |
| # }) | |
| # if not doc: | |
| # raise HTTPException(404, "Asset image not found in database") | |
| # # extract correct asset | |
| # asset = next( | |
| # (img for img in doc["asset_images"] if str(img["_id"]) == new_category_id), | |
| # None | |
| # ) | |
| # if not asset: | |
| # raise HTTPException(404, "Asset image URL not found") | |
| # # correct URL | |
| # target_url = asset["url"] | |
| # # correct categoryId (ObjectId) | |
| # #category_oid = doc["categoryId"] # <-- DO NOT CONVERT TO STRING | |
| # subcategory_oid = doc["_id"] | |
| # # ------------------------------------------------------------------# | |
| # # # MEDIA_CLICKS (ONLY IF user_id PRESENT) | |
| # # ------------------------------------------------------------------# | |
| # if user_id: | |
| # try: | |
| # user_oid = ObjectId(user_id.strip()) | |
| # now = datetime.utcnow() | |
| # # Normalize dates (UTC midnight) | |
| # today_date = datetime(now.year, now.month, now.day) | |
| # yesterday_date = today_date - timedelta(days=1) | |
| # # ------------------------------------------------- | |
| # # STEP 1: Ensure root document exists | |
| # # ------------------------------------------------- | |
| # await media_clicks_col.update_one( | |
| # {"userId": user_oid}, | |
| # { | |
| # "$setOnInsert": { | |
| # "userId": user_oid, | |
| # "createdAt": now, | |
| # "ai_edit_complete": 0, | |
| # "ai_edit_daily_count": [] | |
| # } | |
| # }, | |
| # upsert=True | |
| # ) | |
| # # ------------------------------------------------- | |
| # # STEP 2: Handle DAILY USAGE (BINARY, NO DUPLICATES) | |
| # # ------------------------------------------------- | |
| # doc = await media_clicks_col.find_one( | |
| # {"userId": user_oid}, | |
| # {"ai_edit_daily_count": 1} | |
| # ) | |
| # daily_entries = doc.get("ai_edit_daily_count", []) if doc else [] | |
| # # Normalize today to UTC midnight | |
| # today_date = datetime(now.year, now.month, now.day) | |
| # # Build normalized date → count map (THIS ENFORCES UNIQUENESS) | |
| # daily_map = {} | |
| # for entry in daily_entries: | |
| # d = entry["date"] | |
| # if isinstance(d, datetime): | |
| # d = datetime(d.year, d.month, d.day) | |
| # daily_map[d] = entry["count"] # overwrite = no duplicates | |
| # # Determine last recorded date | |
| # last_date = max(daily_map.keys()) if daily_map else today_date | |
| # # Fill ALL missing days with count = 0 | |
| # next_day = last_date + timedelta(days=1) | |
| # while next_day < today_date: | |
| # daily_map.setdefault(next_day, 0) | |
| # next_day += timedelta(days=1) | |
| # # Mark today as used (binary) | |
| # daily_map[today_date] = 1 | |
| # # Rebuild list: OLDEST → NEWEST | |
| # final_daily_entries = [ | |
| # {"date": d, "count": daily_map[d]} | |
| # for d in sorted(daily_map.keys()) | |
| # ] | |
| # # Keep only last 32 days | |
| # final_daily_entries = final_daily_entries[-32:] | |
| # # Atomic replace | |
| # await media_clicks_col.update_one( | |
| # {"userId": user_oid}, | |
| # { | |
| # "$set": { | |
| # "ai_edit_daily_count": final_daily_entries, | |
| # "updatedAt": now | |
| # } | |
| # } | |
| # ) | |
| # # ------------------------------------------------- | |
| # # STEP 3: Try updating existing subCategory | |
| # # ------------------------------------------------- | |
| # update_result = await media_clicks_col.update_one( | |
| # { | |
| # "userId": user_oid, | |
| # "subCategories.subCategoryId": subcategory_oid | |
| # }, | |
| # { | |
| # "$inc": { | |
| # "subCategories.$.click_count": 1, | |
| # "ai_edit_complete": 1 | |
| # }, | |
| # "$set": { | |
| # "subCategories.$.lastClickedAt": now, | |
| # "ai_edit_last_date": now, | |
| # "updatedAt": now | |
| # } | |
| # } | |
| # ) | |
| # # ------------------------------------------------- | |
| # # STEP 4: Push subCategory if missing | |
| # # ------------------------------------------------- | |
| # if update_result.matched_count == 0: | |
| # await media_clicks_col.update_one( | |
| # {"userId": user_oid}, | |
| # { | |
| # "$inc": { | |
| # "ai_edit_complete": 1 | |
| # }, | |
| # "$set": { | |
| # "ai_edit_last_date": now, | |
| # "updatedAt": now | |
| # }, | |
| # "$push": { | |
| # "subCategories": { | |
| # "subCategoryId": subcategory_oid, | |
| # "click_count": 1, | |
| # "lastClickedAt": now | |
| # } | |
| # } | |
| # } | |
| # ) | |
| # logger.info( | |
| # "[MEDIA_CLICK] user=%s subCategory=%s ai_edit_complete++ daily_tracked", | |
| # user_id, | |
| # str(subcategory_oid) | |
| # ) | |
| # except Exception as media_err: | |
| # logger.error(f"MEDIA_CLICK ERROR: {media_err}") | |
| # # # ------------------------------------------------------------------ | |
| # # # CASE 2 : target_category_id → DigitalOcean path (unchanged logic) | |
| # # # ------------------------------------------------------------------ | |
| # if target_category_id: | |
| # client = get_spaces_client() | |
| # base_prefix = "faceswap/target/" | |
| # resp = client.list_objects_v2( | |
| # Bucket=DO_SPACES_BUCKET, Prefix=base_prefix, Delimiter="/" | |
| # ) | |
| # # Extract categories from the CommonPrefixes | |
| # categories = [p["Prefix"].split("/")[2] for p in resp.get("CommonPrefixes", [])] | |
| # target_url = None | |
| # # --- FIX STARTS HERE --- | |
| # for category in categories: | |
| # original_prefix = f"faceswap/target/{category}/original/" | |
| # thumb_prefix = f"faceswap/target/{category}/thumb/" # Keep for file list check (optional but safe) | |
| # # List objects in original/ | |
| # original_objects = client.list_objects_v2( | |
| # Bucket=DO_SPACES_BUCKET, Prefix=original_prefix | |
| # ).get("Contents", []) | |
| # # List objects in thumb/ (optional: for the old code's extra check) | |
| # thumb_objects = client.list_objects_v2( | |
| # Bucket=DO_SPACES_BUCKET, Prefix=thumb_prefix | |
| # ).get("Contents", []) | |
| # # Extract only the filenames and filter for .png | |
| # original_filenames = sorted([ | |
| # obj["Key"].split("/")[-1] for obj in original_objects | |
| # if obj["Key"].split("/")[-1].endswith(".png") | |
| # ]) | |
| # thumb_filenames = [ | |
| # obj["Key"].split("/")[-1] for obj in thumb_objects | |
| # ] | |
| # # Replicate the old indexing logic based on sorted filenames | |
| # for idx, filename in enumerate(original_filenames, start=1): | |
| # cid = f"{category.lower()}image_{idx}" | |
| # # Optional: Replicate the thumb file check for 100% parity | |
| # # if filename in thumb_filenames and cid == target_category_id: | |
| # # Simpler check just on the ID, assuming thumb files are present | |
| # if cid == target_category_id: | |
| # # Construct the final target URL using the full prefix and the filename | |
| # target_url = f"{DO_SPACES_ENDPOINT}/{DO_SPACES_BUCKET}/{original_prefix}{filename}" | |
| # break | |
| # if target_url: | |
| # break | |
| # # --- FIX ENDS HERE --- | |
| # if not target_url: | |
| # raise HTTPException(404, "Target categoryId not found") | |
| # # # ------------------------------------------------------------------ | |
| # # # DOWNLOAD TARGET IMAGE | |
| # # # ------------------------------------------------------------------ | |
| # tgt_bytes = requests.get(target_url).content | |
| # src_bgr = cv2.imdecode(np.frombuffer(src_bytes, np.uint8), cv2.IMREAD_COLOR) | |
| # tgt_bgr = cv2.imdecode(np.frombuffer(tgt_bytes, np.uint8), cv2.IMREAD_COLOR) | |
| # if src_bgr is None or tgt_bgr is None: | |
| # raise HTTPException(400, "Invalid image data") | |
| # src_rgb = cv2.cvtColor(src_bgr, cv2.COLOR_BGR2RGB) | |
| # tgt_rgb = cv2.cvtColor(tgt_bgr, cv2.COLOR_BGR2RGB) | |
| # # ------------------------------------------------------------------ | |
| # # FACE SWAP EXECUTION | |
| # # ------------------------------------------------------------------ | |
| # final_img, final_path, err = face_swap_and_enhance(src_rgb, tgt_rgb) | |
| # if err: | |
| # raise HTTPException(500, err) | |
| # with open(final_path, "rb") as f: | |
| # result_bytes = f.read() | |
| # result_key = f"faceswap/result/{uuid.uuid4().hex}_enhanced.png" | |
| # result_url = upload_to_spaces(result_bytes, result_key) | |
| # end_time = datetime.utcnow() | |
| # response_time_ms = (end_time - start_time).total_seconds() * 1000 | |
| # if database is not None: | |
| # await database.api_logs.insert_one({ | |
| # "endpoint": "/face-swap", | |
| # "status": "success", | |
| # "response_time_ms": response_time_ms, | |
| # "timestamp": end_time | |
| # }) | |
| # return { | |
| # "result_key": result_key, | |
| # "result_url": result_url | |
| # } | |
| # except Exception as e: | |
| # end_time = datetime.utcnow() | |
| # response_time_ms = (end_time - start_time).total_seconds() * 1000 | |
| # if database is not None: | |
| # await database.api_logs.insert_one({ | |
| # "endpoint": "/face-swap", | |
| # "status": "fail", | |
| # "response_time_ms": response_time_ms, | |
| # "timestamp": end_time, | |
| # "error": str(e) | |
| # }) | |
| # raise HTTPException(500, f"Face swap failed: {str(e)}") | |
| # @fastapi_app.get("/preview/{result_key:path}") | |
| # async def preview_result(result_key: str): | |
| # try: | |
| # img_bytes = download_from_spaces(result_key) | |
| # except Exception: | |
| # raise HTTPException(status_code=404, detail="Result not found") | |
| # return Response( | |
| # content=img_bytes, | |
| # media_type="image/png", | |
| # headers={"Content-Disposition": "inline; filename=result.png"} | |
| # ) | |
| # # --------------------- Mount Gradio --------------------- | |
| # fastapi_app = mount_gradio_app(fastapi_app, demo, path="/gradio") | |
| # if __name__ == "__main__": | |
| # uvicorn.run(fastapi_app, host="0.0.0.0", port=7860) | |