# SentinelAI/agents/remediation_agent.py
# Last commit 8b3905d: Sync SentinelAI project and add Hugging Face Docker Space layout.
"""Remediation agent — automated playbooks, scripts, and infra hints."""
from __future__ import annotations

import ipaddress

from models.schemas import AnalystReport, Incident, RemediationPlan, RiskAssessment
# Host lockdown script shipped with every plan.
# NOTE(review): the echo line talks about rotating SSH keys and invalidating
# sessions, but the only command locks the password of every UID-0 account
# (`passwd -l`). Confirm this is the intended playbook before executing it.
_LOCKDOWN_SCRIPT = """#!/usr/bin/env bash
set -euo pipefail
echo "[SentinelAI] Rotating exposed SSH keys & invalidating sessions"
sudo passwd -l $(awk -F: '$3 == 0 {print $1}' /etc/passwd) 2>/dev/null || true
"""

# Kubernetes manifest shipped with every plan. NetworkPolicy belongs to the
# `networking.k8s.io/v1` API group (not core `v1`), and the nesting below is
# required for the manifest to parse as a NetworkPolicy at all.
# NOTE(review): despite its name, this policy *allows* ingress from any IPv4
# source (0.0.0.0/0) to every pod in the namespace; it does not deny the
# suspicious addresses. Confirm the intended semantics (e.g. an `except:` list).
_NETWORK_POLICY = """apiVersion: networking.k8s.io/v1
kind: NetworkPolicy
metadata:
  name: sentinelai-deny-suspicious
spec:
  podSelector: {}
  policyTypes:
    - Ingress
  ingress:
    - from:
        - ipBlock:
            cidr: 0.0.0.0/0
"""


def _firewall_rules(indicators: list[str]) -> list[str]:
    """Return iptables/nftables drop rules for the IP indicators given.

    Each indicator is parsed with :mod:`ipaddress`; single addresses and CIDR
    networks (IPv4 or IPv6) are accepted. Anything that does not parse (domain
    names, hashes, malformed text) is skipped so that arbitrary text is never
    interpolated into a shell command. Duplicate targets yield one rule pair.
    """
    rules: list[str] = []
    seen: set[str] = set()
    for raw in indicators:
        text = str(raw).strip()
        # An IPv6 zone id ("fe80::1%eth0") is free-form text that ipaddress
        # does not sanitise, so scoped addresses are rejected outright.
        if "%" in text:
            continue
        try:
            network = ipaddress.ip_network(text, strict=False)
        except ValueError:
            continue

        # Render a single host as a bare address (the historical rule format)
        # and a real network in CIDR notation.
        if network.prefixlen == network.max_prefixlen:
            target = str(network.network_address)
        else:
            target = str(network)
        if target in seen:
            continue
        seen.add(target)

        # `iptables` and nft's `ip saddr` only match IPv4; IPv6 sources need
        # `ip6tables` and `ip6 saddr` respectively.
        if network.version == 6:
            iptables_cmd, nft_match = "ip6tables", "ip6 saddr"
        else:
            iptables_cmd, nft_match = "iptables", "ip saddr"
        rules.append(f"{iptables_cmd} -A INPUT -s {target} -j DROP # SentinelAI auto-block")
        rules.append(f"nft add rule inet filter input {nft_match} {target} drop")
    return rules


def build_remediation(incident: Incident, risk: RiskAssessment, report: AnalystReport | None = None) -> RemediationPlan:
    """Assemble the remediation plan for an incident.

    Args:
        incident: The incident being remediated; only ``incident.id`` is read.
        risk: Risk assessment; ``risk.severity.value`` of ``"critical"`` or
            ``"high"`` adds a war-room escalation action.
        report: Optional analyst report. Its ``indicators`` are turned into
            firewall drop rules; entries that are not valid IP addresses or
            CIDR networks are ignored.

    Returns:
        A ``RemediationPlan`` carrying the response actions, firewall rules,
        the host lockdown script, the Kubernetes manifest, and the IAM
        hardening checklist.
    """
    # Tolerate both a missing report and a report whose indicators are unset.
    indicators = (report.indicators or []) if report else []

    actions: list[dict] = [
        {"type": "isolate", "detail": "Network isolate affected host via SOC VLAN quarantine"},
        {"type": "credential", "detail": "Force password/ key rotation for implicated users"},
        {"type": "monitoring", "detail": "Increase log verbosity and enable EDR kernel module"},
    ]
    if risk.severity.value in {"critical", "high"}:
        actions.append({"type": "war_room", "detail": "Page incident commander + legal/comms"})

    iam: list[str] = [
        "Enforce MFA on all break-glass accounts",
        "Scope IAM roles with session duration <= 1h",
        "Enable CloudTrail data events on sensitive buckets",
    ]

    return RemediationPlan(
        incident_id=incident.id,
        actions=actions,
        firewall_rules=_firewall_rules(indicators),
        scripts=[_LOCKDOWN_SCRIPT],
        k8s_patches=[_NETWORK_POLICY],
        iam_hardening=iam,
    )