# Author: DeltaVenom
# Commit 72bff80: "Update app code and initialize runtime databases"
import re
import time
import json
from typing import Dict, List, Any, Optional
from collections import defaultdict
from agents.states import AgentState, ExtractedEntities
from rag.retriever import RAGRetriever
from langchain_core.messages import HumanMessage, SystemMessage
from models.llm import LLMFactory
# Compliance disclaimer intended to be appended to answers.
# NOTE(review): the literal below is only a Markdown horizontal rule
# ("\n\n---\n") with no disclaimer wording, and no use of it is visible in
# this section -- confirm whether the disclaimer text was dropped.
COMPLIANCE_DISCLAIMER = (
"\n\n---\n"
)
# Prompting rules shared by the agents: refuse non-insurance topics, never
# invent plans or numbers, no meta-commentary, and ground answers in the
# retrieved context (CIS takes precedence over the brochure). Interpolated
# verbatim into system prompts, e.g. the retrieval agent's.
COMPLIANCE_RULES = """
CRITICAL RULES:
- ❌ OUT-OF-BOUNDS REFUSAL: If the user asks about topics NOT related to insurance (e.g., booking flights, recipes, general news), you MUST politely refuse and state that you can only assist with insurance-related queries.
- ❌ NO hallucinations - if a plan name is not in the provided context, state clearly that you do not have information about that specific plan.
- ❌ NO assumptions - if numerical data or policy details are missing from the context, do NOT invent them. Say "Information not available."
- ❌ NO meta-commentary - start answering the question directly.
- ✅ PROPER REDIRECTION: After refusing an out-of-bounds query, invite the user to ask about insurance products, available plans, or policy definitions.
- ✅ GROUNDING: Only use facts from the provided context. CIS overrides brochure for exclusions/charges.
"""
class AgentNodes:
"""
Enhanced LangGraph nodes implementing the full RAG specification.
"""
def __init__(self):
    """Create the node container; the RAG retriever is built lazily on first use."""
    self.retriever: Optional[RAGRetriever] = None
def _get_retriever(self) -> Optional[RAGRetriever]:
    """
    Return the shared RAG retriever, constructing it on first use.

    Returns:
        The cached ``RAGRetriever`` instance, or ``None`` when construction
        fails. A failed construction is not cached, so the next call retries.
    """
    if not self.retriever:
        try:
            self.retriever = RAGRetriever()
        except Exception as exc:
            # Best-effort: callers treat None as "retrieval unavailable", so do
            # not raise -- but report the cause instead of hiding it.
            print(f"[WARNING] Retriever initialization failed: {exc}")
            return None
    return self.retriever
def reload_retriever(self):
    """Ask the retriever to rebuild its index; does nothing when no retriever is available."""
    active = self._get_retriever()
    if not active:
        return
    active.reload()
def _log_debug(self, msg: str):
    """Write one ``[DEBUG]``-prefixed diagnostic line to standard output."""
    print("[DEBUG]", msg)
# =========================================================================
# NODE 1: Query Rewriter
# =========================================================================
def query_rewriter_node(self, state: AgentState) -> Dict[str, Any]:
    """
    Rewrite a conversational query into a self-contained, RAG-friendly query.

    The last five chat-history turns are sent to a "low" tier LLM so that
    pronouns and implicit references can be resolved.

    Args:
        state: Graph state; reads ``input`` and optionally ``chat_history``.

    Returns:
        ``{"input": <query>}``. The original query is returned unchanged when
        there is no history, or when the LLM returns an empty rewrite.
    """
    query = state["input"]
    history = state.get("chat_history") or []
    if not history:
        # Nothing to resolve against: skip building/calling the LLM entirely.
        return {"input": query}

    llm = LLMFactory.get_llm("low")
    system_prompt = (
        "You are a professional query rewriter for an insurance consultation system. "
        "Rewrite the latest user input to be a standalone search/extraction query.\n\n"
        "RULES:\n"
        "1. If the user provides a missing profile detail (e.g., 'pt 20'), combine it with previous profile data into a recommendation request: "
        "'I want an insurance calculation for [age/gender] with Policy Term 20 years'.\n"
        "2. Resolve all pronouns (it, they) and vague terms (the plan, previous one).\n"
        "3. IMPORTANT: For general questions (e.g., 'What is PPT?') or broad listings (e.g., 'Show all plans'), do NOT inject the user's age/gender if it wasn't requested. Keep the search query clean.\n"
        "4. Only preserve profile details (age, budget) if the user's latest query is a follow-up about a specific calculation or plan recommendation.\n"
        "5. Return ONLY the rewritten query text."
    )
    history_str = "\n".join(f"- {turn}" for turn in history[-5:])  # Last 5 turns
    prompt = f"History:\n{history_str}\n\nLatest: {query}"
    response = llm.invoke([SystemMessage(content=system_prompt), HumanMessage(content=prompt)])
    rewritten = getattr(response, 'content', str(response)).strip()
    # An empty rewrite would erase the user's question for every downstream node.
    return {"input": rewritten or query}
# =========================================================================
# NODE 2: Query Classifier
# =========================================================================
def query_classifier_node(self, state: AgentState) -> Dict[str, Any]:
    """
    Classify the user's intent with ordered keyword rules (no LLM call).

    Intents:
        - list_plans: user wants to see available plans
        - plan_details: user asks about a specific plan
        - compare_plans: user wants to compare multiple plans
        - recommendation: user seeks personalised advice or calculations
        - general_query: general insurance questions

    Args:
        state: Graph state; reads ``input`` and, for follow-up detection,
            ``chat_history``.

    Returns:
        ``{"intent": <intent>, "query_complexity": "low" | "high"}``.
    """
    query = state["input"].lower()

    def mentions_word(term: str) -> bool:
        # Whole-word match: plain substring tests let "star" fire on "start"
        # and "or" fire on "more", "for" or "information".
        return re.search(r"\b" + re.escape(term) + r"\b", query) is not None

    specific_plan_indicators = [
        "star", "guaranteed income", "bharat savings", "premier", "smart value",
        "raksha", "saral jeevan", "edelweiss", "tata", "generali", "pramerica",
        "canara", "indusind", "max life", "hdfc", "icici",
    ]
    has_plan_name = any(mentions_word(plan) for plan in specific_plan_indicators)

    # 1. Comparison -- checked before plan details so that "compare the
    #    benefits of A and B" is not classified as a single-plan question.
    compare_phrases = ["compare", "difference", "better", "versus"]
    compare_words = ["vs", "or"]  # too short for substring matching
    wants_comparison = (any(kw in query for kw in compare_phrases)
                        or any(mentions_word(kw) for kw in compare_words))
    if has_plan_name and wants_comparison:
        return {"intent": "compare_plans", "query_complexity": "high"}

    # 2. Plan details (specific plan plus a detail-style keyword)
    detail_keywords = ["benefit", "feature", "detail", "eligibility"]
    if has_plan_name and any(kw in query for kw in detail_keywords):
        return {"intent": "plan_details", "query_complexity": "low"}

    # 3. Listing queries -- before recommendation so generic words do not misroute.
    listing_keywords = ["list", "show me", "available", "which plans", "what plans",
                        "types of", "providers", "insurers", "all plans"]
    if any(kw in query for kw in listing_keywords):
        return {"intent": "list_plans", "query_complexity": "low"}

    # 4. General FAQ queries -- before recommendation.
    faq_keywords = ["what is", "what does", "explain", "define", "meaning of", "tell me about insurance",
                    "what are the types", "difference between", "how does insurance"]
    if any(kw in query for kw in faq_keywords):
        return {"intent": "general_query", "query_complexity": "low"}

    # 5. Recommendation / calculation: explicit advice phrases or profile data.
    #    Generic words like "term"/"mode" are deliberately excluded.
    recommendation_keywords = ["suggest", "recommend", "best for", "should i", "suitable for",
                               "calculate", "how much will i get", "what will i get",
                               "i am", "i'm", "my age", "my budget", "my premium",
                               "years old", "year old"]
    profile_keywords = ["male", "female", "age =", "age=", "premium =", "premium=",
                        "pt =", "pt=", "ppt =", "ppt="]
    if any(kw in query for kw in recommendation_keywords) or any(kw in query for kw in profile_keywords):
        return {"intent": "recommendation", "query_complexity": "high"}

    # 6. A plan name that no earlier rule claimed
    if has_plan_name:
        return {"intent": "plan_details", "query_complexity": "low"}

    # 7. Follow-up ("more details") within an ongoing conversation
    if (state.get("chat_history") or []) and ("details" in query or "more" in query):
        return {"intent": "plan_details", "query_complexity": "low"}

    return {"intent": "general_query", "query_complexity": "low"}
