Spaces:
Running
Running
| """End-to-end async pipeline wiring parsers, agents, persistence, and hub.""" | |
| from __future__ import annotations | |
| import asyncio | |
| import logging | |
| import os | |
| import time | |
| from datetime import datetime, timezone | |
| from typing import Any | |
| from uuid import UUID | |
| from sqlalchemy.ext.asyncio import AsyncSession | |
| from agents.ai_analyst_agent import generate_analyst_report | |
| from agents.alerting_agent import send_alert | |
| from agents.incident_correlation_agent import correlate | |
| from agents.normalization_agent import normalize_event | |
| from agents.remediation_agent import build_remediation | |
| from agents.risk_scoring_agent import score_incident | |
| from agents.threat_detection_agent import detect_threats | |
| from agents.threat_enrichment_agent import enrich_event | |
| from database.models import AlertRecord, EventRecord, IncidentRecord | |
| from models.schemas import AlertPayload, RawLogIngest, Severity | |
| from parsers.parser_agent import parse_raw | |
| from services.chroma_memory import remember_incident | |
| from services.event_hub import EventHub | |
| from services.metrics_store import MetricsStore | |
# Module logger; dotted name makes it a child of the "sentinelai" app logger.
logger = logging.getLogger("sentinelai.pipeline")
class SentinelPipeline:
    """Orchestrates the end-to-end detection chain.

    For each raw log: parse -> normalize -> enrich -> detect -> correlate ->
    score, broadcasting progress over the EventHub and optionally persisting
    events/incidents/alerts when a DB session is supplied.  State
    (events, findings, incidents) is held in memory for this process.
    """

    def __init__(self, hub: EventHub, metrics: MetricsStore) -> None:
        self.hub = hub
        self.metrics = metrics
        # In-memory working set shared across ingests (single process).
        self._events: list[Any] = []
        self._findings: list[Any] = []
        self._incidents: list[Any] = []
        # Monotonic timestamp of the last auto-triggered analyst run,
        # used to rate-limit expensive LLM inference.
        self._last_auto_ai_at: float = 0.0
        # Strong references to fire-and-forget tasks: the event loop keeps
        # only weak references to tasks, so an otherwise-unreferenced task
        # may be garbage-collected before it completes.
        self._bg_tasks: set[asyncio.Task] = set()

    def _spawn(self, coro) -> None:
        """Schedule *coro* as a background task, holding a strong reference
        until it finishes (per the asyncio.create_task documentation)."""
        task = asyncio.create_task(coro)
        self._bg_tasks.add(task)
        task.add_done_callback(self._bg_tasks.discard)

    async def ingest_from_collector(self, ingest: RawLogIngest) -> None:
        """Fire-and-forget path for autonomous collectors (no DB session)."""
        try:
            await self.ingest(ingest, None)
        except Exception:  # noqa: BLE001 — the collector tail must never die
            logger.exception("Collector ingest failed — continuing tail")

    async def _auto_analyst(self, incident_id: UUID) -> None:
        """Background wrapper around the full AI workflow; never raises."""
        try:
            await self.run_full_workflow_on_incident(incident_id, None)
        except Exception:  # noqa: BLE001
            logger.exception("Auto AI analyst failed for incident %s", incident_id)

    async def ingest(self, ingest: RawLogIngest, session: AsyncSession | None) -> dict[str, Any]:
        """Run one raw log through the whole agent chain.

        Returns a dict with the event id and findings, plus the latest
        incident and its risk score when correlation produced one.  When
        *session* is None, nothing is persisted.
        """
        await self.hub.log_agent("collector", "running", f"ingest {ingest.source}")
        await self.hub.log_agent("parser", "running", "parse raw")
        event = parse_raw(ingest)
        event = normalize_event(event)
        await self.hub.log_agent("normalization", "running", "schema unify")
        await self.hub.log_agent("threat_enrichment", "running", "intel overlay")
        enriched = await enrich_event(event)
        geo = enriched.enrichment.get("geo") or {}
        self.metrics.record_country(geo.get("countryCode"))
        await self.hub.broadcast(
            {
                "type": "threat_feed",
                "severity": enriched.severity.value,
                "message": enriched.message,
                "source_ip": enriched.source_ip,
                "event_type": enriched.event_type,
                "ts": enriched.timestamp.isoformat(),
            }
        )
        await self.hub.log_agent("threat_detection", "running", "rules + heuristics")
        findings = detect_threats(enriched)
        for f in findings:
            self.metrics.bump_threat()
            await self.hub.broadcast(
                {
                    "type": "detection",
                    "technique": f.technique,
                    "severity": f.severity.value,
                    "confidence": f.confidence,
                    "description": f.description,
                }
            )
        self._events.append(enriched)
        self._findings.extend(findings)
        self.metrics.inc_frequency()
        if session is not None:
            await self._persist_event(session, enriched)
        # Correlation & downstream when graph warrants
        incidents = correlate(self._events, self._findings)
        self._incidents = incidents
        self.metrics.set_active_incidents(len(incidents))
        out: dict[str, Any] = {"event_id": str(enriched.id), "findings": [f.model_dump() for f in findings]}
        if incidents:
            latest = incidents[-1]
            risk = score_incident(latest, self._events, self._findings)
            self.metrics.record_risk(risk.risk_score)
            await self.hub.broadcast(
                {
                    "type": "incident",
                    "title": latest.title,
                    "summary": latest.summary,
                    "risk": risk.model_dump(),
                    "timeline": latest.timeline,
                }
            )
            remember_incident(latest.summary, {"severity": risk.severity.value})
            if session is not None:
                await self._persist_incident(session, latest, risk)
            out["incident"] = latest.model_dump(mode="json")
            out["risk"] = risk.model_dump()
            # Auto-kick the AI analyst, rate-limited so bursts of correlated
            # events do not stack up LLM calls.
            if os.getenv("AUTO_AI_ON_INCIDENT", "1").lower() not in {"0", "false", "no", "off"}:
                min_gap = float(os.getenv("AUTO_AI_MIN_SEC", "75"))
                now = time.monotonic()
                if now - self._last_auto_ai_at >= min_gap:
                    self._last_auto_ai_at = now
                    # Keep a strong reference to the task (see _spawn) so the
                    # background analyst run cannot be garbage-collected.
                    self._spawn(self._auto_analyst(latest.id))
        return out

    async def run_full_workflow_on_incident(self, incident_id: UUID, session: AsyncSession | None) -> dict[str, Any]:
        """Re-score an incident, generate the AI analyst report and a
        remediation playbook, broadcast the report, and optionally persist
        an alert row.  Returns an error dict when *incident_id* is not in
        the in-memory incident store.
        """
        inc = next((i for i in self._incidents if i.id == incident_id), None)
        if not inc:
            return {"error": "incident not found"}
        risk = score_incident(inc, self._events, self._findings)
        await self.hub.log_agent("ai_analyst", "running", "LLM / ROCm inference")
        report = await generate_analyst_report(inc, risk)
        await self.hub.log_agent("remediation", "running", "playbook synthesis")
        rem = build_remediation(inc, risk, report)
        await self.hub.broadcast(
            {
                "type": "ai_report",
                "incident_id": str(inc.id),
                "executive": report.executive_summary,
                "technical": report.technical_analysis,
                "investigation_notes": report.investigation_notes,
                "recommended_actions": report.recommended_actions,
            }
        )
        payload = {
            "incident": inc.model_dump(mode="json"),
            "risk": risk.model_dump(),
            "report": report.model_dump(),
            "remediation": rem.model_dump(),
        }
        if session is not None:
            await self._persist_alert(session, "stored", f"Analyst report {inc.id}", report.executive_summary, risk.severity, inc.id)
        return payload

    async def _persist_event(self, session: AsyncSession, enriched: Any) -> None:
        """Insert and commit one enriched event row."""
        rec = EventRecord(
            id=enriched.id,
            timestamp=enriched.timestamp,
            event_type=enriched.event_type,
            source_ip=enriched.source_ip,
            host=enriched.host,
            severity=enriched.severity.value,
            payload=enriched.model_dump(mode="json"),
        )
        session.add(rec)
        await session.commit()

    async def _persist_incident(self, session: AsyncSession, incident: Any, risk: Any) -> None:
        """Insert and commit one incident row, serializing its graph."""
        rec = IncidentRecord(
            id=incident.id,
            title=incident.title,
            summary=incident.summary,
            graph={"nodes": [n.model_dump(mode="json") for n in incident.nodes], "edges": [e.model_dump(mode="json") for e in incident.edges]},
            risk_score=risk.risk_score,
            severity=risk.severity.value,
        )
        session.add(rec)
        await session.commit()

    async def _persist_alert(self, session: AsyncSession, channel: str, title: str, body: str, severity: Severity, incident_id: UUID) -> None:
        """Insert and commit one alert row linked to *incident_id*."""
        session.add(
            AlertRecord(
                channel=channel,
                title=title,
                body=body,
                severity=severity.value,
                incident_id=incident_id,
            )
        )
        await session.commit()