vietqa-api / src /utils /ingestion.py
quanho114
Deploy VietQA API
ebb8326
"""Knowledge base ingestion utilities for Qdrant vector store."""
import json
from concurrent.futures import ThreadPoolExecutor, as_completed
from pathlib import Path
import re
from langchain_qdrant import QdrantVectorStore
from langchain_text_splitters import RecursiveCharacterTextSplitter
from qdrant_client import QdrantClient
from qdrant_client.models import Distance, VectorParams
from tqdm import tqdm
from src.config import DATA_DIR, settings
from src.utils.common import normalize_text
from src.utils.doc_parsers import load_document
from src.utils.embeddings import get_embeddings
from src.utils.logging import log_pipeline
SUPPORTED_EXTENSIONS = {".json", ".pdf", ".docx", ".txt"}
INGESTION_BATCH_SIZE = 100
MAX_WORKERS = 4
JUNK_PATTERNS = [
r"đăng nhập", r"đăng ký", r"quên mật khẩu", r"chia sẻ qua email",
r"bản quyền thuộc", r"liên hệ quảng cáo", r"về đầu trang",
r"xem thêm", r"bình luận", r"báo xấu", r"trang chủ",
r"facebook", r"twitter", r"linkedin", r"zalo",
r"kết nối với chúng tôi", r"thông tin tòa soạn",
r"wikipedia", r"bách khoa toàn thư", r"sửa đổi", r"biểu quyết",
]
_qdrant_client: QdrantClient | None = None
_vector_store: QdrantVectorStore | None = None
def get_qdrant_client() -> QdrantClient:
"""Get or create persistent Qdrant client singleton."""
global _qdrant_client
if _qdrant_client is None:
db_path = settings.vector_db_path_resolved
db_path.parent.mkdir(parents=True, exist_ok=True)
_qdrant_client = QdrantClient(path=str(db_path))
return _qdrant_client
def get_vector_store() -> QdrantVectorStore:
"""Get the global vector store instance (Lazy load)."""
global _vector_store
if _vector_store is None:
client = get_qdrant_client()
embeddings = get_embeddings()
_vector_store = QdrantVectorStore(
client=client,
collection_name=settings.qdrant_collection,
embedding=embeddings,
)
return _vector_store
def _is_junk_text(text: str) -> bool:
"""Check if text is junk (nav, footer, ads)."""
if len(text.split()) < 5:
return True
text_lower = text.lower()
for pattern in JUNK_PATTERNS:
if re.search(pattern, text_lower):
return True
return False
def _prepend_title_to_chunk(chunk_text: str, title: str) -> str:
"""Prepend title to chunk content for better context in embeddings."""
if title and title.strip():
return f"Title: {title.strip()}\nContent: {chunk_text}"
return chunk_text
def _initialize_collection(
client: QdrantClient,
collection_name: str,
vector_size: int,
force_recreate: bool = False,
) -> None:
"""Initialize Qdrant collection, creating if needed."""
collection_exists = client.collection_exists(collection_name)
if collection_exists and force_recreate:
client.delete_collection(collection_name)
collection_exists = False
if not collection_exists:
client.create_collection(
collection_name=collection_name,
vectors_config=VectorParams(size=vector_size, distance=Distance.COSINE),
)
def _get_text_splitter() -> RecursiveCharacterTextSplitter:
"""Create a text splitter with standard settings."""
return RecursiveCharacterTextSplitter(
chunk_size=settings.chunk_size,
chunk_overlap=settings.chunk_overlap,
separators=["\n\n", "\n", ".", " ", ""],
)
def _process_crawled_json(json_path: Path) -> tuple[list[str], list[dict]]:
"""Process crawled JSON file, normalize content, and return (chunks, metadatas)."""
with open(json_path, encoding="utf-8") as f:
data = json.load(f)
documents = data.get("documents", [])
if not documents:
return [], []
splitter = _get_text_splitter()
all_chunks = []
all_metadatas = []
for doc in documents:
content = normalize_text(doc.get("content", ""))
if not content:
continue
title = normalize_text(doc.get("title", ""))
keywords_raw = doc.get("keywords")
keywords_str = ""
if isinstance(keywords_raw, list):
keywords_str = ",".join([str(k) for k in keywords_raw if k])
elif isinstance(keywords_raw, str):
keywords_str = keywords_raw
base_metadata = {
"source_url": doc.get("url", ""),
"title": title,
"summary": normalize_text(doc.get("summary", "")),
"topic": data.get("topic", ""),
"keywords": keywords_str,
"domain": data.get("domain", ""),
"source_file": str(json_path),
}
raw_chunks = splitter.split_text(content)
total_raw_chunks = len(raw_chunks)
for i, chunk in enumerate(raw_chunks):
if _is_junk_text(chunk):
continue
chunk_metadata = base_metadata.copy()
chunk_metadata["chunk_index"] = i
chunk_metadata["total_chunks"] = total_raw_chunks
chunk_with_title = _prepend_title_to_chunk(chunk, title)
all_chunks.append(chunk_with_title)
all_metadatas.append(chunk_metadata)
return all_chunks, all_metadatas
def _scan_data_files(base_dir: Path) -> list[Path]:
"""Recursively scan directory for supported files."""
files = []
for ext in SUPPORTED_EXTENSIONS:
files.extend(base_dir.rglob(f"*{ext}"))
return sorted(files)
def _extract_chunks_from_file(
file_path: Path,
splitter: RecursiveCharacterTextSplitter,
) -> tuple[list[str], list[dict], str | None]:
"""Extract chunks and metadata from a single file.
Returns:
Tuple of (chunks, metadatas, error_message)
"""
try:
if file_path.suffix.lower() == ".json":
chunks, metas = _process_crawled_json(file_path)
return chunks, metas, None
text, metadata = load_document(file_path)
if not text or not metadata:
return [], [], None
title = metadata.get("title", "")
chunks = splitter.split_text(text)
processed_chunks = []
metadatas = []
for i, chunk in enumerate(chunks):
chunk_with_title = _prepend_title_to_chunk(chunk, title)
processed_chunks.append(chunk_with_title)
chunk_meta = metadata.copy()
chunk_meta["chunk_index"] = i
chunk_meta["total_chunks"] = len(chunks)
metadatas.append(chunk_meta)
return processed_chunks, metadatas, None
except Exception as e:
return [], [], str(e)
def _parse_files_parallel(
files: list[Path],
max_workers: int = MAX_WORKERS,
) -> tuple[list[str], list[dict], int, int, int]:
"""Parse all files in parallel using ThreadPoolExecutor.
Returns:
Tuple of (all_chunks, all_metadatas, total_docs, failed_files, skipped_files)
"""
splitter = _get_text_splitter()
all_chunks: list[str] = []
all_metadatas: list[dict] = []
total_docs = 0
failed_files = 0
skipped_files = 0
log_pipeline(f"Parsing {len(files)} files with {max_workers} workers...")
with ThreadPoolExecutor(max_workers=max_workers) as executor:
future_to_file = {
executor.submit(_extract_chunks_from_file, f, splitter): f
for f in files
}
with tqdm(total=len(files), desc="Parsing files", unit="file") as pbar:
for future in as_completed(future_to_file):
file_path = future_to_file[future]
pbar.set_postfix_str(f"{file_path.name}")
chunks, metas, error = future.result()
if error:
tqdm.write(f" [Error] {file_path.name}: {error}")
failed_files += 1
elif chunks:
all_chunks.extend(chunks)
all_metadatas.extend(metas)
total_docs += 1
else:
skipped_files += 1
pbar.update(1)
log_pipeline(f"Parsed {total_docs} files -> {len(all_chunks)} chunks")
if failed_files > 0:
log_pipeline(f"Failed: {failed_files} files")
if skipped_files > 0:
log_pipeline(f"Skipped (empty): {skipped_files} files")
return all_chunks, all_metadatas, total_docs, failed_files, skipped_files
def _ingest_chunks_batched(
chunks: list[str],
metadatas: list[dict],
vector_store: QdrantVectorStore,
batch_size: int = INGESTION_BATCH_SIZE,
) -> int:
"""Ingest chunks into vector store in large batches.
Returns:
Number of chunks ingested
"""
if not chunks:
return 0
total = len(chunks)
num_batches = (total + batch_size - 1) // batch_size
log_pipeline(f"Ingesting {total} chunks in {num_batches} batches (batch_size={batch_size})...")
ingested = 0
with tqdm(total=num_batches, desc="Ingesting batches", unit="batch") as pbar:
for i in range(0, total, batch_size):
batch_chunks = chunks[i:i + batch_size]
batch_metas = metadatas[i:i + batch_size]
vector_store.add_texts(
batch_chunks,
metadatas=batch_metas,
batch_size=len(batch_chunks),
)
ingested += len(batch_chunks)
pbar.set_postfix_str(f"{ingested}/{total} chunks")
pbar.update(1)
return ingested
def _process_and_index_documents(
files: list[Path],
vector_store: QdrantVectorStore,
batch_size: int = INGESTION_BATCH_SIZE,
max_workers: int = MAX_WORKERS,
) -> tuple[int, int, int]:
"""Process files in parallel and ingest in batched mode.
Returns:
Tuple of (total_chunks, total_docs, failed_files)
"""
all_chunks, all_metadatas, total_docs, failed_files, _ = _parse_files_parallel(
files, max_workers=max_workers
)
if not all_chunks:
return 0, 0, failed_files
total_chunks = _ingest_chunks_batched(
all_chunks, all_metadatas, vector_store, batch_size=batch_size
)
return total_chunks, total_docs, failed_files
def ingest_all_data(
base_dir: Path | None = None,
force: bool = False,
batch_size: int = INGESTION_BATCH_SIZE,
max_workers: int = MAX_WORKERS,
) -> QdrantVectorStore:
"""Ingest all data from crawled JSON and documents into Qdrant.
Uses parallel file parsing and batched ingestion for speed.
Args:
base_dir: Directory to scan (default: DATA_DIR)
force: If True, wipe collection and re-ingest everything
batch_size: Number of chunks per batch for vector store insertion
max_workers: Number of parallel workers for file parsing
Returns:
QdrantVectorStore instance
"""
global _vector_store
base_dir = base_dir or DATA_DIR
embeddings = get_embeddings()
client = get_qdrant_client()
collection_name = settings.qdrant_collection
collection_exists = client.collection_exists(collection_name)
if collection_exists and not force:
log_pipeline(f"Loading existing vector store: {settings.vector_db_path_resolved}")
_vector_store = QdrantVectorStore(
client=client,
collection_name=collection_name,
embedding=embeddings,
)
return _vector_store
if force and collection_exists:
log_pipeline(f"Force re-ingesting: deleting collection '{collection_name}'")
files = _scan_data_files(base_dir)
sample_embedding = embeddings.embed_query("test")
_initialize_collection(client, collection_name, len(sample_embedding), force_recreate=force)
_vector_store = QdrantVectorStore(
client=client,
collection_name=collection_name,
embedding=embeddings,
)
if not files:
log_pipeline(f"No supported files found in {base_dir}")
return _vector_store
log_pipeline(f"Found {len(files)} files to ingest from {base_dir}")
total_chunks, total_docs, failed_files = _process_and_index_documents(
files, _vector_store, batch_size=batch_size, max_workers=max_workers
)
log_pipeline(f"Ingestion complete: {total_docs} files, {total_chunks} chunks")
if failed_files > 0:
log_pipeline(f"Failed files: {failed_files}")
log_pipeline(f"Collection: '{collection_name}'")
return _vector_store
def ingest_files(
file_paths: list[Path],
collection_name: str | None = None,
append: bool = False,
batch_size: int = INGESTION_BATCH_SIZE,
max_workers: int = MAX_WORKERS,
) -> int:
"""Ingest specific files into Qdrant with parallel parsing and batched insertion.
Args:
file_paths: List of file paths to ingest
collection_name: Optional collection name (default from settings)
append: If True, append to existing collection; otherwise recreate
batch_size: Number of chunks per batch for vector store insertion
max_workers: Number of parallel workers for file parsing
Returns:
Number of chunks ingested
"""
collection_name = collection_name or settings.qdrant_collection
embeddings = get_embeddings()
client = get_qdrant_client()
sample_embedding = embeddings.embed_query("test")
_initialize_collection(client, collection_name, len(sample_embedding), force_recreate=not append)
vector_store = QdrantVectorStore(
client=client,
collection_name=collection_name,
embedding=embeddings,
)
total_chunks, _, _ = _process_and_index_documents(
file_paths, vector_store, batch_size=batch_size, max_workers=max_workers
)
log_pipeline(f"Total: {total_chunks} chunks in '{collection_name}'")
return total_chunks