Flight-Transit-Agent / transit.py
Quazim0t0's picture
Upload 34 files
41e0c9e verified
Raw
History Blame Contribute Delete
8 kB
"""Client for the 511 SF Bay Area Open Data API (transit + traffic).
Docs: https://511.org/open-data/transit and the Open511 traffic spec.
Auth: an api_key query param (set TRANSIT_511_API_KEY in the environment).
Notes learned from the live API:
- Every response is UTF-8 with a BOM -> decode with 'utf-8-sig'.
- Transit payloads are SIRI-shaped; traffic payloads are Open511.
- There is NO trip-planner endpoint. "Fastest route" is therefore *reasoned*
from real-time departures + scheduled estimates + live traffic events.
"""
from __future__ import annotations

import datetime as dt
import difflib
import json
import math
import os
import re
import time
from functools import lru_cache

import requests
# Base URL for every 511 request. HTTPS keeps the api_key query parameter from
# travelling in plaintext; TRANSIT_511_BASE_URL overrides it (e.g. a proxy).
BASE = (os.environ.get("TRANSIT_511_BASE_URL") or "https://api.511.org").rstrip("/")
# Operators we resolve place names against (regional rail spine of the Bay Area).
RESOLVE_OPERATORS = ["BA", "CT"]  # BART, Caltrain
# 511 operator IDs -> human-readable agency names.
OPERATOR_NAMES = {
    "BA": "BART",
    "CT": "Caltrain",
    "AC": "AC Transit",
    "SF": "Muni",
    "SM": "SamTrans",
    "SC": "VTA",
    "GG": "Golden Gate",
}
class Transit511Error(RuntimeError):
    """Error raised by this client, e.g. a missing API key or an HTTP error from 511."""
def _key() -> str:
k = os.environ.get("TRANSIT_511_API_KEY", "").strip()
if not k:
raise Transit511Error(
"TRANSIT_511_API_KEY is not set. Put your 511.org API key in the "
"environment (see .env.example).")
return k
def _get(path: str, params: dict, timeout=30):
    """GET ``BASE + path`` from the 511 API and return ``(payload, url)``.

    ``api_key`` and ``format=json`` are merged into a copy of *params* (the
    caller's dict is not modified). The body is decoded as 'utf-8-sig' to drop
    the BOM 511 prepends, then parsed as JSON.

    Returns:
        A tuple of (parsed JSON, final request URL). The URL includes the
        api_key query parameter, so treat it as sensitive.

    Raises:
        Transit511Error: on a transport failure (connection error, timeout,
            ...), on an HTTP status >= 400 (with dedicated messages for 401
            and 429), or when the body is not valid UTF-8 JSON. Raw
            ``requests`` / JSON exceptions are never propagated, so a caller's
            ``except Transit511Error`` covers every failure mode.
    """
    query = {**params, "api_key": _key(), "format": "json"}
    try:
        r = requests.get(f"{BASE}{path}", params=query, timeout=timeout)
    except requests.RequestException as exc:
        # Only the exception type goes in the message: requests' own text can
        # embed the full URL, including the api_key query parameter.
        raise Transit511Error(
            f"511 request failed ({type(exc).__name__}).") from exc
    if r.status_code == 401:
        raise Transit511Error("511 rejected the API key (401).")
    if r.status_code == 429:
        raise Transit511Error("511 rate limit hit (429). Wait a bit.")
    if r.status_code >= 400:
        raise Transit511Error(f"511 error {r.status_code}: {r.text[:200]}")
    try:
        # 'utf-8-sig' strips the BOM; UnicodeDecodeError and JSONDecodeError
        # are both ValueError subclasses.
        payload = json.loads(r.content.decode("utf-8-sig"))
    except ValueError as exc:
        raise Transit511Error(
            f"511 returned a non-JSON body (HTTP {r.status_code}).") from exc
    return payload, r.url
# --------------------------------------------------------------------------- #
def haversine_km(lat1, lon1, lat2, lon2):
    """Great-circle distance in km between two (lat, lon) points given in degrees.

    Uses the haversine formula on a sphere of radius 6371.0 km.
    """
    p1, p2 = math.radians(lat1), math.radians(lat2)
    dphi = math.radians(lat2 - lat1)
    dl = math.radians(lon2 - lon1)
    a = math.sin(dphi / 2) ** 2 + math.cos(p1) * math.cos(p2) * math.sin(dl / 2) ** 2
    # Floating-point rounding can push `a` a hair outside [0, 1] (notably for
    # near-antipodal points), which would make math.asin raise ValueError.
    a = min(1.0, max(0.0, a))
    return 2 * 6371.0 * math.asin(math.sqrt(a))
