Spaces:
Running
Running
| #!/usr/bin/env python3 | |
| """Rebuild the prepared classification arrays from clean raw Kinect data. | |
| Replaces the broken ``prepare_classification_problems.py`` whose | |
| "first 39 features per frame" slice silently captured 3 metadata columns | |
| (FrameNo, timestamp, padding-zero) from the 102-feature processed format, | |
| producing 1.27e9-magnitude garbage in "joint 0" and shifting all later | |
| joints by one axis. That made the BatchNorm-first-layer model learn on | |
| fantasy features and always predict "good" when the app fed real | |
| coordinate-scale inputs. | |
| This v2 reads the original 40-column raw Kinect CSVs directly | |
| (``A13/kinect_good_vs_bad_not_preprocessed/``) and builds clean | |
| (10, 13, 3) sequences with real meter-scale joint coordinates. | |
| Outputs to ``A13/classification_problems/prepared_data/`` the exact | |
| file set expected by ``A13/dl_models/data_loader.py``: | |
| {A,B}_{Dense,CNN}_{train,train_aug,test}_{X,y}.npy | |
| {A,B}_{Dense,CNN}_{train_aug,test}_filenames.npy | |
| Problem A = 3D (Kinect, 13 joints x 3 dims). | |
| Problem B = 2D (x,y projection of the same Kinect data; the repo | |
| does not contain PoseNet recordings for the Good-vs-Bad clips, so we | |
| project rather than guess. The architecture and CV protocol are | |
| unchanged; only the input channel count differs). | |
| Augmentations applied to the training set only (test never augmented): | |
| _mirror : negate x coordinates | |
| _rotate_pos : +10 deg around vertical (Y) axis | |
| _rotate_neg : -10 deg around vertical (Y) axis | |
| _stretch : isotropic scale by 1.05 | |
| Run:: | |
| python -m A13.classification_problems.prepare_classification_data_v2 | |
| """ | |
| from __future__ import annotations | |
| from pathlib import Path | |
| import sys | |
| import numpy as np | |
| import pandas as pd | |
| from sklearn.model_selection import train_test_split | |
# --------------------------------------------------------------------- paths
# Directory that contains this script.
THIS_DIR = Path(__file__).resolve().parent
# Raw Kinect CSVs live in a sibling folder of this script's directory.
RAW_DIR = THIS_DIR.parent.joinpath("kinect_good_vs_bad_not_preprocessed")
# All generated .npy files are written next to this script.
OUT_DIR = THIS_DIR.joinpath("prepared_data")
# --------------------------------------------------------------------- consts
# Shape of one prepared clip: (FRAMES, JOINTS, DIMS).
FRAMES: int = 10
JOINTS: int = 13  # head + 6 upper-body + 6 lower-body, matches the CSV schema
DIMS: int = 3

# Train/test split settings.
RANDOM_STATE: int = 42
TEST_SIZE: float = 0.2

# Augmentation settings: rotation angle in degrees and uniform scale factor.
ROT_DEG: float = 10.0
STRETCH: float = 1.05
def log(msg: str) -> None:
    """Print ``msg`` to standard output and flush immediately."""
    print(msg, file=sys.stdout, flush=True)
# ------------------------------------------------------------------ labeling
def label_from_filename(stem: str) -> int:
    """Map a clip's file stem to its class label.

    Returns 0 (bad) for stems starting with ``W`` and 1 (good) for stems
    starting with ``G`` or for the exact stem ``A1``.

    Raises:
        ValueError: if the stem matches none of the rules above.
    """
    # The two rules are mutually exclusive, so their order does not matter.
    if stem.startswith("W"):
        return 0
    if stem.startswith("G") or stem == "A1":
        return 1
    raise ValueError(f"Unknown label for {stem!r}")
# ----------------------------------------------------------------- load clip
def load_clip(csv_path: Path) -> np.ndarray:
    """Read one clip CSV and return a (FRAMES, JOINTS, DIMS) float32 array.

    The ``FrameNo`` column is dropped; every remaining column is treated as
    a coordinate and there must be exactly ``JOINTS * DIMS`` of them.

    Clips with at least ``FRAMES`` rows are subsampled at evenly spaced row
    indices. Shorter clips are padded by repeating their last row. A clip
    with no rows at all yields an all-zero array.

    Raises:
        ValueError: if ``FrameNo`` is missing or the coordinate column count
            is not ``JOINTS * DIMS``.
    """
    table = pd.read_csv(csv_path)
    # Header cells may carry stray whitespace; normalise before the lookup.
    table.columns = [name.strip() for name in table.columns]

    # Drop FrameNo; the remaining 39 cols are 13 joints x (x, y, z).
    if "FrameNo" not in table.columns:
        raise ValueError(f"{csv_path.name}: expected a FrameNo column")
    coords = table.drop(columns=["FrameNo"]).values.astype("float32")

    n_rows, n_cols = coords.shape
    if n_cols != JOINTS * DIMS:
        raise ValueError(
            f"{csv_path.name}: expected {JOINTS * DIMS} coord cols, got {n_cols}"
        )

    if n_rows < FRAMES:
        # Too short: copy what exists, then repeat the final row as padding.
        seq = np.zeros((FRAMES, n_cols), dtype="float32")
        seq[:n_rows] = coords
        if n_rows > 0:
            seq[n_rows:] = coords[-1]
    else:
        # Long enough: pick FRAMES evenly spaced rows, first and last included.
        picks = np.linspace(0, n_rows - 1, FRAMES, dtype=int)
        seq = coords[picks]

    return seq.reshape(FRAMES, JOINTS, DIMS)
