""" Persistent storage for workflow executions using JSON files """ import json import os from typing import Dict, Any, Optional, List from .schema import WorkflowDefinition import logging logger = logging.getLogger(__name__) class WorkflowStore: """ Persistent storage for workflow executions. Features: - Save completed workflow executions as JSON - Load workflows by ID - List all stored workflows - Automatic directory creation """ def __init__(self, store_path: str = "./workflow_cache"): """ Initialize workflow store. Args: store_path: Directory path for storing workflows """ self.store_path = store_path os.makedirs(store_path, exist_ok=True) logger.info(f"WorkflowStore initialized at: {store_path}") def save_workflow( self, workflow_id: str, workflow: WorkflowDefinition, result: Dict[str, Any] ) -> None: """ Save completed workflow execution. Args: workflow_id: Unique identifier for this execution workflow: WorkflowDefinition that was executed result: Execution result from WorkflowExecutor """ path = os.path.join(self.store_path, f"{workflow_id}.json") data = { "workflow_id": workflow_id, "workflow": workflow.to_dict(), "result": result } try: with open(path, "w", encoding="utf-8") as f: json.dump(data, f, indent=2) logger.info(f"Workflow saved: {workflow_id} -> {path}") except Exception as e: logger.error(f"Failed to save workflow {workflow_id}: {e}") raise def load_workflow(self, workflow_id: str) -> Optional[Dict[str, Any]]: """ Load workflow execution by ID. Args: workflow_id: Workflow execution identifier Returns: Workflow data dict or None if not found """ path = os.path.join(self.store_path, f"{workflow_id}.json") if not os.path.exists(path): logger.warning(f"Workflow not found: {workflow_id}") return None try: with open(path, "r", encoding="utf-8") as f: data = json.load(f) logger.info(f"Workflow loaded: {workflow_id}") return data except Exception as e: logger.error(f"Failed to load workflow {workflow_id}: {e}") return None def list_workflows(self) -> List[str]: """ List all stored workflow IDs. Returns: List of workflow IDs """ try: files = os.listdir(self.store_path) workflow_ids = [ f.replace(".json", "") for f in files if f.endswith(".json") ] logger.info(f"Found {len(workflow_ids)} stored workflows") return workflow_ids except Exception as e: logger.error(f"Failed to list workflows: {e}") return [] def delete_workflow(self, workflow_id: str) -> bool: """ Delete stored workflow by ID. Args: workflow_id: Workflow execution identifier Returns: True if deleted, False if not found """ path = os.path.join(self.store_path, f"{workflow_id}.json") if not os.path.exists(path): logger.warning(f"Cannot delete - workflow not found: {workflow_id}") return False try: os.remove(path) logger.info(f"Workflow deleted: {workflow_id}") return True except Exception as e: logger.error(f"Failed to delete workflow {workflow_id}: {e}") return False