"""Agent selection and heuristic baseline for FraudShield.""" from __future__ import annotations import logging import os from datetime import datetime from typing import Any, Dict, Optional from models import ActionTypeEnum, FraudCheckAction, ResolutionEnum logger = logging.getLogger(__name__) HIGH_VALUE_CATEGORIES = {"luxury", "electronics", "travel", "high_value_collectibles", "collectibles"} RISKY_PAYMENT_METHODS = {"prepaid_card", "gift_card", "crypto_gateway"} def get_env(*names: str, default: Optional[str] = None) -> Optional[str]: """Return the first non-empty environment variable from a list of aliases.""" for name in names: value = os.getenv(name) if value is not None: stripped = value.strip() if stripped: return stripped return default class SnapshotCalibratedFraudDetectionAgent: """Deterministic baseline tuned for the hidden-evidence workflow.""" name = "snapshot-calibrated-heuristic" agent_type = "heuristic" def decide(self, observation) -> FraudCheckAction: case_id = observation.case_id revealed = observation.revealed_evidence task_name = observation.task_name.value budget = int(observation.app_context.get("investigation_budget_remaining", 0)) item_category = str(observation.app_context.get("item_category", "")) amount = float(observation.case_summary.amount_usd) if "transaction_review" not in revealed: return FraudCheckAction( case_id=case_id, action_type=ActionTypeEnum.REVIEW_TRANSACTION, reasoning="Open the transaction trace before any deeper investigation.", ) if task_name == "easy": if budget > 0 and "merchant_profile" not in revealed and ( amount >= 200.0 or item_category in HIGH_VALUE_CATEGORIES ): return FraudCheckAction( case_id=case_id, action_type=ActionTypeEnum.FETCH_MERCHANT_PROFILE, reasoning="A single merchant review is enough to confirm this easy case.", ) if observation.note_required: return self._note_action( case_id, "Reviewed the transaction trace and captured the visible merchant risk before routing.", ) return FraudCheckAction( case_id=case_id, action_type=ActionTypeEnum.RESOLVE_CASE, resolution=self._resolve_easy(revealed), reasoning="The visible transaction pattern is sufficient for an easy-case route.", ) if task_name == "medium": if budget > 0 and "customer_profile" not in revealed: return FraudCheckAction( case_id=case_id, action_type=ActionTypeEnum.FETCH_CUSTOMER_PROFILE, reasoning="Customer context is needed before deciding this mixed-signal case.", ) if budget > 0 and "policy_guide" not in revealed: return FraudCheckAction( case_id=case_id, action_type=ActionTypeEnum.CHECK_POLICY, reasoning="Policy guidance helps separate a hold from a document request.", ) if budget > 0 and "merchant_profile" not in revealed and self._transaction_looks_risky(revealed): return FraudCheckAction( case_id=case_id, action_type=ActionTypeEnum.FETCH_MERCHANT_PROFILE, reasoning="Merchant context can resolve the remaining ambiguity in this medium case.", ) if observation.note_required: return self._note_action( case_id, "Reviewed the available customer, transaction, and policy evidence before routing the case.", ) return FraudCheckAction( case_id=case_id, action_type=ActionTypeEnum.RESOLVE_CASE, resolution=self._resolve_medium(revealed), reasoning="The combined medium-case evidence supports a conservative final route.", ) if budget > 0 and "network_graph" not in revealed: return FraudCheckAction( case_id=case_id, action_type=ActionTypeEnum.FETCH_NETWORK_GRAPH, reasoning="Hard cases usually need graph evidence before the routing becomes reliable.", ) if case_id.endswith("primary") and budget > 0 and "merchant_profile" not in revealed: return FraudCheckAction( case_id=case_id, action_type=ActionTypeEnum.FETCH_MERCHANT_PROFILE, reasoning="Merchant risk helps determine whether the primary hard case should escalate.", ) if case_id.endswith("secondary") and budget > 0 and "customer_profile" not in revealed: return FraudCheckAction( case_id=case_id, action_type=ActionTypeEnum.FETCH_CUSTOMER_PROFILE, reasoning="Customer context helps determine whether the secondary hard case should block or hold.", ) if budget > 0 and "policy_guide" not in revealed: return FraudCheckAction( case_id=case_id, action_type=ActionTypeEnum.CHECK_POLICY, reasoning="Policy is needed before choosing the final route on a hard case.", ) if observation.note_required: return self._note_action( case_id, "Captured the reviewed transaction, graph, and supporting evidence before closing the hard case.", ) return FraudCheckAction( case_id=case_id, action_type=ActionTypeEnum.RESOLVE_CASE, resolution=self._resolve_hard(case_id, revealed), reasoning="The available hard-case evidence supports the strongest route currently justified.", ) def _note_action(self, case_id: str, note_text: str) -> FraudCheckAction: return FraudCheckAction( case_id=case_id, action_type=ActionTypeEnum.ADD_CASE_NOTE, note_text=note_text, ) def _transaction_looks_risky(self, revealed: Dict[str, Dict[str, Any]]) -> bool: facts = revealed.get("transaction_review", {}).get("facts", {}) if not facts: return False return bool( facts.get("payment_method") in RISKY_PAYMENT_METHODS or facts.get("same_address_orders_24h", 0) >= 4 or facts.get("device_country") != facts.get("shipping_country") or facts.get("shipping_speed") in {"overnight", "same-day"} ) def _resolve_easy(self, revealed: Dict[str, Dict[str, Any]]) -> ResolutionEnum: facts = revealed["transaction_review"]["facts"] merchant = revealed.get("merchant_profile", {}).get("facts", {}) if ( facts.get("payment_method") in RISKY_PAYMENT_METHODS or facts.get("same_address_orders_24h", 0) >= 4 or facts.get("device_country") != facts.get("shipping_country") or merchant.get("seller_chargeback_rate_30d", 0.0) >= 0.10 or merchant.get("seller_account_age_days", 9999) <= 45 ): return ResolutionEnum.BLOCK return ResolutionEnum.APPROVE def _resolve_medium(self, revealed: Dict[str, Dict[str, Any]]) -> ResolutionEnum: facts = revealed["transaction_review"]["facts"] customer = revealed.get("customer_profile", {}).get("facts", {}) merchant = revealed.get("merchant_profile", {}).get("facts", {}) if not merchant and customer.get("buyer_disputes_90d", 0) >= 2: return ResolutionEnum.HOLD conflict_score = 0 if customer.get("buyer_disputes_90d", 0) >= 2: conflict_score += 1 if not customer.get("is_repeat_buyer", True): conflict_score += 1 if merchant.get("seller_chargeback_rate_30d", 0.0) >= 0.06: conflict_score += 1 if facts.get("payment_method") in RISKY_PAYMENT_METHODS: conflict_score += 1 if conflict_score >= 3: return ResolutionEnum.HOLD if conflict_score >= 1: return ResolutionEnum.REQUEST_DOCS return ResolutionEnum.APPROVE def _resolve_hard(self, case_id: str, revealed: Dict[str, Dict[str, Any]]) -> ResolutionEnum: network = revealed.get("network_graph", {}).get("facts", {}) merchant = revealed.get("merchant_profile", {}).get("facts", {}) customer = revealed.get("customer_profile", {}).get("facts", {}) if case_id.endswith("primary"): if network.get("cluster_alert_score", 0.0) >= 0.75 and network.get("linked_case_ids"): return ResolutionEnum.ESCALATE if merchant.get("seller_chargeback_rate_30d", 0.0) >= 0.10: return ResolutionEnum.BLOCK return ResolutionEnum.HOLD if network.get("shared_device_accounts_24h", 0) >= 6 or network.get("previous_fraud_flags", 0) >= 1: return ResolutionEnum.BLOCK if customer.get("buyer_disputes_90d", 0) >= 2: return ResolutionEnum.HOLD return ResolutionEnum.HOLD def build_default_agent() -> object: """Build the best available agent for the current runtime.""" heuristic = SnapshotCalibratedFraudDetectionAgent() local_model_path = get_env("LOCAL_MODEL_PATH") hf_token = get_env("HF_TOKEN", "HUGGINGFACEHUB_API_TOKEN") api_key = get_env("API_KEY", "OPENAI_API_KEY", default=hf_token) model_name = get_env( "MODEL_NAME", default="Qwen/Qwen2.5-1.5B-Instruct" if hf_token and not local_model_path else "gpt-4o-mini", ) api_base_url = get_env( "API_BASE_URL", default="https://router.huggingface.co/v1" if hf_token and not local_model_path else None, ) if local_model_path: from llm_agent_openai import LocalModelFraudDetectionAgent return LocalModelFraudDetectionAgent( model_path=local_model_path, fallback_agent=heuristic, ) if api_key: from llm_agent_openai import LLMFraudDetectionAgent return LLMFraudDetectionAgent( model_name=model_name or "gpt-4o-mini", api_key=api_key, api_base_url=api_base_url, fallback_agent=heuristic, ) logger.warning("No LOCAL_MODEL_PATH or API_KEY found. Falling back to the calibrated heuristic baseline.") return heuristic