Spaces:
Running
Running
| """ | |
| A10 Data Loader Module - Issue #40 | |
| =================================== | |
| Data loading and preprocessing for 2D PoseNet/MoveNet to 2D Kinect mapping. | |
| Per Issue #40 specification (clarified by Rasa): | |
| - Input: PoseNet/MoveNet 2D keypoints (xpn, ypn) -> 26 features | |
| - Output: Kinect 2D keypoints (xk, yk) -> 26 features | |
| NOTE: The one-step variant (xpn,ypn) -> (xk,yk,zk) is Issue #41 (separate task). | |
| This module supports three output modes for flexibility: | |
| - 'xy' : Kinect (xk, yk) -> 26 features [Issue #40 - PRIMARY] | |
| - 'z' : Kinect zk only -> 13 features [legacy depth-only] | |
| - 'xyz' : Kinect (xk, yk, zk) -> 39 features [Issue #41 one-step variant] | |
| """ | |
| import os | |
| import io | |
| from pathlib import Path | |
| from typing import Dict, List, Optional, Tuple, Union | |
| import numpy as np | |
| import pandas as pd | |
| from sklearn.preprocessing import StandardScaler, MinMaxScaler | |
| # ============================================================================= | |
| # Joint Definitions | |
| # ============================================================================= | |
# MoveNet keypoints in COCO order (17 keypoints).
MOVENET_KEYPOINTS = [
    'nose',
    'left_eye', 'right_eye',
    'left_ear', 'right_ear',
    'left_shoulder', 'right_shoulder',
    'left_elbow', 'right_elbow',
    'left_wrist', 'right_wrist',
    'left_hip', 'right_hip',
    'left_knee', 'right_knee',
    'left_ankle', 'right_ankle',
]

# Kinect joints (13 joints) - matches slide column order.
KINECT_JOINTS = [
    'head',
    'left_shoulder', 'left_elbow',
    'right_shoulder', 'right_elbow',
    'left_hand', 'right_hand',
    'left_hip', 'right_hip',
    'left_knee', 'right_knee',
    'left_foot', 'right_foot',
]

# Kinect joint name -> MoveNet keypoint name used as its 2D counterpart.
KINECT_TO_MOVENET = dict(
    head='nose',
    left_shoulder='left_shoulder',
    right_shoulder='right_shoulder',
    left_elbow='left_elbow',
    right_elbow='right_elbow',
    left_hand='left_wrist',
    right_hand='right_wrist',
    left_hip='left_hip',
    right_hip='right_hip',
    left_knee='left_knee',
    right_knee='right_knee',
    left_foot='left_ankle',
    right_foot='right_ankle',
)

# Feature counts derived from the number of Kinect joints.
N_KINECT_JOINTS = len(KINECT_JOINTS)   # 13 joints
N_INPUT = 2 * N_KINECT_JOINTS          # 26 features (PoseNet x,y)
N_OUTPUT_XY = 2 * N_KINECT_JOINTS      # 26 features (Kinect x,y) - Issue #40
N_OUTPUT_Z = N_KINECT_JOINTS           # 13 features (Kinect z only)
N_OUTPUT_XYZ = 3 * N_KINECT_JOINTS     # 39 features (Kinect x,y,z) - Issue #41
| # ============================================================================= | |
| # Data Loading Functions | |
| # ============================================================================= | |
def load_kinect_csv(filepath: Union[str, bytes, os.PathLike]) -> Dict[str, np.ndarray]:
    """
    Load a Kinect CSV file.

    Args:
        filepath: A path (str or os.PathLike), the raw CSV content as a
            bytes-like object, or any file-like object that
            ``pandas.read_csv`` can read (e.g. an uploaded-file handle).

    Returns:
        Dict with:
            'xy'     : (N, 26) Kinect x,y   (Issue #40 target)
            'z'      : (N, 13) Kinect z
            'xyz'    : (N, 39) Kinect x,y,z (Issue #41 target)
            'frames' : (N,) FrameNo values (int64) if the column exists, else
                       np.arange(N) as a fallback.

    Raises:
        KeyError: If any expected ``<joint>_x/_y/_z`` column is absent
            (raised by pandas during column selection).
    """
    # Only raw in-memory content needs wrapping; paths and file-like objects
    # are handed to pandas unchanged.
    if isinstance(filepath, (bytes, bytearray, memoryview)):
        source = io.BytesIO(filepath)
    else:
        source = filepath
    df = pd.read_csv(source)
    # Header cells may carry stray whitespace; normalise before lookup.
    df.columns = df.columns.str.strip()

    xy_cols, z_cols, xyz_cols = [], [], []
    for joint in KINECT_JOINTS:
        x_name, y_name, z_name = f"{joint}_x", f"{joint}_y", f"{joint}_z"
        xy_cols.extend([x_name, y_name])
        z_cols.append(z_name)
        xyz_cols.extend([x_name, y_name, z_name])

    if 'FrameNo' in df.columns:
        frames = df['FrameNo'].values.astype(np.int64)
    else:
        # No frame numbering in the file: fall back to row positions.
        frames = np.arange(len(df), dtype=np.int64)

    return {
        'xy': df[xy_cols].values.astype(np.float32),
        'z': df[z_cols].values.astype(np.float32),
        'xyz': df[xyz_cols].values.astype(np.float32),
        'frames': frames,
    }
