| | """Functions for working with Protein Structure Graphs.""" |
| | |
| | |
| | |
| | |
| | |
| | |
| | from __future__ import annotations |
| |
|
| | import logging |
| | import traceback |
| | from functools import partial |
| | from typing import Any, Callable, Dict, List, Optional, Tuple, Union |
| |
|
| | import networkx as nx |
| | import numpy as np |
| | import pandas as pd |
| |
|
| | try: |
| | from biopandas.pdb import PandasPdb |
| | from biopandas.mmcif import PandasMmcif |
| | except ImportError: |
| | raise Exception('You need to install BioPandas and its dependecies to use this model.') |
| |
|
| | from rich.progress import Progress |
| | from tqdm.contrib.concurrent import process_map |
| |
|
| | try: |
| | from graphein.protein.config import ( |
| | DSSPConfig, |
| | GetContactsConfig, |
| | ProteinGraphConfig, |
| | ) |
| | from graphein.protein.edges.distance import ( |
| | add_distance_to_edges, |
| | compute_distmat, |
| | ) |
| | from graphein.protein.resi_atoms import BACKBONE_ATOMS, RESI_THREE_TO_1 |
| | from graphein.protein.subgraphs import extract_subgraph_from_chains |
| | from graphein.protein.utils import ( |
| | ProteinGraphConfigurationError, |
| | compute_rgroup_dataframe, |
| | filter_dataframe, |
| | get_protein_name_from_filename, |
| | three_to_one_with_mods, |
| | ) |
| | from graphein.rna.constants import RNA_ATOMS |
| | from graphein.utils.utils import ( |
| | annotate_edge_metadata, |
| | annotate_graph_metadata, |
| | annotate_node_metadata, |
| | compute_edges, |
| | ) |
| | except ImportError: |
| | raise Exception('You need to install graphein from source in addition to DSSP to use this model please refer to https://github.com/a-r-j/graphein and https://ssbio.readthedocs.io/en/latest/instructions/dssp.html') |
| |
|
| | from .utils_convert import biopandas_mmcif2pdb |
| |
|
| | |
| | log = logging.getLogger(__name__) |
| |
|
| |
|
| |
|
def subset_structure_to_rna(
    df: pd.DataFrame,
) -> pd.DataFrame:
    """
    Restrict an atomic dataframe to the atom names listed in ``RNA_ATOMS``.

    The selection is delegated to ``filter_dataframe`` on the ``atom_name``
    column.

    :param df: Structure dataframe to subset. Must provide an ``atom_name``
        column.
    :type df: pd.DataFrame
    :returns: Subsetted structure dataframe.
    :rtype: pd.DataFrame
    """
    rna_selection = {
        "by_column": "atom_name",
        "list_of_values": RNA_ATOMS,
        "boolean": True,
    }
    return filter_dataframe(df, **rna_selection)
| |
|
| |
|
def read_pdb_to_dataframe(
    pdb_path: Optional[str] = None,
    pdb_code: Optional[str] = None,
    uniprot_id: Optional[str] = None,
    model_index: int = 1,
) -> pd.DataFrame:
    """
    Read a structure and return its ATOM and HETATM records as one dataframe.

    The source is chosen in this order of precedence: ``pdb_path`` (local
    file), then ``uniprot_id`` (fetched with ``source="alphafold2-v2"``), then
    ``pdb_code``. Local paths ending in ``cif`` are read with ``PandasMmcif``
    and converted through ``biopandas_mmcif2pdb``; any other path is read with
    ``PandasPdb``.

    :param pdb_path: Path to a local structure file. Defaults to ``None``.
    :type pdb_path: str, optional
    :param pdb_code: 4-character PDB accession. Defaults to ``None``.
    :type pdb_code: str, optional
    :param uniprot_id: UniProt ID to fetch from AlphaFoldDB. Defaults to
        ``None``.
    :type uniprot_id: str, optional
    :param model_index: Index of the model to read. Only relevant for
        structures containing ensembles. Defaults to ``1``.
    :type model_index: int, optional
    :raises NameError: If none of ``pdb_code``, ``pdb_path`` or ``uniprot_id``
        is given.
    :raises ValueError: If the selected model contains no ATOM records.
    :returns: Concatenation of the ATOM and HETATM dataframes (in that order).
    :rtype: pd.DataFrame
    """
    if all(source is None for source in (pdb_code, pdb_path, uniprot_id)):
        raise NameError(
            "One of pdb_code, pdb_path or uniprot_id must be specified!"
        )

    if pdb_path is not None:
        if pdb_path.endswith('cif'):
            mmcif_structure = PandasMmcif().read_mmcif(pdb_path)
            structure = biopandas_mmcif2pdb(mmcif_structure, model_index)
        else:
            structure = PandasPdb().read_pdb(pdb_path)
    elif uniprot_id is not None:
        structure = PandasPdb().fetch_pdb(
            uniprot_id=uniprot_id, source="alphafold2-v2"
        )
    else:
        structure = PandasPdb().fetch_pdb(pdb_code)

    structure = structure.get_model(model_index)
    # An empty ATOM table is treated as "this model does not exist".
    if len(structure.df["ATOM"]) == 0:
        raise ValueError(f"No model found for index: {model_index}")

    records = [structure.df["ATOM"], structure.df["HETATM"]]
    return pd.concat(records)
| |
|
| |
|
def label_node_id(df: pd.DataFrame, granularity: str) -> pd.DataFrame:
    """Add ``node_id`` and ``residue_id`` columns to a structure dataframe.

    Both columns start as ``"<chain_id>:<residue_name>:<residue_number>"``.
    ``node_id`` is then extended depending on ``granularity``:

    * ``"atom"``: ``":<atom_name>"`` is appended.
    * ``"rna_atom"`` / ``"rna_centroid"``: ``":<atom_number>:<atom_name>"`` is
      appended.
    * anything else: ``node_id`` stays equal to ``residue_id``.

    The columns are written onto ``df`` itself (the input is modified).

    :param df: Structure dataframe to label.
    :type df: pd.DataFrame
    :param granularity: Granularity of the graph being built.
    :type granularity: str
    :returns: The same dataframe object with the two columns added.
    :rtype: pd.DataFrame
    """
    chain = df["chain_id"].apply(str)
    number = df["residue_number"].apply(str)
    df["node_id"] = chain + ":" + df["residue_name"] + ":" + number
    df["residue_id"] = df["node_id"]

    if granularity == "atom":
        atom_suffix = df["atom_name"]
    elif granularity in {"rna_atom", "rna_centroid"}:
        atom_suffix = df["atom_number"].apply(str) + ":" + df["atom_name"]
    else:
        # Residue-level granularities keep the residue label as node id.
        return df

    df["node_id"] = df["node_id"] + ":" + atom_suffix
    return df
