Spaces:
Runtime error
Runtime error
| import warnings | |
| import os | |
| import json | |
| from langchain_google_genai import ChatGoogleGenerativeAI | |
| from langchain import PromptTemplate | |
| from langchain.chains.question_answering import load_qa_chain | |
| from langchain.text_splitter import RecursiveCharacterTextSplitter | |
| from langchain.vectorstores import Chroma | |
| from langchain.chains import RetrievalQA | |
| from langchain_google_genai import GoogleGenerativeAIEmbeddings | |
| import gradio as gr | |
| from docx import Document | |
| import PyPDF2 | |
| import csv | |
| import google.generativeai as genai | |
# Suppress every warning category for the lifetime of the process.
warnings.filterwarnings("ignore")
# No ``global context`` statement is needed here: ``global`` only has an
# effect inside a function body, and ``context`` is assigned at module scope
# once the source documents have been loaded.
def extract_text_from_pdf(pdf_path):
    """Return the concatenated text of every page of a PDF file.

    Args:
        pdf_path: Path of the PDF file to read.

    Returns:
        The text extracted from each page, joined in page order with no
        separator between pages.
    """
    with open(pdf_path, "rb") as pdf_file:
        reader = PyPDF2.PdfReader(pdf_file)
        # Pages are extracted inside the ``with`` block, i.e. while the
        # underlying file handle is still open.
        return "".join(page.extract_text() for page in reader.pages)
def extract_text_from_docx(docx_path):
    """Return the text of a .docx file with a blank line between paragraphs.

    Args:
        docx_path: Path of the Word document, handed to ``Document``.

    Returns:
        The text of every paragraph, joined with two newline characters.
    """
    paragraphs = Document(docx_path).paragraphs
    return '\n\n'.join(paragraph.text for paragraph in paragraphs)
def extract_text_from_txt(txt_path):
    """Read a text file as UTF-8, falling back to Latin-1 on decode errors.

    Args:
        txt_path: Path of the text file to read.

    Returns:
        The full contents of the file as a string.
    """
    def _read_as(encoding):
        # Read the whole file in text mode using the given encoding.
        with open(txt_path, "r", encoding=encoding) as handle:
            return handle.read()

    try:
        return _read_as('utf-8')
    except UnicodeDecodeError:
        # Not valid UTF-8: retry with Latin-1.
        return _read_as('latin-1')
def extract_text_from_json(json_path):
    """Return a JSON file's content re-serialised with 4-space indentation.

    Args:
        json_path: Path of the JSON file, read as UTF-8.

    Returns:
        The pretty-printed JSON text, or an empty string when the file does
        not contain valid JSON or the parsed value is falsy (for example
        ``{}`` or ``[]``).
    """
    with open(json_path, "r", encoding='utf-8') as handle:
        try:
            parsed = json.load(handle)
        except json.JSONDecodeError:
            return ""
    return json.dumps(parsed, indent=4) if parsed else ""
def read_and_structure_csv(csv_path):
    """Turn each CSV row into a Markdown-style description of one plan.

    The file is read with the ``utf-8-sig`` codec. Every row becomes a
    "plan_type: ..." heading line followed by one bullet line per remaining
    column, with the column name shown in title case and underscores replaced
    by spaces.

    Args:
        csv_path: Path of the CSV file; it must have a ``plan_type`` column.

    Returns:
        The per-row descriptions joined with two newline characters.

    Raises:
        KeyError: If a row has no ``plan_type`` column.
    """
    def _describe(row):
        # Heading first, then one bullet per non-heading column.
        lines = [f"plan_type: {row['plan_type']}\n"]
        lines.extend(
            f" - **{column.replace('_', ' ').title()}**: {cell}\n"
            for column, cell in row.items()
            if column != 'plan_type'
        )
        return "".join(lines)

    with open(csv_path, mode='r', encoding='utf-8-sig') as csv_file:
        plans = [_describe(row) for row in csv.DictReader(csv_file)]
    return "\n\n".join(plans)
# Source documents that make up the knowledge base.
file_paths = [
    "./Final Medigap - Medigap Generic Plan Details - Medigap Generic Plan Details CSV.csv",
    "./finalll - Sheet1.csv",
]

# File-name suffix -> extractor function, checked in this order.
_EXTRACTORS = {
    ".pdf": extract_text_from_pdf,
    ".docx": extract_text_from_docx,
    ".txt": extract_text_from_txt,
    ".csv": read_and_structure_csv,
    ".json": extract_text_from_json,
}

