File size: 3,342 Bytes
5893134
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
from __future__ import annotations

from typing import Any


class HeuristicController:
    """

    Simple local traffic-light controller.



    Action space:

        0 -> choose NS green

        1 -> choose EW green



    Assumes:

        queue_lengths = [N, S, E, W]

        waiting_counts = [N, S, E, W]

    """

    def __init__(

        self,

        min_green_steps: int = 5,

        switch_margin: float = 1.0,

        district_bonus_scale: float = 3.0,

        neighbor_pressure_scale: float = 0.25,

    ):
        self.min_green_steps = min_green_steps
        self.switch_margin = switch_margin
        self.district_bonus_scale = district_bonus_scale
        self.neighbor_pressure_scale = neighbor_pressure_scale

    def act(self, obs: dict[str, Any]) -> int:
        queue_lengths = obs.get("queue_lengths", [0, 0, 0, 0])
        waiting_counts = obs.get("waiting_counts", [0, 0, 0, 0])
        current_phase = int(obs.get("current_phase", 0))
        time_since_switch = int(obs.get("time_since_switch", 0))
        district_mode = obs.get("district_mode", "none")
        district_weight = float(obs.get("district_weight", 0.5))
        neighbor_pressure = obs.get("neighbor_pressure", [0.0, 0.0])

        ns_score = (
            queue_lengths[0]
            + queue_lengths[1]
            + 1.5 * (waiting_counts[0] + waiting_counts[1])
        )
        ew_score = (
            queue_lengths[2]
            + queue_lengths[3]
            + 1.5 * (waiting_counts[2] + waiting_counts[3])
        )

        # Optional small neighbor-pressure bias
        if isinstance(neighbor_pressure, list) and len(neighbor_pressure) >= 2:
            ns_score += self.neighbor_pressure_scale * float(neighbor_pressure[0])
            ew_score += self.neighbor_pressure_scale * float(neighbor_pressure[1])

        # District-level strategic bias
        district_bonus = self.district_bonus_scale * district_weight
        if district_mode == "prioritize_ns":
            ns_score += district_bonus
        elif district_mode == "prioritize_ew":
            ew_score += district_bonus
        elif district_mode == "green_wave":
            corridor = obs.get("district_corridor")
            if corridor == "ns":
                ns_score += district_bonus
            elif corridor == "ew":
                ew_score += district_bonus
        elif district_mode == "emergency_route":
            corridor = obs.get("district_corridor")
            if corridor in {"north_to_south", "south_to_north", "ns"}:
                ns_score += district_bonus * 1.5
            elif corridor in {"west_to_east", "east_to_west", "ew"}:
                ew_score += district_bonus * 1.5

        desired_phase = 0 if ns_score >= ew_score else 1

        # Avoid thrashing
        if time_since_switch < self.min_green_steps:
            return current_phase

        # Only switch if the other phase is meaningfully better
        current_score = ns_score if current_phase == 0 else ew_score
        desired_score = ns_score if desired_phase == 0 else ew_score

        if (
            desired_phase != current_phase
            and desired_score < current_score + self.switch_margin
        ):
            return current_phase

        return desired_phase