Spaces:
Sleeping
Sleeping
| #!/usr/bin/env python3 | |
| """ | |
| Corrected User Genes Microsynteny Plot Generator (v2) - Python Implementation | |
| This script generates microsynteny plots for user-specified genes across multiple genomes | |
| using JCVI's plotting capabilities. | |
| Key Features: | |
| 1. Uses directional .i1.blocks files (A>B only - query genome must be first column) | |
| 2. Determines high-confidence by checking if gene pair exists in last.filtered | |
| 3. Supports asymmetric padding and gene limiting per genome | |
| 4. Generates PNG/SVG plots with colored syntenic connections | |
| Usage: | |
| python plot_user_genes_microsynteny_v2.py --query <genome> --genes <gene1> [gene2] ... \\ | |
| --comparisons <genome1> [genome2] ... | |
| Example: | |
| python plot_user_genes_microsynteny_v2.py --query hordeum_vulgare \\ | |
| --genes HORVU.MOREX.r3.1HG0089770 HORVU.MOREX.r3.1HG0089780 \\ | |
| --comparisons oryza_sativa triticum_aestivum | |
| Environment: | |
| Requires JCVI (jcvi310 or jcvi312 conda environment) with matplotlib, numpy, PIL. | |
| Assumes the correct Python environment is already activated. | |
| Output: | |
| Creates a timestamped directory in ../Microsynteny_plots/ containing: | |
| - microsynteny_plot.png/svg : Final plot images | |
| - gene_colors.txt : Color/confidence assignments for user genes | |
| - gene_summary.csv : Detailed alignment scores for all matches | |
| - Various intermediate .bed and .blocks files for debugging | |
| Author: Converted from plot_user_genes_microsynteny_v2.sh | |
| """ | |
| import argparse | |
| import os | |
| import re | |
| import subprocess | |
| import sys | |
| from collections import Counter, defaultdict | |
| from datetime import datetime | |
| from pathlib import Path | |
| import pandas as pd | |
| from pandas_utils import ( | |
| load_bed_file, load_blocks_file, load_last_filtered, | |
| find_last_filtered_file, check_pair_in_last, filter_bed_by_region, | |
| save_bed | |
| ) | |
| from genome_config import SHORT_DISPLAY_NAMES as DISPLAY_NAMES, LATIN_DISPLAY_NAMES | |
| # Import centralized logging and error messages | |
| from logger import get_logger, log_session_start, log_session_end | |
| from error_messages import ErrorMessages, WarningMessages, format_error, format_warning | |
| # Module logger | |
| logger = get_logger(__name__) | |
# =============================================================================
# CONSTANTS: Color Palette
# =============================================================================
# Default color palette for user genes (cycles if more genes than colors).
# Colors are assigned round-robin per user gene: index i % len(COLOR_PALETTE).
COLOR_PALETTE = ["red", "blue", "green", "orange", "purple", "cyan", "magenta", "brown", "pink", "olive"]
| # ============================================================================= | |
| # ARGUMENT PARSING | |
| # ============================================================================= | |
def parse_args():
    """
    Parse command-line arguments.

    All arguments match the original bash script exactly:
    - Required: --query, --genes, --comparisons
    - Optional: --padding, --colors, --annotations, --layout, --genome-order, etc.

    Returns:
        argparse.Namespace with all options (unset optional ints are None).
    """
    parser = argparse.ArgumentParser(
        description="Generate microsynteny plots for user-specified genes across multiple genomes.",
        formatter_class=argparse.RawDescriptionHelpFormatter,
        epilog="""
Available genomes:
  arabidopsis_thaliana, brachypodium_distachyon, glycine_max, hordeum_vulgare,
  lolium_perenne, oryza_sativa, populus_trichocarpa, solanum_lycopersicum,
  sorghum_bicolor, triticum_aestivum, vitis_vinifera, and more...
Example:
  %(prog)s --query hordeum_vulgare \\
      --genes HORVU.MOREX.r3.1HG0089770 HORVU.MOREX.r3.1HG0089780 \\
      --comparisons oryza_sativa triticum_aestivum
""")

    # --- Required arguments ---
    parser.add_argument("--query", required=True,
                        help="Query genome name (e.g., hordeum_vulgare)")
    parser.add_argument("--genes", nargs="+", required=True,
                        help="One or more gene IDs from the query genome")
    parser.add_argument("--comparisons", nargs="+", required=True,
                        help="One or more comparison genome names")

    # --- Basic options ---
    parser.add_argument("--padding", type=int, default=500000,
                        help="Padding around target region in bp (default: 500000)")
    parser.add_argument("--colors", default="",
                        help="Comma-separated colors for each gene (optional)")
    parser.add_argument("--annotations", default="",
                        help="User annotations for legend ('|||' separated)")
    parser.add_argument("-f", "--force", action="store_true",
                        help="Force continue even if some checks fail")

    # --- Layout options ---
    parser.add_argument("--layout", default="",
                        help="Comma-separated layout counts per row (e.g., '2,3,1')")
    parser.add_argument("--genome-order", default="",
                        help="Comma-separated ordered list of all genomes")

    # --- Query genome tweaking ---
    parser.add_argument("--query-padding-left", type=int, default=None,
                        help="Left padding for query genome (overrides --padding)")
    parser.add_argument("--query-padding-right", type=int, default=None,
                        help="Right padding for query genome (overrides --padding)")
    parser.add_argument("--query-max-genes-left", type=int, default=None,
                        help="Max genes on left side of user genes")
    parser.add_argument("--query-max-genes-right", type=int, default=None,
                        help="Max genes on right side of user genes")

    # --- Per-comparison tweaking ---
    parser.add_argument("--comp-padding-config", default="",
                        help="Per-comparison padding: 'genome:left:right|genome2:left:right'")
    parser.add_argument("--comp-max-genes-config", default="",
                        help="Per-comparison gene limits: 'genome:left:right|genome2:left:right'")
    parser.add_argument("--display-names", default="",
                        help="Custom display names: 'genome:CustomName|genome2:Name2'")

    # --- Gene labeling ---
    parser.add_argument("--genelabels", default="",
                        help="Comma-separated genes to label on plot")
    parser.add_argument("--genelabelsize", type=int, default=0,
                        help="Font size for gene labels (0 = disabled)")

    # --- Confidence options ---
    parser.add_argument("--keep-lowconf-color", action="store_true",
                        help="Treat all syntenic matches as high-confidence (keep colors)")

    # --- MCscan parameters for CSV Method row ---
    parser.add_argument("--mcscan-params", default="",
                        help="JSON string with MCscan params for Method row in CSV")

    return parser.parse_args()