def load_posenet_csv(
    filepath: str,
    frame_filter: Optional[np.ndarray] = None,
) -> np.ndarray:
    """
    Read a PoseNet/MoveNet CSV whose columns already use Kinect joint names.

    Expected columns (per slide spec):
        FrameNo, head_x, head_y, left_shoulder_x, left_shoulder_y, ...

    Args:
        filepath: PoseNet CSV path.
        frame_filter: Optional array of FrameNo values; when given and the CSV
            has a 'FrameNo' column, rows are selected by FrameNo in exactly
            this order. Ignored when the CSV has no 'FrameNo' column.

    Returns:
        float32 array of x,y values, two columns per Kinect joint
        ((N, 26) for the 13 joints).

    Raises:
        ValueError: If a requested FrameNo is not present in the CSV.
    """
    table = pd.read_csv(filepath)
    table.columns = table.columns.str.strip()

    wanted = [f"{joint}_{axis}" for joint in KINECT_JOINTS for axis in ('x', 'y')]

    can_align = frame_filter is not None and 'FrameNo' in table.columns
    if can_align:
        table = table.set_index('FrameNo')
        # Fail loudly rather than letting .loc raise on an unknown label.
        absent = [f for f in frame_filter if f not in table.index]
        if absent:
            raise ValueError(
                f"{len(absent)} FrameNo(s) missing from {filepath} "
                f"(first missing: {absent[:5]})"
            )
        table = table.loc[frame_filter]

    selected = table[wanted]
    return selected.values.astype(np.float32)
def load_movenet_raw_csv(filepath: str) -> np.ndarray:
    """
    Read a raw MoveNet CSV and keep only the keypoints mapped to Kinect joints.

    Columns are looked up as ``<movenet_keypoint>_x`` / ``<movenet_keypoint>_y``
    and emitted in KINECT_JOINTS order via the KINECT_TO_MOVENET mapping.

    Returns:
        float32 array with two columns (x, y) per Kinect joint.
    """
    raw = pd.read_csv(filepath)
    raw.columns = raw.columns.str.strip()

    source_names = (KINECT_TO_MOVENET[joint] for joint in KINECT_JOINTS)
    selected = [f"{name}_{axis}" for name in source_names for axis in ('x', 'y')]

    return raw[selected].values.astype(np.float32)
