LivePulse / backend /scraper.py
DivYonko
Improve keyword accuracy from CSV analysis + gate action_type on topic
5a13d2c
"""
backend/scraper.py
==================
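Fetches live YouTube chat comments, runs sentiment + topic classification
(plus action-type classification for Question and Request/Feedback topics),
and pushes the results to a length-capped Redis list.
Accepts optional CLI arguments so multiple instances can run in parallel:
python -m backend.scraper --video_id VIDEO_ID --redis_key chat_messages_a
--video_id defaults to VIDEO_ID from backend/config.py; --redis_key defaults
to "chat_messages".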
"""
import argparse
import json
import logging
import time
from datetime import datetime
import pytchat
import redis
from backend.config import (
VIDEO_ID as DEFAULT_VIDEO_ID,
REDIS_HOST,
REDIS_PORT,
REDIS_DB,
)
from ml.sentiment_model import predict_sentiment
from ml.topic_model import predict_topic, VALID_TOPICS
from ml.action_type_model import predict_action_type, VALID_ACTION_TYPES
# Root logging: INFO and above, timestamped as HH:MM:SS.
logging.basicConfig(
    format="%(asctime)s [%(levelname)s] %(name)s: %(message)s",
    datefmt="%H:%M:%S",
    level=logging.INFO,
)

logger = logging.getLogger("scraper")

# Upper bound on the Redis list length; older entries are trimmed away
# after every push so the list never grows past this many messages.
MAX_REDIS_MESSAGES = 40_000
def _safe_sentiment(text: str) -> tuple[str, float]:
    """Classify the sentiment of *text* without ever raising.

    Args:
        text: The chat message to classify.

    Returns:
        ``(label, confidence)`` as produced by ``predict_sentiment``, or the
        fallback ``("Neutral", 0.50)`` if the model raises or returns
        something that is not a ``(label, confidence)`` pair.
    """
    try:
        # Unpack inside the try (like the topic/action-type helpers) so a
        # malformed model result falls back here instead of raising in the
        # caller's scrape loop.
        label, conf = predict_sentiment(text)
        # float() normalises e.g. a numpy float32 confidence, which
        # json.dumps cannot serialise.
        return label, float(conf)
    except Exception as exc:
        logger.error("predict_sentiment failed for %r: %s", text[:60], exc)
        return "Neutral", 0.50
def _safe_topic(text: str) -> tuple[str, float]:
    """Classify the topic of *text* without ever raising.

    Args:
        text: The chat message to classify.

    Returns:
        ``(topic, confidence)`` from ``predict_topic`` when the topic is in
        ``VALID_TOPICS``; otherwise, or if the model raises, the fallback
        ``("General", 0.50)``.
    """
    try:
        topic, conf = predict_topic(text)
        if topic not in VALID_TOPICS:
            # Make label-set drift between model and app visible instead of
            # silently relabelling everything as "General".
            logger.warning(
                "predict_topic returned unknown topic %r for %r", topic, text[:60]
            )
            return "General", 0.50
        # float() normalises e.g. a numpy float32 confidence, which
        # json.dumps cannot serialise.
        return topic, float(conf)
    except Exception as exc:
        logger.error("predict_topic failed for %r: %s", text[:60], exc)
        return "General", 0.50
def _safe_action_type(text: str) -> tuple[str, float]:
    """Classify the action type of *text* without ever raising.

    Args:
        text: The chat message to classify.

    Returns:
        ``(action_type, confidence)`` from ``predict_action_type`` when the
        label is in ``VALID_ACTION_TYPES``; otherwise, or if the model
        raises, the fallback ``("N/A", 0.50)``.
    """
    try:
        action_type, conf = predict_action_type(text)
        if action_type not in VALID_ACTION_TYPES:
            # Make label-set drift between model and app visible instead of
            # silently relabelling everything as "N/A".
            logger.warning(
                "predict_action_type returned unknown label %r for %r",
                action_type,
                text[:60],
            )
            return "N/A", 0.50
        # float() normalises e.g. a numpy float32 confidence, which
        # json.dumps cannot serialise.
        return action_type, float(conf)
    except Exception as exc:
        logger.error("predict_action_type failed for %r: %s", text[:60], exc)
        return "N/A", 0.50
# Topics for which the action-type classifier is run; all other topics are
# stored with action_type "N/A".
_ACTION_TYPE_TOPICS = frozenset({"Question", "Request/Feedback"})


def _connect_redis() -> "redis.Redis":
    """Create a Redis client from config values and verify it with PING.

    Returns:
        A connected client with ``decode_responses=True``.

    Raises:
        SystemExit: with status 1 if the server is unreachable or the
            connection attempt times out.
    """
    client = redis.Redis(
        host=REDIS_HOST,
        port=REDIS_PORT,
        db=REDIS_DB,
        decode_responses=True,
        socket_connect_timeout=5,
    )
    try:
        client.ping()
    except (redis.ConnectionError, redis.TimeoutError) as e:
        # A socket_connect_timeout expiry surfaces as redis.TimeoutError,
        # which is not a subclass of redis.ConnectionError.
        logger.error("Cannot connect to Redis: %s", e)
        raise SystemExit(1) from e
    logger.info("Redis connected βœ“")
    return client


def _publish(r: "redis.Redis", redis_key: str, author: str, text: str) -> None:
    """Classify one chat message, append it to *redis_key*, and log it.

    The list is trimmed to the newest ``MAX_REDIS_MESSAGES`` entries.
    """
    sentiment, s_conf = _safe_sentiment(text)
    topic, t_conf = _safe_topic(text)
    # Only classify action type for Question/Request topics
    if topic in _ACTION_TYPE_TOPICS:
        action_type, at_conf = _safe_action_type(text)
    else:
        action_type, at_conf = "N/A", 0.50

    message_data = {
        "author": author,
        "text": text,
        "sentiment": sentiment,
        "confidence": round(s_conf, 3),
        "topic": topic,
        "topic_conf": round(t_conf, 3),
        "action_type": action_type,
        "action_type_conf": round(at_conf, 3),
        "time": datetime.now().isoformat(),
    }

    # Push and trim in one pipeline so the list stays bounded.
    with r.pipeline() as pipe:
        pipe.rpush(redis_key, json.dumps(message_data))
        pipe.ltrim(redis_key, -MAX_REDIS_MESSAGES, -1)
        pipe.execute()

    logger.info(
        "[%s] %s | %s(%.2f) %s(%.2f) %s(%.2f) | %r",
        message_data["time"][11:19],  # HH:MM:SS slice of the ISO timestamp
        author[:20],
        sentiment, s_conf,
        topic, t_conf,
        action_type, at_conf,
        text[:60],
    )


def run(video_id: str, redis_key: str) -> None:
    """Scrape live chat for *video_id* and append classified messages to Redis.

    Blocks until the chat stream stops being alive or the user interrupts
    with Ctrl+C. Errors while processing a batch are logged and the loop
    retries after a one-second pause.

    Args:
        video_id: YouTube video ID of the live stream.
        redis_key: Name of the Redis list the JSON records are appended to.

    Raises:
        SystemExit: (status 1) if Redis cannot be reached at startup.
        ImportError: if the ``emoji`` package is not installed. This is
            raised up front rather than swallowed by the retry loop below.
    """
    # pytchat converts emoji to :name: codes; emojize converts them back.
    # Imported once here rather than once per message inside the loop.
    import emoji as _emoji

    r = _connect_redis()

    logger.info("Starting scraper β€” video=%s redis_key=%s", video_id, redis_key)
    chat = pytchat.create(video_id=video_id)
    try:
        if not chat.is_alive():
            logger.error("Could not connect to live chat for %s. Is the stream live?", video_id)
            return

        logger.info("Live chat connected βœ“ β€” press Ctrl+C to stop")
        while chat.is_alive():
            try:
                for c in chat.get().sync_items():
                    text = _emoji.emojize(c.message.strip(), language="alias")
                    if not text:
                        continue
                    _publish(r, redis_key, c.author.name, text)
            except KeyboardInterrupt:
                logger.info("Stopped by user.")
                break
            except Exception as exc:
                logger.error("Unexpected error: %s", exc, exc_info=True)
                time.sleep(1)
    finally:
        # Release the pytchat session on every exit path.
        chat.terminate()

    logger.info("Chat stream ended β€” key=%s", redis_key)
if __name__ == "__main__":
    # Command-line entry point: each instance scrapes one video into one
    # Redis list, so several can run side by side with different keys.
    cli = argparse.ArgumentParser()
    cli.add_argument(
        "--video_id",
        default=DEFAULT_VIDEO_ID,
        help="YouTube video ID",
    )
    cli.add_argument(
        "--redis_key",
        default="chat_messages",
        help="Redis list key to write to",
    )
    options = cli.parse_args()
    run(redis_key=options.redis_key, video_id=options.video_id)