| """ | |
| Artifact Storage Abstraction Layer | |
| Provides unified interface for saving models, plots, reports, and data files | |
| to either local filesystem or Google Cloud Storage (GCS). | |
| Design Principles: | |
| - Backend chosen via environment variable (ARTIFACT_BACKEND=local|gcs) | |
| - Tools never know which backend is used (clean separation) | |
| - GCS paths versioned with timestamps for reproducibility | |
| - Consistent return format: local paths or GCS URIs | |
| - Graceful fallback to local if GCS unavailable | |
| Architecture: | |
| Tool → ArtifactStore → LocalBackend / GCSBackend | |
| Usage: | |
| from storage import get_artifact_store | |
| store = get_artifact_store() | |
| # Save model | |
| path = store.save_model("model.pkl", metadata={"accuracy": 0.95}) | |
| # Save plot | |
| path = store.save_plot("correlation_heatmap.html") | |
| # Save report | |
| path = store.save_report("eda_report.html") | |
| # Save data file | |
| path = store.save_data("cleaned_data.csv") | |
| """ | |
| import os | |
| import json | |
| from pathlib import Path | |
| from datetime import datetime | |
| from typing import Dict, Any, Optional, Union | |
| from abc import ABC, abstractmethod | |
class StorageBackend(ABC):
    """Abstract base class for storage backends."""

    @abstractmethod
    def save_file(
        self,
        local_path: Union[str, Path],
        artifact_type: str,
        metadata: Optional[Dict[str, Any]] = None
    ) -> str:
        """
        Save file to backend storage.

        Args:
            local_path: Path to local file to save
            artifact_type: Type of artifact (model, plot, report, data)
            metadata: Optional metadata to save alongside artifact

        Returns:
            Storage path or URI where file was saved
        """

    @abstractmethod
    def list_artifacts(self, artifact_type: str) -> list[str]:
        """List all artifacts of given type."""

    @abstractmethod
    def get_artifact_path(self, artifact_type: str, filename: str) -> str:
        """Get full path/URI for an artifact."""

class LocalBackend(StorageBackend):
    """
    Local filesystem storage backend.

    Preserves existing behavior - saves to ./outputs/ directory structure.
    """

    def __init__(self, base_dir: str = "./outputs"):
        """
        Initialize local backend.

        Args:
            base_dir: Base directory for all artifacts (default: ./outputs)
        """
        self.base_dir = Path(base_dir)

        # Create subdirectories
        self.subdirs = {
            "model": self.base_dir / "models",
            "plot": self.base_dir / "plots",
            "report": self.base_dir / "reports",
            "data": self.base_dir / "data",
            "code": self.base_dir / "code"
        }
        for subdir in self.subdirs.values():
            subdir.mkdir(parents=True, exist_ok=True)

    def save_file(
        self,
        local_path: Union[str, Path],
        artifact_type: str,
        metadata: Optional[Dict[str, Any]] = None
    ) -> str:
        """
        Save file to local filesystem.

        Args:
            local_path: Path to source file
            artifact_type: Type (model, plot, report, data, code)
            metadata: Optional metadata (saved as JSON sidecar)

        Returns:
            Absolute path where file was saved
        """
        local_path = Path(local_path)
        if not local_path.exists():
            raise FileNotFoundError(f"Source file not found: {local_path}")

        # Determine target directory
        target_dir = self.subdirs.get(artifact_type)
        if target_dir is None:
            raise ValueError(
                f"Unknown artifact type: {artifact_type}. "
                f"Must be one of: {list(self.subdirs.keys())}"
            )

        # Preserve filename
        target_path = target_dir / local_path.name

        # Copy file (if not already in target location)
        if local_path.resolve() != target_path.resolve():
            import shutil
            shutil.copy2(local_path, target_path)

        # Save metadata if provided
        if metadata:
            metadata_path = target_path.with_suffix(target_path.suffix + ".meta.json")
            with open(metadata_path, "w") as f:
                json.dump({
                    "artifact_type": artifact_type,
                    "filename": local_path.name,
                    "timestamp": datetime.utcnow().isoformat(),
                    "backend": "local",
                    **metadata
                }, f, indent=2)

        return str(target_path.resolve())

    def list_artifacts(self, artifact_type: str) -> list[str]:
        """List all artifacts of given type in local storage."""
        # Validate artifact type
        valid_types = ["model", "plot", "report", "data", "code"]
        if artifact_type not in valid_types:
            raise ValueError(
                f"Invalid artifact type: {artifact_type}. "
                f"Must be one of: {', '.join(valid_types)}"
            )

        target_dir = self.subdirs.get(artifact_type)
        if target_dir is None or not target_dir.exists():
            return []

        # Exclude metadata files
        return [
            str(f.resolve())
            for f in target_dir.iterdir()
            if f.is_file() and not f.name.endswith(".meta.json")
        ]

    def get_artifact_path(self, artifact_type: str, filename: str) -> str:
        """Get full local path for artifact."""
        target_dir = self.subdirs.get(artifact_type)
        if target_dir is None:
            raise ValueError(f"Unknown artifact type: {artifact_type}")
        return str((target_dir / filename).resolve())

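# Illustrative on-disk layout produced by LocalBackend (filenames below are
# examples only, not files created by this module):
#
#   outputs/
#       models/model.pkl
#       models/model.pkl.meta.json        <- JSON sidecar written when metadata is passed
#       plots/correlation_heatmap.html
#       reports/eda_report.html
#       data/cleaned_data.csv
#       code/analysis_output.txt
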
class GCSBackend(StorageBackend):
    """
    Google Cloud Storage backend.

    Saves artifacts to a GCS bucket with versioned paths.
    """

    def __init__(
        self,
        bucket_name: Optional[str] = None,
        project_id: Optional[str] = None,
        base_prefix: str = "artifacts"
    ):
        """
        Initialize GCS backend.

        Args:
            bucket_name: GCS bucket name (from env: GCS_BUCKET_NAME)
            project_id: GCP project ID (from env: GCP_PROJECT_ID)
            base_prefix: Base prefix for all artifacts (default: artifacts)
        """
        try:
            from google.cloud import storage
            from google.auth import default as gcp_default
        except ImportError:
            raise ImportError(
                "GCS backend requires google-cloud-storage. "
                "Install with: pip install google-cloud-storage"
            )

        # Get configuration from environment
        self.bucket_name = bucket_name or os.getenv("GCS_BUCKET_NAME")
        self.project_id = project_id or os.getenv("GCP_PROJECT_ID")
        self.base_prefix = base_prefix

        if not self.bucket_name:
            raise ValueError(
                "GCS bucket name not specified. "
                "Set GCS_BUCKET_NAME environment variable or pass bucket_name."
            )

        # Initialize GCS client
        try:
            if self.project_id:
                self.client = storage.Client(project=self.project_id)
            else:
                # Use default credentials
                credentials, project = gcp_default()
                self.client = storage.Client(credentials=credentials, project=project)
                self.project_id = project
        except Exception as e:
            raise RuntimeError(
                f"Failed to initialize GCS client: {e}\n"
                "Ensure credentials are configured (GOOGLE_APPLICATION_CREDENTIALS "
                "or gcloud auth application-default login)"
            )

        # Get bucket
        try:
            self.bucket = self.client.bucket(self.bucket_name)
            # Verify bucket exists
            if not self.bucket.exists():
                raise ValueError(f"Bucket '{self.bucket_name}' does not exist")
        except Exception as e:
            raise RuntimeError(f"Failed to access bucket '{self.bucket_name}': {e}")

    def _get_versioned_path(self, artifact_type: str, filename: str) -> str:
        """
        Generate versioned GCS path.

        Format: {base_prefix}/{type}/{YYYY-MM-DD}/{timestamp}_{filename}
        Example: artifacts/model/2025-12-23/20251223_143052_model.pkl
        """
        timestamp = datetime.utcnow()
        date_str = timestamp.strftime("%Y-%m-%d")
        time_str = timestamp.strftime("%Y%m%d_%H%M%S")
        versioned_filename = f"{time_str}_{filename}"
        return f"{self.base_prefix}/{artifact_type}/{date_str}/{versioned_filename}"

    def save_file(
        self,
        local_path: Union[str, Path],
        artifact_type: str,
        metadata: Optional[Dict[str, Any]] = None
    ) -> str:
        """
        Upload file to GCS with versioned path.

        Args:
            local_path: Path to local file to upload
            artifact_type: Type (model, plot, report, data, code)
            metadata: Optional metadata (stored as blob metadata)

        Returns:
            GCS URI (gs://bucket/path)
        """
        local_path = Path(local_path)
        if not local_path.exists():
            raise FileNotFoundError(f"Source file not found: {local_path}")

        # Generate versioned path
        gcs_path = self._get_versioned_path(artifact_type, local_path.name)

        # Create blob
        blob = self.bucket.blob(gcs_path)

        # Set metadata
        if metadata:
            blob.metadata = {
                "artifact_type": artifact_type,
                "filename": local_path.name,
                "timestamp": datetime.utcnow().isoformat(),
                "backend": "gcs",
                **{k: str(v) for k, v in metadata.items()}  # Convert all values to strings
            }

        # Upload file
        try:
            blob.upload_from_filename(str(local_path))
        except Exception as e:
            raise RuntimeError(f"Failed to upload to GCS: {e}")

        # Return GCS URI
        gcs_uri = f"gs://{self.bucket_name}/{gcs_path}"
        return gcs_uri

    def list_artifacts(self, artifact_type: str) -> list[str]:
        """List all artifacts of given type in GCS."""
        # Validate artifact type
        valid_types = ["model", "plot", "report", "data", "code"]
        if artifact_type not in valid_types:
            raise ValueError(
                f"Invalid artifact type: {artifact_type}. "
                f"Must be one of: {', '.join(valid_types)}"
            )

        prefix = f"{self.base_prefix}/{artifact_type}/"
        try:
            blobs = self.client.list_blobs(self.bucket, prefix=prefix)
            return [f"gs://{self.bucket_name}/{blob.name}" for blob in blobs]
        except Exception as e:
            raise RuntimeError(f"Failed to list GCS artifacts: {e}")

    def get_artifact_path(self, artifact_type: str, filename: str) -> str:
        """Get latest GCS path for artifact (most recent version)."""
        artifacts = self.list_artifacts(artifact_type)

        # Filter by filename (ignore the timestamp prefix)
        matching = [
            uri for uri in artifacts
            if uri.endswith(f"_{filename}") or uri.endswith(f"/{filename}")
        ]
        if not matching:
            raise FileNotFoundError(
                f"No artifact found with filename '{filename}' in type '{artifact_type}'"
            )

        # Return most recent (last in sorted list)
        return sorted(matching)[-1]

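# Example environment for enabling the GCS backend (values are placeholders):
#
#   export ARTIFACT_BACKEND=gcs
#   export GCS_BUCKET_NAME=my-artifact-bucket   # required; the bucket must already exist
#   export GCP_PROJECT_ID=my-gcp-project        # optional; default credentials are used otherwise
#
# Authenticate via GOOGLE_APPLICATION_CREDENTIALS or
# `gcloud auth application-default login` before running.
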
class ArtifactStore:
    """
    Unified interface for artifact storage.

    Automatically routes to the correct backend based on configuration.
    Tools use this class and never directly interact with backends.
    """

    def __init__(self, backend: Optional[StorageBackend] = None):
        """
        Initialize artifact store with backend.

        Args:
            backend: Storage backend (auto-detected if None)
        """
        if backend is None:
            backend = self._detect_backend()
        self.backend = backend

    def _detect_backend(self) -> StorageBackend:
        """
        Detect and initialize the appropriate backend.

        Detection logic:
            1. Check ARTIFACT_BACKEND env var (local|gcs)
            2. If GCS, check for GCS_BUCKET_NAME
            3. Fall back to local if anything fails

        Returns:
            Initialized storage backend
        """
        backend_type = os.getenv("ARTIFACT_BACKEND", "local").lower()

        if backend_type == "gcs":
            try:
                # Try to initialize GCS
                bucket_name = os.getenv("GCS_BUCKET_NAME")
                if not bucket_name:
                    print("⚠️ GCS backend requested but GCS_BUCKET_NAME not set. Falling back to local.")
                    return LocalBackend()

                print(f"🔵 Initializing GCS backend (bucket: {bucket_name})")
                return GCSBackend(bucket_name=bucket_name)
            except Exception as e:
                print(f"⚠️ GCS backend initialization failed: {e}")
                print("   Falling back to local storage.")
                return LocalBackend()
        elif backend_type == "local":
            print("📁 Using local filesystem backend")
            return LocalBackend()
        else:
            print(f"⚠️ Unknown ARTIFACT_BACKEND: {backend_type}. Using local.")
            return LocalBackend()

    def save_model(
        self,
        local_path: Union[str, Path],
        metadata: Optional[Dict[str, Any]] = None
    ) -> str:
        """
        Save a machine learning model.

        Args:
            local_path: Path to model file (e.g., model.pkl)
            metadata: Optional metadata (accuracy, hyperparameters, etc.)

        Returns:
            Storage path or URI where model was saved

        Example:
            store = ArtifactStore()
            path = store.save_model(
                "model.pkl",
                metadata={"accuracy": 0.95, "model_type": "RandomForest"}
            )
        """
        return self.backend.save_file(local_path, "model", metadata)

    def save_plot(
        self,
        local_path: Union[str, Path],
        metadata: Optional[Dict[str, Any]] = None
    ) -> str:
        """
        Save a visualization plot.

        Args:
            local_path: Path to plot file (e.g., plot.html, plot.png)
            metadata: Optional metadata (plot type, columns, etc.)

        Returns:
            Storage path or URI where plot was saved

        Example:
            store = ArtifactStore()
            path = store.save_plot(
                "correlation_heatmap.html",
                metadata={"plot_type": "heatmap", "columns": ["age", "income"]}
            )
        """
        return self.backend.save_file(local_path, "plot", metadata)

    def save_report(
        self,
        local_path: Union[str, Path],
        metadata: Optional[Dict[str, Any]] = None
    ) -> str:
        """
        Save an analysis report.

        Args:
            local_path: Path to report file (e.g., report.html)
            metadata: Optional metadata (report type, dataset, etc.)

        Returns:
            Storage path or URI where report was saved

        Example:
            store = ArtifactStore()
            path = store.save_report(
                "eda_report.html",
                metadata={"report_type": "ydata_profiling", "dataset": "titanic"}
            )
        """
        return self.backend.save_file(local_path, "report", metadata)

    def save_data(
        self,
        local_path: Union[str, Path],
        metadata: Optional[Dict[str, Any]] = None
    ) -> str:
        """
        Save a processed data file.

        Args:
            local_path: Path to data file (e.g., cleaned.csv)
            metadata: Optional metadata (transformation steps, row count, etc.)

        Returns:
            Storage path or URI where data was saved

        Example:
            store = ArtifactStore()
            path = store.save_data(
                "cleaned_data.csv",
                metadata={"rows": 1000, "columns": 20, "transformations": ["drop_na", "encode"]}
            )
        """
        return self.backend.save_file(local_path, "data", metadata)

    def save_code(
        self,
        local_path: Union[str, Path],
        metadata: Optional[Dict[str, Any]] = None
    ) -> str:
        """
        Save code interpreter output.

        Args:
            local_path: Path to code output file
            metadata: Optional metadata (execution time, etc.)

        Returns:
            Storage path or URI where file was saved
        """
        return self.backend.save_file(local_path, "code", metadata)

    def list_artifacts(self, artifact_type: str) -> list[str]:
        """
        List all artifacts of a specific type.

        Args:
            artifact_type: Type of artifact (model, plot, report, data, code)

        Returns:
            List of artifact paths or URIs

        Example:
            store = ArtifactStore()
            models = store.list_artifacts("model")
            plots = store.list_artifacts("plot")
        """
        return self.backend.list_artifacts(artifact_type)

    def list_models(self) -> list[str]:
        """List all saved models."""
        return self.backend.list_artifacts("model")

    def list_plots(self) -> list[str]:
        """List all saved plots."""
        return self.backend.list_artifacts("plot")

    def list_reports(self) -> list[str]:
        """List all saved reports."""
        return self.backend.list_artifacts("report")

    def list_data_files(self) -> list[str]:
        """List all saved data files."""
        return self.backend.list_artifacts("data")

    def get_backend_info(self) -> Dict[str, Any]:
        """
        Get information about the current backend.

        Returns:
            Backend configuration details
        """
        if isinstance(self.backend, LocalBackend):
            return {
                "type": "local",
                "base_path": str(self.backend.base_dir.resolve()),
                "base_dir": str(self.backend.base_dir.resolve()),
                "subdirs": {k: str(v) for k, v in self.backend.subdirs.items()}
            }
        elif isinstance(self.backend, GCSBackend):
            return {
                "type": "gcs",
                "base_path": f"gs://{self.backend.bucket_name}/{self.backend.base_prefix}",
                "bucket": self.backend.bucket_name,
                "project": self.backend.project_id,
                "base_prefix": self.backend.base_prefix
            }
        else:
            return {"type": "unknown", "base_path": "unknown"}

# Singleton instance
_artifact_store_instance: Optional[ArtifactStore] = None


def get_artifact_store(backend: Optional[StorageBackend] = None) -> ArtifactStore:
    """
    Get the singleton instance of ArtifactStore.

    This ensures all tools use the same backend configuration.

    Args:
        backend: Optional backend (for testing or custom configuration)

    Returns:
        Singleton ArtifactStore instance

    Example:
        from storage import get_artifact_store

        store = get_artifact_store()
        path = store.save_model("model.pkl", metadata={"accuracy": 0.95})
    """
    global _artifact_store_instance
    if _artifact_store_instance is None or backend is not None:
        _artifact_store_instance = ArtifactStore(backend=backend)
    return _artifact_store_instance


def reset_artifact_store():
    """Reset the singleton instance (useful for testing)."""
    global _artifact_store_instance
    _artifact_store_instance = None
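

# Minimal smoke-test sketch (assumes a writable working directory; the sample
# file and metadata below are illustrative and not part of the module).
if __name__ == "__main__":
    import tempfile

    store = get_artifact_store()
    print("Backend info:", store.get_backend_info())

    # Create a small throwaway CSV and save it through the store.
    with tempfile.NamedTemporaryFile("w", suffix=".csv", delete=False) as tmp:
        tmp.write("a,b\n1,2\n")
        sample_path = tmp.name

    saved = store.save_data(sample_path, metadata={"rows": 1, "columns": 2})
    print("Saved data artifact to:", saved)
    print("Data artifacts:", store.list_data_files())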