Search / app.py
vomebook's picture
Upload app.py
2b32469 verified
Raw
History Blame Contribute Delete
15.4 kB
"""VOMEBOOK Search Space backend for txt bundles."""
from __future__ import annotations
import gzip
import json
import posixpath
import random
import re
import time
from contextlib import asynccontextmanager
from datetime import datetime
from pathlib import Path
from typing import Optional
from urllib.parse import quote
from fastapi import FastAPI, Query
from fastapi import HTTPException
from fastapi.middleware.cors import CORSMiddleware
from fastapi.middleware.gzip import GZipMiddleware
from fastapi.responses import HTMLResponse, JSONResponse, PlainTextResponse, StreamingResponse
from fastapi.staticfiles import StaticFiles
from pydantic import BaseModel
# Locations of the gzip-compressed JSON bundles, resolved relative to this file.
BASE_DIR = Path(__file__).resolve().parent
DATA_PATH = BASE_DIR / "data/search_data.json.gz"
FOLDER_TREE_PATH = BASE_DIR / "data/folder_tree.json.gz"
FOLDER_BROWSER_PATH = BASE_DIR / "data/folder_browser.json.gz"
FULLTEXT_MANIFEST_PATH = BASE_DIR / "data/fulltext_manifest.json.gz"
# In-memory state. These start empty and are rebound by load_data() and
# build_indexes().
records: list[dict] = []  # the "records" list of the search payload
record_map: dict[str, dict] = {}  # doc_id -> record
sources: list[dict] = []  # the "sources" list of the search payload
source_counts: dict[str, int] = {}  # source slug -> its "count" value (0 if absent)
folder_tree_data: dict[str, list[dict]] = {}  # looked up by source slug
folder_browser_data: dict[str, dict[str, dict]] = {}  # looked up as [source slug][path]
source_records_map: dict[str, list[int]] = {}  # record "source" -> positions in `records`
extension_counts: dict[str, int] = {}  # record "extension" -> number of records
vocab_by_len: dict[int, dict[str, int]] = {}  # token length -> {token: record count}
word_index: dict[str, set[int]] = {}  # token -> positions in `records` containing it
did_you_mean_vocab: dict[str, int] = {}  # token -> number of records containing it
def tokenize(text: str) -> list[str]:
    """Split *text* into unique, lowercase search tokens.

    A token is either a run of ASCII letters/digits or a run of CJK
    ideographs (U+4E00-U+9FFF and U+3400-U+4DBF). Everything else acts as
    a separator.

    Args:
        text: Text to tokenize. ``None`` or an empty string yields ``[]``.

    Returns:
        Each distinct token once, in order of first appearance.
    """
    text_lower = (text or "").lower()
    found = re.findall(r"[a-z0-9]+|[\u4e00-\u9fff\u3400-\u4dbf]+", text_lower)
    # dict.fromkeys de-duplicates while keeping first-seen order. A set would
    # also de-duplicate, but its iteration order changes between interpreter
    # runs, which makes anything that walks the tokens non-deterministic.
    return list(dict.fromkeys(found))
def build_first_match_snippet(text: str, query: str, window: int = 110) -> str:
    """Return a whitespace-condensed excerpt of *text* around the earliest query hit.

    The query is tokenized; if that yields nothing, the lowercased query is
    used as the only search term. The term that occurs earliest in the
    lowercased text anchors the excerpt, which extends *window* characters
    to either side of it.

    Args:
        text: Document text to excerpt.
        query: User query.
        window: Number of context characters kept on each side of the hit.

    Returns:
        ``""`` when *text* or the stripped query is empty. When no term
        occurs in the text, the first ``window * 2`` characters of the
        condensed text. Otherwise the excerpt, with ``"..."`` on each side
        that was cut off.
    """
    needle = (query or "").strip()
    if not (text and needle):
        return ""

    terms = [term for term in tokenize(needle) if term] or [needle.lower()]
    haystack = text.lower()

    hit_pos = None
    hit_term = ""
    for term in terms:
        pos = haystack.find(term.lower())
        if pos == -1:
            continue
        # Strict "<" keeps the first term seen when several start at the same offset.
        if hit_pos is None or pos < hit_pos:
            hit_pos, hit_term = pos, term

    if hit_pos is None:
        return re.sub(r"\s+", " ", text).strip()[: window * 2]

    left = max(0, hit_pos - window)
    right = min(len(text), hit_pos + max(len(hit_term), 1) + window)
    excerpt = re.sub(r"\s+", " ", text[left:right]).strip()
    prefix = "..." if left > 0 else ""
    suffix = "..." if right < len(text) else ""
    return f"{prefix}{excerpt}{suffix}"
