Spaces:
Build error
Build error
| """ | |
| Properly implemented tools for the WhaleAnalysisCrewSystem | |
| """ | |
| import json | |
| import pandas as pd | |
| from datetime import datetime | |
| from typing import Any, Dict, List, Optional, Type | |
| from pydantic import BaseModel, Field | |
| import logging | |
| from modules.api_client import ArbiscanClient, GeminiClient | |
| from modules.data_processor import DataProcessor | |
| from langchain.tools import BaseTool | |
class GetTokenTransfersInput(BaseModel):
    """Input schema for the arbiscan_get_token_transfers tool.

    Used as ``args_schema`` on the tool; the Field descriptions are surfaced
    to the agent framework when the tool is advertised.
    """

    # Wallet whose ERC-20 transfer history is requested.
    address: str = Field(..., description="Wallet address to query")
    # When provided, restricts results to transfers of this token contract.
    contract_address: Optional[str] = Field(None, description="Optional token contract address to filter by")
# Global clients that will be used by all tools.
# Module-level singletons shared by every tool instance; populated via
# set_global_clients() (or a tool __init__ that forwards a client).
# Each remains None until something has been registered.
_GLOBAL_ARBISCAN_CLIENT = None
_GLOBAL_GEMINI_CLIENT = None
_GLOBAL_DATA_PROCESSOR = None
def set_global_clients(arbiscan_client=None, gemini_client=None, data_processor=None):
    """Register global client instances that will be used by all tools.

    Only the arguments that are actually provided are stored, so a previously
    registered client is never overwritten by an omitted argument.

    Args:
        arbiscan_client: Arbiscan API client used by the Arbiscan tools.
        gemini_client: Gemini API client used by the price tools.
        data_processor: DataProcessor used by the analysis tools.
    """
    global _GLOBAL_ARBISCAN_CLIENT, _GLOBAL_GEMINI_CLIENT, _GLOBAL_DATA_PROCESSOR
    # Compare against None rather than truthiness so that a valid client
    # object that happens to evaluate falsy is still registered.
    if arbiscan_client is not None:
        _GLOBAL_ARBISCAN_CLIENT = arbiscan_client
    if gemini_client is not None:
        _GLOBAL_GEMINI_CLIENT = gemini_client
    if data_processor is not None:
        _GLOBAL_DATA_PROCESSOR = data_processor
class ArbiscanGetTokenTransfersTool(BaseTool):
    """Tool that fetches ERC-20 token transfers via the Arbiscan client."""
    name = "arbiscan_get_token_transfers"
    description = "Get ERC-20 token transfers for a specific address"
    args_schema: Type[BaseModel] = GetTokenTransfersInput

    def __init__(self, arbiscan_client=None):
        super().__init__()
        # A client supplied here is promoted to the shared global instance;
        # otherwise whatever was registered earlier is used at call time.
        if arbiscan_client:
            set_global_clients(arbiscan_client=arbiscan_client)

    def _run(self, address: str, contract_address: Optional[str] = None) -> str:
        """Return the transfers for *address* as a JSON string."""
        global _GLOBAL_ARBISCAN_CLIENT
        if not _GLOBAL_ARBISCAN_CLIENT:
            return json.dumps({"error": "Arbiscan client not initialized. Please set global client first."})
        try:
            return json.dumps(
                _GLOBAL_ARBISCAN_CLIENT.get_token_transfers(
                    address=address,
                    contract_address=contract_address,
                )
            )
        except Exception as e:
            logging.error(f"Error in ArbiscanGetTokenTransfersTool: {str(e)}")
            return json.dumps({"error": str(e)})
class GetNormalTransactionsInput(BaseModel):
    """Input schema for the arbiscan_get_normal_transactions tool.

    Exposes the block-range and pagination arguments that the tool's ``_run``
    already accepts (with identical defaults), so an agent can set them
    explicitly instead of always receiving the first page only.
    """

    address: str = Field(..., description="Wallet address to query")
    # Optional block range and paging; defaults mirror _run's defaults.
    startblock: int = Field(0, description="First block to include")
    endblock: int = Field(99999999, description="Last block to include")
    page: int = Field(1, description="Page number for paginated results")
    offset: int = Field(10, description="Number of results per page")
