Spaces:
Sleeping
Sleeping
| import os, os.path | |
| # to search web for results | |
| from urllib.parse import urlparse, quote | |
| import requests | |
| from duckduckgo_search import DDGS | |
| # to present web search results in a table | |
| import pandas as pd | |
| # to get get document chunks, embed and build vector database | |
| # 2 vector databases are built, one for keyword search (BM25), one for semantic (Chroma) | |
| from langchain.document_loaders import WebBaseLoader, PyPDFLoader | |
| from langchain.embeddings import HuggingFaceEmbeddings | |
| from langchain.text_splitter import RecursiveCharacterTextSplitter | |
| from langchain.vectorstores import Chroma | |
| from langchain.retrievers import BM25Retriever | |
| # for saving bm25 retriever | |
| # pickle is not for production, just for prototype | |
| import pickle | |
# Number of top search results to fetch from DuckDuckGo per country query.
top_n_results = 10
# A separate Chroma vector store is built for EVERY (chunk_size, chunk_overlap)
# combination below so retrieval quality can be compared across configurations.
# NOTE: embedding every combination takes a very long time.
chunk_sizes = [500, 600, 700, 800, 900, 1000, 1250, 1500, 1750, 2000, 2250, 2500, 2750, 3000]
chunk_overlaps = [50, 100, 150, 200]
| ################################ Search the Web ################################ | |
| ## Use DuckDuckGo to Search for Top N Results for Each Country's ESG Policies | |
| # Use DuckDuckGo search to loop through each country and save the top N results by searching for | |
| # "{country} sustainability esg newest updated public policy document government" | |
| # After some experimentation the search phrase above seems to give the best results | |
| # for the most recent ESG policies as it contains all the necessary keywords, but it can be changed | |
| # Store the Relevant Links in a Dictionary | |
| # Links are Mostly HTML or PDF | |
def duckduckgo_scrape(country, search_term, n_search_results):
    """Search DuckDuckGo and collect the top N results for one country.

    Each raw result dict is tagged with the country so the links can be
    filtered per country later. All results are also written to
    'duck_duck_go_scraped_links.csv' for inspection.

    Args:
        country: Country name used to tag each result.
        search_term: Full search phrase submitted to DuckDuckGo.
        n_search_results: Maximum number of results to fetch.

    Returns:
        Tuple of (list of raw result dicts, pandas DataFrame of the same).
    """
    all_links = []
    with DDGS() as ddgs:
        results = ddgs.text(search_term, max_results=n_search_results)
        for result in results:
            result['country'] = country
            all_links.append(result)
    # Rename DuckDuckGo's result keys to friendlier column headers.
    df_links = pd.DataFrame(all_links).rename(columns={
        'title': 'Title',
        'href': 'url',
        'body': 'Summarized Body',
        'country': 'Country'
    })
    # Save scraped links into csv.
    # index=False: without it pandas writes a spurious unnamed index column.
    df_links.to_csv("duck_duck_go_scraped_links.csv", index=False)
    return all_links, df_links
| ################################ Load the Documents ################################ | |
| ## For every search result returned by DuckDuckGo for each country above, scrape the web using the url and convert to documents using Langchain loaders: | |
| # PDF Documents: If link from search result points to PDF document, | |
| # save the PDF permanently in local storage in the folder called 'pdfs', then use PyPDFLoader to convert it to raw documents. | |
| # HTML Documents: If link is just a HTML page, use Langchain WebBaseLoader to convert it to raw documents. | |
| # Metadata: Add country in the metadata, this is an important step as it is needed by RetrieveQA for filtering in the future. | |
| # For PDFs, langchain will use its local path as the source, need to change it back to the online path. | |
| # Save all the documents into a list called "all_documents". | |
| # for adding country metadata | |
def add_country_metadata(docs, country):
    """Tag every document's metadata with its originating country.

    The country key is required later by RetrievalQA to filter results
    per country.
    """
    for document in docs:
        document.metadata['country'] = country
    return docs
| # for adding source url metadata | |
def add_url_metadata(docs, url):
    """Overwrite each document's 'source' metadata with the given URL.

    Used to point PDF documents back at their original web address instead
    of the local file path that langchain records.
    """
    for document in docs:
        document.metadata['source'] = url
    return docs
