| """ |
| RoboMind VLA — Validation & Test Suite. |
| |
| Runs on Modal (CPU for analysis, GPU for inference). |
| Tests model accuracy per-tier, hybrid blending quality, and end-to-end pipeline. |
| |
| Run: |
| modal run validation.py |
| """ |
| from __future__ import annotations |
|
|
| import modal |
|
|
# Python packages installed into the validation image (pins kept exactly as listed).
_PIP_PACKAGES = (
    "torch==2.4.0",
    "torchvision==0.19.0",
    "transformers==4.40.0",
    "peft==0.11.1",
    "accelerate==0.30.1",
    "pillow",
    "sentencepiece",
    "huggingface_hub",
    "numpy<2",
    "opencv-python-headless",
    "datasets",
    "scipy",
    "scikit-learn",
)

# Build-time shell command that writes a stub ``flash_attn`` package into
# site-packages: an empty ``__init__.py`` plus a ``flash_attn_interface``
# module whose two functions raise NotImplementedError when called.
# NOTE(review): presumably this lets code that imports flash_attn load
# without the compiled extension installed - confirm.
_FLASH_ATTN_STUB_CMD = (
    "python -c \""
    "import os, sys; "
    "d = os.path.join(sys.prefix, 'lib/python3.11/site-packages/flash_attn'); "
    "os.makedirs(d, exist_ok=True); "
    "open(os.path.join(d, '__init__.py'), 'w').write(''); "
    "open(os.path.join(d, 'flash_attn_interface.py'), 'w').write("
    "'def flash_attn_func(*a, **kw): raise NotImplementedError\\n"
    "def flash_attn_varlen_func(*a, **kw): raise NotImplementedError\\n'); "
    "print('flash_attn stub created')\""
)

# Container image: Debian slim + Python 3.11, ffmpeg, the packages above,
# the flash_attn stub, and the local hybrid_judge module copied to /root.
_base_image = modal.Image.debian_slim(python_version="3.11").apt_install("ffmpeg")
image = (
    _base_image.pip_install(*_PIP_PACKAGES)
    .run_commands(_FLASH_ATTN_STUB_CMD)
    .add_local_file("hybrid_judge.py", "/root/hybrid_judge.py")
)
|
|
# Hugging Face repositories: the LoRA adapter under test and the labelled dataset.
ADAPTER_REPO = "mitvho09/robomind-minicpm-loco-lora"
DATASET_REPO = "mitvho09/robomind-loco-judge-dataset"

# Modal application and the named volume (created on first use if missing).
app = modal.App("robomind-validation")
volume = modal.Volume.from_name("robomind-data", create_if_missing=True)
|
|
# JSON keys the model is asked to emit, in the order the prompt lists them.
_RESPONSE_KEYS = (
    "timestep_range",
    "phase",
    "command",
    "command_followed",
    "stability",
    "fall_risk",
    "gait_quality",
    "predicted_reward",
    "anomaly",
    "explanation",
)

# Text instruction appended after the image placeholders of every sample.
INSTRUCTION_PROMPT = (
    "You are RoboMind VLA, a vision-language reward model for robot locomotion. "
    "You are shown keyframes from a MuJoCo locomotion rollout. "
    'The robot was commanded to "walk forward". Analyze the rollout and '
    "respond with ONLY a JSON object with these exact keys: "
    + ", ".join(_RESPONSE_KEYS)
    + "."
)
|
|
|
|
def _extract_json_object(text):
    """Return the first JSON object embedded in *text*, or ``{}`` if none parses.

    Every ``{`` is tried as a potential start so that nested objects and
    leading chatter (or an earlier malformed brace) do not defeat parsing.
    """
    import json

    decoder = json.JSONDecoder()
    start = text.find("{")
    while start != -1:
        try:
            candidate, _ = decoder.raw_decode(text, start)
        except json.JSONDecodeError:
            pass
        else:
            if isinstance(candidate, dict):
                return candidate
        start = text.find("{", start + 1)
    return {}


