# HuggingFace Spaces app — WhatsApp chat → Pinecone upsert.
# (Previous deploys failed with a runtime error; see index-existence check below.)
import os
import shutil
import tempfile
import uuid
import zipfile

import gradio as gr
from dotenv import load_dotenv
from langchain.document_loaders import WhatsAppChatLoader
from langchain.embeddings import HuggingFaceEmbeddings
from langchain.text_splitter import RecursiveCharacterTextSplitter
from pinecone import Pinecone, ServerlessSpec
load_dotenv()

# Initialize the Pinecone client and index once at import time so every
# request reuses the same connection.
pinecone_key = os.getenv("PINECONE_API_KEY")
pc = Pinecone(api_key=pinecone_key)
index_name = "whatsapp-chat-index-1"

# BUG FIX: the original tested the literal string 'index_name' against the
# list of index names, so the check was always False and create_index() ran
# on every startup — raising once the index already existed.
if index_name not in pc.list_indexes().names():
    pc.create_index(
        name=index_name,
        dimension=384,  # must match the embedding model's output size
        metric="cosine",
        spec=ServerlessSpec(
            cloud='aws',
            region='us-east-1'
        )
    )
index = pc.Index(index_name)

# Hugging Face sentence-transformer producing 384-dim vectors (matches the
# index dimension above).
embeddings = HuggingFaceEmbeddings(model_name="sentence-transformers/all-MiniLM-L6-v2")

# Maximum allowed upsert payload per batch in bytes (4MB).
MAX_CHUNK_SIZE = 4 * 1024 * 1024
def load_chat_content(file) -> str:
    """Extract a WhatsApp chat export ZIP, chunk it, embed it, and upsert to Pinecone.

    Args:
        file: Uploaded file object from Gradio; its ``.name`` attribute is a
            filesystem path to a ZIP archive containing the exported ``.txt``
            chat log.

    Returns:
        A human-readable status or error message.
    """
    if file is None:
        return "No file uploaded. Please upload a valid ZIP file to process."
    # Ensure the uploaded file is a ZIP file.
    if not zipfile.is_zipfile(file.name):
        return "Uploaded file is not a valid ZIP file. Please upload a ZIP file."

    # Unique per-request directory: the original reused a single shared
    # 'temp_extracted_files' dir, so concurrent uploads and leftovers from
    # previous runs could bleed into each other — and it was never cleaned up.
    temp_dir = tempfile.mkdtemp(prefix="whatsapp_chat_")
    try:
        try:
            # Consistently open via file.name (the original mixed file and
            # file.name, which breaks when the object is not itself a path).
            with zipfile.ZipFile(file.name, 'r') as z:
                # Zip-slip protection: refuse entries whose resolved path
                # would escape temp_dir (untrusted user upload).
                root = os.path.realpath(temp_dir)
                for member in z.namelist():
                    target = os.path.realpath(os.path.join(temp_dir, member))
                    if target != root and not target.startswith(root + os.sep):
                        return "ZIP archive contains unsafe paths; refusing to extract."
                z.extractall(temp_dir)
        except zipfile.BadZipFile:
            return "Error reading ZIP file. The file may be corrupted."

        chat_files = [f for f in os.listdir(temp_dir) if f.endswith('.txt')]
        if not chat_files:
            return "No chat files found in the zip archive."

        chat_file_path = os.path.join(temp_dir, chat_files[0])
        loader = WhatsAppChatLoader(path=chat_file_path)
        messages = list(loader.lazy_load())
        chat_content = "\n".join(doc.page_content for doc in messages)

        text_splitter = RecursiveCharacterTextSplitter(
            chunk_size=1000,
            chunk_overlap=200,
        )
        chunks = text_splitter.create_documents([chat_content])
        if not chunks:
            return "No chat files found in the zip archive."

        # Embed all chunks in ONE model call instead of one call per chunk
        # (the original looped embed_documents over single-item lists).
        texts = [chunk.page_content for chunk in chunks]
        vectors = embeddings.embed_documents(texts)
        vectors_to_upsert = [
            (str(uuid.uuid4()), vector, {"text": text})  # unique ID per chunk
            for text, vector in zip(texts, vectors)
        ]

        # Upsert in batches of 100; sub-split any batch whose metadata text
        # payload would exceed the 4MB request-size limit.
        for i in range(0, len(vectors_to_upsert), 100):
            batch = vectors_to_upsert[i:i + 100]
            batch_bytes = sum(len(v[2]["text"].encode('utf-8')) for v in batch)
            if batch_bytes > MAX_CHUNK_SIZE:
                for j in range(0, len(batch), 10):
                    index.upsert(batch[j:j + 10])
            else:
                index.upsert(batch)
        return "All chat content has been successfully upserted to Pinecone."
    finally:
        # Always remove extracted files, even on early return or exception.
        shutil.rmtree(temp_dir, ignore_errors=True)
# Gradio front end: a single ZIP-file upload in, a status message out.
_upload_widget = gr.File(label="Upload WhatsApp Chat Zip File")

interface = gr.Interface(
    fn=load_chat_content,
    inputs=[_upload_widget],
    outputs="text",
    title="WhatsApp Chat Upsert to Pinecone",
    description=(
        "Upload a zip file containing a WhatsApp chat file "
        "and upsert its content to Pinecone."
    ),
)

# Launch only when run as a script (Spaces also calls launch via __main__).
if __name__ == "__main__":
    interface.launch()