| """ |
| Data preprocessing module. |
| Cleans raw PDSD data, engineers features per PROJECT_CONTEXT §6, and prepares for training. |
| """ |
|
|
| import pandas as pd |
| import numpy as np |
| from sklearn.preprocessing import OrdinalEncoder |
|
|
| from salesforce_client import SUCCESS_STATUSES |
|
|
|
|
| |
|
|
| |
# Raw PDSD columns that handle_missing_values() coerces with pd.to_numeric
# and imputes with the column median.
NUMERIC_FIELDS: list[str] = [
    "Negoptim__SellOutDiscountPerc__c",
    "Negoptim__SellOutDiscountUAmt__c",
    "Negoptim__SellOutDiscountQtyTarget__c",
    "Negoptim__SellOutSourceContribPerc__c",
    "Negoptim__SellOutExecutionRateTarget__c",
    "Negoptim__Perc__c",
    "Negoptim__UAmt__c",
    "Negoptim__SellOutManagementFix__c",
    "Negoptim__SellOutManagementUnit__c",
]

# Columns treated as categories: missing values become the string "Unknown"
# and the columns are later ordinal-encoded by encode_categoricals().
CATEGORICAL_FIELDS: list[str] = [
    "Negoptim__SellOutDiscountType__c",
    "Negoptim__Supplier__c",
    "Negoptim__NegoScope__c",
    "Negoptim__BusinessUnit__c",
    "pds_promo_agreement_type",
    "_mechanic_family",
]

# Flag columns: missing values become False and the column is stored as 0/1.
BOOLEAN_FIELDS: list[str] = [
    "pds_is_sell_in",
    "pds_is_sell_out",
    "pds_is_services",
]

# Pre-computed history aggregates: coerced to numeric, missing values become 0.0.
HISTORICAL_FIELDS: list[str] = [
    "supplier_acceptance_rate",
    "supplier_rejection_rate",
    "negoscope_acceptance_rate",
    "mechanic_family_acceptance_rate",
    "supplier_pg_prior_count",
    "days_since_last_promo_for_supplier_pg",
]
|
|
|
|
def create_label(df: pd.DataFrame) -> pd.DataFrame:
    """Add the binary training label column ``label`` to *df*.

    A row gets 1 when its ``Negoptim__Status__c`` value is a member of
    ``SUCCESS_STATUSES`` and 0 otherwise (any other status, including a
    missing one). The column is written onto the DataFrame that was passed
    in, and that same DataFrame is returned.

    Raises:
        KeyError: if ``Negoptim__Status__c`` is not a column of *df*.
    """
    # Vectorised membership test; unlike a per-row apply() this also yields an
    # integer column when the frame is empty.
    df["label"] = df["Negoptim__Status__c"].isin(SUCCESS_STATUSES).astype("int64")
    print(f"Label distribution:\n{df['label'].value_counts().to_string()}")
    return df
|
|
|
|
def handle_missing_values(df: pd.DataFrame) -> pd.DataFrame:
    """Impute missing values for every known feature group.

    The DataFrame is modified in place and also returned.

    * ``NUMERIC_FIELDS``: coerced to numeric (unparseable values become NaN),
      then NaN is replaced by the column median computed on *df* itself. If
      the column has no valid value at all the median is undefined, so 0.0 is
      used instead.
    * ``CATEGORICAL_FIELDS``: NaN becomes ``"Unknown"`` and values are cast to str.
    * ``BOOLEAN_FIELDS``: NaN becomes False and values are stored as integers.
    * ``HISTORICAL_FIELDS``: coerced to numeric, NaN becomes 0.0.

    A column from any of the groups that is absent from *df* is created with
    that group's neutral value (0.0, ``"Unknown"``, 0, 0.0 respectively).
    """
    for col in NUMERIC_FIELDS:
        if col in df.columns:
            values = pd.to_numeric(df[col], errors="coerce")
            median = values.median()
            # median() is NaN for an empty or all-NaN column; filling with it
            # would leave the NaNs in place, so fall back to 0.0.
            df[col] = values.fillna(0.0 if pd.isna(median) else median)
        else:
            # Same treatment as the other groups: always materialise the column.
            df[col] = 0.0

    for col in CATEGORICAL_FIELDS:
        if col in df.columns:
            df[col] = df[col].fillna("Unknown").astype(str)
        else:
            df[col] = "Unknown"

    for col in BOOLEAN_FIELDS:
        if col in df.columns:
            df[col] = df[col].fillna(False).astype(int)
        else:
            df[col] = 0

    for col in HISTORICAL_FIELDS:
        if col in df.columns:
            df[col] = pd.to_numeric(df[col], errors="coerce").fillna(0.0)
        else:
            df[col] = 0.0

    return df
