| """ |
| Scanner Engine |
| ============== |
| Orchestrates all plugins with a hard overall timeout. |
| All files live in the same flat directory. |
| """ |
|
|
| import asyncio |
| import httpx |
| from typing import Type |
|
|
| from base import Finding, VulnerabilityPlugin |
| from security_headers import SecurityHeaderPlugin |
| from privilege_escalation import BOLAPrivilegeEscalationPlugin |
| from cors import CORSPlugin |
| from open_redirect import OpenRedirectPlugin |
| from sensitive_exposure import SensitiveExposurePlugin |
| from sqli import SQLInjectionPlugin |
|
|
| PLUGINS: list[Type[VulnerabilityPlugin]] = [ |
| SecurityHeaderPlugin, |
| CORSPlugin, |
| OpenRedirectPlugin, |
| SensitiveExposurePlugin, |
| SQLInjectionPlugin, |
| BOLAPrivilegeEscalationPlugin, |
| ] |
|
|
| |
| PLUGIN_TIMEOUT_S = 45 |
| |
| SCAN_TIMEOUT_S = 120 |
|
|
|
|
| async def run_scan( |
| target: str, |
| plugins: list[Type[VulnerabilityPlugin]] | None = None, |
| ) -> list[Finding]: |
| selected = plugins or PLUGINS |
| findings: list[Finding] = [] |
|
|
| async with httpx.AsyncClient( |
| follow_redirects=True, |
| verify=False, |
| timeout=httpx.Timeout(connect=5, read=10, write=5, pool=5), |
| limits=httpx.Limits(max_connections=20, max_keepalive_connections=10), |
| headers={"User-Agent": "VulnScanner/1.0 (security-research; authorized-scan)"}, |
| ) as client: |
| try: |
| tasks = [_run_plugin(cls, client, target) for cls in selected] |
| results = await asyncio.wait_for( |
| asyncio.gather(*tasks, return_exceptions=True), |
| timeout=SCAN_TIMEOUT_S, |
| ) |
| except asyncio.TimeoutError: |
| print(f"[engine] global scan timeout ({SCAN_TIMEOUT_S}s) reached") |
| results = [] |
|
|
| for result in results: |
| if isinstance(result, list): |
| findings.extend(result) |
| elif isinstance(result, Exception): |
| print(f"[engine] plugin error: {result}") |
|
|
| findings.sort(key=lambda f: _severity_order(f.severity)) |
| return findings |
|
|
|
|
| async def _run_plugin( |
| plugin_cls: Type[VulnerabilityPlugin], |
| client: httpx.AsyncClient, |
| target: str, |
| ) -> list[Finding]: |
| plugin = plugin_cls(client) |
| print(f"[{plugin.name}] starting") |
| try: |
| findings = await asyncio.wait_for(plugin.run(target), timeout=PLUGIN_TIMEOUT_S) |
| print(f"[{plugin.name}] done — {len(findings)} finding(s)") |
| return findings |
| except asyncio.TimeoutError: |
| print(f"[{plugin.name}] timed out after {PLUGIN_TIMEOUT_S}s — skipping") |
| return [] |
| except Exception as exc: |
| print(f"[{plugin.name}] error: {exc}") |
| return [] |
|
|
|
|
| def _severity_order(severity) -> int: |
| return {"critical": 0, "high": 1, "medium": 2, "low": 3, "info": 4}.get( |
| severity.value if hasattr(severity, "value") else str(severity), 5 |
| ) |
|
|
|
|
| if __name__ == "__main__": |
| import sys, json |
| target = sys.argv[1] if len(sys.argv) > 1 else "http://localhost:3000" |
| print(f"Scanning: {target}\n") |
| results = asyncio.run(run_scan(target)) |
| print(f"\n{'='*60}\nFINDINGS: {len(results)}\n{'='*60}") |
| for f in results: |
| print(f"\n[{f.severity.value.upper()}] {f.title}") |
| print(f" {f.owasp} | {f.cwe}") |
| print("\nFull JSON:") |
| print(json.dumps([f.to_dict() for f in results], indent=2)) |