| | import warnings |
| | import os |
| | import json |
| | from langchain_google_genai import ChatGoogleGenerativeAI |
| | from langchain_core.prompts import PromptTemplate |
| | from langchain.chains.question_answering import load_qa_chain |
| | from langchain.text_splitter import RecursiveCharacterTextSplitter |
| | from langchain_community.vectorstores import Chroma |
| | from langchain.chains import RetrievalQA |
| | from langchain_google_genai import GoogleGenerativeAIEmbeddings |
| | import gradio as gr |
| |
|
| | import PyPDF2 |
| | import csv |
| | import google.generativeai as genai |
| |
|
# Silence noisy third-party deprecation warnings from the LangChain stack.
warnings.filterwarnings("ignore")
# NOTE: the original `global context` statement here was removed — `global`
# has no effect at module scope; `context` is simply assigned later.
| |
|
| |
|
| | |
def extract_text_from_pdf(pdf_path):
    """Return the concatenated extracted text of every page in the PDF at *pdf_path*."""
    with open(pdf_path, "rb") as handle:
        reader = PyPDF2.PdfReader(handle)
        # extract_text() may return None for pages with no text layer;
        # coalesce to "" so the join never raises TypeError.
        return "".join(page.extract_text() or "" for page in reader.pages)
| |
|
| |
|
def extract_text_from_txt(txt_path):
    """Read a plain-text file as UTF-8, falling back to latin-1 on decode errors."""
    for encoding in ("utf-8", "latin-1"):
        try:
            with open(txt_path, "r", encoding=encoding) as handle:
                return handle.read()
        except UnicodeDecodeError:
            # latin-1 maps every byte, so the second pass cannot fail.
            continue
| |
|
def extract_text_from_json(json_path):
    """Load a JSON file and return it pretty-printed; '' for empty or invalid JSON."""
    with open(json_path, "r", encoding="utf-8") as handle:
        try:
            payload = json.load(handle)
        except json.JSONDecodeError:
            return ""
    # Falsy payloads ({} , [] , null, 0 …) are treated the same as no data.
    return json.dumps(payload, indent=4) if payload else ""
| |
|
def read_and_structure_csv(csv_path):
    """Render each CSV row as a markdown bullet block keyed by its plan_type column."""
    blocks = []
    with open(csv_path, mode='r', encoding='utf-8-sig') as handle:
        for record in csv.DictReader(handle):
            lines = [f"plan_type: {record['plan_type']}\n"]
            lines.extend(
                f" - **{field.replace('_', ' ').title()}**: {val}\n"
                for field, val in record.items()
                if field != 'plan_type'
            )
            blocks.append("".join(lines))
    return "\n\n".join(blocks)
| |
|
| | |
# Source documents to index; each path is routed to a reader by extension.
file_paths = [
    "./Final Medigap - Medigap Generic Plan Details - Medigap Generic Plan Details CSV.csv",
    "finalll - Sheet1.csv",
]

# Dispatch table mapping a supported file suffix to its extraction function.
_readers = {
    ".pdf": extract_text_from_pdf,
    ".txt": extract_text_from_txt,
    ".csv": read_and_structure_csv,
    ".json": extract_text_from_json,
}

texts1 = []
for path in file_paths:
    for suffix, reader in _readers.items():
        if path.endswith(suffix):
            texts1.append(reader(path))
            break

# Combined corpus fed to the text splitter below.
context = "\n\n".join(texts1)
| |
|
| | |
# Split the combined corpus into large overlapping chunks for embedding;
# the overlap keeps plan details that straddle a boundary retrievable.
text_splitter = RecursiveCharacterTextSplitter(chunk_size=11000, chunk_overlap=1700)
texts = text_splitter.split_text(context)
| |
|
# Read the Gemini API key from the environment instead of hard-coding it.
# SECURITY NOTE(review): the original embedded a literal key here — that key
# has been exposed in source control and must be rotated.
api_key = os.environ.get("GEMINI_API_KEY")
if not api_key:
    raise ValueError("API key not found. Please set your GEMINI_API_KEY in the environment.")
| |
|
# Chat model used for retrieval-augmented answers; low temperature keeps
# recommendations close to the retrieved plan details.
model = ChatGoogleGenerativeAI(
    model="gemini-1.5-pro",
    google_api_key=api_key,
    temperature=0.1,
    convert_system_message_to_human=True
)
# Embedding model plus an in-memory Chroma index over the split chunks;
# the retriever returns the top-5 most similar chunks per query.
embeddings = GoogleGenerativeAIEmbeddings(model="models/embedding-001", google_api_key=api_key)
vector_index = Chroma.from_texts(texts, embeddings).as_retriever(search_kwargs={"k": 5})
| |
|
| | |
# Prompt template for the QA chain; {context} receives the retrieved chunks
# and {question} the user's query.
template = """You are a highly knowledgeable and detail-oriented medical assistant specializing in recommending insurance plans.\n
Ensure that each recommended plan meets every single requirement specified by the user.\n
Use only the information provided in the context. Do not generate any information that is not explicitly mentioned in the context\n
Context:
{context}
Question: {question}
Helpful Answer:"""
| |
|
# Retrieval-QA pipeline: retrieve top-k chunks via vector_index, then answer
# with the chat model using the prompt above. Source documents are returned
# alongside the answer for traceability.
QA_CHAIN_PROMPT = PromptTemplate.from_template(template)
qa_chain = RetrievalQA.from_chain_type(
    model,
    retriever=vector_index,
    return_source_documents=True,
    chain_type_kwargs={"prompt": QA_CHAIN_PROMPT}
)
| |
|
| | |
| | history_file = "./history2.json" |
| |
|
def load_history(path=None):
    """Return the saved chat-history list from *path* (default: ``history_file``).

    Returns [] when the file is missing, is not valid JSON, or does not
    contain a JSON list — the caller always gets a usable list.
    """
    if path is None:
        path = history_file
    if os.path.exists(path):
        with open(path, "r") as f:
            try:
                data = json.load(f)
            except json.JSONDecodeError:
                # Corrupt history file: start fresh rather than crash.
                return []
            if isinstance(data, list):
                return data
    return []
