| """VOMEBOOK Search Space backend for txt bundles.""" |
|
|
| from __future__ import annotations |
|
|
| import gzip |
| import json |
| import posixpath |
| import random |
| import re |
| import time |
| from contextlib import asynccontextmanager |
| from datetime import datetime |
| from pathlib import Path |
| from typing import Optional |
| from urllib.parse import quote |
|
|
| from fastapi import FastAPI, Query |
| from fastapi import HTTPException |
| from fastapi.middleware.cors import CORSMiddleware |
| from fastapi.middleware.gzip import GZipMiddleware |
| from fastapi.responses import HTMLResponse, JSONResponse, PlainTextResponse, StreamingResponse |
| from fastapi.staticfiles import StaticFiles |
| from pydantic import BaseModel |
|
|
|
|
# Directory containing this module; every data and static path in this file is built from it.
BASE_DIR = Path(__file__).resolve().parent
# Gzip-compressed JSON read by load_data() for its "records" and "sources" lists.
DATA_PATH = BASE_DIR / "data/search_data.json.gz"
# Gzip-compressed JSON loaded into folder_tree_data and served by /api/folders/{source_slug}.
FOLDER_TREE_PATH = BASE_DIR / "data/folder_tree.json.gz"
# Gzip-compressed JSON loaded into folder_browser_data and read by get_folder_contents().
FOLDER_BROWSER_PATH = BASE_DIR / "data/folder_browser.json.gz"
# Gzip-compressed JSON returned unchanged by /api/fulltext-manifest.
FULLTEXT_MANIFEST_PATH = BASE_DIR / "data/fulltext_manifest.json.gz"
|
|
# In-memory state. Everything below is (re)assigned by load_data() / build_indexes().

# All record dicts from the data payload, addressed elsewhere by list position.
records: list[dict] = []
# doc_id -> record dict.
record_map: dict[str, dict] = {}
# The "sources" list from the data payload, returned as-is by /api/sources.
sources: list[dict] = []
# Source slug -> the "count" value carried by that source entry (0 when absent).
source_counts: dict[str, int] = {}
# Contents of FOLDER_TREE_PATH; looked up by source slug.
folder_tree_data: dict[str, list[dict]] = {}
# Contents of FOLDER_BROWSER_PATH; looked up by source slug, then by folder path.
folder_browser_data: dict[str, dict[str, dict]] = {}
# rec["source"] -> positions in `records` of that source's records.
source_records_map: dict[str, list[int]] = {}
# rec["extension"] -> number of records with that extension.
extension_counts: dict[str, int] = {}
# Token length -> {token: frequency}; lets fuzzy search scan only tokens of similar length.
vocab_by_len: dict[int, dict[str, int]] = {}
# Token -> positions in `records` whose name/path text produced that token.
word_index: dict[str, set[int]] = {}
# Token -> number of records whose name/path text produced that token.
did_you_mean_vocab: dict[str, int] = {}
|
|
|
|
# Runs of ASCII letters/digits, or runs of CJK ideographs (U+4E00-U+9FFF and U+3400-U+4DBF).
# Compiled once so indexing does not look the pattern up again for every record.
_TOKEN_PATTERN = re.compile(r"[a-z0-9]+|[\u4e00-\u9fff\u3400-\u4dbf]+")


def tokenize(text: str) -> list[str]:
    """Split *text* into unique, lower-cased search tokens.

    Args:
        text: Arbitrary text; ``None`` or an empty string yields no tokens.

    Returns:
        Each distinct token once, in order of first appearance. The order is
        deterministic (it does not depend on string hashing), so callers that
        break ties by iteration order behave the same on every run.
    """
    text_lower = (text or "").lower()
    # dict.fromkeys removes duplicates while keeping first-occurrence order.
    return list(dict.fromkeys(_TOKEN_PATTERN.findall(text_lower)))
