WanIrfan committed on
Commit
caf1cab
·
verified ·
1 Parent(s): 00be4d0

Update src/doc_qa.py

Browse files
Files changed (1) hide show
  1. src/doc_qa.py +724 -750
src/doc_qa.py CHANGED
@@ -1,751 +1,725 @@
1
- from langchain_core.prompts import ChatPromptTemplate, MessagesPlaceholder
2
- from langchain_classic import hub
3
- from langchain_google_genai import ChatGoogleGenerativeAI
4
- from langchain_classic.chains.combine_documents import create_stuff_documents_chain
5
- from langchain_core.tools import Tool
6
- from langchain_community.tools.tavily_search import TavilySearchResults
7
- from langchain_community.retrievers import BM25Retriever
8
- from concurrent.futures import ThreadPoolExecutor, as_completed
9
- from langchain_core.output_parsers import JsonOutputParser
10
- from langchain_classic.agents import AgentExecutor, create_react_agent
11
- from langchain_core.documents import Document
12
- from langchain_core.messages import AIMessage, HumanMessage
13
- from langchain_chroma import Chroma
14
- from langchain_core.agents import AgentAction
15
- from langchain_google_genai import GoogleGenerativeAIEmbeddings
16
- from flashrank import Ranker, RerankRequest
17
- from src.metrics_tracker import MetricsTracker
18
- import logging
19
-
20
-
21
- # Setup logging
22
- logging.basicConfig(level=logging.DEBUG)
23
- logger = logging.getLogger(__name__)
24
-
25
class ContextRetriever:
    """Thin wrapper around a vector-store retriever that returns a
    deduplicated, newline-joined context string for a query.

    Used by AgenticQA.answer() purely for logging/inspection of RAG context.
    """

    def __init__(self, retriever):
        # retriever: any object exposing .invoke(query, k=...) -> list of
        # Document-like objects with .page_content and .metadata.
        self.retriever = retriever

    def deduplicate_context(self, context_list):
        """Deduplicate context entries (preserving first-seen order).

        Args:
            context_list (list[str]): Raw context strings.

        Returns:
            str: Unique entries joined by newlines, or the fixed fallback
            string "No relevant context found." when the input is empty.
        """
        seen = set()
        deduped = []
        for item in context_list:
            if item not in seen:
                seen.add(item)
                deduped.append(item)
        return "\n".join(deduped) if deduped else "No relevant context found."

    def retrieve(self, query, top_k=5):
        """
        Retrieve the top-k relevant contexts from ChromaDB based on the query.

        Args:
            query (str): The query or prediction to search for.
            top_k (int): Number of top results to return (default: 5).

        Returns:
            str: Deduplicated context string from the top-k results, or the
            fixed string "Retrieval failed due to error." if retrieval raises.
        """
        # FIX: docstring previously claimed the default was 3, but the
        # signature default has always been 5 — documentation corrected.
        logger.info(f"Retrieving for query: {query}")
        try:
            # Perform similarity search using the ChromaDB retriever.
            results = self.retriever.invoke(query, k=top_k)
            logger.info(f"Retrieved documents: {[doc.metadata.get('source') for doc in results]}")

            # Extract the page content (context) from each result.
            contexts = [doc.page_content for doc in results]
            logger.info(f"Context : {contexts}")

            # Deduplicate the contexts before returning one joined string.
            deduped_context = self.deduplicate_context(contexts)
            logger.info(f"Deduplicated context: {deduped_context}")

            return deduped_context
        except Exception as e:
            # Best-effort: callers treat this sentinel string as "no context".
            logger.error(f"Retrieval error: {str(e)}")
            return "Retrieval failed due to error."
69
class LLMComplexityAnalyzer:
    """
    Analyzes query complexity using an LLM to make a "managerial" decision
    on the optimal retrieval strategy.

    The LLM is instructed to classify a query as simple/moderate/complex and
    pick a matching retrieval depth k (5/10/15). `analyze` always returns a
    dict with keys `complexity`, `k`, `score`, `reasoning`, falling back to a
    'moderate' strategy when the LLM call or JSON parsing fails.
    """

    def __init__(self, domain: str, llm: ChatGoogleGenerativeAI):
        """Build the prompt -> llm -> JSON-parser chain for `domain`."""
        self.domain = domain
        self.llm = llm

        self.system_prompt = (
            "You are a 'Complexity Analyzer' manager for a RAG (Retrieval-Augmented Generation) system. "
            "Your domain of expertise is: **{domain}**."
            "\n"
            "Your task is to analyze the user's query and determine its complexity. Based on this, "
            "you will decide how many documents (k) to retrieve. More complex queries require "
            "more documents to synthesize a good answer."
            "\n"
            "Here are the retrieval strategies:"
            "1. **simple**: For simple, direct fact-finding queries. (e.g., 'What is takaful?') "
            "   - Set k = 5"
            "2. **moderate**: For queries that require explanation, some comparison, or have multiple parts. "
            "   (e.g., 'What is the difference between madhab Shafi'i and Maliki on prayer?') "
            "   - Set k = 10"
            "3. **complex**: For deep, nuanced, multi-step, or highly comparative/synthetic queries. "
            "   (e.g., 'Explain in detail the treatment options for type 2 diabetes, comparing "
            "   their side effects and suitability for elderly patients.')"
            "   - Set k = 15"
            "\n"
            "Analyze the following query and provide your reasoning."
            "\n"
            "**IMPORTANT**: You MUST respond ONLY with a single, valid JSON object. Do not add any "
            "other text. The JSON object must have these three keys:"
            "- `complexity`: (string) Must be one of 'simple', 'moderate', or 'complex'."
            "- `k`: (integer) Must be 5, 10, or 15, corresponding to the complexity."
            "- `reasoning`: (string) A brief 1-sentence explanation for your decision."
        )

        self.prompt_template = ChatPromptTemplate.from_messages([
            ("system", self.system_prompt.format(domain=self.domain)),
            ("human", "{query}")
        ])

        self.output_parser = JsonOutputParser()

        # This chain will output a parsed dictionary.
        self.chain = self.prompt_template | self.llm | self.output_parser

        logger.info(f"🧠 LLMComplexityAnalyzer initialized for '{self.domain}'")

    def analyze(self, query: str) -> dict:
        """
        Analyze query complexity with the LLM and return the retrieval strategy.

        Returns:
            dict: {'complexity': str, 'k': int, 'score': int, 'reasoning': str}.
            On any failure a safe 'moderate' fallback (k=10) is returned.
        """
        logger.info(f"🧠 LLMComplexityAnalyzer: Analyzing query...")

        try:
            # Invoke the chain to get the structured JSON output.
            result = self.chain.invoke({"query": query})

            # Add a 'score' field for compatibility with metrics logging.
            score_map = {"simple": 2, "moderate": 4, "complex": 6}
            complexity = result.get('complexity')
            result['score'] = score_map.get(complexity, 0)

            # FIX: the previous code called .upper() directly on
            # result.get('complexity'), which raised AttributeError when the
            # LLM omitted the key — guard before formatting.
            label = complexity.upper() if isinstance(complexity, str) else "UNKNOWN"
            logger.info(f"🧠 LLM Decision: {label} (k={result.get('k')})")
            logger.info(f"   Reasoning: {result.get('reasoning')}")

            return result

        except Exception as e:
            # Fallback in case the LLM fails or returns bad JSON.
            logger.error(f"❌ LLMComplexityAnalyzer failed: {e}. Defaulting to 'moderate' strategy.")
            return {
                "complexity": "moderate",
                # FIX: was k=12, inconsistent with the documented 'moderate'
                # strategy above (k = 10).
                "k": 10,
                "score": 4,
                "reasoning": "Fallback: LLM analysis or JSON parsing failed."
            }
