# plant-msyn / process_custom_synteny_pair.py
# Provenance: uploaded by Yoshigold — "Update webapp with Scripts files for
# HF Spaces deployment" (commit f342936, verified).
#!/usr/bin/env python3
"""
Process a single pairwise comparison for Custom Synteny (Python Version)
=========================================================================
Runs MCscan LAST + jcvi comparison between two genomes.
Expects BED and PEP files to already exist in the main results directories.
This is a Python port of process_custom_synteny_pair.sh with identical behavior.
Usage:
# Via environment variables (matching bash script interface):
GENOME_A=Arabidopsis GENOME_B=Glycine python process_custom_synteny_pair.py
# Via command-line arguments:
python process_custom_synteny_pair.py --genome-a Arabidopsis --genome-b Glycine
# With custom parameters:
python process_custom_synteny_pair.py -a Arabidopsis -b Glycine --cscore 0.95
Requirements:
- Must run with jcvi310 conda environment activated
- BED files must exist in $MCSCAN_RESULTS_DIR/bed_files/
- PEP files must exist in pep_files/ or Genomes/<species>/
Exit Codes:
0 - Success
1 - Validation error (missing files, bad IDs)
2 - Processing error (JCVI command failed)
"""
import argparse
import os
import shutil
import subprocess
import sys
import tempfile
from dataclasses import dataclass, field
from datetime import datetime
from pathlib import Path
from typing import Optional, Tuple
from pandas_utils import MCscanConfigBase, log_step, log_error
# =============================================================================
# CONFIGURATION
# =============================================================================
@dataclass
class Config(MCscanConfigBase):
    """
    MCscan parameters and directory paths for pairwise synteny processing.

    Every required field (result/bed/pep directory paths, cscore, dist,
    threads, dbtype, ...) is inherited from MCscanConfigBase; this subclass
    adds nothing and exists only as the script's named configuration type.
    """
# =============================================================================
# UTILITY FUNCTIONS
# =============================================================================
def run_cmd(cmd: list, cwd: Optional[Path] = None, check: bool = False) -> Tuple[int, str, str]:
    """
    Run a subprocess and capture its output.

    Args:
        cmd: Command and arguments as a list.
        cwd: Optional working directory for the command.
        check: Accepted for interface compatibility; no exception is ever
            raised here — callers inspect the returned exit code themselves.

    Returns:
        Tuple of (return_code, stdout_text, stderr_text).
    """
    proc = subprocess.run(cmd, cwd=cwd, capture_output=True, text=True)
    return proc.returncode, proc.stdout, proc.stderr
def find_pep_file(genome: str, cfg: Config) -> Optional[Path]:
    """
    Locate the PEP (protein FASTA) file for a genome.

    Search order matches the bash script exactly: the custom pep_files
    directory first, then Genomes/<genome>/.

    Returns:
        Path to the first existing PEP file, or None if neither exists.
    """
    candidates = (
        cfg.pep_dir / f"{genome}.pep",
        cfg.genomes_dir / genome / f"{genome}.pep",
    )
    for candidate in candidates:
        if candidate.is_file():
            return candidate
    return None
def validate_bed_pep_matching(genome: str, bed_file: Path, pep_file: Path) -> Tuple[int, int, int, float]:
    """
    Validate that BED gene IDs match PEP protein IDs.

    JCVI strips the final '.suffix' from transcript IDs (e.g.
    AT1G01010.1 -> AT1G01010), so PEP header IDs are compared after
    stripping that suffix.

    Args:
        genome: Genome key (accepted for the caller's logging; not read here).
        bed_file: Path to BED file (gene IDs in column 4).
        pep_file: Path to PEP FASTA file (IDs in header lines).

    Returns:
        Tuple of (bed_unique_count, pep_unique_count, matching_count, match_percent)
    """
    import re  # local import kept so the module import block is untouched

    # Extract unique gene IDs from BED column 4
    bed_genes = set()
    with open(bed_file) as f:
        for line in f:
            if line.strip():
                parts = line.strip().split('\t')
                if len(parts) >= 4:
                    bed_genes.add(parts[3])

    # JCVI strips a final '.suffix' whether it is letters (.p, .t) or a
    # number (.1, .2).  The previous pattern r'\.[a-zA-Z]*$' silently failed
    # on numeric suffixes, so IDs like AT1G01010.1 never matched their BED
    # counterparts.  Compile once instead of re-importing inside the loop.
    suffix_re = re.compile(r'\.[A-Za-z0-9]+$')

    # Extract stripped PEP IDs from FASTA header lines
    pep_ids = set()
    with open(pep_file) as f:
        for line in f:
            if line.startswith('>'):
                fields = line[1:].split()
                if not fields:
                    # Bare '>' header: nothing to record (previously IndexError)
                    continue
                pep_ids.add(suffix_re.sub('', fields[0]))

    # Count matching IDs
    matching = bed_genes & pep_ids
    bed_count = len(bed_genes)
    pep_count = len(pep_ids)
    match_count = len(matching)
    match_pct = (match_count * 100 / bed_count) if bed_count > 0 else 0.0
    return bed_count, pep_count, match_count, match_pct
