| """OrchestratorAgent — classifies a user message and emits source_hint. |
| |
| Output: needs_search (bool) + source_hint ∈ { chat, unstructured, structured } |
| + rewritten_query (standalone form of the user's question, history-resolved). |
| |
| Phase 2 replaces the previous intent-classification body. The class name |
| is preserved so existing import sites (`from src.agents.orchestration |
| import OrchestratorAgent`) keep working. The default LLM chain is |
| constructed lazily so the module is import-safe even without `.env` |
| populated. |
| """ |
|
|
| from __future__ import annotations |
|
|
| from pathlib import Path |
| from typing import Literal |
|
|
| from langchain_core.messages import BaseMessage |
| from langchain_core.prompts import ChatPromptTemplate, MessagesPlaceholder |
| from langchain_core.runnables import Runnable |
| from langchain_openai import AzureChatOpenAI |
| from pydantic import BaseModel, Field |
|
|
| from src.middlewares.logging import get_logger |
|
|
| logger = get_logger("orchestrator") |
|
|
| SourceHint = Literal["chat", "unstructured", "structured"] |
|
|
| _PROMPT_PATH = ( |
| Path(__file__).resolve().parent.parent |
| / "config" |
| / "prompts" |
| / "intent_router.md" |
| ) |
|
|
|
|
| class IntentRouterDecision(BaseModel): |
| """LLM output. Pydantic so it can be used with `with_structured_output`.""" |
|
|
| needs_search: bool = Field( |
| ..., description="True if we must look at the user's data to answer." |
| ) |
| source_hint: SourceHint = Field( |
| ..., |
| description="Which downstream path: 'chat' (no lookup), " |
| "'unstructured' (PDF/DOCX/TXT prose), 'structured' (DB / tabular file).", |
| ) |
| rewritten_query: str | None = Field( |
| None, |
| description="Standalone version of the question, history-resolved. " |
| "Null when needs_search=false.", |
| ) |
|
|
|
|
| def _load_prompt_text() -> str: |
| return _PROMPT_PATH.read_text(encoding="utf-8") |
|
|
|
|
| def _build_default_chain() -> Runnable: |
| from src.config.settings import settings |
|
|
| llm = AzureChatOpenAI( |
| azure_deployment=settings.azureai_deployment_name_4o, |
| openai_api_version=settings.azureai_api_version_4o, |
| azure_endpoint=settings.azureai_endpoint_url_4o, |
| api_key=settings.azureai_api_key_4o, |
| temperature=0, |
| ) |
| prompt = ChatPromptTemplate.from_messages( |
| [ |
| ("system", _load_prompt_text()), |
| MessagesPlaceholder(variable_name="history", optional=True), |
| ("human", "{message}"), |
| ] |
| ) |
| return prompt | llm.with_structured_output(IntentRouterDecision) |
|
|
|
|
| class OrchestratorAgent: |
| """Classifies a user message into chat / unstructured / structured. |
| |
| Inject `structured_chain` for tests; default builds the production |
| Azure OpenAI chain on first use. |
| """ |
|
|
| def __init__(self, structured_chain: Runnable | None = None) -> None: |
| self._chain = structured_chain |
|
|
| def _ensure_chain(self) -> Runnable: |
| if self._chain is None: |
| self._chain = _build_default_chain() |
| return self._chain |
|
|
| async def classify( |
| self, |
| message: str, |
| history: list[BaseMessage] | None = None, |
| ) -> IntentRouterDecision: |
| chain = self._ensure_chain() |
| decision: IntentRouterDecision = await chain.ainvoke( |
| {"message": message, "history": history or []} |
| ) |
| logger.info( |
| "intent classified", |
| source_hint=decision.source_hint, |
| needs_search=decision.needs_search, |
| ) |
| return decision |
|
|