#!/usr/bin/env python3 """ End-to-end task suite for one Xperience-10M episode released by Ropedia. The purpose is not to estimate generalization from one sample episode. It is to turn the episode into multiple meaningful supervised/self-supervised learning problems and write reproducible artifacts for each one. """ from __future__ import annotations import argparse import csv import json import math import sys from collections import Counter, OrderedDict from pathlib import Path import numpy as np from train_all_modalities_model import ( VIDEO_FILES, extract_all_window_features, prepare_modalities, ) from train_min_action_model import ( add_toolkit_to_path, compute_metrics, encode_labels, fit_scaler, frame_label, majority_label, predict, portable_path, softmax, train_softmax_classifier, ) TASKS = [ "timeline_action", "timeline_subtask", "transition_detection", "next_action", "hand_trajectory_forecast", "contact_prediction", "object_relevance", "caption_grounding", "cross_modal_retrieval", "modality_reconstruction", "temporal_order", "misalignment_detection", ] def parse_args() -> argparse.Namespace: workspace_default = Path(__file__).resolve().parents[1] annotation_default = workspace_default / "data/sample/xperience-10m-sample/annotation.hdf5" parser = argparse.ArgumentParser(description="Run an end-to-end task suite on one Xperience-10M episode.") parser.add_argument("--workspace", type=Path, default=workspace_default) parser.add_argument("--annotation", type=Path, default=annotation_default) parser.add_argument("--output-dir", type=Path, default=workspace_default / "outputs/episode_task_suite") parser.add_argument("--cache-dir", type=Path, default=workspace_default / "outputs/feature_cache") parser.add_argument("--window-frames", type=int, default=20) parser.add_argument("--stride-frames", type=int, default=5) parser.add_argument("--min-label-fraction", type=float, default=0.6) parser.add_argument("--test-fraction", type=float, default=0.30) 
parser.add_argument("--epochs", type=int, default=400) parser.add_argument("--learning-rate", type=float, default=0.12) parser.add_argument("--l2", type=float, default=2e-3) parser.add_argument("--ridge-l2", type=float, default=10.0) parser.add_argument("--seed", type=int, default=7) parser.add_argument("--future-frames", type=int, default=20, help="Future offset for next-action prediction.") parser.add_argument("--forecast-frames", type=int, default=10, help="Future hand trajectory length.") parser.add_argument("--boundary-tolerance-frames", type=int, default=10) parser.add_argument("--misalignment-shift-windows", type=int, default=8) parser.add_argument("--tasks", default="all", help="Comma-separated task list or 'all'.") # Match train_all_modalities_model defaults used by prepare_modalities. parser.add_argument("--force-rebuild-cache", action="store_true") parser.add_argument("--video-image-size", type=int, default=32) parser.add_argument("--video-grid-size", type=int, default=8) parser.add_argument("--video-hist-bins", type=int, default=8) parser.add_argument("--depth-grid-size", type=int, default=8) parser.add_argument("--text-hash-dim", type=int, default=128) parser.add_argument("--audio-source", choices=list(VIDEO_FILES), default="fisheye_cam0") parser.add_argument("--audio-sample-rate", type=int, default=16000) parser.add_argument("--audio-band-count", type=int, default=16) parser.add_argument("--include-label-text", action="store_true") parser.add_argument("--no-class-weights", action="store_true") parser.add_argument("--include-neural", action="store_true", help="Also run lightweight PyTorch MLP baselines for selected tasks.") parser.add_argument("--neural-output-name", default="neural_mlp", help="Subdirectory under --output-dir for neural task artifacts.") parser.add_argument("--neural-epochs", type=int, default=80) parser.add_argument("--neural-learning-rate", type=float, default=1e-3) parser.add_argument("--neural-weight-decay", type=float, 
def selected_tasks(spec: str) -> list[str]:
    """Resolve the ``--tasks`` specification into an ordered list of task names.

    Args:
        spec: ``"all"`` (case-insensitive) or a comma-separated list of names.

    Returns:
        A fresh list of task names; duplicates are dropped, first occurrence
        wins. The module-level ``TASKS`` list is never returned directly, so
        callers cannot mutate it by accident.

    Raises:
        ValueError: If any requested name is not in ``TASKS``.
    """
    if spec.strip().lower() == "all":
        return list(TASKS)
    chosen = [x.strip() for x in spec.split(",") if x.strip()]
    unknown = [x for x in chosen if x not in TASKS]
    if unknown:
        raise ValueError(f"Unknown tasks: {unknown}. Valid tasks: {TASKS}")
    # dict.fromkeys de-duplicates while preserving the requested order.
    return list(dict.fromkeys(chosen))


def write_json(path: Path, data: dict | list) -> None:
    """Write *data* as indented UTF-8 JSON to *path*, creating parent directories."""
    path.parent.mkdir(parents=True, exist_ok=True)
    path.write_text(json.dumps(data, indent=2), encoding="utf-8")


def write_csv(path: Path, rows: list[dict], fieldnames: list[str]) -> None:
    """Write *rows* to *path* as CSV with a header row and ``\\n`` line endings."""
    path.parent.mkdir(parents=True, exist_ok=True)
    with path.open("w", newline="", encoding="utf-8") as fp:
        writer = csv.DictWriter(fp, fieldnames=fieldnames, lineterminator="\n")
        writer.writeheader()
        writer.writerows(rows)


def write_confusion(path: Path, cm: np.ndarray, class_names: list[str]) -> None:
    """Write confusion matrix *cm* as CSV: one row per true class, one column per predicted class."""
    path.parent.mkdir(parents=True, exist_ok=True)
    with path.open("w", newline="", encoding="utf-8") as fp:
        writer = csv.writer(fp, lineterminator="\n")
        writer.writerow(["true\\pred"] + class_names)
        for i, name in enumerate(class_names):
            writer.writerow([name] + [int(v) for v in cm[i]])


def chronological_split_indices(n: int, test_fraction: float) -> tuple[np.ndarray, np.ndarray]:
    """Split ``range(n)`` into a leading train part and a trailing test part.

    The split point is clamped so that both parts contain at least one index.

    Raises:
        ValueError: If ``n < 2``.
    """
    if n < 2:
        raise ValueError("Need at least two samples for train/test split.")
    split = int(round(n * (1.0 - test_fraction)))
    split = max(1, min(split, n - 1))
    return np.arange(split, dtype=np.int64), np.arange(split, n, dtype=np.int64)


def build_windows(args: argparse.Namespace, ann: dict, extras: dict):
    """Slide a fixed-size window over the episode and featurize every window.

    Args:
        args: Uses ``window_frames``, ``stride_frames`` and ``min_label_fraction``.
        ann: Annotation dict; reads ``caption_frame_info_map`` and ``img_names``.
        extras: Passed through to ``extract_all_window_features``.

    Returns:
        ``(X, rows, feature_manifest)``: the stacked float32 feature matrix, one
        metadata dict per window (frame span plus majority action/subtask
        labels), and a list describing the column span of each feature block.

    Raises:
        ValueError: If the episode is too short to yield a single window.
    """
    frame_info = ann["caption_frame_info_map"]
    n_frames = len(ann["img_names"])
    rows = []
    X = []
    feature_manifest = None
    for start in range(0, n_frames - args.window_frames + 1, args.stride_frames):
        end = start + args.window_frames
        action_labels = [frame_label(frame_info.get(i, {}), "action") for i in range(start, end)]
        subtask_labels = [frame_label(frame_info.get(i, {}), "subtask") for i in range(start, end)]
        action, action_frac = majority_label(action_labels, args.min_label_fraction)
        subtask, subtask_frac = majority_label(subtask_labels, args.min_label_fraction)
        if feature_manifest is None:
            # Only the first window asks for the block layout; it is reused for all windows.
            vec, blocks = extract_all_window_features(ann, extras, start, end, return_blocks=True)
            offset = 0
            feature_manifest = []
            for name, dim in blocks:
                feature_manifest.append({"name": name, "start": offset, "end": offset + dim, "dim": dim})
                offset += dim
        else:
            vec = extract_all_window_features(ann, extras, start, end)
        X.append(vec)
        rows.append({
            "window_index": len(rows),
            "start_frame": start,
            "end_frame": end - 1,
            "center_frame": (start + end - 1) // 2,
            "action_label": action,
            "action_fraction": action_frac,
            "subtask_label": subtask,
            "subtask_fraction": subtask_frac,
        })
    if not X:
        # np.stack([]) would fail with an unhelpful message.
        raise ValueError(
            f"No windows produced: episode has {n_frames} frames but --window-frames is {args.window_frames}."
        )
    return np.stack(X).astype(np.float32), rows, feature_manifest or []