| # If link from search result points to PDF document, | |
| # save the PDF permanently in local storage in the folder called 'pdfs', | |
| # then use PyPDFLoader to convert it to raw documents. | |
def pdf_loader(url, country):
    """Download a PDF, save it under pdfs/<country>/, and load it with PyPDFLoader.

    Each resulting document is tagged with `country` metadata, and its
    'source' metadata is reset from the local file path back to the
    original web URL.

    Args:
        url: Direct link to the PDF.
        country: Country name for metadata tagging and the save directory.

    Returns:
        List of langchain Documents, or None if downloading/parsing failed.
    """
    try:
        try:
            # Timeout so a single unresponsive host cannot hang the whole scrape.
            response = requests.get(url, timeout=60)
        except requests.exceptions.RequestException:
            # Sometimes there is an SSL error and the page is actually http://.
            url = url.replace("https://", "http://")
            response = requests.get(url, timeout=60)
        # Save the PDF locally first so PyPDFLoader can read it.
        pdf_dir = f"pdfs/{country}"
        os.makedirs(pdf_dir, exist_ok=True)
        pdf_filename = f"{pdf_dir}/{url.split('/')[-1]}"
        with open(pdf_filename, 'wb') as f:
            f.write(response.content)
        loader = PyPDFLoader(pdf_filename)
        raw_pdf_documents = loader.load()
        raw_pdf_documents = add_country_metadata(raw_pdf_documents, country)
        # Langchain sets 'source' to the local path; point it back to the web URL.
        raw_pdf_documents = add_url_metadata(raw_pdf_documents, url)
        return raw_pdf_documents
    except Exception as e:
        # Best-effort scraping: log and skip this link instead of aborting the run.
        print(f"Failed to load for {url}: {e}")
        return None
| # Same as above but for pdf in local directory | |
def pdf_loader_local(pdf_filename, country):
    """Load an already-downloaded PDF from disk and tag it with country metadata.

    Same as pdf_loader but for a file that is already in local storage.

    Returns:
        List of langchain Documents on success, False on failure.
    """
    try:
        raw_documents = PyPDFLoader(pdf_filename).load()
        return add_country_metadata(raw_documents, country)
    except Exception as e:
        print(f"Failed to load for {pdf_filename} {e}")
        return False
| # If link is just a HTML page, use Langchain WebBaseLoader to convert it to raw documents. | |
def html_loader(url, country):
    """Load an HTML page with WebBaseLoader and tag it with country metadata.

    Args:
        url: Link to the HTML page.
        country: Country name for metadata tagging.

    Returns:
        List of langchain Documents, or None if loading failed.
    """
    try:
        loader = WebBaseLoader(url)
        raw_html_documents = loader.load()
        raw_html_documents = add_country_metadata(raw_html_documents, country)
        return raw_html_documents
    except Exception as e:
        # Best-effort scraping: log the reason and skip this link.
        print(f"Failed to load for {url}: {e}")
        return None
def _collect_documents(all_documents, docs):
    """Append loader output to the running list; loaders return None on failure."""
    if docs is None:
        return
    if isinstance(docs, list):
        all_documents.extend(docs)
    else:
        all_documents.append(docs)

def process_links_load_documents(all_links):
    """Convert every scraped search-result link into langchain documents.

    PDF links are downloaded and parsed with pdf_loader; anything else is
    treated as an HTML page and loaded with html_loader. Failed loads are
    skipped (the loaders log and return None).

    Args:
        all_links: List of result dicts with 'country', 'title', 'href' keys.

    Returns:
        List of langchain Documents with newlines stripped from the content.
    """
    all_documents = []
    for link in all_links:
        country = link['country']
        # Percent-encode spaces so the URL is valid for the loaders.
        url = link['href'].replace(" ", "%20")
        # Treat as PDF when the URL ends in .pdf or is a 'blob' link containing .pdf.
        if url.endswith('.pdf') or ('.pdf' in url and 'blob' in url):
            print(f"{country}: Loading PDF from {url}")
            _collect_documents(all_documents, pdf_loader(url, country))
        else:
            print(f"{country}: Loading HTML from {url}")
            _collect_documents(all_documents, html_loader(url, country))
    # Loaded pages contain a lot of '\n'; strip them out of the content.
    for document in all_documents:
        document.page_content = document.page_content.replace('\n', '')
    return all_documents
