| """Load PDFs, split into chunks with metadata, and index into Qdrant.""" |
|
|
| from __future__ import annotations |
|
|
| import hashlib |
| import uuid |
| from collections import defaultdict |
| from pathlib import Path |
| from typing import Protocol |
|
|
| from langchain_community.document_loaders import PyPDFLoader |
| from langchain_core.documents import Document |
| from langchain_text_splitters import RecursiveCharacterTextSplitter |
| from loguru import logger |
|
|
| from src.config import settings |
| from src.schemas import ChunkMetadata |
| from src.store import ensure_collection, get_vector_store |
|
|
|
|
class Chunker(Protocol):
    """Structural interface for objects that turn page documents into chunks.

    Any object exposing a compatible ``split_documents`` method satisfies
    this protocol; no inheritance is required.
    """

    def split_documents(self, documents: list[Document]) -> list[Document]:
        """Split page-level documents into chunk-level documents."""
        ...
|
|
|
|
def _splitter(
    chunk_size: int | None = None, chunk_overlap: int | None = None
) -> RecursiveCharacterTextSplitter:
    """Build the default recursive character splitter.

    Args:
        chunk_size: Target chunk size. ``None`` (or ``0``, which is never a
            usable size) falls back to ``settings.chunk_size``.
        chunk_overlap: Overlap between consecutive chunks. Only ``None``
            falls back to ``settings.chunk_overlap``; an explicit ``0`` is
            honoured and means "no overlap".

    Returns:
        A ``RecursiveCharacterTextSplitter`` that tries paragraph, line,
        sentence, and word separators in that order and drops the separator
        from the resulting chunks.
    """
    size = chunk_size or settings.chunk_size
    # Compare against None rather than relying on truthiness: 0 is a
    # legitimate overlap and must not be replaced by the configured default.
    overlap = settings.chunk_overlap if chunk_overlap is None else chunk_overlap

    return RecursiveCharacterTextSplitter(
        chunk_size=size,
        chunk_overlap=overlap,
        separators=["\n\n", "\n", ". ", " ", ""],
        keep_separator=False,
    )
|
|
|
|
def _document_id(path: Path) -> str:
    """Return a short identifier for the file at *path*.

    The id is the first 16 hex characters of the SHA-1 digest of
    ``"<file name>:<size in bytes>"``. Only the name and the size are
    hashed; the directory and the file contents are not.
    """
    size_in_bytes = path.stat().st_size
    fingerprint = ":".join((path.name, str(size_in_bytes)))
    digest = hashlib.sha1(fingerprint.encode("utf-8"))
    return digest.hexdigest()[:16]
|
|
|
|
def _chunk_id(doc_id: str, page: int, index: int) -> str:
    """Join the document id, page number, and chunk index with colons."""
    parts = (doc_id, str(page), str(index))
    return ":".join(parts)
|
|
|
|
def _load_pdf(path: Path) -> list[Document]:
    """Load one PDF and normalise the metadata of every returned document.

    Each document produced by ``PyPDFLoader`` has its metadata replaced by a
    dict holding exactly ``document_id``, ``filename``, ``source`` (the
    resolved path), ``page``, and ``section``.

    Args:
        path: Location of the PDF file.

    Returns:
        The loader's documents, mutated in place with the new metadata.
    """
    pages = PyPDFLoader(str(path)).load()
    doc_id = _document_id(path)
    for page_doc in pages:
        loaded_meta = page_doc.metadata
        # The loader's page value (default 0) is shifted by one; presumably
        # it is zero-based and this makes it 1-based — confirm with the loader.
        page_number = int(loaded_meta.get("page", 0)) + 1
        page_doc.metadata = dict(
            document_id=doc_id,
            filename=path.name,
            source=str(path.resolve()),
            page=page_number,
            # Taken from the loader's metadata when present, otherwise None.
            section=loaded_meta.get("section"),
        )
    return pages
|
|
|
|
def discover_pdfs(data_dir: Path | None = None) -> list[Path]:
    """List the PDF files directly inside a directory, in sorted order.

    Args:
        data_dir: Directory to scan; ``settings.data_dir`` is used when
            omitted.

    Returns:
        Sorted paths of regular files whose suffix is ``.pdf`` (matched
        case-insensitively). Sub-directories are not searched. An empty list
        is returned when the directory does not exist.
    """
    root = data_dir or settings.data_dir
    if not root.exists():
        return []

    found = [
        entry
        for entry in root.iterdir()
        if entry.is_file() and entry.suffix.lower() == ".pdf"
    ]
    found.sort()
    return found
|
|
|
|
def build_chunks(
    pdf_paths: list[Path],
    chunk_size: int | None = None,
    chunk_overlap: int | None = None,
    chunker: Chunker | None = None,
) -> list[Document]:
    """Load the given PDFs, split them, and attach chunk metadata.

    Args:
        pdf_paths: PDF files to load, processed in the given order.
        chunk_size: Passed to the default splitter; ignored when ``chunker``
            is supplied.
        chunk_overlap: Passed to the default splitter; ignored when
            ``chunker`` is supplied.
        chunker: Optional custom splitter used instead of the default one.

    Returns:
        The chunk documents, each with its metadata replaced by the dumped
        ``ChunkMetadata`` fields.
    """
    pages: list[Document] = []
    for pdf in pdf_paths:
        logger.info("Loading PDF: {}", pdf.name)
        pages += _load_pdf(pdf)

    # The default splitter is only constructed when no chunker was injected.
    active = chunker if chunker is not None else _splitter(chunk_size, chunk_overlap)
    pieces = active.split_documents(pages)

    # Running chunk index per document; it is not reset between pages.
    next_index: dict[str, int] = {}
    for piece in pieces:
        source_meta = piece.metadata
        doc_id = source_meta["document_id"]
        position = next_index.get(doc_id, 0)
        next_index[doc_id] = position + 1
        piece.metadata = ChunkMetadata(
            document_id=doc_id,
            filename=source_meta["filename"],
            source=source_meta["source"],
            page=source_meta["page"],
            chunk_id=_chunk_id(doc_id, source_meta["page"], position),
            section=source_meta.get("section"),
        ).model_dump()
    return pieces
