Spaces:
Sleeping
Sleeping
File size: 3,384 Bytes
f259f2b e2485ba f259f2b e2485ba 3864abc f259f2b 7a78f7e f259f2b e2485ba f259f2b e2485ba f259f2b e2485ba f259f2b 3864abc 7a78f7e 3864abc f259f2b 3864abc f259f2b 3864abc f259f2b 3864abc e2485ba f259f2b | 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 | import random
from src.models import State
class DeterministicAgent:
def __init__(self):
self.last_switch_time = 0
self.ns_active_time = 0
self.ew_active_time = 0
def get_action(self, state: State) -> int:
ns_total = state.north_queue + state.south_queue
ew_total = state.east_queue + state.west_queue
current_idx = 1 if state.current_signal == "green_ns" else (2 if state.current_signal == "green_ew" else 0)
time_since_switch = state.time_step - self.last_switch_time
# B. EMERGENCY HANDLING
emergency_bonus = 10000.0
ns_em = 1 if (state.emergency_vehicle_present and state.emergency_direction == 'ns') else 0
ew_em = 1 if (state.emergency_vehicle_present and state.emergency_direction == 'ew') else 0
# Override and extend green logic
if ns_em and current_idx != 1:
self.last_switch_time = state.time_step
return 1
if ew_em and current_idx != 2:
self.last_switch_time = state.time_step
return 2
# Weights for heuristic
w1 = 12.0 # queue_length
w2 = 3.0 # total_wait_time
w3 = emergency_bonus # emergency_presence
w4 = 6.0 # starvation_penalty
w5 = 15.0 # recently_served_penalty
# Starvation penalization - drastically spikes priority if ignored while waiting
ns_starvation = (state.ns_wait_time ** 1.5) if current_idx != 1 else 0
ew_starvation = (state.ew_wait_time ** 1.5) if current_idx != 2 else 0
# Priority calculation per lane
ns_priority = (
w1 * ns_total +
w2 * state.ns_wait_time +
w3 * ns_em +
w4 * ns_starvation -
w5 * (time_since_switch if current_idx == 1 else 0)
)
ew_priority = (
w1 * ew_total +
w2 * state.ew_wait_time +
w3 * ew_em +
w4 * ew_starvation -
w5 * (time_since_switch if current_idx == 2 else 0)
)
# Controlled variability to fix deterministic bounds
epsilon = 0.5
ns_priority += random.uniform(-epsilon, epsilon)
ew_priority += random.uniform(-epsilon, epsilon)
# Target determination
target_idx = current_idx
if current_idx == 1:
if ew_priority > ns_priority: target_idx = 2
elif current_idx == 2:
if ns_priority > ew_priority: target_idx = 1
else:
target_idx = 1 if ns_priority >= ew_priority else 2
# C. ADAPTIVE SIGNAL TIMING
# Avoid fixed timing: longer green time for high congestion
current_queue = ns_total if current_idx == 1 else (ew_total if current_idx == 2 else 0)
dynamic_min_green = max(3, min(12, current_queue // 5))
# Fast exit: Switch immediately if active lane is completely empty
if current_queue == 0 and (ns_total > 0 or ew_total > 0):
target_idx = 2 if current_idx == 1 else 1
elif current_idx != 0 and time_since_switch < dynamic_min_green and current_queue > 0:
target_idx = current_idx
if target_idx != current_idx:
self.last_switch_time = state.time_step
return target_idx
|