File size: 11,016 Bytes
7eba88d
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
929aaae
7eba88d
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
"""
Social Intelligence Platform — FastAPI Backend
─────────────────────────────────────────────────────────────────────────────
Main application entrypoint. Wires together the NLP pipelines and exposes
a clean REST API for the frontend dashboard.

Architecture:
  POST /api/analyze          → Single text sentiment + crisis + aspects
  POST /api/batch-analyze    → Bulk post analysis
  GET  /api/dashboard        → Full dashboard data payload
  GET  /api/summary          → Headline sentiment / volume summary
  GET  /api/topics           → Topic clusters
  GET  /api/trends           → Time series + forecast
  GET  /api/competitors      → Competitor intelligence
  GET  /api/crisis           → Crisis scan results
  GET  /api/posts            → Analysed posts, filterable by sentiment/source
  POST /api/ingest           → Add posts to the demo corpus
  GET  /api/health           → Health check
"""

from __future__ import annotations

import logging
import sys
import time
from typing import List, Optional
from contextlib import asynccontextmanager

from fastapi import FastAPI, HTTPException, BackgroundTasks
from fastapi.middleware.cors import CORSMiddleware
from fastapi.responses import JSONResponse
from pydantic import BaseModel, Field

# ─── Internal modules ─────────────────────────────────────────────────────
sys.path.append(".")

from data.sample_data import generate_posts, generate_competitor_data, generate_time_series
from nlp.sentiment import get_analyzer
from nlp.topic_model import get_modeler
from nlp.trend_analysis import get_trend_analyzer
from nlp.crisis_detector import get_crisis_detector
from nlp.competitor_intel import get_competitor_intel

# Root logging configuration: INFO and above, fields separated by "│".
# (The separator was previously a mis-decoded byte sequence that printed as
# garbage in every log line.)
logging.basicConfig(level=logging.INFO, format="%(asctime)s │ %(levelname)s │ %(message)s")
# Module-level logger used by the bootstrap pipeline.
logger = logging.getLogger(__name__)

# ─── In-memory state (replace with DB for production) ─────────────────────
# Post dicts known to the platform; /api/ingest prepends new ones here.
_corpus: List[dict] = []
# Dashboard payload assembled by _bootstrap() and served by the GET routes.
_analysis_cache: dict = {}
# Becomes True once _bootstrap() has filled _analysis_cache.
_initialized: bool = False


