| """ |
| backend/scraper.py |
| ================== |
| Fetches live YouTube chat comments, runs sentiment, topic, and action-type |
| classification, and pushes results to Redis. |
| |
| Accepts optional CLI arguments so multiple instances can run in parallel: |
| python -m backend.scraper --video_id VIDEO_ID --redis_key chat_messages_a |
| |
| Defaults fall back to config.py values. |
| """ |
|
|
| import argparse |
| import json |
| import logging |
| import time |
| from datetime import datetime |
|
|
| import pytchat |
| import redis |
|
|
| from backend.config import ( |
| VIDEO_ID as DEFAULT_VIDEO_ID, |
| REDIS_HOST, |
| REDIS_PORT, |
| REDIS_DB, |
| ) |
| from ml.sentiment_model import predict_sentiment |
| from ml.topic_model import predict_topic, VALID_TOPICS |
| from ml.action_type_model import predict_action_type, VALID_ACTION_TYPES |
|
|
# Root logging configuration: INFO and above, time-of-day timestamps only.
logging.basicConfig(
    format="%(asctime)s [%(levelname)s] %(name)s: %(message)s",
    datefmt="%H:%M:%S",
    level=logging.INFO,
)
logger = logging.getLogger("scraper")

# Upper bound on the number of entries kept in the Redis list.
MAX_REDIS_MESSAGES = 40_000
|
|
|
|
def _safe_sentiment(text: str) -> tuple[str, float]:
    """Classify the sentiment of *text*, falling back to a neutral default.

    Delegates to ``predict_sentiment``. The result is unpacked inside the
    ``try`` so that a malformed return value (anything that is not a 2-item
    sequence) is handled the same way as an exception raised by the model,
    matching the other ``_safe_*`` wrappers in this module.

    Args:
        text: Chat message to classify.

    Returns:
        The ``(label, confidence)`` pair produced by the model, or
        ``("Neutral", 0.50)`` if prediction failed.
    """
    try:
        label, conf = predict_sentiment(text)
        return label, conf
    except Exception as exc:
        # Only the first 60 characters are logged to keep log lines short.
        logger.error("predict_sentiment failed for %r: %s", text[:60], exc)
        return "Neutral", 0.50
|
|
|
|
def _safe_topic(text: str) -> tuple[str, float]:
    """Classify the topic of *text*, falling back to ``("General", 0.50)``.

    The fallback is returned both when ``predict_topic`` raises and when it
    yields a label that is not in ``VALID_TOPICS``.
    """
    fallback = ("General", 0.50)
    try:
        label, score = predict_topic(text)
        # Labels outside the known topic set are replaced by the fallback.
        return (label, score) if label in VALID_TOPICS else fallback
    except Exception as err:
        logger.error("predict_topic failed for %r: %s", text[:60], err)
        return fallback
|
|
|
|
def _safe_action_type(text: str) -> tuple[str, float]:
    """Classify the action type of *text*, falling back to ``("N/A", 0.50)``.

    The fallback is returned both when ``predict_action_type`` raises and when
    it yields a label that is not in ``VALID_ACTION_TYPES``.
    """
    fallback = ("N/A", 0.50)
    try:
        label, score = predict_action_type(text)
        # Labels outside the known action-type set are replaced by the fallback.
        return (label, score) if label in VALID_ACTION_TYPES else fallback
    except Exception as err:
        logger.error("predict_action_type failed for %r: %s", text[:60], err)
        return fallback
