pose-deep-learning / A13 /preprocess_raw_data.py
Bachstelze
init A13 data
b94b2ad
#!/usr/bin/env python3
"""
Script to preprocess raw kinect data according to the specifications:
- Fixed size sequence with c=10 frames equidistantly distributed
- Mark as good/bad based on filenames (A1, G* = good; W* = bad)
- Average coordinates of surrounding frames (optional)
"""
import os
import numpy as np
import pandas as pd
from pathlib import Path
from sklearn.model_selection import train_test_split
def _parse_numeric_row(line):
    """Convert one text line into a list of floats.

    A line containing a comma is split on commas; any other line is split on
    runs of whitespace.

    Raises:
        ValueError: if any field cannot be converted to float.
    """
    fields = line.split(',') if ',' in line else line.split()
    return [float(field) for field in fields]


def load_csv_data(csv_path):
    """Load a numeric table from a text file and return it as a numpy array.

    Each non-blank line is one row. Rows may be comma-separated or
    whitespace-separated, and the first non-blank line may be a header: it is
    treated as a header (and skipped) exactly when it cannot be parsed as a
    row of numbers.

    Args:
        csv_path: Path of the file to read.

    Returns:
        A float array with one row per data line. When the file holds no data
        rows (empty file or header only) an empty 1-D array is returned.

    Raises:
        ValueError: if a data row contains a non-numeric field or the rows
            have differing numbers of fields.
    """
    with open(csv_path, 'r') as f:
        # Blank lines carry no data and would otherwise become empty, ragged rows.
        lines = [stripped for stripped in (line.strip() for line in f) if stripped]

    if not lines:
        return np.array([], dtype=float)

    # Only the first line is allowed to fail parsing; a failure there means
    # "header", while a failure on any later line is a genuine data error.
    try:
        rows = [_parse_numeric_row(lines[0])]
    except ValueError:
        rows = []

    rows.extend(_parse_numeric_row(line) for line in lines[1:])
    # dtype=float makes ragged input fail loudly instead of yielding an object array.
    return np.array(rows, dtype=float)
def extract_frame_sequence(data, start_frame, end_frame):
    """Return the rows of data whose first column lies in [start_frame, end_frame].

    Column 0 is compared against both bounds, and both bounds are inclusive.
    """
    frame_ids = data[:, 0]
    not_too_early = frame_ids >= start_frame
    not_too_late = frame_ids <= end_frame
    return data[not_too_early & not_too_late]
def select_equidistant_frames(sequence, c=10):
    """Return exactly c frames taken from sequence.

    If the sequence has more than c frames, c rows are picked at evenly spaced
    positions between the first and the last row (both included). If it has c
    frames or fewer, all rows are kept in order and the result is padded to c
    rows with all-zero frames whose first column is set to -1 as a padding
    marker.

    Args:
        sequence: 2-D array-like with one row per frame.
        c: Number of frames in the result.

    Returns:
        A new array with c rows; the input is never modified.

    Raises:
        ValueError: if sequence contains no frames (there is nothing to select
            and no frame width to pad with).
    """
    frames = np.asarray(sequence)
    n_frames = len(frames)
    if n_frames == 0:
        raise ValueError("Cannot select frames from an empty sequence")

    if n_frames > c:
        # Truncating the evenly spaced positions to int gives the row indices.
        indices = np.linspace(0, n_frames - 1, c, dtype=int)
        return frames[indices]

    if n_frames == c:
        return frames.copy()

    # Too short: append zero frames, flagged with -1 in the first column.
    padding = np.zeros((c - n_frames,) + frames.shape[1:], dtype=frames.dtype)
    padding[:, 0] = -1
    return np.concatenate([frames, padding])
def average_surrounding_frames(frame_data, window_size=3, coord_start=3):
    """Smooth coordinate columns with a centred moving average over frames.

    For every frame i, each column from coord_start onwards is replaced by the
    mean of that column over frames i - window_size // 2 to
    i + window_size // 2 (inclusive). The window is clipped at the first and
    last frame, so edge frames are averaged over fewer neighbours. Columns
    before coord_start are copied through unchanged.

    Args:
        frame_data: 2-D array with one row per frame.
        window_size: Nominal window width in frames.
        coord_start: Index of the first column to average. The default of 3
            assumes the first three columns are metadata rather than
            coordinates -- TODO confirm against the raw file layout.

    Returns:
        A new array of the same shape; frame_data itself is never modified.

    Raises:
        ValueError: if window_size is negative (the window would be empty).
    """
    if window_size < 0:
        raise ValueError(f"window_size must be non-negative, got {window_size}")

    averaged_data = np.copy(frame_data)
    n_frames = len(frame_data)
    if n_frames <= 1:
        # Nothing to average with; still return a copy for consistent aliasing.
        return averaged_data

    half_window = window_size // 2
    for i in range(n_frames):
        lo = max(0, i - half_window)
        hi = min(n_frames, i + half_window + 1)
        # Always read from the untouched input so earlier results do not leak
        # into later windows; all coordinate columns are averaged in one call.
        averaged_data[i, coord_start:] = frame_data[lo:hi, coord_start:].mean(axis=0)
    return averaged_data
