Spaces:
Sleeping
Sleeping
| #!/usr/bin/env python3 | |
| """ | |
| Process a single pairwise comparison for Custom Synteny (Python Version) | |
| ========================================================================= | |
| Runs MCscan LAST + jcvi comparison between two genomes. | |
| Expects BED and PEP files to already exist in the main results directories. | |
| This is a Python port of process_custom_synteny_pair.sh with identical behavior. | |
| Usage: | |
| # Via environment variables (matching bash script interface): | |
| GENOME_A=Arabidopsis GENOME_B=Glycine python process_custom_synteny_pair.py | |
| # Via command-line arguments: | |
| python process_custom_synteny_pair.py --genome-a Arabidopsis --genome-b Glycine | |
| # With custom parameters: | |
| python process_custom_synteny_pair.py -a Arabidopsis -b Glycine --cscore 0.95 | |
| Requirements: | |
| - Must run with jcvi310 conda environment activated | |
| - BED files must exist in $MCSCAN_RESULTS_DIR/bed_files/ | |
| - PEP files must exist in pep_files/ or Genomes/<species>/ | |
| Exit Codes: | |
| 0 - Success | |
| 1 - Validation error (missing files, bad IDs) | |
| 2 - Processing error (JCVI command failed) | |
| """ | |
import argparse
import os
import re
import shutil
import subprocess
import sys
import tempfile
from dataclasses import dataclass, field
from datetime import datetime
from pathlib import Path
from typing import Optional, Tuple

from pandas_utils import MCscanConfigBase, log_step, log_error
| # ============================================================================= | |
| # CONFIGURATION | |
| # ============================================================================= | |
class Config(MCscanConfigBase):
    """Pairwise-synteny run configuration.

    Every MCscan parameter and directory path used by this script is
    inherited from MCscanConfigBase; this subclass adds nothing of its own
    and exists only to give the script a local configuration name.
    """
| # ============================================================================= | |
| # UTILITY FUNCTIONS | |
| # ============================================================================= | |
def run_cmd(cmd: list, cwd: Optional[Path] = None, check: bool = False) -> Tuple[int, str, str]:
    """
    Execute a subprocess command and return (returncode, stdout, stderr).

    Args:
        cmd: Command and arguments as a list (run without a shell)
        cwd: Working directory for the command
        check: If True, raise subprocess.CalledProcessError on non-zero exit
               (captured stdout/stderr are attached to the exception).
               Default False preserves the historical return-code behavior.

    Returns:
        Tuple of (return_code, stdout_text, stderr_text)
    """
    # text=True decodes stdout/stderr so callers always receive str, not bytes.
    result = subprocess.run(
        cmd,
        cwd=cwd,
        capture_output=True,
        text=True,
        check=check  # was accepted but silently ignored before; now honored
    )
    return result.returncode, result.stdout, result.stderr
def find_pep_file(genome: str, cfg: Config) -> Optional[Path]:
    """
    Locate the PEP FASTA file for a genome.

    Search order mirrors the bash script exactly:
    1. cfg.pep_dir / "<genome>.pep"          (custom pep_files directory)
    2. cfg.genomes_dir / genome / "<genome>.pep"

    Returns:
        The first existing path, or None when neither location has the file.
    """
    candidates = (
        cfg.pep_dir / f"{genome}.pep",
        cfg.genomes_dir / genome / f"{genome}.pep",
    )
    for candidate in candidates:
        if candidate.is_file():
            return candidate
    return None
