# s3shastra/scanner.py
# Author: Atharv834
# Deploy S3Shastra backend - FastAPI + scanners + ML models (commit 6a4dcb6)
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
import asyncio
import logging
import sys
import json
import re
import html
import random
import time
from typing import Set
from types import SimpleNamespace
import aiohttp
from aiohttp import ClientSession, ClientTimeout, TCPConnector, ClientConnectorError
# Module-level logger; handlers/levels are configured by the hosting backend.
logger = logging.getLogger("s3shastra.scanner")
# This file is a refactoring of the original script's core logic to be used by a web backend.
# It is not meant to be run directly.
# ================= CONFIG DEFAULTS =================
# Descriptive User-Agent advertising ethical intent plus a contact address,
# so scanned bucket owners can attribute and reach out about the traffic.
DEFAULT_UA = "S3Shastra/1.3 (+ethical; bugbounty; contact=security@example.com)"
# ================= PROVIDER PATTERNS =================
# One big alternation matching bucket / object-store hostnames (and some
# path-style URLs) for 12 cloud providers. Compiled case-insensitive and
# VERBOSE, so the newlines inside the pattern are ignored by the engine.
# Each top-level `|(?:...)` branch below covers one provider.
BUCKET_REGEX = re.compile(r"""(?ix)
(?:[a-z0-9][a-z0-9.-]{1,61}[a-z0-9]\.s3(?:[.-][a-z0-9-]+)?\.amazonaws\.com
|s3(?:[.-][a-z0-9-]+)?\.amazonaws\.com/[a-z0-9][a-z0-9.-]{1,61}[a-z0-9])
|(?:[a-z0-9.-]+\.storage\.googleapis\.com
|storage\.googleapis\.com/[a-z0-9.-]+)
|(?:[a-z0-9-]+\.blob\.core\.windows\.net)
|(?:[a-z0-9.-]+\.(?:nyc3|ams3|sfo3|fra1|sgp1|tor1|blr1)\.digitaloceanspaces\.com)
|(?:[a-z0-9.-]+\.objectstorage\.[a-z0-9-]+\.oraclecloud\.com)
|(?:[a-z0-9.-]+\.[a-z0-9-]+\.linodeobjects\.com)
|(?:s3\.[a-z0-9-]+\.wasabisys\.com/[a-z0-9][a-z0-9.-]{1,61}[a-z0-9]
|[a-z0-9.-]+\.s3\.[a-z0-9-]+\.wasabisys\.com)
|(?:[a-z0-9.-]+\.r2\.cloudflarestorage\.com)
|(?:f\d+\.backblazeb2\.com/[a-z0-9./-]+)
|(?:[a-z0-9.-]+\.oss-[a-z0-9-]+\.aliyuncs\.com)
|(?:[a-z0-9.-]+\.cos\.[a-z0-9.-]+\.myqcloud\.com)
|(?:[a-z0-9.-]+\.storage\.yandexcloud\.net)
""", re.VERBOSE)
# Ordered (display name, hostname pattern) pairs; first match wins.
PROVIDERS = [
    ("AWS S3", re.compile(r"amazonaws\.com", re.I)),
    ("Google Cloud", re.compile(r"storage\.googleapis\.com", re.I)),
    ("Azure Blob", re.compile(r"blob\.core\.windows\.net", re.I)),
    ("DigitalOcean", re.compile(r"digitaloceanspaces\.com", re.I)),
    ("Oracle Cloud", re.compile(r"objectstorage\.", re.I)),
    ("Linode", re.compile(r"linodeobjects\.com", re.I)),
    ("Wasabi", re.compile(r"wasabisys\.com", re.I)),
    ("Cloudflare R2", re.compile(r"r2\.cloudflarestorage\.com", re.I)),
    ("Backblaze B2", re.compile(r"backblazeb2\.com", re.I)),
    ("Alibaba OSS", re.compile(r"aliyuncs\.com", re.I)),
    ("Tencent COS", re.compile(r"myqcloud\.com", re.I)),
    ("Yandex", re.compile(r"yandexcloud\.net", re.I)),
]
# Short provider codes (as accepted on args.providers) -> display names.
PROVIDER_LIMITS_MAP = {
    "aws": "AWS S3",
    "gcp": "Google Cloud",
    "azure": "Azure Blob",
    "do": "DigitalOcean",
    "wasabi": "Wasabi",
    "r2": "Cloudflare R2",
    "b2": "Backblaze B2",
    "oracle": "Oracle Cloud",
    "linode": "Linode",
    "alibaba": "Alibaba OSS",
    "tencent": "Tencent COS",
    "yandex": "Yandex",
}
# Reverse lookup: lower-cased display name -> short code.
PRECOMPILED_LIMITS_MAP = {name.lower(): code for code, name in PROVIDER_LIMITS_MAP.items()}
def identify_provider(url: str) -> str:
    """Return the display name of the first provider whose pattern matches `url`."""
    return next((name for name, pattern in PROVIDERS if pattern.search(url)), "Unknown")
# ================= UTILITIES =================
def norm_host(h: str) -> str:
    """Lower-case a hostname, trimming surrounding whitespace and trailing dot(s)."""
    cleaned = h.strip().lower()
    return cleaned.rstrip(".")
def norm_match(m: str) -> str:
    """Clean a raw regex match: decode HTML entities, then trim whitespace and
    surrounding junk punctuation (quotes, closing brackets, separators)."""
    unescaped = html.unescape(m)
    return unescaped.strip().strip('"\")]>;.,')
