Spaces:
Running on Zero
Running on Zero
File size: 7,998 Bytes
41e0c9e | 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 | """Client for the 511 SF Bay Area Open Data API (transit + traffic).
Docs: https://511.org/open-data/transit and the Open511 traffic spec.
Auth: an api_key query param (set TRANSIT_511_API_KEY in the environment).
Notes learned from the live API:
- Every response is UTF-8 with a BOM -> decode with 'utf-8-sig'.
- Transit payloads are SIRI-shaped; traffic payloads are Open511.
- There is NO trip-planner endpoint. "Fastest route" is therefore *reasoned*
from real-time departures + scheduled estimates + live traffic events.
"""
from __future__ import annotations
import datetime as dt
import difflib
import math
import os
import re
import time
from functools import lru_cache
import requests
# Root of the 511 Open Data API; _get() appends endpoint paths such as
# "/transit/stops" to it.
# NOTE(review): this is plain http, so the api_key query parameter travels
# unencrypted — confirm whether the https endpoint can be used instead.
BASE = "http://api.511.org"
# Operators we resolve place names against (regional rail spine of the Bay Area).
# _stop_index() fetches the stop list of each operator id listed here.
RESOLVE_OPERATORS = ["BA", "CT"] # BART, Caltrain
# Operator id -> human-readable agency name.
OPERATOR_NAMES = {"BA": "BART", "CT": "Caltrain", "AC": "AC Transit",
"SF": "Muni", "SM": "SamTrans", "SC": "VTA", "GG": "Golden Gate"}
class Transit511Error(RuntimeError):
    """Raised when the 511 API cannot be used (missing/rejected key, rate limit, HTTP error)."""
def _key() -> str:
    """Return the 511 API key configured in the environment.

    Reads ``TRANSIT_511_API_KEY`` and strips surrounding whitespace.

    Raises:
        Transit511Error: the variable is unset or blank.
    """
    api_key = os.environ.get("TRANSIT_511_API_KEY", "").strip()
    if api_key:
        return api_key
    raise Transit511Error(
        "TRANSIT_511_API_KEY is not set. Put your 511.org API key in the "
        "environment (see .env.example).")
def _get(path: str, params: dict, timeout=30):
    """GET a 511 endpoint and return ``(parsed_json, final_url)``.

    Args:
        path: Endpoint path appended to ``BASE`` (e.g. ``"/transit/stops"``).
        params: Query parameters; ``api_key`` and ``format=json`` are added to
            a copy, the caller's dict is not modified.
        timeout: Timeout in seconds handed to ``requests.get``.

    Returns:
        ``(payload, url)`` where ``payload`` is the decoded JSON document and
        ``url`` is the URL that was requested.
        NOTE(review): ``url`` includes the ``api_key`` query parameter —
        redact it before logging or showing it to users.

    Raises:
        Transit511Error: the key is missing or rejected (401), the rate limit
            was hit (429), any other HTTP status >= 400 came back, the request
            failed at the transport level, or the body was not valid JSON.
    """
    import json  # local import: json is not part of the module's import header

    query = {**params, "api_key": _key(), "format": "json"}
    try:
        r = requests.get(f"{BASE}{path}", params=query, timeout=timeout)
    except requests.RequestException as exc:
        # Surface transport failures (DNS, connection, timeout) as the module's
        # own error type so callers that treat Transit511Error as "skip and
        # continue" keep working. The message deliberately omits str(exc):
        # requests embeds the full URL, and therefore the API key, in it.
        raise Transit511Error(
            f"511 request failed ({type(exc).__name__}).") from exc

    if r.status_code == 401:
        raise Transit511Error("511 rejected the API key (401).")
    if r.status_code == 429:
        raise Transit511Error("511 rate limit hit (429). Wait a bit.")
    if r.status_code >= 400:
        raise Transit511Error(f"511 error {r.status_code}: {r.text[:200]}")

    try:
        # 511 prefixes every body with a UTF-8 BOM; 'utf-8-sig' strips it.
        payload = json.loads(r.content.decode("utf-8-sig"))
    except ValueError as exc:
        # UnicodeDecodeError and json.JSONDecodeError both derive from ValueError.
        raise Transit511Error(
            "511 returned a response that is not valid JSON.") from exc
    return payload, r.url
