Spaces:
Sleeping
Sleeping
| """Module containing functions for protocol of reaction rules extraction.""" | |
| import logging | |
| import pickle | |
| from collections import defaultdict | |
| from itertools import islice | |
| from os.path import splitext | |
| from typing import Dict, List, Set, Tuple | |
| import ray | |
| from chython import smarts | |
| from chython import QueryContainer as QueryContainerChython | |
| from CGRtools.containers.cgr import CGRContainer | |
| from CGRtools.containers.molecule import MoleculeContainer | |
| from CGRtools.containers.query import QueryContainer | |
| from CGRtools.containers.reaction import ReactionContainer | |
| from CGRtools.exceptions import InvalidAromaticRing | |
| from CGRtools.reactor import Reactor | |
| from tqdm import tqdm | |
| from synplan.chem.data.standardizing import RemoveReagentsStandardizer | |
| from synplan.chem.utils import ( | |
| reverse_reaction, | |
| cgrtools_to_chython_molecule, | |
| chython_query_to_cgrtools, | |
| ) | |
| from synplan.utils.config import RuleExtractionConfig | |
| from synplan.utils.files import ReactionReader | |
def add_environment_atoms(
    cgr: CGRContainer, center_atoms: Set[int], environment_atom_count: int
) -> Set[int]:
    """
    Extends the reaction center with its surrounding atoms up to a given depth.

    :param cgr: The condensed graph representation of the reaction (CGRContainer
        object) in which the neighbourhood is looked up.
    :param center_atoms: A set of atom ids forming the reaction center.
    :param environment_atom_count: The depth of the environment to include. A falsy
        value (0) means that no environment is added; 1 adds the first layer of
        neighbours, and so on.
    :return: The union of the center atoms and the atoms of the augmented
        substructure. When environment_atom_count is 0 the very same set object that
        was passed in is returned, unchanged.
    """
    # guard clause: nothing to add, hand back the caller's set as is
    if not environment_atom_count:
        return center_atoms

    # iterating the augmented substructure yields the atom ids it contains
    neighbourhood = cgr.augmented_substructure(
        center_atoms, deep=environment_atom_count
    )
    return center_atoms.union(neighbourhood)
def add_functional_groups(
    reaction: ReactionContainer,
    center_atoms: Set[int],
    func_groups_list: List[QueryContainerChython],
) -> Set[int]:
    """
    Augments the set of reaction rule atoms with the atoms of matching functional
    groups.

    :param reaction: The reaction object (ReactionContainer) whose molecules are
        searched for functional groups.
    :param center_atoms: A set of atom ids corresponding to the center atoms of the
        reaction.
    :param func_groups_list: A list of chython query objects describing the
        functional groups to look for.
    :return: A new set with the center atoms plus the atoms of every functional group
        occurrence that shares at least one atom with the center atoms. The input set
        is not modified.
    """
    rule_atoms = center_atoms.copy()

    # every molecule of the reaction is converted to chython before matching
    for molecule_chython in map(cgrtools_to_chython_molecule, reaction.molecules()):
        for func_group in func_groups_list:
            for mapping in func_group.get_mapping(molecule_chython):
                # temporarily renumber the query so that its atom numbers are
                # the numbers of the matched atoms in the molecule
                func_group.remap(mapping)
                matched_atoms = set(func_group.atoms_numbers)

                # keep the occurrence only if it touches the reaction center
                if not matched_atoms.isdisjoint(center_atoms):
                    rule_atoms.update(matched_atoms)

                # undo the renumbering so the query is pristine for the next match
                func_group.remap(dict(zip(mapping.values(), mapping.keys())))

    return rule_atoms
def add_ring_structures(cgr: CGRContainer, rule_atoms: Set[int]) -> Set[int]:
    """
    Adds ring structures to the set of rule atoms if they intersect with it.

    The rings are taken from ``cgr.sssr`` and processed in the order given there.
    The intersection test is performed against the growing result, so a ring that
    only touches a previously added ring is included as well.

    :param cgr: A condensed graph representation of a reaction (CGRContainer object).
    :param rule_atoms: A set of atom ids of the rule collected so far. The set is
        left untouched; the result is built on a copy.
    :return: A new set containing the original rule atoms and all atoms of the
        included rings.
    """
    # work on a copy so the caller's set is never modified behind its back
    expanded_atoms = set(rule_atoms)

    for ring in cgr.sssr:
        # a ring is pulled in as a whole as soon as it shares one atom
        if not expanded_atoms.isdisjoint(ring):
            expanded_atoms.update(ring)

    return expanded_atoms