def build_timeout(tot, con, rd) -> ClientTimeout:
    """Assemble an aiohttp ClientTimeout from total / connect / socket-read budgets (seconds)."""
    limits = {"total": tot, "connect": con, "sock_read": rd}
    return ClientTimeout(**limits)
def include_exclude(sub, inc, exc):
    """Return True when `sub` passes the optional include/exclude regex filters.

    `inc`/`exc` are compiled patterns or None (None means "no filter").
    """
    passes_include = inc is None or bool(inc.search(sub))
    passes_exclude = exc is None or not exc.search(sub)
    return passes_include and passes_exclude
async def http_get(session, url, retries=3, allow_redirects=True, headers=None):
    """GET `url`, retrying transient network failures with jittered exponential backoff.

    Returns the aiohttp response, or None once all `retries` extra attempts fail.
    """
    delay = 0.5
    for attempt in range(retries + 1):
        try:
            return await session.get(url, allow_redirects=allow_redirects, headers=headers)
        except (asyncio.TimeoutError, ClientConnectorError, aiohttp.ClientError):
            if attempt == retries:
                return None
            # jitter avoids thundering-herd retries against the same host
            await asyncio.sleep(delay + random.random() * 0.3)
            delay *= 1.8
    return None
async def read_body(r, max_size: int = 2 * 1024 * 1024):
    """Read a response body as text, capped at `max_size` bytes to avoid memory exhaustion.

    When Content-Length advertises an oversized payload, only the first chunk is
    read (bucket URLs live near the top of a page). Returns "" for a None
    response or on unrecoverable decode/read failure.
    """
    if not r:
        return ""
    try:
        declared = r.headers.get("Content-Length")
        if declared and int(declared) > max_size:
            # Oversized: pull only the first max_size bytes from the stream.
            chunk = await r.content.read(max_size)
            return chunk.decode(errors="ignore")
        return await r.text(errors="ignore")
    except Exception:
        # Fallback: raw read + lossy decode (e.g. charset detection failed).
        try:
            raw = await r.read()
            return raw.decode(errors="ignore")
        except Exception:
            return ""
# ================= SUBDOMAIN ENUM =================
async def _safe_fetch(session, url, timeout_secs=30, as_json=False):
"""Wrapper that returns text/json or empty on any failure."""
try:
t = ClientTimeout(total=timeout_secs, sock_read=timeout_secs - 2)
r = await session.get(url, timeout=t, allow_redirects=True)
if r.status != 200:
return {} if as_json else ""
txt = await r.text(errors="ignore")
if as_json:
return json.loads(txt)
return txt
except Exception:
return {} if as_json else ""
async def get_subdomains_shrewdeye(domain, session, websocket):
    """Collect subdomains from shrewdeye.app's plain-text listing."""
    subs = set()
    try:
        listing = await _safe_fetch(session, f"https://shrewdeye.app/domains/{domain}.txt")
        subs = {
            host
            for host in (norm_host(line) for line in listing.splitlines())
            if host and host.endswith(domain)
        }
    except Exception:
        pass
    await websocket.send_json({"type": "info", "message": f"ShrewdEye: {len(subs)}"})
    return subs
async def get_subdomains_anubis(domain, session, websocket):
    """Collect subdomains from AnubisDB's JSON API."""
    subs = set()
    try:
        payload = await _safe_fetch(session, f"https://anubisdb.com/anubis/subdomains/{domain}", as_json=True)
        entries = payload if isinstance(payload, list) else []
        for entry in entries:
            host = norm_host(entry)
            if host and host.endswith(domain):
                subs.add(host)
    except Exception:
        pass
    await websocket.send_json({"type": "info", "message": f"AnubisDB: {len(subs)}"})
    return subs
async def get_subdomains_jldc(domain, session, websocket):
    """Collect subdomains from jldc.me's Anubis mirror (JSON list)."""
    subs = set()
    try:
        payload = await _safe_fetch(session, f"https://jldc.me/anubis/subdomains/{domain}", as_json=True)
        if isinstance(payload, list):
            subs = {
                host
                for host in (norm_host(item) for item in payload)
                if host and host.endswith(domain)
            }
    except Exception:
        pass
    await websocket.send_json({"type": "info", "message": f"JLDC: {len(subs)}"})
    return subs
async def get_subdomains_subdomain_center(domain, session, websocket):
    """Collect subdomains from api.subdomain.center (JSON list)."""
    subs = set()
    try:
        payload = await _safe_fetch(session, f"https://api.subdomain.center/?domain={domain}", as_json=True)
        candidates = payload if isinstance(payload, list) else []
        for candidate in candidates:
            host = norm_host(candidate)
            if host and host.endswith(domain):
                subs.add(host)
    except Exception:
        pass
    await websocket.send_json({"type": "info", "message": f"SubdomainCenter: {len(subs)}"})
    return subs
async def get_subdomains_whoisxml(domain, session, websocket):
    """Collect subdomains from the WhoisXML subdomains API.

    SECURITY FIX: the API key used to be hard-coded only. It can now be
    supplied via the WHOISXML_API_KEY environment variable; the old key
    remains the fallback for backward compatibility, but it is exposed in
    source history and should be rotated.
    """
    import os  # local import keeps this file's top-level import block unchanged
    subs = set()
    api_key = os.environ.get("WHOISXML_API_KEY", "at_IHleEeaUQpOU8jD15yXt3AMkY9pc6")
    try:
        url = f"https://subdomains.whoisxmlapi.com/api/v1?apiKey={api_key}&domainName={domain}&outputFormat=json"
        data = await _safe_fetch(session, url, as_json=True)
        records = data.get("result", {}).get("records", []) if isinstance(data, dict) else []
        for rec in records:
            d = rec.get("domain")
            if d:
                s = norm_host(d)
                if s and s.endswith(domain):
                    subs.add(s)
    except Exception:
        pass
    await websocket.send_json({"type": "info", "message": f"WhoisXML: {len(subs)}"})
    return subs
