| |
| """ |
Grasp Phase Timing Analysis — Flagship visualization for the paper.

Classic neuroscience finding:
    Eye gaze → EMG activation → Hand motion → Pressure contact

This script:
  1. Detects grasp events (pressure onset: a hand's total pressure rising
     from rest to above 5 g).
  2. Looks back in time from each contact onset to find:
       - EMG envelope activation onset
       - Hand motion onset (velocity magnitude derived from MoCap)
       - Eye gaze fixation (if available; no gaze data is loaded by the
         code in this file)
  3. Computes statistics over all grasp events.
  4. Produces the canonical "grasp phase" timing figure.
| """ |
|
|
| import os |
| import glob |
| import json |
| import numpy as np |
| import pandas as pd |
| import matplotlib |
| matplotlib.use('Agg') |
| import matplotlib.pyplot as plt |
| from scipy import signal as scisig |
| from collections import defaultdict |
|
|
# Dataset and output locations. The "${PULSE_ROOT}" placeholder is resolved
# from the environment at import time; Python does not expand shell-style
# variables in string literals on its own. os.path.expandvars leaves the text
# untouched when PULSE_ROOT is not set, so the previous literal behaviour is
# kept in that case.
DATASET_DIR = os.path.expandvars("${PULSE_ROOT}/dataset")
OUTPUT_DIR = os.path.expandvars("${PULSE_ROOT}/results/grasp_phase")

# Frames per second of the aligned streams (the loaders read "*_100hz" files).
SAMPLING_RATE = 100
# Pressure level a hand's summed reading must exceed to count as contact
# (the module docstring gives the unit as grams).
PRESSURE_THRESHOLD = 5.0
# Amount of pre-contact history kept for each grasp event.
CONTEXT_WINDOW_SEC = 2.0
CONTEXT_FRAMES = int(CONTEXT_WINDOW_SEC * SAMPLING_RATE)

# Import-time side effect: make sure the results directory exists.
os.makedirs(OUTPUT_DIR, exist_ok=True)
|
|
|
|
def load_pressure(scenario_dir):
    """Read the aligned pressure CSV and total the sensor columns per hand.

    Columns whose name starts with ``R`` (right) or ``L`` (left) and ends
    with ``(g)`` are coerced to numbers; cells that are missing or cannot be
    parsed count as 0. The selected columns are then summed row by row.

    Returns:
        A ``(T, 2)`` array ordered ``[right_total, left_total]``, or ``None``
        when the file does not exist or either hand has no matching column.
    """
    csv_path = os.path.join(scenario_dir, "aligned_pressure_100hz.csv")
    if not os.path.exists(csv_path):
        return None

    frame = pd.read_csv(csv_path, low_memory=False)
    columns_by_side = {
        side: [name for name in frame.columns
               if name.startswith(side) and name.endswith('(g)')]
        for side in ('R', 'L')
    }
    if not all(columns_by_side.values()):
        return None

    # Right hand first, then left, to match the documented column order.
    totals = [
        frame[columns_by_side[side]]
        .apply(pd.to_numeric, errors='coerce')
        .fillna(0)
        .values
        .sum(axis=1)
        for side in ('R', 'L')
    ]
    return np.stack(totals, axis=1)
|
|
|
|
def load_emg(scenario_dir):
    """Read the aligned EMG CSV and return its signal channels as float32.

    Every numeric column except the bookkeeping ones (``Frame``, ``Time``,
    ``time``, ``UTC``) is treated as a channel, so the result has shape
    ``(T, C)`` with ``C >= 4`` (presumably 8 for this dataset; the count is
    not enforced here). NaN and infinite samples are replaced by 0.

    Returns:
        The channel array, or ``None`` when the file does not exist or fewer
        than four channel columns are found.
    """
    csv_path = os.path.join(scenario_dir, "aligned_emg_100hz.csv")
    if not os.path.exists(csv_path):
        return None

    table = pd.read_csv(csv_path, low_memory=False)
    bookkeeping = ('Frame', 'Time', 'time', 'UTC')
    channels = [
        name
        for name in table.select_dtypes(include=[np.number]).columns
        if name not in bookkeeping
    ]
    if len(channels) < 4:
        return None

    samples = table[channels].values.astype(np.float32)
    return np.nan_to_num(samples, nan=0.0, posinf=0.0, neginf=0.0)
