pose-deep-learning / A13 /classification_problems /prepare_classification_data_v2.py
Amol Kaushik
classifiers garbage training issue resolved
4df89bc
#!/usr/bin/env python3
"""Rebuild the prepared classification arrays from clean raw Kinect data.
Replaces the broken ``prepare_classification_problems.py`` whose
"first 39 features per frame" slice silently captured 3 metadata columns
(FrameNo, timestamp, padding-zero) from the 102-feature processed format,
producing 1.27e9-magnitude garbage in "joint 0" and shifting all later
joints by one axis. That made the BatchNorm-first-layer model learn on
fantasy features and always predict "good" when the app fed real
coordinate-scale inputs.
This v2 reads the original 40-column raw Kinect CSVs directly
(``A13/kinect_good_vs_bad_not_preprocessed/``) and builds clean
(10, 13, 3) sequences with real meter-scale joint coordinates.
Outputs to ``A13/classification_problems/prepared_data/`` the exact
file set expected by ``A13/dl_models/data_loader.py``:
{A,B}_{Dense,CNN}_{train,train_aug,test}_{X,y}.npy
{A,B}_{Dense,CNN}_{train_aug,test}_filenames.npy
Problem A = 3D (Kinect, 13 joints x 3 dims).
Problem B = 2D (x,y projection of the same Kinect data; the repo
does not contain PoseNet recordings for the Good-vs-Bad clips, so we
project rather than guess. The architecture and CV protocol are
unchanged; only the input channel count differs).
Augmentations applied to the training set only (test never augmented):
_mirror : negate x coordinates
_rotate_pos : +10 deg around vertical (Y) axis
_rotate_neg : -10 deg around vertical (Y) axis
_stretch : isotropic scale by 1.05
Run::
python -m A13.classification_problems.prepare_classification_data_v2
"""
from __future__ import annotations
from pathlib import Path
import sys
import numpy as np
import pandas as pd
from sklearn.model_selection import train_test_split
# --------------------------------------------------------------------- paths
# Directory holding this script; the other paths are resolved relative to it.
THIS_DIR = Path(__file__).resolve().parent
# Input: raw per-clip Kinect CSVs, one directory level above THIS_DIR.
RAW_DIR = THIS_DIR.parent / "kinect_good_vs_bad_not_preprocessed"
# Output: destination of the generated .npy files (created by save_problem()).
OUT_DIR = THIS_DIR / "prepared_data"
# --------------------------------------------------------------------- consts
FRAMES = 10  # rows kept per clip after subsampling / padding in load_clip()
JOINTS = 13 # head + 6 upper-body + 6 lower-body, matches the CSV schema
DIMS = 3  # coordinates per joint (x, y, z)
RANDOM_STATE = 42  # seed handed to train_test_split so the split is repeatable
TEST_SIZE = 0.2  # fraction of clips held out as the test set
ROT_DEG = 10.0  # angle, in degrees, used by the two rotation augmentations
STRETCH = 1.05  # multiplier used by the stretch augmentation
def log(msg: str) -> None:
    """Write ``msg`` as one line on standard output and flush immediately."""
    print(msg, file=sys.stdout, flush=True)
# ------------------------------------------------------------------ labeling
def label_from_filename(stem: str) -> int:
    """Map a clip file stem to its class label.

    Stems beginning with ``G`` and the exact stem ``A1`` are good clips (1);
    stems beginning with ``W`` are bad clips (0). The match is case-sensitive.

    Raises:
        ValueError: if the stem matches neither rule.
    """
    is_good = stem.startswith("G") or stem == "A1"
    if is_good:
        return 1
    if not stem.startswith("W"):
        raise ValueError(f"Unknown label for {stem!r}")
    return 0
