#!/usr/bin/env python3
"""Interactive hopper control via OpenEnv.

This example demonstrates using the dm_control OpenEnv client with
the hopper environment. Press SPACE to apply random forces to the joints.

Controls:
    SPACE: Apply random force to all joints
    R: Reset environment
    ESC or Q: Quit

Requirements:
    pip install pygame

Usage:
    1. Start the server: uvicorn server.app:app --host 0.0.0.0 --port 8000
    2. Run this script: python examples/hopper_control.py

    For visual mode (requires working MuJoCo rendering):
        python examples/hopper_control.py --visual

    For headless mode (no pygame, automated control):
        python examples/hopper_control.py --headless
"""

import argparse
import random
import sys
from pathlib import Path

# Add parent directory to path for imports
sys.path.insert(0, str(Path(__file__).parent.parent))

from client import DMControlEnv
from models import DMControlAction


def get_action_dim(env: DMControlEnv) -> int:
    """Get the action dimension from the environment state.

    Reads ``env.state().action_spec`` and returns the first entry of its
    ``"shape"`` value when that value is a non-empty list or tuple.
    Falls back to 4 otherwise.
    """
    action_spec = env.state().action_spec
    if action_spec and "shape" in action_spec:
        shape = action_spec["shape"]
        # Accept tuples as well as lists so the shape is not silently ignored
        # if the client ever hands back a tuple instead of a JSON list.
        if isinstance(shape, (list, tuple)) and len(shape) > 0:
            return shape[0]
    # Hopper default: 4 actuators
    return 4


def generate_random_action(action_dim: int, magnitude: float = 1.0) -> DMControlAction:
    """Generate a random action with values in [-magnitude, magnitude]."""
    values = [random.uniform(-magnitude, magnitude) for _ in range(action_dim)]
    return DMControlAction(values=values)


def generate_zero_action(action_dim: int) -> DMControlAction:
    """Generate a zero action (no force applied)."""
    return DMControlAction(values=[0.0] * action_dim)


def _poll_controls():
    """Drain pending pygame events and sample the SPACE key.

    Shared by the interactive and visual loops so both react to input the
    same way. pygame is imported here (not at module level) so headless
    mode keeps working without pygame installed.

    Returns:
        A ``(quit_requested, reset_requested, space_held)`` tuple of bools:
        window close / ESC / Q seen, R pressed, and SPACE currently held.
    """
    import pygame

    quit_requested = False
    reset_requested = False
    for event in pygame.event.get():
        if event.type == pygame.QUIT:
            quit_requested = True
        elif event.type == pygame.KEYDOWN:
            if event.key in (pygame.K_ESCAPE, pygame.K_q):
                quit_requested = True
            elif event.key == pygame.K_r:
                reset_requested = True

    # SPACE is sampled as a held key (not an event) so force is applied for
    # as long as the key stays down.
    space_held = bool(pygame.key.get_pressed()[pygame.K_SPACE])
    return quit_requested, reset_requested, space_held


def _decode_frame(pixels):
    """Decode a base64-encoded image payload into a pygame surface."""
    import base64
    import io

    import pygame

    png_data = base64.b64decode(pixels)
    return pygame.image.load(io.BytesIO(png_data))


def run_headless(env: DMControlEnv, task: str = "hop", max_steps: int = 1000):
    """Run hopper control in headless mode.

    Args:
        env: Connected environment client.
        task: Hopper task name passed to ``env.reset``.
        max_steps: Upper bound on the number of steps taken.
    """
    print("\n=== Headless Mode (OpenEnv Step/Observation Pattern) ===")
    print("This mode demonstrates the OpenEnv API with the hopper.\n")

    # Reset environment using OpenEnv pattern
    result = env.reset(domain_name="hopper", task_name=task)
    print(f"Initial observations: {list(result.observation.observations.keys())}")

    # Get action dimension
    action_dim = get_action_dim(env)
    print(f"Action dimension: {action_dim}")

    total_reward = 0.0
    step_count = 0

    print("\nRunning with periodic random forces...")
    print("Every 30 steps, a random force burst is applied.\n")

    while not result.done and step_count < max_steps:
        # Apply random force every 30 steps, otherwise zero action
        if step_count % 30 < 5:
            # Random force burst for 5 steps
            action = generate_random_action(action_dim, magnitude=0.8)
        else:
            # No force
            action = generate_zero_action(action_dim)

        # Step the environment using OpenEnv pattern
        result = env.step(action)

        # The script treats a missing reward as 0.0; normalise once so the
        # progress line below cannot fail formatting a None with ":.3f".
        reward = result.reward or 0.0
        total_reward += reward
        step_count += 1

        # Print progress periodically
        if step_count % 100 == 0:
            # Get some observation values
            observations = result.observation.observations
            position = observations.get("position", [])
            velocity = observations.get("velocity", [])
            print(
                f"Step {step_count}: reward={reward:.3f}, "
                f"total={total_reward:.2f}, done={result.done}"
            )
            if position:
                print(f"  position: {position[:3]}")
            if velocity:
                print(f"  velocity: {velocity[:3]}")

    print(f"\nEpisode finished: {step_count} steps, total reward: {total_reward:.2f}")


