""" Lightweight in-process cache for high-frequency public reads. """ import asyncio import time from typing import Any from sqlalchemy import select from sqlalchemy.ext.asyncio import AsyncSession from app.config import settings from app.models import Notification, Session _active_sessions_cache: list[dict[str, Any]] = [] _active_sessions_expires_at: float = 0.0 _active_sessions_lock = asyncio.Lock() _recent_notifications_cache: list[dict[str, Any]] = [] _recent_notifications_expires_at: float = 0.0 _recent_notifications_lock = asyncio.Lock() def _serialize_session(row: Session) -> dict[str, Any]: return { "id": row.id, "session_name": row.session_name, "start_time": row.start_time.isoformat() if row.start_time else None, "end_time": row.end_time.isoformat() if row.end_time else None, "status": row.status, } def _serialize_notification(row: Notification) -> dict[str, Any]: return { "id": row.id, "message": row.message, "priority": row.priority, "created_by": row.created_by, "created_at": row.created_at.isoformat() if row.created_at else None, } async def get_active_sessions_cached( db: AsyncSession, *, force_refresh: bool = False, ) -> list[dict[str, Any]]: global _active_sessions_cache, _active_sessions_expires_at now = time.monotonic() if not force_refresh and now < _active_sessions_expires_at: return _active_sessions_cache async with _active_sessions_lock: now = time.monotonic() if not force_refresh and now < _active_sessions_expires_at: return _active_sessions_cache rows = ( await db.execute( select(Session) .filter(Session.status == "active") .order_by(Session.start_time.desc()) ) ).scalars().all() _active_sessions_cache = [_serialize_session(row) for row in rows] _active_sessions_expires_at = now + float(settings.ACTIVE_SESSIONS_CACHE_TTL_SECONDS) return _active_sessions_cache async def get_recent_notifications_cached( db: AsyncSession, *, force_refresh: bool = False, ) -> list[dict[str, Any]]: global _recent_notifications_cache, _recent_notifications_expires_at now = time.monotonic() if not force_refresh and now < _recent_notifications_expires_at: return _recent_notifications_cache async with _recent_notifications_lock: now = time.monotonic() if not force_refresh and now < _recent_notifications_expires_at: return _recent_notifications_cache rows = ( await db.execute( select(Notification) .order_by(Notification.created_at.desc()) .limit(settings.RECENT_NOTIFICATIONS_LIMIT) ) ).scalars().all() _recent_notifications_cache = [_serialize_notification(row) for row in rows] _recent_notifications_expires_at = now + float( settings.NOTIFICATIONS_CACHE_TTL_SECONDS ) return _recent_notifications_cache def invalidate_active_sessions_cache() -> None: global _active_sessions_expires_at _active_sessions_expires_at = 0.0 def invalidate_recent_notifications_cache() -> None: global _recent_notifications_expires_at _recent_notifications_expires_at = 0.0