# medicare / app.py
# athmikha's picture
# Update app.py
# 2d05ef2 verified
import warnings
import os
import json
from langchain_google_genai import ChatGoogleGenerativeAI
from langchain_core.prompts import PromptTemplate
from langchain.chains.question_answering import load_qa_chain
from langchain.text_splitter import RecursiveCharacterTextSplitter
from langchain_community.vectorstores import Chroma
from langchain.chains import RetrievalQA
from langchain_google_genai import GoogleGenerativeAIEmbeddings
import gradio as gr
import PyPDF2
import csv
import google.generativeai as genai
# Suppress every warning category for the whole process.
warnings.filterwarnings("ignore")
# Define functions for extracting text from different file types
def extract_text_from_pdf(pdf_path):
    """Return the text of every page of a PDF, concatenated in page order.

    Args:
        pdf_path: Path of the PDF file to read.

    Returns:
        One string holding each page's extracted text back to back, with no
        separator inserted between pages.
    """
    with open(pdf_path, "rb") as f:
        pdf_reader = PyPDF2.PdfReader(f)
        # Extraction runs while the file is still open. `or ""` keeps the join
        # safe should a page yield None instead of a string (presumably only
        # possible on older PyPDF2 releases; harmless otherwise).
        return "".join(page.extract_text() or "" for page in pdf_reader.pages)
def extract_text_from_txt(txt_path):
    """Read a text file as UTF-8, retrying as Latin-1 if it is not valid UTF-8.

    Args:
        txt_path: Path of the text file to read.

    Returns:
        The full decoded contents of the file.
    """
    def _read_as(codec):
        # Single place that opens and reads the file with a given codec.
        with open(txt_path, "r", encoding=codec) as handle:
            return handle.read()

    try:
        return _read_as("utf-8")
    except UnicodeDecodeError:
        # Latin-1 assigns a character to every byte value, so this retry
        # cannot itself fail with a decode error.
        return _read_as("latin-1")
def extract_text_from_json(json_path):
    """Load a JSON file and return it re-serialised with 4-space indentation.

    Args:
        json_path: Path of a UTF-8 encoded JSON file.

    Returns:
        The pretty-printed JSON text, or an empty string when the file does
        not parse as JSON or parses to a falsy value (for example ``{}``,
        ``[]``, ``null``, ``0`` or ``""``).
    """
    with open(json_path, "r", encoding='utf-8') as f:
        try:
            data = json.load(f)
        except json.JSONDecodeError:
            # Unparseable content is treated the same as "no data".
            return ""
    return json.dumps(data, indent=4) if data else ""
def read_and_structure_csv(csv_path):
    """Turn each CSV row into a text block headed by its plan type.

    Every data row becomes ``plan_type: <value>`` followed by one
    `` - **Column Name**: value`` line per remaining column, where the column
    name is the header with underscores replaced by spaces and title-cased.
    Row blocks are joined with a blank-line separator.

    Args:
        csv_path: Path of a CSV file whose header includes a ``plan_type``
            column. It is read as UTF-8 and a leading byte-order mark is
            tolerated.

    Returns:
        The formatted text for all rows, or an empty string when the file
        contains no data rows.

    Raises:
        KeyError: If the header has no ``plan_type`` column.
    """
    structured_data = []
    with open(csv_path, mode='r', encoding='utf-8-sig') as file:
        for row in csv.DictReader(file):
            parts = [f"plan_type: {row['plan_type']}\n"]
            for key, value in row.items():
                if key == 'plan_type':
                    continue
                if key is None:
                    # DictReader stores cells that extend past the header
                    # under the key None; they have no column name to print
                    # and calling .replace on None would raise, so skip them.
                    continue
                parts.append(f" - **{key.replace('_', ' ').title()}**: {value}\n")
            structured_data.append("".join(parts))
    return "\n\n".join(structured_data)
# Initial setup: load files and extract text
file_paths = [
    "./Final Medigap - Medigap Generic Plan Details - Medigap Generic Plan Details CSV.csv",
    "finalll - Sheet1.csv",
]

# File suffix -> extractor. A path whose suffix is not listed here contributes
# nothing to the knowledge base (it is skipped without an error).
_EXTRACTORS = (
    (".pdf", extract_text_from_pdf),
    (".txt", extract_text_from_txt),
    (".csv", read_and_structure_csv),
    (".json", extract_text_from_json),
)

