Spaces:
Running
Running
| """Chunk storage for metadata lookup during retrieval. | |
| This module provides the ChunkStore class for loading and accessing | |
| chunk metadata from JSONL files. The store enables efficient lookup | |
| of chunk metadata by chunk_id, which is essential for: | |
| - Joining FAISS search results with full chunk content | |
| - Providing heading hierarchy for context | |
| - Source attribution for citations | |
| Storage Format: | |
| Chunks are stored in JSONL (JSON Lines) format, where each line | |
| is a complete JSON object representing a single chunk. This format | |
| supports: | |
| - Streaming reads for large files | |
| - Append-only updates | |
| - Easy debugging (human-readable) | |
| Lazy Loading: | |
| Chunk data is loaded on first access to avoid memory overhead | |
| until retrieval is actually needed. This is consistent with the | |
| project's lazy loading pattern for heavy dependencies. | |
| Example: | |
| ------- | |
| >>> from pathlib import Path | |
| >>> from rag_chatbot.retrieval import ChunkStore | |
| >>> store = ChunkStore(Path("data/chunks/chunks.jsonl")) | |
| >>> chunk = store.get("ashrae55_001") | |
| >>> if chunk: | |
| ... print(chunk.text[:100]) | |
| ... print(chunk.heading_path) | |
| """ | |
| from __future__ import annotations | |
| import json | |
| import logging | |
| from pathlib import Path | |
| from typing import TYPE_CHECKING | |
| if TYPE_CHECKING: | |
| from rag_chatbot.chunking.models import Chunk | |
# =============================================================================
# Module Exports
# =============================================================================
# Public API: only ChunkStore is exported; everything else is internal.
__all__: list[str] = ["ChunkStore"]
# =============================================================================
# Logger
# =============================================================================
# Module-level logger following the stdlib convention (named after the module).
logger = logging.getLogger(__name__)
class ChunkStore:
    """Store for loading and accessing chunks from JSONL files.

    This class provides efficient access to chunk metadata by chunk_id.
    Chunks are loaded lazily from JSONL files and indexed by their
    chunk_id for O(1) lookup performance.

    The store is designed to work with the output of the chunking pipeline,
    where chunks are saved in JSONL format with all metadata required for
    retrieval and citation.

    Lazy Loading Pattern:
        - The JSONL file is not read until the first access (get() or __len__)
        - This avoids loading potentially large chunk collections into memory
          until they are actually needed
        - Once loaded, chunks remain in memory for fast repeated access

    Attributes:
    ----------
    path : Path
        Path to the JSONL file containing chunks (read-only property).
    num_chunks : int
        Number of chunks in the store (read-only property).

    Example:
    -------
    >>> store = ChunkStore(Path("data/chunks/chunks.jsonl"))
    >>> # Chunks are loaded on first access
    >>> chunk = store.get("doc1_chunk_001")
    >>> if chunk is not None:
    ...     print(f"Found: {chunk.text[:50]}...")
    ...     print(f"Source: {chunk.source}, Page: {chunk.page}")
    Found: The PMV model predicts thermal sensation...
    Source: ashrae_55.pdf, Page: 5

    Note:
    ----
    The store uses the Chunk model from rag_chatbot.chunking.models
    for validated, type-safe chunk representation.
    """

    def __init__(self, path: Path) -> None:
        """Initialize the chunk store with a JSONL file path.

        The file is NOT loaded during initialization to support lazy loading.
        The actual file reading happens on the first call to get(), __len__(),
        or any other method that requires chunk data.

        Args:
        ----
        path: Path to the JSONL file containing chunk data.
            The file should contain one JSON object per line,
            with each object matching the Chunk model schema.

        Raises:
        ------
        ValueError: If path is None.
        TypeError: If path is not a Path object.

        Example:
        -------
        >>> store = ChunkStore(Path("data/chunks/chunks.jsonl"))
        >>> # File is not read yet - fast initialization
        >>> len(store)  # File is read on first access
        150

        Note:
        ----
        The path is validated to be a Path object, but file existence
        is not checked until the first access. This allows the store
        to be constructed before the file is created (e.g., in test setup).
        """
        # Validate path parameter early so misuse fails at construction time.
        if path is None:
            msg = "path cannot be None"
            raise ValueError(msg)
        if not isinstance(path, Path):
            msg = f"path must be a Path object, got {type(path).__name__}"
            raise TypeError(msg)

        # Store the path for lazy loading.
        self._path: Path = path

        # Chunk storage indexed by chunk_id.
        # None indicates chunks have not been loaded yet (lazy loading sentinel).
        self._chunks: dict[str, Chunk] | None = None

    # -------------------------------------------------------------------------
    # Class Methods
    # -------------------------------------------------------------------------

    @classmethod
    def from_parquet(cls, path: Path) -> ChunkStore:
        """Create a ChunkStore by loading chunks from a Parquet file.

        This factory method creates a ChunkStore instance and immediately
        loads all chunks from the specified Parquet file. Unlike the standard
        constructor which uses lazy loading from JSONL, this method eagerly
        loads all data.

        The Parquet file should have columns matching the Chunk model fields:
        - chunk_id (string): Unique identifier
        - text (string): Chunk content
        - source (string): Source document name
        - page (int): Page number
        - heading_path (list): Heading hierarchy

        Args:
        ----
        path: Path to the chunks.parquet file.

        Returns:
        -------
        ChunkStore instance with all chunks loaded from the Parquet file.

        Raises:
        ------
        FileNotFoundError: If the Parquet file doesn't exist.
        ImportError: If pyarrow is not installed.

        Example:
        -------
        >>> store = ChunkStore.from_parquet(Path("data/chunks.parquet"))
        >>> chunk = store.get("chunk_001")
        """
        # Lazy import of the project model to avoid circular imports.
        from rag_chatbot.chunking.models import Chunk

        # Accept string-like inputs gracefully, then fail fast if missing.
        path = Path(path)
        if not path.exists():
            msg = f"Chunk file not found: {path}"
            raise FileNotFoundError(msg)

        logger.info("Loading chunks from Parquet: %s", path)

        # Lazy import pyarrow (heavy optional dependency).
        import pyarrow.parquet as pq

        # Read the Parquet file into an Arrow table.
        table = pq.read_table(path)

        # Create the instance without running __init__ so we can bypass
        # lazy loading and populate _chunks eagerly.
        instance = cls.__new__(cls)
        instance._path = path
        instance._chunks = {}

        # Track loading statistics for logging.
        loaded_count = 0
        error_count = 0

        # Convert each row to a dict and validate it as a Chunk model.
        for i in range(table.num_rows):
            try:
                # Extract row data as a plain dict of Python values.
                row_data = {
                    col: table.column(col)[i].as_py()
                    for col in table.column_names
                }

                # Provide defaults for fields required by the Chunk model but
                # possibly missing from older Parquet files created before
                # these fields were added.
                if "start_char" not in row_data:
                    row_data["start_char"] = 0
                if "end_char" not in row_data:
                    # Use text length as a reasonable default.
                    # NOTE(review): falls back to 1 (not 0) for empty text —
                    # presumably the Chunk model requires end_char > start_char;
                    # confirm against the model's validators.
                    text = row_data.get("text", "")
                    row_data["end_char"] = len(text) if text else 1
                if "token_count" not in row_data:
                    # Estimate token count (~4 chars per token for English).
                    text = row_data.get("text", "")
                    row_data["token_count"] = len(text) // 4 if text else 0

                # Validate and create the Chunk model (pydantic validation).
                chunk = Chunk.model_validate(row_data)

                # Index by chunk_id for O(1) lookup.
                instance._chunks[chunk.chunk_id] = chunk
                loaded_count += 1
            except Exception as e:
                # Log row-level failures but keep processing — partial
                # recovery from bad rows while surfacing data quality issues.
                error_count += 1
                logger.warning(
                    "Failed to parse chunk at row %d in %s: %s",
                    i,
                    path,
                    str(e),
                )

        # Log a loading summary at the appropriate severity.
        if error_count > 0:
            logger.warning(
                "Loaded %d chunks from Parquet %s with %d errors",
                loaded_count,
                path,
                error_count,
            )
        else:
            logger.info(
                "Loaded %d chunks from Parquet %s",
                loaded_count,
                path,
            )

        return instance

    # -------------------------------------------------------------------------
    # Private Methods
    # -------------------------------------------------------------------------

    def _ensure_loaded(self) -> None:
        """Load chunks from the JSONL file if not already loaded.

        This method implements the lazy loading pattern. It reads the JSONL
        file and parses each line into a Chunk model, indexing by chunk_id.
        The method is idempotent - calling it multiple times after the first
        load has no effect.

        Processing Steps:
        1. Check if chunks are already loaded (skip if so)
        2. Verify the file exists
        3. Read each line as JSON
        4. Parse into Chunk model
        5. Index by chunk_id

        Raises:
        ------
        FileNotFoundError: If the JSONL file does not exist.

        Note:
        ----
        Parse errors (invalid JSON, schema violations) are logged at WARNING
        level but don't stop processing. This allows partial recovery from
        corrupted files while alerting to data quality issues.
        """
        # Skip if already loaded (sentinel check makes this idempotent).
        if self._chunks is not None:
            return

        # Check file existence before doing any other work so the documented
        # FileNotFoundError fires first (consistent with from_parquet).
        if not self._path.exists():
            msg = f"Chunk file not found: {self._path}"
            raise FileNotFoundError(msg)

        # Import Chunk model lazily to avoid circular imports and follow
        # the project's lazy loading pattern for dependencies.
        from rag_chatbot.chunking.models import Chunk

        logger.info("Loading chunks from %s...", self._path)

        # Initialize the chunk dictionary.
        self._chunks = {}

        # Track loading statistics for logging.
        loaded_count = 0
        error_count = 0

        # Read and parse the JSONL file line by line.
        # Line-by-line reading keeps memory usage low for large files.
        with open(self._path, encoding="utf-8") as f:
            for line_num, line in enumerate(f, start=1):
                # Skip empty lines (common at end of file).
                stripped = line.strip()
                if not stripped:
                    continue
                try:
                    # Parse JSON from the line.
                    data = json.loads(stripped)

                    # Validate and create the Chunk model.
                    # Pydantic handles validation and type coercion.
                    chunk = Chunk.model_validate(data)

                    # Index by chunk_id for O(1) lookup.
                    self._chunks[chunk.chunk_id] = chunk
                    loaded_count += 1
                except json.JSONDecodeError as e:
                    # Log JSON parsing errors but continue processing.
                    error_count += 1
                    logger.warning(
                        "Invalid JSON on line %d in %s: %s",
                        line_num,
                        self._path,
                        str(e),
                    )
                except Exception as e:
                    # Log validation errors but continue processing.
                    error_count += 1
                    logger.warning(
                        "Failed to parse chunk on line %d in %s: %s",
                        line_num,
                        self._path,
                        str(e),
                    )

        # Log a loading summary at the appropriate severity.
        if error_count > 0:
            logger.warning(
                "Loaded %d chunks from %s with %d errors",
                loaded_count,
                self._path,
                error_count,
            )
        else:
            logger.info(
                "Loaded %d chunks from %s",
                loaded_count,
                self._path,
            )

    # -------------------------------------------------------------------------
    # Public Methods
    # -------------------------------------------------------------------------

    def get(self, chunk_id: str) -> Chunk | None:
        """Retrieve a chunk by its ID.

        Looks up a chunk in the store by its unique identifier. If the
        chunks have not been loaded yet, this method triggers lazy loading.

        Args:
        ----
        chunk_id: The unique identifier of the chunk to retrieve.
            This matches the chunk_id field in the Chunk model.

        Returns:
        -------
        The Chunk object if found, or None if no chunk with that ID
        exists in the store.

        Raises:
        ------
        FileNotFoundError: If the JSONL file doesn't exist (on first access).
        ValueError: If chunk_id is None or empty.

        Example:
        -------
        >>> store = ChunkStore(Path("data/chunks/chunks.jsonl"))
        >>> chunk = store.get("ashrae55_042")
        >>> if chunk:
        ...     print(f"Text: {chunk.text[:50]}...")
        ...     print(f"Headings: {chunk.heading_path}")
        Text: The PPD index represents the percentage...
        Headings: ['Thermal Comfort', 'PMV-PPD Model']

        Note:
        ----
        This method returns None rather than raising an exception for
        missing chunks, enabling graceful handling of race conditions
        or index inconsistencies during retrieval.
        """
        # Validate chunk_id parameter (rejects both None and "").
        if not chunk_id:
            msg = "chunk_id cannot be None or empty"
            raise ValueError(msg)

        # Ensure chunks are loaded (lazy loading).
        self._ensure_loaded()

        # Look up chunk by ID (O(1) dictionary lookup).
        # _chunks is guaranteed to be set after _ensure_loaded().
        return self._chunks.get(chunk_id)  # type: ignore[union-attr]

    def get_all_chunks(self) -> list[Chunk]:
        """Get all chunks in the store.

        Returns a list of all chunks currently loaded in the store.
        If chunks have not been loaded yet, this triggers lazy loading.

        Returns:
        -------
        List of all Chunk objects in the store. The order is not
        guaranteed (dictionary iteration order).

        Raises:
        ------
        FileNotFoundError: If the JSONL file doesn't exist (on first access).

        Example:
        -------
        >>> store = ChunkStore(Path("data/chunks/chunks.jsonl"))
        >>> all_chunks = store.get_all_chunks()
        >>> print(f"Total chunks: {len(all_chunks)}")
        Total chunks: 150

        Note:
        ----
        This method returns a new list each time, so modifications
        to the returned list do not affect the store.
        """
        # Ensure chunks are loaded (lazy loading).
        self._ensure_loaded()

        # Return a fresh list of all chunk values.
        # _chunks is guaranteed to be set after _ensure_loaded().
        return list(self._chunks.values())  # type: ignore[union-attr]

    def get_chunk_ids(self) -> list[str]:
        """Get all chunk IDs in the store.

        Returns a list of all chunk IDs currently in the store.
        Useful for iteration or validation purposes.

        Returns:
        -------
        List of all chunk_id strings in the store.

        Raises:
        ------
        FileNotFoundError: If the JSONL file doesn't exist (on first access).

        Example:
        -------
        >>> store = ChunkStore(Path("data/chunks/chunks.jsonl"))
        >>> ids = store.get_chunk_ids()
        >>> print(f"First 3 IDs: {ids[:3]}")
        First 3 IDs: ['doc1_001', 'doc1_002', 'doc1_003']
        """
        # Ensure chunks are loaded (lazy loading).
        self._ensure_loaded()

        # Return a fresh list of chunk IDs.
        # _chunks is guaranteed to be set after _ensure_loaded().
        return list(self._chunks.keys())  # type: ignore[union-attr]

    def __len__(self) -> int:
        """Get the number of chunks in the store.

        Returns the total count of chunks. If chunks have not been
        loaded yet, this triggers lazy loading.

        Returns:
        -------
        Number of chunks in the store.

        Raises:
        ------
        FileNotFoundError: If the JSONL file doesn't exist (on first access).

        Example:
        -------
        >>> store = ChunkStore(Path("data/chunks/chunks.jsonl"))
        >>> print(f"Store contains {len(store)} chunks")
        Store contains 150 chunks
        """
        # Ensure chunks are loaded (lazy loading).
        self._ensure_loaded()

        # Return chunk count.
        # _chunks is guaranteed to be set after _ensure_loaded().
        return len(self._chunks)  # type: ignore[arg-type]

    def __contains__(self, chunk_id: str) -> bool:
        """Check if a chunk ID exists in the store.

        Enables the 'in' operator for membership testing.

        Args:
        ----
        chunk_id: The chunk ID to check for.

        Returns:
        -------
        True if the chunk ID exists in the store, False otherwise.

        Raises:
        ------
        FileNotFoundError: If the JSONL file doesn't exist (on first access).

        Example:
        -------
        >>> store = ChunkStore(Path("data/chunks/chunks.jsonl"))
        >>> if "ashrae55_001" in store:
        ...     print("Chunk exists!")
        Chunk exists!
        """
        # Ensure chunks are loaded (lazy loading).
        self._ensure_loaded()

        # Check membership (O(1) dictionary lookup).
        # _chunks is guaranteed to be set after _ensure_loaded().
        return chunk_id in self._chunks  # type: ignore[operator]

    # -------------------------------------------------------------------------
    # Properties
    # -------------------------------------------------------------------------

    @property
    def path(self) -> Path:
        """Get the path to the JSONL file.

        Returns
        -------
        Path to the chunk storage file.
        """
        return self._path

    @property
    def num_chunks(self) -> int:
        """Get the number of chunks in the store.

        This is an alias for __len__() provided for API clarity.
        Accessing it triggers lazy loading.

        Returns
        -------
        Number of chunks in the store.
        """
        return len(self)

    @property
    def is_loaded(self) -> bool:
        """Check if chunks have been loaded.

        Returns True if chunks have been loaded from the file,
        False if lazy loading has not yet occurred. Does NOT trigger
        loading itself.

        Returns:
        -------
        True if chunks are loaded, False otherwise.

        Example:
        -------
        >>> store = ChunkStore(Path("data/chunks/chunks.jsonl"))
        >>> store.is_loaded
        False
        >>> _ = len(store)  # Triggers loading
        >>> store.is_loaded
        True
        """
        return self._chunks is not None