147
-
148
-
149
class SwarmRetriever:
    """
    Multi-retriever swarm that executes parallel retrieval strategies.
    Worker component that takes orders from LLMComplexityAnalyzer.
    """

    def __init__(self, chroma_retriever, documents):
        # Dense (semantic) worker: the Chroma vector-store retriever.
        self.dense_retriever = chroma_retriever
        # Sparse (keyword) worker: BM25 built over the same documents.
        self.bm25_retriever = BM25Retriever.from_documents(documents)
        # Set high, will be limited by k parameter at query time.
        self.bm25_retriever.k = 20
        logger.info("✅ SwarmRetriever initialized (Dense + BM25 workers)")

    def retrieve_with_swarm(self, query: str, k: int) -> list:
        """
        Execute multi-retriever swarm with parallel workers.
        """
        logger.info(f"🐝 Swarm deployment: {2} workers, target k={k}")

        # One zero-arg job per worker strategy.
        worker_jobs = {
            "dense_semantic": lambda: self.dense_retriever.invoke(query, k=k),
            "bm25_keyword": lambda: self.bm25_retriever.invoke(query)[:k],
        }

        # Fan the jobs out on a small thread pool and gather as they finish.
        gathered = {}
        with ThreadPoolExecutor(max_workers=2) as pool:
            pending = {pool.submit(job): label for label, job in worker_jobs.items()}

            for done in as_completed(pending):
                worker_name = pending[done]
                try:
                    docs = done.result()
                except Exception as e:
                    # A failed worker contributes nothing but does not abort the swarm.
                    logger.error(f"  ❌ Worker '{worker_name}' failed: {e}")
                    gathered[worker_name] = []
                else:
                    gathered[worker_name] = docs
                    logger.info(f"  ✅ Worker '{worker_name}': {len(docs)} docs")

        # Merge worker outputs and drop duplicates.
        return self._combine_and_deduplicate(gathered)

    def _combine_and_deduplicate(self, swarm_results: dict) -> list:
        """Combine results from all workers and remove duplicates."""
        all_docs = []
        seen_content = set()
        worker_contributions = {}

        for worker_name, docs in swarm_results.items():
            for doc in docs:
                # Hash of the first 200 chars is enough to spot duplicates.
                fingerprint = hash(doc.page_content[:200])
                if fingerprint in seen_content:
                    continue
                seen_content.add(fingerprint)

                # Tag the document with the worker that produced it.
                doc.metadata['swarm_worker'] = worker_name
                all_docs.append(doc)

                # Count how many docs each worker contributed.
                worker_contributions[worker_name] = worker_contributions.get(worker_name, 0) + 1

        logger.info(f"🐝 Swarm combined: {len(all_docs)} unique docs")
        logger.info(f"   Worker contributions: {worker_contributions}")

        return all_docs