def edit_distance(s1: str, s2: str, max_dist: int = 2) -> int:
    """Compute the Levenshtein distance between *s1* and *s2*, with an early bail-out.

    Args:
        s1: First string.
        s2: Second string.
        max_dist: Threshold used for the early exits.

    Returns:
        ``999`` if the lengths differ by more than *max_dist*, or if every
        cell of some row of the table already exceeds *max_dist*. Otherwise
        the value in the last cell of the table.
    """
    if abs(len(s1) - len(s2)) > max_dist:
        return 999

    previous_row = list(range(len(s2) + 1))
    for row_no, ch1 in enumerate(s1, start=1):
        current_row = [row_no]
        for col, ch2 in enumerate(s2):
            deletion = previous_row[col + 1] + 1
            insertion = current_row[col] + 1
            substitution = previous_row[col] + (0 if ch1 == ch2 else 1)
            current_row.append(min(deletion, insertion, substitution))
        # Once a whole row is above the threshold, no later row can come back under it.
        if min(current_row) > max_dist:
            return 999
        previous_row = current_row
    return previous_row[-1]
def load_json_gz(path: Path):
    """Read the gzip-compressed, UTF-8 encoded JSON file at *path* and return the parsed value."""
    compressed = path.read_bytes()
    json_text = gzip.decompress(compressed).decode("utf-8")
    return json.loads(json_text)
def build_indexes() -> None:
    """Rebuild every derived lookup structure from the module-level ``records`` list.

    Resets and repopulates ``word_index``, ``did_you_mean_vocab``,
    ``vocab_by_len``, ``source_records_map`` and ``extension_counts``. It
    also stores three lowercase search strings on each record:
    ``_file_search_text`` (display name plus source name),
    ``_path_search_text`` (display path) and ``_search_text`` (both).
    """
    global word_index, did_you_mean_vocab, vocab_by_len, source_records_map, extension_counts
    word_index, did_you_mean_vocab, vocab_by_len = {}, {}, {}
    source_records_map, extension_counts = {}, {}

    for position, record in enumerate(records):
        source_records_map.setdefault(record["source"], []).append(position)
        extension = record["extension"]
        extension_counts[extension] = extension_counts.get(extension, 0) + 1

        name_text = " ".join((record["display_name"], record["source_name"]))
        path_text = record["display_rel_path"]
        combined = f"{name_text} {path_text}"
        record_tokens = tokenize(combined)

        record["_file_search_text"] = name_text.lower()
        record["_path_search_text"] = path_text.lower()
        record["_search_text"] = combined.lower()

        # tokenize() returns each token once per record, so the vocabulary
        # counter ends up counting records, not occurrences.
        for token in record_tokens:
            word_index.setdefault(token, set()).add(position)
            did_you_mean_vocab[token] = did_you_mean_vocab.get(token, 0) + 1

    # Bucket the vocabulary by token length.
    for token, record_count in did_you_mean_vocab.items():
        bucket = vocab_by_len.setdefault(len(token), {})
        bucket[token] = record_count
def load_data() -> None:
    """Load the search, folder-tree and folder-browser bundles, then rebuild the indexes.

    Rebinds the module-level ``records``, ``record_map``, ``sources``,
    ``source_counts``, ``folder_tree_data`` and ``folder_browser_data``,
    calls ``build_indexes()`` and prints how long the load took.
    """
    global records, record_map, sources, source_counts, folder_tree_data, folder_browser_data
    started_at = time.time()

    bundle = load_json_gz(DATA_PATH)
    records = bundle.get("records", [])
    sources = bundle.get("sources", [])
    source_counts = {entry["slug"]: entry.get("count", 0) for entry in sources}
    record_map = {record["doc_id"]: record for record in records}

    folder_tree_data = load_json_gz(FOLDER_TREE_PATH)
    folder_browser_data = load_json_gz(FOLDER_BROWSER_PATH)

    build_indexes()
    elapsed = time.time() - started_at
    print(f"loaded {len(records)} txt records in {elapsed:.2f}s")
