# plant-msyn / app.py
# Source header (from Yoshigold's repository): "Update webapp scripts with
# path fixes and improvements" — commit 1aadd9b (verified).
#!/usr/bin/env python3
"""
Plant-mSyn - Plant Microsynteny Web Application (Hugging Face Edition)
Flask backend for serving the microsynteny plotting interface
This version is adapted for Hugging Face Spaces deployment.
Data is loaded from a separate Hugging Face Dataset repository.
"""
import os
import csv
import subprocess
import tempfile
import shutil
import json
import uuid
import threading
import time
import sys
import random
import string
from datetime import datetime, timedelta
from collections import defaultdict
from flask import Flask, jsonify, request, send_file, render_template
from flask_cors import CORS
from flask_limiter import Limiter
from flask_limiter.util import get_remote_address
from werkzeug.utils import secure_filename
# Hugging Face Hub for dataset access (optional - for downloading data)
try:
from huggingface_hub import snapshot_download, hf_hub_download
HF_HUB_AVAILABLE = True
except ImportError:
HF_HUB_AVAILABLE = False
# Load environment variables from .env file (for local development)
try:
from dotenv import load_dotenv
load_dotenv()
except ImportError:
pass # python-dotenv not installed, use system env vars only
# ============================================================================
# Hugging Face Configuration
# ============================================================================
# Dataset repository ID (override via HF_DATASET_REPO env var for forks)
HF_DATASET_REPO = os.environ.get('HF_DATASET_REPO', 'Yoshigold/plant-msyn-data')
# Base directories - adapted for Hugging Face Spaces
# On HF Spaces, app.py is at /app/app.py, so APP_DIR is /app
# In local dev, app.py is at huggingface/webapp/app.py
APP_DIR = os.path.dirname(os.path.abspath(__file__))
# PROJECT_DIR is the parent of webapp/ (huggingface/ folder)
# This is needed so that path_config.py in Scripts/ computes paths consistently
# In HF Spaces, everything is in /app, so parent is also fine
PROJECT_DIR = os.path.dirname(APP_DIR) if os.path.basename(APP_DIR) == 'webapp' else APP_DIR
BASE_DIR = PROJECT_DIR  # Alias kept for code that still references BASE_DIR
# Data directory - where dataset files will be loaded from
# IMPORTANT: Data is in 'data/' folder which is a sibling of 'webapp/', not inside it
# Structure: huggingface/data/ and huggingface/webapp/ and huggingface/Scripts/
# On HF Spaces, data is at /app/data (when everything is flattened into /app)
def _compute_data_dir():
    """Resolve the data directory, preferring an explicit env-var override.

    Resolution order:
    1. PLANTMSYN_DATA_DIR environment variable (explicit override)
    2. 'data/' sibling of webapp/ (local development layout)
    3. APP_DIR/data (flattened HF Spaces layout)
    """
    override = os.environ.get('PLANTMSYN_DATA_DIR')
    if override:
        return override
    local_data = os.path.join(PROJECT_DIR, 'data')
    # Fall back to /app/data when the local-dev sibling folder is absent
    return local_data if os.path.isdir(local_data) else os.path.join(APP_DIR, 'data')
DATA_DIR = _compute_data_dir()
# Export the resolved paths so subprocess scripts (e.g. path_config.py in
# Scripts/) resolve the same data/project directories as this process
os.environ['PLANTMSYN_DATA_DIR'] = DATA_DIR
os.environ['PLANTMSYN_PROJECT_DIR'] = PROJECT_DIR
def initialize_data():
    """Download dataset from HuggingFace Hub if not already present."""
    # mcscan_results is the key folder; its presence means data is already in place
    mcscan_dir = os.path.join(DATA_DIR, 'mcscan_results')
    if HF_HUB_AVAILABLE and not os.path.exists(mcscan_dir):
        print(f"Downloading dataset from {HF_DATASET_REPO} to {DATA_DIR}...")
        try:
            snapshot_download(
                repo_id=HF_DATASET_REPO,
                repo_type='dataset',
                local_dir=DATA_DIR,
                cache_dir=None,  # Don't use cache, download directly
            )
        except Exception as e:
            # Best-effort: keep the app running even without the dataset
            print(f"Failed to download dataset: {e}")
            print("App will continue but may not have access to data files")
        else:
            print(f"Dataset downloaded to {DATA_DIR}")
        return
    print(f"Using existing data directory: {DATA_DIR}")
    if os.path.exists(mcscan_dir):
        print(f"MCscan results found at: {mcscan_dir}")
    else:
        print(f"WARNING: MCscan results NOT found at: {mcscan_dir}")
# Initialize data on startup (downloads from the HF dataset repo if missing)
initialize_data()
# Scripts directory - on HF Spaces, scripts are in the same folder as app.py
SCRIPTS_PATH = APP_DIR
# Make local helper modules (genome_config, logger, ...) importable below
if SCRIPTS_PATH not in sys.path:
    sys.path.insert(0, SCRIPTS_PATH)
from genome_config import (
GENOME_DISPLAY_NAMES,
SHORT_DISPLAY_NAMES,
EXAMPLE_GENE_IDS,
get_genome_display_name
)
# Import logger for proper logging (replaces print statements)
from logger import get_webapp_logger
logger = get_webapp_logger()
# Import centralized error message utilities
from error_messages import extract_user_error_from_log
# ============================================================================
# Analytics DISABLED for Hugging Face Spaces
# ============================================================================
# Analytics is disabled on HF Spaces as email reports won't work
# Usage metrics can be tracked via HF's built-in Space analytics instead
ANALYTICS_AVAILABLE = False  # Hard-disabled on HF Spaces (email reports unavailable)
# Dummy record_event function for disabled analytics
def record_event(*args, **kwargs):
    """No-op stand-in for the analytics recorder; accepts any call signature."""
    pass
# Feature constants (for compatibility with code that references them)
FEATURE_PLOT = 'plot'
FEATURE_PLOT_TWEAKS = 'plot_tweaks'
FEATURE_DISCOVERY = 'discovery'
FEATURE_ADVANCED_SEARCH = 'advanced_search'
FEATURE_CUSTOM_GENOME = 'custom_genome'
FEATURE_CUSTOM_SYNTENY = 'custom_synteny'
# Import SQL catalog helper for fast pre-filtering of searches
try:
from sql_catalog_helper import (
is_catalog_available,
get_target_genomes_for_genes,
prefilter_search_comparisons
)
SQL_CATALOG_AVAILABLE = True
except ImportError:
SQL_CATALOG_AVAILABLE = False
logger.warning("SQL catalog helper not available, searches will scan all files")
app = Flask(__name__)
CORS(app)  # Allow cross-origin requests (UI may be served from another host)
# Rate limiting - 5 uploads per IP per hour
# No default limits here; individual routes opt in via @limiter.limit(...)
limiter = Limiter(
    app=app,
    key_func=get_remote_address,
    default_limits=[],
    storage_uri="memory://"  # In-memory store: per-process, resets on restart
)
# Custom error handler for rate limit exceeded
@app.errorhandler(429)
def ratelimit_handler(e):
    """Return a JSON error body (instead of Flask's HTML page) on HTTP 429."""
    payload = {
        'success': False,
        'error': 'You have exceeded the upload limit of 5 per hour. Please wait 1 hour before trying again.'
    }
    return jsonify(payload), 429
# ============================================================================
# MCscan Job Queue System - Limit Concurrent Runs
# ============================================================================
# Semaphore to limit concurrent MCscan jobs (only 1 at a time)
MCSCAN_JOB_SEMAPHORE = threading.Semaphore(1)
# Track queued jobs for status reporting
_job_queue_status = {
    'current_job': None, # run_key of currently running job (None when idle)
    'queue_position': {}, # run_key -> queue position (0 = running, >0 = waiting)
    'queue_lock': threading.Lock() # guards both fields above
}
# IP-based run tracking (limit MCscan runs per IP per 3 hours)
_ip_run_tracker = {
    'runs': defaultdict(list), # IP -> list of run-start timestamps
    'lock': threading.Lock() # guards the 'runs' map
}
MAX_RUNS_PER_IP = 5 # Maximum MCscan runs per IP per 3-hour window
RUN_LIMIT_WINDOW_HOURS = 3 # Time window for run limit
# Name length limits
MAX_NAME_LENGTH = 100  # Upper bound for project names, run names, display names
def validate_name_length(name, field_name='Name'):
    """Check that *name* is within MAX_NAME_LENGTH.

    Returns a (valid, error_msg) tuple; error_msg is None when valid.
    Empty/None names are considered valid (presence is checked elsewhere).
    """
    if not name or len(name) <= MAX_NAME_LENGTH:
        return True, None
    return False, f'{field_name} must be {MAX_NAME_LENGTH} characters or less (currently {len(name)} characters)'