|
|
|
|
def _numeric_or_zero(df: pd.DataFrame, column: str) -> pd.Series:
    """Return ``df[column]`` coerced to numeric with NaN -> 0, or all zeros if the column is absent."""
    if column in df.columns:
        return pd.to_numeric(df[column], errors="coerce").fillna(0)
    return pd.Series(0, index=df.index)


def _datetime_or_nat(df: pd.DataFrame, column: str) -> pd.Series:
    """Return ``df[column]`` parsed as datetimes (unparseable -> NaT), or all NaT if the column is absent."""
    if column in df.columns:
        return pd.to_datetime(df[column], errors="coerce")
    return pd.Series(pd.NaT, index=df.index, dtype="datetime64[ns]")


def engineer_features(df: pd.DataFrame) -> pd.DataFrame:
    """Derive features from PDSD fields per PROJECT_CONTEXT §6.

    Columns written onto *df* (which is modified in place and returned):

    * ``Negoptim__SellOutBDate__c`` / ``Negoptim__SellOutEDate__c``: parsed to
      datetimes; created as all-NaT when absent.
    * ``campaign_duration_days``: end date minus start date, in days.
    * ``sell_out_month`` / ``sell_out_quarter``: taken from the start date.
    * ``discount_depth_vs_gross``: unit discount divided by ``pg_gross_price``
      when that column exists, otherwise ``Negoptim__SellOutDiscountPerc__c``
      divided by 100; clipped to [0, 1].
    * ``price_tier``: bucket index 0-3 of ``pg_net_price`` over the half-open
      bins [0, 5), [5, 20), [20, 100), [100, inf); 0.0 when the column is absent.
    * ``has_sell_in_discount``: 1 when ``Negoptim__Perc__c`` > 0.
    * ``has_services_component``: 1 when
      ``Negoptim__ServicesDiscountFixedAmt__c`` > 0.

    Any engineered value that cannot be computed (missing date, zero gross
    price, price outside every bin, ...) ends up as 0.
    """
    # Dates. A missing column must become NaT rather than None, otherwise the
    # subtraction and the .dt accessor below raise.
    df["Negoptim__SellOutBDate__c"] = _datetime_or_nat(df, "Negoptim__SellOutBDate__c")
    df["Negoptim__SellOutEDate__c"] = _datetime_or_nat(df, "Negoptim__SellOutEDate__c")
    start = df["Negoptim__SellOutBDate__c"]
    df["campaign_duration_days"] = (df["Negoptim__SellOutEDate__c"] - start).dt.days
    df["sell_out_month"] = start.dt.month
    df["sell_out_quarter"] = start.dt.quarter

    # Discount depth relative to the gross price, with a percentage fallback.
    if "pg_gross_price" in df.columns:
        # A zero gross price is turned into NaN so the division yields NaN
        # (later filled with 0) instead of +/-inf.
        gross = pd.to_numeric(df["pg_gross_price"], errors="coerce").replace(0, np.nan)
        unit_disc = _numeric_or_zero(df, "Negoptim__SellOutDiscountUAmt__c")
        df["discount_depth_vs_gross"] = (unit_disc / gross).clip(0, 1).fillna(0)
    else:
        df["discount_depth_vs_gross"] = (
            _numeric_or_zero(df, "Negoptim__SellOutDiscountPerc__c") / 100
        ).clip(0, 1)

    # Price tier from the net price.
    if "pg_net_price" in df.columns:
        price = pd.to_numeric(df["pg_net_price"], errors="coerce").fillna(0)
        df["price_tier"] = pd.cut(
            price,
            bins=[0, 5, 20, 100, float("inf")],
            labels=[0, 1, 2, 3],
            right=False,
        ).astype(float).fillna(0)
    else:
        df["price_tier"] = 0.0

    # Presence flags.
    df["has_sell_in_discount"] = (
        _numeric_or_zero(df, "Negoptim__Perc__c") > 0
    ).astype(int)
    df["has_services_component"] = (
        _numeric_or_zero(df, "Negoptim__ServicesDiscountFixedAmt__c") > 0
    ).astype(int)

    engineered = [
        "campaign_duration_days", "sell_out_month", "sell_out_quarter",
        "discount_depth_vs_gross", "price_tier",
        "has_sell_in_discount", "has_services_component",
    ]
    # Final pass: every engineered column is numeric with no NaN left.
    for col in engineered:
        df[col] = pd.to_numeric(df[col], errors="coerce").fillna(0)

    print(f"Engineered {len(engineered)} features")
    return df
