"""Gradio chat assistant that answers insurance-plan questions from local files.

At import time the script:

1. extracts text from the files listed in ``file_paths``;
2. splits that text into chunks and embeds them into a Chroma retriever;
3. builds a LangChain ``RetrievalQA`` chain on top of a Gemini chat model;
4. starts a Gradio UI whose submit button calls ``ask_question``.

The Gemini API key is read from the ``GEMINI_API_KEY`` environment variable.
"""

import csv
import json
import os
import warnings

import google.generativeai as genai
import gradio as gr
import PyPDF2
from langchain.chains import RetrievalQA
from langchain.chains.question_answering import load_qa_chain
from langchain.text_splitter import RecursiveCharacterTextSplitter
from langchain_community.vectorstores import Chroma
from langchain_core.prompts import PromptTemplate
from langchain_google_genai import ChatGoogleGenerativeAI
from langchain_google_genai import GoogleGenerativeAIEmbeddings

warnings.filterwarnings("ignore")

# Message shown when there is no conversation yet, or after the user resets it.
GREETING = (
    "Hey there! I'm your Medicare assistant. You can ask me questions related "
    "to different types of insurances and I'll help you. Let's get started!"
)


# ---------------------------------------------------------------------------
# Text extraction helpers, one per supported file type
# ---------------------------------------------------------------------------

def extract_text_from_pdf(pdf_path):
    """Return the concatenated text of every page in the PDF at *pdf_path*."""
    with open(pdf_path, "rb") as f:
        pdf_reader = PyPDF2.PdfReader(f)
        # A page without extractable text may yield None/""; treat it as empty
        # instead of letting string concatenation raise TypeError.
        return "".join(page.extract_text() or "" for page in pdf_reader.pages)


def extract_text_from_txt(txt_path):
    """Return the contents of a text file, trying UTF-8 first, then Latin-1."""
    try:
        with open(txt_path, "r", encoding="utf-8") as f:
            return f.read()
    except UnicodeDecodeError:
        # Latin-1 maps every byte to a character, so this fallback cannot fail
        # on decoding.
        with open(txt_path, "r", encoding="latin-1") as f:
            return f.read()


def extract_text_from_json(json_path):
    """Return the JSON document at *json_path* pretty-printed as a string.

    Returns an empty string when the file holds an empty/falsy JSON value or
    is not valid JSON.
    """
    with open(json_path, "r", encoding="utf-8") as f:
        try:
            data = json.load(f)
        except json.JSONDecodeError:
            return ""
    if not data:
        return ""
    return json.dumps(data, indent=4)


def read_and_structure_csv(csv_path):
    """Render each CSV row as a ``plan_type`` heading plus a bullet per column.

    The file must have a ``plan_type`` column; a ``KeyError`` is raised
    otherwise. Row blocks are separated by blank lines.
    """
    structured_data = []
    # newline="" is what the csv module asks for so that newlines embedded in
    # quoted fields are parsed correctly.
    with open(csv_path, mode="r", encoding="utf-8-sig", newline="") as file:
        for row in csv.DictReader(file):
            parts = [f"plan_type: {row['plan_type']}\n"]
            parts.extend(
                f" - **{key.replace('_', ' ').title()}**: {value}\n"
                for key, value in row.items()
                if key != "plan_type"
            )
            structured_data.append("".join(parts))
    return "\n\n".join(structured_data)


# Maps a lower-cased file extension to the function that extracts its text.
_EXTRACTORS = {
    ".pdf": extract_text_from_pdf,
    ".txt": extract_text_from_txt,
    ".csv": read_and_structure_csv,
    ".json": extract_text_from_json,
}


# ---------------------------------------------------------------------------
# Initial setup: load files and extract text
# ---------------------------------------------------------------------------

file_paths = [
    "./Final Medigap - Medigap Generic Plan Details - Medigap Generic Plan Details CSV.csv",
    "finalll - Sheet1.csv",
]

texts1 = []
for path in file_paths:
    extractor = _EXTRACTORS.get(os.path.splitext(path)[1].lower())
    if extractor is not None:  # files with an unsupported extension are skipped
        texts1.append(extractor(path))

context = "\n\n".join(texts1)

# ---------------------------------------------------------------------------
# Text splitter, models and vector index
# ---------------------------------------------------------------------------

text_splitter = RecursiveCharacterTextSplitter(chunk_size=11000, chunk_overlap=1700)
texts = text_splitter.split_text(context)

# The key is read from the environment; secrets must not live in the source.
api_key = os.environ.get("GEMINI_API_KEY")
if not api_key:
    raise ValueError("API key not found. Please set your GEMINI_API_KEY in the environment.")

model = ChatGoogleGenerativeAI(
    model="gemini-1.5-pro",
    google_api_key=api_key,
    temperature=0.1,
    convert_system_message_to_human=True,
)

embeddings = GoogleGenerativeAIEmbeddings(model="models/embedding-001", google_api_key=api_key)


def _build_retriever(source_text):
    """Split *source_text*, embed the chunks into Chroma and return a k=5 retriever."""
    chunks = text_splitter.split_text(source_text)
    # NOTE(review): depending on the installed Chroma version, repeated
    # from_texts() calls without a collection name may add to the same
    # in-memory collection rather than replace it - confirm and, if so, give
    # each rebuild its own collection.
    return Chroma.from_texts(chunks, embeddings).as_retriever(search_kwargs={"k": 5})