| |
|
| |
|
def deprotonate_structure(df: pd.DataFrame) -> pd.DataFrame:
    """Remove hydrogen atoms from a PDB dataframe.

    Hydrogens are identified through the ``element_symbol`` column; the
    filtering itself is delegated to ``filter_dataframe`` with
    ``boolean=False``.

    :param df: Atomic dataframe.
    :type df: pd.DataFrame
    :returns: Atomic dataframe filtered on ``element_symbol == "H"``.
    :rtype: pd.DataFrame
    """
    log.debug(
        "Deprotonating protein. This removes H atoms from the pdb_df dataframe"
    )
    hydrogen_symbols = ["H"]
    return filter_dataframe(
        df,
        by_column="element_symbol",
        list_of_values=hydrogen_symbols,
        boolean=False,
    )
| |
|
| |
|
def convert_structure_to_centroids(df: pd.DataFrame) -> pd.DataFrame:
    """Overwrite existing ``(x, y, z)`` coordinates with centroids of the amino acids.

    Centroids are obtained from :func:`calculate_centroid_positions` and are
    attached to the ``CA`` row of each residue. Rows are matched on the
    grouping columns returned with the centroids (every column that is not a
    coordinate) rather than by row position, so a residue lacking a ``CA``
    atom, or a centroid table in a different order, can no longer shift the
    coordinates of the residues that follow it.

    :param df: Pandas Dataframe protein structure to convert into a dataframe
        of centroid positions.
    :type df: pd.DataFrame
    :return: One row per ``CA`` atom (index reset, original column order) with
        the coordinate columns replaced by the residue centroid. A ``CA`` row
        without a matching centroid receives ``NaN`` coordinates.
    :rtype: pd.DataFrame
    """
    log.debug(
        "Converting dataframe to centroids. This averages XYZ coords of the atoms in a residue"
    )

    coord_cols = ["x_coord", "y_coord", "z_coord"]
    centroids = calculate_centroid_positions(df)
    # Whatever is not a coordinate in the centroid table is a grouping key.
    key_cols = [col for col in centroids.columns if col not in coord_cols]

    ca_rows = df.loc[df["atom_name"] == "CA"].reset_index(drop=True)
    # A left merge keeps the CA rows in their original order.
    merged = ca_rows.drop(columns=coord_cols).merge(
        centroids, on=key_cols, how="left"
    )
    # Restore the original column layout (merge appends the coordinates last).
    return merged[list(ca_rows.columns)]
| |
|
| |
|
def subset_structure_to_atom_type(
    df: pd.DataFrame, granularity: str
) -> pd.DataFrame:
    """
    Return the subset of an atomic dataframe for a single atom name.

    :param df: Protein Structure dataframe to subset.
    :type df: pd.DataFrame
    :param granularity: Atom name to select (e.g. ``"CA"``); it is matched
        against the ``atom_name`` column via ``filter_dataframe``.
    :type granularity: str
    :returns: Subsetted protein structure dataframe.
    :rtype: pd.DataFrame
    """
    wanted_atom_names = [granularity]
    return filter_dataframe(
        df,
        by_column="atom_name",
        list_of_values=wanted_atom_names,
        boolean=True,
    )
| |
|
| |
|
def remove_insertions(df: pd.DataFrame, keep: str = "first") -> pd.DataFrame:
    """
    Remove insertions and alternate locations from a PDB dataframe.

    Three steps are applied in order:

    1. rows duplicating a ``(chain_id, residue_number, atom_name)`` triple are
       dropped, keeping the occurrence selected by ``keep``;
    2. the result is filtered on ``insertion`` being ``""``;
    3. the result is filtered on ``alt_loc`` being ``""`` or ``"A"``.

    :param df: Protein Structure dataframe to remove insertions from.
    :type df: pd.DataFrame
    :param keep: Which duplicate to keep; forwarded to
        ``DataFrame.duplicated``. Options are ``"first"`` or ``"last"``.
        Default is ``"first"``.
    :type keep: str
    :return: Protein structure dataframe with insertions removed.
    :rtype: pd.DataFrame
    """
    identity_cols = ["chain_id", "residue_number", "atom_name"]
    is_repeat = df.duplicated(subset=identity_cols, keep=keep)
    df = df[~is_repeat]

    # (column, accepted values) pairs, applied one after the other.
    column_filters = (
        ("insertion", [""]),
        ("alt_loc", ["", "A"]),
    )
    for column, accepted in column_filters:
        df = filter_dataframe(
            df, by_column=column, list_of_values=accepted, boolean=True
        )

    return df
| |
|
| |
|
def filter_hetatms(
    df: pd.DataFrame, keep_hets: List[str]
) -> List[pd.DataFrame]:
    """Return hetatms of interest.

    :param df: Protein Structure dataframe to filter hetatoms from.
    :type df: pd.DataFrame
    :param keep_hets: List of hetero atom (residue) names to keep.
    :type keep_hets: List[str]
    :returns: One dataframe per entry of ``keep_hets`` (same order), each
        holding the rows whose ``residue_name`` equals that entry.
    :rtype: List[pd.DataFrame]
    """
    selected: List[pd.DataFrame] = []
    for het_name in keep_hets:
        matches = df["residue_name"] == het_name
        selected.append(df.loc[matches])
    return selected
