| """ |
| RoboMind VLA — Task 6: model_utils.py |
| |
| Inference utilities for the fine-tuned MiniCPM-V reward judge. |
| Runs on Modal GPU (A100-40GB) for inference. |
| """ |
|
|
| from __future__ import annotations |
|
|
| import json |
| import modal |
|
|
# Python packages installed into the inference image. The model-stack packages
# are pinned to exact versions; the remaining ones are left unpinned.
_PIP_PACKAGES = (
    "torch==2.4.0",
    "torchvision==0.19.0",
    "transformers==4.40.0",
    "peft==0.11.1",
    "accelerate==0.30.1",
    "pillow",
    "sentencepiece",
    "huggingface_hub",
)

# Build-time shell command. It runs a short Python script that creates a
# `flash_attn` directory under <sys.prefix>/lib/python3.11/site-packages
# containing an empty `__init__.py` and a `flash_attn_interface.py` whose two
# functions only raise NotImplementedError.
# NOTE(review): presumably this exists so that `import flash_attn` succeeds
# without installing the real package -- confirm against the model's remote code.
# The hard-coded "python3.11" segment must stay in sync with `python_version`
# passed to `debian_slim` below.
_FLASH_ATTN_STUB_COMMAND = (
    "python -c \""
    "import os, sys; "
    "d = os.path.join(sys.prefix, 'lib/python3.11/site-packages/flash_attn'); "
    "os.makedirs(d, exist_ok=True); "
    "open(os.path.join(d, '__init__.py'), 'w').write(''); "
    "open(os.path.join(d, 'flash_attn_interface.py'), 'w').write("
    "'def flash_attn_func(*a, **kw): raise NotImplementedError\\n"
    "def flash_attn_varlen_func(*a, **kw): raise NotImplementedError\\n'); "
    "print('flash_attn stub created')\""
)

# Container image used by the judge class: Debian slim + Python 3.11, the pip
# packages above, then the flash_attn stub.
image = (
    modal.Image.debian_slim(python_version="3.11")
    .pip_install(*_PIP_PACKAGES)
    .run_commands(_FLASH_ATTN_STUB_COMMAND)
)
|
|
# Hugging Face repository id of the LoRA adapter applied on top of the base model.
ADAPTER_REPO = "mitvho09/robomind-minicpm-loco-lora"

# Modal application that owns the inference class and the local entrypoint.
app = modal.App("robomind-inference")

# Named Modal volume, created on first use if it does not exist yet.
volume = modal.Volume.from_name("robomind-data", create_if_missing=True)
|
|
# Keys the judge is instructed to emit, in the order they are listed in the prompt.
_PROMPT_KEYS = (
    "timestep_range",
    "phase",
    "command",
    "command_followed",
    "stability",
    "fall_risk",
    "gait_quality",
    "predicted_reward",
    "anomaly",
    "explanation",
)

# Instruction text sent to the model after the image placeholders.
INSTRUCTION_PROMPT = (
    "You are RoboMind VLA, a vision-language reward model for humanoid locomotion. "
    "You are shown keyframes from a robot locomotion rollout. "
    'The robot was commanded to "walk forward". '
    "Analyze the rollout and respond with ONLY a JSON object with these exact keys: "
    + ", ".join(_PROMPT_KEYS)
    + "."
)

# Alias used in annotations for the dictionary returned by the judge.
JudgeOutput = dict
|
|
|
|
# Upper bound on the number of keyframes sent to the model in one request.
_MAX_KEYFRAMES = 6