def entity_extractor_node(self, state: AgentState) -> Dict[str, Any]:
    """
    Extract structured entities from the query and derive metadata filters.

    Providers and insurance types come from keyword maps, plan names from
    ``self._extract_plan_names_from_query`` and the user profile from
    ``self._extract_user_profile`` (only when a profile indicator appears in
    the query or the intent is "recommendation"). The newly extracted profile
    is merged over any profile already in ``state["extracted_entities"]``.

    Args:
        state: Graph state; reads ``input``, ``intent``, ``chat_history`` and
            ``extracted_entities``.

    Returns:
        ``{"extracted_entities": {...}, "metadata_filters": {...}}``. All lists
        keep first-seen order. On an unexpected error the traceback is printed
        and empty entities/filters are returned so the graph can continue.
    """
    try:
        print("[ENTITY DEBUG] ===== STARTING ENTITY EXTRACTION =====")
        raw_query = state["input"]
        query = raw_query.lower()

        provider_map = {
            "edelweiss": "Edelweiss Life",
            "tata": "TATA AIA",
            "tata aia": "TATA AIA",
            "generali": "Generali Central",
            "central": "Generali Central",
            "pramerica": "PRAMERICA"
        }
        # dict.fromkeys de-duplicates while keeping first-seen order; the old
        # list(set(...)) made entity/filter order vary from run to run.
        providers = list(dict.fromkeys(
            name for keyword, name in provider_map.items() if keyword in query
        ))

        type_map = {
            "term": ["Term Insurance", "Term Plan"],
            "ulip": ["Unit Linked Insurance Plan", "ULIP Plan"],
            "wealth": ["Unit Linked Insurance Plan"],
            "savings": ["Savings Plan", "Guaranteed Return"],
            "retirement": ["Retirement and Pension"],
            "pension": ["Retirement and Pension"],
            "health": ["Health Insurance"],
            "group": ["Group Plan"]
        }
        insurance_types = list(dict.fromkeys(
            type_name
            for keyword, type_names in type_map.items() if keyword in query
            for type_name in type_names
        ))

        # Specific plan names come from the LLM-backed helper.
        plan_names = list(dict.fromkeys(self._extract_plan_names_from_query(raw_query) or []))

        previous_entities = state.get("extracted_entities") or {}
        existing_profile = previous_entities.get("user_profile") or {}
        history = state.get("chat_history") or []

        profile_indicators = ["old", "male", "female", "year", "lakh", "rs", "budget", "premium", "invest", "benefit", "pt ", "ppt ", "mode", "age"]
        intent = state.get("intent")
        should_extract = any(ind in query for ind in profile_indicators) or intent == "recommendation"
        print(f"[EXTRACTION DEBUG] Should extract: {should_extract}, Intent: {intent}")

        new_profile = {}
        if should_extract:
            new_profile = self._extract_user_profile(raw_query, history=history) or {}
            print(f"[EXTRACTION DEBUG] Extracted profile: {new_profile}")

        # New values win, but missing/empty/"null" values never clobber data
        # carried over from earlier turns.
        user_profile = dict(existing_profile)
        user_profile.update({
            key: value for key, value in new_profile.items()
            if value is not None and value != "" and value != "null"
        })

        entities: ExtractedEntities = {
            "provider": providers,
            "insurance_type": insurance_types,
            "plan_names": plan_names,
            "user_profile": user_profile
        }

        # Metadata filters get their own list copies so later mutation of
        # one cannot silently change the other.
        filters: Dict[str, Any] = {}
        if providers:
            filters["insurer"] = list(providers)
        if insurance_types:
            filters["insurance_type"] = list(insurance_types)

        print(f"[ENTITY DEBUG] Final entities: {entities}")
        return {
            "extracted_entities": entities,
            "metadata_filters": filters
        }
    except Exception as e:
        # Top-level boundary for this graph node: report the failure and
        # degrade to "no entities" instead of aborting the conversation turn.
        import traceback
        print(f"[ENTITY DEBUG] Error: {e}")
        traceback.print_exc()
        return {
            "extracted_entities": {
                "provider": [],
                "insurance_type": [],
                "plan_names": [],
                "user_profile": {}
            },
            "metadata_filters": {}
        }
def _extract_plan_names_from_query(self, query: str) -> List[str]:
    """
    Ask a "small" tier LLM for the exact insurance plan names in ``query``.

    Args:
        query: The user's (original-case) query text.

    Returns:
        Distinct plan names in reply order, with list markers, quotes and
        markdown emphasis removed; entries of 5 characters or fewer are
        dropped. Empty when the model reports none or answers with a sentence
        instead of a list.
    """
    llm = LLMFactory.get_llm("small")
    system_prompt = (
        "Extract EXACT insurance plan names from the query.\n"
        "If the user is asking to compare, extract BOTH plan names.\n"
        "RULES:\n"
        "- Return one plan name per line\n"
        "- Include insurer prefix if mentioned (e.g., 'TATA AIA Smart Value Income', 'Edelweiss Saral Jeevan Bima')\n"
        "- Return EMPTY if no specific plan names found\n"
        "- Do NOT invent plan names"
    )
    response = llm.invoke([SystemMessage(content=system_prompt), HumanMessage(content=query)])
    result = getattr(response, 'content', str(response)).strip()
    lowered = result.lower()

    if not result or lowered in ('none', 'empty', 'n/a'):
        return []
    # A prose answer or an over-long reply means the model did not return a
    # plain list of names; treat it as "no names".
    if "mentioned in" in lowered or "referring to" in lowered or len(result) > 200:
        return []

    plan_names: List[str] = []
    for raw_line in result.splitlines():
        # Drop list markers such as "1.", "2)", "-", "*", "•". The previous
        # single-character class turned "1. Plan" into ". Plan".
        name = re.sub(r'^\s*(?:\d+[.)]|[-*\u2022])\s*', '', raw_line)
        name = name.strip().strip('"\'*`').strip()
        if len(name) > 5 and name not in plan_names:
            plan_names.append(name)
    return plan_names