222
-
223
class AgenticQA:
    """Agentic question-answering orchestrator for one configured domain.

    Wires together:
      - a local FlashRank reranker,
      - a swarm-RAG pipeline (complexity analysis -> parallel retrieval ->
        rerank -> threshold filter -> stuff-documents QA chain),
      - a ReAct agent whose primary tool is that pipeline, plus
        domain-specific web-search fallback tools,
      - metrics tracking and an answer validator.
    """

    def __init__(self, config=None):
        """Build all chains/agents.

        Args:
            config (dict | None): expects config["retriever"]["collection_name"],
                config["retriever"]["persist_directory"] and optional
                config["domain"]. Without a config the instance is inert
                (retriever/agent stay None).
        """
        logger.info("Initializing AgenticQA...")

        # Load a small, fast reranker model. This runs locally.
        try:
            self.reranker = Ranker(model_name="ms-marco-MiniLM-L-12-v2")
            logger.info("FlashRank Reranker loaded successfully.")
        except Exception as e:
            # Degraded mode: _run_rag_with_reranking refuses to run without it.
            logger.error(f"Failed to load FlashRank reranker: {e}")
            self.reranker = None

        # Prompt that rewrites a history-dependent question into a standalone one.
        self.contextualize_q_system_prompt = (
            "Given a chat history and the latest user question which might reference context in the chat history, "
            "formulate a standalone question which can be understood without the chat history. "
            "IMPORTANT: DO NOT provide any answers or explanations. ONLY rephrase the question if needed. "
            "If the question is already clear and standalone, return it exactly as is. "
            "Output ONLY the reformulated question, nothing else."
        )

        self.contextualize_q_prompt = ChatPromptTemplate.from_messages(
            [("system", self.contextualize_q_system_prompt),
             MessagesPlaceholder("chat_history"),
             ("human", "{input}")]
        )
        # Main answering prompt; {domain} and {context} are filled at invoke time.
        self.qa_system_prompt = (
            "You are an assistant that answers questions in a specific domain for citizens mainly in Malaysia, "
            "depending on the context. "
            "You will receive:\n"
            " • domain = {domain} (either 'medical', 'islamic' , or 'insurance')\n"
            " • context = relevant retrieved passages\n"
            " • user question\n\n"
            "If the context does not contain the answer, **YOU MUST SAY 'I do not know'** or 'I cannot find that information in the provided documents.' Do not use your general knowledge.\n\n"
            "Instructions based on domain:\n"
            "1. If domain = 'medical' :\n"
            " - Answer the question in clear, simple layperson language, "
            " - Citing your sources (e.g. article name, section)."
            " - Add a medical disclaimer: “I am not a doctor…”.\n"
            "2. If domain = 'islamic':\n"
            " - **ALWAYS present both Shafi'i AND Maliki perspectives** if the question is about fiqh/rulings\n"
            " - **Cite specific sources**: Always mention the book name (e.g., 'According to Muwatta Imam Malik...', 'Minhaj al-Talibin states...', 'Umdat al-Salik explains...')\n"
            " - **Structure answer as**:\n"
            " - Shafi'i view (from Umdat al-Salik/Minhaj): [ruling with citation]\n"
            " - Maliki view (from Muwatta): [ruling with citation]\n"
            " - If they agree: mention the consensus\n"
            " - If they differ: present both views objectively without favoring one\n"
            " - **For hadith questions**: provide the narration text, source (book name, hadith number)\n "
            " - - **If ruling has EXCEPTIONS** (like 'except for...', 'unless...'), YOU MUST include them. "
            " If context doesn't show exceptions but the ruling seems absolute, indicate this uncertainty.\n"
            " - If the context does not contain relevant information from BOTH madhabs, acknowledge which sources you have "
            " (e.g., 'Based on Shafi'i sources only...') and suggest consulting additional madhab resources.\n"
            " - **Always end with**: 'This is not a fatwa. Consult a local scholar for guidance specific to your situation.'\n"
            " - Always include hadith narration or quran verse as evidence (if it exists) in the final response "
            " - Keep answers concise but comprehensive enough to show different scholarly views.\n\n"

            "3. If domain = 'insurance':\n"
            " - Your knowledge is STRICTLY limited to Etiqa Takaful (Motor and Car policies).\n"
            " - First, try to answer ONLY using the provided <context>.\n"
            " - **If the answer is not in the context, YOU MUST SAY 'I do not have information on that specific topic.'** Do not make up an answer.\n"
            " - If the user asks about other Etiqa products (e.g., medical, travel), you MUST use the 'EtiqaWebSearch' tool.\n"
            " - If the user asks about another insurance company (e.g., 'Prudential', 'Takaful Ikhlas'), state that you can only answer about Etiqa Takaful.\n"
            " - If the user asks a general insurance question (e.g., 'What is takaful?', 'What is an excess?'), use the 'GeneralWebSearch' tool.\n"

            "4. For ALL domains: If the context does not contain the answer, do not make one up. Be honest.\n\n"
            "Context:\n"
            "{context}"
        )

        self.qa_prompt = ChatPromptTemplate.from_messages(
            [("system", self.qa_system_prompt),
             MessagesPlaceholder("chat_history"),
             ("human", "{input}")]
        )
        # Low temperature: answers should stay close to the retrieved context.
        self.llm = ChatGoogleGenerativeAI(model="gemini-2.5-flash",temperature=0.05)
        # --- START: NEW QUERY REFINER ---
        self.refiner_system_prompt = (
            "You are an expert search query refiner. Your task is to take a user's question "
            "and rewrite it to be a perfect, concise search query for a database. "
            "Remove all conversational fluff, emotion, and filler words. "
            "Distill the query to its core semantic intent. "
            "For example:"
            "- 'Hi, I was wondering if I can touch a dog if I found it is cute?' becomes 'ruling on touching a dog in islam'"
            "- 'What are the treatments for, like, a common cold?' becomes 'common cold treatment options'"
            "- 'Tell me about diabetes' becomes 'what is diabetes'"
            "Output ONLY the refined query, nothing else."
        )

        self.refiner_prompt = ChatPromptTemplate.from_messages([
            ("system", self.refiner_system_prompt),
            ("human", "{query}")
        ])

        self.refiner_chain = self.refiner_prompt | self.llm
        logger.info("✅ Query Refiner chain initialized.")
        # --- END: NEW QUERY REFINER ---

        self.react_docstore_prompt = hub.pull("aallali/react_tool_priority")
        self.answer_validator = AnswerValidatorAgent(self.llm)

        # NOTE(review): answer_validator is reset to None a few lines below and
        # only re-created inside the config branch — the instance built above
        # is discarded. Confirm whether the first assignment can be removed.
        self.retriever = None
        self.agent_executor = None
        self.tools = []  # Initialize the attribute
        self.domain = "general"
        self.answer_validator = None
        self.retrieval_agent = None

        if config:
            logger.info(f"Configuring AgenticQA with provided config: {config}")
            try:
                collection_name = config["retriever"]["collection_name"]
                persist_directory = config["retriever"]["persist_directory"]
                self.domain = config.get("domain", "general")  # Get domain from config

                # 1. Initialize the embedding function
                embedding_function = GoogleGenerativeAIEmbeddings(model="models/text-embedding-004")

                # 2. Connect to the persistent ChromaDB
                db_client = Chroma(
                    persist_directory=persist_directory,
                    embedding_function=embedding_function,
                    collection_name=collection_name
                )

                # 3. Set the retriever for this instance
                self.retriever = db_client.as_retriever()
                logger.info(f"✅ Successfully created retriever for collection '{collection_name}'")
                # --- START: NEW SWARM INITIALIZATION ---
                logger.info("Initializing Swarm components...")
                # Get all documents from Chroma so BM25 can index the same corpus.
                all_docs_data = db_client.get()
                docs_for_bm25 = [
                    Document(page_content=content, metadata=meta)
                    for content, meta in zip(
                        all_docs_data['documents'],
                        all_docs_data['metadatas']
                    )
                ]

                # Initialize SwarmRetriever (Workers)
                self.swarm_retriever = SwarmRetriever(self.retriever, docs_for_bm25)

                # Initialize LLMComplexityAnalyzer (Manager)
                self.complexity_analyzer = LLMComplexityAnalyzer(self.domain, self.llm)
                logger.info("✅ Swarm components (Manager + Workers) initialized.")
                # --- END: NEW SWARM INITIALIZATION ---
                self.metrics_tracker = MetricsTracker(save_path=f"metrics_{self.domain}.json")
                logger.info("✅ Metrics tracker initialized")
                # Initialize validator *after* setting domain
                self.answer_validator = AnswerValidatorAgent(self.llm, self.domain)
                # --- This is the new, simple QA chain that will be used *after* reranking ---
                self.qa_chain = create_stuff_documents_chain(self.llm, self.qa_prompt)

                self._initialize_agent()

            except Exception as e:
                logger.error(f"❌ Error during AgenticQA setup for '{self.domain}': {e}", exc_info=True)
        else:
            logger.warning("⚠️ AgenticQA initialized without a config. Retriever will be None.")

    # --- 5. NEW UPGRADED RAG FUNCTION ---
    # This is our new, smarter "worker" function that includes the reranker.
    def _run_rag_with_reranking(self, query: str, chat_history: list) -> str:
        """
        Enhanced Swarm-RAG pipeline with adaptive retrieval and reranking.

        Pipeline:
        1. Contextualize query
        2. Refine query
        3. ComplexityAnalyzer (Manager) determines optimal k
        4. SwarmRetriever (Workers) deploys parallel retrievers with k
        5. Rerank combined swarm results
        6. Filter results by threshold
        7. Generate Answer

        Returns a plain answer string; fixed sentinel strings are returned on
        missing components, empty retrieval, or errors.
        """
        logger.info(f"--- 🐝 SWARM RAG (with Reranker) PIPELINE RUNNING for query: '{query}' ---")

        if not self.reranker or not self.swarm_retriever or not self.complexity_analyzer:
            logger.error("Swarm components or Reranker not initialized. Cannot perform RAG.")
            return "Error: RAG components are not available."

        try:
            # 1. Contextualize query (only needed when there is prior history).
            standalone_query = query
            if chat_history:
                contextualize_chain = self.contextualize_q_prompt | self.llm
                response = contextualize_chain.invoke({"chat_history": chat_history, "input": query})
                standalone_query = response.content
                logger.info(f"Contextualized query: '{standalone_query}'")

            # 2 - REFINE QUERY ---
            logger.info("Refining query for search...")
            response = self.refiner_chain.invoke({"query": standalone_query})
            refined_query = response.content.strip()
            logger.info(f"Refined query: '{refined_query}'")
            # NOTE(review): refined_query is computed but never used — retrieval
            # below runs on standalone_query. Confirm which one was intended.

            # 3. Complexity analysis (decides retrieval depth k).
            analysis = self.complexity_analyzer.analyze(standalone_query)
            k = analysis['k']
            self._last_complexity_analysis = analysis
            logger.info(f"Query complexity: {analysis['complexity'].upper()} | k={k}")

            # 4. Retrieve with Swarm (Workers)
            swarm_docs = self.swarm_retriever.retrieve_with_swarm(standalone_query, k=k)

            if not swarm_docs:
                logger.warning("Swarm Retriever found no documents.")
                return "I do not know the answer to that as it is not in my documents."

            # 5. Format for Reranker (FlashRank expects id/text/meta dicts).
            passages = [
                {"id": i, "text": doc.page_content, "meta": doc.metadata}
                for i, doc in enumerate(swarm_docs)
            ]

            # 6. Rerank
            logger.info(f"Reranking {len(passages)} swarm-retrieved documents...")
            rerank_request = RerankRequest(query=standalone_query, passages=passages)
            reranked_results = self.reranker.rerank(rerank_request)

            top_score = reranked_results[0]['score'] if reranked_results else 0
            logger.info(f"Reranking complete. Top score: {top_score:.3f}")

            # 7. Filter by a per-domain relevance threshold.
            threshold = 0.1
            if self.domain == "islamic":
                threshold = 0.05
            elif self.domain == "medical":
                threshold = 0.15
            else:
                threshold = 0.10

            logger.info(f"Using threshold={threshold} for {self.domain} domain")
            final_docs = []
            worker_contributions = {}

            for result in reranked_results:
                if result['score'] > threshold:
                    # Re-create the Document object from reranked data
                    doc = Document(
                        page_content=result['text'],
                        metadata=result.get('meta', {})
                    )
                    final_docs.append(doc)

                    # Track worker contributions in final answer
                    worker = result.get('meta', {}).get('swarm_worker', 'unknown')
                    worker_contributions[worker] = \
                        worker_contributions.get(worker, 0) + 1

            logger.info(f"Filtered to {len(final_docs)} documents above threshold {threshold}.")
            logger.info(f"Final doc contributions: {worker_contributions}")

            self.metrics_tracker.log_worker_contribution(worker_contributions)
            # 8. Respond
            if not final_docs:
                logger.warning("No documents passed the reranker threshold. Returning 'I don't know.'")
                return "I do not know the answer to that as my document search found no relevant information."

            # Call the QA chain with the *reranked, filtered* docs
            response = self.qa_chain.invoke({
                "context": final_docs,
                "chat_history": chat_history,
                "input": query,
                "domain": self.domain
            })

            logger.info("🐝 Swarm RAG pipeline complete. Returning answer.")
            return response

        except Exception as e:
            logger.error(f"Error in Swarm RAG pipeline: {e}", exc_info=True)
            return "An error occurred while processing your request."

    def _initialize_agent(self):
        """Build the ReAct agent and its tool set for the configured domain."""

        logger.info(f"Initializing agent for domain: '{self.domain}'")
        self.context_retriever = ContextRetriever(self.retriever)

        # Store chat_history as instance variable so tools can access it
        self._current_chat_history = []

        # We need a RAG chain for the tool
        # history_aware_retriever = create_history_aware_retriever(self.llm, self.retriever, self.contextualize_q_prompt)
        # question_answer_chain = create_stuff_documents_chain(self.llm, self.qa_prompt)
        # rag_chain = create_retrieval_chain(history_aware_retriever, question_answer_chain)

        def rag_tool_wrapper(query: str) -> str:
            """Wrapper to pass chat history to RAG pipeline."""
            return self._run_rag_with_reranking(query, self._current_chat_history)

        self.tools = [
            Tool(
                name="RAG",
                func=rag_tool_wrapper,
                description=(f"Use this tool FIRST to search and answer questions about the {self.domain} domain using internal vector database.")
            )

        ]

        # --- DOMAIN-SPECIFIC TOOLS ---
        if self.domain == "insurance":
            # Add a specific tool for searching Etiqa's website
            etiqa_search_tool = TavilySearchResults(max_results=3)
            etiqa_search_tool.description = "Use this tool to search the Etiqa Takaful website for products NOT in the RAG context (e.g., medical, travel)."
            # This is a bit of a "hack" to force Tavily to search a specific site.
            # We modify the function it calls.
            original_etiqa_func = etiqa_search_tool.invoke
            def etiqa_site_search(query):
                return original_etiqa_func(f"site:etiqa.com.my {query}")

            self.tools.append(Tool(
                name="EtiqaWebSearch",
                func=etiqa_site_search,
                description=etiqa_search_tool.description
            ))

            # Add a general web search tool
            self.tools.append(Tool(
                name="GeneralWebSearch",
                func=TavilySearchResults(max_results=2).invoke,
                description="Use this tool as a fallback for general, non-Etiqa questions (e.g., 'What is takaful?')."
            ))
        elif self.domain == "islamic":
            # Trusted Islamic sources for Malaysia
            islamic_search = TavilySearchResults(max_results=3)

            def islamic_trusted_search(query):
                # Search only trusted Malaysian Islamic authorities
                sites = "site:muftiwp.gov.my OR site:zulkiflialbakri.com"
                return islamic_search.invoke(f"{sites} {query}")

            self.tools.append(Tool(
                name="TrustedIslamicSearch",
                func=islamic_trusted_search,
                description=(
                    "Use this tool if RAG has incomplete or no answer. "
                    "Searches ONLY trusted Malaysian Islamic sources: "
                    "Pejabat Mufti Wilayah Persekutuan (muftiwp.gov.my) and "
                    "Dr Zulkifli Mohamad Al Bakri (zulkiflialbakri.com/category/soal-jawab-agama/). "
                    "These follow Shafi'i madhab which is official in Malaysia."
                )
            ))

            # General fallback (last resort)
            self.tools.append(Tool(
                name="GeneralWebSearch",
                func=TavilySearchResults(max_results=2).invoke,
                description="Last resort: Use only for general Islamic terms or definitions not found in RAG or trusted sources."
            ))
        else:
            # Remaining domains (e.g. medical, general) only get the general
            # web search fallback.
            self.tools.append(Tool(
                name="GeneralWebSearch",
                func=TavilySearchResults(max_results=2).invoke,
                description="Use this tool as a fallback if the RAG tool finds no relevant information or if the query is about a general topic."
            ))

        agent = create_react_agent(llm=self.llm, tools=self.tools, prompt=self.react_docstore_prompt)

        self.agent_executor = AgentExecutor.from_agent_and_tools(
            agent=agent,
            tools=self.tools,
            handle_parsing_errors=True,
            verbose=True,
            return_intermediate_steps=True,
            max_iterations=5
        )
        logger.info(f"✅ Agent Executor(ReAct Agent) created successfully for '{self.domain}'.")


    def answer(self, query, chat_history=None):
        """
        Process a query using the agent and returns a clean dictionary.

        Args:
            query (str): User's question
            chat_history (list): List of previous messages (AIMessage, HumanMessage)

        Returns:
            dict: Contains 'answer', 'context', 'validation', 'source',
            'thoughts', 'response_time' and 'complexity'.
        """
        if chat_history is None:
            chat_history = []
        self._current_chat_history = chat_history
        if not self.agent_executor:
            return {"answer": "Error: Agent not initialized.", "context": "", "validation": (False, "Init failed"), "source": "Error"}
        # START TIMING
        start_time = self.metrics_tracker.start_query()
        print(f"\n📝 AGENTIC_QA PROCESSING QUERY: '{query}'")

        response = self.agent_executor.invoke({
            "input": query,
            "chat_history": chat_history,
            "domain": self.domain,  # Pass domain to agent
            "metadata": {
                "domain": self.domain
            }
        })
        thoughts= ""

        final_answer = response.get("output", "Could not generate an answer")

        tool_used = None
        if "intermediate_steps" in response:
            thought_log= []
            for step in response["intermediate_steps"]:
                # --- FIX: Unpack the (Action, Observation) tuple first ---
                action, observation = step

                if isinstance(action, AgentAction) and action.tool:
                    tool_used = action.tool  # Capture the last tool used

                # Append Thought, Action, Action Input & Observation
                thought_log.append(action.log)
                thought_log.append(f"\nObservation: {str(observation)}\n---")

            thoughts = "\n".join(thought_log)

        # Assign source based on the LAST tool used
        if tool_used == "RAG":
            source = "Etiqa Takaful Database" if self.domain == "insurance" else "Domain Database (RAG)"
        elif tool_used == "EtiqaWebSearch":
            source = "Etiqa Website Search"
        elif tool_used == "TrustedIslamicSearch":
            source = "Mufti WP & Dr Zul Search"
        elif tool_used == "GeneralWebSearch":
            source = "General Web Search"
        else:
            source = "Agent Logic"

        logger.info(f"Tool used: {tool_used}, Source determined: {source}")

        # Retrieve context only if the RAG tool was used
        # This call is inefficient (it runs a *second* retrieval), but it
        # respects your architecture and works for logging.
        context = "No RAG context retrieved."
        if source.endswith("(RAG)") or source.startswith("Etiqa Takaful Database"):
            if self.context_retriever:
                context = self.context_retriever.retrieve(query)
            else:
                context = "RAG tool was used, but ContextRetriever not initialized."
        elif "Web" in source:
            context = "Web search results were used. See 'Observation' in thoughts log."

        validation = self.answer_validator.validate(query, final_answer, source=source)
        # END TIMING
        response_time = self.metrics_tracker.end_query(start_time)

        complexity_info = getattr(self, '_last_complexity_analysis', None)

        # LOG METRICS
        self.metrics_tracker.log_query(
            query=query,
            domain=self.domain,
            source=source,
            complexity=complexity_info,
            validation=validation,
            response_time=response_time,
            answer_preview=final_answer
        )
        return {"answer": final_answer, "context": context, "validation": validation, "source": source, "thoughts": thoughts,"response_time": response_time,
                "complexity": complexity_info}