async def get_subdomains_crt(domain, session, websocket):
    """Collect certificate-transparency subdomains from crt.sh."""
    subs = set()
    try:
        # crt.sh is slow; give it a generous per-call timeout.
        slow = ClientTimeout(total=60, sock_read=45)
        resp = await session.get(f"https://crt.sh/?q=%.{domain}&output=json", timeout=slow)
        if resp.status == 200:
            rows = json.loads(await resp.text())
            for row in rows:
                for name in row.get("name_value", "").splitlines():
                    host = norm_host(name.lstrip("*."))
                    if host and host.endswith(domain):
                        subs.add(host)
    except Exception:
        pass
    await websocket.send_json({"type": "info", "message": f"crt.sh: {len(subs)}"})
    return subs
# ---- NEW SOURCES ----
async def get_subdomains_hackertarget(domain, session, websocket):
    """HackerTarget hostsearch — free, no key (CSV: host,ip per line)."""
    subs = set()
    try:
        csv_text = await _safe_fetch(session, f"https://api.hackertarget.com/hostsearch/?q={domain}", timeout_secs=20)
        # HackerTarget reports quota problems as an "error" body, not a status code.
        if csv_text and "error" not in csv_text.lower():
            for row in csv_text.splitlines():
                fields = row.split(",")
                if not fields:
                    continue
                host = norm_host(fields[0])
                if host and host.endswith(domain):
                    subs.add(host)
    except Exception:
        pass
    await websocket.send_json({"type": "info", "message": f"HackerTarget: {len(subs)}"})
    return subs
async def get_subdomains_alienvault(domain, session, websocket):
    """AlienVault OTX — free passive-DNS records."""
    subs = set()
    try:
        data = await _safe_fetch(
            session,
            f"https://otx.alienvault.com/api/v1/indicators/domain/{domain}/passive_dns",
            timeout_secs=20, as_json=True,
        )
        records = data.get("passive_dns", []) if isinstance(data, dict) else []
        for record in records:
            host = norm_host(record.get("hostname", ""))
            if host and host.endswith(domain):
                subs.add(host)
    except Exception:
        pass
    await websocket.send_json({"type": "info", "message": f"AlienVault: {len(subs)}"})
    return subs
async def get_subdomains_certspotter(domain, session, websocket):
    """CertSpotter free tier — certificate-transparency issuances."""
    subs = set()
    try:
        payload = await _safe_fetch(
            session,
            f"https://api.certspotter.com/v1/issuances?domain={domain}&include_subdomains=true&expand=dns_names",
            timeout_secs=25, as_json=True,
        )
        certs = payload if isinstance(payload, list) else []
        for cert in certs:
            for dns_name in cert.get("dns_names", []):
                host = norm_host(dns_name.lstrip("*."))
                if host and host.endswith(domain):
                    subs.add(host)
    except Exception:
        pass
    await websocket.send_json({"type": "info", "message": f"CertSpotter: {len(subs)}"})
    return subs
async def get_subdomains_urlscan(domain, session, websocket):
    """URLScan.io public search — harvests page domains from scan results."""
    subs = set()
    try:
        data = await _safe_fetch(
            session,
            f"https://urlscan.io/api/v1/search/?q=domain:{domain}&size=1000",
            timeout_secs=20, as_json=True,
        )
        hits = data.get("results", []) if isinstance(data, dict) else []
        for hit in hits:
            host = norm_host(hit.get("page", {}).get("domain", ""))
            if host and host.endswith(domain):
                subs.add(host)
    except Exception:
        pass
    await websocket.send_json({"type": "info", "message": f"URLScan: {len(subs)}"})
    return subs
async def get_subdomains_rapiddns(domain, session, websocket):
    """RapidDNS.io — scrapes hostnames out of the HTML results table."""
    subs = set()
    try:
        html_text = await _safe_fetch(session, f"https://rapiddns.io/subdomain/{domain}?full=1", timeout_secs=20)
        if html_text:
            cell_re = re.compile(r'<td>([a-zA-Z0-9._-]+\.' + re.escape(domain) + r')</td>')
            for match in cell_re.finditer(html_text):
                host = norm_host(match.group(1))
                if host:
                    subs.add(host)
    except Exception:
        pass
    await websocket.send_json({"type": "info", "message": f"RapidDNS: {len(subs)}"})
    return subs
async def get_subdomains_wayback(domain, session, websocket):
    """Wayback Machine CDX API — harvests hostnames from historical URLs."""
    subs = set()
    try:
        cdx = await _safe_fetch(
            session,
            f"https://web.archive.org/cdx/search/cdx?url=*.{domain}/*&output=text&fl=original&collapse=urlkey&limit=5000",
            timeout_secs=30,
        )
        for raw in cdx.splitlines():
            try:
                # URLs look like https://sub.example.com:443/path — keep the host only.
                hostname = raw.strip().split("//")[-1].split("/")[0].split(":")[0]
                host = norm_host(hostname)
                if host and host.endswith(domain):
                    subs.add(host)
            except Exception:
                continue
    except Exception:
        pass
    await websocket.send_json({"type": "info", "message": f"Wayback: {len(subs)}"})
    return subs
