"""Training script for salary prediction model.""" import pickle from pathlib import Path import pandas as pd import numpy as np import yaml from xgboost import XGBRegressor from sklearn.model_selection import KFold, train_test_split from src.preprocessing import prepare_features, reduce_cardinality CATEGORICAL_FEATURES = [ "Country", "EdLevel", "DevType", "Industry", "Age", "ICorPM", "OrgSize", "Employment", ] def filter_salaries(df: pd.DataFrame, config: dict) -> pd.DataFrame: """Filter rows by minimum salary and per-country percentile outlier removal. Args: df: DataFrame with ConvertedCompYearly and Country columns. config: Config dict with data.min_salary, data.lower_percentile, data.upper_percentile. Returns: Filtered DataFrame with outliers removed. """ main_label = "ConvertedCompYearly" min_salary = config["data"]["min_salary"] df = df[df[main_label] > min_salary] lower_pct = config["data"]["lower_percentile"] / 100 upper_pct = config["data"]["upper_percentile"] / 100 lower_bound = df.groupby("Country")[main_label].transform("quantile", lower_pct) upper_bound = df.groupby("Country")[main_label].transform("quantile", upper_pct) df = df[(df[main_label] > lower_bound) & (df[main_label] < upper_bound)] df = df.dropna(subset=[main_label]) return df def apply_cardinality_reduction(df: pd.DataFrame) -> pd.DataFrame: """Apply cardinality reduction and unicode normalization to categorical columns. Args: df: DataFrame with categorical feature columns. Returns: DataFrame with reduced cardinality categories. """ df = df.copy() for col in CATEGORICAL_FEATURES: df[col] = df[col].str.replace("\u2019", "'", regex=False) df[col] = reduce_cardinality(df[col]) return df def drop_other_rows(df: pd.DataFrame, config: dict) -> pd.DataFrame: """Drop rows where specified features have the 'Other' catch-all category. Args: df: DataFrame with categorical feature columns (after cardinality reduction). config: Config dict with features.cardinality.other_category and features.cardinality.drop_other_from. Returns: DataFrame with 'Other' rows removed from specified features. """ other_name = config["features"]["cardinality"].get("other_category", "Other") drop_other_from = config["features"]["cardinality"].get("drop_other_from", []) for col in drop_other_from: df = df[df[col] != other_name] return df def extract_valid_categories(df: pd.DataFrame) -> dict: """Extract sorted unique values per categorical feature for inference validation. Args: df: DataFrame with categorical feature columns (after cardinality reduction). Returns: Dict mapping feature name to sorted list of valid category values. """ return { col: sorted(df[col].dropna().unique().tolist()) for col in CATEGORICAL_FEATURES } def compute_currency_rates(df: pd.DataFrame, valid_countries: list[str]) -> dict: """Compute median currency conversion rates per country. Args: df: DataFrame with Country, Currency, CompTotal, ConvertedCompYearly columns. valid_countries: List of valid country names to compute rates for. Returns: Dict mapping country to {code, name, rate}. """ main_label = "ConvertedCompYearly" currency_df = df[["Country", "Currency", "CompTotal", main_label]].dropna() currency_df = currency_df.copy() currency_df["CurrencyCode"] = currency_df["Currency"].str.split(r"\s+", n=1).str[0] currency_df["CurrencyName"] = currency_df["Currency"].str.split(r"\s+", n=1).str[1] currency_df["rate"] = currency_df["CompTotal"] / currency_df[main_label] currency_df = currency_df[ (currency_df["rate"] > 0.001) & (currency_df["rate"] < 100000) ] currency_rates = {} for country in valid_countries: country_data = currency_df[currency_df["Country"] == country] if country_data.empty: continue most_common = country_data["CurrencyCode"].mode() if most_common.empty: continue code = most_common.iloc[0] name_row = country_data[country_data["CurrencyCode"] == code].iloc[0] full_name = name_row["CurrencyName"] rates = country_data[country_data["CurrencyCode"] == code]["rate"] median_rate = round(float(rates.median()), 2) currency_rates[country] = { "code": code, "name": full_name, "rate": median_rate, } return currency_rates def main(): """Train and save the salary prediction model.""" # Load configuration print("Loading configuration...") config_path = Path("config/model_parameters.yaml") with open(config_path, "r") as f: config = yaml.safe_load(f) print("Loading data...") data_path = Path("data/survey_results_public.csv") if not data_path.exists(): print(f"Error: Data file not found at {data_path}") print( "Please download the Stack Overflow Developer Survey CSV and place it in the data/ directory." ) print("Download from: https://insights.stackoverflow.com/survey") return # Load only required columns to save memory df = pd.read_csv( data_path, usecols=[ "Country", "YearsCode", "WorkExp", "EdLevel", "DevType", "Industry", "Age", "ICorPM", "OrgSize", "Employment", "Currency", "CompTotal", "ConvertedCompYearly", ], ) print(f"Loaded {len(df):,} rows") print("Removing null, extremely small and large reported salaries") df = filter_salaries(df, config) print(f"After filtering: {len(df):,} rows") # Apply cardinality reduction to a copy (for valid_categories extraction) # and to the main df (for training) df_copy = apply_cardinality_reduction(df) df = apply_cardinality_reduction(df) # Drop rows with "Other" in specified features (low-quality catch-all categories) before_drop = len(df) df = drop_other_rows(df, config) df_copy = drop_other_rows(df_copy, config) drop_other_from = config["features"]["cardinality"].get("drop_other_from", []) if drop_other_from: print( f"Dropped {before_drop - len(df):,} rows with 'Other' in {drop_other_from}" ) print(f"After dropping 'Other': {len(df):,} rows") # Now apply full feature transformations for model training main_label = "ConvertedCompYearly" X = prepare_features(df) y = df[main_label] # Save valid categories after cardinality reduction for validation during inference valid_categories = extract_valid_categories(df_copy) valid_categories_path = Path("config/valid_categories.yaml") with open(valid_categories_path, "w") as f: yaml.dump(valid_categories, f, default_flow_style=False, sort_keys=False) print( f"\nSaved {len(valid_categories['Country'])} valid countries, {len(valid_categories['EdLevel'])} valid education levels, {len(valid_categories['DevType'])} valid developer types, {len(valid_categories['Industry'])} valid industries, {len(valid_categories['Age'])} valid age ranges, and {len(valid_categories['ICorPM'])} valid IC/PM values to {valid_categories_path}" ) # Compute currency conversion rates per country print("\nComputing currency conversion rates per country...") currency_rates = compute_currency_rates(df, valid_categories["Country"]) currency_rates_path = Path("config/currency_rates.yaml") with open(currency_rates_path, "w") as f: yaml.dump( currency_rates, f, default_flow_style=False, sort_keys=True, allow_unicode=True, ) print( f"Saved currency rates for {len(currency_rates)} countries to {currency_rates_path}" ) for country, info in sorted(currency_rates.items()): print( f" {country:45s} -> {info['code']} ({info['name']}, rate: {info['rate']})" ) print(f"\nFeature matrix shape: {X.shape}") print(f"Total features: {X.shape[1]}") # Display feature information for debugging and inference comparison print("\n" + "=" * 60) print("FEATURE ANALYSIS (for comparing with inference)") print("=" * 60) # Show top countries in the dataset print("\nšŸ“ Top 10 Countries:") top_countries = df["Country"].value_counts().head(10) for country, count in top_countries.items(): print(f" - {country}: {count:,} ({count / len(df) * 100:.1f}%)") # Show top education levels print("\nšŸŽ“ Top Education Levels:") top_edu = df["EdLevel"].value_counts().head(10) for edu, count in top_edu.items(): print(f" - {edu}: {count:,} ({count / len(df) * 100:.1f}%)") # Show top developer types print("\nšŸ‘Øā€šŸ’» Top Developer Types:") top_devtype = df["DevType"].value_counts().head(10) for devtype, count in top_devtype.items(): print(f" - {devtype}: {count:,} ({count / len(df) * 100:.1f}%)") # Show top industries print("\nšŸ¢ Top Industries:") top_industry = df["Industry"].value_counts().head(10) for industry, count in top_industry.items(): print(f" - {industry}: {count:,} ({count / len(df) * 100:.1f}%)") # Show age distribution print("\nšŸŽ‚ Age Distribution:") top_age = df["Age"].value_counts().head(10) for age, count in top_age.items(): print(f" - {age}: {count:,} ({count / len(df) * 100:.1f}%)") # Show IC or PM distribution print("\nšŸ‘„ IC or PM Distribution:") top_icorpm = df["ICorPM"].value_counts().head(10) for icorpm, count in top_icorpm.items(): print(f" - {icorpm}: {count:,} ({count / len(df) * 100:.1f}%)") # Show employment distribution print("\nšŸ’¼ Employment Distribution:") top_employment = df["Employment"].value_counts() for emp, count in top_employment.items(): print(f" - {emp}: {count:,} ({count / len(df) * 100:.1f}%)") # Show YearsCode statistics print("\nšŸ’¼ Years of Coding Experience:") print(f" - Min: {df['YearsCode'].min():.1f}") print(f" - Max: {df['YearsCode'].max():.1f}") print(f" - Mean: {df['YearsCode'].mean():.1f}") print(f" - Median: {df['YearsCode'].median():.1f}") print(f" - 25th percentile: {df['YearsCode'].quantile(0.25):.1f}") print(f" - 75th percentile: {df['YearsCode'].quantile(0.75):.1f}") # Show WorkExp statistics print("\nšŸ’¼ Years of Professional Work Experience:") print(f" - Min: {df['WorkExp'].min():.1f}") print(f" - Max: {df['WorkExp'].max():.1f}") print(f" - Mean: {df['WorkExp'].mean():.1f}") print(f" - Median: {df['WorkExp'].median():.1f}") print(f" - 25th percentile: {df['WorkExp'].quantile(0.25):.1f}") print(f" - 75th percentile: {df['WorkExp'].quantile(0.75):.1f}") # Show most common one-hot encoded features (by frequency) # Separate analysis for each categorical feature # Calculate feature frequencies (sum of each column for one-hot encoded) feature_counts = X.sum().sort_values(ascending=False) # Exclude numeric features (YearsCode) categorical_features = feature_counts[ ~feature_counts.index.str.startswith("YearsCode") ] # Country features print("\nšŸŒ Top 15 Country Features (most common):") country_features = categorical_features[ categorical_features.index.str.startswith("Country_") ] for i, (feature, count) in enumerate(country_features.head(15).items(), 1): percentage = (count / len(X)) * 100 country_name = feature.replace("Country_", "") print( f" {i:2d}. {country_name:45s} - {count:6.0f} occurrences ({percentage:5.1f}%)" ) # Education level features print("\nšŸŽ“ Top 10 Education Level Features (most common):") edlevel_features = categorical_features[ categorical_features.index.str.startswith("EdLevel_") ] for i, (feature, count) in enumerate(edlevel_features.head(10).items(), 1): percentage = (count / len(X)) * 100 edu_name = feature.replace("EdLevel_", "") print( f" {i:2d}. {edu_name:45s} - {count:6.0f} occurrences ({percentage:5.1f}%)" ) # Developer type features print("\nšŸ‘Øā€šŸ’» Top 10 Developer Type Features (most common):") devtype_features = categorical_features[ categorical_features.index.str.startswith("DevType_") ] for i, (feature, count) in enumerate(devtype_features.head(10).items(), 1): percentage = (count / len(X)) * 100 devtype_name = feature.replace("DevType_", "") print( f" {i:2d}. {devtype_name:45s} - {count:6.0f} occurrences ({percentage:5.1f}%)" ) # Industry features print("\nšŸ¢ Top 10 Industry Features (most common):") industry_features = categorical_features[ categorical_features.index.str.startswith("Industry_") ] for i, (feature, count) in enumerate(industry_features.head(10).items(), 1): percentage = (count / len(X)) * 100 industry_name = feature.replace("Industry_", "") print( f" {i:2d}. {industry_name:45s} - {count:6.0f} occurrences ({percentage:5.1f}%)" ) # Age features print("\nšŸŽ‚ Top 10 Age Features (most common):") age_features = categorical_features[ categorical_features.index.str.startswith("Age_") ] for i, (feature, count) in enumerate(age_features.head(10).items(), 1): percentage = (count / len(X)) * 100 age_name = feature.replace("Age_", "") print( f" {i:2d}. {age_name:45s} - {count:6.0f} occurrences ({percentage:5.1f}%)" ) # ICorPM features print("\nšŸ‘„ Top 10 IC/PM Features (most common):") icorpm_features = categorical_features[ categorical_features.index.str.startswith("ICorPM_") ] for i, (feature, count) in enumerate(icorpm_features.head(10).items(), 1): percentage = (count / len(X)) * 100 icorpm_name = feature.replace("ICorPM_", "") print( f" {i:2d}. {icorpm_name:45s} - {count:6.0f} occurrences ({percentage:5.1f}%)" ) print(f"\nšŸ“Š Total one-hot encoded features: {len(X.columns)}") print(" - Numeric: 2 (YearsCode, WorkExp)") print(f" - Country: {len(country_features)}") print(f" - Education: {len(edlevel_features)}") print(f" - DevType: {len(devtype_features)}") print(f" - Industry: {len(industry_features)}") print(f" - Age: {len(age_features)}") print(f" - ICorPM: {len(icorpm_features)}") print("=" * 60 + "\n") # Cross-validation for robust evaluation n_splits = config["data"].get("cv_splits", 5) random_state = config["data"]["random_state"] model_config = config["model"] print(f"Running {n_splits}-fold cross-validation...") kf = KFold(n_splits=n_splits, shuffle=True, random_state=random_state) train_scores = [] test_scores = [] best_iterations = [] for fold, (train_idx, test_idx) in enumerate(kf.split(X), 1): X_train, X_test = X.iloc[train_idx], X.iloc[test_idx] y_train, y_test = y.iloc[train_idx], y.iloc[test_idx] model = XGBRegressor( n_estimators=model_config["n_estimators"], learning_rate=model_config["learning_rate"], max_depth=model_config["max_depth"], min_child_weight=model_config["min_child_weight"], random_state=model_config["random_state"], n_jobs=model_config["n_jobs"], early_stopping_rounds=model_config["early_stopping_rounds"], ) model.fit( X_train, y_train, eval_set=[(X_test, y_test)], verbose=False, ) train_mape = np.mean(np.abs((y_train - model.predict(X_train)) / y_train)) * 100 test_mape = np.mean(np.abs((y_test - model.predict(X_test)) / y_test)) * 100 train_scores.append(train_mape) test_scores.append(test_mape) best_iterations.append(model.best_iteration + 1) print( f" Fold {fold}: Train MAPE = {train_mape:.2f}%, Test MAPE = {test_mape:.2f}% (best iter: {model.best_iteration + 1})" ) avg_train = np.mean(train_scores) avg_test = np.mean(test_scores) std_test = np.std(test_scores) avg_best_iter = int(np.mean(best_iterations)) print(f"\nCV Average Train MAPE: {avg_train:.2f}%") print(f"CV Average Test MAPE: {avg_test:.2f}% (+/- {std_test:.2f}%)") print(f"CV Average best iteration: {avg_best_iter}") # Train final model on all data for deployment # Use a small held-out split for early stopping only print("\nTraining final model on full dataset...") X_train_final, X_es, y_train_final, y_es = train_test_split( X, y, test_size=0.1, random_state=random_state ) final_model = XGBRegressor( n_estimators=model_config["n_estimators"], learning_rate=model_config["learning_rate"], max_depth=model_config["max_depth"], min_child_weight=model_config["min_child_weight"], random_state=model_config["random_state"], n_jobs=model_config["n_jobs"], early_stopping_rounds=model_config["early_stopping_rounds"], ) final_model.fit( X_train_final, y_train_final, eval_set=[(X_es, y_es)], verbose=config["training"]["verbose"], ) print(f"Final model best iteration: {final_model.best_iteration + 1}") # Save model and feature columns for inference model_path = Path(config["training"]["model_path"]) model_path.parent.mkdir(parents=True, exist_ok=True) artifacts = { "model": final_model, "feature_columns": list(X.columns), } with open(model_path, "wb") as f: pickle.dump(artifacts, f) print(f"Model saved to {model_path}") if __name__ == "__main__": main()