plant-msyn / sql_catalog_helper.py
Yoshigold's picture
Update webapp with improved logging and SQL search
9078740 verified
#!/usr/bin/env python3
"""
SQL Catalog Helper for Plant-mSyn Webapp (Hugging Face Edition).
This module provides fast lookup functions using the SQLite metadata catalog
to determine which target genomes have matches for given query genes.
Use this to filter the list of comparison files BEFORE calling the search scripts,
reducing the number of files that need to be scanned.
Usage in webapp:
from sql_catalog_helper import (
get_target_genomes_for_genes,
get_relevant_comparisons,
is_catalog_available
)
# Check if catalog exists for a genome
if is_catalog_available(query_genome):
# Get only the genomes that actually have matches
relevant_targets = get_target_genomes_for_genes(query_genome, gene_list)
# Filter comparisons to only relevant ones
filtered_comparisons = [c for c in comparisons if c in relevant_targets]
"""
import sqlite3
from pathlib import Path
from typing import Dict, List, Optional, Set, Tuple
import os
import sys
# Try to import path_config from Scripts
try:
# When running from webapp
_script_dir = Path(__file__).resolve().parent.parent / "Scripts"
if str(_script_dir) not in sys.path:
sys.path.insert(0, str(_script_dir))
from path_config import SQL_DIR, SEARCH_CATALOGS_DIR, METADATA_DB_PATH, MCSCAN_RESULTS_DIR, DATA_DIR
from logger import get_logger
except ImportError:
# Fallback: compute paths relative to this file (HF structure)
_webapp_dir = Path(__file__).resolve().parent
_project_dir = _webapp_dir.parent
DATA_DIR = _project_dir / "data"
SQL_DIR = DATA_DIR / "sql"
SEARCH_CATALOGS_DIR = SQL_DIR / "search_catalogs"
METADATA_DB_PATH = SQL_DIR / "plantmsyn_metadata.db"
MCSCAN_RESULTS_DIR = DATA_DIR / "mcscan_results"
# Minimal fallback logger
import logging
def get_logger(name):
return logging.getLogger(name)
# Module logger
logger = get_logger("synteny.sql_catalog")
def is_catalog_available(query_genome: str) -> bool:
"""
Check if a search catalog exists for the given query genome.
Args:
query_genome: The genome name (e.g., 'arabidopsis_thaliana')
Returns:
True if catalog exists and is readable
"""
catalog_path = SEARCH_CATALOGS_DIR / f"{query_genome}.catalog.sqlite"
return catalog_path.exists() and catalog_path.stat().st_size > 0
def is_metadata_db_available() -> bool:
"""Check if the central metadata database exists."""
return METADATA_DB_PATH.exists() and METADATA_DB_PATH.stat().st_size > 0
def get_target_genomes_for_genes(
query_genome: str,
gene_ids: List[str]
) -> Dict[str, Set[str]]:
"""
Find which target genomes have matches for the given genes.
This is the main function for pre-filtering comparisons.
Args:
query_genome: The query genome name
gene_ids: List of gene IDs to look up
Returns:
Dict mapping gene_id -> set of target genome names that have matches
"""
catalog_path = SEARCH_CATALOGS_DIR / f"{query_genome}.catalog.sqlite"
if not catalog_path.exists():
# Catalog not available, return empty (caller should fall back to scanning all)
return {}
results = {}
try:
conn = sqlite3.connect(str(catalog_path))
conn.row_factory = sqlite3.Row
for gene_id in gene_ids:
cursor = conn.execute("""
SELECT target_genome_name
FROM gene_to_run
WHERE query_gene_id = ?
""", (gene_id,))
results[gene_id] = set(row['target_genome_name'] for row in cursor)
conn.close()
except sqlite3.Error as e:
logger.warning(f"Failed to query catalog for {query_genome}: {e}")
return {}
return results
def get_relevant_comparisons(
query_genome: str,
gene_ids: List[str],
requested_comparisons: List[str]
) -> Tuple[List[str], Dict[str, Set[str]]]:
"""
Filter the list of comparison genomes to only those that have matches.
This is the main entry point for the webapp to optimize searches.
Args:
query_genome: The query genome name
gene_ids: List of gene IDs being searched
requested_comparisons: List of comparison genome names the user selected
Returns:
Tuple of:
- List of comparison genomes that actually have matches (filtered)
- Dict mapping gene_id -> set of target genomes (for reference)
"""
# Get targets for all genes
gene_targets = get_target_genomes_for_genes(query_genome, gene_ids)
if not gene_targets:
# Catalog not available, return all requested comparisons
return requested_comparisons, {}
# Collect all unique target genomes that have matches for any gene
all_relevant_targets = set()
for targets in gene_targets.values():
all_relevant_targets.update(targets)
# Filter requested comparisons to only those with matches
filtered = [c for c in requested_comparisons if c in all_relevant_targets]
return filtered, gene_targets
def get_comparison_files(
query_genome: str,
target_genome: str,
file_kinds: Optional[List[str]] = None
) -> Dict[str, Path]:
"""
Get the file paths for a specific comparison pair.
Args:
query_genome: Query genome name
target_genome: Target genome name
file_kinds: List of file kinds to retrieve (default: all)
Options: 'i1.blocks', 'last.filtered', 'lifted.anchors'
Returns:
Dict mapping file_kind -> Path
"""
if file_kinds is None:
file_kinds = ['i1.blocks', 'last.filtered', 'lifted.anchors']
if not is_metadata_db_available():
# Fall back to direct file lookup
return _get_comparison_files_direct(query_genome, target_genome, file_kinds)
try:
conn = sqlite3.connect(str(METADATA_DB_PATH))
conn.row_factory = sqlite3.Row
cursor = conn.execute("""
SELECT rf.file_kind, rf.file_path
FROM run_file rf
JOIN comparison_run cr ON rf.run_id = cr.run_id
JOIN genome gq ON cr.query_genome_id = gq.genome_id
JOIN genome gt ON cr.target_genome_id = gt.genome_id
WHERE gq.genome_name = ? AND gt.genome_name = ?
""", (query_genome, target_genome))
results = {}
for row in cursor:
kind = row['file_kind']
if kind in file_kinds or any(k in kind for k in file_kinds):
full_path = MCSCAN_RESULTS_DIR / row['file_path']
if full_path.exists():
results[kind] = full_path
conn.close()
return results
except sqlite3.Error as e:
logger.warning(f"Failed to query metadata DB: {e}")
return _get_comparison_files_direct(query_genome, target_genome, file_kinds)
def _get_comparison_files_direct(
query_genome: str,
target_genome: str,
file_kinds: List[str]
) -> Dict[str, Path]:
"""Direct file lookup fallback when metadata DB is not available."""
results = {}
# i1.blocks - directional: query.target.i1.blocks
if 'i1.blocks' in file_kinds:
blocks_path = MCSCAN_RESULTS_DIR / "i1_blocks" / f"{query_genome}.{target_genome}.i1.blocks"
if blocks_path.exists():
results['i1.blocks'] = blocks_path
# last.filtered - try both orderings
if 'last.filtered' in file_kinds:
for a, b in [(query_genome, target_genome), (target_genome, query_genome)]:
# Try filtered_by_i1 first
filtered_path = MCSCAN_RESULTS_DIR / "last_filtered" / f"{a}.{b}.filtered_by_i1.tsv"
if filtered_path.exists():
results['last.filtered'] = filtered_path
break
# Then raw last.filtered
last_path = MCSCAN_RESULTS_DIR / "last_filtered" / f"{a}.{b}.last.filtered"
if last_path.exists():
results['last.filtered'] = last_path
break
# lifted.anchors - try both orderings
if 'lifted.anchors' in file_kinds:
for a, b in [(query_genome, target_genome), (target_genome, query_genome)]:
anchors_path = MCSCAN_RESULTS_DIR / "lifted_anchors" / f"{a}.{b}.lifted.anchors"
if anchors_path.exists():
results['lifted.anchors'] = anchors_path
break
return results
def get_all_available_genomes() -> List[str]:
"""
Get list of all genomes that have search catalogs available.
Returns:
List of genome names
"""
if not SEARCH_CATALOGS_DIR.exists():
return []
return sorted([
f.stem.replace('.catalog', '')
for f in SEARCH_CATALOGS_DIR.glob("*.catalog.sqlite")
])
def get_genome_catalog_stats(query_genome: str) -> Optional[Dict]:
"""
Get statistics about a genome's search catalog.
Args:
query_genome: The genome name
Returns:
Dict with stats or None if catalog not available
"""
catalog_path = SEARCH_CATALOGS_DIR / f"{query_genome}.catalog.sqlite"
if not catalog_path.exists():
return None
try:
conn = sqlite3.connect(str(catalog_path))
cursor = conn.execute("""
SELECT
COUNT(DISTINCT query_gene_id) as total_genes,
COUNT(*) as total_mappings,
COUNT(DISTINCT target_genome_name) as target_genomes
FROM gene_to_run
""")
row = cursor.fetchone()
stats = {
'genome': query_genome,
'total_genes_with_matches': row[0],
'total_mappings': row[1],
'target_genomes': row[2],
'catalog_size_mb': catalog_path.stat().st_size / 1024 / 1024
}
conn.close()
return stats
except sqlite3.Error:
return None
def prefilter_search_comparisons(
query_genome: str,
gene_ids: List[str],
comparisons: List[str],
min_genes_with_matches: int = 1
) -> Tuple[List[str], Dict]:
"""
Pre-filter comparisons for a search request.
This is designed to be called by the webapp before invoking search_synteny_hits.py.
It returns only the comparisons worth searching, along with diagnostic info.
Args:
query_genome: Query genome name
gene_ids: List of genes to search
comparisons: List of requested comparison genomes
min_genes_with_matches: Minimum genes that must have matches in a comparison
Returns:
Tuple of:
- Filtered list of comparisons
- Dict with diagnostic info: {
'catalog_used': bool,
'original_count': int,
'filtered_count': int,
'genes_checked': int,
'skipped_comparisons': list (those with no matches)
}
"""
diagnostics = {
'catalog_used': False,
'original_count': len(comparisons),
'filtered_count': len(comparisons),
'genes_checked': len(gene_ids),
'skipped_comparisons': []
}
if not is_catalog_available(query_genome):
# No catalog, return all comparisons
return comparisons, diagnostics
diagnostics['catalog_used'] = True
# Get targets for all genes
gene_targets = get_target_genomes_for_genes(query_genome, gene_ids)
# Count how many genes have matches in each comparison genome
comparison_gene_counts = {}
for comp in comparisons:
count = sum(1 for gene, targets in gene_targets.items() if comp in targets)
comparison_gene_counts[comp] = count
# Filter: keep comparisons with at least min_genes_with_matches
filtered = []
skipped = []
for comp in comparisons:
if comparison_gene_counts.get(comp, 0) >= min_genes_with_matches:
filtered.append(comp)
else:
skipped.append(comp)
diagnostics['filtered_count'] = len(filtered)
diagnostics['skipped_comparisons'] = skipped
return filtered, diagnostics
# Quick test when run directly
if __name__ == '__main__':
logger.info("SQL Catalog Helper - Quick Test")
logger.info("=" * 50)
logger.info(f"\nMetadata DB available: {is_metadata_db_available()}")
logger.info(f"Metadata DB path: {METADATA_DB_PATH}")
genomes = get_all_available_genomes()
logger.info(f"\nGenomes with catalogs: {len(genomes)}")
if genomes:
test_genome = 'arabidopsis_thaliana' if 'arabidopsis_thaliana' in genomes else genomes[0]
logger.info(f"\nTesting with: {test_genome}")
stats = get_genome_catalog_stats(test_genome)
if stats:
logger.info(f" Genes with matches: {stats['total_genes_with_matches']}")
logger.info(f" Total mappings: {stats['total_mappings']}")
logger.info(f" Target genomes: {stats['target_genomes']}")
# Test gene lookup
test_genes = ['AT1G01010', 'AT1G01020', 'AT2G17930']
targets = get_target_genomes_for_genes(test_genome, test_genes)
logger.info(f"\n Gene lookup test ({len(test_genes)} genes):")
for gene, tgts in targets.items():
logger.info(f" {gene}: {len(tgts)} targets")