|
|
import rclpy |
|
|
from rclpy.node import Node |
|
|
from rclpy.qos import QoSProfile, ReliabilityPolicy |
|
|
from geometry_msgs.msg import Twist, Pose |
|
|
from sensor_msgs.msg import LaserScan |
|
|
import gymnasium as gym |
|
|
from gymnasium import spaces |
|
|
import numpy as np |
|
|
import time |
|
|
import os |
|
|
import math |
|
|
|
|
|
|
|
|
import torch |
|
|
import torch.nn as nn |
|
|
import pennylane as qml |
|
|
from stable_baselines3.common.torch_layers import BaseFeaturesExtractor |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
# Number of qubits in the variational circuit. This is also the width of the
# feature vector the quantum layer emits (one PauliZ expectation per wire).
n_qubits = 4

# Depth (number of layers) of the StronglyEntanglingLayers template.
n_layers = 2

# PennyLane statevector simulator that backs the hybrid Torch layer.
dev = qml.device("default.qubit", wires=n_qubits)
|
|
|
|
|
@qml.qnode(dev, interface="torch")
def quantum_circuit(inputs, weights):
    """Variational circuit: angle-encode ``inputs``, entangle, then measure.

    ``inputs`` supplies one rotation angle per qubit; ``weights`` is the
    trainable tensor for StronglyEntanglingLayers (shape (n_layers, n_qubits, 3),
    see HybridQuantumLayer). Returns one PauliZ expectation per wire, each
    in [-1, 1].
    """
    wires = list(range(n_qubits))

    # Encode each classical feature as a single-qubit rotation angle.
    qml.templates.AngleEmbedding(inputs, wires=wires)

    # Trainable entangling block.
    qml.templates.StronglyEntanglingLayers(weights, wires=wires)

    # Read out <Z> on every qubit.
    return [qml.expval(qml.PauliZ(wires=w)) for w in wires]
