# PulseAI / backend / main.py
# aasthav18 · 929aaae
"""
Social Intelligence Platform — FastAPI Backend
─────────────────────────────────────────────────────────────────────────────
Main application entrypoint. Wires together the NLP pipelines and exposes
a clean REST API for the frontend dashboard.

Architecture:
    POST /api/analyze        → Single text sentiment + crisis + aspects
    POST /api/batch-analyze  → Bulk post analysis
    GET  /api/dashboard      → Full dashboard data payload
    GET  /api/summary        → Summary metrics only
    GET  /api/topics         → Topic clusters
    GET  /api/trends         → Time series + forecast
    GET  /api/competitors    → Competitor intelligence
    GET  /api/crisis         → Crisis scan results
    GET  /api/posts          → Corpus posts, filterable by sentiment/source
    POST /api/ingest         → Add posts to the demo corpus
    GET  /api/health         → Health check
"""
from __future__ import annotations
import logging
import sys
import time
from typing import List, Optional
from contextlib import asynccontextmanager
from fastapi import FastAPI, HTTPException, BackgroundTasks
from fastapi.middleware.cors import CORSMiddleware
from fastapi.responses import JSONResponse
from pydantic import BaseModel, Field
# ─── Internal modules ─────────────────────────────────────────────────────
sys.path.append(".")
from data.sample_data import generate_posts, generate_competitor_data, generate_time_series
from nlp.sentiment import get_analyzer
from nlp.topic_model import get_modeler
from nlp.trend_analysis import get_trend_analyzer
from nlp.crisis_detector import get_crisis_detector
from nlp.competitor_intel import get_competitor_intel
# Root logging configuration. Records are rendered as
# "<timestamp> │ <level> │ <message>"; the separator is U+2502 (it had been
# stored as mis-decoded bytes, which printed as garbage in every log line).
logging.basicConfig(level=logging.INFO, format="%(asctime)s │ %(levelname)s │ %(message)s")
# Module-level logger, named after this module.
logger = logging.getLogger(__name__)
# ─── In-memory state (replace with DB for production) ─────────────────────
# Every post dict currently held by the service. Rebound by _bootstrap() and
# by the /api/ingest route; read directly by /api/posts and /api/health.
_corpus: List[dict] = []
# Dashboard payload assembled by _bootstrap(); the cache-backed GET routes
# return it whole or return individual keys of it.
_analysis_cache: dict = {}
# Set to True as the last assignment of _bootstrap(). The cache-backed GET
# routes answer 503 while this is still False.
_initialized = False
def _bootstrap() -> None:
    """Analyse the corpus with every NLP pipeline and publish the dashboard cache.

    The first call finds an empty corpus and seeds it with 400 generated
    sample posts.  Later calls (for example the background task queued by
    ``/api/ingest``) re-analyse whatever the corpus holds at that point
    instead of regenerating it, so posts added since the last run are kept.

    Posts whose ``text`` is missing, not a string, or blank are dropped with
    a warning, because the sentiment and topic stages consume ``post["text"]``.

    Side effects:
        Rebinds the module-level ``_corpus`` and ``_analysis_cache``,
        annotates each post dict in place with ``sentiment``,
        ``sentiment_score``, ``topic_id`` and ``topic_name``, and finally
        sets ``_initialized`` to ``True``.
    """
    global _corpus, _analysis_cache, _initialized
    logger.info("Bootstrapping platform with sample data...")
    t0 = time.time()

    # Keep what is already in the corpus (ingested posts included); only
    # entries that cannot be analysed are filtered out.
    corpus = [
        p for p in _corpus
        if isinstance(p.get("text"), str) and p["text"].strip()
    ]
    skipped = len(_corpus) - len(corpus)
    if skipped:
        logger.warning("Skipping %d post(s) without usable text", skipped)
    # Nothing to analyse yet (first start-up): fall back to the sample data.
    if not corpus:
        corpus = generate_posts(n=400)
    _corpus = corpus
    texts = [p["text"] for p in corpus]

    # ── Sentiment analysis ────────────────────────────────────────────
    analyzer = get_analyzer()
    logger.info("Running sentiment on %d posts (mode: %s)...", len(texts), analyzer.mode)
    sentiments = analyzer.batch_analyze(texts)
    for post, result in zip(corpus, sentiments):
        post["sentiment"] = result["label"]
        post["sentiment_score"] = result["score"]

    # ── Topic modeling ────────────────────────────────────────────────
    logger.info("Fitting topic model...")
    modeler = get_modeler(n_topics=8)
    modeler.fit(texts)
    topic_labels = modeler.get_document_topics(texts)
    for post, topic_id in zip(corpus, topic_labels):
        post["topic_id"] = topic_id
        post["topic_name"] = modeler.topic_names[topic_id]
    sentiment_labels = [p["sentiment"] for p in corpus]
    topics_summary = modeler.get_topics_summary(texts, sentiments=sentiment_labels)

    # ── Trend analysis ────────────────────────────────────────────────
    logger.info("Running trend analysis...")
    trend_analyzer = get_trend_analyzer()
    # Series derived from the actual posts; it only feeds the volume-spike
    # check further down.
    raw_series = trend_analyzer.aggregate_posts_to_series(corpus)
    # The dashboard trend figures come from a separately generated 90-day
    # series (longer history); it is NOT merged with raw_series.
    extended_series = generate_time_series(days=90)
    trend_data = trend_analyzer.analyze_time_series(extended_series)
    trend = trend_data["trend"]

    # ── Crisis detection ──────────────────────────────────────────────
    logger.info("Running crisis scan...")
    detector = get_crisis_detector()
    crisis_report = detector.scan_corpus(corpus)
    volume_spike = detector.detect_volume_spike(raw_series)

    # ── Competitor intelligence ───────────────────────────────────────
    logger.info("Building competitor intelligence...")
    intel = get_competitor_intel()
    comp_report = intel.build_competitive_report(
        corpus,
        brand_name="TechFlow",
        brand_overall_sentiment=float(trend["avg_30d"]),
    )

    # ── Assemble dashboard payload ────────────────────────────────────
    total = len(corpus)
    pos_count = sum(1 for p in corpus if p["sentiment"] == "positive")
    neg_count = sum(1 for p in corpus if p["sentiment"] == "negative")
    # Guard the ratios so an empty corpus yields zeros instead of raising
    # ZeroDivisionError.
    positive_pct = round(100 * pos_count / total, 1) if total else 0.0
    negative_pct = round(100 * neg_count / total, 1) if total else 0.0
    nps_estimate = round((pos_count - neg_count) / total * 100, 1) if total else 0.0

    _analysis_cache = {
        "meta": {
            "total_posts": total,
            "model_mode": analyzer.mode,
            # The trailing "Z" denotes UTC, so format from gmtime() rather
            # than strftime's default of local time.
            "bootstrapped_at": time.strftime("%Y-%m-%dT%H:%M:%SZ", time.gmtime()),
            "elapsed_seconds": round(time.time() - t0, 1),
        },
        "summary": {
            "overall_sentiment": trend["current_sentiment"],
            "avg_7d_sentiment": trend["avg_7d"],
            "avg_30d_sentiment": trend["avg_30d"],
            "delta": trend["delta_7d_vs_30d"],
            "trend_direction": trend["direction"],
            "total_volume": trend["total_volume"],
            "avg_daily_volume": trend["avg_daily_volume"],
            "positive_count": pos_count,
            "negative_count": neg_count,
            "neutral_count": total - pos_count - neg_count,
            "positive_pct": positive_pct,
            "negative_pct": negative_pct,
            "nps_estimate": nps_estimate,
            "crisis_alert": crisis_report["overall_alert_level"],
        },
        "topics": topics_summary,
        "trends": trend_data,
        "crisis": crisis_report,
        "volume_spike": volume_spike,
        "competitors": comp_report,
        "recent_posts": corpus[:50],
    }
    _initialized = True
    logger.info("Bootstrap complete in %.1fs", time.time() - t0)