# ----------------------------------------------------------------- load clip
def load_clip(csv_path: Path) -> np.ndarray:
    """Read one raw clip CSV into a fixed-length joint-coordinate sequence.

    The ``FrameNo`` column is discarded; the columns that remain must number
    exactly ``JOINTS * DIMS``. A clip with at least ``FRAMES`` rows is
    subsampled at evenly spaced row indices (first and last row included).
    A shorter clip is padded by repeating its final row, and a clip with no
    rows at all comes back as all zeros.

    Returns:
        ``float32`` array of shape ``(FRAMES, JOINTS, DIMS)``.

    Raises:
        ValueError: if ``FrameNo`` is missing or the number of coordinate
            columns is wrong.
    """
    # Strip whitespace from the header names so the FrameNo lookup is exact.
    table = pd.read_csv(csv_path).rename(columns=str.strip)
    if "FrameNo" not in table.columns:
        raise ValueError(f"{csv_path.name}: expected a FrameNo column")

    coords = table.drop(columns=["FrameNo"]).values.astype("float32")
    n_rows, n_cols = coords.shape
    if n_cols != JOINTS * DIMS:
        raise ValueError(
            f"{csv_path.name}: expected {JOINTS * DIMS} coord cols, got {n_cols}"
        )

    if n_rows >= FRAMES:
        # Long enough: pick FRAMES evenly spaced rows.
        picks = np.linspace(0, n_rows - 1, FRAMES, dtype=int)
        frames = coords[picks]
    elif n_rows > 0:
        # Too short: keep every row, then repeat the last one to fill up.
        tail = np.repeat(coords[-1:], FRAMES - n_rows, axis=0)
        frames = np.concatenate([coords, tail], axis=0)
    else:
        # No data rows: nothing to repeat, so the sequence stays zero-filled.
        frames = np.zeros((FRAMES, n_cols), dtype="float32")
    return frames.reshape(FRAMES, JOINTS, DIMS)
# ------------------------------------------------------------- augmentations
def aug_mirror(seq: np.ndarray) -> np.ndarray:
    """Return a copy of ``seq`` with component 0 of the last axis (x) negated.

    The input array is left untouched.
    """
    mirrored = seq.copy()
    x_view = mirrored[..., 0]
    # Negate in place through the view so only the x component changes.
    np.negative(x_view, out=x_view)
    return mirrored
def _rotate_y(seq: np.ndarray, deg: float) -> np.ndarray:
    """Return a copy of ``seq`` rotated by ``deg`` degrees about the Y axis.

    Only components 0 (x) and 2 (z) of the last axis are changed; component 1
    is copied through. The input array is not modified.
    """
    theta = np.deg2rad(deg)
    cos_t = np.cos(theta)
    sin_t = np.sin(theta)
    # Read both source components from the input, never from the output, so
    # the second assignment does not see the already-rotated x values.
    src_x, src_z = seq[..., 0], seq[..., 2]
    rotated = seq.copy()
    rotated[..., 0] = cos_t * src_x + sin_t * src_z
    rotated[..., 2] = -sin_t * src_x + cos_t * src_z
    return rotated
def aug_rotate_pos(seq: np.ndarray) -> np.ndarray:
    """Return ``seq`` rotated by ``+ROT_DEG`` degrees using ``_rotate_y``."""
    return _rotate_y(seq, deg=ROT_DEG)
def aug_rotate_neg(seq: np.ndarray) -> np.ndarray:
    """Return ``seq`` rotated by ``-ROT_DEG`` degrees using ``_rotate_y``."""
    return _rotate_y(seq, deg=-ROT_DEG)
def aug_stretch(seq: np.ndarray) -> np.ndarray:
    """Return a new array with every coordinate multiplied by ``STRETCH``."""
    return STRETCH * seq