class ArbiscanGetNormalTransactionsTool(BaseTool):
    """Tool that fetches normal (ETH/ARB) transactions via the Arbiscan client."""
    name = "arbiscan_get_normal_transactions"
    description = "Get normal transactions (ETH/ARB transfers) for a specific address"
    args_schema: Type[BaseModel] = GetNormalTransactionsInput

    def __init__(self, arbiscan_client=None):
        super().__init__()
        # A client supplied here is promoted to the shared global instance.
        if arbiscan_client:
            set_global_clients(arbiscan_client=arbiscan_client)

    def _run(self, address: str, startblock: int = 0, endblock: int = 99999999, page: int = 1, offset: int = 10) -> str:
        """Return one page of normal transactions for *address* as JSON."""
        global _GLOBAL_ARBISCAN_CLIENT
        if not _GLOBAL_ARBISCAN_CLIENT:
            return json.dumps({"error": "Arbiscan client not initialized. Please set global client first."})
        try:
            return json.dumps(
                _GLOBAL_ARBISCAN_CLIENT.get_normal_transactions(
                    address=address,
                    start_block=startblock,
                    end_block=endblock,
                    page=page,
                    offset=offset,
                )
            )
        except Exception as e:
            logging.error(f"Error in ArbiscanGetNormalTransactionsTool: {str(e)}")
            return json.dumps({"error": str(e)})
class GetInternalTransactionsInput(BaseModel):
    """Input schema for the arbiscan_get_internal_transactions tool.

    Exposes the block-range and pagination arguments that the tool's ``_run``
    already accepts (with identical defaults).
    """

    address: str = Field(..., description="Wallet address to query")
    # Optional block range and paging; defaults mirror _run's defaults.
    startblock: int = Field(0, description="First block to include")
    endblock: int = Field(99999999, description="Last block to include")
    page: int = Field(1, description="Page number for paginated results")
    offset: int = Field(10, description="Number of results per page")
class ArbiscanGetInternalTransactionsTool(BaseTool):
    """Tool that fetches internal transactions via the Arbiscan client."""
    name = "arbiscan_get_internal_transactions"
    description = "Get internal transactions for a specific address"
    args_schema: Type[BaseModel] = GetInternalTransactionsInput

    def __init__(self, arbiscan_client=None):
        super().__init__()
        # A client supplied here is promoted to the shared global instance.
        if arbiscan_client:
            set_global_clients(arbiscan_client=arbiscan_client)

    def _run(self, address: str, startblock: int = 0, endblock: int = 99999999, page: int = 1, offset: int = 10) -> str:
        """Return one page of internal transactions for *address* as JSON."""
        global _GLOBAL_ARBISCAN_CLIENT
        if not _GLOBAL_ARBISCAN_CLIENT:
            return json.dumps({"error": "Arbiscan client not initialized. Please set global client first."})
        try:
            return json.dumps(
                _GLOBAL_ARBISCAN_CLIENT.get_internal_transactions(
                    address=address,
                    start_block=startblock,
                    end_block=endblock,
                    page=page,
                    offset=offset,
                )
            )
        except Exception as e:
            logging.error(f"Error in ArbiscanGetInternalTransactionsTool: {str(e)}")
            return json.dumps({"error": str(e)})
class FetchWhaleTransactionsInput(BaseModel):
    """Input schema for the arbiscan_fetch_whale_transactions tool."""

    # Wallets to scan for large ("whale") transfers.
    addresses: List[str] = Field(..., description="List of wallet addresses to query")
    # Optional filters; when None the corresponding filter is not applied.
    token_address: Optional[str] = Field(None, description="Optional token contract address to filter by")
    min_token_amount: Optional[float] = Field(None, description="Minimum token amount")
    min_usd_value: Optional[float] = Field(None, description="Minimum USD value")
class ArbiscanFetchWhaleTransactionsTool(BaseTool):
    """Tool that fetches filtered whale transactions via the Arbiscan client."""
    name = "arbiscan_fetch_whale_transactions"
    description = "Fetch whale transactions for a list of addresses"
    args_schema: Type[BaseModel] = FetchWhaleTransactionsInput

    def __init__(self, arbiscan_client=None):
        super().__init__()
        # A client supplied here is promoted to the shared global instance.
        if arbiscan_client:
            set_global_clients(arbiscan_client=arbiscan_client)

    def _run(self, addresses: List[str], token_address: Optional[str] = None,
             min_token_amount: Optional[float] = None, min_usd_value: Optional[float] = None) -> str:
        """Return the matching transactions as a JSON array of row objects."""
        global _GLOBAL_ARBISCAN_CLIENT
        if not _GLOBAL_ARBISCAN_CLIENT:
            return json.dumps({"error": "Arbiscan client not initialized. Please set global client first."})
        try:
            frame = _GLOBAL_ARBISCAN_CLIENT.fetch_whale_transactions(
                addresses=addresses,
                token_address=token_address,
                min_token_amount=min_token_amount,
                min_usd_value=min_usd_value,
                max_pages=5,  # Limit to 5 pages to prevent excessive API calls
            )
            # The client returns a DataFrame; serialize its rows as JSON records.
            return frame.to_json(orient="records")
        except Exception as e:
            logging.error(f"Error in ArbiscanFetchWhaleTransactionsTool: {str(e)}")
            return json.dumps({"error": str(e)})
