Dmitry Beresnev committed on
Commit
451e369
·
1 Parent(s): c571607

fix ai summaries cache

Browse files
app/pages/05_Dashboard.py CHANGED
@@ -26,7 +26,7 @@ from components.news import (
26
  display_economic_calendar_widget
27
  )
28
  from utils.breaking_news_scorer import get_breaking_news_scorer
29
- from utils.ai_summary_store import init_db, enqueue_items, fetch_summaries, get_status
30
  from utils.ai_summary_worker import start_worker_if_needed
31
 
32
  # Import news scrapers
@@ -267,7 +267,7 @@ with st.sidebar:
267
  force_refresh = st.session_state.get('force_refresh', False)
268
 
269
  # Initialize AI summary store/worker (shared across sessions/processes)
270
- init_db()
271
  start_worker_if_needed()
272
 
273
  # Fetch news from all sources IN PARALLEL for maximum performance
 
26
  display_economic_calendar_widget
27
  )
28
  from utils.breaking_news_scorer import get_breaking_news_scorer
29
+ from utils.ai_summary_store import init_storage, enqueue_items, fetch_summaries, get_status
30
  from utils.ai_summary_worker import start_worker_if_needed
31
 
32
  # Import news scrapers
 
267
  force_refresh = st.session_state.get('force_refresh', False)
268
 
269
  # Initialize AI summary store/worker (shared across sessions/processes)
270
+ init_storage()
271
  start_worker_if_needed()
272
 
273
  # Fetch news from all sources IN PARALLEL for maximum performance
app/utils/ai_summary_store.py CHANGED
@@ -1,114 +1,104 @@
1
- """SQLite-backed AI summary buffer and cache (shared across processes)."""
2
 
 
3
  import os
4
- import sqlite3
5
  import time
6
  from contextlib import contextmanager
7
  from datetime import datetime
8
  from typing import Dict, Iterable, List, Optional, Tuple
9
 
10
- DB_PATH = os.getenv("AI_SUMMARY_DB_PATH", "/tmp/ai_summary_cache.sqlite")
 
 
 
 
 
 
 
 
 
 
 
11
  BUFFER_SECONDS = int(os.getenv("LLM_SUMMARY_BUFFER_SECONDS", "120"))
12
  BATCH_MAX_CHARS = int(os.getenv("LLM_SUMMARY_BATCH_MAX_CHARS", "2400"))
 
 
13
 
 
 
 
 
14
 
15
- def _connect() -> sqlite3.Connection:
16
- conn = sqlite3.connect(DB_PATH, timeout=30, isolation_level=None)
17
- conn.execute("PRAGMA journal_mode=WAL;")
18
- conn.execute("PRAGMA synchronous=NORMAL;")
19
- return conn
20
 
 
 
 
 
 
21
 
22
- @contextmanager
23
- def _db():
24
- conn = _connect()
25
- try:
26
- yield conn
27
- finally:
28
- conn.close()
29
-
30
-
31
- def init_db():
32
- with _db() as conn:
33
- conn.execute(
34
- """
35
- CREATE TABLE IF NOT EXISTS summary_buffer (
36
- item_key TEXT PRIMARY KEY,
37
- title TEXT NOT NULL,
38
- source TEXT,
39
- created_at REAL NOT NULL
40
- );
41
- """
42
- )
43
- conn.execute(
44
- """
45
- CREATE TABLE IF NOT EXISTS summaries (
46
- item_key TEXT PRIMARY KEY,
47
- title TEXT NOT NULL,
48
- source TEXT,
49
- summary TEXT NOT NULL,
50
- updated_at REAL NOT NULL
51
- );
52
- """
53
- )
54
 
 
 
 
55
 
56
- def _item_key(item: Dict) -> str:
57
- if item.get("id") is not None:
58
- return str(item.get("id"))
59
- title = str(item.get("title", "")).strip()
60
- source = str(item.get("source", "")).strip()
61
- if not title:
62
- return ""
63
- return f"{source}|{title}".lower()
64
 
 
 
65
 
66
- def enqueue_items(items: Iterable[Dict]):
67
- now = time.time()
68
- rows = []
69
- for item in items:
70
- key = _item_key(item)
71
- title = str(item.get("title", "")).strip()
72
- if not key or not title:
73
- continue
74
- source = str(item.get("source", "")).strip()
75
- rows.append((key, title, source, now))
76
-
77
- if not rows:
78
- return
 
 
 
 
79
 
80
- with _db() as conn:
81
- conn.executemany(
82
- """
83
- INSERT OR IGNORE INTO summary_buffer (item_key, title, source, created_at)
84
- VALUES (?, ?, ?, ?)
85
- """,
86
- rows,
87
- )
88
 
