Spaces:
Sleeping
Sleeping
| """ | |
| Vision layer β converts camera images into symbolic state. | |
| This is what runs in front of the stub when you have a real camera. | |
| The stub bypasses this entirely and gives symbolic state directly. | |
| Three modes: | |
| 1. Stub mode (default): skip vision, get symbolic state from sim config | |
| 2. Sim vision mode: run perception on MuJoCo camera renders | |
| 3. Real camera mode: run perception on actual robot camera feed | |
| The LLM sees identical observations in all three modes. | |
| That's the point β we can train in stub mode and deploy with real vision. | |
| """ | |
| from __future__ import annotations | |
| from dataclasses import dataclass | |
| from typing import Optional | |
| import numpy as np | |
| class VisionResult: | |
| """Symbolic facts extracted from an image.""" | |
| detected_objects: list[dict] # [{name, x, y, z, confidence}] | |
| gripper_pos: Optional[np.ndarray] | |
| gripper_open: Optional[bool] | |
| depth_map: Optional[np.ndarray] # HxW float array, meters | |
| # ββ Stub mode: no vision, use sim ground truth ββββββββββββββββββββββββ | |
| def stub_vision(sim_state) -> VisionResult: | |
| """ | |
| In stub mode, we already have ground-truth symbolic state. | |
| No vision model needed. | |
| """ | |
| objects = [ | |
| { | |
| "name": name, | |
| "x": float(obj.pos[0]), | |
| "y": float(obj.pos[1]), | |
| "z": float(obj.pos[2]), | |
| "confidence": 1.0, | |
| "reachable": obj.reachable, | |
| } | |
| for name, obj in sim_state.objects.items() | |
| ] | |
| return VisionResult( | |
| detected_objects=objects, | |
| gripper_pos=sim_state.gripper_pos, | |
| gripper_open=sim_state.gripper_open, | |
| depth_map=None, | |
| ) | |
| # ββ Sim vision: run YOLO on MuJoCo camera renders βββββββββββββββββββββ | |
| def sim_vision(rgb_image: np.ndarray, depth_image: Optional[np.ndarray] = None, | |
| camera_matrix: Optional[np.ndarray] = None) -> VisionResult: | |
| """ | |
| Run object detection on a rendered MuJoCo camera image. | |
| Used when use_stub=False to extract state from the virtual camera. | |
| rgb_image: HxWx3 uint8 array from robosuite | |
| depth_image: HxW float array (optional, improves 3D localization) | |
| """ | |
| try: | |
| from ultralytics import YOLO | |
| model = _get_yolo_model() | |
| return _run_yolo(model, rgb_image, depth_image, camera_matrix) | |
| except ImportError: | |
| # YOLO not installed β fall back to color-based detection | |
| return _color_detection(rgb_image, depth_image) | |
| def _get_yolo_model(): | |
| """Load YOLO model (cached after first call).""" | |
| from ultralytics import YOLO | |
| if not hasattr(_get_yolo_model, "_model"): | |
| # Use YOLOv8n (nano) β fast enough for real-time robot control | |
| # For better accuracy: use yolov8m or fine-tune on robot images | |
| _get_yolo_model._model = YOLO("yolov8n.pt") | |
| return _get_yolo_model._model | |
| def _run_yolo(model, rgb: np.ndarray, depth: Optional[np.ndarray], | |
| camera_matrix: Optional[np.ndarray]) -> VisionResult: | |
| """Run YOLO detection and convert to symbolic object list.""" | |
| results = model(rgb, verbose=False)[0] | |
| objects = [] | |
| for box in results.boxes: | |
| cls_name = model.names[int(box.cls[0])] | |
| conf = float(box.conf[0]) | |
| if conf < 0.4: | |
| continue | |
| # Get 2D center | |
| x1, y1, x2, y2 = box.xyxy[0].tolist() | |
| cx, cy = (x1 + x2) / 2, (y1 + y2) / 2 | |
| # Get 3D position if depth available | |
| if depth is not None and camera_matrix is not None: | |
| z = float(depth[int(cy), int(cx)]) | |
| x3d, y3d = _pixel_to_world(cx, cy, z, camera_matrix) | |
| else: | |
| x3d, y3d, z = 0.0, 0.0, 0.85 | |
| objects.append({ | |
| "name": _map_class_to_block(cls_name), | |
| "x": x3d, "y": y3d, "z": z, | |
| "confidence": conf, | |
| "reachable": True, # blocking computed separately | |
| }) | |
| return VisionResult( | |
| detected_objects=objects, | |
| gripper_pos=None, # detected separately from robot state | |
| gripper_open=None, | |
| depth_map=depth, | |
| ) | |
| def _color_detection(rgb: np.ndarray, depth: Optional[np.ndarray]) -> VisionResult: | |
| """ | |
| Simple color-based object detection when YOLO isn't available. | |
| Works for colored blocks on a plain table surface. | |
| """ | |
| import cv2 | |
| COLOR_RANGES = { | |
| "red_block": ([0,100,100], [10,255,255]), | |
| "blue_block": ([100,100,100], [130,255,255]), | |
| "green_block": ([40,100,100], [80,255,255]), | |
| "yellow_block": ([20,100,100], [35,255,255]), | |
| "purple_block": ([130,50,100], [160,255,255]), | |
| } | |
| hsv = cv2.cvtColor(rgb, cv2.COLOR_RGB2HSV) | |
| h, w = rgb.shape[:2] | |
| objects = [] | |
| for name, (lower, upper) in COLOR_RANGES.items(): | |
| mask = cv2.inRange(hsv, np.array(lower), np.array(upper)) | |
| contours, _ = cv2.findContours(mask, cv2.RETR_EXTERNAL, cv2.CHAIN_APPROX_SIMPLE) | |
| for cnt in contours: | |
| area = cv2.contourArea(cnt) | |
| if area < 100: # too small, ignore | |
| continue | |
| M = cv2.moments(cnt) | |
| if M["m00"] == 0: | |
| continue | |
| cx = M["m10"] / M["m00"] | |
| cy = M["m01"] / M["m00"] | |
| # Normalize to [-0.3, 0.3] workspace coords (rough) | |
| x3d = (cx / w - 0.5) * 0.6 | |
| y3d = -(cy / h - 0.5) * 0.6 | |
| z3d = float(depth[int(cy), int(cx)]) if depth is not None else 0.85 | |
| objects.append({ | |
| "name": name, "x": x3d, "y": y3d, "z": z3d, | |
| "confidence": min(area / 2000.0, 1.0), | |
| "reachable": True, | |
| }) | |
| break # one detection per color class | |
| return VisionResult( | |
| detected_objects=objects, | |
| gripper_pos=None, | |
| gripper_open=None, | |
| depth_map=depth, | |
| ) | |
| def _pixel_to_world(cx: float, cy: float, z: float, | |
| K: np.ndarray) -> tuple[float, float]: | |
| """Back-project a pixel to world XY using camera intrinsics K.""" | |
| fx, fy = K[0, 0], K[1, 1] | |
| px, py = K[0, 2], K[1, 2] | |
| x = (cx - px) * z / fx | |
| y = (cy - py) * z / fy | |
| return x, y | |
| def _map_class_to_block(cls_name: str) -> str: | |
| """Map YOLO class name to our block naming convention.""" | |
| mapping = { | |
| "cup": "red_block", "bottle": "blue_block", | |
| "bowl": "green_block", "box": "yellow_block", | |
| "block": "red_block", # generic | |
| } | |
| return mapping.get(cls_name.lower(), f"{cls_name}_block") | |
| # ββ Real camera: same interface, real hardware ββββββββββββββββββββββββ | |
| def real_camera_vision(camera_feed) -> VisionResult: | |
| """ | |
| Same perception pipeline but reading from a real camera. | |
| camera_feed: OpenCV VideoCapture or similar. | |
| This is what you'd run on a real robot deployment. | |
| The symbolic state it produces is identical to stub_vision output, | |
| which is why a policy trained in stub mode transfers. | |
| """ | |
| import cv2 | |
| ret, frame = camera_feed.read() | |
| if not ret: | |
| return VisionResult([], None, None, None) | |
| rgb = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB) | |
| return sim_vision(rgb) | |