fraudshield-1 / llm_agent.py
DevikaJ2005's picture
Update agent selection for HF-hosted and local LLM paths
eaf73e8
"""Agent selection and heuristic baseline for FraudShield."""
from __future__ import annotations
import logging
import os
from datetime import datetime
from typing import Any, Dict, Optional
from models import ActionTypeEnum, FraudCheckAction, ResolutionEnum
# Module-level logger; used by build_default_agent() to report the heuristic fallback.
logger = logging.getLogger(__name__)

# Item categories that trigger a merchant-profile lookup on easy cases
# regardless of the transaction amount.
HIGH_VALUE_CATEGORIES = {
    "collectibles",
    "electronics",
    "high_value_collectibles",
    "luxury",
    "travel",
}

# Payment methods that the heuristic treats as a risk signal on their own.
RISKY_PAYMENT_METHODS = {
    "crypto_gateway",
    "gift_card",
    "prepaid_card",
}
def get_env(*names: str, default: Optional[str] = None) -> Optional[str]:
    """Look up several environment-variable aliases and return the first usable one.

    Args:
        *names: Variable names to try, in priority order.
        default: Value returned when every alias is unset or blank.

    Returns:
        The whitespace-stripped value of the first alias that is set to a
        non-blank string, otherwise ``default``.
    """
    for alias in names:
        # Unset and whitespace-only variables are both treated as "not provided".
        cleaned = (os.getenv(alias) or "").strip()
        if cleaned:
            return cleaned
    return default
