vulnscan / server.py
wuhp's picture
Update server.py
fdb5a25 verified
"""
server.py
=========
FastAPI server — serves index.html at / and exposes the scan engine at /api/scan.
Runs on port 7860 for Hugging Face Spaces.
"""
import asyncio
import json
from pathlib import Path
from urllib.parse import urlsplit

import httpx
import uvicorn
from fastapi import FastAPI, Query, HTTPException
from fastapi.middleware.cors import CORSMiddleware
from fastapi.responses import HTMLResponse, StreamingResponse

from engine import run_scan, PLUGINS
# Application instance that the route decorators in this module register against.
app = FastAPI(version="2.0.0", title="VulnScanner")

# CORS policy: any origin, any request header, GET and POST only.
app.add_middleware(
    CORSMiddleware,
    allow_headers=["*"],
    allow_methods=["GET", "POST"],
    allow_origins=["*"],
)

# The UI page is looked up in the same directory as this module.
INDEX = Path(__file__).parent / "index.html"
@app.get("/", response_class=HTMLResponse)
async def root():
    """Serve the UI page.

    Returns:
        An HTML response holding the contents of ``INDEX``, or a 404 HTML
        response with a short notice when that file does not exist.
    """
    try:
        # Read directly instead of exists()-then-read so the file cannot
        # vanish between the check and the read. Decode as UTF-8 explicitly
        # so the result does not depend on the host's locale encoding.
        page = INDEX.read_text(encoding="utf-8")
    except FileNotFoundError:
        return HTMLResponse("<h1>index.html not found</h1>", status_code=404)
    return HTMLResponse(page)
@app.get("/api/scan")
async def scan(target: str = Query(..., description="Target URL to scan")):
    """Run all plugins against target, return findings as JSON.

    Args:
        target: Absolute ``http://`` or ``https://`` URL to scan.

    Returns:
        A dict with the echoed ``target``, the number of findings under
        ``count``, and each finding's ``to_dict()`` output under ``findings``.

    Raises:
        HTTPException: 400 when ``target`` lacks an http(s) prefix or a host;
            500 (with the error text as detail) when the scan itself raises.
    """
    if not target.startswith(("http://", "https://")):
        raise HTTPException(
            status_code=400,
            detail="target must start with http:// or https://",
        )

    # A bare "http://" (or a malformed authority) passes the prefix check but
    # names nothing to scan. Reject it as a client error here rather than
    # letting it surface from the engine as a 500.
    try:
        host = urlsplit(target).hostname
    except ValueError:
        host = None
    if not host:
        raise HTTPException(status_code=400, detail="target must include a host")

    try:
        findings = await run_scan(target)
    except Exception as exc:
        # API boundary: report any engine failure as a 500, keeping the
        # original exception chained for server-side tracebacks.
        raise HTTPException(status_code=500, detail=str(exc)) from exc

    return {
        "target": target,
        "count": len(findings),
        "findings": [f.to_dict() for f in findings],
    }
@app.get("/api/scan/stream")
async def scan_stream(target: str = Query(...)):
    """Stream scan results as Server-Sent Events.

    Plugins from ``PLUGINS`` run one after another against ``target``. When a
    plugin returns, each of its findings is sent as its own ``data:`` event
    (the JSON of ``to_dict()``), so the UI can show results before the
    remaining plugins have finished. A plugin that raises yields an event with
    ``error`` and ``plugin`` keys and the scan moves on to the next plugin.
    The stream ends with a ``{"done": true}`` event.

    Args:
        target: Absolute ``http://`` or ``https://`` URL to scan.

    Returns:
        A ``text/event-stream`` streaming response.

    Raises:
        HTTPException: 400 when ``target`` lacks an http(s) prefix or a host.
    """
    if not target.startswith(("http://", "https://")):
        raise HTTPException(
            status_code=400,
            detail="target must start with http:// or https://",
        )

    # A bare "http://" (or a malformed authority) passes the prefix check but
    # names nothing to scan. Reject it up front instead of streaming one
    # error event per plugin.
    try:
        host = urlsplit(target).hostname
    except ValueError:
        host = None
    if not host:
        raise HTTPException(status_code=400, detail="target must include a host")

    async def event_gen():
        # One HTTP client is shared by every plugin for the whole scan.
        # NOTE(review): verify=False disables TLS certificate checks —
        # presumably deliberate so hosts with broken certificates can still
        # be scanned; confirm.
        async with httpx.AsyncClient(
            follow_redirects=True,
            verify=False,
            timeout=15,
            headers={"User-Agent": "VulnScanner/1.0 (security-research; authorized-scan)"},
        ) as client:
            for plugin_cls in PLUGINS:
                try:
                    plugin = plugin_cls(client)
                    findings = await plugin.run(target)
                    for f in findings:
                        yield f"data: {json.dumps(f.to_dict())}\n\n"
                except Exception as exc:
                    # Best effort: report the failing plugin and keep going.
                    yield f"data: {json.dumps({'error': str(exc), 'plugin': plugin_cls.name})}\n\n"
        yield 'data: {"done": true}\n\n'

    return StreamingResponse(
        event_gen(),
        media_type="text/event-stream",
        # Ask caches and buffering reverse proxies to pass events through
        # immediately; otherwise the stream may arrive in one late chunk.
        headers={"Cache-Control": "no-cache", "X-Accel-Buffering": "no"},
    )
@app.get("/api/plugins")
async def list_plugins():
    """Return the name and description of every entry in ``PLUGINS``."""
    catalog = []
    for plugin_cls in PLUGINS:
        entry = {"name": plugin_cls.name, "description": plugin_cls.description}
        catalog.append(entry)
    return catalog
if __name__ == "__main__":
    # Direct execution: serve the app on all interfaces, port 7860,
    # with auto-reload switched off.
    uvicorn.run(
        "server:app",
        port=7860,
        host="0.0.0.0",
        reload=False,
    )