TilanB committed on
Commit
c956e36
·
verified ·
1 Parent(s): 110632f

Update intelligence/orchestrator.py

Browse files
Files changed (1) hide show
  1. intelligence/orchestrator.py +154 -148
intelligence/orchestrator.py CHANGED
@@ -7,8 +7,10 @@ Defines the multi-agent orchestrator that:
7
  3. Selects the best answer through verification
8
  4. Provides feedback loop for iterative improvement
9
  """
10
- from langgraph.graph import StateGraph, END
11
- from typing import TypedDict, List, Dict, Any, Optional
 
 
12
  from langchain_core.documents import Document
13
  from langchain_core.retrievers import BaseRetriever
14
  import logging
@@ -22,8 +24,14 @@ from configuration.parameters import parameters
22
  logger = logging.getLogger(__name__)
23
 
24
 
25
- class AgentState(TypedDict):
26
- """State object passed between orchestrator nodes."""
 
 
 
 
 
 
27
  question: str
28
  documents: List[Document]
29
  draft_answer: str
@@ -35,27 +43,22 @@ class AgentState(TypedDict):
35
  query_used: str
36
  candidate_answers: List[str]
37
  selection_reasoning: str
38
- # For multi-question support
39
  is_multi_query: bool
40
  sub_queries: List[str]
41
- sub_answers: List[str]
42
 
43
 
44
  class AgentWorkflow:
45
- """
46
- Orchestrates multi-agent orchestrator for document Q&A.
47
- """
48
-
49
  MAX_RESEARCH_ATTEMPTS: int = parameters.MAX_RESEARCH_ATTEMPTS
50
  NUM_RESEARCH_CANDIDATES: int = parameters.NUM_RESEARCH_CANDIDATES
51
-
52
  def __init__(self, num_candidates: int = None) -> None:
53
- """Initialize orchestrator with required agents."""
54
  logger.info("Initializing AgentWorkflow...")
55
  self.researcher = ResearchAgent()
56
  self.verifier = VerificationAgent()
57
  self.context_validator = ContextValidator()
58
- self.compiled_orchestrator = None
 
59
  self.llm = ChatGoogleGenerativeAI(
60
  model=parameters.LLM_MODEL_NAME,
61
  google_api_key=parameters.GOOGLE_API_KEY,
@@ -65,45 +68,110 @@ class AgentWorkflow:
65
  if num_candidates is not None:
66
  self.NUM_RESEARCH_CANDIDATES = num_candidates
67
  logger.info(f"AgentWorkflow initialized (candidates={self.NUM_RESEARCH_CANDIDATES})")
68
-
69
- def build_orchestrator(self) -> Any:
70
- """Create and compile the orchestrator graph."""
71
- logger.debug("Building orchestrator graph...")
72
- orchestrator = StateGraph(AgentState)
73
-
74
- orchestrator.add_node("detect_query_type", self._detect_query_type)
75
- orchestrator.add_node("process_sub_queries", self._process_sub_queries_step)
76
- orchestrator.add_node("combine_answers", self._combine_answers_step)
77
- orchestrator.add_node("check_relevance", self._check_relevance_step)
78
- orchestrator.add_node("research", self._research_step)
79
- orchestrator.add_node("verify", self._verification_step)
80
-
81
- orchestrator.set_entry_point("detect_query_type")
82
- orchestrator.add_conditional_edges(
83
- "detect_query_type",
84
- lambda state: "multi" if state.get("is_multi_query") else "single",
85
- {"multi": "process_sub_queries", "single": "check_relevance"}
86
- )
87
- orchestrator.add_edge("process_sub_queries", "combine_answers")
88
- orchestrator.add_edge("combine_answers", END)
89
- orchestrator.add_conditional_edges(
 
 
 
 
 
90
  "check_relevance",
91
  self._decide_after_relevance_check,
92
- {"relevant": "research", "irrelevant": END}
93
  )
94
- orchestrator.add_edge("research", "verify")
95
- orchestrator.add_conditional_edges(
96
  "verify",
97
  self._decide_next_step,
98
- {"re_research": "research", "end": END}
99
  )
100
-
101
- return orchestrator.compile()
102
-
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
103
  def _detect_query_type(self, state: AgentState) -> Dict[str, Any]:
104
- """
105
- Use LLM to detect if the question is multi-part and decompose it if so.
106
- """
107
  prompt = f"""
108
  You are an expert assistant for document Q&A. Analyze the following question and determine:
