# rag-based-search / src/utils/chatbot.py
# (Hugging Face viewer residue removed: author "sid-0313",
#  commit 0fefc76 "Create src/utils/chatbot.py")
import time
from openai import OpenAI
import os
from langchain_community.vectorstores import Chroma
from typing import List, Tuple
import re
import ast
import html
from utils.load_config import LoadConfig
client = OpenAI()
APPCFG = LoadConfig()
class ChatBot:
    """RAG chatbot utilities.

    Retrieves the most relevant chunks from a Chroma vector DB and asks an
    OpenAI chat model to answer the user's question with that context.
    """

    @staticmethod
    def respond(chatbot: List, message: str,
                data_type: str = "Preprocessed doc",
                temperature: float = 0.0) -> Tuple:
        """Answer `message` using retrieved context and update the chat history.

        Args:
            chatbot: Gradio-style history, a list of (user, bot) tuples;
                mutated in place with the new exchange.
            message: The user's new question.
            data_type: Which vector DB to query — "Preprocessed doc" or
                "Upload doc: Process for RAG".
            temperature: Sampling temperature passed to the LLM.

        Returns:
            ("", updated chatbot history, retrieved markdown content or None).
        """
        # Select the vector DB matching the requested data source.
        if data_type == "Preprocessed doc":
            if os.path.exists(APPCFG.persist_directory):
                vectordb = Chroma(persist_directory=APPCFG.persist_directory,
                                  embedding_function=APPCFG.embedding_model)
            else:
                chatbot.append(
                    (message, "VectorDB does not exist. Please first execute the 'upload_data_manually.py' module."))
                return "", chatbot, None
        elif data_type == "Upload doc: Process for RAG":
            if os.path.exists(APPCFG.custom_persist_directory):
                vectordb = Chroma(persist_directory=APPCFG.custom_persist_directory,
                                  embedding_function=APPCFG.embedding_model)
            else:
                chatbot.append(
                    (message, "No file was uploaded. Please first upload your files using the 'upload' button."))
                return "", chatbot, None
        else:
            # Bug fix: an unrecognized data_type previously fell through and
            # raised UnboundLocalError on `vectordb`; fail gracefully instead.
            chatbot.append(
                (message, f"Unknown data type: '{data_type}'."))
            return "", chatbot, None

        docs = vectordb.similarity_search(message, k=APPCFG.k)
        print(docs)  # debug: inspect retrieved documents
        question = "# User new question:\n" + message
        retrieved_content = ChatBot.clean_references(docs)
        # Memory: include the last few Q&A pairs as conversational context.
        chat_history = f"Chat history:\n {str(chatbot[-APPCFG.number_of_q_a_pairs:])}\n\n"
        prompt = f"{chat_history}{retrieved_content}{question}"
        print("========================")
        print(prompt)  # debug: inspect the final prompt
        response = client.chat.completions.create(
            model=APPCFG.llm_engine,
            messages=[
                {"role": "system", "content": APPCFG.llm_system_role},
                {"role": "user", "content": prompt}
            ],
            temperature=temperature,
        )
        chatbot.append(
            (message, response.choices[0].message.content))
        time.sleep(2)
        return "", chatbot, retrieved_content

    @staticmethod
    def clean_references(documents: List) -> str:
        """Format retrieved LangChain documents as markdown with source links.

        Args:
            documents: Retrieved documents whose ``str()`` form is
                ``page_content=... metadata={...}`` (LangChain Document repr).

        Returns:
            A markdown string with one numbered section per document: cleaned
            content, source file name, page number, and a "View PDF" link.
        """
        server_url = "http://localhost:8000"
        documents = [str(x) + "\n\n" for x in documents]
        markdown_documents = ""
        counter = 1
        for doc in documents:
            # Extract content and metadata from the document's repr.
            match = re.match(r"page_content=(.*?)( metadata=\{.*\})", doc)
            if match is None:
                # Bug fix: a non-matching repr previously raised
                # AttributeError on .groups(); skip such entries instead.
                continue
            content, metadata = match.groups()
            metadata = metadata.split('=', 1)[1]
            metadata_dict = ast.literal_eval(metadata)
            # Decode escape sequences such as \n and \t from the repr.
            content = bytes(content, "utf-8").decode("unicode_escape")
            # Replace any remaining escaped newlines with actual newlines.
            content = re.sub(r'\\n', '\n', content)
            # Remove special tokens left over from preprocessing.
            content = re.sub(r'\s*<EOS>\s*<pad>\s*', ' ', content)
            # Collapse runs of whitespace.
            content = re.sub(r'\s+', ' ', content).strip()
            # Decode HTML entities.
            content = html.unescape(content)
            # Repair mojibake: unicode_escape mangles multi-byte UTF-8 into
            # latin-1 characters; this round-trip restores the original text.
            try:
                content = content.encode('latin1').decode('utf-8', 'ignore')
            except UnicodeEncodeError:
                # Bug fix: genuine non-latin-1 characters previously crashed
                # here; leave the content unchanged instead.
                pass
            # Normalize the en dash. The original chain of identity
            # substitutions (e.g. '∈' -> '∈', 'fi' -> 'fi') was a no-op
            # and has been dropped.
            content = content.replace('–', '-')
            pdf_url = f"{server_url}/{os.path.basename(metadata_dict['source'])}"
            # Append cleaned content with two newlines between documents.
            markdown_documents += (
                f"# Retrieved content {counter}:\n{content}\n\n"
                f"Source: {os.path.basename(metadata_dict['source'])}"
                f" | Page number: {str(metadata_dict['page'])}"
                f" | [View PDF]({pdf_url})\n\n"
            )
            counter += 1
        return markdown_documents