def load_paired_sequence(
    kinect_path: str,
    posenet_path: Optional[str] = None,
    simulate_posenet: bool = True,
    noise_std: float = 0.02,
    random_state: Optional[int] = None,
) -> Tuple[np.ndarray, Dict[str, np.ndarray]]:
    """
    Load one paired sequence: (PoseNet input, Kinect targets).

    If posenet_path is None and simulate_posenet=True, PoseNet input is
    synthesised from Kinect xy by adding gaussian noise. This allows the
    pipeline to be validated before real PoseNet CSVs are generated.

    Args:
        kinect_path: Kinect CSV path.
        posenet_path: Optional PoseNet CSV path; takes precedence over
            simulation when given.
        simulate_posenet: Allow synthesising the input when no PoseNet CSV
            is supplied.
        noise_std: Standard deviation of the gaussian noise used for
            simulation.
        random_state: Seed for the simulation noise generator.

    Returns:
        (X, kinect) where X is the PoseNet input array and kinect is the dict
        returned by load_kinect_csv.

    Raises:
        ValueError: If neither a PoseNet path nor simulation is available, or
            if the loaded PoseNet rows do not match the Kinect rows in count.
    """
    kinect = load_kinect_csv(kinect_path)
    if posenet_path is not None:
        # Align PoseNet to Kinect by FrameNo when both CSVs carry that column.
        X = load_posenet_csv(posenet_path, frame_filter=kinect['frames'])
        # The FrameNo filter is skipped when the PoseNet CSV has no FrameNo
        # column, and duplicate FrameNo values expand the selection; either
        # way the pair would be misaligned row-for-row, so refuse it here.
        n_kinect = len(kinect['xy'])
        if len(X) != n_kinect:
            raise ValueError(
                f"PoseNet/Kinect frame count mismatch: {posenet_path} yields "
                f"{len(X)} row(s) but {kinect_path} has {n_kinect}"
            )
    elif simulate_posenet:
        rng = np.random.default_rng(random_state)
        noise = rng.normal(0.0, noise_std, kinect['xy'].shape).astype(np.float32)
        X = kinect['xy'] + noise
    else:
        raise ValueError("Provide posenet_path or set simulate_posenet=True")
    return X, kinect
def load_all_paired_sequences(
    kinect_folder: str,
    posenet_folder: Optional[str] = None,
    simulate_posenet: bool = True,
    noise_std: float = 0.02,
    random_state: int = 42,
) -> Tuple[List[Tuple[np.ndarray, Dict]], List[str]]:
    """
    Load all paired (PoseNet, Kinect) sequences from a folder.

    Every ``*.csv`` in kinect_folder (sorted by name) is loaded. When
    posenet_folder is given, a PoseNet CSV with the same file name is used as
    input; if it is missing, the input is simulated (simulate_posenet=True)
    or an error is raised (simulate_posenet=False).

    Args:
        kinect_folder: Directory containing the Kinect CSVs.
        posenet_folder: Optional directory containing same-named PoseNet CSVs.
        simulate_posenet: Allow simulated input where no PoseNet CSV exists.
        noise_std: Noise standard deviation for simulated input.
        random_state: Base seed; file i uses ``random_state + i``.

    Returns:
        (sequences, file_names): a list of (X, kinect_targets) pairs and the
        matching list of CSV file names.

    Raises:
        ValueError: If a sequence has no PoseNet CSV and simulation is
            disabled.
    """
    sequences, file_names = [], []
    simulated_fallbacks = []
    csv_files = sorted(f for f in os.listdir(kinect_folder) if f.endswith('.csv'))
    print(f"Found {len(csv_files)} Kinect CSVs in {kinect_folder}")
    for i, name in enumerate(csv_files):
        k_path = os.path.join(kinect_folder, name)
        p_path = None
        if posenet_folder is not None:
            cand = os.path.join(posenet_folder, name)
            if os.path.exists(cand):
                p_path = cand
            elif simulate_posenet:
                # Remember the file so the mix of real and simulated input
                # is reported instead of happening silently.
                simulated_fallbacks.append(name)
            else:
                raise ValueError(
                    f"No PoseNet CSV for {name} in {posenet_folder} "
                    f"and simulate_posenet=False"
                )
        X, targets = load_paired_sequence(
            k_path,
            posenet_path=p_path,
            simulate_posenet=simulate_posenet,
            noise_std=noise_std,
            # Distinct seed per file so simulated noise differs across files.
            random_state=random_state + i,
        )
        sequences.append((X, targets))
        file_names.append(name)
    if simulated_fallbacks:
        print(
            f"Warning: {len(simulated_fallbacks)} sequence(s) had no PoseNet CSV in "
            f"{posenet_folder}; SIMULATED input was used for them "
            f"(first: {simulated_fallbacks[:5]})."
        )
    if posenet_folder is None and simulate_posenet:
        print(f"Note: using SIMULATED PoseNet input (Kinect xy + noise std={noise_std}).")
    return sequences, file_names