texts1 = []
for path in file_paths:
    # Match the suffix case-insensitively so a file such as "PLANS.CSV" is
    # not dropped from the corpus.
    lowered_path = path.lower()
    extractor = next(
        (
            extract
            for suffix, extract in _EXTRACTORS.items()
            if lowered_path.endswith(suffix)
        ),
        None,
    )
    if extractor is None:
        # Unsupported types are still skipped, but no longer silently.
        print(f"Skipping unsupported file type: {path}")
        continue
    texts1.append(extractor(path))

# One string holding the whole corpus; ask_question reads it as a global.
context = "\n\n".join(texts1)
text_splitter = RecursiveCharacterTextSplitter(chunk_size=11000, chunk_overlap=1700)
texts = text_splitter.split_text(context)
# Startup diagnostic: prints every chunk that will be embedded.
print(texts,"\n\n")
# The Gemini key is read from the environment so that no credential lives in
# the source file.
# NOTE: any key that was previously committed to this file should be treated
# as leaked and revoked.
api_key = os.environ.get("GEMINI_API_KEY")
if not api_key:
    raise ValueError("API key not found. Please set your GEMINI_API_KEY in the environment.")

# Chat model used to answer questions.
model = ChatGoogleGenerativeAI(
    model="gemini-1.5-pro",
    google_api_key=api_key,
    temperature=0.1,
    convert_system_message_to_human=True
)

# Embed the corpus chunks and expose them as a retriever that returns the
# 5 best matches per query.
embeddings = GoogleGenerativeAIEmbeddings(model="models/embedding-001", google_api_key=api_key)
vector_index = Chroma.from_texts(texts, embeddings).as_retriever(search_kwargs={"k": 5})
# Prompt for the retrieval QA chain; {context} and {question} are its
# template variables.
template = """You are a highly knowledgeable and detail-oriented medical assistant specializing in insurance plans. Your task is to answer questions to user and recommend only those insurance plans that strictly align with all the needs and preferences provided by the user.Recommend all thoes plans that align with the user's needs .Don't miss any plan.Recommend only the plans that meet the user's preferences\n
Ensure that each recommended plan meets every single requirement specified by the user. Do not recommend plans that only partially meet the requirements.\n
Use only the information provided in the context. Do not generate any information that is not explicitly mentioned in the context.Recommend all the plans that align with the user's needs .Don't miss any plan\n
Context:
{context}
Question: {question}
Helpful Answer:"""
QA_CHAIN_PROMPT = PromptTemplate.from_template(template)

# Build the question-answering chain: the chat model answers from the chunks
# returned by the retriever, and the source documents are included in the
# result.
_chain_options = {"prompt": QA_CHAIN_PROMPT}
qa_chain = RetrievalQA.from_chain_type(
    llm=model,
    retriever=vector_index,
    chain_type_kwargs=_chain_options,
    return_source_documents=True,
)
# JSON file, relative to the working directory, that stores the conversation.
history_file = "history2.json"


def load_history():
    """Load the saved conversation from ``history_file``.

    Returns:
        The stored list of turns, or an empty list when the file is missing,
        does not contain valid JSON, or holds something other than a list.
    """
    if not os.path.exists(history_file):
        return []
    with open(history_file, "r") as source:
        try:
            loaded = json.load(source)
        except json.JSONDecodeError:
            # A corrupt file is treated as "no history".
            return []
    return loaded if isinstance(loaded, list) else []


def save_history(history):
    """Write ``history`` to ``history_file`` as JSON indented by 4 spaces.

    Args:
        history: The JSON-serialisable conversation to store.
    """
    with open(history_file, "w") as sink:
        json.dump(history, sink, indent=4)


# Conversation restored at import time; the handlers below rebind it.
history = load_history()
def summarize_history(history):
    """Return every user message in the chat history, one per line.

    The function only concatenates strings, so it neither needs nor sets any
    API credential.

    Args:
        history: Iterable of chat turns; each turn is a mapping with a
            ``"USER"`` key holding the text the user typed.

    Returns:
        The ``"USER"`` values joined with newline characters, or an empty
        string when the history is empty.

    Raises:
        KeyError: If a turn has no ``"USER"`` key.
    """
    return "\n".join(entry["USER"] for entry in history)
def handle_exit():
    """Start a new chat: wipe the stored conversation and show the greeting.

    Rebinds the module-level ``history`` to an empty list and persists it
    with ``save_history``.

    Returns:
        A 2-tuple ``(greeting, "")``: the greeting text and an empty string.
    """
    global history
    history = []
    save_history(history)
    greeting = """Hey there! I'm your Medicare assistant. You can ask me questions related to different types of insurances and I'll help you. Let's get started! Here are some questions you can ask:
\n• List down the items that are not included in Medicare
\n• list down plans that cover Part B excess charges
\n• My preferences are coverage for blood."""
    return greeting, ""
