"""
RoboMind VLA — Task 12: tests.py

End-to-end tests and verification for the entire pipeline.
Runs on Modal CPU for data checks, GPU for model checks.

Usage:
    modal run tests.py
"""
|
|
| from __future__ import annotations |
|
|
| import json |
|
|
| import modal |
|
|
# Container image shared by every test function in this file: Debian slim with
# Python 3.11 plus the ML stack installed through pip (most versions pinned).
image = (
    modal.Image.debian_slim(python_version="3.11")
    .pip_install(
        "numpy<2",
        "torch==2.4.0",
        "torchvision==0.19.0",
        "transformers==4.40.0",
        "peft==0.11.1",
        "accelerate==0.30.1",
        "datasets",
        "pillow",
        "huggingface_hub",
    )
    # At image build time, write a stub `flash_attn` package under
    # <sys.prefix>/lib/python3.11/site-packages: an empty __init__.py and a
    # flash_attn_interface.py whose two functions raise NotImplementedError.
    # The stub only makes the import names resolvable; it provides no kernels.
    # NOTE(review): presumably required because the remote model code imports
    # flash_attn — confirm. The path hard-codes python3.11 to match
    # `python_version` above; keep the two in sync.
    .run_commands(
        "python -c \""
        "import os, sys; "
        "d = os.path.join(sys.prefix, 'lib/python3.11/site-packages/flash_attn'); "
        "os.makedirs(d, exist_ok=True); "
        "open(os.path.join(d, '__init__.py'), 'w').write(''); "
        "open(os.path.join(d, 'flash_attn_interface.py'), 'w').write("
        "'def flash_attn_func(*a, **kw): raise NotImplementedError\\n"
        "def flash_attn_varlen_func(*a, **kw): raise NotImplementedError\\n'); "
        "print('flash_attn stub created')\""
    )
)


# Modal app that all functions and the local entrypoint below are registered on.
app = modal.App("robomind-tests")
# Named Modal volume (created if it does not exist yet); functions that mount
# it in this file do so at /data.
volume = modal.Volume.from_name("robomind-data", create_if_missing=True)
|
|
|
|
@app.function(image=image, secrets=[modal.Secret.from_name("huggingface-secret")], timeout=300)
def test_data_repos():
    """Test 1: Verify HF data repos exist and have correct structure.

    Lists the files of the rollouts, dataset and LoRA-adapter repos on the
    Hugging Face Hub and records a small summary for each.

    Returns:
        dict with keys "rollouts", "dataset" and "adapter". Each value is a
        dict with "exists": True plus per-repo details on success, or
        {"exists": False, "error": <message>} when the lookup failed.
    """
    import os

    from huggingface_hub import HfApi, login
    from huggingface_hub.utils import RepositoryNotFoundError

    hf_token = os.environ.get("HF_TOKEN")
    if hf_token:
        login(token=hf_token)

    api = HfApi()
    results = {}

    def list_files(repo_id, repo_types):
        """List repo files, trying each Hub repo type in order."""
        last_error = None
        for repo_type in repo_types:
            try:
                return api.list_repo_files(repo_id, repo_type=repo_type)
            except RepositoryNotFoundError as exc:
                last_error = exc
        raise last_error

    # Each probe is a deliberate best-effort: any failure is recorded in the
    # report instead of aborting the remaining checks.
    try:
        # The repo type of the rollouts repo is not evident from this file:
        # try "model" first (the API default), then "dataset".
        files = list_files("mitvho09/robomind-rollouts", ("model", "dataset"))
        mp4_count = sum(1 for f in files if f.endswith(".mp4"))
        has_metadata = "metadata.jsonl" in files
        results["rollouts"] = {
            "exists": True,
            "mp4_files": mp4_count,
            "has_metadata": has_metadata,
            "total_files": len(files),
        }
        print(f"[test] rollouts: {mp4_count} mp4s, metadata={has_metadata}")
    except Exception as e:
        results["rollouts"] = {"exists": False, "error": str(e)}
        print(f"[test] rollouts FAILED: {e}")

    try:
        # This repo is loaded with datasets.load_dataset elsewhere in this
        # file, so it is a dataset repo; the default repo_type ("model")
        # would report it as missing.
        files = list_files("mitvho09/robomind-loco-judge-dataset", ("dataset",))
        results["dataset"] = {
            "exists": True,
            "total_files": len(files),
        }
        print(f"[test] dataset: {len(files)} files")
    except Exception as e:
        results["dataset"] = {"exists": False, "error": str(e)}
        print(f"[test] dataset FAILED: {e}")

    try:
        files = list_files("mitvho09/robomind-minicpm-loco-lora", ("model",))
        has_adapter = "adapter_model.safetensors" in files
        has_config = "adapter_config.json" in files
        results["adapter"] = {
            "exists": True,
            "has_adapter_weights": has_adapter,
            "has_config": has_config,
            "total_files": len(files),
        }
        print(f"[test] adapter: weights={has_adapter}, config={has_config}")
    except Exception as e:
        results["adapter"] = {"exists": False, "error": str(e)}
        print(f"[test] adapter FAILED: {e}")

    return results
