# Repository page header captured with the file (not part of the module):
#   pranit / phase_controller.py
#   RushiMane2003's picture
#   Upload 41 files
#   99f938a verified
"""
phase_controller.py — Movement-Level Phase Controller
======================================================
Defines all intersection phases (including turning phases)
and a PhaseManager that selects the optimal phase based on
movement-level queue counts (left/straight/right per lane).

Required by: realtime_engine_v2.py

Classes:
    Phase        — Enum of all valid intersection phases
    PhaseManager — Selects and manages the active phase using max-pressure
"""
from enum import Enum
import time
from typing import Optional
# ── Phase Enum ────────────────────────────────────────────────────────────
class Phase(Enum):
    """
    Signal phases for a standard 4-way intersection.

    Every member's value is its own name as a string. Phases prefixed
    ``NS_`` serve the North and South approaches together; phases prefixed
    ``EW_`` serve East and West together.

        *_GREEN    : standard green (straight + right)
        *_STRAIGHT : straight-only variant
        *_LEFT     : protected left-turn variant
        ALL_RED    : every approach held at red
    """

    # Core phases (noted by the original author as compatible with the
    # PHASES dict in dashboard_final.py).
    NS_GREEN = "NS_GREEN"        # N+S straight + right (standard green)
    EW_GREEN = "EW_GREEN"        # E+W straight + right (standard green)
    ALL_RED = "ALL_RED"          # All red — transition / EVP clearance

    # Extended movement-level phases. These are distinct members (not Enum
    # aliases); they only collapse onto a core phase through to_simple().
    NS_STRAIGHT = "NS_STRAIGHT"  # N+S straight-only
    NS_LEFT = "NS_LEFT"          # N+S protected left turns only
    EW_STRAIGHT = "EW_STRAIGHT"  # E+W straight-only
    EW_LEFT = "EW_LEFT"          # E+W protected left turns only

    def to_simple(self) -> str:
        """
        Collapse this phase onto one of the three coarse phase names.

        Returns:
            "NS_GREEN" for any NS_* phase, "EW_GREEN" for any EW_* phase,
            and "ALL_RED" for everything else.
        """
        name = self.value
        if name.startswith("NS_"):
            return "NS_GREEN"
        if name.startswith("EW_"):
            return "EW_GREEN"
        # ALL_RED itself, plus a safe fallback for anything unrecognised.
        return "ALL_RED"
# ── Signal state per phase ────────────────────────────────────────────────
def _per_lane(ns, ew) -> dict:
    """Return a fresh N/S/E/W mapping: N and S get *ns*, E and W get *ew*."""
    return {"N": ns, "S": ns, "E": ew, "W": ew}


# Lane-level signal colour for every phase. Signals are tracked per approach
# only, so the STRAIGHT and LEFT variants show the same colours as the
# matching *_GREEN phase.
_SIGNAL_STATES: dict[Phase, dict[str, str]] = {
    Phase.NS_GREEN: _per_lane("GREEN", "RED"),
    Phase.EW_GREEN: _per_lane("RED", "GREEN"),
    Phase.ALL_RED: _per_lane("RED", "RED"),
    Phase.NS_STRAIGHT: _per_lane("GREEN", "RED"),
    Phase.NS_LEFT: _per_lane("GREEN", "RED"),
    Phase.EW_STRAIGHT: _per_lane("RED", "GREEN"),
    Phase.EW_LEFT: _per_lane("RED", "GREEN"),
}

# Movements served by each phase, per approach. An empty tuple means the
# approach is not served at all during that phase.
_PHASE_MOVEMENTS: dict[Phase, dict] = {
    Phase.NS_GREEN: _per_lane(("straight", "right"), ()),
    Phase.NS_STRAIGHT: _per_lane(("straight",), ()),
    Phase.NS_LEFT: _per_lane(("left",), ()),
    Phase.EW_GREEN: _per_lane((), ("straight", "right")),
    Phase.EW_STRAIGHT: _per_lane((), ("straight",)),
    Phase.EW_LEFT: _per_lane((), ("left",)),
    Phase.ALL_RED: _per_lane((), ()),
}