def add_leaving_incoming_groups(
    reaction: ReactionContainer,
    rule_atoms: Set[int],
    keep_leaving_groups: bool,
    keep_incoming_groups: bool,
) -> Tuple[Set[int], Dict[str, Set]]:
    """
    Identifies and includes leaving and incoming groups to the rule atoms based on
    specified flags.

    :param reaction: The reaction object (ReactionContainer) from which leaving and
        incoming groups are extracted.
    :param rule_atoms: A set of atom ids of the rule collected so far. The set is
        left untouched; the result is built on a copy.
    :param keep_leaving_groups: Whether to include leaving groups (atoms present in
        reactants but absent from products) in the rule.
    :param keep_incoming_groups: Whether to include incoming groups (atoms present in
        products but absent from reactants) in the rule.
    :return: A tuple of the extended set of rule atoms and a metadata dictionary with
        the keys "leaving" and "incoming", each holding the atoms that were newly
        added to the rule for that reason.
    """
    # work on a copy so the caller's set is never modified behind its back
    extended_atoms = set(rule_atoms)
    meta_debug = {"leaving": set(), "incoming": set()}

    # iterating a molecule yields its atom ids
    reactant_atoms = {atom for reactant in reaction.reactants for atom in reactant}
    product_atoms = {atom for product in reaction.products for atom in product}

    if keep_leaving_groups:
        leaving_atoms = reactant_atoms - product_atoms
        # record only the atoms that were not part of the rule already
        meta_debug["leaving"] |= leaving_atoms - extended_atoms
        extended_atoms |= leaving_atoms

    if keep_incoming_groups:
        incoming_atoms = product_atoms - reactant_atoms
        meta_debug["incoming"] |= incoming_atoms - extended_atoms
        extended_atoms |= incoming_atoms

    return extended_atoms, meta_debug
def clean_molecules(
    rule_molecules: List[MoleculeContainer],
    reaction_molecules: Tuple[MoleculeContainer],
    reaction_center_atoms: Set[int],
    atom_retention_details: Dict[str, Dict[str, bool]],
) -> List[QueryContainer]:
    """
    Converts rule molecules to query form and strips atom marks according to the
    retention settings.

    For every rule molecule the first reaction molecule that contains all of its
    atoms is located. That reaction molecule is turned into a query, the rule part is
    cut out of it, and the atoms are cleaned with clean_atom. Rule molecules for
    which no containing reaction molecule is found are silently left out of the
    result.

    :param rule_molecules: A list of molecule objects representing the rule molecules.
    :param reaction_molecules: The molecules of the reaction the rule molecules were
        cut from.
    :param reaction_center_atoms: A set of atom ids treated as the reaction center.
    :param atom_retention_details: A dictionary with the two keys "reaction_center"
        and "environment". Each maps to a dictionary of atom attributes ("neighbors",
        "hybridization", "implicit_hydrogens", "ring_sizes") with boolean values:
        True keeps the attribute, False removes it from the atom.
    :return: A list of QueryContainer objects representing the cleaned rule molecules.
    """
    cleaned = []

    for rule_molecule in rule_molecules:
        rule_atom_numbers = set(rule_molecule.atoms_numbers)

        for reaction_molecule in reaction_molecules:
            # skip molecules that do not contain the whole rule fragment
            if not rule_atom_numbers <= set(reaction_molecule.atoms_numbers):
                continue

            # build the query on the full molecule first and cut the fragment out
            # of the query afterwards
            query_parent = reaction_molecule.substructure(
                reaction_molecule, as_query=True
            )
            query_rule_molecule = query_parent.substructure(rule_molecule)

            # reaction center atoms: nothing to do if every mark is retained
            center_retention = atom_retention_details["reaction_center"]
            if not all(center_retention.values()):
                for atom_number in rule_atom_numbers & reaction_center_atoms:
                    query_rule_molecule = clean_atom(
                        query_rule_molecule, center_retention, atom_number
                    )

            # environment atoms: everything of the fragment outside the center
            environment_retention = atom_retention_details["environment"]
            if not all(environment_retention.values()):
                for atom_number in rule_atom_numbers - reaction_center_atoms:
                    query_rule_molecule = clean_atom(
                        query_rule_molecule, environment_retention, atom_number
                    )

            cleaned.append(query_rule_molecule)
            # only the first containing molecule is used
            break

    return cleaned
