""" GUVI Hackathon Final Result Callback Module. Implements the mandatory callback to GUVI's evaluation endpoint as specified in the problem statement. Requirement: "Once the system detects scam intent and the AI Agent completes the engagement, participants must send the final extracted intelligence to the GUVI evaluation endpoint." Callback Endpoint: POST https://hackathon.guvi.in/api/updateHoneyPotFinalResult """ import requests from typing import Dict, List, Optional from datetime import datetime from app.config import settings from app.utils.logger import get_logger logger = get_logger(__name__) # Default GUVI callback URL DEFAULT_GUVI_CALLBACK_URL = "https://hackathon.guvi.in/api/updateHoneyPotFinalResult" def identify_red_flags(messages: List[Dict]) -> List[str]: """ Identify explicit red flags from scammer messages. Returns a list of identified red flags for scoring. GUVI Doc: "Red Flag Identification | 8 pts | >=5 flags = 8pts" Args: messages: List of conversation messages Returns: List of identified red flag descriptions """ red_flags: List[str] = [] scammer_messages = [ m.get("message", "") for m in messages if m.get("sender") == "scammer" ] full_text_lower = " ".join(scammer_messages).lower() full_text_raw = " ".join(scammer_messages) # Red flag categories with specific patterns red_flag_patterns = { "Urgency/Time Pressure": [ "urgent", "immediately", "now", "today", "hurry", "quick", "fast", "expire", "last chance", "limited time", "deadline", "turant", "jaldi", "abhi", "foran", ], "Authority Impersonation": [ "police", "court", "government", "bank official", "rbi", "investigation", "arrest", "legal", "warrant", "department", "officer", "inspector", "commissioner", ], "Account/Service Threat": [ "block", "suspend", "deactivate", "freeze", "seize", "terminate", "close", "disable", "restrict", ], "OTP/Credential Request": [ "otp", "password", "pin", "cvv", "verify", "confirm", "share otp", "send otp", "tell otp", ], "Prize/Lottery Lure": [ "won", "winner", "prize", "lottery", "jackpot", "lucky", "congratulations", "reward", "selected", "chosen", ], "Payment/Fee Demand": [ "processing fee", "transfer fee", "tax", "charges", "pay first", "send money", "registration fee", ], "Suspicious Link": [ "http://", "https://", "click here", "click link", "www.", ".xyz", ".tk", "bit.ly", "tinyurl", ], "KYC/Document Request": [ "kyc", "aadhaar", "pan card", "pan number", "update kyc", "verify identity", "link expired", ], "False Urgency Claim": [ "within 24 hours", "within 1 hour", "today only", "expires today", "last warning", "final notice", ], "Impersonation of Known Entity": [ "sbi", "hdfc", "icici", "axis", "rbi", "amazon", "flipkart", "paytm", "phonepe", "gpay", ], } for flag_name, patterns in red_flag_patterns.items(): for pattern in patterns: if pattern in full_text_lower or pattern in full_text_raw: if flag_name not in red_flags: red_flags.append(flag_name) break return red_flags def count_elicitation_attempts(messages: List[Dict]) -> int: """ Count the number of elicitation attempts made by the agent. GUVI Doc: "Information Elicitation | 7 pts | Each elicitation attempt earns 1.5pts (max 7)" Max 5 attempts for full 7 points (5 * 1.5 = 7.5, capped at 7). Elicitation = asking questions to extract scammer's financial details. Args: messages: List of conversation messages Returns: Number of elicitation attempts detected """ elicitation_patterns = [ # Direct questions for financial details r"upi\s*(id)?[\?\s]", r"phone\s*(number)?[\?\s]", r"account\s*(number)?[\?\s]", r"ifsc[\?\s]", r"bank\s*(details|account)[\?\s]", r"what.{0,20}(upi|phone|number|account|ifsc)", r"give.{0,15}(upi|phone|number|account|ifsc)", r"tell.{0,15}(upi|phone|number|account|ifsc)", r"send.{0,15}(upi|phone|number|account|details)", r"share.{0,15}(upi|phone|number|account|details)", # Questions ending with ? r"where.{0,30}\?", r"what.{0,30}\?", r"how.{0,30}\?", r"which.{0,30}\?", # Hindi/Hinglish elicitation r"kya\s*hai", r"batao", r"bolo", r"dijiye", r"bhejo", ] import re agent_messages = [ m.get("message", "") for m in messages if m.get("sender") == "agent" ] count = 0 for msg in agent_messages: msg_lower = msg.lower() for pattern in elicitation_patterns: if re.search(pattern, msg_lower): count += 1 break # Count each message only once return min(count, 5) # Cap at 5 for max 7 points def generate_agent_notes( messages: List[Dict], extracted_intel: Dict, scam_indicators: List[str], ) -> str: """ Generate a detailed summary of scammer behavior for agent notes. Produces a law-enforcement-friendly summary covering: - Identified red flags (explicitly enumerated for scoring) - Identified scam type - Tactics used (urgency, threats, impersonation, etc.) - Elicitation attempts count - Extracted intelligence summary - Conversation depth Args: messages: List of conversation messages extracted_intel: Extracted intelligence dictionary scam_indicators: List of detected scam indicators/keywords Returns: Agent notes string with explicit red flag enumeration for GUVI scoring """ notes_parts: List[str] = [] scammer_messages = [ m.get("message", "") for m in messages if m.get("sender") == "scammer" ] full_scammer_text = " ".join(scammer_messages).lower() full_scammer_raw = " ".join(scammer_messages) # ---- Red Flags (explicitly enumerated for scoring) ---- red_flags = identify_red_flags(messages) if red_flags: flags_str = ", ".join(f"[{i+1}] {flag}" for i, flag in enumerate(red_flags)) notes_parts.append(f"RED FLAGS DETECTED ({len(red_flags)}): {flags_str}") # ---- Elicitation attempts ---- elicitation_count = count_elicitation_attempts(messages) if elicitation_count > 0: notes_parts.append(f"ELICITATION ATTEMPTS: {elicitation_count} direct questions asked to extract scammer details") # ---- Scam type identification ---- scam_type = identify_scam_type(full_scammer_text, full_scammer_raw) if scam_type: notes_parts.append(f"Scam type: {scam_type}") # ---- Tactic detection ---- urgency_words = [ "urgent", "immediately", "now", "today", "hurry", "quick", "fast", "expire", "last chance", "turant", "jaldi", "abhi", "\u0924\u0941\u0930\u0902\u0924", "\u091c\u0932\u094d\u0926\u0940", ] if any(w in full_scammer_text or w in full_scammer_raw for w in urgency_words): notes_parts.append("Used urgency tactics to pressure victim") authority_words = [ "police", "court", "government", "bank official", "rbi", "investigation", "arrest", "legal", "warrant", "department", "\u092a\u0941\u0932\u093f\u0938", "\u0917\u093f\u0930\u092b\u094d\u0924\u093e\u0930", ] if any(w in full_scammer_text or w in full_scammer_raw for w in authority_words): notes_parts.append("Attempted authority/official impersonation") prize_words = [ "won", "winner", "prize", "lottery", "jackpot", "lucky", "congratulations", "reward", "jeeta", "jeet", "inaam", "\u091c\u0940\u0924\u093e", "\u0907\u0928\u093e\u092e", ] if any(w in full_scammer_text or w in full_scammer_raw for w in prize_words): notes_parts.append("Used prize/lottery lure") payment_words = [ "upi", "transfer", "send money", "pay", "account number", "bank details", "paise bhejo", "transfer karo", "\u092a\u0948\u0938\u0947 \u092d\u0947\u091c\u094b", ] if any(w in full_scammer_text or w in full_scammer_raw for w in payment_words): notes_parts.append("Attempted payment/money redirection") credential_words = [ "otp", "password", "pin", "cvv", "verify", "confirm", "otp bhejo", "verify karo", "\u0913\u091f\u0940\u092a\u0940", ] if any(w in full_scammer_text or w in full_scammer_raw for w in credential_words): notes_parts.append("Attempted OTP/credential harvesting") threat_words = [ "block", "suspend", "deactivate", "arrest", "fine", "penalty", "legal action", "case file", "fir", "\u092c\u094d\u0932\u0949\u0915", "\u092c\u0902\u0926", ] if any(w in full_scammer_text or w in full_scammer_raw for w in threat_words): notes_parts.append("Used threat/fear tactics") kyc_words = ["kyc", "aadhaar", "pan card", "pan number", "link expired", "update kyc"] if any(w in full_scammer_text for w in kyc_words): notes_parts.append("Used KYC/document verification lure") loan_words = ["loan approved", "pre-approved", "emi", "interest rate", "loan offer"] if any(w in full_scammer_text for w in loan_words): notes_parts.append("Used fake loan/credit offer") delivery_words = ["delivery failed", "customs", "parcel", "courier", "shipment"] if any(w in full_scammer_text for w in delivery_words): notes_parts.append("Used fake delivery/parcel scam") # ---- Intelligence summary ---- intel_items: List[str] = [] if extracted_intel.get("upi_ids"): items = extracted_intel["upi_ids"] intel_items.append(f"{len(items)} UPI ID(s): {', '.join(items[:3])}") if extracted_intel.get("bank_accounts"): items = extracted_intel["bank_accounts"] intel_items.append(f"{len(items)} bank account(s)") if extracted_intel.get("ifsc_codes"): items = extracted_intel["ifsc_codes"] intel_items.append(f"{len(items)} IFSC code(s): {', '.join(items[:3])}") if extracted_intel.get("phone_numbers"): items = extracted_intel["phone_numbers"] intel_items.append(f"{len(items)} phone number(s): {', '.join(items[:3])}") if extracted_intel.get("phishing_links"): items = extracted_intel["phishing_links"] intel_items.append(f"{len(items)} phishing link(s)") if extracted_intel.get("email_addresses"): items = extracted_intel["email_addresses"] intel_items.append(f"{len(items)} email address(es): {', '.join(items[:3])}") if extracted_intel.get("case_ids"): items = extracted_intel["case_ids"] intel_items.append(f"{len(items)} case/reference ID(s): {', '.join(items[:3])}") if extracted_intel.get("policy_numbers"): items = extracted_intel["policy_numbers"] intel_items.append(f"{len(items)} policy number(s): {', '.join(items[:3])}") if extracted_intel.get("order_numbers"): items = extracted_intel["order_numbers"] intel_items.append(f"{len(items)} order/transaction ID(s): {', '.join(items[:3])}") if intel_items: notes_parts.append(f"Extracted intelligence: {'; '.join(intel_items)}") # ---- Conversation depth ---- total_turns = len(scammer_messages) if total_turns > 0: notes_parts.append(f"Conversation depth: {total_turns} scammer message(s) analyzed") if notes_parts: return ". ".join(notes_parts) + "." return "Scam engagement completed. Limited intelligence extracted." def identify_scam_type(text_lower: str, text_raw: str = "") -> Optional[str]: """ Identify the primary scam type from scammer text. Returns a human-readable scam type label or None if unknown. """ # Order matters: more specific checks first if any(w in text_lower for w in ["kyc", "aadhaar", "pan card", "update kyc"]): return "KYC/Document Verification Fraud" if any(w in text_lower for w in ["loan approved", "pre-approved", "emi", "loan offer"]): return "Fake Loan/Credit Offer" if any(w in text_lower for w in ["delivery failed", "customs", "parcel", "courier"]): return "Fake Delivery/Parcel Scam" if any(w in text_lower for w in ["won", "winner", "prize", "lottery", "jackpot"]): return "Prize/Lottery Scam" if any(w in text_lower for w in [ "police", "arrest", "warrant", "court", "legal action", "investigation", "\u092a\u0941\u0932\u093f\u0938", "\u0917\u093f\u0930\u092b\u094d\u0924\u093e\u0930", ]) or any(w in text_raw for w in [ "\u092a\u0941\u0932\u093f\u0938", "\u0917\u093f\u0930\u092b\u094d\u0924\u093e\u0930", ]): return "Authority/Police Impersonation" if any(w in text_lower for w in [ "bank official", "rbi", "bank manager", "account blocked", "account suspended", ]): return "Bank Official Impersonation" if any(w in text_lower for w in ["otp", "password", "pin", "cvv"]): return "Credential/OTP Harvesting" if any(w in text_lower for w in ["refund", "cashback", "insurance claim"]): return "Refund/Insurance Scam" if any(w in text_lower for w in ["investment", "returns", "crypto", "trading", "profit"]): return "Investment/Trading Scam" if any(w in text_lower for w in ["electricity", "electric bill", "power bill", "power cut", "power disconnection"]): return "Electricity Bill Scam" if any(w in text_lower for w in ["utility", "water bill", "gas bill"]): return "Utility Bill Scam" if any(w in text_lower for w in ["job", "employment", "hiring", "work from home", "earn from home"]): return "Job/Employment Scam" if any(w in text_lower for w in ["income tax", "tax notice", "tax department", "it department"]): return "Income Tax Scam" if any(w in text_lower for w in ["tech support", "computer problem", "virus", "microsoft", "windows"]): return "Tech Support Scam" if any(w in text_lower for w in ["government scheme", "govt scheme", "subsidy", "pm scheme"]): return "Government Scheme Scam" if any(w in text_lower for w in ["upi", "send money", "transfer", "pay"]): return "Payment Redirection Fraud" return None def extract_suspicious_keywords( messages: List[Dict], scam_indicators: List[str], ) -> List[str]: """ Extract suspicious keywords from the conversation. Checks scammer messages for English, Hindi, and Hinglish scam keywords so that multilingual conversations produce meaningful keyword lists. Args: messages: List of conversation messages scam_indicators: List of detected scam indicators from detector Returns: List of suspicious keywords found in messages (up to 25) """ keywords = set(scam_indicators) if scam_indicators else set() # English suspicious patterns en_patterns = [ "urgent", "immediately", "now", "today", "hurry", "fast", "quick", "won", "winner", "prize", "lottery", "jackpot", "congratulations", "otp", "verify", "confirm", "blocked", "suspended", "deactivated", "police", "arrest", "court", "legal", "investigation", "warrant", "transfer", "send money", "pay now", "account blocked", "free", "gift", "reward", "selected", "lucky", "click here", "call now", "limited time", "expire", "kyc", "aadhaar", "pan card", "link expired", "upi", "bank account", "ifsc", "cvv", "pin", "loan approved", "credit card", "insurance", "refund", "delivery failed", "customs", "parcel", ] # Hindi / Hinglish suspicious patterns hi_patterns = [ "turant", "jaldi", "abhi", "jeeta", "jeet", "inaam", "lottery", "otp bhejo", "verify karo", "confirm karo", "block", "suspend", "band", "police", "giraftaar", "giraftari", "court", "kanoon", "paise bhejo", "transfer karo", "pay karo", "muft", "free", "gift", "link pe click", "call karo", "kyc update", "aadhaar", "pan", "loan", "insurance", "refund", # Devanagari "\u0924\u0941\u0930\u0902\u0924", # turant "\u091c\u0932\u094d\u0926\u0940", # jaldi "\u0905\u092d\u0940", # abhi "\u091c\u0940\u0924\u093e", # jeeta "\u0907\u0928\u093e\u092e", # inaam "\u0932\u0949\u091f\u0930\u0940", # lottery "\u092a\u0941\u0932\u093f\u0938", # police "\u0917\u093f\u0930\u092b\u094d\u0924\u093e\u0930", # giraftaar "\u092a\u0948\u0938\u0947 \u092d\u0947\u091c\u094b", # paise bhejo "\u091f\u094d\u0930\u093e\u0902\u0938\u092b\u0930", # transfer "\u092c\u094d\u0932\u0949\u0915", # block "\u092c\u0948\u0902\u0915", # bank "\u0916\u093e\u0924\u093e", # khaata "\u092f\u0942\u092a\u0940\u0906\u0908", # UPI "\u0913\u091f\u0940\u092a\u0940", # OTP ] scammer_messages = [ m.get("message", "") for m in messages if m.get("sender") == "scammer" ] full_text = " ".join(scammer_messages).lower() for pattern in en_patterns: if pattern in full_text: keywords.add(pattern) # Hindi patterns need original-case text for Devanagari matching full_text_raw = " ".join(scammer_messages) for pattern in hi_patterns: if pattern.lower() in full_text or pattern in full_text_raw: keywords.add(pattern) return sorted(keywords)[:25] def send_final_result_to_guvi( session_id: str, scam_detected: bool, total_messages: int, extracted_intel: Dict, messages: List[Dict], scam_indicators: List[str] = None, agent_notes: str = None, engagement_duration_seconds: int = 0, ) -> bool: """ Send final result to GUVI evaluation endpoint. This is MANDATORY for the hackathon submission. The platform uses this data to measure engagement depth, intelligence quality, and agent effectiveness. Args: session_id: Unique session ID for the conversation scam_detected: Whether scam intent was confirmed total_messages: Total number of messages exchanged extracted_intel: Dictionary of extracted intelligence messages: Full conversation history scam_indicators: Optional list of detected scam indicators agent_notes: Optional pre-generated agent notes engagement_duration_seconds: Duration of engagement in seconds Returns: True if callback was successful, False otherwise """ if not settings.GUVI_CALLBACK_ENABLED: logger.info("GUVI callback disabled, skipping") return True callback_url = settings.GUVI_CALLBACK_URL or DEFAULT_GUVI_CALLBACK_URL suspicious_keywords = extract_suspicious_keywords( messages, scam_indicators or [], ) if not agent_notes: agent_notes = generate_agent_notes( messages, extracted_intel, scam_indicators or [], ) # Identify scam type from messages scammer_messages = [m.get("message", "") for m in messages if m.get("sender") == "scammer"] scammer_text = " ".join(scammer_messages) scam_type = identify_scam_type(scammer_text.lower(), scammer_text) # Build payload in GUVI's expected format (camelCase) payload = { "sessionId": session_id, "status": "success", "scamDetected": scam_detected, "scamType": scam_type or "Financial Fraud", "confidenceLevel": 0.95, "totalMessagesExchanged": total_messages, "engagementDurationSeconds": engagement_duration_seconds, "extractedIntelligence": { "bankAccounts": extracted_intel.get("bank_accounts", []), "upiIds": extracted_intel.get("upi_ids", []), "ifscCodes": extracted_intel.get("ifsc_codes", []), "phishingLinks": extracted_intel.get("phishing_links", []), "phoneNumbers": extracted_intel.get("phone_numbers", []), "emailAddresses": extracted_intel.get("email_addresses", []), "caseIds": extracted_intel.get("case_ids", []), "policyNumbers": extracted_intel.get("policy_numbers", []), "orderNumbers": extracted_intel.get("order_numbers", []), "suspiciousKeywords": suspicious_keywords, }, "engagementMetrics": { "engagementDurationSeconds": engagement_duration_seconds, "totalMessagesExchanged": total_messages, }, "agentNotes": agent_notes, } logger.info(f"Sending GUVI callback for session {session_id}") logger.debug(f"GUVI callback payload: {payload}") try: response = requests.post( callback_url, json=payload, timeout=10, headers={ "Content-Type": "application/json", }, ) if response.status_code == 200: logger.info(f"GUVI callback successful for session {session_id}") return True else: logger.warning( f"GUVI callback returned status {response.status_code}: {response.text}" ) return False except requests.exceptions.Timeout: logger.error(f"GUVI callback timed out for session {session_id}") return False except requests.exceptions.RequestException as e: logger.error(f"GUVI callback failed for session {session_id}: {e}") return False except Exception as e: logger.error(f"Unexpected error in GUVI callback: {e}") return False def should_send_callback( turn_count: int, max_turns_reached: bool, extraction_confidence: float, terminated: bool, ) -> bool: """ Determine if GUVI callback should be sent based on conversation state. Callback should be sent when: - Turn count >= 5 (GUVI runs 10 turns max, send callback frequently) - Max turns (10 or 20) is reached - High extraction confidence (>= 0.5) achieved - Session is explicitly terminated Args: turn_count: Current turn count max_turns_reached: Whether max turns limit was hit extraction_confidence: Confidence in extracted intelligence terminated: Whether session is terminated Returns: True if callback should be sent """ # GUVI runs 10 turns max - send callback after 5+ turns to ensure # the evaluator receives final output before conversation ends if turn_count >= 5: logger.info(f"Callback trigger: turn count >= 5 ({turn_count})") return True # Send if max turns reached (either GUVI's 10 or our 20) if max_turns_reached or turn_count >= 10: logger.info(f"Callback trigger: max turns reached ({turn_count})") return True # Send if moderate extraction confidence (lowered from 0.85 to 0.5) if extraction_confidence >= 0.5: logger.info(f"Callback trigger: extraction confidence ({extraction_confidence:.2f})") return True # Send if explicitly terminated if terminated: logger.info("Callback trigger: session terminated") return True return False