File size: 3,539 Bytes
eafb640
 
 
776f1f8
 
eafb640
 
bd755fa
 
eafb640
 
177b441
 
 
 
 
 
 
bd755fa
eafb640
776f1f8
 
 
 
 
 
bd755fa
 
776f1f8
 
 
 
 
bd755fa
177b441
 
 
 
eafb640
bd755fa
 
 
 
177b441
776f1f8
 
 
bd755fa
776f1f8
 
 
 
 
 
 
 
 
eafb640
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
776f1f8
 
 
 
 
 
 
 
 
 
 
bd755fa
 
eafb640
177b441
 
 
eafb640
 
 
776f1f8
177b441
eafb640
 
776f1f8
eafb640
 
 
177b441
eafb640
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
"""
Scanner Engine
==============
Orchestrates all plugins with a hard overall timeout.
All files live in the same flat directory.
"""

import asyncio
import httpx
from typing import Type

from base import Finding, VulnerabilityPlugin
from security_headers import SecurityHeaderPlugin
from privilege_escalation import BOLAPrivilegeEscalationPlugin
from cors import CORSPlugin
from open_redirect import OpenRedirectPlugin
from sensitive_exposure import SensitiveExposurePlugin
from sqli import SQLInjectionPlugin

# Default plugin set: run_scan() falls back to this list when the caller
# supplies no plugins. The timing notes are the author's rough estimates.
PLUGINS: list[Type[VulnerabilityPlugin]] = [
    SecurityHeaderPlugin,       # fast  ~1-2s
    CORSPlugin,                 # fast  ~1-2s
    OpenRedirectPlugin,         # medium ~5-10s
    SensitiveExposurePlugin,    # medium ~10-20s (concurrent, filtered)
    SQLInjectionPlugin,         # medium ~5-15s
    BOLAPrivilegeEscalationPlugin,  # slow ~10-30s (only if API found)
]

# Hard cap: if a plugin takes longer than this, it's killed and skipped.
# Value is in seconds; applied per plugin via asyncio.wait_for in _run_plugin().
PLUGIN_TIMEOUT_S = 45
# Hard cap for the entire scan.
# Value is in seconds; applied in run_scan() to the combined run of all plugins.
SCAN_TIMEOUT_S = 120


async def run_scan(
    target: str,
    plugins: list[Type[VulnerabilityPlugin]] | None = None,
) -> list[Finding]:
    """Run the selected plugins concurrently against *target*.

    All plugins share one ``httpx.AsyncClient``. Each plugin is started as its
    own task through ``_run_plugin``; the whole batch is bounded by
    ``SCAN_TIMEOUT_S``. When that cap is hit, plugins that are still running
    are cancelled, but findings from plugins that already finished are kept
    instead of being thrown away.

    Args:
        target: Base URL handed to every plugin.
        plugins: Plugin classes to run. ``None`` or an empty list falls back
            to the module-level ``PLUGINS`` default.

    Returns:
        All collected findings, ordered by ``_severity_order`` (the sort is
        stable, so findings of equal severity keep plugin order).
    """
    selected = plugins or PLUGINS
    findings: list[Finding] = []

    async with httpx.AsyncClient(
        follow_redirects=True,
        # NOTE(review): certificate verification is disabled on purpose here,
        # presumably so self-signed / misconfigured targets can be scanned.
        verify=False,
        timeout=httpx.Timeout(connect=5, read=10, write=5, pool=5),
        limits=httpx.Limits(max_connections=20, max_keepalive_connections=10),
        headers={"User-Agent": "VulnScanner/1.0 (security-research; authorized-scan)"},
    ) as client:
        tasks: list[asyncio.Task] = [
            asyncio.create_task(_run_plugin(cls, client, target))
            for cls in selected
        ]
        try:
            # asyncio.wait() rejects an empty collection, so guard for it.
            if tasks:
                _, pending = await asyncio.wait(tasks, timeout=SCAN_TIMEOUT_S)
                if pending:
                    print(f"[engine] global scan timeout ({SCAN_TIMEOUT_S}s) reached")
        finally:
            # Unlike wait_for(gather(...)), asyncio.wait() cancels nothing by
            # itself: stop stragglers (on timeout or if this coroutine is
            # cancelled) and let them unwind before the client is closed.
            unfinished = [task for task in tasks if not task.done()]
            for task in unfinished:
                task.cancel()
            if unfinished:
                await asyncio.gather(*unfinished, return_exceptions=True)

    # Walk tasks in submission order so output order matches plugin order.
    for task in tasks:
        if task.cancelled():
            continue
        error = task.exception()
        if error is not None:
            if isinstance(error, Exception):
                print(f"[engine] plugin error: {error}")
            continue
        result = task.result()
        if isinstance(result, list):
            findings.extend(result)

    findings.sort(key=lambda f: _severity_order(f.severity))
    return findings


