# test-ragp/scripts/indexing_pinecone.py
# Repository page residue kept for provenance: "awinml's picture",
# "Upload 11 files", "6c5ce7a verified".
import argparse
import os
from typing import Optional, Sequence

import weave
from dataloaders.langchain import FinanceBenchDataloader
from rag_pipelines.embeddings.dense import DenseEmbeddings
from rag_pipelines.embeddings.sparse_pinecone_text import SparseEmbeddings
from rag_pipelines.vectordb.pinecone_hybrid_index import PineconeHybridVectorDB
def parse_arguments(argv: Optional[Sequence[str]] = None) -> argparse.Namespace:
    """Parse command-line arguments for the FinanceBench pipeline.

    Args:
        argv: Argument strings to parse. When ``None`` (the default) argparse
            reads ``sys.argv[1:]``, so existing zero-argument callers behave
            exactly as before. Passing an explicit list makes the parser
            usable from tests and other Python code.

    Returns:
        argparse.Namespace: Parsed command-line arguments.

    Raises:
        SystemExit: Raised by argparse when a required argument is missing or
            a value cannot be converted to its declared type.
    """
    parser = argparse.ArgumentParser(
        description="Process FinanceBench data, generate embeddings, and add processed documents to a Pinecone hybrid index."
    )

    # Weave tracing project name
    parser.add_argument(
        "--project_name",
        type=str,
        required=True,
        help="Weave project name to initialize tracing.",
    )

    # FinanceBench dataloader arguments
    parser.add_argument(
        "--dataset_name",
        type=str,
        required=True,
        help="Name of the FinanceBench dataset (e.g., 'PatronusAI/financebench').",
    )
    parser.add_argument(
        "--split",
        type=str,
        default="train[:1]",
        help="Dataset split to use (e.g., 'train[:1]').",
    )

    # Dense Embeddings arguments
    parser.add_argument(
        "--dense_model_name",
        type=str,
        required=True,
        help="Dense embedding model name (e.g., 'sentence-transformers/all-MiniLM-L6-v2').",
    )
    parser.add_argument(
        "--dense_device",
        type=str,
        default="cpu",
        help="Device to run the dense embedding model (e.g., 'cpu' or 'cuda').",
    )
    parser.add_argument(
        "--normalize_embeddings",
        action="store_true",
        help="Flag to normalize embeddings during encoding.",
    )
    parser.add_argument(
        "--show_progress",
        action="store_true",
        help="Flag to show progress during embedding generation.",
    )

    # Sparse Embeddings arguments
    parser.add_argument(
        "--sparse_max_seq_length",
        type=int,
        required=True,
        help="Maximum sequence length for sparse embeddings.",
    )

    # Semantic Chunking arguments (if applicable in your downstream pipeline).
    # NOTE: main() in this script does not read this value; the flag is kept
    # so existing command lines that pass it keep working.
    parser.add_argument(
        "--chunking_threshold_type",
        type=str,
        default="percentile",
        help="Threshold type for semantic chunking (e.g., 'percentile' or 'absolute').",
    )

    # Pinecone configuration arguments.
    # Secrets passed as flags end up in shell history and process listings, so
    # the key may instead come from the PINECONE_API_KEY environment variable.
    # The flag is only mandatory when that variable is unset or empty, and an
    # explicit flag always takes precedence over the environment.
    env_api_key = os.environ.get("PINECONE_API_KEY") or None
    parser.add_argument(
        "--pinecone_api_key",
        type=str,
        default=env_api_key,
        required=env_api_key is None,
        help=(
            "API key for the Pinecone vector database. "
            "Defaults to the PINECONE_API_KEY environment variable when set."
        ),
    )
    parser.add_argument(
        "--pinecone_index_name",
        type=str,
        required=True,
        help="Name of the Pinecone index.",
    )
    parser.add_argument(
        "--pinecone_dimension",
        type=int,
        required=True,
        help="Vector dimension in the Pinecone index.",
    )
    parser.add_argument(
        "--pinecone_metric",
        type=str,
        required=True,
        help="Similarity metric for the Pinecone index (e.g., 'dotproduct' or 'cosine').",
    )
    parser.add_argument(
        "--pinecone_region",
        type=str,
        required=True,
        help="Pinecone region (e.g., 'us-east-1').",
    )
    parser.add_argument(
        "--pinecone_cloud",
        type=str,
        required=True,
        help="Pinecone cloud provider (e.g., 'aws').",
    )
    parser.add_argument(
        "--namespace",
        type=str,
        required=True,
        help="Namespace for document storage in Pinecone.",
    )

    return parser.parse_args(argv)
def main() -> None:
    """Run the FinanceBench-to-Pinecone indexing pipeline end to end.

    Steps, in order:
        1. Parse the CLI arguments and initialize Weave tracing.
        2. Fetch the corpus PDFs and build documents via FinanceBenchDataloader.
        3. Construct the dense and sparse embedding wrappers.
        4. Construct the Pinecone hybrid vector database wrapper.
        5. Add the documents to the index under the requested namespace.
    """
    cli = parse_arguments()

    # Tracing is started before any other work in the pipeline.
    weave.init(cli.project_name)

    # Dataset -> PDFs -> documents.
    loader = FinanceBenchDataloader(dataset_name=cli.dataset_name, split=cli.split)
    loader.get_corpus_pdfs()
    docs = loader.create_documents()

    print("Loaded Documents:")
    print(docs)

    # Dense embedding wrapper, configured entirely from CLI options.
    dense_wrapper = DenseEmbeddings(
        model_name=cli.dense_model_name,
        model_kwargs={"device": cli.dense_device},
        encode_kwargs={"normalize_embeddings": cli.normalize_embeddings},
        show_progress=cli.show_progress,
    )

    # Sparse embedding wrapper; only the maximum sequence length is configurable.
    sparse_wrapper = SparseEmbeddings(
        model_kwargs={"max_seq_length": cli.sparse_max_seq_length},
    )

    # The vector DB takes the underlying models rather than the wrappers.
    dense_model = dense_wrapper.embedding_model
    sparse_model = sparse_wrapper.sparse_embedding_model

    # Connection and index settings for the Pinecone hybrid index.
    index_settings = {
        "api_key": cli.pinecone_api_key,
        "index_name": cli.pinecone_index_name,
        "dimension": cli.pinecone_dimension,
        "metric": cli.pinecone_metric,
        "region": cli.pinecone_region,
        "cloud": cli.pinecone_cloud,
    }
    vector_db = PineconeHybridVectorDB(**index_settings)

    # Index the documents using both the dense and the sparse model.
    vector_db.add_documents(
        documents=docs,
        dense_embedding_model=dense_model,
        sparse_embedding_model=sparse_model,
        namespace=cli.namespace,
    )

    print("Documents have been indexed successfully in Pinecone.")
# Run the pipeline only when this file is executed as a script, so importing
# the module (e.g. to reuse parse_arguments) has no side effects.
if __name__ == "__main__":
    main()