|
|
""" |
|
|
Business logic for agents, workflows, and executions. |
|
|
""" |
|
|
import logging |
|
|
|
|
|
from datetime import datetime |
|
|
from typing import List, Dict, Any, Optional, Tuple |
|
|
from bson import ObjectId |
|
|
|
|
|
from evoagentx.app.db import ( |
|
|
Database, |
|
|
AgentStatus, WorkflowStatus, ExecutionStatus |
|
|
) |
|
|
from evoagentx.app.schemas import ( |
|
|
AgentCreate, AgentUpdate, WorkflowCreate, WorkflowUpdate, |
|
|
ExecutionCreate, PaginationParams, SearchParams |
|
|
) |
|
|
|
|
|
logger = logging.getLogger(__name__) |
|
|
|
|
|
|
|
|
class AgentService:
    """Persistence operations for agent documents kept in ``Database.agents``."""

    @staticmethod
    async def create_agent(agent_data: AgentCreate, user_id: Optional[str] = None) -> Dict[str, Any]:
        """Store a new agent and return the stored document.

        Args:
            agent_data: Payload whose ``dict()`` form becomes the document.
            user_id: Value recorded under ``created_by``.

        Returns:
            The inserted document, including its ``_id``.

        Raises:
            ValueError: If an agent with the same name already exists.
        """
        record = agent_data.dict()
        timestamp = datetime.utcnow()
        # Creation and last-update times start out identical.
        record.update(
            created_by=user_id,
            created_at=timestamp,
            updated_at=timestamp,
            status=AgentStatus.CREATED,
        )

        name = record["name"]
        duplicate = await Database.agents.find_one({"name": name})
        if duplicate:
            raise ValueError(f"Agent with name '{name}' already exists")

        outcome = await Database.agents.insert_one(record)
        record["_id"] = outcome.inserted_id
        logger.info(f"Created agent {name} with ID {outcome.inserted_id}")
        return record

    @staticmethod
    async def get_agent(agent_id: str) -> Optional[Dict[str, Any]]:
        """Look up a single agent by its ID.

        Returns:
            The result of the ``find_one`` lookup on ``_id``.

        Raises:
            ValueError: If ``agent_id`` is not a valid ``ObjectId``.
        """
        if not ObjectId.is_valid(agent_id):
            raise ValueError(f"Invalid agent ID: {agent_id}")
        return await Database.agents.find_one({"_id": ObjectId(agent_id)})

    @staticmethod
    async def get_agent_by_name(name: str) -> Optional[Dict[str, Any]]:
        """Look up a single agent by its ``name`` field."""
        by_name = {"name": name}
        return await Database.agents.find_one(by_name)

    @staticmethod
    async def update_agent(agent_id: str, agent_data: AgentUpdate) -> Optional[Dict[str, Any]]:
        """Apply the explicitly-set fields of ``agent_data`` to an agent.

        Returns:
            ``None`` when no agent has this ID; otherwise the document as
            re-read after the update.

        Raises:
            ValueError: If ``agent_id`` is not a valid ``ObjectId``, or if the
                new name is already used by a different agent.
        """
        if not ObjectId.is_valid(agent_id):
            raise ValueError(f"Invalid agent ID: {agent_id}")

        oid = ObjectId(agent_id)
        current = await Database.agents.find_one({"_id": oid})
        if not current:
            return None

        # Only fields the caller actually provided are written.
        changes = agent_data.dict(exclude_unset=True)
        changes["updated_at"] = datetime.utcnow()

        if "name" in changes:
            # A rename must not collide with any *other* agent.
            clash = await Database.agents.find_one(
                {"name": changes["name"], "_id": {"$ne": oid}}
            )
            if clash:
                raise ValueError(f"Agent with name '{changes['name']}' already exists")

        await Database.agents.update_one({"_id": oid}, {"$set": changes})
        refreshed = await Database.agents.find_one({"_id": oid})
        logger.info(f"Updated agent {agent_id}")
        return refreshed

    @staticmethod
    async def delete_agent(agent_id: str) -> bool:
        """Remove an agent unless a workflow still references it.

        Returns:
            ``True`` if a document was deleted, ``False`` otherwise.

        Raises:
            ValueError: If ``agent_id`` is not a valid ``ObjectId``, or if at
                least one workflow lists it in ``agent_ids``.
        """
        if not ObjectId.is_valid(agent_id):
            raise ValueError(f"Invalid agent ID: {agent_id}")

        # The workflow lookup matches on the ID string as given.
        workflow_count = await Database.workflows.count_documents({"agent_ids": agent_id})
        if workflow_count > 0:
            raise ValueError(f"Cannot delete agent {agent_id} as it is used in {workflow_count} workflows")

        outcome = await Database.agents.delete_one({"_id": ObjectId(agent_id)})
        if not outcome.deleted_count:
            return False

        logger.info(f"Deleted agent {agent_id}")
        return True

    @staticmethod
    async def list_agents(
        params: PaginationParams,
        search: Optional[SearchParams] = None
    ) -> Tuple[List[Dict[str, Any]], int]:
        """Return one page of agents, newest first, plus the total match count.

        Args:
            params: Supplies ``skip`` and ``limit`` for the page.
            search: Optional filters (text query, tags, status, date range).

        Returns:
            ``(agents, total)`` where ``total`` counts every document matching
            the filters, not just the returned page.
        """
        filters: Dict[str, Any] = {}

        if search:
            if search.query:
                filters["$text"] = {"$search": search.query}
            if search.tags:
                filters["tags"] = {"$all": search.tags}
            if search.status:
                filters["status"] = search.status

            # Build the creation-date bounds from whichever ends were given.
            created_range: Dict[str, Any] = {}
            if search.start_date:
                created_range["$gte"] = search.start_date
            if search.end_date:
                created_range["$lte"] = search.end_date
            if created_range:
                filters["created_at"] = created_range

        total = await Database.agents.count_documents(filters)

        cursor = Database.agents.find(filters).sort("created_at", -1)
        cursor = cursor.skip(params.skip).limit(params.limit)
        page = await cursor.to_list(length=params.limit)
        return page, total
