Spaces:
Running
Running
File size: 8,059 Bytes
8b3905d | 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 | """End-to-end async pipeline wiring parsers, agents, persistence, and hub."""
from __future__ import annotations
import asyncio
import logging
import os
import time
from datetime import datetime, timezone
from typing import Any
from uuid import UUID
from sqlalchemy.ext.asyncio import AsyncSession
from agents.ai_analyst_agent import generate_analyst_report
from agents.alerting_agent import send_alert
from agents.incident_correlation_agent import correlate
from agents.normalization_agent import normalize_event
from agents.remediation_agent import build_remediation
from agents.risk_scoring_agent import score_incident
from agents.threat_detection_agent import detect_threats
from agents.threat_enrichment_agent import enrich_event
from database.models import AlertRecord, EventRecord, IncidentRecord
from models.schemas import AlertPayload, RawLogIngest, Severity
from parsers.parser_agent import parse_raw
from services.chroma_memory import remember_incident
from services.event_hub import EventHub
from services.metrics_store import MetricsStore
logger = logging.getLogger("sentinelai.pipeline")
class SentinelPipeline:
def __init__(self, hub: EventHub, metrics: MetricsStore) -> None:
self.hub = hub
self.metrics = metrics
self._events: list[Any] = []
self._findings: list[Any] = []
self._incidents: list[Any] = []
self._last_auto_ai_at: float = 0.0
async def ingest_from_collector(self, ingest: RawLogIngest) -> None:
"""Fire-and-forget path for autonomous collectors (no DB session)."""
try:
await self.ingest(ingest, None)
except Exception: # noqa: BLE001
logger.exception("Collector ingest failed — continuing tail")
async def _auto_analyst(self, incident_id: UUID) -> None:
try:
await self.run_full_workflow_on_incident(incident_id, None)
except Exception: # noqa: BLE001
logger.exception("Auto AI analyst failed for incident %s", incident_id)
async def ingest(self, ingest: RawLogIngest, session: AsyncSession | None) -> dict[str, Any]:
await self.hub.log_agent("collector", "running", f"ingest {ingest.source}")
await self.hub.log_agent("parser", "running", "parse raw")
event = parse_raw(ingest)
event = normalize_event(event)
await self.hub.log_agent("normalization", "running", "schema unify")
await self.hub.log_agent("threat_enrichment", "running", "intel overlay")
enriched = await enrich_event(event)
geo = enriched.enrichment.get("geo") or {}
self.metrics.record_country(geo.get("countryCode"))
await self.hub.broadcast(
{
"type": "threat_feed",
"severity": enriched.severity.value,
"message": enriched.message,
"source_ip": enriched.source_ip,
"event_type": enriched.event_type,
"ts": enriched.timestamp.isoformat(),
}
)
await self.hub.log_agent("threat_detection", "running", "rules + heuristics")
findings = detect_threats(enriched)
for f in findings:
self.metrics.bump_threat()
await self.hub.broadcast(
{
"type": "detection",
"technique": f.technique,
"severity": f.severity.value,
"confidence": f.confidence,
"description": f.description,
}
)
self._events.append(enriched)
self._findings.extend(findings)
self.metrics.inc_frequency()
if session is not None:
await self._persist_event(session, enriched)
# Correlation & downstream when graph warrants
incidents = correlate(self._events, self._findings)
self._incidents = incidents
self.metrics.set_active_incidents(len(incidents))
out: dict[str, Any] = {"event_id": str(enriched.id), "findings": [f.model_dump() for f in findings]}
if incidents:
latest = incidents[-1]
risk = score_incident(latest, self._events, self._findings)
self.metrics.record_risk(risk.risk_score)
await self.hub.broadcast(
{
"type": "incident",
"title": latest.title,
"summary": latest.summary,
"risk": risk.model_dump(),
"timeline": latest.timeline,
}
)
remember_incident(latest.summary, {"severity": risk.severity.value})
if session is not None:
await self._persist_incident(session, latest, risk)
out["incident"] = latest.model_dump(mode="json")
out["risk"] = risk.model_dump()
if os.getenv("AUTO_AI_ON_INCIDENT", "1").lower() not in {"0", "false", "no", "off"}:
min_gap = float(os.getenv("AUTO_AI_MIN_SEC", "75"))
now = time.monotonic()
if now - self._last_auto_ai_at >= min_gap:
self._last_auto_ai_at = now
asyncio.create_task(self._auto_analyst(latest.id))
return out
async def run_full_workflow_on_incident(self, incident_id: UUID, session: AsyncSession | None) -> dict[str, Any]:
inc = next((i for i in self._incidents if i.id == incident_id), None)
if not inc:
return {"error": "incident not found"}
risk = score_incident(inc, self._events, self._findings)
await self.hub.log_agent("ai_analyst", "running", "LLM / ROCm inference")
report = await generate_analyst_report(inc, risk)
await self.hub.log_agent("remediation", "running", "playbook synthesis")
rem = build_remediation(inc, risk, report)
await self.hub.broadcast(
{
"type": "ai_report",
"incident_id": str(inc.id),
"executive": report.executive_summary,
"technical": report.technical_analysis,
"investigation_notes": report.investigation_notes,
"recommended_actions": report.recommended_actions,
}
)
payload = {
"incident": inc.model_dump(mode="json"),
"risk": risk.model_dump(),
"report": report.model_dump(),
"remediation": rem.model_dump(),
}
if session is not None:
await self._persist_alert(session, "stored", f"Analyst report {inc.id}", report.executive_summary, risk.severity, inc.id)
return payload
async def _persist_event(self, session: AsyncSession, enriched: Any) -> None:
rec = EventRecord(
id=enriched.id,
timestamp=enriched.timestamp,
event_type=enriched.event_type,
source_ip=enriched.source_ip,
host=enriched.host,
severity=enriched.severity.value,
payload=enriched.model_dump(mode="json"),
)
session.add(rec)
await session.commit()
async def _persist_incident(self, session: AsyncSession, incident: Any, risk: Any) -> None:
rec = IncidentRecord(
id=incident.id,
title=incident.title,
summary=incident.summary,
graph={"nodes": [n.model_dump(mode="json") for n in incident.nodes], "edges": [e.model_dump(mode="json") for e in incident.edges]},
risk_score=risk.risk_score,
severity=risk.severity.value,
)
session.add(rec)
await session.commit()
async def _persist_alert(self, session: AsyncSession, channel: str, title: str, body: str, severity: Severity, incident_id: UUID) -> None:
session.add(
AlertRecord(
channel=channel,
title=title,
body=body,
severity=severity.value,
incident_id=incident_id,
)
)
await session.commit()
|