ucg_v1 / components /functions.py
Junaidb's picture
Update components/functions.py
6b208f4 verified
import base58
from eth_utils import is_checksum_address, is_hex_address
from web3 import Web3
import requests
from solders.keypair import Keypair
import base58
from base58 import b58decode
from eth_account import Account
from eth_account.messages import encode_defunct
from solana.rpc.api import Client
from solders.keypair import Keypair
from solders.pubkey import Pubkey
from solders.system_program import transfer, TransferParams
from solders.transaction import Transaction
from solders.message import Message
def NetworkResolver(state):
address = state["address"]
# 1. Check Ethereum
if is_hex_address(address):
return {
"chain": "ethereum"
}
# 2. Check Solana
try:
if len(base58.b58decode(address)) == 32:
return {
"chain": "solana"
}
except:
pass
return {"chain": "unknown"}
def InfuraRPC(state):
infura="https://mainnet.infura.io/v3/acaf5aa43726452185384862eda2b38a"
apikey="acaf5aa43726452185384862eda2b38a"
w3 = Web3(Web3.HTTPProvider(infura))
wei = w3.eth.get_balance(state["address"])
eth = w3.from_wei(wei, "ether")
return {"eth_balance":eth}
def HeliusAPI(state):
url = f"https://mainnet.helius-rpc.com/?api-key=4e833ada-d32c-48c5-b020-c11b2253f25b"
payload = {
"jsonrpc": "2.0",
"id": 1,
"method": "getBalance",
"params": [state["address"]]
}
res = requests.post(url, json=payload)
data = res.json()
lamports = data.get("result", {}).get("value",0)
sol = lamports / 1e9
return {"sol_balance":sol}
def EthereumSigner(state):
map={
"pkey":"b0e143aa6764ba2a5673b4bdc3891bd92bfceb60ca9aca810d11eca6362a074e"
}
pkey=None
if state["address"]=="0xA34A13e95CE831953e598689e864a97B7DE949eb":
pkey=map["pkey"]
try:
msg = encode_defunct(text=state["payload"])
#private_key="b0e143aa6764ba2a5673b4bdc3891bd92bfceb60ca9aca810d11eca6362a074e"
signed = Account.sign_message(msg, pkey)
return {"signature":signed.signature.hex()}
except Exception as e:
return {"signature":str(e)}
def SolanaSigner(state):
map={
"pkey": "3iGdffgcjsdjfRmQ4Jv5bdCaWZGdfbyT8k7vJiPhrghiCMYiqehGqA7QRmzeVpKJD6GRqXFkvj5nU3ZiD9JJQP97"
}
pkey=None
if state["address"]=="75BgVvMNZ8Es1JffEDNYxWNVE2yoBTndLxViARhYWPF":
pkey=map["pkey"]
try:
secret = base58.b58decode(pkey)
kp = Keypair.from_bytes(secret)
msg = state["payload"].encode()
sig = kp.sign_message(msg)
return {"signature":base58.b58encode(bytes(sig)).decode()}
except Exception as e:
return {"signature":str(e)}
def EthereumPayment(state):
map={
"pkey":"b0e143aa6764ba2a5673b4bdc3891bd92bfceb60ca9aca810d11eca6362a074e"
}
pkey=None
if state["address"]=="0xA34A13e95CE831953e598689e864a97B7DE949eb":
pkey=map["pkey"]
w3 = Web3(Web3.HTTPProvider(
"https://sepolia.infura.io/v3/acaf5aa43726452185384862eda2b38a"
))
sender = Web3.to_checksum_address(state["address"])
recipient = Web3.to_checksum_address(state["to"])
amount_eth = float(state["amount"])
private_key = pkey
nonce = w3.eth.get_transaction_count(sender)
tx = {
"to": recipient,
"value": w3.to_wei(amount_eth, "ether"),
"gas": 21000,
"gasPrice": w3.eth.gas_price,
"nonce": nonce,
"chainId": 11155111 # Sepolia
}
signed = w3.eth.account.sign_transaction(tx, private_key)
tx_hash = w3.eth.send_raw_transaction(signed.raw_transaction)
return {
"tx_hash": tx_hash.hex(),
}
def SolanaPayment(state):
client = Client("https://api.devnet.solana.com")
map={
"pkey": "3iGdffgcjsdjfRmQ4Jv5bdCaWZGdfbyT8k7vJiPhrghiCMYiqehGqA7QRmzeVpKJD6GRqXFkvj5nU3ZiD9JJQP97"
}
pkey=None
if state["address"]=="75BgVvMNZ8Es1JffEDNYxWNVE2yoBTndLxViARhYWPF":
pkey=map["pkey"]
secret = b58decode(pkey)
sender = Keypair.from_bytes(secret)
recipient = Pubkey.from_string(state["to"])
lamports = int(float(state["amount"]) * 1_000_000_000)
bh_resp = client.get_latest_blockhash()
blockhash = bh_resp.value.blockhash
#Build instruction
ix = transfer(
TransferParams(
from_pubkey=sender.pubkey(),
to_pubkey=recipient,
lamports=lamports
)
)
#Build message
msg = Message.new_with_blockhash(
[ix],
sender.pubkey(),
blockhash
)
#Build & sign transaction
tx = Transaction([sender], msg, blockhash)
#Send it
result = client.send_transaction(tx)
return {
"tx_hash": str(result.value)
}