|
|
|
|
|
class HybridQuantumLayer(nn.Module):
    """Thin ``nn.Module`` wrapper exposing the PennyLane qnode as a Torch layer."""

    def __init__(self):
        super().__init__()
        # TorchLayer creates and manages the trainable ``weights`` tensor
        # declared by the shape dict below.
        self.q_layer = qml.qnn.TorchLayer(
            quantum_circuit,
            {"weights": (n_layers, n_qubits, 3)},
        )

    def forward(self, x):
        # Delegate directly to the quantum layer.
        return self.q_layer(x)
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
class QuantumFeatureExtractor(BaseFeaturesExtractor):
    """SB3 feature extractor routing observations through the quantum layer.

    A small classical MLP compresses the flat observation down to ``n_qubits``
    values squashed into [-1, 1] by Tanh (suitable as rotation angles for
    AngleEmbedding); the quantum layer then maps them to ``n_qubits``
    expectation values.

    :param observation_space: flat Box observation space of the environment.
    :param features_dim: size of the extracted feature vector; should equal
        ``n_qubits`` since the circuit emits one value per qubit.
    """

    def __init__(self, observation_space: spaces.Box, features_dim: int = 4):
        super().__init__(observation_space, features_dim)

        # BUGFIX: derive the input width from the observation space instead of
        # hard-coding 10, so the extractor keeps working if the environment's
        # n_obs setting changes.
        n_input = int(np.prod(observation_space.shape))

        # Classical encoder: observation -> n_qubits angles in [-1, 1].
        self.classical_pre_net = nn.Sequential(
            nn.Linear(n_input, 32),
            nn.ReLU(),
            nn.Linear(32, n_qubits),
            nn.Tanh(),
        )

        self.quantum_net = HybridQuantumLayer()

    def forward(self, observations: torch.Tensor) -> torch.Tensor:
        """Map a batch of observations to ``n_qubits`` quantum features."""
        x = self.classical_pre_net(observations)
        x = self.quantum_net(x)
        return x
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
class RcCarEnv(gym.Env):
    """Gymnasium environment driving an Ackermann RC car in Gazebo via ROS 2.

    Observation: ``n_obs`` lidar sector minima, scaled to [0, 1] by ``max_range``.
    Action: Box([throttle in [0, 1], steering in [-1, 1]]).
    Reward: bonus for forward motion, penalties for steering effort and for
    proximity to obstacles; a large negative reward and termination on
    (near-)collision.
    """

    def __init__(self):
        super().__init__()

        # Bring up ROS 2 once per process; the env may be constructed after
        # another node already initialised rclpy.
        if not rclpy.ok():
            rclpy.init()

        self.node = rclpy.create_node('quantum_ackermann_agent')

        # Lidar favours freshness over guaranteed delivery.
        qos_best_effort = QoSProfile(reliability=ReliabilityPolicy.BEST_EFFORT, depth=10)

        self.pub_cmd = self.node.create_publisher(Twist, '/cmd_vel', 10)
        self.sub_scan = self.node.create_subscription(LaserScan, '/scan', self.scan_cb, qos_best_effort)

        # Vehicle limits (m/s, rad) and low-pass factor for steering commands.
        self.max_speed = 0.5
        self.max_steering = 0.45
        self.steering_smooth_factor = 0.2

        self.current_steering = 0.0
        self.scan_data = None       # latest n_obs sector minima, metres
        self.scan_received = False  # set by scan_cb; cleared before each wait

        # action[0]: throttle fraction in [0, 1]; action[1]: steering in [-1, 1].
        self.action_space = spaces.Box(low=np.array([0.0, -1.0]), high=np.array([1.0, 1.0]), dtype=np.float32)

        self.n_obs = 10
        self.max_range = 10.0
        self.observation_space = spaces.Box(low=0.0, high=1.0, shape=(self.n_obs,), dtype=np.float32)

        self.step_count = 0
        self.max_steps = 2000

    def scan_cb(self, msg):
        """Reduce a LaserScan to ``n_obs`` sector minima, clipped to ``max_range``."""
        raw = np.array(msg.ranges)
        # inf/NaN mean "no return" — treat them as maximum range ("far").
        raw = np.where(np.isinf(raw), self.max_range, raw)
        raw = np.where(np.isnan(raw), self.max_range, raw)
        raw = np.clip(raw, 0.0, self.max_range)

        chunk = len(raw) // self.n_obs
        if chunk > 0:
            # Minimum per sector: the nearest obstacle dominates (conservative).
            self.scan_data = np.array([np.min(raw[i*chunk:(i+1)*chunk]) for i in range(self.n_obs)], dtype=np.float32)
        else:
            # Degenerate scan with fewer beams than sectors: report "all far".
            self.scan_data = np.full(self.n_obs, self.max_range, dtype=np.float32)
        self.scan_received = True

    def step(self, action):
        """Publish one command, wait for a fresh scan, return the transition."""
        twist = Twist()

        # BUGFIX: action[0] is declared in [0, 1] by action_space, so use it
        # directly. The old remap ``(a + 1) / 2`` assumed a [-1, 1] throttle
        # and silently restricted speed to [0.25, 0.5] of max_speed, making
        # the "standing still" penalty branch below unreachable.
        forward_action = float(np.clip(action[0], 0.0, 1.0))
        target_speed = forward_action * self.max_speed
        twist.linear.x = target_speed

        # First-order low-pass on steering to avoid jerky direction changes.
        target_steering = float(action[1]) * self.max_steering
        self.current_steering = (1.0 - self.steering_smooth_factor) * self.current_steering + \
                                (self.steering_smooth_factor * target_steering)
        twist.angular.z = self.current_steering

        self.pub_cmd.publish(twist)

        # Spin until a new scan arrives, but never block longer than 0.2 s.
        self.scan_received = False
        start_wait = time.time()
        while not self.scan_received:
            rclpy.spin_once(self.node, timeout_sec=0.01)
            if time.time() - start_wait > 0.2:
                break

        if self.scan_data is not None:
            observation = self.scan_data / self.max_range
        else:
            # No scan received yet: report "all clear" rather than crash.
            observation = np.ones(self.n_obs, dtype=np.float32)

        self.step_count += 1

        # Undo the normalisation to reason about distance in metres.
        real_min_dist = np.min(observation) * self.max_range
        reward = 0.0

        # Encourage forward motion; mildly punish (near-)standstill.
        if twist.linear.x > 0.1:
            reward += 0.1
            reward += twist.linear.x * 0.5
        else:
            reward -= 0.05

        # Penalise steering effort to favour smooth, straight driving.
        reward -= abs(self.current_steering) * 0.6

        # Graded proximity penalty inside 0.6 m.
        if real_min_dist < 0.6:
            reward -= (0.6 - real_min_dist) * 2.0

        terminated = False
        truncated = False

        # Collision threshold: large penalty, stop the car, end the episode.
        if real_min_dist < 0.25:
            reward = -20.0
            terminated = True
            print(f"💥 Bam! ({self.step_count} adım)")
            self.pub_cmd.publish(Twist())

        if self.step_count >= self.max_steps:
            truncated = True

        return observation.astype(np.float32), reward, terminated, truncated, {}

    def reset(self, seed=None, options=None):
        """Stop the car, teleport it to the origin in Gazebo, return first obs."""
        super().reset(seed=seed)
        print("\n⚛️ Kuantum Reset (Direct Gazebo Call)...")

        self.current_steering = 0.0
        self.pub_cmd.publish(Twist())   # zeroed Twist = full stop

        # Teleport via the Gazebo set_pose service. NOTE(review): the command
        # is a fixed string with no untrusted input, so os.system is tolerable
        # here; subprocess.run with an argument list would still be safer.
        cmd = """gz service -s /world/arena/set_pose \
--reqtype gz.msgs.Pose \
--reptype gz.msgs.Boolean \
--timeout 300 \
--req 'name: "my_rc_car", position: {x: 0, y: 0, z: 0.2}, orientation: {w: 1}'"""
        os.system(cmd)

        # Discard the stale scan and give the sim up to 1 s for a fresh one.
        self.scan_data = None
        self.scan_received = False
        start_wait = time.time()
        while time.time() - start_wait < 1.0:
            rclpy.spin_once(self.node, timeout_sec=0.1)

        if self.scan_data is not None:
            obs = self.scan_data / self.max_range
        else:
            obs = np.ones(self.n_obs, dtype=np.float32)

        self.step_count = 0
        return obs.astype(np.float32), {}

    def close(self):
        """Stop the car and tear down the ROS node."""
        self.pub_cmd.publish(Twist())
        self.node.destroy_node()
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
if __name__ == '__main__':
    from stable_baselines3 import PPO

    # TensorBoard output directory.
    log_dir = "./logs_quantum/"
    os.makedirs(log_dir, exist_ok=True)

    env = RcCarEnv()

    print("---------------------------------------")
    print("⚛️ HİBRİT KUANTUM RL EĞİTİMİ BAŞLIYOR")
    print("🧠 4 Qubit Devrede | PennyLane & PyTorch")
    print("❤️ Engin'i Seviyorum! Geleceği Kodluyoruz!")
    print("---------------------------------------")

    # Route PPO's feature extraction through the hybrid quantum network.
    policy_kwargs = {
        "features_extractor_class": QuantumFeatureExtractor,
        "features_extractor_kwargs": {"features_dim": n_qubits},
    }

    model = PPO(
        "MlpPolicy",
        env,
        policy_kwargs=policy_kwargs,
        learning_rate=3e-4,
        n_steps=512,
        batch_size=32,
        gamma=0.99,
        tensorboard_log=log_dir,
        verbose=1,
    )

    try:
        model.learn(total_timesteps=50000)
        model.save("rc_car_quantum_master")
        print("✅ Kuantum Eğitimi Başarıyla Tamamlandı!")
    except KeyboardInterrupt:
        # Keep partial progress when the run is interrupted manually.
        model.save("rc_car_quantum_interrupted")
        print("\n⚠️ Eğitim durduruldu ve kaydedildi.")
    finally:
        env.close()