Spaces:
Sleeping
Sleeping
"""
Traffic Generator — simulates vehicle arrivals at the intersection.
Implements complex, realistic traffic patterns:
- Lane imbalance (North gets ~47% of traffic)
- Dynamic peak/low phases (alternating every 100 steps)
- Random traffic bursts (15% probability, 4× multiplier)
- Variable vehicle counts (1–3 per arrival event)
"""
from typing import Optional

import numpy as np
class TrafficGenerator:
    """Generate stochastic vehicle arrivals for the intersection simulation.

    The per-direction arrival probability for a step is the configured base
    density scaled by (a) an alternating peak/low phase factor, (b) an
    optional hour-of-day peak multiplier, (c) a fixed per-direction weight
    and (d) a burst multiplier on one randomly chosen direction while a
    burst is active.

    The deliberately uneven distribution is meant to ensure that a
    fixed-timing signal cannot match the performance of an adaptive RL agent.

    Directions are indexed 0-3 in the order North, East, South, West. Each
    direction feeds two queues: straight/right at index ``2 * d`` and left
    turn at index ``2 * d + 1``.
    """

    # Density scale factors applied during the alternating peak / low phases.
    PEAK_PHASE_FACTOR = 1.5
    LOW_PHASE_FACTOR = 0.5
    # Arrival-probability multiplier for the direction hit by a burst.
    BURST_MULTIPLIER = 4.0
    # Probability that an arriving vehicle joins the left-turn queue.
    LEFT_TURN_PROBABILITY = 0.2

    def __init__(self, config, seed: Optional[int] = None):
        """Read traffic constants from ``config`` and initialise state.

        Args:
            config: Module or object exposing ``TRAFFIC_DENSITY``,
                ``PEAK_HOURS`` (iterable of ``(start_hour, end_hour)``
                pairs) and ``PEAK_MULTIPLIER``.
            seed: Optional seed forwarded to ``np.random.default_rng``.
                The default ``None`` keeps the non-deterministic behaviour;
                pass a value to make runs reproducible.
        """
        self.config = config

        # Base density - drastically lowered for ~2000-4000 throughput
        self.traffic_density = config.TRAFFIC_DENSITY
        self.peak_hours = config.PEAK_HOURS
        self.peak_multiplier = config.PEAK_MULTIPLIER

        # Random number generator (seedable for reproducible episodes)
        self.rng = np.random.default_rng(seed)

        # Burst parameters
        self.burst_probability = 0.15  # chance per idle step of starting a burst
        self.burst_active = False
        self.burst_duration = 0
        self.burst_direction = 0

        # Lane imbalance: North gets ~47% of traffic (1.8 / 3.8).
        # Previously [2.8, 0.4, 0.5, 0.3] caused North density > 1.0 every step
        weights = np.array([1.8, 0.7, 0.7, 0.6])
        # Scaled so the four weights sum to 4, i.e. average 1.0 per direction.
        self.lane_weights = weights / weights.sum() * 4

        # Dynamic phase: alternates peak/low every ``phase_length`` steps
        self.phase_length = 100
        self.current_phase_step = 0
        self.is_peak_phase = True

    # ------------------------------------------------------------------
    # Public API
    # ------------------------------------------------------------------
    def reset(self, seed: Optional[int] = None):
        """Reset generator state at the start of each episode.

        Args:
            seed: If given, the random generator is re-created from this
                seed. With the default ``None`` the existing generator
                keeps running, so consecutive episodes differ.
        """
        if seed is not None:
            self.rng = np.random.default_rng(seed)
        self.burst_active = False
        self.burst_duration = 0
        self.burst_direction = 0
        self.current_phase_step = 0
        self.is_peak_phase = True

    def generate(self, current_step: int) -> np.ndarray:
        """Generate new vehicles for the current time step.

        Args:
            current_step: Current simulation step counter. Only used to
                derive the hour of day for the peak-hour multiplier.

        Returns:
            ``float32`` array ``[N_SR, N_L, E_SR, E_L, S_SR, S_L, W_SR, W_L]``
            of vehicle counts added to each queue this step.
        """
        self._advance_phase()
        density = self._base_density(current_step)
        self._update_burst()

        new_vehicles = np.zeros(8, dtype=np.float32)
        for direction in range(4):
            lane_density = density * self.lane_weights[direction]
            if self.burst_active and direction == self.burst_direction:
                lane_density *= self.BURST_MULTIPLIER  # Burst spike

            # ``lane_density`` is used as an arrival probability; a value of
            # 1.0 or more simply means an arrival happens every step.
            if self.rng.random() >= lane_density:
                continue

            # 1-3 vehicles arrive (upper bound of ``integers`` is exclusive)
            total_arriving = int(self.rng.integers(1, 4))
            for _ in range(total_arriving):
                if self.rng.random() < self.LEFT_TURN_PROBABILITY:
                    new_vehicles[direction * 2 + 1] += 1.0  # Left turn queue
                else:
                    new_vehicles[direction * 2] += 1.0  # Straight/Right queue
        return new_vehicles

    def set_density(self, density: float):
        """Override the base traffic density, clipped to the range 0.0-1.0."""
        self.traffic_density = float(np.clip(density, 0.0, 1.0))

    # ------------------------------------------------------------------
    # Internal helpers
    # ------------------------------------------------------------------
    def _advance_phase(self) -> None:
        """Advance the peak/low phase counter, toggling after a full phase."""
        # Check *before* counting the current step so that every phase,
        # including the first one after a reset, lasts exactly
        # ``phase_length`` steps.
        if self.current_phase_step >= self.phase_length:
            self.current_phase_step = 0
            self.is_peak_phase = not self.is_peak_phase
        self.current_phase_step += 1

    def _base_density(self, current_step: int) -> float:
        """Return this step's density after phase and peak-hour scaling."""
        if self.is_peak_phase:
            density = self.traffic_density * self.PEAK_PHASE_FACTOR
        else:
            density = self.traffic_density * self.LOW_PHASE_FACTOR

        # Traditional peak-hour multiplier, applied at most once.
        # NOTE(review): 3600 steps per hour assumes one step per simulated
        # second - confirm against the environment's step length.
        hour = (current_step // 3600) % 24
        if any(start_h <= hour < end_h for start_h, end_h in self.peak_hours):
            density *= self.peak_multiplier
        return density

    def _update_burst(self) -> None:
        """Start a new burst or count down the one currently running."""
        if self.burst_active:
            self.burst_duration -= 1
            if self.burst_duration <= 0:
                self.burst_active = False
            return

        if self.rng.random() < self.burst_probability:
            self.burst_active = True
            # Duration 15-39 steps, direction 0-3 (upper bounds exclusive).
            self.burst_duration = int(self.rng.integers(15, 40))
            self.burst_direction = int(self.rng.integers(0, 4))