|
|
|
|
def build_first_match_snippet(text: str, query: str, window: int = 110) -> str:
    """Return a whitespace-collapsed excerpt of *text* around the earliest query hit.

    Args:
        text: Document text to excerpt.
        query: Raw user query; it is tokenized, and used verbatim (lower-cased)
            when tokenizing yields nothing.
        window: Number of characters kept on each side of the hit.

    Returns:
        ``""`` when either input is empty. When no token occurs in the text, the
        first ``window * 2`` characters of the collapsed text. Otherwise the
        excerpt, with ``"..."`` on whichever side was cut off.
    """
    needle = (query or "").strip()
    if not (text and needle):
        return ""

    candidates = [tok for tok in tokenize(needle) if tok] or [needle.lower()]
    haystack = text.lower()

    # Collect (position, token) for every token that occurs in the text.
    hits = []
    for tok in candidates:
        pos = haystack.find(tok.lower())
        if pos != -1:
            hits.append((pos, tok))

    if not hits:
        return re.sub(r"\s+", " ", text).strip()[: window * 2]

    # min() returns the first of several equal positions, i.e. the earliest candidate wins ties.
    hit_pos, hit_tok = min(hits, key=lambda hit: hit[0])
    lo = max(0, hit_pos - window)
    hi = min(len(text), hit_pos + max(len(hit_tok), 1) + window)
    excerpt = re.sub(r"\s+", " ", text[lo:hi]).strip()

    leading = "..." if lo > 0 else ""
    trailing = "..." if hi < len(text) else ""
    return leading + excerpt + trailing
|
|
|
|
def edit_distance(s1: str, s2: str, max_dist: int = 2) -> int:
    """Compute the Levenshtein distance between *s1* and *s2*, giving up early.

    Args:
        s1: First string.
        s2: Second string.
        max_dist: Threshold beyond which the exact distance is not needed.

    Returns:
        ``999`` when the lengths differ by more than *max_dist*, or when a whole
        row of the table already exceeds *max_dist*; otherwise the final table
        cell, which is the edit distance.
    """
    too_far = 999
    if abs(len(s1) - len(s2)) > max_dist:
        return too_far

    previous_row = list(range(len(s2) + 1))
    for row_no, ch_a in enumerate(s1, start=1):
        current_row = [row_no]
        for col, ch_b in enumerate(s2):
            deletion = previous_row[col + 1] + 1
            insertion = current_row[col] + 1
            substitution = previous_row[col] + (0 if ch_a == ch_b else 1)
            current_row.append(min(deletion, insertion, substitution))
        # Once every cell in a row is over the threshold, the result must be too.
        if min(current_row) > max_dist:
            return too_far
        previous_row = current_row
    return previous_row[-1]
|
|
|
|
def load_json_gz(path: Path):
    """Read the gzip-compressed, UTF-8 encoded JSON file at *path* and return the parsed value."""
    compressed = path.read_bytes()
    document = gzip.decompress(compressed).decode("utf-8")
    return json.loads(document)
|
|
|
|
def build_indexes() -> None:
    """Rebuild every derived lookup structure from the module-level ``records`` list.

    Resets and refills ``word_index``, ``did_you_mean_vocab``, ``vocab_by_len``,
    ``source_records_map`` and ``extension_counts``. It also stores three
    lower-cased helper strings on each record: ``_file_search_text``,
    ``_path_search_text`` and ``_search_text``.
    """
    global word_index, did_you_mean_vocab, vocab_by_len, source_records_map, extension_counts
    word_index, did_you_mean_vocab, vocab_by_len = {}, {}, {}
    source_records_map, extension_counts = {}, {}

    for position, record in enumerate(records):
        source_records_map.setdefault(record["source"], []).append(position)

        ext = record["extension"]
        extension_counts[ext] = extension_counts.get(ext, 0) + 1

        name_blob = " ".join((record["display_name"], record["source_name"]))
        path_blob = record["display_rel_path"]
        combined = f"{name_blob} {path_blob}"
        terms = tokenize(combined)

        record["_file_search_text"] = name_blob.lower()
        record["_path_search_text"] = path_blob.lower()
        record["_search_text"] = combined.lower()

        # tokenize() yields each token once per record, so the count below is a document frequency.
        for term in terms:
            word_index.setdefault(term, set()).add(position)
            did_you_mean_vocab[term] = did_you_mean_vocab.get(term, 0) + 1

    # Bucket the vocabulary by token length so fuzzy lookup can skip tokens of very different length.
    for term, frequency in did_you_mean_vocab.items():
        vocab_by_len.setdefault(len(term), {})[term] = frequency
