Spaces:
Running
Running
File size: 5,275 Bytes
d29b763 | 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 | import json
import logging
from pathlib import Path
from typing import Any, Dict, Optional, Union
import hashlib
import pandas as pd
# Module-level logger, named after this module.
logger = logging.getLogger(__name__)
# Third ancestor of this file's resolved path (parents[0] is the directory
# containing the file). Presumably the project root - confirm against the
# repository layout.
BASE_DIR = Path(__file__).resolve().parents[2]
# Directory layout derived from BASE_DIR.
DATA_DIR = BASE_DIR / 'data'
# Holds the manifest, and is where load_artifact looks for `<name>.parquet`
# when the manifest has no path recorded for an artifact.
PROCESSED_DIR = DATA_DIR / 'processed'
# RAW_DIR and CACHE_DIR are not referenced in the visible part of this file.
RAW_DIR = DATA_DIR / 'raw'
CACHE_DIR = BASE_DIR / 'cache'
# JSON manifest recording each registered artifact (path, columns, rows,
# schema, checksum, version).
MANIFEST_PATH = PROCESSED_DIR / 'artifact_manifest.json'
def compute_checksum(file_path: Path) -> str:
    """Return the hexadecimal MD5 digest of the file at *file_path*.

    The file is read in 4096-byte pieces, so it never has to fit in memory
    all at once.
    """
    digest = hashlib.md5()
    with open(file_path, "rb") as stream:
        while True:
            block = stream.read(4096)
            # An empty read marks end of file.
            if block == b"":
                break
            digest.update(block)
    return digest.hexdigest()