def score_record(idx: int, query_tokens: list[str], search_paths: bool = True) -> int:
    """Score the record at position *idx* against the query tokens.

    Each token found as a substring of the record's file search text adds 3.
    When *search_paths* is truthy, each token found in the path search text
    adds a further 2.
    """
    record = records[idx]
    name_text = record["_file_search_text"]
    path_text = record["_path_search_text"]

    name_hits = sum(1 for token in query_tokens if token in name_text)
    path_hits = sum(1 for token in query_tokens if token in path_text) if search_paths else 0
    return 3 * name_hits + 2 * path_hits
def apply_filters(indices: list[int], sources_filter=None, folders=None, min_size=None, max_size=None):
    """Return the entries of *indices* whose records pass every active filter.

    Args:
        indices: Positions in the module-level ``records`` list.
        sources_filter: If truthy, the record's ``source`` must be in it.
        folders: If truthy, the record's directory (``display_dirs`` joined
            with ``/``) must equal one of these folders or lie beneath one.
        min_size: If not ``None``, records smaller than this are dropped.
        max_size: If not ``None``, records larger than this are dropped.

    Returns:
        The surviving indices, in their original order. A missing or falsy
        ``size`` is compared as 0.
    """

    def passes(record: dict) -> bool:
        if sources_filter and record["source"] not in sources_filter:
            return False
        if folders:
            directory = "/".join(record.get("display_dirs", []))
            in_folder = any(
                directory == folder or directory.startswith(folder + "/")
                for folder in folders
            )
            if not in_folder:
                return False
        size = record.get("size") or 0
        too_small = min_size is not None and size < min_size
        if too_small:
            return False
        too_large = max_size is not None and size > max_size
        return not too_large

    return [idx for idx in indices if passes(records[idx])]
def trim_record(rec: dict) -> dict:
    """Project a full record onto the public fields returned to API clients.

    Internal keys are left out and ``HasTxt`` is always ``True``.
    """
    # (output key, record key) pairs, in the order they appear in the response.
    field_map = (
        ("doc_id", "doc_id"),
        ("Source", "source"),
        ("SourceName", "source_name"),
        ("File", "display_name"),
        ("Extension", "extension"),
        ("Folder", "display_dirs"),
        ("DisplayPath", "display_rel_path"),
        ("Size", "size"),
    )
    trimmed = {out_key: rec[rec_key] for out_key, rec_key in field_map}
    trimmed["HasTxt"] = True
    return trimmed
def search(q="", sources_filter=None, folders=None, min_size=None, max_size=None, page=1, page_size=100, sort="relevance", exact=False, search_paths=True):
    """Search the loaded records and return one page of trimmed results.

    Args:
        q: Query string. Empty or whitespace-only means "all records".
        sources_filter: Optional collection of source slugs to keep.
        folders: Optional collection of folder paths to keep.
        min_size: Optional lower size bound.
        max_size: Optional upper size bound.
        page: 1-based page number. Values below 1 are treated as 1.
        page_size: Results per page. Negative values are treated as 0.
        sort: ``"relevance"``, ``"name"`` or ``"size"``. Any other value
            leaves the order as produced by matching and filtering.
        exact: If true, match the whole lowercased query as a substring
            instead of matching token by token.
        search_paths: If false, only the file search text is matched and
            scored; otherwise the path text takes part as well.

    Returns:
        A dict with ``results``, ``total`` (matches before paging),
        ``page``, ``page_size`` and ``did_you_mean`` (always ``None``).
    """
    q = (q or "").strip()
    # A page below 1 or a negative page size would turn into negative slice
    # bounds and return rows counted from the END of the result list.
    page = max(1, page)
    page_size = max(0, page_size)

    if not q:
        filtered = apply_filters(list(range(len(records))), sources_filter, folders, min_size, max_size)
    else:
        tokens = tokenize(q)
        if exact:
            needle = q.lower()
            text_key = "_search_text" if search_paths else "_file_search_text"
            indices = [idx for idx, rec in enumerate(records) if needle in rec[text_key]]
        else:
            matched: set[int] = set()
            for token in tokens:
                if token in word_index:
                    hits = word_index[token]
                    if not search_paths:
                        # The index covers name and path text; keep name hits only.
                        hits = (idx for idx in hits if token in records[idx]["_file_search_text"])
                    matched.update(hits)
                    continue
                # Unknown token: fall back to vocabulary words within edit
                # distance 2, which can only differ in length by at most 2.
                tok_len = len(token)
                for delta in (-2, -1, 0, 1, 2):
                    for vocab_word in vocab_by_len.get(tok_len + delta, {}):
                        if edit_distance(token, vocab_word) <= 2:
                            hits = word_index.get(vocab_word, set())
                            if not search_paths:
                                hits = (idx for idx in hits if vocab_word in records[idx]["_file_search_text"])
                            matched.update(hits)
                    # Cap the fuzzy expansion so a vague token cannot pull in
                    # an unbounded number of candidates.
                    if len(matched) > 5000:
                        break
            indices = list(matched)
        filtered = apply_filters(indices, sources_filter, folders, min_size, max_size)
        # Relevance needs query tokens, so it only applies to a non-empty
        # query; an empty query keeps the records in index order.
        if sort == "relevance":
            filtered.sort(key=lambda idx: (-score_record(idx, tokens, search_paths), records[idx]["display_rel_path"].lower()))

    if sort == "name":
        filtered.sort(key=lambda idx: records[idx]["display_rel_path"].lower())
    elif sort == "size":
        # `or 0` mirrors apply_filters: a record whose size is None must not
        # break the sort with a TypeError on unary minus.
        filtered.sort(key=lambda idx: (-(records[idx].get("size") or 0), records[idx]["display_rel_path"].lower()))

    total = len(filtered)
    start = (page - 1) * page_size
    result_items = [trim_record(records[idx]) for idx in filtered[start:start + page_size]]
    return {"results": result_items, "total": total, "page": page, "page_size": page_size, "did_you_mean": None}