# Candidate phases per axis. Order matters: PhaseManager.decide_phase keeps
# the first candidate among equal pressures, so earlier entries win ties.
_NS_PHASES = [Phase.NS_GREEN, Phase.NS_STRAIGHT, Phase.NS_LEFT]
_EW_PHASES = [Phase.EW_GREEN, Phase.EW_STRAIGHT, Phase.EW_LEFT]
_ALL_CANDIDATE_PHASES = [*_NS_PHASES, *_EW_PHASES]
# ── PhaseManager ─────────────────────────────────────────────────────────
class PhaseManager:
    """
    Selects the active intersection phase using movement-level max-pressure.

    Each call to decide_phase() scores every candidate phase by the number
    of queued vehicles it would serve and switches only when another phase
    beats the current one by a clear margin.

    Features:
        - Considers per-movement queue counts (left/straight/right)
        - Min green time guard (prevents rapid oscillation)
        - Dedicated left-turn phases are only eligible once their left-turn
          queue reaches ``min_left_vehicles``
        - Emergency-vehicle (EVP) override via trigger_ev() / clear_ev()
    """

    def __init__(
        self,
        min_green_sec: float = 5.0,
        min_left_vehicles: int = 3,
    ):
        """
        Args:
            min_green_sec     : Minimum seconds to hold a phase before the
                                max-pressure logic may switch away from it.
            min_left_vehicles : Minimum combined left-turn queue (both
                                approaches of an axis) required before
                                NS_LEFT / EW_LEFT may be selected.
        """
        self.min_green_sec = min_green_sec
        self.min_left_vehicles = min_left_vehicles

        self._current_phase: Phase = Phase.NS_GREEN
        # Monotonic clock: elapsed-time checks must not be affected by
        # wall-clock adjustments (NTP steps, manual changes).
        self._phase_start: float = time.monotonic()
        self._ev_flag: bool = False
        self._ev_lane: Optional[str] = None

        # Statistics (updated by _set_phase only, i.e. on actual commits).
        self._phase_counts: dict[str, int] = {}
        self._total_decisions: int = 0

    # ── External EV control ───────────────────────────────────────────────
    def trigger_ev(self, lane: str) -> None:
        """
        Activate the emergency-vehicle override for *lane*.

        The override takes effect on the next decide_phase() call.

        NOTE(review): lanes other than "N"/"S" (including unknown or
        lower-case values) are treated as East/West by decide_phase();
        callers should pass exactly "N", "S", "E" or "W".
        """
        self._ev_flag = True
        self._ev_lane = lane

    def clear_ev(self) -> None:
        """Deactivate the EV override so max-pressure selection resumes."""
        self._ev_flag = False
        self._ev_lane = None

    # ── Core decision ─────────────────────────────────────────────────────
    def decide_phase(self, movements: dict) -> Phase:
        """
        Choose the phase to run now, based on movement queue pressures.

        Args:
            movements: {
                'N': {'left': int, 'straight': int, 'right': int},
                'S': {'left': int, 'straight': int, 'right': int},
                'E': {'left': int, 'straight': int, 'right': int},
                'W': {'left': int, 'straight': int, 'right': int},
            }
            Missing lanes or movements count as zero.

        Returns:
            The Phase that is active after this decision (possibly
            unchanged).
        """
        # ── P0: Emergency Vehicle Priority ────────────────────────────────
        if self._ev_flag and self._ev_lane:
            target = (
                Phase.NS_GREEN
                if self._ev_lane in ("N", "S")
                else Phase.EW_GREEN
            )
            # Commit only on a real change: re-committing the same phase on
            # every call would keep resetting the phase timer and inflate
            # the usage statistics while the override is active.
            if target != self._current_phase:
                self._set_phase(target)
            return self._current_phase

        # ── Min green time guard ──────────────────────────────────────────
        if self.time_in_phase() < self.min_green_sec:
            return self._current_phase  # too soon to switch

        # ── Compute pressure for each candidate phase ─────────────────────
        best_phase = self._current_phase
        best_pressure = -1.0
        for candidate in _ALL_CANDIDATE_PHASES:
            pressure = self._compute_pressure(candidate, movements)
            # A dedicated left-turn phase is only worth running once enough
            # left-turning vehicles are waiting.
            if (
                candidate in (Phase.NS_LEFT, Phase.EW_LEFT)
                and pressure < self.min_left_vehicles
            ):
                continue
            # Strict '>' keeps the earliest candidate among ties.
            if pressure > best_pressure:
                best_pressure = pressure
                best_phase = candidate

        # ── Only switch if candidate clearly outperforms current ──────────
        current_pressure = self._compute_pressure(self._current_phase, movements)
        switch_threshold = 0.5  # hysteresis: only switch if gain > 0.5 vehicles
        gain = best_pressure - current_pressure
        if best_phase != self._current_phase and gain > switch_threshold:
            self._set_phase(best_phase)
        return self._current_phase

    def _compute_pressure(self, phase: Phase, movements: dict) -> float:
        """
        Sum the queue lengths of all movements served by *phase*.

        Higher means more waiting vehicles benefit from the phase. Lanes or
        movements that are missing (or None) contribute zero.
        """
        total = 0.0
        served = _PHASE_MOVEMENTS.get(phase, {})
        for lane, lane_movements in served.items():
            lane_counts = movements.get(lane) or {}
            for movement in lane_movements:
                total += lane_counts.get(movement) or 0
        return total

    def _set_phase(self, phase: Phase) -> None:
        """Internal: commit to *phase*, restart its timer and record stats."""
        self._current_phase = phase
        self._phase_start = time.monotonic()
        self._total_decisions += 1
        key = phase.value
        self._phase_counts[key] = self._phase_counts.get(key, 0) + 1

    # ── Accessors ─────────────────────────────────────────────────────────
    def get_signal_state(self) -> dict:
        """
        Return per-lane signal state for the current phase.

        Returns:
            {'N': 'GREEN'|'RED', 'S': ..., 'E': ..., 'W': ...}
            A fresh dict on every call, so callers may modify it without
            corrupting the shared phase table.
        """
        state = _SIGNAL_STATES.get(self._current_phase, _SIGNAL_STATES[Phase.ALL_RED])
        return dict(state)

    def get_simple_phase(self) -> str:
        """Return current phase as one of NS_GREEN / EW_GREEN / ALL_RED."""
        return self._current_phase.to_simple()

    def time_in_phase(self) -> float:
        """Seconds since the current phase was last committed."""
        return time.monotonic() - self._phase_start

    def get_stats(self) -> dict:
        """
        Return phase usage statistics.

        ``total_decisions`` and ``phase_counts`` count phase commits made
        through _set_phase(); the initial phase is not counted.
        """
        return {
            "current_phase": self._current_phase.value,
            "time_in_phase_s": round(self.time_in_phase(), 1),
            "total_decisions": self._total_decisions,
            "phase_counts": self._phase_counts.copy(),
            "ev_active": self._ev_flag,
            "ev_lane": self._ev_lane,
        }

    def get_phase_pressure(self, movements: dict) -> dict:
        """
        Return the raw pressure score of every candidate phase, keyed by
        phase value (useful for a dashboard). Scores are not filtered by
        ``min_left_vehicles``.
        """
        return {
            p.value: round(self._compute_pressure(p, movements), 2)
            for p in _ALL_CANDIDATE_PHASES
        }
