Spaces:
Running
Running
| """Dataset freshness validator for server startup. | |
| This module validates that downloaded RAG artifacts are consistent | |
| and compatible with the server's expected schema version. It provides: | |
| - Loading and parsing of source_manifest.json from artifact cache | |
| - Schema version validation against EXPECTED_SCHEMA_VERSION | |
| - Comprehensive startup logging of dataset metadata | |
| - Fast-fail behavior with clear error messages | |
| The FreshnessValidator is designed to be called during server startup | |
| before the RAG pipeline is initialized. If validation fails, the server | |
| should refuse to start, preventing undefined behavior from incompatible | |
| or corrupted artifacts. | |
| Validation Checks: | |
| 1. Manifest file existence - a missing source_manifest.json is treated as a legacy dataset and validation is skipped | |
| 2. Manifest parsing - JSON must be valid and match expected structure | |
| 3. Schema version - Must match EXPECTED_SCHEMA_VERSION exactly | |
| 4. Index version consistency - Logged for debugging | |
| Startup Logging: | |
| The validator logs extensively at INFO level to provide visibility | |
| into the dataset state during server startup. This includes: | |
| - Dataset URL (HuggingFace repository) | |
| - Index version | |
| - Schema version | |
| - Number of source files | |
| - Total source file size | |
| - Manifest creation timestamp | |
| Lazy Loading: | |
| Heavy project dependencies (the Pydantic-based manifest models) are | |
| imported inside methods to avoid import overhead at module load time. | |
| This follows the project's lazy loading convention. | |
| Example: | |
| ------- | |
| >>> from pathlib import Path | |
| >>> from rag_chatbot.api.freshness import FreshnessValidationError, FreshnessValidator | |
| >>> from rag_chatbot.config.settings import Settings | |
| >>> | |
| >>> settings = Settings() | |
| >>> cache_path = Path(settings.artifact_cache_path) | |
| >>> validator = FreshnessValidator(cache_path, settings) | |
| >>> | |
| >>> try: | |
| ... manifest = validator.validate() | |
| ... print(f"Validation passed! Index version: {manifest.index_version}") | |
| ... except FreshnessValidationError as e: | |
| ... print(f"Validation failed: {e}") | |
| ... raise SystemExit(1) | |
| """ | |
| from __future__ import annotations | |
| import json | |
| import logging | |
| from pathlib import Path | |
| from typing import TYPE_CHECKING, Final | |
| if TYPE_CHECKING: | |
| from rag_chatbot.api.manifest import SourceManifest | |
| from rag_chatbot.config.settings import Settings | |
| # ============================================================================= | |
| # Module Exports | |
| # ============================================================================= | |
| __all__: list[str] = [ | |
| "FreshnessValidator", | |
| "FreshnessValidationError", | |
| ] | |
| # ============================================================================= | |
| # Logger | |
| # ============================================================================= | |
| logger = logging.getLogger(__name__) | |
| # ============================================================================= | |
| # Constants | |
| # ============================================================================= | |
# Joined onto the validator's cache directory to locate the manifest file.
SOURCE_MANIFEST_JSON: Final[str] = "source_manifest.json"
"""Filename for the source manifest JSON file in the artifact cache."""
# Joined onto the validator's cache directory to locate the version file.
INDEX_VERSION_TXT: Final[str] = "index_version.txt"
"""Filename for the index version text file in the artifact cache."""
| # ============================================================================= | |
| # Exceptions | |
| # ============================================================================= | |
class FreshnessValidationError(Exception):
    """Signal that the downloaded dataset artifacts failed validation.

    Raised by the FreshnessValidator when the artifacts in the cache
    cannot be used safely. Besides the message, the exception carries
    optional structured context so callers can report precisely what
    went wrong.

    Typical causes:
    - Invalid JSON in the manifest file
    - Manifest file that cannot be read
    - Schema version mismatch
    - Missing required fields in the manifest

    Attributes:
    ----------
    message : str
        Human-readable description of the validation failure.
    expected_version : str | None
        Schema version the server expects, when relevant.
    actual_version : str | None
        Schema version found in the manifest, when relevant.
    manifest_path : Path | None
        Location of the manifest that failed validation, when relevant.

    Example:
    -------
    >>> try:
    ...     validator.validate()
    ... except FreshnessValidationError as e:
    ...     print(f"Validation failed: {e.message}")
    ...     if e.expected_version:
    ...         print(f"Expected: {e.expected_version}")
    ...         print(f"Actual: {e.actual_version}")
    ...     if e.manifest_path:
    ...         print(f"Manifest: {e.manifest_path}")

    """

    def __init__(
        self,
        message: str,
        *,
        expected_version: str | None = None,
        actual_version: str | None = None,
        manifest_path: Path | None = None,
    ) -> None:
        """Store the failure message together with its optional context.

        Args:
        ----
            message: Human-readable description of the validation failure.
            expected_version: Schema version expected by the server (optional).
            actual_version: Schema version found in the manifest (optional).
            manifest_path: Path to the manifest file (optional).

        """
        # Only the message is forwarded to Exception, so ``args`` stays
        # a one-element tuple regardless of the extra context supplied.
        super().__init__(message)
        self.message = message
        self.manifest_path = manifest_path
        self.expected_version = expected_version
        self.actual_version = actual_version

    def __str__(self) -> str:
        """Render the message followed by any available context.

        Returns
        -------
            The message, then the version pair (only when both versions
            are known), then the manifest path (only when one was given),
            separated by single spaces.

        """
        segments = [self.message]

        # Version details are only meaningful as a pair.
        if not (self.expected_version is None or self.actual_version is None):
            segments += [
                f"(expected schema: {self.expected_version}, "
                f"actual schema: {self.actual_version})"
            ]

        if self.manifest_path:
            segments += [f"[manifest: {self.manifest_path}]"]

        return " ".join(segments)
