Spaces:
Sleeping
Sleeping
#!/usr/bin/env python3
"""
Plant-mSyn - Plant Microsynteny Web Application (Hugging Face Edition)

Flask backend for serving the microsynteny plotting interface.
This version is adapted for Hugging Face Spaces deployment; data is loaded
from a separate Hugging Face Dataset repository.
"""
import os
import csv
import subprocess
import tempfile
import shutil
import json
import uuid
import threading
import time
import sys
import random
import string
from datetime import datetime, timedelta
from collections import defaultdict

from flask import Flask, jsonify, request, send_file, render_template
from flask_cors import CORS
from flask_limiter import Limiter
from flask_limiter.util import get_remote_address
from werkzeug.utils import secure_filename
# Optional dependency: huggingface_hub lets us pull the dataset repo at startup.
try:
    from huggingface_hub import snapshot_download, hf_hub_download
    HF_HUB_AVAILABLE = True
except ImportError:
    HF_HUB_AVAILABLE = False

# Optional dependency: python-dotenv loads a local .env during development.
try:
    from dotenv import load_dotenv
    load_dotenv()
except ImportError:
    pass  # python-dotenv not installed, use system env vars only
# ============================================================================
# Hugging Face Configuration
# ============================================================================

# Dataset repository ID (change this to your actual dataset repo)
HF_DATASET_REPO = os.environ.get('HF_DATASET_REPO', 'Yoshigold/plant-msyn-data')

# Base directories - adapted for Hugging Face Spaces.
# On HF Spaces app.py lives at /app/app.py (APP_DIR == /app); in local
# development it lives at huggingface/webapp/app.py.
APP_DIR = os.path.dirname(os.path.abspath(__file__))

# PROJECT_DIR is the parent of webapp/ (the huggingface/ folder) so that
# path_config.py in Scripts/ computes paths consistently. On HF Spaces
# everything is flattened into /app, so using APP_DIR itself is fine there.
PROJECT_DIR = os.path.dirname(APP_DIR) if os.path.basename(APP_DIR) == 'webapp' else APP_DIR
BASE_DIR = PROJECT_DIR


def _compute_data_dir():
    """Compute DATA_DIR, honouring the PLANTMSYN_DATA_DIR override.

    Resolution order:
      1. PLANTMSYN_DATA_DIR environment variable, if set.
      2. A 'data' folder that is a sibling of webapp/ (local dev layout).
      3. APP_DIR/data (flattened HF Spaces layout, i.e. /app/data).
    """
    override = os.environ.get('PLANTMSYN_DATA_DIR')
    if override:
        return override
    local_dev_data = os.path.join(PROJECT_DIR, 'data')
    if os.path.isdir(local_dev_data):
        return local_dev_data
    return os.path.join(APP_DIR, 'data')


DATA_DIR = _compute_data_dir()

# Export the resolved paths so helper scripts compute the same directories.
os.environ['PLANTMSYN_DATA_DIR'] = DATA_DIR
os.environ['PLANTMSYN_PROJECT_DIR'] = PROJECT_DIR
def initialize_data():
    """Download the dataset from the HF Hub if it is not already on disk.

    The presence of DATA_DIR/mcscan_results is the marker that the dataset
    has been materialised. Download failures are logged but non-fatal so the
    app can still start (with degraded functionality).
    """
    mcscan_dir = os.path.join(DATA_DIR, 'mcscan_results')
    if HF_HUB_AVAILABLE and not os.path.exists(mcscan_dir):
        print(f"Downloading dataset from {HF_DATASET_REPO} to {DATA_DIR}...")
        try:
            snapshot_download(
                repo_id=HF_DATASET_REPO,
                repo_type='dataset',
                local_dir=DATA_DIR,
                cache_dir=None,  # Don't use cache, download directly
            )
            print(f"Dataset downloaded to {DATA_DIR}")
        except Exception as e:
            print(f"Failed to download dataset: {e}")
            print("App will continue but may not have access to data files")
    else:
        # NOTE(review): indentation was lost in the source paste; the
        # found/not-found report is assumed to belong to this branch.
        print(f"Using existing data directory: {DATA_DIR}")
        if os.path.exists(mcscan_dir):
            print(f"MCscan results found at: {mcscan_dir}")
        else:
            print(f"WARNING: MCscan results NOT found at: {mcscan_dir}")


# Initialize data on startup
initialize_data()
# Scripts directory - on HF Spaces the helper scripts sit next to app.py.
SCRIPTS_PATH = APP_DIR
if SCRIPTS_PATH not in sys.path:
    sys.path.insert(0, SCRIPTS_PATH)

from genome_config import (
    GENOME_DISPLAY_NAMES,
    SHORT_DISPLAY_NAMES,
    EXAMPLE_GENE_IDS,
    get_genome_display_name
)

# Structured logging (replaces ad-hoc print statements).
from logger import get_webapp_logger
logger = get_webapp_logger()

# Centralized user-facing error message utilities.
from error_messages import extract_user_error_from_log
# ============================================================================
# Analytics DISABLED for Hugging Face Spaces
# ============================================================================
# Email-based analytics reports do not work on HF Spaces; rely on the
# Space's built-in analytics for usage metrics instead.
ANALYTICS_AVAILABLE = False


def record_event(*args, **kwargs):
    """No-op stand-in for the analytics recorder (analytics disabled)."""
    pass


# Feature constants (kept so code that references them keeps working)
FEATURE_PLOT = 'plot'
FEATURE_PLOT_TWEAKS = 'plot_tweaks'
FEATURE_DISCOVERY = 'discovery'
FEATURE_ADVANCED_SEARCH = 'advanced_search'
FEATURE_CUSTOM_GENOME = 'custom_genome'
FEATURE_CUSTOM_SYNTENY = 'custom_synteny'
# Fast pre-filtering of searches via the SQL catalog helper (optional).
try:
    from sql_catalog_helper import (
        is_catalog_available,
        get_target_genomes_for_genes,
        prefilter_search_comparisons
    )
    SQL_CATALOG_AVAILABLE = True
except ImportError:
    SQL_CATALOG_AVAILABLE = False
    logger.warning("SQL catalog helper not available, searches will scan all files")
app = Flask(__name__)
CORS(app)

# Rate limiting - 5 uploads per IP per hour
limiter = Limiter(
    app=app,
    key_func=get_remote_address,
    default_limits=[],
    storage_uri="memory://"
)


def ratelimit_handler(e):
    """Return a JSON 429 response when the rate limit is exceeded.

    NOTE(review): no @app.errorhandler(429) registration is visible in this
    file view - confirm the handler is attached to the app elsewhere.
    """
    return jsonify({
        'success': False,
        'error': 'You have exceeded the upload limit of 5 per hour. Please wait 1 hour before trying again.'
    }), 429
# ============================================================================
# MCscan Job Queue System - Limit Concurrent Runs
# ============================================================================

# Semaphore to limit concurrent MCscan jobs (only 1 at a time)
MCSCAN_JOB_SEMAPHORE = threading.Semaphore(1)

# Track queued jobs for status reporting
_job_queue_status = {
    'current_job': None,    # run_key of currently running job
    'queue_position': {},   # run_key -> queue position (0 = running)
    'queue_lock': threading.Lock()
}

# IP-based run tracking (limit MCscan runs per IP per 3 hours)
_ip_run_tracker = {
    'runs': defaultdict(list),  # IP -> list of run timestamps
    'lock': threading.Lock()
}
MAX_RUNS_PER_IP = 5          # Maximum MCscan runs per IP per 3-hour window
RUN_LIMIT_WINDOW_HOURS = 3   # Time window for run limit

# Name length limits
MAX_NAME_LENGTH = 100  # Maximum characters for project names, run names, display names


def validate_name_length(name, field_name='Name'):
    """Validate that a name doesn't exceed the maximum length.

    Returns (valid, error_msg); error_msg is None when valid. Empty/None
    names pass here - presence checks are done by the callers.
    """
    if name and len(name) > MAX_NAME_LENGTH:
        return False, f'{field_name} must be {MAX_NAME_LENGTH} characters or less (currently {len(name)} characters)'
    return True, None


