Spaces:
Runtime error
Runtime error
| import pinecone | |
| import os | |
| import json | |
| import ast | |
| import PyPDF2 | |
| import shutil | |
| import gradio as gr | |
| from tqdm import tqdm | |
| from pydantic import Field | |
| from typing import List, Optional | |
| from langchain.load.serializable import Serializable | |
| from langchain.vectorstores import Pinecone | |
| from config import PINECONE_API_KEY, PINECONE_ENVIRONMENT, INDEX_NAME, SAVE_DIR | |
| from config import EMBEDDING_API_BASE, EMBEDDING_API_KEY, OPENAI_API_TYPE, OPENAI_API_VERSION, EMBEDDING_DEPLOYMENT_ID | |
| from langchain.embeddings import OpenAIEmbeddings | |
| from langchain.text_splitter import TokenTextSplitter | |
# Connect the Pinecone client.
#   api_key     -> find at app.pinecone.io
#   environment -> shown next to the API key in the console
pinecone.init(api_key=PINECONE_API_KEY, environment=PINECONE_ENVIRONMENT)

# Azure-hosted OpenAI embedding model; all connection settings come from config.
embeddings = OpenAIEmbeddings(
    deployment=EMBEDDING_DEPLOYMENT_ID,
    openai_api_key=EMBEDDING_API_KEY,
    openai_api_base=EMBEDDING_API_BASE,
    openai_api_type=OPENAI_API_TYPE,
    openai_api_version=OPENAI_API_VERSION,
    chunk_size=16,
)

# Token-based splitter: 500-token chunks, 30 tokens of overlap.
text_splitter = TokenTextSplitter(chunk_size=500, chunk_overlap=30)

# Create the index on first run only (skipped when INDEX_NAME is empty/unset).
if INDEX_NAME and INDEX_NAME not in pinecone.list_indexes():
    pinecone.create_index(INDEX_NAME, metric="cosine", dimension=1536)
    print(f"Index {INDEX_NAME} created successfully")

# Module-wide handle used by the delete helpers below.
index = pinecone.Index(INDEX_NAME)
class Document(Serializable):
    """Class for storing a piece of text and associated metadata.

    Fields:
        page_content: the text itself.
        metadata: free-form dict describing the text; defaults to a fresh
            empty dict per instance.
    """

    # String text.
    page_content: str

    # Arbitrary metadata about the page content (e.g., source, relationships
    # to other documents, etc.).
    metadata: dict = Field(default_factory=dict)
def delete_all():
    """Remove every saved file and wipe the whole Pinecone index.

    Returns:
        A 3-tuple for the Gradio outputs: an update that empties the file
        choices, a status message, and an update that clears the Files widget.
    """
    for entry in os.listdir(SAVE_DIR):
        saved_path = os.path.join(SAVE_DIR, entry)
        os.remove(saved_path)

    # Drop all vectors in the index in one call.
    index.delete(delete_all=True)

    return (
        gr.update(choices=[]),
        "Delete all files succesfully",
        gr.Files.update(None),
    )
def delete_file(files_src):
    """Delete the selected saved files and their vectors from the index.

    Args:
        files_src: iterable of file names (relative to SAVE_DIR) to delete.
            ``None`` or an empty selection is treated as "nothing to delete".

    Returns:
        A 3-tuple for the Gradio outputs: an update listing the files still
        in SAVE_DIR, a status message, and an update that clears the Files
        widget.
    """
    file_names = list(files_src or [])

    for name in file_names:
        try:
            os.remove(os.path.join(SAVE_DIR, name))
        except FileNotFoundError:
            # Already gone from disk; still clean up its vectors below.
            pass

    # Skip the index call on an empty selection so no empty `$in` filter is sent.
    if file_names:
        # NOTE(review): this only removes vectors whose metadata has a
        # `document_id` equal to the file name -- confirm the ingestion path
        # sets that field.
        index.delete(filter={"document_id": {"$in": file_names}})

    message = f"Delete {len(file_names)} files succesfully"
    return gr.update(choices=os.listdir(SAVE_DIR)), message, gr.Files.update(None)
def upload_file():
    """Open the existing Pinecone index as a LangChain vector store.

    Returns:
        The vector store built from INDEX_NAME and the module-level embeddings.
    """
    store = Pinecone.from_existing_index(INDEX_NAME, embeddings)
    print(f"Load files from existing {INDEX_NAME}")
    return store
