Spaces:
Sleeping
Sleeping
| import random | |
| from src.models import State | |
| class DeterministicAgent: | |
| def __init__(self): | |
| self.last_switch_time = 0 | |
| self.ns_active_time = 0 | |
| self.ew_active_time = 0 | |
| def get_action(self, state: State) -> int: | |
| ns_total = state.north_queue + state.south_queue | |
| ew_total = state.east_queue + state.west_queue | |
| current_idx = 1 if state.current_signal == "green_ns" else (2 if state.current_signal == "green_ew" else 0) | |
| time_since_switch = state.time_step - self.last_switch_time | |
| # B. EMERGENCY HANDLING | |
| emergency_bonus = 10000.0 | |
| ns_em = 1 if (state.emergency_vehicle_present and state.emergency_direction == 'ns') else 0 | |
| ew_em = 1 if (state.emergency_vehicle_present and state.emergency_direction == 'ew') else 0 | |
| # Override and extend green logic | |
| if ns_em and current_idx != 1: | |
| self.last_switch_time = state.time_step | |
| return 1 | |
| if ew_em and current_idx != 2: | |
| self.last_switch_time = state.time_step | |
| return 2 | |
| # Weights for heuristic | |
| w1 = 12.0 # queue_length | |
| w2 = 3.0 # total_wait_time | |
| w3 = emergency_bonus # emergency_presence | |
| w4 = 6.0 # starvation_penalty | |
| w5 = 15.0 # recently_served_penalty | |
| # Starvation penalization - drastically spikes priority if ignored while waiting | |
| ns_starvation = (state.ns_wait_time ** 1.5) if current_idx != 1 else 0 | |
| ew_starvation = (state.ew_wait_time ** 1.5) if current_idx != 2 else 0 | |
| # Priority calculation per lane | |
| ns_priority = ( | |
| w1 * ns_total + | |
| w2 * state.ns_wait_time + | |
| w3 * ns_em + | |
| w4 * ns_starvation - | |
| w5 * (time_since_switch if current_idx == 1 else 0) | |
| ) | |
| ew_priority = ( | |
| w1 * ew_total + | |
| w2 * state.ew_wait_time + | |
| w3 * ew_em + | |
| w4 * ew_starvation - | |
| w5 * (time_since_switch if current_idx == 2 else 0) | |
| ) | |
| # Controlled variability to fix deterministic bounds | |
| epsilon = 0.5 | |
| ns_priority += random.uniform(-epsilon, epsilon) | |
| ew_priority += random.uniform(-epsilon, epsilon) | |
| # Target determination | |
| target_idx = current_idx | |
| if current_idx == 1: | |
| if ew_priority > ns_priority: target_idx = 2 | |
| elif current_idx == 2: | |
| if ns_priority > ew_priority: target_idx = 1 | |
| else: | |
| target_idx = 1 if ns_priority >= ew_priority else 2 | |
| # C. ADAPTIVE SIGNAL TIMING | |
| # Avoid fixed timing: longer green time for high congestion | |
| current_queue = ns_total if current_idx == 1 else (ew_total if current_idx == 2 else 0) | |
| dynamic_min_green = max(3, min(12, current_queue // 5)) | |
| # Fast exit: Switch immediately if active lane is completely empty | |
| if current_queue == 0 and (ns_total > 0 or ew_total > 0): | |
| target_idx = 2 if current_idx == 1 else 1 | |
| elif current_idx != 0 and time_since_switch < dynamic_min_green and current_queue > 0: | |
| target_idx = current_idx | |
| if target_idx != current_idx: | |
| self.last_switch_time = state.time_step | |
| return target_idx | |