def block_indices(feature_manifest: list[dict], include: list[str] | None = None, exclude: list[str] | None = None) -> np.ndarray:
    """Return the feature-column indices of the manifest blocks selected by name prefix.

    Args:
        feature_manifest: Dicts with ``name``, ``start`` and ``end`` keys.
        include: If non-empty, keep only blocks whose name starts with one of these.
        exclude: If non-empty, drop blocks whose name starts with one of these.

    Returns:
        int64 array of column indices, in manifest order.
    """
    include_prefixes = tuple(include or [])
    exclude_prefixes = tuple(exclude or [])
    idxs = []
    for block in feature_manifest:
        name = block["name"]
        if include_prefixes and not name.startswith(include_prefixes):
            continue
        if exclude_prefixes and name.startswith(exclude_prefixes):
            continue
        idxs.extend(range(int(block["start"]), int(block["end"])))
    return np.asarray(idxs, dtype=np.int64)


def label_array(rows: list[dict], key: str) -> np.ndarray:
    """Collect ``row[key]`` from every row as an object array of strings ("" when missing or falsy)."""
    return np.asarray([str(row.get(key, "") or "") for row in rows], dtype=object)


def classification_task(
    out_dir: Path,
    X: np.ndarray,
    labels: np.ndarray,
    rows: list[dict],
    args: argparse.Namespace,
    task_name: str,
    input_description: str,
) -> dict:
    """Train and evaluate a softmax classifier on the windows that carry a label.

    Windows with an empty label are dropped, the remainder is split
    chronologically, features are standardized with train statistics, and
    metrics, per-class metrics, confusion matrix, predictions and the model are
    written under *out_dir*.

    Returns:
        The metrics dict that was written to ``metrics.json``.
    """
    out_dir.mkdir(parents=True, exist_ok=True)
    valid = np.asarray([bool(x) for x in labels])
    valid_idx = np.flatnonzero(valid)
    Xv = X[valid_idx]
    labelv = labels[valid_idx]
    rowv = [rows[int(i)] for i in valid_idx]
    y, class_names = encode_labels(labelv)
    train_local, test_local = chronological_split_indices(len(y), args.test_fraction)
    train_classes = set(int(x) for x in y[train_local])
    test_classes = set(int(x) for x in y[test_local])
    unseen_test_classes = sorted(class_names[i] for i in (test_classes - train_classes))

    mean, std = fit_scaler(Xv[train_local])
    Xs = (Xv - mean) / std
    W, b, history = train_softmax_classifier(
        Xs[train_local],
        y[train_local],
        n_classes=len(class_names),
        epochs=args.epochs,
        lr=args.learning_rate,
        l2=args.l2,
        use_class_weights=not args.no_class_weights,
        seed=args.seed,
    )
    pred, probs = predict(Xs[test_local], W, b)
    metrics, per_class, cm = compute_metrics(y[test_local], pred, class_names)
    majority = Counter(y[train_local]).most_common(1)[0][0]
    metrics.update({
        "task": task_name,
        "input": input_description,
        "split": "chronological",
        "num_windows": int(len(y)),
        "num_train_windows": int(len(train_local)),
        "num_test_windows": int(len(test_local)),
        "num_classes": int(len(class_names)),
        "feature_dim": int(X.shape[1]),
        "majority_baseline_accuracy": float(np.mean(y[test_local] == majority)),
        "train_final_accuracy": float(history[-1]["train_accuracy"]),
        "train_final_loss": float(history[-1]["loss"]),
        "unseen_test_classes": unseen_test_classes,
    })

    pred_rows = []
    # Row k of `probs` belongs to test_local[k]; enumerate avoids a linear search per row.
    for k, (local_pos, pred_id) in enumerate(zip(test_local, pred)):
        row = rowv[int(local_pos)]
        true_id = int(y[int(local_pos)])
        pred_rows.append({
            "window_index": row["window_index"],
            "start_frame": row["start_frame"],
            "end_frame": row["end_frame"],
            "center_frame": row["center_frame"],
            "true_label": class_names[true_id],
            "predicted_label": class_names[int(pred_id)],
            "confidence": float(probs[k, int(pred_id)]),
            "correct": int(true_id == int(pred_id)),
        })

    write_json(out_dir / "metrics.json", metrics)
    write_csv(out_dir / "per_class_metrics.csv", per_class, ["class_id", "class_name", "support", "predicted", "precision", "recall", "f1"])
    write_confusion(out_dir / "confusion_matrix.csv", cm, class_names)
    write_csv(out_dir / "predictions.csv", pred_rows, ["window_index", "start_frame", "end_frame", "center_frame", "true_label", "predicted_label", "confidence", "correct"])
    np.savez_compressed(out_dir / "model.npz", mean=mean, std=std, W=W, b=b, class_names=np.asarray(class_names, dtype=object))
    return metrics


def binary_metrics(y_true: np.ndarray, y_pred: np.ndarray) -> dict:
    """Compute accuracy, precision, recall, F1 and confusion counts for 0/1 labels."""
    y_true = y_true.astype(np.int64)
    y_pred = y_pred.astype(np.int64)
    tp = int(np.sum((y_true == 1) & (y_pred == 1)))
    tn = int(np.sum((y_true == 0) & (y_pred == 0)))
    fp = int(np.sum((y_true == 0) & (y_pred == 1)))
    fn = int(np.sum((y_true == 1) & (y_pred == 0)))
    precision = tp / (tp + fp) if tp + fp else 0.0
    recall = tp / (tp + fn) if tp + fn else 0.0
    f1 = 2 * precision * recall / (precision + recall) if precision + recall else 0.0
    return {
        "accuracy": float((tp + tn) / max(len(y_true), 1)),
        "precision": precision,
        "recall": recall,
        "f1": f1,
        "tp": tp,
        "tn": tn,
        "fp": fp,
        "fn": fn,
        "positive_rate_true": float(np.mean(y_true)) if len(y_true) else 0.0,
        "positive_rate_pred": float(np.mean(y_pred)) if len(y_pred) else 0.0,
    }


def boundary_f1(true_frames: list[int], pred_frames: list[int], tolerance: int) -> dict:
    """Greedily match predicted boundaries to true boundaries within *tolerance* frames.

    Predictions are processed in order; each takes the nearest still-unmatched
    true boundary, and every true boundary is matched at most once.
    """
    used = set()
    matches = 0
    errors = []
    for pf in pred_frames:
        candidates = [
            (abs(pf - tf), j, tf)
            for j, tf in enumerate(true_frames)
            if j not in used and abs(pf - tf) <= tolerance
        ]
        if not candidates:
            continue
        diff, j, tf = min(candidates)
        used.add(j)
        matches += 1
        errors.append(diff)
    precision = matches / len(pred_frames) if pred_frames else 0.0
    recall = matches / len(true_frames) if true_frames else 0.0
    f1 = 2 * precision * recall / (precision + recall) if precision + recall else 0.0
    return {
        "boundary_precision": precision,
        "boundary_recall": recall,
        "boundary_f1": f1,
        "matched_boundaries": matches,
        "true_boundaries": len(true_frames),
        "predicted_boundaries": len(pred_frames),
        "mean_abs_timing_error_frames": float(np.mean(errors)) if errors else None,
    }


