# robomind-vla / validation.py
# RoboMind VLA: vision-language reward model for robot locomotion (built with Codex)
# Uploaded by mitvho09 · revision 321ba64 (verified) · 10.7 kB
"""
RoboMind VLA — Validation & Test Suite.
Runs on Modal (CPU for analysis, GPU for inference).
Tests model accuracy per-tier, hybrid blending quality, and end-to-end pipeline.
Run:
modal run validation.py
"""
from __future__ import annotations
import modal
# Python packages installed into the container image; version pins are kept
# exactly as listed so the image contents stay the same.
_PIP_PACKAGES = (
    "torch==2.4.0",
    "torchvision==0.19.0",
    "transformers==4.40.0",
    "peft==0.11.1",
    "accelerate==0.30.1",
    "pillow",
    "sentencepiece",
    "huggingface_hub",
    "numpy<2",
    "opencv-python-headless",
    "datasets",
    "scipy",
    "scikit-learn",
)

# Source text of the placeholder flash_attn_interface module. The "\\n" stays a
# literal backslash-n in the shell command and only becomes a newline once the
# inner `python -c` interpreter evaluates the quoted string.
_FLASH_ATTN_STUB_SOURCE = (
    "def flash_attn_func(*a, **kw): raise NotImplementedError\\n"
    "def flash_attn_varlen_func(*a, **kw): raise NotImplementedError\\n"
)

# Statements of the one-off build script that writes an importable but
# non-functional `flash_attn` package into site-packages.
# Presumably this lets remote model code import flash_attn without the real
# package being installed -- confirm against the model's remote code.
_FLASH_ATTN_STUB_STATEMENTS = (
    "import os, sys",
    "d = os.path.join(sys.prefix, 'lib/python3.11/site-packages/flash_attn')",
    "os.makedirs(d, exist_ok=True)",
    "open(os.path.join(d, '__init__.py'), 'w').write('')",
    "open(os.path.join(d, 'flash_attn_interface.py'), 'w').write('"
    + _FLASH_ATTN_STUB_SOURCE
    + "')",
    "print('flash_attn stub created')",
)
_FLASH_ATTN_STUB_COMMAND = (
    'python -c "' + "; ".join(_FLASH_ATTN_STUB_STATEMENTS) + '"'
)

# Container image: Debian slim + ffmpeg + pinned ML stack + flash_attn stub,
# with the local hybrid_judge.py copied to /root so it can be imported remotely.
_base_image = modal.Image.debian_slim(python_version="3.11")
_with_system_deps = _base_image.apt_install("ffmpeg")
_with_python_deps = _with_system_deps.pip_install(*_PIP_PACKAGES)
_with_flash_attn_stub = _with_python_deps.run_commands(_FLASH_ATTN_STUB_COMMAND)
image = _with_flash_attn_stub.add_local_file(
    "hybrid_judge.py", "/root/hybrid_judge.py"
)
# Hugging Face repositories used by run_validation: the LoRA adapter that is
# loaded on top of the base model, and the dataset it is evaluated on.
DATASET_REPO = "mitvho09/robomind-loco-judge-dataset"
ADAPTER_REPO = "mitvho09/robomind-minicpm-loco-lora"

# Named Modal volume (created on first use) and the Modal app that owns the
# functions declared in this file.
volume = modal.Volume.from_name("robomind-data", create_if_missing=True)
app = modal.App("robomind-validation")
# JSON keys the model is told to emit, in the order the prompt lists them.
_RESPONSE_KEYS = (
    "timestep_range",
    "phase",
    "command",
    "command_followed",
    "stability",
    "fall_risk",
    "gait_quality",
    "predicted_reward",
    "anomaly",
    "explanation",
)

