# car_env/training2.py
# (uploaded via huggingface_hub, commit 1a0d68d verified)
import os
import time
import math
import random
import numpy as np
import gymnasium as gym
from gymnasium import spaces
import rclpy
from rclpy.node import Node
from rclpy.qos import QoSProfile, ReliabilityPolicy
from geometry_msgs.msg import Twist
from sensor_msgs.msg import LaserScan
from nav_msgs.msg import Odometry
import subprocess
# --- LSTM KÜTÜPHANELERİ ---
from sb3_contrib import RecurrentPPO
from stable_baselines3.common.callbacks import CheckpointCallback
from stable_baselines3.common.vec_env import DummyVecEnv
from stable_baselines3.common.monitor import Monitor
# --- GENERAL SETTINGS ---
ROBOT_NAME = "bot1"                       # Gazebo model name / ROS topic namespace of the car
WORLD_NAME = "arena"                      # Gazebo world name used by the `gz service` calls
CMD_VEL_TOPIC = f"/{ROBOT_NAME}/cmd_vel"  # velocity command topic (geometry_msgs/Twist)
SCAN_TOPIC = f"/{ROBOT_NAME}/scan"        # lidar topic (sensor_msgs/LaserScan)
ODOM_TOPIC = f"/{ROBOT_NAME}/odom"        # odometry topic (nav_msgs/Odometry)
ARENA_SIZE = 4.5                          # half-extent of the square arena (meters) — spawn range is [-4.5, 4.5]
class RcCarEnv(gym.Env):
    """Gymnasium environment that drives a Gazebo RC car through ROS 2 topics.

    Observation: 60 down-sampled lidar ranges (each bin is the min of its
    chunk, normalized by ``max_range``) followed by 4 state features:
    signed speed, steering, distance-to-target, and bearing-to-target.
    Action: ``[throttle, steering]`` in ``[-1, 1]`` — negative throttle is
    reverse gear. Obstacle shuffling and robot teleportation are done by
    shelling out to ``gz service`` (best-effort, output discarded).
    """

    def __init__(self):
        super().__init__()
        if not rclpy.ok():
            rclpy.init()
        self.node = rclpy.create_node("rl_driver_node")
        # BEST_EFFORT matches typical Gazebo sensor publishers; RELIABLE
        # subscriptions would silently receive nothing.
        qos = QoSProfile(
            reliability=ReliabilityPolicy.BEST_EFFORT,
            depth=10
        )
        self.pub_cmd = self.node.create_publisher(Twist, CMD_VEL_TOPIC, 10)
        self.sub_scan = self.node.create_subscription(
            LaserScan, SCAN_TOPIC, self.scan_cb, qos
        )
        self.sub_odom = self.node.create_subscription(
            Odometry, ODOM_TOPIC, self.odom_cb, qos
        )
        print(f"📡 Bağlantı Başarılı: {ROBOT_NAME} (Revize Edilmiş Mod)")
        # =====================
        # PHYSICS
        # =====================
        self.max_speed = 1.0          # m/s limit, applied symmetrically (reverse allowed)
        self.max_steering = 0.6       # rad/s angular velocity limit
        self.current_steering = 0.0   # low-pass-filtered steering state
        # =====================
        # MAP
        # =====================
        self.obstacle_names = [f"box_{i}" for i in range(1, 7)]
        self.box_positions = []       # (x, y) of successfully placed boxes
        self.target_x = 0.0
        self.target_y = 0.0
        self.robot_x = 0.0
        self.robot_y = 0.0
        self.robot_yaw = 0.0
        self.prev_distance_to_target = None  # lazily initialized in step()
        self.goal_reached = False
        # =====================
        # LIDAR
        # =====================
        self.scan_data = None        # down-sampled scan (n_obs bins)
        self.raw_scan_data = None    # full-resolution scan
        self.scan_received = False
        self.n_obs = 60
        self.max_range = 10.0        # meters; inf/nan readings are clamped here
        # =====================
        # EPISODE
        # =====================
        self.step_count = 0
        self.max_steps = 2500
        self.episode_count = 0
        self.stuck_counter = 0
        # =====================
        # GYM SPACES
        # =====================
        self.action_space = spaces.Box(
            low=np.array([-1.0, -1.0]),
            high=np.array([1.0, 1.0]),
            dtype=np.float32
        )
        # BUGFIX: speed_norm (observation index n_obs) spans [-1, 1] because
        # reverse throttle is allowed, so its lower bound must be -1. The
        # previous uniform low=0.0 declared a Box that real observations fell
        # outside of. Shape and upper bound are unchanged.
        obs_low = np.concatenate(
            [np.zeros(self.n_obs), [-1.0, 0.0, 0.0, 0.0]]
        ).astype(np.float32)
        obs_high = np.ones(self.n_obs + 4, dtype=np.float32)
        self.observation_space = spaces.Box(
            low=obs_low,
            high=obs_high,
            dtype=np.float32
        )

    # =====================================================
    # CALLBACKS
    # =====================================================
    def odom_cb(self, msg):
        """Cache the robot pose; yaw is extracted from the quaternion."""
        self.robot_x = msg.pose.pose.position.x
        self.robot_y = msg.pose.pose.position.y
        q = msg.pose.pose.orientation
        # Standard quaternion -> yaw conversion (Z rotation only).
        siny_cosp = 2 * (q.w * q.z + q.x * q.y)
        cosy_cosp = 1 - 2 * (q.y * q.y + q.z * q.z)
        self.robot_yaw = math.atan2(siny_cosp, cosy_cosp)

    def scan_cb(self, msg):
        """Sanitize the scan and down-sample it to ``n_obs`` min-pooled bins."""
        raw = np.array(msg.ranges)
        raw = np.where(np.isinf(raw), self.max_range, raw)
        raw = np.where(np.isnan(raw), self.max_range, raw)
        self.raw_scan_data = np.clip(raw, 0.0, self.max_range)
        chunk = len(self.raw_scan_data) // self.n_obs
        if chunk > 0:
            # Min-pooling keeps the nearest obstacle in each sector visible.
            self.scan_data = np.array(
                [
                    np.min(self.raw_scan_data[i * chunk:(i + 1) * chunk])
                    for i in range(self.n_obs)
                ],
                dtype=np.float32
            )
        else:
            # Scan shorter than n_obs: report everything as max range.
            self.scan_data = np.full(self.n_obs, self.max_range, dtype=np.float32)
        self.scan_received = True

    # =====================================================
    # HELPERS
    # =====================================================
    def _update_target_marker(self, x, y):
        """Move (and best-effort create) the red target sphere in Gazebo."""
        # Silent, fast, fire-and-forget marker move.
        cmd = [
            'gz', 'service', '-s', f'/world/{WORLD_NAME}/set_pose',
            '--reqtype', 'gz.msgs.Pose', '--reptype', 'gz.msgs.Boolean',
            '--timeout', '100',  # short timeout; we don't wait on the reply
            '--req', f'name: "target_marker", position: {{x: {x}, y: {y}, z: 0.3}}'
        ]
        subprocess.Popen(cmd, stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL)
        # Initial creation — harmless if the marker already exists.
        sdf_xml = f"""<sdf version='1.8'><model name='target_marker'><pose>{x} {y} 0.3 0 0 0</pose><static>true</static><link name='link'><visual name='visual'><geometry><sphere><radius>0.3</radius></sphere></geometry><material><ambient>1 0 0 1</ambient><diffuse>1 0 0 1</diffuse></material></visual></link></model></sdf>"""
        subprocess.Popen(['gz', 'service', '-s', f'/world/{WORLD_NAME}/create', '--reqtype', 'gz.msgs.EntityFactory', '--reptype', 'gz.msgs.Boolean', '--req', f'sdf: "{sdf_xml}", name: "target_marker"'], stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL)

    def _is_pos_valid(self, x, y, min_dist_robot=2.0, min_dist_box=1.3):
        """Return True if (x, y) is far enough from the robot and every box.

        Robot clearance is Euclidean; box clearance is a per-axis (Chebyshev)
        check, matching the boxes' square footprint.
        """
        if np.sqrt((x - self.robot_x)**2 + (y - self.robot_y)**2) < min_dist_robot:
            return False
        for (bx, by) in self.box_positions:
            if abs(x - bx) < min_dist_box and abs(y - by) < min_dist_box:
                return False
        return True

    def _randomize_obstacles(self):
        """Scatter the six boxes to new non-overlapping poses via gz service."""
        self.box_positions = []
        print(f"📦 Harita Karıştırılıyor...")
        for name in self.obstacle_names:
            bx, by = 0, 0
            valid = False
            # Rejection sampling: up to 30 tries per box.
            for _ in range(30):
                bx = np.random.uniform(-ARENA_SIZE, ARENA_SIZE)
                by = np.random.uniform(-ARENA_SIZE, ARENA_SIZE)
                overlap = False
                for (ex_x, ex_y) in self.box_positions:
                    if np.sqrt((bx - ex_x)**2 + (by - ex_y)**2) < 1.5:
                        overlap = True
                        break
                dist_to_robot = np.sqrt((bx - self.robot_x)**2 + (by - self.robot_y)**2)
                if not overlap and dist_to_robot > 2.5:
                    valid = True
                    break
            if valid:
                self.box_positions.append((bx, by))
                cmd = [
                    'gz', 'service', '-s', f'/world/{WORLD_NAME}/set_pose',
                    '--reqtype', 'gz.msgs.Pose', '--reptype', 'gz.msgs.Boolean',
                    '--req', f'name: "{name}", position: {{x: {bx}, y: {by}, z: 0.5}}'
                ]
                subprocess.Popen(cmd, stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL)
                time.sleep(0.15)  # brief pause so Gazebo keeps up with the moves

    def _set_new_target(self):
        """Pick a random reachable target (fallback: (2, 2)) and show the marker."""
        valid = False
        tx, ty = 0, 0
        for _ in range(50):
            tx = np.random.uniform(-ARENA_SIZE + 0.5, ARENA_SIZE - 0.5)
            ty = np.random.uniform(-ARENA_SIZE + 0.5, ARENA_SIZE - 0.5)
            if self._is_pos_valid(tx, ty, min_dist_robot=3.0, min_dist_box=1.3):
                valid = True
                break
        if not valid:
            tx, ty = 2.0, 2.0
        self.target_x = tx
        self.target_y = ty
        self._update_target_marker(tx, ty)
        print(f"🎯 Yeni Hedef: ({tx:.1f}, {ty:.1f})")

    # =====================================================
    # OBSERVATION
    # =====================================================
    def _get_observation(self, twist):
        """Build the normalized (n_obs + 4,) float32 observation vector."""
        if self.scan_data is None:
            # No scan received yet: pretend everything is at max range.
            lidar_norm = np.ones(self.n_obs, dtype=np.float32)
        else:
            lidar_norm = self.scan_data / self.max_range
        # Speed keeps its sign ([-1, 1]) so the network can tell reverse
        # from forward; the other features are mapped into [0, 1].
        speed_norm = twist.linear.x / self.max_speed
        steering_norm = (self.current_steering / self.max_steering + 1.0) / 2.0
        dist = math.hypot(self.target_x - self.robot_x, self.target_y - self.robot_y)
        angle = math.atan2(self.target_y - self.robot_y, self.target_x - self.robot_x) - self.robot_yaw
        # Wrap the relative bearing into (-pi, pi].
        while angle > math.pi:
            angle -= 2 * math.pi
        while angle < -math.pi:
            angle += 2 * math.pi
        dist_norm = np.clip(dist / (ARENA_SIZE * 2), 0.0, 1.0)
        angle_norm = (angle + math.pi) / (2 * math.pi)
        obs = np.concatenate([lidar_norm, [speed_norm, steering_norm, dist_norm, angle_norm]])
        return obs.astype(np.float32)

    # =====================================================
    # STEP
    # =====================================================
    def step(self, action):
        """Apply one [throttle, steering] action and return the Gymnasium 5-tuple."""
        twist = Twist()
        # Throttle uses the full [-1, 1] range directly — reverse gear enabled.
        throttle = float(action[0])
        twist.linear.x = throttle * self.max_speed
        # Low-pass filter the steering command (60% old / 40% new) to avoid
        # twitchy direction flips.
        target_steer = float(action[1]) * self.max_steering
        self.current_steering = 0.6 * self.current_steering + 0.4 * target_steer
        twist.angular.z = self.current_steering
        self.pub_cmd.publish(twist)
        # Wait for a fresh lidar frame, but never block longer than 100 ms.
        self.scan_received = False
        start_time = time.time()
        while not self.scan_received:
            rclpy.spin_once(self.node, timeout_sec=0.001)
            if time.time() - start_time > 0.1:
                break
        obs = self._get_observation(twist)
        # --- Metrics ---
        dist = math.hypot(self.target_x - self.robot_x, self.target_y - self.robot_y)
        if self.scan_data is not None:
            min_lidar = np.min(self.scan_data)
            # Examine the frontal quarter of the scan, centered on the middle
            # bin (assumes the scan's middle index points forward — TODO
            # confirm against the lidar mounting/frame).
            front_slice = self.n_obs // 4
            front_start = self.n_obs // 2 - front_slice // 2
            front_end = self.n_obs // 2 + front_slice // 2
            if front_end > self.n_obs:
                front_end = self.n_obs
            front_min = np.min(self.scan_data[front_start:front_end])
        else:
            min_lidar = 10.0
            front_min = 10.0
        # --- Reward shaping ---
        self.step_count += 1
        reward = 0.0
        terminated = False
        truncated = False
        if self.prev_distance_to_target is None:
            self.prev_distance_to_target = dist
        # 1) Progress toward the target (heading bonus removed; coefficient
        #    raised 25 -> 35 to compensate).
        progress = self.prev_distance_to_target - dist
        reward += progress * 35.0
        self.prev_distance_to_target = dist
        # 2) Speed / reverse-gear management (only judged when the path
        #    ahead is clear, so reversing out of trouble is never punished).
        speed_reward = 0.0
        if front_min > 1.5:
            # Reward fast forward driving on open road.
            if twist.linear.x > 0.7:
                speed_reward += 0.5
            # Small penalty for reversing without need.
            if twist.linear.x < 0:
                speed_reward -= 0.1 * abs(twist.linear.x)
        reward += speed_reward
        # 3) Obstacle avoidance.
        safety_reward = 0.0
        if min_lidar < 0.30:
            safety_reward -= 5.0
        elif min_lidar < 0.50:
            safety_reward -= 1.5
        elif min_lidar > 1.0:
            safety_reward += 0.1  # small incentive for keeping clearance
        reward += safety_reward
        # 4) Steering penalty (kept small so cornering is not discouraged).
        reward -= abs(self.current_steering) * 0.05
        # 5) Time penalty (pushes the agent to be more aggressive).
        reward -= 0.05
        # 6) Goal reached: big bonus plus a bonus for finishing early.
        if dist < 0.6:
            time_bonus = max(0.0, (self.max_steps - self.step_count) / 10.0)
            reward += 200.0 + time_bonus
            terminated = True
            self.goal_reached = True
            print(f"🏆 HEDEF! Adım: {self.step_count}, Puan: {reward:.1f}")
        # 7) Collision (threshold includes some tolerance).
        if min_lidar < 0.22:
            reward -= 100.0
            terminated = True
            print("💥 ÇARPIŞMA")
        # 8) Stuck detection — reverse-gear aware: commanding motion in
        #    either direction without actual progress counts as stuck.
        if abs(progress) < 0.005 and abs(twist.linear.x) > 0.2:
            self.stuck_counter += 1
        else:
            self.stuck_counter = 0
        if self.stuck_counter > 100:  # generous limit so a reverse maneuver can succeed
            reward -= 50.0
            terminated = True
            print("💤 SIKIŞTI (Reset)")
        # 9) Episode timeout (truncation, not termination).
        if self.step_count >= self.max_steps:
            truncated = True
            print("⏱️ ZAMAN AŞIMI")
        return obs, reward, terminated, truncated, {}

    # =====================================================
    # RESET
    # =====================================================
    def reset(self, seed=None, options=None):
        """Stop the car, optionally reshuffle the map, teleport the robot, and refresh the target."""
        super().reset(seed=seed)
        self.episode_count += 1
        self.pub_cmd.publish(Twist())  # stop the car
        # A. Reshuffle the obstacle layout every 20 episodes.
        map_changed = False
        if self.episode_count % 20 == 0:
            self._randomize_obstacles()
            map_changed = True
        # B. Teleport the robot to a random pose clear of the boxes.
        rx, ry, ryaw = 0, 0, 0
        for _ in range(20):
            rx = np.random.uniform(-ARENA_SIZE, ARENA_SIZE)
            ry = np.random.uniform(-ARENA_SIZE, ARENA_SIZE)
            ryaw = np.random.uniform(-3.14, 3.14)
            if self._is_pos_valid(rx, ry, min_dist_robot=0, min_dist_box=1.5):
                break
        qw = math.cos(ryaw / 2)
        qz = math.sin(ryaw / 2)
        subprocess.run(
            ['gz', 'service', '-s', f'/world/{WORLD_NAME}/set_pose',
             '--reqtype', 'gz.msgs.Pose', '--reptype', 'gz.msgs.Boolean',
             '--timeout', '2000',
             '--req', f'name: "{ROBOT_NAME}", position: {{x: {rx}, y: {ry}, z: 0.06}}, orientation: {{w: {qw}, z: {qz}}}'],
            capture_output=True
        )
        time.sleep(0.05)
        # Spin until odometry reflects the teleported pose (max 10 tries).
        for _ in range(10):
            rclpy.spin_once(self.node, timeout_sec=0.05)
            if math.hypot(self.robot_x - rx, self.robot_y - ry) < 1.0:
                break
        # C. Refresh the target on first use, after a success, or after a map change.
        if (self.target_x == 0 and self.target_y == 0) or self.goal_reached or map_changed:
            self._set_new_target()
        self.goal_reached = False
        self.prev_distance_to_target = math.hypot(self.target_x - self.robot_x, self.target_y - self.robot_y)
        self.step_count = 0
        self.stuck_counter = 0
        self.scan_data = None
        self.scan_received = False
        # Pump the executor a few times so fresh scan/odom data arrives.
        for _ in range(3):
            rclpy.spin_once(self.node, timeout_sec=0.1)
        obs = self._get_observation(Twist())
        return obs, {}

    def close(self):
        """Tear down the ROS node and shut down rclpy."""
        self.node.destroy_node()
        rclpy.shutdown()
