# plant-msyn / plot_user_genes_microsynteny_v2.py
# (Hugging Face file-page header captured during export: "Yoshigold's picture",
#  commit "Update webapp with Scripts files for HF Spaces deployment", f342936 verified.
#  Commented out so this file remains valid Python.)
#!/usr/bin/env python3
"""
Corrected User Genes Microsynteny Plot Generator (v2) - Python Implementation
This script generates microsynteny plots for user-specified genes across multiple genomes
using JCVI's plotting capabilities.
Key Features:
1. Uses directional .i1.blocks files (A>B only - query genome must be first column)
2. Determines high-confidence by checking if gene pair exists in last.filtered
3. Supports asymmetric padding and gene limiting per genome
4. Generates PNG/SVG plots with colored syntenic connections
Usage:
python plot_user_genes_microsynteny_v2.py --query <genome> --genes <gene1> [gene2] ... \\
--comparisons <genome1> [genome2] ...
Example:
python plot_user_genes_microsynteny_v2.py --query hordeum_vulgare \\
--genes HORVU.MOREX.r3.1HG0089770 HORVU.MOREX.r3.1HG0089780 \\
--comparisons oryza_sativa triticum_aestivum
Environment:
Requires JCVI (jcvi310 or jcvi312 conda environment) with matplotlib, numpy, PIL.
Assumes the correct Python environment is already activated.
Output:
Creates a timestamped directory in ../Microsynteny_plots/ containing:
- microsynteny_plot.png/svg : Final plot images
- gene_colors.txt : Color/confidence assignments for user genes
- gene_summary.csv : Detailed alignment scores for all matches
- Various intermediate .bed and .blocks files for debugging
Author: Converted from plot_user_genes_microsynteny_v2.sh
"""
import argparse
import os
import re
import subprocess
import sys
from collections import Counter, defaultdict
from datetime import datetime
from pathlib import Path
import pandas as pd
from pandas_utils import (
load_bed_file, load_blocks_file, load_last_filtered,
find_last_filtered_file, check_pair_in_last, filter_bed_by_region,
save_bed
)
from genome_config import SHORT_DISPLAY_NAMES as DISPLAY_NAMES, LATIN_DISPLAY_NAMES
# Import centralized logging and error messages
from logger import get_logger, log_session_start, log_session_end
from error_messages import ErrorMessages, WarningMessages, format_error, format_warning
# Module-level logger (named after this module via the project's logger helper)
logger = get_logger(__name__)
# =============================================================================
# CONSTANTS: Color Palette
# =============================================================================
# Default color palette for user genes; cycled with modulo indexing when the
# user supplies more genes than there are colors (see the confidence step).
COLOR_PALETTE = ["red", "blue", "green", "orange", "purple", "cyan", "magenta", "brown", "pink", "olive"]
# =============================================================================
# ARGUMENT PARSING
# =============================================================================
def parse_args():
    """
    Parse command-line arguments.

    All arguments match the original bash script exactly:
    - Required: --query, --genes, --comparisons
    - Optional: --padding, --colors, --annotations, --layout, --genome-order, etc.

    Returns:
        argparse.Namespace with all parsed options.
    """
    epilog_text = """
Available genomes:
arabidopsis_thaliana, brachypodium_distachyon, glycine_max, hordeum_vulgare,
lolium_perenne, oryza_sativa, populus_trichocarpa, solanum_lycopersicum,
sorghum_bicolor, triticum_aestivum, vitis_vinifera, and more...
Example:
%(prog)s --query hordeum_vulgare \\
--genes HORVU.MOREX.r3.1HG0089770 HORVU.MOREX.r3.1HG0089780 \\
--comparisons oryza_sativa triticum_aestivum
"""
    parser = argparse.ArgumentParser(
        description="Generate microsynteny plots for user-specified genes across multiple genomes.",
        formatter_class=argparse.RawDescriptionHelpFormatter,
        epilog=epilog_text)

    # --- Required arguments ---
    parser.add_argument("--query", required=True,
                        help="Query genome name (e.g., hordeum_vulgare)")
    parser.add_argument("--genes", nargs="+", required=True,
                        help="One or more gene IDs from the query genome")
    parser.add_argument("--comparisons", nargs="+", required=True,
                        help="One or more comparison genome names")

    # --- Basic options ---
    parser.add_argument("--padding", type=int, default=500000,
                        help="Padding around target region in bp (default: 500000)")
    parser.add_argument("--colors", default="",
                        help="Comma-separated colors for each gene (optional)")
    parser.add_argument("--annotations", default="",
                        help="User annotations for legend ('|||' separated)")
    parser.add_argument("-f", "--force", action="store_true",
                        help="Force continue even if some checks fail")

    # --- Layout options ---
    parser.add_argument("--layout", default="",
                        help="Comma-separated layout counts per row (e.g., '2,3,1')")
    parser.add_argument("--genome-order", default="",
                        help="Comma-separated ordered list of all genomes")

    # --- Query genome tweaking ---
    parser.add_argument("--query-padding-left", type=int, default=None,
                        help="Left padding for query genome (overrides --padding)")
    parser.add_argument("--query-padding-right", type=int, default=None,
                        help="Right padding for query genome (overrides --padding)")
    parser.add_argument("--query-max-genes-left", type=int, default=None,
                        help="Max genes on left side of user genes")
    parser.add_argument("--query-max-genes-right", type=int, default=None,
                        help="Max genes on right side of user genes")

    # --- Per-comparison tweaking ---
    parser.add_argument("--comp-padding-config", default="",
                        help="Per-comparison padding: 'genome:left:right|genome2:left:right'")
    parser.add_argument("--comp-max-genes-config", default="",
                        help="Per-comparison gene limits: 'genome:left:right|genome2:left:right'")
    parser.add_argument("--display-names", default="",
                        help="Custom display names: 'genome:CustomName|genome2:Name2'")

    # --- Gene labeling ---
    parser.add_argument("--genelabels", default="",
                        help="Comma-separated genes to label on plot")
    parser.add_argument("--genelabelsize", type=int, default=0,
                        help="Font size for gene labels (0 = disabled)")

    # --- Confidence options ---
    parser.add_argument("--keep-lowconf-color", action="store_true",
                        help="Treat all syntenic matches as high-confidence (keep colors)")

    # --- MCscan parameters for CSV Method row ---
    parser.add_argument("--mcscan-params", default="",
                        help="JSON string with MCscan params for Method row in CSV")

    return parser.parse_args()