89
 
90
  def get_status() -> Dict:
91
- with _db() as conn:
92
- buffer_count = conn.execute("SELECT COUNT(*) FROM summary_buffer").fetchone()[0]
93
- summaries_count = conn.execute("SELECT COUNT(*) FROM summaries").fetchone()[0]
94
- last_update_row = conn.execute("SELECT MAX(updated_at) FROM summaries").fetchone()
95
- buffer_oldest_row = conn.execute("SELECT MIN(created_at) FROM summary_buffer").fetchone()
96
-
97
- last_update = (
98
- datetime.fromtimestamp(last_update_row[0]).strftime("%Y-%m-%d %H:%M:%S")
99
- if last_update_row and last_update_row[0]
100
- else None
101
- )
102
- buffer_oldest = buffer_oldest_row[0] if buffer_oldest_row else None
 
 
 
103
  buffer_remaining = None
104
  if buffer_oldest:
105
  age = time.time() - buffer_oldest
106
  buffer_remaining = max(BUFFER_SECONDS - age, 0)
107
 
 
 
 
 
108
  return {
109
  "buffer_size": buffer_count,
110
  "total_summaries": summaries_count,
111
- "last_update": last_update,
112
  "buffer_remaining_seconds": buffer_remaining,
113
  "batch_max_chars": BATCH_MAX_CHARS,
114
  "buffer_window_seconds": BUFFER_SECONDS,
@@ -116,61 +106,48 @@ def get_status() -> Dict:
116
 
117
 
118
  def fetch_summaries(limit: int = 50) -> List[Dict]:
119
- with _db() as conn:
120
- rows = conn.execute(
121
- """
122
- SELECT title, source, summary, updated_at
123
- FROM summaries
124
- ORDER BY updated_at DESC
125
- LIMIT ?
126
- """,
127
- (limit,),
128
- ).fetchall()
129
 
 
130
  results = []
131
- for title, source, summary, updated_at in rows:
132
  results.append(
133
  {
134
- "title": title,
135
- "source": source or "",
136
- "summary": summary,
137
- "timestamp": datetime.fromtimestamp(updated_at),
138
  }
139
  )
140
  return results
141
 
142
 
143
- def _build_input_text(title: str, source: str) -> str:
144
- if source:
145
- return f"Source: {source}\nTitle: {title}"
146
- return f"Title: {title}"
147
-
148
-
149
  def fetch_ready_batches(max_chars_total: int, buffer_seconds: int) -> List[List[Tuple[str, str, str]]]:
 
150
  cutoff = time.time() - buffer_seconds
151
- with _db() as conn:
152
- rows = conn.execute(
153
- """
154
- SELECT item_key, title, source
155
- FROM summary_buffer
156
- WHERE created_at <= ?
157
- ORDER BY created_at ASC
158
- """,
159
- (cutoff,),
160
- ).fetchall()
161
 
162
  batches: List[List[Tuple[str, str, str]]] = []
163
  current: List[Tuple[str, str, str]] = []
164
  current_chars = 0
165
 
166
- for item_key, title, source in rows:
167
- text = _build_input_text(title, source or "")
 
 
168
  text_len = len(text)
169
  if current and current_chars + text_len > max_chars_total:
170
  batches.append(current)
171
  current = []
172
  current_chars = 0
173
- current.append((item_key, title, source or ""))
174
  current_chars += text_len
175
 
176
  if current:
@@ -180,16 +157,131 @@ def fetch_ready_batches(max_chars_total: int, buffer_seconds: int) -> List[List[
180
 
181
 
182
  def store_summaries(items: List[Tuple[str, str, str, str]]):
 
 
 
 
183
  now = time.time()
184
- with _db() as conn:
185
- conn.executemany(
186
- """
187
- INSERT OR REPLACE INTO summaries (item_key, title, source, summary, updated_at)
188
- VALUES (?, ?, ?, ?, ?)
189
- """,
190
- [(k, t, s, summary, now) for k, t, s, summary in items],
191
- )
192
- conn.executemany(
193
- "DELETE FROM summary_buffer WHERE item_key = ?",
194
- [(k,) for k, _, _, _ in items],
195
- )
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ """File-backed AI summary buffer and cache with optional HF dataset sync."""
2
 
3
+ import json
4
  import os
 
5
  import time
6
  from contextlib import contextmanager
7
  from datetime import datetime
8
  from typing import Dict, Iterable, List, Optional, Tuple
9
 
10
+ try:
11
+ import fcntl
12
+ except Exception: # pragma: no cover
13
+ fcntl = None
14
+
15
+ try:
16
+ from huggingface_hub import HfApi, snapshot_download
17
+ except Exception: # pragma: no cover
18
+ HfApi = None
19
+ snapshot_download = None
20
+
21
+ CACHE_DIR = os.getenv("AI_SUMMARY_CACHE_DIR", "./ai-summary-cache")
22
  BUFFER_SECONDS = int(os.getenv("LLM_SUMMARY_BUFFER_SECONDS", "120"))
23
  BATCH_MAX_CHARS = int(os.getenv("LLM_SUMMARY_BATCH_MAX_CHARS", "2400"))
24
+ HF_REPO_ID = os.getenv("AI_SUMMARY_HF_REPO", "ResearchEngineering/ai_news_summaries")
25
+ HF_REPO_TYPE = os.getenv("AI_SUMMARY_HF_REPO_TYPE", "dataset")
26
 
27
+ BUFFER_FILE = "buffer.jsonl"
28
+ SUMMARIES_FILE = "summaries.jsonl"
29
+ META_FILE = "meta.json"
30
+ LOCK_FILE = ".lock"
31
 
 
 
 
 
 
32
 
33
def init_storage():
    """Prepare the on-disk cache.

    Creates the cache directory, seeds it from the configured Hugging Face
    dataset when the hub client is importable, and makes sure the JSONL
    buffer/summary files exist.
    """
    os.makedirs(CACHE_DIR, exist_ok=True)
    hf_available = snapshot_download is not None and bool(HF_REPO_ID)
    if hf_available:
        _maybe_restore_from_hf()
    _ensure_files()
38
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
39
 
40
def enqueue_items(items: Iterable[Dict]):
    """Add not-yet-known news items to the pending summary buffer.

    Items lacking a usable key/title, or whose key already exists in the
    buffer or the stored summaries, are skipped. The buffer file is only
    rewritten when something new was actually added.
    """
    init_storage()
    enqueued_at = time.time()

    with _file_lock():
        pending = _read_jsonl(BUFFER_FILE)
        done = _read_jsonl(SUMMARIES_FILE)

        # Keys already known anywhere — pending or already summarized.
        seen = {row.get("item_key") for row in pending if row.get("item_key")}
        seen |= {row.get("item_key") for row in done if row.get("item_key")}

        fresh = []
        for candidate in items:
            key = _item_key(candidate)
            title = str(candidate.get("title", "")).strip()
            if not key or not title or key in seen:
                continue
            fresh.append(
                {
                    "item_key": key,
                    "title": title,
                    "source": str(candidate.get("source", "")).strip(),
                    "created_at": enqueued_at,
                }
            )
            seen.add(key)

        if fresh:
            _write_jsonl(BUFFER_FILE, pending + fresh)
 
 
 
 
 
 
71
 
72
 
73
  def get_status() -> Dict:
74
+ init_storage()
75
+ with _file_lock():
76
+ buffer_items = _read_jsonl(BUFFER_FILE)
77
+ summaries = _read_jsonl(SUMMARIES_FILE)
78
+
79
+ buffer_count = len(buffer_items)
80
+ summaries_count = len(summaries)
81
+ last_update = None
82
+ if summaries:
83
+ last_update = max(item.get("updated_at", 0) for item in summaries)
84
+
85
+ buffer_oldest = None
86
+ if buffer_items:
87
+ buffer_oldest = min(item.get("created_at", 0) for item in buffer_items)
88
+
89
  buffer_remaining = None
90
  if buffer_oldest:
91
  age = time.time() - buffer_oldest
92
  buffer_remaining = max(BUFFER_SECONDS - age, 0)
93
 
94
+ last_update_text = (
95
+ datetime.fromtimestamp(last_update).strftime("%Y-%m-%d %H:%M:%S") if last_update else None
96
+ )
97
+
98
  return {
99
  "buffer_size": buffer_count,
100
  "total_summaries": summaries_count,
101
+ "last_update": last_update_text,
102
  "buffer_remaining_seconds": buffer_remaining,
103
  "batch_max_chars": BATCH_MAX_CHARS,
104
  "buffer_window_seconds": BUFFER_SECONDS,
 
106
 
107
 
108
def fetch_summaries(limit: int = 50) -> List[Dict]:
    """Return up to ``limit`` stored summaries, newest first.

    Each result dict carries ``title``, ``source``, ``summary`` and a
    ``timestamp`` datetime derived from the record's ``updated_at``.
    """
    init_storage()
    with _file_lock():
        stored = _read_jsonl(SUMMARIES_FILE)

    newest_first = sorted(stored, key=lambda row: row.get("updated_at", 0), reverse=True)
    return [
        {
            "title": row.get("title", ""),
            "source": row.get("source", ""),
            "summary": row.get("summary", ""),
            # Records written by store_summaries always have updated_at;
            # "now" is only a fallback for malformed rows.
            "timestamp": datetime.fromtimestamp(row.get("updated_at", time.time())),
        }
        for row in newest_first[:limit]
    ]
125
 
126
 
 
 
 
 
 
 
127
  def fetch_ready_batches(max_chars_total: int, buffer_seconds: int) -> List[List[Tuple[str, str, str]]]:
128
+ init_storage()
129
  cutoff = time.time() - buffer_seconds
130
+
131
+ with _file_lock():
132
+ buffer_items = _read_jsonl(BUFFER_FILE)
133
+
134
+ eligible = [item for item in buffer_items if item.get("created_at", 0) <= cutoff]
135
+ eligible.sort(key=lambda x: x.get("created_at", 0))
 
 
 
 
136
 
137
  batches: List[List[Tuple[str, str, str]]] = []
138
  current: List[Tuple[str, str, str]] = []
139
  current_chars = 0
140
 
141
+ for item in eligible:
142
+ title = item.get("title", "")
143
+ source = item.get("source", "")
144
+ text = _build_input_text(title, source)
145
  text_len = len(text)
146
  if current and current_chars + text_len > max_chars_total:
147
  batches.append(current)
148
  current = []
149
  current_chars = 0
150
+ current.append((item.get("item_key"), title, source))
151
  current_chars += text_len
152
 
153
  if current:
 
157
 
158
 
159
def store_summaries(items: List[Tuple[str, str, str, str]]):
    """Persist finished summaries and drop their buffer entries.

    ``items`` is a list of ``(item_key, title, source, summary)`` tuples.
    Existing summaries with the same key are overwritten, matching buffer
    rows are removed, metadata is refreshed, and the cache is pushed to
    the HF dataset when configured.
    """
    if not items:
        return

    init_storage()
    written_at = time.time()

    with _file_lock():
        by_key = {
            row.get("item_key"): row
            for row in _read_jsonl(SUMMARIES_FILE)
            if row.get("item_key")
        }
        pending = {
            row.get("item_key"): row
            for row in _read_jsonl(BUFFER_FILE)
            if row.get("item_key")
        }

        for key, title, source, summary in items:
            by_key[key] = {
                "item_key": key,
                "title": title,
                "source": source,
                "summary": summary,
                "updated_at": written_at,
            }
            pending.pop(key, None)

        _write_jsonl(SUMMARIES_FILE, list(by_key.values()))
        _write_jsonl(BUFFER_FILE, list(pending.values()))

        _write_meta({"last_sync": None, "last_update": written_at})
        # NOTE(review): the HF upload runs while the file lock is held —
        # confirm this matches the intended lock scope.
        _sync_to_hf_if_configured()
189
+
190
+
191
+ def _item_key(item: Dict) -> str:
192
+ if item.get("id") is not None:
193
+ return str(item.get("id"))
194
+ title = str(item.get("title", "")).strip()
195
+ source = str(item.get("source", "")).strip()
196
+ if not title:
197
+ return ""
198
+ return f"{source}|{title}".lower()
199
+
200
+
201
+ def _build_input_text(title: str, source: str) -> str:
202
+ if source:
203
+ return f"Source: {source}\nTitle: {title}"
204
+ return f"Title: {title}"
205
+
206
+
207
def _ensure_files():
    """Create empty buffer/summaries files when they are missing."""
    for filename in (BUFFER_FILE, SUMMARIES_FILE):
        target = os.path.join(CACHE_DIR, filename)
        if not os.path.exists(target):
            # Touch the file so later readers see an empty JSONL file.
            with open(target, "w", encoding="utf-8"):
                pass
213
+
214
+
215
def _read_jsonl(filename: str) -> List[Dict]:
    """Load one record per line from a JSONL file in the cache dir.

    Missing files yield an empty list; blank or unparsable lines are
    skipped (best-effort — a corrupt line must not poison the cache).
    """
    path = os.path.join(CACHE_DIR, filename)
    if not os.path.exists(path):
        return []

    records: List[Dict] = []
    with open(path, "r", encoding="utf-8") as handle:
        for raw in handle:
            stripped = raw.strip()
            if not stripped:
                continue
            try:
                records.append(json.loads(stripped))
            except Exception:
                continue
    return records
230
+
231
+
232
def _write_jsonl(filename: str, items: List[Dict]):
    """Atomically rewrite a JSONL file (write to ``.tmp``, then rename)."""
    destination = os.path.join(CACHE_DIR, filename)
    staging = destination + ".tmp"
    with open(staging, "w", encoding="utf-8") as handle:
        handle.writelines(
            json.dumps(item, ensure_ascii=True) + "\n" for item in items
        )
    # os.replace is atomic on POSIX, so readers never see a partial file.
    os.replace(staging, destination)
239
+
240
+
241
def _write_meta(data: Dict):
    """Atomically persist the metadata JSON document."""
    destination = os.path.join(CACHE_DIR, META_FILE)
    staging = destination + ".tmp"
    with open(staging, "w", encoding="utf-8") as handle:
        json.dump(data, handle)
    os.replace(staging, destination)
247
+
248
+
249
@contextmanager
def _file_lock():
    """Serialize cache access across processes with an fcntl lock file.

    Degrades to a no-op on platforms without ``fcntl`` (e.g. Windows),
    where callers proceed unsynchronized.
    """
    os.makedirs(CACHE_DIR, exist_ok=True)
    if fcntl is None:
        yield
        return

    lock_path = os.path.join(CACHE_DIR, LOCK_FILE)
    with open(lock_path, "w", encoding="utf-8") as handle:
        fcntl.flock(handle, fcntl.LOCK_EX)
        try:
            yield
        finally:
            fcntl.flock(handle, fcntl.LOCK_UN)
262
+
263
+
264
def _maybe_restore_from_hf():
    """Seed the local cache from the HF dataset repo on first run.

    Skipped when huggingface_hub is unavailable, no repo is configured,
    or a local summaries file already exists. Download failures (missing
    repo, no network, bad credentials) are non-fatal: the app starts with
    an empty local cache instead of crashing init_storage().
    """
    if not snapshot_download:
        return
    if not HF_REPO_ID:
        return
    if os.path.exists(os.path.join(CACHE_DIR, SUMMARIES_FILE)):
        return
    try:
        snapshot_download(
            repo_id=HF_REPO_ID,
            repo_type=HF_REPO_TYPE,
            local_dir=CACHE_DIR,
            # Real files (no symlinks) so locking and os.replace behave.
            local_dir_use_symlinks=False,
        )
    except Exception:
        # Best-effort restore only — never block startup on the hub.
        pass
277
+
278
+
279
def _sync_to_hf_if_configured():
    """Best-effort upload of the cache directory to the HF dataset repo.

    Failures (offline, missing token, deleted repo) are swallowed so a
    sync problem never crashes the worker or loses the summaries already
    written locally. Lock and temp files are excluded — they are local
    plumbing, not data.
    """
    if not HfApi or not HF_REPO_ID:
        return
    try:
        api = HfApi()
        api.upload_folder(
            folder_path=CACHE_DIR,
            repo_id=HF_REPO_ID,
            repo_type=HF_REPO_TYPE,
            ignore_patterns=[LOCK_FILE, "*.tmp"],
        )
    except Exception:
        pass
app/utils/ai_summary_worker.py CHANGED
@@ -9,7 +9,7 @@ from typing import List, Tuple
9
 
10
  from utils.llm_summarizer import OpenAICompatSummarizer
11
  from utils.ai_summary_store import (
12
- init_db,
13
  fetch_ready_batches,
14
  store_summaries,
15
  BATCH_MAX_CHARS,
@@ -32,7 +32,7 @@ class Worker:
32
  self._stop = True
33
 
34
  def run(self):
35
- init_db()
36
  signal.signal(signal.SIGTERM, self.stop)
37
  signal.signal(signal.SIGINT, self.stop)
38
 
@@ -107,4 +107,3 @@ def start_worker_if_needed():
107
 
108
  worker = Worker()
109
  worker.run()
110
-
 
9
 
10
  from utils.llm_summarizer import OpenAICompatSummarizer
11
  from utils.ai_summary_store import (
12
+ init_storage,
13
  fetch_ready_batches,
14
  store_summaries,
15
  BATCH_MAX_CHARS,
 
32
  self._stop = True
33
 
34
  def run(self):
35
+ init_storage()
36
  signal.signal(signal.SIGTERM, self.stop)
37
  signal.signal(signal.SIGINT, self.stop)
38
 
 
107
 
108
  worker = Worker()
109
  worker.run()
 
requirements.txt CHANGED
@@ -10,3 +10,4 @@ beautifulsoup4>=4.12.0
10
  lxml>=5.0.0
11
  ntscraper
12
  playwright>=1.40.0
 
 
10
  lxml>=5.0.0
11
  ntscraper
12
  playwright>=1.40.0
13
+ huggingface_hub>=0.22.2