| """ |
| Protocol type definitions for Nova-Sim WebSocket API. |
| |
| This module defines the structure of all WebSocket messages exchanged between |
| clients and the server, including both request and response messages. |
| """ |
|
|
| from typing import TypedDict, Literal, Optional, Union, List |
|
|
|
|
| |
| |
| |
|
|
| RobotType = Literal["g1", "spot", "ur5", "ur5_t_push"] |
| SceneType = Optional[str] |
| ControlMode = Literal["ik", "joint"] |
|
|
|
|
| |
| |
| |
|
|
| class ActionData(TypedDict, total=False): |
| """Velocity-based action commands for robot control. |
| |
| For locomotion robots (G1, Spot): |
| - vx: Forward/backward velocity [-1, 1] |
| - vy: Left/right strafe velocity [-1, 1] |
| - vyaw: Turn rate [-1, 1] |
| |
| For robot arms (UR5): |
| - vx, vy, vz: Cartesian translation velocities (m/s) |
| - vrx, vry, vrz: Cartesian rotation velocities (rad/s) |
| - j1-j6: Joint velocities (rad/s) |
| - gripper: Gripper position [0-255] |
| """ |
| |
| vx: float |
| vy: float |
| vz: float |
|
|
| |
| vyaw: float |
|
|
| |
| vrx: float |
| vry: float |
| vrz: float |
|
|
| |
| j1: float |
| j2: float |
| j3: float |
| j4: float |
| j5: float |
| j6: float |
|
|
| |
| gripper: float |
|
|
|
|
| class ActionMessage(TypedDict): |
| """Action command message for all robots.""" |
| type: Literal["action"] |
| data: ActionData |
|
|
|
|
| class TeleopActionData(TypedDict, total=False): |
| """Teleoperation action data (backward compatible with old teleop_command).""" |
| vx: float |
| vy: float |
| vz: float |
| |
| dx: float |
| dy: float |
| dz: float |
|
|
|
|
| class TeleopActionMessage(TypedDict): |
| """Teleoperation action message (UR5 keyboard control).""" |
| type: Literal["teleop_action"] |
| data: TeleopActionData |
|
|
|
|
| |
| |
| |
|
|
| class ResetData(TypedDict, total=False): |
| """Reset environment data.""" |
| seed: Optional[int] |
|
|
|
|
| class ResetMessage(TypedDict): |
| """Reset environment message.""" |
| type: Literal["reset"] |
| data: ResetData |
|
|
|
|
| class SwitchRobotData(TypedDict): |
| """Switch robot data.""" |
| robot: RobotType |
| scene: Optional[str] |
|
|
|
|
| class SwitchRobotMessage(TypedDict): |
| """Switch robot message.""" |
| type: Literal["switch_robot"] |
| data: SwitchRobotData |
|
|
|
|
| class CameraRotateData(TypedDict): |
| """Camera rotation data.""" |
| action: Literal["rotate"] |
| dx: float |
| dy: float |
|
|
|
|
| class CameraZoomData(TypedDict): |
| """Camera zoom data.""" |
| action: Literal["zoom"] |
| dz: float |
|
|
|
|
| class CameraPanData(TypedDict): |
| """Camera pan data.""" |
| action: Literal["pan"] |
| dx: float |
| dy: float |
|
|
|
|
| class CameraSetDistanceData(TypedDict): |
| """Camera set distance data.""" |
| action: Literal["set_distance"] |
| distance: float |
|
|
|
|
| CameraData = Union[CameraRotateData, CameraZoomData, CameraPanData, CameraSetDistanceData] |
|
|
|
|
| class CameraMessage(TypedDict): |
| """Camera control message.""" |
| type: Literal["camera"] |
| data: CameraData |
|
|
|
|
| class CameraFollowData(TypedDict): |
| """Camera follow mode data.""" |
| follow: bool |
|
|
|
|
| class CameraFollowMessage(TypedDict): |
| """Camera follow mode message.""" |
| type: Literal["camera_follow"] |
| data: CameraFollowData |
|
|
|
|
| |
| |
| |
|
|
| class ArmTargetData(TypedDict): |
| """Arm target position data (IK mode).""" |
| x: float |
| y: float |
| z: float |
|
|
|
|
| class ArmTargetMessage(TypedDict): |
| """Set arm target position message (UR5, IK mode).""" |
| type: Literal["arm_target"] |
| data: ArmTargetData |
|
|
|
|
| class ArmOrientationData(TypedDict): |
| """Arm target orientation data (IK mode).""" |
| roll: float |
| pitch: float |
| yaw: float |
|
|
|
|
| class ArmOrientationMessage(TypedDict): |
| """Set arm target orientation message (UR5, IK mode).""" |
| type: Literal["arm_orientation"] |
| data: ArmOrientationData |
|
|
|
|
| class UseOrientationData(TypedDict): |
| """Toggle orientation control data.""" |
| enabled: bool |
|
|
|
|
| class UseOrientationMessage(TypedDict): |
| """Toggle orientation control message (UR5).""" |
| type: Literal["use_orientation"] |
| data: UseOrientationData |
|
|
|
|
| class JointPositionsData(TypedDict): |
| """Joint positions data (joint mode).""" |
| positions: List[float] |
|
|
|
|
| class JointPositionsMessage(TypedDict): |
| """Set joint positions message (UR5, joint mode).""" |
| type: Literal["joint_positions"] |
| data: JointPositionsData |
|
|
|
|
| class ControlModeData(TypedDict): |
| """Control mode data.""" |
| mode: ControlMode |
|
|
|
|
| class ControlModeMessage(TypedDict): |
| """Set control mode message (UR5).""" |
| type: Literal["control_mode"] |
| data: ControlModeData |
|
|
|
|
| class GripperData(TypedDict): |
| """Gripper control data.""" |
| action: Literal["open", "close"] |
| value: Optional[int] |
|
|
|
|
| class GripperMessage(TypedDict): |
| """Gripper control message (UR5).""" |
| type: Literal["gripper"] |
| data: GripperData |
|
|
|
|
| class NovaModeSetting(TypedDict): |
| """Nova API mode settings.""" |
| state_streaming: bool |
| ik: bool |
|
|
|
|
| class SetNovaModeData(TypedDict): |
| """Set Nova API mode data.""" |
| enabled: Optional[bool] |
| |
| state_streaming: Optional[bool] |
| ik: Optional[bool] |
|
|
|
|
| class SetNovaModeMessage(TypedDict): |
| """Set Nova API mode message (UR5).""" |
| type: Literal["set_nova_mode"] |
| data: SetNovaModeData |
|
|
|
|
| |
| |
| |
|
|
| class ClientIdentityData(TypedDict): |
| """Client identity data.""" |
| client_id: str |
|
|
|
|
| class ClientIdentityMessage(TypedDict): |
| """Client identity handshake message.""" |
| type: Literal["client_identity"] |
| data: ClientIdentityData |
|
|
|
|
| class ClientNotificationData(TypedDict, total=False): |
| """Client notification data.""" |
| message: str |
| level: Literal["info", "warning", "error"] |
|
|
|
|
| class ClientNotificationMessage(TypedDict): |
| """Client notification message.""" |
| type: Literal["client_notification"] |
| data: ClientNotificationData |
|
|
|
|
| class EpisodeControlData(TypedDict): |
| """Episode control data.""" |
| action: Literal["terminate", "truncate"] |
|
|
|
|
| class EpisodeControlMessage(TypedDict): |
| """Episode control message (trainer only).""" |
| type: Literal["episode_control"] |
| data: EpisodeControlData |
|
|
|
|
| |
| |
| |
|
|
| class Position(TypedDict): |
| """3D position.""" |
| x: float |
| y: float |
| z: float |
|
|
|
|
| class Quaternion(TypedDict): |
| """Quaternion orientation.""" |
| w: float |
| x: float |
| y: float |
| z: float |
|
|
|
|
| class EulerAngles(TypedDict): |
| """Euler angles orientation.""" |
| roll: float |
| pitch: float |
| yaw: float |
|
|
|
|
| class SceneObject(TypedDict): |
| """Scene object information (position and orientation).""" |
| name: str |
| position: Position |
| orientation: Quaternion |
|
|
|
|
| class LocomotionObservation(TypedDict): |
| """Observation data for locomotion robots (G1, Spot).""" |
| position: Position |
| orientation: Quaternion |
|
|
|
|
| class UR5Observation(TypedDict): |
| """Observation data for UR5 robot arm.""" |
| end_effector: Position |
| ee_orientation: Quaternion |
| ee_target: Position |
| ee_target_orientation: EulerAngles |
| gripper: int |
| joint_positions: List[float] |
| joint_targets: List[float] |
|
|
|
|
| Observation = Union[LocomotionObservation, UR5Observation] |
|
|
|
|
| class NovaApiStatus(TypedDict): |
| """Nova API integration status.""" |
| connected: bool |
| state_streaming: bool |
| ik: bool |
|
|
|
|
| class StateData(TypedDict, total=False): |
| """State broadcast data.""" |
| observation: Observation |
| steps: int |
| reward: float |
| teleop_action: ActionData |
| connected_clients: List[str] |
| scene_objects: List[SceneObject] |
|
|
| |
| control_mode: ControlMode |
| nova_api: NovaApiStatus |
|
|
|
|
| class StateMessage(TypedDict): |
| """State broadcast message.""" |
| type: Literal["state"] |
| data: StateData |
|
|
|
|
| class ConnectedClientsData(TypedDict): |
| """Connected clients status data.""" |
| clients: List[str] |
|
|
|
|
| class ConnectedClientsMessage(TypedDict): |
| """Connected clients broadcast message (to all clients).""" |
| type: Literal["connected_clients"] |
| data: ConnectedClientsData |
|
|
|
|
| class ClientNotificationBroadcast(TypedDict): |
| """Client notification broadcast (to all clients).""" |
| type: Literal["client_notification"] |
| data: ClientNotificationData |
|
|
|
|
| |
| |
| |
|
|
| class CameraIntrinsics(TypedDict): |
| """Camera intrinsics for pinhole camera model.""" |
| fx: float |
| fy: float |
| cx: float |
| cy: float |
| width: int |
| height: int |
| fovy_degrees: float |
|
|
|
|
| class CameraPose(TypedDict): |
| """Camera pose in world coordinates.""" |
| position: Position |
| orientation: Quaternion |
|
|
|
|
| class CameraFeed(TypedDict, total=False): |
| """Camera feed information.""" |
| name: str |
| label: str |
| url: str |
| pose: CameraPose |
| intrinsics: CameraIntrinsics |
|
|
|
|
| class EnvResponse(TypedDict, total=False): |
| """GET /env response.""" |
| robot: RobotType |
| scene: str |
| has_gripper: bool |
| control_mode: ControlMode |
| action_space: dict |
| observation_space: dict |
| camera_feeds: List[CameraFeed] |
| home_pose: List[float] |
|
|
|
|
| class CommandInfo(TypedDict): |
| """Command metadata.""" |
| name: str |
| description: str |
|
|
|
|
| class MetadataResponse(TypedDict): |
| """GET /metadata response.""" |
| robots: List[str] |
| commands: List[CommandInfo] |
|
|
|
|
| |
| |
| |
|
|
| ClientMessage = Union[ |
| ActionMessage, |
| TeleopActionMessage, |
| ResetMessage, |
| SwitchRobotMessage, |
| CameraMessage, |
| CameraFollowMessage, |
| ArmTargetMessage, |
| ArmOrientationMessage, |
| UseOrientationMessage, |
| JointPositionsMessage, |
| ControlModeMessage, |
| GripperMessage, |
| SetNovaModeMessage, |
| ClientIdentityMessage, |
| ClientNotificationMessage, |
| EpisodeControlMessage, |
| ] |
|
|
| ServerMessage = Union[ |
| StateMessage, |
| ConnectedClientsMessage, |
| ClientNotificationBroadcast, |
| ] |
|
|