109
  1. Is it a single question or does it contain multiple sub-questions?
@@ -119,7 +187,6 @@ Question: {state['question']}
119
  response = self.llm.invoke(prompt)
120
  import json
121
  content = response.content if hasattr(response, "content") else str(response)
122
- # Try to extract JSON from the response
123
  start = content.find('{')
124
  end = content.rfind('}')
125
  if start != -1 and end != -1:
@@ -128,7 +195,6 @@ Question: {state['question']}
128
  is_multi = bool(result.get("is_multi_query", False))
129
  sub_queries = result.get("sub_queries", [])
130
  else:
131
- # Fallback: treat as single question
132
  is_multi = False
133
  sub_queries = [state["question"]]
134
  except Exception as e:
@@ -141,145 +207,72 @@ Question: {state['question']}
141
  logger.info("[LLM Decompose] Single question detected; no decomposition needed.")
142
  return {"is_multi_query": is_multi, "sub_queries": sub_queries}
143
 
144
- def _process_sub_queries_step(self, state: AgentState) -> Dict[str, Any]:
145
- sub_answers = []
146
- logger.info(f"[Decompose] Processing {len(state['sub_queries'])} sub-queries...")
147
- for sub_query in state["sub_queries"]:
148
- logger.info(f"[Decompose] Processing sub-query: {sub_query}")
149
- sub_state = state.copy()
150
- sub_state["question"] = sub_query
151
- rel = self._check_relevance_step(sub_state)
152
- if not rel.get("is_relevant"):
153
- logger.warning(f"[Decompose] Sub-query not relevant: {sub_query}")
154
- sub_answers.append(rel.get("draft_answer", "No answer found."))
155
- continue
156
- sub_state.update(rel)
157
- research = self._research_step(sub_state)
158
- sub_state.update(research)
159
- verify = self._verification_step(sub_state)
160
- sub_state.update(verify)
161
- sub_answers.append(sub_state["draft_answer"])
162
- logger.info(f"[Decompose] Sub-query answers: {sub_answers}")
163
- return {"sub_answers": sub_answers}
164
-
165
- def _combine_answers_step(self, state: AgentState) -> Dict[str, Any]:
166
- logger.info(f"[Decompose] Combining {len(state['sub_answers'])} sub-answers into final answer.")
167
- combined = "\n\n".join(f"Q{i+1}: {q}\nA: {a}" for i, (q, a) in enumerate(zip(state["sub_queries"], state["sub_answers"])))
168
- return {"draft_answer": combined, "verification_report": "Multi-question answer combined."}
169
-
170
  def _check_relevance_step(self, state: AgentState) -> Dict[str, Any]:
171
  logger.debug("Checking context relevance...")
172
-
173
  result = self.context_validator.context_validate_with_rewrite(
174
  question=state["question"],
175
  retriever=state["retriever"],
176
- k=parameters.RELEVANCE_CHECK_K, # use config instead of hardcoding 20
177
  max_rewrites=parameters.MAX_QUERY_REWRITES,
178
  )
179
-
180
  classification = result.get("classification", "NO_MATCH")
181
  query_used = result.get("query_used", state["question"])
182
-
183
  logger.info(f"Relevance: {classification} (query_used={query_used[:80]})")
184
-
185
  if classification in ("CAN_ANSWER", "PARTIAL"):
186
- # ALWAYS retrieve docs for the query we're actually going to answer
187
  documents = state["retriever"].invoke(query_used)
188
  return {
189
  "is_relevant": True,
190
  "query_used": query_used,
191
- "documents": documents
192
  }
193
-
194
  return {
195
  "is_relevant": False,
196
- "query_used": query_used,
197
  "draft_answer": "This question isn't related to the uploaded documents. Please ask another question.",
198
  }
199
 
200
  def _decide_after_relevance_check(self, state: AgentState) -> str:
201
- """Decide next step after relevance check."""
202
  return "relevant" if state["is_relevant"] else "irrelevant"
203
-
204
- def run_workflow(self, question: str, retriever: BaseRetriever) -> Dict[str, str]:
205
- """
206
- Execute the full Q&A pipeline.
207
-
208
- Args:
209
- question: The user's question
210
- retriever: The retriever for document lookup
211
-
212
- Returns:
213
- Dict with 'draft_answer' and 'verification_report'
214
- """
215
- try:
216
- if self.compiled_orchestrator is None:
217
- self.compiled_orchestrator = self.build_orchestrator()
218
 
