chatbot-app / src /chatbot /aagents /orchestrator_agent.py
mishrabp's picture
Upload folder using huggingface_hub
2cb39a9 verified
import os
import asyncio
from common.aagents.search_agent import search_agent
from common.aagents.news_agent import news_agent
from common.aagents.yf_agent import yf_agent
from aagents.input_validation_agent import input_validation_guardrail
from agents import Agent, OpenAIChatCompletionsModel, Runner, function_tool
from openai import AsyncOpenAI
from core.model import get_model_client
# ----------------------------------------------------------
# PARALLEL EXECUTION TOOL
# ----------------------------------------------------------
@function_tool
async def prompt_broadcaster(query: str, include_finance: bool = True, include_news: bool = True, include_search: bool = True) -> str:
"""
Broadcasts the search query to selected specialized agents in parallel and aggregates their responses.
Args:
query: The user's question or topic to research.
include_finance: Whether to include the Yahoo Finance agent (default: True).
include_news: Whether to include the News agent (default: True).
include_search: Whether to include the Web Search agent (default: True).
Returns:
Combined reports from the selected agents.
"""
# A better approach for variable tasks is to map them.
active_agents = []
if include_finance: active_agents.append(("YahooFinanceAgent", Runner.run(yf_agent, query)))
if include_news: active_agents.append(("NewsAgent", Runner.run(news_agent, query)))
if include_search: active_agents.append(("WebSearchAgent", Runner.run(search_agent, query)))
if not active_agents:
return "No agents were selected for this query."
# Run in parallel
agent_names = [name for name, _ in active_agents]
coroutines = [coro for _, coro in active_agents]
results = await asyncio.gather(*coroutines, return_exceptions=True)
outputs = []
for name, res in zip(agent_names, results):
if isinstance(res, Exception):
outputs.append(f"❌ {name} Error: {str(res)}")
else:
outputs.append(f"✅ {name} Report:\n{res.final_output}")
combined_response = "\n--- START OF AGENT REPORTS ---\n\n" + "\n\n-----------------------------------\n\n".join(outputs) + "\n\n--- END OF AGENT REPORTS ---"
return combined_response
orchestrator_agent = Agent(
name="AI Chat Orchestrator",
tools=[prompt_broadcaster],
instructions="""
You are the **AI Chat Orchestrator**.
Your goal is to provide a comprehensive, multi-perspective answer by synthesizing data from specialized sub-agents.
**Workflow**:
1. **Analyze Request**: Understand the user's question.
2. **Determine Needs**: Decide which specialized agents are required.
* **Finance**: For stock prices, market trends, company financials, or analyst ratings.
* **News**: For recent events, headlines, or breaking news.
* **Web Search**: For general knowledge, history, facts, or broad research.
3. **Broadcast Query**: Call the `prompt_broadcaster` tool with the `query` and set the `include_*` flags to True/False accordingly.
* *Optimization Tip*: efficiently select ONLY the necessary agents to reduce latency.
4. **Synthesize Results**: Read the returned "Agent Reports".
* Combine the financial data (prices, sentiment), news headlines, and general search context.
* Compare and contrast findings if necessary.
* Resolve conflicts by prioritizing specific data (e.g., Yahoo Finance for prices) over general text.
5. **Final Response**: Generate a clear, professional, and well-structured summary for the user. Do not simply paste the individual reports.
**Final Response Structure**:
You should adapt the response structure based on the user's query type:
* **For Market/Finance Queries**: Use the "Market Analysis" style with a Financial Snapshot (Price, Sentinel, Ratings), Key Developments, and Synthesis.
* **For News/Research**: Use a clear "Executive Summary" followed by "Key Findings" and "Details".
* **For General Chat**: Maintain a conversational but professional tone. Use markdown for clarity (bullet points, bold text).
* **For Coding Requests**: Provide clear code blocks and explanations.
**Constraint**:
* Do NOT try to answer based on your own knowledge if live data is needed/requested.
* Use `prompt_broadcaster` when the query implies a need for external information.
* If agents return "No data", explicitly state that in the relevant section.
""",
model=get_model_client(),
)
orchestrator_agent.description = "An intelligent orchestrator that queries Finance, News, and Search agents in parallel and synthesizes a comprehensive response."
__all__ = ["orchestrator_agent"]