| |
| |
| |
| |
| |
| |
|
|
| import json |
| import subprocess |
| import sys |
| from datetime import datetime |
| from pathlib import Path |
| from typing import Dict, List, Optional |
|
|
|
|
| class SecurityAuditor: |
| """Automated security audit runner for DevSecOps platform.""" |
|
|
| def __init__(self, output_dir: str = "./audit-reports"): |
| self.output_dir = Path(output_dir) |
| self.output_dir.mkdir(parents=True, exist_ok=True) |
| self.results: Dict = { |
| "timestamp": datetime.utcnow().isoformat() + "Z", |
| "scans": {}, |
| } |
|
|
| def _run_command(self, cmd: List[str], name: str) -> Dict: |
| """Run a shell command and capture results.""" |
| print(f"[→] Running {name}...") |
| try: |
| result = subprocess.run( |
| cmd, capture_output=True, text=True, timeout=600 |
| ) |
| return { |
| "exit_code": result.returncode, |
| "stdout": result.stdout[:10000], |
| "stderr": result.stderr[:5000], |
| "success": result.returncode == 0, |
| } |
| except subprocess.TimeoutExpired: |
| return {"exit_code": -1, "error": "timeout", "success": False} |
| except FileNotFoundError: |
| return {"exit_code": -1, "error": "command not found", "success": False} |
|
|
| def scan_iac(self, directory: str = "terraform/") -> Dict: |
| """Run IaC security scans.""" |
| results = {} |
|
|
| |
| r = self._run_command( |
| ["checkov", "-d", directory, "--output", "json", "--compact"], |
| "Checkov IaC Scan", |
| ) |
| results["checkov"] = r |
|
|
| |
| r = self._run_command( |
| ["trivy", "fs", "--scanners", "misconfig,secret", directory], |
| "Trivy IaC Scan", |
| ) |
| results["trivy_iac"] = r |
|
|
| self.results["scans"]["iac"] = results |
| return results |
|
|
| def scan_container(self, image: str) -> Dict: |
| """Run container security scans.""" |
| results = {} |
|
|
| |
| r = self._run_command( |
| ["trivy", "image", "--severity", "CRITICAL,HIGH", image], |
| f"Trivy Container Scan ({image})", |
| ) |
| results["trivy_image"] = r |
|
|
| self.results["scans"]["container"] = results |
| return results |
|
|
| def scan_kubernetes(self, kubeconfig: Optional[str] = None) -> Dict: |
| """Run Kubernetes security scans.""" |
| results = {} |
| env = {"KUBECONFIG": kubeconfig} if kubeconfig else None |
|
|
| |
| r = self._run_command( |
| ["kube-bench", "run", "--targets", "master,node,etcd,policies"], |
| "kube-bench CIS Benchmark", |
| ) |
| results["kube_bench"] = r |
|
|
| |
| checks = [ |
| (["kubectl", "auth", "can-i", "--list"], "RBAC audit"), |
| (["kubectl", "get", "networkpolicies", "-A"], "Network policies"), |
| (["kubectl", "get", "clusterpolicies", "-A"], "Kyverno policies"), |
| ] |
| for cmd, name in checks: |
| r = self._run_command(cmd, f"k8s: {name}") |
| results[name] = r |
|
|
| self.results["scans"]["kubernetes"] = results |
| return results |
|
|
| def generate_report(self) -> str: |
| """Generate summary report.""" |
| report_path = self.output_dir / f"audit-{datetime.now().strftime('%Y%m%d-%H%M%S')}.json" |
| with open(report_path, "w") as f: |
| json.dump(self.results, f, indent=2, default=str) |
|
|
| |
| total = sum(len(v) for v in self.results["scans"].values()) |
| passed = sum( |
| 1 for cat in self.results["scans"].values() |
| for r in cat.values() if isinstance(r, dict) and r.get("success") |
| ) |
| print(f"\n{'='*60}") |
| print(f"SECURITY AUDIT SUMMARY") |
| print(f"{'='*60}") |
| print(f"Timestamp: {self.results['timestamp']}") |
| print(f"Total scans: {total}") |
| print(f"Passed: {passed}") |
| print(f"Failed: {total - passed}") |
| print(f"Report: {report_path}") |
| print(f"{'='*60}") |
|
|
| return str(report_path) |
|
|
|
|
| if __name__ == "__main__": |
| auditor = SecurityAuditor() |
|
|
| |
| auditor.scan_iac("terraform/") |
| auditor.scan_container("ecr.aws/devsecops/backend:latest") |
| auditor.scan_kubernetes() |
|
|
| |
| report = auditor.generate_report() |
| print(f"\nFull report: {report}") |
|
|