File size: 11,259 Bytes
22a6915
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
"""
Single source for pooled train/val/test data and splits.

- Data: load_all_pooled() / load_per_person() from data/collected_*/*.npz (same pattern everywhere).
- Splits: get_numpy_splits() / get_dataloaders() use stratified train/val/test with a fixed seed from config.
- Test is held out before any preprocessing; StandardScaler is fit on train only, then applied to val and test.
"""

import os
import glob

import numpy as np
from sklearn.preprocessing import StandardScaler
from sklearn.model_selection import train_test_split

torch = None
Dataset = object  # type: ignore
DataLoader = None

# Defaults for stratified split (overridden by config when available)
_DEFAULT_SPLIT_RATIOS = (0.7, 0.15, 0.15)
_DEFAULT_SPLIT_SEED = 42


def _require_torch():
    global torch, Dataset, DataLoader
    if torch is None:
        try:
            import torch as _torch
            from torch.utils.data import Dataset as _Dataset, DataLoader as _DataLoader
        except ImportError as exc:  # pragma: no cover
            raise ImportError("PyTorch not installed") from exc

        torch = _torch
        Dataset = _Dataset  # type: ignore
        DataLoader = _DataLoader  # type: ignore

    return torch, Dataset, DataLoader

DATA_DIR = os.path.join(os.path.dirname(__file__), "..", "data")

SELECTED_FEATURES = {
    "face_orientation": [
        'head_deviation', 's_face', 's_eye', 'h_gaze', 'pitch',
        'ear_left', 'ear_avg', 'ear_right', 'gaze_offset', 'perclos'
    ],
    "eye_behaviour": [
        'ear_left', 'ear_right', 'ear_avg', 'mar',
        'blink_rate', 'closure_duration', 'perclos', 'yawn_duration'
    ]
}


class FeatureVectorDataset(Dataset):
    def __init__(self, features: np.ndarray, labels: np.ndarray):
        torch_mod, _, _ = _require_torch()
        self.features = torch_mod.tensor(features, dtype=torch_mod.float32)
        self.labels = torch_mod.tensor(labels, dtype=torch_mod.long)

    def __len__(self):
        return len(self.labels)

    def __getitem__(self, idx):
        return self.features[idx], self.labels[idx]


# ── Low-level helpers ────────────────────────────────────────────────────

def _clean_npz(raw, names):
    """Apply clipping rules in-place. Shared by all loaders."""
    for col, lo, hi in [('yaw', -45, 45), ('pitch', -30, 30), ('roll', -30, 30)]:
        if col in names:
            raw[:, names.index(col)] = np.clip(raw[:, names.index(col)], lo, hi)
    for feat in ['ear_left', 'ear_right', 'ear_avg']:
        if feat in names:
            raw[:, names.index(feat)] = np.clip(raw[:, names.index(feat)], 0, 0.85)
    return raw


def _load_one_npz(npz_path, target_features):
    """Load a single .npz file, clean and select features. Returns (X, y, selected_feature_names)."""
    data = np.load(npz_path, allow_pickle=True)
    raw = data['features'].astype(np.float32)
    labels = data['labels'].astype(np.int64)
    names = list(data['feature_names'])
    raw = _clean_npz(raw, names)
    selected = [f for f in target_features if f in names]
    idx = [names.index(f) for f in selected]
    return raw[:, idx], labels, selected


# ── Public data loaders ──────────────────────────────────────────────────

def load_all_pooled(model_name: str = "face_orientation", data_dir: str = None):
    """Load all collected_*/*.npz, clean, select features, concatenate.

    Returns (X_all, y_all, all_feature_names).
    """
    data_dir = data_dir or DATA_DIR
    target_features = SELECTED_FEATURES.get(model_name, SELECTED_FEATURES["face_orientation"])
    pattern = os.path.join(data_dir, "collected_*", "*.npz")
    npz_files = sorted(glob.glob(pattern))

    if not npz_files:
        raise FileNotFoundError(
            f"No .npz files matching {pattern}. "
            "Collect data first with `python -m models.collect_features --name <name>`."
        )

    all_X, all_y = [], []
    all_names = None
    for npz_path in npz_files:
        X, y, names = _load_one_npz(npz_path, target_features)
        if all_names is None:
            all_names = names
        all_X.append(X)
        all_y.append(y)
        print(f"[DATA]   + {os.path.basename(npz_path)}: {X.shape[0]} samples")

    X_all = np.concatenate(all_X, axis=0)
    y_all = np.concatenate(all_y, axis=0)
    print(f"[DATA] Loaded {len(npz_files)} file(s) for '{model_name}': "
          f"{X_all.shape[0]} total samples, {X_all.shape[1]} features")
    return X_all, y_all, all_names


def load_per_person(model_name: str = "face_orientation", data_dir: str = None):
    """Load collected_*/*.npz grouped by person (folder name).

    Returns dict { person_name: (X, y) } where X/y are per-person numpy arrays.
    Also returns (X_all, y_all) as pooled data.
    """
    data_dir = data_dir or DATA_DIR
    target_features = SELECTED_FEATURES.get(model_name, SELECTED_FEATURES["face_orientation"])
    pattern = os.path.join(data_dir, "collected_*", "*.npz")
    npz_files = sorted(glob.glob(pattern))

    if not npz_files:
        raise FileNotFoundError(f"No .npz files matching {pattern}")

    by_person = {}
    all_X, all_y = [], []
    for npz_path in npz_files:
        folder = os.path.basename(os.path.dirname(npz_path))
        person = folder.replace("collected_", "", 1)
        X, y, _ = _load_one_npz(npz_path, target_features)
        all_X.append(X)
        all_y.append(y)
        if person not in by_person:
            by_person[person] = []
        by_person[person].append((X, y))
        print(f"[DATA]   + {person}/{os.path.basename(npz_path)}: {X.shape[0]} samples")

    for person, chunks in by_person.items():
        by_person[person] = (
            np.concatenate([c[0] for c in chunks], axis=0),
            np.concatenate([c[1] for c in chunks], axis=0),
        )

    X_all = np.concatenate(all_X, axis=0)
    y_all = np.concatenate(all_y, axis=0)
    print(f"[DATA] {len(by_person)} persons, {X_all.shape[0]} total samples, {X_all.shape[1]} features")
    return by_person, X_all, y_all


