| | import os |
| | import time |
| | import math |
| | import random |
| | import numpy as np |
| | import gymnasium as gym |
| | from gymnasium import spaces |
| |
|
| | import rclpy |
| | from rclpy.node import Node |
| | from rclpy.qos import QoSProfile, ReliabilityPolicy |
| |
|
| | from geometry_msgs.msg import Twist |
| | from sensor_msgs.msg import LaserScan |
| | from nav_msgs.msg import Odometry |
| |
|
| | import subprocess |
| |
|
| | |
| | from stable_baselines3 import SAC |
| | from stable_baselines3.common.callbacks import CheckpointCallback, EvalCallback |
| | from stable_baselines3.common.vec_env import DummyVecEnv, VecFrameStack |
| | from stable_baselines3.common.monitor import Monitor |
| |
|
| | |
# --- Robot / simulation identifiers ---------------------------------------
ROBOT_NAME = "bot1"    # Gazebo model name of the controlled car
WORLD_NAME = "arena"   # Gazebo world name used by the `gz service` calls

# ROS 2 topics for the robot's actuators and sensors.
CMD_VEL_TOPIC = f"/{ROBOT_NAME}/cmd_vel"
SCAN_TOPIC = f"/{ROBOT_NAME}/scan"

ODOM_TOPIC = f"/{ROBOT_NAME}/odometry"

