# BioMedGPT-Mol / evaluation/utils/mumoinstruct_metrics.py
# (Hugging Face page header retained as comments so the module stays importable:
#  author leofansq, commit 3824ea0 "update for evaluation", verified)
from collections import defaultdict
from admet_ai import ADMETModel
import networkx as nx
import rdkit
from rdkit import Chem, DataStructs
from rdkit.Chem import AllChem, Descriptors
import rdkit.Chem.QED as QED
from rdkit import RDLogger
RDLogger.DisableLog('rdApp.*')
# import sascorer as sascorer
#
# calculation of synthetic accessibility score as described in:
#
# Estimation of Synthetic Accessibility Score of Drug-like Molecules based on Molecular Complexity and Fragment Contributions
# Peter Ertl and Ansgar Schuffenhauer
# Journal of Cheminformatics 1:8 (2009)
# http://www.jcheminf.com/content/1/1/8
#
# several small modifications to the original paper are included
# particularly slightly different formula for macrocyclic penalty
# and taking into account also molecule symmetry (fingerprint density)
#
# for a set of 10k diverse molecules the agreement between the original method
# as implemented in PipelinePilot and this implementation is r2 = 0.97
#
# peter ertl & greg landrum, september 2013
#
from rdkit.Chem import rdMolDescriptors
import pickle
import math
import os.path as op
_fscores = None
def readFragmentScores(name='fpscores'):
import gzip
global _fscores
# generate the full path filename:
if name == "fpscores":
name = op.join(op.dirname(__file__), name)
_fscores = pickle.load(gzip.open('%s.pkl.gz' % name))
outDict = {}
for i in _fscores:
for j in range(1, len(i)):
outDict[i[j]] = float(i[0])
_fscores = outDict
def numBridgeheadsAndSpiro(mol, ri=None):
nSpiro = rdMolDescriptors.CalcNumSpiroAtoms(mol)
nBridgehead = rdMolDescriptors.CalcNumBridgeheadAtoms(mol)
return nBridgehead, nSpiro
def calculateScore(m):
if _fscores is None:
readFragmentScores()
# fragment score
fp = rdMolDescriptors.GetMorganFingerprint(m,
2) # <- 2 is the *radius* of the circular fingerprint
fps = fp.GetNonzeroElements()
score1 = 0.
nf = 1e-6
for bitId, v in fps.items():
nf += v
sfp = bitId
score1 += _fscores.get(sfp, -4) * v
score1 /= nf
# features score
nAtoms = m.GetNumAtoms()
nChiralCenters = len(Chem.FindMolChiralCenters(m, includeUnassigned=True))
ri = m.GetRingInfo()
nBridgeheads, nSpiro = numBridgeheadsAndSpiro(m, ri)
nMacrocycles = 0
for x in ri.AtomRings():
if len(x) > 8:
nMacrocycles += 1
sizePenalty = nAtoms**1.005 - nAtoms
stereoPenalty = math.log10(nChiralCenters + 1)
spiroPenalty = math.log10(nSpiro + 1)
bridgePenalty = math.log10(nBridgeheads + 1)
macrocyclePenalty = 0.
# ---------------------------------------
# This differs from the paper, which defines:
# macrocyclePenalty = math.log10(nMacrocycles+1)
# This form generates better results when 2 or more macrocycles are present
if nMacrocycles > 0:
macrocyclePenalty = math.log10(2)
score2 = 0. - sizePenalty - stereoPenalty - spiroPenalty - bridgePenalty - macrocyclePenalty
# correction for the fingerprint density
# not in the original publication, added in version 1.1
# to make highly symmetrical molecules easier to synthetise
score3 = 0.
if nAtoms > len(fps):
score3 = math.log(float(nAtoms) / len(fps)) * .5
sascore = score1 + score2 + score3
# need to transform "raw" value into scale between 1 and 10
min = -4.0
max = 2.5
sascore = 11. - (sascore - min + 1) / (max - min) * 9.
# smooth the 10-end
if sascore > 8.:
sascore = 8. + math.log(sascore + 1. - 9.)
if sascore > 10.:
sascore = 10.0
elif sascore < 1.:
sascore = 1.0
return sascore
#
# Copyright (c) 2013, Novartis Institutes for BioMedical Research Inc.
# All rights reserved.
#
# Redistribution and use in source and binary forms, with or without
# modification, are permitted provided that the following conditions are
# met:
#
# * Redistributions of source code must retain the above copyright
# notice, this list of conditions and the following disclaimer.
# * Redistributions in binary form must reproduce the above
# copyright notice, this list of conditions and the following
# disclaimer in the documentation and/or other materials provided
# with the distribution.
# * Neither the name of Novartis Institutes for BioMedical Research Inc.
# nor the names of its contributors may be used to endorse or promote
# products derived from this software without specific prior written permission.
#
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
#
# import drd2_scorer as drd2_scorer
import numpy as np
from rdkit import rdBase
from sklearn import svm
import sys
# 创建一个虚拟模块路径
sys.modules['sklearn.svm.classes'] = svm
import re
rdBase.DisableLog('rdApp.error')
"""Scores based on an ECFP classifier for activity."""
clf_model = None
def load_model():
global clf_model
name = op.join(op.dirname(__file__), 'clf_py36.pkl')
with open(name, "rb") as f:
clf_model = pickle.load(f)
def get_score(smile):
if clf_model is None:
load_model()
mol = Chem.MolFromSmiles(smile)
if mol:
fp = fingerprints_from_mol(mol)
score = clf_model.predict_proba(fp)[:, 1]
return float(score)
return 0.0
def fingerprints_from_mol(mol):
fp = AllChem.GetMorganFingerprint(mol, 3, useCounts=True, useFeatures=True)
size = 2048
nfp = np.zeros((1, size), np.int32)
for idx,v in fp.GetNonzeroElements().items():
nidx = idx%size
nfp[0, nidx] += int(v)
return nfp
import pandas as pd
def similarity(a, b):
if a is None or b is None:
return 0.0
amol = Chem.MolFromSmiles(a)
bmol = Chem.MolFromSmiles(b)
if amol is None or bmol is None:
return 0.0
fp1 = AllChem.GetMorganFingerprintAsBitVect(amol, 2, nBits=2048, useChirality=False)
fp2 = AllChem.GetMorganFingerprintAsBitVect(bmol, 2, nBits=2048, useChirality=False)
return DataStructs.TanimotoSimilarity(fp1, fp2)
def drd2(s):
if s is None: return 0.0
if Chem.MolFromSmiles(s) is None:
return 0.0
# return drd2_scorer.get_score(s)
return get_score(s)
def qed(s):
if s is None: return 0.0
mol = Chem.MolFromSmiles(s)
if mol is None: return 0.0
return QED.qed(mol)
def sas(s):
if s is None: return 0.0
mol = Chem.MolFromSmiles(s)
if mol is None: return 0.0
# return sascorer.calculateScore(mol)
return calculateScore(mol)
# Modified from https://github.com/bowenliu16/rl_graph_generation
def penalized_logp(s):
if s is None: return -100.0
mol = Chem.MolFromSmiles(s)
if mol is None: return -100.0
logP_mean = 2.4570953396190123
logP_std = 1.434324401111988
SA_mean = -3.0525811293166134
SA_std = 0.8335207024513095
cycle_mean = -0.0485696876403053
cycle_std = 0.2860212110245455
log_p = Descriptors.MolLogP(mol)
# SA = -sascorer.calculateScore(mol)
SA = -calculateScore(mol)
# cycle score
cycle_list = nx.cycle_basis(nx.Graph(Chem.rdmolops.GetAdjacencyMatrix(mol)))
if len(cycle_list) == 0:
cycle_length = 0
else:
cycle_length = max([len(j) for j in cycle_list])
if cycle_length <= 6:
cycle_length = 0
else:
cycle_length = cycle_length - 6
cycle_score = -cycle_length
normalized_log_p = (log_p - logP_mean) / logP_std
normalized_SA = (SA - SA_mean) / SA_std
normalized_cycle = (cycle_score - cycle_mean) / cycle_std
return normalized_log_p + normalized_SA + normalized_cycle
def smiles2D(s):
mol = Chem.MolFromSmiles(s)
return Chem.MolToSmiles(mol)
# Function to canonicalize SMILES
def canonicalize_smiles(smiles):
try:
mol = Chem.MolFromSmiles(smiles)
return Chem.MolToSmiles(mol, canonical=True)
except:
return None
def generate_props(smiles):
"""
Generates properties using the ADMET model
:param smiles: list(list(str))
:return props_df: pandas df
"""
smi = list()
for i in smiles:
smi.extend(i)
# remove duplicates
smi = set(smi)
print(f"Number of preprocessed SMILES: {len(smi)}")
smi = [canonicalize_smiles(s) for s in smi]
smi = [s for s in smi if s is not None]
print(f"Number of postprocessed SMILES: {len(smiles)}")
# compute properties
model = ADMETModel(num_workers=4)
props_df = model.predict(smi)
props_df = pd.DataFrame(props_df)
props_df.reset_index(inplace=True)
# rename BBB_Martins to BBBP
props_df.rename(columns={'index': 'smiles', 'BBB_Martins': 'bbbp', 'AMES': 'mutagenicity', 'HIA_Hou': 'hia'}, inplace=True)
props_df = props_df[['smiles', 'bbbp', 'mutagenicity', 'hia']].round(2)
# compute plog, qed, drd2, sas for the optimized smiles and merge with previous propertie
props_df.set_index('smiles', inplace=True)
props = defaultdict(dict)
for s in smi:
for col in props_df.columns:
props[s][col] = props_df.loc[s][col]
if 'plogp' not in props[s]:
props[s]['plogp'] = penalized_logp(s)
if 'qed' not in props[s]:
props[s]['qed'] = qed(s)
# if 'drd2' not in props[s]:
# props[s]['drd2'] = drd2(s)
if 'sas' not in props[s]:
props[s]['sas'] = sas(s)
# save the props as a dataframe
props_df = pd.DataFrame(props).T
props_df = props_df.reset_index().rename(columns={'index': 'smiles',
'BBBP': 'bbbp', 'AMES': 'mutagenicity',
'HIA_Hou': 'hia'})
props_df = props_df.round(2)
return props_df
# props_df.to_csv(props_path, index=False)
def pair_similarity(amol, bmol, sim_type):
if amol is None or bmol is None:
return 0.0
if isinstance(amol, str):
amol = Chem.MolFromSmiles(amol)
if isinstance(bmol, str):
bmol = Chem.MolFromSmiles(bmol)
if amol is None or bmol is None:
return 0.0
if sim_type == "binary":
fp1 = AllChem.GetMorganFingerprintAsBitVect(amol, 2, nBits=2048, useChirality=False)
fp2 = AllChem.GetMorganFingerprintAsBitVect(bmol, 2, nBits=2048, useChirality=False)
else:
fp1 = AllChem.GetMorganFingerprint(amol, 2, useChirality=False)
fp2 = AllChem.GetMorganFingerprint(bmol, 2, useChirality=False)
sim = DataStructs.TanimotoSimilarity(fp1, fp2)
return sim
def compute_FP_sim(smiles):
# assume there might be invalid smiles
mols = []
for smi in smiles:
mol = Chem.MolFromSmiles(smi)
if mol:
mols.append(mol)
sim = np.zeros((len(mols), len(mols)))
for i in range(len(mols)):
for j in range(i, len(mols)):
sim[i, j] = sim[j, i] = pair_similarity(mols[i], mols[j], "binary")
return sim
def normalize_prop(val, min_val, max_val):
normalized_val = (val - min_val) / (max_val - min_val)
return max(0, min(1, normalized_val))
def find_best_optimized(input_prop, output_prop, property):
"""
Find the best optimized SMILES that satisfies constraints and maximizes improvement.
Args:
input_prop (dict): Properties of the input molecule.
output_prop (dict): Properties of the optimized molecules.
property (list): List of properties to optimize.
Returns:
int: Index of the best optimized molecule.
"""
best_idx = None
best_improvement = -1
num_candidates = len(output_prop[property[0]])
for i in range(num_candidates):
improvement = 0
# Skip candidate if the change in property is in the wrong direction
wrong_direction = False
for prop in property:
input_val = input_prop[prop]
output_val = output_prop[prop][i]
#print(i, prop, input_val, output_val)
if prop == "mutagenicity":
if output_val >= input_val: # Mutagenicity must decrease
wrong_direction = True
break
else:
if output_val <= input_val: # Other properties must increase
wrong_direction = True
break
#print(wrong_direction)
if wrong_direction:
continue # Skip this candidate
for prop in property:
input_val = input_prop[prop]
output_val = output_prop[prop][i]
if input_val == 0:
improvement += output_val
else:
improvement += (output_val - input_val) / abs(input_val) if prop != 'mutagenicity' else (input_val - output_val) / abs(input_val)
#print(i, prop, input_prop[prop], output_prop[prop][i], improvement)
if improvement >= best_improvement:
best_improvement = improvement
best_idx = i
#print(f"Best improvement: {best_improvement}, index: {best_idx}")
return best_idx
def compute_metrics(input_smiles, input_props, output_smiles, output_props_df, task,
                    normalize = {'plogp': (-20, 10)}):
    """
    Compute the metrics for the optimization task.
    Args:
        input_smiles: list(str)
        input_props: list(dict) - one property dict per input molecule
        output_smiles: list(list(str)) - candidate optimized SMILES per input
        output_props_df: pandas df - precomputed properties for all output
            SMILES. NOTE(review): the df is mutated in place (set_index below).
        task: str - '+'-joined property names, e.g. 'qed+mutagenicity'
        normalize: dict - per-property (min, max) range for the normalized
            relative-improvement metrics. NOTE(review): mutable default
            argument; safe here because it is only read, never mutated.
    Returns:
        (metrics dict, list of per-success source/optimized property records)
    """
    # SR is the fraction of molecules that have properties either decreased or increased as per the prompt
    # measure novelty as the fraction of valid and successful molecules that are not in the training set
    # measure similarity with input smiles as the average Tanimoto similarity between the input and the successful optimized molecules
    # measure diversity as the average pairwise Tanimoto similarity among the successful optimized molecules
    # measure synthetic accessibility as the average SAS score of the successful optimized molecules
    # measure change percentage as the average percentage change in the properties of the successful and valid molecules
    property = task.split('+')
    output_props_df.set_index('smiles', inplace=True)
    SR = diversity = 0
    SAS = []
    num_invalid = 0
    num_seen_smiles = 0
    most_optimized_smiles = []
    perc_change = defaultdict(list)
    norm_perc_change = defaultdict(list)
    avg_change = defaultdict(list)
    avg_value = defaultdict(list)
    composite_change = []
    norm_composite_change = []
    similarity_with_input = []
    num_inputs = len(input_smiles)
    # save the most optimized smiles and their properties
    most_optimized_smiles_props = []
    # unseen_smiles = set()
    # compute properties for all smiles for each input
    for i, opt_smiles in enumerate(output_smiles):
        # if the row is empty, there were no smiles parsed by the process-output.ipynb
        if len(opt_smiles)==0:
            num_invalid += 1
            continue
        # otherwise, they were parseable, but could still be invalid based on rdkit
        all_invalid_gen = True
        output_props = defaultdict(list)
        for smile in opt_smiles:
            # assume that the properties were precomputed for all output smiles
            # hence, if the smile is not in the properties dataframe, then prob it is invalid
            if smile not in output_props_df.index:
                # if smile != 'Invalid':
                #     #print(i, smile)
                #     unseen_smiles.add(smile)
                continue
            # if there was any one valid generation out of at most 20 opt_smiles generated
            all_invalid_gen = False
            for prop in property:
                output_props[prop].append(output_props_df.loc[smile, prop])
            output_props['sim'].append(pair_similarity(input_smiles[i], smile, "binary"))
        num_invalid += 1 if all_invalid_gen else 0
        # find the most optimized smiles based on the most improvement that satisfies the constraints
        best_idx = find_best_optimized(input_props[i], output_props, property)
        if best_idx is None:
            continue
        all_props_desired = True
        # increase the success count if the best optimized smile has better properties
        for j, prop in enumerate(property):
            #change = test_data[i]['properties'][prop]['change']
            # desired direction: mutagenicity down (-1), everything else up (+1)
            change = -1 if prop == 'mutagenicity' else 1
            if np.sign(output_props[prop][best_idx] - input_props[i][prop]) != np.sign(change):
                all_props_desired = False
                break
        # find_best_optimized should return the best optimized molecule that satisfies the constraints and
        # has the most improvement in the properties
        assert all_props_desired
        # if all_props_desired:
        #     print_str = f"{i}, {input_smiles[i]}"
        #     for prop in output_props.keys():
        #         print_str += f", {prop}: ({input_props[i].get(prop, 0)}, {output_props[prop][best_idx]})"
        #     print(print_str)
        if all_props_desired:
            most_optimized_smiles.append(opt_smiles[best_idx])
            most_optimized_smiles_props.append({'opt_smiles': opt_smiles[best_idx],
                                                'source_smiles': input_smiles[i],
                                                'source_properties': input_props[i],
                                                'optimized_properties': {prop: output_props[prop][best_idx] for prop in property}})
            #SR += 1
            # if opt_smiles[best_idx] in seen_smiles:
            #     num_seen_smiles += 1
            similarity_with_input.append(output_props['sim'][best_idx])
            try:
                SAS.append(sas(opt_smiles[best_idx]))
            # NOTE(review): bare except silently drops SAS failures - presumably
            # a deliberate best-effort; confirm before tightening
            except:
                pass
            for prop in property:
                avg_value[prop].append(output_props[prop][best_idx])
                input_val = input_props[i].get(prop, 0)
                output_val = output_props[prop][best_idx]
                # we can compute the absolute change in the property because
                # we already ensured that this is optimized
                avg_change[prop].append(abs(output_val - input_val))
                if input_val == 0:
                    # 0.1 serves as a stand-in baseline when the input value is 0
                    perc_change[prop].append((output_val - 0.1)/0.1)
                else:
                    perc_change[prop].append(abs(input_val - output_val) / abs(input_val))
                # for percentage change, compute an unnormalized and normalized change
                if normalize and prop in normalize:
                    input_val = normalize_prop(input_val, *normalize[prop])
                    output_val = normalize_prop(output_val, *normalize[prop])
                    if input_val == 0:
                        norm_perc_change[prop].append((output_val - 0.1)/0.1)
                    else:
                        norm_perc_change[prop].append(abs(input_val - output_val) / abs(input_val))
            composite_change.append(np.mean([perc_change[prop][-1] for prop in property]))
            if normalize:
                # NOTE(review): norm_perc_change[prop] is only appended to when
                # prop is in `normalize`; this [-1] indexes every task property,
                # so a property without a normalize range would raise IndexError
                # - confirm tasks always pair with matching `normalize` keys
                norm_composite_change.append(np.mean([norm_perc_change[prop][-1] for prop in property]))
    fp_sim = compute_FP_sim(most_optimized_smiles)
    # NOTE(review): with zero successful molecules the triu slice is empty and
    # np.mean yields nan (with a RuntimeWarning) for diversity
    diversity = 1 - np.mean(fp_sim[np.triu_indices(len(fp_sim), k=1)])
    num_success = len(most_optimized_smiles)
    # num_unseen = num_success - num_seen_smiles
    num_valid = num_inputs - num_invalid
    #print(len(unseen_smiles))
    SR = num_success / num_inputs
    SR_V = num_success / num_valid if num_valid else 0
    validity = num_valid / num_inputs
    ret = {
        "SR": SR*100,
        "Sim": np.mean(similarity_with_input),
        "RI": np.mean(composite_change),
        "Val": validity*100,
        "SR (V)": SR_V*100,
        # "Nov": (num_unseen / num_success) * 100 if num_success else 0,
        "Div": diversity,
        "SAS": np.mean(SAS),
        "Norm RI": np.nanmean(norm_composite_change)
    }
    for prop in property:
        ret[f"{prop}-APS"] = np.mean(avg_value[prop])
        ret[f"{prop}-impv"] = np.mean(avg_change[prop])
        ret[f"{prop}-impv%"] = np.mean(perc_change[prop]) * 100
        if normalize:
            ret[f"{prop}-impv(n)%"] = np.mean(norm_perc_change[prop]) * 100
    return ret, most_optimized_smiles_props