def task_transition_detection(out_dir: Path, X: np.ndarray, rows: list[dict], ann: dict, args: argparse.Namespace) -> dict:
    """Classify windows as "transition"/"steady" and add boundary-level F1 to the metrics.

    A window is a transition when its center frame lies within
    ``args.boundary_tolerance_frames`` of a frame where the action label
    changes between two non-empty labels.
    """
    frame_info = ann["caption_frame_info_map"]
    n_frames = len(ann["img_names"])
    per_frame = [frame_label(frame_info.get(i, {}), "action") for i in range(n_frames)]
    true_boundaries = [
        i for i in range(1, n_frames)
        if per_frame[i] and per_frame[i - 1] and per_frame[i] != per_frame[i - 1]
    ]
    y = []
    for row in rows:
        c = int(row["center_frame"])
        y.append(int(any(abs(c - b) <= args.boundary_tolerance_frames for b in true_boundaries)))
    labels = np.asarray(["transition" if v else "steady" for v in y], dtype=object)
    metrics = classification_task(out_dir, X, labels, rows, args, "transition_detection", "all modalities -> action boundary/steady")

    # classification_task only returns metrics, so test predictions are read back from disk.
    pred_path = out_dir / "predictions.csv"
    with pred_path.open("r", encoding="utf-8") as fp:
        pred_rows = list(csv.DictReader(fp))
    pred_frames = [int(r["center_frame"]) for r in pred_rows if r["predicted_label"] == "transition"]
    test_start = min((int(r["center_frame"]) for r in pred_rows), default=0)
    test_end = max((int(r["center_frame"]) for r in pred_rows), default=0)
    true_test = [b for b in true_boundaries if test_start <= b <= test_end]
    metrics.update(boundary_f1(true_test, pred_frames, args.boundary_tolerance_frames))
    write_json(out_dir / "metrics.json", metrics)
    write_csv(out_dir / "true_boundaries.csv", [{"frame": x} for x in true_boundaries], ["frame"])
    return metrics


def task_next_action(out_dir: Path, X: np.ndarray, rows: list[dict], ann: dict, args: argparse.Namespace) -> dict:
    """Classify the action label found ``args.future_frames`` after each window's last frame.

    The future frame index is clamped to the last frame of the episode.
    """
    frame_info = ann["caption_frame_info_map"]
    labels = []
    for row in rows:
        future_frame = min(len(ann["img_names"]) - 1, int(row["end_frame"]) + args.future_frames)
        labels.append(frame_label(frame_info.get(future_frame, {}), "action"))
    return classification_task(out_dir, X, np.asarray(labels, dtype=object), rows, args, "next_action", f"all modalities at t -> action at t+{args.future_frames} frames")


def ridge_fit_predict(X_train: np.ndarray, Y_train: np.ndarray, X_test: np.ndarray, l2: float):
    """Fit ridge regression on standardized data and predict *X_test*.

    The system is solved in its dual form (an n_train x n_train Gram matrix).

    Returns:
        ``(pred, model)`` where ``pred`` is float32 in the original target
        scale and ``model`` holds the scalers and the weight matrix ``W``
        (bias is the last row, from the appended constant column).
    """
    x_mean, x_std = fit_scaler(X_train)
    y_mean = Y_train.mean(axis=0)
    y_std = Y_train.std(axis=0)
    y_std = np.where(y_std < 1e-6, 1.0, y_std)  # avoid dividing by ~0 for constant targets
    Xtr = (X_train - x_mean) / x_std
    Xte = (X_test - x_mean) / x_std
    Ytr = (Y_train - y_mean) / y_std
    Xtr_aug = np.concatenate([Xtr, np.ones((len(Xtr), 1), dtype=np.float32)], axis=1)
    Xte_aug = np.concatenate([Xte, np.ones((len(Xte), 1), dtype=np.float32)], axis=1)
    K = Xtr_aug @ Xtr_aug.T
    alpha = np.linalg.solve(K + l2 * np.eye(K.shape[0], dtype=np.float32), Ytr)
    W = Xtr_aug.T @ alpha
    pred = (Xte_aug @ W) * y_std + y_mean
    return pred.astype(np.float32), {
        "x_mean": x_mean,
        "x_std": x_std,
        "y_mean": y_mean.astype(np.float32),
        "y_std": y_std.astype(np.float32),
        "W": W.astype(np.float32),
    }


def regression_metrics(Y_true: np.ndarray, Y_pred: np.ndarray) -> dict:
    """Return MSE, MAE and R^2 (0.0 when the targets have zero variance)."""
    mse = float(np.mean((Y_true - Y_pred) ** 2))
    mae = float(np.mean(np.abs(Y_true - Y_pred)))
    ss_res = float(np.sum((Y_true - Y_pred) ** 2))
    ss_tot = float(np.sum((Y_true - Y_true.mean(axis=0)) ** 2))
    r2 = 1.0 - ss_res / ss_tot if ss_tot > 0 else 0.0
    return {"mse": mse, "mae": mae, "r2": r2}


def neural_config(args: argparse.Namespace):
    """Build a ``NeuralConfig`` from the ``--neural-*`` options and ``--seed``."""
    # Imported lazily so the suite can run without the neural module unless it is requested.
    from neural_task_models import NeuralConfig

    return NeuralConfig(
        epochs=args.neural_epochs,
        learning_rate=args.neural_learning_rate,
        weight_decay=args.neural_weight_decay,
        hidden_dim=args.neural_hidden_dim,
        batch_size=args.neural_batch_size,
        dropout=args.neural_dropout,
        device=args.neural_device,
        seed=args.seed,
    )


def neural_common_metrics(args: argparse.Namespace, result: dict, head: str) -> dict:
    """Return the metric entries shared by all neural tasks (hyper-parameters and final train stats)."""
    final = result["history"][-1] if result.get("history") else {}
    metrics = {
        "model": "neural_mlp",
        "head": head,
        "neural_epochs": int(args.neural_epochs),
        "neural_hidden_dim": int(args.neural_hidden_dim),
        "neural_batch_size": int(args.neural_batch_size),
        "neural_learning_rate": float(args.neural_learning_rate),
        "neural_weight_decay": float(args.neural_weight_decay),
        "neural_dropout": float(args.neural_dropout),
        "neural_device": result.get("device", args.neural_device),
    }
    if "loss" in final:
        metrics["train_final_loss"] = float(final["loss"])
    if "train_accuracy" in final:
        metrics["train_final_accuracy"] = float(final["train_accuracy"])
    return metrics
def save_neural_model(out_dir: Path, result: dict, model_type: str, extra: dict | None = None) -> None:
    """Save a trained neural model as ``model.pt`` under *out_dir*.

    The payload holds the model type, ``result["state_dict"]``, whichever
    scaler arrays are present in *result*, and *extra* as ``config``.
    """
    from neural_task_models import save_torch_model

    scaler_keys = ("mean", "std", "x_mean", "x_std", "y_mean", "y_std")
    payload = {
        "model_type": model_type,
        "state_dict": result["state_dict"],
        "scaler": {k: result[k] for k in scaler_keys if k in result},
        "config": extra or {},
    }
    save_torch_model(out_dir / "model.pt", payload)


def neural_classification_task(
    out_dir: Path,
    X: np.ndarray,
    labels: np.ndarray,
    rows: list[dict],
    args: argparse.Namespace,
    task_name: str,
    input_description: str,
) -> dict:
    """Neural counterpart of ``classification_task`` using ``train_classifier``.

    Windows with an empty label are dropped, the remainder is split
    chronologically, and metrics, training history, per-class metrics,
    confusion matrix, predictions and the model are written under *out_dir*.

    Returns:
        The metrics dict that was written to ``metrics.json``.
    """
    from neural_task_models import train_classifier

    out_dir.mkdir(parents=True, exist_ok=True)
    valid = np.asarray([bool(x) for x in labels])
    valid_idx = np.flatnonzero(valid)
    Xv = X[valid_idx]
    labelv = labels[valid_idx]
    rowv = [rows[int(i)] for i in valid_idx]
    y, class_names = encode_labels(labelv)
    train_local, test_local = chronological_split_indices(len(y), args.test_fraction)
    train_classes = set(int(x) for x in y[train_local])
    test_classes = set(int(x) for x in y[test_local])
    unseen_test_classes = sorted(class_names[i] for i in (test_classes - train_classes))

    result = train_classifier(
        Xv,
        y,
        train_local,
        test_local,
        n_classes=len(class_names),
        config=neural_config(args),
        use_class_weights=not args.no_class_weights,
    )
    pred = result["pred"]
    probs = result["prob"]
    metrics, per_class, cm = compute_metrics(y[test_local], pred, class_names)
    majority = Counter(y[train_local]).most_common(1)[0][0]
    metrics.update({
        "task": task_name,
        "input": input_description,
        "split": "chronological",
        "num_windows": int(len(y)),
        "num_train_windows": int(len(train_local)),
        "num_test_windows": int(len(test_local)),
        "num_classes": int(len(class_names)),
        "feature_dim": int(X.shape[1]),
        "majority_baseline_accuracy": float(np.mean(y[test_local] == majority)),
        "unseen_test_classes": unseen_test_classes,
    })
    metrics.update(neural_common_metrics(args, result, "z-score -> MLP softmax"))

    pred_rows = []
    for k, (local_pos, pred_id) in enumerate(zip(test_local, pred)):
        row = rowv[int(local_pos)]
        true_id = int(y[int(local_pos)])
        pred_rows.append({
            "window_index": row["window_index"],
            "start_frame": row["start_frame"],
            "end_frame": row["end_frame"],
            "center_frame": row["center_frame"],
            "true_label": class_names[true_id],
            "predicted_label": class_names[int(pred_id)],
            "confidence": float(probs[k, int(pred_id)]),
            "correct": int(true_id == int(pred_id)),
        })

    write_json(out_dir / "metrics.json", metrics)
    write_json(out_dir / "history.json", result["history"])
    write_csv(out_dir / "per_class_metrics.csv", per_class, ["class_id", "class_name", "support", "predicted", "precision", "recall", "f1"])
    write_confusion(out_dir / "confusion_matrix.csv", cm, class_names)
    write_csv(out_dir / "predictions.csv", pred_rows, ["window_index", "start_frame", "end_frame", "center_frame", "true_label", "predicted_label", "confidence", "correct"])
    save_neural_model(out_dir, result, "classifier", {"class_names": class_names})
    return metrics
