import os
import asyncio
from appagents.FinancialAgent import FinancialAgent
from appagents.NewsAgent import NewsAgent
from appagents.SearchAgent import SearchAgent
from appagents.InputValidationAgent import input_validation_guardrail
from agents import Agent, OpenAIChatCompletionsModel, InputGuardrail
from openai import AsyncOpenAI
class OrchestratorAgent:
    """
    Coordinates multiple specialized sub-agents (Financial, News, and Search)
    to provide accurate, up-to-date, and well-routed market research insights.

    Flow: route a prompt to the best-fit sub-agent, run it, evaluate the
    response with an LLM-based relevance check, and re-route to another
    agent when the response is judged irrelevant or the agent fails.
    """

    # Maximum number of routing attempts before giving up on a prompt.
    MAX_RETRIES = 2

    # ----------------------------------------------------------
    # MAIN CREATION METHOD
    # ----------------------------------------------------------
    @staticmethod
    def create(model: str = "gpt-4o-mini"):
        """
        Create and return a configured Orchestrator agent.

        Args:
            model: Requested model name. With the default value
                ("gpt-4o-mini") the previously hard-coded "gemini-2.0-flash"
                is used, preserving the original behavior; any other value
                is forwarded to the Gemini endpoint as the model name.
                (Bug fix: the original implementation accepted this
                parameter but silently ignored it.)

        Returns:
            An ``Agent`` wired with sub-agent handoffs and a ``respond``
            attribute; ``agent.respond(prompt)`` returns a coroutine that
            callers must await.
        """
        # --- Sub-agent setup ---
        handoffs = [
            FinancialAgent.create(),
            NewsAgent.create(),
            SearchAgent.create(),
        ]

        # --- Behavioral instructions ---
        instructions = """
        You are the Orchestrator Agent responsible for coordinating specialized sub-agents
        to generate accurate and well-rounded market research responses.
        **Your Core Responsibilities**
        1. **Task Routing:** Determine which sub-agent (Financial, News, or Search) is best suited
        to handle each user query based on intent and context.
        2. **Delegation:** Forward the request to the appropriate sub-agent and wait for its result.
        3. **Synthesis:** When multiple agents provide responses, summarize and merge their findings
        into a clear, concise, and accurate overall answer.
        4. **Recency and Accuracy:** Prioritize the most up-to-date, verifiable data from sub-agents.
        5. **Transparency:** Clearly identify which insights came from which sub-agent when relevant.
        6. **Error Handling:** If a sub-agent fails or provides insufficient data, attempt fallback
        strategies such as rerouting the query or notifying the user.
        7. **Clarity:** Always present the final response in a professional, well-structured,
        and easy-to-understand format.
        ⚠️ Do **not** perform the underlying data analysis or external lookup yourself —
        ALWAYS delegate those tasks to the respective sub-agents.
        """

        # --- Model setup (Gemini served through its OpenAI-compatible API) ---
        GEMINI_BASE_URL = "https://generativelanguage.googleapis.com/v1beta/openai/"
        # NOTE(review): no explicit check that GOOGLE_API_KEY is set; when it
        # is missing, the client fails at request time rather than here.
        google_api_key = os.getenv("GOOGLE_API_KEY")
        gemini_client = AsyncOpenAI(base_url=GEMINI_BASE_URL, api_key=google_api_key)
        # Honor an explicitly requested model; the default maps to the
        # previously hard-coded Gemini model for backward compatibility.
        model_name = "gemini-2.0-flash" if model == "gpt-4o-mini" else model
        gemini_model = OpenAIChatCompletionsModel(
            model=model_name,
            openai_client=gemini_client,
        )

        # --- Create orchestrator agent ---
        agent = Agent(
            name="AI Market Research Assistant",
            handoffs=handoffs,
            instructions=instructions.strip(),
            model=gemini_model,
            # Input guardrail intentionally left disabled (as in the
            # original); re-enable by uncommenting once the guardrail is
            # ready for production.
            # input_guardrails=[
            #     InputGuardrail(
            #         name="Input Validation Guardrail",
            #         guardrail_function=input_validation_guardrail,
            #     )
            # ],
        )

        # Attach orchestration logic. The lambda returns a coroutine, so the
        # caller is responsible for awaiting agent.respond(prompt).
        agent.respond = lambda prompt: OrchestratorAgent.respond(prompt, handoffs, gemini_model)
        return agent

    # ----------------------------------------------------------
    # RESPONSE HANDLING + SELF-CORRECTION
    # ----------------------------------------------------------
    @staticmethod
    async def respond(prompt: str, handoffs: list, model) -> str:
        """
        Route *prompt* to the most relevant agent, retrying with a different
        agent when the output seems irrelevant or the agent raises.

        Args:
            prompt: The user's query.
            handoffs: Candidate sub-agents (each exposing ``name`` and an
                async ``run`` method).
            model: Model used by the LLM-based relevance evaluator.

        Returns:
            The successful agent's answer, or a warning string when no agent
            produced a relevant response within ``MAX_RETRIES`` attempts.
        """
        attempted_agents = set()
        for attempt in range(OrchestratorAgent.MAX_RETRIES):
            # Step 1: Route intelligently, skipping agents already tried.
            chosen_agent = await OrchestratorAgent._route_to_agent(prompt, handoffs, attempted_agents)
            if not chosen_agent:
                return "⚠️ No available agent could handle this query."
            print(f"🤖 Attempt {attempt+1}: Sending query to {chosen_agent.name}")
            # Step 2: Run agent — a failure consumes one retry attempt.
            try:
                response = await chosen_agent.run(prompt)
            except Exception as e:
                print(f"⚠️ Agent {chosen_agent.name} failed: {e}")
                attempted_agents.add(chosen_agent.name)
                continue
            # Step 3: Keep the answer only if the evaluator deems it relevant.
            if await OrchestratorAgent._is_relevant(prompt, response, model):
                return f"✅ {chosen_agent.name} handled this successfully:\n\n{response}"
            print(f"🔁 {chosen_agent.name}'s response deemed irrelevant. Re-routing...")
            attempted_agents.add(chosen_agent.name)
        return "⚠️ Could not find a relevant answer after multiple attempts."

    # ----------------------------------------------------------
    # ROUTING LOGIC
    # ----------------------------------------------------------
    @staticmethod
    async def _route_to_agent(prompt: str, handoffs: list, attempted_agents: set):
        """
        Pick the best-fit agent for *prompt* via keyword matching, skipping
        agents whose names appear in *attempted_agents*.

        Returns:
            The chosen agent, or ``None`` when every agent has been tried.
        """
        lowered = prompt.lower()
        available = [a for a in handoffs if a.name not in attempted_agents]
        if not available:
            return None
        # Keyword buckets checked in priority order: finance, news, search.
        # Each bucket falls back to the first available agent when its
        # preferred agent has already been attempted.
        if any(k in lowered for k in ["finance", "stock", "market", "earnings"]):
            return next((a for a in available if "financial" in a.name.lower()), available[0])
        elif any(k in lowered for k in ["news", "headline", "press release"]):
            return next((a for a in available if "news" in a.name.lower()), available[0])
        elif any(k in lowered for k in ["search", "find", "lookup", "discover"]):
            return next((a for a in available if "search" in a.name.lower()), available[0])
        else:
            # fallback — first available agent
            return available[0]

    # ----------------------------------------------------------
    # LLM-BASED EVALUATOR
    # ----------------------------------------------------------
    @staticmethod
    async def _is_relevant(prompt: str, response: str, model) -> bool:
        """
        Ask *model* whether *response* answers *prompt*.

        Returns True only when the evaluator's reply contains "yes". Any
        evaluation failure is treated as "not relevant" so the caller can
        re-route instead of crashing.
        """
        eval_prompt = f"""
        You are an evaluator checking multi-agent responses.
        User asked: "{prompt}"
        Agent responded: "{response}"
        Does this response accurately and completely answer the user's intent?
        Reply with only 'yes' or 'no'.
        """
        try:
            eval_result = await model.run(eval_prompt)
            print(f"🧠 Evaluation result: {eval_result}")
            # Bug fix: coerce to str — some SDKs return a result object, and
            # calling .lower() on it directly would raise AttributeError.
            return "yes" in str(eval_result).lower()
        except Exception as e:
            print(f"⚠️ Evaluation failed: {e}")
            return False