| |
|
| |
|
def process_dataframe(
    protein_df: pd.DataFrame,
    atom_df_processing_funcs: Optional[List[Callable]] = None,
    hetatom_df_processing_funcs: Optional[List[Callable]] = None,
    granularity: str = "centroids",
    chain_selection: str = "all",
    insertions: bool = False,
    deprotonate: bool = True,
    keep_hets: Optional[List[str]] = None,
    verbose: bool = False,
) -> pd.DataFrame:
    """
    Process ATOM and HETATM dataframes to produce singular dataframe used for graph construction.

    When custom processing functions are supplied, only those are applied and
    the default workflow (hetatm selection, deprotonation, granularity
    subsetting, insertion removal, chain selection, sorting) is skipped:

    * only ``atom_df_processing_funcs`` given: the processed ATOM dataframe is
      returned;
    * ``hetatom_df_processing_funcs`` given: the (possibly processed) ATOM
      dataframe concatenated with the processed HETATM dataframe is returned.

    :param protein_df: Dataframe to process.
        Should be the object returned from :func:`~graphein.protein.graphs.read_pdb_to_dataframe`.
        ``node_id``/``residue_id`` columns are added to it by ``label_node_id``.
    :type protein_df: pd.DataFrame
    :param atom_df_processing_funcs: List of functions to process dataframe. These must take in a dataframe and return a
        dataframe. Defaults to None.
    :type atom_df_processing_funcs: List[Callable], optional
    :param hetatom_df_processing_funcs: List of functions to process the hetatom dataframe. These must take in a
        dataframe and return a dataframe.
    :type hetatom_df_processing_funcs: List[Callable], optional
    :param granularity: The level of granularity for the graph. This determines the node definition.
        Handled values are ``"atom"`` (keep all atoms), ``"centroids"`` / ``"rna_centroid"``
        (residue centroids), ``"rna_atom"`` (RNA atoms); any other value is treated as an atom name
        to subset to (e.g. ``"CA"``, ``"CB"``, ``"OG"``, etc.).
        See: :const:`~graphein.protein.config.GRAPH_ATOMS` and :const:`~graphein.protein.config.GRANULARITY_OPTS`.
    :type granularity: str
    :param chain_selection: Which protein chain to select. Defaults to ``"all"``. Eg can use ``"ACF"``
        to select 3 chains (``A``, ``C`` & ``F``)
    :type chain_selection: str
    :param insertions: Whether or not to keep insertions.
    :type insertions: bool
    :param deprotonate: Whether or not to remove hydrogen atoms (i.e. deprotonation).
    :type deprotonate: bool
    :param keep_hets: Hetatoms to keep. Defaults to ``None`` (keep none).
        To keep a hetatom, pass it inside a list of hetatom names to keep.
    :type keep_hets: List[str], optional
    :param verbose: Verbosity level. Forwarded to ``select_chains``.
    :type verbose: bool
    :return: A protein dataframe that can be consumed by
        other graph construction functions.
    :rtype: pd.DataFrame
    """
    protein_df = label_node_id(protein_df, granularity=granularity)

    # Split the structure by record type.
    atoms = filter_dataframe(
        protein_df,
        by_column="record_name",
        list_of_values=["ATOM"],
        boolean=True,
    )
    hetatms = filter_dataframe(
        protein_df,
        by_column="record_name",
        list_of_values=["HETATM"],
        boolean=True,
    )

    # Custom processing short-circuits the default workflow below.
    if atom_df_processing_funcs is not None:
        for func in atom_df_processing_funcs:
            atoms = func(atoms)
        if hetatom_df_processing_funcs is None:
            return atoms

    if hetatom_df_processing_funcs is not None:
        for func in hetatom_df_processing_funcs:
            hetatms = func(hetatms)
        return pd.concat([atoms, hetatms])

    # Default workflow: re-attach the requested hetatms to the atom records.
    if keep_hets:
        hetatms_to_keep = filter_hetatms(hetatms, keep_hets)
        atoms = pd.concat([atoms] + hetatms_to_keep)

    if deprotonate:
        atoms = deprotonate_structure(atoms)

    # Reduce to the requested node definition.
    if granularity == "atom":
        pass
    elif granularity in {"centroids", "rna_centroid"}:
        atoms = convert_structure_to_centroids(atoms)
    elif granularity == "rna_atom":
        atoms = subset_structure_to_rna(atoms)
    else:
        atoms = subset_structure_to_atom_type(atoms, granularity)

    protein_df = atoms

    if not insertions:
        protein_df = remove_insertions(protein_df)

    protein_df = select_chains(
        protein_df, chain_selection=chain_selection, verbose=verbose
    )

    log.debug(f"Detected {len(protein_df)} total nodes")

    # Sorting distributes appended hetatms/modified residues through the frame.
    protein_df = sort_dataframe(protein_df)

    return protein_df
| |
|
| |
|
def sort_dataframe(df: pd.DataFrame) -> pd.DataFrame:
    """Sort a protein dataframe by chain, then residue number, then atom number.

    This is useful for distributing hetatms/modified residues through the DF.
    The input is not modified and the original index labels are kept.

    :param df: Protein dataframe to sort.
    :type df: pd.DataFrame
    :return: Sorted protein dataframe.
    :rtype: pd.DataFrame
    """
    sort_keys = ["chain_id", "residue_number", "atom_number"]
    ordered = df.sort_values(by=sort_keys)
    return ordered
| |
|
| |
|
def assign_node_id_to_dataframe(
    protein_df: pd.DataFrame, granularity: str
) -> pd.DataFrame:
    """
    Assigns the node ID back to the ``pdb_df`` dataframe

    The ``node_id`` column is set to
    ``"<chain_id>:<residue_name>:<residue_number>"``; for the ``"atom"`` and
    ``"rna_atom"`` granularities ``":<atom_name>"`` is appended row by row.
    The column is written onto ``protein_df`` itself.

    :param protein_df: Structure Dataframe
    :type protein_df: pd.DataFrame
    :param granularity: Granularity of graph. Atom-level,
        residue (e.g. ``CA``) or ``centroids``.
        See: :const:`~graphein.protein.config.GRAPH_ATOMS`
        and :const:`~graphein.protein.config.GRANULARITY_OPTS`.
    :type granularity: str
    :return: Returns dataframe with added ``node_ids``
    :rtype: pd.DataFrame
    """
    protein_df["node_id"] = (
        protein_df["chain_id"].apply(str)
        + ":"
        + protein_df["residue_name"]
        + ":"
        + protein_df["residue_number"].apply(str)
    )
    if granularity in {"atom", "rna_atom"}:
        # Element-wise concatenation: an f-string here would format the whole
        # Series into one repr string and broadcast it to every row.
        protein_df["node_id"] = (
            protein_df["node_id"] + ":" + protein_df["atom_name"]
        )
    return protein_df
| |
|
| |
|
def select_chains(
    protein_df: pd.DataFrame, chain_selection: str, verbose: bool = False
) -> pd.DataFrame:
    """
    Extracts relevant chains from ``protein_df``.

    With ``chain_selection == "all"`` the dataframe is returned untouched.
    Otherwise every character of ``chain_selection`` is taken as one chain ID
    and the dataframe is filtered on its ``chain_id`` column.

    :param protein_df: pandas dataframe of PDB subsetted to relevant atoms
        (``CA``, ``CB``).
    :type protein_df: pd.DataFrame
    :param chain_selection: Specifies chains that should be extracted from
        the larger complexed structure.
    :type chain_selection: str
    :param verbose: Print dataframe? Currently not used by this function.
    :type verbose: bool
    :return: Protein structure dataframe containing only entries in the
        chain selection.
    :rtype: pd.DataFrame
    """
    if chain_selection == "all":
        return protein_df

    wanted_chains = list(chain_selection)
    return filter_dataframe(
        protein_df,
        by_column="chain_id",
        list_of_values=wanted_chains,
        boolean=True,
    )