# --------------------------------------------------------------------------- #
def haversine_km(lat1, lon1, lat2, lon2):
    """Return the great-circle distance in kilometres between two points.

    All four arguments are in degrees. Uses the haversine formula with an
    Earth radius of 6371.0 km.
    """
    earth_radius_km = 6371.0
    phi_a = math.radians(lat1)
    phi_b = math.radians(lat2)
    delta_phi = math.radians(lat2 - lat1)
    delta_lambda = math.radians(lon2 - lon1)
    hav = (math.sin(delta_phi / 2) ** 2
           + math.cos(phi_a) * math.cos(phi_b) * math.sin(delta_lambda / 2) ** 2)
    return 2 * earth_radius_km * math.asin(math.sqrt(hav))
def _mins_until(iso_str):
    """Return whole minutes from now until an ISO-8601 timestamp.

    A trailing ``Z`` is rewritten to ``+00:00`` before parsing. The result is
    rounded and never negative (past times give 0). Returns ``None`` for empty
    input and for anything that cannot be parsed or compared against the
    current UTC time (for example a timestamp without a UTC offset).
    """
    if not iso_str:
        return None
    try:
        when = dt.datetime.fromisoformat(str(iso_str).replace("Z", "+00:00"))
        now_utc = dt.datetime.now(dt.timezone.utc)
        remaining_minutes = (when - now_utc).total_seconds() / 60
        return max(0, round(remaining_minutes))
    except Exception:
        return None
# ---- stops index (cached) -------------------------------------------------- #
@lru_cache(maxsize=8)
def _operator_stops(operator_id: str):
    """Fetch one operator's stops and flatten them into plain dicts.

    Returns a list of ``{operator, code, name, lat, lon}`` dicts. Stops whose
    latitude/longitude are missing or non-numeric are dropped, and a payload
    without the expected ``Contents -> dataObjects -> ScheduledStopPoint``
    path yields an empty list. Results are memoized per ``operator_id``, so
    every caller receives the same list object.
    """
    payload, _ = _get("/transit/stops", {"operator_id": operator_id})
    try:
        stop_points = payload["Contents"]["dataObjects"]["ScheduledStopPoint"]
    except Exception:
        return []

    stops = []
    for point in stop_points:
        location = point.get("Location", {}) or {}
        try:
            latitude = float(location.get("Latitude"))
            longitude = float(location.get("Longitude"))
        except (TypeError, ValueError):
            # No usable coordinates: leave this stop out of the index.
            continue
        stops.append({
            "operator": operator_id,
            "code": str(point.get("id")),
            "name": point.get("Name", ""),
            "lat": latitude,
            "lon": longitude,
        })
    return stops
def _stop_index():
    """Return the combined stop list of every operator in RESOLVE_OPERATORS.

    An operator whose lookup raises Transit511Error is skipped, so the result
    may be partial or empty.
    """
    combined = []
    for operator_id in RESOLVE_OPERATORS:
        try:
            stops = _operator_stops(operator_id)
        except Transit511Error:
            continue
        combined.extend(stops)
    return combined
