Text Generation
English
opsiie
artificial-intelligence
self-centered-intelligence
sci
ai-assistant
multi-modal
image-generation
video-generation
music-generation
conversational-ai
blockchain
web3
facial-recognition
voice-synthesis
bioinformatics
financial-intelligence
text-classification
sentiment-analysis
token-classification
ner
question-answering
fill-mask
summarization
translation
text2text-generation
zero-shot-classification
image-classification
image-segmentation
object-detection
image-to-text
text-to-image
image-to-image
audio-classification
automatic-speech-recognition
text-to-speech
video-classification
depth-estimation
document-question-answering
visual-question-answering
zero-shot-image-classification
zero-shot-audio-classification
zero-shot-object-detection
feature-extraction
image-feature-extraction
mask-generation
table-question-answering
text-to-audio
| # web3.py | |
| import os | |
| import re | |
| import json | |
| from decimal import Decimal | |
| from dotenv import load_dotenv | |
| from web3 import Web3 | |
| from colorama import Fore, Style | |
| from kun import known_user_names | |
| import time | |
| import requests | |
# Load environment variables from .env
load_dotenv()

# Agent signing key and per-chain RPC endpoints, all read from the environment.
AGENT_PRIVATE_KEY = os.getenv("AGENT_PRIVATE_KEY")
BASE_RPC_URL = os.getenv("BASE_RPC_URL")
ETHEREUM_RPC_URL = os.getenv("ETHEREUM_RPC_URL")
POLYGON_RPC_URL = os.getenv("POLYGON_RPC_URL")

# Abort at import time when any required setting is absent or empty.
if not all((AGENT_PRIVATE_KEY, BASE_RPC_URL, ETHEREUM_RPC_URL, POLYGON_RPC_URL)):
    # Reset the colour so the shell is not left printing in red after we exit.
    print(Fore.RED + "Error: Missing environment variables. Check your .env file." + Style.RESET_ALL)
    # A configuration failure must surface as a non-zero exit status; the bare
    # exit() helper reports success (status 0) and is only injected by `site`.
    raise SystemExit(1)
# Static description of every supported chain, keyed by display name.
# Each row is (name, chain id, RPC endpoint, native symbol, block explorer);
# every listed chain uses 18 decimals for its native token.
CHAIN_INFO = {
    name: {
        'chain_id': chain_id,
        'rpc_url': rpc_url,
        'symbol': symbol,
        'decimals': 18,
        'explorer_url': explorer_url,
    }
    for name, chain_id, rpc_url, symbol, explorer_url in (
        ('Base', 8453, BASE_RPC_URL, 'ETH', 'https://basescan.org'),
        ('Ethereum', 1, ETHEREUM_RPC_URL, 'ETH', 'https://etherscan.io'),
        ('Polygon', 137, POLYGON_RPC_URL, 'MATIC', 'https://polygonscan.com'),
    )
}
# Trusted ERC-20 contracts: human-readable token name -> {chain name: address}.
# NOTE(review): the Degen entries for Ethereum and Polygon look like
# placeholder patterns rather than deployed contracts - confirm before use.
# NOTE(review): the USDC entry for Base appears to match the bridged USDC
# contract on Optimism, not a Base deployment - verify against the issuer's
# published address list.
TOKENS = {
    "Degen": {
        "Base": "0x4ed4e862860bed51a9570b96d89af5e1b0efefed",
        "Ethereum": "0xABCDEF1234567890ABCDEF1234567890ABCDEF12",
        "Polygon": "0x1234567890ABCDEF1234567890ABCDEF12345678",
    },
    "USDC": {
        "Base": "0x7F5c764cBc14f9669B88837ca1490cCa17c31607",
        "Ethereum": "0xA0b86991c6218b36c1d19D4a2e9Eb0cE3606EB48",
        "Polygon": "0x2791Bca1f2de4661ED88A30C99A7a9449Aa84174",
    },
}

# Module-level defaults: no connection yet, and the 'medium' gas strategy.
web3_connection = None
gas_strategy = "medium"
def _erc20_function(name, inputs, outputs, mutability):
    """Build one ERC-20 ABI entry in the legacy constant/payable layout.

    `inputs` is a sequence of (argument name, type) pairs and `outputs` a
    sequence of type names; output values are unnamed.
    """
    return {
        "constant": mutability == "view",
        "inputs": [{"name": arg, "type": kind} for arg, kind in inputs],
        "name": name,
        "outputs": [{"name": "", "type": kind} for kind in outputs],
        "payable": False,
        "stateMutability": mutability,
        "type": "function",
    }


# Token ABI for ERC20 tokens (simplified): only the calls this module uses.
TOKEN_ABI = [
    _erc20_function("decimals", [], ["uint8"], "view"),
    _erc20_function("symbol", [], ["string"], "view"),
    _erc20_function(
        "transfer",
        [("_to", "address"), ("_value", "uint256")],
        [],
        "nonpayable",
    ),
    _erc20_function(
        "approve",
        [("_spender", "address"), ("_value", "uint256")],
        ["bool"],
        "nonpayable",
    ),
]
def _router_function(name, inputs, outputs, mutability):
    """Build one router ABI entry from (name, type) pairs.

    Every parameter in this ABI has an internalType equal to its type, so a
    single type string per parameter is enough.
    """
    def typed(arg_name, kind):
        return {"internalType": kind, "name": arg_name, "type": kind}

    return {
        "inputs": [typed(arg_name, kind) for arg_name, kind in inputs],
        "name": name,
        "outputs": [typed(arg_name, kind) for arg_name, kind in outputs],
        "stateMutability": mutability,
        "type": "function",
    }


# Parameter groups shared by several router functions.
_PATH = ("path", "address[]")
_SWAP_TAIL = [_PATH, ("to", "address"), ("deadline", "uint256")]
_AMOUNTS_OUT = [("amounts", "uint256[]")]
_ADDRESS_OUT = [("", "address")]

# Subset of a router ABI with getAmountsIn/Out quotes and swapExact* calls.
ROUTER_ABI = [
    # Quotes
    _router_function("getAmountsIn", [("amountOut", "uint256"), _PATH], _AMOUNTS_OUT, "view"),
    _router_function("getAmountsOut", [("amountIn", "uint256"), _PATH], _AMOUNTS_OUT, "view"),
    # Swapping
    _router_function(
        "swapExactTokensForTokens",
        [("amountIn", "uint256"), ("amountOutMin", "uint256")] + _SWAP_TAIL,
        _AMOUNTS_OUT,
        "nonpayable",
    ),
    _router_function(
        "swapExactETHForTokens",
        [("amountOutMin", "uint256")] + _SWAP_TAIL,
        _AMOUNTS_OUT,
        "payable",
    ),
    _router_function(
        "swapExactTokensForETH",
        [("amountIn", "uint256"), ("amountOutMin", "uint256")] + _SWAP_TAIL,
        _AMOUNTS_OUT,
        "nonpayable",
    ),
    # Factory
    _router_function("factory", [], _ADDRESS_OUT, "view"),
    # WETH
    _router_function("WETH", [], _ADDRESS_OUT, "view"),
]
| class Web3Handler: | |
def __init__(self, known_user_names):
    """Initialize Web3Handler with user data and connect to Base.

    Args:
        known_user_names: mapping of user records; other methods read the
            'full_name', 'call_name' and 'public0x' fields of each record.
    """
    # Local import: deepcopy is only needed here and the file-level import
    # block is left untouched.
    import copy

    self.known_user_names = known_user_names
    self.web3_connection = None
    self.current_chain = None
    # Deep copies: add_chain/add_token mutate the nested per-chain dicts, and
    # a shallow .copy() would leak those edits into the module-level tables.
    self.CHAIN_INFO = copy.deepcopy(CHAIN_INFO)
    self.TOKENS = copy.deepcopy(TOKENS)
    # Directory containing this module.
    self.config_dir = os.path.dirname(os.path.abspath(__file__))
    self.gas_strategy = 'medium'
    # NOTE(review): the public address is hard-coded rather than derived from
    # AGENT_PRIVATE_KEY - confirm that the two actually belong together.
    self.AGENT_WALLET = {
        'private_key': AGENT_PRIVATE_KEY,
        'public_key': Web3.to_checksum_address('0x89A7f83Db9C1919B89370182002ffE5dfFc03e21')
    }
    # Initialize connection to default chain
    self.connect_to_chain('Base')
def save_chain_config(self):
    """Save chain configuration to chain_config.json in the working directory.

    Raises:
        TypeError / ValueError: if self.CHAIN_INFO is not JSON-serializable.
        OSError: if the file cannot be written.
    """
    # Serialize first: if the data cannot be encoded, the existing file is
    # left untouched instead of being truncated to a partial document.
    serialized = json.dumps(self.CHAIN_INFO, indent=4)
    with open('chain_config.json', 'w') as f:
        f.write(serialized)
def save_token_config(self):
    """Save token configuration to token_config.json in the working directory.

    Raises:
        TypeError / ValueError: if self.TOKENS is not JSON-serializable.
        OSError: if the file cannot be written.
    """
    # Serialize first: if the data cannot be encoded, the existing file is
    # left untouched instead of being truncated to a partial document.
    serialized = json.dumps(self.TOKENS, indent=4)
    with open('token_config.json', 'w') as f:
        f.write(serialized)
def add_chain(self):
    """Interactively add a new chain configuration.

    Prompts for the chain name, id, RPC URL, native symbol and explorer URL,
    checks that the RPC endpoint answers, registers the chain on this
    instance, saves chain_config.json and calls
    self._update_python_file_chain(chain_name).

    Returns early (after printing a message) when the chain already exists.

    Raises:
        ValueError: on any failure; the message is prefixed once with
            "Failed to add chain: " and the original exception is chained.
    """
    # One try block: the original nested two identical handlers, so every
    # inner failure was reported as "Failed to add chain: Failed to add chain: ...".
    try:
        print(Fore.YELLOW + "\nAdding new chain configuration:")
        # Get chain details
        chain_name = input("Chain name: ").strip()
        if not chain_name:
            raise ValueError("Chain name cannot be empty")
        if chain_name in self.CHAIN_INFO:
            print(Fore.RED + "Chain already exists!")
            return
        chain_id = int(input("Chain ID: ").strip())
        rpc_url = input("RPC URL: ").strip()
        symbol = input("Native token symbol: ").strip()
        explorer_url = input("Block explorer URL: ").strip()
        # Test connection before accepting the configuration
        web3 = Web3(Web3.HTTPProvider(rpc_url))
        if not web3.is_connected():
            raise ValueError("Unable to connect to RPC URL")
        # Update runtime dictionary
        self.CHAIN_INFO[chain_name] = {
            'chain_id': chain_id,
            'rpc_url': rpc_url,
            'symbol': symbol,
            'decimals': 18,
            'explorer_url': explorer_url
        }
        # Save to temporary JSON for persistence until code is updated
        self.save_chain_config()
        # Update the Python file
        self._update_python_file_chain(chain_name)
        print(Fore.GREEN + f"\nChain {chain_name} added successfully!")
    except Exception as e:
        raise ValueError(f"Failed to add chain: {str(e)}") from e
