Spaces:
Sleeping
Sleeping
| import os | |
| import json | |
| import requests | |
| import re | |
| from typing import List, Dict, Any, Optional | |
| from langgraph.graph import StateGraph, END, START | |
| from langgraph.checkpoint.base import BaseCheckpointSaver | |
| from langgraph.checkpoint.memory import MemorySaver | |
| from pydantic import ValidationError | |
| from openai import OpenAI | |
| from models import AgentState, Message, ExtractedIntelligence | |
# --- Configuration ---
CALLBACK_URL = "https://hackathon.guvi.in/api/updateHoneyPotFinalResult"
# NOTE(review): the fallback value is a test key committed to source; deployments
# should set HONEYPOT_API_KEY explicitly.
HONEYPOT_API_KEY = os.environ.get("HONEYPOT_API_KEY", "sk_test_123456789")
# Model requested through OpenRouter; override with the OPENROUTER_MODEL env var.
OPENROUTER_MODEL = os.environ.get("OPENROUTER_MODEL", "arcee-ai/trinity-large-preview:free")
# API key for OpenRouter. May be None; call_openrouter() refuses to make a request
# (raises ValueError) while it is unset.
OPENROUTER_API_KEY = os.environ.get("OPENROUTER_API_KEY")
# The OpenAI SDK raises at construction time when api_key is None and OPENAI_API_KEY
# is not set either, which would make this whole module unimportable (and, when
# OPENAI_API_KEY *is* set, would quietly configure the OpenRouter client with it).
# A placeholder keeps the import working; it is never sent because every request
# goes through call_openrouter(), which checks OPENROUTER_API_KEY first.
client = OpenAI(
    base_url="https://openrouter.ai/api/v1",
    api_key=OPENROUTER_API_KEY or "OPENROUTER_API_KEY_NOT_SET",
)
def call_openrouter(messages: List[Dict[str, str]], max_tokens: int = 512) -> str:
    """
    Call the OpenRouter API to generate a text response.

    Sends ``messages`` to ``OPENROUTER_MODEL`` through the module-level ``client``.
    No reasoning tokens are requested, so the token budget goes to the content.

    Args:
        messages: Chat messages as ``{"role": ..., "content": ...}`` dicts.
        max_tokens: Upper bound on generated tokens.

    Returns:
        The first choice's message content, unstripped.

    Raises:
        ValueError: If OPENROUTER_API_KEY is unset, if the response carries no
            choices, or if the content is empty/whitespace. Callers catch this to
            switch to their fallbacks.
        Exceptions raised by the OpenAI SDK itself (network/API errors) propagate.
    """
    if not OPENROUTER_API_KEY:
        raise ValueError("OPENROUTER_API_KEY is not set.")
    response = client.chat.completions.create(
        model=OPENROUTER_MODEL,
        messages=messages,
        max_tokens=max_tokens,
        # include_reasoning is False by default; avoiding extra_body for maximum compatibility
    )
    # Guard against responses without choices (presumably an upstream error payload):
    # indexing them directly would raise IndexError/TypeError instead of the
    # documented ValueError.
    choices = getattr(response, "choices", None) or []
    if not choices:
        raise ValueError("OpenRouter returned no choices; check model or parameters.")
    message_obj = getattr(choices[0], "message", None)
    content = getattr(message_obj, "content", None) or ""
    if not content.strip():
        raise ValueError("Empty model response from OpenRouter; check model or parameters.")
    return content
| # --- LangGraph Nodes (Functions) --- | |
# Keywords for the offline fallback used when the model call fails (substring match
# against the lower-cased message).
_DETECTION_KEYWORDS = (
    "bank", "account", "blocked", "verify", "otp", "password", "upi", "urgent", "link", "update",
    "kyc", "electricity", "bill", "disconnected", "kbc", "lottery", "prize", "job", "registration",
    "customer care", "helpline", "sbi", "hdfc", "icici", "paytm", "phonepe", "gpay", "suspension",
)

# How many earlier messages are shown to the classifier as context.
_DETECTION_HISTORY_WINDOW = 6


