Spaces:
Sleeping
Sleeping
| """ | |
| Persistent storage for workflow executions using JSON files | |
| """ | |
| import json | |
| import os | |
| from typing import Dict, Any, Optional, List | |
| from .schema import WorkflowDefinition | |
| import logging | |
logger = logging.getLogger(__name__)


class WorkflowStore:
    """
    Persistent storage for workflow executions.

    Each execution is stored as one ``<workflow_id>.json`` file directly
    inside ``store_path``.

    Features:
    - Save completed workflow executions as JSON
    - Load workflows by ID
    - List all stored workflows
    - Automatic directory creation

    NOTE(review): workflow IDs are joined into the file path verbatim, so
    an ID containing path separators or ``..`` can address files outside
    ``store_path``. Confirm callers only pass trusted IDs.
    """

    # File extension that marks a stored workflow execution.
    _SUFFIX = ".json"

    def __init__(self, store_path: str = "./workflow_cache"):
        """
        Initialize workflow store.

        Args:
            store_path: Directory path for storing workflows. It is created
                (including missing parents) when it does not exist yet.
        """
        self.store_path = store_path
        os.makedirs(store_path, exist_ok=True)
        logger.info("WorkflowStore initialized at: %s", store_path)

    def _path_for(self, workflow_id: str) -> str:
        """Return the JSON file path used for *workflow_id*."""
        return os.path.join(self.store_path, f"{workflow_id}{self._SUFFIX}")

    def save_workflow(
        self,
        workflow_id: str,
        workflow: "WorkflowDefinition",
        result: Dict[str, Any]
    ) -> None:
        """
        Save completed workflow execution.

        Args:
            workflow_id: Unique identifier for this execution
            workflow: WorkflowDefinition that was executed
            result: Execution result from WorkflowExecutor

        Raises:
            Exception: Any serialization or I/O error is logged and
                re-raised to the caller.
        """
        path = self._path_for(workflow_id)
        data = {
            "workflow_id": workflow_id,
            "workflow": workflow.to_dict(),
            "result": result
        }
        try:
            # Serialize BEFORE opening the file: opening in "w" mode
            # truncates it, so a non-serializable payload would otherwise
            # leave a corrupt file and destroy any earlier good record.
            payload = json.dumps(data, indent=2)
            with open(path, "w", encoding="utf-8") as f:
                f.write(payload)
        except Exception as e:
            logger.error("Failed to save workflow %s: %s", workflow_id, e)
            raise
        logger.info("Workflow saved: %s -> %s", workflow_id, path)

    def load_workflow(self, workflow_id: str) -> Optional[Dict[str, Any]]:
        """
        Load workflow execution by ID.

        Args:
            workflow_id: Workflow execution identifier

        Returns:
            Workflow data dict, or None if the file does not exist, cannot
            be read, or does not contain valid JSON
        """
        path = self._path_for(workflow_id)
        try:
            # Attempt the read directly instead of checking existence
            # first, so a file removed in between cannot slip past us.
            with open(path, "r", encoding="utf-8") as f:
                data = json.load(f)
        except FileNotFoundError:
            logger.warning("Workflow not found: %s", workflow_id)
            return None
        except (OSError, ValueError) as e:
            # ValueError covers malformed JSON and undecodable bytes.
            logger.error("Failed to load workflow %s: %s", workflow_id, e)
            return None
        logger.info("Workflow loaded: %s", workflow_id)
        return data

    def list_workflows(self) -> List[str]:
        """
        List all stored workflow IDs.

        Returns:
            List of workflow IDs (file names ending in ``.json`` with that
            extension removed), or an empty list if the directory cannot
            be read
        """
        try:
            entries = os.listdir(self.store_path)
        except OSError as e:
            logger.error("Failed to list workflows: %s", e)
            return []
        suffix = self._SUFFIX
        # Strip only the trailing extension; an ID may itself contain
        # ".json" somewhere in the middle and must round-trip unchanged.
        workflow_ids = [
            name[:-len(suffix)]
            for name in entries
            if name.endswith(suffix)
        ]
        logger.info("Found %d stored workflows", len(workflow_ids))
        return workflow_ids

    def delete_workflow(self, workflow_id: str) -> bool:
        """
        Delete stored workflow by ID.

        Args:
            workflow_id: Workflow execution identifier

        Returns:
            True if deleted, False if not found or the removal failed
        """
        path = self._path_for(workflow_id)
        try:
            os.remove(path)
        except FileNotFoundError:
            logger.warning(
                "Cannot delete - workflow not found: %s", workflow_id
            )
            return False
        except (OSError, ValueError) as e:
            logger.error("Failed to delete workflow %s: %s", workflow_id, e)
            return False
        logger.info("Workflow deleted: %s", workflow_id)
        return True