from __future__ import annotations

import json
import logging
import os
import re
import sqlite3
import threading
from collections import defaultdict
from datetime import datetime

import pandas as pd
import streamlit as st

from ml.sentiment_model import predict_sentiment
from ml.topic_model import predict_topic, VALID_TOPICS
from ml.action_type_model import predict_action_type, VALID_ACTION_TYPES

DB_PATH = "/tmp/livepulse.db"
MAX_STORE_MESSAGES = 100_000

_DB_LOCK = threading.Lock()
_META: dict[str, str] = {}

_SCRAPER_THREADS: dict[str, threading.Thread] = {}
_SCRAPER_STOP: dict[str, threading.Event] = {}


def _get_db() -> sqlite3.Connection:
    # check_same_thread=False lets the scraper threads share this single
    # connection; all access is serialized through _DB_LOCK.
    conn = sqlite3.connect(DB_PATH, check_same_thread=False)
    conn.execute("""
        CREATE TABLE IF NOT EXISTS messages (
            id INTEGER PRIMARY KEY AUTOINCREMENT,
            key TEXT NOT NULL,
            value TEXT NOT NULL
        )
    """)
    conn.execute("CREATE INDEX IF NOT EXISTS idx_key ON messages(key)")
    conn.commit()
    return conn


_db_conn = _get_db()


def store_lrange(key: str, start: int, end: int) -> list[str]:
    """Return values for *key* with Redis LRANGE semantics (inclusive bounds, negative indices allowed)."""
    with _DB_LOCK:
        rows = _db_conn.execute(
            "SELECT value FROM messages WHERE key=? ORDER BY id ASC", (key,)
        ).fetchall()
        values = [r[0] for r in rows]
        n = len(values)
        if n == 0:
            return []
        if start < 0:
            start = max(n + start, 0)
        if end < 0:
            end = n + end
        end = min(end, n - 1)
        if start > end:
            return []
        return values[start: end + 1]
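
# Illustrative LRANGE-style indexing (key and values hypothetical):
#     store_lrange("chat:0", 0, -1)   -> the whole list, oldest first
#     store_lrange("chat:0", -5, -1)  -> the 5 newest values
#     store_lrange("chat:0", 3, 1)    -> []  (start past end, as in Redis)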


def store_llen(key: str) -> int:
    """Return the number of values stored under *key* (Redis LLEN)."""
    with _DB_LOCK:
        row = _db_conn.execute(
            "SELECT COUNT(*) FROM messages WHERE key=?", (key,)
        ).fetchone()
        return row[0] if row else 0


def store_delete(key: str) -> None:
    """Delete every value stored under *key* (Redis DEL)."""
    with _DB_LOCK:
        _db_conn.execute("DELETE FROM messages WHERE key=?", (key,))
        _db_conn.commit()


def store_rpush(key: str, value: str) -> None:
    """Append *value* under *key* (Redis RPUSH), then trim the key to the newest MAX_STORE_MESSAGES rows."""
    with _DB_LOCK:
        _db_conn.execute(
            "INSERT INTO messages (key, value) VALUES (?, ?)", (key, value)
        )
        _db_conn.execute("""
            DELETE FROM messages WHERE key=? AND id NOT IN (
                SELECT id FROM messages WHERE key=? ORDER BY id DESC LIMIT ?
            )
        """, (key, key, MAX_STORE_MESSAGES))
        _db_conn.commit()
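
# Minimal usage sketch for the SQLite-backed list shim above (key name
# hypothetical); note that every rpush also trims the key to the newest
# MAX_STORE_MESSAGES rows:
#     store_rpush("chat:0", json.dumps({"text": "hi"}))
#     store_llen("chat:0")    -> 1
#     store_delete("chat:0")  # removes the whole list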


VIDEO_ID = os.getenv("VIDEO_ID", "")

logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s [%(levelname)s] %(name)s: %(message)s",
    force=True,
)
logger = logging.getLogger("app.scraper")

MAX_STREAMS = 5
STREAM_COLORS = ["#7c3aed", "#10b981", "#f59e0b", "#3b82f6", "#ec4899"]
STREAM_NAMES = ["A", "B", "C", "D", "E"]

TOPIC_LABELS = ["Appreciation", "Question", "Request/Feedback", "Promo", "Spam", "General", "MCQ Answer"]
TOPIC_COLOR = {
    "Appreciation": "#f59e0b", "Question": "#3b82f6",
    "Request/Feedback": "#8b5cf6",
    "Promo": "#ec4899", "Spam": "#ef4444", "General": "#6b7280",
    "MCQ Answer": "#10b981",
}
SENT_COLORS = {"Positive": "#22c55e", "Neutral": "#eab308", "Negative": "#ef4444"}


def _safe_sentiment(text: str):
    try:
        return predict_sentiment(text)
    except Exception as exc:
        logger.error("predict_sentiment failed: %s", exc)
        return "Neutral", 0.50


def _safe_topic(text: str):
    try:
        topic, conf = predict_topic(text)
        if topic not in VALID_TOPICS:
            return "General", 0.50
        return topic, conf
    except Exception as exc:
        logger.error("predict_topic failed: %s", exc)
        return "General", 0.50


def _safe_action_type(text: str):
    try:
        action_type, conf = predict_action_type(text)
        if action_type not in VALID_ACTION_TYPES:
            return "N/A", 0.50
        return action_type, conf
    except Exception as exc:
        logger.error("predict_action_type failed: %s", exc)
        return "N/A", 0.50


def _get_live_chat_id(video_id: str, api_key: str) -> str | None:
    import urllib.request
    import urllib.parse
    import urllib.error

    url = (
        "https://www.googleapis.com/youtube/v3/videos"
        f"?part=liveStreamingDetails&id={urllib.parse.quote(video_id)}&key={api_key}"
    )
    try:
        with urllib.request.urlopen(url, timeout=10) as resp:
            data = json.loads(resp.read())
        items = data.get("items", [])
        if not items:
            logger.error("No video found for id=%s", video_id)
            return None
        live_details = items[0].get("liveStreamingDetails", {})
        chat_id = live_details.get("activeLiveChatId")
        if not chat_id:
            logger.error("No activeLiveChatId for video=%s", video_id)
        return chat_id
    except urllib.error.HTTPError as exc:
        body = exc.read().decode("utf-8", errors="replace")[:500]
        logger.error("HTTP %d from YouTube API for video=%s: %s", exc.code, video_id, body)
        return None
    except Exception as exc:
        logger.error("Failed to get liveChatId: %s", exc)
        return None


def _fetch_chat_messages(live_chat_id: str, api_key: str, page_token: str | None = None):
    import urllib.request
    import urllib.parse

    params = {
        "part": "snippet,authorDetails",
        "liveChatId": live_chat_id,
        "key": api_key,
        "maxResults": "200",
    }
    if page_token:
        params["pageToken"] = page_token

    url = "https://www.googleapis.com/youtube/v3/liveChat/messages?" + urllib.parse.urlencode(params)
    try:
        with urllib.request.urlopen(url, timeout=10) as resp:
            data = json.loads(resp.read())
        messages = data.get("items", [])
        next_token = data.get("nextPageToken")
        poll_interval = data.get("pollingIntervalMillis", 5000)
        return messages, next_token, poll_interval
    except Exception as exc:
        logger.error("Failed to fetch chat messages: %s", exc)
        return [], None, 5000
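
# The helper returns (items, next_page_token, polling_interval_ms); callers
# thread the token back in to page forward (sketch, identifiers illustrative):
#     msgs, token, poll_ms = _fetch_chat_messages(chat_id, api_key)
#     msgs, token, poll_ms = _fetch_chat_messages(chat_id, api_key, token)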


def _scraper_thread_fn(video_id: str, redis_key: str, stop_event: threading.Event, min_poll_s: float = 10.0, api_key: str = "") -> None:
    import emoji as _emoji  # hoisted out of the per-message loop

    if not api_key:
        api_key = os.getenv("YOUTUBE_API_KEY", "")
    if not api_key:
        msg = "No API key provided. Enter your YouTube Data API v3 key in the sidebar."
        logger.error(msg)
        _META["scraper_error"] = msg
        return

    _META.pop("scraper_error", None)
    live_chat_id = _get_live_chat_id(video_id, api_key)
    if not live_chat_id:
        msg = f"No active live chat found for video '{video_id}'. Make sure the stream is currently LIVE."
        logger.error(msg)
        _META["scraper_error"] = msg
        return

    page_token = None
    # Insertion-ordered dict used as a set, so the trim below keeps the most
    # recently seen message IDs rather than an arbitrary subset.
    seen_ids: dict[str, None] = {}
    is_first_page = True

    while not stop_event.is_set():
        messages, page_token, poll_ms = _fetch_chat_messages(live_chat_id, api_key, page_token)

        new_msgs = []
        for item in messages:
            if stop_event.is_set():
                break
            msg_id = item.get("id", "")
            if msg_id in seen_ids:
                continue
            seen_ids[msg_id] = None
            snippet = item.get("snippet", {})
            if snippet.get("type") != "textMessageEvent":
                continue
            text = snippet.get("displayMessage", "").strip()
            text = _emoji.emojize(text, language="alias")
            author = item.get("authorDetails", {}).get("displayName", "Unknown")
            if not text:
                continue
            new_msgs.append((msg_id, text, author))

        if is_first_page and new_msgs:
            # The first page is chat backlog: store it with neutral defaults
            # instead of running ML inference on every historical message.
            for _, text, author in new_msgs:
                message_data = {
                    "author": author, "text": text,
                    "sentiment": "Neutral", "confidence": 0.5,
                    "topic": "General", "topic_conf": 0.5,
                    "action_type": "N/A", "action_type_conf": 0.5,
                    "time": datetime.now().isoformat(),
                }
                store_rpush(redis_key, json.dumps(message_data))
            is_first_page = False
        else:
            for _, text, author in new_msgs:
                if stop_event.is_set():
                    break
                try:
                    sentiment, s_conf = _safe_sentiment(text)
                    topic, t_conf = _safe_topic(text)
                    # Action-type inference only matters for actionable topics.
                    if topic in ("Question", "Request/Feedback"):
                        action_type, at_conf = _safe_action_type(text)
                    else:
                        action_type, at_conf = "N/A", 0.50
                except Exception as exc:
                    logger.error("ML inference failed: %s", exc)
                    sentiment, s_conf = "Neutral", 0.5
                    topic, t_conf = "General", 0.5
                    action_type, at_conf = "N/A", 0.5

                message_data = {
                    "author": author, "text": text,
                    "sentiment": sentiment, "confidence": round(s_conf, 3),
                    "topic": topic, "topic_conf": round(t_conf, 3),
                    "action_type": action_type, "action_type_conf": round(at_conf, 3),
                    "time": datetime.now().isoformat(),
                }
                store_rpush(redis_key, json.dumps(message_data))

        if len(seen_ids) > 5000:
            # dicts preserve insertion order, so this keeps the 2000 newest IDs
            seen_ids = dict(list(seen_ids.items())[-2000:])

        # Respect the API's suggested polling interval, but never poll faster
        # than min_poll_s.
        wait_s = max(poll_ms / 1000, min_poll_s)
        stop_event.wait(timeout=wait_s)


def start_scraper(slot_idx: int, video_id: str, redis_key: str, min_poll_s: float = 10.0, api_key: str = "") -> None:
    key = str(slot_idx)
    stop_scraper(slot_idx)
    stop_event = threading.Event()
    t = threading.Thread(
        target=_scraper_thread_fn,
        args=(video_id, redis_key, stop_event, min_poll_s, api_key),
        daemon=True,
        name=f"scraper-{slot_idx}",
    )
    _SCRAPER_STOP[key] = stop_event
    _SCRAPER_THREADS[key] = t
    t.start()


def stop_scraper(slot_idx: int) -> None:
    key = str(slot_idx)
    ev = _SCRAPER_STOP.get(key)
    if ev:
        ev.set()


def is_scraper_running(slot_idx: int) -> bool:
    key = str(slot_idx)
    t = _SCRAPER_THREADS.get(key)
    return t is not None and t.is_alive()
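
# Slot lifecycle sketch (slot index, video ID, and key are illustrative):
#     start_scraper(0, "dQw4w9WgXcQ", "stream:0", api_key="...")
#     is_scraper_running(0)  -> True while the daemon thread is alive
#     stop_scraper(0)        # sets the stop event; the thread exits by the
#                            # end of its current poll/wait cycle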


def extract_video_id(url_or_id: str) -> str:
    url_or_id = url_or_id.strip()
    match = re.search(r"(?:v=|/live/|youtu\.be/)([A-Za-z0-9_-]{11})", url_or_id)
    if match:
        return match.group(1)
    if re.match(r"^[A-Za-z0-9_-]{11}$", url_or_id):
        return url_or_id
    return url_or_id
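
# All of these inputs resolve to the same 11-character ID (illustrative):
#     extract_video_id("https://www.youtube.com/watch?v=dQw4w9WgXcQ")
#     extract_video_id("https://youtu.be/dQw4w9WgXcQ")
#     extract_video_id("https://www.youtube.com/live/dQw4w9WgXcQ")
#     extract_video_id("dQw4w9WgXcQ")
# Unrecognized input is returned unchanged so the caller can surface the error.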


def fetch_video_title(video_id: str) -> str | None:
    """Try oEmbed first (works for non-live videos), then the YouTube Data API v3 (works for live)."""
    import urllib.request
    import urllib.parse
    try:
        params = urllib.parse.urlencode({
            "url": f"https://www.youtube.com/watch?v={video_id}",
            "format": "json",
        })
        with urllib.request.urlopen("https://www.youtube.com/oembed?" + params, timeout=5) as resp:
            title = json.loads(resp.read()).get("title")
        if title:
            return title
    except Exception:
        pass
    try:
        api_key = os.getenv("YOUTUBE_API_KEY", "")
        if api_key:
            url = (
                "https://www.googleapis.com/youtube/v3/videos"
                f"?part=snippet&id={urllib.parse.quote(video_id)}&key={api_key}"
            )
            with urllib.request.urlopen(url, timeout=5) as resp:
                data = json.loads(resp.read())
            items = data.get("items", [])
            if items:
                return items[0]["snippet"]["title"]
    except Exception:
        pass
    return None


def clean_topic(val) -> str:
    if pd.isna(val) or str(val).strip() == "" or str(val).strip().lower() == "nan":
        return "General"
    return str(val).strip()


def clean_sentiment(val) -> str:
    if str(val).strip() in ("Positive", "Negative", "Neutral"):
        return str(val).strip()
    return "Neutral"


def plotly_layout(height: int = 280) -> dict:
    return dict(
        paper_bgcolor="rgba(0,0,0,0)",
        plot_bgcolor="rgba(0,0,0,0)",
        height=height,
        margin=dict(l=10, r=10, t=10, b=10),
        font=dict(family="Space Grotesk"),
        xaxis=dict(showgrid=False, zeroline=False, showline=False,
                   tickfont=dict(size=11), title=None),
        yaxis=dict(showgrid=True, gridcolor="rgba(128,128,128,0.12)",
                   zeroline=False, showline=False, tickfont=dict(size=11), title=None),
        showlegend=False,
        hoverlabel=dict(font_family="Space Grotesk", font_size=12),
    )


def csv_download(df_export, label: str, filename: str) -> None:
    csv = df_export.to_csv(index=False).encode("utf-8")
    st.download_button(label=f"\u2b07 {label}", data=csv,
                       file_name=filename, mime="text/csv", key=filename)


def load_stream_data(redis_key: str, limit: int | None = None) -> list[dict]:
    if limit:
        raws = store_lrange(redis_key, -limit, -1)
    else:
        raws = store_lrange(redis_key, 0, -1)
    data = []
    for raw in raws:
        try:
            data.append(json.loads(raw))
        except Exception:
            pass
    return data
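
# Sketch: turning one stream's stored messages into a DataFrame for the UI
# (key name illustrative). limit=None loads the full history; limit=N loads
# only the N newest messages:
#     df = pd.DataFrame(load_stream_data("stream:0", limit=500))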


@st.cache_data(ttl=10, show_spinner=False)
def compute_velocity(df_all_json: str, window: int = 20) -> dict:
    import json as _json
    sentiments = [m.get("sentiment", "Neutral") for m in _json.loads(df_all_json)]
    n = len(sentiments)
    if n < window * 2:
        return {"direction": "\u2192", "delta": 0.0, "label": "Stable", "color": "#eab308"}
    recent = sentiments[-window:]
    prev = sentiments[-window*2:-window]
    r_pos = sum(1 for s in recent if s == "Positive") / window
    p_pos = sum(1 for s in prev if s == "Positive") / window
    delta = r_pos - p_pos
    if delta > 0.08:
        return {"direction": "\u2191", "delta": delta, "label": "Rising", "color": "#22c55e"}
    elif delta < -0.08:
        return {"direction": "\u2193", "delta": delta, "label": "Falling", "color": "#ef4444"}
    return {"direction": "\u2192", "delta": delta, "label": "Stable", "color": "#eab308"}
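
# Worked example (hypothetical numbers): with window=20, if 12 of the newest
# 20 messages are Positive (0.60) and 8 of the 20 before them were (0.40),
# delta = 0.20 > 0.08, so the label is "Rising". The +/-0.08 dead band keeps
# small fluctuations labeled "Stable".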


@st.cache_data(ttl=10, show_spinner=False)
def build_heatmap_data(df_all_json: str, bucket_minutes: int = 1) -> pd.DataFrame:
    import json as _json
    records = _json.loads(df_all_json)
    if not records:
        return pd.DataFrame()
    df_t = pd.DataFrame(records)
    if "time" not in df_t.columns:
        return pd.DataFrame()
    df_t["time"] = pd.to_datetime(df_t["time"], errors="coerce")
    df_t = df_t.dropna(subset=["time"])
    if df_t.empty:
        return pd.DataFrame()
    df_t["bucket"] = df_t["time"].dt.floor(f"{bucket_minutes}min")
    grouped = df_t.groupby(["bucket", "sentiment"]).size().unstack(fill_value=0)
    for col in ["Positive", "Neutral", "Negative"]:
        if col not in grouped.columns:
            grouped[col] = 0
    grouped = grouped.reset_index()
    grouped.columns.name = None
    return grouped[["bucket", "Positive", "Neutral", "Negative"]]
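
# Returned shape, one row per time bucket (values illustrative):
#     bucket               Positive  Neutral  Negative
#     2024-01-01 12:00:00         4        9         2
#     2024-01-01 12:01:00         7        5         1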


def check_alert(df_all: pd.DataFrame, threshold: float = 0.4, window: int = 15) -> dict | None:
    if len(df_all) < window:
        return None
    recent = df_all.iloc[-window:]
    neg_ratio = (recent["sentiment"] == "Negative").mean()
    if neg_ratio >= threshold:
        return {
            "neg_ratio": neg_ratio,
            "count": int((recent["sentiment"] == "Negative").sum()),
            "window": window,
        }
    return None


@st.cache_data(ttl=10, show_spinner=False)
def compute_engagement(all_data_json: str, window: int = 50) -> dict:
    import json as _j
    msgs = _j.loads(all_data_json)
    if not msgs:
        return {"score": 0, "rate": 0.0, "pos_ratio": 0.0, "q_density": 0.0, "grade": "\u2014"}
    recent = msgs[-window:]
    n = len(recent)
    rate = 0.0
    try:
        t0 = datetime.fromisoformat(recent[0]["time"])
        t1 = datetime.fromisoformat(recent[-1]["time"])
        elapsed = max((t1 - t0).total_seconds() / 60, 0.1)
        rate = round(n / elapsed, 1)
    except Exception:
        rate = float(n)
    pos_ratio = sum(1 for m in recent if m.get("sentiment") == "Positive") / max(n, 1)
    q_density = sum(1 for m in recent if m.get("topic") == "Question") / max(n, 1)
    rate_norm = min(rate / 60, 1.0)
    score = round((rate_norm * 0.4 + pos_ratio * 0.4 + q_density * 0.2) * 100)
    if score >= 70:
        grade = "\U0001f525 High"
    elif score >= 40:
        grade = "\u26a1 Medium"
    else:
        grade = "\U0001f4a4 Low"
    return {"score": score, "rate": rate, "pos_ratio": pos_ratio, "q_density": q_density, "grade": grade}
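
# Scoring sketch: score = (rate_norm*0.4 + pos_ratio*0.4 + q_density*0.2) * 100,
# with rate normalized against 60 msg/min. Hypothetical numbers: 30 msg/min
# (rate_norm 0.5), 50% positive, 20% questions ->
# (0.5*0.4 + 0.5*0.4 + 0.2*0.2) * 100 = 44, i.e. "Medium".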


@st.cache_data(ttl=10, show_spinner=False)
def compute_top_contributors(all_data_json: str, top_n: int = 10) -> list[dict]:
    import json as _j
    msgs = _j.loads(all_data_json)
    if not msgs:
        return []
    TOPICS = ["Appreciation", "Question", "Request/Feedback", "Promo", "Spam", "General", "MCQ Answer"]
    author_data: dict[str, dict] = {}
    for m in msgs:
        a = m.get("author", "Unknown")
        if a not in author_data:
            author_data[a] = {"count": 0, "Positive": 0, "Neutral": 0, "Negative": 0,
                              **{t: 0 for t in TOPICS}}
        author_data[a]["count"] += 1
        s = m.get("sentiment", "Neutral")
        if s in ("Positive", "Neutral", "Negative"):
            author_data[a][s] += 1
        t = m.get("topic", "General")
        if t not in TOPICS:
            t = "General"
        author_data[a][t] += 1
    sorted_authors = sorted(author_data.items(), key=lambda x: x[1]["count"], reverse=True)[:top_n]
    result = []
    for author, d in sorted_authors:
        total = max(d["count"], 1)
        result.append({
            "author": author, "count": d["count"],
            "pos_pct": round(d["Positive"] / total * 100),
            "neu_pct": round(d["Neutral"] / total * 100),
            "neg_pct": round(d["Negative"] / total * 100),
            "t_appr": round(d["Appreciation"] / total * 100),
            "t_ques": round(d["Question"] / total * 100),
            "t_rf": round(d["Request/Feedback"] / total * 100),
            "t_promo": round(d["Promo"] / total * 100),
            "t_spam": round(d["Spam"] / total * 100),
            "t_gen": round(d["General"] / total * 100),
            "t_mcq": round(d["MCQ Answer"] / total * 100),
        })
    return result


@st.cache_data(ttl=10, show_spinner=False)
def compute_word_freq(all_data_json: str, sentiment_filter: str = "All",
                      topic_filter: str = "All", top_n: int = 60) -> list[tuple[str, int]]:
    import json as _j
    from collections import Counter
    STOPWORDS = {
        "the","a","an","is","it","in","on","at","to","of","and","or","but","for",
        "with","this","that","are","was","be","as","by","from","have","has","had",
        "not","no","so","if","do","did","will","can","just","i","you","he","she",
        "we","they","my","your","his","her","our","their","me","him","us","them",
        "what","how","why","when","where","who","which","there","here","been",
        "would","could","should","may","might","shall","than","then","now","also",
        "more","very","too","up","out","about","into","over","after","before",
        "yaar","bhi","hai","hain","ho","kar","ke","ki","ka","ko","se","ne","ye",
        "vo","woh","aur","nahi","nhi","toh","koi","kuch","ab","ek","hi",
    }
    msgs = _j.loads(all_data_json)
    words: list[str] = []
    for m in msgs:
        if sentiment_filter != "All" and m.get("sentiment") != sentiment_filter:
            continue
        if topic_filter != "All" and m.get("topic") != topic_filter:
            continue
        text = re.sub(r"[^\w\s]", " ", m.get("text", "").lower())
        for w in text.split():
            if len(w) > 2 and w not in STOPWORDS and not w.isdigit():
                words.append(w)
    return Counter(words).most_common(top_n)
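
# Usage sketch (filter values illustrative): top words among negative
# questions only, as (word, count) pairs sorted by frequency:
#     compute_word_freq(all_data_json, sentiment_filter="Negative",
#                       topic_filter="Question", top_n=20)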


def check_spam_alert(df_all: pd.DataFrame, threshold: float = 0.3, window: int = 20) -> dict | None:
    if "topic" not in df_all.columns or len(df_all) < window:
        return None
    recent = df_all.iloc[-window:]
    spam_ratio = (recent["topic"] == "Spam").mean()
    if spam_ratio >= threshold:
        return {
            "spam_ratio": spam_ratio,
            "count": int((recent["topic"] == "Spam").sum()),
            "window": window,
        }
    return None


@st.cache_data(ttl=10, show_spinner=False)
def detect_repeat_spammers(all_data_json: str, window_sec: int = 15, min_repeats: int = 2) -> list[dict]:
    import json as _j
    msgs = _j.loads(all_data_json)
    if not msgs:
        return []

    def _normalize(t: str) -> str:
        return re.sub(r"[^\w]", "", t.lower().strip())

    bursts: dict[tuple, dict] = {}
    for m in msgs:
        author = m.get("author", "Unknown")
        text = m.get("text", "").strip()
        if not text:
            continue
        norm = _normalize(text)
        if len(norm) < 4:
            continue
        ts_str = m.get("time", "")
        try:
            ts = datetime.fromisoformat(ts_str)
        except Exception:
            continue
        key = (author, norm)
        if key not in bursts:
            bursts[key] = {
                "author": author, "text": text,
                "topic": m.get("topic", "General"),
                "sentiment": m.get("sentiment", "Neutral"),
                "timestamps": [],
            }
        bursts[key]["timestamps"].append(ts)

    results = []
    for key, burst in bursts.items():
        times = sorted(burst["timestamps"])
        max_in_window = 1
        for i in range(len(times)):
            count_in_window = sum(
                1 for t in times[i:]
                if (t - times[i]).total_seconds() <= window_sec
            )
            max_in_window = max(max_in_window, count_in_window)
        if max_in_window >= min_repeats:
            results.append({
                "author": burst["author"],
                "text": burst["text"],
                "topic": burst["topic"],
                "sentiment": burst["sentiment"],
                "count": len(times),
                "max_burst": max_in_window,
                "first_seen": times[0].strftime("%H:%M:%S"),
                "last_seen": times[-1].strftime("%H:%M:%S"),
            })
    return sorted(results, key=lambda x: x["max_burst"], reverse=True)
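
# Usage sketch (arguments illustrative): flag authors who posted the same
# normalized text at least 3 times within any 10-second burst:
#     detect_repeat_spammers(all_data_json, window_sec=10, min_repeats=3)
# Each hit carries the total count, the largest burst size, and
# first/last-seen timestamps.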