def add_token(self):
    """Interactively register an ERC-20 token on one of the known chains.

    Prompts for token name, chain name and contract address, reads the
    token's symbol() from the contract as a sanity check, records the address
    under self.TOKENS[token_name][chain_name], and saves token_config.json.

    Errors are printed rather than raised.
    """
    print(Fore.YELLOW + "\nAdding new token configuration:")
    # Get token details
    token_name = input("Token name: ").strip()
    chain_name = input("Chain name: ").strip()
    if not token_name:
        print(Fore.RED + "Error adding token: Token name cannot be empty")
        return
    if chain_name not in self.CHAIN_INFO:
        print(Fore.RED + f"Chain {chain_name} not supported!")
        return
    try:
        contract_address = Web3.to_checksum_address(input("Contract address: ").strip())
        # Verify contract: a failing symbol() call aborts the registration
        web3 = Web3(Web3.HTTPProvider(self.CHAIN_INFO[chain_name]['rpc_url']))
        contract = web3.eth.contract(address=contract_address, abi=TOKEN_ABI)
        symbol = contract.functions.symbol().call()
        # Add token
        token_entry = self.TOKENS.setdefault(token_name, {})
        token_entry[chain_name] = contract_address
        # The symbol is stored next to the per-chain addresses, as before.
        token_entry['symbol'] = symbol
        # Persist like add_chain does; previously added tokens were only kept
        # in memory although save_token_config exists for this purpose.
        self.save_token_config()
        print(Fore.GREEN + f"\nToken {token_name} ({symbol}) added successfully on {chain_name}!")
    except Exception as e:
        print(Fore.RED + f"Error adding token: {str(e)}")
def connect_to_chain(self, chain_name):
    """Open an HTTP connection to `chain_name` and make it the current chain.

    Returns True when the provider reports a live connection; otherwise
    prints the reason and returns False.
    """
    if chain_name not in self.CHAIN_INFO:
        print(Fore.RED + f"Error: Chain '{chain_name}' not supported.")
        return False

    settings = self.CHAIN_INFO[chain_name]
    try:
        self.web3_connection = Web3(Web3.HTTPProvider(settings['rpc_url']))
        is_live = self.web3_connection.is_connected()
    except Exception as e:
        print(Fore.RED + f"Error connecting to {chain_name}: {str(e)}")
        return False

    if not is_live:
        print(Fore.RED + f"Failed to connect to {chain_name}")
        return False

    # Only a verified connection updates the current chain.
    self.current_chain = chain_name
    return True
def get_gas_price(self):
    """Return the gas price in wei for the current strategy.

    'low' is 80% and 'high' is 150% of the node's reported gas price; any
    other strategy (including 'medium') uses the reported price unchanged.

    Returns:
        int: always an integer; the transactions built from it need an
        integer gasPrice and a float multiplier would produce a float.
    """
    base_price = self.web3_connection.eth.gas_price
    if self.gas_strategy == 'low':
        # Integer arithmetic (x * 8 // 10) instead of x * 0.8.
        return base_price * 8 // 10
    if self.gas_strategy == 'high':
        return base_price * 3 // 2
    return base_price  # medium is default
def set_gas_strategy(self, level):
    """Select the gas strategy ('low', 'medium' or 'high') for later transactions.

    An unrecognised level leaves the current strategy unchanged and prints an
    error.
    """
    if level not in ('low', 'medium', 'high'):
        print(Fore.RED + "Error: Invalid gas level. Choose 'low', 'medium', or 'high'.")
        return
    self.gas_strategy = level
    print(Fore.GREEN + f"Gas strategy set to {self.gas_strategy}.")
def get_recipient_address(self, recipient_name):
    """Fetch a recipient's public address by full_name or call_name.

    Matching is case-insensitive. User records that lack a name field or a
    'public0x' address are skipped instead of raising KeyError.

    Returns:
        The 'public0x' value of the first matching record that has one, or
        None when the name is empty or nothing matches.
    """
    if not recipient_name:
        return None
    wanted = recipient_name.strip().lower()
    for user in self.known_user_names.values():
        candidates = (user.get('full_name'), user.get('call_name'))
        if any(name and name.lower() == wanted for name in candidates):
            address = user.get('public0x')
            if address:
                return address
    return None
def confirm_transaction(self):
    """Prompt for confirmation; return True only when the user types 'confirm'.

    'cancel' and any other input both return False, each with its own
    message.
    """
    print(Fore.LIGHTRED_EX + "Please confirm the transaction by typing or saying 'confirm' or cancel by typing 'cancel'.")
    answer = input().strip().lower()
    if answer == 'confirm':
        return True
    if answer == 'cancel':
        print(Fore.RED + "Transaction cancelled.")
    else:
        print(Fore.RED + "Invalid input. Transaction cancelled.")
    return False
def get_transaction_url(self, chain_name, tx_hash):
    """Build the block-explorer link for `tx_hash` on `chain_name`.

    When the chain is unknown or has no 'explorer_url', the string
    "Unknown chain: <chain_name>" is returned instead of a URL.
    """
    try:
        explorer = self.CHAIN_INFO[chain_name]['explorer_url']
    except KeyError:
        return f"Unknown chain: {chain_name}"
    # Drop any trailing slash so the path is not doubled.
    base_url = explorer.rstrip('/')
    return f"{base_url}/tx/{tx_hash}"
def send_tokens(self, chain, token_name, amount, recipient_name):
    """Send the chain's native token or a trusted ERC-20 token to a known user.

    Args:
        chain: key of self.CHAIN_INFO to send on.
        token_name: trusted token name, the chain's native symbol, or '' for
            the native token.
        amount: amount in whole-token units.
        recipient_name: full_name or call_name of a known user.

    Failures detected before signing (connection, unknown recipient, invalid
    address, unsupported token, declined confirmation) are printed and the
    method returns None. Exceptions raised by the node or the signer are not
    caught here.
    """
    # First, ensure connection to the right chain
    if not self.connect_to_chain(chain):
        print(Fore.RED + f"Error: Failed to connect to {chain}.")
        return
    recipient_address = self.get_recipient_address(recipient_name)
    if not recipient_address:
        print(Fore.RED + f"Error: Recipient '{recipient_name}' not found or has no known public address.")
        return
    # Normalise the stored address to checksum form before it is placed in a
    # transaction or passed to a contract call.
    try:
        recipient_address = Web3.to_checksum_address(recipient_address)
    except (ValueError, TypeError):
        print(Fore.RED + f"Error: Recipient '{recipient_name}' has an invalid public address.")
        return

    chain_data = self.CHAIN_INFO[chain]
    native_symbol = chain_data['symbol']
    # Empty name or the native symbol in any letter case means the native token.
    if token_name == '' or token_name.lower() == native_symbol.lower():
        token_name = native_symbol
    is_native = token_name == native_symbol

    token_address = None
    if not is_native:
        # Check support before asking the user to confirm anything.
        token_address = self.TOKENS.get(token_name, {}).get(chain)
        if not token_address:
            print(Fore.RED + f"Error: Token '{token_name}' is not supported on {chain}.")
            return

    print(Fore.LIGHTYELLOW_EX + f"Preparing to send {amount} {token_name} to {recipient_name} ({recipient_address}) on {chain} network.")
    if not self.confirm_transaction():
        return

    w3 = self.web3_connection
    sender = self.AGENT_WALLET['public_key']
    # Fields shared by both transaction kinds. The nonce was previously
    # missing from native transfers, and gasPrice must be an integer.
    tx_fields = {
        'from': sender,
        'gasPrice': int(self.get_gas_price()),
        'nonce': w3.eth.get_transaction_count(sender),
        'chainId': chain_data['chain_id'],
    }

    if is_native:
        transaction = dict(
            tx_fields,
            to=recipient_address,
            value=w3.to_wei(amount, 'ether'),
            gas=21000,
        )
    else:  # ERC-20 token
        token_contract = w3.eth.contract(
            address=Web3.to_checksum_address(token_address),
            abi=TOKEN_ABI
        )
        # Scale by the token's own decimals; assuming 18 ('ether') inflates
        # the transfer for tokens that use fewer decimals.
        token_decimals = token_contract.functions.decimals().call()
        raw_amount = int(Decimal(str(amount)) * (Decimal(10) ** token_decimals))
        transaction = token_contract.functions.transfer(
            recipient_address,
            raw_amount
        ).build_transaction(dict(tx_fields, gas=100000))

    signed_txn = w3.eth.account.sign_transaction(
        transaction,
        private_key=self.AGENT_WALLET['private_key']
    )
    tx_hash = w3.eth.send_raw_transaction(signed_txn.raw_transaction)
    # Get transaction URL and print success message
    tx_url = self.get_transaction_url(chain, w3.to_hex(tx_hash))
    print(Fore.LIGHTGREEN_EX + f"Transaction sent! View it on explorer: {tx_url}")
def handle_send_command(self, command, agent_voice_active=False, voice_mode_active=False, speak_response=None):
    """Handle the /send command logic.

    Expected form: /send <chain> <token|''> <amount> to <recipient>
    (also accepted with the "/0x send" prefix). Chain and token names are
    matched case-insensitively; '' or the chain's native symbol selects the
    native token.

    Parsing problems are printed and, when a voice mode is active and a
    `speak_response` callable was supplied, also spoken.
    """
    try:
        match = re.match(r'(?:/send|/0x\s+send)\s+(\w+)\s+([\w|\'\']+)\s+(\d+(\.\d+)?)\s+to\s+(\w+)', command)
        if not match:
            raise ValueError("Invalid command format. Use: /send <chain> <token|''> <amount> to <recipient>.")
        requested_chain = match.group(1)
        # The documented '' placeholder arrives as two quote characters;
        # strip them so send_tokens sees an empty name (native token).
        requested_token = match.group(2).strip("'")
        amount = float(match.group(3))
        recipient_name = match.group(5)

        chain = next(
            (c for c in self.CHAIN_INFO if c.lower() == requested_chain.lower()),
            None,
        )
        if chain is None:
            raise ValueError(f"Chain '{requested_chain.capitalize()}' is not supported.")

        token_name = requested_token
        if requested_token:
            native_symbol = self.CHAIN_INFO[chain].get('symbol', '')
            if native_symbol and requested_token.lower() == native_symbol.lower():
                # Native symbols are not in TOKENS; accept them directly.
                token_name = native_symbol
            else:
                token_name = next(
                    (t for t in self.TOKENS if t.lower() == requested_token.lower()),
                    None,
                )
                if token_name is None:
                    raise ValueError(f"Token '{requested_token}' is not supported.")

        self.send_tokens(chain, token_name, amount, recipient_name)
    except ValueError as ve:
        error_message = f"Error: {ve}"
        print(Fore.RED + error_message)
        # Parenthesised: `a or b and c` would call speak_response even when
        # it is None as soon as agent_voice_active is set.
        if (agent_voice_active or voice_mode_active) and speak_response:
            speak_response(error_message)
