Spaces:
Running
Running
| import logging | |
| import os | |
| from pathlib import Path | |
| from typing import Any, Dict, List, Optional | |
| import pandas as pd | |
| import httpx | |
| from agno.tools import Toolkit | |
| from agno.utils.log import logger as agno_logger | |
| try: | |
| from backend.ml_module.services.storage_service import MLStorageService | |
| from backend.ml_module.core.constants import StoragePaths | |
| except ImportError: | |
| from ml_module.services.storage_service import MLStorageService | |
| from ml_module.core.constants import StoragePaths | |
| from agno.run import RunContext | |
| logger = logging.getLogger(__name__) | |
| DATA_SOURCES_API_BASE_URL = os.environ.get("DATA_SOURCES_API_BASE_URL", "http://127.0.0.1:8000") | |
| DEFAULT_REQUEST_TIMEOUT = float(os.environ.get("TENANT_FILES_TOOL_TIMEOUT", "120")) | |
| class TenantFileToolkit(Toolkit): | |
| """ | |
| Toolkit for listing, reading, and writing files stored in MinIO, | |
| scoped securely to a tenant. | |
| This toolkit relies on `session_state` to inject the `tenant_id` to ensure | |
| that an agent cannot access files belonging to another tenant. | |
| """ | |
def __init__(self, storage_service: Optional[MLStorageService] = None):
    """Create the toolkit and register its tenant-file tools.

    Args:
        storage_service: Storage backend to use. When it is omitted (or
            falsy) a default ``MLStorageService()`` is constructed.
    """
    super().__init__(name="tenant_file_toolkit")
    self.storage = storage_service if storage_service else MLStorageService()
    self.api_base_url = DATA_SOURCES_API_BASE_URL
    # Tools are registered in this fixed order.
    for tool in (
        self.list_tenant_assets_structured,
        self.list_tenant_assets,
        self.load_tenant_file_to_dataframe,
        self.stage_tenant_asset_for_ml,
    ):
        self.register(tool)
def _get_tenant_id(self, run_context: Optional["RunContext"] = None) -> str:
    """Return the tenant ID stored in the agent's run session.

    Args:
        run_context: Run context whose ``session_state`` may hold a
            ``tenant_id`` entry.

    Returns:
        The tenant ID as a stripped string, or ``"unknown_tenant"`` when
        there is no run context, no session state, no ``tenant_id`` key, or
        the stored value is ``None``/blank.
    """
    state = run_context.session_state if run_context else None
    if state and "tenant_id" in state:
        tenant_id = state["tenant_id"]
        # A None or blank value would otherwise be formatted into storage
        # prefixes such as "None/" or "/", so it is treated as missing.
        if tenant_id is not None:
            normalized = str(tenant_id).strip()
            if normalized:
                return normalized
    logger.warning("No tenant_id found in Agent run_context or session_state. Falling back to default 'unknown_tenant'.")
    return "unknown_tenant"
def _get_session_state(self, run_context: Optional["RunContext"] = None) -> Dict[str, str]:
    """Return the run context's session state, or an empty dict when absent.

    Args:
        run_context: Run context to read ``session_state`` from.

    Returns:
        The ``session_state`` object itself when it is truthy; otherwise a
        new empty dict (also when ``run_context`` is missing).
    """
    if not run_context:
        return {}
    return run_context.session_state or {}
