File size: 4,271 Bytes
b5567db
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
import sympy as sp
import numpy as np
from sklearn.metrics import mutual_info_score
# 符号
import sympy as sp
import pandas as pd

# Placeholder symbols for the three variables an expression may reference.
X, Y, Z = sp.symbols("X Y Z")

class MI(sp.Function):
    """Symbolic placeholder for mutual information, written ``I(X, Y)``.

    Instances are plain SymPy function applications; the numeric value is
    computed later when ``expr_to_callable`` meets the node and calls ``mi``.
    """
    # Number of arguments the function accepts: exactly two.
    nargs = (2,)

class CMI(sp.Function):
    """Symbolic placeholder for conditional mutual information I(X;Y|Z).

    Written ``CI(X, Y, Z)`` in expression strings; ``expr_to_callable``
    evaluates it by calling ``cmi`` on the three evaluated arguments.
    """
    # Number of arguments the function accepts: exactly three.
    nargs = (3,)

class II(sp.Function):
    """Symbolic placeholder for interaction information I(X;Y;Z).

    Written ``II(X, Y, Z)`` in expression strings; ``expr_to_callable``
    evaluates it by calling ``interaction_info`` on the evaluated arguments.
    """
    nargs = (3,)   # exactly three arguments (interaction information)

# Names made available to expression strings, mapped to the SymPy objects
# they stand for.  Passed as the ``locals`` mapping when parsing.
ALLOWED_LOCALS = dict(
    X=X,
    Y=Y,
    Z=Z,
    I=MI,    # I(X, Y): mutual information
    CI=CMI,  # CI(X, Y, Z): conditional mutual information I(X;Y|Z)
    II=II,   # II(X, Y, Z): interaction information I(X;Y;Z)
)

def parse_expression(expr_str: str) -> sp.Expr:
    """Parse a formula string into a SymPy expression.

    The string is handed to ``sp.sympify`` with ``ALLOWED_LOCALS`` as the
    ``locals`` mapping, so ``X``, ``Y``, ``Z``, ``I``, ``CI`` and ``II``
    resolve to the objects defined in this module.

    Args:
        expr_str: Formula text, e.g. ``"I(X,Y)"``.

    Returns:
        The parsed SymPy expression.
    """
    # NOTE(review): sympify evaluates the text with eval(); confirm that
    # expr_str never originates from an untrusted source.
    return sp.sympify(expr_str, locals=ALLOWED_LOCALS)


def entropy(x):
    """Return the empirical Shannon entropy (in nats) of the samples in *x*.

    A 0-D or 1-D input is treated as a sequence of scalar observations.  An
    input with two or more dimensions is treated as one observation per row,
    so stacking several columns yields the joint entropy of those columns.

    Args:
        x: Array-like of discrete observations.

    Returns:
        The entropy as a NumPy float; zero for empty input.
    """
    samples = np.asarray(x)
    if samples.size == 0:
        return np.float64(0.0)

    if samples.ndim <= 1:
        _, cnt = np.unique(samples, return_counts=True)
    else:
        # Without ``axis=0`` np.unique flattens the array and would count
        # individual cell values instead of distinct rows (joint outcomes).
        rows = samples.reshape(samples.shape[0], -1)
        _, cnt = np.unique(rows, axis=0, return_counts=True)

    p = cnt / cnt.sum()
    # Every count is at least 1, so p > 0 and the logarithm is finite; no
    # smoothing constant is needed (it would only bias the estimate).
    return -np.sum(p * np.log(p))

def mi(x, y):
    """Return the mutual information I(X;Y) between two label sequences.

    Thin wrapper that forwards both arguments to
    ``sklearn.metrics.mutual_info_score`` and returns its result unchanged.
    """
    score = mutual_info_score(x, y)
    return score

def cmi(x, y, z):
    """Estimate the conditional mutual information I(X;Y|Z).

    Uses the entropy identity
    ``I(X;Y|Z) = H(X,Z) + H(Y,Z) - H(Z) - H(X,Y,Z)``.

    Args:
        x, y, z: Equal-length array-likes of discrete observations.

    Returns:
        The estimate, in the units returned by ``entropy``.
    """

    def joint_labels(*columns):
        # Give every distinct row of the stacked columns one integer label,
        # so ``entropy`` receives a 1-D sequence of joint outcomes.  Passing
        # the stacked rows directly would let np.unique flatten them and
        # count single cell values instead of (x, z)-style tuples.
        _, labels = np.unique(np.c_[columns], axis=0, return_inverse=True)
        # ravel() keeps the labels 1-D on NumPy versions that return the
        # inverse with an extra axis.
        return labels.ravel()

    return (
        entropy(joint_labels(x, z))
        + entropy(joint_labels(y, z))
        - entropy(joint_labels(z))
        - entropy(joint_labels(x, y, z))
    )

def interaction_info(x, y, z):
    """Return the interaction information I(X;Y;Z) = I(X;Y) - I(X;Y|Z).

    The unconditional term comes from ``mi`` and the conditional term from
    ``cmi``.
    """
    unconditional = mi(x, y)
    conditional = cmi(x, y, z)
    return unconditional - conditional


