# car_env/training.py
# (Hugging Face upload header: Hajorda's upload via huggingface_hub, commit 1a0d68d verified)
import os
import time
import math
import random
import numpy as np
import gymnasium as gym
from gymnasium import spaces
import rclpy
from rclpy.node import Node
from rclpy.qos import QoSProfile, ReliabilityPolicy
from geometry_msgs.msg import Twist
from sensor_msgs.msg import LaserScan
from nav_msgs.msg import Odometry
import subprocess
# --- LSTM KÜTÜPHANELERİ ---
from sb3_contrib import RecurrentPPO
from stable_baselines3.common.callbacks import CheckpointCallback
from stable_baselines3.common.vec_env import DummyVecEnv
from stable_baselines3.common.monitor import Monitor
# --- GENERAL SETTINGS ---
ROBOT_NAME = "bot1"  # namespace prefix for all ROS topics below
WORLD_NAME = "arena"  # not referenced in this file; presumably the sim world name — verify against launch files
CMD_VEL_TOPIC = f"/{ROBOT_NAME}/cmd_vel"  # velocity command topic (published)
SCAN_TOPIC = f"/{ROBOT_NAME}/scan"  # LaserScan topic (subscribed)
ODOM_TOPIC = f"/{ROBOT_NAME}/odom"  # Odometry topic (subscribed)
ARENA_SIZE = 4.5  # used to normalize the distance-to-target observation (dist / (ARENA_SIZE * 2))
class RcCarEnv(gym.Env):
    """Gymnasium environment that drives a ROS 2 robot toward a target.

    Publishes Twist commands on CMD_VEL_TOPIC and consumes LaserScan /
    Odometry messages.  The observation is ``n_obs`` min-pooled lidar bins
    plus four scalars (speed, steering, distance-to-target, bearing-to-target),
    all normalized to [0, 1].  The action is ``[throttle, steering]`` in
    ``[-1, 1]``; throttle maps to ``[0, max_speed]`` (forward only).
    """

    def __init__(self):
        super().__init__()
        # rclpy may already be initialized by an outer script; init only once.
        if not rclpy.ok():
            rclpy.init()
        self.node = rclpy.create_node("rl_driver_node")
        # BEST_EFFORT matches the typical sensor-data QoS of sim lidar drivers.
        qos = QoSProfile(
            reliability=ReliabilityPolicy.BEST_EFFORT,
            depth=10
        )
        self.pub_cmd = self.node.create_publisher(Twist, CMD_VEL_TOPIC, 10)
        self.sub_scan = self.node.create_subscription(
            LaserScan, SCAN_TOPIC, self.scan_cb, qos
        )
        self.sub_odom = self.node.create_subscription(
            Odometry, ODOM_TOPIC, self.odom_cb, qos
        )
        print(f"📡 Bağlantı Başarılı: {ROBOT_NAME} (Sabit Ceza Modu)")

        # =====================
        # PHYSICS
        # =====================
        self.max_speed = 1.0          # m/s; throttle action maps into [0, max_speed]
        self.max_steering = 0.6       # max |angular.z| command
        self.current_steering = 0.0   # low-pass-filtered steering state

        # =====================
        # MAP / TARGET
        # =====================
        self.obstacle_names = [f"box_{i}" for i in range(1, 11)]
        self.box_positions = []
        self.target_x = 0.0
        self.target_y = 0.0
        self.robot_x = 0.0
        self.robot_y = 0.0
        self.robot_yaw = 0.0
        self.prev_distance_to_target = None  # None until the first step of an episode
        self.goal_reached = False

        # =====================
        # LIDAR
        # =====================
        self.scan_data = None         # downsampled (n_obs,) min-pooled ranges
        self.raw_scan_data = None     # sanitized full-resolution scan
        self.scan_received = False
        self.n_obs = 60               # number of lidar bins in the observation
        self.max_range = 10.0         # meters; inf/NaN readings are clamped here

        # =====================
        # EPISODE
        # =====================
        self.step_count = 0
        self.max_steps = 2500
        self.episode_count = 0
        self.stuck_counter = 0

        # =====================
        # GYM SPACES
        # =====================
        self.action_space = spaces.Box(
            low=np.array([-1.0, -1.0]),
            high=np.array([1.0, 1.0]),
            dtype=np.float32
        )
        self.observation_space = spaces.Box(
            low=0.0,
            high=1.0,
            shape=(self.n_obs + 4,),
            dtype=np.float32
        )

    # =====================================================
    # CALLBACKS
    # =====================================================
    def odom_cb(self, msg):
        """Cache the planar pose; yaw is extracted from the quaternion."""
        self.robot_x = msg.pose.pose.position.x
        self.robot_y = msg.pose.pose.position.y
        q = msg.pose.pose.orientation
        # Standard quaternion -> yaw (Z) extraction.
        siny_cosp = 2 * (q.w * q.z + q.x * q.y)
        cosy_cosp = 1 - 2 * (q.y * q.y + q.z * q.z)
        self.robot_yaw = math.atan2(siny_cosp, cosy_cosp)

    def scan_cb(self, msg):
        """Sanitize the scan and min-pool it into ``n_obs`` equal sectors."""
        raw = np.array(msg.ranges)
        # Replace inf/NaN returns with max_range before clipping.
        raw = np.where(np.isinf(raw), self.max_range, raw)
        raw = np.where(np.isnan(raw), self.max_range, raw)
        self.raw_scan_data = np.clip(raw, 0.0, self.max_range)
        chunk = len(self.raw_scan_data) // self.n_obs
        if chunk > 0:
            # Min over each chunk keeps the closest obstacle in that sector.
            self.scan_data = np.array(
                [
                    np.min(self.raw_scan_data[i * chunk:(i + 1) * chunk])
                    for i in range(self.n_obs)
                ],
                dtype=np.float32
            )
        else:
            # Fewer rays than bins: report "all clear" rather than crash.
            self.scan_data = np.full(self.n_obs, self.max_range, dtype=np.float32)
        self.scan_received = True

    # =====================================================
    # GEOMETRY HELPER
    # =====================================================
    def _distance_and_bearing(self):
        """Return (distance, bearing) from robot to target.

        Bearing is relative to the robot's heading, wrapped to [-pi, pi]
        (signed shortest turn).  Extracted so step() and _get_observation()
        share one implementation instead of duplicating the wrap loop.
        """
        dist = math.hypot(
            self.target_x - self.robot_x,
            self.target_y - self.robot_y
        )
        angle = math.atan2(
            self.target_y - self.robot_y,
            self.target_x - self.robot_x
        ) - self.robot_yaw
        # atan2(sin, cos) wraps into [-pi, pi] without an explicit loop.
        angle = math.atan2(math.sin(angle), math.cos(angle))
        return dist, angle

    # =====================================================
    # OBSERVATION
    # =====================================================
    def _get_observation(self, twist):
        """Build the (n_obs + 4,) observation vector, all components in [0, 1].

        ``twist`` is the Twist just commanded; its linear.x becomes the
        normalized speed term.
        """
        if self.scan_data is None:
            # No scan yet: pretend maximum clearance everywhere.
            lidar_norm = np.ones(self.n_obs, dtype=np.float32)
        else:
            lidar_norm = self.scan_data / self.max_range
        speed_norm = twist.linear.x / self.max_speed
        # Map steering from [-max_steering, max_steering] to [0, 1].
        steering_norm = (self.current_steering / self.max_steering + 1.0) / 2.0
        dist, angle = self._distance_and_bearing()
        dist_norm = np.clip(dist / (ARENA_SIZE * 2), 0.0, 1.0)
        angle_norm = (angle + math.pi) / (2 * math.pi)
        obs = np.concatenate(
            [lidar_norm, [speed_norm, steering_norm, dist_norm, angle_norm]]
        )
        return obs.astype(np.float32)

    # =====================================================
    # STEP
    # =====================================================
    def step(self, action):
        """Apply ``[throttle, steering]`` in [-1, 1]^2; return the Gymnasium 5-tuple."""
        twist = Twist()
        throttle = (float(action[0]) + 1.0) / 2.0  # map [-1, 1] -> [0, 1]
        twist.linear.x = throttle * self.max_speed
        target_steer = float(action[1]) * self.max_steering
        # First-order low-pass filter smooths steering jitter between steps.
        self.current_steering = (
            0.6 * self.current_steering +
            0.4 * target_steer
        )
        twist.angular.z = self.current_steering
        self.pub_cmd.publish(twist)

        # --- wait (at most 0.1 s) for a fresh lidar scan ---
        self.scan_received = False
        start_time = time.time()
        while not self.scan_received:
            rclpy.spin_once(self.node, timeout_sec=0.001)
            if time.time() - start_time > 0.1:
                break

        obs = self._get_observation(twist)

        # --- metrics ---
        dist, angle = self._distance_and_bearing()
        if self.scan_data is not None:
            min_lidar = np.min(self.scan_data)
            # Forward cone: the middle quarter of the beams.
            front_slice = self.n_obs // 4
            front_start = self.n_obs // 2 - front_slice // 2
            # BUGFIX: end at front_start + front_slice so the cone really spans
            # front_slice beams (the old //2 arithmetic dropped one beam).
            front_min = np.min(
                self.scan_data[front_start:front_start + front_slice]
            )
        else:
            # BUGFIX: use max_range instead of the hardcoded 10.0 so the
            # "no scan" fallback stays consistent with the configured range.
            min_lidar = self.max_range
            front_min = self.max_range

        # --- reward ---
        self.step_count += 1
        reward = 0.0
        terminated = False
        truncated = False
        if self.prev_distance_to_target is None:
            self.prev_distance_to_target = dist
        progress = self.prev_distance_to_target - dist
        # Progress toward the goal is scaled up when heading straight at it.
        heading_bonus = max(0.0, math.cos(angle))
        reward += progress * 25.0 * (1.0 + heading_bonus)
        reward -= abs(self.current_steering) * 0.1  # discourage constant swerving
        reward -= 0.2                               # per-step time pressure
        self.prev_distance_to_target = dist

        # Goal reached: large bonus plus a reward for finishing early.
        if dist < 0.6:
            time_bonus = max(
                0.0,
                (self.max_steps - self.step_count) / 10.0
            )
            reward += 200.0 + time_bonus
            terminated = True
            self.goal_reached = True
            print(f"🏆 HEDEF! Bonus: {time_bonus:.1f}")

        # Collision: anything closer than 25 cm in any direction.
        if min_lidar < 0.25:
            reward -= 100.0
            terminated = True
            print("💥 ÇARPIŞMA")

        # Stuck detection: commanding speed but making no progress.
        if abs(progress) < 0.005 and twist.linear.x > 0.2:
            self.stuck_counter += 1
        else:
            self.stuck_counter = 0
        if self.stuck_counter > 80:
            reward -= 50.0
            terminated = True
            print("💤 SIKIŞTI")

        # Episode length cap.
        if self.step_count >= self.max_steps:
            truncated = True
            reward -= 30.0
            print("⏱️ ZAMAN AŞIMI")

        return obs, reward, terminated, truncated, {}

    # =====================================================
    # RESET
    # =====================================================
    def reset(self, seed=None, options=None):
        """Stop the robot, clear per-episode state, return the first observation."""
        super().reset(seed=seed)
        self.episode_count += 1
        self.pub_cmd.publish(Twist())  # zero command stops the robot
        self.prev_distance_to_target = None
        self.step_count = 0
        self.stuck_counter = 0
        self.goal_reached = False  # BUGFIX: was never cleared between episodes
        self.scan_data = None
        self.scan_received = False
        # Pump the executor briefly so fresh odom/scan messages arrive.
        for _ in range(5):
            rclpy.spin_once(self.node, timeout_sec=0.1)
        obs = self._get_observation(Twist())
        return obs, {}

    def close(self):
        """Stop the robot and tear down the ROS 2 node/context."""
        self.pub_cmd.publish(Twist())  # do not leave the robot driving
        self.node.destroy_node()
        # BUGFIX: rclpy.shutdown() raises if the context is already down
        # (e.g. close() called twice); guard like __init__ guards init().
        if rclpy.ok():
            rclpy.shutdown()
