Spaces:
Build error
Build error
import json
from datetime import datetime
from io import StringIO
from typing import Dict, List, Optional, Union, Any, Tuple

import pandas as pd
from langchain.tools import tool

from modules.api_client import ArbiscanClient, GeminiClient
from modules.data_processor import DataProcessor
| # Tools for Arbiscan API | |
class ArbiscanTools:
    """Expose ``ArbiscanClient`` queries as methods that return strings.

    Each method forwards its arguments to the client method of the same name
    and serializes whatever comes back.
    """

    def __init__(self, arbiscan_client: "ArbiscanClient"):
        """Keep a reference to the client that performs the actual lookups."""
        self.client = arbiscan_client

    def get_token_transfers(self, address: str, contract_address: Optional[str] = None) -> str:
        """
        Get ERC-20 token transfers for a specific address

        Args:
            address: Wallet address
            contract_address: Optional token contract address to filter by

        Returns:
            List of token transfers as JSON string
        """
        transfers = self.client.get_token_transfers(
            address=address,
            contract_address=contract_address,
        )
        return json.dumps(transfers)

    def get_token_balance(self, address: str, contract_address: str) -> str:
        """
        Get the current balance of a specific token for an address

        Args:
            address: Wallet address
            contract_address: Token contract address

        Returns:
            Token balance as a string. A string result from the client is
            returned unchanged; any other value is JSON-encoded.
        """
        balance = self.client.get_token_balance(
            address=address,
            contract_address=contract_address,
        )
        # The client's return type is not visible from this module. Text is
        # passed through untouched; anything else is JSON-encoded so the
        # declared ``-> str`` contract holds, as it does for the sibling methods.
        if isinstance(balance, str):
            return balance
        return json.dumps(balance)

    def get_normal_transactions(self, address: str) -> str:
        """
        Get normal transactions (ETH/ARB transfers) for a specific address

        Args:
            address: Wallet address

        Returns:
            List of normal transactions as JSON string
        """
        transactions = self.client.get_normal_transactions(address=address)
        return json.dumps(transactions)

    def get_internal_transactions(self, address: str) -> str:
        """
        Get internal transactions for a specific address

        Args:
            address: Wallet address

        Returns:
            List of internal transactions as JSON string
        """
        transactions = self.client.get_internal_transactions(address=address)
        return json.dumps(transactions)

    def fetch_whale_transactions(self,
                                 addresses: List[str],
                                 token_address: Optional[str] = None,
                                 min_token_amount: Optional[float] = None,
                                 min_usd_value: Optional[float] = None) -> str:
        """
        Fetch whale transactions for a list of addresses

        Args:
            addresses: List of wallet addresses
            token_address: Optional token contract address to filter by
            min_token_amount: Minimum token amount
            min_usd_value: Minimum USD value

        Returns:
            DataFrame of whale transactions as JSON string (one object per row)
        """
        transactions_df = self.client.fetch_whale_transactions(
            addresses=addresses,
            token_address=token_address,
            min_token_amount=min_token_amount,
            min_usd_value=min_usd_value,
        )
        # NOTE(review): assumes the client always returns a DataFrame here —
        # confirm it cannot return None.
        return transactions_df.to_json(orient="records")
