Spaces:
Running
Running
| #!/usr/bin/env python3 | |
| """ | |
| Dataset Augmentation Script for Processed Skeleton-based Classification Data | |
| This script applies the following augmentations to the processed classification data: | |
| 1. Mirror on y-axis (flip left/right) | |
| 2. Rotate on y-axis by a few degrees | |
| 3. Stretch/compress a few % in x, y, z axes | |
| The script only augments original datapoints, not generated ones. | |
| Usage: | |
| python3 augment_processed_data.py --input <input_csv> --output <output_csv> | |
| """ | |
| import argparse | |
| import pandas as pd | |
| import numpy as np | |
| from typing import List | |
| def get_coordinate_indices(df: pd.DataFrame) -> List[int]: | |
| """ | |
| Get indices for coordinate values in the dataframe. | |
| The processed data has 1020 features per row (10 frames x 102 features), | |
| preceded by 'filename' and 'label' columns. | |
| Args: | |
| df: Input dataframe | |
| Returns: | |
| List of indices corresponding to coordinate values | |
| """ | |
| # Skip the first 2 columns (filename, label) to get to the coordinate data | |
| start_idx = 2 | |
| end_idx = min(len(df.columns), 1022) # 2 (filename, label) + 1020 (features) | |
| return list(range(start_idx, end_idx)) | |
| def get_frame_indices() -> List[List[int]]: | |
| """ | |
| Get the indices for each frame in the sequence. | |
| Each sequence has 10 frames with 102 features per frame. | |
| Returns: | |
| List of lists, where each inner list contains the indices for one frame | |
| """ | |
| frame_indices = [] | |
| # Start from index 2 to skip filename and label columns | |
| for frame_idx in range(10): # 10 frames per sequence | |
| start = 2 + (frame_idx * 102) # Skip filename and label (indices 0, 1) | |
| end = 2 + ((frame_idx + 1) * 102) | |
| frame_indices.append(list(range(start, end))) | |
| return frame_indices | |
| def identify_original_samples(df: pd.DataFrame) -> pd.Series: | |
| """ | |
| Identify original samples (not augmented ones) based on filename patterns. | |
| Args: | |
| df: Input dataframe with 'filename' column | |
| Returns: | |
| Boolean Series indicating which rows are original samples | |
| """ | |
| # Original samples have simple names like G01, W01, A1, etc. | |
| # Augmented samples would have suffixes like _mirror, _rotate, etc. | |
| original_mask = ~df['filename'].str.contains(r'_mirror|_rotate|_stretch|_neg', na=False) | |
| return original_mask | |
| def mirror_on_y_axis(df: pd.DataFrame, coord_indices: List[int]) -> pd.DataFrame: | |
| """ | |
| Mirror the skeleton on the y-axis by flipping x-coordinates. | |
| This assumes coordinates are arranged in x, y, z groups throughout the sequence. | |
| Args: | |
| df: Input dataframe | |
| coord_indices: List of indices for coordinate values | |
| Returns: | |
| Mirrored dataframe | |
| """ | |
| df_augmented = df.copy() | |
| # In skeleton data, coordinates typically follow an x, y, z pattern | |
| # So every third coordinate starting from the first coordinate is an x-value | |
| # Since we start from index 2 (after filename and label), the first coordinate is at index 2 | |
| # Then we have x, y, z at indices 2, 3, 4; then x, y, z at indices 5, 6, 7; etc. | |
| # Find x-coordinate positions (every third index starting from the first coordinate position) | |
| for i in range(0, len(coord_indices), 3): # Every third coordinate is x | |
| x_idx = coord_indices[i] | |
| if x_idx < df.shape[1]: | |
| df_augmented.iloc[:, x_idx] = -df.iloc[:, x_idx] | |
| return df_augmented | |
| def rotate_on_y_axis(df: pd.DataFrame, frame_indices: List[List[int]], | |
| angle_deg: float) -> pd.DataFrame: | |
| """ | |
| Rotate the skeleton around the y-axis by a given angle. | |
| This assumes coordinates are arranged in x, y, z groups. | |
| Args: | |
| df: Input dataframe | |
| frame_indices: List of indices for each frame | |
| angle_deg: Rotation angle in degrees (positive = counter-clockwise) | |
| Returns: | |
| Rotated dataframe | |
| """ | |
| df_augmented = df.copy() | |
| angle_rad = np.radians(angle_deg) | |
| cos_a = np.cos(angle_rad) | |
| sin_a = np.sin(angle_rad) | |
| # Rotation matrix for y-axis: | |
| # x' = x*cos(θ) + z*sin(θ) | |
| # y' = y | |
| # z' = -x*sin(θ) + z*cos(θ) | |
| # Apply rotation to each frame | |
| for frame_idx_list in frame_indices: | |
| # Process every group of 3 coordinates (x, y, z) in this frame | |
| for i in range(0, len(frame_idx_list), 3): | |
| if i + 2 < len(frame_idx_list): # Ensure we have x, y, z indices | |
| x_idx = frame_idx_list[i] | |
| y_idx = frame_idx_list[i + 1] | |
| z_idx = frame_idx_list[i + 2] | |
| if x_idx < df.shape[1] and y_idx < df.shape[1] and z_idx < df.shape[1]: | |
| # Store original values | |
| x_orig = df.iloc[:, x_idx].values | |
| y_orig = df.iloc[:, y_idx].values | |
| z_orig = df.iloc[:, z_idx].values | |
| # Apply rotation | |
| df_augmented.iloc[:, x_idx] = x_orig * cos_a + z_orig * sin_a | |
| df_augmented.iloc[:, z_idx] = -x_orig * sin_a + z_orig * cos_a | |
| # y remains unchanged | |
| return df_augmented | |
| def stretch_compress(df: pd.DataFrame, frame_indices: List[List[int]], | |
| scale_x: float, scale_y: float, scale_z: float) -> pd.DataFrame: | |
| """ | |
| Apply scaling/stretching to the skeleton data. | |
| This assumes coordinates are arranged in x, y, z groups. | |
| Args: | |
| df: Input dataframe | |
| frame_indices: List of indices for each frame | |
| scale_x: Scale factor for x-axis (e.g., 1.05 = 5% stretch) | |
| scale_y: Scale factor for y-axis | |
| scale_z: Scale factor for z-axis | |
| Returns: | |
| Scaled dataframe | |
| """ | |
| df_augmented = df.copy() | |
| # Apply scaling to each frame | |
| for frame_idx_list in frame_indices: | |
| # Process every group of 3 coordinates (x, y, z) in this frame | |
| for i in range(0, len(frame_idx_list), 3): | |
| if i + 2 < len(frame_idx_list): # Ensure we have x, y, z indices | |
| x_idx = frame_idx_list[i] | |
| y_idx = frame_idx_list[i + 1] | |
| z_idx = frame_idx_list[i + 2] | |
| if x_idx < df.shape[1]: | |
| df_augmented.iloc[:, x_idx] *= scale_x | |
| if y_idx < df.shape[1]: | |
| df_augmented.iloc[:, y_idx] *= scale_y | |
| if z_idx < df.shape[1]: | |
| df_augmented.iloc[:, z_idx] *= scale_z | |
| return df_augmented | |
| def generate_augmented_dataset(input_file: str, output_file: str) -> None: | |
| """ | |
| Generate an augmented dataset from the input file. | |
| Args: | |
| input_file: Path to input CSV file | |
| output_file: Path to output CSV file | |
| """ | |
| print(f"Loading data from {input_file}...") | |
| df = pd.read_csv(input_file) | |
| print(f"Loaded {len(df)} samples with {len(df.columns)} columns") | |
| # Identify original samples only (not previously augmented ones) | |
| original_mask = identify_original_samples(df) | |
| df_original = df[original_mask].copy() | |
| print(f"Found {len(df_original)} original samples to augment") | |
| # Get coordinate indices and frame structure | |
| coord_indices = get_coordinate_indices(df_original) | |
| frame_indices = get_frame_indices() | |
| # Define augmentation configurations | |
| # 1. Mirror on y-axis | |
| print("\n1. Applying mirror on y-axis...") | |
| df_mirror = mirror_on_y_axis(df_original.copy(), coord_indices) | |
| df_mirror['filename'] = df_original['filename'].astype(str) + '_mirror' | |
| # 2. Rotate on y-axis by +10 degrees | |
| print("2. Applying y-axis rotation (+10 degrees)...") | |
| df_rotate_pos = rotate_on_y_axis(df_original.copy(), frame_indices, 10) | |
| df_rotate_pos['filename'] = df_original['filename'].astype(str) + '_rotate_pos' | |
| # 3. Rotate on y-axis by -10 degrees | |
| print("3. Applying y-axis rotation (-10 degrees)...") | |
| df_rotate_neg = rotate_on_y_axis(df_original.copy(), frame_indices, -10) | |
| df_rotate_neg['filename'] = df_original['filename'].astype(str) + '_rotate_neg' | |
| # 4. Stretch/compress in x, y, z axes | |
| print("4. Applying stretch/compress (x: +5%, y: -5%, z: +2%)...") | |
| df_stretch = stretch_compress(df_original.copy(), frame_indices, 1.05, 0.95, 1.02) | |
| df_stretch['filename'] = df_original['filename'].astype(str) + '_stretch' | |
| # Combine all augmented data with original | |
| df_combined = pd.concat([ | |
| df_original, # Original | |
| df_mirror, # Mirror | |
| df_rotate_pos, # Rotate +10 | |
| df_rotate_neg, # Rotate -10 | |
| df_stretch # Stretch | |
| ], ignore_index=True) | |
| print(f"\n=== Summary ===") | |
| print(f"Original samples: {len(df_original)}") | |
| print(f"Mirror samples: {len(df_mirror)}") | |
| print(f"Rotate +10 samples: {len(df_rotate_pos)}") | |
| print(f"Rotate -10 samples: {len(df_rotate_neg)}") | |
| print(f"Stretch samples: {len(df_stretch)}") | |
| print(f"Total samples: {len(df_combined)}") | |
| # Save to CSV | |
| print(f"\nSaving to {output_file}...") | |
| df_combined.to_csv(output_file, index=False) | |
| print("Done!") | |
| def main(): | |
| parser = argparse.ArgumentParser(description='Dataset Augmentation for Processed Skeleton Data') | |
| parser.add_argument('--input', type=str, required=True, | |
| help='Input CSV file path') | |
| parser.add_argument('--output', type=str, required=True, | |
| help='Output CSV file path') | |
| args = parser.parse_args() | |
| generate_augmented_dataset( | |
| input_file=args.input, | |
| output_file=args.output | |
| ) | |
| if __name__ == '__main__': | |
| main() | |