# pipeline.py — document indexing and conversational RAG pipelines (Haystack + Cohere).
from itertools import chain
from typing import Any, List
from haystack.components.converters import (
PyPDFToDocument,
MarkdownToDocument,
TextFileToDocument,
OutputAdapter
)
from haystack.components.routers import FileTypeRouter
from haystack.components.joiners import DocumentJoiner
from haystack.components.preprocessors import DocumentCleaner, DocumentSplitter
from haystack.components.embedders import SentenceTransformersDocumentEmbedder
from haystack.components.writers import DocumentWriter
from haystack.components.builders import ChatPromptBuilder, PromptBuilder
from haystack.components.retrievers.in_memory import InMemoryBM25Retriever
from haystack.document_stores.in_memory import InMemoryDocumentStore
from haystack.core.component.types import Variadic
from haystack_experimental.chat_message_stores.in_memory import InMemoryChatMessageStore
from haystack_experimental.components.retrievers import ChatMessageRetriever
from haystack_experimental.components.writers import ChatMessageWriter
from haystack_integrations.components.generators.cohere import (
CohereChatGenerator,
CohereGenerator
)
from haystack.dataclasses import ChatMessage
from haystack import Pipeline, component
from constants import COHERE_CHAT_MODEL, COHERE_GEN_MODEL
import os
from dotenv import load_dotenv
# Load variables from a local .env file into the process environment.
load_dotenv()

# Fail fast with a clear message when the key is missing: assigning None to
# os.environ would otherwise raise an opaque "TypeError: str expected, not NoneType".
_cohere_api_key = os.getenv("COHERE_API_KEY")
if not _cohere_api_key:
    raise RuntimeError(
        "COHERE_API_KEY is not set. Add it to your environment or .env file."
    )
os.environ["COHERE_API_KEY"] = _cohere_api_key
# Shared in-memory store: written to by the preprocessing pipeline below and
# read by the BM25 retriever in the conversational pipeline.
document_store = InMemoryDocumentStore()
# Routes incoming files to the matching converter based on MIME type.
file_type_router = FileTypeRouter(
mime_types=["text/plain", "application/pdf", "text/markdown"]
)
pdf_converter = PyPDFToDocument()
text_file_converter = TextFileToDocument()
markdown_converter = MarkdownToDocument()
# Merges the per-format converter outputs into a single document list.
document_joiner = DocumentJoiner()
document_cleaner = DocumentCleaner()
# Word-based chunking with a 50-word overlap (split_length left at its default).
document_splitter = DocumentSplitter(split_by="word", split_overlap=50)
# NOTE(review): embeddings are computed at index time, but retrieval below is
# BM25 (lexical) only — the embeddings look unused; confirm before relying on them.
document_embedder = SentenceTransformersDocumentEmbedder(
model="sentence-transformers/all-MiniLM-L12-v2"
)
# Persists the processed (cleaned, split, embedded) documents into the store.
document_writer = DocumentWriter(document_store)
# Indexing pipeline:
#   route by MIME type -> convert -> join -> clean -> split -> embed -> write.
preprocessing_pipeline = Pipeline()
preprocessing_pipeline.add_component("file_type_router", file_type_router)
preprocessing_pipeline.add_component("text_file_converter", text_file_converter)
preprocessing_pipeline.add_component("markdown_converter", markdown_converter)
preprocessing_pipeline.add_component("pdf_converter", pdf_converter)
preprocessing_pipeline.add_component("document_joiner", document_joiner)
preprocessing_pipeline.add_component("document_cleaner", document_cleaner)
preprocessing_pipeline.add_component("document_splitter", document_splitter)
preprocessing_pipeline.add_component("document_embedder", document_embedder)
preprocessing_pipeline.add_component("document_writer", document_writer)
# Each MIME-type output of the router feeds the corresponding converter.
preprocessing_pipeline.connect(
"file_type_router.text/plain",
"text_file_converter.sources"
)
preprocessing_pipeline.connect(
"file_type_router.application/pdf",
"pdf_converter.sources"
)
preprocessing_pipeline.connect(
"file_type_router.text/markdown",
"markdown_converter.sources"
)
# All converter outputs converge on the joiner before shared post-processing.
preprocessing_pipeline.connect("text_file_converter", "document_joiner")
preprocessing_pipeline.connect("markdown_converter", "document_joiner")
preprocessing_pipeline.connect("pdf_converter", "document_joiner")
preprocessing_pipeline.connect("document_joiner", "document_cleaner")
preprocessing_pipeline.connect("document_cleaner", "document_splitter")
preprocessing_pipeline.connect("document_splitter", "document_embedder")
preprocessing_pipeline.connect("document_embedder", "document_writer")
@component
class ListJoiner:
    """Haystack component that flattens a variadic collection of lists into one list.

    Used below to merge ChatMessage lists (e.g. the user turn and the LLM
    replies) into a single list for the memory writer.
    """

    def __init__(self, _type: Any):
        # Declare the flattened list as this component's single output, typed
        # with whatever list type the caller supplies (e.g. List[ChatMessage]).
        component.set_output_types(self, values=_type)

    def run(self, values: Variadic[Any]):
        # chain.from_iterable flattens without eagerly unpacking the
        # (possibly lazy) variadic input, unlike chain(*values).
        return {"values": list(chain.from_iterable(values))}
