|
|
""" |
|
|
Database connection and models for EvoAgentX. |
|
|
""" |
|
|
|
|
|
import logging |
|
|
from datetime import datetime |
|
|
from enum import Enum |
|
|
from typing import Optional, List, Dict, Any |
|
|
from motor.motor_asyncio import AsyncIOMotorClient |
|
|
from pymongo import ASCENDING, TEXT |
|
|
from pydantic_core import core_schema |
|
|
from bson import ObjectId |
|
|
from pydantic import GetCoreSchemaHandler |
|
|
from pydantic import Field, BaseModel |
|
|
from evoagentx.app.config import settings |
|
|
|
|
|
|
|
|
logger = logging.getLogger(__name__) |
|
|
|
|
|
|
|
|
class PyObjectId(ObjectId): |
|
|
@classmethod |
|
|
def __get_pydantic_core_schema__(cls, source_type, handler: GetCoreSchemaHandler): |
|
|
return core_schema.no_info_after_validator_function(cls.validate, core_schema.str_schema()) |
|
|
|
|
|
@classmethod |
|
|
def validate(cls, v): |
|
|
if not ObjectId.is_valid(v): |
|
|
raise ValueError("Invalid ObjectId") |
|
|
return ObjectId(v) |
|
|
|
|
|
|
|
|
class MongoBaseModel(BaseModel): |
|
|
id: Optional[PyObjectId] = Field(alias="_id", default=None) |
|
|
|
|
|
model_config = { |
|
|
"protected_namespaces": (), |
|
|
"populate_by_name": True, |
|
|
"arbitrary_types_allowed": True, |
|
|
"json_encoders": { |
|
|
ObjectId: str |
|
|
} |
|
|
} |
|
|
|
|
|
|
|
|
class AgentStatus(str, Enum): |
|
|
CREATED = "created" |
|
|
ACTIVE = "active" |
|
|
INACTIVE = "inactive" |
|
|
ERROR = "error" |
|
|
|
|
|
class WorkflowStatus(str, Enum): |
|
|
CREATED = "created" |
|
|
RUNNING = "running" |
|
|
COMPLETED = "completed" |
|
|
FAILED = "failed" |
|
|
CANCELLED = "cancelled" |
|
|
|
|
|
class ExecutionStatus(str, Enum): |
|
|
PENDING = "pending" |
|
|
RUNNING = "running" |
|
|
COMPLETED = "completed" |
|
|
FAILED = "failed" |
|
|
TIMEOUT = "timeout" |
|
|
CANCELLED = "cancelled" |
|
|
|
|
|
|
|
|
class Agent(MongoBaseModel): |
|
|
id: str = Field(..., alias="_id") |
|
|
name: str |
|
|
description: Optional[str] = None |
|
|
config: Dict[str, Any] |
|
|
state: Dict[str, Any] = Field(default_factory=dict) |
|
|
runtime_params: Dict[str, Any] = Field(default_factory=dict) |
|
|
status: AgentStatus = AgentStatus.CREATED |
|
|
created_at: datetime = Field(default_factory=datetime.utcnow) |
|
|
updated_at: datetime = Field(default_factory=datetime.utcnow) |
|
|
created_by: Optional[str] = None |
|
|
tags: List[str] = Field(default_factory=list) |
|
|
|
|
|
class Workflow(MongoBaseModel): |
|
|
id: str = Field(..., alias="_id") |
|
|
name: str |
|
|
description: Optional[str] = None |
|
|
definition: Dict[str, Any] |
|
|
agent_ids: List[str] = Field(default_factory=list) |
|
|
status: WorkflowStatus = WorkflowStatus.CREATED |
|
|
created_at: datetime = Field(default_factory=datetime.utcnow) |
|
|
updated_at: datetime = Field(default_factory=datetime.utcnow) |
|
|
created_by: Optional[str] = None |
|
|
tags: List[str] = Field(default_factory=list) |
|
|
version: int = 1 |
|
|
|
|
|
class ExecutionLog(MongoBaseModel): |
|
|
workflow_id: str |
|
|
execution_id: str |
|
|
step_id: Optional[str] = None |
|
|
agent_id: Optional[str] = None |
|
|
timestamp: datetime = Field(default_factory=datetime.utcnow) |
|
|
level: str = "INFO" |
|
|
message: str |
|
|
details: Dict[str, Any] = Field(default_factory=dict) |
|
|
|
|
|
class WorkflowExecution(MongoBaseModel): |
|
|
workflow_id: str |
|
|
status: ExecutionStatus = ExecutionStatus.PENDING |
|
|
start_time: Optional[datetime] = None |
|
|
end_time: Optional[datetime] = None |
|
|
input_params: Dict[str, Any] = Field(default_factory=dict) |
|
|
results: Dict[str, Any] = Field(default_factory=dict) |
|
|
created_by: Optional[str] = None |
|
|
step_results: Dict[str, Dict[str, Any]] = Field(default_factory=dict) |
|
|
current_step: Optional[str] = None |
|
|
error_message: Optional[str] = None |
|
|
created_at: datetime = Field(default_factory=datetime.utcnow) |
|
|
|
|
|
|
|
|
class Database: |
|
|
client: AsyncIOMotorClient = None |
|
|
db = None |
|
|
|
|
|
|
|
|
agents = None |
|
|
workflows = None |
|
|
executions = None |
|
|
logs = None |
|
|
|
|
|
@classmethod |
|
|
async def connect(cls): |
|
|
"""Connect to MongoDB""" |
|
|
logger.info(f"Connecting to MongoDB at {settings.MONGODB_URL}...") |
|
|
cls.client = AsyncIOMotorClient(settings.MONGODB_URL) |
|
|
cls.db = cls.client[settings.MONGODB_DB_NAME] |
|
|
|
|
|
|
|
|
cls.agents = cls.db.agents |
|
|
cls.workflows = cls.db.workflows |
|
|
cls.executions = cls.db.workflow_executions |
|
|
cls.logs = cls.db.execution_logs |
|
|
|
|
|
|
|
|
await cls._create_indexes() |
|
|
|
|
|
logger.info("Connected to MongoDB successfully") |
|
|
|
|
|
@classmethod |
|
|
async def disconnect(cls): |
|
|
"""Disconnect from MongoDB""" |
|
|
if cls.client: |
|
|
cls.client.close() |
|
|
logger.info("Disconnected from MongoDB") |
|
|
|
|
|
@classmethod |
|
|
async def _create_indexes(cls): |
|
|
"""Create indexes for collections""" |
|
|
|
|
|
await cls.agents.create_index([("name", ASCENDING)], unique=True) |
|
|
await cls.agents.create_index([("name", TEXT), ("description", TEXT)]) |
|
|
await cls.agents.create_index([("created_at", ASCENDING)]) |
|
|
await cls.agents.create_index([("tags", ASCENDING)]) |
|
|
|
|
|
|
|
|
await cls.workflows.create_index([("name", ASCENDING)]) |
|
|
await cls.workflows.create_index([("name", TEXT), ("description", TEXT)]) |
|
|
await cls.workflows.create_index([("created_at", ASCENDING)]) |
|
|
await cls.workflows.create_index([("agent_ids", ASCENDING)]) |
|
|
await cls.workflows.create_index([("tags", ASCENDING)]) |
|
|
|
|
|
|
|
|
await cls.executions.create_index([("workflow_id", ASCENDING)]) |
|
|
await cls.executions.create_index([("created_at", ASCENDING)]) |
|
|
await cls.executions.create_index([("status", ASCENDING)]) |
|
|
|
|
|
|
|
|
await cls.logs.create_index([("execution_id", ASCENDING)]) |
|
|
await cls.logs.create_index([("timestamp", ASCENDING)]) |
|
|
await cls.logs.create_index([("workflow_id", ASCENDING), ("execution_id", ASCENDING)]) |