"""
Blockchain Utilities
====================
Retry logic, error handling, and transaction monitoring for XRP Ledger operations.
"""

import logging
import time
from typing import Dict, Any, Optional, Callable

from xrpl.clients import JsonRpcClient
from xrpl.models import Response
from xrpl.transaction import submit_and_wait
from xrpl.wallet import Wallet

logger = logging.getLogger(__name__)


class BlockchainError(Exception):
    """Blockchain failure carrying both a technical and a user-facing message.

    ``str(error)`` yields the technical message, because that is what is
    passed to ``Exception.__init__``.
    """

    def __init__(self, technical_msg: str, user_msg: str, retryable: bool = False):
        """
        Args:
            technical_msg: Raw detail intended for logs and debugging.
            user_msg: Message that is safe to show to an end user.
            retryable: Whether the raiser considers a retry worthwhile.
        """
        self.technical_msg = technical_msg
        self.user_msg = user_msg
        self.retryable = retryable
        super().__init__(technical_msg)


class BlockchainRetryHandler:
    """
    Handles retry logic for blockchain transactions with exponential backoff.
    """

    # Substrings (matched case-insensitively) that mark an error as worth
    # retrying. Checked before the non-retryable list.
    _RETRYABLE_KEYWORDS = (
        "timeout",
        "connection",
        "network",
        "sequence",
        "telINSUF_FEE_P",  # Fee too low (can retry with higher fee)
        "tefPAST_SEQ",  # Sequence already used
        "terQUEUED",  # Transaction queued
    )

    # Substrings (matched case-insensitively) that mark an error as permanent.
    _NON_RETRYABLE_KEYWORDS = (
        "signature",
        "malformed",
        "tecUNFUNDED_PAYMENT",  # Insufficient token balance
        "tecNO_DST",  # Destination doesn't exist
        "tecNO_PERMISSION",  # Unauthorized
        "temBAD_FEE",  # Invalid fee
    )

    def __init__(
        self,
        max_retries: int = 3,
        base_delay: float = 1.0,
        max_delay: float = 10.0,
        exponential_base: float = 2.0
    ):
        """
        Initialize retry handler.

        Args:
            max_retries: Maximum number of retry attempts (the operation is
                tried up to ``max_retries + 1`` times in total)
            base_delay: Initial delay in seconds
            max_delay: Maximum delay between retries
            exponential_base: Base for exponential backoff calculation
        """
        self.max_retries = max_retries
        self.base_delay = base_delay
        self.max_delay = max_delay
        self.exponential_base = exponential_base

    def _calculate_delay(self, attempt: int) -> float:
        """Return ``base_delay * exponential_base ** attempt``, capped at ``max_delay``."""
        delay = self.base_delay * (self.exponential_base ** attempt)
        return min(delay, self.max_delay)

    @staticmethod
    def _contains_any(text_lower: str, keywords) -> bool:
        """Return True if any keyword occurs in the already lower-cased text.

        Keywords are lower-cased here so that mixed-case ledger codes such as
        ``tecNO_DST`` still match the lower-cased error message.
        """
        return any(keyword.lower() in text_lower for keyword in keywords)

    def execute_with_retry(
        self,
        operation: Callable,
        operation_name: str,
        *args,
        **kwargs
    ) -> Any:
        """
        Execute a blockchain operation with retry logic.

        Args:
            operation: Function to execute
            operation_name: Human-readable name for logging
            *args, **kwargs: Arguments to pass to operation

        Returns:
            Result from the operation

        Raises:
            BlockchainError: If the error is not retryable or all retries
                fail. The original exception is attached as ``__cause__``.
                A ``BlockchainError`` raised by the operation itself is
                re-raised unchanged.
        """
        last_exception = None
        total_attempts = self.max_retries + 1

        for attempt in range(total_attempts):
            try:
                logger.info(f"[BLOCKCHAIN] Attempting {operation_name} (attempt {attempt + 1}/{self.max_retries + 1})")
                result = operation(*args, **kwargs)
            except Exception as e:
                last_exception = e
                error_msg = str(e)

                is_retryable = self._is_retryable_error(error_msg)
                if isinstance(e, BlockchainError) and not e.retryable:
                    # An explicit "do not retry" from the raiser is final;
                    # keyword guessing on the message must not override it.
                    is_retryable = False

                if not is_retryable or attempt >= self.max_retries:
                    logger.error(f"[BLOCKCHAIN] [ERROR] {operation_name} failed permanently: {error_msg}")
                    friendly = self._convert_to_user_friendly_error(e, operation_name)
                    if friendly is e:
                        # Already a BlockchainError: keep its traceback intact.
                        raise
                    raise friendly from e

                # Back off, then go around for the next attempt.
                delay = self._calculate_delay(attempt)
                logger.warning(
                    f"[BLOCKCHAIN] [WARNING] {operation_name} failed (attempt {attempt + 1}): {error_msg}. "
                    f"Retrying in {delay:.1f}s..."
                )
                time.sleep(delay)
            else:
                if attempt > 0:
                    logger.info(f"[BLOCKCHAIN] ✅ {operation_name} succeeded after {attempt} retries")
                return result

        # Only reachable when the loop body never ran (max_retries < 0).
        raise self._convert_to_user_friendly_error(
            last_exception or Exception("Unknown error"),
            operation_name
        )

    def _is_retryable_error(self, error_msg: str) -> bool:
        """
        Determine if an error is retryable based on error message.

        Matching is case-insensitive substring search. The retryable keyword
        list is consulted first, then the non-retryable list; a message that
        matches neither is treated as retryable.

        Args:
            error_msg: Text of the error to classify

        Returns:
            True if the operation should be retried, False otherwise
        """
        error_lower = error_msg.lower()

        if self._contains_any(error_lower, self._RETRYABLE_KEYWORDS):
            return True

        if self._contains_any(error_lower, self._NON_RETRYABLE_KEYWORDS):
            return False

        # Default: retry on unknown errors
        return True

    def _convert_to_user_friendly_error(
        self,
        exception: Exception,
        operation_name: str
    ) -> BlockchainError:
        """
        Convert technical blockchain errors to user-friendly messages.

        Args:
            exception: The error to convert
            operation_name: Human-readable operation name, used in the
                fallback message

        Returns:
            A BlockchainError whose technical message is ``str(exception)``.
            If ``exception`` is already a BlockchainError it is returned
            unchanged so its specific user message and retryable flag survive.
        """
        if isinstance(exception, BlockchainError):
            return exception

        technical = str(exception)
        # Lower-cased once; every keyword below is therefore lower-case too.
        error_msg = technical.lower()

        # Map technical errors to user messages (first match wins)
        if "insuf" in error_msg and "fee" in error_msg:
            return BlockchainError(
                technical_msg=technical,
                user_msg="Transaction fee too low. Please try again.",
                retryable=True
            )

        if "tecunfunded_payment" in error_msg or "insufficient" in error_msg:
            return BlockchainError(
                technical_msg=technical,
                user_msg="Insufficient balance to complete this transaction.",
                retryable=False
            )

        if "sequence" in error_msg:
            return BlockchainError(
                technical_msg=technical,
                user_msg="Transaction ordering issue. Please try again.",
                retryable=True
            )

        if "timeout" in error_msg or "connection" in error_msg:
            return BlockchainError(
                technical_msg=technical,
                user_msg="Network connection issue. Please check your internet and try again.",
                retryable=True
            )

        if "signature" in error_msg or "unauthorized" in error_msg:
            return BlockchainError(
                technical_msg=technical,
                user_msg="Authentication failed. Please contact support.",
                retryable=False
            )

        if "tecno_dst" in error_msg:
            return BlockchainError(
                technical_msg=technical,
                user_msg="Recipient wallet not activated. Please ensure the wallet is funded with XRP.",
                retryable=False
            )

        # Default user-friendly message
        return BlockchainError(
            technical_msg=technical,
            user_msg=f"Transaction failed: {operation_name}. Please try again or contact support if the issue persists.",
            retryable=True
        )