def _fetch_assets_from_api(
    self,
    tenant_id: str,
    jwt_token: str,
    page_size: int = 200,
    max_pages: int = 5,
) -> Dict[str, Any]:
    """Fetch the tenant's asset catalog from the tenant-files API, grouped by kind.

    Pages through ``/api/v1/tenant-files/assets`` and sorts every returned
    item into a bucket according to its ``file_type``.

    Args:
        tenant_id: Tenant identifier; used to build each record's ``path``
            (``{tenant_id}/files/{filename}``) and in error messages.
        jwt_token: Bearer token sent in the ``Authorization`` header.
        page_size: Number of items requested per page.
        max_pages: Upper bound on the number of pages fetched.

    Returns:
        On success, a dict with ``search_prefix`` plus the lists ``datasets``,
        ``models``, ``reports`` and ``other``. Each record has the keys
        ``path``, ``asset_id``, ``filename``, ``file_type``, ``size_mb`` and
        ``last_modified``. Request and parsing failures are returned as
        ``{"error": "<message>"}`` instead of being raised.
    """
    headers = {"Authorization": f"Bearer {jwt_token}"}
    timeout = httpx.Timeout(DEFAULT_REQUEST_TIMEOUT)

    datasets: List[Dict[str, Any]] = []
    models: List[Dict[str, Any]] = []
    reports: List[Dict[str, Any]] = []
    other: List[Dict[str, Any]] = []

    try:
        with httpx.Client(base_url=self.api_base_url, timeout=timeout, follow_redirects=True) as client:
            page = 1
            total_pages = 1
            while page <= total_pages and page <= max_pages:
                try:
                    resp = client.get(
                        "/api/v1/tenant-files/assets",
                        headers=headers,
                        params={"page": page, "page_size": page_size},
                    )
                except httpx.TimeoutException as exc:
                    return {
                        "error": (
                            f"Tenant files API timed out for tenant={tenant_id} "
                            f"base_url={self.api_base_url} timeout={DEFAULT_REQUEST_TIMEOUT}s error={exc}"
                        )
                    }
                except httpx.HTTPError as exc:
                    return {
                        "error": (
                            f"Tenant files API HTTP error for tenant={tenant_id} "
                            f"base_url={self.api_base_url} error={exc}"
                        )
                    }

                if resp.status_code != 200:
                    return {
                        "error": f"Tenant files API request failed ({resp.status_code}): {resp.text[:300]}"
                    }

                payload = resp.json()
                total_pages = int(payload.get("total_pages", 1) or 1)

                # ``or`` fallbacks cover fields that are present but null, which
                # would otherwise abort the whole listing (float(None)) or
                # yield a "None" filename.
                for item in payload.get("items") or []:
                    filename = item.get("filename") or "unknown"
                    file_type = (item.get("file_type") or "").lower()
                    size_bytes = float(item.get("size_bytes") or 0)
                    record = {
                        "path": f"{tenant_id}/files/{filename}",
                        "asset_id": item.get("asset_id", ""),
                        "filename": filename,
                        "file_type": file_type,
                        "size_mb": round(size_bytes / (1024 * 1024), 2),
                        "last_modified": item.get("created_at", ""),
                    }
                    if file_type in {"csv", "xlsx", "xls", "parquet"}:
                        datasets.append(record)
                    elif file_type in {"joblib", "pkl", "onnx"}:
                        models.append(record)
                    elif file_type in {"json", "md", "txt", "html"}:
                        reports.append(record)
                    else:
                        other.append(record)
                page += 1

            if total_pages > max_pages:
                # The catalog is cut off at max_pages; make that visible in logs.
                logger.warning(
                    "Tenant files listing truncated for tenant=%s: fetched %s of %s pages",
                    tenant_id,
                    max_pages,
                    total_pages,
                )
    except Exception as exc:
        return {
            "error": (
                f"Unexpected tenant files API error for tenant={tenant_id} "
                f"base_url={self.api_base_url} error={exc}"
            )
        }

    return {
        "search_prefix": f"{tenant_id}/",
        "datasets": datasets,
        "models": models,
        "reports": reports,
        "other": other,
    }
