# Spaces: Sleeping
# File size: 10,979 Bytes
# 0170ac5
from datetime import datetime # type: ignore
import sys # type: ignore
from pathlib import Path
# Add parent directory to path to allow imports from api/
sys.path.insert(0, str(Path(__file__).parent.parent))
# Add src directory to path to allow imports from same directory
sys.path.insert(0, str(Path(__file__).parent))
from api.bus_cache import AsyncBusCache # type: ignore
from api.utils import hms_to_seconds, get_service_day_start_ts, translate_occupancy # type: ignore
from db_manager import init_db # type: ignore
from dotenv import load_dotenv # type: ignore
from fastapi import FastAPI, HTTPException # type: ignore
from fastapi.middleware.cors import CORSMiddleware # type: ignore
# Load variables from a .env file into the environment before the cache,
# database handle and app below are constructed.
load_dotenv()
# Shared cache used by every endpoint via `await ttc_cache.get_data()`.
# NOTE(review): ttl=20 is presumably seconds — confirm in AsyncBusCache.
ttc_cache = AsyncBusCache(ttl=20)
# Initialize database connection globally
db = init_db()
app = FastAPI(title="WheresMyBus v2.0 API")
# Setup CORS for your React frontend
# All origins, methods and headers are currently allowed.
app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"], # In production, use your actual React URL
    allow_methods=["*"],
    allow_headers=["*"],
)
@app.get("/")
async def health_check():
    """Liveness probe: reply with a fixed string so callers know the API is up."""
    status_message = "backend is running"
    return status_message
@app.get("/api/vehicles")
async def get_vehicles():
    """Return every vehicle currently present in the shared cache payload.

    Returns:
        A dict with ``status`` (always "success"), ``count`` (number of
        vehicles) and ``vehicles`` (the cached list, or an empty list when
        the payload has no "vehicles" key).
    """
    snapshot = await ttc_cache.get_data()
    active = snapshot.get("vehicles", [])
    return {"status": "success", "count": len(active), "vehicles": active}
@app.get("/api/routes")
async def get_all_routes():
    """
    List every row of the ``routes`` table with its display names and colours.

    NULL colours fall back to 'FF0000' (background) and 'FFFFFF' (text).
    Purely numeric short names are ordered by their integer value; all other
    short names share sort key 999 and are then ordered by the short name.

    Returns:
        On success ``{"status": "success", "count": int, "routes": [...]}``,
        each route carrying ``id``, ``number``, ``name``, ``color`` and
        ``text_color`` (both colours prefixed with ``#``).
        On any exception ``{"status": "error", "message": str(exc)}``.
    """
    routes_sql = """
        SELECT
            route_id,
            route_short_name,
            route_long_name,
            COALESCE(route_color, 'FF0000') as route_color,
            COALESCE(route_text_color, 'FFFFFF') as route_text_color
        FROM routes
        ORDER BY
            CASE
                WHEN CAST(route_short_name AS VARCHAR) ~ '^[0-9]+$' THEN CAST(route_short_name AS INTEGER)
                ELSE 999
            END,
            route_short_name;
    """
    try:
        rows = db.execute(routes_sql).fetchall()

        # Turn the positional result tuples into named records for the client.
        routes = []
        for route_id, short_name, long_name, color, text_color in rows:
            routes.append({
                "id": route_id,
                "number": short_name,
                "name": long_name,
                "color": f"#{color}",
                "text_color": f"#{text_color}",
            })

        return {"status": "success", "count": len(routes), "routes": routes}
    except Exception as exc:
        # Failures are reported in the payload rather than as an HTTP error.
        return {"status": "error", "message": str(exc)}
