Dmitry Beresnev committed on
Commit
c571607
·
1 Parent(s): 4bd052e

Added a shared SQLite store and buffer.

Browse files

Added a background worker process with retries and batching.
Updated dashboard to enqueue items and read summaries from the shared cache.

app/pages/05_Dashboard.py CHANGED
@@ -26,7 +26,8 @@ from components.news import (
26
  display_economic_calendar_widget
27
  )
28
  from utils.breaking_news_scorer import get_breaking_news_scorer
29
- from utils.ai_summary_cache import ai_summary_cache
 
30
 
31
  # Import news scrapers
32
  try:
@@ -265,6 +266,10 @@ with st.sidebar:
265
  # Check for forced refresh (don't clear yet - wait until after fetching)
266
  force_refresh = st.session_state.get('force_refresh', False)
267
 
 
 
 
 
268
  # Fetch news from all sources IN PARALLEL for maximum performance
269
  import pandas as pd
270
  from concurrent.futures import ThreadPoolExecutor, as_completed
@@ -556,12 +561,10 @@ all_items = []
556
  for df in ai_summary_dfs:
557
  if df.empty:
558
  continue
559
- records = df.to_dict("records")
560
- all_items.extend(records)
561
 
562
  if all_items:
563
- ai_summary_cache.buffer_items(all_items)
564
- ai_summary_cache.maybe_flush()
565
 
566
  # Clear force refresh flag after fetching is complete
567
  if force_refresh:
@@ -875,9 +878,9 @@ ai_summary_pct = (ai_summarized / total_items * 100) if total_items else 0.0
875
  st.markdown("---")
876
  @st.fragment(run_every=60)
877
  def render_ai_summary_section():
878
- summaries, last_update = ai_summary_cache.get_summaries()
879
- status = ai_summary_cache.get_status()
880
- last_update_text = last_update.strftime("%Y-%m-%d %H:%M:%S") if last_update else "N/A"
881
  buffer_remaining = status.get("buffer_remaining_seconds")
882
  buffer_text = "N/A"
883
  if buffer_remaining is not None:
@@ -901,7 +904,7 @@ def render_ai_summary_section():
901
  )
902
 
903
  if summaries:
904
- for item in summaries[:50]:
905
  source = item.get("source", "")
906
  summary = item.get("summary", "")
907
  title = item.get("title", "")
 
26
  display_economic_calendar_widget
27
  )
28
  from utils.breaking_news_scorer import get_breaking_news_scorer
29
+ from utils.ai_summary_store import init_db, enqueue_items, fetch_summaries, get_status
30
+ from utils.ai_summary_worker import start_worker_if_needed
31
 
32
  # Import news scrapers
33
  try:
 
266
  # Check for forced refresh (don't clear yet - wait until after fetching)
267
  force_refresh = st.session_state.get('force_refresh', False)
268
 
269
+ # Initialize AI summary store/worker (shared across sessions/processes)
270
+ init_db()
271
+ start_worker_if_needed()
272
+
273
  # Fetch news from all sources IN PARALLEL for maximum performance
274
  import pandas as pd
275
  from concurrent.futures import ThreadPoolExecutor, as_completed
 
561
  for df in ai_summary_dfs:
562
  if df.empty:
563
  continue
564
+ all_items.extend(df.to_dict("records"))
 
565
 
566
  if all_items:
567
+ enqueue_items(all_items)
 
568
 
569
  # Clear force refresh flag after fetching is complete
570
  if force_refresh:
 
878
  st.markdown("---")
879
  @st.fragment(run_every=60)
880
  def render_ai_summary_section():
881
+ summaries = fetch_summaries(limit=50)
882
+ status = get_status()
883
+ last_update_text = status.get("last_update") or "N/A"
884
  buffer_remaining = status.get("buffer_remaining_seconds")
885
  buffer_text = "N/A"
886
  if buffer_remaining is not None:
 
904
  )
905
 
906
  if summaries:
907
+ for item in summaries:
908
  source = item.get("source", "")
909
  summary = item.get("summary", "")
910
  title = item.get("title", "")
