| import os | |
| os.environ["OMP_NUM_THREADS"] = "1" | |
| import shutil | |
| import uuid | |
| import cv2 | |
| import numpy as np | |
| import threading | |
| import subprocess | |
| import logging | |
| from datetime import datetime | |
| import requests | |
| from pymongo import MongoClient | |
| import time | |
| import insightface | |
| from insightface.app import FaceAnalysis | |
| from huggingface_hub import hf_hub_download | |
| from fastapi import FastAPI, UploadFile, File, HTTPException, Depends, Security, Form, Response | |
| from fastapi.responses import RedirectResponse | |
| from fastapi.security import HTTPBearer, HTTPAuthorizationCredentials | |
| from fastapi.concurrency import run_in_threadpool | |
| from fastapi.staticfiles import StaticFiles | |
| import uvicorn | |
| import gradio as gr | |
| from gradio import mount_gradio_app | |
# --------------------- LOGGING ---------------------
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
# --------------------- TARGET IMAGE BASE URL ---------------------
# Base URL that target (template) images are fetched from.
TARGET_BASE_URL = "https://halloween-image-generation.onrender.com/garment_templates"
# --------------------- PATHS ---------------------
REPO_ID = "HariLogicgo/face_swap_models"
MODELS_DIR = "./models"
os.makedirs(MODELS_DIR, exist_ok=True)
# --------------------- SECRETS ---------------------
# Every credential is read from the environment; none may live in source.
HF_TOKEN = os.getenv("HF_TOKEN")
API_SECRET_TOKEN = os.getenv("API_SECRET_TOKEN")
# --------------------- MONGODB ---------------------
# SECURITY: the connection string (including user name and password) used to
# be hard-coded on this line. It is now supplied through MONGO_URL, like the
# other secrets. The previously committed password should be rotated, since
# it remains readable in version-control history.
MONGO_URL = os.getenv("MONGO_URL")
if not MONGO_URL:
    # Fail fast with a clear message instead of silently connecting elsewhere.
    raise RuntimeError("MONGO_URL environment variable is not set")
mongo_client = MongoClient(MONGO_URL)
mongo_db = mongo_client["halloween_db"]
api_logs_collection = mongo_db["api_logs"]
# --------------------- DOWNLOAD MODELS ---------------------
def download_models():
    """Download the swapper model and the buffalo_l model files.

    Each file is requested from ``REPO_ID`` with ``hf_hub_download``, using
    ``MODELS_DIR`` as the local directory and ``HF_TOKEN`` as the token.

    Returns:
        Whatever ``hf_hub_download`` returned for
        ``models/inswapper_128.onnx``.
    """
    logger.info("Downloading models...")

    def fetch(repo_file):
        # Single place that carries the arguments shared by every download.
        return hf_hub_download(
            repo_id=REPO_ID,
            filename=repo_file,
            repo_type="model",
            local_dir=MODELS_DIR,
            token=HF_TOKEN,
        )

    swapper_model_path = fetch("models/inswapper_128.onnx")

    for model_name in (
        "1k3d68.onnx",
        "2d106det.onnx",
        "genderage.onnx",
        "det_10g.onnx",
        "w600k_r50.onnx",
    ):
        fetch(f"models/buffalo_l/{model_name}")

    return swapper_model_path
# Download (or reuse) the model files before anything tries to load them.
inswapper_path = download_models()
# --------------------- FACE ANALYSIS ---------------------
# The same provider list is handed to both the analyser and the swapper.
providers = ["CUDAExecutionProvider", "CPUExecutionProvider"]
face_analysis_app = FaceAnalysis(
    name="buffalo_l",
    root=MODELS_DIR,
    providers=providers,
)
face_analysis_app.prepare(ctx_id=0, det_size=(640, 640))
swapper = insightface.model_zoo.get_model(
    inswapper_path,
    providers=providers,
)
# --------------------- CODEFORMER ---------------------
CODEFORMER_PATH = "CodeFormer/inference_codeformer.py"