async def get_subdomains(domain, session, websocket):
    """Aggregate subdomains from all enumeration sources.

    Runs every source concurrently, merges their sets, filters out wildcard
    and obviously-invalid entries, reports progress over the websocket, and
    returns a sorted list that always includes the base domain itself.
    """
    t0 = time.time()
    await websocket.send_json({"type": "status", "message": f"Enumerating subdomains for {domain} from 12 API sources..."})
    tasks = [
        get_subdomains_shrewdeye(domain, session, websocket),
        get_subdomains_subdomain_center(domain, session, websocket),
        get_subdomains_whoisxml(domain, session, websocket),
        get_subdomains_anubis(domain, session, websocket),
        get_subdomains_jldc(domain, session, websocket),
        get_subdomains_crt(domain, session, websocket),
        get_subdomains_hackertarget(domain, session, websocket),
        get_subdomains_alienvault(domain, session, websocket),
        get_subdomains_certspotter(domain, session, websocket),
        get_subdomains_urlscan(domain, session, websocket),
        get_subdomains_rapiddns(domain, session, websocket),
        get_subdomains_wayback(domain, session, websocket),
    ]
    # FIX: return_exceptions=True so a single failing source (e.g. a websocket
    # send error escaping its helper) cannot cancel the whole enumeration.
    results = await asyncio.gather(*tasks, return_exceptions=True)
    all_subs = set()
    for r in results:
        if isinstance(r, set):
            all_subs.update(r)
    # Ensure the base domain itself is included
    all_subs.add(norm_host(domain))
    # Filter out wildcards and invalid entries
    cleaned = set()
    for s in all_subs:
        s = s.lstrip("*.")
        if not s or not s.endswith(domain):
            continue
        # Skip obviously invalid entries (contain spaces, too short)
        if " " in s or len(s) < len(domain):
            continue
        cleaned.add(s)
    elapsed = round(time.time() - t0, 1)
    await websocket.send_json({
        "type": "status",
        "message": f"Subdomain enumeration complete: {len(cleaned)} unique subdomains in {elapsed}s"
    })
    return sorted(cleaned)
# ================= TAKEOVER FINGERPRINT =================
def is_s3_takeover(srv, body):
    """Heuristic: does the Server header + body indicate a dangling S3/CloudFront origin?"""
    server = srv.lower()
    if "amazons3" in server:
        return "NoSuchBucket" in body or "does not exist" in body
    if "cloudfront" in server:
        return "NoSuchBucket" in body
    return False
