Spaces:
Paused
Paused
| """ | |
| Nexus Gateway — data processing service. | |
| """ | |
| import os | |
| import shutil | |
| import signal | |
| import subprocess | |
| import threading | |
| from contextlib import asynccontextmanager | |
| from typing import Generator | |
| from fastapi import FastAPI, Header, HTTPException, Query | |
| from fastapi.responses import JSONResponse, StreamingResponse | |
| from starlette.concurrency import iterate_in_threadpool | |
| import config | |
| from downloader import download_files | |
| # ── App ─────────────────────────────────────────────────────────────────────── | |
@asynccontextmanager
async def lifespan(app: FastAPI):
    """Application lifespan: start ``download_files`` in the background.

    The download runs in a daemon thread and control is yielded immediately,
    so startup does not wait for the download to finish. The daemon flag
    means the thread does not keep the process alive at shutdown. Nothing is
    cleaned up after ``yield``.
    """
    # Decorated with @asynccontextmanager: Starlette deprecates passing a bare
    # async generator function as ``lifespan``.
    downloader_thread = threading.Thread(target=download_files, daemon=True)
    downloader_thread.start()
    yield
# Application instance. ``lifespan`` starts ``download_files`` in a
# background thread when the app starts up.
app = FastAPI(
    lifespan=lifespan,
    title="Nexus Gateway",
    version="3.2.1",
    description="Internal data processing service.",
)
def stream_ripgrep_results(search_term: str, limit: int = 10) -> Generator[bytes, None, None]:
    """Stream ripgrep matches for *search_term* found under ``config.DATA_DIR``.

    Runs ``rg`` as a fixed-string (non-regex) search. Each non-empty output
    line is yielded, newline-terminated, as soon as ripgrep emits it. File
    names are suppressed, so only the matching line contents are returned.

    Args:
        search_term: Literal text to search for. It is placed after ``--``,
            so a value starting with ``-`` is always treated as the pattern
            and never as a ripgrep option.
        limit: Maximum matches ripgrep reports *per file* (``--max-count``).
            This is not a global cap.

    Yields:
        Matching lines as bytes. Instead, a single JSON error line is
        yielded if ripgrep is not installed or the data directory is missing.
    """
    rg_path = shutil.which("rg")
    if rg_path is None:
        yield b'{"error": "ripgrep not installed"}\n'
        return
    if not os.path.isdir(config.DATA_DIR):
        yield b'{"error": "Data directory not found"}\n'
        return

    cmd = [
        rg_path,
        "--no-config",           # never read a ripgrep config file
        "--no-mmap",             # avoid RAM spikes from mapping large files
        "-F",                    # fixed string, not a regex
        "--threads=0",           # 0 = let ripgrep choose the thread count
        "--line-buffered",       # flush each match immediately
        "--no-heading",
        "--no-filename",         # DO NOT output file paths
        "--color=never",
        "--max-columns=0",       # 0 = no line-length limit
        "--binary",
        f"--max-count={limit}",  # per-file match limit
        "--",                    # end of options: user input can't become a flag
        search_term,
        config.DATA_DIR,
    ]
    # Also blank RIPGREP_CONFIG_PATH, as before, so an inherited config
    # path is not picked up.
    env = os.environ.copy()
    env["RIPGREP_CONFIG_PATH"] = ""

    # Default (buffered) stdout: with bufsize=0 the pipe is a raw FileIO whose
    # line iteration reads one byte per syscall. A BufferedReader still hands
    # back each line as soon as it arrives on the pipe.
    proc = subprocess.Popen(
        cmd,
        stdout=subprocess.PIPE,
        stderr=subprocess.DEVNULL,
        text=False,
        env=env,
    )
    completed = False
    try:
        for raw_line in proc.stdout:
            raw_line = raw_line.rstrip(b"\n")
            if raw_line:
                yield raw_line + b"\n"
        completed = True
    finally:
        # Runs on normal exhaustion, on client disconnect (GeneratorExit from
        # close()), and on any error raised while reading.
        proc.stdout.close()
        if not completed:
            # ripgrep may still be scanning files; stop it early.
            proc.terminate()
        try:
            # Bounded wait so a stuck child cannot hang the closing thread forever.
            proc.wait(timeout=5)
        except subprocess.TimeoutExpired:
            proc.kill()
            proc.wait()
