# NOTE: scraped page chrome (Space status, file size, commit hash, and a run
# of display line numbers) removed — the Python module source begins below.
import copy
import sklearn
import sklearn.preprocessing
import sklearn.model_selection
import numpy as np
import lime
import lime.lime_tabular
import os
class Bunch(dict):
    """A dict whose keys are also accessible as attributes.

    Pointing ``__dict__`` at the dict itself makes ``b.key`` and ``b['key']``
    two views of the same storage, so writes through either stay in sync.
    """

    def __init__(self, *args, **kwargs):
        dict.__init__(self, *args, **kwargs)
        self.__dict__ = self
def load_dataset(dataset_name, balance=False, discretize=True, dataset_folder='./'):
    """Load one of the known benchmark datasets via ``load_csv_dataset``.

    Supported names are ``'adult'`` and ``'german-credit'``; anything else
    raises ``ValueError``. ``balance`` and ``discretize`` are forwarded to
    ``load_csv_dataset``; ``dataset_folder`` is the root holding the CSV files.
    """
    if dataset_name == 'adult':
        feature_names = ["Age", "Workclass", "fnlwgt", "Education",
                         "Education-Num", "Marital Status", "Occupation",
                         "Relationship", "Race", "Sex", "Capital Gain",
                         "Capital Loss", "Hours per week", "Country", 'Income']
        return load_csv_dataset(
            os.path.join(dataset_folder, 'adult/adult.data'), -1, ', ',
            feature_names=feature_names,
            features_to_use=[0, 1, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13],
            categorical_features=[1, 5, 6, 7, 8, 9, 13],
            discretize=discretize, balance=balance,
            feature_transformations=None)
    if dataset_name == 'german-credit':
        return load_csv_dataset(
            os.path.join(dataset_folder, 'german-credit/german_credit_data.csv'),
            -1, ',',
            categorical_features=[1, 2, 3, 4, 5, 8],
            discretize=discretize, balance=balance)
    raise ValueError(f"Unsupported dataset: {dataset_name}")
def load_csv_dataset(data, target_idx, delimiter=',',
                     feature_names=None, categorical_features=None,
                     features_to_use=None, feature_transformations=None,
                     discretize=False, balance=False, fill_na='-1', filter_fn=None, skip_first=False):
    """Load a delimited text file into a ``Bunch`` with fixed train/val/test splits.

    Parameters
    ----------
    data : str or file-like
        Path (or buffer) readable by ``np.genfromtxt`` / ``pandas.read_csv``.
    target_idx : int
        Column index of the label; negative indices count from the end.
    delimiter : str
        Field separator for the CSV reader.
    feature_names : list of str, optional
        Column names; when None the first data row is used as the header.
    categorical_features : list of int, optional
        Indices (into the original columns) of categorical features. When
        None, every column with fewer than 20 distinct raw values is treated
        as categorical (heuristic).
    features_to_use : list of int, optional
        Subset of original column indices to keep; when None, all columns
        except the target are kept.
    feature_transformations : dict, optional
        Maps column index -> callable applied to that raw string column.
    discretize : bool
        When True, continuous features are quartile-discretized with LIME.
    balance : bool
        When True, each class is down-sampled to the smallest class size.
    fill_na : str
        Fill value for missing entries (pandas fallback path only).
    filter_fn : callable, optional
        Row filter applied to the raw matrix before any other processing.
    skip_first : bool
        Skip the first row when ``feature_names`` is supplied (header line).

    Returns
    -------
    Bunch
        Keys: ``data``, ``labels``, ``train``/``validation``/``test`` (plus
        matching ``labels_*`` and ``*_idx`` arrays), ``feature_names``,
        ``class_names``, ``class_target``, ``categorical_features``,
        ``categorical_names``, ``ordinal_features``.
    """
    if feature_transformations is None:
        feature_transformations = {}
    try:
        data = np.genfromtxt(data, delimiter=delimiter, dtype='|S128')
    except Exception:
        # genfromtxt chokes on quoted/ragged fields; fall back to pandas.
        import pandas
        data = pandas.read_csv(data,
                               header=None,
                               delimiter=delimiter,
                               na_filter=True,
                               dtype=str).fillna(fill_na).values
    if target_idx < 0:
        target_idx = data.shape[1] + target_idx
    ret = Bunch({})
    if feature_names is None:
        # First row is the header.
        feature_names = list(data[0])
        data = data[1:]
    else:
        feature_names = copy.deepcopy(feature_names)
        if skip_first:
            data = data[1:]
    if filter_fn is not None:
        data = filter_fn(data)
    for feature, fun in feature_transformations.items():
        data[:, feature] = fun(data[:, feature])
    labels = data[:, target_idx]
    le = sklearn.preprocessing.LabelEncoder()
    le.fit(labels)
    ret['labels'] = le.transform(labels)
    labels = ret['labels']
    ret['class_names'] = list(le.classes_)
    ret['class_target'] = feature_names[target_idx]
    if features_to_use is not None:
        data = data[:, features_to_use]
        feature_names = ([x for i, x in enumerate(feature_names)
                          if i in features_to_use])
        if categorical_features is not None:
            # Re-map original column indices to positions within the subset.
            categorical_features = ([features_to_use.index(x)
                                     for x in categorical_features])
    else:
        data = np.delete(data, target_idx, 1)
        feature_names.pop(target_idx)
        if categorical_features:
            # Shift indices that sit after the removed target column.
            categorical_features = ([x if x < target_idx else x - 1
                                     for x in categorical_features])
    if categorical_features is None:
        # Heuristic: low-cardinality columns are treated as categorical.
        categorical_features = []
        for f in range(data.shape[1]):
            if len(np.unique(data[:, f])) < 20:
                categorical_features.append(f)
    categorical_names = {}
    for feature in categorical_features:
        le = sklearn.preprocessing.LabelEncoder()
        le.fit(data[:, feature])
        data[:, feature] = le.transform(data[:, feature])
        categorical_names[feature] = le.classes_
    data = data.astype(float)
    ordinal_features = []
    if discretize:
        disc = lime.lime_tabular.QuartileDiscretizer(data,
                                                     categorical_features,
                                                     feature_names)
        data = disc.discretize(data)
        # After discretization every column is categorical; remember which
        # ones were originally continuous.
        ordinal_features = [x for x in range(data.shape[1])
                            if x not in categorical_features]
        categorical_features = list(range(data.shape[1]))
        categorical_names.update(disc.names)
    for x in categorical_names:
        # genfromtxt yields bytes; decode for human-readable category names.
        categorical_names[x] = [y.decode() if isinstance(y, np.bytes_) else y
                                for y in categorical_names[x]]
    ret['ordinal_features'] = ordinal_features
    ret['categorical_features'] = categorical_features
    ret['categorical_names'] = categorical_names
    ret['feature_names'] = feature_names
    # Fixed seed so balancing and splits are reproducible across runs.
    np.random.seed(1)
    if balance:
        idxs = np.array([], dtype='int')
        min_labels = np.min(np.bincount(labels))
        for label in np.unique(labels):
            idx = np.random.choice(np.where(labels == label)[0], min_labels)
            idxs = np.hstack((idxs, idx))
        data = data[idxs]
        labels = labels[idxs]
    ret['data'] = data
    ret['labels'] = labels
    # 80/20 train/holdout split, then split the holdout 50/50 into
    # validation and test. random_state pins the partition.
    splits = sklearn.model_selection.ShuffleSplit(n_splits=1,
                                                  test_size=.2,
                                                  random_state=1)
    train_idx, test_idx = next(splits.split(data))
    ret['train'] = data[train_idx]
    ret['labels_train'] = labels[train_idx]
    cv_splits = sklearn.model_selection.ShuffleSplit(n_splits=1,
                                                     test_size=.5,
                                                     random_state=1)
    cv_idx, ntest_idx = next(cv_splits.split(test_idx))
    cv_idx = test_idx[cv_idx]
    test_idx = test_idx[ntest_idx]
    ret['validation'] = data[cv_idx]
    ret['labels_validation'] = labels[cv_idx]
    ret['test'] = data[test_idx]
    ret['labels_test'] = labels[test_idx]
    ret['test_idx'] = test_idx
    ret['validation_idx'] = cv_idx
    ret['train_idx'] = train_idx
    return ret
