File size: 13,240 Bytes
99f938a
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
"""

phase_controller.py — Movement-Level Phase Controller
======================================================
Defines all intersection phases (including turning phases)
and a PhaseManager that selects the optimal phase based on
movement-level queue counts (left/straight/right per lane).

Required by: realtime_engine_v2.py

Classes:
    Phase        — Enum of all valid intersection phases
    PhaseManager — Selects and manages active phase using max-pressure

"""

from enum import Enum
import time
from typing import Optional


# ── Phase Enum ──────────────────────────────────────────────────────────

class Phase(Enum):
    """
    Conflict-free signal phases for a standard 4-way intersection.

    NS_* phases serve the North and South approaches at the same time;
    EW_* phases serve East and West at the same time. ALL_RED stops
    every approach.

    Each member's value is its own name as a string, so members can be
    round-tripped through ``Phase("NS_LEFT")``.
    """

    # Core phases — the three names shared with dashboard_final.py's PHASES dict.
    NS_GREEN    = "NS_GREEN"     # N+S straight + right (standard green)
    EW_GREEN    = "EW_GREEN"     # E+W straight + right (standard green)
    ALL_RED     = "ALL_RED"      # All red — transition / EVP clearance

    # Extended, movement-level phases. to_simple() folds each of these
    # back onto the core green phase of the same axis.
    NS_STRAIGHT = "NS_STRAIGHT"  # N+S straight-only
    NS_LEFT     = "NS_LEFT"      # N+S protected left turns only
    EW_STRAIGHT = "EW_STRAIGHT"  # E+W straight-only
    EW_LEFT     = "EW_LEFT"      # E+W protected left turns only

    def to_simple(self) -> str:
        """
        Collapse this phase to one of the three simple phase names.

        Returns:
            "NS_GREEN" for every NS_* member, "EW_GREEN" for every EW_*
            member, and "ALL_RED" otherwise.
        """
        label = self.value
        if label.startswith("NS_"):
            return "NS_GREEN"
        if label.startswith("EW_"):
            return "EW_GREEN"
        return "ALL_RED"


# ── Signal state per phase ──────────────────────────────────────────────

# Per-approach light colour for every phase. Each row names the approaches
# that show GREEN; every other approach shows RED. A fresh inner dict is
# built for each phase, so the per-phase tables are independent objects.
_SIGNAL_STATES: dict[Phase, dict[str, str]] = {
    phase: {lane: ("GREEN" if lane in green_lanes else "RED") for lane in "NSEW"}
    for phase, green_lanes in (
        (Phase.NS_GREEN,    "NS"),
        (Phase.EW_GREEN,    "EW"),
        (Phase.ALL_RED,     ""),
        (Phase.NS_STRAIGHT, "NS"),
        (Phase.NS_LEFT,     "NS"),
        (Phase.EW_STRAIGHT, "EW"),
        (Phase.EW_LEFT,     "EW"),
    )
}

# Movements served by each phase, per approach. Each row gives the served
# approaches and the movements they release; unserved approaches map to ().
_PHASE_MOVEMENTS: dict[Phase, dict] = {
    phase: {lane: (served if lane in served_lanes else ()) for lane in "NSEW"}
    for phase, served_lanes, served in (
        (Phase.NS_GREEN,    "NS", ("straight", "right")),
        (Phase.NS_STRAIGHT, "NS", ("straight",)),
        (Phase.NS_LEFT,     "NS", ("left",)),
        (Phase.EW_GREEN,    "EW", ("straight", "right")),
        (Phase.EW_STRAIGHT, "EW", ("straight",)),
        (Phase.EW_LEFT,     "EW", ("left",)),
        (Phase.ALL_RED,     "",   ()),
    )
}

# Candidate phases per axis, in evaluation order. ALL_RED is deliberately
# absent: it is never a candidate for pressure-based selection.
_NS_PHASES = [Phase.NS_GREEN, Phase.NS_STRAIGHT, Phase.NS_LEFT]
_EW_PHASES = [Phase.EW_GREEN, Phase.EW_STRAIGHT, Phase.EW_LEFT]
_ALL_CANDIDATE_PHASES = [*_NS_PHASES, *_EW_PHASES]


# ── PhaseManager ────────────────────────────────────────────────────────

