"""Parallel PPO training for a simulated RC car in Gazebo via ROS 2.

Each SubprocVecEnv worker process runs its own isolated ROS 2 / Gazebo
instance (separated by ROS_DOMAIN_ID and GZ_PARTITION) so several
simulations can be stepped in parallel during training.
"""

import gymnasium as gym
from gymnasium import spaces
import rclpy
from rclpy.node import Node
from rclpy.qos import QoSProfile, ReliabilityPolicy
from geometry_msgs.msg import Twist
from sensor_msgs.msg import LaserScan
from stable_baselines3 import PPO
from stable_baselines3.common.vec_env import SubprocVecEnv
import numpy as np
import time
import os
import sys


# --- 1. ENVIRONMENT CLASS (embedded in this script) ---
class RcCarEnv(gym.Env):
    """Gymnasium environment wrapping one simulated RC car.

    Observation: ``n_obs`` (10) sector-minimum lidar ranges, normalized
    to [0, 1] by ``max_range``.
    Action: ``Box(2,)`` in [-1, 1] -> (throttle, steering target).
    """

    def __init__(self):
        super().__init__()
        # Each worker process must start its own ROS context.
        if not rclpy.ok():
            rclpy.init()
        self.node = rclpy.create_node('rl_parallel_agent')

        # QoS and communication: BEST_EFFORT matches typical sensor publishers.
        qos_best_effort = QoSProfile(reliability=ReliabilityPolicy.BEST_EFFORT, depth=10)
        self.pub_cmd = self.node.create_publisher(Twist, '/cmd_vel', 10)
        self.sub_scan = self.node.create_subscription(LaserScan, '/scan', self.scan_cb, qos_best_effort)

        # Parameters
        self.max_speed = 1.0                # linear speed (m/s) at full throttle
        self.max_steering = 0.5             # steering command at full input
        self.steering_smooth_factor = 0.2   # low-pass factor applied to steering
        self.current_steering = 0.0
        self.scan_data = None               # latest downsampled scan, shape (n_obs,)
        self.scan_received = False
        self.n_obs = 10                     # number of lidar sectors in the observation
        self.max_range = 10.0               # lidar clamp distance (m)
        self.turn_history = 0.0             # NOTE(review): written but never read anywhere

        # Spaces
        self.action_space = spaces.Box(low=np.array([-1.0, -1.0]),
                                       high=np.array([1.0, 1.0]),
                                       dtype=np.float32)
        self.observation_space = spaces.Box(low=0.0, high=1.0,
                                            shape=(self.n_obs,), dtype=np.float32)
        self.step_count = 0
        self.max_steps = 2500               # episode truncation horizon

    def scan_cb(self, msg):
        """Downsample an incoming LaserScan into ``n_obs`` sector minima."""
        raw = np.array(msg.ranges)
        # Sanitize: inf/NaN become max_range, then clamp to [0, max_range].
        raw = np.where(np.isinf(raw), self.max_range, raw)
        raw = np.where(np.isnan(raw), self.max_range, raw)
        raw = np.clip(raw, 0.0, self.max_range)
        chunk = len(raw) // self.n_obs
        if chunk > 0:
            # Take the minimum of each sector: conservative, the closest
            # obstacle in the sector dominates.
            self.scan_data = np.array(
                [np.min(raw[i * chunk:(i + 1) * chunk]) for i in range(self.n_obs)],
                dtype=np.float32)
        else:
            # Degenerate scan shorter than n_obs rays: report "all clear".
            self.scan_data = np.full(self.n_obs, self.max_range, dtype=np.float32)
        self.scan_received = True

    def step(self, action):
        """Apply one action, wait for a fresh scan, and compute the reward."""
        twist = Twist()

        # Action mapping: action[0] in [-1, 1] -> throttle in [0, max_speed].
        throttle_input = (float(action[0]) + 1.0) / 2.0
        twist.linear.x = throttle_input * self.max_speed
        # Steering is low-pass filtered so the policy cannot twitch the wheels.
        target_steering = float(action[1]) * self.max_steering
        self.current_steering = (1.0 - self.steering_smooth_factor) * self.current_steering + \
                                (self.steering_smooth_factor * target_steering)
        twist.angular.z = self.current_steering
        self.pub_cmd.publish(twist)

        # Sync: spin until the subscriber delivers a new scan, or 0.2 s passes.
        self.scan_received = False
        start_wait = time.time()
        while not self.scan_received:
            rclpy.spin_once(self.node, timeout_sec=0.005)
            if time.time() - start_wait > 0.2:
                break

        if self.scan_data is not None:
            observation = self.scan_data / self.max_range
        else:
            # No scan ever received: pretend everything is at max range.
            observation = np.ones(self.n_obs, dtype=np.float32)

        self.step_count += 1

        # --- REWARD FUNCTION ---
        reward = 0.0
        terminated = False
        truncated = False
        real_min_dist = np.min(observation) * self.max_range  # de-normalize (m)

        # Speed reward: encourage driving, halved while turning hard.
        speed_reward = twist.linear.x * 2.0
        if abs(self.current_steering) > 0.3:
            speed_reward *= 0.5
        if twist.linear.x > 0.2:
            reward += speed_reward
        else:
            reward -= 0.1  # small penalty for crawling / stopping

        # Steering cost: discourage constant wiggling.
        reward -= abs(self.current_steering) * 0.8

        # Wall penalty: linear ramp once closer than 0.8 m.
        if real_min_dist < 0.8:
            reward -= (0.8 - real_min_dist) * 2.0

        # Crash: large fixed penalty, stop the car, end the episode.
        if real_min_dist < 0.25:
            reward = -30.0
            terminated = True
            self.pub_cmd.publish(Twist())

        if self.step_count >= self.max_steps:
            truncated = True

        return observation.astype(np.float32), reward, terminated, truncated, {}

    def reset(self, seed=None, options=None):
        """Teleport the car back to the origin and wait for a clean scan."""
        super().reset(seed=seed)
        self.pub_cmd.publish(Twist())  # stop the car before teleporting
        self.turn_history = 0.0

        # Gazebo reset (parallel-safe): because GZ_PARTITION is set per
        # process, each worker resets only its own simulation.
        # NOTE(review): os.system with a shell string — acceptable here since
        # the command is a fixed literal, but subprocess.run would be safer.
        cmd = """gz service -s /world/arena/set_pose \
--reqtype gz.msgs.Pose \
--reptype gz.msgs.Boolean \
--timeout 300 \
--req 'name: "my_rc_car", position: {x: 0, y: 0, z: 0.3}, orientation: {w: 1}' > /dev/null 2>&1"""
        os.system(cmd)

        # Spawn-kill protection: retry the teleport until the lidar confirms
        # the car is not embedded in a wall.
        self.scan_data = None
        self.scan_received = False
        self.current_steering = 0.0
        max_retries = 20
        for _ in range(max_retries):
            start_wait = time.time()
            self.scan_received = False
            while not self.scan_received:
                rclpy.spin_once(self.node, timeout_sec=0.01)
                if time.time() - start_wait > 0.2:
                    break
            if self.scan_data is not None:
                if np.min(self.scan_data) > 0.5:
                    break  # spawned clear of obstacles
                else:
                    os.system(cmd)  # teleport again

        if self.scan_data is not None:
            obs = self.scan_data / self.max_range
        else:
            obs = np.ones(self.n_obs, dtype=np.float32)

        self.step_count = 0
        return obs, {}

    def close(self):
        """Stop the car and tear down this worker's ROS resources."""
        self.pub_cmd.publish(Twist())
        self.node.destroy_node()
        # Fix: the original leaked the per-process ROS context created in
        # __init__; shut it down so worker processes exit cleanly.
        rclpy.try_shutdown()


