# Page header captured together with this file (not part of the program):
#   ansar-y0usif · "Update app.py" · commit cd8fa20 (verified) · 46 kB
import gradio as gr
import torch
import pickle
import subprocess
import pandas as pd
import os
import re
import logging
import numpy as np
from predictor import EnhancedGenePredictor
from tensorflow.keras.models import load_model
# Import the new analyzer
from analyzer import PhylogeneticTreeAnalyzer
import tempfile
import shutil
import sys
import uuid
from pathlib import Path
from huggingface_hub import hf_hub_download
from Bio import SeqIO
from Bio.Seq import Seq
from Bio.SeqRecord import SeqRecord
import stat
import time
# --- Global Variables ---
# All bundled resources are resolved relative to the directory containing this file.
BASE_DIR = os.path.dirname(os.path.abspath(__file__))
_binaries_dir = os.path.join(BASE_DIR, "binaries")

# Bundled command-line tools.
MAFFT_PATH = os.path.join(_binaries_dir, "mafft", "mafft")
IQTREE_PATH = os.path.join(_binaries_dir, "iqtree", "bin", "iqtree3")

# Reference alignment and tree that query sequences are placed into.
ALIGNMENT_PATH = os.path.join(BASE_DIR, "f_gene_sequences_aligned.fasta")
TREE_PATH = os.path.join(BASE_DIR, "f_gene_sequences.phy.treefile")

# Working directory for per-query files; created eagerly at import time.
QUERY_OUTPUT_DIR = os.path.join(BASE_DIR, "queries")
os.makedirs(QUERY_OUTPUT_DIR, exist_ok=True)

# --- Logging ---
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(levelname)s - %(message)s',
)

# --- Paths ---
# Hugging Face repository holding the model files, and the default CSV file name.
model_repo = "GGproject10/best_boundary_aware_model"
csv_path = "f cleaned.csv"

# Optional Hugging Face access token; None when the variable is not set.
hf_token = os.environ.get("HF_TOKEN")
# --- Load Models ---
# Each handle stays None when its download or deserialisation fails; the
# failure is logged and the module keeps importing.
boundary_model = None
keras_model = None
kmer_to_index = None

# Boundary-aware predictor, fetched from the Hugging Face Hub.
try:
    boundary_path = hf_hub_download(
        repo_id=model_repo,
        filename="best_boundary_aware_model.pth",
        token=hf_token,
    )
    if not os.path.exists(boundary_path):
        logging.warning("Boundary model file not found after download")
    else:
        boundary_model = EnhancedGenePredictor(boundary_path)
        logging.info("Boundary model loaded successfully from Hugging Face Hub.")
except Exception as e:
    logging.error(f"Failed to load boundary model from HF Hub: {e}")

# Keras classifier plus its k-mer vocabulary, fetched from the same repository.
try:
    keras_path = hf_hub_download(
        repo_id=model_repo,
        filename="best_model.keras",
        token=hf_token,
    )
    kmer_path = hf_hub_download(
        repo_id=model_repo,
        filename="kmer_to_index.pkl",
        token=hf_token,
    )
    if not (os.path.exists(keras_path) and os.path.exists(kmer_path)):
        logging.warning("Keras model or kmer files not found after download")
    else:
        keras_model = load_model(keras_path)
        # NOTE(security): pickle.load can execute arbitrary code while
        # deserialising; this is only acceptable if the model repository is trusted.
        with open(kmer_path, "rb") as f:
            kmer_to_index = pickle.load(f)
        logging.info("Keras model and k-mer index loaded successfully from Hugging Face Hub.")
except Exception as e:
    logging.error(f"Failed to load Keras model from HF Hub: {e}")
# --- Initialize New Tree Analyzer ---
# `analyzer` ends up None whenever construction or CSV loading fails.
analyzer = None
try:
    analyzer = PhylogeneticTreeAnalyzer()

    # Locations probed for the CSV, in priority order.
    csv_candidates = [
        csv_path,
        os.path.join(BASE_DIR, csv_path),
        os.path.join(BASE_DIR, "app", csv_path),
        os.path.join(os.path.dirname(__file__), csv_path),
        "f_cleaned.csv",  # Alternative naming
        os.path.join(BASE_DIR, "f_cleaned.csv"),
    ]

    csv_loaded = False
    for csv_candidate in csv_candidates:
        if not os.path.exists(csv_candidate):
            continue
        if not analyzer.load_data(csv_candidate):
            logging.warning(f"Failed to load data from: {csv_candidate}")
            continue
        logging.info(f"Tree analyzer data loaded from: {csv_candidate}")
        csv_loaded = True
        csv_path = csv_candidate  # Remember the location that actually worked
        break

    if csv_loaded:
        # Training is optional: any failure is logged and the analyzer is kept.
        try:
            if analyzer.train_ai_model():
                logging.info("AI model training completed successfully")
            else:
                logging.warning("AI model training failed; proceeding with basic analysis.")
        except Exception as e:
            logging.warning(f"AI model training failed: {e}")
    else:
        logging.error("Failed to load CSV data from any candidate location")
        analyzer = None
except Exception as e:
    logging.error(f"Failed to initialize tree analyzer: {e}")
    analyzer = None
# --- Enhanced Tool Detection with Binary Permission Setup ---
def setup_binary_permissions():
    """Add the owner-execute bit to the bundled MAFFT and IQ-TREE binaries.

    A missing binary or a failing chmod is logged as a warning; nothing is raised.
    """
    for tool_path in (MAFFT_PATH, IQTREE_PATH):
        if not os.path.exists(tool_path):
            logging.warning(f"Binary not found: {tool_path}")
            continue
        try:
            # Keep the existing mode bits and OR in the execute permission.
            mode_bits = os.stat(tool_path).st_mode
            os.chmod(tool_path, mode_bits | stat.S_IEXEC)
            logging.info(f"Set executable permission on {tool_path}")
        except Exception as exc:
            logging.warning(f"Failed to set executable permission on {tool_path}: {exc}")