class ArtifactManager:
    """Centralized Artifact Manager for structured datasets.

    Every call to ``ArtifactManager()`` returns the same shared instance.
    The instance keeps an in-memory copy of the JSON manifest stored at
    ``MANIFEST_PATH``; the manifest maps artifact names to their relative
    path, column list, row count, dtype schema and MD5 checksum.
    """

    _instance = None

    def __new__(cls):
        # Create the shared instance on first use and hand it back afterwards.
        if cls._instance is None:
            cls._instance = super().__new__(cls)
        return cls._instance

    def __init__(self):
        # __init__ runs on every ArtifactManager() call even though __new__
        # returns the shared object, so only load the manifest the first time.
        if not hasattr(self, 'initialized'):
            self.manifest = self._load_manifest()
            self.initialized = True

    @staticmethod
    def _normalize_manifest(data: Any) -> Dict[str, Any]:
        """Return *data* as a manifest dict that always has an 'artifacts' dict.

        A non-dict payload is replaced by the empty default manifest. A dict
        whose 'artifacts' entry is missing or not a dict gets an empty one;
        all other keys are kept as they are. The input is not mutated.
        """
        if not isinstance(data, dict):
            return {"artifacts": {}, "version": "1.0"}
        manifest = dict(data)
        if not isinstance(manifest.get('artifacts'), dict):
            manifest['artifacts'] = {}
        return manifest

    @staticmethod
    def _schema_mismatches(df: pd.DataFrame, expected_schema: Dict[str, str]) -> list:
        """Return ``(column, expected_dtype, actual_dtype)`` tuples that differ.

        Columns absent from *df* are skipped, and so are columns whose actual
        dtype string contains 'object' (the original "simple check").
        """
        mismatches = []
        for col, expected_type in expected_schema.items():
            if col not in df.columns:
                continue
            actual_type = str(df[col].dtype)
            if expected_type != actual_type and 'object' not in actual_type:
                mismatches.append((col, expected_type, actual_type))
        return mismatches

    def _load_manifest(self) -> Dict[str, Any]:
        """Read the manifest from ``MANIFEST_PATH``.

        Returns the empty default manifest when the file does not exist or
        cannot be read/parsed (a warning is logged in the latter case). A
        manifest that parses but has the wrong shape is normalized so that
        callers can always index ``manifest['artifacts']``.
        """
        if MANIFEST_PATH.exists():
            try:
                with open(MANIFEST_PATH, 'r') as f:
                    loaded = json.load(f)
                if not isinstance(loaded, dict) or not isinstance(loaded.get('artifacts'), dict):
                    logger.warning(f"Manifest at {MANIFEST_PATH} is malformed; using an empty artifact list")
                return self._normalize_manifest(loaded)
            except Exception as e:
                # Best effort: a broken manifest must not prevent start-up.
                logger.warning(f"Could not load manifest: {e}")
        return {"artifacts": {}, "version": "1.0"}

    def _save_manifest(self):
        """Write the in-memory manifest to ``MANIFEST_PATH`` as indented JSON."""
        PROCESSED_DIR.mkdir(parents=True, exist_ok=True)
        with open(MANIFEST_PATH, 'w') as f:
            json.dump(self.manifest, f, indent=2)

    def register_artifact(self, name: str, df: pd.DataFrame, file_path: Path):
        """Register an artifact in the manifest with schema info.

        Args:
            name: Key under which the artifact is stored in the manifest.
            df: The data that was written to *file_path*; its columns, row
                count and dtypes are recorded.
            file_path: Location of the written file. It must lie under
                ``BASE_DIR`` because the path is stored relative to it.

        Raises:
            ValueError: If *file_path* is not located under ``BASE_DIR``
                (raised by ``Path.relative_to``).
        """
        schema = {col: str(dtype) for col, dtype in df.dtypes.items()}
        checksum = compute_checksum(file_path)
        # setdefault keeps this working even if `manifest` was replaced by a
        # dict without an 'artifacts' entry.
        self.manifest.setdefault('artifacts', {})[name] = {
            'path': str(file_path.relative_to(BASE_DIR)),
            'columns': list(df.columns),
            'rows': len(df),
            'schema': schema,
            'checksum': checksum,
            'version': "1.0"
        }
        self._save_manifest()

    def load_artifact(self, name: str, required_columns: Optional[list] = None, validate_schema: bool = True) -> pd.DataFrame:
        """Load an artifact as a DataFrame, rebuilding it first if necessary.

        The file is looked up through the manifest, falling back to
        ``PROCESSED_DIR / "<name>.parquet"`` when the manifest has no path
        for *name*. A missing file or a checksum mismatch triggers
        ``ensure_structured_data(force_rebuild=True)`` followed by a manifest
        reload.

        Args:
            name: Artifact name.
            required_columns: Columns that must be present in the result.
            validate_schema: When true and the manifest records a schema for
                *name*, log a warning for each column whose dtype differs
                from the recorded one. Mismatches never raise.

        Returns:
            The artifact read with ``pandas.read_parquet``.

        Raises:
            FileNotFoundError: If the file is still missing after a rebuild.
            ValueError: If any of *required_columns* is absent.
        """
        artifacts = self.manifest.get('artifacts', {})
        path_str = artifacts.get(name, {}).get('path')
        needs_rebuild = False

        if not path_str:
            path = PROCESSED_DIR / f"{name}.parquet"
            if not path.exists():
                needs_rebuild = True
        else:
            path = BASE_DIR / path_str
            if not path.exists():
                needs_rebuild = True
            else:
                expected_checksum = artifacts[name].get('checksum')
                if expected_checksum and compute_checksum(path) != expected_checksum:
                    logger.warning(f"Checksum mismatch for artifact {name}. Triggering rebuild...")
                    needs_rebuild = True

        if needs_rebuild:
            logger.info(f"Artifact {name} missing or invalid. Triggering rebuild...")
            # Imported here rather than at module level; presumably to avoid
            # a circular import with preprocessing.artifact_store - confirm.
            from preprocessing.artifact_store import ensure_structured_data
            ensure_structured_data(force_rebuild=True)
            self.manifest = self._load_manifest()  # Reload manifest
            # Update path resolution in case it changed
            path_str = self.manifest.get('artifacts', {}).get(name, {}).get('path')
            if path_str:
                path = BASE_DIR / path_str
            elif not path.exists():
                path = PROCESSED_DIR / f"{name}.parquet"

        if not path.exists():
            raise FileNotFoundError(f"Failed to find or rebuild artifact: {name} at {path}")

        df = pd.read_parquet(path)

        if required_columns:
            missing = [col for col in required_columns if col not in df.columns]
            if missing:
                raise ValueError(f"Artifact {name} missing required columns: {missing}")

        if validate_schema:
            entry = self.manifest.get('artifacts', {}).get(name)
            if entry is not None:
                expected_schema = entry.get('schema') or {}
                for col, expected_type, actual_type in self._schema_mismatches(df, expected_schema):
                    logger.warning(
                        f"Artifact {name} column {col!r} has dtype {actual_type}, "
                        f"manifest expects {expected_type}"
                    )

        return df

    def verify_all_artifacts(self) -> bool:
        """Verify the integrity of all artifacts in the manifest.

        An artifact is valid when the manifest records a path for it, the
        file exists under ``BASE_DIR`` and its MD5 checksum equals the
        recorded one. Each failure is logged as a warning.

        Returns:
            True if every artifact is valid (also when there are none),
            False otherwise.
        """
        all_valid = True
        for name, metadata in self.manifest.get('artifacts', {}).items():
            path_str = metadata.get('path')
            if not path_str:
                logger.warning(f"Artifact {name} has no path recorded in the manifest")
                all_valid = False
                continue
            path = BASE_DIR / path_str
            if not path.exists():
                logger.warning(f"Artifact {name} file is missing: {path}")
                all_valid = False
                continue
            if compute_checksum(path) != metadata.get('checksum'):
                logger.warning(f"Checksum mismatch for artifact {name}")
                all_valid = False
        return all_valid
# Shared module-level instance. Creating it at import time reads the manifest
# from disk when the manifest file exists.
manager = ArtifactManager()
|