Spaces:
Running
Running
| import os | |
| import asyncio | |
| from appagents.FinancialAgent import FinancialAgent | |
| from appagents.NewsAgent import NewsAgent | |
| from appagents.SearchAgent import SearchAgent | |
| from appagents.InputValidationAgent import input_validation_guardrail | |
| from agents import Agent, OpenAIChatCompletionsModel, InputGuardrail | |
| from openai import AsyncOpenAI | |
| class OrchestratorAgent: | |
| """ | |
| The OrchestratorAgent coordinates multiple specialized sub-agents | |
| (Financial, News, and Search) to provide accurate, up-to-date, | |
| and well-routed market research insights. | |
| """ | |
| MAX_RETRIES = 2 | |
| # ---------------------------------------------------------- | |
| # MAIN CREATION METHOD | |
| # ---------------------------------------------------------- | |
| def create(model: str = "gpt-4o-mini"): | |
| """ | |
| Creates and returns a configured Orchestrator agent. | |
| """ | |
| # --- Sub-agent setup --- | |
| handoffs = [ | |
| FinancialAgent.create(), | |
| NewsAgent.create(), | |
| SearchAgent.create(), | |
| ] | |
| # --- Behavioral instructions --- | |
| instructions = """ | |
| You are the Orchestrator Agent responsible for coordinating specialized sub-agents | |
| to generate accurate and well-rounded market research responses. | |
| **Your Core Responsibilities** | |
| 1. **Task Routing:** Determine which sub-agent (Financial, News, or Search) is best suited | |
| to handle each user query based on intent and context. | |
| 2. **Delegation:** Forward the request to the appropriate sub-agent and wait for its result. | |
| 3. **Synthesis:** When multiple agents provide responses, summarize and merge their findings | |
| into a clear, concise, and accurate overall answer. | |
| 4. **Recency and Accuracy:** Prioritize the most up-to-date, verifiable data from sub-agents. | |
| 5. **Transparency:** Clearly identify which insights came from which sub-agent when relevant. | |
| 6. **Error Handling:** If a sub-agent fails or provides insufficient data, attempt fallback | |
| strategies such as rerouting the query or notifying the user. | |
| 7. **Clarity:** Always present the final response in a professional, well-structured, | |
| and easy-to-understand format. | |
| ⚠️ Do **not** perform the underlying data analysis or external lookup yourself — | |
| ALWAYS delegate those tasks to the respective sub-agents. | |
| """ | |
| # --- Model setup --- | |
| GEMINI_BASE_URL = "https://generativelanguage.googleapis.com/v1beta/openai/" | |
| google_api_key = os.getenv("GOOGLE_API_KEY") | |
| gemini_client = AsyncOpenAI(base_url=GEMINI_BASE_URL, api_key=google_api_key) | |
| gemini_model = OpenAIChatCompletionsModel( | |
| model="gemini-2.0-flash", | |
| openai_client=gemini_client | |
| ) | |
| # --- Create orchestrator agent --- | |
| agent = Agent( | |
| name="AI Market Research Assistant", | |
| handoffs=handoffs, | |
| instructions=instructions.strip(), | |
| model=gemini_model, | |
| # input_guardrails=[ | |
| # InputGuardrail( | |
| # name="Input Validation Guardrail", | |
| # guardrail_function=input_validation_guardrail, | |
| # ) | |
| # ], | |
| ) | |
| # Attach orchestration logic | |
| agent.respond = lambda prompt: OrchestratorAgent.respond(prompt, handoffs, gemini_model) | |
| return agent | |
| # ---------------------------------------------------------- | |
| # RESPONSE HANDLING + SELF-CORRECTION | |
| # ---------------------------------------------------------- | |
| async def respond(prompt: str, handoffs: list, model) -> str: | |
| """ | |
| Routes prompt to the most relevant agent, retries if output seems irrelevant. | |
| """ | |
| attempted_agents = set() | |
| for attempt in range(OrchestratorAgent.MAX_RETRIES): | |
| # Step 1: Route intelligently | |
| chosen_agent = await OrchestratorAgent._route_to_agent(prompt, handoffs, attempted_agents) | |
| if not chosen_agent: | |
| return "⚠️ No available agent could handle this query." | |
| print(f"🤖 Attempt {attempt+1}: Sending query to {chosen_agent.name}") | |
| # Step 2: Run agent | |
| try: | |
| response = await chosen_agent.run(prompt) | |
| except Exception as e: | |
| print(f"⚠️ Agent {chosen_agent.name} failed: {e}") | |
| attempted_agents.add(chosen_agent.name) | |
| continue | |
| # Step 3: Evaluate if relevant | |
| if await OrchestratorAgent._is_relevant(prompt, response, model): | |
| return f"✅ {chosen_agent.name} handled this successfully:\n\n{response}" | |
| print(f"🔁 {chosen_agent.name}'s response deemed irrelevant. Re-routing...") | |
| attempted_agents.add(chosen_agent.name) | |
| return "⚠️ Could not find a relevant answer after multiple attempts." | |
| # ---------------------------------------------------------- | |
| # ROUTING LOGIC | |
| # ---------------------------------------------------------- | |
| async def _route_to_agent(prompt: str, handoffs: list, attempted_agents: set): | |
| """ | |
| Determines the best-fit agent for the given prompt. | |
| Avoids previously tried agents. | |
| """ | |
| lowered = prompt.lower() | |
| available = [a for a in handoffs if a.name not in attempted_agents] | |
| if not available: | |
| return None | |
| if any(k in lowered for k in ["finance", "stock", "market", "earnings"]): | |
| return next((a for a in available if "financial" in a.name.lower()), available[0]) | |
| elif any(k in lowered for k in ["news", "headline", "press release"]): | |
| return next((a for a in available if "news" in a.name.lower()), available[0]) | |
| elif any(k in lowered for k in ["search", "find", "lookup", "discover"]): | |
| return next((a for a in available if "search" in a.name.lower()), available[0]) | |
| else: | |
| # fallback — first available agent | |
| return available[0] | |
| # ---------------------------------------------------------- | |
| # LLM-BASED EVALUATOR | |
| # ---------------------------------------------------------- | |
| async def _is_relevant(prompt: str, response: str, model) -> bool: | |
| """ | |
| Uses the model itself to check if the response matches the prompt intent. | |
| """ | |
| eval_prompt = f""" | |
| You are an evaluator checking multi-agent responses. | |
| User asked: "{prompt}" | |
| Agent responded: "{response}" | |
| Does this response accurately and completely answer the user's intent? | |
| Reply with only 'yes' or 'no'. | |
| """ | |
| try: | |
| eval_result = await model.run(eval_prompt) | |
| print(f"🧠 Evaluation result: {eval_result}") | |
| return "yes" in eval_result.lower() | |
| except Exception as e: | |
| print(f"⚠️ Evaluation failed: {e}") | |
| return False | |