Spaces:
Sleeping
Sleeping
| import os | |
| import json | |
| import tempfile | |
| import imageio | |
| import numpy as np | |
| import mujoco | |
| # Extract final object rotations in Euler angles for interpretability (optional) | |
| from scipy.spatial.transform import Rotation as R | |
| MODEL_PATH = "assets/tray.xml" | |
| N_OBJECTS = 1 # Single puck (number of dynamic blocks to randomize) | |
| PUSH_START_STEP = 50 | |
| SIM_STEPS = 250 #200 | |
| IMPACT_STEP = 60 # is a good starting point, just after the pusher activates. | |
| def quat_to_rotmat(q): | |
| """Convert quaternion to rotation matrix.""" | |
| w, x, y, z = q | |
| return np.array([ | |
| [1 - 2*y**2 - 2*z**2, 2*x*y - 2*z*w, 2*x*z + 2*y*w], | |
| [2*x*y + 2*z*w, 1 - 2*x**2 - 2*z**2, 2*y*z - 2*x*w], | |
| [2*x*z - 2*y*w, 2*y*z + 2*x*w, 1 - 2*x**2 - 2*y**2] | |
| ]) | |
| def run_tray_simulation( | |
| seed=0, | |
| num_objects=N_OBJECTS, | |
| azimuth=45, | |
| elevation=-25, | |
| distance=1.0, | |
| unique_id=None # Unique file naming | |
| ): | |
| np.random.seed(seed) | |
| # Explicitly reset the simulation state | |
| model = mujoco.MjModel.from_xml_path(MODEL_PATH) | |
| data = mujoco.MjData(model) | |
| #mujoco.mj_resetData(model, data) | |
| mujoco.mj_resetDataKeyframe(model, data, 0) # Reset the state to keyframe 0 | |
| # Object naming convention: puck, obj0, obj1, ... | |
| object_names = ["puck"] + [f"obj{i}" for i in range(num_objects - 1)] | |
| # Validate all objects exist | |
| for name in object_names: | |
| if mujoco.mj_name2id(model, mujoco.mjtObj.mjOBJ_BODY, name) == -1: | |
| raise ValueError(f"Body '{name}' not found in model") | |
| # Validate all objects exist and get joint addresses | |
| # Resolve joint offsets | |
| joint_qpos_addrs = [] | |
| joint_qvel_addrs = [] | |
| for name in object_names: | |
| body_id = mujoco.mj_name2id(model, mujoco.mjtObj.mjOBJ_BODY, name) # assumes <joint> has no name | |
| if body_id == -1: | |
| raise ValueError(f"Body '{name}' not found in model") | |
| dof_addr = model.body_dofadr[body_id] | |
| joint_addr = model.body_jntadr[body_id] | |
| # Confirm it's a free joint | |
| if model.jnt_type[model.body_jntadr[body_id]] != mujoco.mjtJoint.mjJNT_FREE: | |
| raise ValueError(f"Body '{name}' does not have a free joint") | |
| joint_qpos_addrs.append(model.jnt_qposadr[joint_addr]) | |
| joint_qvel_addrs.append(model.jnt_dofadr[joint_addr]) | |
| # Extract puck's initial position and quaternion from the model | |
| puck_body_id = mujoco.mj_name2id(model, mujoco.mjtObj.mjOBJ_BODY, "puck") | |
| puck_pos = model.body_pos[puck_body_id].copy() # Extract initial position | |
| puck_quat = model.body_quat[puck_body_id].copy() # Extract initial quaternion | |
| puck_vel_idx = joint_qvel_addrs[0] | |
| #tray_id = mujoco.mj_name2id( | |
| # model, | |
| # mujoco.mjtObj.mjOBJ_BODY, | |
| # "tray" | |
| #) | |
| # Apply a transient force to the tray | |
| #tray_force_applied = True # False # One-time trigger | |
| # Locate pusher DOF index (last one in qpos if you have 10 objects) | |
| # Compute DOF offsets | |
| # Set initial position and orientation for the puck | |
| data.qpos[joint_qpos_addrs[0]:joint_qpos_addrs[0]+3] = puck_pos | |
| data.qpos[joint_qpos_addrs[0]+3:joint_qpos_addrs[0]+7] = puck_quat | |
| # Push velocity directed toward center | |
| # Compute velocity towards center | |
| target = np.array([0.0, 0.0, 0.05]) # center of tray at surface level (or slightly outside tray corner) | |
| direction = target - puck_pos | |
| direction[2] = 0.0 | |
| norm = np.linalg.norm(direction) | |
| if norm > 1e-8: | |
| direction /= norm | |
| else: | |
| direction = np.array([1.0, 0.0, 0.0]) # arbitrary direction to avoid divide-by-zero | |
| direction /= np.linalg.norm(direction) | |
| push_velocity = 0.15 * direction # Adjust magnitude | |
| # Renderer setup | |
| renderer = mujoco.Renderer(model, width=480, height=480) # width=640, height=640 | |
| # Custom camera parameters | |
| scene_option = mujoco.MjvOption() | |
| cam = mujoco.MjvCamera() | |
| cam.type = mujoco.mjtCamera.mjCAMERA_FREE | |
| cam.lookat[:] = np.array([0.0, 0.0, 0.02]) # Center of the tray | |
| cam.distance = distance # Zoom | |
| cam.azimuth = azimuth # Yaw angle | |
| cam.elevation = elevation # Pitch angle | |
| frames = [] | |
| keyframes = {0: None, 25: None, 50: None, 200: None} | |
| for t in range(SIM_STEPS): | |
| if t == PUSH_START_STEP: | |
| # Activate pusher | |
| data.qvel[puck_vel_idx:puck_vel_idx+3] = push_velocity | |
| mujoco.mj_step(model, data) | |
| renderer.update_scene(data, camera=cam, scene_option=scene_option) | |
| img = renderer.render() | |
| if t in keyframes: | |
| imageio.imwrite(f"/tmp/frame_{t}_{unique_id}.png", img) | |
| frames.append(img) | |
| renderer.close() | |
| # Orientation extraction | |
| euler_angles = [] | |
| for i in range(num_objects): | |
| quat = data.qpos[joint_qpos_addrs[i]+3:joint_qpos_addrs[i]+7] # qw, qx, qy, qz | |
| # Convert to Euler (yaw-pitch-roll) | |
| rot = R.from_quat([quat[1], quat[2], quat[3], quat[0]]) # MuJoCo uses [qw, qx, qy, qz] | |
| euler = rot.as_euler('zyx', degrees=True) | |
| euler_angles.append(euler.tolist()) | |
| # Fully structured snapshot of: Positions, | |
| # Orientations (in both quaternion and Euler angle forms), | |
| # Linear and angular velocities | |
| qpos = np.array([data.qpos[addr:addr+7] for addr in joint_qpos_addrs]) | |
| qvel = np.array([data.qvel[addr:addr+6] for addr in joint_qvel_addrs]) | |
| physics_state = { | |
| "positions_xyz": qpos[:, :3].tolist(), | |
| "orientations_quat": qpos[:, 3:].tolist(), | |
| "orientations_euler": euler_angles, | |
| "velocities_linear": qvel[:, :3].tolist(), | |
| "velocities_angular": qvel[:, 3:].tolist(), | |
| } | |
| # Save JSON Snapshot with unique ID (optional, for LLM input/debugging) | |
| json_path = os.path.join(tempfile.gettempdir(), f"tray_sim_{unique_id}_state.json") | |
| with open(json_path, "w") as f: | |
| json.dump(physics_state, f, indent=2) | |
| # Save GIF with unique ID | |
| gif_path = os.path.join(tempfile.gettempdir(), f"tray_sim_{unique_id}.gif") | |
| imageio.mimsave(gif_path, frames, fps=20) | |
| print(f"Bodies: {model.nbody}, qpos size: {model.nq}, qvel size: {model.nv}") | |
| for i in range(model.nbody): | |
| name = mujoco.mj_id2name(model, mujoco.mjtObj.mjOBJ_BODY, i) | |
| if name: | |
| print(f"Body {i}: {name}, xpos: {data.xpos[i]}") | |
| return gif_path, physics_state, json_path |