# train.py — trains a multi-label text classifier on humanitarian feedback data.
# Texts and organisation names are embedded with a multilingual
# sentence-transformer and fed into a two-branch Keras model.
# (Header reconstructed: the original lines were web-viewer scrape residue.)
import sys
import numpy as np
import pandas as pd
import tensorflow as tf
from datasets import Dataset
from sentence_transformers import SentenceTransformer
from tensorflow.keras.models import Model
from tensorflow.keras.layers import Dense, LSTM, Dropout, Input, Concatenate, BatchNormalization
# Command-line interface.
# Usage: python train.py <data.csv> <feedback_type|thematic_area>
cli = sys.argv
df_path = cli[1]      # path to the input CSV with the training data
train_mode = cli[2]   # which label set to train: 'feedback_type' or 'thematic_area'
def process(examples):
    '''
    Encode one dataset batch into sentence embeddings.

    Reads the "text" and "org" fields of *examples* and runs each through
    the module-level SentenceTransformer `model`.

    Returns a (text_embeddings, org_embeddings) pair as produced by
    `model.encode` (numpy arrays by default).
    '''
    text_embeddings = model.encode(examples["text"])
    org_embeddings = model.encode(examples["org"])
    return text_embeddings, org_embeddings
def get_feedback_data(f):
    '''
    get_feedback_data(pd.DataFrame) -> (pd.DataFrame, List[str])

    One-hot encode the multi-label 'feedback_type' column.

    Rows with a missing 'feedback_type' are dropped.  Labels within a row
    are ' | '-separated.  One 0/1 column 'feedback_<type>' is added per
    observed type, and the frame is returned together with the fixed list
    of label columns the rest of the pipeline expects.
    '''
    f = f.dropna(subset=['feedback_type'])
    # Split each row into its label tokens once, so membership checks are
    # exact.  (The previous substring test `t in row` would mark a row as
    # having label t whenever t appeared inside a longer label name.)
    row_labels = [row.split(' | ') for row in f.feedback_type]
    observed_types = set()
    for labels in row_labels:
        observed_types.update(labels)
    # Sorted iteration keeps the column insertion order deterministic.
    for t in sorted(observed_types):
        f['feedback_' + str(t)] = [1 if t in labels else 0 for labels in row_labels]
    # Fixed label set used downstream; assumes the data contains all five
    # types — if one is absent its column will be missing (NaNs downstream).
    label_list = ['feedback_Request', 'feedback_Thanks', 'feedback_Question',
                  'feedback_Opinion', 'feedback_Concern']
    return f, label_list
def get_area_data(f):
    '''
    get_area_data(pd.DataFrame) -> (pd.DataFrame, List[str])

    One-hot encode the multi-label 'thematic_area' column.

    Rows with a missing 'thematic_area' are dropped.  Labels within a row
    are ' | '-separated.  One 0/1 column 'area_<category>' is added per
    observed category, and the frame is returned together with the fixed
    list of label columns the rest of the pipeline expects.

    (Docstring previously mis-named this function get_feedback_data.)
    '''
    f = f.dropna(subset=['thematic_area'])
    # Exact token matching per row; a substring test would mislabel rows
    # when one category name is contained in another.
    row_labels = [row.split(' | ') for row in f.thematic_area]
    observed_areas = set()
    for labels in row_labels:
        observed_areas.update(labels)
    # Sorted iteration keeps the column insertion order deterministic.
    for t in sorted(observed_areas):
        f['area_' + str(t)] = [1 if t in labels else 0 for labels in row_labels]
    # Fixed label set used downstream; assumes the data contains all eight
    # areas — if one is absent its column will be missing (NaNs downstream).
    label_list = ['area_cross-cutting', 'area_education', 'area_food security',
                  'area_governance', 'area_health', 'area_protection',
                  'area_shelter', 'area_wash']
    return f, label_list
df = pd.read_csv(df_path)  # load the raw training data

# Dispatch on the requested training mode: each mode one-hot encodes a
# different multi-label column and yields the label columns to train on.
if train_mode == 'feedback_type':
    df, label_list = get_feedback_data(df)
elif train_mode == 'thematic_area':
    # BUG FIX: this branch previously called get_feedback_data as well,
    # so 'thematic_area' models were silently trained on feedback labels.
    df, label_list = get_area_data(df)