| # Tools for Gemini API | |
class GeminiTools:
    """Expose ``GeminiClient`` price queries as methods that return strings.

    Timestamps arrive as ISO-8601 text and are converted to ``datetime``
    objects before being handed to the client.
    """

    # Keys copied from the client's price-impact result into the JSON output.
    _IMPACT_KEYS = ("pre_price", "post_price", "impact_pct")

    def __init__(self, gemini_client: "GeminiClient"):
        """Keep a reference to the client that performs the actual lookups."""
        self.client = gemini_client

    @staticmethod
    def _parse_iso(timestamp: str) -> datetime:
        """Parse an ISO-8601 string, accepting a trailing ``Z`` as UTC."""
        # fromisoformat() only understands the "Z" suffix from Python 3.11 on,
        # so it is rewritten as an explicit offset first.
        return datetime.fromisoformat(timestamp.replace('Z', '+00:00'))

    def get_current_price(self, symbol: str) -> str:
        """
        Get the current price of a token

        Args:
            symbol: Token symbol (e.g., "ETHUSD")

        Returns:
            Current price as a string, or "Price not found" when the client
            returns None
        """
        price = self.client.get_current_price(symbol=symbol)
        return str(price) if price is not None else "Price not found"

    def get_historical_prices(self,
                              symbol: str,
                              start_time: str,
                              end_time: str) -> str:
        """
        Get historical prices for a token within a time range

        Args:
            symbol: Token symbol (e.g., "ETHUSD")
            start_time: Start datetime in ISO format
            end_time: End datetime in ISO format

        Returns:
            DataFrame of historical prices as JSON string, or "[]" when the
            client returns None

        Raises:
            ValueError: If either timestamp is not valid ISO-8601
        """
        prices_df = self.client.get_historical_prices(
            symbol=symbol,
            start_time=self._parse_iso(start_time),
            end_time=self._parse_iso(end_time),
        )
        if prices_df is None:
            return "[]"
        return prices_df.to_json(orient="records")

    def get_price_impact(self,
                         symbol: str,
                         transaction_time: str,
                         lookback_minutes: int = 5,
                         lookahead_minutes: int = 5) -> str:
        """
        Analyze the price impact before and after a transaction

        Args:
            symbol: Token symbol (e.g., "ETHUSD")
            transaction_time: Transaction datetime in ISO format
            lookback_minutes: Minutes to look back before the transaction
            lookahead_minutes: Minutes to look ahead after the transaction

        Returns:
            JSON object with "pre_price", "post_price" and "impact_pct". All
            three are null when the client returns None.

        Raises:
            ValueError: If transaction_time is not valid ISO-8601
            KeyError: If the client's result lacks one of the three keys
        """
        impact_data = self.client.get_price_impact(
            symbol=symbol,
            transaction_time=self._parse_iso(transaction_time),
            lookback_minutes=lookback_minutes,
            lookahead_minutes=lookahead_minutes,
        )
        if impact_data is None:
            # The sibling methods tolerate a None result from the client; do
            # the same here and keep the output schema stable.
            result = dict.fromkeys(self._IMPACT_KEYS)
        else:
            # Only these keys are copied; anything else in the result is dropped.
            result = {key: impact_data[key] for key in self._IMPACT_KEYS}
        return json.dumps(result)