def _fetch_asset_preview_from_api(
    self,
    asset_id: str,
    jwt_token: str,
    page_size: int = 200,
    sheet_name: Optional[str] = None,
) -> Dict[str, Any]:
    """Fetch the first preview page of a tenant asset from the tenant-files API.

    Args:
        asset_id: Identifier placed in the preview URL.
        jwt_token: Bearer token sent in the ``Authorization`` header.
        page_size: Requested rows per page; clamped to the range 1..500.
        sheet_name: Optional sheet selector, forwarded as a query parameter
            only when truthy.

    Returns:
        The decoded JSON object on success. Non-200 responses, timeouts,
        other httpx errors, undecodable bodies and non-object payloads are
        all reported as ``{"error": "<message>"}``.
    """
    headers = {"Authorization": f"Bearer {jwt_token}"}
    timeout = httpx.Timeout(DEFAULT_REQUEST_TIMEOUT)
    params: Dict[str, Any] = {"page": 1, "page_size": max(1, min(page_size, 500))}
    if sheet_name:
        params["sheet_name"] = sheet_name

    try:
        with httpx.Client(base_url=self.api_base_url, timeout=timeout, follow_redirects=True) as client:
            resp = client.get(
                f"/api/v1/tenant-files/assets/{asset_id}/preview",
                headers=headers,
                params=params,
            )
            if resp.status_code != 200:
                return {
                    "error": f"Tenant files preview failed ({resp.status_code}): {resp.text[:300]}"
                }
            try:
                payload = resp.json()
            except ValueError as exc:
                # A 200 response with a non-JSON body must not escape as an
                # exception; callers only look for the "error" key.
                return {
                    "error": (
                        f"Tenant files preview returned invalid JSON asset_id={asset_id} "
                        f"base_url={self.api_base_url} error={exc}"
                    )
                }
            if not isinstance(payload, dict):
                # Callers use payload.get(...), so anything but an object is unusable.
                return {
                    "error": (
                        f"Tenant files preview returned unexpected payload asset_id={asset_id} "
                        f"type={type(payload).__name__}"
                    )
                }
            return payload
    except httpx.TimeoutException as exc:
        return {
            "error": (
                f"Tenant files preview timed out asset_id={asset_id} "
                f"base_url={self.api_base_url} timeout={DEFAULT_REQUEST_TIMEOUT}s error={exc}"
            )
        }
    except httpx.HTTPError as exc:
        return {
            "error": (
                f"Tenant files preview HTTP error asset_id={asset_id} "
                f"base_url={self.api_base_url} error={exc}"
            )
        }
def _resolve_asset_from_path(
    self,
    file_path: str,
    tenant_id: str,
    jwt_token: str,
    run_context: Optional["RunContext"] = None,
) -> Optional[Dict[str, Any]]:
    """Find the catalog record that corresponds to a path or filename.

    The catalog comes from ``list_tenant_assets_structured``. Matching is
    case-insensitive and runs in two passes so that an exact match always
    wins over a looser one:

    1. ``file_path`` equals a record's full ``path`` or its ``filename``.
    2. The last ``/``-separated segment of ``file_path`` equals a record's
       ``filename`` or the last segment of its ``path``.

    Args:
        file_path: Path or filename supplied by the caller.
        tenant_id: Not used by the lookup itself; kept for the call signature.
        jwt_token: Not used by the lookup itself; kept for the call signature.
        run_context: Forwarded to ``list_tenant_assets_structured``.

    Returns:
        The first matching record, or ``None`` when the catalog is not a
        dict or nothing matches.
    """
    catalog = self.list_tenant_assets_structured(prefix="", run_context=run_context)
    if not isinstance(catalog, dict):
        return None

    candidates: List[Dict[str, Any]] = [
        item
        for group in ("datasets", "models", "reports", "other")
        for item in (catalog.get(group) or [])
    ]

    def _keys(item: Dict[str, Any]):
        """Return the record's normalized (path, filename) pair."""
        return (
            str(item.get("path", "")).strip().lower(),
            str(item.get("filename", "")).strip().lower(),
        )

    normalized = file_path.strip().lower()
    basename = file_path.split("/")[-1].strip().lower()

    # Pass 1: exact path / filename match across the whole catalog.
    if normalized:
        for item in candidates:
            if normalized in _keys(item):
                return item

    # Pass 2: fall back to comparing only the trailing path segment.
    if basename:
        for item in candidates:
            item_path, item_name = _keys(item)
            if basename == item_name or basename == item_path.split("/")[-1]:
                return item
    return None
def _sanitize_local_filename(self, filename: str) -> str:
    """Reduce *filename* to its final path component for local use.

    Args:
        filename: File name, possibly with leading directory components.

    Returns:
        The last path component with surrounding whitespace removed.

    Raises:
        ValueError: If the result is empty, ``"."`` or ``".."``.
    """
    leaf = Path(filename).name.strip()
    if leaf in {"", ".", ".."}:
        raise ValueError("Invalid filename")
    return leaf
def _resolve_workspace_dir(self, run_context: Optional["RunContext"] = None) -> Path:
    """Return the session's workspace directory, creating it if needed.

    Args:
        run_context: Run context whose session state holds ``workspace``.

    Returns:
        The resolved workspace path; the directory (and any missing
        parents) exists when this returns.

    Raises:
        ValueError: If the session state has no non-blank ``workspace``.
    """
    configured = self._get_session_state(run_context).get("workspace")
    location = str(configured or "").strip()
    if location == "":
        raise ValueError("Missing workspace in run context")

    directory = Path(location).resolve()
    directory.mkdir(parents=True, exist_ok=True)
    return directory
