# -*- coding: utf-8 -*- """ app.py � Hugging Face Spaces adaptation of frontend/streamlit_app.py Infrastructure: SQLite store + threading scraper (no Redis, no subprocess). UI: identical to frontend/streamlit_app.py. """ import streamlit as st import json import pandas as pd import plotly.graph_objects as go import plotly.express as px import time import re import os import threading import logging import sqlite3 from collections import deque, defaultdict from datetime import datetime, timedelta # -- SQLite store (replaces in-memory deque) ----------------------------------- # Stored in /tmp so it persists for the lifetime of the container process DB_PATH = "/tmp/livepulse.db" MAX_STORE_MESSAGES = 100_000 _DB_LOCK = threading.Lock() _META: dict[str, str] = {} # misc key-value (e.g. "video_title", "scraper_error") # Scraper thread registry _SCRAPER_THREADS: dict[str, threading.Thread] = {} _SCRAPER_STOP: dict[str, threading.Event] = {} def _get_db() -> sqlite3.Connection: """Return a thread-local SQLite connection.""" conn = sqlite3.connect(DB_PATH, check_same_thread=False) conn.execute(""" CREATE TABLE IF NOT EXISTS messages ( id INTEGER PRIMARY KEY AUTOINCREMENT, key TEXT NOT NULL, value TEXT NOT NULL ) """) conn.execute("CREATE INDEX IF NOT EXISTS idx_key ON messages(key)") conn.commit() return conn # Initialize DB on import _db_conn = _get_db() def store_lrange(key: str, start: int, end: int) -> list[str]: """Emulate r.lrange(key, start, end) � returns rows in insertion order.""" with _DB_LOCK: rows = _db_conn.execute( "SELECT value FROM messages WHERE key=? ORDER BY id ASC", (key,) ).fetchall() values = [r[0] for r in rows] n = len(values) if n == 0: return [] if start < 0: start = max(n + start, 0) if end < 0: end = n + end end = min(end, n - 1) if start > end: return [] return values[start: end + 1] def store_llen(key: str) -> int: with _DB_LOCK: row = _db_conn.execute( "SELECT COUNT(*) FROM messages WHERE key=?", (key,) ).fetchone() return row[0] if row else 0 def store_delete(key: str) -> None: with _DB_LOCK: _db_conn.execute("DELETE FROM messages WHERE key=?", (key,)) _db_conn.commit() def store_rpush(key: str, value: str) -> None: with _DB_LOCK: _db_conn.execute( "INSERT INTO messages (key, value) VALUES (?, ?)", (key, value) ) # Trim to MAX_STORE_MESSAGES per key _db_conn.execute(""" DELETE FROM messages WHERE key=? AND id NOT IN ( SELECT id FROM messages WHERE key=? ORDER BY id DESC LIMIT ? ) """, (key, key, MAX_STORE_MESSAGES)) _db_conn.commit() # -- Inline config (replaces backend/config.py) -------------------------------- VIDEO_ID = os.getenv("VIDEO_ID", "") # -- ML imports (ml/ is at workspace root) ------------------------------------ from ml.sentiment_model import predict_sentiment from ml.topic_model import predict_topic, VALID_TOPICS from ml.action_type_model import predict_action_type, VALID_ACTION_TYPES # -- Scraper thread logic (mirrors backend/scraper.py run()) ------------------ logging.basicConfig( level=logging.INFO, format="%(asctime)s [%(levelname)s] %(name)s: %(message)s", force=True, ) logger = logging.getLogger("app.scraper") def _safe_sentiment(text: str): try: return predict_sentiment(text) except Exception as exc: logger.error("predict_sentiment failed: %s", exc) return "Neutral", 0.50 def _safe_topic(text: str): try: topic, conf = predict_topic(text) if topic not in VALID_TOPICS: return "General", 0.50 return topic, conf except Exception as exc: logger.error("predict_topic failed: %s", exc) return "General", 0.50 def _safe_action_type(text: str): try: action_type, conf = predict_action_type(text) if action_type not in VALID_ACTION_TYPES: return "N/A", 0.50 return action_type, conf except Exception as exc: logger.error("predict_action_type failed: %s", exc) return "N/A", 0.50 def _get_live_chat_id(video_id: str, api_key: str) -> str | None: """Fetch the liveChatId for a given video using YouTube Data API v3.""" import urllib.request import urllib.parse import urllib.error url = ( "https://www.googleapis.com/youtube/v3/videos" f"?part=liveStreamingDetails&id={urllib.parse.quote(video_id)}&key={api_key}" ) try: with urllib.request.urlopen(url, timeout=10) as resp: data = json.loads(resp.read()) logger.info("YouTube API response for %s: %s", video_id, json.dumps(data)[:500]) items = data.get("items", []) if not items: logger.error("No video found for id=%s (items empty). Check if video ID is correct and API key is valid.", video_id) return None live_details = items[0].get("liveStreamingDetails", {}) chat_id = live_details.get("activeLiveChatId") if not chat_id: logger.error("No activeLiveChatId for video=%s. liveStreamingDetails=%s", video_id, live_details) return chat_id except urllib.error.HTTPError as exc: body = exc.read().decode("utf-8", errors="replace")[:500] logger.error("HTTP %d from YouTube API for video=%s: %s", exc.code, video_id, body) return None except Exception as exc: logger.error("Failed to get liveChatId: %s", exc) return None def _fetch_chat_messages(live_chat_id: str, api_key: str, page_token: str | None = None): """ Fetch one page of live chat messages. Returns (messages_list, next_page_token, polling_interval_ms). """ import urllib.request import urllib.parse params = { "part": "snippet,authorDetails", "liveChatId": live_chat_id, "key": api_key, "maxResults": "200", } if page_token: params["pageToken"] = page_token url = "https://www.googleapis.com/youtube/v3/liveChat/messages?" + urllib.parse.urlencode(params) try: with urllib.request.urlopen(url, timeout=10) as resp: data = json.loads(resp.read()) messages = data.get("items", []) next_token = data.get("nextPageToken") poll_interval = data.get("pollingIntervalMillis", 5000) logger.info("Fetched %d chat messages (nextPageToken=%s)", len(messages), bool(next_token)) return messages, next_token, poll_interval except Exception as exc: logger.error("Failed to fetch chat messages: %s", exc) return [], None, 5000 def _scraper_thread_fn(video_id: str, redis_key: str, stop_event: threading.Event, min_poll_s: float = 10.0, api_key: str = "") -> None: """Background thread � scrapes live chat via YouTube Data API v3.""" # Use passed key first, fall back to environment variable if not api_key: api_key = os.getenv("YOUTUBE_API_KEY", "") logger.info("YOUTUBE_API_KEY present: %s (length=%d)", bool(api_key), len(api_key)) if not api_key: msg = "No API key provided. Enter your YouTube Data API v3 key in the sidebar." logger.error(msg) _META["scraper_error"] = msg return logger.info("Scraper thread starting � video=%s key=%s", video_id, redis_key) _META.pop("scraper_error", None) # Step 1: get the live chat ID live_chat_id = _get_live_chat_id(video_id, api_key) if not live_chat_id: msg = f"No active live chat found for video '{video_id}'. Make sure the stream is currently LIVE." logger.error(msg) _META["scraper_error"] = msg return logger.info("Live chat ID obtained: %s", live_chat_id) # Step 2: poll for messages page_token = None seen_ids: set = set() # avoid reprocessing messages on first page is_first_page = True # skip ML on backlog to avoid startup delay while not stop_event.is_set(): messages, page_token, poll_ms = _fetch_chat_messages(live_chat_id, api_key, page_token) new_msgs = [] for item in messages: if stop_event.is_set(): break msg_id = item.get("id", "") if msg_id in seen_ids: continue seen_ids.add(msg_id) snippet = item.get("snippet", {}) if snippet.get("type") != "textMessageEvent": continue text = snippet.get("displayMessage", "").strip() # Convert any :emoji_name: codes back to actual emoji characters import emoji as _emoji text = _emoji.emojize(text, language="alias") author = item.get("authorDetails", {}).get("displayName", "Unknown") if not text: continue new_msgs.append((msg_id, text, author)) # On the first page (backlog), store messages with placeholder sentiment # so the UI shows something immediately, then process ML on subsequent pages if is_first_page and new_msgs: logger.info("First page: storing %d backlog messages with placeholder sentiment", len(new_msgs)) for _, text, author in new_msgs: message_data = { "author": author, "text": text, "sentiment": "Neutral", "confidence": 0.5, "topic": "General", "topic_conf": 0.5, "action_type": "N/A", "action_type_conf": 0.5, "time": datetime.now().isoformat(), } store_rpush(redis_key, json.dumps(message_data)) logger.info("Backlog stored: %d messages now in store", store_llen(redis_key)) is_first_page = False else: # Normal processing with full ML inference for _, text, author in new_msgs: if stop_event.is_set(): break try: sentiment, s_conf = _safe_sentiment(text) topic, t_conf = _safe_topic(text) # Only classify action type for Question/Request topics if topic in ("Question", "Request/Feedback"): action_type, at_conf = _safe_action_type(text) else: action_type, at_conf = "N/A", 0.50 except Exception as exc: logger.error("ML inference failed for text=%r: %s", text[:50], exc) sentiment, s_conf = "Neutral", 0.5 topic, t_conf = "General", 0.5 action_type, at_conf = "N/A", 0.5 message_data = { "author": author, "text": text, "sentiment": sentiment, "confidence": round(s_conf, 3), "topic": topic, "topic_conf": round(t_conf, 3), "action_type": action_type, "action_type_conf": round(at_conf, 3), "time": datetime.now().isoformat(), } store_rpush(redis_key, json.dumps(message_data)) if new_msgs: logger.info("Processed %d new messages, store size=%d", len(new_msgs), store_llen(redis_key)) # keep seen_ids from growing unbounded if len(seen_ids) > 5000: seen_ids = set(list(seen_ids)[-2000:]) # Respect YouTube's requested polling interval, but never faster than min_poll_s wait_s = max(poll_ms / 1000, min_poll_s) stop_event.wait(timeout=wait_s) logger.info("Scraper thread ended � key=%s", redis_key) def start_scraper(slot_idx: int, video_id: str, redis_key: str, min_poll_s: float = 10.0, api_key: str = "") -> None: """Start a scraper thread for the given slot, stopping any existing one first.""" key = str(slot_idx) stop_scraper(slot_idx) stop_event = threading.Event() t = threading.Thread( target=_scraper_thread_fn, args=(video_id, redis_key, stop_event, min_poll_s, api_key), daemon=True, name=f"scraper-{slot_idx}", ) _SCRAPER_STOP[key] = stop_event _SCRAPER_THREADS[key] = t t.start() def stop_scraper(slot_idx: int) -> None: """Signal the scraper thread for this slot to stop.""" key = str(slot_idx) ev = _SCRAPER_STOP.get(key) if ev: ev.set() # Don't join � daemon thread will die on its own def is_scraper_running(slot_idx: int) -> bool: key = str(slot_idx) t = _SCRAPER_THREADS.get(key) return t is not None and t.is_alive() # -- Streamlit page config ----------------------------------------------------- st.set_page_config( page_title="LivePulse", layout="wide", page_icon="\U0001F4E1", initial_sidebar_state="expanded" ) TOPIC_LABELS = ["Appreciation", "Question", "Request/Feedback", "Promo", "Spam", "General", "MCQ Answer"] TOPIC_COLOR = { "Appreciation": "#f59e0b", "Question": "#3b82f6", "Request/Feedback": "#8b5cf6", "Promo": "#ec4899", "Spam": "#ef4444", "General": "#6b7280", "MCQ Answer": "#10b981" } SENT_COLORS = {"Positive": "#22c55e", "Neutral": "#eab308", "Negative": "#ef4444"} # -- JS: detect Streamlit's live theme and set data-livepulse attribute -- THEME_JS = """""" CSS = """""" st.markdown(THEME_JS, unsafe_allow_html=True) st.markdown(CSS, unsafe_allow_html=True) # -- HELPERS -------------------------------------------------- def extract_video_id(url_or_id): url_or_id = url_or_id.strip() match = re.search(r"(?:v=|/live/|youtu\.be/)([A-Za-z0-9_-]{11})", url_or_id) if match: return match.group(1) if re.match(r"^[A-Za-z0-9_-]{11}$", url_or_id): return url_or_id return url_or_id def fetch_video_title(video_id): """Try oembed first (works for non-live), then YouTube Data API v3 (works for live).""" import urllib.request import urllib.parse # Try oembed first (fast, no API key needed) try: url = f"https://www.youtube.com/oembed?url=https://www.youtube.com/watch?v={video_id}&format=json" with urllib.request.urlopen(url, timeout=5) as resp: title = json.loads(resp.read()).get("title") if title: return title except Exception: pass # Fallback: YouTube Data API v3 (works for live streams) try: api_key = os.getenv("YOUTUBE_API_KEY", "") if api_key: url = ( "https://www.googleapis.com/youtube/v3/videos" f"?part=snippet&id={urllib.parse.quote(video_id)}&key={api_key}" ) with urllib.request.urlopen(url, timeout=5) as resp: data = json.loads(resp.read()) items = data.get("items", []) if items: return items[0]["snippet"]["title"] except Exception: pass return None def clean_topic(val): if pd.isna(val) or str(val).strip() == "" or str(val).strip().lower() == "nan": return "General" return str(val).strip() def clean_sentiment(val): if str(val).strip() in ("Positive", "Negative", "Neutral"): return str(val).strip() return "Neutral" def plotly_layout(height=280): return dict( paper_bgcolor="rgba(0,0,0,0)", plot_bgcolor="rgba(0,0,0,0)", height=height, margin=dict(l=10, r=10, t=10, b=10), font=dict(family="Space Grotesk"), xaxis=dict(showgrid=False, zeroline=False, showline=False, tickfont=dict(size=11), title=None), yaxis=dict(showgrid=True, gridcolor="rgba(128,128,128,0.12)", zeroline=False, showline=False, tickfont=dict(size=11), title=None), showlegend=False, hoverlabel=dict(font_family="Space Grotesk", font_size=12), ) def csv_download(df_export, label, filename): csv = df_export.to_csv(index=False).encode("utf-8") st.download_button(label=f"\u2b07 {label}", data=csv, file_name=filename, mime="text/csv", key=filename) def load_stream_data(redis_key: str, limit: int | None = None): """Load and parse messages from the in-memory store (no cache � store is in-memory).""" if limit: raws = store_lrange(redis_key, -limit, -1) else: raws = store_lrange(redis_key, 0, -1) data = [] for raw in raws: try: data.append(json.loads(raw)) except Exception: pass return data @st.cache_data(ttl=10, show_spinner=False) def compute_velocity(df_all_json: str, window: int = 20) -> dict: """Compute sentiment velocity. Accepts JSON string for cache key compatibility.""" import json as _json sentiments = [m.get("sentiment", "Neutral") for m in _json.loads(df_all_json)] n = len(sentiments) if n < window * 2: return {"direction": "\u2192", "delta": 0.0, "label": "Stable", "color": "#eab308"} recent = sentiments[-window:] prev = sentiments[-window*2:-window] r_pos = sum(1 for s in recent if s == "Positive") / window p_pos = sum(1 for s in prev if s == "Positive") / window delta = r_pos - p_pos if delta > 0.08: return {"direction": "\u2191", "delta": delta, "label": "Rising", "color": "#22c55e"} elif delta < -0.08: return {"direction": "\u2193", "delta": delta, "label": "Falling", "color": "#ef4444"} return {"direction": "\u2192", "delta": delta, "label": "Stable", "color": "#eab308"} @st.cache_data(ttl=10, show_spinner=False) def build_heatmap_data(df_all_json: str, bucket_minutes: int = 1) -> pd.DataFrame: """Bucket messages into time intervals.""" import json as _json records = _json.loads(df_all_json) if not records: return pd.DataFrame() df_t = pd.DataFrame(records) if "time" not in df_t.columns: return pd.DataFrame() df_t["time"] = pd.to_datetime(df_t["time"], errors="coerce") df_t = df_t.dropna(subset=["time"]) if df_t.empty: return pd.DataFrame() df_t["bucket"] = df_t["time"].dt.floor(f"{bucket_minutes}min") grouped = df_t.groupby(["bucket", "sentiment"]).size().unstack(fill_value=0) for col in ["Positive", "Neutral", "Negative"]: if col not in grouped.columns: grouped[col] = 0 grouped = grouped.reset_index() grouped.columns.name = None return grouped[["bucket", "Positive", "Neutral", "Negative"]] def check_alert(df_all: pd.DataFrame, threshold: float = 0.4, window: int = 15) -> dict | None: """Return alert info if negative ratio in last `window` messages exceeds threshold.""" if len(df_all) < window: return None recent = df_all.iloc[-window:] neg_ratio = (recent["sentiment"] == "Negative").mean() if neg_ratio >= threshold: return { "neg_ratio": neg_ratio, "count": int((recent["sentiment"] == "Negative").sum()), "window": window, } return None @st.cache_data(ttl=10, show_spinner=False) def compute_engagement(all_data_json: str, window: int = 50) -> dict: """Engagement score (0-100) = weighted combo of message rate, positive ratio, question density.""" import json as _j msgs = _j.loads(all_data_json) if not msgs: return {"score": 0, "rate": 0.0, "pos_ratio": 0.0, "q_density": 0.0, "grade": "�"} recent = msgs[-window:] n = len(recent) rate = 0.0 try: t0 = datetime.fromisoformat(recent[0]["time"]) t1 = datetime.fromisoformat(recent[-1]["time"]) elapsed = max((t1 - t0).total_seconds() / 60, 0.1) rate = round(n / elapsed, 1) except Exception: rate = float(n) pos_ratio = sum(1 for m in recent if m.get("sentiment") == "Positive") / max(n, 1) q_density = sum(1 for m in recent if m.get("topic") == "Question") / max(n, 1) rate_norm = min(rate / 60, 1.0) score = round((rate_norm * 0.4 + pos_ratio * 0.4 + q_density * 0.2) * 100) if score >= 70: grade = "\U0001f525 High" elif score >= 40: grade = "\u26a1 Medium" else: grade = "\U0001f4a4 Low" return {"score": score, "rate": rate, "pos_ratio": pos_ratio, "q_density": q_density, "grade": grade} @st.cache_data(ttl=10, show_spinner=False) def compute_top_contributors(all_data_json: str, top_n: int = 10) -> list[dict]: """Return top N authors by message count with sentiment + topic breakdown.""" import json as _j from collections import Counter msgs = _j.loads(all_data_json) if not msgs: return [] TOPICS = ["Appreciation", "Question", "Request/Feedback", "Promo", "Spam", "General", "MCQ Answer"] author_data: dict[str, dict] = {} for m in msgs: a = m.get("author", "Unknown") if a not in author_data: author_data[a] = { "count": 0, "Positive": 0, "Neutral": 0, "Negative": 0, **{t: 0 for t in TOPICS}, } author_data[a]["count"] += 1 s = m.get("sentiment", "Neutral") if s in ("Positive", "Neutral", "Negative"): author_data[a][s] += 1 t = m.get("topic", "General") if t not in TOPICS: t = "General" author_data[a][t] += 1 sorted_authors = sorted(author_data.items(), key=lambda x: x[1]["count"], reverse=True)[:top_n] result = [] for author, d in sorted_authors: total = max(d["count"], 1) result.append({ "author": author, "count": d["count"], "pos_pct": round(d["Positive"] / total * 100), "neu_pct": round(d["Neutral"] / total * 100), "neg_pct": round(d["Negative"] / total * 100), "t_appr": round(d["Appreciation"] / total * 100), "t_ques": round(d["Question"] / total * 100), "t_rf": round(d["Request/Feedback"] / total * 100), "t_promo": round(d["Promo"] / total * 100), "t_spam": round(d["Spam"] / total * 100), "t_gen": round(d["General"] / total * 100), "t_mcq": round(d["MCQ Answer"] / total * 100), }) return result @st.cache_data(ttl=10, show_spinner=False) def compute_word_freq(all_data_json: str, sentiment_filter: str = "All", topic_filter: str = "All", top_n: int = 60) -> list[tuple[str, int]]: """Return top N (word, count) pairs after filtering stopwords.""" import json as _j from collections import Counter STOPWORDS = { "the","a","an","is","it","in","on","at","to","of","and","or","but","for", "with","this","that","are","was","be","as","by","from","have","has","had", "not","no","so","if","do","did","will","can","just","i","you","he","she", "we","they","my","your","his","her","our","their","me","him","us","them", "what","how","why","when","where","who","which","there","here","been", "would","could","should","may","might","shall","than","then","now","also", "more","very","too","up","out","about","into","over","after","before", "yaar","bhi","hai","hain","ho","kar","ke","ki","ka","ko","se","ne","ye", "vo","woh","aur","nahi","nhi","toh","toh","koi","kuch","ab","ek","hi", } msgs = _j.loads(all_data_json) words: list[str] = [] for m in msgs: if sentiment_filter != "All" and m.get("sentiment") != sentiment_filter: continue if topic_filter != "All" and m.get("topic") != topic_filter: continue text = re.sub(r"[^\w\s]", " ", m.get("text", "").lower()) for w in text.split(): if len(w) > 2 and w not in STOPWORDS and not w.isdigit(): words.append(w) return Counter(words).most_common(top_n) def check_spam_alert(df_all: pd.DataFrame, threshold: float = 0.3, window: int = 20) -> dict | None: """Return alert if spam ratio in last `window` messages exceeds threshold.""" if "topic" not in df_all.columns or len(df_all) < window: return None recent = df_all.iloc[-window:] spam_ratio = (recent["topic"] == "Spam").mean() if spam_ratio >= threshold: return { "spam_ratio": spam_ratio, "count": int((recent["topic"] == "Spam").sum()), "window": window, } return None @st.cache_data(ttl=10, show_spinner=False) def detect_repeat_spammers(all_data_json: str, window_sec: int = 15, min_repeats: int = 2) -> list[dict]: """ Detect users who send the same (or near-identical) message multiple times within `window_sec` seconds. Returns list of spam burst dicts sorted by repeat count descending. Each dict: author, text, normalized_text, topic, sentiment, count, timestamps, first_seen """ import json as _j from collections import defaultdict msgs = _j.loads(all_data_json) if not msgs: return [] def _normalize(t: str) -> str: """Lowercase, strip punctuation/spaces for fuzzy matching.""" import re return re.sub(r"[^\w]", "", t.lower().strip()) # Group by (author, normalized_text) bursts: dict[tuple, dict] = {} for m in msgs: author = m.get("author", "Unknown") text = m.get("text", "").strip() if not text: continue norm = _normalize(text) if len(norm) < 4: # skip very short messages like "ok", "hi" continue ts_str = m.get("time", "") try: ts = datetime.fromisoformat(ts_str) except Exception: continue key = (author, norm) if key not in bursts: bursts[key] = { "author": author, "text": text, "topic": m.get("topic", "General"), "sentiment": m.get("sentiment", "Neutral"), "timestamps": [], } bursts[key]["timestamps"].append(ts) results = [] for key, burst in bursts.items(): times = sorted(burst["timestamps"]) # Sliding window: find max repeats within any window_sec period max_in_window = 1 for i in range(len(times)): count_in_window = sum( 1 for t in times[i:] if (t - times[i]).total_seconds() <= window_sec ) max_in_window = max(max_in_window, count_in_window) if max_in_window >= min_repeats: results.append({ "author": burst["author"], "text": burst["text"], "topic": burst["topic"], "sentiment": burst["sentiment"], "count": len(times), "max_burst": max_in_window, "first_seen": times[0].strftime("%H:%M:%S"), "last_seen": times[-1].strftime("%H:%M:%S"), }) return sorted(results, key=lambda x: x["max_burst"], reverse=True) # -- SESSION STATE INIT ---------------------------------------- MAX_STREAMS = 5 STREAM_COLORS = ["#7c3aed", "#10b981", "#f59e0b", "#3b82f6", "#ec4899"] STREAM_NAMES = ["A", "B", "C", "D", "E"] if "pinned_messages" not in st.session_state: st.session_state.pinned_messages = [] if "alert_dismissed" not in st.session_state: st.session_state.alert_dismissed = False if "last_alert_count" not in st.session_state: st.session_state.last_alert_count = 0 if "last_view" not in st.session_state: st.session_state.last_view = "?? Comments" # Multi-stream: list of dicts {video_id, redis_key, label, proc} # proc stores the Thread object (or None) for running-check compatibility if "streams" not in st.session_state: st.session_state.streams = [ {"video_id": VIDEO_ID, "redis_key": "chat_messages", "label": "Stream A", "proc": None} ] # -- SIDEBAR -------------------------------------------------- with st.sidebar: st.markdown( '
Display Settings
', unsafe_allow_html=True) refresh_rate = st.radio( "Refresh interval (s)", options=[10, 20, 30, 40, 50, 60], index=0, horizontal=True, key="refresh_rate", ) msg_limit = st.slider("Message window", 10, 400, 50, step=10, key="msg_limit") auto_refresh = st.toggle("Live auto-refresh", value=True, key="auto_refresh") st.divider() # -- Alert Settings -- st.markdown('Alert Settings
', unsafe_allow_html=True) alert_enabled = st.toggle("Negative spike alerts", value=True, key="alert_enabled") alert_threshold = st.slider("Neg alert threshold (%)", 20, 80, 40, key="alert_threshold_pct") / 100 alert_window = st.slider("Alert window (msgs)", 5, 30, 15, key="alert_window") spam_alert_on = st.toggle("Spam rate alerts", value=True, key="spam_alert_on") spam_threshold = st.slider("Spam alert threshold (%)", 10, 60, 30, key="spam_threshold_pct") / 100 st.divider() # -- API Key -- st.markdown('YouTube API Key
', unsafe_allow_html=True) _env_key = os.getenv("YOUTUBE_API_KEY", "") _api_key_input = st.text_input( "API Key", value=st.session_state.get("user_api_key", ""), type="password", placeholder="AIza... (paste your YouTube Data API v3 key)", key="api_key_input", help="Your YouTube Data API v3 key. Never shared or stored permanently.", ) # Store in session state whenever changed if _api_key_input: st.session_state["user_api_key"] = _api_key_input # Show status _effective_key = _api_key_input or _env_key if _effective_key: st.markdown( f'Stream Control
', unsafe_allow_html=True) for idx, stream in enumerate(st.session_state.streams): color = STREAM_COLORS[idx] label = STREAM_NAMES[idx] st.markdown( f'Pinned Messages
', unsafe_allow_html=True) pin_count = len(st.session_state.pinned_messages) st.markdown(f'Download Data
', unsafe_allow_html=True) _active_streams = [s for s in st.session_state.streams if s.get("redis_key")] if _active_streams: for _s in _active_streams: _rkey = _s["redis_key"] _slabel = _s["label"] _all_raws = store_lrange(_rkey, 0, -1) _dl_rows = [] for _raw in _all_raws: try: _dl_rows.append(json.loads(_raw)) except Exception: pass if _dl_rows: _dl_df = pd.DataFrame(_dl_rows) _ts = datetime.now().strftime("%Y%m%d_%H%M%S") _fname = f"livepulse_{_rkey}_{_ts}.csv" _csv_bytes = _dl_df.to_csv(index=False).encode("utf-8") st.download_button( label=f"\u2b07 {_slabel} ({len(_dl_rows)} msgs)", data=_csv_bytes, file_name=_fname, mime="text/csv", key=f"dl_{_rkey}", ) # PDF button removed — use the Export button on the Stats page instead else: st.markdown(f'Export
', unsafe_allow_html=True) st.markdown( 'Danger Zone
', unsafe_allow_html=True) if st.button("\U0001f5d1 Clear all data"): for s in st.session_state.streams: store_delete(s["redis_key"]) st.session_state.pinned_messages = [] st.session_state.alert_dismissed = False st.success("All stream data cleared.") st.divider() st.markdown( '