class GetCurrentPriceInput(BaseModel):
    """Input schema for the gemini_get_current_price tool."""

    # Trading-pair symbol as understood by the Gemini client, e.g. 'ETHUSD'.
    symbol: str = Field(..., description="Token symbol (e.g., 'ETHUSD')")
class GeminiGetCurrentPriceTool(BaseTool):
    """Tool that looks up the latest token price via the Gemini client."""
    name = "gemini_get_current_price"
    description = "Get the current price of a token"
    args_schema: Type[BaseModel] = GetCurrentPriceInput

    def __init__(self, gemini_client=None):
        super().__init__()
        # A client supplied here is promoted to the shared global instance.
        if gemini_client:
            set_global_clients(gemini_client=gemini_client)

    def _run(self, symbol: str) -> str:
        """Return ``{"symbol": ..., "price": ...}`` for *symbol* as JSON."""
        global _GLOBAL_GEMINI_CLIENT
        if not _GLOBAL_GEMINI_CLIENT:
            return json.dumps({"error": "Gemini client not initialized. Please set global client first."})
        try:
            quote = _GLOBAL_GEMINI_CLIENT.get_current_price(symbol)
            return json.dumps({"symbol": symbol, "price": quote})
        except Exception as e:
            logging.error(f"Error in GeminiGetCurrentPriceTool: {str(e)}")
            return json.dumps({"error": str(e)})
class GetHistoricalPricesInput(BaseModel):
    """Input schema for the gemini_get_historical_prices tool.

    ``start_time``/``end_time`` are optional here to match the tool's
    ``_run`` signature (which defaults both to None), and ``interval`` is
    exposed because ``_run`` already accepts it.
    """

    symbol: str = Field(..., description="Token symbol (e.g., 'ETHUSD')")
    # Optional ISO-8601 bounds; omitted bounds are passed through as None.
    start_time: Optional[str] = Field(None, description="Start datetime in ISO format")
    end_time: Optional[str] = Field(None, description="End datetime in ISO format")
    # Candle interval; default mirrors _run's default.
    interval: str = Field("15m", description="Candle interval (e.g., '15m')")
class GeminiGetHistoricalPricesTool(BaseTool):
    """Tool that fetches historical candle prices via the Gemini client."""
    name = "gemini_get_historical_prices"
    description = "Get historical prices for a token within a time range"
    args_schema: Type[BaseModel] = GetHistoricalPricesInput

    def __init__(self, gemini_client=None):
        super().__init__()
        # A client supplied here is promoted to the shared global instance.
        if gemini_client:
            set_global_clients(gemini_client=gemini_client)

    def _run(
        self,
        symbol: str,
        start_time: Optional[str] = None,
        end_time: Optional[str] = None,
        interval: str = "15m"
    ) -> str:
        """Return historical prices for *symbol* as a JSON string."""
        global _GLOBAL_GEMINI_CLIENT
        if not _GLOBAL_GEMINI_CLIENT:
            return json.dumps({"error": "Gemini client not initialized. Please set global client first."})
        try:
            # ISO-8601 strings become datetimes; absent bounds stay None.
            start_dt = datetime.fromisoformat(start_time) if start_time else None
            end_dt = datetime.fromisoformat(end_time) if end_time else None
            return json.dumps(
                _GLOBAL_GEMINI_CLIENT.get_historical_prices(
                    symbol=symbol,
                    start_time=start_dt,
                    end_time=end_dt,
                    interval=interval,
                )
            )
        except Exception as e:
            logging.error(f"Error in GeminiGetHistoricalPricesTool: {str(e)}")
            return json.dumps({"error": str(e)})
class IdentifyPatternsInput(BaseModel):
    """Input schema for the data_processor_identify_patterns tool."""

    # NOTE(review): declared here as a JSON string, while the tool's _run is
    # annotated to receive a list of dicts — confirm which form the agent
    # framework actually passes through.
    transactions_json: str = Field(..., description="JSON string of transactions")
    # Cluster count forwarded to the DataProcessor's K-Means step.
    n_clusters: int = Field(3, description="Number of clusters for K-Means")