| |
|
| |
|
def initialise_graph_with_metadata(
    protein_df: pd.DataFrame,
    raw_pdb_df: pd.DataFrame,
    granularity: str,
    name: Optional[str] = None,
    pdb_code: Optional[str] = None,
    pdb_path: Optional[str] = None,
) -> nx.Graph:
    """
    Initializes the nx Graph object with initial metadata.

    The returned graph has no nodes or edges yet; it carries graph-level
    attributes only: ``name``, ``pdb_code``, ``pdb_path``, ``chain_ids``,
    ``pdb_df``, ``raw_pdb_df``, ``rgroup_df``, ``coords``, ``node_type`` and
    one ``sequence_<chain>`` entry per chain.

    :param protein_df: Processed Dataframe of protein structure.
    :type protein_df: pd.DataFrame
    :param raw_pdb_df: Unprocessed dataframe of protein structure for comparison and traceability downstream.
    :type raw_pdb_df: pd.DataFrame
    :param granularity: Granularity of the graph (eg ``"atom"``, ``"CA"``, ``"CB"`` etc or ``"centroids"``).
        See: :const:`~graphein.protein.config.GRAPH_ATOMS` and :const:`~graphein.protein.config.GRANULARITY_OPTS`.
    :type granularity: str
    :param name: specified given name for the graph. If None, the file name (when ``pdb_path`` is given)
        or otherwise the PDB code will be used to name the graph.
    :type name: Optional[str], defaults to ``None``
    :param pdb_code: PDB ID / Accession code, if the PDB is available on the PDB database.
    :type pdb_code: Optional[str], defaults to ``None``
    :param pdb_path: path to local PDB file, if constructing a graph from a local file.
    :type pdb_path: Optional[str], defaults to ``None``
    :return: Returns initial protein structure graph with metadata.
    :rtype: nx.Graph
    """
    # Resolve the graph name: explicit name > file name > PDB code.
    if name is None:
        name = (
            pdb_code
            if pdb_path is None
            else get_protein_name_from_filename(pdb_path)
        )

    chain_ids = list(protein_df["chain_id"].unique())
    rgroup_df = compute_rgroup_dataframe(remove_insertions(raw_pdb_df))
    coords = np.asarray(protein_df[["x_coord", "y_coord", "z_coord"]])

    G = nx.Graph(
        name=name,
        pdb_code=pdb_code,
        pdb_path=pdb_path,
        chain_ids=chain_ids,
        pdb_df=protein_df,
        raw_pdb_df=raw_pdb_df,
        rgroup_df=rgroup_df,
        coords=coords,
    )
    G.graph["node_type"] = granularity

    # One sequence string per chain, built from the residue names of its rows.
    for chain in G.graph["chain_ids"]:
        residue_names = protein_df.loc[protein_df["chain_id"] == chain][
            "residue_name"
        ]
        if granularity != "rna_atom":
            # Non-RNA graphs translate residue names before joining.
            residue_names = residue_names.apply(three_to_one_with_mods)
        G.graph[f"sequence_{chain}"] = residue_names.str.cat()
    return G
| |
|
| |
|
def add_nodes_to_graph(
    G: nx.Graph,
    protein_df: Optional[pd.DataFrame] = None,
    verbose: bool = False,
) -> nx.Graph:
    """Add nodes into protein graph.

    One node is created per value of the ``node_id`` column, and each node is
    annotated with ``chain_id``, ``residue_name``, ``residue_number``,
    ``atom_type`` (taken from ``atom_name``), ``element_symbol``, ``coords``
    and ``b_factor``.

    :param G: ``nx.Graph`` with metadata to populate with nodes.
    :type G: nx.Graph
    :param protein_df: DataFrame of protein structure containing nodes & initial node metadata to add to the graph.
        If ``None``, ``G.graph["pdb_df"]`` is used.
    :type protein_df: pd.DataFrame, optional
    :param verbose: Controls verbosity of this step.
    :type verbose: bool
    :returns: nx.Graph with nodes added.
    :rtype: nx.Graph
    """
    if protein_df is None:
        protein_df = G.graph["pdb_df"]

    # Collect every column first so a missing column fails before G changes.
    node_attributes = {
        "chain_id": protein_df["chain_id"].apply(str),
        "residue_name": protein_df["residue_name"],
        "residue_number": protein_df["residue_number"],
        "atom_type": protein_df["atom_name"],
        "element_symbol": protein_df["element_symbol"],
        "coords": np.asarray(protein_df[["x_coord", "y_coord", "z_coord"]]),
        "b_factor": protein_df["b_factor"],
    }
    nodes = protein_df["node_id"]

    G.add_nodes_from(nodes)
    for attribute_name, values in node_attributes.items():
        nx.set_node_attributes(G, dict(zip(nodes, values)), attribute_name)

    if verbose:
        # ``nx.info`` was removed in NetworkX 3.0; fall back to the graph's
        # own string summary when it is unavailable.
        describe = getattr(nx, "info", None)
        print(describe(G) if describe is not None else G)
        print(G.nodes())

    return G
| |
|
| |
|
def calculate_centroid_positions(
    atoms: pd.DataFrame, verbose: bool = False
) -> pd.DataFrame:
    """
    Calculates position of sidechain centroids.

    The centroid of a residue is the mean of the ``x_coord``, ``y_coord`` and
    ``z_coord`` values of its rows. Residues are identified by
    ``(chain_id, residue_number)`` when a ``chain_id`` column is present, so
    residues sharing a number in different chains are kept apart; without that
    column, rows are grouped by ``residue_number`` alone.

    :param atoms: ATOM df of protein structure.
    :type atoms: pd.DataFrame
    :param verbose: bool controlling verbosity.
    :type verbose: bool
    :return: centroids (df): the grouping column(s) followed by the three
        coordinate columns, one row per residue, sorted by the grouping keys.
    :rtype: pd.DataFrame
    """
    coord_cols = ["x_coord", "y_coord", "z_coord"]
    group_keys = (
        ["chain_id", "residue_number"]
        if "chain_id" in atoms.columns
        else ["residue_number"]
    )
    # Select the coordinates *before* averaging: calling ``mean`` on the whole
    # frame raises TypeError on pandas >= 2.0 because of the string columns.
    centroids = atoms.groupby(group_keys)[coord_cols].mean().reset_index()
    if verbose:
        print(f"Calculated {len(centroids)} centroid nodes")
    log.debug(f"Calculated {len(centroids)} centroid nodes")
    return centroids