# Half-extent of the square arena (presumably metres) used when sampling
# random robot / obstacle / target positions — TODO confirm against world SDF.
ARENA_SIZE = 4.5
| |
|
class RcCarEnv(gym.Env):
    """Gymnasium environment driving a simulated RC car in Gazebo via ROS 2.

    Observation: 180 min-pooled lidar ranges plus 4 scalars
    (speed, steering, distance-to-target, angle-to-target), all scaled to [0, 1].
    Action: ``[throttle, steering]``, each in [-1, 1].
    World manipulation (teleporting the robot, obstacles and the target
    marker) is done through ``gz service`` subprocess calls.
    """

    def __init__(self):
        super().__init__()

        # Initialise rclpy lazily so repeated env construction doesn't crash.
        if not rclpy.ok():
            rclpy.init()

        self.node = rclpy.create_node("rl_driver_node")

        # Best-effort QoS: dropping stale sensor messages is preferable to
        # blocking the RL loop on retransmissions.
        qos = QoSProfile(reliability=ReliabilityPolicy.BEST_EFFORT, depth=10)

        self.pub_cmd = self.node.create_publisher(Twist, CMD_VEL_TOPIC, 10)
        self.sub_scan = self.node.create_subscription(LaserScan, SCAN_TOPIC, self.scan_cb, qos)
        self.sub_odom = self.node.create_subscription(Odometry, ODOM_TOPIC, self.odom_cb, qos)

        print(f"📡 Sistem Başlatıldı: {ROBOT_NAME} | Action Repeat: AKTİF | Lidar: 180")

        # Actuation limits and the low-pass-filtered steering state.
        self.max_speed = 1.0        # commanded linear velocity scale
        self.max_steering = 0.6     # commanded angular velocity scale
        self.current_steering = 0.0
        self.last_action = np.array([0.0, 0.0])

        # Each policy action is applied for this many control ticks.
        self.action_repeat = 4

        # Lidar pre-processing parameters.
        self.n_obs = 180            # number of down-sampled lidar bins
        self.max_range = 10.0       # clip / "no return" range
        self.min_range = 0.15       # kept for reference; thresholds below are literal

        self.scan_data = None       # down-sampled (n_obs,) float32 ranges
        self.raw_scan_data = None   # full-resolution sanitised ranges
        self.scan_received = False  # set by scan_cb, polled by step()/reset()

        # World state tracked from odometry / randomisation.
        self.obstacle_names = [f"box_{i}" for i in range(1, 7)]
        self.box_positions = []
        self.target_x = 0.0
        self.target_y = 0.0
        self.robot_x = 0.0
        self.robot_y = 0.0
        self.robot_yaw = 0.0

        self.prev_distance_to_target = None
        self.goal_reached = False
        self.step_count = 0
        self.max_steps = 2500 // self.action_repeat  # episode length in agent steps
        self.episode_count = 0
        self.stuck_counter = 0

        # [throttle, steering] in [-1, 1].
        self.action_space = spaces.Box(
            low=np.array([-1.0, -1.0]), high=np.array([1.0, 1.0]), dtype=np.float32
        )

        # 180 lidar bins + speed + steering + distance + angle, all in [0, 1].
        self.observation_space = spaces.Box(
            low=0.0, high=1.0, shape=(self.n_obs + 4,), dtype=np.float32
        )

    def odom_cb(self, msg):
        """Odometry callback: cache the planar pose (x, y, yaw)."""
        self.robot_x = msg.pose.pose.position.x
        self.robot_y = msg.pose.pose.position.y

        # Quaternion -> yaw extraction for a planar robot.
        q = msg.pose.pose.orientation
        siny_cosp = 2 * (q.w * q.z + q.x * q.y)
        cosy_cosp = 1 - 2 * (q.y * q.y + q.z * q.z)
        self.robot_yaw = math.atan2(siny_cosp, cosy_cosp)

    def scan_cb(self, msg):
        """Laser callback: sanitise ranges and min-pool them into n_obs bins."""
        raw = np.array(msg.ranges)

        # inf/nan readings mean "no return" — treat them as max range.
        raw = np.where(np.isinf(raw), self.max_range, raw)
        raw = np.where(np.isnan(raw), self.max_range, raw)

        self.raw_scan_data = np.clip(raw, 0.0, self.max_range)

        # Min-pooling keeps the closest obstacle per angular chunk, the
        # conservative choice for collision detection.
        chunk = len(self.raw_scan_data) // self.n_obs
        if chunk > 0:
            self.scan_data = np.array(
                [np.min(self.raw_scan_data[i * chunk:(i + 1) * chunk]) for i in range(self.n_obs)],
                dtype=np.float32
            )
        else:
            # Fewer beams than bins (unexpected) — report "all clear".
            self.scan_data = np.full(self.n_obs, self.max_range, dtype=np.float32)

        self.scan_received = True

    def _update_target_marker(self, x, y):
        """Move the green target sphere to (x, y); spawn it if missing.

        Both gz calls are fire-and-forget (Popen + DEVNULL). On the very first
        call set_pose fails because the marker doesn't exist yet but create
        succeeds; afterwards create fails (duplicate name) and set_pose
        succeeds — either way exactly one of the two takes effect.
        """
        cmd = [
            'gz', 'service', '-s', f'/world/{WORLD_NAME}/set_pose',
            '--reqtype', 'gz.msgs.Pose', '--reptype', 'gz.msgs.Boolean',
            '--timeout', '100',
            '--req', f'name: "target_marker", position: {{x: {x}, y: {y}, z: 2.0}}'
        ]
        subprocess.Popen(cmd, stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL)

        sdf_xml = f"""<sdf version='1.8'><model name='target_marker'><pose>{x} {y} 2.0 0 0 0</pose><static>true</static><link name='link'><visual name='visual'><geometry><sphere><radius>0.3</radius></sphere></geometry><material><ambient>0 1 0 1</ambient><diffuse>0 1 0 1</diffuse></material></visual></link></model></sdf>"""
        # BUGFIX: the previous code stripped ALL spaces from the SDF string
        # (`.replace(' ', '')`), which corrupted attribute separators such as
        # `<sdf version=...` -> `<sdfversion=...` and made the create request
        # silently fail (its output is discarded). The f-string is already a
        # single line, so it can be sent as-is.
        subprocess.Popen(['gz', 'service', '-s', f'/world/{WORLD_NAME}/create', '--reqtype', 'gz.msgs.EntityFactory', '--reptype', 'gz.msgs.Boolean', '--req', f'sdf: "{sdf_xml}", name: "target_marker"'], stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL)

    def _is_pos_valid(self, x, y, min_dist_robot=2.0, min_dist_box=1.3):
        """Return True if (x, y) is far enough from the robot and every box."""
        if np.sqrt((x - self.robot_x)**2 + (y - self.robot_y)**2) < min_dist_robot:
            return False
        # Boxes use a Chebyshev-style (per-axis) clearance check.
        for (bx, by) in self.box_positions:
            if abs(x - bx) < min_dist_box and abs(y - by) < min_dist_box:
                return False
        return True

    def _randomize_obstacles(self):
        """Rejection-sample new poses for the six boxes and teleport them."""
        self.box_positions = []
        for name in self.obstacle_names:
            bx, by = 0, 0
            valid = False
            for _ in range(30):
                bx = np.random.uniform(-ARENA_SIZE, ARENA_SIZE)
                by = np.random.uniform(-ARENA_SIZE, ARENA_SIZE)
                overlap = False
                for (ex_x, ex_y) in self.box_positions:
                    if np.sqrt((bx - ex_x)**2 + (by - ex_y)**2) < 1.5:
                        overlap = True
                        break
                dist_to_robot = np.sqrt((bx - self.robot_x)**2 + (by - self.robot_y)**2)
                if not overlap and dist_to_robot > 2.5:
                    valid = True
                    break

            # If no valid pose was found within 30 tries the box is simply
            # left where it is (deliberate best-effort).
            if valid:
                self.box_positions.append((bx, by))

                cmd = ['gz', 'service', '-s', f'/world/{WORLD_NAME}/set_pose', '--reqtype', 'gz.msgs.Pose', '--reptype', 'gz.msgs.Boolean', '--req', f'name: "{name}", position: {{x: {bx}, y: {by}, z: 0.5}}']
                subprocess.Popen(cmd, stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL)
                time.sleep(0.15)

    def _set_new_target(self):
        """Sample a reachable target position and move the marker onto it."""
        valid = False
        tx, ty = 0, 0
        for _ in range(50):
            tx = np.random.uniform(-ARENA_SIZE + 0.5, ARENA_SIZE - 0.5)
            ty = np.random.uniform(-ARENA_SIZE + 0.5, ARENA_SIZE - 0.5)
            if self._is_pos_valid(tx, ty, min_dist_robot=3.0, min_dist_box=1.3):
                valid = True
                break
        if not valid:
            tx, ty = 2.0, 2.0  # fixed fallback target
        self.target_x = tx
        self.target_y = ty
        self._update_target_marker(tx, ty)
        print(f"🎯 Yeni Hedef: ({tx:.1f}, {ty:.1f})")

    def _get_observation(self, twist):
        """Build the normalised observation vector for the given command."""
        if self.scan_data is None:
            lidar_norm = np.ones(self.n_obs, dtype=np.float32)  # "all clear" before first scan
        else:
            lidar_norm = self.scan_data / self.max_range

        speed_norm = twist.linear.x / self.max_speed
        steering_norm = (self.current_steering / self.max_steering + 1.0) / 2.0

        # Distance and relative bearing to the target, wrapped to (-pi, pi].
        dist = math.hypot(self.target_x - self.robot_x, self.target_y - self.robot_y)
        angle = math.atan2(self.target_y - self.robot_y, self.target_x - self.robot_x) - self.robot_yaw
        while angle > math.pi:
            angle -= 2 * math.pi
        while angle < -math.pi:
            angle += 2 * math.pi

        dist_norm = np.clip(dist / (ARENA_SIZE * 2), 0.0, 1.0)
        angle_norm = (angle + math.pi) / (2 * math.pi)

        obs = np.concatenate([lidar_norm, [speed_norm, steering_norm, dist_norm, angle_norm]])
        return obs.astype(np.float32)

    def step(self, action):
        """Apply [throttle, steer] for ``action_repeat`` ticks and score the result."""
        twist = Twist()
        throttle = float(action[0])
        twist.linear.x = throttle * self.max_speed

        # Low-pass filter the steering command to avoid jerky motion.
        target_steer = float(action[1]) * self.max_steering
        self.current_steering = 0.6 * self.current_steering + 0.4 * target_steer
        twist.angular.z = self.current_steering

        # Action repeat: publish the same command several times, waiting for a
        # fresh scan each tick and aborting early on imminent collision.
        total_collision_check = False
        for _ in range(self.action_repeat):
            self.pub_cmd.publish(twist)

            self.scan_received = False
            s_time = time.time()
            while not self.scan_received:
                rclpy.spin_once(self.node, timeout_sec=0.001)
                if time.time() - s_time > 0.1:  # don't stall if lidar is silent
                    break

            if self.scan_data is not None and np.min(self.scan_data) < 0.18:
                total_collision_check = True
                break

        obs = self._get_observation(twist)

        dist = math.hypot(self.target_x - self.robot_x, self.target_y - self.robot_y)
        min_lidar = np.min(self.scan_data) if self.scan_data is not None else 10.0

        self.step_count += 1
        reward = 0.0
        terminated = False
        truncated = False

        if self.prev_distance_to_target is None:
            self.prev_distance_to_target = dist

        # Dense progress reward: distance gained towards the target.
        progress = self.prev_distance_to_target - dist
        reward += progress * 50.0
        self.prev_distance_to_target = dist

        # Heading bonus, but only while actually driving forward.
        if twist.linear.x > 0.2:
            goal_angle = math.atan2(self.target_y - self.robot_y, self.target_x - self.robot_x)
            heading_error = goal_angle - self.robot_yaw
            while heading_error > math.pi:
                heading_error -= 2 * math.pi
            while heading_error < -math.pi:
                heading_error += 2 * math.pi
            reward += math.cos(heading_error) * 0.6

        # Constant per-step time penalty.
        reward -= 0.05

        # Anti-lazy shaping: punish crawling, reward confident open-space speed.
        if twist.linear.x < 0.1:
            reward -= 0.15
        elif min_lidar > 1.5 and twist.linear.x > 0.8:
            reward += 0.2

        # Penalise steering chatter between consecutive actions.
        delta_action = np.abs(self.last_action[1] - action[1])
        reward -= delta_action * 0.2
        # BUGFIX: store a copy — keeping a bare reference would alias the
        # caller's (possibly reused/mutated) action buffer.
        self.last_action = np.array(action, copy=True)

        # Soft proximity penalty before the hard collision threshold.
        if min_lidar < 0.25:
            reward -= 2.0

        if min_lidar < 0.18 or total_collision_check:
            reward -= 50.0
            terminated = True
            print("💥 ÇARPIŞMA")

        if dist < 0.6:
            reward += 100.0
            terminated = True
            self.goal_reached = True
            print(f"🏆 HEDEF! Adım: {self.step_count}")

        # Stuck detection: commanded to move but making no progress.
        if abs(progress) < 0.002 and abs(twist.linear.x) > 0.2:
            self.stuck_counter += 1
        else:
            self.stuck_counter = 0

        if self.stuck_counter > 50:
            reward -= 20.0
            terminated = True
            print("💤 SIKIŞTI")

        if self.step_count >= self.max_steps:
            truncated = True

        return obs, reward, terminated, truncated, {}

    def reset(self, seed=None, options=None):
        """Stop the car, teleport it to a random pose and return the first obs."""
        super().reset(seed=seed)
        self.episode_count += 1
        self.pub_cmd.publish(Twist())  # brake before teleporting
        self.last_action = np.array([0.0, 0.0])

        # Re-randomise the obstacle layout every 20 episodes.
        map_changed = False
        if self.episode_count % 20 == 0:
            self._randomize_obstacles()
            map_changed = True

        # Rejection-sample a spawn pose clear of the boxes (min_dist_robot=0
        # disables the robot-distance check against its own old pose).
        rx, ry, ryaw = 0, 0, 0
        for _ in range(20):
            rx = np.random.uniform(-ARENA_SIZE, ARENA_SIZE)
            ry = np.random.uniform(-ARENA_SIZE, ARENA_SIZE)
            ryaw = np.random.uniform(-3.14, 3.14)
            if self._is_pos_valid(rx, ry, min_dist_robot=0, min_dist_box=1.5):
                break

        # Yaw-only quaternion (rotation about z).
        qw = math.cos(ryaw / 2)
        qz = math.sin(ryaw / 2)
        subprocess.run(['gz', 'service', '-s', f'/world/{WORLD_NAME}/set_pose', '--reqtype', 'gz.msgs.Pose', '--reptype', 'gz.msgs.Boolean', '--timeout', '2000', '--req', f'name: "{ROBOT_NAME}", position: {{x: {rx}, y: {ry}, z: 0.06}}, orientation: {{w: {qw}, z: {qz}}}'], capture_output=True)
        time.sleep(0.05)

        # Keep the target across episodes until it is reached (or the map is
        # rebuilt / first reset), so failed attempts retry the same goal.
        if (self.target_x == 0 and self.target_y == 0) or self.goal_reached or map_changed:
            self._set_new_target()
            self.goal_reached = False

        self.prev_distance_to_target = math.hypot(self.target_x - self.robot_x, self.target_y - self.robot_y)
        self.step_count = 0
        self.stuck_counter = 0

        # Wait for a fresh scan from the new pose (bounded at ~2 s).
        self.scan_data = None
        self.scan_received = False
        wait_steps = 0
        while not self.scan_received and wait_steps < 20:
            rclpy.spin_once(self.node, timeout_sec=0.1)
            wait_steps += 1

        obs = self._get_observation(Twist())
        return obs, {}

    def close(self):
        """Tear down the ROS 2 node and shut down rclpy."""
        self.node.destroy_node()
        rclpy.shutdown()