def handle_gas_command(self, command):
    """Handle the /send gas command.

    Expected form: /send gas <low|medium|high>. The level is lower-cased
    before it is passed to set_gas_strategy; a command with fewer than three
    words prints a usage error.
    """
    try:
        args = command.split()
        # Three words are required because the level is read from args[2];
        # the old `< 2` check let "/send gas" through to an IndexError.
        if len(args) < 3:
            raise ValueError("Invalid format. Use: /send gas <low|medium|high>.")
        self.set_gas_strategy(args[2].lower())
    except ValueError as ve:
        print(Fore.RED + f"Error: {ve}")
def handle_receive_command(self, current_user=None):
    """Print wallet addresses plus the chains and tokens this instance supports.

    The current user's wallet is shown only when `current_user` is a key of
    self.known_user_names; the agent wallet is always shown.
    """
    rule = "=" * 50
    heading = Fore.LIGHTYELLOW_EX

    print(Fore.LIGHTMAGENTA_EX + "\nWallet Information & Supported Assets")
    print(rule)

    # Wallet addresses
    if current_user and current_user in self.known_user_names:
        user_wallet = self.known_user_names[current_user]['public0x']
        print(heading + f"\n{current_user}'s Wallet:")
        print(f"Address: {user_wallet}")
    print(heading + "\nOPSIE's Wallet:")
    print(f"Address: {self.AGENT_WALLET['public_key']}")

    # Chains with their native token and explorer
    print(heading + "\nSupported Chains:")
    for chain_name, chain_data in self.CHAIN_INFO.items():
        print(f"\n{chain_name}:")
        print(f" Native Token: {chain_data['symbol']}")
        print(f" Explorer: {chain_data['explorer_url']}")

    # Tokens grouped by the chain they are registered on
    print(heading + "\nSupported Tokens:")
    for chain_name in self.CHAIN_INFO:
        print(f"\n{chain_name} Tokens:")
        for token, addresses in self.TOKENS.items():
            if chain_name in addresses:
                print(f" {token}: {addresses.get(chain_name)}")

    print("\n" + rule)
def parse_transaction_intent(self, command):
    """Parse a natural-language buy/sell request into transaction parameters.

    Returns None for an empty command, otherwise a dict with:
        action: 'buy', 'sell' or None.
        amount: float, 'all', a percentage string such as '50%', or None.
        token: traded token (trusted token name or native symbol) or None.
        chain: chain name taken from "on <chain>" or None.
        using_token: token paid with (buy) or received (sell), or None.
        amount_is_target: True when `amount` refers to `using_token`.

    The traded amount/token are read from the text before the first
    "for"/"using" word, so "sell degen for 0.1 eth" yields token Degen and a
    target amount of 0.1 ETH. Native symbols take precedence over trusted
    tokens of the same name. All matching is case-insensitive.
    """
    if not command:
        return None
    # Normalize input
    command = command.lower().strip()
    params = {
        'action': None,  # buy, sell
        'amount': None,  # numeric amount, 'all', '50%', etc.
        'token': None,  # token to buy/sell
        'chain': None,  # chain to operate on
        'using_token': None,  # token to pay with (buy) or receive (sell)
        'amount_is_target': False  # True if amount refers to using_token
    }

    def resolve_token(name):
        """Map a word to a native symbol or trusted token name, else None."""
        wanted = name.lower()
        for chain_data in self.CHAIN_INFO.values():
            symbol = chain_data.get('symbol', '')
            if symbol and symbol.lower() == wanted:
                return symbol
        for trusted_token in self.TOKENS:
            if trusted_token.lower() == wanted:
                return trusted_token
        return None

    # Extract chain: first "on <word>" that names a known chain. The word
    # boundary keeps "on" from matching inside other words.
    chains_by_lower = {name.lower(): name for name in self.CHAIN_INFO}
    for chain_match in re.finditer(r'\bon\s+(\w+)', command):
        chain_name = chains_by_lower.get(chain_match.group(1))
        if chain_name:
            params['chain'] = chain_name
            break

    # Determine action
    if 'sell' in command:
        params['action'] = 'sell'
    elif 'buy' in command:
        params['action'] = 'buy'

    # Text before the "for ..." / "using ..." clause describes the traded
    # token; searching the whole command would pick up the target clause.
    head = re.split(r'\b(?:for|using)\b', command, maxsplit=1)[0]

    # "of" and "my" are independently optional, so "half of degen",
    # "50% degen" and "10 of my degen" all match.
    amount_patterns = (
        r'\b(all)(?:\s+of)?(?:\s+my)?\s+(\w+)',
        r'\b(half)(?:\s+of)?(?:\s+my)?\s+(\w+)',
        r'(\d+(?:\.\d+)?%?)(?:\s+of)?(?:\s+my)?\s+(\w+)',
    )
    for pattern in amount_patterns:
        match = re.search(pattern, head)
        if not match:
            continue
        amount_str, token_name = match.groups()
        if amount_str == 'all':
            params['amount'] = 'all'
        elif amount_str == 'half':
            params['amount'] = '50%'
        elif amount_str.endswith('%'):
            params['amount'] = amount_str
        else:
            params['amount'] = float(amount_str)
        params['token'] = resolve_token(token_name)
        if params['token']:
            break

    # No "<amount> <token>" pair: accept a bare token name, e.g. "sell degen".
    if not params['token']:
        for word in re.findall(r'\w+', head):
            resolved = resolve_token(word)
            if resolved:
                params['token'] = resolved
                break

    # Target amount, e.g. "sell degen for 0.1 eth". The amount is only taken
    # over when the word after it is a known token.
    for keyword in ('for', 'using'):
        match = re.search(rf'\b{keyword}\s+(\d+(?:\.\d+)?)\s+(\w+)', command)
        if not match or params['using_token']:
            continue
        resolved = resolve_token(match.group(2))
        if resolved:
            params['amount'] = float(match.group(1))
            params['amount_is_target'] = True
            params['using_token'] = resolved

    # If no target amount found, look for a plain target token.
    if not params['using_token']:
        for keyword in ('for', 'using'):
            match = re.search(rf'\b{keyword}\s+(\w+)', command)
            if not match:
                continue
            resolved = resolve_token(match.group(1))
            if resolved:
                params['using_token'] = resolved
                break

    return params
def validate_and_complete_transaction(self, params):
    """Fill in missing transaction parameters by prompting the user.

    Missing 'chain', 'token', 'using_token' (buy/sell only) and 'amount' are
    requested on stdin in that order. `params` is updated in place and
    returned.

    Raises:
        ValueError: when `params` is empty or an answer is not among the
            offered choices / not a valid amount.
    """
    if not params:
        raise ValueError("Could not understand transaction intent. Please try again.")

    def tokens_on_chain(excluded=None):
        # Trusted tokens registered on the selected chain, minus `excluded`.
        return [name for name, data in self.TOKENS.items()
                if params['chain'] in data and name != excluded]

    def choose_token(prompt, choices):
        # List the choices, then match the answer case-insensitively.
        for name in choices:
            print(f"- {name}")
        answer = input(prompt).strip().upper()
        for name in choices:
            if name.upper() == answer:
                return name
        raise ValueError(f"Unsupported token: {answer}")

    # Chain
    if not params['chain']:
        print(Fore.YELLOW + "\nAvailable chains:")
        for chain in self.CHAIN_INFO.keys():
            print(f"- {chain}")
        chain_input = input("Which chain would you like to use? ").strip()
        if chain_input.capitalize() not in self.CHAIN_INFO:
            raise ValueError(f"Unsupported chain: {chain_input}")
        params['chain'] = chain_input.capitalize()

    # Traded token
    if not params['token']:
        print(Fore.YELLOW + f"\nAvailable tokens on {params['chain']}:")
        params['token'] = choose_token(
            "Which token would you like to trade? ",
            [name for name, data in self.TOKENS.items() if params['chain'] in data],
        )

    # Counter token for buy/sell: every other token on the chain plus native
    if params['action'] in ['buy', 'sell'] and not params['using_token']:
        print(Fore.YELLOW + f"\nAvailable tokens to trade with:")
        counter_choices = tokens_on_chain(excluded=params['token'])
        counter_choices.append(self.CHAIN_INFO[params['chain']]['symbol'])
        verb = 'pay with' if params['action'] == 'buy' else 'receive'
        params['using_token'] = choose_token(
            f"Which token would you like to {verb}? ",
            counter_choices,
        )

    # Amount: an explicit 0 counts as provided, anything else falsy prompts
    current_amount = params['amount']
    if not current_amount and current_amount != 0:
        amount_input = input("How much would you like to trade? (or 'all' for entire balance) ").strip()
        if amount_input.lower() == 'all':
            params['amount'] = 'all'
        else:
            try:
                params['amount'] = float(amount_input)
            except ValueError:
                raise ValueError("Invalid amount specified")

    return params