| |
|
| |
|
def compute_edges(
    G: nx.Graph,
    funcs: List[Callable],
    get_contacts_config: Optional[GetContactsConfig] = None,
) -> nx.Graph:
    """
    Computes edges for the protein structure graph.

    When the graph carries a ``"config"`` entry, a pairwise distance matrix
    over ``G.graph["pdb_df"]`` is computed first and stored in the graph
    metadata (under ``"atomic_dist_mat"`` for the ``"atom"`` granularity,
    under ``"dist_mat"`` otherwise) to facilitate some edge computations.
    Each edge function is then called with the graph, and finally distances
    are added to the edges via ``add_distance_to_edges``.

    :param G: nx.Graph with nodes to add edges to.
    :type G: nx.Graph
    :param funcs: List of edge construction functions.
    :type funcs: List[Callable]
    :param get_contacts_config: Config object for ``GetContacts`` if
        intramolecular edges are being used. Not read by this function.
    :type get_contacts_config: graphein.protein.config.GetContactsConfig
    :return: Graph with added edges.
    :rtype: nx.Graph
    """
    if "config" in G.graph:
        is_atomic = G.graph["config"].granularity == "atom"
        matrix_key = "atomic_dist_mat" if is_atomic else "dist_mat"
        G.graph[matrix_key] = compute_distmat(G.graph["pdb_df"])

    # Edge functions add their edges to G directly; return values are ignored.
    for add_edges in funcs:
        add_edges(G)

    return add_distance_to_edges(G)
| |
|
| |
|
def construct_graph(
    config: Optional[ProteinGraphConfig] = None,
    name: Optional[str] = None,
    pdb_path: Optional[str] = None,
    uniprot_id: Optional[str] = None,
    pdb_code: Optional[str] = None,
    chain_selection: str = "all",
    model_index: int = 1,
    df_processing_funcs: Optional[List[Callable]] = None,
    edge_construction_funcs: Optional[List[Callable]] = None,
    edge_annotation_funcs: Optional[List[Callable]] = None,
    node_annotation_funcs: Optional[List[Callable]] = None,
    graph_annotation_funcs: Optional[List[Callable]] = None,
) -> nx.Graph:
    """
    Constructs protein structure graph from a ``pdb_code``, ``pdb_path`` or ``uniprot_id``.

    Users can provide a :class:`~graphein.protein.config.ProteinGraphConfig`
    object to specify construction parameters.

    The ``*_funcs`` arguments act as fallbacks: each one is written onto the
    config only when the corresponding config field is ``None``. The config
    object is updated in place and stored on the graph as ``g.graph["config"]``.

    :param config: :class:`~graphein.protein.config.ProteinGraphConfig` object. If None, defaults to config in ``graphein.protein.config``.
    :type config: graphein.protein.config.ProteinGraphConfig, optional
    :param name: an optional given name for the graph. the PDB ID or PDB file name will be used if not specified.
    :type name: str, optional
    :param pdb_path: Path to ``pdb_file`` when constructing a graph from a local pdb file. Default is ``None``.
    :type pdb_path: Optional[str], defaults to ``None``
    :param uniprot_id: UniProt accession ID to build graph from AlphaFold2DB. Default is ``None``.
    :type uniprot_id: str, optional
    :param pdb_code: A 4-character PDB ID / accession to be used to construct the graph, if available. Default is ``None``.
    :type pdb_code: Optional[str], defaults to ``None``
    :param chain_selection: String of polypeptide chains to include in graph. E.g ``"ABDF"`` or ``"all"``. Default is ``"all"``.
    :type chain_selection: str
    :param model_index: Index of model to use in the case of structural ensembles. Default is ``1``.
    :type model_index: int
    :param df_processing_funcs: List of dataframe processing functions. Default is ``None``.
    :type df_processing_funcs: List[Callable], optional
    :param edge_construction_funcs: List of edge construction functions. Default is ``None``.
    :type edge_construction_funcs: List[Callable], optional
    :param edge_annotation_funcs: List of edge annotation functions. Default is ``None``.
    :type edge_annotation_funcs: List[Callable], optional
    :param node_annotation_funcs: List of node annotation functions. Default is ``None``.
    :type node_annotation_funcs: List[Callable], optional
    :param graph_annotation_funcs: List of graph annotation function. Default is ``None``.
    :type graph_annotation_funcs: List[Callable]
    :raises ValueError: If none of ``pdb_code``, ``pdb_path`` or ``uniprot_id`` is given.
    :return: Protein Structure Graph
    :rtype: nx.Graph
    """
    if all(source is None for source in (pdb_code, pdb_path, uniprot_id)):
        raise ValueError(
            "Either a PDB ID, UniProt ID or a path to a local PDB file"
            " must be specified to construct a graph"
        )

    # If no config is provided, use the default one.
    if config is None:
        config = ProteinGraphConfig()

    # (config field, fallback taken from the function arguments)
    config_fallbacks = (
        ("protein_df_processing_functions", df_processing_funcs),
        ("edge_construction_functions", edge_construction_funcs),
        ("node_metadata_functions", node_annotation_funcs),
        ("graph_metadata_functions", graph_annotation_funcs),
        ("edge_metadata_functions", edge_annotation_funcs),
    )

    with Progress(transient=True) as progress:
        read_task = progress.add_task("Reading PDB file...", total=1)
        progress.advance(read_task)

        # Fill only the config fields that are still unset.
        for field_name, fallback in config_fallbacks:
            current = getattr(config, field_name)
            setattr(
                config, field_name, fallback if current is None else current
            )

        raw_df = read_pdb_to_dataframe(
            pdb_path,
            pdb_code,
            uniprot_id,
            model_index=model_index,
        )

        process_task = progress.add_task(
            "Processing PDB dataframe...", total=1
        )
        raw_df = sort_dataframe(raw_df)
        protein_df = process_dataframe(
            raw_df,
            chain_selection=chain_selection,
            granularity=config.granularity,
            insertions=config.insertions,
            keep_hets=config.keep_hets,
        )
        progress.advance(process_task)

        init_task = progress.add_task("Initializing graph...", total=1)
        g = initialise_graph_with_metadata(
            protein_df=protein_df,
            raw_pdb_df=raw_df,
            name=name,
            pdb_code=pdb_code,
            pdb_path=pdb_path,
            granularity=config.granularity,
        )
        g = add_nodes_to_graph(g)
        g.graph["config"] = config
        g.graph["path"] = g.graph["pdb_path"]

        # Annotate additional node metadata.
        if config.node_metadata_functions is not None:
            g = annotate_node_metadata(g, config.node_metadata_functions)
        progress.advance(init_task)

        edge_task = progress.add_task("Constructing edges...", total=1)
        g = compute_edges(
            g,
            funcs=config.edge_construction_functions,
            get_contacts_config=None,
        )
        progress.advance(edge_task)

    # Annotate additional graph metadata.
    if config.graph_metadata_functions is not None:
        g = annotate_graph_metadata(g, config.graph_metadata_functions)

    # Annotate additional edge metadata.
    if config.edge_metadata_functions is not None:
        g = annotate_edge_metadata(g, config.edge_metadata_functions)

    return g
