Spaces:
Sleeping
Sleeping
| """Webhook and SSE endpoints for real-time ingestion.""" | |
| from __future__ import annotations | |
| import asyncio | |
| import json | |
| from datetime import datetime | |
| from fastapi import APIRouter, BackgroundTasks, Depends, HTTPException, Request | |
| from sse_starlette.sse import EventSourceResponse | |
| from app.core.logging import get_logger | |
| from app.core.security import get_api_key, verify_webhook_signature | |
| from app.models.schemas import AnalysisStatus, FeedbackEntry, JobStatus, WebhookPayload | |
| from app.services.analysis_pipeline import run_analysis | |
| from app.services.redis_client import subscribe_events | |
| logger = get_logger(__name__) | |
| router = APIRouter(prefix="/api/v1", tags=["realtime"]) | |
| async def webhook_ingest( | |
| request: Request, | |
| background_tasks: BackgroundTasks, | |
| ): | |
| """Receive data via webhook with Stripe-style signature verification.""" | |
| body = await request.body() | |
| signature = request.headers.get("X-Signature", "") | |
| timestamp = request.headers.get("X-Timestamp", "") | |
| if not verify_webhook_signature(body, signature, timestamp): | |
| raise HTTPException(status_code=401, detail="Invalid webhook signature") | |
| try: | |
| payload = WebhookPayload.model_validate_json(body) | |
| except Exception as exc: | |
| raise HTTPException(status_code=400, detail=f"Invalid payload: {exc}") | |
| if not payload.data: | |
| raise HTTPException(status_code=400, detail="No data entries in payload") | |
| import uuid | |
| job_id = uuid.uuid4().hex[:12] | |
| logger.info("webhook_received", job_id=job_id, event=payload.event_type, entries=len(payload.data)) | |
| entries = [ | |
| FeedbackEntry( | |
| id=e.id, | |
| text=e.text, | |
| source=payload.source or e.source or "webhook", | |
| timestamp=e.timestamp or datetime.utcnow(), | |
| metadata=e.metadata, | |
| ) | |
| for e in payload.data | |
| ] | |
| background_tasks.add_task(run_analysis, entries, job_id) | |
| return JobStatus( | |
| job_id=job_id, | |
| status=AnalysisStatus.PENDING, | |
| progress=0.0, | |
| message=f"Webhook: processing {len(entries)} entries", | |
| created_at=datetime.utcnow(), | |
| ) | |
| async def analysis_events(request: Request, api_key: str = Depends(get_api_key)): | |
| """Server-Sent Events stream for live analysis updates.""" | |
| async def event_generator(): | |
| try: | |
| async for data in subscribe_events("analysis_updates"): | |
| if await request.is_disconnected(): | |
| break | |
| yield { | |
| "event": "analysis_update", | |
| "data": json.dumps(data), | |
| } | |
| except asyncio.CancelledError: | |
| pass | |
| except Exception as exc: | |
| logger.error("sse_error", error=str(exc)) | |
| return EventSourceResponse(event_generator()) | |
| async def anomaly_events(request: Request, api_key: str = Depends(get_api_key)): | |
| """SSE stream for anomaly alerts.""" | |
| async def event_generator(): | |
| try: | |
| async for data in subscribe_events("anomaly_alerts"): | |
| if await request.is_disconnected(): | |
| break | |
| yield { | |
| "event": "anomaly_alert", | |
| "data": json.dumps(data), | |
| } | |
| except asyncio.CancelledError: | |
| pass | |
| return EventSourceResponse(event_generator()) | |