from tinytroupe.utils import JsonSerializableRegistry import tinytroupe.utils as utils from tinytroupe.agent import logger from llama_index.core import VectorStoreIndex, SimpleDirectoryReader, Document, StorageContext, load_index_from_storage from llama_index.core.vector_stores import SimpleVectorStore from llama_index.readers.web import SimpleWebPageReader import json import tempfile import os import shutil ####################################################################################################################### # Grounding connectors ####################################################################################################################### class GroundingConnector(JsonSerializableRegistry): """ An abstract class representing a grounding connector. A grounding connector is a component that allows an agent to ground its knowledge in external sources, such as files, web pages, databases, etc. """ serializable_attributes = ["name"] def __init__(self, name:str) -> None: self.name = name def retrieve_relevant(self, relevance_target:str, source:str, top_k=20) -> list: raise NotImplementedError("Subclasses must implement this method.") def retrieve_by_name(self, name:str) -> str: raise NotImplementedError("Subclasses must implement this method.") def list_sources(self) -> list: raise NotImplementedError("Subclasses must implement this method.") @utils.post_init class BaseSemanticGroundingConnector(GroundingConnector): """ A base class for semantic grounding connectors. A semantic grounding connector is a component that indexes and retrieves documents based on so-called "semantic search" (i.e, embeddings-based search). This specific implementation is based on the VectorStoreIndex class from the LLaMa-Index library. Here, "documents" refer to the llama-index's data structure that stores a unit of content, not necessarily a file. """ serializable_attributes = ["documents", "index"] # needs custom deserialization to handle Pydantic models (Document is a Pydantic model) custom_deserializers = {"documents": lambda docs_json: [Document.from_json(doc_json) for doc_json in docs_json], "index": lambda index_json: BaseSemanticGroundingConnector._deserialize_index(index_json)} custom_serializers = {"documents": lambda docs: [doc.to_json() for doc in docs] if docs is not None else None, "index": lambda index: BaseSemanticGroundingConnector._serialize_index(index)} def __init__(self, name:str="Semantic Grounding") -> None: super().__init__(name) self.documents = None self.name_to_document = None self.index = None # @post_init ensures that _post_init is called after the __init__ method def _post_init(self): """ This will run after __init__, since the class has the @post_init decorator. It is convenient to separate some of the initialization processes to make deserialize easier. """ self.index = None if not hasattr(self, 'documents') or self.documents is None: self.documents = [] if not hasattr(self, 'name_to_document') or self.name_to_document is None: self.name_to_document = {} if hasattr(self, 'documents') and self.documents is not None: for document in self.documents: # if the document has a semantic memory ID, we use it as the identifier name = document.metadata.get("semantic_memory_id", document.id_) # self.name_to_document[name] contains a list, since each source file could be split into multiple pages if name in self.name_to_document: self.name_to_document[name].append(document) else: self.name_to_document[name] = [document] # Rebuild index from documents if it's None or invalid if self.index is None and self.documents: logger.warning("No index found. Rebuilding index from documents.") vector_store = SimpleVectorStore() self.index = VectorStoreIndex.from_documents( self.documents, vector_store=vector_store, store_nodes_override=True ) # TODO remove? #self.add_documents(self.documents) @staticmethod def _serialize_index(index): """Helper function to serialize index with proper storage context""" if index is None: return None try: # Create a temporary directory to store the index with tempfile.TemporaryDirectory() as temp_dir: # Persist the index to the temporary directory index.storage_context.persist(persist_dir=temp_dir) # Read all the persisted files and store them in a dictionary persisted_data = {} for filename in os.listdir(temp_dir): filepath = os.path.join(temp_dir, filename) if os.path.isfile(filepath): with open(filepath, 'r', encoding="utf-8", errors="replace") as f: persisted_data[filename] = f.read() return persisted_data except Exception as e: logger.warning(f"Failed to serialize index: {e}") return None @staticmethod def _deserialize_index(index_data): """Helper function to deserialize index with proper error handling""" if not index_data: return None try: # Create a temporary directory to restore the index with tempfile.TemporaryDirectory() as temp_dir: # Write all the persisted files to the temporary directory for filename, content in index_data.items(): filepath = os.path.join(temp_dir, filename) with open(filepath, 'w', encoding="utf-8", errors="replace") as f: f.write(content) # Load the index from the temporary directory storage_context = StorageContext.from_defaults(persist_dir=temp_dir) index = load_index_from_storage(storage_context) return index except Exception as e: # If deserialization fails, return None # The index will be rebuilt from documents in _post_init logger.warning(f"Failed to deserialize index: {e}. Index will be rebuilt.") return None def retrieve_relevant(self, relevance_target:str, top_k=20) -> list: """ Retrieves all values from memory that are relevant to a given target. """ # Handle empty or None query if not relevance_target or not relevance_target.strip(): return [] if self.index is not None: retriever = self.index.as_retriever(similarity_top_k=top_k) nodes = retriever.retrieve(relevance_target) else: nodes = [] retrieved = [] for node in nodes: content = "SOURCE: " + node.metadata.get('file_name', '(unknown)') content += "\n" + "SIMILARITY SCORE:" + str(node.score) content += "\n" + "RELEVANT CONTENT:" + node.text retrieved.append(content) logger.debug(f"Content retrieved: {content[:200]}") return retrieved def retrieve_by_name(self, name:str) -> list: """ Retrieves a content source by its name. """ # TODO also optionally provide a relevance target? results = [] if self.name_to_document is not None and name in self.name_to_document: docs = self.name_to_document[name] for i, doc in enumerate(docs): if doc is not None: content = f"SOURCE: {name}\n" content += f"PAGE: {i}\n" content += "CONTENT: \n" + doc.text[:10000] # TODO a more intelligent way to limit the content results.append(content) return results def list_sources(self) -> list: """ Lists the names of the available content sources. """ if self.name_to_document is not None: return list(self.name_to_document.keys()) else: return [] def add_document(self, document) -> None: """ Indexes a document for semantic retrieval. Assumes the document has a metadata field called "semantic_memory_id" that is used to identify the document within Semantic Memory. """ self.add_documents([document]) def add_documents(self, new_documents) -> list: """ Indexes documents for semantic retrieval. """ # index documents by name if len(new_documents) > 0: # process documents individually too for document in new_documents: logger.debug(f"Adding document {document} to index, text is: {document.text}") # out of an abundance of caution, we sanitize the text document.text = utils.sanitize_raw_string(document.text) logger.debug(f"Document text after sanitization: {document.text}") # add the new document to the list of documents after all sanitization and checks self.documents.append(document) if document.metadata.get("semantic_memory_id") is not None: # if the document has a semantic memory ID, we use it as the identifier name = document.metadata["semantic_memory_id"] # Ensure name_to_document is initialized if not hasattr(self, 'name_to_document') or self.name_to_document is None: self.name_to_document = {} # self.name_to_document[name] contains a list, since each source file could be split into multiple pages if name in self.name_to_document: self.name_to_document[name].append(document) else: self.name_to_document[name] = [document] # index documents for semantic retrieval if self.index is None: # Create storage context with vector store vector_store = SimpleVectorStore() storage_context = StorageContext.from_defaults(vector_store=vector_store) self.index = VectorStoreIndex.from_documents( self.documents, storage_context=storage_context, store_nodes_override=True # This ensures nodes (with text) are stored ) else: self.index.refresh(self.documents) @staticmethod def _set_internal_id_to_documents(documents:list, external_attribute_name:str ="file_name") -> None: """ Sets the internal ID for each document in the list of documents. This is useful to ensure that each document has a unique identifier. """ for doc in documents: if not hasattr(doc, 'metadata'): doc.metadata = {} doc.metadata["semantic_memory_id"] = doc.metadata.get(external_attribute_name, doc.id_) return documents @utils.post_init class LocalFilesGroundingConnector(BaseSemanticGroundingConnector): serializable_attributes = ["folders_paths"] def __init__(self, name:str="Local Files", folders_paths: list=None) -> None: super().__init__(name) self.folders_paths = folders_paths # @post_init ensures that _post_init is called after the __init__ method def _post_init(self): """ This will run after __init__, since the class has the @post_init decorator. It is convenient to separate some of the initialization processes to make deserialize easier. """ self.loaded_folders_paths = [] if not hasattr(self, 'folders_paths') or self.folders_paths is None: self.folders_paths = [] self.add_folders(self.folders_paths) def add_folders(self, folders_paths:list) -> None: """ Adds a path to a folder with files used for grounding. """ if folders_paths is not None: for folder_path in folders_paths: try: logger.debug(f"Adding the following folder to grounding index: {folder_path}") self.add_folder(folder_path) except (FileNotFoundError, ValueError) as e: print(f"Error: {e}") print(f"Current working directory: {os.getcwd()}") print(f"Provided path: {folder_path}") print("Please check if the path exists and is accessible.") def add_folder(self, folder_path:str) -> None: """ Adds a path to a folder with files used for grounding. """ if folder_path not in self.loaded_folders_paths: self._mark_folder_as_loaded(folder_path) # for PDF files, please note that the document will be split into pages: https://github.com/run-llama/llama_index/issues/15903 new_files = SimpleDirectoryReader(folder_path).load_data() BaseSemanticGroundingConnector._set_internal_id_to_documents(new_files, "file_name") self.add_documents(new_files) def add_file_path(self, file_path:str) -> None: """ Adds a path to a file used for grounding. """ # a trick to make SimpleDirectoryReader work with a single file new_files = SimpleDirectoryReader(input_files=[file_path]).load_data() logger.debug(f"Adding the following file to grounding index: {new_files}") BaseSemanticGroundingConnector._set_internal_id_to_documents(new_files, "file_name") def _mark_folder_as_loaded(self, folder_path:str) -> None: if folder_path not in self.loaded_folders_paths: self.loaded_folders_paths.append(folder_path) if folder_path not in self.folders_paths: self.folders_paths.append(folder_path) @utils.post_init class WebPagesGroundingConnector(BaseSemanticGroundingConnector): serializable_attributes = ["web_urls"] def __init__(self, name:str="Web Pages", web_urls: list=None) -> None: super().__init__(name) self.web_urls = web_urls # @post_init ensures that _post_init is called after the __init__ method def _post_init(self): self.loaded_web_urls = [] if not hasattr(self, 'web_urls') or self.web_urls is None: self.web_urls = [] # load web urls self.add_web_urls(self.web_urls) def add_web_urls(self, web_urls:list) -> None: """ Adds the data retrieved from the specified URLs to grounding. """ filtered_web_urls = [url for url in web_urls if url not in self.loaded_web_urls] for url in filtered_web_urls: self._mark_web_url_as_loaded(url) if len(filtered_web_urls) > 0: new_documents = SimpleWebPageReader(html_to_text=True).load_data(filtered_web_urls) BaseSemanticGroundingConnector._set_internal_id_to_documents(new_documents, "url") self.add_documents(new_documents) def add_web_url(self, web_url:str) -> None: """ Adds the data retrieved from the specified URL to grounding. """ # we do it like this because the add_web_urls could run scrapes in parallel, so it is better # to implement this one in terms of the other self.add_web_urls([web_url]) def _mark_web_url_as_loaded(self, web_url:str) -> None: if web_url not in self.loaded_web_urls: self.loaded_web_urls.append(web_url) if web_url not in self.web_urls: self.web_urls.append(web_url)