# vulnscan / engine.py
# wuhp's picture
# Update engine.py
# 776f1f8 verified
"""
Scanner Engine
==============
Orchestrates all plugins with a hard overall timeout.
All files live in the same flat directory.
"""
import asyncio
import httpx
from typing import Type
from base import Finding, VulnerabilityPlugin
from security_headers import SecurityHeaderPlugin
from privilege_escalation import BOLAPrivilegeEscalationPlugin
from cors import CORSPlugin
from open_redirect import OpenRedirectPlugin
from sensitive_exposure import SensitiveExposurePlugin
from sqli import SQLInjectionPlugin
# Per-plugin budget in seconds: a plugin that runs longer than this is
# stopped and contributes no findings.
PLUGIN_TIMEOUT_S = 45

# Budget in seconds for the scan as a whole.
SCAN_TIMEOUT_S = 120

# Default plugin set. The trailing notes are rough runtime estimates.
PLUGINS: list[Type[VulnerabilityPlugin]] = [
    SecurityHeaderPlugin,           # fast   ~1-2s
    CORSPlugin,                     # fast   ~1-2s
    OpenRedirectPlugin,             # medium ~5-10s
    SensitiveExposurePlugin,        # medium ~10-20s (concurrent, filtered)
    SQLInjectionPlugin,             # medium ~5-15s
    BOLAPrivilegeEscalationPlugin,  # slow   ~10-30s (only if API found)
]
async def run_scan(
    target: str,
    plugins: list[Type[VulnerabilityPlugin]] | None = None,
) -> list[Finding]:
    """Run the selected plugins concurrently against ``target``.

    Args:
        target: Passed unchanged to every plugin's ``run``.
        plugins: Plugin classes to run. ``None`` or an empty list selects
            the module-level ``PLUGINS`` default.

    Returns:
        The findings of every plugin that finished, sorted by
        ``_severity_order`` (lowest rank first). If the ``SCAN_TIMEOUT_S``
        budget runs out, plugins that are still running are cancelled, but
        the findings of plugins that had already completed are kept rather
        than thrown away.
    """
    selected = plugins or PLUGINS
    findings: list[Finding] = []

    async with httpx.AsyncClient(
        follow_redirects=True,
        # TLS certificate verification is disabled for every request made
        # through this shared client.
        verify=False,
        timeout=httpx.Timeout(connect=5, read=10, write=5, pool=5),
        limits=httpx.Limits(max_connections=20, max_keepalive_connections=10),
        headers={"User-Agent": "VulnScanner/1.0 (security-research; authorized-scan)"},
    ) as client:
        # Explicit tasks (instead of one gather() future) let us tell apart
        # finished and unfinished plugins when the global deadline hits.
        tasks = [
            asyncio.create_task(_run_plugin(cls, client, target))
            for cls in selected
        ]
        try:
            # asyncio.wait() raises ValueError on an empty collection.
            if tasks:
                _, pending = await asyncio.wait(tasks, timeout=SCAN_TIMEOUT_S)
                if pending:
                    print(f"[engine] global scan timeout ({SCAN_TIMEOUT_S}s) reached")
        finally:
            # Runs on timeout and also when run_scan itself is cancelled:
            # stop the stragglers and wait for them to unwind before the
            # client they share is closed.
            unfinished = [task for task in tasks if not task.done()]
            for task in unfinished:
                task.cancel()
            if unfinished:
                await asyncio.gather(*unfinished, return_exceptions=True)

    # Harvest in plugin order so the stable sort below keeps that order
    # among findings of equal severity.
    for task in tasks:
        if task.cancelled():
            continue
        error = task.exception()
        if error is not None:
            print(f"[engine] plugin error: {error}")
            continue
        result = task.result()
        if isinstance(result, list):
            findings.extend(result)

    findings.sort(key=lambda f: _severity_order(f.severity))
    return findings
async def _run_plugin(
    plugin_cls: Type[VulnerabilityPlugin],
    client: httpx.AsyncClient,
    target: str,
) -> list[Finding]:
    """Instantiate one plugin and run it under the per-plugin time limit.

    Args:
        plugin_cls: Plugin class; it is constructed with ``client``.
        client: HTTP client handed to the plugin constructor.
        target: Value passed to the plugin's ``run``.

    Returns:
        Whatever ``run`` returned, or an empty list when the plugin exceeded
        ``PLUGIN_TIMEOUT_S`` or raised an ``Exception``. Progress and failure
        messages are printed to stdout.
    """
    scanner = plugin_cls(client)
    print(f"[{scanner.name}] starting")
    try:
        outcome = await asyncio.wait_for(
            scanner.run(target),
            timeout=PLUGIN_TIMEOUT_S,
        )
        print(f"[{scanner.name}] done — {len(outcome)} finding(s)")
        return outcome
    except Exception as exc:
        # One handler for both failure modes; a timeout only changes the
        # message that is printed.
        if isinstance(exc, asyncio.TimeoutError):
            print(f"[{scanner.name}] timed out after {PLUGIN_TIMEOUT_S}s — skipping")
        else:
            print(f"[{scanner.name}] error: {exc}")
        return []
def _severity_order(severity) -> int:
return {"critical": 0, "high": 1, "medium": 2, "low": 3, "info": 4}.get(
severity.value if hasattr(severity, "value") else str(severity), 5
)
if __name__ == "__main__":
    import json
    import sys

    # The first command-line argument is the scan target; without one the
    # script scans http://localhost:3000.
    cli_args = sys.argv[1:]
    target = cli_args[0] if cli_args else "http://localhost:3000"
    print(f"Scanning: {target}\n")

    report = asyncio.run(run_scan(target))

    # Human-readable summary: one header line plus a detail line per finding.
    banner = "=" * 60
    print(f"\n{banner}\nFINDINGS: {len(report)}\n{banner}")
    for finding in report:
        print(f"\n[{finding.severity.value.upper()}] {finding.title}")
        print(f" {finding.owasp} | {finding.cwe}")

    # Machine-readable dump of the same findings.
    print("\nFull JSON:")
    print(json.dumps([finding.to_dict() for finding in report], indent=2))