def process_video_file(csv_path, start_stop_path=None, apply_averaging=False, c=10):
    """Turn one raw recording into a fixed-length sequence of c frames.

    Steps: load the raw table, optionally restrict it to the frame range given
    in the start/stop file, pick c equidistant frames (zero-padding short
    recordings), and optionally smooth the selected frames.

    Args:
        csv_path: Path of the raw data file.
        start_stop_path: Optional path of a text file holding two integers,
            the first and last frame number to keep (whitespace- or
            comma-separated). Ignored when None or when the file is missing.
        apply_averaging: If True, smooth the selected frames with
            average_surrounding_frames.
        c: Number of frames in the returned sequence.

    Returns:
        Array with c rows, one per selected (or padded) frame.

    Raises:
        ValueError: if the start/stop file does not contain exactly two
            integers.
    """
    raw_data = load_csv_data(csv_path)

    # Without a usable start/stop file the whole recording is used.
    frame_sequence = raw_data
    if start_stop_path and os.path.exists(start_stop_path):
        with open(start_stop_path, 'r') as f:
            bounds = f.read().replace(',', ' ').split()
        if len(bounds) != 2:
            raise ValueError(
                f"Expected start and stop frame in {start_stop_path}, found {len(bounds)} value(s)"
            )
        start_frame, stop_frame = map(int, bounds)
        frame_sequence = extract_frame_sequence(raw_data, start_frame, stop_frame)

    equidistant_frames = select_equidistant_frames(frame_sequence, c=c)

    if apply_averaging:
        # Real frames come first and padding rows (if any) follow. Smooth only
        # the real frames so zero padding is neither averaged into real
        # coordinates nor turned into non-zero values itself.
        # NOTE(review): smoothing runs on the already subsampled frames, not on
        # the raw neighbours of each selected frame -- confirm this is intended.
        n_real = min(len(frame_sequence), c)
        smoothed = average_surrounding_frames(equidistant_frames[:n_real])
        equidistant_frames = np.concatenate([smoothed, equidistant_frames[n_real:]])

    return equidistant_frames
def get_label_from_filename(filename):
    """Map a recording's file name to its binary class label.

    The name without directory and final suffix decides the label:
    exactly 'A1' or anything starting with 'G' gives 1 (good); anything
    starting with 'W' gives 0 (bad).

    Raises:
        ValueError: if the name matches none of these patterns.
    """
    stem = Path(filename).stem

    is_good = stem == 'A1' or stem.startswith('G')
    if is_good:
        return 1  # Good

    if stem.startswith('W'):
        return 0  # Bad

    raise ValueError(f"Unknown file type for {filename}")
def process_folder(base_dir, output_dir):
    """Process every recording (*.csv) directly inside base_dir.

    Files are handled in sorted name order so the returned lists -- and any
    seeded shuffle or split applied to them later -- do not depend on the
    file system's listing order. A file that fails to load, or whose name
    does not map to a label, is reported on stdout and skipped.

    Args:
        base_dir: Directory containing the raw files (str or Path).
        output_dir: Unused; kept so existing callers keep working.

    Returns:
        Tuple (processed_sequences, labels, filenames) of three parallel
        lists: the processed array, the 0/1 label and the file stem of each
        successfully processed recording.
    """
    base_dir = Path(base_dir)
    # glob() yields files in arbitrary order; sort for reproducibility.
    csv_files = sorted(base_dir.glob("*.csv"))

    processed_sequences = []
    labels = []
    filenames = []

    print(f"Processing video files in {base_dir}...")
    for csv_file in csv_files:
        # Skip JSON-derived exports. Test the file name only, so a directory
        # whose path happens to contain ".json" does not hide every file.
        if '.new_json' in csv_file.name or '.json' in csv_file.name:
            continue

        # A sibling "<stem>.start_stop_frames" file, if present, limits the frame range.
        start_stop_file = csv_file.with_suffix('.start_stop_frames')

        try:
            # Resolve the label first: it is cheap and rejects unknown files
            # before any parsing work is done.
            label = get_label_from_filename(csv_file.name)
            processed_data = process_video_file(
                csv_file,
                start_stop_file if start_stop_file.exists() else None,
                apply_averaging=True,
            )
        except Exception as e:
            # Best-effort batch run: report the bad file and carry on.
            print(f"Error processing {csv_file.name}: {str(e)}")
            continue

        processed_sequences.append(processed_data)
        labels.append(label)
        filenames.append(csv_file.stem)
        print(f"Processed {csv_file.name}: shape {processed_data.shape}, label {label}")

    return processed_sequences, labels, filenames
