salvirezwan's picture
End-to-end real-time traffic analytics pipeline — YOLOv8s ONNX
5b60216
Raw
History Blame Contribute Delete
7.77 kB
"""SQLite persistence layer using aiosqlite.
Stores hourly aggregates of traffic metrics.
An in-memory ring buffer holds recent per-frame metrics (capped at 10,000
frames; ring_recent() returns the last 5 minutes by default).
"""
from __future__ import annotations
import json
import time
from collections import deque
from contextlib import asynccontextmanager
from pathlib import Path
from typing import AsyncGenerator
import aiosqlite
from api.schemas import MetricsMessage, HourlyAggregate
# DB file location — overridable via the DATABASE_PATH env var
import os as _os
_DEFAULT_DB_PATH = Path(__file__).parent.parent / "data" / "traffic.db"
# An empty DATABASE_PATH (e.g. a bare `DATABASE_PATH=` line in an env file) is
# treated as unset; otherwise Path("") would resolve to the current directory
# and opening the database would fail.
DB_PATH = Path(_os.environ.get("DATABASE_PATH") or _DEFAULT_DB_PATH)
# In-memory ring buffer: last 5 min at 25 fps ≈ 7500 frames, cap at 10000
_RING_BUFFER_MAX = 10_000
# Bounded deque: appending beyond _RING_BUFFER_MAX evicts the oldest entry.
_ring_buffer: deque[MetricsMessage] = deque(maxlen=_RING_BUFFER_MAX)
# ── Schema ────────────────────────────────────────────────────────────────────
# DDL executed by init_db(); IF NOT EXISTS makes repeated runs a no-op.
# One row per hour key produced by _hour_key() (UTC, e.g. 2024-01-15T14:00:00).
# upsert_hourly() maintains each row: total_count / count_per_class are
# overwritten with the latest frame's values, alert_count is summed,
# peak_vehicles keeps the maximum, and frame_count is the number of frames
# folded into the running mean stored in avg_speed_kmh.
_CREATE_TABLES = """
CREATE TABLE IF NOT EXISTS hourly_aggregates (
hour TEXT PRIMARY KEY, -- ISO-8601 hour: 2024-01-15T14:00:00
total_count INTEGER NOT NULL DEFAULT 0,
count_per_class TEXT NOT NULL DEFAULT '{}', -- JSON
avg_speed_kmh REAL NOT NULL DEFAULT 0.0,
peak_vehicles INTEGER NOT NULL DEFAULT 0,
alert_count INTEGER NOT NULL DEFAULT 0,
frame_count INTEGER NOT NULL DEFAULT 0 -- for incremental avg
);
"""
# ── Connection ────────────────────────────────────────────────────────────────
@asynccontextmanager
async def get_db() -> AsyncGenerator[aiosqlite.Connection, None]:
    """Open a connection to ``DB_PATH`` whose rows are ``aiosqlite.Row`` objects.

    The parent directory of the database file is created on demand. The
    connection is closed when the caller's ``async with`` block exits; nothing
    is committed here, so callers commit their own writes.
    """
    db_dir = DB_PATH.parent
    db_dir.mkdir(parents=True, exist_ok=True)
    async with aiosqlite.connect(str(DB_PATH)) as conn:
        # Row objects allow column access by name, e.g. row["hour"].
        conn.row_factory = aiosqlite.Row
        yield conn
async def init_db() -> None:
    """Ensure the ``hourly_aggregates`` table exists (run once at app startup).

    Executes the ``_CREATE_TABLES`` script and commits it.
    """
    async with get_db() as conn:
        await conn.executescript(_CREATE_TABLES)
        await conn.commit()
# ── Ring buffer ───────────────────────────────────────────────────────────────
def ring_push(metrics: MetricsMessage) -> None:
    """Record one per-frame metrics snapshot in the in-memory ring buffer.

    The buffer is a bounded deque, so once it is full the oldest snapshot is
    dropped to make room for this one.
    """
    buffer = _ring_buffer
    buffer.append(metrics)
