# scam/app/utils/guvi_callback.py
# Gankit12 - Updates: README, endpoints, extractor, guvi_callback; add GUVI test scripts
# 632166f
"""
GUVI Hackathon Final Result Callback Module.
Implements the mandatory callback to GUVI's evaluation endpoint
as specified in the problem statement.
Requirement: "Once the system detects scam intent and the AI Agent
completes the engagement, participants must send the final extracted
intelligence to the GUVI evaluation endpoint."
Callback Endpoint: POST https://hackathon.guvi.in/api/updateHoneyPotFinalResult
"""
import requests
from typing import Dict, List, Optional
from datetime import datetime
from app.config import settings
from app.utils.logger import get_logger
logger = get_logger(__name__)
# Default GUVI callback URL
DEFAULT_GUVI_CALLBACK_URL = "https://hackathon.guvi.in/api/updateHoneyPotFinalResult"
def identify_red_flags(messages: List[Dict]) -> List[str]:
    """
    Identify explicit red flags from scammer messages.

    Only messages whose ``sender`` is ``"scammer"`` are inspected. Their text
    is joined, lower-cased, and searched for each category's patterns using
    plain substring matching (so ``"block"`` also matches ``"blocked"``, and
    short patterns such as ``"pin"`` or ``"now"`` can match inside longer
    words).

    GUVI Doc: "Red Flag Identification | 8 pts | >=5 flags = 8pts"

    Args:
        messages: List of conversation messages (dicts with ``sender`` and
            ``message`` keys). ``None`` or missing message text is treated
            as an empty string.

    Returns:
        List of red flag category names that matched, in the order the
        categories are declared below. Each category appears at most once.
    """
    # `.get("message", "")` would still yield None when the key is present
    # with a None value, which breaks str.join; `or ""` covers both cases.
    scammer_text = " ".join(
        m.get("message") or ""
        for m in (messages or [])
        if m.get("sender") == "scammer"
    ).lower()

    # Red flag categories with specific patterns. Every pattern is lowercase,
    # so matching against the lowered text alone is sufficient.
    red_flag_patterns = {
        "Urgency/Time Pressure": [
            "urgent", "immediately", "now", "today", "hurry", "quick",
            "fast", "expire", "last chance", "limited time", "deadline",
            "turant", "jaldi", "abhi", "foran",
        ],
        "Authority Impersonation": [
            "police", "court", "government", "bank official", "rbi",
            "investigation", "arrest", "legal", "warrant", "department",
            "officer", "inspector", "commissioner",
        ],
        "Account/Service Threat": [
            "block", "suspend", "deactivate", "freeze", "seize",
            "terminate", "close", "disable", "restrict",
        ],
        "OTP/Credential Request": [
            "otp", "password", "pin", "cvv", "verify", "confirm",
            "share otp", "send otp", "tell otp",
        ],
        "Prize/Lottery Lure": [
            "won", "winner", "prize", "lottery", "jackpot", "lucky",
            "congratulations", "reward", "selected", "chosen",
        ],
        "Payment/Fee Demand": [
            "processing fee", "transfer fee", "tax", "charges",
            "pay first", "send money", "registration fee",
        ],
        "Suspicious Link": [
            "http://", "https://", "click here", "click link",
            "www.", ".xyz", ".tk", "bit.ly", "tinyurl",
        ],
        "KYC/Document Request": [
            "kyc", "aadhaar", "pan card", "pan number", "update kyc",
            "verify identity", "link expired",
        ],
        "False Urgency Claim": [
            "within 24 hours", "within 1 hour", "today only",
            "expires today", "last warning", "final notice",
        ],
        "Impersonation of Known Entity": [
            "sbi", "hdfc", "icici", "axis", "rbi", "amazon",
            "flipkart", "paytm", "phonepe", "gpay",
        ],
    }

    # Dict keys are unique and insertion-ordered, so one pass yields each
    # matching category exactly once, in declaration order.
    return [
        flag_name
        for flag_name, patterns in red_flag_patterns.items()
        if any(pattern in scammer_text for pattern in patterns)
    ]
