# File size: 10,424 Bytes
# 9419f40
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
from abc import ABC, abstractmethod
from dataclasses import dataclass, field
from typing import Dict, List, Any
import json
import logging

from memory import BaseMemory
from base_tool import BaseTool 
from llm_provider import LLMProvider, LLMResponse

# Module-level logger used by the agent classes below; it is registered under
# the fixed name "AgentFramework" rather than the module's own name.
logger = logging.getLogger("AgentFramework")

@dataclass
class AgentResponse:
    """
    Value object returned by an agent's `process_query`.

    It carries the reply text plus an optional dictionary of extra
    information about that reply.
    """
    # The reply text produced by the agent.
    content: str
    # Free-form extras; every instance gets its own fresh dict by default.
    metadata: Dict[str, Any] = field(default_factory=dict)

class BaseAgent(ABC):
    """
    Abstract root of the agent hierarchy.

    Receives a list of 'BaseTool' objects and derives two views from it:
    a name -> callable lookup used when a tool has to be executed, and a
    list of schemas that is handed to the LLM provider.
    """
    def __init__(self, name: str, tools: List[BaseTool], system_prompt: str = "You are a helpful assistant."):
        self.system_prompt = system_prompt
        self.name = name

        # Execution lookup: tool name -> that tool's `run` callable.
        # A later tool with the same name replaces an earlier one.
        registry = {}
        for tool in tools:
            registry[tool.name] = tool.run
        self.tool_registry = registry

        # Schemas advertised to the LLM, in the same order as `tools`.
        schemas = []
        for tool in tools:
            schemas.append(tool.get_schema())
        self.tool_definitions = schemas

    @abstractmethod
    def process_query(self, user_query: str, provider: LLMProvider) -> AgentResponse:
        """Answer `user_query` with the help of `provider`; subclasses must implement this."""
        ...


class SingleAgent(BaseAgent):
    """
    A standard worker agent that uses the provided BaseTools to answer queries.

    The agent runs a bounded request/act loop: it asks the provider for a
    response, executes the requested tool (if any), feeds the tool output
    back into the conversation, and stops as soon as the provider returns
    plain content.
    """

    def __init__(
        self,
        name: str,
        tools: List[BaseTool],
        system_prompt: str = "You are a helpful assistant.",
        max_turns: int = 5,
    ):
        """
        Args:
            name: Label used in log messages.
            tools: Tool objects; each must expose `name`, `run` and `get_schema()`.
            system_prompt: Text sent as the system message of every query.
            max_turns: Upper bound on provider round-trips per query
                (defaults to 5, the previously hard-coded limit).
        """
        # Pass the tool objects directly to the parent
        super().__init__(name, tools, system_prompt)
        self.max_turns = max_turns

    def _execute_tool(self, tool_name: str, tool_args: Dict[str, Any]) -> str:
        """Run one registered tool and return its output (or an error text) as a string."""
        tool_func = self.tool_registry.get(tool_name)
        if tool_func is None:
            return f"❌ Unknown tool '{tool_name}'"

        try:
            raw_result = tool_func(**tool_args)
            result_str = raw_result if isinstance(raw_result, str) else json.dumps(raw_result)
        except Exception as e:
            # Deliberately broad: any tool failure (including a result that is
            # not JSON-serialisable) is reported back to the model so it can
            # recover, instead of aborting the whole query.
            error_msg = f"Tool Execution Failed: {str(e)}"
            logger.error(error_msg)
            return error_msg

        logger.info(f"Tool Output: {result_str}")
        return result_str

    def process_query(self, user_query: str, provider: LLMProvider) -> AgentResponse:
        """
        Answer `user_query`, calling tools as requested by the provider.

        Args:
            user_query: The question or instruction for this agent.
            provider: Object whose `get_response(messages, tool_definitions)`
                returns a response exposing `tool_call` and `content`.

        Returns:
            An AgentResponse holding the final answer, or a timeout response
            (`metadata == {"error": "Timeout"}`) when no answer was produced
            within `max_turns` provider calls.
        """
        messages = [
            {"role": "system", "content": self.system_prompt},
            {"role": "user", "content": user_query}
        ]

        logger.info(f"\nπŸš€ [{self.name}] Starting Loop...")

        for turn in range(self.max_turns):
            logger.info(f"--- Turn {turn + 1} ---")

            # 1. Ask the Provider (Using the internally built definitions)
            response: LLMResponse = provider.get_response(messages, self.tool_definitions)

            # 2. Handle Tool Calls
            if response.tool_call:
                tool_name = response.tool_call["name"]
                # A call without arguments may carry a missing/None payload.
                tool_args = response.tool_call.get("args") or {}
                tool_id = response.tool_call.get("id", "call_default")

                logger.info(f"πŸ€– Agent Intent: Call `{tool_name}` with {tool_args}")

                # Always record the assistant's tool call first: a "tool"
                # message is only valid as the reply to a preceding assistant
                # message carrying the matching `tool_calls` entry, and that
                # holds for unknown tools as well.
                messages.append({
                    "role": "assistant",
                    "content": None,
                    "tool_calls": [{
                        "id": tool_id,
                        "type": "function",
                        "function": {"name": tool_name, "arguments": json.dumps(tool_args)}
                    }]
                })
                messages.append({
                    "role": "tool",
                    "tool_call_id": tool_id,
                    "name": tool_name,
                    "content": self._execute_tool(tool_name, tool_args)
                })
                continue

            # 3. Handle Final Answer
            if response.content:
                logger.info(f"[{self.name}] Final Answer: {response.content}")
                return AgentResponse(content=response.content, metadata={"final_answer": response.content})

            # Neither a tool call nor content: the turn is spent and the
            # provider is asked again with the unchanged conversation.
            logger.warning(f"[{self.name}] Empty response on turn {turn + 1}; retrying.")

        return AgentResponse(content="Agent timed out.", metadata={"error": "Timeout"})

