""" Nexus Gateway — data processing service. """ import os import shutil import signal import subprocess import threading from contextlib import asynccontextmanager from typing import Generator from fastapi import FastAPI, Header, HTTPException, Query from fastapi.responses import JSONResponse, StreamingResponse from starlette.concurrency import iterate_in_threadpool import config from downloader import download_files # ── App ─────────────────────────────────────────────────────────────────────── @asynccontextmanager async def lifespan(app: FastAPI): thread = threading.Thread(target=download_files, daemon=True) thread.start() yield app = FastAPI( title="Nexus Gateway", description="Internal data processing service.", version="3.2.1", lifespan=lifespan, ) def stream_ripgrep_results(search_term: str, limit: int = 10) -> Generator[bytes, None, None]: rg_path = shutil.which("rg") if rg_path is None: yield b'{"error": "ripgrep not installed"}\n' return if not os.path.isdir(config.DATA_DIR): yield b'{"error": "Data directory not found"}\n' return cmd = [ rg_path, "-F", # fixed string "--threads=0", # use all cores "--line-buffered", # flush matches immediately "--no-heading", "--no-filename", # DO NOT output file paths "--color=never", "--max-columns=0", "--binary", f"--max-count={limit}", # Dynamic limit per file search_term, config.DATA_DIR, ] # Prevent mmap to avoid RAM spikes on large files env = os.environ.copy() env["RIPGREP_CONFIG_PATH"] = "" cmd.insert(1, "--no-mmap") proc = subprocess.Popen( cmd, stdout=subprocess.PIPE, stderr=subprocess.DEVNULL, bufsize=0, text=False, env=env, ) try: for line in proc.stdout: line = line.rstrip(b"\n") if line: yield line + b"\n" except GeneratorExit: # Kill ripgrep if client disconnects proc.send_signal(signal.SIGTERM) finally: proc.stdout.close() proc.wait() # ── Routes ──────────────────────────────────────────────────────────────────── @app.get("/") async def root(): return {"status": "ok", "message": "Nexus Gateway v3.2.1"} 
def _require_api_key(provided: str) -> None:
    """Raise a 403 ``HTTPException`` unless *provided* equals ``config.API_KEY``.

    The comparison is constant-time so the response time does not reveal how
    many leading characters of the key were correct. Both values are encoded
    to bytes first because ``hmac.compare_digest`` rejects non-ASCII ``str``.
    A configured key that is not a ``str`` never matches.
    """
    import hmac

    expected = config.API_KEY
    if not isinstance(expected, str) or not hmac.compare_digest(
        provided.encode("utf-8"), expected.encode("utf-8")
    ):
        raise HTTPException(status_code=403, detail="Invalid API key.")


@app.get("/status")
async def download_status(
    x_api_key: str = Header(..., description="Access token"),
):
    """Return the downloader's status object as JSON.

    Raises:
        HTTPException: 403 if the ``X-API-Key`` header does not match.
    """
    _require_api_key(x_api_key)

    # Imported at call time rather than at module top.
    # NOTE(review): presumably so the current binding of downloader.status is
    # read on every request — confirm before hoisting this import.
    from downloader import status

    return JSONResponse(content=status)


@app.get("/search")
async def search(
    x_api_key: str = Header(..., description="Access token"),
    search: str = Query(..., min_length=1, description="Query string"),
    limit: int = Query(10, gt=0, description="Max matches per file"),
):
    """Stream ripgrep matches for the ``search`` query as ``text/plain``.

    Raises:
        HTTPException: 403 if the ``X-API-Key`` header does not match;
            503 if ``config.DATA_DIR`` is not an existing directory.
    """
    _require_api_key(x_api_key)

    if not os.path.isdir(config.DATA_DIR):
        raise HTTPException(status_code=503, detail="Data directory not found.")

    # The generator is synchronous (it blocks on the ripgrep pipe), so it is
    # iterated in the threadpool to keep the event loop free.
    return StreamingResponse(
        iterate_in_threadpool(stream_ripgrep_results(search, limit=limit)),
        media_type="text/plain",
    )


# ── Entry point ───────────────────────────────────────────────────────────────

if __name__ == "__main__":
    import uvicorn

    # NOTE(review): reload=True enables uvicorn's auto-reloader, which is
    # normally a development setting — confirm it is intended here.
    uvicorn.run(
        "server:app",
        host=config.HOST,
        port=config.PORT,
        reload=True,
    )