def load_raw_npz(npz_path):
    """Load a single .npz without cleaning or feature selection. For exploration notebooks."""
    data = np.load(npz_path, allow_pickle=True)
    features = data['features'].astype(np.float32)
    labels = data['labels'].astype(np.int64)
    names = list(data['feature_names'])
    return features, labels, names


# ── Legacy helpers (used by models/mlp/train.py and models/xgboost/train.py) ─

def _load_real_data(model_name: str):
    X, y, _ = load_all_pooled(model_name)
    return X, y


def _generate_synthetic_data(model_name: str):
    target_features = SELECTED_FEATURES.get(model_name, SELECTED_FEATURES["face_orientation"])
    n = 500
    d = len(target_features)
    c = 2
    rng = np.random.RandomState(42)
    features = rng.randn(n, d).astype(np.float32)
    labels = rng.randint(0, c, size=n).astype(np.int64)
    print(f"[DATA] Using synthetic data for '{model_name}': {n} samples, {d} features, {c} classes")
    return features, labels


def get_default_split_config():
    """Return (split_ratios, seed) from config so all scripts use the same split. Reproducible and consistent."""
    try:
        from config import get
        data = get("data") or {}
        ratios = data.get("split_ratios", list(_DEFAULT_SPLIT_RATIOS))
        seed = get("mlp.seed") or _DEFAULT_SPLIT_SEED
        return (tuple(ratios), int(seed))
    except Exception:
        return (_DEFAULT_SPLIT_RATIOS, _DEFAULT_SPLIT_SEED)


def _split_and_scale(features, labels, split_ratios, seed, scale):
    """Stratified train/val/test split. Test is held out first; val is split from the rest.
    No training data is used for validation or test. Scaler is fit on train only, then
    applied to val and test (no leakage from val/test into scaling).
    """
    test_ratio = split_ratios[2]
    val_ratio = split_ratios[1] / (split_ratios[0] + split_ratios[1])

    X_train_val, X_test, y_train_val, y_test = train_test_split(
        features, labels, test_size=test_ratio, random_state=seed, stratify=labels,
    )
    X_train, X_val, y_train, y_val = train_test_split(
        X_train_val, y_train_val, test_size=val_ratio, random_state=seed, stratify=y_train_val,
    )

    scaler = None
    if scale:
        scaler = StandardScaler()
        X_train = scaler.fit_transform(X_train)
        X_val = scaler.transform(X_val)
        X_test = scaler.transform(X_test)
        print("[DATA] Applied StandardScaler (fitted on training split only)")

    splits = {
        "X_train": X_train, "y_train": y_train,
        "X_val": X_val,     "y_val": y_val,
        "X_test": X_test,   "y_test": y_test,
    }

    print(f"[DATA] Split (stratified): train={len(y_train)}, val={len(y_val)}, test={len(y_test)}")
    return splits, scaler


def get_numpy_splits(model_name: str, split_ratios=None, seed=None, scale: bool = True):
    """Return train/val/test numpy arrays. Uses config defaults for split_ratios/seed when None.
    Same dataset and split logic as get_dataloaders for consistent evaluation."""
    if split_ratios is None or seed is None:
        _ratios, _seed = get_default_split_config()
        split_ratios = split_ratios if split_ratios is not None else _ratios
        seed = seed if seed is not None else _seed
    features, labels = _load_real_data(model_name)
    num_features = features.shape[1]
    num_classes = int(labels.max()) + 1
    if num_classes < 2:
        raise ValueError("Dataset has only one class; need at least 2 for classification.")
    splits, scaler = _split_and_scale(features, labels, split_ratios, seed, scale)
    return splits, num_features, num_classes, scaler


def get_dataloaders(model_name: str, batch_size: int = 32, split_ratios=None, seed=None, scale: bool = True):
    """Return PyTorch DataLoaders. Uses config defaults for split_ratios/seed when None.
    Test set is held out before preprocessing; scaler fit on train only."""
    if split_ratios is None or seed is None:
        _ratios, _seed = get_default_split_config()
        split_ratios = split_ratios if split_ratios is not None else _ratios
        seed = seed if seed is not None else _seed
    _, _, dataloader_cls = _require_torch()
    features, labels = _load_real_data(model_name)
    num_features = features.shape[1]
    num_classes = int(labels.max()) + 1
    if num_classes < 2:
        raise ValueError("Dataset has only one class; need at least 2 for classification.")
    splits, scaler = _split_and_scale(features, labels, split_ratios, seed, scale)

    train_ds = FeatureVectorDataset(splits["X_train"], splits["y_train"])
    val_ds   = FeatureVectorDataset(splits["X_val"],   splits["y_val"])
    test_ds  = FeatureVectorDataset(splits["X_test"],  splits["y_test"])

    train_loader = dataloader_cls(train_ds, batch_size=batch_size, shuffle=True)
    val_loader   = dataloader_cls(val_ds,   batch_size=batch_size, shuffle=False)
    test_loader  = dataloader_cls(test_ds,  batch_size=batch_size, shuffle=False)

    return train_loader, val_loader, test_loader, num_features, num_classes, scaler