"""
Data Leakage Detection Script
Checks for common data leakage issues:
1. Duplicate URLs in train/test split
2. Feature extraction timing (done before split - CORRECT)
3. Scaler fitting (only on train data - CORRECT)
4. Feature contamination checks
"""
import pandas as pd
import numpy as np
from pathlib import Path
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import StandardScaler
import logging
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(levelname)s - %(message)s',
    datefmt='%H:%M:%S'
)
logger = logging.getLogger("data_leakage_check")
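
# The checks below assume the project layout used by the training pipeline:
#   data/processed/clean_dataset_no_duplicates.csv  (url, label columns)
#   data/processed/clean_dataset.csv
#   data/features/url_features.csv                  (numeric features + label)
# and that training split with test_size=0.2, random_state=42, stratified.
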
def check_1_duplicate_urls_in_splits():
    """Check whether the same URLs appear in both the train and test sets."""
    logger.info("\n" + "=" * 80)
    logger.info("CHECK 1: DUPLICATE URLs IN TRAIN/TEST SPLITS")
    logger.info("=" * 80)

    # Load the original dataset with raw URLs
    data_dir = Path('data/processed')
    original_df = pd.read_csv(data_dir / 'clean_dataset_no_duplicates.csv')
    logger.info(f"\nOriginal dataset: {len(original_df):,} URLs")

    # Check for duplicates within the original dataset
    duplicates = original_df['url'].duplicated().sum()
    logger.info(f"Duplicates in original dataset: {duplicates}")
    if duplicates > 0:
        logger.warning(f"⚠️ Found {duplicates} duplicate URLs in original dataset!")
        dup_urls = original_df[original_df['url'].duplicated(keep=False)]['url'].value_counts()
        logger.info(f"Top duplicated URLs:\n{dup_urls.head(10)}")
    else:
        logger.info("✓ No duplicates in original dataset")

    # Reproduce the train/test split used during training
    X = original_df['url']
    y = original_df['label']
    X_train, X_test, y_train, y_test = train_test_split(
        X, y, test_size=0.2, random_state=42, stratify=y
    )
    logger.info(f"\nTrain set: {len(X_train):,} URLs")
    logger.info(f"Test set: {len(X_test):,} URLs")

    # Any URL present in both sets is direct leakage
    train_urls = set(X_train)
    test_urls = set(X_test)
    overlap = train_urls.intersection(test_urls)
    logger.info(f"\nOverlapping URLs between train/test: {len(overlap)}")
    if overlap:
        logger.error(f"❌ DATA LEAKAGE DETECTED! {len(overlap)} URLs in both train and test!")
        logger.info(f"Sample overlapping URLs:\n{list(overlap)[:5]}")
        return False
    logger.info("✓ No URL overlap between train and test sets")
    return True
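
# Note: the overlap test above compares exact strings only, so trivial
# variants of the same page (scheme, case, trailing slash) are not caught.
# A minimal sketch of a stricter comparison - the helper below is
# hypothetical, not part of the training pipeline:
def _normalized_url(url: str) -> str:
    """Illustrative only: lowercase, drop scheme, query and trailing slash."""
    from urllib.parse import urlparse
    parsed = urlparse(url if '://' in url else '//' + url)
    return (parsed.netloc + parsed.path).lower().rstrip('/')
# Overlap could then be computed on {_normalized_url(u) for u in X_train}
# instead of the raw strings, which flags strictly more candidates.
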
def check_2_feature_extraction_timing():
    """Check that features were extracted once, over the entire dataset, before the split."""
    logger.info("\n" + "=" * 80)
    logger.info("CHECK 2: FEATURE EXTRACTION TIMING")
    logger.info("=" * 80)

    # Load the feature dataset
    features_df = pd.read_csv('data/features/url_features.csv')
    logger.info(f"\nFeature dataset: {len(features_df):,} rows")
    logger.info(f"Features: {len(features_df.columns) - 1}")

    # Load the original dataset
    original_df = pd.read_csv('data/processed/clean_dataset.csv')
    logger.info(f"Original dataset: {len(original_df):,} rows")

    # Matching row counts indicate features were extracted for every URL in
    # one pass. This is safe provided each feature depends only on its own
    # URL (no dataset-level statistics), so nothing moves across the split.
    if len(features_df) == len(original_df):
        logger.info("✓ Feature extraction done on ENTIRE dataset (before split)")
        logger.info("  This is CORRECT for row-wise features - no leakage across the split")
        return True
    logger.warning("⚠️ Dataset sizes don't match - check extraction process")
    logger.info(f"  Difference: {abs(len(features_df) - len(original_df))}")
    return False
def check_3_scaler_fitting():
    """Compare a scaler fitted on train data only against one fitted on all data."""
    logger.info("\n" + "=" * 80)
    logger.info("CHECK 3: SCALER FITTING (Logistic Regression only)")
    logger.info("=" * 80)

    # Load features
    features_df = pd.read_csv('data/features/url_features.csv')
    X = features_df.drop('label', axis=1)
    y = features_df['label']

    # Reproduce the training split
    X_train, X_test, y_train, y_test = train_test_split(
        X, y, test_size=0.2, random_state=42, stratify=y
    )

    # CORRECT way: fit on train only (then transform both train and test)
    scaler_correct = StandardScaler()
    scaler_correct.fit(X_train)

    # WRONG way: fit on all data, letting test statistics leak into the scaler
    scaler_wrong = StandardScaler()
    scaler_wrong.fit(X)

    # Compare the fitted statistics of the two approaches
    logger.info("\nScaler statistics comparison:")
    logger.info("\nCORRECT (fitted on train only):")
    logger.info(f"  Train mean: {scaler_correct.mean_[:5]}")
    logger.info(f"  Train std: {scaler_correct.scale_[:5]}")
    logger.info("\nWRONG (fitted on all data):")
    logger.info(f"  All mean: {scaler_wrong.mean_[:5]}")
    logger.info(f"  All std: {scaler_wrong.scale_[:5]}")

    mean_diff = np.abs(scaler_correct.mean_ - scaler_wrong.mean_).mean()
    std_diff = np.abs(scaler_correct.scale_ - scaler_wrong.scale_).mean()
    logger.info("\nAverage difference:")
    logger.info(f"  Mean: {mean_diff:.6f}")
    logger.info(f"  Std: {std_diff:.6f}")

    if mean_diff < 0.01 and std_diff < 0.01:
        logger.info("✓ Minimal difference - fitting on all data would have had negligible effect")
        return True
    logger.warning("⚠️ Significant difference - fitting the scaler on all data would leak test statistics")
    return False
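
# For reference: the standard way to make scaler leakage structurally
# impossible is to wrap preprocessing and model in a single sklearn
# Pipeline, so fitting only ever sees training data. A minimal sketch -
# the model and hyperparameters are illustrative, not this project's
# actual training configuration:
def _example_leak_free_pipeline(X_train, y_train):
    """Illustrative only: the scaler is fitted inside the pipeline, on train data alone."""
    from sklearn.linear_model import LogisticRegression
    from sklearn.pipeline import make_pipeline
    pipe = make_pipeline(StandardScaler(), LogisticRegression(max_iter=1000))
    return pipe.fit(X_train, y_train)
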
def check_4_feature_contamination():
    """Check for features that could leak label information."""
    logger.info("\n" + "=" * 80)
    logger.info("CHECK 4: FEATURE CONTAMINATION")
    logger.info("=" * 80)

    features_df = pd.read_csv('data/features/url_features.csv')

    # A feature that correlates almost perfectly with the label usually
    # encodes the label itself (e.g. a flag derived from the source list)
    logger.info("\nChecking for suspiciously perfect correlations with label...")
    X = features_df.drop('label', axis=1)
    y = features_df['label']
    correlations = X.corrwith(y).abs().sort_values(ascending=False)

    logger.info("\nTop 10 features correlated with label:")
    for feat, corr in correlations.head(10).items():
        logger.info(f"  {feat:30s}: {corr:.4f}")

    # Flag suspiciously high correlations (|r| > 0.9)
    suspicious = correlations[correlations > 0.9]
    if len(suspicious) > 0:
        logger.warning(f"⚠️ Found {len(suspicious)} features with >0.9 correlation!")
        logger.warning(f"  These might be leaking information:\n{suspicious}")
        return False
    logger.info("✓ No suspiciously high correlations detected")
    return True
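
# Pearson correlation only catches roughly linear leakage. An optional,
# stricter follow-up is mutual information, which also flags nonlinear
# dependence. A minimal sketch using scikit-learn's mutual_info_classif
# (the helper itself is not part of the checks above):
def _example_mutual_info_check(X, y, top_k=10):
    """Illustrative only: rank features by mutual information with the label."""
    from sklearn.feature_selection import mutual_info_classif
    mi = pd.Series(mutual_info_classif(X, y, random_state=42), index=X.columns)
    return mi.sort_values(ascending=False).head(top_k)
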
def check_5_train_test_distribution():
    """Check whether the train and test sets have similar label distributions."""
    logger.info("\n" + "=" * 80)
    logger.info("CHECK 5: TRAIN/TEST DISTRIBUTION SIMILARITY")
    logger.info("=" * 80)

    features_df = pd.read_csv('data/features/url_features.csv')
    X = features_df.drop('label', axis=1)
    y = features_df['label']
    X_train, X_test, y_train, y_test = train_test_split(
        X, y, test_size=0.2, random_state=42, stratify=y
    )

    # With stratify=y the ratios should match almost exactly; a large gap
    # would mean this split does not reproduce the one used in training
    logger.info("\nLabel distribution:")
    logger.info(f"  Train: {y_train.value_counts().to_dict()}")
    logger.info(f"  Test: {y_test.value_counts().to_dict()}")

    train_phishing_ratio = (y_train == 1).sum() / len(y_train)
    test_phishing_ratio = (y_test == 1).sum() / len(y_test)
    logger.info("\nPhishing ratio:")
    logger.info(f"  Train: {train_phishing_ratio:.4f}")
    logger.info(f"  Test: {test_phishing_ratio:.4f}")
    logger.info(f"  Difference: {abs(train_phishing_ratio - test_phishing_ratio):.4f}")

    if abs(train_phishing_ratio - test_phishing_ratio) < 0.01:
        logger.info("✓ Train/test label distributions are well balanced")
        return True
    logger.warning("⚠️ Train/test distributions differ significantly")
    return False
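
# Check 5 only compares the label ratio. A feature-level variant could
# compare each feature's train/test distribution, e.g. with a two-sample
# Kolmogorov-Smirnov test. A minimal sketch, assuming scipy is installed
# (it is not imported elsewhere in this script):
def _example_feature_drift_check(X_train, X_test, alpha=0.01):
    """Illustrative only: return features whose train/test distributions differ."""
    from scipy.stats import ks_2samp
    drifted = {}
    for col in X_train.columns:
        result = ks_2samp(X_train[col], X_test[col])
        if result.pvalue < alpha:
            drifted[col] = result.pvalue
    return drifted
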
def main():
    """Run all data leakage checks and print a summary."""
    logger.info("=" * 80)
    logger.info("DATA LEAKAGE DETECTION")
    logger.info("=" * 80)

    checks = {
        'duplicates': check_1_duplicate_urls_in_splits,
        'extraction_timing': check_2_feature_extraction_timing,
        'scaler': check_3_scaler_fitting,
        'contamination': check_4_feature_contamination,
        'distribution': check_5_train_test_distribution,
    }

    # Run every check; record None when one raises instead of aborting the rest
    results = {}
    for name, check in checks.items():
        try:
            results[name] = check()
        except Exception as e:
            logger.error(f"Error in {name} check: {e}")
            results[name] = None

    # Final summary
    logger.info("\n" + "=" * 80)
    logger.info("SUMMARY")
    logger.info("=" * 80)

    passed = sum(1 for v in results.values() if v is True)
    failed = sum(1 for v in results.values() if v is False)
    errors = sum(1 for v in results.values() if v is None)
    logger.info(f"\nChecks passed: {passed}")
    logger.info(f"Checks failed: {failed}")
    logger.info(f"Checks errored: {errors}")

    for check, result in results.items():
        if result is True:
            status = "✓ PASS"
        elif result is False:
            status = "❌ FAIL"
        else:
            status = "⚠️ ERROR"
        logger.info(f"  {check:20s}: {status}")

    if failed == 0 and errors == 0:
        logger.info("\n🎉 ALL CHECKS PASSED - No data leakage detected!")
        logger.info("Your results are LEGITIMATE!")
    elif failed > 0:
        logger.warning(f"\n⚠️ {failed} checks failed - review your pipeline!")
    logger.info("\n" + "=" * 80)

if __name__ == "__main__":
    main()