def get_doc_storage_path(doc: dict) -> Path:
    """Return the on-disk path of a record's text file.

    The path is ``BASE_DIR`` joined with the record's ``storage_root`` and
    ``storage_rel_path``.
    """
    return BASE_DIR.joinpath(doc["storage_root"], doc["storage_rel_path"])
def get_folder_contents(source_slug: str, path: str) -> dict:
    """Return the folder-browser entry for *path* within a source.

    The stored entry is copied and ``current_path`` is set to *path*. When
    the source or the path is unknown, an entry with empty ``folders`` and
    ``files`` lists is returned.
    """
    per_source = folder_browser_data.get(source_slug, {})
    try:
        stored = per_source[path]
    except KeyError:
        return {"folders": [], "files": [], "current_path": path}
    # Copy before annotating so the shared browser data is not modified.
    listing = dict(stored)
    listing["current_path"] = path
    return listing
@asynccontextmanager
async def lifespan(app: FastAPI):
    """Application lifespan hook: load all data and indexes, then hand control to the app."""
    load_data()
    # Nothing follows the yield, so no work is done on shutdown.
    yield
# Application object. The lifespan hook runs load_data() before requests are served.
app = FastAPI(title="VOMEBOOK Search", version="1.0", lifespan=lifespan)
# Gzip response bodies of at least 500 bytes.
app.add_middleware(GZipMiddleware, minimum_size=500)
# NOTE(review): CORS allows every origin, method and header. Confirm this is
# intended for a publicly reachable API.
app.add_middleware(CORSMiddleware, allow_origins=["*"], allow_methods=["*"], allow_headers=["*"])
class SearchRequest(BaseModel):
    # JSON body accepted by the search endpoints. Each field is passed
    # straight through to search().
    q: str = ""  # query text; empty means "all records"
    sources: Optional[list[str]] = None  # source slugs to keep
    folders: Optional[list[str]] = None  # folder paths to keep
    min_size: Optional[int] = None  # lower size bound
    max_size: Optional[int] = None  # upper size bound
    page: int = 1  # 1-based page number
    page_size: int = 100  # results per page
    sort: str = "relevance"  # search() handles "relevance", "name" and "size"
    exact: bool = False  # match the whole query as a substring
    search_paths: bool = True  # also match against the display path
class ZipRequest(BaseModel):
    # JSON body accepted by the zip endpoint.
    doc_ids: list[str] = []  # ids of the documents to bundle
@app.post("/api/search")
def api_search(body: SearchRequest):
    """Search across all sources using the parameters in the request body."""
    payload = search(
        q=body.q,
        sources_filter=body.sources,
        folders=body.folders,
        min_size=body.min_size,
        max_size=body.max_size,
        page=body.page,
        page_size=body.page_size,
        sort=body.sort,
        exact=body.exact,
        search_paths=body.search_paths,
    )
    return JSONResponse(payload)