688
-
689
class AnswerValidatorAgent:
    """LLM-based post-hoc validator for generated answers.

    Holds one prompt per domain flavour (general / medical / islamic) and
    asks the LLM to label an answer 'Valid' or 'Invalid: <reason>'.
    """

    def __init__(self, llm, domain="general"):
        self.llm = llm
        self.domain = domain

        # Every validator prompt ends with the same human turn.
        human_turn = ("human", "Query: {query}\nAnswer: {answer}")

        self.general_prompt = ChatPromptTemplate.from_messages([
            ("system", (
                "You are an answer validator. Check if the generated answer is factually correct "
                "and relevant to the query. Return 'Valid' if the answer is correct and relevant, "
                "or 'Invalid: [reason]' if not, where [reason] is a brief explanation of the issue."
            )),
            human_turn,
        ])
        self.medical_prompt = ChatPromptTemplate.from_messages([
            ("system", (
                "You are an answer validator. Check if the generated answer is factually correct, "
                "relevant to the query, and consistent with known medical knowledge. "
                "Return 'Valid' if the answer is correct and relevant, or 'Invalid: [reason]' if not, "
                "where [reason] is a brief explanation of the issue. "
                "**Pay close attention to contradictions.** If an answer gives advice and then "
                "contradicts it (e.g., 'switch immediately' and then 'always consult your doctor first'), "
                "it is **Invalid** because it is unsafe and confusing."
            )),
            human_turn,
        ])
        self.islamic_prompt = ChatPromptTemplate.from_messages([
            ("system", (
                "You are an answer validator for Islamic Fiqh or anything related to Islam. Check if the answer correctly addresses "
                "the query based on the provided sources. The answer should be neutral and present "
                "the required perspectives (e.g., Shafi'i and Maliki) if available. "
                "Return 'Valid' if the answer is correct and relevant, or 'Invalid: [reason]' if not."
            )),
            human_turn,
        ])

    def validate(self, query, answer, source="RAG"):
        """Return (is_valid: bool, message: str) for the generated answer."""
        # Insurance answers are never validated.
        if self.domain == "insurance":
            logger.info("Skipping validation for insurance domain.")
            return True, "Validation skipped for insurance domain."

        try:
            # Domain-specific prompts only apply to answers grounded in the
            # internal database; web/agent answers use the general prompt.
            prompt = self.general_prompt
            if source == "RAG" or "Database" in source:
                domain_prompts = {
                    "medical": self.medical_prompt,
                    "islamic": self.islamic_prompt,
                }
                prompt = domain_prompts.get(self.domain, self.general_prompt)

            verdict = self.llm.invoke(prompt.format(query=query, answer=answer)).content.strip()
            logger.info(f"AnswerValidator result for query '{query}': {verdict}")

            lowered = verdict.lower()
            if lowered.startswith("valid"):
                return True, "Answer is valid and relevant."
            if lowered.startswith("invalid"):
                if ":" in verdict:
                    return False, verdict.split(":", 1)[1].strip()
                return False, "No reason provided."
            return False, "Validation response format unexpected."
        except Exception as e:
            logger.error(f"AnswerValidator error: {str(e)}")
            return False, "Validation failed due to error."
 