# ─── App startup ───────────────────────────────────────────────────────────
@asynccontextmanager
async def lifespan(app: FastAPI):
    """Lifespan hook handed to ``FastAPI(lifespan=...)``.

    Calls ``_bootstrap()`` synchronously before yielding control back to the
    framework. Nothing follows the ``yield``, so there is no shutdown work.
    """
    # Startup: build the corpus and the analysis cache.
    _bootstrap()
    yield
# The ASGI application. `lifespan` runs the bootstrap before the app starts.
app = FastAPI(
    title="Social Intelligence Platform API",
    description="AI-powered brand monitoring, sentiment analysis, and competitor intelligence.",
    version="1.0.0",
    lifespan=lifespan,
)

# CORS policy: any origin, method and header may call the API.
#
# FastAPI's CORS documentation states that allow_origins / allow_methods /
# allow_headers cannot be ["*"] when allow_credentials is True. None of the
# routes in this module read cookies or Authorization headers, so credentialed
# requests are switched off rather than narrowing the wildcard lists.
# NOTE(review): if the dashboard frontend does send credentialed requests,
# list its origin(s) explicitly in allow_origins and set this back to True.
app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],
    allow_credentials=False,
    allow_methods=["*"],
    allow_headers=["*"],
)
# ─── Schemas ───────────────────────────────────────────────────────────────
class AnalyzeRequest(BaseModel):
    """Request body for ``POST /api/analyze``."""

    # Text to analyse; required, between 1 and 2000 characters.
    text: str = Field(..., min_length=1, max_length=2000)
    # When true, the response also carries the analyzer's aspect analysis.
    include_aspects: bool = True
    # When true, the response also carries the crisis detector's score.
    include_crisis: bool = True