def count_elicitation_attempts(messages: List[Dict]) -> int:
    """
    Count the number of elicitation attempts made by the agent.

    GUVI Doc: "Information Elicitation | 7 pts | Each elicitation attempt
    earns 1.5pts (max 7)". Five attempts already reach the maximum
    (5 * 1.5 = 7.5, capped at 7), so the returned count is capped at 5.

    A message counts as one attempt if its lower-cased text matches at least
    one elicitation pattern; a single message is never counted more than once.

    Args:
        messages: List of conversation messages (dicts with ``sender`` and
            ``message`` keys). Only ``sender == "agent"`` messages are
            examined; ``None`` or missing message text is treated as empty.

    Returns:
        Number of agent messages containing an elicitation attempt (0-5).
    """
    import re

    elicitation_patterns = [
        # Direct questions for financial details
        r"upi\s*(id)?[\?\s]",
        r"phone\s*(number)?[\?\s]",
        r"account\s*(number)?[\?\s]",
        r"ifsc[\?\s]",
        r"bank\s*(details|account)[\?\s]",
        r"what.{0,20}(upi|phone|number|account|ifsc)",
        r"give.{0,15}(upi|phone|number|account|ifsc)",
        r"tell.{0,15}(upi|phone|number|account|ifsc)",
        r"send.{0,15}(upi|phone|number|account|details)",
        r"share.{0,15}(upi|phone|number|account|details)",
        # Questions ending with ?
        r"where.{0,30}\?",
        r"what.{0,30}\?",
        r"how.{0,30}\?",
        r"which.{0,30}\?",
        # Hindi/Hinglish elicitation
        r"kya\s*hai",
        r"batao",
        r"bolo",
        r"dijiye",
        r"bhejo",
    ]

    # `or ""` guards against a message key that is present but None, which
    # would otherwise fail on `.lower()`.
    agent_texts = [
        (m.get("message") or "").lower()
        for m in (messages or [])
        if m.get("sender") == "agent"
    ]

    # any() stops at the first matching pattern, so each message contributes
    # at most one attempt.
    count = sum(
        1
        for text in agent_texts
        if any(re.search(pattern, text) for pattern in elicitation_patterns)
    )
    return min(count, 5)  # Cap at 5 for max 7 points
