# Import packages
import ast
import os
import re
import shutil
import tempfile
import zipfile
from pathlib import Path
from typing import List, Protocol, Tuple, Type  # Protocol is used for type hinting
from uuid import uuid4  # To generate unique IDs for documents in the docstore

import requests
import pandas as pd
import dateutil.parser
import numpy as np
import gradio as gr
from tqdm import tqdm
from bs4 import BeautifulSoup
from docx import Document as Doc
from pypdf import PdfReader
import faiss  # For directly creating the FAISS index

from tools.embeddings import HuggingFaceEmbeddings
from tools.faiss_store import FAISS, InMemoryDocstore
from tools.text_splitter import RecursiveCharacterTextSplitter
from tools.document import Document
from tools.config import EMBEDDINGS_MODEL_NAME

# Structural type hint for embeddings objects
class Embeddings(Protocol):
    def embed_documents(self, texts: List[str]) -> List[List[float]]: ...
    def embed_query(self, text: str) -> List[float]: ...

PandasDataFrame = Type[pd.DataFrame]

split_strat = ["\n\n", "\n", ". ", "! ", "? "]
chunk_size = 300
chunk_overlap = 0
start_index = True
## Parse files

def determine_file_type(file_path):
    """
    Determine the file type based on its extension.

    Parameters:
        file_path (str): Path to the file.

    Returns:
        str: File extension (e.g., '.pdf', '.docx', '.txt', '.html', '.md').
    """
    return os.path.splitext(file_path)[1].lower()
def parse_file(file_paths, text_column='text'):
    """
    Accepts a list of file paths, determines each file's type based on its extension,
    and passes it to the relevant parsing function.

    Parameters:
        file_paths (list): List of file paths.
        text_column (str): Name of the column in CSV/Excel files that contains the text content.

    Returns:
        tuple: A dictionary with file paths as keys and their parsed content (or error
        message) as values, and a list of the file names without directory components.
    """

    if not isinstance(file_paths, list):
        raise ValueError("Expected a list of file paths.")

    extension_to_parser = {
        '.pdf': parse_pdf,
        '.docx': parse_docx,
        '.txt': parse_txt,
        '.md': parse_markdown,
        '.html': parse_html,
        '.htm': parse_html,  # Treat .html and .htm the same
        # parse_csv_or_excel expects a list of paths, so wrap the single path
        '.csv': lambda file_path: parse_csv_or_excel([file_path], text_column),
        '.xlsx': lambda file_path: parse_csv_or_excel([file_path], text_column)
    }

    parsed_contents = {}
    file_names = []

    for file_path in file_paths:
        print(file_path.name)

        file_extension = determine_file_type(file_path.name)
        if file_extension in extension_to_parser:
            parsed_contents[file_path.name] = extension_to_parser[file_extension](file_path.name)
        else:
            parsed_contents[file_path.name] = f"Unsupported file type: {file_extension}"

        filename_end = get_file_path_end(file_path.name)
        file_names.append(filename_end)

    return parsed_contents, file_names
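# Example usage (a sketch; assumes Gradio-style file objects exposing a .name attribute,
# which is what parse_file reads — the class below is a hypothetical stand-in):
#
#   class UploadedFile:
#       def __init__(self, name): self.name = name
#
#   contents, names = parse_file([UploadedFile("report.pdf"), UploadedFile("notes.txt")])
#   # contents -> {"report.pdf": [...one string per page...], "notes.txt": "..."}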
def text_regex_clean(text):
    # Merge hyphenated words split across lines
    text = re.sub(r"(\w+)-\n(\w+)", r"\1\2", text)
    # If a double newline ends in a letter, add a full stop.
    text = re.sub(r'(?<=[a-zA-Z])\n\n', '.\n\n', text)
    # Fix newlines in the middle of sentences
    text = re.sub(r"(?<!\n\s)\n(?!\s\n)", " ", text.strip())
    # Remove multiple newlines
    text = re.sub(r"\n\s*\n", "\n\n", text)
    # Collapse runs of spaces into a single space
    text = re.sub(r"  +", " ", text)
    # Add a full stop and paragraph break where a lowercase letter runs straight into a capital
    text = re.sub(r'(?<=[a-z])(?=[A-Z])', '. \n\n', text)

    return text
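# A quick worked example of text_regex_clean (hand-traced, so treat as illustrative):
#
#   text_regex_clean("exam-\nple text\ncontinues hereNow a new sentence")
#   # -> "example text continues here. \n\nNow a new sentence"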
def parse_csv_or_excel(file_paths, text_column="text"):
    """
    Read in CSV or Excel files.

    Parameters:
        file_paths (list): List of paths to CSV/Excel files.
        text_column (str): Name of the column in each file that contains the text content.

    Returns:
        tuple: (Pandas DataFrame of the combined file contents, list of file names)
    """

    file_names = []
    out_df = pd.DataFrame()

    for file_path in file_paths:
        # Accept both plain string paths and Gradio-style file objects with a .name attribute
        path_str = file_path.name if hasattr(file_path, "name") else file_path

        file_extension = determine_file_type(path_str)
        file_name = get_file_path_end(path_str)

        if file_extension == ".csv":
            df = pd.read_csv(path_str)

            if text_column not in df.columns:
                return pd.DataFrame(), ['Please choose a valid column name']

            df['source'] = file_name
            df['page_section'] = ""
        elif file_extension == ".xlsx":
            df = pd.read_excel(path_str, engine='openpyxl')

            if text_column not in df.columns:
                return pd.DataFrame(), ['Please choose a valid column name']

            df['source'] = file_name
            df['page_section'] = ""
        else:
            print(f"Unsupported file type: {file_extension}")
            return pd.DataFrame(), ['Please choose a valid file type']

        file_names.append(file_name)
        out_df = pd.concat([out_df, df])

    return out_df, file_names
def parse_excel(file_path, text_column):
    """
    Read text from an Excel file.

    Parameters:
        file_path (str): Path to the Excel file.
        text_column (str): Name of the column in the Excel file that contains the text content.

    Returns:
        Pandas DataFrame: Dataframe output from file read
    """
    df = pd.read_excel(file_path, engine='openpyxl')

    return df
def parse_pdf(file) -> List[str]:
    """
    Extract text from a PDF file.

    Parameters:
        file: Path to the PDF file (or a file-like object accepted by PdfReader).

    Returns:
        List[str]: Extracted text, one entry per page.
    """
    output = []

    print(file)

    pdf = PdfReader(file)

    for page in pdf.pages:
        text = page.extract_text()
        text = text_regex_clean(text)
        output.append(text)

    return output
def parse_docx(file_path):
    """
    Reads the content of a .docx file and returns it as a string.

    Parameters:
    - file_path (str): Path to the .docx file.

    Returns:
    - str: Content of the .docx file.
    """
    doc = Doc(file_path)
    full_text = []
    for para in doc.paragraphs:
        # Clean the paragraph's text (not the Paragraph object itself)
        para_text = text_regex_clean(para.text)
        full_text.append(para_text.replace("  ", " ").strip())
    return '\n'.join(full_text)
def parse_txt(file_path):
    """
    Read text from a TXT file.

    Parameters:
        file_path (str): Path to the TXT file.

    Returns:
        str: Text content of the file.
    """
    with open(file_path, 'r', encoding="utf-8") as file:
        file_contents = file.read().replace("  ", " ").strip()
        file_contents = text_regex_clean(file_contents)

    return file_contents

def parse_markdown(file_path):
    """
    Read text from an MD file.
    """
    with open(file_path, 'r', encoding="utf-8") as file:
        file_contents = file.read().replace("  ", " ").strip()
        file_contents = text_regex_clean(file_contents)

    return file_contents
def parse_html(page_url, div_filter="p"):
    """
    Determine if the source is a web URL or a local HTML file, and extract the content
    based on the tag of choice. Also tries to extract dates (WIP).

    Parameters:
        page_url (str): The web URL or local file path.

    Returns:
        tuple: (list of extracted texts, list of metadata dicts, the original page_url)
    """

    def is_web_url(s):
        """
        Check if the input string is a web URL.
        """
        return s.startswith("http://") or s.startswith("https://")

    def is_local_html_file(s):
        """
        Check if the input string is a path to a local HTML file.
        """
        return (s.endswith(".html") or s.endswith(".htm")) and os.path.isfile(s)

    def extract_text_from_source(source):
        """
        Determine if the source is a web URL or a local HTML file,
        and then extract its content accordingly.

        Parameters:
            source (str): The web URL or local file path.

        Returns:
            str: Extracted content.
        """
        if is_web_url(source):
            response = requests.get(source)
            response.raise_for_status()  # Raise an HTTPError for bad responses
            return response.text.replace("  ", " ").strip()
        elif is_local_html_file(source):
            with open(source, 'r', encoding='utf-8') as file:
                return file.read().replace("  ", " ").strip()
        else:
            raise ValueError("Input is neither a valid web URL nor a local HTML file path.")

    def clean_html_data(data, date_filter="", div_filt="p"):
        """
        Extracts and cleans data from HTML content.

        Parameters:
            data (str): HTML content to be parsed.
            date_filter (str, optional): Date string to filter results. If set, only content with a date greater than this will be returned.
            div_filt (str, optional): HTML tag to search for text content. Defaults to "p".

        Returns:
            tuple: Contains extracted text and date as strings. Returns empty strings if not found.
        """
        soup = BeautifulSoup(data, 'html.parser')

        text_elements = soup.find_all(div_filt)
        date_elements = soup.find_all(div_filt, {"class": "page-neutral-intro__meta"})

        # Extract date
        date_out = ""
        if date_elements:
            date_out = re.search(">(.*?)<", str(date_elements[0])).group(1)
            date_dt = dateutil.parser.parse(date_out)

            if date_filter:
                date_filter_dt = dateutil.parser.parse(date_filter)
                if date_dt < date_filter_dt:
                    return '', date_out

        # Extract text
        text_out_final = ""
        if text_elements:
            text_out_final = '\n'.join(paragraph.text for paragraph in text_elements)
            text_out_final = text_regex_clean(text_out_final)
        else:
            print(f"No elements found with tag '{div_filt}'. No text returned.")

        return text_out_final, date_out

    html_text = extract_text_from_source(page_url)

    texts = []
    metadatas = []

    clean_text, date = clean_html_data(html_text, date_filter="", div_filt=div_filter)
    texts.append(clean_text)
    metadatas.append({"source": page_url, "date": str(date)})

    return texts, metadatas, page_url
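# Example usage (illustrative; the URL is a placeholder):
#
#   texts, metadatas, url = parse_html("https://example.com/some-page")
#   # texts     -> ["...cleaned paragraph text..."]
#   # metadatas -> [{"source": "https://example.com/some-page", "date": "..."}]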
def get_file_path_end(file_path):
    # Strip any leading directory components, keeping only the final path segment
    match = re.search(r'(.*[\/\\])?(.+)$', file_path)
    filename_end = match.group(2) if match else ''
    return filename_end
# Convert parsed text to docs
def text_to_docs(text_dict: dict, chunk_size: int = chunk_size) -> List[Document]:
    """
    Converts the output of parse_file (a dictionary of file paths to content)
    to a list of Documents with metadata.
    """

    doc_sections = []

    for file_path, content in text_dict.items():
        ext = os.path.splitext(file_path)[1].lower()

        # Depending on the file extension, handle the content
        if ext == '.pdf':
            docs, page_docs = pdf_text_to_docs(content, chunk_size=chunk_size)
        elif ext in ['.html', '.htm']:
            # parse_html returns (texts, metadatas, page_url)
            docs = html_text_to_docs(content[0], content[1], chunk_size=chunk_size)
        elif ext in ['.txt', '.docx', '.md']:
            # These parsers return a single string; wrap it in a list for the splitter
            docs = html_text_to_docs([content], chunk_size=chunk_size)
        elif ext in ['.csv', '.xlsx']:
            # parse_csv_or_excel returns (DataFrame, file_names); keep only the DataFrame
            content_df = content[0] if isinstance(content, tuple) else content
            docs = csv_excel_text_to_docs(content_df, chunk_size=chunk_size)
        else:
            print(f"Unsupported file type {ext} for {file_path}. Skipping.")
            continue

        filename_end = get_file_path_end(file_path)

        # Add filename as metadata
        for doc in docs:
            doc.metadata["source"] = filename_end

        doc_sections.extend(docs)

    return doc_sections
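# End-to-end sketch of the parse -> chunk flow (assumes Gradio-style file objects):
#
#   parsed_contents, file_names = parse_file(uploaded_files)
#   doc_sections = text_to_docs(parsed_contents, chunk_size=300)
#   # doc_sections is a flat list of Document chunks, each tagged with its source file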
def pdf_text_to_docs(text, chunk_size: int = chunk_size) -> List[Document]:
    """Converts a string or list of strings to a list of Documents
    with metadata."""

    if isinstance(text, str):
        # Take a single string as one page
        text = [text]

    page_docs = [Document(page_content=page, metadata={}) for page in text]

    # Add page numbers as metadata
    for i, doc in enumerate(page_docs):
        doc.metadata["page"] = i + 1

    # Split pages into sections
    doc_sections = []

    for page_doc in page_docs:
        if page_doc.page_content == '':
            sections = ['']
        else:
            text_splitter = RecursiveCharacterTextSplitter(
                chunk_size=chunk_size,
                separators=split_strat,
                chunk_overlap=chunk_overlap,
                add_start_index=True
            )
            sections = text_splitter.split_text(page_doc.page_content)

        for i, section in enumerate(sections):
            doc_sections.append(Document(
                page_content=section,
                metadata={"page": page_doc.metadata["page"], "section": i,
                          "page_section": f"{page_doc.metadata['page']}-{i}"}
            ))

    return doc_sections, page_docs
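# Illustrative example: a two-page PDF becomes chunked Documents carrying "page",
# "section" and "page_section" metadata, e.g. the first chunk of page 2 has
# metadata {"page": 2, "section": 0, "page_section": "2-0"}.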
def html_text_to_docs(texts: List[str], metadatas: List[dict] = None, chunk_size: int = chunk_size):

    text_splitter = RecursiveCharacterTextSplitter(
        separators=split_strat,
        chunk_size=chunk_size,
        chunk_overlap=chunk_overlap,
        length_function=len,
        add_start_index=True
    )

    documents = text_splitter.create_documents(texts, metadatas=metadatas)

    for i, section in enumerate(documents):
        section.metadata["page_section"] = i + 1

    return documents
def write_out_metadata_as_string(metadata_in):
    # If metadata_in is a single dictionary, wrap it in a list
    if isinstance(metadata_in, dict):
        metadata_in = [metadata_in]

    metadata_string = [f"{' '.join(f'{k}: {v}' for k, v in d.items() if k != 'page_section')}" for d in metadata_in]

    return metadata_string
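# Quick example of write_out_metadata_as_string ('page_section' keys are excluded):
#
#   write_out_metadata_as_string({"row": 1, "source": "notes.txt", "page_section": "1-0"})
#   # -> ["row: 1 source: notes.txt"]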
def csv_excel_text_to_docs(df, text_column='text', chunk_size=None) -> List[Document]:
    """Converts a DataFrame's content to a list of Documents with metadata."""

    doc_sections = []
    df[text_column] = df[text_column].astype(str)  # Ensure column is a string column

    # For each row in the dataframe
    for idx, row in df.iterrows():
        # Extract the text content for the document
        doc_content = row[text_column]

        # Generate metadata containing other columns' data
        metadata = {"row": idx + 1}
        for col, value in row.items():
            if col != text_column:
                metadata[col] = value

        metadata_string = write_out_metadata_as_string(metadata)[0]

        # If chunk_size is provided, split the text into chunks
        if chunk_size:
            text_splitter = RecursiveCharacterTextSplitter(
                chunk_size=chunk_size,
                separators=split_strat,
                chunk_overlap=chunk_overlap
            )
            sections = text_splitter.split_text(doc_content)

            # For each section, create a Document object
            for i, section in enumerate(sections):
                section = '. '.join([metadata_string, section])
                doc = Document(page_content=section,
                               metadata={**metadata, "section": i, "row_section": f"{metadata['row']}-{i}"})
                doc_sections.append(doc)
        else:
            # If no chunk_size is provided, create a single Document object for the row
            doc_content = '. '.join([metadata_string, doc_content])
            doc = Document(page_content=doc_content, metadata=metadata)
            doc_sections.append(doc)

    return doc_sections
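# Worked example (illustrative): with no chunk_size, the row {"text": "Hello world",
# "author": "A. Smith"} at index 0 becomes a single Document with
# page_content "row: 1 author: A. Smith. Hello world"
# and metadata {"row": 1, "author": "A. Smith"}.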
## Functions for working with documents after loading them back in
def pull_out_data(series):
    # Convert each serialised string back into a (key, value) tuple;
    # ast.literal_eval only evaluates Python literals, unlike eval
    to_tuple = lambda x: ast.literal_eval(x)

    # Apply the conversion to each element of the series
    series_tup = series.apply(to_tuple)

    # Keep the second element (the content) of each tuple
    series_tup_content = list(zip(*series_tup))[1]

    series = pd.Series(list(series_tup_content))

    return series
def docs_from_csv(df):
    documents = []

    page_content = pull_out_data(df["0"])
    metadatas = pull_out_data(df["1"])

    for x in range(0, len(df)):
        new_doc = Document(page_content=page_content[x], metadata=metadatas[x])
        documents.append(new_doc)

    return documents
def docs_from_lists(docs, metadatas):
    documents = []

    for x, doc in enumerate(docs):
        new_doc = Document(page_content=doc, metadata=metadatas[x])
        documents.append(new_doc)

    return documents
def docs_elements_from_csv_save(docs_path="documents.csv"):
    documents = pd.read_csv(docs_path)

    docs_out = docs_from_csv(documents)

    out_df = pd.DataFrame(docs_out)

    docs_content = pull_out_data(out_df[0].astype(str))
    docs_meta = pull_out_data(out_df[1].astype(str))

    doc_sources = [d['source'] for d in docs_meta]

    return out_df, docs_content, docs_meta, doc_sources
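# Example round trip (a sketch; assumes documents were saved to CSV with page content
# serialised in column "0" and metadata in column "1", as docs_from_csv expects):
#
#   out_df, docs_content, docs_meta, doc_sources = docs_elements_from_csv_save("documents.csv")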
## Create embeddings and save faiss vector store to the path specified in `save_to`

def load_embeddings_model(embeddings_model=EMBEDDINGS_MODEL_NAME):
    embeddings_func = HuggingFaceEmbeddings(model_name=embeddings_model)

    return embeddings_func
def embed_faiss_save_to_zip(
    docs_out: List[Document],
    save_folder: str,
    embeddings_model_object: Embeddings,
    save_to: str = "faiss_embeddings",
    model_name: str = "mixedbread-ai/mxbai-embed-xsmall-v1",  # Descriptive name only; not used to build the index
    progress: gr.Progress = gr.Progress(track_tqdm=True)
) -> Tuple[str, FAISS, Path]:

    print(f"> Total split documents: {len(docs_out)}")
    print("Starting embedding generation and FAISS index construction...")

    texts = []
    metadatas = []
    vectors = []
    docstore = InMemoryDocstore()
    index_to_docstore_id = {}  # Maps FAISS index position to docstore ID

    if not docs_out:
        print("No documents provided. Skipping FAISS index creation.")
        return "No documents to process", None, None

    # 1. Generate embeddings and populate data structures, with tqdm for a progress bar
    for i, doc in tqdm(enumerate(docs_out), desc="Generating Embeddings", total=len(docs_out)):
        # Store text and metadata
        texts.append(doc.page_content)
        metadatas.append(doc.metadata)

        # Generate the embedding for the current document.
        # embed_documents expects a list of strings and returns a list of
        # embeddings, so take the first (and only) element.
        vector = embeddings_model_object.embed_documents([doc.page_content])[0]
        vectors.append(vector)

        # Populate the internal docstore that FAISS uses
        doc_id = str(uuid4())  # Generate a unique ID for each document
        docstore.add({doc_id: doc})  # Add the full Document object to the docstore
        index_to_docstore_id[i] = doc_id  # Map FAISS index position (i) to its doc_id

    print("\nEmbedding generation complete. Building FAISS index...")

    # 2. Build the raw FAISS index.
    # Ensure all embeddings are numpy float32, which FAISS expects.
    # BGE-family models (like bge-base-en-v1.5) typically produce L2-normalised
    # embeddings, for which Inner Product (IP) similarity is equivalent to cosine
    # similarity. If your model does *not* output normalised vectors and you want
    # cosine similarity, normalise them here:
    # `np.array([v / np.linalg.norm(v) for v in vectors]).astype("float32")`.
    # Otherwise, you might use IndexFlatL2 for Euclidean distance.
    embeddings_np = np.array(vectors).astype("float32")
    embedding_dimension = embeddings_np.shape[1]

    # Create a raw FAISS index (IndexFlatIP for cosine similarity with normalised vectors)
    raw_faiss_index = faiss.IndexFlatIP(embedding_dimension)
    raw_faiss_index.add(embeddings_np)  # Add all vectors to the raw FAISS index

    # 3. Create the FAISS vectorstore from the components.
    # The `embedding_function` is used for subsequent queries to the vectorstore,
    # not for building the initial index (that has already been done above).
    vectorstore = FAISS(
        embedding_function=embeddings_model_object.embed_query,
        index=raw_faiss_index,
        docstore=docstore,
        index_to_docstore_id=index_to_docstore_id
    )

    save_to_path = Path(save_folder, save_to)
    save_to_path.mkdir(parents=True, exist_ok=True)

    vectorstore.save_local(folder_path=str(save_to_path))

    print("> FAISS index saved")
    print(f"> Saved to: {save_to}")

    # Ensure files are written before archiving
    index_faiss = save_to_path / "index.faiss"
    index_pkl = save_to_path / "index.pkl"

    if not index_faiss.exists() or not index_pkl.exists():
        raise FileNotFoundError("Expected FAISS index files not found before zipping.")

    # Flush file system writes by forcing a sync (works best on Unix)
    try:
        os.sync()
    except AttributeError:
        pass  # os.sync() not available on Windows

    # Create ZIP archive (wrapped in Path to match the declared return type)
    final_zip_path = Path(shutil.make_archive(str(save_to_path), 'zip', root_dir=str(save_to_path)))

    # Remove individual index files to avoid leaking large raw files
    index_faiss.unlink(missing_ok=True)
    index_pkl.unlink(missing_ok=True)

    print("> Archive complete")
    print(f"> Final ZIP path: {final_zip_path}")

    return "Document processing complete", vectorstore, final_zip_path
def get_faiss_store(zip_file_path: str, embeddings_model: Embeddings) -> FAISS:
    """
    Loads a FAISS vector store from a ZIP archive.

    Args:
        zip_file_path: The string path pointing to the .zip archive containing
                       index.faiss and index.pkl. This should be the
                       final_zip_path returned by embed_faiss_save_to_zip.
        embeddings_model: The embeddings model object (e.g., HuggingFaceEmbeddings)
                          used to create the index. This is crucial for proper deserialization.

    Returns:
        A FAISS vector store object.
    """
    zip_file_path = Path(zip_file_path)

    if not zip_file_path.exists():
        raise FileNotFoundError(f"ZIP archive not found at: {zip_file_path}")
    if not zip_file_path.suffix == '.zip':
        raise ValueError(f"Expected a .zip file, but got: {zip_file_path}")

    # Create a temporary directory to extract the FAISS index files.
    # tempfile.TemporaryDirectory() handles cleanup automatically when the 'with' block exits.
    with tempfile.TemporaryDirectory() as temp_dir_str:
        temp_extract_path = Path(temp_dir_str)

        print(f"> Extracting {zip_file_path} to temporary directory: {temp_extract_path}")
        with zipfile.ZipFile(zip_file_path, 'r') as zip_ref:
            # The zip file contains 'index.faiss' and 'index.pkl' directly at its root,
            # so extraction places them at temp_extract_path/index.faiss etc.
            zip_ref.extractall(temp_extract_path)

        # Verify that the files were extracted successfully
        extracted_faiss_file = temp_extract_path / "index.faiss"
        extracted_pkl_file = temp_extract_path / "index.pkl"

        if not extracted_faiss_file.exists() or not extracted_pkl_file.exists():
            raise FileNotFoundError(
                f"Required FAISS index files (index.faiss, index.pkl) not found "
                f"in extracted location: {temp_extract_path}. "
                f"ZIP content might be structured unexpectedly."
            )

        print("> Loading FAISS index from extracted files...")
        faiss_vstore = FAISS.load_local(
            folder_path=str(temp_extract_path),  # FAISS.load_local expects a string path
            embeddings=embeddings_model,
            allow_dangerous_deserialization=True
        )
        print("> FAISS index loaded successfully.")

    # The temporary directory and its contents are removed automatically when the
    # `with tempfile.TemporaryDirectory()` block exits; no manual cleanup is needed.
    return faiss_vstore
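# Example usage (a sketch; the query and path are illustrative, and
# similarity_search_with_score is the same method used in the legacy example below):
#
#   embeddings = load_embeddings_model()
#   vectorstore = get_faiss_store("output/faiss_embeddings.zip", embeddings)
#   results = vectorstore.similarity_search_with_score("my query", k=5)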
# def sim_search_local_saved_vec(query, k_val, save_to="faiss_lambeth_census_embedding"):
#     load_embeddings()
#     docsearch = FAISS.load_local(folder_path=save_to, embeddings=embeddings)
#     display(Markdown(question))
#     search = docsearch.similarity_search_with_score(query, k=k_val)
#     for item in search:
#         print(item[0].page_content)
#         print(f"Page: {item[0].metadata['source']}")
#         print(f"Date: {item[0].metadata['date']}")
#         print(f"Score: {item[1]}")
#         print("---")