def validate_bed_pep_matching(genome: str, bed_file: Path, pep_file: Path) -> Tuple[int, int, int, float]:
    """
    Validate that BED gene IDs match PEP protein IDs.

    JCVI strips the final '.suffix' from transcript IDs (e.g. AT1G01010.1 ->
    AT1G01010, Glyma.01G000100.1.p -> Glyma.01G000100.1), so PEP IDs are
    compared after stripping that suffix.

    Args:
        genome: Genome key (currently only used by callers for logging)
        bed_file: Path to BED file (gene IDs in column 4)
        pep_file: Path to PEP FASTA file (IDs in '>' header lines)

    Returns:
        Tuple of (bed_unique_count, pep_unique_count, matching_count, match_percent)
    """
    # Unique gene IDs from BED column 4; short/blank lines are skipped.
    bed_genes = set()
    with open(bed_file) as f:
        for line in f:
            parts = line.strip().split('\t')
            if len(parts) >= 4:
                bed_genes.add(parts[3])
    # The suffix may be letters and/or digits ('.1', '.p', '.t1').
    # NOTE: the previous pattern r'\.[a-zA-Z]*$' never matched numeric
    # suffixes such as '.1', defeating the documented comparison.
    suffix_pattern = re.compile(r'\.[A-Za-z0-9]+$')
    pep_ids = set()
    with open(pep_file) as f:
        for line in f:
            if line.startswith('>'):
                # ID is the first whitespace-delimited token after '>'.
                raw_id = line[1:].split()[0]
                pep_ids.add(suffix_pattern.sub('', raw_id))
    matching = bed_genes & pep_ids
    bed_count = len(bed_genes)
    pep_count = len(pep_ids)
    match_count = len(matching)
    # Percent is relative to the BED side; 0.0 guards an empty BED file.
    match_pct = (match_count * 100 / bed_count) if bed_count > 0 else 0.0
    return bed_count, pep_count, match_count, match_pct
| # ============================================================================= | |
| # PROCESSING FUNCTIONS | |
| # ============================================================================= | |
def run_ortholog_detection(
    genome_a: str, genome_b: str, cfg: Config, work_dir: Path
) -> Tuple[bool, Path]:
    """
    Run jcvi.compara.catalog ortholog to detect orthologs between two genomes.

    Produces in work_dir:
        - {genome_a}.{genome_b}.last.filtered (LAST alignment results)
        - {genome_a}.{genome_b}.lifted.anchors (chained anchor pairs)

    The copy kept under cfg.last_dir uses alphabetically ordered genome names
    in its filename (matches the bash script).

    Args:
        genome_a: First genome key
        genome_b: Second genome key
        cfg: Configuration with MCscan parameters
        work_dir: Temporary working directory with BED/PEP files

    Returns:
        Tuple of (success, path_to_last_filtered)
    """
    result_prefix = f"{genome_a}.{genome_b}"
    # Alphabetical naming for last.filtered (matches bash behavior)
    if genome_a < genome_b:
        alpha_prefix = f"{genome_a}.{genome_b}"
    else:
        alpha_prefix = f"{genome_b}.{genome_a}"
    last_output = cfg.last_dir / f"{alpha_prefix}.last.filtered"
    # Skip if already exists.
    # NOTE(review): this fast path copies only last.filtered into work_dir;
    # it does NOT recreate {result_prefix}.lifted.anchors there, so a later
    # filter_anchors() call on a fresh work_dir will report "No lifted
    # anchors file found". Presumably mirrors the bash script — confirm
    # before relying on the skip path.
    if last_output.is_file():
        log_step(f"LAST output already exists: {last_output}")
        shutil.copy(last_output, work_dir / f"{result_prefix}.last.filtered")
        return True, last_output
    log_step(f"Running ortholog detection (dbtype={cfg.dbtype}, cscore={cfg.cscore})...")
    # Invoked via "python -m" so the jcvi from the active environment is used.
    cmd = [
        sys.executable, "-m", "jcvi.compara.catalog", "ortholog",
        genome_a, genome_b,
        f"--dbtype={cfg.dbtype}",
        f"--cscore={cfg.cscore}",
        f"--dist={cfg.dist}",
        f"--cpus={cfg.threads}"
    ]
    rc, stdout, stderr = run_cmd(cmd, cwd=work_dir)
    if rc != 0:
        log_error("Ortholog detection failed")
        log_error("Check that BED gene IDs match PEP protein IDs")
        if stderr:
            # Only the first 500 chars — jcvi tracebacks can be very long.
            log_error(stderr[:500])
        return False, last_output
    # Verify lifted anchors were produced; an empty file counts as failure.
    lifted_anchors = work_dir / f"{result_prefix}.lifted.anchors"
    if not lifted_anchors.is_file() or lifted_anchors.stat().st_size == 0:
        log_error("No lifted anchors found - comparison may have no synteny")
        return False, last_output
    # Copy last.filtered to the output directory with alphabetical naming.
    last_filtered = work_dir / f"{result_prefix}.last.filtered"
    if last_filtered.is_file() and last_filtered.stat().st_size > 0:
        cfg.last_dir.mkdir(parents=True, exist_ok=True)
        shutil.copy(last_filtered, last_output)
        log_step(f"Copied LAST output to: {last_output}")
    else:
        log_error(f"LAST filtered output not found: {last_filtered}")
        return False, last_output
    return True, last_output