def handle_upload_file(files):
    """Index newly uploaded files and report what happened.

    Args:
        files: uploaded file objects, passed straight to ``get_documents``.

    Returns:
        A status message: "added" when new chunks were pushed to the index,
        otherwise a note that the existing index is used as-is.
    """
    new_docs = get_documents(files)

    if not new_docs:
        # Nothing new to embed (no files, or all were skipped).
        message = f"Load files from existing {INDEX_NAME}"
    else:
        Pinecone.from_documents(new_docs, embeddings, index_name=INDEX_NAME)
        message = f"Add files to {INDEX_NAME} sucessfully"

    print(message)
    return message
def update_file():
    """Load posts from ``data.json`` and add them to the Pinecone index.

    Each entry must provide ``content`` (the text to embed) and ``post_url``
    (stored as the ``source`` metadata).

    Returns:
        A status message; a distinct one is returned when the file holds no
        entries, in which case the index is not touched.
    """
    # JSON is UTF-8 by specification; do not depend on the locale default.
    with open('data.json', encoding="utf-8") as json_file:
        data = json.load(json_file)

    # The file stores the mapping as a string holding a Python literal, so it
    # needs a second parse. Accept an already-decoded JSON object as well.
    datas = ast.literal_eval(data) if isinstance(data, str) else data

    texts = [
        Document(page_content=entry["content"], metadata={"source": entry["post_url"]})
        for entry in datas.values()
    ]

    if not texts:
        # Previously this path reached `return message` with nothing assigned.
        return f"No facebook data found to add to {INDEX_NAME}"

    Pinecone.from_documents(texts, embeddings, index_name=INDEX_NAME)
    return f"Add facebook data to {INDEX_NAME} sucessfully"
def _load_file(filepath, file_type):
    """Parse one file into a list of un-split documents, chosen by extension."""
    if file_type == ".pdf":
        with open(filepath, "rb") as pdf_file:
            pdf_reader = PyPDF2.PdfReader(pdf_file)
            # Treat a page with no extractable text as empty.
            pdftext = "".join(
                page.extract_text() or "" for page in tqdm(pdf_reader.pages)
            )
        return [Document(page_content=pdftext, metadata={"source": filepath})]

    # Loaders are imported lazily so their dependencies are only needed when
    # a file of that type is actually uploaded.
    if file_type == ".docx":
        from langchain.document_loaders import UnstructuredWordDocumentLoader
        return UnstructuredWordDocumentLoader(filepath).load()

    if file_type == ".pptx":
        from langchain.document_loaders import UnstructuredPowerPointLoader
        return UnstructuredPowerPointLoader(filepath).load()

    # Anything else is read as UTF-8 text.
    from langchain.document_loaders import TextLoader
    return TextLoader(filepath, "utf8").load()


def get_documents(file_src):
    """Turn uploaded files into token-split document chunks ready for indexing.

    Files whose name already exists in SAVE_DIR are skipped. A file that fails
    to parse is logged and skipped; it is not copied into SAVE_DIR, so it can
    be uploaded again later.

    Args:
        file_src: iterable of uploaded files. Each item is either an object
            whose ``.name`` is the path on disk, or a plain ``str`` /
            ``os.PathLike`` path. ``None`` or an empty iterable yields ``[]``.

    Returns:
        A list of chunks. Each chunk's metadata carries ``document_id`` set to
        the file's base name, so the chunks of one file can be selected with a
        metadata filter on that field.
    """
    documents = []
    if not file_src:
        return documents

    import traceback

    # List the directory once; the set is kept current as files are added so
    # duplicates inside the same batch are still skipped.
    already_saved = set(os.listdir(SAVE_DIR))

    for file in file_src:
        if isinstance(file, (str, os.PathLike)):
            filepath = os.fspath(file)
        else:
            filepath = file.name
        filename = os.path.basename(filepath)
        if filename in already_saved:
            continue

        # Match extensions case-insensitively (".PDF" is still a PDF).
        file_type = os.path.splitext(filename)[1].lower()

        try:
            loaded = _load_file(filepath, file_type)
            chunks = text_splitter.split_documents(loaded)
        except Exception:
            # Best effort: report the failure and move on to the next file
            # instead of reusing results left over from a previous iteration.
            traceback.print_exc()
            continue

        for chunk in chunks:
            chunk.metadata["document_id"] = filename

        # Persist the file only once it has been parsed successfully.
        shutil.copy(filepath, os.path.join(SAVE_DIR, filename))
        already_saved.add(filename)
        documents.extend(chunks)

    return documents
if __name__ == "__main__":
    # upload_file() takes no arguments, so calling it with a file list raised
    # TypeError. Ingestion goes through handle_upload_file(), which reads each
    # item's `.name` attribute as the path on disk, so wrap the sample path in
    # a minimal object exposing that attribute.
    from types import SimpleNamespace

    sample = SimpleNamespace(name="STANDARD_SOFTWARE LIFECYCLES.pdf")
    handle_upload_file([sample])