| import os |
| from pathlib import Path |
| from hashlib import md5 |
|
|
| import pdfplumber |
| from pinecone import Pinecone |
|
|
| try: |
| from .utils import ( |
| get_gemini_client, |
| chunk_text, |
| clean_text, |
| generate_batch_embeddings, |
| count_tokens |
| ) |
| except ImportError: |
| from utils import ( |
| get_gemini_client, |
| chunk_text, |
| clean_text, |
| generate_batch_embeddings, |
| count_tokens |
| ) |
|
|
|
|
| DATA_DIR = Path(__file__).parent.parent / "docs" |
| PINECONE_INDEX = os.environ.get("PINECONE_INDEX", "sabitax") |
| CHUNK_SIZE = 500 |
| CHUNK_OVERLAP = 50 |
|
|
|
|
def get_pinecone_client():
    """Build a Pinecone client from the PINECONE_API_KEY environment variable.

    Raises:
        ValueError: if PINECONE_API_KEY is missing or empty.
    """
    api_key = os.environ.get("PINECONE_API_KEY")
    if api_key:
        return Pinecone(api_key=api_key)
    raise ValueError("PINECONE_API_KEY environment variable is not set.")
|
|
|
|
def get_pinecone_index(pc=None):
    """Return a handle to the configured Pinecone index.

    Args:
        pc: an existing Pinecone client; a fresh one is created when omitted.
    """
    client = pc if pc is not None else get_pinecone_client()
    return client.Index(PINECONE_INDEX)
|
|
|
|
def extract_text_from_file(file_path: Path) -> str:
    """Extract cleaned text from *file_path*, dispatching on its extension.

    Supports .pdf (pdfplumber, with per-page markers), .doc/.docx
    (docx2txt, if installed), and .txt. Any failure is reported to
    stdout and yields an empty string instead of raising.
    """
    suffix = file_path.suffix.lower()

    if suffix == ".pdf":
        pages = []
        try:
            with pdfplumber.open(file_path) as pdf:
                for number, page in enumerate(pdf.pages, 1):
                    extracted = page.extract_text()
                    if extracted:
                        pages.append(f"[Page {number}]\n{extracted}")
        except Exception as e:
            print(f" Error extracting text from {file_path.name}: {e}")
            return ""
        return clean_text("\n\n".join(pages))

    if suffix in (".doc", ".docx"):
        try:
            import docx2txt
            return clean_text(docx2txt.process(file_path))
        except ImportError:
            print(f" docx2txt not installed. Cannot process {file_path.name}")
            return ""
        except Exception as e:
            print(f" Error extracting text from {file_path.name}: {e}")
            return ""

    if suffix == ".txt":
        try:
            raw = file_path.read_text(encoding='utf-8', errors='ignore')
        except Exception as e:
            print(f" Error reading {file_path.name}: {e}")
            return ""
        return clean_text(raw)

    print(f" Unsupported file type: {suffix}")
    return ""
|
|
|
|
def generate_chunk_id(doc_name: str, chunk_index: int) -> str:
    """Deterministic vector ID: MD5 hex digest of "<doc_name>_<chunk_index>"."""
    return md5(f"{doc_name}_{chunk_index}".encode()).hexdigest()
|
|
|
|
def ingest_single_pdf(
    pdf_path: Path,
    index,
    gemini_client,
    force: bool = False
) -> tuple[int, int]:
    """Extract, chunk, embed, and upsert a single PDF into Pinecone.

    Args:
        pdf_path: path to the PDF file to ingest.
        index: Pinecone index handle used for fetch/upsert.
        gemini_client: client handed to generate_batch_embeddings().
        force: when True, re-ingest even if the document already appears
            to be in the index.

    Returns:
        (chunks_added, documents_skipped) — skipped is 1 only when the
        document was detected as already ingested.
    """
    doc_name = pdf_path.name

    # Cheap idempotency probe: chunk IDs are deterministic, so the
    # presence of chunk 0 means the document was ingested before.
    if not force:
        test_id = generate_chunk_id(doc_name, 0)
        result = index.fetch(ids=[test_id])
        if result.vectors:
            print(f" Skipping {doc_name} (already ingested)")
            return 0, 1

    print(f" Processing: {doc_name}")

    # BUG FIX: original called the undefined name `extract_text_from_pdf`;
    # the extractor defined in this module is `extract_text_from_file`,
    # so every non-skipped document raised NameError.
    text = extract_text_from_file(pdf_path)
    if not text:
        print(f" No text extracted from {doc_name}")
        return 0, 0

    total_tokens = count_tokens(text)
    print(f" Extracted {total_tokens:,} tokens")

    chunks = chunk_text(text, chunk_size=CHUNK_SIZE, chunk_overlap=CHUNK_OVERLAP)
    print(f" Created {len(chunks)} chunks")

    if not chunks:
        return 0, 0

    print(" Generating embeddings...")
    embeddings = generate_batch_embeddings(gemini_client, chunks)

    vectors = []
    for i, (chunk, embedding) in enumerate(zip(chunks, embeddings)):
        vectors.append({
            "id": generate_chunk_id(doc_name, i),
            "values": embedding,
            "metadata": {
                "source": doc_name,
                "chunk_index": i,
                "total_chunks": len(chunks),
                # Truncated — presumably to bound Pinecone metadata size;
                # TODO(review): confirm the 1000-char limit is intentional.
                "text": chunk[:1000]
            }
        })

    # Upsert in batches; 100 keeps each request comfortably small.
    batch_size = 100
    for i in range(0, len(vectors), batch_size):
        index.upsert(vectors=vectors[i:i + batch_size])

    print(f" Added {len(chunks)} chunks to Pinecone")
    return len(chunks), 0