def _extract_user_profile(self, query: str, history: Optional[List[str]] = None) -> Dict[str, Any]:
    """
    Extract the user's profile details for insurance recommendations.

    Regex rules over the latest query run first. Only when a critical field
    (age, gender, premium_amount) is still missing is a "medium" tier LLM
    consulted, with up to the last five history turns as extra context; LLM
    values never overwrite regex hits.

    Args:
        query: Latest user message.
        history: Optional prior chat turns; used only by the LLM fallback.

    Returns:
        Dict with any subset of: ``age`` (int; 18-100 when found by regex),
        ``gender`` (lower-case string), ``premium_amount`` (integer amount as
        a string, with lakh/crore/thousand/k multipliers expanded),
        ``policy_term`` / ``payment_term`` ("<n> years") and ``payment_mode``
        (lower-case, e.g. "annual", "monthly", "half-yearly").
    """
    profile: Dict[str, Any] = {}
    query_lower = query.lower()

    # Optional magnitude suffix after an amount: "5 lakh", "2cr", "50k",
    # "5 thousand". The trailing \b keeps "k" from matching an unrelated word.
    unit = r'(?:\s*(?:lakhs?|lacs?|crores?|cr|k|thousand)\b)?'
    half_yearly = r'half[\s_-]*yearly'
    profile_keys = ('age', 'gender', 'premium_amount', 'policy_term', 'payment_term', 'payment_mode')

    def parse_indian_amount(text):
        """Parse amounts like '1 lakh', '5.5 cr', '100,000', '5 thousand' into an int."""
        if not text:
            return None
        text = str(text).lower().replace(',', '')
        if 'lakh' in text or 'lac' in text:
            multiplier = 100000
        elif 'cr' in text:  # also covers 'crore'
            multiplier = 10000000
        elif 'thousand' in text or 'k' in text:
            multiplier = 1000
        else:
            multiplier = 1
        number = re.search(r'\d+(?:\.\d+)?', text)
        return int(float(number.group(0)) * multiplier) if number else None

    def as_years(value):
        """Normalise 20 / "20" / "20 years" to "20 years" (never "20 years years")."""
        number = re.search(r'\d+', str(value))
        return f"{number.group(0)} years" if number else f"{value} years"

    def normalize_mode(value):
        """Lower-case a payment mode; 'yearly' -> 'annual', 'half yearly' -> 'half-yearly'."""
        mode = re.sub(r'[\s_-]+', '-', str(value).strip().lower())
        return 'annual' if mode == 'yearly' else mode

    def llm_candidates(text):
        """Map an LLM reply to {profile_key: raw_value}: JSON object first, else 'key: value' lines."""
        json_match = re.search(r'\{.*\}', text, re.DOTALL)
        if json_match:
            try:
                parsed = json.loads(json_match.group(0))
            except ValueError:  # json.JSONDecodeError subclasses ValueError
                parsed = None
            if isinstance(parsed, dict):
                return {key: parsed.get(key) for key in profile_keys}
        # No usable JSON object (absent or malformed): parse "key: value" lines.
        found = {}
        for line in text.splitlines():
            raw_key, sep, raw_value = line.partition(':')
            if not sep:
                continue
            key = raw_key.strip().strip('"\' ,{').lower()
            value = raw_value.strip().rstrip(',').replace('"', '').replace("'", '').strip().lower()
            if not value or value == 'null':
                continue
            if 'age' in key:
                target = 'age'
            elif 'gender' in key:
                target = 'gender'
            elif 'premium' in key:
                target = 'premium_amount'
            elif 'policy_term' in key or key == 'pt':
                target = 'policy_term'
            elif 'payment_term' in key or key == 'ppt':
                target = 'payment_term'
            elif 'mode' in key:
                target = 'payment_mode'
            else:
                continue
            found.setdefault(target, value)  # first occurrence wins
        return found

    # ========================================================================
    # PRIORITY 1: REGEX EXTRACTION (most reliable)
    # ========================================================================
    age_patterns = [
        r'\bage\s*[=:]\s*(\d+)',            # age=30, age = 30, age: 30
        r'\bage\s+is\s+(\d+)',              # age is 30
        r'i\s+am\s+(\d+)\s+years?\s+old',   # I am 30 years old
        r'(\d+)\s+years?\s+old',            # 30 years old
        r'\bage\s+(\d+)\b',                 # age 30
    ]
    for pattern in age_patterns:
        match = re.search(pattern, query_lower)
        if match and 18 <= int(match.group(1)) <= 100:
            profile['age'] = int(match.group(1))
            break

    # \bmale\b cannot match inside "female" (no word boundary between e and m).
    if (re.search(r'gender\s*[=:]\s*(male|m\b)', query_lower)
            or re.search(r'gender\s+is\s+(male|m\b)', query_lower)
            or re.search(r'\bmale\b', query_lower)):
        profile['gender'] = 'male'
    elif (re.search(r'gender\s*[=:]\s*(female|f\b)', query_lower)
            or re.search(r'gender\s+is\s+(female|f\b)', query_lower)
            or re.search(r'\bfemale\b', query_lower)):
        profile['gender'] = 'female'

    premium_patterns = [
        rf'premium\s*[=:]\s*([\d,\.]+){unit}',                           # premium=100000, premium = 1 lakh
        rf'premium\s+(?:amount\s+)?(?:is\s+)?(?:of\s+)?([\d,\.]+){unit}',
        rf'invest(?:ing)?\s+([\d,\.]+){unit}',
        r'([\d,\.]+)\s*(?:lakh|lac|cr|crore|k|thousand)\s+(?:per year|annual|premium)',
        rf'budget\s*[=:]\s*([\d,\.]+){unit}',
    ]
    for pattern in premium_patterns:
        match = re.search(pattern, query_lower)
        if match:
            # Parse the whole match so a unit suffix sets the multiplier.
            amount = parse_indian_amount(match.group(0))
            if amount and 500 <= amount <= 50000000:
                profile['premium_amount'] = str(amount)
                break

    pt_patterns = [
        r'\bpt\s*[=:]\s*(\d+)',
        r'\bpt\s+(\d+)\b',
        r'policy[\s_]+term\s*[=:]\s*(\d+)',
        r'policy[\s_]+term\s+(?:of\s+)?(\d+)',
        # Bare "term = N", but not the tail of "payment/paying/pay term = N",
        # which is the premium payment term (PPT), not the policy term.
        r'(?<!payment[\s_])(?<!paying[\s_])(?<!pay[\s_])term\s*[=:]\s*(\d+)\b',
    ]
    for pattern in pt_patterns:
        match = re.search(pattern, query_lower)
        if match:
            profile['policy_term'] = as_years(match.group(1))
            break

    ppt_patterns = [
        r'\bppt\s*[=:]\s*(\d+)',
        r'\bppt\s+(\d+)\b',
        r'(?:premium[\s_]+)?payment[\s_]+term\s*[=:]\s*(\d+)',
        r'paying[\s_]+term\s*[=:]\s*(\d+)',
        r'pay[\s_]+term\s*[=:]\s*(\d+)',
    ]
    for pattern in ppt_patterns:
        match = re.search(pattern, query_lower)
        if match:
            profile['payment_term'] = as_years(match.group(1))
            break

    # "half yearly" is listed first so its "yearly" tail is not read as annual.
    mode_patterns = [
        rf'mode\s*[=:]\s*({half_yearly}|monthly|annual|yearly|quarterly)',
        rf'(?:premium\s+)?(?:payment\s+)?mode\s+(?:is\s+)?({half_yearly}|monthly|annual|yearly|quarterly)',
        rf'\b({half_yearly}|monthly|annual|yearly|quarterly)\b',
    ]
    for pattern in mode_patterns:
        match = re.search(pattern, query_lower)
        if match:
            profile['payment_mode'] = normalize_mode(match.group(1))
            break

    # ========================================================================
    # PRIORITY 2: LLM EXTRACTION (only when a critical field is missing)
    # ========================================================================
    if all(field in profile for field in ('age', 'gender', 'premium_amount')):
        return profile

    llm = LLMFactory.get_llm("medium")
    history_context = ""
    if history:
        history_str = "\n".join(f"- {h}" for h in history[-5:])
        history_context = f"\n\nCONVERSATION HISTORY:\n{history_str}"
    system_prompt = (
        "Extract user profile details for insurance recommendations.\n"
        "JSON Output fields (use null if unknown):\n"
        "- age (number)\n"
        "- gender (male/female)\n"
        "- premium_amount (number)\n"
        "- policy_term (number of years)\n"
        "- payment_term (number of years)\n"
        "- payment_mode (Monthly/Annual/Quarterly/Half-Yearly)\n\n"
        "MAPPING RULES:\n"
        "- PT = policy_term\n"
        "- PPT = payment_term\n"
        "- mode = payment_mode\n"
        "- Extract from latest query AND history. Latest query wins conflicts.\n"
        "Return ONLY a raw JSON object."
    )
    prompt = f"LATEST QUERY: {query}{history_context}"
    try:
        response = llm.invoke([SystemMessage(content=system_prompt), HumanMessage(content=prompt)])
        result_text = str(getattr(response, 'content', str(response)))
    except Exception as e:
        # Best-effort fallback: keep whatever the regex pass found.
        print(f"[WARNING] LLM extraction failed: {e}")
        return profile

    for key, raw in llm_candidates(result_text).items():
        # Regex hits win; falsy or "null" LLM values are ignored.
        if key in profile or not raw or raw == 'null':
            continue
        if key == 'age':
            digits = re.search(r'\d+', str(raw))
            value = int(digits.group(0)) if digits else None
        elif key == 'premium_amount':
            amount = parse_indian_amount(raw)
            value = str(amount) if amount else None
        elif key in ('policy_term', 'payment_term'):
            value = as_years(raw)
        elif key == 'payment_mode':
            value = normalize_mode(raw)
        else:  # gender
            value = str(raw).strip().lower()
        if value:
            profile[key] = value
    return profile
