"""Client for the 511 SF Bay Area Open Data API (transit + traffic). Docs: https://511.org/open-data/transit and the Open511 traffic spec. Auth: an api_key query param (set TRANSIT_511_API_KEY in the environment). Notes learned from the live API: - Every response is UTF-8 with a BOM -> decode with 'utf-8-sig'. - Transit payloads are SIRI-shaped; traffic payloads are Open511. - There is NO trip-planner endpoint. "Fastest route" is therefore *reasoned* from real-time departures + scheduled estimates + live traffic events. """ from __future__ import annotations import datetime as dt import difflib import math import os import re import time from functools import lru_cache import requests BASE = "http://api.511.org" # Operators we resolve place names against (regional rail spine of the Bay Area). RESOLVE_OPERATORS = ["BA", "CT"] # BART, Caltrain OPERATOR_NAMES = {"BA": "BART", "CT": "Caltrain", "AC": "AC Transit", "SF": "Muni", "SM": "SamTrans", "SC": "VTA", "GG": "Golden Gate"} class Transit511Error(RuntimeError): pass def _key() -> str: k = os.environ.get("TRANSIT_511_API_KEY", "").strip() if not k: raise Transit511Error( "TRANSIT_511_API_KEY is not set. Put your 511.org API key in the " "environment (see .env.example).") return k def _get(path: str, params: dict, timeout=30): params = {**params, "api_key": _key(), "format": "json"} r = requests.get(f"{BASE}{path}", params=params, timeout=timeout) if r.status_code == 401: raise Transit511Error("511 rejected the API key (401).") if r.status_code == 429: raise Transit511Error("511 rate limit hit (429). Wait a bit.") if r.status_code >= 400: raise Transit511Error(f"511 error {r.status_code}: {r.text[:200]}") text = r.content.decode("utf-8-sig") # strip BOM import json return json.loads(text), r.url # --------------------------------------------------------------------------- # def haversine_km(lat1, lon1, lat2, lon2): p1, p2 = math.radians(lat1), math.radians(lat2) dphi = math.radians(lat2 - lat1) dl = math.radians(lon2 - lon1) a = math.sin(dphi / 2) ** 2 + math.cos(p1) * math.cos(p2) * math.sin(dl / 2) ** 2 return 2 * 6371.0 * math.asin(math.sqrt(a)) def _mins_until(iso_str): if not iso_str: return None try: t = dt.datetime.fromisoformat(str(iso_str).replace("Z", "+00:00")) return max(0, round((t - dt.datetime.now(dt.timezone.utc)).total_seconds() / 60)) except Exception: return None # ---- stops index (cached) -------------------------------------------------- # @lru_cache(maxsize=8) def _operator_stops(operator_id: str): data, _ = _get("/transit/stops", {"operator_id": operator_id}) try: pts = data["Contents"]["dataObjects"]["ScheduledStopPoint"] except Exception: return [] out = [] for p in pts: loc = p.get("Location", {}) or {} try: lat = float(loc.get("Latitude")); lon = float(loc.get("Longitude")) except (TypeError, ValueError): continue out.append({"operator": operator_id, "code": str(p.get("id")), "name": p.get("Name", ""), "lat": lat, "lon": lon}) return out def _stop_index(): idx = [] for op in RESOLVE_OPERATORS: try: idx.extend(_operator_stops(op)) except Transit511Error: continue return idx # Common shorthands -> the string we actually fuzzy-match against stop names. ALIASES = { "sfo": "san francisco international airport", "sf airport": "san francisco international airport", "airport": "san francisco international airport", "sf": "embarcadero", "the city": "embarcadero", "downtown sf": "embarcadero", "san francisco": "embarcadero", "downtown": "embarcadero", "oakland": "12th street", "downtown oakland": "12th street", "san jose": "san jose diridon", "sj": "san jose diridon", "berkeley": "downtown berkeley", } def resolve_place(text: str): """Fuzzy-match a free-text place to the best regional-rail stop. Returns a stop dict {operator, code, name, lat, lon, score} or None. """ if not text: return None q = text.strip().lower() q = ALIASES.get(q, q) idx = _stop_index() if not idx: return None best, best_score = None, 0.0 for s in idx: name = s["name"].lower() if q in name or name in q: score = 0.92 + 0.08 * (len(q) / max(len(name), 1)) else: score = difflib.SequenceMatcher(None, q, name).ratio() # token overlap bonus qt, nt = set(q.split()), set(name.replace("/", " ").split()) if qt & nt: score = max(score, 0.6 + 0.1 * len(qt & nt)) if score > best_score: best, best_score = s, score if best and best_score >= 0.45: return {**best, "score": round(best_score, 2)} return None # ---- real-time departures -------------------------------------------------- # def departures(agency: str, stop_code: str, limit=6): data, url = _get("/transit/StopMonitoring", {"agency": agency, "stopCode": stop_code}) try: visits = data["ServiceDelivery"]["StopMonitoringDelivery"]["MonitoredStopVisit"] except Exception: visits = [] rows = [] for v in visits[:limit]: j = v.get("MonitoredVehicleJourney", {}) or {} call = j.get("MonitoredCall", {}) or {} exp = call.get("ExpectedDepartureTime") or call.get("AimedDepartureTime") rows.append({ "line": j.get("LineRef") or j.get("PublishedLineName"), "destination": j.get("DestinationName"), "aimed": call.get("AimedDepartureTime"), "expected": call.get("ExpectedDepartureTime"), "minutes": _mins_until(exp), }) rows.sort(key=lambda r: (r["minutes"] is None, r["minutes"] or 0)) return rows, url def station_departures(operator: str, name: str, limit=8): """Aggregate real-time departures across ALL platforms of a named station (a station often has separate codes per direction).""" codes = [s["code"] for s in _operator_stops(operator) if s["name"] == name] merged, last_url = [], None for c in codes: try: d, last_url = departures(operator, c, limit=limit) merged.extend(d) except Transit511Error: continue seen, out = set(), [] for d in sorted(merged, key=lambda r: (r["minutes"] is None, r["minutes"] or 0)): k = (d["line"], d["destination"], d["expected"]) if k in seen: continue seen.add(k) out.append(d) return out[:limit], last_url # ---- traffic events (Open511) --------------------------------------------- # def traffic_events(area_query: str | None = None, limit=12): data, url = _get("/traffic/events", {}) evs = data.get("events", data if isinstance(data, list) else []) out = [] for e in evs: roads = ", ".join(r.get("name", "") for r in (e.get("roads") or []) if r.get("name")) areas = ", ".join(a.get("name", "") for a in (e.get("areas") or []) if a.get("name")) headline = e.get("headline") or "" blob = f"{headline} {roads} {areas}" if area_query and re.sub(r"[^a-z0-9]", "", area_query.lower()) \ not in re.sub(r"[^a-z0-9]", "", blob.lower()): continue geo = e.get("geography") or {} coords = geo.get("coordinates") lat = lon = None if isinstance(coords, list) and coords and isinstance(coords[0], (int, float)): lon, lat = coords[0], coords[1] out.append({ "type": e.get("event_type"), "severity": e.get("severity"), "headline": headline, "roads": roads, "areas": areas, "lat": lat, "lon": lon, }) return out[:limit], url