# =====================================================
# TRAIN
# =====================================================
if __name__ == "__main__":
    # Output locations for monitor logs, TensorBoard, and checkpoints.
    log_dir = "./logs_bot1_revised/"
    checkpoint_dir = "./checkpoints_revised/"
    for directory in (log_dir, checkpoint_dir):
        os.makedirs(directory, exist_ok=True)

    # Build the environment stack: raw env -> Monitor -> single-env vec wrapper.
    raw_env = RcCarEnv()
    monitored_env = Monitor(raw_env, filename=os.path.join(log_dir, "monitor_log"))
    env = DummyVecEnv([lambda: monitored_env])

    # Periodic checkpointing so long runs can be resumed after a crash.
    checkpoint_callback = CheckpointCallback(
        save_freq=50_000,
        save_path=checkpoint_dir,
        name_prefix="bot1_rev"
    )

    print("🚀 EĞİTİM BAŞLIYOR (V2 - Geri Vites Aktif)...")
    # Recurrent PPO with an LSTM policy so the agent can keep short-term
    # memory across partially observable lidar frames.
    model = RecurrentPPO(
        "MlpLstmPolicy",
        env,
        verbose=1,
        tensorboard_log=log_dir,
        learning_rate=1e-4,
        n_steps=2048,
        batch_size=64,
        gamma=0.99,
        ent_coef=0.01,
        policy_kwargs=dict(
            net_arch=[dict(pi=[256, 256], vf=[256, 256])],
            lstm_hidden_size=256,
            n_lstm_layers=1
        ),
        device="auto"
    )

    try:
        model.learn(
            total_timesteps=2_000_000,
            callback=checkpoint_callback,
            progress_bar=True
        )
        model.save("bot1_revised_final")
        print("✅ EĞİTİM TAMAMLANDI")
    except KeyboardInterrupt:
        # Ctrl-C: keep whatever progress was made.
        print("🛑 DURDURULDU")
        model.save("bot1_revised_interrupted")
    finally:
        # Always release the ROS node, even on failure.
        env.close()