@app.post("/api/search/{source_slug}")
def api_search_source(source_slug: str, body: SearchRequest):
    """Search within a single source.

    The slug from the URL replaces the ``sources`` field of the body.
    Responds with 404 when the slug is unknown.
    """
    if source_slug in source_counts:
        payload = search(
            q=body.q,
            sources_filter=[source_slug],
            folders=body.folders,
            min_size=body.min_size,
            max_size=body.max_size,
            page=body.page,
            page_size=body.page_size,
            sort=body.sort,
            exact=body.exact,
            search_paths=body.search_paths,
        )
        return JSONResponse(payload)
    return JSONResponse({"error": "source not found", "results": [], "total": 0}, status_code=404)
@app.get("/api/sources")
def api_sources():
    """Return the list of sources loaded from the search bundle."""
    return JSONResponse(content=sources)
@app.get("/api/extensions")
def api_extensions(source: Optional[str] = Query(default=None)):
    """Return per-extension record counts, sorted by extension name.

    Args:
        source: Optional source key. When given, only records of that
            source are counted; an unknown source yields an empty list.

    Returns:
        A JSON list of ``{"name": extension, "count": n}`` objects.
    """
    if source:
        counts: dict[str, int] = {}
        for idx in source_records_map.get(source, []):
            ext = records[idx]["extension"]
            counts[ext] = counts.get(ext, 0) + 1
    else:
        # build_indexes() already keeps the totals over all records, so
        # there is no need to rescan every record on each request.
        counts = extension_counts
    return JSONResponse([{"name": ext, "count": count} for ext, count in sorted(counts.items())])
@app.get("/api/folders/{source_slug}")
def api_folders(source_slug: str):
    """Return the folder tree stored for a source, or an empty list if there is none."""
    tree = folder_tree_data.get(source_slug, [])
    return JSONResponse(tree)
@app.get("/api/folders/{source_slug}/contents")
def api_folder_contents(source_slug: str, path: str = Query(default="")):
    """Return the folder-browser entry for *path* inside a source (default: the empty path)."""
    listing = get_folder_contents(source_slug, path)
    return JSONResponse(listing)
@app.get("/api/random")
def api_random(source: Optional[str] = Query(default=None)):
    """Return one randomly chosen record, optionally restricted to a source.

    Responds with 404 when there is nothing to choose from.
    """
    if source:
        candidates = [records[idx] for idx in source_records_map.get(source, [])]
    else:
        candidates = records
    if candidates:
        return JSONResponse(trim_record(random.choice(candidates)))
    return JSONResponse({"error": "no record"}, status_code=404)
@app.get("/api/preview/{doc_id}")
def api_preview(doc_id: str):
    """Return the full text of a document together with its title, path and source name.

    Responds with 404 when the id is unknown or the backing file is missing.
    """
    record = record_map.get(doc_id)
    if not record:
        return JSONResponse({"error": "not found"}, status_code=404)

    stored_file = get_doc_storage_path(record)
    if not stored_file.exists():
        return JSONResponse({"error": f"missing file: {stored_file.name}"}, status_code=404)

    # Bytes that are not valid UTF-8 are dropped instead of raising.
    body_text = stored_file.read_text(encoding="utf-8", errors="ignore")
    payload = {
        "doc_id": doc_id,
        "title": record["display_name"],
        "path": record["display_rel_path"],
        "source": record["source_name"],
        "text": body_text,
    }
    return JSONResponse(payload)
@app.get("/api/snippet/{doc_id}")
def api_snippet(doc_id: str, q: str = Query(default="")):
    """Return a short excerpt of a document around the first match for *q*.

    Responds with 404 when the id is unknown or the backing file is missing.
    """
    record = record_map.get(doc_id)
    if not record:
        return JSONResponse({"error": "not found"}, status_code=404)

    stored_file = get_doc_storage_path(record)
    if not stored_file.exists():
        return JSONResponse({"error": f"missing file: {stored_file.name}"}, status_code=404)

    # Bytes that are not valid UTF-8 are dropped instead of raising.
    body_text = stored_file.read_text(encoding="utf-8", errors="ignore")
    excerpt = build_first_match_snippet(body_text, q)
    return JSONResponse({"doc_id": doc_id, "snippet": excerpt})
