Spaces:
Build error
Build error
| import pandas as pd | |
| import numpy as np | |
| from datetime import datetime, timedelta | |
| from typing import Dict, List, Optional, Union, Any, Tuple | |
| from sklearn.cluster import KMeans, DBSCAN | |
| from sklearn.preprocessing import StandardScaler | |
| import plotly.graph_objects as go | |
| import plotly.express as px | |
| import logging | |
| import time | |
| class DataProcessor: | |
| """ | |
| Process and analyze transaction data from blockchain APIs | |
| """ | |
def __init__(self) -> None:
    """
    Create a processor.

    The initializer performs no work and sets no per-instance attributes.
    """
    return None
def aggregate_transactions(self,
                           transactions_df: pd.DataFrame,
                           time_window: str = 'D') -> pd.DataFrame:
    """
    Aggregate transactions by time window

    When sender/receiver columns ('From'/'To' or 'from'/'to') are present,
    the result holds one row per wallet and time bin with the wallet's net
    flow (received minus sent) and the number of its transactions in that
    bin. Every wallet gets a row for every bin in the overall time range;
    bins without activity carry zeros. Without address columns the result
    holds total volume and transaction count per bin.

    The input frame is never modified.

    Args:
        transactions_df: DataFrame of transactions
        time_window: Time window for aggregation (e.g., 'D' for day, 'H' for hour)

    Returns:
        Aggregated DataFrame. Columns are [<time>, 'NetFlow', 'Count', 'Wallet']
        when addresses are available, otherwise [<time>, 'Volume', 'Count'],
        where <time> is named after the timestamp column ('safe_timestamp'
        if the timestamps could not be parsed).

    Raises:
        ValueError: If no timestamp column or no amount column is found.
    """
    if transactions_df.empty:
        return pd.DataFrame()

    columns = transactions_df.columns

    if 'Timestamp' in columns:
        timestamp_col = 'Timestamp'
    elif 'timeStamp' in columns:
        timestamp_col = 'timeStamp'
    else:
        raise ValueError("Timestamp column not found in transactions DataFrame")

    if 'Amount' in columns:
        amount_col = 'Amount'
    elif 'tokenAmount' in columns:
        amount_col = 'tokenAmount'
    elif 'value' in columns:
        amount_col = 'value'
    else:
        raise ValueError("Amount column not found in transactions DataFrame")

    # Amounts may arrive as strings; coerce so sums are arithmetic, not
    # string concatenation. Unparseable entries become NaN and are skipped
    # by the sums below.
    amounts = pd.to_numeric(transactions_df[amount_col], errors='coerce').astype(float)
    if amount_col == 'value' and 'tokenDecimal' in columns:
        # Raw token values are scaled down by the token's decimals.
        decimals = pd.to_numeric(transactions_df['tokenDecimal'], errors='coerce')
        amounts = amounts / (10.0 ** decimals)
    amounts = amounts.to_numpy()

    # Build the time axis separately instead of touching the caller's frame.
    stamps = transactions_df[timestamp_col]
    try:
        if not pd.api.types.is_datetime64_any_dtype(stamps):
            epoch_seconds = pd.to_numeric(stamps, errors='coerce')
            looks_like_epoch = (pd.api.types.is_numeric_dtype(stamps)
                                or bool(epoch_seconds.notna().all()))
            if looks_like_epoch:
                # Numeric timestamps are treated as Unix seconds, matching
                # the unit='s' conversion used elsewhere in this file.
                stamps = pd.to_datetime(epoch_seconds, unit='s')
            else:
                stamps = pd.to_datetime(stamps)
        time_index = pd.DatetimeIndex(stamps, name=timestamp_col)
    except Exception as exc:
        logging.warning("Error setting DatetimeIndex: %s", exc)
        # Best-effort fallback: a synthetic hourly axis keeps the
        # aggregation running, but the resulting bins are not real times.
        time_index = pd.date_range(start='2025-01-01',
                                   periods=len(transactions_df),
                                   freq='h',
                                   name='safe_timestamp')

    if 'From' in columns and 'To' in columns:
        from_col, to_col = 'From', 'To'
    elif 'from' in columns and 'to' in columns:
        from_col, to_col = 'from', 'to'
    else:
        # Direction is unknown, so only total volume and count are reported.
        totals = pd.DataFrame(
            {
                'Volume': amounts,
                'Count': transactions_df[timestamp_col].notna().to_numpy().astype(int),
            },
            index=time_index,
        )
        return totals.resample(time_window).sum().reset_index()

    senders = transactions_df[from_col]
    receivers = transactions_df[to_col]
    # A self-transfer appears in both legs below; count it only once.
    counts_for_sender = (senders != receivers).to_numpy().astype(int)

    # One "leg" per side of each transaction: the receiver gains the amount,
    # the sender loses it. Grouping the legs by wallet and time bin yields
    # every wallet's net flow in a single pass over the data.
    legs = pd.concat([
        pd.DataFrame({'Wallet': receivers.to_numpy(),
                      'NetFlow': amounts,
                      'Count': 1}, index=time_index),
        pd.DataFrame({'Wallet': senders.to_numpy(),
                      'NetFlow': -amounts,
                      'Count': counts_for_sender}, index=time_index),
    ])
    per_wallet = legs.groupby(['Wallet', pd.Grouper(freq=time_window)]).sum()
    if per_wallet.empty:
        return pd.DataFrame()

    # Give every wallet a row for every bin in the overall range so the
    # output is dense, like a plain resample of the full frame would be.
    bins = pd.Series(0, index=time_index).resample(time_window).sum().index
    wallets = per_wallet.index.get_level_values(0).unique()
    full_index = pd.MultiIndex.from_product(
        [wallets, bins], names=['Wallet', time_index.name])
    per_wallet = per_wallet.reindex(full_index, fill_value=0)

    combined_df = per_wallet.reset_index()
    return combined_df[[time_index.name, 'NetFlow', 'Count', 'Wallet']]
# Cache of previously computed pattern lists, keyed by a fingerprint of the
# input frame plus the requested cluster count. Defined at class level, so it
# is shared by every instance.
_pattern_cache = {}