def _parse_detection_response(resp: str) -> tuple[bool, str]:
    """Parse a classifier reply of the form 'true|<reason>' or 'false|<reason>'.

    Tolerates decoration around the flag (e.g. '**True**|...', '"false"|...') and
    replies that omit the delimiter. Returns ``(is_scam, reason)``; ``reason`` may be
    empty when the model gave none.
    """
    if "|" in resp:
        flag_part, _, reason = resp.partition("|")
        # Keep letters only so markdown/quotes/punctuation around the flag don't matter.
        flag = re.sub(r"[^a-z]", "", flag_part.lower())
        return flag in {"true", "yes"}, reason.strip()

    # Graceful fallback for models that don't follow the delimiter format.
    body = resp.strip().lstrip("*_`\"' ")
    low = body.lower()
    if low.startswith("true"):
        return True, body[4:].lstrip(" |:-").strip()
    if low.startswith("false"):
        return False, body[5:].lstrip(" |:-").strip()
    m_flag = re.search(r"\b(true|false)\b", low)
    return (m_flag is not None and m_flag.group(1) == "true"), resp.strip()


def _detect_keyword_fallback(text: str, error: Exception) -> tuple[bool, str]:
    """Keyword heuristic used when the model is unavailable; reason records ``error``."""
    lower_text = text.lower()
    for kw in _DETECTION_KEYWORDS:
        if kw in lower_text:
            return True, f"Keyword '{kw}' found in message (fallback: {error})"
    return False, f"No scam keywords found (fallback: {error})"


def detect_scam(state: AgentState) -> AgentState:
    """Node 1: Highly specific scam detection for Indian fraud patterns.

    Classifies the newest message in ``state["conversationHistory"]`` with the
    OpenRouter model. Up to ``_DETECTION_HISTORY_WINDOW`` earlier messages and the
    exchanged-message count are included as context. Any model failure (exception
    or empty reply) falls back to a keyword heuristic.

    Once a session has been flagged as a scam it stays flagged: a later harmless
    reply such as "ok sir" does not clear ``scamDetected``.

    Writes:
        state["scamDetected"]: verdict for this turn OR'ed with any earlier verdict.
        state["agentNotes"]: appends ``"Detection: <reason>. "``.

    Returns:
        The same state dict, mutated in place.
    """
    history = state["conversationHistory"]
    text = history[-1].text
    earlier = history[:-1][-_DETECTION_HISTORY_WINDOW:]
    history_text = "\n".join(
        f"{getattr(m, 'sender', 'unknown')}: {getattr(m, 'text', '')}" for m in earlier
    ) or "(no earlier messages)"
    total_messages = state.get("totalMessagesExchanged", 0)

    # Detailed prompt focusing on specific Indian scam vectors and subtle indicators.
    # The history/count lines make the "check the conversation history" instruction
    # actionable; previously the model only ever saw the latest message.
    prompt = (
        "You are a Senior Fraud Analyst specializing in Indian Cybercrime patterns. "
        "Analyze the following message for scam intent. Be extremely vigilant for: "
        "1. IMPERSONATION: Claiming to be from SBI, HDFC, ICICI, Paytm, PhonePe, Electricity Board (BESCOM/TNEB), or Government agencies. "
        "2. URGENCY/THREATS: 'Account blocked', 'Electricity disconnected', 'KYC expired', 'Sim card block', 'Avoid suspension'. "
        "3. SOLICITATION: Asking for OTP, PIN, Password, or to 'Verify' via a link or phone call. "
        "4. FINANCIAL BAIT: Lottery (KBC), Prize, Job offers with 'registration fees', or 'Refund' processing. "
        "5. PAYMENT REDIRECTION: Providing UPI IDs, Bank Accounts, or QR codes for 'verification' or 'payment'. "
        "6. OBFUSCATION: Using unusual characters or spaces in UPI IDs or links to bypass filters. "
        "\n\n"
        "Even if the message is just a greeting ('Hi', 'Hello'), check the conversation history for context. "
        "If the total message count is high (e.g., 10), and the user is engaging with a potential scammer, flag it. "
        "\n\n"
        f"Total messages exchanged so far: {total_messages}\n"
        f"Conversation history (oldest first, excluding the message to classify):\n{history_text}\n\n"
        "Respond ONLY in the format 'true|<reason>' if it is a scam or 'false|<reason>' if not. "
        f"Message: {text}"
    )

    try:
        resp = (call_openrouter([{"role": "user", "content": prompt}], max_tokens=150) or "").strip()
        if not resp:
            raise ValueError("OpenRouter returned empty response")
        is_scam, reason = _parse_detection_response(resp)
        if not reason:
            reason = "OpenRouter classification did not provide a reason"
    except Exception as e:
        # Best-effort: any model/transport/parsing failure degrades to the heuristic.
        is_scam, reason = _detect_keyword_fallback(text, e)

    previously_flagged = bool(state.get("scamDetected", False))
    if previously_flagged and not is_scam:
        reason = f"{reason} (session already flagged as scam; keeping flag)"
    state["scamDetected"] = is_scam or previously_flagged
    state["agentNotes"] = (state.get("agentNotes") or "") + f"Detection: {reason}. "
    return state
