# Document ingestion pipeline: download/extract PDF text, chunk it,
# embed chunks with Gemini, and index them in Pinecone.
import requests
import fitz
import textwrap
import os
import google.generativeai as genai
from dotenv import load_dotenv
from pinecone import Pinecone, ServerlessSpec
import hashlib
import time
# Load environment variables from .env file
load_dotenv()
GOOGLE_API_KEY = os.environ.get("GOOGLE_API_KEY")
PINECONE_API_KEY = os.environ.get("PINECONE_API_KEY")
PINECONE_ENVIRONMENT = os.environ.get("PINECONE_ENVIRONMENT")  # NOTE(review): read but never used in this file — confirm whether other modules need it
# Initialize clients
genai.configure(api_key=GOOGLE_API_KEY)  # Gemini client, used for embeddings
pc = Pinecone(api_key=PINECONE_API_KEY)  # Pinecone client, used for indexing
# --- CORRECTED FUNCTION: Handles both URLs and binary file content ---
def get_document_text(source) -> str:
    """
    Extract the full text of a PDF document.

    Args:
        source: Either a URL string (the PDF is downloaded first) or the
            raw PDF file content as ``bytes`` (e.g. from an upload).

    Returns:
        The concatenated text of every page, or "" on any failure
        (download error, invalid source type, or PDF parse error).
    """
    document_content = None
    if isinstance(source, str):  # If the source is a URL string
        print(f"Downloading document from {source}...")
        try:
            # Timeout prevents hanging forever on an unresponsive host
            # (the original call had no timeout).
            response = requests.get(source, timeout=60)
            response.raise_for_status()
            document_content = response.content
        except requests.exceptions.RequestException as e:
            print(f"Error downloading the document: {e}")
            return ""
    elif isinstance(source, bytes):  # If the source is raw file content (from upload)
        print("Processing uploaded document content...")
        document_content = source
    else:
        print("Invalid source type provided to get_document_text.")
        return ""
    if not document_content:
        return ""
    print("Extracting text from the document...")
    try:
        # Context manager guarantees the PDF handle is closed (the
        # original leaked it); join avoids quadratic string building.
        with fitz.open(stream=document_content, filetype="pdf") as pdf_document:
            return "".join(page.get_text() for page in pdf_document)
    except Exception as e:
        print(f"Error extracting text: {e}")
        return ""
def create_document_id(source: str) -> str:
    """Return a stable SHA256 hex digest of *source*, usable as a document ID."""
    digest = hashlib.sha256(source.encode())
    return digest.hexdigest()
def split_text_into_chunks(text: str, chunk_size: int = 1000, chunk_overlap: int = 200) -> list[str]:
    """
    Split a large text into chunks of roughly *chunk_size* characters.

    The text is split recursively on progressively finer separators
    (paragraphs, lines, sentences, words); pieces still longer than
    *chunk_size* after all separators are exhausted are hard-wrapped.
    Adjacent pieces are then greedily re-merged while the merged piece
    stays within ``chunk_size + chunk_overlap``. Note: *chunk_overlap*
    acts only as extra size slack during merging — chunks do not share
    overlapping text. Blank/whitespace-only chunks are dropped.
    """
    def _split(fragment, seps, limit, slack):
        # Base case: no separators left — fall back to hard wrapping.
        if not seps:
            return textwrap.wrap(fragment, limit)
        sep, remaining = seps[0], seps[1:]
        pieces = []
        for segment in fragment.split(sep):
            if len(segment) > limit:
                pieces.extend(_split(segment, remaining, limit, slack))
            else:
                pieces.append(segment)
        if not pieces:
            return []
        # Greedily merge neighbours back together up to limit + slack.
        merged = []
        buffer = pieces[0]
        for piece in pieces[1:]:
            if len(buffer) + len(piece) <= limit + slack:
                buffer = buffer + sep + piece
            else:
                merged.append(buffer)
                buffer = piece
        merged.append(buffer)
        return [m for m in merged if m.strip()]

    return _split(text, ["\n\n", "\n", ". ", " "], chunk_size, chunk_overlap)
