import os
import time
import hashlib

from dotenv import load_dotenv
from pinecone import Pinecone, ServerlessSpec
from pinecone.core.openapi.shared.exceptions import NotFoundException
import openai

from processing import extract_text, preprocess_text_generalized

# Load environment variables from .env file
load_dotenv()

# Get Pinecone and OpenAI API keys from .env
PINECONE_API_KEY = os.getenv("PINECONE_API_KEY")
OPENAI_API_KEY = os.getenv("OPENAI_API_KEY")

INDEX_NAME = "document-embeddings"
EMBEDDING_DIMENSION = 1536  # Dimension of OpenAI's `text-embedding-ada-002` embeddings
CLOUD = "aws"
REGION = "us-east-1"

# Set OpenAI API key
openai.api_key = OPENAI_API_KEY
# Initialize Pinecone
def initialize_pinecone(api_key, index_name, dimension, cloud="aws", region="us-east-1"):
    """
    Initializes Pinecone and creates an index if it doesn't exist.
    """
    # Create a Pinecone client instance
    pc = Pinecone(api_key=api_key)

    # Check if the index exists; if not, create it
    if index_name not in pc.list_indexes().names():
        print(f"Index '{index_name}' does not exist. Creating a new index...")
        pc.create_index(
            name=index_name,
            dimension=dimension,
            metric="cosine",
            spec=ServerlessSpec(cloud=cloud, region=region)
        )
        # Wait for the index to be ready
        while not pc.describe_index(index_name).status["ready"]:
            print("Waiting for index to be ready...")
            time.sleep(1)

    # Return the Pinecone Index object
    return pc.Index(index_name)
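# Example (a sketch, not part of the original pipeline): a quick standalone
# connection check using the constants defined above.
#   index = initialize_pinecone(PINECONE_API_KEY, INDEX_NAME, EMBEDDING_DIMENSION)
#   print(index.describe_index_stats())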
# Save embeddings to Pinecone vector DB
def save_embeddings_to_pinecone(index, embeddings, metadata, namespace="default"):
    """
    Save embeddings to Pinecone. Clears old embeddings if they exist.
    """
    try:
        # Check if the namespace exists before attempting deletion
        index_description = index.describe_index_stats()
        if namespace in index_description.get("namespaces", {}):
            index.delete(delete_all=True, namespace=namespace)
            print(f"Cleared all previous embeddings in namespace: {namespace}")
        else:
            print(f"Namespace '{namespace}' not found. Proceeding to save new embeddings.")
    except Exception as e:
        print(f"Error while checking/deleting embeddings in namespace {namespace}: {e}")

    if embeddings:
        vectors = [
            {"id": f"doc_{i}", "values": embedding, "metadata": metadata}
            for i, embedding in enumerate(embeddings)
        ]
        index.upsert(vectors=vectors, namespace=namespace)
        print(f"Saved embeddings to namespace: {namespace}")
    else:
        print("No embeddings to save. Skipping upsert operation.")
# Generate embeddings using OpenAI API
def get_openai_embeddings(text, model="text-embedding-ada-002"):
    """
    Generate embeddings for a given text using OpenAI's embedding model.
    Splits the text into chunks if it exceeds the model's input limit.
    """
    max_chars = 8192  # Chunk size in characters; the model's actual limit is counted in tokens
    try:
        # Split text into smaller chunks (by characters, as a simple proxy for tokens)
        chunks = [text[i:i + max_chars] for i in range(0, len(text), max_chars)]
        embeddings = []
        for chunk in chunks:
            response = openai.Embedding.create(input=chunk, model=model)
            embeddings.extend([embedding["embedding"] for embedding in response["data"]])
        return embeddings
    except Exception as e:
        print(f"Error generating embeddings with OpenAI API: {e}")
        return None
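# Note: the chunking above splits by characters, while the embedding model's real
# limit is counted in tokens. A token-aware variant (a sketch; assumes the optional
# `tiktoken` package is installed) could look like this:
def split_by_tokens(text, model="text-embedding-ada-002", max_tokens=8000):
    """Split text into chunks of at most `max_tokens` tokens for the given model."""
    import tiktoken  # optional dependency, imported lazily

    encoding = tiktoken.encoding_for_model(model)
    tokens = encoding.encode(text)
    return [
        encoding.decode(tokens[i:i + max_tokens])
        for i in range(0, len(tokens), max_tokens)
    ]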
# Query Pinecone for relevant embeddings
def query_pinecone(index, query_embedding, namespace="default", top_k=3):
    """
    Retrieve relevant embeddings from Pinecone using similarity search.
    """
    results = index.query(
        vector=query_embedding,
        namespace=namespace,
        top_k=top_k,
        include_metadata=True
    )
    return results["matches"]  # Returns the top-k matches with metadata
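# Example (a sketch, not called elsewhere in this module): embed a user question
# with the same OpenAI model and search the index with query_pinecone().
def ask(index, question, namespace="default", top_k=3):
    """Embed a question and return the top-k matching chunks from Pinecone."""
    response = openai.Embedding.create(input=question, model="text-embedding-ada-002")
    query_embedding = response["data"][0]["embedding"]
    return query_pinecone(index, query_embedding, namespace=namespace, top_k=top_k)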
# Pipeline for handling file uploads and updating the Pinecone vector DB

# Global variable to track the previous file hash
previous_file_hash = None

def calculate_file_hash(file_path):
    """
    Calculate a hash for the uploaded file to uniquely identify it.
    """
    hasher = hashlib.md5()
    with open(file_path, "rb") as f:
        while chunk := f.read(8192):
            hasher.update(chunk)
    return hasher.hexdigest()
def handle_file_upload(file_path, pinecone_index, namespace="default"):
    """
    Handle the process of uploading a file, clearing old embeddings,
    and saving new embeddings dynamically.
    """
    global previous_file_hash

    current_file_hash = calculate_file_hash(file_path)
    if current_file_hash == previous_file_hash:
        print(f"File '{file_path}' is identical to the previously uploaded file. Skipping processing.")
        return

    try:
        text = extract_text(file_path)
        processed_text = preprocess_text_generalized(text)

        # Generate embeddings
        embeddings = get_openai_embeddings(processed_text)
        if embeddings:
            metadata = {"file_name": os.path.basename(file_path), "text": processed_text}
            save_embeddings_to_pinecone(pinecone_index, embeddings, metadata, namespace)
            previous_file_hash = current_file_hash
        else:
            print("Failed to generate embeddings. Skipping save operation.")
    except Exception as e:
        print(f"Error processing file upload: {e}")
# Example usage
if __name__ == "__main__":
    # Initialize Pinecone with serverless specifications
    pinecone_index = initialize_pinecone(
        api_key=PINECONE_API_KEY,
        index_name=INDEX_NAME,
        dimension=EMBEDDING_DIMENSION,
        cloud=CLOUD,
        region=REGION
    )
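    # Hypothetical follow-up (the file path below is illustrative, not shipped with this code):
    # handle_file_upload("sample.pdf", pinecone_index, namespace="default")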