def agent_persona_response(state: AgentState) -> AgentState:
    """Node 2: Generates a believable Indian persona response to maximize intelligence extraction.

    Behaviour:
      * If ``state["scamDetected"]`` is falsy, store a polite sign-off in
        ``agent_response_text``, set ``should_continue_engagement`` to False and return.
      * Otherwise prompt the model with the newest message and keep only the first
        line of its stripped reply. If the call raises, use a canned confused-victim
        line instead. Then:
          - append the reply to ``conversationHistory`` as a ``sender="user"`` Message
            that reuses the newest message's timestamp;
          - increment ``totalMessagesExchanged``;
          - set ``should_continue_engagement`` to True.

    Returns:
        The same state dict, mutated in place.
    """
    if not state["scamDetected"]:
        state["agent_response_text"] = "Thank you for reaching out. Have a nice day!"
        state["should_continue_engagement"] = False
        return state

    history = state["conversationHistory"]
    newest = history[-1]

    # Persona instructions, concatenated verbatim into the prompt.
    instruction_parts = (
        "You are an AI Honeypot Agent. Your goal is to keep a scammer engaged to extract intelligence (UPI IDs, Bank Accounts, Links, Phone Numbers). ",
        "Adopt a believable Indian persona: a slightly worried, non-tech-savvy person who wants to comply but is 'having trouble'. ",
        "Use common Indian English expressions ('Sir', 'Please help', 'I am trying', 'Network issue'). ",
        "Do NOT reveal you are an AI or that you suspect a scam. ",
        "If they sent a link, ask 'Is this the correct link? It is not opening'. ",
        "If they asked for payment, ask 'Can I pay via UPI? What is your ID?' or 'Can you send account details again?'. ",
        "\n\n",
    )
    persona_prompt = (
        "".join(instruction_parts)
        + f"Scammer's latest message: {newest.text}\n\n"
        + "Respond in under 40 words. Be polite and encouraging."
    )

    try:
        raw_reply = call_openrouter([{"role": "user", "content": persona_prompt}], max_tokens=150)
        # Keep just the first line so the persona sends a single chat-sized message.
        reply, _, _ = raw_reply.strip().partition("\n")
    except Exception:
        # Any model failure: stay in character with a generic stalling line.
        reply = "Sir, I am trying to do as you said but it is not working. Can you please guide me again? I don't want my account to be blocked."

    history.append(Message(sender="user", text=reply, timestamp=newest.timestamp))
    state["agent_response_text"] = reply
    state["totalMessagesExchanged"] += 1
    state["should_continue_engagement"] = True
    return state
