"""Small adapter for ML-ETLAgent to interact with ProjectService in a minimal way. This file provides: - ProjectAdapter: thin wrapper that exposes the tiny set of methods the agent needs. - A small demo (runnable) that uses a MockStorage to show typical calls. Keep this tiny and dependency-free so the agent can import and use it easily. """ from __future__ import annotations import json from typing import Literal, Optional from ml_module.services.project_service import ProjectService from ml_module.services.storage_service import MLStorageService from ml_module.core.artifacts import StoredArtifactInfo class ProjectAdapter: """Thin adapter exposing only the methods ML-ETLAgent typically needs. Methods intentionally keep semantics simple so the agent code stays readable. """ def __init__(self, storage_service: MLStorageService): self.storage = storage_service self._svc = ProjectService(self.storage) def create_project(self, user_id: str, project_name: str): return self._svc.create_project(user_id, project_name) def get_project(self, user_id: str, project_id: str): return self._svc.get_project(user_id, project_id) def ensure_project(self, user_id: str, project_name: str): """Create a project if none exists for user with that name. Note: this simple implementation does a linear scan of the user's projects. For production you may want an indexed lookup. Returns: Project object """ projects = self._svc.list_projects_for_user(user_id) for p in projects: if getattr(p, 'project_name', None) == project_name: return p return self._svc.create_project(user_id, project_name) def get_artifact_path(self, user_id: str, project_id: str, artifact_type: str, version: Optional[int] = None) -> str: return self._svc.get_artifact_path(user_id, project_id, artifact_type, version) def bump_version_and_get_path(self, user_id: str, project_id: str, artifact_type: str) -> str: """Increment the appropriate version and return the path where the agent should write the artifact. """ # Map artifact_type to the semantic version type used by ProjectService version_map = { 'raw': 'raw', 'processed': 'processed', 'model': 'model', 'evaluation': 'evaluation' } if artifact_type not in version_map: # For artifact labels like 'processed_csv' or 'model_artifact' # try to match the prefix if artifact_type.startswith('processed'): v_type = 'processed' elif artifact_type.startswith('model'): v_type = 'model' else: v_type = 'processed' else: v_type = version_map[artifact_type] project = self._svc.bump_version(user_id, project_id, v_type) # After bump, return canonical path for the new version new_version = getattr(project.versions, v_type) return self._svc.get_artifact_path(user_id, project_id, artifact_type, version=new_version) def save_json(self, data, path: str) -> StoredArtifactInfo: return self.storage.save_json(data, path) def save_text(self, text: str, path: str) -> StoredArtifactInfo: return self.storage.save_text(text, path) def register_artifact( self, user_id: str, project_id: str, artifact_type: str, version: int, info: StoredArtifactInfo, *, version_scope: Optional[Literal["raw", "processed", "model", "evaluation", "draft"]] = None, extra_metadata: Optional[dict] = None, soft_delete: bool = True, ) -> dict: return self._svc.register_artifact( user_id, project_id, artifact_type, version, info, version_scope=version_scope, extra_metadata=extra_metadata, soft_delete=soft_delete, ) # ----------------- Demo / sanity runner ----------------- def _demo(): """Run a tiny demo using an in-memory MockStorage to illustrate usage.""" class MockStorage: def __init__(self): self._store = {} # The ProjectService expects these methods at minimum def save_json(self, data, path): self._store[path] = json.dumps(data) def load_json(self, path): raw = self._store[path] return json.loads(raw) def list_project_files(self, user_id, project_id): prefix = f"{user_id}/{project_id}/" return [ {"path": k} for k in self._store.keys() if k.startswith(prefix) ] # Minimal attributes used by other code client = None storage = MockStorage() adapter = ProjectAdapter(storage) # type: ignore[arg-type] # Create project p = adapter.create_project('alice', 'demo') print('created project:', p.project_id) # Get canonical path for processed CSV (before any bump) path_before = adapter.get_artifact_path('alice', p.project_id, 'processed_csv') print('path before bump:', path_before) # Bump processed version and get writable path writable = adapter.bump_version_and_get_path('alice', p.project_id, 'processed') print('writable path after bump:', writable) # Save a tiny metadata JSON to that path adapter.save_json({'rows': 10}, writable) # Confirm stored print('stored keys:', list(storage._store.keys())) if __name__ == '__main__': _demo()