File size: 3,762 Bytes
1a33e9f
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
#!/usr/bin/env python3
"""FinOps Cloud Cost Scanner - Detect Waste & Recommend Savings."""

import json, subprocess
from datetime import datetime
from pathlib import Path
from typing import Dict, List

class FinOpsScanner:
    def __init__(self, output_dir: str = "./finops-reports"):
        self.output_dir = Path(output_dir)
        self.output_dir.mkdir(parents=True, exist_ok=True)
        self.findings: List[Dict] = []
        self.total_potential_savings = 0.0

    def _aws_cmd(self, service: str, cmd: str) -> str:
        full_cmd = f"aws {service} {cmd} --output json --region us-east-1"
        try:
            result = subprocess.run(full_cmd, shell=True, capture_output=True, text=True, timeout=60)
            return result.stdout if result.returncode == 0 else "[]"
        except Exception:
            return "[]"

    def scan_unused_volumes(self) -> List[Dict]:
        output = self._aws_cmd("ec2", "describe-volumes --filters Name=status,Values=available")
        try:
            volumes = json.loads(output).get("Volumes", [])
            for v in volumes:
                size_gb = v["Size"]
                savings = size_gb * 0.08
                self.findings.append({
                    "id": f"FINOPS-001-{v['VolumeId']}",
                    "type": "unused-ebs",
                    "size_gb": size_gb,
                    "monthly_savings": round(savings, 2),
                    "action": f"Delete or snapshot {v['VolumeId']}",
                })
                self.total_potential_savings += savings
            return self.findings
        except json.JSONDecodeError:
            return []

    def scan_idle_eips(self) -> List[Dict]:
        output = self._aws_cmd("ec2", "describe-addresses")
        try:
            for addr in json.loads(output).get("Addresses", []):
                if "AssociationId" not in addr:
                    self.findings.append({
                        "id": f"FINOPS-003-{addr['PublicIp']}",
                        "type": "unattached-eip",
                        "monthly_savings": 3.60,
                        "action": f"Release EIP {addr['PublicIp']}",
                    })
                    self.total_potential_savings += 3.60
            return self.findings
        except json.JSONDecodeError:
            return []

    def generate_report(self) -> str:
        report = {
            "timestamp": datetime.utcnow().isoformat() + "Z",
            "total_potential_monthly_savings": round(self.total_potential_savings, 2),
            "findings_count": len(self.findings),
            "findings": self.findings,
            "recommendations": [
                {"priority": 1, "action": "Schedule non-prod off-hours", "savings": "65% non-prod"},
                {"priority": 2, "action": "Purchase RIs for EKS nodes", "savings": "30-40% compute"},
                {"priority": 3, "action": "Enable S3 Intelligent-Tiering", "savings": "40-60% storage"},
                {"priority": 4, "action": "Use SPOT for ML training", "savings": "70-90% GPU"},
                {"priority": 5, "action": "Rightsize K8s workloads", "savings": "30-50% cluster"},
            ],
        }
        report_path = self.output_dir / f"finops-{datetime.now().strftime('%Y%m%d')}.json"
        with open(report_path, "w") as f:
            json.dump(report, f, indent=2, default=str)
        print(f"
{'='*60}")
        print(f"FINOPS SCAN: {len(self.findings)} findings | ${self.total_potential_savings:,.2f}/mo potential savings")
        print(f"{'='*60}")
        return str(report_path)

    def run_full_scan(self):
        self.scan_unused_volumes()
        self.scan_idle_eips()
        return self.generate_report()

if __name__ == "__main__":
    FinOpsScanner().run_full_scan()