# =============================================================================
# PROCESSING FUNCTIONS
# =============================================================================
def run_ortholog_detection(
    genome_a: str, genome_b: str, cfg: Config, work_dir: Path
) -> Tuple[bool, Path]:
    """
    Run jcvi.compara.catalog ortholog to detect orthologs between two genomes.

    Produces in work_dir:
    - {genome_a}.{genome_b}.last.filtered (LAST alignment results)
    - {genome_a}.{genome_b}.lifted.anchors (chained anchor pairs)

    The last.filtered file is cached in cfg.last_dir under an
    alphabetically-ordered name; when that cached file already exists it is
    copied into work_dir and the expensive JCVI run is skipped.

    Args:
        genome_a: First genome key
        genome_b: Second genome key
        cfg: Configuration with MCscan parameters
        work_dir: Temporary working directory with BED/PEP files

    Returns:
        Tuple of (success, path_to_last_filtered)
    """
    result_prefix = f"{genome_a}.{genome_b}"
    # Cached LAST file always uses alphabetical ordering of the genome keys
    # (matches the bash script's naming convention).
    alpha_prefix = ".".join(sorted((genome_a, genome_b)))
    last_output = cfg.last_dir / f"{alpha_prefix}.last.filtered"

    # Fast path: reuse a previously computed LAST result.
    if last_output.is_file():
        log_step(f"LAST output already exists: {last_output}")
        shutil.copy(last_output, work_dir / f"{result_prefix}.last.filtered")
        return True, last_output

    log_step(f"Running ortholog detection (dbtype={cfg.dbtype}, cscore={cfg.cscore})...")
    rc, _, stderr = run_cmd(
        [
            sys.executable, "-m", "jcvi.compara.catalog", "ortholog",
            genome_a, genome_b,
            f"--dbtype={cfg.dbtype}",
            f"--cscore={cfg.cscore}",
            f"--dist={cfg.dist}",
            f"--cpus={cfg.threads}",
        ],
        cwd=work_dir,
    )
    if rc != 0:
        log_error("Ortholog detection failed")
        log_error("Check that BED gene IDs match PEP protein IDs")
        if stderr:
            log_error(stderr[:500])
        return False, last_output

    # JCVI must have produced non-empty lifted anchors.
    lifted_anchors = work_dir / f"{result_prefix}.lifted.anchors"
    if not lifted_anchors.is_file() or lifted_anchors.stat().st_size == 0:
        log_error("No lifted anchors found - comparison may have no synteny")
        return False, last_output

    # Publish last.filtered to the shared cache under the alphabetical name.
    last_filtered = work_dir / f"{result_prefix}.last.filtered"
    if not last_filtered.is_file() or last_filtered.stat().st_size == 0:
        log_error(f"LAST filtered output not found: {last_filtered}")
        return False, last_output

    cfg.last_dir.mkdir(parents=True, exist_ok=True)
    shutil.copy(last_filtered, last_output)
    log_step(f"Copied LAST output to: {last_output}")
    return True, last_output
