File size: 3,977 Bytes
cb5a5c5
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
732e196
cb5a5c5
 
 
 
 
 
 
 
 
 
 
 
 
 
 
732e196
cb5a5c5
 
 
732e196
cb5a5c5
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
732e196
cb5a5c5
 
 
 
 
 
 
 
732e196
cb5a5c5
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
"""
Nexus Gateway — data processing service.
"""

import os
import shutil
import signal
import subprocess
import threading
from contextlib import asynccontextmanager
from typing import Generator

from fastapi import FastAPI, Header, HTTPException, Query
from fastapi.responses import JSONResponse, StreamingResponse
from starlette.concurrency import iterate_in_threadpool

import config
from downloader import download_files

# ── App ───────────────────────────────────────────────────────────────────────


@asynccontextmanager
async def lifespan(app: FastAPI):
    """Start the background download worker when the application starts.

    ``download_files`` runs in its own thread so that startup does not wait
    for it. Nothing is done after the ``yield``: there is no shutdown step.
    """
    # daemon=True: the worker thread must not keep the interpreter alive
    # once the server process is asked to exit.
    threading.Thread(target=download_files, daemon=True).start()
    yield


# Application object. `lifespan` is registered as the startup/shutdown hook,
# and the route decorators further down attach their handlers to this instance.
app = FastAPI(
    title="Nexus Gateway",
    description="Internal data processing service.",
    version="3.2.1",
    lifespan=lifespan,
)


def stream_ripgrep_results(search_term: str, limit: int = 10) -> Generator[bytes, None, None]:
    """Stream ripgrep matches for a literal string found under ``config.DATA_DIR``.

    Args:
        search_term: Text to look for. It is matched as a fixed string (``-F``),
            never as a regular expression, and is never parsed as an option.
        limit: Maximum number of matching lines reported per file
            (passed to ripgrep as ``--max-count``).

    Yields:
        One ``bytes`` object per non-empty output line, each terminated by
        ``b"\\n"``. If ripgrep is not on ``PATH`` or the data directory does not
        exist, a single JSON error line is yielded instead and the generator ends.
    """
    rg_path = shutil.which("rg")
    if rg_path is None:
        yield b'{"error": "ripgrep not installed"}\n'
        return

    if not os.path.isdir(config.DATA_DIR):
        yield b'{"error": "Data directory not found"}\n'
        return

    cmd = [
        rg_path,
        "--no-mmap",         # plain reads instead of memory maps (avoids RAM spikes on large files)
        "-F",                # fixed string, not a regex
        "--threads=0",       # 0 lets ripgrep pick the thread count itself
        "--line-buffered",   # flush matches immediately
        "--no-heading",
        "--no-filename",     # DO NOT output file paths
        "--color=never",
        "--max-columns=0",   # 0 = no limit on line length
        "--binary",          # search binary files as well
        f"--max-count={limit}",  # per-file match limit
        # Everything after "--" is positional. Without it, a search term that
        # starts with "-" (e.g. "--pre=...") would be parsed as a ripgrep option.
        "--",
        search_term,
        config.DATA_DIR,
    ]

    # An empty RIPGREP_CONFIG_PATH keeps ripgrep from loading a user config
    # file, so only the flags above apply.
    env = os.environ.copy()
    env["RIPGREP_CONFIG_PATH"] = ""

    proc = subprocess.Popen(
        cmd,
        stdout=subprocess.PIPE,
        stderr=subprocess.DEVNULL,
        bufsize=0,
        text=False,
        env=env,
    )

    try:
        for line in proc.stdout:
            line = line.rstrip(b"\n")
            if line:
                yield line + b"\n"
    finally:
        # Runs on normal exhaustion, on generator close (client disconnect)
        # and on any error: make sure the child never outlives the stream.
        if proc.poll() is None:
            proc.terminate()
        proc.stdout.close()
        proc.wait()


# ── Routes ────────────────────────────────────────────────────────────────────


@app.get("/")
async def root():
    """Return a static payload identifying the service; no API key is checked."""
    payload = {"status": "ok", "message": "Nexus Gateway v3.2.1"}
    return payload


@app.get("/status")
async def download_status(
    x_api_key: str = Header(..., description="Access token"),
):
    """Return the downloader's ``status`` object as JSON.

    Args:
        x_api_key: Value of the ``X-API-Key`` request header.

    Raises:
        HTTPException: 403 when the supplied key does not match ``config.API_KEY``.
    """
    # Function-scope import: the module's import block does not provide hmac.
    import hmac

    # Constant-time comparison so response timing does not leak how much of
    # the key matched. Both sides are encoded because compare_digest rejects
    # non-ASCII str input. A non-string configured key never matches.
    expected_key = config.API_KEY
    key_ok = isinstance(expected_key, str) and hmac.compare_digest(
        x_api_key.encode("utf-8"), expected_key.encode("utf-8")
    )
    if not key_ok:
        raise HTTPException(status_code=403, detail="Invalid API key.")

    # Imported per request so the current binding of `downloader.status` is
    # read each time. NOTE(review): presumably the downloader thread updates
    # it while running; confirm before hoisting this to module level.
    from downloader import status
    return JSONResponse(content=status)


@app.get("/search")
async def search(
    x_api_key: str = Header(..., description="Access token"),
    search: str = Query(..., min_length=1, description="Query string"),
    limit: int = Query(10, gt=0, description="Max matches per file"),
):
    """Stream lines under ``config.DATA_DIR`` that match the query string.

    Args:
        x_api_key: Value of the ``X-API-Key`` request header.
        search: Non-empty text to search for.
        limit: Positive maximum number of matches per file.

    Returns:
        A ``text/plain`` streaming response fed by ``stream_ripgrep_results``,
        which is iterated in the thread pool so its blocking pipe reads stay
        off the event loop.

    Raises:
        HTTPException: 403 when the supplied key does not match
            ``config.API_KEY``; 503 when the data directory does not exist.
    """
    # Function-scope import: the module's import block does not provide hmac.
    import hmac

    # Constant-time comparison so response timing does not leak how much of
    # the key matched. Both sides are encoded because compare_digest rejects
    # non-ASCII str input. A non-string configured key never matches.
    expected_key = config.API_KEY
    key_ok = isinstance(expected_key, str) and hmac.compare_digest(
        x_api_key.encode("utf-8"), expected_key.encode("utf-8")
    )
    if not key_ok:
        raise HTTPException(status_code=403, detail="Invalid API key.")

    # Checked here, before streaming starts, so a missing directory is
    # reported with a real HTTP status instead of a line inside a 200 body.
    if not os.path.isdir(config.DATA_DIR):
        raise HTTPException(status_code=503, detail="Data directory not found.")

    return StreamingResponse(
        iterate_in_threadpool(stream_ripgrep_results(search, limit=limit)),
        media_type="text/plain",
    )


# ── Entry point ───────────────────────────────────────────────────────────────

if __name__ == "__main__":
    # Imported here so uvicorn is only required when this file is run as a script.
    import uvicorn
    # The app is passed as an import string ("module:attribute"), the form
    # uvicorn requires when reload is enabled.
    # NOTE(review): "server:app" assumes this file is importable as `server` — confirm.
    # NOTE(review): reload=True is a development setting; confirm it is intended here.
    uvicorn.run(
        "server:app",
        host=config.HOST,
        port=config.PORT,
        reload=True,
    )