def check_ip_run_limit(ip_address):
    """Check if IP has exceeded run limit. Returns (allowed, message).

    Prunes timestamps older than the rolling RUN_LIMIT_WINDOW_HOURS window,
    then compares the remaining count against MAX_RUNS_PER_IP. Thread-safe
    via the tracker lock (also serializes with record_ip_run).
    """
    with _ip_run_tracker['lock']:
        now = datetime.now()
        cutoff = now - timedelta(hours=RUN_LIMIT_WINDOW_HOURS)
        # Clean old entries (keep only runs inside the rolling window)
        _ip_run_tracker['runs'][ip_address] = [
            ts for ts in _ip_run_tracker['runs'][ip_address]
            if ts > cutoff
        ]
        runs_in_window = len(_ip_run_tracker['runs'][ip_address])
        if runs_in_window >= MAX_RUNS_PER_IP:
            # Tell the caller how long until the oldest run ages out of the window
            oldest = min(_ip_run_tracker['runs'][ip_address])
            wait_time = oldest + timedelta(hours=RUN_LIMIT_WINDOW_HOURS) - now
            hours = int(wait_time.total_seconds() // 3600)
            minutes = int((wait_time.total_seconds() % 3600) // 60)
            return False, f'You have reached the limit of {MAX_RUNS_PER_IP} MCscan runs per {RUN_LIMIT_WINDOW_HOURS} hours. Please wait {hours}h {minutes}m.'
        return True, f'{MAX_RUNS_PER_IP - runs_in_window} runs remaining in current {RUN_LIMIT_WINDOW_HOURS}-hour window'
def record_ip_run(ip_address):
    """Record that an IP started a run."""
    timestamp = datetime.now()
    with _ip_run_tracker['lock']:
        _ip_run_tracker['runs'][ip_address].append(timestamp)
def get_queue_position(run_key):
    """Get current queue position for a job (0 = running, >0 = waiting)."""
    # -1 means the run_key is unknown to the queue
    with _job_queue_status['queue_lock']:
        positions = _job_queue_status['queue_position']
        return positions.get(run_key, -1)
def acquire_job_slot(run_key, timeout=None):
    """Try to acquire a slot to run MCscan. Returns True if acquired.

    Registers the job at the back of the queue (position = current queue size)
    before blocking on the single-slot semaphore, so status endpoints can
    report a position while the job waits. On acquisition, marks this job as
    current (position 0) and shifts all other waiting jobs up by one.
    """
    # Update queue position (register this job before blocking)
    with _job_queue_status['queue_lock']:
        if run_key not in _job_queue_status['queue_position']:
            _job_queue_status['queue_position'][run_key] = len(_job_queue_status['queue_position'])
    # Blocks here until the slot frees up (or the optional timeout elapses)
    acquired = MCSCAN_JOB_SEMAPHORE.acquire(blocking=True, timeout=timeout)
    if acquired:
        with _job_queue_status['queue_lock']:
            _job_queue_status['current_job'] = run_key
            _job_queue_status['queue_position'][run_key] = 0
            # Decrement position for all other waiting jobs
            for key in list(_job_queue_status['queue_position'].keys()):
                if key != run_key and _job_queue_status['queue_position'][key] > 0:
                    _job_queue_status['queue_position'][key] -= 1
    return acquired
def release_job_slot(run_key):
    """Release the MCscan job slot."""
    with _job_queue_status['queue_lock']:
        status = _job_queue_status
        if status['current_job'] == run_key:
            status['current_job'] = None
        # Drop this job from the queue map if it is still registered
        status['queue_position'].pop(run_key, None)
    MCSCAN_JOB_SEMAPHORE.release()
# ============================================================================
# Configuration (Hugging Face Spaces Adapted)
# ============================================================================
# Auto-cleanup settings
CUSTOM_GENOME_RETENTION_DAYS = 14 # Delete custom genomes after 14 days
OUTPUT_RETENTION_HOURS = 24 # Delete generated plots/tables after 24 hours
# Base directories - adapted for HF Spaces structure
# APP_DIR and BASE_DIR are defined above in HF Configuration section
# SCRIPT_DIR points to where scripts are located (Scripts/ folder)
# In local dev: huggingface/Scripts/, in HF Spaces: /app/ (flattened)
SCRIPT_DIR = os.path.join(PROJECT_DIR, 'Scripts') if os.path.isdir(os.path.join(PROJECT_DIR, 'Scripts')) else PROJECT_DIR
SCRIPTS_DIR = SCRIPT_DIR # Alias for compatibility
# Data paths - point to HF dataset folder structure
ANNOTATIONS_DIR = os.path.join(DATA_DIR, 'annotations') # Genome annotations folder
OUTPUT_DIR = os.path.join(APP_DIR, 'Microsynteny_plots') # Main output folder (auto-cleaned)
# MCscan results directory - in data folder for HF
MCSCAN_RESULTS_DIR = os.path.join(DATA_DIR, 'mcscan_results')
CUSTOM_META_DIR = os.path.join(MCSCAN_RESULTS_DIR, 'custom_meta') # Custom genome metadata storage
CUSTOM_TEMP_DIR = os.path.join(tempfile.gettempdir(), 'plantmsyn_custom') # Temp dir for processing
# Upload configuration - File size limits (bytes)
# These limits balance usability with protection against abuse
MAX_GFF3_SIZE = 1024 * 1024 * 1024 # 1 GB - GFF3 annotation files can be large
MAX_PEP_SIZE = 1024 * 1024 * 1024 # 1 GB - protein FASTA files (large genomes)
MAX_BED_SIZE = 200 * 1024 * 1024 # 200 MB - BED files (large genomes)
MAX_ANNOTATION_SIZE = 50 * 1024 * 1024 # 50 MB - custom annotation TSV files
ALLOWED_EXTENSIONS = {'gff3', 'gff', 'pep', 'fa', 'fasta', 'faa'} # Accepted upload extensions
# MCscan job timeout (5 hours max)
MCSCAN_JOB_TIMEOUT_SECONDS = 5 * 60 * 60 # 5 hours = 18000 seconds
# Python binary - configurable via environment variable for cloud deployment
# Falls back to current Python interpreter if not specified
PYTHON_BIN = os.environ.get('PYTHON_BIN', sys.executable)
# ============================================================================
# Input Validation / Sanitization
# ============================================================================
import re
# Regex for valid gene IDs: letters, numbers, underscores, dots, hyphens, colons
# Examples: AT1G01010, HORVU1Hr1G000010, Glyma.01G000100, LOC_Os01g01010
GENE_ID_PATTERN = re.compile(r'^[A-Za-z0-9_.\-:]+$')
MAX_GENE_ID_LENGTH = 100
MAX_GENES_PER_REQUEST = 50 # Limit number of genes in a single request
def is_valid_gene_id(gene_id):
    """Return True when gene_id is a non-empty string of allowed characters
    no longer than MAX_GENE_ID_LENGTH; False for anything else."""
    return (
        isinstance(gene_id, str)
        and 0 < len(gene_id) <= MAX_GENE_ID_LENGTH
        and GENE_ID_PATTERN.match(gene_id) is not None
    )
def sanitize_gene_ids(genes):
    """Validate and sanitize a list of gene IDs.
    Returns: (valid_genes, error_message)
    - valid_genes: list of validated gene IDs (or None if error)
    - error_message: string describing the error (or None if valid)
    """
    if not genes:
        return None, 'No genes provided'
    if not isinstance(genes, list):
        return None, 'Genes must be a list'
    if len(genes) > MAX_GENES_PER_REQUEST:
        return None, f'Too many genes ({len(genes)}). Maximum is {MAX_GENES_PER_REQUEST}'
    valid_genes = []
    invalid_genes = []
    for raw in genes:
        candidate = str(raw).strip()
        if is_valid_gene_id(candidate):
            valid_genes.append(candidate)
        else:
            # Truncate so a hostile ID cannot bloat the error message
            invalid_genes.append(candidate[:50])
    if invalid_genes:
        sample = ', '.join(invalid_genes[:3])
        hidden = len(invalid_genes) - 3
        if hidden > 0:
            sample += f' (and {hidden} more)'
        return None, f'Invalid gene ID format: {sample}. Use only letters, numbers, underscores, dots, hyphens, colons.'
    return valid_genes, None
# ============================================================================
# Auto-Discovery of Available Genomes
# ============================================================================
def discover_available_genomes():
    """Load database genomes from whitelist file.
    Uses database_genomes.txt in bed_files/ folder to determine which genomes
    are official database genomes (vs user-uploaded custom genomes).
    This prevents user uploads from appearing in the public genome dropdown.
    To add a new database genome, add its name to database_genomes.txt
    """
    bed_files_dir = os.path.join(MCSCAN_RESULTS_DIR, 'bed_files')
    whitelist_file = os.path.join(bed_files_dir, 'database_genomes.txt')
    if not os.path.exists(whitelist_file):
        # Fallback: scan all bed files (legacy behavior)
        logger.warning("database_genomes.txt not found, falling back to scanning all bed files")
        return sorted(_scan_all_bed_files(bed_files_dir))
    genomes = []
    try:
        with open(whitelist_file, 'r') as f:
            for raw_line in f:
                entry = raw_line.strip()
                # Skip empty lines and comments
                if not entry or entry.startswith('#'):
                    continue
                # Only list genomes whose bed file actually exists
                bed_file = os.path.join(bed_files_dir, f'{entry}.bed')
                if os.path.exists(bed_file):
                    genomes.append(entry)
                else:
                    logger.warning(f"Genome in whitelist but no bed file found: {entry}")
    except Exception as e:
        logger.error(f"Failed to read database_genomes.txt: {e}")
        # Fallback to scanning all bed files if whitelist read fails
        genomes = _scan_all_bed_files(bed_files_dir)
    return sorted(genomes)
def _scan_all_bed_files(bed_files_dir):
    """Fallback function to scan all bed files in directory"""
    if not os.path.exists(bed_files_dir):
        return []
    # Genome name = bed filename without its '.bed' extension
    return [name[:-4] for name in os.listdir(bed_files_dir) if name.endswith('.bed')]
# GENOME_DISPLAY_NAMES, SHORT_DISPLAY_NAMES, EXAMPLE_GENE_IDS, and helper functions
# are now imported from genome_config.py in the Scripts folder
# Auto-discover available genomes from bed_files folder
# (whitelist-driven; computed once at import time)
AVAILABLE_GENOMES = discover_available_genomes()
# Cache for genome annotations: genome_name -> {gene_id: description}
_annotation_cache = {}
# ============================================================================
# Auto-Cleanup Scheduler for Custom Genomes
# ============================================================================
def cleanup_mcscan_results_files(manifest):
    """Delete custom genome files from main Mcscan_results folder based on manifest.

    Args:
        manifest: dict loaded from a custom genome's manifest.json; its
            'mcscan_results_files' key lists paths relative to MCSCAN_RESULTS_DIR.

    Returns:
        Number of files successfully deleted (failures are logged, not raised).
    """
    files_deleted = 0
    mcscan_files = manifest.get('mcscan_results_files', [])
    for relative_path in mcscan_files:
        full_path = os.path.join(MCSCAN_RESULTS_DIR, relative_path)
        if os.path.exists(full_path):
            try:
                os.remove(full_path)
                files_deleted += 1
                logger.info(f"Cleanup: Deleted Mcscan file: {relative_path}")
            except Exception as e:
                logger.error(f"Cleanup: Failed to delete {relative_path}: {e}")
    return files_deleted
def cleanup_old_custom_genomes():
    """Delete custom genome metadata older than CUSTOM_GENOME_RETENTION_DAYS.

    Also cleans up associated files in main Mcscan_results folder
    (listed in each genome's manifest.json).

    Returns:
        Number of custom genome metadata folders deleted.
    """
    if not os.path.exists(CUSTOM_META_DIR):
        return 0
    deleted_count = 0
    files_deleted = 0
    cutoff_time = datetime.now() - timedelta(days=CUSTOM_GENOME_RETENTION_DAYS)
    for run_key in os.listdir(CUSTOM_META_DIR):
        run_dir = os.path.join(CUSTOM_META_DIR, run_key)
        if not os.path.isdir(run_dir):
            continue
        should_delete = False
        manifest_file = os.path.join(run_dir, 'manifest.json')
        manifest = {}
        # Prefer the manifest's created_at timestamp when available
        if os.path.exists(manifest_file):
            try:
                with open(manifest_file, 'r') as f:
                    manifest = json.load(f)
                created_at = manifest.get('created_at', '')
                if created_at:
                    # Parse ISO format datetime ('Z' suffix isn't understood by fromisoformat)
                    created_dt = datetime.fromisoformat(created_at.replace('Z', '+00:00'))
                    if created_dt.replace(tzinfo=None) < cutoff_time:
                        should_delete = True
            except Exception as e:
                # Unreadable/invalid manifest: log it and fall back to mtime below
                # (was a bare `except: pass`, which hid real errors)
                logger.warning(f"Cleanup: Could not read manifest for {run_key}: {e}")
        # Fallback to directory modification time
        if not should_delete:
            dir_mtime = datetime.fromtimestamp(os.path.getmtime(run_dir))
            if dir_mtime < cutoff_time:
                should_delete = True
        if should_delete:
            # First, clean up files in main Mcscan_results folder
            files_deleted += cleanup_mcscan_results_files(manifest)
            # Then delete the custom genome metadata folder
            try:
                shutil.rmtree(run_dir)
                deleted_count += 1
                logger.info(f"Cleanup: Deleted expired custom genome: {run_key}")
            except Exception as e:
                logger.error(f"Cleanup: Failed to delete {run_key}: {e}")
    if files_deleted > 0:
        logger.info(f"Cleanup: Also removed {files_deleted} files from Mcscan_results")
    return deleted_count
def cleanup_old_custom_synteny():
    """Delete custom synteny project metadata older than CUSTOM_GENOME_RETENTION_DAYS.

    Also cleans up associated genome entries in custom_meta (and the files those
    genomes placed in Mcscan_results, via their own manifests).

    Returns:
        Number of custom synteny project folders deleted.
    """
    synteny_meta_dir = os.path.join(MCSCAN_RESULTS_DIR, 'custom_synteny_meta')
    if not os.path.exists(synteny_meta_dir):
        return 0
    deleted_count = 0
    cutoff_time = datetime.now() - timedelta(days=CUSTOM_GENOME_RETENTION_DAYS)
    for run_key in os.listdir(synteny_meta_dir):
        run_dir = os.path.join(synteny_meta_dir, run_key)
        if not os.path.isdir(run_dir):
            continue
        should_delete = False
        manifest_file = os.path.join(run_dir, 'manifest.json')
        manifest = {}
        # Prefer the manifest's created_at timestamp when available
        if os.path.exists(manifest_file):
            try:
                with open(manifest_file, 'r') as f:
                    manifest = json.load(f)
                created_at = manifest.get('created_at', '')
                if created_at:
                    created_dt = datetime.fromisoformat(created_at.replace('Z', '+00:00'))
                    if created_dt.replace(tzinfo=None) < cutoff_time:
                        should_delete = True
            except Exception as e:
                # Unreadable/invalid manifest: log it and fall back to mtime below
                # (was a bare `except: pass`, which hid real errors)
                logger.warning(f"Cleanup: Could not read manifest for {run_key}: {e}")
        # Fallback to directory modification time
        if not should_delete:
            dir_mtime = datetime.fromtimestamp(os.path.getmtime(run_dir))
            if dir_mtime < cutoff_time:
                should_delete = True
        if should_delete:
            # Clean up genome entries in custom_meta that were created for this project
            genome_keys = manifest.get('genome_keys', {})
            for genome_key in genome_keys.values():
                genome_meta_dir = os.path.join(CUSTOM_META_DIR, genome_key)
                if os.path.exists(genome_meta_dir):
                    # Check if this genome_meta has its own manifest with mcscan_results_files
                    genome_manifest_file = os.path.join(genome_meta_dir, 'manifest.json')
                    if os.path.exists(genome_manifest_file):
                        try:
                            with open(genome_manifest_file, 'r') as f:
                                genome_manifest = json.load(f)
                            cleanup_mcscan_results_files(genome_manifest)
                        except Exception as e:
                            # Was a bare `except: pass`; log so orphaned files are traceable
                            logger.warning(f"Cleanup: Could not process genome manifest for {genome_key}: {e}")
                    try:
                        shutil.rmtree(genome_meta_dir)
                        logger.info(f"Cleanup: Deleted custom synteny genome: {genome_key}")
                    except Exception as e:
                        logger.error(f"Cleanup: Failed to delete genome {genome_key}: {e}")
            # Delete the custom synteny project folder
            try:
                shutil.rmtree(run_dir)
                deleted_count += 1
                logger.info(f"Cleanup: Deleted expired custom synteny project: {run_key}")
            except Exception as e:
                logger.error(f"Cleanup: Failed to delete {run_key}: {e}")
    return deleted_count
def cleanup_old_output_files():
    """Delete generated output folders (plots, tables) older than OUTPUT_RETENTION_HOURS.
    These are the timestamped folders in OUTPUT_DIR containing PNG, SVG, and CSV files."""
    if not os.path.exists(OUTPUT_DIR):
        return 0
    cutoff_time = datetime.now() - timedelta(hours=OUTPUT_RETENTION_HOURS)
    deleted_count = 0
    for folder_name in os.listdir(OUTPUT_DIR):
        folder_path = os.path.join(OUTPUT_DIR, folder_name)
        # Output folders are timestamped directories; skip stray files
        if not os.path.isdir(folder_path):
            continue
        try:
            folder_mtime = datetime.fromtimestamp(os.path.getmtime(folder_path))
            if folder_mtime >= cutoff_time:
                continue
            # Delete the entire expired output folder
            shutil.rmtree(folder_path)
            deleted_count += 1
            logger.info(f"Cleanup: Deleted expired output folder: {folder_name}")
        except Exception as e:
            logger.error(f"Cleanup: Failed to delete output folder {folder_name}: {e}")
    return deleted_count
def cleanup_old_temp_files():
    """Delete orphaned temp files older than OUTPUT_RETENTION_HOURS.
    This cleans up:
    - discovery_annotations_*.tsv files in system temp
    - Orphaned directories in CUSTOM_TEMP_DIR
    These temp files should normally be deleted after use, but this handles
    cases where the server crashed or sessions were abandoned.

    Returns:
        Number of files/directories deleted.
    """
    deleted_count = 0
    cutoff_time = datetime.now() - timedelta(hours=OUTPUT_RETENTION_HOURS)
    # Clean up discovery annotation temp files in system temp directory
    temp_dir = tempfile.gettempdir()
    try:
        for filename in os.listdir(temp_dir):
            if filename.startswith('discovery_annotations_') and filename.endswith('.tsv'):
                file_path = os.path.join(temp_dir, filename)
                try:
                    file_mtime = datetime.fromtimestamp(os.path.getmtime(file_path))
                    if file_mtime < cutoff_time:
                        os.remove(file_path)
                        deleted_count += 1
                        # Log the filename so removals are traceable
                        logger.info(f"Cleanup: Deleted expired discovery annotation file: {filename}")
                except Exception as e:
                    logger.error(f"Cleanup: Failed to delete temp file {filename}: {e}")
    except Exception as e:
        logger.error(f"Cleanup: Error scanning system temp directory: {e}")
    # Clean up orphaned directories in CUSTOM_TEMP_DIR
    if os.path.exists(CUSTOM_TEMP_DIR):
        try:
            for dirname in os.listdir(CUSTOM_TEMP_DIR):
                dir_path = os.path.join(CUSTOM_TEMP_DIR, dirname)
                if os.path.isdir(dir_path):
                    try:
                        dir_mtime = datetime.fromtimestamp(os.path.getmtime(dir_path))
                        if dir_mtime < cutoff_time:
                            shutil.rmtree(dir_path)
                            deleted_count += 1
                            logger.info(f"Cleanup: Deleted orphaned temp directory: {dirname}")
                    except Exception as e:
                        logger.error(f"Cleanup: Failed to delete temp dir {dirname}: {e}")
        except Exception as e:
            logger.error(f"Cleanup: Error scanning CUSTOM_TEMP_DIR: {e}")
    return deleted_count
def start_cleanup_scheduler():
    """Start background thread that runs cleanup periodically.
    Output files are cleaned hourly (24h retention), custom genomes daily (14d retention).

    The worker is a daemon thread, so it never blocks interpreter shutdown.
    """
    def cleanup_loop():
        # Counts hourly iterations so the daily cleanups run on every 24th pass
        hourly_counter = 0
        while True:
            try:
                # Always clean up output files (runs every hour)
                deleted_outputs = cleanup_old_output_files()
                if deleted_outputs > 0:
                    logger.info(f"Cleanup: Removed {deleted_outputs} expired output folder(s)")
                # Also clean up orphaned temp files hourly
                deleted_temp = cleanup_old_temp_files()
                if deleted_temp > 0:
                    logger.info(f"Cleanup: Removed {deleted_temp} orphaned temp file(s)")
                # Clean up custom genomes every 24 iterations (once per day)
                hourly_counter += 1
                if hourly_counter >= 24:
                    hourly_counter = 0
                    deleted = cleanup_old_custom_genomes()
                    if deleted > 0:
                        logger.info(f"Cleanup: Removed {deleted} expired custom genome(s)")
                    # Also cleanup custom synteny projects
                    deleted_synteny = cleanup_old_custom_synteny()
                    if deleted_synteny > 0:
                        logger.info(f"Cleanup: Removed {deleted_synteny} expired custom synteny project(s)")
            except Exception as e:
                # Never let a cleanup failure kill the scheduler thread
                logger.error(f"Cleanup: Error during cleanup: {e}")
            # Sleep for 1 hour
            time.sleep(3600)
    thread = threading.Thread(target=cleanup_loop, daemon=True)
    thread.start()
    logger.info(f"Cleanup: Auto-cleanup scheduler started (outputs: {OUTPUT_RETENTION_HOURS}h, custom genomes: {CUSTOM_GENOME_RETENTION_DAYS} days)")
# ============================================================================
# Helper Functions
# ============================================================================
def load_genome_annotations(genome_name):
    """Load gene annotations for a genome from Annotations folder"""
    # Cached values are always dicts, so a None sentinel is safe here
    cached = _annotation_cache.get(genome_name)
    if cached is not None:
        return cached
    annotations = {}
    annotation_file = os.path.join(ANNOTATIONS_DIR, genome_name, 'gene_annotation.tsv')
    if os.path.exists(annotation_file):
        try:
            with open(annotation_file, 'r') as f:
                for row in csv.DictReader(f, delimiter='\t'):
                    gene_id = row.get('gene', '')
                    if gene_id:
                        annotations[gene_id] = row.get('description', '')
        except Exception as e:
            logger.error(f"Error loading annotations for {genome_name}: {e}")
    # Cache even empty results so missing files aren't re-read every call
    _annotation_cache[genome_name] = annotations
    return annotations
def get_gene_annotation(genome_name, gene_id):
    """Get annotation for a specific gene"""
    # Empty string means "no annotation available"
    return load_genome_annotations(genome_name).get(gene_id, '')
def generate_layouts(n):
    """
    Generate all valid layout configurations for n total genomes (including query).
    Each layout is a list of integers representing genomes per row.
    Query genome must be alone in one row (at least one row with value 1).

    Args:
        n: total number of genomes (query + comparisons); valid range is 2-8.

    Returns:
        List of layouts (each a list of positive ints summing to n) containing
        at least one row of size 1; empty list when n is out of range.
    """
    if n < 2 or n > 8:
        return []
    max_rows = min(n, 5)  # Maximum 5 rows on a plot

    # Generator avoids the original's mutable-default-argument accumulator
    def _compositions(total, num_parts):
        """Yield all ordered compositions of `total` into `num_parts` positive integers."""
        if num_parts == 1:
            yield [total]
            return
        # First part can take 1..(total - num_parts + 1), leaving >=1 per remaining part
        for first in range(1, total - num_parts + 2):
            for rest in _compositions(total - first, num_parts - 1):
                yield [first] + rest

    all_layouts = []
    for rows in range(1, max_rows + 1):
        all_layouts.extend(_compositions(n, rows))
    # Keep only layouts with at least one single-genome row so the query
    # genome can be placed alone in its own row.
    return [layout for layout in all_layouts if 1 in layout]
def layout_to_string(layout):
    """Convert layout array to string representation (e.g., [2,3,1] -> '2-3-1')"""
    return '-'.join(str(rows) for rows in layout)
# ============================================================================
# API Routes
# ============================================================================
@app.route('/')
def index():
    """Serve the main application page (templates/index.html)."""
    return render_template('index.html')
@app.route('/api/genomes')
def api_genomes():
    """Get list of available genomes.

    Returns a JSON array of {id, name, scientific_name} objects for every
    genome in AVAILABLE_GENOMES.
    """
    genomes = []
    for genome in AVAILABLE_GENOMES:
        # Format scientific name: arabidopsis_thaliana -> Arabidopsis thaliana
        # Single join avoids the trailing space the old concatenation produced
        # for ids without an underscore.
        parts = genome.split('_')
        scientific_name = ' '.join([parts[0].capitalize()] + parts[1:])
        genomes.append({
            'id': genome,
            'name': get_genome_display_name(genome),
            'scientific_name': scientific_name
        })
    return jsonify(genomes)
@app.route('/api/catalog/status')
def api_catalog_status():
    """Get status of SQL metadata catalog system"""
    if not SQL_CATALOG_AVAILABLE:
        return jsonify({
            'available': False,
            'message': 'SQL catalog helper not installed'
        })
    try:
        from sql_catalog_helper import (
            is_metadata_db_available,
            get_all_available_genomes,
            get_genome_catalog_stats
        )
        catalog_genomes = get_all_available_genomes()
        payload = {
            'available': True,
            'metadata_db_available': is_metadata_db_available(),
            'genomes_with_catalogs': len(catalog_genomes),
            'catalog_genomes': catalog_genomes
        }
        return jsonify(payload)
    except Exception as e:
        return jsonify({
            'available': False,
            'error': str(e)
        })
@app.route('/api/catalog/genome/<genome>')
def api_catalog_genome_stats(genome):
    """Get catalog statistics for a specific genome"""
    if not SQL_CATALOG_AVAILABLE:
        return jsonify({'available': False})
    try:
        from sql_catalog_helper import get_genome_catalog_stats, is_catalog_available
        if not is_catalog_available(genome):
            return jsonify({
                'available': False,
                'genome': genome,
                'message': f'No catalog available for {genome}'
            })
        stats = get_genome_catalog_stats(genome)
        if not stats:
            return jsonify({'available': False, 'genome': genome})
        stats['available'] = True
        return jsonify(stats)
    except Exception as e:
        return jsonify({'available': False, 'error': str(e)})
@app.route('/api/catalog/prefilter', methods=['POST'])
def api_catalog_prefilter():
    """Pre-filter comparisons using catalog to show which have matches.

    Expects a JSON body with 'query_genome', 'genes', and optional
    'comparisons'; returns filtered comparisons plus per-gene target counts.
    """
    if not SQL_CATALOG_AVAILABLE:
        return jsonify({'available': False})
    # request.json is None/raises for non-JSON bodies; tolerate them gracefully
    data = request.get_json(silent=True) or {}
    query_genome = data.get('query_genome')
    genes = data.get('genes', [])
    comparisons = data.get('comparisons', [])
    if not query_genome or not genes:
        return jsonify({'success': False, 'error': 'Missing query_genome or genes'})
    try:
        filtered, diagnostics = prefilter_search_comparisons(
            query_genome, genes, comparisons or [], min_genes_with_matches=1
        )
        # Also get per-gene target info
        gene_targets = get_target_genomes_for_genes(query_genome, genes)
        gene_target_counts = {g: len(targets) for g, targets in gene_targets.items()}
        return jsonify({
            'success': True,
            'filtered_comparisons': filtered,
            'diagnostics': diagnostics,
            'gene_target_counts': gene_target_counts
        })
    except Exception as e:
        return jsonify({'success': False, 'error': str(e)})
@app.route('/api/annotation/<genome>/<gene_id>')
def api_annotation(genome, gene_id):
    """Get annotation for a specific gene"""
    return jsonify({'annotation': get_gene_annotation(genome, gene_id)})
@app.route('/api/layouts/<int:n>')
def api_layouts(n):
    """Get valid layout configurations for n comparison genomes"""
    result = [
        {
            'layout': layout,
            'name': layout_to_string(layout),
            'rows': len(layout),
            'total': sum(layout)
        }
        for layout in generate_layouts(n)
    ]
    return jsonify(result)
@app.route('/api/plot/usergenes', methods=['POST'])
def api_plot_usergenes():
    """Generate a microsynteny plot for user-specified genes.

    Expects a JSON body with 'query_genome', 'genes' and 'comparisons'
    (all required). Optional keys: 'colors', 'annotations', 'layout',
    'genome_order', per-genome 'padding_config' / 'max_genes_config',
    'display_names', 'gene_labels', 'gene_label_size' and
    'keep_lowconf_color'. Delegates plotting to
    plot_user_genes_microsynteny_v2.py via subprocess and returns the
    generated output folder plus the file names found inside it.
    """
    start_time = time.time()
    data = request.json
    query_genome = data.get('query_genome')
    genes = data.get('genes', [])
    comparisons = data.get('comparisons', [])
    colors = data.get('colors', [])
    annotations = data.get('annotations', [])  # User-provided annotations for legend
    layout = data.get('layout', None)  # Layout array e.g., [2, 3, 1]
    genome_order = data.get('genome_order', None)  # Full ordered list of genomes
    # NOTE(review): query_position is read but not used below — presumably the
    # plotting script derives it from genome_order; confirm before removing.
    query_position = data.get('query_position', 0)  # Index of query genome in order
    # Advanced tweaking parameters (optional)
    padding_config = data.get('padding_config', {})  # {genome: {left: bp, right: bp}}
    max_genes_config = data.get('max_genes_config', {})  # {genome: {left: count, right: count}}
    display_names = data.get('display_names', {})  # {genome: "Custom Name"}
    # Gene labels parameters (optional)
    gene_labels = data.get('gene_labels', [])  # List of gene IDs to label on the plot
    gene_label_size = data.get('gene_label_size', 0)  # Font size for labels (0=disabled, 2-8 recommended)
    # Determine if this is a plot with tweaks (affects which analytics feature is recorded)
    has_tweaks = bool(padding_config or max_genes_config or display_names or gene_labels)
    feature_type = FEATURE_PLOT_TWEAKS if has_tweaks else FEATURE_PLOT
    # Debug logging for tweaking parameters
    logger.debug(f"api_plot_usergenes received:")
    logger.debug(f"  padding_config: {padding_config}")
    logger.debug(f"  max_genes_config: {max_genes_config}")
    logger.debug(f"  display_names: {display_names}")
    logger.debug(f"  gene_labels: {gene_labels}")
    logger.debug(f"  gene_label_size: {gene_label_size}")

    # Helper to record analytics and return response
    def _record_and_return(response, success=True):
        if ANALYTICS_AVAILABLE:
            duration_ms = int((time.time() - start_time) * 1000)
            record_event(
                feature_type=feature_type,
                query_genome=query_genome,
                status='success' if success else 'failure',
                duration_ms=duration_ms,
                request=request
            )
        return response

    if not query_genome or not genes or not comparisons:
        return _record_and_return(jsonify({'success': False, 'error': 'Missing required parameters'}), success=False)
    # Validate and sanitize gene IDs (security measure)
    validated_genes, error_msg = sanitize_gene_ids(genes)
    if error_msg:
        return _record_and_return(jsonify({'success': False, 'error': error_msg}), success=False)
    genes = validated_genes
    # Also validate gene_labels if provided
    if gene_labels:
        validated_labels, label_error = sanitize_gene_ids(gene_labels)
        if label_error:
            return _record_and_return(jsonify({'success': False, 'error': f'Gene labels: {label_error}'}), success=False)
        gene_labels = validated_labels
    # Build command arguments
    script_path = os.path.join(SCRIPTS_DIR, 'plot_user_genes_microsynteny_v2.py')
    if not os.path.exists(script_path):
        return _record_and_return(jsonify({'success': False, 'error': f'Script not found: {script_path}'}), success=False)
    # Base args - using PYTHON_BIN for HuggingFace Spaces.
    # Build incrementally instead of constructing the list twice: colors (if
    # any) go right after the script path, matching the script's expectations.
    args = [PYTHON_BIN, script_path]
    if colors:
        args += ['--colors', ','.join(colors)]
    args += ['--query', query_genome, '--genes'] + genes + ['--comparisons'] + comparisons
    # Add annotations if provided
    if annotations:
        # Encode annotations: join with ||| delimiter (unlikely to appear in annotation text)
        annotations_str = '|||'.join(str(a) for a in annotations)
        args.extend(['--annotations', annotations_str])
    # Add layout and genome order if provided
    if layout and isinstance(layout, list) and len(layout) > 0:
        layout_str = ','.join(map(str, layout))
        args.extend(['--layout', layout_str])
        # Add genome order for proper placement
        if genome_order and isinstance(genome_order, list):
            order_str = ','.join(genome_order)
            args.extend(['--genome-order', order_str])
    # Add advanced tweaking parameters
    # Padding configuration for query genome (asymmetric)
    if query_genome in padding_config and padding_config[query_genome]:
        query_pad = padding_config[query_genome]
        if 'left' in query_pad and query_pad['left'] is not None:
            args.extend(['--query-padding-left', str(int(query_pad['left']))])
        if 'right' in query_pad and query_pad['right'] is not None:
            args.extend(['--query-padding-right', str(int(query_pad['right']))])
    # Max genes configuration for query genome (asymmetric)
    if query_genome in max_genes_config and max_genes_config[query_genome]:
        query_genes = max_genes_config[query_genome]
        if 'left' in query_genes and query_genes['left'] is not None:
            args.extend(['--query-max-genes-left', str(int(query_genes['left']))])
        if 'right' in query_genes and query_genes['right'] is not None:
            args.extend(['--query-max-genes-right', str(int(query_genes['right']))])
    # Comparison genomes padding (format: genome:left:right|genome2:left:right)
    comp_padding_parts = []
    for comp in comparisons:
        if comp in padding_config and padding_config[comp]:
            comp_pad = padding_config[comp]
            left_val = int(comp_pad.get('left', 1500000))
            right_val = int(comp_pad.get('right', 1500000))
            comp_padding_parts.append(f"{comp}:{left_val}:{right_val}")
    if comp_padding_parts:
        comp_padding_str = '|'.join(comp_padding_parts)
        args.extend(['--comp-padding-config', comp_padding_str])
    # Comparison genomes max genes (format: genome:left:right|genome2:left:right)
    comp_genes_parts = []
    for comp in comparisons:
        if comp in max_genes_config and max_genes_config[comp]:
            comp_genes = max_genes_config[comp]
            left_val = int(comp_genes.get('left', 50))
            right_val = int(comp_genes.get('right', 50))
            comp_genes_parts.append(f"{comp}:{left_val}:{right_val}")
    if comp_genes_parts:
        comp_genes_str = '|'.join(comp_genes_parts)
        args.extend(['--comp-max-genes-config', comp_genes_str])
    # Custom display names (format: genome:CustomName|genome2:Name2)
    if display_names:
        display_parts = []
        for genome, name in display_names.items():
            if name and name.strip():
                # Validate and truncate display name length
                truncated_name = name.strip()[:MAX_NAME_LENGTH]
                # Escape special characters in display name ('|' and ':' are the
                # config-string delimiters, so they must not appear in names)
                safe_name = truncated_name.replace('|', '_').replace(':', '_')
                display_parts.append(f"{genome}:{safe_name}")
        if display_parts:
            display_str = '|'.join(display_parts)
            args.extend(['--display-names', display_str])
    # Gene labels (list of gene IDs to display labels for on the plot)
    if gene_labels and isinstance(gene_labels, list) and len(gene_labels) > 0:
        gene_labels_str = ','.join(str(g) for g in gene_labels if g)
        if gene_labels_str:
            args.extend(['--genelabels', gene_labels_str])
            # Add label size (default to 8 if labels are provided but size not specified)
            label_size = int(gene_label_size) if gene_label_size else 8
            if label_size > 0:
                args.extend(['--genelabelsize', str(label_size)])
    # Keep low-confidence coloring option (optional - colors all syntenic matches)
    keep_lowconf_color = data.get('keep_lowconf_color', False)
    if keep_lowconf_color:
        args.extend(['--keep-lowconf-color'])
    # Debug: print final command
    logger.debug(f"Final command args: {' '.join(args)}")
    try:
        # Run the script
        env = os.environ.copy()
        env['PYTHON'] = PYTHON_BIN
        result = subprocess.run(
            args,
            capture_output=True,
            text=True,
            env=env,
            cwd=SCRIPTS_DIR
        )
        if result.returncode == 0:
            # Find the output folder.
            # Try multiple naming conventions for folder matching; the bash
            # script uses short display names (e.g., "Goatgrass") for folders.
            comp_str = '_'.join(comparisons)
            # SHORT_DISPLAY_NAMES is imported from genome_config.py
            # Build list of possible prefixes to search for
            possible_prefixes = []
            # First priority: custom display name if provided
            # NOTE(review): this uses the raw display name, while the args above
            # pass a sanitized version — confirm folder naming matches the script.
            if display_names and query_genome in display_names:
                possible_prefixes.append(f"{display_names[query_genome]}_usergenes_{comp_str}_")
            # Second: short display name (used by bash script for folder creation)
            if query_genome in SHORT_DISPLAY_NAMES:
                possible_prefixes.append(f"{SHORT_DISPLAY_NAMES[query_genome]}_usergenes_{comp_str}_")
            # Third: genome ID (fallback when no display name is set)
            possible_prefixes.append(f"{query_genome}_usergenes_{comp_str}_")
            # Fourth: full display name from GENOME_DISPLAY_NAMES
            if query_genome in GENOME_DISPLAY_NAMES:
                possible_prefixes.append(f"{GENOME_DISPLAY_NAMES[query_genome]}_usergenes_{comp_str}_")
            logger.debug(f"Looking for output folder with prefixes: {possible_prefixes}")
            # Find most recent matching folder with exact comparison match
            output_folder = None
            if os.path.exists(OUTPUT_DIR):
                import re
                # Folders must match pattern_prefix followed by timestamp (digits)
                folders = []
                for pattern_prefix in possible_prefixes:
                    for f in os.listdir(OUTPUT_DIR):
                        if f.startswith(pattern_prefix):
                            # Check that after the prefix comes a timestamp (YYYYMMDD_HHMMSS)
                            remainder = f[len(pattern_prefix):]
                            if re.match(r'^\d{8}_\d{6}$', remainder):
                                folders.append(f)
                # Newest timestamp sorts last lexicographically, so a reverse
                # sort puts the most recent folder first.
                # (A duplicated copy of this block was removed — it was a no-op.)
                if folders:
                    folders.sort(reverse=True)
                    output_folder = os.path.join(OUTPUT_DIR, folders[0])
            if output_folder and os.path.exists(output_folder):
                # List generated files
                files = {}
                for fname in os.listdir(output_folder):
                    if fname == 'microsynteny_plot.png':
                        files['png'] = fname
                    elif fname == 'microsynteny_plot.svg':
                        files['svg'] = fname
                    elif fname.endswith('.csv'):
                        files['csv'] = fname
                return _record_and_return(jsonify({
                    'success': True,
                    'message': 'Plot generated successfully',
                    'output_folder': os.path.basename(output_folder),
                    'files': files
                }), success=True)
            else:
                # Log details for debugging (not shown to user)
                if app.debug:
                    logger.debug(f"Output folder not found. stdout: {result.stdout}")
                    logger.debug(f"stderr: {result.stderr}")
                return _record_and_return(jsonify({
                    'success': False,
                    'error': 'Output folder not found after plot generation'
                }), success=False)
        else:
            # Parse error message from the script's combined output, mapping
            # known failure markers to user-friendly messages.
            error_msg = 'Script failed'
            combined = result.stdout + '\n' + result.stderr
            if 'No syntenic matches' in combined:
                error_msg = 'No syntenic matches found for the provided genes'
            elif 'Gap check failed' in combined:
                error_msg = 'Input genes are too far apart (max 20 genes distance)'
            elif 'ERROR:' in combined:
                for line in combined.split('\n'):
                    if 'ERROR:' in line:
                        error_msg = line.split('ERROR:')[-1].strip()
                        break
            # Log details for debugging (not shown to user)
            if app.debug:
                logger.debug(f"Plot script failed. stdout: {result.stdout}")
                logger.debug(f"stderr: {result.stderr}")
            return _record_and_return(jsonify({
                'success': False,
                'error': error_msg
            }), success=False)
    except Exception as e:
        return _record_and_return(jsonify({'success': False, 'error': str(e)}), success=False)
@app.route('/api/download/<folder>/<filename>')
def api_download(folder, filename):
    """Download a generated file from a plot output folder.

    Security: ``folder`` and ``filename`` come straight from the URL, so the
    resolved path is verified to stay inside OUTPUT_DIR before serving —
    this blocks path-traversal attempts (e.g. '..' components).
    """
    file_path = os.path.join(OUTPUT_DIR, folder, filename)
    # Reject any request whose normalized path escapes OUTPUT_DIR
    base_dir = os.path.realpath(OUTPUT_DIR)
    if not os.path.realpath(file_path).startswith(base_dir + os.sep):
        return jsonify({'error': 'File not found'}), 404
    if not os.path.exists(file_path):
        return jsonify({'error': 'File not found'}), 404
    # For CSV files, use clean download names without timestamps
    download_name = filename
    if filename.endswith('.csv'):
        # gene_summary.csv should download as gene_summary.csv (not with timestamp)
        download_name = 'gene_summary.csv'
    return send_file(file_path, as_attachment=True, download_name=download_name)
@app.route('/api/image/<folder>/<filename>')
def api_image(folder, filename):
    """Serve an image file from a plot output folder.

    Security: ``folder`` and ``filename`` are URL-supplied, so the resolved
    path must stay inside OUTPUT_DIR (blocks path traversal via '..').
    """
    file_path = os.path.join(OUTPUT_DIR, folder, filename)
    # Reject any request whose normalized path escapes OUTPUT_DIR
    base_dir = os.path.realpath(OUTPUT_DIR)
    if not os.path.realpath(file_path).startswith(base_dir + os.sep):
        return jsonify({'error': 'File not found'}), 404
    if not os.path.exists(file_path):
        return jsonify({'error': 'File not found'}), 404
    return send_file(file_path)
@app.route('/api/batch-match', methods=['POST'])
def api_batch_match():
    """Run batch match summary for user genes.

    Expects JSON with 'query_genome', 'genes' and 'comparisons'; runs
    count_usergene_matches.py and returns its TSV output parsed into a
    list of row dicts keyed by the header columns.
    """
    data = request.json
    query_genome = data.get('query_genome')
    genes = data.get('genes', [])
    comparisons = data.get('comparisons', [])
    if not query_genome or not genes or not comparisons:
        return jsonify({'success': False, 'error': 'Missing required parameters'})
    # Validate and sanitize gene IDs before passing them to a subprocess
    # (consistent with the other gene-based routes; security measure)
    validated_genes, error_msg = sanitize_gene_ids(genes)
    if error_msg:
        return jsonify({'success': False, 'error': error_msg})
    genes = validated_genes
    script_path = os.path.join(SCRIPTS_DIR, 'count_usergene_matches.py')
    if not os.path.exists(script_path):
        return jsonify({'success': False, 'error': 'Script not found'})
    try:
        # Using PYTHON_BIN for HuggingFace Spaces
        args = [PYTHON_BIN, script_path, query_genome, ','.join(comparisons), ','.join(genes)]
        result = subprocess.run(args, capture_output=True, text=True)
        if result.returncode == 0:
            # Parse TSV output: first line is the header, the rest are rows
            lines = result.stdout.strip().split('\n')
            if len(lines) > 1:
                headers = lines[0].split('\t')
                rows = []
                for line in lines[1:]:
                    values = line.split('\t')
                    # Skip malformed rows whose column count doesn't match
                    if len(values) == len(headers):
                        rows.append(dict(zip(headers, values)))
                return jsonify({'success': True, 'data': rows})
            else:
                return jsonify({'success': True, 'data': []})
        else:
            return jsonify({'success': False, 'error': result.stderr})
    except Exception as e:
        return jsonify({'success': False, 'error': str(e)})
@app.route('/api/search-hits', methods=['POST'])
def api_search_hits():
    """Search for high-confidence syntenic hits between query genes and comparison genomes.

    JSON body: 'query_genome', 'genes', 'comparisons' (required);
    'min_hits', 'use_catalog', 'required_genes' (or legacy 'required_gene')
    optional. Optionally pre-filters comparisons via the SQL catalog, then
    runs search_synteny_hits.py and relays its JSON output.
    """
    start_time = time.time()
    data = request.json
    query_genome = data.get('query_genome')
    genes = data.get('genes', [])
    comparisons = data.get('comparisons', [])
    min_hits = data.get('min_hits', 1)
    use_catalog = data.get('use_catalog', True)  # Enable catalog pre-filtering by default
    # Support both single required_gene (legacy) and multiple required_genes
    required_genes = data.get('required_genes', [])
    if not required_genes:
        # Fallback to legacy single required_gene
        single_gene = data.get('required_gene', '')
        if single_gene:
            required_genes = [single_gene]
    # Helper to record analytics and return response
    def _record_and_return(response, success=True):
        if ANALYTICS_AVAILABLE:
            duration_ms = int((time.time() - start_time) * 1000)
            record_event(
                feature_type=FEATURE_ADVANCED_SEARCH,
                query_genome=query_genome,
                status='success' if success else 'failure',
                duration_ms=duration_ms,
                request=request
            )
        return response
    if not query_genome or not genes or not comparisons:
        return _record_and_return(jsonify({'success': False, 'error': 'Missing required parameters'}), success=False)
    # Validate and sanitize gene IDs (security measure)
    validated_genes, error_msg = sanitize_gene_ids(genes)
    if error_msg:
        return _record_and_return(jsonify({'success': False, 'error': error_msg}), success=False)
    genes = validated_genes
    # Also validate required_genes if provided
    if required_genes:
        validated_required, req_error = sanitize_gene_ids(required_genes)
        if req_error:
            return _record_and_return(jsonify({'success': False, 'error': f'Required genes: {req_error}'}), success=False)
        required_genes = validated_required
    script_path = os.path.join(SCRIPTS_DIR, 'search_synteny_hits.py')
    if not os.path.exists(script_path):
        return _record_and_return(jsonify({'success': False, 'error': 'Search script not found'}), success=False)
    # Pre-filter comparisons using SQL catalog if available
    catalog_diagnostics = None
    filtered_comparisons = comparisons
    if use_catalog and SQL_CATALOG_AVAILABLE and is_catalog_available(query_genome):
        filtered_comparisons, catalog_diagnostics = prefilter_search_comparisons(
            query_genome, genes, comparisons, min_genes_with_matches=1
        )
        # If no comparisons have matches, return early
        if not filtered_comparisons:
            return _record_and_return(jsonify({
                'success': True,
                'data': {
                    'results': [],
                    'total_matches': 0,
                    'filter_message': 'No comparisons have matches for the specified genes',
                    'catalog_prefilter': catalog_diagnostics
                }
            }), success=True)
    try:
        # Build command arguments - using PYTHON_BIN for HuggingFace Spaces
        args = [
            PYTHON_BIN, script_path,
            '--query', query_genome,
            '--genes'] + genes + [
            '--comparisons'] + filtered_comparisons + [
            '--min-hits', str(min_hits),
            '--format', 'json'
        ]
        # Add required genes if specified (multiple)
        if required_genes:
            args.extend(['--required-genes'] + required_genes)
        # Set up environment (custom genomes use same main Mcscan_results folder)
        env = os.environ.copy()
        result = subprocess.run(args, capture_output=True, text=True, env=env)
        # Check for NO_RESULTS marker — this is checked BEFORE returncode, so
        # a "no results" outcome is reported as success even if the script
        # exited non-zero.
        if 'NO_RESULTS' in result.stdout:
            # Extract filter failure reason from stderr
            filter_msg = ''
            for line in result.stderr.split('\n'):
                if 'FILTER_FAILED' in line:
                    filter_msg = line.replace('FILTER_FAILED:', '').strip()
                    break
            response_data = {
                'results': [],
                'total_matches': 0,
                'filter_message': filter_msg
            }
            if catalog_diagnostics:
                response_data['catalog_prefilter'] = catalog_diagnostics
            return _record_and_return(jsonify({'success': True, 'data': response_data}), success=True)
        if result.returncode == 0:
            try:
                output = result.stdout.strip()
                # Parse JSON output
                response_data = json.loads(output)
                # Add catalog diagnostics to response
                if catalog_diagnostics:
                    response_data['catalog_prefilter'] = catalog_diagnostics
                return _record_and_return(jsonify({'success': True, 'data': response_data}), success=True)
            except json.JSONDecodeError as e:
                return _record_and_return(jsonify({'success': False, 'error': f'Failed to parse results: {str(e)}'}), success=False)
        else:
            return _record_and_return(jsonify({'success': False, 'error': result.stderr}), success=False)
    except Exception as e:
        return _record_and_return(jsonify({'success': False, 'error': str(e)}), success=False)
# ============================================================================
# Discovery API Routes
# ============================================================================
# Temporary storage for custom annotations during discovery sessions.
# Maps session_id -> {'file_path': ..., 'gene_count': ..., 'created_at': ...}
# NOTE(review): entries are never removed within this view of the file —
# confirm a cleanup path exists elsewhere, otherwise this grows for the
# lifetime of the process (and the temp files linger on disk).
_discovery_annotations = {}
@app.route('/api/discovery/annotations/<genome>')
def api_discovery_annotations(genome):
    """Get unique annotation terms for dropdown in Discovery page.

    Extracts keyword terms from the genome's gene annotation descriptions
    and returns the 500 most frequent, plus gene/annotation counts.
    """
    import re  # hoisted out of the per-gene loop (was re-imported every iteration)

    annotations = load_genome_annotations(genome)
    if not annotations:
        return jsonify({'success': False, 'error': f'No annotations found for {genome}', 'terms': []})
    # Count genes with actual annotations (non-empty descriptions)
    annotated_genes = sum(1 for desc in annotations.values() if desc and desc.strip())
    # Try to get total gene count from BED file
    total_genes_in_genome = len(annotations)  # Default to annotations count
    bed_file = os.path.join(MCSCAN_RESULTS_DIR, 'bed_files', f'{genome}.bed')
    if os.path.exists(bed_file):
        try:
            with open(bed_file, 'r') as f:
                total_genes_in_genome = sum(1 for line in f if line.strip() and not line.startswith('#'))
        except OSError:
            # Best-effort: keep the annotations-based count if the BED file is unreadable
            pass
    # Words too generic to be useful as search terms
    skip_words = {
        'of', 'the', 'a', 'an', 'and', 'or', 'in', 'to', 'for', 'with', 'by', 'on', 'at', 'from',
        'is', 'are', 'was', 'were', 'be', 'been', 'being', 'have', 'has', 'had', 'do', 'does', 'did',
        'will', 'would', 'could', 'should', 'may', 'might', 'must', 'shall', 'can',
        'not', 'no', 'nor', 'but', 'so', 'if', 'when', 'where', 'how', 'what', 'which', 'who', 'whom',
        'this', 'that', 'these', 'those', 'it', 'its', 'as', 'than', 'such', 'like',
        'protein', 'gene', 'family', 'domain', 'related', 'similar', 'homolog', 'putative',
        'unnamed', 'unknown', 'hypothetical', 'uncharacterized', 'predicted',
        '-', '//', '/', '|', 'pf', 'sf', 'pthr', 'kog', 'subfamily', 'superfamily'
    }
    # Count meaningful keywords across all annotation descriptions
    term_counts = defaultdict(int)
    for description in annotations.values():
        if not description:
            continue
        # Strip a leading parenthesized prefix and database-style accessions
        clean_desc = re.sub(r'^\([^)]+\)\s*', '', description)
        clean_desc = re.sub(r'\b[A-Z]{2,}\d+(?::\w+)?\b', '', clean_desc)
        words = re.split(r'[\s\-_/\[\](),;:]+', clean_desc)
        for word in words:
            word = word.strip().lower()
            if len(word) >= 4 and not word.isdigit() and word not in skip_words:
                if word[0].isdigit():
                    continue
                term_counts[word] += 1
    # Most frequent first, ties broken alphabetically; cap at 500 terms
    sorted_terms = sorted(term_counts.items(), key=lambda x: (-x[1], x[0]))
    terms = [{'term': term.capitalize(), 'count': count} for term, count in sorted_terms[:500]]
    return jsonify({
        'success': True,
        'genome': genome,
        'total_genes': total_genes_in_genome,
        'annotated_genes': annotated_genes,
        'unique_terms': len(term_counts),
        'terms': terms
    })
@app.route('/api/discovery/upload-annotations', methods=['POST'])
def api_discovery_upload_annotations():
    """Upload custom annotations for discovery search.

    Accepts a TSV/CSV file (gene_id, description), validates its size and
    — when a genome is supplied — its gene-ID overlap with that genome's
    BED file (>= 20% must match). Stores the file under a fresh session id
    in _discovery_annotations and returns extracted keyword terms, full
    annotations and gene names for the Discovery UI tabs.
    """
    import re  # hoisted out of the per-gene loop (was re-imported every iteration)

    # Get genome ID from form data for validation
    genome_id = request.form.get('genome', '').strip()
    if 'file' not in request.files:
        return jsonify({'success': False, 'error': 'No file provided'})
    file = request.files['file']
    if file.filename == '':
        return jsonify({'success': False, 'error': 'No file selected'})
    # Validate file size before saving
    file.seek(0, 2)  # Seek to end
    file_size = file.tell()
    file.seek(0)  # Reset to beginning
    if file_size > MAX_ANNOTATION_SIZE:
        max_mb = MAX_ANNOTATION_SIZE // (1024 * 1024)
        return jsonify({'success': False, 'error': f'Annotation file exceeds {max_mb} MB limit (uploaded: {file_size // (1024*1024)} MB)'})
    # Save to temp location
    session_id = str(uuid.uuid4())
    temp_file = os.path.join(tempfile.gettempdir(), f'discovery_annotations_{session_id}.tsv')
    file.save(temp_file)
    # Parse and validate - auto-detect delimiter
    annotations = {}
    try:
        with open(temp_file, 'r') as f:
            # Read first line to detect delimiter
            first_line = f.readline()
            f.seek(0)  # Reset to beginning
            # Auto-detect delimiter: prefer tab, then comma
            if '\t' in first_line:
                delimiter = '\t'
            elif ',' in first_line:
                delimiter = ','
            else:
                delimiter = '\t'  # Default to tab
            reader = csv.reader(f, delimiter=delimiter)
            for row in reader:
                if len(row) >= 2:
                    gene_id = row[0].strip()
                    description = row[1].strip() if len(row) > 1 else ''
                    # Skip header rows (check if first column looks like a header)
                    if gene_id and gene_id.lower() not in ['gene', 'gene_id', 'geneid', 'id', 'name']:
                        annotations[gene_id] = description
    except Exception as e:
        os.remove(temp_file)
        return jsonify({'success': False, 'error': f'Failed to parse file: {str(e)}'})
    if not annotations:
        os.remove(temp_file)
        return jsonify({'success': False, 'error': 'No valid annotations found in file'})
    # Validate gene overlap with selected genome (if genome provided)
    matched_genes = 0
    total_genome_genes = 0
    if genome_id:
        bed_file = os.path.join(MCSCAN_RESULTS_DIR, 'bed_files', f'{genome_id}.bed')
        if os.path.exists(bed_file):
            genome_genes = set()
            try:
                with open(bed_file, 'r') as f:
                    for line in f:
                        if line.strip() and not line.startswith('#'):
                            parts = line.strip().split('\t')
                            if len(parts) >= 4:
                                genome_genes.add(parts[3])  # Gene ID is column 4
                total_genome_genes = len(genome_genes)
                matched_genes = len(set(annotations.keys()) & genome_genes)
                match_percentage = (matched_genes / len(annotations) * 100) if annotations else 0
                # Require at least 20% of uploaded genes to match the genome
                if match_percentage < 20:
                    os.remove(temp_file)
                    logger.warning(f"Annotation upload rejected: Only {matched_genes}/{len(annotations)} genes ({match_percentage:.1f}%) matched {genome_id}")
                    return jsonify({
                        'success': False,
                        'error': f'Annotation file does not match the selected genome. '
                                 f'Only {matched_genes} of {len(annotations)} genes '
                                 f'({match_percentage:.1f}%) were found in {genome_id}. '
                                 f'At least 20% must match.'
                    })
            except Exception as e:
                # Best-effort: validation is skipped if the BED file cannot be read
                logger.error(f"Error reading BED file for validation: {e}")
    # Count genes with actual annotations (non-empty descriptions)
    annotated_genes = sum(1 for desc in annotations.values() if desc and desc.strip())
    # Store the annotation file path for this session
    _discovery_annotations[session_id] = {
        'file_path': temp_file,
        'gene_count': len(annotations),
        'created_at': datetime.now().isoformat()
    }
    # Extract keyword terms (same logic as database genomes)
    skip_words = {
        'of', 'the', 'a', 'an', 'and', 'or', 'in', 'to', 'for', 'with', 'by', 'on', 'at', 'from',
        'is', 'are', 'was', 'were', 'be', 'been', 'being', 'have', 'has', 'had', 'do', 'does', 'did',
        'will', 'would', 'could', 'should', 'may', 'might', 'must', 'shall', 'can',
        'not', 'no', 'nor', 'but', 'so', 'if', 'when', 'where', 'how', 'what', 'which', 'who', 'whom',
        'this', 'that', 'these', 'those', 'it', 'its', 'as', 'than', 'such', 'like',
        'protein', 'gene', 'family', 'domain', 'related', 'similar', 'homolog', 'putative',
        'unnamed', 'unknown', 'hypothetical', 'uncharacterized', 'predicted',
        '-', '//', '/', '|', 'pf', 'sf', 'pthr', 'kog', 'subfamily', 'superfamily'
    }
    term_counts = defaultdict(int)
    for description in annotations.values():
        if not description:
            continue
        # Strip a leading parenthesized prefix and database-style accessions
        clean_desc = re.sub(r'^\([^)]+\)\s*', '', description)
        clean_desc = re.sub(r'\b[A-Z]{2,}\d+(?::\w+)?\b', '', clean_desc)
        words = re.split(r'[\s\-_/\[\](),;:]+', clean_desc)
        for word in words:
            word = word.strip().lower()
            if len(word) >= 4 and not word.isdigit() and word not in skip_words:
                if word[0].isdigit():
                    continue
                term_counts[word] += 1
    sorted_terms = sorted(term_counts.items(), key=lambda x: (-x[1], x[0]))
    terms = [{'term': term.capitalize(), 'count': count} for term, count in sorted_terms[:500]]
    # Also extract full annotations for Annotation tab
    annotation_counts = defaultdict(int)
    for description in annotations.values():
        if description and description.strip():
            display_desc = description[:200] + '...' if len(description) > 200 else description
            annotation_counts[display_desc] += 1
    sorted_annotations = sorted(annotation_counts.items(), key=lambda x: -x[1])
    full_annotations = [{'annotation': ann, 'count': count} for ann, count in sorted_annotations[:1000]]
    # Also get gene names for Paralogous tab (no limit - users need access to all genes)
    gene_names = [{'gene': gene_id, 'annotation': annotations.get(gene_id, '')[:100]}
                  for gene_id in sorted(annotations.keys())]
    return jsonify({
        'success': True,
        'session_id': session_id,
        'gene_count': len(annotations),
        'annotated_genes': annotated_genes,
        'matched_genes': matched_genes,
        'total_genome_genes': total_genome_genes,
        'unique_terms': len(term_counts),
        'terms': terms,
        'annotations': full_annotations,
        'genes': gene_names
    })
@app.route('/api/discovery/search', methods=['POST'])
def api_discovery_search():
    """Run discovery search to find syntenic blocks with specific annotations.

    Accepts three term formats (newest 'groups' with minMatch, legacy
    'required_groups' list-of-lists, legacy flat 'required_terms'), all
    normalized to the groups-with-minMatch shape before being passed to
    discovery_search.py as JSON.
    """
    start_time = time.time()
    payload = request.json
    query_genome = payload.get('query_genome')
    comparisons = payload.get('comparisons', [])
    groups = payload.get('groups', [])  # Newest format: [{terms: [...], minMatch: N}, ...]
    required_groups = payload.get('required_groups', [])  # Legacy: list of lists of terms
    required_terms = payload.get('required_terms', [])  # Legacy: flat list of terms
    optional_terms = payload.get('optional_terms', [])  # Optional terms to include in results
    match_mode = payload.get('match_mode', 'all')  # 'all' (must match all genomes) or 'any'
    search_type = payload.get('search_type', 'term')  # 'term', 'annotation', or 'gene'
    annotation_session_id = payload.get('annotation_session_id')  # For custom annotations

    def _finish(response, success=True):
        # Record an analytics event (when available) before returning
        if ANALYTICS_AVAILABLE:
            elapsed_ms = int((time.time() - start_time) * 1000)
            record_event(
                feature_type=FEATURE_DISCOVERY,
                query_genome=query_genome,
                status='success' if success else 'failure',
                duration_ms=elapsed_ms,
                request=request
            )
        return response

    if not query_genome:
        return _finish(jsonify({'success': False, 'error': 'Query genome is required'}), success=False)
    if not comparisons:
        return _finish(jsonify({'success': False, 'error': 'At least one comparison genome is required'}), success=False)
    # Normalize the accepted input formats into required_groups
    if not required_groups:
        if groups:
            required_groups = groups  # Newest format already carries minMatch
        elif required_terms:
            # Each flat term becomes its own single-term group
            required_groups = [{'terms': [term], 'minMatch': 1} for term in required_terms]
    if required_groups and isinstance(required_groups[0], list):
        # Old [[...]] format: wrap each group with a default minMatch of 1
        required_groups = [{'terms': group, 'minMatch': 1} for group in required_groups]
    if not required_groups:
        return _finish(jsonify({'success': False, 'error': 'At least one search term is required'}), success=False)
    script_path = os.path.join(SCRIPTS_DIR, 'discovery_search.py')
    if not os.path.exists(script_path):
        return _finish(jsonify({'success': False, 'error': 'Discovery search script not found'}), success=False)
    try:
        # Build command - using PYTHON_BIN for HuggingFace Spaces
        command = [PYTHON_BIN, script_path, '--query', query_genome, '--comparisons']
        command += comparisons
        command += ['--search-type', search_type, '--match-mode', match_mode, '--format', 'json']
        # Term groups and optional terms travel as JSON strings
        if required_groups:
            command += ['--required-groups', json.dumps(required_groups)]
        if optional_terms:
            command += ['--optional-terms', json.dumps(optional_terms)]
        # Attach a previously uploaded custom annotation file if the session is known
        if annotation_session_id and annotation_session_id in _discovery_annotations:
            annotation_file = _discovery_annotations[annotation_session_id]['file_path']
            if os.path.exists(annotation_file):
                command += ['--annotation-file', annotation_file]
        proc = subprocess.run(command, capture_output=True, text=True)
        if proc.returncode != 0:
            return _finish(jsonify({'success': False, 'error': proc.stderr or 'Search failed', 'stdout': proc.stdout}), success=False)
        try:
            results = json.loads(proc.stdout.strip())
        except json.JSONDecodeError as e:
            return _finish(jsonify({'success': False, 'error': f'Failed to parse results: {str(e)}', 'stdout': proc.stdout, 'stderr': proc.stderr}), success=False)
        return _finish(jsonify({'success': True, 'data': results}), success=True)
    except Exception as e:
        return _finish(jsonify({'success': False, 'error': str(e)}), success=False)
@app.route('/api/discovery/check-genome-annotations/<genome>')
def api_discovery_check_genome_annotations(genome):
    """Check if a genome has annotations available.

    Reports whether database annotations exist on disk, whether the genome
    is custom (not in AVAILABLE_GENOMES), and whether the Discovery page
    should require a user-supplied annotation upload.
    """
    # Database annotations live at <ANNOTATIONS_DIR>/<genome>/gene_annotation.tsv
    annotation_path = os.path.join(ANNOTATIONS_DIR, genome, 'gene_annotation.tsv')
    db_annotations_present = os.path.exists(annotation_path)
    total_annotated = len(load_genome_annotations(genome)) if db_annotations_present else 0
    # A genome absent from the built-in list is treated as custom
    custom_genome = genome not in AVAILABLE_GENOMES
    return jsonify({
        'success': True,
        'genome': genome,
        'has_database_annotations': db_annotations_present,
        'is_custom_genome': custom_genome,
        'gene_count': total_annotated,
        'requires_custom_annotations': custom_genome and not db_annotations_present
    })
@app.route('/api/discovery/full-annotations/<genome>')
def api_discovery_full_annotations(genome):
    """Get full annotation descriptions for Annotation Search tab.

    Groups identical (display-truncated) descriptions, counts occurrences,
    and returns the 1000 most common.
    """
    annotations = load_genome_annotations(genome)
    if not annotations:
        return jsonify({'success': False, 'error': f'No annotations found for {genome}', 'annotations': []})
    # Group by unique annotation descriptions and count occurrences
    # (defaultdict replaces the manual "if key not in dict" initialization)
    annotation_counts = defaultdict(int)
    for description in annotations.values():
        if description and description.strip():
            # Truncate very long annotations for display
            display_desc = description[:200] + '...' if len(description) > 200 else description
            annotation_counts[display_desc] += 1
    # Sort by count (most common first); cap at 1000 entries
    sorted_annotations = sorted(annotation_counts.items(), key=lambda x: -x[1])
    result = [{'annotation': ann, 'count': count} for ann, count in sorted_annotations[:1000]]
    return jsonify({
        'success': True,
        'genome': genome,
        'total_annotations': len(annotation_counts),
        'annotations': result
    })
@app.route('/api/discovery/gene-names/<genome>')
def api_discovery_gene_names(genome):
    """Get gene names for Paralogous Search tab.

    Reads gene IDs from the genome's BED file and pairs each with its
    annotation description (truncated to 100 characters).
    """
    bed_path = os.path.join(MCSCAN_RESULTS_DIR, 'bed_files', f'{genome}.bed')
    gene_entries = []
    if os.path.exists(bed_path):
        try:
            # Annotations are shown alongside gene names in the UI
            annotations = load_genome_annotations(genome)
            with open(bed_path, 'r') as handle:
                for raw_line in handle:
                    # Skip blank lines and comment lines
                    if not raw_line.strip() or raw_line.startswith('#'):
                        continue
                    columns = raw_line.strip().split('\t')
                    if len(columns) < 4:
                        continue
                    gene_id = columns[3]  # Gene ID is in 4th column
                    description = annotations.get(gene_id, '')
                    gene_entries.append({
                        'gene': gene_id,
                        'annotation': description[:100] if description else ''
                    })
        except Exception as e:
            logger.error(f"Error loading BED file for {genome}: {e}")
    if not gene_entries:
        return jsonify({'success': False, 'error': f'No genes found for {genome}', 'genes': []})
    return jsonify({
        'success': True,
        'genome': genome,
        'total_genes': len(gene_entries),
        'genes': gene_entries
    })
# ============================================================================
# Custom Genome API Routes
# ============================================================================
def allowed_file(filename):
    """Check if file has allowed extension (per ALLOWED_EXTENSIONS)."""
    if '.' not in filename:
        return False
    extension = filename.rsplit('.', 1)[1].lower()
    return extension in ALLOWED_EXTENSIONS
def generate_private_run_key(project_name: str) -> str:
    """Generate a private run key: [project_name]_[5 random chars with at least one symbol]

    This makes private keys more secure by adding randomness with special characters,
    so the project name alone is not enough to access the results.

    Uses the ``secrets`` module (CSPRNG) rather than ``random`` because the
    suffix acts as an access credential for private results.
    """
    import secrets  # local import: only needed for this security-sensitive helper

    # Symbols allowed in the suffix. NOTE(review): '#' is not actually URL-safe
    # (it starts a fragment); the set is kept unchanged for compatibility with
    # existing keys — confirm before altering.
    symbols = '!@#$'
    alphanumeric = string.ascii_letters + string.digits
    # Generate 5 characters: exactly one position is guaranteed to be a symbol,
    # the rest are alphanumeric
    symbol_pos = secrets.randbelow(5)
    suffix = ''.join(
        secrets.choice(symbols) if i == symbol_pos else secrets.choice(alphanumeric)
        for i in range(5)
    )
    return f"{project_name}_{suffix}"
def run_mcscan_background(run_key, gff3_path, pep_path, genomes, display_name=None, bed_path=None, visibility='public',
                          cscore=None, min_anchor=None, gap_length=None):
    """Run MCscan processing in background thread with queue management

    Args:
        run_key: Unique run identifier
        gff3_path: Path to GFF3 file (None for sequences-based uploads)
        pep_path: Path to protein sequences file
        genomes: List of comparison genomes
        display_name: User-friendly genome name
        bed_path: Path to BED file (for sequences-based uploads)
        visibility: 'public' or 'private' - controls listing visibility
        cscore: C-score threshold (default: 0.99)
        min_anchor: Minimum number of gene anchors (default: 4)
        gap_length: Maximum gap length between genes (default: 20)

    Side effects:
        Writes job_status.json under CUSTOM_META_DIR/<run_key>, streams the
        worker subprocess output to process.log, always releases the job
        slot and deletes the run's temp directory in the finally block, and
        records an analytics event reflecting the final job outcome.
    """
    meta_dir = os.path.join(CUSTOM_META_DIR, run_key)
    status_file = os.path.join(meta_dir, 'job_status.json')

    def update_status(status, progress, message, **extra):
        """Helper to update job status file"""
        os.makedirs(meta_dir, exist_ok=True)
        data = {
            'status': status,
            'progress': progress,
            'message': message,
            'run_key': run_key,
            'last_updated': datetime.now().isoformat()
        }
        data.update(extra)
        with open(status_file, 'w') as f:
            json.dump(data, f)

    try:
        # Wait for job slot (queue system)
        queue_pos = get_queue_position(run_key)
        if queue_pos > 0:
            update_status('queued', 0, f'Waiting in queue (position {queue_pos})...', queue_position=queue_pos)
        # Block until we get a slot (only 1 MCscan job at a time)
        acquire_job_slot(run_key)
        update_status('running', 5, 'Starting MCscan analysis...')
        script_path = os.path.join(SCRIPTS_DIR, 'process_custom_genome.py')
        genomes_str = ','.join(genomes)
        # Using PYTHON_BIN for HuggingFace Spaces
        cmd = [
            PYTHON_BIN, script_path,
            '--run-key', run_key,
            '--pep', pep_path,
            '--genomes', genomes_str,
            '--meta-dir', meta_dir,
            '--visibility', visibility
        ]
        # Add input source (either GFF3 or BED)
        if bed_path and os.path.exists(bed_path):
            # Sequences-based upload: use BED directly
            cmd.extend(['--bed', bed_path])
        elif gff3_path and os.path.exists(gff3_path):
            # GFF3-based upload
            cmd.extend(['--gff3', gff3_path])
        else:
            raise ValueError("Neither GFF3 nor BED input file found")
        # Add display name if provided
        if display_name:
            cmd.extend(['--display-name', display_name])
        # Add MCscan parameters if provided (user overrides)
        if cscore is not None:
            cmd.extend(['--cscore', str(cscore)])
        if min_anchor is not None:
            cmd.extend(['--min-anchor', str(min_anchor)])
        if gap_length is not None:
            cmd.extend(['--gap-length', str(gap_length)])
        # Run in background, output to log file in metadata directory
        os.makedirs(meta_dir, exist_ok=True)
        log_file = os.path.join(meta_dir, 'process.log')
        with open(log_file, 'w') as log:
            try:
                # Run with timeout (5 hours max)
                subprocess.run(cmd, stdout=log, stderr=subprocess.STDOUT, cwd=SCRIPTS_DIR,
                               timeout=MCSCAN_JOB_TIMEOUT_SECONDS)
            except subprocess.TimeoutExpired:
                update_status('failed', 0, f'Job timed out after {MCSCAN_JOB_TIMEOUT_SECONDS // 3600} hours. The analysis was too complex.')
                # The finally block below still runs: slot release + cleanup.
                return
    except Exception as e:
        # Update status file with error
        update_status('failed', 0, f'Processing error: {str(e)}')
    finally:
        # Always release the job slot
        release_job_slot(run_key)
        # Clean up temp directory - files are no longer needed after processing
        temp_dir = os.path.join(CUSTOM_TEMP_DIR, run_key)
        if os.path.exists(temp_dir):
            try:
                shutil.rmtree(temp_dir)
                logger.info(f"Cleanup: Deleted temp directory for {run_key}")
            except Exception as e:
                logger.warning(f"Cleanup: Failed to delete temp directory {run_key}: {e}")
        # Record analytics based on actual job outcome.
        # NOTE(review): the 'completed' status is presumably written by the
        # worker script (process_custom_genome.py), not this thread -- confirm.
        if ANALYTICS_AVAILABLE:
            try:
                final_status = 'failure'  # Default to failure
                if os.path.exists(status_file):
                    with open(status_file, 'r') as f:
                        status_data = json.load(f)
                    if status_data.get('status') == 'completed':
                        final_status = 'success'
                record_event(
                    feature_type=FEATURE_CUSTOM_GENOME,
                    query_genome=display_name or run_key,
                    status=final_status,
                    duration_ms=0,  # Duration not tracked for background jobs
                    extra_data={'genomes_count': len(genomes)}
                )
            except Exception as e:
                logger.warning(f"Analytics: Failed to record event for {run_key}: {e}")
@app.route('/api/custom/upload', methods=['POST'])
@limiter.limit("5 per hour")
def api_custom_upload():
    """Upload and validate custom genome files (GFF3 + protein FASTA).

    Multipart form fields:
        gff3, pep    -- file uploads (required)
        run_name     -- required; sanitized into the run key
        display_name -- optional user-friendly genome name
        visibility   -- 'public' (default) or 'private'

    Returns JSON with the generated run_key plus basic file statistics,
    or {'success': False, 'error': ...} on any validation failure.

    Fixes vs. previous revision: filename/extension checks now run BEFORE
    the temp directory is created (no stray empty run dirs on rejection),
    and two unused secure_filename() locals were removed.
    """
    try:
        # Check if files are present
        if 'gff3' not in request.files or 'pep' not in request.files:
            return jsonify({'success': False, 'error': 'Both GFF3 and PEP files are required'})
        gff3_file = request.files['gff3']
        pep_file = request.files['pep']
        run_name = request.form.get('run_name', '').strip()
        display_name = request.form.get('display_name', '').strip()  # User-friendly genome name
        visibility = request.form.get('visibility', 'public').strip()  # 'public' or 'private'
        # Validate visibility
        if visibility not in ('public', 'private'):
            visibility = 'public'
        # Run name is now required
        if not run_name:
            return jsonify({'success': False, 'error': 'Run Name is required'})
        # Validate name lengths
        valid, error = validate_name_length(run_name, 'Run Name')
        if not valid:
            return jsonify({'success': False, 'error': error})
        valid, error = validate_name_length(display_name, 'Display Name')
        if not valid:
            return jsonify({'success': False, 'error': error})
        # Validate file names and extensions before any directory is created,
        # so early rejections don't leave empty temp directories behind.
        if gff3_file.filename == '' or pep_file.filename == '':
            return jsonify({'success': False, 'error': 'No files selected'})
        if not allowed_file(gff3_file.filename):
            return jsonify({'success': False, 'error': f'Invalid GFF3 file extension. Allowed: {", ".join(ALLOWED_EXTENSIONS)}'})
        if not allowed_file(pep_file.filename):
            return jsonify({'success': False, 'error': f'Invalid PEP file extension. Allowed: {", ".join(ALLOWED_EXTENSIONS)}'})
        # Sanitize run name to create base key
        base_key = secure_filename(run_name.replace(' ', '_').lower())
        if not base_key:
            return jsonify({'success': False, 'error': 'Run Name contains invalid characters'})
        # For private runs, always generate a unique key with random suffix including symbol
        if visibility == 'private':
            run_key = generate_private_run_key(base_key)
        else:
            run_key = base_key
        # Check if run key already exists (check both temp and meta dirs)
        temp_dir = os.path.join(CUSTOM_TEMP_DIR, run_key)
        meta_dir = os.path.join(CUSTOM_META_DIR, run_key)
        if os.path.exists(temp_dir) or os.path.exists(meta_dir):
            # Append random suffix to avoid clobbering the existing run
            run_key = run_key + '_' + uuid.uuid4().hex[:4]
            temp_dir = os.path.join(CUSTOM_TEMP_DIR, run_key)
        os.makedirs(temp_dir, exist_ok=True)
        # Save uploads under fixed names; downstream steps expect these paths
        gff3_path = os.path.join(temp_dir, 'input.gff3')
        gff3_file.save(gff3_path)
        pep_path = os.path.join(temp_dir, 'input.pep')
        pep_file.save(pep_path)
        # Basic validation
        validation_errors = []
        # Check GFF3 has content and size
        gff3_size = os.path.getsize(gff3_path)
        if gff3_size == 0:
            validation_errors.append('GFF3 file is empty')
        elif gff3_size > MAX_GFF3_SIZE:
            max_mb = MAX_GFF3_SIZE // (1024 * 1024)
            validation_errors.append(f'GFF3 file exceeds {max_mb} MB limit (uploaded: {gff3_size // (1024*1024)} MB)')
        # Check PEP has content, size, and FASTA format
        pep_size = os.path.getsize(pep_path)
        if pep_size == 0:
            validation_errors.append('PEP file is empty')
        elif pep_size > MAX_PEP_SIZE:
            max_mb = MAX_PEP_SIZE // (1024 * 1024)
            validation_errors.append(f'PEP file exceeds {max_mb} MB limit (uploaded: {pep_size // (1024*1024)} MB)')
        else:
            # Check if PEP looks like FASTA
            with open(pep_path, 'r') as f:
                first_line = f.readline().strip()
            if not first_line.startswith('>'):
                validation_errors.append('PEP file does not appear to be FASTA format (should start with >)')
        if validation_errors:
            # Cleanup on validation failure
            shutil.rmtree(temp_dir, ignore_errors=True)
            return jsonify({'success': False, 'error': '; '.join(validation_errors)})
        # Count genes in GFF3 and proteins in PEP (rough counts for user feedback)
        gene_count = 0
        with open(gff3_path, 'r') as f:
            for line in f:
                if not line.startswith('#') and '\tgene\t' in line:
                    gene_count += 1
        protein_count = 0
        with open(pep_path, 'r') as f:
            for line in f:
                if line.startswith('>'):
                    protein_count += 1
        return jsonify({
            'success': True,
            'run_key': run_key,
            'visibility': visibility,
            'gff3_size': gff3_size,
            'pep_size': pep_size,
            'gene_count': gene_count,
            'protein_count': protein_count,
            'message': f'Files uploaded successfully. Found ~{gene_count} genes and {protein_count} proteins.'
        })
    except Exception as e:
        return jsonify({'success': False, 'error': str(e)})
@app.route('/api/custom/upload-sequences', methods=['POST'])
@limiter.limit("5 per hour")
def api_custom_upload_sequences():
    """Upload BED content and protein sequences for custom genome

    Unlike /api/custom/upload, inputs arrive as pasted text form fields
    ('bed_content' and 'sequences'), not file uploads. Validates the BED
    table (6 tab-separated columns, +/- strand, >=4 genes, optional header
    row auto-detected), writes normalized input.bed / input.pep into the
    run's temp directory, and cross-checks that BED gene IDs appear among
    the protein FASTA headers (allowing gene.N isoform suffixes).
    """
    try:
        # Get BED content as text (not file upload)
        bed_content = request.form.get('bed_content', '').strip()
        sequences_text = request.form.get('sequences', '').strip()
        run_name = request.form.get('run_name', '').strip()
        display_name = request.form.get('display_name', '').strip()
        visibility = request.form.get('visibility', 'public').strip()  # 'public' or 'private'
        # Validate visibility
        if visibility not in ('public', 'private'):
            visibility = 'public'
        if not bed_content:
            return jsonify({'success': False, 'error': 'BED content is required'})
        # Validate BED content size.
        # NOTE(review): the limit is checked in UTF-8 bytes here, but the
        # success response below reports len(bed_content) (characters) as
        # 'bed_size' -- confirm which one clients rely on.
        bed_size = len(bed_content.encode('utf-8'))
        if bed_size > MAX_BED_SIZE:
            max_mb = MAX_BED_SIZE // (1024 * 1024)
            return jsonify({'success': False, 'error': f'BED content exceeds {max_mb} MB limit (uploaded: {bed_size // (1024*1024)} MB)'})
        if not sequences_text:
            return jsonify({'success': False, 'error': 'Protein sequences are required'})
        if not display_name:
            return jsonify({'success': False, 'error': 'Genome display name is required'})
        # Run name is now required
        if not run_name:
            return jsonify({'success': False, 'error': 'Run Name is required'})
        # Validate name lengths
        valid, error = validate_name_length(run_name, 'Run Name')
        if not valid:
            return jsonify({'success': False, 'error': error})
        valid, error = validate_name_length(display_name, 'Display Name')
        if not valid:
            return jsonify({'success': False, 'error': error})
        # Sanitize run name to create base key
        base_key = secure_filename(run_name.replace(' ', '_').lower())
        if not base_key:
            return jsonify({'success': False, 'error': 'Run Name contains invalid characters'})
        # For private runs, always generate a unique key with random suffix including symbol
        if visibility == 'private':
            run_key = generate_private_run_key(base_key)
        else:
            run_key = base_key
        # Check if run key already exists
        temp_dir = os.path.join(CUSTOM_TEMP_DIR, run_key)
        meta_dir = os.path.join(CUSTOM_META_DIR, run_key)
        if os.path.exists(temp_dir) or os.path.exists(meta_dir):
            run_key = run_key + '_' + uuid.uuid4().hex[:4]
            temp_dir = os.path.join(CUSTOM_TEMP_DIR, run_key)
        os.makedirs(temp_dir, exist_ok=True)
        # Parse BED content with header detection
        validation_errors = []
        gene_names = []
        bed_lines = []
        raw_lines = [line.strip() for line in bed_content.split('\n') if line.strip() and not line.startswith('#')]
        if len(raw_lines) == 0:
            validation_errors.append('BED content is empty')
        else:
            # Detect header row: if column 5 is not "0" or column 6 is not "+" or "-"
            first_parts = raw_lines[0].split('\t')
            skip_first = False
            if len(first_parts) >= 6:
                col5 = first_parts[4]
                col6 = first_parts[5].strip()
                if col5 != '0' or (col6 != '+' and col6 != '-'):
                    # This is a header row, skip it
                    skip_first = True
            data_lines = raw_lines[1:] if skip_first else raw_lines
            for line_idx, line in enumerate(data_lines):
                # Report 1-based line numbers relative to the pasted text
                # (offset by one more when a header row was skipped)
                orig_line_num = line_idx + (2 if skip_first else 1)
                parts = line.split('\t')
                if len(parts) < 6:
                    validation_errors.append(f'Line {orig_line_num}: BED must have 6 tab-separated columns (found {len(parts)})')
                    break
                # Validate strand
                strand = parts[5].strip()
                if strand not in ('+', '-'):
                    validation_errors.append(f'Line {orig_line_num}: Strand must be "+" or "-" (found "{strand}")')
                    break
                gene_names.append(parts[3])
                bed_lines.append(line)
        # Check minimum gene count (4 genes required)
        if len(gene_names) < 4:
            validation_errors.append(f'Minimum 4 genes required. Found {len(gene_names)} data rows.')
        if validation_errors:
            shutil.rmtree(temp_dir, ignore_errors=True)
            return jsonify({'success': False, 'error': '; '.join(validation_errors)})
        # Save validated BED content (without header)
        bed_path = os.path.join(temp_dir, 'input.bed')
        with open(bed_path, 'w') as f:
            for line in bed_lines:
                f.write(line + '\n')
        # Parse and validate protein sequences (FASTA format)
        pep_path = os.path.join(temp_dir, 'input.pep')
        protein_names = []
        try:
            with open(pep_path, 'w') as f:
                # Parse FASTA from text
                current_header = None
                current_seq = []
                for line in sequences_text.split('\n'):
                    line = line.strip()
                    if not line:
                        continue
                    if line.startswith('>'):
                        # Save previous sequence
                        if current_header and current_seq:
                            f.write(f'>{current_header}\n')
                            f.write(''.join(current_seq) + '\n')
                        # Parse new header
                        current_header = line[1:].split()[0]  # Take first word after >
                        protein_names.append(current_header)
                        current_seq = []
                    else:
                        current_seq.append(line)
                # Save last sequence
                if current_header and current_seq:
                    f.write(f'>{current_header}\n')
                    f.write(''.join(current_seq) + '\n')
        except Exception as e:
            shutil.rmtree(temp_dir, ignore_errors=True)
            return jsonify({'success': False, 'error': f'Error parsing protein sequences: {str(e)}'})
        if len(protein_names) == 0:
            shutil.rmtree(temp_dir, ignore_errors=True)
            return jsonify({'success': False, 'error': 'No valid protein sequences found. Check FASTA format.'})
        # Check if BED genes match protein sequences
        bed_genes_set = set(gene_names)
        protein_genes_set = set(protein_names)
        # Also check for isoform naming (gene.N pattern)
        for prot in protein_names:
            base_name = prot.rsplit('.', 1)[0] if '.' in prot else prot
            protein_genes_set.add(base_name)
        missing_in_proteins = bed_genes_set - protein_genes_set
        if len(missing_in_proteins) > len(gene_names) * 0.5:
            # More than 50% of genes are missing - warn user
            sample_missing = list(missing_in_proteins)[:5]
            shutil.rmtree(temp_dir, ignore_errors=True)
            return jsonify({
                'success': False,
                'error': f'Many BED genes not found in protein sequences ({len(missing_in_proteins)}/{len(gene_names)}). Sample: {", ".join(sample_missing)}'
            })
        return jsonify({
            'success': True,
            'run_key': run_key,
            'visibility': visibility,
            'bed_size': len(bed_content),
            'gene_count': len(gene_names),
            'protein_count': len(protein_names),
            'message': f'Sequences uploaded successfully. Found {len(gene_names)} genes and {len(protein_names)} proteins.'
        })
    except Exception as e:
        return jsonify({'success': False, 'error': str(e)})
@app.route('/api/custom/run-mcscan', methods=['POST'])
def api_custom_run_mcscan():
    """Start MCscan analysis for uploaded genome

    JSON body:
        run_key       -- identifier returned by an upload endpoint (required)
        genomes       -- comparison genomes, each must be in AVAILABLE_GENOMES (required)
        display_name  -- user-friendly genome name (required)
        visibility    -- 'public' or 'private'
        cscore / min_anchor / gap_length -- optional MCscan parameter overrides

    Enforces the per-IP run limit first (429 on refusal), validates inputs,
    writes an initial 'queued' job_status.json, then launches
    run_mcscan_background in a daemon thread and returns immediately.
    """
    start_time = time.time()
    data = request.json
    # Check IP-based run limit FIRST (before any other processing)
    client_ip = get_remote_address()
    allowed, limit_msg = check_ip_run_limit(client_ip)
    if not allowed:
        # Record rate limit failure
        if ANALYTICS_AVAILABLE:
            record_event(
                feature_type=FEATURE_CUSTOM_GENOME,
                query_genome=None,
                status='failure',
                duration_ms=int((time.time() - start_time) * 1000),
                request=request,
                extra_data={'error': 'rate_limit'}
            )
        return jsonify({'success': False, 'error': limit_msg}), 429
    run_key = data.get('run_key')
    genomes = data.get('genomes', [])
    display_name = data.get('display_name', '')  # User-friendly genome name
    visibility = data.get('visibility', 'public')  # 'public' or 'private'
    # MCscan parameters (optional user overrides)
    cscore = data.get('cscore')  # Default: 0.99 (set in shell script)
    min_anchor = data.get('min_anchor')  # Default: 4 (set in shell script)
    gap_length = data.get('gap_length')  # Default: 20 (set in shell script)
    # Validate numeric parameters if provided
    if cscore is not None:
        try:
            cscore = float(cscore)
            if not (0.0 <= cscore <= 1.0):
                return jsonify({'success': False, 'error': 'C-score must be between 0 and 1'})
        except (ValueError, TypeError):
            return jsonify({'success': False, 'error': 'Invalid C-score value'})
    if min_anchor is not None:
        try:
            min_anchor = int(min_anchor)
            if min_anchor < 1:
                return jsonify({'success': False, 'error': 'Minimum anchor count must be at least 1'})
        except (ValueError, TypeError):
            return jsonify({'success': False, 'error': 'Invalid minimum anchor count value'})
    if gap_length is not None:
        try:
            gap_length = int(gap_length)
            if gap_length < 1:
                return jsonify({'success': False, 'error': 'Gap length must be at least 1'})
        except (ValueError, TypeError):
            return jsonify({'success': False, 'error': 'Invalid gap length value'})
    if not run_key:
        return jsonify({'success': False, 'error': 'run_key is required'})
    if not display_name or not display_name.strip():
        return jsonify({'success': False, 'error': 'Genome Display Name is required'})
    if not genomes:
        return jsonify({'success': False, 'error': 'At least one comparison genome is required'})
    # Validate genomes
    for g in genomes:
        if g not in AVAILABLE_GENOMES:
            return jsonify({'success': False, 'error': f'Invalid genome: {g}'})
    # Check temp directory for uploaded files
    temp_dir = os.path.join(CUSTOM_TEMP_DIR, run_key)
    if not os.path.exists(temp_dir):
        return jsonify({'success': False, 'error': f'Run key not found: {run_key}'})
    # Detect upload type: GFF3-based or sequences-based (BED)
    gff3_path = os.path.join(temp_dir, 'input.gff3')
    bed_path = os.path.join(temp_dir, 'input.bed')
    pep_path = os.path.join(temp_dir, 'input.pep')
    is_sequences_based = os.path.exists(bed_path) and not os.path.exists(gff3_path)
    is_gff3_based = os.path.exists(gff3_path)
    if not os.path.exists(pep_path):
        return jsonify({'success': False, 'error': 'Protein sequences file not found. Please upload files first.'})
    if is_sequences_based:
        # Sequences-based upload: use BED file directly
        input_file = bed_path
    elif is_gff3_based:
        # GFF3-based upload: use GFF3 file
        input_file = gff3_path
    else:
        return jsonify({'success': False, 'error': 'Input files not found. Please upload files first.'})
    # NOTE(review): input_file is assigned above but never used afterward
    # (the thread receives gff3_path/bed_path directly) -- candidate for removal.
    # Create metadata directory for status tracking
    meta_dir = os.path.join(CUSTOM_META_DIR, run_key)
    os.makedirs(meta_dir, exist_ok=True)
    # Create initial status file in metadata directory
    status_file = os.path.join(meta_dir, 'job_status.json')
    with open(status_file, 'w') as f:
        json.dump({
            'status': 'queued',
            'progress': 0,
            'message': 'Job queued, waiting for available slot...',
            'run_key': run_key,
            'genomes': genomes,
            'visibility': visibility,
            'total_steps': len(genomes) + 3,
            'input_type': 'sequences' if is_sequences_based else 'gff3',
            'params': {
                'cscore': cscore,
                'min_anchor': min_anchor,
                'gap_length': gap_length
            }
        }, f)
    # Record this run against the IP (for daily limit tracking)
    record_ip_run(client_ip)
    # Start background processing
    thread = threading.Thread(
        target=run_mcscan_background,
        args=(run_key, gff3_path if is_gff3_based else None, pep_path, genomes, display_name),
        kwargs={
            'bed_path': bed_path if is_sequences_based else None,
            'visibility': visibility,
            'cscore': cscore,
            'min_anchor': min_anchor,
            'gap_length': gap_length
        }
    )
    thread.daemon = True
    thread.start()
    # Note: Analytics is now recorded in run_mcscan_background when job completes
    # to capture actual success/failure status
    # Estimate time based on number of genomes
    est_time = len(genomes) * 1  # ~1 minute per genome
    return jsonify({
        'success': True,
        'run_key': run_key,
        'genomes': genomes,
        'message': f'MCscan analysis queued against {len(genomes)} genome(s). Jobs run one at a time.',
        'estimated_minutes': est_time,
        'runs_remaining': MAX_RUNS_PER_IP - len(_ip_run_tracker['runs'].get(client_ip, []))
    })
@app.route('/api/custom/status/<run_key>')
def api_custom_status(run_key):
    """Report the current job status for a custom genome MCscan run."""
    meta_dir = os.path.join(CUSTOM_META_DIR, run_key)
    if not os.path.exists(meta_dir):
        return jsonify({'success': False, 'error': f'Run key not found: {run_key}'})
    status_file = os.path.join(meta_dir, 'job_status.json')
    if not os.path.exists(status_file):
        # Status file not written yet -- report unknown rather than failing
        return jsonify({
            'success': True,
            'data': {
                'status': 'unknown',
                'message': 'Job status not available'
            }
        })
    with open(status_file, 'r') as fh:
        status = json.load(fh)
    # Failed jobs carry internal error text; convert to a user-friendly message
    if status.get('status') == 'failed' and 'message' in status:
        status['message'] = extract_user_error_from_log(status['message'], run_key)
    # Attach queue position so the UI can show where the job is waiting
    position = get_queue_position(run_key)
    if position >= 0:
        status['queue_position'] = position
    return jsonify({'success': True, 'data': status})
@app.route('/api/queue/status')
def api_queue_status():
    """Report the MCscan job queue: running job, waiting jobs, and limits."""
    # Snapshot queue state under the lock, then build the response outside it
    with _job_queue_status['queue_lock']:
        active_job = _job_queue_status['current_job']
        positions = dict(_job_queue_status['queue_position'])
    pending = [key for key, pos in positions.items() if pos > 0]
    return jsonify({
        'success': True,
        'current_job': active_job,
        'jobs_in_queue': len(pending),
        'waiting_jobs': pending,
        'max_concurrent': 1,
        'max_runs_per_ip': MAX_RUNS_PER_IP,
        'run_limit_window_hours': RUN_LIMIT_WINDOW_HOURS
    })
@app.route('/api/custom/lookup/<run_key>')
def api_custom_lookup(run_key):
    """Lookup a custom genome run by its key (for sharing)."""
    meta_dir = os.path.join(CUSTOM_META_DIR, run_key)
    if not os.path.exists(meta_dir):
        return jsonify({'success': False, 'error': f'Run not found: {run_key}'})
    result = {
        'run_key': run_key,
        'exists': True
    }
    # Attach manifest and job status when present on disk (manifest first,
    # matching the key order the frontend expects)
    for result_key, filename in (('manifest', 'manifest.json'), ('status', 'job_status.json')):
        path = os.path.join(meta_dir, filename)
        if os.path.exists(path):
            with open(path, 'r') as fh:
                result[result_key] = json.load(fh)
    return jsonify({'success': True, 'data': result})
@app.route('/api/custom/genomes')
def api_custom_genomes():
    """List all available custom genomes (public runs only).

    Scans CUSTOM_META_DIR for run folders, merges in manifest and job-status
    metadata when readable, and returns only runs whose visibility resolves
    to 'public' (legacy runs without a visibility field default to public).
    The response is marked non-cacheable so the UI always sees fresh state.

    Fix vs. previous revision: the two bare `except:` clauses (which also
    swallowed SystemExit/KeyboardInterrupt and hid parse errors) are narrowed
    to `except Exception` with a warning log; dead debug code removed.
    """
    if not os.path.exists(CUSTOM_META_DIR):
        return jsonify({'success': True, 'genomes': []})
    genomes = []
    for run_key in os.listdir(CUSTOM_META_DIR):
        meta_dir = os.path.join(CUSTOM_META_DIR, run_key)
        if not os.path.isdir(meta_dir):
            continue
        genome_info = {'run_key': run_key}
        visibility = 'public'  # Default to public for legacy runs
        manifest_file = os.path.join(meta_dir, 'manifest.json')
        if os.path.exists(manifest_file):
            try:
                with open(manifest_file, 'r') as f:
                    manifest_data = json.load(f)
                genome_info['manifest'] = manifest_data
                visibility = manifest_data.get('visibility', 'public')
            except Exception as e:
                # Best-effort: an unreadable manifest just means less metadata
                logger.warning(f"Could not read manifest for {run_key}: {e}")
        # Check job_status.json for visibility if not in manifest (for runs still processing)
        status_file = os.path.join(meta_dir, 'job_status.json')
        if os.path.exists(status_file):
            try:
                with open(status_file, 'r') as f:
                    status_data = json.load(f)
                genome_info['status'] = status_data.get('status', 'unknown')
                genome_info['progress'] = status_data.get('progress', 0)
                # Use visibility from status if manifest doesn't have it
                if visibility == 'public' and 'visibility' in status_data:
                    visibility = status_data.get('visibility', 'public')
            except Exception as e:
                # Unreadable status file -> surface the run as status 'unknown'
                logger.warning(f"Could not read job status for {run_key}: {e}")
                genome_info['status'] = 'unknown'
        # Only include public runs in the listing
        if visibility == 'public':
            genomes.append(genome_info)
    # Sort by creation date (newest first)
    genomes.sort(key=lambda x: x.get('manifest', {}).get('created_at', ''), reverse=True)
    response = jsonify({'success': True, 'genomes': genomes})
    response.headers['Cache-Control'] = 'no-store, no-cache, must-revalidate, max-age=0'
    return response
@app.route('/api/custom/genomes/<run_key>', methods=['DELETE'])
def api_custom_delete(run_key):
    """Delete a custom genome run and its associated files in Mcscan_results."""
    meta_dir = os.path.join(CUSTOM_META_DIR, run_key)
    if not os.path.exists(meta_dir):
        return jsonify({'success': False, 'error': f'Run not found: {run_key}'})
    try:
        manifest_path = os.path.join(meta_dir, 'manifest.json')
        if os.path.exists(manifest_path):
            # The manifest lists the derived files living under Mcscan_results;
            # remove those first so no orphaned analysis files remain.
            with open(manifest_path, 'r') as fh:
                cleanup_mcscan_results_files(json.load(fh))
        # Then drop the run's metadata folder itself
        shutil.rmtree(meta_dir)
        return jsonify({'success': True, 'message': f'Deleted run: {run_key}'})
    except Exception as e:
        return jsonify({'success': False, 'error': str(e)})
@app.route('/api/custom/plot', methods=['POST'])
def api_custom_plot():
    """Generate microsynteny plot using a custom genome as query

    JSON body:
        run_key         -- custom genome run identifier (required)
        genes           -- query gene IDs (required, sanitized before use)
        comparisons     -- comparison genome names (required)
        colors / annotations / layout / genome_order -- optional plot options
        padding_config / max_genes_config -- per-genome asymmetric tweaks,
                           keyed by genome name ('custom_query' = the query)
        display_names   -- {genome: "Custom Name"} label overrides
        gene_labels / gene_label_size -- optional on-plot gene labels
        keep_lowconf_color -- color all syntenic matches, not only high-conf

    Invokes plot_user_genes_microsynteny_v2.py as a subprocess, then locates
    the newest matching timestamped output folder and returns its file names.
    """
    data = request.json
    run_key = data.get('run_key')
    genes = data.get('genes', [])
    comparisons = data.get('comparisons', [])
    colors = data.get('colors', [])
    annotations = data.get('annotations', [])
    layout = data.get('layout', None)
    genome_order = data.get('genome_order', None)
    # Advanced tweaking parameters (optional)
    padding_config = data.get('padding_config', {})  # {genome: {left: bp, right: bp}}
    max_genes_config = data.get('max_genes_config', {})  # {genome: {left: count, right: count}}
    display_names = data.get('display_names', {})  # {genome: "Custom Name"}
    # Gene labels parameters (optional)
    gene_labels = data.get('gene_labels', [])  # List of gene IDs to label on the plot
    gene_label_size = data.get('gene_label_size', 0)  # Font size for labels (0=disabled, 2-8 recommended)
    # Debug logging for tweaking parameters
    logger.debug(f"api_custom_plot received:")
    logger.debug(f"  padding_config: {padding_config}")
    logger.debug(f"  max_genes_config: {max_genes_config}")
    logger.debug(f"  display_names: {display_names}")
    logger.debug(f"  gene_labels: {gene_labels}")
    logger.debug(f"  gene_label_size: {gene_label_size}")
    if not run_key:
        return jsonify({'success': False, 'error': 'run_key is required'})
    if not genes:
        return jsonify({'success': False, 'error': 'At least one gene is required'})
    if not comparisons:
        return jsonify({'success': False, 'error': 'At least one comparison genome is required'})
    # Validate and sanitize gene IDs (security measure)
    validated_genes, error_msg = sanitize_gene_ids(genes)
    if error_msg:
        return jsonify({'success': False, 'error': error_msg})
    genes = validated_genes
    # Also validate gene_labels if provided
    if gene_labels:
        validated_labels, label_error = sanitize_gene_ids(gene_labels)
        if label_error:
            return jsonify({'success': False, 'error': f'Gene labels: {label_error}'})
        gene_labels = validated_labels
    # Files are in the main Mcscan_results folder - check there
    i1_blocks_dir = os.path.join(MCSCAN_RESULTS_DIR, 'i1_blocks')
    # Check that comparison results exist (try both name orderings)
    for comp in comparisons:
        blocks_file1 = os.path.join(i1_blocks_dir, f'{run_key}.{comp}.i1.blocks')
        blocks_file2 = os.path.join(i1_blocks_dir, f'{comp}.{run_key}.i1.blocks')
        if not os.path.exists(blocks_file1) and not os.path.exists(blocks_file2):
            return jsonify({'success': False, 'error': f'No MCscan results for comparison with {comp}'})
    # Get display name from manifest (user-provided genome name), fallback to run_key
    custom_display_name = run_key
    mcscan_params = None  # Will be read from manifest if available
    meta_dir = os.path.join(CUSTOM_META_DIR, run_key)
    manifest_file = os.path.join(meta_dir, 'manifest.json')
    if os.path.exists(manifest_file):
        try:
            with open(manifest_file, 'r') as f:
                manifest = json.load(f)
            if manifest.get('display_name'):
                custom_display_name = manifest['display_name']
            # Read MCscan parameters from manifest for Method row in CSV
            if manifest.get('mcscan_params'):
                mcscan_params = manifest['mcscan_params']
        except Exception as e:
            logger.warning(f"Could not read manifest: {e}")
    try:
        # Build command for the plotting script
        script_path = os.path.join(SCRIPTS_DIR, 'plot_user_genes_microsynteny_v2.py')
        if not os.path.exists(script_path):
            return jsonify({'success': False, 'error': f'Plotting script not found'})
        # Build args - use run_key as the query genome, using PYTHON_BIN for HuggingFace Spaces
        args = [PYTHON_BIN, script_path]
        if colors:
            args.extend(['--colors', ','.join(colors)])
        args.extend(['--query', run_key, '--genes'] + genes + ['--comparisons'] + comparisons)
        if annotations:
            # '|||' delimiter avoids clashes with commas/colons in annotation text
            annotations_str = '|||'.join(str(a) for a in annotations)
            args.extend(['--annotations', annotations_str])
        # Pass MCscan parameters for Method row in CSV (if available from manifest)
        if mcscan_params:
            args.extend(['--mcscan-params', json.dumps(mcscan_params)])
        if layout and isinstance(layout, list) and len(layout) > 0:
            layout_str = ','.join(map(str, layout))
            args.extend(['--layout', layout_str])
        if genome_order and isinstance(genome_order, list):
            order_str = ','.join(genome_order)
            args.extend(['--genome-order', order_str])
        # Add advanced tweaking parameters
        # For custom genomes, the query is referenced as 'custom_query' in the frontend
        # but the actual run_key is used in the backend
        query_ref = 'custom_query'  # Frontend reference for query genome
        # Padding configuration for query genome (asymmetric)
        if query_ref in padding_config and padding_config[query_ref]:
            query_pad = padding_config[query_ref]
            if 'left' in query_pad and query_pad['left'] is not None:
                args.extend(['--query-padding-left', str(int(query_pad['left']))])
            if 'right' in query_pad and query_pad['right'] is not None:
                args.extend(['--query-padding-right', str(int(query_pad['right']))])
        # Max genes configuration for query genome (asymmetric)
        if query_ref in max_genes_config and max_genes_config[query_ref]:
            query_genes = max_genes_config[query_ref]
            if 'left' in query_genes and query_genes['left'] is not None:
                args.extend(['--query-max-genes-left', str(int(query_genes['left']))])
            if 'right' in query_genes and query_genes['right'] is not None:
                args.extend(['--query-max-genes-right', str(int(query_genes['right']))])
        # Comparison genomes padding (format: genome:left:right|genome2:left:right)
        comp_padding_parts = []
        for comp in comparisons:
            if comp in padding_config and padding_config[comp]:
                comp_pad = padding_config[comp]
                left_val = int(comp_pad.get('left', 1500000))
                right_val = int(comp_pad.get('right', 1500000))
                comp_padding_parts.append(f"{comp}:{left_val}:{right_val}")
        if comp_padding_parts:
            comp_padding_str = '|'.join(comp_padding_parts)
            args.extend(['--comp-padding-config', comp_padding_str])
        # Comparison genomes max genes (format: genome:left:right|genome2:left:right)
        comp_genes_parts = []
        for comp in comparisons:
            if comp in max_genes_config and max_genes_config[comp]:
                comp_genes = max_genes_config[comp]
                left_val = int(comp_genes.get('left', 50))
                right_val = int(comp_genes.get('right', 50))
                comp_genes_parts.append(f"{comp}:{left_val}:{right_val}")
        if comp_genes_parts:
            comp_genes_str = '|'.join(comp_genes_parts)
            args.extend(['--comp-max-genes-config', comp_genes_str])
        # Custom display names (format: genome:CustomName|genome2:Name2)
        # For custom genomes: 'custom_query' in frontend maps to the actual run_key
        if display_names:
            display_parts = []
            for genome, name in display_names.items():
                if name and name.strip():
                    # Map 'custom_query' to actual run_key for the plotting script
                    actual_genome = run_key if genome == 'custom_query' else genome
                    # Escape special characters in display name
                    # ('|' and ':' are the config-string delimiters above)
                    safe_name = name.replace('|', '_').replace(':', '_')
                    display_parts.append(f"{actual_genome}:{safe_name}")
            if display_parts:
                display_str = '|'.join(display_parts)
                args.extend(['--display-names', display_str])
        # Gene labels (list of gene IDs to display labels for on the plot)
        if gene_labels and isinstance(gene_labels, list) and len(gene_labels) > 0:
            gene_labels_str = ','.join(str(g) for g in gene_labels if g)
            if gene_labels_str:
                args.extend(['--genelabels', gene_labels_str])
                # Add label size (default to 8 if labels are provided but size not specified)
                label_size = int(gene_label_size) if gene_label_size else 8
                if label_size > 0:
                    args.extend(['--genelabelsize', str(label_size)])
        # Keep low-confidence coloring option (optional - colors all syntenic matches)
        keep_lowconf_color = data.get('keep_lowconf_color', False)
        if keep_lowconf_color:
            args.extend(['--keep-lowconf-color'])
        # Debug: print final command
        logger.debug(f"api_custom_plot final command args: {' '.join(args)}")
        # Set environment - files are in standard Mcscan_results folder now
        env = os.environ.copy()
        env['PYTHON'] = PYTHON_BIN
        # Mark this as a custom genome for the script to handle appropriately
        env['CUSTOM_GENOME_KEY'] = run_key
        # Pass the display name for plot labels
        env['CUSTOM_DISPLAY_NAME'] = custom_display_name
        result = subprocess.run(
            args,
            capture_output=True,
            text=True,
            env=env,
            cwd=SCRIPTS_DIR
        )
        if result.returncode == 0:
            # Find the output folder
            # Priority 1: Check if custom_query has a display name in tweaking config
            # (note: 'custom_query' in frontend maps to run_key for the script)
            # Priority 2: Use custom_display_name from manifest
            folder_display_name = display_names.get('custom_query') if display_names and 'custom_query' in display_names else custom_display_name
            # Escape special characters to match what was passed to script
            folder_display_name = folder_display_name.replace('|', '_').replace(':', '_')
            comp_str = '_'.join(comparisons)
            pattern_prefix = f"{folder_display_name}_usergenes_{comp_str}_"
            logger.debug(f"Looking for custom genome output folder with prefix: {pattern_prefix}")
            output_folder = None
            if os.path.exists(OUTPUT_DIR):
                import re
                folders = []
                for f in os.listdir(OUTPUT_DIR):
                    if f.startswith(pattern_prefix):
                        remainder = f[len(pattern_prefix):]
                        # Folder names end with a YYYYMMDD_HHMMSS timestamp
                        if re.match(r'^\d{8}_\d{6}$', remainder):
                            folders.append(f)
                if folders:
                    # Lexicographic sort on the timestamp suffix = newest first
                    folders.sort(reverse=True)
                    output_folder = os.path.join(OUTPUT_DIR, folders[0])
            if output_folder and os.path.exists(output_folder):
                files = {}
                for fname in os.listdir(output_folder):
                    if fname == 'microsynteny_plot.png':
                        files['png'] = fname
                    elif fname == 'microsynteny_plot.svg':
                        files['svg'] = fname
                    elif fname.endswith('.csv'):
                        files['csv'] = fname
                return jsonify({
                    'success': True,
                    'message': 'Plot generated successfully',
                    'output_folder': os.path.basename(output_folder),
                    'files': files
                })
            else:
                # Log details for debugging (not shown to user)
                if app.debug:
                    logger.debug(f"Custom plot output folder not found. stdout: {result.stdout}")
                    logger.debug(f"stderr: {result.stderr}")
                return jsonify({
                    'success': False,
                    'error': 'Output folder not found after plot generation'
                })
        else:
            error_msg = 'Script failed'
            combined = result.stdout + '\n' + result.stderr
            if 'No syntenic matches' in combined:
                error_msg = 'No syntenic matches found for the specified genes'
            elif 'ERROR:' in combined:
                # Surface the first ERROR: line from the script output
                for line in combined.split('\n'):
                    if 'ERROR:' in line:
                        error_msg = line.split('ERROR:')[-1].strip()
                        break
            # Log details for debugging (not shown to user)
            if app.debug:
                logger.debug(f"Custom plot script failed. stdout: {result.stdout}")
                logger.debug(f"stderr: {result.stderr}")
            return jsonify({
                'success': False,
                'error': error_msg
            })
    except Exception as e:
        return jsonify({'success': False, 'error': str(e)})
@app.route('/api/custom/genes/<run_key>')
def api_custom_genes(run_key):
    """Get list of genes from a custom genome's BED file in main Mcscan_results folder.

    Returns JSON ``{success, genes, total}`` where each gene entry carries
    ``chr``, ``start``, ``end`` and ``gene_id`` taken from the first four
    BED columns.

    Robustness fix: previously a single malformed line (e.g. a ``track``/
    ``browser`` header or a non-integer coordinate) raised inside the loop
    and the broad ``except`` aborted the whole response. Malformed lines
    are now skipped so one bad line cannot hide every valid gene.
    """
    # BED file is now in the main bed_files folder
    bed_files_dir = os.path.join(MCSCAN_RESULTS_DIR, 'bed_files')
    bed_file = os.path.join(bed_files_dir, f'{run_key}.bed')
    if not os.path.exists(bed_file):
        return jsonify({'success': False, 'error': f'BED file not found for run: {run_key}'})
    genes = []
    try:
        with open(bed_file, 'r') as f:
            for line in f:
                line = line.strip()
                # Skip blanks and BED header/comment lines
                if not line or line.startswith(('#', 'track', 'browser')):
                    continue
                parts = line.split('\t')
                if len(parts) < 4:
                    continue
                try:
                    genes.append({
                        'chr': parts[0],
                        'start': int(parts[1]),
                        'end': int(parts[2]),
                        'gene_id': parts[3]
                    })
                except ValueError:
                    # Non-integer coordinates on one line: skip it rather
                    # than failing the entire listing.
                    continue
    except Exception as e:
        # File-level failures (permissions, encoding, ...) still surface
        # as an error response, matching the original contract.
        return jsonify({'success': False, 'error': str(e)})
    return jsonify({'success': True, 'genes': genes, 'total': len(genes)})
# ============================================================================
# Custom Synteny (Multi-Genome) API Routes
# ============================================================================
# Directory layout for custom-synteny projects: per-project metadata lives
# under custom_synteny_meta/, while generated BED/PEP files are shared in
# bed_files/ and pep_files/ inside the MCscan results tree.
CUSTOM_SYNTENY_META_DIR = os.path.join(MCSCAN_RESULTS_DIR, 'custom_synteny_meta')
for _required_dir in (
    CUSTOM_SYNTENY_META_DIR,
    os.path.join(MCSCAN_RESULTS_DIR, 'bed_files'),
    os.path.join(MCSCAN_RESULTS_DIR, 'pep_files'),
):
    os.makedirs(_required_dir, exist_ok=True)
@app.route('/api/custom-synteny/upload', methods=['POST'])
@limiter.limit("5 per hour")
def api_custom_synteny_upload():
    """Upload multiple genomes for custom synteny analysis.

    Expects a multipart/form-data POST with:
      * ``project_name`` (required) and optional ``visibility`` ('public'/'private')
      * ``run_key`` (optional) -- reused if supplied, otherwise derived from
        the project name (private runs get a random-suffix key)
      * ``genomes`` -- JSON list of genome metadata ({id, displayName, ...})
      * ``db_genomes`` / ``pairs`` -- JSON lists of built-in genomes and
        selected comparison pairs
      * ``gff3_<i>`` / ``pep_<i>`` -- the file uploads for genome index i

    Files are stored under CUSTOM_SYNTENY_META_DIR/<run_key>/genome_<id>/ and a
    project manifest.json is written. Requires at least 2 complete genome
    uploads; otherwise the whole project directory is rolled back.
    Returns JSON {success, run_key, genomes, genome_count, message} or
    {success: False, error}.
    """
    try:
        project_name = request.form.get('project_name', '').strip()
        if not project_name:
            return jsonify({'success': False, 'error': 'Project name is required'})
        # Validate project name length
        valid, error = validate_name_length(project_name, 'Project Name')
        if not valid:
            return jsonify({'success': False, 'error': error})
        visibility = request.form.get('visibility', 'public').strip()
        # Get or create run key
        run_key = request.form.get('run_key', '').strip()
        if not run_key:
            # Create new run key from project name
            base_key = secure_filename(project_name.replace(' ', '_').lower())
            if not base_key:
                return jsonify({'success': False, 'error': 'Project name contains invalid characters'})
            # For private runs, always generate a unique key with random suffix including symbol
            if visibility == 'private':
                run_key = generate_private_run_key(base_key)
            else:
                run_key = base_key
                # Check if exists, append suffix if needed
                meta_dir = os.path.join(CUSTOM_SYNTENY_META_DIR, run_key)
                if os.path.exists(meta_dir):
                    run_key = run_key + '_' + uuid.uuid4().hex[:4]
        # Parse genome metadata
        genomes_json = request.form.get('genomes', '[]')
        try:
            genomes_meta = json.loads(genomes_json)
        except json.JSONDecodeError:
            return jsonify({'success': False, 'error': 'Invalid genomes metadata'})
        # Parse DB genomes and comparison pairs
        db_genomes_json = request.form.get('db_genomes', '[]')
        pairs_json = request.form.get('pairs', '[]')
        try:
            db_genomes = json.loads(db_genomes_json)
            selected_pairs = json.loads(pairs_json)
        except json.JSONDecodeError:
            return jsonify({'success': False, 'error': 'Invalid JSON data'})
        # Create directories
        meta_dir = os.path.join(CUSTOM_SYNTENY_META_DIR, run_key)
        os.makedirs(meta_dir, exist_ok=True)
        # Process each genome: files are matched to metadata entries by
        # positional index (gff3_0/pep_0 for genomes_meta[0], and so on).
        uploaded_genomes = []
        for idx, genome_info in enumerate(genomes_meta):
            gff3_key = f'gff3_{idx}'
            pep_key = f'pep_{idx}'
            if gff3_key not in request.files or pep_key not in request.files:
                continue  # Skip if files not present
            gff3_file = request.files[gff3_key]
            pep_file = request.files[pep_key]
            if gff3_file.filename == '' or pep_file.filename == '':
                continue  # Skip empty files
            # Validate file extensions (whole request is rejected, not just
            # this genome, so the user can fix and resubmit cleanly)
            if not allowed_file(gff3_file.filename):
                return jsonify({'success': False, 'error': f'Invalid GFF3 file extension for genome {genome_info.get("displayName", idx)}. Allowed: {", ".join(ALLOWED_EXTENSIONS)}'})
            if not allowed_file(pep_file.filename):
                return jsonify({'success': False, 'error': f'Invalid PEP file extension for genome {genome_info.get("displayName", idx)}. Allowed: {", ".join(ALLOWED_EXTENSIONS)}'})
            genome_id = genome_info.get('id', idx)
            display_name = genome_info.get('displayName', f'Genome_{idx}')
            # Create genome directory
            genome_dir = os.path.join(meta_dir, f'genome_{genome_id}')
            os.makedirs(genome_dir, exist_ok=True)
            # Save files under fixed names so downstream steps can find them
            gff3_path = os.path.join(genome_dir, 'input.gff3')
            pep_path = os.path.join(genome_dir, 'input.pep')
            gff3_file.save(gff3_path)
            pep_file.save(pep_path)
            # Basic validation: empty upload rolls back this genome's dir
            gff3_size = os.path.getsize(gff3_path)
            pep_size = os.path.getsize(pep_path)
            if gff3_size == 0 or pep_size == 0:
                shutil.rmtree(genome_dir, ignore_errors=True)
                return jsonify({'success': False, 'error': f'Empty file for genome: {display_name}'})
            # Count genes/proteins (cheap sanity metrics surfaced to the UI)
            gene_count = 0
            with open(gff3_path, 'r') as f:
                for line in f:
                    if not line.startswith('#') and '\tgene\t' in line:
                        gene_count += 1
            protein_count = 0
            with open(pep_path, 'r') as f:
                for line in f:
                    if line.startswith('>'):
                        protein_count += 1
            # Create sanitized key from display name (used for MCscan file naming)
            genome_key = secure_filename(display_name.replace(' ', '_').lower())
            # Save genome metadata
            genome_meta = {
                'id': genome_id,
                'key': genome_key,
                'displayName': display_name,
                'visibility': visibility,
                'gff3_size': gff3_size,
                'pep_size': pep_size,
                'gene_count': gene_count,
                'protein_count': protein_count,
                'uploaded_at': datetime.now().isoformat()
            }
            with open(os.path.join(genome_dir, 'metadata.json'), 'w') as f:
                json.dump(genome_meta, f, indent=2)
            uploaded_genomes.append(genome_meta)
        if len(uploaded_genomes) < 2:
            # Synteny needs at least two genomes; remove the whole project
            shutil.rmtree(meta_dir, ignore_errors=True)
            return jsonify({'success': False, 'error': 'At least 2 genomes with files are required'})
        # Create project manifest (read later by run-mcscan and the worker)
        manifest = {
            'project_name': project_name,
            'run_key': run_key,
            'created_at': datetime.now().isoformat(),
            'visibility': visibility,
            'genomes': uploaded_genomes,
            'db_genomes': db_genomes,
            'selected_pairs': selected_pairs,
            'genome_count': len(uploaded_genomes),
            'comparison_count': len(selected_pairs)
        }
        manifest_path = os.path.join(meta_dir, 'manifest.json')
        with open(manifest_path, 'w') as f:
            json.dump(manifest, f, indent=2)
        return jsonify({
            'success': True,
            'run_key': run_key,
            'genomes': uploaded_genomes,
            'genome_count': len(uploaded_genomes),
            'message': f'Successfully uploaded {len(uploaded_genomes)} genomes'
        })
    except Exception as e:
        return jsonify({'success': False, 'error': str(e)})
@app.route('/api/custom-synteny/run-mcscan', methods=['POST'])
def api_custom_synteny_run_mcscan():
    """Start MCscan analysis for custom synteny project.

    JSON body: ``{run_key, cscore?, min_anchor?, gap_length?}``.

    Order of operations matters here: the per-IP daily run limit is checked
    before anything else (returning 429 when exhausted), then the project's
    manifest supplies the selected pairs and database genomes, an initial
    job_status.json is written, the run is recorded against the IP, and
    finally run_custom_synteny_background is launched in a daemon thread.
    Analytics for the job outcome is recorded by the background worker, not
    here (only rate-limit failures are recorded in this handler).
    """
    start_time = time.time()
    data = request.json
    # Check IP-based run limit FIRST (before any other processing)
    client_ip = get_remote_address()
    allowed, limit_msg = check_ip_run_limit(client_ip)
    if not allowed:
        # Record rate limit failure
        if ANALYTICS_AVAILABLE:
            record_event(
                feature_type=FEATURE_CUSTOM_SYNTENY,
                query_genome=None,
                status='failure',
                duration_ms=int((time.time() - start_time) * 1000),
                request=request,
                extra_data={'error': 'rate_limit'}
            )
        return jsonify({'success': False, 'error': limit_msg}), 429
    run_key = data.get('run_key')
    # MCscan parameters (None means "use the script's defaults")
    cscore = data.get('cscore')
    min_anchor = data.get('min_anchor')
    gap_length = data.get('gap_length')
    if not run_key:
        return jsonify({'success': False, 'error': 'run_key is required'})
    meta_dir = os.path.join(CUSTOM_SYNTENY_META_DIR, run_key)
    if not os.path.exists(meta_dir):
        return jsonify({'success': False, 'error': f'Run key not found: {run_key}'})
    # Load manifest to get pairs and genomes
    manifest_path = os.path.join(meta_dir, 'manifest.json')
    if not os.path.exists(manifest_path):
        return jsonify({'success': False, 'error': 'Manifest not found'})
    with open(manifest_path, 'r') as f:
        manifest = json.load(f)
    selected_pairs = manifest.get('selected_pairs', [])
    db_genomes = manifest.get('db_genomes', [])
    if not selected_pairs:
        return jsonify({'success': False, 'error': 'No comparison pairs found in manifest'})
    # Validate db_genomes against the server-side whitelist
    for g in db_genomes:
        if g not in AVAILABLE_GENOMES:
            return jsonify({'success': False, 'error': f'Invalid database genome: {g}'})
    # Estimate time based on pairs
    estimated_minutes = len(selected_pairs) * 2  # ~2 minutes per pair
    # Create status file (polled by /api/custom-synteny/status/<run_key>)
    status_file = os.path.join(meta_dir, 'job_status.json')
    with open(status_file, 'w') as f:
        json.dump({
            'status': 'queued',
            'progress': 0,
            'message': 'Job queued, waiting for available slot...',
            'run_key': run_key,
            'selected_pairs': selected_pairs,
            'db_genomes': db_genomes,
            'total_pairs': len(selected_pairs),
            'completed_pairs': 0,
            'params': {
                'cscore': cscore,
                'min_anchor': min_anchor,
                'gap_length': gap_length
            }
        }, f, indent=2)
    # Record this run against the IP (for daily limit tracking)
    record_ip_run(client_ip)
    # Start background processing
    thread = threading.Thread(
        target=run_custom_synteny_background,
        args=(run_key, selected_pairs, db_genomes),
        kwargs={
            'cscore': cscore,
            'min_anchor': min_anchor,
            'gap_length': gap_length
        }
    )
    thread.daemon = True
    thread.start()
    # Note: Analytics is now recorded in run_custom_synteny_background when job completes
    # to capture actual success/failure status
    return jsonify({
        'success': True,
        'run_key': run_key,
        'total_pairs': len(selected_pairs),
        'estimated_minutes': estimated_minutes,
        'message': f'MCscan analysis queued for {len(selected_pairs)} comparison pairs. Jobs run one at a time.',
        'runs_remaining': MAX_RUNS_PER_IP - len(_ip_run_tracker['runs'].get(client_ip, []))
    })
def run_custom_synteny_background(run_key, selected_pairs, db_genomes, cscore=None, min_anchor=None, gap_length=None):
    """Background task for custom synteny MCscan processing with queue management.

    Uses the existing process_custom_genome.sh script for each custom genome,
    which properly handles GFF3->BED conversion and places files in the correct
    Mcscan_results directories.

    Pipeline:
      1. Wait for the global single-job slot (acquire_job_slot blocks).
      2. Process each uploaded genome via process_custom_genome.py against
         the db genomes it participates in (skipped if its BED already exists).
      3. Run remaining pairwise comparisons (custom-vs-custom) via
         process_custom_synteny_pair.py.
      4. Verify BED/PEP outputs exist, mark the job complete, and delete the
         large input files to reclaim space.

    Progress is communicated solely through job_status.json (via the nested
    ``update_status`` helper); the finally-block always releases the job
    slot, cleans temp dirs, and records an analytics event based on the
    final status written to disk.

    Args:
        run_key: project key under CUSTOM_SYNTENY_META_DIR.
        selected_pairs: comparison pairs from the manifest; each pair is
            either a {'genome1', 'genome2'} dict or a 2-sequence.
        db_genomes: built-in genome keys involved in the comparisons.
        cscore / min_anchor / gap_length: optional MCscan tuning parameters,
            forwarded to the worker scripts when not None.
    """
    import traceback  # For detailed error logging
    meta_dir = os.path.join(CUSTOM_SYNTENY_META_DIR, run_key)
    status_file = os.path.join(meta_dir, 'job_status.json')
    manifest_path = os.path.join(meta_dir, 'manifest.json')

    def update_status(status, progress, message, **extra):
        # Overwrite job_status.json with the current state. 'last_updated'
        # lets the status endpoint detect stale/interrupted jobs.
        data = {
            'status': status,
            'progress': progress,
            'message': message,
            'run_key': run_key,
            'selected_pairs': selected_pairs,
            'db_genomes': db_genomes,
            'total_pairs': len(selected_pairs),
            'last_updated': datetime.now().isoformat()  # Track when status was last updated
        }
        data.update(extra)
        with open(status_file, 'w') as f:
            json.dump(data, f, indent=2)

    try:
        # Wait for job slot (queue system - only 1 MCscan at a time)
        queue_pos = get_queue_position(run_key)
        if queue_pos > 0:
            update_status('queued', 0, f'Waiting in queue (position {queue_pos})...', queue_position=queue_pos)
        # Block until we get a slot
        acquire_job_slot(run_key)
        update_status('running', 2, 'Starting custom synteny analysis...')
        # Load manifest
        with open(manifest_path, 'r') as f:
            manifest = json.load(f)
        genomes_list = manifest.get('genomes', [])
        total_genomes = len(genomes_list)
        genome_keys = {}  # Map genome_id -> key for MCscan
        # Add DB genomes to genome_keys and copy their BED files to bed_files directory
        for db_genome in db_genomes:
            genome_keys[db_genome] = db_genome
            # Copy BED file from Genomes/ to bed_files/ if not already there
            src_bed = os.path.join(SCRIPT_DIR, '..', 'Genomes', db_genome, f'{db_genome}.bed')
            dst_bed = os.path.join(MCSCAN_RESULTS_DIR, 'bed_files', f'{db_genome}.bed')
            if os.path.exists(src_bed) and not os.path.exists(dst_bed):
                shutil.copy2(src_bed, dst_bed)
        # Step 1: Process each custom genome using the existing process_custom_genome.sh script
        # This properly converts GFF3 to BED and sets up all necessary files
        for idx, genome_meta in enumerate(genomes_list):
            genome_id = genome_meta.get('id')
            genome_key = genome_meta.get('key')
            display_name = genome_meta.get('displayName', genome_key)
            genome_dir = os.path.join(meta_dir, f'genome_{genome_id}')
            gff3_path = os.path.join(genome_dir, 'input.gff3')
            pep_path = os.path.join(genome_dir, 'input.pep')
            if not os.path.exists(gff3_path) or not os.path.exists(pep_path):
                update_status('failed', 0, f'Input files not found for genome {display_name}')
                return
            # Register both spellings of the id so pair lookup works whatever
            # format the frontend sent ('3' or 'custom_3')
            genome_keys[str(genome_id)] = genome_key
            genome_keys[f'custom_{genome_id}'] = genome_key
            # Check if this genome has already been processed (BED file exists)
            bed_file = os.path.join(MCSCAN_RESULTS_DIR, 'bed_files', f'{genome_key}.bed')
            if os.path.exists(bed_file):
                update_status('running', int(5 + (idx + 1) * 20 / total_genomes),
                              f'Genome {display_name} already processed, skipping...')
                continue
            # Copy input files to temp directory (process_custom_genome.sh deletes its work dir)
            temp_dir = os.path.join(CUSTOM_TEMP_DIR, f'{run_key}_{genome_key}')
            os.makedirs(temp_dir, exist_ok=True)
            temp_gff3 = os.path.join(temp_dir, 'input.gff3')
            temp_pep = os.path.join(temp_dir, 'input.pep')
            shutil.copy2(gff3_path, temp_gff3)
            shutil.copy2(pep_path, temp_pep)
            # Use the existing process_custom_genome.py script
            # Find ALL db_genomes that this custom genome should be compared against
            # based on the selected_pairs
            comparison_genomes_for_this = []
            for pair in selected_pairs:
                if isinstance(pair, dict):
                    pair_genome1 = pair.get('genome1')
                    pair_genome2 = pair.get('genome2')
                else:
                    pair_genome1, pair_genome2 = pair
                # Check if this custom genome is involved in this pair
                if str(pair_genome1) == str(genome_id) or pair_genome1 == f'custom_{genome_id}' or pair_genome1 == genome_key:
                    # This custom genome is genome1, check if genome2 is a db_genome
                    if pair_genome2 in db_genomes:
                        if pair_genome2 not in comparison_genomes_for_this:
                            comparison_genomes_for_this.append(pair_genome2)
                elif str(pair_genome2) == str(genome_id) or pair_genome2 == f'custom_{genome_id}' or pair_genome2 == genome_key:
                    # This custom genome is genome2, check if genome1 is a db_genome
                    if pair_genome1 in db_genomes:
                        if pair_genome1 not in comparison_genomes_for_this:
                            comparison_genomes_for_this.append(pair_genome1)
            # Fallback: if no pairs found, use all db_genomes
            if not comparison_genomes_for_this:
                comparison_genomes_for_this = db_genomes if db_genomes else ['arabidopsis_thaliana']
            # Join all comparison genomes with comma for the script
            comparison_genomes_str = ','.join(comparison_genomes_for_this)
            # Log the comparisons we're running for this genome
            logger.info(f"CustomSynteny: Processing {display_name} ({genome_key}) against {len(comparison_genomes_for_this)} db genomes: {comparison_genomes_str}")
            # Update status with comparison count info
            update_status('running', int(5 + idx * 20 / total_genomes),
                          f'Processing genome {idx + 1}/{total_genomes}: {display_name} (running {len(comparison_genomes_for_this)} comparisons)...')
            script_path = os.path.join(SCRIPTS_DIR, 'process_custom_genome.py')
            # Using PYTHON_BIN for HuggingFace Spaces
            cmd = [
                PYTHON_BIN, script_path,
                '--run-key', genome_key,  # Use genome_key as run_key for file naming
                '--gff3', temp_gff3,
                '--pep', temp_pep,
                '--genomes', comparison_genomes_str,  # Pass ALL comparison genomes
                '--meta-dir', os.path.join(CUSTOM_META_DIR, genome_key),  # Use standard custom_meta location
                '--display-name', display_name,
                '--visibility', 'public'
            ]
            # Add MCscan parameters if provided
            if cscore is not None:
                cmd.extend(['--cscore', str(cscore)])
            if min_anchor is not None:
                cmd.extend(['--min-anchor', str(min_anchor)])
            if gap_length is not None:
                cmd.extend(['--gap-length', str(gap_length)])
            # Run the script with timeout; stdout+stderr captured to process.log
            log_file = os.path.join(genome_dir, 'process.log')
            with open(log_file, 'w') as log:
                try:
                    result = subprocess.run(cmd, stdout=log, stderr=subprocess.STDOUT, cwd=SCRIPTS_DIR,
                                            timeout=MCSCAN_JOB_TIMEOUT_SECONDS)
                except subprocess.TimeoutExpired:
                    update_status('failed', 0, f'Job timed out for {display_name} after {MCSCAN_JOB_TIMEOUT_SECONDS // 3600} hours.')
                    return
            if result.returncode != 0:
                # Read log for error details and extract meaningful error message
                with open(log_file, 'r') as log:
                    log_content = log.read()
                # Extract user-friendly error message from log
                user_error_msg = extract_user_error_from_log(log_content, display_name)
                update_status('failed', 0, user_error_msg,
                              error_log=log_content[-2000:])  # Keep last 2000 chars for debugging
                return
        # Store genome key mapping in manifest (used later by project deletion)
        manifest['genome_keys'] = genome_keys
        with open(manifest_path, 'w') as f:
            json.dump(manifest, f, indent=2)
        update_status('running', 30, 'All genomes processed. Running additional pairwise comparisons...')
        # Step 2: Run any additional pairwise comparisons that weren't covered
        # The process_custom_genome.py script already ran each custom genome vs db_genomes
        # Now we need to run custom-to-custom comparisons (and any remaining pairs)
        completed = 0
        results = {}
        # Log the genome_keys mapping for debugging
        logger.info(f"CustomSynteny: genome_keys mapping: {genome_keys}")
        logger.info(f"CustomSynteny: Processing {len(selected_pairs)} selected pairs")
        for pair in selected_pairs:
            # Handle different pair formats
            if isinstance(pair, dict):
                genome_a_id = pair.get('genome1')
                genome_b_id = pair.get('genome2')
            else:
                genome_a_id, genome_b_id = pair
            # Look up genome keys - check AVAILABLE_GENOMES first for both
            if genome_a_id in AVAILABLE_GENOMES:
                genome_a_key = genome_a_id
            else:
                genome_a_key = genome_keys.get(str(genome_a_id)) or genome_keys.get(genome_a_id)
            if genome_b_id in AVAILABLE_GENOMES:
                genome_b_key = genome_b_id
            else:
                genome_b_key = genome_keys.get(str(genome_b_id)) or genome_keys.get(genome_b_id)
            logger.info(f"CustomSynteny: Pair {genome_a_id} vs {genome_b_id} -> keys: {genome_a_key} vs {genome_b_key}")
            if not genome_a_key or not genome_b_key:
                logger.warning(f"CustomSynteny: Skipping pair - genome key not found: a={genome_a_key}, b={genome_b_key}")
                results[f'{genome_a_id}_vs_{genome_b_id}'] = {'status': 'skipped', 'error': 'Genome key not found'}
                completed += 1
                continue
            # Check if this comparison was already done by process_custom_genome.py
            # The script runs custom_genome vs all specified db_genomes
            # For custom-to-custom pairs, we may need to run them
            pair_key = f'{genome_a_key}.{genome_b_key}'
            last_file = os.path.join(MCSCAN_RESULTS_DIR, 'last_filtered', f'{pair_key}.last.filtered')
            i1_file = os.path.join(MCSCAN_RESULTS_DIR, 'i1_blocks', f'{pair_key}.i1.blocks')
            # Also check reverse order
            pair_key_rev = f'{genome_b_key}.{genome_a_key}'
            last_file_rev = os.path.join(MCSCAN_RESULTS_DIR, 'last_filtered', f'{pair_key_rev}.last.filtered')
            i1_file_rev = os.path.join(MCSCAN_RESULTS_DIR, 'i1_blocks', f'{pair_key_rev}.i1.blocks')
            if os.path.exists(last_file) or os.path.exists(last_file_rev):
                logger.info(f"CustomSynteny: Pair {genome_a_key} vs {genome_b_key} already processed, skipping")
                results[f'{genome_a_key}_vs_{genome_b_key}'] = {'status': 'completed', 'note': 'Already processed'}
                completed += 1
                continue
            logger.info(f"CustomSynteny: Running pairwise comparison: {genome_a_key} vs {genome_b_key}")
            update_status('running', 30 + int(65 * completed / len(selected_pairs)),
                          f'Running comparison: {genome_a_key} vs {genome_b_key}...')
            # Run the pairwise comparison script - using PYTHON_BIN for HuggingFace Spaces
            script_path = os.path.join(SCRIPTS_DIR, 'process_custom_synteny_pair.py')
            if os.path.exists(script_path):
                # Build command with arguments instead of environment variables
                cmd = [
                    PYTHON_BIN, script_path,
                    '--genome-a', genome_a_key,
                    '--genome-b', genome_b_key
                ]
                # NOTE(review): truthiness check here (vs `is not None` in Step 1)
                # silently drops explicit zero values -- presumably intentional
                # since 0 is not a meaningful setting; confirm.
                if cscore:
                    cmd.extend(['--cscore', str(cscore)])
                if min_anchor:
                    cmd.extend(['--min-anchor', str(min_anchor)])
                if gap_length:
                    cmd.extend(['--gap-length', str(gap_length)])
                logger.info(f"CustomSynteny: Running command: {' '.join(cmd)}")
                result = subprocess.run(
                    cmd,
                    capture_output=True,
                    text=True,
                    cwd=SCRIPTS_DIR
                )
                if result.returncode == 0:
                    logger.info(f"CustomSynteny: Comparison {genome_a_key} vs {genome_b_key} completed successfully")
                    results[f'{genome_a_key}_vs_{genome_b_key}'] = {'status': 'completed'}
                else:
                    logger.error(f"CustomSynteny: Comparison {genome_a_key} vs {genome_b_key} failed: {result.stderr[:500] if result.stderr else 'Unknown error'}")
                    results[f'{genome_a_key}_vs_{genome_b_key}'] = {
                        'status': 'failed',
                        'error': result.stderr[:500] if result.stderr else 'Unknown error'
                    }
            else:
                logger.error(f"CustomSynteny: Script not found: {script_path}")
                results[f'{genome_a_key}_vs_{genome_b_key}'] = {'status': 'skipped', 'error': 'Script not found'}
            completed += 1
        # Verify that BED and PEP files were created for all custom genomes
        missing_files = []
        for genome_meta in genomes_list:
            genome_key = genome_meta.get('key')
            bed_file = os.path.join(MCSCAN_RESULTS_DIR, 'bed_files', f'{genome_key}.bed')
            pep_file = os.path.join(MCSCAN_RESULTS_DIR, 'pep_files', f'{genome_key}.pep')
            if not os.path.exists(bed_file):
                missing_files.append(f'{genome_key}.bed')
            if not os.path.exists(pep_file):
                missing_files.append(f'{genome_key}.pep')
        if missing_files:
            update_status('failed', 95, f'Missing output files: {missing_files}',
                          results=results, genome_keys=genome_keys, completed_pairs=completed)
            return
        # Mark as complete
        update_status('completed', 100, f'Analysis complete. {completed} pairs processed.',
                      results=results, genome_keys=genome_keys, completed_pairs=completed)
        # Clean up input files to save space (keep only metadata)
        # Results are already copied to bed_files/, i1_blocks/, last_filtered/
        for genome_meta in genomes_list:
            genome_id = genome_meta.get('id')
            genome_dir = os.path.join(meta_dir, f'genome_{genome_id}')
            if os.path.exists(genome_dir):
                # Remove large input files, keep metadata.json
                for filename in ['input.gff3', 'input.pep', 'process.log']:
                    file_path = os.path.join(genome_dir, filename)
                    if os.path.exists(file_path):
                        try:
                            os.remove(file_path)
                        except:
                            # Best-effort cleanup only; a leftover file is harmless
                            pass
    except Exception as e:
        error_traceback = traceback.format_exc()
        logger.error(f"CustomSynteny: Error in background job {run_key}: {e}")
        logger.error(f"CustomSynteny: Traceback: {error_traceback}")
        update_status('failed', 0, f'Error: {str(e)}', error_traceback=error_traceback[-2000:])
    finally:
        # Always release the job slot
        release_job_slot(run_key)
        # Clean up temp directories used during processing
        # These are created for each custom genome: CUSTOM_TEMP_DIR/{run_key}_{genome_key}
        try:
            if os.path.exists(CUSTOM_TEMP_DIR):
                for dirname in os.listdir(CUSTOM_TEMP_DIR):
                    if dirname.startswith(f'{run_key}_'):
                        temp_dir_path = os.path.join(CUSTOM_TEMP_DIR, dirname)
                        try:
                            shutil.rmtree(temp_dir_path)
                            logger.info(f"CustomSynteny: Cleaned up temp directory: {dirname}")
                        except Exception as e:
                            logger.warning(f"CustomSynteny: Failed to clean temp dir {dirname}: {e}")
        except Exception as e:
            logger.warning(f"CustomSynteny: Error cleaning temp directories: {e}")
        # Record analytics based on actual job outcome (read back from disk
        # so the event reflects what the poller will ultimately see)
        if ANALYTICS_AVAILABLE:
            try:
                final_status = 'failure'  # Default to failure
                if os.path.exists(status_file):
                    with open(status_file, 'r') as f:
                        status_data = json.load(f)
                    if status_data.get('status') == 'completed':
                        final_status = 'success'
                record_event(
                    feature_type=FEATURE_CUSTOM_SYNTENY,
                    query_genome=run_key,
                    status=final_status,
                    duration_ms=0,  # Duration not tracked for background jobs
                    extra_data={'pairs_count': len(selected_pairs)}
                )
            except Exception as e:
                logger.warning(f"CustomSynteny Analytics: Failed to record event for {run_key}: {e}")
# A running/queued job whose status file has not been touched for this many
# seconds (30 minutes) is considered stale (e.g. the worker was interrupted).
CUSTOM_SYNTENY_STALE_TIMEOUT = 1800
@app.route('/api/custom-synteny/status/<run_key>')
def api_custom_synteny_status(run_key):
    """Get status of a custom synteny MCscan job.

    Reads job_status.json written by the background worker. Two fixes over
    the original:
      * The status file is written concurrently by the worker thread, so a
        partially-written/corrupt JSON no longer raises a 500 for the
        poller -- it is reported as status 'unknown' instead.
      * Stale-job detection: the worker stamps 'last_updated' on every
        status write and CUSTOM_SYNTENY_STALE_TIMEOUT defines the staleness
        window, but the check was never performed. A queued/running job
        with no update inside the window is now reported as failed so the
        UI stops polling an interrupted job forever.
    """
    meta_dir = os.path.join(CUSTOM_SYNTENY_META_DIR, run_key)
    if not os.path.exists(meta_dir):
        return jsonify({'success': False, 'error': f'Run key not found: {run_key}'})
    status_file = os.path.join(meta_dir, 'job_status.json')
    if not os.path.exists(status_file):
        return jsonify({
            'success': True,
            'data': {
                'status': 'unknown',
                'message': 'Job status not available'
            }
        })
    try:
        with open(status_file, 'r') as f:
            status_data = json.load(f)
    except (OSError, json.JSONDecodeError):
        # Worker may be mid-write; report unknown rather than crash the poll
        return jsonify({
            'success': True,
            'data': {
                'status': 'unknown',
                'message': 'Job status temporarily unreadable'
            }
        })
    # Transform internal error codes to user-friendly messages
    if status_data.get('status') == 'failed' and 'message' in status_data:
        status_data['message'] = extract_user_error_from_log(status_data['message'], run_key)
    # Stale-job detection: if the worker has not touched the status file
    # within CUSTOM_SYNTENY_STALE_TIMEOUT, surface the job as failed.
    if status_data.get('status') in ('queued', 'running'):
        last_updated = status_data.get('last_updated')
        if last_updated:
            try:
                age = (datetime.now() - datetime.fromisoformat(last_updated)).total_seconds()
                if age > CUSTOM_SYNTENY_STALE_TIMEOUT:
                    status_data['status'] = 'failed'
                    status_data['message'] = 'Job appears to be stale (no progress updates). It may have been interrupted; please try again.'
            except ValueError:
                pass  # Unparseable timestamp: leave the reported status as-is
    # Include manifest data when status is completed (needed for plotting transition)
    if status_data.get('status') == 'completed':
        manifest_file = os.path.join(meta_dir, 'manifest.json')
        if os.path.exists(manifest_file):
            with open(manifest_file, 'r') as f:
                status_data['manifest'] = json.load(f)
    return jsonify({'success': True, 'data': status_data})
@app.route('/api/custom-synteny/lookup/<run_key>')
def api_custom_synteny_lookup(run_key):
    """Look up details about a custom synteny project"""
    project_dir = os.path.join(CUSTOM_SYNTENY_META_DIR, run_key)
    if not os.path.exists(project_dir):
        return jsonify({'success': False, 'error': f'Run key not found: {run_key}'})
    details = {
        'run_key': run_key,
        'exists': True
    }
    # Attach whichever metadata files are present on disk.
    for field, filename in (('manifest', 'manifest.json'), ('status', 'job_status.json')):
        candidate = os.path.join(project_dir, filename)
        if os.path.exists(candidate):
            with open(candidate, 'r') as fh:
                details[field] = json.load(fh)
    return jsonify({'success': True, 'data': details})
@app.route('/api/custom-synteny/projects')
def api_custom_synteny_projects():
    """List all available custom synteny projects.

    Scans CUSTOM_SYNTENY_META_DIR for project directories and returns their
    manifest plus current job status, newest first.

    Robustness fix: the original performed unguarded ``json.load`` per
    project, so a single corrupt or partially-written manifest/status file
    raised and broke the entire listing. Unreadable metadata is now logged
    and the project is still listed with whatever could be read.
    """
    projects = []
    if os.path.exists(CUSTOM_SYNTENY_META_DIR):
        for run_key in os.listdir(CUSTOM_SYNTENY_META_DIR):
            meta_dir = os.path.join(CUSTOM_SYNTENY_META_DIR, run_key)
            if not os.path.isdir(meta_dir):
                continue
            manifest_path = os.path.join(meta_dir, 'manifest.json')
            status_path = os.path.join(meta_dir, 'job_status.json')
            project_info = {'run_key': run_key}
            try:
                if os.path.exists(manifest_path):
                    with open(manifest_path, 'r') as f:
                        project_info['manifest'] = json.load(f)
                if os.path.exists(status_path):
                    with open(status_path, 'r') as f:
                        status_data = json.load(f)
                    project_info['status'] = status_data.get('status', 'unknown')
            except (OSError, json.JSONDecodeError) as e:
                # One bad project must not take down the whole listing
                logger.warning(f"CustomSynteny: Could not read metadata for {run_key}: {e}")
            projects.append(project_info)
    # Sort by creation date (newest first); projects without a manifest sort last
    projects.sort(key=lambda p: p.get('manifest', {}).get('created_at', ''), reverse=True)
    return jsonify({'success': True, 'projects': projects})
@app.route('/api/custom-synteny/projects/<run_key>', methods=['DELETE'])
def api_custom_synteny_delete(run_key):
    """Delete a custom synteny project.

    Removes the project's metadata directory and the generated BED/PEP
    files recorded under the manifest's 'genome_keys' mapping.

    Idiom fix: the original iterated ``genome_keys.items()`` but never used
    the dict key; iterate ``values()`` directly. Note the mapping also
    contains db-genome entries (key == value), so copies of built-in BED
    files in bed_files/ are removed too -- originals under Genomes/ are
    untouched (same behavior as before).
    """
    meta_dir = os.path.join(CUSTOM_SYNTENY_META_DIR, run_key)
    if not os.path.exists(meta_dir):
        return jsonify({'success': False, 'error': f'Run key not found: {run_key}'})
    try:
        # Load manifest to get genome keys for cleanup
        manifest_path = os.path.join(meta_dir, 'manifest.json')
        if os.path.exists(manifest_path):
            with open(manifest_path, 'r') as f:
                manifest = json.load(f)
            # Remove generated BED and PEP files (duplicate keys in the
            # mapping are harmless: removal is guarded by os.path.exists)
            for key in manifest.get('genome_keys', {}).values():
                for subdir, ext in (('bed_files', 'bed'), ('pep_files', 'pep')):
                    generated = os.path.join(MCSCAN_RESULTS_DIR, subdir, f'{key}.{ext}')
                    if os.path.exists(generated):
                        os.remove(generated)
        # Remove the project directory
        shutil.rmtree(meta_dir)
        return jsonify({'success': True, 'message': f'Project {run_key} deleted successfully'})
    except Exception as e:
        return jsonify({'success': False, 'error': str(e)})
@app.route('/api/custom-synteny/plot', methods=['POST'])
def api_custom_synteny_plot():
    """Generate microsynteny plot for custom synteny project - using same approach as Custom Genome.

    Invokes plot_user_genes_microsynteny_v2.py as a subprocess and locates the
    timestamped output folder it creates under OUTPUT_DIR.

    Expected JSON body:
        run_key (str, required): project identifier under CUSTOM_SYNTENY_META_DIR
        query_genome (str, required): genome key whose genes anchor the plot
        genes (list[str], required): query gene IDs (sanitized before use)
        comparisons (list[str], required): comparison genome keys
        colors, annotations, layout, genome_order: optional plot options
        padding_config / max_genes_config: per-genome {left, right} tweaks
        display_names: {genome: "Custom Name"} label overrides
        gene_labels / gene_label_size: optional on-plot gene labelling
        keep_lowconf_color (bool): color all syntenic matches incl. low-confidence

    Returns JSON {'success': True, 'output_folder': ..., 'files': {...}} on
    success, or {'success': False, 'error': ...} on failure.
    """
    data = request.json
    run_key = data.get('run_key')
    query_genome = data.get('query_genome')
    genes = data.get('genes', [])
    colors = data.get('colors', [])
    annotations = data.get('annotations', [])
    comparisons = data.get('comparisons', [])
    layout = data.get('layout')
    genome_order = data.get('genome_order', [])
    # Advanced tweaking parameters (optional)
    padding_config = data.get('padding_config', {})  # {genome: {left: bp, right: bp}}
    max_genes_config = data.get('max_genes_config', {})  # {genome: {left: count, right: count}}
    display_names = data.get('display_names', {})  # {genome: "Custom Name"}
    # Gene labels parameters (optional)
    gene_labels = data.get('gene_labels', [])  # List of gene IDs to label on the plot
    gene_label_size = data.get('gene_label_size', 0)  # Font size for labels (0=disabled, 2-8 recommended)
    # Debug logging for tweaking parameters
    logger.debug("api_custom_synteny_plot received:")
    logger.debug(f" padding_config: {padding_config}")
    logger.debug(f" max_genes_config: {max_genes_config}")
    logger.debug(f" display_names: {display_names}")
    logger.debug(f" gene_labels: {gene_labels}")
    logger.debug(f" gene_label_size: {gene_label_size}")
    if not run_key:
        return jsonify({'success': False, 'error': 'run_key is required'})
    # Security: run_key is joined into a filesystem path below - reject
    # anything containing path separators or '..' to prevent traversal.
    if secure_filename(run_key) != run_key:
        return jsonify({'success': False, 'error': 'Invalid run key'})
    if not query_genome:
        return jsonify({'success': False, 'error': 'query_genome is required'})
    if not genes:
        return jsonify({'success': False, 'error': 'At least one gene is required'})
    if not comparisons:
        return jsonify({'success': False, 'error': 'At least one comparison genome is required'})
    # Validate and sanitize gene IDs (security measure)
    validated_genes, error_msg = sanitize_gene_ids(genes)
    if error_msg:
        return jsonify({'success': False, 'error': error_msg})
    genes = validated_genes
    # Also validate gene_labels if provided
    if gene_labels:
        validated_labels, label_error = sanitize_gene_ids(gene_labels)
        if label_error:
            return jsonify({'success': False, 'error': f'Gene labels: {label_error}'})
        gene_labels = validated_labels
    # Look up the project
    meta_dir = os.path.join(CUSTOM_SYNTENY_META_DIR, run_key)
    if not os.path.exists(meta_dir):
        return jsonify({'success': False, 'error': f'Run key not found: {run_key}'})
    # Get display name and MCscan params for query genome from manifest
    query_display_name = query_genome
    mcscan_params = None
    manifest_file = os.path.join(meta_dir, 'manifest.json')
    if os.path.exists(manifest_file):
        try:
            with open(manifest_file, 'r') as f:
                manifest = json.load(f)
            # Look for the query genome in uploaded_genomes
            if manifest.get('uploaded_genomes'):
                for genome_info in manifest['uploaded_genomes']:
                    if genome_info.get('genome_id') == query_genome:
                        query_display_name = genome_info.get('display_name', query_genome)
                        break
            # Read MCscan parameters from manifest for Method row in CSV
            if manifest.get('mcscan_params'):
                mcscan_params = manifest['mcscan_params']
        except Exception as e:
            # A broken manifest only costs the display name / Method row;
            # plotting can still proceed with defaults.
            logger.warning(f"Could not read manifest: {e}")
    try:
        # Use the SAME Python script as Custom Genome (plot_user_genes_microsynteny_v2.py)
        script_path = os.path.join(SCRIPTS_DIR, 'plot_user_genes_microsynteny_v2.py')
        if not os.path.exists(script_path):
            return jsonify({'success': False, 'error': f'Plotting script not found: {script_path}'})
        # Build args - use query_genome as the query genome, using PYTHON_BIN for HuggingFace Spaces
        args = [PYTHON_BIN, script_path]
        if colors:
            args.extend(['--colors', ','.join(colors)])
        args.extend(['--query', query_genome, '--genes'] + genes + ['--comparisons'] + comparisons)
        if annotations:
            # '|||' is the script's record separator for annotation strings
            annotations_str = '|||'.join(str(a) for a in annotations)
            args.extend(['--annotations', annotations_str])
        # Pass MCscan parameters for Method row in CSV (if available from manifest)
        if mcscan_params:
            args.extend(['--mcscan-params', json.dumps(mcscan_params)])
        # Convert layout from list to comma-separated string (the script expects a string)
        if layout and isinstance(layout, list) and len(layout) > 0:
            layout_str = ','.join(map(str, layout))
            args.extend(['--layout', layout_str])
        if genome_order and isinstance(genome_order, list):
            order_str = ','.join(genome_order)
            args.extend(['--genome-order', order_str])
        # Advanced tweaking: asymmetric padding for the query genome (bp)
        if query_genome in padding_config and padding_config[query_genome]:
            query_pad = padding_config[query_genome]
            if 'left' in query_pad and query_pad['left'] is not None:
                args.extend(['--query-padding-left', str(int(query_pad['left']))])
            if 'right' in query_pad and query_pad['right'] is not None:
                args.extend(['--query-padding-right', str(int(query_pad['right']))])
        # Asymmetric max-genes window for the query genome
        if query_genome in max_genes_config and max_genes_config[query_genome]:
            query_genes = max_genes_config[query_genome]
            if 'left' in query_genes and query_genes['left'] is not None:
                args.extend(['--query-max-genes-left', str(int(query_genes['left']))])
            if 'right' in query_genes and query_genes['right'] is not None:
                args.extend(['--query-max-genes-right', str(int(query_genes['right']))])
        # Comparison genomes padding (format: genome:left:right|genome2:left:right)
        comp_padding_parts = []
        for comp in comparisons:
            if comp in padding_config and padding_config[comp]:
                comp_pad = padding_config[comp]
                left_val = int(comp_pad.get('left', 1500000))
                right_val = int(comp_pad.get('right', 1500000))
                comp_padding_parts.append(f"{comp}:{left_val}:{right_val}")
        if comp_padding_parts:
            comp_padding_str = '|'.join(comp_padding_parts)
            args.extend(['--comp-padding-config', comp_padding_str])
        # Comparison genomes max genes (format: genome:left:right|genome2:left:right)
        comp_genes_parts = []
        for comp in comparisons:
            if comp in max_genes_config and max_genes_config[comp]:
                comp_genes = max_genes_config[comp]
                left_val = int(comp_genes.get('left', 50))
                right_val = int(comp_genes.get('right', 50))
                comp_genes_parts.append(f"{comp}:{left_val}:{right_val}")
        if comp_genes_parts:
            comp_genes_str = '|'.join(comp_genes_parts)
            args.extend(['--comp-max-genes-config', comp_genes_str])
        # Custom display names (format: genome:CustomName|genome2:Name2)
        if display_names:
            display_parts = []
            for genome, name in display_names.items():
                if name and name.strip():
                    # '|' and ':' are the field separators - escape them in names
                    safe_name = name.replace('|', '_').replace(':', '_')
                    display_parts.append(f"{genome}:{safe_name}")
            if display_parts:
                display_str = '|'.join(display_parts)
                args.extend(['--display-names', display_str])
        # Gene labels (list of gene IDs to display labels for on the plot)
        if gene_labels and isinstance(gene_labels, list) and len(gene_labels) > 0:
            gene_labels_str = ','.join(str(g) for g in gene_labels if g)
            if gene_labels_str:
                args.extend(['--genelabels', gene_labels_str])
                # Add label size (default to 8 if labels are provided but size not specified)
                label_size = int(gene_label_size) if gene_label_size else 8
                if label_size > 0:
                    args.extend(['--genelabelsize', str(label_size)])
        # Keep low-confidence coloring option (optional - colors all syntenic matches)
        keep_lowconf_color = data.get('keep_lowconf_color', False)
        if keep_lowconf_color:
            args.append('--keep-lowconf-color')
        # Debug: print final command
        logger.debug(f"api_custom_synteny_plot final command args: {' '.join(args)}")
        # Set environment - files are in standard Mcscan_results folder
        env = os.environ.copy()
        env['PYTHON'] = PYTHON_BIN
        # Mark this as a custom genome for the script to handle appropriately
        env['CUSTOM_GENOME_KEY'] = query_genome
        # Pass the display name for plot labels
        env['CUSTOM_DISPLAY_NAME'] = query_display_name
        # List-form argv with shell=False: no shell-injection risk from inputs
        result = subprocess.run(
            args,
            capture_output=True,
            text=True,
            env=env,
            cwd=SCRIPTS_DIR
        )
        if result.returncode == 0:
            # Find the output folder the script created.
            # Priority 1: display name override from the tweaking config
            # Priority 2: query_display_name from the manifest
            folder_display_name = display_names.get(query_genome) if display_names and query_genome in display_names else query_display_name
            # Escape special characters to match what was passed to script
            folder_display_name = folder_display_name.replace('|', '_').replace(':', '_')
            comp_str = '_'.join(comparisons)
            pattern_prefix = f"{folder_display_name}_usergenes_{comp_str}_"
            logger.debug(f"Looking for custom synteny output folder with prefix: {pattern_prefix}")
            output_folder = None
            if os.path.exists(OUTPUT_DIR):
                import re
                # Accept only <prefix>YYYYMMDD_HHMMSS folders; lexicographic
                # sort of that timestamp format is chronological, so the
                # first entry after a reverse sort is the newest run.
                folders = []
                for f in os.listdir(OUTPUT_DIR):
                    if f.startswith(pattern_prefix):
                        remainder = f[len(pattern_prefix):]
                        if re.match(r'^\d{8}_\d{6}$', remainder):
                            folders.append(f)
                if folders:
                    folders.sort(reverse=True)
                    output_folder = os.path.join(OUTPUT_DIR, folders[0])
            if output_folder and os.path.exists(output_folder):
                files = {}
                for fname in os.listdir(output_folder):
                    if fname == 'microsynteny_plot.png':
                        files['png'] = fname
                    elif fname == 'microsynteny_plot.svg':
                        files['svg'] = fname
                    elif fname.endswith('.csv'):
                        files['csv'] = fname
                return jsonify({
                    'success': True,
                    'message': 'Plot generated successfully',
                    'output_folder': os.path.basename(output_folder),
                    'files': files
                })
            else:
                # Log details for debugging (not shown to user)
                if app.debug:
                    logger.debug(f"Custom synteny output folder not found. stdout: {result.stdout}")
                    logger.debug(f"stderr: {result.stderr}")
                return jsonify({
                    'success': False,
                    'error': 'Output folder not found after plot generation'
                })
        else:
            # Extract a user-presentable message from the script's output
            error_msg = 'Script failed'
            combined = result.stdout + '\n' + result.stderr
            if 'No syntenic matches' in combined:
                error_msg = 'No syntenic matches found for the specified genes'
            elif 'ERROR:' in combined:
                for line in combined.split('\n'):
                    if 'ERROR:' in line:
                        error_msg = line.split('ERROR:')[-1].strip()
                        break
            # Log details for debugging (not shown to user)
            if app.debug:
                logger.debug(f"Custom synteny plot script failed. stdout: {result.stdout}")
                logger.debug(f"stderr: {result.stderr}")
            return jsonify({
                'success': False,
                'error': error_msg
            })
    except Exception as e:
        return jsonify({'success': False, 'error': str(e)})
@app.route('/api/custom-synteny/genes/<run_key>/<genome_key>')
def api_custom_synteny_genes(run_key, genome_key):
    """Get list of genes from a custom synteny genome's BED file.

    Note: run_key is accepted for URL symmetry with the other custom-synteny
    endpoints but is not needed to locate the file - database and custom
    genomes both store BED files under MCSCAN_RESULTS_DIR/bed_files, so the
    original AVAILABLE_GENOMES branch (whose two arms were identical) has
    been collapsed.

    Returns JSON: {'success': True, 'genes': [...], 'total': N} where each
    gene is {'chr', 'start', 'end', 'gene_id'} parsed from BED columns 1-4.
    """
    # Security: genome_key is joined into a filesystem path - reject path
    # separators / '..' to prevent reading files outside bed_files/.
    if not genome_key or secure_filename(genome_key) != genome_key:
        return jsonify({'success': False, 'error': 'Invalid genome key'})
    bed_file = os.path.join(MCSCAN_RESULTS_DIR, 'bed_files', f'{genome_key}.bed')
    if not os.path.exists(bed_file):
        return jsonify({'success': False, 'error': f'BED file not found for genome: {genome_key}'})
    genes = []
    try:
        with open(bed_file, 'r') as f:
            for line in f:
                if line.strip():
                    parts = line.strip().split('\t')
                    # Only rows with at least chrom/start/end/name are genes
                    if len(parts) >= 4:
                        genes.append({
                            'chr': parts[0],
                            'start': int(parts[1]),
                            'end': int(parts[2]),
                            'gene_id': parts[3]
                        })
    except Exception as e:
        return jsonify({'success': False, 'error': str(e)})
    return jsonify({'success': True, 'genes': genes, 'total': len(genes)})
# ============================================================================
# Run Application
# ============================================================================
if __name__ == '__main__':
    # Debug mode is driven by the FLASK_DEBUG environment variable and
    # defaults to off for production safety (set FLASK_DEBUG=1 locally).
    debug_mode = os.environ.get('FLASK_DEBUG', '0') == '1'
    # HuggingFace Spaces serves on port 7860; local dev can override via PORT.
    port = int(os.environ.get('PORT', 7860))
    logger.info("Starting Plant-mSyn - Plant Microsynteny Web Application...")
    logger.info(f"Script directory: {SCRIPT_DIR}")
    logger.info(f"Annotations directory: {ANNOTATIONS_DIR}")
    logger.info(f"Output directory: {OUTPUT_DIR}")
    logger.info(f"Debug mode: {debug_mode}")
    if debug_mode:
        logger.warning("Debug mode is ENABLED - do not use in production!")
    # Background maintenance: periodic cleanup of old output folders.
    start_cleanup_scheduler()
    # Weekly analytics e-mail - only when the analytics module loaded and a
    # recipient address is configured in the environment.
    if ANALYTICS_AVAILABLE:
        if analytics_email := os.environ.get('ANALYTICS_EMAIL', ''):
            start_analytics_scheduler(analytics_email)
        else:
            logger.info("ANALYTICS_EMAIL not set - weekly reports disabled")
    logger.info(f"Open http://127.0.0.1:{port} in your browser")
    logger.info("Press Ctrl+C to stop")
    app.run(debug=debug_mode, host='0.0.0.0', port=port)