Traffic-Control / environment /traffic_generator.py
Dhaerya's picture
Add files
b00d5d5
"""
Traffic Generator β€” simulates vehicle arrivals at the intersection.
Implements complex, realistic traffic patterns:
- Extreme lane imbalance (North gets ~70% of traffic)
- Dynamic peak/low phases (alternating every 100 steps)
- Random traffic bursts (15% probability, 4Γ— multiplier)
- Variable vehicle counts (2–8 per arrival event)
"""
import numpy as np
class TrafficGenerator:
"""
Generates stochastic traffic patterns for the simulation.
The deliberately uneven distribution ensures that a fixed-timing signal
cannot match the performance of an adaptive RL agent.
"""
def __init__(self, config):
"""
Args:
config: Module or object exposing traffic-related constants.
"""
self.config = config
# Base density β€” drastically lowered for ~2000-4000 throughput
self.traffic_density = config.TRAFFIC_DENSITY
self.peak_hours = config.PEAK_HOURS
self.peak_multiplier = config.PEAK_MULTIPLIER
# Random number generator
self.rng = np.random.default_rng()
# Burst parameters
self.burst_probability = 0.15 # 15% chance of burst per step
self.burst_active = False
self.burst_duration = 0
self.burst_direction = 0
# Lane imbalance: North gets ~45% of traffic β€” uneven but manageable
# Previously [2.8, 0.4, 0.5, 0.3] caused North density > 1.0 every step
weights = np.array([1.8, 0.7, 0.7, 0.6])
self.lane_weights = weights / weights.sum() * 4 # Normalised
# Dynamic phase: alternates peak/low every 100 steps
self.phase_length = 100
self.current_phase_step = 0
self.is_peak_phase = True
# ------------------------------------------------------------------
# Public API
# ------------------------------------------------------------------
def reset(self):
"""Reset generator state at the start of each episode."""
self.burst_active = False
self.burst_duration = 0
self.burst_direction = 0
self.current_phase_step = 0
self.is_peak_phase = True
def generate(self, current_step: int) -> np.ndarray:
"""
Generate new vehicles for the current time step.
Args:
current_step: Current simulation step counter.
Returns:
new_vehicles: Array [N_SR, N_L, E_SR, E_L, S_SR, S_L, W_SR, W_L] of vehicle counts.
"""
# --- Dynamic phase (peak / low, toggles every 100 steps) ---
self.current_phase_step += 1
if self.current_phase_step >= self.phase_length:
self.current_phase_step = 0
self.is_peak_phase = not self.is_peak_phase
density = (
self.traffic_density * 1.5
if self.is_peak_phase
else self.traffic_density * 0.5
)
# Traditional peak-hour multiplier
hour = (current_step // 3600) % 24
for start_h, end_h in self.peak_hours:
if start_h <= hour < end_h:
density *= self.peak_multiplier
break
# --- Traffic burst ---
if not self.burst_active:
if self.rng.random() < self.burst_probability:
self.burst_active = True
self.burst_duration = int(self.rng.integers(15, 40))
self.burst_direction = int(self.rng.integers(0, 4))
else:
self.burst_duration -= 1
if self.burst_duration <= 0:
self.burst_active = False
# --- Vehicle arrivals per direction (8 queues) ---
new_vehicles = np.zeros(8, dtype=np.float32)
for direction in range(4):
lane_density = density * self.lane_weights[direction]
if self.burst_active and direction == self.burst_direction:
lane_density *= 4.0 # Burst spike
if self.rng.random() < lane_density:
# 1-3 vehicles arrive
total_arriving = self.rng.integers(1, 4)
for _ in range(total_arriving):
# 20% chance of turning left
is_left_turn = self.rng.random() < 0.2
if is_left_turn:
new_vehicles[direction * 2 + 1] += 1.0 # Left turn queue
else:
new_vehicles[direction * 2] += 1.0 # Straight/Right queue
return new_vehicles
def set_density(self, density: float):
"""Override base traffic density (0.0–1.0)."""
self.traffic_density = float(np.clip(density, 0.0, 1.0))