| |
|
| |
|
def _mp_graph_constructor(
    args: Tuple[str, str, int], source: str, config: ProteinGraphConfig
) -> Union[nx.Graph, None]:
    """
    Protein graph constructor for use in multiprocessing several protein structure graphs.

    :param args: Tuple of ``(identifier, chain selection, model index)``,
        where the identifier is a PDB code, a path or a UniProt ID depending
        on ``source``.
    :type args: Tuple[str, str, int]
    :param source: How to interpret the identifier: ``"pdb_code"``,
        ``"pdb_path"`` or ``"uniprot_id"``. Any other value yields ``None``.
    :type source: str
    :param config: Protein structure graph construction config (see: :class:`graphein.protein.config.ProteinGraphConfig`).
    :type config: ProteinGraphConfig
    :return: Protein structure graph or ``None`` if an error is encountered.
    :rtype: Union[nx.Graph, None]
    """
    log.info(
        f"Constructing graph for: {args[0]}. Chain selection: {args[1]}. Model index: {args[2]}"
    )
    build = partial(construct_graph, config=config)
    try:
        if source in ("pdb_code", "pdb_path", "uniprot_id"):
            # ``source`` doubles as the keyword name construct_graph expects.
            return build(
                **{source: args[0]},
                chain_selection=args[1],
                model_index=args[2],
            )
    except Exception as ex:
        # Best effort: a failing structure is logged and reported as None.
        log.info(
            f"Graph construction error (PDB={args[0]})! {traceback.format_exc()}"
        )
        log.info(ex)
        return None
| |
|
| |
|
def construct_graphs_mp(
    pdb_code_it: Optional[List[str]] = None,
    pdb_path_it: Optional[List[str]] = None,
    uniprot_id_it: Optional[List[str]] = None,
    chain_selections: Optional[List[str]] = None,
    model_indices: Optional[List[int]] = None,
    config: Optional[ProteinGraphConfig] = None,
    num_cores: int = 16,
    return_dict: bool = True,
    out_path: Optional[str] = None,
) -> Union[List[nx.Graph], Dict[str, nx.Graph]]:
    """
    Constructs protein graphs for a list of pdb codes, pdb paths or UniProt IDs using multiprocessing.

    If several iterables are given, only one is used, with precedence
    ``uniprot_id_it`` > ``pdb_path_it`` > ``pdb_code_it``.

    :param pdb_code_it: List of pdb codes to use for protein graph construction
    :type pdb_code_it: Optional[List[str]], defaults to ``None``
    :param pdb_path_it: List of paths to PDB files to use for protein graph construction
    :type pdb_path_it: Optional[List[str]], defaults to ``None``
    :param uniprot_id_it: List of UniProt IDs to use for protein graph construction
    :type uniprot_id_it: Optional[List[str]], defaults to ``None``
    :param chain_selections: List of chains to select from the protein structures (e.g. ``["ABC", "A", "L", "CD"...]``).
        Defaults to ``"all"`` for every structure.
    :type chain_selections: Optional[List[str]], defaults to ``None``
    :param model_indices: List of model indices to use for protein graph construction. Only relevant for structures
        containing ensembles of models. Defaults to ``1`` for every structure.
    :type model_indices: Optional[List[int]], defaults to ``None``
    :param config: ProteinGraphConfig to use. If ``None``, a default ``ProteinGraphConfig()`` is created per call.
    :type config: graphein.protein.config.ProteinGraphConfig, optional
    :param num_cores: Number of cores to use for multiprocessing. The more the merrier
    :type num_cores: int, defaults to ``16``
    :param return_dict: Whether or not to return a dictionary (indexed by pdb codes/paths) or a list of graphs.
    :type return_dict: bool, default to ``True``
    :param out_path: Directory to save the graphs to (one ``<name>.pickle`` per graph). If None, graphs are not saved.
        Graphs that failed to build (``None``) are skipped.
    :type out_path: Optional[str], defaults to ``None``
    :return: Iterable of protein graphs. None values indicate there was a problem in constructing the graph for this particular pdb
    :rtype: Union[List[nx.Graph], Dict[str, nx.Graph]]
    """
    import pickle  # local import: used instead of the removed nx.write_gpickle

    assert (
        pdb_code_it is not None
        or pdb_path_it is not None
        or uniprot_id_it is not None
    ), "Iterable of pdb codes, pdb paths or uniprot IDs required."

    # Build the default config per call rather than once at import time.
    if config is None:
        config = ProteinGraphConfig()

    # Later assignments win, giving uniprot > path > code precedence.
    if pdb_code_it is not None:
        pdbs = pdb_code_it
        source = "pdb_code"

    if pdb_path_it is not None:
        pdbs = pdb_path_it
        source = "pdb_path"

    if uniprot_id_it is not None:
        pdbs = uniprot_id_it
        source = "uniprot_id"

    if chain_selections is None:
        chain_selections = ["all"] * len(pdbs)

    if model_indices is None:
        model_indices = [1] * len(pdbs)

    constructor = partial(_mp_graph_constructor, source=source, config=config)

    graphs = list(
        process_map(
            constructor,
            [
                (pdb, chain_selections[i], model_indices[i])
                for i, pdb in enumerate(pdbs)
            ],
            max_workers=num_cores,
        )
    )

    if out_path is not None:
        for g in graphs:
            # Failed constructions are reported as None and cannot be saved.
            if g is None:
                continue
            # Same on-disk format as nx.write_gpickle (plain pickle), which
            # no longer exists in NetworkX 3.0.
            with open(f"{out_path}/{g.graph['name']}.pickle", "wb") as handle:
                pickle.dump(g, handle, protocol=pickle.HIGHEST_PROTOCOL)

    if return_dict:
        graphs = {pdb: graphs[i] for i, pdb in enumerate(pdbs)}

    return graphs
