# Document embedding pipeline: Pinecone index management, OpenAI embedding
# generation, and file-upload handling.
import os
import time
from dotenv import load_dotenv
from pinecone import Pinecone, ServerlessSpec
import openai
import hashlib
from processing import extract_text, preprocess_text_generalized
# Load environment variables from .env file
load_dotenv()
# Get Pinecone and OpenAI API keys from .env
# (os.getenv returns None if the variable is missing; failures surface later
# when the Pinecone/OpenAI clients are actually used)
PINECONE_API_KEY = os.getenv("PINECONE_API_KEY")
OPENAI_API_KEY = os.getenv("OPENAI_API_KEY")
# Name of the Pinecone index this module reads from and writes to
INDEX_NAME = "document-embeddings"
EMBEDDING_DIMENSION = 1536 # OpenAI's embeddings dimension for `text-embedding-ada-002`
# Serverless index placement (must match an available Pinecone region)
CLOUD = "aws"
REGION = "us-east-1"
# Set OpenAI API key
openai.api_key = OPENAI_API_KEY
# Initialize Pinecone
def initialize_pinecone(api_key, index_name, dimension, cloud="aws", region="us-east-1"):
    """
    Connect to Pinecone and ensure the target serverless index exists.

    Creates the index (cosine metric) when it is missing, blocks until
    Pinecone reports it ready, and returns a handle to it.

    Args:
        api_key: Pinecone API key.
        index_name: Name of the index to open or create.
        dimension: Vector dimensionality for a newly created index.
        cloud: Serverless cloud provider for a newly created index.
        region: Serverless region for a newly created index.

    Returns:
        A Pinecone Index object for `index_name`.
    """
    client = Pinecone(api_key=api_key)
    # Only create the index when it is not already present.
    if index_name not in client.list_indexes().names():
        print(f"Index '{index_name}' does not exist. Creating a new index...")
        client.create_index(
            name=index_name,
            metric="cosine",
            dimension=dimension,
            spec=ServerlessSpec(cloud=cloud, region=region),
        )
    # Poll until the index reports ready before handing it back.
    while not client.describe_index(index_name).status["ready"]:
        print("Waiting for index to be ready...")
        time.sleep(1)
    return client.Index(index_name)
# Save embeddings to Pinecone vector DB
from pinecone.core.openapi.shared.exceptions import NotFoundException
def save_embeddings_to_pinecone(index, embeddings, metadata, namespace="default"):
    """
    Replace the embeddings stored under `namespace` with the given ones.

    Any vectors already present in the namespace are deleted first; the new
    embeddings are then upserted with ids "doc_0", "doc_1", ... and the same
    `metadata` dict attached to every vector.

    Args:
        index: Pinecone Index object.
        embeddings: List of embedding vectors (lists of floats).
        metadata: Metadata dict stored alongside each vector.
        namespace: Pinecone namespace to write into.
    """
    try:
        # Only attempt a delete when the namespace already holds vectors;
        # deleting a nonexistent namespace would raise.
        stats = index.describe_index_stats()
        if namespace in stats.get("namespaces", {}):
            index.delete(delete_all=True, namespace=namespace)
            print(f"Cleared all previous embeddings in namespace: {namespace}")
        else:
            print(f"Namespace '{namespace}' not found. Proceeding to save new embeddings.")
    except Exception as e:
        # Best-effort cleanup: report the problem but still try to upsert.
        print(f"Error while checking/deleting embeddings in namespace {namespace}: {e}")
    if not embeddings:
        print("No embeddings to save. Skipping upsert operation.")
        return
    payload = []
    for position, vector in enumerate(embeddings):
        payload.append({"id": f"doc_{position}", "values": vector, "metadata": metadata})
    index.upsert(vectors=payload, namespace=namespace)
    print(f"Saved embeddings to namespace: {namespace}")
