"""
phase_controller.py — Movement-Level Phase Controller
======================================================
Defines all intersection phases (including turning phases) and a
PhaseManager that selects the optimal phase based on movement-level
queue counts (left/straight/right per lane).

Required by: realtime_engine_v2.py

Classes:
    Phase        — Enum of all valid intersection phases
    PhaseManager — Selects and manages active phase using max-pressure
"""

from enum import Enum
import time
from typing import Optional


# ── Phase Enum ────────────────────────────────────────────────────────────

class Phase(Enum):
    """
    All signal phases for a standard 4-way intersection.

    NS phases serve North + South simultaneously; EW phases serve
    East + West simultaneously.

    _GREEN variants    : straight-going and right-turning vehicles
    _STRAIGHT variants : straight-going vehicles only (for pressure purposes;
                         the per-lane signal state is the same as _GREEN)
    _LEFT variants     : left-turning vehicles only
    """

    # Core phases (compatible with dashboard_final.py PHASES dict)
    NS_GREEN = "NS_GREEN"        # N+S straight + right (standard green)
    EW_GREEN = "EW_GREEN"        # E+W straight + right (standard green)
    ALL_RED = "ALL_RED"          # All red — transition / EVP clearance

    # Extended phases (movement-level)
    NS_STRAIGHT = "NS_STRAIGHT"  # N+S straight only
    NS_LEFT = "NS_LEFT"          # N+S left turns only
    EW_STRAIGHT = "EW_STRAIGHT"  # E+W straight only
    EW_LEFT = "EW_LEFT"          # E+W left turns only

    def to_simple(self) -> str:
        """
        Convert to one of the 3 simple phases understood by the dashboard
        and traffic_env.py: NS_GREEN | EW_GREEN | ALL_RED.

        Every NS_* phase collapses to "NS_GREEN", every EW_* phase to
        "EW_GREEN"; anything else (i.e. ALL_RED) maps to "ALL_RED".
        """
        if self.value.startswith("NS_"):
            return "NS_GREEN"
        if self.value.startswith("EW_"):
            return "EW_GREEN"
        return "ALL_RED"


# ── Signal state per phase ────────────────────────────────────────────────

_NS_OPEN = {"N": "GREEN", "S": "GREEN", "E": "RED", "W": "RED"}
_EW_OPEN = {"N": "RED", "S": "RED", "E": "GREEN", "W": "GREEN"}
_ALL_STOP = {"N": "RED", "S": "RED", "E": "RED", "W": "RED"}

# Each phase gets its own dict so entries never alias one another.
_SIGNAL_STATES: dict[Phase, dict[str, str]] = {
    Phase.NS_GREEN: dict(_NS_OPEN),
    Phase.EW_GREEN: dict(_EW_OPEN),
    Phase.ALL_RED: dict(_ALL_STOP),
    Phase.NS_STRAIGHT: dict(_NS_OPEN),
    Phase.NS_LEFT: dict(_NS_OPEN),
    Phase.EW_STRAIGHT: dict(_EW_OPEN),
    Phase.EW_LEFT: dict(_EW_OPEN),
}

# Which movements does each phase serve?
_PHASE_MOVEMENTS: dict[Phase, dict] = {
    Phase.NS_GREEN: {"N": ("straight", "right"), "S": ("straight", "right"), "E": (), "W": ()},
    Phase.NS_STRAIGHT: {"N": ("straight",), "S": ("straight",), "E": (), "W": ()},
    Phase.NS_LEFT: {"N": ("left",), "S": ("left",), "E": (), "W": ()},
    Phase.EW_GREEN: {"N": (), "S": (), "E": ("straight", "right"), "W": ("straight", "right")},
    Phase.EW_STRAIGHT: {"N": (), "S": (), "E": ("straight",), "W": ("straight",)},
    Phase.EW_LEFT: {"N": (), "S": (), "E": ("left",), "W": ("left",)},
    Phase.ALL_RED: {"N": (), "S": (), "E": (), "W": ()},
}

# Candidate phases to consider for each axis. Order matters: ties in
# pressure are resolved in favour of the earlier candidate.
_NS_PHASES = [Phase.NS_GREEN, Phase.NS_STRAIGHT, Phase.NS_LEFT]
_EW_PHASES = [Phase.EW_GREEN, Phase.EW_STRAIGHT, Phase.EW_LEFT]
_ALL_CANDIDATE_PHASES = _NS_PHASES + _EW_PHASES


