# iLOVE2D's picture
# Upload 2846 files
# 5374a2d verified
"""
Business logic for agents, workflows, and executions.
"""
import logging
# import asyncio
from datetime import datetime
from typing import List, Dict, Any, Optional, Tuple
from bson import ObjectId
from evoagentx.app.db import (
Database, # Agent, Workflow, WorkflowExecution, ExecutionLog,
AgentStatus, WorkflowStatus, ExecutionStatus
)
from evoagentx.app.schemas import (
AgentCreate, AgentUpdate, WorkflowCreate, WorkflowUpdate,
ExecutionCreate, PaginationParams, SearchParams
)
logger = logging.getLogger(__name__)
# Agent Service
class AgentService:
    """CRUD and listing operations for agent documents.

    Every method is a static coroutine working against ``Database.agents``;
    ``delete_agent`` additionally consults ``Database.workflows``.
    """

    @staticmethod
    async def create_agent(agent_data: AgentCreate, user_id: Optional[str] = None) -> Dict[str, Any]:
        """Insert a new agent document and return it.

        Args:
            agent_data: Creation payload; converted to a dict with ``.dict()``.
            user_id: Value stored under ``created_by``.

        Returns:
            The inserted document, including its generated ``_id``.

        Raises:
            ValueError: If an agent with the same name is already stored.
        """
        record = agent_data.dict()
        timestamp = datetime.utcnow()
        # created_at and updated_at deliberately share one timestamp.
        record.update(
            created_by=user_id,
            created_at=timestamp,
            updated_at=timestamp,
            status=AgentStatus.CREATED,
        )

        # Agent names must be unique.
        name = record["name"]
        duplicate = await Database.agents.find_one({"name": name})
        if duplicate:
            raise ValueError(f"Agent with name '{name}' already exists")

        inserted = await Database.agents.insert_one(record)
        record["_id"] = inserted.inserted_id
        logger.info(f"Created agent {name} with ID {inserted.inserted_id}")
        return record

    @staticmethod
    async def get_agent(agent_id: str) -> Optional[Dict[str, Any]]:
        """Fetch a single agent by its ID.

        Returns:
            The matching document, or ``None`` when nothing matches.

        Raises:
            ValueError: If ``agent_id`` is not a valid ``ObjectId``.
        """
        if ObjectId.is_valid(agent_id):
            return await Database.agents.find_one({"_id": ObjectId(agent_id)})
        raise ValueError(f"Invalid agent ID: {agent_id}")

    @staticmethod
    async def get_agent_by_name(name: str) -> Optional[Dict[str, Any]]:
        """Fetch a single agent by its name, or ``None`` when nothing matches."""
        match = await Database.agents.find_one({"name": name})
        return match

    @staticmethod
    async def update_agent(agent_id: str, agent_data: AgentUpdate) -> Optional[Dict[str, Any]]:
        """Apply the explicitly-set fields of ``agent_data`` to an agent.

        Returns:
            The document as re-read after the update, or ``None`` when no
            agent with that ID exists.

        Raises:
            ValueError: If ``agent_id`` is not a valid ``ObjectId`` or the new
                name is already used by a different agent.
        """
        if not ObjectId.is_valid(agent_id):
            raise ValueError(f"Invalid agent ID: {agent_id}")
        oid = ObjectId(agent_id)

        current = await Database.agents.find_one({"_id": oid})
        if not current:
            return None

        # Only fields the caller actually supplied are written.
        changes = agent_data.dict(exclude_unset=True)
        changes["updated_at"] = datetime.utcnow()

        if "name" in changes:
            # A rename must not collide with any *other* agent.
            clash = await Database.agents.find_one(
                {"name": changes["name"], "_id": {"$ne": oid}}
            )
            if clash:
                raise ValueError(f"Agent with name '{changes['name']}' already exists")

        await Database.agents.update_one({"_id": oid}, {"$set": changes})
        refreshed = await Database.agents.find_one({"_id": oid})
        logger.info(f"Updated agent {agent_id}")
        return refreshed

    @staticmethod
    async def delete_agent(agent_id: str) -> bool:
        """Delete an agent unless a workflow still references it.

        Returns:
            ``True`` when a document was deleted, ``False`` otherwise.

        Raises:
            ValueError: If ``agent_id`` is not a valid ``ObjectId`` or the
                agent appears in the ``agent_ids`` of at least one workflow.
        """
        if not ObjectId.is_valid(agent_id):
            raise ValueError(f"Invalid agent ID: {agent_id}")

        # Workflows keep the IDs of their agents in ``agent_ids``.
        workflow_count = await Database.workflows.count_documents({"agent_ids": agent_id})
        if workflow_count > 0:
            raise ValueError(f"Cannot delete agent {agent_id} as it is used in {workflow_count} workflows")

        outcome = await Database.agents.delete_one({"_id": ObjectId(agent_id)})
        if not outcome.deleted_count:
            return False
        logger.info(f"Deleted agent {agent_id}")
        return True

    @staticmethod
    async def list_agents(
        params: PaginationParams,
        search: Optional[SearchParams] = None
    ) -> Tuple[List[Dict[str, Any]], int]:
        """Return one page of agents, newest first, plus the total match count.

        Args:
            params: Supplies ``skip`` and ``limit`` for the page.
            search: Optional filters (``query``, ``tags``, ``status``,
                ``start_date``, ``end_date``); falsy fields are ignored.

        Returns:
            ``(agents, total)`` where ``total`` counts every document matching
            the filters, not just the returned page.
        """
        filters = {}
        if search:
            if search.query:
                filters["$text"] = {"$search": search.query}
            if search.tags:
                filters["tags"] = {"$all": search.tags}
            if search.status:
                filters["status"] = search.status

            # Build the creation-date window from whichever bounds are set.
            created_window = {}
            if search.start_date:
                created_window["$gte"] = search.start_date
            if search.end_date:
                created_window["$lte"] = search.end_date
            if created_window:
                filters["created_at"] = created_window

        total = await Database.agents.count_documents(filters)

        page = Database.agents.find(filters)
        page = page.sort("created_at", -1)
        page = page.skip(params.skip)
        page = page.limit(params.limit)
        agents = await page.to_list(length=params.limit)
        return agents, total