|
|
|
|
def load_data() -> None:
    """Load the search payload and folder data from disk, then rebuild the indexes.

    Populates the module-level ``records``, ``sources``, ``source_counts``,
    ``record_map``, ``folder_tree_data`` and ``folder_browser_data``, calls
    ``build_indexes()``, and prints how long the whole load took.
    """
    global records, record_map, sources, source_counts, folder_tree_data, folder_browser_data
    started_at = time.time()

    payload = load_json_gz(DATA_PATH)
    records = payload.get("records", [])
    sources = payload.get("sources", [])

    counts_by_slug = {}
    for entry in sources:
        counts_by_slug[entry["slug"]] = entry.get("count", 0)
    source_counts = counts_by_slug

    by_doc_id = {}
    for record in records:
        by_doc_id[record["doc_id"]] = record
    record_map = by_doc_id

    folder_tree_data = load_json_gz(FOLDER_TREE_PATH)
    folder_browser_data = load_json_gz(FOLDER_BROWSER_PATH)
    build_indexes()

    elapsed = time.time() - started_at
    print(f"loaded {len(records)} txt records in {elapsed:.2f}s")
|
|
|
|
def score_record(idx: int, query_tokens: list[str], search_paths: bool = True) -> int:
    """Score the record at position *idx* against the query tokens.

    Each token found as a substring of the record's name text adds 3. When
    *search_paths* is true, each token found in its path text adds a further 2.
    """
    record = records[idx]
    name_text = record["_file_search_text"]
    path_text = record["_path_search_text"]

    name_hits = sum(1 for token in query_tokens if token in name_text)
    path_hits = sum(1 for token in query_tokens if token in path_text) if search_paths else 0
    return 3 * name_hits + 2 * path_hits
|
|
|
|
def apply_filters(indices: list[int], sources_filter=None, folders=None, min_size=None, max_size=None):
    """Return the members of *indices* whose records pass every active filter.

    Args:
        indices: Positions in the module-level ``records`` list, in the order to keep.
        sources_filter: When truthy, the record's ``source`` must be one of these.
        folders: When truthy, the record's directory (its ``display_dirs`` joined
            with ``/``) must equal one of these or lie beneath it.
        min_size: When not ``None``, records smaller than this are dropped.
        max_size: When not ``None``, records larger than this are dropped.

    Returns:
        A new list preserving the input order. A missing or falsy ``size`` counts as 0.
    """

    def passes(position):
        record = records[position]
        if sources_filter and record["source"] not in sources_filter:
            return False
        if folders:
            directory = "/".join(record.get("display_dirs", []))
            inside = any(
                directory == folder or directory.startswith(folder + "/")
                for folder in folders
            )
            if not inside:
                return False
        size = record.get("size") or 0
        if min_size is not None and size < min_size:
            return False
        if max_size is not None and size > max_size:
            return False
        return True

    return [position for position in indices if passes(position)]
|
|
|
|
def trim_record(rec: dict) -> dict:
    """Project a stored record onto the fields exposed in API responses.

    Internal keys are renamed to the response's capitalised names, and
    ``HasTxt`` is always ``True``. Raises ``KeyError`` if a source key is missing.
    """
    # (response key, record key) pairs, in response order.
    field_map = (
        ("doc_id", "doc_id"),
        ("Source", "source"),
        ("SourceName", "source_name"),
        ("File", "display_name"),
        ("Extension", "extension"),
        ("Folder", "display_dirs"),
        ("DisplayPath", "display_rel_path"),
        ("Size", "size"),
    )
    trimmed = {public_key: rec[record_key] for public_key, record_key in field_map}
    trimmed["HasTxt"] = True
    return trimmed
