import os
import time
import math
import random
import numpy as np
import gymnasium as gym
from gymnasium import spaces
import rclpy
from rclpy.node import Node
from rclpy.qos import QoSProfile, ReliabilityPolicy
from geometry_msgs.msg import Twist
from sensor_msgs.msg import LaserScan
from nav_msgs.msg import Odometry
import subprocess

# --- LSTM libraries ---
from sb3_contrib import RecurrentPPO
from stable_baselines3.common.callbacks import CheckpointCallback
from stable_baselines3.common.vec_env import DummyVecEnv
from stable_baselines3.common.monitor import Monitor

# --- General settings ---
ROBOT_NAME = "bot1"
WORLD_NAME = "arena"
CMD_VEL_TOPIC = f"/{ROBOT_NAME}/cmd_vel"
SCAN_TOPIC = f"/{ROBOT_NAME}/scan"
ODOM_TOPIC = f"/{ROBOT_NAME}/odom"
ARENA_SIZE = 4.5


class RcCarEnv(gym.Env):
    """Gymnasium environment that drives a ROS 2 robot toward a target.

    Observation: ``n_obs`` min-pooled, normalized lidar bins plus four
    scalars (speed, steering, distance-to-target, bearing-to-target),
    all scaled to [0, 1].
    Action: Box([-1, 1]^2) -> (throttle, steering command).
    Reward: dense progress toward the target with a heading bonus,
    small steering/time penalties, and terminal bonuses/penalties for
    goal, collision, stall, and timeout.
    """

    def __init__(self):
        super().__init__()
        # rclpy may already be initialized by an outer process.
        if not rclpy.ok():
            rclpy.init()
        self.node = rclpy.create_node("rl_driver_node")

        # BEST_EFFORT matches typical Gazebo/simulated sensor publishers.
        qos = QoSProfile(
            reliability=ReliabilityPolicy.BEST_EFFORT,
            depth=10
        )
        self.pub_cmd = self.node.create_publisher(Twist, CMD_VEL_TOPIC, 10)
        self.sub_scan = self.node.create_subscription(
            LaserScan, SCAN_TOPIC, self.scan_cb, qos
        )
        self.sub_odom = self.node.create_subscription(
            Odometry, ODOM_TOPIC, self.odom_cb, qos
        )
        print(f"📡 Bağlantı Başarılı: {ROBOT_NAME} (Sabit Ceza Modu)")

        # =====================
        # PHYSICS
        # =====================
        self.max_speed = 1.0          # m/s, scales the throttle action
        self.max_steering = 0.6       # rad, scales the steering action
        self.current_steering = 0.0   # low-pass-filtered steering state

        # =====================
        # MAP
        # =====================
        # NOTE(review): obstacle/target poses are never updated here;
        # presumably an external node repositions them — confirm.
        self.obstacle_names = [f"box_{i}" for i in range(1, 11)]
        self.box_positions = []
        self.target_x = 0.0
        self.target_y = 0.0
        self.robot_x = 0.0
        self.robot_y = 0.0
        self.robot_yaw = 0.0
        self.prev_distance_to_target = None
        self.goal_reached = False

        # =====================
        # LIDAR
        # =====================
        self.scan_data = None         # min-pooled scan (n_obs bins)
        self.raw_scan_data = None     # full cleaned scan
        self.scan_received = False
        self.n_obs = 60               # lidar bins in the observation
        self.max_range = 10.0         # meters; used to clip & normalize

        # =====================
        # EPISODE
        # =====================
        self.step_count = 0
        self.max_steps = 2500
        self.episode_count = 0
        self.stuck_counter = 0

        # =====================
        # GYM
        # =====================
        self.action_space = spaces.Box(
            low=np.array([-1.0, -1.0]),
            high=np.array([1.0, 1.0]),
            dtype=np.float32
        )
        self.observation_space = spaces.Box(
            low=0.0,
            high=1.0,
            shape=(self.n_obs + 4,),
            dtype=np.float32
        )

    # =====================================================
    # HELPERS
    # =====================================================
    @staticmethod
    def _wrap_angle(angle):
        """Wrap an angle in radians to (-pi, pi]."""
        while angle > math.pi:
            angle -= 2 * math.pi
        while angle < -math.pi:
            angle += 2 * math.pi
        return angle

    # =====================================================
    # CALLBACKS
    # =====================================================
    def odom_cb(self, msg):
        """Cache planar pose (x, y, yaw) from the odometry message."""
        self.robot_x = msg.pose.pose.position.x
        self.robot_y = msg.pose.pose.position.y
        q = msg.pose.pose.orientation
        # Yaw from quaternion (standard z-rotation extraction).
        siny_cosp = 2 * (q.w * q.z + q.x * q.y)
        cosy_cosp = 1 - 2 * (q.y * q.y + q.z * q.z)
        self.robot_yaw = math.atan2(siny_cosp, cosy_cosp)

    def scan_cb(self, msg):
        """Clean the raw scan and min-pool it into ``n_obs`` bins."""
        raw = np.array(msg.ranges)
        # Replace inf/NaN with max_range, then clip to the sensor range.
        raw = np.where(np.isinf(raw), self.max_range, raw)
        raw = np.where(np.isnan(raw), self.max_range, raw)
        self.raw_scan_data = np.clip(raw, 0.0, self.max_range)

        chunk = len(self.raw_scan_data) // self.n_obs
        if chunk > 0:
            # Min-pool: keep the closest return per angular sector so
            # thin obstacles are never averaged away.
            self.scan_data = np.array(
                [
                    np.min(self.raw_scan_data[i * chunk:(i + 1) * chunk])
                    for i in range(self.n_obs)
                ],
                dtype=np.float32
            )
        else:
            # Scan shorter than n_obs: report "all clear".
            self.scan_data = np.full(self.n_obs, self.max_range, dtype=np.float32)
        self.scan_received = True

    # =====================================================
    # OBSERVATION
    # =====================================================
    def _get_observation(self, twist):
        """Build the (n_obs + 4,) float32 observation vector in [0, 1].

        ``twist`` is the command just published; its linear.x supplies
        the speed component.
        """
        if self.scan_data is None:
            # No scan yet: pretend everything is at max range.
            lidar_norm = np.ones(self.n_obs, dtype=np.float32)
        else:
            lidar_norm = self.scan_data / self.max_range

        speed_norm = twist.linear.x / self.max_speed
        # Map steering from [-max, +max] to [0, 1].
        steering_norm = (self.current_steering / self.max_steering + 1.0) / 2.0

        dist = math.hypot(
            self.target_x - self.robot_x,
            self.target_y - self.robot_y
        )
        angle = self._wrap_angle(
            math.atan2(
                self.target_y - self.robot_y,
                self.target_x - self.robot_x
            ) - self.robot_yaw
        )

        dist_norm = np.clip(dist / (ARENA_SIZE * 2), 0.0, 1.0)
        angle_norm = (angle + math.pi) / (2 * math.pi)

        obs = np.concatenate(
            [lidar_norm, [speed_norm, steering_norm, dist_norm, angle_norm]]
        )
        return obs.astype(np.float32)

    # =====================================================
    # STEP
    # =====================================================
    def step(self, action):
        """Publish the action, wait for a fresh scan, compute reward."""
        twist = Twist()
        # Map action[0] in [-1, 1] to throttle in [0, 1] (no reverse).
        throttle = (float(action[0]) + 1.0) / 2.0
        twist.linear.x = throttle * self.max_speed

        # Low-pass filter the steering command to avoid servo jitter.
        target_steer = float(action[1]) * self.max_steering
        self.current_steering = (
            0.6 * self.current_steering + 0.4 * target_steer
        )
        twist.angular.z = self.current_steering
        self.pub_cmd.publish(twist)

        # --- wait for lidar (at most ~100 ms) ---
        self.scan_received = False
        start_time = time.time()
        while not self.scan_received:
            rclpy.spin_once(self.node, timeout_sec=0.001)
            if time.time() - start_time > 0.1:
                break

        obs = self._get_observation(twist)

        # --- metrics ---
        dist = math.hypot(
            self.target_x - self.robot_x,
            self.target_y - self.robot_y
        )
        angle = self._wrap_angle(
            math.atan2(
                self.target_y - self.robot_y,
                self.target_x - self.robot_x
            ) - self.robot_yaw
        )

        if self.scan_data is not None:
            min_lidar = np.min(self.scan_data)
            # Central quarter of the bins ~ the forward-facing sector.
            front_slice = self.n_obs // 4
            front_start = self.n_obs // 2 - front_slice // 2
            front_end = self.n_obs // 2 + front_slice // 2
            front_min = np.min(self.scan_data[front_start:front_end])
        else:
            # FIX: fall back to the configured sensor range instead of a
            # hard-coded 10.0 duplicate of max_range.
            min_lidar = self.max_range
            front_min = self.max_range

        # --- reward ---
        self.step_count += 1
        reward = 0.0
        terminated = False
        truncated = False

        if self.prev_distance_to_target is None:
            self.prev_distance_to_target = dist

        progress = self.prev_distance_to_target - dist
        # Heading bonus in [0, 1]: full when facing the target.
        heading_bonus = max(0.0, math.cos(angle))

        reward += progress * 25.0 * (1.0 + heading_bonus)
        reward -= abs(self.current_steering) * 0.1   # steering penalty
        reward -= 0.2                                # per-step time cost
        self.prev_distance_to_target = dist

        # goal reached
        if dist < 0.6:
            # Bonus shrinks the longer the episode took.
            time_bonus = max(
                0.0,
                (self.max_steps - self.step_count) / 10.0
            )
            reward += 200.0 + time_bonus
            terminated = True
            print(f"🏆 HEDEF! Bonus: {time_bonus:.1f}")

        # collision
        if min_lidar < 0.25:
            reward -= 100.0
            terminated = True
            print("💥 ÇARPIŞMA")

        # stall: commanded forward but barely moving
        if abs(progress) < 0.005 and twist.linear.x > 0.2:
            self.stuck_counter += 1
        else:
            self.stuck_counter = 0
        if self.stuck_counter > 80:
            reward -= 50.0
            terminated = True
            print("💤 SIKIŞTI")

        # timeout
        if self.step_count >= self.max_steps:
            truncated = True
            reward -= 30.0
            print("⏱️ ZAMAN AŞIMI")

        return obs, reward, terminated, truncated, {}

    # =====================================================
    # RESET
    # =====================================================
    def reset(self, seed=None, options=None):
        """Stop the robot, clear per-episode state, return first obs."""
        super().reset(seed=seed)
        self.episode_count += 1
        self.pub_cmd.publish(Twist())  # zero velocity -> stop

        self.prev_distance_to_target = None
        self.step_count = 0
        self.stuck_counter = 0
        # FIX: reset the filtered steering state too, otherwise the
        # last episode's steering leaks into the new one.
        self.current_steering = 0.0
        self.scan_data = None
        self.scan_received = False

        # Pump callbacks briefly so odom/scan refresh before observing.
        for _ in range(5):
            rclpy.spin_once(self.node, timeout_sec=0.1)

        obs = self._get_observation(Twist())
        return obs, {}

    def close(self):
        """Destroy the node and shut down rclpy (idempotent)."""
        self.node.destroy_node()
        # FIX: shutdown raises if the context is already invalid.
        if rclpy.ok():
            rclpy.shutdown()