def clean_atom(
    query_molecule: QueryContainer,
    attributes_to_keep: Dict[str, bool],
    atom_number: int,
) -> QueryContainer:
    """
    Removes the unwanted marks from one atom of a query molecule.

    :param query_molecule: The QueryContainer holding the atom. It is modified in
        place.
    :param attributes_to_keep: Dictionary telling which atom attributes to retain
        (True) or to reset to None (False). All four keys are required:
        "neighbors", "hybridization", "implicit_hydrogens" and "ring_sizes".
    :param atom_number: The number of the atom to be modified in the query molecule.
    :return: The same query molecule object that was passed in.
    """
    target_atom = query_molecule.atom(atom_number)

    # the attribute on the atom carries the same name as the retention key
    for attribute in ("neighbors", "hybridization", "implicit_hydrogens", "ring_sizes"):
        if not attributes_to_keep[attribute]:
            setattr(target_atom, attribute, None)

    return query_molecule
def create_substructures_and_reagents(
    reaction: ReactionContainer,
    rule_atoms: Set[int],
    as_query_container: bool,
    keep_reagents: bool,
) -> Tuple[List[MoleculeContainer], List[MoleculeContainer], List]:
    """
    Cuts the rule atoms out of the reactants and products and collects the reagents.

    :param reaction: The reaction object (ReactionContainer) from which to extract
        substructures.
    :param rule_atoms: A set of atom ids corresponding to the rule atoms.
    :param as_query_container: Whether the reagents should be converted to query
        containers. It only affects the reagents; reactant and product substructures
        are always created without the query flag here.
    :param keep_reagents: Whether reagents should be included in the result.
    :return: A tuple containing three elements:
        - A list of reactant substructures, one per reactant that shares atoms with
          the rule atoms.
        - A list of product substructures, one per product that shares atoms with
          the rule atoms.
        - The reagents: an empty list if keep_reagents is False, a list of query
          versions of the reagents if as_query_container is True, otherwise
          reaction.reagents itself.
    """

    def _cut_fragments(molecules):
        # one substructure per molecule that has any rule atom at all
        fragments = []
        for molecule in molecules:
            shared_atoms = rule_atoms.intersection(molecule.atoms_numbers)
            if shared_atoms:
                fragments.append(molecule.substructure(shared_atoms))
        return fragments

    reactant_substructures = _cut_fragments(reaction.reactants)
    product_substructures = _cut_fragments(reaction.products)

    if not keep_reagents:
        reagents = []
    elif as_query_container:
        # a full-molecule substructure is used to obtain the query form
        reagents = [
            reagent.substructure(reagent, as_query=True)
            for reagent in reaction.reagents
        ]
    else:
        reagents = reaction.reagents

    return reactant_substructures, product_substructures, reagents
def assemble_final_rule(
    reactant_substructures: List[QueryContainer],
    product_substructures: List[QueryContainer],
    reagents: List,
    meta_debug: Dict[str, Set],
    keep_metadata: bool,
    reaction: ReactionContainer,
) -> ReactionContainer:
    """
    Assembles the final reaction rule from the provided substructures and metadata.

    :param reactant_substructures: A list of substructures derived from the reactants
        of the reaction.
    :param product_substructures: A list of substructures derived from the products
        of the reaction.
    :param reagents: A list of reagents to attach to the rule (possibly empty).
    :param meta_debug: A dictionary with additional metadata about the rule, such as
        leaving and incoming groups. It is read only; the caller's dictionary is not
        modified.
    :param keep_metadata: Whether to store metadata in the rule. If True, the rule
        metadata is meta_debug updated with reaction.meta (reaction.meta wins on
        equal keys) and the rule takes over the name of the reaction. If False, the
        rule gets empty metadata.
    :param reaction: The original reaction object (ReactionContainer) from which the
        rule is being created.
    :return: A ReactionContainer object representing the assembled reaction rule.
    """
    rule_metadata = {}
    if keep_metadata:
        # merge into a fresh dict so that the reaction's metadata does not leak
        # into the caller's meta_debug dictionary
        rule_metadata.update(meta_debug)
        rule_metadata.update(reaction.meta)

    rule = ReactionContainer(
        reactant_substructures, product_substructures, reagents, rule_metadata
    )

    if keep_metadata:
        rule.name = reaction.name

    rule.flush_cache()
    return rule
