"""Command-line front end for the deep-content S3 bucket scanner."""

import asyncio
import argparse
import sys

from deep_scanner import S3DeepAuditor


# Mocking the WebSocket interface since deep_scanner expects one
class MockWebSocket:
    """Console stand-in for the WebSocket object handed to ``S3DeepAuditor``.

    Only ``send_json`` is implemented; every message is rendered to stdout
    instead of being sent over a socket.
    """

    async def send_json(self, data: dict) -> None:
        """Print one scanner message to the console.

        The layout is chosen from ``data["type"]``:

        * ``"status"`` / ``"info"`` -- prints ``data["message"]``.
        * ``"progress"`` -- prints counters from ``data["stats"]`` on a
          single line that is overwritten by the next update.
        * ``"finding"`` -- prints URL, trigger reason and size taken from
          ``data["data"]``.
        * ``"error"`` -- prints ``data["message"]``.
        * ``"finished"`` -- prints the final totals from ``data["stats"]``.

        Messages of any other type are ignored. Missing keys fall back to
        an empty string, ``0`` or ``None`` rather than raising.
        """
        kind = data.get("type")

        if kind in ("status", "info"):
            print(f"[*] {data.get('message', '')}")
        elif kind == "progress":
            stats = data.get("stats", {})
            # The trailing carriage return (instead of a newline) lets the
            # next progress update overwrite this line in place.
            print(f"[>] Scanned: {stats.get('files_scanned', 0)} files | "
                  f"Bytes: {stats.get('bytes_scanned', 0)} | "
                  f"Skipped: {stats.get('skipped_binary', 0) + stats.get('skipped_empty', 0)} | "
                  f"Flags: {stats.get('flagged_high_risk', 0)} | "
                  f"Speed: {stats.get('items_per_second', 0)} files/sec", end='\r')
        elif kind == "finding":
            finding = data.get("data", {})
            # The leading newline moves off the in-place progress line.
            print(f"\n[!!!] DEEP SCAN FINDING: {finding.get('full_url')}")
            print(f" Trigger: {finding.get('trigger_reason')}")
            print(f" File Size: {finding.get('size_bytes')} bytes")
        elif kind == "error":
            print(f"\n[ERROR] {data.get('message', '')}")
        elif kind == "finished":
            stats = data.get("stats", {})
            print(f"\n\n[✓] Deep Scan Finished in {stats.get('elapsed_seconds', 0)} seconds.")
            print(f"Total Files Scanned: {stats.get('files_scanned', 0)}")
            print(f"Total Bytes Fetched: {stats.get('bytes_scanned', 0)}")
            print(f"Total Sensitive Flags: {stats.get('flagged_high_risk', 0)}")


async def run_deep_scan(bucket_name: str, timeout: int):
    """Run a deep scan of ``bucket_name`` and report to the console.

    Args:
        bucket_name: Bucket identifier passed straight to ``S3DeepAuditor``.
        timeout: Timeout value passed straight to ``S3DeepAuditor``.
    """
    # Initialize the auditor precisely the same way the WebSocket does
    auditor = S3DeepAuditor(bucket_name=bucket_name, timeout=timeout)
    ws = MockWebSocket()

    print(f"\nStarting Deep Content Scan on S3 bucket: {bucket_name}")
    print("=" * 60)

    await auditor.audit_bucket(ws)

    print("=" * 60)


def main() -> None:
    """Parse the command line and run the scan until done or interrupted."""
    # Fix for Windows asyncio. The proactor loop has been the Windows
    # default since Python 3.8 and the policy API is deprecated from 3.14,
    # so the explicit override is only applied on older interpreters.
    if sys.platform == "win32" and sys.version_info < (3, 8):
        asyncio.set_event_loop_policy(asyncio.WindowsProactorEventLoopPolicy())

    parser = argparse.ArgumentParser(
        description="Run the Deep Content S3 Scanner from the CLI")
    parser.add_argument(
        "bucket",
        help="Name or URL of the S3 bucket to deep scan (e.g., my-company-bucket)")
    parser.add_argument(
        "-t", "--timeout", type=int, default=15,
        help="Timeout in seconds for fetching files (Default: 15)")

    args = parser.parse_args()

    try:
        asyncio.run(run_deep_scan(args.bucket, args.timeout))
    except KeyboardInterrupt:
        print("\n\n[-] Deep Scan aborted by user.")


if __name__ == "__main__":
    main()