def _find_working_tool(candidates, marker, label):
    """Return the first candidate that exists and answers ``--help``, else None.

    Args:
        candidates: Paths or command names to try, in priority order.
        marker: Lower-case text expected in stderr when the tool ran but
            exited with a non-zero status.
        label: Human-readable tool name used in log messages.
    """
    # dict.fromkeys de-duplicates while preserving order, so a broken binary
    # is not probed (and possibly timed out on) more than once.
    for candidate in dict.fromkeys(candidates):
        if not candidate:
            continue
        # Cheap pre-check: a real file, or a command resolvable through PATH.
        if not (os.path.exists(candidate) or shutil.which(candidate)):
            continue
        try:
            result = subprocess.run(
                [candidate, "--help"],
                capture_output=True,
                text=True,
                timeout=10
            )
        except (subprocess.SubprocessError, OSError) as e:
            # OSError covers FileNotFoundError, PermissionError and
            # "Exec format error" (binary built for another platform), so one
            # unusable candidate no longer aborts the whole search.
            logging.debug(f"{label} test failed for {candidate}: {e}")
            continue
        if result.returncode == 0 or marker in result.stderr.lower():
            logging.info(f"{label} found and tested successfully at: {candidate}")
            return candidate
    return None


def check_tool_availability():
    """Locate working MAFFT and IQ-TREE executables.

    Executable permissions are set on the bundled binaries first; each
    candidate location is then verified by actually running it with ``--help``.

    Returns:
        tuple: ``(mafft_available, iqtree_available, mafft_cmd, iqtree_cmd)``.
        The two flags are booleans; each ``*_cmd`` is the working path or
        command name, or None when no candidate worked.
    """
    # First, ensure binaries have executable permissions
    setup_binary_permissions()

    mafft_candidates = [
        MAFFT_PATH,  # Primary bundled binary
        os.path.join(BASE_DIR, "binaries", "mafft", "mafft"),
        os.path.join(BASE_DIR, "binaries", "mafft", "mafft.bat"),  # Windows fallback
        'mafft',
        '/usr/bin/mafft',
        '/usr/local/bin/mafft',
        os.path.join(BASE_DIR, "binaries", "mafft", "mafftdir", "bin", "mafft"),
        # Potential conda/miniconda paths
        os.path.expanduser("~/anaconda3/bin/mafft"),
        os.path.expanduser("~/miniconda3/bin/mafft"),
        "/opt/conda/bin/mafft",
        "/usr/local/miniconda3/bin/mafft"
    ]
    iqtree_candidates = [
        IQTREE_PATH,  # Primary bundled binary
        'iqtree2',
        'iqtree',
        'iqtree3',
        '/usr/bin/iqtree2',
        '/usr/local/bin/iqtree2',
        '/usr/bin/iqtree',
        '/usr/local/bin/iqtree',
        'iqtree2.exe',  # Windows
        'iqtree.exe',  # Windows
        'iqtree3.exe',  # Windows
        os.path.join(BASE_DIR, "binaries", "iqtree", "bin", "iqtree2"),
        # Potential conda paths
        os.path.expanduser("~/anaconda3/bin/iqtree2"),
        os.path.expanduser("~/miniconda3/bin/iqtree2"),
        "/opt/conda/bin/iqtree2",
        "/usr/local/miniconda3/bin/iqtree2"
    ]

    mafft_cmd = _find_working_tool(mafft_candidates, "mafft", "MAFFT")
    iqtree_cmd = _find_working_tool(iqtree_candidates, "iqtree", "IQ-TREE")
    return mafft_cmd is not None, iqtree_cmd is not None, mafft_cmd, iqtree_cmd
def install_dependencies_guide():
    """Return the multi-line help text shown when MAFFT or IQ-TREE is missing."""
    # The empty first and last entries reproduce the leading and trailing
    # newline of the guide text.
    guide_lines = (
        "",
        "🔧 INSTALLATION GUIDE FOR MISSING DEPENDENCIES:",
        "For MAFFT:",
        "- Ubuntu/Debian: sudo apt-get install mafft",
        "- CentOS/RHEL: sudo yum install mafft",
        "- macOS: brew install mafft",
        "- Windows: Download from https://mafft.cbrc.jp/alignment/software/",
        "- Conda: conda install -c bioconda mafft",
        "For IQ-TREE:",
        "- Ubuntu/Debian: sudo apt-get install iqtree",
        "- CentOS/RHEL: sudo yum install iqtree",
        "- macOS: brew install iqtree",
        "- Windows: Download from http://www.iqtree.org/",
        "- Conda: conda install -c bioconda iqtree",
        "Alternative: Use conda/mamba (RECOMMENDED):",
        "- conda install -c bioconda mafft iqtree",
        "Docker option:",
        "- docker run -it --rm -v $(pwd):/data quay.io/biocontainers/mafft:7.490--h779adbc_0",
        "- docker run -it --rm -v $(pwd):/data quay.io/biocontainers/iqtree:2.1.4_beta--hdcc8f71_0",
        "TROUBLESHOOTING:",
        "If tools are installed but not detected, try:",
        "1. Add installation directory to PATH",
        "2. Use absolute paths in the configuration",
        "3. Check permissions on executable files",
        "4. Ensure binaries have executable permissions (chmod +x)",
        "",
    )
    return "\n".join(guide_lines)