1
+ from langchain_core.prompts import ChatPromptTemplate, MessagesPlaceholder
2
+ from langchain_classic import hub
3
+ from langchain_google_genai import ChatGoogleGenerativeAI
4
+ from langchain_classic.chains.combine_documents import create_stuff_documents_chain
5
+ from langchain_core.tools import Tool
6
+ from langchain_community.tools.tavily_search import TavilySearchResults
7
+ from langchain_community.retrievers import BM25Retriever
8
+ from concurrent.futures import ThreadPoolExecutor, as_completed
9
+ from langchain_core.output_parsers import JsonOutputParser
10
+ from langchain_classic.agents import AgentExecutor, create_react_agent
11
+ from langchain_core.documents import Document
12
+ from langchain_core.messages import AIMessage, HumanMessage
13
+ from langchain_chroma import Chroma
14
+ from langchain_core.agents import AgentAction
15
+ from langchain_google_genai import GoogleGenerativeAIEmbeddings
16
+ from flashrank import Ranker, RerankRequest
17
+ from src.metrics_tracker import MetricsTracker
18
+ import logging
19
+
20
+
21
+ # Setup logging
22
+ logging.basicConfig(level=logging.DEBUG)
23
+ logger = logging.getLogger(__name__)
24
+
25
+
26
class LLMComplexityAnalyzer:
    """
    Analyzes query complexity using an LLM to make a "managerial" decision
    on the optimal retrieval strategy.

    The LLM classifies a query as 'simple' / 'moderate' / 'complex' and
    assigns a document-retrieval budget k of 5 / 10 / 15 respectively.
    `analyze()` always returns a well-formed strategy dict, falling back to
    the 'moderate' strategy when the LLM call or its JSON output fails.
    """

    # complexity label -> numeric score (exposed via result['score'] for
    # downstream consumers); also defines the set of accepted labels.
    _SCORE_MAP = {"simple": 2, "moderate": 4, "complex": 6}
    # complexity label -> default k, used when the LLM omits or garbles 'k'.
    _K_MAP = {"simple": 5, "moderate": 10, "complex": 15}

    def __init__(self, domain: str, llm: ChatGoogleGenerativeAI):
        """
        Args:
            domain: Domain name used to specialize the system prompt.
            llm: Chat model that performs the classification.
        """
        self.domain = domain
        self.llm = llm

        self.system_prompt = (
            "You are a 'Complexity Analyzer' manager for a RAG (Retrieval-Augmented Generation) system. "
            "Your domain of expertise is: **{domain}**."
            "\n"
            "Your task is to analyze the user's query and determine its complexity. Based on this, "
            "you will decide how many documents (k) to retrieve. More complex queries require "
            "more documents to synthesize a good answer."
            "\n"
            "Here are the retrieval strategies:"
            "1. **simple**: For simple, direct fact-finding queries. (e.g., 'What is takaful?') "
            "   - Set k = 5"
            "2. **moderate**: For queries that require explanation, some comparison, or have multiple parts. "
            "   (e.g., 'What is the difference between madhab Shafi'i and Maliki on prayer?') "
            "   - Set k = 10"
            "3. **complex**: For deep, nuanced, multi-step, or highly comparative/synthetic queries. "
            "   (e.g., 'Explain in detail the treatment options for type 2 diabetes, comparing "
            "   their side effects and suitability for elderly patients.')"
            "   - Set k = 15"
            "\n"
            "Analyze the following query and provide your reasoning."
            "\n"
            "**IMPORTANT**: You MUST respond ONLY with a single, valid JSON object. Do not add any "
            "other text. The JSON object must have these three keys:"
            "- `complexity`: (string) Must be one of 'simple', 'moderate', or 'complex'."
            "- `k`: (integer) Must be 5, 10, or 15, corresponding to the complexity."
            "- `reasoning`: (string) A brief 1-sentence explanation for your decision."
        )

        self.prompt_template = ChatPromptTemplate.from_messages([
            ("system", self.system_prompt.format(domain=self.domain)),
            ("human", "{query}")
        ])

        self.output_parser = JsonOutputParser()

        # This chain will output a parsed dictionary.
        self.chain = self.prompt_template | self.llm | self.output_parser

        logger.info(f"🧠 LLMComplexityAnalyzer initialized for '{self.domain}'")

    def analyze(self, query: str) -> dict:
        """
        Analyze query complexity with the LLM and return the retrieval strategy.

        Returns:
            dict with keys 'complexity' (str), 'k' (int), 'score' (int) and
            'reasoning' (str). On any failure (LLM error, bad JSON, unknown
            label) a 'moderate' fallback strategy is returned instead.
        """
        logger.info("🧠 LLMComplexityAnalyzer: Analyzing query...")

        try:
            # Invoke the chain to get the structured JSON output.
            result = self.chain.invoke({"query": query})

            # BUGFIX: validate/normalize the LLM output instead of calling
            # .upper() on a possibly-missing key (which raised AttributeError
            # and silently diverted well-formed responses into the fallback).
            complexity = str(result.get('complexity', '')).lower()
            if complexity not in self._SCORE_MAP:
                raise ValueError(f"Unexpected complexity label: {result.get('complexity')!r}")
            result['complexity'] = complexity
            # Guarantee a usable integer k even if the LLM omitted it.
            result['k'] = int(result.get('k') or self._K_MAP[complexity])
            # Add a 'score' field for compatibility with downstream consumers.
            result['score'] = self._SCORE_MAP[complexity]

            logger.info(f"🧠 LLM Decision: {complexity.upper()} (k={result.get('k')})")
            logger.info(f"   Reasoning: {result.get('reasoning')}")

            return result

        except Exception as e:
            # Fallback in case the LLM fails or returns bad JSON.
            # BUGFIX: k was 12 here, which matches none of the documented
            # strategies (5/10/15); use 10 to stay consistent with 'moderate'.
            logger.error(f"❌ LLMComplexityAnalyzer failed: {e}. Defaulting to 'moderate' strategy.")
            return {
                "complexity": "moderate",
                "k": 10,
                "score": 4,
                "reasoning": "Fallback: LLM analysis or JSON parsing failed."
            }
104
+
105
+
106
class SwarmRetriever:
    """
    Multi-retriever swarm that executes parallel retrieval strategies.

    Worker component that takes orders from LLMComplexityAnalyzer: a dense
    (vector) retriever and a sparse (BM25 keyword) retriever run concurrently
    and their results are merged with duplicate suppression.
    """

    def __init__(self, chroma_retriever, documents):
        # Dense (semantic) worker backed by the vector store.
        self.dense_retriever = chroma_retriever
        # Sparse (keyword) worker built over the same corpus.
        self.bm25_retriever = BM25Retriever.from_documents(documents)
        self.bm25_retriever.k = 20  # generous internal cap; trimmed to k per query
        logger.info("✅ SwarmRetriever initialized (Dense + BM25 workers)")

    def retrieve_with_swarm(self, query: str, k: int) -> list:
        """
        Run both workers concurrently for `query`, each targeting `k` docs,
        and return the merged, deduplicated document list.
        """
        logger.info(f"🐝 Swarm deployment: {2} workers, target k={k}")

        # One callable per worker; each closes over query/k.
        workers = {
            "dense_semantic": lambda: self.dense_retriever.invoke(query, k=k),
            "bm25_keyword": lambda: self.bm25_retriever.invoke(query)[:k],
        }

        # Fan the workers out on a small thread pool and collect as they finish.
        gathered = {}
        with ThreadPoolExecutor(max_workers=2) as pool:
            pending = {pool.submit(job): label for label, job in workers.items()}

            for done in as_completed(pending):
                label = pending[done]
                try:
                    docs = done.result()
                except Exception as e:
                    logger.error(f"  Worker '{label}' failed: {e}")
                    gathered[label] = []
                else:
                    gathered[label] = docs
                    logger.info(f"  ✅ Worker '{label}': {len(docs)} docs")

        # Merge worker outputs into a single deduplicated list.
        return self._combine_and_deduplicate(gathered)

    def _combine_and_deduplicate(self, swarm_results: dict) -> list:
        """Combine results from all workers and remove duplicates."""
        merged = []
        fingerprints = set()
        contributions = {}

        for label, docs in swarm_results.items():
            for doc in docs:
                # Fingerprint on the first 200 chars to detect duplicates.
                fp = hash(doc.page_content[:200])
                if fp in fingerprints:
                    continue
                fingerprints.add(fp)

                # Tag document with the worker that produced it.
                doc.metadata['swarm_worker'] = label
                merged.append(doc)

                # Track per-worker contribution counts.
                contributions[label] = contributions.get(label, 0) + 1

        logger.info(f"🐝 Swarm combined: {len(merged)} unique docs")
        logger.info(f"  Worker contributions: {contributions}")

        return merged
