| import os |
| import json |
| import re |
| import gradio as gr |
| import pandas as pd |
| import requests |
| import random |
| import urllib.parse |
| from tempfile import NamedTemporaryFile |
| from typing import List |
| from bs4 import BeautifulSoup |
| from langchain_core.prompts import ChatPromptTemplate |
| from langchain_community.vectorstores import FAISS |
| from langchain_community.document_loaders import PyPDFLoader |
| from langchain_core.output_parsers import StrOutputParser |
| from langchain_community.embeddings import HuggingFaceEmbeddings |
| from langchain_text_splitters import RecursiveCharacterTextSplitter |
| from langchain_community.llms import HuggingFaceHub |
| from langchain_core.runnables import RunnableParallel, RunnablePassthrough |
| from langchain_core.documents import Document |
| from sklearn.feature_extraction.text import TfidfVectorizer |
| from sklearn.metrics.pairwise import cosine_similarity |
| from datetime import datetime |
| from huggingface_hub.utils import HfHubHTTPError |
|
|
# Hugging Face API token read from the environment; None when HUGGINGFACE_TOKEN is unset.
huggingface_token = os.environ.get("HUGGINGFACE_TOKEN")


# Maps each document-mode question to the answer returned for it (written by ask_question).
memory_database = {}
# Most recent document-mode exchanges as {"question": ..., "answer": ...} dicts.
conversation_history = []
|
|
def load_and_split_document_basic(file):
    """Load the PDF at ``file.name`` and return the loader's default split."""
    return PyPDFLoader(file.name).load_and_split()
|
|
def load_and_split_document_recursive(file: NamedTemporaryFile) -> List[Document]:
    """Load the PDF at ``file.name`` and split it into overlapping chunks.

    Chunks are at most 1000 characters long (measured with ``len``) and
    consecutive chunks overlap by 200 characters.
    """
    splitter = RecursiveCharacterTextSplitter(
        chunk_size=1000,
        chunk_overlap=200,
        length_function=len,
    )
    return splitter.split_documents(PyPDFLoader(file.name).load())
|
|
def get_embeddings():
    """Return a new HuggingFaceEmbeddings instance for the all-mpnet-base-v2 model."""
    model_name = "sentence-transformers/all-mpnet-base-v2"
    return HuggingFaceEmbeddings(model_name=model_name)
|
|
def create_or_update_database(data, embeddings):
    """Add ``data`` to the on-disk FAISS index, creating the index if absent.

    The index lives at ``faiss_database`` relative to the working directory
    and is saved back to that location after the update.
    """
    index_dir = "faiss_database"
    if not os.path.exists(index_dir):
        store = FAISS.from_documents(data, embeddings)
    else:
        store = FAISS.load_local(index_dir, embeddings, allow_dangerous_deserialization=True)
        store.add_documents(data)
    store.save_local(index_dir)
|
|
def clear_cache():
    """Delete the persisted FAISS index and report what happened.

    The rest of this file persists the index with ``save_local("faiss_database")``,
    which produces a directory, so a plain ``os.remove`` cannot delete it.
    Directories are removed recursively; a stray regular file (or symlink)
    with the same name is unlinked.

    Returns:
        "Cache cleared successfully." when something was removed,
        "No cache to clear." when the path does not exist.
    """
    import shutil  # local import: only this function needs it

    index_path = "faiss_database"
    if not os.path.exists(index_path):
        return "No cache to clear."

    if os.path.isdir(index_path) and not os.path.islink(index_path):
        shutil.rmtree(index_path)
    else:
        os.remove(index_path)
    return "Cache cleared successfully."
|
|
def get_similarity(text1, text2):
    """Return the TF-IDF cosine similarity between two texts.

    Both texts are vectorized together with a fresh ``TfidfVectorizer`` and
    the cosine similarity of the two resulting rows is returned.

    Returns 0.0 when vectorization fails with ``ValueError`` — scikit-learn
    raises it when neither text yields a usable token (for example empty
    strings or punctuation only), in which case there is nothing to compare.
    """
    try:
        tfidf_matrix = TfidfVectorizer().fit_transform([text1, text2])
    except ValueError:
        return 0.0
    return cosine_similarity(tfidf_matrix[0:1], tfidf_matrix[1:2])[0][0]
