"""Evaluate symbolic information-theoretic scores on discretised data.

An expression such as ``"I(X,Y) - CI(X,Y,Z)"`` is parsed with SymPy into a
tree built from the placeholder functions ``MI``, ``CMI`` and ``II``; the tree
is then turned into a Python callable that computes the score from data
arrays.  All quantities use the natural logarithm (nats).
"""
import os

import numpy as np
import pandas as pd
import scipy.io as scio
import sympy as sp
from sklearn.metrics import mutual_info_score
from sklearn.preprocessing import LabelEncoder

# Symbols that may appear in an expression string.
X = sp.Symbol("X")
Y = sp.Symbol("Y")
Z = sp.Symbol("Z")


class MI(sp.Function):
    """Symbolic placeholder for the mutual information I(X;Y)."""

    nargs = (2,)


class CMI(sp.Function):
    """Symbolic placeholder for the conditional mutual information I(X;Y|Z)."""

    nargs = (3,)


class II(sp.Function):
    """Symbolic placeholder for the interaction information I(X;Y;Z)."""

    nargs = (3,)


# Names that an expression string is allowed to reference.
ALLOWED_LOCALS = {
    "X": X,
    "Y": Y,
    "Z": Z,
    "I": MI,    # I(X,Y): mutual information
    "CI": CMI,  # CI(X,Y,Z): conditional mutual information I(X;Y|Z)
    "II": II,   # II(X,Y,Z): interaction information I(X;Y;Z)
}

# Maps each supported symbol to the key under which its data is stored in the
# evaluation context.
_SYMBOL_KEYS = {X: "X", Y: "Y", Z: "Z"}


def parse_expression(expr_str: str) -> sp.Expr:
    """Parse an expression string into a SymPy expression.

    The names in ``ALLOWED_LOCALS`` are resolved to this module's symbols and
    placeholder functions.

    NOTE(review): ``sympy.sympify`` evaluates the string with ``eval``; do not
    pass untrusted input to this function.
    """
    return sp.sympify(expr_str, locals=ALLOWED_LOCALS)


def entropy(x) -> float:
    """Return the empirical Shannon entropy (in nats) of the samples in ``x``.

    A 1-D input is treated as a sequence of scalar samples.  For an input
    with two or more dimensions each entry along the first axis is one joint
    sample, so the result is the joint entropy of the columns.  Empty input
    yields ``0.0``.
    """
    samples = np.asarray(x)
    if samples.ndim > 1:
        # Without ``axis=0`` np.unique flattens the array and would count
        # individual scalars rather than joint (row) outcomes.
        rows = samples.reshape(samples.shape[0], -1)
        _, counts = np.unique(rows, axis=0, return_counts=True)
    else:
        _, counts = np.unique(samples, return_counts=True)
    if counts.size == 0:
        return 0.0
    # Counts returned by np.unique are strictly positive, so log(p) is finite.
    p = counts / counts.sum()
    return float(-np.sum(p * np.log(p)))


def mi(x, y):
    """Return the mutual information I(X;Y) via ``mutual_info_score``."""
    return mutual_info_score(x, y)


def cmi(x, y, z) -> float:
    """Return the conditional mutual information I(X;Y|Z).

    Computed from joint entropies:
    I(X;Y|Z) = H(X,Z) + H(Y,Z) - H(Z) - H(X,Y,Z).
    """
    return (
        entropy(np.c_[x, z])
        + entropy(np.c_[y, z])
        - entropy(z)
        - entropy(np.c_[x, y, z])
    )


def interaction_info(x, y, z):
    """Return the interaction information I(X;Y;Z) = I(X;Y) - I(X;Y|Z)."""
    return mi(x, y) - cmi(x, y, z)