| # ============================================================================= | |
| # Preprocessing | |
| # ============================================================================= | |
def flatten_sequences(
    sequences: List[Tuple[np.ndarray, Dict]],
    output_type: str = 'xy',
) -> Tuple[np.ndarray, np.ndarray]:
    """
    Stack per-sequence arrays into one frame-level (X, Y) pair.

    Args:
        sequences: List of (X, targets) pairs, where targets is a dict keyed
            by output type.
        output_type: Which target to use: 'xy', 'z', or 'xyz'.

    Returns:
        (X, Y): inputs and selected targets concatenated along axis 0.

    Raises:
        ValueError: If output_type is not one of the supported values.
    """
    if output_type not in ('xy', 'z', 'xyz'):
        raise ValueError(f"output_type must be 'xy', 'z', or 'xyz'; got {output_type!r}")

    stacked_inputs = np.concatenate(
        tuple(pair[0] for pair in sequences), axis=0
    )
    stacked_targets = np.concatenate(
        tuple(pair[1][output_type] for pair in sequences), axis=0
    )
    return stacked_inputs, stacked_targets
def make_windowed_sequences(
    sequences: List[Tuple[np.ndarray, Dict]],
    window_size: int = 30,
    stride: int = 1,
    output_type: str = 'xy',
) -> Tuple[np.ndarray, np.ndarray]:
    """
    Create fixed-length windows for Conv1D/LSTM/GRU (returns full Y windows).

    Windows never cross sequence boundaries; sequences shorter than
    window_size contribute no windows.

    Args:
        sequences: List of (X, targets) pairs, where targets is a dict keyed
            by output type.
        window_size: Number of consecutive frames per window (>= 1).
        stride: Step between window start frames (>= 1).
        output_type: Which target to window: 'xy', 'z', or 'xyz'.

    Returns:
        (X_windows, Y_windows) as float32 arrays shaped
        (n_windows, window_size, features). When no window fits, the arrays
        keep that rank with n_windows == 0 (features taken from the first
        sequence, or 0 if there are no sequences).

    Raises:
        ValueError: On an unknown output_type or a non-positive window_size
            or stride.
    """
    if output_type not in ('xy', 'z', 'xyz'):
        raise ValueError(f"Invalid output_type: {output_type}")
    if window_size < 1:
        raise ValueError(f"window_size must be >= 1; got {window_size}")
    if stride < 1:
        raise ValueError(f"stride must be >= 1; got {stride}")

    X_list, Y_list = [], []
    # Trailing (per-frame) dimensions, used only to shape an empty result.
    x_tail, y_tail = None, None
    for X_seq, targets in sequences:
        Y_seq = targets[output_type]
        if x_tail is None:
            x_tail = tuple(np.shape(X_seq)[1:])
            y_tail = tuple(np.shape(Y_seq)[1:])
        n = len(X_seq)
        for start in range(0, n - window_size + 1, stride):
            stop = start + window_size
            X_list.append(X_seq[start:stop])
            Y_list.append(Y_seq[start:stop])

    if not X_list:
        # np.array([]) would collapse to shape (0,); keep the window rank so
        # downstream shape handling stays uniform.
        if x_tail is None:
            x_tail, y_tail = (0,), (0,)
        return (np.empty((0, window_size) + x_tail, dtype=np.float32),
                np.empty((0, window_size) + y_tail, dtype=np.float32))

    return (np.array(X_list, dtype=np.float32),
            np.array(Y_list, dtype=np.float32))
