File size: 5,772 Bytes
11a0fc5 a4612d4 11a0fc5 | 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 | """
backend/main.py
===============
FastAPI server exposing live-chat insights from Redis.
Start with:
uvicorn backend.main:app --reload --port 8000
Endpoints
---------
GET /get_messages Last 50 raw messages (full payload)
GET /sentiment_trend Last 200 messages β time + sentiment + confidence
GET /sentiment_summary Aggregate counts + avg confidence (last 200)
GET /topic_stats Per-topic counts (last 100 messages)
GET /live_stats Combined snapshot for dashboard widgets
GET /health Redis connectivity check
"""
import json
import redis
from fastapi import FastAPI, HTTPException
from fastapi.middleware.cors import CORSMiddleware
from backend.config import REDIS_HOST, REDIS_PORT, REDIS_DB
# ββ App ββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ
app = FastAPI(
title="YouTube Live Chat Insights API",
description="Real-time Hinglish sentiment + topic analysis for live streams",
version="2.0.0",
)
app.add_middleware(
CORSMiddleware,
allow_origins=["*"],
allow_methods=["*"],
allow_headers=["*"],
)
# ββ Redis ββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ
r = redis.Redis(
host=REDIS_HOST,
port=REDIS_PORT,
db=REDIS_DB,
decode_responses=True,
socket_connect_timeout=5,
)
VALID_TOPICS = {"Appreciation", "Question", "Promo", "Spam", "General", "MCQ Answer"}
VALID_SENTIMENT = {"Positive", "Neutral", "Negative"}
def _get_messages(n: int) -> list[dict]:
"""Fetch last n messages from Redis, safely parse each."""
raws = r.lrange("chat_messages", -n, -1)
out = []
for raw in raws:
try:
out.append(json.loads(raw))
except (json.JSONDecodeError, TypeError):
pass
return out
def _redis_check():
try:
r.ping()
except redis.RedisError as exc:
raise HTTPException(status_code=503, detail=f"Redis unavailable: {exc}")
# ββ Endpoints ββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ
@app.get("/health")
def health():
"""Check Redis connectivity."""
_redis_check()
return {"status": "ok", "redis": f"{REDIS_HOST}:{REDIS_PORT}"}
@app.get("/get_messages")
def get_messages(limit: int = 50):
"""
Return the last `limit` chat messages with full payload.
Each item: author, text, sentiment, confidence, topic, topic_conf, time.
"""
_redis_check()
return _get_messages(min(limit, 200))
@app.get("/sentiment_trend")
def sentiment_trend():
"""
Time-series data for the last 200 messages.
Useful for plotting sentiment over time in a frontend chart.
Returns: [{time, sentiment, confidence}, ...]
"""
_redis_check()
msgs = _get_messages(200)
return [
{
"time": m.get("time"),
"sentiment": m.get("sentiment"),
"confidence": m.get("confidence"),
}
for m in msgs
]
@app.get("/sentiment_summary")
def sentiment_summary():
"""
Aggregate sentiment counts and average confidence for the last 200 messages.
Returns: {counts: {Positive, Neutral, Negative}, avg_confidence, total_messages}
"""
_redis_check()
msgs = _get_messages(200)
counts = {s: 0 for s in VALID_SENTIMENT}
total_conf = 0.0
for m in msgs:
s = m.get("sentiment", "Neutral")
if s in counts:
counts[s] += 1
try:
total_conf += float(m.get("confidence", 0))
except (TypeError, ValueError):
pass
n = len(msgs)
return {
"counts": counts,
"avg_confidence": round(total_conf / n, 3) if n else 0.0,
"total_messages": n,
}
@app.get("/topic_stats")
def topic_stats():
"""
Per-topic message counts for the last 100 messages.
Returns: {Appreciation: int, Question: int, Promo: int, Spam: int, General: int}
"""
_redis_check()
msgs = _get_messages(100)
counts = {t: 0 for t in VALID_TOPICS}
for m in msgs:
label = m.get("topic", "General")
if label not in VALID_TOPICS:
label = "General"
counts[label] += 1
return counts
@app.get("/live_stats")
def live_stats():
"""
Combined snapshot β sentiment summary + topic stats in one request.
Use this for dashboard widgets to avoid two round-trips.
"""
_redis_check()
msgs = _get_messages(200)
# Sentiment
s_counts = {s: 0 for s in VALID_SENTIMENT}
total_conf = 0.0
for m in msgs:
s = m.get("sentiment", "Neutral")
if s in s_counts:
s_counts[s] += 1
try:
total_conf += float(m.get("confidence", 0))
except (TypeError, ValueError):
pass
n = len(msgs)
# Topics (last 100 only)
t_counts = {t: 0 for t in VALID_TOPICS}
for m in msgs[-100:]:
label = m.get("topic", "General")
if label not in VALID_TOPICS:
label = "General"
t_counts[label] += 1
# Recent messages (last 10 for live feed widget)
recent = msgs[-10:]
return {
"sentiment": {
"counts": s_counts,
"avg_confidence": round(total_conf / n, 3) if n else 0.0,
"total_messages": n,
},
"topics": t_counts,
"recent": recent,
} |