def generate_agent_notes(
    messages: List[Dict],
    extracted_intel: Dict,
    scam_indicators: List[str],
) -> str:
    """
    Build the free-text "agent notes" summary of a scam conversation.

    The summary is a sequence of sentences joined with ". " covering, in
    this order:
    - Red flags found by identify_red_flags (numbered list)
    - Number of elicitation attempts from count_elicitation_attempts
    - Scam type from identify_scam_type
    - Tactics detected by keyword (urgency, authority, prize, payment,
      credentials, threats, KYC, loan, delivery)
    - Extracted intelligence counts (with up to three sample values for
      most categories)
    - Conversation depth (number of scammer messages)

    Args:
        messages: List of conversation messages
        extracted_intel: Extracted intelligence dictionary keyed by
            snake_case category (e.g. "upi_ids", "phone_numbers")
        scam_indicators: List of detected scam indicators/keywords
            (accepted for interface compatibility; not read by this function)

    Returns:
        Agent notes string, or a fixed fallback sentence when nothing at all
        could be reported.
    """
    scammer_texts = [
        entry.get("message", "") for entry in messages if entry.get("sender") == "scammer"
    ]
    raw_text = " ".join(scammer_texts)
    lowered_text = raw_text.lower()

    def mentions(terms, also_raw):
        # Substring search in the lowered text, optionally also in the raw text.
        for term in terms:
            if term in lowered_text or (also_raw and term in raw_text):
                return True
        return False

    notes: List[str] = []

    # ---- Red flags, numbered from 1 ----
    flags = identify_red_flags(messages)
    if flags:
        numbered = ", ".join(
            f"[{position}] {name}" for position, name in enumerate(flags, start=1)
        )
        notes.append(f"RED FLAGS DETECTED ({len(flags)}): {numbered}")

    # ---- Elicitation attempts ----
    attempts = count_elicitation_attempts(messages)
    if attempts > 0:
        notes.append(
            f"ELICITATION ATTEMPTS: {attempts} direct questions asked to extract scammer details"
        )

    # ---- Scam type ----
    scam_label = identify_scam_type(lowered_text, raw_text)
    if scam_label:
        notes.append(f"Scam type: {scam_label}")

    # ---- Tactics: (sentence, also search raw text?, trigger terms) ----
    tactic_rules = (
        (
            "Used urgency tactics to pressure victim",
            True,
            [
                "urgent", "immediately", "now", "today", "hurry", "quick",
                "fast", "expire", "last chance", "turant", "jaldi", "abhi",
                "\u0924\u0941\u0930\u0902\u0924", "\u091c\u0932\u094d\u0926\u0940",
            ],
        ),
        (
            "Attempted authority/official impersonation",
            True,
            [
                "police", "court", "government", "bank official", "rbi",
                "investigation", "arrest", "legal", "warrant", "department",
                "\u092a\u0941\u0932\u093f\u0938",
                "\u0917\u093f\u0930\u092b\u094d\u0924\u093e\u0930",
            ],
        ),
        (
            "Used prize/lottery lure",
            True,
            [
                "won", "winner", "prize", "lottery", "jackpot", "lucky",
                "congratulations", "reward", "jeeta", "jeet", "inaam",
                "\u091c\u0940\u0924\u093e", "\u0907\u0928\u093e\u092e",
            ],
        ),
        (
            "Attempted payment/money redirection",
            True,
            [
                "upi", "transfer", "send money", "pay", "account number",
                "bank details", "paise bhejo", "transfer karo",
                "\u092a\u0948\u0938\u0947 \u092d\u0947\u091c\u094b",
            ],
        ),
        (
            "Attempted OTP/credential harvesting",
            True,
            [
                "otp", "password", "pin", "cvv", "verify", "confirm",
                "otp bhejo", "verify karo", "\u0913\u091f\u0940\u092a\u0940",
            ],
        ),
        (
            "Used threat/fear tactics",
            True,
            [
                "block", "suspend", "deactivate", "arrest", "fine",
                "penalty", "legal action", "case file", "fir",
                "\u092c\u094d\u0932\u0949\u0915", "\u092c\u0902\u0926",
            ],
        ),
        (
            "Used KYC/document verification lure",
            False,
            ["kyc", "aadhaar", "pan card", "pan number", "link expired", "update kyc"],
        ),
        (
            "Used fake loan/credit offer",
            False,
            ["loan approved", "pre-approved", "emi", "interest rate", "loan offer"],
        ),
        (
            "Used fake delivery/parcel scam",
            False,
            ["delivery failed", "customs", "parcel", "courier", "shipment"],
        ),
    )
    for sentence, also_raw, terms in tactic_rules:
        if mentions(terms, also_raw):
            notes.append(sentence)

    # ---- Intelligence: (dict key, label, list up to three sample values?) ----
    intel_fields = (
        ("upi_ids", "UPI ID(s)", True),
        ("bank_accounts", "bank account(s)", False),
        ("ifsc_codes", "IFSC code(s)", True),
        ("phone_numbers", "phone number(s)", True),
        ("phishing_links", "phishing link(s)", False),
        ("email_addresses", "email address(es)", True),
        ("case_ids", "case/reference ID(s)", True),
        ("policy_numbers", "policy number(s)", True),
        ("order_numbers", "order/transaction ID(s)", True),
    )
    intel_summary: List[str] = []
    for key, label, list_values in intel_fields:
        values = extracted_intel.get(key)
        if not values:
            continue
        fragment = f"{len(values)} {label}"
        if list_values:
            fragment += f": {', '.join(values[:3])}"
        intel_summary.append(fragment)
    if intel_summary:
        notes.append(f"Extracted intelligence: {'; '.join(intel_summary)}")

    # ---- Conversation depth ----
    if scammer_texts:
        notes.append(
            f"Conversation depth: {len(scammer_texts)} scammer message(s) analyzed"
        )

    if not notes:
        return "Scam engagement completed. Limited intelligence extracted."
    return ". ".join(notes) + "."
