|
|
from langchain_core.prompts import ChatPromptTemplate, MessagesPlaceholder
|
|
|
from langchain_classic import hub
|
|
|
from langchain_google_genai import ChatGoogleGenerativeAI
|
|
|
from langchain_classic.chains.combine_documents import create_stuff_documents_chain
|
|
|
from langchain_core.tools import Tool
|
|
|
from langchain_community.tools.tavily_search import TavilySearchResults
|
|
|
from langchain_community.retrievers import BM25Retriever
|
|
|
from concurrent.futures import ThreadPoolExecutor, as_completed
|
|
|
from langchain_core.output_parsers import JsonOutputParser
|
|
|
from langchain_classic.agents import AgentExecutor, create_react_agent
|
|
|
from langchain_core.documents import Document
|
|
|
from langchain_core.messages import AIMessage, HumanMessage
|
|
|
from langchain_chroma import Chroma
|
|
|
from langchain_core.agents import AgentAction
|
|
|
from langchain_google_genai import GoogleGenerativeAIEmbeddings
|
|
|
from flashrank import Ranker, RerankRequest
|
|
|
from src.metrics_tracker import MetricsTracker
|
|
|
import logging
|
|
|
|
|
|
|
|
|
|
|
|
logging.basicConfig(level=logging.DEBUG)
|
|
|
logger = logging.getLogger(__name__)
|
|
|
|
|
|
class ContextRetriever:
|
|
|
def __init__(self, retriever):
|
|
|
self.retriever = retriever
|
|
|
|
|
|
def deduplicate_context(self, context_list):
|
|
|
"""Deduplicate context entries to avoid repetition."""
|
|
|
seen = set()
|
|
|
deduped = []
|
|
|
for item in context_list:
|
|
|
if item not in seen:
|
|
|
seen.add(item)
|
|
|
deduped.append(item)
|
|
|
return "\n".join(deduped) if deduped else "No relevant context found."
|
|
|
|
|
|
def retrieve(self, query, top_k=5):
|
|
|
"""
|
|
|
Retrieve the top-k relevant contexts from ChromaDB based on the query.
|
|
|
|
|
|
Args:
|
|
|
query (str): The query or prediction to search for.
|
|
|
top_k (int): Number of top results to return (default: 3).
|
|
|
|
|
|
Returns:
|
|
|
str: Deduplicated context string from the top-k results.
|
|
|
"""
|
|
|
logger.info(f"Retrieving for query: {query}")
|
|
|
try:
|
|
|
|
|
|
results = self.retriever.invoke(query, k=top_k)
|
|
|
logger.info(f"Retrieved documents: {[doc.metadata.get('source') for doc in results]}")
|
|
|
|
|
|
|
|
|
contexts = [doc.page_content for doc in results]
|
|
|
logger.info(f"Context : {contexts}")
|
|
|
|
|
|
|
|
|
deduped_context = self.deduplicate_context(contexts)
|
|
|
logger.info(f"Deduplicated context: {deduped_context}")
|
|
|
|
|
|
return deduped_context
|
|
|
except Exception as e:
|
|
|
logger.error(f"Retrieval error: {str(e)}")
|
|
|
return "Retrieval failed due to error."
|
|
|
|
|
|
class LLMComplexityAnalyzer:
|
|
|
"""
|
|
|
Analyzes query complexity using an LLM to make a "managerial" decision
|
|
|
on the optimal retrieval strategy.
|
|
|
"""
|
|
|
|
|
|
def __init__(self, domain: str, llm: ChatGoogleGenerativeAI):
|
|
|
self.domain = domain
|
|
|
self.llm = llm
|
|
|
|
|
|
self.system_prompt = (
|
|
|
"You are a 'Complexity Analyzer' manager for a RAG (Retrieval-Augmented Generation) system. "
|
|
|
"Your domain of expertise is: **{domain}**."
|
|
|
"\n"
|
|
|
"Your task is to analyze the user's query and determine its complexity. Based on this, "
|
|
|
"you will decide how many documents (k) to retrieve. More complex queries require "
|
|
|
"more documents to synthesize a good answer."
|
|
|
"\n"
|
|
|
"Here are the retrieval strategies:"
|
|
|
"1. **simple**: For simple, direct fact-finding queries. (e.g., 'What is takaful?') "
|
|
|
" - Set k = 5"
|
|
|
"2. **moderate**: For queries that require explanation, some comparison, or have multiple parts. "
|
|
|
" (e.g., 'What is the difference between madhab Shafi'i and Maliki on prayer?') "
|
|
|
" - Set k = 10"
|
|
|
"3. **complex**: For deep, nuanced, multi-step, or highly comparative/synthetic queries. "
|
|
|
" (e.g., 'Explain in detail the treatment options for type 2 diabetes, comparing "
|
|
|
" their side effects and suitability for elderly patients.')"
|
|
|
" - Set k = 15"
|
|
|
"\n"
|
|
|
"Analyze the following query and provide your reasoning."
|
|
|
"\n"
|
|
|
"**IMPORTANT**: You MUST respond ONLY with a single, valid JSON object. Do not add any "
|
|
|
"other text. The JSON object must have these three keys:"
|
|
|
"- `complexity`: (string) Must be one of 'simple', 'moderate', or 'complex'."
|
|
|
"- `k`: (integer) Must be 5, 10, or 15, corresponding to the complexity."
|
|
|
"- `reasoning`: (string) A brief 1-sentence explanation for your decision."
|
|
|
)
|
|
|
|
|
|
self.prompt_template = ChatPromptTemplate.from_messages([
|
|
|
("system", self.system_prompt.format(domain=self.domain)),
|
|
|
("human", "{query}")
|
|
|
])
|
|
|
|
|
|
self.output_parser = JsonOutputParser()
|
|
|
|
|
|
|
|
|
self.chain = self.prompt_template | self.llm | self.output_parser
|
|
|
|
|
|
logger.info(f"🧠 LLMComplexityAnalyzer initialized for '{self.domain}'")
|
|
|
|
|
|
def analyze(self, query: str) -> dict:
|
|
|
"""
|
|
|
Analyzes query complexity using an LLM and returns the retrieval strategy.
|
|
|
"""
|
|
|
logger.info(f"🧠 LLMComplexityAnalyzer: Analyzing query...")
|
|
|
|
|
|
try:
|
|
|
|
|
|
result = self.chain.invoke({"query": query})
|
|
|
|
|
|
|
|
|
score_map = {"simple": 2, "moderate": 4, "complex": 6}
|
|
|
result['score'] = score_map.get(result.get('complexity'), 0)
|
|
|
|
|
|
logger.info(f"🧠 LLM Decision: {result.get('complexity').upper()} (k={result.get('k')})")
|
|
|
logger.info(f" Reasoning: {result.get('reasoning')}")
|
|
|
|
|
|
return result
|
|
|
|
|
|
except Exception as e:
|
|
|
|
|
|
logger.error(f"❌ LLMComplexityAnalyzer failed: {e}. Defaulting to 'moderate' strategy.")
|
|
|
return {
|
|
|
"complexity": "moderate",
|
|
|
"k": 12,
|
|
|
"score": 4,
|
|
|
"reasoning": "Fallback: LLM analysis or JSON parsing failed."
|
|
|
}
|
|
|
|
|
|
|
|
|
class SwarmRetriever:
|
|
|
"""
|
|
|
Multi-retriever swarm that executes parallel retrieval strategies.
|
|
|
Worker component that takes orders from LLMComplexityAnalyzer.
|
|
|
"""
|
|
|
|
|
|
def __init__(self, chroma_retriever, documents):
|
|
|
self.dense_retriever = chroma_retriever
|
|
|
self.bm25_retriever = BM25Retriever.from_documents(documents)
|
|
|
self.bm25_retriever.k = 20
|
|
|
logger.info("✅ SwarmRetriever initialized (Dense + BM25 workers)")
|
|
|
|
|
|
def retrieve_with_swarm(self, query: str, k: int) -> list:
|
|
|
"""
|
|
|
Execute multi-retriever swarm with parallel workers.
|
|
|
"""
|
|
|
logger.info(f"🐝 Swarm deployment: {2} workers, target k={k}")
|
|
|
|
|
|
|
|
|
retrieval_tasks = {
|
|
|
"dense_semantic": lambda: self.dense_retriever.invoke(query, k=k),
|
|
|
"bm25_keyword": lambda: self.bm25_retriever.invoke(query)[:k],
|
|
|
}
|
|
|
|
|
|
|
|
|
swarm_results = {}
|
|
|
with ThreadPoolExecutor(max_workers=2) as executor:
|
|
|
futures = {
|
|
|
executor.submit(task): name
|
|
|
for name, task in retrieval_tasks.items()
|
|
|
}
|
|
|
|
|
|
for future in as_completed(futures):
|
|
|
worker_name = futures[future]
|
|
|
try:
|
|
|
results = future.result()
|
|
|
swarm_results[worker_name] = results
|
|
|
logger.info(f" ✅ Worker '{worker_name}': {len(results)} docs")
|
|
|
except Exception as e:
|
|
|
logger.error(f" ❌ Worker '{worker_name}' failed: {e}")
|
|
|
swarm_results[worker_name] = []
|
|
|
|
|
|
|
|
|
combined_docs = self._combine_and_deduplicate(swarm_results)
|
|
|
|
|
|
return combined_docs
|
|
|
|
|
|
def _combine_and_deduplicate(self, swarm_results: dict) -> list:
|
|
|
"""Combine results from all workers and remove duplicates."""
|
|
|
all_docs = []
|
|
|
seen_content = set()
|
|
|
worker_contributions = {}
|
|
|
|
|
|
for worker_name, docs in swarm_results.items():
|
|
|
for doc in docs:
|
|
|
|
|
|
content_hash = hash(doc.page_content[:200])
|
|
|
|
|
|
if content_hash not in seen_content:
|
|
|
seen_content.add(content_hash)
|
|
|
|
|
|
|
|
|
doc.metadata['swarm_worker'] = worker_name
|
|
|
all_docs.append(doc)
|
|
|
|
|
|
|
|
|
worker_contributions[worker_name] = \
|
|
|
worker_contributions.get(worker_name, 0) + 1
|
|
|
|
|
|
logger.info(f"🐝 Swarm combined: {len(all_docs)} unique docs")
|
|
|
logger.info(f" Worker contributions: {worker_contributions}")
|
|
|
|
|
|
return all_docs
|
|
|
|
|
|
class AgenticQA:
|
|
|
def __init__(self, config=None):
|
|
|
logger.info("Initializing AgenticQA...")
|
|
|
|
|
|
|
|
|
try:
|
|
|
self.reranker = Ranker(model_name="ms-marco-MiniLM-L-12-v2")
|
|
|
logger.info("FlashRank Reranker loaded successfully.")
|
|
|
except Exception as e:
|
|
|
logger.error(f"Failed to load FlashRank reranker: {e}")
|
|
|
self.reranker = None
|
|
|
|
|
|
self.contextualize_q_system_prompt = (
|
|
|
"Given a chat history and the latest user question which might reference context in the chat history, "
|
|
|
"formulate a standalone question which can be understood without the chat history. "
|
|
|
"IMPORTANT: DO NOT provide any answers or explanations. ONLY rephrase the question if needed. "
|
|
|
"If the question is already clear and standalone, return it exactly as is. "
|
|
|
"Output ONLY the reformulated question, nothing else."
|
|
|
)
|
|
|
|
|
|
self.contextualize_q_prompt = ChatPromptTemplate.from_messages(
|
|
|
[("system", self.contextualize_q_system_prompt),
|
|
|
MessagesPlaceholder("chat_history"),
|
|
|
("human", "{input}")]
|
|
|
)
|
|
|
self.qa_system_prompt = (
|
|
|
"You are an assistant that answers questions in a specific domain for citizens mainly in Malaysia, "
|
|
|
"depending on the context. "
|
|
|
"You will receive:\n"
|
|
|
" • domain = {domain} (either 'medical', 'islamic' , or 'insurance')\n"
|
|
|
" • context = relevant retrieved passages\n"
|
|
|
" • user question\n\n"
|
|
|
"If the context does not contain the answer, **YOU MUST SAY 'I do not know'** or 'I cannot find that information in the provided documents.' Do not use your general knowledge.\n\n"
|
|
|
"Instructions based on domain:\n"
|
|
|
"1. If domain = 'medical' :\n"
|
|
|
" - Answer the question in clear, simple layperson language, "
|
|
|
" - Citing your sources (e.g. article name, section)."
|
|
|
" - Add a medical disclaimer: “I am not a doctor…”.\n"
|
|
|
"2. If domain = 'islamic':\n"
|
|
|
" - **ALWAYS present both Shafi'i AND Maliki perspectives** if the question is about fiqh/rulings\n"
|
|
|
" - **Cite specific sources**: Always mention the book name (e.g., 'According to Muwatta Imam Malik...', 'Minhaj al-Talibin states...', 'Umdat al-Salik explains...')\n"
|
|
|
" - **Structure answer as**:\n"
|
|
|
" - Shafi'i view (from Umdat al-Salik/Minhaj): [ruling with citation]\n"
|
|
|
" - Maliki view (from Muwatta): [ruling with citation]\n"
|
|
|
" - If they agree: mention the consensus\n"
|
|
|
" - If they differ: present both views objectively without favoring one\n"
|
|
|
" - **For hadith questions**: provide the narration text, source (book name, hadith number)\n "
|
|
|
" - - **If ruling has EXCEPTIONS** (like 'except for...', 'unless...'), YOU MUST include them. "
|
|
|
" If context doesn't show exceptions but the ruling seems absolute, indicate this uncertainty.\n"
|
|
|
" - If the context does not contain relevant information from BOTH madhabs, acknowledge which sources you have "
|
|
|
" (e.g., 'Based on Shafi'i sources only...') and suggest consulting additional madhab resources.\n"
|
|
|
" - **Always end with**: 'This is not a fatwa. Consult a local scholar for guidance specific to your situation.'\n"
|
|
|
" - Always include hadith narration or quran verse as evidence (if it exists) in the final response "
|
|
|
" - Keep answers concise but comprehensive enough to show different scholarly views.\n\n"
|
|
|
|
|
|
"3. If domain = 'insurance':\n"
|
|
|
" - Your knowledge is STRICTLY limited to Etiqa Takaful (Motor and Car policies).\n"
|
|
|
" - First, try to answer ONLY using the provided <context>.\n"
|
|
|
" - **If the answer is not in the context, YOU MUST SAY 'I do not have information on that specific topic.'** Do not make up an answer.\n"
|
|
|
" - If the user asks about other Etiqa products (e.g., medical, travel), you MUST use the 'EtiqaWebSearch' tool.\n"
|
|
|
" - If the user asks about another insurance company (e.g., 'Prudential', 'Takaful Ikhlas'), state that you can only answer about Etiqa Takaful.\n"
|
|
|
" - If the user asks a general insurance question (e.g., 'What is takaful?', 'What is an excess?'), use the 'GeneralWebSearch' tool.\n"
|
|
|
|
|
|
"4. For ALL domains: If the context does not contain the answer, do not make one up. Be honest.\n\n"
|
|
|
"Context:\n"
|
|
|
"{context}"
|
|
|
)
|
|
|
|
|
|
self.qa_prompt = ChatPromptTemplate.from_messages(
|
|
|
[("system", self.qa_system_prompt),
|
|
|
MessagesPlaceholder("chat_history"),
|
|
|
("human", "{input}")]
|
|
|
)
|
|
|
self.llm = ChatGoogleGenerativeAI(model="gemini-2.5-flash",temperature=0.05)
|
|
|
|
|
|
self.refiner_system_prompt = (
|
|
|
"You are an expert search query refiner. Your task is to take a user's question "
|
|
|
"and rewrite it to be a perfect, concise search query for a database. "
|
|
|
"Remove all conversational fluff, emotion, and filler words. "
|
|
|
"Distill the query to its core semantic intent. "
|
|
|
"For example:"
|
|
|
"- 'Hi, I was wondering if I can touch a dog if I found it is cute?' becomes 'ruling on touching a dog in islam'"
|
|
|
"- 'What are the treatments for, like, a common cold?' becomes 'common cold treatment options'"
|
|
|
"- 'Tell me about diabetes' becomes 'what is diabetes'"
|
|
|
"Output ONLY the refined query, nothing else."
|
|
|
)
|
|
|
|
|
|
self.refiner_prompt = ChatPromptTemplate.from_messages([
|
|
|
("system", self.refiner_system_prompt),
|
|
|
("human", "{query}")
|
|
|
])
|
|
|
|
|
|
self.refiner_chain = self.refiner_prompt | self.llm
|
|
|
logger.info("✅ Query Refiner chain initialized.")
|
|
|
|
|
|
|
|
|
self.react_docstore_prompt = hub.pull("aallali/react_tool_priority")
|
|
|
self.answer_validator = AnswerValidatorAgent(self.llm)
|
|
|
|
|
|
self.retriever = None
|
|
|
self.agent_executor = None
|
|
|
self.tools = []
|
|
|
self.domain = "general"
|
|
|
self.answer_validator = None
|
|
|
self.retrieval_agent = None
|
|
|
|
|
|
if config:
|
|
|
logger.info(f"Configuring AgenticQA with provided config: {config}")
|
|
|
try:
|
|
|
collection_name = config["retriever"]["collection_name"]
|
|
|
persist_directory = config["retriever"]["persist_directory"]
|
|
|
self.domain = config.get("domain", "general")
|
|
|
|
|
|
|
|
|
embedding_function = GoogleGenerativeAIEmbeddings(model="models/text-embedding-004")
|
|
|
|
|
|
|
|
|
db_client = Chroma(
|
|
|
persist_directory=persist_directory,
|
|
|
embedding_function=embedding_function,
|
|
|
collection_name=collection_name
|
|
|
)
|
|
|
|
|
|
|
|
|
self.retriever = db_client.as_retriever()
|
|
|
logger.info(f"✅ Successfully created retriever for collection '{collection_name}'")
|
|
|
|
|
|
logger.info("Initializing Swarm components...")
|
|
|
|
|
|
all_docs_data = db_client.get()
|
|
|
docs_for_bm25 = [
|
|
|
Document(page_content=content, metadata=meta)
|
|
|
for content, meta in zip(
|
|
|
all_docs_data['documents'],
|
|
|
all_docs_data['metadatas']
|
|
|
)
|
|
|
]
|
|
|
|
|
|
|
|
|
self.swarm_retriever = SwarmRetriever(self.retriever, docs_for_bm25)
|
|
|
|
|
|
|
|
|
self.complexity_analyzer = LLMComplexityAnalyzer(self.domain, self.llm)
|
|
|
logger.info("✅ Swarm components (Manager + Workers) initialized.")
|
|
|
|
|
|
self.metrics_tracker = MetricsTracker(save_path=f"metrics_{self.domain}.json")
|
|
|
logger.info("✅ Metrics tracker initialized")
|
|
|
|
|
|
self.answer_validator = AnswerValidatorAgent(self.llm, self.domain)
|
|
|
|
|
|
self.qa_chain = create_stuff_documents_chain(self.llm, self.qa_prompt)
|
|
|
|
|
|
self._initialize_agent()
|
|
|
|
|
|
except Exception as e:
|
|
|
logger.error(f"❌ Error during AgenticQA setup for '{self.domain}': {e}", exc_info=True)
|
|
|
else:
|
|
|
logger.warning("⚠️ AgenticQA initialized without a config. Retriever will be None.")
|
|
|
|
|
|
|
|
|
|
|
|
def _run_rag_with_reranking(self, query: str, chat_history: list) -> str:
|
|
|
"""
|
|
|
Enhanced Swarm-RAG pipeline with adaptive retrieval and reranking.
|
|
|
|
|
|
Pipeline:
|
|
|
1. Contextualize query
|
|
|
2. Refine query
|
|
|
3. ComplexityAnalyzer (Manager) determines optimal k
|
|
|
4. SwarmRetriever (Workers) deploys parallel retrievers with k
|
|
|
5. Rerank combined swarm results
|
|
|
6. Filter results by threshold
|
|
|
7. Generate Answer
|
|
|
"""
|
|
|
logger.info(f"--- 🐝 SWARM RAG (with Reranker) PIPELINE RUNNING for query: '{query}' ---")
|
|
|
|
|
|
if not self.reranker or not self.swarm_retriever or not self.complexity_analyzer:
|
|
|
logger.error("Swarm components or Reranker not initialized. Cannot perform RAG.")
|
|
|
return "Error: RAG components are not available."
|
|
|
|
|
|
try:
|
|
|
|
|
|
standalone_query = query
|
|
|
if chat_history:
|
|
|
contextualize_chain = self.contextualize_q_prompt | self.llm
|
|
|
response = contextualize_chain.invoke({"chat_history": chat_history, "input": query})
|
|
|
standalone_query = response.content
|
|
|
logger.info(f"Contextualized query: '{standalone_query}'")
|
|
|
|
|
|
|
|
|
logger.info("Refining query for search...")
|
|
|
response = self.refiner_chain.invoke({"query": standalone_query})
|
|
|
refined_query = response.content.strip()
|
|
|
logger.info(f"Refined query: '{refined_query}'")
|
|
|
|
|
|
|
|
|
|
|
|
analysis = self.complexity_analyzer.analyze(standalone_query)
|
|
|
k = analysis['k']
|
|
|
self._last_complexity_analysis = analysis
|
|
|
logger.info(f"Query complexity: {analysis['complexity'].upper()} | k={k}")
|
|
|
|
|
|
|
|
|
swarm_docs = self.swarm_retriever.retrieve_with_swarm(standalone_query, k=k)
|
|
|
|
|
|
if not swarm_docs:
|
|
|
logger.warning("Swarm Retriever found no documents.")
|
|
|
return "I do not know the answer to that as it is not in my documents."
|
|
|
|
|
|
|
|
|
passages = [
|
|
|
{"id": i, "text": doc.page_content, "meta": doc.metadata}
|
|
|
for i, doc in enumerate(swarm_docs)
|
|
|
]
|
|
|
|
|
|
|
|
|
logger.info(f"Reranking {len(passages)} swarm-retrieved documents...")
|
|
|
rerank_request = RerankRequest(query=standalone_query, passages=passages)
|
|
|
reranked_results = self.reranker.rerank(rerank_request)
|
|
|
|
|
|
top_score = reranked_results[0]['score'] if reranked_results else 0
|
|
|
logger.info(f"Reranking complete. Top score: {top_score:.3f}")
|
|
|
|
|
|
|
|
|
threshold = 0.1
|
|
|
if self.domain == "islamic":
|
|
|
threshold = 0.05
|
|
|
elif self.domain == "medical":
|
|
|
threshold = 0.15
|
|
|
else:
|
|
|
threshold = 0.10
|
|
|
|
|
|
logger.info(f"Using threshold={threshold} for {self.domain} domain")
|
|
|
final_docs = []
|
|
|
worker_contributions = {}
|
|
|
|
|
|
for result in reranked_results:
|
|
|
if result['score'] > threshold:
|
|
|
|
|
|
doc = Document(
|
|
|
page_content=result['text'],
|
|
|
metadata=result.get('meta', {})
|
|
|
)
|
|
|
final_docs.append(doc)
|
|
|
|
|
|
|
|
|
worker = result.get('meta', {}).get('swarm_worker', 'unknown')
|
|
|
worker_contributions[worker] = \
|
|
|
worker_contributions.get(worker, 0) + 1
|
|
|
|
|
|
logger.info(f"Filtered to {len(final_docs)} documents above threshold {threshold}.")
|
|
|
logger.info(f"Final doc contributions: {worker_contributions}")
|
|
|
|
|
|
self.metrics_tracker.log_worker_contribution(worker_contributions)
|
|
|
|
|
|
if not final_docs:
|
|
|
logger.warning("No documents passed the reranker threshold. Returning 'I don't know.'")
|
|
|
return "I do not know the answer to that as my document search found no relevant information."
|
|
|
|
|
|
|
|
|
response = self.qa_chain.invoke({
|
|
|
"context": final_docs,
|
|
|
"chat_history": chat_history,
|
|
|
"input": query,
|
|
|
"domain": self.domain
|
|
|
})
|
|
|
|
|
|
logger.info("🐝 Swarm RAG pipeline complete. Returning answer.")
|
|
|
return response
|
|
|
|
|
|
except Exception as e:
|
|
|
logger.error(f"Error in Swarm RAG pipeline: {e}", exc_info=True)
|
|
|
return "An error occurred while processing your request."
|
|
|
|
|
|
def _initialize_agent(self):
|
|
|
"""Build the ReAct agent"""
|
|
|
"""A helper function to build the agent components."""
|
|
|
|
|
|
logger.info(f"Initializing agent for domain: '{self.domain}'")
|
|
|
self.context_retriever = ContextRetriever(self.retriever)
|
|
|
|
|
|
|
|
|
self._current_chat_history = []
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def rag_tool_wrapper(query: str) -> str:
|
|
|
"""Wrapper to pass chat history to RAG pipeline."""
|
|
|
return self._run_rag_with_reranking(query, self._current_chat_history)
|
|
|
|
|
|
self.tools = [
|
|
|
Tool(
|
|
|
name="RAG",
|
|
|
func=rag_tool_wrapper,
|
|
|
description=(f"Use this tool FIRST to search and answer questions about the {self.domain} domain using internal vector database.")
|
|
|
)
|
|
|
|
|
|
]
|
|
|
|
|
|
|
|
|
if self.domain == "insurance":
|
|
|
|
|
|
etiqa_search_tool = TavilySearchResults(max_results=3)
|
|
|
etiqa_search_tool.description = "Use this tool to search the Etiqa Takaful website for products NOT in the RAG context (e.g., medical, travel)."
|
|
|
|
|
|
|
|
|
original_etiqa_func = etiqa_search_tool.invoke
|
|
|
def etiqa_site_search(query):
|
|
|
return original_etiqa_func(f"site:etiqa.com.my {query}")
|
|
|
|
|
|
self.tools.append(Tool(
|
|
|
name="EtiqaWebSearch",
|
|
|
func=etiqa_site_search,
|
|
|
description=etiqa_search_tool.description
|
|
|
))
|
|
|
|
|
|
|
|
|
self.tools.append(Tool(
|
|
|
name="GeneralWebSearch",
|
|
|
func=TavilySearchResults(max_results=2).invoke,
|
|
|
description="Use this tool as a fallback for general, non-Etiqa questions (e.g., 'What is takaful?')."
|
|
|
))
|
|
|
elif self.domain == "islamic":
|
|
|
|
|
|
islamic_search = TavilySearchResults(max_results=3)
|
|
|
|
|
|
def islamic_trusted_search(query):
|
|
|
|
|
|
sites = "site:muftiwp.gov.my OR site:zulkiflialbakri.com"
|
|
|
return islamic_search.invoke(f"{sites} {query}")
|
|
|
|
|
|
self.tools.append(Tool(
|
|
|
name="TrustedIslamicSearch",
|
|
|
func=islamic_trusted_search,
|
|
|
description=(
|
|
|
"Use this tool if RAG has incomplete or no answer. "
|
|
|
"Searches ONLY trusted Malaysian Islamic sources: "
|
|
|
"Pejabat Mufti Wilayah Persekutuan (muftiwp.gov.my) and "
|
|
|
"Dr Zulkifli Mohamad Al Bakri (zulkiflialbakri.com/category/soal-jawab-agama/). "
|
|
|
"These follow Shafi'i madhab which is official in Malaysia."
|
|
|
)
|
|
|
))
|
|
|
|
|
|
|
|
|
self.tools.append(Tool(
|
|
|
name="GeneralWebSearch",
|
|
|
func=TavilySearchResults(max_results=2).invoke,
|
|
|
description="Last resort: Use only for general Islamic terms or definitions not found in RAG or trusted sources."
|
|
|
))
|
|
|
else:
|
|
|
|
|
|
self.tools.append(Tool(
|
|
|
name="GeneralWebSearch",
|
|
|
func=TavilySearchResults(max_results=2).invoke,
|
|
|
description="Use this tool as a fallback if the RAG tool finds no relevant information or if the query is about a general topic."
|
|
|
))
|
|
|
|
|
|
agent = create_react_agent(llm=self.llm, tools=self.tools, prompt=self.react_docstore_prompt)
|
|
|
|
|
|
self.agent_executor = AgentExecutor.from_agent_and_tools(
|
|
|
agent=agent,
|
|
|
tools=self.tools,
|
|
|
handle_parsing_errors=True,
|
|
|
verbose=True,
|
|
|
return_intermediate_steps=True,
|
|
|
max_iterations=5
|
|
|
)
|
|
|
logger.info(f"✅ Agent Executor(ReAct Agent) created successfully for '{self.domain}'.")
|
|
|
|
|
|
|
|
|
def answer(self, query, chat_history=None):
|
|
|
"""
|
|
|
Process a query using the agent and returns a clean dictionary.
|
|
|
|
|
|
Args:
|
|
|
query (str): User's question
|
|
|
chat_history (list): List of previous messages (AIMessage, HumanMessage)
|
|
|
|
|
|
Returns:
|
|
|
dict: Contains 'answer', 'context', 'validation', 'source', 'thoughts'
|
|
|
"""
|
|
|
if chat_history is None:
|
|
|
chat_history = []
|
|
|
self._current_chat_history = chat_history
|
|
|
if not self.agent_executor:
|
|
|
return {"answer": "Error: Agent not initialized.", "context": "", "validation": (False, "Init failed"), "source": "Error"}
|
|
|
|
|
|
start_time = self.metrics_tracker.start_query()
|
|
|
print(f"\n📝 AGENTIC_QA PROCESSING QUERY: '{query}'")
|
|
|
|
|
|
response = self.agent_executor.invoke({
|
|
|
"input": query,
|
|
|
"chat_history": chat_history,
|
|
|
"domain": self.domain,
|
|
|
"metadata": {
|
|
|
"domain": self.domain
|
|
|
}
|
|
|
})
|
|
|
thoughts= ""
|
|
|
|
|
|
final_answer = response.get("output", "Could not generate an answer")
|
|
|
|
|
|
tool_used = None
|
|
|
if "intermediate_steps" in response:
|
|
|
thought_log= []
|
|
|
for step in response["intermediate_steps"]:
|
|
|
|
|
|
action, observation = step
|
|
|
|
|
|
if isinstance(action, AgentAction) and action.tool:
|
|
|
tool_used = action.tool
|
|
|
|
|
|
|
|
|
thought_log.append(action.log)
|
|
|
thought_log.append(f"\nObservation: {str(observation)}\n---")
|
|
|
|
|
|
thoughts = "\n".join(thought_log)
|
|
|
|
|
|
|
|
|
if tool_used == "RAG":
|
|
|
source = "Etiqa Takaful Database" if self.domain == "insurance" else "Domain Database (RAG)"
|
|
|
elif tool_used == "EtiqaWebSearch":
|
|
|
source = "Etiqa Website Search"
|
|
|
elif tool_used == "TrustedIslamicSearch":
|
|
|
source = "Mufti WP & Dr Zul Search"
|
|
|
elif tool_used == "GeneralWebSearch":
|
|
|
source = "General Web Search"
|
|
|
else:
|
|
|
source = "Agent Logic"
|
|
|
|
|
|
logger.info(f"Tool used: {tool_used}, Source determined: {source}")
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
context = "No RAG context retrieved."
|
|
|
if source.endswith("(RAG)") or source.startswith("Etiqa Takaful Database"):
|
|
|
if self.context_retriever:
|
|
|
context = self.context_retriever.retrieve(query)
|
|
|
else:
|
|
|
context = "RAG tool was used, but ContextRetriever not initialized."
|
|
|
elif "Web" in source:
|
|
|
context = "Web search results were used. See 'Observation' in thoughts log."
|
|
|
|
|
|
validation = self.answer_validator.validate(query, final_answer, source=source)
|
|
|
|
|
|
response_time = self.metrics_tracker.end_query(start_time)
|
|
|
|
|
|
complexity_info = getattr(self, '_last_complexity_analysis', None)
|
|
|
|
|
|
|
|
|
self.metrics_tracker.log_query(
|
|
|
query=query,
|
|
|
domain=self.domain,
|
|
|
source=source,
|
|
|
complexity=complexity_info,
|
|
|
validation=validation,
|
|
|
response_time=response_time,
|
|
|
answer_preview=final_answer
|
|
|
)
|
|
|
return {"answer": final_answer, "context": context, "validation": validation, "source": source, "thoughts": thoughts,"response_time": response_time,
|
|
|
"complexity": complexity_info}
|
|
|
|
|
|
class AnswerValidatorAgent:
|
|
|
def __init__(self, llm, domain="general"):
|
|
|
self.llm = llm
|
|
|
self.domain = domain
|
|
|
self.general_prompt = ChatPromptTemplate.from_messages([
|
|
|
("system", (
|
|
|
"You are an answer validator. Check if the generated answer is factually correct "
|
|
|
"and relevant to the query. Return 'Valid' if the answer is correct and relevant, "
|
|
|
"or 'Invalid: [reason]' if not, where [reason] is a brief explanation of the issue."
|
|
|
)),
|
|
|
("human", "Query: {query}\nAnswer: {answer}")
|
|
|
])
|
|
|
self.medical_prompt = ChatPromptTemplate.from_messages([
|
|
|
("system", (
|
|
|
"You are an answer validator. Check if the generated answer is factually correct, "
|
|
|
"relevant to the query, and consistent with known medical knowledge. "
|
|
|
"Return 'Valid' if the answer is correct and relevant, or 'Invalid: [reason]' if not, "
|
|
|
"where [reason] is a brief explanation of the issue. "
|
|
|
"**Pay close attention to contradictions.** If an answer gives advice and then "
|
|
|
"contradicts it (e.g., 'switch immediately' and then 'always consult your doctor first'), "
|
|
|
"it is **Invalid** because it is unsafe and confusing."
|
|
|
)),
|
|
|
("human", "Query: {query}\nAnswer: {answer}")
|
|
|
])
|
|
|
self.islamic_prompt = ChatPromptTemplate.from_messages([
|
|
|
("system", (
|
|
|
"You are an answer validator for Islamic Fiqh or anything related to Islam. Check if the answer correctly addresses "
|
|
|
"the query based on the provided sources. The answer should be neutral and present "
|
|
|
"the required perspectives (e.g., Shafi'i and Maliki) if available. "
|
|
|
"Return 'Valid' if the answer is correct and relevant, or 'Invalid: [reason]' if not."
|
|
|
)),
|
|
|
("human", "Query: {query}\nAnswer: {answer}")
|
|
|
])
|
|
|
|
|
|
def validate(self, query, answer, source="RAG"):
|
|
|
if self.domain == "insurance":
|
|
|
logger.info("Skipping validation for insurance domain.")
|
|
|
return True, "Validation skipped for insurance domain."
|
|
|
|
|
|
try:
|
|
|
|
|
|
|
|
|
prompt = self.general_prompt
|
|
|
if source == "RAG" or "Database" in source:
|
|
|
if self.domain == "medical":
|
|
|
prompt = self.medical_prompt
|
|
|
elif self.domain == "islamic":
|
|
|
prompt = self.islamic_prompt
|
|
|
|
|
|
response = self.llm.invoke(prompt.format(query=query, answer=answer))
|
|
|
validation = response.content.strip()
|
|
|
logger.info(f"AnswerValidator result for query '{query}': {validation}")
|
|
|
|
|
|
if validation.lower().startswith("valid"):
|
|
|
return True, "Answer is valid and relevant."
|
|
|
elif validation.lower().startswith("invalid"):
|
|
|
reason = validation.split(":", 1)[1].strip() if ":" in validation else "No reason provided."
|
|
|
return False, reason
|
|
|
else:
|
|
|
return False, "Validation response format unexpected."
|
|
|
except Exception as e:
|
|
|
logger.error(f"AnswerValidator error: {str(e)}")
|
|
|
return False, "Validation failed due to error." |