# ================= BUCKET STATUS + PERMISSION AUDITOR =================
def _is_denied_body(body):
"""Check if a response body contains a *specific* access-denied error code.
IMPORTANT: Only match precise cloud-provider denial codes.
Do NOT match generic strings like '<Error>', 'Forbidden', or 'Unauthorized'
because they appear in 404 / redirect / generic XML responses too."""
if not body:
return False
markers = (
"AccessDenied", "Access Denied",
"AuthenticationFailed", "AllAccessDisabled",
"InvalidAccessKeyId", "SignatureDoesNotMatch",
"PublicAccessNotPermitted", "AccountRequiresAHint",
"AuthorizationRequired", "AuthorizationPermissionMismatch",
)
return any(m in body for m in markers)
def _finalize_status(status, perms, dangling=False):
"""Derive final status from actual permission flags.
Any bucket with read/write/list access is PUBLIC, period.
Any existing bucket with NO public perms is PRIVATE."""
if dangling:
return status # keep Nonexistent etc.
if perms.get("write"):
return "Public Write Access"
if perms.get("list"):
return "Public Listing"
if perms.get("read"):
return "Public Read Access"
if perms.get("aclReadable"):
return "ACL Exposed"
# If the bucket exists but has no public perms → Private
sl = status.lower()
if any(k in sl for k in ("exists", "redirect", "private", "allaccessdisabled", "unknown acl")):
return "Exists (Private)"
return status
def _make_perms(status, read=False, write=False, list_perm=False, acl_read=False, dangling=False, security_checks=None):
"""Build a standardised permissions dict."""
perms = {
"read": read,
"write": write,
"list": list_perm,
"aclReadable": acl_read,
}
# Re-derive status from actual permission flags
final_status = _finalize_status(status, perms, dangling)
result = {
"status": final_status,
"permissions": perms,
"dangling": dangling,
}
if security_checks:
result["securityChecks"] = security_checks
return result
async def s3_bucket_status(b, session):
    """Deep permission + security audit for an AWS S3 bucket.

    Probes, in order: anonymous listing, object read, anonymous write
    (PUT + cleanup DELETE of a probe key), ACL readability, bucket
    encryption config, CORS policy, and presigned-URL signature handling.
    Returns the standardised dict from `_make_perms`, including
    `securityChecks` for the last three probes.
    """
    perms = {"read": False, "write": False, "list": False, "aclReadable": False}
    dangling = False
    status = "Unknown"
    # FIX: initialise up-front — previously created mid-way through the try
    # block, so any earlier exception made the final return raise NameError.
    sec_checks = {"encryption": "unknown", "encryptionAlgorithm": None, "corsOpen": "unknown", "presignedBypass": False}
    base = b if ".amazonaws.com" in b else f"{b}.s3.amazonaws.com"
    try:
        # 1. LIST check (GET /)
        r = await http_get(session, f"https://{base}/", allow_redirects=False)
        if r:
            body = await read_body(r)
            # Order matters! Check positive signals and 404 BEFORE denied-body
            # so non-existent buckets and public listings aren't mis-flagged.
            if r.status == 200 and "<ListBucketResult" in body:
                status = "Public Listing"
                perms["list"] = True
                perms["read"] = True
            elif r.status == 200:
                status = "Exists (200)"
                perms["read"] = True
            elif r.status == 404 or "NoSuchBucket" in body:
                status = "Nonexistent"
                dangling = True
                return _make_perms(status, dangling=True)
            elif "AllAccessDisabled" in body:
                status = "AllAccessDisabled"
                return _make_perms(status)
            elif r.status in (301, 302, 307, 308):
                status = f"Redirect {r.status}"
            elif r.status in (401, 403) or _is_denied_body(body):
                status = "Exists (Private)"
        # 2. READ check (GET a harmless well-known key)
        if not perms["read"]:
            r2 = await http_get(session, f"https://{base}/robots.txt", allow_redirects=False)
            if r2 and r2.status == 200:
                rbody = await read_body(r2)
                if not _is_denied_body(rbody):
                    perms["read"] = True
        # 3. WRITE check (PUT a harmless test key, then DELETE)
        try:
            wr = await session.put(
                f"https://{base}/.s3shastra_write_test_probe",
                data=b"s3shastra-write-test",
                headers={"Content-Type": "text/plain"},
            )
            if wr.status in (200, 204):
                wbody = await read_body(wr)
                if not _is_denied_body(wbody):
                    perms["write"] = True
                # Clean up the probe object whenever the PUT looked accepted
                await session.delete(f"https://{base}/.s3shastra_write_test_probe")
        except Exception:
            pass
        # 4. ACL readable check (GET /?acl)
        try:
            acl_r = await http_get(session, f"https://{base}/?acl", allow_redirects=False)
            if acl_r and acl_r.status == 200:
                abody = await read_body(acl_r)
                if not _is_denied_body(abody):
                    perms["aclReadable"] = True
        except Exception:
            pass
        # 5. ENCRYPTION check (GET /?encryption)
        try:
            enc_r = await http_get(session, f"https://{base}/?encryption", allow_redirects=False)
            if enc_r:
                enc_body = await read_body(enc_r)
                if enc_r.status == 200 and "ServerSideEncryptionConfiguration" in enc_body:
                    sec_checks["encryption"] = "enabled"
                    if "aws:kms" in enc_body:
                        sec_checks["encryptionAlgorithm"] = "SSE-KMS (aws:kms)"
                    elif "AES256" in enc_body:
                        sec_checks["encryptionAlgorithm"] = "SSE-S3 (AES-256)"
                    else:
                        sec_checks["encryptionAlgorithm"] = "SSE (type unknown)"
                elif enc_r.status == 200:
                    sec_checks["encryption"] = "disabled"
                elif _is_denied_body(enc_body) or enc_r.status in (401, 403):
                    sec_checks["encryption"] = "denied"  # can't check, private
                elif "ServerSideEncryptionConfigurationNotFoundError" in enc_body:
                    sec_checks["encryption"] = "disabled"
        except Exception:
            pass
        # 6. CORS check (OPTIONS / with a foreign Origin)
        try:
            cors_r = await session.options(
                f"https://{base}/",
                headers={"Origin": "https://evil.com", "Access-Control-Request-Method": "GET"},
            )
            if cors_r:
                acao = cors_r.headers.get("Access-Control-Allow-Origin", "")
                if acao == "*":
                    sec_checks["corsOpen"] = "wildcard"
                elif "evil.com" in acao:
                    sec_checks["corsOpen"] = "reflects_origin"
                elif acao:
                    sec_checks["corsOpen"] = "restricted"
                else:
                    sec_checks["corsOpen"] = "no_cors"
        except Exception:
            pass
        # 7. PRESIGNED URL BYPASS check — a syntactically valid but bogus
        # signature must be rejected; 200 with content means no enforcement.
        try:
            fake_presigned = (
                f"https://{base}/test.txt"
                f"?X-Amz-Algorithm=AWS4-HMAC-SHA256"
                f"&X-Amz-Credential=AKIAIOSFODNN7EXAMPLE/20200101/us-east-1/s3/aws4_request"
                f"&X-Amz-Date=20200101T000000Z&X-Amz-Expires=1"
                f"&X-Amz-Signature=0000000000000000000000000000000000000000000000000000000000000000"
                f"&X-Amz-SignedHeaders=host"
            )
            pr = await http_get(session, fake_presigned, allow_redirects=False)
            if pr and pr.status == 200:
                pr_body = await read_body(pr)
                if not _is_denied_body(pr_body) and len(pr_body) > 0:
                    sec_checks["presignedBypass"] = True
        except Exception:
            pass
    except Exception:
        pass
    # FIX: pass flags under _make_perms' parameter names — the old `**perms`
    # spread used keys ("list"/"aclReadable") the signature did not accept,
    # so every audit raised TypeError.
    return _make_perms(
        status,
        read=perms["read"],
        write=perms["write"],
        list_perm=perms["list"],
        acl_read=perms["aclReadable"],
        dangling=dangling,
        security_checks=sec_checks,
    )
