robomind-vla / model_utils.py
mitvho09's picture
RoboMind VLA: vision-language reward model for robot locomotion (built with Codex)
321ba64 verified
Raw
History Blame Contribute Delete
6.38 kB
"""
RoboMind VLA — Task 6: model_utils.py
Inference utilities for the fine-tuned MiniCPM-V reward judge.
Runs on Modal GPU (A100-40GB) for inference.
"""
from __future__ import annotations
import json
import modal
# Python wheels installed into the inference image. Versions are pinned for the
# ML stack; the remaining helpers float.
_PIP_PACKAGES = (
    "torch==2.4.0",
    "torchvision==0.19.0",
    "transformers==4.40.0",
    "peft==0.11.1",
    "accelerate==0.30.1",
    "pillow",
    "sentencepiece",
    "huggingface_hub",
)

# Shell command executed at image build time. It writes a placeholder
# `flash_attn` package (an empty `__init__.py` plus a `flash_attn_interface`
# module whose two functions raise NotImplementedError) into
# <sys.prefix>/lib/python3.11/site-packages.
# NOTE(review): presumably this lets code that imports `flash_attn` load
# without the real CUDA extension being installed -- confirm with the author.
_FLASH_ATTN_STUB_CMD = (
    "python -c \""
    "import os, sys; "
    "d = os.path.join(sys.prefix, 'lib/python3.11/site-packages/flash_attn'); "
    "os.makedirs(d, exist_ok=True); "
    "open(os.path.join(d, '__init__.py'), 'w').write(''); "
    "open(os.path.join(d, 'flash_attn_interface.py'), 'w').write("
    "'def flash_attn_func(*a, **kw): raise NotImplementedError\\n"
    "def flash_attn_varlen_func(*a, **kw): raise NotImplementedError\\n'); "
    "print('flash_attn stub created')\""
)

# Container image: Debian slim with Python 3.11, the packages above, and the
# flash_attn placeholder.
image = (
    modal.Image.debian_slim(python_version="3.11")
    .pip_install(*_PIP_PACKAGES)
    .run_commands(_FLASH_ATTN_STUB_CMD)
)
# Named Modal volume; created on first use if it does not exist yet.
volume = modal.Volume.from_name(
    "robomind-data",
    create_if_missing=True,
)

# Modal application that owns the judge class and the local entrypoint.
app = modal.App("robomind-inference")
# Hugging Face repo id of the LoRA adapter applied on top of the base model.
ADAPTER_REPO = "mitvho09/robomind-minicpm-loco-lora"

# Instruction text appended after the image placeholders in every request.
# It names the ten keys the model is asked to return in its JSON answer.
INSTRUCTION_PROMPT = (
    "You are RoboMind VLA, a vision-language reward model for humanoid locomotion. "
    "You are shown keyframes from a robot locomotion rollout. "
    'The robot was commanded to "walk forward". '
    "Analyze the rollout and respond with ONLY a JSON object with these exact keys: "
    "timestep_range, phase, command, command_followed, stability, fall_risk, "
    "gait_quality, predicted_reward, anomaly, explanation."
)

