| | import numpy as np |
| | import numpy.core.multiarray as multiarray |
| | import torch |
| | import torch.nn as nn |
| | from huggingface_hub import hf_hub_download |
| | from torch.serialization import add_safe_globals |
| |
|
| | from diffusers import DDPMScheduler, UNet1DModel |
| |
|
| |
|
| | add_safe_globals( |
| | [ |
| | multiarray._reconstruct, |
| | np.ndarray, |
| | np.dtype, |
| | np.dtype(np.float32).type, |
| | np.dtype(np.float64).type, |
| | np.dtype(np.int32).type, |
| | np.dtype(np.int64).type, |
| | type(np.dtype(np.float32)), |
| | type(np.dtype(np.float64)), |
| | type(np.dtype(np.int32)), |
| | type(np.dtype(np.int64)), |
| | ] |
| | ) |
| |
|
| | """ |
| | An example of using HuggingFace's diffusers library for diffusion policy, |
| | generating smooth movement trajectories. |
| | |
| | This implements a robot control model for pushing a T-shaped block into a target area. |
| | The model takes in the robot arm position, block position, and block angle, |
| | then outputs a sequence of 16 (x,y) positions for the robot arm to follow. |
| | """ |
| |
|
| |
|
| | class ObservationEncoder(nn.Module): |
| | """ |
| | Converts raw robot observations (positions/angles) into a more compact representation |
| | |
| | state_dim (int): Dimension of the input state vector (default: 5) |
| | [robot_x, robot_y, block_x, block_y, block_angle] |
| | |
| | - Input shape: (batch_size, state_dim) |
| | - Output shape: (batch_size, 256) |
| | """ |
| |
|
| | def __init__(self, state_dim): |
| | super().__init__() |
| | self.net = nn.Sequential(nn.Linear(state_dim, 512), nn.ReLU(), nn.Linear(512, 256)) |
| |
|
| | def forward(self, x): |
| | return self.net(x) |
| |
|
| |
|
| | class ObservationProjection(nn.Module): |
| | """ |
| | Takes the encoded observation and transforms it into 32 values that represent the current robot/block situation. |
| | These values are used as additional contextual information during the diffusion model's trajectory generation. |
| | |
| | - Input: 256-dim vector (padded to 512) |
| | Shape: (batch_size, 256) |
| | - Output: 32 contextual information values for the diffusion model |
| | Shape: (batch_size, 32) |
| | """ |
| |
|
| | def __init__(self): |
| | super().__init__() |
| | self.weight = nn.Parameter(torch.randn(32, 512)) |
| | self.bias = nn.Parameter(torch.zeros(32)) |
| |
|
| | def forward(self, x): |
| | if x.size(-1) == 256: |
| | x = torch.cat([x, torch.zeros(*x.shape[:-1], 256, device=x.device)], dim=-1) |
| | return nn.functional.linear(x, self.weight, self.bias) |
| |
|
| |
|
| | class DiffusionPolicy: |
| | """ |
| | Implements diffusion policy for generating robot arm trajectories. |
| | Uses diffusion to generate sequences of positions for a robot arm, conditioned on |
| | the current state of the robot and the block it needs to push. |
| | |
| | The model expects observations in pixel coordinates (0-512 range) and block angle in radians. |
| | It generates trajectories as sequences of (x,y) coordinates also in the 0-512 range. |
| | """ |
| |
|
| | def __init__(self, state_dim=5, device="cpu"): |
| | self.device = device |
| |
|
| | |
| | self.stats = { |
| | "obs": {"min": torch.zeros(5), "max": torch.tensor([512, 512, 512, 512, 2 * np.pi])}, |
| | "action": {"min": torch.zeros(2), "max": torch.full((2,), 512)}, |
| | } |
| |
|
| | self.obs_encoder = ObservationEncoder(state_dim).to(device) |
| | self.obs_projection = ObservationProjection().to(device) |
| |
|
| | |
| | |
| | |
| | self.model = UNet1DModel( |
| | sample_size=16, |
| | in_channels=34, |
| | out_channels=2, |
| | layers_per_block=2, |
| | block_out_channels=(128,), |
| | down_block_types=("DownBlock1D",), |
| | up_block_types=("UpBlock1D",), |
| | ).to(device) |
| |
|
| | |
| | self.noise_scheduler = DDPMScheduler( |
| | num_train_timesteps=100, |
| | beta_schedule="squaredcos_cap_v2", |
| | ) |
| |
|
| | |
| | checkpoint = torch.load( |
| | hf_hub_download("dorsar/diffusion_policy", "push_tblock.pt"), weights_only=True, map_location=device |
| | ) |
| | self.model.load_state_dict(checkpoint["model_state_dict"]) |
| | self.obs_encoder.load_state_dict(checkpoint["encoder_state_dict"]) |
| | self.obs_projection.load_state_dict(checkpoint["projection_state_dict"]) |
| |
|
| | |
| | def normalize_data(self, data, stats): |
| | return ((data - stats["min"]) / (stats["max"] - stats["min"])) * 2 - 1 |
| |
|
| | |
| | def unnormalize_data(self, ndata, stats): |
| | return ((ndata + 1) / 2) * (stats["max"] - stats["min"]) + stats["min"] |
| |
|
| | @torch.no_grad() |
| | def predict(self, observation): |
| | """ |
| | Generates a trajectory of robot arm positions given the current state. |
| | |
| | Args: |
| | observation (torch.Tensor): Current state [robot_x, robot_y, block_x, block_y, block_angle] |
| | Shape: (batch_size, 5) |
| | |
| | Returns: |
| | torch.Tensor: Sequence of (x,y) positions for the robot arm to follow |
| | Shape: (batch_size, 16, 2) where: |
| | - 16 is the number of steps in the trajectory |
| | - 2 is the (x,y) coordinates in pixel space (0-512) |
| | |
| | The function first encodes the observation, then uses it to condition a diffusion |
| | process that gradually denoises random trajectories into smooth, purposeful movements. |
| | """ |
| | observation = observation.to(self.device) |
| | normalized_obs = self.normalize_data(observation, self.stats["obs"]) |
| |
|
| | |
| | cond = self.obs_projection(self.obs_encoder(normalized_obs)) |
| | |
| | cond = cond.view(normalized_obs.shape[0], -1, 1).expand(-1, -1, 16) |
| |
|
| | |
| | action = torch.randn((observation.shape[0], 2, 16), device=self.device) |
| |
|
| | |
| | |
| | |
| | |
| |
|
| | self.noise_scheduler.set_timesteps(100) |
| | for t in self.noise_scheduler.timesteps: |
| | model_output = self.model(torch.cat([action, cond], dim=1), t) |
| | action = self.noise_scheduler.step(model_output.sample, t, action).prev_sample |
| |
|
| | action = action.transpose(1, 2) |
| | action = self.unnormalize_data(action, self.stats["action"]) |
| | return action |
| |
|
| |
|
| | if __name__ == "__main__": |
| | policy = DiffusionPolicy() |
| |
|
| | |
| | |
| | obs = torch.tensor( |
| | [ |
| | [ |
| | 256.0, |
| | 256.0, |
| | 200.0, |
| | 300.0, |
| | np.pi / 2, |
| | ] |
| | ] |
| | ) |
| |
|
| | action = policy.predict(obs) |
| |
|
| | print("Action shape:", action.shape) |
| | print("\nPredicted trajectory:") |
| | for i, (x, y) in enumerate(action[0]): |
| | print(f"Step {i:2d}: x={x:6.1f}, y={y:6.1f}") |
| |
|