def phylogenetic_placement(sequence: str, mafft_cmd: str, iqtree_cmd: str):
    """
    Add a query sequence to the reference alignment and tree.

    The query is written to a FASTA file under QUERY_OUTPUT_DIR, added to the
    reference alignment with ``mafft --add``, and IQ-TREE is then run on the
    combined alignment with the reference tree passed through ``-g``.

    Args:
        sequence: Nucleotide sequence to place. Leading and trailing
            whitespace is ignored; at least 100 characters must remain.
        mafft_cmd: MAFFT executable (path or command name).
        iqtree_cmd: IQ-TREE executable (path or command name).

    Returns:
        tuple: ``(success, message, aligned_path, treefile_path)``.
        ``aligned_path`` is set once MAFFT has succeeded, even if IQ-TREE
        fails afterwards; ``treefile_path`` is set only on full success.
        Failures are reported through ``success``/``message``, never raised.
    """
    query_fasta = None  # Known to the finally-block even on early failure
    try:
        # Validate and write the same, stripped sequence (the original checked
        # the stripped length but wrote the unstripped text to FASTA).
        sequence = sequence.strip()
        if len(sequence) < 100:
            return False, "Error: Sequence is too short for phylogenetic placement (minimum 100 bp).", None, None

        # Generate unique query ID so concurrent requests do not collide
        query_id = f"QUERY_{uuid.uuid4().hex[:8]}"
        aligned_with_query = os.path.join(QUERY_OUTPUT_DIR, f"{query_id}_aligned.fa")
        output_prefix = os.path.join(QUERY_OUTPUT_DIR, f"{query_id}_placed_tree")

        # Check if reference files exist
        if not os.path.exists(ALIGNMENT_PATH):
            return False, f"Reference alignment not found: {ALIGNMENT_PATH}", None, None
        if not os.path.exists(TREE_PATH):
            return False, f"Reference tree not found: {TREE_PATH}", None, None

        # Save query sequence as FASTA
        query_fasta = os.path.join(QUERY_OUTPUT_DIR, f"{query_id}.fa")
        try:
            query_record = SeqRecord(Seq(sequence.upper()), id=query_id, description="")
            SeqIO.write([query_record], query_fasta, "fasta")
            logging.info(f"Query sequence saved: {query_fasta}")
        except Exception as e:
            return False, f"Error writing query sequence: {e}", None, None

        # Step 1: Add query sequence to reference alignment using MAFFT
        logging.info("Adding query sequence to reference alignment...")
        try:
            # MAFFT writes the alignment to stdout, so stream it into the file.
            with open(aligned_with_query, "w") as output_file:
                subprocess.run([
                    mafft_cmd, "--add", query_fasta, "--reorder", ALIGNMENT_PATH
                ], stdout=output_file, stderr=subprocess.PIPE, text=True, timeout=600, check=True)
            # Verify alignment file was created and is not empty
            if not os.path.exists(aligned_with_query) or os.path.getsize(aligned_with_query) == 0:
                return False, "MAFFT alignment failed: output file is empty", None, None
            logging.info(f"MAFFT alignment completed: {aligned_with_query}")
        except subprocess.CalledProcessError as e:
            error_msg = e.stderr if e.stderr else "Unknown MAFFT error"
            return False, f"MAFFT alignment failed: {error_msg}", None, None
        except subprocess.TimeoutExpired:
            return False, "MAFFT alignment timeout (>10 minutes)", None, None
        except FileNotFoundError:
            return False, f"MAFFT executable not found: {mafft_cmd}", None, None
        except Exception as e:
            return False, f"MAFFT execution error: {e}", None, None

        # Step 2: Place sequence in phylogenetic tree using IQ-TREE
        logging.info("Placing sequence in phylogenetic tree...")
        try:
            subprocess.run([
                iqtree_cmd, "-s", aligned_with_query, "-g", TREE_PATH,
                "-m", "GTR+G", "-pre", output_prefix, "-redo"
            ], capture_output=True, text=True, timeout=1200, check=True)

            # Check if treefile was generated
            treefile = f"{output_prefix}.treefile"
            if not os.path.exists(treefile) or os.path.getsize(treefile) == 0:
                return False, "IQ-TREE placement failed: treefile not generated", aligned_with_query, None
            logging.info(f"IQ-TREE placement completed: {treefile}")

            # Generate success message with details
            success_msg = "✅ Phylogenetic placement completed successfully!\n"
            success_msg += f"- Query ID: {query_id}\n"
            success_msg += f"- Alignment: {os.path.basename(aligned_with_query)}\n"
            success_msg += f"- Tree: {os.path.basename(treefile)}\n"

            # Try to extract the first log-likelihood line from the IQ-TREE log
            log_file = f"{output_prefix}.log"
            if os.path.exists(log_file):
                try:
                    with open(log_file, 'r') as f:
                        log_lines = [line for line in f.read().split('\n') if "Log-likelihood" in line]
                    if log_lines:
                        success_msg += f"- {log_lines[0].strip()}\n"
                except Exception as e:
                    logging.warning(f"Could not read log file: {e}")

            return True, success_msg, aligned_with_query, treefile
        except subprocess.CalledProcessError as e:
            error_msg = e.stderr if e.stderr else "Unknown IQ-TREE error"
            return False, f"IQ-TREE placement failed: {error_msg}", aligned_with_query, None
        except subprocess.TimeoutExpired:
            return False, "IQ-TREE placement timeout (>20 minutes)", aligned_with_query, None
        except FileNotFoundError:
            return False, f"IQ-TREE executable not found: {iqtree_cmd}", aligned_with_query, None
        except Exception as e:
            return False, f"IQ-TREE execution error: {e}", aligned_with_query, None
    except Exception as e:
        logging.error(f"Phylogenetic placement failed: {e}")
        return False, f"Phylogenetic placement failed: {str(e)}", None, None
    finally:
        # Clean up temporary query file; failure to delete is not fatal.
        if query_fasta and os.path.exists(query_fasta):
            try:
                os.unlink(query_fasta)
            except OSError as e:
                logging.debug(f"Could not remove temporary query file {query_fasta}: {e}")
