#!/usr/bin/env python3
"""
Script to preprocess raw kinect data according to the specifications:
- Fixed size sequence with c=10 frames equidistantly distributed
- Mark as good/bad based on filenames (A1, G* = good; W* = bad)
- Average coordinates of surrounding frames (optional)
"""
import os
import numpy as np
import pandas as pd
from pathlib import Path
from sklearn.model_selection import train_test_split
def load_csv_data(csv_path):
    """Load a numeric table from a text file and return it as a numpy array.

    Each non-blank line is one row. A line containing a comma is split on
    commas; any other line is split on whitespace. If the first field of the
    first non-blank line cannot be parsed as a float, that line is treated as
    a header and skipped.

    Args:
        csv_path: Path of the file to read.

    Returns:
        ``np.ndarray`` of floats with one row per data line. A file that
        holds only a header yields an empty array.

    Raises:
        ValueError: If the file has no non-blank lines, or if a data field
            is not a valid float.
    """
    def split_fields(line):
        # The delimiter is chosen per line, so a comma-separated file is
        # recognised whether or not it starts with a header row.
        return line.split(',') if ',' in line else line.split()

    with open(csv_path, 'r') as f:
        # Blank lines (typically a trailing newline) would otherwise become
        # empty rows and make the resulting array ragged.
        lines = [stripped for stripped in (raw.strip() for raw in f) if stripped]

    if not lines:
        raise ValueError(f"No data found in {csv_path}")

    try:
        float(split_fields(lines[0])[0])
    except ValueError:
        # Non-numeric first field: assume a header row and drop it.
        lines = lines[1:]

    return np.array([[float(field) for field in split_fields(line)] for line in lines])
def extract_frame_sequence(data, start_frame, end_frame):
    """Return the rows of ``data`` whose first column lies in [start_frame, end_frame].

    Both bounds are inclusive. Column 0 of ``data`` is compared against the
    bounds; rows keep their original order.
    """
    frame_numbers = data[:, 0]
    not_too_early = frame_numbers >= start_frame
    not_too_late = frame_numbers <= end_frame
    return data[np.logical_and(not_too_early, not_too_late)]
def select_equidistant_frames(sequence, c=10):
    """Return exactly ``c`` frames taken from ``sequence``.

    If the sequence has more than ``c`` frames, ``c`` rows are picked at
    evenly spaced positions between the first and the last row (positions
    are truncated to integers). If it has ``c`` frames or fewer, all rows
    are kept and the result is padded at the end with all-zero rows whose
    first column is set to -1 to mark them as padding.

    Args:
        sequence: 2-D array-like with one row per frame.
        c: Number of frames in the returned array.

    Returns:
        ``np.ndarray`` with ``c`` rows and the same columns as ``sequence``.

    Raises:
        ValueError: If ``sequence`` contains no frames.
    """
    sequence = np.asarray(sequence)
    n_frames = len(sequence)
    if n_frames == 0:
        # There is no row to take the column layout from, and an all-padding
        # sample would be meaningless, so fail with an explicit message.
        raise ValueError("Cannot select frames from an empty sequence")

    if n_frames <= c:
        padded = np.zeros((c,) + sequence.shape[1:], dtype=sequence.dtype)
        padded[:n_frames] = sequence
        if n_frames < c:
            # -1 in the frame-number column marks a padding row.
            padded[n_frames:, 0] = -1
        return padded

    indices = np.linspace(0, n_frames - 1, c, dtype=int)
    return sequence[indices]
def average_surrounding_frames(frame_data, window_size=3, coord_start=3):
    """Smooth coordinate columns by averaging each frame with its neighbours.

    For row ``i`` the window covers rows ``i - window_size // 2`` through
    ``i + window_size // 2`` inclusive, clipped to the array bounds. Columns
    before ``coord_start`` are copied through unchanged.

    Args:
        frame_data: 2-D ``np.ndarray`` with one row per frame.
        window_size: Nominal window width in frames.
        coord_start: Index of the first column to average. The default of 3
            keeps the original assumption that the first three columns are
            metadata -- TODO confirm against the actual file layout.

    Returns:
        A new array of the same shape; the input is never modified.
    """
    smoothed = np.copy(frame_data)
    n_frames = len(frame_data)
    if n_frames <= 1:
        # Nothing to average against.
        return smoothed

    half_window = window_size // 2
    for i in range(n_frames):
        lo = max(0, i - half_window)
        hi = min(n_frames, i + half_window + 1)
        # Read from the untouched input so earlier results do not feed
        # into later windows.
        smoothed[i, coord_start:] = frame_data[lo:hi, coord_start:].mean(axis=0)
    return smoothed
