import copy import sklearn import sklearn.preprocessing import sklearn.model_selection import numpy as np import lime import lime.lime_tabular import os class Bunch(dict): def __init__(self, *args, **kwargs): super(Bunch, self).__init__(*args, **kwargs) self.__dict__ = self def load_dataset(dataset_name, balance=False, discretize=True, dataset_folder='./'): if dataset_name == 'adult': feature_names = ["Age", "Workclass", "fnlwgt", "Education", "Education-Num", "Marital Status", "Occupation", "Relationship", "Race", "Sex", "Capital Gain", "Capital Loss", "Hours per week", "Country", 'Income'] features_to_use = [0, 1, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13] categorical_features = [1, 5, 6, 7, 8, 9, 13] dataset = load_csv_dataset( os.path.join(dataset_folder, 'adult/adult.data'), -1, ', ', feature_names=feature_names, features_to_use=features_to_use, categorical_features=categorical_features, discretize=discretize, balance=balance, feature_transformations=None) elif dataset_name == 'german-credit': categorical_features = [1, 2, 3, 4, 5, 8] dataset = load_csv_dataset( os.path.join(dataset_folder, 'german-credit/german_credit_data.csv'), -1, ',', categorical_features=categorical_features, discretize=discretize, balance=balance) else: raise ValueError(f"Unsupported dataset: {dataset_name}") return dataset def load_csv_dataset(data, target_idx, delimiter=',', feature_names=None, categorical_features=None, features_to_use=None, feature_transformations=None, discretize=False, balance=False, fill_na='-1', filter_fn=None, skip_first=False): if feature_transformations is None: feature_transformations = {} try: data = np.genfromtxt(data, delimiter=delimiter, dtype='|S128') except: import pandas data = pandas.read_csv(data, header=None, delimiter=delimiter, na_filter=True, dtype=str).fillna(fill_na).values if target_idx < 0: target_idx = data.shape[1] + target_idx ret = Bunch({}) if feature_names is None: feature_names = list(data[0]) data = data[1:] else: feature_names = copy.deepcopy(feature_names) if skip_first: data = data[1:] if filter_fn is not None: data = filter_fn(data) for feature, fun in feature_transformations.items(): data[:, feature] = fun(data[:, feature]) labels = data[:, target_idx] le = sklearn.preprocessing.LabelEncoder() le.fit(labels) ret['labels'] = le.transform(labels) labels = ret['labels'] ret['class_names'] = list(le.classes_) ret['class_target'] = feature_names[target_idx] if features_to_use is not None: data = data[:, features_to_use] feature_names = ([x for i, x in enumerate(feature_names) if i in features_to_use]) if categorical_features is not None: categorical_features = ([features_to_use.index(x) for x in categorical_features]) else: data = np.delete(data, target_idx, 1) feature_names.pop(target_idx) if categorical_features: categorical_features = ([x if x < target_idx else x - 1 for x in categorical_features]) if categorical_features is None: categorical_features = [] for f in range(data.shape[1]): if len(np.unique(data[:, f])) < 20: categorical_features.append(f) categorical_names = {} for feature in categorical_features: le = sklearn.preprocessing.LabelEncoder() le.fit(data[:, feature]) data[:, feature] = le.transform(data[:, feature]) categorical_names[feature] = le.classes_ data = data.astype(float) ordinal_features = [] if discretize: disc = lime.lime_tabular.QuartileDiscretizer(data, categorical_features, feature_names) data = disc.discretize(data) ordinal_features = [x for x in range(data.shape[1]) if x not in categorical_features] categorical_features = list(range(data.shape[1])) categorical_names.update(disc.names) for x in categorical_names: categorical_names[x] = [y.decode() if type(y) == np.bytes_ else y for y in categorical_names[x]] ret['ordinal_features'] = ordinal_features ret['categorical_features'] = categorical_features ret['categorical_names'] = categorical_names ret['feature_names'] = feature_names np.random.seed(1) if balance: idxs = np.array([], dtype='int') min_labels = np.min(np.bincount(labels)) for label in np.unique(labels): idx = np.random.choice(np.where(labels == label)[0], min_labels) idxs = np.hstack((idxs, idx)) data = data[idxs] labels = labels[idxs] ret['data'] = data ret['labels'] = labels splits = sklearn.model_selection.ShuffleSplit(n_splits=1, test_size=.2, random_state=1) train_idx, test_idx = [x for x in splits.split(data)][0] ret['train'] = data[train_idx] ret['labels_train'] = labels[train_idx] cv_splits = sklearn.model_selection.ShuffleSplit(n_splits=1, test_size=.5, random_state=1) cv_idx, ntest_idx = [x for x in cv_splits.split(test_idx)][0] cv_idx = test_idx[cv_idx] test_idx = test_idx[ntest_idx] ret['validation'] = data[cv_idx] ret['labels_validation'] = labels[cv_idx] ret['test'] = data[test_idx] ret['labels_test'] = labels[test_idx] ret['test_idx'] = test_idx ret['validation_idx'] = cv_idx ret['train_idx'] = train_idx ret['data'] = data return ret import logging def print_log(turn, msg=None, state=None): if turn == "xagent": print(f"\033[1m\033[94mX-Agent:\033[0m") if msg is not None: print(msg) if turn == "user": print('\033[91m\033[1mUser:\033[0m') msg = input() logging.log(25, f"{turn}: {msg}") if state is not None: logging.log(25, state) return msg def ask_for_feature(agent): if len(agent.l_exist_features) == 0: msg = "which feature?" print_log("xagent", msg) user_input = print_log("user") while user_input not in agent.l_features: msg = f"please choose one of the following features: {agent.l_features}" print_log("xagent", msg) user_input = print_log("user") agent.l_exist_features.append(user_input) def map_array_values(array, value_map): ret = array.copy() for src, target in value_map.items(): ret[ret == src] = target return ret def replace_binary_values(array, values): return map_array_values(array, {'0': values[0], '1': values[1]}) def log_user_feedback(feedback, save_path): # Save feedback to a local file (append mode) try: with open(save_path, 'a', encoding='utf-8') as f: f.write(str(feedback) + '\n') except Exception as e: print(f"Error saving feedback: {e}")