# car_env/training_pro.py
import os
import time
import math
import random
import numpy as np
import gymnasium as gym
from gymnasium import spaces
import rclpy
from rclpy.node import Node
from rclpy.qos import QoSProfile, ReliabilityPolicy
from geometry_msgs.msg import Twist
from sensor_msgs.msg import LaserScan
from nav_msgs.msg import Odometry
import subprocess
# --- STABLE BASELINES3 ---
from stable_baselines3 import SAC
from stable_baselines3.common.callbacks import CheckpointCallback, EvalCallback
from stable_baselines3.common.vec_env import DummyVecEnv, VecFrameStack
from stable_baselines3.common.monitor import Monitor
# --- GENERAL SETTINGS ---
ROBOT_NAME = "bot1"
WORLD_NAME = "arena"
CMD_VEL_TOPIC = f"/{ROBOT_NAME}/cmd_vel"
SCAN_TOPIC = f"/{ROBOT_NAME}/scan"
# URDF compatibility: odometry topic name matches the URDF
ODOM_TOPIC = f"/{ROBOT_NAME}/odometry"
ARENA_SIZE = 4.5
class RcCarEnv(gym.Env):
    def __init__(self):
        super().__init__()
        if not rclpy.ok():
            rclpy.init()
        self.node = rclpy.create_node("rl_driver_node")
        # QoS: Best Effort for the sensor streams (matches typical simulator sensor QoS)
        qos = QoSProfile(reliability=ReliabilityPolicy.BEST_EFFORT, depth=10)
        self.pub_cmd = self.node.create_publisher(Twist, CMD_VEL_TOPIC, 10)
        self.sub_scan = self.node.create_subscription(LaserScan, SCAN_TOPIC, self.scan_cb, qos)
        self.sub_odom = self.node.create_subscription(Odometry, ODOM_TOPIC, self.odom_cb, qos)
        print(f"📡 System started: {ROBOT_NAME} | Action repeat: ON | Lidar: 180")
        # =====================
        # PHYSICS & HARDWARE
        # =====================
        self.max_speed = 1.0
        self.max_steering = 0.6
        self.current_steering = 0.0
        self.last_action = np.array([0.0, 0.0])
        # PAPER TECHNIQUE: action repeat (maintain the decision).
        # The robot holds each decision for 4 simulation steps,
        # which commits it for about 0.1 s and suppresses jitter.
        self.action_repeat = 4
        # =====================
        # LIDAR (A1M8, MATCHING THE URDF)
        # =====================
        self.n_obs = 180  # URDF <samples>180</samples>
        self.max_range = 10.0
        self.min_range = 0.15  # blind spot
        self.scan_data = None
        self.raw_scan_data = None
        self.scan_received = False
        # =====================
        # MAP & TARGET
        # =====================
        self.obstacle_names = [f"box_{i}" for i in range(1, 7)]
        self.box_positions = []
        self.target_x = 0.0
        self.target_y = 0.0
        self.robot_x = 0.0
        self.robot_y = 0.0
        self.robot_yaw = 0.0
        self.prev_distance_to_target = None
        self.goal_reached = False
        self.step_count = 0
        self.max_steps = 2500 // self.action_repeat  # step budget divided by the action repeat
        self.episode_count = 0
        self.stuck_counter = 0
        # =====================
        # GYM SPACES
        # =====================
        self.action_space = spaces.Box(
            low=np.array([-1.0, -1.0]), high=np.array([1.0, 1.0]), dtype=np.float32
        )
        # Obs: lidar (180) + speed (1) + steering (1) + target distance (1) + target angle (1) = 184
        self.observation_space = spaces.Box(
            low=0.0, high=1.0, shape=(self.n_obs + 4,), dtype=np.float32
        )
    # =====================================================
    # CALLBACKS
    # =====================================================
    def odom_cb(self, msg):
        self.robot_x = msg.pose.pose.position.x
        self.robot_y = msg.pose.pose.position.y
        q = msg.pose.pose.orientation
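        # Standard quaternion-to-yaw (ZYX Euler) extraction:
        # yaw = atan2(2(wz + xy), 1 - 2(y^2 + z^2))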
        siny_cosp = 2 * (q.w * q.z + q.x * q.y)
        cosy_cosp = 1 - 2 * (q.y * q.y + q.z * q.z)
        self.robot_yaw = math.atan2(siny_cosp, cosy_cosp)
    def scan_cb(self, msg):
        raw = np.array(msg.ranges)
        # Clean up invalid readings: treat inf/NaN returns as max range
        raw = np.where(np.isinf(raw), self.max_range, raw)
        raw = np.where(np.isnan(raw), self.max_range, raw)
        self.raw_scan_data = np.clip(raw, 0.0, self.max_range)
        # Downsampling (min-pooling)
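        # Min-pooling is the conservative choice here: each sector reports its
        # nearest return, so a thin obstacle is never averaged away.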
        chunk = len(self.raw_scan_data) // self.n_obs
        if chunk > 0:
            self.scan_data = np.array(
                [np.min(self.raw_scan_data[i * chunk:(i + 1) * chunk]) for i in range(self.n_obs)],
                dtype=np.float32
            )
        else:
            self.scan_data = np.full(self.n_obs, self.max_range, dtype=np.float32)
        self.scan_received = True
    # =====================================================
    # HELPER FUNCTIONS
    # =====================================================
    def _update_target_marker(self, x, y):
        # TRICK: keep the target at z = 2.0 m so the lidar cannot see it.
        # set_pose moves the marker if it already exists; the create call
        # below spawns it on first use (Gazebo rejects duplicate names).
        cmd = [
            'gz', 'service', '-s', f'/world/{WORLD_NAME}/set_pose',
            '--reqtype', 'gz.msgs.Pose', '--reptype', 'gz.msgs.Boolean',
            '--timeout', '100',
            '--req', f'name: "target_marker", position: {{x: {x}, y: {y}, z: 2.0}}'
        ]
        subprocess.Popen(cmd, stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL)
        sdf_xml = f"""<sdf version='1.8'><model name='target_marker'><pose>{x} {y} 2.0 0 0 0</pose><static>true</static><link name='link'><visual name='visual'><geometry><sphere><radius>0.3</radius></sphere></geometry><material><ambient>0 1 0 1</ambient><diffuse>0 1 0 1</diffuse></material></visual></link></model></sdf>"""
        # Collapse whitespace runs to single spaces (stripping ALL spaces would corrupt the SDF)
        sdf_xml_str = " ".join(sdf_xml.split())
        subprocess.Popen(
            ['gz', 'service', '-s', f'/world/{WORLD_NAME}/create',
             '--reqtype', 'gz.msgs.EntityFactory', '--reptype', 'gz.msgs.Boolean',
             '--req', f'sdf: "{sdf_xml_str}", name: "target_marker"'],
            stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL
        )
    def _is_pos_valid(self, x, y, min_dist_robot=2.0, min_dist_box=1.3):
        if np.sqrt((x - self.robot_x)**2 + (y - self.robot_y)**2) < min_dist_robot:
            return False
        for (bx, by) in self.box_positions:
            if abs(x - bx) < min_dist_box and abs(y - by) < min_dist_box:
                return False
        return True
    def _randomize_obstacles(self):
        self.box_positions = []
        for name in self.obstacle_names:
            bx, by = 0, 0
            valid = False
            for _ in range(30):
                bx = np.random.uniform(-ARENA_SIZE, ARENA_SIZE)
                by = np.random.uniform(-ARENA_SIZE, ARENA_SIZE)
                overlap = False
                for (ex_x, ex_y) in self.box_positions:
                    if np.sqrt((bx - ex_x)**2 + (by - ex_y)**2) < 1.5:
                        overlap = True
                        break
                dist_to_robot = np.sqrt((bx - self.robot_x)**2 + (by - self.robot_y)**2)
                if not overlap and dist_to_robot > 2.5:
                    valid = True
                    break
            if valid:
                self.box_positions.append((bx, by))
                # The boxes stay on the ground (z = 0.5)
                cmd = ['gz', 'service', '-s', f'/world/{WORLD_NAME}/set_pose',
                       '--reqtype', 'gz.msgs.Pose', '--reptype', 'gz.msgs.Boolean',
                       '--req', f'name: "{name}", position: {{x: {bx}, y: {by}, z: 0.5}}']
                subprocess.Popen(cmd, stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL)
                time.sleep(0.15)
    def _set_new_target(self):
        valid = False
        tx, ty = 0, 0
        for _ in range(50):
            tx = np.random.uniform(-ARENA_SIZE + 0.5, ARENA_SIZE - 0.5)
            ty = np.random.uniform(-ARENA_SIZE + 0.5, ARENA_SIZE - 0.5)
            if self._is_pos_valid(tx, ty, min_dist_robot=3.0, min_dist_box=1.3):
                valid = True
                break
        if not valid:
            tx, ty = 2.0, 2.0
        self.target_x = tx
        self.target_y = ty
        self._update_target_marker(tx, ty)
        print(f"🎯 New target: ({tx:.1f}, {ty:.1f})")
    def _get_observation(self, twist):
        if self.scan_data is None:
            lidar_norm = np.ones(self.n_obs, dtype=np.float32)
        else:
            lidar_norm = self.scan_data / self.max_range
        # Map speed from [-max_speed, max_speed] into [0, 1] so the
        # observation respects the Box(low=0, high=1) bounds
        speed_norm = (twist.linear.x / self.max_speed + 1.0) / 2.0
        steering_norm = (self.current_steering / self.max_steering + 1.0) / 2.0
        dist = math.hypot(self.target_x - self.robot_x, self.target_y - self.robot_y)
        angle = math.atan2(self.target_y - self.robot_y, self.target_x - self.robot_x) - self.robot_yaw
        while angle > math.pi:
            angle -= 2 * math.pi
        while angle < -math.pi:
            angle += 2 * math.pi
        dist_norm = np.clip(dist / (ARENA_SIZE * 2), 0.0, 1.0)
        angle_norm = (angle + math.pi) / (2 * math.pi)
        # [180 lidar + 4 metrics] = 184-dimensional vector
        obs = np.concatenate([lidar_norm, [speed_norm, steering_norm, dist_norm, angle_norm]])
        return obs.astype(np.float32)
    # =====================================================
    # STEP (MOTION, REPEAT, REWARD)
    # =====================================================
    def step(self, action):
        twist = Twist()
        throttle = float(action[0])
        twist.linear.x = throttle * self.max_speed
        target_steer = float(action[1]) * self.max_steering
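        # First-order low-pass (exponential moving average) on the steering
        # command: limits the slew rate so one action cannot snap the wheels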
        self.current_steering = 0.6 * self.current_steering + 0.4 * target_steer
        twist.angular.z = self.current_steering
        # --- PAPER TECHNIQUE: ACTION REPEAT ---
        # Apply the decision 4 times (takes roughly 0.1-0.2 s).
        # Break out of the loop early if a collision occurs meanwhile.
        total_collision_check = False
        for _ in range(self.action_repeat):
            self.pub_cmd.publish(twist)
            # Data synchronization: wait for a fresh scan
            self.scan_received = False
            s_time = time.time()
            while not self.scan_received:
                rclpy.spin_once(self.node, timeout_sec=0.001)
                if time.time() - s_time > 0.1:
                    break
            # Stop if the robot hits something during the repeat
            if self.scan_data is not None and np.min(self.scan_data) < 0.18:
                total_collision_check = True
                break
        obs = self._get_observation(twist)
        # --- REWARD COMPUTATION (ANTI-LAZY) ---
        dist = math.hypot(self.target_x - self.robot_x, self.target_y - self.robot_y)
        if self.scan_data is not None:
            min_lidar = np.min(self.scan_data)
        else:
            min_lidar = 10.0
        self.step_count += 1
        reward = 0.0
        terminated = False
        truncated = False
        if self.prev_distance_to_target is None:
            self.prev_distance_to_target = dist
        # 1. Progress toward the target
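        # This is potential-based shaping: the reward is the per-step decrease
        # in distance-to-goal, so it telescopes to (start distance - final
        # distance) over an episode regardless of the path taken.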
        progress = self.prev_distance_to_target - dist
        reward += progress * 50.0
        self.prev_distance_to_target = dist
        # 2. HEADING (only while moving)
        if twist.linear.x > 0.2:
            goal_angle = math.atan2(self.target_y - self.robot_y, self.target_x - self.robot_x)
            heading_error = goal_angle - self.robot_yaw
            while heading_error > math.pi:
                heading_error -= 2 * math.pi
            while heading_error < -math.pi:
                heading_error += 2 * math.pi
            reward += math.cos(heading_error) * 0.6  # coefficient slightly increased
        # 3. LIVING PENALTY (laziness tax)
        # Charged at every decision point (once per 4 frames)
        reward -= 0.05
        # 4. SPEED AND STOPPING
        if twist.linear.x < 0.1:
            reward -= 0.15  # standing still costs more
        elif min_lidar > 1.5 and twist.linear.x > 0.8:
            reward += 0.2
        # 5. ACTION SMOOTHING
        delta_action = np.abs(self.last_action[1] - action[1])
        reward -= delta_action * 0.2
        self.last_action = action
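        # The smoothing penalty complements the low-pass filter above: the
        # filter smooths what reaches the actuator, while the penalty pushes
        # the policy itself toward smooth steering outputs.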
        # 6. SAFETY
        if min_lidar < 0.25:
            reward -= 2.0
        # Collision (during or at the end of the action repeat)
        if min_lidar < 0.18 or total_collision_check:
            reward -= 50.0
            terminated = True
            print("💥 COLLISION")
        # 7. TARGET
        if dist < 0.6:
            reward += 100.0
            terminated = True
            self.goal_reached = True
            print(f"🏆 TARGET REACHED! Step: {self.step_count}")
        # 8. STUCK DETECTION
        if abs(progress) < 0.002 and abs(twist.linear.x) > 0.2:
            self.stuck_counter += 1
        else:
            self.stuck_counter = 0
        if self.stuck_counter > 50:  # threshold lowered because of the action repeat
            reward -= 20.0
            terminated = True
            print("💤 STUCK")
        if self.step_count >= self.max_steps:
            truncated = True
        return obs, reward, terminated, truncated, {}
    def reset(self, seed=None, options=None):
        super().reset(seed=seed)
        self.episode_count += 1
        self.pub_cmd.publish(Twist())
        self.last_action = np.array([0.0, 0.0])
        map_changed = False
        if self.episode_count % 20 == 0:
            self._randomize_obstacles()
            map_changed = True
        rx, ry, ryaw = 0, 0, 0
        for _ in range(20):
            rx = np.random.uniform(-ARENA_SIZE, ARENA_SIZE)
            ry = np.random.uniform(-ARENA_SIZE, ARENA_SIZE)
            ryaw = np.random.uniform(-3.14, 3.14)
            if self._is_pos_valid(rx, ry, min_dist_robot=0, min_dist_box=1.5):
                break
        qw = math.cos(ryaw / 2)
        qz = math.sin(ryaw / 2)
        subprocess.run(
            ['gz', 'service', '-s', f'/world/{WORLD_NAME}/set_pose',
             '--reqtype', 'gz.msgs.Pose', '--reptype', 'gz.msgs.Boolean',
             '--timeout', '2000',
             '--req', f'name: "{ROBOT_NAME}", position: {{x: {rx}, y: {ry}, z: 0.06}}, '
                      f'orientation: {{w: {qw}, z: {qz}}}'],
            capture_output=True
        )
        time.sleep(0.05)
        if (self.target_x == 0 and self.target_y == 0) or self.goal_reached or map_changed:
            self._set_new_target()
        self.goal_reached = False
        self.prev_distance_to_target = math.hypot(self.target_x - self.robot_x, self.target_y - self.robot_y)
        self.step_count = 0
        self.stuck_counter = 0
        self.scan_data = None
        self.scan_received = False
        wait_steps = 0
        while not self.scan_received and wait_steps < 20:
            rclpy.spin_once(self.node, timeout_sec=0.1)
            wait_steps += 1
        obs = self._get_observation(Twist())
        return obs, {}
    def close(self):
        self.node.destroy_node()
        rclpy.shutdown()
# =====================================================
# MAIN (TRAINING)
# =====================================================
if __name__ == "__main__":
    log_dir = "./logs_bot1_pro/"
    checkpoint_dir = "./checkpoints_pro/"
    best_model_dir = "./best_model_pro/"
    os.makedirs(log_dir, exist_ok=True)
    os.makedirs(checkpoint_dir, exist_ok=True)
    os.makedirs(best_model_dir, exist_ok=True)
    # 1. Build the environment
    env = RcCarEnv()
    env = Monitor(env, filename=os.path.join(log_dir, "monitor_log"))
    env = DummyVecEnv([lambda: env])
    # 2. FrameStack (memory)
    # Action repeat already exists, but frame stacking still helps the agent sense acceleration
    env = VecFrameStack(env, n_stack=4)
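    # With n_stack=4 and a 184-dimensional observation, VecFrameStack
    # concatenates the last four frames into a 736-dimensional input
    # for the MLP policy below.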
    # 3. EvalCallback (save the best model)
    eval_callback = EvalCallback(
        env,
        best_model_save_path=best_model_dir,
        log_path=log_dir,
        eval_freq=5000,  # evaluate more often because of the action repeat
        deterministic=True,
        render=False,
        n_eval_episodes=5
    )
    checkpoint_callback = CheckpointCallback(
        save_freq=25000,
        save_path=checkpoint_dir,
        name_prefix="bot1_pro"
    )
print("🚀 EĞİTİM BAŞLIYOR (Pro Mode: SAC + Action Repeat + Anti-Lazy)...")
# 4. SAC Modeli
    model = SAC(
        "MlpPolicy",
        env,
        verbose=1,
        tensorboard_log=log_dir,
        buffer_size=300_000,
        learning_rate=3e-4,
        batch_size=512,
        ent_coef='auto',
        gamma=0.99,
        tau=0.01,
        train_freq=1,
        gradient_steps=1,
        # Widened network architecture
        policy_kwargs=dict(net_arch=[256, 256]),
        device="auto"
    )
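    # Note: train_freq=1 with gradient_steps=1 means SAC performs one gradient
    # update per environment step (per decision, i.e. every 4 simulated frames
    # given the action repeat).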
    try:
        model.learn(
            total_timesteps=2_000_000,
            callback=[checkpoint_callback, eval_callback],
            progress_bar=True
        )
        model.save("bot1_pro_final")
        print("✅ TRAINING COMPLETE")
    except KeyboardInterrupt:
        print("🛑 INTERRUPTED")
        model.save("bot1_pro_interrupted")
    finally:
        env.close()