@app.get("/api/download/{doc_id}")
def api_download(doc_id: str):
    """Send a document's text file as a download named ``<display_name>.txt``.

    Responds with 404 when the id is unknown or the backing file is missing.
    """
    record = record_map.get(doc_id)
    if not record:
        return JSONResponse({"error": "not found"}, status_code=404)

    stored_file = get_doc_storage_path(record)
    if not stored_file.exists():
        return JSONResponse({"error": f"missing file: {stored_file.name}"}, status_code=404)

    # Percent-encode the whole name so it can be sent via filename*.
    download_name = quote(f"{record['display_name']}.txt", safe="")
    disposition = f"attachment; filename*=UTF-8''{download_name}"
    file_bytes = stored_file.read_bytes()
    return StreamingResponse(
        iter([file_bytes]),
        media_type="text/plain; charset=utf-8",
        headers={"Content-Disposition": disposition},
    )
@app.post("/api/zip")
def api_zip(req: ZipRequest):
    """Bundle the requested documents into a zip archive and stream it back.

    Only the first 500 ids of the request are considered. Unknown ids and
    records whose file is missing are skipped. Archive members are named
    after each record's ``display_rel_path``.

    Returns:
        The archive as ``vomebook_batch.zip``; a 400 JSON error when no
        requested document is available; a 500 JSON error when building
        the archive fails.
    """
    import io
    import zipfile

    members = []
    # dict.fromkeys drops repeated ids while keeping request order, so the
    # same document is never written into the archive twice.
    for doc_id in dict.fromkeys(req.doc_ids[:500]):
        rec = record_map.get(doc_id)
        if not rec:
            continue
        file_path = get_doc_storage_path(rec)
        if not file_path.exists():
            continue
        members.append((rec["display_rel_path"], file_path))
    if not members:
        return JSONResponse({"error": "no files"}, status_code=400)

    try:
        buf = io.BytesIO()
        with zipfile.ZipFile(buf, "w", zipfile.ZIP_DEFLATED) as zf:
            for display_path, file_path in members:
                # Read one file at a time, so only the growing archive and
                # the current file are held in memory.
                zf.writestr(display_path, file_path.read_bytes())
        buf.seek(0)
    except Exception as exc:
        # Request boundary: report the failure as JSON, including read errors.
        print(f"zip build failed: {exc}")
        return JSONResponse({"error": f"zip failed: {exc}"}, status_code=500)

    return StreamingResponse(
        # Yield 64 KiB chunks until read() returns the empty bytes sentinel.
        iter(lambda: buf.read(65536), b""),
        media_type="application/zip",
        headers={"Content-Disposition": "attachment; filename=vomebook_batch.zip"},
    )
@app.get("/api/fulltext-manifest")
def api_fulltext_manifest():
    """Return the full-text manifest, read from disk on every request."""
    manifest = load_json_gz(FULLTEXT_MANIFEST_PATH)
    return JSONResponse(manifest)
# Static mounts. They are registered before the catch-all SPA route declared
# further down in this file.
# NOTE(review): this serves every file under BASE_DIR/data over HTTP,
# including the .json.gz bundles loaded at startup. Confirm that is intended.
app.mount("/data", StaticFiles(directory=str(BASE_DIR / "data")), name="data")
app.mount("/static", StaticFiles(directory=str(BASE_DIR / "static"), html=True), name="static")
app.mount("/icons", StaticFiles(directory=str(BASE_DIR / "static/icons")), name="icons")
@app.get("/manifest.json")
def serve_manifest():
    """Serve ``static/manifest.json`` as a JSON response."""
    manifest_file = BASE_DIR / "static/manifest.json"
    manifest = json.loads(manifest_file.read_text(encoding="utf-8"))
    return JSONResponse(manifest)
@app.get("/sw.js")
def serve_sw():
    """Serve ``static/sw.js`` from the site root with a JavaScript media type."""
    script_file = BASE_DIR / "static/sw.js"
    script_source = script_file.read_text(encoding="utf-8")
    return PlainTextResponse(script_source, media_type="application/javascript")
@app.get("/{rest_of_path:path}")
async def serve_spa(rest_of_path: str):
    """Catch-all route: answer any path not matched earlier with ``static/index.html``."""
    index_file = BASE_DIR / "static/index.html"
    page_html = index_file.read_text(encoding="utf-8")
    return HTMLResponse(page_html)