class TransactionMonitor:
    """
    Monitor blockchain transaction status and provide detailed feedback.
    """

    # User-facing text for specific XRP Ledger result codes.
    _CODE_MESSAGES = {
        "tecUNFUNDED_PAYMENT": "Insufficient balance to complete this transaction.",
        "tecNO_DST": "Destination wallet not found or not activated.",
        "tecNO_LINE": "Trust line not established. Please enable the token first.",
        "tecNO_PERMISSION": "You don't have permission to perform this action.",
        "tecINSUFFICIENT_RESERVE": "Insufficient XRP reserve. You need at least 10 XRP in your wallet.",
        "tecPATH_DRY": "No available path for this transaction.",
        "terQUEUED": "Transaction queued. Please wait a moment and try again.",
        "tefPAST_SEQ": "Transaction already processed. Please refresh and check your balance.",
    }

    @staticmethod
    def submit_and_monitor(
        client: JsonRpcClient,
        transaction: Any,
        wallet: Wallet,
        operation_name: str = "Transaction"
    ) -> Response:
        """
        Submit a transaction and monitor its status with detailed logging.

        Args:
            client: XRP Ledger client
            transaction: Transaction to submit
            wallet: Wallet to sign with
            operation_name: Human-readable operation name

        Returns:
            Transaction response (only when the result code starts with "tes")

        Raises:
            BlockchainError: If the result code is not a success code, or if
                submission raises. In the latter case the original exception
                is attached as ``__cause__`` and the error is marked retryable.
        """
        logger.info(f"[TX_MONITOR] 📤 Submitting {operation_name}...")
        logger.debug(f"[TX_MONITOR] Transaction details: {transaction.to_dict()}")

        try:
            # Submit and wait for validation
            response = submit_and_wait(transaction, client, wallet)

            # Check result
            result = response.result
            metadata = result.get("meta", {})
            tx_result = metadata.get("TransactionResult", "unknown")

            logger.info(f"[TX_MONITOR] Transaction hash: {result.get('hash', 'N/A')}")
            logger.info(f"[TX_MONITOR] Result code: {tx_result}")

            # Success codes start with "tes" (tesSUCCESS)
            if tx_result.startswith("tes"):
                logger.info(f"[TX_MONITOR] ✅ {operation_name} succeeded")
                return response

            # Error codes
            error_msg = f"{operation_name} failed with code: {tx_result}"
            logger.error(f"[TX_MONITOR] [ERROR] {error_msg}")

            raise BlockchainError(
                technical_msg=error_msg,
                user_msg=TransactionMonitor._get_user_message_for_code(tx_result),
                # Only the "ter" class means "retry"; "tec" = failed but fee
                # claimed, "tef" = failure, "tem" = malformed.
                retryable=tx_result.startswith("ter")
            )

        except BlockchainError:
            # Already in its final form; do not re-wrap.
            raise
        except Exception as e:
            logger.error(f"[TX_MONITOR] [ERROR] {operation_name} exception: {str(e)}")
            raise BlockchainError(
                technical_msg=str(e),
                user_msg=f"{operation_name} failed. Please try again.",
                retryable=True
            ) from e

    @staticmethod
    def _get_user_message_for_code(result_code: str) -> str:
        """Map XRP Ledger result codes to user-friendly messages.

        Unknown codes get a generic message that embeds the code.
        """
        return TransactionMonitor._CODE_MESSAGES.get(
            result_code,
            f"Transaction failed with code {result_code}. Please contact support."
        )


# Global retry handler instance
retry_handler = BlockchainRetryHandler(
    max_retries=3,
    base_delay=1.0,
    max_delay=10.0
)