import json

import joblib
import numpy as np
import pandas as pd
from sklearn.base import BaseEstimator, TransformerMixin
from sklearn.ensemble import RandomForestClassifier
from sklearn.metrics import accuracy_score, classification_report
from sklearn.model_selection import StratifiedGroupKFold
from sklearn.preprocessing import LabelEncoder
from sklearn.svm import SVC
from skopt import BayesSearchCV
from skopt.space import Categorical, Integer, Real
from xgboost import XGBClassifier

class TrainModel(BaseEstimator, TransformerMixin):
    """Trains a classifier with Bayesian hyperparameter search and saves the
    best model and its metadata to disk.

    Parameters
    ----------
    classifier : str
        One of 'xgboost', 'svm', or 'randomforest'.
    train_label : str
        Comma-separated names of label columns to drop from the features.
    target : str
        Name of the target column to predict.
    """

    def __init__(self, classifier, train_label, target):
        self.classifier = classifier
        self.train_label = train_label
        self.target = target
        self.label_encoder = LabelEncoder()

    def get_default_param_space(self, classifier):
        """Return the default hyperparameter search space for a given classifier."""
        if classifier == 'xgboost':
            return {
                'learning_rate': Real(0.01, 0.3, prior='log-uniform'),
                'n_estimators': Integer(100, 1000),
                'max_depth': Integer(3, 10),
                'min_child_weight': Integer(1, 10),
                'subsample': Real(0.5, 1.0),
                'colsample_bytree': Real(0.5, 1.0),
                # These regularisation terms are continuous, so use explicit
                # Real dimensions rather than bare tuples, which skopt would
                # otherwise infer as integer ranges.
                'gamma': Real(0.0, 10.0),
                'reg_alpha': Real(0.0, 10.0),
                'reg_lambda': Real(0.0, 10.0),
            }
        elif classifier == 'svm':
            return {
                'C': Real(0.1, 10, prior='log-uniform'),
                'kernel': Categorical(['linear', 'rbf']),
            }
        elif classifier == 'randomforest':
            return {
                'n_estimators': Integer(100, 1000),
                'max_depth': Integer(3, 10),
            }
        else:
            raise ValueError(f"Unsupported classifier type: {classifier}")

    def fit(self, X, y=None):
        if self.target not in X.columns:
            raise ValueError(f"Target label '{self.target}' not found in the dataset.")

        # Work on a copy so the caller's DataFrame is not mutated in place.
        X = X.copy()

        print(f"Encoding the target labels for '{self.target}'...")
        self.label_encoder.fit(X[self.target])

        # Build a mapping from the encoded integer labels back to the originals.
        original_labels = list(self.label_encoder.classes_)
        encoded_labels = list(range(len(original_labels)))
        label_mapping = dict(zip(encoded_labels, original_labels))
        print(f"Label encoding complete. Mapping: {label_mapping}")

        X['encoded_target'] = self.label_encoder.transform(X[self.target])

        # Cast to plain Python ints so the counts stay JSON-serialisable later.
        value_counts = {int(k): int(v) for k, v in X['encoded_target'].value_counts().items()}
        print(f"Value counts for encoded target: {value_counts}")

        # Group IDs are held out of the features and passed to the CV splitter.
        groups = X.pop('groupid')
        print("Group IDs popped from the dataset.")

        # Drop the label columns. Use a local variable rather than reassigning
        # self.train_label: mutating constructor parameters in fit breaks
        # sklearn's cloning conventions and repeated fit calls.
        label_columns = self.train_label.split(",")
        for label in label_columns:
            X.pop(label)
        print("Label columns popped from the dataset.")

        y = X.pop('encoded_target')
        print("Encoded target column popped from the dataset.")

        feature_names = X.columns.tolist()

        classifier = self.classifier
        if classifier == 'xgboost':
            model = XGBClassifier(objective='multi:softmax', random_state=42)
        elif classifier == 'svm':
            model = SVC(probability=True)
        elif classifier == 'randomforest':
            model = RandomForestClassifier(random_state=42)
        else:
            raise ValueError(f"Unsupported classifier type: {classifier}")

        print(f"Training the model using {classifier}...")

        # get_default_param_space either returns a dict or raises, so no
        # further None check is needed here.
        param_space = self.get_default_param_space(classifier)
        print(f"Parameter space being used: {param_space}")

        # Stratified group k-fold keeps class proportions balanced while
        # ensuring samples from the same group never span train and test folds.
        sgkf = StratifiedGroupKFold(n_splits=5)

        opt = BayesSearchCV(
            estimator=model,
            search_spaces=param_space,
            cv=sgkf,
            n_iter=5,
            n_jobs=-1,
            n_points=1,
            verbose=1,
            scoring='accuracy',
        )

        print("Hyperparameter tuning in progress...")
        opt.fit(X, y, groups=groups)
        self.best_model = opt.best_estimator_
        print(f"Best parameters found: {opt.best_params_}")

        # Evaluate on the training data. Note this is a training-set score,
        # not an unbiased estimate of generalisation performance.
        y_pred = self.best_model.predict(X)
        accuracy = accuracy_score(y, y_pred)
        report = classification_report(
            y, y_pred,
            target_names=[str(c) for c in self.label_encoder.classes_],
            output_dict=True,
        )

        with open(f'classification_report_{self.target}.json', 'w') as f:
            json.dump(report, f, indent=4)

        print(f"Training accuracy: {accuracy}")
        print(f"Classification Report:\n{report}")

        model_name = f"{classifier}_best_model_{self.target}.pkl"
        joblib.dump(self.best_model, model_name)
        print("Model saved successfully.")

        # Convert numpy scalars to native Python types; json.dump cannot
        # serialise np.int64 and friends.
        best_params = {k: (v.item() if isinstance(v, np.generic) else v)
                       for k, v in opt.best_params_.items()}

        model_metadata = {
            "best_params": best_params,
            "accuracy": float(accuracy),
            "classification_report": report,
            "label_mapping": label_mapping,
            "model_name": model_name,
            "value_counts": value_counts,
        }

        # Tree-based models expose feature importances; record them if present.
        if hasattr(self.best_model, "feature_importances_"):
            feature_importances = self.best_model.feature_importances_
            feature_importance_dict = {
                feature: float(importance)
                for feature, importance in zip(feature_names, feature_importances)
            }
            model_metadata["feature_importances"] = feature_importance_dict
            print("Feature Importances:")
            for feature, importance in feature_importance_dict.items():
                print(f"{feature}: {importance:.4f}")

        metadata_file = f"{classifier}_model_metadata_{self.target}.json"
        with open(metadata_file, "w") as f:
            json.dump(model_metadata, f, indent=4)
        print(f"Model metadata saved to {metadata_file}.")

        self.model_file = model_name
        self.metadata_file = metadata_file

        return self

    def get_output_files(self):
        """Return the paths of the saved model and metadata files."""
        return self.model_file, self.metadata_file

    def transform(self, X):
        """Pass-through: training happens in fit; the data is not transformed."""
        return X
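
# --- Usage sketch (illustrative only) --------------------------------------
# A minimal example of how TrainModel might be driven end to end, assuming a
# DataFrame with a 'groupid' column for the CV splitter, a 'species' target,
# and an extra 'site' label column. All column names and the synthetic data
# here are hypothetical; adapt them to your dataset. Note that train_label
# should include the target column so it is dropped from the features.
if __name__ == "__main__":
    rng = np.random.default_rng(42)
    n = 200
    df = pd.DataFrame({
        "feature_a": rng.normal(size=n),
        "feature_b": rng.normal(size=n),
        "groupid": rng.integers(0, 20, size=n),           # CV group per sample
        "site": rng.choice(["north", "south"], n),        # label column to drop
        "species": rng.choice(["cat", "dog", "fox"], n),  # classification target
    })

    trainer = TrainModel(classifier="randomforest",
                         train_label="site,species",
                         target="species")
    trainer.fit(df)
    model_file, metadata_file = trainer.get_output_files()
    print(f"Artifacts written: {model_file}, {metadata_file}")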