def neural_transition_detection(out_dir: Path, X: np.ndarray, rows: list[dict], ann: dict, args: argparse.Namespace) -> dict:
    """Neural counterpart of ``task_transition_detection``.

    Windows whose center frame lies within ``args.boundary_tolerance_frames``
    of an action change are labelled "transition", all others "steady";
    boundary-level precision/recall/F1 on the test span are added to the metrics.
    """
    frame_info = ann["caption_frame_info_map"]
    n_frames = len(ann["img_names"])
    per_frame = [frame_label(frame_info.get(i, {}), "action") for i in range(n_frames)]
    true_boundaries = [
        i for i in range(1, n_frames)
        if per_frame[i] and per_frame[i - 1] and per_frame[i] != per_frame[i - 1]
    ]
    labels = []
    for row in rows:
        c = int(row["center_frame"])
        near_boundary = any(abs(c - b) <= args.boundary_tolerance_frames for b in true_boundaries)
        labels.append("transition" if near_boundary else "steady")
    metrics = neural_classification_task(out_dir, X, np.asarray(labels, dtype=object), rows, args, "transition_detection", "all modalities -> action boundary/steady")

    # The classification helper only returns metrics, so test predictions are read back from disk.
    with (out_dir / "predictions.csv").open("r", encoding="utf-8") as fp:
        pred_rows = list(csv.DictReader(fp))
    pred_frames = [int(r["center_frame"]) for r in pred_rows if r["predicted_label"] == "transition"]
    test_start = min((int(r["center_frame"]) for r in pred_rows), default=0)
    test_end = max((int(r["center_frame"]) for r in pred_rows), default=0)
    true_test = [b for b in true_boundaries if test_start <= b <= test_end]
    metrics.update(boundary_f1(true_test, pred_frames, args.boundary_tolerance_frames))
    write_json(out_dir / "metrics.json", metrics)
    write_csv(out_dir / "true_boundaries.csv", [{"frame": x} for x in true_boundaries], ["frame"])
    return metrics
def neural_next_action(out_dir: Path, X: np.ndarray, rows: list[dict], ann: dict, args: argparse.Namespace) -> dict:
    """Neural counterpart of ``task_next_action``: predict the action ``args.future_frames`` ahead.

    The future frame index is clamped to the last frame of the episode.
    """
    frame_info = ann["caption_frame_info_map"]
    labels = []
    for row in rows:
        future_frame = min(len(ann["img_names"]) - 1, int(row["end_frame"]) + args.future_frames)
        labels.append(frame_label(frame_info.get(future_frame, {}), "action"))
    return neural_classification_task(out_dir, X, np.asarray(labels, dtype=object), rows, args, "next_action", f"all modalities at t -> action at t+{args.future_frames} frames")


def neural_hand_forecast(out_dir: Path, X: np.ndarray, rows: list[dict], ann: dict, args: argparse.Namespace) -> dict:
    """Regress the next ``args.forecast_frames`` frames of left/right hand joints with an MLP.

    Windows without enough future frames are skipped. Targets are made
    relative to the first body joint when body joints cover the future span.

    Raises:
        ValueError: If hand joints are missing or no window has enough future frames.
    """
    from neural_task_models import train_regressor

    left = ann.get("hand_left_joints")
    right = ann.get("hand_right_joints")
    body = ann.get("smplh_body_joints")
    if left is None or right is None:
        raise ValueError("Hand joints not available.")

    valid_idx, Y = [], []
    n_frames = len(left)
    for i, row in enumerate(rows):
        future_start = int(row["end_frame"]) + 1
        future_end = future_start + args.forecast_frames
        if future_end > n_frames:
            continue
        hand = np.concatenate([left[future_start:future_end], right[future_start:future_end]], axis=1)
        # NOTE(review): root-centering is decided per window; if body joints are
        # shorter than hand joints, targets mix two coordinate frames — confirm.
        if body is not None and future_end <= len(body):
            root = body[future_start:future_end, :1, :]
            hand = hand - root
        valid_idx.append(i)
        Y.append(hand.reshape(-1))
    if not Y:
        # np.stack([]) would fail with an unhelpful message.
        raise ValueError("No window has enough future frames for hand trajectory forecasting.")
    valid_idx = np.asarray(valid_idx, dtype=np.int64)
    Y = np.stack(Y).astype(np.float32)

    train, test = chronological_split_indices(len(valid_idx), args.test_fraction)
    result = train_regressor(X[valid_idx], Y, train, test, neural_config(args))
    pred = result["pred"]
    metrics = regression_metrics(Y[test], pred)
    # Joint count is inferred (-1) instead of hard-coding 42; coordinates are 3D.
    true_hand = Y[test].reshape(len(test), args.forecast_frames, -1, 3)
    pred_hand = pred.reshape(len(test), args.forecast_frames, -1, 3)
    metrics.update({
        "task": "hand_trajectory_forecast",
        "input": "all modalities at t -> future left/right hand 3D joints",
        "split": "chronological",
        "num_windows": int(len(valid_idx)),
        "num_train_windows": int(len(train)),
        "num_test_windows": int(len(test)),
        "forecast_frames": int(args.forecast_frames),
        "mpjpe": float(np.linalg.norm(true_hand - pred_hand, axis=-1).mean()),
        "final_frame_mpjpe": float(np.linalg.norm(true_hand[:, -1] - pred_hand[:, -1], axis=-1).mean()),
        "target_dim": int(Y.shape[1]),
    })
    metrics.update(neural_common_metrics(args, result, "z-score -> MLP regression"))

    out_dir.mkdir(parents=True, exist_ok=True)
    write_json(out_dir / "metrics.json", metrics)
    write_json(out_dir / "history.json", result["history"])
    np.savez_compressed(out_dir / "predictions.npz", y_true=Y[test], y_pred=pred, test_window_indices=valid_idx[test])
    save_neural_model(out_dir, result, "regressor", {"target_dim": int(Y.shape[1])})
    return metrics
