"""Manifest-backed registry for processed dataset artifacts.

The module exposes :func:`compute_checksum`, the :class:`ArtifactManager`
singleton class, and a ready-made module-level instance named ``manager``.
"""

import hashlib
import json
import logging
from pathlib import Path
from typing import Any, Dict, Optional, Tuple, Union

import pandas as pd

logger = logging.getLogger(__name__)

# Project root: two levels above the directory that contains this file.
BASE_DIR = Path(__file__).resolve().parents[2]
DATA_DIR = BASE_DIR / 'data'
PROCESSED_DIR = DATA_DIR / 'processed'
RAW_DIR = DATA_DIR / 'raw'
CACHE_DIR = BASE_DIR / 'cache'
MANIFEST_PATH = PROCESSED_DIR / 'artifact_manifest.json'


def compute_checksum(file_path: Path, chunk_size: int = 65536) -> str:
    """Return the hexadecimal MD5 digest of the file at *file_path*.

    The file is streamed in blocks of *chunk_size* bytes so it never has to
    fit in memory. The digest does not depend on the chunk size.

    Args:
        file_path: File to hash.
        chunk_size: Number of bytes read per iteration; must be at least 1.

    Returns:
        The MD5 digest as a lowercase hex string.

    Raises:
        ValueError: If *chunk_size* is smaller than 1.
        OSError: If the file cannot be opened or read.
    """
    if chunk_size < 1:
        # read(0) returns b"" immediately, which would silently produce the
        # digest of an empty file instead of the real one.
        raise ValueError(f"chunk_size must be >= 1, got {chunk_size}")

    hash_md5 = hashlib.md5()
    with open(file_path, "rb") as f:
        for chunk in iter(lambda: f.read(chunk_size), b""):
            hash_md5.update(chunk)
    return hash_md5.hexdigest()


