"""Remediation agent — automated playbooks, scripts, and infra hints."""
from __future__ import annotations

import ipaddress

from models.schemas import AnalystReport, Incident, RemediationPlan, RiskAssessment
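def build_remediation(incident: Incident, risk: RiskAssessment, report: AnalystReport | None = None) -> RemediationPlan:
    """Assemble a :class:`RemediationPlan` for *incident*.

    Args:
        incident: The incident being remediated; only ``incident.id`` is read.
        risk: Risk assessment whose ``severity.value`` decides whether a
            war-room action is added (``"critical"`` or ``"high"``).
        report: Optional analyst report. Its ``indicators`` are turned into
            host-firewall block rules and into the ``except`` list of the
            Kubernetes NetworkPolicy. Only indicators that parse as an IP
            address or CIDR are used; anything else (hashes, domains, free
            text) is skipped so it can never be spliced into a rule string.

    Returns:
        A ``RemediationPlan`` carrying the actions, firewall rules, scripts,
        k8s manifests and IAM hardening items.
    """
    actions: list[dict[str, str]] = []
    iam: list[str] = []

    raw_indicators = report.indicators if report else []
    # The report is upstream text (analyst- or model-produced); validate each
    # indicator as an IP/CIDR before it is interpolated into any rule. Values
    # that do not parse are dropped rather than emitted as malformed commands.
    blocked = _validated_networks(raw_indicators)

    fw = _firewall_rules(blocked)

    scripts: list[str] = [
        """#!/usr/bin/env bash
set -euo pipefail
echo "[SentinelAI] Rotating exposed SSH keys & invalidating sessions"
sudo passwd -l $(awk -F: '$3 == 0 {print $1}' /etc/passwd) 2>/dev/null || true
"""
    ]
    # NOTE(review): the script's message talks about SSH keys and sessions, but
    # its only command locks every UID-0 account (root included) — confirm this
    # is the intended playbook before the plan is wired to execution.

    # An ipBlock of 0.0.0.0/0 with no `except` is an allow-all rule, the
    # opposite of what the policy name promises; the validated indicators are
    # placed in `except` so traffic from them is denied.
    v4_except = [str(net) for net in blocked if net.version == 4]
    v6_except = [str(net) for net in blocked if net.version == 6]
    k8s: list[str] = [_network_policy_yaml(v4_except, v6_except)]

    iam.extend(
        [
            "Enforce MFA on all break-glass accounts",
            "Scope IAM roles with session duration <= 1h",
            "Enable CloudTrail data events on sensitive buckets",
        ]
    )
    actions.extend(
        [
            {"type": "isolate", "detail": "Network isolate affected host via SOC VLAN quarantine"},
            {"type": "credential", "detail": "Force password/ key rotation for implicated users"},
            {"type": "monitoring", "detail": "Increase log verbosity and enable EDR kernel module"},
        ]
    )
    # Only the two top severities escalate to a paged war room.
    if risk.severity.value in {"critical", "high"}:
        actions.append({"type": "war_room", "detail": "Page incident commander + legal/comms"})

    return RemediationPlan(
        incident_id=incident.id,
        actions=actions,
        firewall_rules=fw,
        scripts=scripts,
        k8s_patches=k8s,
        iam_hardening=iam,
    )


def _validated_networks(indicators) -> list[ipaddress.IPv4Network | ipaddress.IPv6Network]:
    """Return the indicators that parse as an IP address or CIDR, in input order.

    Each indicator is coerced with ``str()`` and stripped (the element type of
    ``report.indicators`` is not pinned down here); ``strict=False`` accepts
    host bits set in a CIDR. Non-parsable values are skipped silently.
    """
    networks: list[ipaddress.IPv4Network | ipaddress.IPv6Network] = []
    for raw in indicators:
        try:
            networks.append(ipaddress.ip_network(str(raw).strip(), strict=False))
        except ValueError:
            continue  # hash, domain, free text, ... — no firewall rule for these
    return networks


def _firewall_rules(networks: list[ipaddress.IPv4Network | ipaddress.IPv6Network]) -> list[str]:
    """Render iptables/nft DROP rules for already-validated networks.

    Single hosts are emitted as a bare address (matching the original output),
    larger blocks in canonical CIDR form. IPv6 needs the ``ip6tables`` binary
    and the ``ip6 saddr`` nft match; the IPv4 forms reject a v6 address.
    """
    rules: list[str] = []
    for net in networks:
        # Only text produced by `ipaddress` itself ends up in the command.
        target = str(net.network_address) if net.prefixlen == net.max_prefixlen else str(net)
        if net.version == 6:
            rules.append(f"ip6tables -A INPUT -s {target} -j DROP # SentinelAI auto-block")
            rules.append(f"nft add rule inet filter input ip6 saddr {target} drop")
        else:
            rules.append(f"iptables -A INPUT -s {target} -j DROP # SentinelAI auto-block")
            rules.append(f"nft add rule inet filter input ip saddr {target} drop")
    return rules


def _network_policy_yaml(v4_except: list[str], v6_except: list[str]) -> str:
    """Render the ``sentinelai-deny-suspicious`` NetworkPolicy manifest.

    The IPv4 block (``0.0.0.0/0``) is always present, as in the original
    manifest; an IPv6 block (``::/0``) is only added when there are v6 CIDRs
    to deny. NetworkPolicy lives in ``networking.k8s.io/v1`` — the core ``v1``
    group has no such kind and the API server rejects it.
    """
    lines = [
        "apiVersion: networking.k8s.io/v1",
        "kind: NetworkPolicy",
        "metadata:",
        "  name: sentinelai-deny-suspicious",
        "spec:",
        "  podSelector: {}",
        "  policyTypes:",
        "    - Ingress",
        "  ingress:",
        "    - from:",
    ]
    for cidr, excepts in (("0.0.0.0/0", v4_except), ("::/0", v6_except)):
        if cidr == "::/0" and not excepts:
            continue
        lines.append("        - ipBlock:")
        lines.append(f"            cidr: {cidr}")
        if excepts:
            lines.append("            except:")
            lines.extend(f"              - {c}" for c in excepts)
    return "\n".join(lines) + "\n"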