def extract_intelligence(state: AgentState) -> AgentState:
    """Node 3: Extract structured intelligence (phones, bank accounts, UPI IDs, links, keywords).

    All messages whose ``sender`` is "scammer" are joined with spaces and scanned.
    Results are merged into ``state["extractedIntelligence"]``; duplicates are dropped
    and first-seen order is kept, so the callback payload is deterministic.

    Digit runs (spaces/hyphens allowed between digits) of 9-18 digits are classified
    by keyword weights found within 60 characters on either side:

    * ``phoneNumbers`` - Indian mobile numbers (10 digits starting 6-9, optionally
      prefixed by "0" or "91") whose phone score beats the bank score by 2+. They are
      always normalized to ``"+91XXXXXXXXXX"`` so one number is stored only once.
    * ``bankAccounts`` - numbers whose bank score beats the phone score by 2+
      (stored as raw digits).
    * ``unknownNumbers`` - everything else (stored as raw digits).

    Returns:
        The same state dict, mutated in place.
    """
    # Combine all scammer messages for comprehensive extraction.
    scammer_text = " ".join(m.text for m in state["conversationHistory"] if m.sender == "scammer")
    lowered_text = scammer_text.lower()

    # Keyword -> weight tables for context scoring (plain substring matches).
    bank_kw = {
        "account": 2, "a/c": 2, "ac": 1, "acc": 2, "acct": 2, "account no": 3, "account number": 3,
        "ifsc": 3, "branch": 2, "passbook": 2, "cheque": 2, "beneficiary": 3,
        "neft": 3, "rtgs": 3, "imps": 3, "statement": 2, "transfer": 2, "deposit": 2,
        "bank": 1,
    }
    phone_kw = {
        "call": 3, "phone": 2, "mobile": 2, "whatsapp": 3, "sms": 3, "otp": 3,
        "contact": 2, "helpline": 2, "customer care": 3, "dial": 2, "ring": 1,
        "missed call": 2,
    }

    def _get_ctx(text: str, s: int, e: int, win: int = 60) -> str:
        """Lower-cased slice of ``text`` extending ``win`` chars around [s, e)."""
        left = max(0, s - win)
        right = min(len(text), e + win)
        return text[left:right].lower()

    def _score_ctx(ctx: str) -> tuple[int, int]:
        """Return ``(bank_score, phone_score)`` for a context window."""
        b = sum(w for k, w in bank_kw.items() if k in ctx)
        p = sum(w for k, w in phone_kw.items() if k in ctx)
        # Extra whole-word bonus for the strongest signals.
        if re.search(r"\b(ifsc|beneficiary|neft|rtgs|imps)\b", ctx):
            b += 2
        if re.search(r"\b(call|whatsapp|otp|sms)\b", ctx):
            p += 1
        return b, p

    def _phone_canonical(digits: str) -> Optional[str]:
        """Return an Indian mobile number as ``"+91" + 10 digits``, else None."""
        if len(digits) == 10 and digits[0] in "6789":
            return f"+91{digits}"
        if len(digits) == 11 and digits.startswith("0") and digits[1] in "6789":
            return f"+91{digits[1:]}"
        if len(digits) == 12 and digits.startswith("91") and digits[2] in "6789":
            return f"+91{digits[2:]}"
        return None

    phone_numbers, bank_accounts, unknown_numbers = [], [], []
    for m in re.finditer(r"(?<!\d)(\+?\d[\d\s\-]{7,}\d)(?!\d)", scammer_text):
        digits = re.sub(r"\D", "", m.group(1))
        if not 9 <= len(digits) <= 18:
            continue
        bank_score, phone_score = _score_ctx(_get_ctx(scammer_text, m.start(1), m.end(1)))
        # Every surviving candidate is 9-18 digits long, so this bias is unconditional
        # (kept to preserve the established scoring thresholds).
        bank_score += 1
        phone_norm = _phone_canonical(digits)
        if phone_norm:
            phone_score += 1
        if bank_score >= phone_score + 2:
            bank_accounts.append(digits)
        elif phone_norm and phone_score >= bank_score + 2:
            phone_numbers.append(phone_norm)
        else:
            unknown_numbers.append(digits)

    # The negative lookahead stops e-mail addresses (name@gmail.com) from being
    # reported as UPI IDs ("name@gmail"); a handle ending a sentence ("...@ybl.") still matches.
    upi_ids = re.findall(r"\b[a-zA-Z0-9\.\-_]{3,}@[a-zA-Z]{3,}\b(?!\.[a-zA-Z])", scammer_text)
    # Trailing sentence punctuation ("...verify.", "(see https://x.in/a)") is not part of the URL.
    phishing_links = [
        link.rstrip(".,;:!?)]}'\"")
        for link in re.findall(r"https?://(?:[a-zA-Z0-9-]+\.)+[a-zA-Z]{2,}(?:/[^\s]*)?", scammer_text)
    ]
    scam_keywords_list = [
        "bank", "account", "blocked", "verify", "otp", "password", "upi", "urgent", "link", "update",
        "kyc", "electricity", "bill", "disconnected", "kbc", "lottery", "prize", "job", "registration",
        "customer care", "helpline", "sbi", "hdfc", "icici", "paytm", "phonepe", "gpay", "suspension",
        "claim",
    ]
    found_keywords = [kw for kw in scam_keywords_list if kw in lowered_text]

    current_intel = state.get("extractedIntelligence")
    if current_intel is None:
        current_intel = ExtractedIntelligence()
    current_data = current_intel.model_dump()
    new_data = {
        "bankAccounts": bank_accounts,
        "upiIds": upi_ids,
        "phishingLinks": phishing_links,
        "phoneNumbers": phone_numbers,
        "suspiciousKeywords": found_keywords,
        "unknownNumbers": unknown_numbers,
    }
    added_any = False
    for key, vals in new_data.items():
        existing = list(current_data.get(key) or [])
        known = set(existing)
        if any(v not in known for v in vals):
            added_any = True
        # dict.fromkeys de-duplicates while keeping first-seen order (set() does not).
        current_data[key] = list(dict.fromkeys(existing + vals))
    state["extractedIntelligence"] = ExtractedIntelligence(**current_data)

    # The full scammer history is rescanned every turn, so only note genuinely new items.
    if added_any:
        state["agentNotes"] = (state.get("agentNotes") or "") + "Intelligence updated. "
    return state
