import argparse
import logging
import weave
from dataloaders.langchain import FinanceBenchDataloader
from langchain_huggingface import HuggingFaceEmbeddings
from rag_pipelines.unstructured.unstructured_chunker import UnstructuredChunker
from rag_pipelines.unstructured.unstructured_pdf_loader import UnstructuredDocumentLoader
from rag_pipelines.utils.logging import LoggerFactory
from rag_pipelines.vectordb.weaviate import (
    WeaviateVectorDB,
)  # Project wrapper around the Weaviate vector-store client
# Module-level logger, built through the project's LoggerFactory at INFO level.
logger_factory = LoggerFactory(log_level=logging.INFO, logger_name=__name__)
logger = logger_factory.get_logger()
def parse_arguments(argv: "list[str] | None" = None) -> argparse.Namespace:
    """Parse command-line arguments.

    Args:
        argv: Optional explicit argument list to parse instead of
            ``sys.argv[1:]``. Defaults to ``None`` (use the process
            arguments); passing a list keeps the function testable and
            free of global state.

    Returns:
        argparse.Namespace: Parsed command-line arguments.
    """
    parser = argparse.ArgumentParser(
        description="Run the FinanceBench pipeline to load, process, chunk, embed, and index documents in Weaviate."
    )

    # FinanceBench dataset parameters
    parser.add_argument(
        "--dataset_name",
        type=str,
        default="PatronusAI/financebench",
        help="Name of the FinanceBench dataset to use.",
    )
    parser.add_argument(
        "--split",
        type=str,
        default="train[:1]",
        help="Dataset split to use (e.g., 'train[:1]').",
    )

    # PDF directory for unstructured document loader
    parser.add_argument(
        "--pdf_dir",
        type=str,
        default="pdfs/",
        help="Directory path containing PDF files.",
    )

    # UnstructuredDocumentLoader parameters
    parser.add_argument(
        "--strategy",
        type=str,
        default="fast",
        help="Processing strategy for the unstructured document loader.",
    )
    parser.add_argument(
        "--mode",
        type=str,
        default="elements",
        help="Extraction mode for the unstructured document loader.",
    )

    # Weaviate connection parameters
    parser.add_argument(
        "--cluster_url",
        type=str,
        required=True,
        help="URL of the Weaviate cluster.",
    )
    # NOTE(review): secrets passed as CLI flags are visible in the process
    # list and shell history; consider reading the key from an environment
    # variable instead.
    parser.add_argument(
        "--api_key",
        type=str,
        required=True,
        help="API key for Weaviate authentication.",
    )
    parser.add_argument(
        "--collection_name",
        type=str,
        default="financebench",
        help="Name of the Weaviate collection to create/use.",
    )
    parser.add_argument(
        "--text_field",
        type=str,
        default="text",
        help="Field name that contains document text in Weaviate.",
    )

    # Dense embedding model parameters
    parser.add_argument(
        "--dense_model_name",
        type=str,
        default="sentence-transformers/all-mpnet-base-v2",
        help="Dense embedding model name.",
    )

    return parser.parse_args(argv)
def main() -> None:
    """Run the FinanceBench document processing pipeline using Weaviate.

    The pipeline performs the following steps:
    1. Initializes Weave tracing.
    2. Loads a subset of the FinanceBench dataset.
    3. Retrieves PDF documents from the specified directory.
    4. Processes PDFs using the UnstructuredDocumentLoader.
    5. Chunks documents using the UnstructuredChunker.
    6. Generates dense embeddings.
    7. Sets up a Weaviate vector database and indexes the documents.
    """
    args = parse_arguments()
    # Initialize Weave tracing (hard-coded project name "financebench_test")
    weave.init("financebench_test")
    # Load FinanceBench dataset and retrieve corpus PDFs
    dataloader = FinanceBenchDataloader(
        dataset_name=args.dataset_name,
        split=args.split,
    )
    # NOTE(review): presumably downloads the PDFs referenced by the dataset
    # split into --pdf_dir; confirm against FinanceBenchDataloader — the
    # return value (if any) is discarded here.
    dataloader.get_corpus_pdfs()
    # Load and transform PDF documents from the specified directory
    unstructured_document_loader = UnstructuredDocumentLoader(
        strategy=args.strategy,
        mode=args.mode,
    )
    documents = unstructured_document_loader.transform_documents(args.pdf_dir)
    # Logs the full document list at INFO — may be very verbose for large corpora.
    logger.info("Loaded Documents:")
    logger.info(documents)
    # Chunk the documents using the UnstructuredChunker
    chunker = UnstructuredChunker()
    chunked_documents = chunker.transform_documents(documents)
    logger.info("Chunked Documents:")
    logger.info(chunked_documents)
    # Initialize the dense embedding model (downloads weights on first use)
    embeddings = HuggingFaceEmbeddings(model_name=args.dense_model_name)
    # Initialize the Weaviate vector database client
    weaviate_vector_db = WeaviateVectorDB(
        cluster_url=args.cluster_url,
        api_key=args.api_key,
        collection_name=args.collection_name,
        text_field=args.text_field,
        dense_embedding_model=embeddings,
    )
    # Index the chunked documents in Weaviate using the dense embeddings
    weaviate_vector_db.add_documents(documents=chunked_documents)
    logger.info("Documents have been indexed successfully in Weaviate.")
# Script entry point: run the full pipeline only when executed directly,
# not when this module is imported.
if __name__ == "__main__":
    main()