Nexus-API-2 / server.py
AadityaPawarx1's picture
Upload 5 files
8017575 verified
"""
Nexus Gateway — data processing service.
"""
import os
import shutil
import signal
import subprocess
import threading
from contextlib import asynccontextmanager
from typing import Generator
from fastapi import FastAPI, Header, HTTPException, Query
from fastapi.responses import JSONResponse, StreamingResponse
from starlette.concurrency import iterate_in_threadpool
import config
from downloader import download_files
# ── App ───────────────────────────────────────────────────────────────────────
@asynccontextmanager
async def lifespan(app: FastAPI):
thread = threading.Thread(target=download_files, daemon=True)
thread.start()
yield
app = FastAPI(
title="Nexus Gateway",
description="Internal data processing service.",
version="3.2.1",
lifespan=lifespan,
)
def stream_ripgrep_results(search_term: str, limit: int = 10) -> Generator[bytes, None, None]:
rg_path = shutil.which("rg")
if rg_path is None:
yield b'{"error": "ripgrep not installed"}\n'
return
if not os.path.isdir(config.DATA_DIR):
yield b'{"error": "Data directory not found"}\n'
return
cmd = [
rg_path,
"-F", # fixed string
"--threads=0", # use all cores
"--line-buffered", # flush matches immediately
"--no-heading",
"--no-filename", # DO NOT output file paths
"--color=never",
"--max-columns=0",
"--binary",
f"--max-count={limit}", # Dynamic limit per file
search_term,
config.DATA_DIR,
]
# Prevent mmap to avoid RAM spikes on large files
env = os.environ.copy()
env["RIPGREP_CONFIG_PATH"] = ""
cmd.insert(1, "--no-mmap")
proc = subprocess.Popen(
cmd,
stdout=subprocess.PIPE,
stderr=subprocess.DEVNULL,
bufsize=0,
text=False,
env=env,
)
try:
for line in proc.stdout:
line = line.rstrip(b"\n")
if line:
yield line + b"\n"
except GeneratorExit:
# Kill ripgrep if client disconnects
proc.send_signal(signal.SIGTERM)
finally:
proc.stdout.close()
proc.wait()
# ── Routes ────────────────────────────────────────────────────────────────────
@app.get("/")
async def root():
return {"status": "ok", "message": "Nexus Gateway v3.2.1"}
@app.get("/status")
async def download_status(
x_api_key: str = Header(..., description="Access token"),
):
if x_api_key != config.API_KEY:
raise HTTPException(status_code=403, detail="Invalid API key.")
from downloader import status
return JSONResponse(content=status)
@app.get("/search")
async def search(
x_api_key: str = Header(..., description="Access token"),
search: str = Query(..., min_length=1, description="Query string"),
limit: int = Query(10, gt=0, description="Max matches per file"),
):
if x_api_key != config.API_KEY:
raise HTTPException(status_code=403, detail="Invalid API key.")
if not os.path.isdir(config.DATA_DIR):
raise HTTPException(status_code=503, detail="Data directory not found.")
return StreamingResponse(
iterate_in_threadpool(stream_ripgrep_results(search, limit=limit)),
media_type="text/plain",
)
# ── Entry point ───────────────────────────────────────────────────────────────
if __name__ == "__main__":
import uvicorn
uvicorn.run(
"server:app",
host=config.HOST,
port=config.PORT,
reload=True,
)