File size: 3,484 Bytes
6242ddb
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
"""Webhook and SSE endpoints for real-time ingestion."""

from __future__ import annotations

import asyncio
import json
from datetime import datetime

from fastapi import APIRouter, BackgroundTasks, Depends, HTTPException, Request
from sse_starlette.sse import EventSourceResponse

from app.core.logging import get_logger
from app.core.security import get_api_key, verify_webhook_signature
from app.models.schemas import AnalysisStatus, FeedbackEntry, JobStatus, WebhookPayload
from app.services.analysis_pipeline import run_analysis
from app.services.redis_client import subscribe_events

# Logger for this module; the handlers below pass context as keyword fields.
logger = get_logger(__name__)
# Router collecting the real-time endpoints; all routes are served under /api/v1.
router = APIRouter(prefix="/api/v1", tags=["realtime"])


@router.post("/webhooks/ingest", response_model=JobStatus)
async def webhook_ingest(
    request: Request,
    background_tasks: BackgroundTasks,
):
    """Accept a signed webhook payload and queue its entries for analysis.

    The raw request body is checked with ``verify_webhook_signature`` against
    the ``X-Signature`` and ``X-Timestamp`` headers *before* it is parsed, so
    a payload that fails the check is never validated or processed.

    Args:
        request: Incoming request; its raw body and headers are read.
        background_tasks: Used to schedule ``run_analysis`` as a background
            task for the accepted entries.

    Returns:
        A ``JobStatus`` in the ``PENDING`` state carrying the new job id.

    Raises:
        HTTPException: 401 when the signature check fails; 400 when the body
            does not validate as ``WebhookPayload`` or contains no entries.
    """
    import uuid

    body = await request.body()
    # Missing headers default to "" and are left for the verifier to reject.
    signature = request.headers.get("X-Signature", "")
    timestamp = request.headers.get("X-Timestamp", "")

    if not verify_webhook_signature(body, signature, timestamp):
        raise HTTPException(status_code=401, detail="Invalid webhook signature")

    try:
        payload = WebhookPayload.model_validate_json(body)
    except Exception as exc:
        # Chain the cause so the original validation error stays attached.
        raise HTTPException(status_code=400, detail=f"Invalid payload: {exc}") from exc

    if not payload.data:
        raise HTTPException(status_code=400, detail="No data entries in payload")

    job_id = uuid.uuid4().hex[:12]
    logger.info("webhook_received", job_id=job_id, event=payload.event_type, entries=len(payload.data))

    # Capture the receive time once so that entries without their own
    # timestamp and the job's created_at all agree.
    # NOTE(review): utcnow() returns a naive datetime and is deprecated since
    # Python 3.12; moving to an aware datetime needs a check of downstream
    # consumers first.
    received_at = datetime.utcnow()

    entries = [
        FeedbackEntry(
            id=e.id,
            text=e.text,
            # Payload-level source wins over the per-entry source.
            source=payload.source or e.source or "webhook",
            timestamp=e.timestamp or received_at,
            metadata=e.metadata,
        )
        for e in payload.data
    ]

    background_tasks.add_task(run_analysis, entries, job_id)

    return JobStatus(
        job_id=job_id,
        status=AnalysisStatus.PENDING,
        progress=0.0,
        message=f"Webhook: processing {len(entries)} entries",
        created_at=received_at,
    )


@router.get("/events/analysis")
async def analysis_events(request: Request, api_key: str = Depends(get_api_key)):
    """Stream live analysis updates to the client as Server-Sent Events.

    Every message received from the ``analysis_updates`` subscription is
    JSON-encoded and emitted as an ``analysis_update`` event. The stream stops
    once the client is seen to have disconnected or the subscription ends.

    Args:
        request: Incoming request, polled for client disconnection.
        api_key: Value resolved by the ``get_api_key`` dependency; it is not
            used in the body.

    Returns:
        An ``EventSourceResponse`` wrapping the event generator.
    """

    async def stream_updates():
        try:
            async for update in subscribe_events("analysis_updates"):
                client_gone = await request.is_disconnected()
                if client_gone:
                    # Nothing follows the loop, so ending the generator here
                    # is the same as leaving the loop.
                    return
                yield dict(event="analysis_update", data=json.dumps(update))
        except asyncio.CancelledError:
            # Cancellation ends the stream quietly.
            return
        except Exception as err:
            # Any other failure is logged and the stream simply ends.
            logger.error("sse_error", error=str(err))

    return EventSourceResponse(stream_updates())


@router.get("/events/anomalies")
async def anomaly_events(request: Request, api_key: str = Depends(get_api_key)):
    """Stream anomaly alerts to the client as Server-Sent Events.

    Every message received from the ``anomaly_alerts`` subscription is
    JSON-encoded and emitted as an ``anomaly_alert`` event. The stream stops
    once the client is seen to have disconnected or the subscription ends.

    Args:
        request: Incoming request, polled for client disconnection.
        api_key: Value resolved by the ``get_api_key`` dependency; it is not
            used in the body.

    Returns:
        An ``EventSourceResponse`` wrapping the event generator.
    """

    async def event_generator():
        try:
            async for data in subscribe_events("anomaly_alerts"):
                if await request.is_disconnected():
                    break
                yield {
                    "event": "anomaly_alert",
                    "data": json.dumps(data),
                }
        except asyncio.CancelledError:
            # Cancellation ends the stream quietly.
            pass
        except Exception as exc:
            # Log subscription/serialization failures the same way the
            # analysis stream does, instead of letting them escape unlogged.
            logger.error("sse_error", error=str(exc))

    return EventSourceResponse(event_generator())