| | import os |
| | os.chdir("/data/public/GenNet") |
| | import sys |
| | import glob |
| | import numpy as np |
| | import pandas as pd |
| | |
| | import matplotlib |
| | matplotlib.use('agg') |
| | import tensorflow as tf |
| | import tensorflow.keras as K |
| | import scipy |
| | import tables |
| |
|
| | tf.keras.backend.set_epsilon(0.0000001) |
| | tf_version = tf.__version__ |
| | if tf_version <= '1.13.1': |
| | from GenNet_utils.LocallyDirectedConnected import LocallyDirected1D |
| | print('= or less then 1.13.1: tensorflow version is', tf_version) |
| | elif tf_version >= '2.0': |
| | from GenNet_utils.LocallyDirectedConnected_tf2 import LocallyDirected1D |
| | print('= or more then 2.0: tensorflow version is', tf_version) |
| | else: |
| | print("unexpected tensorflow version") |
| | from GenNet_utils.LocallyDirectedConnected_tf2 import LocallyDirected1D |
| |
|
| | studyname = 'test_GN_ref' |
| |
|
| | def layer_block(model, mask, i, regression): |
| | |
| | if regression: |
| | activation_type="relu" |
| | else: |
| | activation_type="tanh" |
| | |
| | model = LocallyDirected1D(mask=mask, filters=1, input_shape=(mask.shape[0], 1), |
| | name="LocallyDirected_" + str(i))(model) |
| | model = K.layers.Flatten()(model) |
| |
|
| | model = K.layers.Activation(activation_type)(model) |
| | model = K.layers.BatchNormalization(center=False, scale=False)(model) |
| | return model |
| |
|
| |
|
| | def add_covariates(model, input_cov, num_covariates, regression, negative_values_ytrain, mean_ytrain): |
| | if num_covariates > 0: |
| | model = activation_layer(model, regression, negative_values_ytrain) |
| | model = K.layers.concatenate([model, input_cov], axis=1) |
| | model = K.layers.BatchNormalization(center=False, scale=False)(model) |
| | model = K.layers.Dense(units=1, bias_initializer= tf.keras.initializers.Constant(mean_ytrain))(model) |
| | return model |
| |
|
| |
|
| | def activation_layer(model, regression, negative_values_ytrain): |
| | |
| | if regression: |
| | if negative_values_ytrain: |
| | model = K.layers.Activation("linear")(model) |
| | print('using a linear activation function') |
| | else: |
| | model = K.layers.Activation("relu")(model) |
| | print('using a relu activation function') |
| | else: |
| | model = K.layers.Activation("sigmoid")(model) |
| | |
| | return model |
| |
|
| | def create_network_from_npz(datapath, |
| | inputsize, |
| | genotype_path, |
| | l1_value=0.01, |
| | regression=False, |
| | num_covariates=0, |
| | mask_order = []): |
| | print("Creating networks from npz masks") |
| | print("regression", regression) |
| | if regression: |
| | mean_ytrain, negative_values_ytrain = regression_properties(datapath) |
| | else: |
| | mean_ytrain = 0 |
| | negative_values_ytrain = False |
| |
|
| | masks = [] |
| | mask_shapes_x = [] |
| | mask_shapes_y = [] |
| |
|
| | print(mask_order) |
| |
|
| | if len(mask_order) > 0: |
| | for mask in mask_order: |
| | mask = scipy.sparse.load_npz(datapath + '/'+str(mask)+'.npz') |
| | masks.append(mask) |
| | mask_shapes_x.append(mask.shape[0]) |
| | mask_shapes_y.append(mask.shape[1]) |
| |
|
| | for x in range(len(masks) - 1): |
| | assert mask_shapes_y[x] == mask_shapes_x[x + 1] |
| | else: |
| | |
| | for npz_path in glob.glob(datapath + '/*.npz'): |
| | mask = scipy.sparse.load_npz(npz_path) |
| | masks.append(mask) |
| | mask_shapes_x.append(mask.shape[0]) |
| | mask_shapes_y.append(mask.shape[1]) |
| |
|
| | for i in range(len(masks)): |
| | argsort_x = np.argsort(mask_shapes_x)[::-1] |
| | argsort_y = np.argsort(mask_shapes_y)[::-1] |
| |
|
| | mask_shapes_x = np.array(mask_shapes_x) |
| | mask_shapes_y = np.array(mask_shapes_y) |
| | assert all(argsort_x == argsort_y) |
| |
|
| | masks = [masks[i] for i in argsort_y] |
| | mask_shapes_x = mask_shapes_x[argsort_x] |
| | mask_shapes_y = mask_shapes_y[argsort_y] |
| |
|
| | for x in range(len(masks) - 1): |
| | assert mask_shapes_y[x] == mask_shapes_x[x + 1] |
| | print('mask_shapes_x[0]', mask_shapes_x[0]) |
| | assert mask_shapes_x[0] == inputsize |
| | print('mask_shapes_y[-1]', mask_shapes_y[-1]) |
| | if mask_shapes_y[-1] == 1: |
| | all_masks_available = True |
| | else: |
| | all_masks_available = False |
| |
|
| | input_layer = K.Input((inputsize,), name='input_layer') |
| | input_cov = K.Input((num_covariates,), name='inputs_cov') |
| |
|
| | model = K.layers.Reshape(input_shape=(inputsize,), target_shape=(inputsize, 1))(input_layer) |
| | |
| | for i in range(len(masks)): |
| | mask = masks[i] |
| | model = layer_block(model, mask, i, regression) |
| |
|
| | model = K.layers.Flatten()(model) |
| |
|
| | if all_masks_available: |
| | model = LocallyDirected1D(mask=masks[-1], filters=1, input_shape=(mask.shape[0], 1), |
| | name="output_layer")(model) |
| | else: |
| | model = K.layers.Dense(units=1, name="output_layer", |
| | kernel_regularizer=tf.keras.regularizers.l1(l=l1_value) |
| | )(model) |
| |
|
| | model = add_covariates(model, input_cov, num_covariates, regression, negative_values_ytrain, mean_ytrain) |
| |
|
| | output_layer = activation_layer(model, regression, negative_values_ytrain) |
| | model = K.Model(inputs=[input_layer, input_cov], outputs=output_layer) |
| |
|
| | print(model.summary()) |
| |
|
| | return model, masks |
| |
|
| | def get_testdata(datapath): |
| | |
| | h5file = tables.open_file(datapath + studyname + '_genotype_processed.h5', "r") |
| | |
| | |
| | xbatch = h5file.root.data[:] |
| | |
| | h5file.close() |
| | return xbatch |
| |
|
| | def predict(): |
| | xtest = get_testdata(datapath) |
| | pred = model.predict(xtest) |
| | print('model prediction: ', pred) |
| |
|
| | datapath = '/data/public/GenNet/processed_data/' |
| | inputsize = 6986636 |
| | num_covariates = 0 |
| | genotype_path = datapath |
| | l1_value = 0.001 |
| | model, masks = create_network_from_npz(datapath=datapath, inputsize=inputsize, genotype_path=genotype_path,mask_order=['UKBB_sparse_connection_mask_refseq_alligned'], |
| | l1_value=l1_value, regression=False, num_covariates=num_covariates,) |
| | model.load_weights(datapath + 'bestweight_job_diabetes.h5') |
| | print('weights have been loaded') |
| | predict() |