def _download_all_rows_from_api(
    self,
    asset_id: str,
    jwt_token: str,
) -> pd.DataFrame:
    """Download all rows for a tenant asset by paginating the preview API.

    User-uploaded files live in tenant-files (not ml-projects), so direct
    MinIO reads fail with NoSuchKey. The preview endpoint is the correct
    access path.

    Pages of 500 rows are requested until a page comes back with fewer rows
    than requested.

    Args:
        asset_id: Identifier placed in the preview URL.
        jwt_token: Bearer token sent in the ``Authorization`` header.

    Returns:
        A DataFrame built from the collected ``rows``; when there are no
        rows, an empty DataFrame using the first non-empty ``columns`` value
        seen in a response.

    Raises:
        RuntimeError: On a non-200 response, a payload that is not a JSON
            object, or when pagination does not finish within the page cap.
    """
    PAGE_SIZE = 500
    # Safety cap: without it, an endpoint that keeps answering with full
    # pages (e.g. one that ignores ``page``) would loop forever.
    MAX_PAGES = 10_000
    headers = {"Authorization": f"Bearer {jwt_token}"}
    timeout = httpx.Timeout(DEFAULT_REQUEST_TIMEOUT)
    all_rows: list = []
    columns: list = []

    with httpx.Client(base_url=self.api_base_url, timeout=timeout, follow_redirects=True) as client:
        for page in range(1, MAX_PAGES + 1):
            params: Dict[str, Any] = {"page": page, "page_size": PAGE_SIZE}
            resp = client.get(
                f"/api/v1/tenant-files/assets/{asset_id}/preview",
                headers=headers,
                params=params,
            )
            if resp.status_code != 200:
                raise RuntimeError(
                    f"Preview API failed ({resp.status_code}): {resp.text[:300]}"
                )
            payload = resp.json()
            if not isinstance(payload, dict):
                raise RuntimeError(
                    f"Preview API returned unexpected payload type: {type(payload).__name__}"
                )
            rows = payload.get("rows") or []
            if not columns:
                columns = payload.get("columns") or []
            all_rows.extend(rows)
            # A short (or empty) page marks the end of the data.
            # NOTE(review): assumes the API honors page_size=500 — confirm.
            if len(rows) < PAGE_SIZE:
                break
        else:
            raise RuntimeError(
                f"Preview API pagination did not finish within {MAX_PAGES} pages "
                f"for asset_id={asset_id}"
            )

    return pd.DataFrame(all_rows) if all_rows else pd.DataFrame(columns=columns)
def list_tenant_assets(
    self,
    prefix: str = "",
    run_context: Optional["RunContext"] = None
) -> str:
    """
    Lists available CSV files, datasets, and reports for the tenant in the file storage cluster (MinIO).
    This tool should be used first to explore what data files are available before querying them.

    Args:
        prefix (str, optional): A specific folder prefix to list. If empty, it lists the root of the tenant's workspace.
        run_context: Agno RunContext (auto-injected).

    Returns:
        str: A formatted markdown representation of the available files and their sizes.
    """
    catalog = self.list_tenant_assets_structured(prefix=prefix, run_context=run_context)

    # The structured call reports some failures as a plain message string.
    if isinstance(catalog, str):
        return catalog

    failure = catalog.get("error")
    if failure:
        return (
            "Unable to reliably list tenant assets right now. "
            f"Tenant-files API error: {failure}"
        )

    search_prefix = catalog.get("search_prefix", "")

    # (catalog key, markdown heading) in display order; only the first
    # heading has no leading blank line.
    sections = (
        ("datasets", "### Datasets (CSV/Excel/Parquet)"),
        ("models", "\n### Models (.joblib/.pkl/.onnx)"),
        ("reports", "\n### Reports (.json/.md/.txt/.html)"),
        ("other", "\n### Other Artifacts"),
    )

    lines = [f"## Assets for Tenant Workspace (`{search_prefix}`)"]
    for key, heading in sections:
        entries = catalog.get(key)
        if not entries:
            continue
        lines.append(heading)
        for entry in entries:
            lines.append(
                f"- `{entry['path']}` ({entry['size_mb']} MB) [Last Modified: {entry['last_modified']}]"
            )

    # Only the title line present means no group had any entries.
    if len(lines) == 1:
        return f"No assets found for prefix: `{search_prefix}`."
    return "\n".join(lines)