|
|
|
|
@app.function(image=image, timeout=300)
def test_dataset_structure():
    """Test 2: Verify dataset has correct schema and content.

    Loads the train split of the judge dataset and reports schema and
    content statistics.

    Returns:
        dict with "n_samples", one "has_<col>" flag per required column and
        "has_samples". When every required column is present and the split
        is non-empty it also contains "envs", "tiers", "min_images",
        "max_images", "avg_images", "target_json_keys",
        "invalid_target_json" (row count) and "target_json_valid".
    """
    import json

    from datasets import load_dataset

    ds = load_dataset("mitvho09/robomind-loco-judge-dataset", split="train")
    n_samples = len(ds)
    results = {"n_samples": n_samples}

    required_cols = ["env", "tier", "episode_id", "images", "target_json"]
    present = set(ds.column_names)
    for col in required_cols:
        results[f"has_{col}"] = col in present
    results["has_samples"] = n_samples > 0

    # Content statistics need every column and at least one row; report the
    # schema problem through the flags above instead of raising KeyError /
    # ValueError / ZeroDivisionError further down.
    missing = [col for col in required_cols if col not in present]
    if missing or n_samples == 0:
        print(
            f"[test] dataset: skipping content checks "
            f"(missing columns={missing}, n_samples={n_samples})"
        )
        return results

    envs = set(ds["env"])
    tiers = set(ds["tier"])
    results["envs"] = sorted(envs)
    results["tiers"] = sorted(tiers)

    # Single pass over the rows: image counts and target_json keys together.
    n_images = []
    all_keys = set()
    invalid_targets = 0
    for row in ds:
        n_images.append(len(row["images"]))
        try:
            target = json.loads(row["target_json"])
        except (TypeError, ValueError):
            # json.JSONDecodeError is a ValueError; TypeError covers non-str.
            invalid_targets += 1
            continue
        if isinstance(target, dict):
            all_keys.update(target)
        else:
            invalid_targets += 1

    results["min_images"] = min(n_images)
    results["max_images"] = max(n_images)
    results["avg_images"] = sum(n_images) / len(n_images)
    results["target_json_keys"] = sorted(all_keys)
    results["invalid_target_json"] = invalid_targets
    results["target_json_valid"] = invalid_targets == 0

    print(f"[test] dataset: {results['n_samples']} samples, {len(envs)} envs, {len(tiers)} tiers")
    print(f"[test] images per sample: min={results['min_images']}, max={results['max_images']}")
    print(f"[test] envs: {results['envs']}")
    print(f"[test] target keys: {results['target_json_keys']}")

    return results
