|
|
from typing import Dict, List, Any, Optional, Type |
|
|
from .base import BaseAgent, TaskInput, AgentRegistry |
|
|
from .crewai_agent import CrewAIAgent, CrewAIWorkflow |
|
|
from .autogen_agent import AutoGenAgent, AutoGenWorkflow |
|
|
from .langgraph_agent import LangGraphAgent, LangGraphWorkflow |
|
|
from ...core.base import LatticeComponent, LatticeError |
|
|
from enum import Enum |
|
|
from datetime import datetime |
|
|
import logging |
|
|
|
|
|
class WorkflowFramework(Enum): |
|
|
"""Supported workflow frameworks""" |
|
|
CREWAI = "crewai" |
|
|
AUTOGEN = "autogen" |
|
|
LANGGRAPH = "langgraph" |
|
|
|
|
|
class WorkflowState(Enum): |
|
|
"""Workflow execution states""" |
|
|
PENDING = "pending" |
|
|
RUNNING = "running" |
|
|
COMPLETED = "completed" |
|
|
FAILED = "failed" |
|
|
|
|
|
class WorkflowConfig(BaseModel): |
|
|
"""Workflow configuration""" |
|
|
name: str |
|
|
description: str |
|
|
framework: WorkflowFramework |
|
|
agents: List[Dict[str, Any]] |
|
|
tasks: List[Dict[str, Any]] |
|
|
max_retries: int = 3 |
|
|
timeout: int = 600 |
|
|
metadata: Optional[Dict[str, Any]] = None |
|
|
|
|
|
class WorkflowResult(BaseModel): |
|
|
"""Workflow execution result""" |
|
|
workflow_id: str |
|
|
state: WorkflowState |
|
|
results: List[Dict[str, Any]] |
|
|
error: Optional[str] = None |
|
|
start_time: datetime |
|
|
end_time: datetime |
|
|
metadata: Dict[str, Any] |
|
|
|
|
|
class AgentWorkflowManager(LatticeComponent): |
|
|
"""Manages agent workflows across different frameworks""" |
|
|
|
|
|
def __init__(self, config: Optional[Dict[str, Any]] = None): |
|
|
super().__init__(config) |
|
|
self.agent_registry = AgentRegistry() |
|
|
self.workflows: Dict[str, WorkflowConfig] = {} |
|
|
self.results: Dict[str, WorkflowResult] = {} |
|
|
|
|
|
|
|
|
self.workflow_managers = { |
|
|
WorkflowFramework.CREWAI: CrewAIWorkflow, |
|
|
WorkflowFramework.AUTOGEN: AutoGenWorkflow, |
|
|
WorkflowFramework.LANGGRAPH: LangGraphWorkflow |
|
|
} |
|
|
|
|
|
async def initialize(self) -> None: |
|
|
"""Initialize workflow manager""" |
|
|
try: |
|
|
|
|
|
if self.config.get('workflow_storage'): |
|
|
self._load_workflows() |
|
|
|
|
|
self._initialized = True |
|
|
|
|
|
except Exception as e: |
|
|
raise LatticeError(f"Failed to initialize workflow manager: {str(e)}") |
|
|
|
|
|
async def create_workflow(self, config: Dict[str, Any]) -> str: |
|
|
"""Create a new workflow""" |
|
|
try: |
|
|
workflow_config = WorkflowConfig(**config) |
|
|
workflow_id = str(uuid.uuid4()) |
|
|
|
|
|
|
|
|
agent_ids = [] |
|
|
for agent_config in workflow_config.agents: |
|
|
agent = self._create_agent( |
|
|
workflow_config.framework, |
|
|
agent_config |
|
|
) |
|
|
agent_id = await self.agent_registry.register_agent(agent) |
|
|
agent_ids.append(agent_id) |
|
|
|
|
|
|
|
|
self.workflows[workflow_id] = workflow_config |
|
|
|
|
|
return workflow_id |
|
|
|
|
|
except Exception as e: |
|
|
self.logger.error(f"Failed to create workflow: {str(e)}") |
|
|
raise |
|
|
|
|
|
async def execute_workflow(self, |
|
|
workflow_id: str, |
|
|
context: Optional[Dict[str, Any]] = None) -> WorkflowResult: |
|
|
"""Execute a workflow""" |
|
|
self.ensure_initialized() |
|
|
|
|
|
if workflow_id not in self.workflows: |
|
|
raise LatticeError(f"Workflow {workflow_id} not found") |
|
|
|
|
|
workflow_config = self.workflows[workflow_id] |
|
|
start_time = datetime.now() |
|
|
|
|
|
try: |
|
|
|
|
|
workflow_manager = self.workflow_managers[workflow_config.framework]( |
|
|
self.config |
|
|
) |
|
|
|
|
|
|
|
|
agents = [ |
|
|
self.agent_registry.get_agent(agent_id) |
|
|
for agent_id in workflow_config.metadata.get('agent_ids', []) |
|
|
] |
|
|
|
|
|
|
|
|
tasks = [ |
|
|
TaskInput(**task_config) |
|
|
for task_config in workflow_config.tasks |
|
|
] |
|
|
|
|
|
|
|
|
result = await workflow_manager.execute_workflow(agents, tasks) |
|
|
|
|
|
workflow_result = WorkflowResult( |
|
|
workflow_id=workflow_id, |
|
|
state=WorkflowState.COMPLETED, |
|
|
results=result['results'], |
|
|
start_time=start_time, |
|
|
end_time=datetime.now(), |
|
|
metadata={ |
|
|
'framework': workflow_config.framework.value, |
|
|
'agent_count': len(agents), |
|
|
'task_count': len(tasks), |
|
|
**result.get('metadata', {}) |
|
|
} |
|
|
) |
|
|
|
|
|
except Exception as e: |
|
|
self.logger.error(f"Workflow execution failed: {str(e)}") |
|
|
workflow_result = WorkflowResult( |
|
|
workflow_id=workflow_id, |
|
|
state=WorkflowState.FAILED, |
|
|
results=[], |
|
|
error=str(e), |
|
|
start_time=start_time, |
|
|
end_time=datetime.now(), |
|
|
metadata={ |
|
|
'framework': workflow_config.framework.value |
|
|
} |
|
|
) |
|
|
|
|
|
|
|
|
self.results[workflow_id] = workflow_result |
|
|
return workflow_result |
|
|
|
|
|
def _create_agent(self, |
|
|
framework: WorkflowFramework, |
|
|
config: Dict[str, Any]) -> BaseAgent: |
|
|
"""Create appropriate agent type for framework""" |
|
|
agent_types = { |
|
|
WorkflowFramework.CREWAI: CrewAIAgent, |
|
|
WorkflowFramework.AUTOGEN: AutoGenAgent, |
|
|
WorkflowFramework.LANGGRAPH: LangGraphAgent |
|
|
} |
|
|
|
|
|
agent_class = agent_types.get(framework) |
|
|
if not agent_class: |
|
|
raise LatticeError(f"Unsupported framework: {framework}") |
|
|
|
|
|
return agent_class(config) |
|
|
|
|
|
def get_workflow_status(self, workflow_id: str) -> Optional[WorkflowResult]: |
|
|
"""Get workflow execution status""" |
|
|
return self.results.get(workflow_id) |
|
|
|
|
|
def list_workflows(self) -> List[Dict[str, Any]]: |
|
|
"""List all workflows""" |
|
|
return [ |
|
|
{ |
|
|
'workflow_id': workflow_id, |
|
|
'name': config.name, |
|
|
'framework': config.framework.value, |
|
|
'state': self.results.get(workflow_id, {}).get('state', WorkflowState.PENDING).value |
|
|
} |
|
|
for workflow_id, config in self.workflows.items() |
|
|
] |