topic-analysis / backend /app /api /webhooks.py
alexchilton
Initial deployment: Sentiment & Topic Analysis Dashboard
6242ddb
"""Webhook and SSE endpoints for real-time ingestion."""
from __future__ import annotations
import asyncio
import json
from datetime import datetime
from fastapi import APIRouter, BackgroundTasks, Depends, HTTPException, Request
from sse_starlette.sse import EventSourceResponse
from app.core.logging import get_logger
from app.core.security import get_api_key, verify_webhook_signature
from app.models.schemas import AnalysisStatus, FeedbackEntry, JobStatus, WebhookPayload
from app.services.analysis_pipeline import run_analysis
from app.services.redis_client import subscribe_events
logger = get_logger(__name__)
router = APIRouter(prefix="/api/v1", tags=["realtime"])
@router.post("/webhooks/ingest", response_model=JobStatus)
async def webhook_ingest(
request: Request,
background_tasks: BackgroundTasks,
):
"""Receive data via webhook with Stripe-style signature verification."""
body = await request.body()
signature = request.headers.get("X-Signature", "")
timestamp = request.headers.get("X-Timestamp", "")
if not verify_webhook_signature(body, signature, timestamp):
raise HTTPException(status_code=401, detail="Invalid webhook signature")
try:
payload = WebhookPayload.model_validate_json(body)
except Exception as exc:
raise HTTPException(status_code=400, detail=f"Invalid payload: {exc}")
if not payload.data:
raise HTTPException(status_code=400, detail="No data entries in payload")
import uuid
job_id = uuid.uuid4().hex[:12]
logger.info("webhook_received", job_id=job_id, event=payload.event_type, entries=len(payload.data))
entries = [
FeedbackEntry(
id=e.id,
text=e.text,
source=payload.source or e.source or "webhook",
timestamp=e.timestamp or datetime.utcnow(),
metadata=e.metadata,
)
for e in payload.data
]
background_tasks.add_task(run_analysis, entries, job_id)
return JobStatus(
job_id=job_id,
status=AnalysisStatus.PENDING,
progress=0.0,
message=f"Webhook: processing {len(entries)} entries",
created_at=datetime.utcnow(),
)
@router.get("/events/analysis")
async def analysis_events(request: Request, api_key: str = Depends(get_api_key)):
"""Server-Sent Events stream for live analysis updates."""
async def event_generator():
try:
async for data in subscribe_events("analysis_updates"):
if await request.is_disconnected():
break
yield {
"event": "analysis_update",
"data": json.dumps(data),
}
except asyncio.CancelledError:
pass
except Exception as exc:
logger.error("sse_error", error=str(exc))
return EventSourceResponse(event_generator())
@router.get("/events/anomalies")
async def anomaly_events(request: Request, api_key: str = Depends(get_api_key)):
"""SSE stream for anomaly alerts."""
async def event_generator():
try:
async for data in subscribe_events("anomaly_alerts"):
if await request.is_disconnected():
break
yield {
"event": "anomaly_alert",
"data": json.dumps(data),
}
except asyncio.CancelledError:
pass
return EventSourceResponse(event_generator())