Spaces:
Sleeping
Sleeping
| """ | |
| Data Cleaning and Quality Assurance Module | |
| This module addresses data quality issues identified by Deepchecks validation: | |
| 1. Removes duplicate samples (6.5% duplicates detected) | |
| 2. Resolves conflicting labels (8.9% samples with conflicts) | |
| 3. Ensures proper train/test split without data leakage | |
| 4. Removes highly correlated features | |
| This script should be run BEFORE training to ensure data quality. | |
| It regenerates the processed data files with cleaned data. | |
| Usage: | |
| python -m hopcroft_skill_classification_tool_competition.data_cleaning | |
| Output: | |
| - data/processed/tfidf/features_tfidf_clean.npy (cleaned training features) | |
| - data/processed/tfidf/labels_tfidf_clean.npy (cleaned training labels) | |
| - data/processed/tfidf/X_test_tfidf_clean.npy (cleaned test features) | |
| - data/processed/tfidf/Y_test_tfidf_clean.npy (cleaned test labels) | |
| """ | |
| from datetime import datetime | |
| from pathlib import Path | |
| from typing import Dict, Optional, Tuple | |
| import numpy as np | |
| import pandas as pd | |
| from sklearn.model_selection import train_test_split | |
| from hopcroft_skill_classification_tool_competition.config import PROCESSED_DATA_DIR | |
def remove_duplicates(X: np.ndarray, y: np.ndarray) -> Tuple[np.ndarray, np.ndarray, Dict]:
    """
    Remove samples whose feature vector repeats an earlier row.

    Rows are compared on every feature value (an exact full-row comparison via
    ``DataFrame.duplicated``). Only the first occurrence of each feature vector
    is kept, together with its label row, even when later copies carry
    different labels.

    Args:
        X: Feature matrix (samples x features)
        y: Label matrix (samples x labels), row-aligned with X

    Returns:
        Tuple of (cleaned_X, cleaned_y, stats_dict). When no duplicate is found
        the input arrays are returned unchanged (the same objects).
    """
    print("\n" + "=" * 80)
    print("STEP 1: REMOVING DUPLICATES")
    print("=" * 80)

    initial_samples = X.shape[0]

    # Convert the pandas result to a plain boolean ndarray so the NumPy
    # indexing below never depends on pandas index alignment.
    duplicates_mask = pd.DataFrame(X).duplicated(keep="first").to_numpy()
    n_duplicates = int(duplicates_mask.sum())

    # Guard the ratio so an empty dataset reports 0% instead of NaN
    # (same convention as the other cleaning steps in this module).
    duplicates_pct = (n_duplicates / initial_samples * 100) if initial_samples > 0 else 0.0

    print(f"Initial samples: {initial_samples:,}")
    print(f"Duplicates found: {n_duplicates:,} ({duplicates_pct:.2f}%)")

    if n_duplicates > 0:
        # Keep only the first occurrence of every feature vector
        keep_mask = ~duplicates_mask
        X_clean = X[keep_mask]
        y_clean = y[keep_mask]
        print(f"Samples after removing duplicates: {X_clean.shape[0]:,}")
        print(f"Removed: {n_duplicates:,} duplicate samples")
    else:
        X_clean = X
        y_clean = y
        print("No duplicates found")

    stats = {
        "initial_samples": int(initial_samples),
        "duplicates_found": n_duplicates,
        "duplicates_percentage": float(duplicates_pct),
        "final_samples": int(X_clean.shape[0]),
    }
    return X_clean, y_clean, stats
