| |
| |
| import asyncio |
| import logging |
| import sys |
| import json |
| import re |
| import html |
| import random |
| import time |
| from typing import Set |
| from types import SimpleNamespace |
|
|
| import aiohttp |
| from aiohttp import ClientSession, ClientTimeout, TCPConnector, ClientConnectorError |
|
|
# Module-level logger; the host application is expected to configure
# handlers and levels.
logger = logging.getLogger("s3shastra.scanner")


# Default User-Agent advertising the tool, intent, and a security contact so
# bucket owners can attribute/abuse-report the traffic.
# NOTE(review): presumably wired to args.user_agent by the caller — confirm.
DEFAULT_UA = "S3Shastra/1.3 (+ethical; bugbounty; contact=security@example.com)"
|
|
| |
# Single extraction regex matching bucket-style URLs/hostnames for the major
# object-storage providers (AWS S3, GCS, Azure Blob, DO Spaces, Oracle,
# Linode, Wasabi, Cloudflare R2, Backblaze B2, Alibaba OSS, Tencent COS,
# Yandex).  The leading (?ix) already makes the pattern case-insensitive and
# verbose, so the re.VERBOSE flag passed below is redundant but harmless.
BUCKET_REGEX = re.compile(r"""(?ix)
 (?:[a-z0-9][a-z0-9.-]{1,61}[a-z0-9]\.s3(?:[.-][a-z0-9-]+)?\.amazonaws\.com
 |s3(?:[.-][a-z0-9-]+)?\.amazonaws\.com/[a-z0-9][a-z0-9.-]{1,61}[a-z0-9])
 |(?:[a-z0-9.-]+\.storage\.googleapis\.com
 |storage\.googleapis\.com/[a-z0-9.-]+)
 |(?:[a-z0-9-]+\.blob\.core\.windows\.net)
 |(?:[a-z0-9.-]+\.(?:nyc3|ams3|sfo3|fra1|sgp1|tor1|blr1)\.digitaloceanspaces\.com)
 |(?:[a-z0-9.-]+\.objectstorage\.[a-z0-9-]+\.oraclecloud\.com)
 |(?:[a-z0-9.-]+\.[a-z0-9-]+\.linodeobjects\.com)
 |(?:s3\.[a-z0-9-]+\.wasabisys\.com/[a-z0-9][a-z0-9.-]{1,61}[a-z0-9]
 |[a-z0-9.-]+\.s3\.[a-z0-9-]+\.wasabisys\.com)
 |(?:[a-z0-9.-]+\.r2\.cloudflarestorage\.com)
 |(?:f\d+\.backblazeb2\.com/[a-z0-9./-]+)
 |(?:[a-z0-9.-]+\.oss-[a-z0-9-]+\.aliyuncs\.com)
 |(?:[a-z0-9.-]+\.cos\.[a-z0-9.-]+\.myqcloud\.com)
 |(?:[a-z0-9.-]+\.storage\.yandexcloud\.net)
""", re.VERBOSE)
|
|
# Ordered (display name, hostname pattern) pairs used to label a discovered
# bucket reference; identify_provider() returns the first match.
PROVIDERS = [
    ("AWS S3", re.compile(r"amazonaws\.com", re.I)),
    ("Google Cloud", re.compile(r"storage\.googleapis\.com", re.I)),
    ("Azure Blob", re.compile(r"blob\.core\.windows\.net", re.I)),
    ("DigitalOcean", re.compile(r"digitaloceanspaces\.com", re.I)),
    ("Oracle Cloud", re.compile(r"objectstorage\.", re.I)),
    ("Linode", re.compile(r"linodeobjects\.com", re.I)),
    ("Wasabi", re.compile(r"wasabisys\.com", re.I)),
    ("Cloudflare R2", re.compile(r"r2\.cloudflarestorage\.com", re.I)),
    ("Backblaze B2", re.compile(r"backblazeb2\.com", re.I)),
    ("Alibaba OSS", re.compile(r"aliyuncs\.com", re.I)),
    ("Tencent COS", re.compile(r"myqcloud\.com", re.I)),
    ("Yandex", re.compile(r"yandexcloud\.net", re.I)),
]


# Short provider codes (the values accepted in args.providers) -> display
# names as used in PROVIDERS above.
PROVIDER_LIMITS_MAP = {
    "aws":"AWS S3","gcp":"Google Cloud","azure":"Azure Blob","do":"DigitalOcean",
    "wasabi":"Wasabi","r2":"Cloudflare R2","b2":"Backblaze B2","oracle":"Oracle Cloud",
    "linode":"Linode","alibaba":"Alibaba OSS","tencent":"Tencent COS","yandex":"Yandex",
}


# Reverse lookup: lowercased display name -> short code.
PRECOMPILED_LIMITS_MAP = {v.lower(): k for k, v in PROVIDER_LIMITS_MAP.items()}
|
|
def identify_provider(url: str) -> str:
    """Return the display name of the storage provider whose hostname
    pattern matches *url*, or "Unknown" when none matches (first match
    in PROVIDERS order wins)."""
    matches = (label for label, pattern in PROVIDERS if pattern.search(url))
    return next(matches, "Unknown")
|
|
| |
def norm_host(h: str) -> str:
    """Normalise a hostname: trim whitespace, lowercase, drop trailing dot(s)."""
    cleaned = h.strip().lower()
    return cleaned.rstrip(".")
|
|
def norm_match(m: str) -> str:
    """Unescape an HTML-sourced regex match and trim the punctuation that
    commonly clings to URLs scraped out of markup."""
    text = html.unescape(m)
    text = text.strip()
    return text.strip('")]>;.,')
|
|
def build_timeout(tot, con, rd) -> ClientTimeout:
    """Assemble an aiohttp ClientTimeout from total/connect/read budgets (seconds)."""
    return ClientTimeout(
        total=tot,
        connect=con,
        sock_read=rd,
    )
