Spaces:
Build error
Build error
| import requests | |
| import json | |
| import time | |
| import logging | |
| from datetime import datetime | |
| import pandas as pd | |
| from typing import Dict, List, Optional, Union, Any | |
class ArbiscanClient:
    """
    Client to interact with the Arbiscan API for fetching on-chain data from Arbitrum

    All requests go through ``_make_request``, which appends the API key,
    keeps calls at least ``rate_limit_delay`` seconds apart and applies
    ``timeout`` to the HTTP call.
    """

    # Fallback used whenever a token's decimals cannot be determined
    # (18 is what most ERC-20 tokens use)
    _DEFAULT_DECIMALS = 18

    def __init__(self, api_key: str, timeout: float = 30.0):
        """
        Args:
            api_key: Arbiscan API key sent with every request
            timeout: Seconds to wait for an HTTP response before giving up
        """
        self.api_key = api_key
        self.base_url = "https://api.arbiscan.io/api"
        self.rate_limit_delay = 0.2  # Delay between API calls to avoid rate limiting (200ms)
        # Without a timeout, requests waits forever on a stalled connection
        self.timeout = timeout
        # Caches: fetched transfers / final DataFrames, and token decimals
        self._transaction_cache = {}
        self._decimals_cache = {}
        self._last_api_call_time = 0
        # Configure debug logging - set to True for verbose output, False for minimal output
        self.verbose_debug = False

    def _make_request(self, params: Dict[str, str]) -> Dict[str, Any]:
        """
        Make a request to the Arbiscan API with rate limiting

        Args:
            params: Query parameters (module, action, ...). The dict is not
                modified; the API key is added to a private copy.
        Returns:
            Parsed JSON response. If the request fails with a
            ``RequestException`` other than the ones listed under Raises, a
            synthetic ``{"status": "0", "message": "Error: ...", "result": []}``
            dict is returned instead.
        Raises:
            requests.exceptions.HTTPError: on a non-2xx response
            requests.exceptions.ConnectionError: when the connection fails
            requests.exceptions.Timeout: when the request times out
        """
        # Copy so the caller's dict never ends up carrying the API key
        request_params = dict(params)
        request_params["apikey"] = self.api_key

        # Implement rate limiting
        elapsed = time.time() - self._last_api_call_time
        if elapsed < self.rate_limit_delay:
            time.sleep(self.rate_limit_delay - elapsed)
        self._last_api_call_time = time.time()

        try:
            # Log the request details but only in verbose mode (never the key)
            if self.verbose_debug:
                debug_params = {k: v for k, v in request_params.items() if k != "apikey"}
                logging.debug(f"API Request: {self.base_url}")
                logging.debug(f"Params: {json.dumps(debug_params, indent=2)}")

            response = requests.get(self.base_url, params=request_params, timeout=self.timeout)

            if self.verbose_debug:
                logging.debug(f"Response Status: {response.status_code}")
                logging.debug(f"Full URL: {response.url.replace(self.api_key, 'API_KEY_REDACTED')}")

            response.raise_for_status()
            json_data = response.json()

            # Log the response structure but only in verbose mode
            if self.verbose_debug:
                result_text = str(json_data.get('result', ''))
                result_preview = result_text[:100] + '...' if len(result_text) > 100 else result_text
                logging.debug(f"Response Status: {json_data.get('status')}")
                logging.debug(f"Response Message: {json_data.get('message', 'No message')}")
                logging.debug(f"Result Preview: {result_preview}")

            # Check for API-level errors in the response; an empty result set
            # is reported with status '0' too, so it is not treated as an error
            status = json_data.get('status')
            message = json_data.get('message', 'No message')
            if status == '0' and message != 'No transactions found':
                logging.warning(f"API Error: {message}")
            return json_data
        except requests.exceptions.HTTPError as e:
            logging.error(f"HTTP Error in API Request: {e.response.status_code}")
            raise
        except requests.exceptions.ConnectionError as e:
            logging.error(f"Connection Error in API Request: {str(e)}")
            raise
        except requests.exceptions.Timeout as e:
            logging.error(f"Timeout in API Request: {str(e)}")
            raise
        except requests.exceptions.RequestException as e:
            # Any other request failure is reported as an API-style error dict
            logging.error(f"API Request failed: {str(e)}")
            logging.error(f"ERROR - URL: {self.base_url}")
            logging.error(f"ERROR - Method: {params.get('module')}/{params.get('action')}")
            return {"status": "0", "message": f"Error: {str(e)}", "result": []}

    def _fetch_account_list(self, params: Dict[str, str], description: str) -> List[Dict[str, Any]]:
        """
        Run a list-style account query and return its ``result`` list

        Args:
            params: Query parameters for ``_make_request``
            description: Human-readable name of the data, used in the warning
        Returns:
            The ``result`` list on status "1", otherwise an empty list. A
            warning is logged unless the message says no transactions exist.
        """
        result = self._make_request(params)
        if result.get("status") == "1":
            return result.get("result", [])
        message = result.get("message") or "Unknown error"
        if "No transactions found" not in message:
            logging.warning(f"Error fetching {description}: {message}")
        return []

    def get_eth_balance(self, address: str) -> float:
        """
        Get the ETH balance of an address

        Args:
            address: Wallet address
        Returns:
            ETH balance as a float (0.0 when the API does not report success)
        """
        params = {
            "module": "account",
            "action": "balance",
            "address": address,
            "tag": "latest"
        }
        result = self._make_request(params)
        if result.get("status") != "1":
            return 0.0
        # Convert wei to ETH
        wei_balance = int(result.get("result", "0"))
        return wei_balance / 10**18

    def get_token_balance(self, address: str, token_address: str) -> float:
        """
        Get the token balance of an address for a specific token

        Args:
            address: Wallet address
            token_address: Token contract address
        Returns:
            Token balance as a float (0.0 when the API does not report success)
        """
        params = {
            "module": "account",
            "action": "tokenbalance",
            "address": address,
            "contractaddress": token_address,
            "tag": "latest"
        }
        result = self._make_request(params)
        if result.get("status") != "1":
            return 0.0
        # Get token decimals and convert to proper amount
        decimals = self.get_token_decimals(token_address)
        raw_balance = int(result.get("result", "0"))
        return raw_balance / 10**decimals

    def get_token_decimals(self, token_address: str) -> int:
        """
        Get the number of decimals for a token

        Successfully resolved values are cached per contract address so that
        repeated balance lookups do not spend an extra API call each time.

        Args:
            token_address: Token contract address
        Returns:
            Number of decimals (default: 18 when the lookup fails or the
            response cannot be parsed)
        """
        cache_key = str(token_address).lower()
        cached = self._decimals_cache.get(cache_key)
        if cached is not None:
            return cached

        params = {
            "module": "token",
            "action": "getToken",
            "contractaddress": token_address
        }
        result = self._make_request(params)
        if result.get("status") != "1":
            # Default to 18 decimals (most ERC-20 tokens)
            return self._DEFAULT_DECIMALS

        token_info = result.get("result") or {}
        # NOTE(review): the payload shape is not visible here; accept both a
        # single object and a one-element list - confirm against the API
        if isinstance(token_info, list):
            token_info = token_info[0] if token_info else {}
        if not isinstance(token_info, dict):
            return self._DEFAULT_DECIMALS
        try:
            decimals = int(token_info.get("divisor", "18"))
        except (TypeError, ValueError):
            return self._DEFAULT_DECIMALS

        self._decimals_cache[cache_key] = decimals
        return decimals

    def get_token_transfers(self,
                            address: str,
                            contract_address: Optional[str] = None,
                            start_block: int = 0,
                            end_block: int = 99999999,
                            page: int = 1,
                            offset: int = 100,
                            sort: str = "desc") -> List[Dict[str, Any]]:
        """
        Get token transfers for an address

        Args:
            address: Wallet address
            contract_address: Optional token contract address to filter by
            start_block: Starting block number
            end_block: Ending block number
            page: Page number
            offset: Number of results per page
            sort: Sort order ("asc" or "desc")
        Returns:
            List of token transfers (empty when none are found or on API error)
        """
        params = {
            "module": "account",
            "action": "tokentx",
            "address": address,
            "startblock": str(start_block),
            "endblock": str(end_block),
            "page": str(page),
            "offset": str(offset),
            "sort": sort
        }
        # Add contract address if specified
        if contract_address:
            params["contractaddress"] = contract_address
        return self._fetch_account_list(params, "token transfers")

    def fetch_all_token_transfers(self,
                                  address: str,
                                  contract_address: Optional[str] = None,
                                  start_block: int = 0,
                                  end_block: int = 99999999,
                                  max_pages: int = 10) -> List[Dict[str, Any]]:
        """
        Fetch all token transfers for an address, paginating through results

        Pagination stops at the first empty or short page, after ``max_pages``
        pages, or when a page raises; transfers collected so far are returned.

        Args:
            address: Wallet address
            contract_address: Optional token contract address to filter by
            start_block: Starting block number
            end_block: Ending block number
            max_pages: Maximum number of pages to fetch
        Returns:
            List of all token transfers
        """
        all_transfers = []
        offset = 100  # Results per page (API limit)
        for page in range(1, max_pages + 1):
            try:
                transfers = self.get_token_transfers(
                    address=address,
                    contract_address=contract_address,
                    start_block=start_block,
                    end_block=end_block,
                    page=page,
                    offset=offset
                )
            except Exception as e:
                logging.error(f"Error fetching page {page} of token transfers: {str(e)}")
                break
            # No more transfers, break the loop
            if not transfers:
                break
            all_transfers.extend(transfers)
            # If we got fewer results than the offset, we've reached the end
            if len(transfers) < offset:
                break
        return all_transfers

    def fetch_whale_transactions(self,
                                 addresses: List[str],
                                 token_address: Optional[str] = None,
                                 min_token_amount: Optional[float] = None,
                                 min_usd_value: Optional[float] = None,
                                 start_block: int = 0,
                                 end_block: int = 99999999,
                                 max_pages: int = 10) -> pd.DataFrame:
        """
        Fetch whale transactions for a list of addresses

        Args:
            addresses: List of wallet addresses
            token_address: Optional token contract address to filter by
            min_token_amount: Minimum token amount to be considered a whale transaction
            min_usd_value: Minimum USD value to be considered a whale transaction
                (accepted but not applied - no price data is available here)
            start_block: Starting block number
            end_block: Ending block number
            max_pages: Maximum number of pages to fetch per address (default: 10)
        Returns:
            DataFrame of whale transactions. Empty when nothing was found or
            an unexpected error occurred. When ``min_token_amount`` is given, a
            ``tokenAmount`` column is added; rows whose ``tokenDecimal`` cannot
            be parsed are dropped by the filter.
        """
        try:
            # Create a cache key based on parameters
            cache_key = (
                f"{','.join(addresses)}_{token_address}_{min_token_amount}_"
                f"{min_usd_value}_{start_block}_{end_block}_{max_pages}"
            )
            # Check if we have cached results
            if cache_key in self._transaction_cache:
                logging.info(f"Using cached transactions for {len(addresses)} addresses")
                return self._transaction_cache[cache_key]

            all_transfers = []
            logging.info(f"Fetching whale transactions for {len(addresses)} addresses")
            logging.info(f"Token address filter: {token_address if token_address else 'None'}")
            logging.info(f"Min token amount: {min_token_amount}")
            logging.info(f"Min USD value: {min_usd_value}")

            for i, address in enumerate(addresses):
                try:
                    logging.info(f"Processing address {i+1}/{len(addresses)}: {address}")
                    # Raw transfers are cached per address, independent of the filters
                    addr_cache_key = f"{address}_{token_address}_{start_block}_{end_block}_{max_pages}"
                    if addr_cache_key in self._transaction_cache:
                        transfers = self._transaction_cache[addr_cache_key]
                        logging.info(f"Using cached {len(transfers)} transfers for address {address}")
                    else:
                        transfers = self.fetch_all_token_transfers(
                            address=address,
                            contract_address=token_address,
                            start_block=start_block,
                            end_block=end_block,
                            max_pages=max_pages
                        )
                        logging.info(f"Found {len(transfers)} transfers for address {address}")
                        self._transaction_cache[addr_cache_key] = transfers
                    all_transfers.extend(transfers)
                except Exception as e:
                    logging.error(f"Failed to fetch transactions for address {address}: {str(e)}")
                    continue

            logging.info(f"Total transfers found: {len(all_transfers)}")
            if not all_transfers:
                logging.warning("No whale transactions found for the specified addresses")
                return pd.DataFrame()

            # Convert to DataFrame
            logging.info("Converting transfers to DataFrame")
            df = pd.DataFrame(all_transfers)
            logging.info(f"DataFrame created with {len(df)} rows and {len(df.columns)} columns")
            logging.info(f"Columns: {', '.join(df.columns[:5])}...")

            # Apply token amount filter if specified
            if min_token_amount is not None:
                logging.info(f"Applying min token amount filter: {min_token_amount}")
                # Float exponentiation: an int64 power overflows above 18
                # decimals. Blank/invalid decimals become NaN and fail the
                # comparison below instead of aborting the whole query.
                decimals = pd.to_numeric(df['tokenDecimal'], errors='coerce')
                df['tokenAmount'] = df['value'].astype(float) / (10.0 ** decimals)
                # .copy() so later column assignments do not write into a slice
                df = df[df['tokenAmount'] >= min_token_amount].copy()
                logging.info(f"After token amount filtering: {len(df)}/{len(all_transfers)} rows remain")

            # USD value filtering would require token price data, which is not
            # available here; tell the caller the filter was not applied
            if min_usd_value is not None:
                logging.warning("USD value filtering is not implemented yet")

            # Convert timestamp to datetime
            if 'timeStamp' in df.columns:
                logging.info("Converting timestamp to datetime")
                try:
                    df['timeStamp'] = pd.to_datetime(df['timeStamp'].astype(float), unit='s')
                except Exception as e:
                    logging.error(f"Error converting timestamp: {str(e)}")

            logging.info(f"Final DataFrame has {len(df)} rows")
            # Cache the final result
            self._transaction_cache[cache_key] = df
            return df
        except Exception as e:
            logging.error(f"Error fetching whale transactions: {str(e)}")
            return pd.DataFrame()

    def get_internal_transactions(self,
                                  address: str,
                                  start_block: int = 0,
                                  end_block: int = 99999999,
                                  page: int = 1,
                                  offset: int = 100,
                                  sort: str = "desc") -> List[Dict[str, Any]]:
        """
        Get internal transactions for an address

        Args:
            address: Wallet address
            start_block: Starting block number
            end_block: Ending block number
            page: Page number
            offset: Number of results per page
            sort: Sort order ("asc" or "desc")
        Returns:
            List of internal transactions (empty when none are found or on API error)
        """
        params = {
            "module": "account",
            "action": "txlistinternal",
            "address": address,
            "startblock": str(start_block),
            "endblock": str(end_block),
            "page": str(page),
            "offset": str(offset),
            "sort": sort
        }
        return self._fetch_account_list(params, "internal transactions")
