LivePulse / app.py
DivYonko
Add API key input field in sidebar - users can enter their own YouTube API key
6285ada
# -*- coding: utf-8 -*-
"""
app.py � Hugging Face Spaces adaptation of frontend/streamlit_app.py
Infrastructure: SQLite store + threading scraper (no Redis, no subprocess).
UI: identical to frontend/streamlit_app.py.
"""
import streamlit as st
import json
import pandas as pd
import plotly.graph_objects as go
import plotly.express as px
import time
import re
import os
import threading
import logging
import sqlite3
from collections import deque, defaultdict
from datetime import datetime, timedelta
# -- SQLite store (replaces in-memory deque) -----------------------------------
# Stored in /tmp so it persists for the lifetime of the container process
DB_PATH = "/tmp/livepulse.db"
MAX_STORE_MESSAGES = 100_000
_DB_LOCK = threading.Lock()
_META: dict[str, str] = {} # misc key-value (e.g. "video_title", "scraper_error")
# Scraper thread registry
_SCRAPER_THREADS: dict[str, threading.Thread] = {}
_SCRAPER_STOP: dict[str, threading.Event] = {}
def _get_db() -> sqlite3.Connection:
"""Return a thread-local SQLite connection."""
conn = sqlite3.connect(DB_PATH, check_same_thread=False)
conn.execute("""
CREATE TABLE IF NOT EXISTS messages (
id INTEGER PRIMARY KEY AUTOINCREMENT,
key TEXT NOT NULL,
value TEXT NOT NULL
)
""")
conn.execute("CREATE INDEX IF NOT EXISTS idx_key ON messages(key)")
conn.commit()
return conn
# Initialize DB on import
_db_conn = _get_db()
def store_lrange(key: str, start: int, end: int) -> list[str]:
"""Emulate r.lrange(key, start, end) � returns rows in insertion order."""
with _DB_LOCK:
rows = _db_conn.execute(
"SELECT value FROM messages WHERE key=? ORDER BY id ASC", (key,)
).fetchall()
values = [r[0] for r in rows]
n = len(values)
if n == 0:
return []
if start < 0:
start = max(n + start, 0)
if end < 0:
end = n + end
end = min(end, n - 1)
if start > end:
return []
return values[start: end + 1]
def store_llen(key: str) -> int:
with _DB_LOCK:
row = _db_conn.execute(
"SELECT COUNT(*) FROM messages WHERE key=?", (key,)
).fetchone()
return row[0] if row else 0
def store_delete(key: str) -> None:
with _DB_LOCK:
_db_conn.execute("DELETE FROM messages WHERE key=?", (key,))
_db_conn.commit()
def store_rpush(key: str, value: str) -> None:
with _DB_LOCK:
_db_conn.execute(
"INSERT INTO messages (key, value) VALUES (?, ?)", (key, value)
)
# Trim to MAX_STORE_MESSAGES per key
_db_conn.execute("""
DELETE FROM messages WHERE key=? AND id NOT IN (
SELECT id FROM messages WHERE key=? ORDER BY id DESC LIMIT ?
)
""", (key, key, MAX_STORE_MESSAGES))
_db_conn.commit()
# -- Inline config (replaces backend/config.py) --------------------------------
VIDEO_ID = os.getenv("VIDEO_ID", "")
# -- ML imports (ml/ is at workspace root) ------------------------------------
from ml.sentiment_model import predict_sentiment
from ml.topic_model import predict_topic, VALID_TOPICS
from ml.action_type_model import predict_action_type, VALID_ACTION_TYPES
# -- Scraper thread logic (mirrors backend/scraper.py run()) ------------------
logging.basicConfig(
level=logging.INFO,
format="%(asctime)s [%(levelname)s] %(name)s: %(message)s",
force=True,
)
logger = logging.getLogger("app.scraper")
def _safe_sentiment(text: str):
try:
return predict_sentiment(text)
except Exception as exc:
logger.error("predict_sentiment failed: %s", exc)
return "Neutral", 0.50
def _safe_topic(text: str):
try:
topic, conf = predict_topic(text)
if topic not in VALID_TOPICS:
return "General", 0.50
return topic, conf
except Exception as exc:
logger.error("predict_topic failed: %s", exc)
return "General", 0.50
def _safe_action_type(text: str):
try:
action_type, conf = predict_action_type(text)
if action_type not in VALID_ACTION_TYPES:
return "N/A", 0.50
return action_type, conf
except Exception as exc:
logger.error("predict_action_type failed: %s", exc)
return "N/A", 0.50
def _get_live_chat_id(video_id: str, api_key: str) -> str | None:
"""Fetch the liveChatId for a given video using YouTube Data API v3."""
import urllib.request
import urllib.parse
import urllib.error
url = (
"https://www.googleapis.com/youtube/v3/videos"
f"?part=liveStreamingDetails&id={urllib.parse.quote(video_id)}&key={api_key}"
)
try:
with urllib.request.urlopen(url, timeout=10) as resp:
data = json.loads(resp.read())
logger.info("YouTube API response for %s: %s", video_id, json.dumps(data)[:500])
items = data.get("items", [])
if not items:
logger.error("No video found for id=%s (items empty). Check if video ID is correct and API key is valid.", video_id)
return None
live_details = items[0].get("liveStreamingDetails", {})
chat_id = live_details.get("activeLiveChatId")
if not chat_id:
logger.error("No activeLiveChatId for video=%s. liveStreamingDetails=%s", video_id, live_details)
return chat_id
except urllib.error.HTTPError as exc:
body = exc.read().decode("utf-8", errors="replace")[:500]
logger.error("HTTP %d from YouTube API for video=%s: %s", exc.code, video_id, body)
return None
except Exception as exc:
logger.error("Failed to get liveChatId: %s", exc)
return None
def _fetch_chat_messages(live_chat_id: str, api_key: str, page_token: str | None = None):
"""
Fetch one page of live chat messages.
Returns (messages_list, next_page_token, polling_interval_ms).
"""
import urllib.request
import urllib.parse
params = {
"part": "snippet,authorDetails",
"liveChatId": live_chat_id,
"key": api_key,
"maxResults": "200",
}
if page_token:
params["pageToken"] = page_token
url = "https://www.googleapis.com/youtube/v3/liveChat/messages?" + urllib.parse.urlencode(params)
try:
with urllib.request.urlopen(url, timeout=10) as resp:
data = json.loads(resp.read())
messages = data.get("items", [])
next_token = data.get("nextPageToken")
poll_interval = data.get("pollingIntervalMillis", 5000)
logger.info("Fetched %d chat messages (nextPageToken=%s)", len(messages), bool(next_token))
return messages, next_token, poll_interval
except Exception as exc:
logger.error("Failed to fetch chat messages: %s", exc)
return [], None, 5000
def _scraper_thread_fn(video_id: str, redis_key: str, stop_event: threading.Event, min_poll_s: float = 10.0, api_key: str = "") -> None:
"""Background thread � scrapes live chat via YouTube Data API v3."""
# Use passed key first, fall back to environment variable
if not api_key:
api_key = os.getenv("YOUTUBE_API_KEY", "")
logger.info("YOUTUBE_API_KEY present: %s (length=%d)", bool(api_key), len(api_key))
if not api_key:
msg = "No API key provided. Enter your YouTube Data API v3 key in the sidebar."
logger.error(msg)
_META["scraper_error"] = msg
return
logger.info("Scraper thread starting � video=%s key=%s", video_id, redis_key)
_META.pop("scraper_error", None)
# Step 1: get the live chat ID
live_chat_id = _get_live_chat_id(video_id, api_key)
if not live_chat_id:
msg = f"No active live chat found for video '{video_id}'. Make sure the stream is currently LIVE."
logger.error(msg)
_META["scraper_error"] = msg
return
logger.info("Live chat ID obtained: %s", live_chat_id)
# Step 2: poll for messages
page_token = None
seen_ids: set = set() # avoid reprocessing messages on first page
is_first_page = True # skip ML on backlog to avoid startup delay
while not stop_event.is_set():
messages, page_token, poll_ms = _fetch_chat_messages(live_chat_id, api_key, page_token)
new_msgs = []
for item in messages:
if stop_event.is_set():
break
msg_id = item.get("id", "")
if msg_id in seen_ids:
continue
seen_ids.add(msg_id)
snippet = item.get("snippet", {})
if snippet.get("type") != "textMessageEvent":
continue
text = snippet.get("displayMessage", "").strip()
# Convert any :emoji_name: codes back to actual emoji characters
import emoji as _emoji
text = _emoji.emojize(text, language="alias")
author = item.get("authorDetails", {}).get("displayName", "Unknown")
if not text:
continue
new_msgs.append((msg_id, text, author))
# On the first page (backlog), store messages with placeholder sentiment
# so the UI shows something immediately, then process ML on subsequent pages
if is_first_page and new_msgs:
logger.info("First page: storing %d backlog messages with placeholder sentiment", len(new_msgs))
for _, text, author in new_msgs:
message_data = {
"author": author,
"text": text,
"sentiment": "Neutral",
"confidence": 0.5,
"topic": "General",
"topic_conf": 0.5,
"action_type": "N/A",
"action_type_conf": 0.5,
"time": datetime.now().isoformat(),
}
store_rpush(redis_key, json.dumps(message_data))
logger.info("Backlog stored: %d messages now in store", store_llen(redis_key))
is_first_page = False
else:
# Normal processing with full ML inference
for _, text, author in new_msgs:
if stop_event.is_set():
break
try:
sentiment, s_conf = _safe_sentiment(text)
topic, t_conf = _safe_topic(text)
# Only classify action type for Question/Request topics
if topic in ("Question", "Request/Feedback"):
action_type, at_conf = _safe_action_type(text)
else:
action_type, at_conf = "N/A", 0.50
except Exception as exc:
logger.error("ML inference failed for text=%r: %s", text[:50], exc)
sentiment, s_conf = "Neutral", 0.5
topic, t_conf = "General", 0.5
action_type, at_conf = "N/A", 0.5
message_data = {
"author": author,
"text": text,
"sentiment": sentiment,
"confidence": round(s_conf, 3),
"topic": topic,
"topic_conf": round(t_conf, 3),
"action_type": action_type,
"action_type_conf": round(at_conf, 3),
"time": datetime.now().isoformat(),
}
store_rpush(redis_key, json.dumps(message_data))
if new_msgs:
logger.info("Processed %d new messages, store size=%d", len(new_msgs), store_llen(redis_key))
# keep seen_ids from growing unbounded
if len(seen_ids) > 5000:
seen_ids = set(list(seen_ids)[-2000:])
# Respect YouTube's requested polling interval, but never faster than min_poll_s
wait_s = max(poll_ms / 1000, min_poll_s)
stop_event.wait(timeout=wait_s)
logger.info("Scraper thread ended � key=%s", redis_key)
def start_scraper(slot_idx: int, video_id: str, redis_key: str, min_poll_s: float = 10.0, api_key: str = "") -> None:
"""Start a scraper thread for the given slot, stopping any existing one first."""
key = str(slot_idx)
stop_scraper(slot_idx)
stop_event = threading.Event()
t = threading.Thread(
target=_scraper_thread_fn,
args=(video_id, redis_key, stop_event, min_poll_s, api_key),
daemon=True,
name=f"scraper-{slot_idx}",
)
_SCRAPER_STOP[key] = stop_event
_SCRAPER_THREADS[key] = t
t.start()
def stop_scraper(slot_idx: int) -> None:
"""Signal the scraper thread for this slot to stop."""
key = str(slot_idx)
ev = _SCRAPER_STOP.get(key)
if ev:
ev.set()
# Don't join � daemon thread will die on its own
def is_scraper_running(slot_idx: int) -> bool:
key = str(slot_idx)
t = _SCRAPER_THREADS.get(key)
return t is not None and t.is_alive()
# -- Streamlit page config -----------------------------------------------------
st.set_page_config(
page_title="LivePulse",
layout="wide",
page_icon="\U0001F4E1",
initial_sidebar_state="expanded"
)
TOPIC_LABELS = ["Appreciation", "Question", "Request/Feedback", "Promo", "Spam", "General", "MCQ Answer"]
TOPIC_COLOR = {
"Appreciation": "#f59e0b", "Question": "#3b82f6",
"Request/Feedback": "#8b5cf6",
"Promo": "#ec4899", "Spam": "#ef4444", "General": "#6b7280",
"MCQ Answer": "#10b981"
}
SENT_COLORS = {"Positive": "#22c55e", "Neutral": "#eab308", "Negative": "#ef4444"}
# -- JS: detect Streamlit's live theme and set data-livepulse attribute --
THEME_JS = """<script>
(function() {
function applyTheme() {
const html = window.parent.document.documentElement;
const style = window.parent.getComputedStyle(html);
const bg = style.getPropertyValue('--background-color').trim();
let isDark = true;
const m = bg.match(/rgb\((\d+),\s*(\d+),\s*(\d+)\)/);
if (m) { isDark = (0.299*m[1] + 0.587*m[2] + 0.114*m[3]) < 128; }
else {
const bodyBg = window.parent.getComputedStyle(window.parent.document.body).backgroundColor;
const m2 = bodyBg.match(/rgb\((\d+),\s*(\d+),\s*(\d+)\)/);
if (m2) { isDark = (0.299*m2[1] + 0.587*m2[2] + 0.114*m2[3]) < 128; }
}
html.setAttribute('data-livepulse', isDark ? 'dark' : 'light');
}
applyTheme();
const obs = new MutationObserver(applyTheme);
obs.observe(window.parent.document.documentElement, { attributes: true, attributeFilter: ['style','class'] });
obs.observe(window.parent.document.body, { attributes: true, attributeFilter: ['style','class'] });
})();
</script>"""
CSS = """<style>
@import url('https://fonts.googleapis.com/css2?family=Space+Grotesk:wght@400;500;600;700;800&display=swap');
:root, [data-livepulse="dark"] {
--bg:#07070f; --bg-card:#0f0f1e; --border:rgba(255,255,255,0.07);
--text-1:#f1f5f9; --text-2:#94a3b8; --text-3:#475569;
--accent:#7c3aed; --accent2:#4f46e5; --accent-text:#a78bfa;
--live:#22c55e; --input-bg:rgba(255,255,255,0.04); --input-border:rgba(255,255,255,0.1);
--divider:rgba(255,255,255,0.06); --badge-bg:rgba(255,255,255,0.05);
--shadow:0 4px 24px rgba(0,0,0,0.4); --shadow-sm:0 2px 8px rgba(0,0,0,0.3);
--pill-bg:rgba(124,58,237,0.15); --pill-border:rgba(124,58,237,0.3); --pill-text:#a78bfa;
--plotly-paper:rgba(0,0,0,0); --plotly-plot:rgba(255,255,255,0.015); --plotly-grid:rgba(255,255,255,0.05); --plotly-text:#94a3b8;
--alert-bg:rgba(239,68,68,0.1); --alert-border:rgba(239,68,68,0.3);
--pin-bg:rgba(234,179,8,0.1); --pin-border:rgba(234,179,8,0.35);
}
[data-livepulse="light"] {
--bg:#f4f6ff; --bg-card:#ffffff; --border:rgba(99,102,241,0.12);
--text-1:#0f172a; --text-2:#475569; --text-3:#94a3b8;
--accent:#6d28d9; --accent2:#4338ca; --accent-text:#6d28d9;
--live:#16a34a; --input-bg:#ffffff; --input-border:rgba(99,102,241,0.2);
--divider:rgba(99,102,241,0.1); --badge-bg:rgba(99,102,241,0.06);
--shadow:0 4px 24px rgba(99,102,241,0.12); --shadow-sm:0 2px 8px rgba(99,102,241,0.08);
--pill-bg:rgba(109,40,217,0.08); --pill-border:rgba(109,40,217,0.2); --pill-text:#6d28d9;
--plotly-paper:rgba(0,0,0,0); --plotly-plot:rgba(255,255,255,0.7); --plotly-grid:rgba(0,0,0,0.06); --plotly-text:#475569;
--alert-bg:rgba(239,68,68,0.07); --alert-border:rgba(239,68,68,0.25);
--pin-bg:rgba(234,179,8,0.08); --pin-border:rgba(234,179,8,0.3);
}
html,body,[data-testid="stAppViewContainer"],[data-testid="stMain"],.main .block-container {
background:var(--bg)!important; color:var(--text-1)!important;
font-family:'Space Grotesk',sans-serif!important; transition:background 0.3s,color 0.3s;
}
[data-testid="stSidebar"] { background:var(--bg-card)!important; border-right:1px solid var(--border)!important; transition:background 0.3s; }
[data-testid="stHeader"] { background:transparent!important; }
::-webkit-scrollbar{width:4px;} ::-webkit-scrollbar-track{background:var(--bg);}
::-webkit-scrollbar-thumb{background:linear-gradient(var(--accent),var(--accent2));border-radius:4px;}
[data-testid="metric-container"] {
background:var(--bg-card)!important; border:1px solid var(--border)!important;
border-radius:16px!important; padding:18px!important; box-shadow:var(--shadow-sm)!important; transition:background 0.3s;
}
[data-testid="stMetricLabel"]{color:var(--text-2)!important;font-size:0.8rem!important;}
[data-testid="stMetricValue"]{color:var(--text-1)!important;font-weight:700!important;}
[data-testid="stMetricDelta"]{color:var(--accent-text)!important;}
.stTextInput input { background:var(--input-bg)!important; border:1px solid var(--input-border)!important; border-radius:10px!important; color:var(--text-1)!important; }
.stTextInput input::placeholder { color:var(--text-3)!important; opacity:1!important; }
[data-testid="stSidebar"] .stTextInput input { background:#1a1a2e!important; border:1px solid rgba(124,58,237,0.4)!important; color:#f1f5f9!important; font-weight:500!important; }
[data-testid="stSidebar"] .stTextInput input::placeholder { color:#64748b!important; }
[data-testid="stSidebar"] .stTextInput input:focus { border-color:var(--accent)!important; box-shadow:0 0 0 2px rgba(124,58,237,0.2)!important; outline:none!important; }
[data-testid="stSidebar"] label { color:var(--text-2)!important; }
[data-baseweb="select"]>div { background:var(--input-bg)!important; border:1px solid var(--input-border)!important; border-radius:10px!important; color:var(--text-1)!important; }
.stButton>button { background:linear-gradient(135deg,var(--accent),var(--accent2))!important; color:#fff!important; border:none!important; border-radius:10px!important; font-weight:600!important; font-family:'Space Grotesk',sans-serif!important; box-shadow:0 4px 16px rgba(124,58,237,0.3)!important; transition:all 0.2s!important; }
.stButton>button:hover{transform:translateY(-2px)!important;}
hr{border:none!important;border-top:1px solid var(--divider)!important;margin:1.2rem 0!important;}
[data-testid="stSidebar"] label,[data-testid="stSidebar"] .stMarkdown p{color:var(--text-2)!important;font-size:0.83rem!important;}
[data-testid="stDownloadButton"]>button { background:var(--bg-card)!important; color:var(--text-2)!important; border:1px solid var(--border)!important; border-radius:8px!important; font-size:0.75rem!important; box-shadow:none!important; }
[data-testid="stDownloadButton"]>button:hover { background:var(--pill-bg)!important; color:var(--accent-text)!important; border-color:var(--pill-border)!important; }
[data-testid="stCheckbox"] label, [data-testid="stCheckbox"] span { color:var(--text-2)!important; font-size:0.82rem!important; }
[data-testid="stCheckbox"] [data-testid="stWidgetLabel"] { color:var(--text-2)!important; }
@keyframes pulse{0%{box-shadow:0 0 0 0 rgba(34,197,94,0.7);}70%{box-shadow:0 0 0 10px rgba(34,197,94,0);}100%{box-shadow:0 0 0 0 rgba(34,197,94,0);}}
.live-dot{display:inline-block;width:9px;height:9px;background:var(--live);border-radius:50%;animation:pulse 1.8s infinite;margin-right:6px;vertical-align:middle;}
@keyframes alertPulse{0%{opacity:1;}50%{opacity:0.7;}100%{opacity:1;}}
.alert-banner{background:var(--alert-bg);border:1px solid var(--alert-border);border-radius:14px;padding:14px 18px;margin:12px 0;display:flex;align-items:center;gap:12px;animation:alertPulse 2s infinite;}
.alert-icon{font-size:1.4rem;}
.alert-text{font-size:0.88rem;font-weight:600;color:#ef4444;}
.alert-sub{font-size:0.75rem;color:var(--text-3);margin-top:2px;}
.stat-grid{display:flex;gap:12px;margin:10px 0 18px;flex-wrap:wrap;}
.stat-card{flex:1;min-width:130px;background:var(--bg-card);border:1px solid var(--border);border-radius:20px;padding:22px 18px;text-align:center;transition:transform 0.2s,box-shadow 0.2s,background 0.3s;position:relative;overflow:hidden;box-shadow:var(--shadow-sm);}
.stat-card:hover{transform:translateY(-4px);box-shadow:var(--shadow);}
.stat-accent{position:absolute;top:0;left:0;right:0;height:3px;border-radius:20px 20px 0 0;}
.stat-number{font-size:2.6rem;font-weight:800;line-height:1;margin-bottom:6px;letter-spacing:-0.03em;}
.stat-label{font-size:0.82rem;color:var(--text-2);font-weight:600;text-transform:uppercase;letter-spacing:0.06em;}
.stat-sub{font-size:0.7rem;color:var(--text-3);margin-top:4px;}
.velocity-card{background:var(--bg-card);border:1px solid var(--border);border-radius:20px;padding:18px 22px;box-shadow:var(--shadow-sm);display:flex;align-items:center;gap:16px;}
.velocity-arrow{font-size:2rem;line-height:1;}
.velocity-val{font-size:1.6rem;font-weight:800;letter-spacing:-0.03em;}
.velocity-label{font-size:0.75rem;color:var(--text-3);font-weight:600;text-transform:uppercase;letter-spacing:0.06em;margin-top:2px;}
.sec-hdr{display:flex;align-items:center;gap:10px;margin:6px 0 14px;}
.sec-ttl{font-size:1rem;font-weight:700;color:var(--text-1);letter-spacing:-0.01em;}
.sec-pill{background:var(--pill-bg);border:1px solid var(--pill-border);border-radius:20px;padding:2px 10px;font-size:0.68rem;color:var(--pill-text);font-weight:700;text-transform:uppercase;letter-spacing:0.08em;}
.chart-wrap{background:var(--bg-card);border:1px solid var(--border);border-radius:20px;padding:14px 14px 6px;box-shadow:var(--shadow-sm);transition:background 0.3s,border 0.3s;}
.chart-title{font-size:0.88rem;font-weight:700;color:var(--text-1);margin-bottom:2px;}
.chart-sub{font-size:0.72rem;color:var(--text-3);margin-bottom:10px;}
.topic-grid{display:flex;gap:10px;flex-wrap:wrap;margin-bottom:18px;}
.topic-pill{background:var(--bg-card);border-radius:16px;padding:14px 20px;text-align:center;min-width:110px;box-shadow:var(--shadow-sm);transition:transform 0.2s,box-shadow 0.2s;}
.topic-pill:hover{transform:translateY(-3px);box-shadow:var(--shadow);}
.topic-count{font-size:1.4rem;font-weight:800;letter-spacing:-0.02em;}
.topic-name{font-size:0.7rem;color:var(--text-3);margin-top:3px;font-weight:600;text-transform:uppercase;letter-spacing:0.06em;}
@keyframes slideIn{from{opacity:0;transform:translateY(6px);}to{opacity:1;transform:translateY(0);}}
.chat-card{background:var(--bg-card);border:1px solid var(--border);border-radius:16px;padding:14px 16px;margin-bottom:10px;border-left:3px solid transparent;animation:slideIn 0.2s ease;transition:background 0.2s,transform 0.15s,box-shadow 0.2s;box-shadow:var(--shadow-sm);}
.chat-card:hover{transform:translateX(4px);box-shadow:var(--shadow);}
.chat-positive{border-left-color:#22c55e;} .chat-negative{border-left-color:#ef4444;} .chat-neutral{border-left-color:#eab308;}
.chat-pinned{border-left-color:#eab308!important;background:var(--pin-bg)!important;border-color:var(--pin-border)!important;}
.chat-author{font-weight:700;font-size:0.83rem;color:var(--accent-text);margin-bottom:5px;}
.chat-text{font-size:0.92rem;color:var(--text-2);line-height:1.55;margin-bottom:9px;}
.chat-badges{display:flex;gap:6px;flex-wrap:wrap;}
.badge{display:inline-flex;align-items:center;background:var(--badge-bg);border:1px solid var(--border);border-radius:20px;padding:3px 10px;font-size:0.7rem;font-weight:600;color:var(--text-2);}
.pin-badge{background:rgba(234,179,8,0.15);border-color:rgba(234,179,8,0.4);color:#eab308;}
.compare-label{font-size:0.72rem;font-weight:700;text-transform:uppercase;letter-spacing:0.08em;padding:3px 10px;border-radius:20px;display:inline-block;margin-bottom:8px;}
.engage-card{background:var(--bg-card);border:1px solid var(--border);border-radius:20px;padding:20px 24px;box-shadow:var(--shadow-sm);position:relative;overflow:hidden;}
.engage-score{font-size:3rem;font-weight:800;letter-spacing:-0.04em;line-height:1;}
.engage-label{font-size:0.75rem;color:var(--text-3);font-weight:600;text-transform:uppercase;letter-spacing:0.08em;margin-top:4px;}
.engage-bar-bg{background:var(--border);border-radius:99px;height:6px;margin-top:12px;overflow:hidden;}
.engage-bar-fill{height:6px;border-radius:99px;transition:width 0.6s ease;}
.engage-breakdown{display:flex;gap:16px;margin-top:10px;flex-wrap:wrap;}
.engage-item{font-size:0.72rem;color:var(--text-3);}
.engage-item span{font-weight:700;color:var(--text-2);}
.leaderboard-row{display:flex;align-items:center;gap:12px;padding:10px 14px;background:var(--bg-card);border:1px solid var(--border);border-radius:14px;margin-bottom:8px;transition:transform 0.15s,box-shadow 0.15s;}
.leaderboard-row:hover{transform:translateX(4px);box-shadow:var(--shadow);}
.lb-rank{font-size:1rem;font-weight:800;color:var(--text-3);min-width:28px;}
.lb-rank.gold{color:#f59e0b;} .lb-rank.silver{color:#94a3b8;} .lb-rank.bronze{color:#b45309;}
.lb-author{font-size:0.85rem;font-weight:700;color:var(--text-1);flex:1;overflow:hidden;text-overflow:ellipsis;white-space:nowrap;}
.lb-count{font-size:0.78rem;color:var(--text-3);min-width:40px;text-align:right;}
.lb-bar{flex:2;height:5px;background:var(--border);border-radius:99px;overflow:hidden;}
.lb-bar-fill{height:5px;border-radius:99px;}
.lb-sent{display:flex;gap:4px;min-width:80px;justify-content:flex-end;}
.lb-dot{width:8px;height:8px;border-radius:50%;display:inline-block;}
.spam-alert{background:rgba(239,68,68,0.08);border:1px solid rgba(239,68,68,0.25);border-radius:14px;padding:14px 18px;margin:12px 0;display:flex;align-items:center;gap:12px;}
.spam-alert-text{font-size:0.88rem;font-weight:600;color:#ef4444;}
.spam-alert-sub{font-size:0.75rem;color:var(--text-3);margin-top:2px;}
.empty-state{text-align:center;padding:80px 20px;background:var(--bg-card);border:1px solid var(--border);border-radius:24px;margin:40px 0;box-shadow:var(--shadow-sm);}
.empty-icon{font-size:3.5rem;margin-bottom:16px;}
.empty-title{font-size:1.1rem;color:var(--text-2);font-weight:700;}
.empty-sub{font-size:0.84rem;color:var(--text-3);margin-top:6px;}
[data-testid="stSidebar"] [role="radiogroup"] { display:flex; flex-direction:row; flex-wrap:nowrap; gap:4px; }
[data-testid="stSidebar"] [role="radiogroup"] label { flex:1; display:flex; align-items:center; justify-content:center; background:var(--bg-card); border:1px solid var(--pill-border); border-radius:8px; padding:6px 2px; cursor:pointer; transition:background 0.15s,border 0.15s; }
[data-testid="stSidebar"] [role="radiogroup"] label:hover { background:var(--pill-bg); border-color:var(--accent); }
[data-testid="stSidebar"] [role="radiogroup"] label[data-checked="true"],
[data-testid="stSidebar"] [role="radiogroup"] label:has(input:checked) { background:linear-gradient(135deg,var(--accent),var(--accent2)); border-color:var(--accent); }
[data-testid="stSidebar"] [role="radiogroup"] label p,
[data-testid="stSidebar"] [role="radiogroup"] label span { font-size:0.82rem !important; font-weight:700 !important; color:var(--text-1) !important; white-space:nowrap !important; }
[data-testid="stSidebar"] [role="radiogroup"] label:has(input:checked) p,
[data-testid="stSidebar"] [role="radiogroup"] label:has(input:checked) span { color:#fff !important; }
[data-testid="stSidebar"] [role="radiogroup"] input[type="radio"] { display:none !important; }
[data-testid="stSidebar"] [data-testid="stWidgetLabel"]:has(+ [role="radiogroup"]) { color:var(--text-2) !important; font-size:0.75rem !important; margin-bottom:4px; }
</style>"""
st.markdown(THEME_JS, unsafe_allow_html=True)
st.markdown(CSS, unsafe_allow_html=True)
# -- HELPERS --------------------------------------------------
def extract_video_id(url_or_id):
url_or_id = url_or_id.strip()
match = re.search(r"(?:v=|/live/|youtu\.be/)([A-Za-z0-9_-]{11})", url_or_id)
if match:
return match.group(1)
if re.match(r"^[A-Za-z0-9_-]{11}$", url_or_id):
return url_or_id
return url_or_id
def fetch_video_title(video_id):
"""Try oembed first (works for non-live), then YouTube Data API v3 (works for live)."""
import urllib.request
import urllib.parse
# Try oembed first (fast, no API key needed)
try:
url = f"https://www.youtube.com/oembed?url=https://www.youtube.com/watch?v={video_id}&format=json"
with urllib.request.urlopen(url, timeout=5) as resp:
title = json.loads(resp.read()).get("title")
if title:
return title
except Exception:
pass
# Fallback: YouTube Data API v3 (works for live streams)
try:
api_key = os.getenv("YOUTUBE_API_KEY", "")
if api_key:
url = (
"https://www.googleapis.com/youtube/v3/videos"
f"?part=snippet&id={urllib.parse.quote(video_id)}&key={api_key}"
)
with urllib.request.urlopen(url, timeout=5) as resp:
data = json.loads(resp.read())
items = data.get("items", [])
if items:
return items[0]["snippet"]["title"]
except Exception:
pass
return None
def clean_topic(val):
if pd.isna(val) or str(val).strip() == "" or str(val).strip().lower() == "nan":
return "General"
return str(val).strip()
def clean_sentiment(val):
if str(val).strip() in ("Positive", "Negative", "Neutral"):
return str(val).strip()
return "Neutral"
def plotly_layout(height=280):
return dict(
paper_bgcolor="rgba(0,0,0,0)",
plot_bgcolor="rgba(0,0,0,0)",
height=height,
margin=dict(l=10, r=10, t=10, b=10),
font=dict(family="Space Grotesk"),
xaxis=dict(showgrid=False, zeroline=False, showline=False,
tickfont=dict(size=11), title=None),
yaxis=dict(showgrid=True, gridcolor="rgba(128,128,128,0.12)",
zeroline=False, showline=False, tickfont=dict(size=11), title=None),
showlegend=False,
hoverlabel=dict(font_family="Space Grotesk", font_size=12),
)
def csv_download(df_export, label, filename):
csv = df_export.to_csv(index=False).encode("utf-8")
st.download_button(label=f"\u2b07 {label}", data=csv,
file_name=filename, mime="text/csv", key=filename)
def load_stream_data(redis_key: str, limit: int | None = None):
"""Load and parse messages from the in-memory store (no cache � store is in-memory)."""
if limit:
raws = store_lrange(redis_key, -limit, -1)
else:
raws = store_lrange(redis_key, 0, -1)
data = []
for raw in raws:
try:
data.append(json.loads(raw))
except Exception:
pass
return data
@st.cache_data(ttl=10, show_spinner=False)
def compute_velocity(df_all_json: str, window: int = 20) -> dict:
"""Compute sentiment velocity. Accepts JSON string for cache key compatibility."""
import json as _json
sentiments = [m.get("sentiment", "Neutral") for m in _json.loads(df_all_json)]
n = len(sentiments)
if n < window * 2:
return {"direction": "\u2192", "delta": 0.0, "label": "Stable", "color": "#eab308"}
recent = sentiments[-window:]
prev = sentiments[-window*2:-window]
r_pos = sum(1 for s in recent if s == "Positive") / window
p_pos = sum(1 for s in prev if s == "Positive") / window
delta = r_pos - p_pos
if delta > 0.08:
return {"direction": "\u2191", "delta": delta, "label": "Rising", "color": "#22c55e"}
elif delta < -0.08:
return {"direction": "\u2193", "delta": delta, "label": "Falling", "color": "#ef4444"}
return {"direction": "\u2192", "delta": delta, "label": "Stable", "color": "#eab308"}
@st.cache_data(ttl=10, show_spinner=False)
def build_heatmap_data(df_all_json: str, bucket_minutes: int = 1) -> pd.DataFrame:
"""Bucket messages into time intervals."""
import json as _json
records = _json.loads(df_all_json)
if not records:
return pd.DataFrame()
df_t = pd.DataFrame(records)
if "time" not in df_t.columns:
return pd.DataFrame()
df_t["time"] = pd.to_datetime(df_t["time"], errors="coerce")
df_t = df_t.dropna(subset=["time"])
if df_t.empty:
return pd.DataFrame()
df_t["bucket"] = df_t["time"].dt.floor(f"{bucket_minutes}min")
grouped = df_t.groupby(["bucket", "sentiment"]).size().unstack(fill_value=0)
for col in ["Positive", "Neutral", "Negative"]:
if col not in grouped.columns:
grouped[col] = 0
grouped = grouped.reset_index()
grouped.columns.name = None
return grouped[["bucket", "Positive", "Neutral", "Negative"]]
def check_alert(df_all: pd.DataFrame, threshold: float = 0.4, window: int = 15) -> dict | None:
"""Return alert info if negative ratio in last `window` messages exceeds threshold."""
if len(df_all) < window:
return None
recent = df_all.iloc[-window:]
neg_ratio = (recent["sentiment"] == "Negative").mean()
if neg_ratio >= threshold:
return {
"neg_ratio": neg_ratio,
"count": int((recent["sentiment"] == "Negative").sum()),
"window": window,
}
return None
@st.cache_data(ttl=10, show_spinner=False)
def compute_engagement(all_data_json: str, window: int = 50) -> dict:
"""Engagement score (0-100) = weighted combo of message rate, positive ratio, question density."""
import json as _j
msgs = _j.loads(all_data_json)
if not msgs:
return {"score": 0, "rate": 0.0, "pos_ratio": 0.0, "q_density": 0.0, "grade": "�"}
recent = msgs[-window:]
n = len(recent)
rate = 0.0
try:
t0 = datetime.fromisoformat(recent[0]["time"])
t1 = datetime.fromisoformat(recent[-1]["time"])
elapsed = max((t1 - t0).total_seconds() / 60, 0.1)
rate = round(n / elapsed, 1)
except Exception:
rate = float(n)
pos_ratio = sum(1 for m in recent if m.get("sentiment") == "Positive") / max(n, 1)
q_density = sum(1 for m in recent if m.get("topic") == "Question") / max(n, 1)
rate_norm = min(rate / 60, 1.0)
score = round((rate_norm * 0.4 + pos_ratio * 0.4 + q_density * 0.2) * 100)
if score >= 70: grade = "\U0001f525 High"
elif score >= 40: grade = "\u26a1 Medium"
else: grade = "\U0001f4a4 Low"
return {"score": score, "rate": rate, "pos_ratio": pos_ratio, "q_density": q_density, "grade": grade}
@st.cache_data(ttl=10, show_spinner=False)
def compute_top_contributors(all_data_json: str, top_n: int = 10) -> list[dict]:
"""Return top N authors by message count with sentiment + topic breakdown."""
import json as _j
from collections import Counter
msgs = _j.loads(all_data_json)
if not msgs:
return []
TOPICS = ["Appreciation", "Question", "Request/Feedback", "Promo", "Spam", "General", "MCQ Answer"]
author_data: dict[str, dict] = {}
for m in msgs:
a = m.get("author", "Unknown")
if a not in author_data:
author_data[a] = {
"count": 0,
"Positive": 0, "Neutral": 0, "Negative": 0,
**{t: 0 for t in TOPICS},
}
author_data[a]["count"] += 1
s = m.get("sentiment", "Neutral")
if s in ("Positive", "Neutral", "Negative"):
author_data[a][s] += 1
t = m.get("topic", "General")
if t not in TOPICS:
t = "General"
author_data[a][t] += 1
sorted_authors = sorted(author_data.items(), key=lambda x: x[1]["count"], reverse=True)[:top_n]
result = []
for author, d in sorted_authors:
total = max(d["count"], 1)
result.append({
"author": author,
"count": d["count"],
"pos_pct": round(d["Positive"] / total * 100),
"neu_pct": round(d["Neutral"] / total * 100),
"neg_pct": round(d["Negative"] / total * 100),
"t_appr": round(d["Appreciation"] / total * 100),
"t_ques": round(d["Question"] / total * 100),
"t_rf": round(d["Request/Feedback"] / total * 100),
"t_promo": round(d["Promo"] / total * 100),
"t_spam": round(d["Spam"] / total * 100),
"t_gen": round(d["General"] / total * 100),
"t_mcq": round(d["MCQ Answer"] / total * 100),
})
return result
@st.cache_data(ttl=10, show_spinner=False)
def compute_word_freq(all_data_json: str, sentiment_filter: str = "All",
topic_filter: str = "All", top_n: int = 60) -> list[tuple[str, int]]:
"""Return top N (word, count) pairs after filtering stopwords."""
import json as _j
from collections import Counter
STOPWORDS = {
"the","a","an","is","it","in","on","at","to","of","and","or","but","for",
"with","this","that","are","was","be","as","by","from","have","has","had",
"not","no","so","if","do","did","will","can","just","i","you","he","she",
"we","they","my","your","his","her","our","their","me","him","us","them",
"what","how","why","when","where","who","which","there","here","been",
"would","could","should","may","might","shall","than","then","now","also",
"more","very","too","up","out","about","into","over","after","before",
"yaar","bhi","hai","hain","ho","kar","ke","ki","ka","ko","se","ne","ye",
"vo","woh","aur","nahi","nhi","toh","toh","koi","kuch","ab","ek","hi",
}
msgs = _j.loads(all_data_json)
words: list[str] = []
for m in msgs:
if sentiment_filter != "All" and m.get("sentiment") != sentiment_filter:
continue
if topic_filter != "All" and m.get("topic") != topic_filter:
continue
text = re.sub(r"[^\w\s]", " ", m.get("text", "").lower())
for w in text.split():
if len(w) > 2 and w not in STOPWORDS and not w.isdigit():
words.append(w)
return Counter(words).most_common(top_n)
def check_spam_alert(df_all: pd.DataFrame, threshold: float = 0.3, window: int = 20) -> dict | None:
"""Return alert if spam ratio in last `window` messages exceeds threshold."""
if "topic" not in df_all.columns or len(df_all) < window:
return None
recent = df_all.iloc[-window:]
spam_ratio = (recent["topic"] == "Spam").mean()
if spam_ratio >= threshold:
return {
"spam_ratio": spam_ratio,
"count": int((recent["topic"] == "Spam").sum()),
"window": window,
}
return None
@st.cache_data(ttl=10, show_spinner=False)
def detect_repeat_spammers(all_data_json: str, window_sec: int = 15, min_repeats: int = 2) -> list[dict]:
"""
Detect users who send the same (or near-identical) message multiple times
within `window_sec` seconds. Returns list of spam burst dicts sorted by
repeat count descending.
Each dict: author, text, normalized_text, topic, sentiment, count, timestamps, first_seen
"""
import json as _j
from collections import defaultdict
msgs = _j.loads(all_data_json)
if not msgs:
return []
def _normalize(t: str) -> str:
"""Lowercase, strip punctuation/spaces for fuzzy matching."""
import re
return re.sub(r"[^\w]", "", t.lower().strip())
# Group by (author, normalized_text)
bursts: dict[tuple, dict] = {}
for m in msgs:
author = m.get("author", "Unknown")
text = m.get("text", "").strip()
if not text:
continue
norm = _normalize(text)
if len(norm) < 4: # skip very short messages like "ok", "hi"
continue
ts_str = m.get("time", "")
try:
ts = datetime.fromisoformat(ts_str)
except Exception:
continue
key = (author, norm)
if key not in bursts:
bursts[key] = {
"author": author,
"text": text,
"topic": m.get("topic", "General"),
"sentiment": m.get("sentiment", "Neutral"),
"timestamps": [],
}
bursts[key]["timestamps"].append(ts)
results = []
for key, burst in bursts.items():
times = sorted(burst["timestamps"])
# Sliding window: find max repeats within any window_sec period
max_in_window = 1
for i in range(len(times)):
count_in_window = sum(
1 for t in times[i:]
if (t - times[i]).total_seconds() <= window_sec
)
max_in_window = max(max_in_window, count_in_window)
if max_in_window >= min_repeats:
results.append({
"author": burst["author"],
"text": burst["text"],
"topic": burst["topic"],
"sentiment": burst["sentiment"],
"count": len(times),
"max_burst": max_in_window,
"first_seen": times[0].strftime("%H:%M:%S"),
"last_seen": times[-1].strftime("%H:%M:%S"),
})
return sorted(results, key=lambda x: x["max_burst"], reverse=True)
# -- SESSION STATE INIT ----------------------------------------
MAX_STREAMS = 5
STREAM_COLORS = ["#7c3aed", "#10b981", "#f59e0b", "#3b82f6", "#ec4899"]
STREAM_NAMES = ["A", "B", "C", "D", "E"]
if "pinned_messages" not in st.session_state:
st.session_state.pinned_messages = []
if "alert_dismissed" not in st.session_state:
st.session_state.alert_dismissed = False
if "last_alert_count" not in st.session_state:
st.session_state.last_alert_count = 0
if "last_view" not in st.session_state:
st.session_state.last_view = "?? Comments"
# Multi-stream: list of dicts {video_id, redis_key, label, proc}
# proc stores the Thread object (or None) for running-check compatibility
if "streams" not in st.session_state:
st.session_state.streams = [
{"video_id": VIDEO_ID, "redis_key": "chat_messages", "label": "Stream A", "proc": None}
]
# -- SIDEBAR --------------------------------------------------
with st.sidebar:
st.markdown(
'<div style="padding:12px 0 20px;">'
'<div style="font-size:1.35rem;font-weight:800;color:var(--text-1);letter-spacing:-0.02em;">\U0001F4E1 LivePulse</div>'
'<div style="font-size:0.75rem;color:var(--text-3);margin-top:2px;">YouTube Chat Analytics</div>'
'</div>', unsafe_allow_html=True
)
st.divider()
# -- Display Settings --
st.markdown('<p style="font-size:0.68rem;font-weight:700;color:var(--accent);text-transform:uppercase;letter-spacing:0.1em;margin-bottom:8px;">Display Settings</p>', unsafe_allow_html=True)
refresh_rate = st.radio(
"Refresh interval (s)",
options=[10, 20, 30, 40, 50, 60],
index=0,
horizontal=True,
key="refresh_rate",
)
msg_limit = st.slider("Message window", 10, 400, 50, step=10, key="msg_limit")
auto_refresh = st.toggle("Live auto-refresh", value=True, key="auto_refresh")
st.divider()
# -- Alert Settings --
st.markdown('<p style="font-size:0.68rem;font-weight:700;color:var(--accent);text-transform:uppercase;letter-spacing:0.1em;margin-bottom:8px;">Alert Settings</p>', unsafe_allow_html=True)
alert_enabled = st.toggle("Negative spike alerts", value=True, key="alert_enabled")
alert_threshold = st.slider("Neg alert threshold (%)", 20, 80, 40, key="alert_threshold_pct") / 100
alert_window = st.slider("Alert window (msgs)", 5, 30, 15, key="alert_window")
spam_alert_on = st.toggle("Spam rate alerts", value=True, key="spam_alert_on")
spam_threshold = st.slider("Spam alert threshold (%)", 10, 60, 30, key="spam_threshold_pct") / 100
st.divider()
# -- API Key --
st.markdown('<p style="font-size:0.68rem;font-weight:700;color:var(--accent);text-transform:uppercase;letter-spacing:0.1em;margin-bottom:8px;">YouTube API Key</p>', unsafe_allow_html=True)
_env_key = os.getenv("YOUTUBE_API_KEY", "")
_api_key_input = st.text_input(
"API Key",
value=st.session_state.get("user_api_key", ""),
type="password",
placeholder="AIza... (paste your YouTube Data API v3 key)",
key="api_key_input",
help="Your YouTube Data API v3 key. Never shared or stored permanently.",
)
# Store in session state whenever changed
if _api_key_input:
st.session_state["user_api_key"] = _api_key_input
# Show status
_effective_key = _api_key_input or _env_key
if _effective_key:
st.markdown(
f'<div style="font-size:0.7rem;color:#22c55e;margin-bottom:4px;">\u2713 API key set ({len(_effective_key)} chars)</div>',
unsafe_allow_html=True
)
else:
st.markdown(
'<div style="font-size:0.7rem;color:#ef4444;margin-bottom:4px;">\u26a0 No API key — scraper won\'t start</div>',
unsafe_allow_html=True
)
st.divider()
# -- Multi-Stream Scraper Control --
st.markdown('<p style="font-size:0.68rem;font-weight:700;color:var(--accent);text-transform:uppercase;letter-spacing:0.1em;margin-bottom:8px;">Stream Control</p>', unsafe_allow_html=True)
for idx, stream in enumerate(st.session_state.streams):
color = STREAM_COLORS[idx]
label = STREAM_NAMES[idx]
st.markdown(
f'<div style="font-size:0.72rem;font-weight:700;color:{color};text-transform:uppercase;'
f'letter-spacing:0.08em;margin:10px 0 4px;border-left:3px solid {color};padding-left:8px;">'
f'Stream {label}</div>',
unsafe_allow_html=True
)
vid_skey = f"vid_{idx}"
rkey_skey = f"rkey_{idx}"
if vid_skey not in st.session_state:
st.session_state[vid_skey] = stream["video_id"]
if rkey_skey not in st.session_state:
st.session_state[rkey_skey] = stream["redis_key"]
st.text_input("Video ID / URL", placeholder="e.g. eFSK2-QRB0A", key=vid_skey)
st.text_input("Store key", placeholder=f"chat_messages_{label.lower()}", key=rkey_skey)
sc1, sc2 = st.columns(2)
with sc1:
if st.button("\u25b6 Start", key=f"start_{idx}"):
vid = extract_video_id(st.session_state[vid_skey])
rkey = st.session_state[rkey_skey].strip() or f"chat_messages_{label.lower()}"
if vid:
start_scraper(idx, vid, rkey, min_poll_s=float(st.session_state.get("refresh_rate", 10)), api_key=st.session_state.get("user_api_key", "") or os.getenv("YOUTUBE_API_KEY", ""))
st.session_state.streams[idx]["proc"] = _SCRAPER_THREADS.get(str(idx))
st.session_state.streams[idx]["video_id"] = vid
st.session_state.streams[idx]["redis_key"] = rkey
# Fetch and store title for ALL streams (used in header pills)
_title = fetch_video_title(vid)
st.session_state.streams[idx]["video_title"] = _title or vid
if idx == 0:
if _title:
_META["video_title"] = _title
else:
_META.pop("video_title", None)
st.session_state.alert_dismissed = False
st.success(f"Stream {label} started -> `{rkey}`")
else:
st.error("Invalid video ID or URL")
with sc2:
if st.button("\u23f9 Stop", key=f"stop_{idx}"):
if is_scraper_running(idx):
stop_scraper(idx)
st.session_state.streams[idx]["proc"] = None
st.success(f"Stream {label} stopped")
else:
st.warning("Not running")
running = is_scraper_running(idx)
dot_color = "#22c55e" if running else "#ef4444"
status = "running" if running else "stopped"
st.markdown(f'<div style="font-size:0.72rem;color:{dot_color};margin-bottom:4px;">\u25cf {status}</div>', unsafe_allow_html=True)
# Show scraper error if any (only for stream A)
if idx == 0 and _META.get("scraper_error"):
st.error(_META["scraper_error"])
st.divider()
# -- Add / Remove stream slots --
add_col, rem_col = st.columns(2)
with add_col:
if len(st.session_state.streams) < MAX_STREAMS:
if st.button("+ Add stream"):
n = len(st.session_state.streams)
st.session_state.streams.append({
"video_id": "",
"redis_key": f"chat_messages_{STREAM_NAMES[n].lower()}",
"label": f"Stream {STREAM_NAMES[n]}",
"proc": None,
})
st.rerun()
with rem_col:
if len(st.session_state.streams) > 1:
if st.button("- Remove last"):
removed = st.session_state.streams.pop()
removed_idx = len(st.session_state.streams)
stop_scraper(removed_idx)
st.rerun()
st.divider()
# -- Pinned Messages --
st.markdown('<p style="font-size:0.68rem;font-weight:700;color:var(--accent);text-transform:uppercase;letter-spacing:0.1em;margin-bottom:8px;">Pinned Messages</p>', unsafe_allow_html=True)
pin_count = len(st.session_state.pinned_messages)
st.markdown(f'<div style="font-size:0.78rem;color:var(--text-3);">{pin_count} message{"s" if pin_count != 1 else ""} pinned</div>', unsafe_allow_html=True)
if pin_count > 0 and st.button("\U0001f5d1 Clear pins"):
st.session_state.pinned_messages = []
st.rerun()
st.divider()
# -- Download Data --
st.markdown('<p style="font-size:0.68rem;font-weight:700;color:var(--accent);text-transform:uppercase;letter-spacing:0.1em;margin-bottom:8px;">Download Data</p>', unsafe_allow_html=True)
_active_streams = [s for s in st.session_state.streams if s.get("redis_key")]
if _active_streams:
for _s in _active_streams:
_rkey = _s["redis_key"]
_slabel = _s["label"]
_all_raws = store_lrange(_rkey, 0, -1)
_dl_rows = []
for _raw in _all_raws:
try:
_dl_rows.append(json.loads(_raw))
except Exception:
pass
if _dl_rows:
_dl_df = pd.DataFrame(_dl_rows)
_ts = datetime.now().strftime("%Y%m%d_%H%M%S")
_fname = f"livepulse_{_rkey}_{_ts}.csv"
_csv_bytes = _dl_df.to_csv(index=False).encode("utf-8")
st.download_button(
label=f"\u2b07 {_slabel} ({len(_dl_rows)} msgs)",
data=_csv_bytes,
file_name=_fname,
mime="text/csv",
key=f"dl_{_rkey}",
)
# PDF button removed — use the Export button on the Stats page instead
else:
st.markdown(f'<div style="font-size:0.72rem;color:var(--text-3);">{_slabel}: no data yet</div>', unsafe_allow_html=True)
else:
st.markdown('<div style="font-size:0.72rem;color:var(--text-3);">No active streams</div>', unsafe_allow_html=True)
st.divider()
# -- Export --
st.markdown('<p style="font-size:0.68rem;font-weight:700;color:var(--accent);text-transform:uppercase;letter-spacing:0.1em;margin-bottom:8px;">Export</p>', unsafe_allow_html=True)
st.markdown(
'<div style="font-size:0.7rem;color:var(--text-3);margin-bottom:6px;">'
'\u26a0\ufe0f Go to <b style="color:var(--accent-text);">Stats &amp; Info</b> tab first, then click.</div>',
unsafe_allow_html=True
)
import streamlit.components.v1 as _comp
_comp.html("""
<div style="padding:2px 0;">
<button id="sidebarScreenshotBtn" style="
width:100%; background:linear-gradient(135deg,#7c3aed,#4f46e5);
color:#fff; border:none; border-radius:10px; padding:8px 12px;
font-size:13px; font-weight:600; cursor:pointer;
box-shadow:0 4px 16px rgba(124,58,237,0.3); transition:transform 0.2s;"
onmouseover="this.style.transform='translateY(-2px)'"
onmouseout="this.style.transform='translateY(0)'"
onclick="sidebarCapture()">
&#128247; Download Stats as PDF
</button>
<div id="sidebarMsg" style="margin-top:6px;font-size:11px;color:#94a3b8;text-align:center;"></div>
</div>
<script>
async function sidebarCapture() {
const btn = document.getElementById('sidebarScreenshotBtn');
const msg = document.getElementById('sidebarMsg');
btn.disabled = true; btn.textContent = 'Capturing...';
msg.textContent = 'Please wait...';
try {
const target = window.parent.document.querySelector('[data-testid="stMain"]')
|| window.parent.document.querySelector('.main')
|| window.parent.document.body;
const canvas = await window.parent.html2canvas(target, {
scale:1.5, useCORS:true, allowTaint:true,
backgroundColor:'#07070f', logging:false,
windowWidth:target.scrollWidth, windowHeight:target.scrollHeight,
scrollX:0, scrollY:0,
});
const imgData = canvas.toDataURL('image/png', 0.95);
const { jsPDF } = window.parent.jspdf;
const pdf = new jsPDF({
orientation: canvas.width > canvas.height ? 'l' : 'p',
unit:'px', format:[canvas.width, canvas.height], compress:true,
});
pdf.addImage(imgData, 'PNG', 0, 0, canvas.width, canvas.height);
const ts = new Date().toISOString().slice(0,16).replace('T','_').replace(':','-');
pdf.save('livepulse_stats_' + ts + '.pdf');
btn.textContent = 'Download Stats as PDF'; btn.disabled = false;
msg.textContent = 'Done!';
setTimeout(() => { msg.textContent = ''; }, 3000);
} catch(e) {
btn.textContent = 'Download Stats as PDF'; btn.disabled = false;
msg.textContent = 'Error: ' + e.message;
}
}
function loadScript(src, name) {
return new Promise(r => {
if (window.parent[name]) { r(); return; }
const s = window.parent.document.createElement('script');
s.src = src; s.onload = r;
window.parent.document.head.appendChild(s);
});
}
(async () => {
await loadScript('https://cdnjs.cloudflare.com/ajax/libs/html2canvas/1.4.1/html2canvas.min.js','html2canvas');
await loadScript('https://cdnjs.cloudflare.com/ajax/libs/jspdf/2.5.1/jspdf.umd.min.js','jspdf');
})();
</script>
""", height=75)
st.divider()
# -- Danger Zone --
st.markdown('<p style="font-size:0.68rem;font-weight:700;color:#ef4444;text-transform:uppercase;letter-spacing:0.1em;margin-bottom:8px;">Danger Zone</p>', unsafe_allow_html=True)
if st.button("\U0001f5d1 Clear all data"):
for s in st.session_state.streams:
store_delete(s["redis_key"])
st.session_state.pinned_messages = []
st.session_state.alert_dismissed = False
st.success("All stream data cleared.")
st.divider()
st.markdown(
'<div style="font-size:0.72rem;color:var(--text-3);text-align:center;line-height:1.6;">'
'Theme follows Streamlit settings<br>'
'<span style="font-size:0.65rem;">\u2630 \u2192 Settings \u2192 Theme</span>'
'</div>', unsafe_allow_html=True
)
# -- PAGE HEADER -----------------------------------------------
_video_title = _META.get("video_title")
# Build subtitle showing ALL active stream titles
_all_titles = []
for _si, _ss in enumerate(st.session_state.streams):
_st = _ss.get("video_title") or _ss.get("video_id")
_sk = _ss.get("redis_key", "")
if _st and (store_llen(_sk) > 0 or is_scraper_running(_si)):
_all_titles.append(f"\u25b6 {_st}")
if _all_titles:
_subtitle = " \u00b7 ".join(_all_titles)
else:
_subtitle = "Real-time sentiment \u00b7 topic classification \u00b7 engagement insights"
# Build active stream pills for header
_active_stream_pills = ""
for _hi, _hs in enumerate(st.session_state.streams):
_hkey = _hs.get("redis_key", "")
if store_llen(_hkey) > 0 or is_scraper_running(_hi):
_hcolor = STREAM_COLORS[_hi]
_hlabel = STREAM_NAMES[_hi]
_htitle = (
_hs.get("video_title")
or _hs.get("video_id")
or _hkey
or f"Stream {_hlabel}"
)
_hrunning = is_scraper_running(_hi)
_hdot = f'<span style="display:inline-block;width:7px;height:7px;background:{"#22c55e" if _hrunning else "#ef4444"};border-radius:50%;margin-right:5px;vertical-align:middle;"></span>'
_active_stream_pills += (
f'<span style="display:inline-flex;align-items:center;background:{_hcolor}18;'
f'border:1px solid {_hcolor}44;border-radius:20px;padding:3px 12px;'
f'font-size:0.75rem;font-weight:700;color:{_hcolor};margin-right:8px;">'
f'{_hdot}Stream {_hlabel}{str(_htitle)[:22]}</span>'
)
col_title, col_live = st.columns([7, 1])
with col_title:
st.markdown(
'<div style="padding:8px 0 4px;">'
'<div style="font-size:2rem;font-weight:800;color:var(--text-1);letter-spacing:-0.04em;">YouTube Live Chat Analytics</div>'
f'<div style="font-size:1.25rem;color:var(--accent-text);font-weight:600;margin-top:6px;">{_subtitle}</div>'
+ (f'<div style="margin-top:10px;">{_active_stream_pills}</div>' if _active_stream_pills else '') +
'</div>', unsafe_allow_html=True
)
with col_live:
st.markdown(
'<div style="text-align:right;padding-top:22px;">'
'<span class="live-dot"></span>'
'<span style="font-size:0.78rem;color:var(--live);font-weight:700;letter-spacing:0.05em;">LIVE</span>'
'</div>', unsafe_allow_html=True
)
st.divider()
# -- PRIMARY STREAM SELECTOR -----------------------------------
_streams_with_data = [
s for s in st.session_state.streams
if store_llen(s.get("redis_key", "")) > 0 or is_scraper_running(st.session_state.streams.index(s))
]
if len(_streams_with_data) > 1:
_ps_options = {}
for _psi, _pss in enumerate(_streams_with_data):
_psi_real = st.session_state.streams.index(_pss)
_pst = _pss.get("video_title") or _pss.get("video_id") or _pss.get("redis_key")
_psl = f"Stream {STREAM_NAMES[_psi_real]}{str(_pst)[:35]}"
_ps_options[_psl] = _pss["redis_key"]
_ps_col, _ = st.columns([2, 3])
with _ps_col:
_selected_primary_label = st.selectbox(
"?? Dashboard data source",
list(_ps_options.keys()),
key="primary_stream_select",
help="Switch which stream's data powers the main dashboard stats and charts"
)
_primary_key = _ps_options[_selected_primary_label]
else:
_primary_key = st.session_state.streams[0]["redis_key"]
# -- DATA LOAD -----------------------------------------------
# Store computed values that pages need but can't read directly from widget keys
st.session_state["_primary_key"] = _primary_key
st.session_state["alert_threshold"] = alert_threshold # computed: slider_pct / 100
st.session_state["spam_threshold"] = spam_threshold # computed: slider_pct / 100
# -- MULTI-PAGE NAVIGATION ------------------------------------
comments_page = st.Page("pages/comments.py", title="\U0001f4ac Comments", icon="\U0001f4ac", default=True)
stats_page = st.Page("pages/stats.py", title="\U0001f4ca Stats & Info", icon="\U0001f4ca")
pg = st.navigation([comments_page, stats_page], position="sidebar")
pg.run()