def filter_anchors(genome_a: str, genome_b: str, work_dir: Path) -> Tuple[bool, int]:
    """
    Filter lifted.anchors down to gene pairs present in both BED files.

    Replicates the inline AWK filter from the bash script exactly:
    - collect gene IDs from each genome's BED file (column 4),
    - keep anchor lines whose gene1/gene2 appear in genome A/B respectively,
    - pass comment/separator lines ('###', '#') through unchanged.

    Produces:
        {genome_a}.{genome_b}.lifted.filtered.anchors

    Returns:
        Tuple of (success, anchor_count)
    """
    result_prefix = f"{genome_a}.{genome_b}"
    lifted_in = work_dir / f"{result_prefix}.lifted.anchors"
    lifted_out = work_dir / f"{result_prefix}.lifted.filtered.anchors"
    if not lifted_in.is_file():
        log_error("No lifted anchors file found")
        return False, 0

    def bed_gene_ids(bed_path: Path) -> set:
        # Column 4 of a BED file holds the gene ID.
        ids = set()
        with open(bed_path) as handle:
            for row in handle:
                cols = row.strip().split('\t')
                if len(cols) >= 4:
                    ids.add(cols[3])
        return ids

    genes_a = bed_gene_ids(work_dir / f"{genome_a}.bed")
    genes_b = bed_gene_ids(work_dir / f"{genome_b}.bed")

    kept = 0
    with open(lifted_in) as fin, open(lifted_out, 'w') as fout:
        for line in fin:
            stripped = line.strip()
            # Separator ('###') and comment lines both start with '#'
            # and are copied verbatim.
            if stripped.startswith('#'):
                fout.write(line)
                continue
            # Anchor line: gene1 gene2 score
            fields = stripped.split()
            if len(fields) >= 2 and fields[0] in genes_a and fields[1] in genes_b:
                fout.write(line)
                kept += 1

    log_step(f"  Filtered anchors: {kept}")
    return True, kept
def run_mcscan(
    genome_a: str, genome_b: str, cfg: Config, work_dir: Path
) -> Tuple[bool, int, int]:
    """
    Run jcvi.compara.synteny mcscan in both directions.

    Produces:
    - {genome_a}.{genome_b}.i1.blocks (forward: using genome_a.bed as reference)
    - {genome_b}.{genome_a}.i1.blocks (reverse: using genome_b.bed as reference)

    Both directions are needed for complete synteny coverage.  A direction
    is skipped when its published blocks file already exists and is
    non-empty.  JCVI's exit status is deliberately ignored; success of a
    direction is judged by the presence of a non-empty output file.

    Args:
        genome_a: First genome key
        genome_b: Second genome key
        cfg: Configuration providing i1_dir (published blocks directory)
        work_dir: Working directory with BED files and filtered anchors

    Returns:
        Tuple of (success, forward_line_count, reverse_line_count).
        success is always True; callers inspect the line counts.
    """
    result_prefix = f"{genome_a}.{genome_b}"
    reverse_prefix = f"{genome_b}.{genome_a}"
    i1_forward = cfg.i1_dir / f"{result_prefix}.i1.blocks"
    i1_reverse = cfg.i1_dir / f"{reverse_prefix}.i1.blocks"
    anchors_file = work_dir / f"{result_prefix}.lifted.filtered.anchors"

    def _run_direction(ref_bed: str, out_prefix: str, dest: Path) -> None:
        # Run one mcscan direction and publish the blocks file if produced.
        cmd = [
            sys.executable, "-m", "jcvi.compara.synteny", "mcscan",
            ref_bed,
            str(anchors_file.name),
            "--iter=1",
            "-o", f"{out_prefix}.i1.blocks"
        ]
        run_cmd(cmd, cwd=work_dir)  # Ignore errors, check output file
        local_blocks = work_dir / f"{out_prefix}.i1.blocks"
        if local_blocks.is_file() and local_blocks.stat().st_size > 0:
            cfg.i1_dir.mkdir(parents=True, exist_ok=True)
            shutil.copy(local_blocks, dest)
            log_step(f"  Copied to: {dest}")

    def _count_lines(path: Path) -> int:
        # Context manager ensures the handle is closed promptly; the
        # previous bare open() inside sum() leaked the file handle.
        with open(path) as handle:
            return sum(1 for _ in handle)

    # Forward direction: genome_a -> genome_b
    if not i1_forward.is_file() or i1_forward.stat().st_size == 0:
        log_step(f"Running MCscan ({genome_a} -> {genome_b})...")
        _run_direction(f"{genome_a}.bed", result_prefix, i1_forward)
    else:
        log_step(f"Forward synteny blocks already exist: {i1_forward}")

    # Reverse direction: genome_b -> genome_a
    if not i1_reverse.is_file() or i1_reverse.stat().st_size == 0:
        log_step(f"Running MCscan ({genome_b} -> {genome_a})...")
        _run_direction(f"{genome_b}.bed", reverse_prefix, i1_reverse)
    else:
        log_step(f"Reverse synteny blocks already exist: {i1_reverse}")

    # Count output lines of whatever exists after both attempts
    forward_lines = _count_lines(i1_forward) if i1_forward.is_file() else 0
    reverse_lines = _count_lines(i1_reverse) if i1_reverse.is_file() else 0
    return True, forward_lines, reverse_lines