def expr_to_callable(expr: sp.Expr):
    """Turn a parsed expression into a function of concrete data arrays.

    The returned function ``f(X_arr, Y_arr, Z_arr=None)`` walks the
    expression tree: ``MI``/``CMI``/``II`` nodes are evaluated with ``mi``,
    ``cmi`` and ``interaction_info``; the symbols X, Y and Z are replaced by
    the supplied arrays; numbers, sums, products and powers are evaluated
    arithmetically.

    Raises (when the returned function is called):
        ValueError: the expression contains any other kind of node.
        KeyError: the expression references Z but ``Z_arr`` was not given.
    """

    def eval_node(node, ctx):
        # Information-theoretic function applications: evaluate the
        # arguments in order, then hand them to the matching estimator.
        estimators = ((MI, mi), (CMI, cmi), (II, interaction_info))
        for node_type, estimator in estimators:
            if isinstance(node, node_type):
                operands = [eval_node(arg, ctx) for arg in node.args]
                return estimator(*operands)

        # Data symbols are looked up in the evaluation context.
        for symbol, key in ((X, "X"), (Y, "Y"), (Z, "Z")):
            if node == symbol:
                return ctx[key]

        if node.is_Number:
            return float(node)

        if node.is_Add:
            return sum(eval_node(term, ctx) for term in node.args)

        if node.is_Mul:
            product = 1.0
            for factor in node.args:
                product *= eval_node(factor, ctx)
            return product

        if node.is_Pow:
            base_node, exp_node = node.args
            return eval_node(base_node, ctx) ** eval_node(exp_node, ctx)

        raise ValueError(f"Unsupported node: {node}")

    def f(X_arr, Y_arr, Z_arr=None):
        # Z is optional; it is only placed in the context when provided.
        optional = {} if Z_arr is None else {"Z": Z_arr}
        ctx = {"X": X_arr, "Y": Y_arr, **optional}
        return eval_node(expr, ctx)

    return f



from sklearn.preprocessing import LabelEncoder
def changetosinge(x):
    """Coerce a single value to a Python ``float`` and return it."""
    as_float = float(x)
    return as_float
# scores = f(X, y, X_other_list)
def _map_cells(frame, func):
    """Apply *func* to every cell of *frame* with whichever pandas API exists."""
    # DataFrame.applymap is deprecated since pandas 2.1 in favour of
    # DataFrame.map; older releases only provide applymap.
    cellwise = getattr(frame, "map", None)
    if cellwise is None:
        cellwise = frame.applymap
    return cellwise(func)


def prepare_data(dataname, base_url):
    """Load ``<base_url>/<dataname>.mat`` and return a discretised table.

    The file is read with ``scipy.io.loadmat`` and must contain the keys
    ``'X'`` (features) and ``'Y'`` (labels).  Every feature column is cut
    into 5 equal-width bins, the labels are label-encoded, and the columns
    are renamed ``'0'``, ``'1'``, ... with the label as the last column.

    Args:
        dataname: Dataset name; also the file stem of the ``.mat`` file.
        base_url: Directory that contains the ``.mat`` file.

    Returns:
        ``(data_processed, classes)``: the combined DataFrame and the sorted
        list of distinct encoded class labels.

    Raises:
        ValueError: ``Y`` is neither a single row nor a single column.
    """
    url = os.path.join(base_url, dataname + '.mat')
    data = scio.loadmat(url)
    X0 = pd.DataFrame(data['X'])
    y0 = pd.DataFrame(data['Y'])

    if dataname == 'Dermatology':
        # The last feature column is handled separately: the first element
        # of each cell is extracted and label-encoded before the generic
        # float conversion below.
        special = X0.iloc[:, -1]
        raw_values = np.array([item[0] for item in special])
        encoded_special = LabelEncoder().fit_transform(raw_values)
        # copy() so the assignment below writes to an independent frame
        # rather than to a slice of the original.
        X0 = X0.iloc[:, :-1].copy()
        X0[33] = encoded_special

    X0 = _map_cells(X0, changetosinge)
    y0 = _map_cells(y0, changetosinge)

    # LabelEncoder expects a 1-D array.  Flattening accepts labels stored as
    # either a column vector or a row vector; anything else is ambiguous.
    labels = y0.to_numpy()
    if min(labels.shape) > 1:
        raise ValueError(
            f"Y must be a single row or column of labels, got shape {labels.shape}"
        )
    y_encoded = LabelEncoder().fit_transform(labels.ravel())
    y = pd.DataFrame(y_encoded)

    # Build all binned columns in one go instead of inserting them one by
    # one into an empty frame (which fragments the DataFrame).
    X = pd.DataFrame(
        {col: pd.cut(X0[col], bins=5, labels=False) for col in X0.columns}
    )

    # Features become '0' .. 'n-1'; the label column becomes 'n'.
    new_columns = [str(i) for i in range(X.shape[1] + 1)]
    X.columns = new_columns[:-1]
    y.columns = [new_columns[-1]]
    data_processed = pd.concat([X, y], axis=1)

    # sorted() makes the class order deterministic.
    return data_processed, sorted(set(y_encoded))

import os
import scipy.io as scio
# Demo: score the first feature of the Authorship dataset against its label.
dataname = 'Authorship'
base_url = '/home/fangsensen/AutoFS/data/'
data_processed, class_set = prepare_data(dataname, base_url)
# prepare_data names the feature columns '0', '1', ... and concatenates the
# label as the last column, so select the label by position.
X_arr = data_processed['0']
y_arr = data_processed.iloc[:, -1]
print(111111,X_arr,2222222,y_arr)
expr = parse_expression("I(X,Y)")
f = expr_to_callable(expr)
score = f(X_arr, y_arr)
print(score)