179
+
180
class AgenticQA:
    """
    Agentic question-answering orchestrator for a single domain
    ('medical', 'islamic', or 'insurance').

    Wires together: a Gemini chat model, a Chroma vector store, a
    dense+BM25 SwarmRetriever, an LLM complexity analyzer, a FlashRank
    reranker, domain-specific Tavily web-search tools, a ReAct agent
    executor, an answer validator, and a metrics tracker.

    NOTE(review): several attributes (swarm_retriever, complexity_analyzer,
    metrics_tracker, qa_chain, _last_context) are only created when a config
    is supplied and the RAG pipeline runs — callers that bypass `answer()`
    should not assume they exist.
    """
    def __init__(self, config=None):
        # config is expected to carry config["retriever"]["collection_name"],
        # config["retriever"]["persist_directory"], and optionally "domain".
        logger.info("Initializing AgenticQA...")

        # Load a small, fast reranker model. This runs locally.
        try:
            self.reranker = Ranker(model_name="ms-marco-MiniLM-L-12-v2")
            logger.info("FlashRank Reranker loaded successfully.")
        except Exception as e:
            # Reranker is optional at init time; the RAG pipeline checks for
            # None and refuses to run without it.
            logger.error(f"Failed to load FlashRank reranker: {e}")
            self.reranker = None

        # System prompt used to rewrite a follow-up question into a
        # standalone one before retrieval.
        self.contextualize_q_system_prompt = (
            "Given a chat history and the latest user question which might reference context in the chat history, "
            "formulate a standalone question which can be understood without the chat history. "
            "IMPORTANT: DO NOT provide any answers or explanations. ONLY rephrase the question if needed. "
            "If the question is already clear and standalone, return it exactly as is. "
            "Output ONLY the reformulated question, nothing else."
        )

        self.contextualize_q_prompt = ChatPromptTemplate.from_messages(
            [("system", self.contextualize_q_system_prompt),
            MessagesPlaceholder("chat_history"),
            ("human", "{input}")]
        )
        # Main answer-generation prompt; expects {domain} and {context} to be
        # filled in by the QA chain at invoke time.
        self.qa_system_prompt = (
            "You are an assistant that answers questions in a specific domain for citizens mainly in Malaysia, "
            "depending on the context. "
            "You will receive:\n"
            " • domain = {domain} (either 'medical', 'islamic' , or 'insurance')\n"
            " • context = relevant retrieved passages\n"
            " • user question\n\n"
            "If the context does not contain the answer, **YOU MUST SAY 'I do not know'** or 'I cannot find that information in the provided documents.' Do not use your general knowledge.\n\n"
            "Instructions based on domain:\n"
            "1. If domain = 'medical' :\n"
            " - Answer the question in clear, simple layperson language, "
            " - Citing your sources (e.g. article name, section)."
            " - Add a medical disclaimer: “I am not a doctor…”.\n"
            "2. If domain = 'islamic':\n"
            " - **ALWAYS present both Shafi'i AND Maliki perspectives** if the question is about fiqh/rulings\n"
            " - **Cite specific sources**: Always mention the book name (e.g., 'According to Muwatta Imam Malik...', 'Minhaj al-Talibin states...', 'Umdat al-Salik explains...')\n"
            " - **Structure answer as**:\n"
            "   - Shafi'i view (from Umdat al-Salik/Minhaj): [ruling with citation]\n"
            "   - Maliki view (from Muwatta): [ruling with citation]\n"
            "   - If they agree: mention the consensus\n"
            "   - If they differ: present both views objectively without favoring one\n"
            " - **For hadith questions**: provide the narration text, source (book name, hadith number)\n "
            " - - **If ruling has EXCEPTIONS** (like 'except for...', 'unless...'), YOU MUST include them. "
            "   If context doesn't show exceptions but the ruling seems absolute, indicate this uncertainty.\n"
            " - If the context does not contain relevant information from BOTH madhabs, acknowledge which sources you have "
            "   (e.g., 'Based on Shafi'i sources only...') and suggest consulting additional madhab resources.\n"
            " - **Always end with**: 'This is not a fatwa. Consult a local scholar for guidance specific to your situation.'\n"
            " - Always include hadith narration or quran verse as evidence (if it exists) in the final response "
            " - Keep answers concise but comprehensive enough to show different scholarly views.\n\n"

            "3. If domain = 'insurance':\n"
            " - Your knowledge is STRICTLY limited to Etiqa Takaful (Motor and Car policies).\n"
            " - First, try to answer ONLY using the provided <context>.\n"
            " - **If the answer is not in the context, YOU MUST SAY 'I do not have information on that specific topic.'** Do not make up an answer.\n"
            " - If the user asks about other Etiqa products (e.g., medical, travel), you MUST use the 'EtiqaWebSearch' tool.\n"
            " - If the user asks about another insurance company (e.g., 'Prudential', 'Takaful Ikhlas'), state that you can only answer about Etiqa Takaful.\n"
            " - If the user asks a general insurance question (e.g., 'What is takaful?', 'What is an excess?'), use the 'GeneralWebSearch' tool.\n"

            "4. For ALL domains: If the context does not contain the answer, do not make one up. Be honest.\n\n"
            "Context:\n"
            "{context}"
        )

        self.qa_prompt = ChatPromptTemplate.from_messages(
            [("system", self.qa_system_prompt),
            MessagesPlaceholder("chat_history"),
            ("human", "{input}")]
        )
        # Low temperature for deterministic, factual answers.
        self.llm = ChatGoogleGenerativeAI(model="gemini-2.5-flash",temperature=0.05)
        # --- START: NEW QUERY REFINER ---
        # Rewrites conversational questions into terse search queries.
        self.refiner_system_prompt = (
            "You are an expert search query refiner. Your task is to take a user's question "
            "and rewrite it to be a perfect, concise search query for a database. "
            "Remove all conversational fluff, emotion, and filler words. "
            "Distill the query to its core semantic intent. "
            "For example:"
            "- 'Hi, I was wondering if I can touch a dog if I found it is cute?' becomes 'ruling on touching a dog in islam'"
            "- 'What are the treatments for, like, a common cold?' becomes 'common cold treatment options'"
            "- 'Tell me about diabetes' becomes 'what is diabetes'"
            "Output ONLY the refined query, nothing else."
        )

        self.refiner_prompt = ChatPromptTemplate.from_messages([
            ("system", self.refiner_system_prompt),
            ("human", "{query}")
        ])

        self.refiner_chain = self.refiner_prompt | self.llm
        logger.info(" Query Refiner chain initialized.")
        # --- END: NEW QUERY REFINER ---

        # Pulls a community ReAct prompt that prioritizes tool use.
        self.react_docstore_prompt = hub.pull("aallali/react_tool_priority")
        # NOTE(review): this assignment is dead — self.answer_validator is
        # reset to None a few lines below and rebuilt in the config branch.
        self.answer_validator = AnswerValidatorAgent(self.llm)

        self.retriever = None
        self.agent_executor = None
        self.tools = [] # Initialize the attribute
        self.domain = "general"
        self.answer_validator = None
        # NOTE(review): retrieval_agent is initialized here but never used
        # elsewhere in this class — confirm whether it can be removed.
        self.retrieval_agent = None

        if config:
            logger.info(f"Configuring AgenticQA with provided config: {config}")
            try:
                collection_name = config["retriever"]["collection_name"]
                persist_directory = config["retriever"]["persist_directory"]
                self.domain = config.get("domain", "general") # Get domain from config

                # 1. Initialize the embedding function
                embedding_function = GoogleGenerativeAIEmbeddings(model="models/text-embedding-004")

                # 2. Connect to the persistent ChromaDB
                db_client = Chroma(
                    persist_directory=persist_directory,
                    embedding_function=embedding_function,
                    collection_name=collection_name
                )

                # 3. Set the retriever for this instance
                self.retriever = db_client.as_retriever()
                logger.info(f" Successfully created retriever for collection '{collection_name}'")
                # --- START: NEW SWARM INITIALIZATION ---
                logger.info("Initializing Swarm components...")
                # Get all documents from Chroma for BM25 (loads the whole
                # collection into memory; fine for modest corpora).
                all_docs_data = db_client.get()
                docs_for_bm25 = [
                    Document(page_content=content, metadata=meta)
                    for content, meta in zip(
                        all_docs_data['documents'],
                        all_docs_data['metadatas']
                    )
                ]

                # Initialize SwarmRetriever (Workers)
                self.swarm_retriever = SwarmRetriever(self.retriever, docs_for_bm25)

                # Initialize LLMComplexityAnalyzer (Manager)
                self.complexity_analyzer = LLMComplexityAnalyzer(self.domain, self.llm)
                logger.info("✅ Swarm components (Manager + Workers) initialized.")
                # --- END: NEW SWARM INITIALIZATION ---
                self.metrics_tracker = MetricsTracker(save_path=f"metrics_{self.domain}.json")
                logger.info("✅ Metrics tracker initialized")
                # Initialize validator *after* setting domain
                self.answer_validator = AnswerValidatorAgent(self.llm, self.domain)
                # --- This is the new, simple QA chain that will be used *after* reranking ---
                self.qa_chain = create_stuff_documents_chain(self.llm, self.qa_prompt)

                self._initialize_agent()

            except Exception as e:
                # Setup failures are logged but not re-raised; the instance
                # stays alive with agent_executor=None and answer() degrades.
                logger.error(f"❌ Error during AgenticQA setup for '{self.domain}': {e}", exc_info=True)
        else:
            logger.warning("⚠️ AgenticQA initialized without a config. Retriever will be None.")

    # --- 5. NEW UPGRADED RAG FUNCTION ---
    # This is our new, smarter "worker" function that includes the reranker.
    def _run_rag_with_reranking(self, query: str, chat_history: list) -> str:
        """
        Enhanced Swarm-RAG pipeline with adaptive retrieval and reranking.

        Pipeline:
        1. Contextualize query
        2. Refine query
        3. ComplexityAnalyzer (Manager) determines optimal k
        4. SwarmRetriever (Workers) deploys parallel retrievers with k
        5. Rerank combined swarm results
        6. Filter results by threshold
        7. Generate Answer

        Side effects: stores the deduplicated context in self._last_context
        and the complexity dict in self._last_complexity_analysis for
        answer() to read afterwards.
        """
        logger.info(f"--- 🐝 SWARM RAG (with Reranker) PIPELINE RUNNING for query: '{query}' ---")

        if not self.reranker or not self.swarm_retriever or not self.complexity_analyzer:
            # NOTE(review): this early return does not set self._last_context,
            # so answer() can hit an AttributeError if this was the first RAG
            # call on the instance — confirm and consider initializing it.
            logger.error("Swarm components or Reranker not initialized. Cannot perform RAG.")
            return "Error: RAG components are not available."

        try:
            # 1. Contextualize query (only needed when there is history)
            standalone_query = query
            if chat_history:
                contextualize_chain = self.contextualize_q_prompt | self.llm
                response = contextualize_chain.invoke({"chat_history": chat_history, "input": query})
                standalone_query = response.content
                logger.info(f"Contextualized query: '{standalone_query}'")

            # 2 - REFINE QUERY ---
            # NOTE(review): refined_query is computed and logged but never
            # used below (retrieval uses standalone_query) — confirm intent.
            logger.info("Refining query for search...")
            response = self.refiner_chain.invoke({"query": standalone_query})
            refined_query = response.content.strip()
            logger.info(f"Refined query: '{refined_query}'")


            # 3. Complexity analysis decides the retrieval budget k
            analysis = self.complexity_analyzer.analyze(standalone_query)
            k = analysis['k']
            self._last_complexity_analysis = analysis
            logger.info(f"Query complexity: {analysis['complexity'].upper()} | k={k}")

            # 4. Retrieve with Swarm (Workers)
            swarm_docs = self.swarm_retriever.retrieve_with_swarm(standalone_query, k=k)

            if not swarm_docs:
                self._last_context = None
                logger.warning("Swarm Retriever found no documents.")
                return "I do not know the answer to that as it is not in my documents."

            # 5. Format for Reranker (FlashRank expects id/text/meta dicts)
            passages = [
                {"id": i, "text": doc.page_content, "meta": doc.metadata}
                for i, doc in enumerate(swarm_docs)
            ]

            # 6. Rerank
            logger.info(f"Reranking {len(passages)} swarm-retrieved documents...")
            rerank_request = RerankRequest(query=standalone_query, passages=passages)
            reranked_results = self.reranker.rerank(rerank_request)

            top_score = reranked_results[0]['score'] if reranked_results else 0
            logger.info(f"Reranking complete. Top score: {top_score:.3f}")

            # 7. Filter by a per-domain relevance threshold
            # NOTE(review): this first assignment is dead — every branch of
            # the if/elif/else below overwrites it.
            threshold = 0.1
            if self.domain == "islamic":
                threshold = 0.05
            elif self.domain == "medical":
                threshold = 0.15
            else:
                threshold = 0.10

            logger.info(f"Using threshold={threshold} for {self.domain} domain")
            final_docs = []
            worker_contributions = {}

            for result in reranked_results:
                if result['score'] > threshold:
                    # Re-create the Document object from reranked data
                    doc = Document(
                        page_content=result['text'],
                        metadata=result.get('meta', {})
                    )
                    final_docs.append(doc)

                    # Track worker contributions in final answer
                    worker = result.get('meta', {}).get('swarm_worker', 'unknown')
                    worker_contributions[worker] = \
                        worker_contributions.get(worker, 0) + 1

            logger.info(f"Filtered to {len(final_docs)} documents above threshold {threshold}.")
            logger.info(f"Final doc contributions: {worker_contributions}")

            self.metrics_tracker.log_worker_contribution(worker_contributions)
            if final_docs:
                # 1. Log Metadata
                sources = [doc.metadata.get('source', 'unknown') for doc in final_docs]
                logger.info(f"Retrieved documents: {sources}")

                # 2. Log Context
                # NOTE(review): logs the full passage texts — very verbose at
                # INFO level; consider DEBUG or truncation.
                contexts = [doc.page_content for doc in final_docs]
                logger.info(f"Context : {contexts}")

                # 3. Deduplicate and Save for Answer()
                seen = set()
                deduped_lines = []
                for item in contexts:
                    if item not in seen:
                        seen.add(item)
                        deduped_lines.append(item)

                self._last_context = "\n".join(deduped_lines)
            else:
                self._last_context = None

            # 8. Respond
            if not final_docs:
                logger.warning("No documents passed the reranker threshold. Returning 'I don't know.'")
                return "I do not know the answer to that as my document search found no relevant information."

            # Call the QA chain with the *reranked, filtered* docs
            response = self.qa_chain.invoke({
                "context": final_docs,
                "chat_history": chat_history,
                "input": query,
                "domain": self.domain
            })

            logger.info("🐝 Swarm RAG pipeline complete. Returning answer.")
            return response

        except Exception as e:
            logger.error(f"Error in Swarm RAG pipeline: {e}", exc_info=True)
            return "An error occurred while processing your request."

    def _initialize_agent(self):
        """Build the ReAct agent and its tool set for the current domain."""
        # NOTE(review): the string below is a stray expression statement, not
        # part of the docstring above — it has no effect at runtime.
        """A helper function to build the agent components."""

        logger.info(f"Initializing agent for domain: '{self.domain}'")

        # Store chat_history as instance variable so tools can access it
        # (Tool funcs only receive the query string, so history is shared
        # through instance state set by answer()).
        self._current_chat_history = []

        # We need a RAG chain for the tool
        # history_aware_retriever = create_history_aware_retriever(self.llm, self.retriever, self.contextualize_q_prompt)
        # question_answer_chain = create_stuff_documents_chain(self.llm, self.qa_prompt)
        # rag_chain = create_retrieval_chain(history_aware_retriever, question_answer_chain)

        def rag_tool_wrapper(query: str) -> str:
            """Wrapper to pass chat history to RAG pipeline."""
            return self._run_rag_with_reranking(query, self._current_chat_history)

        self.tools = [
            Tool(
                name="RAG",
                func=rag_tool_wrapper,
                description=(f"Use this tool FIRST to search and answer questions about the {self.domain} domain using internal vector database.")
            )

        ]

        # --- DOMAIN-SPECIFIC TOOLS ---
        if self.domain == "insurance":
            # Add a specific tool for searching Etiqa's website
            etiqa_search_tool = TavilySearchResults(max_results=3)
            etiqa_search_tool.description = "Use this tool to search the Etiqa Takaful website for products NOT in the RAG context (e.g., medical, travel)."
            # This is a bit of a "hack" to force Tavily to search a specific site.
            # We modify the function it calls.
            original_etiqa_func = etiqa_search_tool.invoke
            def etiqa_site_search(query):
                # Prepend a site: filter so Tavily stays on etiqa.com.my.
                return original_etiqa_func(f"site:etiqa.com.my {query}")

            self.tools.append(Tool(
                name="EtiqaWebSearch",
                func=etiqa_site_search,
                description=etiqa_search_tool.description
            ))

            # Add a general web search tool
            self.tools.append(Tool(
                name="GeneralWebSearch",
                func=TavilySearchResults(max_results=2).invoke,
                description="Use this tool as a fallback for general, non-Etiqa questions (e.g., 'What is takaful?')."
            ))
        elif self.domain == "islamic":
            # Trusted Islamic sources for Malaysia
            islamic_search = TavilySearchResults(max_results=3)

            def islamic_trusted_search(query):
                # Search only trusted Malaysian Islamic authorities
                sites = "site:muftiwp.gov.my OR site:zulkiflialbakri.com"
                return islamic_search.invoke(f"{sites} {query}")

            self.tools.append(Tool(
                name="TrustedIslamicSearch",
                func=islamic_trusted_search,
                description=(
                    "Use this tool if RAG has incomplete or no answer. "
                    "Searches ONLY trusted Malaysian Islamic sources: "
                    "Pejabat Mufti Wilayah Persekutuan (muftiwp.gov.my) and "
                    "Dr Zulkifli Mohamad Al Bakri (zulkiflialbakri.com/category/soal-jawab-agama/). "
                    "These follow Shafi'i madhab which is official in Malaysia."
                )
            ))

            # General fallback (last resort)
            self.tools.append(Tool(
                name="GeneralWebSearch",
                func=TavilySearchResults(max_results=2).invoke,
                description="Last resort: Use only for general Islamic terms or definitions not found in RAG or trusted sources."
            ))
        else:
            # Remaining domains (e.g. medical/general) only get the general
            # web search fallback.
            self.tools.append(Tool(
                name="GeneralWebSearch",
                func=TavilySearchResults(max_results=2).invoke,
                description="Use this tool as a fallback if the RAG tool finds no relevant information or if the query is about a general topic."
            ))

        agent = create_react_agent(llm=self.llm, tools=self.tools, prompt=self.react_docstore_prompt)

        self.agent_executor = AgentExecutor.from_agent_and_tools(
            agent=agent,
            tools=self.tools,
            handle_parsing_errors=True,
            verbose=True,
            return_intermediate_steps=True,
            max_iterations=5
        )
        logger.info(f"✅ Agent Executor(ReAct Agent) created successfully for '{self.domain}'.")


    def answer(self, query, chat_history=None):
        """
        Process a query using the agent and returns a clean dictionary.

        Args:
            query (str): User's question
            chat_history (list): List of previous messages (AIMessage, HumanMessage)

        Returns:
            dict: Contains 'answer', 'context', 'validation', 'source',
                'thoughts', 'response_time' and 'complexity'.
        """
        if chat_history is None:
            chat_history = []
        # Share history with the RAG tool wrapper (see _initialize_agent).
        self._current_chat_history = chat_history
        if not self.agent_executor:
            return {"answer": "Error: Agent not initialized.", "context": "", "validation": (False, "Init failed"), "source": "Error"}
        # START TIMING
        # NOTE(review): self.metrics_tracker only exists when a config was
        # supplied; safe here only because agent_executor is also None in
        # that case and we returned above.
        start_time = self.metrics_tracker.start_query()
        print(f"\n📝 AGENTIC_QA PROCESSING QUERY: '{query}'")

        response = self.agent_executor.invoke({
            "input": query,
            "chat_history": chat_history,
            "domain": self.domain, # Pass domain to agent
            "metadata": {
                "domain": self.domain
            }
        })
        thoughts= ""

        final_answer = response.get("output", "Could not generate an answer")

        tool_used = None
        if "intermediate_steps" in response:
            thought_log= []
            for step in response["intermediate_steps"]:
                # --- FIX: Unpack the (Action, Observation) tuple first ---
                action, observation = step

                if isinstance(action, AgentAction) and action.tool:
                    tool_used = action.tool #Capture the last tool used

                # Append Thought, Action, Action Input & Observation
                thought_log.append(action.log)
                thought_log.append(f"\nObservation: {str(observation)}\n---")

            thoughts = "\n".join(thought_log)

        # Assign source based on the LAST tool used
        if tool_used == "RAG":
            source = "Etiqa Takaful Database" if self.domain == "insurance" else "Domain Database (RAG)"
        elif tool_used == "EtiqaWebSearch":
            source = "Etiqa Website Search"
        elif tool_used == "TrustedIslamicSearch":
            source = "Mufti WP & Dr Zul Search"
        elif tool_used == "GeneralWebSearch":
            source = "General Web Search"
        else:
            source = "Agent Logic"

        logger.info(f"Tool used: {tool_used}, Source determined: {source}")


        # NOTE(review): reads self._last_context, which is only set by
        # _run_rag_with_reranking; the source checks above make that likely
        # but not guaranteed (e.g. its early "components not available"
        # return) — confirm it cannot raise AttributeError here.
        if (source.endswith("(RAG)") or source.startswith("Etiqa Takaful Database")) and self._last_context:
            context = self._last_context
        elif "Web" in source:
            context = "Web search results were used. See 'Observation' in thoughts log."
        else:
            context = "No RAG context retrieved."

        validation = self.answer_validator.validate(query, final_answer, source=source)
        # END TIMING
        response_time = self.metrics_tracker.end_query(start_time)

        # Complexity info is only present if the RAG pipeline ran this session.
        complexity_info = getattr(self, '_last_complexity_analysis', None)

        # LOG METRICS
        self.metrics_tracker.log_query(
            query=query,
            domain=self.domain,
            source=source,
            complexity=complexity_info,
            validation=validation,
            response_time=response_time,
            answer_preview=final_answer
        )
        return {"answer": final_answer, "context": context, "validation": validation, "source": source, "thoughts": thoughts,"response_time": response_time,
                "complexity": complexity_info}