def ensure_codeformer():
    """Clone and set up CodeFormer when it is not present yet.

    Does nothing if a ``CodeFormer`` directory already exists. Otherwise the
    repository is cloned, its requirements are installed, ``basicsr`` is set
    up in develop mode, and the ``facelib`` and ``CodeFormer`` pretrained
    models are downloaded.

    Commands are run as argument lists (no shell) and with the interpreter
    that is running this process, so packages are installed into the same
    environment that will later import them.

    Raises:
        subprocess.CalledProcessError: if any setup command exits non-zero.
        OSError: if an executable (for example ``git``) cannot be started.
    """
    import sys  # local import: the file-level import block is left untouched

    if os.path.exists("CodeFormer"):
        return

    python = sys.executable or "python"
    setup_commands = [
        ["git", "clone", "https://github.com/sczhou/CodeFormer.git"],
        [python, "-m", "pip", "install", "-r", "CodeFormer/requirements.txt"],
        [python, "CodeFormer/basicsr/setup.py", "develop"],
        [python, "CodeFormer/scripts/download_pretrained_models.py", "facelib"],
        [python, "CodeFormer/scripts/download_pretrained_models.py", "CodeFormer"],
    ]
    try:
        for command in setup_commands:
            subprocess.run(command, check=True)
    except Exception:
        # A half-finished setup would otherwise be mistaken for a complete
        # one by the existence check above on the next start.
        shutil.rmtree("CodeFormer", ignore_errors=True)
        raise


ensure_codeformer()
# --------------------- FASTAPI ---------------------
fastapi_app = FastAPI()
# --------------------- AUTH ---------------------
security = HTTPBearer()


def verify_token(credentials: HTTPAuthorizationCredentials = Security(security)):
    """Check the bearer token of a request against ``API_SECRET_TOKEN``.

    Args:
        credentials: bearer credentials extracted by ``security``.

    Returns:
        The supplied token string when it matches.

    Raises:
        HTTPException: 401 when the token does not match, or when
            ``API_SECRET_TOKEN`` is not configured (requests are rejected
            rather than accepted in that case).
    """
    import secrets  # local import: the file-level import block is left untouched

    supplied = credentials.credentials
    expected = API_SECRET_TOKEN
    # compare_digest avoids leaking how many leading characters matched.
    # Both sides are encoded because it rejects non-ASCII str arguments.
    if not expected or not secrets.compare_digest(
        supplied.encode("utf-8"), expected.encode("utf-8")
    ):
        raise HTTPException(status_code=401, detail="Invalid Token")
    return supplied
# --------------------- FACE SWAP LOGIC ---------------------
# Only one swap runs at a time: calls share the same temp directory by default.
swap_lock = threading.Lock()


def face_swap_and_enhance(src_img, tgt_img, temp_dir="/tmp/faceswap"):
    """Swap the first source face onto the first target face and enhance it.

    The swapped image is written to ``temp_dir`` and then processed by the
    CodeFormer inference script; the first file (in sorted order) found in
    ``<temp_dir>/final_results`` is read back as the result.

    Args:
        src_img: source image; converted with ``COLOR_RGB2BGR`` before use.
        tgt_img: target image; converted with ``COLOR_RGB2BGR`` before use.
        temp_dir: working directory. It is deleted and recreated on every
            call.

    Returns:
        A ``(image, path, error)`` tuple. On success ``image`` is the
        enhanced result converted back with ``COLOR_BGR2RGB``, ``path`` is
        the file it was read from and ``error`` is ``""``. On failure
        ``image`` and ``path`` are ``None`` and ``error`` describes the
        problem. The function does not raise for ordinary failures.
    """
    import sys  # local import: the file-level import block is left untouched

    try:
        with swap_lock:
            shutil.rmtree(temp_dir, ignore_errors=True)
            os.makedirs(temp_dir, exist_ok=True)

            src_bgr = cv2.cvtColor(src_img, cv2.COLOR_RGB2BGR)
            tgt_bgr = cv2.cvtColor(tgt_img, cv2.COLOR_RGB2BGR)
            src_faces = face_analysis_app.get(src_bgr)
            tgt_faces = face_analysis_app.get(tgt_bgr)
            if not src_faces or not tgt_faces:
                return None, None, "Face not detected"

            swapped_path = os.path.join(temp_dir, "swap.jpg")
            swapped = swapper.get(tgt_bgr, tgt_faces[0], src_faces[0])
            # cv2.imwrite reports failure through its return value.
            if not cv2.imwrite(swapped_path, swapped):
                return None, None, "Failed to write swapped image"

            # Argument list instead of a shell string: paths containing
            # spaces or shell metacharacters are passed through verbatim.
            subprocess.run(
                [
                    sys.executable or "python",
                    CODEFORMER_PATH,
                    "-w",
                    "0.7",
                    "--input_path",
                    swapped_path,
                    "--output_path",
                    temp_dir,
                    "--face_upsample",
                ],
                check=True,
            )

            final_dir = os.path.join(temp_dir, "final_results")
            # os.listdir order is arbitrary; sort for a deterministic pick.
            produced = sorted(os.listdir(final_dir))
            if not produced:
                return None, None, "CodeFormer produced no output"
            final_path = os.path.join(final_dir, produced[0])

            final_bgr = cv2.imread(final_path)
            if final_bgr is None:
                return None, None, "Failed to read enhanced image"
            final_img = cv2.cvtColor(final_bgr, cv2.COLOR_BGR2RGB)
            return final_img, final_path, ""
    except Exception as e:
        # Callers only receive the message, so keep the traceback in the log.
        logger.exception("Face swap failed")
        return None, None, str(e)
