# File: devsecops-platform/security/scanning/security_audit.py
# Last commit 9d4d5c7 (verified) by shaikhsalman:
#   "refactor: merged structure - model at center, DevSecOps wrapped around it"
#!/usr/bin/env python3
# =============================================================================
# DevSecOps Platform — Security Audit Automation
# =============================================================================
# Runs all security scans, generates compliance report
# =============================================================================
import json
import os
import subprocess
import sys
from datetime import datetime, timezone
from pathlib import Path
from typing import Dict, List, Optional
class SecurityAuditor:
    """Automated security audit runner for DevSecOps platform.

    Each ``scan_*`` method shells out to external scanner CLIs, stores one
    result dict per tool under ``self.results["scans"][<category>]`` and
    returns that category's results. ``generate_report`` writes everything
    collected so far to a JSON file in ``output_dir`` and prints a summary.
    """

    def __init__(self, output_dir: str = "./audit-reports"):
        """Create the report directory and initialise the result store.

        Args:
            output_dir: Directory that receives the JSON reports; created
                (including parents) if it does not exist.
        """
        self.output_dir = Path(output_dir)
        self.output_dir.mkdir(parents=True, exist_ok=True)
        # datetime.utcnow() is deprecated since Python 3.12. Take an aware UTC
        # time and drop tzinfo so the serialised form keeps its "Z" suffix
        # instead of gaining a "+00:00" offset.
        started = datetime.now(timezone.utc).replace(tzinfo=None)
        self.results: Dict = {
            "timestamp": started.isoformat() + "Z",
            "scans": {},
        }

    def _run_command(
        self,
        cmd: List[str],
        name: str,
        env: Optional[Dict[str, str]] = None,
    ) -> Dict:
        """Run an external command and capture its outcome.

        Args:
            cmd: Argument vector; executed directly, without a shell.
            name: Human-readable label printed in the progress line.
            env: Extra environment variables layered on top of the current
                process environment. ``None`` (or empty) inherits the
                environment unchanged.

        Returns:
            When the command ran: ``exit_code``, ``stdout`` (first 10000
            characters), ``stderr`` (first 5000 characters) and ``success``
            (``True`` only for exit code 0).
            When it could not be run or timed out: ``exit_code`` -1, an
            ``error`` message and ``success`` ``False``.
        """
        print(f"[→] Running {name}...")
        # Merge rather than replace: handing subprocess a dict with only the
        # extra variables would drop PATH and friends from the child process.
        run_env = {**os.environ, **env} if env else None
        try:
            result = subprocess.run(
                cmd,
                capture_output=True,
                text=True,
                errors="replace",  # undecodable scanner output must not abort the audit
                timeout=600,
                env=run_env,
            )
        except subprocess.TimeoutExpired:
            return {"exit_code": -1, "error": "timeout", "success": False}
        except FileNotFoundError:
            return {"exit_code": -1, "error": "command not found", "success": False}
        except OSError as exc:
            # e.g. PermissionError when the scanner binary is not executable.
            return {"exit_code": -1, "error": str(exc), "success": False}
        return {
            "exit_code": result.returncode,
            "stdout": result.stdout[:10000],
            "stderr": result.stderr[:5000],
            "success": result.returncode == 0,
        }

    def scan_iac(self, directory: str = "terraform/") -> Dict:
        """Run IaC security scans (Checkov and Trivy) against ``directory``.

        Returns:
            Mapping with keys ``checkov`` and ``trivy_iac``; also stored under
            ``self.results["scans"]["iac"]``.
        """
        results = {}
        # Checkov
        results["checkov"] = self._run_command(
            ["checkov", "-d", directory, "--output", "json", "--compact"],
            "Checkov IaC Scan",
        )
        # Trivy IaC
        results["trivy_iac"] = self._run_command(
            ["trivy", "fs", "--scanners", "misconfig,secret", directory],
            "Trivy IaC Scan",
        )
        self.results["scans"]["iac"] = results
        return results

    def scan_container(self, image: str) -> Dict:
        """Run container security scans against ``image``.

        Returns:
            Mapping with key ``trivy_image``; also stored under
            ``self.results["scans"]["container"]``.
        """
        results = {}
        # Trivy image
        # NOTE(review): "success" here only reflects the process exit code.
        # Trivy appears to exit 0 even when findings exist unless
        # --exit-code is passed — confirm whether that is intended.
        results["trivy_image"] = self._run_command(
            ["trivy", "image", "--severity", "CRITICAL,HIGH", image],
            f"Trivy Container Scan ({image})",
        )
        self.results["scans"]["container"] = results
        return results

    def scan_kubernetes(self, kubeconfig: Optional[str] = None) -> Dict:
        """Run Kubernetes security scans.

        Args:
            kubeconfig: Optional path exported as ``KUBECONFIG`` to every
                command started by this scan. When omitted, the commands
                inherit the current environment unchanged.

        Returns:
            Mapping with key ``kube_bench`` plus one entry per kubectl check,
            keyed by the check's display name; also stored under
            ``self.results["scans"]["kubernetes"]``.
        """
        results = {}
        # Previously this dict was built but never handed to the commands,
        # so the kubeconfig argument had no effect.
        env = {"KUBECONFIG": kubeconfig} if kubeconfig else None
        # kube-bench
        results["kube_bench"] = self._run_command(
            ["kube-bench", "run", "--targets", "master,node,etcd,policies"],
            "kube-bench CIS Benchmark",
            env=env,
        )
        # kubectl checks
        checks = [
            (["kubectl", "auth", "can-i", "--list"], "RBAC audit"),
            (["kubectl", "get", "networkpolicies", "-A"], "Network policies"),
            (["kubectl", "get", "clusterpolicies", "-A"], "Kyverno policies"),
        ]
        for cmd, name in checks:
            results[name] = self._run_command(cmd, f"k8s: {name}", env=env)
        self.results["scans"]["kubernetes"] = results
        return results

    def generate_report(self) -> str:
        """Write the collected results to a JSON file and print a summary.

        Returns:
            Path of the written report, as a string.
        """
        # The file name uses local wall-clock time; the "timestamp" field
        # inside the report is UTC.
        stamp = datetime.now().strftime("%Y%m%d-%H%M%S")
        report_path = self.output_dir / f"audit-{stamp}.json"
        with open(report_path, "w", encoding="utf-8") as f:
            json.dump(self.results, f, indent=2, default=str)
        # Print summary
        categories = self.results["scans"].values()
        total = sum(len(cat) for cat in categories)
        passed = sum(
            1
            for cat in categories
            for r in cat.values()
            if isinstance(r, dict) and r.get("success")
        )
        print(f"\n{'='*60}")
        print("SECURITY AUDIT SUMMARY")
        print(f"{'='*60}")
        print(f"Timestamp: {self.results['timestamp']}")
        print(f"Total scans: {total}")
        print(f"Passed: {passed}")
        print(f"Failed: {total - passed}")
        print(f"Report: {report_path}")
        print(f"{'='*60}")
        return str(report_path)
if __name__ == "__main__":
    # Script entry point: audit the default targets, one category at a time.
    runner = SecurityAuditor()
    runner.scan_iac("terraform/")
    runner.scan_container("ecr.aws/devsecops/backend:latest")
    runner.scan_kubernetes()
    # Write the JSON report (which prints its own summary), then echo its path.
    print(f"\nFull report: {runner.generate_report()}")