import os import time import math import random import numpy as np import gymnasium as gym from gymnasium import spaces import rclpy from rclpy.node import Node from rclpy.qos import QoSProfile, ReliabilityPolicy from geometry_msgs.msg import Twist from sensor_msgs.msg import LaserScan from nav_msgs.msg import Odometry import subprocess # --- STABLE BASELINES3 --- from stable_baselines3 import SAC from stable_baselines3.common.callbacks import CheckpointCallback, EvalCallback from stable_baselines3.common.vec_env import DummyVecEnv, VecFrameStack from stable_baselines3.common.monitor import Monitor # --- GENEL AYARLAR --- ROBOT_NAME = "bot1" WORLD_NAME = "arena" CMD_VEL_TOPIC = f"/{ROBOT_NAME}/cmd_vel" SCAN_TOPIC = f"/{ROBOT_NAME}/scan" # ÖNEMLİ: URDF dosyanızdaki plugin ayarına göre Odometry topic'i ODOM_TOPIC = f"/{ROBOT_NAME}/odometry" ARENA_SIZE = 4.5 class RcCarEnv(gym.Env): def __init__(self): super().__init__() if not rclpy.ok(): rclpy.init() self.node = rclpy.create_node("rl_driver_node") # Simülasyon verisi bazen düşebilir, Best Effort en iyisidir qos = QoSProfile(reliability=ReliabilityPolicy.BEST_EFFORT, depth=10) self.pub_cmd = self.node.create_publisher(Twist, CMD_VEL_TOPIC, 10) self.sub_scan = self.node.create_subscription(LaserScan, SCAN_TOPIC, self.scan_cb, qos) self.sub_odom = self.node.create_subscription(Odometry, ODOM_TOPIC, self.odom_cb, qos) print(f"📡 Bağlantı Başarılı: {ROBOT_NAME} | Topic: {ODOM_TOPIC} | Lidar: 180 Sample") # ===================== # FİZİK & DONANIM # ===================== self.max_speed = 1.0 self.max_steering = 0.6 self.current_steering = 0.0 self.last_action = np.array([0.0, 0.0]) # Action Smoothing için hafıza # ===================== # LIDAR (URDF UYUMLU: A1M8) # ===================== self.n_obs = 180 # URDF 180 self.max_range = 10.0 # URDF Max self.min_range = 0.15 # URDF Min (Kör Nokta) self.scan_data = None self.raw_scan_data = None self.scan_received = False # ===================== # HARİTA & HEDEF # ===================== self.obstacle_names = [f"box_{i}" for i in range(1, 7)] self.box_positions = [] self.target_x = 0.0 self.target_y = 0.0 self.robot_x = 0.0 self.robot_y = 0.0 self.robot_yaw = 0.0 self.prev_distance_to_target = None self.goal_reached = False self.step_count = 0 self.max_steps = 2500 self.episode_count = 0 self.stuck_counter = 0 # ===================== # GYM SPACE # ===================== # Action: [Gaz, Direksiyon] -> [-1, 1] self.action_space = spaces.Box( low=np.array([-1.0, -1.0]), high=np.array([1.0, 1.0]), dtype=np.float32 ) # Observation: Lidar (180) + Hız + Direksiyon + Hedef Mesafesi + Hedef Açısı = 184 self.observation_space = spaces.Box( low=0.0, high=1.0, shape=(self.n_obs + 4,), dtype=np.float32 ) # ===================================================== # CALLBACKS # ===================================================== def odom_cb(self, msg): self.robot_x = msg.pose.pose.position.x self.robot_y = msg.pose.pose.position.y # Quaternion -> Euler (Yaw) Dönüşümü q = msg.pose.pose.orientation siny_cosp = 2 * (q.w * q.z + q.x * q.y) cosy_cosp = 1 - 2 * (q.y * q.y + q.z * q.z) self.robot_yaw = math.atan2(siny_cosp, cosy_cosp) def scan_cb(self, msg): raw = np.array(msg.ranges) # 1. Sonsuz (Inf) ve Hatalı (NaN) değerleri temizle raw = np.where(np.isinf(raw), self.max_range, raw) raw = np.where(np.isnan(raw), self.max_range, raw) self.raw_scan_data = np.clip(raw, 0.0, self.max_range) # 2. Downsampling / Processing # URDF zaten 180 veriyor ama matematiksel güvenlik için chunk hesabı yapıyoruz. chunk = len(self.raw_scan_data) // self.n_obs if chunk > 0: # Min-Pooling: İnce engelleri kaçırmamak için gruptaki en küçük değeri alıyoruz self.scan_data = np.array( [np.min(self.raw_scan_data[i * chunk:(i + 1) * chunk]) for i in range(self.n_obs)], dtype=np.float32 ) else: # Lidar verisi beklenen sayıdan az gelirse (Hata durumu) self.scan_data = np.full(self.n_obs, self.max_range, dtype=np.float32) self.scan_received = True # ===================================================== # YARDIMCI FONKSİYONLAR # ===================================================== def _update_target_marker(self, x, y): # KRİTİK: Z=2.0 -> Hedefi havaya kaldırıyoruz ki Lidar görmesin cmd = [ 'gz', 'service', '-s', f'/world/{WORLD_NAME}/set_pose', '--reqtype', 'gz.msgs.Pose', '--reptype', 'gz.msgs.Boolean', '--timeout', '100', '--req', f'name: "target_marker", position: {{x: {x}, y: {y}, z: 2.0}}' ] subprocess.Popen(cmd, stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL) # Eğer marker yoksa oluştur sdf_xml = f"""{x} {y} 2.0 0 0 0true0.30 1 0 10 1 0 1""" sdf_xml_str = sdf_xml.replace('\n', '').replace(' ', '') subprocess.Popen(['gz', 'service', '-s', f'/world/{WORLD_NAME}/create', '--reqtype', 'gz.msgs.EntityFactory', '--reptype', 'gz.msgs.Boolean', '--req', f'sdf: "{sdf_xml_str}", name: "target_marker"'], stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL) def _is_pos_valid(self, x, y, min_dist_robot=2.0, min_dist_box=1.3): # Hedef, robotun veya engellerin üzerine gelmesin if np.sqrt((x - self.robot_x)**2 + (y - self.robot_y)**2) < min_dist_robot: return False for (bx, by) in self.box_positions: if abs(x - bx) < min_dist_box and abs(y - by) < min_dist_box: return False return True def _randomize_obstacles(self): self.box_positions = [] for name in self.obstacle_names: bx, by = 0, 0 valid = False for _ in range(30): bx = np.random.uniform(-ARENA_SIZE, ARENA_SIZE) by = np.random.uniform(-ARENA_SIZE, ARENA_SIZE) overlap = False for (ex_x, ex_y) in self.box_positions: if np.sqrt((bx - ex_x)**2 + (by - ex_y)**2) < 1.5: overlap = True; break dist_to_robot = np.sqrt((bx - self.robot_x)**2 + (by - self.robot_y)**2) if not overlap and dist_to_robot > 2.5: valid = True; break if valid: self.box_positions.append((bx, by)) cmd = ['gz', 'service', '-s', f'/world/{WORLD_NAME}/set_pose', '--reqtype', 'gz.msgs.Pose', '--reptype', 'gz.msgs.Boolean', '--req', f'name: "{name}", position: {{x: {bx}, y: {by}, z: 0.5}}'] subprocess.Popen(cmd, stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL) time.sleep(0.15) def _set_new_target(self): valid = False; tx, ty = 0, 0 for _ in range(50): tx = np.random.uniform(-ARENA_SIZE + 0.5, ARENA_SIZE - 0.5) ty = np.random.uniform(-ARENA_SIZE + 0.5, ARENA_SIZE - 0.5) if self._is_pos_valid(tx, ty, min_dist_robot=3.0, min_dist_box=1.3): valid = True; break if not valid: tx, ty = 2.0, 2.0 self.target_x = tx; self.target_y = ty self._update_target_marker(tx, ty) print(f"🎯 Yeni Hedef: ({tx:.1f}, {ty:.1f})") # ===================================================== # OBSERVATION (Giriş Verisi Hazırlama) # ===================================================== def _get_observation(self, twist): if self.scan_data is None: lidar_norm = np.ones(self.n_obs, dtype=np.float32) else: lidar_norm = self.scan_data / self.max_range speed_norm = twist.linear.x / self.max_speed steering_norm = (self.current_steering / self.max_steering + 1.0) / 2.0 dist = math.hypot(self.target_x - self.robot_x, self.target_y - self.robot_y) angle = math.atan2(self.target_y - self.robot_y, self.target_x - self.robot_x) - self.robot_yaw while angle > math.pi: angle -= 2 * math.pi while angle < -math.pi: angle += 2 * math.pi dist_norm = np.clip(dist / (ARENA_SIZE * 2), 0.0, 1.0) angle_norm = (angle + math.pi) / (2 * math.pi) obs = np.concatenate([lidar_norm, [speed_norm, steering_norm, dist_norm, angle_norm]]) return obs.astype(np.float32) # ===================================================== # STEP (HAREKET VE ÖDÜL MEKANİZMASI) # ===================================================== def step(self, action): twist = Twist() # Action: [Gaz (-1,1), Direksiyon (-1,1)] throttle = float(action[0]) twist.linear.x = throttle * self.max_speed target_steer = float(action[1]) * self.max_steering # Ackermann Gecikme Simülasyonu self.current_steering = 0.6 * self.current_steering + 0.4 * target_steer twist.angular.z = self.current_steering self.pub_cmd.publish(twist) # Lidar Senkronizasyonu self.scan_received = False start_time = time.time() while not self.scan_received: rclpy.spin_once(self.node, timeout_sec=0.001) if time.time() - start_time > 0.1: break obs = self._get_observation(twist) # --- DÜZELTİLMİŞ REWARD HESAPLAMA --- dist = math.hypot(self.target_x - self.robot_x, self.target_y - self.robot_y) if self.scan_data is not None: min_lidar = np.min(self.scan_data) else: min_lidar = 10.0 self.step_count += 1 reward = 0.0 terminated = False truncated = False if self.prev_distance_to_target is None: self.prev_distance_to_target = dist # 1. Mesafe İlerlemesi (Ana Motivasyon) progress = self.prev_distance_to_target - dist reward += progress * 40.0 self.prev_distance_to_target = dist # 2. HEADING REWARD (DÜZELTME: Sadece Hareket Ediyorsa Ver!) # Robotun "durup puan kasmasını" engelliyoruz. if twist.linear.x > 0.2: goal_angle = math.atan2(self.target_y - self.robot_y, self.target_x - self.robot_x) heading_error = goal_angle - self.robot_yaw while heading_error > math.pi: heading_error -= 2 * math.pi while heading_error < -math.pi: heading_error += 2 * math.pi reward += math.cos(heading_error) * 0.5 # 3. LIVING PENALTY (VAROLUŞ CEZASI - Tembelliği Kırmak İçin) # Her adımda puan kaybetsin ki hedefe varmak için acele etsin. reward -= 0.05 # 4. DURMA VE HIZ CEZALARI if twist.linear.x < 0.1: reward -= 0.1 # Durmak yasak! elif min_lidar > 1.0 and twist.linear.x > 0.8: reward += 0.1 # Önü boşsa bas gaza # 5. ACTION SMOOTHING (Titremeyi Azalt) delta_action = np.abs(self.last_action[1] - action[1]) reward -= delta_action * 0.2 self.last_action = action # 6. GÜVENLİK (Korkuyu Yönetmek) # 0.25m altına inince uyarı cezası if min_lidar < 0.25: reward -= 2.0 # 0.18m altı çarpışma (A1M8'in kör noktasına girmeden öldür) if min_lidar < 0.18: reward -= 50.0 terminated = True print("💥 ÇARPIŞMA") # 7. HEDEF if dist < 0.6: reward += 50.0 + (500.0 / self.step_count) # Hızlı bitiren daha çok kazanır terminated = True self.goal_reached = True print(f"🏆 HEDEF! Adım: {self.step_count}") # 8. SIKIŞMA KONTROLÜ if abs(progress) < 0.002 and abs(twist.linear.x) > 0.2: self.stuck_counter += 1 else: self.stuck_counter = 0 if self.stuck_counter > 100: reward -= 20.0 terminated = True print("💤 SIKIŞTI (Reset)") if self.step_count >= self.max_steps: truncated = True return obs, reward, terminated, truncated, {} # ===================================================== # RESET # ===================================================== def reset(self, seed=None, options=None): super().reset(seed=seed) self.episode_count += 1 self.pub_cmd.publish(Twist()) self.last_action = np.array([0.0, 0.0]) map_changed = False if self.episode_count % 20 == 0: self._randomize_obstacles() map_changed = True rx, ry, ryaw = 0, 0, 0 for _ in range(20): rx = np.random.uniform(-ARENA_SIZE, ARENA_SIZE) ry = np.random.uniform(-ARENA_SIZE, ARENA_SIZE) ryaw = np.random.uniform(-3.14, 3.14) if self._is_pos_valid(rx, ry, min_dist_robot=0, min_dist_box=1.5): break qw = math.cos(ryaw/2); qz = math.sin(ryaw/2) subprocess.run(['gz', 'service', '-s', f'/world/{WORLD_NAME}/set_pose', '--reqtype', 'gz.msgs.Pose', '--reptype', 'gz.msgs.Boolean', '--timeout', '2000', '--req', f'name: "{ROBOT_NAME}", position: {{x: {rx}, y: {ry}, z: 0.06}}, orientation: {{w: {qw}, z: {qz}}}'], capture_output=True) time.sleep(0.05) if (self.target_x == 0 and self.target_y == 0) or self.goal_reached or map_changed: self._set_new_target() self.goal_reached = False self.prev_distance_to_target = math.hypot(self.target_x - self.robot_x, self.target_y - self.robot_y) self.step_count = 0 self.stuck_counter = 0 self.scan_data = None self.scan_received = False wait_steps = 0 while not self.scan_received and wait_steps < 20: rclpy.spin_once(self.node, timeout_sec=0.1) wait_steps += 1 obs = self._get_observation(Twist()) return obs, {} def close(self): self.node.destroy_node() rclpy.shutdown() # ===================================================== # TRAIN (SAC + FrameStack + EvalCallback) # ===================================================== if __name__ == "__main__": log_dir = "./logs_bot1_sac/" checkpoint_dir = "./checkpoints_sac/" best_model_dir = "./best_model_sac/" os.makedirs(log_dir, exist_ok=True) os.makedirs(checkpoint_dir, exist_ok=True) os.makedirs(best_model_dir, exist_ok=True) # 1. Ortamı Kur env = RcCarEnv() env = Monitor(env, filename=os.path.join(log_dir, "monitor_log")) env = DummyVecEnv([lambda: env]) # 2. FrameStack (Hafıza) # 180 Lidar * 4 Frame = Robot hareket yönünü ve ivmesini anlar env = VecFrameStack(env, n_stack=4) # 3. EvalCallback (En İyi Modeli Kaydet) eval_callback = EvalCallback( env, best_model_save_path=best_model_dir, log_path=log_dir, eval_freq=10_000, deterministic=True, render=False, n_eval_episodes=5 ) checkpoint_callback = CheckpointCallback( save_freq=50_000, save_path=checkpoint_dir, name_prefix="bot1_sac" ) print("🚀 EĞİTİM BAŞLIYOR (SAC, 180 Samples, Anti-Lazy Mode)...") # 4. SAC Modeli model = SAC( "MlpPolicy", env, verbose=1, tensorboard_log=log_dir, buffer_size=300_000, learning_rate=3e-4, batch_size=512, ent_coef='auto', gamma=0.99, tau=0.01, train_freq=1, gradient_steps=1, # Ağ yapısı 180 sample veriyi kaldıracak kadar genişletildi policy_kwargs=dict(net_arch=[256, 256]), device="auto" ) try: model.learn( total_timesteps=2_000_000, callback=[checkpoint_callback, eval_callback], progress_bar=True ) model.save("bot1_sac_final") print("✅ EĞİTİM TAMAMLANDI") except KeyboardInterrupt: print("🛑 DURDURULDU") model.save("bot1_sac_interrupted") finally: env.close()