Spaces:
Running on Zero
Running on Zero
| """Client for the 511 SF Bay Area Open Data API (transit + traffic). | |
| Docs: https://511.org/open-data/transit and the Open511 traffic spec. | |
| Auth: an api_key query param (set TRANSIT_511_API_KEY in the environment). | |
| Notes learned from the live API: | |
| - Every response is UTF-8 with a BOM -> decode with 'utf-8-sig'. | |
| - Transit payloads are SIRI-shaped; traffic payloads are Open511. | |
| - There is NO trip-planner endpoint. "Fastest route" is therefore *reasoned* | |
| from real-time departures + scheduled estimates + live traffic events. | |
| """ | |
| from __future__ import annotations | |
| import datetime as dt | |
| import difflib | |
| import math | |
| import os | |
| import re | |
| import time | |
| from functools import lru_cache | |
| import requests | |
| BASE = "http://api.511.org" | |
| # Operators we resolve place names against (regional rail spine of the Bay Area). | |
| RESOLVE_OPERATORS = ["BA", "CT"] # BART, Caltrain | |
| OPERATOR_NAMES = {"BA": "BART", "CT": "Caltrain", "AC": "AC Transit", | |
| "SF": "Muni", "SM": "SamTrans", "SC": "VTA", "GG": "Golden Gate"} | |
| class Transit511Error(RuntimeError): | |
| pass | |
| def _key() -> str: | |
| k = os.environ.get("TRANSIT_511_API_KEY", "").strip() | |
| if not k: | |
| raise Transit511Error( | |
| "TRANSIT_511_API_KEY is not set. Put your 511.org API key in the " | |
| "environment (see .env.example).") | |
| return k | |
| def _get(path: str, params: dict, timeout=30): | |
| params = {**params, "api_key": _key(), "format": "json"} | |
| r = requests.get(f"{BASE}{path}", params=params, timeout=timeout) | |
| if r.status_code == 401: | |
| raise Transit511Error("511 rejected the API key (401).") | |
| if r.status_code == 429: | |
| raise Transit511Error("511 rate limit hit (429). Wait a bit.") | |
| if r.status_code >= 400: | |
| raise Transit511Error(f"511 error {r.status_code}: {r.text[:200]}") | |
| text = r.content.decode("utf-8-sig") # strip BOM | |
| import json | |
| return json.loads(text), r.url | |
| # --------------------------------------------------------------------------- # | |
| def haversine_km(lat1, lon1, lat2, lon2): | |
| p1, p2 = math.radians(lat1), math.radians(lat2) | |
| dphi = math.radians(lat2 - lat1) | |
| dl = math.radians(lon2 - lon1) | |
| a = math.sin(dphi / 2) ** 2 + math.cos(p1) * math.cos(p2) * math.sin(dl / 2) ** 2 | |
| return 2 * 6371.0 * math.asin(math.sqrt(a)) | |
| def _mins_until(iso_str): | |
| if not iso_str: | |
| return None | |
| try: | |
| t = dt.datetime.fromisoformat(str(iso_str).replace("Z", "+00:00")) | |
| return max(0, round((t - dt.datetime.now(dt.timezone.utc)).total_seconds() / 60)) | |
| except Exception: | |
| return None | |
| # ---- stops index (cached) -------------------------------------------------- # | |
| def _operator_stops(operator_id: str): | |
| data, _ = _get("/transit/stops", {"operator_id": operator_id}) | |
| try: | |
| pts = data["Contents"]["dataObjects"]["ScheduledStopPoint"] | |
| except Exception: | |
| return [] | |
| out = [] | |
| for p in pts: | |
| loc = p.get("Location", {}) or {} | |
| try: | |
| lat = float(loc.get("Latitude")); lon = float(loc.get("Longitude")) | |
| except (TypeError, ValueError): | |
| continue | |
| out.append({"operator": operator_id, "code": str(p.get("id")), | |
| "name": p.get("Name", ""), "lat": lat, "lon": lon}) | |
| return out | |
| def _stop_index(): | |
| idx = [] | |
| for op in RESOLVE_OPERATORS: | |
| try: | |
| idx.extend(_operator_stops(op)) | |
| except Transit511Error: | |
| continue | |
| return idx | |
| # Common shorthands -> the string we actually fuzzy-match against stop names. | |
| ALIASES = { | |
| "sfo": "san francisco international airport", | |
| "sf airport": "san francisco international airport", | |
| "airport": "san francisco international airport", | |
| "sf": "embarcadero", "the city": "embarcadero", "downtown sf": "embarcadero", | |
| "san francisco": "embarcadero", "downtown": "embarcadero", | |
| "oakland": "12th street", "downtown oakland": "12th street", | |
| "san jose": "san jose diridon", "sj": "san jose diridon", | |
| "berkeley": "downtown berkeley", | |
| } | |
| def resolve_place(text: str): | |
| """Fuzzy-match a free-text place to the best regional-rail stop. | |
| Returns a stop dict {operator, code, name, lat, lon, score} or None. | |
| """ | |
| if not text: | |
| return None | |
| q = text.strip().lower() | |
| q = ALIASES.get(q, q) | |
| idx = _stop_index() | |
| if not idx: | |
| return None | |
| best, best_score = None, 0.0 | |
| for s in idx: | |
| name = s["name"].lower() | |
| if q in name or name in q: | |
| score = 0.92 + 0.08 * (len(q) / max(len(name), 1)) | |
| else: | |
| score = difflib.SequenceMatcher(None, q, name).ratio() | |
| # token overlap bonus | |
| qt, nt = set(q.split()), set(name.replace("/", " ").split()) | |
| if qt & nt: | |
| score = max(score, 0.6 + 0.1 * len(qt & nt)) | |
| if score > best_score: | |
| best, best_score = s, score | |
| if best and best_score >= 0.45: | |
| return {**best, "score": round(best_score, 2)} | |
| return None | |
| # ---- real-time departures -------------------------------------------------- # | |
| def departures(agency: str, stop_code: str, limit=6): | |
| data, url = _get("/transit/StopMonitoring", | |
| {"agency": agency, "stopCode": stop_code}) | |
| try: | |
| visits = data["ServiceDelivery"]["StopMonitoringDelivery"]["MonitoredStopVisit"] | |
| except Exception: | |
| visits = [] | |
| rows = [] | |
| for v in visits[:limit]: | |
| j = v.get("MonitoredVehicleJourney", {}) or {} | |
| call = j.get("MonitoredCall", {}) or {} | |
| exp = call.get("ExpectedDepartureTime") or call.get("AimedDepartureTime") | |
| rows.append({ | |
| "line": j.get("LineRef") or j.get("PublishedLineName"), | |
| "destination": j.get("DestinationName"), | |
| "aimed": call.get("AimedDepartureTime"), | |
| "expected": call.get("ExpectedDepartureTime"), | |
| "minutes": _mins_until(exp), | |
| }) | |
| rows.sort(key=lambda r: (r["minutes"] is None, r["minutes"] or 0)) | |
| return rows, url | |
| def station_departures(operator: str, name: str, limit=8): | |
| """Aggregate real-time departures across ALL platforms of a named station | |
| (a station often has separate codes per direction).""" | |
| codes = [s["code"] for s in _operator_stops(operator) if s["name"] == name] | |
| merged, last_url = [], None | |
| for c in codes: | |
| try: | |
| d, last_url = departures(operator, c, limit=limit) | |
| merged.extend(d) | |
| except Transit511Error: | |
| continue | |
| seen, out = set(), [] | |
| for d in sorted(merged, key=lambda r: (r["minutes"] is None, r["minutes"] or 0)): | |
| k = (d["line"], d["destination"], d["expected"]) | |
| if k in seen: | |
| continue | |
| seen.add(k) | |
| out.append(d) | |
| return out[:limit], last_url | |
| # ---- traffic events (Open511) --------------------------------------------- # | |
| def traffic_events(area_query: str | None = None, limit=12): | |
| data, url = _get("/traffic/events", {}) | |
| evs = data.get("events", data if isinstance(data, list) else []) | |
| out = [] | |
| for e in evs: | |
| roads = ", ".join(r.get("name", "") for r in (e.get("roads") or []) if r.get("name")) | |
| areas = ", ".join(a.get("name", "") for a in (e.get("areas") or []) if a.get("name")) | |
| headline = e.get("headline") or "" | |
| blob = f"{headline} {roads} {areas}" | |
| if area_query and re.sub(r"[^a-z0-9]", "", area_query.lower()) \ | |
| not in re.sub(r"[^a-z0-9]", "", blob.lower()): | |
| continue | |
| geo = e.get("geography") or {} | |
| coords = geo.get("coordinates") | |
| lat = lon = None | |
| if isinstance(coords, list) and coords and isinstance(coords[0], (int, float)): | |
| lon, lat = coords[0], coords[1] | |
| out.append({ | |
| "type": e.get("event_type"), "severity": e.get("severity"), | |
| "headline": headline, "roads": roads, "areas": areas, | |
| "lat": lat, "lon": lon, | |
| }) | |
| return out[:limit], url | |