blackboxai-api / app /runtime_cache.py
teletubbies's picture
Scale runtime for 20-user target load
decd9fb
Raw
History Blame Contribute Delete
3.4 kB
"""
Lightweight in-process cache for high-frequency public reads.
"""
import asyncio
import time
from typing import Any
from sqlalchemy import select
from sqlalchemy.ext.asyncio import AsyncSession
from app.config import settings
from app.models import Notification, Session
_active_sessions_cache: list[dict[str, Any]] = []
_active_sessions_expires_at: float = 0.0
_active_sessions_lock = asyncio.Lock()
_recent_notifications_cache: list[dict[str, Any]] = []
_recent_notifications_expires_at: float = 0.0
_recent_notifications_lock = asyncio.Lock()
def _serialize_session(row: Session) -> dict[str, Any]:
return {
"id": row.id,
"session_name": row.session_name,
"start_time": row.start_time.isoformat() if row.start_time else None,
"end_time": row.end_time.isoformat() if row.end_time else None,
"status": row.status,
}
def _serialize_notification(row: Notification) -> dict[str, Any]:
return {
"id": row.id,
"message": row.message,
"priority": row.priority,
"created_by": row.created_by,
"created_at": row.created_at.isoformat() if row.created_at else None,
}
async def get_active_sessions_cached(
db: AsyncSession,
*,
force_refresh: bool = False,
) -> list[dict[str, Any]]:
global _active_sessions_cache, _active_sessions_expires_at
now = time.monotonic()
if not force_refresh and now < _active_sessions_expires_at:
return _active_sessions_cache
async with _active_sessions_lock:
now = time.monotonic()
if not force_refresh and now < _active_sessions_expires_at:
return _active_sessions_cache
rows = (
await db.execute(
select(Session)
.filter(Session.status == "active")
.order_by(Session.start_time.desc())
)
).scalars().all()
_active_sessions_cache = [_serialize_session(row) for row in rows]
_active_sessions_expires_at = now + float(settings.ACTIVE_SESSIONS_CACHE_TTL_SECONDS)
return _active_sessions_cache
async def get_recent_notifications_cached(
db: AsyncSession,
*,
force_refresh: bool = False,
) -> list[dict[str, Any]]:
global _recent_notifications_cache, _recent_notifications_expires_at
now = time.monotonic()
if not force_refresh and now < _recent_notifications_expires_at:
return _recent_notifications_cache
async with _recent_notifications_lock:
now = time.monotonic()
if not force_refresh and now < _recent_notifications_expires_at:
return _recent_notifications_cache
rows = (
await db.execute(
select(Notification)
.order_by(Notification.created_at.desc())
.limit(settings.RECENT_NOTIFICATIONS_LIMIT)
)
).scalars().all()
_recent_notifications_cache = [_serialize_notification(row) for row in rows]
_recent_notifications_expires_at = now + float(
settings.NOTIFICATIONS_CACHE_TTL_SECONDS
)
return _recent_notifications_cache
def invalidate_active_sessions_cache() -> None:
global _active_sessions_expires_at
_active_sessions_expires_at = 0.0
def invalidate_recent_notifications_cache() -> None:
global _recent_notifications_expires_at
_recent_notifications_expires_at = 0.0