def format_transaction_preview(self, params, price_data):
    """Format transaction details for user confirmation.

    Args:
        params: parsed intent; reads 'action', 'amount', 'token' and
            'using_token'.
        price_data: quote data; reads 'amount_in_formatted',
            'amount_out_formatted', 'exchange_rate', 'usd_values'
            ('input'/'output') and 'all_quotes' -> 'dexes'.

    Returns:
        A newline-joined preview. The quote and exchange-rate sections appear
        only for buy/sell actions. A zero exchange rate (buy) or a zero USD
        input is shown as "n/a" instead of raising ZeroDivisionError.
    """
    preview = []
    action = params['action']
    if action in ['buy', 'sell']:
        token = params['token']
        using_token = params['using_token']
        rate = price_data['exchange_rate']
        if action == 'buy':
            give_line = f" You give: {price_data['amount_in_formatted']} {using_token}"
            get_line = f" You get: {price_data['amount_out_formatted']} {token}"
            # The buy quote stores using-token-per-token, so it is inverted
            # for display; a zero rate cannot be inverted.
            shown_rate = f"{1 / rate:.6f}" if rate else "n/a"
            rate_line = f" 1 {using_token} = {shown_rate} {token}"
        else:
            give_line = f" You give: {params['amount']} {token}"
            get_line = f" You get: {price_data['amount_out_formatted']} {using_token}"
            rate_line = f" 1 {token} = {rate:.6f} {using_token}"
        preview.extend([
            "Price Quotes:",
            "Best Price from DEX:",
            give_line,
            get_line,
            "",
            "Exchange Rate:",
            rate_line,
        ])

    usd_in = price_data['usd_values']['input']
    usd_out = price_data['usd_values']['output']
    # Relative change between USD input and output; undefined for a zero input.
    impact = f"{((usd_out - usd_in) / usd_in * 100):.2f}%" if usd_in else "n/a"
    preview.extend([
        "",
        "USD Values:",
        f" Input: ${usd_in:.2f}",
        f" Output: ${usd_out:.2f}",
        f" Price Impact: {impact}",
        "",
        "Available Routes:"
    ])
    for route in price_data['all_quotes']['dexes']:
        preview.append(f" - {route['name']} via {route['router']}")
    return "\n".join(preview)
def get_best_price(self, params):
    """Quote a buy or sell through the chain's DEX router, with USD values.

    Args:
        params: validated intent; reads 'chain', 'token', 'using_token',
            'action' and a numeric 'amount' of the traded token.

    Returns:
        dict with the raw and formatted in/out amounts, 'exchange_rate'
        (using-token per traded token), 'decimals', 'usd_values', the router
        'quote' path and 'all_quotes'.

    Raises:
        ValueError: on any failure, prefixed "Failed to get price: " with the
            original exception chained.
    """
    try:
        chain = params['chain']
        # Reconnect when there is no live connection or when the live
        # connection points at a different chain than the one requested.
        needs_connection = (
            not self.web3_connection
            or not self.web3_connection.is_connected()
            or getattr(self, 'current_chain', None) != chain
        )
        if needs_connection and not self.connect_to_chain(chain):
            raise ValueError(f"Failed to connect to {params['chain']}")
        w3 = self.web3_connection
        chain_data = self.CHAIN_INFO[chain]
        router = self.get_dex_router(chain)

        token_address = Web3.to_checksum_address(self.TOKENS[params['token']][chain])
        token_decimals = w3.eth.contract(
            address=token_address, abi=TOKEN_ABI
        ).functions.decimals().call()

        # Paying/receiving side: the wrapped native token or a trusted token.
        # Its decimals are looked up too instead of assuming 18.
        if params['using_token'] == chain_data['symbol']:
            using_token_address = self.get_weth_address(chain)
            using_decimals = chain_data.get('decimals', 18)
        else:
            using_token_address = Web3.to_checksum_address(
                self.TOKENS[params['using_token']][chain]
            )
            using_decimals = w3.eth.contract(
                address=using_token_address, abi=TOKEN_ABI
            ).functions.decimals().call()

        # Decimal scaling avoids float rounding on large base-unit values.
        token_units = int(Decimal(str(params['amount'])) * (Decimal(10) ** token_decimals))
        token_amount = float(params['amount'])

        # NOTE(review): the USD price is always requested for 'ETH', even
        # when the other token is not ETH - confirm what
        # get_token_usd_price supports before changing this.
        eth_price = self.get_token_usd_price('ETH', chain)

        if params['action'] == 'buy':
            # How much of the using token is needed for the wanted amount.
            path = [using_token_address, token_address]
            amounts = router.functions.getAmountsIn(token_units, path).call()
            amount_in, amount_out = amounts[0], token_units
            amount_in_formatted = Decimal(amount_in) / (Decimal(10) ** using_decimals)
            amount_out_formatted = token_amount
            exchange_rate = float(amount_in_formatted) / float(amount_out_formatted)
            input_usd = float(amount_in_formatted) * eth_price
            output_usd = input_usd  # Simplified for now
            decimals = {'from': using_decimals, 'to': token_decimals}
            dex_name = 'BaseSwap'
        else:  # sell
            # How much of the using token the given token amount yields.
            path = [token_address, using_token_address]
            amounts = router.functions.getAmountsOut(token_units, path).call()
            amount_in, amount_out = token_units, amounts[1]
            amount_in_formatted = token_amount
            amount_out_formatted = Decimal(amount_out) / (Decimal(10) ** using_decimals)
            exchange_rate = float(amount_out_formatted) / float(amount_in_formatted)
            output_usd = float(amount_out_formatted) * eth_price
            input_usd = output_usd  # Simplified for now
            decimals = {'from': token_decimals, 'to': using_decimals}
            dex_name = 'UniswapV2'

        return {
            'source': 'DEX',
            'amount_in': amount_in,
            'amount_in_formatted': amount_in_formatted,
            'amount_out': amount_out,
            'amount_out_formatted': amount_out_formatted,
            'exchange_rate': exchange_rate,
            'decimals': decimals,
            'usd_values': {
                'input': input_usd,
                'output': output_usd
            },
            'quote': {
                'router': router.address,
                'path': path
            },
            'all_quotes': {
                'dexes': [{
                    'name': dex_name,
                    'router': router.address
                }]
            }
        }
    except Exception as e:
        raise ValueError(f"Failed to get price: {str(e)}") from e
def execute_buy(self, params):
    """Buy ``params['token']`` with ``params['using_token']`` through the chain's DEX router.

    NOTE: this file contains two definitions of ``execute_buy``; when both sit
    in the same class body, Python keeps whichever appears last.

    Args:
        params: Mapping with at least ``'chain'``, ``'token'`` and
            ``'using_token'``. It is also forwarded to ``get_best_price``,
            whose result supplies ``amount_in``, ``amount_out``, the formatted
            amounts and ``exchange_rate`` used here.

    Raises:
        ValueError: For any failure (connection, balance, approval, swap).
            The underlying exception is chained as ``__cause__``.
    """
    slippage_bps = 50  # 0.5% slippage tolerance, in basis points
    deadline_seconds = 300  # 5 minutes

    def raw_bytes(signed):
        # The signed-transaction attribute is spelled ``raw_transaction`` or
        # ``rawTransaction`` depending on the installed library version.
        try:
            return signed.raw_transaction
        except AttributeError:
            return signed.rawTransaction

    try:
        chain = params['chain']
        if not self.connect_to_chain(chain):
            raise ValueError(f"Failed to connect to {chain}")

        # Get price quote first
        price_quote = self.get_best_price(params)
        amount_in = price_quote['amount_in']

        w3 = self.web3_connection
        wallet = self.AGENT_WALLET['public_key']
        private_key = self.AGENT_WALLET['private_key']
        pays_with_native = params['using_token'] == self.CHAIN_INFO[chain]['symbol']

        token_address = Web3.to_checksum_address(self.TOKENS[params['token']][chain])
        if pays_with_native:
            using_token_address = self.get_weth_address(chain)
        else:
            using_token_address = Web3.to_checksum_address(
                self.TOKENS[params['using_token']][chain]
            )
        router = self.get_dex_router(chain)
        path = [using_token_address, token_address]

        # Integer arithmetic keeps the minimum exact; multiplying a wei-scale
        # integer by a float would round it to float precision.
        amount_out_min = int(price_quote['amount_out']) * (10000 - slippage_bps) // 10000
        deadline = int(time.time()) + deadline_seconds

        def tx_options(gas, **extra):
            # Gas price and nonce are read when each transaction is built, so
            # a swap built after a mined approval sees the updated nonce.
            options = {
                'from': wallet,
                'gas': gas,
                'gasPrice': self.get_gas_price(),
                'nonce': w3.eth.get_transaction_count(wallet),
                'chainId': w3.eth.chain_id,
            }
            options.update(extra)
            return options

        def sign_and_wait(tx):
            # Sign, broadcast and block until the transaction is mined.
            signed = w3.eth.account.sign_transaction(tx, private_key=private_key)
            sent_hash = w3.eth.send_raw_transaction(raw_bytes(signed))
            return sent_hash, w3.eth.wait_for_transaction_receipt(sent_hash)

        if pays_with_native:
            # Check ETH balance
            balance = w3.eth.get_balance(wallet)
            if balance < amount_in:
                raise ValueError(f"Insufficient ETH balance. Need {Web3.from_wei(amount_in, 'ether')} ETH")
            # Execute ETH swap: the native amount travels as the tx value.
            swap_tx = router.functions.swapExactETHForTokens(
                amount_out_min,  # minimum amount of tokens to receive
                path,
                wallet,  # recipient
                deadline
            ).build_transaction(tx_options(250000, value=amount_in))
        else:
            # Handle ERC20 token
            token_contract = w3.eth.contract(address=using_token_address, abi=TOKEN_ABI)
            balance = token_contract.functions.balanceOf(wallet).call()
            if balance < amount_in:
                raise ValueError(f"Insufficient {params['using_token']} balance")
            # Approve tokens if needed
            allowance = token_contract.functions.allowance(wallet, router.address).call()
            if allowance < amount_in:
                approve_tx = token_contract.functions.approve(
                    router.address,
                    amount_in
                ).build_transaction(tx_options(100000))
                _, approval_receipt = sign_and_wait(approve_tx)
                # A reverted approval would make the swap below fail with a
                # less helpful error, so stop here instead.
                if approval_receipt['status'] != 1:
                    raise ValueError("Token approval failed")
            # Execute token swap
            swap_tx = router.functions.swapExactTokensForTokens(
                amount_in,  # amount of tokens to send
                amount_out_min,  # minimum amount of tokens to receive
                path,
                wallet,  # recipient
                deadline
            ).build_transaction(tx_options(250000))

        # Sign and send transaction
        tx_hash, receipt = sign_and_wait(swap_tx)
        if receipt['status'] != 1:
            raise ValueError("Swap transaction failed!")

        success_message = (
            f"\nBought {price_quote['amount_out_formatted']} {params['token']} "
            f"using {price_quote['amount_in_formatted']} {params['using_token']} "
            f"at rate 1 {params['using_token']} = {1/price_quote['exchange_rate']:.8f} {params['token']} "
            f"via UniswapV2 pool"
        )
        print(Fore.GREEN + success_message)
        print(f"View on explorer: {self.CHAIN_INFO[params['chain']]['explorer_url']}/tx/{w3.to_hex(tx_hash)}")
    except Exception as e:
        raise ValueError(f"Buy transaction failed: {str(e)}") from e