def neural_object_relevance(out_dir: Path, X: np.ndarray, rows: list[dict], ann: dict, manifest: list[dict], args: argparse.Namespace) -> dict:
    """Predict, per window, the set of objects mentioned in its frames (multilabel MLP).

    The object vocabulary is built in first-seen order over all windows. The
    ``caption_objects_interaction_text`` feature block is excluded from the
    input. Test windows with no predicted object get their highest-probability
    object.

    Raises:
        ValueError: If no frame carries any object label.
    """
    from neural_task_models import train_multilabel

    frame_info = ann["caption_frame_info_map"]
    vocab = OrderedDict()
    labels = []
    for row in rows:
        counts = Counter()
        for frame in range(int(row["start_frame"]), int(row["end_frame"]) + 1):
            counts.update(extract_objects(frame_info.get(frame, {})))
        objects = [obj for obj, count in counts.items() if count > 0]
        for obj in objects:
            if obj not in vocab:
                vocab[obj] = len(vocab)
        labels.append(objects)
    if not vocab:
        raise ValueError("No object labels found.")

    Y = np.zeros((len(rows), len(vocab)), dtype=np.float32)
    for i, objects in enumerate(labels):
        for obj in objects:
            Y[i, vocab[obj]] = 1.0

    keep = block_indices(manifest, exclude=["caption_objects_interaction_text"])
    Xo = X[:, keep]
    train, test = chronological_split_indices(len(rows), args.test_fraction)
    result = train_multilabel(Xo, Y, train, test, neural_config(args))
    prob = result["prob"]
    pred = result["pred"]
    # Never emit an empty set: fall back to the single most probable object.
    empty = np.where(pred.sum(axis=1) == 0)[0]
    if len(empty):
        pred[empty, np.argmax(prob[empty], axis=1)] = 1

    metrics = multilabel_metrics(Y[test], pred)
    metrics.update({
        "task": "object_relevance",
        "input": "all non-caption modalities -> current relevant object set",
        "split": "chronological",
        "num_windows": int(len(rows)),
        "num_train_windows": int(len(train)),
        "num_test_windows": int(len(test)),
        "num_objects": int(len(vocab)),
        "feature_dim": int(Xo.shape[1]),
    })
    metrics.update(neural_common_metrics(args, result, "z-score -> MLP sigmoid multilabel"))

    out_dir.mkdir(parents=True, exist_ok=True)
    write_json(out_dir / "metrics.json", metrics)
    write_json(out_dir / "history.json", result["history"])
    write_json(out_dir / "object_vocab.json", list(vocab.keys()))

    rows_out = []
    names = list(vocab.keys())
    for local_i, global_i in enumerate(test):
        true_objs = [names[j] for j in np.flatnonzero(Y[global_i] > 0)]
        pred_objs = [names[j] for j in np.flatnonzero(pred[local_i] > 0)]
        rows_out.append({
            "window_index": int(global_i),
            "start_frame": rows[int(global_i)]["start_frame"],
            "end_frame": rows[int(global_i)]["end_frame"],
            "true_objects": "|".join(true_objs),
            "predicted_objects": "|".join(pred_objs),
        })
    write_csv(out_dir / "predictions.csv", rows_out, ["window_index", "start_frame", "end_frame", "true_objects", "predicted_objects"])
    save_neural_model(out_dir, result, "multilabel", {"object_vocab": names})
    return metrics
def neural_projection_task(
    out_dir: Path,
    X_in: np.ndarray,
    Y_out: np.ndarray,
    args: argparse.Namespace,
    task_name: str,
    input_desc: str,
    output_desc: str | None = None,
    retrieval_query: np.ndarray | None = None,
    retrieval_candidates: np.ndarray | None = None,
    retrieval_pred_as_query: bool = False,
) -> dict:
    """Train an MLP regressor from *X_in* to *Y_out* and score it.

    When both retrieval arrays are given, the test predictions are scored with
    ``retrieval_metrics`` (predictions act as queries if
    ``retrieval_pred_as_query`` is true, otherwise as candidates); otherwise
    plain regression metrics are reported.

    Returns:
        The metrics dict that was written to ``metrics.json``.
    """
    from neural_task_models import train_regressor

    train, test = chronological_split_indices(len(X_in), args.test_fraction)
    result = train_regressor(X_in, Y_out, train, test, neural_config(args))
    pred = result["pred"]

    if retrieval_query is not None and retrieval_candidates is not None:
        # Test item k is expected to match item k on the other side.
        if retrieval_pred_as_query:
            metrics = retrieval_metrics(pred, retrieval_candidates[test], np.arange(len(test)))
        else:
            metrics = retrieval_metrics(retrieval_query[test], pred, np.arange(len(test)))
    else:
        metrics = regression_metrics(Y_out[test], pred)

    metrics.update({
        "task": task_name,
        "input": input_desc,
        "split": "chronological",
        "num_train_windows": int(len(train)),
        "num_test_windows": int(len(test)),
        "target_dim": int(Y_out.shape[1]),
    })
    if output_desc is not None:
        metrics["output"] = output_desc
    metrics.update(neural_common_metrics(args, result, "z-score -> MLP projection/regression"))

    out_dir.mkdir(parents=True, exist_ok=True)
    write_json(out_dir / "metrics.json", metrics)
    write_json(out_dir / "history.json", result["history"])
    np.savez_compressed(out_dir / "predictions.npz", y_true=Y_out[test], y_pred=pred, test_window_indices=test)
    save_neural_model(out_dir, result, "regressor", {"target_dim": int(Y_out.shape[1])})
    return metrics


def neural_binary_classification_from_arrays(out_dir: Path, X: np.ndarray, y: np.ndarray, args: argparse.Namespace, task: str, input_desc: str) -> dict:
    """Train a two-class MLP on pre-built samples and write metrics and predictions.

    Returns:
        The metrics dict that was written to ``metrics.json``.
    """
    from neural_task_models import train_classifier

    train, test = chronological_split_indices(len(y), args.test_fraction)
    # NOTE(review): class weights are always on here, unlike neural_classification_task,
    # which honours --no-class-weights — confirm this is intended.
    result = train_classifier(X, y.astype(np.int64), train, test, n_classes=2, config=neural_config(args), use_class_weights=True)
    pred = result["pred"]
    prob = result["prob"]

    metrics = binary_metrics(y[test], pred)
    metrics.update({
        "task": task,
        "input": input_desc,
        "split": "chronological",
        "num_samples": int(len(y)),
        "num_train_samples": int(len(train)),
        "num_test_samples": int(len(test)),
        "feature_dim": int(X.shape[1]),
    })
    metrics.update(neural_common_metrics(args, result, "z-score -> MLP binary softmax"))

    out_dir.mkdir(parents=True, exist_ok=True)
    write_json(out_dir / "metrics.json", metrics)
    write_json(out_dir / "history.json", result["history"])
    pred_rows = [
        {
            "sample_index": int(idx),
            "true": int(y[idx]),
            "predicted": int(pred[k]),
            "prob_positive": float(prob[k, 1]),
        }
        for k, idx in enumerate(test)
    ]
    write_csv(out_dir / "predictions.csv", pred_rows, ["sample_index", "true", "predicted", "prob_positive"])
    save_neural_model(out_dir, result, "classifier", {"class_names": ["negative", "positive"]})
    return metrics
