| """ |
| server.py |
| ========= |
| FastAPI server — serves index.html at / and exposes the scan engine at /api/scan. |
| Runs on port 7860 for Hugging Face Spaces. |
| """ |
|
|
| import asyncio |
| import json |
| from pathlib import Path |
|
|
| import uvicorn |
| from fastapi import FastAPI, Query, HTTPException |
| from fastapi.middleware.cors import CORSMiddleware |
| from fastapi.responses import HTMLResponse, StreamingResponse |
|
|
| from engine import run_scan, PLUGINS |
| import httpx |
|
|
| app = FastAPI(title="VulnScanner", version="2.0.0") |
|
|
| app.add_middleware( |
| CORSMiddleware, |
| allow_origins=["*"], |
| allow_methods=["GET", "POST"], |
| allow_headers=["*"], |
| ) |
|
|
| INDEX = Path(__file__).parent / "index.html" |
|
|
|
|
| @app.get("/", response_class=HTMLResponse) |
| async def root(): |
| if INDEX.exists(): |
| return HTMLResponse(INDEX.read_text()) |
| return HTMLResponse("<h1>index.html not found</h1>", status_code=404) |
|
|
|
|
| @app.get("/api/scan") |
| async def scan(target: str = Query(..., description="Target URL to scan")): |
| """Run all plugins against target, return findings as JSON.""" |
| if not target.startswith(("http://", "https://")): |
| raise HTTPException(status_code=400, detail="target must start with http:// or https://") |
| try: |
| findings = await run_scan(target) |
| except Exception as exc: |
| raise HTTPException(status_code=500, detail=str(exc)) |
| return {"target": target, "count": len(findings), "findings": [f.to_dict() for f in findings]} |
|
|
|
|
| @app.get("/api/scan/stream") |
| async def scan_stream(target: str = Query(...)): |
| """ |
| Server-Sent Events — emits each finding as it's discovered so the UI |
| can show live results without waiting for all plugins to finish. |
| """ |
| if not target.startswith(("http://", "https://")): |
| raise HTTPException(status_code=400, detail="target must start with http:// or https://") |
|
|
| async def event_gen(): |
| async with httpx.AsyncClient( |
| follow_redirects=True, |
| verify=False, |
| timeout=15, |
| headers={"User-Agent": "VulnScanner/1.0 (security-research; authorized-scan)"}, |
| ) as client: |
| for plugin_cls in PLUGINS: |
| try: |
| plugin = plugin_cls(client) |
| findings = await plugin.run(target) |
| for f in findings: |
| yield f"data: {json.dumps(f.to_dict())}\n\n" |
| except Exception as exc: |
| yield f"data: {json.dumps({'error': str(exc), 'plugin': plugin_cls.name})}\n\n" |
| yield 'data: {"done": true}\n\n' |
|
|
| return StreamingResponse(event_gen(), media_type="text/event-stream") |
|
|
|
|
| @app.get("/api/plugins") |
| async def list_plugins(): |
| return [{"name": cls.name, "description": cls.description} for cls in PLUGINS] |
|
|
|
|
| if __name__ == "__main__": |
| uvicorn.run("server:app", host="0.0.0.0", port=7860, reload=False) |