Ashkan Taghipour (The University of Western Australia)
Initial deploy: Pigeon Pea Pangenome Atlas
16e4ad5 | """Data parsing and validation for the Pigeon Pea Pangenome Atlas.""" | |
| import re | |
| import pandas as pd | |
| import numpy as np | |
| from pathlib import Path | |
| from collections import Counter | |
| from src.utils import logger, timer | |
def load_pav(path: str) -> pd.DataFrame:
    """
    Load the presence/absence variation (PAV) matrix, e.g. 89_line_PAV.txt.

    The file is read as tab-separated text; the first column becomes the
    index (gene identifiers) and every remaining column is one line.

    Args:
        path: Path to the tab-separated PAV file.

    Returns:
        DataFrame: index=gene_id (str, index name "gene"),
        columns=line_ids (str), values restricted to {0, 1}.

    Raises:
        ValueError: If any cell holds something other than 0 or 1
            (missing values count as invalid).
    """
    df = pd.read_csv(path, sep="\t", index_col=0)
    df.index.name = "gene"
    df.index = df.index.astype(str)
    df.columns = df.columns.astype(str)

    # De-duplicate the cells in native code first so that only the handful
    # of distinct values is turned into Python objects.
    unique_vals = set(pd.unique(df.values.ravel()).tolist())
    invalid = unique_vals - {0, 1}
    # Raise explicitly instead of using `assert`: assertions are removed when
    # Python runs with -O, which would let a corrupt matrix through silently.
    if invalid:
        raise ValueError(f"PAV contains values other than 0/1: {invalid}")

    logger.info(f"PAV matrix loaded: {df.shape[0]} genes x {df.shape[1]} lines")
    return df
def parse_gff_genes(path: str) -> pd.DataFrame:
    """
    Parse a GFF3 file and keep only rows whose feature type is 'gene'.

    Lines starting with '#', lines with fewer than nine tab-separated
    fields, and gene rows without a non-empty ``ID=`` attribute are skipped.

    Args:
        path: Path to the GFF3 file.

    Returns:
        DataFrame with columns gene_id, contig_id, start, end, strand.
        The columns are present even when no gene row was found.

    Raises:
        ValueError: If the start or end field of a gene row is not an integer.
    """
    columns = ["gene_id", "contig_id", "start", "end", "strand"]
    records = []
    with open(path, "r") as f:
        for line in f:
            if line.startswith("#"):
                continue
            parts = line.strip().split("\t")
            if len(parts) < 9 or parts[2] != "gene":
                continue
            contig_id = parts[0]
            start = int(parts[3])
            end = int(parts[4])
            strand = parts[6]
            # Column 9 is a ';'-separated list of key=value pairs; the gene
            # identifier is the value of the first ID= entry.
            gene_id = None
            for attr in parts[8].split(";"):
                attr = attr.strip()
                if attr.startswith("ID="):
                    gene_id = attr[3:]
                    break
            if not gene_id:
                continue
            records.append({
                "gene_id": gene_id,
                "contig_id": contig_id,
                "start": start,
                "end": end,
                "strand": strand,
            })
    # Passing the column list keeps the schema stable for an empty result;
    # without it the 'contig_id' lookup below raises KeyError when the file
    # contains no gene rows.
    df = pd.DataFrame(records, columns=columns)
    logger.info(f"GFF parsed: {len(df)} genes on {df['contig_id'].nunique()} contigs")
    return df
def parse_protein_fasta(path: str) -> pd.DataFrame:
    """
    Parse a protein FASTA file into one row per sequence.

    gene_id is the header token after '>' up to the first whitespace.
    '*' characters are removed from the sequence before it is measured.
    A header that is not followed by any line is dropped.

    Args:
        path: Path to the protein FASTA file.

    Returns:
        DataFrame with columns gene_id, protein_length and
        composition_summary. composition_summary is a string listing the
        five most frequent residues as percentages of the sequence length,
        e.g. "K:66.7%, M:33.3%". The columns are present even when the file
        holds no sequence.
    """
    columns = ["gene_id", "protein_length", "composition_summary"]
    records = []
    current_id = None
    current_seq = []

    def flush():
        # Emit the record collected so far; nothing to do before the first
        # header or when a header had no following lines.
        if not (current_id and current_seq):
            return
        seq = "".join(current_seq).replace("*", "")
        length = len(seq)
        counts = Counter(seq)
        total = max(length, 1)  # guards the percentage against division by zero
        top_aas = sorted(counts.items(), key=lambda x: -x[1])[:5]
        comp_str = ", ".join(f"{aa}:{count/total*100:.1f}%" for aa, count in top_aas)
        records.append({
            "gene_id": current_id,
            "protein_length": length,
            "composition_summary": comp_str,
        })

    with open(path, "r") as f:
        for line in f:
            line = line.strip()
            if line.startswith(">"):
                flush()
                current_id = line[1:].split()[0]
                current_seq = []
            else:
                current_seq.append(line)
    flush()  # the last record has no following header to trigger it

    # Passing the column list keeps the schema stable for an empty result, so
    # downstream lookups such as protein_index["gene_id"] do not raise KeyError.
    df = pd.DataFrame(records, columns=columns)
    logger.info(f"Protein FASTA parsed: {len(df)} proteins")
    return df