# =========================================================================
# NODE 4: Retrieval Router
# =========================================================================
def retrieval_router_node(self, state: AgentState) -> Dict[str, Any]:
    """
    Choose the retrieval strategy for the classified intent.

    list_plans -> "metadata_only", compare_plans -> "cross_plan",
    recommendation -> "section_specific"; plan_details, general_query, any
    unknown intent and a missing intent all map to "plan_level".
    """
    intent = state.get("intent", "plan_details")
    if intent == "list_plans":
        strategy = "metadata_only"
    elif intent == "compare_plans":
        strategy = "cross_plan"
    elif intent == "recommendation":
        strategy = "section_specific"
    else:
        strategy = "plan_level"
    return {"retrieval_strategy": strategy}
# =========================================================================
# NODE 5: Retriever
# =========================================================================
def retriever_node(self, state: AgentState) -> Dict[str, Any]:
    """
    Retrieve context chunks for the current query.

    Extracted plan names are resolved against the index and, except for
    comparisons (where per-plan retrieval handles it), narrow the metadata
    filters via ``product_name``. Results are grouped per plan, CIS chunks are
    moved to the front when the query mentions exclusions/charges/conditions
    and similar, and context strings are formatted (5 chunks per plan for
    comparisons, 3 otherwise).

    Returns:
        ``{"context": [...], "retrieved_chunks": {plan: [chunk, ...]}}``;
        both empty when no retriever is available.
    """
    retriever = self._get_retriever()
    if not retriever:
        return {"context": [], "retrieved_chunks": {}}

    query = state["input"]
    filters = state.get("metadata_filters", {})
    entities = state.get("extracted_entities", {})
    is_comparison = state.get("retrieval_strategy", "plan_level") == "cross_plan"

    # Resolve requested names to the exact product names stored in the index.
    requested_names = entities.get("plan_names") or []
    resolved = []
    if requested_names:
        index_plans = self._list_plans_from_index()
        for requested in requested_names:
            hit = self._find_closest_plan_name(requested, index_plans)
            if hit:
                resolved.append(hit)

    if resolved and not is_comparison:
        # Copy before narrowing so the dict held in state is left untouched.
        filters = filters.copy()
        filters["product_name"] = resolved if len(resolved) > 1 else resolved[0]

    cis_triggers = ("exclusion", "excluded", "not covered", "charges", "fee", "condition", "waiting")
    lowered_query = query.lower()
    wants_cis_first = any(trigger in lowered_query for trigger in cis_triggers)

    if is_comparison:
        # One retrieval per plan; an empty `resolved` lets the helper discover plans.
        chunks_by_plan = self._retrieve_for_comparison(query, filters, entities, matched_plans=resolved)
    else:
        found_docs = retriever.search(query, filters=filters, k=8)
        chunks_by_plan = self._group_by_plan_id(found_docs)

    if wants_cis_first:
        chunks_by_plan = self._boost_cis_chunks(chunks_by_plan)

    per_plan_limit = 5 if is_comparison else 3
    return {
        "context": self._format_context(chunks_by_plan, limit=per_plan_limit),
        "retrieved_chunks": chunks_by_plan,
    }
def _format_context(self, chunks_by_plan: Dict[str, List[Dict]], limit: int = 3) -> List[str]:
    """
    Render retrieved chunks as tagged context lines for an LLM prompt.

    Each line reads ``[<product_name> - <DOCUMENT_TYPE> - <section>] <content>``,
    defaulting to "Unknown", "brochure" and "General" for missing metadata.
    At most ``limit`` chunks are taken from each plan, in mapping order.
    """
    def render(chunk: Dict) -> str:
        text = chunk.get("content", "")
        product = chunk.get("product_name", "Unknown")
        kind = chunk.get("document_type", "brochure")
        part = chunk.get("section", "General")
        return f"[{product} - {kind.upper()} - {part}] {text}"

    return [
        render(chunk)
        for plan_chunks in chunks_by_plan.values()
        for chunk in plan_chunks[:limit]
    ]