class BatchAnalyzeRequest(BaseModel):
    """Request body for ``POST /api/batch-analyze``."""

    # Texts to analyse in one call; required, between 1 and 200 entries.
    # Individual entries carry no length constraint here.
    # NOTE(review): min_items/max_items are the Pydantic v1 spellings; Pydantic
    # v2 deprecates them in favour of min_length/max_length — confirm which
    # major version is installed before changing.
    texts: List[str] = Field(..., min_items=1, max_items=200)
class IngestRequest(BaseModel):
    """Request body for ``POST /api/ingest``."""

    # Posts to add to the corpus. Each entry only has to be a dict; this
    # model enforces no per-post schema and no list size limit.
    posts: List[dict]
# ─── Routes ────────────────────────────────────────────────────────────────
@app.get("/api/health")
async def health():
    """Report service status, bootstrap state, corpus size and analyzer mode."""
    report = {"status": "ok"}
    report["initialized"] = _initialized
    report["corpus_size"] = len(_corpus)
    report["model_mode"] = get_analyzer().mode
    return report
@app.get("/api/dashboard")
async def dashboard():
    """Return the whole cached dashboard payload, or 503 before bootstrap completes."""
    if _initialized:
        return _analysis_cache
    raise HTTPException(
        status_code=503,
        detail="Platform is initializing. Please try again in a moment.",
    )
@app.get("/api/summary")
async def summary():
    """Return only the ``summary`` section of the cache, or 503 before bootstrap completes."""
    if _initialized:
        return _analysis_cache["summary"]
    raise HTTPException(status_code=503, detail="Initializing...")
@app.get("/api/topics")
async def topics():
    """Return the cached topic summary under a ``topics`` key, or 503 before bootstrap completes."""
    if _initialized:
        topic_summary = _analysis_cache["topics"]
        return {"topics": topic_summary}
    raise HTTPException(status_code=503, detail="Initializing...")
@app.get("/api/trends")
async def trends():
    """Return the cached ``trends`` section, or 503 before bootstrap completes."""
    if _initialized:
        return _analysis_cache["trends"]
    raise HTTPException(status_code=503, detail="Initializing...")
@app.get("/api/crisis")
async def crisis():
    """Return the cached crisis report plus the volume-spike result.

    Responds 503 before bootstrap completes. ``volume_spike`` is ``None``
    when the cache has no such key.
    """
    if _initialized:
        cache = _analysis_cache
        report = cache["crisis"]
        spike = cache.get("volume_spike")
        return {"crisis": report, "volume_spike": spike}
    raise HTTPException(status_code=503, detail="Initializing...")
