Spaces:
Running
Running
| #!/usr/bin/env python3 | |
| """Interactive hopper control via OpenEnv. | |
| This example demonstrates using the dm_control OpenEnv client with | |
| the hopper environment. Press SPACE to apply random forces to the joints. | |
| Controls: | |
| SPACE: Apply random force to all joints | |
| R: Reset environment | |
| ESC or Q: Quit | |
| Requirements: | |
| pip install pygame | |
| Usage: | |
| 1. Start the server: uvicorn server.app:app --host 0.0.0.0 --port 8000 | |
| 2. Run this script: python examples/hopper_control.py | |
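| For visual mode (requires working MuJoCo rendering): | |
| python examples/hopper_control.py --visual | |
| For headless mode (no pygame window; random force bursts are applied automatically): | |
| python examples/hopper_control.py --headless | |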
| """ | |
| import argparse | |
| import random | |
| import sys | |
| from pathlib import Path | |
| # Add parent directory to path for imports | |
| sys.path.insert(0, str(Path(__file__).parent.parent)) | |
| from client import DMControlEnv | |
| from models import DMControlAction | |
def get_action_dim(env: "DMControlEnv") -> int:
    """Return the number of action components the environment expects.

    The value is read from the ``"shape"`` entry of ``env.state().action_spec``.

    Args:
        env: Connected environment client; only ``env.state()`` is used.

    Returns:
        The first element of the action spec's shape, coerced to ``int``.
        If the spec is missing, has no ``"shape"`` key, or the shape is
        empty or not a list/tuple, 4 is returned as the hopper fallback.
    """
    action_spec = env.state().action_spec
    if action_spec and "shape" in action_spec:
        shape = action_spec["shape"]
        # Accept tuples as well as lists so a non-JSON spec is not silently
        # replaced by the fallback value.
        if isinstance(shape, (list, tuple)) and len(shape) > 0:
            # Coerce so the result is always usable with range().
            return int(shape[0])
    # Fallback used for the hopper domain: 4 actuators.
    return 4
def generate_random_action(action_dim: int, magnitude: float = 1.0) -> DMControlAction:
    """Build an action whose components are drawn uniformly at random.

    Args:
        action_dim: Number of components to generate.
        magnitude: Each component is sampled from ``[-magnitude, magnitude]``.

    Returns:
        A ``DMControlAction`` carrying ``action_dim`` random values.
    """
    low, high = -magnitude, magnitude
    samples = []
    for _ in range(action_dim):
        samples.append(random.uniform(low, high))
    return DMControlAction(values=samples)
def generate_zero_action(action_dim: int) -> DMControlAction:
    """Build an action with every component set to 0.0 (no force applied).

    Args:
        action_dim: Number of components in the action.

    Returns:
        A ``DMControlAction`` carrying ``action_dim`` zeros.
    """
    zeros = [0.0 for _ in range(action_dim)]
    return DMControlAction(values=zeros)
def run_headless(env: DMControlEnv, task: str = "hop", max_steps: int = 1000):
    """Run the hopper without any window, driving it with scripted actions.

    The environment is reset for the hopper domain and then stepped until
    the episode reports ``done`` or ``max_steps`` steps have been taken.
    Within every 30-step window the first 5 steps use a random action of
    magnitude 0.8; the remaining steps use a zero action. Progress is
    printed every 100 steps and a summary is printed at the end.

    Args:
        env: Connected environment client.
        task: Hopper task name passed to ``env.reset``.
        max_steps: Upper bound on the number of steps to run.
    """
    print("\n=== Headless Mode (OpenEnv Step/Observation Pattern) ===")
    print("This mode demonstrates the OpenEnv API with the hopper.\n")

    # Reset environment using OpenEnv pattern
    result = env.reset(domain_name="hopper", task_name=task)
    print(f"Initial observations: {list(result.observation.observations.keys())}")

    # Get action dimension
    action_dim = get_action_dim(env)
    print(f"Action dimension: {action_dim}")

    total_reward = 0.0
    step_count = 0

    print("\nRunning with periodic random forces...")
    print("Every 30 steps, a random force burst is applied.\n")

    while not result.done and step_count < max_steps:
        if step_count % 30 < 5:
            # Random force burst for the first 5 steps of each 30-step window
            action = generate_random_action(action_dim, magnitude=0.8)
        else:
            # No force
            action = generate_zero_action(action_dim)

        # Step the environment using OpenEnv pattern
        result = env.step(action)

        # A missing reward (None) counts as 0.0. Normalise once so the
        # accumulator and the progress line below agree and the ":.3f"
        # format never receives None.
        reward = result.reward or 0.0
        total_reward += reward
        step_count += 1

        # Print progress periodically
        if step_count % 100 == 0:
            observations = result.observation.observations
            position = observations.get("position", [])
            velocity = observations.get("velocity", [])
            print(
                f"Step {step_count}: reward={reward:.3f}, "
                f"total={total_reward:.2f}, done={result.done}"
            )
            if position:
                print(f" position: {position[:3]}")
            if velocity:
                print(f" velocity: {velocity[:3]}")

    print(f"\nEpisode finished: {step_count} steps, total reward: {total_reward:.2f}")
def run_interactive(env: DMControlEnv, task: str = "hop"):
    """Run keyboard-driven control of the hopper using a small pygame window.

    The window only shows a status line (step count, accumulated reward and
    whether force is being applied); no simulation frames are rendered.

    Controls:
        SPACE (held): send a random action of magnitude 2.0 each frame;
            otherwise a zero action is sent.
        R: reset the environment and the counters.
        ESC, Q or closing the window: quit.

    The environment is reset automatically when an episode reports ``done``.
    The loop is capped at 30 iterations per second.

    Args:
        env: Connected environment client.
        task: Hopper task name passed to ``env.reset``.
    """
    import pygame

    print("\n=== Interactive Mode (OpenEnv Step/Observation Pattern) ===")
    print("Press SPACE to apply random force, R to reset, ESC to quit.\n")

    # Reset environment using OpenEnv pattern
    result = env.reset(domain_name="hopper", task_name=task)
    print(f"Initial observations: {list(result.observation.observations.keys())}")

    # Get action dimension
    action_dim = get_action_dim(env)
    print(f"Action dimension: {action_dim}")

    total_reward = 0.0
    step_count = 0

    # Initialize pygame for keyboard input (minimal window)
    pygame.init()
    try:
        screen = pygame.display.set_mode((400, 100))
        pygame.display.set_caption(
            "Hopper Control - SPACE for random force, R to reset"
        )
        clock = pygame.time.Clock()
        font = pygame.font.Font(None, 24)

        print("\nControls:")
        print(" SPACE: Apply random force to joints")
        print(" R: Reset environment")
        print(" ESC or Q: Quit\n")

        running = True
        while running:
            # Handle events
            for event in pygame.event.get():
                if event.type == pygame.QUIT:
                    running = False
                elif event.type == pygame.KEYDOWN:
                    if event.key in (pygame.K_ESCAPE, pygame.K_q):
                        running = False
                    elif event.key == pygame.K_r:
                        result = env.reset(domain_name="hopper", task_name=task)
                        total_reward = 0.0
                        step_count = 0
                        print("Environment reset")

            # SPACE is polled as a held key, not as a key-down event, so the
            # force is applied for as long as the key stays pressed.
            keys = pygame.key.get_pressed()
            apply_random_force = keys[pygame.K_SPACE]

            # Generate action based on input
            if apply_random_force:
                action = generate_random_action(action_dim, magnitude=2.0)
            else:
                action = generate_zero_action(action_dim)

            # Step the environment using OpenEnv pattern
            result = env.step(action)

            # Track reward from result (a missing reward counts as 0.0)
            total_reward += result.reward or 0.0
            step_count += 1

            # Check if episode is done
            if result.done:
                print(
                    f"Episode finished! Steps: {step_count}, "
                    f"Total reward: {total_reward:.2f}"
                )
                # Auto-reset on done
                result = env.reset(domain_name="hopper", task_name=task)
                total_reward = 0.0
                step_count = 0

            # Update display
            screen.fill((30, 30, 30))
            status = "FORCE!" if apply_random_force else "idle"
            text = font.render(
                f"Step: {step_count} | Reward: {total_reward:.1f} | {status}",
                True,
                (255, 255, 255),
            )
            screen.blit(text, (10, 40))
            pygame.display.flip()

            # Print progress periodically
            if step_count % 200 == 0 and step_count > 0:
                print(f"Step {step_count}: Total reward: {total_reward:.2f}")

            # Cap at 30 FPS
            clock.tick(30)
    finally:
        # Always shut pygame down, even when stepping the environment raises,
        # so the window does not linger while the error propagates.
        pygame.quit()

    print(f"Session ended. Final reward: {total_reward:.2f}")
def _decode_frame(pixels):
    """Decode base64-encoded image data from an observation into a pygame surface."""
    import base64
    import io

    import pygame

    return pygame.image.load(io.BytesIO(base64.b64decode(pixels)))


def run_visual(env: DMControlEnv, task: str = "hop"):
    """Run keyboard-driven control while displaying frames rendered by the server.

    The environment is reset and stepped with ``render=True``. Each
    observation's ``pixels`` field (expected to be a base64-encoded PNG) is
    decoded and drawn into a pygame window sized to the first frame.

    Controls:
        SPACE (held): send a random action of magnitude 2.0 each frame;
            otherwise a zero action is sent.
        R: reset the environment and the counters.
        ESC, Q or closing the window: quit.

    The environment is reset automatically when an episode reports ``done``.
    The loop is capped at 30 iterations per second.

    Args:
        env: Connected environment client.
        task: Hopper task name passed to ``env.reset``.

    Raises:
        SystemExit: With status 1 if the initial reset returns no pixels.
    """
    import pygame

    print("\n=== Visual Mode (OpenEnv Step/Observation Pattern) ===")

    # Reset environment with rendering enabled
    result = env.reset(domain_name="hopper", task_name=task, render=True)
    print(f"Initial observations: {list(result.observation.observations.keys())}")

    # Get action dimension
    action_dim = get_action_dim(env)
    print(f"Action dimension: {action_dim}")

    # The first frame determines the window size, so it must be present.
    if result.observation.pixels is None:
        print("Error: Server did not return rendered pixels.")
        print("Make sure the server supports render=True")
        print("\nTry running in interactive mode (default) instead.")
        sys.exit(1)

    frame = _decode_frame(result.observation.pixels)
    frame_size = frame.get_size()

    total_reward = 0.0
    step_count = 0

    # Initialize pygame
    pygame.init()
    try:
        screen = pygame.display.set_mode(frame_size)
        pygame.display.set_caption(
            "Hopper (OpenEnv) - SPACE for random force, R to Reset, ESC to Quit"
        )
        clock = pygame.time.Clock()

        print("Controls:")
        print(" SPACE: Apply random force to joints")
        print(" R: Reset environment")
        print(" ESC or Q: Quit")

        running = True
        while running:
            # Handle events
            for event in pygame.event.get():
                if event.type == pygame.QUIT:
                    running = False
                elif event.type == pygame.KEYDOWN:
                    if event.key in (pygame.K_ESCAPE, pygame.K_q):
                        running = False
                    elif event.key == pygame.K_r:
                        result = env.reset(
                            domain_name="hopper", task_name=task, render=True
                        )
                        total_reward = 0.0
                        step_count = 0
                        print("Environment reset")

            # SPACE is polled as a held key, not as a key-down event, so the
            # force is applied for as long as the key stays pressed.
            keys = pygame.key.get_pressed()
            apply_random_force = keys[pygame.K_SPACE]

            # Generate action based on input
            if apply_random_force:
                action = generate_random_action(action_dim, magnitude=2.0)
            else:
                action = generate_zero_action(action_dim)

            # Step the environment using OpenEnv pattern
            result = env.step(action, render=True)

            # Track reward from result (a missing reward counts as 0.0)
            total_reward += result.reward or 0.0
            step_count += 1

            # Check if episode is done
            if result.done:
                print(
                    f"Episode finished! Steps: {step_count}, "
                    f"Total reward: {total_reward:.2f}"
                )
                result = env.reset(domain_name="hopper", task_name=task, render=True)
                total_reward = 0.0
                step_count = 0

            # Draw the latest frame; if none was returned, keep the previous image.
            if result.observation.pixels:
                frame = _decode_frame(result.observation.pixels)
                screen.blit(frame, (0, 0))
                pygame.display.flip()

            # Print progress periodically
            if step_count % 200 == 0 and step_count > 0:
                print(f"Step {step_count}: Total reward: {total_reward:.2f}")

            # Cap at 30 FPS
            clock.tick(30)
    finally:
        # Always shut pygame down, even when stepping or decoding raises,
        # so the window does not linger while the error propagates.
        pygame.quit()

    print(f"Session ended. Final reward: {total_reward:.2f}")
def main():
    """Parse command-line options, connect to the server and run one mode.

    Modes:
        --headless: scripted run without pygame (``run_headless``).
        --visual: pygame window showing server-rendered frames (``run_visual``).
        neither: keyboard-driven status window (``run_interactive``).

    If both ``--headless`` and ``--visual`` are given, headless mode is used.
    A ``ConnectionError`` raised while connecting or running prints a hint on
    how to start the server and exits with status 1.
    """
    parser = argparse.ArgumentParser(
        description="Interactive hopper control via OpenEnv"
    )
    parser.add_argument(
        "--visual",
        action="store_true",
        help="Enable pygame visualization with rendered frames",
    )
    parser.add_argument(
        "--headless",
        action="store_true",
        help="Run in headless mode (no pygame, automated control)",
    )
    parser.add_argument(
        "--max-steps",
        type=int,
        default=1000,
        help="Maximum steps for headless mode (default: 1000)",
    )
    parser.add_argument(
        "--task",
        type=str,
        default="hop",
        choices=["stand", "hop"],
        help="Hopper task (default: hop)",
    )
    # The default matches the address that used to be hard-coded, so existing
    # invocations behave the same.
    parser.add_argument(
        "--url",
        type=str,
        default="http://localhost:8000",
        help="Base URL of the environment server (default: http://localhost:8000)",
    )
    args = parser.parse_args()

    server_url = args.url
    print(f"Connecting to {server_url}...")

    try:
        with DMControlEnv(base_url=server_url) as env:
            print("Connected!")

            # Get environment state
            state = env.state()
            print(f"Domain: {state.domain_name}, Task: {state.task_name}")
            print(f"Action spec: {state.action_spec}")

            # Headless is checked first, so it wins when both flags are set.
            if args.headless:
                run_headless(env, task=args.task, max_steps=args.max_steps)
            elif args.visual:
                run_visual(env, task=args.task)
            else:
                run_interactive(env, task=args.task)
    except ConnectionError as e:
        print(f"Failed to connect: {e}")
        print("\nMake sure the server is running:")
        print(" cd OpenEnv")
        print(
            " PYTHONPATH=src:envs uvicorn envs.dm_control_env.server.app:app --port 8000"
        )
        sys.exit(1)


if __name__ == "__main__":
    main()