Spaces:
Sleeping
Sleeping
File size: 10,804 Bytes
30533d1 ea9eade 3818a51 ea9eade 30533d1 ea9eade 3818a51 1510f7f ea9eade 3818a51 30533d1 3818a51 dbee4da 723cc80 1510f7f dbee4da 30533d1 ea9eade 30533d1 338316c 1510f7f 30533d1 4a01d54 1510f7f 30533d1 1510f7f 4a01d54 1510f7f 30533d1 1510f7f 30533d1 4a01d54 30533d1 338316c 1510f7f 30533d1 1510f7f 338316c 1510f7f 30533d1 1510f7f 30533d1 1510f7f 30533d1 1510f7f 30533d1 1510f7f 30533d1 1510f7f 30533d1 1510f7f 30533d1 1510f7f 338316c 30533d1 1510f7f 30533d1 1510f7f 30533d1 1510f7f 30533d1 1510f7f 30533d1 1510f7f 30533d1 1510f7f 30533d1 1510f7f 30533d1 1510f7f 338316c 1510f7f 30533d1 338316c 1510f7f 30533d1 1510f7f 30533d1 1510f7f 30533d1 1510f7f 30533d1 1510f7f 30533d1 1510f7f 30533d1 1510f7f 30533d1 1510f7f 30533d1 1510f7f 30533d1 ea9eade 30533d1 eaf73e8 723cc80 30533d1 723cc80 30533d1 ea9eade 30533d1 ea9eade 30533d1 ea9eade 30533d1 | 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 | """Agent selection and heuristic baseline for FraudShield."""
from __future__ import annotations
import logging
import os
from datetime import datetime
from typing import Any, Dict, Optional
from models import ActionTypeEnum, FraudCheckAction, ResolutionEnum
logger = logging.getLogger(__name__)
HIGH_VALUE_CATEGORIES = {"luxury", "electronics", "travel", "high_value_collectibles", "collectibles"}
RISKY_PAYMENT_METHODS = {"prepaid_card", "gift_card", "crypto_gateway"}
def get_env(*names: str, default: Optional[str] = None) -> Optional[str]:
"""Return the first non-empty environment variable from a list of aliases."""
for name in names:
value = os.getenv(name)
if value is not None:
stripped = value.strip()
if stripped:
return stripped
return default
class SnapshotCalibratedFraudDetectionAgent:
"""Deterministic baseline tuned for the hidden-evidence workflow."""
name = "snapshot-calibrated-heuristic"
agent_type = "heuristic"
def decide(self, observation) -> FraudCheckAction:
case_id = observation.case_id
revealed = observation.revealed_evidence
task_name = observation.task_name.value
budget = int(observation.app_context.get("investigation_budget_remaining", 0))
item_category = str(observation.app_context.get("item_category", ""))
amount = float(observation.case_summary.amount_usd)
if "transaction_review" not in revealed:
return FraudCheckAction(
case_id=case_id,
action_type=ActionTypeEnum.REVIEW_TRANSACTION,
reasoning="Open the transaction trace before any deeper investigation.",
)
if task_name == "easy":
if budget > 0 and "merchant_profile" not in revealed and (
amount >= 200.0 or item_category in HIGH_VALUE_CATEGORIES
):
return FraudCheckAction(
case_id=case_id,
action_type=ActionTypeEnum.FETCH_MERCHANT_PROFILE,
reasoning="A single merchant review is enough to confirm this easy case.",
)
if observation.note_required:
return self._note_action(
case_id,
"Reviewed the transaction trace and captured the visible merchant risk before routing.",
)
return FraudCheckAction(
case_id=case_id,
action_type=ActionTypeEnum.RESOLVE_CASE,
resolution=self._resolve_easy(revealed),
reasoning="The visible transaction pattern is sufficient for an easy-case route.",
)
if task_name == "medium":
if budget > 0 and "customer_profile" not in revealed:
return FraudCheckAction(
case_id=case_id,
action_type=ActionTypeEnum.FETCH_CUSTOMER_PROFILE,
reasoning="Customer context is needed before deciding this mixed-signal case.",
)
if budget > 0 and "policy_guide" not in revealed:
return FraudCheckAction(
case_id=case_id,
action_type=ActionTypeEnum.CHECK_POLICY,
reasoning="Policy guidance helps separate a hold from a document request.",
)
if budget > 0 and "merchant_profile" not in revealed and self._transaction_looks_risky(revealed):
return FraudCheckAction(
case_id=case_id,
action_type=ActionTypeEnum.FETCH_MERCHANT_PROFILE,
reasoning="Merchant context can resolve the remaining ambiguity in this medium case.",
)
if observation.note_required:
return self._note_action(
case_id,
"Reviewed the available customer, transaction, and policy evidence before routing the case.",
)
return FraudCheckAction(
case_id=case_id,
action_type=ActionTypeEnum.RESOLVE_CASE,
resolution=self._resolve_medium(revealed),
reasoning="The combined medium-case evidence supports a conservative final route.",
)
if budget > 0 and "network_graph" not in revealed:
return FraudCheckAction(
case_id=case_id,
action_type=ActionTypeEnum.FETCH_NETWORK_GRAPH,
reasoning="Hard cases usually need graph evidence before the routing becomes reliable.",
)
if case_id.endswith("primary") and budget > 0 and "merchant_profile" not in revealed:
return FraudCheckAction(
case_id=case_id,
action_type=ActionTypeEnum.FETCH_MERCHANT_PROFILE,
reasoning="Merchant risk helps determine whether the primary hard case should escalate.",
)
if case_id.endswith("secondary") and budget > 0 and "customer_profile" not in revealed:
return FraudCheckAction(
case_id=case_id,
action_type=ActionTypeEnum.FETCH_CUSTOMER_PROFILE,
reasoning="Customer context helps determine whether the secondary hard case should block or hold.",
)
if budget > 0 and "policy_guide" not in revealed:
return FraudCheckAction(
case_id=case_id,
action_type=ActionTypeEnum.CHECK_POLICY,
reasoning="Policy is needed before choosing the final route on a hard case.",
)
if observation.note_required:
return self._note_action(
case_id,
"Captured the reviewed transaction, graph, and supporting evidence before closing the hard case.",
)
return FraudCheckAction(
case_id=case_id,
action_type=ActionTypeEnum.RESOLVE_CASE,
resolution=self._resolve_hard(case_id, revealed),
reasoning="The available hard-case evidence supports the strongest route currently justified.",
)
def _note_action(self, case_id: str, note_text: str) -> FraudCheckAction:
return FraudCheckAction(
case_id=case_id,
action_type=ActionTypeEnum.ADD_CASE_NOTE,
note_text=note_text,
)
def _transaction_looks_risky(self, revealed: Dict[str, Dict[str, Any]]) -> bool:
facts = revealed.get("transaction_review", {}).get("facts", {})
if not facts:
return False
return bool(
facts.get("payment_method") in RISKY_PAYMENT_METHODS
or facts.get("same_address_orders_24h", 0) >= 4
or facts.get("device_country") != facts.get("shipping_country")
or facts.get("shipping_speed") in {"overnight", "same-day"}
)
def _resolve_easy(self, revealed: Dict[str, Dict[str, Any]]) -> ResolutionEnum:
facts = revealed["transaction_review"]["facts"]
merchant = revealed.get("merchant_profile", {}).get("facts", {})
if (
facts.get("payment_method") in RISKY_PAYMENT_METHODS
or facts.get("same_address_orders_24h", 0) >= 4
or facts.get("device_country") != facts.get("shipping_country")
or merchant.get("seller_chargeback_rate_30d", 0.0) >= 0.10
or merchant.get("seller_account_age_days", 9999) <= 45
):
return ResolutionEnum.BLOCK
return ResolutionEnum.APPROVE
def _resolve_medium(self, revealed: Dict[str, Dict[str, Any]]) -> ResolutionEnum:
facts = revealed["transaction_review"]["facts"]
customer = revealed.get("customer_profile", {}).get("facts", {})
merchant = revealed.get("merchant_profile", {}).get("facts", {})
if not merchant and customer.get("buyer_disputes_90d", 0) >= 2:
return ResolutionEnum.HOLD
conflict_score = 0
if customer.get("buyer_disputes_90d", 0) >= 2:
conflict_score += 1
if not customer.get("is_repeat_buyer", True):
conflict_score += 1
if merchant.get("seller_chargeback_rate_30d", 0.0) >= 0.06:
conflict_score += 1
if facts.get("payment_method") in RISKY_PAYMENT_METHODS:
conflict_score += 1
if conflict_score >= 3:
return ResolutionEnum.HOLD
if conflict_score >= 1:
return ResolutionEnum.REQUEST_DOCS
return ResolutionEnum.APPROVE
def _resolve_hard(self, case_id: str, revealed: Dict[str, Dict[str, Any]]) -> ResolutionEnum:
network = revealed.get("network_graph", {}).get("facts", {})
merchant = revealed.get("merchant_profile", {}).get("facts", {})
customer = revealed.get("customer_profile", {}).get("facts", {})
if case_id.endswith("primary"):
if network.get("cluster_alert_score", 0.0) >= 0.75 and network.get("linked_case_ids"):
return ResolutionEnum.ESCALATE
if merchant.get("seller_chargeback_rate_30d", 0.0) >= 0.10:
return ResolutionEnum.BLOCK
return ResolutionEnum.HOLD
if network.get("shared_device_accounts_24h", 0) >= 6 or network.get("previous_fraud_flags", 0) >= 1:
return ResolutionEnum.BLOCK
if customer.get("buyer_disputes_90d", 0) >= 2:
return ResolutionEnum.HOLD
return ResolutionEnum.HOLD
def build_default_agent() -> object:
"""Build the best available agent for the current runtime."""
heuristic = SnapshotCalibratedFraudDetectionAgent()
local_model_path = get_env("LOCAL_MODEL_PATH")
hf_token = get_env("HF_TOKEN", "HUGGINGFACEHUB_API_TOKEN")
api_key = get_env("API_KEY", "OPENAI_API_KEY", default=hf_token)
model_name = get_env(
"MODEL_NAME",
default="Qwen/Qwen2.5-1.5B-Instruct" if hf_token and not local_model_path else "gpt-4o-mini",
)
api_base_url = get_env(
"API_BASE_URL",
default="https://router.huggingface.co/v1" if hf_token and not local_model_path else None,
)
if local_model_path:
from llm_agent_openai import LocalModelFraudDetectionAgent
return LocalModelFraudDetectionAgent(
model_path=local_model_path,
fallback_agent=heuristic,
)
if api_key:
from llm_agent_openai import LLMFraudDetectionAgent
return LLMFraudDetectionAgent(
model_name=model_name or "gpt-4o-mini",
api_key=api_key,
api_base_url=api_base_url,
fallback_agent=heuristic,
)
logger.warning("No LOCAL_MODEL_PATH or API_KEY found. Falling back to the calibrated heuristic baseline.")
return heuristic
|