|
|
|
|
def index_chunks(chunks: list[Document], collection_name: str | None = None) -> int:
    """Add chunks to the vector store under deterministic UUIDs.

    Each id is a UUIDv5 (DNS namespace) of the chunk's ``chunk_id`` metadata,
    so the same chunk always maps to the same id. This is intended to make
    re-ingesting the same content overwrite existing entries instead of
    creating duplicates; that relies on the store treating a repeated id as
    an upsert.

    Args:
        chunks: Documents whose metadata contains ``chunk_id``.
        collection_name: Forwarded to ``get_vector_store``.

    Returns:
        The number of chunks submitted, or 0 when ``chunks`` is empty.
    """
    if not chunks:
        return 0

    point_ids: list[str] = []
    for chunk in chunks:
        derived = uuid.uuid5(uuid.NAMESPACE_DNS, chunk.metadata["chunk_id"])
        point_ids.append(str(derived))

    store = get_vector_store(collection_name=collection_name)
    store.add_documents(chunks, ids=point_ids)
    return len(chunks)
|
|
|
|
def ingest(
    recreate: bool = False,
    collection_name: str | None = None,
    chunker: Chunker | None = None,
    chunk_size: int | None = None,
    chunk_overlap: int | None = None,
) -> int:
    """Ingest every PDF found in the default data directory.

    The collection is ensured (and recreated when requested) before the PDFs
    are chunked, but only if at least one PDF was discovered.

    Args:
        recreate: Forwarded to ``ensure_collection``.
        collection_name: Target collection; forwarded to the store helpers.
        chunker: Optional custom splitter forwarded to ``build_chunks``.
        chunk_size: Forwarded to ``build_chunks``.
        chunk_overlap: Forwarded to ``build_chunks``.

    Returns:
        The number of chunks indexed; 0 when no PDFs or no chunks were found.
    """
    sources = discover_pdfs()
    total_pdfs = len(sources)
    if total_pdfs == 0:
        logger.warning("No PDF files found in {}", settings.data_dir)
        return 0

    ensure_collection(recreate=recreate, collection_name=collection_name)
    documents = build_chunks(
        sources,
        chunk_size=chunk_size,
        chunk_overlap=chunk_overlap,
        chunker=chunker,
    )

    if documents:
        indexed = index_chunks(documents, collection_name=collection_name)
        logger.info("Ingested {} chunks from {} PDF(s)", indexed, total_pdfs)
        return indexed

    logger.warning("No chunks produced from {} PDF(s)", total_pdfs)
    return 0
|
|
|
|
def save_and_ingest_pdf(file_bytes: bytes, filename: str) -> dict[str, object]:
    """Save an uploaded PDF to `data_dir` and ingest it into Qdrant.

    The file is written under ``settings.data_dir`` using only the final
    component of ``filename``; an existing file with that name is
    overwritten. The default collection is then ensured and the file's
    chunks are indexed into it.

    Args:
        file_bytes: Raw content of the uploaded file.
        filename: Client-supplied file name; any directory part is discarded.

    Returns:
        ``{"filename": <stored name>, "chunks_indexed": <count>}``.

    Raises:
        ValueError: If the filename is missing, does not end in ``.pdf``,
            has no name before the extension, or the content is empty.
    """
    if not filename:
        raise ValueError("Filename is required.")
    if not filename.lower().endswith(".pdf"):
        raise ValueError("Only PDF files are accepted.")

    # Keep only the last path component so a client-supplied name cannot
    # direct the write outside data_dir.
    safe_name = Path(filename).name
    # Re-validate the name that is actually written, using the same suffix
    # test discover_pdfs() applies. A name such as "uploads/.pdf" passes the
    # raw check above but would be stored as the stem-less hidden file
    # ".pdf", which discovery never lists.
    if Path(safe_name).suffix.lower() != ".pdf":
        raise ValueError("Filename must have a name before the .pdf extension.")
    if not file_bytes:
        raise ValueError("Uploaded file is empty.")

    settings.data_dir.mkdir(parents=True, exist_ok=True)
    dest = settings.data_dir / safe_name
    dest.write_bytes(file_bytes)
    logger.info("Saved uploaded PDF: {}", dest)

    ensure_collection(recreate=False)
    chunks = build_chunks([dest])
    if not chunks:
        logger.warning("No chunks produced for uploaded file {}", safe_name)
        return {"filename": safe_name, "chunks_indexed": 0}

    count = index_chunks(chunks)
    logger.info("Indexed {} chunks from {}", count, safe_name)
    return {"filename": safe_name, "chunks_indexed": count}
|
|
|
|