def build_maximum_likelihood_tree(f_gene_sequence):
    """
    Check prerequisites, then run phylogenetic placement for an F gene sequence.

    Returns:
        tuple: ``(success, message, aligned_file, tree_file)``. ``message``
        begins with a dependency checklist; on success the two paths point
        at copies stored under fixed names in the working directory.
    """
    try:
        mafft_available, iqtree_available, mafft_cmd, iqtree_cmd = check_tool_availability()
        alignment_present = os.path.exists(ALIGNMENT_PATH)
        tree_present = os.path.exists(TREE_PATH)

        # One checklist line per prerequisite.
        checklist = [
            "🔍 Checking dependencies...",
            f"✅ MAFFT found and tested: {mafft_cmd}" if mafft_available
            else "❌ MAFFT not found or not executable",
            f"✅ IQ-TREE found and tested: {iqtree_cmd}" if iqtree_available
            else "❌ IQ-TREE not found or not executable",
            "✅ Reference alignment found" if alignment_present
            else f"❌ Reference alignment not found: {ALIGNMENT_PATH}",
            "✅ Reference tree found" if tree_present
            else f"❌ Reference tree not found: {TREE_PATH}",
        ]
        status_msg = "".join(f"{entry}\n" for entry in checklist)

        # A missing tool takes precedence: return the installation guide.
        if not (mafft_available and iqtree_available):
            return False, f"{status_msg}\n{install_dependencies_guide()}", None, None

        if not (alignment_present and tree_present):
            status_msg += "\n❌ Reference alignment and/or tree files are missing.\n"
            status_msg += "Please ensure f_gene_sequences_aligned.fasta and f_gene_sequences.phy.treefile are available."
            return False, status_msg, None, None

        logging.info("Starting phylogenetic placement...")
        placement_success, placement_message, aligned_file, tree_file = phylogenetic_placement(
            f_gene_sequence, mafft_cmd, iqtree_cmd
        )
        combined_message = f"{status_msg}\n{placement_message}"
        if not placement_success:
            return False, combined_message, aligned_file, tree_file

        # Copy files to standard locations for compatibility
        if aligned_file and os.path.exists(aligned_file):
            shutil.copy2(aligned_file, "query_with_references_aligned.fasta")
            aligned_file = "query_with_references_aligned.fasta"
        if tree_file and os.path.exists(tree_file):
            shutil.copy2(tree_file, "query_placement_tree.treefile")
            tree_file = "query_placement_tree.treefile"

        logging.info("Phylogenetic placement completed successfully")
        return True, combined_message, aligned_file, tree_file
    except Exception as exc:
        logging.error(f"ML tree construction failed: {exc}")
        return False, f"ML tree construction failed: {str(exc)}", None, None
# --- NEW Tree Analysis Function (Using the new analyzer API) ---
# Replace this part in your analyze_sequence_for_tree function:
def analyze_sequence_for_tree(sequence: str, matching_percentage: float) -> tuple:
    """
    Analyze a sequence and create an interactive tree plus a detailed report.

    Args:
        sequence (str): DNA sequence to analyze (at least 10 characters after stripping)
        matching_percentage (float): Similarity threshold percentage, 1-99 inclusive

    Returns:
        tuple: (status_message, tree_html_path, report_html_path). Both paths
        are None on failure; report_html_path is also None when the report
        could not be generated or located. Errors are returned in
        status_message rather than raised.
    """
    try:
        if not analyzer:
            return "❌ Error: Tree analyzer not initialized. Please check if the CSV data file is available.", None, None
        if not sequence:
            return "❌ Error: Please provide a sequence.", None, None
        if not (1 <= matching_percentage <= 99):
            return "❌ Error: Matching percentage must be between 1 and 99.", None, None

        # Validate inputs
        sequence = sequence.strip()
        if len(sequence) < 10:
            return "❌ Error: Invalid or missing sequence. Must be ≥10 nucleotides.", None, None

        # Find query sequence
        if not analyzer.find_query_sequence(sequence):
            return "❌ Error: Sequence not accepted.", None, None

        # Find similar sequences
        matched_ids, actual_percentage = analyzer.find_similar_sequences(matching_percentage)
        if not matched_ids:
            return f"❌ Error: No similar sequences found at {matching_percentage}% similarity threshold.", None, None
        logging.info(f"Found {len(matched_ids)} similar sequences at {actual_percentage:.2f}% similarity")

        # Build tree structure
        analyzer.build_tree_structure_with_ml_safe(matched_ids)

        # Create interactive tree
        fig = analyzer.create_interactive_tree(matched_ids, actual_percentage)

        # Save tree to temporary file. The id is reduced to filename-safe
        # characters first: the report name below already guards against '/',
        # and an unsanitised id here would point into a non-existent directory.
        temp_dir = tempfile.gettempdir()
        query_id = analyzer.query_id or f"query_{int(time.time())}"
        safe_query_id = re.sub(r'[^A-Za-z0-9._-]', '_', str(query_id))
        tree_html_path = os.path.join(temp_dir, f'phylogenetic_tree_interactive_{safe_query_id}.html')
        fig.write_html(tree_html_path)

        # Ensure the analyzer has the correct user input threshold for the report
        analyzer.matching_percentage = matching_percentage

        # Generate detailed report
        report_success = analyzer.generate_detailed_report(matched_ids, actual_percentage)
        report_html_path = None
        if report_success:
            # Presumably mirrors the file name chosen inside the analyzer —
            # TODO confirm against generate_detailed_report.
            report_filename = f"detailed_report_{query_id.replace('/', '_')}.html"
            report_html_path = os.path.abspath(report_filename)
            if not os.path.exists(report_html_path):
                logging.warning(f"Report file not found at {report_html_path}")
                report_html_path = None
        else:
            logging.warning("Failed to generate detailed report")

        success_msg = f"✅ Analysis complete! Found {len(matched_ids)} similar sequences with {actual_percentage:.2f}% average similarity."
        return success_msg, tree_html_path, report_html_path
    except Exception as e:
        error_msg = f"❌ Error during analysis: {str(e)}"
        logging.error(error_msg)
        import traceback
        logging.error(f"Full traceback: {traceback.format_exc()}")
        return error_msg, None, None