|
|
|
|
def load_mocap(scenario_dir, vol, scenario):
    """Load right- and left-hand 3D positions from the aligned MoCap TSV.

    The file is expected at ``aligned_{vol}{scenario}_s_Q.tsv`` inside
    ``scenario_dir``. Columns are looked up by name: first those containing
    ``RightHand`` / ``LeftHand`` and ending in ``_X``, ``_Y`` or ``_Z``; if
    either hand has no such column, both hands fall back to columns
    containing ``R_Hand``/``RHand`` and ``L_Hand``/``LHand``. Only the first
    three matching columns per hand are used.

    Note that this returns raw positions, not velocities; differentiate the
    result to obtain speed.

    Returns:
        ``(r_pos, l_pos)``, each a ``(T, 3)`` array with non-numeric or
        missing cells replaced by 0, or ``(None, None)`` when the file is
        missing or either hand has fewer than three coordinate columns.
    """
    tsv_path = os.path.join(scenario_dir, f"aligned_{vol}{scenario}_s_Q.tsv")
    if not os.path.exists(tsv_path):
        return None, None
    df = pd.read_csv(tsv_path, sep='\t', low_memory=False)

    axis_suffixes = ('_X', '_Y', '_Z')
    r_cols = [c for c in df.columns if 'RightHand' in c and c.endswith(axis_suffixes)]
    l_cols = [c for c in df.columns if 'LeftHand' in c and c.endswith(axis_suffixes)]
    if not r_cols or not l_cols:
        # Alternative marker naming scheme.
        r_cols = [c for c in df.columns if 'R_Hand' in c or 'RHand' in c]
        l_cols = [c for c in df.columns if 'L_Hand' in c or 'LHand' in c]

    r_cols, l_cols = r_cols[:3], l_cols[:3]
    # A hand with fewer than three coordinates cannot give a 3D trajectory;
    # reject it instead of returning a narrower array.
    if len(r_cols) < 3 or len(l_cols) < 3:
        return None, None

    # NOTE: gaps become 0, which a later frame-to-frame difference will see
    # as a jump to the origin.
    r_pos = df[r_cols].apply(pd.to_numeric, errors='coerce').fillna(0).values
    l_pos = df[l_cols].apply(pd.to_numeric, errors='coerce').fillna(0).values
    return r_pos, l_pos
|
|
|
|
def compute_emg_envelope(emg, window_size=20):
    """Collapse multi-channel EMG into one normalized activation envelope.

    Each channel is mean-centred, full-wave rectified and smoothed with a
    ``window_size``-sample moving average. The smoothed channels are summed
    and the sum is min-max scaled to roughly ``[0, 1]``. A constant sum is
    returned unscaled.

    Args:
        emg: ``(T, C)`` array of raw EMG samples.
        window_size: Length of the moving-average window, in samples.

    Returns:
        ``(T,)`` array holding the combined envelope.
    """
    centred = emg - np.mean(emg, axis=0)
    rectified = np.abs(centred)

    box = np.ones(window_size) / window_size
    smoothed = np.zeros_like(rectified)
    n_channels = rectified.shape[1]
    for channel in range(n_channels):
        smoothed[:, channel] = np.convolve(rectified[:, channel], box, mode='same')

    combined = smoothed.sum(axis=1)
    if combined.max() > combined.min():
        span = combined.max() - combined.min() + 1e-8
        combined = (combined - combined.min()) / span
    return combined