# ── Convenience factory ───────────────────────────────────────────────────
def make_phase_manager(
    min_green_sec: float = 5.0,
    min_left_vehicles: int = 3,
) -> PhaseManager:
    """
    Create a PhaseManager.

    Args:
        min_green_sec     : Minimum seconds to hold a phase.
        min_left_vehicles : Left-turn queue threshold forwarded to
                            PhaseManager (same default as its constructor).

    Returns:
        A new PhaseManager configured with the given values.
    """
    return PhaseManager(
        min_green_sec=min_green_sec,
        min_left_vehicles=min_left_vehicles,
    )
# ── CLI test ──────────────────────────────────────────────────────────────
def _run_self_test() -> None:
    """
    Run three canned scenarios through a PhaseManager and print the result.

    The manager is built with ``min_green_sec=0.0`` so every scenario may
    switch phase immediately. A scenario carrying an ``ev_lane`` triggers the
    emergency-vehicle override before the decision is made.
    """
    print("🚦 Phase Controller — Self Test")
    print("=" * 50)
    pm = PhaseManager(min_green_sec=0.0)  # no min time for testing
    scenarios = [
        {
            "desc": "Heavy NS straight traffic",
            "movements": {
                'N': {'left': 1, 'straight': 12, 'right': 2},
                'S': {'left': 1, 'straight': 10, 'right': 1},
                'E': {'left': 0, 'straight': 3, 'right': 1},
                'W': {'left': 0, 'straight': 2, 'right': 0},
            },
        },
        {
            "desc": "Balanced EW and NS",
            "movements": {
                'N': {'left': 2, 'straight': 5, 'right': 1},
                'S': {'left': 1, 'straight': 4, 'right': 2},
                'E': {'left': 3, 'straight': 7, 'right': 2},
                'W': {'left': 2, 'straight': 6, 'right': 1},
            },
        },
        {
            "desc": "Emergency vehicle from East",
            "ev_lane": "E",
            "movements": {
                'N': {'left': 5, 'straight': 8, 'right': 2},
                'S': {'left': 3, 'straight': 6, 'right': 1},
                'E': {'left': 0, 'straight': 2, 'right': 0},
                'W': {'left': 0, 'straight': 1, 'right': 0},
            },
        },
    ]
    for number, scenario in enumerate(scenarios, start=1):
        ev_lane = scenario.get("ev_lane")
        if ev_lane:
            pm.trigger_ev(ev_lane)
        phase = pm.decide_phase(scenario["movements"])
        sigs = pm.get_signal_state()
        print(f"\nScenario {number}: {scenario['desc']}")
        print(f" Phase : {phase.value}")
        print(f" Simple : {pm.get_simple_phase()}")
        print(f" Signals : N={sigs['N']} S={sigs['S']} E={sigs['E']} W={sigs['W']}")
        print(f" Stats : {pm.get_stats()}")
    print("\n✓ Phase controller working")


if __name__ == "__main__":
    _run_self_test()