import pandas as pd
import numpy as np
import joblib
from sklearn.experimental import enable_iterative_imputer
from sklearn.impute import IterativeImputer
from sklearn.compose import ColumnTransformer
from sklearn.pipeline import Pipeline
from sklearn.preprocessing import OneHotEncoder, StandardScaler
from sklearn.model_selection import train_test_split
from sklearn.ensemble import RandomForestRegressor
from sklearn.metrics import mean_absolute_error, r2_score
class RandomForestInsuranceModel:
    """
    A Random Forest regressor for insurance claim prediction with:

    1. Data loading & cleaning (iterative imputation, outlier clipping)
    2. A fixed set of hyperparameters (n_estimators=100, max_depth=4,
       min_samples_split=15)
    3. A ColumnTransformer for numeric & categorical data
    4. Consistent API: preprocessing, predict, postprocessing
    """

    def __init__(self, csv_path):
        """
        Load the CSV, clean the data, fit the column transformer on the
        training split only, train a RandomForestRegressor with fixed
        hyperparameters, and print MAE / R^2 on the held-out test set.

        Parameters
        ----------
        csv_path : str or path-like
            Path to a CSV containing the categorical/numerical feature
            columns plus the 'claim' target column.
        """
        # -----------------------------------------------------
        # 1. Load and clean data
        # -----------------------------------------------------
        df = self._load_and_clean(csv_path)

        # -----------------------------------------------------
        # 2. Separate features & target
        # -----------------------------------------------------
        features = df.drop(columns=['claim'])
        target = df['claim'].to_numpy()

        # -----------------------------------------------------
        # 3. Train/test split on RAW features
        # -----------------------------------------------------
        # Split BEFORE fitting the transformer. The original code fit the
        # ColumnTransformer on the full dataset, leaking test-set scaler
        # statistics and one-hot categories into training (data leakage).
        X_train_raw, X_test_raw, y_train, y_test = train_test_split(
            features,
            target,
            test_size=0.2,
            random_state=42
        )

        # -----------------------------------------------------
        # 4. Fit ColumnTransformer on the training split only
        # -----------------------------------------------------
        self.ct = self._build_transformer()
        X_train = self.ct.fit_transform(X_train_raw)
        X_test = self.ct.transform(X_test_raw)

        # -----------------------------------------------------
        # 5. RandomForest with fixed hyperparameters
        # -----------------------------------------------------
        self.model = RandomForestRegressor(
            n_estimators=100,
            max_depth=4,
            min_samples_split=15,
            random_state=42
        )
        self.model.fit(X_train, y_train)

        # -----------------------------------------------------
        # 6. Evaluate
        # -----------------------------------------------------
        mae, r2 = self._evaluate(X_test, y_test)
        print(f"[RANDOM FOREST] Test MAE: {mae:.3f}")
        print(f"[RANDOM FOREST] Test R^2: {r2:.3f}")

    # -------------------------------------------
    # Private: load, impute, clip outliers
    # -------------------------------------------
    def _load_and_clean(self, csv_path):
        """Read the CSV, impute key columns, drop residual NaNs, and clip
        'claim' outliers to the 1st–98th percentile range."""
        df = pd.read_csv(csv_path)
        # Drop irrelevant columns if present.
        df = df.drop(columns=["index", "PatientID"], errors="ignore")
        # Impute BEFORE dropping NaNs. The original code called dropna()
        # first, which removed every NaN row and made the iterative
        # imputation a guaranteed no-op.
        self._impute(df, columns=['age', 'bmi', 'bloodpressure'])
        # Remove rows that still have NaNs in columns we did not impute.
        df = df.dropna()
        # Clip outliers in 'claim' (1st to 98th percentile).
        lower_percentile = df['claim'].quantile(0.01)
        upper_percentile = df['claim'].quantile(0.98)
        return df[
            (df['claim'] >= lower_percentile) & (df['claim'] <= upper_percentile)
        ]

    # -------------------------------------------
    # Private: build the (unfitted) ColumnTransformer
    # -------------------------------------------
    def _build_transformer(self):
        """Return an unfitted ColumnTransformer: one-hot for categoricals,
        standard scaling (no centering, keeps sparse-compat) for numerics."""
        text_pipeline = Pipeline([
            ('one-hot', OneHotEncoder(handle_unknown='ignore'))
        ])
        nums_pipeline = Pipeline([
            ('normalize', StandardScaler(with_mean=False))
        ])
        return ColumnTransformer([
            ('categorical', text_pipeline, ['diabetic', 'gender', 'region', 'smoker']),
            ('numerical', nums_pipeline, ['children', 'age', 'bmi', 'bloodpressure'])
        ])

    # -------------------------------------------
    # Private: iterative imputation
    # -------------------------------------------
    def _impute(self, df, columns):
        """Fill NaNs in *columns* in place using sklearn's IterativeImputer."""
        imp = IterativeImputer(max_iter=5, verbose=2)
        arr = imp.fit_transform(df[columns])
        df[columns] = arr

    # -------------------------------------------
    # Private: evaluation
    # -------------------------------------------
    def _evaluate(self, X_test, y_test):
        """Return (MAE, R^2) of the fitted model on the given test split."""
        y_pred = self.model.predict(X_test)
        mae = mean_absolute_error(y_test, y_pred)
        r2 = r2_score(y_test, y_pred)
        return mae, r2

    # -------------------------------------------
    # Public: preprocessing
    # -------------------------------------------
    def preprocessing(self, raw_df):
        """
        Takes a new DataFrame with the columns the pipeline expects,
        and returns the transformed matrix.
        """
        return self.ct.transform(raw_df)

    # -------------------------------------------
    # Public: predict
    # -------------------------------------------
    def predict(self, preprocessed_data):
        """
        Takes feature data already processed by `preprocessing`,
        returns predictions in the original claim scale.
        """
        preds = self.model.predict(preprocessed_data)
        return self.postprocessing(preds)

    # -------------------------------------------
    # Public: postprocessing
    # -------------------------------------------
    def postprocessing(self, preds):
        """
        Currently a pass-through, as there's no target scaling to invert.
        """
        return preds
if __name__ == "__main__":
    # Train on the cleaned dataset, then persist the whole fitted
    # instance (model + ColumnTransformer) for later inference.
    trained = RandomForestInsuranceModel("cleaned_insurance_data.csv")
    artifact = "RandomForestInsuranceModel.joblib"
    joblib.dump(trained, artifact)
    print(f"Exported RandomForestInsuranceModel to {artifact}")