# s3shastra / cli_deep_scanner.py
# Author: Atharv834
# Deploy S3Shastra backend - FastAPI + scanners + ML models (commit 6a4dcb6)
import asyncio
import argparse
import sys
from deep_scanner import S3DeepAuditor
# Mocking the WebSocket interface since deep_scanner expects one
class MockWebSocket:
    """Console stand-in for the FastAPI WebSocket that S3DeepAuditor expects.

    The auditor pushes JSON-style dicts through ``send_json``; this mock
    renders each message type as a human-readable stdout line instead of
    forwarding it to a browser client.
    """

    async def send_json(self, data):
        """Print one scanner message dict in a format keyed on its 'type'."""
        msg_type = data.get("type")

        if msg_type in ("status", "info"):
            print(f"[*] {data.get('message', '')}")
        elif msg_type == "progress":
            stats = data.get("stats", {})
            # Binary and empty files are reported together as one skip count.
            skipped = stats.get('skipped_binary', 0) + stats.get('skipped_empty', 0)
            line = (
                f"[>] Scanned: {stats.get('files_scanned', 0)} files | "
                f"Bytes: {stats.get('bytes_scanned', 0)} | "
                f"Skipped: {skipped} | "
                f"Flags: {stats.get('flagged_high_risk', 0)} | "
                f"Speed: {stats.get('items_per_second', 0)} files/sec"
            )
            # Carriage return keeps the progress display on a single line.
            print(line, end='\r')
        elif msg_type == "finding":
            finding = data.get("data", {})
            print(f"\n[!!!] DEEP SCAN FINDING: {finding.get('full_url')}")
            print(f" Trigger: {finding.get('trigger_reason')}")
            print(f" File Size: {finding.get('size_bytes')} bytes")
        elif msg_type == "error":
            print(f"\n[ERROR] {data.get('message', '')}")
        elif msg_type == "finished":
            stats = data.get("stats", {})
            print(f"\n\n[✓] Deep Scan Finished in {stats.get('elapsed_seconds', 0)} seconds.")
            print(f"Total Files Scanned: {stats.get('files_scanned', 0)}")
            print(f"Total Bytes Fetched: {stats.get('bytes_scanned', 0)}")
            print(f"Total Sensitive Flags: {stats.get('flagged_high_risk', 0)}")
async def run_deep_scan(bucket_name: str, timeout: int):
    """Drive one deep content scan of *bucket_name* from the command line.

    Constructs an S3DeepAuditor the same way the WebSocket endpoint would,
    then routes its progress events through MockWebSocket so they render
    to stdout instead of a live socket.
    """
    ws = MockWebSocket()
    auditor = S3DeepAuditor(bucket_name=bucket_name, timeout=timeout)

    print(f"\nStarting Deep Content Scan on S3 bucket: {bucket_name}")
    print("=" * 60)
    await auditor.audit_bucket(ws)
    print("=" * 60)
if __name__ == "__main__":
    # Windows asyncio needs the Proactor loop policy for full socket support.
    if sys.platform == "win32":
        asyncio.set_event_loop_policy(asyncio.WindowsProactorEventLoopPolicy())

    cli = argparse.ArgumentParser(description="Run the Deep Content S3 Scanner from the CLI")
    cli.add_argument("bucket", help="Name or URL of the S3 bucket to deep scan (e.g., my-company-bucket)")
    cli.add_argument("-t", "--timeout", type=int, default=15,
                     help="Timeout in seconds for fetching files (Default: 15)")
    opts = cli.parse_args()

    try:
        asyncio.run(run_deep_scan(opts.bucket, opts.timeout))
    except KeyboardInterrupt:
        # Ctrl-C exits quietly rather than dumping a traceback.
        print("\n\n[-] Deep Scan aborted by user.")