def validate_rule(rule: ReactionContainer, reaction: ReactionContainer) -> bool:
    """
    Checks whether a reaction rule reproduces the products of the reaction it was
    extracted from.

    A Reactor is built from the rule and applied to the reactants of the reaction.
    From every generated reaction the products that cannot be kekulized
    (InvalidAromaticRing) or for which check_valence() reports something are
    discarded. The rule is valid as soon as one generated reaction has exactly the
    products of the original reaction.

    :param rule: The reaction rule to be validated (ReactionContainer object).
    :param reaction: The original reaction object (ReactionContainer) against which
        the rule is validated.
    :return: True if the rule regenerates the products of the reaction, False
        otherwise, including the case when the reactor fails with KeyError or
        IndexError.
    """

    def _is_defective(molecule) -> bool:
        # the checks run on a copy so the generated product itself stays untouched
        probe = molecule.copy()
        try:
            probe.kekule()
            return bool(probe.check_valence())
        except InvalidAromaticRing:
            return True

    # create a reactor with the given rule
    reactor = Reactor(rule)

    try:
        for generated_reaction in reactor(reaction.reactants):
            sound_products = [
                product
                for product in generated_reaction.products
                if not _is_defective(product)
            ]
            # same number of products and the same set of molecules
            if len(sound_products) == len(reaction.products) and set(
                sound_products
            ) == set(reaction.products):
                return True
    except (KeyError, IndexError):
        # KeyError - iteration over reactor is finished and products are different
        # from the original reaction
        # IndexError - mistake in __contract_ions, possibly problems with charges in
        # reaction rule
        pass

    return False
def create_rule(
    config: RuleExtractionConfig, reaction: ReactionContainer
) -> ReactionContainer:
    """
    Creates a reaction rule from a given reaction based on the specified
    configuration.

    The rule atoms are collected step by step (reaction center, environment,
    functional groups, rings, leaving/incoming groups), cut out of the reactants and
    products, optionally cleaned, assembled into a rule, optionally reversed and
    optionally validated with a reactor.

    :param config: An instance of RuleExtractionConfig with the settings that
        determine how the rule is created.
    :param reaction: The reaction object (ReactionContainer) from which to create the
        rule.
    :return: A ReactionContainer object representing the extracted reaction rule. If
        config.reactor_validation is set, rule.meta["reactor_validation"] is
        "passed" or "failed"; otherwise that key is not written.
    """
    # 1. create reaction CGR
    cgr = ~reaction

    # 2. reaction center plus its environment, depending on config settings
    # NOTE(review): from here on center_atoms also contains the environment atoms,
    # and this extended set is what clean_molecules receives as the reaction
    # center in step 7 - confirm this is intended
    center_atoms = add_environment_atoms(
        cgr, set(cgr.center_atoms), config.environment_atom_count
    )

    # 3. include functional groups in the rule if specified in config
    rule_atoms = (
        add_functional_groups(reaction, center_atoms, config.func_groups_list)
        if config.include_func_groups and config.func_groups_list
        else center_atoms.copy()
    )

    # 4. include ring structures in the rule if specified in config
    if config.include_rings:
        rule_atoms = add_ring_structures(cgr, rule_atoms)

    # 5. add leaving and incoming groups to the rule based on config settings
    rule_atoms, meta_debug = add_leaving_incoming_groups(
        reaction, rule_atoms, config.keep_leaving_groups, config.keep_incoming_groups
    )

    # 6. create substructures for reactants, products, and reagents
    substructures = create_substructures_and_reagents(
        reaction, rule_atoms, config.as_query_container, config.keep_reagents
    )
    reactant_substructures, product_substructures, reagents = substructures

    # 7. clean atom marks, only relevant when query containers are requested
    if config.as_query_container:
        retention = config.atom_info_retention
        reactant_substructures = clean_molecules(
            reactant_substructures, reaction.reactants, center_atoms, retention
        )
        product_substructures = clean_molecules(
            product_substructures, reaction.products, center_atoms, retention
        )

    # 8. assemble the final rule including metadata if specified
    rule = assemble_final_rule(
        reactant_substructures,
        product_substructures,
        reagents,
        meta_debug,
        config.keep_metadata,
        reaction,
    )

    # 9. reverse extracted reaction rule and reaction
    if config.reverse_rule:
        rule = reverse_reaction(rule)
        reaction = reverse_reaction(reaction)

    # 10. validate the rule using a reactor if validation is enabled in config
    if config.reactor_validation:
        rule.meta["reactor_validation"] = (
            "passed" if validate_rule(rule, reaction) else "failed"
        )

    return rule