def execute_sell(self, params):
    """Sell ``params['amount']`` of ``params['token']`` for ``params['using_token']`` on the DEX.

    The router is approved for the sold amount, then the swap is sent along
    the same ``[token, using_token]`` path that the price quote uses. When the
    target is the chain's native symbol the path ends in the wrapped native
    token and ``swapExactTokensForETH`` is used; otherwise
    ``swapExactTokensForTokens`` is used.

    Args:
        params: Mapping with at least ``'chain'``, ``'token'``,
            ``'using_token'`` and ``'amount'``. It is also forwarded to
            ``get_best_price``.

    Raises:
        ValueError: For any failure (connection, approval, swap). The
            underlying exception is chained as ``__cause__``.
    """
    slippage_bps = 50  # 0.5% slippage tolerance, in basis points
    deadline_seconds = 300  # 5 minutes

    def raw_bytes(signed):
        # The signed-transaction attribute is spelled ``raw_transaction`` or
        # ``rawTransaction`` depending on the installed library version.
        try:
            return signed.raw_transaction
        except AttributeError:
            return signed.rawTransaction

    try:
        chain = params['chain']
        if not self.connect_to_chain(chain):
            raise ValueError(f"Failed to connect to {chain}")

        # Get price quote first
        price_quote = self.get_best_price(params)

        w3 = self.web3_connection
        wallet = self.AGENT_WALLET['public_key']
        private_key = self.AGENT_WALLET['private_key']
        receives_native = params['using_token'] == self.CHAIN_INFO[chain]['symbol']

        token_address = Web3.to_checksum_address(self.TOKENS[params['token']][chain])
        if receives_native:
            using_token_address = self.get_weth_address(chain)
        else:
            using_token_address = Web3.to_checksum_address(
                self.TOKENS[params['using_token']][chain]
            )
        router = self.get_dex_router(chain)

        # Create token contract
        token_contract = w3.eth.contract(address=token_address, abi=TOKEN_ABI)
        decimals = token_contract.functions.decimals().call()
        # Decimal(str(...)) accepts str, float and Decimal amounts and scales
        # them without the rounding noise of float multiplication.
        amount_in_wei = int(Decimal(str(params['amount'])) * (10 ** decimals))

        # Calculate minimum tokens out with slippage (integer math, exact)
        min_tokens_out = int(price_quote['amount_out']) * (10000 - slippage_bps) // 10000
        deadline = int(time.time()) + deadline_seconds

        def tx_options(gas):
            # Gas price and nonce are read when each transaction is built, so
            # the swap built after the mined approval sees the updated nonce.
            return {
                'from': wallet,
                'gas': gas,
                'gasPrice': self.get_gas_price(),
                'nonce': w3.eth.get_transaction_count(wallet),
                'chainId': w3.eth.chain_id,
            }

        def sign_and_wait(tx):
            # Sign, broadcast and block until the transaction is mined.
            signed = w3.eth.account.sign_transaction(tx, private_key=private_key)
            sent_hash = w3.eth.send_raw_transaction(raw_bytes(signed))
            return sent_hash, w3.eth.wait_for_transaction_receipt(sent_hash)

        # Approve tokens first
        approve_tx = token_contract.functions.approve(
            router.address,
            amount_in_wei
        ).build_transaction(tx_options(100000))
        _, approval_receipt = sign_and_wait(approve_tx)
        if approval_receipt['status'] != 1:
            raise ValueError("Token approval failed")
        print(Fore.GREEN + "Token approval successful")

        # Execute swap transaction along the quoted path
        path = [token_address, using_token_address]
        if receives_native:
            swap_call = router.functions.swapExactTokensForETH(
                amount_in_wei, min_tokens_out, path, wallet, deadline
            )
        else:
            swap_call = router.functions.swapExactTokensForTokens(
                amount_in_wei, min_tokens_out, path, wallet, deadline
            )
        swap_tx = swap_call.build_transaction(tx_options(250000))

        # Sign and send swap transaction
        tx_hash, receipt = sign_and_wait(swap_tx)
        if receipt['status'] != 1:
            raise ValueError("Swap transaction failed!")

        success_message = (
            f"\nSold {price_quote['amount_in_formatted']} {params['token']} "
            f"for {price_quote['amount_out_formatted']} {params['using_token']} "
            f"at rate 1 {params['token']} = {price_quote['exchange_rate']} {params['using_token']} "
            f"via UniswapV2 pool"
        )
        print(Fore.GREEN + success_message)
        print(f"View on explorer: {self.CHAIN_INFO[params['chain']]['explorer_url']}/tx/{w3.to_hex(tx_hash)}")
    except Exception as e:
        raise ValueError(f"Sell transaction failed: {str(e)}") from e
def get_token_price(self, token1, token2, chain):
    """Quote one unit of ``token1`` in ``token2`` on the chain's DEX router.

    The chain's native symbol is mapped to its wrapped-token address; any
    other symbol is looked up in ``self.TOKENS``. Both path addresses are
    converted to checksum form before the router call.

    Args:
        token1: Symbol of the input token.
        token2: Symbol of the output token.
        chain: Chain name, a key of ``self.CHAIN_INFO``.

    Returns:
        dict with ``'price'`` (raw output amount divided by raw input amount)
        and ``'path'`` (the two addresses used for the quote).

    Raises:
        ValueError: If the lookup or the router call fails; the cause is
            chained.
    """
    try:
        router = self.get_dex_router(chain)
        # NOTE(review): one "ether" unit is 10**18 base units, so this is one
        # whole token only for 18-decimal inputs, and the ratio below is not
        # adjusted for differing decimals -- confirm callers expect that.
        amount_in = Web3.to_wei(1, 'ether')  # Price for 1 token
        native_symbol = self.CHAIN_INFO[chain]['symbol']

        def address_of(symbol):
            if symbol == native_symbol:
                return self.get_weth_address(chain)
            return Web3.to_checksum_address(self.TOKENS[symbol][chain])

        # Get price path
        path = [address_of(token1), address_of(token2)]
        # Get amounts out
        amounts_out = router.functions.getAmountsOut(amount_in, path).call()
        return {
            'price': amounts_out[1] / amounts_out[0],
            'path': path
        }
    except Exception as e:
        raise ValueError(f"Failed to get token price: {str(e)}") from e
def get_dex_router(self, chain):
    """Build the router contract for ``chain`` and check its factory address.

    Args:
        chain: One of the chain names configured below.

    Returns:
        The router contract object created from ``ROUTER_ABI``.

    Raises:
        ValueError: If the chain has no router configured, or if the
            on-chain ``factory()`` call fails or reports a different factory
            than the configured one.
    """
    # Per-chain router deployments and the factory each one must report.
    known_routers = {
        'Base': {
            'address': '0x327Df1E6de05895d2ab08513aaDD9313Fe505d86',  # BaseSwap
            'name': 'BaseSwap',
            'factory': '0xFDa619b6d20975be80A10332cD39b9a4b0FAa8BB'
        },
        'Ethereum': {
            'address': '0x7a250d5630B4cF539739dF2C5dAcb4c659F2488D',  # Uniswap V2
            'name': 'Uniswap V2',
            'factory': '0x5C69bEe701ef814a2B6a3EDD4B1652CB9cc5aA6f'
        },
        'Polygon': {
            'address': '0xa5E0829CaCEd8fFDD4De3c43696c57F7D7A678ff',  # QuickSwap
            'name': 'QuickSwap',
            'factory': '0x5757371414417b8C6CAad45bAeF941aBc7d3Ab32'
        }
    }
    settings = known_routers.get(chain)
    if not settings:
        raise ValueError(f"No DEX configuration for chain {chain}")

    router_address = Web3.to_checksum_address(settings['address'])
    router = self.web3_connection.eth.contract(address=router_address, abi=ROUTER_ABI)

    # Verify router is operational. A mismatch raised inside the try is
    # re-wrapped by the handler below, like any other failure.
    try:
        reported_factory = router.functions.factory().call()
        factories_agree = reported_factory.lower() == settings['factory'].lower()
        if not factories_agree:
            raise ValueError(f"Router factory mismatch on {chain}")
    except Exception as exc:
        raise ValueError(f"Router verification failed on {chain}: {exc}")
    return router
def get_token_contract(self, token, chain):
    """Return a contract object for ``token`` on ``chain``.

    Returns:
        ``None`` when ``token`` is the chain's native symbol, otherwise a
        contract built from ``TOKEN_ABI`` at the token's checksummed address.

    Raises:
        ValueError: If ``self.TOKENS`` has no address for the token on that
            chain.
    """
    native_symbol = self.CHAIN_INFO[chain]['symbol']
    if token != native_symbol:
        per_chain = self.TOKENS.get(token, {})
        address = per_chain.get(chain)
        if address:
            return self.web3_connection.eth.contract(
                address=Web3.to_checksum_address(address),
                abi=TOKEN_ABI
            )
        raise ValueError(f"Token {token} not supported on {chain}")
    # Native token: there is no contract to return.
    return None
def get_weth_address(self, chain):
    """Return the checksummed wrapped-native-token address for ``chain``.

    Raises:
        ValueError: If no wrapped-token address is configured for the chain.
    """
    wrapped_native = dict(
        Base='0x4200000000000000000000000000000000000006',
        Ethereum='0xC02aaA39b223FE8D0A0e5C4F27eAD9083C756Cc2',
        Polygon='0x0d500B1d8E8eF31E21C99d1Db9A6444d3ADf1270',
    )
    address = wrapped_native.get(chain)
    if address:
        return Web3.to_checksum_address(address)
    raise ValueError(f"No WETH address configured for chain {chain}")