class GeminiClient:
    """
    Client to interact with the Gemini API for fetching token prices

    Only public endpoints are called; ``api_key`` is stored but not sent by
    any method of this class. Results are memoised in ``_price_cache`` and
    failures are counted per symbol so a broken symbol stops being queried.
    """

    _MIN_CALL_INTERVAL = 0.05    # 50ms minimum between calls
    _MAX_ERRORS_PER_SYMBOL = 10  # Beyond this, the symbol is no longer queried
    _MAX_LOGGED_ERRORS = 3       # Only the first few failures per symbol are logged

    def __init__(self, api_key: str, timeout: float = 30.0):
        """
        Args:
            api_key: Gemini API key (kept for callers; unused by the public endpoints)
            timeout: Seconds to wait for an HTTP response before giving up
        """
        self.api_key = api_key
        self.base_url = "https://api.gemini.com/v1"
        # Without a timeout, requests waits forever on a stalled connection
        self.timeout = timeout
        # Add caching to avoid repetitive API calls
        self._price_cache = {}
        # Track API errors to avoid flooding logs
        self._error_count = {}
        self._last_api_call = 0  # For rate limiting

    def _throttle(self) -> None:
        """Sleep just long enough to keep HTTP calls _MIN_CALL_INTERVAL apart."""
        elapsed = time.time() - self._last_api_call
        # The lower bound guards against a long sleep if the clock steps back
        if 0 <= elapsed < self._MIN_CALL_INTERVAL:
            time.sleep(self._MIN_CALL_INTERVAL - elapsed)
        # Record the time after sleeping, i.e. when the call really goes out
        self._last_api_call = time.time()

    def _record_error(self, error_key: str) -> int:
        """Increment the failure counter for ``error_key`` and return the new count."""
        count = self._error_count.get(error_key, 0) + 1
        self._error_count[error_key] = count
        return count

    @staticmethod
    def _price_result(symbol: str,
                      timestamp_value: int,
                      price: Optional[float],
                      status: str,
                      error: Optional[str] = None) -> Dict[str, Any]:
        """Build the result dictionary returned by ``fetch_historical_prices``."""
        result = {
            'symbol': symbol,
            'timestamp': timestamp_value,
            'price': price,
            'status': status
        }
        if error is not None:
            result['error'] = error
        return result

    @staticmethod
    def _to_epoch_seconds(timestamp) -> int:
        """
        Convert int/float/datetime/pandas Timestamp/str to whole epoch seconds

        Falls back to the current time (with a warning) for unsupported types
        or values that fail to convert.
        """
        try:
            if isinstance(timestamp, (int, float)):
                return int(timestamp)
            # pd.Timestamp is a datetime subclass, so this covers both
            if isinstance(timestamp, datetime):
                return int(timestamp.timestamp())
            if isinstance(timestamp, str):
                return int(pd.to_datetime(timestamp).timestamp())
            logging.warning(f"Invalid timestamp type: {type(timestamp)}, using current time")
        except Exception as e:
            logging.warning(f"Error converting timestamp {timestamp}: {str(e)}, using current time")
        return int(time.time())

    def get_current_price(self, symbol: str) -> Optional[float]:
        """
        Get the current price of a token

        Args:
            symbol: Token symbol (e.g., "ETHUSD")
        Returns:
            Current price as a float or None if the request fails or the
            response carries no usable "last" price
        """
        url = f"{self.base_url}/pubticker/{symbol}"
        try:
            response = requests.get(url, timeout=self.timeout)
            response.raise_for_status()
            data = response.json()
            last_price = data.get("last")
            # A missing price is "not found", not a price of zero
            return None if last_price is None else float(last_price)
        except requests.exceptions.RequestException as e:
            logging.error(f"Error fetching price from Gemini API: {e}")
            return None
        except (AttributeError, TypeError, ValueError) as e:
            # Malformed JSON body or a non-numeric "last" value
            logging.error(f"Error fetching price from Gemini API: {e}")
            return None

    def get_historical_prices(self,
                              symbol: str,
                              start_time: datetime,
                              end_time: datetime) -> Optional[pd.DataFrame]:
        """
        Get historical prices for a token within a time range

        Args:
            symbol: Token symbol (e.g., "ETHUSD")
            start_time: Start datetime
            end_time: End datetime
        Returns:
            DataFrame with columns Timestamp, Price (float) and Amount (as
            returned by the API), in API order; None when no trades fall in
            the range, the request fails, or the symbol has failed too often.
            The frame is shared with the cache - copy before modifying it.
        """
        # Create a cache key based on the parameters
        cache_key = f"{symbol}_{int(start_time.timestamp())}_{int(end_time.timestamp())}"
        # Cached answers (including cached "no data") need no throttling
        if cache_key in self._price_cache:
            return self._price_cache[cache_key]

        # If we've already had too many errors for this symbol, don't try again
        error_key = f"error_{symbol}"
        if self._error_count.get(error_key, 0) > self._MAX_ERRORS_PER_SYMBOL:
            return None

        # Convert datetime to milliseconds
        start_ms = int(start_time.timestamp() * 1000)
        end_ms = int(end_time.timestamp() * 1000)
        url = f"{self.base_url}/trades/{symbol}"
        params = {
            "limit_trades": 500,
            "timestamp": start_ms
        }

        self._throttle()
        try:
            response = requests.get(url, params=params, timeout=self.timeout)
            response.raise_for_status()
            trades = response.json()
            # Reset error count on success
            self._error_count[error_key] = 0

            # Filter trades within the time range
            filtered_trades = [
                trade for trade in trades
                if start_ms <= trade.get("timestampms", 0) <= end_ms
            ]
            if not filtered_trades:
                # Cache negative result to avoid future lookups
                self._price_cache[cache_key] = None
                return None

            df = pd.DataFrame(filtered_trades)
            df['timestamp'] = pd.to_datetime(df['timestampms'], unit='ms')
            # Select and rename columns
            result_df = df[['timestamp', 'price', 'amount']].copy()
            result_df.columns = ['Timestamp', 'Price', 'Amount']
            result_df['Price'] = result_df['Price'].astype(float)

            self._price_cache[cache_key] = result_df
            return result_df
        except requests.exceptions.HTTPError as e:
            # Only log the first few occurrences of each error
            if self._record_error(error_key) <= self._MAX_LOGGED_ERRORS:
                logging.warning(f"HTTP error fetching price for {symbol}: {e.response.status_code}")
            return None
        except Exception as e:
            if self._record_error(error_key) <= self._MAX_LOGGED_ERRORS:
                logging.error(f"Error fetching prices for {symbol}: {str(e)}")
            return None

    def get_price_at_time(self,
                          symbol: str,
                          timestamp: datetime) -> Optional[float]:
        """
        Get the approximate price of a token at a specific time

        Args:
            symbol: Token symbol (e.g., "ETHUSD")
            timestamp: Target datetime
        Returns:
            Price of the trade closest to ``timestamp`` within +/- 5 minutes,
            or None if no trades were found
        """
        # Look for prices 5 minutes before and after the target time
        window = pd.Timedelta(minutes=5)
        prices_df = self.get_historical_prices(symbol, timestamp - window, timestamp + window)
        if prices_df is None or prices_df.empty:
            return None
        # Keep the distance in a separate Series: prices_df is the cached
        # object, and adding a column to it would leak into later results
        time_diff = (prices_df['Timestamp'] - timestamp).abs()
        return float(prices_df.loc[time_diff.idxmin(), 'Price'])

    def get_price_impact(self,
                         symbol: str,
                         transaction_time: datetime,
                         lookback_minutes: int = 5,
                         lookahead_minutes: int = 5) -> Dict[str, Any]:
        """
        Analyze the price impact before and after a transaction

        Args:
            symbol: Token symbol (e.g., "ETHUSD")
            transaction_time: Transaction datetime
            lookback_minutes: Minutes to look back before the transaction
            lookahead_minutes: Minutes to look ahead after the transaction
        Returns:
            Dictionary with ``pre_price`` (last trade before the transaction),
            ``post_price`` (first trade at or after it), ``impact_pct`` (None
            when either price is missing or pre_price is zero) and
            ``prices_df`` (the trades in the window); all None without data
        """
        start_time = transaction_time - pd.Timedelta(minutes=lookback_minutes)
        end_time = transaction_time + pd.Timedelta(minutes=lookahead_minutes)
        prices_df = self.get_historical_prices(symbol, start_time, end_time)
        if prices_df is None or prices_df.empty:
            return {
                "pre_price": None,
                "post_price": None,
                "impact_pct": None,
                "prices_df": None
            }

        # "Last before" / "first after" only mean something in chronological
        # order; sort a local copy rather than trusting the API's ordering
        # NOTE(review): the API may return trades newest-first - confirm
        ordered = prices_df.sort_values('Timestamp', kind='mergesort')
        pre_prices = ordered[ordered['Timestamp'] < transaction_time]
        post_prices = ordered[ordered['Timestamp'] >= transaction_time]
        pre_price = pre_prices['Price'].iloc[-1] if not pre_prices.empty else None
        post_price = post_prices['Price'].iloc[0] if not post_prices.empty else None

        # Calculate impact percentage (undefined for a zero reference price)
        impact_pct = None
        if pre_price is not None and post_price is not None and pre_price != 0:
            impact_pct = ((post_price - pre_price) / pre_price) * 100

        return {
            "pre_price": pre_price,
            "post_price": post_price,
            "impact_pct": impact_pct,
            "prices_df": prices_df
        }

    def fetch_historical_prices(self, token_symbol: str, timestamp) -> Dict[str, Any]:
        """Fetch historical price data for a token at a specific timestamp

        The price is the average over the trades returned for ``timestamp``
        (at most 500). Every outcome, including errors, is cached per
        (symbol, second).

        Args:
            token_symbol: Token symbol (e.g., "ETH"); "USD" is appended for the query
            timestamp: Timestamp (can be int, float, datetime, pandas Timestamp or str)
        Returns:
            Dictionary with ``symbol``, ``timestamp`` (epoch seconds), ``price``
            and ``status`` ('success', 'no_data' or 'error'), plus ``error``
            when status is 'error'
        """
        timestamp_value = self._to_epoch_seconds(timestamp)

        # Check cache first - cached answers need no throttling
        cache_key = f"{token_symbol}_{timestamp_value}"
        if cache_key in self._price_cache:
            return self._price_cache[cache_key]

        # Check error count for this symbol
        error_key = f"error_{token_symbol}"
        if self._error_count.get(error_key, 0) > self._MAX_ERRORS_PER_SYMBOL:
            # Too many errors: report failure without calling the API
            return self._price_result(token_symbol, timestamp_value, None,
                                      'error', 'Too many previous errors')

        self._throttle()
        try:
            url = f"{self.base_url}/trades/{token_symbol}USD"
            params = {
                'limit_trades': 500,
                'timestamp': timestamp_value * 1000  # Convert to milliseconds
            }
            response = requests.get(url, params=params, timeout=self.timeout)
            response.raise_for_status()
            data = response.json()
            # Reset error count on success
            self._error_count[error_key] = 0

            if data:
                # Calculate average price from the returned trades
                prices = [float(trade['price']) for trade in data]
                result = self._price_result(token_symbol, timestamp_value,
                                            sum(prices) / len(prices), 'success')
            else:
                result = self._price_result(token_symbol, timestamp_value, None, 'no_data')
        except requests.exceptions.HTTPError as e:
            count = self._record_error(error_key)
            # Only log first few occurrences
            if count <= self._MAX_LOGGED_ERRORS:
                logging.warning(f"HTTP error fetching price for {token_symbol}: {e.response.status_code}")
            elif count == 10:
                logging.warning(f"Suppressing further logs for {token_symbol} errors")
            # Report the normalised epoch seconds, like every other branch
            result = self._price_result(token_symbol, timestamp_value, None,
                                        'error', f"HTTP {e.response.status_code}")
        except Exception as e:
            if self._record_error(error_key) <= self._MAX_LOGGED_ERRORS:
                logging.error(f"Error fetching prices for {token_symbol}: {str(e)}")
            result = self._price_result(token_symbol, timestamp_value, None,
                                        'error', str(e))

        # NOTE(review): error results are cached too, so a transient failure
        # is never retried for the same (symbol, second) - confirm intended
        self._price_cache[cache_key] = result
        return result