219
- initial_state: AgentState = {
220
- "question": question,
221
- "documents": [], # Let _check_relevance_step fill this
222
- "draft_answer": "",
223
- "verification_report": "",
224
- "is_relevant": False,
225
- "retriever": retriever,
226
- "feedback": None,
227
- "research_attempts": 0,
228
- "query_used": question,
229
- "candidate_answers": [],
230
- "selection_reasoning": "",
231
- "is_multi_query": False,
232
- "sub_queries": [],
233
- "sub_answers": []
234
- }
235
-
236
- final_state = self.compiled_orchestrator.invoke(initial_state)
237
-
238
- logger.info(f"Pipeline completed (attempts: {final_state.get('research_attempts', 1)})")
239
 
240
- return {
241
- "draft_answer": final_state["draft_answer"],
242
- "verification_report": final_state["verification_report"]
243
- }
244
- except Exception as e:
245
- logger.error(f"Pipeline failed: {e}", exc_info=True)
246
- raise RuntimeError(f"Workflow execution failed: {e}") from e
247
-
248
  def _verification_step(self, state: AgentState) -> Dict[str, Any]:
249
- """Select the best answer from candidates and verify it."""
250
  logger.debug("Selecting best answer from candidates...")
251
-
252
  candidate_answers = state.get("candidate_answers", []) or [state.get("draft_answer", "")]
253
-
254
- # Select the best answer from candidates
255
  selection_result = self.verifier.select_best_answer(
256
  candidate_answers=candidate_answers,
257
  documents=state["documents"],
258
- question=state["question"]
259
  )
260
-
261
  best_answer = selection_result["selected_answer"]
262
  selection_reasoning = selection_result.get("reasoning", "")
263
-
264
  logger.info(f"Selected candidate {selection_result['selected_index'] + 1} as best answer")
265
-
266
- # Verify the selected answer
267
  verification_result = self.verifier.check(
268
- answer=best_answer,
269
  documents=state["documents"],
270
  question=state["question"]
271
  )
272
-
273
- # Enhance verification report with selection info
274
  verification_report = verification_result["verification_report"]
275
  verification_report = f"**Candidates Evaluated:** {len(candidate_answers)}\n" + \
276
  f"**Selected Candidate:** {selection_result['selected_index'] + 1}\n" + \
277
  f"**Selection Confidence:** {selection_result.get('confidence', 'N/A')}\n" + \
278
  f"**Selection Reasoning:** {selection_reasoning}\n\n" + \
279
  verification_report
280
-
281
  feedback_for_research = verification_result.get("feedback")
282
-
283
  return {
284
  "draft_answer": best_answer,
285
  "verification_report": verification_report,
@@ -287,9 +280,8 @@ Question: {state['question']}
287
  "selection_reasoning": selection_reasoning,
288
  "should_retry": verification_result.get("should_retry", False),
289
  }
290
-
291
  def _decide_next_step(self, state: AgentState) -> str:
292
- """Decide whether to re-research or end orchestrator."""
293
  research_attempts = state.get("research_attempts", 1)
294
  should_retry = bool(state.get("should_retry", False))
295
  if should_retry and research_attempts < self.MAX_RESEARCH_ATTEMPTS:
@@ -297,23 +289,37 @@ Question: {state['question']}
297
  return "end"
298
 
299
  def _research_step(self, state: AgentState) -> Dict[str, Any]:
300
- """Generate multiple answer candidates using the research agent."""
301
  attempts = state.get("research_attempts", 0) + 1
302
  feedback_for_research = state.get("feedback_for_research")
303
  previous_answer = state.get("draft_answer") if feedback_for_research else None
304
  logger.info(f"Research step (attempt {attempts}/{self.MAX_RESEARCH_ATTEMPTS})")
305
- logger.info(f"Generating {self.NUM_RESEARCH_CANDIDATES} candidate answers...")
 
 
 
306
  candidate_answers = []
307
- for i in range(self.NUM_RESEARCH_CANDIDATES):
308
- logger.info(f"Generating candidate {i + 1}/{self.NUM_RESEARCH_CANDIDATES}")
 
309
  result = self.researcher.generate(
310
  question=state["question"],
311
  documents=state["documents"],
312
  feedback=feedback_for_research,
313
  previous_answer=previous_answer
314
  )
315
- candidate_answers.append(result["draft_answer"])
316
- logger.info(f"Generated {len(candidate_answers)} candidate answers")
 
 
 
 
 
 
 
 
 
 
 
