#!/usr/bin/env python3 # ============================================================================= # DevSecOps Platform — Security Audit Automation # ============================================================================= # Runs all security scans, generates compliance report # ============================================================================= import json import subprocess import sys from datetime import datetime from pathlib import Path from typing import Dict, List, Optional class SecurityAuditor: """Automated security audit runner for DevSecOps platform.""" def __init__(self, output_dir: str = "./audit-reports"): self.output_dir = Path(output_dir) self.output_dir.mkdir(parents=True, exist_ok=True) self.results: Dict = { "timestamp": datetime.utcnow().isoformat() + "Z", "scans": {}, } def _run_command(self, cmd: List[str], name: str) -> Dict: """Run a shell command and capture results.""" print(f"[→] Running {name}...") try: result = subprocess.run( cmd, capture_output=True, text=True, timeout=600 ) return { "exit_code": result.returncode, "stdout": result.stdout[:10000], "stderr": result.stderr[:5000], "success": result.returncode == 0, } except subprocess.TimeoutExpired: return {"exit_code": -1, "error": "timeout", "success": False} except FileNotFoundError: return {"exit_code": -1, "error": "command not found", "success": False} def scan_iac(self, directory: str = "terraform/") -> Dict: """Run IaC security scans.""" results = {} # Checkov r = self._run_command( ["checkov", "-d", directory, "--output", "json", "--compact"], "Checkov IaC Scan", ) results["checkov"] = r # Trivy IaC r = self._run_command( ["trivy", "fs", "--scanners", "misconfig,secret", directory], "Trivy IaC Scan", ) results["trivy_iac"] = r self.results["scans"]["iac"] = results return results def scan_container(self, image: str) -> Dict: """Run container security scans.""" results = {} # Trivy image r = self._run_command( ["trivy", "image", "--severity", "CRITICAL,HIGH", image], f"Trivy Container Scan ({image})", ) results["trivy_image"] = r self.results["scans"]["container"] = results return results def scan_kubernetes(self, kubeconfig: Optional[str] = None) -> Dict: """Run Kubernetes security scans.""" results = {} env = {"KUBECONFIG": kubeconfig} if kubeconfig else None # kube-bench r = self._run_command( ["kube-bench", "run", "--targets", "master,node,etcd,policies"], "kube-bench CIS Benchmark", ) results["kube_bench"] = r # kubectl checks checks = [ (["kubectl", "auth", "can-i", "--list"], "RBAC audit"), (["kubectl", "get", "networkpolicies", "-A"], "Network policies"), (["kubectl", "get", "clusterpolicies", "-A"], "Kyverno policies"), ] for cmd, name in checks: r = self._run_command(cmd, f"k8s: {name}") results[name] = r self.results["scans"]["kubernetes"] = results return results def generate_report(self) -> str: """Generate summary report.""" report_path = self.output_dir / f"audit-{datetime.now().strftime('%Y%m%d-%H%M%S')}.json" with open(report_path, "w") as f: json.dump(self.results, f, indent=2, default=str) # Print summary total = sum(len(v) for v in self.results["scans"].values()) passed = sum( 1 for cat in self.results["scans"].values() for r in cat.values() if isinstance(r, dict) and r.get("success") ) print(f"\n{'='*60}") print(f"SECURITY AUDIT SUMMARY") print(f"{'='*60}") print(f"Timestamp: {self.results['timestamp']}") print(f"Total scans: {total}") print(f"Passed: {passed}") print(f"Failed: {total - passed}") print(f"Report: {report_path}") print(f"{'='*60}") return str(report_path) if __name__ == "__main__": auditor = SecurityAuditor() # Run all scans auditor.scan_iac("terraform/") auditor.scan_container("ecr.aws/devsecops/backend:latest") auditor.scan_kubernetes() # Generate report report = auditor.generate_report() print(f"\nFull report: {report}")