def _coerce_reward(value, default=0.5):
    """Return *value* as a finite float, or *default* when that is impossible."""
    import math

    try:
        number = float(value)
    except (TypeError, ValueError):
        return default
    return number if math.isfinite(number) else default


def _json_default(obj):
    """``json.dump`` fallback: convert objects exposing ``tolist()`` (e.g. NumPy scalars)."""
    to_list = getattr(obj, "tolist", None)
    if callable(to_list):
        return to_list()
    raise TypeError(f"Object of type {type(obj).__name__} is not JSON serializable")


@app.function(
    image=image,
    gpu="A100-40GB",
    volumes={"/data": volume},
    secrets=[modal.Secret.from_name("huggingface-secret")],
    timeout=3600,
)
def run_validation():
    """Run full validation: VLM predictions vs ground-truth labels.

    Loads the MiniCPM-V base model with the LoRA adapter from ``ADAPTER_REPO``,
    queries it on every row of ``DATASET_REPO`` (up to six images per row),
    feeds the parsed answer to ``hybrid_judge`` and compares the resulting
    rewards against the ``predicted_reward`` stored in each row's target JSON.

    Returns:
        dict: the report, also written to ``/data/validation_report.json``.
        Keys: ``total_samples``, ``parse_failures``, ``overall`` (Spearman
        correlations and MAE; NaN when there are no samples), ``per_tier``,
        ``tier_separation`` and ``results`` (one dict per sample).
    """
    import json
    import os
    import sys
    from collections import defaultdict

    import numpy as np
    import torch
    from datasets import load_dataset
    from huggingface_hub import login
    from peft import PeftModel
    from scipy.stats import spearmanr
    from sklearn.metrics import mean_absolute_error
    from transformers import AutoModel, AutoTokenizer

    # hybrid_judge.py is copied to /root when the image is built.
    sys.path.insert(0, "/root")
    from hybrid_judge import hybrid_judge, hybrid_to_dict

    hf_token = os.environ.get("HF_TOKEN")
    if hf_token:
        login(token=hf_token)

    max_frames = 6
    tiers = ("expert", "medium", "simple")

    print("[val] loading model...")
    tokenizer = AutoTokenizer.from_pretrained(
        "openbmb/MiniCPM-V-2_6", trust_remote_code=True
    )
    base_model = AutoModel.from_pretrained(
        "openbmb/MiniCPM-V-2_6",
        trust_remote_code=True,
        torch_dtype=torch.bfloat16,
        device_map="auto",
    )
    model = PeftModel.from_pretrained(base_model, ADAPTER_REPO)
    model.eval()
    print("[val] model loaded")

    # NOTE(review): this evaluates on the "train" split; if the adapter was
    # fine-tuned on the same rows the metrics are optimistic - confirm whether
    # a held-out split exists.
    print("[val] loading dataset...")
    ds = load_dataset(DATASET_REPO, split="train")

    print(f"[val] running predictions on {len(ds)} samples...")
    results = []
    tier_results = defaultdict(list)
    parse_failures = 0

    for idx, row in enumerate(ds):
        images = row["images"][:max_frames]
        target = json.loads(row["target_json"])
        env = row["env"]
        tier = row["tier"]
        gt_reward = _coerce_reward(target.get("predicted_reward"))
        gt_norm = row["gt_norm_reward"]
        fell = row["fell"]

        image_tokens = "\n".join(f"<image_{k:02d}>" for k in range(len(images)))
        user_content = f"{image_tokens}\n{INSTRUCTION_PROMPT}"

        # NOTE(review): decoding options are left at model.chat's defaults; if
        # those enable sampling, results will vary between runs - confirm.
        with torch.no_grad():
            response = model.chat(
                image=images,
                msgs=[{"role": "user", "content": user_content}],
                tokenizer=tokenizer,
                max_new_tokens=512,
            )
        response = response if isinstance(response, str) else str(response)

        parsed = _extract_json_object(response)
        if not parsed:
            # Downstream code falls back to defaults; count these so a model
            # that never emits valid JSON is visible in the report.
            parse_failures += 1

        # NOTE(review): min_return/max_return are fixed at 0/1 and num_steps at
        # 0 while ep_return is the row's raw "gt_return" - verify against what
        # hybrid_judge expects.
        hybrid = hybrid_judge(
            vlm_parsed=parsed,
            ep_return=row.get("gt_return", 0),
            min_return=0,
            max_return=1,
            fell=fell,
            num_steps=0,
            tier=tier,
            env=env,
        )
        # The converted dict was unused in the original as well; the call is
        # kept so the conversion path is still exercised for every sample.
        hybrid_to_dict(hybrid)

        result = {
            "idx": idx,
            "env": env,
            "tier": tier,
            "gt_reward": gt_reward,
            "gt_norm": gt_norm,
            "vlm_reward": hybrid.vlm_reward,
            "rule_reward": hybrid.rule_reward,
            "hybrid_reward": hybrid.blended_reward,
            "vlm_raw": _coerce_reward(parsed.get("predicted_reward")),
            "fell": fell,
            "stability": parsed.get("stability", "unknown"),
        }
        results.append(result)
        tier_results[tier].append(result)

        if (idx + 1) % 10 == 0:
            print(f" [{idx+1}/{len(ds)}] processed")

    print("\n" + "=" * 70)
    print("VALIDATION RESULTS")
    print("=" * 70)

    gt_rewards = np.array([r["gt_reward"] for r in results])
    hybrid_rewards = np.array([r["hybrid_reward"] for r in results])
    vlm_rewards = np.array([r["vlm_reward"] for r in results])
    rule_rewards = np.array([r["rule_reward"] for r in results])

    if results:
        corr_hybrid, _ = spearmanr(gt_rewards, hybrid_rewards)
        corr_vlm, _ = spearmanr(gt_rewards, vlm_rewards)
        corr_rule, _ = spearmanr(gt_rewards, rule_rewards)
        # Plain floats keep the report serializable and picklable everywhere.
        corr_hybrid, corr_vlm, corr_rule = (
            float(corr_hybrid),
            float(corr_vlm),
            float(corr_rule),
        )
        mae_hybrid = float(mean_absolute_error(gt_rewards, hybrid_rewards))
        mae_vlm = float(mean_absolute_error(gt_rewards, vlm_rewards))
    else:
        # mean_absolute_error rejects empty input; report NaN instead of crashing.
        corr_hybrid = corr_vlm = corr_rule = float("nan")
        mae_hybrid = mae_vlm = float("nan")

    print(f"\nOverall ({len(results)} samples):")
    print(f" JSON parse failures: {parse_failures}")
    print(" Spearman correlation:")
    print(f" Hybrid: {corr_hybrid:.4f}")
    print(f" VLM: {corr_vlm:.4f}")
    print(f" Rule: {corr_rule:.4f}")
    print(" MAE:")
    print(f" Hybrid: {mae_hybrid:.4f}")
    print(f" VLM: {mae_vlm:.4f}")

    print("\nPer-tier breakdown:")
    print(f" {'Tier':<12} {'N':>4} {'GT_mean':>8} {'Hybrid_mean':>12} {'VLM_mean':>10} {'Rule_mean':>10} {'MAE_hyb':>8}")
    for tier in tiers:
        tr = tier_results.get(tier, [])
        if not tr:
            continue
        gt = np.array([r["gt_reward"] for r in tr])
        hyb = np.array([r["hybrid_reward"] for r in tr])
        vlm = np.array([r["vlm_reward"] for r in tr])
        rul = np.array([r["rule_reward"] for r in tr])
        mae = mean_absolute_error(gt, hyb)
        print(f" {tier:<12} {len(tr):>4} {gt.mean():>8.3f} {hyb.mean():>12.3f} {vlm.mean():>10.3f} {rul.mean():>10.3f} {mae:>8.3f}")

    envs = sorted({r["env"] for r in results})
    print("\nPer-env breakdown:")
    print(f" {'Env':<15} {'N':>4} {'GT_mean':>8} {'Hybrid_mean':>12} {'Spearman':>10}")
    for env in envs:
        er = [r for r in results if r["env"] == env]
        gt = np.array([r["gt_reward"] for r in er])
        hyb = np.array([r["hybrid_reward"] for r in er])
        # Rank correlation is only computed with more than two samples.
        if len(gt) > 2:
            corr, _ = spearmanr(gt, hyb)
        else:
            corr = float("nan")
        print(f" {env:<15} {len(er):>4} {gt.mean():>8.3f} {hyb.mean():>12.3f} {corr:>10.4f}")

    fell_samples = [r for r in results if r["fell"]]
    non_fell_samples = [r for r in results if not r["fell"]]
    if fell_samples or non_fell_samples:
        print("\nFall detection:")
    if fell_samples:
        fell_rewards = [r["hybrid_reward"] for r in fell_samples]
        print(f" Fell episodes: {len(fell_samples)}, avg hybrid reward: {np.mean(fell_rewards):.3f}")
    if non_fell_samples:
        non_fell_rewards = [r["hybrid_reward"] for r in non_fell_samples]
        print(f" Non-fell episodes: {len(non_fell_samples)}, avg hybrid reward: {np.mean(non_fell_rewards):.3f}")

    expert_rewards = np.array([r["hybrid_reward"] for r in results if r["tier"] == "expert"])
    simple_rewards = np.array([r["hybrid_reward"] for r in results if r["tier"] == "simple"])
    tier_separation = 0
    if len(expert_rewards) > 0 and len(simple_rewards) > 0:
        tier_separation = float(expert_rewards.mean() - simple_rewards.mean())
        print(f"\nTier separation (expert - simple): {tier_separation:.3f}")
        print(f" PASS: {tier_separation > 0.1}")

    per_tier = {}
    for tier in tiers:
        rows = tier_results.get(tier, [])
        per_tier[tier] = {
            "count": len(rows),
            "gt_mean": float(np.mean([r["gt_reward"] for r in rows])) if rows else 0,
            "hybrid_mean": float(np.mean([r["hybrid_reward"] for r in rows])) if rows else 0,
        }

    report = {
        "total_samples": len(results),
        "parse_failures": parse_failures,
        "overall": {
            "spearman_hybrid": corr_hybrid,
            "spearman_vlm": corr_vlm,
            "spearman_rule": corr_rule,
            "mae_hybrid": mae_hybrid,
            "mae_vlm": mae_vlm,
        },
        "per_tier": per_tier,
        "tier_separation": tier_separation,
        "results": results,
    }

    report_path = "/data/validation_report.json"
    with open(report_path, "w", encoding="utf-8") as f:
        # default= guards against NumPy scalars coming back from hybrid_judge,
        # so a long GPU run is not lost to a serialization TypeError.
        json.dump(report, f, indent=2, default=_json_default)
    print(f"\n[val] report saved to {report_path}")

    # Persist the file written under /data to the Modal volume.
    volume.commit()
    return report
|
|
|
|
@app.local_entrypoint()
def main():
    """Run the remote validation job and print its headline numbers."""
    report = run_validation.remote()

    print("\n=== SUMMARY ===")
    sample_count = report["total_samples"]
    print(f"Samples: {sample_count}")

    overall = report["overall"]
    spearman_hybrid = overall["spearman_hybrid"]
    print(f"Spearman (hybrid): {spearman_hybrid:.4f}")
    spearman_vlm = overall["spearman_vlm"]
    print(f"Spearman (VLM): {spearman_vlm:.4f}")

    separation = report["tier_separation"]
    print(f"Tier separation: {separation:.3f}")
|
|