jshah13 commited on
Commit
5dd283a
·
verified ·
1 Parent(s): 8c8ea52

Upload server/robosim/vision.py with huggingface_hub

Browse files
Files changed (1) hide show
  1. server/robosim/vision.py +205 -0
server/robosim/vision.py ADDED
@@ -0,0 +1,205 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ """
2
+ Vision layer β€” converts camera images into symbolic state.
3
+
4
+ This is what runs in front of the stub when you have a real camera.
5
+ The stub bypasses this entirely and gives symbolic state directly.
6
+
7
+ Three modes:
8
+ 1. Stub mode (default): skip vision, get symbolic state from sim config
9
+ 2. Sim vision mode: run perception on MuJoCo camera renders
10
+ 3. Real camera mode: run perception on actual robot camera feed
11
+
12
+ The LLM sees identical observations in all three modes.
13
+ That's the point β€” we can train in stub mode and deploy with real vision.
14
+ """
15
+ from __future__ import annotations
16
+ from dataclasses import dataclass
17
+ from typing import Optional
18
+ import numpy as np
19
+
20
+
21
@dataclass
class VisionResult:
    """Symbolic facts extracted from an image.

    Common output schema of all three vision modes (stub, sim vision,
    real camera), so downstream consumers see identical observations
    regardless of where the pixels came from.
    """
    # One dict per detection: [{name, x, y, z, confidence}]; the stub and
    # sim producers in this file also include a "reachable" bool.
    detected_objects: list[dict]  # [{name, x, y, z, confidence}]
    # End-effector position; None when vision doesn't provide it and it
    # comes from robot proprioception instead.
    gripper_pos: Optional[np.ndarray]
    # True if the gripper is open; None when not derived from the image.
    gripper_open: Optional[bool]
    depth_map: Optional[np.ndarray]  # HxW float array, meters
28
+
29
+
30
+ # ── Stub mode: no vision, use sim ground truth ────────────────────────
31
+
32
def stub_vision(sim_state) -> VisionResult:
    """
    Convert ground-truth simulator state directly into a VisionResult.

    In stub mode no perception model runs: the sim already knows every
    object's exact pose, so each detection is emitted with confidence 1.0.
    """
    detections = []
    for obj_name, obj in sim_state.objects.items():
        px, py, pz = obj.pos[0], obj.pos[1], obj.pos[2]
        detections.append({
            "name": obj_name,
            "x": float(px),
            "y": float(py),
            "z": float(pz),
            "confidence": 1.0,
            "reachable": obj.reachable,
        })
    return VisionResult(
        detected_objects=detections,
        gripper_pos=sim_state.gripper_pos,
        gripper_open=sim_state.gripper_open,
        depth_map=None,
    )
54
+
55
+
56
+ # ── Sim vision: run YOLO on MuJoCo camera renders ─────────────────────
57
+
58
def sim_vision(rgb_image: np.ndarray, depth_image: Optional[np.ndarray] = None,
               camera_matrix: Optional[np.ndarray] = None) -> VisionResult:
    """
    Run object detection on a rendered MuJoCo camera image.
    Used when use_stub=False to extract state from the virtual camera.

    rgb_image: HxWx3 uint8 array from robosuite
    depth_image: HxW float array (optional, improves 3D localization)
    camera_matrix: 3x3 intrinsics K (optional, needed for back-projection)

    Falls back to color-based detection when ultralytics isn't installed.
    """
    # Keep the try block to the availability probe only. The original
    # wrapped the whole detection path, so an ImportError raised *inside*
    # _run_yolo (e.g. a missing transitive dependency) was silently
    # misreported as "YOLO not installed".
    try:
        import ultralytics  # noqa: F401 — availability probe only
    except ImportError:
        # YOLO not installed — fall back to color-based detection
        return _color_detection(rgb_image, depth_image)
    model = _get_yolo_model()
    return _run_yolo(model, rgb_image, depth_image, camera_matrix)
74
+
75
+
76
def _get_yolo_model():
    """Return the shared YOLO detector, loading it on first use."""
    model = getattr(_get_yolo_model, "_model", None)
    if model is None:
        from ultralytics import YOLO
        # YOLOv8n (nano) is fast enough for real-time robot control;
        # swap in yolov8m or a fine-tuned checkpoint for better accuracy.
        model = YOLO("yolov8n.pt")
        _get_yolo_model._model = model
    return model
