devsecops-platform / infrastructure /finops_scanner.py
shaikhsalman's picture
refactor: merged structure - model at center, DevSecOps wrapped around it
9d4d5c7 verified
#!/usr/bin/env python3
"""FinOps Cloud Cost Scanner - Detect Waste & Recommend Savings."""
import json
import logging
import shlex
import subprocess
from datetime import datetime, timezone
from pathlib import Path
from typing import Dict, List
class FinOpsScanner:
    """Scan an AWS account for idle resources and write a savings report.

    Findings from every scan accumulate in ``self.findings`` and their
    estimated monthly savings in ``self.total_potential_savings``; nothing
    resets them, so running the same scan twice records its findings twice.

    All AWS access goes through the ``aws`` command-line tool, which must be
    on ``PATH`` and already configured with credentials.
    """

    # Flat monthly rates in USD used for the savings estimates.
    # NOTE(review): 0.08 presumably mirrors a per-GB EBS list price and 3.60
    # an hourly idle-EIP charge over a 720-hour month - confirm against
    # current AWS pricing for the scanned region.
    EBS_MONTHLY_USD_PER_GB = 0.08
    IDLE_EIP_MONTHLY_USD = 3.60

    # Upper bound, in seconds, for a single AWS CLI invocation.
    AWS_CLI_TIMEOUT_SECONDS = 60

    _log = logging.getLogger(__name__)

    def __init__(self, output_dir: str = "./finops-reports", region: str = "us-east-1"):
        """Create the report directory and start with an empty findings list.

        Args:
            output_dir: Directory that receives the JSON reports; created
                (including parents) if it does not exist.
            region: AWS region passed to every CLI call via ``--region``.
        """
        self.output_dir = Path(output_dir)
        self.output_dir.mkdir(parents=True, exist_ok=True)
        self.region = region
        self.findings: List[Dict] = []
        self.total_potential_savings = 0.0

    def _aws_cmd(self, service: str, cmd: str) -> str:
        """Run ``aws <service> <cmd>`` and return its raw JSON output.

        The command is executed as an argument list (no shell), so ``cmd`` is
        split with shell-like quoting rules but never interpreted by a shell.

        Args:
            service: AWS CLI service name, e.g. ``"ec2"``.
            cmd: Sub-command and its options as a single string.

        Returns:
            The CLI's stdout on success, or the string ``"[]"`` when the CLI
            cannot be started, times out, or exits with a non-zero status.
        """
        try:
            argv = [
                "aws",
                service,
                *shlex.split(cmd),
                "--output",
                "json",
                "--region",
                self.region,
            ]
            result = subprocess.run(
                argv,
                capture_output=True,
                text=True,
                timeout=self.AWS_CLI_TIMEOUT_SECONDS,
                check=False,
            )
        except (OSError, ValueError, subprocess.SubprocessError) as exc:
            # Best effort: a missing CLI, malformed command string, or timeout
            # must not abort the scan, but it should not go unnoticed either.
            self._log.warning("aws %s %s could not be run: %s", service, cmd, exc)
            return "[]"

        if result.returncode != 0:
            self._log.warning(
                "aws %s %s exited with status %d: %s",
                service,
                cmd,
                result.returncode,
                result.stderr.strip(),
            )
            return "[]"
        return result.stdout

    def _aws_json(self, service: str, cmd: str) -> Dict:
        """Run an AWS CLI command and return its output as a dict.

        Returns an empty dict when the output is not valid JSON or is not a
        JSON object. The latter covers the ``"[]"`` failure sentinel of
        ``_aws_cmd``, which would otherwise break ``.get`` lookups.
        """
        raw = self._aws_cmd(service, cmd)
        try:
            payload = json.loads(raw)
        except json.JSONDecodeError:
            self._log.warning("aws %s %s returned output that is not JSON", service, cmd)
            return {}
        return payload if isinstance(payload, dict) else {}

    def _record(self, finding: Dict, savings: float) -> Dict:
        """Append ``finding`` to the shared list and add ``savings`` to the total."""
        self.findings.append(finding)
        self.total_potential_savings += savings
        return finding

    def scan_unused_volumes(self) -> List[Dict]:
        """Record every EBS volume whose status is ``available``.

        Each volume is priced at ``EBS_MONTHLY_USD_PER_GB`` per GB.

        Returns:
            The findings produced by this scan only (empty when the CLI call
            fails or reports no volumes). They are also appended to
            ``self.findings``.

        Raises:
            KeyError: If a returned volume lacks ``VolumeId`` or ``Size``.
        """
        response = self._aws_json(
            "ec2", "describe-volumes --filters Name=status,Values=available"
        )
        found: List[Dict] = []
        for volume in response.get("Volumes") or []:
            volume_id = volume["VolumeId"]
            size_gb = volume["Size"]
            savings = size_gb * self.EBS_MONTHLY_USD_PER_GB
            finding = {
                "id": f"FINOPS-001-{volume_id}",
                "type": "unused-ebs",
                "size_gb": size_gb,
                "monthly_savings": round(savings, 2),
                "action": f"Delete or snapshot {volume_id}",
            }
            found.append(self._record(finding, savings))
        return found

    def scan_idle_eips(self) -> List[Dict]:
        """Record every Elastic IP that has no ``AssociationId``.

        Each such address is priced at ``IDLE_EIP_MONTHLY_USD``.

        Returns:
            The findings produced by this scan only (empty when the CLI call
            fails or every address is associated). They are also appended to
            ``self.findings``.

        Raises:
            KeyError: If an unassociated address lacks ``PublicIp``.
        """
        response = self._aws_json("ec2", "describe-addresses")
        found: List[Dict] = []
        for address in response.get("Addresses") or []:
            if "AssociationId" in address:
                continue
            public_ip = address["PublicIp"]
            finding = {
                "id": f"FINOPS-003-{public_ip}",
                "type": "unattached-eip",
                "monthly_savings": self.IDLE_EIP_MONTHLY_USD,
                "action": f"Release EIP {public_ip}",
            }
            found.append(self._record(finding, self.IDLE_EIP_MONTHLY_USD))
        return found

    def generate_report(self) -> str:
        """Write all accumulated findings to a dated JSON file and print a summary.

        The file is ``finops-YYYYMMDD.json`` inside ``self.output_dir``; a
        second report on the same day overwrites the first. The
        ``recommendations`` list is static and does not depend on the findings.

        Returns:
            The path of the written report, as a string.
        """
        # Aware UTC clock (utcnow() is deprecated); tzinfo is dropped so the
        # serialized form stays "<naive ISO timestamp>Z" as before.
        utc_now = datetime.now(timezone.utc).replace(tzinfo=None)
        report = {
            "timestamp": utc_now.isoformat() + "Z",
            "total_potential_monthly_savings": round(self.total_potential_savings, 2),
            "findings_count": len(self.findings),
            "findings": self.findings,
            "recommendations": [
                {"priority": 1, "action": "Schedule non-prod off-hours", "savings": "65% non-prod"},
                {"priority": 2, "action": "Purchase RIs for EKS nodes", "savings": "30-40% compute"},
                {"priority": 3, "action": "Enable S3 Intelligent-Tiering", "savings": "40-60% storage"},
                {"priority": 4, "action": "Use SPOT for ML training", "savings": "70-90% GPU"},
                {"priority": 5, "action": "Rightsize K8s workloads", "savings": "30-50% cluster"},
            ],
        }

        # NOTE(review): the file name uses the local date while "timestamp"
        # is UTC, so the two can disagree around midnight - kept as-is so
        # existing report paths do not change.
        report_path = self.output_dir / f"finops-{datetime.now().strftime('%Y%m%d')}.json"
        with open(report_path, "w", encoding="utf-8") as f:
            json.dump(report, f, indent=2, default=str)

        separator = "=" * 60
        print(f"\n{separator}")
        print(f"FINOPS SCAN: {len(self.findings)} findings | ${self.total_potential_savings:,.2f}/mo potential savings")
        print(separator)
        return str(report_path)

    def run_full_scan(self) -> str:
        """Run every scan, then write the report and return its path."""
        self.scan_unused_volumes()
        self.scan_idle_eips()
        return self.generate_report()
# Script entry point: run every scan with the default settings and write the report.
if __name__ == "__main__":
    scanner = FinOpsScanner()
    scanner.run_full_scan()