def run_neural_task(task: str, out_dir: Path, X: np.ndarray, rows: list[dict], ann: dict, manifest: list[dict], args: argparse.Namespace) -> dict:
    """Dispatch *task* to its neural implementation and return that task's metrics.

    Raises:
        ValueError: For an unknown task name, when a required annotation is
            missing, or when there are too few windows to build sample pairs.
    """
    if task == "timeline_action":
        return neural_classification_task(out_dir, X, label_array(rows, "action_label"), rows, args, task, "all modalities -> current action label")

    if task == "timeline_subtask":
        return neural_classification_task(out_dir, X, label_array(rows, "subtask_label"), rows, args, task, "all modalities -> current subtask label")

    if task == "transition_detection":
        return neural_transition_detection(out_dir, X, rows, ann, args)

    if task == "next_action":
        return neural_next_action(out_dir, X, rows, ann, args)

    if task == "hand_trajectory_forecast":
        return neural_hand_forecast(out_dir, X, rows, ann, args)

    if task == "contact_prediction":
        contacts = ann.get("contacts")
        if contacts is None:
            raise ValueError("Contacts not available.")
        y = []
        for row in rows:
            c = contacts[int(row["start_frame"]):int(row["end_frame"]) + 1]
            y.append("contact" if np.any(c > 0) else "no_contact")
        # Drop the blocks that would leak the target.
        keep = block_indices(manifest, exclude=["body_contacts", "caption_objects_interaction_text"])
        return neural_classification_task(out_dir, X[:, keep], np.asarray(y, dtype=object), rows, args, task, "all non-contact/non-caption-label modalities -> any body contact")

    if task == "object_relevance":
        return neural_object_relevance(out_dir, X, rows, ann, manifest, args)

    if task == "caption_grounding":
        text_idx = block_indices(manifest, include=["caption_objects_interaction_text"])
        sensor_idx = block_indices(manifest, exclude=["caption_objects_interaction_text"])
        return neural_projection_task(
            out_dir,
            X[:, sensor_idx],
            X[:, text_idx],
            args,
            "caption_grounding",
            "caption objects/interaction text query + candidate sensor windows",
            "matching time window",
            retrieval_query=X[:, text_idx],
            retrieval_candidates=X[:, text_idx],
        )

    if task == "cross_modal_retrieval":
        motion_idx = block_indices(manifest, include=["hand_", "body_joints", "body_contacts", "camera_", "imu_", "audio_"])
        visual_idx = block_indices(manifest, include=["depth_confidence", "video_"])
        return neural_projection_task(
            out_dir,
            X[:, motion_idx],
            X[:, visual_idx],
            args,
            "cross_modal_retrieval",
            "motion/IMU/camera/audio query",
            "matching depth/video window",
            retrieval_query=X[:, visual_idx],
            retrieval_candidates=X[:, visual_idx],
            retrieval_pred_as_query=True,
        )

    if task == "modality_reconstruction":
        motion_idx = block_indices(manifest, include=["hand_", "body_joints", "body_contacts", "camera_", "imu_", "audio_"])
        visual_idx = block_indices(manifest, include=["depth_confidence", "video_"])
        return neural_projection_task(out_dir, X[:, motion_idx], X[:, visual_idx], args, "modality_reconstruction", "motion/IMU/camera/audio", "depth/video feature vector")

    if task == "temporal_order":
        if len(X) < 2:
            raise ValueError("temporal_order needs at least two windows.")
        pairs, y = [], []
        for i in range(len(X) - 1):
            a, b = X[i], X[i + 1]
            # Each adjacent pair yields one correctly ordered and one swapped sample.
            pairs.append(np.concatenate([a, b, b - a]))
            y.append(1)
            pairs.append(np.concatenate([b, a, a - b]))
            y.append(0)
        return neural_binary_classification_from_arrays(out_dir, np.stack(pairs).astype(np.float32), np.asarray(y, dtype=np.int64), args, "temporal_order", "two adjacent windows -> whether order is correct")

    if task == "misalignment_detection":
        motion_idx = block_indices(manifest, include=["hand_", "body_joints", "body_contacts", "camera_", "imu_"])
        visual_idx = block_indices(manifest, include=["depth_confidence", "video_", "audio_"])
        shift = args.misalignment_shift_windows
        if shift < 1:
            # shift == 0 makes both classes identical; a negative shift wraps indices.
            raise ValueError(f"--misalignment-shift-windows must be >= 1, got {shift}.")
        limit = len(X) - shift
        if limit < 1:
            raise ValueError(f"misalignment_detection needs more than {shift} windows, got {len(X)}.")
        pairs, y = [], []
        for i in range(limit):
            pairs.append(np.concatenate([X[i, motion_idx], X[i, visual_idx]]))
            y.append(1)
            pairs.append(np.concatenate([X[i, motion_idx], X[i + shift, visual_idx]]))
            y.append(0)
        return neural_binary_classification_from_arrays(out_dir, np.stack(pairs).astype(np.float32), np.asarray(y, dtype=np.int64), args, "misalignment_detection", f"motion+visual/audio pair -> aligned vs shifted by {shift} windows")

    raise ValueError(task)
def task_hand_forecast(out_dir: Path, X: np.ndarray, rows: list[dict], ann: dict, args: argparse.Namespace) -> dict:
    """Forecast the next ``args.forecast_frames`` frames of hand joints with ridge regression.

    Windows without enough future frames are skipped. Targets are made
    relative to the first body joint when body joints cover the future span.
    Metrics go to ``metrics.json``; predictions and the fitted model go to
    ``predictions.npz``.

    Raises:
        ValueError: If hand joints are missing or no window has enough future frames.
    """
    left = ann.get("hand_left_joints")
    right = ann.get("hand_right_joints")
    body = ann.get("smplh_body_joints")
    if left is None or right is None:
        raise ValueError("Hand joints not available.")

    valid_idx, Y = [], []
    n_frames = len(left)
    for i, row in enumerate(rows):
        future_start = int(row["end_frame"]) + 1
        future_end = future_start + args.forecast_frames
        if future_end > n_frames:
            continue
        hand = np.concatenate([left[future_start:future_end], right[future_start:future_end]], axis=1)
        # NOTE(review): root-centering is decided per window; if body joints are
        # shorter than hand joints, targets mix two coordinate frames — confirm.
        if body is not None and future_end <= len(body):
            root = body[future_start:future_end, :1, :]
            hand = hand - root
        valid_idx.append(i)
        Y.append(hand.reshape(-1))
    if not Y:
        # np.stack([]) would fail with an unhelpful message.
        raise ValueError("No window has enough future frames for hand trajectory forecasting.")
    valid_idx = np.asarray(valid_idx, dtype=np.int64)
    Y = np.stack(Y).astype(np.float32)

    train, test = chronological_split_indices(len(valid_idx), args.test_fraction)
    pred, model = ridge_fit_predict(X[valid_idx[train]], Y[train], X[valid_idx[test]], args.ridge_l2)
    metrics = regression_metrics(Y[test], pred)
    # Joint count is inferred (-1) instead of hard-coding 42; coordinates are 3D.
    true_hand = Y[test].reshape(len(test), args.forecast_frames, -1, 3)
    pred_hand = pred.reshape(len(test), args.forecast_frames, -1, 3)
    mpjpe = np.linalg.norm(true_hand - pred_hand, axis=-1).mean()
    final_error = np.linalg.norm(true_hand[:, -1] - pred_hand[:, -1], axis=-1).mean()
    metrics.update({
        "task": "hand_trajectory_forecast",
        "input": "all modalities at t -> future left/right hand 3D joints",
        "split": "chronological",
        "num_windows": int(len(valid_idx)),
        "num_train_windows": int(len(train)),
        "num_test_windows": int(len(test)),
        "forecast_frames": int(args.forecast_frames),
        "mpjpe": float(mpjpe),
        "final_frame_mpjpe": float(final_error),
        "target_dim": int(Y.shape[1]),
    })

    out_dir.mkdir(parents=True, exist_ok=True)
    write_json(out_dir / "metrics.json", metrics)
    np.savez_compressed(out_dir / "predictions.npz", y_true=Y[test], y_pred=pred, test_window_indices=valid_idx[test], **model)
    return metrics