|
|
|
|
def _postings_for(word: str, search_paths: bool):
    """Return the record positions indexed under *word*, honouring the path toggle."""
    postings = word_index.get(word, ())
    if search_paths:
        return postings
    # The index is built from name and path text together, so when paths are
    # excluded each hit must be re-checked against the name text alone.
    return [idx for idx in postings if word in records[idx]["_file_search_text"]]


def _expand_fuzzy(token: str, matched: set, search_paths: bool, cap: int = 5000) -> None:
    """Add to *matched* the records of vocabulary words within edit distance 2 of *token*."""
    token_len = len(token)
    for delta in (-2, -1, 0, 1, 2):
        for vocab_word in vocab_by_len.get(token_len + delta, {}):
            # Stop the whole expansion once the candidate set is large. A plain
            # `break` here would leave only the inner loop and keep scanning the
            # remaining length buckets.
            if len(matched) > cap:
                return
            if edit_distance(token, vocab_word) <= 2:
                matched.update(_postings_for(vocab_word, search_paths))


def search(q="", sources_filter=None, folders=None, min_size=None, max_size=None, page=1, page_size=100, sort="relevance", exact=False, search_paths=True):
    """Search the loaded records and return one page of trimmed results.

    Args:
        q: Query string. Empty (after stripping) means "every record".
        sources_filter: Optional source slugs to keep.
        folders: Optional folder paths to keep (the folder itself or anything below it).
        min_size: Optional inclusive lower bound on record size.
        max_size: Optional inclusive upper bound on record size.
        page: 1-based page number; values below 1 are treated as 1.
        page_size: Results per page; negative values are treated as 0.
        sort: ``"relevance"`` (only applied when a query is given), ``"name"`` or
            ``"size"``. Any other value leaves the candidate order untouched.
        exact: When true, match the whole lower-cased query as a substring
            instead of matching individual tokens.
        search_paths: When false, only the name text is matched and scored.

    Returns:
        A dict with ``results`` (trimmed records for the page), ``total`` (match
        count before paging), the effective ``page`` and ``page_size``, and
        ``did_you_mean`` (always ``None``).
    """
    q = (q or "").strip()
    # Out-of-range paging values would otherwise turn into negative slice
    # bounds, e.g. page_size=-5 returned everything except the last five matches.
    page = max(1, page)
    page_size = max(0, page_size)

    tokens = tokenize(q) if q else []
    if not q:
        candidates = list(range(len(records)))
    elif exact:
        needle = q.lower()
        field = "_search_text" if search_paths else "_file_search_text"
        candidates = [idx for idx, rec in enumerate(records) if needle in rec[field]]
    else:
        matched = set()
        for token in tokens:
            if token in word_index:
                matched.update(_postings_for(token, search_paths))
            else:
                # Unknown token: fall back to vocabulary words that are close to it.
                _expand_fuzzy(token, matched, search_paths)
        candidates = list(matched)

    filtered = apply_filters(candidates, sources_filter, folders, min_size, max_size)

    def path_key(idx):
        return records[idx]["display_rel_path"].lower()

    if sort == "relevance":
        # Without a query there is nothing to score, so record order is kept.
        if q:
            filtered.sort(key=lambda idx: (-score_record(idx, tokens, search_paths), path_key(idx)))
    elif sort == "name":
        filtered.sort(key=path_key)
    elif sort == "size":
        # `or 0` mirrors apply_filters: a missing or None size sorts as 0 instead
        # of raising TypeError on negation.
        filtered.sort(key=lambda idx: (-(records[idx].get("size") or 0), path_key(idx)))

    total = len(filtered)
    start = (page - 1) * page_size
    result_items = [trim_record(records[idx]) for idx in filtered[start:start + page_size]]
    return {"results": result_items, "total": total, "page": page, "page_size": page_size, "did_you_mean": None}