async def gcs_bucket_status(b, session):
    """Deep permission audit for a Google Cloud Storage bucket."""
    perms = {"read": False, "write": False, "list": False, "aclReadable": False}
    dangling = False
    status = "Unknown"
    try:
        # Metadata API — 200 means public metadata; check 404 before denied markers
        r = await http_get(session, f"https://storage.googleapis.com/storage/v1/b/{b}", allow_redirects=False)
        if r:
            mbody = await read_body(r)
            if r.status == 200:
                status = "Exists (Metadata)"
                perms["read"] = True
            elif r.status == 404:
                return _make_perms("Nonexistent", dangling=True)
            elif r.status in (401, 403) or _is_denied_body(mbody):
                status = "Exists (Private)"
        # LIST check — positive signals first, then denied
        r2 = await http_get(session, f"https://storage.googleapis.com/{b}", allow_redirects=False)
        if r2:
            body = await read_body(r2)
            if r2.status == 200 and ("<ListBucketResult" in body or "Index of /" in body):
                status = "Public Listing"
                perms["list"] = True
                perms["read"] = True
            elif r2.status == 200:
                status = "Exists (200)"
                perms["read"] = True
            elif r2.status in (401, 403) or _is_denied_body(body):
                status = "Exists (Private)"
        # ACL (IAM policy) readable check
        try:
            acl_r = await http_get(session, f"https://storage.googleapis.com/storage/v1/b/{b}/iam", allow_redirects=False)
            if acl_r and acl_r.status == 200:
                acl_body = await read_body(acl_r)
                if not _is_denied_body(acl_body):
                    perms["aclReadable"] = True
        except Exception:
            pass
    except Exception:
        pass
    # FIX: explicit keywords — the old `**perms` spread used keys
    # ("list"/"aclReadable") that _make_perms' signature rejected.
    return _make_perms(
        status,
        read=perms["read"],
        write=perms["write"],
        list_perm=perms["list"],
        acl_read=perms["aclReadable"],
        dangling=dangling,
    )
async def azure_blob_status(acc, session):
    """Deep permission audit for an Azure Blob storage account."""
    perms = {"read": False, "write": False, "list": False, "aclReadable": False}
    dangling = False
    status = "Unknown"
    host = acc if ".blob.core.windows.net" in acc else f"{acc}.blob.core.windows.net"
    try:
        r = await http_get(session, f"https://{host}/?comp=list", allow_redirects=False)
        if not r:
            return _make_perms("Unknown")
        body = await read_body(r)
        # Check positive and 404 BEFORE denied to avoid mis-flagging
        if r.status == 200 and "<EnumerationResults" in body:
            status = "Public Listing"
            perms["list"] = True
            perms["read"] = True
        elif r.status == 404 or "ResourceNotFound" in body:
            status = "Nonexistent"
            dangling = True
        elif r.status in (401, 403) or _is_denied_body(body):
            status = "Exists (Private)"
    except Exception:
        pass
    # FIX: explicit keywords — the old `**perms` spread used keys
    # ("list"/"aclReadable") that _make_perms' signature rejected.
    return _make_perms(
        status,
        read=perms["read"],
        write=perms["write"],
        list_perm=perms["list"],
        acl_read=perms["aclReadable"],
        dangling=dangling,
    )
async def s3_compatible_status(h, session):
    """Deep permission audit for S3-compatible providers (DO Spaces, Wasabi, R2, ...)."""
    perms = {"read": False, "write": False, "list": False, "aclReadable": False}
    dangling = False
    status = "Unknown"
    try:
        # Build the candidate URL from whatever form of reference we got.
        if h.startswith("http"):
            cand = [h]
        elif "." in h:
            cand = [f"https://{h}/"]
        else:
            cand = [f"https://{h}.s3.amazonaws.com/"]
        for u in cand:
            r = await http_get(session, u, allow_redirects=False)
            if not r:
                continue
            body = await read_body(r)
            # Check positive and 404 BEFORE denied to avoid mis-flagging
            if r.status == 200 and "<ListBucketResult" in body:
                status = "Public Listing"
                perms["list"] = True
                perms["read"] = True
            elif r.status == 200:
                status = "Exists (200)"
                perms["read"] = True
            elif r.status == 404 or "NoSuchBucket" in body:
                status = "Nonexistent"
                dangling = True
            elif r.status in (401, 403) or _is_denied_body(body):
                status = "Exists (Private)"
    except Exception:
        pass
    # FIX: explicit keywords — the old `**perms` spread used keys
    # ("list"/"aclReadable") that _make_perms' signature rejected.
    return _make_perms(
        status,
        read=perms["read"],
        write=perms["write"],
        list_perm=perms["list"],
        acl_read=perms["aclReadable"],
        dangling=dangling,
    )
async def classify_bucket(provider, ref, session):
    """Route `ref` to the matching provider auditor; returns {status, permissions, dangling}."""
    s3_compatible = {
        "DigitalOcean", "Wasabi", "Cloudflare R2", "Backblaze B2",
        "Linode", "Oracle Cloud", "Alibaba OSS", "Tencent COS", "Yandex",
    }
    try:
        if provider == "AWS S3":
            m = re.search(r"s3(?:[.-][a-z0-9-]+)?\.amazonaws\.com/([a-z0-9][a-z0-9.-]{1,61}[a-z0-9])", ref, re.I)
            return await s3_bucket_status(m.group(1) if m else ref, session)
        if provider == "Google Cloud":
            m = re.search(r"(?:storage\.googleapis\.com/|^)([a-z0-9.-]+)", ref, re.I)
            return await gcs_bucket_status(m.group(1) if m else ref, session)
        if provider == "Azure Blob":
            m = re.search(r"([a-z0-9-]+)\.blob\.core\.windows\.net", ref, re.I)
            return await azure_blob_status(m.group(1) if m else ref, session)
        if provider in s3_compatible:
            return await s3_compatible_status(ref, session)
        return _make_perms("Unknown")
    except Exception:
        return _make_perms("Unknown")