class ArtifactManager:
    """Centralized Artifact Manager for structured datasets.

    The class is a process-wide singleton: every ``ArtifactManager()`` call
    returns the same object. Its ``manifest`` attribute mirrors the JSON file
    at ``MANIFEST_PATH`` and maps artifact names to their path (relative to
    ``BASE_DIR``), columns, row count, dtype schema and checksum.
    """

    _instance = None

    def __new__(cls):
        if cls._instance is None:
            cls._instance = super().__new__(cls)
        return cls._instance

    def __init__(self):
        # __init__ runs on every ArtifactManager() call even though __new__
        # hands back the shared instance, so only load the manifest once.
        if not hasattr(self, 'initialized'):
            self.manifest = self._load_manifest()
            self.initialized = True

    def _load_manifest(self) -> Dict[str, Any]:
        """Read the manifest from disk, falling back to an empty manifest.

        Returns:
            The parsed manifest, guaranteed to be a dict holding an
            ``'artifacts'`` dict. A missing, unreadable or malformed manifest
            yields ``{"artifacts": {}, "version": "1.0"}``.
        """
        if MANIFEST_PATH.exists():
            try:
                with open(MANIFEST_PATH, 'r', encoding='utf-8') as f:
                    loaded = json.load(f)
            except (OSError, ValueError) as e:
                # ValueError covers json.JSONDecodeError and decoding errors.
                logger.warning(f"Could not load manifest: {e}")
            else:
                if isinstance(loaded, dict):
                    artifacts = loaded.get('artifacts')
                    if not isinstance(artifacts, dict):
                        if artifacts is not None:
                            logger.warning(
                                "Manifest 'artifacts' entry is not an object; "
                                "treating it as empty"
                            )
                        # Later code indexes manifest['artifacts'] directly.
                        loaded['artifacts'] = {}
                    return loaded
                logger.warning("Manifest root is not a JSON object; ignoring it")
        return {"artifacts": {}, "version": "1.0"}

    def _save_manifest(self) -> None:
        """Write ``self.manifest`` to ``MANIFEST_PATH`` as indented JSON.

        The data goes to a sibling temporary file first and is then moved over
        the manifest, so a failed serialization cannot leave a truncated
        manifest behind.
        """
        PROCESSED_DIR.mkdir(parents=True, exist_ok=True)
        tmp_path = MANIFEST_PATH.with_name(MANIFEST_PATH.name + '.tmp')
        try:
            with open(tmp_path, 'w', encoding='utf-8') as f:
                json.dump(self.manifest, f, indent=2)
            tmp_path.replace(MANIFEST_PATH)
        finally:
            # Only still present when the dump or the move failed.
            if tmp_path.exists():
                tmp_path.unlink()

    def register_artifact(self, name: str, df: pd.DataFrame, file_path: Path):
        """Register an artifact in the manifest with schema info.

        Records the file's path relative to ``BASE_DIR``, the column names,
        the row count, the per-column dtype names and the file checksum, then
        persists the manifest.

        Args:
            name: Key under which the artifact is stored in the manifest.
            df: Frame whose columns, dtypes and length are recorded.
            file_path: Location of the artifact file; it is hashed as-is.

        Raises:
            ValueError: If *file_path* is not located under ``BASE_DIR``.
            OSError: If *file_path* cannot be read or the manifest cannot be
                written.
        """
        schema = {col: str(dtype) for col, dtype in df.dtypes.items()}
        checksum = compute_checksum(file_path)

        self.manifest.setdefault('artifacts', {})[name] = {
            'path': str(file_path.relative_to(BASE_DIR)),
            'columns': list(df.columns),
            'rows': len(df),
            'schema': schema,
            'checksum': checksum,
            'version': "1.0",
        }
        self._save_manifest()

    def _schema_mismatches(self, name: str, df: pd.DataFrame) -> Dict[str, Tuple[str, str]]:
        """Return columns of *df* whose dtype differs from the manifest schema.

        Columns absent from *df* and columns whose actual dtype name contains
        ``'object'`` are skipped.

        Returns:
            Mapping of column name to ``(expected_dtype, actual_dtype)``;
            empty when *name* has no recorded schema.
        """
        entry = self.manifest.get('artifacts', {}).get(name, {})
        mismatches = {}
        for col, expected_type in entry.get('schema', {}).items():
            if col not in df.columns:
                continue
            actual_type = str(df[col].dtype)
            if expected_type != actual_type and 'object' not in actual_type:
                mismatches[col] = (expected_type, actual_type)
        return mismatches

    def load_artifact(self, name: str, required_columns: Optional[list] = None, validate_schema: bool = True) -> pd.DataFrame:
        """Load an artifact securely with validation.

        The file is located through the manifest, or at
        ``PROCESSED_DIR / f"{name}.parquet"`` when the manifest has no path
        for *name*. If the file is missing or its checksum differs from the
        recorded one, a forced rebuild is triggered and the manifest is
        reloaded before reading.

        Args:
            name: Artifact name.
            required_columns: Column names that must be present in the frame.
            validate_schema: When true, log a warning for every column whose
                dtype differs from the schema recorded in the manifest.

        Returns:
            The artifact read with ``pd.read_parquet``.

        Raises:
            FileNotFoundError: If the file is still missing after a rebuild.
            ValueError: If any of *required_columns* is absent.
        """
        entry = self.manifest.get('artifacts', {}).get(name, {})
        path_str = entry.get('path')
        default_path = PROCESSED_DIR / f"{name}.parquet"

        if path_str:
            path = BASE_DIR / path_str
            needs_rebuild = not path.exists()
            if not needs_rebuild:
                expected_checksum = entry.get('checksum')
                if expected_checksum and compute_checksum(path) != expected_checksum:
                    logger.warning(f"Checksum mismatch for artifact {name}. Triggering rebuild...")
                    needs_rebuild = True
        else:
            path = default_path
            needs_rebuild = not path.exists()

        if needs_rebuild:
            logger.info(f"Artifact {name} missing or invalid. Triggering rebuild...")
            # NOTE(review): imported lazily, presumably to avoid a circular
            # import with preprocessing.artifact_store - confirm.
            from preprocessing.artifact_store import ensure_structured_data
            ensure_structured_data(force_rebuild=True)
            self.manifest = self._load_manifest()  # Reload manifest

            # Update path resolution in case it changed
            path_str = self.manifest.get('artifacts', {}).get(name, {}).get('path')
            if path_str:
                path = BASE_DIR / path_str
            elif not path.exists():
                path = default_path

        if not path.exists():
            raise FileNotFoundError(f"Failed to find or rebuild artifact: {name} at {path}")

        df = pd.read_parquet(path)

        if required_columns:
            missing = [col for col in required_columns if col not in df.columns]
            if missing:
                raise ValueError(f"Artifact {name} missing required columns: {missing}")

        if validate_schema:
            # Schema drift is reported but never fatal.
            for col, (expected_type, actual_type) in self._schema_mismatches(name, df).items():
                logger.warning(
                    f"Artifact {name} column {col!r} has dtype {actual_type}, "
                    f"manifest expects {expected_type}"
                )

        return df

    def verify_all_artifacts(self) -> bool:
        """Verify the integrity of all artifacts in the manifest.

        An artifact is valid when the manifest records a path for it, the
        file exists under ``BASE_DIR`` and its checksum equals the recorded
        one. Every failure is logged with its reason.

        Returns:
            ``True`` if every artifact is valid (or the manifest lists none),
            ``False`` otherwise.
        """
        all_valid = True
        for name, metadata in self.manifest.get('artifacts', {}).items():
            path_str = metadata.get('path')
            if not path_str:
                problem = "no path recorded in manifest"
            else:
                path = BASE_DIR / path_str
                if not path.exists():
                    problem = f"file not found at {path}"
                else:
                    try:
                        matches = compute_checksum(path) == metadata.get('checksum')
                    except OSError as e:
                        problem = f"file could not be read: {e}"
                    else:
                        if matches:
                            continue
                        problem = "checksum mismatch"
            logger.warning(f"Artifact {name} failed verification: {problem}")
            all_valid = False
        return all_valid


manager = ArtifactManager()