|
|
|
|
|
|
|
|
class WorkflowService:
    """Persistence operations for workflow documents kept in ``Database.workflows``."""

    @staticmethod
    async def _collect_agent_ids(definition: Dict[str, Any]) -> List[Any]:
        """Validate the agents referenced by a definition's steps.

        Every step carrying an ``agent_id`` key is checked through
        ``AgentService.get_agent``.

        Returns:
            The distinct agent IDs in first-seen step order, so the stored
            list is the same for the same definition.

        Raises:
            ValueError: If a referenced agent ID is invalid or does not exist.
        """
        # A dict is used as an insertion-ordered set.
        ordered_ids: Dict[Any, None] = {}
        for step in definition.get("steps", []):
            if "agent_id" not in step:
                continue
            agent_id = step["agent_id"]
            agent = await AgentService.get_agent(agent_id)
            if not agent:
                raise ValueError(f"Agent with ID {agent_id} does not exist")
            ordered_ids.setdefault(agent_id, None)
        return list(ordered_ids)

    @staticmethod
    async def create_workflow(workflow_data: WorkflowCreate, user_id: Optional[str] = None) -> Dict[str, Any]:
        """Store a new workflow and return the stored document.

        Args:
            workflow_data: Payload whose ``dict()`` form becomes the document.
            user_id: Value recorded under ``created_by``.

        Returns:
            The inserted document, including ``_id``, ``version`` (1) and the
            ``agent_ids`` derived from the definition's steps.

        Raises:
            ValueError: If a step references an invalid or missing agent, or
                if a workflow with the same name already exists.
        """
        workflow_dict = workflow_data.dict()
        now = datetime.utcnow()
        workflow_dict.update(
            created_by=user_id,
            created_at=now,
            updated_at=now,
            status=WorkflowStatus.CREATED,
            version=1,
        )

        # Agents are validated before the name check, as in the original flow.
        workflow_dict["agent_ids"] = await WorkflowService._collect_agent_ids(
            workflow_dict["definition"]
        )

        existing = await Database.workflows.find_one({"name": workflow_dict["name"]})
        if existing:
            raise ValueError(f"Workflow with name '{workflow_dict['name']}' already exists")

        result = await Database.workflows.insert_one(workflow_dict)
        workflow_dict["_id"] = result.inserted_id

        logger.info("Created workflow %s with ID %s", workflow_dict["name"], result.inserted_id)
        return workflow_dict

    @staticmethod
    async def get_workflow(workflow_id: str) -> Optional[Dict[str, Any]]:
        """Look up a single workflow by its ID.

        Raises:
            ValueError: If ``workflow_id`` is not a valid ``ObjectId``.
        """
        if not ObjectId.is_valid(workflow_id):
            raise ValueError(f"Invalid workflow ID: {workflow_id}")
        return await Database.workflows.find_one({"_id": ObjectId(workflow_id)})

    @staticmethod
    async def get_workflow_by_name(name: str) -> Optional[Dict[str, Any]]:
        """Look up a single workflow by its ``name`` field."""
        return await Database.workflows.find_one({"name": name})

    @staticmethod
    async def update_workflow(workflow_id: str, workflow_data: WorkflowUpdate) -> Optional[Dict[str, Any]]:
        """Apply the explicitly-set fields of ``workflow_data`` to a workflow.

        When the update carries a ``definition``, the version is incremented
        and ``agent_ids`` is recomputed from the new steps.

        Returns:
            ``None`` when no workflow has this ID; otherwise the document as
            re-read after the update.

        Raises:
            ValueError: If ``workflow_id`` is not a valid ``ObjectId``, a step
                references an invalid or missing agent, or the new name is
                already used by a different workflow.
        """
        if not ObjectId.is_valid(workflow_id):
            raise ValueError(f"Invalid workflow ID: {workflow_id}")

        oid = ObjectId(workflow_id)
        workflow = await Database.workflows.find_one({"_id": oid})
        if not workflow:
            return None

        # Only fields the caller actually provided are written.
        update_data = workflow_data.dict(exclude_unset=True)
        update_data["updated_at"] = datetime.utcnow()

        if "definition" in update_data:
            # Documents without a stored version are treated as version 1.
            update_data["version"] = workflow.get("version", 1) + 1
            update_data["agent_ids"] = await WorkflowService._collect_agent_ids(
                update_data["definition"]
            )

        if "name" in update_data:
            # A rename must not collide with any *other* workflow.
            existing = await Database.workflows.find_one(
                {"name": update_data["name"], "_id": {"$ne": oid}}
            )
            if existing:
                raise ValueError(f"Workflow with name '{update_data['name']}' already exists")

        await Database.workflows.update_one({"_id": oid}, {"$set": update_data})

        updated_workflow = await Database.workflows.find_one({"_id": oid})
        logger.info("Updated workflow %s", workflow_id)
        return updated_workflow

    @staticmethod
    async def delete_workflow(workflow_id: str) -> bool:
        """Delete a workflow together with its logs and executions.

        Returns:
            ``True`` if the workflow document was deleted, ``False`` otherwise.

        Raises:
            ValueError: If ``workflow_id`` is not a valid ``ObjectId``, or if
                the workflow still has pending or running executions.
        """
        if not ObjectId.is_valid(workflow_id):
            raise ValueError(f"Invalid workflow ID: {workflow_id}")

        # Executions and logs are matched on the ID string as given.
        active_executions = await Database.executions.count_documents({
            "workflow_id": workflow_id,
            "status": {"$in": [ExecutionStatus.PENDING, ExecutionStatus.RUNNING]},
        })
        if active_executions > 0:
            raise ValueError(
                f"Cannot delete workflow {workflow_id} with {active_executions} active executions"
            )

        result = await Database.workflows.delete_one({"_id": ObjectId(workflow_id)})
        if not result.deleted_count:
            return False

        # Dependent records are removed only after the workflow itself is gone.
        await Database.logs.delete_many({"workflow_id": workflow_id})
        await Database.executions.delete_many({"workflow_id": workflow_id})

        logger.info("Deleted workflow %s", workflow_id)
        return True

    @staticmethod
    async def list_workflows(
        params: PaginationParams,
        search: Optional[SearchParams] = None
    ) -> Tuple[List[Dict[str, Any]], int]:
        """Return one page of workflows, newest first, plus the total match count.

        Args:
            params: Supplies ``skip`` and ``limit`` for the page.
            search: Optional filters (text query, tags, status, date range).

        Returns:
            ``(workflows, total)`` where ``total`` counts every document
            matching the filters, not just the returned page.
        """
        query: Dict[str, Any] = {}

        if search:
            if search.query:
                query["$text"] = {"$search": search.query}
            if search.tags:
                query["tags"] = {"$all": search.tags}
            if search.status:
                query["status"] = search.status

            # Build the creation-date bounds from whichever ends were given.
            created_range: Dict[str, Any] = {}
            if search.start_date:
                created_range["$gte"] = search.start_date
            if search.end_date:
                created_range["$lte"] = search.end_date
            if created_range:
                query["created_at"] = created_range

        total = await Database.workflows.count_documents(query)

        cursor = (
            Database.workflows.find(query)
            .sort("created_at", -1)
            .skip(params.skip)
            .limit(params.limit)
        )
        workflows = await cursor.to_list(length=params.limit)
        return workflows, total
