| """Orchestrator agent for intent recognition and planning.""" |
|
|
| from langchain_openai import AzureChatOpenAI |
| from langchain_core.prompts import ChatPromptTemplate |
| from langchain_core.output_parsers import JsonOutputParser |
| from src.config.settings import settings |
| from src.middlewares.logging import get_logger |
|
|
| logger = get_logger("orchestrator") |
|
|
|
|
| class OrchestratorAgent: |
| """Orchestrator agent for intent recognition and planning.""" |
|
|
| def __init__(self): |
| self.llm = AzureChatOpenAI( |
| azure_deployment=settings.azureai_deployment_name_4o, |
| openai_api_version=settings.azureai_api_version_4o, |
| azure_endpoint=settings.azureai_endpoint_url_4o, |
| api_key=settings.azureai_api_key_4o, |
| temperature=0 |
| ) |
|
|
| self.prompt = ChatPromptTemplate.from_messages([ |
| ("system", """You are an orchestrator agent. Analyze user's message and determine: |
| |
| 1. What is user's intent? (question, greeting, goodbye, other) |
| 2. Do we need to search user's documents for relevant information? |
| 3. If search is needed, what query should we use? |
| 4. If no search needed, provide a direct response. |
| |
| Respond in JSON format with these fields: |
| - intent: string (question, greeting, goodbye, other) |
| - needs_search: boolean |
| - search_query: string (if needed) |
| - direct_response: string (if no search needed) |
| |
| Intent Routing: |
| - question -> needs_search=True, search_query=message |
| - greeting -> needs_search=False, direct_response="Hello! How can I assist you today?" |
| - goodbye -> needs_search=False, direct_response="Goodbye! Have a great day!" |
| - other -> needs_search=True, search_query=message |
| """), |
| ("user", "{message}") |
| ]) |
|
|
| self.chain = ( |
| self.prompt |
| | self.llm |
| | JsonOutputParser() |
| ) |
|
|
| async def analyze_message(self, message: str) -> dict: |
| """Analyze user message and determine next actions.""" |
| try: |
| logger.info(f"Analyzing message: {message[:50]}...") |
|
|
| result = await self.chain.ainvoke({"message": message}) |
|
|
| logger.info(f"Intent: {result.get('intent')}, Needs search: {result.get('needs_search')}") |
| return result |
|
|
| except Exception as e: |
| logger.error("Message analysis failed", error=str(e)) |
| |
| return { |
| "intent": "question", |
| "needs_search": True, |
| "search_query": message, |
| "direct_response": None |
| } |
|
|
|
|
| orchestrator = OrchestratorAgent() |
|
|