# Generate embeddings using OpenAI API
def get_openai_embeddings(text, model="text-embedding-ada-002"):
    """
    Generate embeddings for `text` using OpenAI's embedding endpoint.

    Long input is split into fixed-size CHARACTER chunks and one API call is
    made per chunk, so the result may contain several vectors for one text.

    Args:
        text: The text to embed. An empty string yields [].
        model: OpenAI embedding model name.

    Returns:
        A list of embedding vectors (one or more per chunk), [] for empty
        input, or None when the API call fails.
    """
    # NOTE: this is a character count, not a token count. 8192 characters is
    # conservative for typical English text (~4 chars/token vs the model's
    # 8191-token limit), but token-dense input could still exceed the limit;
    # an exact split would need a real tokenizer (e.g. tiktoken).
    chunk_size = 8192
    if not text:
        # Nothing to embed — avoid a pointless API round-trip.
        return []
    try:
        chunks = [text[i:i + chunk_size] for i in range(0, len(text), chunk_size)]
        embeddings = []
        for chunk in chunks:
            response = openai.Embedding.create(input=chunk, model=model)
            embeddings.extend(item["embedding"] for item in response["data"])
        return embeddings
    except Exception as e:
        # Callers check for None to detect failure; keep that contract.
        print(f"Error generating embeddings with OpenAI API: {e}")
        return None
# Query Pinecone for relevant embeddings
def query_pinecone(index, query_embedding, namespace="default", top_k=3):
    """
    Run a similarity search against Pinecone and return the best matches.

    Args:
        index: Pinecone Index object.
        query_embedding: Query vector to search with.
        namespace: Namespace to search within.
        top_k: Number of nearest neighbours to return.

    Returns:
        The list of top-k match records, each including its metadata.
    """
    response = index.query(
        vector=query_embedding,
        top_k=top_k,
        namespace=namespace,
        include_metadata=True,
    )
    matches = response["matches"]
    return matches
# Pipeline for handling file uploads and updating Pinecone vector DB
# Hash of the most recently processed upload, used to skip re-processing
# when the identical file is uploaded again (see handle_file_upload).
previous_file_hash = None
def calculate_file_hash(file_path):
    """
    Compute the MD5 hex digest of a file, reading it in 8 KiB chunks.

    Args:
        file_path: Path of the file to hash.

    Returns:
        The hexadecimal MD5 digest string.
    """
    digest = hashlib.md5()
    with open(file_path, "rb") as handle:
        # iter() with a sentinel keeps reading until the empty-bytes EOF marker.
        for block in iter(lambda: handle.read(8192), b""):
            digest.update(block)
    return digest.hexdigest()
def handle_file_upload(file_path, pinecone_index, namespace="default"):
    """
    Process an uploaded file end-to-end: extract its text, embed it, and
    store the embeddings in Pinecone, replacing any previous ones.

    A module-level hash of the last processed file is kept so re-uploading
    the identical file is a no-op.

    Args:
        file_path: Path of the uploaded file.
        pinecone_index: Pinecone Index object to write embeddings into.
        namespace: Pinecone namespace to store the embeddings under.
    """
    global previous_file_hash
    file_hash = calculate_file_hash(file_path)
    # Skip duplicate uploads entirely.
    if file_hash == previous_file_hash:
        print(f"File '{file_path}' is identical to the previously uploaded file. Skipping processing.")
        return
    try:
        raw_text = extract_text(file_path)
        cleaned_text = preprocess_text_generalized(raw_text)
        # Generate embeddings
        vectors = get_openai_embeddings(cleaned_text)
        if not vectors:
            print("Failed to generate embeddings. Skipping save operation.")
            return
        doc_metadata = {"file_name": os.path.basename(file_path), "text": cleaned_text}
        save_embeddings_to_pinecone(pinecone_index, vectors, doc_metadata, namespace)
        # Only remember the hash once the save actually succeeded.
        previous_file_hash = file_hash
    except Exception as e:
        print(f"Error processing file upload: {e}")
# Example usage
if __name__ == "__main__":
    # Initialize Pinecone with serverless specifications; the module-level
    # constants above supply the credentials and index configuration.
    # (A stray "|" scrape artifact after this block was removed — it made
    # the file syntactically invalid.)
    pinecone_index = initialize_pinecone(
        api_key=PINECONE_API_KEY,
        index_name=INDEX_NAME,
        dimension=EMBEDDING_DIMENSION,
        cloud=CLOUD,
        region=REGION
    )