def _bootstrap() -> None:
    """Run every NLP pipeline over the corpus and cache the dashboard payload.

    On the first call (``_corpus`` empty) 400 sample posts are generated.
    On later calls -- such as the re-analysis scheduled by ``/api/ingest`` --
    the existing corpus, including any ingested posts, is re-analysed instead
    of being replaced by fresh sample data.

    Side effects:
        * posts without a string ``text`` field are dropped (with a warning);
        * each remaining post dict gains ``sentiment``, ``sentiment_score``,
          ``topic_id`` and ``topic_name`` keys (mutated in place);
        * ``_corpus`` is rebound to the analysed posts;
        * ``_analysis_cache`` is replaced with the new payload and
          ``_initialized`` is set to True.
    """
    global _corpus, _analysis_cache, _initialized

    logger.info("Bootstrapping platform with sample data...")
    t0 = time.time()

    # ── Corpus ────────────────────────────────────────────────────────
    # Seed sample data only when there is nothing to analyse; regenerating
    # unconditionally would discard every post added through /api/ingest.
    if not _corpus:
        _corpus = generate_posts(n=400)
    # Every stage below reads post["text"]; skip records without a string
    # text so one malformed post cannot make every later re-analysis fail.
    corpus = [p for p in _corpus if isinstance(p.get("text"), str)]
    skipped = len(_corpus) - len(corpus)
    if skipped:
        logger.warning("Skipping %d post(s) without a string 'text' field", skipped)
    _corpus = corpus
    texts = [p["text"] for p in corpus]

    # ── Sentiment analysis ────────────────────────────────────────────
    analyzer = get_analyzer()
    logger.info(f"Running sentiment on {len(texts)} posts (mode: {analyzer.mode})...")
    sentiments = analyzer.batch_analyze(texts)
    for i, post in enumerate(corpus):
        post["sentiment"] = sentiments[i]["label"]
        post["sentiment_score"] = sentiments[i]["score"]

    # ── Topic modeling ────────────────────────────────────────────────
    logger.info("Fitting topic model...")
    modeler = get_modeler(n_topics=8)
    modeler.fit(texts)
    topic_labels = modeler.get_document_topics(texts)
    for i, post in enumerate(corpus):
        post["topic_id"] = topic_labels[i]
        post["topic_name"] = modeler.topic_names[topic_labels[i]]

    sentiment_labels = [p["sentiment"] for p in corpus]
    topics_summary = modeler.get_topics_summary(texts, sentiments=sentiment_labels)

    # ── Trend analysis ────────────────────────────────────────────────
    logger.info("Running trend analysis...")
    trend_analyzer = get_trend_analyzer()
    # Series aggregated from the analysed corpus; below it only feeds the
    # volume-spike check.
    raw_series = trend_analyzer.aggregate_posts_to_series(corpus)
    # Trend/forecast figures are computed from a separate synthetic 90-day
    # series for longer history; raw_series is NOT merged into it.
    extended_series = generate_time_series(days=90)
    trend_data = trend_analyzer.analyze_time_series(extended_series)
    trend = trend_data["trend"]

    # ── Crisis detection ──────────────────────────────────────────────
    logger.info("Running crisis scan...")
    detector = get_crisis_detector()
    crisis_report = detector.scan_corpus(corpus)
    volume_spike = detector.detect_volume_spike(raw_series)

    # ── Competitor intelligence ───────────────────────────────────────
    logger.info("Building competitor intelligence...")
    intel = get_competitor_intel()
    comp_report = intel.build_competitive_report(
        corpus,
        brand_name="TechFlow",
        # Brand baseline = 30-day average of the synthetic trend series.
        brand_overall_sentiment=float(trend["avg_30d"]),
    )

    # ── Assemble dashboard payload ────────────────────────────────────
    pos_count = sum(1 for p in corpus if p["sentiment"] == "positive")
    neg_count = sum(1 for p in corpus if p["sentiment"] == "negative")
    total = len(corpus)
    # Guard the percentage maths against an empty corpus (all results 0.0).
    denom = total or 1

    _analysis_cache = {
        "meta": {
            "total_posts": total,
            "model_mode": analyzer.mode,
            # gmtime() so the trailing "Z" (UTC) designator is truthful;
            # strftime() without a struct_time would format local time.
            "bootstrapped_at": time.strftime("%Y-%m-%dT%H:%M:%SZ", time.gmtime()),
            "elapsed_seconds": round(time.time() - t0, 1),
        },
        "summary": {
            "overall_sentiment": trend["current_sentiment"],
            "avg_7d_sentiment": trend["avg_7d"],
            "avg_30d_sentiment": trend["avg_30d"],
            "delta": trend["delta_7d_vs_30d"],
            "trend_direction": trend["direction"],
            "total_volume": trend["total_volume"],
            "avg_daily_volume": trend["avg_daily_volume"],
            "positive_count": pos_count,
            "negative_count": neg_count,
            "neutral_count": total - pos_count - neg_count,
            "positive_pct": round(100 * pos_count / denom, 1),
            "negative_pct": round(100 * neg_count / denom, 1),
            "nps_estimate": round((pos_count - neg_count) / denom * 100, 1),
            "crisis_alert": crisis_report["overall_alert_level"],
        },
        "topics": topics_summary,
        "trends": trend_data,
        "crisis": crisis_report,
        "volume_spike": volume_spike,
        "competitors": comp_report,
        "recent_posts": corpus[:50],
    }

    _initialized = True
    logger.info(f"Bootstrap complete in {time.time() - t0:.1f}s")