# One extracted text per recognised source file, in file_paths order.
texts1 = []
for path in file_paths:
    for suffix, extractor in _EXTRACTORS:
        if path.endswith(suffix):
            texts1.append(extractor(path))
            break

# The whole knowledge base as a single string, sources separated by a blank line.
context = "\n\n".join(texts1)
# Initialize text splitter and vector index
text_splitter = RecursiveCharacterTextSplitter(chunk_size=11000, chunk_overlap=1700)
texts = text_splitter.split_text(context)

# The Gemini key comes from the environment so that no credential is stored in
# the source. NOTE(review): a literal key used to be committed on this line;
# treat that key as compromised and revoke it.
api_key = os.environ.get("GEMINI_API_KEY")
if not api_key:
    raise ValueError("API key not found. Please set your GEMINI_API_KEY in the environment.")

# Chat model used by the retrieval QA chain.
model = ChatGoogleGenerativeAI(
    model="gemini-1.5-pro",
    google_api_key=api_key,
    temperature=0.1,
    convert_system_message_to_human=True,
)

# Embed the chunks and expose the store as a retriever returning 5 chunks per query.
embeddings = GoogleGenerativeAIEmbeddings(model="models/embedding-001", google_api_key=api_key)
vector_index = Chroma.from_texts(texts, embeddings).as_retriever(search_kwargs={"k": 5})
# Create QA chain
# Prompt for the answering step: {context} receives the retrieved chunks and
# {question} the user's query.
template = """You are a highly knowledgeable and detail-oriented medical assistant specializing in recommending insurance plans.\n
Ensure that each recommended plan meets every single requirement specified by the user.\n
Use only the information provided in the context. Do not generate any information that is not explicitly mentioned in the context\n
Context:
{context}
Question: {question}
Helpful Answer:"""
QA_CHAIN_PROMPT = PromptTemplate.from_template(template)

# Retrieval QA chain over the vector index; source documents are returned
# alongside the answer text.
qa_chain = RetrievalQA.from_chain_type(
    llm=model,
    retriever=vector_index,
    return_source_documents=True,
    chain_type_kwargs={"prompt": QA_CHAIN_PROMPT},
)
# History management
history_file = "./history2.json"


def load_history():
    """Load the saved conversation from ``history_file``.

    Returns:
        The parsed JSON content when it is a list; otherwise an empty list.
        A missing file, unparseable JSON, or a non-list top-level value all
        yield an empty list.
    """
    try:
        with open(history_file, "r", encoding="utf-8") as f:
            data = json.load(f)
    except (FileNotFoundError, json.JSONDecodeError):
        # Best effort: no file yet or a corrupt file both mean "start fresh".
        return []
    return data if isinstance(data, list) else []
def save_history(history):
    """Overwrite ``history_file`` with *history* as 4-space-indented JSON.

    Args:
        history: The conversation entries to persist; must be JSON-serialisable.
    """
    with open(history_file, "w", encoding="utf-8") as f:
        json.dump(history, f, indent=4)
# In-memory conversation log, seeded from history_file at import time;
# ask_question appends to it and rebinds it to an empty list on "exit".
history = load_history()
def userPreference():
    """Ask Gemini to list the preferences the user has stated so far.

    The saved conversation is read from ``history_file`` and sent, followed by
    a fixed instruction, as a single message in a fresh chat session. Both the
    history text and the model reply are printed to stdout.

    Returns:
        The text of the model's reply.
    """
    # Reuse the module-level key instead of embedding a second copy of the
    # credential in this function.
    genai.configure(api_key=api_key)
    user_history = extract_text_from_json(history_file)
    print(user_history,"user history\n\n\n")
    generation_config = {
        "temperature": 0.9,
        "top_p": 1,
        "max_output_tokens": 2048,
        "response_mime_type": "text/plain",
    }
    model1 = genai.GenerativeModel(
        model_name="gemini-1.0-pro",
        generation_config=generation_config,
    )
    # Each call starts a new session with no prior turns.
    chat_session = model1.start_chat(history=[])
    prompt = (
        f"{user_history}\n"
        "Take user's preference only if they mention as want ,prefer or preference, etc."
        "List user's preference.when user ask's to list something don't take it as preference.\n"
    )
    response = chat_session.send_message(prompt)
    print("\n\n\n",response.text,"response \n\n")
    return response.text