def resolve_conflicting_labels(
    X: np.ndarray, y: np.ndarray
) -> Tuple[np.ndarray, np.ndarray, Dict]:
    """
    Collapse samples that share a feature vector and vote on their labels.

    Rows are grouped by a 64-bit hash of their feature values. Every group
    with more than one member is counted as a "conflict group" (the count does
    not check whether the labels actually differ). For those groups each label
    column is set to its most common value (ties resolve to the smallest
    value, because ``mode`` returns its results sorted), and only the first
    row of each distinct feature vector is kept.

    Args:
        X: Feature matrix (samples x features)
        y: Label matrix (samples x labels), row-aligned with X

    Returns:
        Tuple of (cleaned_X, cleaned_y, stats_dict). ``cleaned_y`` keeps the
        dtype and dimensionality of ``y``. When no group has more than one
        member the input arrays are returned unchanged (the same objects).
    """
    print("\n" + "=" * 80)
    print("STEP 2: RESOLVING CONFLICTING LABELS")
    print("=" * 80)

    initial_samples = X.shape[0]

    df_X = pd.DataFrame(X)
    df_y = pd.DataFrame(y)

    # One hash per row, then dense integer group ids (0..G-1). Small integer
    # ids make the size count and the later label lookup cheap and avoid
    # label-based lookups on large uint64 hash values.
    feature_hashes = pd.util.hash_pandas_object(df_X, index=False)
    group_ids, _ = pd.factorize(feature_hashes)
    group_sizes = np.bincount(group_ids)

    is_multi_group = group_sizes > 1
    n_conflict_groups = int(is_multi_group.sum())
    n_conflict_samples = int(group_sizes[is_multi_group].sum())

    # Guard the ratio so an empty dataset reports 0% instead of NaN
    conflict_pct = (n_conflict_samples / initial_samples * 100) if initial_samples > 0 else 0.0

    print(f"Initial samples: {initial_samples:,}")
    print(f"Duplicate feature groups: {n_conflict_groups:,}")
    print(
        f"Samples in conflict groups: {n_conflict_samples:,} ({conflict_pct:.2f}%)"
    )

    if n_conflict_groups > 0:
        # Majority vote, computed only for rows that belong to a multi-member
        # group; singleton groups keep their own label row untouched.
        row_in_multi = is_multi_group[group_ids]
        voted = (
            df_y[row_in_multi]
            .groupby(group_ids[row_in_multi])
            .apply(lambda members: members.mode(axis=0).iloc[0])
        )

        # Keep only one sample per unique feature vector
        keep_mask = ~df_X.duplicated(keep="first").to_numpy()
        X_clean = X[keep_mask]
        y_clean = np.asarray(y)[keep_mask]  # boolean indexing returns a copy

        # Write the voted labels onto the kept representatives. Assigning into
        # y_clean casts back to y's dtype (mode() may upcast to float when it
        # pads tied columns with NaN); reshape keeps 1-D label arrays 1-D.
        kept_needs_vote = row_in_multi[keep_mask]
        kept_group_ids = group_ids[keep_mask][kept_needs_vote]
        votes = voted.loc[kept_group_ids].to_numpy()
        y_clean[kept_needs_vote] = votes.reshape(y_clean[kept_needs_vote].shape)

        print(f"Samples after conflict resolution: {X_clean.shape[0]:,}")
        print("Conflicts resolved using majority voting")
    else:
        X_clean = X
        y_clean = y
        print("No conflicting labels found")

    stats = {
        "initial_samples": int(initial_samples),
        "conflict_groups": n_conflict_groups,
        "conflict_samples": n_conflict_samples,
        "conflict_percentage": float(conflict_pct),
        "final_samples": int(X_clean.shape[0]),
    }
    return X_clean, y_clean, stats