# Alias used in return annotations: a judge verdict is a plain dict.
JudgeOutput = dict
@app.cls(
    image=image,
    gpu="A100-40GB",
    volumes={"/data": volume},
    secrets=[modal.Secret.from_name("huggingface-secret")],
    timeout=600,
    container_idle_timeout=120,
)
class RoboMindJudge:
    """LoRA-fine-tuned MiniCPM-V judge for humanoid locomotion reward.

    The base model and adapter are loaded by ``load_model`` (a Modal ``enter``
    hook). ``judge_rollout`` and ``judge_from_pil`` both send up to six
    keyframes plus ``INSTRUCTION_PROMPT`` through the model's ``chat`` API and
    return the parsed verdict as a dict.
    """

    @modal.enter()
    def load_model(self):
        """Load tokenizer, base MiniCPM-V-2_6 weights and the LoRA adapter.

        Logs in to the Hugging Face Hub first when ``HF_TOKEN`` is set in the
        environment. Stores the results on ``self.tokenizer`` and
        ``self.model`` (in eval mode).
        """
        import os
        import torch
        from transformers import AutoModel, AutoTokenizer
        from peft import PeftModel
        from huggingface_hub import login

        hf_token = os.environ.get("HF_TOKEN")
        if hf_token:
            login(token=hf_token)

        print("[rm] loading base MiniCPM-V-2_6...")
        model_id = "openbmb/MiniCPM-V-2_6"
        self.tokenizer = AutoTokenizer.from_pretrained(
            model_id, trust_remote_code=True
        )
        base_model = AutoModel.from_pretrained(
            model_id,
            trust_remote_code=True,
            torch_dtype=torch.bfloat16,
            device_map="auto",
        )

        print("[rm] loading LoRA adapter...")
        self.model = PeftModel.from_pretrained(base_model, ADAPTER_REPO)
        self.model.eval()
        print("[rm] model ready")

    def _chat(self, images: list) -> str:
        """Send up to six PIL images plus the instruction prompt to the model.

        Shared by both public entry points so they build the prompt and call
        the model in exactly the same way.

        Args:
            images: PIL images; only the first six are used.

        Returns:
            The model's reply as text.
        """
        frames = images[:6]
        image_tokens = "\n".join(
            f"<image_{k:02d}>" for k in range(len(frames))
        )
        user_content = f"{image_tokens}\n{INSTRUCTION_PROMPT}"
        # NOTE(review): `image` receives a list here, same as the original
        # `judge_from_pil` call. Upstream MiniCPM-V-2_6 examples place multiple
        # images inside the message content instead -- confirm this form
        # matches how the adapter was trained.
        response = self.model.chat(
            image=frames,
            msgs=[{"role": "user", "content": user_content}],
            tokenizer=self.tokenizer,
            max_new_tokens=512,
        )
        return response if isinstance(response, str) else str(response)

    @modal.method()
    def judge_rollout(
        self,
        image_paths: list[str],
    ) -> JudgeOutput:
        """Run the judge on a list of keyframe image paths (up to 6).

        Args:
            image_paths: Paths readable from inside the container; entries
                beyond the sixth are ignored.

        Returns:
            A dict with keys:
            timestep_range, phase, command, command_followed, stability,
            fall_risk, gait_quality, predicted_reward, anomaly, explanation
            (or ``error`` / ``raw_response`` when nothing could be parsed).
        """
        from PIL import Image

        images = []
        for path in image_paths[:6]:
            # `convert` returns a new in-memory image, so the file handle can
            # be released as soon as the conversion is done.
            with Image.open(path) as img:
                images.append(img.convert("RGB"))

        # Use the same `chat` path as `judge_from_pil`; the previous
        # `with self.model.generate(...) as gen:` form treated the generation
        # result as a context manager, which it is not.
        return self._parse_judge_response(self._chat(images))

    @modal.method()
    def judge_from_pil(self, images: list) -> JudgeOutput:
        """Run the judge on a list of PIL Images directly (up to 6).

        Returns the same dict shape as ``judge_rollout``.
        """
        return self._parse_judge_response(self._chat(images))

    @staticmethod
    def _parse_judge_response(response: str) -> JudgeOutput:
        """Parse the model's JSON response into a structured dict.

        Strategy:
          1. Try to decode a JSON object starting at each ``{`` in the text
             (handles nested objects and braces inside string values).
          2. Otherwise scan for each expected key individually and accept a
             quoted string, a number (returned as ``float``) or one of the
             JSON literals ``true`` / ``false`` / ``null``.

        Returns:
            The decoded object from step 1; or, from step 2, a dict containing
            all ten expected keys (missing ones set to ``""``); or
            ``{"error": ..., "raw_response": ...}`` when no key was found.
        """
        import re

        response = response.strip()

        # Step 1: strict decode. `raw_decode` stops at the end of the first
        # complete JSON value, so trailing chatter after the object is fine.
        decoder = json.JSONDecoder()
        start = response.find("{")
        while start != -1:
            try:
                parsed, _ = decoder.raw_decode(response, start)
            except json.JSONDecodeError:
                start = response.find("{", start + 1)
            else:
                return parsed

        # Step 2: per-key salvage from malformed output.
        expected_keys = (
            "timestep_range", "phase", "command", "command_followed",
            "stability", "fall_risk", "gait_quality", "predicted_reward",
            "anomaly", "explanation",
        )
        json_literals = {"true": True, "false": False, "null": None}
        string_value = r'"([^"]*)"'
        number_value = r'(-?\d+(?:\.\d+)?(?:[eE][+-]?\d+)?)'
        literal_value = r'(true|false|null)\b'

        found = {}
        for key in expected_keys:
            prefix = rf'"{key}"\s*:\s*'
            match = re.search(prefix + string_value, response)
            if match:
                found[key] = match.group(1)
                continue
            match = re.search(prefix + number_value, response)
            if match:
                found[key] = float(match.group(1))
                continue
            match = re.search(prefix + literal_value, response)
            if match:
                found[key] = json_literals[match.group(1)]

        # Nothing recognisable at all: report the failure instead of returning
        # a dict of empty strings that looks like a real verdict.
        if not found:
            return {
                "error": "Failed to parse response",
                "raw_response": response,
            }

        # Partial salvage: keep the full key set so callers can index freely.
        return {key: found.get(key, "") for key in expected_keys}
@app.local_entrypoint()
def main():
    """Smoke test: invoke the remote judge once with an empty keyframe list
    and print the returned verdict as indented JSON."""
    verdict = RoboMindJudge().judge_rollout.remote(image_paths=[])
    print("RESULT:", json.dumps(verdict, indent=2))