chmielvu's picture
feat: add production refinements (Phase 1-3)
4454066 verified
"""
Persistent storage for workflow executions using JSON files
"""
import json
import os
from typing import Dict, Any, Optional, List
from .schema import WorkflowDefinition
import logging
logger = logging.getLogger(__name__)
class WorkflowStore:
"""
Persistent storage for workflow executions.
Features:
- Save completed workflow executions as JSON
- Load workflows by ID
- List all stored workflows
- Automatic directory creation
"""
def __init__(self, store_path: str = "./workflow_cache"):
"""
Initialize workflow store.
Args:
store_path: Directory path for storing workflows
"""
self.store_path = store_path
os.makedirs(store_path, exist_ok=True)
logger.info(f"WorkflowStore initialized at: {store_path}")
def save_workflow(
self,
workflow_id: str,
workflow: WorkflowDefinition,
result: Dict[str, Any]
) -> None:
"""
Save completed workflow execution.
Args:
workflow_id: Unique identifier for this execution
workflow: WorkflowDefinition that was executed
result: Execution result from WorkflowExecutor
"""
path = os.path.join(self.store_path, f"{workflow_id}.json")
data = {
"workflow_id": workflow_id,
"workflow": workflow.to_dict(),
"result": result
}
try:
with open(path, "w", encoding="utf-8") as f:
json.dump(data, f, indent=2)
logger.info(f"Workflow saved: {workflow_id} -> {path}")
except Exception as e:
logger.error(f"Failed to save workflow {workflow_id}: {e}")
raise
def load_workflow(self, workflow_id: str) -> Optional[Dict[str, Any]]:
"""
Load workflow execution by ID.
Args:
workflow_id: Workflow execution identifier
Returns:
Workflow data dict or None if not found
"""
path = os.path.join(self.store_path, f"{workflow_id}.json")
if not os.path.exists(path):
logger.warning(f"Workflow not found: {workflow_id}")
return None
try:
with open(path, "r", encoding="utf-8") as f:
data = json.load(f)
logger.info(f"Workflow loaded: {workflow_id}")
return data
except Exception as e:
logger.error(f"Failed to load workflow {workflow_id}: {e}")
return None
def list_workflows(self) -> List[str]:
"""
List all stored workflow IDs.
Returns:
List of workflow IDs
"""
try:
files = os.listdir(self.store_path)
workflow_ids = [
f.replace(".json", "")
for f in files
if f.endswith(".json")
]
logger.info(f"Found {len(workflow_ids)} stored workflows")
return workflow_ids
except Exception as e:
logger.error(f"Failed to list workflows: {e}")
return []
def delete_workflow(self, workflow_id: str) -> bool:
"""
Delete stored workflow by ID.
Args:
workflow_id: Workflow execution identifier
Returns:
True if deleted, False if not found
"""
path = os.path.join(self.store_path, f"{workflow_id}.json")
if not os.path.exists(path):
logger.warning(f"Cannot delete - workflow not found: {workflow_id}")
return False
try:
os.remove(path)
logger.info(f"Workflow deleted: {workflow_id}")
return True
except Exception as e:
logger.error(f"Failed to delete workflow {workflow_id}: {e}")
return False