# --- Keras Prediction ---
def predict_with_keras(sequence):
    """Score a sequence with the Keras model and return a short text verdict.

    Returns "<N>% F gene" on success, or an explanatory message when the
    model is unavailable, the sequence is shorter than 6 characters, or
    prediction raises.
    """
    try:
        if not (keras_model and kmer_to_index):
            return f"Keras model not available. Input sequence: {sequence[:100]}..."
        if len(sequence) < 6:
            return "Skipped: sequence too short for F gene validation (minimum 6 nucleotides required)."

        # Encode every overlapping 6-mer; k-mers missing from the index map to 0.
        encoded = [
            kmer_to_index.get(sequence[start:start + 6], 0)
            for start in range(len(sequence) - 5)
        ]
        batch = np.array([encoded])
        scores = keras_model.predict(batch, verbose=0)[0]

        # The last output value is taken as the F gene probability
        # (adjust the index if the model's outputs differ).
        f_gene_prob = scores[-1]

        # Percentage with a +5 buffer, clamped into the 0-100 range.
        percentage = int(f_gene_prob * 100 + 5)
        if percentage < 0:
            percentage = 0
        elif percentage > 100:
            percentage = 100
        return f"{percentage}% F gene"
    except Exception as exc:
        logging.error(f"Keras prediction failed: {exc}")
        return f"Keras prediction failed: {str(exc)}"
# --- FASTA Reader ---
def read_fasta_file(file_obj):
    """Return the sequence text of a FASTA input with header lines removed.

    Args:
        file_obj: One of
            * None -> "" is returned;
            * a str / os.PathLike naming an existing file -> the file is read;
            * any other str -> treated as FASTA text itself;
            * an object with a ``name`` attribute (upload wrapper) -> that
              path is read;
            * a readable stream yielding bytes or str.

    Returns:
        str: All non-header lines, stripped and concatenated. When the input
        holds several records they are joined into one string. "" is
        returned on any read failure (the error is logged).
    """
    try:
        if file_obj is None:
            return ""

        if isinstance(file_obj, (str, os.PathLike)):
            # A plain path must be opened, not treated as sequence text:
            # previously a str input fell through to str(file_obj) and the
            # path itself became the "sequence".
            if os.path.isfile(file_obj):
                with open(file_obj, "r", encoding="utf-8", errors="replace") as f:
                    content = f.read()
            else:
                content = str(file_obj)
        elif hasattr(file_obj, 'name'):
            with open(file_obj.name, "r", encoding="utf-8", errors="replace") as f:
                content = f.read()
        elif hasattr(file_obj, "read"):
            raw = file_obj.read()
            # Binary streams need decoding; text streams already yield str.
            content = raw.decode("utf-8") if isinstance(raw, bytes) else raw
        else:
            content = str(file_obj)

        # Strip each line first so indented headers and '\r\n' endings are handled.
        stripped_lines = (line.strip() for line in content.splitlines())
        return ''.join(line for line in stripped_lines if not line.startswith(">"))
    except Exception as e:
        logging.error(f"Failed to read FASTA file: {e}")
        return ""
# --- Full Pipeline ---
def run_pipeline_from_file(fasta_file_obj, similarity_score, build_ml_tree):
    """Read a FASTA upload and pass its sequence to ``run_pipeline``.

    Returns the 11-element result tuple of ``run_pipeline``, or an
    11-element error tuple when the file yields no sequence or an
    exception occurs.
    """
    try:
        sequence_text = read_fasta_file(fasta_file_obj)
        if sequence_text:
            return run_pipeline(sequence_text, similarity_score, build_ml_tree)
        return (
            "Failed to read FASTA file", "", "", "", "",
            None, None, None, None,
            "No input sequence", "No input sequence",
        )
    except Exception as exc:
        error_msg = f"Pipeline error: {str(exc)}"
        logging.error(error_msg)
        return error_msg, "", "", "", "", None, None, None, None, error_msg, error_msg
def _publish_html(src_path, dest_path):
    """Copy an HTML artefact to ``dest_path``, return its text, then delete the source."""
    shutil.copy2(src_path, dest_path)
    with open(src_path, 'r', encoding='utf-8') as f:
        content = f.read()
    try:
        os.unlink(src_path)
    except OSError as e:
        # Best-effort cleanup: a leftover temporary file must not fail the run.
        logging.debug(f"Could not remove temporary file {src_path}: {e}")
    return content