app/utils/ai_summary_store.py ADDED
@@ -0,0 +1,195 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ """SQLite-backed AI summary buffer and cache (shared across processes)."""
2
+
3
+ import os
4
+ import sqlite3
5
+ import time
6
+ from contextlib import contextmanager
7
+ from datetime import datetime
8
+ from typing import Dict, Iterable, List, Optional, Tuple
9
+
10
+ DB_PATH = os.getenv("AI_SUMMARY_DB_PATH", "/tmp/ai_summary_cache.sqlite")
11
+ BUFFER_SECONDS = int(os.getenv("LLM_SUMMARY_BUFFER_SECONDS", "120"))
12
+ BATCH_MAX_CHARS = int(os.getenv("LLM_SUMMARY_BATCH_MAX_CHARS", "2400"))
13
+
14
+
15
def _connect() -> sqlite3.Connection:
    """Open a connection to the shared summary database.

    WAL journaling plus NORMAL synchronous allows several processes to
    read and write concurrently with reasonable durability;
    ``isolation_level=None`` puts sqlite3 in autocommit mode.
    """
    connection = sqlite3.connect(DB_PATH, timeout=30, isolation_level=None)
    for pragma in ("PRAGMA journal_mode=WAL;", "PRAGMA synchronous=NORMAL;"):
        connection.execute(pragma)
    return connection
20
+
21
+
22
@contextmanager
def _db():
    """Yield a fresh database connection, guaranteeing it is closed."""
    connection = _connect()
    try:
        yield connection
    finally:
        connection.close()
29
+
30
+
31
def init_db():
    """Create the work-buffer and summary tables if they are missing.

    Idempotent: safe to call repeatedly and from multiple processes.
    """
    ddl_statements = (
        """
        CREATE TABLE IF NOT EXISTS summary_buffer (
            item_key TEXT PRIMARY KEY,
            title TEXT NOT NULL,
            source TEXT,
            created_at REAL NOT NULL
        );
        """,
        """
        CREATE TABLE IF NOT EXISTS summaries (
            item_key TEXT PRIMARY KEY,
            title TEXT NOT NULL,
            source TEXT,
            summary TEXT NOT NULL,
            updated_at REAL NOT NULL
        );
        """,
    )
    with _db() as conn:
        for ddl in ddl_statements:
            conn.execute(ddl)
54
+
55
+
56
+ def _item_key(item: Dict) -> str:
57
+ if item.get("id") is not None:
58
+ return str(item.get("id"))
59
+ title = str(item.get("title", "")).strip()
60
+ source = str(item.get("source", "")).strip()
61
+ if not title:
62
+ return ""
63
+ return f"{source}|{title}".lower()
64
+
65
+
66
def enqueue_items(items: Iterable[Dict]):
    """Queue news items for background summarization.

    Items lacking a usable key or title are skipped; keys already in the
    buffer are left untouched (INSERT OR IGNORE keeps their timestamp).
    """
    timestamp = time.time()
    pending = []
    for entry in items:
        key = _item_key(entry)
        title = str(entry.get("title", "")).strip()
        if not (key and title):
            continue
        pending.append(
            (key, title, str(entry.get("source", "")).strip(), timestamp)
        )

    if not pending:
        return

    with _db() as conn:
        conn.executemany(
            """
            INSERT OR IGNORE INTO summary_buffer (item_key, title, source, created_at)
            VALUES (?, ?, ?, ?)
            """,
            pending,
        )
88
+
89
+
90
def get_status() -> Dict:
    """Return queue depth, summary count and timing info for the UI."""
    with _db() as conn:
        buffered = conn.execute("SELECT COUNT(*) FROM summary_buffer").fetchone()[0]
        summarized = conn.execute("SELECT COUNT(*) FROM summaries").fetchone()[0]
        newest_row = conn.execute("SELECT MAX(updated_at) FROM summaries").fetchone()
        oldest_row = conn.execute("SELECT MIN(created_at) FROM summary_buffer").fetchone()

    last_update = None
    if newest_row and newest_row[0]:
        last_update = datetime.fromtimestamp(newest_row[0]).strftime("%Y-%m-%d %H:%M:%S")

    # Countdown until the oldest buffered item becomes eligible for a batch.
    remaining = None
    oldest_ts = oldest_row[0] if oldest_row else None
    if oldest_ts:
        remaining = max(BUFFER_SECONDS - (time.time() - oldest_ts), 0)

    return {
        "buffer_size": buffered,
        "total_summaries": summarized,
        "last_update": last_update,
        "buffer_remaining_seconds": remaining,
        "batch_max_chars": BATCH_MAX_CHARS,
        "buffer_window_seconds": BUFFER_SECONDS,
    }
116
+
117
+
118
def fetch_summaries(limit: int = 50) -> List[Dict]:
    """Return up to *limit* summaries, most recently updated first."""
    query = """
            SELECT title, source, summary, updated_at
            FROM summaries
            ORDER BY updated_at DESC
            LIMIT ?
            """
    with _db() as conn:
        rows = conn.execute(query, (limit,)).fetchall()

    return [
        {
            "title": title,
            "source": source or "",
            "summary": summary,
            "timestamp": datetime.fromtimestamp(updated_at),
        }
        for title, source, summary, updated_at in rows
    ]
