smart-parking-api / src /database.py
rohanv56's picture
Upload 380 files
2d30bad verified
Raw
History Blame Contribute Delete
7.34 kB
# backend/src/database.py
import sqlite3
from pathlib import Path
from datetime import datetime, timezone
# Location of the SQLite database file: "parking.db" in the directory one
# level above the directory that contains this module.
DB_PATH = Path(__file__).resolve().parent.parent / "parking.db"
def get_connection() -> sqlite3.Connection:
    """Open and configure a new SQLite connection to ``DB_PATH``.

    The connection returns rows as ``sqlite3.Row`` (so columns can be read
    by name), is switched to WAL journal mode, and has a 10 second busy
    timeout. ``check_same_thread=False`` disables the sqlite3 module's
    same-thread check for this connection.

    Returns:
        An open connection. The caller is responsible for closing it.

    Raises:
        sqlite3.Error: If the database cannot be opened or configured. The
            partially configured connection is closed before re-raising.
    """
    conn = sqlite3.connect(DB_PATH, timeout=30, check_same_thread=False)
    try:
        conn.row_factory = sqlite3.Row
        # WAL mode: allows concurrent reads alongside writes
        # (avoids "database is locked").
        conn.execute("PRAGMA journal_mode=WAL")
        # Wait up to 10 s for any lock before raising an error.
        # NOTE(review): this pragma appears to supersede the ``timeout=30``
        # passed to connect() above, since both configure the same busy
        # handler -- confirm which value is actually intended.
        conn.execute("PRAGMA busy_timeout=10000")
    except sqlite3.Error:
        # Do not leak the handle when configuration fails.
        conn.close()
        raise
    return conn
def init_db() -> None:
    """Create the tables and indexes if they do not already exist.

    Creates ``occupancy_log`` and ``predictions`` plus one composite index
    on each (slot id + timestamp column). Every statement uses
    ``IF NOT EXISTS``, so calling this repeatedly is harmless.

    Raises:
        sqlite3.Error: If the schema script fails. The connection is closed
            in that case as well.
    """
    conn = get_connection()
    try:
        conn.executescript("""
            CREATE TABLE IF NOT EXISTS occupancy_log (
                id INTEGER PRIMARY KEY AUTOINCREMENT,
                slot_id TEXT NOT NULL,
                status TEXT NOT NULL CHECK(status IN ('occupied', 'empty')),
                confidence REAL NOT NULL,
                logged_at TEXT NOT NULL
            );
            CREATE TABLE IF NOT EXISTS predictions (
                id INTEGER PRIMARY KEY AUTOINCREMENT,
                slot_id TEXT NOT NULL,
                horizon_minutes INTEGER NOT NULL,
                vacancy_prob REAL NOT NULL,
                predicted_at TEXT NOT NULL
            );
            CREATE INDEX IF NOT EXISTS idx_occupancy_slot
                ON occupancy_log(slot_id, logged_at);
            CREATE INDEX IF NOT EXISTS idx_predictions_slot
                ON predictions(slot_id, predicted_at);
        """)
        conn.commit()
    finally:
        # Always release the handle, even when schema creation fails.
        conn.close()
def log_occupancy(records: list[dict]) -> None:
    """Insert a batch of occupancy readings into ``occupancy_log``.

    All rows of one call share a single UTC timestamp (ISO-8601 string)
    taken when the function is entered.

    Args:
        records: One dict per reading. Each must have:
            slot_id    : str
            status     : 'occupied' | 'empty'
            confidence : float

    Raises:
        KeyError: If a record lacks one of the required keys.
        sqlite3.Error: If the insert fails (for example a ``status`` value
            rejected by the table's CHECK constraint). The whole batch is
            rolled back and the connection is closed.
    """
    now = datetime.now(timezone.utc).isoformat()
    rows = [(r["slot_id"], r["status"], r["confidence"], now) for r in records]
    conn = get_connection()
    try:
        # ``with conn`` commits on success and rolls back on error; it does
        # NOT close the connection, hence the surrounding try/finally.
        with conn:
            conn.executemany(
                "INSERT INTO occupancy_log (slot_id, status, confidence, logged_at) "
                "VALUES (?, ?, ?, ?)",
                rows,
            )
    finally:
        conn.close()