def run_pipeline(dna_input, similarity_score=95.0, build_ml_tree=False):
    """Run the full analysis pipeline on a DNA sequence.

    Steps: (1) boundary model extracts the F gene region, (2) Keras model
    validates it, (3) optional MAFFT/IQ-TREE phylogenetic placement,
    (4) similarity-based interactive tree and detailed report.

    Args:
        dna_input: Raw sequence text. Whitespace is removed, the text is
            upper-cased, and any character outside ACTGN is replaced by 'N'.
        similarity_score: Similarity threshold (percent) for the tree analysis.
        build_ml_tree: When True, run phylogenetic placement (needs >= 100 bp).

    Returns:
        tuple: Always 11 elements, matching the UI outputs:
        (boundary_output, keras_output, ml_tree_output, simplified_ml_output,
        summary_output, aligned_file, phy_file, tree_html_file,
        report_html_file, tree_html_content, report_html_content).
        Failures are reported inside the tuple, never raised.
    """
    try:
        # Clean input: remove ALL whitespace so a pasted multi-line sequence
        # is joined, instead of every line break being turned into an 'N'.
        dna_input = re.sub(r'\s+', '', dna_input or "").upper()
        if not dna_input:
            # Must be 11 values like every other return (it used to be 10).
            return (
                "Empty input", "", "", "", "",
                None, None, None, None,
                "No input provided", "No input provided",
            )

        # Sanitize DNA sequence
        if not re.fullmatch('[ACTGN]+', dna_input):
            dna_input = ''.join(c if c in 'ACTGN' else 'N' for c in dna_input)
            logging.info("DNA sequence sanitized")

        # Step 1: Boundary Prediction - Extract F gene sequence
        processed_sequence = dna_input
        boundary_output = ""
        if boundary_model:
            try:
                result = boundary_model.predict_sequence(dna_input)
                confidence = result['confidence']
                regions = result['gene_regions']
                if regions:
                    # Only the first predicted region is used downstream.
                    processed_sequence = regions[0]["sequence"]
                    boundary_output = processed_sequence
                    logging.info(f"F gene extracted: {len(processed_sequence)} bp (confidence: {confidence:.3f})")
                else:
                    boundary_output = "No F gene regions found in input sequence"
                    processed_sequence = dna_input
                    logging.warning("No gene regions found, using full sequence")
                logging.info("Boundary model prediction completed")
            except Exception as e:
                logging.error(f"Boundary model failed: {e}")
                boundary_output = f"Boundary model error: {str(e)}"
                processed_sequence = dna_input
        else:
            boundary_output = f"Boundary model not available. Using original input: {len(dna_input)} bp"

        # Step 2: Keras Prediction (F gene validation)
        if processed_sequence and len(processed_sequence) >= 6:
            keras_output = predict_with_keras(processed_sequence)
        else:
            keras_output = "Skipped: sequence too short for F gene validation"

        # Step 3: Maximum Likelihood Tree (Phylogenetic Placement)
        aligned_file = None
        phy_file = None
        ml_tree_output = ""
        if build_ml_tree and processed_sequence and len(processed_sequence) >= 100:
            try:
                logging.info("Starting phylogenetic placement...")
                ml_success, ml_message, ml_aligned, ml_tree = build_maximum_likelihood_tree(processed_sequence)
                ml_tree_output = ml_message
                if ml_success:
                    aligned_file = ml_aligned
                    phy_file = ml_tree
            except Exception as e:
                ml_tree_output = f"❌ Phylogenetic placement failed: {str(e)}"
                logging.error(f"Phylogenetic placement failed: {e}")
        elif build_ml_tree:
            ml_tree_output = "❌ F gene sequence too short for phylogenetic placement (minimum 100 bp)"
        else:
            ml_tree_output = "Phylogenetic placement skipped (not requested)"

        # Step 4: Simplified Tree Analysis
        tree_html_file = None
        report_html_file = None
        tree_html_content = "No tree generated"
        report_html_content = "No report generated"
        simplified_ml_output = ""
        if analyzer and processed_sequence and len(processed_sequence) >= 10:
            try:
                logging.info(f"Starting simplified ML tree analysis with F gene sequence length: {len(processed_sequence)}")
                tree_result, tree_html_path, report_html_path = analyze_sequence_for_tree(processed_sequence, similarity_score)

                # Computed up front so the report branch and the final log
                # line never reference names that only exist when a tree
                # file was produced (this used to raise NameError and hide
                # the analyzer's own message).
                output_dir = "output"
                safe_seq_name = re.sub(r'[^a-zA-Z0-9_-]', '', processed_sequence[:20])
                timestamp = str(int(time.time()))

                if tree_html_path and os.path.exists(tree_html_path):
                    os.makedirs(output_dir, exist_ok=True)
                    tree_html_file = os.path.join(output_dir, f"tree_{safe_seq_name}_{timestamp}.html")
                    tree_html_content = _publish_html(tree_html_path, tree_html_file)

                if report_html_path and os.path.exists(report_html_path):
                    os.makedirs(output_dir, exist_ok=True)
                    report_html_file = os.path.join(output_dir, f"report_{safe_seq_name}_{timestamp}.html")
                    report_html_content = _publish_html(report_html_path, report_html_file)

                simplified_ml_output = tree_result
                if not tree_html_file:
                    tree_html_content = f"<div style='color: red;'>{tree_result}</div>"
                if not report_html_file:
                    report_html_content = f"<div style='color: red;'>{tree_result}</div>"

                if tree_html_file:
                    logging.info(f"Tree analysis completed successfully: {os.path.basename(tree_html_file)}")
                else:
                    logging.warning(f"Tree analysis produced no tree: {tree_result}")
            except Exception as e:
                error_msg = f"❌ Tree analysis failed: {str(e)}"
                simplified_ml_output = error_msg
                tree_html_content = f"<div style='color: red;'>{error_msg}</div>"
                report_html_content = f"<div style='color: red;'>{error_msg}</div>"
                logging.error(f"Tree analysis failed: {e}")
        else:
            if not analyzer:
                simplified_ml_output = "❌ Tree analyzer not available (CSV data not loaded)"
            elif not processed_sequence:
                simplified_ml_output = "❌ No processed sequence available for tree analysis"
            else:
                simplified_ml_output = "❌ F gene sequence too short for tree analysis (minimum 10 bp)"
            tree_html_content = f"<div style='color: orange;'>{simplified_ml_output}</div>"
            report_html_content = f"<div style='color: orange;'>{simplified_ml_output}</div>"

        # Final summary: status labels are derived from the step messages.
        if 'successfully' in ml_tree_output:
            placement_status = '✅ Completed'
        else:
            placement_status = '❌ ' + ('Skipped' if 'skipped' in ml_tree_output else 'Failed')
        if '✅' in simplified_ml_output:
            analysis_status = '✅ Completed'
        else:
            analysis_status = '❌ ' + ('Not available' if 'not available' in simplified_ml_output else 'Failed')
        report_status = '✅ Generated' if report_html_file else '❌ Not generated'

        summary_output = f"""
🧬 ANALYSIS SUMMARY:
━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
📊 INPUT: {len(dna_input)} bp DNA sequence
🎯 F GENE EXTRACTED: {len(processed_sequence)} bp
✅ F GENE VALIDATION: {keras_output}
🌳 PHYLOGENETIC PLACEMENT: {placement_status}
🔬 TREE ANALYSIS: {analysis_status}
📝 DETAILED REPORT: {report_status}
━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
"""
        return (
            boundary_output,
            keras_output,
            ml_tree_output,
            simplified_ml_output,
            summary_output,
            aligned_file,
            phy_file,
            tree_html_file,
            report_html_file,
            tree_html_content,
            report_html_content
        )
    except Exception as e:
        error_msg = f"Pipeline error: {str(e)}"
        logging.error(error_msg)
        import traceback
        logging.error(f"Full traceback: {traceback.format_exc()}")
        return error_msg, "", "", "", "", None, None, None, None, error_msg, error_msg