# (suffix, function) pairs consumed by augment(): each function is applied to
# every training clip, and the suffix is appended to that clip's name.
AUGS = [
    ("_mirror", aug_mirror),
    ("_rotate_pos", aug_rotate_pos),
    ("_rotate_neg", aug_rotate_neg),
    ("_stretch", aug_stretch),
]
# ----------------------------------------------------------------- pipeline
def collect_clips() -> tuple[np.ndarray, np.ndarray, np.ndarray]:
    """Load every labelled clip found in ``RAW_DIR``.

    CSV files are visited in sorted path order. A file whose stem
    ``label_from_filename`` rejects is logged and skipped; any error raised
    by ``load_clip`` propagates to the caller.

    Returns:
        ``(X, y, fn)`` where ``X`` is the ``float32`` stack of the loaded
        sequences, ``y`` the ``int32`` labels and ``fn`` an object array of
        the file stems, all in the same order.

    Raises:
        FileNotFoundError: if ``RAW_DIR`` contains no ``*.csv`` file.
        ValueError: if CSV files exist but every one of them was skipped.
    """
    files = sorted(RAW_DIR.glob("*.csv"))
    if not files:
        raise FileNotFoundError(f"No CSVs in {RAW_DIR}")
    log(f"[1] reading {len(files)} clips from {RAW_DIR}")

    seqs, labels, names = [], [], []
    for count, path in enumerate(files, start=1):
        stem = path.stem
        try:
            label = label_from_filename(stem)
        except ValueError as e:
            log(f" skip {stem}: {e}")
            continue
        seqs.append(load_clip(path))
        labels.append(label)
        names.append(stem)
        # Progress counts files visited (skipped ones included), not clips kept.
        if count % 25 == 0:
            log(f" loaded {count}/{len(files)}")

    if not seqs:
        # Without this guard np.stack([]) fails with the unhelpful
        # "need at least one array to stack"; keep ValueError, say why.
        raise ValueError(
            f"No usable clips in {RAW_DIR}: all {len(files)} CSV file(s) "
            "were skipped as unlabelled"
        )

    X = np.stack(seqs).astype("float32")   # (N, 10, 13, 3)
    y = np.asarray(labels, dtype="int32")  # (N,)
    fn = np.asarray(names, dtype=object)   # (N,)
    log(f" -> X {X.shape} y {y.shape} good={int(y.sum())} bad={int((y == 0).sum())}")
    log(f" coord scale: min={X.min():.3g} max={X.max():.3g} mean={X.mean():.3g}")
    return X, y, fn
def split(X, y, fn):
    """Split clips, labels and names into stratified train and test parts.

    ``TEST_SIZE`` and ``RANDOM_STATE`` are passed to ``train_test_split`` and
    the split is stratified on ``y``.

    Returns:
        ``(Xtr, ytr, ftr, Xte, yte, fte)`` -- the three train arrays first,
        then the three test arrays.
    """
    log(f"[2] stratified split test_size={TEST_SIZE} random_state={RANDOM_STATE}")
    parts = train_test_split(
        X,
        y,
        fn,
        test_size=TEST_SIZE,
        random_state=RANDOM_STATE,
        stratify=y,
    )
    # train_test_split interleaves its output as train, test, train, test, ...
    Xtr, ytr, ftr = parts[0::2]
    Xte, yte, fte = parts[1::2]
    log(f" train: {Xtr.shape} good={int(ytr.sum())}/{len(ytr)}")
    log(f" test: {Xte.shape} good={int(yte.sum())}/{len(yte)}")
    return Xtr, ytr, ftr, Xte, yte, fte
def augment(Xtr, ytr, ftr):
    """Extend the training set with one transformed copy per entry in ``AUGS``.

    The result holds the original clips first, followed by one full block per
    augmentation in ``AUGS`` order. Labels are repeated unchanged and each
    augmented clip's name is the original name plus the augmentation suffix.

    Returns:
        ``(X, y, f)``: ``float32`` features, ``int32`` labels and the names.
    """
    log(f"[3] augmenting train (originals + {len(AUGS)} variants each)")
    X_parts, y_parts, f_parts = [Xtr], [ytr], [ftr]
    for suf, transform in AUGS:
        # Apply the transform clip by clip, then restack into one array.
        X_parts.append(np.stack([transform(clip) for clip in Xtr]))
        y_parts.append(ytr.copy())
        renamed = [f"{n}{suf}" for n in ftr]
        f_parts.append(np.asarray(renamed, dtype=object))

    X = np.concatenate(X_parts, axis=0).astype("float32")
    y = np.concatenate(y_parts, axis=0).astype("int32")
    f = np.concatenate(f_parts, axis=0)
    log(f" -> aug train: {X.shape} good={int(y.sum())}/{len(y)}")
    return X, y, f
