# car_env/train_parallel.py
import gymnasium as gym
from gymnasium import spaces
import rclpy
from rclpy.node import Node
from rclpy.qos import QoSProfile, ReliabilityPolicy
from geometry_msgs.msg import Twist
from sensor_msgs.msg import LaserScan
from stable_baselines3 import PPO
from stable_baselines3.common.vec_env import SubprocVecEnv
import numpy as np
import time
import os
import sys
# --- 1. ENVIRONMENT CLASS (embedded here) ---
class RcCarEnv(gym.Env):
    def __init__(self):
        super(RcCarEnv, self).__init__()
        # Each process must initialize its own ROS context
        if not rclpy.ok():
            rclpy.init()
        self.node = rclpy.create_node('rl_parallel_agent')
        # QoS and communication
        qos_best_effort = QoSProfile(reliability=ReliabilityPolicy.BEST_EFFORT, depth=10)
        self.pub_cmd = self.node.create_publisher(Twist, '/cmd_vel', 10)
        self.sub_scan = self.node.create_subscription(LaserScan, '/scan', self.scan_cb, qos_best_effort)
        # Parameters
        self.max_speed = 1.0
        self.max_steering = 0.5
        self.steering_smooth_factor = 0.2
        self.current_steering = 0.0
        self.scan_data = None
        self.scan_received = False
        self.n_obs = 10
        self.max_range = 10.0
        self.turn_history = 0.0
        # Spaces
        self.action_space = spaces.Box(low=np.array([-1.0, -1.0]), high=np.array([1.0, 1.0]), dtype=np.float32)
        self.observation_space = spaces.Box(low=0.0, high=1.0, shape=(self.n_obs,), dtype=np.float32)
        self.step_count = 0
        self.max_steps = 2500
    def scan_cb(self, msg):
        raw = np.array(msg.ranges)
        raw = np.where(np.isinf(raw), self.max_range, raw)
        raw = np.where(np.isnan(raw), self.max_range, raw)
        raw = np.clip(raw, 0.0, self.max_range)
        chunk = len(raw) // self.n_obs
        if chunk > 0:
            self.scan_data = np.array([np.min(raw[i*chunk:(i+1)*chunk]) for i in range(self.n_obs)], dtype=np.float32)
        else:
            self.scan_data = np.full(self.n_obs, self.max_range, dtype=np.float32)
        self.scan_received = True
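    # Illustration of the downsampling above (the beam count is an assumption; it depends
    # on the LiDAR configured in the Gazebo world, 360 is used here only as an example):
    # with len(raw) == 360 and n_obs == 10, chunk == 36, so each observation element is
    # the minimum range inside one 36-beam sector, e.g.
    #   sector_0 = np.min(raw[0:36]), sector_1 = np.min(raw[36:72]), ...
    # Taking the minimum keeps the closest obstacle in each sector visible to the policy.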
    def step(self, action):
        twist = Twist()
        # Action mapping
        throttle_input = (float(action[0]) + 1.0) / 2.0
        twist.linear.x = throttle_input * self.max_speed
        target_steering = float(action[1]) * self.max_steering
        self.current_steering = (1.0 - self.steering_smooth_factor) * self.current_steering + \
                                (self.steering_smooth_factor * target_steering)
        twist.angular.z = self.current_steering
        self.pub_cmd.publish(twist)
        # Sync: wait for a fresh scan before building the observation
        self.scan_received = False
        start_wait = time.time()
        while not self.scan_received:
            rclpy.spin_once(self.node, timeout_sec=0.005)
            if time.time() - start_wait > 0.2: break
        if self.scan_data is not None:
            observation = self.scan_data / self.max_range
        else:
            observation = np.ones(self.n_obs, dtype=np.float32)
        self.step_count += 1
        # --- REWARD FUNCTION ---
        reward = 0.0
        terminated = False
        truncated = False
        real_min_dist = np.min(observation) * self.max_range
        # Speed reward
        speed_reward = twist.linear.x * 2.0
        if abs(self.current_steering) > 0.3:
            speed_reward *= 0.5
        if twist.linear.x > 0.2:
            reward += speed_reward
        else:
            reward -= 0.1
        # Steering cost
        reward -= abs(self.current_steering) * 0.8
        # Wall penalty
        if real_min_dist < 0.8:
            reward -= (0.8 - real_min_dist) * 2.0
        # Crash
        if real_min_dist < 0.25:
            reward = -30.0
            terminated = True
            self.pub_cmd.publish(Twist())
        if self.step_count >= self.max_steps:
            truncated = True
        return observation.astype(np.float32), reward, terminated, truncated, {}
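    # Worked example of the reward shaping above (illustrative numbers, not from the source):
    # full throttle (linear.x = 1.0), steering 0.4, nearest obstacle at 0.5 m:
    #   speed reward  = 1.0 * 2.0 = 2.0, halved to 1.0 because |steering| > 0.3
    #   steering cost = 0.4 * 0.8 = 0.32
    #   wall penalty  = (0.8 - 0.5) * 2.0 = 0.6
    #   total         = 1.0 - 0.32 - 0.6 = 0.08 for this step
    # Driving fast and straight with clear space yields up to +2.0 per step.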
    def reset(self, seed=None, options=None):
        super().reset(seed=seed)
        self.pub_cmd.publish(Twist())
        self.turn_history = 0.0
        # Gazebo reset (parallel-safe)
        # Because os.environ["GZ_PARTITION"] is set per process, each process resets only its own simulation
        cmd = """gz service -s /world/arena/set_pose \
            --reqtype gz.msgs.Pose \
            --reptype gz.msgs.Boolean \
            --timeout 300 \
            --req 'name: "my_rc_car", position: {x: 0, y: 0, z: 0.3}, orientation: {w: 1}' > /dev/null 2>&1"""
        os.system(cmd)
        # Spawn-kill protection: make sure the car did not respawn on top of an obstacle
        self.scan_data = None
        self.scan_received = False
        self.current_steering = 0.0
        max_retries = 20
        for _ in range(max_retries):
            start_wait = time.time()
            self.scan_received = False
            while not self.scan_received:
                rclpy.spin_once(self.node, timeout_sec=0.01)
                if time.time() - start_wait > 0.2: break
            if self.scan_data is not None:
                if np.min(self.scan_data) > 0.5:
                    break
                else:
                    os.system(cmd)  # Teleport again
        if self.scan_data is not None:
            obs = self.scan_data / self.max_range
        else:
            obs = np.ones(self.n_obs, dtype=np.float32)
        self.step_count = 0
        return obs, {}
    def close(self):
        self.pub_cmd.publish(Twist())
        self.node.destroy_node()
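# Optional sanity check (a minimal sketch, not part of the training run): before launching
# several Gazebo instances it can be useful to validate the Gym API of a single environment
# with Stable-Baselines3's env checker, assuming one simulation and a ROS 2 graph are
# already running:
#
#   from stable_baselines3.common.env_checker import check_env
#   check_env(RcCarEnv(), warn=True)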
# --- 2. PARALLEL MANAGER ---
def make_env(rank: int, seed: int = 0):
    def _init():
        # ISOLATE EACH SIMULATION
        unique_id = str(rank + 50)  # Start from 50 to avoid collisions
        # ROS 2 Domain ID (each robot gets its own network)
        os.environ["ROS_DOMAIN_ID"] = unique_id
        # Gazebo partition (each simulation gets its own physics world)
        os.environ["GZ_PARTITION"] = f"sim_{unique_id}"
        env = RcCarEnv()
        env.reset(seed=seed + rank)
        return env
    return _init
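# Debugging sketch (an assumption, not in the original script): when diagnosing environment
# problems it is often easier to run a single environment in-process with DummyVecEnv instead
# of SubprocVecEnv, so exceptions are not hidden inside a worker subprocess:
#
#   from stable_baselines3.common.vec_env import DummyVecEnv
#   debug_env = DummyVecEnv([make_env(0)])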
# --- 3. TRAINING LOOP ---
if __name__ == "__main__":
    # Determine the CPU count
    num_cpu = 4  # Raise to 6 if your machine can handle it
    print(f"🚀 Preparing {num_cpu} parallel environments...")
    print("⚠️ Warning: this consumes a lot of RAM. Close any other Gazebo windows.")
    # Start the environments
    vec_env = SubprocVecEnv([make_env(i) for i in range(num_cpu)])
    # Log directory
    log_dir = "./logs_parallel/"
    os.makedirs(log_dir, exist_ok=True)
    print("🧠 Preparing the model (PPO)...")
    model = PPO(
        "MlpPolicy",
        vec_env,
        verbose=1,
        tensorboard_log=log_dir,
        use_sde=True,
        learning_rate=3e-4,
        batch_size=256,  # Larger batch because rollouts are collected in parallel
        n_steps=2048,
        gamma=0.99
    )
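    # Optional periodic checkpointing (a sketch under the assumption that losing a long run
    # is a concern; the variable name and prefix below are illustrative, not from the
    # original script). Stable-Baselines3 provides CheckpointCallback for this:
    #
    #   from stable_baselines3.common.callbacks import CheckpointCallback
    #   checkpoint_cb = CheckpointCallback(save_freq=50_000, save_path=log_dir,
    #                                      name_prefix="rc_car_ckpt")
    #   # then pass callback=checkpoint_cb to model.learn(...) below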
print("---------------------------------------")
print(f"🏎️ TURBO EĞİTİM BAŞLADI ({num_cpu}x Hız)")
print("---------------------------------------")
try:
# 1 Milyon Adım (Çok hızlı biter)
model.learn(total_timesteps=1_000_000)
model.save("rc_car_parallel_master")
print("✅ Eğitim Tamamlandı ve Kaydedildi!")
except KeyboardInterrupt:
model.save("rc_car_parallel_interrupted")
print("\n⚠️ Eğitim durduruldu. Model kaydedildi.")
finally:
vec_env.close()
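# Rolling out the trained policy (a minimal sketch, assuming a single simulation is running
# and that the model was saved as "rc_car_parallel_master" by the run above):
#
#   from stable_baselines3 import PPO
#   env = RcCarEnv()
#   model = PPO.load("rc_car_parallel_master")
#   obs, _ = env.reset()
#   for _ in range(1000):
#       action, _ = model.predict(obs, deterministic=True)
#       obs, reward, terminated, truncated, _ = env.step(action)
#       if terminated or truncated:
#           obs, _ = env.reset()
#   env.close()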