# sadickam's picture
# Handle legacy manifest format gracefully
# fa90fb0
"""Dataset freshness validator for server startup.
This module validates that downloaded RAG artifacts are consistent
and compatible with the server's expected schema version. It provides:
- Loading and parsing of source_manifest.json from artifact cache
- Schema version validation against EXPECTED_SCHEMA_VERSION
- Comprehensive startup logging of dataset metadata
- Fast-fail behavior with clear error messages
The FreshnessValidator is designed to be called during server startup
before the RAG pipeline is initialized. If validation fails, the server
should refuse to start, preventing undefined behavior from incompatible
or corrupted artifacts.
Validation Checks:
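1. Manifest file existence - if source_manifest.json is absent, the dataset
is treated as legacy and validation is skipped with a warning
2. Manifest parsing - JSON must be valid and match expected structure; a
manifest without a "schema_version" field is treated as legacy and skipped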
3. Schema version - Must match EXPECTED_SCHEMA_VERSION exactly
4. Index version consistency - Logged for debugging
Startup Logging:
The validator logs extensively at INFO level to provide visibility
into the dataset state during server startup. This includes:
- Dataset URL (HuggingFace repository)
- Index version
- Schema version
- Number of source files
- Total source file size
- Manifest creation timestamp
Lazy Loading:
The Pydantic-backed manifest definitions (rag_chatbot.api.manifest) are
imported inside the methods that use them rather than at module import
time. This follows the project's lazy loading convention.
Example:
-------
>>> from pathlib import Path
>>> from rag_chatbot.api.freshness import (
...     FreshnessValidationError,
...     FreshnessValidator,
... )
>>> from rag_chatbot.config.settings import Settings
>>>
>>> settings = Settings()
>>> cache_path = Path(settings.artifact_cache_path)
>>> validator = FreshnessValidator(cache_path, settings)
>>>
>>> try:
...     manifest = validator.validate()
...     if manifest is not None:
...         print(f"Validation passed! Index version: {manifest.index_version}")
...     else:
...         print("Legacy dataset - manifest validation skipped")
... except FreshnessValidationError as e:
...     print(f"Validation failed: {e}")
...     raise SystemExit(1)
"""
from __future__ import annotations
import json
import logging
from pathlib import Path
from typing import TYPE_CHECKING, Final
if TYPE_CHECKING:
from rag_chatbot.api.manifest import SourceManifest
from rag_chatbot.config.settings import Settings
# =============================================================================
# Module Exports
# =============================================================================
# Public API of this module: these are the only names picked up by a
# star-import; the logger and filename constants below are internal.
__all__: list[str] = [
"FreshnessValidator",
"FreshnessValidationError",
]
# =============================================================================
# Logger
# =============================================================================
# Module-level logger named after this module's import path; all validation
# and startup messages in this file are emitted through it.
logger = logging.getLogger(__name__)
# =============================================================================
# Constants
# =============================================================================
# Joined onto the cache directory by FreshnessValidator.manifest_path.
SOURCE_MANIFEST_JSON: Final[str] = "source_manifest.json"
"""Filename for the source manifest JSON file in the artifact cache."""
# Joined onto the cache directory by FreshnessValidator.index_version_path.
INDEX_VERSION_TXT: Final[str] = "index_version.txt"
"""Filename for the index version text file in the artifact cache."""
# =============================================================================
# Exceptions
# =============================================================================
class FreshnessValidationError(Exception):
    """Signal that the downloaded dataset artifacts failed freshness validation.

    Raised by the FreshnessValidator when the artifacts in the cache cannot
    be used safely. Besides the human-readable message, the exception can
    carry the schema versions involved and the manifest location so that
    callers can report or inspect them individually.

    Typical causes:
    - Invalid JSON in the manifest file
    - Manifest file that cannot be read
    - Schema version mismatch
    - Missing required fields in the manifest

    Attributes:
    ----------
    message : str
        Human-readable description of what went wrong.
    expected_version : str | None
        Schema version the server expects, when relevant.
    actual_version : str | None
        Schema version found in the manifest, when relevant.
    manifest_path : Path | None
        Location of the manifest that failed validation, when relevant.

    Example:
    -------
    >>> try:
    ...     validator.validate()
    ... except FreshnessValidationError as e:
    ...     print(f"Validation failed: {e.message}")
    ...     if e.expected_version:
    ...         print(f"Expected: {e.expected_version}")
    ...         print(f"Actual: {e.actual_version}")
    ...     if e.manifest_path:
    ...         print(f"Manifest: {e.manifest_path}")

    """

    def __init__(
        self,
        message: str,
        *,
        expected_version: str | None = None,
        actual_version: str | None = None,
        manifest_path: Path | None = None,
    ) -> None:
        """Build the error and record its diagnostic details.

        Args:
        ----
            message: Human-readable description of the validation failure.
            expected_version: Schema version expected by the server (optional).
            actual_version: Schema version found in the manifest (optional).
            manifest_path: Path to the offending manifest file (optional).

        """
        # Only the message is forwarded to Exception, so ``args`` stays a
        # one-element tuple; the extra details live as plain attributes.
        super().__init__(message)
        self.message = message
        self.manifest_path = manifest_path
        self.expected_version = expected_version
        self.actual_version = actual_version

    def __str__(self) -> str:
        """Render the message followed by any available diagnostic details.

        Returns
        -------
            The message, then the expected/actual schema versions (only when
            both are set), then the manifest path (only when set), separated
            by single spaces.

        """
        both_versions_known = not (
            self.expected_version is None or self.actual_version is None
        )
        version_note = (
            f"(expected schema: {self.expected_version}, "
            f"actual schema: {self.actual_version})"
            if both_versions_known
            else None
        )
        path_note = (
            f"[manifest: {self.manifest_path}]" if self.manifest_path else None
        )

        # The message is always the first segment; optional notes follow in
        # a fixed order (versions, then path).
        segments = [
            self.message,
            *(note for note in (version_note, path_note) if note is not None),
        ]
        return " ".join(segments)
# =============================================================================
# FreshnessValidator Class
# =============================================================================
class FreshnessValidator:
    """Validates dataset freshness and compatibility on server startup.

    The FreshnessValidator checks that downloaded RAG artifacts are
    compatible with the current server code by validating:
    - Valid JSON structure matching the SourceManifest model
    - Schema version matches EXPECTED_SCHEMA_VERSION

    A missing source_manifest.json, or a manifest without a
    ``schema_version`` field, is treated as a legacy dataset: validation
    is skipped with a warning and ``validate()`` returns None.

    If validation passes, the validator logs detailed metadata about
    the dataset for debugging and monitoring purposes. If validation
    fails, it raises FreshnessValidationError with actionable details.

    This class is designed to be used during server startup, before
    initializing the RAG pipeline. The validate() method should be
    called after artifacts are downloaded but before they are loaded.

    Attributes:
    ----------
    cache_path : Path
        Path to the artifact cache directory.
    settings : Settings | None
        Application settings (optional, used for logging HF repo URL).

    Example:
    -------
    >>> from pathlib import Path
    >>> validator = FreshnessValidator(Path("/app/.cache/artifacts"))
    >>> manifest = validator.validate()  # Raises on failure
    >>> if manifest is not None:  # None means legacy dataset
    ...     print(f"Index version: {manifest.index_version}")
    Index version: 2024.01.15.001

    Note:
    ----
    The validator uses lazy imports for Pydantic models to avoid
    loading them at module import time.

    """

    def __init__(
        self,
        cache_path: Path,
        settings: Settings | None = None,
    ) -> None:
        """Initialize the FreshnessValidator.

        Creates a new validator instance configured to check artifacts
        in the specified cache directory. No file access happens here.

        Args:
        ----
            cache_path: Path to the artifact cache directory where
                source_manifest.json and other artifacts are stored.
            settings: Optional application settings. If provided, used
                to include HF repository URL in log messages.

        Example:
        -------
        >>> from pathlib import Path
        >>> validator = FreshnessValidator(Path("/app/.cache/artifacts"))

        """
        self._cache_path = cache_path
        self._settings = settings
        logger.debug(
            "Initialized FreshnessValidator: cache_path=%s",
            cache_path,
        )

    @property
    def cache_path(self) -> Path:
        """Get the artifact cache directory path.

        Returns
        -------
            Path to the artifact cache directory.

        """
        return self._cache_path

    @property
    def settings(self) -> Settings | None:
        """Get the application settings.

        Returns
        -------
            The Settings instance, or None if not provided.

        """
        return self._settings

    @property
    def manifest_path(self) -> Path:
        """Get the path to the source manifest file.

        Returns
        -------
            Full path to source_manifest.json in the cache directory.

        """
        return self._cache_path / SOURCE_MANIFEST_JSON

    @property
    def index_version_path(self) -> Path:
        """Get the path to the index version file.

        Returns
        -------
            Full path to index_version.txt in the cache directory.

        """
        return self._cache_path / INDEX_VERSION_TXT

    def validate(self) -> SourceManifest | None:
        """Validate the downloaded artifacts and return the manifest.

        This is the main entry point for validation. It performs all
        validation checks and logs dataset metadata on success. If any
        check fails, it raises FreshnessValidationError with details.

        Validation Steps:
        1. Load and parse source_manifest.json
        2. Validate schema version matches EXPECTED_SCHEMA_VERSION
        3. Log dataset metadata for visibility

        Returns:
        -------
            SourceManifest | None: The validated manifest object containing
                all metadata about the dataset, or None if the manifest is
                missing or uses a legacy format that cannot be fully
                validated.

        Raises:
        ------
            FreshnessValidationError: If any validation check fails.
                The exception includes details about the failure.

        Example:
        -------
        >>> try:
        ...     manifest = validator.validate()
        ...     if manifest:
        ...         print(f"Validated! Version: {manifest.index_version}")
        ...     else:
        ...         print("Legacy dataset - validation skipped")
        ... except FreshnessValidationError as e:
        ...     logger.error("Validation failed: %s", e)
        ...     raise SystemExit(1)

        """
        logger.info("Starting dataset freshness validation...")

        # Step 1: Load and parse the manifest file
        manifest = self._load_manifest()

        # If manifest is None, we have a legacy dataset - skip validation
        if manifest is None:
            logger.info(
                "Dataset freshness validation SKIPPED (legacy manifest format). "
                "The dataset will be loaded without manifest validation."
            )
            return None

        # Step 2: Validate schema version compatibility
        self._validate_schema_version(manifest)

        # Step 3: Log dataset metadata for visibility
        self._log_dataset_metadata(manifest)

        logger.info("Dataset freshness validation PASSED")
        return manifest

    def _load_manifest(self) -> SourceManifest | None:
        """Load and parse the source manifest from the cache.

        Reads source_manifest.json from the artifact cache directory
        and parses it into a SourceManifest Pydantic model.

        A missing file, or a JSON object without a ``schema_version``
        key, is treated as a legacy dataset: a warning is logged and
        None is returned instead of raising.

        Returns
        -------
            SourceManifest | None: Parsed manifest object, or None if the
                manifest file is absent or uses the legacy format.

        Raises
        ------
            FreshnessValidationError: If the manifest file cannot be read
                or decoded as UTF-8, contains invalid JSON, is not a JSON
                object at the top level, or doesn't match the expected
                model structure.

        """
        # Lazy import to avoid loading Pydantic at module import time
        from rag_chatbot.api.manifest import SourceManifest

        manifest_path = self.manifest_path

        # Check if manifest file exists
        if not manifest_path.exists():
            # No manifest file - this is OK for legacy datasets
            logger.warning(
                "Source manifest file not found: %s. "
                "This may indicate a legacy dataset. Skipping manifest validation.",
                manifest_path,
            )
            return None

        logger.debug("Loading manifest from: %s", manifest_path)

        # Read and parse JSON
        try:
            manifest_text = manifest_path.read_text(encoding="utf-8")
            manifest_data = json.loads(manifest_text)
        except (json.JSONDecodeError, UnicodeDecodeError) as e:
            # UnicodeDecodeError is a ValueError, not an OSError, so bytes
            # that are not valid UTF-8 must be handled here explicitly or
            # they would escape as an unwrapped exception.
            msg = (
                f"Invalid JSON in source manifest file: {e}. "
                "The manifest file may be corrupted. "
                "Try re-downloading artifacts or rebuilding the dataset."
            )
            raise FreshnessValidationError(msg, manifest_path=manifest_path) from e
        except OSError as e:
            msg = (
                f"Failed to read source manifest file: {e}. "
                "Check file permissions and disk space."
            )
            raise FreshnessValidationError(msg, manifest_path=manifest_path) from e

        # The key checks below are only meaningful on a JSON object. A
        # scalar would raise TypeError on the ``in`` test and a list would
        # be mistaken for a legacy manifest, so reject anything else.
        if not isinstance(manifest_data, dict):
            msg = (
                "Source manifest must contain a JSON object at the top level, "
                f"found {type(manifest_data).__name__}. "
                "The manifest file may be corrupted. "
                "Try re-downloading artifacts or rebuilding the dataset."
            )
            raise FreshnessValidationError(msg, manifest_path=manifest_path)

        # Check if this is a legacy manifest format (missing schema_version)
        if "schema_version" not in manifest_data:
            logger.warning(
                "Legacy manifest format detected (missing schema_version). "
                "Skipping manifest validation. Consider rebuilding the dataset "
                "with the current build pipeline to enable full validation."
            )
            # Log what we can from the legacy format
            if "created_at" in manifest_data:
                logger.info(
                    "Legacy manifest created_at: %s", manifest_data["created_at"]
                )
            if "total_chunks" in manifest_data:
                logger.info(
                    "Legacy manifest total_chunks: %s", manifest_data["total_chunks"]
                )
            return None

        # Parse into Pydantic model
        try:
            manifest = SourceManifest.model_validate(manifest_data)
        except Exception as e:
            # Intended to catch Pydantic's ValidationError; kept broad because
            # the failure is always re-raised wrapped, never swallowed.
            msg = (
                f"Failed to parse source manifest: {e}. "
                "The manifest structure may be incompatible with this server version. "
                "Check if the dataset was built with a compatible version "
                "of the build pipeline."
            )
            raise FreshnessValidationError(msg, manifest_path=manifest_path) from e

        logger.debug(
            "Loaded manifest: schema_version=%s, index_version=%s, files=%d",
            manifest.schema_version,
            manifest.index_version,
            manifest.source_file_count,
        )
        return manifest

    def _validate_schema_version(self, manifest: SourceManifest) -> None:
        """Validate that the manifest schema version is compatible.

        Compares the manifest's schema_version with EXPECTED_SCHEMA_VERSION.
        If they don't match exactly, raises FreshnessValidationError.

        Currently uses strict equality matching. Future versions may
        implement semantic version comparison for backward compatibility.

        Args:
        ----
            manifest: The loaded SourceManifest to validate.

        Raises:
        ------
            FreshnessValidationError: If schema versions don't match.

        """
        # Lazy import to avoid loading at module import time
        from rag_chatbot.api.manifest import EXPECTED_SCHEMA_VERSION

        actual_version = manifest.schema_version
        expected_version = EXPECTED_SCHEMA_VERSION

        if actual_version != expected_version:
            msg = (
                f"Schema version mismatch: expected '{expected_version}', "
                f"found '{actual_version}'. "
                "The dataset was built with an incompatible version of the "
                "build pipeline. Please rebuild the dataset with the current "
                "version of the codebase, or update the server to a compatible version."
            )
            raise FreshnessValidationError(
                msg,
                expected_version=expected_version,
                actual_version=actual_version,
                manifest_path=self.manifest_path,
            )

        logger.debug(
            "Schema version validated: %s (matches expected)",
            actual_version,
        )

    @staticmethod
    def _format_size(total_bytes: int) -> str:
        """Format a byte count as bytes, KB or MB (1024-based) for log output."""
        bytes_per_kb = 1024
        bytes_per_mb = bytes_per_kb * bytes_per_kb

        if total_bytes >= bytes_per_mb:
            return f"{total_bytes / bytes_per_mb:.2f} MB"
        if total_bytes >= bytes_per_kb:
            return f"{total_bytes / bytes_per_kb:.2f} KB"
        return f"{total_bytes} bytes"

    def _log_dataset_metadata(self, manifest: SourceManifest) -> None:
        """Log detailed metadata about the validated dataset.

        This method logs comprehensive information about the dataset
        at INFO level for visibility during server startup. All lines
        are emitted as one multi-line log record.

        Logged Information:
        - Dataset URL (HuggingFace repository, if settings provided)
        - Index version
        - Schema version
        - Manifest creation timestamp
        - Number of source files
        - Total source file size

        Args:
        ----
            manifest: The validated SourceManifest to log details from.

        """
        log_lines = [
            "=== Dataset Metadata ===",
        ]

        # Add HF repository URL if settings are available
        if self._settings is not None:
            hf_repo = self._settings.hf_index_repo
            log_lines.append(
                f" Dataset URL: https://huggingface.co/datasets/{hf_repo}"
            )

        log_lines.append(f" Index Version: {manifest.index_version}")
        log_lines.append(f" Schema Version: {manifest.schema_version}")
        log_lines.append(f" Created At: {manifest.created_at.isoformat()}")
        log_lines.append(f" Source Files: {manifest.source_file_count}")

        # Total size with human-readable units
        total_size_str = self._format_size(manifest.total_source_size_bytes)
        log_lines.append(f" Total Source Size: {total_size_str}")
        log_lines.append("========================")

        # Log as a single multi-line message
        logger.info("\n".join(log_lines))

    def get_index_version(self) -> str | None:
        """Read the index version from index_version.txt.

        This is a convenience method to read the index version directly
        from the text file without loading the full manifest. Useful
        for quick version checks or when manifest validation is not needed.

        Returns:
        -------
            str | None: The stripped index version string, or None if the
                file does not exist, cannot be read or decoded as UTF-8,
                or is empty after stripping whitespace.

        Example:
        -------
        >>> version = validator.get_index_version()
        >>> if version:
        ...     print(f"Index version: {version}")
        ... else:
        ...     print("Index version file not found")

        """
        version_path = self.index_version_path

        if not version_path.exists():
            logger.debug("Index version file not found: %s", version_path)
            return None

        try:
            version = version_path.read_text(encoding="utf-8").strip()
        except (OSError, UnicodeDecodeError) as e:
            # UnicodeDecodeError is not an OSError; without it a file with
            # invalid UTF-8 bytes would raise instead of yielding None.
            logger.warning("Failed to read index version file: %s", e)
            return None

        logger.debug("Read index version: %s", version)
        # An empty (whitespace-only) file carries no version.
        return version or None

    def check_version_consistency(self) -> bool:
        """Check if index version matches between manifest and text file.

        Compares the index_version in source_manifest.json with the
        content of index_version.txt. They should always match if the
        build pipeline worked correctly.

        This is useful for detecting partial or corrupted downloads
        where some files may have been updated but not others.

        Returns:
        -------
            bool: True if versions match, False if they differ or if
                either file is missing/unreadable (including a manifest
                that is absent or in the legacy format).

        Example:
        -------
        >>> if not validator.check_version_consistency():
        ...     logger.warning("Version inconsistency detected!")
        ...     # Consider forcing a refresh

        """
        # Read index version from text file
        txt_version = self.get_index_version()
        if txt_version is None:
            logger.debug("Cannot check consistency: index_version.txt not readable")
            return False

        # Try to load manifest to get its version
        try:
            manifest = self._load_manifest()
        except FreshnessValidationError:
            logger.debug("Cannot check consistency: manifest not loadable")
            return False

        # _load_manifest returns None (rather than raising) for a missing
        # or legacy manifest; there is no version to compare in that case.
        if manifest is None:
            logger.debug(
                "Cannot check consistency: manifest missing or in legacy format"
            )
            return False

        manifest_version = manifest.index_version

        # Compare versions
        if txt_version == manifest_version:
            logger.debug(
                "Version consistency check passed: %s",
                txt_version,
            )
            return True

        logger.warning(
            "Version inconsistency detected: "
            "index_version.txt=%s, manifest.index_version=%s",
            txt_version,
            manifest_version,
        )
        return False