File size: 5,275 Bytes
d29b763
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
import json
import logging
from pathlib import Path
from typing import Any, Dict, Optional, Union
import hashlib

import pandas as pd

logger = logging.getLogger(__name__)

# Directory layout, anchored at the directory two levels above the one that
# contains this module.
BASE_DIR = Path(__file__).resolve().parents[2]
DATA_DIR = BASE_DIR.joinpath('data')
RAW_DIR = DATA_DIR.joinpath('raw')
PROCESSED_DIR = DATA_DIR.joinpath('processed')
CACHE_DIR = BASE_DIR.joinpath('cache')

# JSON manifest that ArtifactManager reads and writes.
MANIFEST_PATH = PROCESSED_DIR.joinpath('artifact_manifest.json')

def compute_checksum(file_path: Path, chunk_size: int = 1 << 16) -> str:
    """Return the hex MD5 digest of the file at ``file_path``.

    The file is streamed in ``chunk_size``-byte blocks, so large artifacts
    are never held in memory at once. The digest does not depend on
    ``chunk_size``. MD5 detects accidental corruption but is not
    collision-resistant, so it should not be relied on against tampering.

    Args:
        file_path: File to hash (a ``str`` path is accepted as well).
        chunk_size: Number of bytes read per iteration; must be positive.

    Returns:
        The 32-character lowercase hexadecimal MD5 digest.

    Raises:
        ValueError: If ``chunk_size`` is not positive.
        OSError: If the file cannot be opened or read.
    """
    # read(0) returns b"" immediately, which would end the loop at once and
    # silently produce the digest of an empty file.
    if chunk_size <= 0:
        raise ValueError(f"chunk_size must be positive, got {chunk_size}")
    digest = hashlib.md5()
    with open(file_path, "rb") as f:
        for chunk in iter(lambda: f.read(chunk_size), b""):
            digest.update(chunk)
    return digest.hexdigest()