def ring_recent(window_seconds: float = 300.0) -> list[MetricsMessage]:
    """Return buffered snapshots no older than ``window_seconds`` before now.

    Snapshots keep their buffer (insertion) order. Only frames still held in
    the ring buffer are considered, so a window longer than the buffer's
    capacity returns whatever is buffered. Expects ``timestamp`` to be epoch
    seconds on the same clock as ``time.time()``.
    """
    oldest_allowed = time.time() - window_seconds
    recent: list[MetricsMessage] = []
    for snapshot in _ring_buffer:
        if snapshot.timestamp < oldest_allowed:
            continue
        recent.append(snapshot)
    return recent
def ring_clear() -> None:
    """Discard every snapshot currently held in the in-memory ring buffer.

    The buffer object itself is kept (emptied in place), so its ``maxlen``
    capacity is unchanged.
    """
    _ring_buffer.clear()
# ── Hourly aggregation ────────────────────────────────────────────────────────
def _hour_key(ts: float) -> str:
"""Format timestamp as ISO-8601 hour string."""
import datetime
dt = datetime.datetime.utcfromtimestamp(ts).replace(minute=0, second=0, microsecond=0)
return dt.isoformat()
_INSERT_HOURLY_SQL = """INSERT INTO hourly_aggregates
(hour, total_count, count_per_class, avg_speed_kmh,
peak_vehicles, alert_count, frame_count)
VALUES (?, ?, ?, ?, ?, ?, 1)"""

_UPDATE_HOURLY_SQL = """UPDATE hourly_aggregates SET
total_count = ?,
count_per_class = ?,
avg_speed_kmh = ?,
peak_vehicles = MAX(peak_vehicles, ?),
alert_count = alert_count + ?,
frame_count = frame_count + 1
WHERE hour = ?"""


async def upsert_hourly(metrics: MetricsMessage) -> None:
    """Fold one frame's metrics into the aggregate row for its UTC hour.

    The first frame of an hour inserts a new row. Each later frame updates it:

    * ``total_count`` and ``count_per_class`` take this frame's values (they are
      cumulative counters, so the latest frame wins; classes absent from this
      frame keep their stored counts);
    * ``avg_speed_kmh`` becomes the running mean over all frames of the hour,
      stored at full precision;
    * ``peak_vehicles`` keeps the largest ``vehicles_in_frame`` seen;
    * ``alert_count`` accumulates ``len(metrics.alerts)``.
    """
    hour = _hour_key(metrics.timestamp)
    # NOTE(review): the SELECT and the INSERT/UPDATE below are separate
    # statements on a per-call connection, so two concurrent calls for the same
    # hour could both INSERT (PRIMARY KEY violation) or lose an update —
    # confirm callers await these calls sequentially.
    async with get_db() as db:
        cursor = await db.execute(
            "SELECT * FROM hourly_aggregates WHERE hour = ?", (hour,)
        )
        row = await cursor.fetchone()

        if row is None:
            await db.execute(
                _INSERT_HOURLY_SQL,
                (
                    hour,
                    metrics.total_count,
                    json.dumps(metrics.count_per_class),
                    metrics.avg_speed_kmh,
                    metrics.vehicles_in_frame,
                    len(metrics.alerts),
                ),
            )
        else:
            n = row["frame_count"]
            # Keep the running mean unrounded, like the INSERT path. Rounding
            # here would feed the rounded value into the next update and let
            # error accumulate over the many frames of an hour; fetch_summary
            # rounds for presentation.
            new_avg = (row["avg_speed_kmh"] * n + metrics.avg_speed_kmh) / (n + 1)
            merged_cpc = json.loads(row["count_per_class"])
            merged_cpc.update(metrics.count_per_class)
            await db.execute(
                _UPDATE_HOURLY_SQL,
                (
                    metrics.total_count,
                    json.dumps(merged_cpc),
                    new_avg,
                    metrics.vehicles_in_frame,
                    len(metrics.alerts),
                    hour,
                ),
            )
        await db.commit()