| # ============================================================================= | |
| # HELPER FUNCTIONS | |
| # ============================================================================= | |
def get_mcscan_results_dir(script_dir: Path) -> Path:
    """Return the MCscan results directory path.

    Checks for data in this order:
    1. PLANTMSYN_DATA_DIR environment variable + /mcscan_results
    2. ../data/mcscan_results (HuggingFace Spaces structure)
    3. ../Mcscan_results/protein_pairwise (legacy local structure)

    If none of the candidates exists, the HF-style path is returned so that
    downstream existence checks fail with a clear, predictable location.
    """
    hf_default = script_dir.parent / "data" / "mcscan_results"

    # Assemble candidates in priority order; env-var location wins if set.
    candidates = []
    env_root = os.environ.get('PLANTMSYN_DATA_DIR')
    if env_root:
        candidates.append(Path(env_root) / "mcscan_results")
    candidates.append(hf_default)
    candidates.append(script_dir.parent / "Mcscan_results" / "protein_pairwise")

    for candidate in candidates:
        if candidate.is_dir():
            return candidate

    # Nothing found: default to the HF path (caller reports the error).
    return hf_default
def get_display_name(genome: str, custom_names: dict) -> str:
    """
    Get human-readable display name for a genome.

    Priority order:
    1. User-provided custom names (--display-names)
    2. Environment variable CUSTOM_DISPLAY_NAME (for custom genomes)
    3. Built-in DISPLAY_NAMES mapping
    4. Genome identifier as-is
    """
    # Explicit user mapping always wins.
    custom = custom_names.get(genome)
    if custom is not None:
        return custom
    # Environment override applies only to the designated custom genome key.
    env_name = os.environ.get("CUSTOM_DISPLAY_NAME", "")
    if env_name and os.environ.get("CUSTOM_GENOME_KEY", "") == genome:
        return env_name
    # Fall back to the built-in table, then the raw identifier.
    return DISPLAY_NAMES.get(genome, genome)
def get_plot_label_name(genome: str, custom_names: dict) -> str:
    """
    Get Latin/scientific name for genome labels on plots.

    Priority order:
    1. User-provided custom names (--display-names)
    2. Environment variable CUSTOM_DISPLAY_NAME (for custom genomes)
    3. Built-in LATIN_DISPLAY_NAMES mapping
    4. Auto-generated Latin name from genome_id (genus_species -> Genus species)
    """
    if genome in custom_names:
        return custom_names[genome]

    override = os.environ.get("CUSTOM_DISPLAY_NAME", "")
    if override and os.environ.get("CUSTOM_GENOME_KEY", "") == genome:
        return override

    if genome in LATIN_DISPLAY_NAMES:
        return LATIN_DISPLAY_NAMES[genome]

    # Auto-generate: capitalize the genus, keep the remainder joined by '_'
    # (e.g. "hordeum_vulgare" -> "Hordeum vulgare").
    genus, _, rest = genome.partition('_')
    if rest:
        return f"{genus.capitalize()} {rest}"
    return genome.replace('_', ' ').title()
def get_blocks_file(query: str, comp: str, blocks_dir: Path) -> Path:
    """
    Get the correct directional blocks file (query genome in column 1).

    JCVI blocks files are directional: A.B.i1.blocks means gene pairs where
    A's genes are in column 1 and B's genes are in column 2. We need the file
    where query genome genes are in column 1.

    Returns the Path if the file exists, otherwise None.
    """
    candidate = blocks_dir / f"{query}.{comp}.i1.blocks"
    if candidate.is_file():
        return candidate
    return None
def is_high_confidence(query_gene: str, comp_gene: str, query: str, comp: str,
                       mcscan_dir: Path, last_cache: dict = None) -> bool:
    """
    Check if a gene pair is high-confidence by looking in last.filtered files.

    Uses caching for efficiency when processing multiple genes: when
    *last_cache* is provided, the parsed last.filtered DataFrame for each
    (query, comp) pair is loaded at most once.
    """
    def _load() -> pd.DataFrame:
        # Empty DataFrame when no last.filtered file exists for this pair.
        last_file = find_last_filtered_file(mcscan_dir, query, comp)
        return load_last_filtered(last_file) if last_file else pd.DataFrame()

    if last_cache is None:
        last_df = _load()
    else:
        key = (query, comp)
        if key not in last_cache:
            last_cache[key] = _load()
        last_df = last_cache[key]

    return check_pair_in_last(last_df, query_gene, comp_gene)
def read_bed_file(path: Path) -> pd.DataFrame:
    """Read BED file into DataFrame using pandas_utils.

    Thin wrapper kept so call sites read uniformly alongside write_bed_file.
    """
    df = load_bed_file(path)
    return df
def write_bed_file(data, path: Path):
    """Write DataFrame or list of dicts to BED file using pandas_utils.

    Accepts either a pandas DataFrame or a list of row dicts; lists are
    normalized into a DataFrame before saving.
    """
    df = pd.DataFrame(data) if isinstance(data, list) else data
    save_bed(df, path)
def parse_config_string(config_str: str) -> dict:
    """
    Parse configuration strings like 'genome:left:right|genome2:left:right'.

    Entries with fewer than three ':'-separated fields are ignored.
    Returns dict: {genome: (left, right)} with int values.
    """
    parsed = {}
    if config_str:
        for chunk in config_str.split("|"):
            fields = chunk.split(":")
            # Need at least genome:left:right; extra fields are ignored.
            if len(fields) >= 3:
                parsed[fields[0]] = (int(fields[1]), int(fields[2]))
    return parsed
def parse_display_names_string(config_str: str) -> dict:
    """Parse display names like 'genome:CustomName|genome2:Name2'.

    Only the first ':' splits each entry, so names may themselves contain
    colons. Entries without a ':' are skipped.
    """
    names = {}
    if not config_str:
        return names
    for token in config_str.split("|"):
        key, sep, value = token.partition(":")
        if sep:
            names[key] = value
    return names
def run_command(cmd: list, cwd: Path = None, check: bool = True):
    """Run a subprocess command, optionally in a specific directory.

    Args:
        cmd: Command and arguments as a list (executed without a shell).
        cwd: Optional working directory for the child process.
        check: When True, log a diagnostic if the command exits non-zero.
            Unlike subprocess's own `check`, this never raises; the result
            is returned either way so callers can inspect it.

    Returns:
        subprocess.CompletedProcess with captured text stdout/stderr.
    """
    result = subprocess.run(cmd, cwd=cwd, capture_output=True, text=True)
    if check and result.returncode != 0:
        # Route failure diagnostics through the centralized module logger
        # (see logger setup at top of file) rather than bare print(), so
        # failures are captured in the session log as well as the console.
        logger.error(f"Command failed: {' '.join(cmd)}")
        if result.stderr:
            logger.error(result.stderr)
    return result