pred.reshape(len(test), args.forecast_frames, 42, 3) mpjpe = np.linalg.norm(true_hand - pred_hand, axis=-1).mean() final_error = np.linalg.norm(true_hand[:, -1] - pred_hand[:, -1], axis=-1).mean() metrics.update({ "task": "hand_trajectory_forecast", "input": "all modalities at t -> future left/right hand 3D joints", "split": "chronological", "num_windows": int(len(valid_idx)), "num_train_windows": int(len(train)), "num_test_windows": int(len(test)), "forecast_frames": int(args.forecast_frames), "mpjpe": float(mpjpe), "final_frame_mpjpe": float(final_error), "target_dim": int(Y.shape[1]), }) out_dir.mkdir(parents=True, exist_ok=True) write_json(out_dir / "metrics.json", metrics) np.savez_compressed(out_dir / "predictions.npz", y_true=Y[test], y_pred=pred, test_window_indices=valid_idx[test], **model) return metrics def task_contact_prediction(out_dir: Path, X: np.ndarray, rows: list[dict], ann: dict, manifest: list[dict], args: argparse.Namespace) -> dict: contacts = ann.get("contacts") if contacts is None: raise ValueError("Contacts not available.") y = [] for row in rows: c = contacts[int(row["start_frame"]):int(row["end_frame"]) + 1] y.append("contact" if np.any(c > 0) else "no_contact") keep = block_indices(manifest, exclude=["body_contacts", "caption_objects_interaction_text"]) return classification_task(out_dir, X[:, keep], np.asarray(y, dtype=object), rows, args, "contact_prediction", "all non-contact/non-caption-label modalities -> any body contact") def extract_objects(info: dict) -> list[str]: objects = info.get("objects") if isinstance(objects, list): return [str(x).strip() for x in objects if str(x).strip()] if objects: return [str(objects).strip()] return [] def sigmoid(z: np.ndarray) -> np.ndarray: return 1.0 / (1.0 + np.exp(-np.clip(z, -40, 40))) def train_multilabel_logistic(X: np.ndarray, Y: np.ndarray, epochs: int, lr: float, l2: float, seed: int): rng = np.random.default_rng(seed) n, d = X.shape c = Y.shape[1] W = rng.normal(0, 0.01, size=(d, 
c)).astype(np.float32) b = np.zeros(c, dtype=np.float32) counts = Y.sum(axis=0) pos_weight = (n - counts) / np.maximum(counts, 1.0) pos_weight = np.clip(pos_weight, 1.0, 20.0).astype(np.float32) history = [] for epoch in range(1, epochs + 1): P = sigmoid(X @ W + b) weights = np.where(Y > 0, pos_weight[None, :], 1.0) diff = (P - Y) * weights / n W -= lr * (X.T @ diff + l2 * W) b -= lr * diff.sum(axis=0) if epoch == 1 or epoch == epochs or epoch % max(1, epochs // 5) == 0: pred = (P >= 0.5).astype(np.float32) history.append({"epoch": epoch, **multilabel_metrics(Y, pred)}) return W.astype(np.float32), b.astype(np.float32), history def multilabel_metrics(Y: np.ndarray, P: np.ndarray) -> dict: Y = Y.astype(np.int64) P = P.astype(np.int64) tp = int(np.sum((Y == 1) & (P == 1))) fp = int(np.sum((Y == 0) & (P == 1))) fn = int(np.sum((Y == 1) & (P == 0))) precision = tp / (tp + fp) if tp + fp else 0.0 recall = tp / (tp + fn) if tp + fn else 0.0 micro_f1 = 2 * precision * recall / (precision + recall) if precision + recall else 0.0 per_f1 = [] for j in range(Y.shape[1]): tpj = np.sum((Y[:, j] == 1) & (P[:, j] == 1)) fpj = np.sum((Y[:, j] == 0) & (P[:, j] == 1)) fnj = np.sum((Y[:, j] == 1) & (P[:, j] == 0)) pj = tpj / (tpj + fpj) if tpj + fpj else 0.0 rj = tpj / (tpj + fnj) if tpj + fnj else 0.0 per_f1.append(2 * pj * rj / (pj + rj) if pj + rj else 0.0) exact = float(np.mean(np.all(Y == P, axis=1))) return {"micro_f1": float(micro_f1), "macro_f1": float(np.mean(per_f1)), "exact_match": exact, "precision": precision, "recall": recall} def task_object_relevance(out_dir: Path, X: np.ndarray, rows: list[dict], ann: dict, manifest: list[dict], args: argparse.Namespace) -> dict: frame_info = ann["caption_frame_info_map"] vocab = OrderedDict() labels = [] for row in rows: counts = Counter() for frame in range(int(row["start_frame"]), int(row["end_frame"]) + 1): counts.update(extract_objects(frame_info.get(frame, {}))) objects = [obj for obj, count in counts.items() if count > 0] for 
obj in objects: if obj not in vocab: vocab[obj] = len(vocab) labels.append(objects) if not vocab: raise ValueError("No object labels found.") Y = np.zeros((len(rows), len(vocab)), dtype=np.float32) for i, objects in enumerate(labels): for obj in objects: Y[i, vocab[obj]] = 1.0 keep = block_indices(manifest, exclude=["caption_objects_interaction_text"]) Xo = X[:, keep] train, test = chronological_split_indices(len(rows), args.test_fraction) mean, std = fit_scaler(Xo[train]) Xs = (Xo - mean) / std W, b, history = train_multilabel_logistic(Xs[train], Y[train], args.epochs, 0.05, args.l2, args.seed) prob = sigmoid(Xs[test] @ W + b) pred = (prob >= 0.5).astype(np.float32) # Ensure at least one object is emitted per row. empty = np.where(pred.sum(axis=1) == 0)[0] if len(empty): pred[empty, np.argmax(prob[empty], axis=1)] = 1 metrics = multilabel_metrics(Y[test], pred) metrics.update({ "task": "object_relevance", "input": "all non-caption modalities -> current relevant object set", "split": "chronological", "num_windows": int(len(rows)), "num_train_windows": int(len(train)), "num_test_windows": int(len(test)), "num_objects": int(len(vocab)), }) out_dir.mkdir(parents=True, exist_ok=True) write_json(out_dir / "metrics.json", metrics) write_json(out_dir / "object_vocab.json", list(vocab.keys())) rows_out = [] names = list(vocab.keys()) for local_i, global_i in enumerate(test): true_objs = [names[j] for j in np.flatnonzero(Y[global_i] > 0)] pred_objs = [names[j] for j in np.flatnonzero(pred[local_i] > 0)] rows_out.append({ "window_index": int(global_i), "start_frame": rows[int(global_i)]["start_frame"], "end_frame": rows[int(global_i)]["end_frame"], "true_objects": "|".join(true_objs), "predicted_objects": "|".join(pred_objs), }) write_csv(out_dir / "predictions.csv", rows_out, ["window_index", "start_frame", "end_frame", "true_objects", "predicted_objects"]) np.savez_compressed(out_dir / "model.npz", mean=mean, std=std, W=W, b=b, object_vocab=np.asarray(names, dtype=object), 
history=np.asarray(history, dtype=object)) return metrics def normalize_rows(A: np.ndarray) -> np.ndarray: norm = np.linalg.norm(A, axis=1, keepdims=True) return A / np.maximum(norm, 1e-8) def retrieval_metrics(query: np.ndarray, candidates: np.ndarray, positive_indices: np.ndarray, topks=(1, 5, 10)) -> dict: Q = normalize_rows(query) C = normalize_rows(candidates) sims = Q @ C.T ranks = [] for i, pos in enumerate(positive_indices): order = np.argsort(-sims[i]) rank = int(np.where(order == pos)[0][0]) + 1 ranks.append(rank) ranks = np.asarray(ranks) out = { "mrr": float(np.mean(1.0 / ranks)), "median_rank": float(np.median(ranks)), "mean_rank": float(np.mean(ranks)), "num_queries": int(len(ranks)), } for k in topks: out[f"top{k}_accuracy"] = float(np.mean(ranks <= k)) return out def task_caption_grounding(out_dir: Path, X: np.ndarray, manifest: list[dict], args: argparse.Namespace) -> dict: text_idx = block_indices(manifest, include=["caption_objects_interaction_text"]) sensor_idx = block_indices(manifest, exclude=["caption_objects_interaction_text"]) train, test = chronological_split_indices(len(X), args.test_fraction) pred_text, model = ridge_fit_predict(X[train][:, sensor_idx], X[train][:, text_idx], X[test][:, sensor_idx], args.ridge_l2) # Query is true text; candidates are sensor windows projected into text space. 
metrics = retrieval_metrics(X[test][:, text_idx], pred_text, np.arange(len(test))) metrics.update({ "task": "caption_grounding", "input": "caption objects/interaction text query + candidate sensor windows", "output": "matching time window", "split": "chronological", "num_train_windows": int(len(train)), "num_test_windows": int(len(test)), }) out_dir.mkdir(parents=True, exist_ok=True) write_json(out_dir / "metrics.json", metrics) np.savez_compressed(out_dir / "model.npz", **model) return metrics def task_cross_modal_retrieval(out_dir: Path, X: np.ndarray, manifest: list[dict], args: argparse.Namespace) -> dict: motion_idx = block_indices(manifest, include=["hand_", "body_joints", "body_contacts", "camera_", "imu_", "audio_"]) visual_idx = block_indices(manifest, include=["depth_confidence", "video_"]) train, test = chronological_split_indices(len(X), args.test_fraction) pred_visual, model = ridge_fit_predict(X[train][:, motion_idx], X[train][:, visual_idx], X[test][:, motion_idx], args.ridge_l2) metrics = retrieval_metrics(pred_visual, X[test][:, visual_idx], np.arange(len(test))) metrics.update({ "task": "cross_modal_retrieval", "input": "motion/IMU/camera/audio query", "output": "matching depth/video window", "split": "chronological", "num_train_windows": int(len(train)), "num_test_windows": int(len(test)), }) out_dir.mkdir(parents=True, exist_ok=True) write_json(out_dir / "metrics.json", metrics) np.savez_compressed(out_dir / "model.npz", **model) return metrics def task_modality_reconstruction(out_dir: Path, X: np.ndarray, manifest: list[dict], args: argparse.Namespace) -> dict: motion_idx = block_indices(manifest, include=["hand_", "body_joints", "body_contacts", "camera_", "imu_", "audio_"]) visual_idx = block_indices(manifest, include=["depth_confidence", "video_"]) train, test = chronological_split_indices(len(X), args.test_fraction) pred, model = ridge_fit_predict(X[train][:, motion_idx], X[train][:, visual_idx], X[test][:, motion_idx], args.ridge_l2) 
metrics = regression_metrics(X[test][:, visual_idx], pred) metrics.update({ "task": "modality_reconstruction", "input": "motion/IMU/camera/audio", "output": "depth/video feature vector", "split": "chronological", "num_train_windows": int(len(train)), "num_test_windows": int(len(test)), "target_dim": int(len(visual_idx)), }) out_dir.mkdir(parents=True, exist_ok=True) write_json(out_dir / "metrics.json", metrics) np.savez_compressed(out_dir / "predictions.npz", y_true=X[test][:, visual_idx], y_pred=pred, **model) return metrics def binary_classification_from_arrays(out_dir: Path, X: np.ndarray, y: np.ndarray, args: argparse.Namespace, task: str, input_desc: str) -> dict: train, test = chronological_split_indices(len(y), args.test_fraction) mean, std = fit_scaler(X[train]) Xs = (X - mean) / std W, b, history = train_softmax_classifier( Xs[train], y[train].astype(np.int64), n_classes=2, epochs=args.epochs, lr=args.learning_rate, l2=args.l2, use_class_weights=True, seed=args.seed, ) pred, prob = predict(Xs[test], W, b) metrics = binary_metrics(y[test], pred) metrics.update({ "task": task, "input": input_desc, "split": "chronological", "num_samples": int(len(y)), "num_train_samples": int(len(train)), "num_test_samples": int(len(test)), "train_final_accuracy": float(history[-1]["train_accuracy"]), }) out_dir.mkdir(parents=True, exist_ok=True) write_json(out_dir / "metrics.json", metrics) pred_rows = [] for k, idx in enumerate(test): pred_rows.append({"sample_index": int(idx), "true": int(y[idx]), "predicted": int(pred[k]), "prob_positive": float(prob[k, 1])}) write_csv(out_dir / "predictions.csv", pred_rows, ["sample_index", "true", "predicted", "prob_positive"]) np.savez_compressed(out_dir / "model.npz", mean=mean, std=std, W=W, b=b) return metrics def task_temporal_order(out_dir: Path, X: np.ndarray, args: argparse.Namespace) -> dict: pairs, y = [], [] for i in range(len(X) - 1): a, b = X[i], X[i + 1] pairs.append(np.concatenate([a, b, b - a])) y.append(1) 
pairs.append(np.concatenate([b, a, a - b])) y.append(0) return binary_classification_from_arrays(out_dir, np.stack(pairs).astype(np.float32), np.asarray(y, dtype=np.int64), args, "temporal_order", "two adjacent windows -> whether order is correct") def task_misalignment(out_dir: Path, X: np.ndarray, manifest: list[dict], args: argparse.Namespace) -> dict: motion_idx = block_indices(manifest, include=["hand_", "body_joints", "body_contacts", "camera_", "imu_"]) visual_idx = block_indices(manifest, include=["depth_confidence", "video_", "audio_"]) shift = args.misalignment_shift_windows pairs, y = [], [] limit = len(X) - shift for i in range(limit): pairs.append(np.concatenate([X[i, motion_idx], X[i, visual_idx]])) y.append(1) pairs.append(np.concatenate([X[i, motion_idx], X[i + shift, visual_idx]])) y.append(0) return binary_classification_from_arrays(out_dir, np.stack(pairs).astype(np.float32), np.asarray(y, dtype=np.int64), args, "misalignment_detection", f"motion+visual/audio pair -> aligned vs shifted by {shift} windows") def main() -> int: args = parse_args() add_toolkit_to_path(args.workspace) from data_loader import load_from_annotation_hdf5 args.output_dir.mkdir(parents=True, exist_ok=True) tasks = selected_tasks(args.tasks) print(f"Loading annotation: {args.annotation}") ann = load_from_annotation_hdf5(args.annotation, 0, None, load_slam_point_cloud=True) extras, available_modalities = prepare_modalities(args, ann) print("Building shared all-modality windows") X, rows, manifest = build_windows(args, ann, extras) write_json(args.output_dir / "available_modalities.json", available_modalities) write_json(args.output_dir / "feature_manifest.json", manifest) write_csv(args.output_dir / "windows.csv", rows, ["window_index", "start_frame", "end_frame", "center_frame", "action_label", "action_fraction", "subtask_label", "subtask_fraction"]) np.savez_compressed(args.output_dir / "shared_windows.npz", X=X, starts=np.asarray([r["start_frame"] for r in rows]), 
ends=np.asarray([r["end_frame"] for r in rows])) summary = { "annotation": portable_path(args.annotation, args.workspace), "num_frames": int(len(ann["img_names"])), "num_windows": int(len(rows)), "feature_dim": int(X.shape[1]), "window_frames": int(args.window_frames), "stride_frames": int(args.stride_frames), "tasks": {}, } if args.include_neural: summary["neural_model"] = { "name": args.neural_output_name, "type": "lightweight PyTorch MLP over shared window features", "epochs": int(args.neural_epochs), "hidden_dim": int(args.neural_hidden_dim), "batch_size": int(args.neural_batch_size), "learning_rate": float(args.neural_learning_rate), "weight_decay": float(args.neural_weight_decay), "dropout": float(args.neural_dropout), "device": args.neural_device, } summary["neural_tasks"] = {} print(f"Windows: {len(rows)}, feature_dim: {X.shape[1]}") for task in tasks: print(f"\nRunning task: {task}") out = args.output_dir / task try: if task == "timeline_action": metrics = classification_task(out, X, label_array(rows, "action_label"), rows, args, task, "all modalities -> current action label") elif task == "timeline_subtask": metrics = classification_task(out, X, label_array(rows, "subtask_label"), rows, args, task, "all modalities -> current subtask label") elif task == "transition_detection": metrics = task_transition_detection(out, X, rows, ann, args) elif task == "next_action": metrics = task_next_action(out, X, rows, ann, args) elif task == "hand_trajectory_forecast": metrics = task_hand_forecast(out, X, rows, ann, args) elif task == "contact_prediction": metrics = task_contact_prediction(out, X, rows, ann, manifest, args) elif task == "object_relevance": metrics = task_object_relevance(out, X, rows, ann, manifest, args) elif task == "caption_grounding": metrics = task_caption_grounding(out, X, manifest, args) elif task == "cross_modal_retrieval": metrics = task_cross_modal_retrieval(out, X, manifest, args) elif task == "modality_reconstruction": metrics = 
task_modality_reconstruction(out, X, manifest, args) elif task == "temporal_order": metrics = task_temporal_order(out, X, args) elif task == "misalignment_detection": metrics = task_misalignment(out, X, manifest, args) else: raise ValueError(task) summary["tasks"][task] = metrics key_metrics = {k: metrics[k] for k in ("accuracy", "macro_f1", "f1", "mpjpe", "mrr", "r2", "micro_f1") if k in metrics} print(f" done: {key_metrics}") except Exception as exc: summary["tasks"][task] = {"error": str(exc)} write_json(out / "error.json", {"task": task, "error": str(exc)}) print(f" error: {exc}") if args.include_neural: neural_out = args.output_dir / args.neural_output_name / task try: print(f" running neural baseline: {args.neural_output_name}") neural_metrics = run_neural_task(task, neural_out, X, rows, ann, manifest, args) summary["neural_tasks"][task] = neural_metrics neural_key_metrics = {k: neural_metrics[k] for k in ("accuracy", "macro_f1", "f1", "mpjpe", "mrr", "r2", "micro_f1") if k in neural_metrics} print(f" neural done: {neural_key_metrics}") except Exception as exc: summary["neural_tasks"][task] = {"error": str(exc)} write_json(neural_out / "error.json", {"task": task, "error": str(exc), "model": args.neural_output_name}) print(f" neural error: {exc}") write_json(args.output_dir / "summary_report.json", summary) print(f"\nSuite artifacts written to: {args.output_dir}") return 0 if __name__ == "__main__": raise SystemExit(main())