def copy_lifted_anchors(genome_a: str, genome_b: str, cfg: Config, work_dir: Path) -> None:
    """
    Publish the filtered lifted anchors to the output directory.

    Output path: lifted_anchors/{genome_a}.{genome_b}.lifted.anchors
    (the standardized .lifted.anchors name is used even though the file
    holds the filtered anchor set).  Nothing is copied when the local file
    is missing/empty or when the destination already exists.
    """
    prefix = f"{genome_a}.{genome_b}"
    source = work_dir / f"{prefix}.lifted.filtered.anchors"
    destination = cfg.lifted_dir / f"{prefix}.lifted.anchors"
    if not source.is_file() or source.stat().st_size == 0:
        return
    if destination.is_file():
        return
    cfg.lifted_dir.mkdir(parents=True, exist_ok=True)
    shutil.copy(source, destination)
    log_step(f"Copied lifted anchors to: {destination}")
# =============================================================================
# CLI PARSING
# =============================================================================
def parse_args() -> argparse.Namespace:
    """
    Parse command-line arguments with environment-variable fallbacks.

    Environment variables (matching the bash script):
        GENOME_A, GENOME_B, CSCORE, MIN_ANCHOR, GAP_LENGTH

    CLI arguments take precedence over environment variables.
    """
    env = os.environ
    parser = argparse.ArgumentParser(
        description="Process a single pairwise synteny comparison using MCscan/JCVI.",
        formatter_class=argparse.RawDescriptionHelpFormatter,
        epilog=__doc__,
    )
    # Genome keys are required but may arrive via env vars instead of flags.
    parser.add_argument("-a", "--genome-a", default=env.get("GENOME_A"),
                        help="Key of first genome (or set GENOME_A env var)")
    parser.add_argument("-b", "--genome-b", default=env.get("GENOME_B"),
                        help="Key of second genome (or set GENOME_B env var)")
    # Tunable MCscan parameters.
    parser.add_argument("--cscore", default=env.get("CSCORE", "0.99"),
                        help="C-score filter for ortholog detection (default: 0.99)")
    parser.add_argument("--min-anchor", type=int,
                        default=int(env.get("MIN_ANCHOR", "4")),
                        help="Minimum anchor count (default: 4)")
    parser.add_argument("--gap-length", type=int,
                        default=int(env.get("GAP_LENGTH", "20")),
                        help="Gap length for synteny chaining (default: 20)")
    parser.add_argument("--threads", type=int, default=8,
                        help="CPU threads for LAST alignment (default: 8)")
    return parser.parse_args()
# =============================================================================
# MAIN
# =============================================================================
def _log_id_examples(bed_file: Path, pep_file: Path) -> None:
    """Log a few example BED and PEP IDs to help debug an ID mismatch."""
    # Guard on column count: a short/malformed BED line previously raised
    # IndexError right here in the error-reporting path, masking the real
    # diagnostic.  Stop after three examples instead of reading whole files.
    bed_examples = []
    with open(bed_file) as f:
        for line in f:
            if not line.strip():
                continue
            parts = line.rstrip('\n').split('\t')
            if len(parts) >= 4:
                bed_examples.append(parts[3])
            if len(bed_examples) == 3:
                break
    pep_examples = []
    with open(pep_file) as f:
        for line in f:
            if line.startswith('>'):
                fields = line[1:].split()
                if fields:
                    pep_examples.append(fields[0])
            if len(pep_examples) == 3:
                break
    log_error(f"Example BED IDs: {' '.join(bed_examples)}")
    log_error(f"Example PEP IDs: {' '.join(pep_examples)}")