def _load_bed_gene_ids(bed_path: Path) -> set:
    """Return the set of gene IDs found in column 4 of a BED file."""
    genes = set()
    with open(bed_path) as f:
        for line in f:
            parts = line.strip().split('\t')
            if len(parts) >= 4:
                genes.add(parts[3])
    return genes


def filter_anchors(genome_a: str, genome_b: str, work_dir: Path) -> Tuple[bool, int]:
    """
    Filter lifted.anchors to keep only gene pairs present in both BED files.

    Replicates the inline AWK filter from the bash script:
    - Load gene IDs from both BED files into sets
    - Keep anchor lines where gene1 is in genome_a's BED and gene2 in genome_b's
    - Preserve comment/separator lines starting with '#' (including '###')
    - Silently drop malformed lines with fewer than two fields

    Produces:
        {genome_a}.{genome_b}.lifted.filtered.anchors (in work_dir)

    Returns:
        Tuple of (success, anchor_count)
    """
    result_prefix = f"{genome_a}.{genome_b}"
    lifted_in = work_dir / f"{result_prefix}.lifted.anchors"
    lifted_out = work_dir / f"{result_prefix}.lifted.filtered.anchors"
    if not lifted_in.is_file():
        log_error("No lifted anchors file found")
        return False, 0
    genes_a = _load_bed_gene_ids(work_dir / f"{genome_a}.bed")
    genes_b = _load_bed_gene_ids(work_dir / f"{genome_b}.bed")
    anchor_count = 0
    with open(lifted_in) as fin, open(lifted_out, 'w') as fout:
        for line in fin:
            stripped = line.strip()
            # '#' covers both '###' block separators and plain comments.
            if stripped.startswith('#'):
                fout.write(line)
                continue
            # Anchor line format: gene1 gene2 score
            parts = stripped.split()
            if len(parts) >= 2 and parts[0] in genes_a and parts[1] in genes_b:
                fout.write(line)
                anchor_count += 1
    log_step(f" Filtered anchors: {anchor_count}")
    return True, anchor_count
def _count_blocks_lines(path: Path) -> int:
    """Count lines in a blocks file, closing the handle; 0 if missing."""
    if not path.is_file():
        return 0
    with open(path) as fh:
        return sum(1 for _ in fh)


def run_mcscan(
    genome_a: str, genome_b: str, cfg: Config, work_dir: Path
) -> Tuple[bool, int, int]:
    """
    Run jcvi.compara.synteny mcscan in both directions.

    Produces (under cfg.i1_dir):
        - {genome_a}.{genome_b}.i1.blocks (forward: genome_a.bed as reference)
        - {genome_b}.{genome_a}.i1.blocks (reverse: genome_b.bed as reference)

    Both directions are needed for complete synteny coverage. A direction is
    skipped when its non-empty output already exists.

    Returns:
        Tuple of (success, forward_line_count, reverse_line_count)
    """
    result_prefix = f"{genome_a}.{genome_b}"
    reverse_prefix = f"{genome_b}.{genome_a}"
    i1_forward = cfg.i1_dir / f"{result_prefix}.i1.blocks"
    i1_reverse = cfg.i1_dir / f"{reverse_prefix}.i1.blocks"
    anchors_file = work_dir / f"{result_prefix}.lifted.filtered.anchors"
    # (label, reference genome, query genome, output prefix, destination)
    directions = (
        ("Forward", genome_a, genome_b, result_prefix, i1_forward),
        ("Reverse", genome_b, genome_a, reverse_prefix, i1_reverse),
    )
    for label, ref_genome, qry_genome, out_prefix, dest in directions:
        if dest.is_file() and dest.stat().st_size > 0:
            log_step(f"{label} synteny blocks already exist: {dest}")
            continue
        log_step(f"Running MCscan ({ref_genome} -> {qry_genome})...")
        cmd = [
            sys.executable, "-m", "jcvi.compara.synteny", "mcscan",
            f"{ref_genome}.bed",
            str(anchors_file.name),
            "--iter=1",
            "-o", f"{out_prefix}.i1.blocks"
        ]
        # Exit code intentionally ignored: mcscan may exit non-zero even when
        # usable blocks are written, so we validate the output file instead.
        run_cmd(cmd, cwd=work_dir)
        local_blocks = work_dir / f"{out_prefix}.i1.blocks"
        if local_blocks.is_file() and local_blocks.stat().st_size > 0:
            cfg.i1_dir.mkdir(parents=True, exist_ok=True)
            shutil.copy(local_blocks, dest)
            log_step(f" Copied to: {dest}")
    forward_lines = _count_blocks_lines(i1_forward)
    reverse_lines = _count_blocks_lines(i1_reverse)
    return True, forward_lines, reverse_lines
