Spaces:
Running
Running
Add scene-harmonisation final pass: match the photo's grain + colour cast over the swapped head so the GAN-smooth face/hair stop looking synthetic/pasted and read as photographed with the scene
4b38f72 | """ | |
| Flask web server for the Face Swap Deepfake application. | |
| Run: python web_app.py | |
| Then open http://localhost:5000 | |
| """ | |
| import os | |
| # Load .env before anything reads os.environ (Supabase keys, admin passwords). | |
| try: | |
| from dotenv import load_dotenv | |
| load_dotenv() | |
| except Exception: | |
| pass | |
| # Required before importing mediapipe/insightface on some platforms to avoid | |
| # protobuf C-extension symbol errors. | |
| os.environ.setdefault("PROTOCOL_BUFFERS_PYTHON_IMPLEMENTATION", "python") | |
| import io | |
| import re | |
| import base64 | |
| import traceback | |
| import mimetypes | |
| # python:3.10-slim has an incomplete MIME database — JS/CSS would be served as | |
| # application/octet-stream and the browser would refuse to execute them. | |
| mimetypes.add_type("application/javascript", ".js") | |
| mimetypes.add_type("text/css", ".css") | |
| mimetypes.add_type("image/svg+xml", ".svg") | |
| mimetypes.add_type("application/json", ".json") | |
| import cv2 | |
| import cv2.data | |
| import numpy as np | |
| from flask import Flask, request, jsonify, send_from_directory, send_file | |
| from flask_cors import CORS | |
| from PIL import Image, ImageOps | |
| from core.detector import detect_faces, _get_insightface, align_face_upright, pad_until_detectable | |
| from core.swapper import swap_face_insightface | |
| from core.skin_tone import analyze_skin_tone | |
| from core.super_res import restore_faces, upscale_image | |
| from core.head_swap import (swap_hair, match_skin_to_source, transfer_glasses, | |
| full_head_swap, harmonize_to_scene) | |
| from core.hair_transfer import transfer_hair | |
| from core.blender import laplacian_blend | |
| from core.quality_checker import compute_quality_score | |
| from core import supabase_store | |
| from utils.image_io import resize_keep_aspect | |
| REACT_BUILD = os.path.join("static", "react") | |
| LOCATIONS_DIR = "Location" # Location/Male/<loc>/img + Location/Female/<loc>/img | |
| VALID_GENDERS = {"Male", "Female"} | |
| _IMG_EXTS = (".jpg", ".jpeg", ".png", ".webp", ".bmp") | |
| # Control Panel access is restricted to these Google accounts. Keep in sync with | |
| # ALLOWED_ADMINS in frontend/src/pages/AdminPage.jsx. Comma-separated in env. | |
| ALLOWED_ADMIN_EMAILS = set(filter(None, ( | |
| e.strip().lower() for e in | |
| (os.environ.get("ALLOWED_ADMIN_EMAILS") | |
| or "adivid198986@gmail.com,nishith.kotak@gmail.com,cdparmar9824416484@gmail.com").split(",") | |
| ))) | |
| # Primary admin(s) — full control incl. DELETE. Other allowlisted admins can | |
| # add + edit (update/rename) but NOT delete. Keep in sync with the frontend. | |
| PRIMARY_ADMIN_EMAILS = set(filter(None, ( | |
| e.strip().lower() for e in | |
| (os.environ.get("PRIMARY_ADMIN_EMAILS") or "adivid198986@gmail.com").split(",") | |
| ))) | |
| # Firebase Web API key — used to verify the owner's Google ID token server-side. | |
| FIREBASE_API_KEY = os.environ.get( | |
| "FIREBASE_API_KEY", "AIzaSyCF9t3xPf_Se4qkqAHFRoW-YqG8LfoQIpo") | |
| # Optional legacy passwords (break-glass). Empty by default — Google is primary. | |
| ADMIN_PASSWORDS = set(filter(None, ( | |
| os.environ.get("ADMIN_PASSWORD_1", ""), | |
| os.environ.get("ADMIN_PASSWORD_2", ""), | |
| ))) | |
| def _verify_google_email(id_token: str): | |
| """ | |
| Verify a Firebase Google ID token via Identity Toolkit and return the | |
| verified email (lowercased), or None. Rejects invalid/expired/unverified. | |
| """ | |
| if not id_token or not FIREBASE_API_KEY: | |
| return None | |
| try: | |
| import requests | |
| r = requests.post( | |
| f"https://identitytoolkit.googleapis.com/v1/accounts:lookup?key={FIREBASE_API_KEY}", | |
| json={"idToken": id_token}, timeout=10) | |
| if r.status_code != 200: | |
| return None | |
| users = r.json().get("users") or [] | |
| if not users: | |
| return None | |
| u = users[0] | |
| email = (u.get("email") or "").lower() | |
| if not email or not u.get("emailVerified", False): | |
| return None | |
| return email | |
| except Exception as e: | |
| print(f"[admin] token verify failed: {e}") | |
| return None | |
| def _admin_identity(req): | |
| """ | |
| Resolve the caller's admin identity from the request token. | |
| Returns (email, is_admin): | |
| - email: verified Google email (lowercased), or "" for legacy-password auth | |
| - is_admin: True if authorised at all | |
| Token is read from the X-Admin-Token header, form field, or JSON body. | |
| """ | |
| tok = req.headers.get("X-Admin-Token") or "" | |
| if not tok and req.form: | |
| tok = req.form.get("admin_token") or "" | |
| if not tok: | |
| data = req.get_json(silent=True) or {} | |
| tok = data.get("admin_token") or "" | |
| if not tok: | |
| return (None, False) | |
| # Firebase ID tokens are JWTs: header.payload.signature (two dots, long). | |
| if tok.count(".") == 2 and len(tok) > 100: | |
| email = _verify_google_email(tok) | |
| if email and email in ALLOWED_ADMIN_EMAILS: | |
| return (email, True) | |
| return (None, False) | |
| # Legacy password fallback (break-glass) — treated as a primary admin. | |
| if bool(ADMIN_PASSWORDS) and tok in ADMIN_PASSWORDS: | |
| return ("", True) | |
| return (None, False) | |
| def _is_primary(email) -> bool: | |
| """Primary admin (full control incl. delete). Legacy password == primary.""" | |
| return email == "" or (email is not None and email in PRIMARY_ADMIN_EMAILS) | |
| def _check_admin(req) -> bool: | |
| """Authorised at all? (add/edit allowed for any admin).""" | |
| return _admin_identity(req)[1] | |
| def _clean_location_label(name: str) -> str: | |
| """Strip a leading numeric prefix like '1. ' for display.""" | |
| return re.sub(r"^\s*\d+\.\s*", "", name).strip() | |
| def _list_locations(gender: str) -> list: | |
| """ | |
| Return [{folder, label}] for every location sub-folder under the gender, | |
| sorted by the numeric prefix. Folders are listed even if they have no image | |
| yet (the picker shows them; the swap call validates the image exists). | |
| """ | |
| if gender not in VALID_GENDERS: | |
| return [] | |
| gender_dir = os.path.join(LOCATIONS_DIR, gender) | |
| if not os.path.isdir(gender_dir): | |
| return [] | |
| def sort_key(n): | |
| m = re.match(r"^\s*(\d+)", n) | |
| return (int(m.group(1)) if m else 9999, n.lower()) | |
| out = [] | |
| for name in sorted(os.listdir(gender_dir), key=sort_key): | |
| folder = os.path.join(gender_dir, name) | |
| if os.path.isdir(folder): | |
| msg = "" | |
| mp = os.path.join(folder, "message.txt") | |
| if os.path.isfile(mp): | |
| try: | |
| with open(mp, encoding="utf-8") as f: | |
| msg = f.read().strip() | |
| except OSError: | |
| pass | |
| out.append({"folder": name, "label": _clean_location_label(name), | |
| "has_image": _find_location_image(gender, name) is not None, | |
| "message": msg}) | |
| return out | |
| def _find_location_image(gender: str, location: str): | |
| """ | |
| Resolve the target image path inside Location/<gender>/<location>/. | |
| Returns the first image file found, or None. Guards against path traversal. | |
| """ | |
| if gender not in VALID_GENDERS: | |
| return None | |
| # Reject anything that could escape the locations dir. | |
| if not location or ".." in location or "/" in location or "\\" in location: | |
| return None | |
| folder = os.path.join(LOCATIONS_DIR, gender, location) | |
| if not os.path.isdir(folder): | |
| return None | |
| for f in sorted(os.listdir(folder)): | |
| if f.lower().endswith(_IMG_EXTS) and os.path.isfile(os.path.join(folder, f)): | |
| return os.path.join(folder, f) | |
| return None | |
| def _next_location_number(gender: str) -> int: | |
| """Highest numeric prefix in the gender folder + 1 (for new locations).""" | |
| gender_dir = os.path.join(LOCATIONS_DIR, gender) | |
| n = 0 | |
| if os.path.isdir(gender_dir): | |
| for name in os.listdir(gender_dir): | |
| m = re.match(r"^\s*(\d+)", name) | |
| if m: | |
| n = max(n, int(m.group(1))) | |
| return n + 1 | |
| def _resolve_or_create_location(gender: str, location_name: str, create: bool = True): | |
| """ | |
| Find the folder for a location label inside a gender (matched by cleaned | |
| label, case-insensitive). Creates '<next-number>. <name>' if missing and | |
| create=True. Returns the folder name, or None. | |
| """ | |
| gender_dir = os.path.join(LOCATIONS_DIR, gender) | |
| clean = _clean_location_label(location_name) | |
| if not clean: | |
| return None | |
| if os.path.isdir(gender_dir): | |
| for name in os.listdir(gender_dir): | |
| if os.path.isdir(os.path.join(gender_dir, name)) and \ | |
| _clean_location_label(name).lower() == clean.lower(): | |
| return name | |
| if not create: | |
| return None | |
| folder = f"{_next_location_number(gender)}. {clean}" | |
| os.makedirs(os.path.join(gender_dir, folder), exist_ok=True) | |
| return folder | |
| app = Flask(__name__, static_folder="static") | |
| app.config["MAX_CONTENT_LENGTH"] = 50 * 1024 * 1024 # 50 MB | |
| _debug_mode = os.environ.get("FLASK_DEBUG", "true").lower() == "true" | |
| CORS(app, origins=["http://localhost:5173", "http://127.0.0.1:5173"] if _debug_mode else "*") | |
| # Nothing is written to disk — uploads are decoded in memory and the result is | |
| # returned inline as a data-URI for client-side download. | |
| # -- helpers ------------------------------------------------------------------- | |
| def _decode_image(data_or_file) -> np.ndarray | None: | |
| """ | |
| Accept a Flask FileStorage or a base64 data-URI string and return a BGR image. | |
| Honours EXIF orientation so phone photos (stored rotated with an orientation | |
| tag) aren't processed sideways — cv2.imdecode ignores EXIF, PIL applies it. | |
| """ | |
| if isinstance(data_or_file, str): | |
| # base64 data URI: "data:image/jpeg;base64,<data>" | |
| if "," in data_or_file: | |
| data_or_file = data_or_file.split(",", 1)[1] | |
| raw = base64.b64decode(data_or_file) | |
| else: | |
| raw = data_or_file.read() | |
| try: | |
| pil = Image.open(io.BytesIO(raw)) | |
| pil = ImageOps.exif_transpose(pil) or pil # auto-rotate; fallback if None | |
| return cv2.cvtColor(np.array(pil.convert("RGB")), cv2.COLOR_RGB2BGR) | |
| except Exception: | |
| # Fallback: raw decode (no EXIF) if PIL can't read it. | |
| arr = np.frombuffer(raw, np.uint8) | |
| return cv2.imdecode(arr, cv2.IMREAD_COLOR) | |
| def _encode_image(img: np.ndarray, fmt: str = "JPEG", quality: int = 88) -> str: | |
| """Return a base64 data-URI for a BGR numpy image.""" | |
| rgb = cv2.cvtColor(img, cv2.COLOR_BGR2RGB) | |
| pil = Image.fromarray(rgb) | |
| buf = io.BytesIO() | |
| pil.save(buf, format=fmt, quality=quality) | |
| b64 = base64.b64encode(buf.getvalue()).decode() | |
| mime = "image/jpeg" if fmt == "JPEG" else "image/png" | |
| return f"data:{mime};base64,{b64}" | |
| def _safe_detect(img): | |
| faces = detect_faces(img) | |
| if faces: | |
| return faces | |
| # Close-up face filling the frame defeats the detector (0 faces). Pad a | |
| # replicated margin, detect there, and remap the boxes back to the original | |
| # image coords so the count is right and the upload isn't wrongly rejected. | |
| h, w = img.shape[:2] | |
| for frac in (0.3, 0.5, 0.8): | |
| p = int(max(h, w) * frac) | |
| padded = cv2.copyMakeBorder(img, p, p, p, p, cv2.BORDER_REPLICATE) | |
| pf = detect_faces(padded) | |
| if pf: | |
| return [(max(0, x1 - p), max(0, y1 - p), | |
| min(w, x2 - p), min(h, y2 - p)) for (x1, y1, x2, y2) in pf] | |
| return [] | |
| def _enhance_input(img: np.ndarray) -> np.ndarray: | |
| """ | |
| Enhance an uploaded image before it enters the swap pipeline, so low-res | |
| photos don't lose detail. Small images are upscaled (RealESRGAN/Lanczos) to a | |
| workable size, then GFPGAN restores facial detail. Large images are just | |
| face-restored. The result feeds detection + swap, and the final output is | |
| enhanced again — detail is preserved at both ends. | |
| """ | |
| try: | |
| h, w = img.shape[:2] | |
| # Only enhance genuinely low-res uploads. Already-decent photos are left | |
| # alone so they don't get GFPGAN-restored on input AND output (stacking | |
| # GFPGAN makes skin look plastic/unnatural). | |
| if max(h, w) < 800: | |
| img = upscale_image(img, scale=2) # bring small uploads up | |
| img = resize_keep_aspect(img, 1280) # but cap the working size | |
| img = restore_faces(img) # restore detail in the upscale | |
| return img | |
| except Exception as e: | |
| print(f"[swap] input enhance skipped: {e}") | |
| return img | |
| # -- routes -------------------------------------------------------------------- | |
| def serve_react(path): | |
| """Serve the React SPA — assets by exact path, everything else → index.html.""" | |
| react_dir = os.path.abspath(REACT_BUILD) | |
| react_index = os.path.join(react_dir, "index.html") | |
| if not os.path.exists(react_index): | |
| return ( | |
| "<h2 style='font-family:sans-serif;padding:2rem;color:#D4DE95;" | |
| "background:#0f1209;min-height:100vh'>" | |
| "React build not found.<br><br>" | |
| "<code style='font-size:0.9rem'>" | |
| "cd frontend && npm install && npm run build" | |
| "</code></h2>", | |
| 503, | |
| ) | |
| # Serve the asset if it exists (JS, CSS, images, etc.) | |
| if path: | |
| asset = os.path.join(react_dir, path) | |
| if os.path.isfile(asset): | |
| return send_from_directory(react_dir, path) | |
| # SPA fallback — let React Router handle the route | |
| return send_from_directory(react_dir, "index.html") | |
| def api_locations(): | |
| """List the curated destination locations for a gender (Male|Female).""" | |
| gender = (request.args.get("gender") or "").strip().capitalize() | |
| if gender not in VALID_GENDERS: | |
| return jsonify({"ok": False, "error": "gender must be Male or Female"}), 400 | |
| locs = supabase_store.list_locations(gender) if supabase_store.is_enabled() \ | |
| else _list_locations(gender) | |
| resp = jsonify({"ok": True, "gender": gender, "locations": locs}) | |
| resp.headers["Cache-Control"] = "no-store" # always reflect latest uploads | |
| return resp | |
| def api_location_image(): | |
| """Serve the curated target image for a gender + location (for the preview).""" | |
| gender = (request.args.get("gender") or "").strip().capitalize() | |
| location = (request.args.get("location") or "").strip() | |
| if supabase_store.is_enabled(): | |
| data = supabase_store.get_image_bytes(gender, location) | |
| if not data: | |
| return jsonify({"ok": False, "error": "Image not available yet"}), 404 | |
| resp = send_file(io.BytesIO(data), mimetype="image/jpeg") | |
| resp.headers["Cache-Control"] = "no-cache" | |
| return resp | |
| path = _find_location_image(gender, location) | |
| if path is None: | |
| return jsonify({"ok": False, "error": "Image not available yet"}), 404 | |
| resp = send_file(os.path.abspath(path)) | |
| resp.headers["Cache-Control"] = "no-cache" # always reflect latest upload | |
| return resp | |
| def api_admin_login(): | |
| """Validate owner credentials server-side (frontend also checks).""" | |
| data = request.get_json(silent=True) or {} | |
| pw = (data.get("password") or "").strip() | |
| return jsonify({"ok": pw in ADMIN_PASSWORDS}) | |
| def api_admin_add_location(): | |
| """ | |
| Owner-only: add or update a location image. | |
| Form: admin_token, gender (Male|Female), location (display name), image (file). | |
| Creates the folder if new, then stores the (re-encoded) image. Becomes | |
| visible on the app page immediately. | |
| """ | |
| if not _check_admin(request): | |
| return jsonify({"ok": False, "error": "Unauthorised — owner login required."}), 401 | |
| gender = (request.form.get("gender") or "").strip().capitalize() | |
| if gender not in VALID_GENDERS: | |
| return jsonify({"ok": False, "error": "Gender must be Male or Female."}), 400 | |
| location_name = (request.form.get("location") or "").strip() | |
| if not location_name: | |
| return jsonify({"ok": False, "error": "Location name is required."}), 400 | |
| if ".." in location_name or "/" in location_name or "\\" in location_name: | |
| return jsonify({"ok": False, "error": "Invalid location name."}), 400 | |
| file = request.files.get("image") | |
| if not file or not file.filename: | |
| return jsonify({"ok": False, "error": "An image file is required."}), 400 | |
| img = _decode_image(file) | |
| if img is None: | |
| return jsonify({"ok": False, "error": "Could not read the image file."}), 400 | |
| # Normalised JPEG bytes (strips EXIF, validates the upload). | |
| ok, buf = cv2.imencode(".jpg", img, [cv2.IMWRITE_JPEG_QUALITY, 92]) | |
| if not ok: | |
| return jsonify({"ok": False, "error": "Could not encode the image."}), 500 | |
| jpeg_bytes = buf.tobytes() | |
| # ── Supabase mode ───────────────────────────────────────────────────────── | |
| if supabase_store.is_enabled(): | |
| try: | |
| res = supabase_store.upsert_location(gender, _clean_location_label(location_name), | |
| jpeg_bytes) | |
| return jsonify({"ok": True, "gender": gender, **res}) | |
| except Exception as e: | |
| return jsonify({"ok": False, "error": f"Supabase store failed: {e}"}), 502 | |
| # ── Local filesystem fallback ───────────────────────────────────────────── | |
| folder = _resolve_or_create_location(gender, location_name, create=True) | |
| if folder is None: | |
| return jsonify({"ok": False, "error": "Could not create the location."}), 500 | |
| folder_path = os.path.join(LOCATIONS_DIR, gender, folder) | |
| # Remove any existing image(s) so each location holds exactly one photo. | |
| for f in os.listdir(folder_path): | |
| if f.lower().endswith(_IMG_EXTS): | |
| try: | |
| os.remove(os.path.join(folder_path, f)) | |
| except OSError: | |
| pass | |
| with open(os.path.join(folder_path, "image.jpg"), "wb") as out: | |
| out.write(jpeg_bytes) | |
| return jsonify({"ok": True, "gender": gender, "folder": folder, | |
| "label": _clean_location_label(folder)}) | |
| def api_admin_delete_location(): | |
| """Primary-admin only: delete a whole location for a gender.""" | |
| _email, _ok = _admin_identity(request) | |
| if not _ok: | |
| return jsonify({"ok": False, "error": "Unauthorised — owner login required."}), 401 | |
| if not _is_primary(_email): | |
| return jsonify({"ok": False, "error": | |
| "Only the Primary Admin can delete locations."}), 403 | |
| data = request.get_json(silent=True) or request.form | |
| gender = (data.get("gender") or "").strip().capitalize() | |
| location = (data.get("location") or "").strip() # folder name | |
| if gender not in VALID_GENDERS or not location: | |
| return jsonify({"ok": False, "error": "gender and location are required."}), 400 | |
| if ".." in location or "/" in location or "\\" in location: | |
| return jsonify({"ok": False, "error": "Invalid location."}), 400 | |
| if supabase_store.is_enabled(): | |
| try: | |
| supabase_store.delete_location(gender, location) | |
| return jsonify({"ok": True}) | |
| except Exception as e: | |
| return jsonify({"ok": False, "error": f"Supabase delete failed: {e}"}), 502 | |
| folder_path = os.path.join(LOCATIONS_DIR, gender, location) | |
| if not os.path.isdir(folder_path): | |
| return jsonify({"ok": False, "error": "Location not found."}), 404 | |
| import shutil | |
| shutil.rmtree(folder_path, ignore_errors=True) | |
| return jsonify({"ok": True}) | |
| def api_admin_rename_location(): | |
| """Owner-only: rename a location (keeps its numeric prefix + its photo).""" | |
| if not _check_admin(request): | |
| return jsonify({"ok": False, "error": "Unauthorised — owner login required."}), 401 | |
| data = request.get_json(silent=True) or request.form | |
| gender = (data.get("gender") or "").strip().capitalize() | |
| location = (data.get("location") or "").strip() # current folder name | |
| new_name = (data.get("new_name") or "").strip() | |
| if gender not in VALID_GENDERS or not location or not new_name: | |
| return jsonify({"ok": False, "error": "gender, location and new_name are required."}), 400 | |
| for v in (location, new_name): | |
| if ".." in v or "/" in v or "\\" in v: | |
| return jsonify({"ok": False, "error": "Invalid name."}), 400 | |
| if supabase_store.is_enabled(): | |
| try: | |
| res = supabase_store.rename_location(gender, location, | |
| _clean_location_label(new_name)) | |
| return jsonify({"ok": True, **res}) | |
| except Exception as e: | |
| return jsonify({"ok": False, "error": str(e)}), 502 | |
| gender_dir = os.path.join(LOCATIONS_DIR, gender) | |
| src = os.path.join(gender_dir, location) | |
| if not os.path.isdir(src): | |
| return jsonify({"ok": False, "error": "Location not found."}), 404 | |
| # Keep the existing numeric prefix if there was one. | |
| m = re.match(r"^\s*(\d+)\.", location) | |
| prefix = f"{m.group(1)}. " if m else "" | |
| new_folder = f"{prefix}{_clean_location_label(new_name)}" | |
| dst = os.path.join(gender_dir, new_folder) | |
| if os.path.abspath(src) != os.path.abspath(dst): | |
| if os.path.exists(dst): | |
| return jsonify({"ok": False, "error": "A location with that name already exists."}), 409 | |
| os.rename(src, dst) | |
| return jsonify({"ok": True, "folder": new_folder, "label": _clean_location_label(new_folder)}) | |
| def api_admin_set_message(): | |
| """ | |
| Owner (any admin) sets/clears a location's custom result message. The message | |
| may contain {name} and {location} placeholders, filled in on the result. | |
| """ | |
| if not _check_admin(request): | |
| return jsonify({"ok": False, "error": "Unauthorised - owner login required."}), 401 | |
| data = request.get_json(silent=True) or request.form | |
| gender = (data.get("gender") or "").strip().capitalize() | |
| location = (data.get("location") or "").strip() # folder/name | |
| message = (data.get("message") or "").strip() | |
| if gender not in VALID_GENDERS or not location: | |
| return jsonify({"ok": False, "error": "gender and location are required."}), 400 | |
| if ".." in location or "/" in location or "\\" in location: | |
| return jsonify({"ok": False, "error": "Invalid location."}), 400 | |
| if supabase_store.is_enabled(): | |
| try: | |
| supabase_store.set_message(gender, location, message) | |
| return jsonify({"ok": True}) | |
| except Exception as e: | |
| return jsonify({"ok": False, "error": str(e)}), 502 | |
| # FS fallback: store message.txt alongside the image. | |
| folder_path = os.path.join(LOCATIONS_DIR, gender, location) | |
| if not os.path.isdir(folder_path): | |
| return jsonify({"ok": False, "error": "Location not found."}), 404 | |
| msg_path = os.path.join(folder_path, "message.txt") | |
| try: | |
| if message: | |
| with open(msg_path, "w", encoding="utf-8") as f: | |
| f.write(message) | |
| elif os.path.exists(msg_path): | |
| os.remove(msg_path) | |
| return jsonify({"ok": True}) | |
| except OSError as e: | |
| return jsonify({"ok": False, "error": str(e)}), 500 | |
| def api_detect(): | |
| """Quick face-detection check. Returns count + thumbnail with boxes drawn.""" | |
| try: | |
| if "image" in request.files: | |
| img = _decode_image(request.files["image"]) | |
| else: | |
| data = request.get_json(force=True) | |
| img = _decode_image(data["image"]) | |
| if img is None: | |
| return jsonify({"ok": False, "error": "Could not decode image"}), 400 | |
| img = resize_keep_aspect(img, 800) | |
| faces = _safe_detect(img) | |
| # Draw boxes on thumbnail | |
| preview = img.copy() | |
| for (x1, y1, x2, y2) in faces: | |
| cv2.rectangle(preview, (x1, y1), (x2, y2), (0, 220, 80), 2) | |
| thumbnail = resize_keep_aspect(preview, 400) | |
| return jsonify({ | |
| "ok": True, | |
| "faces": len(faces), | |
| "thumbnail": _encode_image(thumbnail), | |
| }) | |
| except Exception as e: | |
| return jsonify({"ok": False, "error": str(e)}), 500 | |
| def api_swap(): | |
| """ | |
| Full face-swap pipeline: InsightFace swap → GFPGAN face restoration → | |
| RealESRGAN 4K upscale (for download). | |
| Accepts multipart/form-data: | |
| - source_file (file) OR source_b64 (string) - source face | |
| - target_file (file) - target face | |
| Returns JSON with result_image (base64), quality metrics, delta_e. | |
| """ | |
| try: | |
| # -- decode source ---------------------------------------------------- | |
| if "source_file" in request.files and request.files["source_file"].filename: | |
| source = _decode_image(request.files["source_file"]) | |
| elif request.form.get("source_b64"): | |
| source = _decode_image(request.form["source_b64"]) | |
| else: | |
| return jsonify({"ok": False, "error": "No source image provided"}), 400 | |
| # -- resolve target --------------------------------------------------- | |
| # The user no longer uploads a target. They pick a gender + location and | |
| # we use the corresponding curated image from Location/<gender>/<location>/. | |
| # (A direct target upload is still accepted as an admin override.) | |
| if "target_file" in request.files and request.files["target_file"].filename: | |
| target = _decode_image(request.files["target_file"]) | |
| elif request.form.get("target_b64"): | |
| target = _decode_image(request.form["target_b64"]) | |
| else: | |
| gender = (request.form.get("gender") or "").strip().capitalize() | |
| location = (request.form.get("location") or "").strip() | |
| if gender not in VALID_GENDERS or not location: | |
| return jsonify({"ok": False, "error": | |
| "Please choose a gender (Male/Female) and a location."}), 400 | |
| if supabase_store.is_enabled(): | |
| data = supabase_store.get_image_bytes(gender, location) | |
| if not data: | |
| return jsonify({"ok": False, "error": | |
| f"No photo is available yet for {gender} · " | |
| f"{_clean_location_label(location)}. Please pick another location."}), 404 | |
| target = _decode_image(io.BytesIO(data)) | |
| else: | |
| tgt_path = _find_location_image(gender, location) | |
| if tgt_path is None: | |
| return jsonify({"ok": False, "error": | |
| f"No photo is available yet for {gender} · " | |
| f"{_clean_location_label(location)}. Please pick another location."}), 404 | |
| with open(tgt_path, "rb") as _tf: | |
| target = _decode_image(_tf) | |
| if source is None or target is None: | |
| return jsonify({"ok": False, "error": "Could not decode one or both images"}), 400 | |
| user_name = (request.form.get("name") or "").strip() | |
| # -- resize ----------------------------------------------------------- | |
| # Working resolution: 1536 on GPU (sharper composite + better landmarks; | |
| # the RTX-class card handles it easily) and 1024 on CPU to stay fast. | |
| # InsightFace still swaps at 128px and GFPGAN restores 512px crops; the | |
| # larger canvas mainly helps the final blend + the RealESRGAN 4x upscale. | |
| from core.super_res import _device as _sr_device | |
| _work_res = 1536 if _sr_device() == "cuda" else 1024 | |
| source = resize_keep_aspect(source, _work_res) | |
| target = resize_keep_aspect(target, _work_res) | |
| # Close-up selfie? A face that fills the whole frame defeats the detector | |
| # (0 faces -> crude paste -> no real swap/hair/skin). Pad a margin so the | |
| # face is found. The source is only read for its landmarks, so the border | |
| # never appears in the output (which is drawn on the target canvas). | |
| source = pad_until_detectable(source) | |
| # De-roll a tilted source selfie so the eyes are level. Without this, | |
| # InsightFace misses faces rolled >~15deg and the swap silently falls | |
| # back to a crude paste. The swapper then aligns the upright source to | |
| # the target's pose as usual. | |
| source = align_face_upright(source) | |
| # -- enhance inputs (upscale small + GFPGAN restore) so detail isn't | |
| # lost through the pipeline; the output is enhanced again at the end. | |
| source = _enhance_input(source) | |
| target = _enhance_input(target) | |
| # -- face detection --------------------------------------------------- | |
| faces_src = _safe_detect(source) | |
| faces_tgt = _safe_detect(target) | |
| if not faces_src: | |
| return jsonify({"ok": False, "error": | |
| "No face detected in source image. Tips: ensure good lighting, " | |
| "face the camera directly, remove heavy occlusions (mask/sunglasses), " | |
| "and use a photo where the face is at least 10% of the frame."}), 400 | |
| if not faces_tgt: | |
| return jsonify({"ok": False, "error": | |
| "No face detected in target image. Tips: ensure good lighting, " | |
| "face the camera directly, and use a clear frontal portrait."}), 400 | |
| # -- warn on very small detected faces (quality will be poor) --------- | |
| warnings = [] | |
| x1, y1, x2, y2 = faces_src[0] | |
| src_face_px = min(x2 - x1, y2 - y1) | |
| if src_face_px < 80: | |
| warnings.append( | |
| "Source face is very small — swap quality may be reduced. " | |
| "Use a closer/higher-resolution photo for best results." | |
| ) | |
| x1, y1, x2, y2 = faces_tgt[0] | |
| tgt_face_px = min(x2 - x1, y2 - y1) | |
| if tgt_face_px < 80: | |
| warnings.append( | |
| "Target face is very small — swap quality may be reduced. " | |
| "Use a closer/higher-resolution photo for best results." | |
| ) | |
| # -- skin tone analysis (for the info chips only) --------------------- | |
| src_tone = analyze_skin_tone(source, faces_src[0]) | |
| tgt_tone = analyze_skin_tone(target, faces_tgt[0]) | |
| delta_e = ( | |
| (src_tone["L"] - tgt_tone["L"]) ** 2 + | |
| (src_tone["a"] - tgt_tone["a"]) ** 2 + | |
| (src_tone["b"] - tgt_tone["b"]) ** 2 | |
| ) ** 0.5 | |
| # Default pipeline = the clean, high-quality FACE swap (this is what | |
| # produced the good results). HEAD swap and HAIR swap are OPT-IN only | |
| # (?head_swap=1 / ?swap_hair=1) because, with the current models, they | |
| # degrade the clean face swap more often than they help. | |
| # | |
| # 1. (optional) HEAD SWAP — transplant the source head shape+hair. | |
| base = target | |
| if request.form.get("head_swap", "0") in ("1", "true", "on"): | |
| try: | |
| hs = full_head_swap(source, target) | |
| if hs is not None: | |
| base = hs | |
| except Exception as e: | |
| print(f"[swap] head swap error: {e}") | |
| # 2. FACE SWAP — the main event. Source identity onto the main face only | |
| # (background/poster faces are never swapped). | |
| swapped = swap_face_insightface(source, base) | |
| # 3. GFPGAN face restoration — recovers detail lost in the 128px swap. | |
| swapped = restore_faces(swapped) | |
| # 3b. Laplacian pyramid blend over the FACE-swap boundary — smooths the | |
| # seam left by InsightFace's paste_back. MUST run BEFORE the hair step: | |
| # it composites the swapped FACE over the target everywhere else, so if | |
| # it ran after the hair it would overwrite the newly-transferred hair | |
| # (which lies outside the face mask) with the target's original hair. | |
| try: | |
| from core.segmentor import segment_hair_neck_skin | |
| _fmask = segment_hair_neck_skin(swapped).get("face_mask") | |
| if _fmask is not None and _fmask.max() > 0: | |
| swapped = laplacian_blend(swapped, target, _fmask, levels=4) | |
| except Exception as e: | |
| print(f"[swap] Laplacian blend skipped: {e}") | |
| # 4. (skin-tone match moved to the very END of the pipeline — see below.) | |
| # 5. HAIR + NECK — transfer the SOURCE's hair onto the result. HairFastGAN | |
| # generates the source's hairstyle on an aligned portrait; swap_hair now | |
| # composites the FULL long hair back into the scene (the parse crop + | |
| # region clamp were widened so long hair is no longer cut off). ON by | |
| # default (set swap_hair=0 for a fast face-only swap). | |
| if request.form.get("swap_hair", "1") in ("1", "true", "on"): | |
| hf_portrait = None | |
| try: | |
| hf_portrait = transfer_hair(face_bgr=swapped, shape_bgr=source, | |
| color_bgr=source) | |
| except Exception as e: | |
| print(f"[swap] HairFastGAN error: {e}") | |
| try: | |
| if hf_portrait is not None: | |
| pad = int(max(hf_portrait.shape[:2]) * 0.4) | |
| hf_padded = cv2.copyMakeBorder(hf_portrait, pad, pad, pad, pad, | |
| cv2.BORDER_CONSTANT, value=(127, 127, 127)) | |
| swapped = swap_hair(swapped, hf_padded, swapped, include_face=False) | |
| else: | |
| swapped = swap_hair(swapped, source, target, include_face=False) | |
| except Exception as e: | |
| print(f"[swap] hair compose error: {e}") | |
| # 6. Glasses (no-op if the source isn't wearing any). | |
| if request.form.get("keep_glasses", "1") in ("1", "true", "on"): | |
| try: | |
| swapped = transfer_glasses(swapped, source) | |
| except Exception as e: | |
| print(f"[swap] glasses transfer error: {e}") | |
| # 7. SKIN TONE — the FINAL step, so nothing can override it. Drive ALL | |
| # visible skin (face + neck + arms + hands) to the SOURCE complexion; | |
| # clothes/background are excluded. Done AFTER the Laplacian blend so the | |
| # blend can't pull the face colour back toward the target's tone. | |
| try: | |
| swapped = match_skin_to_source( | |
| swapped, source, faces_src[0], faces_tgt[0], strength=0.92 | |
| ) | |
| except Exception as e: | |
| print(f"[swap] skin tone match skipped: {e}") | |
| # 8. HARMONISE — make the swapped head look PHOTOGRAPHED WITH the scene | |
| # (not pasted on it): add the photo's grain over the GAN-smooth face/hair | |
| # and gently match its colour cast. This is the last visual step, so the | |
| # grain isn't smoothed away by anything after it. | |
| try: | |
| swapped = harmonize_to_scene(swapped, target, faces_tgt[0], grain=0.9) | |
| except Exception as e: | |
| print(f"[swap] harmonize skipped: {e}") | |
| # -- quality metrics -------------------------------------------------- | |
| quality = compute_quality_score(swapped, target, None, None) | |
| # Real alignment: how closely the swapped face's 5 landmarks sit on the | |
| # target's (the swap keeps the target geometry, so a clean swap aligns | |
| # tightly). Normalised by inter-ocular distance => resolution-independent. | |
| try: | |
| _ifa = _get_insightface() | |
| _area = lambda f: (f.bbox[2] - f.bbox[0]) * (f.bbox[3] - f.bbox[1]) | |
| sw = _ifa.get(swapped) if _ifa else [] | |
| tg = _ifa.get(target) if _ifa else [] | |
| if sw and tg: | |
| swf = max(sw, key=_area) | |
| sk = np.asarray(swf.kps, dtype=np.float32) | |
| tk = np.asarray(max(tg, key=_area).kps, dtype=np.float32) | |
| err = float(np.linalg.norm(sk - tk, axis=1).mean()) | |
| iod = float(np.linalg.norm(tk[0] - tk[1])) or 1.0 | |
| # Deviation as a fraction of inter-ocular distance. A clean swap | |
| # lands within a few % (re-detection + GFPGAN shift the features | |
| # slightly), so 0% => 100 and 15% => 0 gives an honest ~75-95. | |
| quality["alignment"] = round(max(0.0, 100.0 * (1 - (err / iod) / 0.15)), 1) | |
| # Naturalness on the FACE region only. Measuring the whole frame | |
| # unfairly penalises studio shots (white background + dark shirt | |
| # = lots of clipped pixels + hard edges) even when the face is | |
| # perfect, so crop to the face before scoring. | |
| from core.quality_checker import _naturalness_score | |
| x1, y1, x2, y2 = [int(v) for v in swf.bbox] | |
| pad = int((x2 - x1) * 0.25) | |
| fy1, fy2 = max(0, y1 - pad), min(swapped.shape[0], y2 + pad) | |
| fx1, fx2 = max(0, x1 - pad), min(swapped.shape[1], x2 + pad) | |
| face = swapped[fy1:fy2, fx1:fx2] | |
| if face.size > 0: | |
| quality["naturalness"] = _naturalness_score(face) | |
| except Exception as e: | |
| print(f"[swap] alignment metric skipped: {e}") | |
| # -- 4K upscale for download (RealESRGAN x4, Lanczos fallback) -------- | |
| hi_res = upscale_image(swapped, scale=4) | |
| # The 4K result is returned inline as a base64 data-URI so the user can | |
| # download it client-side (works on the HF Space too). We do NOT store it | |
| # server-side — nothing is written to disk (privacy + no disk growth). | |
| download_uri = _encode_image(hi_res, fmt="JPEG", quality=95) | |
| return jsonify({ | |
| "ok": True, | |
| "result_image": _encode_image(swapped, fmt="JPEG", quality=92), | |
| "download_image": download_uri, | |
| "quality": quality, | |
| "delta_e": round(delta_e, 2), | |
| "src_tone": src_tone, | |
| "tgt_tone": tgt_tone, | |
| "warnings": warnings, | |
| "name": user_name, | |
| }) | |
| except Exception as e: | |
| traceback.print_exc() | |
| return jsonify({"ok": False, "error": str(e)}), 500 | |
| def _prewarm_models(): | |
| """Load heavy ML models at startup so the first swap request is fast.""" | |
| print("Pre-warming ML models (this takes ~30s on first run)...") | |
| try: | |
| from core.detector import _get_insightface | |
| from core.swapper import _load_swapper | |
| from core.super_res import _load_gfpgan, _load_realesrgan | |
| _get_insightface() | |
| _load_swapper() | |
| _load_gfpgan() | |
| _load_realesrgan() | |
| print("Models ready.") | |
| except Exception as e: | |
| print(f"Model pre-warm skipped: {e}") | |
| if __name__ == "__main__": | |
| port = int(os.environ.get("PORT", 5000)) | |
| debug = os.environ.get("FLASK_DEBUG", "true").lower() == "true" | |
| print(f"Starting Face Swap Web App on port {port}...") | |
| if not debug: | |
| _prewarm_models() | |
| app.run(debug=debug, use_reloader=False, host="0.0.0.0", port=port) | |