| # ── Routes ──────────────────────────────────────────────────────────────────── | |
async def root():
    """Return a fixed status payload that includes the service version string."""
    # NOTE(review): no route decorator is visible for this handler; confirm
    # how it is registered on ``app``.
    payload = dict(status="ok", message="Nexus Gateway v3.2.1")
    return payload
async def download_status(
    x_api_key: str = Header(..., description="Access token"),
):
    """Return the downloader module's ``status`` object as a JSON response.

    Args:
        x_api_key: Access token from the ``x-api-key`` request header. It
            must equal ``config.API_KEY``.

    Raises:
        HTTPException: 403 if the key does not match, or if
            ``config.API_KEY`` is not a string.
    """
    # NOTE(review): no route decorator is visible for this handler; confirm
    # how it is registered on ``app``.
    import secrets  # local import keeps this handler self-contained

    expected = config.API_KEY
    # Constant-time comparison so response timing doesn't leak key prefixes.
    # Comparing bytes avoids compare_digest's TypeError on non-ASCII str input.
    if not isinstance(expected, str) or not secrets.compare_digest(
        x_api_key.encode("utf-8"), expected.encode("utf-8")
    ):
        raise HTTPException(status_code=403, detail="Invalid API key.")

    # Imported on each call, so the current binding of downloader.status is
    # read every time. Presumably it is updated by the background download
    # thread — verify.
    from downloader import status

    return JSONResponse(content=status)
async def search(
    x_api_key: str = Header(..., description="Access token"),
    search: str = Query(..., min_length=1, description="Query string"),
    limit: int = Query(10, gt=0, description="Max matches per file"),
):
    """Stream ripgrep matches for the ``search`` query as plain text.

    Args:
        x_api_key: Access token from the ``x-api-key`` request header. It
            must equal ``config.API_KEY``.
        search: Literal (fixed-string) text to search for; must be non-empty.
        limit: Maximum matches reported per file; must be > 0.

    Returns:
        A ``StreamingResponse`` that runs the blocking ripgrep generator in
        the threadpool, so the event loop is not blocked.

    Raises:
        HTTPException: 403 on a bad or unset API key; 503 if
            ``config.DATA_DIR`` is not a directory.
    """
    # NOTE(review): no route decorator is visible for this handler; confirm
    # how it is registered on ``app``.
    import secrets  # local import keeps this handler self-contained

    expected = config.API_KEY
    # Constant-time comparison so response timing doesn't leak key prefixes.
    # Comparing bytes avoids compare_digest's TypeError on non-ASCII str input.
    if not isinstance(expected, str) or not secrets.compare_digest(
        x_api_key.encode("utf-8"), expected.encode("utf-8")
    ):
        raise HTTPException(status_code=403, detail="Invalid API key.")

    # Checked up front so the client gets a proper 503 status, rather than an
    # error line inside a 200 stream.
    if not os.path.isdir(config.DATA_DIR):
        raise HTTPException(status_code=503, detail="Data directory not found.")

    return StreamingResponse(
        iterate_in_threadpool(stream_ripgrep_results(search, limit=limit)),
        media_type="text/plain",
    )
| # ── Entry point ─────────────────────────────────────────────────────────────── | |
if __name__ == "__main__":
    import uvicorn

    # uvicorn needs the app as an import string (not the object) when
    # reload is enabled.
    # NOTE(review): reload=True is uvicorn's development auto-reloader;
    # consider disabling it for production deployments.
    run_options = {
        "host": config.HOST,
        "port": config.PORT,
        "reload": True,
    }
    uvicorn.run("server:app", **run_options)