File size: 4,761 Bytes
7c19d46
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
#!/usr/bin/env python3
# =============================================================================
# DevSecOps Platform — Security Audit Automation
# =============================================================================
# Runs all security scans, generates compliance report
# =============================================================================

import json
import subprocess
import sys
from datetime import datetime
from pathlib import Path
from typing import Dict, List, Optional


class SecurityAuditor:
    """Automated security audit runner for DevSecOps platform."""

    def __init__(self, output_dir: str = "./audit-reports"):
        self.output_dir = Path(output_dir)
        self.output_dir.mkdir(parents=True, exist_ok=True)
        self.results: Dict = {
            "timestamp": datetime.utcnow().isoformat() + "Z",
            "scans": {},
        }

    def _run_command(self, cmd: List[str], name: str) -> Dict:
        """Run a shell command and capture results."""
        print(f"[→] Running {name}...")
        try:
            result = subprocess.run(
                cmd, capture_output=True, text=True, timeout=600
            )
            return {
                "exit_code": result.returncode,
                "stdout": result.stdout[:10000],
                "stderr": result.stderr[:5000],
                "success": result.returncode == 0,
            }
        except subprocess.TimeoutExpired:
            return {"exit_code": -1, "error": "timeout", "success": False}
        except FileNotFoundError:
            return {"exit_code": -1, "error": "command not found", "success": False}

    def scan_iac(self, directory: str = "terraform/") -> Dict:
        """Run IaC security scans."""
        results = {}

        # Checkov
        r = self._run_command(
            ["checkov", "-d", directory, "--output", "json", "--compact"],
            "Checkov IaC Scan",
        )
        results["checkov"] = r

        # Trivy IaC
        r = self._run_command(
            ["trivy", "fs", "--scanners", "misconfig,secret", directory],
            "Trivy IaC Scan",
        )
        results["trivy_iac"] = r

        self.results["scans"]["iac"] = results
        return results

    def scan_container(self, image: str) -> Dict:
        """Run container security scans."""
        results = {}

        # Trivy image
        r = self._run_command(
            ["trivy", "image", "--severity", "CRITICAL,HIGH", image],
            f"Trivy Container Scan ({image})",
        )
        results["trivy_image"] = r

        self.results["scans"]["container"] = results
        return results

    def scan_kubernetes(self, kubeconfig: Optional[str] = None) -> Dict:
        """Run Kubernetes security scans."""
        results = {}
        env = {"KUBECONFIG": kubeconfig} if kubeconfig else None

        # kube-bench
        r = self._run_command(
            ["kube-bench", "run", "--targets", "master,node,etcd,policies"],
            "kube-bench CIS Benchmark",
        )
        results["kube_bench"] = r

        # kubectl checks
        checks = [
            (["kubectl", "auth", "can-i", "--list"], "RBAC audit"),
            (["kubectl", "get", "networkpolicies", "-A"], "Network policies"),
            (["kubectl", "get", "clusterpolicies", "-A"], "Kyverno policies"),
        ]
        for cmd, name in checks:
            r = self._run_command(cmd, f"k8s: {name}")
            results[name] = r

        self.results["scans"]["kubernetes"] = results
        return results

    def generate_report(self) -> str:
        """Generate summary report."""
        report_path = self.output_dir / f"audit-{datetime.now().strftime('%Y%m%d-%H%M%S')}.json"
        with open(report_path, "w") as f:
            json.dump(self.results, f, indent=2, default=str)

        # Print summary
        total = sum(len(v) for v in self.results["scans"].values())
        passed = sum(
            1 for cat in self.results["scans"].values()
            for r in cat.values() if isinstance(r, dict) and r.get("success")
        )
        print(f"\n{'='*60}")
        print(f"SECURITY AUDIT SUMMARY")
        print(f"{'='*60}")
        print(f"Timestamp: {self.results['timestamp']}")
        print(f"Total scans: {total}")
        print(f"Passed: {passed}")
        print(f"Failed: {total - passed}")
        print(f"Report: {report_path}")
        print(f"{'='*60}")

        return str(report_path)


if __name__ == "__main__":
    auditor = SecurityAuditor()

    # Run all scans
    auditor.scan_iac("terraform/")
    auditor.scan_container("ecr.aws/devsecops/backend:latest")
    auditor.scan_kubernetes()

    # Generate report
    report = auditor.generate_report()
    print(f"\nFull report: {report}")