# ------------------------------------------------------------- augmentations
def aug_mirror(seq: np.ndarray) -> np.ndarray:
    """Return a copy of ``seq`` with component 0 of the last axis negated.

    The input array is left untouched.
    """
    mirrored = seq.copy()
    x_view = mirrored[..., 0]
    # Negate in place through the view so only the copy is modified.
    np.negative(x_view, out=x_view)
    return mirrored
| def _rotate_y(seq: np.ndarray, deg: float) -> np.ndarray: | |
| r = np.deg2rad(deg) | |
| c, s = np.cos(r), np.sin(r) | |
| out = seq.copy() | |
| x = seq[..., 0] | |
| z = seq[..., 2] | |
| out[..., 0] = c * x + s * z | |
| out[..., 2] = -s * x + c * z | |
| return out | |
def aug_rotate_pos(seq: np.ndarray) -> np.ndarray:
    """Rotate ``seq`` by ``+ROT_DEG`` degrees via ``_rotate_y``."""
    positive_angle = +ROT_DEG
    return _rotate_y(seq, positive_angle)
def aug_rotate_neg(seq: np.ndarray) -> np.ndarray:
    """Rotate ``seq`` by ``-ROT_DEG`` degrees via ``_rotate_y``."""
    negative_angle = -ROT_DEG
    return _rotate_y(seq, negative_angle)
def aug_stretch(seq: np.ndarray) -> np.ndarray:
    """Return ``seq`` with every component multiplied by ``STRETCH``."""
    scaled = seq * STRETCH
    return scaled
# Training-set augmentations as (filename suffix, transform) pairs.
# ``augment`` applies each transform to every training clip and appends the
# suffix to that clip's name.
AUGS = [
    ("_mirror", aug_mirror),
    ("_rotate_pos", aug_rotate_pos),
    ("_rotate_neg", aug_rotate_neg),
    ("_stretch", aug_stretch),
]
# ----------------------------------------------------------------- pipeline
def collect_clips() -> tuple[np.ndarray, np.ndarray, np.ndarray]:
    """Load every labelled clip found in ``RAW_DIR``.

    CSV files are visited in sorted path order. A file whose stem cannot be
    mapped to a label by ``label_from_filename`` is logged and skipped;
    errors raised by ``load_clip`` are not caught and propagate.

    Returns:
        ``(X, y, fn)`` where ``X`` is the float32 stack of clips, ``y`` the
        int32 labels and ``fn`` an object array of file stems, all aligned
        on the first axis.

    Raises:
        FileNotFoundError: if ``RAW_DIR`` contains no ``*.csv`` files.
        ValueError: if every CSV was skipped, so there is nothing to stack.
    """
    files = sorted(RAW_DIR.glob("*.csv"))
    if not files:
        raise FileNotFoundError(f"No CSVs in {RAW_DIR}")
    log(f"[1] reading {len(files)} clips from {RAW_DIR}")

    seqs, labels, names = [], [], []
    for i, p in enumerate(files):
        stem = p.stem
        try:
            label = label_from_filename(stem)
        except ValueError as e:
            log(f" skip {stem}: {e}")
            continue
        seqs.append(load_clip(p))
        labels.append(label)
        names.append(stem)
        if (i + 1) % 25 == 0:
            log(f" loaded {i + 1}/{len(files)}")

    # Without this guard np.stack([]) fails with an unhelpful
    # "need at least one array to stack" message.
    if not seqs:
        raise ValueError(
            f"No labelled clips in {RAW_DIR}: "
            f"all {len(files)} CSV files were skipped"
        )

    X = np.stack(seqs).astype("float32")    # (N, 10, 13, 3)
    y = np.asarray(labels, dtype="int32")   # (N,)
    fn = np.asarray(names, dtype=object)    # (N,)
    log(f" -> X {X.shape} y {y.shape} good={int(y.sum())} bad={int((y == 0).sum())}")
    log(f" coord scale: min={X.min():.3g} max={X.max():.3g} mean={X.mean():.3g}")
    return X, y, fn
def split(X, y, fn):
    """Split clips, labels and names into train and test parts.

    Uses ``train_test_split`` with ``TEST_SIZE``, ``RANDOM_STATE`` and
    stratification on ``y``.

    Returns:
        ``(Xtr, ytr, ftr, Xte, yte, fte)`` - the three train arrays followed
        by the three test arrays.
    """
    log(f"[2] stratified split test_size={TEST_SIZE} random_state={RANDOM_STATE}")
    parts = train_test_split(
        X,
        y,
        fn,
        test_size=TEST_SIZE,
        random_state=RANDOM_STATE,
        stratify=y,
    )
    # train_test_split yields a (train, test) pair per input, in input order.
    X_train, X_test, y_train, y_test, names_train, names_test = parts
    log(f" train: {X_train.shape} good={int(y_train.sum())}/{len(y_train)}")
    log(f" test: {X_test.shape} good={int(y_test.sum())}/{len(y_test)}")
    return X_train, y_train, names_train, X_test, y_test, names_test
