#!/usr/bin/env python3
"""
Script to preprocess raw kinect data according to the specifications:
- Fixed size sequence with c=10 frames equidistantly distributed
- Mark as good/bad based on filenames (A1, G* = good; W* = bad)
- Average coordinates of surrounding frames (optional)
"""

import os
from pathlib import Path

import numpy as np
import pandas as pd

# Value written into column 0 of frames that exist only as padding.
PADDING_FRAME_ID = -1

# Number of leading columns that smoothing leaves untouched.
# NOTE(review): the original author assumed the first 3 columns are metadata
# (frame number, timestamp, ?) -- confirm against the raw file layout.
N_METADATA_COLUMNS = 3


def _split_fields(line):
    """Split one data line on commas if it has any, otherwise on whitespace."""
    if ',' in line:
        return [field.strip() for field in line.split(',')]
    return line.split()


def load_csv_data(csv_path) -> np.ndarray:
    """Load a numeric table from a text file and return it as a numpy array.

    Both comma-separated and whitespace-separated files are accepted, with or
    without a single header line. The first line is treated as a header when
    its first field cannot be parsed as a float. Blank lines are ignored.

    Args:
        csv_path: Path of the file to read.

    Returns:
        Array with one row per data line. A file that holds only a header
        yields an empty array.

    Raises:
        ValueError: If the file has no non-blank lines, or a data field is
            not numeric.
    """
    with open(csv_path, 'r') as f:
        lines = [stripped for stripped in (raw.strip() for raw in f) if stripped]

    if not lines:
        raise ValueError(f"No data found in {csv_path}")

    # Only the header probe is guarded: a malformed value further down must
    # surface as an error rather than silently dropping the first row.
    try:
        float(_split_fields(lines[0])[0])
    except ValueError:
        lines = lines[1:]

    rows = [[float(field) for field in _split_fields(line)] for line in lines]
    return np.array(rows)


def extract_frame_sequence(data, start_frame, end_frame):
    """Return the rows whose column 0 lies in [start_frame, end_frame]."""
    frame_ids = data[:, 0]
    in_range = (frame_ids >= start_frame) & (frame_ids <= end_frame)
    return data[in_range]


def select_equidistant_frames(sequence, c=10) -> np.ndarray:
    """Reduce or pad a frame sequence to exactly ``c`` frames.

    Sequences longer than ``c`` are sampled at ``c`` equidistant indices that
    always include the first and the last frame. Sequences with at most ``c``
    frames are kept whole and padded at the end with all-zero frames whose
    column 0 is set to ``PADDING_FRAME_ID`` (-1).

    Args:
        sequence: 2-D array-like, one row per frame.
        c: Number of frames in the result.

    Returns:
        Array with ``c`` rows.

    Raises:
        ValueError: If ``sequence`` contains no frames.
    """
    sequence = np.asarray(sequence)
    n_frames = len(sequence)
    if n_frames == 0:
        raise ValueError("Cannot select frames from an empty sequence")

    if n_frames <= c:
        padding = np.zeros((c - n_frames,) + sequence.shape[1:],
                           dtype=sequence.dtype)
        padding[:, 0] = PADDING_FRAME_ID
        return np.concatenate([sequence, padding])

    # dtype=int truncates the fractional positions to valid row indices.
    indices = np.linspace(0, n_frames - 1, c, dtype=int)
    return sequence[indices]


def average_surrounding_frames(frame_data, window_size=3):
    """Smooth coordinate columns with a centred moving average over frames.

    For every frame, each column from index ``N_METADATA_COLUMNS`` onwards is
    replaced by the mean of that column over a window of ``window_size``
    frames centred on the frame. The window is clipped at both ends of the
    sequence. The leading columns are copied unchanged.

    NOTE(review): padded frames (column 0 == -1) are treated like any other
    frame, so their zeros are averaged into neighbouring real frames.
    Confirm whether that is intended.

    Args:
        frame_data: 2-D array, one row per frame.
        window_size: Number of frames in the averaging window.

    Returns:
        A smoothed copy of ``frame_data``; the input itself is returned when
        it holds exactly one frame.
    """
    n_frames = len(frame_data)
    if n_frames == 1:
        return frame_data

    half_window = window_size // 2
    averaged_data = np.copy(frame_data)
    for i in range(n_frames):
        lo = max(0, i - half_window)
        hi = min(n_frames, i + half_window + 1)
        # One axis-0 mean handles every coordinate column of this frame.
        averaged_data[i, N_METADATA_COLUMNS:] = (
            frame_data[lo:hi, N_METADATA_COLUMNS:].mean(axis=0)
        )
    return averaged_data