# ── PhaseManager ─────────────────────────────────────────────────────────

class PhaseManager:
    """
    Selects the optimal intersection phase using movement-level max-pressure.

    Features:
      - Considers per-movement queue counts (left/straight/right)
      - Min green time guard (prevents rapid oscillation)
      - Hysteresis threshold before leaving the current phase
      - EVP override via trigger_ev() / clear_ev()
      - Compatible with both dashboard_final.py and realtime_engine_v2.py
    """

    def __init__(
        self,
        min_green_sec: float = 5.0,
        min_left_vehicles: int = 3,
        switch_threshold: float = 0.5,
    ):
        """
        Args:
            min_green_sec     : Minimum seconds to hold a phase before a
                                pressure-based switch is allowed.
            min_left_vehicles : Intended minimum left-turn queue size for a
                                dedicated NS_LEFT / EW_LEFT phase.
                                NOTE: this value is stored but is not
                                currently consulted by decide_phase().
            switch_threshold  : Hysteresis — the best candidate must beat the
                                current phase's pressure by more than this
                                many vehicles to trigger a switch.
        """
        self.min_green_sec = min_green_sec
        self.min_left_vehicles = min_left_vehicles
        self.switch_threshold = switch_threshold

        self._current_phase: Phase = Phase.NS_GREEN
        # Monotonic clock: elapsed-time maths must not be affected by
        # wall-clock adjustments (NTP sync, manual changes).
        self._phase_start: float = time.monotonic()
        self._ev_flag: bool = False
        self._ev_lane: Optional[str] = None

        # Statistics
        self._phase_counts: dict[str, int] = {}
        self._total_decisions: int = 0

    # ── External EV control ───────────────────────────────────────────────

    def trigger_ev(self, lane: str) -> None:
        """Override: grant green to the emergency vehicle's lane on the
        next decide_phase() call. 'N'/'S' select NS_GREEN; any other
        value selects EW_GREEN."""
        self._ev_flag = True
        self._ev_lane = lane

    def clear_ev(self) -> None:
        """Resume normal max-pressure after EV has cleared."""
        self._ev_flag = False
        self._ev_lane = None

    # ── Core decision ─────────────────────────────────────────────────────

    def decide_phase(self, movements: dict) -> Phase:
        """
        Choose the next phase based on movement queue pressures.

        Args:
            movements: {
                'N': {'left': int, 'straight': int, 'right': int},
                'S': {'left': int, 'straight': int, 'right': int},
                'E': {'left': int, 'straight': int, 'right': int},
                'W': {'left': int, 'straight': int, 'right': int},
            }

        Returns:
            Phase enum value (the phase that is active after this decision)
        """
        # ── P0: Emergency Vehicle Priority ────────────────────────────────
        if self._ev_flag and self._ev_lane:
            target = (
                Phase.NS_GREEN if self._ev_lane in ("N", "S")
                else Phase.EW_GREEN
            )
            # Commit only on an actual change. Re-committing on every call
            # would keep resetting the phase timer and inflate the stats
            # for as long as the EV override stays active.
            if self._current_phase is not target:
                self._set_phase(target)
            return self._current_phase

        # ── Min green time guard ──────────────────────────────────────────
        if self.time_in_phase() < self.min_green_sec:
            return self._current_phase  # too soon to switch

        # ── Compute pressure for each candidate phase ─────────────────────
        best_phase = self._current_phase
        best_pressure = -1.0
        for candidate in _ALL_CANDIDATE_PHASES:
            pressure = self._compute_pressure(candidate, movements)
            if pressure > best_pressure:  # strict: earlier candidate wins ties
                best_pressure = pressure
                best_phase = candidate

        # ── Only switch if candidate clearly outperforms current ──────────
        current_pressure = self._compute_pressure(self._current_phase, movements)
        gain = best_pressure - current_pressure
        if best_phase is not self._current_phase and gain > self.switch_threshold:
            self._set_phase(best_phase)

        return self._current_phase

    def _compute_pressure(self, phase: Phase, movements: dict) -> float:
        """
        Sum queue lengths for all movements served by this phase.
        Higher = more vehicles will benefit from this phase.

        Missing lanes, missing movements, and None entries count as 0.
        """
        if not movements:
            return 0.0

        total = 0.0
        for lane, served in _PHASE_MOVEMENTS.get(phase, {}).items():
            lane_data = movements.get(lane) or {}
            for mv in served:
                total += lane_data.get(mv) or 0
        return total

    def _set_phase(self, phase: Phase) -> None:
        """Internal: commit to a new phase, restart the phase timer and
        record stats."""
        self._current_phase = phase
        self._phase_start = time.monotonic()
        self._total_decisions += 1
        key = phase.value
        self._phase_counts[key] = self._phase_counts.get(key, 0) + 1

    # ── Accessors ─────────────────────────────────────────────────────────

    def get_signal_state(self) -> dict:
        """
        Return per-lane signal state for the current phase.

        Returns:
            {'N': 'GREEN'|'RED', 'S': ..., 'E': ..., 'W': ...}
            A fresh dict on every call, so callers may mutate it without
            corrupting the shared phase table.
        """
        state = _SIGNAL_STATES.get(self._current_phase, _SIGNAL_STATES[Phase.ALL_RED])
        return dict(state)

    def get_simple_phase(self) -> str:
        """Return current phase as one of NS_GREEN / EW_GREEN / ALL_RED."""
        return self._current_phase.to_simple()

    def time_in_phase(self) -> float:
        """Seconds since last phase change (monotonic clock)."""
        return time.monotonic() - self._phase_start

    def get_stats(self) -> dict:
        """Return phase usage statistics."""
        return {
            "current_phase": self._current_phase.value,
            "time_in_phase_s": round(self.time_in_phase(), 1),
            "total_decisions": self._total_decisions,
            "phase_counts": self._phase_counts.copy(),
            "ev_active": self._ev_flag,
            "ev_lane": self._ev_lane,
        }

    def get_phase_pressure(self, movements: dict) -> dict:
        """
        Return pressure score for each candidate phase (useful for dashboard).
        Keys are phase names; values are rounded to 2 decimal places.
        """
        return {
            p.value: round(self._compute_pressure(p, movements), 2)
            for p in _ALL_CANDIDATE_PHASES
        }