@app.get("/api/routes/{route_id}")
async def get_route_view(route_id: str):
    """Return the cached vehicles on one route, enriched with schedule data.

    Each vehicle whose ``route`` equals ``route_id`` is joined against the
    ``stop_times``/``trips`` tables to obtain its headsign and, when both a
    predicted time and a next stop are present, its delay in whole minutes
    (positive = late, negative = early, 0 when no comparison is possible).

    Args:
        route_id: Route identifier taken from the URL path.

    Returns:
        ``{"route", "vehicles": []}`` when no cached vehicle is on the route,
        otherwise ``{"route", "count", "vehicles"}`` where every vehicle has
        ``number``, ``name``, ``location``, ``delay_mins`` and ``fullness``.
    """
    snapshot = await ttc_cache.get_data()
    on_route = [bus for bus in snapshot.get("vehicles", []) if bus['route'] == route_id]
    if not on_route:
        return {"route": route_id, "vehicles": []}

    # Trip IDs are stringified so they compare equal to the VARCHAR-cast columns.
    trip_keys = [str(bus['trip_id']) for bus in on_route]
    markers = ','.join('?' for _ in trip_keys)

    sql = f"""
        SELECT
            CAST(st.trip_id AS VARCHAR),
            CAST(st.stop_id AS VARCHAR),
            st.arrival_time,
            t.trip_headsign
        FROM stop_times st
        JOIN trips t ON CAST(st.trip_id AS VARCHAR) = CAST(t.trip_id AS VARCHAR)
        WHERE CAST(st.trip_id AS VARCHAR) IN ({markers})
    """
    rows = db.execute(sql, trip_keys).fetchall()

    # Surface the "nothing matched" case on stdout for debugging.
    if not rows:
        print(f"DEBUG: No matches in DB for Trip IDs: {trip_keys[:3]}")

    # (trip, stop) -> scheduled arrival, and trip -> headsign.
    scheduled_arrival = {}
    headsign_for_trip = {}
    for trip, stop, arrival, headsign in rows:
        scheduled_arrival[(trip, stop)] = arrival
        headsign_for_trip[trip] = headsign

    day_start_ts = get_service_day_start_ts()

    vehicles_out = []
    for bus in on_route:
        trip_key = str(bus['trip_id'])
        predicted = bus.get('predicted_time')
        upcoming_stop = bus.get('next_stop_id')

        # A schedule lookup only makes sense with both a prediction and a stop.
        scheduled = (
            scheduled_arrival.get((trip_key, str(upcoming_stop)))
            if predicted and upcoming_stop
            else None
        )
        if scheduled:
            # Delay = (predicted Unix time - planned Unix time) in minutes.
            planned_ts = day_start_ts + hms_to_seconds(scheduled)
            delay = round((predicted - planned_ts) / 60)
        else:
            delay = 0

        vehicles_out.append({
            "number": bus['id'],
            "name": headsign_for_trip.get(trip_key, "Not in Schedule"),  # destination
            "location": {"lat": bus['lat'], "lon": bus['lon']},
            "delay_mins": delay,
            "fullness": translate_occupancy(bus['occupancy']),
        })

    return {"route": route_id, "count": len(vehicles_out), "vehicles": vehicles_out}
