# File: atriumchain-api/utils/blockchain_utils.py
# Hosting-page residue kept for provenance: "Jainish1808's picture",
# "Upload folder using huggingface_hub", commit 4e4664a (verified).
"""
Blockchain Utilities
====================
Retry logic, error handling, and transaction monitoring for XRP Ledger operations.
"""
import time
from typing import Dict, Any, Optional, Callable
from xrpl.clients import JsonRpcClient
from xrpl.models import Response
from xrpl.wallet import Wallet
from xrpl.transaction import submit_and_wait
import logging
logger = logging.getLogger(__name__)
class BlockchainError(Exception):
    """Blockchain failure that carries a technical and a user-facing message.

    Attributes:
        technical_msg: Low-level description; also the text of ``str(error)``.
        user_msg: Message suitable for showing to an end user.
        retryable: Flag stored from the constructor argument (default False).
    """

    def __init__(self, technical_msg: str, user_msg: str, retryable: bool = False):
        # Only the technical text is handed to the base Exception, so
        # ``str(exc)`` and ``exc.args`` expose the low-level detail.
        super().__init__(technical_msg)
        self.retryable = retryable
        self.user_msg = user_msg
        self.technical_msg = technical_msg
class BlockchainRetryHandler:
    """
    Handles retry logic for blockchain transactions with exponential backoff.

    Failures are classified by case-insensitive substring matching on the
    exception text (see ``_is_retryable_error``). When an operation fails
    permanently, or the retry budget is exhausted, the original exception is
    converted into a ``BlockchainError`` carrying a user-friendly message.
    """

    # Keywords are stored in lowercase because the error text is lowercased
    # before matching; mixed-case entries such as "tecNO_DST" could never match.
    _RETRYABLE_KEYWORDS = (
        "timeout",
        "connection",
        "network",
        "sequence",
        "telinsuf_fee_p",  # telINSUF_FEE_P: fee too low (can retry with higher fee)
        "tefpast_seq",     # tefPAST_SEQ: sequence already used
        "terqueued",       # terQUEUED: transaction queued
    )
    _NON_RETRYABLE_KEYWORDS = (
        "signature",
        "malformed",
        "tecunfunded_payment",  # tecUNFUNDED_PAYMENT: insufficient balance
        "tecno_dst",            # tecNO_DST: destination doesn't exist
        "tecno_permission",     # tecNO_PERMISSION: unauthorized
        "tembad_fee",           # temBAD_FEE: invalid fee
    )

    def __init__(
        self,
        max_retries: int = 3,
        base_delay: float = 1.0,
        max_delay: float = 10.0,
        exponential_base: float = 2.0
    ):
        """
        Initialize retry handler.

        Args:
            max_retries: Maximum number of retry attempts (the operation is
                tried at most ``max_retries + 1`` times in total)
            base_delay: Initial delay in seconds
            max_delay: Maximum delay between retries, in seconds
            exponential_base: Base for exponential backoff calculation
        """
        self.max_retries = max_retries
        self.base_delay = base_delay
        self.max_delay = max_delay
        self.exponential_base = exponential_base

    def _calculate_delay(self, attempt: int) -> float:
        """
        Calculate the backoff delay for a zero-based attempt index.

        Returns ``base_delay * exponential_base ** attempt`` capped at
        ``max_delay``.
        """
        try:
            delay = self.base_delay * (self.exponential_base ** attempt)
        except OverflowError:
            # The uncapped value is too large to represent; the cap applies.
            return self.max_delay
        return min(delay, self.max_delay)

    def execute_with_retry(
        self,
        operation: Callable,
        operation_name: str,
        *args,
        **kwargs
    ) -> Any:
        """
        Execute a blockchain operation with retry logic.

        Args:
            operation: Function to execute
            operation_name: Human-readable name for logging
            *args, **kwargs: Arguments to pass to operation

        Returns:
            Result from the operation

        Raises:
            BlockchainError: If the failure is classified as non-retryable or
                all retries fail. The original exception is attached as the
                explicit cause (``__cause__``).
        """
        last_exception = None
        total_attempts = self.max_retries + 1
        for attempt in range(total_attempts):
            try:
                logger.info(f"[BLOCKCHAIN] Attempting {operation_name} (attempt {attempt + 1}/{total_attempts})")
                result = operation(*args, **kwargs)
                if attempt > 0:
                    logger.info(f"[BLOCKCHAIN] ✅ {operation_name} succeeded after {attempt} retries")
                return result
            except Exception as e:
                last_exception = e
                error_msg = str(e)
                # Give up immediately on permanent errors, or once the retry
                # budget is spent.
                if not self._is_retryable_error(error_msg) or attempt >= self.max_retries:
                    logger.error(f"[BLOCKCHAIN] [ERROR] {operation_name} failed permanently: {error_msg}")
                    raise self._convert_to_user_friendly_error(e, operation_name) from e
                # Back off before the next attempt
                delay = self._calculate_delay(attempt)
                logger.warning(
                    f"[BLOCKCHAIN] [WARNING] {operation_name} failed (attempt {attempt + 1}): {error_msg}. "
                    f"Retrying in {delay:.1f}s..."
                )
                time.sleep(delay)
        # Only reachable when the loop body never ran (e.g. negative max_retries)
        raise self._convert_to_user_friendly_error(
            last_exception or Exception("Unknown error"),
            operation_name
        ) from last_exception

    def _is_retryable_error(self, error_msg: str) -> bool:
        """
        Determine if an error is retryable based on its message.

        Matching is a case-insensitive substring search. Retryable keywords are
        checked first, then non-retryable ones:

        Retryable: "timeout", "connection", "network", "sequence",
        telINSUF_FEE_P, tefPAST_SEQ, terQUEUED.

        Non-retryable: "signature", "malformed", tecUNFUNDED_PAYMENT,
        tecNO_DST, tecNO_PERMISSION, temBAD_FEE.

        Messages matching neither list are treated as retryable.
        """
        error_lower = error_msg.lower()
        if any(keyword in error_lower for keyword in self._RETRYABLE_KEYWORDS):
            return True
        if any(keyword in error_lower for keyword in self._NON_RETRYABLE_KEYWORDS):
            return False
        # Default: retry on unknown errors
        return True

    def _convert_to_user_friendly_error(
        self,
        exception: Exception,
        operation_name: str
    ) -> "BlockchainError":
        """
        Convert technical blockchain errors to user-friendly messages.

        Args:
            exception: The original exception; its text becomes ``technical_msg``
            operation_name: Human-readable name used in the fallback message

        Returns:
            A ``BlockchainError`` whose ``user_msg`` is chosen by
            case-insensitive substring matching on the exception text.
        """
        technical = str(exception)
        # All comparisons below use lowercase needles against lowercase text.
        error_msg = technical.lower()
        if "insuf" in error_msg and "fee" in error_msg:
            return BlockchainError(
                technical_msg=technical,
                user_msg="Transaction fee too low. Please try again.",
                retryable=True
            )
        if "tecunfunded_payment" in error_msg or "insufficient" in error_msg:
            return BlockchainError(
                technical_msg=technical,
                user_msg="Insufficient balance to complete this transaction.",
                retryable=False
            )
        if "sequence" in error_msg:
            return BlockchainError(
                technical_msg=technical,
                user_msg="Transaction ordering issue. Please try again.",
                retryable=True
            )
        if "timeout" in error_msg or "connection" in error_msg:
            return BlockchainError(
                technical_msg=technical,
                user_msg="Network connection issue. Please check your internet and try again.",
                retryable=True
            )
        if "signature" in error_msg or "unauthorized" in error_msg:
            return BlockchainError(
                technical_msg=technical,
                user_msg="Authentication failed. Please contact support.",
                retryable=False
            )
        if "tecno_dst" in error_msg:
            return BlockchainError(
                technical_msg=technical,
                user_msg="Recipient wallet not activated. Please ensure the wallet is funded with XRP.",
                retryable=False
            )
        # Fallback message. The flag mirrors the handler's own classification so
        # an error it refused to retry is not reported to callers as retryable.
        return BlockchainError(
            technical_msg=technical,
            user_msg=f"Transaction failed: {operation_name}. Please try again or contact support if the issue persists.",
            retryable=self._is_retryable_error(technical)
        )