# =====================================================
# TRAIN
# =====================================================
if __name__ == "__main__":
    log_dir = "./logs_bot1/"
    checkpoint_dir = "./checkpoints/"
    os.makedirs(log_dir, exist_ok=True)
    os.makedirs(checkpoint_dir, exist_ok=True)

    env = RcCarEnv()
    env = Monitor(env, filename=os.path.join(log_dir, "monitor_log"))
    env = DummyVecEnv([lambda: env])

    checkpoint_callback = CheckpointCallback(
        save_freq=50_000,
        save_path=checkpoint_dir,
        name_prefix="bot1_fixed"
    )

    print("🚀 EĞİTİM BAŞLIYOR...")
    model = RecurrentPPO(
        "MlpLstmPolicy",
        env,
        verbose=1,
        tensorboard_log=log_dir,
        learning_rate=1e-4,
        n_steps=2048,
        batch_size=64,
        gamma=0.99,
        ent_coef=0.01,
        policy_kwargs=dict(
            # FIX: SB3 >= 1.8 expects a dict here; the old
            # [dict(pi=..., vf=...)] list form is deprecated.
            net_arch=dict(pi=[256, 256], vf=[256, 256]),
            lstm_hidden_size=256,
            n_lstm_layers=1
        ),
        device="auto"
    )

    try:
        model.learn(
            total_timesteps=2_000_000,
            callback=checkpoint_callback,
            progress_bar=True
        )
        model.save("bot1_fixed_final")
        print("✅ EĞİTİM TAMAMLANDI")
    except KeyboardInterrupt:
        print("🛑 DURDURULDU")
        model.save("bot1_fixed_interrupted")
    finally:
        env.close()