JaamCTRL-OpenEnv / src /signal_controller.py
Akshara
Fresh start: Clean repo without large file history - ready for HF Spaces
f4f43f0
"""
signal_controller.py
Rule-based adaptive traffic signal controller for 3 coordinated intersections.
Logic:
- Each junction independently monitors queue length on each incoming edge.
- If the main arterial (E-W) queue > threshold → extend green on E-W phase.
- If cross-street (N-S) queue > threshold → extend green on N-S phase.
- Central coordinator applies a green-wave offset: J1 gets +offset, J2 gets +2*offset
so a platoon cleared from J0 hits J1 green, then J2 green.
All calls use traci to read/set phase durations.
"""
import traci
# ── Constants ────────────────────────────────────────────────────────────────
TL_IDS = ["J0", "J1", "J2"]
# Edge groups per junction: (ew_edges, ns_edges)
JUNCTION_EDGES = {
"J0": {
"ew": ["W0J0", "J1J0"], # main arterial approaches
"ns": ["N0J0", "S0J0"], # cross street approaches
},
"J1": {
"ew": ["J0J1", "J2J1"],
"ns": ["N1J1", "S1J1"],
},
"J2": {
"ew": ["J1J2"],
"ns": ["N2J2", "S2J2"],
},
}
# Phase indices (must match tlLogic in .net.xml)
PHASE_EW_GREEN = 0 # E-W green
PHASE_EW_YELLOW = 1
PHASE_NS_GREEN = 2 # N-S green
PHASE_NS_YELLOW = 3
# Timing limits (seconds)
MIN_GREEN = 15
MAX_GREEN = 60
DEFAULT_EW = 35
DEFAULT_NS = 30
YELLOW_DUR = 5
# Queue threshold to trigger extension (vehicles)
QUEUE_THRESHOLD = 5
# Green-wave offset between consecutive junctions (seconds)
GREEN_WAVE_OFFSET = 12
# ── Helpers ──────────────────────────────────────────────────────────────────
def _get_queue(edge_id: str) -> int:
"""Number of vehicles with speed < 0.1 m/s on an edge (proxy for queue)."""
try:
vids = traci.edge.getLastStepVehicleIDs(edge_id)
return sum(1 for v in vids if traci.vehicle.getSpeed(v) < 0.1)
except traci.TraCIException:
return 0
def _total_queue(edges: list[str]) -> int:
return sum(_get_queue(e) for e in edges)
def _clamp(value: int, lo: int, hi: int) -> int:
return max(lo, min(hi, value))
# ── Main controller ───────────────────────────────────────────────────────────
class RuleBasedController:
"""
Stateful rule-based controller.
Call .step(sim_step) every simulation second.
"""
def __init__(self):
# Per-junction state: current phase duration remaining
self._phase_timer = {tl: DEFAULT_EW for tl in TL_IDS}
self._current_phase = {tl: PHASE_EW_GREEN for tl in TL_IDS}
self._green_wave_active = False
# ── Public API ────────────────────────────────────────────────────────────
def step(self, sim_step: int) -> dict:
"""
Run one control step.
Returns a dict of {tl_id: current_phase} for dashboard display.
"""
# Apply green-wave offset on first step
if sim_step == 1:
self._apply_green_wave_offsets()
phase_info = {}
for tl_id in TL_IDS:
phase_info[tl_id] = self._control_junction(tl_id, sim_step)
return phase_info
# ── Internal ──────────────────────────────────────────────────────────────
def _apply_green_wave_offsets(self):
"""Stagger initial phases so platoons hit consecutive junctions on green."""
try:
traci.trafficlight.setPhase("J1", PHASE_EW_GREEN)
traci.trafficlight.setPhaseDuration(
"J1", DEFAULT_EW - GREEN_WAVE_OFFSET
)
traci.trafficlight.setPhase("J2", PHASE_EW_GREEN)
traci.trafficlight.setPhaseDuration(
"J2", DEFAULT_EW - 2 * GREEN_WAVE_OFFSET
)
except traci.TraCIException:
pass
def _control_junction(self, tl_id: str, step: int) -> int:
"""Adaptive rule for a single junction; returns current phase index."""
try:
current = traci.trafficlight.getPhase(tl_id)
time_in_phase = traci.trafficlight.getNextSwitch(tl_id) - step
except traci.TraCIException:
return 0
# Only act on green phases; leave yellow phases alone
if current not in (PHASE_EW_GREEN, PHASE_NS_GREEN):
return current
edges = JUNCTION_EDGES[tl_id]
ew_q = _total_queue(edges["ew"])
ns_q = _total_queue(edges["ns"])
# Decide target green durations
if current == PHASE_EW_GREEN:
if ew_q > QUEUE_THRESHOLD and time_in_phase < 10:
# Extend E-W green
new_dur = _clamp(DEFAULT_EW + (ew_q - QUEUE_THRESHOLD) * 2,
MIN_GREEN, MAX_GREEN)
try:
traci.trafficlight.setPhaseDuration(tl_id, new_dur)
except traci.TraCIException:
pass
elif ns_q > ew_q * 1.5 and time_in_phase > 10:
# Cut E-W short, give N-S a chance
try:
traci.trafficlight.setPhaseDuration(tl_id, MIN_GREEN)
except traci.TraCIException:
pass
elif current == PHASE_NS_GREEN:
if ns_q > QUEUE_THRESHOLD and time_in_phase < 10:
new_dur = _clamp(DEFAULT_NS + (ns_q - QUEUE_THRESHOLD) * 2,
MIN_GREEN, MAX_GREEN)
try:
traci.trafficlight.setPhaseDuration(tl_id, new_dur)
except traci.TraCIException:
pass
elif ew_q > ns_q * 2 and time_in_phase > 8:
try:
traci.trafficlight.setPhaseDuration(tl_id, MIN_GREEN)
except traci.TraCIException:
pass
return current
# ── Fixed-time controller (baseline) ─────────────────────────────────────────
class FixedTimeController:
"""
Applies the default fixed-time program from the .net.xml (no changes).
Used as the 'before' baseline.
"""
def step(self, sim_step: int) -> dict: # noqa: D401
"""No-op: SUMO runs its built-in fixed program."""
return {tl: 0 for tl in TL_IDS}