|
|
|
|
def ingest_all_documents(data_dir: Path = DATA_DIR, force: bool = False):
    """Run the full ingestion pipeline over every PDF in *data_dir*.

    Args:
        data_dir: directory scanned (non-recursively) for *.pdf files.
        force: forwarded to ingest_single_pdf() to override skip detection.
    """
    print("\nStarting document ingestion pipeline\n")
    print(f"Data directory: {data_dir}")
    print(f"Pinecone index: {PINECONE_INDEX}\n")

    pdf_files = sorted(data_dir.glob("*.pdf"))

    if not pdf_files:
        print(f"No PDF files found in {data_dir}")
        return

    print(f"Found {len(pdf_files)} PDF files\n")

    print("Connecting to Gemini API...")
    gemini_client = get_gemini_client()

    print("Connecting to Pinecone...")
    index = get_pinecone_index()
    stats = index.describe_index_stats()
    print(f"Current index size: {stats.total_vector_count} vectors\n")
    print("-" * 60)

    added_total = 0
    skipped_total = 0

    for pdf_path in pdf_files:
        n_added, n_skipped = ingest_single_pdf(
            pdf_path,
            index,
            gemini_client,
            force=force
        )
        added_total += n_added
        skipped_total += n_skipped

    print("-" * 60)
    stats = index.describe_index_stats()
    print(f"\nIngestion complete!")
    print(f" Chunks added: {added_total}")
    print(f" Documents skipped: {skipped_total}")
    print(f" Total index size: {stats.total_vector_count} vectors\n")
|
|
|
|
def clear_index():
    """Delete every vector from the Pinecone index, reporting any failure."""
    print("Clearing Pinecone index...")
    try:
        get_pinecone_index().delete(delete_all=True)
        print("Index cleared successfully")
    except Exception as e:
        print(f"Error clearing index: {e}")
|
|
|
|
def show_stats():
    """Print name, vector count, and dimensionality of the Pinecone index."""
    print("\nPinecone Index Statistics\n")

    try:
        # Attribute reads stay inside the try so a malformed stats object
        # also falls through to the error message.
        info = get_pinecone_index().describe_index_stats()
        print(f" Index: {PINECONE_INDEX}")
        print(f" Total vectors: {info.total_vector_count}")
        print(f" Dimensions: {info.dimension}")
    except Exception as e:
        print(f" Error: {e}")

    print()
|
|
|
|
if __name__ == "__main__":
    import argparse
    from dotenv import load_dotenv
    load_dotenv()

    parser = argparse.ArgumentParser(description="Ingest PDF documents into Pinecone for RAG")
    parser.add_argument("--force", "-f", action="store_true",
                        help="Re-ingest documents even if already present in the index")
    parser.add_argument("--clear", action="store_true",
                        help="Clear the index, then re-ingest all documents")
    parser.add_argument("--stats", action="store_true",
                        help="Show index statistics and exit")
    parser.add_argument("--data-dir", type=Path, default=DATA_DIR,
                        help="Directory containing PDF files to ingest")

    args = parser.parse_args()

    if args.stats:
        show_stats()
    elif args.clear:
        clear_index()
        # BUG FIX: the original guarded this call with `if not args.stats`,
        # which is always true inside this elif branch (the stats case was
        # already taken above) — the dead check is removed. --clear wipes
        # the index and then rebuilds it with force=True.
        ingest_all_documents(data_dir=args.data_dir, force=True)
    else:
        ingest_all_documents(data_dir=args.data_dir, force=args.force)
|