Spaces:
Runtime error
Runtime error
| import abc | |
| import logging | |
| import threading | |
| from pathlib import Path | |
| from typing import Any | |
| from llama_index import ( | |
| Document, | |
| ServiceContext, | |
| StorageContext, | |
| VectorStoreIndex, | |
| load_index_from_storage, | |
| ) | |
| from llama_index.data_structs import IndexDict | |
| from llama_index.indices.base import BaseIndex | |
| from app.components.ingest.helpers import IngestionHelper | |
| from app.paths import local_data_path | |
| logger = logging.getLogger(__name__) | |
class BaseIngestComponent(abc.ABC):
    """Abstract interface for components that ingest files into documents.

    The constructor keeps the storage and service contexts on the instance.
    Concrete subclasses must implement ``ingest``, ``bulk_ingest`` and
    ``delete``; the class cannot be instantiated until all three are provided.
    """

    def __init__(
        self,
        storage_context: StorageContext,
        service_context: ServiceContext,
        *args: Any,
        **kwargs: Any,
    ) -> None:
        """Store the contexts on the instance.

        Args:
            storage_context: Stored as ``self.storage_context``.
            service_context: Stored as ``self.service_context``.
            *args: Accepted for signature compatibility; unused here.
            **kwargs: Accepted for signature compatibility; unused here.
        """
        # Lazy %-style arguments, like the other log calls in this module:
        # the message is only formatted when DEBUG logging is enabled.
        logger.debug(
            "Initializing base ingest component type=%s", type(self).__name__
        )
        self.storage_context = storage_context
        self.service_context = service_context

    @abc.abstractmethod
    def ingest(self, file_name: str, file_data: Path) -> list[Document]:
        """Ingest one file and return the documents produced from it.

        Args:
            file_name: Name of the file being ingested.
            file_data: Path associated with the file's content.
        """

    @abc.abstractmethod
    def bulk_ingest(self, files: list[tuple[str, Path]]) -> list[Document]:
        """Ingest several files and return all documents produced.

        Args:
            files: ``(file_name, file_data)`` pairs, one per file.
        """

    @abc.abstractmethod
    def delete(self, doc_id: str) -> None:
        """Delete the document identified by ``doc_id``."""
class BaseIngestComponentWithIndex(BaseIngestComponent, abc.ABC):
    """Ingest component that owns an index and persists it after changes.

    The index is loaded from the storage context on construction, or created
    empty when loading raises ``ValueError``. Mutations of the index are
    guarded by a thread lock.
    """

    def __init__(
        self,
        storage_context: StorageContext,
        service_context: ServiceContext,
        *args: Any,
        **kwargs: Any,
    ) -> None:
        """Set up the contexts, the progress flag, the lock and the index."""
        super().__init__(storage_context, service_context, *args, **kwargs)

        self.show_progress = True
        # This is a threading lock: it does not coordinate separate processes.
        self._index_thread_lock = threading.Lock()
        self._index = self._initialize_index()

    def _initialize_index(self) -> BaseIndex[IndexDict]:
        """Return the index from the storage context, creating it if absent."""
        # The same options apply to loading and to creating the index.
        # store_nodes_override=True forces nodes to be stored in the index and
        # document stores, which is what makes deleting them possible later.
        index_options: dict[str, Any] = {
            "storage_context": self.storage_context,
            "service_context": self.service_context,
            "store_nodes_override": True,
            "show_progress": self.show_progress,
        }
        try:
            return load_index_from_storage(**index_options)
        except ValueError:
            # No index exists in the storage context yet: build an empty one
            # and persist it straight away.
            logger.info("Creating a new vector store index")
            new_index = VectorStoreIndex.from_documents([], **index_options)
            new_index.storage_context.persist(persist_dir=local_data_path)
            return new_index

    def _save_index(self) -> None:
        """Persist the index's storage context under ``local_data_path``."""
        index_storage = self._index.storage_context
        index_storage.persist(persist_dir=local_data_path)

    def delete(self, doc_id: str) -> None:
        """Remove the document ``doc_id`` from the index and doc store, then save."""
        with self._index_thread_lock:
            self._index.delete_ref_doc(doc_id, delete_from_docstore=True)
            # Persist inside the lock so the save reflects this deletion.
            self._save_index()
class SimpleIngestComponent(BaseIngestComponentWithIndex):
    """Ingest component that inserts documents into the index one at a time."""

    def __init__(
        self,
        storage_context: StorageContext,
        service_context: ServiceContext,
        *args: Any,
        **kwargs: Any,
    ) -> None:
        """Forward all arguments to the parent initializer."""
        super().__init__(storage_context, service_context, *args, **kwargs)

    def ingest(self, file_name: str, file_data: Path) -> list[Document]:
        """Transform one file into documents and save them in the index.

        Args:
            file_name: Name of the file being ingested.
            file_data: Path handed to the ingestion helper for this file.

        Returns:
            The documents produced from the file, after they were saved.
        """
        logger.info("Ingesting file_name=%s", file_name)
        documents = IngestionHelper.transform_file_into_documents(file_name, file_data)
        logger.info(
            "Transformed file=%s into count=%s documents", file_name, len(documents)
        )
        logger.debug("Saving the documents in the index and doc store")
        return self._save_docs(documents)

    def bulk_ingest(self, files: list[tuple[str, Path]]) -> list[Document]:
        """Transform and save several files, one after another.

        Args:
            files: ``(file_name, file_data)`` pairs, one per file.

        Returns:
            All documents produced, in the order the files were given.
        """
        saved_documents: list[Document] = []
        for file_name, file_data in files:
            documents = IngestionHelper.transform_file_into_documents(
                file_name, file_data
            )
            # Each file is saved (and the index persisted) before the next one
            # is transformed.
            saved_documents.extend(self._save_docs(documents))
        return saved_documents

    def _save_docs(self, documents: list[Document]) -> list[Document]:
        """Insert ``documents`` into the index and persist it.

        Insertion and persistence both happen while holding the index thread
        lock. Returns the same ``documents`` list that was passed in.
        """
        logger.debug("Transforming count=%s documents into nodes", len(documents))
        with self._index_thread_lock:
            for document in documents:
                # Honour the component-wide progress setting instead of a
                # hard-coded True, as the index load/creation path does.
                self._index.insert(document, show_progress=self.show_progress)
            logger.debug("Persisting the index and nodes")
            # persist the index and nodes
            self._save_index()
            logger.debug("Persisted the index and nodes")
        return documents
def get_ingestion_component(
    storage_context: StorageContext,
    service_context: ServiceContext,
) -> BaseIngestComponent:
    """Build the ingest component for the given contexts.

    Always returns a ``SimpleIngestComponent`` constructed from the two
    contexts.
    """
    component = SimpleIngestComponent(
        storage_context=storage_context,
        service_context=service_context,
    )
    return component