|
|
""" |
|
|
Data Cleaning and Quality Assurance Module |
|
|
|
|
|
This module addresses data quality issues identified by Deepchecks validation: |
|
|
1. Removes duplicate samples (6.5% duplicates detected) |
|
|
2. Resolves conflicting labels (8.9% samples with conflicts) |
|
|
3. Ensures proper train/test split without data leakage |
|
|
4. Removes highly correlated features |
|
|
|
|
|
This script should be run BEFORE training to ensure data quality. |
|
|
It regenerates the processed data files with cleaned data. |
|
|
|
|
|
Usage: |
|
|
python -m hopcroft_skill_classification_tool_competition.data_cleaning |
|
|
|
|
|
Output: |
|
|
- data/processed/tfidf/features_tfidf_clean.npy (cleaned training features) |
|
|
- data/processed/tfidf/labels_tfidf_clean.npy (cleaned training labels) |
|
|
- data/processed/tfidf/X_test_clean.npy (cleaned test features) |
|
|
- data/processed/tfidf/Y_test_clean.npy (cleaned test labels) |
|
|
""" |
|
|
|
|
|
from datetime import datetime |
|
|
from pathlib import Path |
|
|
from typing import Dict, Optional, Tuple |
|
|
|
|
|
import numpy as np |
|
|
import pandas as pd |
|
|
from sklearn.model_selection import train_test_split |
|
|
|
|
|
from hopcroft_skill_classification_tool_competition.config import PROCESSED_DATA_DIR |
|
|
|
|
|
|
|
|
def remove_duplicates(X: np.ndarray, y: np.ndarray) -> Tuple[np.ndarray, np.ndarray, Dict]:
    """
    Remove duplicate samples from the dataset.

    Duplicates are identified by identical feature vectors. When duplicates
    are found with different labels, the first occurrence is kept (label
    disagreements are handled separately by ``resolve_conflicting_labels``).

    Args:
        X: Feature matrix (samples x features)
        y: Label matrix (samples x labels)

    Returns:
        Tuple of (cleaned_X, cleaned_y, stats_dict)
    """
    print("\n" + "=" * 80)
    print("STEP 1: REMOVING DUPLICATES")
    print("=" * 80)

    initial_samples = X.shape[0]

    # pandas' duplicated() gives a vectorized row-wise duplicate check;
    # keep="first" marks every occurrence after the first as a duplicate.
    df_features = pd.DataFrame(X)
    duplicates_mask = df_features.duplicated(keep="first")
    n_duplicates = duplicates_mask.sum()

    # Guard against an empty dataset: 0/0 with numpy scalars yields nan
    # instead of a sensible percentage (consistent with remove_sparse_samples
    # and remove_empty_labels, which already guard this).
    dup_pct = (n_duplicates / initial_samples * 100) if initial_samples > 0 else 0.0

    print(f"Initial samples: {initial_samples:,}")
    print(f"Duplicates found: {n_duplicates:,} ({dup_pct:.2f}%)")

    if n_duplicates > 0:
        X_clean = X[~duplicates_mask]
        y_clean = y[~duplicates_mask]

        print(f"Samples after removing duplicates: {X_clean.shape[0]:,}")
        print(f"Removed: {n_duplicates:,} duplicate samples")
    else:
        X_clean = X
        y_clean = y
        print("No duplicates found")

    stats = {
        "initial_samples": int(initial_samples),
        "duplicates_found": int(n_duplicates),
        "duplicates_percentage": float(dup_pct),
        "final_samples": int(X_clean.shape[0]),
    }

    return X_clean, y_clean, stats