class ArtifactManager:
    """Centralized Artifact Manager for structured datasets.

    Process-wide singleton: every ``ArtifactManager()`` call returns the same
    object, and the manifest at ``MANIFEST_PATH`` is read only on the first
    construction. The manifest is a JSON object shaped like
    ``{"artifacts": {name: metadata, ...}, "version": "1.0"}``, where each
    ``metadata`` entry is written by :meth:`register_artifact`.
    """

    _instance = None

    def __new__(cls):
        # Hand back the shared instance. Python still calls __init__ on every
        # ArtifactManager() call, so __init__ guards against re-initialising.
        if cls._instance is None:
            cls._instance = super().__new__(cls)
        return cls._instance

    def __init__(self):
        if not hasattr(self, 'initialized'):
            self.manifest = self._load_manifest()
            self.initialized = True

    def _load_manifest(self) -> Dict[str, Any]:
        """Read the manifest from disk, falling back to an empty manifest.

        A missing, unreadable or malformed manifest is logged and replaced by
        an empty one instead of raising. The returned dict always contains an
        ``'artifacts'`` dict, which the other methods index directly.
        """
        empty = {"artifacts": {}, "version": "1.0"}
        if not MANIFEST_PATH.exists():
            return empty
        try:
            with open(MANIFEST_PATH, 'r', encoding='utf-8') as f:
                manifest = json.load(f)
        except (OSError, ValueError) as e:  # json.JSONDecodeError is a ValueError
            logger.warning(f"Could not load manifest: {e}")
            return empty
        if not isinstance(manifest, dict):
            logger.warning(
                f"Could not load manifest: expected a JSON object, got {type(manifest).__name__}"
            )
            return empty
        if not isinstance(manifest.get('artifacts'), dict):
            # A hand-edited or partial manifest may lack the mapping; without it
            # register_artifact/load_artifact would fail with KeyError/TypeError.
            logger.warning("Manifest has no valid 'artifacts' mapping; starting with an empty one")
            manifest['artifacts'] = {}
        return manifest

    def _save_manifest(self):
        """Persist the manifest to ``MANIFEST_PATH`` atomically.

        The JSON is written to a sibling temp file that then replaces the
        manifest, so a crash mid-write cannot leave a truncated manifest.
        """
        PROCESSED_DIR.mkdir(parents=True, exist_ok=True)
        tmp_path = MANIFEST_PATH.with_name(MANIFEST_PATH.name + '.tmp')
        with open(tmp_path, 'w', encoding='utf-8') as f:
            json.dump(self.manifest, f, indent=2)
        tmp_path.replace(MANIFEST_PATH)

    @staticmethod
    def _to_manifest_path(file_path: Path) -> str:
        """Return the string stored as an artifact's ``'path'``.

        Paths inside ``BASE_DIR`` are stored relative to it, trying the path
        as given and then its resolved form. Anything else is stored as an
        absolute path; ``BASE_DIR / <absolute>`` yields that absolute path
        again when loading.
        """
        for candidate in (file_path, file_path.resolve()):
            try:
                return str(candidate.relative_to(BASE_DIR))
            except ValueError:
                continue
        return str(file_path.resolve())

    def register_artifact(self, name: str, df: pd.DataFrame, file_path: Path):
        """Register an artifact in the manifest with schema info.

        Args:
            name: Key under which the artifact is recorded.
            df: The frame stored at ``file_path``. Its columns, per-column
                dtype strings and row count are recorded.
            file_path: Location of the already-written artifact file (a
                ``str`` is accepted too). Its MD5 checksum is recorded.

        The manifest is saved to disk immediately.

        Raises:
            OSError: If ``file_path`` cannot be read for checksumming.
        """
        file_path = Path(file_path)
        schema = {col: str(dtype) for col, dtype in df.dtypes.items()}
        checksum = compute_checksum(file_path)
        self.manifest.setdefault('artifacts', {})[name] = {
            'path': self._to_manifest_path(file_path),
            'columns': list(df.columns),
            'rows': len(df),
            'schema': schema,
            'checksum': checksum,
            'version': "1.0"
        }
        self._save_manifest()

    def _rebuild_and_relocate(self, name: str, previous_path: Path) -> Path:
        """Run the full data rebuild, reload the manifest and locate ``name``.

        Args:
            name: Artifact being loaded.
            previous_path: Path tried before the rebuild. It is used again
                only if the reloaded manifest has no path for ``name``.

        Returns:
            An existing path for the artifact.

        Raises:
            FileNotFoundError: If the artifact still does not exist.
        """
        logger.info(f"Artifact {name} missing or invalid. Triggering rebuild...")
        # Deferred import: only needed when a rebuild actually happens.
        from preprocessing.artifact_store import ensure_structured_data
        ensure_structured_data(force_rebuild=True)
        self.manifest = self._load_manifest()
        # The rebuild may have registered the artifact under a different path.
        path_str = self.manifest.get('artifacts', {}).get(name, {}).get('path')
        if path_str:
            path = BASE_DIR / path_str
        elif previous_path.exists():
            path = previous_path
        else:
            path = PROCESSED_DIR / f"{name}.parquet"
        if not path.exists():
            raise FileNotFoundError(f"Failed to find or rebuild artifact: {name} at {path}")
        return path

    def _warn_on_schema_drift(self, name: str, df: pd.DataFrame) -> None:
        """Log a warning for each column whose dtype differs from the manifest.

        Columns whose current dtype string contains ``'object'`` are exempt.
        Recorded columns absent from ``df`` are skipped here; column presence
        is what ``required_columns`` in :meth:`load_artifact` checks.
        """
        entry = self.manifest.get('artifacts', {}).get(name)
        if not entry:
            return
        for col, expected_type in entry.get('schema', {}).items():
            if col not in df.columns:
                continue
            actual_type = str(df[col].dtype)
            if expected_type != actual_type and 'object' not in actual_type:
                logger.warning(
                    f"Artifact {name} column {col!r} has dtype {actual_type}, expected {expected_type}"
                )

    def load_artifact(self, name: str, required_columns: Optional[list] = None, validate_schema: bool = True) -> pd.DataFrame:
        """Load an artifact with integrity and column validation.

        The file is looked up first at the path recorded in the manifest,
        otherwise at ``PROCESSED_DIR / f"{name}.parquet"``. A rebuild is
        triggered via ``preprocessing.artifact_store.ensure_structured_data``
        in two cases: the file is missing, or its checksum differs from the
        recorded one. The manifest is reloaded after a rebuild.

        Args:
            name: Artifact key.
            required_columns: Columns that must be present in the loaded frame.
            validate_schema: If True, log a warning for columns whose dtype
                differs from the one recorded at registration.

        Returns:
            The artifact as read by ``pd.read_parquet``.

        Raises:
            FileNotFoundError: If the artifact is still missing after a rebuild.
            ValueError: If any of ``required_columns`` is absent.
        """
        entry = self.manifest.get('artifacts', {}).get(name, {})
        path_str = entry.get('path')
        needs_rebuild = False

        if path_str:
            path = BASE_DIR / path_str
            if not path.exists():
                needs_rebuild = True
            else:
                # Only verifiable when a checksum was recorded for the entry.
                expected_checksum = entry.get('checksum')
                if expected_checksum and compute_checksum(path) != expected_checksum:
                    logger.warning(f"Checksum mismatch for artifact {name}. Triggering rebuild...")
                    needs_rebuild = True
        else:
            path = PROCESSED_DIR / f"{name}.parquet"
            needs_rebuild = not path.exists()

        if needs_rebuild:
            path = self._rebuild_and_relocate(name, path)

        df = pd.read_parquet(path)

        if required_columns:
            missing = [col for col in required_columns if col not in df.columns]
            if missing:
                raise ValueError(f"Artifact {name} missing required columns: {missing}")

        if validate_schema:
            self._warn_on_schema_drift(name, df)

        return df

    def verify_all_artifacts(self) -> bool:
        """Verify the integrity of all artifacts in the manifest.

        Each entry passes only if all of the following hold:
        - a path is recorded;
        - the file at that path exists;
        - the file's MD5 checksum equals the recorded one. An entry without a
          recorded checksum therefore fails.

        Every entry is checked, and each failure is logged with its reason.

        Returns:
            True if every registered artifact passes, False otherwise.
        """
        all_valid = True
        for name, metadata in self.manifest.get('artifacts', {}).items():
            path_str = metadata.get('path')
            if not path_str:
                problem = "no path recorded"
            elif not (BASE_DIR / path_str).exists():
                problem = f"file not found at {BASE_DIR / path_str}"
            elif compute_checksum(BASE_DIR / path_str) != metadata.get('checksum'):
                problem = "checksum mismatch"
            else:
                continue
            logger.warning(f"Artifact {name} failed verification: {problem}")
            all_valid = False
        return all_valid

# Shared module-level instance. Importing this module constructs the
# ArtifactManager singleton, which reads the manifest at MANIFEST_PATH
# (if that file exists) as an import-time side effect.
manager = ArtifactManager()