def run_interactive(env: DMControlEnv, task: str = "hop"):
    """Run interactive control with keyboard input via pygame.

    Opens a small status window, steps the environment once per frame and
    applies a random action while SPACE is held (zero action otherwise).

    Args:
        env: Connected environment client.
        task: Hopper task name passed to ``env.reset``.
    """
    import pygame

    print("\n=== Interactive Mode (OpenEnv Step/Observation Pattern) ===")
    print("Press SPACE to apply random force, R to reset, ESC to quit.\n")

    # Reset environment using OpenEnv pattern
    result = env.reset(domain_name="hopper", task_name=task)
    print(f"Initial observations: {list(result.observation.observations.keys())}")

    # Get action dimension
    action_dim = get_action_dim(env)
    print(f"Action dimension: {action_dim}")

    total_reward = 0.0
    step_count = 0

    # Initialize pygame for keyboard input (minimal window)
    pygame.init()
    try:
        screen = pygame.display.set_mode((400, 100))
        pygame.display.set_caption(
            "Hopper Control - SPACE for random force, R to reset"
        )
        clock = pygame.time.Clock()

        # Font for display
        font = pygame.font.Font(None, 24)

        print("\nControls:")
        print("  SPACE: Apply random force to joints")
        print("  R: Reset environment")
        print("  ESC or Q: Quit\n")

        running = True
        while running:
            # Handle events and held keys
            quit_requested, reset_requested, apply_random_force = _poll_controls()
            if quit_requested:
                running = False
            if reset_requested:
                result = env.reset(domain_name="hopper", task_name=task)
                total_reward = 0.0
                step_count = 0
                print("Environment reset")

            # Generate action based on input
            if apply_random_force:
                action = generate_random_action(action_dim, magnitude=2.0)
            else:
                action = generate_zero_action(action_dim)

            # Step the environment using OpenEnv pattern
            result = env.step(action)

            # Track reward from result
            total_reward += result.reward or 0.0
            step_count += 1

            # Check if episode is done
            if result.done:
                print(
                    f"Episode finished! Steps: {step_count}, "
                    f"Total reward: {total_reward:.2f}"
                )
                # Auto-reset on done
                result = env.reset(domain_name="hopper", task_name=task)
                total_reward = 0.0
                step_count = 0

            # Update display
            screen.fill((30, 30, 30))
            status = "FORCE!" if apply_random_force else "idle"
            text = font.render(
                f"Step: {step_count} | Reward: {total_reward:.1f} | {status}",
                True,
                (255, 255, 255),
            )
            screen.blit(text, (10, 40))
            pygame.display.flip()

            # Print progress periodically
            if step_count % 200 == 0 and step_count > 0:
                print(f"Step {step_count}: Total reward: {total_reward:.2f}")

            # Cap at 30 FPS
            clock.tick(30)
    finally:
        # Always shut pygame down, even if a client call raised mid-loop.
        pygame.quit()

    print(f"Session ended. Final reward: {total_reward:.2f}")


