File size: 7,905 Bytes
8a16253
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
"""
Batch QA harness — swap many SOURCE faces onto every gender + location, save the
results, build a contact-sheet grid per location, and write a metrics report.

Put source faces in:
    tests/faces/male/*.jpg     tests/faces/female/*.jpg
(or pass --faces <dir>). If empty, falls back to external/HF_weights/input/.

Run on the GPU machine:
    python scripts/batch_test.py                 # face-only, all locations (fast)
    python scripts/batch_test.py --hair          # include HairFastGAN (slow!)
    python scripts/batch_test.py --gender Male --limit 5
    python scripts/batch_test.py --locations "ICT Department,Library"

Outputs:
    tests/output/<gender>/<location>/<source>.jpg
    tests/output/<gender>/<location>/_grid.jpg     (contact sheet)
    tests/output/report.csv                        (alignment/blend/ΔE/naturalness)
"""
import os, sys, csv, glob, time, argparse
sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
os.environ.setdefault("PYTHONIOENCODING", "utf-8")

try:
    from dotenv import load_dotenv; load_dotenv()
except Exception:
    pass

import cv2
import numpy as np
from utils.image_io import resize_keep_aspect
from core.detector import detect_faces, _get_insightface
from core.swapper import swap_face_insightface
from core.super_res import restore_faces, _device
from core.head_swap import (full_head_swap, swap_hair, match_skin_to_source,
                            transfer_glasses)
from core.hair_transfer import transfer_hair
from core.blender import laplacian_blend
from core.segmentor import segment_hair_neck_skin
from core.quality_checker import compute_quality_score, _naturalness_score
from core import supabase_store

ROOT = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
FACES_DIR = os.path.join(ROOT, "tests", "faces")
OUT_DIR   = os.path.join(ROOT, "tests", "output")
LOC_DIR   = os.path.join(ROOT, "Location")
EXTS = (".jpg", ".jpeg", ".png", ".webp", ".JPG", ".JPEG", ".PNG")


def list_sources(gender, faces_dir):
    d = os.path.join(faces_dir, gender.lower())
    files = [f for f in sorted(glob.glob(os.path.join(d, "*"))) if f.endswith(EXTS)]
    if not files:  # fallback sample set
        files = sorted(glob.glob(os.path.join(ROOT, "external", "HF_weights", "input", "*.png")))
    return files


def list_locations(gender):
    """Locations with a usable target image (Supabase if enabled, else local)."""
    if supabase_store.is_enabled():
        return [(l["folder"], None) for l in supabase_store.list_locations(gender) if l["has_image"]]
    out = []
    gdir = os.path.join(LOC_DIR, gender)
    if os.path.isdir(gdir):
        for name in sorted(os.listdir(gdir)):
            folder = os.path.join(gdir, name)
            if os.path.isdir(folder):
                imgs = [f for f in sorted(os.listdir(folder)) if f.endswith(EXTS)]
                if imgs:
                    out.append((name, os.path.join(folder, imgs[0])))
    return out


def load_target(gender, folder, local_path):
    if local_path:
        return cv2.imread(local_path)
    data = supabase_store.get_image_bytes(gender, folder)
    if not data:
        return None
    return cv2.imdecode(np.frombuffer(data, np.uint8), cv2.IMREAD_COLOR)


def run_swap(src, tgt, with_hair):
    fs, ft = detect_faces(src), detect_faces(tgt)
    if not fs or not ft:
        return None, {}
    base = tgt
    hs = full_head_swap(src, tgt)
    if hs is not None:
        base = hs
    sw = swap_face_insightface(src, base)
    sw = restore_faces(sw)
    sw = match_skin_to_source(sw, src, fs[0], ft[0], strength=0.75)
    if with_hair:
        try:
            hf = transfer_hair(face_bgr=sw, shape_bgr=src, color_bgr=src)
            if hf is not None:
                pad = int(max(hf.shape[:2]) * 0.4)
                hfp = cv2.copyMakeBorder(hf, pad, pad, pad, pad, cv2.BORDER_CONSTANT, value=(127, 127, 127))
                sw = swap_hair(sw, hfp, sw, include_face=False)
        except Exception as e:
            print("   hair err:", e)
    try:
        sw = transfer_glasses(sw, src)
        fm = segment_hair_neck_skin(sw).get("face_mask")
        if fm is not None and fm.max() > 0:
            sw = laplacian_blend(sw, tgt, fm, levels=4)
    except Exception:
        pass
    q = compute_quality_score(sw, tgt, None, None)
    return sw, q


