atriumchain-api / services /xrp_service.py
Jainish1808's picture
Upload folder using huggingface_hub
4e4664a verified
from typing import Optional, Tuple, Dict, Any, List
from xrpl.clients import JsonRpcClient
from xrpl.wallet import Wallet
from xrpl.account import get_balance
from xrpl.models.transactions import Payment, TrustSet, OfferCreate
from xrpl.transaction import submit_and_wait, autofill_and_sign
from xrpl.models.amounts import IssuedCurrencyAmount
from xrpl.models.requests import Tx, AccountLines
from xrpl.utils import xrp_to_drops, drops_to_xrp
from config import settings
import asyncio
import time
# Import blockchain utilities for retry and error handling
from utils.blockchain_utils import (
retry_handler,
TransactionMonitor,
BlockchainError
)
# Import monitoring and logging
from utils.logger import (
monitor_performance,
monitor_transaction,
get_logger,
log_info,
log_error,
log_warning
)
# Import price oracle for real-time rates
try:
from services.price_oracle import price_oracle
USE_LIVE_RATES = True
except ImportError:
USE_LIVE_RATES = False
print("[XRPL] Warning: Price oracle not available, using static rates")
class XRPLService:
def __init__(self):
self.client = JsonRpcClient(settings.XRPL_RPC_URL)
def generate_wallet(self) -> Tuple[str, str]:
# For demo: generate an in-memory wallet; in production use faucet for funding
wallet = Wallet.create()
# No auto-fund here; user should fund via faucet UI
return wallet.classic_address, wallet.seed
def get_issuer_wallet(self) -> Wallet:
"""Get the issuer wallet from settings (DEPRECATED - use admin wallet instead)"""
if not settings.ISSUER_SEED:
raise ValueError("ISSUER_SEED not configured. Please use admin wallet for token issuance.")
return Wallet.from_seed(settings.ISSUER_SEED)
def generate_currency_code(self, property_id: str) -> str:
"""Generate unique currency code for property in XRPL hex format"""
# Convert to 40-character hex format (XRPL standard for custom currencies)
# Use first 8 chars of property_id (MongoDB ObjectId) to ensure uniqueness
# Format: PROP{first_8_chars_of_id}
property_suffix = str(property_id)[:8].upper() # e.g., 68E769F9
base_code = f"PROP{property_suffix}" # e.g., PROP68E769F9
# Convert to hex and pad to 40 characters (20 bytes)
hex_code = base_code.encode('ascii').hex().upper()
# Pad with zeros to reach exactly 40 characters
return hex_code.ljust(40, '0')
@monitor_performance("get_xrp_balance")
def get_xrp_balance(self, address: str) -> float:
try:
from xrpl.models.requests import AccountInfo
account_info_request = AccountInfo(account=address)
response = self.client.request(account_info_request)
if response.is_successful() and 'account_data' in response.result:
balance_drops = response.result['account_data'].get('Balance', '0')
print(f"Debug: Retrieved balance {balance_drops} drops for address {address}")
return float(balance_drops) / 1_000_000
else:
print(f"Debug: Account not found or inactive: {address}")
return 0.0
except Exception as e:
print(f"Error getting XRP balance for {address}: {e}")
return 0.0
def calculate_xrp_cost(self, token_amount: int, price_per_token_aed: int) -> float:
"""Calculate XRP cost for token purchase using live exchange rates.
This now uses a real-time price oracle to fetch current XRP/AED rates
from multiple exchanges (CoinGecko, CoinDCX, Binance).
Falls back to configured rate if live feeds are unavailable.
"""
aed_cost = token_amount * price_per_token_aed
if USE_LIVE_RATES:
try:
xrp_cost = price_oracle.get_aed_to_xrp(aed_cost)
print(f"[XRPL] Using live rate: {aed_cost} AED = {xrp_cost:.6f} XRP")
return max(xrp_cost, 0.000001) # Minimum 1 drop
except Exception as e:
print(f"[XRPL] Error fetching live rate: {e}, using fallback")
# Fallback to static rate
rate = max(settings.XRPL_RATE_AED_TO_XRP, 0.0000001)
xrp_cost = aed_cost * rate
print(f"[XRPL] Using static rate: {aed_cost} AED = {xrp_cost:.6f} XRP")
return max(xrp_cost, 0.000001)
@monitor_transaction("property_token_creation")
@monitor_performance("create_property_token")
def create_property_token(self, property_id: str, total_tokens: int, issuer_seed: str = None) -> Dict[str, Any]:
"""Create and issue tokens for a property on XRPL"""
try:
if not issuer_seed:
issuer_seed = settings.ISSUER_SEED
if not issuer_seed:
raise ValueError("No issuer seed provided")
issuer = Wallet.from_seed(issuer_seed)
currency_code = self.generate_currency_code(property_id)
# Create token metadata record
return {
'success': True,
'currency_code': currency_code,
'issuer_address': issuer.classic_address,
'total_supply': total_tokens,
'property_id': property_id
}
except Exception as e:
print(f"Error creating property token: {e}")
return {'success': False, 'error': str(e)}
@monitor_transaction("trustline_setup")
@monitor_performance("setup_user_trustline")
def setup_user_trustline(self, user_seed: str, currency: str, issuer: str) -> Dict[str, Any]:
"""Setup trustline for user to receive tokens - WITH RETRY"""
try:
def create_trustline_with_retry():
result = self.create_trustline(user_seed, currency, issuer)
return result
result = retry_handler.execute_with_retry(
create_trustline_with_retry,
"Trustline Setup"
)
return {
'success': True,
'tx_hash': result.get('tx_json', {}).get('hash') or result.get('hash'),
'result': result
}
except BlockchainError as e:
print(f"Trustline setup failed: {e.technical_msg}")
return {'success': False, 'error': e.user_msg}
except Exception as e:
print(f"Error setting up trustline: {e}")
return {'success': False, 'error': 'Failed to set up token trust line. Please try again.'}
@monitor_transaction("token_transfer_to_issuer")
@monitor_performance("transfer_tokens_to_issuer")
def transfer_tokens_to_issuer(self, seller_seed: str, currency: str, issuer: str, token_amount: int, issuer_seed: str = None) -> Dict[str, Any]:
"""Transfer tokens from user back to the issuer (for sell-back functionality).
IMPORTANT: When you send IOU tokens back to the issuer, they are effectively "burned" or "redeemed".
The issuer's balance of their own currency doesn't increase - the tokens simply cease to exist.
This is the correct behavior for a sell-back/redemption scenario.
Args:
seller_seed: Seller's wallet seed
currency: Property token currency code
issuer: Issuer wallet address
token_amount: Number of tokens to transfer back
issuer_seed: Property's dedicated issuer seed (optional)
Returns:
Dict with success status, transaction hash, and details
"""
try:
if settings.XRPL_TOKEN_MODEL.upper() != 'IOU':
return {
'success': False,
'error': 'Token transfer requires IOU token model'
}
seller = Wallet.from_seed(seller_seed)
print(f"[XRPL] Burning {token_amount} tokens by sending to issuer {issuer}")
print(f" Seller: {seller.classic_address}")
print(f" Currency: {currency}")
print(f" Note: Tokens sent to issuer are automatically burned/redeemed")
# Create payment to send tokens back to issuer (this burns them)
# The issuer doesn't need a trustline to receive their own tokens
payment = Payment(
account=seller.classic_address,
destination=issuer,
amount=IssuedCurrencyAmount(
currency=currency,
issuer=issuer,
value=str(token_amount)
),
# Add Flags to allow partial payments if needed
# But for burning, this should work directly
)
# Sign and submit transaction with retry logic
try:
signed_tx = autofill_and_sign(payment, self.client, seller)
result = submit_and_wait(signed_tx, self.client)
# Check if transaction was successful
# For IOU burns, we expect tesSUCCESS
result_code = result.result.get('meta', {}).get('TransactionResult', result.result.get('engine_result', 'UNKNOWN'))
if result.is_successful() or result_code == 'tesSUCCESS':
tx_hash = result.result.get('hash')
print(f"[XRPL] ✅ Tokens burned successfully. TX: {tx_hash}")
print(f" Result code: {result_code}")
return {
'success': True,
'tx_hash': tx_hash,
'token_amount': token_amount,
'seller_address': seller.classic_address,
'issuer_address': issuer,
'action': 'burned',
'result': result.result
}
else:
error_code = result.result.get('engine_result', 'UNKNOWN')
error_msg = result.result.get('engine_result_message', 'Transaction failed')
print(f"[XRPL] ❌ Token burn failed: {error_code} - {error_msg}")
# Provide more helpful error messages
if error_code == 'tecPATH_PARTIAL':
return {
'success': False,
'error': 'Insufficient token balance or trustline issue. Please verify your token balance.'
}
elif error_code == 'tecUNFUNDED_PAYMENT':
return {
'success': False,
'error': 'Insufficient XRP for transaction fee. Please add XRP to your wallet.'
}
else:
return {
'success': False,
'error': f'Transaction failed: {error_msg}'
}
except Exception as tx_error:
print(f"[XRPL] Transaction submission error: {tx_error}")
return {
'success': False,
'error': f'Failed to submit transaction: {str(tx_error)}'
}
except Exception as e:
print(f"[XRPL] Error in transfer_tokens_to_issuer: {e}")
return {'success': False, 'error': str(e)}
@monitor_transaction("token_purchase")
@monitor_performance("purchase_tokens_with_xrp")
def purchase_tokens_with_xrp(self, buyer_seed: str, currency: str, issuer: str, token_amount: int, xrp_cost: float, issuer_seed: str = None) -> Dict[str, Any]:
"""Execute IOU purchase (Payment + TrustSet + IOU issuance).
Args:
buyer_seed: Buyer's wallet seed
currency: Property token currency code
issuer: Issuer wallet address
token_amount: Number of tokens to purchase
xrp_cost: Cost in XRP
issuer_seed: Property's dedicated issuer seed (NEW - per-property wallet)
Uses IOU tokens for fractional real estate ownership.
"""
try:
if settings.XRPL_TOKEN_MODEL.upper() != 'IOU':
return {
'success': False,
'error': 'IOU purchase path disabled (XRPL_TOKEN_MODEL must be IOU)'
}
# Use property-specific issuer seed if provided, otherwise fall back to master
effective_issuer_seed = issuer_seed or settings.ISSUER_SEED
if effective_issuer_seed:
issuer_wallet = Wallet.from_seed(effective_issuer_seed)
print(f"[XRPL] Using {'property-specific' if issuer_seed else 'master'} issuer wallet: {issuer_wallet.classic_address}")
else:
return {
'success': False,
'error': 'No issuer seed available; cannot issue IOU tokens'
}
buyer = Wallet.from_seed(buyer_seed)
# Note: In this system, the buyer pays with AED (fiat) in their platform wallet.
# The XRP transaction is just for blockchain record-keeping and IOU token issuance.
# The buyer needs minimal XRP only for transaction fees, not the full purchase amount.
# The property wallet (issuer) needs sufficient XRP reserves to issue IOU tokens.
print(f"=== EXECUTING TOKEN PURCHASE ===")
print(f"Buyer: {buyer.classic_address}")
print(f"Issuer: {issuer_wallet.classic_address}")
print(f"Token amount: {token_amount}")
print(f"XRP cost: {xrp_cost}")
# Step 1: OPTIONAL XRP payment to issuer (for blockchain record-keeping)
# Note: Users pay in AED via platform wallet. This XRP payment is optional
# and only executed if buyer has sufficient XRP balance.
payment_tx_hash = None
buyer_balance = self.get_xrp_balance(buyer.classic_address)
if buyer_balance >= xrp_cost + 0.01: # Has enough XRP for payment + fees
print(f"[OPTIONAL] Buyer has sufficient XRP ({buyer_balance} XRP), sending payment...")
def send_xrp_payment():
from xrpl.transaction import sign_and_submit
xrp_payment = Payment(
account=buyer.classic_address,
amount=str(int(xrp_to_drops(xrp_cost))), # Convert to string drops
destination=issuer_wallet.classic_address
)
# Use sign_and_submit method
payment_result = sign_and_submit(xrp_payment, self.client, buyer)
if payment_result.result.get('engine_result') != 'tesSUCCESS':
raise Exception(f'XRP payment failed: {payment_result.result.get("engine_result_message", "Unknown error")}')
return payment_result
try:
payment_result = retry_handler.execute_with_retry(
send_xrp_payment,
"XRP Payment to Issuer"
)
payment_tx_hash = payment_result.result.get('tx_json', {}).get('hash')
print(f"✅ XRP payment successful: {payment_tx_hash}")
except BlockchainError as e:
print(f"[WARNING] XRP payment failed (non-critical): {e.user_msg}")
# Continue anyway - payment is optional
else:
print(f"[SKIP] Buyer has insufficient XRP ({buyer_balance} XRP < {xrp_cost + 0.01} XRP required)")
print(f"[INFO] Skipping XRP payment - user paid in AED via platform wallet")
print(f"[INFO] Will issue IOU tokens directly")
# Step 2: Setup trustline (only if not already present)
if not self._has_trustline(buyer.classic_address, currency, issuer):
trustline_result = self.setup_user_trustline(buyer_seed, currency, issuer)
if not trustline_result['success']:
print(f"Trustline setup issue (continuing): {trustline_result}")
else:
print("Skipping trustline creation – already exists.")
# Step 3: Issuer sends tokens to buyer - WITH RETRY
def send_token_transfer():
result = self.issue_tokens(
issuer_seed=effective_issuer_seed,
destination=buyer.classic_address,
currency=currency,
amount=str(token_amount)
)
if 'error' in result:
raise Exception(f'Token issuance failed: {result["error"]}')
return result
try:
token_transfer = retry_handler.execute_with_retry(
send_token_transfer,
"Token Transfer to Buyer"
)
except BlockchainError as e:
return {
'success': False,
'error': e.user_msg
}
token_tx_hash = token_transfer.get('tx_json', {}).get('hash') or token_transfer.get('hash')
print(f"✅ Token transfer successful: {token_tx_hash}")
return {
'success': True,
'payment_tx_hash': payment_tx_hash,
'token_tx_hash': token_tx_hash,
'token_amount': token_amount,
'xrp_cost': xrp_cost,
'buyer_address': buyer.classic_address,
'issuer_address': issuer_wallet.classic_address
}
except BlockchainError as e:
# Return user-friendly error message
return {'success': False, 'error': e.user_msg}
except Exception as e:
# Sanitize outward error while logging locally
print(f"Error in token purchase: {e}")
if settings.XRPL_NETWORK == 'testnet':
import traceback
traceback.print_exc()
return {'success': False, 'error': 'Transaction failed. Please try again or contact support.'}
@monitor_performance("get_user_token_balance")
def get_user_token_balance(self, address: str, currency: str, issuer: str) -> float:
"""Get specific token balance for user"""
try:
token_balances = self.get_token_balances(address)
for token in token_balances:
if token['currency'] == currency and token['issuer'] == issuer:
return float(token['balance'])
return 0.0
except Exception as e:
print(f"Error getting user token balance: {e}")
return 0.0
def verify_transaction_status(self, tx_hash: str) -> Dict[str, Any]:
"""Verify if a transaction was successful"""
try:
tx_data = self.get_tx(tx_hash)
if 'TransactionResult' in tx_data:
success = tx_data['TransactionResult'] == 'tesSUCCESS'
return {
'success': success,
'status': tx_data['TransactionResult'],
'tx_data': tx_data
}
return {'success': False, 'error': 'Transaction not found'}
except Exception as e:
return {'success': False, 'error': str(e)}
# Duplicate methods removed - using the ones defined below
@monitor_transaction("trustline_creation")
@monitor_performance("create_trustline")
def create_trustline(self, holder_seed: str, currency: str, issuer: str, limit_amount: str = "1000000000") -> Dict[str, Any]:
try:
from xrpl.transaction import sign_and_submit
holder = Wallet.from_seed(holder_seed)
trust_set_tx = TrustSet(
account=holder.classic_address,
limit_amount=IssuedCurrencyAmount(currency=currency, issuer=issuer, value=limit_amount),
)
result = sign_and_submit(trust_set_tx, self.client, holder)
return result.result
except Exception as e:
print(f"Error in create_trustline: {e}")
return {'error': str(e)}
def _has_trustline(self, address: str, currency: str, issuer: str) -> bool:
"""Check if an account already has a trustline for the given currency/issuer."""
try:
from xrpl.models.requests import AccountLines
resp = self.client.request(AccountLines(account=address))
if not resp.is_successful():
return False
for line in resp.result.get('lines', []):
if line.get('currency') == currency and line.get('account') == issuer:
return True
return False
except Exception:
return False
@monitor_transaction("token_issuance")
@monitor_performance("issue_tokens")
def issue_tokens(self, issuer_seed: str, destination: str, currency: str, amount: str) -> Dict[str, Any]:
try:
from xrpl.transaction import sign_and_submit
issuer = Wallet.from_seed(issuer_seed)
# Create IOU amount for the token
token_amount = IssuedCurrencyAmount(currency=currency, issuer=issuer.classic_address, value=amount)
payment = Payment(
account=issuer.classic_address,
amount=token_amount,
destination=destination,
send_max=token_amount, # Required for currency conversion/token issuance
)
result = sign_and_submit(payment, self.client, issuer)
return result.result
except Exception as e:
print(f"Error in issue_tokens: {e}")
return {'error': str(e)}
# Note: Explicit on-ledger OfferCreate helper was removed as unused in app flows
@monitor_performance("get_token_balances")
def get_token_balances(self, address: str) -> List[Dict[str, Any]]:
"""Get IOU token balances for an address"""
try:
account_lines_request = AccountLines(account=address)
response = self.client.request(account_lines_request)
if response.is_successful() and 'lines' in response.result:
tokens = []
for line in response.result['lines']:
tokens.append({
'currency': line.get('currency', ''),
'issuer': line.get('account', ''),
'balance': float(line.get('balance', '0')),
'limit': line.get('limit', '0')
})
print(f"Debug: Found {len(tokens)} token balances for {address}")
return tokens
else:
print(f"Debug: No token lines found for {address}")
return []
except Exception as e:
print(f"Error getting token balances for {address}: {e}")
return []
def get_tx(self, tx_hash: str) -> Dict[str, Any]:
req = Tx(transaction=tx_hash)
resp = self.client.request(req)
return resp.result
@monitor_transaction("kyc_blockchain_write")
@monitor_performance("write_kyc_to_blockchain")
async def write_kyc_to_blockchain(self, kyc_data: Dict[str, Any], issuer_seed: str = None) -> Dict[str, Any]:
"""
Write KYC approval data to XRP Ledger as permanent proof (ASYNC)
Stores user data + IPFS document hash in transaction memo field
Args:
kyc_data: {
'user_id': str,
'name': str,
'email': str,
'phone': str,
'address': str,
'ipfs_hash': str, # IPFS CID of KYC document
'approved_by': str,
'approved_at': str
}
issuer_seed: Admin/issuer wallet seed (defaults to settings.ISSUER_SEED)
Returns:
{
'success': bool,
'tx_hash': str, # XRP Ledger transaction hash
'explorer_url': str, # Link to view on explorer
'memo_data': dict # Data written to blockchain
}
"""
try:
if not issuer_seed:
issuer_seed = settings.ISSUER_SEED
if not issuer_seed:
raise ValueError("No issuer seed provided for KYC blockchain write")
issuer = Wallet.from_seed(issuer_seed)
print(f"[XRPL-KYC] Writing KYC data to blockchain...")
print(f"[XRPL-KYC] User: {kyc_data.get('name')} ({kyc_data.get('email')})")
print(f"[XRPL-KYC] IPFS Hash: {kyc_data.get('ipfs_hash')}")
# Prepare memo data (max 1KB for XRP Ledger)
memo_data = {
"type": "KYC_APPROVAL",
"user_id": kyc_data.get('user_id'),
"name": kyc_data.get('name'),
"email": kyc_data.get('email'),
"phone": kyc_data.get('phone'),
"address": kyc_data.get('address'),
"ipfs_doc": kyc_data.get('ipfs_hash'),
"approved_at": kyc_data.get('approved_at'),
"approved_by": kyc_data.get('approved_by')
}
# Convert to JSON string and check size
import json
memo_json = json.dumps(memo_data, separators=(',', ':')) # Compact JSON
memo_size = len(memo_json.encode('utf-8'))
if memo_size > 1024:
print(f"[XRPL-KYC] [WARNING] Memo too large ({memo_size} bytes), truncating...")
# Remove address if too large
memo_data.pop('address', None)
memo_json = json.dumps(memo_data, separators=(',', ':'))
print(f"[XRPL-KYC] Memo size: {len(memo_json.encode('utf-8'))} bytes")
# Create AccountSet transaction with memo (no destination needed, just stores data on ledger)
from xrpl.models.transactions import Memo, AccountSet
from xrpl.transaction import sign_and_submit
# Convert memo to hex (required by XRPL)
memo_hex = memo_json.encode('utf-8').hex().upper()
# Use AccountSet transaction to write data to blockchain without sending XRP
account_set_tx = AccountSet(
account=issuer.classic_address,
memos=[
Memo(
memo_data=memo_hex,
memo_type="4B59435F415050524F56414C".upper(), # "KYC_APPROVAL" in hex
memo_format="6A736F6E".upper() # "json" in hex
)
]
)
# Run blockchain submission in thread pool to avoid blocking event loop
# This allows async function to work with sync blockchain library
import asyncio
from concurrent.futures import ThreadPoolExecutor
def _submit_transaction_sync():
"""Synchronous transaction submission - runs in thread pool"""
from xrpl.transaction import submit_and_wait
try:
# Use submit_and_wait which handles signing automatically
response = submit_and_wait(
transaction=account_set_tx,
client=self.client,
wallet=issuer
)
return response
except Exception as submit_error:
print(f"[XRPL-KYC] ERROR: Transaction submission failed: {submit_error}")
raise Exception(f'Transaction signing failed: {str(submit_error)}')
# Execute sync function in thread pool and await result
loop = asyncio.get_event_loop()
try:
result = await loop.run_in_executor(None, _submit_transaction_sync)
except Exception as exec_error:
return {
'success': False,
'error': f'Transaction execution failed: {str(exec_error)}'
}
# submit_and_wait returns the full response with metadata
# Check if transaction was successful
if result.result.get('meta', {}).get('TransactionResult') == 'tesSUCCESS':
tx_hash = result.result.get('hash')
explorer_url = f"{settings.XRPL_EXPLORER_BASE}{tx_hash}"
print(f"[XRPL-KYC] SUCCESS: KYC data written to blockchain!")
print(f"[XRPL-KYC] TX Hash: {tx_hash}")
print(f"[XRPL-KYC] Explorer: {explorer_url}")
return {
'success': True,
'tx_hash': tx_hash,
'explorer_url': explorer_url,
'memo_data': memo_data,
'ipfs_hash': kyc_data.get('ipfs_hash')
}
else:
error_code = result.result.get('meta', {}).get('TransactionResult', 'Unknown')
error_msg = result.result.get('engine_result_message', error_code)
print(f"[XRPL-KYC] ERROR: Transaction failed: {error_msg}")
return {
'success': False,
'error': f'Blockchain write failed: {error_msg}'
}
except Exception as e:
print(f"[XRPL-KYC] ERROR: Error writing KYC to blockchain: {e}")
return {
'success': False,
'error': str(e)
}
xrpl_service = XRPLService()