class PhaseManager:
    """
    Select the active intersection phase using movement-level max-pressure.

    Features:
        - Scores each candidate phase from per-movement queue counts
          (left/straight/right).
        - Min green time guard (prevents rapid oscillation).
        - Hysteresis: a switch needs a pressure gain above
          ``switch_threshold``.
        - Protected-left phases are only considered once the left-turn
          queue reaches ``min_left_vehicles``.
        - Emergency-vehicle override via trigger_ev() / clear_ev().
        - Intended for use with dashboard_final.py and realtime_engine_v2.py.
    """

    # Phases gated by ``min_left_vehicles`` in decide_phase().
    _LEFT_PHASES = (Phase.NS_LEFT, Phase.EW_LEFT)

    def __init__(
        self,
        min_green_sec: float = 5.0,
        min_left_vehicles: int = 3,
        switch_threshold: float = 0.5,
    ):
        """
        Args:
            min_green_sec     : Minimum seconds to hold a phase before a
                                different one may be selected.
            min_left_vehicles : Minimum combined left-turn queue (summed over
                                both approaches of an axis) required before a
                                dedicated NS_LEFT / EW_LEFT phase is
                                considered. 0 disables the gate.
            switch_threshold  : Hysteresis margin. A candidate must beat the
                                current phase's pressure by more than this
                                many vehicles to cause a switch.
        """
        self.min_green_sec      = min_green_sec
        self.min_left_vehicles  = min_left_vehicles
        self.switch_threshold   = switch_threshold

        self._current_phase:  Phase  = Phase.NS_GREEN
        # Monotonic clock: the value is only ever used in differences, and
        # must not jump when the wall clock is adjusted.
        self._phase_start:    float  = time.monotonic()
        self._ev_flag:        bool   = False
        self._ev_lane:        Optional[str] = None

        # Statistics (updated by _set_phase on every committed phase change)
        self._phase_counts: dict[str, int] = {}
        self._total_decisions: int         = 0

    # ── External EV control ─────────────────────────────────────────────

    def trigger_ev(self, lane: str) -> None:
        """
        Arm the emergency-vehicle override for ``lane``.

        The override takes effect on the next decide_phase() call and stays
        active until clear_ev() is called.
        """
        self._ev_flag = True
        self._ev_lane = lane

    def clear_ev(self) -> None:
        """Disarm the EV override so decide_phase() resumes max-pressure."""
        self._ev_flag = False
        self._ev_lane = None

    # ── Core decision ───────────────────────────────────────────────────

    def decide_phase(self, movements: dict) -> Phase:
        """
        Choose the next phase based on movement queue pressures.

        Decision order:
            1. EV override: green for the EV lane's axis ("N"/"S" select
               NS_GREEN, any other lane selects EW_GREEN).
            2. Min-green guard: keep the current phase if it has been active
               for less than ``min_green_sec``.
            3. Max-pressure: pick the eligible candidate with the highest
               pressure and switch only if it beats the current phase by
               more than ``switch_threshold``.

        Args:
            movements: {
                'N': {'left': int, 'straight': int, 'right': int},
                'S': {'left': int, 'straight': int, 'right': int},
                'E': {'left': int, 'straight': int, 'right': int},
                'W': {'left': int, 'straight': int, 'right': int},
            }
            Missing lanes or movements count as 0.

        Returns:
            The phase that is active after this decision.
        """
        # ── P0: Emergency Vehicle Priority ──────────────────────────────
        if self._ev_flag and self._ev_lane:
            target = (
                Phase.NS_GREEN
                if self._ev_lane in ("N", "S")
                else Phase.EW_GREEN
            )
            # Commit only on an actual change. Re-committing on every tick
            # would keep resetting the phase timer and inflate the stats
            # for as long as the override is armed.
            if target != self._current_phase:
                self._set_phase(target)
            return self._current_phase

        # ── Min green time guard ────────────────────────────────────────
        if self.time_in_phase() < self.min_green_sec:
            return self._current_phase   # too soon to switch

        # ── Compute pressure for each eligible candidate phase ──────────
        best_phase    = self._current_phase
        best_pressure = -1.0

        for candidate in _ALL_CANDIDATE_PHASES:
            pressure = self._compute_pressure(candidate, movements)
            # A dedicated left phase is only worth offering once enough
            # left-turners are queued.
            if candidate in self._LEFT_PHASES and pressure < self.min_left_vehicles:
                continue
            # Strict '>' keeps the earliest candidate on ties.
            if pressure > best_pressure:
                best_pressure = pressure
                best_phase    = candidate

        # ── Only switch if candidate clearly outperforms current ────────
        current_pressure = self._compute_pressure(self._current_phase, movements)
        gain = best_pressure - current_pressure

        if best_phase != self._current_phase and gain > self.switch_threshold:
            self._set_phase(best_phase)

        return self._current_phase

    def _compute_pressure(self, phase: Phase, movements: dict) -> float:
        """
        Sum queue lengths for all movements served by ``phase``.

        Higher = more queued vehicles are served by this phase. Lanes or
        movements that are missing (or None) contribute 0.
        """
        total = 0.0
        for lane, served in _PHASE_MOVEMENTS.get(phase, {}).items():
            lane_counts = movements.get(lane) or {}
            total += sum(lane_counts.get(mv) or 0 for mv in served)
        return total

    def _set_phase(self, phase: Phase) -> None:
        """Internal: commit to ``phase``, restart its timer and record stats."""
        self._current_phase = phase
        self._phase_start   = time.monotonic()
        self._total_decisions += 1
        key = phase.value
        self._phase_counts[key] = self._phase_counts.get(key, 0) + 1

    # ── Accessors ───────────────────────────────────────────────────────

    def get_signal_state(self) -> dict:
        """
        Return per-lane signal state for the current phase.

        Returns:
            {'N': 'GREEN'|'RED', 'S': ..., 'E': ..., 'W': ...}
            A new dict on every call, so callers may modify it without
            affecting the shared signal table.
        """
        state = _SIGNAL_STATES.get(self._current_phase, _SIGNAL_STATES[Phase.ALL_RED])
        return dict(state)

    def get_simple_phase(self) -> str:
        """Return current phase as one of NS_GREEN / EW_GREEN / ALL_RED."""
        return self._current_phase.to_simple()

    def time_in_phase(self) -> float:
        """Seconds since the last committed phase change (or construction)."""
        return time.monotonic() - self._phase_start

    def get_stats(self) -> dict:
        """
        Return phase usage statistics.

        ``total_decisions`` and ``phase_counts`` count committed phase
        changes, not calls to decide_phase().
        """
        return {
            "current_phase":   self._current_phase.value,
            "time_in_phase_s": round(self.time_in_phase(), 1),
            "total_decisions": self._total_decisions,
            "phase_counts":    self._phase_counts.copy(),
            "ev_active":       self._ev_flag,
            "ev_lane":         self._ev_lane,
        }

    def get_phase_pressure(self, movements: dict) -> dict:
        """
        Return the pressure score of every candidate phase, keyed by phase
        name (useful for dashboards). The ``min_left_vehicles`` gate is not
        applied here; raw pressures are reported.
        """
        return {
            p.value: round(self._compute_pressure(p, movements), 2)
            for p in _ALL_CANDIDATE_PHASES
        }