|
|
def include_exclude(sub, inc, exc):
    """Apply optional include/exclude regex filters to a subdomain.

    Returns True when *sub* passes: it must match *inc* (when given) and
    must not match *exc* (when given).
    """
    if inc is not None and inc.search(sub) is None:
        return False
    if exc is not None and exc.search(sub) is not None:
        return False
    return True
|
|
async def http_get(session, url, retries=3, allow_redirects=True, headers=None):
    """GET *url* with jittered exponential backoff.

    Returns the response on the first success, or None once the initial
    attempt plus *retries* extra attempts have all failed with a timeout
    or connection-level error.
    """
    delay = 0.5
    attempt = 0
    while attempt <= retries:
        try:
            return await session.get(url, allow_redirects=allow_redirects, headers=headers)
        except (asyncio.TimeoutError, ClientConnectorError, aiohttp.ClientError):
            if attempt == retries:
                return None
            # Randomised jitter avoids hammering a struggling endpoint in lockstep.
            await asyncio.sleep(delay + random.random() * 0.3)
            delay *= 1.8
        attempt += 1
    return None
|
|
async def read_body(r, max_size: int = 2 * 1024 * 1024):
    """Read a response body, capped at *max_size* bytes to prevent memory
    exhaustion. For oversized pages only the first chunk is read (bucket
    URLs appear near the top of listings/pages).

    Strategy:
      * If Content-Length advertises a body larger than the cap, stream
        only the first max_size bytes.
      * A junk / unparseable Content-Length is treated as unknown size
        (bug fix: it used to raise and fall into the unbounded fallback).
      * On any decode/read error, fall back to a *capped* raw read
        (bug fix: the old fallback used r.read() with no size limit,
        defeating the cap this helper exists to enforce).
    """
    if not r:
        return ""
    try:
        cl = r.headers.get("Content-Length")
        try:
            advertised = int(cl) if cl else 0
        except ValueError:
            advertised = 0  # junk header: treat as unknown size
        if advertised > max_size:
            partial = await r.content.read(max_size)
            return partial.decode(errors="ignore")
        return await r.text(errors="ignore")
    except Exception:
        try:
            return (await r.content.read(max_size)).decode(errors="ignore")
        except Exception:
            return ""
|
|
| |
|
|
async def _safe_fetch(session, url, timeout_secs=30, as_json=False):
    """Fetch *url* and return the body text (or parsed JSON when *as_json*).

    Any failure — network error, non-200 status, bad JSON — yields the
    neutral empty value ("" or {}) so callers never need try/except.
    """
    empty = {} if as_json else ""
    try:
        per_call = ClientTimeout(total=timeout_secs, sock_read=timeout_secs - 2)
        resp = await session.get(url, timeout=per_call, allow_redirects=True)
        if resp.status != 200:
            return empty
        body = await resp.text(errors="ignore")
        return json.loads(body) if as_json else body
    except Exception:
        return empty
|
|
async def get_subdomains_shrewdeye(domain, session, websocket):
    """Passive source: ShrewdEye plain-text domain dump (one host per line)."""
    subs = set()
    try:
        body = await _safe_fetch(session, f"https://shrewdeye.app/domains/{domain}.txt")
        for line in body.splitlines():
            host = norm_host(line)
            if host and host.endswith(domain):
                subs.add(host)
    except Exception:
        pass
    await websocket.send_json({"type": "info", "message": f"ShrewdEye: {len(subs)}"})
    return subs
|
|
async def get_subdomains_anubis(domain, session, websocket):
    """Passive source: AnubisDB (returns a JSON list of hostnames)."""
    subs = set()
    try:
        payload = await _safe_fetch(session, f"https://anubisdb.com/anubis/subdomains/{domain}", as_json=True)
        if isinstance(payload, list):
            for entry in payload:
                host = norm_host(entry)
                if host and host.endswith(domain):
                    subs.add(host)
    except Exception:
        pass
    await websocket.send_json({"type": "info", "message": f"AnubisDB: {len(subs)}"})
    return subs
|
|
async def get_subdomains_jldc(domain, session, websocket):
    """Passive source: JLDC Anubis mirror (JSON list of hostnames)."""
    subs = set()
    try:
        payload = await _safe_fetch(session, f"https://jldc.me/anubis/subdomains/{domain}", as_json=True)
        if isinstance(payload, list):
            for entry in payload:
                host = norm_host(entry)
                if host and host.endswith(domain):
                    subs.add(host)
    except Exception:
        pass
    await websocket.send_json({"type": "info", "message": f"JLDC: {len(subs)}"})
    return subs
|
|
async def get_subdomains_subdomain_center(domain, session, websocket):
    """Passive source: api.subdomain.center (JSON list of hostnames)."""
    subs = set()
    try:
        payload = await _safe_fetch(session, f"https://api.subdomain.center/?domain={domain}", as_json=True)
        if isinstance(payload, list):
            for entry in payload:
                host = norm_host(entry)
                if host and host.endswith(domain):
                    subs.add(host)
    except Exception:
        pass
    await websocket.send_json({"type": "info", "message": f"SubdomainCenter: {len(subs)}"})
    return subs
|
|
async def get_subdomains_whoisxml(domain, session, websocket):
    """Passive source: WhoisXML subdomains API (keyed).

    SECURITY: this function used to ship a hard-coded API key in source.
    The key is now read from the WHOISXML_API_KEY environment variable;
    the previously embedded key remains the fallback for backward
    compatibility, but it is effectively leaked and should be rotated
    and removed from source control.
    """
    import os  # function-scope import keeps the module import block untouched

    subs = set()
    api_key = os.environ.get("WHOISXML_API_KEY", "at_IHleEeaUQpOU8jD15yXt3AMkY9pc6")
    try:
        url = f"https://subdomains.whoisxmlapi.com/api/v1?apiKey={api_key}&domainName={domain}&outputFormat=json"
        data = await _safe_fetch(session, url, as_json=True)
        records = data.get("result", {}).get("records", []) if isinstance(data, dict) else []
        for rec in records:
            d = rec.get("domain")
            if d:
                s = norm_host(d)
                if s and s.endswith(domain):
                    subs.add(s)
    except Exception:
        pass
    await websocket.send_json({"type": "info", "message": f"WhoisXML: {len(subs)}"})
    return subs