def list_tenant_assets_structured(
    self,
    prefix: str = "",
    run_context: Optional["RunContext"] = None,
) -> Dict[str, Any]:
    """Return machine-friendly grouped asset catalog for a tenant.

    The tenant-files API is tried first when the session carries a
    ``supabase_jwt``; if that is missing or the API reports an error, the
    storage bucket is listed directly.

    Args:
        prefix (str, optional): Folder prefix inside the tenant workspace. Leading
            slashes are ignored. Only assets whose path starts with
            ``{tenant_id}/{prefix}`` are returned.
        run_context: Agno RunContext (auto-injected).

    Returns:
        A dict with ``search_prefix`` and the lists ``datasets``, ``models``,
        ``reports`` and ``other``; an ``error`` key is added when the API
        failed and the fallback found nothing. Note that two failure paths
        return a plain message string instead of a dict (storage client
        unavailable with no API error, and a storage listing exception), so
        callers must check the type.
    """
    groups = ("datasets", "models", "reports", "other")
    session_state = self._get_session_state(run_context)
    tenant_id = self._get_tenant_id(run_context)
    jwt_token = (session_state.get("supabase_jwt") or "").strip()

    search_prefix = f"{tenant_id}/"
    if prefix:
        search_prefix = f"{tenant_id}/{prefix.lstrip('/')}"

    api_error: Optional[str] = None
    # Primary path: use tenant-files API so agent sees the same assets as UI.
    if jwt_token:
        api_result = self._fetch_assets_from_api(tenant_id=tenant_id, jwt_token=jwt_token)
        if "error" not in api_result:
            if not prefix:
                return api_result
            # The API call is not prefix-aware, so apply the requested
            # prefix here; a copy keeps the API result itself untouched.
            scoped = dict(api_result)
            for group in groups:
                scoped[group] = [
                    item
                    for item in (api_result.get(group) or [])
                    if str(item.get("path", "")).startswith(search_prefix)
                ]
            scoped["search_prefix"] = search_prefix
            return scoped
        api_error = str(api_result["error"])
        logger.warning(api_error)

    if not self.storage.client:
        if api_error:
            return {
                "search_prefix": search_prefix,
                "datasets": [],
                "models": [],
                "reports": [],
                "other": [],
                "error": api_error,
            }
        return "Storage client is unavailable."

    try:
        objects = self.storage.client.list_objects(
            self.storage.bucket_name,
            prefix=search_prefix,
            recursive=True,
        )
        datasets: List[Dict[str, Any]] = []
        models: List[Dict[str, Any]] = []
        reports: List[Dict[str, Any]] = []
        other: List[Dict[str, Any]] = []
        for obj in objects:
            path = obj.object_name
            modified = obj.last_modified
            # size / last_modified may be unset on some listing entries; a
            # single such entry must not abort the whole listing.
            file_info = {
                "path": path,
                "size_mb": round((obj.size or 0) / (1024 * 1024), 2),
                "last_modified": modified.strftime('%Y-%m-%d %H:%M:%S') if modified else "",
            }
            lower_path = path.lower()
            if lower_path.endswith((".csv", ".xlsx", ".xls", ".parquet")):
                datasets.append(file_info)
            elif lower_path.endswith((".joblib", ".pkl", ".onnx")):
                models.append(file_info)
            elif lower_path.endswith((".json", ".md", ".txt", ".html")):
                reports.append(file_info)
            else:
                other.append(file_info)

        result = {
            "search_prefix": search_prefix,
            "datasets": datasets,
            "models": models,
            "reports": reports,
            "other": other,
        }
        # Surface the API failure only when the fallback found nothing.
        if api_error and not any([datasets, models, reports, other]):
            result["error"] = api_error
        return result
    except Exception as e:
        error_msg = f"Failed to list tenant assets: {str(e)}"
        logger.error(error_msg)
        return error_msg