def augment(Xtr, ytr, ftr):
    """Extend the training set with one transformed copy per entry in ``AUGS``.

    The result holds the original clips first, followed by one full block per
    augmentation in ``AUGS`` order. Labels are repeated for each block and
    each augmented clip's name is the original name plus that augmentation's
    suffix.

    Returns:
        ``(X, y, f)`` - float32 clips, int32 labels and the name array.
    """
    log(f"[3] augmenting train (originals + {len(AUGS)} variants each)")

    clip_blocks = [Xtr]
    label_blocks = [ytr]
    name_blocks = [ftr]
    for suffix, transform in AUGS:
        # Transforms are applied clip by clip and restacked into one array.
        transformed = [transform(clip) for clip in Xtr]
        clip_blocks.append(np.stack(transformed))
        label_blocks.append(ytr.copy())
        renamed = [f"{name}{suffix}" for name in ftr]
        name_blocks.append(np.asarray(renamed, dtype=object))

    X = np.concatenate(clip_blocks, axis=0).astype("float32")
    y = np.concatenate(label_blocks, axis=0).astype("int32")
    f = np.concatenate(name_blocks, axis=0)
    log(f" -> aug train: {X.shape} good={int(y.sum())}/{len(y)}")
    return X, y, f
def save_problem(problem: str, dims_keep: int,
                 Xtr, ytr, ftr,
                 Xtr_aug, ytr_aug, ftr_aug,
                 Xte, yte, fte):
    """Write the Dense and CNN ``.npy`` files for one problem to ``OUT_DIR``.

    Each X array keeps only the first ``dims_keep`` components of its last
    axis. The Dense variant flattens every clip to one row of
    ``FRAMES * JOINTS * dims_keep`` features; the CNN variant keeps the
    sliced array as is. Labels and filename arrays are saved unchanged.
    ``ftr`` is accepted but not written.
    """
    n_feat = FRAMES * JOINTS * dims_keep
    sliced = {
        "train": Xtr[..., :dims_keep],
        "train_aug": Xtr_aug[..., :dims_keep],
        "test": Xte[..., :dims_keep],
    }

    def as_rows(arr):
        # Dense layout: one flat feature vector per clip.
        return arr.reshape(len(arr), n_feat)

    def as_is(arr):
        # CNN layout: keep the (frames, joints, dims) structure.
        return arr

    # Build every payload before touching the filesystem. Dense entries come
    # first, then CNN, each in the same per-split order.
    outputs = {}
    for layout, shape_fn in (("Dense", as_rows), ("CNN", as_is)):
        prefix = f"{problem}_{layout}"
        outputs[f"{prefix}_train_X"] = shape_fn(sliced["train"])
        outputs[f"{prefix}_train_y"] = ytr
        outputs[f"{prefix}_train_aug_X"] = shape_fn(sliced["train_aug"])
        outputs[f"{prefix}_train_aug_y"] = ytr_aug
        outputs[f"{prefix}_train_aug_filenames"] = ftr_aug
        outputs[f"{prefix}_test_X"] = shape_fn(sliced["test"])
        outputs[f"{prefix}_test_y"] = yte
        outputs[f"{prefix}_test_filenames"] = fte

    OUT_DIR.mkdir(parents=True, exist_ok=True)
    for stem, payload in outputs.items():
        np.save(OUT_DIR / f"{stem}.npy", payload)

    log(
        f" wrote 16 files for problem {problem} "
        f"(Dense {n_feat}-feat, CNN {(FRAMES, JOINTS, dims_keep)})"
    )
def main():
    """Run the pipeline: load clips, split, augment train, write A and B."""
    rule = "=" * 70
    log(rule)
    log("prepare_classification_data_v2: clean rebuild from raw Kinect CSVs")
    log(rule)

    X, y, fn = collect_clips()
    Xtr, ytr, ftr, Xte, yte, fte = split(X, y, fn)
    Xtr_aug, ytr_aug, ftr_aug = augment(Xtr, ytr, ftr)

    # Both problems are written from the same arrays; only the number of
    # coordinate components kept differs.
    arrays = (Xtr, ytr, ftr, Xtr_aug, ytr_aug, ftr_aug, Xte, yte, fte)
    problems = (
        ("A", 3, "[4] writing Problem A (3D Kinect, 13x3)"),
        ("B", 2, "[5] writing Problem B (2D x,y projection of Kinect, 13x2)"),
    )
    for problem, dims_keep, announcement in problems:
        log(announcement)
        save_problem(problem, dims_keep, *arrays)

    log(f"[6] done. output dir: {OUT_DIR}")
# Script entry point: exit with main()'s return value, or 0 when it is falsy.
if __name__ == "__main__":
    raise SystemExit(main() or 0)