def remove_sparse_samples(
    X: np.ndarray, y: np.ndarray, min_nnz: int = 10
) -> Tuple[np.ndarray, np.ndarray, Dict]:
    """
    Drop rows that have fewer than ``min_nnz`` non-zero feature values.

    Args:
        X: Feature matrix
        y: Label matrix, row-aligned with X
        min_nnz: Minimum number of non-zero features a row needs to be kept

    Returns:
        Tuple of (X_filtered, y_filtered, statistics_dict)
    """
    rule = "=" * 80
    print("\n" + rule)
    print(f"STEP 3: REMOVING SPARSE SAMPLES (min_nnz={min_nnz})")
    print(rule)

    total_rows = X.shape[0]
    print(f"Initial samples: {total_rows:,}")

    # Count non-zero entries per row and keep rows meeting the threshold
    nonzero_per_row = (X != 0).sum(axis=1)
    keep_rows = nonzero_per_row >= min_nnz
    X_filtered, y_filtered = X[keep_rows], y[keep_rows]

    kept_rows = X_filtered.shape[0]
    dropped_rows = total_rows - kept_rows
    # Avoid dividing by zero when the input has no rows
    if total_rows > 0:
        dropped_pct = dropped_rows / total_rows * 100
    else:
        dropped_pct = 0

    print(f"Sparse samples (< {min_nnz} features): {dropped_rows:,} ({dropped_pct:.2f}%)")
    print(f"Samples after filtering: {kept_rows:,}")

    return (
        X_filtered,
        y_filtered,
        {
            "initial_samples": int(total_rows),
            "min_nnz_threshold": min_nnz,
            "sparse_samples_removed": int(dropped_rows),
            "removal_percentage": float(dropped_pct),
            "final_samples": int(kept_rows),
        },
    )
def remove_empty_labels(
    X: np.ndarray, y: np.ndarray, min_count: int = 5
) -> Tuple[np.ndarray, np.ndarray, Dict]:
    """
    Drop label columns whose column sum is below ``min_count``.

    Only columns of ``y`` are removed; no sample is dropped and ``X`` is
    returned as-is.

    Args:
        X: Feature matrix (passed through untouched)
        y: Label matrix (samples x labels)
        min_count: Minimum column sum a label needs to be kept

    Returns:
        Tuple of (X, y_filtered, statistics_dict)
    """
    rule = "=" * 80
    print("\n" + rule)
    print(f"STEP 4: REMOVING RARE LABELS (min_count={min_count})")
    print(rule)

    total_labels = y.shape[1]
    print(f"Initial labels: {total_labels:,}")

    # Per-label occurrence count, then keep the columns meeting the threshold
    occurrences = y.sum(axis=0)
    keep_columns = occurrences >= min_count
    y_filtered = y[:, keep_columns]

    kept_labels = y_filtered.shape[1]
    dropped_labels = total_labels - kept_labels
    # Avoid dividing by zero when there are no label columns
    if total_labels > 0:
        dropped_pct = dropped_labels / total_labels * 100
    else:
        dropped_pct = 0

    print(f"Rare labels (< {min_count} occurrences): {dropped_labels:,} ({dropped_pct:.2f}%)")
    print(f"Labels after filtering: {kept_labels:,}")

    return (
        X,
        y_filtered,
        {
            "initial_labels": int(total_labels),
            "min_count_threshold": min_count,
            "rare_labels_removed": int(dropped_labels),
            "removal_percentage": float(dropped_pct),
            "final_labels": int(kept_labels),
        },
    )
