# rag/ingest.py
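"""Document ingestion pipeline for the SabiTax RAG system.

Extracts text from files in the docs/ directory, splits it into
overlapping chunks, embeds each chunk with Gemini, and upserts the
resulting vectors into a Pinecone index.
"""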
import os
from pathlib import Path
from hashlib import md5
import pdfplumber
from pinecone import Pinecone
try:
from .utils import (
get_gemini_client,
chunk_text,
clean_text,
generate_batch_embeddings,
count_tokens
)
except ImportError:
from utils import (
get_gemini_client,
chunk_text,
clean_text,
generate_batch_embeddings,
count_tokens
)
DATA_DIR = Path(__file__).parent.parent / "docs"
PINECONE_INDEX = os.environ.get("PINECONE_INDEX", "sabitax")
CHUNK_SIZE = 500
CHUNK_OVERLAP = 50
def get_pinecone_client():
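    """Create a Pinecone client from the PINECONE_API_KEY environment variable."""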
api_key = os.environ.get("PINECONE_API_KEY")
if not api_key:
raise ValueError("PINECONE_API_KEY environment variable is not set.")
return Pinecone(api_key=api_key)
def get_pinecone_index(pc=None):
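    """Return a handle to the target index, creating a client if none is given."""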
if pc is None:
pc = get_pinecone_client()
return pc.Index(PINECONE_INDEX)
def extract_text_from_file(file_path: Path) -> str:
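    """Extract and clean text from a PDF, Word, or plain-text file.

    Returns an empty string if the file type is unsupported or
    extraction fails.
    """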
suffix = file_path.suffix.lower()
if suffix == ".pdf":
text_parts = []
try:
with pdfplumber.open(file_path) as pdf:
for page_num, page in enumerate(pdf.pages, 1):
page_text = page.extract_text()
if page_text:
text_parts.append(f"[Page {page_num}]\n{page_text}")
except Exception as e:
print(f" Error extracting text from {file_path.name}: {e}")
return ""
full_text = "\n\n".join(text_parts)
return clean_text(full_text)
elif suffix in [".doc", ".docx"]:
try:
import docx2txt
text = docx2txt.process(file_path)
return clean_text(text)
except ImportError:
print(f" docx2txt not installed. Cannot process {file_path.name}")
return ""
except Exception as e:
print(f" Error extracting text from {file_path.name}: {e}")
return ""
elif suffix == ".txt":
try:
with open(file_path, 'r', encoding='utf-8', errors='ignore') as f:
text = f.read()
return clean_text(text)
except Exception as e:
print(f" Error reading {file_path.name}: {e}")
return ""
else:
print(f" Unsupported file type: {suffix}")
return ""
def generate_chunk_id(doc_name: str, chunk_index: int) -> str:
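    """Derive a deterministic vector ID from the document name and chunk index."""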
content = f"{doc_name}_{chunk_index}"
return md5(content.encode()).hexdigest()
def ingest_single_pdf(
pdf_path: Path,
index,
gemini_client,
force: bool = False
) -> tuple[int, int]:
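    """Chunk, embed, and upsert a single PDF.

    Returns (chunks_added, documents_skipped); skipped is 1 when the
    document is already in the index and force is False.
    """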
doc_name = pdf_path.name
if not force:
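        # Chunk IDs are deterministic, so probing for chunk 0's ID tells
        # us whether this document was ingested before.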
test_id = generate_chunk_id(doc_name, 0)
result = index.fetch(ids=[test_id])
if result.vectors:
print(f" Skipping {doc_name} (already ingested)")
return 0, 1
print(f" Processing: {doc_name}")
    text = extract_text_from_file(pdf_path)
if not text:
print(f" No text extracted from {doc_name}")
return 0, 0
total_tokens = count_tokens(text)
print(f" Extracted {total_tokens:,} tokens")
chunks = chunk_text(text, chunk_size=CHUNK_SIZE, chunk_overlap=CHUNK_OVERLAP)
print(f" Created {len(chunks)} chunks")
if not chunks:
return 0, 0
print(f" Generating embeddings...")
embeddings = generate_batch_embeddings(gemini_client, chunks)
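    # Build Pinecone records; metadata keeps only the first 1,000
    # characters of each chunk to limit per-record metadata size.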
vectors = []
for i, (chunk, embedding) in enumerate(zip(chunks, embeddings)):
vectors.append({
"id": generate_chunk_id(doc_name, i),
"values": embedding,
"metadata": {
"source": doc_name,
"chunk_index": i,
"total_chunks": len(chunks),
"text": chunk[:1000]
}
})
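    # Upsert in batches of 100, a common guideline for Pinecone upserts.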
batch_size = 100
for i in range(0, len(vectors), batch_size):
batch = vectors[i:i + batch_size]
index.upsert(vectors=batch)
print(f" Added {len(chunks)} chunks to Pinecone")
return len(chunks), 0
def ingest_all_documents(data_dir: Path = DATA_DIR, force: bool = False):
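    """Ingest every PDF in data_dir into the Pinecone index.

    Note that only *.pdf files are picked up here, even though
    extract_text_from_file also handles .docx and .txt.
    """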
print("\nStarting document ingestion pipeline\n")
print(f"Data directory: {data_dir}")
print(f"Pinecone index: {PINECONE_INDEX}\n")
pdf_files = list(data_dir.glob("*.pdf"))
if not pdf_files:
print(f"No PDF files found in {data_dir}")
return
print(f"Found {len(pdf_files)} PDF files\n")
print("Connecting to Gemini API...")
gemini_client = get_gemini_client()
print("Connecting to Pinecone...")
index = get_pinecone_index()
stats = index.describe_index_stats()
print(f"Current index size: {stats.total_vector_count} vectors\n")
print("-" * 60)
total_added = 0
total_skipped = 0
for pdf_path in sorted(pdf_files):
added, skipped = ingest_single_pdf(
pdf_path,
index,
gemini_client,
force=force
)
total_added += added
total_skipped += skipped
print("-" * 60)
stats = index.describe_index_stats()
print(f"\nIngestion complete!")
print(f" Chunks added: {total_added}")
print(f" Documents skipped: {total_skipped}")
print(f" Total index size: {stats.total_vector_count} vectors\n")
def clear_index():
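    """Delete all vectors from the Pinecone index."""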
print("Clearing Pinecone index...")
try:
index = get_pinecone_index()
index.delete(delete_all=True)
print("Index cleared successfully")
except Exception as e:
print(f"Error clearing index: {e}")
def show_stats():
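    """Print vector count and dimensionality for the index."""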
print("\nPinecone Index Statistics\n")
try:
index = get_pinecone_index()
stats = index.describe_index_stats()
print(f" Index: {PINECONE_INDEX}")
print(f" Total vectors: {stats.total_vector_count}")
print(f" Dimensions: {stats.dimension}")
except Exception as e:
print(f" Error: {e}")
print()
if __name__ == "__main__":
import argparse
from dotenv import load_dotenv
load_dotenv()
    parser = argparse.ArgumentParser(description="Ingest PDF documents into Pinecone for RAG")
    parser.add_argument("--force", "-f", action="store_true",
                        help="Re-embed and upsert documents even if already ingested")
    parser.add_argument("--clear", action="store_true",
                        help="Clear the index, then re-ingest everything")
    parser.add_argument("--stats", action="store_true",
                        help="Show index statistics and exit")
    parser.add_argument("--data-dir", type=Path, default=DATA_DIR,
                        help="Directory containing the PDF files to ingest")
    args = parser.parse_args()
    if args.stats:
        show_stats()
    elif args.clear:
        clear_index()
        ingest_all_documents(data_dir=args.data_dir, force=True)
    else:
        ingest_all_documents(data_dir=args.data_dir, force=args.force)
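# Example invocations (assuming the Pinecone and Gemini API keys are
# provided via the environment or a .env file; the exact Gemini key
# variable depends on utils.get_gemini_client):
#   python ingest.py              # ingest any new PDFs in docs/
#   python ingest.py --force      # re-embed and upsert everything
#   python ingest.py --stats      # print index statistics and exit
#   python ingest.py --clear      # wipe the index, then re-ingest all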