# s3shastra / cli_scanner.py
# Atharv834
# Deploy S3Shastra backend - FastAPI + scanners + ML models
# 6a4dcb6
import sys
import asyncio
import os
import ml_engine
# Console prefixes for message types whose payload is a single "message" string.
_MESSAGE_PREFIXES = {
    "error": " [X] ERROR: ",
    "status": " [*] ",
    "info": " [i] ",
}


def _format_scan_event(data):
    """Render one scanner message as a single console line.

    Args:
        data: Message dict handed to ``send_json``. Its ``"type"`` key selects
            the layout; the remaining keys depend on that type.

    Returns:
        The text to print, or ``None`` for message types this CLI does not
        display.

    Raises:
        KeyError: If ``"type"`` or a field required by that type is missing.
        TypeError: If ``data`` or a nested payload cannot be indexed by key.
    """
    kind = data["type"]

    if kind == "finding":
        finding = data["data"]
        return (
            f" [!] SENSITIVE FILE FOUND: {finding['file_name']} "
            f"(Reason: {finding['trigger_reason']}) -> {finding['full_url']}"
        )

    if kind == "progress":
        return f" ... scanned {data['stats']['scanned']} files ..."

    if kind == "finished":
        stats = data["stats"]
        return (
            f"\n[+] Scan Complete! Scanned {stats['scanned']} files. "
            f"Found {stats['flagged_high_risk']} sensitive files."
        )

    prefix = _MESSAGE_PREFIXES.get(kind)
    if prefix is not None:
        return f"{prefix}{data['message']}"

    # Any other message type is ignored.
    return None


def run_cli_scan():
    """Run a scan from the command line: ``python cli_scanner.py scan <domain>``.

    Reads ``sys.argv``. When the arguments do not match the expected form, the
    usage line is written to stderr and the process exits with status 1.
    Otherwise the function trains the model through ``ml_engine.train`` if
    ``s3_model.joblib`` does not exist in the current working directory, then
    runs ``S3DLPAuditor(bucket_name=<domain>).audit_bucket(...)`` and prints
    each message the auditor emits.

    Raises:
        SystemExit: With code 1 when the command-line arguments are invalid.
    """
    if len(sys.argv) < 3 or sys.argv[1] != "scan":
        # Usage errors belong on stderr so they never mix with piped scan output.
        print("Usage: python cli_scanner.py scan <domain>", file=sys.stderr)
        sys.exit(1)

    domain = sys.argv[2]

    # Check if the model exists, if not train it
    model_path = "s3_model.joblib"
    if not os.path.exists(model_path):
        print("[+] Training ML model for the first time...")
        ml_engine.train(model_path)
    else:
        print("[+] ML model found.")

    print(f"\n[+] Starting ML-powered scan for domain: {domain}")

    # Deferred import, kept after the model bootstrap as in the original order.
    # NOTE(review): presumably the scanner expects the model file to exist by
    # the time it is loaded - confirm before hoisting this to module level.
    from dlp_scanner import S3DLPAuditor

    class MockWebSocket:
        """Stand-in for a WebSocket that prints messages instead of sending them.

        Only ``send_json`` is provided.
        NOTE(review): assumes that is the only method ``audit_bucket`` calls on
        its argument - confirm against ``dlp_scanner``.
        """

        async def send_json(self, data):
            """Print the console rendering of ``data``, if it has one."""
            try:
                line = _format_scan_event(data)
            except (KeyError, TypeError):
                # A malformed message must not abort the scan from inside the
                # display sink; show it raw and keep going.
                line = f" [?] Unrecognized scanner message: {data!r}"
            if line is not None:
                print(line)

    async def run_scan():
        """Create the auditor for ``domain`` and run it against the console sink."""
        auditor = S3DLPAuditor(bucket_name=domain)
        await auditor.audit_bucket(MockWebSocket())

    asyncio.run(run_scan())
# Script entry point: start the CLI only when this file is executed directly,
# so importing the module does not trigger a scan.
if __name__ == "__main__":
    run_cli_scan()