|
|
|
|
|
|
|
|
def resolve_conflicting_labels(
    X: np.ndarray, y: np.ndarray
) -> Tuple[np.ndarray, np.ndarray, Dict]:
    """
    Resolve samples with conflicting labels.

    Conflicting labels occur when identical feature vectors have different labels.
    Resolution strategy: Use majority voting for each label across duplicates.

    NOTE(review): in ``clean_and_split_data`` this runs AFTER
    ``remove_duplicates``, which already drops rows with identical feature
    vectors — so in the normal pipeline order no conflict groups should
    remain here. Confirm the intended step order.

    Args:
        X: Feature matrix (samples x features)
        y: Label matrix (samples x labels)

    Returns:
        Tuple of (cleaned_X, cleaned_y, stats_dict)
    """
    print("\n" + "=" * 80)
    print("STEP 2: RESOLVING CONFLICTING LABELS")
    print("=" * 80)

    initial_samples = X.shape[0]

    df_X = pd.DataFrame(X)
    df_y = pd.DataFrame(y)

    # One hash per row of X; identical feature vectors hash identically, so
    # the hash acts as a group key for duplicate feature vectors.
    feature_hashes = pd.util.hash_pandas_object(df_X, index=False)

    # Group the LABEL rows by their feature-vector hash.
    groups = df_y.groupby(feature_hashes)

    # Groups with more than one member are duplicate feature vectors whose
    # labels may disagree.
    conflicts = groups.size()
    n_conflict_groups = (conflicts > 1).sum()
    n_conflict_samples = (conflicts[conflicts > 1]).sum()

    print(f"Initial samples: {initial_samples:,}")
    print(f"Duplicate feature groups: {n_conflict_groups:,}")
    print(
        f"Samples in conflict groups: {n_conflict_samples:,} ({n_conflict_samples / initial_samples * 100:.2f}%)"
    )

    if n_conflict_groups > 0:
        # Per-column majority vote within each group; DataFrame.mode() returns
        # its result rows in sorted order, so .iloc[0] breaks ties by taking
        # the first (smallest) mode.
        resolved_labels = groups.apply(
            lambda x: x.mode(axis=0).iloc[0] if len(x) > 1 else x.iloc[0]
        )

        # Keep only the first occurrence of each distinct feature vector.
        unique_indices = ~df_X.duplicated(keep="first")
        X_clean = X[unique_indices]

        # Re-align the voted labels with the surviving rows via their hashes.
        unique_hashes = feature_hashes[unique_indices]
        y_clean = np.array([resolved_labels.loc[h].values for h in unique_hashes])

        print(f"Samples after conflict resolution: {X_clean.shape[0]:,}")
        print("Conflicts resolved using majority voting")
    else:
        # Nothing to resolve — pass the data through untouched.
        X_clean = X
        y_clean = y
        print("No conflicting labels found")

    stats = {
        "initial_samples": int(initial_samples),
        "conflict_groups": int(n_conflict_groups),
        "conflict_samples": int(n_conflict_samples),
        "conflict_percentage": float(n_conflict_samples / initial_samples * 100),
        "final_samples": int(X_clean.shape[0]),
    }

    return X_clean, y_clean, stats
|
|
|
|
|
|
|
|
def remove_sparse_samples(
    X: np.ndarray, y: np.ndarray, min_nnz: int = 10
) -> Tuple[np.ndarray, np.ndarray, Dict]:
    """
    Drop samples whose feature vectors have too few non-zero entries.

    Very sparse rows are incompatible with SMOTE-style oversampling, so any
    sample with fewer than ``min_nnz`` non-zero features is filtered out.

    Args:
        X: Feature matrix
        y: Label matrix
        min_nnz: Minimum number of non-zero features required

    Returns:
        Tuple of (X_filtered, y_filtered, statistics_dict)
    """
    print("\n" + "=" * 80)
    print(f"STEP 3: REMOVING SPARSE SAMPLES (min_nnz={min_nnz})")
    print("=" * 80)

    total_before = X.shape[0]
    print(f"Initial samples: {total_before:,}")

    # Count non-zero entries per row and keep only sufficiently dense rows.
    keep_mask = np.count_nonzero(X, axis=1) >= min_nnz
    X_kept, y_kept = X[keep_mask], y[keep_mask]

    dropped = total_before - X_kept.shape[0]
    # Avoid division by zero when the incoming dataset is empty.
    dropped_pct = 0 if total_before == 0 else dropped / total_before * 100

    print(f"Sparse samples (< {min_nnz} features): {dropped:,} ({dropped_pct:.2f}%)")
    print(f"Samples after filtering: {X_kept.shape[0]:,}")

    stats = {
        "initial_samples": int(total_before),
        "min_nnz_threshold": min_nnz,
        "sparse_samples_removed": int(dropped),
        "removal_percentage": float(dropped_pct),
        "final_samples": int(X_kept.shape[0]),
    }

    return X_kept, y_kept, stats