def _render_history_markdown(entries):
    """Render chat turns as Markdown USER/BOT pairs separated by rules."""
    return "".join(
        f"**USER:** {entry['USER']}\n\n**BOT:** {entry['answer']}\n\n---\n\n"
        for entry in entries
    )


def ask_question(question):
    """Answer a user question and return the whole conversation as Markdown.

    Typing "exit" (any case, surrounding whitespace ignored) clears the
    stored conversation and returns the greeting instead. A blank question is
    not sent to the model; the current conversation (or the greeting when
    there is none) is returned unchanged.

    Otherwise the question is appended to ``./chat_history.txt``, answered by
    ``qa_chain``, stored in the module-level ``history`` and saved. The
    retriever and chain are then rebuilt from the corpus plus the user's
    messages so far.

    Args:
        question: Text entered by the user.

    Returns:
        A Markdown string: the greeting after a reset, otherwise every stored
        turn rendered as a USER/BOT pair.
    """
    global history, vector_index, qa_chain

    greeting = """Hey there! I'm your Medicare assistant. You can ask me questions related to different types of insurances and I'll help you. Let's get started! Here are some questions you can ask:
\n• List down the items that are not included in Medicare
\n• list down plans that cover Part B excess charges
\n• My preferences are coverage for blood."""

    # A blank submission would otherwise trigger a model call and store an
    # empty turn.
    if not question or not question.strip():
        return _render_history_markdown(history) or greeting

    if question.strip().lower() == "exit":
        history = []
        save_history(history)
        return greeting

    # Explicit encoding so non-ASCII questions are logged the same way on
    # every platform.
    with open("./chat_history.txt", "a", encoding="utf-8") as log_file:
        log_file.write(f"USER: {question}\n")

    result = qa_chain({"query": question})
    answer = result["result"]
    history.append({"USER": question, "answer": answer})
    save_history(history)

    # Fold what the user has said so far into the corpus so that the next
    # question is retrieved against it.
    summary = summarize_history(history)
    combined_text = context + "\n\n" + summary
    chunks = text_splitter.split_text(combined_text)

    # Rebind the module-level retriever AND chain: assigning the new index to
    # a local name would leave qa_chain querying the index built at startup.
    vector_index = Chroma.from_texts(chunks, embeddings).as_retriever(search_kwargs={"k": 5})
    qa_chain = RetrievalQA.from_chain_type(
        model,
        retriever=vector_index,
        return_source_documents=True,
        chain_type_kwargs={"prompt": QA_CHAIN_PROMPT},
    )

    return _render_history_markdown(history)
# Markdown shown when the page first loads: the restored conversation when
# there is one, otherwise the greeting.
if history:
    initial_history_md = "".join(
        f"**USER:** {entry['USER']}\n\n**BOT:** {entry['answer']}\n\n---\n\n"
        for entry in history
    )
else:
    initial_history_md = """Hey there! I'm your Medicare assistant. You can ask me questions related to different types of insurances and I'll help you. Let's get started! Here are some questions you can ask:
\n• List down the items that are not included in Medicare
\n• List down plans that cover Part B excess charges
\n• My preferences are coverage for blood."""
# CSS for the scrolling transcript and the input bar pinned to the bottom.
_PAGE_STYLE = """
<style>
.fixed-bottom {
position: fixed;
bottom: 0;
width: 100%;
padding: 10px;
box-shadow: 0 -1px 10px rgba(0, 0, 0, 0.1);
}
.scrollable-history {
max-height: 80vh;
overflow-y: auto;
margin-bottom: 100px; /* Space for the fixed bottom bar */
}
</style>
"""

# NOTE(review): the nesting below is reconstructed from the statements alone;
# confirm the layout against the running app.
with gr.Blocks() as demo:
    gr.HTML(_PAGE_STYLE)

    # Conversation transcript.
    history_output = gr.Markdown(value=initial_history_md, elem_classes="scrollable-history")

    # Question box and buttons.
    with gr.Row(elem_classes="fixed-bottom"):
        with gr.Column():
            question_input = gr.Textbox(lines=2, placeholder="Type your question here...", show_label=False)
            submit_button = gr.Button("Submit")
            exit_button = gr.Button("New Chat")

    # "Submit" runs the question and, through a second handler, empties the
    # textbox; "New Chat" resets the transcript and the textbox.
    submit_button.click(fn=ask_question, inputs=question_input, outputs=history_output)
    submit_button.click(fn=lambda: "", inputs=None, outputs=question_input)
    exit_button.click(fn=handle_exit, inputs=None, outputs=[history_output, question_input])

demo.launch(share=True)