async def _run_plugin(
    plugin_cls: Type[VulnerabilityPlugin],
    client: httpx.AsyncClient,
    target: str,
) -> list[Finding]:
    """Instantiate one plugin and run it under the per-plugin timeout.

    Construction happens inside the guarded section, so a plugin whose
    constructor raises is reported under its own name and skipped like any
    other failing plugin instead of escaping this wrapper.

    Args:
        plugin_cls: Plugin class; it is called with the shared client.
        client: HTTP client shared by all plugins of the scan.
        target: Base URL passed to ``plugin.run``.

    Returns:
        The plugin's findings, or an empty list when the plugin timed out
        or raised.
    """
    # Until the instance exists, the class name is the only label available.
    label = getattr(plugin_cls, "__name__", repr(plugin_cls))
    try:
        plugin = plugin_cls(client)
        label = plugin.name
        print(f"[{label}] starting")
        findings = await asyncio.wait_for(plugin.run(target), timeout=PLUGIN_TIMEOUT_S)
        print(f"[{label}] done — {len(findings)} finding(s)")
        return findings
    except asyncio.TimeoutError:
        print(f"[{label}] timed out after {PLUGIN_TIMEOUT_S}s — skipping")
        return []
    except Exception as exc:
        # Best-effort boundary: one broken plugin must not abort the scan.
        print(f"[{label}] error: {exc}")
        return []


# Sort rank per severity label; a lower rank sorts first.
_SEVERITY_RANK = {"critical": 0, "high": 1, "medium": 2, "low": 3, "info": 4}


def _severity_order(severity) -> int:
    """Return the sort rank of *severity* (0 = critical ... 4 = info).

    Accepts either an object exposing ``.value`` (e.g. an enum member) or a
    plain value. The label is compared case-insensitively and with
    surrounding whitespace ignored, so ``"HIGH"`` ranks like ``"high"``.

    Returns:
        The rank from ``_SEVERITY_RANK``, or 5 for any unrecognised label so
        that unknown severities sort last.
    """
    label = severity.value if hasattr(severity, "value") else severity
    return _SEVERITY_RANK.get(str(label).strip().lower(), 5)


def _main(argv: list[str]) -> None:
    """Command-line entry point: scan one target and print the findings.

    Args:
        argv: Argument vector; ``argv[1]`` is the target URL. When it is
            missing, ``http://localhost:3000`` is scanned.

    Prints a one-line header per finding followed by the full findings list
    as JSON.
    """
    import json

    target = argv[1] if len(argv) > 1 else "http://localhost:3000"
    print(f"Scanning: {target}\n")
    findings = asyncio.run(run_scan(target))
    print(f"\n{'='*60}\nFINDINGS: {len(findings)}\n{'='*60}")
    for finding in findings:
        # Tolerate both enum-style severities (.value) and plain values,
        # matching what _severity_order accepts.
        severity = getattr(finding.severity, "value", finding.severity)
        print(f"\n[{str(severity).upper()}] {finding.title}")
        print(f"  {finding.owasp} | {finding.cwe}")
    print("\nFull JSON:")
    print(json.dumps([finding.to_dict() for finding in findings], indent=2))


if __name__ == "__main__":
    import sys

    _main(sys.argv)