Spaces:
Sleeping
Sleeping
| #!/usr/bin/env python3 | |
| """ | |
| SQL Catalog Helper for Plant-mSyn Webapp (Hugging Face Edition). | |
| This module provides fast lookup functions using the SQLite metadata catalog | |
| to determine which target genomes have matches for given query genes. | |
| Use this to filter the list of comparison files BEFORE calling the search scripts, | |
| reducing the number of files that need to be scanned. | |
| Usage in webapp: | |
| from sql_catalog_helper import ( | |
| get_target_genomes_for_genes, | |
| get_relevant_comparisons, | |
| is_catalog_available | |
| ) | |
| # Check if catalog exists for a genome | |
| if is_catalog_available(query_genome): | |
| # Get only the genomes that actually have matches | |
| relevant_targets = get_target_genomes_for_genes(query_genome, gene_list) | |
| # Filter comparisons to only relevant ones | |
| filtered_comparisons = [c for c in comparisons if c in relevant_targets] | |
| """ | |
| import sqlite3 | |
| from pathlib import Path | |
| from typing import Dict, List, Optional, Set, Tuple | |
| import os | |
| import sys | |
| # Try to import path_config from Scripts | |
| try: | |
| # When running from webapp | |
| _script_dir = Path(__file__).resolve().parent.parent / "Scripts" | |
| if str(_script_dir) not in sys.path: | |
| sys.path.insert(0, str(_script_dir)) | |
| from path_config import SQL_DIR, SEARCH_CATALOGS_DIR, METADATA_DB_PATH, MCSCAN_RESULTS_DIR, DATA_DIR | |
| from logger import get_logger | |
| except ImportError: | |
| # Fallback: compute paths relative to this file (HF structure) | |
| _webapp_dir = Path(__file__).resolve().parent | |
| _project_dir = _webapp_dir.parent | |
| DATA_DIR = _project_dir / "data" | |
| SQL_DIR = DATA_DIR / "sql" | |
| SEARCH_CATALOGS_DIR = SQL_DIR / "search_catalogs" | |
| METADATA_DB_PATH = SQL_DIR / "plantmsyn_metadata.db" | |
| MCSCAN_RESULTS_DIR = DATA_DIR / "mcscan_results" | |
| # Minimal fallback logger | |
| import logging | |
| def get_logger(name): | |
| return logging.getLogger(name) | |
| # Module logger | |
| logger = get_logger("synteny.sql_catalog") | |
| def is_catalog_available(query_genome: str) -> bool: | |
| """ | |
| Check if a search catalog exists for the given query genome. | |
| Args: | |
| query_genome: The genome name (e.g., 'arabidopsis_thaliana') | |
| Returns: | |
| True if catalog exists and is readable | |
| """ | |
| catalog_path = SEARCH_CATALOGS_DIR / f"{query_genome}.catalog.sqlite" | |
| return catalog_path.exists() and catalog_path.stat().st_size > 0 | |
| def is_metadata_db_available() -> bool: | |
| """Check if the central metadata database exists.""" | |
| return METADATA_DB_PATH.exists() and METADATA_DB_PATH.stat().st_size > 0 | |
| def get_target_genomes_for_genes( | |
| query_genome: str, | |
| gene_ids: List[str] | |
| ) -> Dict[str, Set[str]]: | |
| """ | |
| Find which target genomes have matches for the given genes. | |
| This is the main function for pre-filtering comparisons. | |
| Args: | |
| query_genome: The query genome name | |
| gene_ids: List of gene IDs to look up | |
| Returns: | |
| Dict mapping gene_id -> set of target genome names that have matches | |
| """ | |
| catalog_path = SEARCH_CATALOGS_DIR / f"{query_genome}.catalog.sqlite" | |
| if not catalog_path.exists(): | |
| # Catalog not available, return empty (caller should fall back to scanning all) | |
| return {} | |
| results = {} | |
| try: | |
| conn = sqlite3.connect(str(catalog_path)) | |
| conn.row_factory = sqlite3.Row | |
| for gene_id in gene_ids: | |
| cursor = conn.execute(""" | |
| SELECT target_genome_name | |
| FROM gene_to_run | |
| WHERE query_gene_id = ? | |
| """, (gene_id,)) | |
| results[gene_id] = set(row['target_genome_name'] for row in cursor) | |
| conn.close() | |
| except sqlite3.Error as e: | |
| logger.warning(f"Failed to query catalog for {query_genome}: {e}") | |
| return {} | |
| return results | |
| def get_relevant_comparisons( | |
| query_genome: str, | |
| gene_ids: List[str], | |
| requested_comparisons: List[str] | |
| ) -> Tuple[List[str], Dict[str, Set[str]]]: | |
| """ | |
| Filter the list of comparison genomes to only those that have matches. | |
| This is the main entry point for the webapp to optimize searches. | |
| Args: | |
| query_genome: The query genome name | |
| gene_ids: List of gene IDs being searched | |
| requested_comparisons: List of comparison genome names the user selected | |
| Returns: | |
| Tuple of: | |
| - List of comparison genomes that actually have matches (filtered) | |
| - Dict mapping gene_id -> set of target genomes (for reference) | |
| """ | |
| # Get targets for all genes | |
| gene_targets = get_target_genomes_for_genes(query_genome, gene_ids) | |
| if not gene_targets: | |
| # Catalog not available, return all requested comparisons | |
| return requested_comparisons, {} | |
| # Collect all unique target genomes that have matches for any gene | |
| all_relevant_targets = set() | |
| for targets in gene_targets.values(): | |
| all_relevant_targets.update(targets) | |
| # Filter requested comparisons to only those with matches | |
| filtered = [c for c in requested_comparisons if c in all_relevant_targets] | |
| return filtered, gene_targets | |
| def get_comparison_files( | |
| query_genome: str, | |
| target_genome: str, | |
| file_kinds: Optional[List[str]] = None | |
| ) -> Dict[str, Path]: | |
| """ | |
| Get the file paths for a specific comparison pair. | |
| Args: | |
| query_genome: Query genome name | |
| target_genome: Target genome name | |
| file_kinds: List of file kinds to retrieve (default: all) | |
| Options: 'i1.blocks', 'last.filtered', 'lifted.anchors' | |
| Returns: | |
| Dict mapping file_kind -> Path | |
| """ | |
| if file_kinds is None: | |
| file_kinds = ['i1.blocks', 'last.filtered', 'lifted.anchors'] | |
| if not is_metadata_db_available(): | |
| # Fall back to direct file lookup | |
| return _get_comparison_files_direct(query_genome, target_genome, file_kinds) | |
| try: | |
| conn = sqlite3.connect(str(METADATA_DB_PATH)) | |
| conn.row_factory = sqlite3.Row | |
| cursor = conn.execute(""" | |
| SELECT rf.file_kind, rf.file_path | |
| FROM run_file rf | |
| JOIN comparison_run cr ON rf.run_id = cr.run_id | |
| JOIN genome gq ON cr.query_genome_id = gq.genome_id | |
| JOIN genome gt ON cr.target_genome_id = gt.genome_id | |
| WHERE gq.genome_name = ? AND gt.genome_name = ? | |
| """, (query_genome, target_genome)) | |
| results = {} | |
| for row in cursor: | |
| kind = row['file_kind'] | |
| if kind in file_kinds or any(k in kind for k in file_kinds): | |
| full_path = MCSCAN_RESULTS_DIR / row['file_path'] | |
| if full_path.exists(): | |
| results[kind] = full_path | |
| conn.close() | |
| return results | |
| except sqlite3.Error as e: | |
| logger.warning(f"Failed to query metadata DB: {e}") | |
| return _get_comparison_files_direct(query_genome, target_genome, file_kinds) | |
| def _get_comparison_files_direct( | |
| query_genome: str, | |
| target_genome: str, | |
| file_kinds: List[str] | |
| ) -> Dict[str, Path]: | |
| """Direct file lookup fallback when metadata DB is not available.""" | |
| results = {} | |
| # i1.blocks - directional: query.target.i1.blocks | |
| if 'i1.blocks' in file_kinds: | |
| blocks_path = MCSCAN_RESULTS_DIR / "i1_blocks" / f"{query_genome}.{target_genome}.i1.blocks" | |
| if blocks_path.exists(): | |
| results['i1.blocks'] = blocks_path | |
| # last.filtered - try both orderings | |
| if 'last.filtered' in file_kinds: | |
| for a, b in [(query_genome, target_genome), (target_genome, query_genome)]: | |
| # Try filtered_by_i1 first | |
| filtered_path = MCSCAN_RESULTS_DIR / "last_filtered" / f"{a}.{b}.filtered_by_i1.tsv" | |
| if filtered_path.exists(): | |
| results['last.filtered'] = filtered_path | |
| break | |
| # Then raw last.filtered | |
| last_path = MCSCAN_RESULTS_DIR / "last_filtered" / f"{a}.{b}.last.filtered" | |
| if last_path.exists(): | |
| results['last.filtered'] = last_path | |
| break | |
| # lifted.anchors - try both orderings | |
| if 'lifted.anchors' in file_kinds: | |
| for a, b in [(query_genome, target_genome), (target_genome, query_genome)]: | |
| anchors_path = MCSCAN_RESULTS_DIR / "lifted_anchors" / f"{a}.{b}.lifted.anchors" | |
| if anchors_path.exists(): | |
| results['lifted.anchors'] = anchors_path | |
| break | |
| return results | |
| def get_all_available_genomes() -> List[str]: | |
| """ | |
| Get list of all genomes that have search catalogs available. | |
| Returns: | |
| List of genome names | |
| """ | |
| if not SEARCH_CATALOGS_DIR.exists(): | |
| return [] | |
| return sorted([ | |
| f.stem.replace('.catalog', '') | |
| for f in SEARCH_CATALOGS_DIR.glob("*.catalog.sqlite") | |
| ]) | |
| def get_genome_catalog_stats(query_genome: str) -> Optional[Dict]: | |
| """ | |
| Get statistics about a genome's search catalog. | |
| Args: | |
| query_genome: The genome name | |
| Returns: | |
| Dict with stats or None if catalog not available | |
| """ | |
| catalog_path = SEARCH_CATALOGS_DIR / f"{query_genome}.catalog.sqlite" | |
| if not catalog_path.exists(): | |
| return None | |
| try: | |
| conn = sqlite3.connect(str(catalog_path)) | |
| cursor = conn.execute(""" | |
| SELECT | |
| COUNT(DISTINCT query_gene_id) as total_genes, | |
| COUNT(*) as total_mappings, | |
| COUNT(DISTINCT target_genome_name) as target_genomes | |
| FROM gene_to_run | |
| """) | |
| row = cursor.fetchone() | |
| stats = { | |
| 'genome': query_genome, | |
| 'total_genes_with_matches': row[0], | |
| 'total_mappings': row[1], | |
| 'target_genomes': row[2], | |
| 'catalog_size_mb': catalog_path.stat().st_size / 1024 / 1024 | |
| } | |
| conn.close() | |
| return stats | |
| except sqlite3.Error: | |
| return None | |
| def prefilter_search_comparisons( | |
| query_genome: str, | |
| gene_ids: List[str], | |
| comparisons: List[str], | |
| min_genes_with_matches: int = 1 | |
| ) -> Tuple[List[str], Dict]: | |
| """ | |
| Pre-filter comparisons for a search request. | |
| This is designed to be called by the webapp before invoking search_synteny_hits.py. | |
| It returns only the comparisons worth searching, along with diagnostic info. | |
| Args: | |
| query_genome: Query genome name | |
| gene_ids: List of genes to search | |
| comparisons: List of requested comparison genomes | |
| min_genes_with_matches: Minimum genes that must have matches in a comparison | |
| Returns: | |
| Tuple of: | |
| - Filtered list of comparisons | |
| - Dict with diagnostic info: { | |
| 'catalog_used': bool, | |
| 'original_count': int, | |
| 'filtered_count': int, | |
| 'genes_checked': int, | |
| 'skipped_comparisons': list (those with no matches) | |
| } | |
| """ | |
| diagnostics = { | |
| 'catalog_used': False, | |
| 'original_count': len(comparisons), | |
| 'filtered_count': len(comparisons), | |
| 'genes_checked': len(gene_ids), | |
| 'skipped_comparisons': [] | |
| } | |
| if not is_catalog_available(query_genome): | |
| # No catalog, return all comparisons | |
| return comparisons, diagnostics | |
| diagnostics['catalog_used'] = True | |
| # Get targets for all genes | |
| gene_targets = get_target_genomes_for_genes(query_genome, gene_ids) | |
| # Count how many genes have matches in each comparison genome | |
| comparison_gene_counts = {} | |
| for comp in comparisons: | |
| count = sum(1 for gene, targets in gene_targets.items() if comp in targets) | |
| comparison_gene_counts[comp] = count | |
| # Filter: keep comparisons with at least min_genes_with_matches | |
| filtered = [] | |
| skipped = [] | |
| for comp in comparisons: | |
| if comparison_gene_counts.get(comp, 0) >= min_genes_with_matches: | |
| filtered.append(comp) | |
| else: | |
| skipped.append(comp) | |
| diagnostics['filtered_count'] = len(filtered) | |
| diagnostics['skipped_comparisons'] = skipped | |
| return filtered, diagnostics | |
| # Quick test when run directly | |
| if __name__ == '__main__': | |
| logger.info("SQL Catalog Helper - Quick Test") | |
| logger.info("=" * 50) | |
| logger.info(f"\nMetadata DB available: {is_metadata_db_available()}") | |
| logger.info(f"Metadata DB path: {METADATA_DB_PATH}") | |
| genomes = get_all_available_genomes() | |
| logger.info(f"\nGenomes with catalogs: {len(genomes)}") | |
| if genomes: | |
| test_genome = 'arabidopsis_thaliana' if 'arabidopsis_thaliana' in genomes else genomes[0] | |
| logger.info(f"\nTesting with: {test_genome}") | |
| stats = get_genome_catalog_stats(test_genome) | |
| if stats: | |
| logger.info(f" Genes with matches: {stats['total_genes_with_matches']}") | |
| logger.info(f" Total mappings: {stats['total_mappings']}") | |
| logger.info(f" Target genomes: {stats['target_genomes']}") | |
| # Test gene lookup | |
| test_genes = ['AT1G01010', 'AT1G01020', 'AT2G17930'] | |
| targets = get_target_genomes_for_genes(test_genome, test_genes) | |
| logger.info(f"\n Gene lookup test ({len(test_genes)} genes):") | |
| for gene, tgts in targets.items(): | |
| logger.info(f" {gene}: {len(tgts)} targets") | |