|
|
|
|
def run(video_id: str, redis_key: str) -> None:
    """Stream a live chat, classify each message, and append it to Redis.

    Each non-empty chat message is run through the sentiment and topic
    classifiers (and the action-type classifier for "Question" and
    "Request/Feedback" topics), serialized as JSON, and pushed onto the Redis
    list ``redis_key``. The list is trimmed to the newest
    ``MAX_REDIS_MESSAGES`` entries after every push.

    Args:
        video_id: YouTube video ID whose live chat is read.
        redis_key: Redis list key the JSON records are appended to.

    Raises:
        SystemExit: If the initial Redis ping fails.
    """
    # Imported once here instead of once per message inside the loop; kept
    # function-local so the module's top-level imports stay unchanged.
    import emoji as _emoji

    r = redis.Redis(
        host=REDIS_HOST,
        port=REDIS_PORT,
        db=REDIS_DB,
        decode_responses=True,
        socket_connect_timeout=5,
    )
    try:
        r.ping()
    except (redis.ConnectionError, redis.TimeoutError) as e:
        # A connect timeout surfaces as redis.TimeoutError, which is not a
        # ConnectionError subclass, so both are handled here.
        logger.error("Cannot connect to Redis: %s", e)
        raise SystemExit(1) from e
    logger.info("Redis connected")

    # Log messages use plain ASCII so they render under any console encoding.
    logger.info("Starting scraper - video=%s redis_key=%s", video_id, redis_key)

    chat = pytchat.create(video_id=video_id)
    try:
        if not chat.is_alive():
            logger.error(
                "Could not connect to live chat for %s. Is the stream live?",
                video_id,
            )
            return

        logger.info("Live chat connected - press Ctrl+C to stop")

        while chat.is_alive():
            try:
                for c in chat.get().sync_items():
                    raw_text = c.message.strip()
                    # Expand ":alias:" style shortcodes into emoji characters.
                    text = _emoji.emojize(raw_text, language="alias")
                    author = c.author.name
                    if not text:
                        continue

                    sentiment, s_conf = _safe_sentiment(text)
                    topic, t_conf = _safe_topic(text)

                    # The action-type model is only consulted for these two
                    # topics; every other message is labelled "N/A".
                    if topic in ("Question", "Request/Feedback"):
                        action_type, at_conf = _safe_action_type(text)
                    else:
                        action_type, at_conf = "N/A", 0.50

                    message_data = {
                        "author": author,
                        "text": text,
                        "sentiment": sentiment,
                        "confidence": round(s_conf, 3),
                        "topic": topic,
                        "topic_conf": round(t_conf, 3),
                        "action_type": action_type,
                        "action_type_conf": round(at_conf, 3),
                        "time": datetime.now().isoformat(),
                    }

                    # Push and trim in one pipeline so the list never holds
                    # more than the newest MAX_REDIS_MESSAGES entries.
                    pipe = r.pipeline()
                    pipe.rpush(redis_key, json.dumps(message_data))
                    pipe.ltrim(redis_key, -MAX_REDIS_MESSAGES, -1)
                    pipe.execute()

                    logger.info(
                        "[%s] %s | %s(%.2f) %s(%.2f) %s(%.2f) | %r",
                        message_data["time"][11:19],  # HH:MM:SS of the ISO stamp
                        author[:20],
                        sentiment, s_conf,
                        topic, t_conf,
                        action_type, at_conf,
                        text[:60],
                    )
            except Exception as exc:
                # Best-effort: log the failure and keep polling the chat.
                logger.error("Unexpected error: %s", exc, exc_info=True)

            time.sleep(1)
    except KeyboardInterrupt:
        # Handled around the whole loop so that Ctrl+C during the sleep
        # between polls is caught as well.
        logger.info("Stopped by user.")
    finally:
        # Stop pytchat's background fetching regardless of how we exit.
        chat.terminate()

    logger.info("Chat stream ended - key=%s", redis_key)
|
|
|
|
if __name__ == "__main__":
    # Command-line entry point: both options are optional and fall back to
    # the configured video ID and the default Redis list key.
    cli = argparse.ArgumentParser()
    for flag, fallback, description in (
        ("--video_id", DEFAULT_VIDEO_ID, "YouTube video ID"),
        ("--redis_key", "chat_messages", "Redis list key to write to"),
    ):
        cli.add_argument(flag, default=fallback, help=description)
    options = cli.parse_args()
    run(video_id=options.video_id, redis_key=options.redis_key)
|
|