def copy_lifted_anchors(genome_a: str, genome_b: str, cfg: Config, work_dir: Path) -> None:
    """
    Publish the filtered lifted anchors to the output directory.

    Output path: lifted_anchors/{genome_a}.{genome_b}.lifted.anchors
    The published file keeps the '.lifted.anchors' name even though it holds
    the *filtered* data. An existing output is never overwritten, and nothing
    happens when the local filtered file is missing or empty.
    """
    result_prefix = f"{genome_a}.{genome_b}"
    local_file = work_dir / f"{result_prefix}.lifted.filtered.anchors"
    output_file = cfg.lifted_dir / f"{result_prefix}.lifted.anchors"
    have_local = local_file.is_file() and local_file.stat().st_size > 0
    already_published = output_file.is_file()
    if have_local and not already_published:
        cfg.lifted_dir.mkdir(parents=True, exist_ok=True)
        shutil.copy(local_file, output_file)
        log_step(f"Copied lifted anchors to: {output_file}")
| # ============================================================================= | |
| # CLI PARSING | |
| # ============================================================================= | |
def parse_args() -> argparse.Namespace:
    """
    Parse command-line arguments; environment variables supply the defaults.

    Recognized env vars (same names as the bash script):
        GENOME_A, GENOME_B, CSCORE, MIN_ANCHOR, GAP_LENGTH
    Explicit CLI flags always win over environment variables.
    """
    env = os.environ
    parser = argparse.ArgumentParser(
        description="Process a single pairwise synteny comparison using MCscan/JCVI.",
        formatter_class=argparse.RawDescriptionHelpFormatter,
        epilog=__doc__,
    )
    # Genome keys are required, but may arrive via env vars instead of flags.
    parser.add_argument("-a", "--genome-a", default=env.get("GENOME_A"),
                        help="Key of first genome (or set GENOME_A env var)")
    parser.add_argument("-b", "--genome-b", default=env.get("GENOME_B"),
                        help="Key of second genome (or set GENOME_B env var)")
    # Tuning parameters. Note: cscore is deliberately kept as a string since
    # it is only interpolated into the jcvi command line.
    parser.add_argument("--cscore", default=env.get("CSCORE", "0.99"),
                        help="C-score filter for ortholog detection (default: 0.99)")
    parser.add_argument("--min-anchor", type=int,
                        default=int(env.get("MIN_ANCHOR", "4")),
                        help="Minimum anchor count (default: 4)")
    parser.add_argument("--gap-length", type=int,
                        default=int(env.get("GAP_LENGTH", "20")),
                        help="Gap length for synteny chaining (default: 20)")
    parser.add_argument("--threads", type=int, default=8,
                        help="CPU threads for LAST alignment (default: 8)")
    return parser.parse_args()
