# File size: 4,846 Bytes
# 2979822
"""Crop and resize face images from bbox annotations."""

from __future__ import annotations

import argparse
import os
import shutil
from multiprocessing import Pool, cpu_count
from typing import Tuple

import cv2
import pandas as pd
from tqdm import tqdm


def parse_args(argv: list[str] | None = None) -> argparse.Namespace:
    """Parse the command-line options of the crop-and-resize script.

    Args:
        argv: Argument strings to parse, without the program name. When
            ``None`` (the default) argparse reads ``sys.argv[1:]``, so a bare
            ``parse_args()`` call behaves as it always did.

    Returns:
        Namespace with ``orig_dir``, ``crop_dir``, ``size``,
        ``bbox_expansion_factor``, ``label_dir`` and ``spoof_types``.

    Raises:
        SystemExit: Raised by argparse on unknown/missing options, or when
            ``--size`` / ``--bbox_expansion_factor`` is not positive.
    """
    parser = argparse.ArgumentParser(description="Dataset preparation (crop + resize)")
    parser.add_argument("--orig_dir", required=True, help="Path to dataset root")
    parser.add_argument(
        "--crop_dir", required=True, help="Output folder for cropped images"
    )
    parser.add_argument(
        "--size", type=int, default=224, help="Output image size (square)"
    )
    parser.add_argument(
        "--bbox_expansion_factor", type=float, default=1.5, help="Crop expansion factor"
    )
    parser.add_argument(
        "--label_dir",
        default="metas/labels",
        help="Relative label folder under orig_dir (default: metas/labels)",
    )
    parser.add_argument(
        "--spoof_types",
        type=int,
        nargs="+",
        default=[0, 1, 2, 3, 7, 8, 9],
        help="Keep these label type codes (if applicable)",
    )
    args = parser.parse_args(argv)

    # Fail fast here: a non-positive size makes every resize call fail, and a
    # non-positive expansion factor yields an empty crop for every image.
    if args.size <= 0:
        parser.error("--size must be a positive integer")
    if not args.bbox_expansion_factor > 0:  # also rejects NaN
        parser.error("--bbox_expansion_factor must be greater than 0")
    return args


def _process_single_image(args_bundle: Tuple[str, str, str, int, float]) -> None:
    """Crop the annotated box from one image and save it as a square resize.

    The box is read from the ``<image stem>_BB.txt`` file next to the image
    (first line: ``x y w h``, whitespace separated). The values are rescaled
    from a 224x224 frame to the real image size, the box is grown into a
    square of ``max(w, h) * bbox_expansion_factor`` pixels around its centre,
    parts falling outside the image are filled by border reflection, and the
    result is resized to ``target_size`` x ``target_size``.

    The function is best-effort: an unreadable image, a missing or malformed
    annotation, or an empty crop makes it return without writing anything.

    Args:
        args_bundle: ``(rel_path, full_img_path, save_path, target_size,
            bbox_expansion_factor)``; ``rel_path`` is not used here.
    """
    _, full_img_path, save_path, target_size, bbox_expansion_factor = args_bundle

    # Edge length of the frame the annotation coordinates are divided by.
    bbox_ref_size = 224

    # Replace only the final extension; a substring replace would also rewrite
    # ".jpg"/".png" occurring in directory names and miss other extensions.
    bbox_file = os.path.splitext(full_img_path)[0] + "_BB.txt"

    # Parse the annotation before decoding the image so that images without a
    # usable box are skipped cheaply.
    try:
        with open(bbox_file, "r", encoding="utf-8") as f:
            fields = f.readline().split()
        x_ref, y_ref, w_ref, h_ref = map(float, fields[:4])
    except (OSError, ValueError):
        # Missing/unreadable file, undecodable text, fewer than four fields or
        # non-numeric fields.
        return

    img = cv2.imread(full_img_path)
    if img is None:
        return

    original_height, original_width = img.shape[:2]

    try:
        x = int(x_ref * (original_width / bbox_ref_size))
        w = int(w_ref * (original_width / bbox_ref_size))
        y = int(y_ref * (original_height / bbox_ref_size))
        h = int(h_ref * (original_height / bbox_ref_size))
        side_len = int(max(w, h) * bbox_expansion_factor)
    except (ValueError, OverflowError):
        # NaN or infinite values cannot be converted to pixel coordinates.
        return

    if side_len <= 0:
        # Degenerate box: nothing to crop, and negative paddings must not
        # reach copyMakeBorder.
        return

    center_x = x + w // 2
    center_y = y + h // 2

    x1 = center_x - side_len // 2
    y1 = center_y - side_len // 2
    x2 = x1 + side_len
    y2 = y1 + side_len

    # Amount by which the square sticks out of the image on each side.
    pad_top = max(0, -y1)
    pad_bottom = max(0, y2 - original_height)
    pad_left = max(0, -x1)
    pad_right = max(0, x2 - original_width)

    if pad_top or pad_bottom or pad_left or pad_right:
        img = cv2.copyMakeBorder(
            img,
            pad_top,
            pad_bottom,
            pad_left,
            pad_right,
            cv2.BORDER_REFLECT_101,
        )
        # The padded image's origin moved by (pad_left, pad_top).
        x1 += pad_left
        y1 += pad_top
        x2 += pad_left
        y2 += pad_top

    face_crop = img[y1:y2, x1:x2]
    if face_crop.size == 0:
        return

    h_crop, w_crop = face_crop.shape[:2]
    # Lanczos when enlarging, area averaging when shrinking.
    upscaling = min(h_crop, w_crop) < target_size
    interp = cv2.INTER_LANCZOS4 if upscaling else cv2.INTER_AREA
    final_img = cv2.resize(face_crop, (target_size, target_size), interpolation=interp)

    out_dir = os.path.dirname(save_path)
    if out_dir:  # os.makedirs("") raises when save_path is a bare file name
        os.makedirs(out_dir, exist_ok=True)
    cv2.imwrite(save_path, final_img)