|
|
|
|
@app.function(
    image=image,
    gpu="A100-40GB",
    volumes={"/data": volume},
    secrets=[modal.Secret.from_name("huggingface-secret")],
    timeout=600,
)
def test_model_load():
    """Test 3: Verify model loads correctly with LoRA adapter.

    Loads, in order, the tokenizer, the base model and the LoRA adapter.
    The first stage that raises stops the test: the report then carries
    "<stage>_loaded": False and "<stage>_error": <message> for that stage.
    When all stages succeed the report ends with "model_ready": True.

    Returns:
        dict describing which stages loaded.
    """
    import os

    import torch
    from transformers import AutoModel, AutoTokenizer
    from peft import PeftModel
    from huggingface_hub import login

    token = os.environ.get("HF_TOKEN")
    if token:
        login(token=token)

    report = {}

    def fail(stage, label, exc):
        """Record a failed stage in the report and hand the report back."""
        report[f"{stage}_loaded"] = False
        report[f"{stage}_error"] = str(exc)
        print(f"[test] {label} FAILED: {exc}")
        return report

    # Stage 1: tokenizer.
    try:
        AutoTokenizer.from_pretrained("openbmb/MiniCPM-V-2_6", trust_remote_code=True)
    except Exception as exc:
        return fail("tokenizer", "tokenizer", exc)
    else:
        report["tokenizer_loaded"] = True
        print("[test] tokenizer loaded")

    # Stage 2: base model in bfloat16, placed by device_map="auto".
    try:
        backbone = AutoModel.from_pretrained(
            "openbmb/MiniCPM-V-2_6",
            trust_remote_code=True,
            torch_dtype=torch.bfloat16,
            device_map="auto",
        )
    except Exception as exc:
        return fail("base_model", "base model", exc)
    else:
        report["base_model_loaded"] = True
        print("[test] base model loaded")

    # Stage 3: LoRA adapter on top of the base model.
    try:
        adapted = PeftModel.from_pretrained(backbone, "mitvho09/robomind-minicpm-loco-lora")
    except Exception as exc:
        return fail("adapter", "adapter", exc)
    else:
        report["adapter_loaded"] = True
        print("[test] LoRA adapter loaded")

    adapted.eval()
    report["model_ready"] = True
    print("[test] model ready for inference")
    return report
|
|
|
|
@app.function(
    image=image,
    gpu="A100-40GB",
    volumes={"/data": volume},
    secrets=[modal.Secret.from_name("huggingface-secret")],
    timeout=600,
)
def test_inference():
    """Test 4: Run a dummy inference to verify the model generates output.

    Loads the base model with the LoRA adapter, feeds it three random-noise
    224x224 RGB images plus the judge instruction, and inspects the reply.

    Returns:
        dict. On success: "inference_ok": True, "response_length",
        "response_preview" (first 300 characters) and "contains_json"
        (whether the reply contains both "{" and "}"). If loading or
        generation raises: "inference_ok": False and "inference_error".
    """
    import os

    import numpy as np
    import torch
    from huggingface_hub import login
    from peft import PeftModel
    from PIL import Image
    from transformers import AutoModel, AutoTokenizer

    hf_token = os.environ.get("HF_TOKEN")
    if hf_token:
        login(token=hf_token)

    results = {}

    instruction = (
        "You are RoboMind VLA, a vision-language reward model for humanoid "
        "locomotion. You are shown keyframes from a robot locomotion rollout. "
        "The robot was commanded to \"walk forward\". Analyze the rollout and "
        "respond with ONLY a JSON object with these exact keys: timestep_range, "
        "phase, command, command_followed, stability, fall_risk, gait_quality, "
        "predicted_reward, anomaly, explanation."
    )

    # Report failures in the result dict (like the model-load test does)
    # instead of raising, so the caller can still print a full summary.
    try:
        tokenizer = AutoTokenizer.from_pretrained(
            "openbmb/MiniCPM-V-2_6", trust_remote_code=True
        )
        base_model = AutoModel.from_pretrained(
            "openbmb/MiniCPM-V-2_6",
            trust_remote_code=True,
            torch_dtype=torch.bfloat16,
            device_map="auto",
        )
        model = PeftModel.from_pretrained(base_model, "mitvho09/robomind-minicpm-loco-lora")
        model.eval()

        # randint's upper bound is exclusive; 256 covers the full uint8 range.
        dummy_images = [
            Image.fromarray(np.random.randint(0, 256, (224, 224, 3), dtype=np.uint8))
            for _ in range(3)
        ]

        # Multi-image input goes inside the message content (images first,
        # then the text) with image=None, following the MiniCPM-V 2.6 model
        # card. A list passed through `image=` is not a PIL image and would
        # not reach the model as image input.
        msgs = [{"role": "user", "content": [*dummy_images, instruction]}]

        with torch.no_grad():
            output = model.chat(
                image=None,
                msgs=msgs,
                tokenizer=tokenizer,
                max_new_tokens=256,
            )
    except Exception as e:
        results["inference_ok"] = False
        results["inference_error"] = str(e)
        print(f"[test] inference FAILED: {e}")
        return results

    response = output if isinstance(output, str) else str(output)
    results["inference_ok"] = True
    results["response_length"] = len(response)
    results["response_preview"] = response[:300]
    results["contains_json"] = "{" in response and "}" in response

    print(f"[test] inference OK: {len(response)} chars")
    print(f"[test] response: {response[:300]}")
    return results