|
|
# Prompt template for document-mode questions. ask_question fills the
# {history}, {context} and {question} placeholders through ChatPromptTemplate.
prompt = """
Answer the question based on the following information:

Conversation History:
{history}

Context from documents:
{context}

Current Question: {question}

If the question is referring to the conversation history, use that information to answer.
If the question is not related to the conversation history, use the context from documents to answer.
If you don't have enough information to answer, say so.

Provide a concise and direct answer to the question:
"""
|
|
def get_model(temperature, top_p, repetition_penalty):
    """Create the HuggingFaceHub LLM client for Mistral-7B-Instruct-v0.3.

    The three sampling parameters are forwarded as ``model_kwargs`` together
    with a fixed ``max_length`` of 1000; authentication uses the module-level
    ``huggingface_token``.
    """
    generation_settings = {
        "temperature": temperature,
        "top_p": top_p,
        "repetition_penalty": repetition_penalty,
        "max_length": 1000,
    }
    return HuggingFaceHub(
        repo_id="mistralai/Mistral-7B-Instruct-v0.3",
        model_kwargs=generation_settings,
        huggingfacehub_api_token=huggingface_token,
    )
|
|
def generate_chunked_response(model, prompt, max_tokens=200):
    """Generate a response by calling ``model`` repeatedly until it finishes.

    Each call receives the prompt plus everything generated so far and is
    asked for at most ``max_tokens`` new tokens (fewer when the remaining
    budget is smaller). Generation stops when the model returns an empty
    chunk, when a chunk ends with ``.``, ``!`` or ``?``, when the
    whitespace-separated word count of prompt plus response reaches the
    budget, or when the model call raises.

    Args:
        model: callable invoked as ``model(text, max_new_tokens=...)`` and
            returning a string.
        prompt: prompt text sent on every call.
        max_tokens: upper bound on ``max_new_tokens`` per call.

    Returns:
        The stripped chunks concatenated in order. Chunks are joined without
        a separator, so word boundaries between chunks are not restored.
    """
    word_budget = 7800  # cap on prompt + response length, counted in words
    full_response = ""
    total_length = len(prompt.split())

    while total_length < word_budget:
        try:
            chunk = model(
                prompt + full_response,
                max_new_tokens=min(max_tokens, word_budget - total_length),
            )
            chunk = chunk.strip()
            if not chunk:
                break
            full_response += chunk
            total_length += len(chunk.split())

            if chunk.endswith((".", "!", "?")):
                break
        except Exception as e:
            # Best effort: report the failure and return what was generated so far.
            print(f"Error generating response: {str(e)}")
            break

    return full_response.strip()
|
|
def manage_conversation_history(question, answer, history, max_history=5):
    """Append a question/answer pair to ``history`` and trim it in place.

    After the append, the oldest entries are dropped until at most
    ``max_history`` remain. This also shrinks a list that was already longer
    than the limit when passed in.

    Returns:
        The same ``history`` list object, mutated in place.
    """
    history.append({"question": question, "answer": answer})
    overflow = len(history) - max_history
    if overflow > 0:
        del history[:overflow]
    return history
|
|
def is_related_to_history(question, history, threshold=0.3):
    """Tell whether ``question`` is textually similar to the conversation so far.

    All past questions and answers are joined into one string and compared
    with the question via ``get_similarity``. An empty history is never
    considered related.
    """
    if history:
        combined = " ".join([f"{turn['question']} {turn['answer']}" for turn in history])
        return get_similarity(question, combined) > threshold
    return False