def main(argv: list[str] | None = None) -> int:
    """Crop every labelled image under ``--orig_dir`` into ``--crop_dir``.

    Loads ``train_label.json`` and ``test_label.json`` from the label folder,
    keeps only rows whose column ``40`` is listed in ``--spoof_types`` (when
    that column exists and the list is non-empty), processes the remaining
    images in a process pool, and copies both label files to
    ``<crop_dir>/metas/labels``.

    Args:
        argv: Command-line arguments without the program name. ``None`` means
            the process's own ``sys.argv`` is used.

    Returns:
        ``0`` once all tasks have been processed and the labels copied.

    Raises:
        FileNotFoundError: If either label JSON file does not exist.
    """
    if argv is None:
        args = parse_args()
    else:
        # parse_args() is called without arguments and therefore reads
        # sys.argv; substitute the explicit list for the duration of the call
        # so that it is actually honoured, then restore the real value.
        import sys

        saved_argv = sys.argv
        sys.argv = [saved_argv[0] if saved_argv else "", *argv]
        try:
            args = parse_args()
        finally:
            sys.argv = saved_argv

    orig_dir = args.orig_dir
    if not orig_dir.endswith(("/", "\\")):
        orig_dir += os.sep

    label_dir = os.path.join(orig_dir, args.label_dir)
    train_json = os.path.join(label_dir, "train_label.json")
    test_json = os.path.join(label_dir, "test_label.json")

    if not (os.path.exists(train_json) and os.path.exists(test_json)):
        raise FileNotFoundError(f"Label JSON files not found under: {label_dir}")

    # Load both splits the same way; the filter on column 40 is skipped when
    # the column is absent or no type codes were requested.
    frames = []
    for json_path in (train_json, test_json):
        frame = pd.read_json(json_path, orient="index")
        if args.spoof_types and 40 in frame.columns:
            frame = frame[frame[40].isin(args.spoof_types)]
        frames.append(frame)
    all_files = pd.concat(frames)

    # Only the index (the image path relative to orig_dir) is needed, so
    # iterate it directly instead of materialising every row.
    tasks = [
        (
            rel_path,
            os.path.join(orig_dir, rel_path),
            os.path.join(args.crop_dir, rel_path),
            args.size,
            args.bbox_expansion_factor,
        )
        for rel_path in map(str, all_files.index)
    ]

    with Pool(cpu_count()) as pool:
        # Drain the iterator so every task runs; the results are all None.
        for _ in tqdm(
            pool.imap_unordered(_process_single_image, tasks), total=len(tasks)
        ):
            pass

    out_label_dir = os.path.join(args.crop_dir, "metas", "labels")
    os.makedirs(out_label_dir, exist_ok=True)
    # The label files are copied verbatim, so they still list entries that
    # were filtered out or skipped above.
    for json_path in (train_json, test_json):
        try:
            shutil.copy(json_path, out_label_dir)
        except shutil.SameFileError:
            # Source and destination are the same file; nothing to copy.
            pass

    print(f"done | wrote cropped dataset to: {args.crop_dir}")
    return 0


if __name__ == "__main__":
    # Run the script and hand main()'s return value to the interpreter as the
    # process exit status.
    exit_status = main()
    raise SystemExit(exit_status)