def handle_0x_command(self, command, agent_voice_active=False, voice_mode_active=False, speak_response=None):
    """Dispatch a ``/0x`` command to the matching handler.

    The second whitespace-separated word of the lower-cased command selects
    the action: ``buy``/``sell`` (quote, preview, confirm, execute), ``send``,
    ``receive``, ``gas``, ``new chain|token`` or ``forget chain|token <name>``.

    Args:
        command: Full command text, including the leading ``/0x`` word.
        agent_voice_active: Whether agent voice output is on.
        voice_mode_active: Whether voice mode is on.
        speak_response: Optional callable taking the error text; it is only
            called when one of the voice flags is set.

    Errors are not propagated: they are printed in red and, if voice output is
    active and a callback was supplied, spoken.
    """
    try:
        if not command:
            raise ValueError("Empty command received")
        parts = command.lower().split()
        if len(parts) < 2:
            raise ValueError("Invalid command format")
        subcommand = parts[1]
        args = ' '.join(parts[2:])

        # Existing transaction commands
        if subcommand in ["buy", "sell"]:
            params = self.parse_transaction_intent(command)
            if not params:
                raise ValueError("Could not understand transaction intent")
            params = self.validate_and_complete_transaction(params)
            price_data = self.get_best_price(params)
            if price_data:
                preview_text = self.format_transaction_preview(params, price_data)
                print(preview_text)
                # Nothing is sent on-chain unless the user confirms the preview.
                if self.confirm_transaction():
                    if subcommand == "buy":
                        self.execute_buy(params)
                    else:
                        self.execute_sell(params)
        elif subcommand == "send":
            self.handle_send_command(command, agent_voice_active, voice_mode_active, speak_response)
        elif subcommand == "receive":
            self.handle_receive_command()
        elif subcommand == "gas":
            if not args:
                raise ValueError("Gas level not specified. Use: /0x gas <low|medium|high>")
            self.handle_gas_command(command)
        # New configuration commands
        elif subcommand == "new":
            if len(parts) < 3:
                raise ValueError("Use: /0x new chain|token")
            if parts[2] == "chain":
                self.add_chain_interactive()
            elif parts[2] == "token":
                self.add_token_interactive()
            else:
                raise ValueError("Use: /0x new chain|token")
        elif subcommand == "forget":
            if len(parts) < 4:
                raise ValueError("Use: /0x forget chain|token <name>")
            target_type = parts[2]
            target_name = ' '.join(parts[3:])  # Join remaining parts for multi-word names
            if target_type == "chain":
                self.forget_chain(target_name)
            elif target_type == "token":
                self.forget_token(target_name)
            else:
                raise ValueError("Use: /0x forget chain|token <name>")
        else:
            raise ValueError("Invalid command. Use /0x followed by: buy, sell, send, gas, receive, new, or forget")
    except Exception as e:
        error_message = f"Error: {str(e)}"
        print(Fore.RED + error_message)
        # Parenthesised on purpose: ``and`` binds tighter than ``or``, so the
        # unparenthesised form would call a missing callback whenever
        # agent_voice_active is true.
        if (agent_voice_active or voice_mode_active) and speak_response:
            speak_response(error_message)
def get_token_usd_price(self, token_symbol, chain):
    """Return the USD price of ``token_symbol`` from the CoinGecko simple-price API.

    Args:
        token_symbol: Token symbol; mapped to a CoinGecko id by
            ``get_coingecko_id``.
        chain: Chain name, forwarded to ``get_coingecko_id``.

    Returns:
        The price as a float. If the request fails, times out, or the
        response lacks the price, a warning is printed and
        ``get_fallback_price(token_symbol)`` is returned instead.
    """
    try:
        # Use CoinGecko API for price data
        token_id = self.get_coingecko_id(token_symbol, chain)
        response = requests.get(
            "https://api.coingecko.com/api/v3/simple/price",
            params={
                'ids': token_id,
                'vs_currencies': 'usd'
            },
            # Without a timeout an unresponsive server would block forever.
            timeout=10
        )
        if response.status_code == 200:
            data = response.json()
            if token_id in data and 'usd' in data[token_id]:
                return float(data[token_id]['usd'])
        # Fallback to default price if API fails
        print(f"Warning: Could not get USD price for {token_symbol}, using fallback price")
        return self.get_fallback_price(token_symbol)
    except Exception as e:
        print(f"Warning: Failed to get USD price for {token_symbol}: {str(e)}")
        return self.get_fallback_price(token_symbol)
def get_fallback_price(self, token_symbol):
    """Return a hard-coded USD price for ``token_symbol``, or 0.0 if none is listed.

    The lookup is case-insensitive.
    """
    static_prices = dict(
        ETH=2000.0,
        DEGEN=0.01,
        # Add more fallback prices as needed
    )
    symbol = token_symbol.upper()
    if symbol in static_prices:
        return static_prices[symbol]
    return 0.0
def execute_buy(self, params):
    """Buy ``params['token']`` with ``params['using_token']`` through the chain's DEX router.

    NOTE: this file contains two definitions of ``execute_buy``; when both sit
    in the same class body, Python keeps whichever appears last.

    Args:
        params: Mapping with at least ``'chain'``, ``'token'`` and
            ``'using_token'``. It is also forwarded to ``get_best_price``,
            whose result supplies ``amount_in``, ``amount_out``, the formatted
            amounts and ``exchange_rate`` used here.

    Raises:
        ValueError: For any failure (connection, balance, approval, swap).
            The underlying exception is chained as ``__cause__``.
    """
    slippage_bps = 50  # 0.5% slippage tolerance, in basis points
    deadline_seconds = 300  # 5 minutes

    def raw_bytes(signed):
        # The signed-transaction attribute is spelled ``raw_transaction`` or
        # ``rawTransaction`` depending on the installed library version.
        try:
            return signed.raw_transaction
        except AttributeError:
            return signed.rawTransaction

    try:
        chain = params['chain']
        if not self.connect_to_chain(chain):
            raise ValueError(f"Failed to connect to {chain}")

        # Get price quote first
        price_quote = self.get_best_price(params)
        amount_in = price_quote['amount_in']

        w3 = self.web3_connection
        wallet = self.AGENT_WALLET['public_key']
        private_key = self.AGENT_WALLET['private_key']
        pays_with_native = params['using_token'] == self.CHAIN_INFO[chain]['symbol']

        token_address = Web3.to_checksum_address(self.TOKENS[params['token']][chain])
        if pays_with_native:
            using_token_address = self.get_weth_address(chain)
        else:
            using_token_address = Web3.to_checksum_address(
                self.TOKENS[params['using_token']][chain]
            )
        router = self.get_dex_router(chain)
        path = [using_token_address, token_address]

        # Integer arithmetic keeps the minimum exact; multiplying a wei-scale
        # integer by a float would round it to float precision.
        amount_out_min = int(price_quote['amount_out']) * (10000 - slippage_bps) // 10000
        deadline = int(time.time()) + deadline_seconds

        def tx_options(gas, **extra):
            # Gas price and nonce are read when each transaction is built, so
            # a swap built after a mined approval sees the updated nonce.
            options = {
                'from': wallet,
                'gas': gas,
                'gasPrice': self.get_gas_price(),
                'nonce': w3.eth.get_transaction_count(wallet),
                'chainId': w3.eth.chain_id,
            }
            options.update(extra)
            return options

        def sign_and_wait(tx):
            # Sign, broadcast and block until the transaction is mined.
            signed = w3.eth.account.sign_transaction(tx, private_key=private_key)
            sent_hash = w3.eth.send_raw_transaction(raw_bytes(signed))
            return sent_hash, w3.eth.wait_for_transaction_receipt(sent_hash)

        if pays_with_native:
            # Check ETH balance
            balance = w3.eth.get_balance(wallet)
            if balance < amount_in:
                raise ValueError(f"Insufficient ETH balance. Need {Web3.from_wei(amount_in, 'ether')} ETH")
            # Execute ETH swap: the native amount travels as the tx value.
            swap_tx = router.functions.swapExactETHForTokens(
                amount_out_min,  # minimum amount of tokens to receive
                path,
                wallet,  # recipient
                deadline
            ).build_transaction(tx_options(250000, value=amount_in))
        else:
            # Handle ERC20 token
            token_contract = w3.eth.contract(address=using_token_address, abi=TOKEN_ABI)
            balance = token_contract.functions.balanceOf(wallet).call()
            if balance < amount_in:
                raise ValueError(f"Insufficient {params['using_token']} balance")
            # Approve tokens if needed
            allowance = token_contract.functions.allowance(wallet, router.address).call()
            if allowance < amount_in:
                approve_tx = token_contract.functions.approve(
                    router.address,
                    amount_in
                ).build_transaction(tx_options(100000))
                _, approval_receipt = sign_and_wait(approve_tx)
                # A reverted approval would make the swap below fail with a
                # less helpful error, so stop here instead.
                if approval_receipt['status'] != 1:
                    raise ValueError("Token approval failed")
            # Execute token swap
            swap_tx = router.functions.swapExactTokensForTokens(
                amount_in,  # amount of tokens to send
                amount_out_min,  # minimum amount of tokens to receive
                path,
                wallet,  # recipient
                deadline
            ).build_transaction(tx_options(250000))

        # Sign and send transaction
        tx_hash, receipt = sign_and_wait(swap_tx)
        if receipt['status'] != 1:
            raise ValueError("Swap transaction failed!")

        success_message = (
            f"\nBought {price_quote['amount_out_formatted']} {params['token']} "
            f"using {price_quote['amount_in_formatted']} {params['using_token']} "
            f"at rate 1 {params['using_token']} = {1/price_quote['exchange_rate']:.8f} {params['token']} "
            f"via UniswapV2 pool"
        )
        print(Fore.GREEN + success_message)
        print(f"View on explorer: {self.CHAIN_INFO[params['chain']]['explorer_url']}/tx/{w3.to_hex(tx_hash)}")
    except Exception as e:
        raise ValueError(f"Buy transaction failed: {str(e)}") from e
def approve_token_spending(self, token_address, spender_address, amount):
    """Approve ``spender_address`` to spend ``amount`` of a token, if not already allowed.

    No transaction is sent when the current allowance already covers
    ``amount``.

    Args:
        token_address: Address of the token contract.
        spender_address: Address that should be allowed to spend.
        amount: Allowance needed, in the token's base units.

    Raises:
        ValueError: If the approval transaction fails or any call errors;
            the cause is chained.
    """
    try:
        w3 = self.web3_connection
        wallet = self.AGENT_WALLET['public_key']
        # Contract calls expect checksummed addresses; normalise both.
        spender = Web3.to_checksum_address(spender_address)
        token_contract = w3.eth.contract(
            address=Web3.to_checksum_address(token_address),
            abi=TOKEN_ABI
        )
        # Check current allowance
        current_allowance = token_contract.functions.allowance(wallet, spender).call()
        if current_allowance >= amount:
            return

        # Prepare approval transaction
        approve_tx = token_contract.functions.approve(
            spender,
            amount
        ).build_transaction({
            'from': wallet,
            'gas': 100000,
            'gasPrice': self.get_gas_price(),
            'nonce': w3.eth.get_transaction_count(wallet),
            'chainId': w3.eth.chain_id
        })
        # Sign and send approval
        signed_approve = w3.eth.account.sign_transaction(
            approve_tx,
            private_key=self.AGENT_WALLET['private_key']
        )
        # The attribute is spelled ``raw_transaction`` or ``rawTransaction``
        # depending on the installed library version; accept either.
        try:
            raw_tx = signed_approve.raw_transaction
        except AttributeError:
            raw_tx = signed_approve.rawTransaction
        tx_hash = w3.eth.send_raw_transaction(raw_tx)
        # Wait for approval to be mined
        receipt = w3.eth.wait_for_transaction_receipt(tx_hash)
        if receipt['status'] != 1:
            raise ValueError("Token approval failed")
        print(Fore.GREEN + "Token approval successful")
    except Exception as e:
        raise ValueError(f"Failed to approve token spending: {str(e)}") from e