def identify_scam_type(text_lower: str, text_raw: str = "") -> Optional[str]:
    """
    Classify scammer text into a single primary scam type.

    Rules are evaluated top to bottom and the first rule with any matching
    term wins, so rule order encodes priority. Matching is plain substring
    search, which means short terms can match inside longer words (e.g.
    "emi" inside "premium").

    Args:
        text_lower: Lower-cased scammer text; every rule searches this.
        text_raw: Original-case scammer text; only the authority rule's
            Devanagari terms are additionally searched here.

    Returns:
        Human-readable scam type label, or None when no rule matches.
    """
    # (label, terms searched in text_lower, terms searched in text_raw)
    rules = (
        (
            "KYC/Document Verification Fraud",
            ("kyc", "aadhaar", "pan card", "update kyc"),
            (),
        ),
        (
            "Fake Loan/Credit Offer",
            ("loan approved", "pre-approved", "emi", "loan offer"),
            (),
        ),
        (
            "Fake Delivery/Parcel Scam",
            ("delivery failed", "customs", "parcel", "courier"),
            (),
        ),
        (
            "Prize/Lottery Scam",
            ("won", "winner", "prize", "lottery", "jackpot"),
            (),
        ),
        (
            "Authority/Police Impersonation",
            (
                "police", "arrest", "warrant", "court", "legal action", "investigation",
                "\u092a\u0941\u0932\u093f\u0938", "\u0917\u093f\u0930\u092b\u094d\u0924\u093e\u0930",
            ),
            (
                "\u092a\u0941\u0932\u093f\u0938", "\u0917\u093f\u0930\u092b\u094d\u0924\u093e\u0930",
            ),
        ),
        (
            "Bank Official Impersonation",
            ("bank official", "rbi", "bank manager", "account blocked", "account suspended"),
            (),
        ),
        (
            "Credential/OTP Harvesting",
            ("otp", "password", "pin", "cvv"),
            (),
        ),
        (
            "Refund/Insurance Scam",
            ("refund", "cashback", "insurance claim"),
            (),
        ),
        (
            "Investment/Trading Scam",
            ("investment", "returns", "crypto", "trading", "profit"),
            (),
        ),
        (
            "Electricity Bill Scam",
            ("electricity", "electric bill", "power bill", "power cut", "power disconnection"),
            (),
        ),
        (
            "Utility Bill Scam",
            ("utility", "water bill", "gas bill"),
            (),
        ),
        (
            "Job/Employment Scam",
            ("job", "employment", "hiring", "work from home", "earn from home"),
            (),
        ),
        (
            "Income Tax Scam",
            ("income tax", "tax notice", "tax department", "it department"),
            (),
        ),
        (
            "Tech Support Scam",
            ("tech support", "computer problem", "virus", "microsoft", "windows"),
            (),
        ),
        (
            "Government Scheme Scam",
            ("government scheme", "govt scheme", "subsidy", "pm scheme"),
            (),
        ),
        (
            "Payment Redirection Fraud",
            ("upi", "send money", "transfer", "pay"),
            (),
        ),
    )

    for label, lowered_terms, raw_terms in rules:
        if any(term in text_lower for term in lowered_terms):
            return label
        if any(term in text_raw for term in raw_terms):
            return label
    return None
