""" Scanner Engine ============== Orchestrates all plugins with a hard overall timeout. All files live in the same flat directory. """ import asyncio import httpx from typing import Type from base import Finding, VulnerabilityPlugin from security_headers import SecurityHeaderPlugin from privilege_escalation import BOLAPrivilegeEscalationPlugin from cors import CORSPlugin from open_redirect import OpenRedirectPlugin from sensitive_exposure import SensitiveExposurePlugin from sqli import SQLInjectionPlugin PLUGINS: list[Type[VulnerabilityPlugin]] = [ SecurityHeaderPlugin, # fast ~1-2s CORSPlugin, # fast ~1-2s OpenRedirectPlugin, # medium ~5-10s SensitiveExposurePlugin, # medium ~10-20s (concurrent, filtered) SQLInjectionPlugin, # medium ~5-15s BOLAPrivilegeEscalationPlugin, # slow ~10-30s (only if API found) ] # Hard cap: if a plugin takes longer than this, it's killed and skipped. PLUGIN_TIMEOUT_S = 45 # Hard cap for the entire scan. SCAN_TIMEOUT_S = 120 async def run_scan( target: str, plugins: list[Type[VulnerabilityPlugin]] | None = None, ) -> list[Finding]: selected = plugins or PLUGINS findings: list[Finding] = [] async with httpx.AsyncClient( follow_redirects=True, verify=False, timeout=httpx.Timeout(connect=5, read=10, write=5, pool=5), limits=httpx.Limits(max_connections=20, max_keepalive_connections=10), headers={"User-Agent": "VulnScanner/1.0 (security-research; authorized-scan)"}, ) as client: try: tasks = [_run_plugin(cls, client, target) for cls in selected] results = await asyncio.wait_for( asyncio.gather(*tasks, return_exceptions=True), timeout=SCAN_TIMEOUT_S, ) except asyncio.TimeoutError: print(f"[engine] global scan timeout ({SCAN_TIMEOUT_S}s) reached") results = [] for result in results: if isinstance(result, list): findings.extend(result) elif isinstance(result, Exception): print(f"[engine] plugin error: {result}") findings.sort(key=lambda f: _severity_order(f.severity)) return findings async def _run_plugin( plugin_cls: Type[VulnerabilityPlugin], client: httpx.AsyncClient, target: str, ) -> list[Finding]: plugin = plugin_cls(client) print(f"[{plugin.name}] starting") try: findings = await asyncio.wait_for(plugin.run(target), timeout=PLUGIN_TIMEOUT_S) print(f"[{plugin.name}] done — {len(findings)} finding(s)") return findings except asyncio.TimeoutError: print(f"[{plugin.name}] timed out after {PLUGIN_TIMEOUT_S}s — skipping") return [] except Exception as exc: print(f"[{plugin.name}] error: {exc}") return [] def _severity_order(severity) -> int: return {"critical": 0, "high": 1, "medium": 2, "low": 3, "info": 4}.get( severity.value if hasattr(severity, "value") else str(severity), 5 ) if __name__ == "__main__": import sys, json target = sys.argv[1] if len(sys.argv) > 1 else "http://localhost:3000" print(f"Scanning: {target}\n") results = asyncio.run(run_scan(target)) print(f"\n{'='*60}\nFINDINGS: {len(results)}\n{'='*60}") for f in results: print(f"\n[{f.severity.value.upper()}] {f.title}") print(f" {f.owasp} | {f.cwe}") print("\nFull JSON:") print(json.dumps([f.to_dict() for f in results], indent=2))