HFGitDorker / app.py
Jack698's picture
Upload folder using huggingface_hub
98aef54 verified
import os
import asyncio
from fastapi import FastAPI, Query, Request
from fastapi.responses import HTMLResponse, StreamingResponse
from fastapi.templating import Jinja2Templates
from dorker import search
app = FastAPI()
templates = Jinja2Templates(directory="templates")
async def stream_dorker_results(tokens, query, dork_file_path):
"""Wraps the dorker.search generator and formats output for SSE."""
try:
with open(dork_file_path, 'r', encoding='utf-8') as f:
dorks = [line.strip() for line in f if line.strip()]
except FileNotFoundError:
yield f"data: [ERROR] Dorks file not found: {dork_file_path}\n\n"
yield f"data: [STREAM_COMPLETE]\n\n"
return
async for result_line in search(tokens, query, dorks):
yield f"data: {result_line}\n\n"
await asyncio.sleep(0.01)
yield f"data: [STREAM_COMPLETE]\n\n"
@app.get("/", response_class=HTMLResponse)
async def read_root(request: Request):
return templates.TemplateResponse("index.html", {"request": request})
@app.get("/run", response_class=StreamingResponse)
async def run_dorker_stream(query: str = Query(...), dork_file: str = Query(...)):
tokens_str = os.environ.get("GHA_TOKENS", "")
tokens = [token.strip() for token in tokens_str.split(',') if token.strip()]
if not tokens:
async def error_generator():
yield "data: [ERROR] GHA_TOKENS is not set in the Space secrets.\n\n"
yield "data: [STREAM_COMPLETE]\n\n"
return StreamingResponse(error_generator(), media_type="text/event-stream")
return StreamingResponse(stream_dorker_results(tokens, query, dork_file), media_type="text/event-stream")