Spaces:
Runtime error
Runtime error
| import json | |
| import uuid | |
| from langchain.vectorstores import FAISS | |
| import os | |
| from tqdm.auto import tqdm | |
| from langchain.text_splitter import RecursiveCharacterTextSplitter | |
| from langchain.document_loaders import DirectoryLoader, TextLoader | |
| from llms.embeddings import EMBEDDINGS_MAPPING | |
| import tiktoken | |
| import zipfile | |
| import pickle | |
# Tokenizer used for token-based chunk sizing.  `encoding_for_model` already
# returns the Encoding object, so the original round-trip through
# `tiktoken.get_encoding(tokenizer_name.name)` was redundant.
tokenizer_name = tiktoken.encoding_for_model('gpt-4')  # kept for backward compatibility
tokenizer = tokenizer_name

# Default embedding configuration (OpenAI ada-002).
EMBED_MODEL = "text-embedding-ada-002"
EMBED_DIM = 1536   # output dimensionality of text-embedding-ada-002
METRIC = 'cosine'  # similarity metric for the vector index
| ####################################################################################################################### | |
| # Files handler | |
| ####################################################################################################################### | |
def check_existence(path):
    """Return True if *path* is an existing regular file or directory."""
    is_file = os.path.isfile(path)
    is_dir = os.path.isdir(path)
    return is_file or is_dir
def list_files(directory, ext=".pdf"):
    """Return the names of entries directly inside *directory* ending in *ext*.

    Matching is by suffix only (case-sensitive); subdirectories are not
    searched.
    """
    matches = []
    for entry in os.listdir(directory):
        if entry.endswith(ext):
            matches.append(entry)
    return matches
def list_pdf_files(directory):
    """Return the names of PDF files directly inside *directory*.

    Thin wrapper kept for backward compatibility: it duplicated the body of
    ``list_files`` line-for-line, so it now delegates instead.
    """
    return list_files(directory, ext=".pdf")
def tiktoken_len(text):
    """Return the number of GPT-4 tokens in *text* (special tokens disallowed)."""
    encoded = tokenizer.encode(text, disallowed_special=())
    return len(encoded)
def get_chunks(docs, chunk_size=500, chunk_overlap=20, length_function=tiktoken_len):
    """Split loaded documents into token-bounded text chunks.

    Parameters
    ----------
    docs : list
        Output of a langchain ``loader.load()`` call; each element exposes
        ``page_content`` and a ``metadata`` dict (the 'source' key is read).
    chunk_size, chunk_overlap : int
        Forwarded to RecursiveCharacterTextSplitter; measured in tokens
        because ``length_function`` counts tokens by default.
    length_function : callable
        Maps a string to a length; defaults to GPT-4 token counting.

    Returns
    -------
    list[str]
        Each element is the ``str()`` of a dict with keys 'content',
        'chunk' (index within the page), and 'source' (basename).
    """
    text_splitter = RecursiveCharacterTextSplitter(
        chunk_size=chunk_size,
        chunk_overlap=chunk_overlap,
        length_function=length_function,
        separators=["\n\n", "\n", " ", ""])
    chunks = []
    # The original wrapped this loop in enumerate() but never used the index.
    for page in tqdm(docs):
        source = page.metadata.get('source')
        content = page.page_content
        # Skip near-empty pages (e.g. blank or scan-artifact PDF pages).
        if len(content) > 50:
            texts = text_splitter.split_text(content)
            chunks.extend(
                str({'content': texts[i], 'chunk': i, 'source': os.path.basename(source)})
                for i in range(len(texts)))
    return chunks