| |
|
def save_history(history, path=None):
    """Persist *history* (a list of chat turns) as pretty-printed JSON at *path*.

    *path* defaults to the module-level ``history_file``.
    """
    if path is None:
        path = history_file
    with open(path, "w") as f:
        json.dump(history, f, indent=4)
| |
|
| | history = load_history() |
| |
|
def userPreference():
    """Summarise the user's stated preferences from the persisted chat history.

    Sends the saved history to a lightweight Gemini model and returns its
    free-text list of preferences. Per the prompt, only statements phrased as
    "want"/"prefer"/"preference" count; "list X" requests are excluded.

    Returns the model's response text (str).
    """
    # Reuse the module-level key and history path instead of re-hard-coding
    # them (the original duplicated both the API key literal and the
    # "./history2.json" path here).
    genai.configure(api_key=api_key)
    user_history = extract_text_from_json(history_file)

    # High temperature is acceptable here: the output is advisory context,
    # not the user-facing answer.
    generation_config = {
        "temperature": 0.9,
        "top_p": 1,
        "max_output_tokens": 2048,
        "response_mime_type": "text/plain",
    }

    preference_model = genai.GenerativeModel(
        model_name="gemini-1.0-pro",
        generation_config=generation_config,
    )

    chat_session = preference_model.start_chat(history=[])

    response = chat_session.send_message(f"""{str(user_history)}
Take user's preference only if they mention as want ,prefer or preference, etc.List user's preference.when user ask's to list something don't take it as preference.
""")
    return response.text
| |
|
def ask_question(question):
    """Handle one chat turn from the Gradio UI and return the transcript markdown.

    Typing "exit" resets the session: clears and persists an empty history and
    rebuilds the retriever over the base corpus (dropping any preference text).
    Otherwise the question is answered via the QA chain, appended to history,
    and the vector index is rebuilt with the user's inferred preferences
    appended to the corpus so later retrievals are preference-aware.
    """
    global history, context, vector_index

    if question.strip().lower() == "exit":
        history = []
        save_history(history)

        # Rebuild the retriever over the pristine corpus — any previously
        # appended "USER'S PREFERENCE" text is discarded.
        context = "\n\n".join(texts1)
        texts = text_splitter.split_text(context)
        vector_index = Chroma.from_texts(texts, embeddings).as_retriever(search_kwargs={"k": 5})

        return "Hey there! I'm your Medicare assistant. You can ask me questions related to different types of insurances and I'll help you. Let's get started!"

    # Append-only plain-text audit log of user questions.
    with open("./chat_history.txt", "a") as f:
        f.write(f"USER: {question}\n")

    result = qa_chain({"query": question})
    answer = result["result"]
    history.append({"USER": question, "answer": answer})
    # Persist BEFORE userPreference() runs — it re-reads the history file.
    save_history(history)

    # Derive the user's stated preferences from the full saved history and
    # fold them into the corpus, then re-embed and re-index so the next
    # retrieval is biased toward those preferences.
    pref = userPreference()
    print("\n\n",pref,"pref\n\n\n")
    context = "\n\n".join(texts1)
    context1 = context +"USER'S PREFERENCE"+pref
    texts = text_splitter.split_text(context1)
    vector_index = Chroma.from_texts(texts, embeddings).as_retriever(search_kwargs={"k": 5})

    # Render the whole conversation as markdown for the transcript pane.
    history_md = ""
    for entry in history:
        history_md += f"**USER:** {entry['USER']}\n\n**BOT:** {entry['answer']}\n\n---\n\n"
    return history_md
| |
|
# Seed the transcript pane: prior turns if any history exists, else a greeting.
if history:
    initial_history_md = "".join(
        f"**USER:** {entry['USER']}\n\n**BOT:** {entry['answer']}\n\n---\n\n"
        for entry in history
    )
else:
    initial_history_md = "Hey there! I'm your Medicare assistant. You can ask me questions related to different types of insurances and I'll help you. Let's get started!"
| |
|
with gr.Blocks() as demo:
    # Inline CSS: pin the input row to the bottom of the viewport and make
    # the transcript pane scroll independently above it.
    gr.HTML(
        """
        <style>
        .fixed-bottom {
            position: fixed;
            bottom: 0;
            width: 100%;
            padding: 10px;
            box-shadow: 0 -1px 10px rgba(0, 0, 0, 0.1);
        }
        .scrollable-history {
            max-height: 80vh;
            overflow-y: auto;
            margin-bottom: 100px;
        }
        </style>
        """
    )

    # Conversation transcript (markdown), seeded with any saved history.
    history_output = gr.Markdown(value=initial_history_md, elem_classes="scrollable-history")

    with gr.Row(elem_classes="fixed-bottom"):
        with gr.Column():
            question_input = gr.Textbox(lines=2, placeholder="Type your question here...", show_label=False)
            submit_button = gr.Button("Submit")
            # Two click handlers: the first answers the question and updates
            # the transcript, the second clears the input box.
            submit_button.click(ask_question, inputs=question_input, outputs=history_output)
            submit_button.click(lambda: "", None, question_input)
    # NOTE: a stray bare `history_output` expression at the end of the block
    # was removed — it was a no-op statement.

demo.launch()
| |
|