import logging
def print_log(turn, msg=None, state=None):
    """Emit one dialogue turn to the console and record it at log level 25.

    For ``turn == "xagent"`` the given *msg* is printed under a colored
    header; for ``turn == "user"`` the message is read from stdin instead.
    Any *state* is logged as an extra record. Returns the logged message.
    """
    header_by_turn = {
        "xagent": "\033[1m\033[94mX-Agent:\033[0m",
        "user": '\033[91m\033[1mUser:\033[0m',
    }
    if turn in header_by_turn:
        print(header_by_turn[turn])
    if turn == "xagent" and msg is not None:
        print(msg)
    elif turn == "user":
        msg = input()
    logging.log(25, f"{turn}: {msg}")
    if state is not None:
        logging.log(25, state)
    return msg
def ask_for_feature(agent):
    """Interactively ask the user to pick a feature, validating the answer.

    Only prompts when ``agent.l_exist_features`` is empty; keeps re-asking
    until the input is one of ``agent.l_features``, then appends the chosen
    feature to ``agent.l_exist_features``. All I/O goes through print_log.
    """
    # Idiomatic emptiness test (was: len(...) == 0).
    if not agent.l_exist_features:
        msg = "which feature?"
        print_log("xagent", msg)
        user_input = print_log("user")
        while user_input not in agent.l_features:
            msg = f"please choose one of the following features: {agent.l_features}"
            print_log("xagent", msg)
            user_input = print_log("user")
        agent.l_exist_features.append(user_input)
def map_array_values(array, value_map):
    """Return a copy of *array* with each ``src`` value replaced per *value_map*.

    *value_map* maps source value -> replacement; the input array is left
    untouched (the substitution happens on a copy).
    """
    mapped = array.copy()
    for source_value, replacement in value_map.items():
        mapped[mapped == source_value] = replacement
    return mapped
def replace_binary_values(array, values):
    """Replace the binary string labels '0'/'1' with values[0]/values[1]."""
    label_map = {'0': values[0], '1': values[1]}
    return map_array_values(array, label_map)
def log_user_feedback(feedback, save_path):
    """Append one line of user feedback to *save_path* (best effort).

    Failures are reported to stdout but never raised: feedback logging
    must not crash the surrounding application.
    """
    try:
        line = str(feedback) + '\n'
        with open(save_path, 'a', encoding='utf-8') as fh:
            fh.write(line)
    except Exception as err:
        print(f"Error saving feedback: {err}")