# =============================================================================
# HELPER FUNCTIONS
# =============================================================================
def get_mcscan_results_dir(script_dir: Path) -> Path:
    """Return the MCscan results directory path.

    Candidate locations are probed in priority order:
    1. $PLANTMSYN_DATA_DIR/mcscan_results (if the env var is set)
    2. ../data/mcscan_results (HuggingFace Spaces structure)
    3. ../Mcscan_results/protein_pairwise (legacy local structure)

    If none exists on disk, the HF-Spaces path is returned anyway so a later
    validation step can fail with a clear error message.
    """
    candidates = []
    env_root = os.environ.get('PLANTMSYN_DATA_DIR')
    if env_root:
        candidates.append(Path(env_root) / "mcscan_results")

    hf_default = script_dir.parent / "data" / "mcscan_results"
    candidates.append(hf_default)
    candidates.append(script_dir.parent / "Mcscan_results" / "protein_pairwise")

    for candidate in candidates:
        if candidate.is_dir():
            return candidate

    # Nothing found: fall back to the HF path (caller reports the error).
    return hf_default
def get_display_name(genome: str, custom_names: dict) -> str:
    """
    Get human-readable display name for a genome.

    Priority order:
    1. User-provided custom names (--display-names)
    2. Environment variable CUSTOM_DISPLAY_NAME (for custom genomes,
       applied only when the genome matches CUSTOM_GENOME_KEY)
    3. Built-in DISPLAY_NAMES mapping
    4. Genome identifier as-is
    """
    if genome in custom_names:
        return custom_names[genome]

    override = os.environ.get("CUSTOM_DISPLAY_NAME", "")
    if override and os.environ.get("CUSTOM_GENOME_KEY", "") == genome:
        return override

    return DISPLAY_NAMES.get(genome, genome)
def get_plot_label_name(genome: str, custom_names: dict) -> str:
    """
    Get Latin/scientific name for genome labels on plots.

    Priority order:
    1. User-provided custom names (--display-names)
    2. Environment variable CUSTOM_DISPLAY_NAME (for custom genomes)
    3. Built-in LATIN_DISPLAY_NAMES mapping
    4. Auto-generated Latin name from genome_id (genus_species -> Genus species)

    NOTE(review): for 3+ parts the auto-generated name keeps underscores after
    the first word (e.g. 'zea_mays_b73' -> 'Zea mays_b73') — presumably to
    preserve strain suffixes; confirm this is the intended label format.
    """
    if genome in custom_names:
        return custom_names[genome]

    override = os.environ.get("CUSTOM_DISPLAY_NAME", "")
    if override and os.environ.get("CUSTOM_GENOME_KEY", "") == genome:
        return override

    if genome in LATIN_DISPLAY_NAMES:
        return LATIN_DISPLAY_NAMES[genome]

    # Auto-generate a Latin-style label: genus_species -> Genus species.
    genus, _, species = genome.partition('_')
    if species:
        return genus.capitalize() + ' ' + species

    return genome.replace('_', ' ').title()
def get_blocks_file(query: str, comp: str, blocks_dir: Path) -> "Path | None":
    """
    Return the directional blocks file (query genome in column 1), or None.

    JCVI blocks files are directional: A.B.i1.blocks means gene pairs where
    A's genes are in column 1 and B's genes are in column 2. We need the file
    where query genome genes are in column 1.

    Returns:
        Path to <blocks_dir>/<query>.<comp>.i1.blocks if it exists, otherwise
        None (callers test for None and emit a NO_BLOCKS_FILE warning).
    """
    # Fix: the annotation previously claimed `-> Path` although the function
    # returns None for a missing file; the string union form stays valid on
    # all Python 3 versions.
    candidate = blocks_dir / f"{query}.{comp}.i1.blocks"
    return candidate if candidate.is_file() else None
def is_high_confidence(query_gene: str, comp_gene: str, query: str, comp: str,
                       mcscan_dir: Path, last_cache: dict = None) -> bool:
    """
    Check if a gene pair is high-confidence by looking in last.filtered files.

    When *last_cache* is provided, the loaded last.filtered DataFrame for each
    (query, comp) genome pair is memoized in it, avoiding repeated file reads
    while processing multiple genes.
    """
    def _load() -> pd.DataFrame:
        # Empty DataFrame when no last.filtered file exists for this pair.
        last_file = find_last_filtered_file(mcscan_dir, query, comp)
        return load_last_filtered(last_file) if last_file else pd.DataFrame()

    if last_cache is None:
        last_df = _load()
    else:
        cache_key = (query, comp)
        if cache_key not in last_cache:
            last_cache[cache_key] = _load()
        last_df = last_cache[cache_key]

    return check_pair_in_last(last_df, query_gene, comp_gene)
def read_bed_file(path: Path) -> pd.DataFrame:
    """Thin wrapper: load *path* as a BED DataFrame via pandas_utils.load_bed_file."""
    bed_df = load_bed_file(path)
    return bed_df
def write_bed_file(data, path: Path):
    """Persist BED rows to *path* via pandas_utils.save_bed.

    Args:
        data: a pd.DataFrame, or a list of dict records (converted to a
              DataFrame before saving).
        path: destination BED file path.
    """
    frame = pd.DataFrame(data) if isinstance(data, list) else data
    save_bed(frame, path)
def parse_config_string(config_str: str) -> dict:
    """
    Parse 'genome:left:right|genome2:left:right' into {genome: (left, right)}.

    Entries with fewer than three ':'-separated fields are silently skipped;
    fields beyond the third are ignored. A repeated genome keeps the last
    entry. Non-integer left/right values raise ValueError.
    """
    parsed = {}
    if not config_str:
        return parsed
    for chunk in config_str.split("|"):
        fields = chunk.split(":")
        if len(fields) < 3:
            continue
        parsed[fields[0]] = (int(fields[1]), int(fields[2]))
    return parsed
def parse_display_names_string(config_str: str) -> dict:
    """Parse 'genome:CustomName|genome2:Name2' into {genome: name}.

    Only the first ':' in each entry splits key from value, so display names
    may themselves contain colons. Entries without a ':' are skipped.
    """
    names = {}
    if config_str:
        for entry in config_str.split("|"):
            key, sep, value = entry.partition(":")
            if sep:
                names[key] = value
    return names
def run_command(cmd: list, cwd: Path = None, check: bool = True):
    """Run *cmd* as a subprocess, capturing stdout/stderr as text.

    Args:
        cmd: argument list for subprocess.run (no shell).
        cwd: optional working directory.
        check: when True, a non-zero exit prints the command and its stderr.

    Returns:
        The subprocess.CompletedProcess, regardless of exit status.

    NOTE(review): unlike subprocess.run(check=True), a failing command is only
    printed here, never raised — callers must inspect returncode themselves.
    """
    proc = subprocess.run(cmd, cwd=cwd, capture_output=True, text=True)
    if check and proc.returncode != 0:
        print(f"Command failed: {' '.join(cmd)}")
        print(proc.stderr)
    return proc