|
|
async def get_subdomains_crt(domain, session, websocket):
    """Passive source: crt.sh certificate-transparency search."""
    subs = set()
    try:
        # crt.sh is notoriously slow, so it gets its own generous timeout.
        long_timeout = ClientTimeout(total=60, sock_read=45)
        resp = await session.get(f"https://crt.sh/?q=%.{domain}&output=json", timeout=long_timeout)
        if resp.status == 200:
            rows = json.loads(await resp.text())
            for row in rows:
                # name_value may pack several SAN entries, newline-separated.
                for name in row.get("name_value", "").splitlines():
                    host = norm_host(name.lstrip("*."))
                    if host and host.endswith(domain):
                        subs.add(host)
    except Exception:
        pass
    await websocket.send_json({"type": "info", "message": f"crt.sh: {len(subs)}"})
    return subs
|
|
| |
|
|
async def get_subdomains_hackertarget(domain, session, websocket):
    """HackerTarget hostsearch — free, no key (CSV: host,ip per line)."""
    subs = set()
    try:
        body = await _safe_fetch(session, f"https://api.hackertarget.com/hostsearch/?q={domain}", timeout_secs=20)
        # HackerTarget signals quota/errors in the body text, not the status.
        if body and "error" not in body.lower():
            for row in body.splitlines():
                fields = row.split(",")
                if fields:
                    host = norm_host(fields[0])
                    if host and host.endswith(domain):
                        subs.add(host)
    except Exception:
        pass
    await websocket.send_json({"type": "info", "message": f"HackerTarget: {len(subs)}"})
    return subs
|
|
async def get_subdomains_alienvault(domain, session, websocket):
    """AlienVault OTX — free passive DNS."""
    subs = set()
    try:
        payload = await _safe_fetch(
            session,
            f"https://otx.alienvault.com/api/v1/indicators/domain/{domain}/passive_dns",
            timeout_secs=20, as_json=True,
        )
        records = payload.get("passive_dns", []) if isinstance(payload, dict) else []
        for record in records:
            host = norm_host(record.get("hostname", ""))
            if host and host.endswith(domain):
                subs.add(host)
    except Exception:
        pass
    await websocket.send_json({"type": "info", "message": f"AlienVault: {len(subs)}"})
    return subs
|
|
async def get_subdomains_certspotter(domain, session, websocket):
    """CertSpotter free tier — certificate transparency."""
    subs = set()
    try:
        payload = await _safe_fetch(
            session,
            f"https://api.certspotter.com/v1/issuances?domain={domain}&include_subdomains=true&expand=dns_names",
            timeout_secs=25, as_json=True,
        )
        if isinstance(payload, list):
            for issuance in payload:
                for dns_name in issuance.get("dns_names", []):
                    # Strip wildcard prefixes like "*.example.com".
                    host = norm_host(dns_name.lstrip("*."))
                    if host and host.endswith(domain):
                        subs.add(host)
    except Exception:
        pass
    await websocket.send_json({"type": "info", "message": f"CertSpotter: {len(subs)}"})
    return subs
|
|
async def get_subdomains_urlscan(domain, session, websocket):
    """URLScan.io public search."""
    subs = set()
    try:
        payload = await _safe_fetch(
            session,
            f"https://urlscan.io/api/v1/search/?q=domain:{domain}&size=1000",
            timeout_secs=20, as_json=True,
        )
        hits = payload.get("results", []) if isinstance(payload, dict) else []
        for hit in hits:
            host = norm_host(hit.get("page", {}).get("domain", ""))
            if host and host.endswith(domain):
                subs.add(host)
    except Exception:
        pass
    await websocket.send_json({"type": "info", "message": f"URLScan: {len(subs)}"})
    return subs
|
|
async def get_subdomains_rapiddns(domain, session, websocket):
    """RapidDNS.io — HTML scrape of the subdomain results table."""
    subs = set()
    try:
        body = await _safe_fetch(session, f"https://rapiddns.io/subdomain/{domain}?full=1", timeout_secs=20)
        if body:
            # Hostnames sit inside <td> cells; compile once per call.
            cell = re.compile(r'<td>([a-zA-Z0-9._-]+\.' + re.escape(domain) + r')</td>')
            for match in cell.finditer(body):
                host = norm_host(match.group(1))
                if host:
                    subs.add(host)
    except Exception:
        pass
    await websocket.send_json({"type": "info", "message": f"RapidDNS: {len(subs)}"})
    return subs
|
|
async def get_subdomains_wayback(domain, session, websocket):
    """Wayback Machine CDX API — harvests historical URLs for subdomains."""
    subs = set()
    try:
        body = await _safe_fetch(
            session,
            f"https://web.archive.org/cdx/search/cdx?url=*.{domain}/*&output=text&fl=original&collapse=urlkey&limit=5000",
            timeout_secs=30,
        )
        for raw in body.splitlines():
            try:
                # Extract the host from a full URL: drop scheme, path, port.
                candidate = raw.strip().split("//")[-1].split("/")[0].split(":")[0]
                host = norm_host(candidate)
                if host and host.endswith(domain):
                    subs.add(host)
            except Exception:
                continue
    except Exception:
        pass
    await websocket.send_json({"type": "info", "message": f"Wayback: {len(subs)}"})
    return subs
