"""Shared in-memory AI summary cache with buffering and batching."""
import os
import threading
from datetime import datetime, timedelta
from typing import Dict, List, Optional, Tuple
from utils.llm_summarizer import OpenAICompatSummarizer
# Approx 4 chars per token -> 600 tokens ~= 2400 chars
# Maximum combined characters of input text packed into one summarization batch.
DEFAULT_BATCH_MAX_CHARS = int(os.getenv("LLM_SUMMARY_BATCH_MAX_CHARS", "2400"))
# How long (seconds) items accumulate in the buffer before a flush is allowed.
BUFFER_SECONDS = int(os.getenv("LLM_SUMMARY_BUFFER_SECONDS", "120"))
class AISummaryCache:
    """Thread-safe in-memory cache of AI-generated summaries.

    Incoming items are buffered, deduplicated, and — once the buffer
    window (``BUFFER_SECONDS``) has elapsed — flushed to the LLM
    summarizer in character-bounded batches.
    """

    def __init__(self) -> None:
        self._lock = threading.Lock()
        # Items awaiting summarization.
        self._buffer: List[Dict] = []
        # When the current buffer window started (None when buffer is empty).
        self._buffer_start: Optional[datetime] = None
        # Completed summaries, keyed by _item_key().
        self._summaries: Dict[str, Dict] = {}
        self._last_update: Optional[datetime] = None

    def buffer_items(self, items: List[Dict]) -> None:
        """Queue new items for summarization.

        Deduplicates against both completed summaries and items already
        waiting in the buffer, so repeated polls of the same feed do not
        create duplicate summarization work. Items with no usable key
        (no id and no title) are skipped.
        """
        if not items:
            return
        with self._lock:
            # Keys already waiting in the buffer (original code only checked
            # _summaries, letting duplicates pile up between flushes).
            pending = {self._item_key(buffered) for buffered in self._buffer}
            for item in items:
                key = self._item_key(item)
                if not key or key in self._summaries or key in pending:
                    continue
                self._buffer.append(item)
                pending.add(key)
            if self._buffer and self._buffer_start is None:
                self._buffer_start = datetime.now()

    def maybe_flush(self) -> None:
        """Summarize buffered items once the buffer window has elapsed.

        No-op when the buffer is empty or the window has not yet passed.
        If the summarizer is disabled the buffer is left intact (the
        original implementation drained it first and silently lost the
        items), so they are summarized once the summarizer is available.
        """
        with self._lock:
            if not self._buffer or self._buffer_start is None:
                return
            if datetime.now() - self._buffer_start < timedelta(seconds=BUFFER_SECONDS):
                return
        # Construct the summarizer outside the lock; bail out *before*
        # draining so a disabled summarizer does not discard buffered items.
        summarizer = OpenAICompatSummarizer()
        if not summarizer.enabled:
            return
        with self._lock:
            if not self._buffer:
                return
            items = self._buffer
            self._buffer = []
            self._buffer_start = None
        for batch in self._batch_items(items, DEFAULT_BATCH_MAX_CHARS):
            # Pair each item with its prompt text so summaries stay aligned
            # with their items even if some items yield no text (the original
            # filtered texts but zipped against the unfiltered batch).
            pairs = [(item, self._build_input_text(item)) for item in batch]
            pairs = [(item, text) for item, text in pairs if text]
            if not pairs:
                continue
            summaries = summarizer._summarize_chunk(
                [text for _, text in pairs], source="dashboard"
            )
            if not summaries:
                continue
            now = datetime.now()
            with self._lock:
                for (item, _), summary in zip(pairs, summaries):
                    key = self._item_key(item)
                    if not key:
                        continue
                    self._summaries[key] = {
                        "id": item.get("id", key),
                        "title": item.get("title", ""),
                        "source": item.get("source", ""),
                        "summary": summary,
                        "timestamp": now,
                    }
                self._last_update = now

    def get_summaries(self) -> Tuple[List[Dict], Optional[datetime]]:
        """Return (summaries newest-first, timestamp of the last update)."""
        with self._lock:
            summaries = list(self._summaries.values())
            last_update = self._last_update
        # Sort outside the lock; datetime.min keeps malformed records stable.
        summaries.sort(key=lambda x: x.get("timestamp", datetime.min), reverse=True)
        return summaries, last_update

    def get_status(self) -> Dict:
        """Return a snapshot of buffer and cache state for monitoring."""
        with self._lock:
            buffer_size = len(self._buffer)
            buffer_start = self._buffer_start
            total_summaries = len(self._summaries)
            last_update = self._last_update
        buffer_age_seconds = None
        buffer_remaining_seconds = None
        if buffer_start:
            buffer_age_seconds = (datetime.now() - buffer_start).total_seconds()
            buffer_remaining_seconds = max(BUFFER_SECONDS - buffer_age_seconds, 0)
        return {
            "buffer_size": buffer_size,
            "buffer_started_at": buffer_start,
            "buffer_age_seconds": buffer_age_seconds,
            "buffer_remaining_seconds": buffer_remaining_seconds,
            "buffer_window_seconds": BUFFER_SECONDS,
            "total_summaries": total_summaries,
            "last_update": last_update,
            "batch_max_chars": DEFAULT_BATCH_MAX_CHARS,
        }

    def _item_key(self, item: Dict) -> str:
        """Return a stable dedup key: the id if present, else source|title.

        Returns "" when the item has neither id nor title (unusable).
        """
        if item.get("id") is not None:
            return str(item.get("id"))
        title = str(item.get("title", "")).strip()
        source = str(item.get("source", "")).strip()
        if not title:
            return ""
        return f"{source}|{title}".lower()

    def _build_input_text(self, item: Dict) -> str:
        """Build the prompt text for one item; "" when it has no title."""
        title = str(item.get("title", "")).strip()
        source = str(item.get("source", "")).strip()
        if not title:
            return ""
        if source:
            return f"Source: {source}\nTitle: {title}"
        return f"Title: {title}"

    def _batch_items(self, items: List[Dict], max_chars_total: int) -> List[List[Dict]]:
        """Greedily group items so each batch's combined text fits the limit.

        Items producing no text are dropped. A non-positive limit disables
        batching and returns all items as one batch. A single oversized item
        still forms its own batch (never split).
        """
        if max_chars_total <= 0:
            return [items]
        batches: List[List[Dict]] = []
        current: List[Dict] = []
        current_chars = 0
        for item in items:
            text = self._build_input_text(item)
            if not text:
                continue
            text_len = len(text)
            # Start a new batch when adding this item would overflow the
            # limit — unless the batch is empty (oversized single item).
            if current and current_chars + text_len > max_chars_total:
                batches.append(current)
                current = []
                current_chars = 0
            current.append(item)
            current_chars += text_len
        if current:
            batches.append(current)
        return batches
# Module-level singleton shared by every importer of this module.
ai_summary_cache = AISummaryCache()