def grid(images, cols=5, cell=220):
    if not images:
        return None
    rows = (len(images) + cols - 1) // cols
    canvas = np.full((rows * cell, cols * cell, 3), 240, np.uint8)
    for i, im in enumerate(images):
        r, c = divmod(i, cols)
        thumb = resize_keep_aspect(im, cell - 8)
        h, w = thumb.shape[:2]
        y, x = r * cell + (cell - h) // 2, c * cell + (cell - w) // 2
        canvas[y:y+h, x:x+w] = thumb
    return canvas


def main():
    ap = argparse.ArgumentParser()
    ap.add_argument("--faces", default=FACES_DIR)
    ap.add_argument("--gender", choices=["Male", "Female", "both"], default="both")
    ap.add_argument("--locations", default="", help="comma-separated location labels (default: all)")
    ap.add_argument("--limit", type=int, default=0, help="max sources per gender (0 = all)")
    ap.add_argument("--hair", action="store_true", help="include HairFastGAN (slow)")
    args = ap.parse_args()

    if _get_insightface() is None:
        print("InsightFace not available — aborting."); return 1
    print(f"device: {_device()}  hair: {args.hair}")

    genders = ["Male", "Female"] if args.gender == "both" else [args.gender]
    want_locs = {s.strip().lower() for s in args.locations.split(",") if s.strip()}
    os.makedirs(OUT_DIR, exist_ok=True)
    rows = [("gender", "location", "source", "alignment", "blend", "delta_e", "naturalness", "seconds")]
    total = 0

    for gender in genders:
        sources = list_sources(gender, args.faces)
        if args.limit:
            sources = sources[:args.limit]
        locs = list_locations(gender)
        if want_locs:
            from core.detector import _get_cascade  # noqa
            def clean(n): import re; return re.sub(r"^\s*\d+\.\s*", "", n).strip().lower()
            locs = [(f, p) for (f, p) in locs if clean(f) in want_locs]
        print(f"\n== {gender}: {len(sources)} sources x {len(locs)} locations ==")

        for folder, lpath in locs:
            tgt = load_target(gender, folder, lpath)
            if tgt is None:
                print(f"   [skip] {folder}: no target"); continue
            tgt = resize_keep_aspect(tgt, 1536 if _device() == "cuda" else 1024)
            odir = os.path.join(OUT_DIR, gender, folder.replace("/", "_"))
            os.makedirs(odir, exist_ok=True)
            results = []
            for sp in sources:
                src = resize_keep_aspect(cv2.imread(sp), 1536 if _device() == "cuda" else 1024)
                name = os.path.splitext(os.path.basename(sp))[0]
                t = time.time()
                sw, q = run_swap(src, tgt, args.hair)
                dt = time.time() - t
                if sw is None:
                    print(f"   [fail] {folder} <- {name}"); continue
                cv2.imwrite(os.path.join(odir, f"{name}.jpg"), sw)
                results.append(sw)
                rows.append((gender, folder, name, q.get("alignment"), q.get("blend"),
                             q.get("delta_e"), q.get("naturalness"), round(dt, 1)))
                total += 1
                print(f"   ok {folder} <- {name}  ({dt:.1f}s)  align={q.get('alignment')} nat={q.get('naturalness')}")
            g = grid(results)
            if g is not None:
                cv2.imwrite(os.path.join(odir, "_grid.jpg"), g)

    with open(os.path.join(OUT_DIR, "report.csv"), "w", newline="", encoding="utf-8") as f:
        csv.writer(f).writerows(rows)
    print(f"\nDone — {total} swaps. Report: tests/output/report.csv")
    return 0


if __name__ == "__main__":
    sys.exit(main())