vector_index = _build_retriever(context)

# ---------------------------------------------------------------------------
# QA chain
# ---------------------------------------------------------------------------

template = """You are a highly knowledgeable and detail-oriented medical assistant specializing in recommending insurance plans.\n
Ensure that each recommended plan meets every single requirement specified by the user.\n
Use only the information provided in the context.
Do not generate any information that is not explicitly mentioned in the context\n
Context: {context}
Question: {question}
Helpful Answer:"""

QA_CHAIN_PROMPT = PromptTemplate.from_template(template)


def _build_qa_chain(retriever):
    """Return a RetrievalQA chain over *retriever* using the shared model and prompt."""
    return RetrievalQA.from_chain_type(
        model,
        retriever=retriever,
        return_source_documents=True,
        chain_type_kwargs={"prompt": QA_CHAIN_PROMPT},
    )


qa_chain = _build_qa_chain(vector_index)

# ---------------------------------------------------------------------------
# History management
# ---------------------------------------------------------------------------

history_file = "./history2.json"


def load_history():
    """Return the saved conversation as a list, or ``[]`` if none is usable.

    A missing file, invalid JSON, or a JSON value that is not a list all
    result in an empty history.
    """
    if os.path.exists(history_file):
        with open(history_file, "r", encoding="utf-8") as f:
            try:
                data = json.load(f)
            except json.JSONDecodeError:
                return []
        if isinstance(data, list):
            return data
    return []


def save_history(history):
    """Overwrite the history file with *history* serialized as indented JSON."""
    with open(history_file, "w", encoding="utf-8") as f:
        json.dump(history, f, indent=4)


def _render_history(entries):
    """Format conversation *entries* as the Markdown shown in the UI."""
    return "".join(
        f"**USER:** {entry['USER']}\n\n**BOT:** {entry['answer']}\n\n---\n\n"
        for entry in entries
    )


history = load_history()


def userPreference():
    """Ask Gemini to list the user's stated preferences from the saved history.

    Reads the history file from disk, sends it with an instruction prompt to a
    fresh chat session, and returns the model's text response.
    """
    genai.configure(api_key=api_key)
    user_history = extract_text_from_json(history_file)
    print(user_history, "user history\n\n\n")

    generation_config = {
        "temperature": 0.9,
        "top_p": 1,
        "max_output_tokens": 2048,
        "response_mime_type": "text/plain",
    }
    model1 = genai.GenerativeModel(
        model_name="gemini-1.0-pro",
        generation_config=generation_config,
    )
    chat_session = model1.start_chat(history=[])

    response = chat_session.send_message(f"""{str(user_history)}
Take user's preference only if they mention as want ,prefer or preference, etc.List user's preference.when user ask's to list something don't take it as preference.
""")
    print("\n\n\n", response.text, "response \n\n")
    return response.text


def ask_question(question):
    """Answer *question* and return the whole conversation as Markdown.

    Typing ``exit`` (case-insensitive) clears the saved history, rebuilds the
    retriever and chain from the base documents only, and returns the
    greeting. An empty question returns the current conversation (or the
    greeting) without calling the model.

    For any other question the function appends it to ``./chat_history.txt``,
    runs the QA chain, saves the exchange to the history file, extracts the
    user's preferences via ``userPreference`` and rebuilds the retriever and
    chain with those preferences appended to the base documents.
    """
    global history, vector_index, qa_chain

    if not question or not question.strip():
        return _render_history(history) or GREETING

    if question.strip().lower() == "exit":
        history = []
        save_history(history)
        # Drop any preference text from the index by rebuilding from the base
        # documents, and point the chain at the fresh retriever.
        vector_index = _build_retriever(context)
        qa_chain = _build_qa_chain(vector_index)
        return GREETING

    with open("./chat_history.txt", "a", encoding="utf-8") as f:
        f.write(f"USER: {question}\n")

    result = qa_chain({"query": question})
    answer = result["result"]

    history.append({"USER": question, "answer": answer})
    save_history(history)

    pref = userPreference()
    print("\n\n", pref, "pref\n\n\n")

    # Re-index the documents together with the extracted preferences. The
    # chain must be rebuilt as well: it keeps a reference to the retriever it
    # was created with, so reassigning vector_index alone has no effect on it.
    vector_index = _build_retriever(context + "USER'S PREFERENCE" + pref)
    qa_chain = _build_qa_chain(vector_index)

    return _render_history(history)


# ---------------------------------------------------------------------------
# Gradio UI
# ---------------------------------------------------------------------------

initial_history_md = _render_history(history) if history else GREETING

with gr.Blocks() as demo:
    gr.HTML(
        """ """
    )
    history_output = gr.Markdown(value=initial_history_md, elem_classes="scrollable-history")
    with gr.Row(elem_classes="fixed-bottom"):
        with gr.Column():
            question_input = gr.Textbox(lines=2, placeholder="Type your question here...", show_label=False)
            submit_button = gr.Button("Submit")
    submit_button.click(ask_question, inputs=question_input, outputs=history_output)
    # Second handler clears the textbox after submission.
    submit_button.click(lambda: "", None, question_input)

demo.launch()