Spaces:
Running on Zero
Running on Zero
| from __future__ import annotations | |
| from typing import Any, TypedDict | |
| try: | |
| from .ckan_agent import AgentResult, build_retrieval_openui, run_ckan_agent | |
| from .ckan_support import DEFAULT_CKAN_ENDPOINT | |
| from .openui_support import _json_arg | |
| except ImportError: | |
| from ckan_agent import AgentResult, build_retrieval_openui, run_ckan_agent | |
| from ckan_support import DEFAULT_CKAN_ENDPOINT | |
| from openui_support import _json_arg | |
| class AgentWorkflowState(TypedDict, total=False): | |
| prompt: str | |
| ckan_endpoint: str | |
| dataset_path: str | |
| retrieval_result: dict[str, Any] | |
| openui_lang: str | |
| steps: list[dict[str, str]] | |
| intent: dict[str, str] | |
| def run_agent_workflow(prompt: str, ckan_endpoint: str | None = None, dataset_path: str | None = None) -> AgentWorkflowState: | |
| if dataset_path: | |
| openui_lang = _demo_dataset_openui(prompt) | |
| return { | |
| "prompt": prompt, | |
| "ckan_endpoint": ckan_endpoint or DEFAULT_CKAN_ENDPOINT, | |
| "dataset_path": dataset_path, | |
| "intent": {"task_type": "analysis"}, | |
| "steps": [{"node": "demo_dataset", "title": "analysis", "detail": "Rendered demo dataset placeholder through the compatibility workflow."}], | |
| "openui_lang": openui_lang, | |
| } | |
| result = run_ckan_agent(prompt, ckan_endpoint) | |
| return { | |
| "prompt": prompt, | |
| "ckan_endpoint": ckan_endpoint or DEFAULT_CKAN_ENDPOINT, | |
| "intent": {"task_type": "dataset_retrieval"}, | |
| "steps": [{"node": event.type, "title": event.type, "detail": event.detail} for event in result.events], | |
| "retrieval_result": _agent_result_dict(result), | |
| "openui_lang": build_retrieval_openui(result), | |
| } | |
| def _agent_result_dict(result: AgentResult) -> dict[str, Any]: | |
| return { | |
| "status": result.status, | |
| "endpoint": result.endpoint, | |
| "packages": result.packages, | |
| "resources": [resource.__dict__ for resource in result.resources], | |
| "selected": result.selected_resource.__dict__ if result.selected_resource else None, | |
| "confidence": result.confidence, | |
| "events": [event.__dict__ for event in result.events], | |
| } | |
| def _demo_dataset_openui(prompt: str) -> str: | |
| return "\n".join( | |
| [ | |
| "root = Card([header, callout, followups])", | |
| 'header = CardHeader("Dataset analysis", "Demo dataset path")', | |
| f'callout = Callout("info", "Next step", {_json_arg(f"Retrieval is now handled by the clean CKAN agent. Dataset analysis/OpenUI detail comes next. Request: {prompt}")})', | |
| 'f1 = FollowUpItem("Find CKAN population datasets")', | |
| 'f2 = FollowUpItem("Find CKAN bike counter datasets")', | |
| "followups = FollowUpBlock([f1, f2])", | |
| ] | |
| ) | |