# Common shorthands -> the string we actually fuzzy-match against stop names.
# resolve_place() looks a query up here after stripping and lower-casing it,
# so every key must be lower-case; a query with no entry is matched as typed.
ALIASES = {
"sfo": "san francisco international airport",
"sf airport": "san francisco international airport",
"airport": "san francisco international airport",
"sf": "embarcadero", "the city": "embarcadero", "downtown sf": "embarcadero",
"san francisco": "embarcadero", "downtown": "embarcadero",
"oakland": "12th street", "downtown oakland": "12th street",
"san jose": "san jose diridon", "sj": "san jose diridon",
"berkeley": "downtown berkeley",
}
def resolve_place(text: str):
    """Fuzzy-match a free-text place to the best regional-rail stop.

    The query is stripped, lower-cased and passed through ALIASES, then scored
    against every stop name from ``_stop_index()``:

    * one string contains the other -> ``0.92 + 0.08 * shorter/longer``, so an
      exact match scores 1.0 and partial containment scores less;
    * otherwise the ``difflib`` similarity ratio, raised to
      ``0.6 + 0.1 * shared_tokens`` when the two share whole words.

    Args:
        text: Free-text place name; may be empty or ``None``.

    Returns:
        A stop dict ``{operator, code, name, lat, lon, score}`` for the
        highest-scoring stop (the first one wins ties), or ``None`` when the
        query is blank, the index is empty, or the best score is below 0.45.
    """
    if not text:
        return None
    q = text.strip().lower()
    q = ALIASES.get(q, q)
    if not q:
        # Whitespace-only input: "" is a substring of every name and would
        # otherwise turn the first stop of the index into a 0.92 "match".
        return None
    idx = _stop_index()
    if not idx:
        return None

    q_tokens = set(q.split())
    best, best_score = None, 0.0
    for s in idx:
        name = (s.get("name") or "").lower()
        if not name:
            # A nameless stop is a substring of any query; never let it match.
            continue
        if q in name or name in q:
            # Length ratio is always shorter/longer so it stays within (0, 1];
            # a short name merely contained in the query must not out-score
            # an exact match.
            shorter, longer = sorted((len(q), len(name)))
            score = 0.92 + 0.08 * (shorter / longer)
        else:
            score = difflib.SequenceMatcher(None, q, name).ratio()
            # token overlap bonus
            shared = q_tokens & set(name.replace("/", " ").split())
            if shared:
                score = max(score, 0.6 + 0.1 * len(shared))
        if score > best_score:
            best, best_score = s, score

    if best is not None and best_score >= 0.45:
        return {**best, "score": round(best_score, 2)}
    return None
# ---- real-time departures -------------------------------------------------- #
def departures(agency: str, stop_code: str, limit=6):
    """Return the soonest real-time departures for one stop code.

    Args:
        agency: 511 operator id sent as the ``agency`` query parameter.
        stop_code: Stop code sent as the ``stopCode`` query parameter.
        limit: Maximum number of rows to return.

    Returns:
        ``(rows, url)``. Each row is ``{line, destination, aimed, expected,
        minutes}``; ``minutes`` is computed from the expected departure time,
        falling back to the aimed one, and is ``None`` when neither parses.
        Rows are ordered by ``minutes`` ascending with unknown times last.
        ``url`` is the request URL reported by ``_get``.

    Raises:
        Transit511Error: propagated from ``_get``.
    """
    data, url = _get("/transit/StopMonitoring",
                     {"agency": agency, "stopCode": stop_code})
    try:
        visits = data["ServiceDelivery"]["StopMonitoringDelivery"]["MonitoredStopVisit"]
    except (KeyError, TypeError):
        visits = []
    if not isinstance(visits, list):
        # null or otherwise unexpected shape: treat as "no departures".
        visits = []

    rows = []
    for v in visits:
        j = v.get("MonitoredVehicleJourney", {}) or {}
        call = j.get("MonitoredCall", {}) or {}
        exp = call.get("ExpectedDepartureTime") or call.get("AimedDepartureTime")
        rows.append({
            "line": j.get("LineRef") or j.get("PublishedLineName"),
            "destination": j.get("DestinationName"),
            "aimed": call.get("AimedDepartureTime"),
            "expected": call.get("ExpectedDepartureTime"),
            "minutes": _mins_until(exp),
        })
    # Sort first, truncate second: cutting the feed to `limit` before sorting
    # would drop the soonest departures whenever the feed is not time-ordered.
    rows.sort(key=lambda r: (r["minutes"] is None, r["minutes"] or 0))
    return rows[:limit], url