# ================= TAKEOVER SCRIPT GENERATOR =================
def generate_takeover_script(provider, bucket_name, ref):
    """Generate a provider-specific bucket claim script for authorized pentesting."""
    b = bucket_name
    if provider == "AWS S3":
        return {"aws_cli": (
            f"# AWS S3 Bucket Takeover — AUTHORIZED PENTESTING ONLY\n"
            f"# Verify the bucket is truly dangling first:\n"
            f"aws s3 ls s3://{b} 2>&1 | grep -q NoSuchBucket && echo 'DANGLING'\n\n"
            f"# Claim the bucket:\n"
            f"aws s3 mb s3://{b} --region us-east-1\n\n"
            f"# Upload proof-of-concept:\n"
            f"echo 's3shastra-takeover-proof' | aws s3 cp - s3://{b}/takeover-proof.txt\n\n"
            f"# Block public access immediately:\n"
            f"aws s3api put-public-access-block --bucket {b} \\\n"
            f" --public-access-block-configuration BlockPublicAcls=true,IgnorePublicAcls=true,"
            f"BlockPublicPolicy=true,RestrictPublicBuckets=true"
        )}
    if provider == "Google Cloud":
        return {"gcloud": (
            f"# GCS Bucket Takeover — AUTHORIZED PENTESTING ONLY\n"
            f"gsutil ls gs://{b} 2>&1 | grep -q BucketNotFound && echo 'DANGLING'\n\n"
            f"# Claim:\n"
            f"gsutil mb -l US gs://{b}\n\n"
            f"# Lock down:\n"
            f"gsutil iam ch -d allUsers gs://{b}\n"
            f"gsutil iam ch -d allAuthenticatedUsers gs://{b}"
        )}
    if provider == "Azure Blob":
        return {"az_cli": (
            f"# Azure Blob Takeover — AUTHORIZED PENTESTING ONLY\n"
            f"# Create storage account (name must be globally unique):\n"
            f"az storage account create --name {b[:24].replace('.','').replace('-','')} "
            f"--resource-group YOUR_RG --location eastus --sku Standard_LRS\n\n"
            f"# Disable public access:\n"
            f"az storage account update --name {b[:24].replace('.','').replace('-','')} "
            f"--resource-group YOUR_RG --allow-blob-public-access false"
        )}
    return {"generic": (
        f"# {provider} bucket '{bucket_name}' appears dangling.\n"
        f"# Consult your provider's CLI docs to claim it.\n"
        f"# Reference URL: {ref}"
    )}
# ================= EXPOSURE TIMELINE (Wayback Machine) =================
async def get_exposure_timeline(bucket_ref, session):
    """Query the Wayback Machine CDX API to estimate how long a bucket URL has been indexed.

    Returns a summary dict with first/last sighting dates, snapshot count,
    exposure duration in days, and up to 50 timeline entries.
    """
    from datetime import datetime as dt
    timeline = []
    try:
        cdx_url = (
            f"https://web.archive.org/cdx/search/cdx"
            f"?url={bucket_ref}&output=json&fl=timestamp,statuscode,original"
            f"&collapse=timestamp:6&limit=200"
        )
        t = ClientTimeout(total=30, sock_read=25)
        r = await session.get(cdx_url, timeout=t, allow_redirects=True)
        if r.status == 200:
            rows = json.loads(await r.text())
            # First row is the CDX column header; the rest are snapshots.
            for row in rows[1:] if len(rows) > 1 else []:
                try:
                    ts = row[0]  # e.g. "20210315120000"
                    timeline.append({
                        "date": f"{ts[:4]}-{ts[4:6]}-{ts[6:8]}",
                        "timestamp": ts,
                        "statusCode": row[1] if len(row) > 1 else "",
                        "url": row[2] if len(row) > 2 else bucket_ref,
                    })
                except Exception:
                    continue
    except Exception:
        pass
    first_seen = timeline[0]["date"] if timeline else None
    last_seen = timeline[-1]["date"] if timeline else None
    # Exposure duration = days between first and last archived sighting.
    exposure_days = None
    if first_seen and last_seen:
        try:
            start = dt.strptime(first_seen, "%Y-%m-%d")
            end = dt.strptime(last_seen, "%Y-%m-%d")
            exposure_days = (end - start).days
        except Exception:
            pass
    return {
        "bucketRef": bucket_ref,
        "firstSeen": first_seen,
        "lastSeen": last_seen,
        "totalSnapshots": len(timeline),
        "exposureDays": exposure_days,
        "timeline": timeline[:50],  # cap at 50 entries for the frontend
    }
# ================= SCANNING =================
async def fetch_page(session, url, headers):
    """Fetch a page; returns (body_text, Server header, status) or ("", "", 0) on failure."""
    try:
        resp = await session.get(url, headers=headers, allow_redirects=True)
        try:
            body = await resp.text(encoding="utf-8", errors="ignore")
        except Exception:
            # Decoding failed — fall back to the capped/lossy reader.
            body = await read_body(resp)
        return body, resp.headers.get("Server", ""), resp.status
    except Exception:
        return "", "", 0