# --------------------- TARGET LOAD FROM PUBLIC URL ---------------------
async def load_target_from_url(filename_or_index: str):
    """Fetch a target image from ``TARGET_BASE_URL`` and decode it.

    Args:
        filename_or_index: either a file name, used as given, or a string of
            digits, to which ``.png`` is appended. Surrounding whitespace is
            ignored.

    Returns:
        The decoded image converted with ``COLOR_BGR2RGB``.

    Raises:
        HTTPException: 400 for an unusable name or an undecodable image,
            404 when the server does not answer with status 200,
            502 when the request itself fails (timeout, connection error).
    """
    name = filename_or_index.strip()
    # The name is client input and is appended to a URL: refuse anything
    # that could change the path or add a query string or fragment.
    if (
        not name
        or name in (".", "..")
        or any(ch in name for ch in ("/", "\\", "?", "#"))
    ):
        raise HTTPException(status_code=400, detail="Invalid target name")

    filename = f"{name}.png" if name.isdigit() else name
    url = f"{TARGET_BASE_URL}/{filename}"
    logger.info("Fetching target from: %s", url)

    try:
        # requests is blocking; run it in a worker thread so the event loop
        # keeps serving other requests while the download is in flight.
        resp = await run_in_threadpool(requests.get, url, timeout=15)
    except requests.RequestException as exc:
        raise HTTPException(
            status_code=502, detail="Target image fetch failed"
        ) from exc

    if resp.status_code != 200:
        raise HTTPException(status_code=404, detail="Target image not found")
    if not resp.content:
        # Guard explicitly rather than handing an empty buffer to imdecode.
        raise HTTPException(status_code=400, detail="Invalid target image")

    arr = np.frombuffer(resp.content, np.uint8)
    img = cv2.imdecode(arr, cv2.IMREAD_COLOR)
    if img is None:
        raise HTTPException(status_code=400, detail="Invalid target image")
    return cv2.cvtColor(img, cv2.COLOR_BGR2RGB)
