|
|
import os |
|
|
import json |
|
|
import tempfile |
|
|
import imageio |
|
|
|
|
|
import numpy as np |
|
|
import mujoco |
|
|
|
|
|
|
|
|
from scipy.spatial.transform import Rotation as R |
|
|
|
|
|
MODEL_PATH = "assets/tray.xml" |
|
|
N_OBJECTS = 5 |
|
|
PUSH_START_STEP = 50 |
|
|
SIM_STEPS = 200 |
|
|
IMPACT_STEP = 60 |
|
|
|
|
|
|
|
|
def quat_to_rotmat(q): |
|
|
"""Convert quaternion to rotation matrix.""" |
|
|
w, x, y, z = q |
|
|
return np.array([ |
|
|
[1 - 2*y**2 - 2*z**2, 2*x*y - 2*z*w, 2*x*z + 2*y*w], |
|
|
[2*x*y + 2*z*w, 1 - 2*x**2 - 2*z**2, 2*y*z - 2*x*w], |
|
|
[2*x*z - 2*y*w, 2*y*z + 2*x*w, 1 - 2*x**2 - 2*y**2] |
|
|
]) |
|
|
|
|
|
def classify_orientation_stability(data, model, alpha_deg=45): |
|
|
""" |
|
|
Check object stability using orientation. |
|
|
Returns a list of booleans, True if the object is stable. |
|
|
""" |
|
|
stable_objects = [] |
|
|
cos_alpha = np.cos(np.deg2rad(alpha_deg)) |
|
|
g = np.array([0, 0, -1]) |
|
|
|
|
|
for i in range(N_OBJECTS): |
|
|
q = data.qpos[i*7 + 3 : i*7 + 7] |
|
|
rotmat = quat_to_rotmat(q) |
|
|
obj_z_world = rotmat[:, 2] |
|
|
cos_theta = -np.dot(g, obj_z_world) |
|
|
stable = cos_theta > cos_alpha |
|
|
stable_objects.append(stable) |
|
|
|
|
|
return stable_objects |
|
|
|
|
|
def classify_orientation_stability_verbose(data, model, alpha_deg=45): |
|
|
results = [] |
|
|
cos_alpha = np.cos(np.deg2rad(alpha_deg)) |
|
|
g = np.array([0, 0, -1]) |
|
|
|
|
|
for i in range(N_OBJECTS): |
|
|
q = data.qpos[i*7 + 3 : i*7 + 7] |
|
|
rotmat = quat_to_rotmat(q) |
|
|
obj_z_world = rotmat[:, 2] |
|
|
cos_theta = -np.dot(g, obj_z_world) |
|
|
|
|
|
if cos_theta > cos_alpha: |
|
|
results.append({"stable": True, "reason": "upright (tilt within limit)"}) |
|
|
else: |
|
|
angle_deg = np.rad2deg(np.arccos(cos_theta)) |
|
|
results.append({ |
|
|
"stable": False, |
|
|
"reason": f"tilt angle {angle_deg:.2f}° exceeds {alpha_deg}°" |
|
|
}) |
|
|
|
|
|
return results |
|
|
|
|
|
|
|
|
def classify_stability(data, model): |
|
|
""" |
|
|
Classify object stability based on position data. |
|
|
|
|
|
For each object in the scene: |
|
|
- Check the object's position using data.qpos[z]. |
|
|
- Determine whether the object is upright, within the tray bounds, or has fallen/rolled over. |
|
|
- Define thresholds for "stable" as: |
|
|
* z-position > 0.01 |
|
|
* x and y within tray bounds of ±0.3 |
|
|
|
|
|
Args: |
|
|
data: MuJoCo data object containing simulation state. |
|
|
model: MuJoCo model (unused in this function but typically passed alongside data). |
|
|
|
|
|
Returns: |
|
|
List of booleans indicating whether each object is stable. |
|
|
""" |
|
|
|
|
|
stable_objects = [] |
|
|
tray_bounds = 0.3 |
|
|
for i in range(N_OBJECTS): |
|
|
pos = data.qpos[i*7 : i*7 + 3] |
|
|
if abs(pos[0]) < tray_bounds and abs(pos[1]) < tray_bounds and pos[2] > 0.01: |
|
|
stable_objects.append(True) |
|
|
else: |
|
|
stable_objects.append(False) |
|
|
return stable_objects |
|
|
|
|
|
|
|
|
def classify_stability_verbose(data, model): |
|
|
flags = [] |
|
|
tray_bounds = 0.3 |
|
|
for i in range(N_OBJECTS): |
|
|
pos = data.qpos[i*7 : i*7 + 3] |
|
|
reasons = [] |
|
|
if pos[2] < 0.01: |
|
|
reasons.append("z too low (possibly toppled)") |
|
|
if abs(pos[0]) > tray_bounds or abs(pos[1]) > tray_bounds: |
|
|
reasons.append("outside tray bounds") |
|
|
if not reasons: |
|
|
flags.append({"stable": True, "reason": "object upright and within tray"}) |
|
|
else: |
|
|
flags.append({"stable": False, "reason": ", ".join(reasons)}) |
|
|
return flags |
|
|
|
|
|
|
|
|
def format_llm_prompt(physics_state): |
|
|
""" |
|
|
Converts a physics state dictionary of a simulated scene into a JSON-serializable list of objects |
|
|
(human-readable), where each object contains position, orientation, velocity, angular velocity, |
|
|
and stability information suitable for prompting a large language model (LLM). |
|
|
|
|
|
Each object is described by its: |
|
|
- 3D position |
|
|
- orientation in ZYX Euler angles (in degrees) |
|
|
- linear velocity |
|
|
- angular velocity |
|
|
- stability status (Stable or Unstable) |
|
|
|
|
|
Parameters: |
|
|
physics_state (dict): A dictionary containing the physics properties of each object, |
|
|
with the following keys: |
|
|
- "positions_xyz": List of shape (N, 3), each entry a position vector [x, y, z] |
|
|
- "orientations_euler": List of shape (N, 3), each entry an orientation vector [z, y, x] in degrees |
|
|
- "velocities_linear": List of shape (N, 3), linear velocity vectors |
|
|
- "velocities_angular": List of shape (N, 3), angular velocity vectors |
|
|
- "stable_flags": List of bools indicating whether each object is stable |
|
|
|
|
|
Returns: |
|
|
List[dict]: Each dictionary represents an object with formatted fields |
|
|
suitable for JSON serialization. |
|
|
""" |
|
|
formatted = [] |
|
|
for i in range(len(physics_state["positions_xyz"])): |
|
|
obj = { |
|
|
"id": i, |
|
|
"position": np.round(physics_state["positions_xyz"][i], 3).tolist(), |
|
|
"orientation_euler_zyx_deg": np.round(physics_state["orientations_euler"][i], 1).tolist(), |
|
|
"linear_velocity": np.round(physics_state["velocities_linear"][i], 3).tolist(), |
|
|
"angular_velocity": np.round(physics_state["velocities_angular"][i], 3).tolist(), |
|
|
"status": "Stable" if physics_state["stable_flags"][i] else "Unstable" |
|
|
|
|
|
} |
|
|
formatted.append(obj) |
|
|
return formatted |
|
|
|
|
|
def format_as_natural_language_prompt(scene_description, task_description=None): |
|
|
""" |
|
|
Convert structured physics state into a natural language prompt for LLMs. |
|
|
|
|
|
Args: |
|
|
scene_description (List[dict]): Output of format_llm_prompt(). |
|
|
task_description (str, optional): Additional instruction to prepend, such as |
|
|
"Describe which blocks are unstable and why" or |
|
|
"Predict which objects might move further if the tray tilts". |
|
|
|
|
|
Returns: |
|
|
str: Natural language prompt for an LLM. |
|
|
""" |
|
|
lines = [] |
|
|
if task_description: |
|
|
lines.append(task_description.strip()) |
|
|
lines.append("") |
|
|
|
|
|
lines.append("Here is the scene summary:") |
|
|
|
|
|
for obj in scene_description: |
|
|
line = ( |
|
|
f"Object {obj['id']} is at position {obj['position']} with orientation {obj['orientation_euler_zyx_deg']} degrees. " |
|
|
f"It has linear velocity {obj['linear_velocity']} and angular velocity {obj['angular_velocity']}. " |
|
|
f"Status: {obj['status']}." |
|
|
) |
|
|
lines.append(line) |
|
|
|
|
|
return "\n".join(lines) |
|
|
|
|
|
def run_tray_simulation( |
|
|
seed=0, |
|
|
num_objects=N_OBJECTS, |
|
|
azimuth=45, |
|
|
elevation=-25, |
|
|
distance=0.6, |
|
|
unique_id=None |
|
|
): |
|
|
np.random.seed(seed) |
|
|
model = mujoco.MjModel.from_xml_path(MODEL_PATH) |
|
|
data = mujoco.MjData(model) |
|
|
|
|
|
|
|
|
mujoco.mj_resetData(model, data) |
|
|
|
|
|
tray_id = mujoco.mj_name2id( |
|
|
model, |
|
|
mujoco.mjtObj.mjOBJ_BODY, |
|
|
"tray" |
|
|
) |
|
|
|
|
|
|
|
|
tray_force_applied = True |
|
|
|
|
|
|
|
|
|
|
|
pusher_idx = num_objects * 7 |
|
|
pusher_vel_idx = num_objects * 6 |
|
|
|
|
|
|
|
|
pusher_pos = np.array([0.35, -0.35, 0.08]) |
|
|
pusher_quat = [1, 0, 0, 0] |
|
|
data.qpos[pusher_idx:pusher_idx+3] = pusher_pos |
|
|
data.qpos[pusher_idx+3:pusher_idx+7] = pusher_quat |
|
|
|
|
|
|
|
|
target = np.array([0.0, 0.0, 0.05]) |
|
|
direction = target - pusher_pos |
|
|
direction[2] = 0.0 |
|
|
direction /= np.linalg.norm(direction) |
|
|
push_velocity = 0.15 * direction |
|
|
|
|
|
|
|
|
for i in range(num_objects): |
|
|
obj_start = i * 7 |
|
|
pos = np.random.uniform(low=[-0.1, -0.1, 0.05], high=[0.1, 0.1, 0.15]) |
|
|
quat = [1, 0, 0, 0] |
|
|
data.qpos[obj_start:obj_start+3] = pos |
|
|
data.qpos[obj_start+3:obj_start+7] = quat |
|
|
data.qvel[i*6:i*6+6] = 0 |
|
|
print(f"Object {i} initial position: {pos}") |
|
|
|
|
|
|
|
|
renderer = mujoco.Renderer(model, width=480, height=480) |
|
|
|
|
|
|
|
|
scene_option = mujoco.MjvOption() |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
cam = mujoco.MjvCamera() |
|
|
cam.type = mujoco.mjtCamera.mjCAMERA_FREE |
|
|
cam.lookat[:] = np.array([0.0, 0.0, 0.02]) |
|
|
cam.distance = distance |
|
|
cam.azimuth = azimuth |
|
|
cam.elevation = elevation |
|
|
|
|
|
frames = [] |
|
|
keyframes = {0: None, 25: None, 50: None, 200: None} |
|
|
|
|
|
for t in range(SIM_STEPS): |
|
|
if t == PUSH_START_STEP: |
|
|
|
|
|
|
|
|
data.qvel[pusher_vel_idx:pusher_vel_idx+3] = push_velocity |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
mujoco.mj_step(model, data) |
|
|
|
|
|
renderer.update_scene( |
|
|
data, |
|
|
camera=cam, |
|
|
scene_option=scene_option |
|
|
) |
|
|
img = renderer.render() |
|
|
|
|
|
if t in keyframes: |
|
|
|
|
|
imageio.imwrite(f"/tmp/frame_{t}_{unique_id}.png", img) |
|
|
|
|
|
frames.append(img) |
|
|
|
|
|
renderer.close() |
|
|
|
|
|
stability_flags = classify_stability(data, model) |
|
|
|
|
|
|
|
|
print("Stability:", stability_flags) |
|
|
|
|
|
euler_angles = [] |
|
|
for i in range(num_objects): |
|
|
quat = data.qpos[i*7+3 : i*7+7] |
|
|
|
|
|
rot = R.from_quat([quat[1], quat[2], quat[3], quat[0]]) |
|
|
euler = rot.as_euler('zyx', degrees=True) |
|
|
euler_angles.append(euler.tolist()) |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
physics_state = { |
|
|
"positions_xyz": data.qpos[:num_objects * 7].reshape(num_objects, 7)[:, :3].tolist(), |
|
|
"orientations_quat": data.qpos[:num_objects * 7].reshape(num_objects, 7)[:, 3:].tolist(), |
|
|
"orientations_euler": euler_angles, |
|
|
"velocities_linear": data.qvel[:num_objects * 6].reshape(num_objects, 6)[:, :3].tolist(), |
|
|
"velocities_angular": data.qvel[:num_objects * 6].reshape(num_objects, 6)[:, 3:].tolist(), |
|
|
"stable_flags": stability_flags, |
|
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
json_path = os.path.join(tempfile.gettempdir(), f"tray_sim_{unique_id}_state.json") |
|
|
with open(json_path, "w") as f: |
|
|
json.dump(physics_state, f, indent=2) |
|
|
|
|
|
|
|
|
formatted = format_llm_prompt(physics_state) |
|
|
prompt = format_as_natural_language_prompt( |
|
|
formatted, |
|
|
task_description="Explain which objects are likely to fall if the tray is tilted slightly to the right." |
|
|
) |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
gif_path = os.path.join(tempfile.gettempdir(), f"tray_sim_{unique_id}.gif") |
|
|
imageio.mimsave(gif_path, frames, fps=20) |
|
|
|
|
|
return gif_path, stability_flags, physics_state, prompt, json_path |