def process_video_file(csv_path, start_stop_path=None, apply_averaging=False,
                       c=10):
    """Turn one raw recording into a fixed-length frame sequence.

    Args:
        csv_path: Path of the raw data file.
        start_stop_path: Optional path of a file holding two integers, the
            first and last frame number to keep. When it is missing or does
            not exist the whole recording is used.
        apply_averaging: Whether to smooth the selected frames.
        c: Number of frames in the result.

    Returns:
        Array with ``c`` rows.

    Raises:
        ValueError: If the file has no data rows or the start/stop window
            selects no frames.
    """
    raw_data = load_csv_data(csv_path)
    if raw_data.size == 0:
        raise ValueError(f"No data rows found in {csv_path}")

    if start_stop_path and os.path.exists(start_stop_path):
        with open(start_stop_path, 'r') as f:
            start_frame, stop_frame = map(int, f.read().strip().split())
        frame_sequence = extract_frame_sequence(raw_data, start_frame,
                                                stop_frame)
        if len(frame_sequence) == 0:
            raise ValueError(
                f"No frames between {start_frame} and {stop_frame} "
                f"in {csv_path}"
            )
    else:
        frame_sequence = raw_data

    equidistant_frames = select_equidistant_frames(frame_sequence, c=c)

    if apply_averaging:
        equidistant_frames = average_surrounding_frames(equidistant_frames)

    return equidistant_frames


def get_label_from_filename(filename):
    """Map a file name to its binary label.

    Returns 1 (good) when the stem starts with 'G' or equals 'A1', and
    0 (bad) when it starts with 'W'.

    Raises:
        ValueError: If the stem matches neither rule.
    """
    stem = Path(filename).stem
    if stem == 'A1' or stem.startswith('G'):
        return 1  # Good
    if stem.startswith('W'):
        return 0  # Bad
    raise ValueError(f"Unknown file type for {filename}")


def process_folder(base_dir, output_dir):
    """Process every ``*.csv`` recording directly inside ``base_dir``.

    Files are visited in sorted order so that the result, and any seeded
    split built from it, is reproducible across filesystems. A file that
    fails to process is reported on stdout and skipped.

    Args:
        base_dir: Directory holding the raw files.
        output_dir: Unused; kept so existing callers keep working.

    Returns:
        Tuple ``(sequences, labels, filenames)`` of three parallel lists:
        processed arrays, integer labels and file stems.
    """
    base_dir = Path(base_dir)
    csv_files = sorted(base_dir.glob("*.csv"))

    processed_sequences = []
    labels = []
    filenames = []

    print(f"Processing video files in {base_dir}...")

    for csv_file in csv_files:
        # Test the file name only, so a parent directory that happens to
        # contain ".json" cannot cause every file to be skipped.
        if '.new_json' in csv_file.name or '.json' in csv_file.name:
            continue

        start_stop_file = csv_file.with_suffix('.start_stop_frames')

        # Per-file boundary: one broken recording must not abort the batch.
        try:
            processed_data = process_video_file(
                csv_file,
                start_stop_file if start_stop_file.exists() else None,
                apply_averaging=True,
            )
            label = get_label_from_filename(csv_file.name)
        except Exception as e:
            print(f"Error processing {csv_file.name}: {e}")
            continue

        processed_sequences.append(processed_data)
        labels.append(label)
        filenames.append(csv_file.stem)
        print(f"Processed {csv_file.name}: shape {processed_data.shape}, label {label}")

    return processed_sequences, labels, filenames