def _mins_until(iso_str):
if not iso_str:
return None
try:
t = dt.datetime.fromisoformat(str(iso_str).replace("Z", "+00:00"))
return max(0, round((t - dt.datetime.now(dt.timezone.utc)).total_seconds() / 60))
except Exception:
return None
# ---- stops index (cached) -------------------------------------------------- #
@lru_cache(maxsize=8)
def _operator_stops(operator_id: str):
    """Fetch (and memoise) the stop list for one 511 operator.

    Each stop is a dict ``{operator, code, name, lat, lon}``. Entries lacking
    a numeric Latitude/Longitude are skipped. An unexpected payload shape
    yields ``[]``. The result is cached by lru_cache and shared between
    callers, so do not mutate it.

    Raises:
        Transit511Error: propagated from the HTTP fetch (not cached).
    """
    data, _ = _get("/transit/stops", {"operator_id": operator_id})
    try:
        pts = data["Contents"]["dataObjects"]["ScheduledStopPoint"]
    except (KeyError, TypeError):
        return []
    if isinstance(pts, dict):
        # NOTE(review): guards against a lone stop serialised as an object
        # instead of a one-element list; iterating a dict would yield keys.
        pts = [pts]
    elif not isinstance(pts, list):
        return []
    out = []
    for p in pts:
        if not isinstance(p, dict):
            continue
        loc = p.get("Location") or {}
        try:
            lat = float(loc.get("Latitude"))
            lon = float(loc.get("Longitude"))
        except (AttributeError, TypeError, ValueError):
            continue
        out.append({"operator": operator_id, "code": str(p.get("id")),
                    "name": p.get("Name") or "", "lat": lat, "lon": lon})
    return out
def _stop_index():
    """Concatenate the stop lists of every operator in RESOLVE_OPERATORS.

    An operator whose fetch raises Transit511Error is skipped, so the index is
    best-effort and may be partial or empty. A new list is returned each call.
    """
    combined = []
    for operator_id in RESOLVE_OPERATORS:
        try:
            stops = _operator_stops(operator_id)
        except Transit511Error:
            continue
        combined += stops
    return combined
# Common shorthands -> the string we actually fuzzy-match against stop names.
# Keys are looked up against the stripped, lower-cased query in resolve_place().
ALIASES = {
    # San Francisco International Airport
    "sfo": "san francisco international airport",
    "sf airport": "san francisco international airport",
    "airport": "san francisco international airport",
    # Downtown San Francisco
    "sf": "embarcadero",
    "the city": "embarcadero",
    "downtown sf": "embarcadero",
    "san francisco": "embarcadero",
    "downtown": "embarcadero",
    # Oakland
    "oakland": "12th street",
    "downtown oakland": "12th street",
    # San Jose
    "san jose": "san jose diridon",
    "sj": "san jose diridon",
    # Berkeley
    "berkeley": "downtown berkeley",
}
def resolve_place(text: str, stops: list[dict] | None = None,
                  aliases: dict[str, str] | None = None):
    """Fuzzy-match a free-text place to the best regional-rail stop.

    Args:
        text: user input, e.g. "SFO" or "millbrae".
        stops: candidate stop dicts (each needing at least "name"); defaults
            to the live index from _stop_index().
        aliases: shorthand -> match-string map; defaults to ALIASES.

    Returns:
        The best stop dict plus a "score" key (rounded to 2 dp), or None when
        the input is blank, there are no candidates, or the best score is
        below 0.45.
    """
    if not text:
        return None
    if aliases is None:
        aliases = ALIASES
    # Lower-case and collapse whitespace so "  SF   airport " hits its alias.
    q = " ".join(text.lower().split())
    q = aliases.get(q, q)
    if not q:
        # Whitespace-only input: "" is a substring of every name, so without
        # this guard an arbitrary stop would be returned.
        return None
    if stops is None:
        stops = _stop_index()
    if not stops:
        return None
    q_tokens = set(q.split())
    best, best_score = None, 0.0
    for s in stops:
        name = (s.get("name") or "").strip().lower()
        if not name:
            # An empty name would match every query via `name in q`.
            continue
        if q in name or name in q:
            # Containment: 0.92 plus up to 0.08 for how much of the longer
            # string the shorter covers, so the tightest containment wins in
            # either direction (and the score never exceeds 1.0).
            score = 0.92 + 0.08 * (min(len(q), len(name)) / max(len(q), len(name)))
        else:
            score = difflib.SequenceMatcher(None, q, name).ratio()
        # token overlap bonus
        shared = q_tokens & set(name.replace("/", " ").split())
        if shared:
            score = max(score, 0.6 + 0.1 * len(shared))
        if score > best_score:
            best, best_score = s, score
    if best is not None and best_score >= 0.45:
        return {**best, "score": round(best_score, 2)}
    return None
