| |
| |
| |
| |
| |
|
|
| """ |
| Road Traffic Simulator β RL Environment. |
| |
| The agent controls a single car navigating from a random start node to a |
| random end node in downtown San Francisco. 2 000 background vehicles create |
| realistic traffic that the agent must route around. |
| |
| Observation |
| ----------- |
| * lat / lon β current GPS position |
| * goal_lat/goal_lon β destination GPS position |
| * distance_to_goal β straight-line metres to goal |
| * available_actions β number of outgoing edges (action range) |
| * map_screenshot β 400Γ400 PNG base64, roads coloured by traffic density |
| |
| Action |
| ------ |
| * next_edge_index β which outgoing edge to follow at the current node |
| |
| Reward |
| ------ |
| -1.0 per step (time penalty) |
| -3.0 * edge_density (congestion penalty for traversed edge, density β [0,1]) |
| +progress_reward (metres gained toward goal Γ 0.05) |
| +500.0 on reaching goal |
| """ |
|
|
| import base64 |
| import heapq |
| import io |
| import math |
| import random |
| from collections import defaultdict |
| from uuid import uuid4 |
| from typing import Dict, List, Optional, Tuple |
|
|
| import matplotlib |
| matplotlib.use("Agg") |
| import matplotlib.pyplot as plt |
| from matplotlib.collections import LineCollection |
|
|
| import requests |
|
|
| from openenv.core.env_server.interfaces import Environment |
| from openenv.core.env_server.types import State |
|
|
| from models import RoadTrafficSimulatorAction, RoadTrafficSimulatorObservation |
|
|
|
|
| |
| |
| |
|
|
| BBOX = dict(south=37.770, west=-122.430, north=37.800, east=-122.395) |
|
|
| NUM_BG_AGENTS = 5000 |
| BG_SUBSTEPS_PER_RL_STEP = 60 |
| MAX_RL_STEPS = 300 |
| GOAL_RADIUS_M = 50.0 |
|
|
| |
| _HW_LAYERS = ["unclassified", "residential", "tertiary", "secondary", "primary", "trunk", "motorway"] |
| _HW_ROAD_LW = {"motorway": 4.0, "trunk": 3.4, "primary": 2.8, "secondary": 2.2, |
| "tertiary": 1.6, "residential": 1.1, "unclassified": 0.8} |
| _HW_OUTLINE_LW= {"motorway": 6.2, "trunk": 5.4, "primary": 4.6, "secondary": 3.6, |
| "tertiary": 2.7, "residential": 1.9, "unclassified": 1.4} |
|
|
| |
| _TILE_ZOOM = 16 |
| _TILE_URLS = [ |
| "https://basemaps.cartocdn.com/dark_all/{z}/{x}/{y}.png", |
| "https://tile.openstreetmap.org/{z}/{x}/{y}.png", |
| ] |
| _TILE_HEADERS = {"User-Agent": "SFTrafficSimEnv/1.0 (rl-research)"} |
|
|
| |
| _ROAD_NETWORK: Optional[Tuple] = None |
| _BASEMAP: Optional[Tuple] = None |
|
|
|
|
| |
| |
| |
|
|
| def _haversine(lat1: float, lon1: float, lat2: float, lon2: float) -> float: |
| """Return distance in metres between two GPS points.""" |
| R = 6_371_000.0 |
| dlat = math.radians(lat2 - lat1) |
| dlon = math.radians(lon2 - lon1) |
| a = math.sin(dlat / 2) ** 2 + ( |
| math.cos(math.radians(lat1)) |
| * math.cos(math.radians(lat2)) |
| * math.sin(dlon / 2) ** 2 |
| ) |
| return R * 2 * math.atan2(math.sqrt(a), math.sqrt(1 - a)) |
|
|
|
|
| def _traffic_color(density: float) -> tuple: |
| """Return RGBA tuple with alpha tied to congestion: free-flow nearly transparent.""" |
| if density < 0.20: |
| return (0.00, 0.80, 0.27, 0.22) |
| if density < 0.50: |
| return (1.00, 0.87, 0.00, 0.60) |
| if density < 0.80: |
| return (1.00, 0.53, 0.00, 0.80) |
| return (1.00, 0.13, 0.00, 0.95) |
|
|
|
|
| def _capacity(highway: str, lanes: int) -> int: |
| base = {"motorway": 50, "trunk": 40, "primary": 30, "secondary": 25, |
| "tertiary": 20, "residential": 15}.get(highway, 10) |
| return max(5, round(base * lanes / 2)) |
|
|
|
|
| |
| |
| |
|
|
| def _tile_coords(lat: float, lon: float, zoom: int) -> Tuple[int, int]: |
| n = 2 ** zoom |
| x = int((lon + 180.0) / 360.0 * n) |
| lat_r = math.radians(lat) |
| y = int((1.0 - math.asinh(math.tan(lat_r)) / math.pi) / 2.0 * n) |
| return x, y |
|
|
|
|
| def _tile_nw(tx: int, ty: int, zoom: int) -> Tuple[float, float]: |
| """Return (lat, lon) of the NW corner of tile (tx, ty).""" |
| n = 2 ** zoom |
| lon = tx / n * 360.0 - 180.0 |
| lat = math.degrees(math.atan(math.sinh(math.pi * (1.0 - 2.0 * ty / n)))) |
| return lat, lon |
|
|
|
|
| def _fetch_basemap() -> Tuple: |
| from PIL import Image as PILImage |
| import numpy as np |
|
|
| zoom = _TILE_ZOOM |
| tx_min, ty_min = _tile_coords(BBOX["north"], BBOX["west"], zoom) |
| tx_max, ty_max = _tile_coords(BBOX["south"], BBOX["east"], zoom) |
|
|
| cols = tx_max - tx_min + 1 |
| rows = ty_max - ty_min + 1 |
| canvas = PILImage.new("RGB", (cols * 256, rows * 256), (17, 17, 27)) |
|
|
| for tx in range(tx_min, tx_max + 1): |
| for ty in range(ty_min, ty_max + 1): |
| for url_tmpl in _TILE_URLS: |
| url = url_tmpl.format(z=zoom, x=tx, y=ty) |
| try: |
| r = requests.get(url, headers=_TILE_HEADERS, timeout=15) |
| r.raise_for_status() |
| tile = PILImage.open(io.BytesIO(r.content)).convert("RGB") |
| canvas.paste(tile, ((tx - tx_min) * 256, (ty - ty_min) * 256)) |
| break |
| except Exception: |
| continue |
|
|
| img_arr = np.array(canvas) |
| nw_lat, nw_lon = _tile_nw(tx_min, ty_min, zoom) |
| se_lat, se_lon = _tile_nw(tx_max + 1, ty_max + 1, zoom) |
| |
| extent = [nw_lon, se_lon, se_lat, nw_lat] |
| return img_arr, extent |
|
|
|
|
| def _get_basemap() -> Tuple: |
| global _BASEMAP |
| if _BASEMAP is None: |
| _BASEMAP = _fetch_basemap() |
| return _BASEMAP |
|
|
|
|
| |
| |
| |
|
|
|
|
| def _fetch_road_network() -> Tuple[Dict, List[Dict], Dict]: |
| query = ( |
| "[out:json][timeout:60];\n" |
| "(\n" |
| ' way["highway"~"^(motorway|trunk|primary|secondary|tertiary|residential|unclassified)$"]\n' |
| f" ({BBOX['south']},{BBOX['west']},{BBOX['north']},{BBOX['east']});\n" |
| ");\n" |
| "out body;\n" |
| ">;\n" |
| "out skel qt;\n" |
| ) |
| servers = [ |
| "https://overpass-api.de/api/interpreter", |
| "https://overpass.kumi.systems/api/interpreter", |
| ] |
| data = None |
| for url in servers: |
| try: |
| r = requests.post( |
| url, |
| data={"data": query}, |
| timeout=90, |
| ) |
| r.raise_for_status() |
| data = r.json() |
| break |
| except Exception: |
| continue |
| if data is None: |
| raise RuntimeError("Failed to fetch OSM data from all Overpass servers.") |
|
|
| return _parse_osm(data) |
|
|
|
|
| def _parse_osm(osm_data: Dict) -> Tuple[Dict, List[Dict], Dict]: |
| node_coords: Dict[int, Tuple[float, float]] = {} |
| for el in osm_data["elements"]: |
| if el["type"] == "node": |
| node_coords[el["id"]] = (el["lat"], el["lon"]) |
|
|
| nodes: Dict = {} |
| edges: List[Dict] = [] |
| edge_by_id: Dict[str, Dict] = {} |
| edge_id_counter = 0 |
|
|
| for el in osm_data["elements"]: |
| if el["type"] != "way" or not el.get("nodes") or len(el["nodes"]) < 2: |
| continue |
| tags = el.get("tags", {}) |
| highway = tags.get("highway", "unclassified") |
| oneway = tags.get("oneway") == "yes" |
| lanes = int(tags.get("lanes", 2)) |
| cap = _capacity(highway, lanes) |
|
|
| for i in range(len(el["nodes"]) - 1): |
| s_id, e_id = el["nodes"][i], el["nodes"][i + 1] |
| if s_id not in node_coords or e_id not in node_coords: |
| continue |
| slat, slon = node_coords[s_id] |
| elat, elon = node_coords[e_id] |
| length = _haversine(slat, slon, elat, elon) |
|
|
| eid = f"e{edge_id_counter}" |
| edge_id_counter += 1 |
| edge = { |
| "id": eid, |
| "start_node": s_id, |
| "end_node": e_id, |
| "coords": [(slat, slon), (elat, elon)], |
| "length": length, |
| "capacity": cap, |
| "highway": highway, |
| "oneway": oneway, |
| } |
| edges.append(edge) |
| edge_by_id[eid] = edge |
|
|
| for nid, lat, lon in [(s_id, slat, slon), (e_id, elat, elon)]: |
| if nid not in nodes: |
| nodes[nid] = {"id": nid, "lat": lat, "lon": lon, |
| "out_edges": [], "in_edges": []} |
|
|
| nodes[s_id]["out_edges"].append(eid) |
| nodes[e_id]["in_edges"].append(eid) |
| if not oneway: |
| nodes[e_id]["out_edges"].append(eid) |
| nodes[s_id]["in_edges"].append(eid) |
|
|
| return nodes, edges, edge_by_id |
|
|
|
|
| def _get_road_network() -> Tuple[Dict, List[Dict], Dict]: |
| global _ROAD_NETWORK |
| if _ROAD_NETWORK is None: |
| _ROAD_NETWORK = _fetch_road_network() |
| return _ROAD_NETWORK |
|
|
|
|
| |
| |
| |
|
|
| class _BgAgent: |
| __slots__ = ("edge", "progress", "direction", "speed") |
|
|
| def __init__(self, edges: List[Dict]): |
| self.edge = random.choice(edges) |
| self.progress: float = random.random() |
| self.direction: int = 1 if self.edge["oneway"] else random.choice([1, -1]) |
| self.speed: float = 0.012 + random.random() * 0.018 |
|
|
| def _current_node_id(self): |
| if self.direction == 1: |
| return self.edge["end_node"] |
| return self.edge["start_node"] |
|
|
| def step( |
| self, |
| traffic_count: Dict[str, int], |
| nodes: Dict, |
| edges: List[Dict], |
| edge_by_id: Dict[str, Dict], |
| ): |
| density = traffic_count.get(self.edge["id"], 0) / self.edge["capacity"] |
| |
| speed_factor = max(0.02, 1.0 - density ** 0.7 * 0.98) |
| length_factor = 50.0 / max(20.0, self.edge["length"]) |
| self.progress += self.speed * speed_factor * length_factor |
|
|
| if self.progress >= 1.0: |
| self.progress = 0.0 |
| nid = self._current_node_id() |
| node = nodes.get(nid) |
| if not node: |
| self.edge = random.choice(edges) |
| self.direction = 1 if self.edge["oneway"] else random.choice([1, -1]) |
| return |
|
|
| choices = [ |
| e for eid in node["out_edges"] |
| if (e := edge_by_id.get(eid)) and e["id"] != self.edge["id"] |
| ] |
| if not choices: |
| if not self.edge["oneway"]: |
| self.direction *= -1 |
| else: |
| self.edge = random.choice(edges) |
| self.direction = 1 |
| return |
|
|
| next_edge = random.choice(choices) |
| self.edge = next_edge |
| if next_edge["start_node"] == nid: |
| self.direction = 1 |
| elif not next_edge["oneway"]: |
| self.direction = -1 |
| else: |
| self.direction = 1 |
|
|
| def get_position(self) -> Tuple[float, float]: |
| p = self.progress if self.direction == 1 else 1.0 - self.progress |
| (slat, slon), (elat, elon) = self.edge["coords"] |
| return (slat + (elat - slat) * p, slon + (elon - slon) * p) |
|
|
|
|
| |
| |
| |
|
|
| def _dijkstra( |
| nodes: Dict, edge_by_id: Dict[str, Dict], start_id: int, end_id: int |
| ) -> Tuple[List[str], float]: |
| dist: Dict[int, float] = {start_id: 0.0} |
| prev: Dict[int, Optional[Tuple[int, str]]] = {start_id: None} |
| pq = [(0.0, start_id)] |
|
|
| while pq: |
| d, u = heapq.heappop(pq) |
| if d > dist.get(u, math.inf): |
| continue |
| if u == end_id: |
| break |
| node = nodes.get(u) |
| if not node: |
| continue |
| for eid in node["out_edges"]: |
| edge = edge_by_id.get(eid) |
| if not edge: |
| continue |
| |
| if edge["start_node"] == u: |
| v = edge["end_node"] |
| elif not edge["oneway"]: |
| v = edge["start_node"] |
| else: |
| continue |
| nd = d + edge["length"] |
| if nd < dist.get(v, math.inf): |
| dist[v] = nd |
| prev[v] = (u, eid) |
| heapq.heappush(pq, (nd, v)) |
|
|
| |
| path: List[str] = [] |
| cur = end_id |
| while cur in prev and prev[cur] is not None: |
| parent, eid = prev[cur] |
| path.append(eid) |
| cur = parent |
| path.reverse() |
| return path, dist.get(end_id, math.inf) |
|
|
|
|
| |
| |
| |
|
|
| def _outgoing_edges( |
| node_id: int, |
| nodes: Dict, |
| edge_by_id: Dict, |
| prev_node_id: Optional[int] = None, |
| ) -> List[Dict]: |
| """Return outgoing edges from node_id, excluding U-turns back to prev_node_id. |
| |
| If excluding the U-turn would leave no options, all edges are returned |
| (so the agent can always move). |
| """ |
| node = nodes.get(node_id) |
| if not node: |
| return [] |
| result = [] |
| for eid in node["out_edges"]: |
| e = edge_by_id.get(eid) |
| if not e: |
| continue |
| |
| if e["oneway"] and e["start_node"] != node_id: |
| continue |
| result.append(e) |
| result = sorted(result, key=lambda e: e["id"]) |
|
|
| |
| if prev_node_id is not None and len(result) > 1: |
| def _neighbour(e: Dict) -> int: |
| return e["end_node"] if e["start_node"] == node_id else e["start_node"] |
| filtered = [e for e in result if _neighbour(e) != prev_node_id] |
| if filtered: |
| return filtered |
|
|
| return result |
|
|
|
|
| |
| |
| |
|
|
| class RoadTrafficSimulatorEnvironment(Environment): |
| """ |
| Single-car navigation environment in downtown San Francisco. |
| |
| The agent selects which road segment to follow at each intersection. |
| Background traffic creates dynamic congestion that the agent should avoid. |
| """ |
|
|
| SUPPORTS_CONCURRENT_SESSIONS: bool = True |
|
|
| def __init__(self): |
| self._state = State(episode_id=str(uuid4()), step_count=0) |
|
|
| self.nodes, self.edges, self.edge_by_id = _get_road_network() |
|
|
| |
| self._viable_nodes = [ |
| nid for nid, n in self.nodes.items() |
| if len(_outgoing_edges(nid, self.nodes, self.edge_by_id)) >= 2 |
| ] |
|
|
| self.traffic_count: Dict[str, int] = {e["id"]: 0 for e in self.edges} |
| self.bg_agents: List[_BgAgent] = [] |
|
|
| |
| self.agent_node_id: Optional[int] = None |
| self.prev_node_id: Optional[int] = None |
| self.start_node_id: Optional[int] = None |
| self.end_node_id: Optional[int] = None |
| self.planned_route: List[str] = [] |
| self.prev_dist_to_goal: float = 0.0 |
|
|
| |
| |
| |
|
|
| def reset(self) -> RoadTrafficSimulatorObservation: |
| self._state = State(episode_id=str(uuid4()), step_count=0) |
|
|
| |
| for _ in range(200): |
| s = random.choice(self._viable_nodes) |
| e = random.choice(self._viable_nodes) |
| if s == e: |
| continue |
| sn, en = self.nodes[s], self.nodes[e] |
| if _haversine(sn["lat"], sn["lon"], en["lat"], en["lon"]) >= 400.0: |
| self.start_node_id = s |
| self.end_node_id = e |
| break |
| else: |
| self.start_node_id = self._viable_nodes[0] |
| self.end_node_id = self._viable_nodes[-1] |
|
|
| self.agent_node_id = self.start_node_id |
| self.prev_node_id = None |
|
|
| |
| self.planned_route, _ = _dijkstra( |
| self.nodes, self.edge_by_id, self.start_node_id, self.end_node_id |
| ) |
|
|
| |
| self.bg_agents = [_BgAgent(self.edges) for _ in range(NUM_BG_AGENTS)] |
| self._update_traffic_counts() |
|
|
| sn = self.nodes[self.start_node_id] |
| en = self.nodes[self.end_node_id] |
| self.prev_dist_to_goal = _haversine(sn["lat"], sn["lon"], en["lat"], en["lon"]) |
|
|
| return self._make_observation(reward=0.0, done=False) |
|
|
| def step(self, action: RoadTrafficSimulatorAction) -> RoadTrafficSimulatorObservation: |
| self._state.step_count += 1 |
|
|
| |
| out_edges = _outgoing_edges( |
| self.agent_node_id, self.nodes, self.edge_by_id, self.prev_node_id |
| ) |
| if not out_edges: |
| |
| return self._make_observation(reward=-50.0, done=True) |
|
|
| idx = max(0, min(action.next_edge_index, len(out_edges) - 1)) |
| chosen_edge = out_edges[idx] |
|
|
| |
| density = self.traffic_count.get(chosen_edge["id"], 0) / chosen_edge["capacity"] |
|
|
| |
| self.prev_node_id = self.agent_node_id |
| if chosen_edge["start_node"] == self.agent_node_id: |
| self.agent_node_id = chosen_edge["end_node"] |
| else: |
| self.agent_node_id = chosen_edge["start_node"] |
|
|
| |
| for _ in range(BG_SUBSTEPS_PER_RL_STEP): |
| for ag in self.bg_agents: |
| ag.step(self.traffic_count, self.nodes, self.edges, self.edge_by_id) |
| self._update_traffic_counts() |
|
|
| |
| an = self.nodes[self.agent_node_id] |
| en = self.nodes[self.end_node_id] |
| dist_to_goal = _haversine(an["lat"], an["lon"], en["lat"], en["lon"]) |
|
|
| |
| time_penalty = -(1.0 + density * 4.0) |
| traffic_penalty = 0.0 |
| progress_reward = (self.prev_dist_to_goal - dist_to_goal) * 0.05 |
| self.prev_dist_to_goal = dist_to_goal |
|
|
| reward = time_penalty + traffic_penalty + progress_reward |
|
|
| |
| done = False |
| if dist_to_goal <= GOAL_RADIUS_M: |
| reward += 500.0 |
| done = True |
| elif self._state.step_count >= MAX_RL_STEPS: |
| done = True |
|
|
| return self._make_observation(reward=reward, done=done) |
|
|
| @property |
| def state(self) -> State: |
| return self._state |
|
|
| |
| |
| |
|
|
| def _update_traffic_counts(self): |
| for eid in self.traffic_count: |
| self.traffic_count[eid] = 0 |
| for ag in self.bg_agents: |
| eid = ag.edge["id"] |
| self.traffic_count[eid] = self.traffic_count.get(eid, 0) + 1 |
|
|
| def _make_observation(self, reward: float, done: bool) -> RoadTrafficSimulatorObservation: |
| an = self.nodes[self.agent_node_id] |
| en = self.nodes[self.end_node_id] |
| dist = _haversine(an["lat"], an["lon"], en["lat"], en["lon"]) |
| out_edges = _outgoing_edges( |
| self.agent_node_id, self.nodes, self.edge_by_id, self.prev_node_id |
| ) |
| screenshot = self._render() |
|
|
| return RoadTrafficSimulatorObservation( |
| lat=an["lat"], |
| lon=an["lon"], |
| goal_lat=en["lat"], |
| goal_lon=en["lon"], |
| distance_to_goal=dist, |
| available_actions=max(1, len(out_edges)), |
| map_screenshot=screenshot, |
| done=done, |
| reward=reward, |
| metadata={ |
| "step": self._state.step_count, |
| "episode_id": self._state.episode_id, |
| "start_node": self.start_node_id, |
| "end_node": self.end_node_id, |
| }, |
| ) |
|
|
| def _render(self) -> str: |
| """Render 512Γ512 traffic map and return as base64 PNG string.""" |
| BG = "#0d1117" |
| BORDER = "#111827" |
|
|
| fig, ax = plt.subplots(figsize=(5.12, 5.12), dpi=200) |
| fig.patch.set_facecolor(BG) |
| ax.set_facecolor(BG) |
| ax.set_xlim(BBOX["west"], BBOX["east"]) |
| ax.set_ylim(BBOX["south"], BBOX["north"]) |
| ax.set_aspect("auto") |
| ax.axis("off") |
|
|
| |
| bm_img, bm_extent = _get_basemap() |
| ax.imshow(bm_img, extent=bm_extent, aspect="auto", |
| zorder=0, interpolation="bilinear", alpha=1.0) |
|
|
| |
| by_hw: Dict[str, Dict] = {hw: {"segs": [], "colors": []} for hw in _HW_LAYERS} |
| for edge in self.edges: |
| hw = edge["highway"] if edge["highway"] in _HW_LAYERS else "unclassified" |
| density = self.traffic_count.get(edge["id"], 0) / edge["capacity"] |
| (slat, slon), (elat, elon) = edge["coords"] |
| by_hw[hw]["segs"].append([(slon, slat), (elon, elat)]) |
| by_hw[hw]["colors"].append(_traffic_color(density)) |
|
|
| |
| |
| for z, hw in enumerate(_HW_LAYERS): |
| data = by_hw[hw] |
| if not data["segs"]: |
| continue |
| rlw = _HW_ROAD_LW.get(hw, 0.8) |
| ax.add_collection(LineCollection( |
| data["segs"], colors=data["colors"], linewidths=rlw, zorder=z + 1)) |
|
|
| |
| if self.planned_route: |
| route_segs = [] |
| for eid in self.planned_route: |
| e = self.edge_by_id.get(eid) |
| if e: |
| (slat, slon), (elat, elon) = e["coords"] |
| route_segs.append([(slon, slat), (elon, elat)]) |
| if route_segs: |
| zr = len(_HW_LAYERS) + 2 |
| ax.add_collection(LineCollection( |
| route_segs, colors="#00ccff", linewidths=2.5, |
| linestyles="dashed", alpha=0.95, zorder=zr)) |
|
|
| zm = len(_HW_LAYERS) + 4 |
|
|
| |
| sn = self.nodes[self.start_node_id] |
| ax.plot(sn["lon"], sn["lat"], marker="*", color="#00dd66", |
| markersize=16, zorder=zm, markeredgecolor="white", markeredgewidth=0.8) |
| ax.text(sn["lon"], sn["lat"] + 0.00035, "START", color="white", |
| fontsize=5.5, ha="center", va="bottom", zorder=zm + 1, |
| bbox=dict(boxstyle="round,pad=0.25", fc="#00884a", ec="none", alpha=0.85)) |
|
|
| |
| en = self.nodes[self.end_node_id] |
| ax.plot(en["lon"], en["lat"], marker="*", color="#ff3311", |
| markersize=16, zorder=zm, markeredgecolor="white", markeredgewidth=0.8) |
| ax.text(en["lon"], en["lat"] + 0.00035, "GOAL", color="white", |
| fontsize=5.5, ha="center", va="bottom", zorder=zm + 1, |
| bbox=dict(boxstyle="round,pad=0.25", fc="#cc2200", ec="none", alpha=0.85)) |
|
|
| |
| an = self.nodes[self.agent_node_id] |
| ax.plot(an["lon"], an["lat"], marker="o", color="#3388ff", |
| markersize=9, zorder=zm + 2, |
| markeredgecolor="white", markeredgewidth=1.5) |
| if self.prev_node_id and self.prev_node_id in self.nodes: |
| pn = self.nodes[self.prev_node_id] |
| dx = an["lon"] - pn["lon"] |
| dy = an["lat"] - pn["lat"] |
| length = math.hypot(dx, dy) |
| if length > 1e-9: |
| |
| tail_lon = an["lon"] - dx * 0.6 |
| tail_lat = an["lat"] - dy * 0.6 |
| ax.annotate( |
| "", xy=(an["lon"], an["lat"]), |
| xytext=(tail_lon, tail_lat), |
| arrowprops=dict( |
| arrowstyle="-|>", color="white", lw=1.2, |
| mutation_scale=12), |
| zorder=zm + 3) |
|
|
| |
| dist = _haversine(an["lat"], an["lon"], en["lat"], en["lon"]) |
| hud_style = dict(boxstyle="round,pad=0.3", fc=BG, ec="#334455", alpha=0.88) |
| ax.text(0.01, 0.99, f"Step {self._state.step_count}", |
| transform=ax.transAxes, color="#aaaacc", fontsize=6.5, |
| va="top", ha="left", zorder=zm + 5, bbox=hud_style) |
| ax.text(0.99, 0.99, f"Dist {dist:.0f} m", |
| transform=ax.transAxes, color="#ffdd88", fontsize=6.5, |
| va="top", ha="right", zorder=zm + 5, bbox=hud_style) |
|
|
| |
| from matplotlib.patches import Patch |
| legend_els = [ |
| Patch(fc="#00cc44", ec="none", label="Free flow"), |
| Patch(fc="#ffdd00", ec="none", label="Moderate"), |
| Patch(fc="#ff8800", ec="none", label="Heavy"), |
| Patch(fc="#ff2200", ec="none", label="Jam"), |
| ] |
| ax.legend(handles=legend_els, loc="lower left", fontsize=5.5, |
| facecolor="#131b26", edgecolor="#334455", labelcolor="#cccccc", |
| framealpha=0.88, ncol=2, handlelength=1.2, |
| borderpad=0.5, handletextpad=0.4, columnspacing=0.8) |
|
|
| buf = io.BytesIO() |
| plt.savefig(buf, format="png", dpi=200, |
| bbox_inches="tight", pad_inches=0.02, facecolor=BG) |
| plt.close(fig) |
| buf.seek(0) |
| return base64.b64encode(buf.read()).decode("utf-8") |
|
|