|
|
|
|
|
|
|
|
def remove_empty_labels(
    X: np.ndarray, y: np.ndarray, min_count: int = 5
) -> Tuple[np.ndarray, np.ndarray, Dict]:
    """
    Drop label columns that occur fewer than ``min_count`` times.

    Extremely rare labels cannot be stratified during the train/test split,
    so their columns are removed from the label matrix. The feature matrix
    is returned unchanged.

    Args:
        X: Feature matrix
        y: Label matrix
        min_count: Minimum number of occurrences required per label

    Returns:
        Tuple of (X_same, y_filtered, statistics_dict)
    """
    print("\n" + "=" * 80)
    print(f"STEP 4: REMOVING RARE LABELS (min_count={min_count})")
    print("=" * 80)

    labels_before = y.shape[1]
    print(f"Initial labels: {labels_before:,}")

    # Per-label occurrence counts; a column survives if it appears often enough.
    keep_columns = y.sum(axis=0) >= min_count
    y_kept = y[:, keep_columns]

    dropped = labels_before - y_kept.shape[1]
    # Avoid division by zero when there are no label columns at all.
    dropped_pct = dropped / labels_before * 100 if labels_before > 0 else 0

    print(f"Rare labels (< {min_count} occurrences): {dropped:,} ({dropped_pct:.2f}%)")
    print(f"Labels after filtering: {y_kept.shape[1]:,}")

    stats = {
        "initial_labels": int(labels_before),
        "min_count_threshold": min_count,
        "rare_labels_removed": int(dropped),
        "removal_percentage": float(dropped_pct),
        "final_labels": int(y_kept.shape[1]),
    }

    return X, y_kept, stats
|
|
|
|
|
|
|
|
def create_clean_train_test_split(
    X: np.ndarray, y: np.ndarray, test_size: float = 0.2, random_state: int = 42
) -> Tuple[np.ndarray, np.ndarray, np.ndarray, np.ndarray, Dict]:
    """
    Split into train/test and verify that no sample leaks across the split.

    Prefers MultilabelStratifiedShuffleSplit (iterative-stratification) when
    installed; otherwise falls back to sklearn's train_test_split stratified
    on the first label column only.

    Args:
        X: Feature matrix
        y: Label matrix
        test_size: Proportion of test set (default: 0.2 = 20%)
        random_state: Random seed for reproducibility

    Returns:
        Tuple of (X_train, X_test, y_train, y_test, stats_dict)

    Raises:
        ValueError: if identical samples appear in both splits, or if the
            train/test feature dimensions disagree.
    """
    print("\n" + "=" * 80)
    print("STEP 5: CREATING CLEAN TRAIN/TEST SPLIT")
    print("=" * 80)

    print(f"Total samples: {X.shape[0]:,}")
    print(f"Test size: {test_size * 100:.1f}%")
    print(f"Random state: {random_state}")

    # iterative-stratification is an optional dependency; degrade gracefully.
    try:
        from iterstrat.ml_stratifiers import MultilabelStratifiedShuffleSplit

        has_iterstrat = True
        print("Using MultilabelStratifiedShuffleSplit (iterative-stratification)")
    except ImportError:
        has_iterstrat = False
        print(
            "WARNING: iterative-stratification not installed. Using standard stratification (suboptimal for multi-label)."
        )

    if has_iterstrat:
        splitter = MultilabelStratifiedShuffleSplit(
            n_splits=1, test_size=test_size, random_state=random_state
        )
        train_idx, test_idx = next(splitter.split(X, y))
        X_train, y_train = X[train_idx], y[train_idx]
        X_test, y_test = X[test_idx], y[test_idx]
    else:
        # Multi-label stratification unavailable: approximate by stratifying
        # on the first label column only.
        strat = y[:, 0] if y.ndim > 1 else y
        X_train, X_test, y_train, y_test = train_test_split(
            X, y, test_size=test_size, random_state=random_state, stratify=strat
        )

    print("\nVerifying no data leakage...")

    # Hash every row so the overlap check is a set intersection, not O(n^2).
    hashes_train = set(pd.util.hash_pandas_object(pd.DataFrame(X_train), index=False))
    hashes_test = set(pd.util.hash_pandas_object(pd.DataFrame(X_test), index=False))

    shared = hashes_train & hashes_test
    if len(shared) > 0:
        raise ValueError(
            f"DATA LEAKAGE DETECTED: {len(shared)} samples appear in both train and test!"
        )

    print("No data leakage detected")
    print(f"Train samples: {X_train.shape[0]:,} ({X_train.shape[0] / X.shape[0] * 100:.1f}%)")
    print(f"Test samples: {X_test.shape[0]:,} ({X_test.shape[0] / X.shape[0] * 100:.1f}%)")

    # Sanity check: both splits must carry the same feature dimensionality.
    if X_train.shape[1] != X_test.shape[1]:
        raise ValueError(
            f"Feature dimensions don't match: train={X_train.shape[1]}, test={X_test.shape[1]}"
        )

    print(f"Feature dimensions match: {X_train.shape[1]:,}")

    stats = {
        "total_samples": int(X.shape[0]),
        "train_samples": int(X_train.shape[0]),
        "test_samples": int(X_test.shape[0]),
        "train_percentage": float(X_train.shape[0] / X.shape[0] * 100),
        "test_percentage": float(X_test.shape[0] / X.shape[0] * 100),
        "features": int(X_train.shape[1]),
        "labels": int(y_train.shape[1]) if y_train.ndim > 1 else 1,
        "data_leakage": False,
        # The function raises on any overlap, so a returned stats dict always
        # reports zero overlapping samples.
        "overlap_samples": 0,
        "stratification_method": "MultilabelStratifiedShuffleSplit"
        if has_iterstrat
        else "Standard StratifiedShuffleSplit",
    }

    return X_train, X_test, y_train, y_test, stats