| |
|
| |
|
def compute_chain_graph(
    g: nx.Graph,
    chain_list: Optional[List[str]] = None,
    remove_self_loops: bool = False,
    return_weighted_graph: bool = False,
) -> Union[nx.Graph, nx.MultiGraph]:
    """Computes a chain-level graph from a protein structure graph.

    This graph features nodes as individual chains in a complex and edges as
    the interactions between constituent nodes in each chain. You have the
    option of returning an unweighted graph (multigraph,
    ``return_weighted_graph=False``) or a weighted graph
    (``return_weighted_graph=True``). The difference between these is the
    unweighted graph features an edge for each interaction between chains
    (ie the number of edges will be equal to the number of edges in the input
    protein structure graph), while the weighted graph sums these interactions
    to a single edge between chains with the counts stored as features.

    Each chain node carries ``num_residues`` (number of input nodes on that
    chain) and ``sequence`` (one-letter codes of those nodes, in node order).
    The graph-level metadata of the result is a shallow copy of the input's
    metadata with ``node_type`` set to ``"chain"``; the input graph's metadata
    is left untouched.

    :param g: A protein structure graph to compute the chain graph of.
    :type g: nx.Graph
    :param chain_list: A list of chains to extract from the input graph.
        If ``None``, all chains will be used. This is provided as input to
        ``extract_subgraph_from_chains``. Default is ``None``.
    :type chain_list: Optional[List[str]]
    :param remove_self_loops: Whether to remove self-loops (intra-chain
        edges) from the graph. Default is ``False``.
    :type remove_self_loops: bool
    :param return_weighted_graph: Whether to collapse parallel edges into a
        single weighted edge via ``compute_weighted_graph_from_multigraph``.
        Default is ``False``.
    :type return_weighted_graph: bool
    :return: A chain-level graph.
    :rtype: Union[nx.Graph, nx.MultiGraph]
    """
    # If we are extracting specific chains, do it before anything is counted.
    if chain_list is not None:
        g = extract_subgraph_from_chains(g, chain_list)

    # Initialise the new graph with a *copy* of the metadata. Assigning
    # ``g.graph`` directly would alias the dict, and the ``node_type`` write
    # below would then relabel the input graph as a chain graph too.
    h = nx.MultiGraph()
    h.graph = dict(g.graph)
    h.graph["node_type"] = "chain"

    # Tally residues and collect one-letter codes per chain. Letters are
    # gathered in lists and joined once rather than growing strings in a loop.
    chain_ids = g.graph["chain_ids"]
    nodes_per_chain = {chain: 0 for chain in chain_ids}
    residue_letters: Dict[str, List[str]] = {chain: [] for chain in chain_ids}
    for _, d in g.nodes(data=True):
        chain = d["chain_id"]
        nodes_per_chain[chain] += 1
        residue_letters[chain].append(RESI_THREE_TO_1[d["residue_name"]])

    # One node per chain, annotated with its size and sequence.
    for chain in chain_ids:
        h.add_node(
            chain,
            num_residues=nodes_per_chain[chain],
            sequence="".join(residue_letters[chain]),
        )

    # One chain-chain edge per residue-residue edge in the source graph.
    for u, v, d in g.edges(data=True):
        h.add_edge(
            g.nodes[u]["chain_id"], g.nodes[v]["chain_id"], kind=d["kind"]
        )

    # Remove self-loops if necessary. Edges are addressed by key so that every
    # parallel intra-chain edge is removed explicitly.
    if remove_self_loops:
        self_loops = [
            (u, v, k) for u, v, k in h.edges(keys=True) if u == v
        ]
        h.remove_edges_from(self_loops)

    # Compute a weighted graph if required.
    if return_weighted_graph:
        return compute_weighted_graph_from_multigraph(h)
    return h
| |
|
| |
|
def compute_weighted_graph_from_multigraph(g: nx.MultiGraph) -> nx.Graph:
    """Computes a weighted graph from a multigraph.

    This function is used to convert a multigraph to a weighted graph. All
    parallel edges between a pair of nodes are merged into one edge with the
    following attributes:

    * ``weight``: sum of ``len(kind)`` over the merged edges.
    * ``kind``: union of the ``kind`` collections of the merged edges.
    * one attribute per kind label, holding the number of merged edges that
      carried that label.

    Node attributes are carried over. The graph-level metadata dict of the
    result is the same object as the input's (it is not copied). The ``kind``
    collections of the input edges are not modified.

    :param g: A multigraph whose edges each have a ``kind`` attribute.
    :type g: nx.MultiGraph
    :return: A weighted graph.
    :rtype: nx.Graph
    """
    weighted = nx.Graph()
    weighted.graph = g.graph
    weighted.add_nodes_from(g.nodes(data=True))

    for u, v, d in g.edges(data=True):
        kinds = d["kind"]
        if weighted.has_edge(u, v):
            edge = weighted[u][v]
            edge["weight"] += len(kinds)
            edge["kind"].update(kinds)
            for kind in kinds:
                edge[kind] = edge.get(kind, 0) + 1
        else:
            # Store a copy: the merged ``kind`` is extended in place for later
            # parallel edges, which would otherwise mutate the input edge's
            # own ``kind`` collection.
            weighted.add_edge(u, v, weight=len(kinds), kind=kinds.copy())
            edge = weighted[u][v]
            for kind in kinds:
                edge[kind] = 1
    return weighted
| |
|
| |
|
def number_groups_of_runs(list_of_values: List[Any]) -> List[str]:
    """Label every value with the ordinal of the run it belongs to.

    A run is a maximal stretch of consecutive equal values. Runs are counted
    separately per distinct value, starting at 1, and the count is appended to
    the string form of the value.

    E.g. ``["A", "A", "B", "A", "A", "A", "B", "B"] ->
    ["A1", "A1", "B1", "A2", "A2", "A2", "B2", "B2"]``

    :param list_of_values: List of values to number.
    :type list_of_values: List[Any]
    :return: List of numbered values.
    :rtype: List[str]
    """
    values = pd.Series(list_of_values)
    # A run starts wherever a value differs from its predecessor; the first
    # value is compared against a missing value and so always starts a run.
    starts_run = values.ne(values.shift())
    # Within each distinct value, the running count of run starts is the
    # ordinal of the current run.
    run_number = starts_run.groupby(values).cumsum()
    labels = values.astype(str) + run_number.astype(str)
    return labels.tolist()