|
|
|
|
def get_doc_storage_path(doc: dict) -> Path:
    """Return the on-disk path of a record's text file.

    The path is ``BASE_DIR`` joined with the record's ``storage_root`` and
    ``storage_rel_path`` values.
    """
    storage_root = doc["storage_root"]
    relative_path = doc["storage_rel_path"]
    return BASE_DIR.joinpath(storage_root, relative_path)
|
|
|
|
def get_folder_contents(source_slug: str, path: str) -> dict:
    """Return the folder-browser entry for *path* within a source.

    The stored entry is copied and tagged with ``current_path``. An unknown
    source or path yields an empty listing instead of an error.
    """
    per_source = folder_browser_data.get(source_slug, {})
    if path not in per_source:
        return {"folders": [], "files": [], "current_path": path}

    # Copy so the cached entry itself is never mutated.
    listing = dict(per_source[path])
    listing["current_path"] = path
    return listing
|
|
|
|
@asynccontextmanager
async def lifespan(app: FastAPI):
    """Application lifespan hook: load all search data before requests are served."""
    # Code before the yield runs at startup; nothing follows it, so there is no shutdown work.
    load_data()
    yield
|
|
|
|
# The ASGI application; `lifespan` loads the data set when the app starts.
app = FastAPI(title="VOMEBOOK Search", version="1.0", lifespan=lifespan)
# Compress responses of at least 500 bytes.
app.add_middleware(GZipMiddleware, minimum_size=500)
# CORS is fully open: any origin, method and header is allowed.
app.add_middleware(CORSMiddleware, allow_origins=["*"], allow_methods=["*"], allow_headers=["*"])
|
|
|
|
class SearchRequest(BaseModel):
    """JSON body accepted by the search endpoints; the fields are forwarded to search()."""

    # Query string; an empty query matches every record.
    q: str = ""
    # Source slugs to keep. /api/search/{source_slug} ignores this and uses the slug from the URL.
    sources: Optional[list[str]] = None
    # Folder paths to keep.
    folders: Optional[list[str]] = None
    # Size bounds applied to each record's "size" value.
    min_size: Optional[int] = None
    max_size: Optional[int] = None
    # Page number; search() computes the slice start as (page - 1) * page_size.
    page: int = 1
    page_size: int = 100
    # search() acts on "relevance", "name" and "size".
    sort: str = "relevance"
    # True: match the whole query as a substring instead of per token.
    exact: bool = False
    # False: match and score against the name text only, not the path.
    search_paths: bool = True
|
|
|
|
class ZipRequest(BaseModel):
    """JSON body for /api/zip: the documents to bundle into one archive."""

    # doc_id values as found in record_map; api_zip skips ids that do not resolve.
    doc_ids: list[str] = []
|
|
|
|
@app.post("/api/search")
def api_search(body: SearchRequest):
    """Run a search across all sources using the parameters in the request body."""
    payload = search(
        q=body.q,
        sources_filter=body.sources,
        folders=body.folders,
        min_size=body.min_size,
        max_size=body.max_size,
        page=body.page,
        page_size=body.page_size,
        sort=body.sort,
        exact=body.exact,
        search_paths=body.search_paths,
    )
    return JSONResponse(payload)
|
|
|
|
@app.post("/api/search/{source_slug}")
def api_search_source(source_slug: str, body: SearchRequest):
    """Run a search restricted to one source; unknown slugs get a 404 with an empty result set."""
    if source_slug not in source_counts:
        not_found = {"error": "source not found", "results": [], "total": 0}
        return JSONResponse(not_found, status_code=404)

    # The slug from the URL replaces whatever `sources` the body carried.
    payload = search(
        q=body.q,
        sources_filter=[source_slug],
        folders=body.folders,
        min_size=body.min_size,
        max_size=body.max_size,
        page=body.page,
        page_size=body.page_size,
        sort=body.sort,
        exact=body.exact,
        search_paths=body.search_paths,
    )
    return JSONResponse(payload)
|
|
|
|
@app.get("/api/sources")
def api_sources():
    """Return the source list exactly as it was loaded from the data payload."""
    return JSONResponse(sources)
