Spaces:
Running
Running
"""Small adapter that lets ML-ETLAgent interact with ProjectService in a minimal way.

This file provides:
- ProjectAdapter: a thin wrapper that exposes the small set of methods the agent needs.
- A small runnable demo that uses a MockStorage to show typical calls.

Keep this file tiny and free of extra dependencies so the agent can import and use it easily.
"""
| from __future__ import annotations | |
| import json | |
| from typing import Literal, Optional | |
| from ml_module.services.project_service import ProjectService | |
| from ml_module.services.storage_service import MLStorageService | |
| from ml_module.core.artifacts import StoredArtifactInfo | |
class ProjectAdapter:
    """Thin adapter exposing only the methods ML-ETLAgent typically needs.

    Project-level calls are delegated to a ``ProjectService`` built on the
    given storage service; the ``save_*`` helpers go straight to the storage
    service. Methods intentionally keep semantics simple so the agent code
    stays readable.
    """

    # Version counters that ``bump_version_and_get_path`` can increment.
    # Artifact labels are matched against these exactly first, then by prefix.
    _VERSION_SCOPES = ("raw", "processed", "model", "evaluation")

    # Scope used when a label matches none of the known scopes (kept from the
    # original best-effort behaviour so unknown labels do not start failing).
    _DEFAULT_VERSION_SCOPE = "processed"

    def __init__(self, storage_service: MLStorageService):
        """Wrap *storage_service* and build the ProjectService on top of it."""
        self.storage = storage_service
        self._svc = ProjectService(self.storage)

    def create_project(self, user_id: str, project_name: str):
        """Create a project via ``ProjectService.create_project`` and return it."""
        return self._svc.create_project(user_id, project_name)

    def get_project(self, user_id: str, project_id: str):
        """Return whatever ``ProjectService.get_project`` yields for the ids."""
        return self._svc.get_project(user_id, project_id)

    def ensure_project(self, user_id: str, project_name: str):
        """Return the user's project named *project_name*, creating it if absent.

        Note: this simple implementation does a linear scan of the user's
        projects. For production you may want an indexed lookup.

        Returns:
            The first listed project whose ``project_name`` attribute equals
            *project_name*, otherwise a newly created project.
        """
        for candidate in self._svc.list_projects_for_user(user_id):
            # getattr guards against listed entries without a project_name.
            if getattr(candidate, 'project_name', None) == project_name:
                return candidate
        return self._svc.create_project(user_id, project_name)

    def get_artifact_path(
        self,
        user_id: str,
        project_id: str,
        artifact_type: str,
        version: Optional[int] = None,
    ) -> str:
        """Return the path ``ProjectService.get_artifact_path`` reports.

        ``version`` is passed through unchanged; how ``None`` is interpreted
        is decided by the service.
        """
        return self._svc.get_artifact_path(user_id, project_id, artifact_type, version)

    @classmethod
    def _version_scope_for(cls, artifact_type: str) -> str:
        """Map an artifact label to the version scope that should be bumped.

        An exact scope name maps to itself; otherwise the label is matched by
        prefix (``'processed_csv'`` -> ``'processed'``, ``'raw_csv'`` ->
        ``'raw'``). Unrecognised labels fall back to the default scope.
        """
        if artifact_type in cls._VERSION_SCOPES:
            return artifact_type
        for scope in cls._VERSION_SCOPES:
            if artifact_type.startswith(scope):
                return scope
        return cls._DEFAULT_VERSION_SCOPE

    def bump_version_and_get_path(self, user_id: str, project_id: str, artifact_type: str) -> str:
        """Increment the appropriate version and return the path where the
        agent should write the artifact.

        The scope to bump is derived from *artifact_type* (exact match, then
        prefix match over raw/processed/model/evaluation, then 'processed').
        The returned path is requested for the original *artifact_type* label
        at the freshly bumped version number.
        """
        v_type = self._version_scope_for(artifact_type)
        project = self._svc.bump_version(user_id, project_id, v_type)
        # After bump, return canonical path for the new version
        new_version = getattr(project.versions, v_type)
        return self._svc.get_artifact_path(user_id, project_id, artifact_type, version=new_version)

    def save_json(self, data, path: str) -> StoredArtifactInfo:
        """Store *data* at *path* through the storage service's ``save_json``."""
        return self.storage.save_json(data, path)

    def save_text(self, text: str, path: str) -> StoredArtifactInfo:
        """Store *text* at *path* through the storage service's ``save_text``."""
        return self.storage.save_text(text, path)

    def register_artifact(
        self,
        user_id: str,
        project_id: str,
        artifact_type: str,
        version: int,
        info: StoredArtifactInfo,
        *,
        version_scope: Optional[Literal["raw", "processed", "model", "evaluation", "draft"]] = None,
        extra_metadata: Optional[dict] = None,
        soft_delete: bool = True,
    ) -> dict:
        """Register a stored artifact with the project.

        All arguments, including the keyword-only ``version_scope``,
        ``extra_metadata`` and ``soft_delete``, are forwarded unchanged to
        ``ProjectService.register_artifact``; their meaning is defined there.

        Returns:
            The value returned by ``ProjectService.register_artifact``.
        """
        return self._svc.register_artifact(
            user_id,
            project_id,
            artifact_type,
            version,
            info,
            version_scope=version_scope,
            extra_metadata=extra_metadata,
            soft_delete=soft_delete,
        )
| # ----------------- Demo / sanity runner ----------------- | |
def _demo():
    """Walk through the typical adapter calls against a dict-backed storage stub."""

    class MockStorage:
        """In-memory stand-in for the storage service; JSON text keyed by path."""

        # Minimal attributes used by other code
        client = None

        def __init__(self):
            self._store = {}

        # The ProjectService expects these methods at minimum
        def save_json(self, data, path):
            serialized = json.dumps(data)
            self._store[path] = serialized

        def load_json(self, path):
            return json.loads(self._store[path])

        def list_project_files(self, user_id, project_id):
            wanted = f"{user_id}/{project_id}/"
            listing = []
            for key in self._store:
                if key.startswith(wanted):
                    listing.append({"path": key})
            return listing

    storage = MockStorage()
    adapter = ProjectAdapter(storage)  # type: ignore[arg-type]

    # Step 1: create the project.
    project = adapter.create_project('alice', 'demo')
    print('created project:', project.project_id)

    # Step 2: canonical path for the processed CSV before any version bump.
    initial_path = adapter.get_artifact_path('alice', project.project_id, 'processed_csv')
    print('path before bump:', initial_path)

    # Step 3: bump the processed version and obtain the path to write to.
    target_path = adapter.bump_version_and_get_path('alice', project.project_id, 'processed')
    print('writable path after bump:', target_path)

    # Step 4: store a tiny metadata JSON there and show what was stored.
    adapter.save_json({'rows': 10}, target_path)
    print('stored keys:', list(storage._store))
# Run the demo only when this file is executed as a script, not when imported.
if __name__ == '__main__':
    _demo()