# ── Convenience factory ───────────────────────────────────────────────────

def make_phase_manager(min_green_sec: float = 5.0) -> PhaseManager:
    """Create a PhaseManager with default settings."""
    return PhaseManager(min_green_sec=min_green_sec)


# ── CLI test ──────────────────────────────────────────────────────────────

def _self_test() -> None:
    """Run three canned scenarios and print the resulting decisions."""
    print("🚦 Phase Controller — Self Test")
    print("=" * 50)

    pm = PhaseManager(min_green_sec=0.0)  # no min time for testing

    scenarios = [
        {
            "desc": "Heavy NS straight traffic",
            "movements": {
                'N': {'left': 1, 'straight': 12, 'right': 2},
                'S': {'left': 1, 'straight': 10, 'right': 1},
                'E': {'left': 0, 'straight': 3, 'right': 1},
                'W': {'left': 0, 'straight': 2, 'right': 0},
            },
        },
        {
            "desc": "Balanced EW and NS",
            "movements": {
                'N': {'left': 2, 'straight': 5, 'right': 1},
                'S': {'left': 1, 'straight': 4, 'right': 2},
                'E': {'left': 3, 'straight': 7, 'right': 2},
                'W': {'left': 2, 'straight': 6, 'right': 1},
            },
        },
        {
            "desc": "Emergency vehicle from East",
            "movements": {
                'N': {'left': 5, 'straight': 8, 'right': 2},
                'S': {'left': 3, 'straight': 6, 'right': 1},
                'E': {'left': 0, 'straight': 2, 'right': 0},
                'W': {'left': 0, 'straight': 1, 'right': 0},
            },
        },
    ]

    for i, s in enumerate(scenarios):
        if i == 2:
            pm.trigger_ev("E")
        phase = pm.decide_phase(s["movements"])
        sigs = pm.get_signal_state()
        print(f"\nScenario {i+1}: {s['desc']}")
        print(f"  Phase   : {phase.value}")
        print(f"  Simple  : {pm.get_simple_phase()}")
        print(f"  Signals : N={sigs['N']} S={sigs['S']} E={sigs['E']} W={sigs['W']}")
        print(f"  Stats   : {pm.get_stats()}")

    print("\n✓ Phase controller working")


if __name__ == "__main__":
    _self_test()