|
|
|
|
|
|
|
|
def save_cleaned_data(
    X_train: np.ndarray,
    X_test: np.ndarray,
    y_train: np.ndarray,
    y_test: np.ndarray,
    stats: Dict,
    output_dir: Optional[Path] = None,
    feature_type: str = "tfidf",
) -> None:
    """
    Save cleaned train/test split and the cleaning statistics to disk.

    Args:
        X_train: Training features
        X_test: Test features
        y_train: Training labels
        y_test: Test labels
        stats: Dictionary with cleaning statistics (written as cleaning_stats.json)
        output_dir: Output directory (default: data/processed/{feature_type}/)
        feature_type: Type of features ('tfidf' or 'embedding')
    """
    import json  # local import: only needed here

    print("\n" + "=" * 80)
    print("STEP 6: SAVING CLEANED DATA")
    print("=" * 80)

    if output_dir is None:
        output_dir = PROCESSED_DATA_DIR / feature_type

    output_dir.mkdir(parents=True, exist_ok=True)

    files = {
        "features_train": output_dir / f"features_{feature_type}_clean.npy",
        "labels_train": output_dir / f"labels_{feature_type}_clean.npy",
        "features_test": output_dir / f"X_test_{feature_type}_clean.npy",
        "labels_test": output_dir / f"Y_test_{feature_type}_clean.npy",
    }

    np.save(files["features_train"], X_train)
    np.save(files["labels_train"], y_train)
    np.save(files["features_test"], X_test)
    np.save(files["labels_test"], y_test)

    # Fix: `stats` was previously accepted but never written anywhere.
    # Persist it alongside the arrays so each cleaning run is auditable.
    # default=str covers any non-JSON-native values (e.g. numpy scalars).
    stats_path = output_dir / "cleaning_stats.json"
    stats_path.write_text(json.dumps(stats, indent=2, default=str))

    print(f"\nSaved cleaned data to: {output_dir}")
    for name, path in files.items():
        print(f" - {path.name}")
    print(f" - {stats_path.name}")