|
|
|
|
def compute_velocity(position, window=3):
    """Return a smoothed per-frame speed trace for a position trajectory.

    Speed is the Euclidean norm of the frame-to-frame displacement, so it is
    expressed in position units per frame (it is not divided by a time
    step). The first frame has zero displacement. The trace is then smoothed
    with a ``window``-sample moving average.

    Args:
        position: ``(T, D)`` array of coordinates, one row per frame.
        window: Length of the moving-average window, in frames.

    Returns:
        ``(T,)`` array of smoothed speed values.
    """
    displacement = np.zeros_like(position)
    displacement[1:] = np.diff(position, axis=0)
    speed = np.linalg.norm(displacement, axis=1)

    box = np.ones(window) / window
    return np.convolve(speed, box, mode='same')
|
|
|
|
def detect_grasp_events(pressure_1d, threshold=5.0, min_duration=10, min_gap=50):
    """Detect pressure onset events (rest → above ``threshold``).

    A small two-state machine (released / in contact) scans the trace:

    * A rising sample is confirmed as contact when more than 70% of the next
      ``min_duration`` samples are above ``threshold``. Confirmed contacts
      are reported unless they start within ``min_gap`` frames of the
      previously reported onset.
    * Contact is considered released when fewer than 30% of the next five
      samples are above ``threshold``.

    Candidates too close to the end of the trace for a full confirmation
    window are ignored.

    Args:
        pressure_1d: 1-D array-like of pressure samples for one hand.
        threshold: Level a sample must exceed to count as contact.
        min_duration: Length of the look-ahead window that confirms contact.
        min_gap: Minimum spacing, in frames, between reported onsets.

    Returns:
        List of onset frame indices, in increasing order.
    """
    contact_ratio = 0.7   # share of look-ahead samples needed to confirm contact
    release_ratio = 0.3   # share of look-ahead samples allowed while releasing
    release_window = 5    # look-ahead length, in frames, for confirming release

    above = np.asarray(pressure_1d) > threshold
    n_frames = len(above)

    onsets = []
    in_contact = False
    for i, is_above in enumerate(above):
        if is_above and not in_contact:
            sustained = (
                i + min_duration < n_frames
                and np.mean(above[i:i + min_duration]) > contact_ratio
            )
            if sustained:
                if not onsets or i - onsets[-1] > min_gap:
                    onsets.append(i)
                # Enter the contact state even when the onset is suppressed by
                # min_gap, so later samples of the same press are not
                # re-examined as fresh onsets.
                in_contact = True
        elif not is_above and in_contact:
            released = (
                i + release_window < n_frames
                and np.mean(above[i:i + release_window]) < release_ratio
            )
            if released:
                in_contact = False
    return onsets
|
|
|
|
def find_signal_onset(signal, ref_idx, window_frames, threshold_ratio=0.3):
    """Find the latest pre-contact activation onset of ``signal``.

    Only the ``window_frames`` samples up to and including ``ref_idx`` are
    examined. Within that segment:

    * the baseline is the 25th percentile of the earliest part of the
      segment (its first 30%, at least 10 samples);
    * a sample is "active" when it exceeds
      ``baseline + threshold_ratio * (peak - baseline)``.

    If the signal is still active at ``ref_idx``, the onset is the start of
    the uninterrupted active run that ends there. Otherwise it is the start
    of the most recent active run inside the segment.

    Args:
        signal: 1-D array-like trace (e.g. EMG envelope or hand speed).
        ref_idx: Reference frame, typically the pressure-contact onset.
        window_frames: How many frames before ``ref_idx`` to search.
        threshold_ratio: Fraction of the baseline-to-peak range that defines
            the activation threshold.

    Returns:
        Onset position relative to ``ref_idx`` as a built-in ``int``
        (negative = before, 0 = at the reference), or ``None`` when the
        segment is shorter than 10 samples, is essentially flat, or contains
        no rising edge.
    """
    signal = np.asarray(signal)
    start = max(0, ref_idx - window_frames)
    segment = signal[start:ref_idx + 1]
    if len(segment) < 10:
        return None

    early_part = segment[:max(10, int(len(segment) * 0.3))]
    baseline = np.percentile(early_part, 25)

    peak = np.max(segment)
    if peak - baseline < 1e-4:
        # No meaningful modulation inside the window.
        return None

    threshold = baseline + (peak - baseline) * threshold_ratio
    above = segment > threshold

    if not above[-1]:
        # Activity already ended before the reference frame: take the most
        # recent inactive -> active transition.
        rising = np.flatnonzero(np.diff(above.astype(int)) == 1)
        if len(rising) == 0:
            return None
        onset_local = rising[-1] + 1
    else:
        # Still active at the reference frame: walk back to where the
        # current active run began.
        onset_local = len(segment) - 1
        while onset_local > 0 and above[onset_local - 1]:
            onset_local -= 1

    # Normalise to a plain int; the rising-edge branch otherwise yields a
    # NumPy integer while the walk-back branch yields a Python int.
    return int(start + onset_local - ref_idx)
