| import os |
| from typing import List |
| from chainlit.types import AskFileResponse |
| from aimakerspace.text_utils import CharacterTextSplitter, TextFileLoader |
| from aimakerspace.openai_utils.prompts import ( |
| UserRolePrompt, |
| SystemRolePrompt, |
| AssistantRolePrompt, |
| ) |
| from aimakerspace.openai_utils.embedding import EmbeddingModel |
| from aimakerspace.vectordatabase import VectorDatabase |
| from aimakerspace.openai_utils.chatmodel import ChatOpenAI |
| import chainlit as cl |
| import pymupdf |
|
|
| |
| |
|
|
| |
| |
| |
| |
|
|
|
|
| |
| |
|
|
| |
| |
| |
| |
| |
|
|
# System prompt: sets the assistant's persona and grounding rules. It instructs the
# model to answer strictly from the retrieved context and to degrade gracefully
# (suggest rephrasing / other sources) when the context lacks the answer.
system_template = """\
You are a kind, helpful and polite AI.

Make sure that you have received a question before you respond.
Use the following context to extract and synthesize information to answer the user's question as accurately as possible.
Make sure that you think through each step.

If the answer is not found in the context:
1. Politely inform the user that the information is not available.
2. If possible, suggest where they might find more information or how they could rephrase their question for better clarity.

Always aim to provide clear, concise, and helpful responses."""
# Module-level prompt object reused by every pipeline instance below.
system_role_prompt = SystemRolePrompt(system_template)
|
|
# User prompt template: filled per-query with the retrieved context chunks and the
# user's question (see RetrievalAugmentedQAPipeline.arun_pipeline, which supplies
# the {context} and {question} keyword arguments).
user_prompt_template = """\
Context:
{context}

Question:
{question}

Please provide a clear and concise answer that you have thought through based on the above context.
"""
user_role_prompt = UserRolePrompt(user_prompt_template)
|
|
|
|
class RetrievalAugmentedQAPipeline:
    """Retrieval-augmented QA pipeline.

    Retrieves the most relevant chunks for a query from a vector database,
    builds system/user prompts from them, and streams the LLM's answer.
    """

    # BUG FIX: the annotation was `ChatOpenAI()` — an instance call evaluated at
    # class-definition time. The class itself is the correct annotation.
    def __init__(self, llm: ChatOpenAI, vector_db_retriever: VectorDatabase) -> None:
        self.llm = llm
        self.vector_db_retriever = vector_db_retriever

    async def arun_pipeline(self, user_query: str):
        """Run retrieval + generation for ``user_query``.

        Returns a dict with:
          - "response": an async generator yielding response chunks from the LLM
          - "context": the raw (chunk, score) list returned by the retriever
        """
        # Top-4 most similar chunks; each entry is (text, score) — text at index 0.
        context_list = self.vector_db_retriever.search_by_text(user_query, k=4)

        # Concatenate the retrieved chunk texts, newline-terminated, into one context string.
        context_prompt = "".join(chunk[0] + "\n" for chunk in context_list)

        formatted_system_prompt = system_role_prompt.create_message()
        formatted_user_prompt = user_role_prompt.create_message(
            question=user_query, context=context_prompt
        )

        async def generate_response():
            # Stream tokens as they arrive so the caller can forward them to the UI.
            async for chunk in self.llm.astream([formatted_system_prompt, formatted_user_prompt]):
                yield chunk

        return {"response": generate_response(), "context": context_list}
|
|
# Shared chunker used by both file processors to split raw text into chunks.
text_splitter = CharacterTextSplitter()
|
|
def process_text_file(file: AskFileResponse):
    """Persist an uploaded .txt file to disk, load it, and split it into chunks.

    Returns the list of text chunks produced by the module-level text_splitter.
    """
    import tempfile

    # Write the uploaded bytes straight into a temporary .txt file so the
    # path-based TextFileLoader can read it. (The original opened the temp file
    # in text mode only to grab its name, then reopened it in binary mode.)
    with tempfile.NamedTemporaryFile(mode="wb", delete=False, suffix=".txt") as temp_file:
        temp_file.write(file.content)
        temp_file_path = temp_file.name

    try:
        text_loader = TextFileLoader(temp_file_path)
        documents = text_loader.load_documents()
        texts = text_splitter.split_texts(documents)
    finally:
        # BUG FIX: the original never removed the temp file (process_pdf_file does);
        # clean up even if loading/splitting raises.
        os.remove(temp_file_path)
    return texts
|
|
def process_pdf_file(file: AskFileResponse):
    """Persist an uploaded PDF to disk and extract the text of each page.

    Returns a list of strings, one per page, in page order.
    """
    import tempfile

    # Write the uploaded bytes directly into a temporary .pdf file so pymupdf
    # can open it from a path.
    with tempfile.NamedTemporaryFile(mode="wb", delete=False, suffix=".pdf") as temp_file:
        temp_file.write(file.content)
        temp_file_path = temp_file.name

    try:
        # BUG FIX: close the document handle (the original leaked it); pymupdf
        # Documents support the context-manager protocol.
        with pymupdf.open(temp_file_path) as doc:
            texts = [page.get_text() for page in doc]
    finally:
        # Remove the temp file even if pymupdf fails to open/parse it.
        os.remove(temp_file_path)
    return texts
|
|
@cl.on_chat_start
async def on_chat_start():
    """Chat-session bootstrap: prompt for a file, index it, and stash the RAG pipeline.

    Keeps asking until the user uploads a .txt or .pdf (AskFileMessage returns
    None on timeout), processes the file into chunks, builds a vector database
    from them, and stores the assembled pipeline in the user session under "chain".
    """
    files = None

    # Re-prompt until a file is actually uploaded; the AskFileMessage returns
    # None when the 180s timeout elapses without an upload.
    while files is None:  # BUG FIX: identity check, not `== None`
        files = await cl.AskFileMessage(
            content="Please upload a Text or PDF file to begin!",
            accept=["text/plain", "application/pdf"],
            max_size_mb=2,
            timeout=180,
        ).send()

    file = files[0]

    msg = cl.Message(
        content=f"Processing `{file.name}`...", disable_human_feedback=True
    )
    await msg.send()

    # Dispatch on the MIME type reported by the upload widget.
    if file.type == "text/plain":
        texts = process_text_file(file)
    elif file.type == "application/pdf":
        texts = process_pdf_file(file)
    else:
        msg.content = "Unsupported file type."
        await msg.update()
        return

    print(f"Processing {len(texts)} text chunks")

    # Embed the chunks and build the in-memory vector index.
    vector_db = VectorDatabase()
    vector_db = await vector_db.abuild_from_list(texts)

    chat_openai = ChatOpenAI()

    retrieval_augmented_qa_pipeline = RetrievalAugmentedQAPipeline(
        vector_db_retriever=vector_db,
        llm=chat_openai
    )

    msg.content = f"Processing `{file.name}` done. You can now ask questions!"
    await msg.update()

    # Make the pipeline available to the on_message handler for this session.
    cl.user_session.set("chain", retrieval_augmented_qa_pipeline)
|
|
|
|
@cl.on_message
async def main(message):
    """Handle one user message: run the session's RAG pipeline and stream the answer."""
    # The pipeline was stored in the session by on_chat_start.
    pipeline = cl.user_session.get("chain")

    reply = cl.Message(content="")
    pipeline_output = await pipeline.arun_pipeline(message.content)

    # Forward tokens to the UI as the LLM produces them.
    async for token in pipeline_output["response"]:
        await reply.stream_token(token)

    await reply.send()
|
|