class DataProcessorIdentifyPatternsTool(BaseTool):
    """Tool for identifying trading patterns using the DataProcessor."""
    name = "data_processor_identify_patterns"
    description = "Identify trading patterns in a set of transactions"
    args_schema: Type[BaseModel] = IdentifyPatternsInput

    def __init__(self, data_processor=None):
        super().__init__()
        # A processor supplied here is promoted to the shared global instance.
        if data_processor:
            set_global_clients(data_processor=data_processor)

    def _run(self, transactions_json, n_clusters: int = 3) -> str:
        """Identify trading patterns in the supplied transactions.

        Args:
            transactions_json: Either a JSON string (as the args schema
                declares) or an already-parsed list of transaction dicts —
                the original code mixed the two, so both are accepted.
            n_clusters: Number of clusters for K-Means.

        Returns:
            JSON string of identified patterns, or an ``{"error": ...}``
            payload when the processor is missing, a required column is
            absent, or processing fails.
        """
        global _GLOBAL_DATA_PROCESSOR
        if not _GLOBAL_DATA_PROCESSOR:
            return json.dumps({"error": "Data processor not initialized. Please set global processor first."})
        try:
            # Accept both the schema's JSON-string form and parsed records.
            records = json.loads(transactions_json) if isinstance(transactions_json, str) else transactions_json
            transactions_df = pd.DataFrame(records)
            # Ensure required columns exist before handing off to the processor.
            required_columns = ['timeStamp', 'hash', 'from', 'to', 'value', 'tokenSymbol']
            for col in required_columns:
                if col not in transactions_df.columns:
                    return json.dumps({
                        "error": f"Missing required column: {col}",
                        "available_columns": list(transactions_df.columns)
                    })
            # Run pattern identification
            patterns = _GLOBAL_DATA_PROCESSOR.identify_patterns(
                transactions_df=transactions_df,
                n_clusters=n_clusters
            )
            return json.dumps(patterns)
        except Exception as e:
            logging.error(f"Error in DataProcessorIdentifyPatternsTool: {str(e)}")
            return json.dumps({"error": str(e)})
class DetectAnomalousTransactionsInput(BaseModel):
    """Input schema for the data_processor_detect_anomalies tool."""

    # NOTE(review): declared here as a JSON string, while the tool's _run is
    # annotated to receive a list of dicts — confirm which form the agent
    # framework actually passes through.
    transactions_json: str = Field(..., description="JSON string of transactions")
    # Sensitivity level forwarded verbatim to the DataProcessor.
    sensitivity: str = Field("Medium", description="Detection sensitivity ('Low', 'Medium', 'High')")
class DataProcessorDetectAnomalousTransactionsTool(BaseTool):
    """Tool for detecting anomalous transactions using the DataProcessor."""
    name = "data_processor_detect_anomalies"
    description = "Detect anomalous transactions in a dataset"
    args_schema: Type[BaseModel] = DetectAnomalousTransactionsInput

    def __init__(self, data_processor=None):
        super().__init__()
        # A processor supplied here is promoted to the shared global instance.
        if data_processor:
            set_global_clients(data_processor=data_processor)

    def _run(self, transactions_json, sensitivity: str = "Medium") -> str:
        """Detect anomalous transactions in the supplied dataset.

        Args:
            transactions_json: Either a JSON string (as the args schema
                declares) or an already-parsed list of transaction dicts —
                the original code mixed the two, so both are accepted.
            sensitivity: Detection sensitivity ('Low', 'Medium', 'High').

        Returns:
            JSON string of detected anomalies, or an ``{"error": ...}``
            payload when the processor is missing, a required column is
            absent, or processing fails.
        """
        global _GLOBAL_DATA_PROCESSOR
        if not _GLOBAL_DATA_PROCESSOR:
            return json.dumps({"error": "Data processor not initialized. Please set global processor first."})
        try:
            # Accept both the schema's JSON-string form and parsed records.
            records = json.loads(transactions_json) if isinstance(transactions_json, str) else transactions_json
            transactions_df = pd.DataFrame(records)
            # Ensure required columns exist before handing off to the processor.
            required_columns = ['timeStamp', 'hash', 'from', 'to', 'value', 'tokenSymbol']
            for col in required_columns:
                if col not in transactions_df.columns:
                    return json.dumps({
                        "error": f"Missing required column: {col}",
                        "available_columns": list(transactions_df.columns)
                    })
            # Run anomaly detection
            anomalies = _GLOBAL_DATA_PROCESSOR.detect_anomalous_transactions(
                transactions_df=transactions_df,
                sensitivity=sensitivity
            )
            return json.dumps(anomalies)
        except Exception as e:
            logging.error(f"Error in DataProcessorDetectAnomalousTransactionsTool: {str(e)}")
            return json.dumps({"error": str(e)})