|
|
def extract_text_from_webpage(html):
    """Return the visible text of an HTML page, one non-empty fragment per line.

    ``<script>`` and ``<style>`` elements are removed first. Each text line is
    stripped, split on single spaces, and the non-empty pieces are joined
    with newlines.
    """
    soup = BeautifulSoup(html, 'html.parser')
    for tag in soup(["script", "style"]):
        tag.extract()

    pieces = []
    for raw_line in soup.get_text().splitlines():
        for phrase in raw_line.strip().split(" "):
            cleaned = phrase.strip()
            if cleaned:
                pieces.append(cleaned)
    return '\n'.join(pieces)
|
|
# Browser User-Agent strings; google_search picks one at random for each
# search-results request.
_useragent_list = [
    "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36",
    "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36",
    "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Edge/91.0.864.59 Safari/537.36",
    "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Edge/91.0.864.59 Safari/537.36",
    "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Safari/537.36",
    "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Safari/537.36",
]
|
|
def google_search(term, num_results=20, lang="en", timeout=5, safe="active", ssl_verify=None):
    """Scrape Google results for ``term`` and fetch the text of each hit.

    Result pages are requested until ``num_results`` hits have been seen, a
    request fails, or a page contains no result blocks. For every result block
    with a link and a title, the linked page is downloaded and its visible
    text (truncated to 8000 characters plus "...") is stored.

    Args:
        term: search query; ``requests`` URL-encodes it.
        num_results: number of results to request in total.
        lang: value for Google's ``hl`` parameter.
        timeout: timeout in seconds for every HTTP request.
        safe: value for Google's ``safe`` parameter.
        ssl_verify: forwarded as ``verify`` to every request, both the search
            pages and the result pages.

    Returns:
        A list of ``{"link", "title", "text"}`` dicts. ``text`` is None when
        the page could not be fetched; all three values are None for result
        blocks without a link or title. When nothing was collected, a single
        placeholder entry titled "No Results" is returned.
    """
    start = 0
    all_results = []
    max_chars_per_page = 8000

    print(f"Starting Google search for term: '{term}'")

    with requests.Session() as session:
        while start < num_results:
            try:
                user_agent = random.choice(_useragent_list)
                headers = {
                    'User-Agent': user_agent
                }
                resp = session.get(
                    url="https://www.google.com/search",
                    headers=headers,
                    params={
                        "q": term,
                        "num": num_results - start,
                        "hl": lang,
                        "start": start,
                        "safe": safe,
                    },
                    timeout=timeout,
                    verify=ssl_verify,
                )
                resp.raise_for_status()
                print(f"Successfully retrieved search results page (start={start})")
            except requests.exceptions.RequestException as e:
                print(f"Error retrieving search results: {e}")
                break

            soup = BeautifulSoup(resp.text, "html.parser")
            result_block = soup.find_all("div", attrs={"class": "g"})
            if not result_block:
                print("No results found on this page")
                break

            print(f"Found {len(result_block)} results on this page")
            for result in result_block:
                link = result.find("a", href=True)
                title = result.find("h3")
                if link and title:
                    link = link["href"]
                    title = title.get_text()
                    print(f"Processing link: {link}")
                    try:
                        # Same verify setting as the search request, so a caller's
                        # SSL configuration applies to result pages as well.
                        webpage = session.get(
                            link, headers=headers, timeout=timeout, verify=ssl_verify
                        )
                        webpage.raise_for_status()
                        visible_text = extract_text_from_webpage(webpage.text)
                        if len(visible_text) > max_chars_per_page:
                            visible_text = visible_text[:max_chars_per_page] + "..."
                        all_results.append({"link": link, "title": title, "text": visible_text})
                        print(f"Successfully extracted text from {link}")
                    except requests.exceptions.RequestException as e:
                        print(f"Error retrieving webpage content: {e}")
                        all_results.append({"link": link, "title": title, "text": None})
                else:
                    print("No link or title found for this result")
                    all_results.append({"link": None, "title": None, "text": None})
            # Advance by the number of blocks seen so the next request asks
            # only for the remainder.
            start += len(result_block)

    print(f"Search completed. Total results: {len(all_results)}")
    print("Search results:")
    for i, result in enumerate(all_results, 1):
        print(f"Result {i}:")
        print(f" Title: {result['title']}")
        print(f" Link: {result['link']}")
        if result['text']:
            print(f" Text: {result['text'][:100]}...")
        else:
            print(" Text: None")
    print("End of search results")

    if not all_results:
        print("No search results found. Returning a default message.")
        return [{"link": None, "title": "No Results", "text": "No information found in the web search results."}]

    return all_results