def load_tenant_file_to_dataframe(
    self,
    file_path: str,
    chunksize: Optional[int] = 10000,
    run_context: Optional["RunContext"] = None
) -> str:
    """
    Reads a tenant dataset (CSV/XLSX/XLS/Parquet) into memory safely.
    Prefers tenant-files API preview so agent sees exactly what UI uploaded assets expose.

    Args:
        file_path (str): The full path to the file in MinIO (e.g., 'tenant_123/files/my_data.csv').
        chunksize (int, optional): The number of rows to load at a time to prevent memory overflow. Defaults to 10000.
        run_context: Agno RunContext (auto-injected).

    Returns:
        str: A summary of the loaded DataFrame (columns, memory usage, head of the data), or an error message.
    """
    tenant_id = self._get_tenant_id(run_context)
    session_state = self._get_session_state(run_context)
    jwt_token = (session_state.get("supabase_jwt") or "").strip()

    logger.info("Loading tenant file as DataFrame: %s", file_path)

    # Primary path: resolve asset from tenant-files API and preview it.
    # This keeps behavior aligned with UI uploads and supports XLSX correctly.
    if jwt_token:
        resolved_asset = self._resolve_asset_from_path(
            file_path=file_path,
            tenant_id=tenant_id,
            jwt_token=jwt_token,
            run_context=run_context,
        )
        asset_id = resolved_asset.get("asset_id") if resolved_asset else None
        # Catalog records produced by the storage-listing fallback carry no
        # asset_id; previewing them would request ".../assets/None/preview",
        # so those go through the direct read below instead.
        if asset_id:
            preview_payload = self._fetch_asset_preview_from_api(
                asset_id=asset_id,
                jwt_token=jwt_token,
                page_size=(chunksize or 100),
            )
            if "error" in preview_payload:
                logger.error(preview_payload["error"])
                return f"Failed to preview file `{file_path}` via tenant-files API: {preview_payload['error']}"

            rows = preview_payload.get("rows", []) or []
            columns = preview_payload.get("columns", []) or []
            df = pd.DataFrame(rows) if rows else pd.DataFrame(columns=columns)
            df_info = self._get_dataframe_summary(df, is_chunk=True)
            return (
                f"Successfully previewed tenant asset `{resolved_asset.get('filename', file_path)}` "
                f"(asset_id: `{asset_id}`):\n{df_info}"
            )

    # Fallback path: direct object read (legacy behavior)
    # Keep strict tenant-prefix check here for safety; ".." segments are
    # rejected too so a path cannot start in scope and then climb out of it.
    in_scope = file_path.startswith(f"{tenant_id}/") and ".." not in file_path.split("/")
    if not in_scope:
        return (
            f"Access Denied: file path `{file_path}` is not in tenant scope `{tenant_id}/...`. "
            f"Try passing the exact filename from list_tenant_assets output."
        )

    try:
        loaded = self.storage.load_dataframe(file_path, chunksize=chunksize)
        # The storage layer may hand back a whole DataFrame even when a
        # chunksize was requested; only iterate when it is not one.
        if chunksize and not isinstance(loaded, pd.DataFrame):
            first_chunk = next(iter(loaded), None)
            if first_chunk is None:
                raise ValueError("file contains no rows")
            df_info = self._get_dataframe_summary(first_chunk, is_chunk=True)
            return f"Successfully read FIRST CHUNK of file `{file_path}`:\n{df_info}"

        df_info = self._get_dataframe_summary(loaded)
        return f"Successfully loaded file `{file_path}`:\n{df_info}"
    except Exception as e:
        error_msg = f"Failed to load file `{file_path}`: {str(e)}"
        logger.error(error_msg)
        return error_msg
