import copy
import logging
import os

import lime
import lime.lime_tabular
import numpy as np
import sklearn
import sklearn.model_selection
import sklearn.preprocessing
class Bunch(dict):
    """Dictionary whose keys are also accessible as attributes."""

    def __init__(self, *args, **kwargs):
        super(Bunch, self).__init__(*args, **kwargs)
        self.__dict__ = self
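# A minimal sketch of what Bunch buys us: keys double as attributes because
# __dict__ is aliased to the dict itself (the values here are illustrative):
#
#     b = Bunch({'data': [1, 2, 3]})
#     b.labels = [0, 1, 0]        # attribute write is also a dict write
#     assert b['labels'] is b.labels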
def load_dataset(dataset_name, balance=False, discretize=True, dataset_folder='./'):
    """Load one of the bundled datasets ('adult' or 'german-credit') as a Bunch."""
if dataset_name == 'adult':
feature_names = ["Age", "Workclass", "fnlwgt", "Education",
"Education-Num", "Marital Status", "Occupation",
"Relationship", "Race", "Sex", "Capital Gain",
"Capital Loss", "Hours per week", "Country", 'Income']
features_to_use = [0, 1, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13]
categorical_features = [1, 5, 6, 7, 8, 9, 13]
dataset = load_csv_dataset(
os.path.join(dataset_folder, 'adult/adult.data'), -1, ', ',
feature_names=feature_names, features_to_use=features_to_use,
categorical_features=categorical_features, discretize=discretize,
balance=balance, feature_transformations=None)
elif dataset_name == 'german-credit':
categorical_features = [1, 2, 3, 4, 5, 8]
dataset = load_csv_dataset(
os.path.join(dataset_folder, 'german-credit/german_credit_data.csv'), -1, ',',
categorical_features=categorical_features, discretize=discretize,
balance=balance)
else:
raise ValueError(f"Unsupported dataset: {dataset_name}")
return dataset
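# A minimal usage sketch, assuming the UCI adult data has been downloaded to
# <dataset_folder>/adult/adult.data; the folder name below is illustrative:
#
#     dataset = load_dataset('adult', balance=True, dataset_folder='./data')
#     print(dataset.class_names, dataset.train.shape)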
def load_csv_dataset(data, target_idx, delimiter=',',
feature_names=None, categorical_features=None,
features_to_use=None, feature_transformations=None,
discretize=False, balance=False, fill_na='-1', filter_fn=None, skip_first=False):
if feature_transformations is None:
feature_transformations = {}
    try:
        data = np.genfromtxt(data, delimiter=delimiter, dtype='|S128')
    except Exception:
        # Fall back to pandas, which handles ragged rows and missing values.
        import pandas
        data = pandas.read_csv(data,
                               header=None,
                               delimiter=delimiter,
                               na_filter=True,
                               dtype=str).fillna(fill_na).values
if target_idx < 0:
target_idx = data.shape[1] + target_idx
ret = Bunch({})
if feature_names is None:
feature_names = list(data[0])
data = data[1:]
else:
feature_names = copy.deepcopy(feature_names)
if skip_first:
data = data[1:]
if filter_fn is not None:
data = filter_fn(data)
for feature, fun in feature_transformations.items():
data[:, feature] = fun(data[:, feature])
labels = data[:, target_idx]
le = sklearn.preprocessing.LabelEncoder()
le.fit(labels)
ret['labels'] = le.transform(labels)
labels = ret['labels']
ret['class_names'] = list(le.classes_)
ret['class_target'] = feature_names[target_idx]
    if features_to_use is not None:
        data = data[:, features_to_use]
        feature_names = ([x for i, x in enumerate(feature_names)
                          if i in features_to_use])
        if categorical_features is not None:
            # Remap original column indices to positions within features_to_use.
            categorical_features = ([features_to_use.index(x)
                                     for x in categorical_features])
    else:
        # Drop the target column and shift categorical indices past it.
        data = np.delete(data, target_idx, 1)
        feature_names.pop(target_idx)
        if categorical_features:
            categorical_features = ([x if x < target_idx else x - 1
                                     for x in categorical_features])
    if categorical_features is None:
        # Heuristic: treat columns with fewer than 20 distinct values
        # as categorical.
        categorical_features = []
        for f in range(data.shape[1]):
            if len(np.unique(data[:, f])) < 20:
                categorical_features.append(f)
categorical_names = {}
for feature in categorical_features:
le = sklearn.preprocessing.LabelEncoder()
le.fit(data[:, feature])
data[:, feature] = le.transform(data[:, feature])
categorical_names[feature] = le.classes_
data = data.astype(float)
ordinal_features = []
    if discretize:
        # Bin continuous features into quartiles; afterwards every feature
        # is categorical, and the former continuous ones are ordinal.
        disc = lime.lime_tabular.QuartileDiscretizer(data,
                                                     categorical_features,
                                                     feature_names)
        data = disc.discretize(data)
        ordinal_features = [x for x in range(data.shape[1])
                            if x not in categorical_features]
        categorical_features = list(range(data.shape[1]))
        categorical_names.update(disc.names)
    for x in categorical_names:
        # genfromtxt yields byte strings; decode them for display.
        categorical_names[x] = [y.decode() if isinstance(y, bytes) else y
                                for y in categorical_names[x]]
ret['ordinal_features'] = ordinal_features
ret['categorical_features'] = categorical_features
ret['categorical_names'] = categorical_names
ret['feature_names'] = feature_names
    np.random.seed(1)
    if balance:
        # Downsample every class to the size of the smallest one;
        # sample without replacement to avoid duplicate rows.
        idxs = np.array([], dtype='int')
        min_labels = np.min(np.bincount(labels))
        for label in np.unique(labels):
            idx = np.random.choice(np.where(labels == label)[0], min_labels,
                                   replace=False)
            idxs = np.hstack((idxs, idx))
        data = data[idxs]
        labels = labels[idxs]
ret['data'] = data
ret['labels'] = labels
    # 80/20 train/held-out split, then split the held-out part evenly
    # into validation and test.
    splits = sklearn.model_selection.ShuffleSplit(n_splits=1,
                                                  test_size=.2,
                                                  random_state=1)
    train_idx, test_idx = next(splits.split(data))
    ret['train'] = data[train_idx]
    ret['labels_train'] = labels[train_idx]
    cv_splits = sklearn.model_selection.ShuffleSplit(n_splits=1,
                                                     test_size=.5,
                                                     random_state=1)
    cv_idx, ntest_idx = next(cv_splits.split(test_idx))
    cv_idx = test_idx[cv_idx]
    test_idx = test_idx[ntest_idx]
ret['validation'] = data[cv_idx]
ret['labels_validation'] = labels[cv_idx]
ret['test'] = data[test_idx]
ret['labels_test'] = labels[test_idx]
ret['test_idx'] = test_idx
ret['validation_idx'] = cv_idx
ret['train_idx'] = train_idx
return ret
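# A minimal sketch of calling load_csv_dataset directly on a hypothetical CSV
# whose label sits in the last column; with feature_names omitted, the first
# row is consumed as the header:
#
#     ds = load_csv_dataset('my_data.csv', target_idx=-1)
#     print(ds.feature_names, ds.class_names, ds.train.shape)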
def print_log(turn, msg=None, state=None):
    """Print a colored dialogue turn and append it to the transcript log.

    Level 25 is a custom logging level between INFO (20) and WARNING (30).
    """
    if turn == "xagent":
        print("\033[1m\033[94mX-Agent:\033[0m")
        if msg is not None:
            print(msg)
    if turn == "user":
        print("\033[91m\033[1mUser:\033[0m")
        msg = input()
    logging.log(25, f"{turn}: {msg}")
    if state is not None:
        logging.log(25, state)
    return msg
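# Usage sketch: the transcript is logged at custom level 25, so configure the
# root logger at or below that level for it to be recorded, e.g.:
#
#     logging.basicConfig(level=25, filename='dialogue.log')
#     print_log('xagent', 'Hello!')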
def ask_for_feature(agent):
    """Prompt the user to pick a feature until a valid one is chosen."""
    if len(agent.l_exist_features) == 0:
        msg = "Which feature?"
        print_log("xagent", msg)
        user_input = print_log("user")
        while user_input not in agent.l_features:
            msg = f"Please choose one of the following features: {agent.l_features}"
            print_log("xagent", msg)
            user_input = print_log("user")
        agent.l_exist_features.append(user_input)
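# ask_for_feature assumes an agent exposing two list attributes, inferred from
# the code above; this stub is illustrative, not the real agent class:
#
#     class AgentStub:
#         l_features = ['Age', 'Sex']
#         l_exist_features = []
#
#     ask_for_feature(AgentStub())  # prompts until a valid feature is entered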
def map_array_values(array, value_map):
    """Return a copy of `array` with each source value replaced by its target.

    Note: with fixed-width numpy string arrays, targets longer than the
    array's itemsize are silently truncated on assignment.
    """
    ret = array.copy()
    for src, target in value_map.items():
        ret[ret == src] = target
    return ret

def replace_binary_values(array, values):
    """Map the binary string labels '0' and '1' to the two given values."""
    return map_array_values(array, {'0': values[0], '1': values[1]})
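# Example with hypothetical values; note the itemsize caveat above, so the
# source array must be wide enough to hold the replacement strings:
#
#     sex = np.array(['0', '1', '0'], dtype='<U6')
#     replace_binary_values(sex, ['Female', 'Male'])
#     # -> array(['Female', 'Male', 'Female'], dtype='<U6')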
def log_user_feedback(feedback, save_path):
# Save feedback to a local file (append mode)
try:
with open(save_path, 'a', encoding='utf-8') as f:
f.write(str(feedback) + '\n')
except Exception as e:
print(f"Error saving feedback: {e}")