def get_latest_occupancy() -> list[dict]:
    """Return the most recent reading for every slot, ordered by slot id.

    A subquery finds ``MAX(logged_at)`` per ``slot_id`` and the outer query
    joins back to fetch the matching rows.

    NOTE(review): if several rows of the same slot share the maximum
    ``logged_at`` (e.g. one batch passed to ``log_occupancy`` contains the
    same slot twice), all of them are returned.

    Returns:
        A list of dicts with keys ``slot_id``, ``status``, ``confidence``
        and ``logged_at``. Empty when the table has no rows.

    Raises:
        sqlite3.Error: If the query fails; the connection is still closed.
    """
    conn = get_connection()
    try:
        rows = conn.execute("""
            SELECT ol.slot_id, ol.status, ol.confidence, ol.logged_at
            FROM occupancy_log ol
            INNER JOIN (
                SELECT slot_id, MAX(logged_at) AS max_ts
                FROM occupancy_log
                GROUP BY slot_id
            ) latest ON ol.slot_id = latest.slot_id
                    AND ol.logged_at = latest.max_ts
            ORDER BY ol.slot_id
        """).fetchall()
    finally:
        conn.close()
    return [dict(r) for r in rows]
def get_full_history(limit: int = 500, offset: int = 0) -> list[dict]:
    """Return a paginated slice of ``occupancy_log``, newest first.

    Rows are ordered by ``logged_at`` descending. Because every row written
    by one ``log_occupancy`` call shares the same timestamp, ``id`` is used
    as a secondary sort key so that consecutive pages neither repeat nor
    skip rows.

    Args:
        limit: Maximum number of rows to return.
        offset: Number of rows to skip from the start of the ordering.

    Returns:
        A list of dicts with keys ``slot_id``, ``status``, ``confidence``
        and ``logged_at``.

    Raises:
        sqlite3.Error: If the query fails; the connection is still closed.
    """
    conn = get_connection()
    try:
        rows = conn.execute(
            "SELECT slot_id, status, confidence, logged_at "
            "FROM occupancy_log "
            # id breaks ties between rows that share one batch timestamp.
            "ORDER BY logged_at DESC, id DESC "
            "LIMIT ? OFFSET ?",
            (limit, offset),
        ).fetchall()
    finally:
        conn.close()
    return [dict(r) for r in rows]
def save_predictions(records: list[dict]) -> None:
    """Insert a batch of forecast results into ``predictions``.

    All rows of one call share a single UTC timestamp (ISO-8601 string)
    taken when the function is entered.

    Args:
        records: One dict per forecast. Each must have:
            slot_id         : str
            horizon_minutes : int
            vacancy_prob    : float

    Raises:
        KeyError: If a record lacks one of the required keys.
        sqlite3.Error: If the insert fails. The whole batch is rolled back
            and the connection is closed.
    """
    now = datetime.now(timezone.utc).isoformat()
    rows = [
        (r["slot_id"], r["horizon_minutes"], r["vacancy_prob"], now)
        for r in records
    ]
    conn = get_connection()
    try:
        # ``with conn`` commits on success and rolls back on error; it does
        # NOT close the connection, hence the surrounding try/finally.
        with conn:
            conn.executemany(
                "INSERT INTO predictions (slot_id, horizon_minutes, vacancy_prob, predicted_at) "
                "VALUES (?, ?, ?, ?)",
                rows,
            )
    finally:
        conn.close()
def get_latest_predictions(horizon_minutes: int) -> list[dict]:
    """Return the most recent forecast for each slot at the given horizon.

    A subquery finds ``MAX(predicted_at)`` per ``slot_id`` among rows with
    the requested horizon; the outer query joins back on slot and timestamp
    and filters on the same horizon again, so rows for other horizons that
    share the timestamp are excluded.

    Args:
        horizon_minutes: Forecast horizon to select, matched exactly
            against the ``horizon_minutes`` column.

    Returns:
        A list of dicts with keys ``slot_id``, ``vacancy_prob`` and
        ``predicted_at``, ordered by slot id. Empty when nothing matches.

    Raises:
        sqlite3.Error: If the query fails; the connection is still closed.
    """
    conn = get_connection()
    try:
        rows = conn.execute("""
            SELECT p.slot_id, p.vacancy_prob, p.predicted_at
            FROM predictions p
            INNER JOIN (
                SELECT slot_id, MAX(predicted_at) AS max_ts
                FROM predictions
                WHERE horizon_minutes = ?
                GROUP BY slot_id
            ) latest ON p.slot_id = latest.slot_id
                    AND p.predicted_at = latest.max_ts
            WHERE p.horizon_minutes = ?
            ORDER BY p.slot_id
        """, (horizon_minutes, horizon_minutes)).fetchall()
    finally:
        conn.close()
    return [dict(r) for r in rows]