# --- Gradio Interface ---
def create_interface():
    """Create and configure the Gradio interface.

    Builds a ``gr.Blocks`` layout with a sequence input area (text box or
    FASTA upload), analysis settings, result tabs and download slots, and
    wires the two analyse buttons to ``run_pipeline`` and
    ``run_pipeline_from_file``.

    Returns:
        The configured ``gr.Blocks`` object (not yet launched).
    """
    # Custom styling for buttons, text inputs and the status boxes.
    custom_css = """
.gradio-container {
font-family: 'Segoe UI', Tahoma, Geneva, Verdana, sans-serif;
}
.gr-button-primary {
background: linear-gradient(45deg, #1e3a8a, #3b82f6);
border: none;
border-radius: 8px;
font-weight: 600;
}
.gr-button-primary:hover {
background: linear-gradient(45deg, #1e40af, #2563eb);
transform: translateY(-1px);
box-shadow: 0 4px 12px rgba(59, 130, 246, 0.4);
}
.gr-textbox, .gr-textarea {
border-radius: 8px;
border: 2px solid #e5e7eb;
}
.gr-textbox:focus, .gr-textarea:focus {
border-color: #3b82f6;
box-shadow: 0 0 0 3px rgba(59, 130, 246, 0.1);
}
.warning-box {
background: linear-gradient(135deg, #fef3c7, #fbbf24);
border: 1px solid #f59e0b;
border-radius: 8px;
padding: 12px;
margin: 8px 0;
}
.success-box {
background: linear-gradient(135deg, #d1fae5, #10b981);
border: 1px solid #059669;
border-radius: 8px;
padding: 12px;
margin: 8px 0;
}
.error-box {
background: linear-gradient(135deg, #fee2e2, #ef4444);
border: 1px solid #dc2626;
border-radius: 8px;
padding: 12px;
margin: 8px 0;
}
"""
    with gr.Blocks(css=custom_css, title="🧬 Advanced Gene Analysis Pipeline", theme=gr.themes.Soft()) as iface:
        # Page banner.
        gr.HTML("""
<div style="text-align: center; padding: 20px; background: linear-gradient(135deg, #667eea 0%, #764ba2 100%); border-radius: 15px; margin-bottom: 20px;">
<h1 style="color: white; margin: 0; font-size: 2.5em; font-weight: 700;">🧬 Advanced Gene Analysis Pipeline</h1>
<p style="color: rgba(255,255,255,0.9); margin: 10px 0 0 0; font-size: 1.2em;">F Gene Boundary Detection • Validation • Phylogenetic Analysis</p>
</div>
""")
        # Collapsible usage instructions.
        # NOTE(review): the text below advertises a 70-99% similarity range,
        # while the slider further down allows 30-99 — confirm which is intended.
        with gr.Accordion("📋 Instructions & Information", open=False):
            gr.HTML("""
<div style="background: #f8fafc; padding: 20px; border-radius: 10px; border-left: 4px solid #3b82f6;">
<h3 style="color: #1e40af; margin-top: 0;">🔬 Pipeline Overview</h3>
<ol style="line-height: 1.6;">
<li><strong>F Gene Extraction:</strong> Uses boundary-aware model to identify and extract F gene regions</li>
<li><strong>Gene Validation:</strong> Validates extracted sequence as F gene using deep learning</li>
<li><strong>Phylogenetic Placement:</strong> Places sequence in reference phylogenetic tree (MAFFT + IQ-TREE)</li>
<li><strong>Interactive Tree Analysis:</strong> Creates interactive phylogenetic tree with similar sequences</li>
<li><strong>Detailed Report:</strong> Provides comprehensive analysis details</li>
</ol>
<h3 style="color: #1e40af;">📁 Input Requirements</h3>
<ul style="line-height: 1.6;">
<li><strong>DNA Sequence:</strong> Minimum 100 bp for phylogenetic analysis</li>
<li><strong>FASTA Format:</strong> Supported for file uploads</li>
<li><strong>Similarity Score:</strong> 70-99% (default: 95%)</li>
</ul>
<h3 style="color: #1e40af;">⚙️ Dependencies</h3>
<p style="background: #fef3c7; padding: 10px; border-radius: 5px; border-left: 3px solid #f59e0b;">
<strong>Required:</strong> MAFFT and IQ-TREE must be installed for phylogenetic analysis.<br>
<strong>Installation:</strong> <code>conda install -c bioconda mafft iqtree</code>
</p>
</div>
""")
        # Input area: sequence on the left, settings on the right.
        with gr.Row():
            with gr.Column(scale=2):
                gr.HTML("<h3 style='color: #1e40af; margin-bottom: 10px;'>📝 Sequence Input</h3>")
                with gr.Tabs():
                    with gr.TabItem("✍️ Text Input"):
                        dna_input = gr.Textbox(
                            label="DNA Sequence",
                            placeholder="Enter your DNA sequence here (A, T, C, G, N)...",
                            lines=6,
                            value=""
                        )
                    with gr.TabItem("📁 File Upload"):
                        # presumably type="filepath" hands the handler a path
                        # string rather than a file object — confirm against
                        # the Gradio File documentation
                        fasta_file = gr.File(
                            label="Upload FASTA File",
                            file_types=[".fasta", ".fa", ".fas", ".txt"],
                            type="filepath"
                        )
            with gr.Column(scale=1):
                gr.HTML("<h3 style='color: #1e40af; margin-bottom: 10px;'>⚙️ Analysis Settings</h3>")
                similarity_score = gr.Slider(
                    minimum=30.0,
                    maximum=99.0,
                    value=95.0,
                    step=1.0,
                    label="Similarity Threshold (%)"
                )
                build_ml_tree = gr.Checkbox(
                    label="🌳 Enable Phylogenetic Placement",
                    value=False
                )
        # Action buttons, one per input mode.
        with gr.Row():
            analyze_text_btn = gr.Button(
                "🚀 Analyze Text Input",
                variant="primary",
                size="lg"
            )
            analyze_file_btn = gr.Button(
                "📁 Analyze File",
                variant="secondary",
                size="lg"
            )
        gr.HTML("<hr style='margin: 30px 0; border: none; height: 2px; background: linear-gradient(to right, #3b82f6, #8b5cf6);'>")
        gr.HTML("<h2 style='color: #1e40af; text-align: center; margin-bottom: 20px;'>📊 Analysis Results</h2>")
        # Result tabs, one per pipeline stage.
        with gr.Tabs():
            with gr.TabItem("🎯 F Gene Extraction"):
                f_gene_output = gr.Textbox(
                    label="Extracted F Gene Sequence",
                    lines=8
                )
            with gr.TabItem("✅ Gene Validation"):
                keras_output = gr.Textbox(
                    label="F Gene Validation Result",
                    lines=3
                )
            with gr.TabItem("🌳 Phylogenetic Placement"):
                ml_tree_output = gr.Textbox(
                    label="Phylogenetic Placement Results",
                    lines=10
                )
            with gr.TabItem("🔬 Interactive Tree"):
                tree_analysis_output = gr.Textbox(
                    label="Tree Analysis Status",
                    lines=5
                )
                tree_html_display = gr.HTML(
                    label="Interactive Phylogenetic Tree",
                    value="<div style='text-align: center; color: #6b7280; padding: 40px;'>No tree generated yet. Run analysis to create interactive tree.</div>"
                )
            with gr.TabItem("📝 Detailed Report"):
                report_html_display = gr.HTML(
                    label="Detailed Analysis Report",
                    value="<div style='text-align: center; color: #6b7280; padding: 40px;'>No report generated yet. Run analysis to create detailed report.</div>"
                )
            with gr.TabItem("📋 Summary"):
                summary_output = gr.Textbox(
                    label="Analysis Summary",
                    lines=12
                )
        # Download slots for the files produced by the pipeline.
        with gr.Accordion("💾 Download Results", open=False):
            with gr.Row():
                alignment_file = gr.File(
                    label="📄 Download Alignment",
                    visible=True
                )
                tree_file = gr.File(
                    label="🌳 Download Tree",
                    visible=True
                )
                html_tree_file = gr.File(
                    label="🌐 Download Interactive Tree (HTML)",
                    visible=True
                )
                report_file = gr.File(
                    label="📝 Download Detailed Report (HTML)",
                    visible=True
                )
        # Page footer.
        gr.HTML("""
<div style="text-align: center; padding: 20px; margin-top: 30px; border-top: 2px solid #e5e7eb; color: #6b7280;">
<p style="margin: 0;">🧬 Advanced Gene Analysis Pipeline | Powered by Deep Learning & Phylogenetics</p>
<p style="margin: 5px 0 0 0; font-size: 0.9em;">Built with Gradio • MAFFT • IQ-TREE • TensorFlow</p>
</div>
""")
        # Both handlers feed the same 11 output components, in the same
        # order as the tuples returned by the pipeline functions.
        analyze_text_btn.click(
            fn=run_pipeline,
            inputs=[dna_input, similarity_score, build_ml_tree],
            outputs=[
                f_gene_output,
                keras_output,
                ml_tree_output,
                tree_analysis_output,
                summary_output,
                alignment_file,
                tree_file,
                html_tree_file,
                report_file,
                tree_html_display,
                report_html_display
            ]
        )
        analyze_file_btn.click(
            fn=run_pipeline_from_file,
            inputs=[fasta_file, similarity_score, build_ml_tree],
            outputs=[
                f_gene_output,
                keras_output,
                ml_tree_output,
                tree_analysis_output,
                summary_output,
                alignment_file,
                tree_file,
                html_tree_file,
                report_file,
                tree_html_display,
                report_html_display
            ]
        )
    return iface
