# Scraped page residue (not part of the module): Space status "Sleeping";
# file size 13,240 bytes; revision 99f938a.
"""
phase_controller.py — Movement-Level Phase Controller
======================================================
Defines all intersection phases (including turning phases)
and a PhaseManager that selects the optimal phase based on
movement-level queue counts (left/straight/right per lane).

Required by: realtime_engine_v2.py

Classes:
    Phase        — Enum of all valid intersection phases
    PhaseManager — Selects and manages the active phase using max-pressure
"""
from enum import Enum
import time
from typing import Optional
# ── Phase Enum ────────────────────────────────────────────────────────────
class Phase(Enum):
    """
    Signal phases for a standard 4-way intersection.

    Phases prefixed ``NS_`` belong to the North/South axis and phases
    prefixed ``EW_`` to the East/West axis; ``ALL_RED`` belongs to neither.

        *_GREEN    : straight + right-turn movements (standard green)
        *_STRAIGHT : straight-only movements
        *_LEFT     : protected left-turn movements
    """

    # Core phases (compatible with dashboard_final.py PHASES dict)
    NS_GREEN = "NS_GREEN"        # N+S straight + right (standard green)
    EW_GREEN = "EW_GREEN"        # E+W straight + right (standard green)
    ALL_RED = "ALL_RED"          # All red — transition / EVP clearance

    # Extended phases (movement-level); each collapses to its axis'
    # *_GREEN value in to_simple().
    NS_STRAIGHT = "NS_STRAIGHT"  # N+S straight-only
    NS_LEFT = "NS_LEFT"          # N+S protected left turns only
    EW_STRAIGHT = "EW_STRAIGHT"  # E+W straight-only
    EW_LEFT = "EW_LEFT"          # E+W protected left turns only

    def to_simple(self) -> str:
        """
        Collapse this phase to one of the three simple phase names:
        ``"NS_GREEN"``, ``"EW_GREEN"`` or ``"ALL_RED"``.

        Every ``NS_*`` phase maps to ``"NS_GREEN"``, every ``EW_*`` phase to
        ``"EW_GREEN"``, and anything else to ``"ALL_RED"``.
        """
        # The axis is the part of the value before the first underscore.
        axis, _, _ = self.value.partition("_")
        if axis == "NS":
            return "NS_GREEN"
        if axis == "EW":
            return "EW_GREEN"
        return "ALL_RED"
# ── Signal state per phase ────────────────────────────────────────────────
def _lane_lights(green_lanes: str) -> dict[str, str]:
    """Return a fresh N/S/E/W map: "GREEN" for lanes in *green_lanes*, else "RED"."""
    return {lane: ("GREEN" if lane in green_lanes else "RED") for lane in "NSEW"}


# Lane-level light colour for every phase. All phases of one axis share the
# same lane colours: this table does not distinguish straight from left.
_SIGNAL_STATES: dict[Phase, dict[str, str]] = {
    Phase.NS_GREEN: _lane_lights("NS"),
    Phase.EW_GREEN: _lane_lights("EW"),
    Phase.ALL_RED: _lane_lights(""),
    Phase.NS_STRAIGHT: _lane_lights("NS"),
    Phase.NS_LEFT: _lane_lights("NS"),
    Phase.EW_STRAIGHT: _lane_lights("EW"),
    Phase.EW_LEFT: _lane_lights("EW"),
}
def _served(lanes: str, *turns: str) -> dict:
    """Return an N/S/E/W map giving *turns* to each lane in *lanes* and () to the rest."""
    return {lane: (turns if lane in lanes else ()) for lane in "NSEW"}


