Spaces:
Sleeping
Sleeping
| import pandas as pd | |
| import numpy as np | |
| import joblib | |
| from sklearn.experimental import enable_iterative_imputer | |
| from sklearn.impute import IterativeImputer | |
| from sklearn.compose import ColumnTransformer | |
| from sklearn.pipeline import Pipeline | |
| from sklearn.preprocessing import OneHotEncoder, StandardScaler | |
| from sklearn.model_selection import train_test_split | |
| from sklearn.ensemble import RandomForestRegressor | |
| from sklearn.metrics import mean_absolute_error, r2_score | |
class RandomForestInsuranceModel:
    """
    A Random Forest regressor class with:
    1. Data loading & cleaning (iterative imputation, outlier clipping)
    2. A fixed set of hyperparameters (n_estimators=100, max_depth=4, min_samples_split=15)
    3. A ColumnTransformer for numeric & categorical data
    4. Consistent API: preprocessing, predict, postprocessing
    """

    def __init__(self, csv_path):
        """
        Loads the CSV, cleans data, sets up the column transformer,
        trains a RandomForestRegressor with fixed hyperparameters,
        and evaluates on a test set.

        Parameters
        ----------
        csv_path : str or path-like
            CSV expected to contain at least the columns: diabetic, gender,
            region, smoker, children, age, bmi, bloodpressure, claim.
        """
        # -----------------------------------------------------
        # 1. Load and clean data
        # -----------------------------------------------------
        df = pd.read_csv(csv_path)
        # Drop irrelevant columns if present.
        df = df.drop(columns=["index", "PatientID"], errors="ignore")
        # BUG FIX: impute BEFORE dropping NaNs. The original called
        # dropna() first, which removed every incomplete row and turned
        # the iterative imputation below into a no-op.
        self._impute(df, columns=['age', 'bmi', 'bloodpressure'])
        # Remove any rows that still contain NaNs in other columns.
        df = df.dropna()
        # Clip outliers in 'claim' (keep rows between the 1st and 98th percentile).
        lower_percentile = df['claim'].quantile(0.01)
        upper_percentile = df['claim'].quantile(0.98)
        df = df[
            (df['claim'] >= lower_percentile) & (df['claim'] <= upper_percentile)
        ]
        # -----------------------------------------------------
        # 2. Separate features & target
        # -----------------------------------------------------
        features = df.drop(columns=['claim'])
        target = df['claim'].to_numpy()
        # -----------------------------------------------------
        # 3. Create ColumnTransformer
        # -----------------------------------------------------
        # handle_unknown='ignore' keeps `preprocessing` from raising on
        # categories unseen during fitting.
        text_pipeline = Pipeline([
            ('one-hot', OneHotEncoder(handle_unknown='ignore'))
        ])
        nums_pipeline = Pipeline([
            ('normalize', StandardScaler(with_mean=False))
        ])
        self.ct = ColumnTransformer([
            ('categorical', text_pipeline, ['diabetic', 'gender', 'region', 'smoker']),
            ('numerical', nums_pipeline, ['children', 'age', 'bmi', 'bloodpressure'])
        ])
        # NOTE(review): fitting the transformer on the full dataset before the
        # split leaks test-set statistics into the scaler. Kept as-is to
        # preserve existing results; consider fitting on X_train only.
        X_full_transformed = self.ct.fit_transform(features)
        # -----------------------------------------------------
        # 4. Train/test split
        # -----------------------------------------------------
        X_train, X_test, y_train, y_test = train_test_split(
            X_full_transformed,
            target,
            test_size=0.2,
            random_state=42
        )
        # -----------------------------------------------------
        # 5. RandomForest with fixed hyperparameters
        # -----------------------------------------------------
        self.model = RandomForestRegressor(
            n_estimators=100,
            max_depth=4,
            min_samples_split=15,
            random_state=42
        )
        self.model.fit(X_train, y_train)
        # -----------------------------------------------------
        # 6. Evaluate
        # -----------------------------------------------------
        mae, r2 = self._evaluate(X_test, y_test)
        print(f"[RANDOM FOREST] Test MAE: {mae:.3f}")
        print(f"[RANDOM FOREST] Test R^2: {r2:.3f}")

    # -------------------------------------------
    # Private: iterative imputation
    # -------------------------------------------
    def _impute(self, df, columns):
        """Iteratively impute missing values in `columns`, modifying df in place."""
        # verbose=0: the per-iteration log spam from verbose=2 added no value.
        imp = IterativeImputer(max_iter=5, verbose=0)
        df[columns] = imp.fit_transform(df[columns])

    # -------------------------------------------
    # Private: evaluation
    # -------------------------------------------
    def _evaluate(self, X_test, y_test):
        """Return (MAE, R^2) of self.model on the given test set."""
        y_pred = self.model.predict(X_test)
        mae = mean_absolute_error(y_test, y_pred)
        r2 = r2_score(y_test, y_pred)
        return mae, r2

    # -------------------------------------------
    # Public: preprocessing
    # -------------------------------------------
    def preprocessing(self, raw_df):
        """
        Takes a new DataFrame with the columns the pipeline expects,
        and returns the transformed matrix.
        """
        return self.ct.transform(raw_df)

    # -------------------------------------------
    # Public: predict
    # -------------------------------------------
    def predict(self, preprocessed_data):
        """
        Takes feature data already processed by `preprocessing`,
        returns predictions in the original claim scale.
        """
        preds = self.model.predict(preprocessed_data)
        return self.postprocessing(preds)

    # -------------------------------------------
    # Public: postprocessing
    # -------------------------------------------
    def postprocessing(self, preds):
        """
        Currently a pass-through, as there's no target scaling to invert.
        """
        return preds
if __name__ == "__main__":
    # Train on the local insurance dataset, then persist the entire fitted
    # instance (RandomForest model plus its ColumnTransformer) via joblib.
    trained = RandomForestInsuranceModel("cleaned_insurance_data.csv")
    joblib.dump(trained, "RandomForestInsuranceModel.joblib")
    print("Exported RandomForestInsuranceModel to RandomForestInsuranceModel.joblib")