def _retrieve_for_comparison(self, query: str, filters: Dict, entities: Dict, matched_plans: Optional[List[str]] = None) -> Dict[str, List]:
    """
    Retrieve chunks for each plan separately in comparison mode.

    The plans to compare come from the first strategy that yields any:
      0. ``matched_plans`` passed in by the caller;
      1. ``entities["plan_names"]`` resolved via ``self._find_closest_plan_name``
         against an unfiltered ``self._list_plans_from_index()``;
      2. Plan A: for each extracted provider (or a single unfiltered pass when
         there is none), the provider's listed plan chosen by
         ``self._find_closest_plan_name(query, ...)``;
      3. Plan B: distinct ``product_name`` values from a broad similarity
         search (k=20), capped at 3;
      4. Plan C: the first 5 plans of ``self._list_plans_from_index(filters)``.
    Each plan is then searched with a focused query filtered by
    ``product_name`` (plus ``insurer`` when found in ``self._cached_plans``).

    Args:
        query: The user's comparison query.
        filters: Metadata filters; only used by the Plan C fallback.
        entities: Extracted entities; reads ``plan_names`` and ``provider``.
        matched_plans: Already-resolved plan names, if the caller has them.

    Returns:
        Mapping keyed by each chunk's ``product_name`` metadata (falling back
        to the resolved plan name) to chunk dicts with ``content``,
        ``product_name``, ``document_type`` and ``section``; at most 8 chunks
        are taken per resolved plan. ``{}`` when no retriever is available.
    """
    retriever = self._get_retriever()
    if not retriever:
        return {}
    # Strategy 1: resolve the extracted plan names against the index.
    if not matched_plans:
        plan_names = entities.get("plan_names") or []
        all_index_plans = self._list_plans_from_index()
        matched_plans = []
        for name in plan_names:
            match = self._find_closest_plan_name(name, all_index_plans)
            if match:
                matched_plans.append(match)
    if not matched_plans:
        # Plan A: Deterministic "List & Match" Discovery
        # For each provider, list all their plans and see if any match the query
        providers = entities.get("provider") or []
        if not providers:
            search_providers = [None]
        else:
            search_providers = providers
        discovered_names = []
        # NOTE(review): this result is not used below; the call may exist only
        # for a side effect (e.g. populating self._cached_plans, read further
        # down) -- confirm before removing it.
        all_plans_in_index = self._list_plans_from_index()
        for prov in search_providers:
            prov_filter = {"insurer": prov} if prov else {}
            prov_plans = self._list_plans_from_index(filters=prov_filter)
            self._log_debug(f"Provider: {prov}, Plans found: {len(prov_plans)}")
            # Try to find which plan from this insurer is mentioned in the query
            match = self._find_closest_plan_name(query, prov_plans)
            self._log_debug(f"Match for {prov}: {match} (In list: {match in prov_plans})")
            if match and match in prov_plans and match not in discovered_names:
                discovered_names.append(match)
        matched_plans = discovered_names
    if not matched_plans:
        # Plan B: Fall back to broad similarity-based discovery as a last resort
        discovery_docs = retriever.search(query, k=20)
        for d in discovery_docs:
            p_name = d.metadata.get("product_name")
            if p_name and p_name not in matched_plans:
                matched_plans.append(p_name)
        # Keep the comparison small when plans were only guessed by similarity.
        matched_plans = matched_plans[:3]
    if not matched_plans:
        # Plan C: Fall back to listing plans matching filters (metadata-only)
        matched_plans = self._list_plans_from_index(filters)[:5]
    chunks_by_plan = defaultdict(list)
    for matched in matched_plans:
        # Use a focused query for each plan instead of the broad comparison query
        # This helps the retriever find relevant feature chunks for the specific plan
        focused_query = f"features, benefits, eligibility and exclusions of {matched}"
        # Find the insurer for this product from cache for better filtering
        matched_insurer = None
        if hasattr(self, "_cached_plans") and self._cached_plans:
            for p_meta in self._cached_plans:
                if p_meta["product_name"] == matched:
                    matched_insurer = p_meta.get("insurer")
                    break
        # IMPORTANT: Search by product_name directly if possible
        search_filters = {"product_name": matched}
        if matched_insurer:
            search_filters["insurer"] = matched_insurer
        # k=20 casts a wide net; the fuzzy product-name check below and the
        # [:8] cap trim the result per plan.
        docs = retriever.search(focused_query, filters=search_filters, k=20)
        plan_chunks = []
        for doc in docs:
            doc_product = doc.metadata.get("product_name", "")
            # Final check for safety, but with accurate fuzzy matching
            if self._find_closest_plan_name(doc_product, [matched]) == matched:
                plan_chunks.append(doc)
        for doc in plan_chunks[:8]:
            # Use product_name for the key instead of plan_id to ensure clean table headers
            plan_name = doc.metadata.get("product_name", matched)
            chunks_by_plan[plan_name].append({
                "content": doc.page_content,
                "product_name": doc.metadata.get("product_name"),
                "document_type": doc.metadata.get("document_type", "brochure"),
                "section": doc.metadata.get("section", "General")
            })
    return dict(chunks_by_plan)
def _group_by_plan_id(self, docs: List) -> Dict[str, List]:
    """
    Bucket retrieved documents into chunk dicts keyed by plan.

    The key is the document's ``product_name`` metadata when present, else its
    ``plan_id``, else ``"unknown"``. Documents keep their retrieval order
    within each bucket, and buckets appear in first-seen order.
    """
    buckets: Dict[str, List] = {}
    for doc in docs:
        meta = doc.metadata
        bucket_key = meta.get("product_name", meta.get("plan_id", "unknown"))
        buckets.setdefault(bucket_key, []).append({
            "content": doc.page_content,
            "product_name": meta.get("product_name"),
            "document_type": meta.get("document_type", "brochure"),
            "section": meta.get("section", "General"),
        })
    return buckets
def _boost_cis_chunks(self, chunks_by_plan: Dict[str, List]) -> Dict[str, List]:
    """
    Return a new mapping in which each plan's CIS chunks come first.

    The reordering is stable: CIS chunks and non-CIS chunks each keep their
    original relative order.
    """
    # sorted() is stable; False (a CIS chunk) sorts before True (anything else).
    return {
        plan: sorted(chunks, key=lambda chunk: chunk.get("document_type") != "cis")
        for plan, chunks in chunks_by_plan.items()
    }
# =========================================================================
# NODE 6: Plan Aggregator
# =========================================================================
def plan_aggregator_node(self, state: AgentState) -> Dict[str, Any]:
    """
    Merge each plan's brochure and CIS chunks, letting CIS win for the
    Exclusions / Charges / Waiting Period / Conditions sections.

    Per plan the resulting order is: CIS chunks from those override
    sections, brochure chunks whose section was not overridden, then the
    remaining CIS chunks (skipping any equal chunk already present). Context
    strings are rebuilt with 5 chunks per plan for the "compare_plans" intent
    (also assumed when no intent is set) and 3 otherwise.
    """
    override_sections = ("Exclusions", "Charges", "Waiting Period", "Conditions")
    aggregated = {}
    for plan_id, chunks in state.get("retrieved_chunks", {}).items():
        cis = [c for c in chunks if c.get("document_type") == "cis"]
        others = [c for c in chunks if c.get("document_type") != "cis"]

        leading = [c for c in cis if c.get("section", "General") in override_sections]
        overridden = {c.get("section", "General") for c in leading}

        merged = leading + [c for c in others if c.get("section", "General") not in overridden]
        for c in cis:
            if c not in merged:
                merged.append(c)
        aggregated[plan_id] = merged

    chunk_limit = 5 if state.get("intent", "compare_plans") == "compare_plans" else 3
    return {
        "retrieved_chunks": aggregated,
        "context": self._format_context(aggregated, limit=chunk_limit),
    }