def extract_suspicious_keywords(
    messages: List[Dict],
    scam_indicators: List[str],
) -> List[str]:
    """
    Extract suspicious keywords from the conversation.

    Starts from the detector-supplied ``scam_indicators`` and adds every
    English, Hinglish, or Devanagari keyword below that occurs (as a
    substring) in the lower-cased text of the scammer's messages.

    Args:
        messages: List of conversation messages (dicts with ``sender`` and
            ``message`` keys). Only ``sender == "scammer"`` messages are
            searched; ``None`` or missing message text is treated as empty.
        scam_indicators: List of detected scam indicators from detector;
            ``None`` or empty means no seed keywords.

    Returns:
        The first 25 keywords in sorted order (the list is sorted before it
        is truncated, so the cut is alphabetical, not by relevance).
    """
    keywords = set(scam_indicators) if scam_indicators else set()

    # English suspicious patterns
    en_patterns = [
        "urgent", "immediately", "now", "today", "hurry", "fast", "quick",
        "won", "winner", "prize", "lottery", "jackpot", "congratulations",
        "otp", "verify", "confirm", "blocked", "suspended", "deactivated",
        "police", "arrest", "court", "legal", "investigation", "warrant",
        "transfer", "send money", "pay now", "account blocked",
        "free", "gift", "reward", "selected", "lucky",
        "click here", "call now", "limited time", "expire",
        "kyc", "aadhaar", "pan card", "link expired",
        "upi", "bank account", "ifsc", "cvv", "pin",
        "loan approved", "credit card", "insurance", "refund",
        "delivery failed", "customs", "parcel",
    ]
    # Hindi / Hinglish suspicious patterns
    hi_patterns = [
        "turant", "jaldi", "abhi",
        "jeeta", "jeet", "inaam", "lottery",
        "otp bhejo", "verify karo", "confirm karo",
        "block", "suspend", "band",
        "police", "giraftaar", "giraftari", "court", "kanoon",
        "paise bhejo", "transfer karo", "pay karo",
        "muft", "free", "gift",
        "link pe click", "call karo",
        "kyc update", "aadhaar", "pan",
        "loan", "insurance", "refund",
        # Devanagari
        "\u0924\u0941\u0930\u0902\u0924",  # turant
        "\u091c\u0932\u094d\u0926\u0940",  # jaldi
        "\u0905\u092d\u0940",  # abhi
        "\u091c\u0940\u0924\u093e",  # jeeta
        "\u0907\u0928\u093e\u092e",  # inaam
        "\u0932\u0949\u091f\u0930\u0940",  # lottery
        "\u092a\u0941\u0932\u093f\u0938",  # police
        "\u0917\u093f\u0930\u092b\u094d\u0924\u093e\u0930",  # giraftaar
        "\u092a\u0948\u0938\u0947 \u092d\u0947\u091c\u094b",  # paise bhejo
        "\u091f\u094d\u0930\u093e\u0902\u0938\u092b\u0930",  # transfer
        "\u092c\u094d\u0932\u0949\u0915",  # block
        "\u092c\u0948\u0902\u0915",  # bank
        "\u0916\u093e\u0924\u093e",  # khaata
        "\u092f\u0942\u092a\u0940\u0906\u0908",  # UPI
        "\u0913\u091f\u0940\u092a\u0940",  # OTP
    ]

    # `or ""` guards against a message key that is present but None, which
    # would otherwise break str.join.
    full_text = " ".join(
        m.get("message") or ""
        for m in (messages or [])
        if m.get("sender") == "scammer"
    ).lower()

    # One lowered haystack serves both lists: every ASCII pattern is already
    # lowercase, and Devanagari has no letter case, so str.lower() leaves
    # those characters unchanged.
    for pattern in en_patterns + hi_patterns:
        if pattern in full_text:
            keywords.add(pattern)

    return sorted(keywords)[:25]