| ################################ Set Up Chroma Vector Store ################################ | |
| # This is for semantic search. | |
| # In the configuration cell at the top, we define all the chunk sizes and overlaps that we are interested in. | |
| # The Chroma vector stores will be set up for each of the configuration, persisted in a different directory. | |
| # These vector stores can be accessed in the main app later. | |
| # Time taken to get the embeddings for every document chunk can be very long. | |
| # Note: If we are using a lot more data than can be stored in the RAM or when in production, | |
| # better to initialize a separate vector store in a server (Postgres or online solutions like Pinecone) before pushing the document chunks to it bit by bit. | |
def setup_chromadb_vectorstore(hf_embeddings, all_documents, chunk_size, chunk_overlap, country):
    """Split documents, embed them, and persist a Chroma vector store (semantic search).

    One store is persisted per (country, chunk_size, chunk_overlap)
    combination, each in its own directory under 'chromadb/', so the main
    app can later load whichever configuration it needs. Embedding every
    chunk can take a very long time.

    Returns:
        True once the store has been persisted (progress signal for the caller).
    """
    chromadb_dir = "chromadb"
    if not os.path.exists(chromadb_dir):
        os.makedirs(chromadb_dir)
    print(f"Processing Chunk Size: {chunk_size}, Chunk Overlap: {chunk_overlap}")
    splitter = RecursiveCharacterTextSplitter(
        chunk_size=chunk_size,
        chunk_overlap=chunk_overlap,
    )
    chunks = splitter.split_documents(all_documents)
    persist_directory = f"{chromadb_dir}/new_{country}_chunk_{chunk_size}_overlap_{chunk_overlap}_"
    # Build the vector database and persist it to the local directory.
    chroma_db = Chroma.from_documents(chunks, hf_embeddings, persist_directory=persist_directory)
    chroma_db.persist()
    return True
| ################################ Set Up BM25 Retriever ################################ | |
| # This is for keyword search. | |
| # BM25 is a keyword-based algorithm that performs well on queries containing keywords without capturing the semantic meaning of the query terms, | |
| # hence there is no need to embed the text with HuggingFaceEmbeddings and it is relatively faster to set up. | |
| # We will use it with combination of the chroma_db vector store retriver in our application later, with ensemble retriever to re-rank the results. | |
| # The retriever is just a small file so we just store it using pickle, but for production this is still not recommended. | |
def setup_bm25_retriever(all_documents, chunk_size, chunk_overlap, country):
    """Split documents and pickle a per-country BM25 retriever (keyword search).

    BM25 is keyword-based and needs no embeddings, so this is much faster
    to set up than the Chroma store. It is later combined with the Chroma
    retriever in an ensemble to re-rank results. The retriever is pickled
    under 'bm25/' — acceptable for a prototype, not for production.

    Returns:
        True once the retriever has been saved (progress signal for the caller).
    """
    bm25_dir = "bm25"
    if not os.path.exists(bm25_dir):
        os.makedirs(bm25_dir)
    print(f"Processing Chunk Size: {chunk_size}, Chunk Overlap: {chunk_overlap}, Country: {country}")
    splitter = RecursiveCharacterTextSplitter(
        chunk_size=chunk_size,
        chunk_overlap=chunk_overlap,
    )
    # Keep only this country's chunks so each retriever is country-specific.
    country_chunks = [
        doc for doc in splitter.split_documents(all_documents)
        if doc.metadata['country'] == country
    ]
    bm25_retriever = BM25Retriever.from_documents(country_chunks)
    filename = f"{bm25_dir}/new_{country}_chunk_{chunk_size}_overlap_{chunk_overlap}_.pickle"
    with open(filename, 'wb') as handle:
        pickle.dump(bm25_retriever, handle)
    return True