|
|
|
|
def is_clean_grasp(emg_env, velocity, pressure_trace, onset, look_back=150, rest_window=50):
    """Decide whether a contact onset is a clean grasp that starts from rest.

    Two windows before ``onset`` are compared for both the EMG envelope and
    the hand speed:

    * rest window: ``[onset - look_back, onset - look_back + rest_window)``
      (frames onset-150 .. onset-100 with the defaults), summarised by its
      mean;
    * approach window: from the end of the rest window up to ``onset``,
      summarised by its 75th percentile.

    The grasp is clean when, for both signals, the rise from rest to
    approach exceeds 10% of that signal's full range over the recording.

    Args:
        emg_env: 1-D EMG envelope.
        velocity: 1-D hand speed trace.
        pressure_trace: Not used by this check; kept for call compatibility.
        onset: Frame index of the pressure onset.
        look_back: How far before ``onset`` the rest window starts.
        rest_window: Length of the rest window, in frames.

    Returns:
        Truthy when the event qualifies. ``False`` when there is not enough
        history, the approach window has fewer than 10 samples, or either
        signal is essentially constant.
    """
    rest_lo = onset - look_back
    rest_hi = rest_lo + rest_window
    if rest_lo < 0:
        return False

    emg_baseline = emg_env[rest_lo:rest_hi].mean()
    vel_baseline = velocity[rest_lo:rest_hi].mean()

    emg_approach = emg_env[rest_hi:onset]
    vel_approach = velocity[rest_hi:onset]
    if len(emg_approach) < 10:
        return False

    # Upper quartile rather than the maximum, so one spike is not enough.
    emg_rise = np.percentile(emg_approach, 75) - emg_baseline
    vel_rise = np.percentile(vel_approach, 75) - vel_baseline

    emg_span = emg_env.max() - emg_env.min()
    vel_span = velocity.max() - velocity.min()
    if emg_span < 1e-6 or vel_span < 1e-6:
        return False

    emg_fraction = emg_rise / emg_span
    vel_fraction = vel_rise / vel_span
    return (emg_fraction > 0.1) and (vel_fraction > 0.1)