# =========================================================================
# NODE 7: Listing Agent
# =========================================================================
def listing_agent(self, state: AgentState) -> Dict[str, Any]:
    """
    List the available plans matching the current metadata filters.

    Plan names come straight from the index (``self._list_plans_from_index``)
    so the list is exact; a "small" tier LLM is only used to present it.

    Args:
        state: Graph state; reads ``input`` and ``metadata_filters``.

    Returns:
        ``{"context": [...], "answer": str}``. When nothing matches, a fixed
        apology naming the requested filter values is returned without
        calling the LLM.
    """
    query = state["input"]
    filters = state.get("metadata_filters") or {}

    def as_list(value):
        # Filter values may be a single value or a list of alternatives.
        return list(value) if isinstance(value, (list, tuple)) else [value]

    plans = sorted(set(self._list_plans_from_index(filters) or []))

    if not plans:
        # Join the individual values; str() of a list leaked "['TATA AIA']"
        # into the user-facing reply.
        requested = [str(item) for value in filters.values() for item in as_list(value)]
        filter_desc = ", ".join(requested) if requested else "your criteria"
        answer = f"I couldn't find any plans matching {filter_desc}. Please try a different search."
        return {"context": [], "answer": answer}

    llm = LLMFactory.get_llm("small")
    plans_str = "\n".join(f"- {p}" for p in plans)

    # Describe the filters
    filter_parts = []
    if filters.get("insurer"):
        insurer_list = [str(item) for item in as_list(filters["insurer"])]
        filter_parts.append(f"from {', '.join(insurer_list)}")
    if filters.get("insurance_type"):
        type_list = [str(item) for item in as_list(filters["insurance_type"])]
        filter_parts.append(f"in {', '.join(type_list)} category")
    filter_desc = " ".join(filter_parts)

    system_prompt = (
        "Present the following insurance plans in a clear, friendly manner.\n"
        "RULES:\n"
        "- ONLY include plans from the list below\n"
        "- Group by insurer if multiple insurers present\n"
        "- Use bullet points for clarity\n"
        "- Do NOT mention technical details about data retrieval"
    )
    prompt = f"User asked: {query}\n\nAvailable plans {filter_desc}:\n{plans_str}"
    response = llm.invoke([SystemMessage(content=system_prompt), HumanMessage(content=prompt)])
    answer = getattr(response, 'content', str(response))
    return {"context": [f"Plans: {plans}"], "answer": answer}
# =========================================================================
# NODE 8: Plan Details Agent (Retrieval Agent)
# =========================================================================
def retrieval_agent(self, state: AgentState) -> Dict[str, Any]:
    """
    Answer plan-specific or comparison questions from retrieved policy context.

    Uses ``state["context"]`` when present. Otherwise it searches the
    retriever for the top 10 chunks, adding any extracted ``plan_names`` as a
    ``product_name`` filter on top of ``metadata_filters``. Each retrieved
    chunk is prefixed with ``[product_name]`` so the LLM can tell plans apart.

    Args:
        state: Graph state; reads ``input``, ``context``,
            ``extracted_entities``, ``metadata_filters`` and
            ``query_complexity`` (selects the LLM size, default "low").

    Returns:
        ``{"answer": str}``; a fixed fallback message (no LLM call) when no
        context can be found.
    """
    query = state["input"]
    context = state.get("context") or []
    entities = state.get("extracted_entities") or {}

    if not context:
        retriever = self._get_retriever()
        if retriever:
            # Work on a copy: adding the plan-name filter must not leak into
            # the caller's ``metadata_filters`` dict held in the graph state.
            filters = dict(state.get("metadata_filters") or {})
            plan_names = entities.get("plan_names") or []
            if plan_names:
                filters["product_name"] = plan_names
            if filters:
                docs = retriever.search(query, filters=filters, k=10)
            else:
                docs = retriever.search(query, k=10)
            context = [
                f"[{d.metadata.get('product_name', 'Unknown')}] {d.page_content}"
                for d in (docs or [])
            ]

    if not context:
        return {
            "answer": "I couldn't find specific information about that plan in my knowledge base. "
                      "Could you please provide more details or try asking about a different plan? "
                      "You can also ask me to list available plans."
        }

    llm = LLMFactory.get_llm(state.get("query_complexity", "low"))
    context_str = "\n\n".join(context)
    system_prompt = f"""You are an Insurance Policy Specialist providing accurate information.
{COMPLIANCE_RULES}
STRICT GROUNDING RULES:
- Answer the user's question using the Policy Context provided to you.
- If the requested plan is NOT mentioned in the Policy Context, say: "I'm sorry, but I couldn't find information regarding [Plan Name] in our current policy database. Please verify the name or ask me to list available plans."
- If the question is about non-insurance topics, refuse using the OUT-OF-BOUNDS REFUSAL rule.
- Structure your response with clear headings and bullet points.
"""
    prompt = f"Policy Context:\n{context_str}\n\nUser Question: {query}"
    response = llm.invoke([SystemMessage(content=system_prompt), HumanMessage(content=prompt)])
    answer = getattr(response, 'content', str(response))
    return {"answer": answer}
# =========================================================================
# NODE 9: Recommendation Agent (Advisory)
# =========================================================================
def advisory_agent(self, state: AgentState) -> Dict[str, Any]:
    """
    Provide a personalised recommendation grounded in retrieved context.

    For ``intent == "recommendation"`` the user profile must contain age,
    gender, annual premium, policy term, premium payment term and payment
    mode. If any are missing, the node returns a question asking for them
    and does not call the LLM. When the profile is complete (or, for other
    intents, when age, premium and policy term are all known), the plan
    calculator is run. Its output is stored in ``state["reasoning_output"]``.
    If that output is a dict, or JSON for a dict, with a ``summary_table``,
    the table is injected into the prompt as mandatory numerical grounding.

    Returns:
        ``{"answer": str}``.
    """
    query = state["input"]
    context = state.get("context") or []
    entities = state.get("extracted_entities") or {}
    user_profile = entities.get("user_profile") or {}

    if state.get("intent") == "recommendation":
        # (profile key, wording used when asking the user for it)
        required_fields = (
            ("age", "age"),
            ("gender", "gender"),
            ("premium_amount", "annual premium amount"),
            ("policy_term", "policy term (PT)"),
            ("payment_term", "premium payment term (PPT)"),
            ("payment_mode", "premium payment mode"),
        )
        missing = [label for key, label in required_fields if not user_profile.get(key)]
        self._log_debug(f"[ADVISORY] user_profile={user_profile} missing={missing}")
        if missing:
            # "a", "a and b", "a, b and c"
            if len(missing) == 1:
                missing_str = missing[0]
            else:
                missing_str = f"{', '.join(missing[:-1])} and {missing[-1]}"
            return {"answer": f"To provide you with specific benefit figures and a professional recommendation, I need a few more details: **{missing_str}**. Could you please provide these?"}
        run_calculator = True
    else:
        # Not an explicit recommendation, but show numbers if the profile
        # already has enough to compute them.
        run_calculator = bool(
            user_profile.get("age")
            and user_profile.get("premium_amount")
            and user_profile.get("policy_term")
        )

    if run_calculator:
        calc_result = self.plan_calculator_tool(state)
        state["reasoning_output"] = calc_result.get("reasoning_output", "")

    calculation_info = ""
    raw_calc = state.get("reasoning_output", "")
    if raw_calc:
        calc_data = raw_calc
        if isinstance(raw_calc, (str, bytes)):
            try:
                calc_data = json.loads(raw_calc)
            except ValueError:
                # Plain-text results (e.g. "Insufficient data ...") carry no table.
                calc_data = None
        table = calc_data.get("summary_table", "") if isinstance(calc_data, dict) else ""
        if table:
            calculation_info = f"\n\n### MANDATORY GROUNDING: NUMERICAL DATA TABLE\n{table}\n(PRIORITIZE THESE PLANS AND NUMBERS OVER ANY TEXT BELOW)\n"

    context_str = "\n\n".join(context) if context else "No plans found."
    profile_info = ""
    if user_profile:
        profile_parts = [f"{k}: {v}" for k, v in user_profile.items() if v]
        if profile_parts:
            profile_info = f"\n\nUser Profile: {', '.join(profile_parts)}"

    llm = LLMFactory.get_llm("large")
    system_prompt = f"""You are an Expert Insurance Advisor.
{COMPLIANCE_RULES}
RECOMMENDATION RULES:
- 🚨 PRIORITY 1: Recommending plans from the 'MANDATORY GROUNDING' table above. Use those EXACT numbers.
- 🚨 PRIORITY 2: Only provide benefit calculations for the plans in the GROUNDING table.
- If the user asks about plans not in the table for calculation, say you don't have calculation data for them yet.
- If the query is out-of-bounds, use the OUT-OF-BOUNDS REFUSAL rule.
- NEVER say "Not Available" if numbers exist in the grounding table.
- Be consultative and grounded.
"""
    prompt = f"{calculation_info}\n\nPolicy Context:\n{context_str}{profile_info}\n\nUser Question: {query}"
    response = llm.invoke([SystemMessage(content=system_prompt), HumanMessage(content=prompt)])
    answer = getattr(response, 'content', str(response))
    return {"answer": answer}
