#!/usr/bin/env python3
# =============================================================================
# DevSecOps Platform — Security Audit Automation
# =============================================================================
# Runs all security scans, generates compliance report
# =============================================================================
import json
import os
import subprocess
import sys
from datetime import datetime, timezone
from pathlib import Path
from typing import Dict, List, Optional
class SecurityAuditor:
    """Automated security audit runner for the DevSecOps platform.

    Every ``scan_*`` method shells out to external scanner CLIs, stores the
    outcome of each command under ``self.results["scans"][<category>]`` and
    returns that category's results. ``generate_report`` dumps everything
    collected so far to a JSON file and prints a short summary.
    """

    def __init__(self, output_dir: str = "./audit-reports"):
        """Create the report directory and initialise the result store.

        Args:
            output_dir: Directory that receives the JSON reports. It is
                created (including parents) when it does not exist yet.
        """
        self.output_dir = Path(output_dir)
        self.output_dir.mkdir(parents=True, exist_ok=True)
        # datetime.utcnow() is deprecated since Python 3.12. Take an aware
        # UTC time and drop tzinfo so the serialised value keeps the same
        # "<naive ISO timestamp>Z" shape as before.
        started = datetime.now(timezone.utc).replace(tzinfo=None)
        self.results: Dict = {
            "timestamp": started.isoformat() + "Z",
            "scans": {},
        }

    def _run_command(
        self,
        cmd: List[str],
        name: str,
        env: Optional[Dict[str, str]] = None,
    ) -> Dict:
        """Run an external command and capture its outcome.

        Args:
            cmd: Argument vector; executed directly, without a shell.
            name: Human-readable label printed before the command starts.
            env: Extra environment variables layered on top of the current
                process environment. ``None`` (the default) leaves the
                inherited environment untouched.

        Returns:
            When the command ran to completion: ``exit_code``, ``stdout``
            (first 10000 characters), ``stderr`` (first 5000 characters)
            and ``success`` (True when the exit code is 0).
            When it timed out or could not be started: ``exit_code`` -1,
            an ``error`` description and ``success`` False.
        """
        print(f"[→] Running {name}...")
        # Overlay instead of replace: handing subprocess a bare dict would
        # drop PATH and friends, and the scanner could no longer be found.
        child_env = {**os.environ, **env} if env else None
        try:
            result = subprocess.run(
                cmd,
                capture_output=True,
                text=True,
                timeout=600,
                env=child_env,
            )
        except subprocess.TimeoutExpired:
            return {"exit_code": -1, "error": "timeout", "success": False}
        except FileNotFoundError:
            return {"exit_code": -1, "error": "command not found", "success": False}
        except OSError as exc:
            # e.g. the binary exists but is not executable; record it as a
            # failed scan instead of aborting the remaining scans.
            return {"exit_code": -1, "error": str(exc), "success": False}
        return {
            "exit_code": result.returncode,
            # Truncated to keep the JSON report at a manageable size.
            "stdout": result.stdout[:10000],
            "stderr": result.stderr[:5000],
            "success": result.returncode == 0,
        }

    def scan_iac(self, directory: str = "terraform/") -> Dict:
        """Run the infrastructure-as-code scanners against ``directory``.

        Args:
            directory: Path handed to both Checkov and Trivy.

        Returns:
            Mapping with the keys ``checkov`` and ``trivy_iac``; the same
            mapping is stored under ``self.results["scans"]["iac"]``.
        """
        results = {
            "checkov": self._run_command(
                ["checkov", "-d", directory, "--output", "json", "--compact"],
                "Checkov IaC Scan",
            ),
            # NOTE(review): "success" only mirrors the process exit code;
            # Trivy is documented to exit 0 unless --exit-code is given, so
            # a pass here may just mean "the scan ran" - confirm intent.
            "trivy_iac": self._run_command(
                ["trivy", "fs", "--scanners", "misconfig,secret", directory],
                "Trivy IaC Scan",
            ),
        }
        self.results["scans"]["iac"] = results
        return results

    def scan_container(self, image: str) -> Dict:
        """Run the container image scan.

        Args:
            image: Image reference passed to ``trivy image``.

        Returns:
            Mapping with the key ``trivy_image``; the same mapping is stored
            under ``self.results["scans"]["container"]``.
        """
        # NOTE(review): no --exit-code flag is passed, so the exit status
        # presumably does not reflect findings - confirm against Trivy docs.
        results = {
            "trivy_image": self._run_command(
                ["trivy", "image", "--severity", "CRITICAL,HIGH", image],
                f"Trivy Container Scan ({image})",
            ),
        }
        self.results["scans"]["container"] = results
        return results

    def scan_kubernetes(self, kubeconfig: Optional[str] = None) -> Dict:
        """Run the Kubernetes benchmark and cluster checks.

        Args:
            kubeconfig: Optional kubeconfig path. When given it is exported
                as ``KUBECONFIG`` to every command launched by this scan;
                when omitted the commands inherit the ambient environment.

        Returns:
            Mapping keyed by ``kube_bench`` plus one entry per kubectl check
            (keyed by the check's display name); the same mapping is stored
            under ``self.results["scans"]["kubernetes"]``.
        """
        # Previously this mapping was built but never handed to the
        # commands, so the ``kubeconfig`` argument had no effect.
        env = {"KUBECONFIG": kubeconfig} if kubeconfig else None

        results = {
            "kube_bench": self._run_command(
                ["kube-bench", "run", "--targets", "master,node,etcd,policies"],
                "kube-bench CIS Benchmark",
                env=env,
            ),
        }

        checks = [
            (["kubectl", "auth", "can-i", "--list"], "RBAC audit"),
            (["kubectl", "get", "networkpolicies", "-A"], "Network policies"),
            (["kubectl", "get", "clusterpolicies", "-A"], "Kyverno policies"),
        ]
        for cmd, name in checks:
            results[name] = self._run_command(cmd, f"k8s: {name}", env=env)

        self.results["scans"]["kubernetes"] = results
        return results

    def generate_report(self) -> str:
        """Write the collected results to a JSON file and print a summary.

        Returns:
            Path of the written report, as a string.
        """
        # NOTE(review): the file name uses local time while the
        # "timestamp" field inside the report is UTC - confirm this is wanted.
        stamp = datetime.now().strftime("%Y%m%d-%H%M%S")
        report_path = self.output_dir / f"audit-{stamp}.json"
        with open(report_path, "w", encoding="utf-8") as f:
            json.dump(self.results, f, indent=2, default=str)

        # A scan counts as passed only when its result dict reports success;
        # everything else (including timeouts) is counted as failed.
        categories = self.results["scans"].values()
        total = sum(len(category) for category in categories)
        passed = sum(
            1
            for category in categories
            for outcome in category.values()
            if isinstance(outcome, dict) and outcome.get("success")
        )

        rule = "=" * 60
        print(f"\n{rule}")
        print("SECURITY AUDIT SUMMARY")
        print(rule)
        print(f"Timestamp: {self.results['timestamp']}")
        print(f"Total scans: {total}")
        print(f"Passed: {passed}")
        print(f"Failed: {total - passed}")
        print(f"Report: {report_path}")
        print(rule)
        return str(report_path)
if __name__ == "__main__":
    # Script entry point: run every scan category against the default
    # targets, then write the report and echo where it was saved.
    runner = SecurityAuditor()

    runner.scan_iac("terraform/")
    runner.scan_container("ecr.aws/devsecops/backend:latest")
    runner.scan_kubernetes()

    report_file = runner.generate_report()
    print(f"\nFull report: {report_file}")
|