PercivalFletcher committed on
Commit
d4b4587
·
verified ·
1 Parent(s): 0c7fe1b

Update main3.py

Browse files
Files changed (1) hide show
  1. main3.py +39 -7
main3.py CHANGED
@@ -1,4 +1,4 @@
1
- # file: main.py
2
  import time
3
  import os
4
  import asyncio
@@ -8,7 +8,7 @@ from typing import List, Dict, Any
8
  from dotenv import load_dotenv
9
 
10
  from document_processor import ingest_and_parse_document
11
- from chunking_parent import create_parent_child_chunks # <-- Using the parent-child function
12
  from embedding import EmbeddingClient
13
  from retrieval_parent import Retriever, generate_hypothetical_document
14
  from generation import generate_answer
@@ -18,7 +18,7 @@ load_dotenv()
18
  app = FastAPI(
19
  title="Modular RAG API",
20
  description="A modular API for Retrieval-Augmented Generation with Parent-Child Retrieval.",
21
- version="2.2.1", # Updated version
22
  )
23
 
24
  GROQ_API_KEY = os.environ.get("GROQ_API_KEY")
@@ -33,7 +33,7 @@ class RunRequest(BaseModel):
33
  class RunResponse(BaseModel):
34
  answers: List[str]
35
 
36
- class TestRequest(BaseModel): # <-- Model for test endpoints
37
  document_url: HttpUrl
38
 
39
  # --- NEW: Test Endpoint for Parent-Child Chunking ---
@@ -84,37 +84,69 @@ async def test_chunking_endpoint(request: TestRequest):
84
  @app.post("/hackrx/run", response_model=RunResponse)
85
  async def run_rag_pipeline(request: RunRequest):
86
  try:
 
 
87
  print("--- Kicking off RAG Pipeline with Parent-Child Strategy ---")
88
 
89
- # --- STAGE 1: DOCUMENT INGESTION ---
 
90
  markdown_content = await ingest_and_parse_document(request.document_url)
 
 
91
 
92
  # --- STAGE 2: PARENT-CHILD CHUNKING ---
 
93
  child_documents, docstore, _ = create_parent_child_chunks(markdown_content)
 
 
94
 
95
  if not child_documents:
96
  raise HTTPException(status_code=400, detail="Document could not be processed into chunks.")
97
 
98
- # --- STAGE 3: INDEXING ---
 
99
  retriever.index(child_documents, docstore)
 
 
100
 
101
  # --- CONCURRENT WORKFLOW ---
 
 
 
102
  hyde_tasks = [generate_hypothetical_document(q, GROQ_API_KEY) for q in request.questions]
103
  all_hyde_docs = await asyncio.gather(*hyde_tasks)
 
 
104
 
 
 
 
105
  retrieval_tasks = [
106
  retriever.retrieve(q, hyde_doc)
107
  for q, hyde_doc in zip(request.questions, all_hyde_docs)
108
  ]
109
  all_retrieved_chunks = await asyncio.gather(*retrieval_tasks)
 
 
110
 
 
 
 
111
  answer_tasks = [
112
  generate_answer(q, chunks, GROQ_API_KEY)
113
  for q, chunks in zip(request.questions, all_retrieved_chunks)
114
  ]
115
  final_answers = await asyncio.gather(*answer_tasks)
 
 
 
 
 
 
 
 
 
116
 
117
- print("--- RAG Pipeline Completed Successfully ---")
118
  return RunResponse(answers=final_answers)
119
 
120
  except Exception as e:
 
1
+ # file: main3.py
2
  import time
3
  import os
4
  import asyncio
 
8
  from dotenv import load_dotenv
9
 
10
  from document_processor import ingest_and_parse_document
11
+ from chunking_parent import create_parent_child_chunks
12
  from embedding import EmbeddingClient
13
  from retrieval_parent import Retriever, generate_hypothetical_document
14
  from generation import generate_answer
 
18
  app = FastAPI(
19
  title="Modular RAG API",
20
  description="A modular API for Retrieval-Augmented Generation with Parent-Child Retrieval.",
21
+ version="2.2.2", # Version updated
22
  )
23
 
24
  GROQ_API_KEY = os.environ.get("GROQ_API_KEY")
 
33
  class RunResponse(BaseModel):
