jshah13's picture
Upload server/robosim/vision.py with huggingface_hub
5dd283a verified
"""
Vision layer β€” converts camera images into symbolic state.
This is what runs in front of the stub when you have a real camera.
The stub bypasses this entirely and gives symbolic state directly.
Three modes:
1. Stub mode (default): skip vision, get symbolic state from sim config
2. Sim vision mode: run perception on MuJoCo camera renders
3. Real camera mode: run perception on actual robot camera feed
The LLM sees identical observations in all three modes.
That's the point β€” we can train in stub mode and deploy with real vision.
"""
from __future__ import annotations
from dataclasses import dataclass
from typing import Optional
import numpy as np
@dataclass
class VisionResult:
"""Symbolic facts extracted from an image."""
detected_objects: list[dict] # [{name, x, y, z, confidence}]
gripper_pos: Optional[np.ndarray]
gripper_open: Optional[bool]
depth_map: Optional[np.ndarray] # HxW float array, meters
# ── Stub mode: no vision, use sim ground truth ────────────────────────
def stub_vision(sim_state) -> VisionResult:
"""
In stub mode, we already have ground-truth symbolic state.
No vision model needed.
"""
objects = [
{
"name": name,
"x": float(obj.pos[0]),
"y": float(obj.pos[1]),
"z": float(obj.pos[2]),
"confidence": 1.0,
"reachable": obj.reachable,
}
for name, obj in sim_state.objects.items()
]
return VisionResult(
detected_objects=objects,
gripper_pos=sim_state.gripper_pos,
gripper_open=sim_state.gripper_open,
depth_map=None,
)
# ── Sim vision: run YOLO on MuJoCo camera renders ─────────────────────
def sim_vision(rgb_image: np.ndarray, depth_image: Optional[np.ndarray] = None,
camera_matrix: Optional[np.ndarray] = None) -> VisionResult:
"""
Run object detection on a rendered MuJoCo camera image.
Used when use_stub=False to extract state from the virtual camera.
rgb_image: HxWx3 uint8 array from robosuite
depth_image: HxW float array (optional, improves 3D localization)
"""
try:
from ultralytics import YOLO
model = _get_yolo_model()
return _run_yolo(model, rgb_image, depth_image, camera_matrix)
except ImportError:
# YOLO not installed β€” fall back to color-based detection
return _color_detection(rgb_image, depth_image)
def _get_yolo_model():
"""Load YOLO model (cached after first call)."""
from ultralytics import YOLO
if not hasattr(_get_yolo_model, "_model"):
# Use YOLOv8n (nano) β€” fast enough for real-time robot control
# For better accuracy: use yolov8m or fine-tune on robot images
_get_yolo_model._model = YOLO("yolov8n.pt")
return _get_yolo_model._model
def _run_yolo(model, rgb: np.ndarray, depth: Optional[np.ndarray],
camera_matrix: Optional[np.ndarray]) -> VisionResult:
"""Run YOLO detection and convert to symbolic object list."""
results = model(rgb, verbose=False)[0]
objects = []
for box in results.boxes:
cls_name = model.names[int(box.cls[0])]
conf = float(box.conf[0])
if conf < 0.4:
continue
# Get 2D center
x1, y1, x2, y2 = box.xyxy[0].tolist()
cx, cy = (x1 + x2) / 2, (y1 + y2) / 2
# Get 3D position if depth available
if depth is not None and camera_matrix is not None:
z = float(depth[int(cy), int(cx)])
x3d, y3d = _pixel_to_world(cx, cy, z, camera_matrix)
else:
x3d, y3d, z = 0.0, 0.0, 0.85
objects.append({
"name": _map_class_to_block(cls_name),
"x": x3d, "y": y3d, "z": z,
"confidence": conf,
"reachable": True, # blocking computed separately
})
return VisionResult(
detected_objects=objects,
gripper_pos=None, # detected separately from robot state
gripper_open=None,
depth_map=depth,
)
def _color_detection(rgb: np.ndarray, depth: Optional[np.ndarray]) -> VisionResult:
"""
Simple color-based object detection when YOLO isn't available.
Works for colored blocks on a plain table surface.
"""
import cv2
COLOR_RANGES = {
"red_block": ([0,100,100], [10,255,255]),
"blue_block": ([100,100,100], [130,255,255]),
"green_block": ([40,100,100], [80,255,255]),
"yellow_block": ([20,100,100], [35,255,255]),
"purple_block": ([130,50,100], [160,255,255]),
}
hsv = cv2.cvtColor(rgb, cv2.COLOR_RGB2HSV)
h, w = rgb.shape[:2]
objects = []
for name, (lower, upper) in COLOR_RANGES.items():
mask = cv2.inRange(hsv, np.array(lower), np.array(upper))
contours, _ = cv2.findContours(mask, cv2.RETR_EXTERNAL, cv2.CHAIN_APPROX_SIMPLE)
for cnt in contours:
area = cv2.contourArea(cnt)
if area < 100: # too small, ignore
continue
M = cv2.moments(cnt)
if M["m00"] == 0:
continue
cx = M["m10"] / M["m00"]
cy = M["m01"] / M["m00"]
# Normalize to [-0.3, 0.3] workspace coords (rough)
x3d = (cx / w - 0.5) * 0.6
y3d = -(cy / h - 0.5) * 0.6
z3d = float(depth[int(cy), int(cx)]) if depth is not None else 0.85
objects.append({
"name": name, "x": x3d, "y": y3d, "z": z3d,
"confidence": min(area / 2000.0, 1.0),
"reachable": True,
})
break # one detection per color class
return VisionResult(
detected_objects=objects,
gripper_pos=None,
gripper_open=None,
depth_map=depth,
)
def _pixel_to_world(cx: float, cy: float, z: float,
K: np.ndarray) -> tuple[float, float]:
"""Back-project a pixel to world XY using camera intrinsics K."""
fx, fy = K[0, 0], K[1, 1]
px, py = K[0, 2], K[1, 2]
x = (cx - px) * z / fx
y = (cy - py) * z / fy
return x, y
def _map_class_to_block(cls_name: str) -> str:
"""Map YOLO class name to our block naming convention."""
mapping = {
"cup": "red_block", "bottle": "blue_block",
"bowl": "green_block", "box": "yellow_block",
"block": "red_block", # generic
}
return mapping.get(cls_name.lower(), f"{cls_name}_block")
# ── Real camera: same interface, real hardware ────────────────────────
def real_camera_vision(camera_feed) -> VisionResult:
"""
Same perception pipeline but reading from a real camera.
camera_feed: OpenCV VideoCapture or similar.
This is what you'd run on a real robot deployment.
The symbolic state it produces is identical to stub_vision output,
which is why a policy trained in stub mode transfers.
"""
import cv2
ret, frame = camera_feed.read()
if not ret:
return VisionResult([], None, None, None)
rgb = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
return sim_vision(rgb)