Spaces:
Running
Running
File size: 5,689 Bytes
87444a0 |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 |
from typing import TypedDict, Literal, List, Optional, Dict, Any
from datetime import datetime
from pydantic import BaseModel, Field
from enum import Enum
class MessageRole(str, Enum):
    """Role of the party that authored a chat message.

    The ``str`` mix-in makes every member a string equal to its value,
    so members can be compared against plain strings directly.
    """

    SYSTEM = "system"
    USER = "user"
    ASSISTANT = "assistant"
    AGENT = "agent"
class AgentType(str, Enum):
    """Kinds of agent that can take part in a conversation.

    The ``str`` mix-in makes every member a string equal to its value.
    """

    SUPERVISOR = "supervisor"
    CRYPTO_DATA = "crypto_data"
    GENERAL = "general"
    RESEARCH = "research"
    ANALYSIS = "analysis"
class MessageStatus(str, Enum):
    """Processing status of a message.

    The ``str`` mix-in makes every member a string equal to its value.
    """

    PENDING = "pending"
    PROCESSING = "processing"
    COMPLETED = "completed"
    FAILED = "failed"
    CANCELLED = "cancelled"
class ChatMessage(BaseModel):
    """Enhanced chat message model for multi-agent conversations.

    Only ``role`` and ``content`` are required; every other field has a
    default (``None``, ``False``, an empty container, the creation time,
    or ``MessageStatus.COMPLETED``).
    """

    # Core message fields
    role: MessageRole = Field(..., description="Role of the message sender")
    content: str = Field(..., description="Message content")

    # Agent-specific fields
    agent_name: Optional[str] = Field(
        None, description="Name of the agent that processed this message"
    )
    agent_type: Optional[AgentType] = Field(
        None, description="Type of agent that processed this message"
    )
    # The description used to duplicate the one on ``requires_followup``.
    requires_action: bool = Field(
        default=False, description="Whether this message requires an action"
    )
    action_type: Optional[str] = Field(
        None, description="Type of action required"
    )

    # Metadata and context
    metadata: Dict[str, Any] = Field(
        default_factory=dict, description="Additional metadata"
    )
    # NOTE(review): datetime.utcnow returns a naive datetime and is
    # deprecated since Python 3.12; kept as-is to preserve behaviour.
    timestamp: datetime = Field(
        default_factory=datetime.utcnow, description="Message timestamp"
    )
    message_id: Optional[str] = Field(
        None, description="Unique message identifier"
    )

    # Processing status
    status: MessageStatus = Field(
        default=MessageStatus.COMPLETED,
        description="Message processing status",
    )
    error_message: Optional[str] = Field(
        None, description="Error message if processing failed"
    )

    # Conversation context
    conversation_id: Optional[str] = Field(
        None, description="Conversation identifier"
    )
    user_id: Optional[str] = Field(None, description="User identifier")

    # Tool calls and responses
    tool_calls: Optional[List[Dict[str, Any]]] = Field(
        None, description="Tool calls made by the agent"
    )
    tool_results: Optional[List[Dict[str, Any]]] = Field(
        None, description="Results from tool executions"
    )

    # Multi-turn conversation support
    next_agent: Optional[str] = Field(
        None, description="Next agent to handle the conversation"
    )
    requires_followup: bool = Field(
        default=False, description="Whether this message requires followup"
    )

    class Config:
        # Pydantic model configuration (see the Pydantic docs for the
        # exact semantics of ``use_enum_values``).
        use_enum_values = True
        # Encode datetime values as ISO 8601 strings in JSON output.
        json_encoders = {
            datetime: lambda v: v.isoformat()
        }
class ConversationState(BaseModel):
    """State management for multi-agent conversations.

    ``conversation_id`` and ``user_id`` are required; the remaining
    fields default to ``None``, empty containers, the creation time, or
    ``True`` (for ``is_active``).
    """

    conversation_id: str = Field(
        ..., description="Unique conversation identifier"
    )
    user_id: str = Field(..., description="User identifier")

    # Current state
    current_agent: Optional[str] = Field(
        None, description="Currently active agent"
    )
    last_message_id: Optional[str] = Field(
        None, description="ID of the last message"
    )

    # Conversation history
    messages: List[ChatMessage] = Field(
        default_factory=list, description="Message history"
    )

    # Context and memory
    context: Dict[str, Any] = Field(
        default_factory=dict, description="Conversation context"
    )
    memory: Dict[str, Any] = Field(
        default_factory=dict, description="Persistent memory across turns"
    )

    # Agent routing history
    agent_history: List[Dict[str, Any]] = Field(
        default_factory=list, description="History of agent interactions"
    )

    # Status and metadata
    # NOTE(review): datetime.utcnow returns a naive datetime and is
    # deprecated since Python 3.12; kept as-is to preserve behaviour.
    # Both timestamps are set independently at construction; nothing in
    # this class refreshes ``updated_at`` afterwards.
    created_at: datetime = Field(default_factory=datetime.utcnow)
    updated_at: datetime = Field(default_factory=datetime.utcnow)
    is_active: bool = Field(
        default=True, description="Whether conversation is active"
    )

    class Config:
        # Pydantic model configuration (see the Pydantic docs for the
        # exact semantics of ``use_enum_values``).
        use_enum_values = True
        # Encode datetime values as ISO 8601 strings in JSON output.
        json_encoders = {
            datetime: lambda v: v.isoformat()
        }
class AgentResponse(BaseModel):
    """Standardized response format for agents.

    ``content``, ``agent_name`` and ``agent_type`` are required; the
    remaining fields default to ``None``, empty containers, the creation
    time, ``False`` (``requires_followup``) or ``True`` (``success``).
    """

    content: str = Field(..., description="Response content")
    agent_name: str = Field(..., description="Name of the responding agent")
    agent_type: AgentType = Field(
        ..., description="Type of the responding agent"
    )

    # Metadata
    metadata: Dict[str, Any] = Field(
        default_factory=dict, description="Response metadata"
    )
    # NOTE(review): datetime.utcnow returns a naive datetime and is
    # deprecated since Python 3.12; kept as-is to preserve behaviour.
    timestamp: datetime = Field(default_factory=datetime.utcnow)

    # Tool information
    tools_used: List[str] = Field(
        default_factory=list, description="Tools used in this response"
    )
    tool_results: Optional[List[Dict[str, Any]]] = Field(
        None, description="Results from tool executions"
    )

    # Next steps
    next_agent: Optional[str] = Field(
        None, description="Next agent to handle the conversation"
    )
    requires_followup: bool = Field(
        default=False, description="Whether followup is needed"
    )

    # Status
    success: bool = Field(
        default=True, description="Whether the response was successful"
    )
    error_message: Optional[str] = Field(
        None, description="Error message if failed"
    )

    class Config:
        # Pydantic model configuration (see the Pydantic docs for the
        # exact semantics of ``use_enum_values``).
        use_enum_values = True
        # Encode datetime values as ISO 8601 strings in JSON output.
        json_encoders = {
            datetime: lambda v: v.isoformat()
        }
# TypedDict for backward compatibility
class ChatMessageDict(TypedDict):
    """Plain-dict shape of a chat message.

    All nine keys are required; ``Optional`` only means the value may be
    ``None``. ``role``, ``status`` and ``timestamp`` are plain strings
    here rather than enum or datetime objects.
    """

    role: str
    content: str
    agent_name: Optional[str]
    metadata: Dict[str, Any]
    timestamp: str
    message_id: Optional[str]
    status: str
    conversation_id: Optional[str]
    user_id: Optional[str]
|