"""Chunk storage for metadata lookup during retrieval.
This module provides the ChunkStore class for loading and accessing
chunk metadata from JSONL files. The store enables efficient lookup
of chunk metadata by chunk_id, which is essential for:
- Joining FAISS search results with full chunk content (see the sketch below)
- Providing heading hierarchy for context
- Source attribution for citations
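For example, joining FAISS search results back to chunk content
(an illustrative sketch; index, query_vec, and ids, a list mapping
FAISS row positions to chunk_ids, are assumed rather than provided
by this module):
>>> scores, rows = index.search(query_vec, 5)
>>> hits = [store.get(ids[r]) for r in rows[0]]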
Storage Format:
Chunks are stored in JSONL (JSON Lines) format, where each line
is a complete JSON object representing a single chunk. This format
supports:
- Streaming reads for large files
- Append-only updates
- Easy debugging (human-readable)
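Example line (illustrative values; the field set follows the Chunk
model in rag_chatbot.chunking.models):
{"chunk_id": "doc1_001", "text": "The PMV model ...", "source": "doc1.pdf",
"page": 5, "heading_path": ["Thermal Comfort"], "start_char": 0,
"end_char": 512, "token_count": 128}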
Lazy Loading:
Chunk data is loaded on first access to avoid memory overhead
until retrieval is actually needed. This is consistent with the
project's lazy loading pattern for heavy dependencies.
Example:
-------
>>> from pathlib import Path
>>> from rag_chatbot.retrieval import ChunkStore
>>> store = ChunkStore(Path("data/chunks/chunks.jsonl"))
>>> chunk = store.get("ashrae55_001")
>>> if chunk:
... print(chunk.text[:100])
... print(chunk.heading_path)
"""
from __future__ import annotations
import json
import logging
from pathlib import Path
from typing import TYPE_CHECKING
if TYPE_CHECKING:
from rag_chatbot.chunking.models import Chunk
# =============================================================================
# Module Exports
# =============================================================================
__all__: list[str] = ["ChunkStore"]
# =============================================================================
# Logger
# =============================================================================
logger = logging.getLogger(__name__)
class ChunkStore:
"""Store for loading and accessing chunks from JSONL files.
This class provides efficient access to chunk metadata by chunk_id.
Chunks are loaded lazily from JSONL files and indexed by their
chunk_id for O(1) lookup performance.
The store is designed to work with the output of the chunking pipeline,
where chunks are saved in JSONL format with all metadata required for
retrieval and citation.
Lazy Loading Pattern:
- The JSONL file is not read until the first access (e.g., get(), __len__, or the in operator)
- This avoids loading potentially large chunk collections into memory
until they are actually needed
- Once loaded, chunks remain in memory for fast repeated access
Attributes:
----------
path : Path
Path to the JSONL file containing chunks.
num_chunks : int
Number of chunks in the store (read-only property).
is_loaded : bool
Whether chunks have been loaded from disk yet (read-only property).
Example:
-------
>>> store = ChunkStore(Path("data/chunks/chunks.jsonl"))
>>> # Chunks are loaded on first access
>>> chunk = store.get("doc1_chunk_001")
>>> if chunk is not None:
... print(f"Found: {chunk.text[:50]}...")
... print(f"Source: {chunk.source}, Page: {chunk.page}")
Found: The PMV model predicts thermal sensation...
Source: ashrae_55.pdf, Page: 5
Note:
----
The store uses the Chunk model from rag_chatbot.chunking.models
for validated, type-safe chunk representation.
"""
def __init__(self, path: Path) -> None:
"""Initialize the chunk store with a JSONL file path.
The file is NOT loaded during initialization to support lazy loading.
The actual file reading happens on the first call to get(), __len__(),
or any other method that requires chunk data.
Args:
----
path: Path to the JSONL file containing chunk data.
The file should contain one JSON object per line,
with each object matching the Chunk model schema.
Raises:
------
ValueError: If path is None.
TypeError: If path is not a Path object.
Example:
-------
>>> store = ChunkStore(Path("data/chunks/chunks.jsonl"))
>>> # File is not read yet; initialization is fast
>>> len(store) # File is read on first access
150
Note:
----
The path is validated to be a Path object, but file existence
is not checked until the first access. This allows the store
to be constructed before the file is created (e.g., in test setup).
"""
# Validate path parameter
if path is None:
msg = "path cannot be None"
raise ValueError(msg)
if not isinstance(path, Path):
msg = f"path must be a Path object, got {type(path).__name__}"
raise TypeError(msg)
# Store the path for lazy loading
self._path: Path = path
# Chunk storage indexed by chunk_id
# None indicates chunks have not been loaded yet (lazy loading sentinel)
self._chunks: dict[str, Chunk] | None = None
# -------------------------------------------------------------------------
# Class Methods
# -------------------------------------------------------------------------
@classmethod
def from_parquet(cls, path: Path) -> ChunkStore:
"""Create a ChunkStore by loading chunks from a Parquet file.
This factory method creates a ChunkStore instance and immediately
loads all chunks from the specified Parquet file. Unlike the standard
constructor which uses lazy loading from JSONL, this method eagerly
loads all data.
The Parquet file should have columns matching the Chunk model fields:
- chunk_id (string): Unique identifier
- text (string): Chunk content
- source (string): Source document name
- page (int): Page number
- heading_path (list): Heading hierarchy
If start_char, end_char, or token_count are missing (as in older
files), defaults are filled in during loading.
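A minimal sketch of producing a compatible file with pyarrow
(illustrative values only):
>>> import pyarrow as pa, pyarrow.parquet as pq
>>> table = pa.table({
...     "chunk_id": ["doc1_001"],
...     "text": ["The PMV model predicts thermal sensation."],
...     "source": ["ashrae_55.pdf"],
...     "page": [5],
...     "heading_path": [["Thermal Comfort"]],
... })
>>> pq.write_table(table, "data/chunks.parquet")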
Args:
----
path: Path to the chunks.parquet file.
Returns:
-------
ChunkStore instance with all chunks loaded from the Parquet file.
Raises:
------
FileNotFoundError: If the Parquet file doesn't exist.
ImportError: If pyarrow is not installed.
Example:
-------
>>> store = ChunkStore.from_parquet(Path("data/chunks.parquet"))
>>> chunk = store.get("chunk_001")
"""
from rag_chatbot.chunking.models import Chunk
path = Path(path)
if not path.exists():
msg = f"Chunk file not found: {path}"
raise FileNotFoundError(msg)
logger.info("Loading chunks from Parquet: %s", path)
# Lazy import pyarrow
import pyarrow.parquet as pq
# Read the Parquet file
table = pq.read_table(path)
# Create instance (we'll bypass lazy loading by setting _chunks directly)
instance = cls.__new__(cls)
instance._path = path
instance._chunks = {}
# Convert to Python dicts and create Chunk models
loaded_count = 0
error_count = 0
# Iterate over rows, extracting each as a plain dict
for i in range(table.num_rows):
try:
# Extract row data as dict
row_data = {
col: table.column(col)[i].as_py()
for col in table.column_names
}
# Provide defaults for optional fields not in parquet
# These fields are required by Chunk model but may be missing
# from older parquet files created before these fields were added
if "start_char" not in row_data:
row_data["start_char"] = 0
if "end_char" not in row_data:
# Use text length as a reasonable default; fall back to 1
# for empty text so end_char stays above the default start_char
text = row_data.get("text", "")
row_data["end_char"] = len(text) if text else 1
if "token_count" not in row_data:
# Estimate token count (~4 chars per token for English)
text = row_data.get("text", "")
row_data["token_count"] = len(text) // 4 if text else 0
# Validate and create Chunk model
chunk = Chunk.model_validate(row_data)
# Index by chunk_id
instance._chunks[chunk.chunk_id] = chunk
loaded_count += 1
except Exception as e:
error_count += 1
logger.warning(
"Failed to parse chunk at row %d in %s: %s",
i,
path,
str(e),
)
if error_count > 0:
logger.warning(
"Loaded %d chunks from Parquet %s with %d errors",
loaded_count,
path,
error_count,
)
else:
logger.info(
"Loaded %d chunks from Parquet %s",
loaded_count,
path,
)
return instance
# -------------------------------------------------------------------------
# Private Methods
# -------------------------------------------------------------------------
def _ensure_loaded(self) -> None:
"""Load chunks from the JSONL file if not already loaded.
This method implements the lazy loading pattern. It reads the JSONL
file and parses each line into a Chunk model, indexing by chunk_id.
The method is idempotent: calling it multiple times after the first
load has no effect.
Processing Steps:
1. Check if chunks are already loaded (skip if so)
2. Verify the file exists
3. Read each line as JSON
4. Parse into Chunk model
5. Index by chunk_id
Raises:
------
FileNotFoundError: If the JSONL file does not exist.
Note:
----
Lines with invalid JSON or chunks that fail Chunk validation are
logged at WARNING level and skipped rather than raised. This allows
partial recovery from corrupted files while still alerting to data
quality issues.
"""
# Skip if already loaded (sentinel check)
if self._chunks is not None:
return
# Import Chunk model lazily to avoid circular imports and follow
# the project's lazy loading pattern for dependencies
from rag_chatbot.chunking.models import Chunk
# Check file existence before attempting to read
if not self._path.exists():
msg = f"Chunk file not found: {self._path}"
raise FileNotFoundError(msg)
logger.info("Loading chunks from %s...", self._path)
# Initialize the chunk dictionary
self._chunks = {}
# Track loading statistics for logging
loaded_count = 0
error_count = 0
# Read and parse the JSONL file line by line
# Using line-by-line reading for memory efficiency with large files
with open(self._path, encoding="utf-8") as f:
for line_num, line in enumerate(f, start=1):
# Skip empty lines (common at end of file)
stripped = line.strip()
if not stripped:
continue
try:
# Parse JSON from the line
data = json.loads(stripped)
# Validate and create Chunk model
# Pydantic handles validation and type coercion
chunk = Chunk.model_validate(data)
# Index by chunk_id for O(1) lookup
self._chunks[chunk.chunk_id] = chunk
loaded_count += 1
except json.JSONDecodeError as e:
# Log JSON parsing errors but continue processing
error_count += 1
logger.warning(
"Invalid JSON on line %d in %s: %s",
line_num,
self._path,
str(e),
)
except Exception as e:
# Log validation errors but continue processing
error_count += 1
logger.warning(
"Failed to parse chunk on line %d in %s: %s",
line_num,
self._path,
str(e),
)
# Log loading summary
if error_count > 0:
logger.warning(
"Loaded %d chunks from %s with %d errors",
loaded_count,
self._path,
error_count,
)
else:
logger.info(
"Loaded %d chunks from %s",
loaded_count,
self._path,
)
# -------------------------------------------------------------------------
# Public Methods
# -------------------------------------------------------------------------
def get(self, chunk_id: str) -> Chunk | None:
"""Retrieve a chunk by its ID.
Looks up a chunk in the store by its unique identifier. If the
chunks have not been loaded yet, this method triggers lazy loading.
Args:
----
chunk_id: The unique identifier of the chunk to retrieve.
This matches the chunk_id field in the Chunk model.
Returns:
-------
The Chunk object if found, or None if no chunk with that ID
exists in the store.
Raises:
------
FileNotFoundError: If the JSONL file doesn't exist (on first access).
ValueError: If chunk_id is None or empty.
Example:
-------
>>> store = ChunkStore(Path("data/chunks/chunks.jsonl"))
>>> chunk = store.get("ashrae55_042")
>>> if chunk:
... print(f"Text: {chunk.text[:50]}...")
... print(f"Headings: {chunk.heading_path}")
Text: The PPD index represents the percentage...
Headings: ['Thermal Comfort', 'PMV-PPD Model']
Note:
----
This method returns None rather than raising an exception for
missing chunks, enabling graceful handling of race conditions
or index inconsistencies during retrieval.
"""
# Validate chunk_id parameter
if not chunk_id:
msg = "chunk_id cannot be None or empty"
raise ValueError(msg)
# Ensure chunks are loaded (lazy loading)
self._ensure_loaded()
# Look up chunk by ID (O(1) dictionary lookup)
# _chunks is guaranteed to be set after _ensure_loaded()
return self._chunks.get(chunk_id) # type: ignore[union-attr]
def get_all_chunks(self) -> list[Chunk]:
"""Get all chunks in the store.
Returns a list of all chunks currently loaded in the store.
If chunks have not been loaded yet, this triggers lazy loading.
Returns:
-------
List of all Chunk objects in the store, in the order they were
loaded from the file (Python dicts preserve insertion order).
Raises:
------
FileNotFoundError: If the JSONL file doesn't exist (on first access).
Example:
-------
>>> store = ChunkStore(Path("data/chunks/chunks.jsonl"))
>>> all_chunks = store.get_all_chunks()
>>> print(f"Total chunks: {len(all_chunks)}")
Total chunks: 150
Note:
----
This method returns a new list each time, so modifications
to the returned list do not affect the store.
"""
# Ensure chunks are loaded (lazy loading)
self._ensure_loaded()
# Return a list of all chunk values
# _chunks is guaranteed to be set after _ensure_loaded()
return list(self._chunks.values()) # type: ignore[union-attr]
def get_chunk_ids(self) -> list[str]:
"""Get all chunk IDs in the store.
Returns a list of all chunk IDs currently in the store.
Useful for iteration or validation purposes.
Returns:
-------
List of all chunk_id strings in the store.
Raises:
------
FileNotFoundError: If the JSONL file doesn't exist (on first access).
Example:
-------
>>> store = ChunkStore(Path("data/chunks/chunks.jsonl"))
>>> ids = store.get_chunk_ids()
>>> print(f"First 3 IDs: {ids[:3]}")
First 3 IDs: ['doc1_001', 'doc1_002', 'doc1_003']
"""
# Ensure chunks are loaded (lazy loading)
self._ensure_loaded()
# Return list of chunk IDs
# _chunks is guaranteed to be set after _ensure_loaded()
return list(self._chunks.keys()) # type: ignore[union-attr]
def __len__(self) -> int:
"""Get the number of chunks in the store.
Returns the total count of chunks. If chunks have not been
loaded yet, this triggers lazy loading.
Returns:
-------
Number of chunks in the store.
Raises:
------
FileNotFoundError: If the JSONL file doesn't exist (on first access).
Example:
-------
>>> store = ChunkStore(Path("data/chunks/chunks.jsonl"))
>>> print(f"Store contains {len(store)} chunks")
Store contains 150 chunks
"""
# Ensure chunks are loaded (lazy loading)
self._ensure_loaded()
# Return chunk count
# _chunks is guaranteed to be set after _ensure_loaded()
return len(self._chunks) # type: ignore[arg-type]
def __contains__(self, chunk_id: str) -> bool:
"""Check if a chunk ID exists in the store.
Enables the 'in' operator for membership testing.
Args:
----
chunk_id: The chunk ID to check for.
Returns:
-------
True if the chunk ID exists in the store, False otherwise.
Raises:
------
FileNotFoundError: If the JSONL file doesn't exist (on first access).
Example:
-------
>>> store = ChunkStore(Path("data/chunks/chunks.jsonl"))
>>> if "ashrae55_001" in store:
... print("Chunk exists!")
Chunk exists!
"""
# Ensure chunks are loaded (lazy loading)
self._ensure_loaded()
# Check membership
# _chunks is guaranteed to be set after _ensure_loaded()
return chunk_id in self._chunks # type: ignore[operator]
# -------------------------------------------------------------------------
# Properties
# -------------------------------------------------------------------------
@property
def path(self) -> Path:
"""Get the path to the JSONL file.
Returns:
-------
Path to the chunk storage file.
"""
return self._path
@property
def num_chunks(self) -> int:
"""Get the number of chunks in the store.
This is an alias for __len__() provided for API clarity.
Returns:
-------
Number of chunks in the store.
"""
return len(self)
@property
def is_loaded(self) -> bool:
"""Check if chunks have been loaded.
Returns True if chunks have been loaded from the file,
False if lazy loading has not yet occurred.
Returns:
-------
True if chunks are loaded, False otherwise.
Example:
-------
>>> store = ChunkStore(Path("data/chunks/chunks.jsonl"))
>>> store.is_loaded
False
>>> _ = len(store) # Triggers loading
>>> store.is_loaded
True
"""
return self._chunks is not None