Spaces:
Runtime error
Runtime error
| import pinecone | |
| import os | |
| import PyPDF2 | |
| import gradio as gr | |
| from tqdm import tqdm | |
| from pydantic import Field | |
| from langchain.load.serializable import Serializable | |
| # from langchain.vectorstores import Pinecone | |
| from custom_vectordb import Pinecone | |
| from config import PINECONE_API_KEY, PINECONE_ENVIRONMENT, INDEX_NAME, CONNECTION_STRING, CONTAINER_NAME, NAME_SPACE_1, NAME_SPACE_2 | |
| from config import EMBEDDING_API_BASE, EMBEDDING_API_KEY, OPENAI_API_TYPE, OPENAI_API_VERSION, EMBEDDING_DEPLOYMENT_ID | |
| from langchain.embeddings import OpenAIEmbeddings | |
| from langchain.text_splitter import TokenTextSplitter | |
| from azure.storage.blob import BlobServiceClient | |
# --- Pinecone client setup ---
pinecone.init(
    api_key=PINECONE_API_KEY,          # obtained from app.pinecone.io
    environment=PINECONE_ENVIRONMENT,  # shown next to the API key in the console
)

# Azure OpenAI embedding model used for every vector operation below.
embeddings = OpenAIEmbeddings(
    deployment=EMBEDDING_DEPLOYMENT_ID,
    openai_api_key=EMBEDDING_API_KEY,
    openai_api_base=EMBEDDING_API_BASE,
    openai_api_type=OPENAI_API_TYPE,
    openai_api_version=OPENAI_API_VERSION,
    chunk_size=16,  # texts embedded per batch request
)

# Cuts documents into ~500-token chunks with a 30-token overlap.
text_splitter = TokenTextSplitter(chunk_size=500, chunk_overlap=30)

# Create the index on first run only; dimension 1536 matches the embedding width.
if INDEX_NAME and INDEX_NAME not in pinecone.list_indexes():
    pinecone.create_index(INDEX_NAME, metric="cosine", dimension=1536)
    print(f"Index {INDEX_NAME} created successfully")

index = pinecone.Index(INDEX_NAME)

# Azure Blob Storage client used to persist the uploaded source files.
blob_service_client = BlobServiceClient.from_connection_string(CONNECTION_STRING)
class Document(Serializable):
    """A piece of text together with arbitrary metadata (e.g. its source,
    or relationships to other documents).
    """

    # Raw text content of the document.
    page_content: str
    # Free-form metadata describing the page content.
    metadata: dict = Field(default_factory=dict)
| # def update_fb(): | |
| # with open('data.json') as json_file: | |
| # data = json.load(json_file) | |
| # datas = ast.literal_eval(data) | |
| # texts = [] | |
| # for k, v in datas.items(): | |
| # content = v["content"].split("-----")[0] + "\nimage_link: " + str(v["image"]) | |
| # post_url = v["post_url"] | |
| # texts.append(Document(page_content=content, metadata={"source": post_url})) | |
| # if len(texts)>0: | |
| # Pinecone.from_documents(texts, embeddings, index_name=INDEX_NAME, namespace=NAME_SPACE_2) | |
| # message = f"Add facebook data to space {NAME_SPACE_2} in {INDEX_NAME} sucessfully" | |
| # return message | |
def upload_files_blob(file_path):
    """Upload one local file to the configured Azure Blob Storage container."""
    file_name = os.path.basename(file_path)
    client = blob_service_client.get_blob_client(container=CONTAINER_NAME, blob=file_name)
    with open(file_path, "rb") as stream:
        client.upload_blob(stream)
    print(f"Uploaded {file_name}.")
def load_files_blob():
    """Return the names of every blob currently stored in the container."""
    container_client = blob_service_client.get_container_client(CONTAINER_NAME)
    return [blob.name for blob in container_client.list_blobs()]
def delete_blob(blob_name):
    """Remove a single blob, identified by name, from the container."""
    blob_service_client.get_container_client(CONTAINER_NAME).delete_blob(blob_name)
    print(f"Deleted {blob_name}")
def delete_all():
    """Wipe every blob from the container and clear the Pinecone namespace.

    Returns a (dropdown update, status message, file-widget reset) triple
    for the Gradio UI.
    """
    container_client = blob_service_client.get_container_client(CONTAINER_NAME)
    for blob in container_client.list_blobs():
        container_client.delete_blob(blob.name)
    # Drop all vectors stored under the document namespace as well.
    index.delete(delete_all=True, namespace=NAME_SPACE_1)
    message = f"Delete all files in space {NAME_SPACE_1} succesfully"
    return gr.update(choices=[]), message, gr.Files.update(None)
