jshah13 commited on
Commit
b9f29f4
Β·
verified Β·
1 Parent(s): cc00a9d

Upload server/robosim/sim_wrapper.py with huggingface_hub

Browse files
Files changed (1) hide show
  1. server/robosim/sim_wrapper.py +358 -0
server/robosim/sim_wrapper.py ADDED
@@ -0,0 +1,358 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ """
2
+ SimWrapper β€” physics backend for RoboReplan.
3
+
4
+ Two modes:
5
+ use_stub=False Real MuJoCo + robosuite PickPlace environment.
6
+ Object positions, blocking, and grasp success come from
7
+ actual physics. This is what makes training meaningful.
8
+
9
+ use_stub=True Lightweight Python sim for fast local testing.
10
+ Same interface, no physics dependency.
11
+
12
+ The wrapper always exposes the same symbolic SimState so the planning
13
+ layer above never needs to know which backend is running.
14
+ """
15
+ import random
16
+ import numpy as np
17
+ from dataclasses import dataclass, field
18
+ from typing import Optional
19
+
20
+ from .randomizer import randomize_scenario, ScenarioConfig
21
+
22
+
23
+ # ── Symbolic state types ───────────────────────────────────────────────
24
+
25
+ @dataclass
26
+ class ObjectState:
27
+ name: str
28
+ pos: np.ndarray
29
+ reachable: bool = True
30
+ blocking: Optional[str] = None
31
+ in_bin: Optional[str] = None
32
+ is_held: bool = False
33
+
34
+
35
+ @dataclass
36
+ class SimState:
37
+ objects: dict = field(default_factory=dict)
38
+ gripper_pos: np.ndarray = field(default_factory=lambda: np.zeros(3))
39
+ gripper_open: bool = True
40
+ holding: Optional[str] = None
41
+
42
+
43
+ # ── Main wrapper ───────────────────────────────────────────────────────
44
+
45
+ class SimWrapper:
46
+ """
47
+ Wraps either robosuite (real) or a Python stub (fast testing).
48
+ Always produces symbolic SimState for the planning layer.
49
+ """
50
+
51
+ def __init__(self, use_stub: bool = True):
52
+ self.use_stub = use_stub
53
+ self._env = None
54
+ self._controller = None
55
+ self._state = SimState()
56
+ self._last_moved_to: Optional[str] = None
57
+ self._current_cfg: Optional[ScenarioConfig] = None
58
+ self._facing: str = "N"
59
+
60
+ if not use_stub:
61
+ self._init_robosuite()
62
+
63
+ # ── Init ───────────────────────────────────────────────────────────
64
+
65
+ def _init_robosuite(self):
66
+ """
67
+ Initialize a real robosuite PickPlace environment.
68
+
69
+ We use PickPlace because it already has:
70
+ - Panda arm (most standard research robot)
71
+ - Multiple objects with real physics
72
+ - Multiple bin targets
73
+ - OSC_POSE controller (operational space β€” moves in Cartesian coords,
74
+ which our high-level controller can use directly)
75
+ """
76
+ try:
77
+ import robosuite as suite
78
+ from robosuite.controllers import load_composite_controller_config
79
+
80
+ controller_config = load_composite_controller_config(controller="BASIC")
81
+
82
+ self._env = suite.make(
83
+ env_name="PickPlace",
84
+ robots="Panda",
85
+ controller_configs=controller_config,
86
+ has_renderer=False, # no display on server
87
+ has_offscreen_renderer=True, # needed for camera obs
88
+ use_camera_obs=True,
89
+ camera_names=["frontview", "agentview"],
90
+ camera_heights=128,
91
+ camera_widths=128,
92
+ reward_shaping=False, # we compute our own reward
93
+ control_freq=20,
94
+ single_object_mode=0, # all objects
95
+ object_type=None, # random objects
96
+ )
97
+
98
+ from .controller import MotionController
99
+ self._controller = MotionController(self._env)
100
+ print("[SimWrapper] robosuite PickPlace loaded (Panda arm)")
101
+
102
+ except ImportError:
103
+ print("[SimWrapper] robosuite not installed β†’ falling back to stub")
104
+ self.use_stub = True
105
+ except Exception as e:
106
+ print(f"[SimWrapper] robosuite init failed: {e} β†’ falling back to stub")
107
+ self.use_stub = True
108
+
109
+ # ── Reset ──────────────────────────────────────────────────────────
110
+
111
+ def reset(self, scenario: str = "random") -> tuple[SimState, ScenarioConfig]:
112
+ """Reset scene. Returns (SimState, ScenarioConfig)."""
113
+ force_blocked = random.random() < 0.6
114
+ cfg = randomize_scenario(force_blocked=force_blocked)
115
+
116
+ if not self.use_stub and self._env is not None:
117
+ self._reset_robosuite(cfg)
118
+ else:
119
+ self._build_state_from_config(cfg)
120
+
121
+ return self._state, cfg
122
+
123
+ def _reset_robosuite(self, cfg: ScenarioConfig):
124
+ """Reset robosuite and sync symbolic state from physics."""
125
+ obs = self._env.reset()
126
+ self._sync_state_from_obs(obs, cfg)
127
+
128
+ def _sync_state_from_obs(self, obs: dict, cfg: ScenarioConfig):
129
+ """
130
+ Extract symbolic state from robosuite observation dict.
131
+ Uses the perception layer to detect blocking from real 3D positions.
132
+ """
133
+ try:
134
+ from .perception import extract_scene
135
+ scene = extract_scene(
136
+ mj_data=self._env.sim.data,
137
+ mj_model=self._env.sim.model,
138
+ robot_name="robot0",
139
+ object_names=list(cfg.objects),
140
+ )
141
+ objects = {}
142
+ for name, p in scene.objects.items():
143
+ objects[name] = ObjectState(
144
+ name=name,
145
+ pos=p.pos,
146
+ reachable=p.reachable,
147
+ blocking=p.blocking,
148
+ in_bin=p.in_bin,
149
+ is_held=p.is_held,
150
+ )
151
+ self._state = SimState(
152
+ objects=objects,
153
+ gripper_pos=scene.gripper_pos,
154
+ gripper_open=scene.gripper_open,
155
+ holding=scene.holding,
156
+ )
157
+ except Exception as e:
158
+ print(f"[SimWrapper] perception sync failed: {e}, using stub fallback")
159
+ self._build_state_from_config(cfg)
160
+
161
+ self._current_cfg = cfg
162
+
163
+ # ── Stub state builder ─────────────────────────────────────────────
164
+
165
+ def get_last_moved_to(self) -> Optional[str]:
166
+ return self._last_moved_to
167
+
168
+ def _build_state_from_config(self, cfg: ScenarioConfig):
169
+ """Build stub SimState from randomized scenario config."""
170
+ self._last_moved_to = None
171
+ self._facing = "N"
172
+ objects = {}
173
+ for obj_name in cfg.objects:
174
+ x, y = cfg.positions.get(obj_name, (0.0, 0.0))
175
+ is_blocked = obj_name in cfg.blockers.values()
176
+ objects[obj_name] = ObjectState(
177
+ name=obj_name,
178
+ pos=np.array([x, y, 0.82]),
179
+ reachable=not is_blocked,
180
+ blocking=cfg.blockers.get(obj_name),
181
+ )
182
+ self._state = SimState(
183
+ objects=objects,
184
+ gripper_pos=np.array([0.0, 0.25, 1.0]),
185
+ gripper_open=True,
186
+ holding=None,
187
+ )
188
+ self._current_cfg = cfg
189
+
190
+ def get_facing(self) -> str:
191
+ return self._facing
192
+
193
+ def _cell_from_pos(self, pos: np.ndarray) -> tuple[int, int]:
194
+ x = int(round(float(pos[0]) / 0.1))
195
+ y = int(round(float(pos[1]) / 0.1))
196
+ return max(-3, min(3, x)), max(-3, min(3, y))
197
+
198
+ def _step_gripper(self, dx: int, dy: int) -> None:
199
+ s = self._state
200
+ cell_x, cell_y = self._cell_from_pos(s.gripper_pos)
201
+ nx = max(-3, min(3, cell_x + dx))
202
+ ny = max(-3, min(3, cell_y + dy))
203
+ s.gripper_pos = np.array([nx * 0.1, ny * 0.1, s.gripper_pos[2]])
204
+
205
+ def _rotate(self, clockwise: bool) -> None:
206
+ dirs = ["N", "E", "S", "W"]
207
+ idx = dirs.index(self._facing)
208
+ self._facing = dirs[(idx + (1 if clockwise else -1)) % 4]
209
+
210
+ def _is_adjacent(self, obj: ObjectState) -> bool:
211
+ gx, gy = self._cell_from_pos(self._state.gripper_pos)
212
+ ox, oy = self._cell_from_pos(obj.pos)
213
+ return abs(gx - ox) + abs(gy - oy) <= 1
214
+
215
+ # ── Execute action ─────────────────────────────────────────────────
216
+
217
+ def execute(self, action: str) -> str:
218
+ """Execute a high-level action. Returns result string."""
219
+ if not self.use_stub and self._env is not None and self._controller is not None:
220
+ return self._execute_real(action)
221
+ return self._execute_stub(action)
222
+
223
+ def _execute_real(self, action: str) -> str:
224
+ """Execute via real robosuite physics + motion controller."""
225
+ result = self._controller.execute(action)
226
+ # Re-sync symbolic state from physics
227
+ obs = self._env._get_observations()
228
+ if self._current_cfg:
229
+ self._sync_state_from_obs(obs, self._current_cfg)
230
+ return result
231
+
232
+ def _execute_stub(self, action: str) -> str:
233
+ """Execute in the lightweight Python stub."""
234
+ s = self._state
235
+
236
+ if action == "SCAN_SCENE":
237
+ return "SUCCESS"
238
+ elif action == "MOVE_NORTH":
239
+ self._step_gripper(0, 1)
240
+ return "SUCCESS"
241
+ elif action == "MOVE_SOUTH":
242
+ self._step_gripper(0, -1)
243
+ return "SUCCESS"
244
+ elif action == "MOVE_EAST":
245
+ self._step_gripper(1, 0)
246
+ return "SUCCESS"
247
+ elif action == "MOVE_WEST":
248
+ self._step_gripper(-1, 0)
249
+ return "SUCCESS"
250
+ elif action == "ROTATE_LEFT":
251
+ self._rotate(clockwise=False)
252
+ return "SUCCESS"
253
+ elif action == "ROTATE_RIGHT":
254
+ self._rotate(clockwise=True)
255
+ return "SUCCESS"
256
+
257
+ elif action.startswith("MOVE_TO_"):
258
+ color = action[len("MOVE_TO_"):].lower()
259
+ # Try "<color>_block" first (default pack), then bare name (professional packs)
260
+ name = color + "_block" if (color + "_block") in s.objects else color
261
+ if name not in s.objects:
262
+ return "FAILED_INVALID"
263
+ obj = s.objects[name]
264
+ if not obj.reachable:
265
+ return "FAILED_BLOCKED"
266
+ s.gripper_pos = obj.pos.copy() + np.array([0, 0, 0.05])
267
+ self._last_moved_to = name
268
+ return "SUCCESS"
269
+
270
+ elif action == "PICK":
271
+ if s.holding is not None:
272
+ return "FAILED_INVALID"
273
+ candidates = []
274
+ for obj in s.objects.values():
275
+ if obj.reachable and not obj.is_held and obj.in_bin is None:
276
+ dist = np.linalg.norm(s.gripper_pos[:2] - obj.pos[:2])
277
+ candidates.append((dist, obj))
278
+ # Prefer the object we last moved to
279
+ candidates.sort(key=lambda x: (
280
+ 0 if x[1].name == self._last_moved_to else 1, x[0]
281
+ ))
282
+ for _, obj in candidates:
283
+ dist = np.linalg.norm(s.gripper_pos[:2] - obj.pos[:2])
284
+ if dist < 0.25:
285
+ obj.is_held = True
286
+ s.holding = obj.name
287
+ s.gripper_open = False
288
+ self._last_moved_to = None
289
+ return "SUCCESS"
290
+ return "FAILED_EMPTY"
291
+
292
+ elif action in ("PLACE_BIN_A", "PLACE_BIN_B"):
293
+ if s.holding is None:
294
+ return "FAILED_EMPTY"
295
+ bin_name = "A" if action == "PLACE_BIN_A" else "B"
296
+ obj = s.objects[s.holding]
297
+ # If this object was blocking something, reveal that target now.
298
+ if obj.blocking and obj.blocking in s.objects:
299
+ s.objects[obj.blocking].reachable = True
300
+ obj.blocking = None
301
+ obj.in_bin = bin_name
302
+ obj.is_held = False
303
+ obj.reachable = False
304
+ s.holding = None
305
+ s.gripper_open = True
306
+ return "SUCCESS"
307
+
308
+ elif action == "CLEAR_BLOCKER":
309
+ for obj in s.objects.values():
310
+ if obj.blocking is not None and obj.reachable:
311
+ blocked_name = obj.blocking
312
+ obj.blocking = None
313
+ obj.pos = obj.pos + np.array([0.28, 0.1, 0])
314
+ if blocked_name in s.objects:
315
+ s.objects[blocked_name].reachable = True
316
+ return "SUCCESS"
317
+ return "FAILED_INVALID"
318
+
319
+ return "FAILED_INVALID"
320
+
321
+ def get_state(self) -> SimState:
322
+ return self._state
323
+
324
+ def get_camera_obs(self) -> Optional[dict]:
325
+ """
326
+ Return camera observations + vision-extracted symbolic state.
327
+
328
+ Stub mode: returns None (symbolic state comes from sim config directly)
329
+ Real mode: returns RGB images + runs vision.py to extract object positions
330
+
331
+ The planning layer above never needs to know which path ran β€”
332
+ it always receives the same symbolic SimState either way.
333
+ """
334
+ if self.use_stub:
335
+ return None # stub: symbolic state already in self._state, no camera needed
336
+
337
+ if self._env is not None:
338
+ obs = self._env._get_observations()
339
+ rgb_front = obs.get("frontview_image")
340
+ rgb_agent = obs.get("agentview_image")
341
+
342
+ # Run vision pipeline to get symbolic state from images
343
+ if rgb_front is not None and self._current_cfg is not None:
344
+ from .vision import sim_vision
345
+ vision_result = sim_vision(rgb_front)
346
+ # Merge detected positions back into symbolic state
347
+ # (perception layer updates what was set from physics)
348
+ for det in vision_result.detected_objects:
349
+ name = det["name"]
350
+ if name in self._state.objects:
351
+ self._state.objects[name].pos = np.array([det["x"], det["y"], det["z"]])
352
+
353
+ return {"frontview": rgb_front, "agentview": rgb_agent}
354
+
355
+
356
+ # ── Re-exports ─────────────────────────────────────────────────────────
357
+
358
+ __all__ = ["SimWrapper", "SimState", "ObjectState"]