|
|
async def get_subdomains(domain, session, websocket):
    """Fan out to all 12 passive enumeration sources concurrently, then
    merge, normalise and filter the results.

    Returns a sorted list containing the apex domain plus every unique
    hostname that is a *true* subdomain of it.
    """
    t0 = time.time()
    await websocket.send_json({"type": "status", "message": f"Enumerating subdomains for {domain} from 12 API sources..."})

    tasks = [
        get_subdomains_shrewdeye(domain, session, websocket),
        get_subdomains_subdomain_center(domain, session, websocket),
        get_subdomains_whoisxml(domain, session, websocket),
        get_subdomains_anubis(domain, session, websocket),
        get_subdomains_jldc(domain, session, websocket),
        get_subdomains_crt(domain, session, websocket),
        get_subdomains_hackertarget(domain, session, websocket),
        get_subdomains_alienvault(domain, session, websocket),
        get_subdomains_certspotter(domain, session, websocket),
        get_subdomains_urlscan(domain, session, websocket),
        get_subdomains_rapiddns(domain, session, websocket),
        get_subdomains_wayback(domain, session, websocket),
    ]

    # Each source swallows its own errors and always returns a set.
    results = await asyncio.gather(*tasks)

    all_subs = set()
    for r in results:
        all_subs.update(r)

    # Always include the apex itself so it gets scanned too.
    apex = norm_host(domain)
    all_subs.add(apex)

    cleaned = set()
    dot_apex = "." + apex
    for s in all_subs:
        s = s.lstrip("*.")
        # Bug fix: the old check `s.endswith(domain)` also accepted lookalike
        # hosts such as "evil-example.com" for "example.com". Require either
        # the apex itself or a dot boundary immediately before it.
        if not s or not (s == apex or s.endswith(dot_apex)):
            continue
        # Drop garbage entries (embedded whitespace from scraped sources).
        if " " in s:
            continue
        cleaned.add(s)

    elapsed = round(time.time() - t0, 1)
    await websocket.send_json({
        "type": "status",
        "message": f"Subdomain enumeration complete: {len(cleaned)} unique subdomains in {elapsed}s"
    })
    return sorted(cleaned)
|
|
| |
def is_s3_takeover(srv, body):
    """Heuristic: does this response look like a dangling S3/CloudFront
    origin (subdomain-takeover candidate)?

    The Server header check is case-insensitive; the body markers are the
    exact strings the provider error pages emit.
    """
    server = srv.lower()
    if "amazons3" in server:
        return "NoSuchBucket" in body or "does not exist" in body
    if "cloudfront" in server:
        return "NoSuchBucket" in body
    return False
|
|
| |
|
|
def _is_denied_body(body):
    """Check if a response body contains a *specific* access-denied error code.

    IMPORTANT: only precise cloud-provider denial codes are matched.
    Generic strings like '<Error>', 'Forbidden', or 'Unauthorized' are
    deliberately NOT matched because they also appear in 404 / redirect /
    generic XML responses.
    """
    if not body:
        return False
    denial_codes = (
        "AccessDenied", "Access Denied",
        "AuthenticationFailed", "AllAccessDisabled",
        "InvalidAccessKeyId", "SignatureDoesNotMatch",
        "PublicAccessNotPermitted", "AccountRequiresAHint",
        "AuthorizationRequired", "AuthorizationPermissionMismatch",
    )
    for code in denial_codes:
        if code in body:
            return True
    return False
|
|
def _finalize_status(status, perms, dangling=False):
    """Derive the final status label from actual permission flags.

    Priority: write > list > read > ACL exposure. A dangling bucket keeps
    its incoming status untouched; an existing bucket with no public
    permissions is collapsed to "Exists (Private)".
    """
    if dangling:
        return status
    for flag, label in (
        ("write", "Public Write Access"),
        ("list", "Public Listing"),
        ("read", "Public Read Access"),
        ("aclReadable", "ACL Exposed"),
    ):
        if perms.get(flag):
            return label
    lowered = status.lower()
    private_markers = ("exists", "redirect", "private", "allaccessdisabled", "unknown acl")
    if any(marker in lowered for marker in private_markers):
        return "Exists (Private)"
    return status
|
|
def _make_perms(status, read=False, write=False, list_perm=False, acl_read=False,
                dangling=False, security_checks=None, **aliases):
    """Build a standardised permissions dict.

    Returns {"status", "permissions", "dangling"} plus "securityChecks"
    when provided.

    Bug fix: the provider auditors call this as
    ``_make_perms(status, **perms, ...)`` where *perms* uses the OUTPUT key
    names "list" and "aclReadable". The old signature only accepted
    ``list_perm``/``acl_read``, so every such call raised TypeError and the
    whole audit collapsed to "Unknown". Those two aliases are now accepted.
    """
    list_perm = list_perm or aliases.pop("list", False)
    acl_read = acl_read or aliases.pop("aclReadable", False)
    if aliases:
        # Preserve strictness for genuinely unknown keyword arguments.
        raise TypeError(f"_make_perms() got unexpected keyword arguments: {sorted(aliases)}")

    perms = {
        "read": read,
        "write": write,
        "list": list_perm,
        "aclReadable": acl_read,
    }

    result = {
        "status": _finalize_status(status, perms, dangling),
        "permissions": perms,
        "dangling": dangling,
    }
    if security_checks:
        result["securityChecks"] = security_checks
    return result
