"""Command-line entry point that runs an S3 DLP scan and prints results."""

import sys
import asyncio
import os
import ml_engine


def run_cli_scan():
    """Run a scan from the command line and print its events to stdout.

    Expects ``sys.argv`` to look like ``[prog, "scan", <domain>]``. If the
    model file ``s3_model.joblib`` does not exist in the current working
    directory, ``ml_engine.train`` is called with that path first. The
    domain is then passed as ``bucket_name`` to ``S3DLPAuditor`` and
    ``audit_bucket`` is awaited with a console-printing stand-in for a
    websocket.

    Exits with status 1 (after printing a usage line) when the arguments
    are missing or the first argument is not ``"scan"``.
    """
    if len(sys.argv) < 3 or sys.argv[1] != "scan":
        # The domain is a required positional argument (read as argv[2]
        # below), so the usage line must name it.
        print("Usage: python cli_scanner.py scan <domain>")
        sys.exit(1)

    domain = sys.argv[2]

    # Check if the model exists, if not train it
    model_path = "s3_model.joblib"
    if not os.path.exists(model_path):
        print("[+] Training ML model for the first time...")
        ml_engine.train(model_path)
    else:
        print("[+] ML model found.")

    print(f"\n[+] Starting ML-powered scan for domain: {domain}")

    # Kept as a function-local import, placed after the model check exactly
    # as in the original. NOTE(review): presumably dlp_scanner needs the
    # model file to exist when it is imported -- confirm before hoisting.
    from dlp_scanner import S3DLPAuditor

    class MockWebSocket:
        """Stand-in for a websocket that prints each message to stdout."""

        async def send_json(self, data):
            """Print a one-line rendering of *data* based on ``data["type"]``.

            Message types other than the ones handled below are ignored.
            """
            kind = data["type"]
            if kind == "finding":
                finding = data["data"]
                print(f" [!] SENSITIVE FILE FOUND: {finding['file_name']} (Reason: {finding['trigger_reason']}) -> {finding['full_url']}")
            elif kind == "progress":
                print(f" ... scanned {data['stats']['scanned']} files ...")
            elif kind == "error":
                print(f" [X] ERROR: {data['message']}")
            elif kind == "status":
                print(f" [*] {data['message']}")
            elif kind == "finished":
                print(f"\n[+] Scan Complete! Scanned {data['stats']['scanned']} files. Found {data['stats']['flagged_high_risk']} sensitive files.")
            elif kind == "info":
                print(f" [i] {data['message']}")

    async def run_scan():
        """Create the auditor for *domain* and await its bucket audit."""
        auditor = S3DLPAuditor(bucket_name=domain)
        await auditor.audit_bucket(MockWebSocket())

    asyncio.run(run_scan())


if __name__ == "__main__":
    run_cli_scan()