|
|
def summarize_content(content, model):
    """Ask ``model`` for a concise summary of ``content``.

    Content longer than 28000 characters is cut at that length and suffixed
    with "..." before being placed in the prompt. ``None`` content yields a
    fixed message without calling the model.
    """
    if content is None:
        return "No content available to summarize."

    char_limit = 7000 * 4
    trimmed = content if len(content) <= char_limit else content[:char_limit] + "..."

    summary_prompt = (
        "\nSummarize the following content concisely:\n"
        f"{trimmed}\n"
        "Summary:\n"
    )
    return generate_chunked_response(model, summary_prompt, max_tokens=200)
|
|
def rank_search_results(titles, summaries, model):
    """Ask ``model`` to rank search results and return one rank per title.

    The model is prompted to return comma-separated numeric ranks, which are
    parsed as floats. If either input is empty, the number of parsed ranks
    differs from the number of titles, or anything raises, the positional
    ranking ``[1, 2, ..., len(titles)]`` is returned instead.
    """
    if not (titles and summaries):
        print("No titles or summaries to rank.")
        return list(range(1, len(titles) + 1))

    header = (
        "Rank the following search results from a financial analyst perspective. "
        f"Assign a rank from 1 to {len(titles)} based on relevance, with 1 being the most relevant. "
        "Return only the numeric ranks in order, separated by commas.\n\n"
        "Titles and summaries:\n"
    )
    entries = "".join(
        f"{position}. Title: {title}\nSummary: {summary}\n\n"
        for position, (title, summary) in enumerate(zip(titles, summaries), 1)
    )
    ranking_prompt = header + entries + "Ranks:"

    try:
        ranks_str = generate_chunked_response(model, ranking_prompt)

        ranks = []
        for piece in ranks_str.split(','):
            if piece.strip():
                ranks.append(float(piece.strip()))

        if len(ranks) == len(titles):
            return ranks

        print(f"Warning: Number of ranks ({len(ranks)}) does not match number of titles ({len(titles)})")
        print(f"Model output: {ranks_str}")
        return list(range(1, len(titles) + 1))
    except Exception as e:
        print(f"Error in ranking: {str(e)}. Using fallback ranking method.")
        return list(range(1, len(titles) + 1))