def decide_engagement_end(state: AgentState) -> AgentState:
    """Node 4: Decides whether to continue or end the conversation based on intelligence gathered.

    Engagement continues unless BOTH of these hold:
      * a bank account, UPI ID or phishing link has been extracted;
      * the scammer has sent at least three messages.
    Separately, once ``totalMessagesExchanged`` reaches ten, engagement ends and a
    note is appended to ``agentNotes``.

    Writes ``state["should_continue_engagement"]`` and returns the same state dict.
    """
    min_scammer_turns = 3
    intel: ExtractedIntelligence = state.get("extractedIntelligence", ExtractedIntelligence())
    scammer_turns = sum(
        getattr(msg, "sender", None) == "scammer"
        for msg in state.get("conversationHistory", [])
    )
    has_payment_intel = bool(
        len(intel.bankAccounts) or len(intel.upiIds) or len(intel.phishingLinks)
    )

    # Too few scammer turns always keeps the conversation going.
    keep_going = scammer_turns < min_scammer_turns or not has_payment_intel

    if state.get("totalMessagesExchanged", 0) >= 10:
        keep_going = False
        state.setdefault("agentNotes", "")
        state["agentNotes"] += "Engagement limit reached (10 messages). "

    state["should_continue_engagement"] = keep_going
    return state
def final_callback(state: AgentState) -> AgentState:
    """Node 5: Sends the mandatory final result callback.

    Skipped (state returned unchanged) when no scam was detected or ``callbackSent``
    is already truthy. Otherwise POSTs the session summary as JSON to CALLBACK_URL,
    with the ``x-api-key`` header and a 10 s timeout.

    On success, sets ``callbackSent`` and notes it. On any exception, the error is
    appended to ``agentNotes`` instead of being raised. ``callbackSent`` then stays
    unset, so the guard above allows another attempt.
    """
    if not state["scamDetected"] or state.get("callbackSent", False):
        return state

    def _append_note(note: str) -> None:
        state.setdefault("agentNotes", "")
        state["agentNotes"] += note

    summary = state.get("extractedIntelligence", ExtractedIntelligence())
    body = {
        "sessionId": state.get("sessionId"),
        "scamDetected": state.get("scamDetected", False),
        "totalMessagesExchanged": state.get("totalMessagesExchanged", 0),
        "extractedIntelligence": summary.model_dump(),
        "agentNotes": state.get("agentNotes", ""),
    }
    request_headers = {"Content-Type": "application/json", "x-api-key": HONEYPOT_API_KEY}

    try:
        reply = requests.post(CALLBACK_URL, json=body, headers=request_headers, timeout=10)
        reply.raise_for_status()
        state["callbackSent"] = True
        _append_note("Final callback sent successfully. ")
    except Exception as exc:
        # Best-effort delivery: record the failure rather than crash the graph run.
        _append_note(f"Final callback failed: {exc}. ")
    return state
def create_honeypot_graph(checkpoint_saver: BaseCheckpointSaver):
    """Build and compile the honeypot LangGraph workflow.

    Flow:
        START -> detect_scam
        detect_scam -> END                        (no scam detected)
                    -> extract_intelligence -> agent_persona_response
                    -> decide_engagement_end
        decide_engagement_end -> END              (engagement continues, or callback already sent)
                              -> final_callback -> END

    Args:
        checkpoint_saver: Checkpointer handed to ``compile``.

    Returns:
        The compiled graph.
    """
    graph = StateGraph(AgentState)
    for node_name, node_fn in (
        ("detect_scam", detect_scam),
        ("extract_intelligence", extract_intelligence),
        ("agent_persona_response", agent_persona_response),
        ("decide_engagement_end", decide_engagement_end),
        ("final_callback", final_callback),
    ):
        graph.add_node(node_name, node_fn)

    graph.add_edge(START, "detect_scam")
    # Non-scam traffic stops right after detection.
    graph.add_conditional_edges(
        "detect_scam",
        lambda state: END if not state["scamDetected"] else "extract_intelligence",
    )
    for upstream, downstream in (
        ("extract_intelligence", "agent_persona_response"),
        ("agent_persona_response", "decide_engagement_end"),
    ):
        graph.add_edge(upstream, downstream)
    # Send the final callback only when engagement is over and it hasn't gone out yet.
    graph.add_conditional_edges(
        "decide_engagement_end",
        lambda state: "final_callback"
        if not state["should_continue_engagement"] and not state.get("callbackSent", False)
        else END,
    )
    graph.add_edge("final_callback", END)
    return graph.compile(checkpointer=checkpoint_saver)