@app.get("/api/vehicles/{vehicle_id}")
async def get_vehicle_view(vehicle_id: str):
    """Return position, destination, delay and occupancy for one vehicle.

    The vehicle is looked up in the cache by comparing ``str(v['id'])`` with
    ``vehicle_id``. Its destination (trip headsign) and scheduled arrival at
    the next stop come from the ``trips``/``stop_times`` tables.

    If the stop-specific lookup yields no row (or there is no next stop), the
    headsign is still looked up by trip id alone, so a known trip is not
    reported as "Not in Schedule" merely because the next stop is missing
    from its stop times.

    Args:
        vehicle_id: Vehicle identifier taken from the URL path.

    Returns:
        A dict with ``vehicle_number``, ``route_id``, ``name``, ``location``,
        ``delay_mins`` (0 when no comparison is possible), ``fullness`` and
        ``trip_id``.

    Raises:
        HTTPException: 404 when no cached vehicle has a matching id.
    """
    # 1. Pull latest from cache
    data = await ttc_cache.get_data()
    vehicles = data.get("vehicles", [])

    # 2. Find this specific bus in the list
    bus = next((v for v in vehicles if str(v['id']) == vehicle_id), None)
    if not bus:
        raise HTTPException(status_code=404, detail="Vehicle not active or not found")

    trip_id = str(bus['trip_id'])
    next_stop_id = bus.get('next_stop_id')
    predicted_time = bus.get('predicted_time')

    destination = "Not in Schedule"
    delay_mins = 0

    # 3. Stop-specific lookup: headsign plus scheduled arrival at the next stop.
    #    Columns are cast to VARCHAR so they compare against string parameters.
    stop_row = None
    if next_stop_id:
        stop_query = """
            SELECT
                t.trip_headsign,
                st.arrival_time
            FROM trips t
            JOIN stop_times st ON CAST(t.trip_id AS VARCHAR) = CAST(st.trip_id AS VARCHAR)
            WHERE CAST(t.trip_id AS VARCHAR) = ?
            AND CAST(st.stop_id AS VARCHAR) = ?
            LIMIT 1
        """
        stop_row = db.execute(stop_query, [trip_id, str(next_stop_id)]).fetchone()
        if stop_row:
            destination, scheduled_hms = stop_row
            # Delay = predicted Unix time - (service day start + scheduled seconds).
            # Skipped when either side is missing, leaving delay_mins at 0.
            if predicted_time and scheduled_hms:
                plan_ts = get_service_day_start_ts() + hms_to_seconds(scheduled_hms)
                delay_mins = round((predicted_time - plan_ts) / 60)

    # 4. Fallback: no next stop, or the stop is not in this trip's stop times.
    #    The trip itself may still exist, so resolve the headsign by trip id only.
    if stop_row is None:
        trip_query = """
            SELECT trip_headsign
            FROM trips
            WHERE CAST(trip_id AS VARCHAR) = ?
            LIMIT 1
        """
        trip_row = db.execute(trip_query, [trip_id]).fetchone()
        if trip_row:
            destination = trip_row[0]

    return {
        "vehicle_number": vehicle_id,
        "route_id": bus['route'],
        "name": destination,
        "location": {
            "lat": bus['lat'],
            "lon": bus['lon'],
        },
        "delay_mins": delay_mins,
        "fullness": translate_occupancy(bus['occupancy']),
        "trip_id": trip_id,
    }