# Text instruction sent to the model together with the rollout keyframes.
INSTRUCTION_PROMPT = " ".join(
    [
        "You are RoboMind VLA, a vision-language reward model for robot locomotion.",
        "You are shown keyframes from a MuJoCo locomotion rollout.",
        'The robot was commanded to "walk forward". Analyze the rollout and',
        "respond with ONLY a JSON object with these exact keys:",
        ", ".join(_RESPONSE_KEYS) + ".",
    ]
)
def _extract_json_object(text):
    """Return the first JSON object embedded in *text*, or ``{}`` if none parses.

    Every ``{`` in the text is tried as a possible start of a JSON document, so
    nested objects and braces inside string values are handled, and leading or
    trailing chatter around the object is ignored.
    """
    import json

    decoder = json.JSONDecoder()
    start = text.find("{")
    while start != -1:
        try:
            candidate, _ = decoder.raw_decode(text[start:])
        except json.JSONDecodeError:
            candidate = None
        if isinstance(candidate, dict):
            return candidate
        start = text.find("{", start + 1)
    return {}


def _coerce_reward(value, default=0.5):
    """Convert a reward-like value to a finite float, else return *default*."""
    import math

    try:
        reward = float(value)
    except (TypeError, ValueError):
        return default
    return reward if math.isfinite(reward) else default


def _json_default(obj):
    """``json.dump`` fallback that unwraps NumPy scalars and arrays."""
    import numpy as np

    if isinstance(obj, np.generic):
        return obj.item()
    if isinstance(obj, np.ndarray):
        return obj.tolist()
    raise TypeError(f"Object of type {type(obj).__name__} is not JSON serializable")


