Spaces:
Sleeping
Sleeping
| """ | |
| phase_controller.py — Movement-Level Phase Controller | |
| ====================================================== | |
| Defines all intersection phases (including turning phases) | |
| and a PhaseManager that selects the optimal phase based on | |
| movement-level queue counts (left/straight/right per lane). | |
| Required by: realtime_engine_v2.py | |
| Classes: | |
| Phase — Enum of all valid intersection phases | |
| PhaseManager — Selects and manages active phase using max-pressure | |
| """ | |
| from enum import Enum | |
| import time | |
| from typing import Optional | |
| # ── Phase Enum ──────────────────────────────────────────────────────────── | |
class Phase(Enum):
    """
    Signal phases for a standard 4-way intersection.

    NS_* phases give green to the North and South approaches together;
    EW_* phases give green to the East and West approaches together.
    ALL_RED stops every approach (transition / clearance).

    The three "core" members are the simple phases understood by the
    dashboard; the extended members are movement-level refinements that
    collapse back onto a core phase through ``to_simple()``.  The extended
    members are distinct enum members (each has its own value), not enum
    aliases.
    """

    # Core phases (these are exactly the values returned by to_simple()).
    NS_GREEN = "NS_GREEN"
    EW_GREEN = "EW_GREEN"
    ALL_RED = "ALL_RED"

    # Extended, movement-level phases.
    NS_STRAIGHT = "NS_STRAIGHT"  # collapses to NS_GREEN
    NS_LEFT = "NS_LEFT"          # N+S protected left turns; collapses to NS_GREEN
    EW_STRAIGHT = "EW_STRAIGHT"  # collapses to EW_GREEN
    EW_LEFT = "EW_LEFT"          # E+W protected left turns; collapses to EW_GREEN

    def to_simple(self) -> str:
        """
        Reduce this phase to one of the three simple phase names.

        Returns:
            "NS_GREEN" for every NS_* member, "EW_GREEN" for every EW_*
            member, and "ALL_RED" otherwise.
        """
        # Every member value starts with its axis ("NS" / "EW"); the only
        # member that does not is ALL_RED, which falls through.
        axis = self.value[:2]
        if axis == "NS":
            return "NS_GREEN"
        if axis == "EW":
            return "EW_GREEN"
        return "ALL_RED"
| # ── Signal state per phase ──────────────────────────────────────────────── | |
# Signal colour shown to each approach (N/S/E/W) under every phase.
# Each row names the phase and the approaches that get GREEN; everything
# else is RED.  A separate dict is built per phase.
_SIGNAL_STATES: dict[Phase, dict[str, str]] = {
    phase: {
        approach: ("GREEN" if approach in green_axis else "RED")
        for approach in ("N", "S", "E", "W")
    }
    for phase, green_axis in (
        (Phase.NS_GREEN, "NS"),
        (Phase.EW_GREEN, "EW"),
        (Phase.ALL_RED, ""),
        (Phase.NS_STRAIGHT, "NS"),
        (Phase.NS_LEFT, "NS"),
        (Phase.EW_STRAIGHT, "EW"),
        (Phase.EW_LEFT, "EW"),
    )
}

# Movements served on each approach under every phase.
# Each row names the phase, the axis it serves, and the movements it serves
# on that axis; approaches on the other axis get an empty tuple.
_PHASE_MOVEMENTS: dict[Phase, dict] = {
    phase: {
        approach: (served if approach in axis else ())
        for approach in ("N", "S", "E", "W")
    }
    for phase, axis, served in (
        (Phase.NS_GREEN, "NS", ("straight", "right")),
        (Phase.NS_STRAIGHT, "NS", ("straight",)),
        (Phase.NS_LEFT, "NS", ("left",)),
        (Phase.EW_GREEN, "EW", ("straight", "right")),
        (Phase.EW_STRAIGHT, "EW", ("straight",)),
        (Phase.EW_LEFT, "EW", ("left",)),
        (Phase.ALL_RED, "", ()),
    )
}

