File size: 3,664 Bytes
77320e4 | 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 | from __future__ import annotations
import abc
from dataclasses import dataclass, field
from enum import Enum
from typing import List, NamedTuple, Optional, Union
from pydantic import BaseModel
from ..schemas.sandbox_models import *
@dataclass
class BaseAgentResponse:
"""Base Agent step result, contains formatted output string."""
formatted_output: str
raw_output: str
@dataclass
class AgentAction(BaseAgentResponse):
"""
Agent's action to take.
"""
tool: str
tool_input: Union[str, dict]
@dataclass
class AgentObservation(BaseAgentResponse):
"""
Agent's action to take.
"""
tool: str
@dataclass
class AgentFinish(BaseAgentResponse):
"""Agent's return value when finishing execution."""
pass
class AgentType(Enum):
"""
Enumerated type for agent types.
"""
openai = "openai"
react = "react"
rewoo = "rewoo"
vanilla = "vanilla"
openai_memory = "openai_memory"
@staticmethod
def get_agent_class(_type: AgentType):
"""
Get agent class from agent type.
:param _type: agent type
:return: agent class
"""
if _type == AgentType.react:
from ..agent.react import ReactAgent
return ReactAgent
else:
raise ValueError(f"Unknown agent type: {_type}")
class AgentOutput(BaseModel):
"""
Pydantic model for agent output.
"""
output: str
cost: float
token_usage: int
@dataclass
class AgentRequest:
sandbox_id: Optional[str] = None
messages: List[Message] = field(default_factory=list)
input_files: List[MediaFile] = field(default_factory=list)
sandbox_status: Optional[SandboxStatus] = None
is_cn: bool = False
@dataclass
class AgentResponse:
output_text: str
raw_output_text: str
output_files: List[MediaFile] = field(default_factory=list)
sandbox_id: Optional[str] = None
sandbox_status: Optional[SandboxStatus] = None
turn_level_prompt: Optional[List[str]] = None
turn_level_response: Optional[List[str]] = None
class RoleType(Enum):
User = 0
System = 1
Agent = 2
@classmethod
def _missing_(cls, name):
# If the input is a string, perform case-insensitive matching
if isinstance(name, str):
for member in cls:
if member.name.lower() == name.lower():
return member
return super()._missing_(name)
@dataclass
class Message(abc.ABC):
role: RoleType
content: str
raw_content: str = ""
@staticmethod
def parse_from_dict(data):
data['role'] = RoleType(data['role'])
# Add a check for raw_content in legacy data
if 'raw_content' not in data:
data['raw_content'] = ""
return Message(**data)
def to_dict(self):
role_value = self.role.value if isinstance(self.role, RoleType) else self.role
return {
"role": role_value,
"content": self.content, # Fixed the missing comma here
"raw_content": self.raw_content
}
@dataclass
class MediaFile:
file_name: Optional[str] = None
file_content: Optional[bytes] = None
tos_path: Optional[str] = None
sandbox_path: Optional[str] = None
def __dict__(self):
return {
'file_name': self.file_name if self.file_name is not None else "",
'file_content': self.file_content if self.file_content is not None else "",
'tos_path': self.tos_path if self.tos_path is not None else "",
'sandbox_path': self.sandbox_path if self.sandbox_path is not None else "",
}
|