""" Data preprocessing module. Cleans raw PDSD data, engineers features per PROJECT_CONTEXT §6, and prepares for training. """ import pandas as pd import numpy as np from sklearn.preprocessing import OrdinalEncoder from salesforce_client import SUCCESS_STATUSES # ─── Feature columns per PROJECT_CONTEXT §6 ────────────────────────────────── # Direct PDSD numeric fields NUMERIC_FIELDS = [ "Negoptim__SellOutDiscountPerc__c", # sell-out discount % "Negoptim__SellOutDiscountUAmt__c", # unit discount amount "Negoptim__SellOutDiscountQtyTarget__c", # quantity target "Negoptim__SellOutSourceContribPerc__c", # supplier contribution % "Negoptim__SellOutExecutionRateTarget__c",# execution rate target "Negoptim__Perc__c", # sell-in discount % "Negoptim__UAmt__c", # sell-in unit amount "Negoptim__SellOutManagementFix__c", # services fixed management fee "Negoptim__SellOutManagementUnit__c", # services unit management fee ] # Categorical fields CATEGORICAL_FIELDS = [ "Negoptim__SellOutDiscountType__c", # sell-out mechanic (RI_XX, CR_XX, VR_XX, …) "Negoptim__Supplier__c", # supplier ID "Negoptim__NegoScope__c", # negotiation perimeter "Negoptim__BusinessUnit__c", # buyer BU (retailer) "pds_promo_agreement_type", # from PDS Header: promo type "_mechanic_family", # first 2 chars of SellOutDiscountType ] # Boolean flags from PDS Header BOOLEAN_FIELDS = [ "pds_is_sell_in", "pds_is_sell_out", "pds_is_services", ] # Historical aggregation features (computed leakage-safe in salesforce_client) HISTORICAL_FIELDS = [ "supplier_acceptance_rate", "supplier_rejection_rate", "negoscope_acceptance_rate", "mechanic_family_acceptance_rate", "supplier_pg_prior_count", "days_since_last_promo_for_supplier_pg", ] def create_label(df: pd.DataFrame) -> pd.DataFrame: """Binary label: 1 = Validated, 0 = Rejected or Requested.""" df["label"] = df["Negoptim__Status__c"].apply( lambda s: 1 if s in SUCCESS_STATUSES else 0 ) print(f"Label distribution:\n{df['label'].value_counts().to_string()}") return df def handle_missing_values(df: pd.DataFrame) -> pd.DataFrame: """Fill missing values: median for numeric, 'Unknown' for categorical, False for booleans.""" for col in NUMERIC_FIELDS: if col in df.columns: df[col] = pd.to_numeric(df[col], errors="coerce") df[col] = df[col].fillna(df[col].median()) for col in CATEGORICAL_FIELDS: if col in df.columns: df[col] = df[col].fillna("Unknown").astype(str) else: df[col] = "Unknown" for col in BOOLEAN_FIELDS: if col in df.columns: df[col] = df[col].fillna(False).astype(int) else: df[col] = 0 for col in HISTORICAL_FIELDS: if col in df.columns: df[col] = pd.to_numeric(df[col], errors="coerce").fillna(0.0) else: df[col] = 0.0 return df def engineer_features(df: pd.DataFrame) -> pd.DataFrame: """Derive features from PDSD fields per PROJECT_CONTEXT §6.""" # Timing features df["Negoptim__SellOutBDate__c"] = pd.to_datetime( df.get("Negoptim__SellOutBDate__c"), errors="coerce" ) df["Negoptim__SellOutEDate__c"] = pd.to_datetime( df.get("Negoptim__SellOutEDate__c"), errors="coerce" ) df["campaign_duration_days"] = ( df["Negoptim__SellOutEDate__c"] - df["Negoptim__SellOutBDate__c"] ).dt.days df["sell_out_month"] = df["Negoptim__SellOutBDate__c"].dt.month df["sell_out_quarter"] = df["Negoptim__SellOutBDate__c"].dt.quarter # Discount depth vs gross price (uses PG.GrossPrice if available, else unit amount proxy) # PG pricing columns may be available from SOQL joins; fall back gracefully. if "pg_gross_price" in df.columns: gross = pd.to_numeric(df["pg_gross_price"], errors="coerce").replace(0, np.nan) unit_disc = pd.to_numeric(df.get("Negoptim__SellOutDiscountUAmt__c", pd.Series(0, index=df.index)), errors="coerce").fillna(0) df["discount_depth_vs_gross"] = (unit_disc / gross).clip(0, 1).fillna(0) else: df["discount_depth_vs_gross"] = ( pd.to_numeric(df.get("Negoptim__SellOutDiscountPerc__c", pd.Series(0, index=df.index)), errors="coerce").fillna(0) / 100 ).clip(0, 1) # Price tier from PG.NetPrice (bucketed into Low/Medium/High) if "pg_net_price" in df.columns: price = pd.to_numeric(df["pg_net_price"], errors="coerce").fillna(0) df["price_tier"] = pd.cut( price, bins=[0, 5, 20, 100, float("inf")], labels=[0, 1, 2, 3], right=False ).astype(float).fillna(0) else: df["price_tier"] = 0.0 # Has significant sell-in commitment df["has_sell_in_discount"] = ( pd.to_numeric(df.get("Negoptim__Perc__c", pd.Series(0, index=df.index)), errors="coerce").fillna(0) > 0 ).astype(int) # Has services component df["has_services_component"] = ( pd.to_numeric(df.get("Negoptim__ServicesDiscountFixedAmt__c", pd.Series(0, index=df.index)), errors="coerce").fillna(0) > 0 ).astype(int) engineered = [ "campaign_duration_days", "sell_out_month", "sell_out_quarter", "discount_depth_vs_gross", "price_tier", "has_sell_in_discount", "has_services_component", ] for col in engineered: if col in df.columns: df[col] = pd.to_numeric(df[col], errors="coerce").fillna(0) print(f"Engineered {len(engineered)} features") return df def encode_categoricals(df: pd.DataFrame) -> tuple[pd.DataFrame, dict]: """Ordinal-encode categorical features using a single joint encoder. Returns the DataFrame and an encoders dict with a single key ``_ordinal`` pointing to the fitted OrdinalEncoder. Unknown categories encountered at serving time are mapped to -1 (``handle_unknown='use_encoded_value'``). """ present_cats = [c for c in CATEGORICAL_FIELDS if c in df.columns] enc = OrdinalEncoder( handle_unknown="use_encoded_value", unknown_value=-1, encoded_missing_value=-1, dtype=np.float64, ) df[present_cats] = enc.fit_transform(df[present_cats].astype(str)) print(f"Encoded {len(present_cats)} categorical features (OrdinalEncoder)") return df, {"_ordinal": enc} def get_feature_columns() -> list[str]: """All feature column names used for training.""" return ( NUMERIC_FIELDS + CATEGORICAL_FIELDS + BOOLEAN_FIELDS + HISTORICAL_FIELDS + [ "campaign_duration_days", "sell_out_month", "sell_out_quarter", "discount_depth_vs_gross", "price_tier", "has_sell_in_discount", "has_services_component", ] ) def get_roi_feature_columns() -> list[str]: """Feature columns for the ROI regressor.""" return get_feature_columns() def compute_roi_label(df: pd.DataFrame) -> pd.DataFrame: """ Compute ROI-related labels from linked CPD actuals (via CommercialPlanDetail__c FK). Only rows where CPD QuantityFact is available get valid ROI labels. """ qty_target = pd.to_numeric( df.get("Negoptim__SellOutDiscountQtyTarget__c", pd.Series(0, index=df.index)), errors="coerce" ).clip(lower=1) qty_fact = pd.to_numeric(df.get("cpd_qty_fact", None), errors="coerce") df["qty_achievement_ratio"] = (qty_fact / qty_target).clip(0, 5) return df def preprocess(df: pd.DataFrame) -> tuple[pd.DataFrame, dict]: """Full preprocessing pipeline. Returns processed DataFrame + fitted encoders.""" df = create_label(df) df = handle_missing_values(df) df = engineer_features(df) df, encoders = encode_categoricals(df) return df, encoders