""" Train a PPO agent with Liquid Neural Network policy on the DroneWindEnv environment. This script uses stable-baselines3 PPO with a Liquid Neural Network feature extractor to train an agent to survive and navigate in the 2D drone environment with wind. The trained model is saved to models/liquid_policy.zip and TensorBoard logs are written to logs/ppo_liquid/. """ import os import sys import argparse from typing import Optional import gymnasium as gym from stable_baselines3 import PPO from stable_baselines3.common.vec_env import DummyVecEnv from stable_baselines3.common.monitor import Monitor # Add project root to path sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__)))) from env.drone_env import DroneWindEnv from models.liquid_policy import LiquidFeatureExtractor def make_env(seed: Optional[int] = None) -> gym.Env: """ Create and wrap a DroneWindEnv instance with Monitor. Args: seed: Optional random seed for the environment Returns: Wrapped Gymnasium environment """ env = DroneWindEnv() env = Monitor(env) if seed is not None: env.reset(seed=seed) return env def make_vec_env(num_envs: int = 4) -> DummyVecEnv: """ Create a vectorized environment with multiple parallel instances. Args: num_envs: Number of parallel environments Returns: Vectorized environment """ def make_vec_env_fn(seed: Optional[int] = None): def _init(): return make_env(seed) return _init vec_env = DummyVecEnv([make_vec_env_fn(seed=i) for i in range(num_envs)]) return vec_env def main(): """Main training function.""" parser = argparse.ArgumentParser(description="Train PPO agent with Liquid NN on DroneWindEnv") parser.add_argument( "--timesteps", type=int, default=100_000, help="Total number of training timesteps (default: 100000)" ) parser.add_argument( "--seed", type=int, default=0, help="Random seed (default: 0)" ) parser.add_argument( "--logdir", type=str, default="logs/ppo_liquid", help="Directory for TensorBoard logs (default: logs/ppo_liquid)" ) parser.add_argument( "--model-path", type=str, default="models/liquid_policy.zip", help="Path to save the trained model (default: models/liquid_policy.zip)" ) parser.add_argument( "--num-envs", type=int, default=4, help="Number of parallel environments (default: 4)" ) parser.add_argument( "--hidden-size", type=int, default=32, help="Hidden size for liquid cell (default: 32)" ) parser.add_argument( "--dt", type=float, default=0.1, help="Time step for liquid cell (default: 0.1)" ) args = parser.parse_args() # Create directories if they don't exist os.makedirs(os.path.dirname(args.model_path), exist_ok=True) os.makedirs(args.logdir, exist_ok=True) print("=" * 60) print("Training PPO Agent with Liquid NN on DroneWindEnv") print("=" * 60) print(f"Total timesteps: {args.timesteps:,}") print(f"Number of parallel environments: {args.num_envs}") print(f"Liquid cell hidden size: {args.hidden_size}") print(f"Liquid cell dt: {args.dt}") print(f"Model will be saved to: {args.model_path}") print(f"TensorBoard logs: {args.logdir}") print("=" * 60) # Create vectorized environment print("Creating vectorized environment...") vec_env = make_vec_env(num_envs=args.num_envs) # Get observation space for feature extractor obs_space = vec_env.observation_space # Configure policy with liquid feature extractor policy_kwargs = dict( features_extractor_class=LiquidFeatureExtractor, features_extractor_kwargs=dict( features_dim=args.hidden_size, hidden_size=args.hidden_size, dt=args.dt, ), net_arch=dict(pi=[64], vf=[64]), # Policy and value heads with 64 hidden units ) # Create PPO agent print("Initializing PPO agent with Liquid NN...") model = PPO( policy="MlpPolicy", env=vec_env, policy_kwargs=policy_kwargs, n_steps=1024, batch_size=64, gamma=0.99, learning_rate=3e-4, gae_lambda=0.95, clip_range=0.2, ent_coef=0.01, verbose=1, tensorboard_log=args.logdir, seed=args.seed, ) # Training with curriculum (commented out for now - use fixed mild wind) # For curriculum learning, you could do: # # # Phase 1: Mild wind (0-30k steps) # if args.timesteps > 30000: # print("Training phase 1: Mild wind (0-30k steps)...") # model.learn(total_timesteps=30000, progress_bar=True) # # # Phase 2: Medium wind (30k-60k steps) # if args.timesteps > 60000: # print("Training phase 2: Medium wind (30k-60k steps)...") # # Would need to recreate env with difficulty=1 # model.learn(total_timesteps=30000, progress_bar=True, reset_num_timesteps=False) # # # Phase 3: Strong wind (60k+ steps) # if args.timesteps > 60000: # print("Training phase 3: Strong wind (60k+ steps)...") # # Would need to recreate env with difficulty=2 # model.learn(total_timesteps=args.timesteps - 60000, progress_bar=True, reset_num_timesteps=False) # else: # model.learn(total_timesteps=args.timesteps - 30000, progress_bar=True, reset_num_timesteps=False) # else: # model.learn(total_timesteps=args.timesteps, progress_bar=True) # For now, train on fixed mild wind print("\nStarting training...") model.learn( total_timesteps=args.timesteps, progress_bar=True ) # Save the model print(f"\nSaving model to {args.model_path}...") model.save(args.model_path) print("\n" + "=" * 60) print("Training completed successfully!") print(f"Model saved to: {args.model_path}") print(f"TensorBoard logs available at: {args.logdir}") print("=" * 60) print("\nTo view training progress, run:") print(f" tensorboard --logdir {args.logdir}") print("\nTo evaluate the model, run:") print(f" python eval/eval_liquid_policy.py --model-path {args.model_path}") if __name__ == "__main__": main()