|
|
async def s3_bucket_status(b, session):
    """Deep ACL audit for AWS S3 buckets.

    Anonymously probes a bucket (bare name or full *.amazonaws.com host)
    and returns the dict built by _make_perms:
      * status          — listing / exists / nonexistent / private label
      * permissions     — read / write / list / aclReadable flags
      * dangling        — True when the name is unclaimed (takeover risk)
      * securityChecks  — encryption config, CORS posture, presigned probe
    """
    perms = {"read": False, "write": False, "list": False, "aclReadable": False}
    dangling = False
    status = "Unknown"
    # Accept either a bare bucket name or an already-qualified S3 hostname.
    base = b if ".amazonaws.com" in b else f"{b}.s3.amazonaws.com"

    # Bug fix: sec_checks used to be initialised inside the try below, so an
    # early exception made the final return raise NameError. Define it first.
    sec_checks = {"encryption": "unknown", "encryptionAlgorithm": None, "corsOpen": "unknown", "presignedBypass": False}

    try:
        # --- 1. Root GET: detect public listing / existence / absence ---
        r = await http_get(session, f"https://{base}/", allow_redirects=False)
        if r:
            body = await read_body(r)

            if r.status == 200 and "<ListBucketResult" in body:
                status = "Public Listing"
                perms["list"] = True
                perms["read"] = True
            elif r.status == 200:
                status = "Exists (200)"
                perms["read"] = True
            elif r.status == 404 or "NoSuchBucket" in body:
                # Unclaimed bucket name: prime takeover candidate.
                status = "Nonexistent"
                dangling = True
                return _make_perms(status, dangling=True)
            elif "AllAccessDisabled" in body:
                status = "AllAccessDisabled"
                return _make_perms(status)
            elif r.status in (301, 302, 307, 308):
                status = f"Redirect {r.status}"
            elif r.status in (401, 403) or _is_denied_body(body):
                status = "Exists (Private)"

        # --- 2. Object-level read probe (robots.txt is a harmless key) ---
        if not perms["read"]:
            r2 = await http_get(session, f"https://{base}/robots.txt", allow_redirects=False)
            if r2 and r2.status == 200:
                rbody = await read_body(r2)
                if not _is_denied_body(rbody):
                    perms["read"] = True

        # --- 3. Anonymous write probe: PUT a marker, then clean it up ---
        try:
            wr = await session.put(
                f"https://{base}/.s3shastra_write_test_probe",
                data=b"s3shastra-write-test",
                headers={"Content-Type": "text/plain"},
            )
            if wr.status in (200, 204):
                wbody = await read_body(wr)
                if not _is_denied_body(wbody):
                    perms["write"] = True

            await session.delete(f"https://{base}/.s3shastra_write_test_probe")
        except Exception:
            pass

        # --- 4. ACL readability (?acl subresource) ---
        try:
            acl_r = await http_get(session, f"https://{base}/?acl", allow_redirects=False)
            if acl_r and acl_r.status == 200:
                abody = await read_body(acl_r)
                if not _is_denied_body(abody):
                    perms["aclReadable"] = True
        except Exception:
            pass

        # --- 5a. Server-side encryption configuration (?encryption) ---
        try:
            enc_r = await http_get(session, f"https://{base}/?encryption", allow_redirects=False)
            if enc_r:
                enc_body = await read_body(enc_r)
                if enc_r.status == 200 and "ServerSideEncryptionConfiguration" in enc_body:
                    sec_checks["encryption"] = "enabled"
                    if "aws:kms" in enc_body:
                        sec_checks["encryptionAlgorithm"] = "SSE-KMS (aws:kms)"
                    elif "AES256" in enc_body:
                        sec_checks["encryptionAlgorithm"] = "SSE-S3 (AES-256)"
                    else:
                        sec_checks["encryptionAlgorithm"] = "SSE (type unknown)"
                elif enc_r.status == 200:
                    sec_checks["encryption"] = "disabled"
                elif _is_denied_body(enc_body) or enc_r.status in (401, 403):
                    sec_checks["encryption"] = "denied"
                elif "ServerSideEncryptionConfigurationNotFoundError" in enc_body:
                    # e.g. a 404 whose body carries this code: no SSE config.
                    sec_checks["encryption"] = "disabled"
        except Exception:
            pass

        # --- 5b. CORS preflight from a hostile origin ---
        try:
            cors_r = await session.options(
                f"https://{base}/",
                headers={"Origin": "https://evil.com", "Access-Control-Request-Method": "GET"},
            )
            if cors_r:
                acao = cors_r.headers.get("Access-Control-Allow-Origin", "")
                if acao == "*":
                    sec_checks["corsOpen"] = "wildcard"
                elif "evil.com" in acao:
                    sec_checks["corsOpen"] = "reflects_origin"
                elif acao:
                    sec_checks["corsOpen"] = "restricted"
                else:
                    sec_checks["corsOpen"] = "no_cors"
        except Exception:
            pass

        # --- 5c. Deliberately invalid presigned URL: a 200 with content
        # means signatures are not being enforced at all ---
        try:
            fake_presigned = (
                f"https://{base}/test.txt"
                f"?X-Amz-Algorithm=AWS4-HMAC-SHA256"
                f"&X-Amz-Credential=AKIAIOSFODNN7EXAMPLE/20200101/us-east-1/s3/aws4_request"
                f"&X-Amz-Date=20200101T000000Z&X-Amz-Expires=1"
                f"&X-Amz-Signature=0000000000000000000000000000000000000000000000000000000000000000"
                f"&X-Amz-SignedHeaders=host"
            )
            pr = await http_get(session, fake_presigned, allow_redirects=False)
            if pr and pr.status == 200:
                pr_body = await read_body(pr)
                if not _is_denied_body(pr_body) and len(pr_body) > 0:
                    sec_checks["presignedBypass"] = True
        except Exception:
            pass

    except Exception:
        pass

    # Bug fix: pass flags by their parameter names. The old `**perms` call
    # used the output key names ("list"/"aclReadable"), which _make_perms
    # rejected with TypeError, so every audit surfaced as "Unknown".
    return _make_perms(
        status,
        read=perms["read"],
        write=perms["write"],
        list_perm=perms["list"],
        acl_read=perms["aclReadable"],
        dangling=dangling,
        security_checks=sec_checks,
    )
