|
|
"""Script build ChromaDB từ markdown files với incremental update.""" |
|
|
|
|
|
import sys |
|
|
import argparse |
|
|
from pathlib import Path |
|
|
from dotenv import find_dotenv, load_dotenv |
|
|
|
|
|
load_dotenv(find_dotenv(usecwd=True)) |
|
|
|
|
|
REPO_ROOT = Path(__file__).resolve().parents[1] |
|
|
if str(REPO_ROOT) not in sys.path: |
|
|
sys.path.insert(0, str(REPO_ROOT)) |
|
|
|
|
|
from core.rag.chunk import chunk_markdown_file |
|
|
from core.rag.embedding_model import EmbeddingConfig, QwenEmbeddings |
|
|
from core.rag.vector_store import ChromaConfig, ChromaVectorDB |
|
|
from core.hash_file.hash_file import HashProcessor |
|
|
|
|
|
_hasher = HashProcessor(verbose=False) |
|
|
|
|
|
|
|
|
def get_db_file_info(db: ChromaVectorDB) -> dict: |
|
|
"""Lấy thông tin files đã có trong DB (IDs và hash).""" |
|
|
docs = db.get_all_documents() |
|
|
file_to_ids = {} |
|
|
file_to_hash = {} |
|
|
|
|
|
for d in docs: |
|
|
meta = d.get("metadata", {}) |
|
|
source = meta.get("source_basename") or meta.get("source_file") |
|
|
doc_id = d.get("id") |
|
|
content_hash = meta.get("content_hash", "") |
|
|
|
|
|
if source and doc_id: |
|
|
if source not in file_to_ids: |
|
|
file_to_ids[source] = set() |
|
|
file_to_ids[source].add(doc_id) |
|
|
|
|
|
|
|
|
if source not in file_to_hash and content_hash: |
|
|
file_to_hash[source] = content_hash |
|
|
|
|
|
return {"ids": file_to_ids, "hashes": file_to_hash} |
|
|
|
|
|
|
|
|
def main(): |
|
|
parser = argparse.ArgumentParser(description="Build ChromaDB từ markdown files") |
|
|
parser.add_argument("--force", action="store_true", help="Build lại tất cả files") |
|
|
parser.add_argument("--no-delete", action="store_true", help="Không xóa docs orphaned") |
|
|
args = parser.parse_args() |
|
|
|
|
|
print("=" * 60) |
|
|
print("BUILD HUST RAG DATABASE") |
|
|
print("=" * 60) |
|
|
|
|
|
|
|
|
print("\n[1/5] Khởi tạo embedder...") |
|
|
emb_cfg = EmbeddingConfig() |
|
|
emb = QwenEmbeddings(emb_cfg) |
|
|
print(f" Model: {emb_cfg.model}") |
|
|
print(f" API: {emb_cfg.api_base_url}") |
|
|
|
|
|
|
|
|
print("\n[2/5] Khởi tạo ChromaDB...") |
|
|
db_cfg = ChromaConfig() |
|
|
db = ChromaVectorDB(embedder=emb, config=db_cfg) |
|
|
old_count = db.count() |
|
|
print(f" Collection: {db_cfg.collection_name}") |
|
|
print(f" Số docs hiện tại: {old_count}") |
|
|
|
|
|
|
|
|
db_info = {"ids": {}, "hashes": {}} |
|
|
if not args.force and old_count > 0: |
|
|
print("\n Đang quét documents trong DB...") |
|
|
db_info = get_db_file_info(db) |
|
|
print(f" Tìm thấy {len(db_info['ids'])} source files trong DB") |
|
|
|
|
|
|
|
|
print("\n[3/5] Quét markdown files...") |
|
|
root = REPO_ROOT / "data" / "data_process" |
|
|
md_files = sorted(root.rglob("*.md")) |
|
|
print(f" Tìm thấy {len(md_files)} markdown files trên disk") |
|
|
|
|
|
|
|
|
current_files = {f.name for f in md_files} |
|
|
db_files = set(db_info["ids"].keys()) |
|
|
|
|
|
|
|
|
files_to_delete = db_files - current_files |
|
|
|
|
|
|
|
|
deleted_count = 0 |
|
|
if files_to_delete and not args.no_delete: |
|
|
print(f"\n[4/5] Dọn dẹp {len(files_to_delete)} files đã xóa...") |
|
|
for filename in files_to_delete: |
|
|
doc_ids = list(db_info["ids"].get(filename, [])) |
|
|
if doc_ids: |
|
|
db.delete_documents(doc_ids) |
|
|
deleted_count += len(doc_ids) |
|
|
print(f" Đã xóa: {filename} ({len(doc_ids)} chunks)") |
|
|
else: |
|
|
print("\n[4/5] Không có files cần xóa") |
|
|
|
|
|
|
|
|
print("\n[5/5] Xử lý markdown files...") |
|
|
total_added = 0 |
|
|
total_updated = 0 |
|
|
skipped = 0 |
|
|
|
|
|
for i, f in enumerate(md_files, 1): |
|
|
file_hash = _hasher.get_file_hash(str(f)) |
|
|
db_hash = db_info["hashes"].get(f.name, "") |
|
|
existing_ids = db_info["ids"].get(f.name, set()) |
|
|
|
|
|
|
|
|
if not args.force and db_hash == file_hash: |
|
|
print(f" [{i}/{len(md_files)}] {f.name}: BỎ QUA (không đổi)") |
|
|
skipped += 1 |
|
|
continue |
|
|
|
|
|
|
|
|
if existing_ids and not args.force: |
|
|
db.delete_documents(list(existing_ids)) |
|
|
print(f" [{i}/{len(md_files)}] {f.name}: CẬP NHẬT (xóa {len(existing_ids)} chunks cũ)") |
|
|
is_update = True |
|
|
else: |
|
|
is_update = False |
|
|
|
|
|
try: |
|
|
docs = chunk_markdown_file(f) |
|
|
if docs: |
|
|
|
|
|
for doc in docs: |
|
|
if hasattr(doc, 'metadata'): |
|
|
doc.metadata["content_hash"] = file_hash |
|
|
elif isinstance(doc, dict) and "metadata" in doc: |
|
|
doc["metadata"]["content_hash"] = file_hash |
|
|
|
|
|
n = db.upsert_documents(docs) |
|
|
if is_update: |
|
|
total_updated += n |
|
|
print(f" [{i}/{len(md_files)}] {f.name}: +{n} chunks mới") |
|
|
else: |
|
|
total_added += n |
|
|
print(f" [{i}/{len(md_files)}] {f.name}: {n} chunks") |
|
|
else: |
|
|
print(f" [{i}/{len(md_files)}] {f.name}: BỎ QUA (không có chunks)") |
|
|
except Exception as e: |
|
|
print(f" [{i}/{len(md_files)}] {f.name}: LỖI - {e}") |
|
|
|
|
|
|
|
|
new_count = db.count() |
|
|
has_changes = deleted_count > 0 or total_updated > 0 or total_added > 0 |
|
|
|
|
|
|
|
|
if has_changes: |
|
|
bm25_cache = REPO_ROOT / "data" / "chroma" / "bm25_cache.pkl" |
|
|
if bm25_cache.exists(): |
|
|
bm25_cache.unlink() |
|
|
print("\n[!] Đã xóa BM25 cache (sẽ tự rebuild khi query)") |
|
|
|
|
|
print(f"\n{'=' * 60}") |
|
|
print("TỔNG KẾT") |
|
|
print("=" * 60) |
|
|
print(f" Đã xóa (orphaned): {deleted_count} chunks") |
|
|
print(f" Đã cập nhật: {total_updated} chunks") |
|
|
print(f" Đã thêm mới: {total_added} chunks") |
|
|
print(f" Đã bỏ qua: {skipped} files") |
|
|
print(f" Số docs trong DB: {old_count} -> {new_count} ({new_count - old_count:+d})") |
|
|
|
|
|
print("\nHOÀN TẤT!") |
|
|
|
|
|
|
|
|
if __name__ == "__main__": |
|
|
main() |