def ensure_checksum_address(self, address):
    """Return ``address`` converted to checksum format.

    Raises:
        ValueError: With the message ``Invalid address format: <address>``
            for an empty address or for any conversion failure.
    """
    try:
        if address:
            lowered = address.lower()
            return Web3.to_checksum_address(lowered)
        # Raised inside the try on purpose: the handler below replaces this
        # message with the generic "Invalid address format" one.
        raise ValueError("Empty address provided")
    except Exception:
        raise ValueError(f"Invalid address format: {address}")
def get_coingecko_id(self, token_symbol, chain):
    """Translate a token symbol into its CoinGecko id.

    Symbols without a mapping fall back to the lower-cased symbol itself.
    ``chain`` is accepted but not used by the lookup.
    """
    known_ids = dict(
        ETH='ethereum',
        DEGEN='degen',
        USDC='usd-coin',
        # Add more mappings as needed
    )
    key = token_symbol.upper()
    if key in known_ids:
        return known_ids[key]
    return token_symbol.lower()
def add_token_interactive(self):
    """Prompt for a token, register it at runtime and persist it in this source file.

    The user is asked for a token name, a chain name (which must already be
    in ``self.CHAIN_INFO``) and a contract address. The entry is added to
    ``self.TOKENS`` and then written into the ``TOKENS = {`` literal of this
    module's own file, just before its closing brace.

    Raises:
        ValueError: If the chain is unknown, the address is invalid, the
            ``TOKENS`` literal cannot be located, or file I/O fails; the cause
            is chained.
    """
    print(Fore.YELLOW + "\nAdding new token configuration:")
    try:
        token_name = input("Token name: ").strip().capitalize()
        chain_name = input("Chain name: ").strip().capitalize()
        if chain_name not in self.CHAIN_INFO:
            raise ValueError(f"Chain {chain_name} not supported!")
        contract_address = input("Contract address: ").strip()
        contract_address = Web3.to_checksum_address(contract_address)

        # Update runtime dictionary
        self.TOKENS.setdefault(token_name, {})[chain_name] = contract_address

        # Update Python file
        current_file = os.path.abspath(__file__)
        with open(current_file, 'r', encoding='utf-8') as f:
            content = f.readlines()

        # Find TOKENS dictionary: track brace depth from its opening line
        # until the matching closing brace.
        start_index = None
        end_index = None
        depth = 0
        for i, line in enumerate(content):
            if start_index is None:
                if line.strip().startswith('TOKENS = {'):
                    start_index = i
                    depth = 1
                continue
            # Count every brace so lines that open or close more than one
            # level keep the depth accurate.
            depth += line.count('{') - line.count('}')
            if depth <= 0:
                end_index = i + 1
                break
        if start_index is None or end_index is None:
            raise ValueError("TOKENS dictionary not found in file")

        # Check if there are existing entries
        has_entries = any(
            "': {" in content[i] for i in range(start_index + 1, end_index - 1)
        )
        if has_entries:
            # Add a comma to the line before the closing brace if needed
            prev_line = content[end_index - 2].rstrip()
            if not prev_line.endswith(','):
                content[end_index - 2] = prev_line + ',\n'

        # repr() quotes and escapes the user-supplied text so it is always a
        # valid string literal in the generated source. Each line carries its
        # own newline, so the pieces are concatenated, not joined with "\n"
        # (which would write blank lines between them).
        new_entry = (
            f" {token_name!r}: {{\n"
            f" {chain_name!r}: {contract_address!r},\n"
            " }\n"  # No comma here - will be added by next entry if needed
        )
        # Insert new token before the closing brace of TOKENS
        content.insert(end_index - 1, new_entry)
        with open(current_file, 'w', encoding='utf-8') as f:
            f.writelines(content)
        print(Fore.GREEN + f"\nToken {token_name} added successfully on {chain_name}!")
    except Exception as e:
        raise ValueError(f"Failed to add token: {str(e)}") from e
def forget_token(self, token_name):
    """Remove a token from ``self.TOKENS`` and from this module's source file.

    The name is stripped and capitalised before lookup. The entry is searched
    for inside the ``TOKENS = {`` literal when that literal can be located;
    otherwise the whole file is searched.

    Args:
        token_name: Name of the token to remove.

    Raises:
        ValueError: If the token is unknown, its entry is not found in the
            file, or file I/O fails; the cause is chained.
    """
    try:
        token_name = token_name.strip().capitalize()
        if token_name not in self.TOKENS:
            raise ValueError(f"Token {token_name} not found")
        # Remove from runtime dictionary
        del self.TOKENS[token_name]

        # Update Python file
        current_file = os.path.abspath(__file__)
        with open(current_file, 'r', encoding='utf-8') as f:
            content = f.readlines()

        # Restrict the search to the body of the TOKENS literal so that a
        # same-named entry in another dictionary is never touched.
        search_start, search_end = 0, len(content)
        dict_start = None
        depth = 0
        for i, line in enumerate(content):
            if dict_start is None:
                if line.strip().startswith('TOKENS = {'):
                    dict_start = i
                    depth = 1
                continue
            depth += line.count('{') - line.count('}')
            if depth <= 0:
                search_start, search_end = dict_start + 1, i
                break

        # Find token entry
        start_index = None
        end_index = None
        entry_marker = f"'{token_name}'"
        depth = 0
        for i in range(search_start, search_end):
            line = content[i]
            if start_index is None:
                if not (entry_marker in line and "': {" in line):
                    continue
                start_index = i
                depth = line.count('{') - line.count('}')
            else:
                depth += line.count('{') - line.count('}')
            # depth can reach zero on the header line for a one-line entry.
            if depth <= 0:
                end_index = i + 1
                break
        if start_index is None or end_index is None:
            raise ValueError(f"Token {token_name} entry not found in file")

        # Check if this is the last entry within the searched range
        is_last_entry = not any(
            "': {" in content[i] for i in range(end_index, search_end)
        )
        # If it's not the last entry, keep the comma from the current entry
        # If it is the last entry, remove the comma from the previous entry
        if is_last_entry:
            for i in range(start_index - 1, search_start - 1, -1):
                if content[i].strip() == '},':
                    content[i] = content[i].replace('},', '}')
                    break

        # Remove the token entry
        del content[start_index:end_index]
        with open(current_file, 'w', encoding='utf-8') as f:
            f.writelines(content)
        print(Fore.GREEN + f"Token {token_name} removed successfully")
    except Exception as e:
        raise ValueError(f"Failed to remove token: {str(e)}") from e
def add_chain_interactive(self):
    """Prompt for a new chain, register it at runtime and persist it in this source file.

    The user is asked for the chain name, chain id, RPC URL, native symbol and
    explorer URL. The RPC URL is tested for connectivity, the chain is added
    to ``self.CHAIN_INFO`` and then written into the ``CHAIN_INFO = {``
    literal of this module's own file, just before its closing brace.

    Raises:
        ValueError: If the chain already exists, the chain id is not an
            integer, the RPC URL is unreachable, the ``CHAIN_INFO`` literal
            cannot be located, or file I/O fails; the cause is chained.
    """
    print(Fore.YELLOW + "\nAdding new chain configuration:")
    try:
        chain_name = input("Chain name: ").strip().capitalize()
        if chain_name in self.CHAIN_INFO:
            raise ValueError(f"Chain {chain_name} already exists!")
        chain_id = int(input("Chain ID: ").strip())
        rpc_url = input("RPC URL: ").strip()
        symbol = input("Native token symbol: ").strip().upper()
        explorer_url = input("Block explorer URL: ").strip()

        # Test connection
        probe = Web3(Web3.HTTPProvider(rpc_url))
        if not probe.is_connected():
            raise ValueError("Unable to connect to RPC URL")

        # Update runtime dictionary
        self.CHAIN_INFO[chain_name] = {
            'chain_id': chain_id,
            'rpc_url': rpc_url,
            'symbol': symbol,
            'decimals': 18,
            'explorer_url': explorer_url
        }

        # Update Python file
        current_file = os.path.abspath(__file__)
        with open(current_file, 'r', encoding='utf-8') as f:
            content = f.readlines()

        # Find CHAIN_INFO dictionary: track brace depth from its opening line
        # until the matching closing brace.
        start_index = None
        end_index = None
        depth = 0
        for i, line in enumerate(content):
            if start_index is None:
                if line.strip().startswith('CHAIN_INFO = {'):
                    start_index = i
                    depth = 1
                continue
            # Count every brace so lines that open or close more than one
            # level keep the depth accurate.
            depth += line.count('{') - line.count('}')
            if depth <= 0:
                end_index = i + 1
                break
        if start_index is None or end_index is None:
            raise ValueError("CHAIN_INFO dictionary not found in file")

        # Check if there are existing entries
        has_entries = any(
            "': {" in content[i] for i in range(start_index + 1, end_index - 1)
        )
        if has_entries:
            # Add a comma to the line before the closing brace if needed
            prev_line = content[end_index - 2].rstrip()
            if not prev_line.endswith(','):
                content[end_index - 2] = prev_line + ',\n'

        # repr() quotes and escapes the user-supplied text so a quote or
        # backslash in the input cannot corrupt the generated source.
        new_chain_lines = [
            f" {chain_name!r}: {{",
            f" 'chain_id': {chain_id},",
            f" 'rpc_url': {rpc_url!r},",
            f" 'symbol': {symbol!r},",
            " 'decimals': 18,",
            f" 'explorer_url': {explorer_url!r}",
            " }"  # No comma here - will be added by next entry if needed
        ]
        # Insert new chain before the closing brace
        content.insert(end_index - 1, '\n'.join(new_chain_lines) + '\n')
        with open(current_file, 'w', encoding='utf-8') as f:
            f.writelines(content)
        print(Fore.GREEN + f"\nChain {chain_name} added successfully!")
    except Exception as e:
        raise ValueError(f"Failed to add chain: {str(e)}") from e
