Spaces:
No application file
No application file
DrVai-Rag-Testing
/
myenv
/lib
/python3.10
/site-packages
/Bio
/SearchIO
/ExonerateIO
/exonerate_text.py
# Copyright 2012 by Wibowo Arindrarto.  All rights reserved.
# This file is part of the Biopython distribution and governed by your
# choice of the "Biopython License Agreement" or the "BSD 3-Clause License".
# Please see the LICENSE file that should have been included as part of this
# package.
"""Bio.SearchIO parser for Exonerate plain text output format."""
| import re | |
| from itertools import chain | |
| from ._base import ( | |
| _BaseExonerateParser, | |
| _BaseExonerateIndexer, | |
| _STRAND_MAP, | |
| _parse_hit_or_query_line, | |
| ) | |
| from .exonerate_vulgar import _RE_VULGAR | |
# names exported by ``from ... import *``
__all__ = ("ExonerateTextParser", "ExonerateTextIndexer")


# for capturing sequences in alignment blocks; group 1 is the text found
# between the two ' : ' delimiters of a coordinate-flanked row
# e.g. '  529 : ATCCCTTATCTCTTTATCTTGTA :  472'
_RE_ALN_ROW = re.compile(r"\s*\d+\s+: (.*) :\s+\d+")
# for splitting the line based on intron annotations
# e.g. '  >>>> Target Intron 1 >>>>  ' or 'gt.........................ag'
# NOTE(review): '{2}?' is a lazy *exact* count, so two characters from
# '[atgc ]' are required on each side of the annotation; it does not make
# the flanks optional.
_RE_EXON = re.compile(
    r"[atgc ]{2}?(?:(?:[<>]+ \w+ Intron \d+ [<>]+)|(?:\.+))[atgc ]{2}?"
)
# captures the intron length
# from e.g. '61 bp // 154295 bp' (joint intron lengths) or '177446 bp'
# groups 1 and 2 hold the two joint lengths, group 3 the single length;
# the groups of the alternative that did not match are left empty
_RE_EXON_LEN = re.compile(r"(?:(\d+) bp // (\d+) bp)|(?:(\d+) bp)")
# for splitting lines in the NER model (no capturing group, so the
# separator text itself is dropped when splitting)
_RE_NER = re.compile(r"--<\s+\d+\s+>--")
# for capturing NER gap lengths (same separator, with the number in group 1)
_RE_NER_LEN = re.compile(r"--<\s+(\d+)\s+>--")
# regexes for capturing the letters inside curly braces
# no. of letters is either 1 or 2, since they are split codons
# _RE_SCODON_START is anchored at the END of the string ('$'): it matches
# braces that close a block.  _RE_SCODON_END is anchored at the BEGINNING
# ('^'): it matches braces that open a block.
_RE_SCODON_START = re.compile(r"\{(\w{1,2})\}$")
_RE_SCODON_END = re.compile(r"^\{(\w{1,2})\}")
| def _flip_codons(codon_seq, target_seq): | |
| """Flips the codon characters from one seq to another (PRIVATE).""" | |
| a, b = "", "" | |
| for char1, char2 in zip(codon_seq, target_seq): | |
| # no need to do anything if the codon seq line has nothing | |
| if char1 == " ": | |
| a += char1 | |
| b += char2 | |
| else: | |
| a += char2 | |
| b += char1 | |
| return a, b | |
def _get_block_coords(parsed_seq, row_dict, has_ner=False):
    """Return a list of start, end coordinates for each given block in the sequence (PRIVATE).

    The query row (``parsed_seq[row_dict["query"]]``) is cut wherever the
    separator pattern matches: ``_RE_NER`` when ``has_ner`` is true,
    ``_RE_EXON`` otherwise.  Every stretch between two separators, plus the
    leading and the trailing stretch, yields one half-open ``(start, end)``
    pair of string offsets, listed left to right.

    The offsets are derived from the separator match positions instead of
    searching for each block's text.  A text search that restarts at the
    previous block's start reports a wrong position whenever a block's text
    also occurs earlier in the row, or the block is empty.
    """
    splitter = _RE_NER if has_ner else _RE_EXON
    # use the query line for reference
    seq = parsed_seq[row_dict["query"]]

    coords = []
    start = 0
    for separator in splitter.finditer(seq):
        # the block runs up to where this separator begins ...
        coords.append((start, separator.start()))
        # ... and the next block begins right after the separator
        start = separator.end()
    # whatever follows the last separator (or the whole row if none matched)
    coords.append((start, len(seq)))

    return coords