def expr_to_callable(expr: sp.Expr):
    """Compile a parsed expression into a function of data arrays.

    Returns ``f(X_arr, Y_arr, Z_arr=None)``, which walks the expression tree
    and evaluates ``MI``/``CMI``/``II`` nodes with ``mi``/``cmi``/
    ``interaction_info``, substitutes the data for the symbols ``X``, ``Y``
    and ``Z``, and combines results through numbers, sums, products and
    powers.

    ``f`` raises ``ValueError`` for any other kind of node and ``KeyError``
    when the expression refers to ``Z`` but no ``Z_arr`` was supplied.
    """

    def eval_node(node, ctx):
        if isinstance(node, MI):
            return mi(eval_node(node.args[0], ctx), eval_node(node.args[1], ctx))
        if isinstance(node, CMI):
            return cmi(
                eval_node(node.args[0], ctx),
                eval_node(node.args[1], ctx),
                eval_node(node.args[2], ctx),
            )
        if isinstance(node, II):
            return interaction_info(
                eval_node(node.args[0], ctx),
                eval_node(node.args[1], ctx),
                eval_node(node.args[2], ctx),
            )

        key = _SYMBOL_KEYS.get(node)
        if key is not None:
            if key not in ctx:
                raise KeyError(
                    f"expression uses {key} but no data was supplied for it"
                )
            return ctx[key]

        if node.is_Number:
            return float(node)
        if node.is_Add:
            return sum(eval_node(arg, ctx) for arg in node.args)
        if node.is_Mul:
            product = 1.0
            for arg in node.args:
                product *= eval_node(arg, ctx)
            return product
        if node.is_Pow:
            base, exp = node.args
            return eval_node(base, ctx) ** eval_node(exp, ctx)

        raise ValueError(f"Unsupported node: {node}")

    def f(X_arr, Y_arr, Z_arr=None):
        ctx = {"X": X_arr, "Y": Y_arr}
        if Z_arr is not None:
            ctx["Z"] = Z_arr
        return eval_node(expr, ctx)

    return f


def changetosinge(x):
    """Convert a single cell value to ``float``."""
    return float(x)


def _elementwise_float(frame):
    """Apply ``changetosinge`` to every cell of ``frame``."""
    # DataFrame.applymap is deprecated in favour of DataFrame.map
    # (pandas >= 2.1); use whichever this pandas version provides.
    if hasattr(pd.DataFrame, "map"):
        return frame.map(changetosinge)
    return frame.applymap(changetosinge)


def prepare_data(dataname, base_url, n_bins=5):
    """Load ``<base_url>/<dataname>.mat`` and return a discretised table.

    The file must contain the arrays ``'X'`` (features) and ``'Y'`` (labels).
    Every feature is cut into ``n_bins`` equal-width bins, the labels are
    integer-encoded, and the columns are renamed ``'0'``, ``'1'``, ... with
    the label as the last column.

    For the ``'Dermatology'`` dataset the last feature column is first
    label-encoded from the first element of each of its cells and stored as
    column 33.

    Returns:
        ``(data_processed, class_set)``: the combined DataFrame and the list
        of distinct encoded class labels in ascending order.
    """
    url = os.path.join(base_url, dataname + '.mat')
    data = scio.loadmat(url)
    X0 = pd.DataFrame(data['X'])
    y0 = pd.DataFrame(data['Y'])

    if dataname == 'Dermatology':
        special = X0.iloc[:, -1]
        raw_values = np.array([item[0] for item in special])
        encoded_special = LabelEncoder().fit_transform(raw_values)
        # Copy so the assignment below does not write into a slice of X0.
        X0 = X0.iloc[:, :-1].copy()
        X0[33] = encoded_special

    X0 = _elementwise_float(X0)
    y0 = _elementwise_float(y0)

    labels = y0.to_numpy()
    if labels.ndim == 2 and 1 in labels.shape:
        # LabelEncoder expects a 1-D array; flatten a single row/column.
        labels = labels.ravel()
    y_encoded = LabelEncoder().fit_transform(labels)
    y = pd.DataFrame(y_encoded)

    # Build the binned frame in one go instead of inserting column by column.
    binned = pd.DataFrame(
        {col: pd.cut(X0[col], bins=n_bins, labels=False) for col in X0.columns}
    )

    n_features = binned.shape[1]
    binned.columns = [str(i) for i in range(n_features)]
    y.columns = [str(n_features)]

    data_processed = pd.concat([binned, y], axis=1)
    return data_processed, list(np.unique(y_encoded))


dataname = 'Authorship'
base_url = '/home/fangsensen/AutoFS/data/'


def main():
    """Score the first feature of the configured dataset against its label."""
    data_processed, _class_set = prepare_data(dataname, base_url)

    # First feature vs. the label (last column); for 'Authorship' these are
    # the columns '0' and '69' named in the original commented-out hint.
    X_arr = data_processed.iloc[:, 0]
    y_arr = data_processed.iloc[:, -1]
    print(111111,X_arr,2222222,y_arr)

    expr = parse_expression("I(X,Y)")
    f = expr_to_callable(expr)
    score = f(X_arr, y_arr)
    print(score)


if __name__ == "__main__":
    main()