| # ============================================================================= | |
| # MAIN SCRIPT | |
| # ============================================================================= | |
| def main(): | |
| args = parse_args() | |
| # --- Setup paths (adapted for HuggingFace Spaces) --- | |
| script_dir = Path(__file__).resolve().parent | |
| mcscan_dir = get_mcscan_results_dir(script_dir) | |
| bed_dir = mcscan_dir / "bed_files" | |
| blocks_dir = mcscan_dir / "i1_blocks" | |
| # Output directory - use PLANTMSYN_PROJECT_DIR if set, otherwise same as script_dir | |
| project_dir = os.environ.get('PLANTMSYN_PROJECT_DIR') | |
| if project_dir: | |
| output_base = Path(project_dir) / "Microsynteny_plots" | |
| else: | |
| # On HF Spaces, scripts are in /app/ directly, so use script_dir | |
| # On local, scripts are in Scripts/, so use parent | |
| if script_dir.name == "app" or (script_dir / "data").is_dir(): | |
| output_base = script_dir / "Microsynteny_plots" | |
| else: | |
| output_base = script_dir.parent / "Microsynteny_plots" | |
| # Parse configuration strings | |
| comp_padding_cfg = parse_config_string(args.comp_padding_config) | |
| comp_max_genes_cfg = parse_config_string(args.comp_max_genes_config) | |
| custom_display_names = parse_display_names_string(args.display_names) | |
| # Collect warnings for user feedback | |
| analysis_warnings = [] | |
| # --- Validate MCscan directory --- | |
| if not mcscan_dir.is_dir(): | |
| error_msg = format_error(ErrorMessages.MCSCAN_RESULTS_NOT_FOUND, expected_path=str(mcscan_dir)) | |
| logger.error(error_msg) | |
| sys.exit(f"ERROR: {error_msg}") | |
| # --- Create output directory with timestamp --- | |
| query_display = get_display_name(args.query, custom_display_names) | |
| comp_str = "_".join(args.comparisons) | |
| timestamp = datetime.now().strftime("%Y%m%d_%H%M%S") | |
| work_dir = output_base / f"{query_display}_usergenes_{comp_str}_{timestamp}" | |
| work_dir.mkdir(parents=True, exist_ok=True) | |
| log_session_start("plot_user_genes_microsynteny_v2") | |
| logger.info("=" * 42) | |
| logger.info("Corrected Microsynteny Plot Generator v2") | |
| logger.info("=" * 42) | |
| logger.info(f"Query genome: {query_display} ({args.query})") | |
| logger.info(f"User genes: {' '.join(args.genes)}") | |
| logger.info(f"Comparison genomes: {' '.join(args.comparisons)}") | |
| logger.info(f"Padding: {args.padding} bp") | |
| logger.info(f"Working directory: {work_dir}") | |
| # --- Verify query BED file exists --- | |
| query_bed_path = bed_dir / f"{args.query}.bed" | |
| if not query_bed_path.is_file(): | |
| error_msg = format_error(ErrorMessages.BED_FILE_NOT_FOUND, genome=args.query, expected_path=str(query_bed_path)) | |
| logger.error(error_msg) | |
| sys.exit(f"ERROR: {error_msg}") | |
| # Write user genes to file | |
| (work_dir / "user_genes.txt").write_text("\n".join(args.genes) + "\n") | |
| # ========================================================================= | |
| # STEP 1: Get coordinates for user genes in query genome | |
| # ========================================================================= | |
| logger.info("Step 1: Locating user genes in query genome...") | |
| query_bed_df = read_bed_file(query_bed_path) | |
| # Find user genes in query BED using pandas filtering | |
| user_genes_df = query_bed_df[query_bed_df['gene'].isin(args.genes)] | |
| missing_genes = set(args.genes) - set(user_genes_df['gene']) | |
| # Report missing genes with proper warning | |
| if missing_genes: | |
| for gid in missing_genes: | |
| warn_msg = format_error(ErrorMessages.GENE_NOT_FOUND, gene_id=gid, genome=args.query) | |
| logger.warning(warn_msg) | |
| analysis_warnings.append(warn_msg) | |
| if user_genes_df.empty: | |
| error_msg = format_error(ErrorMessages.GENES_NOT_FOUND_MULTIPLE, | |
| genome=args.query, | |
| gene_list=', '.join(args.genes), | |
| found_count=0, | |
| total_count=len(args.genes)) | |
| logger.error(error_msg) | |
| sys.exit(f"ERROR: {error_msg}") | |
| write_bed_file(user_genes_df, work_dir / "query_genes_coords.bed") | |
| # --- CHROMOSOME MISMATCH DETECTION --- | |
| unique_chromosomes = user_genes_df['chr'].unique().tolist() | |
| if len(unique_chromosomes) > 1: | |
| # Genes span multiple chromosomes - ERROR and stop execution! | |
| error_msg = format_error(ErrorMessages.MULTIPLE_CHROMOSOMES_ERROR, | |
| chromosomes=', '.join(unique_chromosomes)) | |
| logger.error(error_msg) | |
| # Write error to file for webapp to detect | |
| (work_dir / "analysis_errors.txt").write_text(f"ERROR: {error_msg}\n") | |
| sys.exit(f"ERROR: {error_msg}") | |
| else: | |
| query_chr = unique_chromosomes[0] | |
| # Calculate query region with padding | |
| chr_genes_df = user_genes_df[user_genes_df['chr'] == query_chr] | |
| min_pos = chr_genes_df['start'].min() | |
| max_pos = chr_genes_df['end'].max() | |
| left_pad = args.query_padding_left if args.query_padding_left is not None else args.padding | |
| right_pad = args.query_padding_right if args.query_padding_right is not None else args.padding | |
| query_start = max(0, min_pos - left_pad) | |
| query_end = max_pos + right_pad | |
| logger.info(f" Found {len(chr_genes_df)} genes on {query_chr}") | |
| if left_pad == right_pad: | |
| logger.info(f" Query region: {query_chr}:{query_start}-{query_end} (with {left_pad}bp padding)") | |
| else: | |
| logger.info(f" Query region: {query_chr}:{query_start}-{query_end} (left: {left_pad}bp, right: {right_pad}bp)") | |
| # Get all genes in padded region using pandas | |
| query_region_full_df = filter_bed_by_region(query_bed_df, query_chr, query_start, query_end) | |
| write_bed_file(query_region_full_df, work_dir / "query.filtered.full.bed") | |
| total_genes = len(query_region_full_df) | |
| logger.info(f" Query window contains {total_genes} genes (before limiting)") | |
| # --- GENE DISTANCE CHECK --- | |
| # Check if any user gene is more than MAX_GENE_DISTANCE from its nearest neighbor | |
| # This uses nearest-neighbor gap checking, not max-min span | |
| MAX_GENE_DISTANCE = 20 # Maximum genes allowed between any two adjacent highlighted genes | |
| query_region_full_df_indexed = query_region_full_df.reset_index(drop=True) | |
| # Get gene positions with their names for error reporting | |
| user_genes_with_indices = query_region_full_df_indexed[ | |
| query_region_full_df_indexed['gene'].isin(args.genes) | |
| ][['gene']].copy() | |
| user_genes_with_indices['idx'] = user_genes_with_indices.index | |
| if len(user_genes_with_indices) >= 2: | |
| # Sort by position (index) to check consecutive gaps | |
| sorted_genes = user_genes_with_indices.sort_values('idx').reset_index(drop=True) | |
| # Check each consecutive pair for gaps exceeding threshold | |
| max_gap = 0 | |
| max_gap_gene1 = None | |
| max_gap_gene2 = None | |
| for i in range(1, len(sorted_genes)): | |
| prev_gene = sorted_genes.iloc[i - 1] | |
| curr_gene = sorted_genes.iloc[i] | |
| gap = curr_gene['idx'] - prev_gene['idx'] | |
| if gap > max_gap: | |
| max_gap = gap | |
| max_gap_gene1 = prev_gene['gene'] | |
| max_gap_gene2 = curr_gene['gene'] | |
| if max_gap > MAX_GENE_DISTANCE: | |
| error_msg = format_error(ErrorMessages.GENE_GAP_TOO_LARGE, | |
| gene1=max_gap_gene1, | |
| gene2=max_gap_gene2, | |
| distance=max_gap, | |
| max_distance=MAX_GENE_DISTANCE) | |
| logger.error(error_msg) | |
| # Write error to file for webapp to detect | |
| (work_dir / "analysis_errors.txt").write_text(f"ERROR: Gap check failed - {error_msg}\n") | |
| sys.exit(f"ERROR: Gap check failed - {error_msg}") | |
| elif max_gap > MAX_GENE_DISTANCE // 2: | |
| # Warn if distance is getting close to limit | |
| warn_msg = format_warning(WarningMessages.LARGE_REGION, n_genes=max_gap) | |
| logger.warning(warn_msg) | |
| analysis_warnings.append(warn_msg) | |
| # --- Gene limiting logic --- | |
| max_left = args.query_max_genes_left if args.query_max_genes_left is not None else 15 | |
| max_right = args.query_max_genes_right if args.query_max_genes_right is not None else 15 | |
| user_explicit_limits = args.query_max_genes_left is not None or args.query_max_genes_right is not None | |
| max_total = max_left + max_right | |
| # Apply limiting if user requested it OR region exceeds default | |
| should_limit = user_explicit_limits or total_genes > (max_total + len(args.genes)) | |
| if should_limit: | |
| if user_explicit_limits: | |
| logger.info(f" User-specified gene limits: {max_left} left + {max_right} right flanking genes") | |
| else: | |
| logger.info(f" Region exceeds default limits, applying: {max_total} flanking genes (plus {len(args.genes)} user genes)...") | |
| # Reset index for positional access | |
| query_region_full_df = query_region_full_df.reset_index(drop=True) | |
| # Find positions of user genes in the region | |
| user_mask = query_region_full_df['gene'].isin(args.genes) | |
| user_idxs = query_region_full_df[user_mask].index.tolist() | |
| if user_idxs: | |
| min_idx, max_idx = min(user_idxs), max(user_idxs) | |
| start_idx = max(0, min_idx - max_left) | |
| end_idx = min(len(query_region_full_df) - 1, max_idx + max_right) | |
| query_region_df = query_region_full_df.iloc[start_idx:end_idx + 1].copy() | |
| # Recalculate window bounds | |
| new_start = query_region_df['start'].min() | |
| new_end = query_region_df['end'].max() | |
| buffer = max(10000, (new_end - new_start) // 20) | |
| query_start = max(0, new_start - buffer) | |
| query_end = new_end + buffer | |
| logger.info(f" Final: {len(query_region_df)} genes in {query_end - query_start}bp window") | |
| logger.info(f" Recalculated query window: {query_chr}:{query_start}-{query_end}") | |
| else: | |
| logger.error(" None of the user genes found in the padded region!") | |
| query_region_df = query_region_full_df | |
| else: | |
| logger.info(f" No gene limiting applied (region has {total_genes} genes)") | |
| query_region_df = query_region_full_df | |
| write_bed_file(query_region_df, work_dir / "query.filtered.bed") | |
| query_window_genes = query_region_df['gene'].tolist() | |
| (work_dir / "query_window_genes.txt").write_text("\n".join(query_window_genes) + "\n") | |
| logger.info(f" Final query window contains {len(query_window_genes)} genes") | |
| print() | |
| # ========================================================================= | |
| # STEP 2: Find syntenic regions in comparison genomes | |
| # ========================================================================= | |
| logger.info("Step 2: Finding syntenic regions in comparison genomes...") | |
| comp_regions = {} # {genome: (chr, start, end)} | |
| for comp in args.comparisons: | |
| comp_display = get_display_name(comp, custom_display_names) | |
| logger.info(f" Processing {comp_display}...") | |
| blocks_file = get_blocks_file(args.query, comp, blocks_dir) | |
| if not blocks_file: | |
| warn_msg = format_error(ErrorMessages.NO_BLOCKS_FILE, | |
| query_genome=args.query, | |
| comparison_genome=comp, | |
| expected_path=f"{args.query}.{comp}.i1.blocks") | |
| logger.warning(warn_msg) | |
| analysis_warnings.append(warn_msg) | |
| continue | |
| comp_bed_path = bed_dir / f"{comp}.bed" | |
| if not comp_bed_path.is_file(): | |
| warn_msg = format_error(ErrorMessages.BED_FILE_NOT_FOUND, genome=comp, expected_path=str(comp_bed_path)) | |
| logger.warning(warn_msg) | |
| analysis_warnings.append(warn_msg) | |
| continue | |
| comp_bed_df = read_bed_file(comp_bed_path) | |
| comp_bed = comp_bed_df.to_dict('records') # Convert DataFrame to list of dicts | |
| comp_gene_lookup = {r["gene"]: r for r in comp_bed} | |
| # Read blocks file (query_gene -> comp_gene) | |
| blocks = {} | |
| with open(blocks_file) as f: | |
| for line in f: | |
| parts = line.strip().split("\t") | |
| if len(parts) >= 2 and parts[1] != ".": | |
| blocks[parts[0]] = parts[1] | |
| # Find matches for query window genes | |
| matches = [(qg, blocks[qg]) for qg in query_window_genes if qg in blocks] | |
| (work_dir / f"{comp}_matches.blocks").write_text("\n".join(f"{q}\t{c}" for q, c in matches) + "\n") | |
| logger.info(f" Found {len(matches)} syntenic matches") | |
| if not matches: | |
| warn_msg = format_error(ErrorMessages.NO_SYNTENIC_MATCHES_SIMPLE, | |
| query_genome=args.query, | |
| comparison_genome=comp) | |
| logger.warning(warn_msg) | |
| analysis_warnings.append(warn_msg) | |
| continue | |
| # Get coordinates for orthologs | |
| ortholog_coords = [comp_gene_lookup[orth] for _, orth in matches if orth in comp_gene_lookup] | |
| write_bed_file(ortholog_coords, work_dir / f"{comp}_ortholog_coords.bed") | |
| # Prioritize chromosome with most USER GENE orthologs | |
| user_orths = [(g, blocks.get(g)) for g in args.genes if blocks.get(g) in comp_gene_lookup] | |
| user_orth_coords = [comp_gene_lookup[o] for _, o in user_orths if o] | |
| if user_orth_coords: | |
| chr_counts = Counter(r["chr"] for r in user_orth_coords) | |
| # --- CHECK: User gene orthologs must be on a single chromosome --- | |
| if len(chr_counts) > 1: | |
| # User gene orthologs are scattered across multiple chromosomes | |
| # Build detailed breakdown for error message | |
| gene_to_chr = {} | |
| for gene, orth in user_orths: | |
| if orth in comp_gene_lookup: | |
| orth_chr = comp_gene_lookup[orth]["chr"] | |
| if gene not in gene_to_chr: | |
| gene_to_chr[gene] = [] | |
| gene_to_chr[gene].append(f"{orth} on {orth_chr}") | |
| details_lines = [] | |
| for gene in args.genes: | |
| if gene in gene_to_chr: | |
| details_lines.append(f" • {gene}: {', '.join(gene_to_chr[gene])}") | |
| else: | |
| details_lines.append(f" • {gene}: no ortholog found") | |
| error_msg = format_error( | |
| ErrorMessages.ORTHOLOG_CHROMOSOMES_MISMATCH, | |
| comparison_genome=get_display_name(comp, custom_display_names), | |
| n_chromosomes=len(chr_counts), | |
| chromosomes=', '.join(sorted(chr_counts.keys())), | |
| details='\n'.join(details_lines) | |
| ) | |
| logger.error(error_msg) | |
| (work_dir / "analysis_errors.txt").write_text(f"ERROR: {error_msg}\n") | |
| sys.exit(f"ERROR: {error_msg}") | |
| comp_chr = chr_counts.most_common(1)[0][0] | |
| user_on_chr = chr_counts.most_common(1)[0][1] | |
| logger.info(f" Chromosome selection: {comp_chr} ({user_on_chr} user gene orthologs - prioritized)") | |
| else: | |
| chr_counts = Counter(r["chr"] for r in ortholog_coords) | |
| comp_chr = chr_counts.most_common(1)[0][0] | |
| logger.info(f" Chromosome selection: {comp_chr} (no user gene orthologs, using most total matches)") | |
| # Get region bounds on best chromosome | |
| chr_orths = [r for r in ortholog_coords if r["chr"] == comp_chr] | |
| if not chr_orths: | |
| warn_msg = f"Could not determine comparison chromosome for {comp}" | |
| logger.warning(warn_msg) | |
| analysis_warnings.append(warn_msg) | |
| continue | |
| # Get user gene ortholog range (window MUST include all of them) | |
| user_chr_orths = [r for r in user_orth_coords if r["chr"] == comp_chr] | |
| if user_chr_orths: | |
| orth_start = min(r["start"] for r in user_chr_orths) | |
| orth_end = max(r["end"] for r in user_chr_orths) | |
| else: | |
| orth_start = min(r["start"] for r in chr_orths) | |
| orth_end = max(r["end"] for r in chr_orths) | |
| # Get padding for this comparison | |
| comp_left_pad, comp_right_pad = comp_padding_cfg.get(comp, (1500000, 1500000)) | |
| comp_start = max(0, orth_start - comp_left_pad) | |
| comp_end = orth_end + comp_right_pad | |
| if user_chr_orths: | |
| logger.info(f" User gene ortholog range: {comp_chr}:{orth_start}-{orth_end}") | |
| if comp_left_pad == comp_right_pad: | |
| logger.info(f" With ±{comp_left_pad}bp padding: {comp_chr}:{comp_start}-{comp_end}") | |
| else: | |
| print(f" With left:{comp_left_pad}bp, right:{comp_right_pad}bp padding: {comp_chr}:{comp_start}-{comp_end}") | |
| comp_regions[comp] = (comp_chr, comp_start, comp_end) | |
| print(f" Region: {comp_chr}:{comp_start}-{comp_end}") | |
| # Filter comparison BED to this region | |
| comp_region_full = [r for r in comp_bed if r["chr"] == comp_chr and r["end"] >= comp_start and r["start"] <= comp_end] | |
| comp_region_full.sort(key=lambda x: x["start"]) | |
| write_bed_file(comp_region_full, work_dir / f"{comp}.filtered.full.bed") | |
| # --- Per-comparison gene limiting --- | |
| if comp in comp_max_genes_cfg: | |
| c_max_left, c_max_right = comp_max_genes_cfg[comp] | |
| print(f" User-specified gene limits: {c_max_left} left + {c_max_right} right genes around user orthologs") | |
| print(f" Total genes in region: {len(comp_region_full)}") | |
| gene_to_idx = {r["gene"]: i for i, r in enumerate(comp_region_full)} | |
| user_orth_idxs = [gene_to_idx[o] for _, o in user_orths if o in gene_to_idx] | |
| if user_orth_idxs: | |
| min_idx, max_idx = min(user_orth_idxs), max(user_orth_idxs) | |
| start_idx = max(0, min_idx - c_max_left) | |
| end_idx = min(len(comp_region_full) - 1, max_idx + c_max_right) | |
| comp_region = comp_region_full[start_idx:end_idx + 1] | |
| # Recalculate window | |
| new_start = comp_region[0]["start"] | |
| new_end = comp_region[-1]["end"] | |
| buffer = max(10000, (new_end - new_start) // 20) | |
| comp_start = max(0, new_start - buffer) | |
| comp_end = new_end + buffer | |
| comp_regions[comp] = (comp_chr, comp_start, comp_end) | |
| print(f" Final: {len(comp_region)} genes in {comp_end - comp_start}bp window") | |
| print(f" Recalculated window: {comp_chr}:{comp_start}-{comp_end}") | |
| else: | |
| print(" WARNING: No user gene orthologs found in region, keeping full region") | |
| comp_region = comp_region_full | |
| else: | |
| comp_region = comp_region_full | |
| write_bed_file(comp_region, work_dir / f"{comp}.filtered.bed") | |
| comp_window_genes = [r["gene"] for r in comp_region] | |
| (work_dir / f"{comp}_window_genes.txt").write_text("\n".join(comp_window_genes) + "\n") | |
| print(f" Window contains {len(comp_window_genes)} genes") | |
| print() | |
| # ========================================================================= | |
| # STEP 3: Create filtered blocks for window-to-window connections | |
| # ========================================================================= | |
| print("Step 3: Creating filtered blocks for plotting...") | |
| for comp in args.comparisons: | |
| if comp not in comp_regions: | |
| comp_display = get_display_name(comp, custom_display_names) | |
| print(f" {comp_display}: skipped (no region found)") | |
| continue | |
| blocks_file = get_blocks_file(args.query, comp, blocks_dir) | |
| comp_window_genes = set((work_dir / f"{comp}_window_genes.txt").read_text().strip().split("\n")) | |
| # Filter blocks to genes in both windows | |
| filtered_blocks = [] | |
| with open(blocks_file) as f: | |
| for line in f: | |
| parts = line.strip().split("\t") | |
| if len(parts) >= 2 and parts[0] in query_window_genes and parts[1] in comp_window_genes: | |
| filtered_blocks.append(f"{parts[0]}\t{parts[1]}") | |
| (work_dir / f"{comp}_window.blocks").write_text("\n".join(filtered_blocks) + "\n") | |
| comp_display = get_display_name(comp, custom_display_names) | |
| print(f" {comp_display}: {len(filtered_blocks)} connections") | |
| print() | |
| # ========================================================================= | |
| # STEP 4: Determine high-confidence status for user genes | |
| # ========================================================================= | |
| print("Step 4: Determining confidence levels for user genes...") | |
| # Parse user colors and annotations | |
| color_list = [c.strip() for c in args.colors.split(",") if c.strip()] if args.colors else [] | |
| ann_list = args.annotations.split("|||") if args.annotations else [] | |
| gene_colors = {} | |
| gene_confidence = {} | |
| gene_annotations = {} | |
| high_conf_pairs = set() # (gene, comp_genome) pairs that are high-confidence | |
| with open(work_dir / "gene_colors.txt", "w") as f: | |
| f.write("# Gene_ID\tColor\tConfidence\tAnnotation\tMatches\n") | |
| for i, gene in enumerate(args.genes): | |
| color = color_list[i] if i < len(color_list) else COLOR_PALETTE[i % len(COLOR_PALETTE)] | |
| annotation = ann_list[i].strip() if i < len(ann_list) else "" | |
| gene_annotations[gene] = annotation | |
| has_high_conf = False | |
| high_conf_genomes = [] | |
| low_conf_genomes = [] | |
| for comp in args.comparisons: | |
| blocks_file = get_blocks_file(args.query, comp, blocks_dir) | |
| if not blocks_file: | |
| continue | |
| ortholog = None | |
| with open(blocks_file) as bf: | |
| for line in bf: | |
| parts = line.strip().split("\t") | |
| if parts[0] == gene and len(parts) >= 2 and parts[1] != ".": | |
| ortholog = parts[1] | |
| break | |
| if ortholog: | |
| comp_display = get_display_name(comp, custom_display_names) | |
| if args.keep_lowconf_color: | |
| has_high_conf = True | |
| high_conf_genomes.append(f"{comp_display}:{ortholog}") | |
| high_conf_pairs.add((gene, comp)) | |
| elif is_high_confidence(gene, ortholog, args.query, comp, mcscan_dir): | |
| has_high_conf = True | |
| high_conf_genomes.append(f"{comp_display}:{ortholog}") | |
| high_conf_pairs.add((gene, comp)) | |
| else: | |
| low_conf_genomes.append(f"{comp_display}:{ortholog}(low)") | |
| if has_high_conf: | |
| gene_colors[gene] = color | |
| gene_confidence[gene] = "HIGH" | |
| msg = f" {gene} -> {color} (HIGH confidence)" | |
| if annotation: | |
| msg += f" [{annotation}]" | |
| print(msg) | |
| matches = ";".join(high_conf_genomes + low_conf_genomes) | |
| f.write(f"{gene}\t{color}\tHIGH\t{annotation}\t{matches}\n") | |
| else: | |
| gene_colors[gene] = "grey" | |
| gene_confidence[gene] = "LOW" | |
| print(f" {gene} -> grey (LOW confidence only)") | |
| f.write(f"{gene}\tgrey\tLOW\t{annotation}\t{';'.join(low_conf_genomes)}\n") | |
| print() | |
| # ========================================================================= | |
| # STEP 5: Merge BED files and create multi-column blocks | |
| # ========================================================================= | |
| print("Step 5: Preparing files for JCVI plotting...") | |
| # Build list of valid comparison genomes | |
| valid_comps = [g for g in args.comparisons if (work_dir / f"{g}.filtered.bed").is_file() and (work_dir / f"{g}.filtered.bed").stat().st_size > 0] | |
| if not valid_comps: | |
| sys.exit("ERROR: No valid comparison genomes with syntenic matches found.") | |
| # Determine genome order | |
| if args.genome_order: | |
| print(f" Using custom genome order: {args.genome_order}") | |
| order_list = [g.strip() for g in args.genome_order.split(",")] | |
| ordered_genomes = [] | |
| for g in order_list: | |
| if g == args.query or g in valid_comps: | |
| ordered_genomes.append(g) | |
| else: | |
| ordered_genomes = [args.query] + valid_comps | |
| print(f" BED file order: {' '.join(ordered_genomes)}") | |
| # Find query position | |
| query_pos = ordered_genomes.index(args.query) if args.query in ordered_genomes else 0 | |
| if query_pos == 0: | |
| print(" Query genome position: TOP (row 1)") | |
| elif query_pos == len(ordered_genomes) - 1: | |
| print(f" Query genome position: BOTTOM (row {len(ordered_genomes)})") | |
| else: | |
| print(f" Query genome position: MIDDLE (row {query_pos + 1})") | |
| # Add track order prefix to BED files (t00_, t01_, etc.) | |
| ordered_beds_dir = work_dir / "ordered_beds" | |
| ordered_beds_dir.mkdir(exist_ok=True) | |
| prefixed_beds = [] | |
| for i, genome in enumerate(ordered_genomes): | |
| prefix = f"t{i:02d}" | |
| if genome == args.query: | |
| src = work_dir / "query.filtered.bed" | |
| else: | |
| src = work_dir / f"{genome}.filtered.bed" | |
| dst = ordered_beds_dir / f"{prefix}_{genome}.bed" | |
| with open(src) as inf, open(dst, "w") as outf: | |
| for line in inf: | |
| parts = line.strip().split("\t") | |
| parts[0] = f"{prefix}_{parts[0]}" | |
| outf.write("\t".join(parts) + "\n") | |
| prefixed_beds.append(str(dst)) | |
| # Merge BED files using JCVI | |
| run_command([sys.executable, "-m", "jcvi.formats.bed", "merge"] + prefixed_beds + ["-o", str(work_dir / "all_genomes.bed")]) | |
| # Merge blocks files using simple_merge_blocks.py | |
| merge_script = script_dir / "simple_merge_blocks.py" | |
| blocks_args = [str(work_dir / f"{comp}_window.blocks") for comp in args.comparisons if (work_dir / f"{comp}_window.blocks").is_file()] | |
| if not merge_script.is_file(): | |
| sys.exit(f"ERROR: Required script not found: {merge_script}") | |
| if blocks_args: | |
| run_command([sys.executable, str(merge_script), str(work_dir / "all.blocks")] + blocks_args) | |
| else: | |
| sys.exit("ERROR: No blocks files found to merge.") | |
| # --- Apply colors and filter low-confidence orthologs --- | |
| colored_lines = [] | |
| with open(work_dir / "all.blocks") as f: | |
| for line in f: | |
| parts = line.strip().split("\t") | |
| query_gene = parts[0] | |
| if query_gene in gene_colors and gene_confidence.get(query_gene) == "HIGH": | |
| color = gene_colors[query_gene] | |
| if args.keep_lowconf_color: | |
| rest = "\t".join(parts[1:]) | |
| colored_lines.append(f"{color}*{query_gene}\t{rest}") | |
| else: | |
| new_cols = [f"{color}*{query_gene}"] | |
| for j, comp in enumerate(args.comparisons): | |
| orth = parts[j + 1] if j + 1 < len(parts) else "." | |
| if (query_gene, comp) in high_conf_pairs: | |
| new_cols.append(orth) | |
| else: | |
| new_cols.append(".") | |
| colored_lines.append("\t".join(new_cols)) | |
| elif query_gene in gene_colors: | |
| color = gene_colors[query_gene] | |
| rest = "\t".join(parts[1:]) | |
| colored_lines.append(f"{color}*{query_gene}\t{rest}") | |
| else: | |
| colored_lines.append(line.strip()) | |
| (work_dir / "all_colored.blocks").write_text("\n".join(colored_lines) + "\n") | |
| # ========================================================================= | |
| # STEP 5b: Generate CSV summary | |
| # ========================================================================= | |
| print("Step 5b: Generating gene summary CSV...") | |
| (work_dir / "gene_list.txt").write_text("\n".join(args.genes) + "\n") | |
| (work_dir / "comparisons_list.txt").write_text("\n".join(args.comparisons) + "\n") | |
| annotations_file = "" | |
| if args.annotations: | |
| annotations_file = str(work_dir / "user_annotations.txt") | |
| with open(annotations_file, "w") as f: | |
| for gene in args.genes: | |
| ann = gene_annotations.get(gene, "") | |
| f.write(f"{gene}|||{ann}\n") | |
| # Use Python version of CSV generator | |
| csv_generator = script_dir / "generate_gene_summary_csv.py" | |
| if csv_generator.is_file(): | |
| csv_cmd = ["python", str(csv_generator), str(work_dir), args.query, str(work_dir / "gene_list.txt"), str(work_dir / "comparisons_list.txt")] | |
| if annotations_file: | |
| csv_cmd.append(annotations_file) | |
| # Pass MCscan parameters for the Method explanation row | |
| # Use incoming params if provided (from Custom pages), else use defaults (Microsynteny page) | |
| if args.mcscan_params: | |
| mcscan_params_json = args.mcscan_params | |
| else: | |
| # Default parameters for database comparisons | |
| mcscan_params_json = '{"cscore": "0.99", "max_dist": 20, "min_size": 4, "tandem_max": 10}' | |
| csv_cmd.extend(['--mcscan-params', mcscan_params_json]) | |
| # Pass query genome display name for proper labeling in Method text (especially for custom genomes) | |
| csv_cmd.extend(['--query-display-name', query_display]) | |
| run_command(csv_cmd, check=False) | |
| print(" Created: gene_summary.csv") | |
| else: | |
| print(" Warning: CSV generator not found") | |
# --- Reorder blocks columns to match genome order ---
print(f" Reordering blocks columns to match genome order: {' '.join(ordered_genomes)}")
# Map each genome to its column index in all.blocks (query is always column 0).
genome_to_col = {args.query: 0}
for j, comp in enumerate(args.comparisons):
    if comp in valid_comps:
        genome_to_col[comp] = j + 1
reordered_lines = []
for line in colored_lines:
    parts = line.strip().split("\t")
    new_parts = []
    for g in ordered_genomes:
        # Unknown genomes default to column 0; missing columns become ".".
        col_idx = genome_to_col.get(g, 0)
        new_parts.append(parts[col_idx] if col_idx < len(parts) else ".")
    reordered_lines.append("\t".join(new_parts))
# Move color prefix to column 1
final_lines = []
for line in reordered_lines:
    parts = line.split("\t")
    color_found = None
    new_parts = []
    for p in parts:
        # Reordering may have moved a "color*gene" cell away from column 0;
        # strip the prefix wherever it landed, remembering the color.
        m = re.match(r"^([a-zA-Z]+)\*(.+)$", p)
        if m:
            color_found = m.group(1)
            new_parts.append(m.group(2))
        else:
            new_parts.append(p)
    # Re-attach the color prefix to the first column only.
    if color_found:
        new_parts[0] = f"{color_found}*{new_parts[0]}"
    final_lines.append("\t".join(new_parts))
# Sort: non-colored first, grey second, colors last (drawn on top)
non_colored = [l for l in final_lines if "*" not in l]
grey_lines = [l for l in final_lines if l.startswith("grey*")]
color_lines = [l for l in final_lines if "*" in l and not l.startswith("grey*")]
(work_dir / "all_colored.blocks").write_text("\n".join(non_colored + grey_lines + color_lines) + "\n")
print(" Created all_genomes.bed and all_colored.blocks (reordered with colors fixed)")
print()
| # ========================================================================= | |
| # STEP 6: Create layout file | |
| # ========================================================================= | |
| print("Step 6: Creating layout file...") | |
| num_comp = len(valid_comps) | |
| if num_comp == 0: | |
| sys.exit("ERROR: No valid comparison genomes for layout.") | |
| layout_lines = ["# x, y, rotation, ha, va, color, ratio, label"] | |
| if args.layout and args.genome_order: | |
| print(f" Using custom layout: {args.layout} with genome order: {args.genome_order}") | |
| layout_rows = [int(x.strip()) for x in args.layout.split(",")] | |
| num_rows = len(layout_rows) | |
| print(f" Track order (from BED files): {' '.join(ordered_genomes)}") | |
| print(f" Total valid genomes: {len(ordered_genomes)}, Layout expects: {sum(layout_rows)}") | |
| row_spacing = 0.70 / (num_rows - 1) if num_rows > 1 else 0.0 | |
| start_y = 0.85 if num_rows > 1 else 0.70 | |
| genome_idx = 0 | |
| query_track_idx = -1 | |
| track_genomes = [] | |
| for row_idx, genomes_in_row in enumerate(layout_rows): | |
| y_pos = start_y - row_idx * row_spacing | |
| ha, va = "center", "top" if row_idx == 0 else ("bottom" if row_idx == num_rows - 1 else "center") | |
| if row_idx not in (0, num_rows - 1): | |
| ha = "left" | |
| for col in range(genomes_in_row): | |
| if genome_idx < len(ordered_genomes): | |
| current_genome = ordered_genomes[genome_idx] | |
| if genomes_in_row == 1: | |
| x_pos = 0.5 | |
| elif genomes_in_row == 2: | |
| x_pos = 0.25 + col * 0.50 | |
| elif genomes_in_row == 3: | |
| x_pos = 0.17 + col * 0.33 | |
| else: | |
| x_pos = 0.10 + col * 0.80 / (genomes_in_row - 1) | |
| ratio = {1: "1", 2: "0.40", 3: "0.28"}.get(genomes_in_row, "0.20") | |
| display = get_plot_label_name(current_genome, custom_display_names) | |
| # Add chromosome info to label only if no custom display name was provided | |
| if current_genome == args.query: | |
| if args.query not in custom_display_names: | |
| display = f"{display} ({query_chr})" | |
| elif current_genome in comp_regions: | |
| if current_genome not in custom_display_names: | |
| comp_chr_label = comp_regions[current_genome][0] | |
| display = f"{display} ({comp_chr_label})" | |
| layout_lines.append(f"{x_pos:.3f}, {y_pos:.3f}, 0, {ha}, {va}, , {ratio}, {display}, 15") | |
| track_genomes.append(current_genome) | |
| if current_genome == args.query: | |
| query_track_idx = genome_idx | |
| genome_idx += 1 | |
| layout_lines.append("# edges") | |
| for i, g in enumerate(track_genomes): | |
| if i != query_track_idx: | |
| layout_lines.append(f"e, {query_track_idx}, {i}") | |
| elif args.layout: | |
| print(f" Using custom layout: {args.layout} (query genome at top)") | |
| layout_rows = [int(x.strip()) for x in args.layout.split(",")] | |
| num_rows = len(layout_rows) | |
| total_rows = num_rows + 1 | |
| query_display = get_plot_label_name(args.query, custom_display_names) | |
| # Only add chromosome info if no custom display name was provided | |
| if args.query not in custom_display_names: | |
| query_display = f"{query_display} ({query_chr})" | |
| q_ha, q_va = ("left", "center") if total_rows == 1 else ("center", "top") | |
| layout_lines.append(f"0.5, 0.90, 0, {q_ha}, {q_va}, , 1, {query_display}, 15") | |
| row_spacing = 0.60 / (num_rows - 1) if num_rows > 1 else 0.0 | |
| start_y = 0.70 if num_rows > 1 else 0.50 | |
| genome_idx = 0 | |
| for row_idx, genomes_in_row in enumerate(layout_rows): | |
| y_pos = start_y - row_idx * row_spacing | |
| ha, va = ("center", "bottom") if row_idx == num_rows - 1 else ("left", "center") | |
| for col in range(genomes_in_row): | |
| if genome_idx < num_comp: | |
| comp = valid_comps[genome_idx] | |
| comp_display = get_plot_label_name(comp, custom_display_names) | |
| # Add chromosome info only if no custom display name was provided | |
| if comp not in custom_display_names and comp in comp_regions: | |
| comp_chr_label = comp_regions[comp][0] | |
| comp_display = f"{comp_display} ({comp_chr_label})" | |
| x_pos = 0.5 if genomes_in_row == 1 else 0.15 + col * 0.70 / (genomes_in_row - 1) | |
| layout_lines.append(f"{x_pos:.3f}, {y_pos:.3f}, 0, {ha}, {va}, , 1, {comp_display}, 15") | |
| genome_idx += 1 | |
| layout_lines.append("# edges") | |
| for i in range(1, num_comp + 1): | |
| layout_lines.append(f"e, 0, {i}") | |
| else: | |
| print(" Using default single-column layout") | |
| query_display = get_plot_label_name(args.query, custom_display_names) | |
| # Only add chromosome info if no custom display name was provided | |
| if args.query not in custom_display_names: | |
| query_display = f"{query_display} ({query_chr})" | |
| total_rows = num_comp + 1 | |
| q_ha, q_va = ("left", "center") if total_rows == 1 else ("center", "top") | |
| layout_lines.append(f"0.5, 0.90, 0, {q_ha}, {q_va}, , 1, {query_display}, 15") | |
| if num_comp == 1: | |
| spacing, start_y = 0.0, 0.50 | |
| elif num_comp == 2: | |
| spacing, start_y = 0.20, 0.60 | |
| else: | |
| spacing, start_y = 0.65 / (num_comp - 1), 0.70 | |
| for i, comp in enumerate(valid_comps): | |
| comp_display = get_plot_label_name(comp, custom_display_names) | |
| # Add chromosome info only if no custom display name was provided | |
| if comp not in custom_display_names and comp in comp_regions: | |
| comp_chr_label = comp_regions[comp][0] | |
| comp_display = f"{comp_display} ({comp_chr_label})" | |
| y_pos = start_y - i * spacing | |
| ha, va = ("center", "bottom") if i == num_comp - 1 else ("left", "center") | |
| layout_lines.append(f"0.5, {y_pos:.3f}, 0, {ha}, {va}, , 1, {comp_display}, 15") | |
| layout_lines.append("# edges") | |
| for i in range(1, num_comp + 1): | |
| layout_lines.append(f"e, 0, {i}") | |
| (work_dir / "cluster.layout").write_text("\n".join(layout_lines) + "\n") | |
| print(f" Layout file created with {num_comp} comparison genomes") | |
| print() | |
| # ========================================================================= | |
| # STEP 7: Generate plots | |
| # ========================================================================= | |
| print("Step 7: Generating plots...") | |
| # Calculate figure size | |
| fig_width, fig_height = 14, 10 | |
| if args.layout: | |
| layout_rows = [int(x.strip()) for x in args.layout.split(",")] | |
| max_in_row = max(layout_rows) | |
| num_rows = len(layout_rows) | |
| fig_width = {1: 14, 2: 20, 3: 24}.get(max_in_row, 28 if max_in_row >= 4 else 14) | |
| fig_height = {2: 12, 3: 14, 4: 16}.get(num_rows, 18 if num_rows >= 5 else 10) | |
| print(f" Figure size: {fig_width}x{fig_height} (max {max_in_row} genomes per row, {num_rows} rows)") | |
| else: | |
| total_genomes = len(ordered_genomes) | |
| if total_genomes >= 6: | |
| fig_height = 16 | |
| elif total_genomes >= 4: | |
| fig_height = 14 | |
| # Build gene label args | |
| label_args = [] | |
| if args.genelabels and args.genelabelsize > 0: | |
| label_args = [f"--genelabels={args.genelabels}", f"--genelabelsize={args.genelabelsize}"] | |
| print(f" Gene labels enabled: {args.genelabels} (size: {args.genelabelsize})") | |
| # Generate PNG | |
| cmd = [ | |
| sys.executable, "-m", "jcvi.graphics.synteny", | |
| "all_colored.blocks", "all_genomes.bed", "cluster.layout", | |
| "--shadestyle=curve", "--format=png", | |
| f"--figsize={fig_width}x{fig_height}", "--dpi=150" | |
| ] + label_args | |
| run_command(cmd, cwd=work_dir, check=False) | |
| png_out = work_dir / "all_colored.png" | |
| if png_out.is_file(): | |
| png_out.rename(work_dir / "microsynteny_plot.png") | |
| print(" Created: microsynteny_plot.png") | |
| # Generate SVG - rebuild command with svg format | |
| cmd = [ | |
| sys.executable, "-m", "jcvi.graphics.synteny", | |
| "all_colored.blocks", "all_genomes.bed", "cluster.layout", | |
| "--shadestyle=curve", "--format=svg", | |
| f"--figsize={fig_width}x{fig_height}", "--dpi=150" | |
| ] + label_args | |
| run_command(cmd, cwd=work_dir, check=False) | |
| svg_out = work_dir / "all_colored.svg" | |
| if svg_out.is_file(): | |
| svg_out.rename(work_dir / "microsynteny_plot.svg") | |
| print(" Created: microsynteny_plot.svg") | |
| # ========================================================================= | |
| # STEP 7b: Add legend to plots | |
| # ========================================================================= | |
| legend_script = script_dir / "add_legend_to_plot.py" | |
| if legend_script.is_file(): | |
| print("Step 7b: Adding legend to plots...") | |
| run_command([sys.executable, str(legend_script), str(work_dir)], check=False) | |
| print() | |
| print("=" * 42) | |
| print("✓ Complete!") | |
| print("=" * 42) | |
| print() | |
| print("Output files:") | |
| print(f" - {work_dir}/microsynteny_plot.png") | |
| print(f" - {work_dir}/microsynteny_plot.svg") | |
| print(f" - {work_dir}/gene_colors.txt") | |
| print(f" - {work_dir}/gene_summary.csv") | |
| print() | |
if __name__ == "__main__":
    # Run the CLI pipeline only when executed as a script, not on import.
    main()