def process_video_file(csv_path, start_stop_path=None, apply_averaging=False, c=10):
    """Turn one raw recording into a fixed-length frame sequence.

    Args:
        csv_path: Path of the raw data file, read with ``load_csv_data``.
        start_stop_path: Optional path of a text file holding two integers
            (start frame and stop frame). When given and present on disk,
            only frames in that inclusive range are used; otherwise the
            whole recording is used.
        apply_averaging: When true, smooth the selected frames with
            ``average_surrounding_frames``.
        c: Number of frames in the returned sequence.

    Returns:
        ``np.ndarray`` with ``c`` rows, as produced by
        ``select_equidistant_frames``.

    Raises:
        ValueError: If the start/stop file does not contain exactly two
            integers.
    """
    raw_data = load_csv_data(csv_path)

    if start_stop_path and os.path.exists(start_stop_path):
        with open(start_stop_path, 'r') as f:
            start_frame, stop_frame = map(int, f.read().strip().split())
        frame_sequence = extract_frame_sequence(raw_data, start_frame, stop_frame)
    else:
        frame_sequence = raw_data

    equidistant_frames = select_equidistant_frames(frame_sequence, c=c)

    if apply_averaging:
        # Padding rows are appended after the real frames when the recording
        # is shorter than c. Smooth only the real rows so that zero padding
        # neither leaks into real frames nor picks up coordinates itself.
        # NOTE(review): this averages neighbouring *selected* frames, not the
        # raw frames around each selected one -- confirm that is intended.
        n_real = min(len(frame_sequence), c)
        smoothed = equidistant_frames.copy()
        smoothed[:n_real] = average_surrounding_frames(equidistant_frames[:n_real])
        equidistant_frames = smoothed

    return equidistant_frames
def get_label_from_filename(filename):
    """Map a recording's file name to its class label.

    The name without directory and extension decides the label: ``A1`` or
    anything starting with ``G`` is good (1); anything starting with ``W``
    is bad (0).

    Raises:
        ValueError: If the name matches neither pattern.
    """
    stem = Path(filename).stem
    is_good = stem == 'A1' or stem.startswith('G')
    if is_good:
        return 1  # Good
    if stem.startswith('W'):
        return 0  # Bad
    raise ValueError(f"Unknown file type for {filename}")
def process_folder(base_dir, output_dir):
    """Process every ``*.csv`` recording directly inside ``base_dir``.

    Files whose name contains ``.new_json`` or ``.json`` are skipped. A file
    that fails to process is reported on stdout and skipped, so one bad
    recording does not abort the whole folder.

    Args:
        base_dir: ``pathlib.Path`` of the folder to scan.
        output_dir: Not used by this function; kept for call compatibility.

    Returns:
        Tuple ``(processed_sequences, labels, filenames)`` of three parallel
        lists: the processed arrays, their integer labels, and the file
        stems.
    """
    # Sort so the sample order -- and therefore the seeded train/test split
    # made by the caller -- does not depend on filesystem listing order.
    csv_files = sorted(base_dir.glob("*.csv"))
    processed_sequences = []
    labels = []
    filenames = []
    print(f"Processing video files in {base_dir}...")
    for csv_file in csv_files:
        # Test the file name only: a parent directory whose name happens to
        # contain ".json" must not cause every file to be skipped.
        if '.new_json' in csv_file.name or '.json' in csv_file.name:
            continue
        # Optional companion file holding the start and stop frame numbers.
        start_stop_file = csv_file.with_suffix('.start_stop_frames')
        try:
            processed_data = process_video_file(
                csv_file,
                start_stop_file if start_stop_file.exists() else None,
                apply_averaging=True,
            )
            label = get_label_from_filename(csv_file.name)
            processed_sequences.append(processed_data)
            labels.append(label)
            filenames.append(csv_file.stem)
            print(f"Processed {csv_file.name}: shape {processed_data.shape}, label {label}")
        except Exception as e:
            # Deliberate best-effort boundary: report and move on.
            print(f"Error processing {csv_file.name}: {str(e)}")
            continue
    return processed_sequences, labels, filenames