# Candidate phases per axis.  Order matters to consumers that break ties
# by taking the first best candidate.
_NS_PHASES = [Phase.NS_GREEN, Phase.NS_STRAIGHT, Phase.NS_LEFT]
_EW_PHASES = [Phase.EW_GREEN, Phase.EW_STRAIGHT, Phase.EW_LEFT]
_ALL_CANDIDATE_PHASES = [*_NS_PHASES, *_EW_PHASES]
| # ── PhaseManager ────────────────────────────────────────────────────────── | |
class PhaseManager:
    """
    Select and hold the active intersection phase using movement-level
    max-pressure.

    Each call to ``decide_phase`` scores every candidate phase by the
    number of queued vehicles it would serve and switches only when a
    candidate beats the current phase by a small hysteresis margin.

    Features:
        - Per-movement queue counts (left / straight / right).
        - Minimum green time guard (prevents rapid oscillation).
        - Emergency-vehicle override via ``trigger_ev`` / ``clear_ev``.
        - Dedicated left-turn phases gated by ``min_left_vehicles``.

    Elapsed time is measured with ``time.monotonic()`` so wall-clock
    adjustments cannot distort the minimum green time.
    """

    def __init__(
        self,
        min_green_sec: float = 5.0,
        min_left_vehicles: int = 3,
    ):
        """
        Args:
            min_green_sec: Minimum seconds to hold a phase before the
                max-pressure logic may switch away from it.
            min_left_vehicles: Minimum number of queued left-turn vehicles
                (summed over the two approaches a left phase serves)
                required before NS_LEFT / EW_LEFT is considered.
        """
        self.min_green_sec = min_green_sec
        self.min_left_vehicles = min_left_vehicles
        self._current_phase: Phase = Phase.NS_GREEN
        self._phase_start: float = time.monotonic()
        self._ev_flag: bool = False
        self._ev_lane: Optional[str] = None
        # Statistics: both are updated only when a phase is committed.
        self._phase_counts: dict[str, int] = {}
        self._total_decisions: int = 0

    # -- External EV control ------------------------------------------------

    def trigger_ev(self, lane: str) -> None:
        """
        Activate the emergency-vehicle override for ``lane``.

        The override takes effect on the next ``decide_phase`` call: lanes
        "N" and "S" select NS_GREEN; any other value selects EW_GREEN.
        """
        self._ev_flag = True
        self._ev_lane = lane

    def clear_ev(self) -> None:
        """Deactivate the emergency-vehicle override."""
        self._ev_flag = False
        self._ev_lane = None

    # -- Core decision ------------------------------------------------------

    def decide_phase(self, movements: dict) -> Phase:
        """
        Choose the phase to run now, based on movement queue pressures.

        Args:
            movements: Queue counts per approach and movement, e.g.
                {
                    'N': {'left': int, 'straight': int, 'right': int},
                    'S': {...}, 'E': {...}, 'W': {...},
                }
                Missing approaches or movements count as 0.

        Returns:
            The phase that is active after this decision.
        """
        # P0: emergency vehicle priority bypasses every other rule.
        if self._ev_flag and self._ev_lane:
            target = (
                Phase.NS_GREEN if self._ev_lane in ("N", "S") else Phase.EW_GREEN
            )
            # Commit only on an actual change: re-committing on every poll
            # would keep resetting the phase timer and inflate the stats.
            if target != self._current_phase:
                self._set_phase(target)
            return self._current_phase

        # Min green time guard: too soon to switch.
        if self.time_in_phase() < self.min_green_sec:
            return self._current_phase

        # Pressure of every candidate phase.
        pressures = {
            candidate: self._compute_pressure(candidate, movements)
            for candidate in _ALL_CANDIDATE_PHASES
        }

        # A dedicated left-turn phase is only worth running once enough
        # left-turning vehicles are queued.
        left_phases = (Phase.NS_LEFT, Phase.EW_LEFT)
        eligible = [
            candidate
            for candidate in _ALL_CANDIDATE_PHASES
            if candidate not in left_phases
            or pressures[candidate] >= self.min_left_vehicles
        ]

        # max() returns the first maximal element, so ties resolve in
        # candidate-list order.
        best_phase = max(
            eligible, key=pressures.__getitem__, default=self._current_phase
        )

        # Hysteresis: only switch if the gain exceeds 0.5 vehicles.
        current_pressure = self._compute_pressure(self._current_phase, movements)
        switch_threshold = 0.5
        gain = pressures.get(best_phase, current_pressure) - current_pressure
        if best_phase != self._current_phase and gain > switch_threshold:
            self._set_phase(best_phase)
        return self._current_phase

    def _compute_pressure(self, phase: Phase, movements: dict) -> float:
        """
        Sum the queue lengths of every movement served by ``phase``.

        Higher means more queued vehicles would be served by the phase.
        Unknown phases, approaches, or movements contribute 0.
        """
        served = _PHASE_MOVEMENTS.get(phase, {})
        return sum(
            (
                movements.get(lane, {}).get(mv, 0)
                for lane, mvs in served.items()
                for mv in mvs
            ),
            0.0,
        )

    def _set_phase(self, phase: Phase) -> None:
        """Commit to ``phase``: restart the phase timer and record stats."""
        self._current_phase = phase
        self._phase_start = time.monotonic()
        self._total_decisions += 1
        key = phase.value
        self._phase_counts[key] = self._phase_counts.get(key, 0) + 1

    # -- Accessors ----------------------------------------------------------

    def get_signal_state(self) -> dict:
        """
        Return the per-approach signal state for the current phase.

        Returns:
            A new dict {'N': 'GREEN'|'RED', 'S': ..., 'E': ..., 'W': ...}.
            A copy is returned so callers cannot mutate the shared
            module-level table.
        """
        return dict(
            _SIGNAL_STATES.get(self._current_phase, _SIGNAL_STATES[Phase.ALL_RED])
        )

    def get_simple_phase(self) -> str:
        """Return the current phase as NS_GREEN / EW_GREEN / ALL_RED."""
        return self._current_phase.to_simple()

    def time_in_phase(self) -> float:
        """Return the seconds elapsed since the last phase commit."""
        return time.monotonic() - self._phase_start

    def get_stats(self) -> dict:
        """Return a snapshot of phase usage statistics and EV state."""
        return {
            "current_phase": self._current_phase.value,
            "time_in_phase_s": round(self.time_in_phase(), 1),
            "total_decisions": self._total_decisions,
            "phase_counts": self._phase_counts.copy(),
            "ev_active": self._ev_flag,
            "ev_lane": self._ev_lane,
        }

    def get_phase_pressure(self, movements: dict) -> dict:
        """
        Return the pressure score of each candidate phase, keyed by phase
        value and rounded to 2 decimals (useful for a dashboard).
        """
        return {
            p.value: round(self._compute_pressure(p, movements), 2)
            for p in _ALL_CANDIDATE_PHASES
        }
