import re import time import json from typing import Dict, List, Any, Optional from collections import defaultdict from agents.states import AgentState, ExtractedEntities from rag.retriever import RAGRetriever from langchain_core.messages import HumanMessage, SystemMessage from models.llm import LLMFactory # Compliance disclaimer to append to all answers COMPLIANCE_DISCLAIMER = ( "\n\n---\n" ) # Prompting rules for all agents COMPLIANCE_RULES = """ CRITICAL RULES: - ❌ OUT-OF-BOUNDS REFUSAL: If the user asks about topics NOT related to insurance (e.g., booking flights, recipes, general news), you MUST politely refuse and state that you can only assist with insurance-related queries. - ❌ NO hallucinations - if a plan name is not in the provided context, state clearly that you do not have information about that specific plan. - ❌ NO assumptions - if numerical data or policy details are missing from the context, do NOT invent them. Say "Information not available." - ❌ NO meta-commentary - start answering the question directly. - ✅ PROPER REDIRECTION: After refusing an out-of-bounds query, invite the user to ask about insurance products, available plans, or policy definitions. - ✅ GROUNDING: Only use facts from the provided context. CIS overrides brochure for exclusions/charges. """ class AgentNodes: """ Enhanced LangGraph nodes implementing the full RAG specification. """ def __init__(self): self.retriever = None def _get_retriever(self) -> Optional[RAGRetriever]: """Lazy initialization of retriever.""" if not self.retriever: try: self.retriever = RAGRetriever() except Exception: return None return self.retriever def reload_retriever(self): """Triggers a reload of the retriever's index.""" retriever = self._get_retriever() if retriever: retriever.reload() def _log_debug(self, msg: str): """Internal debug logger.""" print(f"[DEBUG] {msg}") # ========================================================================= # NODE 1: Query Rewriter # ========================================================================= def query_rewriter_node(self, state: AgentState) -> Dict[str, Any]: """ Rewrites conversational queries into self-contained, RAG-friendly queries. Uses conversation history to resolve pronouns and implicit context. """ llm = LLMFactory.get_llm("low") query = state["input"] history = state.get("chat_history", []) if not history: return {"input": query} system_prompt = ( "You are a professional query rewriter for an insurance consultation system. " "Rewrite the latest user input to be a standalone search/extraction query.\n\n" "RULES:\n" "1. If the user provides a missing profile detail (e.g., 'pt 20'), combine it with previous profile data into a recommendation request: " "'I want an insurance calculation for [age/gender] with Policy Term 20 years'.\n" "2. Resolve all pronouns (it, they) and vague terms (the plan, previous one).\n" "3. IMPORTANT: For general questions (e.g., 'What is PPT?') or broad listings (e.g., 'Show all plans'), do NOT inject the user's age/gender if it wasn't requested. Keep the search query clean.\n" "4. Only preserve profile details (age, budget) if the user's latest query is a follow-up about a specific calculation or plan recommendation.\n" "5. Return ONLY the rewritten query text." ) history_str = "\n".join([f"- {h}" for h in history[-5:]]) # Last 5 turns prompt = f"History:\n{history_str}\n\nLatest: {query}" response = llm.invoke([SystemMessage(content=system_prompt), HumanMessage(content=prompt)]) rewritten = getattr(response, 'content', str(response)).strip() return {"input": rewritten} # ========================================================================= # NODE 2: Query Classifier # ========================================================================= def query_classifier_node(self, state: AgentState) -> Dict[str, Any]: """ Classifies user intent into: - list_plans: User wants to see available plans - plan_details: User asks about a specific plan - compare_plans: User wants to compare multiple plans - recommendation: User seeks personalized advice - general_query: General insurance questions """ llm = LLMFactory.get_llm("small") query = state["input"].lower() # 1. Plan Details (specific plan mentioned) # Check specific plan indicators specific_plan_indicators = ["star", "guaranteed income", "bharat savings", "premier", "smart value", "raksha", "saral jeevan", "edelweiss", "tata", "generali", "pramerica", "canara", "indusind", "max life", "hdfc", "icici"] has_plan_name = any(plan in query for plan in specific_plan_indicators) if has_plan_name and ("benefit" in query or "feature" in query or "detail" in query or "eligibility" in query): return {"intent": "plan_details", "query_complexity": "low"} # 2. Comparison (compare, difference, vs) compare_keywords = ["compare", "difference", "better", "vs", "versus", "or"] if any(kw in query for kw in compare_keywords) and has_plan_name: return {"intent": "compare_plans", "query_complexity": "high"} # 3. Listing queries - CHECK BEFORE RECOMMENDATION (to avoid "term" matching) listing_keywords = ["list", "show me", "available", "which plans", "what plans", "types of", "providers", "insurers", "all plans"] if any(kw in query for kw in listing_keywords): return {"intent": "list_plans", "query_complexity": "low"} # 4. General FAQ queries - CHECK BEFORE RECOMMENDATION # These include "what is", "what does", "explain", "define" faq_keywords = ["what is", "what does", "explain", "define", "meaning of", "tell me about insurance", "what are the types", "difference between", "how does insurance"] if any(kw in query for kw in faq_keywords): return {"intent": "general_query", "query_complexity": "low"} # 5. Recommendation/Calculation queries # IMPORTANT: Only specific recommendation indicators, avoiding generic words like "term", "mode" recommendation_keywords = ["suggest", "recommend", "best for", "should i", "suitable for", "calculate", "how much will i get", "what will i get", "i am", "i'm", "my age", "my budget", "my premium", "years old", "year old"] # Also check for profile indicators (age, gender) combined with numbers/plan mention has_profile = any(kw in query for kw in ["male", "female", "age =", "age=", "premium =", "premium=", "pt =", "pt=", "ppt =", "ppt="]) has_numbers_with_context = any(kw in query for kw in recommendation_keywords) or has_profile if has_numbers_with_context: return {"intent": "recommendation", "query_complexity": "high"} # 6. Fallback for explicit plan names if not caught by others if has_plan_name: return {"intent": "plan_details", "query_complexity": "low"} # 7. Follow-up detection if len(state.get("chat_history", [])) > 0 and ("details" in query or "more" in query): return {"intent": "plan_details", "query_complexity": "low"} # Default fallback return {"intent": "general_query", "query_complexity": "low"} # LLM-based classification for ambiguous cases # This section is removed as per the instructions. # system_prompt = ( # "Classify the user's insurance query into ONE of:\n" # "- 'plan_details': Asking about features, benefits, eligibility of a SPECIFIC plan (should retrieve from documents)\n" # "- 'list_plans': Wants to know WHICH plans are available from an insurer or category\n" # "- 'recommendation': Seeks personalized benefit calculations or plan suggestions based on their profile (age, gender, premium)\n" # "- 'general_query': General insurance terminology, concepts, or FAQs (not specific plans)\n\n" # "IMPORTANT: 'What are the benefits of [Plan Name]' is 'plan_details', NOT 'recommendation'\n" # "Return ONLY the category name." # ) # response = llm.invoke([SystemMessage(content=system_prompt), HumanMessage(content=query)]) # intent = getattr(response, 'content', str(response)).lower().strip() # valid_intents = ['list_plans', 'plan_details', 'recommendation', 'general_query'] # if intent not in valid_intents: # intent = "plan_details" # Default fallback # return {"intent": intent} def entity_extractor_node(self, state: AgentState) -> Dict[str, Any]: """ Extracts structured entities from the query. """ # DEBUG: Write to file to ensure we see it # try: # with open("extraction_debug.log", "a") as f: # f.write(f"\n\n[TIME] Execution at {time.time()}\n") # f.write(f"[INPUT] {state.get('input', 'NO INPUT')}\n") # f.write(f"[INTENT] {state.get('intent', 'NOT SET')}\n") # except: pass # DEBUG: Write to file to ensure we see it try: with open("extraction_debug.log", "a") as f: f.write(f"\n\n[TIME] Execution at {time.time()}\n") f.write(f"[INPUT] {state.get('input', 'NO INPUT')}\n") f.write(f"[INTENT] {state.get('intent', 'NOT SET')}\n") except: pass try: print(f"[ENTITY DEBUG] ===== STARTING ENTITY EXTRACTION =====") # FORCE extraction for debugging if needed, but rely on logic try: with open("extraction_debug.log", "a") as f: f.write(f"[STATUS] Starting extraction logic...\n") except: pass query = state["input"].lower() # Extract providers provider_map = { "edelweiss": "Edelweiss Life", "tata": "TATA AIA", "tata aia": "TATA AIA", "generali": "Generali Central", "central": "Generali Central", "pramerica": "PRAMERICA" } providers = [] for keyword, name in provider_map.items(): if keyword in query and name not in providers: providers.append(name) # Extract insurance types type_map = { "term": ["Term Insurance", "Term Plan"], "ulip": ["Unit Linked Insurance Plan", "ULIP Plan"], "wealth": ["Unit Linked Insurance Plan"], "savings": ["Savings Plan", "Guaranteed Return"], "retirement": ["Retirement and Pension"], "pension": ["Retirement and Pension"], "health": ["Health Insurance"], "group": ["Group Plan"] } insurance_types = [] for keyword, types in type_map.items(): if keyword in query: for t in types: if t not in insurance_types: insurance_types.append(t) # Extract specific plan names using LLM plan_names = self._extract_plan_names_from_query(state["input"]) # Extract user profile (Merge with existing data in state AND chat history) existing_profile = state.get("extracted_entities", {}).get("user_profile", {}) history = state.get("chat_history", []) new_profile = {} # Always attempt extraction if it's a recommendation or if profile indicators exist profile_indicators = ["old", "male", "female", "year", "lakh", "rs", "budget", "premium", "invest", "benefit", "pt ", "ppt ", "mode", "age"] should_extract = any(ind in query for ind in profile_indicators) or state.get("intent") == "recommendation" print(f"[EXTRACTION DEBUG] Should extract: {should_extract}, Intent: {state.get('intent')}") try: with open("extraction_debug.log", "a") as f: f.write(f"[STATUS] Should extract: {should_extract}\n") except: pass if should_extract: new_profile = self._extract_user_profile(state["input"], history=history) print(f"[EXTRACTION DEBUG] Extracted profile: {new_profile}") try: with open("extraction_debug.log", "a") as f: f.write(f"[STATUS] Extracted profile: {new_profile}\n") except: pass # Merge: new data overwrites old, but old data is kept if not in new # IMPORTANT: Ensure keys with 'null' or empty values in new_profile do not overwrite valid existing data user_profile = existing_profile.copy() for k, v in new_profile.items(): if v is not None and v != "" and v != "null": user_profile[k] = v # Explicitly handle keys that often get dropped or overwritten incorrectly if "policy_term" in new_profile and str(new_profile["policy_term"]).strip(): user_profile["policy_term"] = new_profile["policy_term"] entities: ExtractedEntities = { "provider": list(set(providers)) if providers else [], "insurance_type": list(set(insurance_types)) if insurance_types else [], "plan_names": list(set(plan_names)) if plan_names else [], "user_profile": user_profile } # Build metadata filters from entities filters = {} if providers: filters["insurer"] = providers if insurance_types: filters["insurance_type"] = insurance_types try: with open("extraction_debug.log", "a") as f: f.write(f"[RESULT] Entities: {entities}\n") f.write(f"[RESULT] Profile: {user_profile}\n") except: pass print(f"[ENTITY DEBUG] Final entities: {entities}") result = { "extracted_entities": entities, "metadata_filters": filters } return result except Exception as e: try: with open("extraction_debug.log", "a") as f: f.write(f"[ERROR] {str(e)}\n") import traceback f.write(traceback.format_exc()) except: pass print(f"[ENTITY DEBUG] Error: {e}") import traceback traceback.print_exc() return { "extracted_entities": { "provider": [], "insurance_type": [], "plan_names": [], "user_profile": {} }, "metadata_filters": {} } def _extract_plan_names_from_query(self, query: str) -> List[str]: """Use LLM to extract specific plan names mentioned in query.""" llm = LLMFactory.get_llm("small") system_prompt = ( "Extract EXACT insurance plan names from the query.\n" "If the user is asking to compare, extract BOTH plan names.\n" "RULES:\n" "- Return one plan name per line\n" "- Include insurer prefix if mentioned (e.g., 'TATA AIA Smart Value Income', 'Edelweiss Saral Jeevan Bima')\n" "- Return EMPTY if no specific plan names found\n" "- Do NOT invent plan names" ) response = llm.invoke([SystemMessage(content=system_prompt), HumanMessage(content=query)]) result = getattr(response, 'content', str(response)).strip() # Validation: If LLM returns a sentence instead of names, skip it if "mentioned in" in result.lower() or "referring to" in result.lower() or len(result) > 200: return [] if not result or result.lower() in ['none', 'empty', 'n/a']: return [] # Parse response plan_names = [] for line in result.split('\n'): line = re.sub(r'^[\d\.\-\*\u2022]\s*', '', line).strip().strip('"\'') if len(line) > 5: plan_names.append(line) return plan_names def _extract_user_profile(self, query: str, history: List[str] = None) -> Dict[str, Any]: """Extract user profile information for recommendations, using history if available.""" profile = {} # ======================================================================== # PRIORITY 1: REGEX EXTRACTION (Most Reliable) # ======================================================================== # These patterns work with formats like: # "age=30", "age = 30", "age is 30", "I am 30 years old" query_lower = query.lower() # Age extraction age_patterns = [ r'\bage\s*[=:]\s*(\d+)', # age=30, age = 30, age: 30 r'\bage\s+is\s+(\d+)', # age is 30 r'i\s+am\s+(\d+)\s+years?\s+old', # I am 30 years old r'(\d+)\s+years?\s+old', # 30 years old r'\bage\s+(\d+)\b', # age 30 ] for pattern in age_patterns: match = re.search(pattern, query_lower) if match and not profile.get('age'): try: age = int(match.group(1)) if 18 <= age <= 100: # Expanded age range profile['age'] = age break except: pass # Gender extraction if 'gender' not in profile: if re.search(r'gender\s*[=:]\s*(male|m\b)', query_lower) or \ re.search(r'gender\s+is\s+(male|m\b)', query_lower) or \ re.search(r'\bmale\b', query_lower): profile['gender'] = 'male' elif re.search(r'gender\s*[=:]\s*(female|f\b)', query_lower) or \ re.search(r'gender\s+is\s+(female|f\b)', query_lower) or \ re.search(r'\bfemale\b', query_lower): profile['gender'] = 'female' # Premium extraction premium_patterns = [ r'premium\s*[=:]\s*([\d,\.]+)', # premium=100000.50 r'premium\s+(?:amount\s+)?(?:is\s+)?(?:of\s+)?([\d,\.]+)', r'invest(?:ing)?\s+([\d,\.]+)\s*(?:lakh|lac|cr|crore|k|thousand)?', r'([\d,\.]+)\s*(?:lakh|lac|cr|crore|k|thousand)\s+(?:per year|annual|premium)', r'budget\s*[=:]\s*([\d,\.]+)', ] def parse_indian_amount(text): """Parse amounts like '1 lakh', '5.5 cr', '100,000'""" if not text: return None text = text.lower().replace(',', '').strip() multiplier = 1 if 'lakh' in text or 'lac' in text: multiplier = 100000 elif 'cr' in text or 'crore' in text: multiplier = 10000000 elif 'k' in text: multiplier = 1000 # Find the number in the segment nums = re.findall(r'(\d+(?:\.\d+)?)', text) if nums: try: return int(float(nums[0]) * multiplier) except: return None return None for pattern in premium_patterns: match = re.search(pattern, query_lower) if match and not profile.get('premium_amount'): # Pass the matched segment to parser amount = parse_indian_amount(match.group(0)) if amount and 500 <= amount <= 50000000: profile['premium_amount'] = str(amount) break # Policy Term (PT) pt_patterns = [ r'\bpt\s*[=:]\s*(\d+)', r'\bpt\s+(\d+)\b', r'policy\s+term\s*[=:]\s*(\d+)', r'policy\s+term\s+(?:of\s+)?(\d+)', r'term\s*[=:]\s*(\d+)\b', ] for pattern in pt_patterns: match = re.search(pattern, query_lower) if match and not profile.get('policy_term'): pt = match.group(1) profile['policy_term'] = pt + " years" break # Payment Term (PPT) ppt_patterns = [ r'\bppt\s*[=:]\s*(\d+)', r'\bppt\s+(\d+)\b', r'(?:premium\s+)?payment\s+term\s*[=:]\s*(\d+)', r'paying\s+term\s*[=:]\s*(\d+)', r'pay\s+term\s*[=:]\s*(\d+)', ] for pattern in ppt_patterns: match = re.search(pattern, query_lower) if match and not profile.get('payment_term'): ppt = match.group(1) profile['payment_term'] = ppt + " years" break # Payment Mode mode_patterns = [ r'mode\s*[=:]\s*(monthly|annual|yearly|quarterly|half\s*yearly)', r'(?:premium\s+)?(?:payment\s+)?mode\s+(?:is\s+)?(monthly|annual|yearly|quarterly)', r'\b(monthly|annual|yearly|quarterly)\b', ] for pattern in mode_patterns: match = re.search(pattern, query_lower) if match and not profile.get('payment_mode'): mode = match.group(1).strip() if mode == 'yearly': mode = 'annual' profile['payment_mode'] = mode break # ======================================================================== # PRIORITY 2: LLM EXTRACTION (Fallback for complex cases) # ======================================================================== # Use LLM if critical fields are missing OR if it's a recommendation intent critical_fields = ['age', 'gender', 'premium_amount'] missing_critical = any(field not in profile for field in critical_fields) if missing_critical: llm = LLMFactory.get_llm("medium") history_context = "" if history: history_str = "\n".join([f"- {h}" for h in history[-5:]]) history_context = f"\n\nCONVERSATION HISTORY:\n{history_str}" system_prompt = ( "Extract user profile details for insurance recommendations.\n" "JSON Output fields (use null if unknown):\n" "- age (number)\n" "- gender (male/female)\n" "- premium_amount (number)\n" "- policy_term (number of years)\n" "- payment_term (number of years)\n" "- payment_mode (Monthly/Annual/Quarterly/Half-Yearly)\n\n" "MAPPING RULES:\n" "- PT = policy_term\n" "- PPT = payment_term\n" "- mode = payment_mode\n" "- Extract from latest query AND history. Latest query wins conflicts.\n" "Return ONLY a raw JSON object." ) prompt = f"LATEST QUERY: {query}{history_context}" try: response = llm.invoke([SystemMessage(content=system_prompt), HumanMessage(content=prompt)]) result_text = getattr(response, 'content', str(response)) # Try to parse JSON try: # Clean the response in case LLM added markdown blocks clean_json = re.search(r'\{.*\}', result_text, re.DOTALL) if clean_json: llm_profile = json.loads(clean_json.group(0)) # Merge LLM results into profile if regex didn't find them if 'age' not in profile and llm_profile.get('age'): profile['age'] = int(llm_profile['age']) if 'gender' not in profile and llm_profile.get('gender'): profile['gender'] = llm_profile['gender'].lower() if 'premium_amount' not in profile and llm_profile.get('premium_amount'): profile['premium_amount'] = str(llm_profile['premium_amount']) if 'policy_term' not in profile and llm_profile.get('policy_term'): profile['policy_term'] = str(llm_profile['policy_term']) + " years" if 'payment_term' not in profile and llm_profile.get('payment_term'): profile['payment_term'] = str(llm_profile['payment_term']) + " years" if 'payment_mode' not in profile and llm_profile.get('payment_mode'): profile['payment_mode'] = llm_profile['payment_mode'].title().replace('Annual', 'annual').lower() except: # Fallback to line-based parsing if JSON fails for line in result_text.split('\n'): if ':' in line: parts = line.split(':', 1) k = parts[0].strip().lower() v = parts[1].strip().lower().replace('"', '').replace("'", "") if v and v != 'null': if 'age' in k and 'age' not in profile: nums = re.findall(r'\d+', v) if nums: profile['age'] = int(nums[0]) elif 'gender' in k and 'gender' not in profile: profile['gender'] = v elif 'premium' in k and 'premium_amount' not in profile: profile['premium_amount'] = v elif 'policy_term' in k or 'pt' == k and 'policy_term' not in profile: profile['policy_term'] = v + " years" elif 'payment_term' in k or 'ppt' == k and 'payment_term' not in profile: profile['payment_term'] = v + " years" except Exception as e: print(f"[WARNING] LLM extraction failed: {e}") return profile # ========================================================================= # NODE 4: Retrieval Router # ========================================================================= def retrieval_router_node(self, state: AgentState) -> Dict[str, Any]: """ Determines retrieval strategy based on intent. """ intent = state.get("intent", "plan_details") strategy_map = { "list_plans": "metadata_only", "plan_details": "plan_level", "compare_plans": "cross_plan", "recommendation": "section_specific", "general_query": "plan_level" } return {"retrieval_strategy": strategy_map.get(intent, "plan_level")} # ========================================================================= # NODE 5: Retriever # ========================================================================= def retriever_node(self, state: AgentState) -> Dict[str, Any]: """ Retrieves documents with: - Metadata filtering - CIS boosting for exclusions/charges/conditions - Deduplication by similarity """ retriever = self._get_retriever() if not retriever: return {"context": [], "retrieved_chunks": {}} query = state["input"] filters = state.get("metadata_filters", {}) entities = state.get("extracted_entities", {}) strategy = state.get("retrieval_strategy", "plan_level") # If specific plan names were extracted, use them for precise retrieval plan_names = entities.get("plan_names") or [] matched_plans = [] if plan_names: # Resolve to actual plan names in index all_plans = self._list_plans_from_index() for name in plan_names: match = self._find_closest_plan_name(name, all_plans) if match: matched_plans.append(match) # Update filters for non-comparison queries (for comparison, _retrieve_for_comparison handles it) if matched_plans and strategy != "cross_plan": filters = filters.copy() filters["product_name"] = matched_plans[0] if len(matched_plans) == 1 else matched_plans boost_cis = any(kw in query.lower() for kw in ["exclusion", "excluded", "not covered", "charges", "fee", "condition", "waiting"]) # Retrieve documents if strategy == "cross_plan": # For comparisons, retrieve for each plan separately # Pass matched_plans if we have them, otherwise it will try to find them from filters chunks_by_plan = self._retrieve_for_comparison(query, filters, entities, matched_plans=matched_plans) else: docs = retriever.search(query, filters=filters, k=8) chunks_by_plan = self._group_by_plan_id(docs) # Boost CIS documents if needed if boost_cis: chunks_by_plan = self._boost_cis_chunks(chunks_by_plan) # Format context strings limit_per_plan = 5 if strategy == "cross_plan" else 3 context = self._format_context(chunks_by_plan, limit=limit_per_plan) return { "context": context, "retrieved_chunks": chunks_by_plan } def _format_context(self, chunks_by_plan: Dict[str, List[Dict]], limit: int = 3) -> List[str]: """Helper to format chunks into LLM-readable context strings.""" context = [] for plan_id, chunks in chunks_by_plan.items(): for chunk in chunks[:limit]: content = chunk.get("content", "") plan_name = chunk.get("product_name", "Unknown") doc_type = chunk.get("document_type", "brochure") section = chunk.get("section", "General") context.append(f"[{plan_name} - {doc_type.upper()} - {section}] {content}") return context def _retrieve_for_comparison(self, query: str, filters: Dict, entities: Dict, matched_plans: List[str] = None) -> Dict[str, List]: """Retrieve chunks for each plan separately in comparison mode.""" retriever = self._get_retriever() if not retriever: return {} if not matched_plans: plan_names = entities.get("plan_names") or [] all_index_plans = self._list_plans_from_index() matched_plans = [] for name in plan_names: match = self._find_closest_plan_name(name, all_index_plans) if match: matched_plans.append(match) if not matched_plans: # Plan A: Deterministic "List & Match" Discovery # For each provider, list all their plans and see if any match the query providers = entities.get("provider") or [] if not providers: search_providers = [None] else: search_providers = providers discovered_names = [] all_plans_in_index = self._list_plans_from_index() for prov in search_providers: prov_filter = {"insurer": prov} if prov else {} prov_plans = self._list_plans_from_index(filters=prov_filter) self._log_debug(f"Provider: {prov}, Plans found: {len(prov_plans)}") # Try to find which plan from this insurer is mentioned in the query match = self._find_closest_plan_name(query, prov_plans) self._log_debug(f"Match for {prov}: {match} (In list: {match in prov_plans})") if match and match in prov_plans and match not in discovered_names: discovered_names.append(match) matched_plans = discovered_names if not matched_plans: # Plan B: Fall back to broad similarity-based discovery as a last resort discovery_docs = retriever.search(query, k=20) for d in discovery_docs: p_name = d.metadata.get("product_name") if p_name and p_name not in matched_plans: matched_plans.append(p_name) matched_plans = matched_plans[:3] if not matched_plans: # Plan B: Fall back to listing plans matching filters (metadata-only) matched_plans = self._list_plans_from_index(filters)[:5] chunks_by_plan = defaultdict(list) for matched in matched_plans: # Use a focused query for each plan instead of the broad comparison query # This helps the retriever find relevant feature chunks for the specific plan focused_query = f"features, benefits, eligibility and exclusions of {matched}" # Find the insurer for this product from cache for better filtering matched_insurer = None if hasattr(self, "_cached_plans") and self._cached_plans: for p_meta in self._cached_plans: if p_meta["product_name"] == matched: matched_insurer = p_meta.get("insurer") break # IMPORTANT: Search by product_name directly if possible search_filters = {"product_name": matched} if matched_insurer: search_filters["insurer"] = matched_insurer # Use a slightly lower k because we are being very specific with the filter docs = retriever.search(focused_query, filters=search_filters, k=20) plan_chunks = [] for doc in docs: doc_product = doc.metadata.get("product_name", "") # Final check for safety, but with accurate fuzzy matching if self._find_closest_plan_name(doc_product, [matched]) == matched: plan_chunks.append(doc) for doc in plan_chunks[:8]: # Use product_name for the key instead of plan_id to ensure clean table headers plan_name = doc.metadata.get("product_name", matched) chunks_by_plan[plan_name].append({ "content": doc.page_content, "product_name": doc.metadata.get("product_name"), "document_type": doc.metadata.get("document_type", "brochure"), "section": doc.metadata.get("section", "General") }) return dict(chunks_by_plan) def _group_by_plan_id(self, docs: List) -> Dict[str, List]: """Group retrieved documents by plan_id.""" grouped = defaultdict(list) for doc in docs: # Prefer product_name for display keys plan_name = doc.metadata.get("product_name", doc.metadata.get("plan_id", "unknown")) grouped[plan_name].append({ "content": doc.page_content, "product_name": doc.metadata.get("product_name"), "document_type": doc.metadata.get("document_type", "brochure"), "section": doc.metadata.get("section", "General") }) return dict(grouped) def _boost_cis_chunks(self, chunks_by_plan: Dict[str, List]) -> Dict[str, List]: """Boost CIS documents to appear first for each plan.""" boosted = {} for plan_id, chunks in chunks_by_plan.items(): cis_chunks = [c for c in chunks if c.get("document_type") == "cis"] brochure_chunks = [c for c in chunks if c.get("document_type") != "cis"] boosted[plan_id] = cis_chunks + brochure_chunks return boosted # ========================================================================= # NODE 6: Plan Aggregator # ========================================================================= def plan_aggregator_node(self, state: AgentState) -> Dict[str, Any]: """ Aggregates chunks by plan_id, merging brochure and CIS context. CIS overrides brochure for exclusions, charges, conditions. """ chunks_by_plan = state.get("retrieved_chunks", {}) # Already grouped, just ensure proper ordering aggregated = {} for plan_id, chunks in chunks_by_plan.items(): # Separate by document type cis_chunks = [c for c in chunks if c.get("document_type") == "cis"] brochure_chunks = [c for c in chunks if c.get("document_type") != "cis"] # For exclusions/charges sections, prefer CIS override_sections = ["Exclusions", "Charges", "Waiting Period", "Conditions"] final_chunks = [] covered_sections = set() # Add CIS chunks first for override sections for chunk in cis_chunks: section = chunk.get("section", "General") if section in override_sections: final_chunks.append(chunk) covered_sections.add(section) # Add brochure chunks, skipping overridden sections for chunk in brochure_chunks: section = chunk.get("section", "General") if section not in covered_sections: final_chunks.append(chunk) # Add remaining CIS chunks for chunk in cis_chunks: if chunk not in final_chunks: final_chunks.append(chunk) aggregated[plan_id] = final_chunks # Refresh context strings based on aggregated chunks intent = state.get("intent", "compare_plans") limit = 5 if intent == "compare_plans" else 3 context = self._format_context(aggregated, limit=limit) return { "retrieved_chunks": aggregated, "context": context } # ========================================================================= # NODE 7: Listing Agent # ========================================================================= def listing_agent(self, state: AgentState) -> Dict[str, Any]: """ Lists available plans based on filters. Uses direct index access for accuracy. """ llm = LLMFactory.get_llm("small") query = state["input"] filters = state.get("metadata_filters", {}) plans = self._list_plans_from_index(filters) plans = sorted(list(set(plans))) if not plans: filter_desc = ", ".join([str(v) for v in filters.values()]) if filters else "your criteria" answer = f"I couldn't find any plans matching {filter_desc}. Please try a different search." return {"context": [], "answer": answer} plans_str = "\n".join([f"- {p}" for p in plans]) # Describe the filters filter_parts = [] if filters.get("insurer"): insurer_list = filters["insurer"] if isinstance(filters["insurer"], list) else [filters["insurer"]] filter_parts.append(f"from {', '.join(insurer_list)}") if filters.get("insurance_type"): type_list = filters["insurance_type"] if isinstance(filters["insurance_type"], list) else [filters["insurance_type"]] filter_parts.append(f"in {', '.join(type_list)} category") filter_desc = " ".join(filter_parts) if filter_parts else "" system_prompt = ( "Present the following insurance plans in a clear, friendly manner.\n" "RULES:\n" "- ONLY include plans from the list below\n" "- Group by insurer if multiple insurers present\n" "- Use bullet points for clarity\n" "- Do NOT mention technical details about data retrieval" ) prompt = f"User asked: {query}\n\nAvailable plans {filter_desc}:\n{plans_str}" response = llm.invoke([SystemMessage(content=system_prompt), HumanMessage(content=prompt)]) answer = getattr(response, 'content', str(response)) return {"context": [f"Plans: {plans}"], "answer": answer} # ========================================================================= # NODE 8: Plan Details Agent (Retrieval Agent) # ========================================================================= def retrieval_agent(self, state: AgentState) -> Dict[str, Any]: """ Agent for answering plan-specific or comparison questions using retrieved context. """ complexity = state.get("query_complexity", "low") llm = LLMFactory.get_llm(complexity) query = state["input"] context = state.get("context", []) entities = state.get("extracted_entities", {}) if not context: # Fallback retrieval with better filtering retriever = self._get_retriever() if retriever: # Try to extract plan names from query for better filtering plan_names = entities.get("plan_names", []) filters = state.get("metadata_filters", {}) # If we have plan names, use them for filtering if plan_names: filters["product_name"] = plan_names # Retrieve with filters if filters: docs = retriever.search(query, filters=filters, k=10) else: docs = retriever.search(query, k=10) # Format context with plan names context = [f"[{d.metadata.get('product_name', 'Unknown')}] {d.page_content}" for d in docs] # If still no context, provide a helpful message if not context: return { "answer": "I couldn't find specific information about that plan in my knowledge base. " "Could you please provide more details or try asking about a different plan? " "You can also ask me to list available plans." } context_str = "\n\n".join(context) system_prompt = f"""You are an Insurance Policy Specialist providing accurate information. {COMPLIANCE_RULES} STRICT GROUNDING RULES: - Answer the user's question using the Policy Context provided to you. - If the requested plan is NOT mentioned in the Policy Context, say: "I'm sorry, but I couldn't find information regarding [Plan Name] in our current policy database. Please verify the name or ask me to list available plans." - If the question is about non-insurance topics, refuse using the OUT-OF-BOUNDS REFUSAL rule. - Structure your response with clear headings and bullet points. """ prompt = f"Policy Context:\n{context_str}\n\nUser Question: {query}" response = llm.invoke([SystemMessage(content=system_prompt), HumanMessage(content=prompt)]) answer = getattr(response, 'content', str(response)) return {"answer": answer} # ========================================================================= # NODE 9: Recommendation Agent (Advisory) # ========================================================================= def advisory_agent(self, state: AgentState) -> Dict[str, Any]: """ Provides personalized recommendations based on user profile. Grounds all advice in retrieved documents. If critical info (age/gender/premium) is missing for specific plans, asks for it. """ llm = LLMFactory.get_llm("large") query = state["input"] context = state.get("context", []) entities = state.get("extracted_entities", {}) user_profile = entities.get("user_profile", {}) # Check for Insurer and Guaranteed/Savings context providers = entities.get("provider", []) is_guaranteed = any(t in ["Savings Plan", "Guaranteed Return"] for t in entities.get("insurance_type", [])) is_rec = state.get("intent") == "recommendation" # Only block and ask for info IF the intent is explicitly a recommendation/calculation if is_rec: print(f"[ADVISORY DEBUG] Full entities: {entities}") print(f"[ADVISORY DEBUG] User profile: {user_profile}") missing = [] if not user_profile.get("age"): missing.append("age") if not user_profile.get("gender"): missing.append("gender") if not user_profile.get("premium_amount"): missing.append("annual premium amount") if not user_profile.get("policy_term"): missing.append("policy term (PT)") if not user_profile.get("payment_term"): missing.append("premium payment term (PPT)") if not user_profile.get("payment_mode"): missing.append("premium payment mode") print(f"[ADVISORY DEBUG] Missing fields check:") for field in ["age", "gender", "premium_amount", "policy_term", "payment_term", "payment_mode"]: value = user_profile.get(field) print(f" - {field}: {value} (truthy: {bool(value)})") print(f"[ADVISORY DEBUG] Final missing list: {missing}") # Block and ask for info for professional consultation if missing: missing_str = " and ".join([", ".join(missing[:-1]), missing[-1]] if len(missing) > 1 else missing) return {"answer": f"To provide you with specific benefit figures and a professional recommendation, I need a few more details: **{missing_str}**. Could you please provide these?"} # If we have everything, get the numbers calc_result = self.plan_calculator_tool(state) state["reasoning_output"] = calc_result.get("reasoning_output", "") else: # If not a recommendation intent, check if we have enough profile data to show numbers anyway # (e.g., if user asks about a specific plan but we already know their profile) if user_profile.get("age") and user_profile.get("premium_amount") and user_profile.get("policy_term"): calc_result = self.plan_calculator_tool(state) state["reasoning_output"] = calc_result.get("reasoning_output", "") calculation_info = "" raw_calc = state.get('reasoning_output', '') if raw_calc: try: calc_json = json.loads(raw_calc) table = calc_json.get("summary_table", "") if table: calculation_info = f"\n\n### MANDATORY GROUNDING: NUMERICAL DATA TABLE\n{table}\n(PRIORITIZE THESE PLANS AND NUMBERS OVER ANY TEXT BELOW)\n" except: pass context_str = "\n\n".join(context) if context else "No plans found." profile_info = "" if user_profile: profile_parts = [f"{k}: {v}" for k, v in user_profile.items() if v] if profile_parts: profile_info = f"\n\nUser Profile: {', '.join(profile_parts)}" system_prompt = f"""You are an Expert Insurance Advisor. {COMPLIANCE_RULES} RECOMMENDATION RULES: - 🚨 PRIORITY 1: Recommending plans from the 'MANDATORY GROUNDING' table above. Use those EXACT numbers. - 🚨 PRIORITY 2: Only provide benefit calculations for the plans in the GROUNDING table. - If the user asks about plans not in the table for calculation, say you don't have calculation data for them yet. - If the query is out-of-bounds, use the OUT-OF-BOUNDS REFUSAL rule. - NEVER say "Not Available" if numbers exist in the grounding table. - Be consultative and grounded. """ prompt = f"{calculation_info}\n\nPolicy Context:\n{context_str}{profile_info}\n\nUser Question: {query}" response = llm.invoke([SystemMessage(content=system_prompt), HumanMessage(content=prompt)]) answer = getattr(response, 'content', str(response)) return {"answer": answer} # ========================================================================= # NODE 11: General Query Agent (FAQ) # ========================================================================= def faq_agent(self, state: AgentState) -> Dict[str, Any]: """ Agent for general insurance questions (glossary, concepts). """ llm = LLMFactory.get_llm("low") query = state["input"] context = state.get("context", []) # Try to retrieve context for general insurance terms if not already provided if not context: retriever = self._get_retriever() if retriever: # Use broader search for general queries docs = retriever.search(query, k=3) # Reduced from 5 to 3 for more focused context if docs: context = [d.page_content for d in docs] context_str = "\n\n".join(context) if context else "" system_prompt = f"""You are an Insurance Helpdesk Assistant. {COMPLIANCE_RULES} INSTRUCTIONS: - For insurance terminology: Provide a clear, concise definition. - 🚨 STRICT RULE: If the user asks about ANYTHING non-insurance related (e.g., travel tickets, cooking, etc.), you MUST refuse and redirect to insurance topics. - 🚨 NO HALLUCINATION: If the term is not common insurance knowledge and not in context, say you don't know rather than guessing. - Keep the total response under 150 words. Common Insurance Terms to use as reference: - **Policy Term (PT)**: The total duration for which the policy remains active. - **Premium Payment Term (PPT)**: The duration during which premiums must be paid. - **Maturity Benefit**: The lump sum amount paid when the policy matures. - **Sum Assured**: The guaranteed amount payable on death or maturity. """ prompt = f"Context (if relevant):\n{context_str}\n\nUser Question: {query}" if context_str else f"User Question: {query}" response = llm.invoke([SystemMessage(content=system_prompt), HumanMessage(content=prompt)]) answer = getattr(response, 'content', str(response)) return {"answer": answer} # ========================================================================= # NODE 12: Guardrail # ========================================================================= def guardrail_node(self, state: AgentState) -> Dict[str, Any]: """ Final validation and compliance disclaimer. - Validates answer is grounded - Adds compliance disclaimer - Blocks hallucinated content """ answer = state.get("answer", "") if not answer: answer = "I apologize, but I couldn't generate a response. Please try rephrasing your question." # Add compliance disclaimer if COMPLIANCE_DISCLAIMER not in answer: answer = answer + COMPLIANCE_DISCLAIMER return {"answer": answer} # ========================================================================= # TOOL: Plan Calculator Tool # ========================================================================= def plan_calculator_tool(self, state: AgentState) -> Dict[str, Any]: """ Tool logic to calculate benefits using the API's dummy logic. Extremely robust extraction fallback for age, gender, and premium. """ from api.plans import get_plan_benefits_tool, resolve_plan_id user_profile = state.get("extracted_entities", {}).get("user_profile", {}) plan_names = state.get("extracted_entities", {}).get("plan_names", []) query = state["input"].lower() # --- ROBUST FALLBACKS --- # 1. Age Fallback age = user_profile.get("age") if not age: age_match = re.search(r'\b(\d{2})\b\s*(?:year|yr|old|male|female)?', query) if age_match: age = int(age_match.group(1)) # 2. Gender Fallback gender = user_profile.get("gender") if not gender: if "male" in query and "female" not in query: gender = "male" elif "female" in query: gender = "female" # 3. Premium Fallback premium = user_profile.get("premium_amount") clean_premium = 0.0 if not premium: # Look for any number followed by a potential unit prem_match = re.search(r'(\d+(?:\.\d+)?)\s*(?:rs\.?|inr|lakh|cr|k|thousand)?', query) if prem_match: val = float(prem_match.group(1)) unit_search = query[prem_match.start():prem_match.end()+20] # look ahead if 'lakh' in unit_search: val *= 100000 elif 'cr' in unit_search: val *= 10000000 elif any(k in unit_search for k in ['k', 'thousand']): val *= 1000 clean_premium = val else: try: if isinstance(premium, (int, float)): clean_premium = float(premium) else: nums = re.findall(r'\d+\.?\d*', str(premium)) if nums: clean_premium = float(nums[0]) if 'lakh' in str(premium).lower(): clean_premium *= 100000 elif 'cr' in str(premium).lower(): clean_premium *= 10000000 except: pass if not (age and gender and clean_premium > 0): return {"reasoning_output": "Insufficient data (age, gender, or premium) to calculate benefits."} # 4. Resolve Plan IDs pids = [] for name in plan_names: pid = resolve_plan_id(name) if pid: pids.append(pid) # If no specific plan found, calculate for ALL default plans target_plan_id = pids[0] if len(pids) == 1 else None # 5. Execute Tool calculation_json = get_plan_benefits_tool( age=int(age), gender=str(gender), premium_amount=clean_premium, plan_id=target_plan_id, policy_term=user_profile.get("policy_term"), payment_term=user_profile.get("payment_term"), payment_mode=user_profile.get("payment_mode") ) return {"reasoning_output": calculation_json} # ========================================================================= # HELPER METHODS # ========================================================================= def _list_plans_from_index(self, filters: Dict = None) -> List[str]: """Returns unique product names matching filters. Optimized with caching.""" retriever = self._get_retriever() if not retriever: return [] try: # Use a simple cache attribute on the instance if it doesn't exist if not hasattr(self, "_cached_plans") or self._cached_plans is None: store = retriever.vector_store plans_metadata = [] for doc in store.docstore._dict.values(): p_name = doc.metadata.get('product_name') insurer = doc.metadata.get('insurer') i_type = doc.metadata.get('insurance_type') if p_name: plans_metadata.append({ "product_name": p_name, "insurer": insurer, "insurance_type": i_type }) self._cached_plans = plans_metadata # Filter from cache plans = set() for meta in self._cached_plans: if filters: match = True for k, v in filters.items(): doc_val = str(meta.get(k, "")).lower().strip() if not doc_val: match = False break # Standardize filter values to list of lowercase strings filter_values = v if isinstance(v, list) else [v] filter_values = [str(fv).lower().strip() for fv in filter_values] # Robust match: any filter item matches or is matched by doc_val val_match = False for fv in filter_values: if k == "product_name": if fv in doc_val or doc_val in fv: val_match = True break elif k == "insurer": # Strictly match insurer names if fv == doc_val: val_match = True break else: # For other keys like insurance_type, allow exact match if fv == doc_val: val_match = True break if not val_match: match = False break if not match: continue plans.add(meta["product_name"]) return sorted(list(plans)) except Exception: return [] def _find_closest_plan_name(self, query_plan: str, all_plans: List[str]) -> Optional[str]: """Finds closest matching plan name using fuzzy matching.""" if not all_plans: return query_plan def normalize(s): return s.lower().replace(" ", "").replace("-", "").replace("_", "").replace("edelweisslife", "edelweiss") query_norm = normalize(query_plan) # 1. Exact match (case insensitive) for plan in all_plans: if plan.lower() == query_plan.lower(): return plan # 2. Normalized containment match (High Confidence) # Check if the plan name is mentioned in the query for plan in all_plans: plan_norm = normalize(plan) if plan_norm in query_norm or query_norm in plan_norm: return plan # 3. Word overlap (Lower Confidence fallback) query_words = set(query_plan.lower().split()) # REMOVED insurer names from stop_words because they are critical for distinguishing # similar plan names (like 'Saral Jeevan Bima') across different companies. stop_words = {"plan", "insurance", "the", "a", "of", "with", "compare", "is", "between"} query_significant = query_words - stop_words best_match = None max_overlap = 0 for plan in all_plans: plan_words = set(plan.lower().split()) plan_significant = plan_words - stop_words # Count significant word overlap overlap = len(query_significant.intersection(plan_significant)) if overlap > max_overlap: max_overlap = overlap best_match = plan # Return best match if we found significant overlap (at least 2 words) return best_match if max_overlap >= 2 else query_plan # Singleton instance nodes = AgentNodes()