"""
Data Leakage Detection Script

Checks for common data leakage issues:
1. Duplicate URLs in train/test split
2. Feature extraction timing (done before split - CORRECT)
3. Scaler fitting (only on train data - CORRECT)
4. Feature contamination checks
5. Train/test label distribution similarity
"""

import logging
from pathlib import Path

import numpy as np
import pandas as pd
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import StandardScaler

logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(levelname)s - %(message)s',
    datefmt='%H:%M:%S'
)
logger = logging.getLogger("data_leakage_check")

# Input files, relative to the current working directory.
PROCESSED_DIR = Path('data/processed')
DEDUPLICATED_DATASET_CSV = PROCESSED_DIR / 'clean_dataset_no_duplicates.csv'
CLEAN_DATASET_CSV = PROCESSED_DIR / 'clean_dataset.csv'
FEATURES_CSV = Path('data/features/url_features.csv')

# Split parameters shared by every check so they all look at the same
# partition (intended to mirror the split used by the training code).
TEST_SIZE = 0.2
RANDOM_STATE = 42

_BANNER = "=" * 80


def _log_header(title: str, leading_blank: bool = True) -> None:
    """Log *title* framed by two banner lines.

    Args:
        title: Text to print between the banner lines.
        leading_blank: When true, the first banner is preceded by a newline
            to separate it visually from earlier output.
    """
    logger.info("\n" + _BANNER if leading_blank else _BANNER)
    logger.info(title)
    logger.info(_BANNER)


def _split(X, y):
    """Return ``train_test_split(X, y)`` with the shared stratified settings."""
    return train_test_split(
        X, y,
        test_size=TEST_SIZE,
        random_state=RANDOM_STATE,
        stratify=y
    )


def _load_features_and_labels():
    """Read the feature CSV and return ``(X, y)`` with ``label`` split off."""
    features_df = pd.read_csv(FEATURES_CSV)
    return features_df.drop('label', axis=1), features_df['label']


def check_1_duplicate_urls_in_splits() -> bool:
    """Check if same URLs appear in both train and test sets.

    Reads the deduplicated dataset, reports duplicated ``url`` values,
    re-creates the stratified split and intersects the two URL sets.

    Returns:
        True when no URL is present in both partitions, False otherwise.
    """
    _log_header("CHECK 1: DUPLICATE URLs IN TRAIN/TEST SPLITS")

    # Load original dataset with URLs
    original_df = pd.read_csv(DEDUPLICATED_DATASET_CSV)
    logger.info(f"\nOriginal dataset: {len(original_df):,} URLs")

    # Check for duplicates in original dataset
    duplicates = original_df['url'].duplicated().sum()
    logger.info(f"Duplicates in original dataset: {duplicates}")

    if duplicates > 0:
        logger.warning(f"⚠️ Found {duplicates} duplicate URLs in original dataset!")
        dup_urls = original_df[original_df['url'].duplicated(keep=False)]['url'].value_counts()
        logger.info(f"Top duplicated URLs:\n{dup_urls.head(10)}")
    else:
        logger.info("✓ No duplicates in original dataset")

    # Simulate train/test split (same as in training); labels are only
    # needed for stratification here.
    X_train, X_test, _, _ = _split(original_df['url'], original_df['label'])

    logger.info(f"\nTrain set: {len(X_train):,} URLs")
    logger.info(f"Test set: {len(X_test):,} URLs")

    # Check for overlap
    overlap = set(X_train) & set(X_test)
    logger.info(f"\nOverlapping URLs between train/test: {len(overlap)}")

    if overlap:
        logger.error(f"❌ DATA LEAKAGE DETECTED! {len(overlap)} URLs in both train and test!")
        logger.info(f"Sample overlapping URLs:\n{list(overlap)[:5]}")
        return False

    logger.info("✓ No URL overlap between train and test sets")
    return True