|
|
async def gcs_bucket_status(b, session):
    """Deep ACL audit for GCS buckets.

    Probes the JSON metadata API, the XML listing endpoint, and the IAM
    policy endpoint anonymously, returning the standard audit dict.
    """
    perms = {"read": False, "write": False, "list": False, "aclReadable": False}
    dangling = False
    status = "Unknown"
    try:
        # --- 1. JSON API metadata: cleanly distinguishes 200/404/403 ---
        r = await http_get(session, f"https://storage.googleapis.com/storage/v1/b/{b}", allow_redirects=False)
        if r:
            mbody = await read_body(r)
            if r.status == 200:
                status = "Exists (Metadata)"
                perms["read"] = True
            elif r.status == 404:
                # Unclaimed bucket name: takeover candidate.
                return _make_perms("Nonexistent", dangling=True)
            elif r.status in (401, 403) or _is_denied_body(mbody):
                status = "Exists (Private)"

        # --- 2. XML endpoint: anonymous object listing / read ---
        r2 = await http_get(session, f"https://storage.googleapis.com/{b}", allow_redirects=False)
        if r2:
            body = await read_body(r2)
            if r2.status == 200 and ("<ListBucketResult" in body or "Index of /" in body):
                status = "Public Listing"
                perms["list"] = True
                perms["read"] = True
            elif r2.status == 200:
                status = "Exists (200)"
                perms["read"] = True
            elif r2.status in (401, 403) or _is_denied_body(body):
                status = "Exists (Private)"

        # --- 3. IAM policy readability ---
        try:
            acl_r = await http_get(session, f"https://storage.googleapis.com/storage/v1/b/{b}/iam", allow_redirects=False)
            if acl_r and acl_r.status == 200:
                acl_body = await read_body(acl_r)
                if not _is_denied_body(acl_body):
                    perms["aclReadable"] = True
        except Exception:
            pass

    except Exception:
        pass

    # Bug fix: pass flags by their parameter names; the old `**perms` call
    # used output key names ("list"/"aclReadable") that _make_perms rejects
    # with TypeError, so every GCS audit surfaced as "Unknown".
    return _make_perms(
        status,
        read=perms["read"],
        write=perms["write"],
        list_perm=perms["list"],
        acl_read=perms["aclReadable"],
        dangling=dangling,
    )
|
|
async def azure_blob_status(acc, session):
    """Deep ACL audit for Azure Blob storage accounts.

    Anonymously hits the container-list endpoint and reports public
    listing, nonexistent (dangling) accounts, or private accounts.
    """
    perms = {"read": False, "write": False, "list": False, "aclReadable": False}
    dangling = False
    status = "Unknown"
    # Accept either a bare account name or the full blob endpoint hostname.
    host = acc if ".blob.core.windows.net" in acc else f"{acc}.blob.core.windows.net"
    try:
        r = await http_get(session, f"https://{host}/?comp=list", allow_redirects=False)
        if not r:
            return _make_perms("Unknown")
        body = await read_body(r)

        if r.status == 200 and "<EnumerationResults" in body:
            status = "Public Listing"
            perms["list"] = True
            perms["read"] = True
        elif r.status == 404 or "ResourceNotFound" in body:
            # Account no longer provisioned: takeover candidate.
            status = "Nonexistent"
            dangling = True
        elif r.status in (401, 403) or _is_denied_body(body):
            status = "Exists (Private)"
    except Exception:
        pass

    # Bug fix: pass flags by their parameter names; the old `**perms` call
    # used output key names ("list"/"aclReadable") that _make_perms rejects
    # with TypeError, so every Azure audit surfaced as "Unknown".
    return _make_perms(
        status,
        read=perms["read"],
        write=perms["write"],
        list_perm=perms["list"],
        acl_read=perms["aclReadable"],
        dangling=dangling,
    )
|
|
async def s3_compatible_status(h, session):
    """Deep ACL audit for S3-compatible providers (DO Spaces, Wasabi, R2,
    B2, Linode, Oracle, Alibaba, Tencent, Yandex).

    Accepts a full URL, a hostname, or a bare bucket name (the latter is
    probed on AWS as a fallback) and returns the standard audit dict.
    """
    perms = {"read": False, "write": False, "list": False, "aclReadable": False}
    dangling = False
    status = "Unknown"
    try:
        if h.startswith("http"):
            cand = [h]
        elif "." in h:
            cand = [f"https://{h}/"]
        else:
            cand = [f"https://{h}.s3.amazonaws.com/"]
        for u in cand:
            r = await http_get(session, u, allow_redirects=False)
            if not r:
                continue
            body = await read_body(r)

            if r.status == 200 and "<ListBucketResult" in body:
                status = "Public Listing"
                perms["list"] = True
                perms["read"] = True
            elif r.status == 200:
                status = "Exists (200)"
                perms["read"] = True
            elif r.status == 404 or "NoSuchBucket" in body:
                status = "Nonexistent"
                dangling = True
            elif r.status in (401, 403) or _is_denied_body(body):
                status = "Exists (Private)"
    except Exception:
        pass

    # Bug fix: pass flags by their parameter names; the old `**perms` call
    # used output key names ("list"/"aclReadable") that _make_perms rejects
    # with TypeError, so every audit here surfaced as "Unknown".
    return _make_perms(
        status,
        read=perms["read"],
        write=perms["write"],
        list_perm=perms["list"],
        acl_read=perms["aclReadable"],
        dangling=dangling,
    )
|
|
async def classify_bucket(provider, ref, session):
    """Route to the correct provider auditor and return {status, permissions, dangling}.

    Extracts the bucket/account name from *ref* where the provider uses a
    recognisable URL shape; any failure degrades to an "Unknown" record
    rather than raising.
    """
    s3_compatible = ("DigitalOcean", "Wasabi", "Cloudflare R2", "Backblaze B2",
                     "Linode", "Oracle Cloud", "Alibaba OSS", "Tencent COS", "Yandex")
    try:
        if provider == "AWS S3":
            hit = re.search(r"s3(?:[.-][a-z0-9-]+)?\.amazonaws\.com/([a-z0-9][a-z0-9.-]{1,61}[a-z0-9])", ref, re.I)
            return await s3_bucket_status(hit.group(1) if hit else ref, session)
        if provider == "Google Cloud":
            hit = re.search(r"(?:storage\.googleapis\.com/|^)([a-z0-9.-]+)", ref, re.I)
            return await gcs_bucket_status(hit.group(1) if hit else ref, session)
        if provider == "Azure Blob":
            hit = re.search(r"([a-z0-9-]+)\.blob\.core\.windows\.net", ref, re.I)
            return await azure_blob_status(hit.group(1) if hit else ref, session)
        if provider in s3_compatible:
            return await s3_compatible_status(ref, session)
        return _make_perms("Unknown")
    except Exception:
        return _make_perms("Unknown")