141
+
142
+
143
+ def _build_input_text(title: str, source: str) -> str:
144
+ if source:
145
+ return f"Source: {source}\nTitle: {title}"
146
+ return f"Title: {title}"
147
+
148
+
149
def fetch_ready_batches(max_chars_total: int, buffer_seconds: int) -> List[List[Tuple[str, str, str]]]:
    """Group buffered items older than *buffer_seconds* into batches.

    Each batch's combined prompt text stays within *max_chars_total*;
    a single oversized item still forms a batch of its own. Items are
    processed oldest-first.
    """
    cutoff = time.time() - buffer_seconds
    with _db() as conn:
        rows = conn.execute(
            """
            SELECT item_key, title, source
            FROM summary_buffer
            WHERE created_at <= ?
            ORDER BY created_at ASC
            """,
            (cutoff,),
        ).fetchall()

    batches: List[List[Tuple[str, str, str]]] = []
    current: List[Tuple[str, str, str]] = []
    used_chars = 0

    for key, title, source in rows:
        size = len(_build_input_text(title, source or ""))
        # Close the current batch when adding this item would overflow it.
        if current and used_chars + size > max_chars_total:
            batches.append(current)
            current = []
            used_chars = 0
        current.append((key, title, source or ""))
        used_chars += size

    if current:
        batches.append(current)
    return batches
180
+
181
+
182
def store_summaries(items: List[Tuple[str, str, str, str]]):
    """Persist finished summaries and remove them from the work buffer.

    Args:
        items: ``(item_key, title, source, summary)`` tuples.

    The INSERT and the buffer DELETE run inside one explicit transaction
    (the connection is in autocommit mode, so without BEGIN each
    statement would commit separately and a crash in between could leave
    an item both summarized and still queued).
    """
    if not items:
        return

    now = time.time()
    with _db() as conn:
        conn.execute("BEGIN IMMEDIATE;")
        try:
            conn.executemany(
                """
                INSERT OR REPLACE INTO summaries (item_key, title, source, summary, updated_at)
                VALUES (?, ?, ?, ?, ?)
                """,
                [(k, t, s, summary, now) for k, t, s, summary in items],
            )
            conn.executemany(
                "DELETE FROM summary_buffer WHERE item_key = ?",
                [(k,) for k, _, _, _ in items],
            )
            conn.execute("COMMIT;")
        except Exception:
            conn.execute("ROLLBACK;")
            raise
app/utils/ai_summary_worker.py ADDED
@@ -0,0 +1,110 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ """Background worker process for AI summarization."""
2
+
3
+ import os
4
+ import time
5
+ import logging
6
+ import signal
7
+ import sqlite3
8
+ from typing import List, Tuple
9
+
10
+ from utils.llm_summarizer import OpenAICompatSummarizer
11
+ from utils.ai_summary_store import (
12
+ init_db,
13
+ fetch_ready_batches,
14
+ store_summaries,
15
+ BATCH_MAX_CHARS,
16
+ BUFFER_SECONDS,
17
+ )
18
+
19
+ logger = logging.getLogger(__name__)
20
+
21
+ PID_FILE = os.getenv("AI_SUMMARY_WORKER_PID", "/tmp/ai_summary_worker.pid")
22
+ POLL_SECONDS = int(os.getenv("AI_SUMMARY_POLL_SECONDS", "5"))
23
+ MAX_RETRIES = int(os.getenv("LLM_SUMMARY_RETRIES", "3"))
24
+
25
+
26
class Worker:
    """Polls the shared buffer and summarizes ready batches via the LLM."""

    def __init__(self):
        self._stop = False
        self.summarizer = OpenAICompatSummarizer()

    def stop(self, *_args):
        """Request a graceful shutdown (installed as a signal handler)."""
        self._stop = True

    def run(self):
        """Main loop: drain all ready batches, then sleep POLL_SECONDS."""
        init_db()
        signal.signal(signal.SIGTERM, self.stop)
        signal.signal(signal.SIGINT, self.stop)

        while not self._stop:
            try:
                for batch in fetch_ready_batches(BATCH_MAX_CHARS, BUFFER_SECONDS):
                    self._process_batch(batch)
            except sqlite3.Error as exc:
                logger.warning("AI worker DB error: %s", exc)
            except Exception as exc:
                logger.warning("AI worker error: %s", exc)

            time.sleep(POLL_SECONDS)

    def _process_batch(self, batch: List[Tuple[str, str, str]]):
        """Summarize one batch with exponential-backoff retries and persist it."""
        if not batch or not self.summarizer.enabled:
            return

        texts = [
            f"Source: {source}\nTitle: {title}" if source else f"Title: {title}"
            for _, title, source in batch
        ]

        # Guard against LLM_SUMMARY_RETRIES being configured <= 0: the
        # original range() would then never run, leaving `summaries`
        # unbound and raising NameError below.
        attempts = max(MAX_RETRIES, 1)
        summaries = None
        for attempt in range(1, attempts + 1):
            # NOTE(review): relies on the summarizer's private
            # _summarize_chunk — confirm this is a supported entry point.
            summaries = self.summarizer._summarize_chunk(texts, source="dashboard")
            if summaries and len(summaries) == len(batch):
                break
            if attempt < attempts:
                time.sleep(2 ** attempt)
        else:
            logger.warning("AI worker failed to summarize batch after retries")
            return

        to_store = [
            (item_key, title, source, summary)
            for (item_key, title, source), summary in zip(batch, summaries)
            if summary
        ]
        if to_store:
            store_summaries(to_store)