# Workflow Service
class WorkflowService:
    """CRUD and listing operations for workflow documents.

    Every method is a static coroutine working against ``Database.workflows``.
    Agents referenced by a workflow definition are validated through
    ``AgentService.get_agent``; deletion also touches ``Database.executions``
    and ``Database.logs``.
    """

    @staticmethod
    async def _collect_agent_ids(definition: Dict[str, Any]) -> List[Any]:
        """Validate and collect the agent IDs referenced by a definition's steps.

        Each distinct string ID is looked up only once, even when several
        steps reference the same agent.

        Args:
            definition: Workflow definition; its optional ``steps`` entry is a
                list of step dicts that may carry an ``agent_id`` key.

        Returns:
            The distinct agent IDs in a deterministic (sorted) order.

        Raises:
            ValueError: If an ID is malformed (raised by
                ``AgentService.get_agent``) or no such agent exists.
        """
        agent_ids = set()
        # ``or []`` also covers an explicit ``"steps": None``.
        for step in definition.get("steps") or []:
            if "agent_id" not in step:
                continue
            agent_id = step["agent_id"]
            # Only strings are checked against the set up front so that an
            # unhashable value still reaches get_agent and yields ValueError.
            if isinstance(agent_id, str) and agent_id in agent_ids:
                continue
            agent = await AgentService.get_agent(agent_id)
            if not agent:
                raise ValueError(f"Agent with ID {agent_id} does not exist")
            agent_ids.add(agent_id)
        # Sort so the stored list does not depend on set iteration order;
        # key=str keeps this safe if non-string IDs ever get through.
        return sorted(agent_ids, key=str)

    @staticmethod
    def _build_search_query(search: Optional[SearchParams]) -> Dict[str, Any]:
        """Translate optional search parameters into a MongoDB filter dict."""
        query: Dict[str, Any] = {}
        if not search:
            return query
        if search.query:
            # NOTE(review): ``$text`` presumably relies on a text index on the
            # collection - confirm it is created elsewhere.
            query["$text"] = {"$search": search.query}
        if search.tags:
            query["tags"] = {"$all": search.tags}
        if search.status:
            query["status"] = search.status

        # Either bound may be given on its own.
        created_range: Dict[str, Any] = {}
        if search.start_date:
            created_range["$gte"] = search.start_date
        if search.end_date:
            created_range["$lte"] = search.end_date
        if created_range:
            query["created_at"] = created_range
        return query

    @staticmethod
    async def create_workflow(workflow_data: WorkflowCreate, user_id: Optional[str] = None) -> Dict[str, Any]:
        """Insert a new workflow document and return it.

        Args:
            workflow_data: Creation payload; converted to a dict with ``.dict()``.
            user_id: Value stored under ``created_by``.

        Returns:
            The inserted document, including ``_id``, ``version`` (1) and the
            derived ``agent_ids`` list.

        Raises:
            ValueError: If a referenced agent is invalid or missing, or a
                workflow with the same name already exists.
        """
        workflow_dict = workflow_data.dict()
        now = datetime.utcnow()
        workflow_dict["created_by"] = user_id
        workflow_dict["created_at"] = now
        workflow_dict["updated_at"] = now
        workflow_dict["status"] = WorkflowStatus.CREATED
        workflow_dict["version"] = 1

        # Agents are validated before the name check, so a bad agent reference
        # is reported first.
        workflow_dict["agent_ids"] = await WorkflowService._collect_agent_ids(
            workflow_dict["definition"]
        )

        existing = await Database.workflows.find_one({"name": workflow_dict["name"]})
        if existing:
            raise ValueError(f"Workflow with name '{workflow_dict['name']}' already exists")

        result = await Database.workflows.insert_one(workflow_dict)
        workflow_dict["_id"] = result.inserted_id
        logger.info(
            "Created workflow %s with ID %s", workflow_dict["name"], result.inserted_id
        )
        return workflow_dict

    @staticmethod
    async def get_workflow(workflow_id: str) -> Optional[Dict[str, Any]]:
        """Fetch a single workflow by its ID.

        Returns:
            The matching document, or ``None`` when nothing matches.

        Raises:
            ValueError: If ``workflow_id`` is not a valid ``ObjectId``.
        """
        if not ObjectId.is_valid(workflow_id):
            raise ValueError(f"Invalid workflow ID: {workflow_id}")
        return await Database.workflows.find_one({"_id": ObjectId(workflow_id)})

    @staticmethod
    async def get_workflow_by_name(name: str) -> Optional[Dict[str, Any]]:
        """Fetch a single workflow by its name, or ``None`` when nothing matches."""
        return await Database.workflows.find_one({"name": name})

    @staticmethod
    async def update_workflow(workflow_id: str, workflow_data: WorkflowUpdate) -> Optional[Dict[str, Any]]:
        """Apply the explicitly-set fields of ``workflow_data`` to a workflow.

        When the definition is part of the update, ``version`` is incremented
        and ``agent_ids`` is recomputed from the new definition.

        Returns:
            The document as re-read after the update, or ``None`` when no
            workflow with that ID exists.

        Raises:
            ValueError: If ``workflow_id`` is not a valid ``ObjectId``, a
                referenced agent is invalid or missing, or the new name is
                already used by a different workflow.
        """
        if not ObjectId.is_valid(workflow_id):
            raise ValueError(f"Invalid workflow ID: {workflow_id}")
        oid = ObjectId(workflow_id)

        workflow = await Database.workflows.find_one({"_id": oid})
        if not workflow:
            return None

        # Only fields the caller actually supplied are written.
        update_data = workflow_data.dict(exclude_unset=True)
        update_data["updated_at"] = datetime.utcnow()

        if "definition" in update_data:
            # Documents without a stored version are treated as version 1.
            update_data["version"] = workflow.get("version", 1) + 1
            update_data["agent_ids"] = await WorkflowService._collect_agent_ids(
                update_data["definition"]
            )

        if "name" in update_data:
            # A rename must not collide with any *other* workflow.
            existing = await Database.workflows.find_one({
                "name": update_data["name"],
                "_id": {"$ne": oid},
            })
            if existing:
                raise ValueError(f"Workflow with name '{update_data['name']}' already exists")

        await Database.workflows.update_one({"_id": oid}, {"$set": update_data})
        updated_workflow = await Database.workflows.find_one({"_id": oid})
        logger.info("Updated workflow %s", workflow_id)
        return updated_workflow

    @staticmethod
    async def delete_workflow(workflow_id: str) -> bool:
        """Delete a workflow together with its executions and execution logs.

        Returns:
            ``True`` when the workflow document was deleted, ``False`` when no
            document matched.

        Raises:
            ValueError: If ``workflow_id`` is not a valid ``ObjectId`` or the
                workflow still has PENDING or RUNNING executions.
        """
        if not ObjectId.is_valid(workflow_id):
            raise ValueError(f"Invalid workflow ID: {workflow_id}")

        # Refuse to delete while executions are still pending or running.
        active_executions = await Database.executions.count_documents({
            "workflow_id": workflow_id,
            "status": {"$in": [
                ExecutionStatus.PENDING,
                ExecutionStatus.RUNNING,
            ]},
        })
        if active_executions > 0:
            raise ValueError(f"Cannot delete workflow {workflow_id} with {active_executions} active executions")

        result = await Database.workflows.delete_one({"_id": ObjectId(workflow_id)})
        if not result.deleted_count:
            return False

        # Clean up the records that referenced the removed workflow.
        await Database.logs.delete_many({"workflow_id": workflow_id})
        await Database.executions.delete_many({"workflow_id": workflow_id})
        logger.info("Deleted workflow %s", workflow_id)
        return True

    @staticmethod
    async def list_workflows(
        params: PaginationParams,
        search: Optional[SearchParams] = None
    ) -> Tuple[List[Dict[str, Any]], int]:
        """Return one page of workflows, newest first, plus the total match count.

        Args:
            params: Supplies ``skip`` and ``limit`` for the page.
            search: Optional filters (``query``, ``tags``, ``status``,
                ``start_date``, ``end_date``); falsy fields are ignored.

        Returns:
            ``(workflows, total)`` where ``total`` counts every document
            matching the filters, not just the returned page.
        """
        query = WorkflowService._build_search_query(search)
        total = await Database.workflows.count_documents(query)
        cursor = (
            Database.workflows.find(query)
            .sort("created_at", -1)
            .skip(params.skip)
            .limit(params.limit)
        )
        workflows = await cursor.to_list(length=params.limit)
        return workflows, total