@app.cls(
    image=image,
    gpu="A100-40GB",
    volumes={"/data": volume},
    secrets=[modal.Secret.from_name("huggingface-secret")],
    timeout=600,
    container_idle_timeout=120,
)
class RoboMindJudge:
    """LoRA-fine-tuned MiniCPM-V judge for humanoid locomotion reward.

    The model is loaded once by `load_model` (registered with `modal.enter()`).
    Both public entry points, `judge_rollout` and `judge_from_pil`, build the
    same prompt and go through the same inference helper, so they return
    identically shaped results.
    """

    @modal.enter()
    def load_model(self) -> None:
        """Load the tokenizer, the base model and the LoRA adapter.

        Sets `self.tokenizer` and `self.model`. If the `HF_TOKEN` environment
        variable is set, it is used to log in to the Hugging Face Hub first.
        """
        import os

        import torch
        from huggingface_hub import login
        from peft import PeftModel
        from transformers import AutoModel, AutoTokenizer

        hf_token = os.environ.get("HF_TOKEN")
        if hf_token:
            login(token=hf_token)

        print("[rm] loading base MiniCPM-V-2_6...")
        model_id = "openbmb/MiniCPM-V-2_6"
        self.tokenizer = AutoTokenizer.from_pretrained(
            model_id, trust_remote_code=True
        )
        base_model = AutoModel.from_pretrained(
            model_id,
            trust_remote_code=True,
            torch_dtype=torch.bfloat16,
            device_map="auto",
        )

        print("[rm] loading LoRA adapter...")
        self.model = PeftModel.from_pretrained(base_model, ADAPTER_REPO)
        self.model.eval()
        print("[rm] model ready")

    @modal.method()
    def judge_rollout(
        self,
        image_paths: list[str],
    ) -> JudgeOutput:
        """Run the judge on a list of keyframe image paths (up to 6).

        Only the first 6 paths are opened; any further paths are ignored.
        Each file is opened inside the container, converted to RGB and closed
        again before inference starts.

        Returns a dict with keys:
            timestep_range, phase, command, command_followed, stability,
            fall_risk, gait_quality, predicted_reward, anomaly, explanation
        or the error dict described in `_parse_judge_response`.
        """
        from PIL import Image

        frames = []
        for path in image_paths[:_MAX_KEYFRAMES]:
            # convert() returns an independent in-memory copy, so the file
            # handle can be released as soon as the `with` block exits.
            with Image.open(path) as img:
                frames.append(img.convert("RGB"))

        return self._judge_frames(frames)

    @modal.method()
    def judge_from_pil(self, images: list) -> JudgeOutput:
        """Run the judge on a list of PIL Images directly.

        Only the first 6 images are used. The result has the same shape as
        the one returned by `judge_rollout`.
        """
        return self._judge_frames(images)

    def _judge_frames(self, frames: list) -> JudgeOutput:
        """Build the prompt for `frames`, query the model and parse its reply.

        Shared by both public entry points so they cannot drift apart.
        """
        frames = frames[:_MAX_KEYFRAMES]
        placeholders = "\n".join(
            f"<image_{k:02d}>" for k in range(len(frames))
        )
        user_content = f"{placeholders}\n{INSTRUCTION_PROMPT}"

        # NOTE(review): this is the call shape `judge_from_pil` already used.
        # Confirm against the MiniCPM-V-2_6 `chat` implementation that a list
        # passed as `image=` together with `<image_NN>` placeholders is the
        # input format the adapter was trained with.
        response = self.model.chat(
            image=frames,
            msgs=[{"role": "user", "content": user_content}],
            tokenizer=self.tokenizer,
            max_new_tokens=512,
        )
        if not isinstance(response, str):
            response = str(response)
        return self._parse_judge_response(response)

    @staticmethod
    def _parse_judge_response(response: str) -> JudgeOutput:
        """Parse the model's JSON response into a structured dict.

        Strategy:
          1. Scan for the first position at which a complete JSON object can
             be decoded (nested objects and braces inside strings are
             handled) and return that object as-is.
          2. Otherwise salvage individual `"key": value` pairs for the ten
             expected keys. Quoted values are returned as strings, numbers as
             floats, bare `true`/`false` as booleans; keys that cannot be
             found are set to "".
          3. If not a single key could be salvaged, return
             `{"error": "Failed to parse response", "raw_response": <text>}`.
        """
        import re

        expected_keys = (
            "timestep_range", "phase", "command", "command_followed",
            "stability", "fall_risk", "gait_quality", "predicted_reward",
            "anomaly", "explanation",
        )
        text = response.strip()

        # Step 1: try a real JSON decode at every opening brace, so leading
        # chatter or a stray "{" before the payload does not block parsing.
        decoder = json.JSONDecoder()
        brace = text.find("{")
        while brace != -1:
            try:
                obj, _ = decoder.raw_decode(text[brace:])
            except json.JSONDecodeError:
                brace = text.find("{", brace + 1)
            else:
                return obj

        # Step 2: per-key salvage for truncated or otherwise malformed JSON.
        number = r"-?(?:\d+\.?\d*|\.\d+)(?:[eE][-+]?\d+)?"
        salvaged = {}
        for key in expected_keys:
            prefix = rf'"{key}"\s*:\s*'

            as_string = re.search(prefix + r'"([^"]*)"', text)
            if as_string:
                salvaged[key] = as_string.group(1)
                continue

            as_number = re.search(prefix + rf"({number})", text)
            if as_number:
                salvaged[key] = float(as_number.group(1))
                continue

            as_bool = re.search(prefix + r"(true|false)\b", text)
            if as_bool:
                salvaged[key] = as_bool.group(1) == "true"

        # Step 3: nothing recognisable at all -> explicit error payload.
        if not salvaged:
            return {
                "error": "Failed to parse response",
                "raw_response": text,
            }

        return {key: salvaged.get(key, "") for key in expected_keys}
|
|
|
|
@app.local_entrypoint()
def main():
    """Smoke test: call the remote judge with an empty path list and print the result."""
    verdict = RoboMindJudge().judge_rollout.remote(image_paths=[])
    print("RESULT:", json.dumps(verdict, indent=2))
|
|