from typing import Dict, List, Any, Optional, Type from .base import BaseAgent, TaskInput, AgentRegistry from .crewai_agent import CrewAIAgent, CrewAIWorkflow from .autogen_agent import AutoGenAgent, AutoGenWorkflow from .langgraph_agent import LangGraphAgent, LangGraphWorkflow from ...core.base import LatticeComponent, LatticeError from enum import Enum from datetime import datetime import logging class WorkflowFramework(Enum): """Supported workflow frameworks""" CREWAI = "crewai" AUTOGEN = "autogen" LANGGRAPH = "langgraph" class WorkflowState(Enum): """Workflow execution states""" PENDING = "pending" RUNNING = "running" COMPLETED = "completed" FAILED = "failed" class WorkflowConfig(BaseModel): """Workflow configuration""" name: str description: str framework: WorkflowFramework agents: List[Dict[str, Any]] tasks: List[Dict[str, Any]] max_retries: int = 3 timeout: int = 600 # seconds metadata: Optional[Dict[str, Any]] = None class WorkflowResult(BaseModel): """Workflow execution result""" workflow_id: str state: WorkflowState results: List[Dict[str, Any]] error: Optional[str] = None start_time: datetime end_time: datetime metadata: Dict[str, Any] class AgentWorkflowManager(LatticeComponent): """Manages agent workflows across different frameworks""" def __init__(self, config: Optional[Dict[str, Any]] = None): super().__init__(config) self.agent_registry = AgentRegistry() self.workflows: Dict[str, WorkflowConfig] = {} self.results: Dict[str, WorkflowResult] = {} # Framework-specific workflow managers self.workflow_managers = { WorkflowFramework.CREWAI: CrewAIWorkflow, WorkflowFramework.AUTOGEN: AutoGenWorkflow, WorkflowFramework.LANGGRAPH: LangGraphWorkflow } async def initialize(self) -> None: """Initialize workflow manager""" try: # Load any saved workflows if self.config.get('workflow_storage'): self._load_workflows() self._initialized = True except Exception as e: raise LatticeError(f"Failed to initialize workflow manager: {str(e)}") async def create_workflow(self, config: Dict[str, Any]) -> str: """Create a new workflow""" try: workflow_config = WorkflowConfig(**config) workflow_id = str(uuid.uuid4()) # Create and register agents agent_ids = [] for agent_config in workflow_config.agents: agent = self._create_agent( workflow_config.framework, agent_config ) agent_id = await self.agent_registry.register_agent(agent) agent_ids.append(agent_id) # Store workflow configuration self.workflows[workflow_id] = workflow_config return workflow_id except Exception as e: self.logger.error(f"Failed to create workflow: {str(e)}") raise async def execute_workflow(self, workflow_id: str, context: Optional[Dict[str, Any]] = None) -> WorkflowResult: """Execute a workflow""" self.ensure_initialized() if workflow_id not in self.workflows: raise LatticeError(f"Workflow {workflow_id} not found") workflow_config = self.workflows[workflow_id] start_time = datetime.now() try: # Get workflow manager for framework workflow_manager = self.workflow_managers[workflow_config.framework]( self.config ) # Get agents agents = [ self.agent_registry.get_agent(agent_id) for agent_id in workflow_config.metadata.get('agent_ids', []) ] # Create tasks tasks = [ TaskInput(**task_config) for task_config in workflow_config.tasks ] # Execute workflow result = await workflow_manager.execute_workflow(agents, tasks) workflow_result = WorkflowResult( workflow_id=workflow_id, state=WorkflowState.COMPLETED, results=result['results'], start_time=start_time, end_time=datetime.now(), metadata={ 'framework': workflow_config.framework.value, 'agent_count': len(agents), 'task_count': len(tasks), **result.get('metadata', {}) } ) except Exception as e: self.logger.error(f"Workflow execution failed: {str(e)}") workflow_result = WorkflowResult( workflow_id=workflow_id, state=WorkflowState.FAILED, results=[], error=str(e), start_time=start_time, end_time=datetime.now(), metadata={ 'framework': workflow_config.framework.value } ) # Store result self.results[workflow_id] = workflow_result return workflow_result def _create_agent(self, framework: WorkflowFramework, config: Dict[str, Any]) -> BaseAgent: """Create appropriate agent type for framework""" agent_types = { WorkflowFramework.CREWAI: CrewAIAgent, WorkflowFramework.AUTOGEN: AutoGenAgent, WorkflowFramework.LANGGRAPH: LangGraphAgent } agent_class = agent_types.get(framework) if not agent_class: raise LatticeError(f"Unsupported framework: {framework}") return agent_class(config) def get_workflow_status(self, workflow_id: str) -> Optional[WorkflowResult]: """Get workflow execution status""" return self.results.get(workflow_id) def list_workflows(self) -> List[Dict[str, Any]]: """List all workflows""" return [ { 'workflow_id': workflow_id, 'name': config.name, 'framework': config.framework.value, 'state': self.results.get(workflow_id, {}).get('state', WorkflowState.PENDING).value } for workflow_id, config in self.workflows.items() ]