# =====================================================
# TRAIN
# =====================================================
if __name__ == "__main__":
    # Output locations: Monitor/TensorBoard logs and periodic model snapshots.
    log_dir = "./logs_bot1/"
    checkpoint_dir = "./checkpoints/"
    for directory in (log_dir, checkpoint_dir):
        os.makedirs(directory, exist_ok=True)

    # Monitor records per-episode statistics; DummyVecEnv provides the
    # vectorized interface Stable-Baselines3 expects.
    monitored_env = Monitor(
        RcCarEnv(), filename=os.path.join(log_dir, "monitor_log")
    )
    env = DummyVecEnv([lambda: monitored_env])

    # Snapshot the model every 50k steps.
    checkpoint_callback = CheckpointCallback(
        save_freq=50_000,
        save_path=checkpoint_dir,
        name_prefix="bot1_fixed"
    )

    print("🚀 EĞİTİM BAŞLIYOR...")
    model = RecurrentPPO(
        "MlpLstmPolicy",
        env,
        verbose=1,
        tensorboard_log=log_dir,
        learning_rate=1e-4,
        n_steps=2048,
        batch_size=64,
        gamma=0.99,
        ent_coef=0.01,
        policy_kwargs=dict(
            net_arch=[dict(pi=[256, 256], vf=[256, 256])],
            lstm_hidden_size=256,
            n_lstm_layers=1
        ),
        device="auto"
    )

    try:
        model.learn(
            total_timesteps=2_000_000,
            callback=checkpoint_callback,
            progress_bar=True
        )
        model.save("bot1_fixed_final")
        print("✅ EĞİTİM TAMAMLANDI")
    except KeyboardInterrupt:
        # Ctrl-C: keep the partially trained model instead of losing it.
        print("🛑 DURDURULDU")
        model.save("bot1_fixed_interrupted")
    finally:
        env.close()