# NOTE(review): the three lines below were page-scrape residue from a Hugging Face
# Spaces export (a "Runtime error" status banner), not code — kept here as a comment
# so the file parses.
# Spaces: Runtime error / Runtime error
| import sys | |
| import numpy as np | |
| import pandas as pd | |
| import tensorflow as tf | |
| from datasets import Dataset | |
| from sentence_transformers import SentenceTransformer | |
| from tensorflow.keras.models import Model | |
| from tensorflow.keras.layers import Dense, LSTM, Dropout, Input, Concatenate, BatchNormalization | |
# Command-line arguments.
# usage: python train.py <data.csv> <feedback_type|thematic_area>
df_path = sys.argv[1]  # path to the input CSV data file
train_mode = sys.argv[2]  # which label set to train: 'feedback_type' or 'thematic_area'
def process(examples):
    """Encode a dataset split's "text" and "org" fields into sentence embeddings.

    Uses the module-level SentenceTransformer bound to ``model`` at call time.

    Args:
        examples: a mapping (e.g. a ``datasets.Dataset`` split) with
            ``"text"`` and ``"org"`` columns of strings.

    Returns:
        A ``(text_embeddings, org_embeddings)`` pair as produced by
        ``SentenceTransformer.encode``.
    """
    text_emb, org_emb = (model.encode(examples[key]) for key in ("text", "org"))
    return text_emb, org_emb
def get_feedback_data(f):
    """One-hot encode the pipe-separated ``feedback_type`` column.

    Args:
        f: DataFrame with a ``feedback_type`` column whose values are
            strings like ``"Request | Thanks"``.

    Returns:
        ``(f, label_list)`` — a copy of ``f`` with NaN feedback types
        dropped and one 0/1 indicator column per observed type added,
        plus the fixed label-column order the training code expects.
    """
    f = f.dropna(subset=['feedback_type'])
    # Split every row into its individual type tokens once, up front.
    tokens_per_row = [row.split(' | ') for row in f.feedback_type]
    types_list = set()
    for tokens in tokens_per_row:
        types_list.update(tokens)
    for t in types_list:
        # Exact token match. The original used substring matching
        # (``t in row``), which mislabels rows whenever one type name is
        # contained in another (e.g. "Opinion" in "Opinionated").
        f['feedback_' + str(t)] = [1 if t in tokens else 0 for tokens in tokens_per_row]
    # Fixed column order used downstream to build the label matrix.
    label_list = ['feedback_Request', 'feedback_Thanks', 'feedback_Question', 'feedback_Opinion', 'feedback_Concern']
    return f, label_list
def get_area_data(f):
    """One-hot encode the pipe-separated ``thematic_area`` column.

    Args:
        f: DataFrame with a ``thematic_area`` column whose values are
            strings like ``"health | wash"``.

    Returns:
        ``(f, label_list)`` — a copy of ``f`` with NaN thematic areas
        dropped and one 0/1 indicator column per observed area added,
        plus the fixed label-column order the training code expects.
    """
    f = f.dropna(subset=['thematic_area'])
    # Split every row into its individual area tokens once, up front.
    tokens_per_row = [row.split(' | ') for row in f.thematic_area]
    types_list = set()
    for tokens in tokens_per_row:
        types_list.update(tokens)
    for t in types_list:
        # Exact token match. The original used substring matching
        # (``t in row``), which mislabels rows whenever one area name is
        # contained in another.
        f['area_' + str(t)] = [1 if t in tokens else 0 for tokens in tokens_per_row]
    # Fixed column order used downstream to build the label matrix.
    label_list = ['area_cross-cutting', 'area_education', 'area_food security', 'area_governance',
                  'area_health', 'area_protection', 'area_shelter', 'area_wash']
    return f, label_list
df = pd.read_csv(df_path)  # load the raw feedback data

# Select the label-encoding routine matching the requested training mode.
# BUG FIX: the original called get_feedback_data() in BOTH branches, so the
# 'thematic_area' mode silently trained on feedback-type labels.
if train_mode == 'feedback_type':
    df, label_list = get_feedback_data(df)
elif train_mode == 'thematic_area':
    df, label_list = get_area_data(df)
else:
    # Without labels the rest of the script would die later with a
    # NameError on label_list; stop here with a clear message instead.
    print('The script supports "feedback_type" or "thematic_area" modes only.')
    sys.exit(1)