# --- 2. PARALLEL MANAGER ---
def make_env(rank: int, seed: int = 0):
    """Return a picklable factory that builds one isolated env in a worker."""
    def _init():
        # ISOLATE EACH SIMULATION.
        unique_id = str(rank + 50)  # start at 50 to avoid clashing with defaults
        # ROS 2 domain ID: each robot gets its own network.
        os.environ["ROS_DOMAIN_ID"] = unique_id
        # Gazebo partition: each simulation gets its own physics world.
        os.environ["GZ_PARTITION"] = f"sim_{unique_id}"
        env = RcCarEnv()
        env.reset(seed=seed + rank)
        return env
    return _init


# --- 3. TRAINING LOOP ---
if __name__ == "__main__":
    # Number of parallel workers; raise if the machine can handle it.
    num_cpu = 4
    print(f"🚀 {num_cpu} Adet Paralel Ortam Hazırlanıyor...")
    print("⚠️ Dikkat: Bu işlem RAM tüketir. Diğer Gazebo pencerelerini kapat.")

    # Start the environments (one subprocess each).
    vec_env = SubprocVecEnv([make_env(i) for i in range(num_cpu)])

    # Log directory for TensorBoard.
    log_dir = "./logs_parallel/"
    os.makedirs(log_dir, exist_ok=True)

    print("🧠 Model Hazırlanıyor (PPO)...")
    model = PPO(
        "MlpPolicy",
        vec_env,
        verbose=1,
        tensorboard_log=log_dir,
        use_sde=True,
        learning_rate=3e-4,
        batch_size=256,   # larger batch since rollouts are collected in parallel
        n_steps=2048,
        gamma=0.99
    )

    print("---------------------------------------")
    print(f"🏎️ TURBO EĞİTİM BAŞLADI ({num_cpu}x Hız)")
    print("---------------------------------------")

    try:
        # 1M steps; completes quickly with parallel rollout collection.
        model.learn(total_timesteps=1_000_000)
        model.save("rc_car_parallel_master")
        print("✅ Eğitim Tamamlandı ve Kaydedildi!")
    except KeyboardInterrupt:
        # Save progress on Ctrl-C so the run is not lost.
        model.save("rc_car_parallel_interrupted")
        print("\n⚠️ Eğitim durduruldu. Model kaydedildi.")
    finally:
        vec_env.close()