class ManagerAgent(BaseAgent):
    """
    The Brain.
    It treats its sub-agents as 'Tools' and dynamically decides which one to call.
    Now equipped with Short-Term Memory!

    Every sub-agent is advertised to the provider as a function named
    `delegate_to_<agent_name>` that takes a single `query` string.
    """

    # Prefix that turns a sub-agent name into its virtual tool name.
    _DELEGATION_PREFIX = "delegate_to_"

    def __init__(
        self,
        name: str,
        sub_agents: Dict[str, SingleAgent],
        memory: BaseMemory,
        system_prompt: str = "You are a manager.",
        max_turns: int = 5,
    ):
        """
        Args:
            name: Label used in log messages.
            sub_agents: Mapping of agent name -> worker agent.
            memory: Conversation store; must provide `add_message(role=, content=)`
                and `get_history()`.
            system_prompt: Base system prompt; team instructions are appended to it.
            max_turns: Upper bound on provider round-trips per query
                (defaults to 5, the previously hard-coded limit).
        """
        super().__init__(name, tools=[], system_prompt=system_prompt)
        self.sub_agents = sub_agents
        self.memory = memory
        self.max_turns = max_turns
        self.delegation_definitions = self._build_delegation_definitions()

    def _build_delegation_definitions(self) -> List[Dict]:
        """
        Dynamically creates OpenAI-compatible function schemas for each sub-agent.

        The capability text comes from the agent's optional `description`
        attribute; agents without one are described as "A helper agent.".
        """
        definitions = []
        for agent_name, agent in self.sub_agents.items():
            agent_desc = getattr(agent, "description", "A helper agent.")
            definitions.append({
                "type": "function",
                "function": {
                    "name": f"{self._DELEGATION_PREFIX}{agent_name}",
                    "description": f"Delegate a query to the {agent_name}. Capability: {agent_desc}",
                    "parameters": {
                        "type": "object",
                        "properties": {
                            "query": {
                                "type": "string",
                                "description": "The specific question or instruction for this worker."
                            }
                        },
                        "required": ["query"]
                    }
                }
            })
        return definitions

    def _run_delegation(self, tool_name: str, tool_args: Any, provider: LLMProvider) -> str:
        """Resolve a virtual tool call to a worker, run it, and return the observation text."""
        prefix = self._DELEGATION_PREFIX
        if not tool_name.startswith(prefix):
            logger.warning(f"❌ Manager tried to call unknown tool: {tool_name}")
            return f"Error: Tool {tool_name} does not exist."

        # Strip only the leading prefix; `str.replace` would also remove the
        # marker from inside an agent name that happens to contain it.
        agent_name = tool_name[len(prefix):]
        worker_agent = self.sub_agents.get(agent_name)
        if worker_agent is None:
            logger.warning(f"❌ Manager tried to call unknown agent: {agent_name}")
            return f"Error: Agent {agent_name} does not exist."

        worker_query = tool_args.get("query") if isinstance(tool_args, dict) else None
        logger.info(f"πŸ‘‘ -> πŸ‘· Delegating to {agent_name}: {worker_query}")
        if not worker_query:
            # Do not forward an empty/None query to the worker; let the
            # manager model retry with a proper instruction instead.
            return f"Error: Delegation to {agent_name} requires a non-empty 'query'."

        try:
            # Worker runs its own loop (stateless for now)
            worker_content = worker_agent.process_query(worker_query, provider).content
            logger.info(f"πŸ‘· -> πŸ‘‘ {agent_name} replied.")
        except Exception as e:
            # Deliberately broad: a failing worker is reported to the manager
            # model as an observation rather than aborting the orchestration.
            worker_content = f"Error from {agent_name}: {str(e)}"
            logger.error(worker_content)

        return f"Output from {agent_name}:\n{worker_content}"

    def process_query(self, user_query: str, provider: LLMProvider) -> AgentResponse:
        """
        The Manager's Thinking Loop.
        It decides: Do I answer myself? Or do I call a worker?

        Args:
            user_query: The user's message; it is stored in memory first.
            provider: Object whose `get_response(messages, tool_definitions)`
                returns a response exposing `tool_call` and `content`.

        Returns:
            An AgentResponse with the final answer (also stored in memory), or
            a timeout response (`metadata == {"error": "timeout"}`) when no
            answer was produced within `max_turns` provider calls.
        """

        # 1. Save User Query to Memory
        self.memory.add_message(role="user", content=user_query)

        # 2. Construct the Context (System Prompt + History)
        team_roster = ", ".join(self.sub_agents)
        enhanced_system_prompt = (
            f"{self.system_prompt}\n"
            f"You manage a team of agents: [{team_roster}].\n"
            "Delegate tasks to them using the available tools.\n"
            "Combine their outputs into a comprehensive final answer.\n"
            "Use the conversation history to answer follow-up questions."
        )

        # Start with System Prompt, then add Conversation History
        messages = [{"role": "system", "content": enhanced_system_prompt}]
        messages.extend(self.memory.get_history())

        logger.info(f"πŸ‘‘ [{self.name}] Starting Orchestration Loop...")

        # 3. Start the Loop
        for turn in range(self.max_turns):
            logger.info(f"--- Manager Turn {turn + 1} ---")

            # A. Ask the Provider
            response: LLMResponse = provider.get_response(messages, self.delegation_definitions)

            # B. Handle "Virtual Tool" Calls (Delegation)
            if response.tool_call:
                tool_name = response.tool_call["name"]
                # A call without arguments may carry a missing/None payload.
                tool_args = response.tool_call.get("args") or {}
                tool_id = response.tool_call.get("id", "call_mgr")

                # Record the "Thought" (Tool Call). This happens for every
                # call, valid or not, because a "tool" message is only valid
                # as the reply to an assistant message with the matching
                # `tool_calls` entry.
                messages.append({
                    "role": "assistant",
                    "content": None,
                    "tool_calls": [{
                        "id": tool_id,
                        "type": "function",
                        "function": {"name": tool_name, "arguments": json.dumps(tool_args)}
                    }]
                })

                # Record the "Observation" (Tool Output or error text)
                messages.append({
                    "role": "tool",
                    "tool_call_id": tool_id,
                    "name": tool_name,
                    "content": self._run_delegation(tool_name, tool_args, provider)
                })
                continue

            # C. Handle Final Answer (Synthesis)
            if response.content:
                logger.info(f"βœ… [{self.name}] Final Synthesis: {response.content}")

                # 4. Save Assistant Answer to Memory
                self.memory.add_message(role="assistant", content=response.content)

                return AgentResponse(content=response.content)

            # Neither a tool call nor content: the turn is spent and the
            # provider is asked again with the unchanged conversation.
            logger.warning(f"[{self.name}] Empty response on manager turn {turn + 1}; retrying.")

        return AgentResponse(content="Manager timed out while coordinating agents.", metadata={"error": "timeout"})