|
|
|
|
| |
def generate_takeover_script(provider, bucket_name, ref):
    """Generate a provider-specific bucket claim script for authorized pentesting.

    Returns a dict mapping a tool key ("aws_cli", "gcloud", "az_cli", or
    "generic") to a ready-to-run shell snippet.
    """
    b = bucket_name
    if provider == "AWS S3":
        return {
            "aws_cli": (
                f"# AWS S3 Bucket Takeover — AUTHORIZED PENTESTING ONLY\n"
                f"# Verify the bucket is truly dangling first:\n"
                f"aws s3 ls s3://{b} 2>&1 | grep -q NoSuchBucket && echo 'DANGLING'\n\n"
                f"# Claim the bucket:\n"
                f"aws s3 mb s3://{b} --region us-east-1\n\n"
                f"# Upload proof-of-concept:\n"
                f"echo 's3shastra-takeover-proof' | aws s3 cp - s3://{b}/takeover-proof.txt\n\n"
                f"# Block public access immediately:\n"
                f"aws s3api put-public-access-block --bucket {b} \\\n"
                f" --public-access-block-configuration BlockPublicAcls=true,IgnorePublicAcls=true,"
                f"BlockPublicPolicy=true,RestrictPublicBuckets=true"
            )
        }
    if provider == "Google Cloud":
        return {
            "gcloud": (
                f"# GCS Bucket Takeover — AUTHORIZED PENTESTING ONLY\n"
                f"gsutil ls gs://{b} 2>&1 | grep -q BucketNotFound && echo 'DANGLING'\n\n"
                f"# Claim:\n"
                f"gsutil mb -l US gs://{b}\n\n"
                f"# Lock down:\n"
                f"gsutil iam ch -d allUsers gs://{b}\n"
                f"gsutil iam ch -d allAuthenticatedUsers gs://{b}"
            )
        }
    if provider == "Azure Blob":
        # Storage-account names: max 24 chars, no dots or dashes.
        account = b[:24].replace('.', '').replace('-', '')
        return {
            "az_cli": (
                f"# Azure Blob Takeover — AUTHORIZED PENTESTING ONLY\n"
                f"# Create storage account (name must be globally unique):\n"
                f"az storage account create --name {account} "
                f"--resource-group YOUR_RG --location eastus --sku Standard_LRS\n\n"
                f"# Disable public access:\n"
                f"az storage account update --name {account} "
                f"--resource-group YOUR_RG --allow-blob-public-access false"
            )
        }
    return {
        "generic": (
            f"# {provider} bucket '{bucket_name}' appears dangling.\n"
            f"# Consult your provider's CLI docs to claim it.\n"
            f"# Reference URL: {ref}"
        )
    }
|
|
|
|
| |
async def get_exposure_timeline(bucket_ref, session):
    """Query Wayback Machine CDX API to estimate how long a bucket URL has been indexed.

    Returns a summary dict with first/last snapshot dates, snapshot count,
    the exposure window in days, and up to 50 timeline entries.
    """
    timeline = []
    try:
        # One entry per ~month (collapse on the 6-digit YYYYMM timestamp
        # prefix), capped at 200 rows.
        cdx_url = (
            f"https://web.archive.org/cdx/search/cdx"
            f"?url={bucket_ref}&output=json&fl=timestamp,statuscode,original"
            f"&collapse=timestamp:6&limit=200"
        )
        t = ClientTimeout(total=30, sock_read=25)
        r = await session.get(cdx_url, timeout=t, allow_redirects=True)
        if r.status == 200:
            data = json.loads(await r.text())
            # First row of a CDX JSON response is the field-name header — skip it.
            if len(data) > 1:
                for row in data[1:]:
                    ts = row[0]  # 14-digit timestamp: YYYYMMDDhhmmss
                    try:
                        year = ts[:4]
                        month = ts[4:6]
                        day = ts[6:8]
                        date_str = f"{year}-{month}-{day}"
                        timeline.append({
                            "date": date_str,
                            "timestamp": ts,
                            "statusCode": row[1] if len(row) > 1 else "",
                            "url": row[2] if len(row) > 2 else bucket_ref,
                        })
                    except Exception:
                        continue
    except Exception:
        pass

    # NOTE(review): assumes CDX rows arrive in ascending timestamp order
    # (the API default), so first/last entries bound the window — confirm.
    first_seen = timeline[0]["date"] if timeline else None
    last_seen = timeline[-1]["date"] if timeline else None
    total_snapshots = len(timeline)

    # Days between the first and last observed snapshots.
    exposure_days = None
    if first_seen and last_seen:
        from datetime import datetime as dt
        try:
            d1 = dt.strptime(first_seen, "%Y-%m-%d")
            d2 = dt.strptime(last_seen, "%Y-%m-%d")
            exposure_days = (d2 - d1).days
        except Exception:
            pass

    return {
        "bucketRef": bucket_ref,
        "firstSeen": first_seen,
        "lastSeen": last_seen,
        "totalSnapshots": total_snapshots,
        "exposureDays": exposure_days,
        "timeline": timeline[:50],  # cap payload size sent to the client
    }
|
|
|
|
| |
async def fetch_page(session, url, headers):
    """GET a page and return (body text, Server header, status).

    On any request-level failure returns ("", "", 0) instead of raising.
    """
    try:
        resp = await session.get(url, headers=headers, allow_redirects=True)
        try:
            text = await resp.text(encoding="utf-8", errors="ignore")
        except Exception:
            # Fall back to the robust capped reader on decode problems.
            text = await read_body(resp)
        return text, resp.headers.get("Server", ""), resp.status
    except Exception:
        return "", "", 0