def _export_split(output_dir, dir_name, split, X, y, names):
    """Write one data split as .npy arrays plus a flat CSV for inspection.

    Args:
        output_dir: Directory (Path) the files are written to.
        dir_name: Sanitised name of the source directory, used in file names.
        split: Split name used in file names, e.g. "train" or "test".
        X: Array of shape (n_samples, n_frames, n_features).
        y: Array of shape (n_samples,) with the labels.
        names: Sequence of n_samples source file stems.
    """
    np.save(output_dir / f"sequences_{dir_name}_{split}.npy", X)
    np.save(output_dir / f"labels_{dir_name}_{split}.npy", y)

    # One CSV row per sample: filename, label, then all frames flattened.
    n_samples, n_frames, n_features = X.shape
    df = pd.DataFrame(X.reshape(n_samples, n_frames * n_features))
    df.insert(0, 'label', y)
    df.insert(0, 'filename', names)
    df.to_csv(output_dir / f"processed_sequences_{dir_name}_{split}.csv", index=False)


def main():
    """Preprocess every configured raw-data directory and save train/test splits.

    For each input directory that exists, all recordings are processed, split
    80/20 into train and test sets (seeded, stratified by label when
    possible) and written to the output directory as .npy and .csv files.
    Input and output paths are relative to the current working directory.
    """
    # Define paths
    base_dirs = [
        Path("Data-intensive-systems/A13/Raw_data/Good vs Bad"),
        Path("Data-intensive-systems/A13/Raw_data/kinect_good_vs_bad_not_preprocessed")
    ]
    output_dir = Path("Data-intensive-systems/A13/Processed_Data")
    # parents=True: the output tree may not exist yet on a fresh checkout.
    output_dir.mkdir(parents=True, exist_ok=True)

    for i, base_dir in enumerate(base_dirs):
        if not base_dir.exists():
            print(f"\nSkipping directory {i+1}: {base_dir} (not found)")
            continue

        print(f"\nProcessing directory {i+1}: {base_dir}")
        sequences, labels, filenames = process_folder(base_dir, output_dir)
        if not sequences:
            print(f"No files were processed successfully in {base_dir}.")
            continue

        # Convert to numpy arrays
        processed_sequences = np.array(sequences)
        labels = np.array(labels)
        print(f"Final dataset shape: {processed_sequences.shape}")
        print(f"Labels shape: {labels.shape}")
        print(f"Number of good sequences: {np.sum(labels == 1)}")
        print(f"Number of bad sequences: {np.sum(labels == 0)}")

        # Stratification needs at least two samples of every class present;
        # otherwise train_test_split raises, so fall back to a plain split.
        _, class_counts = np.unique(labels, return_counts=True)
        stratify = labels if class_counts.min() >= 2 else None
        if stratify is None:
            print("Warning: a class has fewer than 2 samples; splitting without stratification.")

        # Split the data into train and test sets
        X_train, X_test, y_train, y_test, filenames_train, filenames_test = train_test_split(
            processed_sequences, labels, filenames,
            test_size=0.2, random_state=42, stratify=stratify
        )
        print(f"Training set shape: {X_train.shape}")
        print(f"Test set shape: {X_test.shape}")
        print(f"Training good sequences: {np.sum(y_train == 1)}")
        print(f"Training bad sequences: {np.sum(y_train == 0)}")
        print(f"Test good sequences: {np.sum(y_test == 1)}")
        print(f"Test bad sequences: {np.sum(y_test == 0)}")

        # Directory name made safe for use inside file names
        dir_name = base_dir.name.replace(' ', '_').replace('-', '_')

        _export_split(output_dir, dir_name, "train", X_train, y_train, filenames_train)
        _export_split(output_dir, dir_name, "test", X_test, y_test, filenames_test)

        print(f"\nProcessed data for {base_dir.name} saved to {output_dir}/")
        print(f"- sequences_{dir_name}_train.npy: Numpy array of shape (n_train, 10, n_features) for training")
        print(f"- sequences_{dir_name}_test.npy: Numpy array of shape (n_test, 10, n_features) for testing")
        print(f"- labels_{dir_name}_train.npy: Numpy array of shape (n_train,) with binary labels for training")
        print(f"- labels_{dir_name}_test.npy: Numpy array of shape (n_test,) with binary labels for testing")
        print(f"- processed_sequences_{dir_name}_train.csv: CSV file with training labels and filenames")
        print(f"- processed_sequences_{dir_name}_test.csv: CSV file with test labels and filenames")
# Run the preprocessing pipeline only when this file is executed as a script,
# not when it is imported as a module.
if __name__ == "__main__":
    main()