File size: 3,476 Bytes
6bff5d9
027123c
6bff5d9
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
027123c
6bff5d9
 
 
 
027123c
 
 
 
6bff5d9
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
027123c
 
6bff5d9
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
027123c
6bff5d9
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
"""OrchestratorAgent — classifies a user message and emits source_hint.

Output: needs_search (bool) + source_hint ∈ { chat, unstructured, structured }
+ rewritten_query (standalone form of the user's question, history-resolved).

Phase 2 replaces the previous intent-classification body. The class name
is preserved so existing import sites (`from src.agents.orchestration
import OrchestratorAgent`) keep working. The default LLM chain is
constructed lazily so the module is import-safe even without `.env`
populated.
"""

from __future__ import annotations

from pathlib import Path
from typing import Literal

from langchain_core.messages import BaseMessage
from langchain_core.prompts import ChatPromptTemplate, MessagesPlaceholder
from langchain_core.runnables import Runnable
from langchain_openai import AzureChatOpenAI
from pydantic import BaseModel, Field

from src.middlewares.logging import get_logger

# Module-level logger obtained from the project's logging middleware under
# the name "orchestrator".
logger = get_logger("orchestrator")

# The three values a classification may carry in `source_hint`.
SourceHint = Literal["chat", "unstructured", "structured"]

# Location of the router prompt: two directory levels above this file's
# resolved path, then config/prompts/intent_router.md.
_PROMPT_PATH = Path(__file__).resolve().parent.parent.joinpath(
    "config", "prompts", "intent_router.md"
)


# Structured result of the routing step; this is the type that
# `OrchestratorAgent.classify` returns.
# NOTE(review): the class docstring and the Field descriptions below are
# presumably forwarded to the model as schema text by
# `with_structured_output` — treat them as prompt content and confirm
# before rewording them.
class IntentRouterDecision(BaseModel):
    """LLM output. Pydantic so it can be used with `with_structured_output`."""

    # Required (no default).
    needs_search: bool = Field(
        ..., description="True if we must look at the user's data to answer."
    )
    # Required; restricted to the `SourceHint` literal values.
    source_hint: SourceHint = Field(
        ...,
        description="Which downstream path: 'chat' (no lookup), "
        "'unstructured' (PDF/DOCX/TXT prose), 'structured' (DB / tabular file).",
    )
    # Optional; defaults to None when the model does not supply it.
    rewritten_query: str | None = Field(
        None,
        description="Standalone version of the question, history-resolved. "
        "Null when needs_search=false.",
    )


def _load_prompt_text() -> str:
    """Return the full contents of the file at `_PROMPT_PATH`, decoded as UTF-8."""
    with _PROMPT_PATH.open(encoding="utf-8") as prompt_file:
        return prompt_file.read()


def _build_default_chain() -> Runnable:
    """Assemble the default routing chain backed by Azure OpenAI.

    The chain is a chat prompt piped into a chat model configured to emit an
    `IntentRouterDecision`. The prompt takes a `message` variable and an
    optional `history` list of messages.

    Returns:
        The composed runnable: prompt template followed by the
        structured-output model.
    """
    # Imported here rather than at module level, so settings are only
    # touched when a chain is actually built.
    from src.config.settings import settings

    chat_model = AzureChatOpenAI(
        azure_deployment=settings.azureai_deployment_name_4o,
        openai_api_version=settings.azureai_api_version_4o,
        azure_endpoint=settings.azureai_endpoint_url_4o,
        api_key=settings.azureai_api_key_4o,
        temperature=0,
    )

    # NOTE(review): the prompt file text is used as a template string, so
    # literal braces in the markdown are presumably parsed as template
    # variables — verify the file escapes them.
    system_turn = ("system", _load_prompt_text())
    prior_turns = MessagesPlaceholder(variable_name="history", optional=True)
    user_turn = ("human", "{message}")
    template = ChatPromptTemplate.from_messages(
        [system_turn, prior_turns, user_turn]
    )

    structured_model = chat_model.with_structured_output(IntentRouterDecision)
    return template | structured_model


class OrchestratorAgent:
    """Route a user message to one of: chat, unstructured, structured.

    A runnable may be supplied via `structured_chain` (handy for tests).
    When none is given, the default Azure OpenAI chain is built the first
    time it is needed and then reused.
    """

    def __init__(self, structured_chain: Runnable | None = None) -> None:
        """Store the optional pre-built chain; nothing is constructed here."""
        self._chain = structured_chain

    def _ensure_chain(self) -> Runnable:
        """Return the stored chain, building and storing the default if absent."""
        existing = self._chain
        if existing is not None:
            return existing
        built = _build_default_chain()
        self._chain = built
        return built

    async def classify(
        self,
        message: str,
        history: list[BaseMessage] | None = None,
    ) -> IntentRouterDecision:
        """Classify `message` and return the chain's routing decision.

        Args:
            message: The user's message, passed to the chain as `message`.
            history: Earlier messages, passed as `history`; a missing or
                empty value is sent as an empty list.

        Returns:
            The decision produced by the chain.
        """
        runnable = self._ensure_chain()
        payload = {
            "message": message,
            "history": history if history else [],
        }
        decision: IntentRouterDecision = await runnable.ainvoke(payload)

        # Log only the routing fields, not the message text.
        logger.info(
            "intent classified",
            source_hint=decision.source_hint,
            needs_search=decision.needs_search,
        )
        return decision