def identify_patterns(self,
                      transactions_df: pd.DataFrame,
                      n_clusters: int = 3) -> List[Dict[str, Any]]:
    """
    Identify trading patterns using clustering algorithms

    Transactions are bucketed by hour; each hour is described by its
    transaction count and a net-flow figure, the hours are clustered with
    K-Means, and every non-empty cluster is reported as one pattern with
    summary metrics, charts and example transactions.

    The input frame is never modified. Results are cached per input content
    and cluster count. Any error during processing is logged and results in
    an empty list.

    Args:
        transactions_df: DataFrame of transactions
        n_clusters: Number of clusters to identify

    Returns:
        List of pattern dictionaries containing name, description, and
        confidence, plus metrics, charts and example transactions. Empty
        when there is no data, fewer hourly buckets than clusters, or
        processing failed.
    """
    if transactions_df.empty:
        return []

    max_cache_entries = 32  # keeps cached figures/frames from growing without bound
    cache = self._pattern_cache

    # Fingerprint the actual cell contents: column names and row count alone
    # would make two different frames of the same shape share one entry.
    try:
        content_hash = int(pd.util.hash_pandas_object(transactions_df, index=True).sum())
        cache_key = (f"{hash(tuple(transactions_df.columns))}_{len(transactions_df)}"
                     f"_{content_hash}_{n_clusters}")
    except Exception:
        # Unhashable cell values: proceed without caching.
        cache_key = None
    if cache_key is not None and cache_key in cache:
        return cache[cache_key]

    try:
        # Helper columns are added below; work on a copy so they never leak
        # into the caller's frame.
        df = transactions_df.copy()
        one_hour = pd.Timedelta(hours=1)

        # --- Timestamps -------------------------------------------------
        timestamp_col = next(
            (col for col in ('Timestamp', 'timeStamp') if col in df.columns), None)
        if timestamp_col is not None and not pd.api.types.is_datetime64_any_dtype(df[timestamp_col]):
            try:
                raw_stamps = df[timestamp_col]
                epoch_seconds = pd.to_numeric(raw_stamps, errors='coerce')
                looks_like_epoch = (pd.api.types.is_numeric_dtype(raw_stamps)
                                    or bool(epoch_seconds.notna().all()))
                if looks_like_epoch:
                    # Numeric (or numeric-string) timestamps are Unix seconds.
                    df[timestamp_col] = pd.to_datetime(epoch_seconds, unit='s', errors='coerce')
                else:
                    df[timestamp_col] = pd.to_datetime(raw_stamps, errors='coerce')
            except Exception:
                timestamp_col = None
        if timestamp_col is None:
            # No usable timestamps: fall back to a synthetic hourly axis.
            df['dummy_timestamp'] = pd.date_range(start='2025-01-01', periods=len(df), freq='h')
            timestamp_col = 'dummy_timestamp'
        df['hour'] = df[timestamp_col].dt.floor('h')

        # --- Addresses --------------------------------------------------
        if 'From' in df.columns and 'To' in df.columns:
            from_col, to_col = 'From', 'To'
        elif 'from' in df.columns and 'to' in df.columns:
            from_col, to_col = 'from', 'to'
        else:
            # Placeholder addresses so the flow calculation can still run.
            df['from'] = [f'0x{i:040x}' for i in range(len(df))]
            df['to'] = [f'0x{(i+1):040x}' for i in range(len(df))]
            from_col, to_col = 'from', 'to'

        # --- Amounts ----------------------------------------------------
        if 'Amount' in df.columns:
            raw_amounts = df['Amount']
        elif 'tokenAmount' in df.columns:
            raw_amounts = df['tokenAmount']
        elif 'value' in df.columns:
            raw_amounts = pd.to_numeric(df['value'], errors='coerce')
            if 'tokenDecimal' in df.columns:
                # Scale raw token values by their decimals (18 when unknown).
                decimals = pd.to_numeric(df['tokenDecimal'], errors='coerce').fillna(18)
                raw_amounts = raw_amounts.astype(float) / (10.0 ** decimals)
        elif 'adjustedValue' in df.columns:
            raw_amounts = df['adjustedValue']
        else:
            # No amount information: weigh every transaction equally.
            raw_amounts = pd.Series(1.0, index=df.index)
        amount_key = '_pattern_amount'
        df[amount_key] = pd.to_numeric(raw_amounts, errors='coerce').fillna(0).astype(float)

        # --- Hourly metrics ---------------------------------------------
        hourly_rows = []
        for hour, group in df.groupby('hour'):
            # Net flow is measured against the counterparties of the first
            # transaction in the hour: amounts sent to that receiver minus
            # amounts sent by that sender.
            first_to = group[to_col].iloc[0]
            first_from = group[from_col].iloc[0]
            if first_to is None or first_from is None:
                net_flow = 0.0
            else:
                received = group.loc[group[to_col] == first_to, amount_key].sum()
                sent = group.loc[group[from_col] == first_from, amount_key].sum()
                net_flow = float(received) - float(sent)
            hourly_rows.append({
                'hour': hour,
                'Count': int(group[from_col].count()),
                'NetFlow': net_flow,
            })
        agg_df = pd.DataFrame(hourly_rows, columns=['hour', 'Count', 'NetFlow'])

        # Not enough hourly buckets to form the requested clusters.
        if agg_df.empty or len(agg_df) < n_clusters:
            return []

        # --- Clustering -------------------------------------------------
        # Imported here so the cheap early exits above work without sklearn.
        from sklearn.preprocessing import StandardScaler
        from sklearn.cluster import KMeans

        # Avoid more clusters than the data can reasonably support.
        actual_n_clusters = min(n_clusters, max(2, len(agg_df) // 2))
        features = agg_df[['NetFlow', 'Count']].astype(float)
        scaled_features = StandardScaler().fit_transform(features)
        kmeans = KMeans(n_clusters=actual_n_clusters, random_state=42, n_init=10, max_iter=100)
        agg_df['Cluster'] = kmeans.fit_predict(scaled_features)
        agg_df['Hour'] = agg_df['hour'].dt.hour
        agg_df['Day'] = agg_df['hour'].dt.dayofweek

        total_variance = agg_df['NetFlow'].var()
        if pd.isna(total_variance) or total_variance == 0:
            total_variance = 1.0  # avoid dividing by zero / NaN

        palette = ['#1f77b4', '#ff7f0e', '#2ca02c', '#d62728', '#9467bd']
        stamps = df[timestamp_col]

        patterns = []
        for i in range(actual_n_clusters):
            cluster_df = agg_df[agg_df['Cluster'] == i]
            if cluster_df.empty:
                continue

            net_flow_values = cluster_df['NetFlow']
            avg_flow = net_flow_values.mean()
            flow_std = net_flow_values.std()
            behavior = "Accumulation" if avg_flow > 0 else "Distribution"
            volume_metric = f"Net Flow: {avg_flow:.2f} ± {flow_std:.2f}"

            pattern_metrics = {
                "avg_flow": avg_flow,
                "flow_std": flow_std,
                "avg_count": cluster_df['Count'].mean(),
                "max_flow": net_flow_values.max(),
                "min_flow": net_flow_values.min(),
                "common_hour": cluster_df['Hour'].mode()[0],
                "common_day": cluster_df['Day'].mode()[0],
            }

            # Confidence: share of the overall net-flow variance that is NOT
            # found inside this cluster, clamped to [0.4, 0.95]. A cluster
            # with a single hour has no spread at all.
            cluster_variance = net_flow_values.var()
            if pd.isna(cluster_variance):
                cluster_variance = 0.0
            confidence = max(0.4, min(0.95, 1 - (cluster_variance / total_variance)))

            # Main chart: net flow over time, plotted against the hour
            # bucket rather than the row number.
            main_fig = px.scatter(cluster_df, x='hour', y='NetFlow',
                                  size='Count', color='Cluster',
                                  title=f"Pattern {i+1}: {behavior}",
                                  labels={'NetFlow': 'Net Token Flow', 'hour': 'Time'},
                                  color_discrete_sequence=palette)
            main_fig.add_trace(go.Scatter(
                x=cluster_df['hour'],
                y=net_flow_values.rolling(window=3, min_periods=1).mean(),
                mode='lines',
                name='Trend',
                line=dict(width=2, dash='dash', color='rgba(0,0,0,0.5)')
            ))
            # Zero reference line separating inflow from outflow.
            main_fig.add_shape(
                type="line",
                x0=cluster_df['hour'].min(),
                y0=0,
                x1=cluster_df['hour'].max(),
                y1=0,
                line=dict(color="red", width=1, dash="dot"),
            )
            main_fig.update_layout(
                template="plotly_white",
                legend=dict(orientation="h", yanchor="bottom", y=1.02, xanchor="right", x=1),
                margin=dict(l=20, r=20, t=50, b=20),
                height=400
            )

            # Hourly distribution chart
            hour_counts = cluster_df.groupby('Hour')['Count'].sum().reindex(range(24), fill_value=0)
            hour_fig = px.bar(x=hour_counts.index, y=hour_counts.values,
                              title="Hourly Distribution",
                              labels={'x': 'Hour of Day', 'y': 'Transaction Count'},
                              color_discrete_sequence=['#1f77b4'])
            hour_fig.update_layout(template="plotly_white", height=300)

            # Net flow distribution chart
            dist_fig = px.histogram(net_flow_values,
                                    title="Net Flow Distribution",
                                    labels={'value': "Net Flow", 'count': 'Frequency'},
                                    color_discrete_sequence=['#2ca02c'])
            dist_fig.update_layout(template="plotly_white", height=300)

            # Example transactions: anything within one hour of one of the
            # cluster's hour buckets.
            in_window = pd.Series(False, index=df.index)
            for bucket in cluster_df['hour']:
                in_window |= stamps.between(bucket - one_hour, bucket + one_hour)
            pattern_txs = transactions_df[in_window.to_numpy()].copy()
            # Fixed seeds keep the (cached) examples reproducible.
            if len(pattern_txs) > 10:
                pattern_txs = pattern_txs.sample(10, random_state=42)
            # Too few matches: show a sample of all transactions instead.
            if len(pattern_txs) < 5 and len(transactions_df) >= 5:
                pattern_txs = transactions_df.sample(5, random_state=42)

            patterns.append({
                "name": behavior,
                "description": f"This pattern shows {behavior.lower()} activity.",
                "strategy": "Unknown",
                "risk_profile": "Unknown",
                "time_insight": "Unknown",
                "cluster_id": i,
                "metrics": pattern_metrics,
                "occurrence_count": len(cluster_df),
                "volume_metric": volume_metric,
                "confidence": confidence,
                "impact": 0.0,
                "charts": {
                    "main": main_fig,
                    "hourly_distribution": hour_fig,
                    "value_distribution": dist_fig
                },
                "examples": pattern_txs
            })

        # Cache results for future reuse, evicting the oldest entries first
        # (dicts preserve insertion order).
        if cache_key is not None:
            while len(cache) >= max_cache_entries:
                cache.pop(next(iter(cache)))
            cache[cache_key] = patterns
        return patterns
    except Exception as e:
        logging.warning(f"Error during pattern identification: {str(e)}")
        return []
def detect_anomalous_transactions(self,
                                  transactions_df: pd.DataFrame,
                                  sensitivity: str = "Medium") -> pd.DataFrame:
    """
    Detect anomalous transactions using statistical methods

    A transaction is anomalous when its amount lies more than a
    sensitivity-dependent number of standard deviations from the mean
    amount. The input frame is never modified.

    Args:
        transactions_df: DataFrame of transactions
        sensitivity: Detection sensitivity ("Low", "Medium", "High");
            any other value is treated as "High"

    Returns:
        DataFrame of anomalous transactions with added 'z_score' and
        'risk_level' columns (and 'adjustedValue' when amounts were derived
        from 'value' and 'tokenDecimal'). An empty DataFrame is returned
        when the input is empty or the amounts have no measurable spread.

    Raises:
        ValueError: If no amount column is found.
    """
    if transactions_df.empty:
        return pd.DataFrame()

    columns = transactions_df.columns
    if 'Amount' in columns:
        amount_col = 'Amount'
    elif 'tokenAmount' in columns:
        amount_col = 'tokenAmount'
    elif 'value' in columns:
        amount_col = 'value'
    else:
        raise ValueError("Amount column not found in transactions DataFrame")

    # Score a copy so helper columns never appear on the caller's frame.
    scored = transactions_df.copy()
    if amount_col == 'value' and 'tokenDecimal' in columns:
        # Scale raw token values by their decimals.
        decimals = pd.to_numeric(scored['tokenDecimal'], errors='coerce')
        raw_values = pd.to_numeric(scored['value'], errors='coerce').astype(float)
        scored['adjustedValue'] = raw_values / (10.0 ** decimals)
        amount_col = 'adjustedValue'

    # Amounts may arrive as strings; unparseable entries become NaN and are
    # never flagged (NaN compares false against the threshold).
    amounts = pd.to_numeric(scored[amount_col], errors='coerce').astype(float)

    # Number of standard deviations beyond which a transaction is flagged.
    z_threshold = {"Low": 3.0, "Medium": 2.5}.get(sensitivity, 2.0)

    mean_amount = amounts.mean()
    std_amount = amounts.std()
    # Zero spread (all equal) or undefined spread (a single usable value)
    # means no transaction can stand out.
    if pd.isna(std_amount) or std_amount == 0:
        return pd.DataFrame()

    scored['z_score'] = ((amounts - mean_amount) / std_amount).abs()
    anomalies = scored[scored['z_score'] > z_threshold].copy()

    # Grade each anomaly by how far past the threshold it lies.
    anomalies['risk_level'] = 'Medium'
    anomalies.loc[anomalies['z_score'] > z_threshold * 1.5, 'risk_level'] = 'High'
    anomalies.loc[anomalies['z_score'] <= z_threshold * 1.2, 'risk_level'] = 'Low'
    return anomalies
| def analyze_price_impact(self, | |
| transactions_df: pd.DataFrame, | |
| price_data: Dict[str, Dict[str, Any]]) -> Dict[str, Any]: | |
| """ | |
| Analyze the price impact of transactions with enhanced visualizations | |
| Args: | |
| transactions_df: DataFrame of transactions | |
| price_data: Dictionary of price impact data for each transaction | |
| Returns: | |
| Dictionary with comprehensive price impact analysis and visualizations | |
| """ | |
| if transactions_df.empty or not price_data: | |
| # Create an empty chart for the default case | |
| empty_fig = go.Figure() | |
| empty_fig.update_layout( | |
| title="No Price Impact Data Available", | |
| xaxis_title="Time", | |
| yaxis_title="Price Impact (%)", | |
| height=400, | |
| template="plotly_white" | |
| ) | |
| empty_fig.add_annotation( | |
| text="No transactions found with price impact data", | |
| showarrow=False, | |
| font=dict(size=14) | |
| ) | |
| return { | |
| 'avg_impact_pct': 0, | |
| 'max_impact_pct': 0, | |
| 'min_impact_pct': 0, | |
| 'significant_moves_count': 0, | |
| 'total_transactions': 0, | |
| 'charts': { | |
| 'main_chart': empty_fig, | |
| 'impact_distribution': empty_fig, | |
| 'cumulative_impact': empty_fig, | |
| 'hourly_impact': empty_fig | |
| }, | |
| 'transactions_with_impact': pd.DataFrame(), | |
| 'insights': [], | |
| 'impact_summary': "No price impact data available" | |
| } | |
| # Ensure timestamp column is datetime | |
| if 'Timestamp' in transactions_df.columns: | |
| timestamp_col = 'Timestamp' | |
| elif 'timeStamp' in transactions_df.columns: | |
| timestamp_col = 'timeStamp' | |
| # Convert timestamp to datetime if it's not already | |
| if not pd.api.types.is_datetime64_any_dtype(transactions_df[timestamp_col]): | |
| transactions_df[timestamp_col] = pd.to_datetime(transactions_df[timestamp_col], unit='s') | |
| else: | |
| raise ValueError("Timestamp column not found in transactions DataFrame") | |
| # Combine price impact data with transactions | |
| impact_data = [] | |
| for idx, row in transactions_df.iterrows(): | |
| tx_hash = row.get('Transaction Hash', row.get('hash', None)) | |
| if not tx_hash or tx_hash not in price_data: | |
| continue | |
| tx_impact = price_data[tx_hash] | |
| if tx_impact['impact_pct'] is None: | |
| continue | |
| # Get token symbol if available | |
| token_symbol = row.get('tokenSymbol', 'Unknown') | |
| token_amount = row.get('value', 0) | |
| if 'tokenDecimal' in row: | |
| try: | |
| token_amount = float(token_amount) / (10 ** int(row.get('tokenDecimal', 0))) | |
| except (ValueError, TypeError): | |
| token_amount = 0 | |
| impact_data.append({ | |
| 'transaction_hash': tx_hash, | |
| 'timestamp': row[timestamp_col], | |
| 'pre_price': tx_impact['pre_price'], | |
| 'post_price': tx_impact['post_price'], | |
| 'impact_pct': tx_impact['impact_pct'], | |
| 'token_symbol': token_symbol, | |
| 'token_amount': token_amount, | |
| 'from': row.get('from', ''), | |
| 'to': row.get('to', ''), | |
| 'hour': row[timestamp_col].hour if isinstance(row[timestamp_col], pd.Timestamp) else 0 | |
| }) | |
| if not impact_data: | |
| # Create an empty chart for the default case | |
| empty_fig = go.Figure() | |
| empty_fig.update_layout( | |
| title="No Price Impact Data Available", | |
| xaxis_title="Time", | |
| yaxis_title="Price Impact (%)", | |
| height=400, | |
| template="plotly_white" | |
| ) | |
| empty_fig.add_annotation( | |
| text="No transactions found with price impact data", | |
| showarrow=False, | |
| font=dict(size=14) | |
| ) | |
| return { | |
| 'avg_impact_pct': 0, | |
| 'max_impact_pct': 0, | |
| 'min_impact_pct': 0, | |
| 'significant_moves_count': 0, | |
| 'total_transactions': len(transactions_df) if not transactions_df.empty else 0, | |
| 'charts': { | |
| 'main_chart': empty_fig, | |
| 'impact_distribution': empty_fig, | |
| 'cumulative_impact': empty_fig, | |
| 'hourly_impact': empty_fig | |
| }, | |
| 'transactions_with_impact': pd.DataFrame(), | |
| 'insights': [], | |
| 'impact_summary': "No price impact data available" | |
| } | |
| impact_df = pd.DataFrame(impact_data) | |
| # Calculate aggregate metrics | |
| avg_impact = impact_df['impact_pct'].mean() | |
| max_impact = impact_df['impact_pct'].max() | |
| min_impact = impact_df['impact_pct'].min() | |
| median_impact = impact_df['impact_pct'].median() | |
| std_impact = impact_df['impact_pct'].std() | |
| # Count significant moves (>1% impact) | |
| significant_threshold = 1.0 | |
| high_impact_threshold = 3.0 | |
| significant_moves = len(impact_df[abs(impact_df['impact_pct']) > significant_threshold]) | |
| high_impact_moves = len(impact_df[abs(impact_df['impact_pct']) > high_impact_threshold]) | |
| positive_impacts = len(impact_df[impact_df['impact_pct'] > 0]) | |
| negative_impacts = len(impact_df[impact_df['impact_pct'] < 0]) | |
| # Calculate cumulative impact | |
| impact_df = impact_df.sort_values('timestamp') | |
| impact_df['cumulative_impact'] = impact_df['impact_pct'].cumsum() | |
| # Generate insights | |
| insights = [] | |
| # Market direction bias | |
| if avg_impact > 0.5: | |
| insights.append({ | |
| "title": "Positive Price Pressure", | |
| "description": f"Transactions show an overall positive price impact of {avg_impact:.2f}%, suggesting accumulation or market strength." | |
| }) | |
| elif avg_impact < -0.5: | |
| insights.append({ | |
| "title": "Negative Price Pressure", | |
| "description": f"Transactions show an overall negative price impact of {avg_impact:.2f}%, suggesting distribution or market weakness." | |
| }) | |
| # Volatility analysis | |
| if std_impact > 2.0: | |
| insights.append({ | |
| "title": "High Market Volatility", | |
| "description": f"Price impact shows high volatility (std: {std_impact:.2f}%), indicating potential market manipulation or whipsaw conditions." | |
| }) | |
| # Significant impacts | |
| if high_impact_moves > 0: | |
| insights.append({ | |
| "title": "High Impact Transactions", | |
| "description": f"Detected {high_impact_moves} high-impact transactions (>{high_impact_threshold}% price change), indicating potential market-moving activity." | |
| }) | |
| # Temporal patterns | |
| hourly_impact = impact_df.groupby('hour')['impact_pct'].mean() | |
| if len(hourly_impact) > 0: | |
| max_hour = hourly_impact.abs().idxmax() | |
| max_hour_impact = hourly_impact[max_hour] | |
| insights.append({ | |
| "title": "Time-Based Pattern", | |
| "description": f"Highest price impact occurs around {max_hour}:00 with an average of {max_hour_impact:.2f}%." | |
| }) | |
| # Create impact summary text | |
| impact_summary = f"Analysis of {len(impact_df)} price-impacting transactions shows an average impact of {avg_impact:.2f}% " | |
| impact_summary += f"(range: {min_impact:.2f}% to {max_impact:.2f}%). " | |
| impact_summary += f"Found {significant_moves} significant price moves and {high_impact_moves} high-impact transactions. " | |
| if positive_impacts > negative_impacts: | |
| impact_summary += f"There is a bias towards positive price impact ({positive_impacts} positive vs {negative_impacts} negative)." | |
| elif negative_impacts > positive_impacts: | |
| impact_summary += f"There is a bias towards negative price impact ({negative_impacts} negative vs {positive_impacts} positive)." | |
| else: | |
| impact_summary += "The price impact is balanced between positive and negative moves." | |
| # Create enhanced main visualization | |
| main_fig = go.Figure() | |
| # Add scatter plot for impact | |
| main_fig.add_trace(go.Scatter( | |
| x=impact_df['timestamp'], | |
| y=impact_df['impact_pct'], | |
| mode='markers+lines', | |
| marker=dict( | |
| size=impact_df['impact_pct'].abs() * 1.5 + 5, | |
| color=impact_df['impact_pct'], | |
| colorscale='RdBu_r', | |
| line=dict(width=1), | |
| symbol=['circle' if val >= 0 else 'diamond' for val in impact_df['impact_pct']] | |
| ), | |
| text=[ | |
| f"TX: {tx[:8]}...{tx[-6:]}<br>" + | |
| f"Impact: {impact:.2f}%<br>" + | |
| f"Token: {token} ({amount:.4f})<br>" + | |
| f"From: {src[:6]}...{src[-4:]}<br>" + | |
| f"To: {dst[:6]}...{dst[-4:]}" | |
| for tx, impact, token, amount, src, dst in zip( | |
| impact_df['transaction_hash'], | |
| impact_df['impact_pct'], | |
| impact_df['token_symbol'], | |
| impact_df['token_amount'], | |
| impact_df['from'], | |
| impact_df['to'] | |
| ) | |
| ], | |
| hovertemplate='%{text}<br>Time: %{x}<extra></extra>', | |
| name='Price Impact' | |
| )) | |
| # Add a moving average trendline | |
| window_size = max(3, len(impact_df) // 10) # Dynamic window size | |
| if len(impact_df) >= window_size: | |
| impact_df['ma'] = impact_df['impact_pct'].rolling(window=window_size, min_periods=1).mean() | |
| main_fig.add_trace(go.Scatter( | |
| x=impact_df['timestamp'], | |
| y=impact_df['ma'], | |
| mode='lines', | |
| line=dict(width=2, color='rgba(255,165,0,0.7)'), | |
| name=f'Moving Avg ({window_size} period)' | |
| )) | |
| # Add a zero line for reference | |
| main_fig.add_shape( | |
| type='line', | |
| x0=impact_df['timestamp'].min(), | |
| y0=0, | |
| x1=impact_df['timestamp'].max(), | |
| y1=0, | |
| line=dict(color='gray', width=1, dash='dash') | |
| ) | |
| # Add colored regions for significant impact | |
| # Add green band for normal price movement | |
| main_fig.add_shape( | |
| type='rect', | |
| x0=impact_df['timestamp'].min(), | |
| y0=-significant_threshold, | |
| x1=impact_df['timestamp'].max(), | |
| y1=significant_threshold, | |
| fillcolor='rgba(0,255,0,0.1)', | |
| line=dict(width=0), | |
| layer='below' | |
| ) | |
| # Add warning bands for higher impact movements | |
| main_fig.add_shape( | |
| type='rect', | |
| x0=impact_df['timestamp'].min(), | |
| y0=significant_threshold, | |
| x1=impact_df['timestamp'].max(), | |
| y1=high_impact_threshold, | |
| fillcolor='rgba(255,255,0,0.1)', | |
| line=dict(width=0), | |
| layer='below' | |
| ) | |
| main_fig.add_shape( | |
| type='rect', | |
| x0=impact_df['timestamp'].min(), | |
| y0=-high_impact_threshold, | |
| x1=impact_df['timestamp'].max(), | |
| y1=-significant_threshold, | |
| fillcolor='rgba(255,255,0,0.1)', | |
| line=dict(width=0), | |
| layer='below' | |
| ) | |
| # Add high impact regions | |
| main_fig.add_shape( | |
| type='rect', | |
| x0=impact_df['timestamp'].min(), | |
| y0=high_impact_threshold, | |
| x1=impact_df['timestamp'].max(), | |
| y1=max(high_impact_threshold * 2, max_impact * 1.1), | |
| fillcolor='rgba(255,0,0,0.1)', | |
| line=dict(width=0), | |
| layer='below' | |
| ) | |
| main_fig.add_shape( | |
| type='rect', | |
| x0=impact_df['timestamp'].min(), | |
| y0=min(high_impact_threshold * -2, min_impact * 1.1), | |
| x1=impact_df['timestamp'].max(), | |
| y1=-high_impact_threshold, | |
| fillcolor='rgba(255,0,0,0.1)', | |
| line=dict(width=0), | |
| layer='below' | |
| ) | |
| main_fig.update_layout( | |
| title='Price Impact of Whale Transactions', | |
| xaxis_title='Timestamp', | |
| yaxis_title='Price Impact (%)', | |
| hovermode='closest', | |
| template="plotly_white", | |
| legend=dict(orientation="h", yanchor="bottom", y=1.02, xanchor="right", x=1), | |
| margin=dict(l=20, r=20, t=50, b=20) | |
| ) | |
| # Create impact distribution histogram | |
| dist_fig = px.histogram( | |
| impact_df['impact_pct'], | |
| nbins=20, | |
| labels={'value': 'Price Impact (%)', 'count': 'Frequency'}, | |
| title='Distribution of Price Impact', | |
| color_discrete_sequence=['#3366CC'] | |
| ) | |
| # Add a vertical line at the mean | |
| dist_fig.add_vline(x=avg_impact, line_dash="dash", line_color="red") | |
| dist_fig.add_annotation(x=avg_impact, y=0.85, yref="paper", text=f"Mean: {avg_impact:.2f}%", | |
| showarrow=True, arrowhead=2, arrowcolor="red", ax=40) | |
| # Add a vertical line at zero | |
| dist_fig.add_vline(x=0, line_dash="solid", line_color="black") | |
| dist_fig.update_layout( | |
| template="plotly_white", | |
| bargap=0.1, | |
| height=350 | |
| ) | |
| # Create cumulative impact chart | |
| cumul_fig = go.Figure() | |
| cumul_fig.add_trace(go.Scatter( | |
| x=impact_df['timestamp'], | |
| y=impact_df['cumulative_impact'], | |
| mode='lines', | |
| fill='tozeroy', | |
| line=dict(width=2, color='#2ca02c'), | |
| name='Cumulative Impact' | |
| )) | |
| cumul_fig.update_layout( | |
| title='Cumulative Price Impact Over Time', | |
| xaxis_title='Timestamp', | |
| yaxis_title='Cumulative Price Impact (%)', | |
| template="plotly_white", | |
| height=350 | |
| ) | |
| # Create hourly impact analysis | |
| hourly_impact = impact_df.groupby('hour')['impact_pct'].agg(['mean', 'count', 'std']).reset_index() | |
| hourly_impact = hourly_impact.sort_values('hour') | |
| hour_fig = go.Figure() | |
| hour_fig.add_trace(go.Bar( | |
| x=hourly_impact['hour'], | |
| y=hourly_impact['mean'], | |
| error_y=dict(type='data', array=hourly_impact['std'], visible=True), | |
| marker_color=hourly_impact['mean'].apply(lambda x: 'green' if x > 0 else 'red'), | |
| name='Average Impact' | |
| )) | |
| hour_fig.update_layout( | |
| title='Price Impact by Hour of Day', | |
| xaxis_title='Hour of Day', | |
| yaxis_title='Average Price Impact (%)', | |
| template="plotly_white", | |
| height=350, | |
| xaxis=dict(tickmode='linear', tick0=0, dtick=2) | |
| ) | |
| # Join with original transactions | |
| transactions_df = transactions_df.copy() | |
| transactions_df['Timestamp_key'] = transactions_df[timestamp_col] | |
| impact_df['Timestamp_key'] = impact_df['timestamp'] | |
| merged_df = pd.merge( | |
| transactions_df, | |
| impact_df[['Timestamp_key', 'impact_pct', 'pre_price', 'post_price', 'cumulative_impact']], | |
| on='Timestamp_key', | |
| how='left' | |
| ) | |
| # Final result with enhanced output | |
| return { | |
| 'avg_impact_pct': avg_impact, | |
| 'max_impact_pct': max_impact, | |
| 'min_impact_pct': min_impact, | |
| 'median_impact_pct': median_impact, | |
| 'std_impact_pct': std_impact, | |
| 'significant_moves_count': significant_moves, | |
| 'high_impact_moves_count': high_impact_moves, | |
| 'positive_impacts_count': positive_impacts, | |
| 'negative_impacts_count': negative_impacts, | |
| 'total_transactions': len(transactions_df), | |
| 'charts': { | |
| 'main_chart': main_fig, | |
| 'impact_distribution': dist_fig, | |
| 'cumulative_impact': cumul_fig, | |
| 'hourly_impact': hour_fig | |
| }, | |
| 'transactions_with_impact': merged_df, | |
| 'insights': insights, | |
| 'impact_summary': impact_summary | |
| } | |
def detect_wash_trading(self,
                        transactions_df: pd.DataFrame,
                        addresses: List[str],
                        time_window_minutes: int = 60,
                        sensitivity: str = "Medium") -> List[Dict[str, Any]]:
    """
    Detect potential wash trading between addresses

    For every ordered pair of distinct addresses (A, B), a "cycle" is a
    transfer A -> B that is followed by at least one transfer B -> A strictly
    later and no more than the sensitivity-dependent number of minutes
    afterwards. A pair is reported once its cycle count reaches the
    sensitivity-dependent minimum. Because pairs are ordered, (A, B) and
    (B, A) are evaluated, and may be reported, independently.

    The caller's DataFrame is never modified; timestamp parsing happens on a
    private copy of the relevant rows.

    Args:
        transactions_df: DataFrame of transactions. Must contain either
            'From'/'To' or 'from'/'to', and either 'Timestamp' or 'timeStamp'.
        addresses: List of addresses to analyze (duplicates are ignored)
        time_window_minutes: Kept for interface compatibility.
            NOTE(review): this value is not used by the detection logic; the
            matching window is taken from `sensitivity` instead.
        sensitivity: Detection sensitivity. "Low" requires 3 cycles within
            120 minutes, "Medium" 2 cycles within 60 minutes; any other value
            is treated as "High" (1 cycle within 30 minutes).

    Returns:
        List of potential wash trading incidents, one dict per flagged ordered
        address pair with the keys 'type', 'addresses', 'risk_level',
        'description', 'detection_time', 'title', 'evidence' and 'chart'.

    Raises:
        ValueError: If the from/to or timestamp columns cannot be found.
    """
    if transactions_df.empty or not addresses:
        return []

    columns = transactions_df.columns

    # Ensure from/to columns exist
    if 'From' in columns and 'To' in columns:
        from_col, to_col = 'From', 'To'
    elif 'from' in columns and 'to' in columns:
        from_col, to_col = 'from', 'to'
    else:
        raise ValueError("From/To columns not found in transactions DataFrame")

    # Ensure timestamp column exists
    if 'Timestamp' in columns:
        timestamp_col = 'Timestamp'
    elif 'timeStamp' in columns:
        timestamp_col = 'timeStamp'
    else:
        raise ValueError("Timestamp column not found in transactions DataFrame")

    # Define sensitivity thresholds
    if sensitivity == "Low":
        min_cycles = 3  # Minimum number of back-and-forth transactions
        max_time_diff = 120  # Maximum minutes between transactions
    elif sensitivity == "Medium":
        min_cycles = 2
        max_time_diff = 60
    else:  # High
        min_cycles = 1
        max_time_diff = 30
    return_window = pd.Timedelta(minutes=max_time_diff)

    # A repeated address would make the same pair get reported several times.
    unique_addresses = list(dict.fromkeys(addresses))

    # Filter transactions involving the addresses. Working on a copy keeps the
    # timestamp conversion below from leaking into the caller's DataFrame.
    involved = (
        transactions_df[from_col].isin(unique_addresses) |
        transactions_df[to_col].isin(unique_addresses)
    )
    address_txs = transactions_df[involved].copy()
    if address_txs.empty:
        return []

    # Ensure timestamp is datetime, then order chronologically
    if not pd.api.types.is_datetime64_any_dtype(address_txs[timestamp_col]):
        address_txs[timestamp_col] = pd.to_datetime(address_txs[timestamp_col])
    address_txs = address_txs.sort_values(by=timestamp_col)

    # Detect cycles of transactions between same addresses
    wash_trades = []
    for addr1 in unique_addresses:
        for addr2 in unique_addresses:
            if addr1 == addr2:
                continue

            a1_to_a2 = address_txs[
                (address_txs[from_col] == addr1) &
                (address_txs[to_col] == addr2)
            ]
            a2_to_a1 = address_txs[
                (address_txs[from_col] == addr2) &
                (address_txs[to_col] == addr1)
            ]
            if a1_to_a2.empty or a2_to_a1.empty:
                continue

            # Check for back-and-forth patterns
            cycles = 0
            evidence = []
            return_times = a2_to_a1[timestamp_col]
            for _, outbound in a1_to_a2.iterrows():
                sent_at = outbound[timestamp_col]
                return_txs = a2_to_a1[
                    (return_times > sent_at) &
                    (return_times <= sent_at + return_window)
                ]
                if not return_txs.empty:
                    cycles += 1
                    evidence.append(outbound)
                    # Only the earliest matching return is kept as evidence.
                    evidence.append(return_txs.iloc[0])

            if cycles < min_cycles:
                continue

            # min_cycles is at least 1, so evidence is non-empty here.
            evidence_df = pd.DataFrame(evidence)
            fig = px.scatter(
                evidence_df,
                x=timestamp_col,
                y=evidence_df.get('Amount', evidence_df.get('tokenAmount', evidence_df.get('value', 0))),
                color=from_col,
                title=f"Potential Wash Trading Between {addr1[:8]}... and {addr2[:8]}..."
            )

            wash_trades.append({
                "type": "Wash Trading",
                "addresses": [addr1, addr2],
                "risk_level": "High" if cycles >= min_cycles * 2 else "Medium",
                "description": f"Detected {cycles} cycles of back-and-forth transactions between addresses",
                "detection_time": datetime.now().strftime("%Y-%m-%d %H:%M:%S"),
                "title": f"Wash Trading Pattern ({cycles} cycles)",
                "evidence": evidence_df,
                "chart": fig
            })

    return wash_trades
def detect_pump_and_dump(self,
                         transactions_df: pd.DataFrame,
                         price_data: Dict[str, Dict[str, Any]],
                         sensitivity: str = "Medium") -> List[Dict[str, Any]]:
    """
    Detect potential pump and dump schemes

    Transactions that have price-impact data are grouped per address (every
    transaction the address sent or received, in chronological order). A
    sliding window of up to 10 transactions is moved over each group; a window
    is flagged when the price rose by more than the pump threshold (either
    over the whole window or in a single transaction) and then fell from the
    window's highest post-transaction price to its final price by more than
    the dump threshold. Overlapping windows are evaluated independently, so
    one episode can be reported more than once.

    The caller's DataFrame is never modified.

    Args:
        transactions_df: DataFrame of transactions. Must contain either
            'Timestamp' or 'timeStamp' and either 'From'/'To' or 'from'/'to';
            the hash is read from 'Transaction Hash' or 'hash'.
        price_data: Dictionary of price impact data for each transaction,
            keyed by transaction hash, with 'pre_price', 'post_price' and
            'impact_pct' entries. Entries without an 'impact_pct' value are
            skipped.
        sensitivity: Detection sensitivity. "Low" and "Medium" select the
            thresholds below; any other value is treated as "High".

    Returns:
        List of potential pump and dump incidents, one dict per flagged window
        with the keys 'type', 'addresses', 'risk_level', 'description',
        'detection_time', 'title', 'evidence' and 'chart'.

    Raises:
        ValueError: If the timestamp or from/to columns cannot be found.
    """
    if transactions_df.empty or not price_data:
        return []

    columns = transactions_df.columns

    # Ensure timestamp column exists
    if 'Timestamp' in columns:
        timestamp_col = 'Timestamp'
    elif 'timeStamp' in columns:
        timestamp_col = 'timeStamp'
    else:
        raise ValueError("Timestamp column not found in transactions DataFrame")

    # Ensure from/to columns exist
    if 'From' in columns and 'To' in columns:
        from_col, to_col = 'From', 'To'
    elif 'from' in columns and 'to' in columns:
        from_col, to_col = 'from', 'to'
    else:
        raise ValueError("From/To columns not found in transactions DataFrame")

    # Parse timestamps into a separate Series so the caller's DataFrame is
    # left exactly as it was passed in.
    timestamps = transactions_df[timestamp_col]
    if not pd.api.types.is_datetime64_any_dtype(timestamps):
        timestamps = pd.to_datetime(timestamps)

    # Define sensitivity thresholds
    if sensitivity == "Low":
        accumulation_threshold = 5  # Number of buys to consider accumulation
        pump_threshold = 10.0  # % price increase to trigger pump
        dump_threshold = -8.0  # % price decrease to trigger dump
    elif sensitivity == "Medium":
        accumulation_threshold = 3
        pump_threshold = 7.0
        dump_threshold = -5.0
    else:  # High
        accumulation_threshold = 2
        pump_threshold = 5.0
        dump_threshold = -3.0

    # Combine price impact data with transactions
    txs_with_impact = []
    for (_, row), tx_time in zip(transactions_df.iterrows(), timestamps):
        tx_hash = row.get('Transaction Hash', row.get('hash', None))
        if not tx_hash or tx_hash not in price_data:
            continue
        tx_impact = price_data[tx_hash]
        impact_pct = tx_impact.get('impact_pct')
        if impact_pct is None:
            continue
        txs_with_impact.append({
            'transaction_hash': tx_hash,
            'timestamp': tx_time,
            'from': row[from_col],
            'to': row[to_col],
            'pre_price': tx_impact.get('pre_price'),
            'post_price': tx_impact.get('post_price'),
            'impact_pct': impact_pct
        })
    if not txs_with_impact:
        return []

    impact_df = pd.DataFrame(txs_with_impact).sort_values(by='timestamp', kind='stable')

    # Senders first, then receivers that were not already seen as senders.
    candidate_addresses = dict.fromkeys(
        list(impact_df['from'].unique()) + list(impact_df['to'].unique())
    )

    # Look for accumulation phases followed by price pumps and then dumps
    pump_and_dumps = []
    for address in candidate_addresses:
        # One chronological frame per wallet. A single boolean mask keeps the
        # timestamp order of impact_df and cannot duplicate a row whose sender
        # and receiver are the same address. Resetting the index makes labels
        # equal positions.
        addr_df = impact_df[
            (impact_df['from'] == address) | (impact_df['to'] == address)
        ].reset_index(drop=True)

        # Skip if not enough transactions
        if len(addr_df) < accumulation_threshold + 2:
            continue

        window_size = min(len(addr_df), 10)
        for start in range(len(addr_df) - window_size + 1):
            window = addr_df.iloc[start:start + window_size]

            first_price = window['pre_price'].iloc[0]
            last_price = window['post_price'].iloc[-1]
            # Missing prices show up as NaN once they are inside a DataFrame,
            # and a zero reference price would turn the change into inf.
            if pd.isna(first_price) or pd.isna(last_price) or first_price == 0:
                continue
            cumulative_change = ((last_price - first_price) / first_price) * 100

            # Position (not index label) of the highest post-transaction price.
            post_prices = window['post_price'].reset_index(drop=True)
            peak_pos = int(post_prices.idxmax())
            max_price = post_prices.iloc[peak_pos]
            # A dump needs at least one transaction after the peak.
            if peak_pos >= len(window) - 1 or max_price == 0:
                continue
            max_to_end = ((last_price - max_price) / max_price) * 100

            pumped = (
                cumulative_change > pump_threshold or
                bool((window['impact_pct'] > pump_threshold).any())
            )
            if not (pumped and max_to_end < dump_threshold):
                continue

            # Each transaction contributes two chart points: its pre-price one
            # minute before and its post-price one minute after its timestamp
            # (x values are Unix epoch seconds).
            times_expanded = []
            prices = []
            for tx_time, pre_price, post_price in zip(
                    window['timestamp'], window['pre_price'], window['post_price']):
                epoch = tx_time.timestamp()
                times_expanded.append(epoch - 60)
                times_expanded.append(epoch + 60)
                prices.append(pre_price)
                prices.append(post_price)

            fig = go.Figure()
            fig.add_trace(go.Scatter(
                x=times_expanded,
                y=prices,
                mode='lines+markers',
                name='Price',
                line=dict(color='blue')
            ))

            # Highlight pump and dump phases; both include the peak transaction.
            split = peak_pos * 2
            fig.add_trace(go.Scatter(
                x=times_expanded[:split + 2],
                y=prices[:split + 2],
                mode='lines',
                line=dict(color='green', width=3),
                name='Pump Phase'
            ))
            fig.add_trace(go.Scatter(
                x=times_expanded[split:],
                y=prices[split:],
                mode='lines',
                line=dict(color='red', width=3),
                name='Dump Phase'
            ))
            fig.update_layout(
                title='Potential Pump and Dump Pattern',
                xaxis_title='Time',
                yaxis_title='Price',
                hovermode='closest'
            )

            pump_and_dumps.append({
                "type": "Pump and Dump",
                "addresses": [address],
                "risk_level": "High" if max_to_end < dump_threshold * 1.5 else "Medium",
                "description": f"Price pumped {cumulative_change:.2f}% before dropping {max_to_end:.2f}%",
                "detection_time": datetime.now().strftime("%Y-%m-%d %H:%M:%S"),
                "title": f"Pump ({cumulative_change:.1f}%) and Dump ({max_to_end:.1f}%)",
                "evidence": window,
                "chart": fig
            })

    return pump_and_dumps