def forget_chain(self, chain_name):
    """Delete a chain from ``self.CHAIN_INFO`` and persist the removal.

    The name is stripped and capitalised before lookup. Removal is refused
    while any token in ``self.TOKENS`` still lists the chain.

    Raises:
        ValueError: If the chain is unknown, still referenced by a token, or
            if persisting the change fails.
    """
    try:
        chain_name = chain_name.strip().capitalize()
        known_chain = chain_name in self.CHAIN_INFO
        if not known_chain:
            raise ValueError(f"Chain {chain_name} not found")

        # Check if chain is in use by any tokens (first match is reported)
        blocking_token = next(
            (name for name, chains in self.TOKENS.items() if chain_name in chains),
            None,
        )
        if blocking_token is not None:
            raise ValueError(f"Cannot remove chain {chain_name} as it is used by token {blocking_token}")

        # Remove from runtime dictionary
        self.CHAIN_INFO.pop(chain_name)
        # Save to temporary JSON for persistence until code is updated
        self.save_chain_config()
        # Update the Python file
        self._update_python_file_chain_removal(chain_name)
        print(Fore.GREEN + f"Chain {chain_name} removed successfully")
    except Exception as exc:
        raise ValueError(f"Failed to remove chain: {exc}")
def get_price(self, token_name, chain_name, amount, using_token=None):
    """Quote how much ``using_token`` the router returns for ``amount`` of ``token_name``.

    The quote path is ``[token_name, using_token]``. A symbol found in
    ``self.TOKENS`` uses its configured address for the chain; the chain's
    native symbol uses the wrapped-native address. The router comes from
    ``get_dex_router(chain_name)``.

    Args:
        token_name: Symbol of the input token.
        chain_name: Chain name, a key of ``self.CHAIN_INFO``.
        amount: Input amount in whole tokens.
        using_token: Symbol of the output token. When omitted there is
            nothing to quote against and ``amount`` is returned unchanged.

    Returns:
        The last amount reported by the router, converted with
        ``from_wei(..., 'ether')``, or ``amount`` itself when no
        ``using_token`` is given.

    Raises:
        ValueError: If there is no connection, a token is unsupported, or the
            router call fails; the cause is chained.
    """
    try:
        w3 = self.web3_connection
        if not w3 or not w3.is_connected():
            raise ConnectionError("No active web3 connection")
        native_symbol = self.CHAIN_INFO[chain_name]['symbol']

        def resolve(symbol):
            # Configured tokens take precedence; the native symbol falls back
            # to its wrapped token, which is what router paths require.
            if symbol in self.TOKENS:
                if chain_name not in self.TOKENS[symbol]:
                    raise ValueError(f"Token {symbol} not supported on {chain_name}")
                return Web3.to_checksum_address(self.TOKENS[symbol][chain_name])
            if symbol == native_symbol:
                return self.get_weth_address(chain_name)
            raise ValueError(f"Token {symbol} not supported")

        # Create the path for the swap
        path = [resolve(token_name)]
        if using_token:
            path.append(resolve(using_token))
        if len(path) < 2:
            return amount

        # Get the router contract
        router_contract = self.get_dex_router(chain_name)

        # Calculate the price.
        # NOTE(review): to_wei/from_wei with 'ether' scale by 10**18, so the
        # amounts are only exact for 18-decimal tokens -- confirm for others.
        amount_in_wei = w3.to_wei(amount, 'ether')
        amounts = router_contract.functions.getAmountsOut(
            amount_in_wei,
            path
        ).call()
        return w3.from_wei(amounts[-1], 'ether')
    except Exception as e:
        raise ValueError(f"Failed to get price: {str(e)}") from e
def handle_buy_command(self, command):
    """Parse a buy command, connect to its chain and print a price quote.

    Nothing is returned and nothing is raised: any failure along the way is
    printed to the console in red.
    """
    try:
        # Turn the raw command into a parameter dictionary
        intent = self.parse_transaction_intent(command)
        is_buy = bool(intent) and intent['action'] == 'buy'
        if not is_buy:
            raise ValueError("Invalid buy command")

        # All four parameters are mandatory for a quote
        token_name, chain_name, amount, using_token = (
            intent['token'],
            intent['chain'],
            intent['amount'],
            intent['using_token'],
        )
        if not (token_name and chain_name and amount and using_token):
            raise ValueError("Missing required parameters")

        # The quote needs a live connection to the target chain
        connected = self.connect_to_chain(chain_name)
        if not connected:
            raise ConnectionError(f"Failed to connect to {chain_name}")

        quote = self.get_price(token_name, chain_name, amount, using_token)
        print(f"Price quote: {quote} {using_token}")
    except Exception as err:
        print(Fore.RED + f"Error: {str(err)}")
def handle_sell_command(self, command):
    """Validate a sell command and report that selling is not implemented.

    The command is parsed and checked for the ``'sell'`` action. Execution
    itself has not been written yet, so a valid command is reported as an
    error rather than silently doing nothing.

    Nothing is returned and nothing is raised: every failure is printed to
    the console in red.
    """
    try:
        # Parse the command
        params = self.parse_transaction_intent(command)
        if not params or params['action'] != 'sell':
            raise ValueError("Invalid sell command")

        # TODO: implement along the lines of handle_buy_command. Until then
        # tell the user explicitly instead of returning without any output.
        raise NotImplementedError("Sell command is not implemented yet")
    except Exception as e:
        print(Fore.RED + f"Error: {str(e)}")
def _update_python_file_chain_removal(self, chain_name):
    """Delete the ``chain_name`` entry from the ``CHAIN_INFO`` literal in this file.

    The module's own source (located through ``__file__``) is read, the
    entry's lines are removed and the file is written back. The literal is
    located textually by counting braces line by line; it is not parsed, so
    braces inside string values would confuse the scan.

    Args:
        chain_name: Key of the entry to delete, exactly as written in the
            source file.

    Raises:
        ValueError: If the entry cannot be located or removed safely, or if
            reading or writing the file fails. The underlying exception is
            attached as ``__cause__``.
    """
    try:
        current_file = os.path.abspath(__file__)
        with open(current_file, 'r', encoding='utf-8') as f:
            content = f.readlines()

        entry_marker = f"'{chain_name}': {{"
        dict_start = dict_end = chain_start = chain_end = None
        depth = 0  # brace nesting relative to the start of CHAIN_INFO
        for i, line in enumerate(content):
            if dict_start is None:
                if not line.strip().startswith('CHAIN_INFO = {'):
                    continue
                dict_start = i
            elif chain_start is None and depth == 1 and entry_marker in line:
                # Only a top-level key of CHAIN_INFO counts as the entry.
                chain_start = i
            # Count every brace, including those on the entry's first line,
            # so one-line and nested entries are delimited correctly.
            depth += line.count('{') - line.count('}')
            if chain_start is not None and chain_end is None and depth <= 1:
                chain_end = i + 1
            if depth <= 0:
                dict_end = i
                break

        if chain_start is None:
            raise ValueError(f"Chain {chain_name} entry not found in file")
        if chain_end is None:
            raise ValueError(f"Chain {chain_name} entry is not closed in file")
        if dict_end is not None and dict_end < chain_end:
            # The entry's last line also closes CHAIN_INFO; deleting it
            # would leave the literal unterminated.
            raise ValueError(
                f"Chain {chain_name} entry shares a line with the end of CHAIN_INFO"
            )

        # Look for a following entry inside CHAIN_INFO only, not in
        # whatever dictionaries come later in the file.
        scan_end = dict_end if dict_end is not None else len(content)
        has_following_entry = any(
            "': {" in content[i] for i in range(chain_end, scan_end)
        )

        # When the removed entry was the last one, drop the trailing comma
        # of the entry before it. The search never leaves CHAIN_INFO.
        if not has_following_entry:
            for i in range(chain_start - 1, dict_start, -1):
                if content[i].strip() == '},':
                    content[i] = content[i].replace('},', '}')
                    break

        # Remove the chain entry
        del content[chain_start:chain_end]
        with open(current_file, 'w', encoding='utf-8') as f:
            f.writelines(content)
    except Exception as e:
        raise ValueError(f"Failed to update Python file: {str(e)}") from e
| # Test loop | |
def main():
    """Run the interactive console loop for exercising ``/0x`` commands.

    Prints a banner and the command overview, builds a ``Web3Handler`` from
    a hard-coded sample user and then forwards every line starting with
    ``/0x`` to ``handler.handle_0x_command``. The loop ends on ``exit``,
    ``quit`` or ``q``, on Ctrl-C, and on end of input.
    """
    separator = "═" * 80
    title = "Web3 Transaction Test Interface"
    inner_width = 43
    banner = "\n".join((
        "",
        "╔" + "═" * inner_width + "╗",
        "║" + title.center(inner_width) + "║",
        "╚" + "═" * inner_width + "╝",
        "",
    ))

    print(Fore.LIGHTCYAN_EX + "\n" + separator)
    print(Fore.LIGHTGREEN_EX + banner)
    print(Fore.LIGHTCYAN_EX + separator)
    print(Fore.LIGHTYELLOW_EX + "\nAvailable commands:")
    print(" /0x buy <amount> <token> using <token> on <chain>")
    print(" /0x sell <amount> <token> for <token> on <chain>")
    print(" /0x send <chain> <token> <amount> to <recipient>")
    print(" /0x gas <low|medium|high>")
    print(" /0x receive")
    print("\nConfiguration commands:")
    print(" /0x new chain")
    print(" /0x new token")
    print(" /0x forget chain <name>")
    print(" /0x forget token <name>")
    print(Fore.LIGHTCYAN_EX + separator)

    # Initialize Web3Handler with test user data
    test_users = {
        'Ross': {
            'full_name': 'Ross Peili',
            'call_name': 'Ross',
            'public0x': '0x89A7f83Db9C1919B89370182002ffE5dfFc03e21'
        }
    }
    handler = Web3Handler(test_users)

    while True:
        try:
            # Surrounding whitespace is irrelevant for every command.
            command = input(f"\n{Fore.GREEN}Enter command: {Style.RESET_ALL}").strip()
            if not command:
                continue
            if command.lower() in ('exit', 'quit', 'q'):
                print(f"{Fore.LIGHTGREEN_EX}[SYSTEM] Exiting Web3 Transaction Interface...{Style.RESET_ALL}")
                break
            if command.startswith('/0x'):
                handler.handle_0x_command(command, False, False, None)
            else:
                print(Fore.RED + "Invalid command. Use /0x followed by: buy, sell, send, gas, or receive")
        except (KeyboardInterrupt, EOFError):
            # EOFError must end the loop as well: on a closed stdin every
            # further input() call raises again, which would spin forever.
            print(f"\n{Fore.LIGHTGREEN_EX}[SYSTEM] Exiting Web3 Transaction Interface...{Style.RESET_ALL}")
            break
        except Exception as e:
            print(f"{Fore.RED}[ERROR] {str(e)}{Style.RESET_ALL}")
# Start the interactive test loop only when this file is executed directly.
if __name__ == '__main__':
    main()