|
|
def ask_question(question, temperature, top_p, repetition_penalty, web_search):
    """Answer ``question`` from web search results or from the uploaded documents.

    Web mode (``web_search`` truthy): runs ``google_search``, summarizes every
    result that has text, ranks the summaries, stores them in the vector
    database (best effort) and prompts the model with the titles, summaries
    and ranks.

    Document mode: requires an existing ``faiss_database``. If the question is
    similar to the conversation history the history alone is used as context;
    otherwise the retriever's documents are. The answer is recorded in
    ``memory_database`` and in the global ``conversation_history``.

    Args:
        question: user question; an empty value returns a prompt to enter one.
        temperature, top_p, repetition_penalty: sampling settings passed to
            ``get_model``.
        web_search: selects web mode when truthy, document mode otherwise.

    Returns:
        The answer text, or a short status message when no answer can be
        produced (no question, no valid search results, no documents).
    """
    global conversation_history

    if not question:
        return "Please enter a question."

    model = get_model(temperature, top_p, repetition_penalty)

    if web_search:
        search_results = google_search(question)

        processed_results = []
        for index, result in enumerate(search_results, start=1):
            if result["text"] is not None:
                try:
                    summary = summarize_content(result["text"], model)
                    processed_results.append({
                        "title": result.get("title", f"Result {index}"),
                        "content": result["text"],
                        "summary": summary,
                        "index": index
                    })
                except Exception as e:
                    print(f"Error processing search result {index}: {str(e)}")
            else:
                print(f"Skipping result {index} due to None content")

        if not processed_results:
            return "No valid search results found."

        titles = [r["title"] for r in processed_results]
        summaries = [r["summary"] for r in processed_results]
        try:
            ranks = rank_search_results(titles, summaries, model)
        except Exception as e:
            print(f"Error in ranking results: {str(e)}. Using default ranking.")
            ranks = list(range(1, len(processed_results) + 1))

        # update_vector_db_with_search_results expects (results, summaries, ranks)
        # and reads result["title"] / result["text"], so map the processed
        # entries onto that shape. Storing is best effort: a persistence
        # failure must not prevent answering the question.
        try:
            update_vector_db_with_search_results(
                [{"title": r["title"], "text": r["content"]} for r in processed_results],
                summaries,
                ranks,
            )
        except Exception as e:
            print(f"Error updating vector database with search results: {str(e)}")

        context_str = "\n\n".join([f"Title: {r['title']}\nSummary: {r['summary']}\nRank: {ranks[i]}"
                                   for i, r in enumerate(processed_results)])

        prompt_template = (
            "\n"
            "Answer the question based on the following web search results:\n"
            "Web Search Results:\n"
            "{context}\n"
            "Current Question: {question}\n"
            "If the web search results don't contain relevant information, state that the information is not available in the search results.\n"
            "Provide a concise and direct answer to the question without mentioning the web search or these instructions:\n"
        )
        prompt_val = ChatPromptTemplate.from_template(prompt_template)
        formatted_prompt = prompt_val.format(context=context_str, question=question)
    else:
        # The embeddings and the index are only needed in document mode, so
        # they are loaded here rather than before the mode is known.
        if not os.path.exists("faiss_database"):
            return "No documents available. Please upload documents or enable web search to answer questions."
        database = FAISS.load_local("faiss_database", get_embeddings(), allow_dangerous_deserialization=True)

        history_str = "\n".join([f"Q: {item['question']}\nA: {item['answer']}" for item in conversation_history])

        if is_related_to_history(question, conversation_history):
            context_str = "No additional context needed. Please refer to the conversation history."
        else:
            retriever = database.as_retriever()
            relevant_docs = retriever.get_relevant_documents(question)
            context_str = "\n".join([doc.page_content for doc in relevant_docs])

        prompt_val = ChatPromptTemplate.from_template(prompt)
        formatted_prompt = prompt_val.format(history=history_str, context=context_str, question=question)

    full_response = generate_chunked_response(model, formatted_prompt)

    # The response may contain the prompt text; keep only what follows the
    # last occurrence of the first instruction line that matches.
    answer_patterns = [
        r"Provide a concise and direct answer to the question without mentioning the web search or these instructions:",
        r"Provide a concise and direct answer to the question:",
        r"Answer:"
    ]

    for pattern in answer_patterns:
        match = re.split(pattern, full_response, flags=re.IGNORECASE)
        if len(match) > 1:
            answer = match[-1].strip()
            break
    else:
        # No marker found: use the whole response.
        answer = full_response.strip()

    if not web_search:
        memory_database[question] = answer
        conversation_history = manage_conversation_history(question, answer, conversation_history)

    return answer
|
|
def update_vectors(files, use_recursive_splitter):
    """Split the uploaded PDFs and add their chunks to the FAISS index.

    Uses the recursive character splitter when ``use_recursive_splitter`` is
    truthy and the basic page split otherwise. The index at ``faiss_database``
    is extended if it exists, created if not, and saved afterwards.

    Returns:
        A status message for display.
    """
    if not files:
        return "Please upload at least one PDF file."

    embed = get_embeddings()

    if use_recursive_splitter:
        split_document = load_and_split_document_recursive
    else:
        split_document = load_and_split_document_basic

    all_data = []
    total_chunks = 0
    for uploaded in files:
        pieces = split_document(uploaded)
        all_data.extend(pieces)
        total_chunks += len(pieces)

    index_dir = "faiss_database"
    if not os.path.exists(index_dir):
        database = FAISS.from_documents(all_data, embed)
    else:
        database = FAISS.load_local(index_dir, embed, allow_dangerous_deserialization=True)
        database.add_documents(all_data)

    database.save_local(index_dir)

    return f"Vector store updated successfully. Processed {total_chunks} chunks from {len(files)} files."