def create_clean_train_test_split(
    X: np.ndarray, y: np.ndarray, test_size: float = 0.2, random_state: int = 42
) -> Tuple[np.ndarray, np.ndarray, np.ndarray, np.ndarray, Dict]:
    """
    Split the data into train and test sets and check the two do not overlap.

    When the optional ``iterstrat`` package can be imported, the split uses
    ``MultilabelStratifiedShuffleSplit``. Otherwise it falls back to
    ``train_test_split`` stratified on the first label column only (or on
    ``y`` itself when ``y`` is 1-D).

    Args:
        X: Feature matrix
        y: Label matrix
        test_size: Proportion of samples assigned to the test set (default: 0.2)
        random_state: Random seed for reproducibility

    Returns:
        Tuple of (X_train, X_test, y_train, y_test, stats_dict)

    Raises:
        ValueError: If any row hash occurs in both splits, or if the train and
            test feature dimensions differ.
    """
    rule = "=" * 80
    print("\n" + rule)
    print("STEP 5: CREATING CLEAN TRAIN/TEST SPLIT")
    print(rule)

    n_total = X.shape[0]
    print(f"Total samples: {n_total:,}")
    print(f"Test size: {test_size * 100:.1f}%")
    print(f"Random state: {random_state}")

    # Prefer iterative stratification, which handles multi-label targets
    try:
        from iterstrat.ml_stratifiers import MultilabelStratifiedShuffleSplit
    except ImportError:
        has_iterstrat = False
        print(
            "WARNING: iterative-stratification not installed. Using standard stratification (suboptimal for multi-label)."
        )
    else:
        has_iterstrat = True
        print("Using MultilabelStratifiedShuffleSplit (iterative-stratification)")

    if not has_iterstrat:
        # Approximate stratification: only the first label column drives the split
        first_label = y[:, 0] if y.ndim > 1 else y
        X_train, X_test, y_train, y_test = train_test_split(
            X, y, test_size=test_size, random_state=random_state, stratify=first_label
        )
    else:
        splitter = MultilabelStratifiedShuffleSplit(
            n_splits=1, test_size=test_size, random_state=random_state
        )
        train_rows, test_rows = next(splitter.split(X, y))
        X_train, y_train = X[train_rows], y[train_rows]
        X_test, y_test = X[test_rows], y[test_rows]

    # Leakage check: compare the sets of per-row feature hashes of both splits
    print("\nVerifying no data leakage...")
    hashes_train = set(pd.util.hash_pandas_object(pd.DataFrame(X_train), index=False))
    hashes_test = set(pd.util.hash_pandas_object(pd.DataFrame(X_test), index=False))
    shared = hashes_train.intersection(hashes_test)
    if shared:
        raise ValueError(
            f"DATA LEAKAGE DETECTED: {len(shared)} samples appear in both train and test!"
        )
    print("No data leakage detected")

    n_train = X_train.shape[0]
    n_test = X_test.shape[0]
    train_pct = n_train / n_total * 100
    test_pct = n_test / n_total * 100
    print(f"Train samples: {n_train:,} ({train_pct:.1f}%)")
    print(f"Test samples: {n_test:,} ({test_pct:.1f}%)")

    # Both splits must expose the same number of feature columns
    n_features = X_train.shape[1]
    if n_features != X_test.shape[1]:
        raise ValueError(
            f"Feature dimensions don't match: train={n_features}, test={X_test.shape[1]}"
        )
    print(f"Feature dimensions match: {n_features:,}")

    if has_iterstrat:
        method_name = "MultilabelStratifiedShuffleSplit"
    else:
        method_name = "Standard StratifiedShuffleSplit"

    stats = {
        "total_samples": int(n_total),
        "train_samples": int(n_train),
        "test_samples": int(n_test),
        "train_percentage": float(train_pct),
        "test_percentage": float(test_pct),
        "features": int(n_features),
        "labels": int(y_train.shape[1]) if y_train.ndim > 1 else 1,
        "data_leakage": False,
        "overlap_samples": 0,
        "stratification_method": method_name,
    }
    return X_train, X_test, y_train, y_test, stats
