import argparse
import logging

import weave
from dataloaders.langchain import FinanceBenchDataloader
from langchain_huggingface import HuggingFaceEmbeddings

from rag_pipelines.unstructured.unstructured_chunker import UnstructuredChunker
from rag_pipelines.unstructured.unstructured_pdf_loader import UnstructuredDocumentLoader
from rag_pipelines.utils.logging import LoggerFactory
from rag_pipelines.vectordb.weaviate import (
    WeaviateVectorDB,
)  # Assumes the WeaviateVectorDB class is defined as shown above

# Module-level logger for this script, built through the project's LoggerFactory
# so formatting/handlers stay consistent across rag_pipelines (INFO level).
logger_factory = LoggerFactory(logger_name=__name__, log_level=logging.INFO)
logger = logger_factory.get_logger()


def parse_arguments() -> argparse.Namespace:
    """Define and parse the command-line interface for the pipeline.

    All options are strings; ``--cluster_url`` and ``--api_key`` are the
    only required flags, everything else falls back to a sensible default.

    Returns:
        argparse.Namespace: Parsed command-line arguments.
    """
    arg_parser = argparse.ArgumentParser(
        description="Run the FinanceBench pipeline to load, process, chunk, embed, and index documents in Weaviate."
    )

    # (flag, extra keyword arguments) — every option is type=str, so the
    # common part is applied once in the loop below.
    option_specs = [
        # FinanceBench dataset parameters
        (
            "--dataset_name",
            {
                "default": "PatronusAI/financebench",
                "help": "Name of the FinanceBench dataset to use.",
            },
        ),
        (
            "--split",
            {
                "default": "train[:1]",
                "help": "Dataset split to use (e.g., 'train[:1]').",
            },
        ),
        # PDF directory for unstructured document loader
        (
            "--pdf_dir",
            {
                "default": "pdfs/",
                "help": "Directory path containing PDF files.",
            },
        ),
        # UnstructuredDocumentLoader parameters
        (
            "--strategy",
            {
                "default": "fast",
                "help": "Processing strategy for the unstructured document loader.",
            },
        ),
        (
            "--mode",
            {
                "default": "elements",
                "help": "Extraction mode for the unstructured document loader.",
            },
        ),
        # Weaviate connection parameters
        (
            "--cluster_url",
            {
                "required": True,
                "help": "URL of the Weaviate cluster.",
            },
        ),
        (
            "--api_key",
            {
                "required": True,
                "help": "API key for Weaviate authentication.",
            },
        ),
        (
            "--collection_name",
            {
                "default": "financebench",
                "help": "Name of the Weaviate collection to create/use.",
            },
        ),
        (
            "--text_field",
            {
                "default": "text",
                "help": "Field name that contains document text in Weaviate.",
            },
        ),
        # Dense embedding model parameters
        (
            "--dense_model_name",
            {
                "default": "sentence-transformers/all-mpnet-base-v2",
                "help": "Dense embedding model name.",
            },
        ),
    ]

    for flag, spec in option_specs:
        arg_parser.add_argument(flag, type=str, **spec)

    return arg_parser.parse_args()


def main() -> None:
    """Run the FinanceBench document processing pipeline using Weaviate.

    End-to-end flow: initialize Weave tracing, fetch the FinanceBench
    corpus PDFs, parse and chunk them with the unstructured toolchain,
    embed the chunks with a dense HuggingFace model, and index everything
    into a Weaviate collection.
    """
    cli_args = parse_arguments()

    # Tracing must be initialized before any traced calls below.
    weave.init("financebench_test")

    # Pull the requested FinanceBench split and download its corpus PDFs.
    finance_bench = FinanceBenchDataloader(
        dataset_name=cli_args.dataset_name,
        split=cli_args.split,
    )
    finance_bench.get_corpus_pdfs()

    # Parse every PDF in the directory into LangChain documents.
    pdf_loader = UnstructuredDocumentLoader(
        strategy=cli_args.strategy,
        mode=cli_args.mode,
    )
    raw_documents = pdf_loader.transform_documents(cli_args.pdf_dir)
    logger.info("Loaded Documents:")
    logger.info(raw_documents)

    # Split the parsed documents into retrieval-sized chunks.
    chunks = UnstructuredChunker().transform_documents(raw_documents)
    logger.info("Chunked Documents:")
    logger.info(chunks)

    # Dense embedding model used by the vector store for indexing.
    dense_embedder = HuggingFaceEmbeddings(model_name=cli_args.dense_model_name)

    # Connect to Weaviate and index the chunks with their embeddings.
    vector_db = WeaviateVectorDB(
        cluster_url=cli_args.cluster_url,
        api_key=cli_args.api_key,
        collection_name=cli_args.collection_name,
        text_field=cli_args.text_field,
        dense_embedding_model=dense_embedder,
    )
    vector_db.add_documents(documents=chunks)
    logger.info("Documents have been indexed successfully in Weaviate.")


if __name__ == "__main__":
    main()