# NOTE: removed scraped page header residue ("Spaces", "Runtime error") — not part of this module.
import logging
import tempfile
from pathlib import Path
from typing import AnyStr, BinaryIO

from llama_index import ServiceContext, StorageContext
from llama_index.node_parser import SentenceWindowNodeParser

from app.components.embedding.component import EmbeddingComponent
from app.components.ingest.component import get_ingestion_component
from app.components.llm.component import LLMComponent
from app.components.node_store.component import NodeStoreComponent
from app.components.vector_store.component import VectorStoreComponent
from app.server.ingest.schemas import IngestedDoc
# Module-level logger named after this module, per the stdlib logging convention.
logger = logging.getLogger(__name__)
class IngestService:
    """Document ingestion service.

    Parses incoming files/text/binary payloads into nodes, embeds them, and
    persists them through the configured vector store, doc store and index
    store. All public methods return the list of :class:`IngestedDoc` created.
    """

    def __init__(
        self,
        llm_component: LLMComponent,
        vector_store_component: VectorStoreComponent,
        embedding_component: EmbeddingComponent,
        node_store_component: NodeStoreComponent,
    ) -> None:
        self.llm_service = llm_component
        self.storage_context = StorageContext.from_defaults(
            vector_store=vector_store_component.vector_store,
            docstore=node_store_component.doc_store,
            index_store=node_store_component.index_store,
        )
        node_parser = SentenceWindowNodeParser.from_defaults()
        self.ingest_service_context = ServiceContext.from_defaults(
            llm=self.llm_service.llm,
            embed_model=embedding_component.embedding_model,
            node_parser=node_parser,
            # Embeddings done early in the pipeline of node transformations,
            # right after the node parsing
            transformations=[node_parser, embedding_component.embedding_model],
        )
        self.ingest_component = get_ingestion_component(
            self.storage_context, self.ingest_service_context
        )

    def _ingest_data(self, file_name: str, file_data: AnyStr) -> list[IngestedDoc]:
        """Spill *file_data* to a temp file and ingest it via :meth:`ingest_file`.

        llama-index mainly supports reading from files, so we have to create a
        tmp file for it to work. ``delete=False`` avoids a Windows 11
        permission error; the file is removed manually in ``finally``.
        """
        # Lazy %-args: the message is only formatted if DEBUG is enabled.
        logger.debug("Got file data of size=%s to ingest", len(file_data))
        with tempfile.NamedTemporaryFile(delete=False) as tmp:
            try:
                path_to_tmp = Path(tmp.name)
                if isinstance(file_data, bytes):
                    path_to_tmp.write_bytes(file_data)
                else:
                    path_to_tmp.write_text(str(file_data))
                return self.ingest_file(file_name, path_to_tmp)
            finally:
                # Close the handle before unlinking so Windows allows deletion.
                tmp.close()
                path_to_tmp.unlink()

    def ingest_file(self, file_name: str, file_data: Path) -> list[IngestedDoc]:
        """Ingest a single on-disk file and return the resulting documents."""
        logger.info("Ingesting file_name=%s", file_name)
        documents = self.ingest_component.ingest(file_name, file_data)
        logger.info("Finished ingestion file_name=%s", file_name)
        return [IngestedDoc.from_document(document) for document in documents]

    def ingest_text(self, file_name: str, text: str) -> list[IngestedDoc]:
        """Ingest raw text under the given logical file name."""
        logger.debug("Ingesting text data with file_name=%s", file_name)
        return self._ingest_data(file_name, text)

    def ingest_bin_data(
        self, file_name: str, raw_file_data: BinaryIO
    ) -> list[IngestedDoc]:
        """Read a binary stream to completion and ingest its contents."""
        logger.debug("Ingesting binary data with file_name=%s", file_name)
        file_data = raw_file_data.read()
        return self._ingest_data(file_name, file_data)

    def bulk_ingest(self, files: list[tuple[str, Path]]) -> list[IngestedDoc]:
        """Ingest many ``(file_name, path)`` pairs in a single pass."""
        logger.info("Ingesting file_names=%s", [f[0] for f in files])
        documents = self.ingest_component.bulk_ingest(files)
        logger.info("Finished ingestion file_name=%s", [f[0] for f in files])
        return [IngestedDoc.from_document(document) for document in documents]

    def list_ingested(self) -> list[IngestedDoc]:
        """List every ingested document known to the doc store.

        Returns an empty list (with a logged warning) if the doc store
        raises ``ValueError`` while being inspected — best-effort listing.
        """
        ingested_docs = []
        try:
            docstore = self.storage_context.docstore
            # Nodes reference their originating document; collect unique ids.
            ingested_docs_ids: set[str] = set()
            for node in docstore.docs.values():
                if node.ref_doc_id is not None:
                    ingested_docs_ids.add(node.ref_doc_id)
            for doc_id in ingested_docs_ids:
                ref_doc_info = docstore.get_ref_doc_info(ref_doc_id=doc_id)
                doc_metadata = None
                if ref_doc_info is not None and ref_doc_info.metadata is not None:
                    doc_metadata = IngestedDoc.curate_metadata(ref_doc_info.metadata)
                ingested_docs.append(
                    IngestedDoc(
                        object="ingest.document",
                        doc_id=doc_id,
                        doc_metadata=doc_metadata,
                    )
                )
        except ValueError:
            # Best-effort: log and fall through with whatever was collected.
            logger.warning("Got an exception when getting list of docs", exc_info=True)
        logger.debug("Found count=%s ingested documents", len(ingested_docs))
        return ingested_docs

    def delete(self, doc_id: str) -> None:
        """Delete an ingested document.

        :raises ValueError: if the document does not exist
        """
        logger.info(
            "Deleting the ingested document=%s in the doc and index store", doc_id
        )
        self.ingest_component.delete(doc_id)