def _history_markdown(entries):
    """Render conversation entries as the Markdown transcript shown in the UI."""
    return "".join(
        f"**USER:** {entry['USER']}\n\n**BOT:** {entry['answer']}\n\n---\n\n"
        for entry in entries
    )


def ask_question(question):
    """Answer one user question and return the updated transcript as Markdown.

    Typing ``exit`` (any case, surrounding whitespace ignored) clears the saved
    history, rebuilds the vector index from the source files only, and returns
    the greeting. Any other non-blank question is appended to
    ``./chat_history.txt``, answered by ``qa_chain``, stored in the history
    file, and followed by a rebuild of the index with the user's extracted
    preferences appended to the source text.

    Args:
        question: The text typed by the user.

    Returns:
        The greeting string after ``exit`` (or for a blank question when there
        is no history yet); otherwise the whole conversation as Markdown.
    """
    global history, context, vector_index

    greeting = "Hey there! I'm your Medicare assistant. You can ask me questions related to different types of insurances and I'll help you. Let's get started!"

    if not question or not question.strip():
        # Nothing was asked: skip the model round-trip and leave the log,
        # history and index untouched; just re-show the current view.
        return _history_markdown(history) if history else greeting

    if question.strip().lower() == "exit":
        history = []
        save_history(history)
        # Reinitialize context and vector index
        context = "\n\n".join(texts1)
        texts = text_splitter.split_text(context)
        vector_index = Chroma.from_texts(texts, embeddings).as_retriever(search_kwargs={"k": 5})
        # qa_chain holds the retriever object it was constructed with, so
        # rebinding the module-level name alone would not change what it searches.
        # NOTE(review): assumes RetrievalQA allows reassigning its `retriever`
        # field; confirm against the installed LangChain version.
        qa_chain.retriever = vector_index
        return greeting

    # Explicit encoding so non-ASCII questions cannot fail on platforms whose
    # default text encoding is not UTF-8.
    with open("./chat_history.txt", "a", encoding="utf-8") as f:
        f.write(f"USER: {question}\n")

    result = qa_chain({"query": question})
    answer = result["result"]
    history.append({"USER": question, "answer": answer})
    save_history(history)

    # userPreference reads the history file, so it must run after save_history.
    pref = userPreference()
    print("\n\n",pref,"pref\n\n\n")

    # Rebuild the index over the source text plus the extracted preferences and
    # hand the new retriever to the chain so the next question can use it.
    context = "\n\n".join(texts1)
    context1 = context +"USER'S PREFERENCE"+pref
    texts = text_splitter.split_text(context1)
    vector_index = Chroma.from_texts(texts, embeddings).as_retriever(search_kwargs={"k": 5})
    qa_chain.retriever = vector_index

    return _history_markdown(history)
# Markdown shown in the transcript pane when the page first loads: the greeting
# when there is no saved conversation, otherwise the saved conversation.
if not history:
    initial_history_md = "Hey there! I'm your Medicare assistant. You can ask me questions related to different types of insurances and I'll help you. Let's get started!"
else:
    initial_history_md = "".join(
        f"**USER:** {entry['USER']}\n\n**BOT:** {entry['answer']}\n\n---\n\n"
        for entry in history
    )
# NOTE(review): the nesting below is reconstructed from a paste that lost its
# indentation; confirm the Row/Column layout against the deployed app.
with gr.Blocks() as demo:
    # Page CSS for the two element classes used below: "fixed-bottom" for the
    # input row and "scrollable-history" for the transcript.
    gr.HTML(
        """
<style>
.fixed-bottom {
position: fixed;
bottom: 0;
width: 100%;
padding: 10px;
box-shadow: 0 -1px 10px rgba(0, 0, 0, 0.1);
}
.scrollable-history {
max-height: 80vh;
overflow-y: auto;
margin-bottom: 100px;
}
</style>
"""
    )
    history_output = gr.Markdown(value=initial_history_md, elem_classes="scrollable-history")
    with gr.Row(elem_classes="fixed-bottom"):
        with gr.Column():
            question_input = gr.Textbox(lines=2, placeholder="Type your question here...", show_label=False)
            submit_button = gr.Button("Submit")
    # Two listeners on the same click: one answers the question and refreshes
    # the transcript, the other empties the input box.
    submit_button.click(ask_question, inputs=question_input, outputs=history_output)
    submit_button.click(lambda: "", None, question_input)

demo.launch()