def build_contig_index(path: str) -> dict:
    """
    Scan a FASTA file once and record the sequence length of every contig.

    The contig id is the first whitespace-delimited token of the header
    line. Sequence lines are counted after stripping surrounding whitespace.
    Lines that appear before the first header are ignored.

    Returns dict: {contig_id: length}.
    """
    contig_index = {}
    active = None
    with open(path, "r") as handle:
        for raw in handle:
            if raw.startswith(">"):
                # A new record starts: register it with a zero length.
                active = raw[1:].strip().split()[0]
                contig_index[active] = 0
            elif active is not None:
                contig_index[active] += len(raw.strip())
    logger.info(f"Contig index built: {len(contig_index)} contigs")
    return contig_index
def build_contig_name_mapping(gff_genes: pd.DataFrame, contig_index: dict) -> dict:
    """
    Map the contig ids used in the GFF onto the contig ids used in the FASTA.

    Two passes are made: identical ids are paired first; any GFF id still
    unpaired is then matched through an accession token of the form
    NC_/NW_/NZ_<digits>.<digits> found in both names (e.g. NC_033813.1).

    Returns dict: {gff_contig_id: fasta_contig_id}; GFF contigs that match
    nothing are left out.
    """
    accession_pattern = r'(N[CWZ]_\d+\.\d+)'
    gff_ids = set(gff_genes["contig_id"].unique())
    fasta_ids = set(contig_index.keys())

    # Pass 1: names that are already identical on both sides.
    mapping = {name: name for name in gff_ids if name in fasta_ids}

    # Pass 2: fall back to the embedded accession for the remainder.
    leftover = gff_ids - set(mapping.keys())
    if leftover:
        accession_to_fasta = {}
        for fasta_name in fasta_ids:
            found = re.search(accession_pattern, fasta_name)
            if found:
                accession_to_fasta[found.group(1)] = fasta_name

        for gff_name in leftover:
            found = re.search(accession_pattern, gff_name)
            if not found:
                continue
            accession = found.group(1)
            if accession in accession_to_fasta:
                mapping[gff_name] = accession_to_fasta[accession]

    logger.info(f"Contig mapping: {len(mapping)}/{len(gff_ids)} GFF contigs matched to FASTA")
    return mapping
def validate_joins(pav: pd.DataFrame, gff_genes: pd.DataFrame,
                   protein_index: pd.DataFrame, contig_index: dict) -> dict:
    """
    Check how well the PAV, GFF, protein and contig tables join together.

    Returns a report dict holding the number of distinct genes per source,
    the percentage of PAV genes found in the GFF and in the protein table,
    the percentage of GFF contigs whose id appears verbatim in the contig
    index, and the number of orphan genes (PAV genes absent from both the
    GFF and the protein table). Every entry is also written to the log.
    """
    def _percent(subset, universe):
        # max(..., 1) keeps an empty universe from dividing by zero.
        return len(subset) / max(len(universe), 1) * 100

    genes_pav = set(pav.index)
    genes_gff = set(gff_genes["gene_id"])
    genes_protein = set(protein_index["gene_id"])
    contigs_fasta = set(contig_index.keys())
    contigs_gff = set(gff_genes["contig_id"])

    orphans = genes_pav - genes_gff - genes_protein

    report = {
        "pav_gene_count": len(genes_pav),
        "gff_gene_count": len(genes_gff),
        "protein_gene_count": len(genes_protein),
        "pav_genes_in_gff_pct": _percent(genes_pav & genes_gff, genes_pav),
        "pav_genes_in_protein_pct": _percent(genes_pav & genes_protein, genes_pav),
        "gff_contigs_in_fasta_pct": _percent(contigs_gff & contigs_fasta, contigs_gff),
        "orphan_genes_count": len(orphans),
    }

    if orphans:
        logger.warning(f"{len(orphans)} orphan genes (in PAV but missing from both GFF and protein)")
    for key, val in report.items():
        logger.info(f" {key}: {val}")
    return report