File size: 3,539 Bytes
eafb640 776f1f8 eafb640 bd755fa eafb640 177b441 bd755fa eafb640 776f1f8 bd755fa 776f1f8 bd755fa 177b441 eafb640 bd755fa 177b441 776f1f8 bd755fa 776f1f8 eafb640 776f1f8 bd755fa eafb640 177b441 eafb640 776f1f8 177b441 eafb640 776f1f8 eafb640 177b441 eafb640 | 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 | """
Scanner Engine
==============
Orchestrates all plugins with a hard overall timeout.
All files live in the same flat directory.
"""
import asyncio
import httpx
from typing import Type
from base import Finding, VulnerabilityPlugin
from security_headers import SecurityHeaderPlugin
from privilege_escalation import BOLAPrivilegeEscalationPlugin
from cors import CORSPlugin
from open_redirect import OpenRedirectPlugin
from sensitive_exposure import SensitiveExposurePlugin
from sqli import SQLInjectionPlugin
PLUGINS: list[Type[VulnerabilityPlugin]] = [
SecurityHeaderPlugin, # fast ~1-2s
CORSPlugin, # fast ~1-2s
OpenRedirectPlugin, # medium ~5-10s
SensitiveExposurePlugin, # medium ~10-20s (concurrent, filtered)
SQLInjectionPlugin, # medium ~5-15s
BOLAPrivilegeEscalationPlugin, # slow ~10-30s (only if API found)
]
# Hard cap: if a plugin takes longer than this, it's killed and skipped.
PLUGIN_TIMEOUT_S = 45
# Hard cap for the entire scan.
SCAN_TIMEOUT_S = 120
async def run_scan(
target: str,
plugins: list[Type[VulnerabilityPlugin]] | None = None,
) -> list[Finding]:
selected = plugins or PLUGINS
findings: list[Finding] = []
async with httpx.AsyncClient(
follow_redirects=True,
verify=False,
timeout=httpx.Timeout(connect=5, read=10, write=5, pool=5),
limits=httpx.Limits(max_connections=20, max_keepalive_connections=10),
headers={"User-Agent": "VulnScanner/1.0 (security-research; authorized-scan)"},
) as client:
try:
tasks = [_run_plugin(cls, client, target) for cls in selected]
results = await asyncio.wait_for(
asyncio.gather(*tasks, return_exceptions=True),
timeout=SCAN_TIMEOUT_S,
)
except asyncio.TimeoutError:
print(f"[engine] global scan timeout ({SCAN_TIMEOUT_S}s) reached")
results = []
for result in results:
if isinstance(result, list):
findings.extend(result)
elif isinstance(result, Exception):
print(f"[engine] plugin error: {result}")
findings.sort(key=lambda f: _severity_order(f.severity))
return findings
async def _run_plugin(
plugin_cls: Type[VulnerabilityPlugin],
client: httpx.AsyncClient,
target: str,
) -> list[Finding]:
plugin = plugin_cls(client)
print(f"[{plugin.name}] starting")
try:
findings = await asyncio.wait_for(plugin.run(target), timeout=PLUGIN_TIMEOUT_S)
print(f"[{plugin.name}] done — {len(findings)} finding(s)")
return findings
except asyncio.TimeoutError:
print(f"[{plugin.name}] timed out after {PLUGIN_TIMEOUT_S}s — skipping")
return []
except Exception as exc:
print(f"[{plugin.name}] error: {exc}")
return []
def _severity_order(severity) -> int:
return {"critical": 0, "high": 1, "medium": 2, "low": 3, "info": 4}.get(
severity.value if hasattr(severity, "value") else str(severity), 5
)
if __name__ == "__main__":
import sys, json
target = sys.argv[1] if len(sys.argv) > 1 else "http://localhost:3000"
print(f"Scanning: {target}\n")
results = asyncio.run(run_scan(target))
print(f"\n{'='*60}\nFINDINGS: {len(results)}\n{'='*60}")
for f in results:
print(f"\n[{f.severity.value.upper()}] {f.title}")
print(f" {f.owasp} | {f.cwe}")
print("\nFull JSON:")
print(json.dumps([f.to_dict() for f in results], indent=2)) |