import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
from sklearn.metrics import accuracy_score, fbeta_score, confusion_matrix, ConfusionMatrixDisplay
from sklearn.utils.class_weight import compute_sample_weight
import pickle as pkl
from tqdm import tqdm
import time
import os
import shutil
import json
from copy import deepcopy

from helpers.required_classes import *
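
# NOTE (assumption): `helpers.required_classes` is expected to provide at least
# `SimpleModel` (a constant one-class classifier), `CustomXGBoost` (an XGBoost
# wrapper with fit/predict/classes_ and optional GPU training), and
# `balance_dataset` (resamples data according to the `balance` mode). Purely
# illustrative sketches of the first and last are given below; the real
# implementations live in the helpers module.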

def log(*args):
    print(*args, flush=True)
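
# Illustrative only: a minimal sketch of what `SimpleModel` (imported above) is
# assumed to do for single-code groups. Named `_SimpleModelSketch` so it never
# shadows the real helper.
class _SimpleModelSketch:
    def fit(self, X, y):
        # remember the single class seen at fit time
        self.classes_ = list(np.unique(y))
        self._constant = self.classes_[0]
        return self

    def predict(self, X):
        # always predict the one known class, regardless of the input
        return np.array([self._constant] * len(X))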

def train_code_classifier(vecs_train_codes, vecs_test_for_groups,
                          labels_train_codes, labels_test_groups_codes, labels_test_groups_groups,
                          labels_train_groups,
                          models_folder, group_name, balance=None, logging=True, use_gpu=True):
    """
    balance - the dataset balancing strategy:
        remove    - drop items per class until every class has as many texts as the smallest class
        duplicate - duplicate items per class until every class has as many texts as the largest class
        weight    - train the model with per-sample class weights
        None      - no balancing
    (An illustrative sketch of such a helper, `_balance_dataset_sketch`, follows this function.)
    """
| log(f"training model for codes classifiers in group {group_name}") | |
| # create / remove folder | |
| experiment_path = f"{models_folder}/{group_name}" | |
| if not os.path.exists(experiment_path): | |
| os.makedirs(experiment_path, exist_ok=True) | |
| else: | |
| shutil.rmtree(experiment_path) | |
| os.makedirs(experiment_path, exist_ok=True) | |
    labels_train_for_group = labels_train_codes[labels_train_groups == group_name]
    if logging:
        log(f"e.g. labels in the group: {labels_train_for_group[:3]} "
            f"cnt of codes: {len(np.unique(labels_train_for_group))} "
            f"cnt of texts: {len(labels_train_for_group)}")
    # prepare train labels
    if len(np.unique(labels_train_for_group)) < 2:
        # the group has only one code inside, so a constant classifier is enough
        code_name = labels_train_for_group[0]
        if logging:
            log(f'group {group_name} has only one code inside: {code_name}')
        simple_clf = SimpleModel()
        simple_clf.fit([], [code_name])
        with open(f"{experiment_path}/{group_name}_code_clf.pkl", 'wb') as f:
            pkl.dump(simple_clf, f)
        return {"f1_score": 'one_cls', "accuracy": 'one_cls'}
    sample_weights = compute_sample_weight(
        class_weight='balanced',
        y=labels_train_for_group
    )
    # prepare the remaining data
    vecs_train_for_group = vecs_train_codes[labels_train_groups == group_name]
    vecs_test_for_group = vecs_test_for_groups[labels_test_groups_groups == group_name]
    labels_test_for_group = labels_test_groups_codes[labels_test_groups_groups == group_name]
    labels_train_for_group, vecs_train_for_group = balance_dataset(
        labels_train_for_group, vecs_train_for_group, balance=balance
    )
    fit_start_time = time.time()
    model = CustomXGBoost(use_gpu)
    if balance == 'weight':
        model.fit(vecs_train_for_group, labels_train_for_group, sample_weight=sample_weights)
    else:
        model.fit(vecs_train_for_group, labels_train_for_group)
    with open(f"{experiment_path}/{group_name}_code_clf.pkl", 'wb') as f:
        pkl.dump(model, f)
    if logging:
        log(f'Trained in {time.time() - fit_start_time:.1f}s')
    pred_start_time = time.time()
    predictions_group = model.predict(vecs_test_for_group)
    scores = {
        "f1_score": fbeta_score(labels_test_for_group, predictions_group, beta=1, average='macro'),
        "accuracy": accuracy_score(labels_test_for_group, predictions_group)
    }
    if logging:
        log(scores, f'Predicted in {time.time() - pred_start_time:.1f}s')
    with open(f"{experiment_path}/{group_name}_scores.json", 'w') as f:
        f.write(json.dumps(scores))
    conf_matrix = confusion_matrix(labels_test_for_group, predictions_group)
    disp_code = ConfusionMatrixDisplay(confusion_matrix=conf_matrix,
                                       display_labels=model.classes_)
    fig, ax = plt.subplots(figsize=(5, 5))
    disp_code.plot(ax=ax)
    plt.xticks(rotation=90)
    plt.savefig(f"{experiment_path}/{group_name}_matrix.png")
    plt.close(fig)  # free the figure: this function runs once per group inside a loop
    return scores
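
# Illustrative only: a rough sketch of the `balance_dataset` helper described in
# the docstring above. The real implementation is imported from
# `helpers.required_classes`; this copy is named `_balance_dataset_sketch` so it
# never shadows it.
def _balance_dataset_sketch(labels, vecs, balance=None, seed=0):
    if balance not in ('remove', 'duplicate'):
        # 'weight' and None leave the data untouched; weighting happens at fit time
        return labels, vecs
    rng = np.random.default_rng(seed)
    classes, counts = np.unique(labels, return_counts=True)
    # downsample every class to the minority size, or upsample to the majority size
    target = counts.min() if balance == 'remove' else counts.max()
    idx = np.concatenate([
        rng.choice(np.where(labels == cls)[0], size=target,
                   replace=(balance == 'duplicate'))
        for cls in classes
    ])
    return labels[idx], vecs[idx]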

def train_codes_for_groups(vecs_train_codes, vecs_test_groups,
                           labels_train_codes, labels_test_groups_codes, labels_test_groups_groups,
                           labels_train_groups,
                           output_path, logging, use_gpu=True):
    all_scores = []
    for group_name in tqdm(np.unique(labels_train_groups)):
        row = {'group': group_name}
        for balanced_method in ['weight']:  # [None, 'remove', 'weight', 'duplicate']:
            if logging:
                log('\n', '-' * 50)
            scores = train_code_classifier(vecs_train_codes, vecs_test_groups,
                                           labels_train_codes, labels_test_groups_codes, labels_test_groups_groups,
                                           labels_train_groups,
                                           output_path, group_name, balanced_method, logging, use_gpu)
            scores = {f"{balanced_method}_{k}": v for k, v in scores.items()}
            row.update(scores)
        all_scores.append(row)
    df = pd.DataFrame(all_scores)
    # append a MEAN row, skipping single-class groups marked 'one_cls'
    columns = df.columns.tolist()
    columns.remove('group')
    mean_scores = {'group': 'MEAN'}
    for score_name in columns:
        mean_score = df[df[score_name] != 'one_cls'][score_name].mean()
        mean_scores.update({score_name: float(mean_score)})
    df = pd.concat([df, pd.DataFrame([mean_scores])], ignore_index=True)
    return df
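
# The returned dataframe has one row per group plus a final MEAN row, e.g.
# (illustrative values only):
#
#        group  weight_f1_score  weight_accuracy
#   0  group_a             0.91             0.93
#   1  group_b          one_cls          one_cls
#   2     MEAN             0.91             0.93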

def make_experiment_classifier(vecs_train_codes, vecs_test_codes, vecs_test_group,
                               labels_train_codes, labels_test_codes,
                               labels_test_groups, labels_train_groups,
                               sample_weights_codes, sample_weights_groups,
                               texts_test_codes, texts_test_groups,
                               experiment_name, classifier_model_code, classifier_model_group,
                               experiment_path, balance=None):
    # train separate base models for codes and for groups
    log(f'Model: {experiment_name}')
    # create a fresh experiment folder, removing any previous run
    experiment_path = f"{experiment_path}/{experiment_name}"
    if os.path.exists(experiment_path):
        shutil.rmtree(experiment_path)
    os.makedirs(experiment_path, exist_ok=True)
    # fit the models
    cls_codes = deepcopy(classifier_model_code)
    cls_groups = deepcopy(classifier_model_group)
    labels_train_codes_balanced, vecs_train_codes_balanced = balance_dataset(
        labels_train_codes, vecs_train_codes, balance=balance
    )
    # balance the group labels against their own copy of the vectors so each
    # model's rows stay aligned with its own resampled labels
    labels_train_groups_balanced, vecs_train_groups_balanced = balance_dataset(
        labels_train_groups, vecs_train_codes, balance=balance
    )
    log('start training base model')
    if balance == 'weight':
        try:
            start_time = time.time()
            cls_codes.fit(vecs_train_codes_balanced, labels_train_codes_balanced, sample_weight=sample_weights_codes)
            log(f'codes classifier trained in {(time.time() - start_time) / 60:.1f}m')
            start_time = time.time()
            cls_groups.fit(vecs_train_groups_balanced, labels_train_groups_balanced, sample_weight=sample_weights_groups)
            log(f'groups classifier trained in {(time.time() - start_time) / 60:.1f}m')
        except Exception as e:
            # the base model may not accept sample_weight: fall back to unweighted training
            log(str(e))
            start_time = time.time()
            cls_codes.fit(vecs_train_codes_balanced, labels_train_codes_balanced)
            log(f'codes classifier trained in {(time.time() - start_time) / 60:.1f}m')
            start_time = time.time()
            cls_groups.fit(vecs_train_groups_balanced, labels_train_groups_balanced)
            log(f'groups classifier trained in {(time.time() - start_time) / 60:.1f}m')
    else:
        start_time = time.time()
        cls_codes.fit(vecs_train_codes_balanced, labels_train_codes_balanced)
        log(f'codes classifier trained in {(time.time() - start_time) / 60:.1f}m')
        start_time = time.time()
        cls_groups.fit(vecs_train_groups_balanced, labels_train_groups_balanced)
        log(f'groups classifier trained in {(time.time() - start_time) / 60:.1f}m')
    with open(f"{experiment_path}/{experiment_name}_codes.pkl", 'wb') as f:
        pkl.dump(cls_codes, f)
    with open(f"{experiment_path}/{experiment_name}_groups.pkl", 'wb') as f:
        pkl.dump(cls_groups, f)
    # run inference with both models
    predictions_code = cls_codes.predict(vecs_test_codes)
    predictions_group = cls_groups.predict(vecs_test_group)
    scores = {
        "f1_score_code": fbeta_score(labels_test_codes, predictions_code, beta=1, average='macro'),
        "f1_score_group": fbeta_score(labels_test_groups, predictions_group, beta=1, average='macro'),
        "accuracy_code": accuracy_score(labels_test_codes, predictions_code),
        "accuracy_group": accuracy_score(labels_test_groups, predictions_group)
    }
    with open(f"{experiment_path}/{experiment_name}_scores.json", 'w') as f:
        f.write(json.dumps(scores))
    conf_matrix = confusion_matrix(labels_test_codes, predictions_code)
    disp_code = ConfusionMatrixDisplay(confusion_matrix=conf_matrix,
                                       display_labels=cls_codes.classes_)
    fig, ax = plt.subplots(figsize=(20, 20))
    disp_code.plot(ax=ax)
    plt.xticks(rotation=90)
    plt.savefig(f"{experiment_path}/{experiment_name}_codes_matrix.png")
    plt.close(fig)
    conf_matrix = confusion_matrix(labels_test_groups, predictions_group)
    disp_group = ConfusionMatrixDisplay(confusion_matrix=conf_matrix,
                                        display_labels=cls_groups.classes_)
    fig, ax = plt.subplots(figsize=(20, 20))
    disp_group.plot(ax=ax)
    plt.xticks(rotation=90)
    plt.savefig(f"{experiment_path}/{experiment_name}_groups_matrix.png")
    plt.close(fig)
    pd.DataFrame({'codes': predictions_code, 'truth': labels_test_codes,
                  'text': texts_test_codes}).to_csv(f"{experiment_path}/{experiment_name}_pred_codes.csv")
    pd.DataFrame({'groups': predictions_group, 'truth': labels_test_groups,
                  'text': texts_test_groups}).to_csv(f"{experiment_path}/{experiment_name}_pred_groups.csv")
    return predictions_code, predictions_group, scores

def train_base_clfs(classifiers, vecs_train_codes, vecs_test_codes, vecs_test_group,
                    labels_train_codes, labels_test_codes,
                    labels_test_groups_codes, labels_test_groups_groups, labels_train_groups,
                    sample_weights_codes, sample_weights_groups,
                    texts_test_codes, texts_test_groups, output_path):
    results = ''
    for experiment_data in classifiers:
        for balanced_method in ['weight']:
            exp_name = experiment_data['name']
            cls_model = experiment_data['model']
            # pass the current balancing method through, so the reported
            # "balanced by" label matches what was actually trained
            _, _, scores = make_experiment_classifier(vecs_train_codes, vecs_test_codes, vecs_test_group,
                                                      labels_train_codes, labels_test_codes,
                                                      labels_test_groups_groups, labels_train_groups,
                                                      sample_weights_codes, sample_weights_groups,
                                                      texts_test_codes, texts_test_groups,
                                                      exp_name, cls_model, cls_model, output_path,
                                                      balance=balanced_method)
            res = f"\n\n{exp_name} balanced by: {balanced_method} scores: {scores}"
            results += res
            log(res)
    return results
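
# Minimal usage sketch (assumptions: the variable names below are hypothetical,
# and any estimator with fit/predict/classes_ can stand in for CustomXGBoost):
#
# from sklearn.linear_model import LogisticRegression
#
# classifiers = [{'name': 'logreg_baseline', 'model': LogisticRegression(max_iter=1000)}]
# results = train_base_clfs(classifiers,
#                           vecs_train_codes, vecs_test_codes, vecs_test_group,
#                           labels_train_codes, labels_test_codes,
#                           labels_test_groups_codes, labels_test_groups_groups,
#                           labels_train_groups,
#                           sample_weights_codes, sample_weights_groups,
#                           texts_test_codes, texts_test_groups,
#                           output_path='models')
# log(results)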