| """Shared in-memory AI summary cache with buffering and batching.""" |
|
|
| import os |
| import threading |
| from datetime import datetime, timedelta |
| from typing import Dict, List, Optional, Tuple |
|
|
| from utils.llm_summarizer import OpenAICompatSummarizer |
|
|
| |
# Upper bound on the combined input-text characters packed into one
# summarization batch (override with env var LLM_SUMMARY_BATCH_MAX_CHARS;
# a value <= 0 disables batching entirely).
DEFAULT_BATCH_MAX_CHARS = int(os.getenv("LLM_SUMMARY_BATCH_MAX_CHARS", "2400"))
# Minimum age, in seconds, the buffer must reach before maybe_flush() will
# summarize it (override with env var LLM_SUMMARY_BUFFER_SECONDS).
BUFFER_SECONDS = int(os.getenv("LLM_SUMMARY_BUFFER_SECONDS", "120"))
|
|
|
|
class AISummaryCache:
    """Thread-safe, in-memory cache of LLM summaries with time-based buffering.

    Items are queued via :meth:`buffer_items`; once the buffer has aged past
    ``BUFFER_SECONDS``, :meth:`maybe_flush` sends the pending items to the
    summarizer in character-bounded batches and stores the results keyed by
    :meth:`_item_key`.
    """

    def __init__(self):
        # Single lock guarding all mutable state below.
        self._lock = threading.Lock()
        # Items waiting to be summarized.
        self._buffer: List[Dict] = []
        # When the current (non-empty) buffer started filling; None when empty.
        self._buffer_start: Optional[datetime] = None
        # Finished summaries, keyed by _item_key().
        self._summaries: Dict[str, Dict] = {}
        # When a summary was last stored.
        self._last_update: Optional[datetime] = None

    def buffer_items(self, items: List[Dict]):
        """Queue *items* for later summarization.

        Items with no usable key, items already summarized, and items already
        waiting in the buffer are skipped, so no item is summarized twice.
        """
        if not items:
            return
        with self._lock:
            # Dedup against the pending buffer too, not only the finished
            # summaries — previously, overlapping buffer_items() calls
            # enqueued (and later summarized) the same item repeatedly.
            pending_keys = {self._item_key(i) for i in self._buffer}
            for item in items:
                key = self._item_key(item)
                if not key or key in self._summaries or key in pending_keys:
                    continue
                self._buffer.append(item)
                pending_keys.add(key)
            if self._buffer and self._buffer_start is None:
                self._buffer_start = datetime.now()

    def maybe_flush(self):
        """Summarize buffered items once the buffer window has elapsed.

        No-op while the buffer is empty or younger than ``BUFFER_SECONDS``.
        The buffer is drained under the lock, but the (slow) summarizer calls
        run outside it so readers stay responsive.
        """
        with self._lock:
            if not self._buffer or self._buffer_start is None:
                return
            if datetime.now() - self._buffer_start < timedelta(seconds=BUFFER_SECONDS):
                return
            items = self._buffer
            self._buffer = []
            self._buffer_start = None

        summarizer = OpenAICompatSummarizer()
        if not summarizer.enabled:
            # The drained items are intentionally dropped here so a disabled
            # summarizer cannot grow the buffer without bound.
            return

        batches = self._batch_items(items, DEFAULT_BATCH_MAX_CHARS)
        for batch in batches:
            # Filter items and their texts *together* so the zip below stays
            # aligned.  The previous code filtered only the texts, which
            # paired summaries with the wrong items whenever a batch held an
            # item with no input text (possible when batching is disabled via
            # max_chars <= 0, where _batch_items returns items unfiltered).
            prepared = [(item, self._build_input_text(item)) for item in batch]
            prepared = [(item, text) for item, text in prepared if text]
            if not prepared:
                continue
            texts = [text for _, text in prepared]
            summaries = summarizer._summarize_chunk(texts, source="dashboard")
            if not summaries:
                continue
            with self._lock:
                for (item, _), summary in zip(prepared, summaries):
                    key = self._item_key(item)
                    if not key:
                        continue
                    self._summaries[key] = {
                        "id": item.get("id", key),
                        "title": item.get("title", ""),
                        "source": item.get("source", ""),
                        "summary": summary,
                        "timestamp": datetime.now(),
                    }
                self._last_update = datetime.now()

    def get_summaries(self) -> Tuple[List[Dict], Optional[datetime]]:
        """Return ``(summaries newest-first, time of last update)``."""
        with self._lock:
            summaries = list(self._summaries.values())
            last_update = self._last_update
        # Sort the local copy outside the lock; entries without a timestamp
        # sort last (datetime.min) under reverse ordering.
        summaries.sort(key=lambda x: x.get("timestamp", datetime.min), reverse=True)
        return summaries, last_update

    def get_status(self) -> Dict:
        """Return a snapshot of buffer / summary state for monitoring."""
        with self._lock:
            buffer_size = len(self._buffer)
            buffer_start = self._buffer_start
            total_summaries = len(self._summaries)
            last_update = self._last_update
        buffer_age_seconds = None
        buffer_remaining_seconds = None
        if buffer_start:
            buffer_age_seconds = (datetime.now() - buffer_start).total_seconds()
            # Clamp at zero once the flush window has fully elapsed.
            buffer_remaining_seconds = max(BUFFER_SECONDS - buffer_age_seconds, 0)
        return {
            "buffer_size": buffer_size,
            "buffer_started_at": buffer_start,
            "buffer_age_seconds": buffer_age_seconds,
            "buffer_remaining_seconds": buffer_remaining_seconds,
            "buffer_window_seconds": BUFFER_SECONDS,
            "total_summaries": total_summaries,
            "last_update": last_update,
            "batch_max_chars": DEFAULT_BATCH_MAX_CHARS,
        }

    def _item_key(self, item: Dict) -> str:
        """Stable dedup key: the item id when present, else
        ``"source|title"`` lowercased; ``""`` when the item cannot be keyed."""
        item_id = item.get("id")
        if item_id is not None:
            return str(item_id)
        title = str(item.get("title", "")).strip()
        source = str(item.get("source", "")).strip()
        if not title:
            return ""
        return f"{source}|{title}".lower()

    def _build_input_text(self, item: Dict) -> str:
        """Render the summarizer input for *item*; ``""`` when no title."""
        title = str(item.get("title", "")).strip()
        source = str(item.get("source", "")).strip()
        if not title:
            return ""
        if source:
            return f"Source: {source}\nTitle: {title}"
        return f"Title: {title}"

    def _batch_items(self, items: List[Dict], max_chars_total: int) -> List[List[Dict]]:
        """Greedily group *items* into batches whose combined input-text
        length stays within *max_chars_total*.

        A non-positive limit disables batching (one batch, unfiltered).
        Otherwise items producing no input text are skipped, and a batch
        always holds at least one item even if it alone exceeds the limit.
        """
        if max_chars_total <= 0:
            return [items]
        batches: List[List[Dict]] = []
        current: List[Dict] = []
        current_chars = 0
        for item in items:
            text = self._build_input_text(item)
            if not text:
                continue
            text_len = len(text)
            if current and current_chars + text_len > max_chars_total:
                batches.append(current)
                current = []
                current_chars = 0
            current.append(item)
            current_chars += text_len
        if current:
            batches.append(current)
        return batches
|
|
|
|
| ai_summary_cache = AISummaryCache() |
|
|