| """Knowledge base ingestion utilities for Qdrant vector store.""" | |
| import json | |
| from concurrent.futures import ThreadPoolExecutor, as_completed | |
| from pathlib import Path | |
| import re | |
| from langchain_qdrant import QdrantVectorStore | |
| from langchain_text_splitters import RecursiveCharacterTextSplitter | |
| from qdrant_client import QdrantClient | |
| from qdrant_client.models import Distance, VectorParams | |
| from tqdm import tqdm | |
| from src.config import DATA_DIR, settings | |
| from src.utils.common import normalize_text | |
| from src.utils.doc_parsers import load_document | |
| from src.utils.embeddings import get_embeddings | |
| from src.utils.logging import log_pipeline | |
| SUPPORTED_EXTENSIONS = {".json", ".pdf", ".docx", ".txt"} | |
| INGESTION_BATCH_SIZE = 100 | |
| MAX_WORKERS = 4 | |
# Regex fragments that mark navigation, footer, and ad boilerplate in crawled
# Vietnamese pages (e.g. "đăng nhập" = "log in", "quên mật khẩu" = "forgot
# password", "về đầu trang" = "back to top", "bình luận" = "comments").
JUNK_PATTERNS = [
    r"đăng nhập", r"đăng ký", r"quên mật khẩu", r"chia sẻ qua email",
    r"bản quyền thuộc", r"liên hệ quảng cáo", r"về đầu trang",
    r"xem thêm", r"bình luận", r"báo xấu", r"trang chủ",
    r"facebook", r"twitter", r"linkedin", r"zalo",
    r"kết nối với chúng tôi", r"thông tin tòa soạn",
    r"wikipedia", r"bách khoa toàn thư", r"sửa đổi", r"biểu quyết",
]

_qdrant_client: QdrantClient | None = None
_vector_store: QdrantVectorStore | None = None


def get_qdrant_client() -> QdrantClient:
    """Get or create the persistent Qdrant client singleton."""
    global _qdrant_client
    if _qdrant_client is None:
        db_path = settings.vector_db_path_resolved
        db_path.parent.mkdir(parents=True, exist_ok=True)
        _qdrant_client = QdrantClient(path=str(db_path))
    return _qdrant_client


def get_vector_store() -> QdrantVectorStore:
    """Get the global vector store instance (lazily initialized)."""
    global _vector_store
    if _vector_store is None:
        client = get_qdrant_client()
        embeddings = get_embeddings()
        _vector_store = QdrantVectorStore(
            client=client,
            collection_name=settings.qdrant_collection,
            embedding=embeddings,
        )
    return _vector_store

def _is_junk_text(text: str) -> bool:
    """Check if text is junk (nav, footer, ads)."""
    if len(text.split()) < 5:
        return True
    text_lower = text.lower()
    for pattern in JUNK_PATTERNS:
        if re.search(pattern, text_lower):
            return True
    return False


def _prepend_title_to_chunk(chunk_text: str, title: str) -> str:
    """Prepend title to chunk content for better context in embeddings."""
    if title and title.strip():
        return f"Title: {title.strip()}\nContent: {chunk_text}"
    return chunk_text
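

# Illustrative example (hypothetical values) of the title-prepended format that
# gets embedded, so the title's terms contribute to the chunk's vector even when
# they never appear in the chunk body:
#   _prepend_title_to_chunk("Hà Nội là thủ đô...", "Việt Nam")
#   -> "Title: Việt Nam\nContent: Hà Nội là thủ đô..."
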
def _initialize_collection(
    client: QdrantClient,
    collection_name: str,
    vector_size: int,
    force_recreate: bool = False,
) -> None:
    """Initialize Qdrant collection, creating it if needed."""
    collection_exists = client.collection_exists(collection_name)
    if collection_exists and force_recreate:
        client.delete_collection(collection_name)
        collection_exists = False
    if not collection_exists:
        client.create_collection(
            collection_name=collection_name,
            vectors_config=VectorParams(size=vector_size, distance=Distance.COSINE),
        )


def _get_text_splitter() -> RecursiveCharacterTextSplitter:
    """Create a text splitter with standard settings."""
    return RecursiveCharacterTextSplitter(
        chunk_size=settings.chunk_size,
        chunk_overlap=settings.chunk_overlap,
        separators=["\n\n", "\n", ".", " ", ""],
    )

def _process_crawled_json(json_path: Path) -> tuple[list[str], list[dict]]:
    """Process a crawled JSON file, normalize content, and return (chunks, metadatas)."""
    with open(json_path, encoding="utf-8") as f:
        data = json.load(f)
    documents = data.get("documents", [])
    if not documents:
        return [], []
    splitter = _get_text_splitter()
    all_chunks = []
    all_metadatas = []
    for doc in documents:
        content = normalize_text(doc.get("content", ""))
        if not content:
            continue
        title = normalize_text(doc.get("title", ""))
        keywords_raw = doc.get("keywords")
        keywords_str = ""
        if isinstance(keywords_raw, list):
            keywords_str = ",".join([str(k) for k in keywords_raw if k])
        elif isinstance(keywords_raw, str):
            keywords_str = keywords_raw
        base_metadata = {
            "source_url": doc.get("url", ""),
            "title": title,
            "summary": normalize_text(doc.get("summary", "")),
            "topic": data.get("topic", ""),
            "keywords": keywords_str,
            "domain": data.get("domain", ""),
            "source_file": str(json_path),
        }
        raw_chunks = splitter.split_text(content)
        total_raw_chunks = len(raw_chunks)
        for i, chunk in enumerate(raw_chunks):
            if _is_junk_text(chunk):
                continue
            chunk_metadata = base_metadata.copy()
            chunk_metadata["chunk_index"] = i
            chunk_metadata["total_chunks"] = total_raw_chunks
            chunk_with_title = _prepend_title_to_chunk(chunk, title)
            all_chunks.append(chunk_with_title)
            all_metadatas.append(chunk_metadata)
    return all_chunks, all_metadatas

def _scan_data_files(base_dir: Path) -> list[Path]:
    """Recursively scan a directory for supported files."""
    files = []
    for ext in SUPPORTED_EXTENSIONS:
        files.extend(base_dir.rglob(f"*{ext}"))
    return sorted(files)


def _extract_chunks_from_file(
    file_path: Path,
    splitter: RecursiveCharacterTextSplitter,
) -> tuple[list[str], list[dict], str | None]:
    """Extract chunks and metadata from a single file.

    Returns:
        Tuple of (chunks, metadatas, error_message)
    """
    try:
        if file_path.suffix.lower() == ".json":
            chunks, metas = _process_crawled_json(file_path)
            return chunks, metas, None
        text, metadata = load_document(file_path)
        if not text or not metadata:
            return [], [], None
        title = metadata.get("title", "")
        chunks = splitter.split_text(text)
        processed_chunks = []
        metadatas = []
        for i, chunk in enumerate(chunks):
            chunk_with_title = _prepend_title_to_chunk(chunk, title)
            processed_chunks.append(chunk_with_title)
            chunk_meta = metadata.copy()
            chunk_meta["chunk_index"] = i
            chunk_meta["total_chunks"] = len(chunks)
            metadatas.append(chunk_meta)
        return processed_chunks, metadatas, None
    except Exception as e:
        return [], [], str(e)

def _parse_files_parallel(
    files: list[Path],
    max_workers: int = MAX_WORKERS,
) -> tuple[list[str], list[dict], int, int, int]:
    """Parse all files in parallel using a ThreadPoolExecutor.

    Returns:
        Tuple of (all_chunks, all_metadatas, total_docs, failed_files, skipped_files)
    """
    splitter = _get_text_splitter()
    all_chunks: list[str] = []
    all_metadatas: list[dict] = []
    total_docs = 0
    failed_files = 0
    skipped_files = 0
    log_pipeline(f"Parsing {len(files)} files with {max_workers} workers...")
    with ThreadPoolExecutor(max_workers=max_workers) as executor:
        future_to_file = {
            executor.submit(_extract_chunks_from_file, f, splitter): f
            for f in files
        }
        with tqdm(total=len(files), desc="Parsing files", unit="file") as pbar:
            for future in as_completed(future_to_file):
                file_path = future_to_file[future]
                pbar.set_postfix_str(f"{file_path.name}")
                chunks, metas, error = future.result()
                if error:
                    tqdm.write(f"  [Error] {file_path.name}: {error}")
                    failed_files += 1
                elif chunks:
                    all_chunks.extend(chunks)
                    all_metadatas.extend(metas)
                    total_docs += 1
                else:
                    skipped_files += 1
                pbar.update(1)
    log_pipeline(f"Parsed {total_docs} files -> {len(all_chunks)} chunks")
    if failed_files > 0:
        log_pipeline(f"Failed: {failed_files} files")
    if skipped_files > 0:
        log_pipeline(f"Skipped (empty): {skipped_files} files")
    return all_chunks, all_metadatas, total_docs, failed_files, skipped_files

def _ingest_chunks_batched(
    chunks: list[str],
    metadatas: list[dict],
    vector_store: QdrantVectorStore,
    batch_size: int = INGESTION_BATCH_SIZE,
) -> int:
    """Ingest chunks into the vector store in large batches.

    Returns:
        Number of chunks ingested
    """
    if not chunks:
        return 0
    total = len(chunks)
    num_batches = (total + batch_size - 1) // batch_size
    log_pipeline(f"Ingesting {total} chunks in {num_batches} batches (batch_size={batch_size})...")
    ingested = 0
    with tqdm(total=num_batches, desc="Ingesting batches", unit="batch") as pbar:
        for i in range(0, total, batch_size):
            batch_chunks = chunks[i:i + batch_size]
            batch_metas = metadatas[i:i + batch_size]
            vector_store.add_texts(
                batch_chunks,
                metadatas=batch_metas,
                batch_size=len(batch_chunks),
            )
            ingested += len(batch_chunks)
            pbar.set_postfix_str(f"{ingested}/{total} chunks")
            pbar.update(1)
    return ingested

def _process_and_index_documents(
    files: list[Path],
    vector_store: QdrantVectorStore,
    batch_size: int = INGESTION_BATCH_SIZE,
    max_workers: int = MAX_WORKERS,
) -> tuple[int, int, int]:
    """Process files in parallel and ingest them in batched mode.

    Returns:
        Tuple of (total_chunks, total_docs, failed_files)
    """
    all_chunks, all_metadatas, total_docs, failed_files, _ = _parse_files_parallel(
        files, max_workers=max_workers
    )
    if not all_chunks:
        return 0, 0, failed_files
    total_chunks = _ingest_chunks_batched(
        all_chunks, all_metadatas, vector_store, batch_size=batch_size
    )
    return total_chunks, total_docs, failed_files

def ingest_all_data(
    base_dir: Path | None = None,
    force: bool = False,
    batch_size: int = INGESTION_BATCH_SIZE,
    max_workers: int = MAX_WORKERS,
) -> QdrantVectorStore:
    """Ingest all data from crawled JSON and documents into Qdrant.

    Uses parallel file parsing and batched ingestion for speed.

    Args:
        base_dir: Directory to scan (default: DATA_DIR)
        force: If True, wipe the collection and re-ingest everything
        batch_size: Number of chunks per batch for vector store insertion
        max_workers: Number of parallel workers for file parsing

    Returns:
        QdrantVectorStore instance
    """
    global _vector_store
    base_dir = base_dir or DATA_DIR
    embeddings = get_embeddings()
    client = get_qdrant_client()
    collection_name = settings.qdrant_collection
    collection_exists = client.collection_exists(collection_name)
    if collection_exists and not force:
        log_pipeline(f"Loading existing vector store: {settings.vector_db_path_resolved}")
        _vector_store = QdrantVectorStore(
            client=client,
            collection_name=collection_name,
            embedding=embeddings,
        )
        return _vector_store
    if force and collection_exists:
        log_pipeline(f"Force re-ingesting: deleting collection '{collection_name}'")
    files = _scan_data_files(base_dir)
    # Embed a throwaway query to discover the embedding dimension for the collection.
    sample_embedding = embeddings.embed_query("test")
    _initialize_collection(client, collection_name, len(sample_embedding), force_recreate=force)
    _vector_store = QdrantVectorStore(
        client=client,
        collection_name=collection_name,
        embedding=embeddings,
    )
    if not files:
        log_pipeline(f"No supported files found in {base_dir}")
        return _vector_store
    log_pipeline(f"Found {len(files)} files to ingest from {base_dir}")
    total_chunks, total_docs, failed_files = _process_and_index_documents(
        files, _vector_store, batch_size=batch_size, max_workers=max_workers
    )
    log_pipeline(f"Ingestion complete: {total_docs} files, {total_chunks} chunks")
    if failed_files > 0:
        log_pipeline(f"Failed files: {failed_files}")
    log_pipeline(f"Collection: '{collection_name}'")
    return _vector_store

def ingest_files(
    file_paths: list[Path],
    collection_name: str | None = None,
    append: bool = False,
    batch_size: int = INGESTION_BATCH_SIZE,
    max_workers: int = MAX_WORKERS,
) -> int:
    """Ingest specific files into Qdrant with parallel parsing and batched insertion.

    Args:
        file_paths: List of file paths to ingest
        collection_name: Optional collection name (default from settings)
        append: If True, append to the existing collection; otherwise recreate it
        batch_size: Number of chunks per batch for vector store insertion
        max_workers: Number of parallel workers for file parsing

    Returns:
        Number of chunks ingested
    """
    collection_name = collection_name or settings.qdrant_collection
    embeddings = get_embeddings()
    client = get_qdrant_client()
    sample_embedding = embeddings.embed_query("test")
    _initialize_collection(client, collection_name, len(sample_embedding), force_recreate=not append)
    vector_store = QdrantVectorStore(
        client=client,
        collection_name=collection_name,
        embedding=embeddings,
    )
    total_chunks, _, _ = _process_and_index_documents(
        file_paths, vector_store, batch_size=batch_size, max_workers=max_workers
    )
    log_pipeline(f"Total: {total_chunks} chunks in '{collection_name}'")
    return total_chunks
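

# Illustrative usage sketch, not an existing entry point: it assumes the module is
# run from the project root with settings/DATA_DIR configured as imported above.
# The "data/extra" directory, the "kb_extra" collection name, and the sample query
# are hypothetical; retrieval uses LangChain's standard similarity_search API.
if __name__ == "__main__":
    # Build (or reuse) the default collection from everything under DATA_DIR.
    store = ingest_all_data(force=False)

    # Optionally append a few extra documents to a separate, hypothetical collection.
    extra_files = sorted(Path("data/extra").rglob("*.pdf"))
    if extra_files:
        ingest_files(extra_files, collection_name="kb_extra", append=True)

    # Sanity check: fetch the closest chunks for a sample query.
    for doc in store.similarity_search("lịch sử Việt Nam", k=3):
        print(doc.metadata.get("title", ""), "->", doc.page_content[:80])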