import pandas as pd
import numpy as np
import joblib

# Importing enable_iterative_imputer activates the experimental
# IterativeImputer API; the import itself is the side effect.
from sklearn.experimental import enable_iterative_imputer  # noqa: F401
from sklearn.impute import IterativeImputer
from sklearn.compose import ColumnTransformer
from sklearn.pipeline import Pipeline
from sklearn.preprocessing import OneHotEncoder, StandardScaler
from sklearn.model_selection import train_test_split
from sklearn.ensemble import RandomForestRegressor
from sklearn.metrics import mean_absolute_error, r2_score


class RandomForestInsuranceModel:
    """
    A Random Forest regressor with a consistent train/serve API.

    Responsibilities:
      1. Data loading & cleaning (iterative imputation, outlier clipping)
      2. Fixed hyperparameters (n_estimators=100, max_depth=4,
         min_samples_split=15)
      3. A ColumnTransformer for numeric & categorical features
      4. Public API: ``preprocessing`` -> ``predict`` -> ``postprocessing``
    """

    def __init__(self, csv_path):
        """
        Load the CSV, clean the data, fit the column transformer on the
        training split, train a RandomForestRegressor with fixed
        hyperparameters, and report test-set MAE / R^2.

        Parameters
        ----------
        csv_path : str
            Path to the raw insurance claims CSV. Expected columns include
            'age', 'bmi', 'bloodpressure', 'children', 'diabetic', 'gender',
            'region', 'smoker' and the target 'claim'.
        """
        # -----------------------------------------------------
        # 1. Load and clean data
        # -----------------------------------------------------
        df = pd.read_csv(csv_path)

        # Drop irrelevant identifier columns if present.
        df = df.drop(columns=["index", "PatientID"], errors="ignore")

        # BUGFIX: impute BEFORE dropping NaNs. The original code called
        # dropna() first, which removed every row with a missing value and
        # made the iterative imputation a no-op.
        self._impute(df, columns=['age', 'bmi', 'bloodpressure'])

        # Remove any rows still containing NaNs in the remaining columns.
        df = df.dropna()

        # Clip outliers in 'claim' (keep 1st to 98th percentile).
        lower_percentile = df['claim'].quantile(0.01)
        upper_percentile = df['claim'].quantile(0.98)
        df = df[
            (df['claim'] >= lower_percentile)
            & (df['claim'] <= upper_percentile)
        ]

        # -----------------------------------------------------
        # 2. Separate features & target
        # -----------------------------------------------------
        features = df.drop(columns=['claim'])
        target = df['claim'].values

        # -----------------------------------------------------
        # 3. Create ColumnTransformer
        # -----------------------------------------------------
        # handle_unknown='ignore' lets transform() cope with categories
        # unseen during fit (important now that we fit on train only).
        text_pipeline = Pipeline([
            ('one-hot', OneHotEncoder(handle_unknown='ignore'))
        ])
        # with_mean=False keeps the output compatible with the sparse
        # matrix produced by the one-hot branch.
        nums_pipeline = Pipeline([
            ('normalize', StandardScaler(with_mean=False))
        ])
        self.ct = ColumnTransformer([
            ('categorical', text_pipeline,
             ['diabetic', 'gender', 'region', 'smoker']),
            ('numerical', nums_pipeline,
             ['children', 'age', 'bmi', 'bloodpressure'])
        ])

        # -----------------------------------------------------
        # 4. Train/test split (on RAW features, before fitting the
        #    transformer)
        # -----------------------------------------------------
        # BUGFIX: the original fit the ColumnTransformer on the full
        # dataset before splitting, leaking test-set statistics (e.g.
        # scaler variance) into training. Fit on the train split only.
        X_train_raw, X_test_raw, y_train, y_test = train_test_split(
            features, target, test_size=0.2, random_state=42
        )
        X_train = self.ct.fit_transform(X_train_raw)
        X_test = self.ct.transform(X_test_raw)

        # -----------------------------------------------------
        # 5. RandomForest with fixed hyperparameters
        # -----------------------------------------------------
        self.model = RandomForestRegressor(
            n_estimators=100,
            max_depth=4,
            min_samples_split=15,
            random_state=42
        )
        self.model.fit(X_train, y_train)

        # -----------------------------------------------------
        # 6. Evaluate
        # -----------------------------------------------------
        mae, r2 = self._evaluate(X_test, y_test)
        print(f"[RANDOM FOREST] Test MAE: {mae:.3f}")
        print(f"[RANDOM FOREST] Test R^2: {r2:.3f}")

    # -------------------------------------------
    # Private: iterative imputation
    # -------------------------------------------
    def _impute(self, df, columns):
        """Impute missing values in `columns` in place via IterativeImputer."""
        imp = IterativeImputer(max_iter=5, verbose=2)
        arr = imp.fit_transform(df[columns])
        df[columns] = arr

    # -------------------------------------------
    # Private: evaluation
    # -------------------------------------------
    def _evaluate(self, X_test, y_test):
        """Return (MAE, R^2) of the fitted model on the given test set."""
        y_pred = self.model.predict(X_test)
        mae = mean_absolute_error(y_test, y_pred)
        r2 = r2_score(y_test, y_pred)
        return mae, r2

    # -------------------------------------------
    # Public: preprocessing
    # -------------------------------------------
    def preprocessing(self, raw_df):
        """
        Transform a new DataFrame (with the columns the pipeline expects)
        into the model's feature matrix using the fitted ColumnTransformer.
        """
        return self.ct.transform(raw_df)

    # -------------------------------------------
    # Public: predict
    # -------------------------------------------
    def predict(self, preprocessed_data):
        """
        Predict on data already processed by `preprocessing`; returns
        predictions in the original claim scale.
        """
        preds = self.model.predict(preprocessed_data)
        return self.postprocessing(preds)

    # -------------------------------------------
    # Public: postprocessing
    # -------------------------------------------
    def postprocessing(self, preds):
        """Pass-through: no target scaling to invert."""
        return preds


if __name__ == "__main__":
    # Instantiate and train on "cleaned_insurance_data.csv"
    rf_model = RandomForestInsuranceModel("cleaned_insurance_data.csv")

    # Export the entire trained instance (including the fitted
    # ColumnTransformer) for later loading with joblib.load.
    joblib.dump(rf_model, "RandomForestInsuranceModel.joblib")
    print("Exported RandomForestInsuranceModel to "
          "RandomForestInsuranceModel.joblib")