def check_2_feature_extraction_timing() -> bool:
    """Check if features were extracted before split (CORRECT) or after (WRONG).

    The check is a row-count comparison between the feature CSV and the
    processed dataset CSV; equal counts are taken as evidence that features
    were extracted from the entire dataset.

    Returns:
        True when both files have the same number of rows, False otherwise.
    """
    _log_header("CHECK 2: FEATURE EXTRACTION TIMING")

    # Load feature dataset
    features_df = pd.read_csv(FEATURES_CSV)
    logger.info(f"\nFeature dataset: {len(features_df):,} rows")
    logger.info(f"Features: {len(features_df.columns) - 1}")

    # Load original dataset
    # NOTE(review): check 1 reads clean_dataset_no_duplicates.csv while this
    # check reads clean_dataset.csv - confirm which file the features were
    # actually extracted from.
    original_df = pd.read_csv(CLEAN_DATASET_CSV)
    logger.info(f"Original dataset: {len(original_df):,} rows")

    # Check sizes match
    if len(features_df) == len(original_df):
        logger.info("✓ Feature extraction done on ENTIRE dataset (before split)")
        logger.info(" This is CORRECT - prevents data leakage")
        return True

    logger.warning("⚠️ Dataset sizes don't match - check extraction process")
    logger.info(f" Difference: {abs(len(features_df) - len(original_df))}")
    return False


def check_3_scaler_fitting() -> bool:
    """Check if scaler was fitted only on train data.

    Fits one ``StandardScaler`` on the train partition and another on the
    full feature matrix, then compares their learned means and scales.
    Note that this measures how much the two sets of statistics differ; it
    does not inspect the scaler used by the training pipeline itself.

    Returns:
        True when the mean absolute differences of both the means and the
        scales are below 0.01, False otherwise.
    """
    _log_header("CHECK 3: SCALER FITTING (Logistic Regression only)")

    # Load features
    X, y = _load_features_and_labels()

    # Split; only the train features are needed for the comparison.
    X_train, _, _, _ = _split(X, y)

    # Only the fitted statistics are compared, so transforming is unnecessary.
    # CORRECT way: fit on train only
    scaler_correct = StandardScaler().fit(X_train)
    # WRONG way: fit on all data
    scaler_wrong = StandardScaler().fit(X)

    # Compare statistics
    logger.info("\nScaler statistics comparison:")
    logger.info("\nCORRECT (fitted on train only):")
    logger.info(f" Train mean: {scaler_correct.mean_[:5]}")
    logger.info(f" Train std: {scaler_correct.scale_[:5]}")

    logger.info("\nWRONG (fitted on all data):")
    logger.info(f" All mean: {scaler_wrong.mean_[:5]}")
    logger.info(f" All std: {scaler_wrong.scale_[:5]}")

    # Check difference
    mean_diff = np.abs(scaler_correct.mean_ - scaler_wrong.mean_).mean()
    std_diff = np.abs(scaler_correct.scale_ - scaler_wrong.scale_).mean()

    logger.info("\nAverage difference:")
    logger.info(f" Mean: {mean_diff:.6f}")
    logger.info(f" Std: {std_diff:.6f}")

    if mean_diff < 0.01 and std_diff < 0.01:
        logger.info("✓ Minimal difference - scaler likely fitted correctly on train only")
        return True

    logger.warning("⚠️ Significant difference detected - review scaler fitting")
    return False


def check_4_feature_contamination() -> bool:
    """Check for features that could leak information.

    Computes the absolute correlation of every feature column with the
    label and flags any feature whose value exceeds 0.9.

    Returns:
        True when no feature exceeds the 0.9 threshold, False otherwise.
    """
    _log_header("CHECK 4: FEATURE CONTAMINATION")

    X, y = _load_features_and_labels()

    # Check for suspiciously perfect features
    logger.info("\nChecking for suspiciously perfect correlations with label...")
    correlations = X.corrwith(y).abs().sort_values(ascending=False)

    logger.info("\nTop 10 features correlated with label:")
    for feat, corr in correlations.head(10).items():
        logger.info(f" {feat:30s}: {corr:.4f}")

    # Check for suspiciously high correlations (> 0.9 is suspicious)
    suspicious = correlations[correlations > 0.9]

    if len(suspicious) > 0:
        logger.warning(f"⚠️ Found {len(suspicious)} features with >0.9 correlation!")
        logger.warning(f" These might be leaking information:\n{suspicious}")
        return False

    logger.info("✓ No suspiciously high correlations detected")
    return True