def _write_inspection_csv(sequences, labels, filenames, csv_path):
    """Write one flattened row per sequence, prefixed by filename and label."""
    n_samples, n_frames, n_features = sequences.shape
    df = pd.DataFrame(sequences.reshape(n_samples, n_frames * n_features))
    df.insert(0, 'label', labels)
    df.insert(0, 'filename', filenames)
    df.to_csv(csv_path, index=False)


def main():
    """Preprocess each raw-data directory and save train/test splits."""
    # Imported here because only the split needs scikit-learn; the
    # preprocessing helpers above stay importable without it.
    from sklearn.model_selection import train_test_split

    base_dirs = [
        Path("Data-intensive-systems/A13/Raw_data/Good vs Bad"),
        Path("Data-intensive-systems/A13/Raw_data/kinect_good_vs_bad_not_preprocessed")
    ]

    output_dir = Path("Data-intensive-systems/A13/Processed_Data")
    # parents=True: the output tree may not exist yet on a fresh checkout.
    output_dir.mkdir(parents=True, exist_ok=True)

    for i, base_dir in enumerate(base_dirs):
        if not base_dir.exists():
            print(f"\nSkipping directory {i+1}: {base_dir} (not found)")
            continue

        print(f"\nProcessing directory {i+1}: {base_dir}")
        sequences, labels, filenames = process_folder(base_dir, output_dir)

        if not sequences:
            print(f"No files were processed successfully in {base_dir}.")
            continue

        processed_sequences = np.array(sequences)
        labels = np.array(labels)

        print(f"Final dataset shape: {processed_sequences.shape}")
        print(f"Labels shape: {labels.shape}")
        print(f"Number of good sequences: {np.sum(labels == 1)}")
        print(f"Number of bad sequences: {np.sum(labels == 0)}")

        # Stratified so both splits keep the good/bad ratio.
        X_train, X_test, y_train, y_test, filenames_train, filenames_test = train_test_split(
            processed_sequences, labels, filenames,
            test_size=0.2, random_state=42, stratify=labels
        )

        print(f"Training set shape: {X_train.shape}")
        print(f"Test set shape: {X_test.shape}")
        print(f"Training good sequences: {np.sum(y_train == 1)}")
        print(f"Training bad sequences: {np.sum(y_train == 0)}")
        print(f"Test good sequences: {np.sum(y_test == 1)}")
        print(f"Test bad sequences: {np.sum(y_test == 0)}")

        # File-system friendly tag derived from the directory name.
        dir_name = base_dir.name.replace(' ', '_').replace('-', '_')

        np.save(output_dir / f"sequences_{dir_name}_train.npy", X_train)
        np.save(output_dir / f"sequences_{dir_name}_test.npy", X_test)
        np.save(output_dir / f"labels_{dir_name}_train.npy", y_train)
        np.save(output_dir / f"labels_{dir_name}_test.npy", y_test)

        # CSV copies for easier inspection: one flattened row per sequence.
        _write_inspection_csv(
            X_train, y_train, filenames_train,
            output_dir / f"processed_sequences_{dir_name}_train.csv",
        )
        _write_inspection_csv(
            X_test, y_test, filenames_test,
            output_dir / f"processed_sequences_{dir_name}_test.csv",
        )

        print(f"\nProcessed data for {base_dir.name} saved to {output_dir}/")
        print(f"- sequences_{dir_name}_train.npy: Numpy array of shape (n_train, 10, n_features) for training")
        print(f"- sequences_{dir_name}_test.npy: Numpy array of shape (n_test, 10, n_features) for testing")
        print(f"- labels_{dir_name}_train.npy: Numpy array of shape (n_train,) with binary labels for training")
        print(f"- labels_{dir_name}_test.npy: Numpy array of shape (n_test,) with binary labels for testing")
        print(f"- processed_sequences_{dir_name}_train.csv: CSV file with training labels and filenames")
        print(f"- processed_sequences_{dir_name}_test.csv: CSV file with test labels and filenames")


if __name__ == "__main__":
    main()