# Workflow Execution Service
class WorkflowExecutionService:
    """Operations on workflow executions and their log entries.

    Every method is a static coroutine working against
    ``Database.executions`` and ``Database.logs``.
    """

    @staticmethod
    async def create_execution(execution_data: ExecutionCreate, user_id: Optional[str] = None) -> Dict[str, Any]:
        """Record a new PENDING execution for an existing workflow.

        Args:
            execution_data: Supplies ``workflow_id`` and ``input_params``.
            user_id: Value stored under ``created_by``.

        Returns:
            The inserted execution document, including its generated ``_id``.

        Raises:
            ValueError: If the workflow ID is malformed (raised by
                ``WorkflowService.get_workflow``) or no such workflow exists.
        """
        workflow = await WorkflowService.get_workflow(execution_data.workflow_id)
        if not workflow:
            raise ValueError(f"Workflow {execution_data.workflow_id} not found")

        # One timestamp for both fields so they can never disagree.
        now = datetime.utcnow()
        execution_dict = {
            "workflow_id": execution_data.workflow_id,
            "status": ExecutionStatus.PENDING,
            "start_time": now,
            "input_params": execution_data.input_params,
            "created_by": user_id,
            "created_at": now,
            "step_results": {},
            "current_step": None,
            "results": {},
            "error_message": None,
        }

        result = await Database.executions.insert_one(execution_dict)
        execution_dict["_id"] = result.inserted_id
        logger.info("Created workflow execution %s", result.inserted_id)

        # Optional: Queue execution for async processing
        # This would typically use a task queue like Celery
        # await execute_workflow_async.delay(execution_dict)
        return execution_dict

    @staticmethod
    async def get_execution(execution_id: str) -> Optional[Dict[str, Any]]:
        """Fetch a single execution by its ID.

        Returns:
            The matching document, or ``None`` when nothing matches.

        Raises:
            ValueError: If ``execution_id`` is not a valid ``ObjectId``.
        """
        if not ObjectId.is_valid(execution_id):
            raise ValueError(f"Invalid execution ID: {execution_id}")
        return await Database.executions.find_one({"_id": ObjectId(execution_id)})

    @staticmethod
    async def update_execution_status(execution_id: str, status: ExecutionStatus, error_message: Optional[str] = None) -> Optional[Dict[str, Any]]:
        """Set an execution's status and stamp the change time.

        ``end_time`` is also written when the new status is COMPLETED, FAILED
        or CANCELLED. ``error_message`` is only written when it is non-empty.

        Returns:
            Whatever ``find_one_and_update`` returns for the execution.
            NOTE(review): presumably the post-update document, or ``None``
            when no execution matches - confirm against the driver docs.

        Raises:
            ValueError: If ``execution_id`` is not a valid ``ObjectId``.
        """
        if not ObjectId.is_valid(execution_id):
            raise ValueError(f"Invalid execution ID: {execution_id}")

        # One timestamp so updated_at and end_time agree exactly.
        now = datetime.utcnow()
        update_data = {
            "status": status,
            "updated_at": now,
        }
        if status in (ExecutionStatus.COMPLETED, ExecutionStatus.FAILED, ExecutionStatus.CANCELLED):
            update_data["end_time"] = now
        if error_message:
            update_data["error_message"] = error_message

        return await Database.executions.find_one_and_update(
            {"_id": ObjectId(execution_id)},
            {"$set": update_data},
            # NOTE(review): True presumably maps to ReturnDocument.AFTER
            # (return the updated document) - confirm for the driver in use.
            return_document=True,
        )

    @staticmethod
    async def list_executions(
        workflow_id: Optional[str] = None,
        params: Optional[PaginationParams] = None,
        search: Optional[SearchParams] = None
    ) -> Tuple[List[Dict[str, Any]], int]:
        """Return one page of executions, newest first, plus the total match count.

        Args:
            workflow_id: When given, restricts results to that workflow.
            params: Supplies ``skip`` and ``limit``. When omitted, a fresh
                ``PaginationParams()`` is created for this call instead of
                sharing one instance built at definition time.
            search: Optional filters; only ``status``, ``start_date`` and
                ``end_date`` are used here.

        Returns:
            ``(executions, total)`` where ``total`` counts every document
            matching the filters, not just the returned page.
        """
        if params is None:
            params = PaginationParams()

        query: Dict[str, Any] = {}
        if workflow_id:
            query["workflow_id"] = workflow_id
        if search:
            if search.status:
                query["status"] = search.status
            # Either bound may be given on its own.
            created_range: Dict[str, Any] = {}
            if search.start_date:
                created_range["$gte"] = search.start_date
            if search.end_date:
                created_range["$lte"] = search.end_date
            if created_range:
                query["created_at"] = created_range

        total = await Database.executions.count_documents(query)
        cursor = (
            Database.executions.find(query)
            .sort("created_at", -1)
            .skip(params.skip)
            .limit(params.limit)
        )
        executions = await cursor.to_list(length=params.limit)
        return executions, total

    @staticmethod
    async def log_execution_event(
        workflow_id: str,
        execution_id: str,
        message: str,
        step_id: Optional[str] = None,
        agent_id: Optional[str] = None,
        level: str = "INFO",
        details: Optional[Dict[str, Any]] = None
    ) -> Dict[str, Any]:
        """Store one log entry for a workflow execution and return it.

        Args:
            workflow_id: Workflow the event belongs to.
            execution_id: Execution the event belongs to.
            message: Log message text.
            step_id: Optional step the event relates to.
            agent_id: Optional agent the event relates to.
            level: Log level label stored with the entry.
            details: Optional extra data; stored as ``{}`` when omitted.

        Returns:
            The inserted log document, including its generated ``_id``.
        """
        log_entry = {
            "workflow_id": workflow_id,
            "execution_id": execution_id,
            "step_id": step_id,
            "agent_id": agent_id,
            "timestamp": datetime.utcnow(),
            "level": level,
            "message": message,
            "details": details or {},
        }
        result = await Database.logs.insert_one(log_entry)
        log_entry["_id"] = result.inserted_id
        return log_entry

    @staticmethod
    async def get_execution_logs(
        execution_id: str,
        params: Optional[PaginationParams] = None
    ) -> Tuple[List[Dict[str, Any]], int]:
        """Return one page of an execution's logs, oldest first, plus the total count.

        Args:
            execution_id: Execution whose log entries are requested.
            params: Supplies ``skip`` and ``limit``. When omitted, a fresh
                ``PaginationParams()`` is created for this call.

        Returns:
            ``(logs, total)`` where ``total`` counts every log entry stored
            for the execution, not just the returned page.
        """
        if params is None:
            params = PaginationParams()

        query = {"execution_id": execution_id}
        total = await Database.logs.count_documents(query)
        cursor = (
            Database.logs.find(query)
            .sort("timestamp", 1)
            .skip(params.skip)
            .limit(params.limit)
        )
        logs = await cursor.to_list(length=params.limit)
        return logs, total