def check_5_train_test_distribution() -> bool:
    """Check if train/test have similar distributions.

    Re-creates the stratified split and compares the share of rows with
    ``label == 1`` in each partition.

    Returns:
        True when the two ratios differ by less than 0.01, False otherwise.
    """
    _log_header("CHECK 5: TRAIN/TEST DISTRIBUTION SIMILARITY")

    X, y = _load_features_and_labels()

    # Only the label partitions are inspected.
    _, _, y_train, y_test = _split(X, y)

    # Check label distribution
    logger.info("\nLabel distribution:")
    logger.info(f" Train: {y_train.value_counts().to_dict()}")
    logger.info(f" Test: {y_test.value_counts().to_dict()}")

    train_phishing_ratio = (y_train == 1).sum() / len(y_train)
    test_phishing_ratio = (y_test == 1).sum() / len(y_test)
    ratio_gap = abs(train_phishing_ratio - test_phishing_ratio)

    logger.info("\nPhishing ratio:")
    logger.info(f" Train: {train_phishing_ratio:.4f}")
    logger.info(f" Test: {test_phishing_ratio:.4f}")
    logger.info(f" Difference: {ratio_gap:.4f}")

    if ratio_gap < 0.01:
        logger.info("✓ Train/test distributions are well balanced")
        return True

    logger.warning("⚠️ Train/test distributions differ significantly")
    return False


def main() -> None:
    """Run all data leakage checks.

    Each check runs in isolation: an exception in one check is logged with
    its traceback and recorded as an error (``None``) so the remaining
    checks still run. A summary with pass/fail/error counts is logged at
    the end.
    """
    _log_header("DATA LEAKAGE DETECTION", leading_blank=False)

    # (result key, name used in error messages, check function)
    checks = (
        ('duplicates', "duplicate check", check_1_duplicate_urls_in_splits),
        ('extraction_timing', "extraction timing check", check_2_feature_extraction_timing),
        ('scaler', "scaler check", check_3_scaler_fitting),
        ('contamination', "contamination check", check_4_feature_contamination),
        ('distribution', "distribution check", check_5_train_test_distribution),
    )

    results = {}
    for key, label, check in checks:
        try:
            results[key] = check()
        except Exception as e:
            # Top-level boundary: keep going, but preserve the traceback.
            logger.exception(f"Error in {label}: {e}")
            results[key] = None

    # Final summary
    _log_header("SUMMARY")

    outcomes = list(results.values())
    passed = sum(1 for v in outcomes if v is True)
    failed = sum(1 for v in outcomes if v is False)
    errors = sum(1 for v in outcomes if v is None)

    logger.info(f"\nChecks passed: {passed}")
    logger.info(f"Checks failed: {failed}")
    logger.info(f"Checks errored: {errors}")

    for check, result in results.items():
        if result is True:
            status = "✓ PASS"
        elif result is False:
            status = "❌ FAIL"
        else:
            status = "⚠️ ERROR"
        logger.info(f" {check:20s}: {status}")

    if failed == 0 and errors == 0:
        logger.info("\n🎉 ALL CHECKS PASSED - No data leakage detected!")
        logger.info("Your results are LEGITIMATE!")
    else:
        if failed > 0:
            logger.warning(f"\n⚠️ {failed} checks failed - review your pipeline!")
        if errors > 0:
            # An errored check proves nothing either way; say so explicitly
            # instead of ending the report without a verdict.
            logger.warning(f"\n⚠️ {errors} checks could not be run - fix the errors above and re-run!")

    logger.info("\n" + _BANNER)


if __name__ == "__main__":
    main()