| ####################################################################################################################### | |
| # Create FAISS object | |
| ####################################################################################################################### | |
| # ["text-embedding-ada-002", "distilbert-dot-tas_b-b256-msmarco"] | |
def create_faiss_index_from_zip(path_to_zip_file, embeddings=None, pdf_loader=None,
                                chunk_size=500, chunk_overlap=20,
                                project_name="Very_Cool_Project_Name"):
    """Build a FAISS index from a zip archive of documents.

    Creates the folder layout::

        project_name/
            source_data/   extracted archive contents
            embeddings/    pickled (chunk, embedding) pairs
            faiss_index/   serialized FAISS index

    plus a ``db_meta.json`` describing the build.

    Parameters
    ----------
    path_to_zip_file : str
        Zip archive containing .txt/.tex/.md/.pdf documents.
    embeddings : str | object | None
        A key of EMBEDDINGS_MAPPING, an embedding object with an
        ``embed_documents`` method, or None (defaults to ada-002).
    pdf_loader : type | None
        A langchain PDF loader class (e.g. PyPDFLoader).  When None, .pdf
        files are skipped.  (The original crashed on ``None.__name__``.)
    chunk_size, chunk_overlap : int
        Token-based chunking parameters; see ``get_chunks``.
    project_name : str
        Name of the project folder to create; must not already exist.

    Returns
    -------
    (FAISS, str, dict)
        The index object, the project name, and the metadata dict.

    Raises
    ------
    ValueError
        If a folder named *project_name* already exists.
    """
    # Record the requested model name before `embeddings` is replaced by the
    # actual embedding object.  Strings are immutable, so the original
    # copy.deepcopy was unnecessary.
    if isinstance(embeddings, str):
        embeddings_str = embeddings
    else:
        embeddings_str = "other-embedding-model"

    # Resolve `embeddings` to an embedding object.
    if embeddings is None or embeddings == "text-embedding-ada-002":
        embeddings = EMBEDDINGS_MAPPING["text-embedding-ada-002"]
    elif isinstance(embeddings, str):
        embeddings = EMBEDDINGS_MAPPING[embeddings]
    # else: `embeddings` is already an embedding object -- use it as given.
    # (The original's final `else` silently replaced any caller-supplied
    # object with the OpenAI default, ignoring the requested model.)

    # STEP 1: create the project folder structure under the current directory.
    current_directory = os.getcwd()
    if os.path.exists(project_name):
        raise ValueError(f"The project {project_name} exists.")
    project_path = os.path.join(current_directory, project_name)
    source_data = os.path.join(project_path, "source_data")
    embeddings_data = os.path.join(project_path, "embeddings")
    index_data = os.path.join(project_path, "faiss_index")
    # os.makedirs creates intermediate directories, including project_path.
    os.makedirs(source_data)      # ./project/source_data
    os.makedirs(embeddings_data)  # ./project/embeddings
    os.makedirs(index_data)       # ./project/faiss_index

    # STEP 2: extract the archive and persist the build metadata.
    with zipfile.ZipFile(path_to_zip_file, 'r') as zip_ref:
        zip_ref.extractall(source_data)
    db_meta = {"project_name": project_name,
               "pdf_loader": pdf_loader.__name__ if pdf_loader is not None else None,
               "chunk_size": chunk_size,
               "chunk_overlap": chunk_overlap,
               "embedding_model": embeddings_str,
               "files": os.listdir(source_data),
               "source_path": source_data}
    with open(os.path.join(project_path, "db_meta.json"), "w", encoding="utf-8") as f:
        json.dump(db_meta, f)

    # STEP 3: load every supported document type from the extracted tree.
    all_docs = []
    for ext in [".txt", ".tex", ".md", ".pdf"]:
        if ext == ".pdf":
            if pdf_loader is None:
                continue  # no PDF loader supplied -> skip PDFs
            loader = DirectoryLoader(source_data, glob=f"**/*{ext}", loader_cls=pdf_loader)
        else:
            loader = DirectoryLoader(source_data, glob=f"**/*{ext}", loader_cls=TextLoader,
                                     loader_kwargs={'autodetect_encoding': True})
        all_docs.extend(loader.load())

    # STEP 4: chunk, embed, persist the pairs, and build the FAISS index.
    chunks = get_chunks(all_docs, chunk_size, chunk_overlap)
    text_embeddings = embeddings.embed_documents(chunks)
    text_embedding_pairs = list(zip(chunks, text_embeddings))
    embeddings_save_to = os.path.join(embeddings_data, 'text_embedding_pairs.pickle')
    with open(embeddings_save_to, 'wb') as handle:
        pickle.dump(text_embedding_pairs, handle, protocol=pickle.HIGHEST_PROTOCOL)
    db = FAISS.from_embeddings(text_embedding_pairs, embeddings)
    db.save_local(index_data)
    print(db_meta)
    print("Success!")
    return db, project_name, db_meta