# --- Main Execution ---
if __name__ == "__main__":
    # Script entry point: print a status banner, then serve the Gradio UI.
    try:
        print("🧬 Advanced Gene Analysis Pipeline")
        print("=" * 50)
        print(f"Base Directory: {BASE_DIR}")

        # One status line per optional component loaded at import time.
        for label, component in (
            ("Boundary Model", boundary_model),
            ("Keras Model", keras_model),
            ("Tree Analyzer", analyzer),
        ):
            print(f"{label}: {'✅ Loaded' if component else '❌ Not Available'}")

        # Probe the external command-line tools.
        mafft_available, iqtree_available, mafft_cmd, iqtree_cmd = check_tool_availability()
        print(f"MAFFT: {'✅ Available' if mafft_available else '❌ Not Found'}")
        print(f"IQ-TREE: {'✅ Available' if iqtree_available else '❌ Not Found'}")
        if not (mafft_available and iqtree_available):
            print("\n⚠️ Warning: Some phylogenetic tools are missing!")
            print("Install with: conda install -c bioconda mafft iqtree")

        print("\n🚀 Starting Gradio interface...")
        launch_options = {
            "share": False,  # Set to True to create a public link
            "server_name": "0.0.0.0",  # Listen on all network interfaces
            "server_port": 7860,
            "show_error": True,  # Show errors in the interface
        }
        iface = create_interface()
        iface.queue().launch(**launch_options)
    except Exception as e:
        logging.error(f"Failed to start application: {e}")
        import traceback
        print(f"Error: {e}")
        print(f"Traceback: {traceback.format_exc()}")
        sys.exit(1)