Spaces:
No application file
No application file
sam133
Refactor: Restructure codebase with modular design patterns and fix orchestrator implementation
9529bc2
| from abc import ABC, abstractmethod | |
| from typing import List, Tuple, Dict, Any | |
| import pybullet as p | |
| import pybullet_data | |
| import numpy as np | |
| from ..core.robot_design import RobotDesign | |
| class SimulationStrategy(ABC): | |
| def setup_environment(self): | |
| pass | |
| def create_robot(self, design: RobotDesign) -> int: | |
| pass | |
| def run_simulation(self, duration_sec: int) -> Tuple[List[bytes], Dict[str, Any]]: | |
| pass | |
| class PyBulletSimulator(SimulationStrategy): | |
| def __init__(self): | |
| self.client_id = p.connect(p.DIRECT) | |
| p.setAdditionalSearchPath(pybullet_data.getDataPath()) | |
| self.robot_id = None | |
| self.wheel_ids = [] | |
| self.motor_ids = [] | |
| self.frames = [] | |
| def setup_environment(self): | |
| p.resetSimulation() | |
| p.setGravity(0, 0, -9.81) | |
| p.setRealTimeSimulation(0) | |
| # Create ground plane | |
| ground_id = p.createCollisionShape(p.GEOM_PLANE) | |
| p.createMultiBody(0, ground_id) | |
| # Create obstacle | |
| obstacle_height = 0.05 # 5cm obstacle | |
| obstacle_pos = [0.75, 0, obstacle_height/2] | |
| obstacle_shape = p.createCollisionShape( | |
| p.GEOM_BOX, | |
| halfExtents=[0.1, 0.5, obstacle_height/2] | |
| ) | |
| p.createMultiBody(0, obstacle_shape, basePosition=obstacle_pos) | |
| def create_robot(self, design: RobotDesign) -> int: | |
| # Create body | |
| body_specs = design.body | |
| body_collision = p.createCollisionShape( | |
| p.GEOM_BOX, | |
| halfExtents=[d/2 for d in body_specs.dimensions] | |
| ) | |
| body_visual = p.createVisualShape( | |
| p.GEOM_BOX, | |
| halfExtents=[d/2 for d in body_specs.dimensions], | |
| rgbaColor=[0.8, 0.2, 0.2, 1] | |
| ) | |
| self.robot_id = p.createMultiBody( | |
| baseMass=body_specs.mass, | |
| baseCollisionShapeIndex=body_collision, | |
| baseVisualShapeIndex=body_visual, | |
| basePosition=[0, 0, body_specs.dimensions[2]/2] | |
| ) | |
| # Create wheels | |
| for i, wheel_spec in enumerate(design.wheels): | |
| wheel_collision = p.createCollisionShape( | |
| p.GEOM_CYLINDER, | |
| radius=wheel_spec.radius, | |
| height=wheel_spec.width | |
| ) | |
| wheel_visual = p.createVisualShape( | |
| p.GEOM_CYLINDER, | |
| radius=wheel_spec.radius, | |
| length=wheel_spec.width, | |
| rgbaColor=[0.2, 0.2, 0.2, 1] | |
| ) | |
| wheel_id = p.createMultiBody( | |
| baseMass=1.0, | |
| baseCollisionShapeIndex=wheel_collision, | |
| baseVisualShapeIndex=wheel_visual, | |
| basePosition=wheel_spec.position | |
| ) | |
| self.wheel_ids.append(wheel_id) | |
| # Create joint | |
| joint_id = p.createConstraint( | |
| self.robot_id, | |
| wheel_id, | |
| p.JOINT_POINT2POINT, | |
| [0, 0, 0], | |
| wheel_spec.position, | |
| [0, 0, 0] | |
| ) | |
| p.changeConstraint(joint_id, maxForce=design.motors[i].max_force) | |
| self.motor_ids.append(joint_id) | |
| return self.robot_id | |
| def run_simulation(self, duration_sec: int = 7) -> Tuple[List[bytes], Dict[str, Any]]: | |
| self.frames = [] | |
| steps = int(duration_sec * 240) # 240 Hz simulation | |
| for i in range(steps): | |
| # Apply motor forces | |
| for motor_id in self.motor_ids: | |
| p.setJointMotorControl2( | |
| self.robot_id, | |
| motor_id, | |
| p.VELOCITY_CONTROL, | |
| targetVelocity=10.0, | |
| force=100.0 | |
| ) | |
| p.stepSimulation() | |
| # Capture frame every 10 steps | |
| if i % 10 == 0: | |
| view_matrix = p.computeViewMatrix( | |
| cameraEyePosition=[2, 2, 2], | |
| cameraTargetPosition=[0, 0, 0], | |
| cameraUpVector=[0, 0, 1] | |
| ) | |
| proj_matrix = p.computeProjectionMatrixFOV( | |
| fov=60, | |
| aspect=1.0, | |
| nearVal=0.1, | |
| farVal=100.0 | |
| ) | |
| _, _, rgb, _, _ = p.getCameraImage( | |
| width=320, | |
| height=240, | |
| viewMatrix=view_matrix, | |
| projectionMatrix=proj_matrix, | |
| renderer=p.ER_BULLET_HARDWARE_OPENGL | |
| ) | |
| self.frames.append(rgb.tobytes()) | |
| # Get final position and orientation | |
| pos, quat = p.getBasePositionAndOrientation(self.robot_id) | |
| results = { | |
| "final_position": pos, | |
| "final_orientation": quat, | |
| "success": pos[0] > 0.8 and pos[2] > 0.05 # Success criteria | |
| } | |
| return self.frames, results | |
| class SimulationFactory: | |
| def create_simulator(simulator_type: str = "pybullet") -> SimulationStrategy: | |
| if simulator_type.lower() == "pybullet": | |
| return PyBulletSimulator() | |
| else: | |
| raise ValueError(f"Unknown simulator type: {simulator_type}") |