class TransactionMonitor:
    """
    Monitor blockchain transaction status and provide detailed feedback.
    """

    # XRP Ledger result code -> user-facing message. Built once at class
    # creation instead of on every lookup.
    _CODE_MESSAGES = {
        "tecUNFUNDED_PAYMENT": "Insufficient balance to complete this transaction.",
        "tecNO_DST": "Destination wallet not found or not activated.",
        "tecNO_LINE": "Trust line not established. Please enable the token first.",
        "tecNO_PERMISSION": "You don't have permission to perform this action.",
        "tecINSUFFICIENT_RESERVE": "Insufficient XRP reserve. You need at least 10 XRP in your wallet.",
        "tecPATH_DRY": "No available path for this transaction.",
        "terQUEUED": "Transaction queued. Please wait a moment and try again.",
        "tefPAST_SEQ": "Transaction already processed. Please refresh and check your balance.",
    }

    @staticmethod
    def submit_and_monitor(
        client: JsonRpcClient,
        transaction: Any,
        wallet: Wallet,
        operation_name: str = "Transaction"
    ) -> Response:
        """
        Submit a transaction and monitor its status with detailed logging.

        Args:
            client: XRP Ledger client
            transaction: Transaction to submit
            wallet: Wallet to sign with
            operation_name: Human-readable operation name

        Returns:
            Transaction response (only when the result code starts with "tes")

        Raises:
            BlockchainError: If the result code is not a success code, or if
                submission raises. In the latter case the original exception
                is attached as the explicit cause.
        """
        logger.info(f"[TX_MONITOR] 📤 Submitting {operation_name}...")
        # Serialising the transaction is only worth doing when DEBUG is enabled.
        if logger.isEnabledFor(logging.DEBUG):
            logger.debug(f"[TX_MONITOR] Transaction details: {transaction.to_dict()}")
        try:
            # Submit and wait for validation
            response = submit_and_wait(transaction, client, wallet)
            result = response.result
            # Tolerate a missing or non-dict "meta" so that parsing the
            # response cannot itself raise and be misreported below.
            metadata = result.get("meta")
            if not isinstance(metadata, dict):
                metadata = {}
            tx_result = str(metadata.get("TransactionResult", "unknown"))
            logger.info(f"[TX_MONITOR] Transaction hash: {result.get('hash', 'N/A')}")
            logger.info(f"[TX_MONITOR] Result code: {tx_result}")
            # Success codes start with "tes" (tesSUCCESS)
            if tx_result.startswith("tes"):
                logger.info(f"[TX_MONITOR] ✅ {operation_name} succeeded")
                return response
            # Any other code is a failure
            error_msg = f"{operation_name} failed with code: {tx_result}"
            logger.error(f"[TX_MONITOR] [ERROR] {error_msg}")
            raise BlockchainError(
                technical_msg=error_msg,
                user_msg=TransactionMonitor._get_user_message_for_code(tx_result),
                # Only the "ter" (retry) class is flagged retryable here;
                # "tec" (fee claimed), "tef" (failure), "tel" (local error)
                # and "tem" (malformed) are not.
                retryable=tx_result.startswith("ter")
            )
        except BlockchainError:
            # Already in the module's error type; propagate unchanged.
            raise
        except Exception as e:
            logger.error(f"[TX_MONITOR] [ERROR] {operation_name} exception: {str(e)}")
            raise BlockchainError(
                technical_msg=str(e),
                user_msg=f"{operation_name} failed. Please try again.",
                retryable=True
            ) from e

    @staticmethod
    def _get_user_message_for_code(result_code: str) -> str:
        """
        Map XRP Ledger result codes to user-friendly messages.

        Unknown codes yield a generic message that embeds the code.
        """
        return TransactionMonitor._CODE_MESSAGES.get(
            result_code,
            f"Transaction failed with code {result_code}. Please contact support."
        )
# Module-level retry handler instance: up to 3 retries, 1.0s initial delay,
# delays capped at 10.0s.
retry_handler = BlockchainRetryHandler(max_retries=3, base_delay=1.0, max_delay=10.0)