|
|
def update_vector_db_with_search_results(search_results, summaries, ranks):
    """Store summarized search results in the FAISS index.

    One document is created per ``(result, summary, rank)`` triple with a
    non-empty summary. The summary is the document text; the title, page text,
    summary, rank and today's date go into the metadata. Documents are built
    before the index is touched, so nothing is loaded when there is nothing to
    store, and a missing index is created from the new documents.

    Args:
        search_results: dicts with a "title" key and the page text under
            "text" (a "content" key is accepted as a fallback).
        summaries: summary string for each result, in the same order.
        ranks: rank for each result, in the same order.
    """
    current_date = datetime.now().strftime("%Y-%m-%d")

    documents = [
        Document(
            page_content=summary,
            metadata={
                "search_date": current_date,
                "search_title": result["title"],
                "search_content": result.get("text", result.get("content")),
                "search_summary": summary,
                "rank": rank
            }
        )
        for result, summary, rank in zip(search_results, summaries, ranks)
        if summary
    ]

    if not documents:
        print("No valid documents to add to the database.")
        return

    embed = get_embeddings()
    if os.path.exists("faiss_database"):
        database = FAISS.load_local("faiss_database", embed, allow_dangerous_deserialization=True)
        database.add_documents(documents)
    else:
        database = FAISS.from_documents(documents, embed)
    database.save_local("faiss_database")
|
|
def export_vector_db_to_excel():
    """Export the search-result metadata of every stored document to Excel.

    Other functions in this file add documents to the same index without the
    ``search_*`` metadata keys. Missing keys are therefore exported as empty
    cells instead of raising ``KeyError``.

    Returns:
        Path of the temporary ``.xlsx`` file. The file is not deleted
        automatically.
    """
    embed = get_embeddings()
    database = FAISS.load_local("faiss_database", embed, allow_dangerous_deserialization=True)

    # (column header, metadata key) pairs, in output order.
    columns = (
        ("Search Date", "search_date"),
        ("Search Title", "search_title"),
        ("Search Content", "search_content"),
        ("Search Summary", "search_summary"),
        ("Rank", "rank"),
    )
    # NOTE: docstore._dict is a private attribute of the docstore object.
    rows = [
        {header: doc.metadata.get(key) for header, key in columns}
        for doc in database.docstore._dict.values()
    ]
    df = pd.DataFrame(rows, columns=[header for header, _ in columns])

    with NamedTemporaryFile(delete=False, suffix='.xlsx') as tmp:
        excel_path = tmp.name
    # Written after the handle is closed so the path can be reopened on
    # platforms that lock open files.
    df.to_excel(excel_path, index=False)

    return excel_path
| |
def extract_db_to_excel():
    """Dump every stored document's text and JSON-encoded metadata to Excel.

    Returns the path of the temporary ``.xlsx`` file, which is not deleted
    automatically.
    """
    store = FAISS.load_local("faiss_database", get_embeddings(), allow_dangerous_deserialization=True)

    rows = []
    for doc in store.docstore._dict.values():
        rows.append({"page_content": doc.page_content, "metadata": json.dumps(doc.metadata)})
    df = pd.DataFrame(rows)

    with NamedTemporaryFile(delete=False, suffix='.xlsx') as tmp:
        excel_path = tmp.name
        df.to_excel(excel_path, index=False)

    return excel_path