# Movements ("left" / "straight" / "right") that each phase serves, per lane.
# A lane mapped to an empty tuple gets no service in that phase.
_PHASE_MOVEMENTS: dict[Phase, dict] = {
    Phase.NS_GREEN: _served("NS", "straight", "right"),
    Phase.NS_STRAIGHT: _served("NS", "straight"),
    Phase.NS_LEFT: _served("NS", "left"),
    Phase.EW_GREEN: _served("EW", "straight", "right"),
    Phase.EW_STRAIGHT: _served("EW", "straight"),
    Phase.EW_LEFT: _served("EW", "left"),
    Phase.ALL_RED: _served(""),
}
# Phases the selector may choose from, grouped by axis. The order is the
# order in which candidates are evaluated (NS axis first, then EW).
_NS_PHASES = [
    Phase.NS_GREEN,
    Phase.NS_STRAIGHT,
    Phase.NS_LEFT,
]
_EW_PHASES = [
    Phase.EW_GREEN,
    Phase.EW_STRAIGHT,
    Phase.EW_LEFT,
]
_ALL_CANDIDATE_PHASES = [*_NS_PHASES, *_EW_PHASES]
# ── PhaseManager ──────────────────────────────────────────────────────────
class PhaseManager:
    """
    Select and hold the active intersection phase from per-movement queues.

    Every candidate phase is scored by the number of queued vehicles it would
    serve (see ``_compute_pressure``). The best-scoring candidate becomes the
    active phase, subject to these guards, applied in order:

    - Emergency-vehicle override (``trigger_ev`` / ``clear_ev``) wins outright.
    - No switch is considered until the current phase has been held for
      ``min_green_sec`` seconds (prevents rapid oscillation).
    - A dedicated left-turn phase is only a candidate once its left-turn
      queue reaches ``min_left_vehicles``.
    - The winner must beat the current phase by more than
      ``switch_threshold`` vehicles (hysteresis).

    The detailed phase is available from ``decide_phase`` and the collapsed
    3-value phase name from ``get_simple_phase``.
    """

    def __init__(
        self,
        min_green_sec: float = 5.0,
        min_left_vehicles: int = 3,
        switch_threshold: float = 0.5,
    ):
        """
        Args:
            min_green_sec     : Minimum seconds to hold a phase before a
                                switch is considered.
            min_left_vehicles : Minimum total vehicles queued for the left
                                turns of an axis before its dedicated
                                NS_LEFT / EW_LEFT phase may be selected.
            switch_threshold  : Pressure gain (in vehicles) the best candidate
                                must exceed over the current phase to switch.
        """
        self.min_green_sec = min_green_sec
        self.min_left_vehicles = min_left_vehicles
        self.switch_threshold = switch_threshold

        self._current_phase: Phase = Phase.NS_GREEN
        # Monotonic clock: durations must not be affected by wall-clock jumps.
        self._phase_start: float = time.monotonic()
        self._ev_flag: bool = False
        self._ev_lane: Optional[str] = None

        # Statistics: how often each phase was committed via _set_phase().
        self._phase_counts: dict[str, int] = {}
        self._total_decisions: int = 0

    # ── External EV control ───────────────────────────────────────────────
    def trigger_ev(self, lane: str) -> None:
        """
        Start the emergency-vehicle override for *lane*.

        The override takes effect on the next ``decide_phase`` call. Lane
        names are normalised (surrounding whitespace stripped, upper-cased)
        so that e.g. ``"n"`` is treated as ``"N"``.
        """
        self._ev_flag = True
        self._ev_lane = lane.strip().upper() if isinstance(lane, str) else lane

    def clear_ev(self) -> None:
        """End the emergency-vehicle override; normal selection resumes."""
        self._ev_flag = False
        self._ev_lane = None

    # ── Core decision ─────────────────────────────────────────────────────
    def decide_phase(self, movements: dict) -> Phase:
        """
        Choose the phase to run now, based on movement queue pressures.

        Args:
            movements: {
                'N': {'left': int, 'straight': int, 'right': int},
                'S': {'left': int, 'straight': int, 'right': int},
                'E': {'left': int, 'straight': int, 'right': int},
                'W': {'left': int, 'straight': int, 'right': int},
            }
            Missing lanes or movements count as 0.

        Returns:
            The active Phase after this decision (possibly unchanged).
        """
        # ── P0: Emergency Vehicle Priority ────────────────────────────────
        if self._ev_flag and self._ev_lane:
            if self._ev_lane in ("N", "S"):
                target = Phase.NS_GREEN
            else:
                target = Phase.EW_GREEN
            # Commit only on an actual change; re-committing the same phase on
            # every call would reset the phase timer and inflate the stats.
            if target is not self._current_phase:
                self._set_phase(target)
            return self._current_phase

        # ── Min green time guard ──────────────────────────────────────────
        if self.time_in_phase() < self.min_green_sec:
            return self._current_phase  # too soon to switch

        # ── Compute pressure for each candidate phase ─────────────────────
        best_phase = self._current_phase
        best_pressure = -1.0
        for candidate in _ALL_CANDIDATE_PHASES:
            pressure = self._compute_pressure(candidate, movements)
            # A dedicated left-turn phase needs a long enough left queue.
            if self._is_left_turn_phase(candidate) and pressure < self.min_left_vehicles:
                continue
            # Strict '>' means the earliest-listed candidate wins ties.
            if pressure > best_pressure:
                best_pressure = pressure
                best_phase = candidate

        # ── Only switch if candidate clearly outperforms current ──────────
        current_pressure = self._compute_pressure(self._current_phase, movements)
        gain = best_pressure - current_pressure
        if best_phase != self._current_phase and gain > self.switch_threshold:
            self._set_phase(best_phase)
        return self._current_phase

    @staticmethod
    def _is_left_turn_phase(phase: Phase) -> bool:
        """Return True for the dedicated protected-left phases."""
        return phase in (Phase.NS_LEFT, Phase.EW_LEFT)

    def _compute_pressure(self, phase: Phase, movements: dict) -> float:
        """
        Sum the queue lengths of all movements served by *phase*.

        Higher means more queued vehicles are served by this phase. Lanes or
        movements absent from *movements* (or a lane mapped to ``None``)
        contribute 0.
        """
        total = 0.0
        for lane, served in _PHASE_MOVEMENTS.get(phase, {}).items():
            lane_queues = movements.get(lane) or {}
            for movement in served:
                total += lane_queues.get(movement, 0)
        return total

    def _set_phase(self, phase: Phase) -> None:
        """Internal: commit to *phase*, restart its timer and record stats."""
        self._current_phase = phase
        self._phase_start = time.monotonic()
        self._total_decisions += 1
        key = phase.value
        self._phase_counts[key] = self._phase_counts.get(key, 0) + 1

    # ── Accessors ─────────────────────────────────────────────────────────
    def get_signal_state(self) -> dict:
        """
        Return the per-lane signal state for the current phase.

        Returns:
            {'N': 'GREEN'|'RED', 'S': ..., 'E': ..., 'W': ...}
            A new dict on every call, so callers may modify it freely
            without affecting the shared phase table.
        """
        state = _SIGNAL_STATES.get(self._current_phase, _SIGNAL_STATES[Phase.ALL_RED])
        return dict(state)

    def get_simple_phase(self) -> str:
        """Return current phase as one of NS_GREEN / EW_GREEN / ALL_RED."""
        return self._current_phase.to_simple()

    def time_in_phase(self) -> float:
        """Seconds since the current phase was committed (monotonic clock)."""
        return time.monotonic() - self._phase_start

    def get_stats(self) -> dict:
        """
        Return phase usage statistics.

        ``total_decisions`` and ``phase_counts`` count phase commits, i.e.
        calls that actually changed the phase.
        """
        return {
            "current_phase": self._current_phase.value,
            "time_in_phase_s": round(self.time_in_phase(), 1),
            "total_decisions": self._total_decisions,
            "phase_counts": self._phase_counts.copy(),
            "ev_active": self._ev_flag,
            "ev_lane": self._ev_lane,
        }

    def get_phase_pressure(self, movements: dict) -> dict:
        """
        Return the pressure score of every candidate phase, keyed by phase
        value and rounded to 2 decimals (useful for dashboard display).
        """
        return {
            p.value: round(self._compute_pressure(p, movements), 2)
            for p in _ALL_CANDIDATE_PHASES
        }
