DivYonko commited on
Commit
906e964
Β·
1 Parent(s): 2965fd0

fix: replace in-memory store with SQLite so scraper and UI share data across workers

Browse files
Files changed (1) hide show
  1. app.py +53 -24
app.py CHANGED
@@ -1,8 +1,10 @@
1
  ο»Ώ# -*- coding: utf-8 -*-
2
  """
3
  app.py β€” Hugging Face Spaces adaptation of frontend/streamlit_app.py
4
- All features identical; infrastructure layer uses in-memory deque store
5
  and threading instead of Redis + subprocess.
 
 
6
  """
7
 
8
  import streamlit as st
@@ -15,37 +17,52 @@ import re
15
  import os
16
  import threading
17
  import logging
 
18
  from collections import deque, defaultdict
19
  from datetime import datetime, timedelta
20
 
21
- # ── In-memory store (replaces Redis) ─────────────────────────────────────────
 
 
22
  MAX_STORE_MESSAGES = 10000
23
 
24
- _STORE_LOCK: threading.Lock = threading.Lock()
25
- _STORE: dict[str, deque] = {} # keyed by redis_key string
26
- _META: dict[str, str] = {} # misc key-value (e.g. "video_title")
27
 
28
  # Scraper thread registry
29
  _SCRAPER_THREADS: dict[str, threading.Thread] = {}
30
  _SCRAPER_STOP: dict[str, threading.Event] = {}
31
 
32
 
33
- def _get_deque(key: str) -> deque:
34
- """Return (creating if needed) the deque for a given key."""
35
- if key not in _STORE:
36
- _STORE[key] = deque(maxlen=MAX_STORE_MESSAGES)
37
- return _STORE[key]
 
 
 
 
 
 
 
 
 
 
 
 
38
 
39
 
40
- # Redis-compatible helpers
41
  def store_lrange(key: str, start: int, end: int) -> list[str]:
42
- """Emulate r.lrange(key, start, end)."""
43
- with _STORE_LOCK:
44
- d = list(_get_deque(key))
45
- n = len(d)
 
 
 
46
  if n == 0:
47
  return []
48
- # Normalise negative indices
49
  if start < 0:
50
  start = max(n + start, 0)
51
  if end < 0:
@@ -53,23 +70,35 @@ def store_lrange(key: str, start: int, end: int) -> list[str]:
53
  end = min(end, n - 1)
54
  if start > end:
55
  return []
56
- return d[start : end + 1]
57
 
58
 
59
  def store_llen(key: str) -> int:
60
- with _STORE_LOCK:
61
- return len(_get_deque(key))
 
 
 
62
 
63
 
64
  def store_delete(key: str) -> None:
65
- with _STORE_LOCK:
66
- if key in _STORE:
67
- _STORE[key].clear()
68
 
69
 
70
  def store_rpush(key: str, value: str) -> None:
71
- with _STORE_LOCK:
72
- _get_deque(key).append(value)
 
 
 
 
 
 
 
 
 
73
 
74
 
75
  # ── Inline config (replaces backend/config.py) ────────────────────────────────
 
1
  ο»Ώ# -*- coding: utf-8 -*-
2
  """
3
  app.py β€” Hugging Face Spaces adaptation of frontend/streamlit_app.py
4
+ All features identical; infrastructure layer uses SQLite store
5
  and threading instead of Redis + subprocess.
6
+ SQLite is used so the scraper thread and Streamlit UI share the same data
7
+ even when running in a multi-worker environment.
8
  """
9
 
10
  import streamlit as st
 
17
  import os
18
  import threading
19
  import logging
20
+ import sqlite3
21
  from collections import deque, defaultdict
22
  from datetime import datetime, timedelta
23
 
24
+ # ── SQLite store (replaces in-memory deque) ───────────────────────────────────
25
+ # Stored in /tmp so it persists for the lifetime of the container process
26
+ DB_PATH = "/tmp/livepulse.db"
27
  MAX_STORE_MESSAGES = 10000
28
 
29
+ _DB_LOCK = threading.Lock()
30
+ _META: dict[str, str] = {} # misc key-value (e.g. "video_title", "scraper_error")
 
31
 
32
  # Scraper thread registry
33
  _SCRAPER_THREADS: dict[str, threading.Thread] = {}
34
  _SCRAPER_STOP: dict[str, threading.Event] = {}
35
 
36
 
37
+ def _get_db() -> sqlite3.Connection:
38
+ """Return a thread-local SQLite connection."""
39
+ conn = sqlite3.connect(DB_PATH, check_same_thread=False)
40
+ conn.execute("""
41
+ CREATE TABLE IF NOT EXISTS messages (
42
+ id INTEGER PRIMARY KEY AUTOINCREMENT,
43
+ key TEXT NOT NULL,
44
+ value TEXT NOT NULL
45
+ )
46
+ """)
47
+ conn.execute("CREATE INDEX IF NOT EXISTS idx_key ON messages(key)")
48
+ conn.commit()
49
+ return conn
50
+
51
+
52
+ # Initialize DB on import
53
+ _db_conn = _get_db()
54
 
55
 
 
56
  def store_lrange(key: str, start: int, end: int) -> list[str]:
57
+ """Emulate r.lrange(key, start, end) β€” returns rows in insertion order."""
58
+ with _DB_LOCK:
59
+ rows = _db_conn.execute(
60
+ "SELECT value FROM messages WHERE key=? ORDER BY id ASC", (key,)
61
+ ).fetchall()
62
+ values = [r[0] for r in rows]
63
+ n = len(values)
64
  if n == 0:
65
  return []
 
66
  if start < 0:
67
  start = max(n + start, 0)
68
  if end < 0:
 
70
  end = min(end, n - 1)
71
  if start > end:
72
  return []
73
+ return values[start: end + 1]
74
 
75
 
76
  def store_llen(key: str) -> int:
77
+ with _DB_LOCK:
78
+ row = _db_conn.execute(
79
+ "SELECT COUNT(*) FROM messages WHERE key=?", (key,)
80
+ ).fetchone()
81
+ return row[0] if row else 0
82
 
83
 
84
  def store_delete(key: str) -> None:
85
+ with _DB_LOCK:
86
+ _db_conn.execute("DELETE FROM messages WHERE key=?", (key,))
87
+ _db_conn.commit()
88
 
89
 
90
  def store_rpush(key: str, value: str) -> None:
91
+ with _DB_LOCK:
92
+ _db_conn.execute(
93
+ "INSERT INTO messages (key, value) VALUES (?, ?)", (key, value)
94
+ )
95
+ # Trim to MAX_STORE_MESSAGES per key
96
+ _db_conn.execute("""
97
+ DELETE FROM messages WHERE key=? AND id NOT IN (
98
+ SELECT id FROM messages WHERE key=? ORDER BY id DESC LIMIT ?
99
+ )
100
+ """, (key, key, MAX_STORE_MESSAGES))
101
+ _db_conn.commit()
102
 
103
 
104
  # ── Inline config (replaces backend/config.py) ────────────────────────────────