class SnapshotCalibratedFraudDetectionAgent:
    """Deterministic baseline tuned for the hidden-evidence workflow.

    The agent keeps no state between calls: :meth:`decide` looks only at the
    observation it receives and returns the single next action. The order in
    which evidence is requested depends on the task difficulty (``"easy"``,
    ``"medium"``; any other task name follows the hard-case plan). Apart from
    the initial transaction review, evidence is only requested while
    ``investigation_budget_remaining`` in ``app_context`` is positive.
    """

    name = "snapshot-calibrated-heuristic"
    agent_type = "heuristic"

    def decide(self, observation) -> FraudCheckAction:
        """Return the next action for the case described by ``observation``.

        The observation is expected to expose ``case_id``,
        ``revealed_evidence`` (a mapping keyed by evidence name),
        ``task_name`` (an enum whose ``.value`` is the difficulty string),
        ``app_context`` (a mapping), ``case_summary.amount_usd`` and
        ``note_required``.

        Returns:
            A ``FraudCheckAction`` that reviews the transaction, fetches one
            more piece of evidence, adds a case note, or resolves the case.
        """
        case_id = observation.case_id
        revealed = observation.revealed_evidence
        task_name = observation.task_name.value
        budget = int(observation.app_context.get("investigation_budget_remaining", 0))

        # The transaction review is always the first step and is not gated on budget.
        if "transaction_review" not in revealed:
            return FraudCheckAction(
                case_id=case_id,
                action_type=ActionTypeEnum.REVIEW_TRANSACTION,
                reasoning="Open the transaction trace before any deeper investigation.",
            )

        if task_name == "easy":
            return self._decide_easy(observation, case_id, revealed, budget)
        if task_name == "medium":
            return self._decide_medium(observation, case_id, revealed, budget)
        return self._decide_hard(observation, case_id, revealed, budget)

    def _decide_easy(
        self,
        observation,
        case_id: str,
        revealed: Dict[str, Dict[str, Any]],
        budget: int,
    ) -> FraudCheckAction:
        """Easy plan: optional merchant lookup, then note (if required), then resolve."""
        if budget > 0 and "merchant_profile" not in revealed:
            # Amount and category are only relevant to this check, so they are
            # parsed here rather than on every call to decide().
            amount = float(observation.case_summary.amount_usd)
            item_category = str(observation.app_context.get("item_category", ""))
            if amount >= 200.0 or item_category in HIGH_VALUE_CATEGORIES:
                return FraudCheckAction(
                    case_id=case_id,
                    action_type=ActionTypeEnum.FETCH_MERCHANT_PROFILE,
                    reasoning="A single merchant review is enough to confirm this easy case.",
                )
        if observation.note_required:
            return self._note_action(
                case_id,
                "Reviewed the transaction trace and captured the visible merchant risk before routing.",
            )
        return FraudCheckAction(
            case_id=case_id,
            action_type=ActionTypeEnum.RESOLVE_CASE,
            resolution=self._resolve_easy(revealed),
            reasoning="The visible transaction pattern is sufficient for an easy-case route.",
        )

    def _decide_medium(
        self,
        observation,
        case_id: str,
        revealed: Dict[str, Dict[str, Any]],
        budget: int,
    ) -> FraudCheckAction:
        """Medium plan: customer, policy, optional merchant, then note/resolve."""
        if budget > 0:
            if "customer_profile" not in revealed:
                return FraudCheckAction(
                    case_id=case_id,
                    action_type=ActionTypeEnum.FETCH_CUSTOMER_PROFILE,
                    reasoning="Customer context is needed before deciding this mixed-signal case.",
                )
            if "policy_guide" not in revealed:
                return FraudCheckAction(
                    case_id=case_id,
                    action_type=ActionTypeEnum.CHECK_POLICY,
                    reasoning="Policy guidance helps separate a hold from a document request.",
                )
            # The merchant profile is only fetched when the transaction itself looks risky.
            if "merchant_profile" not in revealed and self._transaction_looks_risky(revealed):
                return FraudCheckAction(
                    case_id=case_id,
                    action_type=ActionTypeEnum.FETCH_MERCHANT_PROFILE,
                    reasoning="Merchant context can resolve the remaining ambiguity in this medium case.",
                )
        if observation.note_required:
            return self._note_action(
                case_id,
                "Reviewed the available customer, transaction, and policy evidence before routing the case.",
            )
        return FraudCheckAction(
            case_id=case_id,
            action_type=ActionTypeEnum.RESOLVE_CASE,
            resolution=self._resolve_medium(revealed),
            reasoning="The combined medium-case evidence supports a conservative final route.",
        )

    def _decide_hard(
        self,
        observation,
        case_id: str,
        revealed: Dict[str, Dict[str, Any]],
        budget: int,
    ) -> FraudCheckAction:
        """Hard plan: graph, case-specific profile, policy, then note/resolve."""
        if budget > 0:
            if "network_graph" not in revealed:
                return FraudCheckAction(
                    case_id=case_id,
                    action_type=ActionTypeEnum.FETCH_NETWORK_GRAPH,
                    reasoning="Hard cases usually need graph evidence before the routing becomes reliable.",
                )
            # Which profile is worth fetching depends on the case-id suffix.
            if case_id.endswith("primary") and "merchant_profile" not in revealed:
                return FraudCheckAction(
                    case_id=case_id,
                    action_type=ActionTypeEnum.FETCH_MERCHANT_PROFILE,
                    reasoning="Merchant risk helps determine whether the primary hard case should escalate.",
                )
            if case_id.endswith("secondary") and "customer_profile" not in revealed:
                return FraudCheckAction(
                    case_id=case_id,
                    action_type=ActionTypeEnum.FETCH_CUSTOMER_PROFILE,
                    reasoning="Customer context helps determine whether the secondary hard case should block or hold.",
                )
            if "policy_guide" not in revealed:
                return FraudCheckAction(
                    case_id=case_id,
                    action_type=ActionTypeEnum.CHECK_POLICY,
                    reasoning="Policy is needed before choosing the final route on a hard case.",
                )
        if observation.note_required:
            return self._note_action(
                case_id,
                "Captured the reviewed transaction, graph, and supporting evidence before closing the hard case.",
            )
        return FraudCheckAction(
            case_id=case_id,
            action_type=ActionTypeEnum.RESOLVE_CASE,
            resolution=self._resolve_hard(case_id, revealed),
            reasoning="The available hard-case evidence supports the strongest route currently justified.",
        )

    def _note_action(self, case_id: str, note_text: str) -> FraudCheckAction:
        """Build an ADD_CASE_NOTE action carrying ``note_text``."""
        return FraudCheckAction(
            case_id=case_id,
            action_type=ActionTypeEnum.ADD_CASE_NOTE,
            note_text=note_text,
        )

    @staticmethod
    def _has_core_risk_signal(facts: Dict[str, Any]) -> bool:
        """Check the transaction signals shared by the easy and medium heuristics.

        True when the payment method is in ``RISKY_PAYMENT_METHODS``, at least
        four orders went to the same address in 24h, or the device and shipping
        countries differ. A country missing on only one side also counts as a
        difference, because ``None`` is compared against the present value.
        """
        return bool(
            facts.get("payment_method") in RISKY_PAYMENT_METHODS
            or facts.get("same_address_orders_24h", 0) >= 4
            or facts.get("device_country") != facts.get("shipping_country")
        )

    def _transaction_looks_risky(self, revealed: Dict[str, Dict[str, Any]]) -> bool:
        """Return True when the revealed transaction facts carry any risk signal.

        Missing or empty transaction facts are treated as not risky.
        """
        facts = revealed.get("transaction_review", {}).get("facts", {})
        if not facts:
            return False
        return bool(
            self._has_core_risk_signal(facts)
            or facts.get("shipping_speed") in {"overnight", "same-day"}
        )

    def _resolve_easy(self, revealed: Dict[str, Dict[str, Any]]) -> ResolutionEnum:
        """Route an easy case to BLOCK on any risk signal, otherwise APPROVE.

        Raises:
            KeyError: if ``revealed`` has no ``"transaction_review"`` entry or
                that entry has no ``"facts"``.
        """
        facts = revealed["transaction_review"]["facts"]
        # The merchant profile is optional; absent facts fall back to benign defaults.
        merchant = revealed.get("merchant_profile", {}).get("facts", {})
        if (
            self._has_core_risk_signal(facts)
            or merchant.get("seller_chargeback_rate_30d", 0.0) >= 0.10
            or merchant.get("seller_account_age_days", 9999) <= 45
        ):
            return ResolutionEnum.BLOCK
        return ResolutionEnum.APPROVE

    def _resolve_medium(self, revealed: Dict[str, Dict[str, Any]]) -> ResolutionEnum:
        """Route a medium case by counting conflicting signals.

        Repeat disputes with no merchant facts available go straight to HOLD.
        Otherwise three or more signals give HOLD, one or two give
        REQUEST_DOCS, and none gives APPROVE.

        Raises:
            KeyError: if ``revealed`` has no ``"transaction_review"`` entry or
                that entry has no ``"facts"``.
        """
        facts = revealed["transaction_review"]["facts"]
        customer = revealed.get("customer_profile", {}).get("facts", {})
        merchant = revealed.get("merchant_profile", {}).get("facts", {})

        has_repeat_disputes = customer.get("buyer_disputes_90d", 0) >= 2
        if has_repeat_disputes and not merchant:
            return ResolutionEnum.HOLD

        conflict_score = sum(
            (
                has_repeat_disputes,
                not customer.get("is_repeat_buyer", True),
                merchant.get("seller_chargeback_rate_30d", 0.0) >= 0.06,
                facts.get("payment_method") in RISKY_PAYMENT_METHODS,
            )
        )
        if conflict_score >= 3:
            return ResolutionEnum.HOLD
        if conflict_score >= 1:
            return ResolutionEnum.REQUEST_DOCS
        return ResolutionEnum.APPROVE

    def _resolve_hard(self, case_id: str, revealed: Dict[str, Dict[str, Any]]) -> ResolutionEnum:
        """Route a hard case from graph evidence, with HOLD as the fallback.

        Case ids ending in ``"primary"`` escalate on a strong cluster alert
        with linked cases, block on a high merchant chargeback rate, and hold
        otherwise. All other case ids block on shared-device or prior-fraud
        graph signals, and hold otherwise.
        """
        network = revealed.get("network_graph", {}).get("facts", {})

        if case_id.endswith("primary"):
            merchant = revealed.get("merchant_profile", {}).get("facts", {})
            if network.get("cluster_alert_score", 0.0) >= 0.75 and network.get("linked_case_ids"):
                return ResolutionEnum.ESCALATE
            if merchant.get("seller_chargeback_rate_30d", 0.0) >= 0.10:
                return ResolutionEnum.BLOCK
            return ResolutionEnum.HOLD

        if network.get("shared_device_accounts_24h", 0) >= 6 or network.get("previous_fraud_flags", 0) >= 1:
            return ResolutionEnum.BLOCK
        # NOTE(review): an earlier version also checked buyer_disputes_90d here,
        # but both outcomes of that check were HOLD, so it had no effect and was
        # removed. Confirm whether a different route was intended for the
        # no-dispute case.
        return ResolutionEnum.HOLD