|
|
|
|
|
|
|
|
def clean_and_split_data(
    test_size: float = 0.2,
    random_state: int = 42,
    regenerate_features: bool = True,
    feature_type: str = "embedding",
    model_name: str = "all-MiniLM-L6-v2",
    max_features: int = 2000,
) -> Dict:
    """
    Main function to clean data and create proper train/test split.

    This function:
    1. Loads or regenerates features (TF-IDF or Embeddings)
    2. Removes duplicate samples
    3. Resolves conflicting labels
    4. Removes sparse samples (TF-IDF only) and rare labels
    5. Creates clean train/test split and verifies no data leakage
    6. Saves cleaned data

    Args:
        test_size: Proportion of test set (default: 0.2)
        random_state: Random seed for reproducibility (default: 42)
        regenerate_features: If True, regenerate features from database (default: True)
        feature_type: Type of features to extract ('tfidf' or 'embedding')
        model_name: Model name for embeddings
        max_features: Maximum number of TF-IDF features (default: 2000).
            NOTE(review): only echoed in the banner below; it is never
            forwarded to create_feature_dataset — confirm whether it should be.

    Returns:
        Dictionary with all cleaning statistics
    """
    print("=" * 80)
    print("DATA CLEANING AND QUALITY ASSURANCE PIPELINE")
    print("=" * 80)
    print(f"Timestamp: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}")
    print(f"Test size: {test_size * 100:.1f}%")
    print(f"Random state: {random_state}")
    print(f"Regenerate features: {regenerate_features}")
    print(f"Feature type: {feature_type}")
    if feature_type == "embedding":
        print(f"Model name: {model_name}")
    else:
        print(f"Max features: {max_features}")

    # Step 0: obtain the raw feature/label matrices, either freshly extracted
    # from the database or loaded from previously processed .npy files.
    if regenerate_features:
        print("\nRegenerating features from database...")

        # Imported lazily so simply importing this module doesn't pull in the
        # (heavy) feature-extraction stack.
        from hopcroft_skill_classification_tool_competition.features import create_feature_dataset

        features, labels, _, _ = create_feature_dataset(
            save_processed=False,
            feature_type=feature_type,
            model_name=model_name,
        )

        X = features
        y = labels.values
    else:
        print(f"\nLoading existing features ({feature_type})...")
        data_dir = PROCESSED_DATA_DIR / feature_type
        X = np.load(data_dir / f"features_{feature_type}.npy")
        y = np.load(data_dir / f"labels_{feature_type}.npy")

    print("\nInitial data shape:")
    print(f" Features: {X.shape}")
    print(f" Labels: {y.shape}")

    # Step 1: drop exact duplicate feature rows (first occurrence kept).
    X_no_dup, y_no_dup, dup_stats = remove_duplicates(X, y)

    # Step 2: majority-vote labels for any remaining identical feature rows.
    X_no_conf, y_no_conf, conflict_stats = resolve_conflicting_labels(X_no_dup, y_no_dup)

    # Step 3: sparse-sample filtering only makes sense for TF-IDF vectors;
    # dense embeddings have essentially no zero entries, so skip it there.
    if feature_type == "tfidf":
        X_no_sparse, y_no_sparse, sparse_stats = remove_sparse_samples(
            X_no_conf, y_no_conf, min_nnz=10
        )
    else:

        X_no_sparse, y_no_sparse = X_no_conf, y_no_conf
        sparse_stats = {"sparse_samples_removed": 0, "removal_percentage": 0.0}
        print("\nSkipping sparse sample removal for dense embeddings.")

    # Step 4: drop label columns too rare to stratify on.
    X_clean, y_clean, rare_stats = remove_empty_labels(X_no_sparse, y_no_sparse, min_count=5)

    # Step 5: stratified train/test split with a leakage check.
    X_train, X_test, y_train, y_test, split_stats = create_clean_train_test_split(
        X_clean, y_clean, test_size=test_size, random_state=random_state
    )

    # Aggregate per-step statistics for the caller and for saving.
    all_stats = {
        "duplicates": dup_stats,
        "conflicts": conflict_stats,
        "sparse_samples": sparse_stats,
        "rare_labels": rare_stats,
        "split": split_stats,
        "feature_type": feature_type,
    }

    # Step 6: persist the cleaned split under data/processed/{feature_type}/.
    output_dir = PROCESSED_DATA_DIR / feature_type
    save_cleaned_data(
        X_train,
        X_test,
        y_train,
        y_test,
        all_stats,
        output_dir=output_dir,
        feature_type=feature_type,
    )

    # Final human-readable summary of everything that was done.
    print("\n" + "=" * 80)
    print("CLEANING PIPELINE COMPLETED SUCCESSFULLY")
    print("=" * 80)
    print("\nSummary:")
    print(f" Original samples: {X.shape[0]:,}")
    print(f" Original labels: {y.shape[1]:,}")
    print(
        f" Duplicates removed: {dup_stats['duplicates_found']:,} ({dup_stats['duplicates_percentage']:.2f}%)"
    )
    print(
        f" Conflicts resolved: {conflict_stats['conflict_samples']:,} ({conflict_stats['conflict_percentage']:.2f}%)"
    )
    print(
        f" Sparse samples removed: {sparse_stats['sparse_samples_removed']:,} ({sparse_stats['removal_percentage']:.2f}%)"
    )
    print(
        f" Rare labels removed: {rare_stats['rare_labels_removed']:,} ({rare_stats['removal_percentage']:.2f}%)"
    )
    print(f" Final clean samples: {split_stats['total_samples']:,}")
    print(f" Final clean labels: {y_clean.shape[1]:,}")
    print(
        f" Train samples: {split_stats['train_samples']:,} ({split_stats['train_percentage']:.1f}%)"
    )
    print(
        f" Test samples: {split_stats['test_samples']:,} ({split_stats['test_percentage']:.1f}%)"
    )
    print("\nData quality issues resolved:")
    print(" - Duplicates removed")
    print(" - Label conflicts resolved")
    if feature_type == "tfidf":
        print(" - Sparse samples removed")
    print(" - Rare labels removed")
    print(" - Clean train/test split created")
    print(" - No data leakage verified")
    print("=" * 80)

    return all_stats
|
|
|
|
|
|
|
|
if __name__ == "__main__": |
|
|
|
|
|
stats = clean_and_split_data( |
|
|
test_size=0.2, |
|
|
random_state=42, |
|
|
regenerate_features=True, |
|
|
feature_type="embedding", |
|
|
model_name="all-MiniLM-L6-v2", |
|
|
) |
|
|
|