else:
    # Without a valid mode there are no labels to train on — exit now
    # instead of crashing later with a NameError on label_list.
    print('The script supports "feedback_type" or "thematic_area" modes only.')
    sys.exit(1)

# Reproducibility: seed TensorFlow's RNGs and force deterministic kernels.
SEED_VALUE = 13
tf.random.set_seed(SEED_VALUE)
tf.keras.utils.set_random_seed(SEED_VALUE)
tf.config.experimental.enable_op_determinism()

LABELS_NUM = len(label_list)  # number of target classes
model = SentenceTransformer('distiluse-base-multilingual-cased-v2')  # pretrained multilingual encoder
EMB_DIM = 512  # output dimensionality of the encoder above
# Drop rows missing the text or the organisation (NaNs, not duplicates —
# the original comment was misleading).
df = df.dropna(subset=['feedback_content', 'organisation'])
# --- dataset assembly -------------------------------------------------
# Keep only the one-hot label columns as the training target.
labels_frame = pd.DataFrame(data=df, columns=label_list)
dataset = Dataset.from_dict({
    "text": df['feedback_content'],
    "org": df['organisation'],
    "label": labels_frame.to_numpy(),
})

# Shuffle, then hold out 20% for validation (both steps seeded).
dataset = dataset.shuffle(seed=SEED_VALUE)
train, val = dataset.train_test_split(test_size=0.2, seed=SEED_VALUE).values()

# Embed the text and organisation fields of each split.
train_text, train_org = process(train)
val_text, val_org = process(val)

# Reshape everything to (batch, timesteps=1, features) for the LSTM branches.
X1_text = tf.reshape(train_text, [-1, 1, EMB_DIM])
X1_org = tf.reshape(train_org, [-1, 1, EMB_DIM])
Y1 = tf.reshape(np.array(train['label']), [-1, 1, LABELS_NUM])
X2_text = tf.reshape(val_text, [-1, 1, EMB_DIM])
X2_org = tf.reshape(val_org, [-1, 1, EMB_DIM])
Y2 = tf.reshape(np.array(val['label']), [-1, 1, LABELS_NUM])
# --- model definition -------------------------------------------------
def _embedding_branch(branch_input):
    '''One LSTM + dense tower over a (1, EMB_DIM) embedding input.

    Called once per input, so the two branches have independent weights.
    Returns a (batch, 1, 128) tensor (return_sequences=True keeps the
    time axis).
    '''
    h = LSTM(EMB_DIM, input_shape=(1, EMB_DIM), return_sequences=True,
             dropout=0.1, recurrent_dropout=0.1)(branch_input)
    h = Dense(EMB_DIM, activation='relu')(h)
    h = Dense(256, activation="sigmoid")(h)
    h = Dropout(0.5)(h)
    h = Dense(128, activation='sigmoid')(h)
    return h

inputA = Input(shape=(1, EMB_DIM, ))  # text embeddings
inputB = Input(shape=(1, EMB_DIM, ))  # organisation embeddings
x = _embedding_branch(inputA)  # branch over the text embeddings
y = _embedding_branch(inputB)  # branch over the organisation embeddings
# (Original comments wrongly labelled both towers "the first branch" and
# spoke of "three branches" — there are exactly two.)

# Combine the two branches and classify; sigmoid output because this is
# multi-label (each class is an independent binary decision).
combined = Concatenate()([x, y])
combo = BatchNormalization()(combined)
combo1 = Dense(LABELS_NUM, activation="sigmoid")(combo)

# FIX: pass the two input tensors directly instead of the previous
# [x_model.inputs, y_model.inputs] (a nested list of lists).  Also named
# `classifier` so it no longer shadows the SentenceTransformer `model`.
classifier = Model(inputs=[inputA, inputB], outputs=combo1)
classifier.compile(optimizer='adam', loss='binary_crossentropy',
                   metrics=[tf.keras.metrics.BinaryAccuracy(name='Acc'),
                            tf.keras.metrics.Precision(name='Prec'),
                            tf.keras.metrics.Recall(name='Rec'),
                            tf.keras.metrics.AUC(name='AUC')])
# Train on the (text, org) pair; validation data monitors each epoch.
classifier.fit([X1_text, X1_org], Y1,
               validation_data=([X2_text, X2_org], Y2), epochs=20)
# Persist the trained model under a mode-specific directory.
classifier.save('models/' + str(train_mode))