Gilmullin Almaz
Refactor code structure for improved readability and maintainability
72a3513
"""Module containing functions for protocol of reaction rules extraction."""
import logging
import pickle
from collections import defaultdict
from itertools import islice
from os.path import splitext
from typing import Dict, List, Set, Tuple
import ray
from chython import smarts
from chython import QueryContainer as QueryContainerChython
from CGRtools.containers.cgr import CGRContainer
from CGRtools.containers.molecule import MoleculeContainer
from CGRtools.containers.query import QueryContainer
from CGRtools.containers.reaction import ReactionContainer
from CGRtools.exceptions import InvalidAromaticRing
from CGRtools.reactor import Reactor
from tqdm import tqdm
from synplan.chem.data.standardizing import RemoveReagentsStandardizer
from synplan.chem.utils import (
reverse_reaction,
cgrtools_to_chython_molecule,
chython_query_to_cgrtools,
)
from synplan.utils.config import RuleExtractionConfig
from synplan.utils.files import ReactionReader
def add_environment_atoms(
cgr: CGRContainer, center_atoms: Set[int], environment_atom_count: int
) -> Set[int]:
"""
Adds environment atoms to the set of center atoms based on the specified depth.
:param cgr: A complete graph representation of a reaction (ReactionContainer
object).
:param center_atoms: A set of atom id corresponding to the center atoms of the
reaction.
:param environment_atom_count: An integer specifying the depth of the environment
around the reaction center to be included. If it's 0, only the reaction center
is included. If it's 1, the first layer of surrounding atoms is included, and so
on.
:return: A set of atom id including the center atoms and their environment atoms up
to the specified depth. If environment_atom_count is 0, the original set of
center atoms is returned unchanged.
"""
if environment_atom_count:
env_cgr = cgr.augmented_substructure(center_atoms, deep=environment_atom_count)
# combine the original center atoms with the new environment atoms
return center_atoms | set(env_cgr)
# if no environment is to be included, return the original center atoms
return center_atoms
def add_functional_groups(
reaction: ReactionContainer,
center_atoms: Set[int],
func_groups_list: List[QueryContainerChython],
) -> Set[int]:
"""
Augments the set of reaction rule atoms with functional groups if specified.
:param reaction: The reaction object (ReactionContainer) from which molecules are
extracted.
:param center_atoms: A set of atom id corresponding to the center atoms of the
reaction.
:param func_groups_list: A list of functional group objects (MoleculeContainer or
QueryContainer) to be considered when including functional groups. These objects
define the structure of the functional groups to be included.
:return: A set of atom id corresponding to the rule atoms, including atoms from the
specified functional groups if include_func_groups is True. If
include_func_groups is False, the original set of center atoms is returned.
"""
rule_atoms = center_atoms.copy()
# iterate over each molecule in the reaction
for molecule in reaction.molecules():
molecule_chython = cgrtools_to_chython_molecule(molecule)
# for each functional group specified in the list
for func_group in func_groups_list:
# find mappings of the functional group in the molecule
for mapping in func_group.get_mapping(molecule_chython):
# remap the functional group based on the found mapping
func_group.remap(mapping)
# if the functional group intersects with center atoms, include it
if set(func_group.atoms_numbers) & center_atoms:
rule_atoms |= set(func_group.atoms_numbers)
# reset the mapping to its original state for the next iteration
func_group.remap({v: k for k, v in mapping.items()})
return rule_atoms
def add_ring_structures(cgr: CGRContainer, rule_atoms: Set[int]) -> Set[int]:
"""
Adds ring structures to the set of rule atoms if they intersect with the reaction
center atoms.
:param cgr: A condensed graph representation of a reaction (CGRContainer object).
:param rule_atoms: A set of atom id corresponding to the center atoms of the
reaction.
:return: A set of atom id corresponding to the original rule atoms and the included
ring structures.
"""
for ring in cgr.sssr:
# check if the current ring intersects with the set of rule atoms
if set(ring) & rule_atoms:
# if the intersection exists, include all atoms in the ring to the rule atoms
rule_atoms |= set(ring)
return rule_atoms
def add_leaving_incoming_groups(
reaction: ReactionContainer,
rule_atoms: Set[int],
keep_leaving_groups: bool,
keep_incoming_groups: bool,
) -> Tuple[Set[int], Dict[str, Set]]:
"""
Identifies and includes leaving and incoming groups to the rule atoms based on
specified flags.
:param reaction: The reaction object (ReactionContainer) from which leaving and
incoming groups are extracted.
:param rule_atoms: A set of atom id corresponding to the center atoms of the
reaction.
:param keep_leaving_groups: A boolean flag indicating whether to include leaving
groups in the rule.
:param keep_incoming_groups: A boolean flag indicating whether to include incoming
groups in the rule.
:return: Updated set of rule atoms including leaving and incoming groups if
specified, and metadata about added groups.
"""
meta_debug = {"leaving": set(), "incoming": set()}
# extract atoms from reactants and products
reactant_atoms = {atom for reactant in reaction.reactants for atom in reactant}
product_atoms = {atom for product in reaction.products for atom in product}
# identify leaving groups (reactant atoms not in products)
if keep_leaving_groups:
leaving_atoms = reactant_atoms - product_atoms
new_leaving_atoms = leaving_atoms - rule_atoms
# include leaving atoms in the rule atoms
rule_atoms |= leaving_atoms
# add leaving atoms to metadata
meta_debug["leaving"] |= new_leaving_atoms
# identify incoming groups (product atoms not in reactants)
if keep_incoming_groups:
incoming_atoms = product_atoms - reactant_atoms
new_incoming_atoms = incoming_atoms - rule_atoms
# Include incoming atoms in the rule atoms
rule_atoms |= incoming_atoms
# Add incoming atoms to metadata
meta_debug["incoming"] |= new_incoming_atoms
return rule_atoms, meta_debug
def clean_molecules(
rule_molecules: List[MoleculeContainer],
reaction_molecules: Tuple[MoleculeContainer],
reaction_center_atoms: Set[int],
atom_retention_details: Dict[str, Dict[str, bool]],
) -> List[QueryContainer]:
"""
Cleans rule molecules by removing specified information about atoms based on
retention details provided.
:param rule_molecules: A list of query container objects representing the rule molecules.
:param reaction_molecules: A list of molecule container objects involved in the reaction.
:param reaction_center_atoms: A set of id corresponding to the atom numbers in the reaction center.
:param atom_retention_details: A dictionary specifying what atom information to retain or remove.
This dictionary should have two keys: "reaction_center" and "environment",
each mapping to another dictionary. The nested dictionaries should have
keys representing atom attributes (like "neighbors", "hybridization",
"implicit_hydrogens", "ring_sizes") and boolean values.
A value of True indicates that the corresponding attribute
should be retained, while False indicates it should be removed from the atom.
:return: A list of QueryContainer objects representing the cleaned rule molecules.
"""
cleaned_rule_molecules = []
for rule_molecule in rule_molecules:
for reaction_molecule in reaction_molecules:
if set(rule_molecule.atoms_numbers) <= set(reaction_molecule.atoms_numbers):
query_reaction_molecule = reaction_molecule.substructure(
reaction_molecule, as_query=True
)
query_rule_molecule = query_reaction_molecule.substructure(
rule_molecule
)
# clean reaction center atoms
if not all(
atom_retention_details["reaction_center"].values()
): # if everything True, we keep all marks
local_reaction_center_atoms = (
set(rule_molecule.atoms_numbers) & reaction_center_atoms
)
for atom_number in local_reaction_center_atoms:
query_rule_molecule = clean_atom(
query_rule_molecule,
atom_retention_details["reaction_center"],
atom_number,
)
# clean environment atoms
if not all(
atom_retention_details["environment"].values()
): # if everything True, we keep all marks
local_environment_atoms = (
set(rule_molecule.atoms_numbers) - reaction_center_atoms
)
for atom_number in local_environment_atoms:
query_rule_molecule = clean_atom(
query_rule_molecule,
atom_retention_details["environment"],
atom_number,
)
cleaned_rule_molecules.append(query_rule_molecule)
break
return cleaned_rule_molecules
def clean_atom(
query_molecule: QueryContainer,
attributes_to_keep: Dict[str, bool],
atom_number: int,
) -> QueryContainer:
"""
Removes specified information from a given atom in a query molecule.
:param query_molecule: The QueryContainer of molecule.
:param attributes_to_keep: Dictionary indicating which attributes to keep in the atom. The keys should be strings
representing the attribute names, and the values should be booleans indicating whether
to retain (True) or remove(False) that attribute. Expected keys are:
- "neighbors": Indicates if neighbors of the atom should be removed.
- "hybridization": Indicates if hybridization information of the atom should be removed.
- "implicit_hydrogens": Indicates if implicit hydrogen information of the atom should be removed.
- "ring_sizes": Indicates if ring size information of the atom should be removed.
:param atom_number: The number of the atom to be modified in the query molecule.
"""
target_atom = query_molecule.atom(atom_number)
if not attributes_to_keep["neighbors"]:
target_atom.neighbors = None
if not attributes_to_keep["hybridization"]:
target_atom.hybridization = None
if not attributes_to_keep["implicit_hydrogens"]:
target_atom.implicit_hydrogens = None
if not attributes_to_keep["ring_sizes"]:
target_atom.ring_sizes = None
return query_molecule
def create_substructures_and_reagents(
reaction: ReactionContainer,
rule_atoms: Set[int],
as_query_container: bool,
keep_reagents: bool,
) -> Tuple[List[MoleculeContainer], List[MoleculeContainer], List]:
"""
Creates substructures for reactants and products, and optionally includes
reagents, based on specified parameters. The function processes the reaction to
create substructures for reactants and products based on the rule atoms. It also
handles the inclusion of reagents based on the keep_reagents flag and converts these
structures to query containers if required.
:param reaction: The reaction object (ReactionContainer) from which to extract substructures.
This object represents a chemical reaction with specified reactants, products, and possibly reagents.
:param rule_atoms: A set of atom id corresponding to the rule atoms. These are used to identify relevant
substructures in reactants and products.
:param as_query_container: A boolean flag indicating whether the substructures should be converted to query containers.
Query containers are used for pattern matching in chemical structures.
:param keep_reagents: A boolean flag indicating whether reagents should be included in the resulting structures.
Reagents are additional substances that are present in the reaction but are not reactants or products.
:return: A tuple containing three elements:
- A list of reactant substructures, each corresponding to a part of the reactants that matches the rule atoms.
- A list of product substructures, each corresponding to a part of the products that matches the rule atoms.
- A list of reagents, included as is or as substructures, depending on the as_query_container flag.
"""
reactant_substructures = [
reactant.substructure(rule_atoms.intersection(reactant.atoms_numbers))
for reactant in reaction.reactants
if rule_atoms.intersection(reactant.atoms_numbers)
]
product_substructures = [
product.substructure(rule_atoms.intersection(product.atoms_numbers))
for product in reaction.products
if rule_atoms.intersection(product.atoms_numbers)
]
reagents = []
if keep_reagents:
if as_query_container:
reagents = [
reagent.substructure(reagent, as_query=True)
for reagent in reaction.reagents
]
else:
reagents = reaction.reagents
return reactant_substructures, product_substructures, reagents
def assemble_final_rule(
reactant_substructures: List[QueryContainer],
product_substructures: List[QueryContainer],
reagents: List,
meta_debug: Dict[str, Set],
keep_metadata: bool,
reaction: ReactionContainer,
) -> ReactionContainer:
"""
Assembles the final reaction rule from the provided substructures and metadata.
This function brings together the various components of a reaction rule, including
reactant and product substructures, reagents, and metadata. It creates a
comprehensive representation of the reaction rule, which can be used for further
processing or analysis.
:param reactant_substructures: A list of substructures derived from the reactants of
the reaction. These substructures represent parts of reactants that are relevant
to the rule.
:param product_substructures: A list of substructures derived from the products of
the reaction. These substructures represent parts of products that are relevant
to the rule.
:param reagents: A list of reagents involved in the reaction. These may be included
as-is or as substructures, depending on earlier processing steps.
:param meta_debug: A dictionary containing additional metadata about the reaction,
such as leaving and incoming groups.
:param keep_metadata: A boolean flag indicating whether to retain the metadata
associated with the reaction in the rule.
:param reaction: The original reaction object (ReactionContainer) from which the
rule is being created.
:return: A ReactionContainer object representing the assembled reaction rule. This
container includes the reactant and product substructures, reagents, and any
additional metadata if keep_metadata is True.
"""
rule_metadata = meta_debug if keep_metadata else {}
rule_metadata.update(reaction.meta if keep_metadata else {})
rule = ReactionContainer(
reactant_substructures, product_substructures, reagents, rule_metadata
)
if keep_metadata:
rule.name = reaction.name
rule.flush_cache()
return rule
def validate_rule(rule: ReactionContainer, reaction: ReactionContainer) -> bool:
"""
Validates a reaction rule by ensuring it can correctly generate the products from
the reactants. The function uses a chemical reactor to simulate the reaction based
on the provided rule. It then compares the products generated by the simulation with
the actual products of the reaction. If they match, the rule is considered valid. If
not, a ValueError is raised, indicating an issue with the rule.
:param rule: The reaction rule to be validated. This is a ReactionContainer object
representing a chemical reaction rule, which includes the necessary information
to perform a reaction.
:param reaction: The original reaction object (ReactionContainer) against which the
rule is to be validated. This object contains the actual reactants and products
of the reaction.
:return: The validated rule if the rule correctly generates the products from the
reactants.
:raises ValueError: If the rule does not correctly generate the products from the
reactants, indicating an incorrect or incomplete rule.
"""
# create a reactor with the given rule
reactor = Reactor(rule)
try:
for result_reaction in reactor(reaction.reactants):
result_products = []
for result_product in result_reaction.products:
tmp = result_product.copy()
try:
tmp.kekule()
if tmp.check_valence():
continue
except InvalidAromaticRing:
continue
result_products.append(result_product)
if set(reaction.products) == set(result_products) and len(
reaction.products
) == len(result_products):
return True
except (KeyError, IndexError):
# KeyError - iteration over reactor is finished and products are different from the original reaction
# IndexError - mistake in __contract_ions, possibly problems with charges in reaction rule
return False
return False
def create_rule(
config: RuleExtractionConfig, reaction: ReactionContainer
) -> ReactionContainer:
"""
Creates a reaction rule from a given reaction based on the specified
configuration. The function processes the reaction to create a rule that matches the
configuration settings. It handles the inclusion of environmental atoms, functional
groups, ring structures, and leaving and incoming groups. It also constructs
substructures for reactants, products, and reagents, and cleans molecule
representations if required. Optionally, it validates the rule using a reactor.
:param config: An instance of ExtractRuleConfig, containing various settings that
determine how the rule is created, such as environmental atom count, inclusion
of functional groups, rings, leaving and incoming groups, and other parameters.
:param reaction: The reaction object (ReactionContainer) from which to create the
rule. This object represents a chemical reaction with specified reactants,
products, and possibly reagents.
:return: A ReactionContainer object representing the extracted reaction rule. This
rule includes various elements of the reaction as specified by the
configuration, such as reaction centers, environmental atoms, functional groups,
and others.
"""
# 1. create reaction CGR
cgr = ~reaction
center_atoms = set(cgr.center_atoms)
# 2. add atoms of reaction environment based on config settings
center_atoms = add_environment_atoms(
cgr, center_atoms, config.environment_atom_count
)
# 3. include functional groups in the rule if specified in config
if config.include_func_groups and config.func_groups_list:
rule_atoms = add_functional_groups(
reaction, center_atoms, config.func_groups_list
)
else:
rule_atoms = center_atoms.copy()
# 4. include ring structures in the rule if specified in config
if config.include_rings:
rule_atoms = add_ring_structures(cgr, rule_atoms)
# 5. add leaving and incoming groups to the rule based on config settings
rule_atoms, meta_debug = add_leaving_incoming_groups(
reaction, rule_atoms, config.keep_leaving_groups, config.keep_incoming_groups
)
# 6. create substructures for reactants, products, and reagents
reactant_substructures, product_substructures, reagents = (
create_substructures_and_reagents(
reaction, rule_atoms, config.as_query_container, config.keep_reagents
)
)
# 7. clean atom marks in the molecules if they are being converted to query containers
if config.as_query_container:
reactant_substructures = clean_molecules(
reactant_substructures,
reaction.reactants,
center_atoms,
config.atom_info_retention,
)
product_substructures = clean_molecules(
product_substructures,
reaction.products,
center_atoms,
config.atom_info_retention,
)
# 8. assemble the final rule including metadata if specified
rule = assemble_final_rule(
reactant_substructures,
product_substructures,
reagents,
meta_debug,
config.keep_metadata,
reaction,
)
# 9. reverse extracted reaction rule and reaction
if config.reverse_rule:
rule = reverse_reaction(rule)
reaction = reverse_reaction(reaction)
# 10. validate the rule using a reactor if validation is enabled in config
if config.reactor_validation:
if validate_rule(rule, reaction):
rule.meta["reactor_validation"] = "passed"
else:
rule.meta["reactor_validation"] = "failed"
return rule
def extract_rules(
config: RuleExtractionConfig, reaction: ReactionContainer
) -> List[ReactionContainer]:
"""
Extracts reaction rules from a given reaction based on the specified
configuration.
:param config: An instance of ExtractRuleConfig, which contains various
configuration settings for rule extraction, such as whether to include
multicenter rules, functional groups, ring structures, leaving and incoming
groups, etc.
:param reaction: The reaction object (ReactionContainer) from which to extract
rules. The reaction object represents a chemical reaction with specified
reactants, products, and possibly reagents.
:return: A list of ReactionContainer objects, each representing a distinct reaction
rule. If config.multicenter_rules is True, a single rule encompassing all
reaction centers is returned. Otherwise, separate rules for each reaction center
are extracted, up to a maximum of 15 distinct centers.
"""
standardizer = (
RemoveReagentsStandardizer()
) # reagents are needed if they are the part of reaction rule specification
reaction = standardizer(reaction)
if config.multicenter_rules:
# extract a single rule encompassing all reaction centers
return [create_rule(config, reaction)]
# extract separate rules for each distinct reaction center
distinct_rules = set()
for center_reaction in islice(reaction.enumerate_centers(), 15):
single_rule = create_rule(config, center_reaction)
distinct_rules.add(single_rule)
return list(distinct_rules)
@ray.remote
def process_reaction_batch(
batch: List[Tuple[int, ReactionContainer]], config: RuleExtractionConfig
) -> List[Tuple[int, List[ReactionContainer]]]:
"""
Processes a batch of reactions to extract reaction rules based on the given
configuration. This function operates as a remote task in a distributed system using
Ray. It takes a batch of reactions, where each reaction is paired with an index. For
each reaction in the batch, it extracts reaction rules as specified by the
configuration object. The extracted rules for each reaction are then returned along
with the corresponding index. This function is intended to be used in a distributed
manner with Ray to parallelize the rule extraction process across multiple
reactions.
:param batch: A list where each element is a tuple containing an index (int) and a
ReactionContainer object. The index is typically used to keep track of the
reaction's position in a larger dataset.
:param config: An instance of ExtractRuleConfig that provides settings and
parameters for the rule extraction process.
:return: A list where each element is a tuple. The first element of the tuple is an
index (int), and the second is a list of ReactionContainer objects representing
the extracted rules for the corresponding reaction.
"""
extracted_rules_list = []
for index, reaction in batch:
try:
extracted_rules = extract_rules(config, reaction)
extracted_rules_list.append((index, extracted_rules))
except Exception as e:
logging.debug(e)
continue
return extracted_rules_list
def process_completed_batch(
futures: Dict,
rules_statistics: Dict,
) -> None:
"""
Processes completed batches of reactions, updating the rules statistics and
writing rules to a file. This function waits for the completion of a batch of
reactions processed in parallel (using Ray), updates the statistics for each
extracted rule, and writes the rules to a result file if they are new. It also
updates the progress bar with the size of the processed batch.
:param futures: A dictionary of futures representing ongoing batch processing tasks.
:param rules_statistics: A dictionary to keep track of statistics for each rule.
:return: None
"""
ready_id, running_id = ray.wait(list(futures.keys()), num_returns=1)
completed_batch = ray.get(ready_id[0])
for index, extracted_rules in completed_batch:
for rule in extracted_rules:
prev_stats_len = len(rules_statistics)
rules_statistics[rule].append(index)
if len(rules_statistics) != prev_stats_len:
rule.meta["first_reaction_index"] = index
del futures[ready_id[0]]
def sort_rules(
rules_stats: Dict, min_popularity: int, single_reactant_only: bool
) -> List[Tuple[ReactionContainer, List[int]]]:
"""
Sorts reaction rules based on their popularity and validation status. This
function sorts the given rules according to their popularity (i.e., the number of
times they have been applied) and filters out rules that haven't passed reactor
validation or are less popular than the specified minimum popularity threshold.
:param rules_stats: A dictionary where each key is a reaction rule and the value is
a list of integers. Each integer represents an index where the rule was applied.
:type rules_stats: The number of occurrence of the reaction rules.
:param min_popularity: The minimum number of times a rule must be applied to be
considered. Default is 3.
:type min_popularity: The minimum number of occurrence of the reaction rule to be
selected.
:param single_reactant_only: Whether to keep only reaction rules with a single
molecule on the right side of reaction arrow. Default is True.
:return: A list of tuples, where each tuple contains a reaction rule and a list of
indices representing the rule's applications. The list is sorted in descending
order of the rule's popularity.
"""
return sorted(
(
(rule, indices)
for rule, indices in rules_stats.items()
if len(indices) >= min_popularity
and rule.meta["reactor_validation"] == "passed"
and (not single_reactant_only or len(rule.reactants) == 1)
),
key=lambda x: -len(x[1]),
)
def extract_rules_from_reactions(
config: RuleExtractionConfig,
reaction_data_path: str,
reaction_rules_path: str,
num_cpus: int,
batch_size: int,
) -> None:
"""
Extracts reaction rules from a set of reactions based on the given configuration.
This function initializes a Ray environment for distributed computing and processes
each reaction in the provided reaction database to extract reaction rules. It
handles the reactions in batches, parallelize the rule extraction process. Extracted
rules are written to RDF files and their statistics are recorded. The function also
sorts the rules based on their popularity and saves the sorted rules.
:param config: Configuration settings for rule extraction, including file paths,
batch size, and other parameters.
:param reaction_data_path: Path to the file containing reaction database.
:param reaction_rules_path: Name of the file to store the extracted rules.
:param num_cpus: Number of CPU cores to use for processing. Defaults to 1.
:param batch_size: Number of reactions to process in each batch. Defaults to 10.
:return: None
"""
ray.init(num_cpus=num_cpus, ignore_reinit_error=True, logging_level=logging.ERROR)
reaction_rules_path, _ = splitext(reaction_rules_path)
with ReactionReader(reaction_data_path) as reactions:
futures = {}
batch = []
max_concurrent_batches = num_cpus
extracted_rules_and_statistics = defaultdict(list)
for index, reaction in tqdm(
enumerate(reactions),
desc="Number of reactions processed: ",
bar_format="{desc}{n} [{elapsed}]",
):
# reaction ready to use
batch.append((index, reaction))
if len(batch) == batch_size:
future = process_reaction_batch.remote(batch, config)
futures[future] = None
batch = []
while len(futures) >= max_concurrent_batches:
process_completed_batch(
futures,
extracted_rules_and_statistics,
)
if batch:
future = process_reaction_batch.remote(batch, config)
futures[future] = None
while futures:
process_completed_batch(
futures,
extracted_rules_and_statistics,
)
sorted_rules = sort_rules(
extracted_rules_and_statistics,
min_popularity=config.min_popularity,
single_reactant_only=config.single_reactant_only,
)
ray.shutdown()
with open(f"{reaction_rules_path}.pickle", "wb") as statistics_file:
pickle.dump(sorted_rules, statistics_file)
print(f"Number of extracted reaction rules: {len(sorted_rules)}")