|
|
|
|
|
|
|
|
class WorkflowExecutionService:
    """Persistence operations for workflow executions and their log entries."""

    @staticmethod
    async def create_execution(execution_data: ExecutionCreate, user_id: Optional[str] = None) -> Dict[str, Any]:
        """Create a pending execution record for an existing workflow.

        Args:
            execution_data: Supplies ``workflow_id`` and ``input_params``.
            user_id: Value recorded under ``created_by``.

        Returns:
            The inserted execution document, including its ``_id``.

        Raises:
            ValueError: If the workflow ID is invalid (raised by
                ``WorkflowService.get_workflow``) or no such workflow exists.
        """
        workflow = await WorkflowService.get_workflow(execution_data.workflow_id)
        if not workflow:
            raise ValueError(f"Workflow {execution_data.workflow_id} not found")

        # One timestamp for both fields so they cannot drift apart.
        now = datetime.utcnow()
        execution_dict = {
            "workflow_id": execution_data.workflow_id,
            "status": ExecutionStatus.PENDING,
            "start_time": now,
            "input_params": execution_data.input_params,
            "created_by": user_id,
            "created_at": now,
            "step_results": {},
            "current_step": None,
            "results": {},
            "error_message": None,
        }

        result = await Database.executions.insert_one(execution_dict)
        execution_dict["_id"] = result.inserted_id

        logger.info("Created workflow execution %s", result.inserted_id)
        return execution_dict

    @staticmethod
    async def get_execution(execution_id: str) -> Optional[Dict[str, Any]]:
        """Look up a single execution by its ID.

        Raises:
            ValueError: If ``execution_id`` is not a valid ``ObjectId``.
        """
        if not ObjectId.is_valid(execution_id):
            raise ValueError(f"Invalid execution ID: {execution_id}")
        return await Database.executions.find_one({"_id": ObjectId(execution_id)})

    @staticmethod
    async def update_execution_status(execution_id: str, status: ExecutionStatus, error_message: Optional[str] = None) -> Optional[Dict[str, Any]]:
        """Set an execution's status and related bookkeeping fields.

        ``end_time`` is written when the new status is COMPLETED, FAILED or
        CANCELLED. ``error_message`` is written only when a non-empty message
        is supplied.

        Returns:
            Whatever ``find_one_and_update`` returns for this ID.

        Raises:
            ValueError: If ``execution_id`` is not a valid ``ObjectId``.
        """
        if not ObjectId.is_valid(execution_id):
            raise ValueError(f"Invalid execution ID: {execution_id}")

        # One timestamp so ``updated_at`` and ``end_time`` agree.
        now = datetime.utcnow()
        update_data: Dict[str, Any] = {"status": status, "updated_at": now}

        terminal_statuses = (
            ExecutionStatus.COMPLETED,
            ExecutionStatus.FAILED,
            ExecutionStatus.CANCELLED,
        )
        if status in terminal_statuses:
            update_data["end_time"] = now

        if error_message:
            update_data["error_message"] = error_message

        # ``True`` is the value of pymongo's ``ReturnDocument.AFTER``, i.e.
        # the post-update document is requested.
        return await Database.executions.find_one_and_update(
            {"_id": ObjectId(execution_id)},
            {"$set": update_data},
            return_document=True,
        )

    @staticmethod
    async def list_executions(
        workflow_id: Optional[str] = None,
        params: Optional[PaginationParams] = None,
        search: Optional[SearchParams] = None
    ) -> Tuple[List[Dict[str, Any]], int]:
        """Return one page of executions, newest first, plus the total match count.

        Args:
            workflow_id: When given, restricts results to this workflow.
            params: Supplies ``skip`` and ``limit``; a fresh
                ``PaginationParams()`` is used when omitted.
            search: Optional filters (status, date range).

        Returns:
            ``(executions, total)`` where ``total`` counts every document
            matching the filters, not just the returned page.
        """
        # Build the default per call instead of sharing one instance created
        # at function-definition time.
        if params is None:
            params = PaginationParams()

        query: Dict[str, Any] = {}

        if workflow_id:
            query["workflow_id"] = workflow_id

        if search:
            if search.status:
                query["status"] = search.status

            # Build the creation-date bounds from whichever ends were given.
            created_range: Dict[str, Any] = {}
            if search.start_date:
                created_range["$gte"] = search.start_date
            if search.end_date:
                created_range["$lte"] = search.end_date
            if created_range:
                query["created_at"] = created_range

        total = await Database.executions.count_documents(query)

        cursor = (
            Database.executions.find(query)
            .sort("created_at", -1)
            .skip(params.skip)
            .limit(params.limit)
        )
        executions = await cursor.to_list(length=params.limit)
        return executions, total

    @staticmethod
    async def log_execution_event(
        workflow_id: str,
        execution_id: str,
        message: str,
        step_id: Optional[str] = None,
        agent_id: Optional[str] = None,
        level: str = "INFO",
        details: Optional[Dict[str, Any]] = None
    ) -> Dict[str, Any]:
        """Insert a log entry for a workflow execution.

        Returns:
            The inserted log document, including its ``_id``. ``details`` is
            stored as an empty dict when none is supplied.
        """
        log_entry = {
            "workflow_id": workflow_id,
            "execution_id": execution_id,
            "step_id": step_id,
            "agent_id": agent_id,
            "timestamp": datetime.utcnow(),
            "level": level,
            "message": message,
            "details": details or {},
        }

        result = await Database.logs.insert_one(log_entry)
        log_entry["_id"] = result.inserted_id
        return log_entry

    @staticmethod
    async def get_execution_logs(
        execution_id: str,
        params: Optional[PaginationParams] = None
    ) -> Tuple[List[Dict[str, Any]], int]:
        """Return one page of an execution's logs, oldest first, plus the total count.

        Args:
            execution_id: Matched against each log entry's ``execution_id``.
            params: Supplies ``skip`` and ``limit``; a fresh
                ``PaginationParams()`` is used when omitted.

        Returns:
            ``(logs, total)`` where ``total`` counts every log entry for the
            execution, not just the returned page.
        """
        # Build the default per call instead of sharing one instance created
        # at function-definition time.
        if params is None:
            params = PaginationParams()

        query = {"execution_id": execution_id}

        total = await Database.logs.count_documents(query)

        cursor = (
            Database.logs.find(query)
            .sort("timestamp", 1)
            .skip(params.skip)
            .limit(params.limit)
        )
        logs = await cursor.to_list(length=params.limit)
        return logs, total