# ── Query helpers ─────────────────────────────────────────────────────────────
_HOURLY_RANGE_SQL = """SELECT * FROM hourly_aggregates
WHERE hour >= ? AND hour <= ?
ORDER BY hour"""


async def fetch_hourly_range(
    start_iso: str, end_iso: str
) -> list[HourlyAggregate]:
    """Return the hourly aggregates with ``start_iso <= hour <= end_iso``, oldest first.

    Bounds are compared as strings against the stored ``hour`` keys (for example
    ``"2024-01-15T14:00:00"``), so they should use the same ISO-8601 layout.
    ``avg_speed_kmh`` is rounded to 2 decimals, matching ``fetch_summary``.
    """
    async with get_db() as db:
        cursor = await db.execute(_HOURLY_RANGE_SQL, (start_iso, end_iso))
        rows = await cursor.fetchall()
    return [
        HourlyAggregate(
            hour=row["hour"],
            total_count=row["total_count"],
            count_per_class=json.loads(row["count_per_class"]),
            # Stored speeds can carry full float precision (the first frame of
            # an hour is inserted unrounded), so round at read time.
            avg_speed_kmh=round(row["avg_speed_kmh"], 2),
            peak_vehicles=row["peak_vehicles"],
            alert_count=row["alert_count"],
        )
        for row in rows
    ]
_SUMMARY_RANGE_SQL = """SELECT * FROM hourly_aggregates
WHERE hour >= ? AND hour <= ?
ORDER BY hour"""


async def fetch_summary(start_iso: str, end_iso: str) -> dict:
    """Collapse the hourly rows with ``start_iso <= hour <= end_iso`` into one summary.

    Returns ``{}`` when the range has no rows. Otherwise the dict holds:

    * ``period_start`` / ``period_end`` — the bounds as given;
    * ``total_vehicles`` — the largest ``total_count`` in the range;
    * ``count_per_class`` — per class, the largest count in the range;
    * ``avg_speed_kmh`` — frame-weighted mean of the hourly averages, rounded
      to 2 decimals (0.0 when no frames were recorded);
    * ``peak_hour`` / ``peak_count`` — the hour with the highest
      ``peak_vehicles`` (earliest on ties), or ``None`` / 0 if none exceeds 0;
    * ``total_alerts`` — sum of ``alert_count``.
    """
    async with get_db() as db:
        cursor = await db.execute(_SUMMARY_RANGE_SQL, (start_iso, end_iso))
        rows = await cursor.fetchall()
    if not rows:
        return {}

    # total_count and count_per_class are cumulative counters, so the largest
    # value observed in the range is reported.
    # NOTE(review): that figure also includes vehicles counted before
    # start_iso; confirm consumers don't expect a since-start delta.
    total_vehicles = max(0, *(row["total_count"] for row in rows))
    per_class: dict[str, int] = {}
    for row in rows:
        for cls, cnt in json.loads(row["count_per_class"]).items():
            per_class[cls] = max(per_class.get(cls, 0), cnt)

    # Weight each hour's mean speed by its frame count. An explicit loop keeps
    # plain left-to-right float addition in row order.
    weighted_speed = 0.0
    for row in rows:
        weighted_speed += row["avg_speed_kmh"] * row["frame_count"]
    frames = sum(row["frame_count"] for row in rows)
    avg_speed = round(weighted_speed / frames, 2) if frames else 0.0

    # max() returns the first maximal row, i.e. the earliest hour on ties
    # because rows are ORDER BY hour.
    busiest = max(rows, key=lambda row: row["peak_vehicles"])
    if busiest["peak_vehicles"] > 0:
        peak_count, peak_hour = busiest["peak_vehicles"], busiest["hour"]
    else:
        peak_count, peak_hour = 0, None

    return {
        "period_start": start_iso,
        "period_end": end_iso,
        "total_vehicles": total_vehicles,
        "count_per_class": per_class,
        "avg_speed_kmh": avg_speed,
        "peak_hour": peak_hour,
        "peak_count": peak_count,
        "total_alerts": sum(row["alert_count"] for row in rows),
    }