| # ============================================================================= | |
| # MAIN | |
| # ============================================================================= | |
def main() -> int:
    """
    Main entry point for one pairwise comparison.

    Pipeline: validate inputs -> stage files in a temp dir -> check BED/PEP
    ID agreement -> ortholog detection -> anchor filtering -> bidirectional
    MCscan -> publish outputs.

    Returns:
        Exit code: 0=success, 1=validation error, 2=processing error.
    """
    args = parse_args()
    # Validate required arguments (may come from env vars via parse_args).
    if not args.genome_a:
        log_error("GENOME_A is required (use --genome-a or set GENOME_A env var)")
        return 1
    if not args.genome_b:
        log_error("GENOME_B is required (use --genome-b or set GENOME_B env var)")
        return 1
    genome_a = args.genome_a
    genome_b = args.genome_b
    # Initialize configuration; directory attributes come from MCscanConfigBase.
    cfg = Config(
        cscore=args.cscore,
        dist=args.gap_length,
        min_size=args.min_anchor,
        threads=args.threads
    )
    # Validate BED files exist in the shared results directory.
    bed_a = cfg.bed_dir / f"{genome_a}.bed"
    bed_b = cfg.bed_dir / f"{genome_b}.bed"
    if not bed_a.is_file():
        log_error(f"BED file not found for {genome_a}: {bed_a}")
        return 1
    if not bed_b.is_file():
        log_error(f"BED file not found for {genome_b}: {bed_b}")
        return 1
    # Find PEP files (pep_files/ takes precedence over Genomes/<species>/).
    pep_a = find_pep_file(genome_a, cfg)
    pep_b = find_pep_file(genome_b, cfg)
    if not pep_a:
        log_error(f"PEP file not found for {genome_a} (checked pep_files/ and Genomes/)")
        return 1
    if not pep_b:
        log_error(f"PEP file not found for {genome_b} (checked pep_files/ and Genomes/)")
        return 1
    log_step(f"Using PEP files: {pep_a}, {pep_b}")
    # Create temporary working directory; jcvi writes its intermediates here.
    work_dir = Path(tempfile.mkdtemp(prefix="csynteny_pair_"))
    log_step(f"Working directory: {work_dir}")
    try:
        # Stage BED and PEP files under the names jcvi expects (<genome>.bed/.pep).
        log_step(f"Preparing files for {genome_a} vs {genome_b}...")
        shutil.copy(bed_a, work_dir / f"{genome_a}.bed")
        shutil.copy(bed_b, work_dir / f"{genome_b}.bed")
        shutil.copy(pep_a, work_dir / f"{genome_a}.pep")
        shutil.copy(pep_b, work_dir / f"{genome_b}.pep")
        # Validate BED-PEP ID matching for each genome before the expensive run.
        log_step("Validating BED-PEP ID matching...")
        for genome, bed_file, pep_file in [
            (genome_a, work_dir / f"{genome_a}.bed", work_dir / f"{genome_a}.pep"),
            (genome_b, work_dir / f"{genome_b}.bed", work_dir / f"{genome_b}.pep"),
        ]:
            bed_count, pep_count, match_count, match_pct = validate_bed_pep_matching(
                genome, bed_file, pep_file
            )
            log_step(f" {genome}: BED={bed_count}, PEP={pep_count}, Matching={match_count} ({match_pct:.1f}%)")
            # Fewer than 4 shared IDs means the comparison cannot produce anchors.
            if match_count < 4:
                log_error(f"BED-PEP ID mismatch for {genome}: Only {match_count} matching genes found")
                # Show example IDs for debugging.
                # NOTE(review): assumes every non-blank BED line has >=4
                # tab-separated fields; a short line would raise IndexError
                # here — confirm inputs are well-formed.
                with open(bed_file) as f:
                    bed_examples = [line.split('\t')[3] for line in f if line.strip()][:3]
                with open(pep_file) as f:
                    pep_examples = [line[1:].split()[0] for line in f if line.startswith('>')][:3]
                log_error(f"Example BED IDs: {' '.join(bed_examples)}")
                log_error(f"Example PEP IDs: {' '.join(pep_examples)}")
                return 1
        log_step(" ✓ ID validation passed for both genomes")
        # Run ortholog detection (LAST alignment + anchor chaining).
        success, last_output = run_ortholog_detection(genome_a, genome_b, cfg, work_dir)
        if not success:
            return 2
        # Restrict anchors to gene pairs present in both BED files.
        log_step("Filtering anchors...")
        success, anchor_count = filter_anchors(genome_a, genome_b, work_dir)
        if not success:
            return 2
        # Run MCscan in both directions (success flag ignored; counts reported).
        success, forward_lines, reverse_lines = run_mcscan(genome_a, genome_b, cfg, work_dir)
        # Publish the filtered anchors to the output directory.
        copy_lifted_anchors(genome_a, genome_b, cfg, work_dir)
        # Final summary.
        result_prefix = f"{genome_a}.{genome_b}"
        reverse_prefix = f"{genome_b}.{genome_a}"
        log_step(f"Successfully processed {genome_a} vs {genome_b}")
        log_step(f" LAST output: {last_output}")
        log_step(f" Forward blocks ({genome_a}->{genome_b}): {forward_lines} lines")
        log_step(f" Reverse blocks ({genome_b}->{genome_a}): {reverse_lines} lines")
        return 0
    finally:
        # Cleanup temp directory on every exit path (matches bash trap cleanup EXIT).
        if work_dir.is_dir():
            shutil.rmtree(work_dir)
| if __name__ == "__main__": | |
| sys.exit(main()) | |