ishaq101's picture
feat/Catalog Retrieval System (#1)
6bff5d9
"""OrchestratorAgent — classifies a user message and emits source_hint.
Output: needs_search (bool) + source_hint ∈ { chat, unstructured, structured }
+ rewritten_query (standalone form of the user's question, history-resolved).
Phase 2 replaces the previous intent-classification body. The class name
is preserved so existing import sites (`from src.agents.orchestration
import OrchestratorAgent`) keep working. The default LLM chain is
constructed lazily so the module is import-safe even without `.env`
populated.
"""
from __future__ import annotations
from pathlib import Path
from typing import Literal
from langchain_core.messages import BaseMessage
from langchain_core.prompts import ChatPromptTemplate, MessagesPlaceholder
from langchain_core.runnables import Runnable
from langchain_openai import AzureChatOpenAI
from pydantic import BaseModel, Field
from src.middlewares.logging import get_logger
logger = get_logger("orchestrator")
SourceHint = Literal["chat", "unstructured", "structured"]
_PROMPT_PATH = (
Path(__file__).resolve().parent.parent
/ "config"
/ "prompts"
/ "intent_router.md"
)
class IntentRouterDecision(BaseModel):
"""LLM output. Pydantic so it can be used with `with_structured_output`."""
needs_search: bool = Field(
..., description="True if we must look at the user's data to answer."
)
source_hint: SourceHint = Field(
...,
description="Which downstream path: 'chat' (no lookup), "
"'unstructured' (PDF/DOCX/TXT prose), 'structured' (DB / tabular file).",
)
rewritten_query: str | None = Field(
None,
description="Standalone version of the question, history-resolved. "
"Null when needs_search=false.",
)
def _load_prompt_text() -> str:
return _PROMPT_PATH.read_text(encoding="utf-8")
def _build_default_chain() -> Runnable:
from src.config.settings import settings
llm = AzureChatOpenAI(
azure_deployment=settings.azureai_deployment_name_4o,
openai_api_version=settings.azureai_api_version_4o,
azure_endpoint=settings.azureai_endpoint_url_4o,
api_key=settings.azureai_api_key_4o,
temperature=0,
)
prompt = ChatPromptTemplate.from_messages(
[
("system", _load_prompt_text()),
MessagesPlaceholder(variable_name="history", optional=True),
("human", "{message}"),
]
)
return prompt | llm.with_structured_output(IntentRouterDecision)
class OrchestratorAgent:
"""Classifies a user message into chat / unstructured / structured.
Inject `structured_chain` for tests; default builds the production
Azure OpenAI chain on first use.
"""
def __init__(self, structured_chain: Runnable | None = None) -> None:
self._chain = structured_chain
def _ensure_chain(self) -> Runnable:
if self._chain is None:
self._chain = _build_default_chain()
return self._chain
async def classify(
self,
message: str,
history: list[BaseMessage] | None = None,
) -> IntentRouterDecision:
chain = self._ensure_chain()
decision: IntentRouterDecision = await chain.ainvoke(
{"message": message, "history": history or []}
)
logger.info(
"intent classified",
source_hint=decision.source_hint,
needs_search=decision.needs_search,
)
return decision