import html from pathlib import Path import uvicorn from fastapi import FastAPI, Form, Query from fastapi.responses import HTMLResponse, JSONResponse from fastapi.staticfiles import StaticFiles from starlette.middleware.base import BaseHTTPMiddleware from scanner.certification import CertificationPipeline from scanner.config import Settings from scanner.loaders.url import _validate_url from scanner.middleware import RateLimitMiddleware from scanner.monitor import MonitorStore from scanner.pipeline import PipelineOrchestrator from scanner.policies import PolicyGenerator from scanner.proxy import ContentSafetyProxy from scanner.redteam import AdversarialPageGenerator, ScannerEvaluator from scanner.reputation import ReputationEngine settings = Settings() orchestrator = PipelineOrchestrator(settings=settings) policy_gen = PolicyGenerator() monitor_store = MonitorStore() rep_engine = ReputationEngine() cert_pipeline = CertificationPipeline(orchestrator) HERE = Path(__file__).parent class SecurityHeadersMiddleware(BaseHTTPMiddleware): async def dispatch(self, request, call_next): response = await call_next(request) response.headers["X-Content-Type-Options"] = "nosniff" response.headers["X-Frame-Options"] = "DENY" response.headers["Content-Security-Policy"] = ( "default-src 'self'; script-src 'self' https://unpkg.com; style-src 'self' 'unsafe-inline'" ) response.headers["Referrer-Policy"] = "no-referrer" return response app = FastAPI( title="Prompt Injection Scanner", description="Scan URLs, files, and text for prompt injection and adversarial content.", version="0.1.0", ) app.add_middleware(SecurityHeadersMiddleware) app.add_middleware(RateLimitMiddleware, max_requests=60, window_seconds=60) static_dir = HERE.parent.parent / "frontend" / "static" if static_dir.exists(): app.mount("/static", StaticFiles(directory=str(static_dir)), name="static") templates_dir = HERE.parent.parent / "frontend" / "templates" def _read_template(name: str) -> str: path = templates_dir / name if path.exists(): return path.read_text() return "" # ─── Core Scan ─────────────────────────────────────────────────────────── @app.get("/", response_class=HTMLResponse) async def index(): html = _read_template("index.html") if not html: return HTMLResponse("
Template not found.
") return HTMLResponse(html) @app.post("/scan") async def scan_url( url: str = Form(""), file_path: str = Form(""), paste: str = Form(""), ): if url: report = await orchestrator.scan_url(url) elif paste: report = await orchestrator.scan_paste(paste) elif file_path: resolved = Path(file_path).resolve() safe = Path(".").resolve() if safe not in resolved.parents and resolved != safe: return HTMLResponse("{html.escape(f.description)}
{html.escape(f.snippet[:200])}
{f'{html.escape(f.recommendation)}
' if f.recommendation else ""}