class DataNormalizer:
    """
    Paired input/output feature scaler.

    Uses StandardScaler for both sides when method == 'standard'; any other
    method value selects MinMaxScaler.
    """

    def __init__(self, method: str = 'standard'):
        scaler_cls = StandardScaler if method == 'standard' else MinMaxScaler
        self.method = method
        # Independent scaler instances: inputs and outputs are fitted separately.
        self.input_scaler = scaler_cls()
        self.output_scaler = scaler_cls()
        self._fitted = False

    def fit(self, X: np.ndarray, Y: np.ndarray):
        """Fit the input scaler on X and the output scaler on Y; return self."""
        self.input_scaler.fit(X)
        self.output_scaler.fit(Y)
        self._fitted = True
        return self

    def transform(self, X: np.ndarray, Y: np.ndarray = None):
        """
        Scale X (and Y when given) with the fitted scalers.

        Returns the scaled X alone when Y is None, otherwise a
        (scaled X, scaled Y) tuple; results are cast to float32.

        Raises:
            RuntimeError: If fit() has not been called yet.
        """
        if self._fitted:
            scaled_inputs = self.input_scaler.transform(X).astype(np.float32)
            if Y is not None:
                scaled_targets = self.output_scaler.transform(Y).astype(np.float32)
                return scaled_inputs, scaled_targets
            return scaled_inputs
        raise RuntimeError("Normalizer must be fitted before transform")

    def fit_transform(self, X: np.ndarray, Y: np.ndarray):
        """Fit on (X, Y), then return the scaled (X, Y) pair."""
        self.fit(X, Y)
        scaled_pair = self.transform(X, Y)
        return scaled_pair

    def inverse_transform_output(self, Y_norm: np.ndarray) -> np.ndarray:
        """Map scaled outputs back to the original output scale."""
        restored = self.output_scaler.inverse_transform(Y_norm)
        return restored
