DoAn / scripts /build_data.py
hungnha's picture
build lại data
bf7ec12
"""Script build ChromaDB từ markdown files với incremental update."""
import sys
import argparse
from pathlib import Path
from dotenv import find_dotenv, load_dotenv
load_dotenv(find_dotenv(usecwd=True))
REPO_ROOT = Path(__file__).resolve().parents[1]
if str(REPO_ROOT) not in sys.path:
sys.path.insert(0, str(REPO_ROOT))
from core.rag.chunk import chunk_markdown_file
from core.rag.embedding_model import EmbeddingConfig, QwenEmbeddings
from core.rag.vector_store import ChromaConfig, ChromaVectorDB
from core.hash_file.hash_file import HashProcessor
_hasher = HashProcessor(verbose=False)
def get_db_file_info(db: ChromaVectorDB) -> dict:
"""Lấy thông tin files đã có trong DB (IDs và hash)."""
docs = db.get_all_documents()
file_to_ids = {}
file_to_hash = {}
for d in docs:
meta = d.get("metadata", {})
source = meta.get("source_basename") or meta.get("source_file")
doc_id = d.get("id")
content_hash = meta.get("content_hash", "")
if source and doc_id:
if source not in file_to_ids:
file_to_ids[source] = set()
file_to_ids[source].add(doc_id)
# Lưu hash đầu tiên tìm thấy cho file
if source not in file_to_hash and content_hash:
file_to_hash[source] = content_hash
return {"ids": file_to_ids, "hashes": file_to_hash}
def main():
parser = argparse.ArgumentParser(description="Build ChromaDB từ markdown files")
parser.add_argument("--force", action="store_true", help="Build lại tất cả files")
parser.add_argument("--no-delete", action="store_true", help="Không xóa docs orphaned")
args = parser.parse_args()
print("=" * 60)
print("BUILD HUST RAG DATABASE")
print("=" * 60)
# Bước 1: Khởi tạo embedder
print("\n[1/5] Khởi tạo embedder...")
emb_cfg = EmbeddingConfig()
emb = QwenEmbeddings(emb_cfg)
print(f" Model: {emb_cfg.model}")
print(f" API: {emb_cfg.api_base_url}")
# Bước 2: Khởi tạo ChromaDB
print("\n[2/5] Khởi tạo ChromaDB...")
db_cfg = ChromaConfig()
db = ChromaVectorDB(embedder=emb, config=db_cfg)
old_count = db.count()
print(f" Collection: {db_cfg.collection_name}")
print(f" Số docs hiện tại: {old_count}")
# Lấy trạng thái hiện tại của DB
db_info = {"ids": {}, "hashes": {}}
if not args.force and old_count > 0:
print("\n Đang quét documents trong DB...")
db_info = get_db_file_info(db)
print(f" Tìm thấy {len(db_info['ids'])} source files trong DB")
# Bước 3: Quét markdown files
print("\n[3/5] Quét markdown files...")
root = REPO_ROOT / "data" / "data_process"
md_files = sorted(root.rglob("*.md"))
print(f" Tìm thấy {len(md_files)} markdown files trên disk")
# So sánh files trên disk vs trong DB
current_files = {f.name for f in md_files}
db_files = set(db_info["ids"].keys())
# Tìm files cần xóa (có trong DB nhưng không có trên disk)
files_to_delete = db_files - current_files
# Bước 4: Xóa docs orphaned
deleted_count = 0
if files_to_delete and not args.no_delete:
print(f"\n[4/5] Dọn dẹp {len(files_to_delete)} files đã xóa...")
for filename in files_to_delete:
doc_ids = list(db_info["ids"].get(filename, []))
if doc_ids:
db.delete_documents(doc_ids)
deleted_count += len(doc_ids)
print(f" Đã xóa: {filename} ({len(doc_ids)} chunks)")
else:
print("\n[4/5] Không có files cần xóa")
# Bước 5: Xử lý markdown files (thêm mới, cập nhật)
print("\n[5/5] Xử lý markdown files...")
total_added = 0
total_updated = 0
skipped = 0
for i, f in enumerate(md_files, 1):
file_hash = _hasher.get_file_hash(str(f))
db_hash = db_info["hashes"].get(f.name, "")
existing_ids = db_info["ids"].get(f.name, set())
# Bỏ qua nếu hash khớp (file không thay đổi)
if not args.force and db_hash == file_hash:
print(f" [{i}/{len(md_files)}] {f.name}: BỎ QUA (không đổi)")
skipped += 1
continue
# Nếu file thay đổi, xóa chunks cũ trước
if existing_ids and not args.force:
db.delete_documents(list(existing_ids))
print(f" [{i}/{len(md_files)}] {f.name}: CẬP NHẬT (xóa {len(existing_ids)} chunks cũ)")
is_update = True
else:
is_update = False
try:
docs = chunk_markdown_file(f)
if docs:
# Thêm hash vào metadata để phát hiện thay đổi lần sau
for doc in docs:
if hasattr(doc, 'metadata'):
doc.metadata["content_hash"] = file_hash
elif isinstance(doc, dict) and "metadata" in doc:
doc["metadata"]["content_hash"] = file_hash
n = db.upsert_documents(docs)
if is_update:
total_updated += n
print(f" [{i}/{len(md_files)}] {f.name}: +{n} chunks mới")
else:
total_added += n
print(f" [{i}/{len(md_files)}] {f.name}: {n} chunks")
else:
print(f" [{i}/{len(md_files)}] {f.name}: BỎ QUA (không có chunks)")
except Exception as e:
print(f" [{i}/{len(md_files)}] {f.name}: LỖI - {e}")
# Tổng kết
new_count = db.count()
has_changes = deleted_count > 0 or total_updated > 0 or total_added > 0
# Xóa BM25 cache nếu có thay đổi (vì BM25 không hỗ trợ incremental update)
if has_changes:
bm25_cache = REPO_ROOT / "data" / "chroma" / "bm25_cache.pkl"
if bm25_cache.exists():
bm25_cache.unlink()
print("\n[!] Đã xóa BM25 cache (sẽ tự rebuild khi query)")
print(f"\n{'=' * 60}")
print("TỔNG KẾT")
print("=" * 60)
print(f" Đã xóa (orphaned): {deleted_count} chunks")
print(f" Đã cập nhật: {total_updated} chunks")
print(f" Đã thêm mới: {total_added} chunks")
print(f" Đã bỏ qua: {skipped} files")
print(f" Số docs trong DB: {old_count} -> {new_count} ({new_count - old_count:+d})")
print("\nHOÀN TẤT!")
if __name__ == "__main__":
main()