| |
|
| | |
| | |
| | |
if __name__ == "__main__":
    # Output locations for monitor logs, periodic checkpoints and best models.
    log_dir = "./logs_bot1_pro/"
    checkpoint_dir = "./checkpoints_pro/"
    best_model_dir = "./best_model_pro/"

    for directory in (log_dir, checkpoint_dir, best_model_dir):
        os.makedirs(directory, exist_ok=True)

    # Environment pipeline: raw env -> episode monitor -> single-env
    # vectorisation -> 4-frame observation stacking.
    base_env = RcCarEnv()
    monitored_env = Monitor(base_env, filename=os.path.join(log_dir, "monitor_log"))
    env = VecFrameStack(DummyVecEnv([lambda: monitored_env]), n_stack=4)

    # Periodically evaluate on the (same) env and keep the best snapshot.
    eval_callback = EvalCallback(
        env,
        best_model_save_path=best_model_dir,
        log_path=log_dir,
        n_eval_episodes=5,
        eval_freq=5000,
        deterministic=True,
        render=False,
    )

    # Regular safety checkpoints during training.
    checkpoint_callback = CheckpointCallback(
        save_freq=25000,
        save_path=checkpoint_dir,
        name_prefix="bot1_pro",
    )

    print("🚀 EĞİTİM BAŞLIYOR (Pro Mode: SAC + Action Repeat + Anti-Lazy)...")

    model = SAC(
        "MlpPolicy",
        env,
        verbose=1,
        tensorboard_log=log_dir,
        buffer_size=300_000,
        learning_rate=3e-4,
        batch_size=512,
        ent_coef='auto',
        gamma=0.99,
        tau=0.01,
        train_freq=1,
        gradient_steps=1,
        policy_kwargs=dict(net_arch=[256, 256]),
        device="auto",
    )

    try:
        model.learn(
            total_timesteps=2_000_000,
            callback=[checkpoint_callback, eval_callback],
            progress_bar=True,
        )
        model.save("bot1_pro_final")
        print("✅ EĞİTİM TAMAMLANDI")
    except KeyboardInterrupt:
        # Manual stop: keep the partially trained weights.
        print("🛑 DURDURULDU")
        model.save("bot1_pro_interrupted")
    finally:
        env.close()