| |
|
| |
|
def compute_secondary_structure_graph(
    g: nx.Graph,
    allowable_ss_elements: Optional[List[str]] = None,
    remove_non_ss: bool = True,
    remove_self_loops: bool = False,
    return_weighted_graph: bool = False,
) -> Union[nx.Graph, nx.MultiGraph]:
    """Computes a secondary structure graph from a protein structure graph.

    Consecutive nodes (in node order) sharing the same ``ss`` value are grouped
    into numbered elements via ``number_groups_of_runs``; each element becomes
    a node annotated with ``residue_counts``, ``constituent_residues`` and
    ``ss`` (the first character of the element label). Every edge of ``g``
    whose endpoints both belong to a retained element becomes an edge between
    those elements, annotated with ``kind`` and ``source`` (``"{u}_{v}"``).

    The graph-level metadata of the result is a shallow copy of the input's
    metadata with ``node_type`` set to ``"secondary_structure"``; the input
    graph's metadata is left untouched.

    :param g: A protein structure graph to compute the secondary structure
        graph of.
    :type g: nx.Graph
    :param allowable_ss_elements: Secondary structure codes to keep. The
        entries are joined with ``|`` and matched as a regular expression
        against the numbered element labels. If ``None`` or empty, no
        filtering on code is applied. Default is ``None``.
    :type allowable_ss_elements: Optional[List[str]]
    :param remove_non_ss: Whether to remove non-secondary structure nodes from
        the graph. These are denoted as ``"-"`` by DSSP. Default is True.
    :type remove_non_ss: bool
    :param remove_self_loops: Whether to remove self-loops from the graph.
        Default is ``False``.
    :type remove_self_loops: bool
    :param return_weighted_graph: Whether to return a weighted graph.
        Default is False.
    :type return_weighted_graph: bool
    :raises ProteinGraphConfigurationError: If the protein structure graph is
        not configured correctly with secondary structure assignments on all
        nodes.
    :return: A secondary structure graph.
    :rtype: Union[nx.Graph, nx.MultiGraph]
    """
    # Check nodes have a secondary structure assignment & collect them.
    residue_ss: List[str] = []
    for _, d in g.nodes(data=True):
        if "ss" not in d:
            raise ProteinGraphConfigurationError(
                "Secondary structure not defined for all nodes."
            )
        residue_ss.append(d["ss"])

    # Number SS elements and index the labels by residue node id.
    elements = pd.Series(number_groups_of_runs(residue_ss))
    elements.index = list(g.nodes())

    # Remove unstructured elements if necessary.
    if remove_non_ss:
        elements = elements[~elements.str.contains("-")]
    # Subset to only allowable SS elements if necessary.
    if allowable_ss_elements:
        elements = elements[
            elements.str.contains("|".join(allowable_ss_elements))
        ]

    # Plain mapping residue -> element, so edge endpoints are matched strictly
    # by node id with a cheap membership test.
    element_of: Dict[Any, str] = elements.to_dict()

    # Group residues by element, preserving residue order within each element.
    constituent_residues: Dict[str, List[Any]] = {}
    for residue, element in element_of.items():
        constituent_residues.setdefault(element, []).append(residue)

    # Add one node per secondary structure element.
    h = nx.MultiGraph()
    for element, residues in constituent_residues.items():
        h.add_node(
            element,
            residue_counts=len(residues),
            constituent_residues=residues,
            ss=element[0],
        )

    # Add graph-level metadata. Copy the dict: aliasing ``g.graph`` would make
    # the ``node_type`` write below relabel the input graph as well.
    h.graph = dict(g.graph)
    h.graph["node_type"] = "secondary_structure"

    # Iterate over edges in the source graph and add SS-SS edges. Edges that
    # touch a filtered-out residue are skipped (and reported at debug level).
    for u, v, d in g.edges(data=True):
        if u not in element_of or v not in element_of:
            absent = u if u not in element_of else v
            log.debug(
                "Edge %s-%s not added to secondary structure graph. "
                "Reason: %r not in graph",
                u,
                v,
                absent,
            )
            continue
        h.add_edge(
            element_of[u], element_of[v], kind=d["kind"], source=f"{u}_{v}"
        )

    # Remove self-loops if necessary. Edges are addressed by key so that every
    # parallel intra-element edge is removed explicitly.
    if remove_self_loops:
        self_loops = [
            (u, v, k) for u, v, k in h.edges(keys=True) if u == v
        ]
        h.remove_edges_from(self_loops)

    # Create weighted graph from the multigraph if required.
    if return_weighted_graph:
        return compute_weighted_graph_from_multigraph(h)
    return h
| |
|
| |
|
def compute_line_graph(g: nx.Graph, repopulate_data: bool = True) -> nx.Graph:
    """Computes the line graph of a graph.

    The line graph of a graph G has a node for each edge in G and an edge
    joining those nodes if the two edges in G share a common node. For directed
    graphs, nodes are adjacent exactly when the edges they represent form a
    directed path of length two.

    The nodes of the line graph are 2-tuples of nodes in the original graph (or
    3-tuples for multigraphs, with the key of the edge as the third element).

    When ``repopulate_data`` is set, each line-graph node receives the
    attributes of the edge of ``g`` it represents, and each line-graph edge
    receives the attributes of the node of ``g`` shared by the two edges it
    joins. The graph-level metadata dict of the result is the same object as
    the input's (it is not copied).

    :param g: Graph to compute the line graph of.
    :type g: nx.Graph
    :param repopulate_data: Whether or not to map node and edge data to edges
        and nodes of the line graph, defaults to True
    :type repopulate_data: bool, optional
    :return: Line graph of g.
    :rtype: nx.Graph
    """
    l_g = nx.generators.line_graph(g)
    l_g.graph = g.graph

    if not repopulate_data:
        return l_g

    # Each line-graph node *is* an edge identifier of ``g``, so look its data
    # up directly. This does not depend on the endpoint order networkx chose
    # for the node tuple matching the order ``g.edges()`` iterates in, and it
    # also covers the (u, v, key) nodes produced for multigraphs.
    for source_edge, attrs in l_g.nodes(data=True):
        attrs.update(g.edges[source_edge])

    # Each line-graph edge joins two edges of ``g``; copy over the data of the
    # first endpoint of the first edge that also appears in the second edge.
    # Only the first two tuple entries are endpoints (a third is an edge key).
    for first, second, attrs in l_g.edges(data=True):
        second_endpoints = second[:2]
        for endpoint in first[:2]:
            if endpoint in second_endpoints:
                attrs.update(g.nodes[endpoint])
                break
    return l_g
| |
|