317
  return {
318
  "candidate_answers": candidate_answers,
319
  "research_attempts": attempts,
 
7
  3. Selects the best answer through verification
8
  4. Provides feedback loop for iterative improvement
9
  """
10
+ from langgraph.graph import StateGraph, END, START
11
+ from langgraph.types import Send
12
+ from typing import TypedDict, List, Dict, Any, Optional, Annotated
13
+ import operator
14
  from langchain_core.documents import Document
15
  from langchain_core.retrievers import BaseRetriever
16
  import logging
 
24
  logger = logging.getLogger(__name__)
25
 
26
 
27
+ class SubQResult(TypedDict):
28
+ idx: int
29
+ question: str
30
+ answer: str
31
+ report: str
32
+
33
+
34
+ class AgentState(TypedDict, total=False):
35
  question: str
36
  documents: List[Document]
37
  draft_answer: str
 
43
  query_used: str
44
  candidate_answers: List[str]
45
  selection_reasoning: str
 
46
  is_multi_query: bool
47
  sub_queries: List[str]
48
+ sub_results: Annotated[List[SubQResult], operator.add]
49
 
50
 
51
  class AgentWorkflow:
 
 
 
 
52
  MAX_RESEARCH_ATTEMPTS: int = parameters.MAX_RESEARCH_ATTEMPTS
53
  NUM_RESEARCH_CANDIDATES: int = parameters.NUM_RESEARCH_CANDIDATES
54
+
55
  def __init__(self, num_candidates: int = None) -> None:
 
56
  logger.info("Initializing AgentWorkflow...")
57
  self.researcher = ResearchAgent()
58
  self.verifier = VerificationAgent()
59
  self.context_validator = ContextValidator()
60
+ self.compiled_single = None
61
+ self.compiled_main = None
62
  self.llm = ChatGoogleGenerativeAI(
63
  model=parameters.LLM_MODEL_NAME,
64
  google_api_key=parameters.GOOGLE_API_KEY,
 
68
  if num_candidates is not None:
69
  self.NUM_RESEARCH_CANDIDATES = num_candidates
70
  logger.info(f"AgentWorkflow initialized (candidates={self.NUM_RESEARCH_CANDIDATES})")
71
+
72
+ def _retrieve_docs(self, state: AgentState) -> Dict[str, Any]:
73
+ docs = state["retriever"].invoke(state["question"])
74
+ return {
75
+ "documents": docs,
76
+ "draft_answer": "",
77
+ "verification_report": "",
78
+ "is_relevant": False,
79
+ "feedback": None,
80
+ "feedback_for_research": None,
81
+ "contradictions_for_research": [],
82
+ "unsupported_claims_for_research": [],
83
+ "research_attempts": 0,
84
+ "candidate_answers": [],
85
+ "selection_reasoning": "",
86
+ "query_used": state["question"],
87
+ }
88
+
89
+ def _build_single_question_graph(self):
90
+ g = StateGraph(AgentState)
91
+ g.add_node("retrieve_docs", self._retrieve_docs)
92
+ g.add_node("check_relevance", self._check_relevance_step)
93
+ g.add_node("research", self._research_step)
94
+ g.add_node("verify", self._verification_step)
95
+ g.add_edge(START, "retrieve_docs")
96
+ g.add_edge("retrieve_docs", "check_relevance")
97
+ g.add_conditional_edges(
98
  "check_relevance",
99
  self._decide_after_relevance_check,
100
+ {"relevant": "research", "irrelevant": END},
101
  )
102
+ g.add_edge("research", "verify")
103
+ g.add_conditional_edges(
104
  "verify",
105
  self._decide_next_step,
106
+ {"re_research": "research", "end": END},
107
  )
108
+ return g.compile()
109
+
110
+ def _assign_workers(self, state: AgentState):
111
+ sends = []
112
+ for i, q in enumerate(state.get("sub_queries", [])):
113
+ sends.append(Send("subq_worker", {"question": q, "subq_idx": i, "retriever": state["retriever"]}))
114
+ return sends
115
+
116
+ def _subq_worker(self, state: AgentState) -> Dict[str, Any]:
117
+ subq_idx = state["subq_idx"]
118
+ q = state["question"]
119
+ result_state = self.compiled_single.invoke({
120
+ "question": q,
121
+ "retriever": state["retriever"],
122
+ "research_attempts": 0,
123
+ })
124
+ return {
125
+ "sub_results": [{
126
+ "idx": subq_idx,
127
+ "question": q,
128
+ "answer": result_state.get("draft_answer", ""),
129
+ "report": result_state.get("verification_report", ""),
130
+ }]
131
+ }
132
+
133
+ def _combine_answers(self, state: AgentState) -> Dict[str, Any]:
134
+ sub_results = sorted(state.get("sub_results", []), key=lambda r: r["idx"])
135
+ combined = "\n\n".join(
136
+ f"Q{i+1}: {r['question']}\nA: {r['answer']}"
137
+ for i, r in enumerate(sub_results)
138
+ )
139
+ return {
140
+ "draft_answer": combined,
141
+ "verification_report": "Multi-question answer combined."
142
+ }
143
+
144
+ def build_orchestrator(self) -> Any:
145
+ self.compiled_single = self._build_single_question_graph()
146
+ g = StateGraph(AgentState)
147
+ g.add_node("detect_query_type", self._detect_query_type)
148
+ g.add_node("subq_worker", self._subq_worker)
149
+ g.add_node("combine_answers", self._combine_answers)
150
+ def run_single(state: AgentState) -> Dict[str, Any]:
151
+ out = self.compiled_single.invoke({
152
+ "question": state["question"],
153
+ "retriever": state["retriever"],
154
+ "research_attempts": 0,
155
+ })
156
+ return {
157
+ "draft_answer": out.get("draft_answer", ""),
158
+ "verification_report": out.get("verification_report", ""),
159
+ }
160
+ g.add_node("run_single", run_single)
161
+ g.set_entry_point("detect_query_type")
162
+ g.add_conditional_edges(
163
+ "detect_query_type",
164
+ lambda s: "multi" if s.get("is_multi_query") else "single",
165
+ {"multi": "fanout", "single": "run_single"},
166
+ )
167
+ g.add_node("fanout", lambda s: {})
168
+ g.add_conditional_edges("fanout", self._assign_workers, ["subq_worker"])
169
+ g.add_edge("subq_worker", "combine_answers")
170
+ g.add_edge("combine_answers", END)
171
+ g.add_edge("run_single", END)
172
+ return g.compile()
173
+
174
  def _detect_query_type(self, state: AgentState) -> Dict[str, Any]:
 
 
 
175
  prompt = f"""
176
  You are an expert assistant for document Q&A. Analyze the following question and determine:
177
  1. Is it a single question or does it contain multiple sub-questions?
 
187
  response = self.llm.invoke(prompt)
188
  import json
189
  content = response.content if hasattr(response, "content") else str(response)
 
190
  start = content.find('{')
191
  end = content.rfind('}')
192
  if start != -1 and end != -1:
 
195
  is_multi = bool(result.get("is_multi_query", False))
196
  sub_queries = result.get("sub_queries", [])
197
  else:
 
198
  is_multi = False
199
  sub_queries = [state["question"]]
200
  except Exception as e:
 
207
  logger.info("[LLM Decompose] Single question detected; no decomposition needed.")
208
  return {"is_multi_query": is_multi, "sub_queries": sub_queries}
209
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
210
  def _check_relevance_step(self, state: AgentState) -> Dict[str, Any]:
211
  logger.debug("Checking context relevance...")
 
212
  result = self.context_validator.context_validate_with_rewrite(
213
  question=state["question"],
214
  retriever=state["retriever"],
215
+ k=parameters.RELEVANCE_CHECK_K,
216
  max_rewrites=parameters.MAX_QUERY_REWRITES,
217
  )
 
218
  classification = result.get("classification", "NO_MATCH")
219
  query_used = result.get("query_used", state["question"])
 
220
  logger.info(f"Relevance: {classification} (query_used={query_used[:80]})")
 
221
  if classification in ("CAN_ANSWER", "PARTIAL"):
 
222
  documents = state["retriever"].invoke(query_used)
223
  return {
224
  "is_relevant": True,
225
  "query_used": query_used,
226
+ "documents": documents
227
  }
 
228
  return {
229
  "is_relevant": False,
230
+ "query_used": query_used,
231
  "draft_answer": "This question isn't related to the uploaded documents. Please ask another question.",
232
  }
233
 
234
  def _decide_after_relevance_check(self, state: AgentState) -> str:
 
235
  return "relevant" if state["is_relevant"] else "irrelevant"
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
236
 
237
+ def run_workflow(self, question: str, retriever: BaseRetriever) -> Dict[str, str]:
238
+ if self.compiled_main is None:
239
+ self.compiled_main = self.build_orchestrator()
240
+ initial_state: AgentState = {
241
+ "question": question,
242
+ "retriever": retriever,
243
+ "sub_results": [],
244
+ "sub_queries": [],
245
+ "is_multi_query": False,
246
+ }
247
+ final = self.compiled_main.invoke(initial_state)
248
+ return {
249
+ "draft_answer": final.get("draft_answer", ""),
250
+ "verification_report": final.get("verification_report", ""),
251
+ }
 
 
 
 
 
252
 
 
 
 
 
 
 
 
 
253
  def _verification_step(self, state: AgentState) -> Dict[str, Any]:
 
254
  logger.debug("Selecting best answer from candidates...")
 
255
  candidate_answers = state.get("candidate_answers", []) or [state.get("draft_answer", "")]
 
 
256
  selection_result = self.verifier.select_best_answer(
257
  candidate_answers=candidate_answers,
258
  documents=state["documents"],
259
+ question=state["question"]
260
  )
 
261
  best_answer = selection_result["selected_answer"]
262
  selection_reasoning = selection_result.get("reasoning", "")
 
263
  logger.info(f"Selected candidate {selection_result['selected_index'] + 1} as best answer")
 
 
264
  verification_result = self.verifier.check(
265
+ answer=best_answer,
266
  documents=state["documents"],
267
  question=state["question"]
268
  )
 
 
269
  verification_report = verification_result["verification_report"]
270
  verification_report = f"**Candidates Evaluated:** {len(candidate_answers)}\n" + \
271
  f"**Selected Candidate:** {selection_result['selected_index'] + 1}\n" + \
272
  f"**Selection Confidence:** {selection_result.get('confidence', 'N/A')}\n" + \
273
  f"**Selection Reasoning:** {selection_reasoning}\n\n" + \
274
  verification_report
 
275
  feedback_for_research = verification_result.get("feedback")
 
276
  return {
277
  "draft_answer": best_answer,
278
  "verification_report": verification_report,
 
280
  "selection_reasoning": selection_reasoning,
281
  "should_retry": verification_result.get("should_retry", False),
282
  }
283
+
284
  def _decide_next_step(self, state: AgentState) -> str:
 
285
  research_attempts = state.get("research_attempts", 1)
286
  should_retry = bool(state.get("should_retry", False))
287
  if should_retry and research_attempts < self.MAX_RESEARCH_ATTEMPTS:
 
289
  return "end"
290
 
291
  def _research_step(self, state: AgentState) -> Dict[str, Any]:
 
292
  attempts = state.get("research_attempts", 0) + 1
293
  feedback_for_research = state.get("feedback_for_research")
294
  previous_answer = state.get("draft_answer") if feedback_for_research else None
295
  logger.info(f"Research step (attempt {attempts}/{self.MAX_RESEARCH_ATTEMPTS})")
296
+ logger.info(f"Generating {self.NUM_RESEARCH_CANDIDATES} candidate answers in parallel...")
297
+
298
+ # Parallel candidate generation for 2× speedup
299
+ import concurrent.futures
300
  candidate_answers = []
301
+
302
+ def generate_candidate(index):
303
+ logger.info(f"Generating candidate {index + 1}/{self.NUM_RESEARCH_CANDIDATES}")
304
  result = self.researcher.generate(
305
  question=state["question"],
306
  documents=state["documents"],
307
  feedback=feedback_for_research,
308
  previous_answer=previous_answer
309
  )
310
+ return result["draft_answer"]
311
+
312
+ # Use ThreadPoolExecutor for parallel LLM API calls (I/O-bound)
313
+ with concurrent.futures.ThreadPoolExecutor(max_workers=self.NUM_RESEARCH_CANDIDATES) as executor:
314
+ futures = [executor.submit(generate_candidate, i) for i in range(self.NUM_RESEARCH_CANDIDATES)]
315
+ for future in concurrent.futures.as_completed(futures):
316
+ try:
317
+ candidate_answers.append(future.result())
318
+ except Exception as e:
319
+ logger.error(f"Candidate generation failed: {e}")
320
+ # Continue with other candidates even if one fails
321
+
322
+ logger.info(f"Generated {len(candidate_answers)} candidate answers in parallel")
323
  return {
324
  "candidate_answers": candidate_answers,
325
  "research_attempts": attempts,