"""QueryPlannerService — single LLM call: question + catalog → JSON IR. Prompt: src/config/prompts/query_planner.md (system) + the human content built by `prompt.build_planner_prompt(...)`. Output: a QueryIR ready for the IRValidator. Validation + retry are the caller's concern (`QueryService` orchestrates that loop). """ from __future__ import annotations from pathlib import Path from langchain_core.messages import SystemMessage from langchain_core.prompts import ChatPromptTemplate from langchain_core.runnables import Runnable from langchain_openai import AzureChatOpenAI from src.middlewares.logging import get_logger from ...catalog.models import Catalog from ..ir.models import QueryIR from .prompt import build_planner_prompt logger = get_logger("query_planner") _PROMPT_PATH = ( Path(__file__).resolve().parent.parent.parent / "config" / "prompts" / "query_planner.md" ) def _load_prompt_text() -> str: return _PROMPT_PATH.read_text(encoding="utf-8") def _build_default_chain() -> Runnable: from src.config.settings import settings llm = AzureChatOpenAI( azure_deployment=settings.azureai_deployment_name_4o, openai_api_version=settings.azureai_api_version_4o, azure_endpoint=settings.azureai_endpoint_url_4o, api_key=settings.azureai_api_key_4o, temperature=0, ) prompt = ChatPromptTemplate.from_messages( [ SystemMessage(content=_load_prompt_text()), ("human", "{human_content}"), ] ) return prompt | llm.with_structured_output(QueryIR) _default_chain: Runnable | None = None def _get_default_chain() -> Runnable: global _default_chain if _default_chain is None: _default_chain = _build_default_chain() return _default_chain class QueryPlannerService: """Wraps the LLM call with structured-output parsing into QueryIR. Inject `structured_chain` for tests. The planner prompt is composed by `build_planner_prompt(question, catalog, previous_error)` so retry callers can append the prior error context to nudge the LLM. """ def __init__(self, structured_chain: Runnable | None = None) -> None: self._chain = structured_chain def _ensure_chain(self) -> Runnable: if self._chain is None: self._chain = _get_default_chain() return self._chain async def plan( self, question: str, catalog: Catalog, previous_error: str | None = None, ) -> QueryIR: human_content = build_planner_prompt(question, catalog, previous_error) chain = self._ensure_chain() ir: QueryIR = await chain.ainvoke({"human_content": human_content}) logger.info( "query planned", source_id=ir.source_id, table_id=ir.table_id, select_n=len(ir.select), filters_n=len(ir.filters), retry=previous_error is not None, ) return ir