File size: 2,854 Bytes
07410f6
fdb5a25
 
 
 
07410f6
 
 
 
 
 
 
 
 
 
 
fdb5a25
 
07410f6
fdb5a25
07410f6
 
 
 
 
 
 
 
 
 
fdb5a25
07410f6
 
 
 
fdb5a25
07410f6
 
 
 
fdb5a25
07410f6
 
 
 
 
 
fdb5a25
07410f6
 
 
 
 
fdb5a25
 
07410f6
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
fdb5a25
07410f6
fdb5a25
 
07410f6
 
 
 
fdb5a25
 
 
 
 
07410f6
fdb5a25
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
"""
server.py
=========
FastAPI server — serves index.html at / and exposes the scan engine at /api/scan
(JSON) and /api/scan/stream (Server-Sent Events); /api/plugins lists the plugins.
Runs on port 7860 for Hugging Face Spaces.
"""

import asyncio
import json
from pathlib import Path

import uvicorn
from fastapi import FastAPI, Query, HTTPException
from fastapi.middleware.cors import CORSMiddleware
from fastapi.responses import HTMLResponse, StreamingResponse

from engine import run_scan, PLUGINS
import httpx

# ASGI application object; the route decorators below register handlers on it.
app = FastAPI(title="VulnScanner", version="2.0.0")

# CORS policy: accept requests from any origin with any request header,
# limited to the GET and POST methods.
# NOTE(review): only GET routes are defined in this file — confirm POST is needed.
app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],
    allow_methods=["GET", "POST"],
    allow_headers=["*"],
)

# Location of the UI page: index.html in the same directory as this module.
INDEX = Path(__file__).parent / "index.html"


@app.get("/", response_class=HTMLResponse)
async def root():
    """Serve the single-page UI.

    Returns:
        An HTML response containing the text of ``index.html`` (the file
        referenced by ``INDEX``), or a 404 HTML response with a short
        "not found" message when that file does not exist.
    """
    try:
        # Decode explicitly as UTF-8 so the served page does not depend on the
        # host's locale encoding. Reading directly (instead of checking
        # exists() first) also avoids a race if the file disappears between
        # the check and the read.
        html = INDEX.read_text(encoding="utf-8")
    except FileNotFoundError:
        return HTMLResponse("<h1>index.html not found</h1>", status_code=404)
    return HTMLResponse(html)


@app.get("/api/scan")
async def scan(target: str = Query(..., description="Target URL to scan")):
    """Run all plugins against target, return findings as JSON.

    Args:
        target: URL to scan; must begin with ``http://`` or ``https://``.

    Returns:
        A dict with the echoed ``target``, the number of findings (``count``),
        and every finding converted through its ``to_dict()`` method
        (``findings``).

    Raises:
        HTTPException: 400 when ``target`` lacks an http(s) scheme prefix;
            500 (with the error text as ``detail``) when the scan raises.
    """
    if not target.startswith(("http://", "https://")):
        raise HTTPException(status_code=400, detail="target must start with http:// or https://")
    try:
        findings = await run_scan(target)
    except Exception as exc:
        # Chain the cause so the original traceback stays attached to the
        # HTTP error instead of appearing as an unrelated secondary failure.
        raise HTTPException(status_code=500, detail=str(exc)) from exc
    return {"target": target, "count": len(findings), "findings": [f.to_dict() for f in findings]}


@app.get("/api/scan/stream")
async def scan_stream(target: str = Query(...)):
    """
    Stream scan results as Server-Sent Events.

    Plugins from PLUGINS run one after another over a shared HTTP client.
    Once a plugin finishes, each of its findings is sent as its own
    ``data:`` event; a plugin that raises produces a single event carrying
    the error text and the plugin name instead. A final ``{"done": true}``
    event marks the end of the stream.

    Raises HTTPException (400) when target lacks an http(s) scheme prefix.
    """
    accepted_prefixes = ("http://", "https://")
    if not target.startswith(accepted_prefixes):
        raise HTTPException(status_code=400, detail="target must start with http:// or https://")

    def sse(payload):
        # Frame one JSON payload as a single SSE "data" event.
        return f"data: {json.dumps(payload)}\n\n"

    async def event_gen():
        client_options = {
            "follow_redirects": True,
            "verify": False,
            "timeout": 15,
            "headers": {"User-Agent": "VulnScanner/1.0 (security-research; authorized-scan)"},
        }
        async with httpx.AsyncClient(**client_options) as client:
            for plugin_cls in PLUGINS:
                try:
                    results = await plugin_cls(client).run(target)
                    for finding in results:
                        yield sse(finding.to_dict())
                except Exception as exc:
                    # One failing plugin must not abort the remaining ones.
                    yield sse({"error": str(exc), "plugin": plugin_cls.name})
        yield 'data: {"done": true}\n\n'

    return StreamingResponse(event_gen(), media_type="text/event-stream")


@app.get("/api/plugins")
async def list_plugins():
    """Return the name and description of every plugin class in PLUGINS."""
    catalog = []
    for plugin_cls in PLUGINS:
        entry = {"name": plugin_cls.name, "description": plugin_cls.description}
        catalog.append(entry)
    return catalog


# Script entry point: start uvicorn listening on all interfaces at port 7860
# (the port the module docstring cites for Hugging Face Spaces), with
# auto-reload disabled. The "server:app" import string presumes this module
# is importable as `server`.
if __name__ == "__main__":
    uvicorn.run("server:app", host="0.0.0.0", port=7860, reload=False)