import os import logging from web3 import Web3 from typing import Callable, Optional import asyncio logger = logging.getLogger(__name__) class BlockchainTracker: def __init__(self): """ Track whale movements on Polygon (Polymarket's native chain). """ # Alchemy or Infura RPC URL for Polygon Mainnet self.rpc_url = os.environ.get("POLYGON_RPC_URL", "https://polygon-rpc.com") self.w3 = Web3(Web3.HTTPProvider(self.rpc_url)) if self.w3.is_connected(): logger.info(f"Connected to Polygon Network via {self.rpc_url}") else: logger.error("Failed to connect to Polygon Network") # USDC Token Contract on Polygon self.usdc_address = self.w3.to_checksum_address("0x2791Bca1f2de4661ED88A30C99A7a9449Aa84174") # Polymarket CTF Exchange Contract self.poly_exchange = self.w3.to_checksum_address("0x4bFb41d5B3570DeFd03C39a9A4D8dE6Bd8B8982E") # Standard ERC20 Transfer ABI (reduced form) self.erc20_abi = [ { "anonymous": False, "inputs": [ {"indexed": True, "name": "from", "type": "address"}, {"indexed": True, "name": "to", "type": "address"}, {"indexed": False, "name": "value", "type": "uint256"} ], "name": "Transfer", "type": "event" } ] self.usdc_contract = self.w3.eth.contract(address=self.usdc_address, abi=self.erc20_abi) async def watch_whale_deposits(self, threshold_usdc: float = 100000.0, callback: Optional[Callable] = None): """ Poll for large USDC transfers (whales) flowing into Polymarket. Threshold defaults to 100k USDC (USDC has 6 decimals on Polygon). """ threshold_wei = int(threshold_usdc * (10 ** 6)) # Create a filter for Transfer events to the Polymarket exchange # Note: In production, relying on direct get_logs polling can be rate limited. # WebSockets (WSS) and async Web3 is preferred. filter_params = { 'address': self.usdc_address, 'topics': [ self.w3.keccak(text="Transfer(address,address,uint256)").hex(), None, # from any "0x000000000000000000000000" + self.poly_exchange[2:] # to polymarket ] } latest_block = self.w3.eth.block_number while True: try: # Fetch logs from latest block logs = self.w3.eth.get_logs({**filter_params, 'fromBlock': latest_block, 'toBlock': 'latest'}) for log in logs: # value is not indexed, so it's in the data field value = int(log['data'].hex(), 16) if value >= threshold_wei: usdc_amount = value / (10 ** 6) sender = "0x" + log['topics'][1].hex()[-40:] logger.info(f"🐋 WHALE ALERT: {usdc_amount} USDC deposited to Polymarket by {sender}") if callback: callback(f"🐋 WHALE ALERT", f"${usdc_amount:,.2f} deposited to Polymarket by `{sender}`") latest_block = self.w3.eth.block_number + 1 except Exception as e: logger.error(f"Blockchain poll error: {e}") await asyncio.sleep(15) # Poll every 15 seconds (Polygon block time is ~2s)