import os, json, glob, pickle, logging
from typing import List, Dict, Any
from underthesea import word_tokenize
from rank_bm25 import BM25Okapi
from pathlib import Path
# ---------------------------
# Config & Logging
# ---------------------------
# Configure process-wide logging once at import time; the level can be
# overridden through the LOG_LEVEL environment variable (default INFO).
logging.basicConfig(
    level=os.getenv("LOG_LEVEL", "INFO"),
    format="%(asctime)s | %(levelname)s | %(name)s | %(message)s"
)
logger = logging.getLogger("bm25_local_indexer")

# All paths are resolved relative to the grandparent of this file.
BASE_DIR = Path(__file__).resolve().parent.parent
CHUNKS_DIR = BASE_DIR / "chunks"           # input: one *.json record per chunk
INDEX_OUT = BASE_DIR / "bm25_index.pkl"    # output: pickled BM25 index bundle
# NOTE(review): MAX_TOKENS is not referenced anywhere in this file —
# presumably consumed by a sibling module; confirm before removing.
MAX_TOKENS = 512
# ---------------------------
# Tokenization (Tiếng Việt)
# ---------------------------
def tokenize_vi(text: str) -> List[str]:
    """Segment Vietnamese *text* and return lowercase word tokens.

    Uses underthesea's word segmenter in "text" mode (multi-word terms
    joined with underscores), then lowercases and splits on whitespace.
    """
    segmented = word_tokenize(text, format="text")
    return segmented.lower().split()
# ---------------------------
# Load Chunks
# ---------------------------
def load_chunks(chunks_dir: Path) -> List[Dict[str, Any]]:
    """Load all chunk records from ``*.json`` files under *chunks_dir*.

    Skips manifest files, unreadable/unparseable files, and files whose
    payload is not a JSON object carrying an ``"id"`` field.  Returns a
    list of normalized dicts in sorted filename order.
    """
    docs: List[Dict[str, Any]] = []
    # sorted() makes the corpus order deterministic; bare glob order is
    # filesystem-dependent, which would make rebuilt indexes differ.
    for fp in sorted(chunks_dir.glob("*.json")):
        # Skip manifest files
        if "manifest" in fp.name.lower():
            logger.info("Skipping manifest file: %s", fp)
            continue
        try:
            with open(fp, "r", encoding="utf-8") as f:
                ch = json.load(f)
        except Exception as e:
            logger.warning("Failed to read %s: %s", fp, e)
            continue
        # Previously ch["id"] was dereferenced outside the try, so a single
        # malformed chunk (non-dict JSON or missing "id") aborted the whole
        # indexing run; skip it instead, matching the read-failure handling.
        if not isinstance(ch, dict) or "id" not in ch:
            logger.warning("Skipping malformed chunk (no 'id'): %s", fp)
            continue
        docs.append({
            "id": ch["id"],
            "doc_id": ch.get("doc_id"),
            "path": ch.get("path"),
            "text": ch.get("chunk_text", ""),
            "chunk_for_embedding": ch.get("chunk_for_embedding"),
            "token_count": ch.get("token_count"),
        })
    logger.info("Loaded %d chunks", len(docs))
    return docs
# ---------------------------
# Build BM25 Index
# ---------------------------
def build_bm25_index(chunks: List[Dict[str, Any]]) -> Dict[str, Any]:
    """Fit a BM25 index over the ``"text"`` field of each chunk.

    Returns a bundle holding the fitted BM25Okapi object, the original
    chunk records, and the tokenized corpus used to fit it.
    """
    tokenized_corpus = [tokenize_vi(chunk["text"]) for chunk in chunks]
    return {
        "bm25": BM25Okapi(tokenized_corpus),
        "chunks": chunks,
        "tokenized_corpus": tokenized_corpus,
    }
# ---------------------------
# Save & Load index
# ---------------------------
def save_index(index: Dict[str, Any], out_path: "str | Path") -> None:
    """Pickle *index* (as built by ``build_bm25_index``) to *out_path*.

    The CLI passes INDEX_OUT, a ``Path``, so the previous ``str``
    annotation was inaccurate; both str and Path are accepted by open().
    """
    with open(out_path, "wb") as f:
        pickle.dump(index, f)
    logger.info("Saved BM25 index to %s", out_path)
def load_index(path: "str | Path") -> Dict[str, Any]:
    """Unpickle and return an index bundle written by ``save_index``.

    SECURITY NOTE: ``pickle.load`` can execute arbitrary code embedded
    in the file — only load index files produced locally by this tool.
    """
    with open(path, "rb") as f:
        return pickle.load(f)
# ---------------------------
# CLI
# ---------------------------
def main(reindex: bool, check: bool):
    """CLI entry point: optionally rebuild the index, then verify it.

    *reindex* loads chunks from CHUNKS_DIR, builds a fresh BM25 index,
    and pickles it to INDEX_OUT; *check* reloads the pickle and logs
    how many chunks it contains.
    """
    if reindex:
        chunk_records = load_chunks(CHUNKS_DIR)
        save_index(build_bm25_index(chunk_records), INDEX_OUT)
    if check:
        loaded = load_index(INDEX_OUT)
        logger.info("Index contains %d chunks", len(loaded["chunks"]))


if __name__ == "__main__":
    main(reindex=True, check=True)