| from typing import Any |
|
|
| import weave |
| from langchain_core.documents import Document |
| from langchain_core.prompts import ChatPromptTemplate |
| from langchain_groq import ChatGroq |
|
|
| from rag_pipelines.prompts import RETRIEVAL_EVALUATION_PROMPT, RetrievalEvaluationResult |
|
|
|
|
class RetrievalEvaluator:
    """Scores retrieved documents for relevance against a user question.

    A language model is prompted (via a structured-output chain) to decide,
    per document, whether its content helps answer the question. The class
    offers a single-document scorer plus a callable, state-dict interface
    suitable for graph/pipeline composition.

    Attributes:
        llm (ChatGroq): Model that performs the relevance judgments.
        prompt (ChatPromptTemplate): System prompt driving the evaluation.
        retrieval_evaluation_chain: prompt | structured-output model pipeline.
    """

    def __init__(self, llm: ChatGroq) -> None:
        """Wire up the evaluation chain from the shared prompt and the model.

        The chain is: evaluation prompt -> LLM constrained to emit a
        ``RetrievalEvaluationResult`` (structured output), so every
        invocation yields a parsed object with a ``decision`` field.

        Args:
            llm (ChatGroq): Configured ChatGroq instance used for scoring.
        """
        self.llm = llm
        self.prompt = ChatPromptTemplate.from_messages([("system", RETRIEVAL_EVALUATION_PROMPT)])
        structured_llm = self.llm.with_structured_output(RetrievalEvaluationResult)
        self.retrieval_evaluation_chain = self.prompt | structured_llm

    @weave.op()
    def score_document(self, question: str, document: Document) -> str:
        """Judge one document's relevance to the question.

        Args:
            question (str): The user query being answered.
            document (Document): Candidate document whose ``page_content``
                is fed to the evaluation chain.

        Returns:
            str: The model's decision — 'relevant' or 'irrelevant'.

        Example:
            >>> evaluator.score_document("What is AI?", Document(page_content="AI is..."))
            'relevant'
        """
        evaluation = self.retrieval_evaluation_chain.invoke(
            {"question": question, "context": document.page_content}
        )
        return evaluation.decision

    @weave.op()
    def __call__(self, state: dict[str, Any]) -> dict[str, Any]:
        """Filter a state's documents down to relevant context strings.

        Each retrieved document is scored individually; only the content of
        documents judged 'relevant' is kept as context. The original
        document list is passed through untouched.

        Args:
            state (dict[str, Any]): Must contain:
                - question (str): The user question.
                - documents (list[Document]): Retrieved candidates.

        Returns:
            dict[str, Any]: New state with:
                - question (str): Unchanged question.
                - context (list[str]): page_content of relevant documents.
                - documents (list[Document]): The input list, unmodified.

        Raises:
            KeyError: If 'question' or 'documents' is missing from ``state``.
        """
        question = state["question"]
        documents = state["documents"]

        relevant_context = []
        for document in documents:
            # Keep only content the model explicitly labeled 'relevant'.
            if self.score_document(question, document) == "relevant":
                relevant_context.append(document.page_content)

        return {"question": question, "context": relevant_context, "documents": documents}
|
|