def station_departures(operator: str, name: str, limit=8):
    """Aggregate real-time departures across ALL platforms of a named station
    (a station often has separate codes per direction).

    Args:
        operator: 511 operator id whose stop list is searched.
        name: Exact stop name; every stop code carrying this name is queried.
        limit: Maximum rows requested per stop code and returned overall.

    Returns:
        ``(rows, url)``: de-duplicated departure rows ordered by minutes until
        departure (unknown times last), and the URL of the last stop query
        that succeeded, or ``None`` if none did or no stop has that name.
        Stop codes whose query raises Transit511Error are skipped.
    """
    codes = [s["code"] for s in _operator_stops(operator) if s["name"] == name]
    merged, last_url = [], None
    for code in codes:
        try:
            rows, last_url = departures(operator, code, limit=limit)
        except Transit511Error:
            continue
        merged.extend(rows)

    merged.sort(key=lambda r: (r["minutes"] is None, r["minutes"] or 0))
    seen, out = set(), []
    for d in merged:
        # Identify a departure by its expected time, falling back to the aimed
        # one: with `expected` alone, every schedule-only row (expected=None)
        # on the same line and destination would collapse into a single entry.
        k = (d["line"], d["destination"], d["expected"] or d["aimed"])
        if k in seen:
            continue
        seen.add(k)
        out.append(d)
    return out[:limit], last_url
# ---- traffic events (Open511) --------------------------------------------- #
def traffic_events(area_query: str | None = None, limit=12):
    """Return current traffic events, optionally filtered by a text query.

    Args:
        area_query: Optional free text. An event is kept when the query,
            reduced to lower-case letters and digits, occurs in the event's
            headline + road names + area names reduced the same way (so
            "i 80" matches "I-80"). A query with no letters or digits does
            not filter anything.
        limit: Maximum number of events returned.

    Returns:
        ``(events, url)``. Each event is ``{type, severity, headline, roads,
        areas, lat, lon}``; ``roads`` and ``areas`` are comma-joined names.
        ``lat``/``lon`` are filled only when the event geography carries a
        flat ``[lon, lat]`` coordinate pair and are ``None`` otherwise.
        ``url`` is the request URL reported by ``_get``.

    Raises:
        Transit511Error: propagated from ``_get``.
    """
    data, url = _get("/traffic/events", {})
    # Accept both an {"events": [...]} envelope and a bare list. Calling
    # data.get() first would raise AttributeError on the bare-list shape.
    if isinstance(data, dict):
        evs = data.get("events") or []
    elif isinstance(data, list):
        evs = data
    else:
        evs = []

    # Normalise the query once instead of once per event.
    needle = re.sub(r"[^a-z0-9]", "", area_query.lower()) if area_query else ""

    out = []
    for e in evs:
        roads = ", ".join(r.get("name", "") for r in (e.get("roads") or []) if r.get("name"))
        areas = ", ".join(a.get("name", "") for a in (e.get("areas") or []) if a.get("name"))
        headline = e.get("headline") or ""
        if needle:
            blob = f"{headline} {roads} {areas}"
            if needle not in re.sub(r"[^a-z0-9]", "", blob.lower()):
                continue
        geo = e.get("geography") or {}
        coords = geo.get("coordinates")
        lat = lon = None
        # Only a flat numeric pair is understood; nested coordinate lists
        # (lines, polygons) leave lat/lon unset.
        if (isinstance(coords, list) and len(coords) >= 2
                and isinstance(coords[0], (int, float))
                and isinstance(coords[1], (int, float))):
            lon, lat = coords[0], coords[1]
        out.append({
            "type": e.get("event_type"), "severity": e.get("severity"),
            "headline": headline, "roads": roads, "areas": areas,
            "lat": lat, "lon": lon,
        })
    return out[:limit], url
|