|
|
|
|
# The volume must be mounted here: without `volumes=` the container has no
# /data directory and every check below would silently find nothing.
@app.function(image=image, volumes={"/data": volume}, timeout=300)
def test_volume_data():
    """Test 5: Verify Modal volume has expected data.

    Inspects the volume mounted at /data.

    Returns:
        dict with "minicpm_cloned" (whether /data/MiniCPM-V exists) and,
        for each item found under /data/ft: "ft_images" (number of .jpg
        files in images/), "train_samples" (length of train.json),
        "lora_output_exists" and "has_adapter_in_output".
    """
    import json
    import os

    results = {}

    ft_dir = "/data/ft"
    if os.path.isdir(ft_dir):
        images_dir = os.path.join(ft_dir, "images")
        if os.path.isdir(images_dir):
            n_images = sum(1 for f in os.listdir(images_dir) if f.endswith(".jpg"))
            results["ft_images"] = n_images
            print(f"[test] volume images: {n_images}")

        train_json = os.path.join(ft_dir, "train.json")
        if os.path.isfile(train_json):
            with open(train_json, encoding="utf-8") as f:
                data = json.load(f)
            results["train_samples"] = len(data)
            print(f"[test] volume train.json: {len(data)} samples")

        lora_dir = os.path.join(ft_dir, "lora_output")
        if os.path.isdir(lora_dir):
            results["lora_output_exists"] = True
            has_adapter = any("adapter_model" in f for f in os.listdir(lora_dir))
            results["has_adapter_in_output"] = has_adapter
            print(f"[test] LoRA output exists, adapter={has_adapter}")

    mcpm_dir = "/data/MiniCPM-V"
    results["minicpm_cloned"] = os.path.exists(mcpm_dir)

    print(f"[test] volume data: {results}")
    return results
|
|
|
|
def _result_failed(result):
    """Return True when a test result, possibly nested, records a failure.

    A failure is a key named "error" or ending in "_error", or a value that
    is exactly False, at any depth of nested dicts. Free-form strings are
    not inspected, so text that merely mentions "error" is not a failure.
    """
    if isinstance(result, dict):
        for key, value in result.items():
            name = str(key)
            if name == "error" or name.endswith("_error"):
                return True
            if _result_failed(value):
                return True
        return False
    return result is False


@app.local_entrypoint()
def main():
    """Run all tests.

    Calls each remote test in order, prints a PASS/FAIL line per test and a
    total. A remote call that raises is recorded as a failed test instead of
    aborting the run.

    Returns:
        dict mapping test name to that test's result dict.
    """
    print("=" * 60)
    print("RoboMind VLA — End-to-End Tests")
    print("=" * 60)

    suite = [
        ("Test 1: HF Data Repos", "data_repos", test_data_repos),
        ("Test 2: Dataset Structure", "dataset_structure", test_dataset_structure),
        ("Test 3: Model Load", "model_load", test_model_load),
        ("Test 4: Inference", "inference", test_inference),
        ("Test 5: Volume Data", "volume_data", test_volume_data),
    ]

    all_results = {}
    for title, key, remote_fn in suite:
        print(f"\n--- {title} ---")
        try:
            all_results[key] = remote_fn.remote()
        except Exception as exc:
            # Top-level boundary: keep going so the summary is still printed.
            all_results[key] = {"error": str(exc)}
            print(f"[test] {key} raised: {exc}")

    print("\n" + "=" * 60)
    print("SUMMARY")
    print("=" * 60)

    passed = 0
    failed = 0
    for test_name, result in all_results.items():
        if _result_failed(result):
            status = "FAIL"
            failed += 1
        else:
            status = "PASS"
            passed += 1
        print(f" {test_name}: {status}")

    print(f"\nTotal: {passed} passed, {failed} failed, {passed + failed} total")
    return all_results
|
|