# ── Convenience factory ─────────────────────────────────────────────────

def make_phase_manager(
    min_green_sec: float = 5.0,
    min_left_vehicles: int = 3,
) -> PhaseManager:
    """
    Create a PhaseManager, overriding only the settings that are passed in.

    Args:
        min_green_sec     : Minimum seconds to hold a phase.
        min_left_vehicles : Minimum left-turn queue for a dedicated
                            left-turn phase; the default matches
                            PhaseManager's own default.

    Returns:
        A new PhaseManager instance.
    """
    return PhaseManager(
        min_green_sec=min_green_sec,
        min_left_vehicles=min_left_vehicles,
    )


# ── CLI test ────────────────────────────────────────────────────────────

if __name__ == "__main__":
    # Manual smoke test: runs three scenarios through one PhaseManager and
    # prints the chosen phase, signal state and statistics for each.
    print("🚦 Phase Controller — Self Test")
    print("=" * 50)

    pm = PhaseManager(min_green_sec=0.0)   # no min time for testing

    # A scenario with an "ev_lane" key arms the emergency-vehicle override
    # for that lane before the decision is made. The override is never
    # cleared, so it stays active for any scenario that follows.
    scenarios = [
        {
            "desc": "Heavy NS straight traffic",
            "movements": {
                'N': {'left': 1, 'straight': 12, 'right': 2},
                'S': {'left': 1, 'straight': 10, 'right': 1},
                'E': {'left': 0, 'straight': 3,  'right': 1},
                'W': {'left': 0, 'straight': 2,  'right': 0},
            },
        },
        {
            "desc": "Balanced EW and NS",
            "movements": {
                'N': {'left': 2, 'straight': 5, 'right': 1},
                'S': {'left': 1, 'straight': 4, 'right': 2},
                'E': {'left': 3, 'straight': 7, 'right': 2},
                'W': {'left': 2, 'straight': 6, 'right': 1},
            },
        },
        {
            "desc": "Emergency vehicle from East",
            "ev_lane": "E",
            "movements": {
                'N': {'left': 5, 'straight': 8, 'right': 2},
                'S': {'left': 3, 'straight': 6, 'right': 1},
                'E': {'left': 0, 'straight': 2, 'right': 0},
                'W': {'left': 0, 'straight': 1, 'right': 0},
            },
        },
    ]

    for number, scenario in enumerate(scenarios, start=1):
        ev_lane = scenario.get("ev_lane")
        if ev_lane is not None:
            pm.trigger_ev(ev_lane)

        phase = pm.decide_phase(scenario["movements"])
        sigs  = pm.get_signal_state()
        print(f"\nScenario {number}: {scenario['desc']}")
        print(f"  Phase   : {phase.value}")
        print(f"  Simple  : {pm.get_simple_phase()}")
        print(f"  Signals : N={sigs['N']}  S={sigs['S']}  E={sigs['E']}  W={sigs['W']}")
        print(f"  Stats   : {pm.get_stats()}")

    print("\n✓ Phase controller working")