# =============================================================================
# MAIN SCRIPT
# =============================================================================
def main():
args = parse_args()
# --- Setup paths (adapted for HuggingFace Spaces) ---
script_dir = Path(__file__).resolve().parent
mcscan_dir = get_mcscan_results_dir(script_dir)
bed_dir = mcscan_dir / "bed_files"
blocks_dir = mcscan_dir / "i1_blocks"
# Output directory - use PLANTMSYN_PROJECT_DIR if set, otherwise same as script_dir
project_dir = os.environ.get('PLANTMSYN_PROJECT_DIR')
if project_dir:
output_base = Path(project_dir) / "Microsynteny_plots"
else:
# On HF Spaces, scripts are in /app/ directly, so use script_dir
# On local, scripts are in Scripts/, so use parent
if script_dir.name == "app" or (script_dir / "data").is_dir():
output_base = script_dir / "Microsynteny_plots"
else:
output_base = script_dir.parent / "Microsynteny_plots"
# Parse configuration strings
comp_padding_cfg = parse_config_string(args.comp_padding_config)
comp_max_genes_cfg = parse_config_string(args.comp_max_genes_config)
custom_display_names = parse_display_names_string(args.display_names)
# Collect warnings for user feedback
analysis_warnings = []
# --- Validate MCscan directory ---
if not mcscan_dir.is_dir():
error_msg = format_error(ErrorMessages.MCSCAN_RESULTS_NOT_FOUND, expected_path=str(mcscan_dir))
logger.error(error_msg)
sys.exit(f"ERROR: {error_msg}")
# --- Create output directory with timestamp ---
query_display = get_display_name(args.query, custom_display_names)
comp_str = "_".join(args.comparisons)
timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
work_dir = output_base / f"{query_display}_usergenes_{comp_str}_{timestamp}"
work_dir.mkdir(parents=True, exist_ok=True)
log_session_start("plot_user_genes_microsynteny_v2")
logger.info("=" * 42)
logger.info("Corrected Microsynteny Plot Generator v2")
logger.info("=" * 42)
logger.info(f"Query genome: {query_display} ({args.query})")
logger.info(f"User genes: {' '.join(args.genes)}")
logger.info(f"Comparison genomes: {' '.join(args.comparisons)}")
logger.info(f"Padding: {args.padding} bp")
logger.info(f"Working directory: {work_dir}")
# --- Verify query BED file exists ---
query_bed_path = bed_dir / f"{args.query}.bed"
if not query_bed_path.is_file():
error_msg = format_error(ErrorMessages.BED_FILE_NOT_FOUND, genome=args.query, expected_path=str(query_bed_path))
logger.error(error_msg)
sys.exit(f"ERROR: {error_msg}")
# Write user genes to file
(work_dir / "user_genes.txt").write_text("\n".join(args.genes) + "\n")
# =========================================================================
# STEP 1: Get coordinates for user genes in query genome
# =========================================================================
logger.info("Step 1: Locating user genes in query genome...")
query_bed_df = read_bed_file(query_bed_path)
# Find user genes in query BED using pandas filtering
user_genes_df = query_bed_df[query_bed_df['gene'].isin(args.genes)]
missing_genes = set(args.genes) - set(user_genes_df['gene'])
# Report missing genes with proper warning
if missing_genes:
for gid in missing_genes:
warn_msg = format_error(ErrorMessages.GENE_NOT_FOUND, gene_id=gid, genome=args.query)
logger.warning(warn_msg)
analysis_warnings.append(warn_msg)
if user_genes_df.empty:
error_msg = format_error(ErrorMessages.GENES_NOT_FOUND_MULTIPLE,
genome=args.query,
gene_list=', '.join(args.genes),
found_count=0,
total_count=len(args.genes))
logger.error(error_msg)
sys.exit(f"ERROR: {error_msg}")
write_bed_file(user_genes_df, work_dir / "query_genes_coords.bed")
# --- CHROMOSOME MISMATCH DETECTION ---
unique_chromosomes = user_genes_df['chr'].unique().tolist()
if len(unique_chromosomes) > 1:
# Genes span multiple chromosomes - ERROR and stop execution!
error_msg = format_error(ErrorMessages.MULTIPLE_CHROMOSOMES_ERROR,
chromosomes=', '.join(unique_chromosomes))
logger.error(error_msg)
# Write error to file for webapp to detect
(work_dir / "analysis_errors.txt").write_text(f"ERROR: {error_msg}\n")
sys.exit(f"ERROR: {error_msg}")
else:
query_chr = unique_chromosomes[0]
# Calculate query region with padding
chr_genes_df = user_genes_df[user_genes_df['chr'] == query_chr]
min_pos = chr_genes_df['start'].min()
max_pos = chr_genes_df['end'].max()
left_pad = args.query_padding_left if args.query_padding_left is not None else args.padding
right_pad = args.query_padding_right if args.query_padding_right is not None else args.padding
query_start = max(0, min_pos - left_pad)
query_end = max_pos + right_pad
logger.info(f" Found {len(chr_genes_df)} genes on {query_chr}")
if left_pad == right_pad:
logger.info(f" Query region: {query_chr}:{query_start}-{query_end} (with {left_pad}bp padding)")
else:
logger.info(f" Query region: {query_chr}:{query_start}-{query_end} (left: {left_pad}bp, right: {right_pad}bp)")
# Get all genes in padded region using pandas
query_region_full_df = filter_bed_by_region(query_bed_df, query_chr, query_start, query_end)
write_bed_file(query_region_full_df, work_dir / "query.filtered.full.bed")
total_genes = len(query_region_full_df)
logger.info(f" Query window contains {total_genes} genes (before limiting)")
# --- GENE DISTANCE CHECK ---
# Check if any user gene is more than MAX_GENE_DISTANCE from its nearest neighbor
# This uses nearest-neighbor gap checking, not max-min span
MAX_GENE_DISTANCE = 20 # Maximum genes allowed between any two adjacent highlighted genes
query_region_full_df_indexed = query_region_full_df.reset_index(drop=True)
# Get gene positions with their names for error reporting
user_genes_with_indices = query_region_full_df_indexed[
query_region_full_df_indexed['gene'].isin(args.genes)
][['gene']].copy()
user_genes_with_indices['idx'] = user_genes_with_indices.index
if len(user_genes_with_indices) >= 2:
# Sort by position (index) to check consecutive gaps
sorted_genes = user_genes_with_indices.sort_values('idx').reset_index(drop=True)
# Check each consecutive pair for gaps exceeding threshold
max_gap = 0
max_gap_gene1 = None
max_gap_gene2 = None
for i in range(1, len(sorted_genes)):
prev_gene = sorted_genes.iloc[i - 1]
curr_gene = sorted_genes.iloc[i]
gap = curr_gene['idx'] - prev_gene['idx']
if gap > max_gap:
max_gap = gap
max_gap_gene1 = prev_gene['gene']
max_gap_gene2 = curr_gene['gene']
if max_gap > MAX_GENE_DISTANCE:
error_msg = format_error(ErrorMessages.GENE_GAP_TOO_LARGE,
gene1=max_gap_gene1,
gene2=max_gap_gene2,
distance=max_gap,
max_distance=MAX_GENE_DISTANCE)
logger.error(error_msg)
# Write error to file for webapp to detect
(work_dir / "analysis_errors.txt").write_text(f"ERROR: Gap check failed - {error_msg}\n")
sys.exit(f"ERROR: Gap check failed - {error_msg}")
elif max_gap > MAX_GENE_DISTANCE // 2:
# Warn if distance is getting close to limit
warn_msg = format_warning(WarningMessages.LARGE_REGION, n_genes=max_gap)
logger.warning(warn_msg)
analysis_warnings.append(warn_msg)
# --- Gene limiting logic ---
max_left = args.query_max_genes_left if args.query_max_genes_left is not None else 15
max_right = args.query_max_genes_right if args.query_max_genes_right is not None else 15
user_explicit_limits = args.query_max_genes_left is not None or args.query_max_genes_right is not None
max_total = max_left + max_right
# Apply limiting if user requested it OR region exceeds default
should_limit = user_explicit_limits or total_genes > (max_total + len(args.genes))
if should_limit:
if user_explicit_limits:
logger.info(f" User-specified gene limits: {max_left} left + {max_right} right flanking genes")
else:
logger.info(f" Region exceeds default limits, applying: {max_total} flanking genes (plus {len(args.genes)} user genes)...")
# Reset index for positional access
query_region_full_df = query_region_full_df.reset_index(drop=True)
# Find positions of user genes in the region
user_mask = query_region_full_df['gene'].isin(args.genes)
user_idxs = query_region_full_df[user_mask].index.tolist()
if user_idxs:
min_idx, max_idx = min(user_idxs), max(user_idxs)
start_idx = max(0, min_idx - max_left)
end_idx = min(len(query_region_full_df) - 1, max_idx + max_right)
query_region_df = query_region_full_df.iloc[start_idx:end_idx + 1].copy()
# Recalculate window bounds
new_start = query_region_df['start'].min()
new_end = query_region_df['end'].max()
buffer = max(10000, (new_end - new_start) // 20)
query_start = max(0, new_start - buffer)
query_end = new_end + buffer
logger.info(f" Final: {len(query_region_df)} genes in {query_end - query_start}bp window")
logger.info(f" Recalculated query window: {query_chr}:{query_start}-{query_end}")
else:
logger.error(" None of the user genes found in the padded region!")
query_region_df = query_region_full_df
else:
logger.info(f" No gene limiting applied (region has {total_genes} genes)")
query_region_df = query_region_full_df
write_bed_file(query_region_df, work_dir / "query.filtered.bed")
query_window_genes = query_region_df['gene'].tolist()
(work_dir / "query_window_genes.txt").write_text("\n".join(query_window_genes) + "\n")
logger.info(f" Final query window contains {len(query_window_genes)} genes")
print()
# =========================================================================
# STEP 2: Find syntenic regions in comparison genomes
# =========================================================================
logger.info("Step 2: Finding syntenic regions in comparison genomes...")
comp_regions = {} # {genome: (chr, start, end)}
for comp in args.comparisons:
comp_display = get_display_name(comp, custom_display_names)
logger.info(f" Processing {comp_display}...")
blocks_file = get_blocks_file(args.query, comp, blocks_dir)
if not blocks_file:
warn_msg = format_error(ErrorMessages.NO_BLOCKS_FILE,
query_genome=args.query,
comparison_genome=comp,
expected_path=f"{args.query}.{comp}.i1.blocks")
logger.warning(warn_msg)
analysis_warnings.append(warn_msg)
continue
comp_bed_path = bed_dir / f"{comp}.bed"
if not comp_bed_path.is_file():
warn_msg = format_error(ErrorMessages.BED_FILE_NOT_FOUND, genome=comp, expected_path=str(comp_bed_path))
logger.warning(warn_msg)
analysis_warnings.append(warn_msg)
continue
comp_bed_df = read_bed_file(comp_bed_path)
comp_bed = comp_bed_df.to_dict('records') # Convert DataFrame to list of dicts
comp_gene_lookup = {r["gene"]: r for r in comp_bed}
# Read blocks file (query_gene -> comp_gene)
blocks = {}
with open(blocks_file) as f:
for line in f:
parts = line.strip().split("\t")
if len(parts) >= 2 and parts[1] != ".":
blocks[parts[0]] = parts[1]
# Find matches for query window genes
matches = [(qg, blocks[qg]) for qg in query_window_genes if qg in blocks]
(work_dir / f"{comp}_matches.blocks").write_text("\n".join(f"{q}\t{c}" for q, c in matches) + "\n")
logger.info(f" Found {len(matches)} syntenic matches")
if not matches:
warn_msg = format_error(ErrorMessages.NO_SYNTENIC_MATCHES_SIMPLE,
query_genome=args.query,
comparison_genome=comp)
logger.warning(warn_msg)
analysis_warnings.append(warn_msg)
continue
# Get coordinates for orthologs
ortholog_coords = [comp_gene_lookup[orth] for _, orth in matches if orth in comp_gene_lookup]
write_bed_file(ortholog_coords, work_dir / f"{comp}_ortholog_coords.bed")
# Prioritize chromosome with most USER GENE orthologs
user_orths = [(g, blocks.get(g)) for g in args.genes if blocks.get(g) in comp_gene_lookup]
user_orth_coords = [comp_gene_lookup[o] for _, o in user_orths if o]
if user_orth_coords:
chr_counts = Counter(r["chr"] for r in user_orth_coords)
# --- CHECK: User gene orthologs must be on a single chromosome ---
if len(chr_counts) > 1:
# User gene orthologs are scattered across multiple chromosomes
# Build detailed breakdown for error message
gene_to_chr = {}
for gene, orth in user_orths:
if orth in comp_gene_lookup:
orth_chr = comp_gene_lookup[orth]["chr"]
if gene not in gene_to_chr:
gene_to_chr[gene] = []
gene_to_chr[gene].append(f"{orth} on {orth_chr}")
details_lines = []
for gene in args.genes:
if gene in gene_to_chr:
details_lines.append(f" • {gene}: {', '.join(gene_to_chr[gene])}")
else:
details_lines.append(f" • {gene}: no ortholog found")
error_msg = format_error(
ErrorMessages.ORTHOLOG_CHROMOSOMES_MISMATCH,
comparison_genome=get_display_name(comp, custom_display_names),
n_chromosomes=len(chr_counts),
chromosomes=', '.join(sorted(chr_counts.keys())),
details='\n'.join(details_lines)
)
logger.error(error_msg)
(work_dir / "analysis_errors.txt").write_text(f"ERROR: {error_msg}\n")
sys.exit(f"ERROR: {error_msg}")
comp_chr = chr_counts.most_common(1)[0][0]
user_on_chr = chr_counts.most_common(1)[0][1]
logger.info(f" Chromosome selection: {comp_chr} ({user_on_chr} user gene orthologs - prioritized)")
else:
chr_counts = Counter(r["chr"] for r in ortholog_coords)
comp_chr = chr_counts.most_common(1)[0][0]
logger.info(f" Chromosome selection: {comp_chr} (no user gene orthologs, using most total matches)")
# Get region bounds on best chromosome
chr_orths = [r for r in ortholog_coords if r["chr"] == comp_chr]
if not chr_orths:
warn_msg = f"Could not determine comparison chromosome for {comp}"
logger.warning(warn_msg)
analysis_warnings.append(warn_msg)
continue
# Get user gene ortholog range (window MUST include all of them)
user_chr_orths = [r for r in user_orth_coords if r["chr"] == comp_chr]
if user_chr_orths:
orth_start = min(r["start"] for r in user_chr_orths)
orth_end = max(r["end"] for r in user_chr_orths)
else:
orth_start = min(r["start"] for r in chr_orths)
orth_end = max(r["end"] for r in chr_orths)
# Get padding for this comparison
comp_left_pad, comp_right_pad = comp_padding_cfg.get(comp, (1500000, 1500000))
comp_start = max(0, orth_start - comp_left_pad)
comp_end = orth_end + comp_right_pad
if user_chr_orths:
logger.info(f" User gene ortholog range: {comp_chr}:{orth_start}-{orth_end}")
if comp_left_pad == comp_right_pad:
logger.info(f" With ±{comp_left_pad}bp padding: {comp_chr}:{comp_start}-{comp_end}")
else:
print(f" With left:{comp_left_pad}bp, right:{comp_right_pad}bp padding: {comp_chr}:{comp_start}-{comp_end}")
comp_regions[comp] = (comp_chr, comp_start, comp_end)
print(f" Region: {comp_chr}:{comp_start}-{comp_end}")
# Filter comparison BED to this region
comp_region_full = [r for r in comp_bed if r["chr"] == comp_chr and r["end"] >= comp_start and r["start"] <= comp_end]
comp_region_full.sort(key=lambda x: x["start"])
write_bed_file(comp_region_full, work_dir / f"{comp}.filtered.full.bed")
# --- Per-comparison gene limiting ---
if comp in comp_max_genes_cfg:
c_max_left, c_max_right = comp_max_genes_cfg[comp]
print(f" User-specified gene limits: {c_max_left} left + {c_max_right} right genes around user orthologs")
print(f" Total genes in region: {len(comp_region_full)}")
gene_to_idx = {r["gene"]: i for i, r in enumerate(comp_region_full)}
user_orth_idxs = [gene_to_idx[o] for _, o in user_orths if o in gene_to_idx]
if user_orth_idxs:
min_idx, max_idx = min(user_orth_idxs), max(user_orth_idxs)
start_idx = max(0, min_idx - c_max_left)
end_idx = min(len(comp_region_full) - 1, max_idx + c_max_right)
comp_region = comp_region_full[start_idx:end_idx + 1]
# Recalculate window
new_start = comp_region[0]["start"]
new_end = comp_region[-1]["end"]
buffer = max(10000, (new_end - new_start) // 20)
comp_start = max(0, new_start - buffer)
comp_end = new_end + buffer
comp_regions[comp] = (comp_chr, comp_start, comp_end)
print(f" Final: {len(comp_region)} genes in {comp_end - comp_start}bp window")
print(f" Recalculated window: {comp_chr}:{comp_start}-{comp_end}")
else:
print(" WARNING: No user gene orthologs found in region, keeping full region")
comp_region = comp_region_full
else:
comp_region = comp_region_full
write_bed_file(comp_region, work_dir / f"{comp}.filtered.bed")
comp_window_genes = [r["gene"] for r in comp_region]
(work_dir / f"{comp}_window_genes.txt").write_text("\n".join(comp_window_genes) + "\n")
print(f" Window contains {len(comp_window_genes)} genes")
print()
# =========================================================================
# STEP 3: Create filtered blocks for window-to-window connections
# =========================================================================
print("Step 3: Creating filtered blocks for plotting...")
for comp in args.comparisons:
if comp not in comp_regions:
comp_display = get_display_name(comp, custom_display_names)
print(f" {comp_display}: skipped (no region found)")
continue
blocks_file = get_blocks_file(args.query, comp, blocks_dir)
comp_window_genes = set((work_dir / f"{comp}_window_genes.txt").read_text().strip().split("\n"))
# Filter blocks to genes in both windows
filtered_blocks = []
with open(blocks_file) as f:
for line in f:
parts = line.strip().split("\t")
if len(parts) >= 2 and parts[0] in query_window_genes and parts[1] in comp_window_genes:
filtered_blocks.append(f"{parts[0]}\t{parts[1]}")
(work_dir / f"{comp}_window.blocks").write_text("\n".join(filtered_blocks) + "\n")
comp_display = get_display_name(comp, custom_display_names)
print(f" {comp_display}: {len(filtered_blocks)} connections")
print()
# =========================================================================
# STEP 4: Determine high-confidence status for user genes
# =========================================================================
print("Step 4: Determining confidence levels for user genes...")
# Parse user colors and annotations
color_list = [c.strip() for c in args.colors.split(",") if c.strip()] if args.colors else []
ann_list = args.annotations.split("|||") if args.annotations else []
gene_colors = {}
gene_confidence = {}
gene_annotations = {}
high_conf_pairs = set() # (gene, comp_genome) pairs that are high-confidence
with open(work_dir / "gene_colors.txt", "w") as f:
f.write("# Gene_ID\tColor\tConfidence\tAnnotation\tMatches\n")
for i, gene in enumerate(args.genes):
color = color_list[i] if i < len(color_list) else COLOR_PALETTE[i % len(COLOR_PALETTE)]
annotation = ann_list[i].strip() if i < len(ann_list) else ""
gene_annotations[gene] = annotation
has_high_conf = False
high_conf_genomes = []
low_conf_genomes = []
for comp in args.comparisons:
blocks_file = get_blocks_file(args.query, comp, blocks_dir)
if not blocks_file:
continue
ortholog = None
with open(blocks_file) as bf:
for line in bf:
parts = line.strip().split("\t")
if parts[0] == gene and len(parts) >= 2 and parts[1] != ".":
ortholog = parts[1]
break
if ortholog:
comp_display = get_display_name(comp, custom_display_names)
if args.keep_lowconf_color:
has_high_conf = True
high_conf_genomes.append(f"{comp_display}:{ortholog}")
high_conf_pairs.add((gene, comp))
elif is_high_confidence(gene, ortholog, args.query, comp, mcscan_dir):
has_high_conf = True
high_conf_genomes.append(f"{comp_display}:{ortholog}")
high_conf_pairs.add((gene, comp))
else:
low_conf_genomes.append(f"{comp_display}:{ortholog}(low)")
if has_high_conf:
gene_colors[gene] = color
gene_confidence[gene] = "HIGH"
msg = f" {gene} -> {color} (HIGH confidence)"
if annotation:
msg += f" [{annotation}]"
print(msg)
matches = ";".join(high_conf_genomes + low_conf_genomes)
f.write(f"{gene}\t{color}\tHIGH\t{annotation}\t{matches}\n")
else:
gene_colors[gene] = "grey"
gene_confidence[gene] = "LOW"
print(f" {gene} -> grey (LOW confidence only)")
f.write(f"{gene}\tgrey\tLOW\t{annotation}\t{';'.join(low_conf_genomes)}\n")
print()
# =========================================================================
# STEP 5: Merge BED files and create multi-column blocks
# =========================================================================
# Prepares the two inputs jcvi.graphics.synteny needs: one merged BED of all
# tracks (with per-track chromosome prefixes) and one multi-column blocks file.
print("Step 5: Preparing files for JCVI plotting...")
# Build list of valid comparison genomes
# A comparison genome is usable only if its filtered BED exists AND is non-empty.
valid_comps = [g for g in args.comparisons if (work_dir / f"{g}.filtered.bed").is_file() and (work_dir / f"{g}.filtered.bed").stat().st_size > 0]
if not valid_comps:
    sys.exit("ERROR: No valid comparison genomes with syntenic matches found.")
# Determine genome order
if args.genome_order:
    print(f" Using custom genome order: {args.genome_order}")
    order_list = [g.strip() for g in args.genome_order.split(",")]
    ordered_genomes = []
    # Keep only names that are the query or a valid comparison; unknown
    # entries in the user-supplied order are silently dropped.
    for g in order_list:
        if g == args.query or g in valid_comps:
            ordered_genomes.append(g)
else:
    # Default order: query on top, then comparisons in the order given.
    ordered_genomes = [args.query] + valid_comps
print(f" BED file order: {' '.join(ordered_genomes)}")
# Find query position
query_pos = ordered_genomes.index(args.query) if args.query in ordered_genomes else 0
if query_pos == 0:
    print(" Query genome position: TOP (row 1)")
elif query_pos == len(ordered_genomes) - 1:
    print(f" Query genome position: BOTTOM (row {len(ordered_genomes)})")
else:
    print(f" Query genome position: MIDDLE (row {query_pos + 1})")
# Add track order prefix to BED files (t00_, t01_, etc.)
# Prefixing chromosome names makes every track's chromosomes unique in the
# merged BED so JCVI draws them as separate rows in the requested order.
ordered_beds_dir = work_dir / "ordered_beds"
ordered_beds_dir.mkdir(exist_ok=True)
prefixed_beds = []
for i, genome in enumerate(ordered_genomes):
    prefix = f"t{i:02d}"
    if genome == args.query:
        src = work_dir / "query.filtered.bed"
    else:
        src = work_dir / f"{genome}.filtered.bed"
    dst = ordered_beds_dir / f"{prefix}_{genome}.bed"
    with open(src) as inf, open(dst, "w") as outf:
        for line in inf:
            parts = line.strip().split("\t")
            # Column 0 is the chromosome name; tag it with the track prefix.
            parts[0] = f"{prefix}_{parts[0]}"
            outf.write("\t".join(parts) + "\n")
    prefixed_beds.append(str(dst))
# Merge BED files using JCVI
run_command([sys.executable, "-m", "jcvi.formats.bed", "merge"] + prefixed_beds + ["-o", str(work_dir / "all_genomes.bed")])
# Merge blocks files using simple_merge_blocks.py
merge_script = script_dir / "simple_merge_blocks.py"
# NOTE(review): blocks are collected per args.comparisons (existence of the
# *_window.blocks file), which is a different filter than valid_comps above
# (filtered.bed existence) — presumably the two always coincide; verify.
blocks_args = [str(work_dir / f"{comp}_window.blocks") for comp in args.comparisons if (work_dir / f"{comp}_window.blocks").is_file()]
if not merge_script.is_file():
    sys.exit(f"ERROR: Required script not found: {merge_script}")
if blocks_args:
    run_command([sys.executable, str(merge_script), str(work_dir / "all.blocks")] + blocks_args)
else:
    sys.exit("ERROR: No blocks files found to merge.")
# --- Apply colors and filter low-confidence orthologs ---
# Rewrite all.blocks so that user genes carry a JCVI "color*gene" prefix in
# column 1; for HIGH-confidence genes (unless --keep-lowconf-color) blank out
# ortholog columns whose (gene, genome) pair was not in high_conf_pairs.
colored_lines = []
with open(work_dir / "all.blocks") as f:
    for line in f:
        parts = line.strip().split("\t")
        # Column 0 of each blocks row is the query-genome gene.
        query_gene = parts[0]
        if query_gene in gene_colors and gene_confidence.get(query_gene) == "HIGH":
            color = gene_colors[query_gene]
            if args.keep_lowconf_color:
                # Keep every ortholog column; only add the color prefix.
                rest = "\t".join(parts[1:])
                colored_lines.append(f"{color}*{query_gene}\t{rest}")
            else:
                # Drop (replace with ".") orthologs in genomes where this
                # gene's match is not high-confidence.
                new_cols = [f"{color}*{query_gene}"]
                # NOTE(review): column j+1 is assumed to correspond to
                # args.comparisons[j] — this only holds if a *_window.blocks
                # file existed for every comparison when all.blocks was
                # merged; otherwise columns shift. Confirm upstream.
                for j, comp in enumerate(args.comparisons):
                    orth = parts[j + 1] if j + 1 < len(parts) else "."
                    if (query_gene, comp) in high_conf_pairs:
                        new_cols.append(orth)
                    else:
                        new_cols.append(".")
                colored_lines.append("\t".join(new_cols))
        elif query_gene in gene_colors:
            # LOW-confidence user gene: color (grey) but keep all columns.
            color = gene_colors[query_gene]
            rest = "\t".join(parts[1:])
            colored_lines.append(f"{color}*{query_gene}\t{rest}")
        else:
            # Background (non-user) gene: pass through unchanged.
            colored_lines.append(line.strip())
(work_dir / "all_colored.blocks").write_text("\n".join(colored_lines) + "\n")
# =========================================================================
# STEP 5b: Generate CSV summary
# =========================================================================
# Write the gene/comparison manifests and (optionally) the user annotations
# file, then delegate CSV creation to generate_gene_summary_csv.py.
print("Step 5b: Generating gene summary CSV...")
(work_dir / "gene_list.txt").write_text("\n".join(args.genes) + "\n")
(work_dir / "comparisons_list.txt").write_text("\n".join(args.comparisons) + "\n")
annotations_file = ""
if args.annotations:
    annotations_file = str(work_dir / "user_annotations.txt")
    with open(annotations_file, "w") as f:
        for gene in args.genes:
            ann = gene_annotations.get(gene, "")
            # "|||" is the field separator expected by the CSV generator.
            f.write(f"{gene}|||{ann}\n")
# Use Python version of CSV generator
csv_generator = script_dir / "generate_gene_summary_csv.py"
if csv_generator.is_file():
    # FIX: invoke the generator with sys.executable (not a bare "python") so
    # it runs in the same interpreter/conda environment as this script,
    # consistent with every other subprocess call in this file.
    csv_cmd = [sys.executable, str(csv_generator), str(work_dir), args.query, str(work_dir / "gene_list.txt"), str(work_dir / "comparisons_list.txt")]
    if annotations_file:
        csv_cmd.append(annotations_file)
    # Pass MCscan parameters for the Method explanation row
    # Use incoming params if provided (from Custom pages), else use defaults (Microsynteny page)
    if args.mcscan_params:
        mcscan_params_json = args.mcscan_params
    else:
        # Default parameters for database comparisons
        mcscan_params_json = '{"cscore": "0.99", "max_dist": 20, "min_size": 4, "tandem_max": 10}'
    csv_cmd.extend(['--mcscan-params', mcscan_params_json])
    # Pass query genome display name for proper labeling in Method text (especially for custom genomes)
    csv_cmd.extend(['--query-display-name', query_display])
    # check=False: a failed CSV is non-fatal; the plot is still produced.
    run_command(csv_cmd, check=False)
    print(" Created: gene_summary.csv")
else:
    print(" Warning: CSV generator not found")
# --- Reorder blocks columns to match genome order ---
# The merged blocks file has columns [query, comp1, comp2, ...] in
# args.comparisons order; rearrange them into the user's track order.
print(f" Reordering blocks columns to match genome order: {' '.join(ordered_genomes)}")
genome_to_col = {args.query: 0}
# NOTE(review): assumes column j+1 of the merged blocks corresponds to
# args.comparisons[j]; this holds only if every comparison contributed a
# *_window.blocks file during the merge — confirm upstream.
for j, comp in enumerate(args.comparisons):
    if comp in valid_comps:
        genome_to_col[comp] = j + 1
reordered_lines = []
for line in colored_lines:
    parts = line.strip().split("\t")
    new_parts = []
    for g in ordered_genomes:
        # Unknown genomes fall back to column 0 (the query gene).
        col_idx = genome_to_col.get(g, 0)
        new_parts.append(parts[col_idx] if col_idx < len(parts) else ".")
    reordered_lines.append("\t".join(new_parts))
# Move color prefix to column 1
# After reordering, the "color*gene" tag may no longer be in column 0
# (query may not be the top track); strip it wherever it is and re-apply
# it to the first column, which is what JCVI reads the color from.
final_lines = []
for line in reordered_lines:
    parts = line.split("\t")
    color_found = None
    new_parts = []
    for p in parts:
        m = re.match(r"^([a-zA-Z]+)\*(.+)$", p)
        if m:
            color_found = m.group(1)
            new_parts.append(m.group(2))
        else:
            new_parts.append(p)
    if color_found:
        new_parts[0] = f"{color_found}*{new_parts[0]}"
    final_lines.append("\t".join(new_parts))
# Sort: non-colored first, grey second, colors last (drawn on top)
non_colored = [l for l in final_lines if "*" not in l]
grey_lines = [l for l in final_lines if l.startswith("grey*")]
color_lines = [l for l in final_lines if "*" in l and not l.startswith("grey*")]
(work_dir / "all_colored.blocks").write_text("\n".join(non_colored + grey_lines + color_lines) + "\n")
print(" Created all_genomes.bed and all_colored.blocks (reordered with colors fixed)")
print()
# =========================================================================
# STEP 6: Create layout file
# =========================================================================
# Emit cluster.layout for jcvi.graphics.synteny: one "x, y, rotation, ha,
# va, color, ratio, label" row per track, then "# edges" lines connecting
# the query track to every comparison track. Three layouts are supported:
# custom grid + custom order, custom grid with query pinned on top, and a
# default single column.
print("Step 6: Creating layout file...")
num_comp = len(valid_comps)
if num_comp == 0:
    sys.exit("ERROR: No valid comparison genomes for layout.")
layout_lines = ["# x, y, rotation, ha, va, color, ratio, label"]
if args.layout and args.genome_order:
    # Branch 1: user controls both the grid shape (--layout "2,3,2" style
    # row counts) and the track order; the query can sit in any row.
    print(f" Using custom layout: {args.layout} with genome order: {args.genome_order}")
    layout_rows = [int(x.strip()) for x in args.layout.split(",")]
    num_rows = len(layout_rows)
    print(f" Track order (from BED files): {' '.join(ordered_genomes)}")
    print(f" Total valid genomes: {len(ordered_genomes)}, Layout expects: {sum(layout_rows)}")
    # Vertical span 0.85 -> 0.15, evenly divided between rows.
    row_spacing = 0.70 / (num_rows - 1) if num_rows > 1 else 0.0
    start_y = 0.85 if num_rows > 1 else 0.70
    genome_idx = 0
    query_track_idx = -1
    track_genomes = []
    for row_idx, genomes_in_row in enumerate(layout_rows):
        y_pos = start_y - row_idx * row_spacing
        # Label anchoring: top row labels above, bottom row below,
        # middle rows to the left of the track.
        ha, va = "center", "top" if row_idx == 0 else ("bottom" if row_idx == num_rows - 1 else "center")
        if row_idx not in (0, num_rows - 1):
            ha = "left"
        for col in range(genomes_in_row):
            if genome_idx < len(ordered_genomes):
                current_genome = ordered_genomes[genome_idx]
                # Horizontal placement: hand-tuned positions for 1-3 tracks
                # per row, evenly spread across 0.10-0.90 otherwise.
                if genomes_in_row == 1:
                    x_pos = 0.5
                elif genomes_in_row == 2:
                    x_pos = 0.25 + col * 0.50
                elif genomes_in_row == 3:
                    x_pos = 0.17 + col * 0.33
                else:
                    x_pos = 0.10 + col * 0.80 / (genomes_in_row - 1)
                # Track width shrinks as more genomes share a row.
                ratio = {1: "1", 2: "0.40", 3: "0.28"}.get(genomes_in_row, "0.20")
                display = get_plot_label_name(current_genome, custom_display_names)
                # Add chromosome info to label only if no custom display name was provided
                if current_genome == args.query:
                    if args.query not in custom_display_names:
                        display = f"{display} ({query_chr})"
                elif current_genome in comp_regions:
                    if current_genome not in custom_display_names:
                        comp_chr_label = comp_regions[current_genome][0]
                        display = f"{display} ({comp_chr_label})"
                layout_lines.append(f"{x_pos:.3f}, {y_pos:.3f}, 0, {ha}, {va}, , {ratio}, {display}, 15")
                track_genomes.append(current_genome)
                if current_genome == args.query:
                    query_track_idx = genome_idx
                genome_idx += 1
    # Connect the query track to every other track.
    layout_lines.append("# edges")
    for i, g in enumerate(track_genomes):
        if i != query_track_idx:
            layout_lines.append(f"e, {query_track_idx}, {i}")
elif args.layout:
    # Branch 2: custom grid for the comparisons, query fixed as track 0
    # at the top (y = 0.90).
    print(f" Using custom layout: {args.layout} (query genome at top)")
    layout_rows = [int(x.strip()) for x in args.layout.split(",")]
    num_rows = len(layout_rows)
    total_rows = num_rows + 1
    query_display = get_plot_label_name(args.query, custom_display_names)
    # Only add chromosome info if no custom display name was provided
    if args.query not in custom_display_names:
        query_display = f"{query_display} ({query_chr})"
    # NOTE(review): total_rows is always >= 2 here (num_rows >= 1), so the
    # "left/center" branch looks unreachable — kept as-is.
    q_ha, q_va = ("left", "center") if total_rows == 1 else ("center", "top")
    layout_lines.append(f"0.5, 0.90, 0, {q_ha}, {q_va}, , 1, {query_display}, 15")
    row_spacing = 0.60 / (num_rows - 1) if num_rows > 1 else 0.0
    start_y = 0.70 if num_rows > 1 else 0.50
    genome_idx = 0
    for row_idx, genomes_in_row in enumerate(layout_rows):
        y_pos = start_y - row_idx * row_spacing
        ha, va = ("center", "bottom") if row_idx == num_rows - 1 else ("left", "center")
        for col in range(genomes_in_row):
            if genome_idx < num_comp:
                comp = valid_comps[genome_idx]
                comp_display = get_plot_label_name(comp, custom_display_names)
                # Add chromosome info only if no custom display name was provided
                if comp not in custom_display_names and comp in comp_regions:
                    comp_chr_label = comp_regions[comp][0]
                    comp_display = f"{comp_display} ({comp_chr_label})"
                x_pos = 0.5 if genomes_in_row == 1 else 0.15 + col * 0.70 / (genomes_in_row - 1)
                layout_lines.append(f"{x_pos:.3f}, {y_pos:.3f}, 0, {ha}, {va}, , 1, {comp_display}, 15")
                genome_idx += 1
    # Query is track 0; connect it to comparison tracks 1..num_comp.
    layout_lines.append("# edges")
    for i in range(1, num_comp + 1):
        layout_lines.append(f"e, 0, {i}")
else:
    # Branch 3 (default): query on top, comparisons stacked in one column.
    print(" Using default single-column layout")
    query_display = get_plot_label_name(args.query, custom_display_names)
    # Only add chromosome info if no custom display name was provided
    if args.query not in custom_display_names:
        query_display = f"{query_display} ({query_chr})"
    total_rows = num_comp + 1
    q_ha, q_va = ("left", "center") if total_rows == 1 else ("center", "top")
    layout_lines.append(f"0.5, 0.90, 0, {q_ha}, {q_va}, , 1, {query_display}, 15")
    # Hand-tuned vertical spacing for 1, 2, or N comparison tracks.
    if num_comp == 1:
        spacing, start_y = 0.0, 0.50
    elif num_comp == 2:
        spacing, start_y = 0.20, 0.60
    else:
        spacing, start_y = 0.65 / (num_comp - 1), 0.70
    for i, comp in enumerate(valid_comps):
        comp_display = get_plot_label_name(comp, custom_display_names)
        # Add chromosome info only if no custom display name was provided
        if comp not in custom_display_names and comp in comp_regions:
            comp_chr_label = comp_regions[comp][0]
            comp_display = f"{comp_display} ({comp_chr_label})"
        y_pos = start_y - i * spacing
        ha, va = ("center", "bottom") if i == num_comp - 1 else ("left", "center")
        layout_lines.append(f"0.5, {y_pos:.3f}, 0, {ha}, {va}, , 1, {comp_display}, 15")
    layout_lines.append("# edges")
    for i in range(1, num_comp + 1):
        layout_lines.append(f"e, 0, {i}")
(work_dir / "cluster.layout").write_text("\n".join(layout_lines) + "\n")
print(f" Layout file created with {num_comp} comparison genomes")
print()
# =========================================================================
# STEP 7: Generate plots
# =========================================================================
# Size the figure from the layout shape, then invoke jcvi.graphics.synteny
# once per output format and rename its output to the final plot name.
print("Step 7: Generating plots...")
# Calculate figure size
fig_width, fig_height = 14, 10
if args.layout:
    layout_rows = [int(x.strip()) for x in args.layout.split(",")]
    max_in_row = max(layout_rows)
    num_rows = len(layout_rows)
    # Width grows with the widest row, height with the number of rows.
    fig_width = {1: 14, 2: 20, 3: 24}.get(max_in_row, 28 if max_in_row >= 4 else 14)
    fig_height = {2: 12, 3: 14, 4: 16}.get(num_rows, 18 if num_rows >= 5 else 10)
    print(f" Figure size: {fig_width}x{fig_height} (max {max_in_row} genomes per row, {num_rows} rows)")
else:
    # Single-column layout: only the height needs to scale with track count.
    total_genomes = len(ordered_genomes)
    if total_genomes >= 6:
        fig_height = 16
    elif total_genomes >= 4:
        fig_height = 14
# Build gene label args
label_args = []
if args.genelabels and args.genelabelsize > 0:
    label_args = [f"--genelabels={args.genelabels}", f"--genelabelsize={args.genelabelsize}"]
    print(f" Gene labels enabled: {args.genelabels} (size: {args.genelabelsize})")
# Generate PNG and SVG with identical options; only the format differs.
# (Deduplicated: the two invocations previously repeated the whole command.)
for fmt in ("png", "svg"):
    cmd = [
        sys.executable, "-m", "jcvi.graphics.synteny",
        "all_colored.blocks", "all_genomes.bed", "cluster.layout",
        "--shadestyle=curve", f"--format={fmt}",
        f"--figsize={fig_width}x{fig_height}", "--dpi=150"
    ] + label_args
    # check=False: a plotting failure for one format should not abort the run.
    run_command(cmd, cwd=work_dir, check=False)
    plot_out = work_dir / f"all_colored.{fmt}"
    if plot_out.is_file():
        plot_out.rename(work_dir / f"microsynteny_plot.{fmt}")
        print(f" Created: microsynteny_plot.{fmt}")
# =========================================================================
# STEP 7b: Add legend to plots
# =========================================================================
legend_script = script_dir / "add_legend_to_plot.py"
if legend_script.is_file():
    print("Step 7b: Adding legend to plots...")
    run_command([sys.executable, str(legend_script), str(work_dir)], check=False)
print()
print("=" * 42)
print("✓ Complete!")
print("=" * 42)
print()
print("Output files:")
print(f" - {work_dir}/microsynteny_plot.png")
print(f" - {work_dir}/microsynteny_plot.svg")
print(f" - {work_dir}/gene_colors.txt")
print(f" - {work_dir}/gene_summary.csv")
print()
# Standard script entry point: run the pipeline only when executed directly,
# not when imported as a module.
if __name__ == "__main__":
    main()