# Reproducibility: fix all TF/Keras RNG seeds and force deterministic ops.
SEED_VALUE = 13
tf.random.set_seed(SEED_VALUE)
tf.keras.utils.set_random_seed(SEED_VALUE)
tf.config.experimental.enable_op_determinism()
LABELS_NUM = len(label_list)  # number of one-hot label columns == output units
# Multilingual sentence encoder used by process().
# NOTE: the name `model` is rebound to the Keras classifier further below.
model = SentenceTransformer('distiluse-base-multilingual-cased-v2')
EMB_DIM = 512  # embedding dimensionality of this sentence-transformer checkpoint
# Drop rows missing text or organisation (NOTE(review): despite the original
# comment this removes NaN rows, not duplicates).
df = df.dropna(subset=['feedback_content', 'organisation'])
# dataset processing part
new_df = pd.DataFrame(data=df, columns=label_list)  # keep only the one-hot label columns
# Hugging Face Dataset holding the two text inputs and the multi-label target matrix.
dataset = Dataset.from_dict({"text": df['feedback_content'], "org": df['organisation'], "label": new_df.to_numpy()})
# train-validation splitting (seeded shuffle, then an 80/20 split)
dataset = dataset.shuffle(seed=SEED_VALUE)
train, val = dataset.train_test_split(test_size=0.2, seed=SEED_VALUE).values()
# Encode feedback texts and organisation names into sentence embeddings.
train_text, train_org = process(train)
val_text, val_org = process(val)
# Reshape to (batch, timesteps=1, features) as the LSTM branches expect.
X1_text = tf.reshape(train_text, [-1, 1, EMB_DIM])
X1_org = tf.reshape(train_org, [-1, 1, EMB_DIM])
Y1 = tf.reshape(np.array(train['label']), [-1, 1, LABELS_NUM])
# Same formatting for the validation split.
X2_text = tf.reshape(val_text, [-1, 1, EMB_DIM])
X2_org = tf.reshape(val_org, [-1, 1, EMB_DIM])
Y2 = tf.reshape(np.array(val['label']), [-1, 1, LABELS_NUM])
# Model definition: two identical towers (text, organisation) merged into
# one multi-label sigmoid classifier.


def _embedding_branch(inp):
    """Build one LSTM -> Dense tower over a (1, EMB_DIM) embedding input.

    Args:
        inp: a Keras Input tensor of shape (1, EMB_DIM).

    Returns:
        The branch's 128-unit output tensor.
    """
    h = LSTM(EMB_DIM, return_sequences=True, dropout=0.1, recurrent_dropout=0.1)(inp)
    h = Dense(EMB_DIM, activation='relu')(h)
    h = Dense(256, activation='sigmoid')(h)
    h = Dropout(0.5)(h)
    return Dense(128, activation='sigmoid')(h)


inputA = Input(shape=(1, EMB_DIM))  # text-embedding input
inputB = Input(shape=(1, EMB_DIM))  # organisation-embedding input
# The two branches were byte-identical in the original; build both from one helper.
x = _embedding_branch(inputA)
y = _embedding_branch(inputB)
# Combine the outputs of the two branches and classify.
combined = Concatenate()([x, y])
combo = BatchNormalization()(combined)
combo1 = Dense(LABELS_NUM, activation="sigmoid")(combo)
# The final model accepts both inputs and outputs LABELS_NUM sigmoid scores.
# BUG FIX: the original passed nested lists ([x_model.inputs, y_model.inputs],
# each element itself a list); Keras expects a flat list of input tensors.
model = Model(inputs=[inputA, inputB], outputs=combo1)
# Compile for multi-label classification (independent sigmoid outputs +
# binary cross-entropy) and track accuracy/precision/recall/AUC.
model.compile(optimizer='adam', loss='binary_crossentropy',
              metrics=[tf.keras.metrics.BinaryAccuracy(name='Acc'), tf.keras.metrics.Precision(name='Prec'),
                       tf.keras.metrics.Recall(name='Rec'), tf.keras.metrics.AUC(name='AUC')])
# Train on the two embedding streams against the one-hot label matrix.
model.fit([X1_text, X1_org], Y1, validation_data=([X2_text, X2_org], Y2), epochs=20)
# Persist the trained model under models/<mode>.
model.save('models/' + str(train_mode))