Spaces:
Sleeping
Sleeping
| # backend/src/database.py | |
| import sqlite3 | |
| from pathlib import Path | |
| from datetime import datetime, timezone | |
| DB_PATH = Path(__file__).resolve().parent.parent / "parking.db" | |
| def get_connection() -> sqlite3.Connection: | |
| conn = sqlite3.connect(DB_PATH, timeout=30, check_same_thread=False) | |
| conn.row_factory = sqlite3.Row | |
| # WAL mode: allows concurrent reads alongside writes (avoids "database is locked") | |
| conn.execute("PRAGMA journal_mode=WAL") | |
| # Wait up to 10 s for any lock before raising an error | |
| conn.execute("PRAGMA busy_timeout=10000") | |
| return conn | |
| def init_db() -> None: | |
| """Create tables if they do not already exist.""" | |
| conn = get_connection() | |
| cur = conn.cursor() | |
| cur.executescript(""" | |
| CREATE TABLE IF NOT EXISTS occupancy_log ( | |
| id INTEGER PRIMARY KEY AUTOINCREMENT, | |
| slot_id TEXT NOT NULL, | |
| status TEXT NOT NULL CHECK(status IN ('occupied', 'empty')), | |
| confidence REAL NOT NULL, | |
| logged_at TEXT NOT NULL | |
| ); | |
| CREATE TABLE IF NOT EXISTS predictions ( | |
| id INTEGER PRIMARY KEY AUTOINCREMENT, | |
| slot_id TEXT NOT NULL, | |
| horizon_minutes INTEGER NOT NULL, | |
| vacancy_prob REAL NOT NULL, | |
| predicted_at TEXT NOT NULL | |
| ); | |
| CREATE INDEX IF NOT EXISTS idx_occupancy_slot | |
| ON occupancy_log(slot_id, logged_at); | |
| CREATE INDEX IF NOT EXISTS idx_predictions_slot | |
| ON predictions(slot_id, predicted_at); | |
| """) | |
| conn.commit() | |
| conn.close() | |
| def log_occupancy(records: list[dict]) -> None: | |
| """ | |
| Insert a batch of occupancy readings. | |
| Each record must have: | |
| slot_id : str | |
| status : 'occupied' | 'empty' | |
| confidence : float | |
| """ | |
| now = datetime.now(timezone.utc).isoformat() | |
| rows = [(r["slot_id"], r["status"], r["confidence"], now) for r in records] | |
| conn = get_connection() | |
| conn.executemany( | |
| "INSERT INTO occupancy_log (slot_id, status, confidence, logged_at) " | |
| "VALUES (?, ?, ?, ?)", | |
| rows, | |
| ) | |
| conn.commit() | |
| conn.close() | |
| def get_latest_occupancy() -> list[dict]: | |
| """ | |
| Return the most recent status for every slot. | |
| Uses a subquery to pick the MAX logged_at per slot_id. | |
| """ | |
| conn = get_connection() | |
| rows = conn.execute(""" | |
| SELECT ol.slot_id, ol.status, ol.confidence, ol.logged_at | |
| FROM occupancy_log ol | |
| INNER JOIN ( | |
| SELECT slot_id, MAX(logged_at) AS max_ts | |
| FROM occupancy_log | |
| GROUP BY slot_id | |
| ) latest ON ol.slot_id = latest.slot_id | |
| AND ol.logged_at = latest.max_ts | |
| ORDER BY ol.slot_id | |
| """).fetchall() | |
| conn.close() | |
| return [dict(r) for r in rows] | |
| def get_full_history(limit: int = 500, offset: int = 0) -> list[dict]: | |
| """Return a paginated slice of occupancy_log, ordered by time.""" | |
| conn = get_connection() | |
| rows = conn.execute( | |
| "SELECT slot_id, status, confidence, logged_at " | |
| "FROM occupancy_log " | |
| "ORDER BY logged_at DESC " | |
| "LIMIT ? OFFSET ?", | |
| (limit, offset), | |
| ).fetchall() | |
| conn.close() | |
| return [dict(r) for r in rows] | |
| def save_predictions(records: list[dict]) -> None: | |
| """ | |
| Insert Prophet forecast results. | |
| Each record must have: | |
| slot_id : str | |
| horizon_minutes : int | |
| vacancy_prob : float | |
| """ | |
| now = datetime.now(timezone.utc).isoformat() | |
| rows = [ | |
| (r["slot_id"], r["horizon_minutes"], r["vacancy_prob"], now) | |
| for r in records | |
| ] | |
| conn = get_connection() | |
| conn.executemany( | |
| "INSERT INTO predictions (slot_id, horizon_minutes, vacancy_prob, predicted_at) " | |
| "VALUES (?, ?, ?, ?)", | |
| rows, | |
| ) | |
| conn.commit() | |
| conn.close() | |
| def get_latest_predictions(horizon_minutes: int) -> list[dict]: | |
| """Return the most recent forecast for each slot at the requested horizon.""" | |
| conn = get_connection() | |
| rows = conn.execute(""" | |
| SELECT p.slot_id, p.vacancy_prob, p.predicted_at | |
| FROM predictions p | |
| INNER JOIN ( | |
| SELECT slot_id, MAX(predicted_at) AS max_ts | |
| FROM predictions | |
| WHERE horizon_minutes = ? | |
| GROUP BY slot_id | |
| ) latest ON p.slot_id = latest.slot_id | |
| AND p.predicted_at = latest.max_ts | |
| WHERE p.horizon_minutes = ? | |
| ORDER BY p.slot_id | |
| """, (horizon_minutes, horizon_minutes)).fetchall() | |
| conn.close() | |
| return [dict(r) for r in rows] | |
| def get_analytics_summary() -> dict: | |
| """ | |
| Return aggregated analytics without sending raw rows. | |
| Computes: | |
| - total_readings: total rows in occupancy_log | |
| - avg_occupancy_pct: overall average occupancy percentage | |
| - peak_hour: 0-23 hour with highest average occupancy | |
| - busiest_slot: slot_id with most occupied readings | |
| - hourly_trend: list of {hour, occupied, empty} for last 48 buckets | |
| """ | |
| conn = get_connection() | |
| # Total readings | |
| total = conn.execute("SELECT COUNT(*) AS cnt FROM occupancy_log").fetchone()["cnt"] | |
| # Average occupancy % | |
| avg_row = conn.execute(""" | |
| SELECT ROUND(100.0 * SUM(CASE WHEN status = 'occupied' THEN 1 ELSE 0 END) | |
| / NULLIF(COUNT(*), 0), 1) AS avg_pct | |
| FROM occupancy_log | |
| """).fetchone() | |
| avg_pct = float(avg_row["avg_pct"]) if avg_row["avg_pct"] is not None else 0.0 | |
| # Peak hour (0-23) — SQLite strftime extracts UTC hour | |
| peak_row = conn.execute(""" | |
| SELECT CAST(strftime('%H', logged_at) AS INTEGER) AS hr, | |
| SUM(CASE WHEN status = 'occupied' THEN 1 ELSE 0 END) AS occ_count | |
| FROM occupancy_log | |
| GROUP BY hr | |
| ORDER BY occ_count DESC | |
| LIMIT 1 | |
| """).fetchone() | |
| peak_hour = int(peak_row["hr"]) if peak_row else 0 | |
| # Busiest slot (most occupied readings) | |
| busy_row = conn.execute(""" | |
| SELECT slot_id, COUNT(*) AS cnt | |
| FROM occupancy_log | |
| WHERE status = 'occupied' | |
| GROUP BY slot_id | |
| ORDER BY cnt DESC | |
| LIMIT 1 | |
| """).fetchone() | |
| busiest_slot = busy_row["slot_id"] if busy_row else None | |
| # Hourly trend: last 48 distinct hours bucketed by strftime('%Y-%m-%dT%H:00', logged_at) | |
| trend_rows = conn.execute(""" | |
| SELECT strftime('%Y-%m-%dT%H:00', logged_at) AS hour, | |
| SUM(CASE WHEN status = 'occupied' THEN 1 ELSE 0 END) AS occupied, | |
| SUM(CASE WHEN status = 'empty' THEN 1 ELSE 0 END) AS empty | |
| FROM occupancy_log | |
| GROUP BY hour | |
| ORDER BY hour DESC | |
| LIMIT 48 | |
| """).fetchall() | |
| # Reverse so chronological order (oldest first) | |
| hourly_trend = list(reversed([dict(r) for r in trend_rows])) | |
| conn.close() | |
| return { | |
| "total_readings": total, | |
| "avg_occupancy_pct": avg_pct, | |
| "peak_hour": peak_hour, | |
| "busiest_slot": busiest_slot, | |
| "hourly_trend": hourly_trend, | |
| } |