|
|
|
|
def encode_categoricals(df: pd.DataFrame) -> tuple[pd.DataFrame, dict]:
    """Replace the categorical columns of *df* with ordinal codes.

    One ``OrdinalEncoder`` is fitted jointly on every ``CATEGORICAL_FIELDS``
    column that exists in *df*; the encoded float values overwrite those
    columns in place. Values are cast to ``str`` before fitting, so a missing
    value is encoded as whatever string that cast produces rather than through
    ``encoded_missing_value``.

    Returns:
        ``(df, encoders)`` where ``encoders`` has the single key ``_ordinal``
        holding the fitted encoder. The encoder is configured with
        ``handle_unknown='use_encoded_value'`` and ``unknown_value=-1``, so
        categories it was not fitted on transform to -1.
    """
    columns = [name for name in CATEGORICAL_FIELDS if name in df.columns]

    encoder = OrdinalEncoder(
        handle_unknown="use_encoded_value",
        unknown_value=-1,
        encoded_missing_value=-1,
        dtype=np.float64,
    )

    as_text = df[columns].astype(str)
    df[columns] = encoder.fit_transform(as_text)

    print(f"Encoded {len(columns)} categorical features (OrdinalEncoder)")

    encoders = {"_ordinal": encoder}
    return df, encoders
|
|
|
|
def get_feature_columns() -> list[str]:
    """Return the names of every column used as a training feature.

    The order is: numeric, categorical, boolean and historical fields,
    followed by the seven engineered features. A new list is built on each
    call.
    """
    engineered = [
        "campaign_duration_days",
        "sell_out_month",
        "sell_out_quarter",
        "discount_depth_vs_gross",
        "price_tier",
        "has_sell_in_discount",
        "has_services_component",
    ]
    return [
        *NUMERIC_FIELDS,
        *CATEGORICAL_FIELDS,
        *BOOLEAN_FIELDS,
        *HISTORICAL_FIELDS,
        *engineered,
    ]
|
|
|
|
def get_roi_feature_columns() -> list[str]:
    """Return the feature columns for the ROI regressor.

    Currently this is exactly the list produced by ``get_feature_columns()``.
    """
    roi_columns = get_feature_columns()
    return roi_columns
|
|
|
|
def compute_roi_label(df: pd.DataFrame) -> pd.DataFrame:
    """Add the ROI label column ``qty_achievement_ratio`` to *df*.

    The ratio is ``cpd_qty_fact`` divided by
    ``Negoptim__SellOutDiscountQtyTarget__c``, where the target is floored at
    1 before dividing and the result is clipped to the range [0, 5]. A missing
    target column is treated as all zeros (hence 1 after the floor). Rows with
    no usable ``cpd_qty_fact`` - or the whole column when it is absent - get
    NaN. *df* is modified in place and returned.
    """
    zero_default = pd.Series(0, index=df.index)
    raw_target = df.get("Negoptim__SellOutDiscountQtyTarget__c", zero_default)
    # Floor at 1 so a zero or negative target cannot blow up the division.
    target_qty = pd.to_numeric(raw_target, errors="coerce").clip(lower=1)

    raw_actual = df.get("cpd_qty_fact", None)
    actual_qty = pd.to_numeric(raw_actual, errors="coerce")

    ratio = actual_qty / target_qty
    df["qty_achievement_ratio"] = ratio.clip(0, 5)
    return df
|
|
|
|
def preprocess(df: pd.DataFrame) -> tuple[pd.DataFrame, dict]:
    """Run the full preprocessing pipeline.

    Steps, in order: label creation, missing-value imputation, feature
    engineering, categorical encoding.

    Returns:
        The processed DataFrame and the dict of fitted encoders produced by
        ``encode_categoricals``.
    """
    labelled = create_label(df)
    imputed = handle_missing_values(labelled)
    featured = engineer_features(imputed)
    encoded, encoders = encode_categoricals(featured)
    return encoded, encoders
|
|