# ─── App startup ───────────────────────────────────────────────────────────
@asynccontextmanager
async def lifespan(app: FastAPI):
    """Application lifespan hook.

    Code before the ``yield`` runs once at startup: ``_bootstrap()`` is called
    synchronously, so startup completes only after the analysis cache is
    built. No shutdown work is performed after the ``yield``.
    """
    _bootstrap()
    yield


app = FastAPI(
    title="Social Intelligence Platform API",
    version="1.0.0",
    description="AI-powered brand monitoring, sentiment analysis, and competitor intelligence.",
    lifespan=lifespan,  # runs _bootstrap() at startup
)

# Wide-open CORS so the dashboard frontend can call the API from any origin.
# NOTE(review): a wildcard origin combined with allow_credentials=True is
# discouraged -- browsers reject "*" for credentialed requests, and Starlette
# (recent versions) then echoes the caller's Origin instead. No route here
# reads cookies or auth headers, so allow_credentials=False or an explicit
# origin list is likely safe; confirm against the frontend before changing.
app.add_middleware(
    CORSMiddleware,
    allow_credentials=True,
    allow_origins=["*"],
    allow_headers=["*"],
    allow_methods=["*"],
)


# ─── Schemas ───────────────────────────────────────────────────────────────
class AnalyzeRequest(BaseModel):
    """Request body for ``POST /api/analyze``: one text plus optional extras."""

    # Text to analyse, between 1 and 2000 characters.
    text: str = Field(..., max_length=2000, min_length=1)
    # Also return aspect-level sentiment.
    include_aspects: bool = Field(default=True)
    # Also return the crisis score of the text.
    include_crisis: bool = Field(default=True)
    include_crisis: bool = True


class BatchAnalyzeRequest(BaseModel):
    """Request body for ``POST /api/batch-analyze``: between 1 and 200 texts."""

    # NOTE(review): ``min_items``/``max_items`` are the Pydantic v1 spellings
    # (v2 prefers ``min_length``/``max_length``); also, unlike
    # AnalyzeRequest.text, individual texts have no length cap. Revisit once
    # the pinned Pydantic version is confirmed.
    texts: List[str] = Field(..., max_items=200, min_items=1)


class IngestRequest(BaseModel):
    """Request body for ``POST /api/ingest``: raw post records to add."""

    # Free-form dicts: the model only checks that each item is a dict.
    posts: List[dict] = Field(...)


# ─── Routes ────────────────────────────────────────────────────────────────
@app.get("/api/health")
async def health():
    """Health check: readiness flag, corpus size and sentiment model mode."""
    report = dict(
        status="ok",
        initialized=_initialized,
        corpus_size=len(_corpus),
        model_mode=get_analyzer().mode,
    )
    return report


@app.get("/api/dashboard")
async def dashboard():
    """Return the full cached dashboard payload built by ``_bootstrap``.

    Responds with HTTP 503 until the first bootstrap has completed.
    """
    if _initialized:
        return _analysis_cache
    raise HTTPException(
        status_code=503,
        detail="Platform is initializing. Please try again in a moment.",
    )


@app.get("/api/summary")
async def summary():
    """Return the headline ``summary`` section of the cached dashboard (503 until ready)."""
    if _initialized:
        return _analysis_cache["summary"]
    raise HTTPException(status_code=503, detail="Initializing...")


@app.get("/api/topics")
async def topics():
    """Return the cached topic summaries wrapped as ``{"topics": ...}`` (503 until ready)."""
    if _initialized:
        cached_topics = _analysis_cache["topics"]
        return {"topics": cached_topics}
    raise HTTPException(status_code=503, detail="Initializing...")


@app.get("/api/trends")
async def trends():
    """Return the cached trend-analysis result (503 until ready)."""
    if _initialized:
        return _analysis_cache["trends"]
    raise HTTPException(status_code=503, detail="Initializing...")