# --------------------- API ---------------------
# The route registration is restored here: without it the handler is never
# reachable. Path and auth dependency match the earlier commented-out
# versions of this handler kept in this file.
@fastapi_app.post("/face-swap", dependencies=[Depends(verify_token)])
async def face_swap_api(
    source: UploadFile = File(...),
    target: str = Form(...),
):
    """Swap the uploaded face onto a target image and store the result.

    Args:
        source: uploaded image containing the face to use.
        target: target name or index, resolved by ``load_target_from_url``.

    Returns:
        A dict with ``status``, ``preview_url`` and ``filename`` describing
        the ``.webp`` file written to ``garment_output``.

    Raises:
        HTTPException: 400 for an empty or undecodable upload, 500 when the
            swap reports an error or the output cannot be written, plus
            whatever ``load_target_from_url`` raises.

    One record per call (success or failure) is inserted into
    ``api_logs_collection``. A failure to write that record is logged and
    does not change the response.
    """
    start_time = time.time()
    status = "success"
    error_msg = None
    try:
        src_bytes = await source.read()
        if not src_bytes:
            # Guard explicitly rather than handing an empty buffer to imdecode.
            raise HTTPException(status_code=400, detail="Invalid source image")
        src_arr = np.frombuffer(src_bytes, np.uint8)
        src_img = cv2.imdecode(src_arr, cv2.IMREAD_COLOR)
        if src_img is None:
            raise HTTPException(status_code=400, detail="Invalid source image")
        src_rgb = cv2.cvtColor(src_img, cv2.COLOR_BGR2RGB)

        tgt_rgb = await load_target_from_url(target)

        img, _path, err = await run_in_threadpool(
            face_swap_and_enhance,
            src_rgb,
            tgt_rgb,
        )
        if err:
            raise HTTPException(status_code=500, detail=err)

        os.makedirs("garment_output", exist_ok=True)
        output_filename = f"{uuid.uuid4()}.webp"
        output_path = os.path.join("garment_output", output_filename)
        # cv2.imwrite reports failure through its return value.
        if not cv2.imwrite(output_path, cv2.cvtColor(img, cv2.COLOR_RGB2BGR)):
            raise HTTPException(
                status_code=500, detail="Failed to save output image"
            )

        return {
            "status": "success",
            "preview_url": f"/garment_output/{output_filename}",
            "filename": output_filename,
        }
    except Exception as e:
        status = "failure"
        error_msg = str(e)
        raise
    finally:
        # Take the clock once so "date" and "time" describe the same instant.
        now = datetime.utcnow()
        log_data = {
            "api": "/face-swap",
            "status": status,
            "date": now.strftime("%Y-%m-%d"),
            "time": now.strftime("%H:%M:%S"),
            "response_time_ms": round((time.time() - start_time) * 1000, 2),
            "target": target,
            "error": error_msg,
        }
        try:
            # The driver call is blocking; keep it off the event loop.
            await run_in_threadpool(api_logs_collection.insert_one, log_data)
        except Exception:
            # An exception escaping from ``finally`` would replace the real
            # response or error, so a logging failure is only recorded.
            logger.exception("Failed to write API log entry")
| # @fastapi_app.post("/face-swap", dependencies=[Depends(verify_token)]) | |
| # async def face_swap_api( | |
| # source: UploadFile = File(...), | |
| # target: str = Form(...) | |
| # ): | |
| # src_bytes = await source.read() | |
| # src_arr = np.frombuffer(src_bytes, np.uint8) | |
| # src_img = cv2.imdecode(src_arr, cv2.IMREAD_COLOR) | |
| # if src_img is None: | |
| # raise HTTPException(status_code=400, detail="Invalid source image") | |
| # src_rgb = cv2.cvtColor(src_img, cv2.COLOR_BGR2RGB) | |
| # tgt_rgb = await load_target_from_url(target) | |
| # img, path, err = await run_in_threadpool( | |
| # face_swap_and_enhance, | |
| # src_rgb, | |
| # tgt_rgb | |
| # ) | |
| # if err: | |
| # raise HTTPException(status_code=500, detail=err) | |
| # # ---------------- SAVE OUTPUT TO garment_output ---------------- | |
| # os.makedirs("garment_output", exist_ok=True) | |
| # output_uuid = str(uuid.uuid4()) | |
| # output_filename = f"{output_uuid}.webp" | |
| # output_path = os.path.join("garment_output", output_filename) | |
| # cv2.imwrite(output_path, cv2.cvtColor(img, cv2.COLOR_RGB2BGR)) | |
| # return { | |
| # "status": "success", | |
| # "preview_url": f"/garment_output/{output_filename}", | |
| # "filename": output_filename | |
| # } | |
# --------------------- GRADIO ---------------------
with gr.Blocks() as demo:
    gr.Markdown("## Face Swap (Target from URL)")
    src = gr.Image(type="numpy", label="Upload Face")
    target_name = gr.Textbox(label="Target Number (e.g. 1 or 10)")
    btn = gr.Button("Swap")
    out = gr.Image()
    msg = gr.Textbox()

    def process(src_img, filename):
        """Fetch ``<filename>.png`` from the target URL and run the swap.

        Args:
            src_img: image from the upload component, or ``None`` when
                nothing was uploaded.
            filename: text from the target box; ``.png`` is appended.

        Returns:
            ``(image, message)`` for the output image and message box.
            ``image`` is ``None`` whenever ``message`` reports a problem.
        """
        if src_img is None:
            return None, "Please upload a face image"
        name = (filename or "").strip()
        if not name or "/" in name or "\\" in name:
            return None, "Please enter a valid target number"

        try:
            # A timeout keeps a stalled download from hanging the UI worker.
            tgt = requests.get(f"{TARGET_BASE_URL}/{name}.png", timeout=15)
        except requests.RequestException as exc:
            return None, f"Could not fetch target image: {exc}"
        if tgt.status_code != 200:
            return None, "Target image not found"

        arr = np.frombuffer(tgt.content, np.uint8)
        # imdecode returns None for undecodable data; skip it for empty data.
        tgt_bgr = cv2.imdecode(arr, cv2.IMREAD_COLOR) if arr.size else None
        if tgt_bgr is None:
            return None, "Invalid target image"

        tgt_img = cv2.cvtColor(tgt_bgr, cv2.COLOR_BGR2RGB)
        img, _, err = face_swap_and_enhance(src_img, tgt_img)
        return img, err

    btn.click(process, [src, target_name], [out, msg])
