# arbintel/src/blockchain.py
# AJAY KASU
# Add tests, alerts, and blockchain tracker
# 81146df
import os
import logging
from web3 import Web3
from typing import Callable, Optional
import asyncio
logger = logging.getLogger(__name__)


class BlockchainTracker:
    """Track whale movements on Polygon (Polymarket's native chain).

    The tracker polls the USDC token contract for ``Transfer`` events whose
    recipient is the Polymarket CTF Exchange contract and reports every
    transfer at or above a configurable size.
    """

    # USDC has 6 decimals on Polygon, so 1 USDC == 10**6 base units.
    USDC_DECIMALS = 6
    # Event signature whose keccak hash is topic 0 of every ERC20 Transfer log.
    TRANSFER_EVENT_SIGNATURE = "Transfer(address,address,uint256)"

    def __init__(self):
        """Connect to Polygon and prepare the USDC contract handle.

        The RPC endpoint is read from the ``POLYGON_RPC_URL`` environment
        variable and falls back to ``https://polygon-rpc.com``. A failed
        connection is logged but does not raise.
        """
        # Alchemy or Infura RPC URL for Polygon Mainnet
        self.rpc_url = os.environ.get("POLYGON_RPC_URL", "https://polygon-rpc.com")
        self.w3 = Web3(Web3.HTTPProvider(self.rpc_url))

        if self.w3.is_connected():
            # Hosted RPC URLs commonly embed the API key in the path or query
            # string, so only scheme and host are written to the log.
            logger.info(
                "Connected to Polygon Network via %s",
                self._redact_rpc_url(self.rpc_url),
            )
        else:
            logger.error("Failed to connect to Polygon Network")

        # USDC Token Contract on Polygon
        self.usdc_address = self.w3.to_checksum_address(
            "0x2791Bca1f2de4661ED88A30C99A7a9449Aa84174"
        )
        # Polymarket CTF Exchange Contract
        self.poly_exchange = self.w3.to_checksum_address(
            "0x4bFb41d5B3570DeFd03C39a9A4D8dE6Bd8B8982E"
        )

        # Standard ERC20 Transfer ABI (reduced form)
        self.erc20_abi = [
            {
                "anonymous": False,
                "inputs": [
                    {"indexed": True, "name": "from", "type": "address"},
                    {"indexed": True, "name": "to", "type": "address"},
                    {"indexed": False, "name": "value", "type": "uint256"},
                ],
                "name": "Transfer",
                "type": "event",
            }
        ]
        self.usdc_contract = self.w3.eth.contract(
            address=self.usdc_address, abi=self.erc20_abi
        )

    @staticmethod
    def _redact_rpc_url(url: str) -> str:
        """Return only ``scheme://host[:port]`` of *url*, dropping any secrets."""
        from urllib.parse import urlsplit

        try:
            parts = urlsplit(url)
            host = parts.hostname
            port = parts.port
        except ValueError:
            return "<redacted>"
        if not host:
            return "<redacted>"
        if port is not None:
            host = f"{host}:{port}"
        return f"{parts.scheme}://{host}"

    @staticmethod
    def _hex0x(raw) -> str:
        """Return bytes-like *raw* as lowercase hex with exactly one ``0x`` prefix.

        Going through ``bytes`` avoids depending on whether the ``.hex()`` of
        the bytes subclass returned by web3 includes the prefix.
        """
        return "0x" + bytes(raw).hex()

    def _deposit_filter(self) -> dict:
        """Build the log filter for USDC transfers into the Polymarket exchange."""
        # Indexed address topics are the 20-byte address left-padded to 32 bytes.
        recipient_topic = "0x" + self.poly_exchange[2:].lower().rjust(64, "0")
        return {
            'address': self.usdc_address,
            'topics': [
                self._hex0x(self.w3.keccak(text=self.TRANSFER_EVENT_SIGNATURE)),
                None,  # from any
                recipient_topic,  # to polymarket
            ],
        }

    def _fetch_new_logs(self, filter_params: dict, from_block: int):
        """Return ``(head, logs)`` for the range ``from_block..head`` (blocking).

        The chain head is read *before* querying and used as an explicit
        ``toBlock`` so the caller knows exactly which blocks were scanned.
        When no new block exists yet the query is skipped and ``logs`` is empty.
        """
        head = self.w3.eth.block_number
        if head < from_block:
            return head, []
        logs = self.w3.eth.get_logs(
            {**filter_params, 'fromBlock': from_block, 'toBlock': head}
        )
        return head, logs

    async def _notify(self, callback: Optional[Callable], title: str, body: str) -> None:
        """Invoke *callback* (sync or async); log instead of raising on failure."""
        if not callback:
            return
        try:
            outcome = callback(title, body)
            if asyncio.iscoroutine(outcome) or asyncio.isfuture(outcome):
                await outcome
        except Exception as e:
            # A broken alert sink must not stall block tracking or cause the
            # same batch to be re-polled and re-announced.
            logger.exception(f"Whale alert callback failed: {e}")

    async def watch_whale_deposits(
        self,
        threshold_usdc: float = 100000.0,
        callback: Optional[Callable] = None,
        poll_interval: float = 15.0,
    ):
        """Poll for large USDC transfers (whales) flowing into Polymarket.

        Runs until cancelled. Each scanned block range is contiguous with the
        previous one, so no block is skipped between polls.

        Args:
            threshold_usdc: Minimum transfer size, in whole USDC, that triggers
                an alert. Defaults to 100k USDC.
            callback: Optional ``callback(title, message)`` invoked for every
                alert. It may be a plain function or return an awaitable.
            poll_interval: Seconds to sleep between polls. Defaults to 15
                (Polygon block time is ~2s).

        Raises:
            Exception: Whatever the provider raises if the initial block-number
                lookup fails. Errors inside the polling loop are logged and
                retried on the next poll.
        """
        base_unit = 10 ** self.USDC_DECIMALS
        threshold_units = int(threshold_usdc * base_unit)

        # Note: In production, relying on direct get_logs polling can be rate limited.
        # WebSockets (WSS) and async Web3 is preferred.
        filter_params = self._deposit_filter()

        # The web3 HTTP provider is synchronous; run it in the default executor
        # so other tasks on the event loop keep running during RPC round-trips.
        loop = asyncio.get_running_loop()
        next_block = await loop.run_in_executor(
            None, lambda: self.w3.eth.block_number
        )

        while True:
            try:
                head, logs = await loop.run_in_executor(
                    None, self._fetch_new_logs, filter_params, next_block
                )
                for log in logs:
                    # value is not indexed, so it's in the data field
                    value = int.from_bytes(bytes(log['data']), "big")
                    if value < threshold_units:
                        continue
                    usdc_amount = value / base_unit
                    # topic 1 is the 32-byte padded sender; keep the last 20 bytes
                    sender = "0x" + bytes(log['topics'][1]).hex()[-40:]
                    logger.info(
                        f"🐋 WHALE ALERT: {usdc_amount} USDC deposited to Polymarket by {sender}"
                    )
                    await self._notify(
                        callback,
                        "🐋 WHALE ALERT",
                        f"${usdc_amount:,.2f} deposited to Polymarket by `{sender}`",
                    )
                # Advance only past blocks that were actually scanned.
                if head >= next_block:
                    next_block = head + 1
            except Exception as e:
                logger.exception(f"Blockchain poll error: {e}")

            await asyncio.sleep(poll_interval)