def generate_embeddings(text_chunks: list[str]) -> list:
    """
    Generate vector embeddings for a list of text chunks using Gemini.

    Args:
        text_chunks: The chunk texts to embed.

    Returns:
        A list of embedding vectors (one per chunk, in order), or []
        if any request fails.
    """
    print(f"Generating embeddings for {len(text_chunks)} chunks using Gemini Pro...")
    embeddings = []
    try:
        # The embedding endpoint caps how many texts a single request
        # may carry (100), so embed in batches instead of one oversized
        # call that fails on large documents.
        batch_size = 100
        for start in range(0, len(text_chunks), batch_size):
            batch = text_chunks[start:start + batch_size]
            response = genai.embed_content(
                model="models/embedding-001",
                content=batch
            )
            embeddings.extend(response['embedding'])
        print("Embeddings generated successfully.")
    except Exception as e:
        print(f"Error generating embeddings: {e}")
        # Preserve the original all-or-nothing contract: callers treat
        # a falsy result as failure, so discard any partial batches.
        embeddings = []
    return embeddings
def index_chunks_in_pinecone(chunks: list[str], embeddings: list, index_name: str, namespace: str):
    """
    Index text chunks and their embeddings in a Pinecone namespace.

    Creates the index (serverless, cosine metric) if it does not exist,
    waits for it to become ready, then upserts chunk/embedding pairs in
    batches of 100. Errors are logged, not raised.

    Args:
        chunks: The chunk texts (stored as metadata on each vector).
        embeddings: One embedding vector per chunk.
        index_name: Name of the target Pinecone index.
        namespace: Namespace isolating this document's vectors.
    """
    # Guard: with empty embeddings, len(embeddings[0]) below would raise
    # IndexError (previously masked by the broad except).
    if not chunks or not embeddings:
        print("Nothing to index: empty chunks or embeddings.")
        return
    print(f"Indexing {len(chunks)} chunks in Pinecone index '{index_name}' under namespace '{namespace}'...")
    try:
        # Check if index exists, and create if it doesn't
        if index_name not in pc.list_indexes().names():
            print(f"Creating new Pinecone index: '{index_name}'")
            pc.create_index(
                name=index_name,
                dimension=len(embeddings[0]),  # must match the embedding model's output size
                metric='cosine',
                spec=ServerlessSpec(cloud='aws', region='us-east-1')
            )
            print("Index created successfully. Waiting for it to become ready...")
            # Poll until the new index can accept upserts.
            while not pc.describe_index(index_name).status.ready:
                time.sleep(1)
        index = pc.Index(index_name)
        # Build the upsert payloads; IDs embed the namespace so they
        # stay unique across documents.
        vectors_to_upsert = [
            {
                "id": f"chunk-{namespace}-{i}",
                "values": embedding,
                "metadata": {"text": chunk},
            }
            for i, (chunk, embedding) in enumerate(zip(chunks, embeddings))
        ]
        # Upsert in batches to respect Pinecone request-size limits.
        batch_size = 100
        for i in range(0, len(vectors_to_upsert), batch_size):
            batch = vectors_to_upsert[i:i + batch_size]
            index.upsert(vectors=batch, namespace=namespace)
            print(f"Upserted batch {i // batch_size + 1} into namespace '{namespace}'")
        print(f"Successfully indexed {len(chunks)} chunks in namespace '{namespace}'.")
        # Give a moment for the index to become queryable
        time.sleep(5)
    except Exception as e:
        print(f"Error indexing in Pinecone: {e}")
if __name__ == "__main__":
    # Standalone smoke test: download a sample policy PDF, chunk it,
    # embed the chunks, and index them under a URL-derived namespace.
    # (Also removes the trailing "|" scrape artifact that made the
    # original final line a syntax error.)
    sample_url = "https://hackrx.blob.core.windows.net/assets/hackrx_6/policies/BAJHLIP23020V012223.pdf?sv=2023-01-03&st=2025-07-30T06%3A46%3A49Z&se=2025-09-01T06%3A46%3A00Z&sr=c&sp=rl&sig=9szykRKdGYj0BVm1skP%2BX8N9%2FRENEn2k7MQPUp33jyQ%3D"
    index_name = "hackrx-policy-index"
    document_content = get_document_text(sample_url)
    if document_content:
        chunks = split_text_into_chunks(document_content)
        print(f"\n--- Document Split into {len(chunks)} Chunks ---")
        embeddings = generate_embeddings(chunks)
        if embeddings:
            print(f"Generated {len(embeddings)} embeddings.")
            print(f"Size of each embedding vector: {len(embeddings[0])}")
            # Index the chunks in Pinecone
            print("--- Running standalone script test ---")
            # A namespace derived from the URL keeps re-runs idempotent
            # per document.
            test_namespace = create_document_id(sample_url)
            index_chunks_in_pinecone(chunks, embeddings, index_name, namespace=test_namespace)
        else:
            print("Failed to generate embeddings. Pinecone indexing skipped.")
    else:
        print("Failed to process document content.")