# ---------------- STATIC FILES (garment_output) ----------------
# Create the output directory up front, then serve it under /garment_output.
os.makedirs("garment_output", exist_ok=True)
fastapi_app.mount(
    "/garment_output",
    StaticFiles(directory="garment_output"),
    name="garment_output",
)
# --------------------- MOUNT GRADIO ---------------------
# Attach the Gradio UI under /gradio and keep the returned app object.
fastapi_app = mount_gradio_app(
    fastapi_app,
    demo,
    path="/gradio",
)
# --------------------- HEALTH CHECK ---------------------
# NOTE(review): no route registration for this handler is visible in the
# file, which leaves it unreachable. "/health" is chosen by convention;
# confirm the path expected by whatever monitors this service.
@fastapi_app.get("/health")
def health_check():
    """Report that the service is up.

    Returns:
        A dict with a fixed ``status`` and ``service`` name, and the current
        UTC time as an ISO-8601 string with a ``Z`` suffix.
    """
    return {
        "status": "healthy",
        "service": "face-swap",
        "timestamp": datetime.utcnow().isoformat() + "Z",
    }
# Registration restored: without it the handler is unreachable. The path
# matches the earlier commented-out version of this file.
@fastapi_app.get("/")
def root():
    """Redirect the site root to the Gradio UI mounted at ``/gradio``."""
    return RedirectResponse("/gradio")
# --------------------- RUN ---------------------
if __name__ == "__main__":
    # Started directly as a script: serve on all interfaces, port 7860.
    uvicorn.run(
        fastapi_app,
        host="0.0.0.0",
        port=7860,
    )
