gMAS / web_ui /backend /models /execution.py
Артём Боярских
fix: fixed some bugs
81f5c1c
"""Pydantic schemas for execution API requests and responses."""
from enum import Enum
from typing import Any
from pydantic import BaseModel, Field
from .graph import GraphSaveRequest
class BudgetConfigSchema(BaseModel):
"""Budget configuration for execution."""
total_token_limit: int | None = None
total_request_limit: int | None = None
max_prompt_length: int = 4000
max_completion_length: int = 2000
timeout_seconds: float | None = None
# ---------------------------------------------------------------------------
# Early-stop conditions (declarative schema sent from the UI)
# ---------------------------------------------------------------------------
class EarlyStopType(str, Enum):
KEYWORD = "keyword"
TOKEN_LIMIT = "token_limit"
AGENT_COUNT = "agent_count"
class EarlyStopConditionSchema(BaseModel):
"""One early-stop rule configured in the UI."""
type: EarlyStopType
keyword: str | None = None # for KEYWORD
max_tokens: int | None = None # for TOKEN_LIMIT
max_agents: int | None = None # for AGENT_COUNT
# ---------------------------------------------------------------------------
# Topology hooks (predefined hook presets selectable from the UI)
# ---------------------------------------------------------------------------
class TopologyHookType(str, Enum):
STOP_ON_KEYWORD = "stop_on_keyword"
SKIP_ON_TOKEN_BUDGET = "skip_on_token_budget"
FORCE_REVIEWER_ON_ERROR = "force_reviewer_on_error"
INSERT_CHAIN_ON_KEYWORD = "insert_chain_on_keyword"
ADD_EDGE_ON_KEYWORD = "add_edge_on_keyword"
REDIRECT_END_ON_KEYWORD = "redirect_end_on_keyword"
SKIP_AGENT_ON_KEYWORD = "skip_agent_on_keyword"
class TopologyHookSchema(BaseModel):
"""A predefined topology-hook preset."""
type: TopologyHookType
keyword: str | None = None # for keyword-based hooks
token_threshold: int | None = None # for SKIP_ON_TOKEN_BUDGET
reviewer_agent_id: str | None = None # for FORCE_REVIEWER_ON_ERROR
source_agent: str | None = None # for edge/chain hooks
target_agent: str | None = None # for edge/chain/redirect hooks
weight: float = 1.0 # for ADD_EDGE_ON_KEYWORD
class RunnerConfigSchema(BaseModel):
"""Runner configuration mirroring gMAS RunnerConfig."""
timeout: float = 60.0
adaptive: bool = False
enable_parallel: bool = True
max_parallel_size: int = 5
max_retries: int = 2
enable_memory: bool = False
memory_context_limit: int = 5
broadcast_task_to_all: bool = True
enable_dynamic_topology: bool = False
max_tool_iterations: int = 3
budget_config: BudgetConfigSchema | None = None
early_stop_conditions: list[EarlyStopConditionSchema] = Field(default_factory=list)
topology_hooks: list[TopologyHookSchema] = Field(default_factory=list)
class LLMProviderConfig(BaseModel):
"""LLM provider configuration."""
provider_id: str
name: str
base_url: str
api_key: str # Can use $ENV_VAR format
default_model: str | None = None
class ExecutionRequest(BaseModel):
"""Request to start a workflow execution."""
graph_id: str | None = None
graph: GraphSaveRequest | None = None
task_query: str
config: RunnerConfigSchema | None = None
llm_provider: LLMProviderConfig | None = None
class StreamEventResponse(BaseModel):
"""A streaming event sent over WebSocket."""
event_type: str
timestamp: str
run_id: str | None = None
data: dict[str, Any] = Field(default_factory=dict)
class AgentStatusEntry(BaseModel):
"""Status of a single agent during execution."""
agent_id: str
status: str # "pending", "running", "completed", "error"
response: str | None = None
tokens_used: int = 0
duration_ms: float = 0.0
error: str | None = None
class ExecutionStatusResponse(BaseModel):
"""Response for execution status query."""
run_id: str
status: str # "running", "completed", "error", "cancelled"
agent_statuses: list[AgentStatusEntry] = Field(default_factory=list)
events: list[StreamEventResponse] = Field(default_factory=list)
result: dict[str, Any] | None = None
started_at: str = ""
completed_at: str | None = None
class RunHistoryEntry(BaseModel):
"""Summary entry for run history."""
run_id: str
graph_id: str | None = None
graph_name: str = ""
task_query: str = ""
status: str
total_tokens: int = 0
total_time: float = 0.0
agent_count: int = 0
started_at: str = ""
completed_at: str | None = None