# ── Convenience factory ───────────────────────────────────────────────────
def make_phase_manager(min_green_sec: float = 5.0) -> PhaseManager:
    """
    Build a PhaseManager, overriding only its minimum green time.

    Args:
        min_green_sec: Minimum seconds a phase is held before switching.

    Returns:
        A new PhaseManager; all other settings keep their defaults.
    """
    manager = PhaseManager(min_green_sec=min_green_sec)
    return manager
# ── CLI test ──────────────────────────────────────────────────────────────
if __name__ == "__main__":
    # Self-test: run three scenarios through one PhaseManager and print the
    # chosen phase, lane signals and running statistics after each.
    print("🚦 Phase Controller — Self Test")
    print("=" * 50)

    pm = PhaseManager(min_green_sec=0.0)  # no min time for testing

    # A scenario with an "ev_lane" key triggers the emergency-vehicle
    # override for that lane before the decision is made. The override is
    # never cleared here, so it stays active for any later scenario.
    scenarios = [
        {
            "desc": "Heavy NS straight traffic",
            "movements": {
                'N': {'left': 1, 'straight': 12, 'right': 2},
                'S': {'left': 1, 'straight': 10, 'right': 1},
                'E': {'left': 0, 'straight': 3, 'right': 1},
                'W': {'left': 0, 'straight': 2, 'right': 0},
            },
        },
        {
            "desc": "Balanced EW and NS",
            "movements": {
                'N': {'left': 2, 'straight': 5, 'right': 1},
                'S': {'left': 1, 'straight': 4, 'right': 2},
                'E': {'left': 3, 'straight': 7, 'right': 2},
                'W': {'left': 2, 'straight': 6, 'right': 1},
            },
        },
        {
            "desc": "Emergency vehicle from East",
            "ev_lane": "E",
            "movements": {
                'N': {'left': 5, 'straight': 8, 'right': 2},
                'S': {'left': 3, 'straight': 6, 'right': 1},
                'E': {'left': 0, 'straight': 2, 'right': 0},
                'W': {'left': 0, 'straight': 1, 'right': 0},
            },
        },
    ]

    for i, s in enumerate(scenarios):
        ev_lane = s.get("ev_lane")
        if ev_lane:
            pm.trigger_ev(ev_lane)
        phase = pm.decide_phase(s["movements"])
        sigs = pm.get_signal_state()
        print(f"\nScenario {i+1}: {s['desc']}")
        print(f" Phase : {phase.value}")
        print(f" Simple : {pm.get_simple_phase()}")
        print(f" Signals : N={sigs['N']} S={sigs['S']} E={sigs['E']} W={sigs['W']}")
        print(f" Stats : {pm.get_stats()}")

    print("\n✅ Phase controller working")