""" RoboMind VLA — Task 6: model_utils.py Inference utilities for the fine-tuned MiniCPM-V reward judge. Runs on Modal GPU (A100-40GB) for inference. """ from __future__ import annotations import json import modal image = ( modal.Image.debian_slim(python_version="3.11") .pip_install( "torch==2.4.0", "torchvision==0.19.0", "transformers==4.40.0", "peft==0.11.1", "accelerate==0.30.1", "pillow", "sentencepiece", "huggingface_hub", ) .run_commands( "python -c \"" "import os, sys; " "d = os.path.join(sys.prefix, 'lib/python3.11/site-packages/flash_attn'); " "os.makedirs(d, exist_ok=True); " "open(os.path.join(d, '__init__.py'), 'w').write(''); " "open(os.path.join(d, 'flash_attn_interface.py'), 'w').write(" "'def flash_attn_func(*a, **kw): raise NotImplementedError\\n" "def flash_attn_varlen_func(*a, **kw): raise NotImplementedError\\n'); " "print('flash_attn stub created')\"" ) ) app = modal.App("robomind-inference") volume = modal.Volume.from_name("robomind-data", create_if_missing=True) ADAPTER_REPO = "mitvho09/robomind-minicpm-loco-lora" INSTRUCTION_PROMPT = ( "You are RoboMind VLA, a vision-language reward model for humanoid " "locomotion. You are shown keyframes from a robot locomotion rollout. " "The robot was commanded to \"walk forward\". Analyze the rollout and " "respond with ONLY a JSON object with these exact keys: timestep_range, " "phase, command, command_followed, stability, fall_risk, gait_quality, " "predicted_reward, anomaly, explanation." ) JudgeOutput = dict @app.cls( image=image, gpu="A100-40GB", volumes={"/data": volume}, secrets=[modal.Secret.from_name("huggingface-secret")], timeout=600, container_idle_timeout=120, ) class RoboMindJudge: """LoRA-fine-tuned MiniCPM-V judge for humanoid locomotion reward.""" @modal.enter() def load_model(self): import os import torch from transformers import AutoModel, AutoTokenizer from peft import PeftModel from huggingface_hub import login hf_token = os.environ.get("HF_TOKEN") if hf_token: login(token=hf_token) print("[rm] loading base MiniCPM-V-2_6...") model_id = "openbmb/MiniCPM-V-2_6" self.tokenizer = AutoTokenizer.from_pretrained( model_id, trust_remote_code=True ) base_model = AutoModel.from_pretrained( model_id, trust_remote_code=True, torch_dtype=torch.bfloat16, device_map="auto", ) print("[rm] loading LoRA adapter...") self.model = PeftModel.from_pretrained(base_model, ADAPTER_REPO) self.model.eval() print("[rm] model ready") @modal.method() def judge_rollout( self, image_paths: list[str], ) -> JudgeOutput: """Run the judge on a list of keyframe image paths (up to 6). Returns a dict with keys: timestep_range, phase, command, command_followed, stability, fall_risk, gait_quality, predicted_reward, anomaly, explanation """ from PIL import Image n_images = min(len(image_paths), 6) image_tokens = "\n".join( f"" for k in range(n_images) ) user_content = f"{image_tokens}\n{INSTRUCTION_PROMPT}" images = [] for path in image_paths[:n_images]: images.append(Image.open(path).convert("RGB")) with self.model.generate( image=images, msgs=[{"role": "user", "content": user_content}], tokenizer=self.tokenizer, max_new_tokens=512, ) as gen: output_ids = gen.sequences[0] response = self.tokenizer.decode(output_ids, skip_special_tokens=True) return self._parse_judge_response(response) @modal.method() def judge_from_pil(self, images: list) -> JudgeOutput: """Run the judge on a list of PIL Images directly.""" import torch n_images = min(len(images), 6) image_tokens = "\n".join( f"" for k in range(n_images) ) user_content = f"{image_tokens}\n{INSTRUCTION_PROMPT}" response = self.model.chat( image=images[:n_images], msgs=[{"role": "user", "content": user_content}], tokenizer=self.tokenizer, max_new_tokens=512, ) response = response if isinstance(response, str) else str(response) return self._parse_judge_response(response) @staticmethod def _parse_judge_response(response: str) -> JudgeOutput: """Parse the model's JSON response into a structured dict.""" import re response = response.strip() json_match = re.search(r'\{[^{}]*\}', response, re.DOTALL) if json_match: try: parsed = json.loads(json_match.group()) return parsed except json.JSONDecodeError: pass parsed = {} for key in [ "timestep_range", "phase", "command", "command_followed", "stability", "fall_risk", "gait_quality", "predicted_reward", "anomaly", "explanation", ]: pattern = rf'"{key}"\s*:\s*"([^"]*)"' match = re.search(pattern, response) if match: parsed[key] = match.group(1) else: pattern_num = rf'"{key}"\s*:\s*([\d.]+)' match_num = re.search(pattern_num, response) if match_num: try: parsed[key] = float(match_num.group(1)) except ValueError: parsed[key] = match_num.group(1) else: parsed[key] = "" if not parsed: parsed = { "error": "Failed to parse response", "raw_response": response, } return parsed @app.local_entrypoint() def main(): """Quick smoke test: judge a sample rollout.""" judge = RoboMindJudge() result = judge.judge_rollout.remote(image_paths=[]) print("RESULT:", json.dumps(result, indent=2))