|
|
async def scan_subdomain_once(sub, session, args, websocket, seen_refs: Set[str]):
    """Scan one subdomain: fetch its page over https (then http), extract
    cloud-storage bucket references, flag takeover fingerprints, and run
    deep per-bucket audits.

    Results are streamed to the client over *websocket*; *seen_refs* is a
    scan-wide dedup set shared across subdomains (mutated in place).
    """
    if not include_exclude(sub, args.inc_regex, args.exc_regex):
        return
    found = set()
    takeover = False
    headers = {"User-Agent": args.user_agent, "Accept-Encoding": "gzip, deflate"}
    for scheme in ("https://", "http://"):
        url = f"{scheme}{sub}"
        content, server, _ = await fetch_page(session, url, headers)
        # Subdomain-takeover fingerprint check (S3 / CloudFront error pages).
        if content and not args.no_takeover:
            try:
                if is_s3_takeover(server, content):
                    await websocket.send_json({"type": "takeover", "data": {"subdomain": sub, "server": server}})
                    takeover = True
            except Exception:
                pass
        # Harvest bucket-looking URLs from the page body.
        if content:
            for m in re.finditer(BUCKET_REGEX, content):
                found.add(norm_match(m.group(0)))
        if found:
            for ref in sorted(found):
                # Scan-wide dedup: report each bucket reference only once.
                if ref in seen_refs:
                    continue
                seen_refs.add(ref)
                prov = identify_provider(ref)
                # Honour the user's provider filter: args.providers holds
                # short codes ("aws", ...), mapped to display names here.
                if args.providers and prov.lower() not in {PROVIDER_LIMITS_MAP[x.lower()].lower() for x in args.providers}:
                    continue
                result = {"subdomain": sub, "reference": ref, "provider": prov, "status": "Found", "server": server, "url": url}
                await websocket.send_json({"type": "result", "data": result})

        # Only fall through to plain http when https produced nothing.
        if scheme == "https://" and (takeover or found):
            break

    # Deep per-bucket ACL audits with bounded concurrency.
    if found and not args.no_bucket_checks:
        sem = asyncio.Semaphore(min(50, args.threads))

        async def chk(rf):
            # Audit a single bucket reference and stream the enriched record.
            prov = identify_provider(rf)
            if args.providers and prov.lower() not in {PROVIDER_LIMITS_MAP[x.lower()].lower() for x in args.providers}:
                return
            async with sem:
                audit = await classify_bucket(prov, rf, session)

            # Best-effort bucket-name extraction (path-style vs virtual-host URL).
            bucket_name = rf.split("/")[-1] if "/" in rf else rf.split(".")[0]

            rec = {
                "subdomain": sub,
                "reference": rf,
                "provider": prov,
                "status": audit["status"],
                "permissions": audit["permissions"],
                "dangling": audit["dangling"],
            }

            if audit.get("securityChecks"):
                rec["securityChecks"] = audit["securityChecks"]

            # Dangling bucket: attach ready-made (authorized-use) claim scripts.
            if audit["dangling"]:
                rec["takeoverScripts"] = generate_takeover_script(prov, bucket_name, rf)

            await websocket.send_json({"type": "result_update", "data": rec})

        await asyncio.gather(*(chk(rf) for rf in sorted(found)))
|
|
async def scan_subdomain(sub, session, args, websocket, seen_refs, subdomain_timeout):
    """Run scan_subdomain_once under a per-subdomain deadline, reporting
    timeouts and unexpected errors to the client instead of raising."""
    try:
        work = scan_subdomain_once(sub, session, args, websocket, seen_refs)
        await asyncio.wait_for(work, timeout=subdomain_timeout)
    except asyncio.TimeoutError:
        await websocket.send_json({"type": "status", "message": f"Timeout scanning {sub}"})
    except Exception as e:
        await websocket.send_json({"type": "status", "message": f"Error scanning {sub}: {e}"})
|
|
|
|
| |
async def entrypoint_scan(domain, args, websocket):
    """Top-level scan driver: enumerate subdomains, then scan each one with
    bounded concurrency, streaming progress and results over *websocket*.

    *args* is a namespace of scan options (timeouts, thread limits,
    include/exclude patterns, provider filters, feature toggles); two
    derived fields (inc_regex/exc_regex) are attached to it here.
    """
    await websocket.send_json({"type": "status", "message": f"Starting scan for {domain}"})
    seen_refs: Set[str] = set()  # scan-wide bucket-reference dedup
    timeout = build_timeout(args.timeout, args.connect_timeout, args.read_timeout)
    # Windows-only resolver choice. NOTE(review): AsyncResolver requires the
    # optional aiodns package — confirm it is installed on Windows deployments.
    resolver = aiohttp.AsyncResolver() if sys.platform == 'win32' else None
    conn = TCPConnector(ssl=not args.insecure, limit=args.threads, limit_per_host=args.per_host, resolver=resolver)

    # Pre-compile the user's include/exclude filters once for the whole scan.
    args.inc_regex = re.compile(args.include) if args.include else None
    args.exc_regex = re.compile(args.exclude) if args.exclude else None

    async with ClientSession(connector=conn, timeout=timeout, headers={"User-Agent": args.user_agent}) as sess:
        subs = await get_subdomains(domain, sess, websocket)
        if not subs:
            await websocket.send_json({"type": "status", "message": f"No subdomains found for {domain}"})
            return

        await websocket.send_json({"type": "status", "message": f"Found {len(subs)} subdomains for {domain}. Starting scan..."})
        await websocket.send_json({"type": "progress_total", "total": len(subs)})

        sem = asyncio.Semaphore(args.threads)
        tasks = []
        for s in subs:
            # Default-arg binding (sd=s) avoids the late-binding closure pitfall.
            async def bscan(sd=s):
                async with sem:
                    await scan_subdomain(sd, sess, args, websocket, seen_refs, args.subdomain_timeout)
                # Progress ticks are best-effort: a dropped socket here must
                # not abort the remaining scan tasks.
                try:
                    await websocket.send_json({"type": "progress_update"})
                except Exception:
                    pass
            tasks.append(asyncio.create_task(bscan()))

        await asyncio.gather(*tasks)
|
|