import os
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
import seaborn as sns
import gradio as gr
import plotly.express as px
import plotly.graph_objects as go
from sklearn.ensemble import IsolationForest
from sklearn.preprocessing import StandardScaler
import openai
from datetime import datetime, timedelta
import json
import tempfile

# Set the OpenAI API key from Hugging Face Spaces secrets
openai.api_key = os.environ.get("OPENAI_API_KEY")
def analyze_dataset_structure(df):
    """Use OpenAI to analyze the dataset structure and identify relevant columns"""
    if not openai.api_key:
        return None, "OpenAI API key not found. Please add it to the Hugging Face Spaces secrets."
    try:
        # Get basic dataset info from a small sample
        sample_data = df.head(3).copy()

        # Convert non-JSON-serializable dtypes to plain Python types
        for col in sample_data.columns:
            if pd.api.types.is_datetime64_any_dtype(sample_data[col]):
                sample_data[col] = sample_data[col].astype(str)
            elif pd.api.types.is_numeric_dtype(sample_data[col]):
                sample_data[col] = sample_data[col].astype(float)
        sample_data_dict = sample_data.to_dict(orient='records')

        column_info = []
        for col in df.columns:
            dtype = str(df[col].dtype)
            unique_values = len(df[col].unique())
            null_percentage = float(round((df[col].isna().sum() / len(df)) * 100, 2))

            # Collect sample values, converting NumPy scalars to native Python types
            try:
                non_null = df[col].dropna()
                sample_values = non_null.sample(min(3, len(non_null))).tolist()
                sample_values = [item.item() if hasattr(item, 'item') else str(item) for item in sample_values]
                sample_values_str = str(sample_values)[:100]  # Limit sample length
            except Exception:
                sample_values_str = "Error getting sample values"

            column_info.append({
                "column_name": col,
                "data_type": dtype,
                "unique_values_count": unique_values,
                "null_percentage": null_percentage,
                "sample_values": sample_values_str
            })

        # Create the prompt for OpenAI (literal braces in the JSON template are escaped as {{ }})
        prompt = f"""
Analyze this transaction dataset structure to identify the purpose of each column.

Dataset Information:
- Number of rows: {len(df)}
- Number of columns: {len(df.columns)}

Column Information:
{json.dumps(column_info, indent=2)}

Sample Data:
{json.dumps(sample_data_dict, indent=2)}

For each column in the dataset, identify its likely purpose in a transaction dataset.
Specifically identify:
1. Which column is likely the transaction ID or reference number
2. Which column represents the transaction amount or value
3. Which column represents the timestamp or date of the transaction
4. Which column represents the user ID, account ID, or customer identifier
5. Which column might represent location information
6. Which columns might be useful for fraud detection (e.g., IP address, device info, transaction status)

Return your analysis as a JSON object with this structure:
{{
    "id_column": "column_name",
    "amount_column": "column_name",
    "timestamp_column": "column_name",
    "user_column": "column_name",
    "location_column": "column_name",
    "fraud_indicator_columns": ["column1", "column2"],
    "column_descriptions": {{
        "column_name": "description of purpose"
    }}
}}

Include only columns that you're reasonably confident about, and use null for any category where you can't identify a matching column.
"""

        # Create an OpenAI client with the API key
        client = openai.OpenAI(api_key=openai.api_key)

        # Ask for a structured JSON analysis
        response = client.chat.completions.create(
            model="gpt-3.5-turbo",
            messages=[
                {"role": "system", "content": "You are a data analysis expert specializing in financial transaction data structures."},
                {"role": "user", "content": prompt}
            ],
            max_tokens=1000,
            response_format={"type": "json_object"}
        )

        # Parse the JSON response
        structure_analysis = json.loads(response.choices[0].message.content)

        # Also get a natural-language explanation, continuing the same conversation
        explanation_prompt = """
Based on your analysis of the dataset structure, provide a brief natural language explanation of:
1. What kind of transactions this dataset appears to contain
2. What the key columns are and what they represent
3. What approach would be best for detecting anomalies or fraud in this specific dataset

Keep your explanation concise and focused on the unique characteristics of this dataset.
"""
        explanation_response = client.chat.completions.create(
            model="gpt-3.5-turbo",
            messages=[
                {"role": "system", "content": "You are a data analysis expert specializing in financial transaction data structures."},
                {"role": "user", "content": prompt},
                {"role": "assistant", "content": response.choices[0].message.content},
                {"role": "user", "content": explanation_prompt}
            ],
            max_tokens=500
        )
        explanation = explanation_response.choices[0].message.content

        return structure_analysis, explanation
    except Exception as e:
        import traceback
        error_trace = traceback.format_exc()
        return None, f"Error analyzing dataset structure: {str(e)}\n\nTrace: {error_trace}"
def load_and_preprocess_data(file):
    """Load and preprocess transaction data from a CSV or Excel file"""
    if file is None:
        return None, None, None

    # Get the file extension
    file_extension = os.path.splitext(file.name)[1].lower()

    # Read the file based on its extension
    if file_extension == '.csv':
        df = pd.read_csv(file.name)
    elif file_extension in ['.xlsx', '.xls']:
        df = pd.read_excel(file.name)
    else:
        raise ValueError("Unsupported file format. Please upload a CSV or Excel file.")

    # Check whether the DataFrame is empty
    if df.empty:
        raise ValueError("The uploaded file is empty.")

    # Analyze the dataset structure with the LLM
    column_mapping, dataset_explanation = analyze_dataset_structure(df)

    # If the LLM analysis failed, return the raw DataFrame and the error message
    if column_mapping is None:
        return df, dataset_explanation, None

    # Process the data based on the identified columns
    processed_df = df.copy()

    # Convert the timestamp column to datetime if one was identified
    timestamp_col = column_mapping.get("timestamp_column")
    if timestamp_col and timestamp_col in df.columns:
        try:
            processed_df[timestamp_col] = pd.to_datetime(df[timestamp_col])
        except (ValueError, TypeError):
            print(f"Warning: Could not convert {timestamp_col} to datetime format.")

    # Ensure the amount column is numeric if one was identified
    amount_col = column_mapping.get("amount_column")
    if amount_col and amount_col in df.columns:
        try:
            processed_df[amount_col] = pd.to_numeric(df[amount_col])
        except (ValueError, TypeError):
            print(f"Warning: Could not convert {amount_col} to numeric values.")

    return processed_df, dataset_explanation, column_mapping
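
# Contract of the loader above (illustrative example with hypothetical column names): for a
# CSV with columns ['txn_id', 'user', 'amount', 'ts'], a successful run returns
# (processed_df, explanation_text, column_mapping) where column_mapping is the LLM's JSON,
# e.g. {'id_column': 'txn_id', 'amount_column': 'amount', ...}. If the LLM analysis fails,
# it returns (df, error_message, None), and process_transactions below reports the error
# instead of continuing the pipeline.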
def detect_fraud_and_anomalies(df, column_mapping):
    """Detect fraud and anomalies in transaction data based on LLM-identified columns"""
    # Build the feature set for anomaly detection
    features = pd.DataFrame(index=df.index)

    # Add the amount feature if available
    amount_col = column_mapping.get("amount_column")
    if amount_col and amount_col in df.columns:
        features['amount'] = df[amount_col]

    # Add time-based features if available
    timestamp_col = column_mapping.get("timestamp_column")
    if timestamp_col and timestamp_col in df.columns and pd.api.types.is_datetime64_any_dtype(df[timestamp_col]):
        # Extract hour of day and day of week
        features['hour_of_day'] = df[timestamp_col].dt.hour
        features['day_of_week'] = df[timestamp_col].dt.dayofweek

    # Add the location feature if available
    location_col = column_mapping.get("location_column")
    if location_col and location_col in df.columns:
        # One-hot encode location
        location_dummies = pd.get_dummies(df[location_col], prefix='location')
        features = pd.concat([features, location_dummies], axis=1)

    # Add fraud indicator columns if identified ("or []" guards against a JSON null)
    fraud_indicators = column_mapping.get("fraud_indicator_columns") or []
    for col in fraud_indicators:
        if col in df.columns:
            if pd.api.types.is_numeric_dtype(df[col]):
                features[col] = df[col]
            else:
                # One-hot encode categorical indicators
                indicator_dummies = pd.get_dummies(df[col], prefix=col)
                features = pd.concat([features, indicator_dummies], axis=1)

    # If there are still too few features, fall back to all numeric columns
    if features.empty or features.shape[1] < 2:
        for col in df.select_dtypes(include=['number']).columns:
            if col not in features.columns:
                features[col] = df[col]

    # If there is still not enough signal, pad with random dummy features so the model can run
    if features.empty or features.shape[1] < 2:
        features['dummy1'] = np.random.random(len(df))
        features['dummy2'] = np.random.random(len(df))

    # Standardize features (StandardScaler rejects NaNs, so fill gaps with 0 first)
    scaler = StandardScaler()
    scaled_features = scaler.fit_transform(features.fillna(0))

    # Apply Isolation Forest for anomaly detection (-1 = anomaly, 1 = normal)
    clf = IsolationForest(contamination=0.05, random_state=42)
    anomaly_scores = clf.fit_predict(scaled_features)

    # Create a result DataFrame with the original data and anomaly scores
    result_df = df.copy()
    result_df['anomaly_score'] = anomaly_scores
    result_df['is_anomaly'] = result_df['anomaly_score'] == -1

    # Initialize the rule-based fraud indicators
    result_df['high_amount'] = False
    result_df['unusual_hour'] = False
    result_df['high_frequency'] = False
    result_df['rapid_succession'] = False

    # 1. Unusually large transactions (if an amount column is available)
    if amount_col and amount_col in df.columns:
        amount_threshold = df[amount_col].quantile(0.95)
        result_df['high_amount'] = df[amount_col] > amount_threshold

    # 2. Transactions occurring at unusual hours (if a timestamp is available)
    if timestamp_col and timestamp_col in df.columns and pd.api.types.is_datetime64_any_dtype(df[timestamp_col]):
        result_df['unusual_hour'] = df[timestamp_col].dt.hour.isin([0, 1, 2, 3, 4])

    # 3. Transaction frequency by user or account (if available)
    user_col = column_mapping.get("user_column")
    if user_col and user_col in df.columns:
        transaction_counts = df.groupby(user_col).size().reset_index(name='transaction_count')
        result_df = result_df.merge(transaction_counts, on=user_col, how='left')
        result_df['high_frequency'] = result_df['transaction_count'] > result_df['transaction_count'].quantile(0.9)

    # 4. Velocity check: multiple transactions from the same user in a short time period
    if timestamp_col and user_col and timestamp_col in df.columns and user_col in df.columns:
        if pd.api.types.is_datetime64_any_dtype(df[timestamp_col]):
            velocity_df = df[[timestamp_col, user_col]].copy().sort_values([user_col, timestamp_col])
            velocity_df['time_diff'] = velocity_df.groupby(user_col)[timestamp_col].diff()
            # NaT (each user's first transaction) compares as False, so it is not flagged
            velocity_df['rapid_succession'] = velocity_df['time_diff'].dt.total_seconds() < 300  # under 5 minutes
            # Map back to the original row order; assign positionally because the merge in
            # step 3 may have reset result_df's index
            result_df['rapid_succession'] = velocity_df['rapid_succession'].reindex(df.index).values

    # Combine all fraud indicators with adaptive weighting
    weights = {
        'is_anomaly': 3,  # Base weight for the Isolation Forest flag
        'high_amount': 2,
        'unusual_hour': 1,
        'high_frequency': 1,
        'rapid_succession': 1
    }

    # Calculate the fraud score from the available indicators
    result_df['fraud_score'] = 0
    for indicator, weight in weights.items():
        if indicator in result_df.columns:
            result_df['fraud_score'] += result_df[indicator].astype(int) * weight

    # Flag as suspicious if the fraud score reaches a threshold adapted to the available indicators
    available_weights = sum(weight for indicator, weight in weights.items() if indicator in result_df.columns)
    threshold = max(3, available_weights * 0.3)  # at least 3, or 30% of the maximum possible score
    result_df['is_suspicious'] = result_df['fraud_score'] >= threshold

    return result_df
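
# Worked example of the adaptive threshold above: with all five indicators present,
# available_weights = 3 + 2 + 1 + 1 + 1 = 8 and threshold = max(3, 8 * 0.3) = 3, so a
# transaction flagged by the Isolation Forest alone (weight 3) is already suspicious. With
# only 'is_anomaly' and 'unusual_hour' available (3 + 1 = 4), the threshold is still
# max(3, 1.2) = 3, so the anomaly flag alone still suffices.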
def create_visualizations(df, column_mapping):
    """Create visualizations for transaction data and anomalies based on LLM-identified columns"""
    visualizations = {}
    try:
        # Work on a copy for plotting
        plot_df = df.copy()

        # Get the important columns
        timestamp_col = column_mapping.get("timestamp_column")
        amount_col = column_mapping.get("amount_column")
        user_col = column_mapping.get("user_column")

        # Convert the timestamp to string for Plotly if it exists
        if timestamp_col and timestamp_col in plot_df.columns:
            if pd.api.types.is_datetime64_any_dtype(plot_df[timestamp_col]):
                plot_df['timestamp_str'] = plot_df[timestamp_col].dt.strftime('%Y-%m-%d %H:%M:%S')

        # 1. Distribution of transaction amounts with anomalies highlighted (if an amount column exists)
        if amount_col and amount_col in plot_df.columns:
            fig1 = px.histogram(
                plot_df, x=amount_col, color='is_suspicious',
                color_discrete_map={True: 'red', False: 'blue'},
                title='Distribution of Transaction Amounts',
                labels={amount_col: 'Transaction Amount', 'is_suspicious': 'Suspicious'}
            )
            fig1.update_layout(height=500, width=700)
            visualizations['amount_distribution'] = fig1

        # 2. Time series of transaction amounts (if both timestamp and amount columns exist)
        if timestamp_col and amount_col and 'timestamp_str' in plot_df.columns:
            fig2 = px.scatter(
                plot_df, x='timestamp_str', y=amount_col, color='is_suspicious',
                color_discrete_map={True: 'red', False: 'blue'},
                title='Transaction Amounts Over Time',
                labels={amount_col: 'Transaction Amount', 'timestamp_str': 'Time', 'is_suspicious': 'Suspicious'}
            )
            fig2.update_layout(height=500, width=700)
            visualizations['time_series'] = fig2

        # 3. Fraud score distribution
        fig3 = px.histogram(
            plot_df, x='fraud_score',
            title='Distribution of Fraud Scores',
            labels={'fraud_score': 'Fraud Score'}
        )
        fig3.update_layout(height=500, width=700)
        visualizations['fraud_score_dist'] = fig3

        # 4. User transaction frequency (if a user column exists)
        if user_col and user_col in plot_df.columns:
            user_counts = plot_df.groupby([user_col, 'is_suspicious']).size().reset_index(name='count')
            # Limit to the top 20 users by transaction count
            top_users = plot_df.groupby(user_col).size().sort_values(ascending=False).head(20).index
            user_counts_filtered = user_counts[user_counts[user_col].isin(top_users)]
            fig4 = px.bar(
                user_counts_filtered, x=user_col, y='count', color='is_suspicious',
                color_discrete_map={True: 'red', False: 'blue'},
                title='Transaction Frequency by User (Top 20)',
                labels={user_col: 'User', 'count': 'Number of Transactions', 'is_suspicious': 'Suspicious'}
            )
            fig4.update_layout(height=500, width=700)
            visualizations['user_frequency'] = fig4

        # 5. Hourly transaction pattern (if a timestamp is available)
        if timestamp_col and timestamp_col in plot_df.columns:
            if pd.api.types.is_datetime64_any_dtype(plot_df[timestamp_col]):
                # Count transactions per hour, split by suspicious flag
                hourly_counts = plot_df.groupby([plot_df[timestamp_col].dt.hour, 'is_suspicious']).size()
                hourly_df = hourly_counts.reset_index()
                hourly_df.columns = ['hour', 'is_suspicious', 'count']
                fig5 = px.line(
                    hourly_df, x='hour', y='count', color='is_suspicious',
                    color_discrete_map={True: 'red', False: 'blue'},
                    title='Hourly Transaction Pattern',
                    labels={'hour': 'Hour of Day', 'count': 'Number of Transactions', 'is_suspicious': 'Suspicious'}
                )
                fig5.update_layout(height=500, width=700)
                visualizations['hourly_pattern'] = fig5
    except Exception as e:
        print(f"Error in visualization creation: {str(e)}")

    return visualizations
def analyze_transaction_with_ai(transaction_data, suspicious_transactions, column_mapping):
    """Use OpenAI to analyze suspicious transactions and provide insights"""
    if not openai.api_key:
        return "OpenAI API key not found. Please add it to the Hugging Face Spaces secrets."
    try:
        # Prepare a small sample for OpenAI, converting it to a JSON-serializable format
        suspicious_sample = suspicious_transactions.head(5).copy()
        for col in suspicious_sample.columns:
            # Convert datetime columns to strings
            if pd.api.types.is_datetime64_any_dtype(suspicious_sample[col]):
                suspicious_sample[col] = suspicious_sample[col].astype(str)
            # Handle booleans before the numeric check (bool counts as numeric in pandas)
            elif pd.api.types.is_bool_dtype(suspicious_sample[col]):
                suspicious_sample[col] = suspicious_sample[col].astype(str)
            # Convert NumPy numeric types to native Python floats
            elif pd.api.types.is_numeric_dtype(suspicious_sample[col]):
                suspicious_sample[col] = suspicious_sample[col].astype(float)

        # Convert to a list of records
        suspicious_dict = suspicious_sample.to_dict(orient='records')

        # Get summary statistics
        summary_stats = {
            "total_transactions": int(len(transaction_data)),
            "flagged_transactions": int(len(suspicious_transactions)),
            "flagged_percentage": float(round(len(suspicious_transactions) / len(transaction_data) * 100, 2)),
        }

        # Add amount-related statistics if available
        amount_col = column_mapping.get("amount_column")
        if amount_col and amount_col in transaction_data.columns:
            summary_stats.update({
                "avg_transaction_amount": float(round(transaction_data[amount_col].mean(), 2)),
                "suspicious_avg_amount": float(round(suspicious_transactions[amount_col].mean(), 2))
            })

        # Create the prompt for OpenAI
        prompt = f"""
Analyze these potentially fraudulent transactions and identify patterns or anomalies:

Transaction Data Summary:
{json.dumps(summary_stats)}

Column Mapping:
{json.dumps(column_mapping)}

Sample of Suspicious Transactions:
{json.dumps(suspicious_dict)}

Provide a concise fraud analysis report with:
1. Key patterns and red flags in these transactions
2. Possible fraud scenarios explaining the anomalies
3. Recommended next steps for investigation
"""

        # Create an OpenAI client with the API key and request the analysis
        client = openai.OpenAI(api_key=openai.api_key)
        response = client.chat.completions.create(
            model="gpt-3.5-turbo",
            messages=[
                {"role": "system", "content": "You are a fraud detection expert helping analyze suspicious financial transactions."},
                {"role": "user", "content": prompt}
            ],
            max_tokens=800
        )

        # Return the AI analysis
        return response.choices[0].message.content
    except Exception as e:
        import traceback
        error_trace = traceback.format_exc()
        return f"Error in AI analysis: {str(e)}\n\nTrace: {error_trace}"
def process_transactions(file):
    """Main function to process transaction data and detect fraud"""
    try:
        # Load and preprocess the data with LLM-based analysis
        processed_df, dataset_explanation, column_mapping = load_and_preprocess_data(file)

        if processed_df is None:
            return "No file uploaded or error in processing", None, None, None, None, None

        # If column_mapping is None, dataset_explanation holds the error message
        if column_mapping is None:
            return f"Error analyzing dataset: {dataset_explanation}", None, None, None, None, None

        # Detect fraud and anomalies using the LLM-identified column mapping
        df_with_anomalies = detect_fraud_and_anomalies(processed_df, column_mapping)

        # Get the suspicious transactions
        suspicious_transactions = df_with_anomalies[df_with_anomalies['is_suspicious']]

        # Create visualizations using the identified columns
        visualizations = create_visualizations(df_with_anomalies, column_mapping)

        # Basic statistics
        total_transactions = len(df_with_anomalies)
        suspicious_count = len(suspicious_transactions)
        suspicious_percentage = round((suspicious_count / total_transactions) * 100, 2)

        # Format statistics for display
        stats_summary = f"""
## Transaction Analysis Summary
- **Total Transactions**: {total_transactions}
- **Suspicious Transactions**: {suspicious_count} ({suspicious_percentage}%)
"""

        # Add amount-related statistics if available
        amount_col = column_mapping.get("amount_column")
        if amount_col and amount_col in df_with_anomalies.columns:
            stats_summary += f"""
- **Total Transaction Value**: ${df_with_anomalies[amount_col].sum():,.2f}
- **Suspicious Transaction Value**: ${suspicious_transactions[amount_col].sum():,.2f}
- **Average Transaction Amount**: ${df_with_anomalies[amount_col].mean():,.2f}
- **Average Suspicious Amount**: ${suspicious_transactions[amount_col].mean():,.2f}
"""

        # Add the dataset explanation from the LLM
        stats_summary += f"""
## Dataset Analysis
{dataset_explanation}

## Detected Columns
"""
        for purpose, col_name in column_mapping.items():
            if col_name and purpose not in ["column_descriptions", "fraud_indicator_columns"]:
                stats_summary += f"- **{purpose.replace('_column', '')}**: {col_name}\n"

        if column_mapping.get("fraud_indicator_columns"):
            stats_summary += "\n**Potential Fraud Indicator Columns**:\n"
            for col in column_mapping.get("fraud_indicator_columns", []):
                stats_summary += f"- {col}\n"

        # Get an AI analysis of the suspicious transactions
        ai_analysis = analyze_transaction_with_ai(df_with_anomalies, suspicious_transactions, column_mapping)

        # Save the suspicious transactions to a temporary file for download
        temp_csv = tempfile.NamedTemporaryFile(delete=False, suffix='.csv')
        suspicious_transactions.to_csv(temp_csv.name, index=False)
        temp_csv.close()

        # Return results and visualizations
        return (
            stats_summary,
            ai_analysis,
            temp_csv.name,  # Path to the temporary file
            visualizations.get('amount_distribution', None),
            visualizations.get('time_series', None),
            visualizations.get('fraud_score_dist', None)
        )
    except Exception as e:
        import traceback
        error_trace = traceback.format_exc()
        return f"Error: {str(e)}\n\nTrace: {error_trace}", None, None, None, None, None
def create_gradio_interface():
    """Create the Gradio interface for the application"""
    with gr.Blocks(title="AI Fraud Detection System") as app:
        gr.Markdown("# AI Transaction Fraud & Anomaly Detection System")
        gr.Markdown("Upload your transaction data (CSV or Excel) to detect potential fraud and anomalies. "
                    "The system will use AI to analyze your dataset structure and identify relevant columns.")

        with gr.Row():
            file_input = gr.File(label="Upload Transaction Data", file_types=[".csv", ".xlsx", ".xls"])
        with gr.Row():
            submit_btn = gr.Button("Analyze Transactions", variant="primary")

        with gr.Tabs():
            with gr.TabItem("Summary"):
                stats_output = gr.Markdown(label="Statistics Summary")
                ai_analysis_output = gr.Markdown(label="AI Analysis")
            with gr.TabItem("Visualizations"):
                with gr.Row():
                    amount_dist_plot = gr.Plot(label="Transaction Amount Distribution")
                with gr.Row():
                    time_series_plot = gr.Plot(label="Transactions Over Time")
                    fraud_score_plot = gr.Plot(label="Fraud Score Distribution")
            with gr.TabItem("Suspicious Transactions"):
                suspicious_csv = gr.File(label="Download Suspicious Transactions (CSV)")

        submit_btn.click(
            process_transactions,
            inputs=[file_input],
            outputs=[stats_output, ai_analysis_output, suspicious_csv,
                     amount_dist_plot, time_series_plot, fraud_score_plot]
        )
    return app
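
# Minimal smoke-test helper (a sketch, not part of the original app): writes a synthetic
# transactions CSV so the pipeline can be exercised locally without real data. The column
# names below ('txn_id', 'user_id', 'amount', 'timestamp') are illustrative assumptions; the
# LLM step is what maps whatever names your data actually uses. Usage: call make_demo_csv()
# and upload the resulting file in the UI.
def make_demo_csv(path="demo_transactions.csv", n=500, seed=42):
    rng = np.random.default_rng(seed)
    demo = pd.DataFrame({
        "txn_id": np.arange(n),
        "user_id": rng.integers(1, 40, size=n),
        # Log-normal amounts give a realistic long-tailed distribution
        "amount": np.round(rng.lognormal(mean=3.5, sigma=1.0, size=n), 2),
        # Random timestamps spread over roughly one month
        "timestamp": pd.Timestamp("2024-01-01")
                     + pd.to_timedelta(rng.integers(0, 30 * 24 * 3600, size=n), unit="s"),
    })
    demo.to_csv(path, index=False)
    return path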
if __name__ == "__main__":
    # Enable debug logging to get detailed error messages
    import logging
    logging.basicConfig(level=logging.DEBUG)

    app = create_gradio_interface()
    # share=True requests a public link for local runs; it has no effect on Spaces itself
    app.launch(share=True)