""" RoboMind VLA — Validation & Test Suite. Runs on Modal (CPU for analysis, GPU for inference). Tests model accuracy per-tier, hybrid blending quality, and end-to-end pipeline. Run: modal run validation.py """ from __future__ import annotations import modal image = ( modal.Image.debian_slim(python_version="3.11") .apt_install("ffmpeg") .pip_install( "torch==2.4.0", "torchvision==0.19.0", "transformers==4.40.0", "peft==0.11.1", "accelerate==0.30.1", "pillow", "sentencepiece", "huggingface_hub", "numpy<2", "opencv-python-headless", "datasets", "scipy", "scikit-learn", ) .run_commands( "python -c \"" "import os, sys; " "d = os.path.join(sys.prefix, 'lib/python3.11/site-packages/flash_attn'); " "os.makedirs(d, exist_ok=True); " "open(os.path.join(d, '__init__.py'), 'w').write(''); " "open(os.path.join(d, 'flash_attn_interface.py'), 'w').write(" "'def flash_attn_func(*a, **kw): raise NotImplementedError\\n" "def flash_attn_varlen_func(*a, **kw): raise NotImplementedError\\n'); " "print('flash_attn stub created')\"" ) .add_local_file("hybrid_judge.py", "/root/hybrid_judge.py") ) app = modal.App("robomind-validation") volume = modal.Volume.from_name("robomind-data", create_if_missing=True) ADAPTER_REPO = "mitvho09/robomind-minicpm-loco-lora" DATASET_REPO = "mitvho09/robomind-loco-judge-dataset" INSTRUCTION_PROMPT = ( "You are RoboMind VLA, a vision-language reward model for robot locomotion. " "You are shown keyframes from a MuJoCo locomotion rollout. " "The robot was commanded to \"walk forward\". Analyze the rollout and " "respond with ONLY a JSON object with these exact keys: timestep_range, " "phase, command, command_followed, stability, fall_risk, gait_quality, " "predicted_reward, anomaly, explanation." ) @app.function( image=image, gpu="A100-40GB", volumes={"/data": volume}, secrets=[modal.Secret.from_name("huggingface-secret")], timeout=3600, ) def run_validation(): """Run full validation: VLM predictions vs ground-truth labels.""" import json import os import sys import re import numpy as np import torch from datasets import load_dataset from huggingface_hub import login from peft import PeftModel from PIL import Image from transformers import AutoModel, AutoTokenizer from scipy.stats import spearmanr, pearsonr from sklearn.metrics import mean_absolute_error, mean_squared_error sys.path.insert(0, "/root") from hybrid_judge import hybrid_judge, hybrid_to_dict hf_token = os.environ.get("HF_TOKEN") if hf_token: login(token=hf_token) # --- 1. Load model --- print("[val] loading model...") tokenizer = AutoTokenizer.from_pretrained( "openbmb/MiniCPM-V-2_6", trust_remote_code=True ) base_model = AutoModel.from_pretrained( "openbmb/MiniCPM-V-2_6", trust_remote_code=True, torch_dtype=torch.bfloat16, device_map="auto", ) model = PeftModel.from_pretrained(base_model, ADAPTER_REPO) model.eval() print("[val] model loaded") # --- 2. Load dataset --- print("[val] loading dataset...") ds = load_dataset(DATASET_REPO, split="train") # --- 3. Run predictions --- print(f"[val] running predictions on {len(ds)} samples...") results = [] tier_results = {} for idx, row in enumerate(ds): images = row["images"][:6] target = json.loads(row["target_json"]) env = row["env"] tier = row["tier"] gt_reward = target.get("predicted_reward", 0.5) gt_norm = row["gt_norm_reward"] fell = row["fell"] # Run VLM inference n = min(len(images), 6) image_tokens = "\n".join(f"" for k in range(n)) user_content = f"{image_tokens}\n{INSTRUCTION_PROMPT}" with torch.no_grad(): response = model.chat( image=images[:n], msgs=[{"role": "user", "content": user_content}], tokenizer=tokenizer, max_new_tokens=512, ) response = response if isinstance(response, str) else str(response) # Parse VLM output parsed = {} json_match = re.search(r'\{[^{}]*\}', response, re.DOTALL) if json_match: try: parsed = json.loads(json_match.group()) except json.JSONDecodeError: pass # Run hybrid judge hybrid = hybrid_judge( vlm_parsed=parsed, ep_return=row.get("gt_return", 0), min_return=0, max_return=1, fell=fell, num_steps=0, tier=tier, env=env, ) hybrid_dict = hybrid_to_dict(hybrid) vlm_reward_raw = parsed.get("predicted_reward", 0.5) if isinstance(vlm_reward_raw, str): try: vlm_reward_raw = float(vlm_reward_raw) except ValueError: vlm_reward_raw = 0.5 result = { "idx": idx, "env": env, "tier": tier, "gt_reward": gt_reward, "gt_norm": gt_norm, "vlm_reward": hybrid.vlm_reward, "rule_reward": hybrid.rule_reward, "hybrid_reward": hybrid.blended_reward, "vlm_raw": vlm_reward_raw, "fell": fell, "stability": parsed.get("stability", "unknown"), } results.append(result) if tier not in tier_results: tier_results[tier] = [] tier_results[tier].append(result) if (idx + 1) % 10 == 0: print(f" [{idx+1}/{len(ds)}] processed") # --- 4. Compute metrics --- print("\n" + "=" * 70) print("VALIDATION RESULTS") print("=" * 70) gt_rewards = np.array([r["gt_reward"] for r in results]) hybrid_rewards = np.array([r["hybrid_reward"] for r in results]) vlm_rewards = np.array([r["vlm_reward"] for r in results]) rule_rewards = np.array([r["rule_reward"] for r in results]) # Overall metrics corr_hybrid, _ = spearmanr(gt_rewards, hybrid_rewards) corr_vlm, _ = spearmanr(gt_rewards, vlm_rewards) corr_rule, _ = spearmanr(gt_rewards, rule_rewards) mae_hybrid = mean_absolute_error(gt_rewards, hybrid_rewards) mae_vlm = mean_absolute_error(gt_rewards, vlm_rewards) print(f"\nOverall ({len(results)} samples):") print(f" Spearman correlation:") print(f" Hybrid: {corr_hybrid:.4f}") print(f" VLM: {corr_vlm:.4f}") print(f" Rule: {corr_rule:.4f}") print(f" MAE:") print(f" Hybrid: {mae_hybrid:.4f}") print(f" VLM: {mae_vlm:.4f}") # Per-tier metrics print(f"\nPer-tier breakdown:") print(f" {'Tier':<12} {'N':>4} {'GT_mean':>8} {'Hybrid_mean':>12} {'VLM_mean':>10} {'Rule_mean':>10} {'MAE_hyb':>8}") for tier in ["expert", "medium", "simple"]: tr = tier_results.get(tier, []) if not tr: continue gt = np.array([r["gt_reward"] for r in tr]) hyb = np.array([r["hybrid_reward"] for r in tr]) vlm = np.array([r["vlm_reward"] for r in tr]) rul = np.array([r["rule_reward"] for r in tr]) mae = mean_absolute_error(gt, hyb) print(f" {tier:<12} {len(tr):>4} {gt.mean():>8.3f} {hyb.mean():>12.3f} {vlm.mean():>10.3f} {rul.mean():>10.3f} {mae:>8.3f}") # Per-env metrics envs = sorted(set(r["env"] for r in results)) print(f"\nPer-env breakdown:") print(f" {'Env':<15} {'N':>4} {'GT_mean':>8} {'Hybrid_mean':>12} {'Spearman':>10}") for env in envs: er = [r for r in results if r["env"] == env] gt = np.array([r["gt_reward"] for r in er]) hyb = np.array([r["hybrid_reward"] for r in er]) if len(gt) > 2: corr, _ = spearmanr(gt, hyb) else: corr = float("nan") print(f" {env:<15} {len(er):>4} {gt.mean():>8.3f} {hyb.mean():>12.3f} {corr:>10.4f}") # Fall detection accuracy fell_samples = [r for r in results if r["fell"]] non_fell_samples = [r for r in results if not r["fell"]] if fell_samples: fell_rewards = [r["hybrid_reward"] for r in fell_samples] print(f"\nFall detection:") print(f" Fell episodes: {len(fell_samples)}, avg hybrid reward: {np.mean(fell_rewards):.3f}") if non_fell_samples: non_fell_rewards = [r["hybrid_reward"] for r in non_fell_samples] print(f" Non-fell episodes: {len(non_fell_samples)}, avg hybrid reward: {np.mean(non_fell_rewards):.3f}") # Tier separation test expert_rewards = np.array([r["hybrid_reward"] for r in results if r["tier"] == "expert"]) simple_rewards = np.array([r["hybrid_reward"] for r in results if r["tier"] == "simple"]) if len(expert_rewards) > 0 and len(simple_rewards) > 0: sep = expert_rewards.mean() - simple_rewards.mean() print(f"\nTier separation (expert - simple): {sep:.3f}") print(f" PASS: {sep > 0.1}") # Save results report = { "total_samples": len(results), "overall": { "spearman_hybrid": corr_hybrid, "spearman_vlm": corr_vlm, "spearman_rule": corr_rule, "mae_hybrid": mae_hybrid, "mae_vlm": mae_vlm, }, "per_tier": { tier: { "count": len(tier_results.get(tier, [])), "gt_mean": float(np.mean([r["gt_reward"] for r in tier_results.get(tier, [])])) if tier_results.get(tier) else 0, "hybrid_mean": float(np.mean([r["hybrid_reward"] for r in tier_results.get(tier, [])])) if tier_results.get(tier) else 0, } for tier in ["expert", "medium", "simple"] }, "tier_separation": float(expert_rewards.mean() - simple_rewards.mean()) if len(expert_rewards) > 0 and len(simple_rewards) > 0 else 0, "results": results, } report_path = "/data/validation_report.json" with open(report_path, "w") as f: json.dump(report, f, indent=2) print(f"\n[val] report saved to {report_path}") volume.commit() return report @app.local_entrypoint() def main(): result = run_validation.remote() print("\n=== SUMMARY ===") print(f"Samples: {result['total_samples']}") print(f"Spearman (hybrid): {result['overall']['spearman_hybrid']:.4f}") print(f"Spearman (VLM): {result['overall']['spearman_vlm']:.4f}") print(f"Tier separation: {result['tier_separation']:.3f}")