663
class AnswerValidatorAgent:
    """
    LLM-based post-hoc validator for generated answers.

    Given the user query and the final answer, asks the LLM to judge the
    answer against a domain-specific rubric (general / medical / islamic).
    Insurance-domain answers are never validated (always reported valid).
    """

    def __init__(self, llm, domain="general"):
        """
        Args:
            llm: Chat model used to perform the validation call.
            domain (str): One of 'general', 'medical', 'islamic', 'insurance'.
        """
        self.llm = llm
        self.domain = domain
        # Default rubric: basic factuality + relevance check.
        self.general_prompt = ChatPromptTemplate.from_messages([
            ("system", (
                "You are an answer validator. Check if the generated answer is factually correct "
                "and relevant to the query. Return 'Valid' if the answer is correct and relevant, "
                "or 'Invalid: [reason]' if not, where [reason] is a brief explanation of the issue."
            )),
            ("human", "Query: {query}\nAnswer: {answer}")
        ])
        # Medical rubric: additionally rejects self-contradicting advice.
        self.medical_prompt = ChatPromptTemplate.from_messages([
            ("system", (
                "You are an answer validator. Check if the generated answer is factually correct, "
                "relevant to the query, and consistent with known medical knowledge. "
                "Return 'Valid' if the answer is correct and relevant, or 'Invalid: [reason]' if not, "
                "where [reason] is a brief explanation of the issue. "
                "**Pay close attention to contradictions.** If an answer gives advice and then "
                "contradicts it (e.g., 'switch immediately' and then 'always consult your doctor first'), "
                "it is **Invalid** because it is unsafe and confusing."
            )),
            ("human", "Query: {query}\nAnswer: {answer}")
        ])
        # Islamic rubric: checks neutrality and multi-madhab coverage.
        self.islamic_prompt = ChatPromptTemplate.from_messages([
            ("system", (
                "You are an answer validator for Islamic Fiqh or anything related to Islam. Check if the answer correctly addresses "
                "the query based on the provided sources. The answer should be neutral and present "
                "the required perspectives (e.g., Shafi'i and Maliki) if available. "
                "Return 'Valid' if the answer is correct and relevant, or 'Invalid: [reason]' if not."
            )),
            ("human", "Query: {query}\nAnswer: {answer}")
        ])

    def validate(self, query, answer, source="RAG"):
        """
        Validate an answer for the given query.

        Args:
            query (str): The user's original question.
            answer (str): The generated answer to check.
            source (str): Where the answer came from; RAG/database answers
                get the domain-specific rubric, everything else the general one.

        Returns:
            tuple[bool, str]: (is_valid, reason/explanation). Returns
            (False, ...) on any LLM error or unexpected response format.
        """
        if self.domain == "insurance":
            logger.info("Skipping validation for insurance domain.")
            return True, "Validation skipped for insurance domain."

        try:
            # --- 11. IMPROVED VALIDATOR LOGIC ---
            # Choose the right prompt based on domain and source
            prompt = self.general_prompt # Default
            if source == "RAG" or "Database" in source:
                if self.domain == "medical":
                    prompt = self.medical_prompt
                elif self.domain == "islamic":
                    prompt = self.islamic_prompt

            # BUGFIX: use format_messages() so the system/human role split is
            # preserved. ChatPromptTemplate.format() flattens the chat prompt
            # into a single plain string, demoting the system instructions to
            # ordinary user text before it reaches the chat model.
            messages = prompt.format_messages(query=query, answer=answer)
            response = self.llm.invoke(messages)
            validation = response.content.strip()
            logger.info(f"AnswerValidator result for query '{query}': {validation}")

            # Note: "invalid..." does not match startswith("valid"), so the
            # branch order below is safe.
            if validation.lower().startswith("valid"):
                return True, "Answer is valid and relevant."
            elif validation.lower().startswith("invalid"):
                reason = validation.split(":", 1)[1].strip() if ":" in validation else "No reason provided."
                return False, reason
            else:
                return False, "Validation response format unexpected."
        except Exception as e:
            logger.error(f"AnswerValidator error: {str(e)}")
            return False, "Validation failed due to error."