# chatbot-app / src / chatbot / appagents / OrchestratorAgent.py
# Uploaded by mishrabp via huggingface_hub (commit 6b50ab8, verified)
import os
import asyncio
from appagents.FinancialAgent import FinancialAgent
from appagents.NewsAgent import NewsAgent
from appagents.SearchAgent import SearchAgent
from appagents.InputValidationAgent import input_validation_guardrail
from agents import Agent, OpenAIChatCompletionsModel, InputGuardrail
from openai import AsyncOpenAI
class OrchestratorAgent:
    """
    Coordinates multiple specialized sub-agents (Financial, News, and Search)
    to provide accurate, up-to-date, and well-routed market research insights.

    Routing is keyword-based (see ``_route_to_agent``); each sub-agent's answer
    is then judged by an LLM relevance check (see ``_is_relevant``), and an
    irrelevant answer triggers a re-route to a different agent, up to
    ``MAX_RETRIES`` attempts.
    """

    # Maximum number of route -> run -> evaluate attempts before giving up.
    MAX_RETRIES = 2

    # ----------------------------------------------------------
    # MAIN CREATION METHOD
    # ----------------------------------------------------------
    @staticmethod
    def create(model: str = "gpt-4o-mini"):
        """
        Create and return a configured Orchestrator agent.

        Args:
            model: Kept for interface compatibility.
                NOTE(review): currently ignored — the orchestrator is
                hard-wired to Gemini below; confirm whether this argument
                should select the underlying model.

        Returns:
            An ``Agent`` whose ``handoffs`` are the three sub-agents, with a
            bound ``respond`` callable attached (it returns a coroutine that
            the caller must await).
        """
        # --- Sub-agent setup ---
        handoffs = [
            FinancialAgent.create(),
            NewsAgent.create(),
            SearchAgent.create(),
        ]

        # --- Behavioral instructions (runtime prompt text — do not edit lightly) ---
        instructions = """
        You are the Orchestrator Agent responsible for coordinating specialized sub-agents
        to generate accurate and well-rounded market research responses.
        **Your Core Responsibilities**
        1. **Task Routing:** Determine which sub-agent (Financial, News, or Search) is best suited
        to handle each user query based on intent and context.
        2. **Delegation:** Forward the request to the appropriate sub-agent and wait for its result.
        3. **Synthesis:** When multiple agents provide responses, summarize and merge their findings
        into a clear, concise, and accurate overall answer.
        4. **Recency and Accuracy:** Prioritize the most up-to-date, verifiable data from sub-agents.
        5. **Transparency:** Clearly identify which insights came from which sub-agent when relevant.
        6. **Error Handling:** If a sub-agent fails or provides insufficient data, attempt fallback
        strategies such as rerouting the query or notifying the user.
        7. **Clarity:** Always present the final response in a professional, well-structured,
        and easy-to-understand format.
        ⚠️ Do **not** perform the underlying data analysis or external lookup yourself —
        ALWAYS delegate those tasks to the respective sub-agents.
        """

        # --- Model setup: Gemini exposed through its OpenAI-compatible endpoint ---
        GEMINI_BASE_URL = "https://generativelanguage.googleapis.com/v1beta/openai/"
        # May be None if the env var is unset; AsyncOpenAI will then fail at
        # request time — TODO(review): consider failing fast with a clear error.
        google_api_key = os.getenv("GOOGLE_API_KEY")
        gemini_client = AsyncOpenAI(base_url=GEMINI_BASE_URL, api_key=google_api_key)
        gemini_model = OpenAIChatCompletionsModel(
            model="gemini-2.0-flash",
            openai_client=gemini_client,
        )

        # --- Create orchestrator agent ---
        agent = Agent(
            name="AI Market Research Assistant",
            handoffs=handoffs,
            instructions=instructions.strip(),
            model=gemini_model,
            # Input-validation guardrail intentionally left disabled for now.
            # input_guardrails=[
            #     InputGuardrail(
            #         name="Input Validation Guardrail",
            #         guardrail_function=input_validation_guardrail,
            #     )
            # ],
        )

        # Attach orchestration logic. The lambda returns the coroutine from
        # OrchestratorAgent.respond; callers are expected to await it.
        agent.respond = lambda prompt: OrchestratorAgent.respond(prompt, handoffs, gemini_model)
        return agent

    # ----------------------------------------------------------
    # RESPONSE HANDLING + SELF-CORRECTION
    # ----------------------------------------------------------
    @staticmethod
    async def respond(prompt: str, handoffs: list, model) -> str:
        """
        Route *prompt* to the most relevant agent, retrying with a different
        agent when the answer fails the LLM relevance check.

        Args:
            prompt: The user's query.
            handoffs: Candidate sub-agents; each must expose ``name`` and an
                async ``run(prompt)`` method.
            model: Model used by ``_is_relevant`` to judge answers (must
                expose async ``run``).

        Returns:
            The successful agent's answer (prefixed with a status line), or a
            warning string when no agent produced a relevant response within
            ``MAX_RETRIES`` attempts.
        """
        attempted_agents: set = set()
        for attempt in range(OrchestratorAgent.MAX_RETRIES):
            # Step 1: pick the best remaining agent for this prompt.
            chosen_agent = await OrchestratorAgent._route_to_agent(
                prompt, handoffs, attempted_agents
            )
            if not chosen_agent:
                return "⚠️ No available agent could handle this query."
            print(f"🤖 Attempt {attempt+1}: Sending query to {chosen_agent.name}")
            # Step 2: run the agent. A failure marks it as attempted and
            # consumes one retry instead of aborting the whole request.
            try:
                response = await chosen_agent.run(prompt)
            except Exception as e:
                print(f"⚠️ Agent {chosen_agent.name} failed: {e}")
                attempted_agents.add(chosen_agent.name)
                continue
            # Step 3: keep the answer only if the evaluator deems it relevant.
            if await OrchestratorAgent._is_relevant(prompt, response, model):
                return f"✅ {chosen_agent.name} handled this successfully:\n\n{response}"
            print(f"🔁 {chosen_agent.name}'s response deemed irrelevant. Re-routing...")
            attempted_agents.add(chosen_agent.name)
        return "⚠️ Could not find a relevant answer after multiple attempts."

    # ----------------------------------------------------------
    # ROUTING LOGIC
    # ----------------------------------------------------------
    @staticmethod
    async def _route_to_agent(prompt: str, handoffs: list, attempted_agents: set):
        """
        Pick the best-fit agent for *prompt*, skipping agents already tried.

        Args:
            prompt: The user's query (matched case-insensitively).
            handoffs: All candidate agents.
            attempted_agents: Names of agents that already failed or gave an
                irrelevant answer.

        Returns:
            The chosen agent, or ``None`` when every agent has been attempted.
        """
        lowered = prompt.lower()
        available = [a for a in handoffs if a.name not in attempted_agents]
        if not available:
            return None
        # Keyword buckets. If the matching specialist was already attempted,
        # fall back to the first still-available agent.
        if any(k in lowered for k in ["finance", "stock", "market", "earnings"]):
            return next((a for a in available if "financial" in a.name.lower()), available[0])
        elif any(k in lowered for k in ["news", "headline", "press release"]):
            return next((a for a in available if "news" in a.name.lower()), available[0])
        elif any(k in lowered for k in ["search", "find", "lookup", "discover"]):
            return next((a for a in available if "search" in a.name.lower()), available[0])
        else:
            # No keyword matched — fall back to the first available agent.
            return available[0]

    # ----------------------------------------------------------
    # LLM-BASED EVALUATOR
    # ----------------------------------------------------------
    @staticmethod
    async def _is_relevant(prompt: str, response: str, model) -> bool:
        """
        Use the model itself to check whether *response* matches the intent
        of *prompt*.

        Returns:
            ``True`` only when the evaluator's verdict starts with "yes";
            ``False`` on a "no" verdict or any evaluation failure.
        """
        eval_prompt = f"""
        You are an evaluator checking multi-agent responses.
        User asked: "{prompt}"
        Agent responded: "{response}"
        Does this response accurately and completely answer the user's intent?
        Reply with only 'yes' or 'no'.
        """
        try:
            eval_result = await model.run(eval_prompt)
            print(f"🧠 Evaluation result: {eval_result}")
            # BUGFIX: the previous substring test ("yes" in ...) wrongly
            # accepted verdicts such as "no — it never says yes". Coerce to
            # str (the model result may not be one) and require the verdict
            # to *start* with "yes".
            return str(eval_result).strip().lower().startswith("yes")
        except Exception as e:
            print(f"⚠️ Evaluation failed: {e}")
            return False