def find_file(file_name, directory):
    """Search *directory* recursively for *file_name*.

    Returns the full path of the first match found by ``os.walk``,
    or None when the file does not exist anywhere under *directory*.
    """
    for root, _subdirs, filenames in os.walk(directory):
        if file_name not in filenames:
            continue
        return os.path.join(root, file_name)
    return None
def find_file_dir(file_name, directory):
    """Search *directory* recursively for *file_name*.

    Like ``find_file`` but returns the containing directory of the first
    match instead of the full path, or None when not found.
    """
    for root, _subdirs, filenames in os.walk(directory):
        if file_name not in filenames:
            continue
        return root
    return None
def load_faiss_index_from_zip(path_to_zip_file):
    """Restore a FAISS index previously saved by ``create_faiss_index_from_zip``.

    Extracts the archive into the current working directory, reads
    ``db_meta.json`` to recover the embedding model, then loads the
    serialized index.

    Returns
    -------
    FAISS
        The deserialized index.

    Raises
    ------
    ValueError
        If ``db_meta.json`` or ``index.faiss`` cannot be found in the
        extracted tree.
    """
    path_to_extract = os.path.join(os.getcwd())
    with zipfile.ZipFile(path_to_zip_file, 'r') as zip_ref:
        zip_ref.extractall(path_to_extract)

    db_meta_json = find_file("db_meta.json", path_to_extract)
    if db_meta_json is None:
        raise ValueError("Cannot find `db_meta.json` in the .zip file. ")
    with open(db_meta_json, "r", encoding="utf-8") as f:
        db_meta_dict = json.load(f)

    # Narrowed from a bare `except:` -- only an unknown model name should
    # trigger the OpenAI fallback; any other error must propagate.
    try:
        embeddings = EMBEDDINGS_MAPPING[db_meta_dict["embedding_model"]]
    except KeyError:
        from langchain.embeddings.openai import OpenAIEmbeddings
        embeddings = OpenAIEmbeddings(model="text-embedding-ada-002")

    # `save_local` wrote index.faiss; `load_local` wants its directory.
    index_path = find_file_dir("index.faiss", path_to_extract)
    if index_path is None:
        raise ValueError("Failed to find `index.faiss` in the .zip file.")
    db = FAISS.load_local(index_path, embeddings)
    return db
if __name__ == "__main__":
    # Example driver: build a FAISS index from "document.zip" using a local
    # HuggingFace embedding model running on CPU.
    from langchain.document_loaders import PyPDFLoader
    from langchain.embeddings.openai import OpenAIEmbeddings  # NOTE(review): imported but unused here
    from langchain.embeddings import HuggingFaceEmbeddings
    model_name = "sebastian-hofstaetter/distilbert-dot-tas_b-b256-msmarco"
    model_kwargs = {'device': 'cpu'}
    encode_kwargs = {'normalize_embeddings': False}  # keep raw (unnormalized) vectors
    embeddings = HuggingFaceEmbeddings(
        model_name=model_name,
        model_kwargs=model_kwargs,
        encode_kwargs=encode_kwargs)
    # Requires "document.zip" in the current working directory.
    create_faiss_index_from_zip(path_to_zip_file="document.zip", pdf_loader=PyPDFLoader, embeddings=embeddings)