road-traffic-simulator-env / server /road_traffic_simulator_env_environment.py
ykirpichev's picture
Upload folder using huggingface_hub
15c7b0c verified
# Copyright (c) Meta Platforms, Inc. and affiliates.
# All rights reserved.
#
# This source code is licensed under the BSD-style license found in the
# LICENSE file in the root directory of this source tree.
"""
Road Traffic Simulator – RL Environment.
The agent controls a single car navigating from a random start node to a
random end node in downtown San Francisco. 2 000 background vehicles create
realistic traffic that the agent must route around.
Observation
-----------
* lat / lon – current GPS position
* goal_lat/goal_lon – destination GPS position
* distance_to_goal – straight-line metres to goal
* available_actions – number of outgoing edges (action range)
* map_screenshot – 400Γ—400 PNG base64, roads coloured by traffic density
Action
------
* next_edge_index – which outgoing edge to follow at the current node
Reward
------
-1.0 per step (time penalty)
-3.0 * edge_density (congestion penalty for traversed edge, density ∈ [0,1])
+progress_reward (metres gained toward goal Γ— 0.05)
+500.0 on reaching goal
"""
import base64
import heapq
import io
import math
import random
from collections import defaultdict
from uuid import uuid4
from typing import Dict, List, Optional, Tuple
import matplotlib
matplotlib.use("Agg")
import matplotlib.pyplot as plt
from matplotlib.collections import LineCollection
import requests
from openenv.core.env_server.interfaces import Environment
from openenv.core.env_server.types import State
from models import RoadTrafficSimulatorAction, RoadTrafficSimulatorObservation
# ---------------------------------------------------------------------------
# Constants
# ---------------------------------------------------------------------------
BBOX = dict(south=37.770, west=-122.430, north=37.800, east=-122.395)
NUM_BG_AGENTS = 5000
BG_SUBSTEPS_PER_RL_STEP = 60 # background traffic advances this many mini-steps per RL step
MAX_RL_STEPS = 300 # episode time limit
GOAL_RADIUS_M = 50.0 # metres – agent "arrives" when within this distance
# Road rendering: draw in ascending z-order so major roads appear on top
_HW_LAYERS = ["unclassified", "residential", "tertiary", "secondary", "primary", "trunk", "motorway"]
_HW_ROAD_LW = {"motorway": 4.0, "trunk": 3.4, "primary": 2.8, "secondary": 2.2,
"tertiary": 1.6, "residential": 1.1, "unclassified": 0.8}
_HW_OUTLINE_LW= {"motorway": 6.2, "trunk": 5.4, "primary": 4.6, "secondary": 3.6,
"tertiary": 2.7, "residential": 1.9, "unclassified": 1.4}
# Map tile settings
_TILE_ZOOM = 16
_TILE_URLS = [
"https://basemaps.cartocdn.com/dark_all/{z}/{x}/{y}.png",
"https://tile.openstreetmap.org/{z}/{x}/{y}.png",
]
_TILE_HEADERS = {"User-Agent": "SFTrafficSimEnv/1.0 (rl-research)"}
# Module-level caches
_ROAD_NETWORK: Optional[Tuple] = None
_BASEMAP: Optional[Tuple] = None # (np.ndarray, [left_lon, right_lon, bottom_lat, top_lat])
# ---------------------------------------------------------------------------
# Helpers
# ---------------------------------------------------------------------------
def _haversine(lat1: float, lon1: float, lat2: float, lon2: float) -> float:
"""Return distance in metres between two GPS points."""
R = 6_371_000.0
dlat = math.radians(lat2 - lat1)
dlon = math.radians(lon2 - lon1)
a = math.sin(dlat / 2) ** 2 + (
math.cos(math.radians(lat1))
* math.cos(math.radians(lat2))
* math.sin(dlon / 2) ** 2
)
return R * 2 * math.atan2(math.sqrt(a), math.sqrt(1 - a))
def _traffic_color(density: float) -> tuple:
"""Return RGBA tuple with alpha tied to congestion: free-flow nearly transparent."""
if density < 0.20:
return (0.00, 0.80, 0.27, 0.22) # green – barely visible, let map show
if density < 0.50:
return (1.00, 0.87, 0.00, 0.60) # yellow – semi-transparent
if density < 0.80:
return (1.00, 0.53, 0.00, 0.80) # orange
return (1.00, 0.13, 0.00, 0.95) # red jam – nearly opaque
def _capacity(highway: str, lanes: int) -> int:
base = {"motorway": 50, "trunk": 40, "primary": 30, "secondary": 25,
"tertiary": 20, "residential": 15}.get(highway, 10)
return max(5, round(base * lanes / 2))
# ---------------------------------------------------------------------------
# Map tile helpers (basemap cached at module level)
# ---------------------------------------------------------------------------
def _tile_coords(lat: float, lon: float, zoom: int) -> Tuple[int, int]:
n = 2 ** zoom
x = int((lon + 180.0) / 360.0 * n)
lat_r = math.radians(lat)
y = int((1.0 - math.asinh(math.tan(lat_r)) / math.pi) / 2.0 * n)
return x, y
def _tile_nw(tx: int, ty: int, zoom: int) -> Tuple[float, float]:
"""Return (lat, lon) of the NW corner of tile (tx, ty)."""
n = 2 ** zoom
lon = tx / n * 360.0 - 180.0
lat = math.degrees(math.atan(math.sinh(math.pi * (1.0 - 2.0 * ty / n))))
return lat, lon
def _fetch_basemap() -> Tuple:
from PIL import Image as PILImage
import numpy as np
zoom = _TILE_ZOOM
tx_min, ty_min = _tile_coords(BBOX["north"], BBOX["west"], zoom)
tx_max, ty_max = _tile_coords(BBOX["south"], BBOX["east"], zoom)
cols = tx_max - tx_min + 1
rows = ty_max - ty_min + 1
canvas = PILImage.new("RGB", (cols * 256, rows * 256), (17, 17, 27))
for tx in range(tx_min, tx_max + 1):
for ty in range(ty_min, ty_max + 1):
for url_tmpl in _TILE_URLS:
url = url_tmpl.format(z=zoom, x=tx, y=ty)
try:
r = requests.get(url, headers=_TILE_HEADERS, timeout=15)
r.raise_for_status()
tile = PILImage.open(io.BytesIO(r.content)).convert("RGB")
canvas.paste(tile, ((tx - tx_min) * 256, (ty - ty_min) * 256))
break
except Exception:
continue
img_arr = np.array(canvas)
nw_lat, nw_lon = _tile_nw(tx_min, ty_min, zoom)
se_lat, se_lon = _tile_nw(tx_max + 1, ty_max + 1, zoom)
# extent for imshow: [left, right, bottom, top]
extent = [nw_lon, se_lon, se_lat, nw_lat]
return img_arr, extent
def _get_basemap() -> Tuple:
global _BASEMAP
if _BASEMAP is None:
_BASEMAP = _fetch_basemap()
return _BASEMAP
# ---------------------------------------------------------------------------
# OSM fetch + parse (cached at module level so only fetched once)
# ---------------------------------------------------------------------------
def _fetch_road_network() -> Tuple[Dict, List[Dict], Dict]:
query = (
"[out:json][timeout:60];\n"
"(\n"
' way["highway"~"^(motorway|trunk|primary|secondary|tertiary|residential|unclassified)$"]\n'
f" ({BBOX['south']},{BBOX['west']},{BBOX['north']},{BBOX['east']});\n"
");\n"
"out body;\n"
">;\n"
"out skel qt;\n"
)
servers = [
"https://overpass-api.de/api/interpreter",
"https://overpass.kumi.systems/api/interpreter",
]
data = None
for url in servers:
try:
r = requests.post(
url,
data={"data": query},
timeout=90,
)
r.raise_for_status()
data = r.json()
break
except Exception:
continue
if data is None:
raise RuntimeError("Failed to fetch OSM data from all Overpass servers.")
return _parse_osm(data)
def _parse_osm(osm_data: Dict) -> Tuple[Dict, List[Dict], Dict]:
node_coords: Dict[int, Tuple[float, float]] = {}
for el in osm_data["elements"]:
if el["type"] == "node":
node_coords[el["id"]] = (el["lat"], el["lon"])
nodes: Dict = {}
edges: List[Dict] = []
edge_by_id: Dict[str, Dict] = {}
edge_id_counter = 0
for el in osm_data["elements"]:
if el["type"] != "way" or not el.get("nodes") or len(el["nodes"]) < 2:
continue
tags = el.get("tags", {})
highway = tags.get("highway", "unclassified")
oneway = tags.get("oneway") == "yes"
lanes = int(tags.get("lanes", 2))
cap = _capacity(highway, lanes)
for i in range(len(el["nodes"]) - 1):
s_id, e_id = el["nodes"][i], el["nodes"][i + 1]
if s_id not in node_coords or e_id not in node_coords:
continue
slat, slon = node_coords[s_id]
elat, elon = node_coords[e_id]
length = _haversine(slat, slon, elat, elon)
eid = f"e{edge_id_counter}"
edge_id_counter += 1
edge = {
"id": eid,
"start_node": s_id,
"end_node": e_id,
"coords": [(slat, slon), (elat, elon)],
"length": length,
"capacity": cap,
"highway": highway,
"oneway": oneway,
}
edges.append(edge)
edge_by_id[eid] = edge
for nid, lat, lon in [(s_id, slat, slon), (e_id, elat, elon)]:
if nid not in nodes:
nodes[nid] = {"id": nid, "lat": lat, "lon": lon,
"out_edges": [], "in_edges": []}
nodes[s_id]["out_edges"].append(eid)
nodes[e_id]["in_edges"].append(eid)
if not oneway:
nodes[e_id]["out_edges"].append(eid)
nodes[s_id]["in_edges"].append(eid)
return nodes, edges, edge_by_id
def _get_road_network() -> Tuple[Dict, List[Dict], Dict]:
global _ROAD_NETWORK
if _ROAD_NETWORK is None:
_ROAD_NETWORK = _fetch_road_network()
return _ROAD_NETWORK
# ---------------------------------------------------------------------------
# Background agent
# ---------------------------------------------------------------------------
class _BgAgent:
__slots__ = ("edge", "progress", "direction", "speed")
def __init__(self, edges: List[Dict]):
self.edge = random.choice(edges)
self.progress: float = random.random()
self.direction: int = 1 if self.edge["oneway"] else random.choice([1, -1])
self.speed: float = 0.012 + random.random() * 0.018
def _current_node_id(self):
if self.direction == 1:
return self.edge["end_node"]
return self.edge["start_node"]
def step(
self,
traffic_count: Dict[str, int],
nodes: Dict,
edges: List[Dict],
edge_by_id: Dict[str, Dict],
):
density = traffic_count.get(self.edge["id"], 0) / self.edge["capacity"]
# Near-standstill in heavy jams (density 1.0 β†’ speed_factor 0.02)
speed_factor = max(0.02, 1.0 - density ** 0.7 * 0.98)
length_factor = 50.0 / max(20.0, self.edge["length"])
self.progress += self.speed * speed_factor * length_factor
if self.progress >= 1.0:
self.progress = 0.0
nid = self._current_node_id()
node = nodes.get(nid)
if not node:
self.edge = random.choice(edges)
self.direction = 1 if self.edge["oneway"] else random.choice([1, -1])
return
choices = [
e for eid in node["out_edges"]
if (e := edge_by_id.get(eid)) and e["id"] != self.edge["id"]
]
if not choices:
if not self.edge["oneway"]:
self.direction *= -1
else:
self.edge = random.choice(edges)
self.direction = 1
return
next_edge = random.choice(choices)
self.edge = next_edge
if next_edge["start_node"] == nid:
self.direction = 1
elif not next_edge["oneway"]:
self.direction = -1
else:
self.direction = 1
def get_position(self) -> Tuple[float, float]:
p = self.progress if self.direction == 1 else 1.0 - self.progress
(slat, slon), (elat, elon) = self.edge["coords"]
return (slat + (elat - slat) * p, slon + (elon - slon) * p)
# ---------------------------------------------------------------------------
# Dijkstra shortest path
# ---------------------------------------------------------------------------
def _dijkstra(
nodes: Dict, edge_by_id: Dict[str, Dict], start_id: int, end_id: int
) -> Tuple[List[str], float]:
dist: Dict[int, float] = {start_id: 0.0}
prev: Dict[int, Optional[Tuple[int, str]]] = {start_id: None}
pq = [(0.0, start_id)]
while pq:
d, u = heapq.heappop(pq)
if d > dist.get(u, math.inf):
continue
if u == end_id:
break
node = nodes.get(u)
if not node:
continue
for eid in node["out_edges"]:
edge = edge_by_id.get(eid)
if not edge:
continue
# Determine the neighbour node
if edge["start_node"] == u:
v = edge["end_node"]
elif not edge["oneway"]:
v = edge["start_node"]
else:
continue
nd = d + edge["length"]
if nd < dist.get(v, math.inf):
dist[v] = nd
prev[v] = (u, eid)
heapq.heappush(pq, (nd, v))
# Reconstruct edge path
path: List[str] = []
cur = end_id
while cur in prev and prev[cur] is not None:
parent, eid = prev[cur]
path.append(eid)
cur = parent
path.reverse()
return path, dist.get(end_id, math.inf)
# ---------------------------------------------------------------------------
# Outgoing edges from a node (respecting one-way)
# ---------------------------------------------------------------------------
def _outgoing_edges(
node_id: int,
nodes: Dict,
edge_by_id: Dict,
prev_node_id: Optional[int] = None,
) -> List[Dict]:
"""Return outgoing edges from node_id, excluding U-turns back to prev_node_id.
If excluding the U-turn would leave no options, all edges are returned
(so the agent can always move).
"""
node = nodes.get(node_id)
if not node:
return []
result = []
for eid in node["out_edges"]:
e = edge_by_id.get(eid)
if not e:
continue
# For oneway edges, must originate at start_node
if e["oneway"] and e["start_node"] != node_id:
continue
result.append(e)
result = sorted(result, key=lambda e: e["id"]) # deterministic order
# Remove U-turn (edge that leads straight back to where we came from)
if prev_node_id is not None and len(result) > 1:
def _neighbour(e: Dict) -> int:
return e["end_node"] if e["start_node"] == node_id else e["start_node"]
filtered = [e for e in result if _neighbour(e) != prev_node_id]
if filtered:
return filtered
return result
# ---------------------------------------------------------------------------
# Environment
# ---------------------------------------------------------------------------
class RoadTrafficSimulatorEnvironment(Environment):
"""
Single-car navigation environment in downtown San Francisco.
The agent selects which road segment to follow at each intersection.
Background traffic creates dynamic congestion that the agent should avoid.
"""
SUPPORTS_CONCURRENT_SESSIONS: bool = True
def __init__(self):
self._state = State(episode_id=str(uuid4()), step_count=0)
self.nodes, self.edges, self.edge_by_id = _get_road_network()
# Nodes with β‰₯2 outgoing edges so the agent always has a real choice
self._viable_nodes = [
nid for nid, n in self.nodes.items()
if len(_outgoing_edges(nid, self.nodes, self.edge_by_id)) >= 2
]
self.traffic_count: Dict[str, int] = {e["id"]: 0 for e in self.edges}
self.bg_agents: List[_BgAgent] = []
# Agent car state
self.agent_node_id: Optional[int] = None
self.prev_node_id: Optional[int] = None # for U-turn prevention
self.start_node_id: Optional[int] = None
self.end_node_id: Optional[int] = None
self.planned_route: List[str] = [] # edge ids of shortest path
self.prev_dist_to_goal: float = 0.0
# ------------------------------------------------------------------
# OpenEnv interface
# ------------------------------------------------------------------
def reset(self) -> RoadTrafficSimulatorObservation:
self._state = State(episode_id=str(uuid4()), step_count=0)
# Pick start and end nodes that are at least 400 m apart
for _ in range(200):
s = random.choice(self._viable_nodes)
e = random.choice(self._viable_nodes)
if s == e:
continue
sn, en = self.nodes[s], self.nodes[e]
if _haversine(sn["lat"], sn["lon"], en["lat"], en["lon"]) >= 400.0:
self.start_node_id = s
self.end_node_id = e
break
else:
self.start_node_id = self._viable_nodes[0]
self.end_node_id = self._viable_nodes[-1]
self.agent_node_id = self.start_node_id
self.prev_node_id = None
# Pre-compute shortest path
self.planned_route, _ = _dijkstra(
self.nodes, self.edge_by_id, self.start_node_id, self.end_node_id
)
# Initialise background traffic
self.bg_agents = [_BgAgent(self.edges) for _ in range(NUM_BG_AGENTS)]
self._update_traffic_counts()
sn = self.nodes[self.start_node_id]
en = self.nodes[self.end_node_id]
self.prev_dist_to_goal = _haversine(sn["lat"], sn["lon"], en["lat"], en["lon"])
return self._make_observation(reward=0.0, done=False)
def step(self, action: RoadTrafficSimulatorAction) -> RoadTrafficSimulatorObservation:
self._state.step_count += 1
# ---- Agent moves ----
out_edges = _outgoing_edges(
self.agent_node_id, self.nodes, self.edge_by_id, self.prev_node_id
)
if not out_edges:
# True dead end – episode over
return self._make_observation(reward=-50.0, done=True)
idx = max(0, min(action.next_edge_index, len(out_edges) - 1))
chosen_edge = out_edges[idx]
# Edge density at time of traversal
density = self.traffic_count.get(chosen_edge["id"], 0) / chosen_edge["capacity"]
# Move agent to next node
self.prev_node_id = self.agent_node_id
if chosen_edge["start_node"] == self.agent_node_id:
self.agent_node_id = chosen_edge["end_node"]
else:
self.agent_node_id = chosen_edge["start_node"]
# ---- Advance background traffic ----
for _ in range(BG_SUBSTEPS_PER_RL_STEP):
for ag in self.bg_agents:
ag.step(self.traffic_count, self.nodes, self.edges, self.edge_by_id)
self._update_traffic_counts()
# ---- Reward ----
an = self.nodes[self.agent_node_id]
en = self.nodes[self.end_node_id]
dist_to_goal = _haversine(an["lat"], an["lon"], en["lat"], en["lon"])
# Time cost scales with congestion: congested edge takes up to 5Γ— longer
time_penalty = -(1.0 + density * 4.0)
traffic_penalty = 0.0 # absorbed into time_penalty
progress_reward = (self.prev_dist_to_goal - dist_to_goal) * 0.05
self.prev_dist_to_goal = dist_to_goal
reward = time_penalty + traffic_penalty + progress_reward
# ---- Termination ----
done = False
if dist_to_goal <= GOAL_RADIUS_M:
reward += 500.0
done = True
elif self._state.step_count >= MAX_RL_STEPS:
done = True
return self._make_observation(reward=reward, done=done)
@property
def state(self) -> State:
return self._state
# ------------------------------------------------------------------
# Internal helpers
# ------------------------------------------------------------------
def _update_traffic_counts(self):
for eid in self.traffic_count:
self.traffic_count[eid] = 0
for ag in self.bg_agents:
eid = ag.edge["id"]
self.traffic_count[eid] = self.traffic_count.get(eid, 0) + 1
def _make_observation(self, reward: float, done: bool) -> RoadTrafficSimulatorObservation:
an = self.nodes[self.agent_node_id]
en = self.nodes[self.end_node_id]
dist = _haversine(an["lat"], an["lon"], en["lat"], en["lon"])
out_edges = _outgoing_edges(
self.agent_node_id, self.nodes, self.edge_by_id, self.prev_node_id
)
screenshot = self._render()
return RoadTrafficSimulatorObservation(
lat=an["lat"],
lon=an["lon"],
goal_lat=en["lat"],
goal_lon=en["lon"],
distance_to_goal=dist,
available_actions=max(1, len(out_edges)),
map_screenshot=screenshot,
done=done,
reward=reward,
metadata={
"step": self._state.step_count,
"episode_id": self._state.episode_id,
"start_node": self.start_node_id,
"end_node": self.end_node_id,
},
)
def _render(self) -> str:
"""Render 512Γ—512 traffic map and return as base64 PNG string."""
BG = "#0d1117"
BORDER = "#111827"
fig, ax = plt.subplots(figsize=(5.12, 5.12), dpi=200)
fig.patch.set_facecolor(BG)
ax.set_facecolor(BG)
ax.set_xlim(BBOX["west"], BBOX["east"])
ax.set_ylim(BBOX["south"], BBOX["north"])
ax.set_aspect("auto")
ax.axis("off")
# --- Real SF map tiles as background ---
bm_img, bm_extent = _get_basemap()
ax.imshow(bm_img, extent=bm_extent, aspect="auto",
zorder=0, interpolation="bilinear", alpha=1.0)
# --- Group edges by highway type, compute traffic colour per segment ---
by_hw: Dict[str, Dict] = {hw: {"segs": [], "colors": []} for hw in _HW_LAYERS}
for edge in self.edges:
hw = edge["highway"] if edge["highway"] in _HW_LAYERS else "unclassified"
density = self.traffic_count.get(edge["id"], 0) / edge["capacity"]
(slat, slon), (elat, elon) = edge["coords"]
by_hw[hw]["segs"].append([(slon, slat), (elon, elat)])
by_hw[hw]["colors"].append(_traffic_color(density))
# Single-pass draw with RGBA colors (alpha encodes congestion level).
# No dark outline pass – we want the basemap to show through clearly.
for z, hw in enumerate(_HW_LAYERS):
data = by_hw[hw]
if not data["segs"]:
continue
rlw = _HW_ROAD_LW.get(hw, 0.8)
ax.add_collection(LineCollection(
data["segs"], colors=data["colors"], linewidths=rlw, zorder=z + 1))
# --- Planned route (bright cyan dashed, on top of roads) ---
if self.planned_route:
route_segs = []
for eid in self.planned_route:
e = self.edge_by_id.get(eid)
if e:
(slat, slon), (elat, elon) = e["coords"]
route_segs.append([(slon, slat), (elon, elat)])
if route_segs:
zr = len(_HW_LAYERS) + 2
ax.add_collection(LineCollection(
route_segs, colors="#00ccff", linewidths=2.5,
linestyles="dashed", alpha=0.95, zorder=zr))
zm = len(_HW_LAYERS) + 4 # marker z-base
# --- Start marker ---
sn = self.nodes[self.start_node_id]
ax.plot(sn["lon"], sn["lat"], marker="*", color="#00dd66",
markersize=16, zorder=zm, markeredgecolor="white", markeredgewidth=0.8)
ax.text(sn["lon"], sn["lat"] + 0.00035, "START", color="white",
fontsize=5.5, ha="center", va="bottom", zorder=zm + 1,
bbox=dict(boxstyle="round,pad=0.25", fc="#00884a", ec="none", alpha=0.85))
# --- Goal marker ---
en = self.nodes[self.end_node_id]
ax.plot(en["lon"], en["lat"], marker="*", color="#ff3311",
markersize=16, zorder=zm, markeredgecolor="white", markeredgewidth=0.8)
ax.text(en["lon"], en["lat"] + 0.00035, "GOAL", color="white",
fontsize=5.5, ha="center", va="bottom", zorder=zm + 1,
bbox=dict(boxstyle="round,pad=0.25", fc="#cc2200", ec="none", alpha=0.85))
# --- Agent car with direction arrow ---
an = self.nodes[self.agent_node_id]
ax.plot(an["lon"], an["lat"], marker="o", color="#3388ff",
markersize=9, zorder=zm + 2,
markeredgecolor="white", markeredgewidth=1.5)
if self.prev_node_id and self.prev_node_id in self.nodes:
pn = self.nodes[self.prev_node_id]
dx = an["lon"] - pn["lon"]
dy = an["lat"] - pn["lat"]
length = math.hypot(dx, dy)
if length > 1e-9:
# Arrow tail starts 60 % of the way back from the agent
tail_lon = an["lon"] - dx * 0.6
tail_lat = an["lat"] - dy * 0.6
ax.annotate(
"", xy=(an["lon"], an["lat"]),
xytext=(tail_lon, tail_lat),
arrowprops=dict(
arrowstyle="-|>", color="white", lw=1.2,
mutation_scale=12),
zorder=zm + 3)
# --- HUD: step count + distance ---
dist = _haversine(an["lat"], an["lon"], en["lat"], en["lon"])
hud_style = dict(boxstyle="round,pad=0.3", fc=BG, ec="#334455", alpha=0.88)
ax.text(0.01, 0.99, f"Step {self._state.step_count}",
transform=ax.transAxes, color="#aaaacc", fontsize=6.5,
va="top", ha="left", zorder=zm + 5, bbox=hud_style)
ax.text(0.99, 0.99, f"Dist {dist:.0f} m",
transform=ax.transAxes, color="#ffdd88", fontsize=6.5,
va="top", ha="right", zorder=zm + 5, bbox=hud_style)
# --- Legend ---
from matplotlib.patches import Patch
legend_els = [
Patch(fc="#00cc44", ec="none", label="Free flow"),
Patch(fc="#ffdd00", ec="none", label="Moderate"),
Patch(fc="#ff8800", ec="none", label="Heavy"),
Patch(fc="#ff2200", ec="none", label="Jam"),
]
ax.legend(handles=legend_els, loc="lower left", fontsize=5.5,
facecolor="#131b26", edgecolor="#334455", labelcolor="#cccccc",
framealpha=0.88, ncol=2, handlelength=1.2,
borderpad=0.5, handletextpad=0.4, columnspacing=0.8)
buf = io.BytesIO()
plt.savefig(buf, format="png", dpi=200,
bbox_inches="tight", pad_inches=0.02, facecolor=BG)
plt.close(fig)
buf.seek(0)
return base64.b64encode(buf.read()).decode("utf-8")