| def _get_inter_coords(coords, strand=1): | |
| """Return list of pairs covering intervening ranges (PRIVATE). | |
| From the given pairs of coordinates, returns a list of pairs | |
| covering the intervening ranges. | |
| """ | |
| # adapted from Python's itertools guide | |
| # if strand is -1, adjust coords to the ends and starts are chained | |
| if strand == -1: | |
| sorted_coords = [(max(a, b), min(a, b)) for a, b in coords] | |
| inter_coords = list(chain(*sorted_coords))[1:-1] | |
| return list(zip(inter_coords[1::2], inter_coords[::2])) | |
| else: | |
| inter_coords = list(chain(*coords))[1:-1] | |
| return list(zip(inter_coords[::2], inter_coords[1::2])) | |
| def _stitch_rows(raw_rows): | |
| """Stitches together the parsed alignment rows and returns them in a list (PRIVATE).""" | |
| # deal with possible codon surprise! | |
| # (i.e. alignments with codons using cdna2genome model) | |
| # by creating additional rows to contain the codons | |
| try: | |
| max_len = max(len(x) for x in raw_rows) | |
| for row in raw_rows: | |
| assert len(row) == max_len | |
| except AssertionError: | |
| for idx, row in enumerate(raw_rows): | |
| if len(row) != max_len: | |
| # codons must be present in the query and hit (so +2) | |
| assert len(row) + 2 == max_len | |
| # add additional empty lines to contain codons | |
| raw_rows[idx] = [" " * len(row[0])] + row + [" " * len(row[0])] | |
| cmbn_rows = [] | |
| for idx, row in enumerate(raw_rows[0]): | |
| cmbn_row = "".join(aln_row[idx] for aln_row in raw_rows) | |
| cmbn_rows.append(cmbn_row) | |
| # the real aligned sequence is always the 'outer' one, so we want | |
| # to flip them with their 'inner' pairs | |
| if len(cmbn_rows) == 5: | |
| # flip query sequence | |
| cmbn_rows[0], cmbn_rows[1] = _flip_codons(cmbn_rows[0], cmbn_rows[1]) | |
| # flip hit sequence | |
| cmbn_rows[4], cmbn_rows[3] = _flip_codons(cmbn_rows[4], cmbn_rows[3]) | |
| return cmbn_rows | |
| def _get_row_dict(row_len, model): | |
| """Return a dictionary of row indices for parsing alignment blocks (PRIVATE).""" | |
| idx = {} | |
| # 3 lines, usually in dna vs dna models | |
| if row_len == 3: | |
| idx["query"] = 0 | |
| idx["midline"] = 1 | |
| idx["hit"] = 2 | |
| idx["qannot"], idx["hannot"] = None, None | |
| # 4 lines, in protein vs dna models or dna vs protein models | |
| # TODO: currently we check this from the model string; is there | |
| # a better way to do it? | |
| elif row_len == 4: | |
| if "protein2" in model: | |
| idx["query"] = 0 | |
| idx["midline"] = 1 | |
| idx["hit"] = 2 | |
| idx["hannot"] = 3 | |
| idx["qannot"] = None | |
| elif "2protein" in model: | |
| idx["query"] = 1 | |
| idx["midline"] = 2 | |
| idx["hit"] = 3 | |
| idx["hannot"] = None | |
| idx["qannot"] = 0 | |
| else: | |
| raise ValueError("Unexpected model: " + model) | |
| # 5 lines, translated dna vs translated dna | |
| elif row_len == 5: | |
| # set sequence indexes | |
| idx["qannot"] = 0 | |
| idx["query"] = 1 | |
| idx["midline"] = 2 | |
| idx["hit"] = 3 | |
| idx["hannot"] = 4 | |
| else: | |
| raise ValueError("Unexpected row count in alignment block: %i" % row_len) | |
| return idx | |
| def _get_blocks(rows, coords, idx): | |
| """Return a list of dictionaries of sequences split by the coordinates (PRIVATE).""" | |
| for idx_name in ("query", "hit", "midline", "qannot", "hannot"): | |
| assert idx_name in idx | |
| blocks = [] | |
| for start, end in coords: | |
| block = {} | |
| # get seqs according to index | |
| block["query"] = rows[idx["query"]][start:end] | |
| block["hit"] = rows[idx["hit"]][start:end] | |
| block["similarity"] = rows[idx["midline"]][start:end] | |
| if idx["qannot"] is not None: | |
| block["query_annotation"] = rows[idx["qannot"]][start:end] | |
| if idx["hannot"] is not None: | |
| block["hit_annotation"] = rows[idx["hannot"]][start:end] | |
| blocks.append(block) | |
| return blocks | |
def _get_scodon_moves(tmp_seq_blocks):
    """Get a dictionary of split codon locations relative to each fragment end (PRIVATE).

    The result maps ``"query"`` and ``"hit"`` to a list holding two tuples
    per block, in block order:

    - ``(n, 0)`` where ``n`` is the number of letters found in curly braces
      at the END of the block's sequence (``_RE_SCODON_START``), or 0;
    - ``(0, m)`` where ``m`` is the number of letters found in curly braces
      at the BEGINNING of the block's sequence (``_RE_SCODON_END``), or 0.
    """
    scodon_moves = {}
    for seq_type in ("query", "hit"):
        moves = []
        for block in tmp_seq_blocks:
            seq = block[seq_type]
            # check both ends of the sequence for residues in curly braces
            at_block_end = _RE_SCODON_START.search(seq)
            at_block_start = _RE_SCODON_END.search(seq)
            moves.append((len(at_block_end.group(1)) if at_block_end else 0, 0))
            moves.append((0, len(at_block_start.group(1)) if at_block_start else 0))
        scodon_moves[seq_type] = moves

    return scodon_moves
