File size: 7,083 Bytes
7a2fae0
 
8153ec2
 
 
7a2fae0
 
8153ec2
 
7a2fae0
8153ec2
 
7a2fae0
 
 
8153ec2
 
7a2fae0
 
 
 
 
8153ec2
 
 
7a2fae0
8153ec2
 
7a2fae0
8153ec2
 
 
7a2fae0
8153ec2
 
7a2fae0
8153ec2
7a2fae0
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
8153ec2
 
7a2fae0
8153ec2
7a2fae0
8153ec2
7a2fae0
 
 
 
8153ec2
7a2fae0
8153ec2
 
 
 
7a2fae0
 
 
 
 
 
 
8153ec2
7a2fae0
 
 
8153ec2
7a2fae0
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
import os
import asyncio
from appagents.FinancialAgent import FinancialAgent
from appagents.NewsAgent import NewsAgent
from appagents.SearchAgent import SearchAgent
from appagents.InputValidationAgent import input_validation_guardrail
from agents import Agent, OpenAIChatCompletionsModel, InputGuardrail
from openai import AsyncOpenAI


class OrchestratorAgent:
    """
    The OrchestratorAgent coordinates multiple specialized sub-agents
    (Financial, News, and Search) to provide accurate, up-to-date,
    and well-routed market research insights.
    """

    MAX_RETRIES = 2

    # ----------------------------------------------------------
    # MAIN CREATION METHOD
    # ----------------------------------------------------------
    @staticmethod
    def create(model: str = "gpt-4o-mini"):
        """
        Creates and returns a configured Orchestrator agent.
        """

        # --- Sub-agent setup ---
        handoffs = [
            FinancialAgent.create(),
            NewsAgent.create(),
            SearchAgent.create(),
        ]

        # --- Behavioral instructions ---
        instructions = """
        You are the Orchestrator Agent responsible for coordinating specialized sub-agents 
        to generate accurate and well-rounded market research responses.

        **Your Core Responsibilities**
        1. **Task Routing:** Determine which sub-agent (Financial, News, or Search) is best suited 
           to handle each user query based on intent and context.
        2. **Delegation:** Forward the request to the appropriate sub-agent and wait for its result.
        3. **Synthesis:** When multiple agents provide responses, summarize and merge their findings 
           into a clear, concise, and accurate overall answer.
        4. **Recency and Accuracy:** Prioritize the most up-to-date, verifiable data from sub-agents.
        5. **Transparency:** Clearly identify which insights came from which sub-agent when relevant.
        6. **Error Handling:** If a sub-agent fails or provides insufficient data, attempt fallback 
           strategies such as rerouting the query or notifying the user.
        7. **Clarity:** Always present the final response in a professional, well-structured, 
           and easy-to-understand format.

        ⚠️ Do **not** perform the underlying data analysis or external lookup yourself — 
        ALWAYS delegate those tasks to the respective sub-agents.
        """

        # --- Model setup ---
        GEMINI_BASE_URL = "https://generativelanguage.googleapis.com/v1beta/openai/"
        google_api_key = os.getenv("GOOGLE_API_KEY")
        gemini_client = AsyncOpenAI(base_url=GEMINI_BASE_URL, api_key=google_api_key)
        gemini_model = OpenAIChatCompletionsModel(
            model="gemini-2.0-flash",
            openai_client=gemini_client
        )

        # --- Create orchestrator agent ---
        agent = Agent(
            name="AI Market Research Assistant",
            handoffs=handoffs,
            instructions=instructions.strip(),
            model=gemini_model,
            # input_guardrails=[
            #     InputGuardrail(
            #         name="Input Validation Guardrail",
            #         guardrail_function=input_validation_guardrail,
            #     )
            # ],
        )

        # Attach orchestration logic
        agent.respond = lambda prompt: OrchestratorAgent.respond(prompt, handoffs, gemini_model)
        return agent

    # ----------------------------------------------------------
    # RESPONSE HANDLING + SELF-CORRECTION
    # ----------------------------------------------------------
    @staticmethod
    async def respond(prompt: str, handoffs: list, model) -> str:
        """
        Routes prompt to the most relevant agent, retries if output seems irrelevant.
        """
        attempted_agents = set()

        for attempt in range(OrchestratorAgent.MAX_RETRIES):
            # Step 1: Route intelligently
            chosen_agent = await OrchestratorAgent._route_to_agent(prompt, handoffs, attempted_agents)
            if not chosen_agent:
                return "⚠️ No available agent could handle this query."

            print(f"🤖 Attempt {attempt+1}: Sending query to {chosen_agent.name}")

            # Step 2: Run agent
            try:
                response = await chosen_agent.run(prompt)
            except Exception as e:
                print(f"⚠️ Agent {chosen_agent.name} failed: {e}")
                attempted_agents.add(chosen_agent.name)
                continue

            # Step 3: Evaluate if relevant
            if await OrchestratorAgent._is_relevant(prompt, response, model):
                return f"✅ {chosen_agent.name} handled this successfully:\n\n{response}"

            print(f"🔁 {chosen_agent.name}'s response deemed irrelevant. Re-routing...")
            attempted_agents.add(chosen_agent.name)

        return "⚠️ Could not find a relevant answer after multiple attempts."

    # ----------------------------------------------------------
    # ROUTING LOGIC
    # ----------------------------------------------------------
    @staticmethod
    async def _route_to_agent(prompt: str, handoffs: list, attempted_agents: set):
        """
        Determines the best-fit agent for the given prompt.
        Avoids previously tried agents.
        """
        lowered = prompt.lower()
        available = [a for a in handoffs if a.name not in attempted_agents]

        if not available:
            return None

        if any(k in lowered for k in ["finance", "stock", "market", "earnings"]):
            return next((a for a in available if "financial" in a.name.lower()), available[0])
        elif any(k in lowered for k in ["news", "headline", "press release"]):
            return next((a for a in available if "news" in a.name.lower()), available[0])
        elif any(k in lowered for k in ["search", "find", "lookup", "discover"]):
            return next((a for a in available if "search" in a.name.lower()), available[0])
        else:
            # fallback — first available agent
            return available[0]

    # ----------------------------------------------------------
    # LLM-BASED EVALUATOR
    # ----------------------------------------------------------
    @staticmethod
    async def _is_relevant(prompt: str, response: str, model) -> bool:
        """
        Uses the model itself to check if the response matches the prompt intent.
        """
        eval_prompt = f"""
        You are an evaluator checking multi-agent responses.
        User asked: "{prompt}"
        Agent responded: "{response}"

        Does this response accurately and completely answer the user's intent?
        Reply with only 'yes' or 'no'.
        """
        try:
            eval_result = await model.run(eval_prompt)
            print(f"🧠 Evaluation result: {eval_result}")
            return "yes" in eval_result.lower()
        except Exception as e:
            print(f"⚠️ Evaluation failed: {e}")
            return False