def extract_rules(
    config: RuleExtractionConfig,
    reaction: ReactionContainer,
    max_centers: int = 15,
) -> List[ReactionContainer]:
    """
    Extracts reaction rules from a given reaction based on the specified
    configuration.

    The reaction is first passed through RemoveReagentsStandardizer and the rules
    are extracted from the standardized reaction.

    :param config: An instance of RuleExtractionConfig with the settings for rule
        extraction.
    :param reaction: The reaction object (ReactionContainer) from which to extract
        rules.
    :param max_centers: The maximum number of reactions taken from
        reaction.enumerate_centers() when config.multicenter_rules is False.
        Defaults to 15.
    :return: A list of ReactionContainer objects, each representing a distinct
        reaction rule. If config.multicenter_rules is True, a single rule
        encompassing all reaction centers is returned. Otherwise, one rule per
        enumerated reaction center is created and duplicates are dropped; the rules
        are returned in the order in which they were first extracted.
    """
    standardizer = RemoveReagentsStandardizer()
    reaction = standardizer(reaction)

    if config.multicenter_rules:
        # extract a single rule encompassing all reaction centers
        return [create_rule(config, reaction)]

    # extract separate rules for each distinct reaction center; a dict is used
    # instead of a set so that duplicates are dropped but the order of first
    # occurrence is kept, which makes the output order reproducible
    distinct_rules = {}
    for center_reaction in islice(reaction.enumerate_centers(), max_centers):
        distinct_rules.setdefault(create_rule(config, center_reaction), None)

    return list(distinct_rules)
@ray.remote
def process_reaction_batch(
    batch: List[Tuple[int, ReactionContainer]], config: RuleExtractionConfig
) -> List[Tuple[int, List[ReactionContainer]]]:
    """
    Processes a batch of reactions to extract reaction rules based on the given
    configuration.

    The function is registered as a Ray remote task, so it has to be invoked as
    ``process_reaction_batch.remote(batch, config)``; the call returns a future
    whose value is the list described below.

    :param batch: A list where each element is a tuple containing an index (int) and
        a ReactionContainer object.
    :param config: An instance of RuleExtractionConfig that provides settings for the
        rule extraction process.
    :return: A list of tuples. The first element of each tuple is the reaction index,
        the second is the list of rules extracted from that reaction. Reactions for
        which the extraction raised an exception are skipped (the exception is
        logged at debug level), so the result may be shorter than the batch.
    """
    extracted_rules_list = []

    for index, reaction in batch:
        try:
            extracted_rules = extract_rules(config, reaction)
        except Exception as e:
            # best effort: one broken reaction must not abort the whole batch
            logging.debug(e)
        else:
            extracted_rules_list.append((index, extracted_rules))

    return extracted_rules_list
def process_completed_batch(
    futures: Dict,
    rules_statistics: Dict,
) -> None:
    """
    Waits for one running batch to finish and merges its rules into the statistics.

    The function blocks in ray.wait until one of the given futures is ready, fetches
    its result and appends the reaction index to the entry of every extracted rule.
    A rule that is seen for the first time additionally gets the index stored in
    rule.meta["first_reaction_index"]. Finally the processed future is removed from
    the futures dictionary.

    :param futures: A dictionary whose keys are the futures of the ongoing batch
        processing tasks. The finished future is deleted from it.
    :param rules_statistics: A dictionary mapping each rule to the list of reaction
        indices it was extracted from. Missing rules are expected to be created on
        access (e.g. a defaultdict(list)).
    :return: None
    """
    finished, _still_running = ray.wait(list(futures), num_returns=1)
    done_future = finished[0]

    for index, extracted_rules in ray.get(done_future):
        for rule in extracted_rules:
            # must be checked before the lookup below creates the entry
            seen_for_first_time = rule not in rules_statistics
            rules_statistics[rule].append(index)
            if seen_for_first_time:
                rule.meta["first_reaction_index"] = index

    del futures[done_future]