def save_cleaned_data(
    X_train: np.ndarray,
    X_test: np.ndarray,
    y_train: np.ndarray,
    y_test: np.ndarray,
    stats: Dict,
    output_dir: Optional[Path] = None,
    feature_type: str = "tfidf",
) -> None:
    """
    Write the cleaned train/test arrays to disk as ``.npy`` files.

    Four files are written into the output directory:
    ``features_{feature_type}_clean.npy``, ``labels_{feature_type}_clean.npy``,
    ``X_test_{feature_type}_clean.npy`` and ``Y_test_{feature_type}_clean.npy``.

    Args:
        X_train: Training features
        X_test: Test features
        y_train: Training labels
        y_test: Test labels
        stats: Dictionary with cleaning statistics (accepted for the caller's
            convenience; this function does not write it anywhere)
        output_dir: Output directory, created if missing
            (default: PROCESSED_DATA_DIR / feature_type)
        feature_type: Feature kind used in the file names ('tfidf' or 'embedding')
    """
    rule = "=" * 80
    print("\n" + rule)
    print("STEP 6: SAVING CLEANED DATA")
    print(rule)

    target_dir = PROCESSED_DATA_DIR / feature_type if output_dir is None else output_dir
    target_dir.mkdir(parents=True, exist_ok=True)

    # (destination, array) pairs; every file name carries the "_clean" suffix
    outputs = (
        (target_dir / f"features_{feature_type}_clean.npy", X_train),
        (target_dir / f"labels_{feature_type}_clean.npy", y_train),
        (target_dir / f"X_test_{feature_type}_clean.npy", X_test),
        (target_dir / f"Y_test_{feature_type}_clean.npy", y_test),
    )
    for destination, array in outputs:
        np.save(destination, array)

    print(f"\nSaved cleaned data to: {target_dir}")
    for destination, _ in outputs:
        print(f" - {destination.name}")
