""" server.py ========= FastAPI server — serves index.html at / and exposes the scan engine at /api/scan. Runs on port 7860 for Hugging Face Spaces. """ import asyncio import json from pathlib import Path import uvicorn from fastapi import FastAPI, Query, HTTPException from fastapi.middleware.cors import CORSMiddleware from fastapi.responses import HTMLResponse, StreamingResponse from engine import run_scan, PLUGINS import httpx app = FastAPI(title="VulnScanner", version="2.0.0") app.add_middleware( CORSMiddleware, allow_origins=["*"], allow_methods=["GET", "POST"], allow_headers=["*"], ) INDEX = Path(__file__).parent / "index.html" @app.get("/", response_class=HTMLResponse) async def root(): if INDEX.exists(): return HTMLResponse(INDEX.read_text()) return HTMLResponse("

index.html not found

", status_code=404) @app.get("/api/scan") async def scan(target: str = Query(..., description="Target URL to scan")): """Run all plugins against target, return findings as JSON.""" if not target.startswith(("http://", "https://")): raise HTTPException(status_code=400, detail="target must start with http:// or https://") try: findings = await run_scan(target) except Exception as exc: raise HTTPException(status_code=500, detail=str(exc)) return {"target": target, "count": len(findings), "findings": [f.to_dict() for f in findings]} @app.get("/api/scan/stream") async def scan_stream(target: str = Query(...)): """ Server-Sent Events — emits each finding as it's discovered so the UI can show live results without waiting for all plugins to finish. """ if not target.startswith(("http://", "https://")): raise HTTPException(status_code=400, detail="target must start with http:// or https://") async def event_gen(): async with httpx.AsyncClient( follow_redirects=True, verify=False, timeout=15, headers={"User-Agent": "VulnScanner/1.0 (security-research; authorized-scan)"}, ) as client: for plugin_cls in PLUGINS: try: plugin = plugin_cls(client) findings = await plugin.run(target) for f in findings: yield f"data: {json.dumps(f.to_dict())}\n\n" except Exception as exc: yield f"data: {json.dumps({'error': str(exc), 'plugin': plugin_cls.name})}\n\n" yield 'data: {"done": true}\n\n' return StreamingResponse(event_gen(), media_type="text/event-stream") @app.get("/api/plugins") async def list_plugins(): return [{"name": cls.name, "description": cls.description} for cls in PLUGINS] if __name__ == "__main__": uvicorn.run("server:app", host="0.0.0.0", port=7860, reload=False)