| # ── Convenience factory ─────────────────────────────────────────────────── | |
def make_phase_manager(min_green_sec: float = 5.0) -> PhaseManager:
    """
    Build a PhaseManager, overriding only the minimum green time.

    Args:
        min_green_sec: Minimum seconds to hold a phase; forwarded to
            PhaseManager.  All other settings keep PhaseManager's defaults.

    Returns:
        A newly constructed PhaseManager.
    """
    manager = PhaseManager(min_green_sec=min_green_sec)
    return manager
| # ── CLI test ────────────────────────────────────────────────────────────── | |
def _run_self_test() -> None:
    """
    Run three canned scenarios through a PhaseManager and print the result.

    The manager is created with ``min_green_sec=0.0`` so that every scenario
    is free to switch phase immediately.  Scenarios carrying an ``ev_lane``
    entry trigger the emergency-vehicle override before deciding.
    """
    print("🚦 Phase Controller — Self Test")
    print("=" * 50)

    pm = PhaseManager(min_green_sec=0.0)  # no min time for testing

    scenarios = [
        {
            "desc": "Heavy NS straight traffic",
            "movements": {
                'N': {'left': 1, 'straight': 12, 'right': 2},
                'S': {'left': 1, 'straight': 10, 'right': 1},
                'E': {'left': 0, 'straight': 3, 'right': 1},
                'W': {'left': 0, 'straight': 2, 'right': 0},
            },
        },
        {
            "desc": "Balanced EW and NS",
            "movements": {
                'N': {'left': 2, 'straight': 5, 'right': 1},
                'S': {'left': 1, 'straight': 4, 'right': 2},
                'E': {'left': 3, 'straight': 7, 'right': 2},
                'W': {'left': 2, 'straight': 6, 'right': 1},
            },
        },
        {
            "desc": "Emergency vehicle from East",
            "ev_lane": "E",
            "movements": {
                'N': {'left': 5, 'straight': 8, 'right': 2},
                'S': {'left': 3, 'straight': 6, 'right': 1},
                'E': {'left': 0, 'straight': 2, 'right': 0},
                'W': {'left': 0, 'straight': 1, 'right': 0},
            },
        },
    ]

    for number, scenario in enumerate(scenarios, start=1):
        ev_lane = scenario.get("ev_lane")
        if ev_lane is not None:
            pm.trigger_ev(ev_lane)
        phase = pm.decide_phase(scenario["movements"])
        sigs = pm.get_signal_state()
        print(f"\nScenario {number}: {scenario['desc']}")
        print(f" Phase : {phase.value}")
        print(f" Simple : {pm.get_simple_phase()}")
        print(f" Signals : N={sigs['N']} S={sigs['S']} E={sigs['E']} W={sigs['W']}")
        print(f" Stats : {pm.get_stats()}")

    print("\n✅ Phase controller working")


if __name__ == "__main__":
    _run_self_test()