|
|
|
|
def analyze_one_scenario(vol, scenario):
    """Extract clean, from-rest grasp events for one recording.

    Loads the pressure, EMG and MoCap streams of ``DATASET_DIR/vol/scenario``,
    trims them to a common length, and for each hand:

    1. detects pressure onsets above ``PRESSURE_THRESHOLD``;
    2. drops onsets with less than ``CONTEXT_FRAMES`` of history or that do
       not start from rest (``is_clean_grasp``);
    3. locates the EMG-envelope and hand-motion onsets preceding contact;
    4. keeps the event only if both onsets lie within 1500 ms before
       contact.

    The EMG envelope is a single trace summed over all channels and is
    shared by both hands.

    Args:
        vol: Name of the top-level recording directory under ``DATASET_DIR``.
        scenario: Scenario directory name inside ``vol``.

    Returns:
        ``None`` if any modality could not be loaded. Otherwise a list of
        dicts with keys ``pressure``, ``emg``, ``velocity`` (traces from
        ``CONTEXT_FRAMES`` before to 50 frames after the onset, shorter if
        the recording ends first), ``hand``, ``onset_idx``,
        ``emg_delay_ms`` and ``motion_delay_ms`` (floats, negative = before
        contact).
    """
    # Frame-to-millisecond factor derived from the sampling rate rather than
    # assuming 10 ms per frame.
    ms_per_frame = 1000.0 / SAMPLING_RATE
    earliest_delay_ms = -1500   # onsets earlier than this are implausible
    post_frames = 50            # frames kept after contact for plotting

    scenario_dir = os.path.join(DATASET_DIR, vol, scenario)

    pressure = load_pressure(scenario_dir)
    emg = load_emg(scenario_dir)
    mocap_r, mocap_l = load_mocap(scenario_dir, vol, scenario)
    if pressure is None or emg is None or mocap_r is None or mocap_l is None:
        return None

    # Trim every stream to the shortest one so frame indices line up.
    min_len = min(pressure.shape[0], emg.shape[0],
                  mocap_r.shape[0], mocap_l.shape[0])
    pressure = pressure[:min_len]
    emg = emg[:min_len]
    mocap_r = mocap_r[:min_len]
    mocap_l = mocap_l[:min_len]

    emg_env = compute_emg_envelope(emg)
    vel_r = compute_velocity(mocap_r)
    vel_l = compute_velocity(mocap_l)

    events = []
    for hand_name, hand_pressure, hand_vel in [
        ('right', pressure[:, 0], vel_r),
        ('left', pressure[:, 1], vel_l),
    ]:
        onsets = detect_grasp_events(hand_pressure, threshold=PRESSURE_THRESHOLD)
        for onset in onsets:
            if onset < CONTEXT_FRAMES:
                continue  # not enough history before contact

            if not is_clean_grasp(emg_env, hand_vel, hand_pressure, onset):
                continue

            emg_delay = find_signal_onset(emg_env, onset, CONTEXT_FRAMES, threshold_ratio=0.3)
            motion_delay = find_signal_onset(hand_vel, onset, CONTEXT_FRAMES, threshold_ratio=0.3)
            if emg_delay is None or motion_delay is None:
                continue

            emg_delay_ms = emg_delay * ms_per_frame
            motion_delay_ms = motion_delay * ms_per_frame
            # Keep only onsets in the plausible window before contact.
            if emg_delay_ms < earliest_delay_ms or emg_delay_ms > 0:
                continue
            if motion_delay_ms < earliest_delay_ms or motion_delay_ms > 0:
                continue

            start = onset - CONTEXT_FRAMES
            end = onset + post_frames
            events.append({
                'pressure': hand_pressure[start:end],
                'emg': emg_env[start:end],
                'velocity': hand_vel[start:end],
                'hand': hand_name,
                'onset_idx': onset,
                'emg_delay_ms': emg_delay_ms,
                'motion_delay_ms': motion_delay_ms,
            })

    return events
|
|
|
|
def _delay_summary(delays):
    """Return mean/median/std/p25/p75 of ``delays`` as plain floats (JSON-safe)."""
    return {
        'mean': float(delays.mean()),
        'median': float(np.median(delays)),
        'std': float(delays.std()),
        'p25': float(np.percentile(delays, 25)),
        'p75': float(np.percentile(delays, 75)),
    }


def _normalize_rows(sigs):
    """Stack equal-length traces and min-max scale each row to about [0, 1]."""
    sigs = np.stack(sigs)
    sigs = sigs - sigs.min(axis=1, keepdims=True)
    maxs = sigs.max(axis=1, keepdims=True)
    return sigs / (maxs + 1e-8)