@app.get("/api/crisis")
async def crisis():
    """Return the cached crisis scan together with the volume-spike check.

    Responds with HTTP 503 until the first bootstrap has completed.
    """
    if not _initialized:
        raise HTTPException(status_code=503, detail="Initializing...")
    cache = _analysis_cache
    return dict(crisis=cache["crisis"], volume_spike=cache.get("volume_spike"))


@app.get("/api/competitors")
async def competitors():
    """Return the cached competitor-intelligence report (503 until ready)."""
    if _initialized:
        return _analysis_cache["competitors"]
    raise HTTPException(status_code=503, detail="Initializing...")


@app.get("/api/posts")
async def posts(limit: int = 50, sentiment: Optional[str] = None, source: Optional[str] = None):
    """Return up to ``limit`` posts from the corpus, optionally filtered.

    Args:
        limit: Maximum number of posts to return; must be non-negative.
        sentiment: Keep only posts with this sentiment label (case-insensitive).
        source: Keep only posts whose ``source`` matches (case-insensitive).

    Returns:
        ``{"posts": [...], "total": <number of posts matching the filters>}``.

    Raises:
        HTTPException(422): if ``limit`` is negative. A negative slice bound
            would otherwise silently drop posts from the end of the list.
    """
    if limit < 0:
        raise HTTPException(422, "limit must be a non-negative integer")

    def _norm(value) -> str:
        # Ingested posts are arbitrary dicts, so a field may be missing, None
        # or non-string; normalise before comparing instead of crashing.
        return "" if value is None else str(value).lower()

    filtered = _corpus
    if sentiment:
        wanted = sentiment.lower()
        filtered = [p for p in filtered if _norm(p.get("sentiment")) == wanted]
    if source:
        wanted = source.lower()
        filtered = [p for p in filtered if _norm(p.get("source")) == wanted]
    return {"posts": filtered[:limit], "total": len(filtered)}


@app.post("/api/analyze")
async def analyze(req: AnalyzeRequest):
    """Analyse a single text on demand.

    The response always carries the text and its overall sentiment; aspect
    sentiment and a crisis score are added when the request enables them.
    """
    analyzer = get_analyzer()
    text = req.text
    result = {"text": text, "sentiment": analyzer.analyze(text)}
    if req.include_aspects:
        result["aspects"] = analyzer.analyze_aspects(text)
    if req.include_crisis:
        result["crisis"] = get_crisis_detector().score_post(text)
    return result


@app.post("/api/batch-analyze")
async def batch_analyze(req: BatchAnalyzeRequest):
    """Run sentiment analysis over every text in the request, in one batch."""
    batch_results = get_analyzer().batch_analyze(req.texts)
    return dict(results=batch_results, count=len(batch_results))


@app.post("/api/ingest")
async def ingest(req: IngestRequest, background_tasks: BackgroundTasks):
    """Add new posts to the corpus and trigger re-analysis.

    Posts are prepended to the in-memory corpus and ``_bootstrap`` is
    scheduled as a background task; the cached dashboard is not updated
    until that run completes.

    Raises:
        HTTPException(422): if any post lacks a non-empty string ``text``
            field. The analysis pipeline reads ``post["text"]`` for every
            post, so such records would otherwise fail later, silently, in the
            background task. Other fields the pipelines may read (timestamps,
            source, ...) are not validated here. TODO: confirm the required
            post schema.
    """
    global _corpus
    invalid = [
        i
        for i, post in enumerate(req.posts)
        if not isinstance(post.get("text"), str) or not post["text"].strip()
    ]
    if invalid:
        raise HTTPException(
            422,
            f"Posts at indices {invalid[:20]} must have a non-empty string 'text' field.",
        )
    _corpus = req.posts + _corpus
    background_tasks.add_task(_bootstrap)
    return {"status": "accepted", "posts_added": len(req.posts), "total": len(_corpus)}


if __name__ == "__main__":
    # Local development server with auto-reload. The "main:app" import string
    # assumes this module is importable as ``main`` -- adjust if renamed.
    import uvicorn

    uvicorn.run("main:app", reload=True, port=8000, host="0.0.0.0")