# =========================================================================
# NODE 11: General Query Agent (FAQ)
# =========================================================================
def faq_agent(self, state: AgentState) -> Dict[str, Any]:
    """
    Answer general insurance questions (glossary terms, concepts).

    Uses any context already in the state. Otherwise, when a retriever is
    available, it pulls the top 3 matching chunks as optional background.

    Returns:
        ``{"answer": str}`` holding the low-tier LLM's reply.
    """
    llm = LLMFactory.get_llm("low")
    question = state["input"]
    snippets = state.get("context", [])

    if not snippets:
        retriever = self._get_retriever()
        # A narrow top-3 search keeps background focused for short FAQ answers.
        found = retriever.search(question, k=3) if retriever else None
        if found:
            snippets = [doc.page_content for doc in found]

    background = "\n\n".join(snippets) if snippets else ""

    system_prompt = f"""You are an Insurance Helpdesk Assistant.
{COMPLIANCE_RULES}
INSTRUCTIONS:
- For insurance terminology: Provide a clear, concise definition.
- 🚨 STRICT RULE: If the user asks about ANYTHING non-insurance related (e.g., travel tickets, cooking, etc.), you MUST refuse and redirect to insurance topics.
- 🚨 NO HALLUCINATION: If the term is not common insurance knowledge and not in context, say you don't know rather than guessing.
- Keep the total response under 150 words.
Common Insurance Terms to use as reference:
- **Policy Term (PT)**: The total duration for which the policy remains active.
- **Premium Payment Term (PPT)**: The duration during which premiums must be paid.
- **Maturity Benefit**: The lump sum amount paid when the policy matures.
- **Sum Assured**: The guaranteed amount payable on death or maturity.
"""
    if background:
        user_message = f"Context (if relevant):\n{background}\n\nUser Question: {question}"
    else:
        user_message = f"User Question: {question}"

    reply = llm.invoke([SystemMessage(content=system_prompt), HumanMessage(content=user_message)])
    return {"answer": getattr(reply, 'content', str(reply))}
# =========================================================================
# NODE 12: Guardrail
# =========================================================================
def guardrail_node(self, state: AgentState) -> Dict[str, Any]:
    """
    Final output step: guarantee a non-empty answer and append the disclaimer.

    This node does not check grounding or filter content; grounding is only
    requested through the agents' prompts. It:
    - substitutes a fallback apology when ``state["answer"]`` is missing or empty;
    - appends ``COMPLIANCE_DISCLAIMER`` unless the answer already ends with
      it, so running the node twice does not duplicate it.

    Returns:
        ``{"answer": str}``.
    """
    answer = state.get("answer") or (
        "I apologize, but I couldn't generate a response. Please try rephrasing your question."
    )
    # ``endswith`` rather than ``in``: the disclaimer is currently a markdown
    # horizontal rule, which LLM answers frequently contain mid-text. A
    # substring check would then wrongly skip appending it.
    if not answer.endswith(COMPLIANCE_DISCLAIMER):
        answer = answer + COMPLIANCE_DISCLAIMER
    return {"answer": answer}
# =========================================================================
# TOOL: Plan Calculator Tool
# =========================================================================
def plan_calculator_tool(self, state: AgentState) -> Dict[str, Any]:
    """
    Compute plan benefits for the user via the plans API calculator.

    Age, gender and premium come from ``extracted_entities.user_profile``.
    Any that are missing are recovered heuristically from the query text:

    - age: "30 years old" / "aged 30" / "30 male" first, then any standalone
      two-digit number (never the "50" inside "50,000" or "50.5");
    - gender: the words "female" / "male";
    - premium: amounts such as "50000", "50,000", "5 lakh", "1.2 cr", "50k"
      or "50 thousand". The number already taken as the age, bare durations
      ("20 years") and a bare repeat of the age are skipped. Amounts with an
      explicit unit are preferred.

    Policy term, payment term and payment mode are passed through from the
    profile unchanged. Exactly one resolved plan ID targets that plan.
    Zero or several IDs pass ``plan_id=None``, which is intended to calculate
    for all default plans.

    Returns:
        ``{"reasoning_output": <calculator output>}`` on success, or
        ``{"reasoning_output": "Insufficient data ..."}`` when age, gender or
        a positive premium cannot be determined (the API is then not called).
    """
    # A number (optional thousands separators / decimals) plus an optional unit.
    # The unit must end on a word boundary, so the "k" of "kids" or "looking"
    # is never read as "thousand".
    amount_re = re.compile(
        r"(\d+(?:,\d+)*(?:\.\d+)?)\s*(?:(lakhs?|lacs?|crores?|cr|thousand|k)\b)?"
    )
    # Age patterns in priority order: explicit phrasings first, then any
    # two-digit number that is not part of a larger/comma/decimal number.
    age_res = (
        re.compile(r"(?<![\d,.])\b(\d{2})\s*-?\s*(?:years?|yrs?)\s*-?\s*old\b"),
        re.compile(r"\baged?\s*(?:is|of|:|=)?\s*(\d{2})\b"),
        re.compile(r"(?<![\d,.])\b(\d{2})\s*,?\s*(?:male|female)\b"),
        re.compile(r"(?<!\d)(?<!\d[,.])(\d{2})(?![,.]?\d)"),
    )
    duration_re = re.compile(r"\s*(?:years?|yrs?|months?)\b")

    def to_amount(match) -> float:
        """Convert an ``amount_re`` match into a plain numeric amount."""
        value = float(match.group(1).replace(",", ""))
        unit = match.group(2) or ""
        if unit.startswith(("lakh", "lac")):
            value *= 100000
        elif unit.startswith("cr"):
            value *= 10000000
        elif unit in ("k", "thousand"):
            value *= 1000
        return value

    entities = state.get("extracted_entities") or {}
    user_profile = entities.get("user_profile") or {}
    plan_names = entities.get("plan_names") or []
    query = state["input"].lower()

    # --- 1. Age: profile value (digits extracted if it is text), else query.
    age = user_profile.get("age")
    if age and not isinstance(age, (int, float)):
        digits = re.search(r"\d+", str(age))
        age = int(digits.group()) if digits else None
    age_span = None  # where in the query the age was found, if it was
    if not age:
        for pattern in age_res:
            found = pattern.search(query)
            if found:
                age, age_span = int(found.group(1)), found.span(1)
                break

    # --- 2. Gender ("\bmale" cannot match inside "female").
    gender = user_profile.get("gender")
    if not gender:
        if re.search(r"\bfemale", query):
            gender = "female"
        elif re.search(r"\bmale", query):
            gender = "male"

    # --- 3. Premium.
    premium = user_profile.get("premium_amount")
    clean_premium = 0.0
    if not premium:
        with_unit, bare = [], []
        for found in amount_re.finditer(query):
            start, end = found.span(1)
            if age_span and start < age_span[1] and age_span[0] < end:
                continue  # this number was already taken as the age
            if found.group(2):
                with_unit.append(found)
            elif not duration_re.match(query, end) and not (age and to_amount(found) == float(age)):
                # Skip "20 years"-style terms and a bare repeat of the age.
                bare.append(found)
        candidates = with_unit or bare
        if candidates:
            clean_premium = to_amount(candidates[0])
    elif isinstance(premium, (int, float)):
        clean_premium = float(premium)
    else:
        found = amount_re.search(str(premium).lower())
        if found:
            clean_premium = to_amount(found)

    if not (age and gender and clean_premium > 0):
        return {"reasoning_output": "Insufficient data (age, gender, or premium) to calculate benefits."}

    # Imported lazily, and only once the inputs are known to be sufficient.
    from api.plans import get_plan_benefits_tool, resolve_plan_id

    # --- 4. Resolve plan IDs; one ID targets that plan, otherwise None.
    pids = [pid for pid in map(resolve_plan_id, plan_names) if pid]
    target_plan_id = pids[0] if len(pids) == 1 else None

    # --- 5. Execute tool.
    calculation_json = get_plan_benefits_tool(
        age=int(age),
        gender=str(gender),
        premium_amount=clean_premium,
        plan_id=target_plan_id,
        policy_term=user_profile.get("policy_term"),
        payment_term=user_profile.get("payment_term"),
        payment_mode=user_profile.get("payment_mode")
    )
    return {"reasoning_output": calculation_json}