@app.function(
    image=image,
    gpu="A100-40GB",
    volumes={"/data": volume},
    secrets=[modal.Secret.from_name("huggingface-secret")],
    timeout=3600,
)
def run_validation():
    """Run full validation: VLM predictions vs ground-truth labels.

    Loads the base model plus the LoRA adapter, runs every dataset row through
    the model and the hybrid judge, prints overall / per-tier / per-env / fall
    statistics, writes the report to ``/data/validation_report.json`` on the
    mounted volume and commits the volume.

    Returns:
        dict with keys ``total_samples``, ``overall`` (Spearman correlations and
        MAEs), ``per_tier``, ``tier_separation`` and ``results`` (one dict per
        sample). Spearman values are NaN when an input column is constant.

    Raises:
        ValueError: if the dataset contains no samples.
    """
    import json
    import os
    import sys
    from collections import defaultdict

    import numpy as np
    import torch
    from datasets import load_dataset
    from huggingface_hub import login
    from peft import PeftModel
    from scipy.stats import spearmanr
    from sklearn.metrics import mean_absolute_error
    from transformers import AutoModel, AutoTokenizer

    # hybrid_judge.py is copied to /root when the image is built.
    sys.path.insert(0, "/root")
    from hybrid_judge import hybrid_judge, hybrid_to_dict

    hf_token = os.environ.get("HF_TOKEN")
    if hf_token:
        login(token=hf_token)

    # --- 1. Load model ---
    print("[val] loading model...")
    tokenizer = AutoTokenizer.from_pretrained(
        "openbmb/MiniCPM-V-2_6", trust_remote_code=True
    )
    base_model = AutoModel.from_pretrained(
        "openbmb/MiniCPM-V-2_6",
        trust_remote_code=True,
        torch_dtype=torch.bfloat16,
        device_map="auto",
    )
    model = PeftModel.from_pretrained(base_model, ADAPTER_REPO)
    model.eval()
    print("[val] model loaded")

    # --- 2. Load dataset ---
    print("[val] loading dataset...")
    # NOTE(review): this evaluates on the "train" split; confirm whether a
    # held-out split exists and should be used instead.
    ds = load_dataset(DATASET_REPO, split="train")
    total = len(ds)
    if total == 0:
        raise ValueError(f"dataset {DATASET_REPO} has no samples to validate")

    # --- 3. Run predictions ---
    print(f"[val] running predictions on {total} samples...")
    max_frames = 6
    tier_order = ("expert", "medium", "simple")
    results = []
    tier_results = defaultdict(list)
    env_results = defaultdict(list)
    for idx, row in enumerate(ds):
        frames = row["images"][:max_frames]
        target = json.loads(row["target_json"])
        env = row["env"]
        tier = row["tier"]
        fell = row["fell"]
        # A missing or non-numeric label falls back to 0.5 so the metric arrays
        # below stay purely numeric.
        gt_reward = _coerce_reward(target.get("predicted_reward", 0.5))

        # Run VLM inference
        image_tokens = "\n".join(f"<image_{k:02d}>" for k in range(len(frames)))
        user_content = f"{image_tokens}\n{INSTRUCTION_PROMPT}"
        # NOTE(review): the frames are passed as a list through `image=` with
        # <image_NN> placeholders in the text; confirm this matches the format
        # the adapter was trained with and that chat() accepts a list here.
        with torch.no_grad():
            response = model.chat(
                image=frames,
                msgs=[{"role": "user", "content": user_content}],
                tokenizer=tokenizer,
                max_new_tokens=512,
            )
        if not isinstance(response, str):
            response = str(response)

        # Parse VLM output
        parsed = _extract_json_object(response)

        # Run hybrid judge
        # NOTE(review): ep_return is passed with a fixed [0, 1] range and
        # num_steps=0; confirm hybrid_judge expects these placeholder values.
        hybrid = hybrid_judge(
            vlm_parsed=parsed,
            ep_return=row.get("gt_return", 0),
            min_return=0,
            max_return=1,
            fell=fell,
            num_steps=0,
            tier=tier,
            env=env,
        )
        # The converted dict is not used below; the call is kept so that a
        # failure in the judge's serialisation still surfaces during validation.
        hybrid_to_dict(hybrid)

        result = {
            "idx": idx,
            "env": env,
            "tier": tier,
            "gt_reward": gt_reward,
            "gt_norm": row["gt_norm_reward"],
            "vlm_reward": hybrid.vlm_reward,
            "rule_reward": hybrid.rule_reward,
            "hybrid_reward": hybrid.blended_reward,
            "vlm_raw": _coerce_reward(parsed.get("predicted_reward", 0.5)),
            "fell": fell,
            "stability": parsed.get("stability", "unknown"),
        }
        results.append(result)
        tier_results[tier].append(result)
        env_results[env].append(result)

        if (idx + 1) % 10 == 0:
            print(f" [{idx+1}/{total}] processed")

    def column(rows, key):
        """Collect one field of the per-sample result dicts as an array."""
        return np.array([r[key] for r in rows])

    # --- 4. Compute metrics ---
    print("\n" + "=" * 70)
    print("VALIDATION RESULTS")
    print("=" * 70)
    gt_rewards = column(results, "gt_reward")
    hybrid_rewards = column(results, "hybrid_reward")
    vlm_rewards = column(results, "vlm_reward")
    rule_rewards = column(results, "rule_reward")

    # Overall metrics
    corr_hybrid, _ = spearmanr(gt_rewards, hybrid_rewards)
    corr_vlm, _ = spearmanr(gt_rewards, vlm_rewards)
    corr_rule, _ = spearmanr(gt_rewards, rule_rewards)
    mae_hybrid = mean_absolute_error(gt_rewards, hybrid_rewards)
    mae_vlm = mean_absolute_error(gt_rewards, vlm_rewards)
    print(f"\nOverall ({len(results)} samples):")
    print(f" Spearman correlation:")
    print(f" Hybrid: {corr_hybrid:.4f}")
    print(f" VLM: {corr_vlm:.4f}")
    print(f" Rule: {corr_rule:.4f}")
    print(f" MAE:")
    print(f" Hybrid: {mae_hybrid:.4f}")
    print(f" VLM: {mae_vlm:.4f}")

    # Per-tier metrics
    print(f"\nPer-tier breakdown:")
    print(f" {'Tier':<12} {'N':>4} {'GT_mean':>8} {'Hybrid_mean':>12} {'VLM_mean':>10} {'Rule_mean':>10} {'MAE_hyb':>8}")
    for tier in tier_order:
        tr = tier_results.get(tier, [])
        if not tr:
            continue
        gt = column(tr, "gt_reward")
        hyb = column(tr, "hybrid_reward")
        vlm = column(tr, "vlm_reward")
        rul = column(tr, "rule_reward")
        mae = mean_absolute_error(gt, hyb)
        print(f" {tier:<12} {len(tr):>4} {gt.mean():>8.3f} {hyb.mean():>12.3f} {vlm.mean():>10.3f} {rul.mean():>10.3f} {mae:>8.3f}")

    # Per-env metrics
    print(f"\nPer-env breakdown:")
    print(f" {'Env':<15} {'N':>4} {'GT_mean':>8} {'Hybrid_mean':>12} {'Spearman':>10}")
    for env in sorted(env_results):
        er = env_results[env]
        gt = column(er, "gt_reward")
        hyb = column(er, "hybrid_reward")
        # Rank correlation is only reported for environments with > 2 samples.
        if len(gt) > 2:
            corr, _ = spearmanr(gt, hyb)
        else:
            corr = float("nan")
        print(f" {env:<15} {len(er):>4} {gt.mean():>8.3f} {hyb.mean():>12.3f} {corr:>10.4f}")

    # Fall detection accuracy
    fell_samples = [r for r in results if r["fell"]]
    non_fell_samples = [r for r in results if not r["fell"]]
    if fell_samples:
        fell_rewards = [r["hybrid_reward"] for r in fell_samples]
        print(f"\nFall detection:")
        print(f" Fell episodes: {len(fell_samples)}, avg hybrid reward: {np.mean(fell_rewards):.3f}")
    if non_fell_samples:
        non_fell_rewards = [r["hybrid_reward"] for r in non_fell_samples]
        print(f" Non-fell episodes: {len(non_fell_samples)}, avg hybrid reward: {np.mean(non_fell_rewards):.3f}")

    # Tier separation test: mean hybrid reward of expert minus simple rollouts.
    expert_rewards = column(tier_results.get("expert", []), "hybrid_reward")
    simple_rewards = column(tier_results.get("simple", []), "hybrid_reward")
    tier_separation = 0
    if len(expert_rewards) > 0 and len(simple_rewards) > 0:
        tier_separation = float(expert_rewards.mean() - simple_rewards.mean())
        print(f"\nTier separation (expert - simple): {tier_separation:.3f}")
        print(f" PASS: {tier_separation > 0.1}")

    # Save results
    per_tier = {}
    for tier in tier_order:
        rows = tier_results.get(tier, [])
        per_tier[tier] = {
            "count": len(rows),
            "gt_mean": float(np.mean([r["gt_reward"] for r in rows])) if rows else 0,
            "hybrid_mean": float(np.mean([r["hybrid_reward"] for r in rows])) if rows else 0,
        }
    report = {
        "total_samples": len(results),
        "overall": {
            "spearman_hybrid": float(corr_hybrid),
            "spearman_vlm": float(corr_vlm),
            "spearman_rule": float(corr_rule),
            "mae_hybrid": float(mae_hybrid),
            "mae_vlm": float(mae_vlm),
        },
        "per_tier": per_tier,
        "tier_separation": tier_separation,
        "results": results,
    }
    report_path = "/data/validation_report.json"
    # `default=` keeps NumPy scalars coming back from the judge from aborting
    # the dump after all inference work has already been done.
    with open(report_path, "w", encoding="utf-8") as f:
        json.dump(report, f, indent=2, default=_json_default)
    print(f"\n[val] report saved to {report_path}")
    volume.commit()
    return report
@app.local_entrypoint()
def main():
    """Run the remote validation and print a short summary of its report."""
    report = run_validation.remote()

    print("\n=== SUMMARY ===")
    print(f"Samples: {report['total_samples']}")

    # Headline correlations live under the report's "overall" section.
    overall = report["overall"]
    print(f"Spearman (hybrid): {overall['spearman_hybrid']:.4f}")
    print(f"Spearman (VLM): {overall['spearman_vlm']:.4f}")

    separation = report["tier_separation"]
    print(f"Tier separation: {separation:.3f}")