def build_default_agent() -> object:
    """Build the best available agent for the current runtime.

    Selection order:

    1. ``LOCAL_MODEL_PATH`` is set: a ``LocalModelFraudDetectionAgent``.
    2. A credential is available (``API_KEY``, ``OPENAI_API_KEY``, or else the
       Hugging Face token from ``HF_TOKEN`` / ``HUGGINGFACEHUB_API_TOKEN``):
       an ``LLMFraudDetectionAgent``.
    3. Otherwise the heuristic baseline itself, after logging a warning.

    Both LLM-backed agents receive the heuristic as ``fallback_agent``.
    """
    baseline = SnapshotCalibratedFraudDetectionAgent()

    model_dir = get_env("LOCAL_MODEL_PATH")
    if model_dir:
        # Imported lazily so the module loads without the LLM agent module.
        from llm_agent_openai import LocalModelFraudDetectionAgent

        return LocalModelFraudDetectionAgent(
            model_path=model_dir,
            fallback_agent=baseline,
        )

    hub_token = get_env("HF_TOKEN", "HUGGINGFACEHUB_API_TOKEN")
    credential = get_env("API_KEY", "OPENAI_API_KEY", default=hub_token)
    if not credential:
        logger.warning("No LOCAL_MODEL_PATH or API_KEY found. Falling back to the calibrated heuristic baseline.")
        return baseline

    # The presence of a Hugging Face token switches the *defaults* to the HF
    # router; explicit MODEL_NAME / API_BASE_URL values still win.
    if hub_token:
        fallback_model = "Qwen/Qwen2.5-1.5B-Instruct"
        fallback_base_url = "https://router.huggingface.co/v1"
    else:
        fallback_model = "gpt-4o-mini"
        fallback_base_url = None

    chosen_model = get_env("MODEL_NAME", default=fallback_model)
    chosen_base_url = get_env("API_BASE_URL", default=fallback_base_url)

    from llm_agent_openai import LLMFraudDetectionAgent

    return LLMFraudDetectionAgent(
        model_name=chosen_model or "gpt-4o-mini",
        api_key=credential,
        api_base_url=chosen_base_url,
        fallback_agent=baseline,
    )