# Stores the running conversation so it can be replayed into later prompts.
memory_store = InMemoryChatMessageStore()
# Jinja template that rewrites the user's question into a standalone search
# query, drawing on conversation history only when necessary.
# NOTE(review): `memory.content` — newer Haystack ChatMessage exposes `.text`
# instead; confirm the installed version still supports `.content`.
query_rephrase_template = """
Rewrite the question for search while keeping its meaning and key terms intact.
If the conversation history is empty, DO NOT change the query.
Use conversation history only if necessary, and avoid extending the query with your own knowledge.
If no changes are needed, output the current question as is.
Conversation history:
{% for memory in memories %}
{{ memory.content }}
{% endfor %}
User Query: {{ query }}
Rewritten Query:
"""
# Conversational RAG pipeline:
#   memory -> rephrase query -> BM25 retrieve -> build chat prompt -> LLM,
#   with the LLM replies written back into conversation memory.
conversational_rag = Pipeline()
# Query rephrasing
conversational_rag.add_component(
"query_rephrase_prompt_builder",
PromptBuilder(query_rephrase_template)
)
# Temperature 0.0: the rephrase step should be deterministic.
conversational_rag.add_component(
"query_rephrase_llm",
CohereGenerator(
model=COHERE_GEN_MODEL,
temperature=0.0
)
)
# The generator returns a list of replies; take the first as a plain string.
conversational_rag.add_component(
"list_to_str_adapter",
OutputAdapter(template="{{ replies[0] }}", output_type=str)
)
# Retrieval & RAG
conversational_rag.add_component(
"retriever",
InMemoryBM25Retriever(document_store=document_store, top_k=3)
)
conversational_rag.add_component(
"prompt_builder",
ChatPromptBuilder(
variables=["query", "documents", "memories"],
required_variables=["query", "documents", "memories"]
)
)
conversational_rag.add_component(
"llm",
CohereChatGenerator(
model=COHERE_CHAT_MODEL,
temperature=0.3
)
)
# Memory
conversational_rag.add_component(
"memory_retriever",
ChatMessageRetriever(memory_store)
)
conversational_rag.add_component(
"memory_writer",
ChatMessageWriter(memory_store)
)
# Flattens message lists (e.g. LLM replies) into one list for the writer.
conversational_rag.add_component(
"memory_joiner",
ListJoiner(List[ChatMessage])
)
# History feeds the rephrase prompt so follow-ups become standalone queries.
conversational_rag.connect(
"memory_retriever",
"query_rephrase_prompt_builder.memories"
)
conversational_rag.connect(
"query_rephrase_prompt_builder.prompt",
"query_rephrase_llm"
)
conversational_rag.connect(
"query_rephrase_llm.replies",
"list_to_str_adapter"
)
# The rewritten query string drives lexical retrieval.
conversational_rag.connect(
"list_to_str_adapter",
"retriever.query"
)
conversational_rag.connect(
"retriever.documents",
"prompt_builder.documents"
)
# History is also injected into the final answer prompt.
conversational_rag.connect(
"memory_retriever",
"prompt_builder.memories"
)
conversational_rag.connect(
"prompt_builder.prompt",
"llm.messages"
)
# LLM replies flow through the joiner into persistent conversation memory.
conversational_rag.connect(
"llm.replies",
"memory_joiner"
)
conversational_rag.connect(
"memory_joiner",
"memory_writer"
)
# System prompt establishing the assistant's persona and grounding rules.
system_message = ChatMessage.from_system(
"""You are an intelligent and cheerful AI assistant specialized in assisting humans with queries
based on provided supporting documents and conversation history.
Always prioritize accurate and concise answers derived from the documents.
If the answer is not present in the documents, politely inform the user."""
)
# Jinja template for the user turn: conversation history, retrieved documents,
# then the (rephrased) question. Rendered later by ChatPromptBuilder, which
# supplies the query/documents/memories variables declared above.
user_message_template = """
Based on the conversation history and the provided supporting documents,
provide a brief and accurate answer to the question.
- Format your response for clarity and readability.
- Supporting documents are not part of the conversation history.
- If the question cannot be answered using the documents, respond with:
"The answer is not available in the provided documents."
Conversation History:
{% for memory in memories %}
{{ memory.content }}
{% endfor %}
Supporting Documents:
{% for doc in documents %}
{{ doc.content }}
{% endfor %}
Question: {{ query }}
Answer:
"""
# Wrapped as a ChatMessage; the template placeholders are filled at run time.
user_message = ChatMessage.from_user(user_message_template)