|
|
|
|
@app.get("/api/extensions")
def api_extensions(source: Optional[str] = Query(default=None)):
    """Return extension counts, sorted by extension, as ``{"name", "count"}`` objects.

    Args:
        source: Optional source slug. When given, only that source's records are
            counted, and an unknown slug yields an empty list. When omitted, the
            counts cover every record.
    """
    if source:
        counts = {}
        for idx in source_records_map.get(source, []):
            ext = records[idx]["extension"]
            counts[ext] = counts.get(ext, 0) + 1
    else:
        # build_indexes() already keeps the corpus-wide histogram in extension_counts;
        # reusing it avoids rescanning every record on each request.
        counts = extension_counts
    return JSONResponse([{"name": ext, "count": count} for ext, count in sorted(counts.items())])
|
|
|
|
@app.get("/api/folders/{source_slug}")
def api_folders(source_slug: str):
    """Return the folder tree stored for a source, or an empty list for an unknown slug."""
    return JSONResponse(folder_tree_data.get(source_slug, []))
|
|
|
|
@app.get("/api/folders/{source_slug}/contents")
def api_folder_contents(source_slug: str, path: str = Query(default="")):
    """Return the folder-browser listing for `path` within a source (default path is "")."""
    return JSONResponse(get_folder_contents(source_slug, path))
|
|
|
|
@app.get("/api/random")
def api_random(source: Optional[str] = Query(default=None)):
    """Return one randomly chosen record, optionally limited to a single source.

    Responds with 404 when there is nothing to choose from.
    """
    if source:
        pool = [records[position] for position in source_records_map.get(source, [])]
    else:
        pool = records

    if pool:
        return JSONResponse(trim_record(random.choice(pool)))
    return JSONResponse({"error": "no record"}, status_code=404)
|
|
|
|
@app.get("/api/preview/{doc_id}")
def api_preview(doc_id: str):
    """Return the full text of one document together with its display metadata.

    Responds with 404 when the id is unknown or the backing file is missing.
    """
    record = record_map.get(doc_id)
    if not record:
        return JSONResponse({"error": "not found"}, status_code=404)

    stored_file = get_doc_storage_path(record)
    if not stored_file.exists():
        return JSONResponse({"error": f"missing file: {stored_file.name}"}, status_code=404)

    # Bytes that are not valid UTF-8 are dropped instead of failing the request.
    contents = stored_file.read_text(encoding="utf-8", errors="ignore")
    body = {
        "doc_id": doc_id,
        "title": record["display_name"],
        "path": record["display_rel_path"],
        "source": record["source_name"],
        "text": contents,
    }
    return JSONResponse(body)
|
|
|
|
@app.get("/api/snippet/{doc_id}")
def api_snippet(doc_id: str, q: str = Query(default="")):
    """Return a short excerpt of a document around the first match of `q`.

    Responds with 404 when the id is unknown or the backing file is missing.
    """
    record = record_map.get(doc_id)
    if not record:
        return JSONResponse({"error": "not found"}, status_code=404)

    stored_file = get_doc_storage_path(record)
    if not stored_file.exists():
        return JSONResponse({"error": f"missing file: {stored_file.name}"}, status_code=404)

    # Bytes that are not valid UTF-8 are dropped instead of failing the request.
    contents = stored_file.read_text(encoding="utf-8", errors="ignore")
    excerpt = build_first_match_snippet(contents, q)
    return JSONResponse({"doc_id": doc_id, "snippet": excerpt})
|
|
|
|
@app.get("/api/download/{doc_id}")
def api_download(doc_id: str):
    """Send one document as a ``.txt`` attachment named after its display name.

    Responds with 404 when the id is unknown or the backing file is missing.
    """
    record = record_map.get(doc_id)
    if not record:
        return JSONResponse({"error": "not found"}, status_code=404)

    stored_file = get_doc_storage_path(record)
    if not stored_file.exists():
        return JSONResponse({"error": f"missing file: {stored_file.name}"}, status_code=404)

    # Percent-encode the whole name so non-ASCII titles fit the filename* header form.
    title = record["display_name"]
    attachment_name = quote(f"{title}.txt", safe="")
    disposition = f"attachment; filename*=UTF-8''{attachment_name}"

    payload = stored_file.read_bytes()
    return StreamingResponse(
        iter([payload]),
        media_type="text/plain; charset=utf-8",
        headers={"Content-Disposition": disposition},
    )