| # ============================================================================= | |
| # CV Utilities | |
| # ============================================================================= | |
def create_cv_splits(
    sequences: List,
    n_folds: int = 5,
    random_state: int = 42,
) -> List[Tuple[List[int], List[int]]]:
    """
    Sequence-level CV splits (~10 sequences per fold per instructions).

    Sequence indices are shuffled once with a seeded generator and cut into
    consecutive folds of ``len(sequences) // n_folds`` indices (minimum 1);
    the last fold also takes any remainder. If there are fewer sequences
    than folds, the trailing folds have an empty test list.

    Args:
        sequences: The sequences to split (only the length is used).
        n_folds: Number of folds (>= 1).
        random_state: Seed for the shuffle.

    Returns:
        One (train_indices, test_indices) pair per fold, as lists of plain
        Python ints.

    Raises:
        ValueError: If n_folds is less than 1.
    """
    if n_folds < 1:
        raise ValueError(f"n_folds must be >= 1; got {n_folds}")

    rng = np.random.default_rng(random_state)
    n = len(sequences)
    indices = np.arange(n)
    rng.shuffle(indices)
    # Plain ints keep train and test lists consistent with the annotation.
    order = indices.tolist()

    fold_size = max(1, n // n_folds)
    splits = []
    for fold in range(n_folds):
        start = fold * fold_size
        end = start + fold_size if fold < n_folds - 1 else n
        test_idx = order[start:end]
        held_out = set(test_idx)  # O(1) membership while filtering
        train_idx = [i for i in order if i not in held_out]
        splits.append((train_idx, test_idx))
    return splits
def get_fold_data(
    sequences: List[Tuple[np.ndarray, Dict]],
    train_indices: List[int],
    test_indices: List[int],
    output_type: str = 'xy',
) -> Tuple[np.ndarray, np.ndarray, np.ndarray, np.ndarray]:
    """
    Build flattened train/test arrays for one split.

    Args:
        sequences: List of (X, targets) pairs.
        train_indices: Positions in ``sequences`` used for training.
        test_indices: Positions in ``sequences`` used for testing.
        output_type: Target selector passed through to flatten_sequences.

    Returns:
        (X_train, Y_train, X_test, Y_test).
    """
    picked = {
        'train': [sequences[k] for k in train_indices],
        'test': [sequences[k] for k in test_indices],
    }
    train_inputs, train_targets = flatten_sequences(
        picked['train'], output_type=output_type
    )
    test_inputs, test_targets = flatten_sequences(
        picked['test'], output_type=output_type
    )
    return train_inputs, train_targets, test_inputs, test_targets
| # ============================================================================= | |
| # Main entry | |
| # ============================================================================= | |
def load_dataset(
    kinect_folder: str,
    posenet_folder: Optional[str] = None,
    simulate_posenet: bool = True,
    output_type: str = 'xy',
    normalize: bool = True,
    test_split: float = 0.2,
    random_state: int = 42,
    noise_std: float = 0.02,
) -> Dict:
    """
    Load full paired dataset and split train/test.

    Default (output_type='xy') implements Issue #40:
        Input  : PoseNet 2D (26)
        Output : Kinect 2D (26)

    The split is made at sequence level: sequence indices are permuted with a
    seeded generator and the first ``int(n * test_split)`` go to the test set.

    Args:
        kinect_folder: Directory containing the Kinect CSVs.
        posenet_folder: Optional directory with same-named PoseNet CSVs.
        simulate_posenet: Allow simulated PoseNet input.
        output_type: Target selector: 'xy', 'z', or 'xyz'.
        normalize: Fit a standard-score DataNormalizer on the training data
            and apply it to both train and test arrays.
        test_split: Fraction of sequences held out for testing.
        random_state: Seed for both loading (simulation) and the split.
        noise_std: Noise standard deviation for simulated input.

    Returns:
        Dict with the train/test arrays, the loaded sequences and file names,
        the split indices, the normalizer (None when normalize=False), the
        output type, and the input/output feature dimensions.

    Raises:
        ValueError: If no sequences are found, or if the split would leave
            the train or test set without any sequence.
    """
    sequences, file_names = load_all_paired_sequences(
        kinect_folder,
        posenet_folder=posenet_folder,
        simulate_posenet=simulate_posenet,
        noise_std=noise_std,
        random_state=random_state,
    )
    n = len(sequences)
    if n == 0:
        raise ValueError(f"No Kinect CSV sequences found in {kinect_folder}")

    rng = np.random.default_rng(random_state)
    indices = rng.permutation(n)
    n_test = int(n * test_split)
    # An empty side would otherwise fail inside np.concatenate with an
    # uninformative message; report the actual cause instead.
    if not 0 < n_test < n:
        raise ValueError(
            f"test_split={test_split} with {n} sequence(s) gives {n_test} test "
            f"sequence(s); train and test sets each need at least one sequence"
        )
    test_idx = indices[:n_test].tolist()
    train_idx = indices[n_test:].tolist()

    X_train, Y_train, X_test, Y_test = get_fold_data(
        sequences, train_idx, test_idx, output_type
    )

    normalizer = None
    if normalize:
        normalizer = DataNormalizer(method='standard')
        # Statistics come from the training data only; test data is just transformed.
        X_train, Y_train = normalizer.fit_transform(X_train, Y_train)
        X_test, Y_test = normalizer.transform(X_test, Y_test)

    return {
        'X_train': X_train, 'Y_train': Y_train,
        'X_test': X_test, 'Y_test': Y_test,
        'sequences': sequences, 'file_names': file_names,
        'train_indices': train_idx, 'test_indices': test_idx,
        'normalizer': normalizer,
        'output_type': output_type,
        'input_dim': X_train.shape[1],
        'output_dim': Y_train.shape[1],
    }
| # ============================================================================= | |
| # Demo | |
| # ============================================================================= | |
if __name__ == '__main__':
    # Demo: load the dataset from <repo root>/kinect_good_preprocessed with
    # simulated PoseNet input and report the resulting array shapes.
    REPO_ROOT = Path(__file__).parent.parent
    KINECT_PATH = REPO_ROOT.joinpath('kinect_good_preprocessed')

    if KINECT_PATH.exists():
        print("Loading paired dataset (Issue #40: PoseNet 2D -> Kinect 2D)...")
        data = load_dataset(
            str(KINECT_PATH),
            posenet_folder=None,
            simulate_posenet=True,
            output_type='xy',
            normalize=True,
        )

        print(f"\nInput dim: {data['input_dim']} (PoseNet x,y for 13 joints)")
        print(f"Output dim: {data['output_dim']} (Kinect x,y for 13 joints)")

        print(f"X_train: {data['X_train'].shape}")
        print(f"Y_train: {data['Y_train'].shape}")
        print(f"X_test : {data['X_test'].shape}")
        print(f"Y_test : {data['Y_test'].shape}")

        print(
            f"Sequences: total={len(data['sequences'])}, "
            f"train={len(data['train_indices'])}, test={len(data['test_indices'])}"
        )
    else:
        print(f"Kinect data not found at: {KINECT_PATH}")