def clean_and_split_data(
    test_size: float = 0.2,
    random_state: int = 42,
    regenerate_features: bool = True,
    feature_type: str = "embedding",  # 'tfidf' or 'embedding'
    model_name: str = "all-MiniLM-L6-v2",
    max_features: int = 2000,  # Only for TF-IDF (must match features.py default)
) -> Dict:
    """
    Run the full cleaning pipeline and write a clean train/test split.

    Steps, in order:
        0. Load saved features or regenerate them via ``create_feature_dataset``
        1. Remove duplicate samples
        2. Resolve conflicting labels
        3. Remove sparse samples (TF-IDF only; skipped for other feature types)
        4. Remove rare labels
        5. Create the train/test split and verify there is no leakage
        6. Save the cleaned arrays under PROCESSED_DATA_DIR / feature_type

    Args:
        test_size: Proportion of test set (default: 0.2)
        random_state: Random seed for reproducibility (default: 42)
        regenerate_features: If True, regenerate features instead of loading
            the saved ``.npy`` files (default: True)
        feature_type: Type of features to extract ('tfidf' or 'embedding')
        model_name: Model name for embeddings
        max_features: Maximum number of TF-IDF features (default: 2000)

    Returns:
        Dictionary with the statistics of every step plus the feature type
    """
    # NOTE(review): max_features is only echoed in the banner below; it is not
    # forwarded to create_feature_dataset. Confirm whether that is intended.
    rule = "=" * 80
    timestamp = datetime.now().strftime("%Y-%m-%d %H:%M:%S")

    print(rule)
    print("DATA CLEANING AND QUALITY ASSURANCE PIPELINE")
    print(rule)
    print(f"Timestamp: {timestamp}")
    print(f"Test size: {test_size * 100:.1f}%")
    print(f"Random state: {random_state}")
    print(f"Regenerate features: {regenerate_features}")
    print(f"Feature type: {feature_type}")
    if feature_type != "embedding":
        print(f"Max features: {max_features}")
    else:
        print(f"Model name: {model_name}")

    # Step 0: obtain the raw feature and label matrices
    if not regenerate_features:
        print(f"\nLoading existing features ({feature_type})...")
        source_dir = PROCESSED_DATA_DIR / feature_type
        X = np.load(source_dir / f"features_{feature_type}.npy")
        y = np.load(source_dir / f"labels_{feature_type}.npy")
    else:
        print("\nRegenerating features from database...")
        # Imported lazily so the feature module is only needed when regenerating
        from hopcroft_skill_classification_tool_competition.features import create_feature_dataset

        features, labels, _, _ = create_feature_dataset(
            save_processed=False,  # only the returned arrays are needed here
            feature_type=feature_type,
            model_name=model_name,
        )
        X = features
        y = labels.values

    print("\nInitial data shape:")
    print(f" Features: {X.shape}")
    print(f" Labels: {y.shape}")

    # Step 1: Remove duplicates
    X_dedup, y_dedup, dup_stats = remove_duplicates(X, y)

    # Step 2: Resolve conflicting labels
    # NOTE(review): step 1 already drops every row whose feature vector repeats
    # an earlier one, so this step will normally find no group with more than
    # one member. Confirm the intended order of steps 1 and 2.
    X_voted, y_voted, conflict_stats = resolve_conflicting_labels(X_dedup, y_dedup)

    # Step 3: Remove sparse samples (only applied to TF-IDF features)
    if feature_type != "tfidf":
        X_dense, y_dense = X_voted, y_voted
        sparse_stats = {"sparse_samples_removed": 0, "removal_percentage": 0.0}
        print("\nSkipping sparse sample removal for dense embeddings.")
    else:
        X_dense, y_dense, sparse_stats = remove_sparse_samples(X_voted, y_voted, min_nnz=10)

    # Step 4: Remove rare labels
    X_clean, y_clean, rare_stats = remove_empty_labels(X_dense, y_dense, min_count=5)

    # Step 5: Create clean train/test split
    X_train, X_test, y_train, y_test, split_stats = create_clean_train_test_split(
        X_clean, y_clean, test_size=test_size, random_state=random_state
    )

    # Step 6: Save cleaned data into the directory for this feature type
    all_stats = {
        "duplicates": dup_stats,
        "conflicts": conflict_stats,
        "sparse_samples": sparse_stats,
        "rare_labels": rare_stats,
        "split": split_stats,
        "feature_type": feature_type,
    }
    save_cleaned_data(
        X_train,
        X_test,
        y_train,
        y_test,
        all_stats,
        output_dir=PROCESSED_DATA_DIR / feature_type,
        feature_type=feature_type,
    )

    # Final summary, assembled first and then printed line by line
    summary_lines = [
        "\n" + rule,
        "CLEANING PIPELINE COMPLETED SUCCESSFULLY",
        rule,
        "\nSummary:",
        f" Original samples: {X.shape[0]:,}",
        f" Original labels: {y.shape[1]:,}",
        f" Duplicates removed: {dup_stats['duplicates_found']:,} ({dup_stats['duplicates_percentage']:.2f}%)",
        f" Conflicts resolved: {conflict_stats['conflict_samples']:,} ({conflict_stats['conflict_percentage']:.2f}%)",
        f" Sparse samples removed: {sparse_stats['sparse_samples_removed']:,} ({sparse_stats['removal_percentage']:.2f}%)",
        f" Rare labels removed: {rare_stats['rare_labels_removed']:,} ({rare_stats['removal_percentage']:.2f}%)",
        f" Final clean samples: {split_stats['total_samples']:,}",
        f" Final clean labels: {y_clean.shape[1]:,}",
        f" Train samples: {split_stats['train_samples']:,} ({split_stats['train_percentage']:.1f}%)",
        f" Test samples: {split_stats['test_samples']:,} ({split_stats['test_percentage']:.1f}%)",
        "\nData quality issues resolved:",
        " - Duplicates removed",
        " - Label conflicts resolved",
    ]
    if feature_type == "tfidf":
        summary_lines.append(" - Sparse samples removed")
    summary_lines.extend(
        [
            " - Rare labels removed",
            " - Clean train/test split created",
            " - No data leakage verified",
            rule,
        ]
    )
    for line in summary_lines:
        print(line)

    return all_stats
if __name__ == "__main__":
    # Script entry point: regenerate sentence-embedding features and run the
    # whole cleaning pipeline with an 80/20 train/test split.
    stats = clean_and_split_data(
        feature_type="embedding",
        model_name="all-MiniLM-L6-v2",
        regenerate_features=True,
        test_size=0.2,
        random_state=42,
    )