def sort_rules(
    rules_stats: Dict, min_popularity: int, single_reactant_only: bool
) -> List[Tuple[ReactionContainer, List[int]]]:
    """
    Filters reaction rules and sorts them by popularity.

    A rule is kept if it occurs at least min_popularity times, did not fail the
    reactor validation and, if requested, has exactly one reactant. Rules that carry
    no "reactor_validation" entry in their metadata (validation was switched off
    during extraction) are treated as not failed and are kept.

    :param rules_stats: A dictionary where each key is a reaction rule and the value
        is the list of indices of the reactions the rule was extracted from.
    :param min_popularity: The minimum number of occurrences of a rule required for
        it to be selected.
    :param single_reactant_only: Whether to keep only the rules with a single
        reactant molecule.
    :return: A list of (rule, indices) tuples sorted in descending order of the
        number of indices. Rules of equal popularity keep their relative order from
        rules_stats.
    """

    def _is_selected(rule, indices) -> bool:
        if len(indices) < min_popularity:
            return False
        # the key is only written when reactor validation was enabled, so a
        # missing key must not raise and must not discard the rule
        if rule.meta.get("reactor_validation", "passed") != "passed":
            return False
        return not single_reactant_only or len(rule.reactants) == 1

    selected = [
        (rule, indices)
        for rule, indices in rules_stats.items()
        if _is_selected(rule, indices)
    ]
    # sorted() is stable, so ties keep the insertion order of rules_stats
    return sorted(selected, key=lambda item: -len(item[1]))
def extract_rules_from_reactions(
    config: RuleExtractionConfig,
    reaction_data_path: str,
    reaction_rules_path: str,
    num_cpus: int,
    batch_size: int,
) -> None:
    """
    Extracts reaction rules from a set of reactions based on the given configuration.

    The function initializes Ray, reads the reactions, sends them in batches to
    process_reaction_batch tasks, collects the extracted rules together with the
    indices of the reactions they came from, filters and sorts the rules with
    sort_rules and pickles the sorted list. Ray is shut down afterwards, also when
    the extraction fails with an exception.

    :param config: Configuration settings for rule extraction.
    :param reaction_data_path: Path to the file containing the reaction database.
    :param reaction_rules_path: Path of the file to store the extracted rules. Its
        extension is replaced by ".pickle".
    :param num_cpus: Number of CPU cores given to Ray; also the maximum number of
        batches that are in flight at the same time.
    :param batch_size: Number of reactions to process in each batch.
    :return: None
    """
    reaction_rules_path, _ = splitext(reaction_rules_path)
    max_concurrent_batches = num_cpus

    ray.init(num_cpus=num_cpus, ignore_reinit_error=True, logging_level=logging.ERROR)
    try:
        with ReactionReader(reaction_data_path) as reactions:
            futures = {}
            batch = []
            extracted_rules_and_statistics = defaultdict(list)

            for index, reaction in tqdm(
                enumerate(reactions),
                desc="Number of reactions processed: ",
                bar_format="{desc}{n} [{elapsed}]",
            ):
                batch.append((index, reaction))
                if len(batch) == batch_size:
                    future = process_reaction_batch.remote(batch, config)
                    futures[future] = None
                    batch = []

                    # throttle: do not submit more batches than there are workers
                    while len(futures) >= max_concurrent_batches:
                        process_completed_batch(
                            futures,
                            extracted_rules_and_statistics,
                        )

            # the last, incomplete batch
            if batch:
                future = process_reaction_batch.remote(batch, config)
                futures[future] = None

            # drain everything that is still running
            while futures:
                process_completed_batch(
                    futures,
                    extracted_rules_and_statistics,
                )

            sorted_rules = sort_rules(
                extracted_rules_and_statistics,
                min_popularity=config.min_popularity,
                single_reactant_only=config.single_reactant_only,
            )
    finally:
        # release the Ray workers no matter how the extraction ended
        ray.shutdown()

    with open(f"{reaction_rules_path}.pickle", "wb") as statistics_file:
        pickle.dump(sorted_rules, statistics_file)

    print(f"Number of extracted reaction rules: {len(sorted_rules)}")