import logging
import random

from fastapi import UploadFile, status
from fastapi.responses import JSONResponse
from llama_index.core.ingestion import IngestionPipeline
from llama_index.core.node_parser import (
    SentenceSplitter,
    SemanticSplitterNodeParser,
)
from llama_index.embeddings.openai import OpenAIEmbedding
from pinecone.grpc import PineconeGRPC as Pinecone

from config import PINECONE_CONFIG
from script.get_metadata import Metadata
from service.reader_v3 import upload_file

# from service.reader import Reader
# from script.get_topic import extract_topic


class Uploader:
    # def __init__(self, reference, file: UploadFile, content_table: UploadFile):
    def __init__(self, reference, file: UploadFile):
        self.file = file
        # self.content_table = content_table
        # self.reader = Reader()
        self.reference = reference
        self.metadata = Metadata(reference)

    # async def ingest_documents(self, file: UploadFile):
    #     """Load documents from the storage path."""
    #     documents = await self.reader.read_from_uploadfile(file)
    #     print("Number of documents:", len(documents))
    #     print("Documents successfully ingested")
    #     return documents

    def check_existing_metadata(self, pinecone_index, title, random_vector):
        """Return the Pinecone matches for a document with this title.

        The query vector itself is irrelevant here; the call only exists to
        apply the metadata filter on `title`.
        """
        try:
            result = pinecone_index.query(
                vector=random_vector,
                top_k=1,
                filter={
                    "title": {"$eq": title},
                },
            )
            return result["matches"]
        except Exception as e:
            logging.error(f"Error checking existing metadata: {e}")
            # Return an empty list instead of a JSONResponse so that callers
            # such as filter_document() can safely call len() on the result.
            return []

    async def process_documents(self):
        # Ingest documents
        # documents = await self.ingest_documents(self.file)

        # Get metadata
        # documents_with_metadata = self.metadata.apply_metadata(documents)
        documents_with_metadata, file_stream = await upload_file(
            self.reference, self.file
        )

        # Get topic
        # topic_extractor = extract_topic(self.reference, self.content_table)
        # document_filtered = self.filter_document(documents_with_metadata)

        embed_model = OpenAIEmbedding()

        # Set up the ingestion pipeline with a semantic splitter
        pipeline = IngestionPipeline(
            transformations=[
                SemanticSplitterNodeParser(
                    buffer_size=1,
                    breakpoint_percentile_threshold=95,
                    embed_model=embed_model,
                ),
                # topic_extractor,
            ]
        )
        # splitter = SemanticSplitterNodeParser(
        #     buffer_size=1, breakpoint_percentile_threshold=95, embed_model=embed_model
        # )

        # Run the pipeline
        try:
            nodes_with_metadata = pipeline.run(documents=documents_with_metadata)
            # nodes_with_metadata = splitter.get_nodes_from_documents(documents_with_metadata)
            return nodes_with_metadata, file_stream
        except Exception as e:
            # If semantic splitting fails, fall back to a fixed-size sentence splitter
            logging.warning(f"Semantic splitting failed, using SentenceSplitter fallback: {e}")
            try:
                sentence_splitter = SentenceSplitter(chunk_size=512)
                nodes_with_metadata = sentence_splitter.get_nodes_from_documents(
                    documents_with_metadata
                )
                logging.info("Pipeline processing completed with SentenceSplitter fallback.")
                # Keep the same return shape as the success path
                return nodes_with_metadata, file_stream
            except Exception as fallback_error:
                # Log the second error and return a JSONResponse for FastAPI
                logging.error(f"Error with SentenceSplitter fallback: {fallback_error}")
                return JSONResponse(
                    status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
                    content="An internal server error occurred during pipeline processing.",
                )

    def filter_document(self, documents):
        """Drop documents whose title already exists in the Pinecone index."""
        api_key = PINECONE_CONFIG.PINECONE_API_KEY
        client = Pinecone(api_key=api_key)
        pinecone_index = client.Index("test")
        # A random vector of the OpenAI embedding dimension (1536) is enough,
        # since check_existing_metadata() only uses the metadata filter.
        random_vector = [random.uniform(0, 1) for _ in range(1536)]

        filtered_documents = []
        for doc in documents:
            result = self.check_existing_metadata(
                pinecone_index, doc.metadata["title"], random_vector
            )
            if len(result) == 0:
                filtered_documents.append(doc)
        return filtered_documents
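

# Example usage: a minimal sketch of how Uploader might be wired into a
# FastAPI endpoint. The app object, route path, and the downstream vector
# store upsert are assumptions for illustration, not part of this module.
#
# from fastapi import FastAPI
#
# app = FastAPI()
#
# @app.post("/upload")
# async def upload(reference: str, file: UploadFile):
#     uploader = Uploader(reference, file)
#     result = await uploader.process_documents()
#     if isinstance(result, JSONResponse):
#         return result  # both splitters failed
#     nodes, file_stream = result
#     # ...upsert `nodes` into Pinecone and persist `file_stream` here...
#     return {"ingested_nodes": len(nodes)}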