Spaces:
Build error
Build error
| import os | |
| import logging | |
| from typing import Dict, List, Optional, Union, Any, Tuple | |
| import pandas as pd | |
| from datetime import datetime, timedelta | |
| import io | |
| import base64 | |
| from crewai import Agent, Task, Crew, Process | |
| from langchain.tools import BaseTool | |
| from langchain.chat_models import ChatOpenAI | |
| from modules.api_client import ArbiscanClient, GeminiClient | |
| from modules.data_processor import DataProcessor | |
| from modules.crew_tools import ( | |
| ArbiscanGetTokenTransfersTool, | |
| ArbiscanGetNormalTransactionsTool, | |
| ArbiscanGetInternalTransactionsTool, | |
| ArbiscanFetchWhaleTransactionsTool, | |
| GeminiGetCurrentPriceTool, | |
| GeminiGetHistoricalPricesTool, | |
| DataProcessorIdentifyPatternsTool, | |
| DataProcessorDetectAnomalousTransactionsTool, | |
| set_global_clients | |
| ) | |
| class WhaleAnalysisCrewSystem: | |
| """ | |
| CrewAI system for analyzing whale wallet activity and detecting market manipulation | |
| """ | |
| def __init__(self, arbiscan_client: ArbiscanClient, gemini_client: GeminiClient, data_processor: DataProcessor): | |
| self.arbiscan_client = arbiscan_client | |
| self.gemini_client = gemini_client | |
| self.data_processor = data_processor | |
| # Initialize LLM | |
| try: | |
| from langchain.chat_models import ChatOpenAI | |
| self.llm = ChatOpenAI( | |
| model="gpt-4", | |
| temperature=0.2, | |
| api_key=os.getenv("OPENAI_API_KEY") | |
| ) | |
| except Exception as e: | |
| logging.warning(f"Could not initialize LLM: {str(e)}") | |
| self.llm = None | |
| # Use a factory method to safely create tool instances | |
| self.setup_tools() | |
| def setup_tools(self): | |
| """Setup LangChain tools for the whale analysis crew""" | |
| try: | |
| # Setup clients | |
| arbiscan_client = ArbiscanClient(api_key=os.getenv("ARBISCAN_API_KEY")) | |
| gemini_client = GeminiClient(api_key=os.getenv("GEMINI_API_KEY")) | |
| data_processor = DataProcessor() | |
| # Set global clients first | |
| set_global_clients( | |
| arbiscan_client=arbiscan_client, | |
| gemini_client=gemini_client, | |
| data_processor=data_processor | |
| ) | |
| # Create tools (no need to pass clients, they'll use globals) | |
| self.arbiscan_tools = [ | |
| self._create_tool(ArbiscanGetTokenTransfersTool), | |
| self._create_tool(ArbiscanGetNormalTransactionsTool), | |
| self._create_tool(ArbiscanGetInternalTransactionsTool), | |
| self._create_tool(ArbiscanFetchWhaleTransactionsTool) | |
| ] | |
| self.gemini_tools = [ | |
| self._create_tool(GeminiGetCurrentPriceTool), | |
| self._create_tool(GeminiGetHistoricalPricesTool) | |
| ] | |
| self.data_processor_tools = [ | |
| self._create_tool(DataProcessorIdentifyPatternsTool), | |
| self._create_tool(DataProcessorDetectAnomalousTransactionsTool) | |
| ] | |
| logging.info(f"Successfully created {len(self.arbiscan_tools + self.gemini_tools + self.data_processor_tools)} tools") | |
| except Exception as e: | |
| logging.error(f"Error setting up tools: {str(e)}") | |
| raise Exception(f"Error setting up tools: {str(e)}") | |
| def _create_tool(self, tool_class, *args, **kwargs): | |
| """Factory method to safely create a tool with proper error handling""" | |
| try: | |
| tool = tool_class(*args, **kwargs) | |
| return tool | |
| except Exception as e: | |
| logging.error(f"Failed to create tool {tool_class.__name__}: {str(e)}") | |
| raise Exception(f"Failed to create tool {tool_class.__name__}: {str(e)}") | |
| def create_agents(self): | |
| """Create the agents for the crew""" | |
| # Data Collection Agent | |
| data_collector = Agent( | |
| role="Blockchain Data Collector", | |
| goal="Collect comprehensive whale transaction data from the blockchain", | |
| backstory="""You are a blockchain analytics expert specialized in extracting and | |
| organizing on-chain data from the Arbitrum network. You have deep knowledge of blockchain | |
| transaction structures and can efficiently query APIs to gather relevant whale activity.""", | |
| verbose=True, | |
| allow_delegation=True, | |
| tools=self.arbiscan_tools, | |
| llm=self.llm | |
| ) | |
| # Price Analysis Agent | |
| price_analyst = Agent( | |
| role="Price Impact Analyst", | |
| goal="Analyze how whale transactions impact token prices", | |
| backstory="""You are a quantitative market analyst with expertise in correlating | |
| trading activity with price movements. You specialize in detecting how large trades | |
| influence market dynamics, and can identify unusual price patterns.""", | |
| verbose=True, | |
| allow_delegation=True, | |
| tools=self.gemini_tools, | |
| llm=self.llm | |
| ) | |
| # Pattern Detection Agent | |
| pattern_detector = Agent( | |
| role="Trading Pattern Detector", | |
| goal="Identify recurring behavior patterns in whale trading activity", | |
| backstory="""You are a data scientist specialized in time-series analysis and behavioral | |
| pattern recognition. You excel at spotting cyclical behaviors, correlation patterns, and | |
| anomalous trading activities across multiple addresses.""", | |
| verbose=True, | |
| allow_delegation=True, | |
| tools=self.data_processor_tools, | |
| llm=self.llm | |
| ) | |
| # Manipulation Detector Agent | |
| manipulation_detector = Agent( | |
| role="Market Manipulation Investigator", | |
| goal="Detect potential market manipulation in whale activity", | |
| backstory="""You are a financial forensics expert who has studied market manipulation | |
| techniques for years. You can identify pump-and-dump schemes, wash trading, spoofing, | |
| and other deceptive practices used by whale traders to manipulate market prices.""", | |
| verbose=True, | |
| allow_delegation=True, | |
| tools=self.data_processor_tools, | |
| llm=self.llm | |
| ) | |
| # Report Generator Agent | |
| report_generator = Agent( | |
| role="Insights Reporter", | |
| goal="Create comprehensive, actionable reports on whale activity", | |
| backstory="""You are a financial data storyteller who excels at transforming complex | |
| blockchain data into clear, insightful narratives. You can distill technical findings | |
| into actionable intelligence for different audiences.""", | |
| verbose=True, | |
| allow_delegation=True, | |
| tools=[], | |
| llm=self.llm | |
| ) | |
| return { | |
| "data_collector": data_collector, | |
| "price_analyst": price_analyst, | |
| "pattern_detector": pattern_detector, | |
| "manipulation_detector": manipulation_detector, | |
| "report_generator": report_generator | |
| } | |
| def track_large_transactions(self, | |
| wallets: List[str], | |
| start_date: datetime, | |
| end_date: datetime, | |
| threshold_value: float, | |
| threshold_type: str, | |
| token_symbol: Optional[str] = None) -> pd.DataFrame: | |
| """ | |
| Track large buy/sell transactions for specified wallets | |
| Args: | |
| wallets: List of wallet addresses to track | |
| start_date: Start date for analysis | |
| end_date: End date for analysis | |
| threshold_value: Minimum value for transaction tracking | |
| threshold_type: Type of threshold ("Token Amount" or "USD Value") | |
| token_symbol: Symbol of token to track (only required if threshold_type is "Token Amount") | |
| Returns: | |
| DataFrame of large transactions | |
| """ | |
| agents = self.create_agents() | |
| # Define tasks | |
| data_collection_task = Task( | |
| description=f""" | |
| Collect all transactions for the following wallets: {', '.join(wallets)} | |
| between {start_date.strftime('%Y-%m-%d')} and {end_date.strftime('%Y-%m-%d')}. | |
| Filter for transactions {'of ' + token_symbol if token_symbol else ''} with a | |
| {'token amount greater than ' + str(threshold_value) if threshold_type == 'Token Amount' | |
| else 'USD value greater than $' + str(threshold_value)}. | |
| Return the data in a well-structured format with timestamp, transaction hash, | |
| sender, recipient, token symbol, and amount. | |
| """, | |
| agent=agents["data_collector"], | |
| expected_output=""" | |
| A comprehensive dataset of all large transactions for the specified wallets, | |
| properly filtered according to the threshold criteria. | |
| """ | |
| ) | |
| # Create and run the crew | |
| crew = Crew( | |
| agents=[agents["data_collector"]], | |
| tasks=[data_collection_task], | |
| verbose=2, | |
| process=Process.sequential | |
| ) | |
| result = crew.kickoff() | |
| # Process the result | |
| import json | |
| try: | |
| # Try to extract JSON from the result | |
| import re | |
| json_match = re.search(r'```json\n([\s\S]*?)\n```', result) | |
| if json_match: | |
| json_str = json_match.group(1) | |
| transactions_data = json.loads(json_str) | |
| if isinstance(transactions_data, list): | |
| return pd.DataFrame(transactions_data) | |
| else: | |
| return pd.DataFrame() | |
| else: | |
| # Try to parse the entire result as JSON | |
| transactions_data = json.loads(result) | |
| if isinstance(transactions_data, list): | |
| return pd.DataFrame(transactions_data) | |
| else: | |
| return pd.DataFrame() | |
| except: | |
| # Fallback to querying the API directly | |
| token_address = None # Would need a mapping of symbol to address | |
| transactions_df = self.arbiscan_client.fetch_whale_transactions( | |
| addresses=wallets, | |
| token_address=token_address, | |
| min_token_amount=threshold_value if threshold_type == "Token Amount" else None, | |
| min_usd_value=threshold_value if threshold_type == "USD Value" else None | |
| ) | |
| return transactions_df | |
| def identify_trading_patterns(self, | |
| wallets: List[str], | |
| start_date: datetime, | |
| end_date: datetime) -> List[Dict[str, Any]]: | |
| """ | |
| Identify trading patterns for specified wallets | |
| Args: | |
| wallets: List of wallet addresses to analyze | |
| start_date: Start date for analysis | |
| end_date: End date for analysis | |
| Returns: | |
| List of identified patterns | |
| """ | |
| agents = self.create_agents() | |
| # Define tasks | |
| data_collection_task = Task( | |
| description=f""" | |
| Collect all transactions for the following wallets: {', '.join(wallets)} | |
| between {start_date.strftime('%Y-%m-%d')} and {end_date.strftime('%Y-%m-%d')}. | |
| Include all token transfers, regardless of size. | |
| """, | |
| agent=agents["data_collector"], | |
| expected_output=""" | |
| A comprehensive dataset of all transactions for the specified wallets. | |
| """ | |
| ) | |
| pattern_analysis_task = Task( | |
| description=""" | |
| Analyze the transaction data to identify recurring trading patterns. | |
| Look for: | |
| 1. Cyclical buying/selling behaviors | |
| 2. Time-of-day patterns | |
| 3. Accumulation/distribution phases | |
| 4. Coordinated movements across multiple addresses | |
| Cluster similar behaviors and describe each pattern identified. | |
| """, | |
| agent=agents["pattern_detector"], | |
| expected_output=""" | |
| A detailed analysis of trading patterns with: | |
| - Pattern name/type | |
| - Description of behavior | |
| - Frequency and confidence level | |
| - Example transactions showing the pattern | |
| """, | |
| context=[data_collection_task] | |
| ) | |
| # Create and run the crew | |
| crew = Crew( | |
| agents=[agents["data_collector"], agents["pattern_detector"]], | |
| tasks=[data_collection_task, pattern_analysis_task], | |
| verbose=2, | |
| process=Process.sequential | |
| ) | |
| result = crew.kickoff() | |
| # Process the result | |
| import json | |
| try: | |
| # Try to extract JSON from the result | |
| import re | |
| json_match = re.search(r'```json\n([\s\S]*?)\n```', result) | |
| if json_match: | |
| json_str = json_match.group(1) | |
| patterns_data = json.loads(json_str) | |
| # Convert the patterns to the expected format | |
| return self._convert_patterns_to_visual_format(patterns_data) | |
| else: | |
| # Fallback to a simple pattern analysis | |
| # First, get transaction data directly | |
| all_transactions = [] | |
| for wallet in wallets: | |
| transfers = self.arbiscan_client.fetch_all_token_transfers( | |
| address=wallet | |
| ) | |
| all_transactions.extend(transfers) | |
| if not all_transactions: | |
| return [] | |
| transactions_df = pd.DataFrame(all_transactions) | |
| # Use data processor to identify patterns | |
| patterns = self.data_processor.identify_patterns(transactions_df) | |
| return patterns | |
| except Exception as e: | |
| print(f"Error processing patterns: {str(e)}") | |
| return [] | |
| def _convert_patterns_to_visual_format(self, patterns_data: List[Dict[str, Any]]) -> List[Dict[str, Any]]: | |
| """ | |
| Convert pattern data from agents to visual format with charts | |
| Args: | |
| patterns_data: Pattern data from agents | |
| Returns: | |
| List of patterns with visualizations | |
| """ | |
| visual_patterns = [] | |
| for pattern in patterns_data: | |
| # Create chart | |
| if 'examples' in pattern and pattern['examples']: | |
| examples_data = [] | |
| # Check if examples is a JSON string | |
| if isinstance(pattern['examples'], str): | |
| try: | |
| examples_data = pd.read_json(pattern['examples']) | |
| except: | |
| examples_data = pd.DataFrame() | |
| else: | |
| examples_data = pd.DataFrame(pattern['examples']) | |
| # Create visualization | |
| if not examples_data.empty: | |
| import plotly.express as px | |
| # Check for timestamp column | |
| if 'Timestamp' in examples_data.columns: | |
| time_col = 'Timestamp' | |
| elif 'timeStamp' in examples_data.columns: | |
| time_col = 'timeStamp' | |
| else: | |
| time_col = None | |
| # Check for amount column | |
| if 'Amount' in examples_data.columns: | |
| amount_col = 'Amount' | |
| elif 'tokenAmount' in examples_data.columns: | |
| amount_col = 'tokenAmount' | |
| elif 'value' in examples_data.columns: | |
| amount_col = 'value' | |
| else: | |
| amount_col = None | |
| if time_col and amount_col: | |
| # Create time series chart | |
| fig = px.line( | |
| examples_data, | |
| x=time_col, | |
| y=amount_col, | |
| title=f"Pattern: {pattern['name']}" | |
| ) | |
| else: | |
| fig = None | |
| else: | |
| fig = None | |
| else: | |
| fig = None | |
| examples_data = pd.DataFrame() | |
| # Create visual pattern object | |
| visual_pattern = { | |
| "name": pattern.get("name", "Unknown Pattern"), | |
| "description": pattern.get("description", ""), | |
| "confidence": pattern.get("confidence", 0.5), | |
| "occurrence_count": pattern.get("occurrence_count", 0), | |
| "chart_data": fig, | |
| "examples": examples_data | |
| } | |
| visual_patterns.append(visual_pattern) | |
| return visual_patterns | |
| def analyze_price_impact(self, | |
| wallets: List[str], | |
| start_date: datetime, | |
| end_date: datetime, | |
| lookback_minutes: int = 5, | |
| lookahead_minutes: int = 5) -> Dict[str, Any]: | |
| """ | |
| Analyze the impact of whale transactions on token prices | |
| Args: | |
| wallets: List of wallet addresses to analyze | |
| start_date: Start date for analysis | |
| end_date: End date for analysis | |
| lookback_minutes: Minutes to look back before transactions | |
| lookahead_minutes: Minutes to look ahead after transactions | |
| Returns: | |
| Dictionary with price impact analysis | |
| """ | |
| agents = self.create_agents() | |
| # Define tasks | |
| data_collection_task = Task( | |
| description=f""" | |
| Collect all transactions for the following wallets: {', '.join(wallets)} | |
| between {start_date.strftime('%Y-%m-%d')} and {end_date.strftime('%Y-%m-%d')}. | |
| Focus on large transactions that might impact price. | |
| """, | |
| agent=agents["data_collector"], | |
| expected_output=""" | |
| A comprehensive dataset of all significant transactions for the specified wallets. | |
| """ | |
| ) | |
| price_impact_task = Task( | |
| description=f""" | |
| Analyze the price impact of the whale transactions. | |
| For each transaction: | |
| 1. Fetch price data for {lookback_minutes} minutes before and {lookahead_minutes} minutes after the transaction | |
| 2. Calculate the percentage price change | |
| 3. Identify transactions that caused significant price moves | |
| Summarize the overall price impact statistics and highlight notable instances. | |
| """, | |
| agent=agents["price_analyst"], | |
| expected_output=""" | |
| A detailed analysis of price impacts with: | |
| - Average price impact percentage | |
| - Maximum price impact (positive and negative) | |
| - Count of significant price moves | |
| - List of transactions with their corresponding price impacts | |
| """, | |
| context=[data_collection_task] | |
| ) | |
| # Create and run the crew | |
| crew = Crew( | |
| agents=[agents["data_collector"], agents["price_analyst"]], | |
| tasks=[data_collection_task, price_impact_task], | |
| verbose=2, | |
| process=Process.sequential | |
| ) | |
| result = crew.kickoff() | |
| # Process the result | |
| import json | |
| try: | |
| # Try to extract JSON from the result | |
| import re | |
| json_match = re.search(r'```json\n([\s\S]*?)\n```', result) | |
| if json_match: | |
| json_str = json_match.group(1) | |
| impact_data = json.loads(json_str) | |
| # Convert the impact data to visual format | |
| return self._convert_impact_to_visual_format(impact_data) | |
| else: | |
| # Fallback to direct calculation | |
| # First, get transaction data | |
| all_transactions = [] | |
| for wallet in wallets: | |
| transfers = self.arbiscan_client.fetch_all_token_transfers( | |
| address=wallet | |
| ) | |
| all_transactions.extend(transfers) | |
| if not all_transactions: | |
| return {} | |
| transactions_df = pd.DataFrame(all_transactions) | |
| # Calculate price impact for each transaction | |
| price_data = {} | |
| for idx, row in transactions_df.iterrows(): | |
| tx_hash = row.get('hash', '') | |
| if not tx_hash: | |
| continue | |
| # Get symbol | |
| symbol = row.get('tokenSymbol', '') | |
| if not symbol: | |
| continue | |
| # Get timestamp | |
| timestamp = row.get('timeStamp', 0) | |
| if not timestamp: | |
| continue | |
| # Convert timestamp to datetime | |
| if isinstance(timestamp, (int, float)): | |
| tx_time = datetime.fromtimestamp(int(timestamp)) | |
| else: | |
| tx_time = timestamp | |
| # Get price impact | |
| symbol_usd = f"{symbol}USD" | |
| impact = self.gemini_client.get_price_impact( | |
| symbol=symbol_usd, | |
| transaction_time=tx_time, | |
| lookback_minutes=lookback_minutes, | |
| lookahead_minutes=lookahead_minutes | |
| ) | |
| price_data[tx_hash] = impact | |
| # Use data processor to analyze price impact | |
| impact_analysis = self.data_processor.analyze_price_impact( | |
| transactions_df=transactions_df, | |
| price_data=price_data | |
| ) | |
| return impact_analysis | |
| except Exception as e: | |
| print(f"Error processing price impact: {str(e)}") | |
| return {} | |
| def _convert_impact_to_visual_format(self, impact_data: Dict[str, Any]) -> Dict[str, Any]: | |
| """ | |
| Convert price impact data to visual format with charts | |
| Args: | |
| impact_data: Price impact data | |
| Returns: | |
| Dictionary with price impact analysis and visualizations | |
| """ | |
| # Convert transactions_with_impact to DataFrame if it's a string | |
| if 'transactions_with_impact' in impact_data and isinstance(impact_data['transactions_with_impact'], str): | |
| try: | |
| transactions_df = pd.read_json(impact_data['transactions_with_impact']) | |
| except: | |
| transactions_df = pd.DataFrame() | |
| elif 'transactions_with_impact' in impact_data and isinstance(impact_data['transactions_with_impact'], list): | |
| transactions_df = pd.DataFrame(impact_data['transactions_with_impact']) | |
| else: | |
| transactions_df = pd.DataFrame() | |
| # Create impact chart | |
| if not transactions_df.empty and 'impact_pct' in transactions_df.columns and 'Timestamp' in transactions_df.columns: | |
| import plotly.graph_objects as go | |
| fig = go.Figure() | |
| fig.add_trace(go.Scatter( | |
| x=transactions_df['Timestamp'], | |
| y=transactions_df['impact_pct'], | |
| mode='markers+lines', | |
| name='Price Impact (%)', | |
| marker=dict( | |
| size=10, | |
| color=transactions_df['impact_pct'], | |
| colorscale='RdBu', | |
| cmin=-max(abs(transactions_df['impact_pct'])) if len(transactions_df) > 0 else -1, | |
| cmax=max(abs(transactions_df['impact_pct'])) if len(transactions_df) > 0 else 1, | |
| colorbar=dict(title='Impact %'), | |
| symbol='circle' | |
| ) | |
| )) | |
| fig.update_layout( | |
| title='Price Impact of Whale Transactions', | |
| xaxis_title='Timestamp', | |
| yaxis_title='Price Impact (%)', | |
| hovermode='closest' | |
| ) | |
| # Add zero line | |
| fig.add_hline(y=0, line_dash="dash", line_color="gray") | |
| else: | |
| fig = None | |
| # Create visual impact analysis | |
| visual_impact = { | |
| 'avg_impact_pct': impact_data.get('avg_impact_pct', 0), | |
| 'max_impact_pct': impact_data.get('max_impact_pct', 0), | |
| 'min_impact_pct': impact_data.get('min_impact_pct', 0), | |
| 'significant_moves_count': impact_data.get('significant_moves_count', 0), | |
| 'total_transactions': impact_data.get('total_transactions', 0), | |
| 'impact_chart': fig, | |
| 'transactions_with_impact': transactions_df | |
| } | |
| return visual_impact | |
| def detect_manipulation(self, | |
| wallets: List[str], | |
| start_date: datetime, | |
| end_date: datetime, | |
| sensitivity: str = "Medium") -> List[Dict[str, Any]]: | |
| """ | |
| Detect potential market manipulation by whale wallets | |
| Args: | |
| wallets: List of wallet addresses to analyze | |
| start_date: Start date for analysis | |
| end_date: End date for analysis | |
| sensitivity: Detection sensitivity ("Low", "Medium", "High") | |
| Returns: | |
| List of manipulation alerts | |
| """ | |
| agents = self.create_agents() | |
| # Define tasks | |
| data_collection_task = Task( | |
| description=f""" | |
| Collect all transactions for the following wallets: {', '.join(wallets)} | |
| between {start_date.strftime('%Y-%m-%d')} and {end_date.strftime('%Y-%m-%d')}. | |
| Include all token transfers and also fetch price data if available. | |
| """, | |
| agent=agents["data_collector"], | |
| expected_output=""" | |
| A comprehensive dataset of all transactions for the specified wallets. | |
| """ | |
| ) | |
| price_impact_task = Task( | |
| description=""" | |
| Analyze the price impact of the whale transactions. | |
| For each significant transaction, fetch and analyze price data around the transaction time. | |
| """, | |
| agent=agents["price_analyst"], | |
| expected_output=""" | |
| Price impact data for the transactions. | |
| """, | |
| context=[data_collection_task] | |
| ) | |
| manipulation_detection_task = Task( | |
| description=f""" | |
| Detect potential market manipulation patterns in the transaction data with sensitivity level: {sensitivity}. | |
| Look for: | |
| 1. Pump-and-Dump: Rapid buys followed by coordinated sell-offs | |
| 2. Wash Trading: Self-trading across multiple addresses | |
| 3. Spoofing: Large orders placed then canceled (if detectable) | |
| 4. Momentum Ignition: Creating sharp price moves to trigger other participants' momentum-based trading | |
| For each potential manipulation, provide: | |
| - Type of manipulation | |
| - Involved addresses | |
| - Risk level (High, Medium, Low) | |
| - Description of the suspicious behavior | |
| - Evidence (transactions showing the pattern) | |
| """, | |
| agent=agents["manipulation_detector"], | |
| expected_output=""" | |
| A detailed list of potential manipulation incidents with supporting evidence. | |
| """, | |
| context=[data_collection_task, price_impact_task] | |
| ) | |
| # Create and run the crew | |
| crew = Crew( | |
| agents=[ | |
| agents["data_collector"], | |
| agents["price_analyst"], | |
| agents["manipulation_detector"] | |
| ], | |
| tasks=[ | |
| data_collection_task, | |
| price_impact_task, | |
| manipulation_detection_task | |
| ], | |
| verbose=2, | |
| process=Process.sequential | |
| ) | |
| result = crew.kickoff() | |
| # Process the result | |
| import json | |
| try: | |
| # Try to extract JSON from the result | |
| import re | |
| json_match = re.search(r'```json\n([\s\S]*?)\n```', result) | |
| if json_match: | |
| json_str = json_match.group(1) | |
| alerts_data = json.loads(json_str) | |
| # Convert the alerts to visual format | |
| return self._convert_alerts_to_visual_format(alerts_data) | |
| else: | |
| # Fallback to direct detection | |
| # First, get transaction data | |
| all_transactions = [] | |
| for wallet in wallets: | |
| transfers = self.arbiscan_client.fetch_all_token_transfers( | |
| address=wallet | |
| ) | |
| all_transactions.extend(transfers) | |
| if not all_transactions: | |
| return [] | |
| transactions_df = pd.DataFrame(all_transactions) | |
| # Calculate price impact for each transaction | |
| price_data = {} | |
| for idx, row in transactions_df.iterrows(): | |
| tx_hash = row.get('hash', '') | |
| if not tx_hash: | |
| continue | |
| # Get symbol | |
| symbol = row.get('tokenSymbol', '') | |
| if not symbol: | |
| continue | |
| # Get timestamp | |
| timestamp = row.get('timeStamp', 0) | |
| if not timestamp: | |
| continue | |
| # Convert timestamp to datetime | |
| if isinstance(timestamp, (int, float)): | |
| tx_time = datetime.fromtimestamp(int(timestamp)) | |
| else: | |
| tx_time = timestamp | |
| # Get price impact | |
| symbol_usd = f"{symbol}USD" | |
| impact = self.gemini_client.get_price_impact( | |
| symbol=symbol_usd, | |
| transaction_time=tx_time, | |
| lookback_minutes=5, | |
| lookahead_minutes=5 | |
| ) | |
| price_data[tx_hash] = impact | |
| # Detect wash trading | |
| wash_trading_alerts = self.data_processor.detect_wash_trading( | |
| transactions_df=transactions_df, | |
| addresses=wallets, | |
| sensitivity=sensitivity | |
| ) | |
| # Detect pump and dump | |
| pump_and_dump_alerts = self.data_processor.detect_pump_and_dump( | |
| transactions_df=transactions_df, | |
| price_data=price_data, | |
| sensitivity=sensitivity | |
| ) | |
| # Combine alerts | |
| all_alerts = wash_trading_alerts + pump_and_dump_alerts | |
| return all_alerts | |
| except Exception as e: | |
| print(f"Error detecting manipulation: {str(e)}") | |
| return [] | |
| def _convert_alerts_to_visual_format(self, alerts_data: List[Dict[str, Any]]) -> List[Dict[str, Any]]: | |
| """ | |
| Convert manipulation alerts data to visual format with charts | |
| Args: | |
| alerts_data: Alerts data from agents | |
| Returns: | |
| List of alerts with visualizations | |
| """ | |
| visual_alerts = [] | |
| for alert in alerts_data: | |
| # Create chart based on alert type | |
| if 'evidence' in alert and alert['evidence']: | |
| evidence_data = [] | |
| # Check if evidence is a JSON string | |
| if isinstance(alert['evidence'], str): | |
| try: | |
| evidence_data = pd.read_json(alert['evidence']) | |
| except: | |
| evidence_data = pd.DataFrame() | |
| else: | |
| evidence_data = pd.DataFrame(alert['evidence']) | |
| # Create visualization based on alert type | |
| if not evidence_data.empty: | |
| import plotly.graph_objects as go | |
| import plotly.express as px | |
| # Check for timestamp column | |
| if 'Timestamp' in evidence_data.columns: | |
| time_col = 'Timestamp' | |
| elif 'timeStamp' in evidence_data.columns: | |
| time_col = 'timeStamp' | |
| elif 'timestamp' in evidence_data.columns: | |
| time_col = 'timestamp' | |
| else: | |
| time_col = None | |
| # Different visualizations based on alert type | |
| if alert.get('type') == 'Wash Trading' and time_col: | |
| # Create scatter plot of wash trading | |
| fig = px.scatter( | |
| evidence_data, | |
| x=time_col, | |
| y=evidence_data.get('Amount', evidence_data.get('tokenAmount', evidence_data.get('value', 0))), | |
| color=evidence_data.get('From', evidence_data.get('from', 'Unknown')), | |
| title=f"Wash Trading Evidence: {alert.get('title', '')}" | |
| ) | |
| elif alert.get('type') == 'Pump and Dump' and time_col and 'pre_price' in evidence_data.columns: | |
| # Create price line for pump and dump | |
| fig = go.Figure() | |
| # Plot price line | |
| fig.add_trace(go.Scatter( | |
| x=evidence_data[time_col], | |
| y=evidence_data['pre_price'], | |
| mode='lines+markers', | |
| name='Price Before Transaction', | |
| line=dict(color='blue') | |
| )) | |
| fig.add_trace(go.Scatter( | |
| x=evidence_data[time_col], | |
| y=evidence_data['post_price'], | |
| mode='lines+markers', | |
| name='Price After Transaction', | |
| line=dict(color='red') | |
| )) | |
| fig.update_layout( | |
| title=f"Pump and Dump Evidence: {alert.get('title', '')}", | |
| xaxis_title='Time', | |
| yaxis_title='Price', | |
| hovermode='closest' | |
| ) | |
| elif alert.get('type') == 'Momentum Ignition' and time_col and 'impact_pct' in evidence_data.columns: | |
| # Create impact scatter for momentum ignition | |
| fig = px.scatter( | |
| evidence_data, | |
| x=time_col, | |
| y='impact_pct', | |
| size=abs(evidence_data['impact_pct']), | |
| color='impact_pct', | |
| color_continuous_scale='RdBu', | |
| title=f"Momentum Ignition Evidence: {alert.get('title', '')}" | |
| ) | |
| else: | |
| # Generic timeline view | |
| if time_col: | |
| fig = px.timeline( | |
| evidence_data, | |
| x_start=time_col, | |
| x_end=time_col, | |
| y=evidence_data.get('From', evidence_data.get('from', 'Unknown')), | |
| color=alert.get('risk_level', 'Medium'), | |
| title=f"Alert Evidence: {alert.get('title', '')}" | |
| ) | |
| else: | |
| fig = None | |
| else: | |
| fig = None | |
| else: | |
| fig = None | |
| evidence_data = pd.DataFrame() | |
| # Create visual alert object | |
| visual_alert = { | |
| "type": alert.get("type", "Unknown"), | |
| "addresses": alert.get("addresses", []), | |
| "risk_level": alert.get("risk_level", "Medium"), | |
| "description": alert.get("description", ""), | |
| "detection_time": alert.get("detection_time", datetime.now().strftime("%Y-%m-%d %H:%M:%S")), | |
| "title": alert.get("title", "Alert"), | |
| "evidence": evidence_data, | |
| "chart": fig | |
| } | |
| visual_alerts.append(visual_alert) | |
| return visual_alerts | |
    def generate_report(self,
                        wallets: List[str],
                        start_date: datetime,
                        end_date: datetime,
                        report_type: str = "Transaction Summary",
                        export_format: str = "PDF") -> Dict[str, Any]:
        """
        Generate a report of whale activity

        Args:
            wallets: List of wallet addresses to include in the report
            start_date: Start date for report period
            end_date: End date for report period
            report_type: Type of report to generate
            export_format: Format for the report (CSV, PDF, PNG)

        Returns:
            Dictionary with report data ("filename" and "content" keys)
        """
        # Imported here to avoid a module-level dependency on the visualizer.
        from modules.visualizer import Visualizer
        visualizer = Visualizer()
        agents = self.create_agents()
        # Task 1: collect all transactions for the reporting window.
        data_collection_task = Task(
            description=f"""
            Collect all transactions for the following wallets: {', '.join(wallets)}
            between {start_date.strftime('%Y-%m-%d')} and {end_date.strftime('%Y-%m-%d')}.
            """,
            agent=agents["data_collector"],
            expected_output="""
            A comprehensive dataset of all transactions for the specified wallets.
            """
        )
        # Task 2: have the report agent narrate the findings.
        report_task = Task(
            description=f"""
            Generate a {report_type} report in {export_format} format.
            The report should include:
            1. Executive summary of wallet activity
            2. Transaction analysis
            3. Pattern identification (if applicable)
            4. Price impact analysis (if applicable)
            5. Manipulation detection (if applicable)
            Organize the information clearly and provide actionable insights.
            """,
            agent=agents["report_generator"],
            expected_output=f"""
            A complete {export_format} report with all relevant analyses.
            """,
            context=[data_collection_task]
        )
        crew = Crew(
            agents=[agents["data_collector"], agents["report_generator"]],
            tasks=[data_collection_task, report_task],
            verbose=2,
            process=Process.sequential
        )
        # NOTE: the crew result is intentionally not used below — the actual
        # report artifact is built directly from API data via the Visualizer.
        result = crew.kickoff()
        # Process the result - for reports, we'll use our visualizer directly
        # First, get transaction data
        all_transactions = []
        for wallet in wallets:
            transfers = self.arbiscan_client.fetch_all_token_transfers(
                address=wallet
            )
            all_transactions.extend(transfers)
        # No data: return an empty placeholder report in the requested format.
        if not all_transactions:
            return {
                "filename": f"no_data_report_{datetime.now().strftime('%Y%m%d_%H%M%S')}.{export_format.lower()}",
                "content": ""
            }
        transactions_df = pd.DataFrame(all_transactions)
        # Generate the report based on format; filename gets a timestamp so
        # repeated runs never collide.
        filename = f"whale_report_{datetime.now().strftime('%Y%m%d_%H%M%S')}"
        if export_format == "CSV":
            # CSV: flat export of the transaction table only.
            content = visualizer.generate_csv_report(
                transactions_df=transactions_df,
                report_type=report_type
            )
            filename += ".csv"
            return {
                "filename": filename,
                "content": content
            }
        elif export_format == "PDF":
            # PDF: the richest format — runs the three other analyses so the
            # report can embed patterns, price impact, and alerts.
            patterns = self.identify_trading_patterns(
                wallets=wallets,
                start_date=start_date,
                end_date=end_date
            )
            price_impact = self.analyze_price_impact(
                wallets=wallets,
                start_date=start_date,
                end_date=end_date
            )
            alerts = self.detect_manipulation(
                wallets=wallets,
                start_date=start_date,
                end_date=end_date
            )
            content = visualizer.generate_pdf_report(
                transactions_df=transactions_df,
                patterns=patterns,
                price_impact=price_impact,
                alerts=alerts,
                title=f"Whale Analysis Report: {report_type}",
                start_date=start_date,
                end_date=end_date
            )
            filename += ".pdf"
            return {
                "filename": filename,
                "content": content
            }
        elif export_format == "PNG":
            # PNG: a single chart chosen by report type.
            if report_type == "Transaction Summary":
                fig = visualizer.create_transaction_timeline(transactions_df)
            elif report_type == "Pattern Analysis":
                fig = visualizer.create_volume_chart(transactions_df)
            elif report_type == "Price Impact":
                # Run price impact analysis first; fall back to the timeline
                # chart when the analysis yields no chart.
                price_impact = self.analyze_price_impact(
                    wallets=wallets,
                    start_date=start_date,
                    end_date=end_date
                )
                fig = price_impact.get('impact_chart', visualizer.create_transaction_timeline(transactions_df))
            else:  # "Manipulation Detection" or "Complete Analysis"
                fig = visualizer.create_network_graph(transactions_df)
            content = visualizer.generate_png_chart(fig)
            filename += ".png"
            return {
                "filename": filename,
                "content": content
            }
        else:
            # Unknown format: return a plain-text note instead of raising.
            return {
                "filename": f"unsupported_format_{datetime.now().strftime('%Y%m%d_%H%M%S')}.txt",
                "content": "Unsupported export format requested."
            }