# shared.py # Pure infrastructure, helpers, and analytics functions. # No Streamlit UI rendering — safe to import from any page without # triggering widget re-execution. from __future__ import annotations import json import logging import os import re import sqlite3 import threading from collections import defaultdict from datetime import datetime import pandas as pd import streamlit as st # ── ML imports ──────────────────────────────────────────────────────────────── from ml.sentiment_model import predict_sentiment from ml.topic_model import predict_topic, VALID_TOPICS from ml.action_type_model import predict_action_type, VALID_ACTION_TYPES # ── SQLite store ────────────────────────────────────────────────────────────── DB_PATH = "/tmp/livepulse.db" MAX_STORE_MESSAGES = 100_000 _DB_LOCK = threading.Lock() _META: dict[str, str] = {} _SCRAPER_THREADS: dict[str, threading.Thread] = {} _SCRAPER_STOP: dict[str, threading.Event] = {} def _get_db() -> sqlite3.Connection: conn = sqlite3.connect(DB_PATH, check_same_thread=False) conn.execute(""" CREATE TABLE IF NOT EXISTS messages ( id INTEGER PRIMARY KEY AUTOINCREMENT, key TEXT NOT NULL, value TEXT NOT NULL ) """) conn.execute("CREATE INDEX IF NOT EXISTS idx_key ON messages(key)") conn.commit() return conn _db_conn = _get_db() def store_lrange(key: str, start: int, end: int) -> list[str]: with _DB_LOCK: rows = _db_conn.execute( "SELECT value FROM messages WHERE key=? ORDER BY id ASC", (key,) ).fetchall() values = [r[0] for r in rows] n = len(values) if n == 0: return [] if start < 0: start = max(n + start, 0) if end < 0: end = n + end end = min(end, n - 1) if start > end: return [] return values[start: end + 1] def store_llen(key: str) -> int: with _DB_LOCK: row = _db_conn.execute( "SELECT COUNT(*) FROM messages WHERE key=?", (key,) ).fetchone() return row[0] if row else 0 def store_delete(key: str) -> None: with _DB_LOCK: _db_conn.execute("DELETE FROM messages WHERE key=?", (key,)) _db_conn.commit() def store_rpush(key: str, value: str) -> None: with _DB_LOCK: _db_conn.execute( "INSERT INTO messages (key, value) VALUES (?, ?)", (key, value) ) _db_conn.execute(""" DELETE FROM messages WHERE key=? AND id NOT IN ( SELECT id FROM messages WHERE key=? ORDER BY id DESC LIMIT ? ) """, (key, key, MAX_STORE_MESSAGES)) _db_conn.commit() # ── Config ──────────────────────────────────────────────────────────────────── VIDEO_ID = os.getenv("VIDEO_ID", "") # ── Logging ─────────────────────────────────────────────────────────────────── logging.basicConfig( level=logging.INFO, format="%(asctime)s [%(levelname)s] %(name)s: %(message)s", force=True, ) logger = logging.getLogger("app.scraper") # ── Constants ───────────────────────────────────────────────────────────────── MAX_STREAMS = 5 STREAM_COLORS = ["#7c3aed", "#10b981", "#f59e0b", "#3b82f6", "#ec4899"] STREAM_NAMES = ["A", "B", "C", "D", "E"] TOPIC_LABELS = ["Appreciation", "Question", "Request/Feedback", "Promo", "Spam", "General", "MCQ Answer"] TOPIC_COLOR = { "Appreciation": "#f59e0b", "Question": "#3b82f6", "Request/Feedback": "#8b5cf6", "Promo": "#ec4899", "Spam": "#ef4444", "General": "#6b7280", "MCQ Answer": "#10b981", } SENT_COLORS = {"Positive": "#22c55e", "Neutral": "#eab308", "Negative": "#ef4444"} # ── Scraper helpers ─────────────────────────────────────────────────────────── def _safe_sentiment(text: str): try: return predict_sentiment(text) except Exception as exc: logger.error("predict_sentiment failed: %s", exc) return "Neutral", 0.50 def _safe_topic(text: str): try: topic, conf = predict_topic(text) if topic not in VALID_TOPICS: return "General", 0.50 return topic, conf except Exception as exc: logger.error("predict_topic failed: %s", exc) return "General", 0.50 def _safe_action_type(text: str): try: action_type, conf = predict_action_type(text) if action_type not in VALID_ACTION_TYPES: return "N/A", 0.50 return action_type, conf except Exception as exc: logger.error("predict_action_type failed: %s", exc) return "N/A", 0.50 def _get_live_chat_id(video_id: str, api_key: str) -> str | None: import urllib.request import urllib.parse import urllib.error url = ( "https://www.googleapis.com/youtube/v3/videos" f"?part=liveStreamingDetails&id={urllib.parse.quote(video_id)}&key={api_key}" ) try: with urllib.request.urlopen(url, timeout=10) as resp: data = json.loads(resp.read()) items = data.get("items", []) if not items: logger.error("No video found for id=%s", video_id) return None live_details = items[0].get("liveStreamingDetails", {}) chat_id = live_details.get("activeLiveChatId") if not chat_id: logger.error("No activeLiveChatId for video=%s", video_id) return chat_id except urllib.error.HTTPError as exc: body = exc.read().decode("utf-8", errors="replace")[:500] logger.error("HTTP %d from YouTube API for video=%s: %s", exc.code, video_id, body) return None except Exception as exc: logger.error("Failed to get liveChatId: %s", exc) return None def _fetch_chat_messages(live_chat_id: str, api_key: str, page_token: str | None = None): import urllib.request import urllib.parse params = { "part": "snippet,authorDetails", "liveChatId": live_chat_id, "key": api_key, "maxResults": "200", } if page_token: params["pageToken"] = page_token url = "https://www.googleapis.com/youtube/v3/liveChat/messages?" + urllib.parse.urlencode(params) try: with urllib.request.urlopen(url, timeout=10) as resp: data = json.loads(resp.read()) messages = data.get("items", []) next_token = data.get("nextPageToken") poll_interval = data.get("pollingIntervalMillis", 5000) return messages, next_token, poll_interval except Exception as exc: logger.error("Failed to fetch chat messages: %s", exc) return [], None, 5000 def _scraper_thread_fn(video_id: str, redis_key: str, stop_event: threading.Event, min_poll_s: float = 10.0, api_key: str = "") -> None: if not api_key: api_key = os.getenv("YOUTUBE_API_KEY", "") if not api_key: msg = "No API key provided. Enter your YouTube Data API v3 key in the sidebar." logger.error(msg) _META["scraper_error"] = msg return _META.pop("scraper_error", None) live_chat_id = _get_live_chat_id(video_id, api_key) if not live_chat_id: msg = f"No active live chat found for video '{video_id}'. Make sure the stream is currently LIVE." logger.error(msg) _META["scraper_error"] = msg return page_token = None seen_ids: set = set() is_first_page = True while not stop_event.is_set(): messages, page_token, poll_ms = _fetch_chat_messages(live_chat_id, api_key, page_token) new_msgs = [] for item in messages: if stop_event.is_set(): break msg_id = item.get("id", "") if msg_id in seen_ids: continue seen_ids.add(msg_id) snippet = item.get("snippet", {}) if snippet.get("type") != "textMessageEvent": continue text = snippet.get("displayMessage", "").strip() import emoji as _emoji text = _emoji.emojize(text, language="alias") author = item.get("authorDetails", {}).get("displayName", "Unknown") if not text: continue new_msgs.append((msg_id, text, author)) if is_first_page and new_msgs: for _, text, author in new_msgs: message_data = { "author": author, "text": text, "sentiment": "Neutral", "confidence": 0.5, "topic": "General", "topic_conf": 0.5, "action_type": "N/A", "action_type_conf": 0.5, "time": datetime.now().isoformat(), } store_rpush(redis_key, json.dumps(message_data)) is_first_page = False else: for _, text, author in new_msgs: if stop_event.is_set(): break try: sentiment, s_conf = _safe_sentiment(text) topic, t_conf = _safe_topic(text) # Only classify action type for Question/Request topics if topic in ("Question", "Request/Feedback"): action_type, at_conf = _safe_action_type(text) else: action_type, at_conf = "N/A", 0.50 except Exception as exc: logger.error("ML inference failed: %s", exc) sentiment, s_conf = "Neutral", 0.5 topic, t_conf = "General", 0.5 action_type, at_conf = "N/A", 0.5 message_data = { "author": author, "text": text, "sentiment": sentiment, "confidence": round(s_conf, 3), "topic": topic, "topic_conf": round(t_conf, 3), "action_type": action_type, "action_type_conf": round(at_conf, 3), "time": datetime.now().isoformat(), } store_rpush(redis_key, json.dumps(message_data)) if len(seen_ids) > 5000: seen_ids = set(list(seen_ids)[-2000:]) # Respect YouTube's requested polling interval, but never faster than min_poll_s wait_s = max(poll_ms / 1000, min_poll_s) stop_event.wait(timeout=wait_s) def start_scraper(slot_idx: int, video_id: str, redis_key: str, min_poll_s: float = 10.0, api_key: str = "") -> None: key = str(slot_idx) stop_scraper(slot_idx) stop_event = threading.Event() t = threading.Thread( target=_scraper_thread_fn, args=(video_id, redis_key, stop_event, min_poll_s, api_key), daemon=True, name=f"scraper-{slot_idx}", ) _SCRAPER_STOP[key] = stop_event _SCRAPER_THREADS[key] = t t.start() def stop_scraper(slot_idx: int) -> None: key = str(slot_idx) ev = _SCRAPER_STOP.get(key) if ev: ev.set() def is_scraper_running(slot_idx: int) -> bool: key = str(slot_idx) t = _SCRAPER_THREADS.get(key) return t is not None and t.is_alive() # ── UI helpers ──────────────────────────────────────────────────────────────── def extract_video_id(url_or_id: str) -> str: url_or_id = url_or_id.strip() match = re.search(r"(?:v=|/live/|youtu\.be/)([A-Za-z0-9_-]{11})", url_or_id) if match: return match.group(1) if re.match(r"^[A-Za-z0-9_-]{11}$", url_or_id): return url_or_id return url_or_id def fetch_video_title(video_id: str) -> str | None: """Try oembed first (works for non-live), then YouTube Data API v3 (works for live).""" import urllib.request import urllib.parse try: url = f"https://www.youtube.com/oembed?url=https://www.youtube.com/watch?v={video_id}&format=json" with urllib.request.urlopen(url, timeout=5) as resp: title = json.loads(resp.read()).get("title") if title: return title except Exception: pass try: api_key = os.getenv("YOUTUBE_API_KEY", "") if api_key: url = ( "https://www.googleapis.com/youtube/v3/videos" f"?part=snippet&id={urllib.parse.quote(video_id)}&key={api_key}" ) with urllib.request.urlopen(url, timeout=5) as resp: data = json.loads(resp.read()) items = data.get("items", []) if items: return items[0]["snippet"]["title"] except Exception: pass return None def clean_topic(val) -> str: if pd.isna(val) or str(val).strip() == "" or str(val).strip().lower() == "nan": return "General" return str(val).strip() def clean_sentiment(val) -> str: if str(val).strip() in ("Positive", "Negative", "Neutral"): return str(val).strip() return "Neutral" def plotly_layout(height: int = 280) -> dict: return dict( paper_bgcolor="rgba(0,0,0,0)", plot_bgcolor="rgba(0,0,0,0)", height=height, margin=dict(l=10, r=10, t=10, b=10), font=dict(family="Space Grotesk"), xaxis=dict(showgrid=False, zeroline=False, showline=False, tickfont=dict(size=11), title=None), yaxis=dict(showgrid=True, gridcolor="rgba(128,128,128,0.12)", zeroline=False, showline=False, tickfont=dict(size=11), title=None), showlegend=False, hoverlabel=dict(font_family="Space Grotesk", font_size=12), ) def csv_download(df_export, label: str, filename: str) -> None: csv = df_export.to_csv(index=False).encode("utf-8") st.download_button(label=f"\u2b07 {label}", data=csv, file_name=filename, mime="text/csv", key=filename) def load_stream_data(redis_key: str, limit: int | None = None) -> list[dict]: if limit: raws = store_lrange(redis_key, -limit, -1) else: raws = store_lrange(redis_key, 0, -1) data = [] for raw in raws: try: data.append(json.loads(raw)) except Exception: pass return data # ── Analytics (cached) ──────────────────────────────────────────────────────── @st.cache_data(ttl=10, show_spinner=False) def compute_velocity(df_all_json: str, window: int = 20) -> dict: import json as _json sentiments = [m.get("sentiment", "Neutral") for m in _json.loads(df_all_json)] n = len(sentiments) if n < window * 2: return {"direction": "\u2192", "delta": 0.0, "label": "Stable", "color": "#eab308"} recent = sentiments[-window:] prev = sentiments[-window*2:-window] r_pos = sum(1 for s in recent if s == "Positive") / window p_pos = sum(1 for s in prev if s == "Positive") / window delta = r_pos - p_pos if delta > 0.08: return {"direction": "\u2191", "delta": delta, "label": "Rising", "color": "#22c55e"} elif delta < -0.08: return {"direction": "\u2193", "delta": delta, "label": "Falling", "color": "#ef4444"} return {"direction": "\u2192", "delta": delta, "label": "Stable", "color": "#eab308"} @st.cache_data(ttl=10, show_spinner=False) def build_heatmap_data(df_all_json: str, bucket_minutes: int = 1) -> pd.DataFrame: import json as _json records = _json.loads(df_all_json) if not records: return pd.DataFrame() df_t = pd.DataFrame(records) if "time" not in df_t.columns: return pd.DataFrame() df_t["time"] = pd.to_datetime(df_t["time"], errors="coerce") df_t = df_t.dropna(subset=["time"]) if df_t.empty: return pd.DataFrame() df_t["bucket"] = df_t["time"].dt.floor(f"{bucket_minutes}min") grouped = df_t.groupby(["bucket", "sentiment"]).size().unstack(fill_value=0) for col in ["Positive", "Neutral", "Negative"]: if col not in grouped.columns: grouped[col] = 0 grouped = grouped.reset_index() grouped.columns.name = None return grouped[["bucket", "Positive", "Neutral", "Negative"]] def check_alert(df_all: pd.DataFrame, threshold: float = 0.4, window: int = 15) -> dict | None: if len(df_all) < window: return None recent = df_all.iloc[-window:] neg_ratio = (recent["sentiment"] == "Negative").mean() if neg_ratio >= threshold: return { "neg_ratio": neg_ratio, "count": int((recent["sentiment"] == "Negative").sum()), "window": window, } return None @st.cache_data(ttl=10, show_spinner=False) def compute_engagement(all_data_json: str, window: int = 50) -> dict: import json as _j msgs = _j.loads(all_data_json) if not msgs: return {"score": 0, "rate": 0.0, "pos_ratio": 0.0, "q_density": 0.0, "grade": "\u2014"} recent = msgs[-window:] n = len(recent) rate = 0.0 try: t0 = datetime.fromisoformat(recent[0]["time"]) t1 = datetime.fromisoformat(recent[-1]["time"]) elapsed = max((t1 - t0).total_seconds() / 60, 0.1) rate = round(n / elapsed, 1) except Exception: rate = float(n) pos_ratio = sum(1 for m in recent if m.get("sentiment") == "Positive") / max(n, 1) q_density = sum(1 for m in recent if m.get("topic") == "Question") / max(n, 1) rate_norm = min(rate / 60, 1.0) score = round((rate_norm * 0.4 + pos_ratio * 0.4 + q_density * 0.2) * 100) if score >= 70: grade = "\U0001f525 High" elif score >= 40: grade = "\u26a1 Medium" else: grade = "\U0001f4a4 Low" return {"score": score, "rate": rate, "pos_ratio": pos_ratio, "q_density": q_density, "grade": grade} @st.cache_data(ttl=10, show_spinner=False) def compute_top_contributors(all_data_json: str, top_n: int = 10) -> list[dict]: import json as _j msgs = _j.loads(all_data_json) if not msgs: return [] TOPICS = ["Appreciation", "Question", "Request/Feedback", "Promo", "Spam", "General", "MCQ Answer"] author_data: dict[str, dict] = {} for m in msgs: a = m.get("author", "Unknown") if a not in author_data: author_data[a] = {"count": 0, "Positive": 0, "Neutral": 0, "Negative": 0, **{t: 0 for t in TOPICS}} author_data[a]["count"] += 1 s = m.get("sentiment", "Neutral") if s in ("Positive", "Neutral", "Negative"): author_data[a][s] += 1 t = m.get("topic", "General") if t not in TOPICS: t = "General" author_data[a][t] += 1 sorted_authors = sorted(author_data.items(), key=lambda x: x[1]["count"], reverse=True)[:top_n] result = [] for author, d in sorted_authors: total = max(d["count"], 1) result.append({ "author": author, "count": d["count"], "pos_pct": round(d["Positive"] / total * 100), "neu_pct": round(d["Neutral"] / total * 100), "neg_pct": round(d["Negative"] / total * 100), "t_appr": round(d["Appreciation"] / total * 100), "t_ques": round(d["Question"] / total * 100), "t_rf": round(d["Request/Feedback"] / total * 100), "t_promo": round(d["Promo"] / total * 100), "t_spam": round(d["Spam"] / total * 100), "t_gen": round(d["General"] / total * 100), "t_mcq": round(d["MCQ Answer"] / total * 100), }) return result @st.cache_data(ttl=10, show_spinner=False) def compute_word_freq(all_data_json: str, sentiment_filter: str = "All", topic_filter: str = "All", top_n: int = 60) -> list[tuple[str, int]]: import json as _j from collections import Counter STOPWORDS = { "the","a","an","is","it","in","on","at","to","of","and","or","but","for", "with","this","that","are","was","be","as","by","from","have","has","had", "not","no","so","if","do","did","will","can","just","i","you","he","she", "we","they","my","your","his","her","our","their","me","him","us","them", "what","how","why","when","where","who","which","there","here","been", "would","could","should","may","might","shall","than","then","now","also", "more","very","too","up","out","about","into","over","after","before", "yaar","bhi","hai","hain","ho","kar","ke","ki","ka","ko","se","ne","ye", "vo","woh","aur","nahi","nhi","toh","koi","kuch","ab","ek","hi", } msgs = _j.loads(all_data_json) words: list[str] = [] for m in msgs: if sentiment_filter != "All" and m.get("sentiment") != sentiment_filter: continue if topic_filter != "All" and m.get("topic") != topic_filter: continue text = re.sub(r"[^\w\s]", " ", m.get("text", "").lower()) for w in text.split(): if len(w) > 2 and w not in STOPWORDS and not w.isdigit(): words.append(w) return Counter(words).most_common(top_n) def check_spam_alert(df_all: pd.DataFrame, threshold: float = 0.3, window: int = 20) -> dict | None: if "topic" not in df_all.columns or len(df_all) < window: return None recent = df_all.iloc[-window:] spam_ratio = (recent["topic"] == "Spam").mean() if spam_ratio >= threshold: return { "spam_ratio": spam_ratio, "count": int((recent["topic"] == "Spam").sum()), "window": window, } return None @st.cache_data(ttl=10, show_spinner=False) def detect_repeat_spammers(all_data_json: str, window_sec: int = 15, min_repeats: int = 2) -> list[dict]: import json as _j msgs = _j.loads(all_data_json) if not msgs: return [] def _normalize(t: str) -> str: return re.sub(r"[^\w]", "", t.lower().strip()) bursts: dict[tuple, dict] = {} for m in msgs: author = m.get("author", "Unknown") text = m.get("text", "").strip() if not text: continue norm = _normalize(text) if len(norm) < 4: continue ts_str = m.get("time", "") try: ts = datetime.fromisoformat(ts_str) except Exception: continue key = (author, norm) if key not in bursts: bursts[key] = { "author": author, "text": text, "topic": m.get("topic", "General"), "sentiment": m.get("sentiment", "Neutral"), "timestamps": [], } bursts[key]["timestamps"].append(ts) results = [] for key, burst in bursts.items(): times = sorted(burst["timestamps"]) max_in_window = 1 for i in range(len(times)): count_in_window = sum( 1 for t in times[i:] if (t - times[i]).total_seconds() <= window_sec ) max_in_window = max(max_in_window, count_in_window) if max_in_window >= min_repeats: results.append({ "author": burst["author"], "text": burst["text"], "topic": burst["topic"], "sentiment": burst["sentiment"], "count": len(times), "max_burst": max_in_window, "first_seen": times[0].strftime("%H:%M:%S"), "last_seen": times[-1].strftime("%H:%M:%S"), }) return sorted(results, key=lambda x: x["max_burst"], reverse=True)