34
  answers: List[str]
35
 
36
+ class TestRequest(BaseModel):
37
  document_url: HttpUrl
38
 
39
  # --- NEW: Test Endpoint for Parent-Child Chunking ---
 
84
  @app.post("/hackrx/run", response_model=RunResponse)
85
  async def run_rag_pipeline(request: RunRequest):
86
  try:
87
+ total_pipeline_start_time = time.perf_counter()
88
+ timings = {}
89
  print("--- Kicking off RAG Pipeline with Parent-Child Strategy ---")
90
 
91
+ # --- STAGE 1: DOCUMENT INGESTION (Parsing) ---
92
+ parse_start = time.perf_counter()
93
  markdown_content = await ingest_and_parse_document(request.document_url)
94
+ timings["1_parsing"] = time.perf_counter() - parse_start
95
+ print(f"Time taken for Parsing: {timings['1_parsing']:.4f} seconds.")
96
 
97
  # --- STAGE 2: PARENT-CHILD CHUNKING ---
98
+ chunk_start = time.perf_counter()
99
  child_documents, docstore, _ = create_parent_child_chunks(markdown_content)
100
+ timings["2_chunking"] = time.perf_counter() - chunk_start
101
+ print(f"Time taken for Parent-Child Chunking: {timings['2_chunking']:.4f} seconds.")
102
 
103
  if not child_documents:
104
  raise HTTPException(status_code=400, detail="Document could not be processed into chunks.")
105
 
106
+ # --- STAGE 3: INDEXING (Embedding) ---
107
+ index_start = time.perf_counter()
108
  retriever.index(child_documents, docstore)
109
+ timings["3_indexing_and_embedding"] = time.perf_counter() - index_start
110
+ print(f"Time taken for Indexing (incl. Embeddings): {timings['3_indexing_and_embedding']:.4f} seconds.")
111
 
112
  # --- CONCURRENT WORKFLOW ---
113
+ # Step A: Concurrently generate hypothetical documents (HyDE)
114
+ print("Generating hypothetical documents...")
115
+ hyde_start = time.perf_counter()
116
  hyde_tasks = [generate_hypothetical_document(q, GROQ_API_KEY) for q in request.questions]
117
  all_hyde_docs = await asyncio.gather(*hyde_tasks)
118
+ timings["4_hyde_generation_total"] = time.perf_counter() - hyde_start
119
+ print(f"Time taken for HyDE Generation (total): {timings['4_hyde_generation_total']:.4f} seconds.")
120
 
121
+ # Step B: Concurrently retrieve relevant chunks
122
+ print("Retrieving chunks...")
123
+ retrieval_start = time.perf_counter()
124
  retrieval_tasks = [
125
  retriever.retrieve(q, hyde_doc)
126
  for q, hyde_doc in zip(request.questions, all_hyde_docs)
127
  ]
128
  all_retrieved_chunks = await asyncio.gather(*retrieval_tasks)
129
+ timings["5_retrieval_total"] = time.perf_counter() - retrieval_start
130
+ print(f"Time taken for Retrieval (total): {timings['5_retrieval_total']:.4f} seconds.")
131
 
132
+ # Step C: Concurrently generate final answers
133
+ print("Generating final answers...")
134
+ generation_start = time.perf_counter()
135
  answer_tasks = [
136
  generate_answer(q, chunks, GROQ_API_KEY)
137
  for q, chunks in zip(request.questions, all_retrieved_chunks)
138
  ]
139
  final_answers = await asyncio.gather(*answer_tasks)
140
+ timings["6_answer_generation_total"] = time.perf_counter() - generation_start
141
+ print(f"Time taken for Answer Generation (total): {timings['6_answer_generation_total']:.4f} seconds.")
142
+
143
+ timings["total_pipeline_time"] = time.perf_counter() - total_pipeline_start_time
144
+ print("\n--- RAG Pipeline Completed Successfully ---")
145
+ print(f"--- Total Pipeline Time: {timings['total_pipeline_time']:.4f} seconds ---")
146
+ print("--- Timing Breakdown ---")
147
+ for stage, duration in timings.items():
148
+ print(f"- {stage}: {duration:.4f} seconds")
149
 
 
150
  return RunResponse(answers=final_answers)
151
 
152
  except Exception as e: