File size: 5,226 Bytes
e189a31
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
"""Shared in-memory AI summary cache with buffering and batching."""

import os
import threading
from datetime import datetime, timedelta
from typing import Dict, List, Optional, Tuple

from utils.llm_summarizer import OpenAICompatSummarizer

# Approx 4 chars per token -> 600 tokens ~= 2400 chars
DEFAULT_BATCH_MAX_CHARS = int(os.getenv("LLM_SUMMARY_BATCH_MAX_CHARS", "2400"))
BUFFER_SECONDS = int(os.getenv("LLM_SUMMARY_BUFFER_SECONDS", "120"))


class AISummaryCache:
    """Thread-safe in-memory cache of AI-generated summaries.

    Incoming items are held in a buffer; once the buffer window
    (``BUFFER_SECONDS``) elapses, :meth:`maybe_flush` sends them to the
    summarizer in character-budgeted batches and stores the results keyed
    by :meth:`_item_key`.
    """

    def __init__(self):
        self._lock = threading.Lock()
        # Items awaiting summarization.
        self._buffer: List[Dict] = []
        # When the current buffer window opened; None while the buffer is empty.
        self._buffer_start: Optional[datetime] = None
        # Completed summaries, keyed by _item_key().
        self._summaries: Dict[str, Dict] = {}
        # Timestamp of the most recent successful flush.
        self._last_update: Optional[datetime] = None

    def buffer_items(self, items: List[Dict]):
        """Queue *items* for summarization.

        Skips items with no usable key, items already summarized, and
        duplicates already waiting in the buffer (previously the same item
        could be buffered — and summarized — more than once per window).
        """
        if not items:
            return
        with self._lock:
            buffered_keys = {self._item_key(i) for i in self._buffer}
            for item in items:
                key = self._item_key(item)
                if not key or key in self._summaries or key in buffered_keys:
                    continue
                buffered_keys.add(key)
                self._buffer.append(item)
            # Start the buffer window on the first item of a fresh buffer.
            if self._buffer and self._buffer_start is None:
                self._buffer_start = datetime.now()

    def maybe_flush(self):
        """Summarize buffered items once the buffer window has elapsed.

        No-op if the summarizer is disabled (items remain buffered for a
        later flush instead of being silently dropped), if the buffer is
        empty, or if the window has not yet elapsed.
        """
        # Check enabled state BEFORE draining the buffer: draining first
        # would discard every buffered item whenever summarization is off.
        summarizer = OpenAICompatSummarizer()
        if not summarizer.enabled:
            return

        with self._lock:
            if not self._buffer or self._buffer_start is None:
                return
            if datetime.now() - self._buffer_start < timedelta(seconds=BUFFER_SECONDS):
                return
            # Drain under the lock; summarization happens outside it so we
            # never hold the lock across a network call.
            items = self._buffer
            self._buffer = []
            self._buffer_start = None

        batches = self._batch_items(items, DEFAULT_BATCH_MAX_CHARS)
        for batch in batches:
            texts = [self._build_input_text(item) for item in batch]
            # _batch_items already drops text-less items; this filter is a
            # defensive second pass.
            texts = [t for t in texts if t]
            if not texts:
                continue
            # NOTE(review): relies on a private summarizer method — confirm
            # whether a public batch entry point exists.
            summaries = summarizer._summarize_chunk(texts, source="dashboard")
            if not summaries:
                continue
            with self._lock:
                # zip() truncates if fewer summaries than inputs come back;
                # unmatched items are simply not cached.
                for item, summary in zip(batch, summaries):
                    key = self._item_key(item)
                    if not key:
                        continue
                    self._summaries[key] = {
                        "id": item.get("id", key),
                        "title": item.get("title", ""),
                        "source": item.get("source", ""),
                        "summary": summary,
                        "timestamp": datetime.now(),
                    }
                self._last_update = datetime.now()

    def get_summaries(self) -> Tuple[List[Dict], Optional[datetime]]:
        """Return (summaries newest-first, last flush timestamp or None)."""
        with self._lock:
            summaries = list(self._summaries.values())
            last_update = self._last_update
        # Sort outside the lock; datetime.min sorts missing timestamps last.
        summaries.sort(key=lambda x: x.get("timestamp", datetime.min), reverse=True)
        return summaries, last_update

    def get_status(self) -> Dict:
        """Return a snapshot of buffer/cache state for monitoring."""
        with self._lock:
            buffer_size = len(self._buffer)
            buffer_start = self._buffer_start
            total_summaries = len(self._summaries)
            last_update = self._last_update
        buffer_age_seconds = None
        buffer_remaining_seconds = None
        if buffer_start:
            buffer_age_seconds = (datetime.now() - buffer_start).total_seconds()
            buffer_remaining_seconds = max(BUFFER_SECONDS - buffer_age_seconds, 0)
        return {
            "buffer_size": buffer_size,
            "buffer_started_at": buffer_start,
            "buffer_age_seconds": buffer_age_seconds,
            "buffer_remaining_seconds": buffer_remaining_seconds,
            "buffer_window_seconds": BUFFER_SECONDS,
            "total_summaries": total_summaries,
            "last_update": last_update,
            "batch_max_chars": DEFAULT_BATCH_MAX_CHARS,
        }

    def _item_key(self, item: Dict) -> str:
        """Stable dedupe key: the item id if present, else 'source|title'
        lowercased. Returns "" for unkeyable (id-less, title-less) items."""
        if item.get("id") is not None:
            return str(item.get("id"))
        title = str(item.get("title", "")).strip()
        source = str(item.get("source", "")).strip()
        if not title:
            return ""
        return f"{source}|{title}".lower()

    def _build_input_text(self, item: Dict) -> str:
        """Render the prompt text for one item; "" when it has no title."""
        title = str(item.get("title", "")).strip()
        source = str(item.get("source", "")).strip()
        if not title:
            return ""
        if source:
            return f"Source: {source}\nTitle: {title}"
        return f"Title: {title}"

    def _batch_items(self, items: List[Dict], max_chars_total: int) -> List[List[Dict]]:
        """Greedily group *items* into batches whose combined prompt text
        stays within *max_chars_total* characters.

        Items with empty prompt text are dropped. A non-positive budget
        disables batching (everything goes in one batch). A single item
        longer than the budget still gets its own batch.
        """
        if max_chars_total <= 0:
            return [items]
        batches: List[List[Dict]] = []
        current: List[Dict] = []
        current_chars = 0
        for item in items:
            text = self._build_input_text(item)
            if not text:
                continue
            text_len = len(text)
            if current and current_chars + text_len > max_chars_total:
                batches.append(current)
                current = []
                current_chars = 0
            current.append(item)
            current_chars += text_len
        if current:
            batches.append(current)
        return batches


# Module-level shared instance; importers use this single cache process-wide.
ai_summary_cache = AISummaryCache()