# =========================================================================
# HELPER METHODS
# =========================================================================
def _list_plans_from_index(self, filters: Optional[Dict] = None) -> List[str]:
    """
    Return the sorted, unique product names in the index that match ``filters``.

    Plan metadata (``product_name``, ``insurer``, ``insurance_type``) is read
    directly from the vector store's docstore and cached on the instance in
    ``self._cached_plans``, with one entry per distinct combination. The
    cache is rebuilt when it is ``None``, when ``retriever.vector_store`` is
    a different object, or when the number of stored documents changes
    (e.g. after a reload).

    Matching per filter key is case-insensitive and whitespace-trimmed; a
    list value means "any of":
    - ``product_name``: substring match in either direction;
    - any other key: exact match.
    Filter values that are ``None``, ``""`` or ``[]`` impose no constraint.
    A cached entry with no value for a filtered key never matches.

    Returns:
        Sorted list of product names, or ``[]`` when no retriever is
        available or the index cannot be read.
    """
    retriever = self._get_retriever()
    if not retriever:
        return []
    try:
        store = retriever.vector_store
        # NOTE(review): relies on the docstore's private ``_dict`` mapping,
        # presumably LangChain's InMemoryDocstore; confirm on upgrades.
        doc_map = store.docstore._dict
        cache_key = (id(store), len(doc_map))
        if (getattr(self, "_cached_plans", None) is None
                or getattr(self, "_cached_plans_key", None) != cache_key):
            seen = set()
            entries = []
            for doc in doc_map.values():
                meta = doc.metadata
                p_name = meta.get("product_name")
                if not p_name:
                    continue
                entry = {
                    "product_name": p_name,
                    "insurer": meta.get("insurer"),
                    "insurance_type": meta.get("insurance_type"),
                }
                # Many chunks share one plan's metadata; keep one entry each.
                dedupe_key = tuple(str(v) for v in entry.values())
                if dedupe_key not in seen:
                    seen.add(dedupe_key)
                    entries.append(entry)
            self._cached_plans = entries
            self._cached_plans_key = cache_key

        active = {
            k: v for k, v in (filters or {}).items()
            if not (v is None or v == "" or v == [])
        }

        def matches(meta: Dict) -> bool:
            """True when ``meta`` satisfies every active filter."""
            for key, wanted in active.items():
                raw = meta.get(key)
                # None means "no value" (not the string "none").
                doc_val = "" if raw is None else str(raw).lower().strip()
                if not doc_val:
                    return False
                wanted_vals = [
                    str(w).lower().strip()
                    for w in (wanted if isinstance(wanted, list) else [wanted])
                ]
                if key == "product_name":
                    ok = any(w in doc_val or doc_val in w for w in wanted_vals)
                else:  # insurer, insurance_type, ...: exact match only
                    ok = doc_val in wanted_vals
                if not ok:
                    return False
            return True

        return sorted({m["product_name"] for m in self._cached_plans if matches(m)})
    except Exception as exc:
        # Best-effort listing: report and fall back to "no plans".
        self._log_debug(f"_list_plans_from_index failed: {exc!r}")
        return []
def _find_closest_plan_name(self, query_plan: str, all_plans: List[str]) -> Optional[str]:
    """
    Map a user-supplied plan name to the closest known plan name.

    Tries, in order:
    1. case-insensitive exact match;
    2. containment in either direction after normalisation (lowercase;
       spaces, hyphens and underscores removed; "edelweisslife" ->
       "edelweiss");
    3. the plan sharing the most significant words with the query (at least
       2; the first plan wins ties).
    Empty normalised names are skipped in step 2. Otherwise an empty query,
    or an empty entry in ``all_plans``, would trivially "contain" or be
    contained by every name.

    Returns:
        The matched plan name, or ``query_plan`` unchanged when nothing
        matches or ``all_plans`` is empty.
    """
    if not all_plans:
        return query_plan

    def normalize(text: str) -> str:
        return (text.lower().replace(" ", "").replace("-", "").replace("_", "")
                .replace("edelweisslife", "edelweiss"))

    # 1. Exact match (case insensitive)
    lowered = query_plan.lower()
    for plan in all_plans:
        if plan.lower() == lowered:
            return plan

    # 2. Normalized containment match (high confidence)
    query_norm = normalize(query_plan)
    if query_norm:
        for plan in all_plans:
            plan_norm = normalize(plan)
            if plan_norm and (plan_norm in query_norm or query_norm in plan_norm):
                return plan

    # 3. Word overlap (lower-confidence fallback). Insurer names are
    # deliberately NOT stop words: they distinguish similarly named plans
    # (like 'Saral Jeevan Bima') across different companies.
    stop_words = {"plan", "insurance", "the", "a", "of", "with", "compare", "is", "between"}
    query_significant = set(lowered.split()) - stop_words
    best_match = None
    max_overlap = 0
    for plan in all_plans:
        overlap = len(query_significant & (set(plan.lower().split()) - stop_words))
        if overlap > max_overlap:
            best_match, max_overlap = plan, overlap

    # Require at least 2 shared significant words.
    return best_match if max_overlap >= 2 else query_plan
# Singleton instance: importers of this module share this one AgentNodes
# object. Its retriever is created lazily on first use, not at import time.
nodes = AgentNodes()