def stage_tenant_asset_for_ml(
    self,
    file_path: str,
    asset_id: Optional[str] = None,
    version: int = 1,
    run_context: Optional["RunContext"] = None,
) -> str:
    """
    Resolves a tenant asset and materializes it into the ML workspace so downstream
    ML tools can load it without relying on arbitrary filesystem discovery.

    Args:
        file_path: Filename or tenant-scoped path from list_tenant_assets.
        asset_id: Optional explicit asset identifier.
        version: Asset version to stage (accepted for compatibility; not used by the current implementation).
        run_context: Agno RunContext (auto-injected).

    Returns:
        str: Success message with the staged local path, or a failure message.
    """
    tenant_id = self._get_tenant_id(run_context)
    session_state = self._get_session_state(run_context)
    jwt_token = (session_state.get("supabase_jwt") or "").strip()
    if not jwt_token:
        return "Failed to stage tenant asset: missing authenticated tenant session."

    # Normalize once so stray whitespace cannot defeat the catalog lookup.
    wanted_id = str(asset_id).strip() if asset_id else ""

    resolved_asset: Optional[Dict[str, Any]] = None
    if wanted_id:
        catalog = self.list_tenant_assets_structured(prefix="", run_context=run_context)
        if isinstance(catalog, dict):
            resolved_asset = next(
                (
                    item
                    for group in ("datasets", "models", "reports", "other")
                    for item in (catalog.get(group) or [])
                    if str(item.get("asset_id", "")).strip() == wanted_id
                ),
                None,
            )

    if resolved_asset is None:
        resolved_asset = self._resolve_asset_from_path(
            file_path=file_path,
            tenant_id=tenant_id,
            jwt_token=jwt_token,
            run_context=run_context,
        )

    if resolved_asset is None:
        return (
            f"Failed to stage tenant asset `{file_path}`. "
            "Call list_tenant_assets first and pass the exact filename or asset_id."
        )

    resolved_asset_id = str(resolved_asset.get("asset_id") or wanted_id or "").strip()
    try:
        filename = self._sanitize_local_filename(str(resolved_asset.get("filename") or file_path))
    except ValueError as exc:
        # An unusable name is reported like every other staging failure
        # instead of escaping the tool as an exception.
        return f"Failed to stage tenant asset `{file_path}`: {exc}"
    if not resolved_asset_id:
        return f"Failed to stage tenant asset `{filename}`: missing asset_id metadata."

    try:
        # Download via the tenant-files preview API (paginated) — user files
        # live in the tenant-files service, NOT in the ml-projects MinIO bucket.
        df = self._download_all_rows_from_api(
            asset_id=resolved_asset_id,
            jwt_token=jwt_token,
        )

        workspace = self._resolve_workspace_dir(run_context)
        processed_dir = workspace / "processed"
        processed_dir.mkdir(parents=True, exist_ok=True)
        local_path = processed_dir / filename

        suffix = local_path.suffix.lower()
        if suffix == ".csv":
            df.to_csv(local_path, index=False)
        elif suffix in {".xlsx", ".xls"}:
            if suffix == ".xls":
                # pandas 2.x has no writer for the legacy .xls format, so the
                # staged copy is written as .xlsx.
                local_path = local_path.with_suffix(".xlsx")
            df.to_excel(local_path, index=False)
        elif suffix == ".parquet":
            df.to_parquet(local_path, index=False)
        else:
            # Anything else is staged as CSV under the same base name.
            local_path = local_path.with_suffix(".csv")
            df.to_csv(local_path, index=False)

        return (
            f"Successfully staged tenant asset `{filename}` to `{local_path.as_posix()}` "
            f"for ML use. Rows: {len(df)}, Columns: {len(df.columns)}, asset_id: `{resolved_asset_id}`."
        )
    except Exception as exc:
        logger.error("Failed to stage tenant asset %s for tenant %s: %s", filename, tenant_id, exc)
        return f"Failed to stage tenant asset `{filename}`: {exc}"
def _get_dataframe_summary(self, df: pd.DataFrame, is_chunk: bool = False) -> str:
    """Build a markdown summary of a pandas DataFrame.

    The summary contains the row count (tagged ``(in chunk)`` when
    ``is_chunk`` is set), the column count, the text of ``df.info()`` and the
    first five rows.

    Args:
        df: Frame to describe.
        is_chunk: Whether ``df`` is only a chunk of a larger dataset.

    Returns:
        The summary as a single newline-joined string.
    """
    import io

    info_buffer = io.StringIO()
    df.info(buf=info_buffer)

    preview = df.head(5)
    try:
        sample = preview.to_markdown()
    except ImportError:
        # Markdown rendering unavailable: fall back to a plain-text table.
        sample = f"```text\n{preview.to_string(index=False)}\n```"

    row_note = " (in chunk)" if is_chunk else ""
    return "\n".join(
        (
            f"**Rows:** {len(df)}{row_note}",
            f"**Columns:** {len(df.columns)}",
            "\n**Data Info:**",
            f"```text\n{info_buffer.getvalue()}\n```",
            "\n**Sample Data (Head):**",
            sample,
        )
    )