| # ============================================================================= | |
| # FreshnessValidator Class | |
| # ============================================================================= | |
class FreshnessValidator:
    """Validates dataset freshness and compatibility on server startup.

    The FreshnessValidator checks that downloaded RAG artifacts are
    compatible with the current server code by validating:
    - Valid JSON structure matching the SourceManifest model
    - Schema version matches EXPECTED_SCHEMA_VERSION

    A missing source_manifest.json, or a manifest without a
    ``schema_version`` key, is treated as a legacy dataset: a warning is
    logged and validation is skipped (``validate()`` returns ``None``).

    If validation passes, the validator logs detailed metadata about
    the dataset for debugging and monitoring purposes. If validation
    fails, it raises FreshnessValidationError with actionable details.

    This class is designed to be used during server startup, before
    initializing the RAG pipeline. The validate() method should be
    called after artifacts are downloaded but before they are loaded.

    Attributes:
    ----------
    cache_path : Path
        Path to the artifact cache directory.
    settings : Settings | None
        Application settings (optional, used for logging HF repo URL).
    manifest_path : Path
        Location of source_manifest.json inside the cache directory.
    index_version_path : Path
        Location of index_version.txt inside the cache directory.

    Example:
    -------
    >>> from pathlib import Path
    >>> validator = FreshnessValidator(Path("/app/.cache/artifacts"))
    >>> manifest = validator.validate()  # Raises on failure
    >>> if manifest is not None:
    ...     print(f"Index version: {manifest.index_version}")

    Note:
    ----
    The validator uses lazy imports for the manifest models to avoid
    loading them at module import time.

    """

    def __init__(
        self,
        cache_path: Path,
        settings: Settings | None = None,
    ) -> None:
        """Initialize the FreshnessValidator.

        Args:
        ----
            cache_path: Path to the artifact cache directory where
                source_manifest.json and other artifacts are stored.
            settings: Optional application settings. If provided, used
                to include HF repository URL in log messages.

        Example:
        -------
        >>> from pathlib import Path
        >>> validator = FreshnessValidator(Path("/app/.cache/artifacts"))

        """
        self._cache_path = cache_path
        self._settings = settings
        logger.debug(
            "Initialized FreshnessValidator: cache_path=%s",
            cache_path,
        )

    # The accessors below are read as plain attributes throughout this
    # class (e.g. ``self.manifest_path.exists()``), so they must be
    # properties rather than ordinary methods.

    @property
    def cache_path(self) -> Path:
        """Get the artifact cache directory path.

        Returns
        -------
            Path to the artifact cache directory.

        """
        return self._cache_path

    @property
    def settings(self) -> Settings | None:
        """Get the application settings.

        Returns
        -------
            The Settings instance, or None if not provided.

        """
        return self._settings

    @property
    def manifest_path(self) -> Path:
        """Get the path to the source manifest file.

        Returns
        -------
            Full path to source_manifest.json in the cache directory.

        """
        return self._cache_path / SOURCE_MANIFEST_JSON

    @property
    def index_version_path(self) -> Path:
        """Get the path to the index version file.

        Returns
        -------
            Full path to index_version.txt in the cache directory.

        """
        return self._cache_path / INDEX_VERSION_TXT

    def validate(self) -> SourceManifest | None:
        """Validate the downloaded artifacts and return the manifest.

        This is the main entry point for validation.

        Validation Steps:
        1. Load and parse source_manifest.json
        2. Validate schema version matches EXPECTED_SCHEMA_VERSION
        3. Log dataset metadata for visibility

        Returns:
        -------
            SourceManifest | None: The validated manifest object, or None
            if the manifest is missing or uses a legacy format, in which
            case validation is skipped.

        Raises:
        ------
            FreshnessValidationError: If any validation check fails.
                The exception includes details about the failure.

        Example:
        -------
        >>> try:
        ...     manifest = validator.validate()
        ...     if manifest:
        ...         print(f"Validated! Version: {manifest.index_version}")
        ...     else:
        ...         print("Legacy dataset - validation skipped")
        ... except FreshnessValidationError as e:
        ...     logger.error("Validation failed: %s", e)
        ...     raise SystemExit(1)

        """
        logger.info("Starting dataset freshness validation...")

        # Step 1: Load and parse the manifest file
        manifest = self._load_manifest()

        # A None manifest means a legacy dataset - nothing to validate
        if manifest is None:
            logger.info(
                "Dataset freshness validation SKIPPED (legacy manifest format). "
                "The dataset will be loaded without manifest validation."
            )
            return None

        # Step 2: Validate schema version compatibility
        self._validate_schema_version(manifest)

        # Step 3: Log dataset metadata for visibility
        self._log_dataset_metadata(manifest)

        logger.info("Dataset freshness validation PASSED")
        return manifest

    def _load_manifest(self) -> SourceManifest | None:
        """Load and parse the source manifest from the cache.

        Reads source_manifest.json from the artifact cache directory
        and parses it into a SourceManifest model.

        Returns
        -------
            SourceManifest | None: Parsed manifest object, or None if the
            manifest file does not exist or is a legacy manifest (a JSON
            object without a ``schema_version`` key).

        Raises
        ------
            FreshnessValidationError: If the manifest file cannot be
                read, is not valid UTF-8, contains invalid JSON, is not
                a JSON object, or doesn't match the expected model
                structure.

        """
        manifest_path = self.manifest_path

        if not manifest_path.exists():
            # No manifest file - this is OK for legacy datasets
            logger.warning(
                "Source manifest file not found: %s. "
                "This may indicate a legacy dataset. Skipping manifest validation.",
                manifest_path,
            )
            return None

        logger.debug("Loading manifest from: %s", manifest_path)

        try:
            manifest_text = manifest_path.read_text(encoding="utf-8")
            manifest_data = json.loads(manifest_text)
        except json.JSONDecodeError as e:
            msg = (
                f"Invalid JSON in source manifest file: {e}. "
                "The manifest file may be corrupted. "
                "Try re-downloading artifacts or rebuilding the dataset."
            )
            raise FreshnessValidationError(msg, manifest_path=manifest_path) from e
        except UnicodeDecodeError as e:
            # read_text() raises this (a ValueError, not an OSError) when
            # the file is not valid UTF-8; report it like other corruption.
            msg = (
                f"Source manifest file is not valid UTF-8: {e}. "
                "The manifest file may be corrupted. "
                "Try re-downloading artifacts or rebuilding the dataset."
            )
            raise FreshnessValidationError(msg, manifest_path=manifest_path) from e
        except OSError as e:
            msg = (
                f"Failed to read source manifest file: {e}. "
                "Check file permissions and disk space."
            )
            raise FreshnessValidationError(msg, manifest_path=manifest_path) from e

        # Valid JSON need not be an object (e.g. ``null`` or ``[]``). The
        # key lookups below only make sense on a mapping, so reject
        # anything else explicitly instead of failing with a TypeError or
        # silently treating it as a legacy manifest.
        if not isinstance(manifest_data, dict):
            msg = (
                "Source manifest must be a JSON object, "
                f"found {type(manifest_data).__name__}. "
                "The manifest file may be corrupted. "
                "Try re-downloading artifacts or rebuilding the dataset."
            )
            raise FreshnessValidationError(msg, manifest_path=manifest_path)

        # Check if this is a legacy manifest format (missing schema_version)
        if "schema_version" not in manifest_data:
            logger.warning(
                "Legacy manifest format detected (missing schema_version). "
                "Skipping manifest validation. Consider rebuilding the dataset "
                "with the current build pipeline to enable full validation."
            )
            # Log what we can from the legacy format
            if "created_at" in manifest_data:
                logger.info("Legacy manifest created_at: %s", manifest_data["created_at"])
            if "total_chunks" in manifest_data:
                logger.info("Legacy manifest total_chunks: %s", manifest_data["total_chunks"])
            return None

        # Lazy import, deferred until a model is actually needed so the
        # missing/legacy paths above never load the manifest models.
        from rag_chatbot.api.manifest import SourceManifest

        try:
            manifest = SourceManifest.model_validate(manifest_data)
        except Exception as e:
            # Deliberately broad: the validation error type lives in a
            # dependency this module does not import directly. Any failure
            # here is wrapped with an actionable message.
            msg = (
                f"Failed to parse source manifest: {e}. "
                "The manifest structure may be incompatible with this server version. "
                "Check if the dataset was built with a compatible version "
                "of the build pipeline."
            )
            raise FreshnessValidationError(msg, manifest_path=manifest_path) from e

        logger.debug(
            "Loaded manifest: schema_version=%s, index_version=%s, files=%d",
            manifest.schema_version,
            manifest.index_version,
            manifest.source_file_count,
        )
        return manifest

    def _validate_schema_version(self, manifest: SourceManifest) -> None:
        """Validate that the manifest schema version is compatible.

        Compares the manifest's schema_version with EXPECTED_SCHEMA_VERSION
        using strict equality.

        Args:
        ----
            manifest: The loaded SourceManifest to validate.

        Raises:
        ------
            FreshnessValidationError: If schema versions don't match.

        """
        # Lazy import to avoid loading at module import time
        from rag_chatbot.api.manifest import EXPECTED_SCHEMA_VERSION

        actual_version = manifest.schema_version
        expected_version = EXPECTED_SCHEMA_VERSION

        if actual_version != expected_version:
            msg = (
                f"Schema version mismatch: expected '{expected_version}', "
                f"found '{actual_version}'. "
                "The dataset was built with an incompatible version of the "
                "build pipeline. Please rebuild the dataset with the current "
                "version of the codebase, or update the server to a compatible version."
            )
            raise FreshnessValidationError(
                msg,
                expected_version=expected_version,
                actual_version=actual_version,
                manifest_path=self.manifest_path,
            )

        logger.debug(
            "Schema version validated: %s (matches expected)",
            actual_version,
        )

    def _log_dataset_metadata(self, manifest: SourceManifest) -> None:
        """Log detailed metadata about the validated dataset.

        Emits a single multi-line INFO record containing:
        - Dataset URL (HuggingFace repository, if settings provided)
        - Index version
        - Schema version
        - Manifest creation timestamp
        - Number of source files
        - Total source file size

        Args:
        ----
            manifest: The validated SourceManifest to log details from.

        """
        # Size thresholds for human-readable formatting
        bytes_per_kb = 1024
        bytes_per_mb = bytes_per_kb * bytes_per_kb

        log_lines = ["=== Dataset Metadata ==="]

        # The repository URL is only known when settings were supplied
        if self._settings is not None:
            hf_repo = self._settings.hf_index_repo
            log_lines.append(
                f" Dataset URL: https://huggingface.co/datasets/{hf_repo}"
            )

        log_lines.append(f" Index Version: {manifest.index_version}")
        log_lines.append(f" Schema Version: {manifest.schema_version}")
        log_lines.append(f" Created At: {manifest.created_at.isoformat()}")
        log_lines.append(f" Source Files: {manifest.source_file_count}")

        # Pick the largest unit (MB, KB, bytes) the total reaches
        total_bytes = manifest.total_source_size_bytes
        if total_bytes >= bytes_per_mb:
            total_size_str = f"{total_bytes / bytes_per_mb:.2f} MB"
        elif total_bytes >= bytes_per_kb:
            total_size_str = f"{total_bytes / bytes_per_kb:.2f} KB"
        else:
            total_size_str = f"{total_bytes} bytes"
        log_lines.append(f" Total Source Size: {total_size_str}")

        log_lines.append("========================")

        # Log as a single multi-line message
        logger.info("\n".join(log_lines))

    def get_index_version(self) -> str | None:
        """Read the index version from index_version.txt.

        This is a convenience method to read the index version directly
        from the text file without loading the full manifest.

        Returns:
        -------
            str | None: The stripped file content, or None if the file
            is missing, unreadable, not valid UTF-8, or empty.

        Example:
        -------
        >>> version = validator.get_index_version()
        >>> if version:
        ...     print(f"Index version: {version}")
        ... else:
        ...     print("Index version file not found")

        """
        version_path = self.index_version_path

        if not version_path.exists():
            logger.debug("Index version file not found: %s", version_path)
            return None

        try:
            version = version_path.read_text(encoding="utf-8").strip()
        except (OSError, UnicodeDecodeError) as e:
            # Best-effort read: an undecodable file counts as unreadable
            # rather than propagating a ValueError to the caller.
            logger.warning("Failed to read index version file: %s", e)
            return None

        logger.debug("Read index version: %s", version)
        return version if version else None

    def check_version_consistency(self) -> bool:
        """Check if index version matches between manifest and text file.

        Compares the index_version in source_manifest.json with the
        content of index_version.txt.

        Returns:
        -------
            bool: True if versions match, False if they differ or if
            either file is missing/unreadable (including a legacy
            manifest, which carries no validated index version).

        Example:
        -------
        >>> if not validator.check_version_consistency():
        ...     logger.warning("Version inconsistency detected!")
        ...     # Consider forcing a refresh

        """
        txt_version = self.get_index_version()
        if txt_version is None:
            logger.debug("Cannot check consistency: index_version.txt not readable")
            return False

        try:
            manifest = self._load_manifest()
        except FreshnessValidationError:
            logger.debug("Cannot check consistency: manifest not loadable")
            return False

        # _load_manifest() returns None (rather than raising) for missing
        # or legacy manifests; there is no version to compare in that case.
        if manifest is None:
            logger.debug(
                "Cannot check consistency: manifest missing or in legacy format"
            )
            return False

        manifest_version = manifest.index_version

        if txt_version == manifest_version:
            logger.debug(
                "Version consistency check passed: %s",
                txt_version,
            )
            return True

        logger.warning(
            "Version inconsistency detected: "
            "index_version.txt=%s, manifest.index_version=%s",
            txt_version,
            manifest_version,
        )
        return False