|
|
|
|
@app.post("/api/zip")
def api_zip(req: ZipRequest):
    """Bundle the requested documents into one zip archive and stream it back.

    At most 500 distinct doc_ids are processed. Ids that are unknown or whose
    file is missing are skipped silently.

    Returns:
        A streamed ``application/zip`` attachment named ``vomebook_batch.zip``;
        a 400 JSON error when no requested document could be resolved; or a
        500 JSON error when reading a file or building the archive fails.
    """
    import io
    import zipfile

    # Drop repeated ids (keeping request order) so the same file is not added
    # to the archive twice under an identical member name.
    unique_ids = list(dict.fromkeys(req.doc_ids))[:500]

    members = []
    for doc_id in unique_ids:
        rec = record_map.get(doc_id)
        if not rec:
            continue
        file_path = get_doc_storage_path(rec)
        if not file_path.exists():
            continue
        # Keep only the path here; contents are read one at a time while
        # zipping, so they are not all held in memory next to the archive.
        members.append((rec["display_rel_path"], file_path))
    if not members:
        return JSONResponse({"error": "no files"}, status_code=400)

    buf = io.BytesIO()
    try:
        with zipfile.ZipFile(buf, "w", zipfile.ZIP_DEFLATED) as zf:
            for display_path, file_path in members:
                zf.writestr(display_path, file_path.read_bytes())
    except Exception as exc:
        # Endpoint boundary: report the failure as JSON rather than an unhandled error.
        print(f"zip build failed: {exc}")
        return JSONResponse({"error": f"zip failed: {exc}"}, status_code=500)
    buf.seek(0)

    return StreamingResponse(
        # Yield 64 KiB chunks until read() returns an empty bytes object.
        iter(lambda: buf.read(65536), b""),
        media_type="application/zip",
        headers={"Content-Disposition": "attachment; filename=vomebook_batch.zip"},
    )
|
|
|
|
@app.get("/api/fulltext-manifest")
def api_fulltext_manifest():
    """Return the full-text manifest; the file is read and decompressed on every request."""
    return JSONResponse(load_json_gz(FULLTEXT_MANIFEST_PATH))
|
|
|
|
# Static file mounts, all rooted under BASE_DIR.
# NOTE(review): /data serves the whole data directory, including the .json.gz files this module loads — confirm that is intended.
app.mount("/data", StaticFiles(directory=str(BASE_DIR / "data")), name="data")
# html=True enables StaticFiles' HTML mode for the front-end assets.
app.mount("/static", StaticFiles(directory=str(BASE_DIR / "static"), html=True), name="static")
# Serves static/icons at the top-level /icons path as well.
app.mount("/icons", StaticFiles(directory=str(BASE_DIR / "static/icons")), name="icons")
|
|
|
|
@app.get("/manifest.json")
def serve_manifest():
    """Serve static/manifest.json at the site root; the file is re-read and parsed on each request."""
    return JSONResponse(json.loads((BASE_DIR / "static/manifest.json").read_text(encoding="utf-8")))
|
|
|
|
@app.get("/sw.js")
def serve_sw():
    """Serve static/sw.js at the site root with a JavaScript media type."""
    return PlainTextResponse((BASE_DIR / "static/sw.js").read_text(encoding="utf-8"), media_type="application/javascript")
|
|
|
|
@app.get("/{rest_of_path:path}")
async def serve_spa(rest_of_path: str):
    """Catch-all GET route: return static/index.html whatever the requested path is."""
    # NOTE(review): presumably a fallback for client-side routes — confirm it stays registered after every other route.
    # NOTE(review): this coroutine reads the file synchronously on each request.
    return HTMLResponse((BASE_DIR / "static/index.html").read_text(encoding="utf-8"))
|
|