"""VOMEBOOK Search Space backend for txt bundles.""" from __future__ import annotations import gzip import json import posixpath import random import re import time from contextlib import asynccontextmanager from datetime import datetime from pathlib import Path from typing import Optional from urllib.parse import quote from fastapi import FastAPI, Query from fastapi import HTTPException from fastapi.middleware.cors import CORSMiddleware from fastapi.middleware.gzip import GZipMiddleware from fastapi.responses import HTMLResponse, JSONResponse, PlainTextResponse, StreamingResponse from fastapi.staticfiles import StaticFiles from pydantic import BaseModel BASE_DIR = Path(__file__).resolve().parent DATA_PATH = BASE_DIR / "data/search_data.json.gz" FOLDER_TREE_PATH = BASE_DIR / "data/folder_tree.json.gz" FOLDER_BROWSER_PATH = BASE_DIR / "data/folder_browser.json.gz" FULLTEXT_MANIFEST_PATH = BASE_DIR / "data/fulltext_manifest.json.gz" records: list[dict] = [] record_map: dict[str, dict] = {} sources: list[dict] = [] source_counts: dict[str, int] = {} folder_tree_data: dict[str, list[dict]] = {} folder_browser_data: dict[str, dict[str, dict]] = {} source_records_map: dict[str, list[int]] = {} extension_counts: dict[str, int] = {} vocab_by_len: dict[int, dict[str, int]] = {} word_index: dict[str, set[int]] = {} did_you_mean_vocab: dict[str, int] = {} def tokenize(text: str) -> list[str]: text_lower = (text or "").lower() return list(set(re.findall(r"[a-z0-9]+|[\u4e00-\u9fff\u3400-\u4dbf]+", text_lower))) def build_first_match_snippet(text: str, query: str, window: int = 110) -> str: query = (query or "").strip() if not text or not query: return "" tokens = [token for token in tokenize(query) if token] if not tokens: tokens = [query.lower()] text_lower = text.lower() best_index = None best_token = "" for token in tokens: idx = text_lower.find(token.lower()) if idx != -1 and (best_index is None or idx < best_index): best_index = idx best_token = token if best_index is None: condensed = re.sub(r"\s+", " ", text).strip() return condensed[: window * 2] start = max(0, best_index - window) end = min(len(text), best_index + max(len(best_token), 1) + window) snippet = re.sub(r"\s+", " ", text[start:end]).strip() if start > 0: snippet = "..." + snippet if end < len(text): snippet = snippet + "..." return snippet def edit_distance(s1: str, s2: str, max_dist: int = 2) -> int: if abs(len(s1) - len(s2)) > max_dist: return 999 prev = list(range(len(s2) + 1)) for i, c1 in enumerate(s1): curr = [i + 1] for j, c2 in enumerate(s2): cost = 0 if c1 == c2 else 1 curr.append(min(prev[j + 1] + 1, curr[j] + 1, prev[j] + cost)) if min(curr) > max_dist: return 999 prev = curr return prev[-1] def load_json_gz(path: Path): return json.loads(gzip.decompress(path.read_bytes()).decode("utf-8")) def build_indexes() -> None: global word_index, did_you_mean_vocab, vocab_by_len, source_records_map, extension_counts word_index = {} did_you_mean_vocab = {} vocab_by_len = {} source_records_map = {} extension_counts = {} for idx, rec in enumerate(records): source_records_map.setdefault(rec["source"], []).append(idx) extension_counts[rec["extension"]] = extension_counts.get(rec["extension"], 0) + 1 file_text = " ".join([rec["display_name"], rec["source_name"]]) path_text = rec["display_rel_path"] tokens = tokenize(f"{file_text} {path_text}") rec["_file_search_text"] = file_text.lower() rec["_path_search_text"] = path_text.lower() rec["_search_text"] = f"{file_text} {path_text}".lower() for token in tokens: word_index.setdefault(token, set()).add(idx) did_you_mean_vocab[token] = did_you_mean_vocab.get(token, 0) + 1 for token, freq in did_you_mean_vocab.items(): vocab_by_len.setdefault(len(token), {})[token] = freq def load_data() -> None: global records, record_map, sources, source_counts, folder_tree_data, folder_browser_data start = time.time() payload = load_json_gz(DATA_PATH) records = payload.get("records", []) sources = payload.get("sources", []) source_counts = {item["slug"]: item.get("count", 0) for item in sources} record_map = {rec["doc_id"]: rec for rec in records} folder_tree_data = load_json_gz(FOLDER_TREE_PATH) folder_browser_data = load_json_gz(FOLDER_BROWSER_PATH) build_indexes() print(f"loaded {len(records)} txt records in {time.time() - start:.2f}s") def score_record(idx: int, query_tokens: list[str], search_paths: bool = True) -> int: score = 0 file_text = records[idx]["_file_search_text"] path_text = records[idx]["_path_search_text"] for token in query_tokens: if token in file_text: score += 3 if search_paths and token in path_text: score += 2 return score def apply_filters(indices: list[int], sources_filter=None, folders=None, min_size=None, max_size=None): result = [] for idx in indices: rec = records[idx] if sources_filter and rec["source"] not in sources_filter: continue if folders: rel_dir = "/".join(rec.get("display_dirs", [])) matched = any(rel_dir == folder or rel_dir.startswith(folder + "/") for folder in folders) if not matched: continue size = rec.get("size") or 0 if min_size is not None and size < min_size: continue if max_size is not None and size > max_size: continue result.append(idx) return result def trim_record(rec: dict) -> dict: return { "doc_id": rec["doc_id"], "Source": rec["source"], "SourceName": rec["source_name"], "File": rec["display_name"], "Extension": rec["extension"], "Folder": rec["display_dirs"], "DisplayPath": rec["display_rel_path"], "Size": rec["size"], "HasTxt": True, } def search(q="", sources_filter=None, folders=None, min_size=None, max_size=None, page=1, page_size=100, sort="relevance", exact=False, search_paths=True): q = q.strip() if not q: indices = list(range(len(records))) filtered = apply_filters(indices, sources_filter, folders, min_size, max_size) else: tokens = tokenize(q) if exact: if search_paths: indices = [idx for idx, rec in enumerate(records) if q.lower() in rec["_search_text"]] else: indices = [idx for idx, rec in enumerate(records) if q.lower() in rec["_file_search_text"]] else: matched = set() for token in tokens: if token in word_index: if search_paths: matched.update(word_index[token]) else: matched.update(idx for idx in word_index[token] if token in records[idx]["_file_search_text"]) continue tok_len = len(token) for delta in (-2, -1, 0, 1, 2): for vocab_word in vocab_by_len.get(tok_len + delta, {}): if edit_distance(token, vocab_word) <= 2: if search_paths: matched.update(word_index.get(vocab_word, set())) else: matched.update(idx for idx in word_index.get(vocab_word, set()) if vocab_word in records[idx]["_file_search_text"]) if len(matched) > 5000: break indices = list(matched) filtered = apply_filters(indices, sources_filter, folders, min_size, max_size) if sort == "relevance": filtered.sort(key=lambda idx: (-score_record(idx, tokens, search_paths), records[idx]["display_rel_path"].lower())) if sort == "name": filtered.sort(key=lambda idx: records[idx]["display_rel_path"].lower()) elif sort == "size": filtered.sort(key=lambda idx: (-records[idx].get("size", 0), records[idx]["display_rel_path"].lower())) total = len(filtered) start = (page - 1) * page_size result_items = [trim_record(records[idx]) for idx in filtered[start:start + page_size]] return {"results": result_items, "total": total, "page": page, "page_size": page_size, "did_you_mean": None} def get_doc_storage_path(doc: dict) -> Path: return BASE_DIR / doc["storage_root"] / doc["storage_rel_path"] def get_folder_contents(source_slug: str, path: str) -> dict: source_browser = folder_browser_data.get(source_slug, {}) if path in source_browser: entry = dict(source_browser[path]) entry["current_path"] = path return entry return {"folders": [], "files": [], "current_path": path} @asynccontextmanager async def lifespan(app: FastAPI): load_data() yield app = FastAPI(title="VOMEBOOK Search", version="1.0", lifespan=lifespan) app.add_middleware(GZipMiddleware, minimum_size=500) app.add_middleware(CORSMiddleware, allow_origins=["*"], allow_methods=["*"], allow_headers=["*"]) class SearchRequest(BaseModel): q: str = "" sources: Optional[list[str]] = None folders: Optional[list[str]] = None min_size: Optional[int] = None max_size: Optional[int] = None page: int = 1 page_size: int = 100 sort: str = "relevance" exact: bool = False search_paths: bool = True class ZipRequest(BaseModel): doc_ids: list[str] = [] @app.post("/api/search") def api_search(body: SearchRequest): return JSONResponse(search(body.q, body.sources, body.folders, body.min_size, body.max_size, body.page, body.page_size, body.sort, body.exact, body.search_paths)) @app.post("/api/search/{source_slug}") def api_search_source(source_slug: str, body: SearchRequest): if source_slug not in source_counts: return JSONResponse({"error": "source not found", "results": [], "total": 0}, status_code=404) return JSONResponse(search(body.q, [source_slug], body.folders, body.min_size, body.max_size, body.page, body.page_size, body.sort, body.exact, body.search_paths)) @app.get("/api/sources") def api_sources(): return JSONResponse(sources) @app.get("/api/extensions") def api_extensions(source: Optional[str] = Query(default=None)): pool = [records[idx] for idx in source_records_map.get(source, [])] if source else records counts = {} for rec in pool: counts[rec["extension"]] = counts.get(rec["extension"], 0) + 1 return JSONResponse([{"name": ext, "count": count} for ext, count in sorted(counts.items())]) @app.get("/api/folders/{source_slug}") def api_folders(source_slug: str): return JSONResponse(folder_tree_data.get(source_slug, [])) @app.get("/api/folders/{source_slug}/contents") def api_folder_contents(source_slug: str, path: str = Query(default="")): return JSONResponse(get_folder_contents(source_slug, path)) @app.get("/api/random") def api_random(source: Optional[str] = Query(default=None)): pool = [records[idx] for idx in source_records_map.get(source, [])] if source else records if not pool: return JSONResponse({"error": "no record"}, status_code=404) return JSONResponse(trim_record(random.choice(pool))) @app.get("/api/preview/{doc_id}") def api_preview(doc_id: str): rec = record_map.get(doc_id) if not rec: return JSONResponse({"error": "not found"}, status_code=404) file_path = get_doc_storage_path(rec) if not file_path.exists(): return JSONResponse({"error": f"missing file: {file_path.name}"}, status_code=404) text = file_path.read_text(encoding="utf-8", errors="ignore") return JSONResponse({"doc_id": doc_id, "title": rec["display_name"], "path": rec["display_rel_path"], "source": rec["source_name"], "text": text}) @app.get("/api/snippet/{doc_id}") def api_snippet(doc_id: str, q: str = Query(default="")): rec = record_map.get(doc_id) if not rec: return JSONResponse({"error": "not found"}, status_code=404) file_path = get_doc_storage_path(rec) if not file_path.exists(): return JSONResponse({"error": f"missing file: {file_path.name}"}, status_code=404) text = file_path.read_text(encoding="utf-8", errors="ignore") return JSONResponse({"doc_id": doc_id, "snippet": build_first_match_snippet(text, q)}) @app.get("/api/download/{doc_id}") def api_download(doc_id: str): rec = record_map.get(doc_id) if not rec: return JSONResponse({"error": "not found"}, status_code=404) file_path = get_doc_storage_path(rec) if not file_path.exists(): return JSONResponse({"error": f"missing file: {file_path.name}"}, status_code=404) encoded_filename = quote(f"{rec['display_name']}.txt", safe="") return StreamingResponse( iter([file_path.read_bytes()]), media_type="text/plain; charset=utf-8", headers={"Content-Disposition": f"attachment; filename*=UTF-8''{encoded_filename}"}, ) @app.post("/api/zip") def api_zip(req: ZipRequest): valid = [] for doc_id in req.doc_ids[:500]: rec = record_map.get(doc_id) if not rec: continue file_path = get_doc_storage_path(rec) if not file_path.exists(): continue valid.append((rec["display_rel_path"], file_path.read_bytes())) if not valid: return JSONResponse({"error": "no files"}, status_code=400) try: import io import zipfile buf = io.BytesIO() with zipfile.ZipFile(buf, "w", zipfile.ZIP_DEFLATED) as zf: for display_path, data in valid: zf.writestr(display_path, data) buf.seek(0) def chunks(): while True: chunk = buf.read(65536) if not chunk: break yield chunk return StreamingResponse( chunks(), media_type="application/zip", headers={"Content-Disposition": "attachment; filename=vomebook_batch.zip"}, ) except Exception as exc: print(f"zip build failed: {exc}") return JSONResponse({"error": f"zip failed: {exc}"}, status_code=500) @app.get("/api/fulltext-manifest") def api_fulltext_manifest(): return JSONResponse(load_json_gz(FULLTEXT_MANIFEST_PATH)) app.mount("/data", StaticFiles(directory=str(BASE_DIR / "data")), name="data") app.mount("/static", StaticFiles(directory=str(BASE_DIR / "static"), html=True), name="static") app.mount("/icons", StaticFiles(directory=str(BASE_DIR / "static/icons")), name="icons") @app.get("/manifest.json") def serve_manifest(): return JSONResponse(json.loads((BASE_DIR / "static/manifest.json").read_text(encoding="utf-8"))) @app.get("/sw.js") def serve_sw(): return PlainTextResponse((BASE_DIR / "static/sw.js").read_text(encoding="utf-8"), media_type="application/javascript") @app.get("/{rest_of_path:path}") async def serve_spa(rest_of_path: str): return HTMLResponse((BASE_DIR / "static/index.html").read_text(encoding="utf-8"))