""" backend/scraper.py ================== Fetches live YouTube chat comments, runs sentiment + topic classification, and pushes results to Redis. Accepts optional CLI arguments so multiple instances can run in parallel: python -m backend.scraper --video_id VIDEO_ID --redis_key chat_messages_a Defaults fall back to config.py values. """ import argparse import json import logging import time from datetime import datetime import pytchat import redis from backend.config import ( VIDEO_ID as DEFAULT_VIDEO_ID, REDIS_HOST, REDIS_PORT, REDIS_DB, ) from ml.sentiment_model import predict_sentiment from ml.topic_model import predict_topic, VALID_TOPICS from ml.action_type_model import predict_action_type, VALID_ACTION_TYPES logging.basicConfig( level=logging.INFO, format="%(asctime)s [%(levelname)s] %(name)s: %(message)s", datefmt="%H:%M:%S", ) logger = logging.getLogger("scraper") MAX_REDIS_MESSAGES = 40000 def _safe_sentiment(text: str) -> tuple[str, float]: try: return predict_sentiment(text) except Exception as exc: logger.error("predict_sentiment failed for %r: %s", text[:60], exc) return "Neutral", 0.50 def _safe_topic(text: str) -> tuple[str, float]: try: topic, conf = predict_topic(text) if topic not in VALID_TOPICS: return "General", 0.50 return topic, conf except Exception as exc: logger.error("predict_topic failed for %r: %s", text[:60], exc) return "General", 0.50 def _safe_action_type(text: str) -> tuple[str, float]: try: action_type, conf = predict_action_type(text) if action_type not in VALID_ACTION_TYPES: return "N/A", 0.50 return action_type, conf except Exception as exc: logger.error("predict_action_type failed for %r: %s", text[:60], exc) return "N/A", 0.50 def run(video_id: str, redis_key: str) -> None: r = redis.Redis( host=REDIS_HOST, port=REDIS_PORT, db=REDIS_DB, decode_responses=True, socket_connect_timeout=5, ) try: r.ping() logger.info("Redis connected ✓") except redis.ConnectionError as e: logger.error("Cannot connect to Redis: %s", e) raise SystemExit(1) logger.info("Starting scraper — video=%s redis_key=%s", video_id, redis_key) chat = pytchat.create(video_id=video_id) if not chat.is_alive(): logger.error("Could not connect to live chat for %s. Is the stream live?", video_id) return logger.info("Live chat connected ✓ — press Ctrl+C to stop") while chat.is_alive(): try: for c in chat.get().sync_items(): # pytchat converts emoji to :name: codes — convert back to actual characters import emoji as _emoji raw_text = c.message.strip() text = _emoji.emojize(raw_text, language="alias") author = c.author.name if not text: continue sentiment, s_conf = _safe_sentiment(text) topic, t_conf = _safe_topic(text) # Only classify action type for Question/Request topics if topic in ("Question", "Request/Feedback"): action_type, at_conf = _safe_action_type(text) else: action_type, at_conf = "N/A", 0.50 message_data = { "author": author, "text": text, "sentiment": sentiment, "confidence": round(s_conf, 3), "topic": topic, "topic_conf": round(t_conf, 3), "action_type": action_type, "action_type_conf": round(at_conf, 3), "time": datetime.now().isoformat(), } pipe = r.pipeline() pipe.rpush(redis_key, json.dumps(message_data)) pipe.ltrim(redis_key, -MAX_REDIS_MESSAGES, -1) pipe.execute() logger.info( "[%s] %s | %s(%.2f) %s(%.2f) %s(%.2f) | %r", message_data["time"][11:19], author[:20], sentiment, s_conf, topic, t_conf, action_type, at_conf, text[:60], ) except KeyboardInterrupt: logger.info("Stopped by user.") break except Exception as exc: logger.error("Unexpected error: %s", exc, exc_info=True) time.sleep(1) logger.info("Chat stream ended — key=%s", redis_key) if __name__ == "__main__": parser = argparse.ArgumentParser() parser.add_argument("--video_id", default=DEFAULT_VIDEO_ID, help="YouTube video ID") parser.add_argument("--redis_key", default="chat_messages", help="Redis list key to write to") args = parser.parse_args() run(video_id=args.video_id, redis_key=args.redis_key)