def delete_file(files_src):
    """Delete the selected files from blob storage and from Pinecone.

    Returns a (dropdown update, status message, file-widget reset) triple
    for the Gradio UI.
    """
    deleted = []
    for name in files_src:
        delete_blob(name)
        deleted.append(name)
    # Remove every vector whose "source" metadata matches a deleted file.
    _filter = {"source": {"$in": deleted}}
    index.delete(filter=_filter, namespace=NAME_SPACE_1)
    message = f"Delete {len(files_src)} files in space {NAME_SPACE_1} files succesfully"
    available_files = load_files_blob()
    return gr.update(choices=available_files), message, gr.Files.update(None)
def upload_file(check_box):
    """Return a Pinecone vector store over one of the two namespaces.

    A truthy ``check_box`` selects NAME_SPACE_1; otherwise NAME_SPACE_2.
    """
    namespace = NAME_SPACE_1 if check_box else NAME_SPACE_2
    vectorstore = Pinecone.from_existing_index(INDEX_NAME, embeddings, namespace=namespace)
    print(f"Load files from space {namespace} in {INDEX_NAME}")
    return vectorstore
def handle_upload_file(files):
    """Ingest newly uploaded files into Pinecone and return a status message."""
    documents = get_documents(files)
    if documents:
        Pinecone.from_documents(documents, embeddings, index_name=INDEX_NAME, namespace=NAME_SPACE_1)
        message = f"Add files to space {NAME_SPACE_1} in {INDEX_NAME} sucessfully"
    else:
        message = f"Load files from space existing {NAME_SPACE_1} in {INDEX_NAME}"
    print(message)
    return message
def get_documents(file_src):
    """Parse uploaded files into chunked ``Document`` objects for indexing.

    Files whose names already exist in blob storage are skipped (assumed
    to be indexed already); new files are first uploaded to the container,
    then parsed according to their extension and split into token chunks.

    Args:
        file_src: iterable of uploaded file objects (each exposing a
            ``.name`` path, as Gradio provides), or None.

    Returns:
        list of chunked Document objects for all newly added files; empty
        list when ``file_src`` is None or nothing new was parsed.
    """
    documents = []
    if file_src is None:
        return documents
    available_files = load_files_blob()
    for file in file_src:
        filepath = file.name
        filename = os.path.basename(filepath)
        file_type = os.path.splitext(filename)[1]
        if filename in available_files:
            # Already in blob storage -> assume already indexed; skip.
            continue
        upload_files_blob(filepath)
        try:
            if file_type == ".pdf":
                pdftext = ""
                with open(filepath, "rb") as pdfFileObj:
                    pdf_reader = PyPDF2.PdfReader(pdfFileObj)
                    for page in tqdm(pdf_reader.pages):
                        pdftext += page.extract_text()
                texts = [Document(page_content=pdftext, metadata={"source": filename})]
            elif file_type == ".docx":
                from langchain.document_loaders import UnstructuredWordDocumentLoader
                loader = UnstructuredWordDocumentLoader(filepath)
                texts = loader.load()
            elif file_type == ".pptx":
                from langchain.document_loaders import UnstructuredPowerPointLoader
                loader = UnstructuredPowerPointLoader(filepath)
                texts = loader.load()
            else:
                from langchain.document_loaders import TextLoader
                loader = TextLoader(filepath, "utf8")
                texts = loader.load()
        except Exception:
            import traceback
            traceback.print_exc()
            # BUG FIX: execution previously fell through after printing the
            # traceback, so `texts` was either undefined (first file ->
            # UnboundLocalError) or still held the PREVIOUS file's chunks,
            # which were then re-added as duplicates. Skip the bad file.
            continue
        documents.extend(text_splitter.split_documents(texts))
    return documents
if __name__ == "__main__":
    # NOTE(review): upload_file() takes a checkbox flag, not a file list —
    # the non-empty list is merely truthy here, so NAME_SPACE_1 is selected
    # and the filename itself is ignored. Confirm whether
    # handle_upload_file() was intended instead.
    upload_file(["STANDARD_SOFTWARE LIFECYCLES.pdf"])