async def scan_subdomain_once(sub, session, args, websocket, seen_refs: Set[str]):
    """Scan a single subdomain for cloud-storage bucket references.

    Fetches the page over https then http, regex-extracts bucket URLs,
    streams "result" events over the websocket, flags possible S3/CloudFront
    takeovers, and (unless disabled) audits each newly-seen bucket's
    permissions, emitting "result_update" events.

    `seen_refs` is shared across all subdomain scans so each bucket
    reference is only reported/audited once per run.
    """
    # Honour user-supplied include/exclude regex filters first.
    if not include_exclude(sub, args.inc_regex, args.exc_regex):
        return
    found = set()
    takeover = False
    headers = {"User-Agent": args.user_agent, "Accept-Encoding": "gzip, deflate"}
    for scheme in ("https://", "http://"):
        url = f"{scheme}{sub}"
        content, server, _ = await fetch_page(session, url, headers)
        # Takeover fingerprinting (skippable via --no-takeover).
        if content and not args.no_takeover:
            try:
                if is_s3_takeover(server, content):
                    await websocket.send_json({"type": "takeover", "data": {"subdomain": sub, "server": server}})
                    takeover = True
            except Exception:
                pass
        # Extract every bucket-looking URL from the page body.
        if content:
            for m in re.finditer(BUCKET_REGEX, content):
                found.add(norm_match(m.group(0)))
        if found:
            for ref in sorted(found):
                if ref in seen_refs:
                    continue  # already reported for another subdomain
                seen_refs.add(ref)
                prov = identify_provider(ref)
                # Respect the provider allow-list (short codes -> display names).
                if args.providers and prov.lower() not in {PROVIDER_LIMITS_MAP[x.lower()].lower() for x in args.providers}:
                    continue
                result = {"subdomain": sub, "reference": ref, "provider": prov, "status": "Found", "server": server, "url": url}
                await websocket.send_json({"type": "result", "data": result})
        # If https already produced hits, skip the plain-http pass.
        if scheme == "https://" and (takeover or found):
            break
    # Deep permission audit of discovered buckets (skippable via --no-bucket-checks).
    if found and not args.no_bucket_checks:
        # Cap concurrent bucket audits at 50 regardless of the thread setting.
        sem = asyncio.Semaphore(min(50, args.threads))
        async def chk(rf):
            prov = identify_provider(rf)
            if args.providers and prov.lower() not in {PROVIDER_LIMITS_MAP[x.lower()].lower() for x in args.providers}:
                return
            async with sem:
                audit = await classify_bucket(prov, rf, session)
                # Extract bucket name for takeover scripts
                # NOTE(review): for path-style refs this takes the LAST path
                # segment, which may be an object key rather than the bucket
                # name — confirm against real path-style references.
                bucket_name = rf.split("/")[-1] if "/" in rf else rf.split(".")[0]
                rec = {
                    "subdomain": sub,
                    "reference": rf,
                    "provider": prov,
                    "status": audit["status"],
                    "permissions": audit["permissions"],
                    "dangling": audit["dangling"],
                }
                # Include security checks if available (encryption, CORS, presigned URL)
                if audit.get("securityChecks"):
                    rec["securityChecks"] = audit["securityChecks"]
                # If dangling, generate takeover scripts
                if audit["dangling"]:
                    rec["takeoverScripts"] = generate_takeover_script(prov, bucket_name, rf)
                await websocket.send_json({"type": "result_update", "data": rec})
        await asyncio.gather(*(chk(rf) for rf in sorted(found)))
async def scan_subdomain(sub, session, args, websocket, seen_refs, subdomain_timeout):
    """Run scan_subdomain_once under a per-subdomain timeout, reporting failures as status events."""
    message = None
    try:
        probe = scan_subdomain_once(sub, session, args, websocket, seen_refs)
        await asyncio.wait_for(probe, timeout=subdomain_timeout)
    except asyncio.TimeoutError:
        message = f"Timeout scanning {sub}"
    except Exception as exc:
        message = f"Error scanning {sub}: {exc}"
    if message is not None:
        await websocket.send_json({"type": "status", "message": message})
# ================= ORCHESTRATION =================
async def entrypoint_scan(domain, args, websocket):
    """Top-level scan driver used by the web backend.

    Builds an aiohttp session from the timeout/concurrency/TLS options on
    `args`, enumerates subdomains, then scans them concurrently under a
    semaphore, streaming status/progress/result events over the websocket.
    """
    await websocket.send_json({"type": "status", "message": f"Starting scan for {domain}"})
    seen_refs: Set[str] = set()  # shared de-dup set for bucket references across subdomains
    timeout = build_timeout(args.timeout, args.connect_timeout, args.read_timeout)
    # NOTE(review): AsyncResolver requires the aiodns package — confirm it is
    # in the deployment dependencies, otherwise this raises on Windows.
    resolver = aiohttp.AsyncResolver() if sys.platform == 'win32' else None
    conn = TCPConnector(ssl=not args.insecure, limit=args.threads, limit_per_host=args.per_host, resolver=resolver)
    # Compile the user-supplied include/exclude filters once for the whole run.
    args.inc_regex = re.compile(args.include) if args.include else None
    args.exc_regex = re.compile(args.exclude) if args.exclude else None
    async with ClientSession(connector=conn, timeout=timeout, headers={"User-Agent": args.user_agent}) as sess:
        subs = await get_subdomains(domain, sess, websocket)
        if not subs:
            await websocket.send_json({"type": "status", "message": f"No subdomains found for {domain}"})
            return
        await websocket.send_json({"type": "status", "message": f"Found {len(subs)} subdomains for {domain}. Starting scan..."})
        await websocket.send_json({"type": "progress_total", "total": len(subs)})
        sem = asyncio.Semaphore(args.threads)  # cap concurrent subdomain scans
        tasks = []
        for s in subs:
            # Fix: capture `s` via default argument to avoid closure bug
            async def bscan(sd=s):
                async with sem:
                    await scan_subdomain(sd, sess, args, websocket, seen_refs, args.subdomain_timeout)
                # Progress is best-effort: a closed websocket must not kill the task.
                try:
                    await websocket.send_json({"type": "progress_update"})
                except Exception:
                    pass
            tasks.append(asyncio.create_task(bscan()))
        await asyncio.gather(*tasks)