80
+
81
+
82
+ def _pid_running(pid: int) -> bool:
83
+ try:
84
+ os.kill(pid, 0)
85
+ return True
86
+ except Exception:
87
+ return False
88
+
89
+
90
def start_worker_if_needed():
    """Fork a background summarization worker unless one is already running.

    A pid file prevents duplicate workers across Streamlit sessions.
    No-op on platforms without ``os.fork`` (e.g. Windows), where the
    original would have raised AttributeError.
    """
    if not hasattr(os, "fork"):
        logger.warning("os.fork unavailable; AI summary worker not started")
        return

    if os.path.exists(PID_FILE):
        try:
            with open(PID_FILE, "r", encoding="utf-8") as f:
                pid = int(f.read().strip() or 0)
            if pid and _pid_running(pid):
                return  # a worker is already alive
        except Exception:
            pass  # stale or corrupt pid file: fall through and restart

    pid = os.fork()
    if pid != 0:
        return  # parent: the child carries on as the worker

    # Child: detach from the parent's session and run the loop.
    os.setsid()
    with open(PID_FILE, "w", encoding="utf-8") as f:
        f.write(str(os.getpid()))

    worker = Worker()
    try:
        worker.run()
    finally:
        # Critical: without _exit the child would return from this
        # function and keep executing the caller's (Streamlit) script,
        # duplicating the dashboard in a headless process.
        os._exit(0)
110
+
app/utils/news_cache.py CHANGED
@@ -49,6 +49,7 @@ class NewsCacheManager:
49
  source: str,
50
  fetcher_func: Callable,
51
  force_refresh: bool = False,
 
52
  **kwargs
53
  ) -> List[Dict]:
54
  """
@@ -58,6 +59,7 @@ class NewsCacheManager:
58
  source: News source ('twitter', 'reddit', 'rss', 'ai_tech')
59
  fetcher_func: Function to fetch fresh news
60
  force_refresh: If True, bypass cache and fetch fresh
 
61
  **kwargs: Arguments to pass to fetcher_func
62
 
63
  Returns:
@@ -93,12 +95,13 @@ class NewsCacheManager:
93
  # Update cache
94
  self._update_cache(source, new_items)
95
 
96
- # Deduplicate across sources
97
- deduplicated = self._deduplicate(new_items, source)
 
 
98
 
99
- logger.info(f"✅ Fetched {len(new_items)} items for {source}, {len(deduplicated)} unique after dedup")
100
-
101
- return deduplicated
102
 
103
  except Exception as e:
104
  logger.error(f"Error fetching news for {source}: {e}")
 
49
  source: str,
50
  fetcher_func: Callable,
51
  force_refresh: bool = False,
52
+ deduplicate: bool = False,
53
  **kwargs
54
  ) -> List[Dict]:
55
  """
 
59
  source: News source ('twitter', 'reddit', 'rss', 'ai_tech')
60
  fetcher_func: Function to fetch fresh news
61
  force_refresh: If True, bypass cache and fetch fresh
62
+ deduplicate: If True, remove duplicates across sources using global index
63
  **kwargs: Arguments to pass to fetcher_func
64
 
65
  Returns:
 
95
  # Update cache
96
  self._update_cache(source, new_items)
97
 
98
+ if deduplicate:
99
+ deduplicated = self._deduplicate(new_items, source)
100
+ logger.info(f"✅ Fetched {len(new_items)} items for {source}, {len(deduplicated)} unique after dedup")
101
+ return deduplicated
102
 
103
+ logger.info(f"✅ Fetched {len(new_items)} items for {source} (dedup disabled)")
104
+ return new_items
 
105
 
106
  except Exception as e:
107
  logger.error(f"Error fetching news for {source}: {e}")