# File size: 1,702 Bytes | 6a4dcb6
import sys
import asyncio
import os
import ml_engine
# Console stand-in for the WebSocket object the auditor reports through.
class _ConsoleWebSocket:
    """Print scan messages to stdout instead of sending them over a socket.

    Only ``send_json`` is provided, because that is the only method this
    script relies on.  NOTE(review): the message schema below is taken from
    the keys this class reads; confirm it against ``dlp_scanner``.
    """

    async def send_json(self, data):
        """Render one message dict as a console line.

        Args:
            data: Mapping with a ``"type"`` key.  Depending on the type it
                must also carry ``"data"`` (finding), ``"stats"`` (progress,
                finished) or ``"message"`` (error, status, info).

        Raises:
            KeyError: If ``"type"`` or a field required by that type is
                missing.

        Message types other than the six handled here are ignored.
        """
        kind = data["type"]
        if kind == "finding":
            f = data["data"]
            print(
                f" [!] SENSITIVE FILE FOUND: {f['file_name']} "
                f"(Reason: {f['trigger_reason']}) -> {f['full_url']}"
            )
        elif kind == "progress":
            print(f" ... scanned {data['stats']['scanned']} files ...")
        elif kind == "error":
            print(f" [X] ERROR: {data['message']}")
        elif kind == "status":
            print(f" [*] {data['message']}")
        elif kind == "finished":
            print(
                f"\n[+] Scan Complete! Scanned {data['stats']['scanned']} files. "
                f"Found {data['stats']['flagged_high_risk']} sensitive files."
            )
        elif kind == "info":
            print(f" [i] {data['message']}")


def run_cli_scan():
    """Run a scan from the command line: ``cli_scanner.py scan <domain>``.

    Reads ``sys.argv`` directly.  If the sub-command is not ``scan``, the
    domain is missing, or the domain is blank, the usage line is printed and
    the process exits with status 1.  Otherwise the function makes sure the
    model file exists (training it via ``ml_engine.train`` when it does not),
    then runs ``S3DLPAuditor.audit_bucket`` to completion on a fresh event
    loop, echoing its messages to stdout.

    Raises:
        SystemExit: With code 1 on invalid command-line usage.
    """
    args = sys.argv
    # A blank domain would otherwise be handed to the auditor as a bucket
    # name, so it is treated as a usage error up front.
    valid_usage = len(args) >= 3 and args[1] == "scan" and args[2].strip() != ""
    if not valid_usage:
        print("Usage: python cli_scanner.py scan <domain>")
        sys.exit(1)
    domain = args[2]

    # Check if the model exists, if not train it
    model_path = "s3_model.joblib"
    if not os.path.exists(model_path):
        print("[+] Training ML model for the first time...")
        ml_engine.train(model_path)
    else:
        print("[+] ML model found.")

    print(f"\n[+] Starting ML-powered scan for domain: {domain}")

    # Kept function-local and after the model check, as originally written.
    # NOTE(review): presumably dlp_scanner expects the model file to exist
    # when it is imported -- confirm before hoisting this to module level.
    from dlp_scanner import S3DLPAuditor

    async def run_scan():
        auditor = S3DLPAuditor(bucket_name=domain)
        await auditor.audit_bucket(_ConsoleWebSocket())

    asyncio.run(run_scan())
# Script entry point: start the CLI scan only when this file is executed
# directly, not when it is imported as a module.
if __name__ == "__main__":
    run_cli_scan()