# UnifiedFinancialPlatform/app/utils/ai_summary_cache.py
# Author: Dmitry Beresnev — "init project" (commit e189a31)
"""Shared in-memory AI summary cache with buffering and batching."""
import os
import threading
from datetime import datetime, timedelta
from typing import Dict, List, Optional, Tuple
from utils.llm_summarizer import OpenAICompatSummarizer
# Approx 4 chars per token -> 600 tokens ~= 2400 chars
DEFAULT_BATCH_MAX_CHARS = int(os.getenv("LLM_SUMMARY_BATCH_MAX_CHARS", "2400"))
BUFFER_SECONDS = int(os.getenv("LLM_SUMMARY_BUFFER_SECONDS", "120"))


class AISummaryCache:
    """Thread-safe in-memory cache of AI-generated summaries.

    New items are buffered; once the buffer is at least ``BUFFER_SECONDS``
    old, :meth:`maybe_flush` summarizes them in character-bounded batches
    via ``OpenAICompatSummarizer`` and stores the results keyed by item.
    """

    def __init__(self) -> None:
        self._lock = threading.Lock()
        # Items awaiting summarization, plus their keys for O(1) dedupe.
        self._buffer: List[Dict] = []
        self._buffered_keys: set = set()
        # When the oldest un-flushed item entered the buffer.
        self._buffer_start: Optional[datetime] = None
        # key -> {"id", "title", "source", "summary", "timestamp"}
        self._summaries: Dict[str, Dict] = {}
        self._last_update: Optional[datetime] = None

    def buffer_items(self, items: List[Dict]) -> None:
        """Queue *items* for later summarization.

        Skips items that have no usable key, are already summarized, or
        (fix) are already sitting in the buffer — previously repeated
        polling cycles could enqueue the same unsummarized item twice.
        """
        if not items:
            return
        with self._lock:
            for item in items:
                key = self._item_key(item)
                if not key or key in self._summaries or key in self._buffered_keys:
                    continue
                self._buffer.append(item)
                self._buffered_keys.add(key)
            if self._buffer and self._buffer_start is None:
                self._buffer_start = datetime.now()

    def maybe_flush(self) -> None:
        """Summarize buffered items once the buffer window has elapsed.

        Best-effort: a failing LLM call now drops only the affected batch
        instead of propagating into the caller (typically a request/poll
        thread) and losing every remaining batch of the drained buffer.
        """
        with self._lock:
            if not self._buffer or self._buffer_start is None:
                return
            if datetime.now() - self._buffer_start < timedelta(seconds=BUFFER_SECONDS):
                return
            # Drain under the lock; the slow LLM calls below run outside
            # it so readers are never blocked on network I/O.
            items = self._buffer
            self._buffer = []
            self._buffered_keys = set()
            self._buffer_start = None
        summarizer = OpenAICompatSummarizer()
        if not summarizer.enabled:
            # Drained items are dropped (matches prior behavior): requeuing
            # while the LLM stays disabled would grow memory unboundedly.
            return
        for batch in self._batch_items(items, DEFAULT_BATCH_MAX_CHARS):
            texts = [self._build_input_text(item) for item in batch]
            texts = [t for t in texts if t]
            if not texts:
                continue
            try:
                summaries = summarizer._summarize_chunk(texts, source="dashboard")
            except Exception:
                # Fix: one failed batch no longer aborts the whole flush.
                continue
            if not summaries:
                continue
            now = datetime.now()  # hoisted: one timestamp per batch
            with self._lock:
                for item, summary in zip(batch, summaries):
                    key = self._item_key(item)
                    if not key:
                        continue
                    self._summaries[key] = {
                        "id": item.get("id", key),
                        "title": item.get("title", ""),
                        "source": item.get("source", ""),
                        "summary": summary,
                        "timestamp": now,
                    }
                self._last_update = now

    def get_summaries(self) -> Tuple[List[Dict], Optional[datetime]]:
        """Return ``(summaries newest-first, last update time or None)``."""
        with self._lock:
            summaries = list(self._summaries.values())
            last_update = self._last_update
        # Sort the snapshot outside the lock.
        summaries.sort(key=lambda x: x.get("timestamp", datetime.min), reverse=True)
        return summaries, last_update

    def get_status(self) -> Dict:
        """Return a diagnostic snapshot of buffer and cache state."""
        with self._lock:
            buffer_size = len(self._buffer)
            buffer_start = self._buffer_start
            total_summaries = len(self._summaries)
            last_update = self._last_update
        buffer_age_seconds = None
        buffer_remaining_seconds = None
        if buffer_start:
            buffer_age_seconds = (datetime.now() - buffer_start).total_seconds()
            buffer_remaining_seconds = max(BUFFER_SECONDS - buffer_age_seconds, 0)
        return {
            "buffer_size": buffer_size,
            "buffer_started_at": buffer_start,
            "buffer_age_seconds": buffer_age_seconds,
            "buffer_remaining_seconds": buffer_remaining_seconds,
            "buffer_window_seconds": BUFFER_SECONDS,
            "total_summaries": total_summaries,
            "last_update": last_update,
            "batch_max_chars": DEFAULT_BATCH_MAX_CHARS,
        }

    def _item_key(self, item: Dict) -> str:
        """Stable dedupe key: the item id, else ``"source|title"``
        lowercased; empty string when the item has no id and no title."""
        if item.get("id") is not None:
            return str(item.get("id"))
        title = str(item.get("title", "")).strip()
        source = str(item.get("source", "")).strip()
        if not title:
            return ""
        return f"{source}|{title}".lower()

    def _build_input_text(self, item: Dict) -> str:
        """Render the text sent to the LLM; empty string if no title."""
        title = str(item.get("title", "")).strip()
        source = str(item.get("source", "")).strip()
        if not title:
            return ""
        if source:
            return f"Source: {source}\nTitle: {title}"
        return f"Title: {title}"

    def _batch_items(self, items: List[Dict], max_chars_total: int) -> List[List[Dict]]:
        """Greedily group *items* into batches whose combined rendered
        text stays under *max_chars_total* characters.

        Items that render to empty text are dropped. A non-positive limit
        disables batching (everything goes in one batch). A single item
        longer than the limit still gets its own batch.
        """
        if max_chars_total <= 0:
            return [items]
        batches: List[List[Dict]] = []
        current: List[Dict] = []
        current_chars = 0
        for item in items:
            text = self._build_input_text(item)
            if not text:
                continue
            text_len = len(text)
            if current and current_chars + text_len > max_chars_total:
                batches.append(current)
                current = []
                current_chars = 0
            current.append(item)
            current_chars += text_len
        if current:
            batches.append(current)
        return batches
# Module-level singleton shared by all importers of this module.
ai_summary_cache = AISummaryCache()