def save_problem(problem: str, dims_keep: int,
                 Xtr, ytr, ftr,
                 Xtr_aug, ytr_aug, ftr_aug,
                 Xte, yte, fte):
    """Write the Dense and CNN ``.npy`` files for one problem into ``OUT_DIR``.

    Every feature array keeps only the first ``dims_keep`` entries of its
    last axis. The CNN files store that array as is; the Dense files store it
    reshaped to ``(n_samples, FRAMES * JOINTS * dims_keep)``. Label and
    filename arrays are written unchanged. ``ftr`` is accepted but not saved.
    """
    n_feat = FRAMES * JOINTS * dims_keep
    Xtr_p, Xtr_aug_p, Xte_p = (
        X[..., :dims_keep] for X in (Xtr, Xtr_aug, Xte)
    )

    def flat(X):
        # One row per sample for the Dense variant.
        return X.reshape(len(X), n_feat)

    arrays = {
        # Dense = flattened features
        f"{problem}_Dense_train_X": flat(Xtr_p),
        f"{problem}_Dense_train_y": ytr,
        f"{problem}_Dense_train_aug_X": flat(Xtr_aug_p),
        f"{problem}_Dense_train_aug_y": ytr_aug,
        f"{problem}_Dense_train_aug_filenames": ftr_aug,
        f"{problem}_Dense_test_X": flat(Xte_p),
        f"{problem}_Dense_test_y": yte,
        f"{problem}_Dense_test_filenames": fte,
        # CNN = features left as (frames, joints, dims)
        f"{problem}_CNN_train_X": Xtr_p,
        f"{problem}_CNN_train_y": ytr,
        f"{problem}_CNN_train_aug_X": Xtr_aug_p,
        f"{problem}_CNN_train_aug_y": ytr_aug,
        f"{problem}_CNN_train_aug_filenames": ftr_aug,
        f"{problem}_CNN_test_X": Xte_p,
        f"{problem}_CNN_test_y": yte,
        f"{problem}_CNN_test_filenames": fte,
    }

    OUT_DIR.mkdir(parents=True, exist_ok=True)
    for name, arr in arrays.items():
        np.save(OUT_DIR / f"{name}.npy", arr)
    log(
        f" wrote 16 files for problem {problem} "
        f"(Dense {n_feat}-feat, CNN {(FRAMES, JOINTS, dims_keep)})"
    )
def main():
    """Run the full rebuild: load, split, augment, then write both problems."""
    rule = "=" * 70
    log(rule)
    log("prepare_classification_data_v2: clean rebuild from raw Kinect CSVs")
    log(rule)

    X, y, fn = collect_clips()
    Xtr, ytr, ftr, Xte, yte, fte = split(X, y, fn)
    Xtr_aug, ytr_aug, ftr_aug = augment(Xtr, ytr, ftr)

    # (log line, problem tag, number of coordinate dims kept) per output set.
    jobs = (
        ("[4] writing Problem A (3D Kinect, 13x3)", "A", 3),
        ("[5] writing Problem B (2D x,y projection of Kinect, 13x2)", "B", 2),
    )
    for message, problem, dims_keep in jobs:
        log(message)
        save_problem(
            problem, dims_keep,
            Xtr, ytr, ftr,
            Xtr_aug, ytr_aug, ftr_aug,
            Xte, yte, fte,
        )
    log(f"[6] done. output dir: {OUT_DIR}")
if __name__ == "__main__":
    # main() returns None; ``or 0`` turns that into an explicit success status.
    exit_status = main() or 0
    sys.exit(exit_status)