| # Tools for Data Processor | |
class DataProcessorTools:
    """Expose ``DataProcessor`` analyses as JSON-in / JSON-out methods.

    Transactions are received as a JSON string, converted to a DataFrame,
    passed to the processor, and the result is serialized back to JSON.
    Tabular values nested in a result are embedded as arrays of row objects.
    """

    def __init__(self, data_processor: "DataProcessor"):
        """Keep a reference to the processor that performs the analyses."""
        self.processor = data_processor

    @staticmethod
    def _load_frame(payload: str) -> pd.DataFrame:
        """Parse a JSON document held in memory into a DataFrame."""
        # read_json treats a bare str as a possible path/URL and, from pandas
        # 2.1 on, warns when it is literal JSON. A StringIO buffer is always
        # parsed as content.
        return pd.read_json(StringIO(payload))

    @staticmethod
    def _records(value: Any) -> list:
        """Return a DataFrame as a list of row dicts; anything else as []."""
        if isinstance(value, pd.DataFrame):
            # Round-trip through to_json so pandas handles its own scalar
            # types, then decode so the rows nest as a real JSON array
            # instead of a string containing JSON.
            return json.loads(value.to_json(orient="records"))
        return []

    def aggregate_transactions(self,
                               transactions_json: str,
                               time_window: str = 'D') -> str:
        """
        Aggregate transactions by time window

        Args:
            transactions_json: JSON string of transactions
            time_window: Time window for aggregation (e.g., 'D' for day, 'H' for hour)

        Returns:
            Aggregated DataFrame as JSON string
        """
        agg_df = self.processor.aggregate_transactions(
            transactions_df=self._load_frame(transactions_json),
            time_window=time_window,
        )
        return agg_df.to_json(orient="records")

    def identify_patterns(self,
                          transactions_json: str,
                          n_clusters: int = 3) -> str:
        """
        Identify trading patterns using clustering

        Args:
            transactions_json: JSON string of transactions
            n_clusters: Number of clusters passed to the processor

        Returns:
            List of pattern dictionaries as JSON string. "examples" is an
            array of row objects (empty when the processor supplies no
            DataFrame).
        """
        patterns = self.processor.identify_patterns(
            transactions_df=self._load_frame(transactions_json),
            n_clusters=n_clusters,
        )
        # Only the fields listed here are emitted; chart data is skipped
        # because it is not JSON serializable.
        result = [
            {
                "name": pattern["name"],
                "description": pattern["description"],
                "cluster_id": pattern["cluster_id"],
                "occurrence_count": pattern["occurrence_count"],
                "confidence": pattern["confidence"],
                "examples": self._records(pattern["examples"]),
            }
            for pattern in patterns
        ]
        return json.dumps(result)

    def detect_anomalous_transactions(self,
                                      transactions_json: str,
                                      sensitivity: str = "Medium") -> str:
        """
        Detect anomalous transactions using statistical methods

        Args:
            transactions_json: JSON string of transactions
            sensitivity: Detection sensitivity ("Low", "Medium", "High")

        Returns:
            DataFrame of anomalous transactions as JSON string
        """
        anomalies_df = self.processor.detect_anomalous_transactions(
            transactions_df=self._load_frame(transactions_json),
            sensitivity=sensitivity,
        )
        return anomalies_df.to_json(orient="records")

    def analyze_price_impact(self,
                             transactions_json: str,
                             price_data_json: str) -> str:
        """
        Analyze the price impact of transactions

        Args:
            transactions_json: JSON string of transactions
            price_data_json: JSON string of price impact data

        Returns:
            Price impact analysis as JSON string. Statistics missing from the
            processor's result are null; "transactions_with_impact" is an
            array of row objects (empty when absent or not a DataFrame).
        """
        impact_analysis = self.processor.analyze_price_impact(
            transactions_df=self._load_frame(transactions_json),
            price_data=json.loads(price_data_json),
        )
        # The impact chart is skipped because it is not JSON serializable.
        result = {
            "avg_impact_pct": impact_analysis.get("avg_impact_pct"),
            "max_impact_pct": impact_analysis.get("max_impact_pct"),
            "min_impact_pct": impact_analysis.get("min_impact_pct"),
            "significant_moves_count": impact_analysis.get("significant_moves_count"),
            "total_transactions": impact_analysis.get("total_transactions"),
            # .get() plus the isinstance check in _records covers both a
            # missing key and a key that is present but holds None.
            "transactions_with_impact": self._records(
                impact_analysis.get("transactions_with_impact")
            ),
        }
        return json.dumps(result)

    def detect_wash_trading(self,
                            transactions_json: str,
                            addresses_json: str,
                            sensitivity: str = "Medium") -> str:
        """
        Detect potential wash trading between addresses

        Args:
            transactions_json: JSON string of transactions
            addresses_json: JSON string of addresses to analyze
            sensitivity: Detection sensitivity ("Low", "Medium", "High")

        Returns:
            List of potential wash trading incidents as JSON string.
            "evidence" is an array of row objects (empty when the processor
            supplies no DataFrame).
        """
        wash_trades = self.processor.detect_wash_trading(
            transactions_df=self._load_frame(transactions_json),
            addresses=json.loads(addresses_json),
            sensitivity=sensitivity,
        )
        # The chart attached to each incident is skipped because it is not
        # JSON serializable.
        # NOTE(review): "detection_time" is emitted as-is; confirm the
        # processor provides it as a JSON-serializable value.
        result = [
            {
                "type": trade["type"],
                "addresses": trade["addresses"],
                "risk_level": trade["risk_level"],
                "description": trade["description"],
                "detection_time": trade["detection_time"],
                "title": trade["title"],
                "evidence": self._records(trade["evidence"]),
            }
            for trade in wash_trades
        ]
        return json.dumps(result)