""" ragstudio backend. Single long-running HTTP server that owns the searchers (and therefore the loaded model weights). Both the UI and the CLI talk to it over HTTP. Routes: GET / -> ui/static/index.html GET /static/ -> ui static asset GET /api/modalities -> {"modalities": [...], "groups": {...}} GET /api/search/?q=&k= -> {modality, kind, query, hits:[{score,path}]} GET /api/file?path= -> serve a file referenced in any index's *_meta.json (with stale-path healing) Usage: python backend/server.py [--host 127.0.0.1] [--port 8000] """ import argparse import json import mimetypes import os import re import sys import threading import time import urllib.parse from http.server import BaseHTTPRequestHandler, ThreadingHTTPServer from pathlib import Path SYNC_INTERVAL_S = 30 # Video meta entries are stored as " @ s"; strip the timestamp so the # allow-list contains the underlying file path. _VIDEO_FRAME_SUFFIX = re.compile(r" @ \d+(?:\.\d+)?s$") def _strip_frame_suffix(entry: str) -> str: return _VIDEO_FRAME_SUFFIX.sub("", entry) REPO_ROOT = Path(__file__).resolve().parent.parent INDEXING_DIR = REPO_ROOT / "indexing" INDEX_DATA_DIR = INDEXING_DIR / "index_data" UI_STATIC_DIR = REPO_ROOT / "ui" / "static" def _source_root() -> Path | None: """Folder originally passed to build_index(), recorded by index.py.""" manifest = INDEX_DATA_DIR / "_source.txt" if not manifest.exists(): return None p = Path(manifest.read_text().strip()) return p if p.is_dir() else None # Searchers use relative `index_data/`; cd into indexing/ so paths resolve. os.chdir(INDEXING_DIR) sys.path.insert(0, str(INDEXING_DIR)) from searchers import GROUPS, SEARCHERS # noqa: E402 def _modality_kind(name: str) -> str: if name == "image": return "image" if name == "video": return "video" return "text" def _load_allowed_paths() -> set[str]: """Union of every path referenced in any *_meta.json — the file allow-list.""" allowed: set[str] = set() for meta in (INDEXING_DIR / "index_data").glob("*_meta.json"): try: data = json.loads(meta.read_text()) except Exception: continue if isinstance(data, list): for entry in data: if isinstance(entry, str): allowed.add(_strip_frame_suffix(entry)) elif isinstance(entry, dict) and "path" in entry: allowed.add(_strip_frame_suffix(entry["path"])) return allowed ALLOWED_PATHS = _load_allowed_paths() # basename -> actual filesystem path, for healing stale absolute paths when # indexed files have since been moved. Built lazily on first miss. _BASENAME_INDEX: dict[str, str] | None = None def _build_basename_index() -> dict[str, str]: needed = {Path(p).name for p in ALLOWED_PATHS} out: dict[str, str] = {} seen: set[Path] = set() for root in (INDEXING_DIR, REPO_ROOT): try: root = root.resolve() except Exception: continue if root in seen: continue seen.add(root) for path in root.rglob("*"): if not path.is_file(): continue name = path.name if name in needed and name not in out: out[name] = str(path) return out _EXCERPT_CONTEXT_LINES = 4 _EXCERPT_LINE_CHARS = 240 _EXCERPT_TOKEN_RE = re.compile(r"\w+") # Split text into sentence-ish candidate units. Falls back to whole lines if # the file already uses line breaks meaningfully. _SENTENCE_SPLIT_RE = re.compile(r"(?<=[.!?])\s+(?=[A-Za-z0-9])") def _split_candidates(text: str) -> list[str]: by_line = [line.strip() for line in text.splitlines() if line.strip()] # If newlines already chop the text into multiple pieces, trust them. if len(by_line) >= 4: return by_line # Otherwise treat each line as a paragraph and split it into sentences. out: list[str] = [] for line in by_line: out.extend(s.strip() for s in _SENTENCE_SPLIT_RE.split(line) if s.strip()) return out def _cap_line(s: str) -> str: return s if len(s) <= _EXCERPT_LINE_CHARS else s[: _EXCERPT_LINE_CHARS] + "…" def _bm25_match_idx(query: str, lines: list[str]) -> int: q_tokens = set(_EXCERPT_TOKEN_RE.findall(query.lower())) if not q_tokens: return 0 best_i, best_count = 0, -1 for i, line in enumerate(lines): tokens = _EXCERPT_TOKEN_RE.findall(line.lower()) count = sum(1 for t in tokens if t in q_tokens) if count > best_count: best_i, best_count = i, count return best_i def _semantic_match_idx(query: str, lines: list[str]) -> int: # Reuse the model already loaded by the semantic-text searcher so we don't # double the VRAM/load time. This is a private import on purpose. from searchers.semantic_text import _get_model import numpy as np model = _get_model() q_emb = model.encode([query], convert_to_numpy=True, normalize_embeddings=True) line_emb = model.encode( lines, convert_to_numpy=True, normalize_embeddings=True, batch_size=64, show_progress_bar=False, ) sims = (line_emb @ q_emb.T).flatten() return int(np.argmax(sims)) def _text_excerpt(modality: str, query: str, requested: str) -> dict | None: """Three-part snippet ({before, match, after}) centered on the line most relevant to the query for this modality.""" resolved = _resolve_path(requested) if resolved is None: return None try: raw = resolved.read_text(encoding="utf-8", errors="replace") except OSError: return None candidates = _split_candidates(raw) if not candidates: return None if modality == "semantic-text": center = _semantic_match_idx(query, candidates) else: center = _bm25_match_idx(query, candidates) start = max(0, center - _EXCERPT_CONTEXT_LINES) end = min(len(candidates), center + _EXCERPT_CONTEXT_LINES + 1) return { "before": "\n".join(_cap_line(c) for c in candidates[start:center]), "match": _cap_line(candidates[center]), "after": "\n".join(_cap_line(c) for c in candidates[center + 1 : end]), } def _resolve_path(requested: str) -> Path | None: p = Path(requested) if p.is_absolute() and p.is_file(): return p # Indexer stores paths relative to the original source folder. Try: # 1. /source/ -- copy pulled from the bucket # 2. <_source.txt>/ -- original folder on this machine if not p.is_absolute(): bucket_copy = INDEX_DATA_DIR / "source" / p if bucket_copy.is_file(): return bucket_copy src = _source_root() if src is not None: local_copy = src / p if local_copy.is_file(): return local_copy # Last-resort: basename match anywhere under the repo, for indexes that # still hold stale absolute paths or whose source folder has moved. global _BASENAME_INDEX if _BASENAME_INDEX is None: _BASENAME_INDEX = _build_basename_index() hit = _BASENAME_INDEX.get(p.name) return Path(hit) if hit else None class Handler(BaseHTTPRequestHandler): def handle(self): try: super().handle() except BrokenPipeError: pass def log_message(self, fmt, *args): sys.stderr.write(f"[backend] {self.address_string()} - {fmt % args}\n") def _send_json(self, status: int, payload): body = json.dumps(payload).encode("utf-8") self.send_response(status) self.send_header("Content-Type", "application/json; charset=utf-8") self.send_header("Content-Length", str(len(body))) self.send_header("Cache-Control", "no-store") self.end_headers() self.wfile.write(body) def _send_file(self, path: Path, status: int = 200): if not path.is_file(): self._send_json(404, {"error": f"not found: {path}"}) return ctype, _ = mimetypes.guess_type(str(path)) ctype = ctype or "application/octet-stream" size = path.stat().st_size range_hdr = self.headers.get("Range") if range_hdr and range_hdr.startswith("bytes="): # Parse byte range (only single ranges supported). spec = range_hdr[len("bytes="):] start_s, _, end_s = spec.partition("-") start = int(start_s) if start_s else 0 end = int(end_s) if end_s else size - 1 end = min(end, size - 1) length = end - start + 1 with open(path, "rb") as f: f.seek(start) data = f.read(length) self.send_response(206) self.send_header("Content-Type", ctype) self.send_header("Content-Range", f"bytes {start}-{end}/{size}") self.send_header("Content-Length", str(length)) self.send_header("Accept-Ranges", "bytes") self.end_headers() self.wfile.write(data) else: data = path.read_bytes() self.send_response(status) self.send_header("Content-Type", ctype) self.send_header("Content-Length", str(len(data))) self.send_header("Accept-Ranges", "bytes") self.end_headers() self.wfile.write(data) def _run_search(self, modality: str, query: str, top_k: int): fn = SEARCHERS.get(modality) if fn is None: self._send_json(404, {"error": f"unknown modality: {modality}"}) return kind = _modality_kind(modality) try: hits = fn(query, top_k) except Exception as exc: self._send_json( 500, { "modality": modality, "kind": kind, "query": query, "error": str(exc), "hits": [], }, ) return out = [] for s, p in hits: entry = {"score": float(s), "path": p} if kind == "text": excerpt = _text_excerpt(modality, query, p) if excerpt: entry["excerpt"] = excerpt out.append(entry) self._send_json( 200, { "modality": modality, "kind": kind, "query": query, "hits": out, }, ) def do_GET(self): parsed = urllib.parse.urlparse(self.path) route = parsed.path qs = urllib.parse.parse_qs(parsed.query) # static UI if route in ("/", "/index.html"): self._send_file(UI_STATIC_DIR / "index.html") return if route.startswith("/static/"): target = (UI_STATIC_DIR / route[len("/static/") :]).resolve() root = UI_STATIC_DIR.resolve() if root != target and root not in target.parents: self._send_json(403, {"error": "forbidden"}) return self._send_file(target) return # API if route == "/api/modalities": self._send_json( 200, { "modalities": sorted(SEARCHERS.keys()), "groups": {k: list(v) for k, v in GROUPS.items()}, }, ) return if route.startswith("/api/search/"): modality = route[len("/api/search/") :] query = (qs.get("q", [""])[0] or "").strip() if not query: self._send_json(400, {"error": "missing q"}) return top_k = int(qs.get("k", ["8"])[0]) self._run_search(modality, query, top_k) return if route == "/api/file": path = qs.get("path", [""])[0] if not path: self._send_json(400, {"error": "missing path"}) return global ALLOWED_PATHS if path not in ALLOWED_PATHS: ALLOWED_PATHS = _load_allowed_paths() if path not in ALLOWED_PATHS: self._send_json(403, {"error": "path not in any index"}) return resolved = _resolve_path(path) if resolved is None: self._send_json(404, {"error": f"file missing on disk: {path}"}) return self._send_file(resolved) return self._send_json(404, {"error": f"no route for {route}"}) def _sync_loop() -> None: """Pull the shared index bucket into index_data/ every SYNC_INTERVAL_S.""" # `sync` lives in INDEXING_DIR which is already on sys.path. import sync as sync_mod while True: try: sync_mod.sync(pull=True) except Exception as exc: print(f"[sync] pull failed: {exc}", file=sys.stderr) time.sleep(SYNC_INTERVAL_S) def main() -> None: parser = argparse.ArgumentParser(description=__doc__.strip()) parser.add_argument("--host", default="127.0.0.1") parser.add_argument("--port", type=int, default=8000) args = parser.parse_args() threading.Thread(target=_sync_loop, daemon=True).start() server = ThreadingHTTPServer((args.host, args.port), Handler) print(f"ragstudio backend on http://{args.host}:{args.port}") print(f" cwd: {os.getcwd()}") print(f" searchers: {sorted(SEARCHERS)}") print(f" static UI: {UI_STATIC_DIR}") print(f" index files: {len(ALLOWED_PATHS)}") print(f" sync: pull every {SYNC_INTERVAL_S}s") try: server.serve_forever() except KeyboardInterrupt: print("\nshutting down") server.server_close() if __name__ == "__main__": main()