Lattice / core /agents /workflow.py
cryogenic22's picture
Create core/agents/workflow.py
97cfc17 verified
from typing import Dict, List, Any, Optional, Type
from .base import BaseAgent, TaskInput, AgentRegistry
from .crewai_agent import CrewAIAgent, CrewAIWorkflow
from .autogen_agent import AutoGenAgent, AutoGenWorkflow
from .langgraph_agent import LangGraphAgent, LangGraphWorkflow
from ...core.base import LatticeComponent, LatticeError
from enum import Enum
from datetime import datetime
import logging
class WorkflowFramework(Enum):
"""Supported workflow frameworks"""
CREWAI = "crewai"
AUTOGEN = "autogen"
LANGGRAPH = "langgraph"
class WorkflowState(Enum):
"""Workflow execution states"""
PENDING = "pending"
RUNNING = "running"
COMPLETED = "completed"
FAILED = "failed"
class WorkflowConfig(BaseModel):
"""Workflow configuration"""
name: str
description: str
framework: WorkflowFramework
agents: List[Dict[str, Any]]
tasks: List[Dict[str, Any]]
max_retries: int = 3
timeout: int = 600 # seconds
metadata: Optional[Dict[str, Any]] = None
class WorkflowResult(BaseModel):
"""Workflow execution result"""
workflow_id: str
state: WorkflowState
results: List[Dict[str, Any]]
error: Optional[str] = None
start_time: datetime
end_time: datetime
metadata: Dict[str, Any]
class AgentWorkflowManager(LatticeComponent):
"""Manages agent workflows across different frameworks"""
def __init__(self, config: Optional[Dict[str, Any]] = None):
super().__init__(config)
self.agent_registry = AgentRegistry()
self.workflows: Dict[str, WorkflowConfig] = {}
self.results: Dict[str, WorkflowResult] = {}
# Framework-specific workflow managers
self.workflow_managers = {
WorkflowFramework.CREWAI: CrewAIWorkflow,
WorkflowFramework.AUTOGEN: AutoGenWorkflow,
WorkflowFramework.LANGGRAPH: LangGraphWorkflow
}
async def initialize(self) -> None:
"""Initialize workflow manager"""
try:
# Load any saved workflows
if self.config.get('workflow_storage'):
self._load_workflows()
self._initialized = True
except Exception as e:
raise LatticeError(f"Failed to initialize workflow manager: {str(e)}")
async def create_workflow(self, config: Dict[str, Any]) -> str:
"""Create a new workflow"""
try:
workflow_config = WorkflowConfig(**config)
workflow_id = str(uuid.uuid4())
# Create and register agents
agent_ids = []
for agent_config in workflow_config.agents:
agent = self._create_agent(
workflow_config.framework,
agent_config
)
agent_id = await self.agent_registry.register_agent(agent)
agent_ids.append(agent_id)
# Store workflow configuration
self.workflows[workflow_id] = workflow_config
return workflow_id
except Exception as e:
self.logger.error(f"Failed to create workflow: {str(e)}")
raise
async def execute_workflow(self,
workflow_id: str,
context: Optional[Dict[str, Any]] = None) -> WorkflowResult:
"""Execute a workflow"""
self.ensure_initialized()
if workflow_id not in self.workflows:
raise LatticeError(f"Workflow {workflow_id} not found")
workflow_config = self.workflows[workflow_id]
start_time = datetime.now()
try:
# Get workflow manager for framework
workflow_manager = self.workflow_managers[workflow_config.framework](
self.config
)
# Get agents
agents = [
self.agent_registry.get_agent(agent_id)
for agent_id in workflow_config.metadata.get('agent_ids', [])
]
# Create tasks
tasks = [
TaskInput(**task_config)
for task_config in workflow_config.tasks
]
# Execute workflow
result = await workflow_manager.execute_workflow(agents, tasks)
workflow_result = WorkflowResult(
workflow_id=workflow_id,
state=WorkflowState.COMPLETED,
results=result['results'],
start_time=start_time,
end_time=datetime.now(),
metadata={
'framework': workflow_config.framework.value,
'agent_count': len(agents),
'task_count': len(tasks),
**result.get('metadata', {})
}
)
except Exception as e:
self.logger.error(f"Workflow execution failed: {str(e)}")
workflow_result = WorkflowResult(
workflow_id=workflow_id,
state=WorkflowState.FAILED,
results=[],
error=str(e),
start_time=start_time,
end_time=datetime.now(),
metadata={
'framework': workflow_config.framework.value
}
)
# Store result
self.results[workflow_id] = workflow_result
return workflow_result
def _create_agent(self,
framework: WorkflowFramework,
config: Dict[str, Any]) -> BaseAgent:
"""Create appropriate agent type for framework"""
agent_types = {
WorkflowFramework.CREWAI: CrewAIAgent,
WorkflowFramework.AUTOGEN: AutoGenAgent,
WorkflowFramework.LANGGRAPH: LangGraphAgent
}
agent_class = agent_types.get(framework)
if not agent_class:
raise LatticeError(f"Unsupported framework: {framework}")
return agent_class(config)
def get_workflow_status(self, workflow_id: str) -> Optional[WorkflowResult]:
"""Get workflow execution status"""
return self.results.get(workflow_id)
def list_workflows(self) -> List[Dict[str, Any]]:
"""List all workflows"""
return [
{
'workflow_id': workflow_id,
'name': config.name,
'framework': config.framework.value,
'state': self.results.get(workflow_id, {}).get('state', WorkflowState.PENDING).value
}
for workflow_id, config in self.workflows.items()
]