def get_analytics_summary() -> dict:
    """Return aggregated occupancy analytics without sending raw rows.

    Returns:
        A dict with:
        - total_readings: total number of rows in ``occupancy_log``.
        - avg_occupancy_pct: percentage of all readings whose status is
          'occupied', rounded to one decimal; 0.0 when the table is empty.
        - peak_hour: hour of day (0-23, as extracted by SQLite ``strftime``
          from ``logged_at``) with the largest number of 'occupied'
          readings; 0 when the table is empty. Ties go to the earliest hour.
        - busiest_slot: ``slot_id`` with the most 'occupied' readings, or
          ``None`` when there are none. Ties go to the smallest slot id.
        - hourly_trend: list of ``{hour, occupied, empty}`` dicts for the
          48 most recent hourly buckets that contain data, oldest first.

    Raises:
        sqlite3.Error: If any query fails; the connection is still closed.
    """
    conn = get_connection()
    try:
        # Total readings
        total = conn.execute(
            "SELECT COUNT(*) AS cnt FROM occupancy_log"
        ).fetchone()["cnt"]

        # Average occupancy %; NULL (-> 0.0) when the table is empty.
        avg_row = conn.execute("""
            SELECT ROUND(100.0 * SUM(CASE WHEN status = 'occupied' THEN 1 ELSE 0 END)
                   / NULLIF(COUNT(*), 0), 1) AS avg_pct
            FROM occupancy_log
        """).fetchone()
        avg_pct = float(avg_row["avg_pct"]) if avg_row["avg_pct"] is not None else 0.0

        # Peak hour (0-23), ranked by count of occupied readings. The hour
        # is taken from the stored timestamp, which the writers in this
        # module record in UTC. ``hr ASC`` makes ties deterministic.
        peak_row = conn.execute("""
            SELECT CAST(strftime('%H', logged_at) AS INTEGER) AS hr,
                   SUM(CASE WHEN status = 'occupied' THEN 1 ELSE 0 END) AS occ_count
            FROM occupancy_log
            GROUP BY hr
            ORDER BY occ_count DESC, hr ASC
            LIMIT 1
        """).fetchone()
        peak_hour = int(peak_row["hr"]) if peak_row else 0

        # Busiest slot (most occupied readings); slot_id breaks ties.
        busy_row = conn.execute("""
            SELECT slot_id, COUNT(*) AS cnt
            FROM occupancy_log
            WHERE status = 'occupied'
            GROUP BY slot_id
            ORDER BY cnt DESC, slot_id ASC
            LIMIT 1
        """).fetchone()
        busiest_slot = busy_row["slot_id"] if busy_row else None

        # Hourly trend: last 48 distinct hours, bucketed by
        # strftime('%Y-%m-%dT%H:00', logged_at).
        trend_rows = conn.execute("""
            SELECT strftime('%Y-%m-%dT%H:00', logged_at) AS hour,
                   SUM(CASE WHEN status = 'occupied' THEN 1 ELSE 0 END) AS occupied,
                   SUM(CASE WHEN status = 'empty' THEN 1 ELSE 0 END) AS empty
            FROM occupancy_log
            GROUP BY hour
            ORDER BY hour DESC
            LIMIT 48
        """).fetchall()
    finally:
        # Release the handle even if one of the queries above fails.
        conn.close()

    # The query returns newest first; reverse into chronological order.
    hourly_trend = [dict(r) for r in reversed(trend_rows)]

    return {
        "total_readings": total,
        "avg_occupancy_pct": avg_pct,
        "peak_hour": peak_hour,
        "busiest_slot": busiest_slot,
        "hourly_trend": hourly_trend,
    }