def send_final_result_to_guvi(
    session_id: str,
    scam_detected: bool,
    total_messages: int,
    extracted_intel: Dict,
    messages: List[Dict],
    scam_indicators: Optional[List[str]] = None,
    agent_notes: Optional[str] = None,
    engagement_duration_seconds: int = 0,
) -> bool:
    """
    Send final result to GUVI evaluation endpoint.

    This is MANDATORY for the hackathon submission. The platform uses
    this data to measure engagement depth, intelligence quality, and
    agent effectiveness.

    The target URL is ``settings.GUVI_CALLBACK_URL`` when set, otherwise
    ``DEFAULT_GUVI_CALLBACK_URL``. The request is a JSON POST with a
    10-second timeout. Network errors are logged and reported as ``False``
    rather than raised.

    Args:
        session_id: Unique session ID for the conversation
        scam_detected: Whether scam intent was confirmed
        total_messages: Total number of messages exchanged
        extracted_intel: Dictionary of extracted intelligence keyed by
            snake_case category; ``None`` is treated as an empty dict
        messages: Full conversation history
        scam_indicators: Optional list of detected scam indicators
        agent_notes: Optional pre-generated agent notes; generated from the
            conversation when empty or omitted
        engagement_duration_seconds: Duration of engagement in seconds

    Returns:
        True if the endpoint answered with HTTP 200, or if the callback is
        disabled via ``settings.GUVI_CALLBACK_ENABLED``; False otherwise.
    """
    if not settings.GUVI_CALLBACK_ENABLED:
        logger.info("GUVI callback disabled, skipping")
        return True

    callback_url = settings.GUVI_CALLBACK_URL or DEFAULT_GUVI_CALLBACK_URL

    # Normalise optional/None inputs once so the code below can index freely.
    indicators = scam_indicators or []
    intel = extracted_intel or {}

    suspicious_keywords = extract_suspicious_keywords(messages, indicators)
    if not agent_notes:
        agent_notes = generate_agent_notes(messages, intel, indicators)

    # Identify scam type from the scammer's side of the conversation.
    scammer_text = " ".join(
        m.get("message") or "" for m in messages if m.get("sender") == "scammer"
    )
    scam_type = identify_scam_type(scammer_text.lower(), scammer_text)

    # Build payload in GUVI's expected format (camelCase)
    payload = {
        "sessionId": session_id,
        "status": "success",
        "scamDetected": scam_detected,
        "scamType": scam_type or "Financial Fraud",
        "confidenceLevel": 0.95,
        "totalMessagesExchanged": total_messages,
        "engagementDurationSeconds": engagement_duration_seconds,
        "extractedIntelligence": {
            "bankAccounts": intel.get("bank_accounts", []),
            "upiIds": intel.get("upi_ids", []),
            "ifscCodes": intel.get("ifsc_codes", []),
            "phishingLinks": intel.get("phishing_links", []),
            "phoneNumbers": intel.get("phone_numbers", []),
            "emailAddresses": intel.get("email_addresses", []),
            "caseIds": intel.get("case_ids", []),
            "policyNumbers": intel.get("policy_numbers", []),
            "orderNumbers": intel.get("order_numbers", []),
            "suspiciousKeywords": suspicious_keywords,
        },
        "engagementMetrics": {
            "engagementDurationSeconds": engagement_duration_seconds,
            "totalMessagesExchanged": total_messages,
        },
        "agentNotes": agent_notes,
    }

    logger.info(f"Sending GUVI callback for session {session_id}")
    logger.debug(f"GUVI callback payload: {payload}")

    try:
        response = requests.post(
            callback_url,
            json=payload,
            timeout=10,
            headers={
                "Content-Type": "application/json",
            },
        )
    except requests.exceptions.Timeout:
        logger.error(f"GUVI callback timed out for session {session_id}")
        return False
    except requests.exceptions.RequestException as e:
        logger.error(f"GUVI callback failed for session {session_id}: {e}")
        return False
    except Exception as e:
        # Deliberate best-effort boundary: the callback must never take the
        # caller down, so anything unexpected is logged and reported as failure.
        logger.error(f"Unexpected error in GUVI callback: {e}")
        return False

    if response.status_code == 200:
        logger.info(f"GUVI callback successful for session {session_id}")
        return True

    logger.warning(
        f"GUVI callback returned status {response.status_code}: {response.text}"
    )
    return False
def should_send_callback(
    turn_count: int,
    max_turns_reached: bool,
    extraction_confidence: float,
    terminated: bool,
) -> bool:
    """
    Determine if GUVI callback should be sent based on conversation state.

    The callback is sent when ANY of these holds (checked in this order, so
    the logged reason is the most specific one that applies):
    - Max turns limit was reached
    - Session is explicitly terminated
    - Turn count >= 5 (GUVI runs 10 turns max, so the callback is sent
      early and often to make sure the evaluator receives a final output)
    - Extraction confidence >= 0.5

    Args:
        turn_count: Current turn count
        max_turns_reached: Whether max turns limit was hit
        extraction_confidence: Confidence in extracted intelligence
        terminated: Whether session is terminated

    Returns:
        True if callback should be sent
    """
    # The flag checks come first: previously the turn-count check ran first,
    # which made a separate `turn_count >= 10` test unreachable and hid the
    # "max turns reached" reason whenever turn_count was already >= 5.
    if max_turns_reached:
        logger.info(f"Callback trigger: max turns reached ({turn_count})")
        return True

    if terminated:
        logger.info("Callback trigger: session terminated")
        return True

    # GUVI runs 10 turns max - send callback after 5+ turns to ensure
    # the evaluator receives final output before conversation ends
    if turn_count >= 5:
        logger.info(f"Callback trigger: turn count >= 5 ({turn_count})")
        return True

    # Send if moderate extraction confidence (lowered from 0.85 to 0.5)
    if extraction_confidence >= 0.5:
        logger.info(f"Callback trigger: extraction confidence ({extraction_confidence:.2f})")
        return True

    return False