84
+
85
+
86
def _run_yolo(model, rgb: np.ndarray, depth: Optional[np.ndarray],
              camera_matrix: Optional[np.ndarray],
              min_confidence: float = 0.4) -> VisionResult:
    """Run YOLO detection and convert to symbolic object list.

    model: loaded ultralytics YOLO model
    rgb: HxWx3 uint8 image
    depth: HxW float depth map in meters, or None
    camera_matrix: 3x3 intrinsics K, or None
    min_confidence: detections scoring below this are discarded
        (default 0.4, the previously hard-coded threshold)

    Gripper fields are returned as None: they come from robot state,
    not from the image.
    """
    results = model(rgb, verbose=False)[0]
    objects = []
    for box in results.boxes:
        cls_name = model.names[int(box.cls[0])]
        conf = float(box.conf[0])
        if conf < min_confidence:
            continue
        # 2D box center in pixel coordinates
        x1, y1, x2, y2 = box.xyxy[0].tolist()
        cx, cy = (x1 + x2) / 2, (y1 + y2) / 2
        # Get 3D position if depth available
        if depth is not None and camera_matrix is not None:
            # Clamp to valid indices: a box flush with the image edge can
            # put its truncated center exactly at H or W, which would
            # raise IndexError on the depth lookup.
            row = min(int(cy), depth.shape[0] - 1)
            col = min(int(cx), depth.shape[1] - 1)
            z = float(depth[row, col])
            x3d, y3d = _pixel_to_world(cx, cy, z, camera_matrix)
        else:
            # No depth — fall back to an assumed table-height position.
            x3d, y3d, z = 0.0, 0.0, 0.85
        objects.append({
            "name": _map_class_to_block(cls_name),
            "x": x3d, "y": y3d, "z": z,
            "confidence": conf,
            "reachable": True,  # blocking computed separately
        })
    return VisionResult(
        detected_objects=objects,
        gripper_pos=None,  # detected separately from robot state
        gripper_open=None,
        depth_map=depth,
    )
117
+
118
+
119
def _color_detection(rgb: np.ndarray, depth: Optional[np.ndarray]) -> VisionResult:
    """
    Simple color-based object detection when YOLO isn't available.
    Works for colored blocks on a plain table surface.

    rgb: HxWx3 uint8 image
    depth: HxW float depth map in meters, or None

    Returns at most one detection per color class (the largest blob).
    """
    import cv2

    # HSV (lower, upper) bounds per block color.
    COLOR_RANGES = {
        "red_block": ([0,100,100], [10,255,255]),
        "blue_block": ([100,100,100], [130,255,255]),
        "green_block": ([40,100,100], [80,255,255]),
        "yellow_block": ([20,100,100], [35,255,255]),
        "purple_block": ([130,50,100], [160,255,255]),
    }

    hsv = cv2.cvtColor(rgb, cv2.COLOR_RGB2HSV)
    h, w = rgb.shape[:2]
    objects = []

    for name, (lower, upper) in COLOR_RANGES.items():
        mask = cv2.inRange(hsv, np.array(lower), np.array(upper))
        contours, _ = cv2.findContours(mask, cv2.RETR_EXTERNAL, cv2.CHAIN_APPROX_SIMPLE)
        if not contours:
            continue
        # Fix: findContours returns blobs in arbitrary order, and the old
        # code kept the *first* contour over the area threshold — so a
        # noise speck could win over the actual block. Keep the single
        # largest blob for this color instead.
        cnt = max(contours, key=cv2.contourArea)
        area = cv2.contourArea(cnt)
        if area < 100:  # too small, ignore
            continue
        M = cv2.moments(cnt)
        if M["m00"] == 0:
            continue
        cx = M["m10"] / M["m00"]
        cy = M["m01"] / M["m00"]
        # Normalize to [-0.3, 0.3] workspace coords (rough)
        x3d = (cx / w - 0.5) * 0.6
        y3d = -(cy / h - 0.5) * 0.6
        if depth is not None:
            # Clamp to valid indices in case the centroid truncates to
            # the image boundary.
            row = min(int(cy), depth.shape[0] - 1)
            col = min(int(cx), depth.shape[1] - 1)
            z3d = float(depth[row, col])
        else:
            z3d = 0.85  # assumed table height
        objects.append({
            "name": name, "x": x3d, "y": y3d, "z": z3d,
            "confidence": min(area / 2000.0, 1.0),
            "reachable": True,
        })

    return VisionResult(
        detected_objects=objects,
        gripper_pos=None,
        gripper_open=None,
        depth_map=depth,
    )
167
+
168
+
169
+ def _pixel_to_world(cx: float, cy: float, z: float,
170
+ K: np.ndarray) -> tuple[float, float]:
171
+ """Back-project a pixel to world XY using camera intrinsics K."""
172
+ fx, fy = K[0, 0], K[1, 1]
173
+ px, py = K[0, 2], K[1, 2]
174
+ x = (cx - px) * z / fx
175
+ y = (cy - py) * z / fy
176
+ return x, y
177
+
178
+
179
+ def _map_class_to_block(cls_name: str) -> str:
180
+ """Map YOLO class name to our block naming convention."""
181
+ mapping = {
182
+ "cup": "red_block", "bottle": "blue_block",
183
+ "bowl": "green_block", "box": "yellow_block",
184
+ "block": "red_block", # generic
185
+ }
186
+ return mapping.get(cls_name.lower(), f"{cls_name}_block")
187
+
188
+
189
+ # ── Real camera: same interface, real hardware ────────────────────────
190
+
191
def real_camera_vision(camera_feed) -> VisionResult:
    """
    Run the same perception pipeline on frames from a physical camera.

    camera_feed: OpenCV VideoCapture or similar (anything with .read()).

    Produces the same symbolic VisionResult schema as stub_vision,
    which is why a policy trained in stub mode transfers to a real
    robot deployment. A failed frame grab yields an empty result.
    """
    import cv2

    ok, frame_bgr = camera_feed.read()
    if not ok:
        # Camera glitch / end of stream: report "saw nothing".
        return VisionResult([], None, None, None)
    # OpenCV delivers BGR; the detection pipeline expects RGB.
    frame_rgb = cv2.cvtColor(frame_bgr, cv2.COLOR_BGR2RGB)
    return sim_vision(frame_rgb)