def _plot_delay_histogram(ax, delays, color, xlabel, title):
    """Draw one delay histogram with dashed-mean and dotted-median markers."""
    ax.hist(delays, bins=30, color=color, alpha=0.7, edgecolor='black')
    ax.axvline(delays.mean(), color='black', linestyle='--', linewidth=2,
               label=f'Mean: {delays.mean():.0f}ms')
    ax.axvline(np.median(delays), color='grey', linestyle=':', linewidth=2,
               label=f'Median: {np.median(delays):.0f}ms')
    ax.set_xlabel(xlabel, fontsize=11)
    ax.set_ylabel('Count', fontsize=11)
    ax.set_title(title, fontsize=12, fontweight='bold')
    ax.legend(fontsize=10)
    ax.grid(True, alpha=0.3)


def main():
    """Run the grasp-phase analysis over the whole dataset.

    Walks ``DATASET_DIR/v*/s*``, skipping scenarios without an
    ``alignment_metadata.json`` or without all of the pressure, EMG and
    MoCap modalities, and collects the events from ``analyze_one_scenario``.
    It then prints summary statistics and writes to ``OUTPUT_DIR``:

    * ``timing_stats.json``: delay statistics over all events;
    * ``grasp_phase_timing.png`` / ``.pdf``: mean normalized traces around
      contact (only drawn when at least 10 events have a full window);
    * ``delay_distributions.png`` / ``.pdf``: histograms of both delays.
    """
    post_frames = 50                        # frames kept after contact per event
    ms_per_frame = 1000.0 / SAMPLING_RATE   # frame index -> milliseconds

    all_events = []
    stats = defaultdict(int)

    for vol_dir in sorted(glob.glob(f"{DATASET_DIR}/v*")):
        vol = os.path.basename(vol_dir)
        for scenario_dir in sorted(glob.glob(f"{vol_dir}/s*")):
            scenario = os.path.basename(scenario_dir)
            meta_path = os.path.join(scenario_dir, 'alignment_metadata.json')
            if not os.path.exists(meta_path):
                continue
            # Context manager so the metadata file handle is always closed.
            with open(meta_path) as meta_file:
                meta = json.load(meta_file)

            # Metadata without a 'modalities' entry counts as missing
            # modalities instead of aborting the whole run with a KeyError.
            modalities = meta.get('modalities') or ()
            if not {'pressure', 'emg', 'mocap'}.issubset(modalities):
                stats['no_modality'] += 1
                continue

            events = analyze_one_scenario(vol, scenario)
            if events is None:
                stats['load_error'] += 1
                continue
            all_events.extend(events)
            stats['scenarios'] += 1
            stats['events'] += len(events)
            print(f"[{vol}/{scenario}] {len(events)} grasp events", flush=True)

    print("\n=== Summary ===")
    print(f"Scenarios processed: {stats['scenarios']}")
    print(f"Total grasp events: {stats['events']}")
    print(f"Loading errors: {stats['load_error']}")
    print(f"Missing modality: {stats['no_modality']}")

    if not all_events:
        print("No events found!")
        return

    emg_delays = np.array([e['emg_delay_ms'] for e in all_events])
    motion_delays = np.array([e['motion_delay_ms'] for e in all_events])

    print("\n=== Timing Statistics (ms, negative = before contact) ===")
    print(f"EMG onset delay: mean={emg_delays.mean():.1f} median={np.median(emg_delays):.1f} std={emg_delays.std():.1f}")
    print(f"Motion peak delay: mean={motion_delays.mean():.1f} median={np.median(motion_delays):.1f} std={motion_delays.std():.1f}")

    stats_dict = {
        'n_events': len(all_events),
        'emg_delay_ms': _delay_summary(emg_delays),
        'motion_delay_ms': _delay_summary(motion_delays),
    }
    with open(os.path.join(OUTPUT_DIR, 'timing_stats.json'), 'w') as f:
        json.dump(stats_dict, f, indent=2)

    # Events cut short by the end of a recording cannot be stacked.
    valid = [e for e in all_events if len(e['pressure']) == CONTEXT_FRAMES + post_frames]
    print(f"\nEvents with full context: {len(valid)} / {len(all_events)}")

    if len(valid) < 10:
        print("Not enough events for plotting")
        return

    pressure_stack = _normalize_rows([e['pressure'] for e in valid])
    emg_stack = _normalize_rows([e['emg'] for e in valid])
    vel_stack = _normalize_rows([e['velocity'] for e in valid])

    time_axis = np.arange(-CONTEXT_FRAMES, post_frames) * ms_per_frame

    # Figure 1: mean normalized traces aligned on contact onset.
    fig, ax = plt.subplots(figsize=(9, 5))

    for sigs, color, label in [
        (emg_stack, '#E74C3C', 'EMG envelope'),
        (vel_stack, '#3498DB', 'Hand velocity'),
        (pressure_stack, '#27AE60', 'Pressure (contact)'),
    ]:
        mean = sigs.mean(axis=0)
        std = sigs.std(axis=0)
        ax.plot(time_axis, mean, color=color, linewidth=2.5, label=label)
        # Shaded band is +/- half a standard deviation.
        ax.fill_between(time_axis, mean - std * 0.5, mean + std * 0.5, color=color, alpha=0.15)

    ax.axvline(0, color='black', linestyle='--', linewidth=1.2, alpha=0.7, label='Contact onset')
    ax.axvline(emg_delays.mean(), color='#E74C3C', linestyle=':', alpha=0.8)
    ax.axvline(motion_delays.mean(), color='#3498DB', linestyle=':', alpha=0.8)

    ax.annotate(f'EMG\n{emg_delays.mean():.0f}ms',
                xy=(emg_delays.mean(), 0.85), ha='center', fontsize=10, color='#C0392B',
                bbox=dict(boxstyle="round,pad=0.3", fc='#FADBD8', ec='#E74C3C', alpha=0.9))
    ax.annotate(f'Motion\n{motion_delays.mean():.0f}ms',
                xy=(motion_delays.mean(), 0.65), ha='center', fontsize=10, color='#1F618D',
                bbox=dict(boxstyle="round,pad=0.3", fc='#D6EAF8', ec='#3498DB', alpha=0.9))

    ax.set_xlabel('Time relative to contact onset (ms)', fontsize=12)
    ax.set_ylabel('Normalized amplitude', fontsize=12)
    ax.set_title(f'Grasp Phase Timing ({len(valid)} events, {stats["scenarios"]} recordings)',
                 fontsize=13, fontweight='bold')
    ax.set_xlim(-CONTEXT_WINDOW_SEC * 1000, post_frames * ms_per_frame)
    ax.legend(loc='upper left', frameon=True, fontsize=10)
    ax.grid(True, alpha=0.3)
    ax.set_ylim(-0.05, 1.1)

    plt.tight_layout()
    fig_path = os.path.join(OUTPUT_DIR, 'grasp_phase_timing.png')
    fig.savefig(fig_path, dpi=150, bbox_inches='tight')
    fig.savefig(fig_path.replace('.png', '.pdf'), bbox_inches='tight')
    plt.close(fig)  # release the figure once it is on disk
    print(f"Saved figure: {fig_path}")

    # Figure 2: distributions of the two delays.
    fig2, axes = plt.subplots(1, 2, figsize=(11, 4))
    _plot_delay_histogram(axes[0], emg_delays, '#E74C3C',
                          'EMG onset - Contact onset (ms)', 'EMG → Contact Delay')
    _plot_delay_histogram(axes[1], motion_delays, '#3498DB',
                          'Motion onset - Contact onset (ms)', 'Hand Motion → Contact Delay')

    plt.tight_layout()
    fig2_path = os.path.join(OUTPUT_DIR, 'delay_distributions.png')
    fig2.savefig(fig2_path, dpi=150, bbox_inches='tight')
    fig2.savefig(fig2_path.replace('.png', '.pdf'), bbox_inches='tight')
    plt.close(fig2)
    print(f"Saved figure: {fig2_path}")

    print(f"\nAll outputs saved to: {OUTPUT_DIR}")
|
|
|
|
# Script entry point: run the analysis only when executed directly, not on import.
if __name__ == "__main__":
    main()
|
|