| def _clean_blocks(tmp_seq_blocks): | |
| """Remove curly braces (split codon markers) from the given sequences (PRIVATE).""" | |
| seq_blocks = [] | |
| for seq_block in tmp_seq_blocks: | |
| for line_name in seq_block: | |
| seq_block[line_name] = ( | |
| seq_block[line_name].replace("{", "").replace("}", "") | |
| ) | |
| seq_blocks.append(seq_block) | |
| return seq_blocks | |
| def _comp_intron_lens(seq_type, inter_blocks, raw_inter_lens): | |
| """Return the length of introns between fragments (PRIVATE).""" | |
| # set opposite type, for setting introns | |
| opp_type = "hit" if seq_type == "query" else "query" | |
| # list of flags to denote if an intron follows a block | |
| # it reads e.g. this line: | |
| # "ATGTT{TT} >>>> Target Intron 1 >>>> {G}TGTGTGTACATT" | |
| # and sets the opposing sequence type's intron (since this | |
| # line is present on the opposite sequence type line) | |
| has_intron_after = ["Intron" in x[seq_type] for x in inter_blocks] | |
| assert len(has_intron_after) == len(raw_inter_lens) | |
| # create list containing coord adjustments incorporating | |
| # intron lengths | |
| inter_lens = [] | |
| for flag, parsed_len in zip(has_intron_after, raw_inter_lens): | |
| if flag: | |
| # joint introns | |
| if all(parsed_len[:2]): | |
| # intron len is [0] if opp_type is query, otherwise it's [1] | |
| intron_len = ( | |
| int(parsed_len[0]) if opp_type == "query" else int(parsed_len[1]) | |
| ) | |
| # single hit/query introns | |
| elif parsed_len[2]: | |
| intron_len = int(parsed_len[2]) | |
| else: | |
| raise ValueError("Unexpected intron parsing result: %r" % parsed_len) | |
| else: | |
| intron_len = 0 | |
| inter_lens.append(intron_len) | |
| return inter_lens | |
| def _comp_coords(hsp, seq_type, inter_lens): | |
| """Fill the block coordinates of the given hsp dictionary (PRIVATE).""" | |
| assert seq_type in ("hit", "query") | |
| # manually fill the first coord | |
| seq_step = 1 if hsp["%s_strand" % seq_type] >= 0 else -1 | |
| fstart = hsp["%s_start" % seq_type] | |
| # fend is fstart + number of residues in the sequence, minus gaps | |
| fend = ( | |
| fstart | |
| + len(hsp[seq_type][0].replace("-", "").replace(">", "").replace("<", "")) | |
| * seq_step | |
| ) | |
| coords = [(fstart, fend)] | |
| # and start from the second block, after the first inter seq | |
| for idx, block in enumerate(hsp[seq_type][1:]): | |
| bstart = coords[-1][1] + inter_lens[idx] * seq_step | |
| bend = bstart + seq_step * len(block.replace("-", "")) | |
| coords.append((bstart, bend)) | |
| # adjust the coords so the smallest is [0], if strand is -1 | |
| # couldn't do this in the previous steps since we need the initial | |
| # block ordering | |
| if seq_step != 1: | |
| for idx, coord in enumerate(coords): | |
| coords[idx] = coords[idx][1], coords[idx][0] | |
| return coords | |
| def _comp_split_codons(hsp, seq_type, scodon_moves): | |
| """Compute positions of split codons, store in given HSP dictionary (PRIVATE).""" | |
| scodons = [] | |
| for idx in range(len(scodon_moves[seq_type])): | |
| pair = scodon_moves[seq_type][idx] | |
| if not any(pair): | |
| continue | |
| else: | |
| assert not all(pair) | |
| a, b = pair | |
| anchor_pair = hsp["%s_ranges" % seq_type][idx // 2] | |
| strand = 1 if hsp["%s_strand" % seq_type] >= 0 else -1 | |
| if a: | |
| func = max if strand == 1 else min | |
| anchor = func(anchor_pair) | |
| start_c, end_c = anchor + a * strand * -1, anchor | |
| elif b: | |
| func = min if strand == 1 else max | |
| anchor = func(anchor_pair) | |
| start_c, end_c = anchor + b * strand, anchor | |
| scodons.append((min(start_c, end_c), max(start_c, end_c))) | |
| return scodons | |
class ExonerateTextParser(_BaseExonerateParser):
    """Parser for Exonerate plain text output.

    Turns the human-readable alignment that follows a "C4 Alignment:" marker
    into sequence blocks, per-block coordinate ranges and split codon
    positions, all stored in the HSP dictionary.
    """

    # a line starting with this marker begins the next alignment
    _ALN_MARK = "C4 Alignment:"

    def parse_alignment_block(self, header):
        """Parse alignment block, return query result, hits, hsps.

        ``header`` must provide the keys ``qresult``, ``hit`` and ``hsp``.
        The ``hsp`` dict is updated in place (strands mapped, coordinates and
        score cast to int, sequences, annotations, ranges and split codons
        added) and returned together with the untouched ``qresult`` and
        ``hit`` in a dict with those same three keys.

        Raises ``ValueError`` when the number of inter-block lengths does not
        equal the number of blocks minus one.
        """
        qresult = header["qresult"]
        hit = header["hit"]
        hsp = header["hsp"]
        # check for values that must have been set by previous methods
        # NOTE(review): hsp["score"] is also read below but is not part of
        # this check.
        for val_name in (
            "query_start",
            "query_end",
            "hit_start",
            "hit_end",
            "query_strand",
            "hit_strand",
        ):
            assert val_name in hsp, hsp

        # get the alignment rows
        # and stitch them so we have the full sequences in single strings
        raw_aln_blocks, vulgar_comp = self._read_alignment()
        # cmbn_rows still has split codon markers (curly braces)
        cmbn_rows = _stitch_rows(raw_aln_blocks)
        row_dict = _get_row_dict(len(cmbn_rows), qresult["model"])
        # get the sequence blocks
        # the model name decides which separator splits the rows into blocks
        has_ner = "NER" in qresult["model"].upper()
        seq_coords = _get_block_coords(cmbn_rows, row_dict, has_ner)
        tmp_seq_blocks = _get_blocks(cmbn_rows, seq_coords, row_dict)
        # get split codon temp coords for later use
        # this result in pairs of base movement for both ends of each row
        # (must run before the braces are stripped below)
        scodon_moves = _get_scodon_moves(tmp_seq_blocks)
        # remove the split codon markers
        seq_blocks = _clean_blocks(tmp_seq_blocks)

        # adjust strands
        hsp["query_strand"] = _STRAND_MAP[hsp["query_strand"]]
        hsp["hit_strand"] = _STRAND_MAP[hsp["hit_strand"]]
        # cast coords into ints
        hsp["query_start"] = int(hsp["query_start"])
        hsp["query_end"] = int(hsp["query_end"])
        hsp["hit_start"] = int(hsp["hit_start"])
        hsp["hit_end"] = int(hsp["hit_end"])
        # cast score into ints
        hsp["score"] = int(hsp["score"])
        # set sequences
        hsp["query"] = [x["query"] for x in seq_blocks]
        hsp["hit"] = [x["hit"] for x in seq_blocks]
        hsp["aln_annotation"] = {}
        # set the molecule type
        # currently only limited to models with protein queries
        if (
            "protein2" in qresult["model"]
            or "coding2" in qresult["model"]
            or "2protein" in qresult["model"]
        ):
            hsp["molecule_type"] = "protein"
        # get the annotations if they exist
        # (a block dict lacks the annotation keys when the layout has no
        # such row, hence the KeyError guard)
        for annot_type in ("similarity", "query_annotation", "hit_annotation"):
            try:
                hsp["aln_annotation"][annot_type] = [x[annot_type] for x in seq_blocks]
            except KeyError:
                pass

        # NOTE: vulgar_comp is currently unused; the code that would consume
        # it is disabled:
        # use vulgar coordinates if vulgar line is present and return
        # if vulgar_comp is not None:
        #    hsp = parse_vulgar_comp(hsp, vulgar_comp)
        #    return {'qresult': qresult, 'hit': hit, 'hsp': hsp}

        # otherwise we need to get the coordinates from the alignment
        # get the intervening blocks first, so we can use them
        # to adjust the coordinates
        if not has_ner:
            # get intervening coordinates and blocks, only if model is not ner
            # ner models have a much more simple coordinate calculation
            inter_coords = _get_inter_coords(seq_coords)
            inter_blocks = _get_blocks(cmbn_rows, inter_coords, row_dict)
            # returns a three-component tuple of intron lengths
            # first two component filled == intron in hit and query
            # last component filled == intron in hit or query
            raw_inter_lens = re.findall(_RE_EXON_LEN, cmbn_rows[row_dict["midline"]])

        # compute start and end coords for each block
        for seq_type in ("query", "hit"):

            # ner blocks and intron blocks require different adjustments
            if not has_ner:
                # the annotation read from seq_type's row yields the lengths
                # applied to the opposite sequence type
                opp_type = "hit" if seq_type == "query" else "query"
                inter_lens = _comp_intron_lens(seq_type, inter_blocks, raw_inter_lens)
            else:
                # for NER blocks, the length of the inter-fragment gaps is
                # written on the same strand, so opp_type is seq_type
                opp_type = seq_type
                inter_lens = [
                    int(x)
                    for x in re.findall(_RE_NER_LEN, cmbn_rows[row_dict[seq_type]])
                ]

            # check that inter_lens's length is len opp_type block - 1
            if len(inter_lens) != len(hsp[opp_type]) - 1:
                raise ValueError(
                    "Length mismatch: %r vs %r"
                    % (len(inter_lens), len(hsp[opp_type]) - 1)
                )
            # fill the hsp query and hit coordinates
            hsp["%s_ranges" % opp_type] = _comp_coords(hsp, opp_type, inter_lens)
            # and fill the split codon coordinates, if model != ner
            # can't do this in the if-else clause above since we need to
            # compute the ranges first
            if not has_ner:
                hsp["%s_split_codons" % opp_type] = _comp_split_codons(
                    hsp, opp_type, scodon_moves
                )

        # now that we've finished parsing coords, we can set the hit and start
        # coord according to Biopython's convention (start <= end)
        # (done last because _comp_coords reads the unswapped start value)
        for seq_type in ("query", "hit"):
            if hsp["%s_strand" % seq_type] == -1:
                n_start = "%s_start" % seq_type
                n_end = "%s_end" % seq_type
                hsp[n_start], hsp[n_end] = hsp[n_end], hsp[n_start]

        return {"qresult": qresult, "hit": hit, "hsp": hsp}

    def _read_alignment(self):
        """Read the raw alignment block strings, returns them in a list (PRIVATE).

        Consumes lines from ``self.handle`` (starting with the current
        ``self.line``) until end of file or the next ``_ALN_MARK`` line.

        Returns a tuple ``(raw_aln_blocks, vulgar_comp)``: a list with one
        list of row strings per alignment chunk, and group 10 of the last
        matching "vulgar" line seen (``None`` if there was none).
        """
        raw_aln_blocks = []
        # flag to check whether we're in an alignment row
        in_aln_row = False
        # flag for vulgar line, if present, we can parse coordinates from it
        vulgar_comp = None
        while True:

            match = re.search(_RE_ALN_ROW, self.line.strip())
            # if we have a match, set flags and values
            # (the first coordinate-flanked row of a chunk fixes the column
            # window that is cut out of every following line of the chunk)
            if match and not in_aln_row:
                start_idx = self.line.index(match.group(1))
                row_len = len(match.group(1))
                in_aln_row = True
                raw_aln_block = []
            # if we're in an alignment row, grab the sequence
            if in_aln_row:
                raw_aln_block.append(self.line[start_idx : start_idx + row_len])
            # reset flags and values if the line matches, we're in an alignment
            # row, and there are more than 1 line in rows
            # (i.e. the second coordinate-flanked row closes the chunk)
            if match and in_aln_row and len(raw_aln_block) > 1:
                raw_aln_blocks.append(raw_aln_block)
                start_idx = None
                row_len = None
                in_aln_row = False

            self.line = self.handle.readline()
            # try to parse vulgar line if present
            # NOTE(review): if a line starts with "vulgar" but _RE_VULGAR
            # does not match it, ``vulgar`` is None and ``.group`` raises
            # AttributeError.
            if self.line.startswith("vulgar"):
                vulgar = re.search(_RE_VULGAR, self.line)
                vulgar_comp = vulgar.group(10)
            if not self.line or self.line.startswith(self._ALN_MARK):
                # HACK: this is so that the parse_qresult method does not
                # yield the objects before appending the last HSP. We are doing
                # this to keep the parser compatible with outputs without
                # human-readable alignment outputs. This also relies on the
                # fact that repeated readline() always returns '' on EOF.
                if not self.line:
                    self.line = "mock"
                break

        return raw_aln_blocks, vulgar_comp
class ExonerateTextIndexer(_BaseExonerateIndexer):
    """Indexer class for Exonerate plain text."""

    _parser = ExonerateTextParser
    # byte prefix of the line that opens every alignment
    _query_mark = b"C4 Alignment"

    def get_qresult_id(self, pos):
        """Return the query ID from the nearest "Query:" line.

        Reading starts at byte offset ``pos`` of ``self._handle``.  Lines are
        stripped before being examined; the first stripped line that is empty
        (end of file or a blank line) before a "Query:" line is found raises
        ``StopIteration``.
        """
        handle = self._handle
        handle.seek(pos)

        line = handle.readline().strip()
        while not line.startswith(b"Query:"):
            if not line:
                raise StopIteration
            line = handle.readline().strip()

        qid, _desc = _parse_hit_or_query_line(line.decode())
        return qid

    def get_raw(self, offset):
        """Return the raw string of a QueryResult object from the given offset.

        Lines are collected from ``offset`` until end of file, or until an
        alignment marker line is reached whose query ID differs from the ID
        of the first alignment encountered.
        """
        handle = self._handle
        handle.seek(offset)
        first_key = None
        collected = []

        while True:
            line = handle.readline()
            if not line:
                break
            if line.startswith(self._query_mark):
                # remember where we are: looking up the ID moves the handle
                after_mark = handle.tell()
                found_key = self.get_qresult_id(after_mark)
                if first_key is None:
                    first_key = found_key
                elif found_key != first_key:
                    # this alignment belongs to the next query result
                    break
                handle.seek(after_mark)
            collected.append(line)

        return b"".join(collected)
# if not used as a module, run the doctest
if __name__ == "__main__":
    # imported here so the helper is only loaded when run as a script
    from Bio._utils import run_doctest

    run_doctest()