| # import os | |
| # os.environ["OMP_NUM_THREADS"] = "1" | |
| # import shutil | |
| # import uuid | |
| # import cv2 | |
| # import numpy as np | |
| # import threading | |
| # import subprocess | |
| # import logging | |
| # from datetime import datetime | |
| # import requests | |
| # import insightface | |
| # from insightface.app import FaceAnalysis | |
| # from huggingface_hub import hf_hub_download | |
| # from fastapi import FastAPI, UploadFile, File, HTTPException, Depends, Security, Form, Response | |
| # from fastapi.responses import RedirectResponse | |
| # from fastapi.security import HTTPBearer, HTTPAuthorizationCredentials | |
| # from fastapi.concurrency import run_in_threadpool | |
| # import uvicorn | |
| # import gradio as gr | |
| # from gradio import mount_gradio_app | |
| # # --------------------- LOGGING --------------------- | |
| # logging.basicConfig(level=logging.INFO) | |
| # logger = logging.getLogger(__name__) | |
| # # --------------------- TARGET IMAGE BASE URL --------------------- | |
| # TARGET_BASE_URL = "https://halloween-image-generation.onrender.com/garment_templates" | |
| # # --------------------- PATHS --------------------- | |
| # REPO_ID = "HariLogicgo/face_swap_models" | |
| # MODELS_DIR = "./models" | |
| # os.makedirs(MODELS_DIR, exist_ok=True) | |
| # # --------------------- SECRETS --------------------- | |
| # HF_TOKEN = os.getenv("HF_TOKEN") | |
| # API_SECRET_TOKEN = os.getenv("API_SECRET_TOKEN") | |
| # # --------------------- DOWNLOAD MODELS --------------------- | |
| # def download_models(): | |
| # logger.info("Downloading models...") | |
| # inswapper_path = hf_hub_download( | |
| # repo_id=REPO_ID, | |
| # filename="models/inswapper_128.onnx", | |
| # repo_type="model", | |
| # local_dir=MODELS_DIR, | |
| # token=HF_TOKEN | |
| # ) | |
| # buffalo_files = [ | |
| # "1k3d68.onnx", | |
| # "2d106det.onnx", | |
| # "genderage.onnx", | |
| # "det_10g.onnx", | |
| # "w600k_r50.onnx" | |
| # ] | |
| # for f in buffalo_files: | |
| # hf_hub_download( | |
| # repo_id=REPO_ID, | |
| # filename=f"models/buffalo_l/{f}", | |
| # repo_type="model", | |
| # local_dir=MODELS_DIR, | |
| # token=HF_TOKEN | |
| # ) | |
| # return inswapper_path | |
| # inswapper_path = download_models() | |
| # # --------------------- FACE ANALYSIS --------------------- | |
| # providers = ['CUDAExecutionProvider', 'CPUExecutionProvider'] | |
| # face_analysis_app = FaceAnalysis(name="buffalo_l", root=MODELS_DIR, providers=providers) | |
| # face_analysis_app.prepare(ctx_id=0, det_size=(640, 640)) | |
| # swapper = insightface.model_zoo.get_model(inswapper_path, providers=providers) | |
| # # --------------------- CODEFORMER --------------------- | |
| # CODEFORMER_PATH = "CodeFormer/inference_codeformer.py" | |
| # def ensure_codeformer(): | |
| # if not os.path.exists("CodeFormer"): | |
| # subprocess.run("git clone https://github.com/sczhou/CodeFormer.git", shell=True, check=True) | |
| # subprocess.run("pip install -r CodeFormer/requirements.txt", shell=True, check=True) | |
| # subprocess.run("python CodeFormer/basicsr/setup.py develop", shell=True, check=True) | |
| # subprocess.run("python CodeFormer/scripts/download_pretrained_models.py facelib", shell=True, check=True) | |
| # subprocess.run("python CodeFormer/scripts/download_pretrained_models.py CodeFormer", shell=True, check=True) | |
| # ensure_codeformer() | |
| # # --------------------- FASTAPI --------------------- | |
| # fastapi_app = FastAPI() | |
| # # --------------------- AUTH --------------------- | |
| # security = HTTPBearer() | |
| # def verify_token(credentials: HTTPAuthorizationCredentials = Security(security)): | |
| # if credentials.credentials != API_SECRET_TOKEN: | |
| # raise HTTPException(status_code=401, detail="Invalid Token") | |
| # return credentials.credentials | |
| # # --------------------- FACE SWAP LOGIC --------------------- | |
| # swap_lock = threading.Lock() | |
| # def face_swap_and_enhance(src_img, tgt_img, temp_dir="/tmp/faceswap"): | |
| # try: | |
| # with swap_lock: | |
| # shutil.rmtree(temp_dir, ignore_errors=True) | |
| # os.makedirs(temp_dir, exist_ok=True) | |
| # src_bgr = cv2.cvtColor(src_img, cv2.COLOR_RGB2BGR) | |
| # tgt_bgr = cv2.cvtColor(tgt_img, cv2.COLOR_RGB2BGR) | |
| # src_faces = face_analysis_app.get(src_bgr) | |
| # tgt_faces = face_analysis_app.get(tgt_bgr) | |
| # if not src_faces or not tgt_faces: | |
| # return None, None, "Face not detected" | |
| # swapped_path = os.path.join(temp_dir, "swap.jpg") | |
| # swapped = swapper.get(tgt_bgr, tgt_faces[0], src_faces[0]) | |
| # cv2.imwrite(swapped_path, swapped) | |
| # cmd = f"python {CODEFORMER_PATH} -w 0.7 --input_path {swapped_path} --output_path {temp_dir} --face_upsample" | |
| # subprocess.run(cmd, shell=True, check=True) | |
| # final_dir = os.path.join(temp_dir, "final_results") | |
| # final_file = os.listdir(final_dir)[0] | |
| # final_path = os.path.join(final_dir, final_file) | |
| # final_img = cv2.cvtColor(cv2.imread(final_path), cv2.COLOR_BGR2RGB) | |
| # return final_img, final_path, "" | |
| # except Exception as e: | |
| # return None, None, str(e) | |
| # # --------------------- TARGET LOAD FROM PUBLIC URL --------------------- | |
| # async def load_target_from_url(filename_or_index: str): | |
| # if filename_or_index.isdigit(): | |
| # filename = f"{filename_or_index}.png" | |
| # else: | |
| # filename = filename_or_index | |
| # url = f"{TARGET_BASE_URL}/{filename}" | |
| # logger.info(f"Fetching target from: {url}") | |
| # resp = requests.get(url, timeout=15) | |
| # if resp.status_code != 200: | |
| # raise HTTPException(status_code=404, detail="Target image not found") | |
| # arr = np.frombuffer(resp.content, np.uint8) | |
| # img = cv2.imdecode(arr, cv2.IMREAD_COLOR) | |
| # if img is None: | |
| # raise HTTPException(status_code=400, detail="Invalid target image") | |
| # return cv2.cvtColor(img, cv2.COLOR_BGR2RGB) | |
| # # --------------------- API --------------------- | |
| # @fastapi_app.post("/face-swap", dependencies=[Depends(verify_token)]) | |
| # async def face_swap_api( | |
| # source: UploadFile = File(...), | |
| # target: str = Form(...) | |
| # ): | |
| # src_bytes = await source.read() | |
| # src_arr = np.frombuffer(src_bytes, np.uint8) | |
| # src_img = cv2.imdecode(src_arr, cv2.IMREAD_COLOR) | |
| # if src_img is None: | |
| # raise HTTPException(status_code=400, detail="Invalid source image") | |
| # src_rgb = cv2.cvtColor(src_img, cv2.COLOR_BGR2RGB) | |
| # tgt_rgb = await load_target_from_url(target) | |
| # img, path, err = await run_in_threadpool( | |
| # face_swap_and_enhance, | |
| # src_rgb, | |
| # tgt_rgb | |
| # ) | |
| # if err: | |
| # raise HTTPException(status_code=500, detail=err) | |
| # with open(path, "rb") as f: | |
| # data = f.read() | |
| # return Response(content=data, media_type="image/png") | |
| # # --------------------- GRADIO --------------------- | |
| # with gr.Blocks() as demo: | |
| # gr.Markdown("## Face Swap (Target from URL)") | |
| # src = gr.Image(type="numpy", label="Upload Face") | |
| # target_name = gr.Textbox(label="Target Number (e.g. 1 or 10)") | |
| # btn = gr.Button("Swap") | |
| # out = gr.Image() | |
| # msg = gr.Textbox() | |
| # def process(src_img, filename): | |
| # tgt = requests.get(f"{TARGET_BASE_URL}/{filename}.png") | |
| # arr = np.frombuffer(tgt.content, np.uint8) | |
| # tgt_img = cv2.cvtColor(cv2.imdecode(arr, cv2.IMREAD_COLOR), cv2.COLOR_BGR2RGB) | |
| # img, _, err = face_swap_and_enhance(src_img, tgt_img) | |
| # return img, err | |
| # btn.click(process, [src, target_name], [out, msg]) | |
| # # --------------------- MOUNT --------------------- | |
| # fastapi_app = mount_gradio_app(fastapi_app, demo, path="/gradio") | |
| # @fastapi_app.get("/") | |
| # def root(): | |
| # return RedirectResponse("/gradio") | |
| # # --------------------- RUN --------------------- | |
| # if __name__ == "__main__": | |
| # uvicorn.run(fastapi_app, host="0.0.0.0", port=7860) | |