"""Shared in-memory AI summary cache with buffering and batching.""" import os import threading from datetime import datetime, timedelta from typing import Dict, List, Optional, Tuple from utils.llm_summarizer import OpenAICompatSummarizer # Approx 4 chars per token -> 600 tokens ~= 2400 chars DEFAULT_BATCH_MAX_CHARS = int(os.getenv("LLM_SUMMARY_BATCH_MAX_CHARS", "2400")) BUFFER_SECONDS = int(os.getenv("LLM_SUMMARY_BUFFER_SECONDS", "120")) class AISummaryCache: def __init__(self): self._lock = threading.Lock() self._buffer: List[Dict] = [] self._buffer_start: Optional[datetime] = None self._summaries: Dict[str, Dict] = {} self._last_update: Optional[datetime] = None def buffer_items(self, items: List[Dict]): if not items: return with self._lock: for item in items: key = self._item_key(item) if not key or key in self._summaries: continue self._buffer.append(item) if self._buffer and self._buffer_start is None: self._buffer_start = datetime.now() def maybe_flush(self): with self._lock: if not self._buffer or self._buffer_start is None: return if datetime.now() - self._buffer_start < timedelta(seconds=BUFFER_SECONDS): return items = self._buffer self._buffer = [] self._buffer_start = None summarizer = OpenAICompatSummarizer() if not summarizer.enabled: return batches = self._batch_items(items, DEFAULT_BATCH_MAX_CHARS) for batch in batches: texts = [self._build_input_text(item) for item in batch] texts = [t for t in texts if t] if not texts: continue summaries = summarizer._summarize_chunk(texts, source="dashboard") if not summaries: continue with self._lock: for item, summary in zip(batch, summaries): key = self._item_key(item) if not key: continue self._summaries[key] = { "id": item.get("id", key), "title": item.get("title", ""), "source": item.get("source", ""), "summary": summary, "timestamp": datetime.now(), } self._last_update = datetime.now() def get_summaries(self) -> Tuple[List[Dict], Optional[datetime]]: with self._lock: summaries = list(self._summaries.values()) last_update = self._last_update summaries.sort(key=lambda x: x.get("timestamp", datetime.min), reverse=True) return summaries, last_update def get_status(self) -> Dict: with self._lock: buffer_size = len(self._buffer) buffer_start = self._buffer_start total_summaries = len(self._summaries) last_update = self._last_update buffer_age_seconds = None buffer_remaining_seconds = None if buffer_start: buffer_age_seconds = (datetime.now() - buffer_start).total_seconds() buffer_remaining_seconds = max(BUFFER_SECONDS - buffer_age_seconds, 0) return { "buffer_size": buffer_size, "buffer_started_at": buffer_start, "buffer_age_seconds": buffer_age_seconds, "buffer_remaining_seconds": buffer_remaining_seconds, "buffer_window_seconds": BUFFER_SECONDS, "total_summaries": total_summaries, "last_update": last_update, "batch_max_chars": DEFAULT_BATCH_MAX_CHARS, } def _item_key(self, item: Dict) -> str: if item.get("id") is not None: return str(item.get("id")) title = str(item.get("title", "")).strip() source = str(item.get("source", "")).strip() if not title: return "" return f"{source}|{title}".lower() def _build_input_text(self, item: Dict) -> str: title = str(item.get("title", "")).strip() source = str(item.get("source", "")).strip() if not title: return "" if source: return f"Source: {source}\nTitle: {title}" return f"Title: {title}" def _batch_items(self, items: List[Dict], max_chars_total: int) -> List[List[Dict]]: if max_chars_total <= 0: return [items] batches: List[List[Dict]] = [] current: List[Dict] = [] current_chars = 0 for item in items: text = self._build_input_text(item) if not text: continue text_len = len(text) if current and current_chars + text_len > max_chars_total: batches.append(current) current = [] current_chars = 0 current.append(item) current_chars += text_len if current: batches.append(current) return batches ai_summary_cache = AISummaryCache()