negoptim-promotion-predictor / data_preprocessing.py
samir12321's picture
Upload data_preprocessing.py with huggingface_hub
9a03bda verified
Raw
History Blame Contribute Delete
8.07 kB
"""
Data preprocessing module.
Cleans raw PDSD data, engineers features per PROJECT_CONTEXT §6, and prepares for training.
"""
import pandas as pd
import numpy as np
from sklearn.preprocessing import OrdinalEncoder
from salesforce_client import SUCCESS_STATUSES
# ─── Feature columns per PROJECT_CONTEXT §6 ──────────────────────────────────
# Direct PDSD numeric fields
NUMERIC_FIELDS = [
"Negoptim__SellOutDiscountPerc__c", # sell-out discount %
"Negoptim__SellOutDiscountUAmt__c", # unit discount amount
"Negoptim__SellOutDiscountQtyTarget__c", # quantity target
"Negoptim__SellOutSourceContribPerc__c", # supplier contribution %
"Negoptim__SellOutExecutionRateTarget__c",# execution rate target
"Negoptim__Perc__c", # sell-in discount %
"Negoptim__UAmt__c", # sell-in unit amount
"Negoptim__SellOutManagementFix__c", # services fixed management fee
"Negoptim__SellOutManagementUnit__c", # services unit management fee
]
# Categorical fields
CATEGORICAL_FIELDS = [
"Negoptim__SellOutDiscountType__c", # sell-out mechanic (RI_XX, CR_XX, VR_XX, …)
"Negoptim__Supplier__c", # supplier ID
"Negoptim__NegoScope__c", # negotiation perimeter
"Negoptim__BusinessUnit__c", # buyer BU (retailer)
"pds_promo_agreement_type", # from PDS Header: promo type
"_mechanic_family", # first 2 chars of SellOutDiscountType
]
# Boolean flags from PDS Header
BOOLEAN_FIELDS = [
"pds_is_sell_in",
"pds_is_sell_out",
"pds_is_services",
]
# Historical aggregation features (computed leakage-safe in salesforce_client)
HISTORICAL_FIELDS = [
"supplier_acceptance_rate",
"supplier_rejection_rate",
"negoscope_acceptance_rate",
"mechanic_family_acceptance_rate",
"supplier_pg_prior_count",
"days_since_last_promo_for_supplier_pg",
]
def create_label(df: pd.DataFrame) -> pd.DataFrame:
"""Binary label: 1 = Validated, 0 = Rejected or Requested."""
df["label"] = df["Negoptim__Status__c"].apply(
lambda s: 1 if s in SUCCESS_STATUSES else 0
)
print(f"Label distribution:\n{df['label'].value_counts().to_string()}")
return df
def handle_missing_values(df: pd.DataFrame) -> pd.DataFrame:
"""Fill missing values: median for numeric, 'Unknown' for categorical, False for booleans."""
for col in NUMERIC_FIELDS:
if col in df.columns:
df[col] = pd.to_numeric(df[col], errors="coerce")
df[col] = df[col].fillna(df[col].median())
for col in CATEGORICAL_FIELDS:
if col in df.columns:
df[col] = df[col].fillna("Unknown").astype(str)
else:
df[col] = "Unknown"
for col in BOOLEAN_FIELDS:
if col in df.columns:
df[col] = df[col].fillna(False).astype(int)
else:
df[col] = 0
for col in HISTORICAL_FIELDS:
if col in df.columns:
df[col] = pd.to_numeric(df[col], errors="coerce").fillna(0.0)
else:
df[col] = 0.0
return df
def engineer_features(df: pd.DataFrame) -> pd.DataFrame:
"""Derive features from PDSD fields per PROJECT_CONTEXT §6."""
# Timing features
df["Negoptim__SellOutBDate__c"] = pd.to_datetime(
df.get("Negoptim__SellOutBDate__c"), errors="coerce"
)
df["Negoptim__SellOutEDate__c"] = pd.to_datetime(
df.get("Negoptim__SellOutEDate__c"), errors="coerce"
)
df["campaign_duration_days"] = (
df["Negoptim__SellOutEDate__c"] - df["Negoptim__SellOutBDate__c"]
).dt.days
df["sell_out_month"] = df["Negoptim__SellOutBDate__c"].dt.month
df["sell_out_quarter"] = df["Negoptim__SellOutBDate__c"].dt.quarter
# Discount depth vs gross price (uses PG.GrossPrice if available, else unit amount proxy)
# PG pricing columns may be available from SOQL joins; fall back gracefully.
if "pg_gross_price" in df.columns:
gross = pd.to_numeric(df["pg_gross_price"], errors="coerce").replace(0, np.nan)
unit_disc = pd.to_numeric(df.get("Negoptim__SellOutDiscountUAmt__c", pd.Series(0, index=df.index)), errors="coerce").fillna(0)
df["discount_depth_vs_gross"] = (unit_disc / gross).clip(0, 1).fillna(0)
else:
df["discount_depth_vs_gross"] = (
pd.to_numeric(df.get("Negoptim__SellOutDiscountPerc__c", pd.Series(0, index=df.index)), errors="coerce").fillna(0) / 100
).clip(0, 1)
# Price tier from PG.NetPrice (bucketed into Low/Medium/High)
if "pg_net_price" in df.columns:
price = pd.to_numeric(df["pg_net_price"], errors="coerce").fillna(0)
df["price_tier"] = pd.cut(
price, bins=[0, 5, 20, 100, float("inf")],
labels=[0, 1, 2, 3], right=False
).astype(float).fillna(0)
else:
df["price_tier"] = 0.0
# Has significant sell-in commitment
df["has_sell_in_discount"] = (
pd.to_numeric(df.get("Negoptim__Perc__c", pd.Series(0, index=df.index)), errors="coerce").fillna(0) > 0
).astype(int)
# Has services component
df["has_services_component"] = (
pd.to_numeric(df.get("Negoptim__ServicesDiscountFixedAmt__c", pd.Series(0, index=df.index)), errors="coerce").fillna(0) > 0
).astype(int)
engineered = [
"campaign_duration_days", "sell_out_month", "sell_out_quarter",
"discount_depth_vs_gross", "price_tier",
"has_sell_in_discount", "has_services_component",
]
for col in engineered:
if col in df.columns:
df[col] = pd.to_numeric(df[col], errors="coerce").fillna(0)
print(f"Engineered {len(engineered)} features")
return df
def encode_categoricals(df: pd.DataFrame) -> tuple[pd.DataFrame, dict]:
"""Ordinal-encode categorical features using a single joint encoder.
Returns the DataFrame and an encoders dict with a single key ``_ordinal``
pointing to the fitted OrdinalEncoder. Unknown categories encountered at
serving time are mapped to -1 (``handle_unknown='use_encoded_value'``).
"""
present_cats = [c for c in CATEGORICAL_FIELDS if c in df.columns]
enc = OrdinalEncoder(
handle_unknown="use_encoded_value",
unknown_value=-1,
encoded_missing_value=-1,
dtype=np.float64,
)
df[present_cats] = enc.fit_transform(df[present_cats].astype(str))
print(f"Encoded {len(present_cats)} categorical features (OrdinalEncoder)")
return df, {"_ordinal": enc}
def get_feature_columns() -> list[str]:
"""All feature column names used for training."""
return (
NUMERIC_FIELDS
+ CATEGORICAL_FIELDS
+ BOOLEAN_FIELDS
+ HISTORICAL_FIELDS
+ [
"campaign_duration_days",
"sell_out_month",
"sell_out_quarter",
"discount_depth_vs_gross",
"price_tier",
"has_sell_in_discount",
"has_services_component",
]
)
def get_roi_feature_columns() -> list[str]:
"""Feature columns for the ROI regressor."""
return get_feature_columns()
def compute_roi_label(df: pd.DataFrame) -> pd.DataFrame:
"""
Compute ROI-related labels from linked CPD actuals (via CommercialPlanDetail__c FK).
Only rows where CPD QuantityFact is available get valid ROI labels.
"""
qty_target = pd.to_numeric(
df.get("Negoptim__SellOutDiscountQtyTarget__c", pd.Series(0, index=df.index)), errors="coerce"
).clip(lower=1)
qty_fact = pd.to_numeric(df.get("cpd_qty_fact", None), errors="coerce")
df["qty_achievement_ratio"] = (qty_fact / qty_target).clip(0, 5)
return df
def preprocess(df: pd.DataFrame) -> tuple[pd.DataFrame, dict]:
"""Full preprocessing pipeline. Returns processed DataFrame + fitted encoders."""
df = create_label(df)
df = handle_missing_values(df)
df = engineer_features(df)
df, encoders = encode_categoricals(df)
return df, encoders