| import asyncio |
| import argparse |
| import sys |
| from deep_scanner import S3DeepAuditor |
|
|
| |
| class MockWebSocket: |
| async def send_json(self, data): |
| if data.get("type") in ["status", "info"]: |
| print(f"[*] {data.get('message', '')}") |
| elif data.get("type") == "progress": |
| stats = data.get("stats", {}) |
| print(f"[>] Scanned: {stats.get('files_scanned', 0)} files | " |
| f"Bytes: {stats.get('bytes_scanned', 0)} | " |
| f"Skipped: {stats.get('skipped_binary', 0) + stats.get('skipped_empty', 0)} | " |
| f"Flags: {stats.get('flagged_high_risk', 0)} | " |
| f"Speed: {stats.get('items_per_second', 0)} files/sec", end='\r') |
| elif data.get("type") == "finding": |
| finding = data.get("data", {}) |
| print(f"\n[!!!] DEEP SCAN FINDING: {finding.get('full_url')}") |
| print(f" Trigger: {finding.get('trigger_reason')}") |
| print(f" File Size: {finding.get('size_bytes')} bytes") |
| elif data.get("type") == "error": |
| print(f"\n[ERROR] {data.get('message', '')}") |
| elif data.get("type") == "finished": |
| stats = data.get("stats", {}) |
| print(f"\n\n[✓] Deep Scan Finished in {stats.get('elapsed_seconds', 0)} seconds.") |
| print(f"Total Files Scanned: {stats.get('files_scanned', 0)}") |
| print(f"Total Bytes Fetched: {stats.get('bytes_scanned', 0)}") |
| print(f"Total Sensitive Flags: {stats.get('flagged_high_risk', 0)}") |
|
|
| async def run_deep_scan(bucket_name: str, timeout: int): |
| |
| auditor = S3DeepAuditor(bucket_name=bucket_name, timeout=timeout) |
| ws = MockWebSocket() |
| |
| print(f"\nStarting Deep Content Scan on S3 bucket: {bucket_name}") |
| print("=" * 60) |
| await auditor.audit_bucket(ws) |
| print("=" * 60) |
|
|
| if __name__ == "__main__": |
| |
| if sys.platform == "win32": |
| asyncio.set_event_loop_policy(asyncio.WindowsProactorEventLoopPolicy()) |
| |
| parser = argparse.ArgumentParser(description="Run the Deep Content S3 Scanner from the CLI") |
| parser.add_argument("bucket", help="Name or URL of the S3 bucket to deep scan (e.g., my-company-bucket)") |
| parser.add_argument("-t", "--timeout", type=int, default=15, help="Timeout in seconds for fetching files (Default: 15)") |
| |
| args = parser.parse_args() |
| |
| try: |
| asyncio.run(run_deep_scan(args.bucket, args.timeout)) |
| except KeyboardInterrupt: |
| print("\n\n[-] Deep Scan aborted by user.") |
|
|