# File: sirus/backend/ml_module/services/project_adapter.py
# Commit 783a952 (ranilmukesh): Deploy SiRUS SQL Agent backend
"""Small adapter for ML-ETLAgent to interact with ProjectService in a minimal way.
This file provides:
- ProjectAdapter: thin wrapper that exposes the tiny set of methods the agent needs.
- A small demo (runnable) that uses a MockStorage to show typical calls.
Keep this tiny and dependency-free so the agent can import and use it easily.
"""
from __future__ import annotations
import json
from typing import Literal, Optional
from ml_module.services.project_service import ProjectService
from ml_module.services.storage_service import MLStorageService
from ml_module.core.artifacts import StoredArtifactInfo
class ProjectAdapter:
    """Thin adapter exposing only the methods ML-ETLAgent typically needs.

    Almost every method is a direct delegation to the wrapped
    ``ProjectService``; ``save_json`` / ``save_text`` delegate to the storage
    service instead. Semantics are kept simple on purpose so the agent code
    stays readable.
    """

    # Version scopes an artifact label can resolve to. An artifact label maps
    # to the first scope it starts with (e.g. 'raw_csv' -> 'raw').
    _VERSION_SCOPES = ('raw', 'processed', 'model', 'evaluation')
    # Scope used when an artifact label matches none of the scopes above.
    _DEFAULT_SCOPE = 'processed'

    def __init__(self, storage_service: MLStorageService):
        """Wrap *storage_service* and build a ProjectService on top of it."""
        self.storage = storage_service
        self._svc = ProjectService(self.storage)

    def create_project(self, user_id: str, project_name: str):
        """Delegate to ``ProjectService.create_project`` and return its result."""
        return self._svc.create_project(user_id, project_name)

    def get_project(self, user_id: str, project_id: str):
        """Delegate to ``ProjectService.get_project`` and return its result."""
        return self._svc.get_project(user_id, project_id)

    def ensure_project(self, user_id: str, project_name: str):
        """Return the user's project named *project_name*, creating it if absent.

        Note: this simple implementation does a linear scan of the user's
        projects. For production you may want an indexed lookup.

        Returns: the first listed project whose ``project_name`` attribute
        equals *project_name*, otherwise the result of ``create_project``.
        """
        for project in self._svc.list_projects_for_user(user_id):
            # getattr with a default tolerates entries lacking the attribute.
            if getattr(project, 'project_name', None) == project_name:
                return project
        return self._svc.create_project(user_id, project_name)

    def get_artifact_path(self, user_id: str, project_id: str, artifact_type: str, version: Optional[int] = None) -> str:
        """Delegate to ``ProjectService.get_artifact_path``.

        *version* is forwarded as-is; how ``None`` is interpreted is decided
        by ProjectService.
        """
        return self._svc.get_artifact_path(user_id, project_id, artifact_type, version)

    @classmethod
    def _resolve_version_scope(cls, artifact_type: str) -> str:
        """Map an artifact label to the version scope it belongs to.

        Exact scope names and prefixed labels ('raw_csv', 'model_artifact',
        'evaluation_report', ...) resolve to the scope they start with;
        anything else falls back to ``_DEFAULT_SCOPE``.
        """
        return next(
            (scope for scope in cls._VERSION_SCOPES if artifact_type.startswith(scope)),
            cls._DEFAULT_SCOPE,
        )

    def bump_version_and_get_path(self, user_id: str, project_id: str, artifact_type: str) -> str:
        """Bump the version for *artifact_type* and return the path where the
        agent should write the artifact.

        The version scope is derived from *artifact_type* by prefix, so labels
        such as 'raw_csv' or 'evaluation_report' bump their own scope rather
        than silently bumping 'processed'. Unrecognised labels still fall back
        to the 'processed' scope.
        """
        v_type = self._resolve_version_scope(artifact_type)
        project = self._svc.bump_version(user_id, project_id, v_type)
        # After the bump, read the new version number for that scope from the
        # returned project and build the canonical path for it.
        new_version = getattr(project.versions, v_type)
        return self._svc.get_artifact_path(user_id, project_id, artifact_type, version=new_version)

    def save_json(self, data, path: str) -> StoredArtifactInfo:
        """Delegate to the storage service's ``save_json`` and return its result."""
        return self.storage.save_json(data, path)

    def save_text(self, text: str, path: str) -> StoredArtifactInfo:
        """Delegate to the storage service's ``save_text`` and return its result."""
        return self.storage.save_text(text, path)

    def register_artifact(
        self,
        user_id: str,
        project_id: str,
        artifact_type: str,
        version: int,
        info: StoredArtifactInfo,
        *,
        version_scope: Optional[Literal["raw", "processed", "model", "evaluation", "draft"]] = None,
        extra_metadata: Optional[dict] = None,
        soft_delete: bool = True,
    ) -> dict:
        """Delegate to ``ProjectService.register_artifact``.

        All positional and keyword-only arguments are forwarded unchanged;
        their meaning is defined by ProjectService.
        """
        return self._svc.register_artifact(
            user_id,
            project_id,
            artifact_type,
            version,
            info,
            version_scope=version_scope,
            extra_metadata=extra_metadata,
            soft_delete=soft_delete,
        )
# ----------------- Demo / sanity runner -----------------
def _demo():
    """Walk through a typical adapter session against an in-memory store.

    Creates a project, prints the artifact path before and after a version
    bump, saves a small JSON payload to the bumped path and prints the keys
    held by the mock storage.
    """

    class MockStorage:
        # Minimal attributes used by other code
        client = None

        def __init__(self):
            self._store = {}

        # The ProjectService expects these methods at minimum
        def save_json(self, data, path):
            self._store[path] = json.dumps(data)

        def load_json(self, path):
            return json.loads(self._store[path])

        def list_project_files(self, user_id, project_id):
            prefix = f"{user_id}/{project_id}/"
            return [{"path": key} for key in self._store if key.startswith(prefix)]

    storage = MockStorage()
    adapter = ProjectAdapter(storage)  # type: ignore[arg-type]

    # Step 1: create a project for the demo user.
    project = adapter.create_project('alice', 'demo')
    print('created project:', project.project_id)

    # Step 2: canonical path for the processed CSV before any version bump.
    initial_path = adapter.get_artifact_path('alice', project.project_id, 'processed_csv')
    print('path before bump:', initial_path)

    # Step 3: bump the processed version and obtain the path to write to.
    target_path = adapter.bump_version_and_get_path('alice', project.project_id, 'processed')
    print('writable path after bump:', target_path)

    # Step 4: store a tiny metadata JSON there, then show what the store holds.
    adapter.save_json({'rows': 10}, target_path)
    print('stored keys:', list(storage._store))
# Run the demo only when this file is executed as a script, never on import.
if __name__ == '__main__':
    _demo()