|
|
def export_memory_db_to_excel():
    """Write the memory database and conversation history to one workbook.

    The workbook has two sheets, "Memory Database" and "Conversation History",
    each with ``question`` and ``answer`` columns. Returns the path of the
    temporary ``.xlsx`` file, which is not deleted automatically.
    """
    memory_rows = []
    for asked, replied in memory_database.items():
        memory_rows.append({"question": asked, "answer": replied})
    df_memory = pd.DataFrame(memory_rows)

    history_rows = []
    for turn in conversation_history:
        history_rows.append({"question": turn["question"], "answer": turn["answer"]})
    df_history = pd.DataFrame(history_rows)

    with NamedTemporaryFile(delete=False, suffix='.xlsx') as tmp:
        excel_path = tmp.name
        with pd.ExcelWriter(excel_path, engine='openpyxl') as writer:
            df_memory.to_excel(writer, sheet_name='Memory Database', index=False)
            df_history.to_excel(writer, sheet_name='Conversation History', index=False)

    return excel_path
|
|
| |
# Gradio UI. Components are created in display order, so statement order
# here defines the page layout.
with gr.Blocks() as demo:
    gr.Markdown("# Chat with your PDF documents")

    # Upload row: PDF picker, index-update button and splitter choice.
    with gr.Row():
        file_input = gr.Files(label="Upload your PDF documents", file_types=[".pdf"])
        update_button = gr.Button("Update Vector Store")
        use_recursive_splitter = gr.Checkbox(label="Use Recursive Text Splitter", value=False)

    update_output = gr.Textbox(label="Update Status")
    update_button.click(update_vectors, inputs=[file_input, use_recursive_splitter], outputs=update_output)

    # Chat row: conversation on the left, generation settings on the right.
    with gr.Row():
        with gr.Column(scale=2):
            chatbot = gr.Chatbot(label="Conversation")
            question_input = gr.Textbox(label="Ask a question about your documents")
            submit_button = gr.Button("Submit")
        with gr.Column(scale=1):
            temperature_slider = gr.Slider(label="Temperature", minimum=0.0, maximum=1.0, value=0.5, step=0.1)
            top_p_slider = gr.Slider(label="Top P", minimum=0.0, maximum=1.0, value=0.9, step=0.1)
            repetition_penalty_slider = gr.Slider(label="Repetition Penalty", minimum=1.0, maximum=2.0, value=1.0, step=0.1)
            web_search_checkbox = gr.Checkbox(label="Enable Web Search", value=False)

    def chat(question, history, temperature, top_p, repetition_penalty, web_search):
        # Answer the question, append the (question, answer) pair to the chat
        # history, and return "" so the question textbox is cleared.
        answer = ask_question(question, temperature, top_p, repetition_penalty, web_search)
        history.append((question, answer))
        return "", history

    submit_button.click(chat, inputs=[question_input, chatbot, temperature_slider, top_p_slider, repetition_penalty_slider, web_search_checkbox], outputs=[question_input, chatbot])

    # Export of the search-result metadata stored in the vector database.
    export_vector_db_button = gr.Button("Export Vector DB to Excel")
    vector_db_excel_output = gr.File(label="Download Vector DB Excel File")
    export_vector_db_button.click(export_vector_db_to_excel, inputs=[], outputs=vector_db_excel_output)

    # Export of every stored document's text and raw metadata.
    extract_button = gr.Button("Extract Database to Excel")
    excel_output = gr.File(label="Download Excel File")
    extract_button.click(extract_db_to_excel, inputs=[], outputs=excel_output)

    # Export of the in-memory question/answer store and conversation history.
    export_memory_button = gr.Button("Export Memory Database to Excel")
    memory_excel_output = gr.File(label="Download Memory Excel File")
    export_memory_button.click(export_memory_db_to_excel, inputs=[], outputs=memory_excel_output)

    # Removes the persisted FAISS index.
    clear_button = gr.Button("Clear Cache")
    clear_output = gr.Textbox(label="Cache Status")
    clear_button.click(clear_cache, inputs=[], outputs=clear_output)


# Start the web app only when this file is run as a script.
if __name__ == "__main__":
    demo.launch()