def run_visual(env: DMControlEnv, task: str = "hop"):
    """Run with pygame visualization showing rendered frames.

    Requests rendered frames from the server (``render=True``) and blits
    each returned frame into a window sized to the first frame. Exits the
    process with status 1 if the initial reset returns no pixels.

    Args:
        env: Connected environment client.
        task: Hopper task name passed to ``env.reset``.
    """
    import pygame

    print("\n=== Visual Mode (OpenEnv Step/Observation Pattern) ===")

    # Reset environment with rendering enabled
    result = env.reset(domain_name="hopper", task_name=task, render=True)
    print(f"Initial observations: {list(result.observation.observations.keys())}")

    # Get action dimension
    action_dim = get_action_dim(env)
    print(f"Action dimension: {action_dim}")

    # Get first frame to determine window size. Truthiness (not "is None")
    # matches the per-frame check in the loop and also rejects an empty
    # payload, which could not be decoded into an image anyway.
    if not result.observation.pixels:
        print("Error: Server did not return rendered pixels.")
        print("Make sure the server supports render=True")
        print("\nTry running in interactive mode (default) instead.")
        sys.exit(1)

    # Decode base64 PNG to pygame surface
    frame = _decode_frame(result.observation.pixels)
    frame_size = frame.get_size()

    total_reward = 0.0
    step_count = 0

    # Initialize pygame
    pygame.init()
    try:
        screen = pygame.display.set_mode(frame_size)
        pygame.display.set_caption(
            "Hopper (OpenEnv) - SPACE for random force, R to Reset, ESC to Quit"
        )
        clock = pygame.time.Clock()

        print("Controls:")
        print("  SPACE: Apply random force to joints")
        print("  R: Reset environment")
        print("  ESC or Q: Quit")

        running = True
        while running:
            # Handle events and held keys
            quit_requested, reset_requested, apply_random_force = _poll_controls()
            if quit_requested:
                running = False
            if reset_requested:
                result = env.reset(
                    domain_name="hopper", task_name=task, render=True
                )
                total_reward = 0.0
                step_count = 0
                print("Environment reset")

            # Generate action based on input
            if apply_random_force:
                action = generate_random_action(action_dim, magnitude=2.0)
            else:
                action = generate_zero_action(action_dim)

            # Step the environment using OpenEnv pattern
            result = env.step(action, render=True)

            # Track reward from result
            total_reward += result.reward or 0.0
            step_count += 1

            # Check if episode is done
            if result.done:
                print(
                    f"Episode finished! Steps: {step_count}, "
                    f"Total reward: {total_reward:.2f}"
                )
                result = env.reset(domain_name="hopper", task_name=task, render=True)
                total_reward = 0.0
                step_count = 0

            # Render the frame from observation pixels
            if result.observation.pixels:
                frame = _decode_frame(result.observation.pixels)
                screen.blit(frame, (0, 0))
                pygame.display.flip()

            # Print progress periodically
            if step_count % 200 == 0 and step_count > 0:
                print(f"Step {step_count}: Total reward: {total_reward:.2f}")

            # Cap at 30 FPS
            clock.tick(30)
    finally:
        # Always shut pygame down, even if a client call raised mid-loop.
        pygame.quit()

    print(f"Session ended. Final reward: {total_reward:.2f}")


def main():
    """Parse command-line options, connect to the server and run a mode.

    ``--headless`` takes precedence over ``--visual``; with neither flag the
    interactive keyboard mode runs. Exits with status 1 when the connection
    raises ``ConnectionError``.
    """
    parser = argparse.ArgumentParser(
        description="Interactive hopper control via OpenEnv"
    )
    parser.add_argument(
        "--visual",
        action="store_true",
        help="Enable pygame visualization with rendered frames",
    )
    parser.add_argument(
        "--headless",
        action="store_true",
        help="Run in headless mode (no pygame, automated control)",
    )
    parser.add_argument(
        "--max-steps",
        type=int,
        default=1000,
        help="Maximum steps for headless mode (default: 1000)",
    )
    parser.add_argument(
        "--task",
        type=str,
        default="hop",
        choices=["stand", "hop"],
        help="Hopper task (default: hop)",
    )
    args = parser.parse_args()

    server_url = "http://localhost:8000"
    print(f"Connecting to {server_url}...")

    try:
        with DMControlEnv(base_url=server_url) as env:
            print("Connected!")

            # Get environment state
            state = env.state()
            print(f"Domain: {state.domain_name}, Task: {state.task_name}")
            print(f"Action spec: {state.action_spec}")

            if args.headless:
                run_headless(env, task=args.task, max_steps=args.max_steps)
            elif args.visual:
                run_visual(env, task=args.task)
            else:
                run_interactive(env, task=args.task)

    except ConnectionError as e:
        print(f"Failed to connect: {e}")
        print("\nMake sure the server is running:")
        print("  cd OpenEnv")
        print(
            "  PYTHONPATH=src:envs uvicorn envs.dm_control_env.server.app:app --port 8000"
        )
        sys.exit(1)


if __name__ == "__main__":
    main()