@app.get("/api/stop/{stop_code}")
async def get_stop_view(stop_code: str):
    """Return upcoming arrivals at a stop within the next two hours.

    The public ``stop_code`` is resolved to a ``stop_id`` through the
    ``stops`` table. Every cached prediction itinerary that contains that
    stop and whose predicted time lies in ``[now, now + 7200]`` is matched
    against the schedule to obtain route, destination and delay.

    Args:
        stop_code: Stop code taken from the URL path.

    Returns:
        ``{"error": "Stop code not found"}`` when the code is unknown;
        otherwise ``{"stop_name", "stop_code", "arrivals"}`` with arrivals
        sorted by ``eta_mins``. Each arrival has ``route``, ``destination``,
        ``eta_mins``, ``delay_mins``, ``fullness`` ("Unknown" when no cached
        vehicle is on the trip) and ``vehicle_id`` ("In Transit" likewise).
    """
    # 1. Translate the stop code into the database stop id
    stop_info = db.execute(
        "SELECT stop_id, stop_name FROM stops WHERE CAST(stop_code AS VARCHAR) = ? LIMIT 1",
        [str(stop_code)],
    ).fetchone()
    if not stop_info:
        return {"error": "Stop code not found"}
    target_id = str(stop_info[0])
    stop_name = stop_info[1]

    # 2. Get the cache structure (dict with vehicles, predictions, alerts)
    cached_data = await ttc_cache.get_data()
    vehicles_list = cached_data.get("vehicles", [])
    predictions = cached_data.get("predictions", {})

    # Vehicles keyed by stringified trip id for quick lookup
    vehicles = {str(v['trip_id']): v for v in vehicles_list}

    now = datetime.now().timestamp()
    two_hours_out = now + 7200

    # Loop-invariant values: computed once instead of once per matching trip.
    service_day_ts = get_service_day_start_ts()
    query = """
        SELECT t.trip_headsign, st.arrival_time, r.route_short_name
        FROM trips t
        JOIN stop_times st ON CAST(t.trip_id AS VARCHAR) = CAST(st.trip_id AS VARCHAR)
        JOIN routes r ON t.route_id = r.route_id
        WHERE CAST(t.trip_id AS VARCHAR) = ? AND CAST(st.stop_id AS VARCHAR) = ?
        LIMIT 1
    """

    arrivals = []
    # 3. Search the full itineraries for our target stop
    for trip_id, itinerary in predictions.items():
        if target_id not in itinerary:
            continue
        pred_time = itinerary[target_id]
        # A missing prediction cannot be compared against the time window.
        if pred_time is None:
            continue
        # Only keep buses that have not passed the stop and arrive within 2 hours
        if not (now <= pred_time <= two_hours_out):
            continue

        # Use the same string form of the trip id as the `vehicles` map and the
        # VARCHAR-cast DB columns, so the lookups cannot miss on int vs str.
        trip_key = str(trip_id)

        # 4. Look up destination, scheduled arrival and route for this trip/stop
        row = db.execute(query, [trip_key, target_id]).fetchone()
        if not row:
            continue

        # The vehicle, if one is on the road for this trip, supplies occupancy/id
        bus = vehicles.get(trip_key)
        plan_ts = service_day_ts + hms_to_seconds(row[1])
        arrivals.append({
            "route": row[2],
            "destination": row[0],
            "eta_mins": round((pred_time - now) / 60),
            "delay_mins": round((pred_time - plan_ts) / 60),
            "fullness": translate_occupancy(bus['occupancy']) if bus else "Unknown",
            "vehicle_id": bus['id'] if bus else "In Transit",
        })

    arrivals.sort(key=lambda x: x['eta_mins'])
    return {"stop_name": stop_name, "stop_code": stop_code, "arrivals": arrivals}
@app.get("/api/alerts")
async def get_all_alerts():
    """
    Returns every active service alert for the entire TTC network.

    The alerts container is read with a default, so a cache payload without
    an "alerts" key yields an empty result instead of raising ``KeyError``.

    Returns:
        A dict with ``timestamp`` (current Unix time), ``count`` (``len`` of
        the alerts container) and ``alerts`` (the container itself).
        NOTE(review): the per-route endpoint reads this container as a
        mapping keyed by route id, so ``count`` is presumably the number of
        routes with alerts rather than the number of alerts — confirm.
    """
    data = await ttc_cache.get_data()
    # Same defensive access as the other endpoints use for cache keys.
    alerts = data.get("alerts", {})
    return {
        "timestamp": datetime.now().timestamp(),
        "count": len(alerts),
        "alerts": alerts,
    }
@app.get("/api/alerts/{route_id}")
async def get_alerts_for_route(route_id: str):
    """Return the cached service alerts filed under a single route.

    Args:
        route_id: Route identifier taken from the URL path; used as the key
            into the cached "alerts" mapping.

    Returns:
        ``{"route_id", "count", "alerts"}``. When the route has no alerts,
        ``count`` is 0 and ``alerts`` is the string "No alerts"; otherwise
        ``alerts`` is the stored collection and ``count`` its length.
    """
    snapshot = await ttc_cache.get_data()
    route_alerts = snapshot.get("alerts", {}).get(route_id, [])
    has_alerts = bool(route_alerts)
    return {
        "route_id": route_id,
        "count": len(route_alerts) if has_alerts else 0,
        "alerts": route_alerts if has_alerts else "No alerts",
    }
# Script entry point: serve the app with uvicorn when this file is run directly.
if __name__ == "__main__":
    import uvicorn # type: ignore
    # Start the server on all interfaces, port 7860
    uvicorn.run(app, host="0.0.0.0", port=7860)