# ---- real-time departures -------------------------------------------------- #
def departures(agency: str, stop_code: str, limit=6):
    """Real-time departures for a single 511 stop code, soonest first.

    Args:
        agency: 511 operator ID (e.g. "BA").
        stop_code: stop code within that operator.
        limit: maximum number of rows returned.

    Returns:
        (rows, url). Each row is ``{line, destination, aimed, expected,
        minutes}``; rows with unknown ``minutes`` sort last.

    Raises:
        Transit511Error: propagated from the HTTP fetch.
    """
    data, url = _get("/transit/StopMonitoring",
                     {"agency": agency, "stopCode": stop_code})
    try:
        delivery = data["ServiceDelivery"]["StopMonitoringDelivery"]
        if isinstance(delivery, list):
            # NOTE(review): tolerate SIRI variants that wrap the delivery in
            # a list; 511's usual shape is a single object.
            delivery = delivery[0] if delivery else {}
        visits = delivery["MonitoredStopVisit"]
    except (KeyError, TypeError, IndexError):
        visits = []
    if isinstance(visits, dict):
        visits = [visits]
    elif not isinstance(visits, list):
        visits = []
    rows = []
    for v in visits:
        if not isinstance(v, dict):
            continue
        j = v.get("MonitoredVehicleJourney") or {}
        call = j.get("MonitoredCall") or {}
        exp = call.get("ExpectedDepartureTime") or call.get("AimedDepartureTime")
        rows.append({
            "line": j.get("LineRef") or j.get("PublishedLineName"),
            "destination": j.get("DestinationName"),
            "aimed": call.get("AimedDepartureTime"),
            "expected": call.get("ExpectedDepartureTime"),
            "minutes": _mins_until(exp),
        })
    # Sort BEFORE truncating: nothing here guarantees the feed is in time
    # order, so slicing first could drop the soonest departures.
    rows.sort(key=lambda r: (r["minutes"] is None, r["minutes"] or 0))
    return rows[:limit], url
def station_departures(operator: str, name: str, limit=8):
    """Aggregate real-time departures across ALL platforms of a named station
    (a station often has separate codes per direction).

    Args:
        operator: 511 operator ID whose stop list is searched.
        name: exact stop name to match (case-sensitive).
        limit: maximum rows returned (also passed per platform).

    Returns:
        (rows, url) where rows are deduplicated and sorted soonest first, and
        url is from the last platform fetched successfully (None if none).
        Platforms whose fetch raises Transit511Error are skipped.
    """
    codes = [s["code"] for s in _operator_stops(operator) if s["name"] == name]
    merged, last_url = [], None
    # dict.fromkeys drops duplicate codes (order kept) to avoid repeat calls.
    for c in dict.fromkeys(codes):
        try:
            rows, last_url = departures(operator, c, limit=limit)
        except Transit511Error:
            continue
        merged.extend(rows)
    seen, out = set(), []
    for d in sorted(merged, key=lambda r: (r["minutes"] is None, r["minutes"] or 0)):
        # `aimed` is part of the key so distinct trains that have no real-time
        # estimate (expected is None) are not collapsed into one row.
        k = (d["line"], d["destination"], d["aimed"], d["expected"])
        if k in seen:
            continue
        seen.add(k)
        out.append(d)
    return out[:limit], last_url
# ---- traffic events (Open511) --------------------------------------------- #
def traffic_events(area_query: str | None = None, limit=12):
    """Fetch Open511 traffic events, optionally filtered by a free-text area.

    Args:
        area_query: if given, keep only events whose headline/roads/areas
            contain it, comparing lower-cased alphanumerics only.
        limit: maximum number of events returned.

    Returns:
        (events, url). Each event is ``{type, severity, headline, roads,
        areas, lat, lon}``. lat/lon come from the first position of the
        event's geography (read as [lon, lat]) or are None if unavailable.

    Raises:
        Transit511Error: propagated from the HTTP fetch.
    """
    data, url = _get("/traffic/events", {})
    # Check the type first: calling .get on a list payload would raise.
    if isinstance(data, list):
        evs = data
    elif isinstance(data, dict):
        evs = data.get("events") or []
    else:
        evs = []
    # Normalise the query once instead of once per event.
    needle = re.sub(r"[^a-z0-9]", "", area_query.lower()) if area_query else ""
    out = []
    for e in evs:
        if not isinstance(e, dict):
            continue
        roads = ", ".join(r.get("name", "") for r in (e.get("roads") or []) if r.get("name"))
        areas = ", ".join(a.get("name", "") for a in (e.get("areas") or []) if a.get("name"))
        headline = e.get("headline") or ""
        blob = f"{headline} {roads} {areas}"
        if needle and needle not in re.sub(r"[^a-z0-9]", "", blob.lower()):
            continue
        coords = (e.get("geography") or {}).get("coordinates")
        # Point geometries are [lon, lat]; LineString / Multi* nest deeper,
        # so descend to the first position.
        while isinstance(coords, list) and coords and isinstance(coords[0], list):
            coords = coords[0]
        lat = lon = None
        if (isinstance(coords, list) and len(coords) >= 2
                and all(isinstance(c, (int, float)) for c in coords[:2])):
            lon, lat = coords[0], coords[1]
        out.append({
            "type": e.get("event_type"), "severity": e.get("severity"),
            "headline": headline, "roads": roads, "areas": areas,
            "lat": lat, "lon": lon,
        })
    return out[:limit], url