Spaces:
Sleeping
Sleeping
| """ | |
| signal_controller.py | |
| Rule-based adaptive traffic signal controller for 3 coordinated intersections. | |
| Logic: | |
| - Each junction independently monitors queue length on each incoming edge. | |
| - If the main arterial (E-W) queue > threshold → extend green on E-W phase. | |
| - If cross-street (N-S) queue > threshold → extend green on N-S phase. | |
| - Central coordinator applies a green-wave offset: J1 gets +offset, J2 gets +2*offset | |
| so a platoon cleared from J0 hits J1 green, then J2 green. | |
| All calls use traci to read/set phase durations. | |
| """ | |
| import traci | |
| # ── Constants ──────────────────────────────────────────────────────────────── | |
| TL_IDS = ["J0", "J1", "J2"] | |
| # Edge groups per junction: (ew_edges, ns_edges) | |
| JUNCTION_EDGES = { | |
| "J0": { | |
| "ew": ["W0J0", "J1J0"], # main arterial approaches | |
| "ns": ["N0J0", "S0J0"], # cross street approaches | |
| }, | |
| "J1": { | |
| "ew": ["J0J1", "J2J1"], | |
| "ns": ["N1J1", "S1J1"], | |
| }, | |
| "J2": { | |
| "ew": ["J1J2"], | |
| "ns": ["N2J2", "S2J2"], | |
| }, | |
| } | |
| # Phase indices (must match tlLogic in .net.xml) | |
| PHASE_EW_GREEN = 0 # E-W green | |
| PHASE_EW_YELLOW = 1 | |
| PHASE_NS_GREEN = 2 # N-S green | |
| PHASE_NS_YELLOW = 3 | |
| # Timing limits (seconds) | |
| MIN_GREEN = 15 | |
| MAX_GREEN = 60 | |
| DEFAULT_EW = 35 | |
| DEFAULT_NS = 30 | |
| YELLOW_DUR = 5 | |
| # Queue threshold to trigger extension (vehicles) | |
| QUEUE_THRESHOLD = 5 | |
| # Green-wave offset between consecutive junctions (seconds) | |
| GREEN_WAVE_OFFSET = 12 | |
| # ── Helpers ────────────────────────────────────────────────────────────────── | |
| def _get_queue(edge_id: str) -> int: | |
| """Number of vehicles with speed < 0.1 m/s on an edge (proxy for queue).""" | |
| try: | |
| vids = traci.edge.getLastStepVehicleIDs(edge_id) | |
| return sum(1 for v in vids if traci.vehicle.getSpeed(v) < 0.1) | |
| except traci.TraCIException: | |
| return 0 | |
| def _total_queue(edges: list[str]) -> int: | |
| return sum(_get_queue(e) for e in edges) | |
| def _clamp(value: int, lo: int, hi: int) -> int: | |
| return max(lo, min(hi, value)) | |
| # ── Main controller ─────────────────────────────────────────────────────────── | |
| class RuleBasedController: | |
| """ | |
| Stateful rule-based controller. | |
| Call .step(sim_step) every simulation second. | |
| """ | |
| def __init__(self): | |
| # Per-junction state: current phase duration remaining | |
| self._phase_timer = {tl: DEFAULT_EW for tl in TL_IDS} | |
| self._current_phase = {tl: PHASE_EW_GREEN for tl in TL_IDS} | |
| self._green_wave_active = False | |
| # ── Public API ──────────────────────────────────────────────────────────── | |
| def step(self, sim_step: int) -> dict: | |
| """ | |
| Run one control step. | |
| Returns a dict of {tl_id: current_phase} for dashboard display. | |
| """ | |
| # Apply green-wave offset on first step | |
| if sim_step == 1: | |
| self._apply_green_wave_offsets() | |
| phase_info = {} | |
| for tl_id in TL_IDS: | |
| phase_info[tl_id] = self._control_junction(tl_id, sim_step) | |
| return phase_info | |
| # ── Internal ────────────────────────────────────────────────────────────── | |
| def _apply_green_wave_offsets(self): | |
| """Stagger initial phases so platoons hit consecutive junctions on green.""" | |
| try: | |
| traci.trafficlight.setPhase("J1", PHASE_EW_GREEN) | |
| traci.trafficlight.setPhaseDuration( | |
| "J1", DEFAULT_EW - GREEN_WAVE_OFFSET | |
| ) | |
| traci.trafficlight.setPhase("J2", PHASE_EW_GREEN) | |
| traci.trafficlight.setPhaseDuration( | |
| "J2", DEFAULT_EW - 2 * GREEN_WAVE_OFFSET | |
| ) | |
| except traci.TraCIException: | |
| pass | |
| def _control_junction(self, tl_id: str, step: int) -> int: | |
| """Adaptive rule for a single junction; returns current phase index.""" | |
| try: | |
| current = traci.trafficlight.getPhase(tl_id) | |
| time_in_phase = traci.trafficlight.getNextSwitch(tl_id) - step | |
| except traci.TraCIException: | |
| return 0 | |
| # Only act on green phases; leave yellow phases alone | |
| if current not in (PHASE_EW_GREEN, PHASE_NS_GREEN): | |
| return current | |
| edges = JUNCTION_EDGES[tl_id] | |
| ew_q = _total_queue(edges["ew"]) | |
| ns_q = _total_queue(edges["ns"]) | |
| # Decide target green durations | |
| if current == PHASE_EW_GREEN: | |
| if ew_q > QUEUE_THRESHOLD and time_in_phase < 10: | |
| # Extend E-W green | |
| new_dur = _clamp(DEFAULT_EW + (ew_q - QUEUE_THRESHOLD) * 2, | |
| MIN_GREEN, MAX_GREEN) | |
| try: | |
| traci.trafficlight.setPhaseDuration(tl_id, new_dur) | |
| except traci.TraCIException: | |
| pass | |
| elif ns_q > ew_q * 1.5 and time_in_phase > 10: | |
| # Cut E-W short, give N-S a chance | |
| try: | |
| traci.trafficlight.setPhaseDuration(tl_id, MIN_GREEN) | |
| except traci.TraCIException: | |
| pass | |
| elif current == PHASE_NS_GREEN: | |
| if ns_q > QUEUE_THRESHOLD and time_in_phase < 10: | |
| new_dur = _clamp(DEFAULT_NS + (ns_q - QUEUE_THRESHOLD) * 2, | |
| MIN_GREEN, MAX_GREEN) | |
| try: | |
| traci.trafficlight.setPhaseDuration(tl_id, new_dur) | |
| except traci.TraCIException: | |
| pass | |
| elif ew_q > ns_q * 2 and time_in_phase > 8: | |
| try: | |
| traci.trafficlight.setPhaseDuration(tl_id, MIN_GREEN) | |
| except traci.TraCIException: | |
| pass | |
| return current | |
| # ── Fixed-time controller (baseline) ───────────────────────────────────────── | |
| class FixedTimeController: | |
| """ | |
| Applies the default fixed-time program from the .net.xml (no changes). | |
| Used as the 'before' baseline. | |
| """ | |
| def step(self, sim_step: int) -> dict: # noqa: D401 | |
| """No-op: SUMO runs its built-in fixed program.""" | |
| return {tl: 0 for tl in TL_IDS} | |