def main() -> int:
    """
    Main entry point for pairwise custom-synteny processing.

    Pipeline: validate inputs -> stage files in a temp dir -> check BED/PEP
    ID agreement -> ortholog detection -> anchor filtering -> MCscan in both
    directions -> publish outputs.

    Returns:
        Exit code: 0=success, 1=validation error, 2=processing error.
    """
    args = parse_args()
    # Validate required arguments (CLI flag or env var)
    if not args.genome_a:
        log_error("GENOME_A is required (use --genome-a or set GENOME_A env var)")
        return 1
    if not args.genome_b:
        log_error("GENOME_B is required (use --genome-b or set GENOME_B env var)")
        return 1
    genome_a = args.genome_a
    genome_b = args.genome_b
    # Initialize configuration (directory paths come from MCscanConfigBase)
    cfg = Config(
        cscore=args.cscore,
        dist=args.gap_length,
        min_size=args.min_anchor,
        threads=args.threads
    )
    # Validate BED files exist
    bed_a = cfg.bed_dir / f"{genome_a}.bed"
    bed_b = cfg.bed_dir / f"{genome_b}.bed"
    if not bed_a.is_file():
        log_error(f"BED file not found for {genome_a}: {bed_a}")
        return 1
    if not bed_b.is_file():
        log_error(f"BED file not found for {genome_b}: {bed_b}")
        return 1
    # Find PEP files (pep_files/ first, then Genomes/<genome>/)
    pep_a = find_pep_file(genome_a, cfg)
    pep_b = find_pep_file(genome_b, cfg)
    if not pep_a:
        log_error(f"PEP file not found for {genome_a} (checked pep_files/ and Genomes/)")
        return 1
    if not pep_b:
        log_error(f"PEP file not found for {genome_b} (checked pep_files/ and Genomes/)")
        return 1
    log_step(f"Using PEP files: {pep_a}, {pep_b}")
    # Create temporary working directory (removed in the finally block)
    work_dir = Path(tempfile.mkdtemp(prefix="csynteny_pair_"))
    log_step(f"Working directory: {work_dir}")
    try:
        # Copy BED and PEP files to work directory
        log_step(f"Preparing files for {genome_a} vs {genome_b}...")
        shutil.copy(bed_a, work_dir / f"{genome_a}.bed")
        shutil.copy(bed_b, work_dir / f"{genome_b}.bed")
        shutil.copy(pep_a, work_dir / f"{genome_a}.pep")
        shutil.copy(pep_b, work_dir / f"{genome_b}.pep")
        # Validate BED-PEP ID matching for both genomes before running JCVI
        log_step("Validating BED-PEP ID matching...")
        for genome, bed_file, pep_file in [
            (genome_a, work_dir / f"{genome_a}.bed", work_dir / f"{genome_a}.pep"),
            (genome_b, work_dir / f"{genome_b}.bed", work_dir / f"{genome_b}.pep"),
        ]:
            bed_count, pep_count, match_count, match_pct = validate_bed_pep_matching(
                genome, bed_file, pep_file
            )
            log_step(f"  {genome}: BED={bed_count}, PEP={pep_count}, Matching={match_count} ({match_pct:.1f}%)")
            if match_count < 4:
                log_error(f"BED-PEP ID mismatch for {genome}: Only {match_count} matching genes found")
                # Show example IDs for debugging
                _log_id_examples(bed_file, pep_file)
                return 1
        log_step("  ✓ ID validation passed for both genomes")
        # Run ortholog detection (LAST + jcvi catalog)
        success, last_output = run_ortholog_detection(genome_a, genome_b, cfg, work_dir)
        if not success:
            return 2
        # Filter anchors to pairs present in both BED files
        log_step("Filtering anchors...")
        success, anchor_count = filter_anchors(genome_a, genome_b, work_dir)
        if not success:
            return 2
        # Run MCscan in both directions
        success, forward_lines, reverse_lines = run_mcscan(genome_a, genome_b, cfg, work_dir)
        # Copy lifted anchors to output
        copy_lifted_anchors(genome_a, genome_b, cfg, work_dir)
        # Final summary
        log_step(f"Successfully processed {genome_a} vs {genome_b}")
        log_step(f"  LAST output: {last_output}")
        log_step(f"  Forward blocks ({genome_a}->{genome_b}): {forward_lines} lines")
        log_step(f"  Reverse blocks ({genome_b}->{genome_a}): {reverse_lines} lines")
        return 0
    finally:
        # Cleanup temp directory (matches bash trap cleanup EXIT)
        if work_dir.is_dir():
            shutil.rmtree(work_dir)
if __name__ == "__main__":
    # Propagate main()'s status code (0/1/2) to the shell, matching the
    # exit-code contract documented in the module docstring.
    sys.exit(main())