def _save_split_csv(sequences, labels, filenames, csv_path):
    """Write one split as a CSV: filename, label, then each sequence flattened to one row."""
    flat = sequences.reshape(sequences.shape[0], -1)
    df = pd.DataFrame(flat)
    df.insert(0, 'label', labels)
    df.insert(0, 'filename', filenames)
    df.to_csv(csv_path, index=False)


def main():
    """Preprocess each raw-data folder and save an 80/20 train/test split.

    For every input folder that exists, the recordings are processed with
    ``process_folder``, split with a fixed seed and stratified by label, and
    written to the output folder as ``.npy`` arrays plus CSV files for
    inspection. All paths are relative to the current working directory.
    """
    base_dirs = [
        Path("Data-intensive-systems/A13/Raw_data/Good vs Bad"),
        Path("Data-intensive-systems/A13/Raw_data/kinect_good_vs_bad_not_preprocessed")
    ]
    output_dir = Path("Data-intensive-systems/A13/Processed_Data")

    for i, base_dir in enumerate(base_dirs):
        if not base_dir.exists():
            # Say so explicitly; a silent skip hides a wrong working directory.
            print(f"\nSkipping directory {i+1}: {base_dir} does not exist")
            continue

        print(f"\nProcessing directory {i+1}: {base_dir}")
        sequences, labels, filenames = process_folder(base_dir, output_dir)
        if not sequences:
            print(f"No files were processed successfully in {base_dir}.")
            continue

        processed_sequences = np.array(sequences)
        labels = np.array(labels)
        print(f"Final dataset shape: {processed_sequences.shape}")
        print(f"Labels shape: {labels.shape}")
        print(f"Number of good sequences: {np.sum(labels == 1)}")
        print(f"Number of bad sequences: {np.sum(labels == 0)}")

        # Stratify so both splits keep the good/bad ratio; the fixed seed
        # makes the split repeatable.
        X_train, X_test, y_train, y_test, filenames_train, filenames_test = train_test_split(
            processed_sequences, labels, filenames,
            test_size=0.2, random_state=42, stratify=labels
        )
        print(f"Training set shape: {X_train.shape}")
        print(f"Test set shape: {X_test.shape}")
        print(f"Training good sequences: {np.sum(y_train == 1)}")
        print(f"Training bad sequences: {np.sum(y_train == 0)}")
        print(f"Test good sequences: {np.sum(y_test == 1)}")
        print(f"Test bad sequences: {np.sum(y_test == 0)}")

        # Folder name made safe for use inside output file names.
        dir_name = base_dir.name.replace(' ', '_').replace('-', '_')

        # Create the output folder only once there is something to write,
        # including any missing parent folders.
        output_dir.mkdir(parents=True, exist_ok=True)

        np.save(output_dir / f"sequences_{dir_name}_train.npy", X_train)
        np.save(output_dir / f"sequences_{dir_name}_test.npy", X_test)
        np.save(output_dir / f"labels_{dir_name}_train.npy", y_train)
        np.save(output_dir / f"labels_{dir_name}_test.npy", y_test)

        # CSV copies for easier inspection.
        _save_split_csv(
            X_train, y_train, filenames_train,
            output_dir / f"processed_sequences_{dir_name}_train.csv",
        )
        _save_split_csv(
            X_test, y_test, filenames_test,
            output_dir / f"processed_sequences_{dir_name}_test.csv",
        )

        print(f"\nProcessed data for {base_dir.name} saved to {output_dir}/")
        print(f"- sequences_{dir_name}_train.npy: Numpy array of shape (n_train, 10, n_features) for training")
        print(f"- sequences_{dir_name}_test.npy: Numpy array of shape (n_test, 10, n_features) for testing")
        print(f"- labels_{dir_name}_train.npy: Numpy array of shape (n_train,) with binary labels for training")
        print(f"- labels_{dir_name}_test.npy: Numpy array of shape (n_test,) with binary labels for testing")
        print(f"- processed_sequences_{dir_name}_train.csv: CSV file with training labels and filenames")
        print(f"- processed_sequences_{dir_name}_test.csv: CSV file with test labels and filenames")
# Run the preprocessing only when executed as a script, not when imported.
if __name__ == "__main__":
    main()
|