Spaces:
Sleeping
Sleeping
| """ | |
| Execution state and result models | |
| """ | |
| from pydantic import BaseModel, Field | |
| from typing import Optional, Any | |
| from enum import Enum | |
| from datetime import datetime | |
| import uuid | |
class ExecutionStatus(str, Enum):
    """Lifecycle states a workflow execution can be in.

    Members are also ``str`` instances, so each one compares equal to its
    lowercase string value (e.g. ``ExecutionStatus.PENDING == "pending"``).
    """

    PENDING = "pending"
    RUNNING = "running"
    COMPLETED = "completed"
    FAILED = "failed"
    CANCELLED = "cancelled"
class NodeResult(BaseModel):
    """Outcome record for the execution of one workflow node.

    Only ``node_id`` and ``status`` are required; every other field
    defaults to ``None``.
    """

    # Identifier of the node this result belongs to.
    node_id: str
    # Execution status recorded for the node.
    status: ExecutionStatus
    # Value produced by the node, if any (unconstrained type).
    output: Optional[Any] = None
    # Error text, if any.
    error: Optional[str] = None
    # Start / end timestamps stored as strings.
    # NOTE(review): presumably ISO-8601 -- confirm against the code that
    # populates these fields.
    started_at: Optional[str] = None
    completed_at: Optional[str] = None
    # Duration of the node run in milliseconds (per the field name).
    duration_ms: Optional[float] = None
class ExecutionState(BaseModel):
    """Snapshot of a workflow execution's current state.

    Only ``workflow_id`` must be supplied; all other fields have defaults.
    """

    # Defaults to a freshly generated random UUID (version 4) as a string.
    execution_id: str = Field(default_factory=lambda: str(uuid.uuid4()))
    # Identifier of the workflow being executed.
    workflow_id: str
    # A new state starts out as PENDING.
    status: ExecutionStatus = ExecutionStatus.PENDING
    # Node currently being processed, if any.
    current_node: Optional[str] = None
    # Per-node results keyed by string.
    # NOTE(review): key is presumably the node id -- confirm with the caller.
    node_results: dict[str, NodeResult] = Field(default_factory=dict)
    # Shared state
    variables: dict[str, Any] = Field(default_factory=dict)
    # Start / end timestamps stored as strings; unset until populated.
    started_at: Optional[str] = None
    completed_at: Optional[str] = None
class ExecutionResult(BaseModel):
    """Final outcome of a workflow execution.

    ``execution_id``, ``workflow_id``, ``status``, ``node_results``,
    ``started_at`` and ``completed_at`` are required; the remaining fields
    default to ``None``.
    """

    # Identifiers of the execution and of the workflow it ran.
    execution_id: str
    workflow_id: str
    # Overall status of the execution.
    status: ExecutionStatus
    # Node results as a list (ExecutionState stores them as a dict instead).
    node_results: list[NodeResult]
    # Final output value of the workflow, if any (unconstrained type).
    final_output: Optional[Any] = None
    # Error text, if any.
    error: Optional[str] = None
    # Total duration in milliseconds (per the field name).
    total_duration_ms: Optional[float] = None
    # Start / end timestamps stored as strings; both are mandatory here.
    started_at: str
    completed_at: str
class GuardianAlert(BaseModel):
    """Alert raised by the Guardian system.

    ``level`` and ``message`` are required. ``id`` and ``timestamp`` are
    generated automatically when not supplied.
    """

    # Defaults to a freshly generated random UUID (version 4) as a string.
    id: str = Field(default_factory=lambda: str(uuid.uuid4()))
    # info, warning, error
    # NOTE(review): typed as a plain ``str``, so these values are not
    # enforced by the annotation.
    level: str
    # Alert text.
    message: str
    # Node the alert refers to, if any.
    node_id: Optional[str] = None
    # Optional suggestion text attached to the alert.
    suggestion: Optional[str] = None
    # Defaults to the creation time from ``datetime.now()`` (naive local
    # time, no UTC offset) rendered via ``isoformat()``.
    timestamp: str = Field(default_factory=lambda: datetime.now().isoformat())