@app.get("/api/competitors")
async def competitors():
    """Return the cached ``competitors`` section, or 503 before bootstrap completes."""
    if _initialized:
        return _analysis_cache["competitors"]
    raise HTTPException(status_code=503, detail="Initializing...")
@app.get("/api/posts")
async def posts(limit: int = 50, sentiment: Optional[str] = None, source: Optional[str] = None):
    """Return corpus posts, optionally filtered by sentiment and/or source.

    Args:
        limit: Maximum number of posts to return; must not be negative.
        sentiment: Keep only posts with this sentiment label (case-insensitive).
        source: Keep only posts from this source (case-insensitive).

    Returns:
        ``{"posts": [...], "total": n}`` where ``total`` counts every post
        that matched the filters, not only the ones returned.

    Raises:
        HTTPException: 422 when ``limit`` is negative.
    """
    if limit < 0:
        # A negative slice bound would silently cut posts off the END of the
        # list instead of limiting the result, so reject it explicitly.
        raise HTTPException(status_code=422, detail="limit must be zero or greater")

    def _normalised(value) -> str:
        # Posts are free-form dicts: the field may be absent, None or a
        # non-string, none of which should turn the request into a 500.
        return str(value or "").lower()

    matches = _corpus
    if sentiment:
        wanted_sentiment = sentiment.lower()
        matches = [p for p in matches if _normalised(p.get("sentiment")) == wanted_sentiment]
    if source:
        wanted_source = source.lower()
        matches = [p for p in matches if _normalised(p.get("source")) == wanted_source]
    return {"posts": matches[:limit], "total": len(matches)}
@app.post("/api/analyze")
async def analyze(req: AnalyzeRequest):
    """Analyse one text on demand.

    The response always contains the input ``text`` and its ``sentiment``.
    ``aspects`` and ``crisis`` are added only when the matching request flag
    is set.
    """
    text = req.text
    analyzer = get_analyzer()
    result = {"text": text, "sentiment": analyzer.analyze(text)}
    if req.include_aspects:
        result["aspects"] = analyzer.analyze_aspects(text)
    if req.include_crisis:
        result["crisis"] = get_crisis_detector().score_post(text)
    return result
@app.post("/api/batch-analyze")
async def batch_analyze(req: BatchAnalyzeRequest):
    """Run the analyzer over every submitted text and return the results with their count."""
    outcomes = get_analyzer().batch_analyze(req.texts)
    return {"results": outcomes, "count": len(outcomes)}
@app.post("/api/ingest")
async def ingest(req: IngestRequest, background_tasks: BackgroundTasks):
    """Prepend new posts to the corpus and queue a re-analysis.

    Every post must carry a non-blank string ``text``: the analysis pipeline
    reads ``post["text"]`` for each corpus entry, so anything else is
    rejected up front and the corpus is left untouched.

    Returns:
        ``{"status": "accepted", "posts_added": n, "total": m}`` where
        ``total`` is the corpus size right after the posts were prepended.
        What the queued re-analysis then does with the corpus is decided by
        ``_bootstrap``.

    Raises:
        HTTPException: 422 listing the indices of posts without usable text.
    """
    global _corpus

    # Validate before touching shared state so a bad batch changes nothing.
    invalid_indices = [
        index
        for index, post in enumerate(req.posts)
        if not (isinstance(post.get("text"), str) and post["text"].strip())
    ]
    if invalid_indices:
        raise HTTPException(
            status_code=422,
            detail=f"posts at indices {invalid_indices} need a non-empty string 'text' field",
        )

    _corpus = req.posts + _corpus
    # Re-analysis is queued as a background task so the response is sent
    # without waiting for it.
    background_tasks.add_task(_bootstrap)
    return {"status": "accepted", "posts_added": len(req.posts), "total": len(_corpus)}
if __name__ == "__main__":
    # Direct-run entrypoint: serve the app (referenced by import string) on
    # all interfaces, port 8000, with auto-reload switched on.
    import uvicorn

    server_options = {"host": "0.0.0.0", "port": 8000, "reload": True}
    uvicorn.run("main:app", **server_options)