def check_ip_run_limit(ip_address):
    """Check if IP has exceeded run limit. Returns (allowed, message)."""
    with _ip_run_tracker['lock']:
        now = datetime.now()
        cutoff = now - timedelta(hours=RUN_LIMIT_WINDOW_HOURS)
        # Drop timestamps that have fallen out of the sliding window
        _ip_run_tracker['runs'][ip_address] = [
            ts for ts in _ip_run_tracker['runs'][ip_address]
            if ts > cutoff
        ]
        runs_in_window = len(_ip_run_tracker['runs'][ip_address])
        if runs_in_window >= MAX_RUNS_PER_IP:
            oldest = min(_ip_run_tracker['runs'][ip_address])
            wait_time = oldest + timedelta(hours=RUN_LIMIT_WINDOW_HOURS) - now
            hours = int(wait_time.total_seconds() // 3600)
            minutes = int((wait_time.total_seconds() % 3600) // 60)
            return False, f'You have reached the limit of {MAX_RUNS_PER_IP} MCscan runs per {RUN_LIMIT_WINDOW_HOURS} hours. Please wait {hours}h {minutes}m.'
        return True, f'{MAX_RUNS_PER_IP - runs_in_window} runs remaining in current {RUN_LIMIT_WINDOW_HOURS}-hour window'


def record_ip_run(ip_address):
    """Record that an IP started a run."""
    with _ip_run_tracker['lock']:
        _ip_run_tracker['runs'][ip_address].append(datetime.now())


def get_queue_position(run_key):
    """Get current queue position for a job (0 = running, >0 = waiting, -1 = unknown)."""
    with _job_queue_status['queue_lock']:
        return _job_queue_status['queue_position'].get(run_key, -1)


def acquire_job_slot(run_key, timeout=None):
    """Try to acquire the single MCscan slot. Returns True if acquired.

    Blocks until the slot is free (or `timeout` seconds pass). While waiting,
    the job is tracked in the queue-position map so clients can poll
    get_queue_position().
    """
    # Register the job at the back of the queue if not already tracked
    with _job_queue_status['queue_lock']:
        if run_key not in _job_queue_status['queue_position']:
            _job_queue_status['queue_position'][run_key] = len(_job_queue_status['queue_position'])
    acquired = MCSCAN_JOB_SEMAPHORE.acquire(blocking=True, timeout=timeout)
    with _job_queue_status['queue_lock']:
        if acquired:
            _job_queue_status['current_job'] = run_key
            _job_queue_status['queue_position'][run_key] = 0
            # Everyone still waiting moves up one position
            for key in list(_job_queue_status['queue_position'].keys()):
                if key != run_key and _job_queue_status['queue_position'][key] > 0:
                    _job_queue_status['queue_position'][key] -= 1
        else:
            # FIX: on acquire timeout the original left a stale queue_position
            # entry behind, inflating positions reported for later jobs.
            _job_queue_status['queue_position'].pop(run_key, None)
    return acquired


def release_job_slot(run_key):
    """Release the MCscan job slot and clear this job's queue tracking."""
    with _job_queue_status['queue_lock']:
        if _job_queue_status['current_job'] == run_key:
            _job_queue_status['current_job'] = None
        if run_key in _job_queue_status['queue_position']:
            del _job_queue_status['queue_position'][run_key]
    MCSCAN_JOB_SEMAPHORE.release()
# ============================================================================
# Configuration (Hugging Face Spaces Adapted)
# ============================================================================

# Auto-cleanup settings
CUSTOM_GENOME_RETENTION_DAYS = 14  # Delete custom genomes after 14 days
OUTPUT_RETENTION_HOURS = 24        # Delete generated plots/tables after 24 hours

# SCRIPT_DIR points at the Scripts/ folder when it exists (local dev layout)
# and otherwise at the flattened project root (/app on HF Spaces).
SCRIPT_DIR = os.path.join(PROJECT_DIR, 'Scripts') if os.path.isdir(os.path.join(PROJECT_DIR, 'Scripts')) else PROJECT_DIR
SCRIPTS_DIR = SCRIPT_DIR  # Alias for compatibility

# Data paths - point to HF dataset folder structure
ANNOTATIONS_DIR = os.path.join(DATA_DIR, 'annotations')    # Genome annotations folder
OUTPUT_DIR = os.path.join(APP_DIR, 'Microsynteny_plots')   # Main output folder

# MCscan results directory - in data folder for HF
MCSCAN_RESULTS_DIR = os.path.join(DATA_DIR, 'mcscan_results')
CUSTOM_META_DIR = os.path.join(MCSCAN_RESULTS_DIR, 'custom_meta')          # Custom genome metadata storage
CUSTOM_TEMP_DIR = os.path.join(tempfile.gettempdir(), 'plantmsyn_custom')  # Temp dir for processing

# Upload configuration - file size limits balancing usability with abuse protection
MAX_GFF3_SIZE = 1024 * 1024 * 1024      # 1 GB - GFF3 annotation files can be large
MAX_PEP_SIZE = 1024 * 1024 * 1024       # 1 GB - protein FASTA files (large genomes)
MAX_BED_SIZE = 200 * 1024 * 1024        # 200 MB - BED files (large genomes)
MAX_ANNOTATION_SIZE = 50 * 1024 * 1024  # 50 MB - custom annotation TSV files
ALLOWED_EXTENSIONS = {'gff3', 'gff', 'pep', 'fa', 'fasta', 'faa'}

# MCscan job timeout (5 hours max)
MCSCAN_JOB_TIMEOUT_SECONDS = 5 * 60 * 60  # 5 hours = 18000 seconds

# Python binary - overridable via env var for cloud deployments; falls back
# to the interpreter running this app.
PYTHON_BIN = os.environ.get('PYTHON_BIN', sys.executable)
# ============================================================================
# Input Validation / Sanitization
# ============================================================================
import re

# Regex for valid gene IDs: letters, numbers, underscores, dots, hyphens, colons
# Examples: AT1G01010, HORVU1Hr1G000010, Glyma.01G000100, LOC_Os01g01010
GENE_ID_PATTERN = re.compile(r'^[A-Za-z0-9_.\-:]+$')
MAX_GENE_ID_LENGTH = 100
MAX_GENES_PER_REQUEST = 50  # Limit number of genes in a single request


def is_valid_gene_id(gene_id):
    """Return True when gene_id is a non-empty string of allowed characters
    no longer than MAX_GENE_ID_LENGTH."""
    if not gene_id or not isinstance(gene_id, str):
        return False
    return len(gene_id) <= MAX_GENE_ID_LENGTH and bool(GENE_ID_PATTERN.match(gene_id))


def sanitize_gene_ids(genes):
    """Validate and sanitize a list of gene IDs.

    Returns (valid_genes, error_message):
      - valid_genes: list of validated gene IDs, or None on error
      - error_message: description of the problem, or None when valid
    """
    if not genes:
        return None, 'No genes provided'
    if not isinstance(genes, list):
        return None, 'Genes must be a list'
    if len(genes) > MAX_GENES_PER_REQUEST:
        return None, f'Too many genes ({len(genes)}). Maximum is {MAX_GENES_PER_REQUEST}'
    cleaned = [str(g).strip() for g in genes]
    # Truncate rejected IDs so the error message stays safe to display
    rejected = [g[:50] for g in cleaned if not is_valid_gene_id(g)]
    if rejected:
        sample = ', '.join(rejected[:3])
        if len(rejected) > 3:
            sample += f' (and {len(rejected) - 3} more)'
        return None, f'Invalid gene ID format: {sample}. Use only letters, numbers, underscores, dots, hyphens, colons.'
    return cleaned, None
# ============================================================================
# Auto-Discovery of Available Genomes
# ============================================================================
def discover_available_genomes():
    """Load database genomes from the whitelist file.

    Reads bed_files/database_genomes.txt to decide which genomes are official
    database genomes (vs user-uploaded custom ones), which keeps user uploads
    out of the public genome dropdown. Add a genome name to
    database_genomes.txt to publish it. Falls back to scanning every .bed
    file when the whitelist is missing or unreadable.
    """
    bed_files_dir = os.path.join(MCSCAN_RESULTS_DIR, 'bed_files')
    whitelist_file = os.path.join(bed_files_dir, 'database_genomes.txt')
    if not os.path.exists(whitelist_file):
        # Legacy behavior: no whitelist, expose everything with a bed file
        logger.warning("database_genomes.txt not found, falling back to scanning all bed files")
        return sorted(_scan_all_bed_files(bed_files_dir))
    genomes = []
    try:
        with open(whitelist_file, 'r') as f:
            for raw in f:
                name = raw.strip()
                if not name or name.startswith('#'):
                    continue  # skip blank lines and comments
                # Only accept whitelist entries whose bed file really exists
                if os.path.exists(os.path.join(bed_files_dir, f'{name}.bed')):
                    genomes.append(name)
                else:
                    logger.warning(f"Genome in whitelist but no bed file found: {name}")
    except Exception as e:
        logger.error(f"Failed to read database_genomes.txt: {e}")
        # Fallback to scanning all bed files if whitelist read fails
        genomes = _scan_all_bed_files(bed_files_dir)
    return sorted(genomes)
| def _scan_all_bed_files(bed_files_dir): | |
| """Fallback function to scan all bed files in directory""" | |
| genomes = [] | |
| if os.path.exists(bed_files_dir): | |
| for f in os.listdir(bed_files_dir): | |
| if f.endswith('.bed'): | |
| genome_name = f[:-4] # Remove .bed extension | |
| genomes.append(genome_name) | |
| return genomes | |
# GENOME_DISPLAY_NAMES, SHORT_DISPLAY_NAMES, EXAMPLE_GENE_IDS and the helper
# functions are imported from genome_config.py in the Scripts folder.

# Auto-discover available genomes from the bed_files folder at import time.
AVAILABLE_GENOMES = discover_available_genomes()

# Cache of per-genome annotation dicts (genome name -> {gene_id: description}).
_annotation_cache = {}
# ============================================================================
# Auto-Cleanup Scheduler for Custom Genomes
# ============================================================================
def cleanup_mcscan_results_files(manifest):
    """Delete the custom-genome files listed in a manifest from MCSCAN_RESULTS_DIR.

    Returns the number of files actually removed; missing files are skipped
    and per-file failures are logged rather than raised.
    """
    removed = 0
    for relative_path in manifest.get('mcscan_results_files', []):
        full_path = os.path.join(MCSCAN_RESULTS_DIR, relative_path)
        if not os.path.exists(full_path):
            continue
        try:
            os.remove(full_path)
            removed += 1
            logger.info(f"Cleanup: Deleted Mcscan file: {relative_path}")
        except Exception as e:
            logger.error(f"Cleanup: Failed to delete {relative_path}: {e}")
    return removed
def cleanup_old_custom_genomes():
    """Delete custom genome metadata older than CUSTOM_GENOME_RETENTION_DAYS.

    Also removes the files each genome's manifest lists from the main
    Mcscan_results folder. Returns the number of metadata folders deleted.
    """
    if not os.path.exists(CUSTOM_META_DIR):
        return 0
    deleted_count = 0
    files_deleted = 0
    cutoff_time = datetime.now() - timedelta(days=CUSTOM_GENOME_RETENTION_DAYS)
    for run_key in os.listdir(CUSTOM_META_DIR):
        run_dir = os.path.join(CUSTOM_META_DIR, run_key)
        if not os.path.isdir(run_dir):
            continue
        should_delete = False
        manifest_file = os.path.join(run_dir, 'manifest.json')
        manifest = {}
        # Prefer the manifest's created_at timestamp for age determination
        if os.path.exists(manifest_file):
            try:
                with open(manifest_file, 'r') as f:
                    manifest = json.load(f)
                created_at = manifest.get('created_at', '')
                if created_at:
                    # Parse ISO format datetime (tolerating a trailing 'Z')
                    created_dt = datetime.fromisoformat(created_at.replace('Z', '+00:00'))
                    if created_dt.replace(tzinfo=None) < cutoff_time:
                        should_delete = True
            except (OSError, ValueError) as e:
                # FIX: was a bare `except:` - narrowed to unreadable/corrupt
                # manifests (json.JSONDecodeError subclasses ValueError) and
                # logged instead of silently swallowing everything.
                logger.warning(f"Cleanup: Could not read manifest for {run_key}: {e}")
        # Fallback to the directory's modification time
        if not should_delete:
            dir_mtime = datetime.fromtimestamp(os.path.getmtime(run_dir))
            if dir_mtime < cutoff_time:
                should_delete = True
        if should_delete:
            # First, clean up files in the main Mcscan_results folder
            files_deleted += cleanup_mcscan_results_files(manifest)
            # Then delete the custom genome metadata folder
            try:
                shutil.rmtree(run_dir)
                deleted_count += 1
                logger.info(f"Cleanup: Deleted expired custom genome: {run_key}")
            except Exception as e:
                logger.error(f"Cleanup: Failed to delete {run_key}: {e}")
    if files_deleted > 0:
        logger.info(f"Cleanup: Also removed {files_deleted} files from Mcscan_results")
    return deleted_count
def cleanup_old_custom_synteny():
    """Delete custom synteny project metadata older than CUSTOM_GENOME_RETENTION_DAYS.

    Also removes the genome entries in custom_meta created for each expired
    project (and their Mcscan_results files, via each genome's own manifest).
    Returns the number of project folders deleted.
    """
    synteny_meta_dir = os.path.join(MCSCAN_RESULTS_DIR, 'custom_synteny_meta')
    if not os.path.exists(synteny_meta_dir):
        return 0
    deleted_count = 0
    cutoff_time = datetime.now() - timedelta(days=CUSTOM_GENOME_RETENTION_DAYS)
    for run_key in os.listdir(synteny_meta_dir):
        run_dir = os.path.join(synteny_meta_dir, run_key)
        if not os.path.isdir(run_dir):
            continue
        should_delete = False
        manifest_file = os.path.join(run_dir, 'manifest.json')
        manifest = {}
        # Prefer the manifest's created_at timestamp for age determination
        if os.path.exists(manifest_file):
            try:
                with open(manifest_file, 'r') as f:
                    manifest = json.load(f)
                created_at = manifest.get('created_at', '')
                if created_at:
                    created_dt = datetime.fromisoformat(created_at.replace('Z', '+00:00'))
                    if created_dt.replace(tzinfo=None) < cutoff_time:
                        should_delete = True
            except (OSError, ValueError) as e:
                # FIX: was a bare `except:` - narrowed and logged instead of
                # silently swallowing every error (incl. KeyboardInterrupt).
                logger.warning(f"Cleanup: Could not read manifest for {run_key}: {e}")
        # Fallback to the directory's modification time
        if not should_delete:
            dir_mtime = datetime.fromtimestamp(os.path.getmtime(run_dir))
            if dir_mtime < cutoff_time:
                should_delete = True
        if not should_delete:
            continue
        # Clean up genome entries in custom_meta created for this project
        for genome_key in manifest.get('genome_keys', {}).values():
            genome_meta_dir = os.path.join(CUSTOM_META_DIR, genome_key)
            if not os.path.exists(genome_meta_dir):
                continue
            # The genome's own manifest may list Mcscan_results files to remove
            genome_manifest_file = os.path.join(genome_meta_dir, 'manifest.json')
            if os.path.exists(genome_manifest_file):
                try:
                    with open(genome_manifest_file, 'r') as f:
                        genome_manifest = json.load(f)
                    cleanup_mcscan_results_files(genome_manifest)
                except (OSError, ValueError) as e:
                    # FIX: was a bare `except: pass` - narrowed and logged.
                    logger.warning(f"Cleanup: Could not read genome manifest {genome_key}: {e}")
            try:
                shutil.rmtree(genome_meta_dir)
                logger.info(f"Cleanup: Deleted custom synteny genome: {genome_key}")
            except Exception as e:
                logger.error(f"Cleanup: Failed to delete genome {genome_key}: {e}")
        # Delete the custom synteny project folder itself
        try:
            shutil.rmtree(run_dir)
            deleted_count += 1
            logger.info(f"Cleanup: Deleted expired custom synteny project: {run_key}")
        except Exception as e:
            logger.error(f"Cleanup: Failed to delete {run_key}: {e}")
    return deleted_count
def cleanup_old_output_files():
    """Delete generated output folders (plots, tables) older than OUTPUT_RETENTION_HOURS.

    Targets the timestamped directories inside OUTPUT_DIR that hold PNG, SVG
    and CSV files. Returns the number of folders removed.
    """
    if not os.path.exists(OUTPUT_DIR):
        return 0
    deleted_count = 0
    cutoff_time = datetime.now() - timedelta(hours=OUTPUT_RETENTION_HOURS)
    for folder_name in os.listdir(OUTPUT_DIR):
        folder_path = os.path.join(OUTPUT_DIR, folder_name)
        # Output folders are timestamped directories; skip stray files
        if not os.path.isdir(folder_path):
            continue
        try:
            # Age is judged by the folder's modification time
            if datetime.fromtimestamp(os.path.getmtime(folder_path)) < cutoff_time:
                shutil.rmtree(folder_path)
                deleted_count += 1
                logger.info(f"Cleanup: Deleted expired output folder: {folder_name}")
        except Exception as e:
            logger.error(f"Cleanup: Failed to delete output folder {folder_name}: {e}")
    return deleted_count
def cleanup_old_temp_files():
    """Delete orphaned temp files older than OUTPUT_RETENTION_HOURS.

    Cleans up:
      - discovery_annotations_*.tsv files in the system temp directory
      - orphaned directories in CUSTOM_TEMP_DIR
    These are normally deleted after use; this sweep handles server crashes
    and abandoned sessions. Returns the number of items removed.
    """
    deleted_count = 0
    cutoff_time = datetime.now() - timedelta(hours=OUTPUT_RETENTION_HOURS)
    # Clean up discovery annotation temp files in the system temp directory
    temp_dir = tempfile.gettempdir()
    try:
        for filename in os.listdir(temp_dir):
            if filename.startswith('discovery_annotations_') and filename.endswith('.tsv'):
                file_path = os.path.join(temp_dir, filename)
                try:
                    file_mtime = datetime.fromtimestamp(os.path.getmtime(file_path))
                    if file_mtime < cutoff_time:
                        os.remove(file_path)
                        deleted_count += 1
                        # FIX: log messages previously omitted the file name
                        logger.info(f"Cleanup: Deleted expired discovery annotation file: {filename}")
                except Exception as e:
                    logger.error(f"Cleanup: Failed to delete temp file {filename}: {e}")
    except Exception as e:
        logger.error(f"Cleanup: Error scanning system temp directory: {e}")
    # Clean up orphaned directories in CUSTOM_TEMP_DIR
    if os.path.exists(CUSTOM_TEMP_DIR):
        try:
            for dirname in os.listdir(CUSTOM_TEMP_DIR):
                dir_path = os.path.join(CUSTOM_TEMP_DIR, dirname)
                if os.path.isdir(dir_path):
                    try:
                        dir_mtime = datetime.fromtimestamp(os.path.getmtime(dir_path))
                        if dir_mtime < cutoff_time:
                            shutil.rmtree(dir_path)
                            deleted_count += 1
                            logger.info(f"Cleanup: Deleted orphaned temp directory: {dirname}")
                    except Exception as e:
                        logger.error(f"Cleanup: Failed to delete temp dir {dirname}: {e}")
        except Exception as e:
            logger.error(f"Cleanup: Error scanning CUSTOM_TEMP_DIR: {e}")
    return deleted_count
def start_cleanup_scheduler():
    """Start a daemon thread that runs cleanup periodically.

    Output folders and orphaned temp files are swept hourly (24h retention);
    custom genomes and synteny projects are swept once per day (14d retention).
    """
    def cleanup_loop():
        hours_since_daily = 0
        while True:
            try:
                # Hourly: expired output folders
                n_outputs = cleanup_old_output_files()
                if n_outputs > 0:
                    logger.info(f"Cleanup: Removed {n_outputs} expired output folder(s)")
                # Hourly: orphaned temp files
                n_temp = cleanup_old_temp_files()
                if n_temp > 0:
                    logger.info(f"Cleanup: Removed {n_temp} orphaned temp file(s)")
                # Daily (every 24th iteration): custom genomes + synteny projects
                hours_since_daily += 1
                if hours_since_daily >= 24:
                    hours_since_daily = 0
                    n_genomes = cleanup_old_custom_genomes()
                    if n_genomes > 0:
                        logger.info(f"Cleanup: Removed {n_genomes} expired custom genome(s)")
                    n_synteny = cleanup_old_custom_synteny()
                    if n_synteny > 0:
                        logger.info(f"Cleanup: Removed {n_synteny} expired custom synteny project(s)")
            except Exception as e:
                logger.error(f"Cleanup: Error during cleanup: {e}")
            time.sleep(3600)  # run once per hour

    thread = threading.Thread(target=cleanup_loop, daemon=True)
    thread.start()
    logger.info(f"Cleanup: Auto-cleanup scheduler started (outputs: {OUTPUT_RETENTION_HOURS}h, custom genomes: {CUSTOM_GENOME_RETENTION_DAYS} days)")
# ============================================================================
# Helper Functions
# ============================================================================
def load_genome_annotations(genome_name):
    """Load (and memoize) the gene -> description mapping for a genome.

    Reads ANNOTATIONS_DIR/<genome>/gene_annotation.tsv. A missing file or a
    read error yields an empty dict, which is cached so the file is not
    retried on every call.
    """
    if genome_name in _annotation_cache:
        return _annotation_cache[genome_name]
    annotation_file = os.path.join(ANNOTATIONS_DIR, genome_name, 'gene_annotation.tsv')
    annotations = {}
    if os.path.exists(annotation_file):
        try:
            with open(annotation_file, 'r') as f:
                for row in csv.DictReader(f, delimiter='\t'):
                    gene_id = row.get('gene', '')
                    if gene_id:
                        annotations[gene_id] = row.get('description', '')
        except Exception as e:
            logger.error(f"Error loading annotations for {genome_name}: {e}")
    _annotation_cache[genome_name] = annotations
    return annotations
def get_gene_annotation(genome_name, gene_id):
    """Return the annotation text for one gene ('' when unknown)."""
    return load_genome_annotations(genome_name).get(gene_id, '')
def generate_layouts(n):
    """Generate all valid layout configurations for n total genomes (query included).

    A layout is a list of integers: genomes per row. Only layouts containing
    at least one row of exactly 1 genome are valid, so the query genome can
    sit alone in its own row. Supports 2..8 genomes and at most 5 rows.
    """
    if not 2 <= n <= 8:
        return []
    compositions = []

    def _compose(remaining, parts_left, prefix):
        # Append every ordered composition of `remaining` into `parts_left`
        # positive integers onto `compositions`.
        if parts_left == 1:
            compositions.append(prefix + [remaining])
            return
        for first in range(1, remaining - parts_left + 2):
            _compose(remaining - first, parts_left - 1, prefix + [first])

    for rows in range(1, min(n, 5) + 1):
        _compose(n, rows, [])
    # Keep only layouts where the query genome can occupy its own row
    return [layout for layout in compositions if 1 in layout]
def layout_to_string(layout):
    """Render a layout list as a dash-joined string (e.g. [2, 3, 1] -> '2-3-1')."""
    return '-'.join(str(count) for count in layout)
# ============================================================================
# API Routes
# ============================================================================
def index():
    """Serve the main application page.

    NOTE(review): no @app.route decorator is visible in this file view -
    confirm route registration happens elsewhere (or was lost in formatting).
    """
    return render_template('index.html')
def api_genomes():
    """Return the list of available genomes as JSON records."""
    genomes = []
    for genome_id in AVAILABLE_GENOMES:
        # Format scientific name: arabidopsis_thaliana -> 'Arabidopsis thaliana'
        genus, *rest = genome_id.split('_')
        genomes.append({
            'id': genome_id,
            'name': get_genome_display_name(genome_id),
            'scientific_name': genus.capitalize() + ' ' + ' '.join(rest)
        })
    return jsonify(genomes)
def api_catalog_status():
    """Report the status of the SQL metadata catalog system."""
    if not SQL_CATALOG_AVAILABLE:
        return jsonify({
            'available': False,
            'message': 'SQL catalog helper not installed'
        })
    try:
        from sql_catalog_helper import (
            is_metadata_db_available,
            get_all_available_genomes,
            get_genome_catalog_stats
        )
        catalog_genomes = get_all_available_genomes()
        return jsonify({
            'available': True,
            'metadata_db_available': is_metadata_db_available(),
            'genomes_with_catalogs': len(catalog_genomes),
            'catalog_genomes': catalog_genomes
        })
    except Exception as e:
        return jsonify({
            'available': False,
            'error': str(e)
        })
def api_catalog_genome_stats(genome):
    """Report catalog statistics for a specific genome."""
    if not SQL_CATALOG_AVAILABLE:
        return jsonify({'available': False})
    try:
        from sql_catalog_helper import get_genome_catalog_stats, is_catalog_available
        if not is_catalog_available(genome):
            return jsonify({
                'available': False,
                'genome': genome,
                'message': f'No catalog available for {genome}'
            })
        stats = get_genome_catalog_stats(genome)
        if not stats:
            return jsonify({'available': False, 'genome': genome})
        stats['available'] = True
        return jsonify(stats)
    except Exception as e:
        return jsonify({'available': False, 'error': str(e)})
def api_catalog_prefilter():
    """Pre-filter comparison genomes via the SQL catalog.

    Returns which of the requested comparisons actually contain matches for
    the given genes, plus a per-gene count of target genomes with matches.
    """
    if not SQL_CATALOG_AVAILABLE:
        return jsonify({'available': False})
    payload = request.json
    query_genome = payload.get('query_genome')
    genes = payload.get('genes', [])
    comparisons = payload.get('comparisons', [])
    if not (query_genome and genes):
        return jsonify({'success': False, 'error': 'Missing query_genome or genes'})
    try:
        filtered, diagnostics = prefilter_search_comparisons(
            query_genome, genes, comparisons or [], min_genes_with_matches=1
        )
        # Per-gene detail: how many target genomes each gene has matches in
        per_gene_targets = get_target_genomes_for_genes(query_genome, genes)
        target_counts = {gene: len(targets) for gene, targets in per_gene_targets.items()}
        return jsonify({
            'success': True,
            'filtered_comparisons': filtered,
            'diagnostics': diagnostics,
            'gene_target_counts': target_counts
        })
    except Exception as exc:
        return jsonify({'success': False, 'error': str(exc)})
def api_annotation(genome, gene_id):
    """Return the functional annotation text for a single gene as JSON."""
    return jsonify({'annotation': get_gene_annotation(genome, gene_id)})
def api_layouts(n):
    """Describe all valid layout configurations for n comparison genomes."""
    described = [
        {
            'layout': cfg,
            'name': layout_to_string(cfg),
            'rows': len(cfg),
            'total': sum(cfg),
        }
        for cfg in generate_layouts(n)
    ]
    return jsonify(described)
def _find_plot_output_folder(possible_prefixes):
    """Return the newest plot output folder matching any of *possible_prefixes*.

    A folder name qualifies when it is one of the prefixes followed
    immediately by a run timestamp of the form YYYYMMDD_HHMMSS.  Since those
    timestamps sort lexicographically, the maximum is the most recent run.
    Returns the absolute path of the newest match, or None.
    """
    if not os.path.exists(OUTPUT_DIR):
        return None
    import re
    timestamp_re = re.compile(r'^\d{8}_\d{6}$')
    entries = os.listdir(OUTPUT_DIR)
    matches = []
    for prefix in possible_prefixes:
        for entry in entries:
            # Require an exact timestamp suffix so near-miss folder names
            # (e.g. extra text after the prefix) are not picked up.
            if entry.startswith(prefix) and timestamp_re.match(entry[len(prefix):]):
                matches.append(entry)
    if not matches:
        return None
    matches.sort(reverse=True)
    return os.path.join(OUTPUT_DIR, matches[0])


def api_plot_usergenes():
    """Generate a microsynteny plot for user-specified genes.

    Reads a JSON body with required keys query_genome, genes, comparisons and
    a set of optional tweaking parameters (colors, annotations, layout,
    genome_order, padding/max-genes configs, display names, gene labels).
    Runs the external plotting script as a subprocess and, on success,
    returns the output folder name plus the generated file names.  Every
    outcome is recorded as an analytics event when analytics is enabled.
    """
    start_time = time.time()
    data = request.json
    query_genome = data.get('query_genome')
    genes = data.get('genes', [])
    comparisons = data.get('comparisons', [])
    colors = data.get('colors', [])
    annotations = data.get('annotations', [])  # User-provided annotations for legend
    layout = data.get('layout', None)  # Layout array e.g., [2, 3, 1]
    genome_order = data.get('genome_order', None)  # Full ordered list of genomes
    query_position = data.get('query_position', 0)  # Index of query genome in order
    # Advanced tweaking parameters (optional)
    padding_config = data.get('padding_config', {})  # {genome: {left: bp, right: bp}}
    max_genes_config = data.get('max_genes_config', {})  # {genome: {left: count, right: count}}
    display_names = data.get('display_names', {})  # {genome: "Custom Name"}
    # Gene labels parameters (optional)
    gene_labels = data.get('gene_labels', [])  # List of gene IDs to label on the plot
    gene_label_size = data.get('gene_label_size', 0)  # Font size for labels (0=disabled, 2-8 recommended)

    # Analytics: classify plots that use any advanced tweak separately
    has_tweaks = bool(padding_config or max_genes_config or display_names or gene_labels)
    feature_type = FEATURE_PLOT_TWEAKS if has_tweaks else FEATURE_PLOT

    # Debug logging for tweaking parameters
    logger.debug("api_plot_usergenes received:")
    logger.debug(f"  padding_config: {padding_config}")
    logger.debug(f"  max_genes_config: {max_genes_config}")
    logger.debug(f"  display_names: {display_names}")
    logger.debug(f"  gene_labels: {gene_labels}")
    logger.debug(f"  gene_label_size: {gene_label_size}")

    def _record_and_return(response, success=True):
        # Record an analytics event (when enabled) and pass the response through
        if ANALYTICS_AVAILABLE:
            duration_ms = int((time.time() - start_time) * 1000)
            record_event(
                feature_type=feature_type,
                query_genome=query_genome,
                status='success' if success else 'failure',
                duration_ms=duration_ms,
                request=request
            )
        return response

    if not query_genome or not genes or not comparisons:
        return _record_and_return(jsonify({'success': False, 'error': 'Missing required parameters'}), success=False)

    # Validate and sanitize gene IDs (security measure)
    validated_genes, error_msg = sanitize_gene_ids(genes)
    if error_msg:
        return _record_and_return(jsonify({'success': False, 'error': error_msg}), success=False)
    genes = validated_genes

    # Also validate gene_labels if provided
    if gene_labels:
        validated_labels, label_error = sanitize_gene_ids(gene_labels)
        if label_error:
            return _record_and_return(jsonify({'success': False, 'error': f'Gene labels: {label_error}'}), success=False)
        gene_labels = validated_labels

    script_path = os.path.join(SCRIPTS_DIR, 'plot_user_genes_microsynteny_v2.py')
    if not os.path.exists(script_path):
        return _record_and_return(jsonify({'success': False, 'error': f'Script not found: {script_path}'}), success=False)

    # Build command arguments incrementally (fixes the previous duplicated
    # rebuild of the whole argv when colors were supplied).
    # Using PYTHON_BIN for HuggingFace Spaces.
    args = [PYTHON_BIN, script_path]
    if colors:
        args += ['--colors', ','.join(colors)]
    args += ['--query', query_genome, '--genes'] + genes + ['--comparisons'] + comparisons

    # Add annotations if provided
    if annotations:
        # Encode annotations: join with ||| delimiter (unlikely to appear in annotation text)
        annotations_str = '|||'.join(str(a) for a in annotations)
        args.extend(['--annotations', annotations_str])

    # Add layout if provided
    if layout and isinstance(layout, list) and len(layout) > 0:
        args.extend(['--layout', ','.join(map(str, layout))])
    # Add genome order for proper placement
    if genome_order and isinstance(genome_order, list):
        args.extend(['--genome-order', ','.join(genome_order)])

    # Padding configuration for the query genome (asymmetric, in bp)
    if query_genome in padding_config and padding_config[query_genome]:
        query_pad = padding_config[query_genome]
        if 'left' in query_pad and query_pad['left'] is not None:
            args.extend(['--query-padding-left', str(int(query_pad['left']))])
        if 'right' in query_pad and query_pad['right'] is not None:
            args.extend(['--query-padding-right', str(int(query_pad['right']))])

    # Max genes configuration for the query genome (asymmetric)
    if query_genome in max_genes_config and max_genes_config[query_genome]:
        query_genes = max_genes_config[query_genome]
        if 'left' in query_genes and query_genes['left'] is not None:
            args.extend(['--query-max-genes-left', str(int(query_genes['left']))])
        if 'right' in query_genes and query_genes['right'] is not None:
            args.extend(['--query-max-genes-right', str(int(query_genes['right']))])

    # Comparison genomes padding (format: genome:left:right|genome2:left:right)
    comp_padding_parts = []
    for comp in comparisons:
        if comp in padding_config and padding_config[comp]:
            comp_pad = padding_config[comp]
            left_val = int(comp_pad.get('left', 1500000))
            right_val = int(comp_pad.get('right', 1500000))
            comp_padding_parts.append(f"{comp}:{left_val}:{right_val}")
    if comp_padding_parts:
        args.extend(['--comp-padding-config', '|'.join(comp_padding_parts)])

    # Comparison genomes max genes (format: genome:left:right|genome2:left:right)
    comp_genes_parts = []
    for comp in comparisons:
        if comp in max_genes_config and max_genes_config[comp]:
            comp_genes = max_genes_config[comp]
            left_val = int(comp_genes.get('left', 50))
            right_val = int(comp_genes.get('right', 50))
            comp_genes_parts.append(f"{comp}:{left_val}:{right_val}")
    if comp_genes_parts:
        args.extend(['--comp-max-genes-config', '|'.join(comp_genes_parts)])

    # Custom display names (format: genome:CustomName|genome2:Name2)
    if display_names:
        display_parts = []
        for genome, name in display_names.items():
            if name and name.strip():
                # Truncate, then neutralize the delimiter characters of the CLI format
                truncated_name = name.strip()[:MAX_NAME_LENGTH]
                safe_name = truncated_name.replace('|', '_').replace(':', '_')
                display_parts.append(f"{genome}:{safe_name}")
        if display_parts:
            args.extend(['--display-names', '|'.join(display_parts)])

    # Gene labels (list of gene IDs to display labels for on the plot)
    if gene_labels and isinstance(gene_labels, list) and len(gene_labels) > 0:
        gene_labels_str = ','.join(str(g) for g in gene_labels if g)
        if gene_labels_str:
            args.extend(['--genelabels', gene_labels_str])
            # Default to size 8 when labels are requested but size not specified
            label_size = int(gene_label_size) if gene_label_size else 8
            if label_size > 0:
                args.extend(['--genelabelsize', str(label_size)])

    # Keep low-confidence coloring option (optional - colors all syntenic matches)
    if data.get('keep_lowconf_color', False):
        args.extend(['--keep-lowconf-color'])

    logger.debug(f"Final command args: {' '.join(args)}")

    try:
        # Run the plotting script; it writes its results into OUTPUT_DIR
        env = os.environ.copy()
        env['PYTHON'] = PYTHON_BIN
        result = subprocess.run(
            args,
            capture_output=True,
            text=True,
            env=env,
            cwd=SCRIPTS_DIR
        )
        if result.returncode != 0:
            # Distill a user-facing error message from the script's output
            error_msg = 'Script failed'
            combined = result.stdout + '\n' + result.stderr
            if 'No syntenic matches' in combined:
                error_msg = 'No syntenic matches found for the provided genes'
            elif 'Gap check failed' in combined:
                error_msg = 'Input genes are too far apart (max 20 genes distance)'
            elif 'ERROR:' in combined:
                for line in combined.split('\n'):
                    if 'ERROR:' in line:
                        error_msg = line.split('ERROR:')[-1].strip()
                        break
            # Log details for debugging (not shown to user)
            if app.debug:
                logger.debug(f"Plot script failed. stdout: {result.stdout}")
                logger.debug(f"stderr: {result.stderr}")
            return _record_and_return(jsonify({
                'success': False,
                'error': error_msg
            }), success=False)

        # The script names its output folder after a display name, so try
        # several naming conventions, most specific first.
        comp_str = '_'.join(comparisons)
        possible_prefixes = []
        # First priority: custom display name if provided
        if display_names and query_genome in display_names:
            possible_prefixes.append(f"{display_names[query_genome]}_usergenes_{comp_str}_")
        # Second: short display name (used by bash script for folder creation)
        if query_genome in SHORT_DISPLAY_NAMES:
            possible_prefixes.append(f"{SHORT_DISPLAY_NAMES[query_genome]}_usergenes_{comp_str}_")
        # Third: genome ID (fallback when no display name is set)
        possible_prefixes.append(f"{query_genome}_usergenes_{comp_str}_")
        # Fourth: full display name from GENOME_DISPLAY_NAMES
        if query_genome in GENOME_DISPLAY_NAMES:
            possible_prefixes.append(f"{GENOME_DISPLAY_NAMES[query_genome]}_usergenes_{comp_str}_")
        logger.debug(f"Looking for output folder with prefixes: {possible_prefixes}")

        # Fix: the original duplicated the sort-and-pick block verbatim;
        # the selection now lives in a single helper.
        output_folder = _find_plot_output_folder(possible_prefixes)
        if output_folder and os.path.exists(output_folder):
            # Map the generated files by their well-known names
            files = {}
            for fname in os.listdir(output_folder):
                if fname == 'microsynteny_plot.png':
                    files['png'] = fname
                elif fname == 'microsynteny_plot.svg':
                    files['svg'] = fname
                elif fname.endswith('.csv'):
                    files['csv'] = fname
            return _record_and_return(jsonify({
                'success': True,
                'message': 'Plot generated successfully',
                'output_folder': os.path.basename(output_folder),
                'files': files
            }), success=True)

        # Log details for debugging (not shown to user)
        if app.debug:
            logger.debug(f"Output folder not found. stdout: {result.stdout}")
            logger.debug(f"stderr: {result.stderr}")
        return _record_and_return(jsonify({
            'success': False,
            'error': 'Output folder not found after plot generation'
        }), success=False)
    except Exception as e:
        return _record_and_return(jsonify({'success': False, 'error': str(e)}), success=False)
def api_download(folder, filename):
    """Download a generated file from the output directory.

    *folder* and *filename* come straight from the URL, so the resolved path
    is verified to still be inside OUTPUT_DIR before serving.  This blocks
    path traversal via '..' or absolute components (previously unchecked).
    Returns 404 for both missing and out-of-tree paths so no information
    about the filesystem layout leaks.
    """
    base_dir = os.path.realpath(OUTPUT_DIR)
    file_path = os.path.realpath(os.path.join(OUTPUT_DIR, folder, filename))
    # Security: reject any resolved path that escapes the output directory
    if not file_path.startswith(base_dir + os.sep):
        return jsonify({'error': 'File not found'}), 404
    if not os.path.exists(file_path):
        return jsonify({'error': 'File not found'}), 404
    # For CSV files, use clean download names without timestamps
    download_name = filename
    if filename.endswith('.csv'):
        # gene_summary.csv should download as gene_summary.csv (not with timestamp)
        download_name = 'gene_summary.csv'
    return send_file(file_path, as_attachment=True, download_name=download_name)
def api_image(folder, filename):
    """Serve a generated image file from the output directory.

    Like api_download, resolves the user-supplied path components and
    rejects anything that escapes OUTPUT_DIR (path-traversal protection,
    previously missing).  Missing and out-of-tree paths both yield 404.
    """
    base_dir = os.path.realpath(OUTPUT_DIR)
    file_path = os.path.realpath(os.path.join(OUTPUT_DIR, folder, filename))
    # Security: reject any resolved path that escapes the output directory
    if not file_path.startswith(base_dir + os.sep):
        return jsonify({'error': 'File not found'}), 404
    if not os.path.exists(file_path):
        return jsonify({'error': 'File not found'}), 404
    return send_file(file_path)
def api_batch_match():
    """Run the batch match-count script and return its TSV output as JSON rows."""
    payload = request.json
    query_genome = payload.get('query_genome')
    genes = payload.get('genes', [])
    comparisons = payload.get('comparisons', [])
    if not (query_genome and genes and comparisons):
        return jsonify({'success': False, 'error': 'Missing required parameters'})

    script_path = os.path.join(SCRIPTS_DIR, 'count_usergene_matches.py')
    if not os.path.exists(script_path):
        return jsonify({'success': False, 'error': 'Script not found'})

    try:
        # Using PYTHON_BIN for HuggingFace Spaces
        cmd = [PYTHON_BIN, script_path, query_genome, ','.join(comparisons), ','.join(genes)]
        proc = subprocess.run(cmd, capture_output=True, text=True)
        if proc.returncode != 0:
            return jsonify({'success': False, 'error': proc.stderr})
        # Parse the script's TSV output: first line is the header row
        lines = proc.stdout.strip().split('\n')
        if len(lines) <= 1:
            return jsonify({'success': True, 'data': []})
        headers = lines[0].split('\t')
        rows = []
        for line in lines[1:]:
            values = line.split('\t')
            # Ignore malformed rows whose column count doesn't match the header
            if len(values) == len(headers):
                rows.append(dict(zip(headers, values)))
        return jsonify({'success': True, 'data': rows})
    except Exception as exc:
        return jsonify({'success': False, 'error': str(exc)})
def api_search_hits():
    """Search for high-confidence syntenic hits between query genes and comparison genomes.

    JSON body: query_genome (str), genes (list of gene IDs), comparisons
    (list of genome IDs), min_hits (int, default 1), use_catalog (bool,
    default True), and either required_genes (list) or the legacy single
    required_gene (str).  Delegates the search to
    scripts/search_synteny_hits.py and relays its JSON output; an analytics
    event is recorded for every outcome when analytics is enabled.
    """
    start_time = time.time()
    data = request.json
    query_genome = data.get('query_genome')
    genes = data.get('genes', [])
    comparisons = data.get('comparisons', [])
    min_hits = data.get('min_hits', 1)
    use_catalog = data.get('use_catalog', True)  # Enable catalog pre-filtering by default
    # Support both single required_gene (legacy) and multiple required_genes
    required_genes = data.get('required_genes', [])
    if not required_genes:
        # Fallback to legacy single required_gene
        single_gene = data.get('required_gene', '')
        if single_gene:
            required_genes = [single_gene]

    # Helper to record analytics and return response
    def _record_and_return(response, success=True):
        if ANALYTICS_AVAILABLE:
            duration_ms = int((time.time() - start_time) * 1000)
            record_event(
                feature_type=FEATURE_ADVANCED_SEARCH,
                query_genome=query_genome,
                status='success' if success else 'failure',
                duration_ms=duration_ms,
                request=request
            )
        return response

    if not query_genome or not genes or not comparisons:
        return _record_and_return(jsonify({'success': False, 'error': 'Missing required parameters'}), success=False)

    # Validate and sanitize gene IDs (security measure)
    validated_genes, error_msg = sanitize_gene_ids(genes)
    if error_msg:
        return _record_and_return(jsonify({'success': False, 'error': error_msg}), success=False)
    genes = validated_genes

    # Also validate required_genes if provided
    if required_genes:
        validated_required, req_error = sanitize_gene_ids(required_genes)
        if req_error:
            return _record_and_return(jsonify({'success': False, 'error': f'Required genes: {req_error}'}), success=False)
        required_genes = validated_required

    script_path = os.path.join(SCRIPTS_DIR, 'search_synteny_hits.py')
    if not os.path.exists(script_path):
        return _record_and_return(jsonify({'success': False, 'error': 'Search script not found'}), success=False)

    # Pre-filter comparisons using SQL catalog if available; this avoids
    # launching the (slower) search script for comparisons with no matches
    catalog_diagnostics = None
    filtered_comparisons = comparisons
    if use_catalog and SQL_CATALOG_AVAILABLE and is_catalog_available(query_genome):
        filtered_comparisons, catalog_diagnostics = prefilter_search_comparisons(
            query_genome, genes, comparisons, min_genes_with_matches=1
        )
        # If no comparisons have matches, return early
        if not filtered_comparisons:
            return _record_and_return(jsonify({
                'success': True,
                'data': {
                    'results': [],
                    'total_matches': 0,
                    'filter_message': 'No comparisons have matches for the specified genes',
                    'catalog_prefilter': catalog_diagnostics
                }
            }), success=True)

    try:
        # Build command arguments - using PYTHON_BIN for HuggingFace Spaces
        args = [
            PYTHON_BIN, script_path,
            '--query', query_genome,
            '--genes'] + genes + [
            '--comparisons'] + filtered_comparisons + [
            '--min-hits', str(min_hits),
            '--format', 'json'
        ]
        # Add required genes if specified (multiple)
        if required_genes:
            args.extend(['--required-genes'] + required_genes)
        # Set up environment (custom genomes use same main Mcscan_results folder)
        env = os.environ.copy()
        result = subprocess.run(args, capture_output=True, text=True, env=env)
        # Check for NO_RESULTS marker printed by the script when its own
        # filters (e.g. required genes) eliminate every candidate
        if 'NO_RESULTS' in result.stdout:
            # Extract filter failure reason from stderr
            filter_msg = ''
            for line in result.stderr.split('\n'):
                if 'FILTER_FAILED' in line:
                    filter_msg = line.replace('FILTER_FAILED:', '').strip()
                    break
            response_data = {
                'results': [],
                'total_matches': 0,
                'filter_message': filter_msg
            }
            if catalog_diagnostics:
                response_data['catalog_prefilter'] = catalog_diagnostics
            return _record_and_return(jsonify({'success': True, 'data': response_data}), success=True)
        if result.returncode == 0:
            try:
                output = result.stdout.strip()
                # Parse JSON output
                response_data = json.loads(output)
                # Add catalog diagnostics to response
                if catalog_diagnostics:
                    response_data['catalog_prefilter'] = catalog_diagnostics
                return _record_and_return(jsonify({'success': True, 'data': response_data}), success=True)
            except json.JSONDecodeError as e:
                return _record_and_return(jsonify({'success': False, 'error': f'Failed to parse results: {str(e)}'}), success=False)
        else:
            return _record_and_return(jsonify({'success': False, 'error': result.stderr}), success=False)
    except Exception as e:
        return _record_and_return(jsonify({'success': False, 'error': str(e)}), success=False)
| # ============================================================================ | |
| # Discovery API Routes | |
| # ============================================================================ | |
# Temporary storage for custom annotations during discovery sessions.
# Keyed by a uuid4 session_id string; each value is a dict holding the
# saved temp file path, the parsed gene count, and an ISO creation
# timestamp (populated by api_discovery_upload_annotations).
_discovery_annotations = {}
def api_discovery_annotations(genome):
    """Get unique annotation terms for the Discovery page dropdown.

    Returns JSON with genome-level stats (total genes, annotated genes) and
    up to 500 keyword terms extracted from the gene annotation descriptions,
    ordered by descending frequency then alphabetically.

    Fixes vs previous version: `import re` hoisted out of the per-gene loop,
    bare `except:` narrowed to OSError, and manual counting replaced with
    collections.defaultdict.
    """
    import re

    annotations = load_genome_annotations(genome)
    if not annotations:
        return jsonify({'success': False, 'error': f'No annotations found for {genome}', 'terms': []})

    # Count genes with actual annotations (non-empty descriptions)
    annotated_genes = sum(1 for desc in annotations.values() if desc and desc.strip())

    # Try to get total gene count from BED file; fall back to annotation count
    total_genes_in_genome = len(annotations)
    bed_file = os.path.join(MCSCAN_RESULTS_DIR, 'bed_files', f'{genome}.bed')
    if os.path.exists(bed_file):
        try:
            with open(bed_file, 'r') as f:
                total_genes_in_genome = sum(1 for line in f if line.strip() and not line.startswith('#'))
        except OSError:
            pass  # best-effort: keep the annotation-based count

    # Words too generic (stop words, boilerplate biology vocabulary,
    # accession fragments) to be useful as search terms
    skip_words = {
        'of', 'the', 'a', 'an', 'and', 'or', 'in', 'to', 'for', 'with', 'by', 'on', 'at', 'from',
        'is', 'are', 'was', 'were', 'be', 'been', 'being', 'have', 'has', 'had', 'do', 'does', 'did',
        'will', 'would', 'could', 'should', 'may', 'might', 'must', 'shall', 'can',
        'not', 'no', 'nor', 'but', 'so', 'if', 'when', 'where', 'how', 'what', 'which', 'who', 'whom',
        'this', 'that', 'these', 'those', 'it', 'its', 'as', 'than', 'such', 'like',
        'protein', 'gene', 'family', 'domain', 'related', 'similar', 'homolog', 'putative',
        'unnamed', 'unknown', 'hypothetical', 'uncharacterized', 'predicted',
        '-', '//', '/', '|', 'pf', 'sf', 'pthr', 'kog', 'subfamily', 'superfamily'
    }

    term_counts = defaultdict(int)
    for description in annotations.values():
        if not description:
            continue
        # Strip a leading parenthesised tag, then accession-style tokens
        # (e.g. PF00069, PTHR11909:SF123) before splitting into words
        clean_desc = re.sub(r'^\([^)]+\)\s*', '', description)
        clean_desc = re.sub(r'\b[A-Z]{2,}\d+(?::\w+)?\b', '', clean_desc)
        for word in re.split(r'[\s\-_/\[\](),;:]+', clean_desc):
            word = word.strip().lower()
            # Keep words of 4+ chars that are not numeric-led and not stop words
            if (len(word) >= 4 and not word.isdigit()
                    and word not in skip_words and not word[0].isdigit()):
                term_counts[word] += 1

    # Most frequent first; ties broken alphabetically
    sorted_terms = sorted(term_counts.items(), key=lambda x: (-x[1], x[0]))
    terms = [{'term': term.capitalize(), 'count': count} for term, count in sorted_terms[:500]]

    return jsonify({
        'success': True,
        'genome': genome,
        'total_genes': total_genes_in_genome,
        'annotated_genes': annotated_genes,
        'unique_terms': len(term_counts),
        'terms': terms
    })
def api_discovery_upload_annotations():
    """Upload custom annotations for discovery search.

    Accepts a multipart form with a 'file' field (TSV or CSV lines of
    gene_id<delim>description) and an optional 'genome' field used to check
    that the uploaded gene IDs actually belong to the selected genome.  On
    success the parsed file is kept in a temp location keyed by a new
    session_id in _discovery_annotations, and the response includes extracted
    keyword terms, full annotation strings, and the gene name list.

    Fixes vs previous version: `import re` hoisted out of the per-gene loop,
    missing-file case handled with an early return, manual counting replaced
    with collections.defaultdict.
    """
    import re

    # Get genome ID from form data for validation
    genome_id = request.form.get('genome', '').strip()

    if 'file' not in request.files:
        return jsonify({'success': False, 'error': 'No file provided'})

    file = request.files['file']
    if file.filename == '':
        return jsonify({'success': False, 'error': 'No file selected'})

    # Validate file size before saving
    file.seek(0, 2)  # Seek to end
    file_size = file.tell()
    file.seek(0)  # Reset to beginning
    if file_size > MAX_ANNOTATION_SIZE:
        max_mb = MAX_ANNOTATION_SIZE // (1024 * 1024)
        return jsonify({'success': False, 'error': f'Annotation file exceeds {max_mb} MB limit (uploaded: {file_size // (1024*1024)} MB)'})

    # Save to a temp location keyed by a fresh session id
    session_id = str(uuid.uuid4())
    temp_file = os.path.join(tempfile.gettempdir(), f'discovery_annotations_{session_id}.tsv')
    file.save(temp_file)

    # Parse and validate - auto-detect delimiter
    annotations = {}
    try:
        with open(temp_file, 'r') as f:
            # Read first line to detect delimiter
            first_line = f.readline()
            f.seek(0)  # Reset to beginning
            # Auto-detect delimiter: prefer tab, then comma
            if '\t' in first_line:
                delimiter = '\t'
            elif ',' in first_line:
                delimiter = ','
            else:
                delimiter = '\t'  # Default to tab
            reader = csv.reader(f, delimiter=delimiter)
            for row in reader:
                if len(row) >= 2:
                    gene_id = row[0].strip()
                    description = row[1].strip() if len(row) > 1 else ''
                    # Skip header rows (check if first column looks like a header)
                    if gene_id and gene_id.lower() not in ['gene', 'gene_id', 'geneid', 'id', 'name']:
                        annotations[gene_id] = description
    except Exception as e:
        os.remove(temp_file)
        return jsonify({'success': False, 'error': f'Failed to parse file: {str(e)}'})

    if not annotations:
        os.remove(temp_file)
        return jsonify({'success': False, 'error': 'No valid annotations found in file'})

    # Validate gene overlap with selected genome (if genome provided)
    matched_genes = 0
    total_genome_genes = 0
    if genome_id:
        bed_file = os.path.join(MCSCAN_RESULTS_DIR, 'bed_files', f'{genome_id}.bed')
        if os.path.exists(bed_file):
            genome_genes = set()
            try:
                with open(bed_file, 'r') as f:
                    for line in f:
                        if line.strip() and not line.startswith('#'):
                            parts = line.strip().split('\t')
                            if len(parts) >= 4:
                                genome_genes.add(parts[3])  # Gene ID is column 4
                total_genome_genes = len(genome_genes)
                matched_genes = len(set(annotations.keys()) & genome_genes)
                match_percentage = (matched_genes / len(annotations) * 100) if annotations else 0
                # Require at least 20% of uploaded genes to match the genome
                if match_percentage < 20:
                    os.remove(temp_file)
                    logger.warning(f"Annotation upload rejected: Only {matched_genes}/{len(annotations)} genes ({match_percentage:.1f}%) matched {genome_id}")
                    return jsonify({
                        'success': False,
                        'error': f'Annotation file does not match the selected genome. '
                                 f'Only {matched_genes} of {len(annotations)} genes '
                                 f'({match_percentage:.1f}%) were found in {genome_id}. '
                                 f'At least 20% must match.'
                    })
            except Exception as e:
                # Best-effort validation: log and accept the upload anyway
                logger.error(f"Error reading BED file for validation: {e}")

    # Count genes with actual annotations (non-empty descriptions)
    annotated_genes = sum(1 for desc in annotations.values() if desc and desc.strip())

    # Store the annotation file path for this session
    _discovery_annotations[session_id] = {
        'file_path': temp_file,
        'gene_count': len(annotations),
        'created_at': datetime.now().isoformat()
    }

    # Extract keyword terms (same logic as database genomes)
    skip_words = {
        'of', 'the', 'a', 'an', 'and', 'or', 'in', 'to', 'for', 'with', 'by', 'on', 'at', 'from',
        'is', 'are', 'was', 'were', 'be', 'been', 'being', 'have', 'has', 'had', 'do', 'does', 'did',
        'will', 'would', 'could', 'should', 'may', 'might', 'must', 'shall', 'can',
        'not', 'no', 'nor', 'but', 'so', 'if', 'when', 'where', 'how', 'what', 'which', 'who', 'whom',
        'this', 'that', 'these', 'those', 'it', 'its', 'as', 'than', 'such', 'like',
        'protein', 'gene', 'family', 'domain', 'related', 'similar', 'homolog', 'putative',
        'unnamed', 'unknown', 'hypothetical', 'uncharacterized', 'predicted',
        '-', '//', '/', '|', 'pf', 'sf', 'pthr', 'kog', 'subfamily', 'superfamily'
    }
    term_counts = defaultdict(int)
    for description in annotations.values():
        if not description:
            continue
        # Strip a leading parenthesised tag and accession-style tokens
        clean_desc = re.sub(r'^\([^)]+\)\s*', '', description)
        clean_desc = re.sub(r'\b[A-Z]{2,}\d+(?::\w+)?\b', '', clean_desc)
        for word in re.split(r'[\s\-_/\[\](),;:]+', clean_desc):
            word = word.strip().lower()
            if (len(word) >= 4 and not word.isdigit()
                    and word not in skip_words and not word[0].isdigit()):
                term_counts[word] += 1
    sorted_terms = sorted(term_counts.items(), key=lambda x: (-x[1], x[0]))
    terms = [{'term': term.capitalize(), 'count': count} for term, count in sorted_terms[:500]]

    # Also extract full annotations for Annotation tab
    annotation_counts = defaultdict(int)
    for description in annotations.values():
        if description and description.strip():
            # Truncate very long descriptions for display purposes
            display_desc = description[:200] + '...' if len(description) > 200 else description
            annotation_counts[display_desc] += 1
    sorted_annotations = sorted(annotation_counts.items(), key=lambda x: -x[1])
    full_annotations = [{'annotation': ann, 'count': count} for ann, count in sorted_annotations[:1000]]

    # Also get gene names for Paralogous tab (no limit - users need access to all genes)
    gene_names = [{'gene': gene_id, 'annotation': annotations.get(gene_id, '')[:100]}
                  for gene_id in sorted(annotations.keys())]

    return jsonify({
        'success': True,
        'session_id': session_id,
        'gene_count': len(annotations),
        'annotated_genes': annotated_genes,
        'matched_genes': matched_genes,
        'total_genome_genes': total_genome_genes,
        'unique_terms': len(term_counts),
        'terms': terms,
        'annotations': full_annotations,
        'genes': gene_names
    })
def api_discovery_search():
    """Run discovery search to find syntenic blocks with specific annotations

    JSON body fields:
        query_genome (str, required): genome to search from.
        comparisons (list[str], required): comparison genomes.
        groups / required_groups / required_terms: search terms in any of
            three historical formats; all are normalized below into
            [{'terms': [...], 'minMatch': N}, ...] before use.
        optional_terms (list): terms reported in results but not required.
        match_mode (str): 'all' (must match all genomes) or 'any'.
        search_type (str): 'term', 'annotation', or 'gene'.
        annotation_session_id (str): key into _discovery_annotations pointing
            at a previously uploaded custom annotation file.

    The search itself runs in a subprocess (scripts/discovery_search.py) and
    its JSON stdout is returned under 'data'. Every exit path goes through
    _record_and_return so analytics capture success/failure and duration.
    """
    start_time = time.time()
    data = request.json
    query_genome = data.get('query_genome')
    comparisons = data.get('comparisons', [])
    groups = data.get('groups', [])  # New format: [{terms: [...], minMatch: N}, ...]
    required_groups = data.get('required_groups', [])  # Legacy: List of lists of terms
    required_terms = data.get('required_terms', [])  # Legacy: flat list of terms
    optional_terms = data.get('optional_terms', [])  # Optional terms to include in results
    match_mode = data.get('match_mode', 'all')  # 'all' (must match all genomes) or 'any' (match any genome)
    search_type = data.get('search_type', 'term')  # 'term', 'annotation', or 'gene'
    annotation_session_id = data.get('annotation_session_id')  # For custom annotations

    # Helper to record analytics and return response
    def _record_and_return(response, success=True):
        # Records a discovery event with elapsed wall-clock time, then passes
        # the Flask response through unchanged.
        if ANALYTICS_AVAILABLE:
            duration_ms = int((time.time() - start_time) * 1000)
            record_event(
                feature_type=FEATURE_DISCOVERY,
                query_genome=query_genome,
                status='success' if success else 'failure',
                duration_ms=duration_ms,
                request=request
            )
        return response

    if not query_genome:
        return _record_and_return(jsonify({'success': False, 'error': 'Query genome is required'}), success=False)
    if not comparisons:
        return _record_and_return(jsonify({'success': False, 'error': 'At least one comparison genome is required'}), success=False)
    # Convert new groups format to required_groups if provided
    if groups and not required_groups:
        required_groups = groups  # Keep the new format with minMatch
    # Convert legacy required_terms to required_groups (each term becomes its own group)
    if required_terms and not required_groups:
        required_groups = [{'terms': [term], 'minMatch': 1} for term in required_terms]
    # Ensure backward compatibility - convert old [[...]] format to new format
    if required_groups and isinstance(required_groups[0], list):
        required_groups = [{'terms': group, 'minMatch': 1} for group in required_groups]
    if not required_groups:
        return _record_and_return(jsonify({'success': False, 'error': 'At least one search term is required'}), success=False)
    script_path = os.path.join(SCRIPTS_DIR, 'discovery_search.py')
    if not os.path.exists(script_path):
        return _record_and_return(jsonify({'success': False, 'error': 'Discovery search script not found'}), success=False)
    try:
        # Build command - using PYTHON_BIN for HuggingFace Spaces
        args = [
            PYTHON_BIN, script_path,
            '--query', query_genome,
            '--comparisons'] + comparisons + [
            '--search-type', search_type,  # Pass search type to script
            '--match-mode', match_mode,  # 'all' or 'any'
            '--format', 'json'
        ]
        # Pass required_groups as JSON (new format with terms and minMatch)
        if required_groups:
            args.extend(['--required-groups', json.dumps(required_groups)])
        # Pass optional terms as JSON
        if optional_terms:
            args.extend(['--optional-terms', json.dumps(optional_terms)])
        # Add custom annotation file if provided
        if annotation_session_id and annotation_session_id in _discovery_annotations:
            annotation_file = _discovery_annotations[annotation_session_id]['file_path']
            if os.path.exists(annotation_file):
                args.extend(['--annotation-file', annotation_file])
        # NOTE(review): no timeout is set here, unlike the MCscan job runner --
        # a hung search script would block this request indefinitely.
        result = subprocess.run(args, capture_output=True, text=True)
        if result.returncode == 0:
            try:
                output = result.stdout.strip()
                results = json.loads(output)
                return _record_and_return(jsonify({'success': True, 'data': results}), success=True)
            except json.JSONDecodeError as e:
                # Script exited 0 but printed non-JSON; surface raw output for debugging.
                return _record_and_return(jsonify({'success': False, 'error': f'Failed to parse results: {str(e)}', 'stdout': result.stdout, 'stderr': result.stderr}), success=False)
        else:
            return _record_and_return(jsonify({'success': False, 'error': result.stderr or 'Search failed', 'stdout': result.stdout}), success=False)
    except Exception as e:
        return _record_and_return(jsonify({'success': False, 'error': str(e)}), success=False)
def api_discovery_check_genome_annotations(genome):
    """Report whether database annotations are available for *genome*."""
    # A genome has database annotations when its gene_annotation.tsv exists.
    annotation_path = os.path.join(ANNOTATIONS_DIR, genome, 'gene_annotation.tsv')
    has_db_annotations = os.path.exists(annotation_path)
    # Only load the (potentially large) annotation table when it exists.
    gene_count = len(load_genome_annotations(genome)) if has_db_annotations else 0
    # Genomes outside the built-in catalogue are user-uploaded ("custom").
    is_custom = genome not in AVAILABLE_GENOMES
    return jsonify({
        'success': True,
        'genome': genome,
        'has_database_annotations': has_db_annotations,
        'is_custom_genome': is_custom,
        'gene_count': gene_count,
        'requires_custom_annotations': is_custom and not has_db_annotations
    })
def api_discovery_full_annotations(genome):
    """Get full annotation descriptions for Annotation Search tab.

    Returns the distinct annotation descriptions for *genome*, each with the
    number of genes carrying it, sorted most-common-first and capped at 1000
    entries to keep the payload bounded.
    """
    annotations = load_genome_annotations(genome)
    if not annotations:
        return jsonify({'success': False, 'error': f'No annotations found for {genome}', 'annotations': []})
    # Count how many genes share each (display-truncated) description.
    # defaultdict(int) replaces the manual membership-check-then-increment.
    annotation_counts = defaultdict(int)
    for gene_id, description in annotations.items():
        if description and description.strip():
            # Truncate very long annotations for display
            display_desc = description[:200] + '...' if len(description) > 200 else description
            annotation_counts[display_desc] += 1
    # Sort by count (most common first)
    sorted_annotations = sorted(annotation_counts.items(), key=lambda x: -x[1])
    result = [{'annotation': ann, 'count': count} for ann, count in sorted_annotations[:1000]]
    return jsonify({
        'success': True,
        'genome': genome,
        'total_annotations': len(annotation_counts),
        'annotations': result
    })
def api_discovery_gene_names(genome):
    """Get gene names for Paralogous Search tab"""
    # Gene names come from the genome's BED file in the MCscan results tree.
    bed_file = os.path.join(MCSCAN_RESULTS_DIR, 'bed_files', f'{genome}.bed')
    genes = []
    if os.path.exists(bed_file):
        try:
            # Annotations are shown next to each gene name when available.
            annotations = load_genome_annotations(genome)
            with open(bed_file, 'r') as handle:
                for raw_line in handle:
                    # Skip blank lines and comment lines.
                    if not raw_line.strip() or raw_line.startswith('#'):
                        continue
                    columns = raw_line.strip().split('\t')
                    if len(columns) < 4:
                        continue
                    gene_id = columns[3]  # Gene ID is in 4th column
                    desc = annotations.get(gene_id, '')
                    genes.append({
                        'gene': gene_id,
                        'annotation': desc[:100] if desc else ''
                    })
        except Exception as e:
            logger.error(f"Error loading BED file for {genome}: {e}")
    if not genes:
        return jsonify({'success': False, 'error': f'No genes found for {genome}', 'genes': []})
    return jsonify({
        'success': True,
        'genome': genome,
        'total_genes': len(genes),
        'genes': genes
    })
| # ============================================================================ | |
| # Custom Genome API Routes | |
| # ============================================================================ | |
def allowed_file(filename):
    """Return True when *filename* has an extension in ALLOWED_EXTENSIONS."""
    if '.' not in filename:
        return False
    extension = filename.rsplit('.', 1)[1].lower()
    return extension in ALLOWED_EXTENSIONS
def generate_private_run_key(project_name: str) -> str:
    """Generate a private run key: [project_name]_[5 random chars with at least one symbol]

    This makes private keys more secure by adding randomness with special characters,
    so the project name alone is not enough to access the results.
    """
    # URL-safe symbols that work well in file paths
    symbols = '!@#$'
    alphanumeric = string.ascii_letters + string.digits
    # Place exactly one guaranteed symbol at a random position; the other
    # four characters are drawn from letters and digits.
    symbol_pos = random.randint(0, 4)
    suffix = ''.join(
        random.choice(symbols if position == symbol_pos else alphanumeric)
        for position in range(5)
    )
    return f"{project_name}_{suffix}"
def run_mcscan_background(run_key, gff3_path, pep_path, genomes, display_name=None, bed_path=None, visibility='public',
                          cscore=None, min_anchor=None, gap_length=None):
    """Run MCscan processing in background thread with queue management

    Args:
        run_key: Unique run identifier
        gff3_path: Path to GFF3 file (None for sequences-based uploads)
        pep_path: Path to protein sequences file
        genomes: List of comparison genomes
        display_name: User-friendly genome name
        bed_path: Path to BED file (for sequences-based uploads)
        visibility: 'public' or 'private' - controls listing visibility
        cscore: C-score threshold (default: 0.99)
        min_anchor: Minimum number of gene anchors (default: 4)
        gap_length: Maximum gap length between genes (default: 20)

    The job slot is always released via the finally block, and the temp
    upload directory cleanup plus analytics recording run on every exit
    path, including timeout (which previously returned early and leaked
    the temp directory / skipped analytics).
    """
    meta_dir = os.path.join(CUSTOM_META_DIR, run_key)
    status_file = os.path.join(meta_dir, 'job_status.json')

    def update_status(status, progress, message, **extra):
        """Helper to update job status file"""
        os.makedirs(meta_dir, exist_ok=True)
        data = {
            'status': status,
            'progress': progress,
            'message': message,
            'run_key': run_key,
            'last_updated': datetime.now().isoformat()
        }
        data.update(extra)
        with open(status_file, 'w') as f:
            json.dump(data, f)

    try:
        # Wait for job slot (queue system)
        queue_pos = get_queue_position(run_key)
        if queue_pos > 0:
            update_status('queued', 0, f'Waiting in queue (position {queue_pos})...', queue_position=queue_pos)
        # Block until we get a slot (only 1 MCscan job at a time)
        acquire_job_slot(run_key)
        update_status('running', 5, 'Starting MCscan analysis...')
        script_path = os.path.join(SCRIPTS_DIR, 'process_custom_genome.py')
        genomes_str = ','.join(genomes)
        # Using PYTHON_BIN for HuggingFace Spaces
        cmd = [
            PYTHON_BIN, script_path,
            '--run-key', run_key,
            '--pep', pep_path,
            '--genomes', genomes_str,
            '--meta-dir', meta_dir,
            '--visibility', visibility
        ]
        # Add input source (either GFF3 or BED)
        if bed_path and os.path.exists(bed_path):
            # Sequences-based upload: use BED directly
            cmd.extend(['--bed', bed_path])
        elif gff3_path and os.path.exists(gff3_path):
            # GFF3-based upload
            cmd.extend(['--gff3', gff3_path])
        else:
            raise ValueError("Neither GFF3 nor BED input file found")
        # Add display name if provided
        if display_name:
            cmd.extend(['--display-name', display_name])
        # Add MCscan parameters if provided (user overrides)
        if cscore is not None:
            cmd.extend(['--cscore', str(cscore)])
        if min_anchor is not None:
            cmd.extend(['--min-anchor', str(min_anchor)])
        if gap_length is not None:
            cmd.extend(['--gap-length', str(gap_length)])
        # Run in background, output to log file in metadata directory
        os.makedirs(meta_dir, exist_ok=True)
        log_file = os.path.join(meta_dir, 'process.log')
        with open(log_file, 'w') as log:
            try:
                # Run with timeout (MCSCAN_JOB_TIMEOUT_SECONDS max); the
                # child process updates the status file itself on success.
                subprocess.run(cmd, stdout=log, stderr=subprocess.STDOUT, cwd=SCRIPTS_DIR,
                               timeout=MCSCAN_JOB_TIMEOUT_SECONDS)
            except subprocess.TimeoutExpired:
                # BUGFIX: do not return early here - mark the job failed and
                # fall through so temp-dir cleanup and analytics still run.
                update_status('failed', 0, f'Job timed out after {MCSCAN_JOB_TIMEOUT_SECONDS // 3600} hours. The analysis was too complex.')
    except Exception as e:
        # Update status file with error
        update_status('failed', 0, f'Processing error: {str(e)}')
    finally:
        # Always release the job slot
        release_job_slot(run_key)
    # Clean up temp directory - files are no longer needed after processing
    temp_dir = os.path.join(CUSTOM_TEMP_DIR, run_key)
    if os.path.exists(temp_dir):
        try:
            shutil.rmtree(temp_dir)
            logger.info(f"Cleanup: Deleted temp directory for {run_key}")
        except Exception as e:
            logger.warning(f"Cleanup: Failed to delete temp directory {run_key}: {e}")
    # Record analytics based on actual job outcome (the processing script
    # writes 'completed' into the status file when it succeeds).
    if ANALYTICS_AVAILABLE:
        try:
            final_status = 'failure'  # Default to failure
            if os.path.exists(status_file):
                with open(status_file, 'r') as f:
                    status_data = json.load(f)
                if status_data.get('status') == 'completed':
                    final_status = 'success'
            record_event(
                feature_type=FEATURE_CUSTOM_GENOME,
                query_genome=display_name or run_key,
                status=final_status,
                duration_ms=0,  # Duration not tracked for background jobs
                extra_data={'genomes_count': len(genomes)}
            )
        except Exception as e:
            logger.warning(f"Analytics: Failed to record event for {run_key}: {e}")
def api_custom_upload():
    """Upload and validate custom genome files.

    Expects multipart form data with 'gff3' and 'pep' file parts plus
    'run_name' (required), 'display_name', and 'visibility' fields.
    Files are staged under CUSTOM_TEMP_DIR/<run_key>/ as input.gff3 and
    input.pep; basic size/format validation runs before accepting.
    Returns JSON with the generated run_key and rough gene/protein counts.
    """
    try:
        # Check if files are present
        if 'gff3' not in request.files or 'pep' not in request.files:
            return jsonify({'success': False, 'error': 'Both GFF3 and PEP files are required'})
        gff3_file = request.files['gff3']
        pep_file = request.files['pep']
        run_name = request.form.get('run_name', '').strip()
        display_name = request.form.get('display_name', '').strip()  # User-friendly genome name
        visibility = request.form.get('visibility', 'public').strip()  # 'public' or 'private'
        # Validate visibility
        if visibility not in ('public', 'private'):
            visibility = 'public'
        # Run name is now required
        if not run_name:
            return jsonify({'success': False, 'error': 'Run Name is required'})
        # Validate name lengths
        valid, error = validate_name_length(run_name, 'Run Name')
        if not valid:
            return jsonify({'success': False, 'error': error})
        valid, error = validate_name_length(display_name, 'Display Name')
        if not valid:
            return jsonify({'success': False, 'error': error})
        # Sanitize run name to create base key
        base_key = secure_filename(run_name.replace(' ', '_').lower())
        if not base_key:
            return jsonify({'success': False, 'error': 'Run Name contains invalid characters'})
        # For private runs, always generate a unique key with random suffix including symbol
        if visibility == 'private':
            run_key = generate_private_run_key(base_key)
        else:
            run_key = base_key
        # Check if run key already exists (check both temp and meta dirs)
        temp_dir = os.path.join(CUSTOM_TEMP_DIR, run_key)
        meta_dir = os.path.join(CUSTOM_META_DIR, run_key)
        if os.path.exists(temp_dir) or os.path.exists(meta_dir):
            # Append random suffix to avoid colliding with an existing run
            run_key = run_key + '_' + uuid.uuid4().hex[:4]
            temp_dir = os.path.join(CUSTOM_TEMP_DIR, run_key)
        os.makedirs(temp_dir, exist_ok=True)
        # Validate and save files
        if gff3_file.filename == '' or pep_file.filename == '':
            return jsonify({'success': False, 'error': 'No files selected'})
        # Validate file extensions
        if not allowed_file(gff3_file.filename):
            return jsonify({'success': False, 'error': f'Invalid GFF3 file extension. Allowed: {", ".join(ALLOWED_EXTENSIONS)}'})
        if not allowed_file(pep_file.filename):
            return jsonify({'success': False, 'error': f'Invalid PEP file extension. Allowed: {", ".join(ALLOWED_EXTENSIONS)}'})
        # Save files under fixed names; the originals' names are not needed
        # (removed unused secure_filename() locals that discarded their results).
        gff3_path = os.path.join(temp_dir, 'input.gff3')
        gff3_file.save(gff3_path)
        pep_path = os.path.join(temp_dir, 'input.pep')
        pep_file.save(pep_path)
        # Basic validation
        validation_errors = []
        # Check GFF3 has content and size
        gff3_size = os.path.getsize(gff3_path)
        if gff3_size == 0:
            validation_errors.append('GFF3 file is empty')
        elif gff3_size > MAX_GFF3_SIZE:
            max_mb = MAX_GFF3_SIZE // (1024 * 1024)
            validation_errors.append(f'GFF3 file exceeds {max_mb} MB limit (uploaded: {gff3_size // (1024*1024)} MB)')
        # Check PEP has content, size, and FASTA format
        pep_size = os.path.getsize(pep_path)
        if pep_size == 0:
            validation_errors.append('PEP file is empty')
        elif pep_size > MAX_PEP_SIZE:
            max_mb = MAX_PEP_SIZE // (1024 * 1024)
            validation_errors.append(f'PEP file exceeds {max_mb} MB limit (uploaded: {pep_size // (1024*1024)} MB)')
        else:
            # Check if PEP looks like FASTA
            with open(pep_path, 'r') as f:
                first_line = f.readline().strip()
                if not first_line.startswith('>'):
                    validation_errors.append('PEP file does not appear to be FASTA format (should start with >)')
        if validation_errors:
            # Cleanup on validation failure
            shutil.rmtree(temp_dir, ignore_errors=True)
            return jsonify({'success': False, 'error': '; '.join(validation_errors)})
        # Count genes in GFF3 and proteins in PEP (rough counts for the UI)
        gene_count = 0
        with open(gff3_path, 'r') as f:
            for line in f:
                if not line.startswith('#') and '\tgene\t' in line:
                    gene_count += 1
        protein_count = 0
        with open(pep_path, 'r') as f:
            for line in f:
                if line.startswith('>'):
                    protein_count += 1
        return jsonify({
            'success': True,
            'run_key': run_key,
            'visibility': visibility,
            'gff3_size': gff3_size,
            'pep_size': pep_size,
            'gene_count': gene_count,
            'protein_count': protein_count,
            'message': f'Files uploaded successfully. Found ~{gene_count} genes and {protein_count} proteins.'
        })
    except Exception as e:
        return jsonify({'success': False, 'error': str(e)})
def api_custom_upload_sequences():
    """Upload BED content and protein sequences for custom genome.

    Expects form fields 'bed_content' (6-column BED as text, optional header
    row auto-detected), 'sequences' (FASTA protein text), 'run_name',
    'display_name', and 'visibility'. Validated inputs are staged under
    CUSTOM_TEMP_DIR/<run_key>/ as input.bed and input.pep. Fails early if
    more than half of the BED genes lack a matching protein sequence.
    """
    try:
        # Get BED content as text (not file upload)
        bed_content = request.form.get('bed_content', '').strip()
        sequences_text = request.form.get('sequences', '').strip()
        run_name = request.form.get('run_name', '').strip()
        display_name = request.form.get('display_name', '').strip()
        visibility = request.form.get('visibility', 'public').strip()  # 'public' or 'private'
        # Validate visibility
        if visibility not in ('public', 'private'):
            visibility = 'public'
        if not bed_content:
            return jsonify({'success': False, 'error': 'BED content is required'})
        # Validate BED content size (byte length, matching the MB limit)
        bed_size = len(bed_content.encode('utf-8'))
        if bed_size > MAX_BED_SIZE:
            max_mb = MAX_BED_SIZE // (1024 * 1024)
            return jsonify({'success': False, 'error': f'BED content exceeds {max_mb} MB limit (uploaded: {bed_size // (1024*1024)} MB)'})
        if not sequences_text:
            return jsonify({'success': False, 'error': 'Protein sequences are required'})
        if not display_name:
            return jsonify({'success': False, 'error': 'Genome display name is required'})
        # Run name is now required
        if not run_name:
            return jsonify({'success': False, 'error': 'Run Name is required'})
        # Validate name lengths
        valid, error = validate_name_length(run_name, 'Run Name')
        if not valid:
            return jsonify({'success': False, 'error': error})
        valid, error = validate_name_length(display_name, 'Display Name')
        if not valid:
            return jsonify({'success': False, 'error': error})
        # Sanitize run name to create base key
        base_key = secure_filename(run_name.replace(' ', '_').lower())
        if not base_key:
            return jsonify({'success': False, 'error': 'Run Name contains invalid characters'})
        # For private runs, always generate a unique key with random suffix including symbol
        if visibility == 'private':
            run_key = generate_private_run_key(base_key)
        else:
            run_key = base_key
        # Check if run key already exists
        temp_dir = os.path.join(CUSTOM_TEMP_DIR, run_key)
        meta_dir = os.path.join(CUSTOM_META_DIR, run_key)
        if os.path.exists(temp_dir) or os.path.exists(meta_dir):
            run_key = run_key + '_' + uuid.uuid4().hex[:4]
            temp_dir = os.path.join(CUSTOM_TEMP_DIR, run_key)
        os.makedirs(temp_dir, exist_ok=True)
        # Parse BED content with header detection
        validation_errors = []
        gene_names = []
        bed_lines = []
        raw_lines = [line.strip() for line in bed_content.split('\n') if line.strip() and not line.startswith('#')]
        if len(raw_lines) == 0:
            validation_errors.append('BED content is empty')
        else:
            # Detect header row: if column 5 is not "0" or column 6 is not "+" or "-"
            first_parts = raw_lines[0].split('\t')
            skip_first = False
            if len(first_parts) >= 6:
                col5 = first_parts[4]
                col6 = first_parts[5].strip()
                if col5 != '0' or (col6 != '+' and col6 != '-'):
                    # This is a header row, skip it
                    skip_first = True
            data_lines = raw_lines[1:] if skip_first else raw_lines
            for line_idx, line in enumerate(data_lines):
                # 1-based line number in the user's original paste, accounting
                # for a skipped header row, so error messages point correctly.
                orig_line_num = line_idx + (2 if skip_first else 1)
                parts = line.split('\t')
                if len(parts) < 6:
                    validation_errors.append(f'Line {orig_line_num}: BED must have 6 tab-separated columns (found {len(parts)})')
                    break
                # Validate strand
                strand = parts[5].strip()
                if strand not in ('+', '-'):
                    validation_errors.append(f'Line {orig_line_num}: Strand must be "+" or "-" (found "{strand}")')
                    break
                gene_names.append(parts[3])
                bed_lines.append(line)
        # Check minimum gene count (4 genes required)
        if len(gene_names) < 4:
            validation_errors.append(f'Minimum 4 genes required. Found {len(gene_names)} data rows.')
        if validation_errors:
            shutil.rmtree(temp_dir, ignore_errors=True)
            return jsonify({'success': False, 'error': '; '.join(validation_errors)})
        # Save validated BED content (without header)
        bed_path = os.path.join(temp_dir, 'input.bed')
        with open(bed_path, 'w') as f:
            for line in bed_lines:
                f.write(line + '\n')
        # Parse and validate protein sequences (FASTA format)
        pep_path = os.path.join(temp_dir, 'input.pep')
        protein_names = []
        try:
            with open(pep_path, 'w') as f:
                # Parse FASTA from text
                current_header = None
                current_seq = []
                for line in sequences_text.split('\n'):
                    line = line.strip()
                    if not line:
                        continue
                    if line.startswith('>'):
                        # Save previous sequence
                        if current_header and current_seq:
                            f.write(f'>{current_header}\n')
                            f.write(''.join(current_seq) + '\n')
                        # Parse new header
                        current_header = line[1:].split()[0]  # Take first word after >
                        protein_names.append(current_header)
                        current_seq = []
                    else:
                        current_seq.append(line)
                # Save last sequence
                if current_header and current_seq:
                    f.write(f'>{current_header}\n')
                    f.write(''.join(current_seq) + '\n')
        except Exception as e:
            shutil.rmtree(temp_dir, ignore_errors=True)
            return jsonify({'success': False, 'error': f'Error parsing protein sequences: {str(e)}'})
        if len(protein_names) == 0:
            shutil.rmtree(temp_dir, ignore_errors=True)
            return jsonify({'success': False, 'error': 'No valid protein sequences found. Check FASTA format.'})
        # Check if BED genes match protein sequences
        bed_genes_set = set(gene_names)
        protein_genes_set = set(protein_names)
        # Also check for isoform naming (gene.N pattern)
        for prot in protein_names:
            base_name = prot.rsplit('.', 1)[0] if '.' in prot else prot
            protein_genes_set.add(base_name)
        missing_in_proteins = bed_genes_set - protein_genes_set
        if len(missing_in_proteins) > len(gene_names) * 0.5:
            # More than 50% of genes are missing - warn user
            sample_missing = list(missing_in_proteins)[:5]
            shutil.rmtree(temp_dir, ignore_errors=True)
            return jsonify({
                'success': False,
                'error': f'Many BED genes not found in protein sequences ({len(missing_in_proteins)}/{len(gene_names)}). Sample: {", ".join(sample_missing)}'
            })
        return jsonify({
            'success': True,
            'run_key': run_key,
            'visibility': visibility,
            # BUGFIX: report the byte size used for validation (previously
            # len(bed_content), a character count), consistent with the
            # byte sizes reported by the GFF3/PEP upload endpoint.
            'bed_size': bed_size,
            'gene_count': len(gene_names),
            'protein_count': len(protein_names),
            'message': f'Sequences uploaded successfully. Found {len(gene_names)} genes and {len(protein_names)} proteins.'
        })
    except Exception as e:
        return jsonify({'success': False, 'error': str(e)})
def api_custom_run_mcscan():
    """Start MCscan analysis for uploaded genome.

    Expects a JSON body with 'run_key' (from a prior upload call),
    'genomes' (comparison genome list), 'display_name', 'visibility',
    and optional MCscan overrides ('cscore', 'min_anchor', 'gap_length').
    Enforces a per-IP run limit, writes an initial queued status file,
    then launches run_mcscan_background in a daemon thread.
    """
    start_time = time.time()
    data = request.json
    # Check IP-based run limit FIRST (before any other processing)
    client_ip = get_remote_address()
    allowed, limit_msg = check_ip_run_limit(client_ip)
    if not allowed:
        # Record rate limit failure
        if ANALYTICS_AVAILABLE:
            record_event(
                feature_type=FEATURE_CUSTOM_GENOME,
                query_genome=None,
                status='failure',
                duration_ms=int((time.time() - start_time) * 1000),
                request=request,
                extra_data={'error': 'rate_limit'}
            )
        return jsonify({'success': False, 'error': limit_msg}), 429
    run_key = data.get('run_key')
    genomes = data.get('genomes', [])
    display_name = data.get('display_name', '')  # User-friendly genome name
    visibility = data.get('visibility', 'public')  # 'public' or 'private'
    # MCscan parameters (optional user overrides)
    cscore = data.get('cscore')  # Default: 0.99 (set in shell script)
    min_anchor = data.get('min_anchor')  # Default: 4 (set in shell script)
    gap_length = data.get('gap_length')  # Default: 20 (set in shell script)
    # Validate numeric parameters if provided
    if cscore is not None:
        try:
            cscore = float(cscore)
            if not (0.0 <= cscore <= 1.0):
                return jsonify({'success': False, 'error': 'C-score must be between 0 and 1'})
        except (ValueError, TypeError):
            return jsonify({'success': False, 'error': 'Invalid C-score value'})
    if min_anchor is not None:
        try:
            min_anchor = int(min_anchor)
            if min_anchor < 1:
                return jsonify({'success': False, 'error': 'Minimum anchor count must be at least 1'})
        except (ValueError, TypeError):
            return jsonify({'success': False, 'error': 'Invalid minimum anchor count value'})
    if gap_length is not None:
        try:
            gap_length = int(gap_length)
            if gap_length < 1:
                return jsonify({'success': False, 'error': 'Gap length must be at least 1'})
        except (ValueError, TypeError):
            return jsonify({'success': False, 'error': 'Invalid gap length value'})
    if not run_key:
        return jsonify({'success': False, 'error': 'run_key is required'})
    if not display_name or not display_name.strip():
        return jsonify({'success': False, 'error': 'Genome Display Name is required'})
    if not genomes:
        return jsonify({'success': False, 'error': 'At least one comparison genome is required'})
    # Validate genomes
    for g in genomes:
        if g not in AVAILABLE_GENOMES:
            return jsonify({'success': False, 'error': f'Invalid genome: {g}'})
    # Check temp directory for uploaded files
    temp_dir = os.path.join(CUSTOM_TEMP_DIR, run_key)
    if not os.path.exists(temp_dir):
        return jsonify({'success': False, 'error': f'Run key not found: {run_key}'})
    # Detect upload type: GFF3-based or sequences-based (BED)
    gff3_path = os.path.join(temp_dir, 'input.gff3')
    bed_path = os.path.join(temp_dir, 'input.bed')
    pep_path = os.path.join(temp_dir, 'input.pep')
    is_sequences_based = os.path.exists(bed_path) and not os.path.exists(gff3_path)
    is_gff3_based = os.path.exists(gff3_path)
    if not os.path.exists(pep_path):
        return jsonify({'success': False, 'error': 'Protein sequences file not found. Please upload files first.'})
    # Reject when neither input type was uploaded (removed the unused
    # input_file locals - the thread args below use the paths directly).
    if not is_sequences_based and not is_gff3_based:
        return jsonify({'success': False, 'error': 'Input files not found. Please upload files first.'})
    # Create metadata directory for status tracking
    meta_dir = os.path.join(CUSTOM_META_DIR, run_key)
    os.makedirs(meta_dir, exist_ok=True)
    # Create initial status file in metadata directory
    status_file = os.path.join(meta_dir, 'job_status.json')
    with open(status_file, 'w') as f:
        json.dump({
            'status': 'queued',
            'progress': 0,
            'message': 'Job queued, waiting for available slot...',
            'run_key': run_key,
            'genomes': genomes,
            'visibility': visibility,
            'total_steps': len(genomes) + 3,
            'input_type': 'sequences' if is_sequences_based else 'gff3',
            'params': {
                'cscore': cscore,
                'min_anchor': min_anchor,
                'gap_length': gap_length
            }
        }, f)
    # Record this run against the IP (for daily limit tracking)
    record_ip_run(client_ip)
    # Start background processing
    thread = threading.Thread(
        target=run_mcscan_background,
        args=(run_key, gff3_path if is_gff3_based else None, pep_path, genomes, display_name),
        kwargs={
            'bed_path': bed_path if is_sequences_based else None,
            'visibility': visibility,
            'cscore': cscore,
            'min_anchor': min_anchor,
            'gap_length': gap_length
        }
    )
    thread.daemon = True
    thread.start()
    # Note: Analytics is now recorded in run_mcscan_background when job completes
    # to capture actual success/failure status
    # Estimate time based on number of genomes
    est_time = len(genomes) * 1  # ~1 minute per genome
    return jsonify({
        'success': True,
        'run_key': run_key,
        'genomes': genomes,
        'message': f'MCscan analysis queued against {len(genomes)} genome(s). Jobs run one at a time.',
        'estimated_minutes': est_time,
        'runs_remaining': MAX_RUNS_PER_IP - len(_ip_run_tracker['runs'].get(client_ip, []))
    })
def api_custom_status(run_key):
    """Get status of a custom genome MCscan job"""
    meta_dir = os.path.join(CUSTOM_META_DIR, run_key)
    if not os.path.exists(meta_dir):
        return jsonify({'success': False, 'error': f'Run key not found: {run_key}'})
    status_file = os.path.join(meta_dir, 'job_status.json')
    if not os.path.exists(status_file):
        # No status file yet -- report a placeholder payload.
        return jsonify({
            'success': True,
            'data': {
                'status': 'unknown',
                'message': 'Job status not available'
            }
        })
    with open(status_file, 'r') as handle:
        status = json.load(handle)
    # Surface a user-friendly message instead of raw internal error text.
    if status.get('status') == 'failed' and 'message' in status:
        status['message'] = extract_user_error_from_log(status['message'], run_key)
    # Attach the job's position in the single-slot queue when known.
    queue_pos = get_queue_position(run_key)
    if queue_pos >= 0:
        status['queue_position'] = queue_pos
    return jsonify({'success': True, 'data': status})
def api_queue_status():
    """Get current MCscan job queue status"""
    # Snapshot the shared queue state under its lock, then build the
    # response from the copies so the lock is held as briefly as possible.
    with _job_queue_status['queue_lock']:
        current_job = _job_queue_status['current_job']
        queue_positions = dict(_job_queue_status['queue_position'])
    waiting_jobs = [key for key, position in queue_positions.items() if position > 0]
    return jsonify({
        'success': True,
        'current_job': current_job,
        'jobs_in_queue': len(waiting_jobs),
        'waiting_jobs': waiting_jobs,
        'max_concurrent': 1,
        'max_runs_per_ip': MAX_RUNS_PER_IP,
        'run_limit_window_hours': RUN_LIMIT_WINDOW_HOURS
    })
def api_custom_lookup(run_key):
    """Lookup a custom genome run by its key (for sharing)"""
    meta_dir = os.path.join(CUSTOM_META_DIR, run_key)
    if not os.path.exists(meta_dir):
        return jsonify({'success': False, 'error': f'Run not found: {run_key}'})
    result = {'run_key': run_key, 'exists': True}
    # Attach the manifest and job status payloads when their files exist.
    for field, filename in (('manifest', 'manifest.json'), ('status', 'job_status.json')):
        path = os.path.join(meta_dir, filename)
        if os.path.exists(path):
            with open(path, 'r') as handle:
                result[field] = json.load(handle)
    return jsonify({'success': True, 'data': result})
def api_custom_genomes():
    """List all available custom genomes.

    Scans CUSTOM_META_DIR and returns one entry per public run, newest
    first. Private runs (visibility != 'public') are excluded. Responses
    are marked uncacheable because job state changes as runs progress.
    """
    if not os.path.exists(CUSTOM_META_DIR):
        return jsonify({'success': True, 'genomes': []})
    genomes = []
    for run_key in os.listdir(CUSTOM_META_DIR):
        meta_dir = os.path.join(CUSTOM_META_DIR, run_key)
        if not os.path.isdir(meta_dir):
            continue
        genome_info = {'run_key': run_key}
        visibility = 'public'  # Default to public for legacy runs
        manifest_file = os.path.join(meta_dir, 'manifest.json')
        if os.path.exists(manifest_file):
            try:
                with open(manifest_file, 'r') as f:
                    manifest_data = json.load(f)
                genome_info['manifest'] = manifest_data
                visibility = manifest_data.get('visibility', 'public')
            except Exception:  # narrowed from bare except: skip unreadable manifest
                pass
        # Check job_status.json for visibility if not in manifest (for runs still processing)
        status_file = os.path.join(meta_dir, 'job_status.json')
        if os.path.exists(status_file):
            try:
                with open(status_file, 'r') as f:
                    status_data = json.load(f)
                genome_info['status'] = status_data.get('status', 'unknown')
                genome_info['progress'] = status_data.get('progress', 0)
                # Use visibility from status if manifest doesn't have it
                if visibility == 'public' and 'visibility' in status_data:
                    visibility = status_data.get('visibility', 'public')
            except Exception:  # narrowed from bare except
                genome_info['status'] = 'unknown'
        # Only include public runs in the listing
        if visibility == 'public':
            genomes.append(genome_info)
    # Sort by creation date (newest first)
    genomes.sort(key=lambda x: x.get('manifest', {}).get('created_at', ''), reverse=True)
    response = jsonify({'success': True, 'genomes': genomes})
    # Listings change as jobs run; never let clients cache them.
    response.headers['Cache-Control'] = 'no-store, no-cache, must-revalidate, max-age=0'
    return response
def api_custom_delete(run_key):
    """Delete a custom genome run and its associated files in Mcscan_results.

    Removes the MCscan result files listed in the run's manifest (via
    cleanup_mcscan_results_files), then deletes the run's metadata folder.

    Args:
        run_key: Identifier of the run folder under CUSTOM_META_DIR
                 (caller-supplied via the URL).
    """
    # Security: run_key is caller-supplied and ultimately reaches
    # shutil.rmtree — reject separators / traversal components so a crafted
    # key cannot delete anything outside CUSTOM_META_DIR.
    if not run_key or run_key != os.path.basename(run_key) or run_key in ('.', '..'):
        return jsonify({'success': False, 'error': f'Run not found: {run_key}'})
    meta_dir = os.path.join(CUSTOM_META_DIR, run_key)
    if not os.path.exists(meta_dir):
        return jsonify({'success': False, 'error': f'Run not found: {run_key}'})
    try:
        # First, load manifest to get list of files in Mcscan_results
        manifest_file = os.path.join(meta_dir, 'manifest.json')
        if os.path.exists(manifest_file):
            with open(manifest_file, 'r') as f:
                manifest = json.load(f)
            # Clean up files in Mcscan_results folder
            cleanup_mcscan_results_files(manifest)
        # Then delete the custom genome metadata folder
        shutil.rmtree(meta_dir)
        return jsonify({'success': True, 'message': f'Deleted run: {run_key}'})
    except Exception as e:
        return jsonify({'success': False, 'error': str(e)})
def api_custom_plot():
    """Generate a microsynteny plot using a custom genome as the query.

    Expects a JSON body with at least:
        run_key      -- key of a previously processed custom genome run
        genes        -- list of query gene IDs (sanitized before use)
        comparisons  -- list of comparison genome keys

    Optional body fields: colors, annotations, layout, genome_order,
    padding_config, max_genes_config, display_names, gene_labels,
    gene_label_size, keep_lowconf_color.

    The handler builds a command line for plot_user_genes_microsynteny_v2.py,
    runs it as a subprocess, then locates the timestamped output folder and
    returns the generated file names (png/svg/csv) as JSON.
    """
    data = request.json
    run_key = data.get('run_key')
    genes = data.get('genes', [])
    comparisons = data.get('comparisons', [])
    colors = data.get('colors', [])
    annotations = data.get('annotations', [])
    layout = data.get('layout', None)
    genome_order = data.get('genome_order', None)
    # Advanced tweaking parameters (optional)
    padding_config = data.get('padding_config', {})  # {genome: {left: bp, right: bp}}
    max_genes_config = data.get('max_genes_config', {})  # {genome: {left: count, right: count}}
    display_names = data.get('display_names', {})  # {genome: "Custom Name"}
    # Gene labels parameters (optional)
    gene_labels = data.get('gene_labels', [])  # List of gene IDs to label on the plot
    gene_label_size = data.get('gene_label_size', 0)  # Font size for labels (0=disabled, 2-8 recommended)
    # Debug logging for tweaking parameters
    logger.debug(f"api_custom_plot received:")
    logger.debug(f"  padding_config: {padding_config}")
    logger.debug(f"  max_genes_config: {max_genes_config}")
    logger.debug(f"  display_names: {display_names}")
    logger.debug(f"  gene_labels: {gene_labels}")
    logger.debug(f"  gene_label_size: {gene_label_size}")
    # Required-field validation: fail fast with a descriptive error.
    if not run_key:
        return jsonify({'success': False, 'error': 'run_key is required'})
    if not genes:
        return jsonify({'success': False, 'error': 'At least one gene is required'})
    if not comparisons:
        return jsonify({'success': False, 'error': 'At least one comparison genome is required'})
    # Validate and sanitize gene IDs (security measure)
    validated_genes, error_msg = sanitize_gene_ids(genes)
    if error_msg:
        return jsonify({'success': False, 'error': error_msg})
    genes = validated_genes
    # Also validate gene_labels if provided
    if gene_labels:
        validated_labels, label_error = sanitize_gene_ids(gene_labels)
        if label_error:
            return jsonify({'success': False, 'error': f'Gene labels: {label_error}'})
        gene_labels = validated_labels
    # Files are in the main Mcscan_results folder - check there
    i1_blocks_dir = os.path.join(MCSCAN_RESULTS_DIR, 'i1_blocks')
    # Check that comparison results exist (try both name orderings)
    for comp in comparisons:
        blocks_file1 = os.path.join(i1_blocks_dir, f'{run_key}.{comp}.i1.blocks')
        blocks_file2 = os.path.join(i1_blocks_dir, f'{comp}.{run_key}.i1.blocks')
        if not os.path.exists(blocks_file1) and not os.path.exists(blocks_file2):
            return jsonify({'success': False, 'error': f'No MCscan results for comparison with {comp}'})
    # Get display name from manifest (user-provided genome name), fallback to run_key
    custom_display_name = run_key
    mcscan_params = None  # Will be read from manifest if available
    meta_dir = os.path.join(CUSTOM_META_DIR, run_key)
    manifest_file = os.path.join(meta_dir, 'manifest.json')
    if os.path.exists(manifest_file):
        try:
            with open(manifest_file, 'r') as f:
                manifest = json.load(f)
            if manifest.get('display_name'):
                custom_display_name = manifest['display_name']
            # Read MCscan parameters from manifest for Method row in CSV
            if manifest.get('mcscan_params'):
                mcscan_params = manifest['mcscan_params']
        except Exception as e:
            logger.warning(f"Could not read manifest: {e}")
    try:
        # Build command for the plotting script
        script_path = os.path.join(SCRIPTS_DIR, 'plot_user_genes_microsynteny_v2.py')
        if not os.path.exists(script_path):
            return jsonify({'success': False, 'error': f'Plotting script not found'})
        # Build args - use run_key as the query genome, using PYTHON_BIN for HuggingFace Spaces
        args = [PYTHON_BIN, script_path]
        if colors:
            args.extend(['--colors', ','.join(colors)])
        args.extend(['--query', run_key, '--genes'] + genes + ['--comparisons'] + comparisons)
        if annotations:
            # '|||' is the script's annotation separator; each entry is stringified.
            annotations_str = '|||'.join(str(a) for a in annotations)
            args.extend(['--annotations', annotations_str])
        # Pass MCscan parameters for Method row in CSV (if available from manifest)
        if mcscan_params:
            args.extend(['--mcscan-params', json.dumps(mcscan_params)])
        if layout and isinstance(layout, list) and len(layout) > 0:
            layout_str = ','.join(map(str, layout))
            args.extend(['--layout', layout_str])
        if genome_order and isinstance(genome_order, list):
            order_str = ','.join(genome_order)
            args.extend(['--genome-order', order_str])
        # Add advanced tweaking parameters
        # For custom genomes, the query is referenced as 'custom_query' in the frontend
        # but the actual run_key is used in the backend
        query_ref = 'custom_query'  # Frontend reference for query genome
        # Padding configuration for query genome (asymmetric)
        if query_ref in padding_config and padding_config[query_ref]:
            query_pad = padding_config[query_ref]
            if 'left' in query_pad and query_pad['left'] is not None:
                args.extend(['--query-padding-left', str(int(query_pad['left']))])
            if 'right' in query_pad and query_pad['right'] is not None:
                args.extend(['--query-padding-right', str(int(query_pad['right']))])
        # Max genes configuration for query genome (asymmetric)
        if query_ref in max_genes_config and max_genes_config[query_ref]:
            query_genes = max_genes_config[query_ref]
            if 'left' in query_genes and query_genes['left'] is not None:
                args.extend(['--query-max-genes-left', str(int(query_genes['left']))])
            if 'right' in query_genes and query_genes['right'] is not None:
                args.extend(['--query-max-genes-right', str(int(query_genes['right']))])
        # Comparison genomes padding (format: genome:left:right|genome2:left:right)
        comp_padding_parts = []
        for comp in comparisons:
            if comp in padding_config and padding_config[comp]:
                comp_pad = padding_config[comp]
                # 1,500,000 bp is the script's default padding on each side.
                left_val = int(comp_pad.get('left', 1500000))
                right_val = int(comp_pad.get('right', 1500000))
                comp_padding_parts.append(f"{comp}:{left_val}:{right_val}")
        if comp_padding_parts:
            comp_padding_str = '|'.join(comp_padding_parts)
            args.extend(['--comp-padding-config', comp_padding_str])
        # Comparison genomes max genes (format: genome:left:right|genome2:left:right)
        comp_genes_parts = []
        for comp in comparisons:
            if comp in max_genes_config and max_genes_config[comp]:
                comp_genes = max_genes_config[comp]
                left_val = int(comp_genes.get('left', 50))
                right_val = int(comp_genes.get('right', 50))
                comp_genes_parts.append(f"{comp}:{left_val}:{right_val}")
        if comp_genes_parts:
            comp_genes_str = '|'.join(comp_genes_parts)
            args.extend(['--comp-max-genes-config', comp_genes_str])
        # Custom display names (format: genome:CustomName|genome2:Name2)
        # For custom genomes: 'custom_query' in frontend maps to the actual run_key
        if display_names:
            display_parts = []
            for genome, name in display_names.items():
                if name and name.strip():
                    # Map 'custom_query' to actual run_key for the plotting script
                    actual_genome = run_key if genome == 'custom_query' else genome
                    # Escape special characters in display name
                    # ('|' and ':' are the config-string delimiters above)
                    safe_name = name.replace('|', '_').replace(':', '_')
                    display_parts.append(f"{actual_genome}:{safe_name}")
            if display_parts:
                display_str = '|'.join(display_parts)
                args.extend(['--display-names', display_str])
        # Gene labels (list of gene IDs to display labels for on the plot)
        if gene_labels and isinstance(gene_labels, list) and len(gene_labels) > 0:
            gene_labels_str = ','.join(str(g) for g in gene_labels if g)
            if gene_labels_str:
                args.extend(['--genelabels', gene_labels_str])
                # Add label size (default to 8 if labels are provided but size not specified)
                label_size = int(gene_label_size) if gene_label_size else 8
                if label_size > 0:
                    args.extend(['--genelabelsize', str(label_size)])
        # Keep low-confidence coloring option (optional - colors all syntenic matches)
        keep_lowconf_color = data.get('keep_lowconf_color', False)
        if keep_lowconf_color:
            args.extend(['--keep-lowconf-color'])
        # Debug: print final command
        logger.debug(f"api_custom_plot final command args: {' '.join(args)}")
        # Set environment - files are in standard Mcscan_results folder now
        env = os.environ.copy()
        env['PYTHON'] = PYTHON_BIN
        # Mark this as a custom genome for the script to handle appropriately
        env['CUSTOM_GENOME_KEY'] = run_key
        # Pass the display name for plot labels
        env['CUSTOM_DISPLAY_NAME'] = custom_display_name
        result = subprocess.run(
            args,
            capture_output=True,
            text=True,
            env=env,
            cwd=SCRIPTS_DIR
        )
        if result.returncode == 0:
            # Find the output folder
            # Priority 1: Check if custom_query has a display name in tweaking config
            # (note: 'custom_query' in frontend maps to run_key for the script)
            # Priority 2: Use custom_display_name from manifest
            folder_display_name = display_names.get('custom_query') if display_names and 'custom_query' in display_names else custom_display_name
            # Escape special characters to match what was passed to script
            folder_display_name = folder_display_name.replace('|', '_').replace(':', '_')
            comp_str = '_'.join(comparisons)
            pattern_prefix = f"{folder_display_name}_usergenes_{comp_str}_"
            logger.debug(f"Looking for custom genome output folder with prefix: {pattern_prefix}")
            output_folder = None
            if os.path.exists(OUTPUT_DIR):
                import re
                folders = []
                for f in os.listdir(OUTPUT_DIR):
                    if f.startswith(pattern_prefix):
                        # Folder names end with a YYYYMMDD_HHMMSS timestamp;
                        # require that suffix so similarly-prefixed folders don't match.
                        remainder = f[len(pattern_prefix):]
                        if re.match(r'^\d{8}_\d{6}$', remainder):
                            folders.append(f)
                if folders:
                    # Timestamp suffix sorts lexicographically == chronologically;
                    # reverse sort picks the most recent run.
                    folders.sort(reverse=True)
                    output_folder = os.path.join(OUTPUT_DIR, folders[0])
            if output_folder and os.path.exists(output_folder):
                files = {}
                for fname in os.listdir(output_folder):
                    if fname == 'microsynteny_plot.png':
                        files['png'] = fname
                    elif fname == 'microsynteny_plot.svg':
                        files['svg'] = fname
                    elif fname.endswith('.csv'):
                        files['csv'] = fname
                return jsonify({
                    'success': True,
                    'message': 'Plot generated successfully',
                    'output_folder': os.path.basename(output_folder),
                    'files': files
                })
            else:
                # Log details for debugging (not shown to user)
                if app.debug:
                    logger.debug(f"Custom plot output folder not found. stdout: {result.stdout}")
                    logger.debug(f"stderr: {result.stderr}")
                return jsonify({
                    'success': False,
                    'error': 'Output folder not found after plot generation'
                })
        else:
            # Script failed: extract a user-friendly message from its output.
            error_msg = 'Script failed'
            combined = result.stdout + '\n' + result.stderr
            if 'No syntenic matches' in combined:
                error_msg = 'No syntenic matches found for the specified genes'
            elif 'ERROR:' in combined:
                # Use the first ERROR: line from the script output.
                for line in combined.split('\n'):
                    if 'ERROR:' in line:
                        error_msg = line.split('ERROR:')[-1].strip()
                        break
            # Log details for debugging (not shown to user)
            if app.debug:
                logger.debug(f"Custom plot script failed. stdout: {result.stdout}")
                logger.debug(f"stderr: {result.stderr}")
            return jsonify({
                'success': False,
                'error': error_msg
            })
    except Exception as e:
        return jsonify({'success': False, 'error': str(e)})
def api_custom_genes(run_key):
    """Return the gene list parsed from a custom genome's BED file.

    Reads Mcscan_results/bed_files/<run_key>.bed and returns one entry per
    gene with its chromosome, start/end coordinates and gene ID.
    """
    # BED file is now in the main bed_files folder
    bed_path = os.path.join(MCSCAN_RESULTS_DIR, 'bed_files', f'{run_key}.bed')
    if not os.path.exists(bed_path):
        return jsonify({'success': False, 'error': f'BED file not found for run: {run_key}'})
    genes = []
    try:
        with open(bed_path, 'r') as handle:
            for raw_line in handle:
                stripped = raw_line.strip()
                if not stripped:
                    continue  # skip blank lines
                fields = stripped.split('\t')
                if len(fields) < 4:
                    continue  # need at least chrom/start/end/name
                genes.append({
                    'chr': fields[0],
                    'start': int(fields[1]),
                    'end': int(fields[2]),
                    'gene_id': fields[3]
                })
    except Exception as e:
        return jsonify({'success': False, 'error': str(e)})
    return jsonify({'success': True, 'genes': genes, 'total': len(genes)})
| # ============================================================================ | |
| # Custom Synteny (Multi-Genome) API Routes | |
| # ============================================================================ | |
| # Custom Synteny metadata directory | |
| CUSTOM_SYNTENY_META_DIR = os.path.join(MCSCAN_RESULTS_DIR, 'custom_synteny_meta') | |
| os.makedirs(CUSTOM_SYNTENY_META_DIR, exist_ok=True) | |
| # Ensure bed_files and pep_files directories exist | |
| os.makedirs(os.path.join(MCSCAN_RESULTS_DIR, 'bed_files'), exist_ok=True) | |
| os.makedirs(os.path.join(MCSCAN_RESULTS_DIR, 'pep_files'), exist_ok=True) | |
def api_custom_synteny_upload():
    """Upload multiple genomes for custom synteny analysis.

    Expects multipart form data containing:
        project_name -- required project label (length-validated)
        visibility   -- 'public' (default) or 'private'
        run_key      -- optional; generated from the project name when absent
        genomes      -- JSON list of genome metadata ({id, displayName, ...})
        db_genomes   -- JSON list of database genome keys
        pairs        -- JSON list of comparison pairs
        gff3_<i>/pep_<i> -- file pair for the i-th genome in `genomes`

    Saves each genome's GFF3/PEP pair under the project's metadata folder,
    counts genes and proteins for sanity reporting, and writes a project
    manifest. At least two genomes with files are required.
    """
    try:
        project_name = request.form.get('project_name', '').strip()
        if not project_name:
            return jsonify({'success': False, 'error': 'Project name is required'})
        # Validate project name length
        valid, error = validate_name_length(project_name, 'Project Name')
        if not valid:
            return jsonify({'success': False, 'error': error})
        visibility = request.form.get('visibility', 'public').strip()
        # Get or create run key
        run_key = request.form.get('run_key', '').strip()
        if not run_key:
            # Create new run key from project name
            base_key = secure_filename(project_name.replace(' ', '_').lower())
            if not base_key:
                return jsonify({'success': False, 'error': 'Project name contains invalid characters'})
            # For private runs, always generate a unique key with random suffix including symbol
            if visibility == 'private':
                run_key = generate_private_run_key(base_key)
            else:
                run_key = base_key
            # Check if exists, append suffix if needed
            meta_dir = os.path.join(CUSTOM_SYNTENY_META_DIR, run_key)
            if os.path.exists(meta_dir):
                run_key = run_key + '_' + uuid.uuid4().hex[:4]
        # Parse genome metadata
        genomes_json = request.form.get('genomes', '[]')
        try:
            genomes_meta = json.loads(genomes_json)
        except json.JSONDecodeError:
            return jsonify({'success': False, 'error': 'Invalid genomes metadata'})
        # Parse DB genomes and comparison pairs
        db_genomes_json = request.form.get('db_genomes', '[]')
        pairs_json = request.form.get('pairs', '[]')
        try:
            db_genomes = json.loads(db_genomes_json)
            selected_pairs = json.loads(pairs_json)
        except json.JSONDecodeError:
            return jsonify({'success': False, 'error': 'Invalid JSON data'})
        # Create directories
        meta_dir = os.path.join(CUSTOM_SYNTENY_META_DIR, run_key)
        os.makedirs(meta_dir, exist_ok=True)
        # Process each genome
        uploaded_genomes = []
        for idx, genome_info in enumerate(genomes_meta):
            # Uploaded files are keyed by position in the genomes metadata list.
            gff3_key = f'gff3_{idx}'
            pep_key = f'pep_{idx}'
            if gff3_key not in request.files or pep_key not in request.files:
                continue  # Skip if files not present
            gff3_file = request.files[gff3_key]
            pep_file = request.files[pep_key]
            if gff3_file.filename == '' or pep_file.filename == '':
                continue  # Skip empty files
            # Validate file extensions
            if not allowed_file(gff3_file.filename):
                return jsonify({'success': False, 'error': f'Invalid GFF3 file extension for genome {genome_info.get("displayName", idx)}. Allowed: {", ".join(ALLOWED_EXTENSIONS)}'})
            if not allowed_file(pep_file.filename):
                return jsonify({'success': False, 'error': f'Invalid PEP file extension for genome {genome_info.get("displayName", idx)}. Allowed: {", ".join(ALLOWED_EXTENSIONS)}'})
            genome_id = genome_info.get('id', idx)
            display_name = genome_info.get('displayName', f'Genome_{idx}')
            # Create genome directory
            genome_dir = os.path.join(meta_dir, f'genome_{genome_id}')
            os.makedirs(genome_dir, exist_ok=True)
            # Save files
            gff3_path = os.path.join(genome_dir, 'input.gff3')
            pep_path = os.path.join(genome_dir, 'input.pep')
            gff3_file.save(gff3_path)
            pep_file.save(pep_path)
            # Basic validation: reject empty uploads and discard the partial folder.
            gff3_size = os.path.getsize(gff3_path)
            pep_size = os.path.getsize(pep_path)
            if gff3_size == 0 or pep_size == 0:
                shutil.rmtree(genome_dir, ignore_errors=True)
                return jsonify({'success': False, 'error': f'Empty file for genome: {display_name}'})
            # Count genes/proteins (simple line scans, reported back to the UI)
            gene_count = 0
            with open(gff3_path, 'r') as f:
                for line in f:
                    if not line.startswith('#') and '\tgene\t' in line:
                        gene_count += 1
            protein_count = 0
            with open(pep_path, 'r') as f:
                for line in f:
                    if line.startswith('>'):
                        protein_count += 1
            # Create sanitized key from display name
            genome_key = secure_filename(display_name.replace(' ', '_').lower())
            # Save genome metadata
            genome_meta = {
                'id': genome_id,
                'key': genome_key,
                'displayName': display_name,
                'visibility': visibility,
                'gff3_size': gff3_size,
                'pep_size': pep_size,
                'gene_count': gene_count,
                'protein_count': protein_count,
                'uploaded_at': datetime.now().isoformat()
            }
            with open(os.path.join(genome_dir, 'metadata.json'), 'w') as f:
                json.dump(genome_meta, f, indent=2)
            uploaded_genomes.append(genome_meta)
        # Synteny needs at least two genomes; abandon the project folder otherwise.
        if len(uploaded_genomes) < 2:
            shutil.rmtree(meta_dir, ignore_errors=True)
            return jsonify({'success': False, 'error': 'At least 2 genomes with files are required'})
        # Create project manifest
        manifest = {
            'project_name': project_name,
            'run_key': run_key,
            'created_at': datetime.now().isoformat(),
            'visibility': visibility,
            'genomes': uploaded_genomes,
            'db_genomes': db_genomes,
            'selected_pairs': selected_pairs,
            'genome_count': len(uploaded_genomes),
            'comparison_count': len(selected_pairs)
        }
        manifest_path = os.path.join(meta_dir, 'manifest.json')
        with open(manifest_path, 'w') as f:
            json.dump(manifest, f, indent=2)
        return jsonify({
            'success': True,
            'run_key': run_key,
            'genomes': uploaded_genomes,
            'genome_count': len(uploaded_genomes),
            'message': f'Successfully uploaded {len(uploaded_genomes)} genomes'
        })
    except Exception as e:
        return jsonify({'success': False, 'error': str(e)})
def api_custom_synteny_run_mcscan():
    """Start MCscan analysis for a custom synteny project.

    JSON body: run_key (required), plus optional MCscan tuning parameters
    cscore, min_anchor and gap_length. Enforces the per-IP run limit, writes
    an initial 'queued' job_status.json for the project, records the run
    against the client IP, and launches run_custom_synteny_background on a
    daemon thread. Returns queue/estimate info; 429 when rate-limited.
    """
    start_time = time.time()
    data = request.json
    # Check IP-based run limit FIRST (before any other processing)
    client_ip = get_remote_address()
    allowed, limit_msg = check_ip_run_limit(client_ip)
    if not allowed:
        # Record rate limit failure
        if ANALYTICS_AVAILABLE:
            record_event(
                feature_type=FEATURE_CUSTOM_SYNTENY,
                query_genome=None,
                status='failure',
                duration_ms=int((time.time() - start_time) * 1000),
                request=request,
                extra_data={'error': 'rate_limit'}
            )
        return jsonify({'success': False, 'error': limit_msg}), 429
    run_key = data.get('run_key')
    # MCscan parameters (None means "use the script defaults")
    cscore = data.get('cscore')
    min_anchor = data.get('min_anchor')
    gap_length = data.get('gap_length')
    if not run_key:
        return jsonify({'success': False, 'error': 'run_key is required'})
    meta_dir = os.path.join(CUSTOM_SYNTENY_META_DIR, run_key)
    if not os.path.exists(meta_dir):
        return jsonify({'success': False, 'error': f'Run key not found: {run_key}'})
    # Load manifest to get pairs and genomes
    manifest_path = os.path.join(meta_dir, 'manifest.json')
    if not os.path.exists(manifest_path):
        return jsonify({'success': False, 'error': 'Manifest not found'})
    with open(manifest_path, 'r') as f:
        manifest = json.load(f)
    selected_pairs = manifest.get('selected_pairs', [])
    db_genomes = manifest.get('db_genomes', [])
    if not selected_pairs:
        return jsonify({'success': False, 'error': 'No comparison pairs found in manifest'})
    # Validate db_genomes against the server's known genome catalogue
    for g in db_genomes:
        if g not in AVAILABLE_GENOMES:
            return jsonify({'success': False, 'error': f'Invalid database genome: {g}'})
    # Estimate time based on pairs
    estimated_minutes = len(selected_pairs) * 2  # ~2 minutes per pair
    # Create status file (initial 'queued' state read by the status endpoint)
    status_file = os.path.join(meta_dir, 'job_status.json')
    with open(status_file, 'w') as f:
        json.dump({
            'status': 'queued',
            'progress': 0,
            'message': 'Job queued, waiting for available slot...',
            'run_key': run_key,
            'selected_pairs': selected_pairs,
            'db_genomes': db_genomes,
            'total_pairs': len(selected_pairs),
            'completed_pairs': 0,
            'params': {
                'cscore': cscore,
                'min_anchor': min_anchor,
                'gap_length': gap_length
            }
        }, f, indent=2)
    # Record this run against the IP (for daily limit tracking)
    record_ip_run(client_ip)
    # Start background processing (daemon thread: does not block shutdown)
    thread = threading.Thread(
        target=run_custom_synteny_background,
        args=(run_key, selected_pairs, db_genomes),
        kwargs={
            'cscore': cscore,
            'min_anchor': min_anchor,
            'gap_length': gap_length
        }
    )
    thread.daemon = True
    thread.start()
    # Note: Analytics is now recorded in run_custom_synteny_background when job completes
    # to capture actual success/failure status
    return jsonify({
        'success': True,
        'run_key': run_key,
        'total_pairs': len(selected_pairs),
        'estimated_minutes': estimated_minutes,
        'message': f'MCscan analysis queued for {len(selected_pairs)} comparison pairs. Jobs run one at a time.',
        'runs_remaining': MAX_RUNS_PER_IP - len(_ip_run_tracker['runs'].get(client_ip, []))
    })
| def run_custom_synteny_background(run_key, selected_pairs, db_genomes, cscore=None, min_anchor=None, gap_length=None): | |
| """Background task for custom synteny MCscan processing with queue management | |
| Uses the existing process_custom_genome.sh script for each custom genome, | |
| which properly handles GFF3->BED conversion and places files in the correct | |
| Mcscan_results directories. | |
| """ | |
| import traceback # For detailed error logging | |
| meta_dir = os.path.join(CUSTOM_SYNTENY_META_DIR, run_key) | |
| status_file = os.path.join(meta_dir, 'job_status.json') | |
| manifest_path = os.path.join(meta_dir, 'manifest.json') | |
| def update_status(status, progress, message, **extra): | |
| data = { | |
| 'status': status, | |
| 'progress': progress, | |
| 'message': message, | |
| 'run_key': run_key, | |
| 'selected_pairs': selected_pairs, | |
| 'db_genomes': db_genomes, | |
| 'total_pairs': len(selected_pairs), | |
| 'last_updated': datetime.now().isoformat() # Track when status was last updated | |
| } | |
| data.update(extra) | |
| with open(status_file, 'w') as f: | |
| json.dump(data, f, indent=2) | |
| try: | |
| # Wait for job slot (queue system - only 1 MCscan at a time) | |
| queue_pos = get_queue_position(run_key) | |
| if queue_pos > 0: | |
| update_status('queued', 0, f'Waiting in queue (position {queue_pos})...', queue_position=queue_pos) | |
| # Block until we get a slot | |
| acquire_job_slot(run_key) | |
| update_status('running', 2, 'Starting custom synteny analysis...') | |
| # Load manifest | |
| with open(manifest_path, 'r') as f: | |
| manifest = json.load(f) | |
| genomes_list = manifest.get('genomes', []) | |
| total_genomes = len(genomes_list) | |
| genome_keys = {} # Map genome_id -> key for MCscan | |
| # Add DB genomes to genome_keys and copy their BED files to bed_files directory | |
| for db_genome in db_genomes: | |
| genome_keys[db_genome] = db_genome | |
| # Copy BED file from Genomes/ to bed_files/ if not already there | |
| src_bed = os.path.join(SCRIPT_DIR, '..', 'Genomes', db_genome, f'{db_genome}.bed') | |
| dst_bed = os.path.join(MCSCAN_RESULTS_DIR, 'bed_files', f'{db_genome}.bed') | |
| if os.path.exists(src_bed) and not os.path.exists(dst_bed): | |
| shutil.copy2(src_bed, dst_bed) | |
| # Step 1: Process each custom genome using the existing process_custom_genome.sh script | |
| # This properly converts GFF3 to BED and sets up all necessary files | |
| for idx, genome_meta in enumerate(genomes_list): | |
| genome_id = genome_meta.get('id') | |
| genome_key = genome_meta.get('key') | |
| display_name = genome_meta.get('displayName', genome_key) | |
| genome_dir = os.path.join(meta_dir, f'genome_{genome_id}') | |
| gff3_path = os.path.join(genome_dir, 'input.gff3') | |
| pep_path = os.path.join(genome_dir, 'input.pep') | |
| if not os.path.exists(gff3_path) or not os.path.exists(pep_path): | |
| update_status('failed', 0, f'Input files not found for genome {display_name}') | |
| return | |
| genome_keys[str(genome_id)] = genome_key | |
| genome_keys[f'custom_{genome_id}'] = genome_key | |
| # Check if this genome has already been processed (BED file exists) | |
| bed_file = os.path.join(MCSCAN_RESULTS_DIR, 'bed_files', f'{genome_key}.bed') | |
| if os.path.exists(bed_file): | |
| update_status('running', int(5 + (idx + 1) * 20 / total_genomes), | |
| f'Genome {display_name} already processed, skipping...') | |
| continue | |
| # Copy input files to temp directory (process_custom_genome.sh deletes its work dir) | |
| temp_dir = os.path.join(CUSTOM_TEMP_DIR, f'{run_key}_{genome_key}') | |
| os.makedirs(temp_dir, exist_ok=True) | |
| temp_gff3 = os.path.join(temp_dir, 'input.gff3') | |
| temp_pep = os.path.join(temp_dir, 'input.pep') | |
| shutil.copy2(gff3_path, temp_gff3) | |
| shutil.copy2(pep_path, temp_pep) | |
| # Use the existing process_custom_genome.py script | |
| # Find ALL db_genomes that this custom genome should be compared against | |
| # based on the selected_pairs | |
| comparison_genomes_for_this = [] | |
| for pair in selected_pairs: | |
| if isinstance(pair, dict): | |
| pair_genome1 = pair.get('genome1') | |
| pair_genome2 = pair.get('genome2') | |
| else: | |
| pair_genome1, pair_genome2 = pair | |
| # Check if this custom genome is involved in this pair | |
| if str(pair_genome1) == str(genome_id) or pair_genome1 == f'custom_{genome_id}' or pair_genome1 == genome_key: | |
| # This custom genome is genome1, check if genome2 is a db_genome | |
| if pair_genome2 in db_genomes: | |
| if pair_genome2 not in comparison_genomes_for_this: | |
| comparison_genomes_for_this.append(pair_genome2) | |
| elif str(pair_genome2) == str(genome_id) or pair_genome2 == f'custom_{genome_id}' or pair_genome2 == genome_key: | |
| # This custom genome is genome2, check if genome1 is a db_genome | |
| if pair_genome1 in db_genomes: | |
| if pair_genome1 not in comparison_genomes_for_this: | |
| comparison_genomes_for_this.append(pair_genome1) | |
| # Fallback: if no pairs found, use all db_genomes | |
| if not comparison_genomes_for_this: | |
| comparison_genomes_for_this = db_genomes if db_genomes else ['arabidopsis_thaliana'] | |
| # Join all comparison genomes with comma for the script | |
| comparison_genomes_str = ','.join(comparison_genomes_for_this) | |
| # Log the comparisons we're running for this genome | |
| logger.info(f"CustomSynteny: Processing {display_name} ({genome_key}) against {len(comparison_genomes_for_this)} db genomes: {comparison_genomes_str}") | |
| # Update status with comparison count info | |
| update_status('running', int(5 + idx * 20 / total_genomes), | |
| f'Processing genome {idx + 1}/{total_genomes}: {display_name} (running {len(comparison_genomes_for_this)} comparisons)...') | |
| script_path = os.path.join(SCRIPTS_DIR, 'process_custom_genome.py') | |
| # Using PYTHON_BIN for HuggingFace Spaces | |
| cmd = [ | |
| PYTHON_BIN, script_path, | |
| '--run-key', genome_key, # Use genome_key as run_key for file naming | |
| '--gff3', temp_gff3, | |
| '--pep', temp_pep, | |
| '--genomes', comparison_genomes_str, # Pass ALL comparison genomes | |
| '--meta-dir', os.path.join(CUSTOM_META_DIR, genome_key), # Use standard custom_meta location | |
| '--display-name', display_name, | |
| '--visibility', 'public' | |
| ] | |
| # Add MCscan parameters if provided | |
| if cscore is not None: | |
| cmd.extend(['--cscore', str(cscore)]) | |
| if min_anchor is not None: | |
| cmd.extend(['--min-anchor', str(min_anchor)]) | |
| if gap_length is not None: | |
| cmd.extend(['--gap-length', str(gap_length)]) | |
| # Run the script with timeout | |
| log_file = os.path.join(genome_dir, 'process.log') | |
| with open(log_file, 'w') as log: | |
| try: | |
| result = subprocess.run(cmd, stdout=log, stderr=subprocess.STDOUT, cwd=SCRIPTS_DIR, | |
| timeout=MCSCAN_JOB_TIMEOUT_SECONDS) | |
| except subprocess.TimeoutExpired: | |
| update_status('failed', 0, f'Job timed out for {display_name} after {MCSCAN_JOB_TIMEOUT_SECONDS // 3600} hours.') | |
| return | |
| if result.returncode != 0: | |
| # Read log for error details and extract meaningful error message | |
| with open(log_file, 'r') as log: | |
| log_content = log.read() | |
| # Extract user-friendly error message from log | |
| user_error_msg = extract_user_error_from_log(log_content, display_name) | |
| update_status('failed', 0, user_error_msg, | |
| error_log=log_content[-2000:]) # Keep last 2000 chars for debugging | |
| return | |
| # Store genome key mapping in manifest | |
| manifest['genome_keys'] = genome_keys | |
| with open(manifest_path, 'w') as f: | |
| json.dump(manifest, f, indent=2) | |
| update_status('running', 30, 'All genomes processed. Running additional pairwise comparisons...') | |
| # Step 2: Run any additional pairwise comparisons that weren't covered | |
| # The process_custom_genome.py script already ran each custom genome vs db_genomes | |
| # Now we need to run custom-to-custom comparisons (and any remaining pairs) | |
| completed = 0 | |
| results = {} | |
| # Log the genome_keys mapping for debugging | |
| logger.info(f"CustomSynteny: genome_keys mapping: {genome_keys}") | |
| logger.info(f"CustomSynteny: Processing {len(selected_pairs)} selected pairs") | |
| for pair in selected_pairs: | |
| # Handle different pair formats | |
| if isinstance(pair, dict): | |
| genome_a_id = pair.get('genome1') | |
| genome_b_id = pair.get('genome2') | |
| else: | |
| genome_a_id, genome_b_id = pair | |
| # Look up genome keys - check AVAILABLE_GENOMES first for both | |
| if genome_a_id in AVAILABLE_GENOMES: | |
| genome_a_key = genome_a_id | |
| else: | |
| genome_a_key = genome_keys.get(str(genome_a_id)) or genome_keys.get(genome_a_id) | |
| if genome_b_id in AVAILABLE_GENOMES: | |
| genome_b_key = genome_b_id | |
| else: | |
| genome_b_key = genome_keys.get(str(genome_b_id)) or genome_keys.get(genome_b_id) | |
| logger.info(f"CustomSynteny: Pair {genome_a_id} vs {genome_b_id} -> keys: {genome_a_key} vs {genome_b_key}") | |
| if not genome_a_key or not genome_b_key: | |
| logger.warning(f"CustomSynteny: Skipping pair - genome key not found: a={genome_a_key}, b={genome_b_key}") | |
| results[f'{genome_a_id}_vs_{genome_b_id}'] = {'status': 'skipped', 'error': 'Genome key not found'} | |
| completed += 1 | |
| continue | |
| # Check if this comparison was already done by process_custom_genome.py | |
| # The script runs custom_genome vs all specified db_genomes | |
| # For custom-to-custom pairs, we may need to run them | |
| pair_key = f'{genome_a_key}.{genome_b_key}' | |
| last_file = os.path.join(MCSCAN_RESULTS_DIR, 'last_filtered', f'{pair_key}.last.filtered') | |
| i1_file = os.path.join(MCSCAN_RESULTS_DIR, 'i1_blocks', f'{pair_key}.i1.blocks') | |
| # Also check reverse order | |
| pair_key_rev = f'{genome_b_key}.{genome_a_key}' | |
| last_file_rev = os.path.join(MCSCAN_RESULTS_DIR, 'last_filtered', f'{pair_key_rev}.last.filtered') | |
| i1_file_rev = os.path.join(MCSCAN_RESULTS_DIR, 'i1_blocks', f'{pair_key_rev}.i1.blocks') | |
| if os.path.exists(last_file) or os.path.exists(last_file_rev): | |
| logger.info(f"CustomSynteny: Pair {genome_a_key} vs {genome_b_key} already processed, skipping") | |
| results[f'{genome_a_key}_vs_{genome_b_key}'] = {'status': 'completed', 'note': 'Already processed'} | |
| completed += 1 | |
| continue | |
| logger.info(f"CustomSynteny: Running pairwise comparison: {genome_a_key} vs {genome_b_key}") | |
| update_status('running', 30 + int(65 * completed / len(selected_pairs)), | |
| f'Running comparison: {genome_a_key} vs {genome_b_key}...') | |
| # Run the pairwise comparison script - using PYTHON_BIN for HuggingFace Spaces | |
| script_path = os.path.join(SCRIPTS_DIR, 'process_custom_synteny_pair.py') | |
| if os.path.exists(script_path): | |
| # Build command with arguments instead of environment variables | |
| cmd = [ | |
| PYTHON_BIN, script_path, | |
| '--genome-a', genome_a_key, | |
| '--genome-b', genome_b_key | |
| ] | |
| if cscore: | |
| cmd.extend(['--cscore', str(cscore)]) | |
| if min_anchor: | |
| cmd.extend(['--min-anchor', str(min_anchor)]) | |
| if gap_length: | |
| cmd.extend(['--gap-length', str(gap_length)]) | |
| logger.info(f"CustomSynteny: Running command: {' '.join(cmd)}") | |
| result = subprocess.run( | |
| cmd, | |
| capture_output=True, | |
| text=True, | |
| cwd=SCRIPTS_DIR | |
| ) | |
| if result.returncode == 0: | |
| logger.info(f"CustomSynteny: Comparison {genome_a_key} vs {genome_b_key} completed successfully") | |
| results[f'{genome_a_key}_vs_{genome_b_key}'] = {'status': 'completed'} | |
| else: | |
| logger.error(f"CustomSynteny: Comparison {genome_a_key} vs {genome_b_key} failed: {result.stderr[:500] if result.stderr else 'Unknown error'}") | |
| results[f'{genome_a_key}_vs_{genome_b_key}'] = { | |
| 'status': 'failed', | |
| 'error': result.stderr[:500] if result.stderr else 'Unknown error' | |
| } | |
| else: | |
| logger.error(f"CustomSynteny: Script not found: {script_path}") | |
| results[f'{genome_a_key}_vs_{genome_b_key}'] = {'status': 'skipped', 'error': 'Script not found'} | |
| completed += 1 | |
| # Verify that BED and PEP files were created for all custom genomes | |
| missing_files = [] | |
| for genome_meta in genomes_list: | |
| genome_key = genome_meta.get('key') | |
| bed_file = os.path.join(MCSCAN_RESULTS_DIR, 'bed_files', f'{genome_key}.bed') | |
| pep_file = os.path.join(MCSCAN_RESULTS_DIR, 'pep_files', f'{genome_key}.pep') | |
| if not os.path.exists(bed_file): | |
| missing_files.append(f'{genome_key}.bed') | |
| if not os.path.exists(pep_file): | |
| missing_files.append(f'{genome_key}.pep') | |
| if missing_files: | |
| update_status('failed', 95, f'Missing output files: {missing_files}', | |
| results=results, genome_keys=genome_keys, completed_pairs=completed) | |
| return | |
| # Mark as complete | |
| update_status('completed', 100, f'Analysis complete. {completed} pairs processed.', | |
| results=results, genome_keys=genome_keys, completed_pairs=completed) | |
| # Clean up input files to save space (keep only metadata) | |
| # Results are already copied to bed_files/, i1_blocks/, last_filtered/ | |
| for genome_meta in genomes_list: | |
| genome_id = genome_meta.get('id') | |
| genome_dir = os.path.join(meta_dir, f'genome_{genome_id}') | |
| if os.path.exists(genome_dir): | |
| # Remove large input files, keep metadata.json | |
| for filename in ['input.gff3', 'input.pep', 'process.log']: | |
| file_path = os.path.join(genome_dir, filename) | |
| if os.path.exists(file_path): | |
| try: | |
| os.remove(file_path) | |
| except: | |
| pass | |
| except Exception as e: | |
| error_traceback = traceback.format_exc() | |
| logger.error(f"CustomSynteny: Error in background job {run_key}: {e}") | |
| logger.error(f"CustomSynteny: Traceback: {error_traceback}") | |
| update_status('failed', 0, f'Error: {str(e)}', error_traceback=error_traceback[-2000:]) | |
| finally: | |
| # Always release the job slot | |
| release_job_slot(run_key) | |
| # Clean up temp directories used during processing | |
| # These are created for each custom genome: CUSTOM_TEMP_DIR/{run_key}_{genome_key} | |
| try: | |
| if os.path.exists(CUSTOM_TEMP_DIR): | |
| for dirname in os.listdir(CUSTOM_TEMP_DIR): | |
| if dirname.startswith(f'{run_key}_'): | |
| temp_dir_path = os.path.join(CUSTOM_TEMP_DIR, dirname) | |
| try: | |
| shutil.rmtree(temp_dir_path) | |
| logger.info(f"CustomSynteny: Cleaned up temp directory: {dirname}") | |
| except Exception as e: | |
| logger.warning(f"CustomSynteny: Failed to clean temp dir {dirname}: {e}") | |
| except Exception as e: | |
| logger.warning(f"CustomSynteny: Error cleaning temp directories: {e}") | |
| # Record analytics based on actual job outcome | |
| if ANALYTICS_AVAILABLE: | |
| try: | |
| final_status = 'failure' # Default to failure | |
| if os.path.exists(status_file): | |
| with open(status_file, 'r') as f: | |
| status_data = json.load(f) | |
| if status_data.get('status') == 'completed': | |
| final_status = 'success' | |
| record_event( | |
| feature_type=FEATURE_CUSTOM_SYNTENY, | |
| query_genome=run_key, | |
| status=final_status, | |
| duration_ms=0, # Duration not tracked for background jobs | |
| extra_data={'pairs_count': len(selected_pairs)} | |
| ) | |
| except Exception as e: | |
| logger.warning(f"CustomSynteny Analytics: Failed to record event for {run_key}: {e}") | |
# Stale job detection timeout (seconds): a job whose status file has not been
# updated for this long (30 minutes) is treated as stale/abandoned.
CUSTOM_SYNTENY_STALE_TIMEOUT = 30 * 60
def api_custom_synteny_status(run_key):
    """Get status of a custom synteny MCscan job.

    Reads ``job_status.json`` from the project's metadata directory and
    returns it, translating internal failure messages into user-friendly
    text.  When the job has completed, the project manifest is attached so
    the frontend can transition straight to plotting.

    Args:
        run_key: Client-supplied project identifier (from the URL).

    Returns:
        Flask JSON response: ``{'success': True, 'data': {...}}`` on
        success, ``{'success': False, 'error': ...}`` otherwise.
    """
    # Security: run_key is client-supplied. Reject anything containing path
    # separators or '..' so file reads cannot escape CUSTOM_SYNTENY_META_DIR.
    if not run_key or os.path.basename(run_key) != run_key or run_key in ('.', '..'):
        return jsonify({'success': False, 'error': 'Invalid run key'})
    meta_dir = os.path.join(CUSTOM_SYNTENY_META_DIR, run_key)
    if not os.path.exists(meta_dir):
        return jsonify({'success': False, 'error': f'Run key not found: {run_key}'})
    status_file = os.path.join(meta_dir, 'job_status.json')
    if not os.path.exists(status_file):
        return jsonify({
            'success': True,
            'data': {
                'status': 'unknown',
                'message': 'Job status not available'
            }
        })
    # A partially-written or corrupt status file (the background job writes it
    # concurrently) must not 500 this polling endpoint.
    try:
        with open(status_file, 'r') as f:
            status_data = json.load(f)
    except (OSError, json.JSONDecodeError) as e:
        return jsonify({'success': False, 'error': f'Could not read job status: {e}'})
    # Transform internal error codes to user-friendly messages
    if status_data.get('status') == 'failed' and 'message' in status_data:
        status_data['message'] = extract_user_error_from_log(status_data['message'], run_key)
    # Include manifest data when status is completed (needed for plotting transition)
    if status_data.get('status') == 'completed':
        manifest_file = os.path.join(meta_dir, 'manifest.json')
        if os.path.exists(manifest_file):
            try:
                with open(manifest_file, 'r') as f:
                    status_data['manifest'] = json.load(f)
            except (OSError, json.JSONDecodeError) as e:
                # Manifest is optional here; degrade gracefully
                logger.warning(f"Could not read manifest for {run_key}: {e}")
    return jsonify({'success': True, 'data': status_data})
def api_custom_synteny_lookup(run_key):
    """Look up details (manifest and job status) for a custom synteny project.

    Args:
        run_key: Client-supplied project identifier (from the URL).

    Returns:
        Flask JSON response with ``manifest`` and/or ``status`` keys when the
        corresponding files exist and are readable.
    """
    # Security: run_key is client-supplied. Reject path separators / '..'
    # so file reads cannot escape CUSTOM_SYNTENY_META_DIR.
    if not run_key or os.path.basename(run_key) != run_key or run_key in ('.', '..'):
        return jsonify({'success': False, 'error': 'Invalid run key'})
    meta_dir = os.path.join(CUSTOM_SYNTENY_META_DIR, run_key)
    if not os.path.exists(meta_dir):
        return jsonify({'success': False, 'error': f'Run key not found: {run_key}'})
    manifest_path = os.path.join(meta_dir, 'manifest.json')
    status_path = os.path.join(meta_dir, 'job_status.json')
    result = {
        'run_key': run_key,
        'exists': True
    }
    # A corrupt/partially-written JSON file should not 500 the endpoint;
    # simply omit the affected key from the response.
    if os.path.exists(manifest_path):
        try:
            with open(manifest_path, 'r') as f:
                result['manifest'] = json.load(f)
        except (OSError, json.JSONDecodeError) as e:
            logger.warning(f"Could not read manifest for {run_key}: {e}")
    if os.path.exists(status_path):
        try:
            with open(status_path, 'r') as f:
                result['status'] = json.load(f)
        except (OSError, json.JSONDecodeError) as e:
            logger.warning(f"Could not read job status for {run_key}: {e}")
    return jsonify({'success': True, 'data': result})
def api_custom_synteny_projects():
    """List all available custom synteny projects, newest first.

    Scans ``CUSTOM_SYNTENY_META_DIR`` for project subdirectories and returns
    each project's run key plus (when readable) its manifest and job status.

    Returns:
        Flask JSON response: ``{'success': True, 'projects': [...]}``.
    """
    projects = []
    if os.path.exists(CUSTOM_SYNTENY_META_DIR):
        for run_key in os.listdir(CUSTOM_SYNTENY_META_DIR):
            meta_dir = os.path.join(CUSTOM_SYNTENY_META_DIR, run_key)
            if not os.path.isdir(meta_dir):
                continue
            manifest_path = os.path.join(meta_dir, 'manifest.json')
            status_path = os.path.join(meta_dir, 'job_status.json')
            project_info = {'run_key': run_key}
            # A single corrupt manifest/status file (e.g. being written by a
            # concurrent background job) must not break the entire listing.
            if os.path.exists(manifest_path):
                try:
                    with open(manifest_path, 'r') as f:
                        project_info['manifest'] = json.load(f)
                except (OSError, json.JSONDecodeError) as e:
                    logger.warning(f"Unreadable manifest for project {run_key}: {e}")
            if os.path.exists(status_path):
                try:
                    with open(status_path, 'r') as f:
                        status_data = json.load(f)
                    project_info['status'] = status_data.get('status', 'unknown')
                except (OSError, json.JSONDecodeError) as e:
                    logger.warning(f"Unreadable job status for project {run_key}: {e}")
            projects.append(project_info)
    # Sort by creation date (newest first); projects lacking a manifest or
    # created_at sort last because '' compares lowest.
    projects.sort(key=lambda p: p.get('manifest', {}).get('created_at', ''), reverse=True)
    return jsonify({'success': True, 'projects': projects})
def api_custom_synteny_delete(run_key):
    """Delete a custom synteny project and its generated MCscan artifacts.

    Removes the project's metadata directory, plus the BED/PEP files that
    were generated for the project's custom genomes (looked up through the
    manifest's ``genome_keys`` mapping).

    Args:
        run_key: Client-supplied project identifier (from the URL).

    Returns:
        Flask JSON response indicating success or failure.
    """
    # Security: run_key is client-supplied and is ultimately passed to
    # shutil.rmtree below. Reject path separators and '..' so deletion can
    # never escape CUSTOM_SYNTENY_META_DIR (path traversal).
    if not run_key or os.path.basename(run_key) != run_key or run_key in ('.', '..'):
        return jsonify({'success': False, 'error': 'Invalid run key'})
    meta_dir = os.path.join(CUSTOM_SYNTENY_META_DIR, run_key)
    if not os.path.exists(meta_dir):
        return jsonify({'success': False, 'error': f'Run key not found: {run_key}'})
    try:
        # Load manifest to get genome keys so generated outputs are removed too
        manifest_path = os.path.join(meta_dir, 'manifest.json')
        if os.path.exists(manifest_path):
            with open(manifest_path, 'r') as f:
                manifest = json.load(f)
            # Remove generated BED and PEP files for each custom genome.
            # NOTE(review): keys come from the server-written manifest, so they
            # are trusted here — confirm nothing else writes genome_keys.
            for key in manifest.get('genome_keys', {}).values():
                bed_file = os.path.join(MCSCAN_RESULTS_DIR, 'bed_files', f'{key}.bed')
                pep_file = os.path.join(MCSCAN_RESULTS_DIR, 'pep_files', f'{key}.pep')
                if os.path.exists(bed_file):
                    os.remove(bed_file)
                if os.path.exists(pep_file):
                    os.remove(pep_file)
        # Remove the project directory itself
        shutil.rmtree(meta_dir)
        return jsonify({'success': True, 'message': f'Project {run_key} deleted successfully'})
    except Exception as e:
        return jsonify({'success': False, 'error': str(e)})
def api_custom_synteny_plot():
    """Generate a microsynteny plot for a custom synteny project.

    Uses the same plotting script as the Custom Genome feature
    (plot_user_genes_microsynteny_v2.py), invoked as a subprocess.  Reads a
    JSON request body with the run key, query genome, gene list, comparison
    genomes, and optional layout/tweaking parameters, then locates the
    timestamped output folder the script created and returns its file names.

    Returns a Flask JSON response: on success, the output folder name and a
    map of produced files (png/svg/csv); on failure, a user-facing error.
    """
    data = request.json
    run_key = data.get('run_key')
    query_genome = data.get('query_genome')
    genes = data.get('genes', [])
    colors = data.get('colors', [])
    annotations = data.get('annotations', [])
    comparisons = data.get('comparisons', [])
    layout = data.get('layout')
    genome_order = data.get('genome_order', [])
    # Advanced tweaking parameters (optional)
    padding_config = data.get('padding_config', {})  # {genome: {left: bp, right: bp}}
    max_genes_config = data.get('max_genes_config', {})  # {genome: {left: count, right: count}}
    display_names = data.get('display_names', {})  # {genome: "Custom Name"}
    # Gene labels parameters (optional)
    gene_labels = data.get('gene_labels', [])  # List of gene IDs to label on the plot
    gene_label_size = data.get('gene_label_size', 0)  # Font size for labels (0=disabled, 2-8 recommended)
    # Debug logging for tweaking parameters
    logger.debug(f"api_custom_synteny_plot received:")
    logger.debug(f"  padding_config: {padding_config}")
    logger.debug(f"  max_genes_config: {max_genes_config}")
    logger.debug(f"  display_names: {display_names}")
    logger.debug(f"  gene_labels: {gene_labels}")
    logger.debug(f"  gene_label_size: {gene_label_size}")
    # Validate required fields before doing any work
    if not run_key:
        return jsonify({'success': False, 'error': 'run_key is required'})
    if not query_genome:
        return jsonify({'success': False, 'error': 'query_genome is required'})
    if not genes:
        return jsonify({'success': False, 'error': 'At least one gene is required'})
    if not comparisons:
        return jsonify({'success': False, 'error': 'At least one comparison genome is required'})
    # Validate and sanitize gene IDs (security measure - these are passed to a subprocess)
    validated_genes, error_msg = sanitize_gene_ids(genes)
    if error_msg:
        return jsonify({'success': False, 'error': error_msg})
    genes = validated_genes
    # Also validate gene_labels if provided
    if gene_labels:
        validated_labels, label_error = sanitize_gene_ids(gene_labels)
        if label_error:
            return jsonify({'success': False, 'error': f'Gene labels: {label_error}'})
        gene_labels = validated_labels
    # Look up the project
    meta_dir = os.path.join(CUSTOM_SYNTENY_META_DIR, run_key)
    if not os.path.exists(meta_dir):
        return jsonify({'success': False, 'error': f'Run key not found: {run_key}'})
    # Get display name and MCscan params for query genome from manifest
    query_display_name = query_genome
    mcscan_params = None
    manifest_file = os.path.join(meta_dir, 'manifest.json')
    if os.path.exists(manifest_file):
        try:
            with open(manifest_file, 'r') as f:
                manifest = json.load(f)
            # Look for the query genome in uploaded_genomes
            if manifest.get('uploaded_genomes'):
                for genome_info in manifest['uploaded_genomes']:
                    if genome_info.get('genome_id') == query_genome:
                        query_display_name = genome_info.get('display_name', query_genome)
                        break
            # Read MCscan parameters from manifest for Method row in CSV
            if manifest.get('mcscan_params'):
                mcscan_params = manifest['mcscan_params']
        except Exception as e:
            logger.warning(f"Could not read manifest: {e}")
    try:
        # Use the SAME Python script as Custom Genome (plot_user_genes_microsynteny_v2.py)
        script_path = os.path.join(SCRIPTS_DIR, 'plot_user_genes_microsynteny_v2.py')
        if not os.path.exists(script_path):
            return jsonify({'success': False, 'error': f'Plotting script not found: {script_path}'})
        # Build args - use query_genome as the query genome, using PYTHON_BIN for HuggingFace Spaces
        args = [PYTHON_BIN, script_path]
        if colors:
            args.extend(['--colors', ','.join(colors)])
        args.extend(['--query', query_genome, '--genes'] + genes + ['--comparisons'] + comparisons)
        if annotations:
            # '|||' is the script's annotation separator (annotations may contain commas)
            annotations_str = '|||'.join(str(a) for a in annotations)
            args.extend(['--annotations', annotations_str])
        # Pass MCscan parameters for Method row in CSV (if available from manifest)
        if mcscan_params:
            args.extend(['--mcscan-params', json.dumps(mcscan_params)])
        # Convert layout from list to comma-separated string (CRITICAL FIX)
        if layout and isinstance(layout, list) and len(layout) > 0:
            layout_str = ','.join(map(str, layout))
            args.extend(['--layout', layout_str])
        if genome_order and isinstance(genome_order, list):
            order_str = ','.join(genome_order)
            args.extend(['--genome-order', order_str])
        # Add advanced tweaking parameters
        # Padding configuration for query genome (asymmetric)
        if query_genome in padding_config and padding_config[query_genome]:
            query_pad = padding_config[query_genome]
            if 'left' in query_pad and query_pad['left'] is not None:
                args.extend(['--query-padding-left', str(int(query_pad['left']))])
            if 'right' in query_pad and query_pad['right'] is not None:
                args.extend(['--query-padding-right', str(int(query_pad['right']))])
        # Max genes configuration for query genome (asymmetric)
        if query_genome in max_genes_config and max_genes_config[query_genome]:
            query_genes = max_genes_config[query_genome]
            if 'left' in query_genes and query_genes['left'] is not None:
                args.extend(['--query-max-genes-left', str(int(query_genes['left']))])
            if 'right' in query_genes and query_genes['right'] is not None:
                args.extend(['--query-max-genes-right', str(int(query_genes['right']))])
        # Comparison genomes padding (format: genome:left:right|genome2:left:right)
        comp_padding_parts = []
        for comp in comparisons:
            if comp in padding_config and padding_config[comp]:
                comp_pad = padding_config[comp]
                # 1500000 bp is the script's implicit default padding
                left_val = int(comp_pad.get('left', 1500000))
                right_val = int(comp_pad.get('right', 1500000))
                comp_padding_parts.append(f"{comp}:{left_val}:{right_val}")
        if comp_padding_parts:
            comp_padding_str = '|'.join(comp_padding_parts)
            args.extend(['--comp-padding-config', comp_padding_str])
        # Comparison genomes max genes (format: genome:left:right|genome2:left:right)
        comp_genes_parts = []
        for comp in comparisons:
            if comp in max_genes_config and max_genes_config[comp]:
                comp_genes = max_genes_config[comp]
                left_val = int(comp_genes.get('left', 50))
                right_val = int(comp_genes.get('right', 50))
                comp_genes_parts.append(f"{comp}:{left_val}:{right_val}")
        if comp_genes_parts:
            comp_genes_str = '|'.join(comp_genes_parts)
            args.extend(['--comp-max-genes-config', comp_genes_str])
        # Custom display names (format: genome:CustomName|genome2:Name2)
        if display_names:
            display_parts = []
            for genome, name in display_names.items():
                if name and name.strip():
                    # Escape special characters in display name ('|' and ':' are
                    # the config separators, so they must not appear in values)
                    safe_name = name.replace('|', '_').replace(':', '_')
                    display_parts.append(f"{genome}:{safe_name}")
            if display_parts:
                display_str = '|'.join(display_parts)
                args.extend(['--display-names', display_str])
        # Gene labels (list of gene IDs to display labels for on the plot)
        if gene_labels and isinstance(gene_labels, list) and len(gene_labels) > 0:
            gene_labels_str = ','.join(str(g) for g in gene_labels if g)
            if gene_labels_str:
                args.extend(['--genelabels', gene_labels_str])
                # Add label size (default to 8 if labels are provided but size not specified)
                label_size = int(gene_label_size) if gene_label_size else 8
                if label_size > 0:
                    args.extend(['--genelabelsize', str(label_size)])
        # Keep low-confidence coloring option (optional - colors all syntenic matches)
        keep_lowconf_color = data.get('keep_lowconf_color', False)
        if keep_lowconf_color:
            args.extend(['--keep-lowconf-color'])
        # Debug: print final command
        logger.debug(f"api_custom_synteny_plot final command args: {' '.join(args)}")
        # Set environment - files are in standard Mcscan_results folder
        env = os.environ.copy()
        env['PYTHON'] = PYTHON_BIN
        # Mark this as a custom genome for the script to handle appropriately
        env['CUSTOM_GENOME_KEY'] = query_genome
        # Pass the display name for plot labels
        env['CUSTOM_DISPLAY_NAME'] = query_display_name
        result = subprocess.run(
            args,
            capture_output=True,
            text=True,
            env=env,
            cwd=SCRIPTS_DIR
        )
        if result.returncode == 0:
            # Find the output folder
            # Priority 1: Check if query_genome has a display name in tweaking config
            # Priority 2: Use query_display_name from manifest
            folder_display_name = display_names.get(query_genome) if display_names and query_genome in display_names else query_display_name
            # Escape special characters to match what was passed to script
            folder_display_name = folder_display_name.replace('|', '_').replace(':', '_')
            comp_str = '_'.join(comparisons)
            pattern_prefix = f"{folder_display_name}_usergenes_{comp_str}_"
            logger.debug(f"Looking for custom synteny output folder with prefix: {pattern_prefix}")
            output_folder = None
            if os.path.exists(OUTPUT_DIR):
                import re
                folders = []
                for f in os.listdir(OUTPUT_DIR):
                    if f.startswith(pattern_prefix):
                        remainder = f[len(pattern_prefix):]
                        # The script appends a YYYYMMDD_HHMMSS timestamp to the folder name
                        if re.match(r'^\d{8}_\d{6}$', remainder):
                            folders.append(f)
                if folders:
                    # Lexicographic sort of timestamps == chronological; take the newest
                    folders.sort(reverse=True)
                    output_folder = os.path.join(OUTPUT_DIR, folders[0])
            if output_folder and os.path.exists(output_folder):
                files = {}
                for fname in os.listdir(output_folder):
                    if fname == 'microsynteny_plot.png':
                        files['png'] = fname
                    elif fname == 'microsynteny_plot.svg':
                        files['svg'] = fname
                    elif fname.endswith('.csv'):
                        files['csv'] = fname
                return jsonify({
                    'success': True,
                    'message': 'Plot generated successfully',
                    'output_folder': os.path.basename(output_folder),
                    'files': files
                })
            else:
                # Log details for debugging (not shown to user)
                if app.debug:
                    logger.debug(f"Custom synteny output folder not found. stdout: {result.stdout}")
                    logger.debug(f"stderr: {result.stderr}")
                return jsonify({
                    'success': False,
                    'error': 'Output folder not found after plot generation'
                })
        else:
            # Script failed: try to extract a meaningful message from its output
            error_msg = 'Script failed'
            combined = result.stdout + '\n' + result.stderr
            if 'No syntenic matches' in combined:
                error_msg = 'No syntenic matches found for the specified genes'
            elif 'ERROR:' in combined:
                for line in combined.split('\n'):
                    if 'ERROR:' in line:
                        error_msg = line.split('ERROR:')[-1].strip()
                        break
            # Log details for debugging (not shown to user)
            if app.debug:
                logger.debug(f"Custom synteny plot script failed. stdout: {result.stdout}")
                logger.debug(f"stderr: {result.stderr}")
            return jsonify({
                'success': False,
                'error': error_msg
            })
    except Exception as e:
        return jsonify({'success': False, 'error': str(e)})
def api_custom_synteny_genes(run_key, genome_key):
    """Get the list of genes from a genome's BED file.

    Works identically for database genomes and custom genomes: in both
    cases the BED file lives under ``MCSCAN_RESULTS_DIR/bed_files/``.
    (The original code branched on AVAILABLE_GENOMES, but both branches
    built the exact same path, so the branch was removed.)

    Args:
        run_key: Project identifier (unused here; part of the route).
        genome_key: Genome identifier whose BED file should be read.

    Returns:
        Flask JSON response with ``genes`` (list of chr/start/end/gene_id
        dicts) and ``total``, or an error message.
    """
    # Security: genome_key is client-supplied. Reject path separators / '..'
    # so the file read cannot escape the bed_files directory.
    if not genome_key or os.path.basename(genome_key) != genome_key or genome_key in ('.', '..'):
        return jsonify({'success': False, 'error': 'Invalid genome key'})
    bed_file = os.path.join(MCSCAN_RESULTS_DIR, 'bed_files', f'{genome_key}.bed')
    if not os.path.exists(bed_file):
        return jsonify({'success': False, 'error': f'BED file not found for genome: {genome_key}'})
    genes = []
    try:
        with open(bed_file, 'r') as f:
            for line in f:
                if not line.strip():
                    continue
                parts = line.strip().split('\t')
                # BED format: chrom, start, end, name (extra columns ignored)
                if len(parts) >= 4:
                    genes.append({
                        'chr': parts[0],
                        'start': int(parts[1]),
                        'end': int(parts[2]),
                        'gene_id': parts[3]
                    })
    except Exception as e:
        # Malformed coordinates (ValueError) or I/O errors surface as a JSON error
        return jsonify({'success': False, 'error': str(e)})
    return jsonify({'success': True, 'genes': genes, 'total': len(genes)})
| # ============================================================================ | |
| # Run Application | |
| # ============================================================================ | |
if __name__ == '__main__':
    # Get debug mode from environment variable (default: False for production safety)
    # Set FLASK_DEBUG=1 for local development, FLASK_DEBUG=0 for production
    debug_mode = os.environ.get('FLASK_DEBUG', '0') == '1'
    logger.info("Starting Plant-mSyn - Plant Microsynteny Web Application...")
    logger.info(f"Script directory: {SCRIPT_DIR}")
    logger.info(f"Annotations directory: {ANNOTATIONS_DIR}")
    logger.info(f"Output directory: {OUTPUT_DIR}")
    logger.info(f"Debug mode: {debug_mode}")
    if debug_mode:
        logger.warning("Debug mode is ENABLED - do not use in production!")
    # Start auto-cleanup scheduler (background thread defined elsewhere in this file)
    start_cleanup_scheduler()
    # Start analytics weekly email scheduler (only if analytics module imported
    # successfully AND a recipient address is configured)
    if ANALYTICS_AVAILABLE:
        analytics_email = os.environ.get('ANALYTICS_EMAIL', '')
        if analytics_email:
            start_analytics_scheduler(analytics_email)
        else:
            logger.info("ANALYTICS_EMAIL not set - weekly reports disabled")
    # Use port 7860 for HuggingFace Spaces, fallback to 5000 for local dev
    # NOTE(review): the fallback default here is actually 7860, not 5000 as the
    # comment above claims - confirm which is intended.
    port = int(os.environ.get('PORT', 7860))
    logger.info(f"Open http://127.0.0.1:{port} in your browser")
    logger.info("Press Ctrl+C to stop")
    # host='0.0.0.0' is required for the app to be reachable inside the
    # HuggingFace Spaces container
    app.run(debug=debug_mode, host='0.0.0.0', port=port)