Spaces:
Sleeping
Sleeping
| """SQLite persistence layer using aiosqlite. | |
| Stores hourly aggregates of traffic metrics. | |
| In-memory ring buffer holds the last 5 minutes of per-frame metrics. | |
| """ | |
| from __future__ import annotations | |
| import json | |
| import time | |
| from collections import deque | |
| from contextlib import asynccontextmanager | |
| from pathlib import Path | |
| from typing import AsyncGenerator | |
| import aiosqlite | |
| from api.schemas import MetricsMessage, HourlyAggregate | |
| # DB file location β overridable via DATABASE_PATH env var | |
| import os as _os | |
| DB_PATH = Path(_os.environ.get("DATABASE_PATH", str(Path(__file__).parent.parent / "data" / "traffic.db"))) | |
| # In-memory ring buffer: last 5 min at 25fps β 7500 frames, cap at 10000 | |
| _RING_BUFFER_MAX = 10_000 | |
| _ring_buffer: deque[MetricsMessage] = deque(maxlen=_RING_BUFFER_MAX) | |
| # ββ Schema ββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ | |
| _CREATE_TABLES = """ | |
| CREATE TABLE IF NOT EXISTS hourly_aggregates ( | |
| hour TEXT PRIMARY KEY, -- ISO-8601 hour: 2024-01-15T14:00:00 | |
| total_count INTEGER NOT NULL DEFAULT 0, | |
| count_per_class TEXT NOT NULL DEFAULT '{}', -- JSON | |
| avg_speed_kmh REAL NOT NULL DEFAULT 0.0, | |
| peak_vehicles INTEGER NOT NULL DEFAULT 0, | |
| alert_count INTEGER NOT NULL DEFAULT 0, | |
| frame_count INTEGER NOT NULL DEFAULT 0 -- for incremental avg | |
| ); | |
| """ | |
| # ββ Connection ββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ | |
| async def get_db() -> AsyncGenerator[aiosqlite.Connection, None]: | |
| DB_PATH.parent.mkdir(parents=True, exist_ok=True) | |
| async with aiosqlite.connect(str(DB_PATH)) as db: | |
| db.row_factory = aiosqlite.Row | |
| yield db | |
| async def init_db() -> None: | |
| """Create tables if they don't exist. Called at app startup.""" | |
| async with get_db() as db: | |
| await db.executescript(_CREATE_TABLES) | |
| await db.commit() | |
| # ββ Ring buffer βββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ | |
| def ring_push(metrics: MetricsMessage) -> None: | |
| """Append a metrics snapshot to the in-memory ring buffer.""" | |
| _ring_buffer.append(metrics) | |
| def ring_recent(window_seconds: float = 300.0) -> list[MetricsMessage]: | |
| """Return frames from the last `window_seconds` seconds.""" | |
| cutoff = time.time() - window_seconds | |
| return [m for m in _ring_buffer if m.timestamp >= cutoff] | |
| def ring_clear() -> None: | |
| _ring_buffer.clear() | |
| # ββ Hourly aggregation ββββββββββββββββββββββββββββββββββββββββββββββββββββββββ | |
| def _hour_key(ts: float) -> str: | |
| """Format timestamp as ISO-8601 hour string.""" | |
| import datetime | |
| dt = datetime.datetime.utcfromtimestamp(ts).replace(minute=0, second=0, microsecond=0) | |
| return dt.isoformat() | |
| async def upsert_hourly(metrics: MetricsMessage) -> None: | |
| """Incrementally update the hourly aggregate row for this frame's timestamp.""" | |
| hour = _hour_key(metrics.timestamp) | |
| async with get_db() as db: | |
| row = await (await db.execute( | |
| "SELECT * FROM hourly_aggregates WHERE hour = ?", (hour,) | |
| )).fetchone() | |
| if row is None: | |
| count_per_class = json.dumps(metrics.count_per_class) | |
| await db.execute( | |
| """INSERT INTO hourly_aggregates | |
| (hour, total_count, count_per_class, avg_speed_kmh, | |
| peak_vehicles, alert_count, frame_count) | |
| VALUES (?, ?, ?, ?, ?, ?, 1)""", | |
| ( | |
| hour, | |
| metrics.total_count, | |
| count_per_class, | |
| metrics.avg_speed_kmh, | |
| metrics.vehicles_in_frame, | |
| len(metrics.alerts), | |
| ), | |
| ) | |
| else: | |
| # Incremental running average for speed | |
| n = row["frame_count"] | |
| new_avg = (row["avg_speed_kmh"] * n + metrics.avg_speed_kmh) / (n + 1) | |
| existing_cpc = json.loads(row["count_per_class"]) | |
| for cls, cnt in metrics.count_per_class.items(): | |
| existing_cpc[cls] = cnt # count_per_class is cumulative, just overwrite | |
| await db.execute( | |
| """UPDATE hourly_aggregates SET | |
| total_count = ?, | |
| count_per_class = ?, | |
| avg_speed_kmh = ?, | |
| peak_vehicles = MAX(peak_vehicles, ?), | |
| alert_count = alert_count + ?, | |
| frame_count = frame_count + 1 | |
| WHERE hour = ?""", | |
| ( | |
| metrics.total_count, | |
| json.dumps(existing_cpc), | |
| round(new_avg, 2), | |
| metrics.vehicles_in_frame, | |
| len(metrics.alerts), | |
| hour, | |
| ), | |
| ) | |
| await db.commit() | |
| # ββ Query helpers βββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ | |
| async def fetch_hourly_range( | |
| start_iso: str, end_iso: str | |
| ) -> list[HourlyAggregate]: | |
| async with get_db() as db: | |
| rows = await (await db.execute( | |
| """SELECT * FROM hourly_aggregates | |
| WHERE hour >= ? AND hour <= ? | |
| ORDER BY hour""", | |
| (start_iso, end_iso), | |
| )).fetchall() | |
| result = [] | |
| for row in rows: | |
| result.append(HourlyAggregate( | |
| hour=row["hour"], | |
| total_count=row["total_count"], | |
| count_per_class=json.loads(row["count_per_class"]), | |
| avg_speed_kmh=row["avg_speed_kmh"], | |
| peak_vehicles=row["peak_vehicles"], | |
| alert_count=row["alert_count"], | |
| )) | |
| return result | |
| async def fetch_summary(start_iso: str, end_iso: str) -> dict: | |
| async with get_db() as db: | |
| rows = await (await db.execute( | |
| """SELECT * FROM hourly_aggregates | |
| WHERE hour >= ? AND hour <= ? | |
| ORDER BY hour""", | |
| (start_iso, end_iso), | |
| )).fetchall() | |
| if not rows: | |
| return {} | |
| total_vehicles = 0 | |
| count_per_class: dict[str, int] = {} | |
| speed_sum = 0.0 | |
| frame_count_sum = 0 | |
| peak_count = 0 | |
| peak_hour = None | |
| total_alerts = 0 | |
| for row in rows: | |
| total_vehicles = max(total_vehicles, row["total_count"]) # cumulative, take max | |
| cpc = json.loads(row["count_per_class"]) | |
| for cls, cnt in cpc.items(): | |
| count_per_class[cls] = max(count_per_class.get(cls, 0), cnt) | |
| speed_sum += row["avg_speed_kmh"] * row["frame_count"] | |
| frame_count_sum += row["frame_count"] | |
| total_alerts += row["alert_count"] | |
| if row["peak_vehicles"] > peak_count: | |
| peak_count = row["peak_vehicles"] | |
| peak_hour = row["hour"] | |
| avg_speed = round(speed_sum / frame_count_sum, 2) if frame_count_sum else 0.0 | |
| return { | |
| "period_start": start_iso, | |
| "period_end": end_iso, | |
| "total_vehicles": total_vehicles, | |
| "count_per_class": count_per_class, | |
| "avg_speed_kmh": avg_speed, | |
| "peak_hour": peak_hour, | |
| "peak_count": peak_count, | |
| "total_alerts": total_alerts, | |
| } | |