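# app.py ("Update app.py", commit 3c94bd8 by re-type, 38.4 kB)
# NOTE: the lines above this file's first import were web-page header residue
# (avatar caption, "raw", "history blame"); they are kept here only as a comment.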
import os
# Hide every CUDA device so the ML frameworks imported further down run on CPU.
# These variables are set before torch / TensorFlow are imported.
os.environ["CUDA_VISIBLE_DEVICES"] = ""
os.environ["TF_FORCE_GPU_ALLOW_GROWTH"] = "true"  # Prevent TensorFlow memory issues
# Raise TensorFlow's native log threshold to cut console noise.
os.environ['TF_CPP_MIN_LOG_LEVEL'] = '3'  # More aggressive suppression
import gradio as gr
import torch
import pickle
import subprocess
import pandas as pd
import re
import logging
import numpy as np
from predictor import EnhancedGenePredictor
from tensorflow.keras.models import load_model
from analyzer import PhylogeneticTreeAnalyzer
import tempfile
import shutil
import sys
import uuid
from pathlib import Path
from huggingface_hub import hf_hub_download
from Bio import SeqIO
from Bio.Seq import Seq
from Bio.SeqRecord import SeqRecord
import stat
import time
import asyncio
from fastapi import FastAPI, File, UploadFile, Form, HTTPException
from fastapi.responses import HTMLResponse, FileResponse
from pydantic import BaseModel
from typing import Optional
import uvicorn
# --- Logging Setup ---
# One formatter shared by the console handler and (when possible) the file handler.
log_formatter = logging.Formatter('%(asctime)s - %(levelname)s - %(message)s')

log_handler = logging.StreamHandler()
log_handler.setFormatter(log_formatter)

try:
    # Also log to /tmp/app.log; fall back to console-only if that fails.
    file_handler = logging.FileHandler('/tmp/app.log')
    file_handler.setFormatter(log_formatter)
    logging.basicConfig(
        level=logging.INFO,
        handlers=[log_handler, file_handler],
    )
except Exception as file_log_error:
    logging.basicConfig(level=logging.INFO, handlers=[log_handler])
    logging.warning(f"Failed to set up file logging: {file_log_error}")

logger = logging.getLogger(__name__)
logger.info(f"Gradio version: {gr.__version__}")

# Set event loop policy for compatibility with Gradio Spaces
try:
    default_policy = asyncio.DefaultEventLoopPolicy()
    asyncio.set_event_loop_policy(default_policy)
except Exception as policy_error:
    logger.warning(f"Failed to set event loop policy: {policy_error}")
# --- Global Variables ---
# Directory that contains this file; every bundled resource is resolved from it.
BASE_DIR = os.path.dirname(os.path.abspath(__file__))


def _under_base(*parts):
    """Join *parts* onto BASE_DIR."""
    return os.path.join(BASE_DIR, *parts)


# Bundled command-line tools.
MAFFT_PATH = _under_base("binaries", "mafft", "mafft")
IQTREE_PATH = _under_base("binaries", "iqtree", "bin", "iqtree3")

# Reference alignment and tree used for phylogenetic placement.
ALIGNMENT_PATH = _under_base("f_gene_sequences_aligned.fasta")
TREE_PATH = _under_base("f_gene_sequences.phy.treefile")

# Per-query working files are written here; created on import if missing.
QUERY_OUTPUT_DIR = _under_base("queries")
os.makedirs(QUERY_OUTPUT_DIR, exist_ok=True)

# Model repository and file paths
MODEL_REPO = "GGproject10/best_boundary_aware_model"
CSV_PATH = "f cleaned.csv"

# Initialize models as None
boundary_model = keras_model = kmer_to_index = analyzer = None
# --- Model Loading ---
def load_models_safely():
    """Fill the module-level model globals, tolerating individual failures.

    Three stages run in order, each inside its own ``try`` so that one failure
    does not stop the others:

    1. download the boundary checkpoint from ``MODEL_REPO`` and wrap it in
       ``EnhancedGenePredictor``;
    2. download the Keras model and the pickled k-mer index;
    3. create a ``PhylogeneticTreeAnalyzer``, load the first CSV candidate that
       works, then train its AI model.

    A stage that raises logs the error and resets its global(s) to ``None``.
    Nothing is returned.
    """
    global boundary_model, keras_model, kmer_to_index, analyzer
    logger.info("🔍 Loading models...")

    # --- Stage 1: boundary model ---
    try:
        boundary_path = hf_hub_download(
            repo_id=MODEL_REPO,
            filename="best_boundary_aware_model.pth",
            token=None,
        )
        if not os.path.exists(boundary_path):
            logger.error("❌ Boundary model file not found after download.")
        else:
            boundary_model = EnhancedGenePredictor(boundary_path)
            logger.info("✅ Boundary model loaded successfully.")
    except Exception as boundary_error:
        logger.error(f"❌ Failed to load boundary model: {boundary_error}")
        boundary_model = None

    # --- Stage 2: Keras model + k-mer index ---
    try:
        keras_path = hf_hub_download(
            repo_id=MODEL_REPO, filename="best_model.keras", token=None
        )
        kmer_path = hf_hub_download(
            repo_id=MODEL_REPO, filename="kmer_to_index.pkl", token=None
        )
        both_present = os.path.exists(keras_path) and os.path.exists(kmer_path)
        if not both_present:
            logger.error("❌ Keras model or k-mer files not found.")
        else:
            keras_model = load_model(keras_path)
            # NOTE(review): pickle.load executes code embedded in the file; this
            # is only safe as long as MODEL_REPO is a trusted repository.
            with open(kmer_path, "rb") as kmer_file:
                kmer_to_index = pickle.load(kmer_file)
            logger.info("✅ Keras model and k-mer index loaded successfully.")
    except Exception as keras_error:
        logger.error(f"❌ Failed to load Keras model: {keras_error}")
        keras_model = None
        kmer_to_index = None

    # --- Stage 3: tree analyzer ---
    try:
        logger.info("🌳 Initializing tree analyzer...")
        analyzer = PhylogeneticTreeAnalyzer()

        # Candidate locations are tried in this order; the first that loads wins.
        csv_candidates = [
            CSV_PATH,
            os.path.join(BASE_DIR, CSV_PATH),
            os.path.join(BASE_DIR, "app", CSV_PATH),
            os.path.join(os.path.dirname(__file__), CSV_PATH),
            "f_cleaned.csv",
            os.path.join(BASE_DIR, "f_cleaned.csv"),
        ]
        for candidate in csv_candidates:
            if not os.path.exists(candidate):
                continue
            logger.info(f"📊 Trying CSV: {candidate}")
            try:
                loaded = analyzer.load_data(candidate)
            except Exception as load_error:
                logger.warning(f"CSV load failed for {candidate}: {load_error}")
                continue
            if loaded:
                logger.info(f"✅ CSV loaded from: {candidate}")
                break
        else:
            # No candidate could be loaded: the analyzer is unusable.
            logger.error("❌ Failed to load CSV data from any candidate location.")
            analyzer = None
            return

        # Training is best-effort; the analyzer stays available if it fails.
        try:
            trained = analyzer.train_ai_model()
        except Exception as train_error:
            logger.warning(f"⚠️ AI model training failed: {train_error}")
        else:
            if trained:
                logger.info("✅ AI model training completed successfully")
            else:
                logger.warning("⚠️ AI model training failed; proceeding with basic analysis.")
    except Exception as analyzer_error:
        logger.error(f"❌ Tree analyzer initialization failed: {analyzer_error}")
        analyzer = None
# Load models once at import time so the module-level globals
# (boundary_model, keras_model, kmer_to_index, analyzer) are populated
# before any handler below reads them.
load_models_safely()
# --- Tool Detection ---
def setup_binary_permissions():
    """Add the owner-execute bit to the bundled MAFFT / IQ-TREE binaries.

    Binaries that do not exist are skipped; a failing ``chmod`` is logged as a
    warning and does not raise.
    """
    for tool_path in (MAFFT_PATH, IQTREE_PATH):
        if not os.path.exists(tool_path):
            continue
        try:
            current_mode = os.stat(tool_path).st_mode
            os.chmod(tool_path, current_mode | stat.S_IEXEC)
            logger.info(f"Set executable permission on {tool_path}")
        except Exception as perm_error:
            logger.warning(f"Failed to set permission on {tool_path}: {perm_error}")
def _probe_tool(candidates, marker, label):
    """Return the first command in *candidates* that answers ``--help``.

    A candidate is tried only if it is on PATH or exists as a file path. It is
    accepted when ``--help`` exits with status 0 or when *marker* appears in
    its stderr (some tools print usage to stderr with a non-zero status).
    Returns ``None`` when no candidate qualifies. *label* is used in log text.
    """
    for candidate in candidates:
        if not (shutil.which(candidate) or os.path.exists(candidate)):
            continue
        try:
            result = subprocess.run(
                [candidate, "--help"],
                capture_output=True,
                text=True,
                timeout=5,
            )
        except Exception as e:
            logger.debug(f"{label} test failed for {candidate}: {e}")
            continue
        if result.returncode == 0 or marker in result.stderr.lower():
            logger.info(f"✅ {label} found at: {candidate}")
            return candidate
    return None


def check_tool_availability():
    """Locate working MAFFT and IQ-TREE executables.

    Bundled binaries are first made executable, then system-wide names and the
    bundled paths are probed in order.

    Returns:
        ``(mafft_available, iqtree_available, mafft_cmd, iqtree_cmd)`` where
        the ``*_cmd`` entries are the accepted command strings or ``None``.
    """
    setup_binary_permissions()

    mafft_cmd = _probe_tool(
        ['mafft', '/usr/bin/mafft', '/usr/local/bin/mafft', MAFFT_PATH],
        marker="mafft",
        label="MAFFT",
    )
    iqtree_cmd = _probe_tool(
        ['iqtree', 'iqtree2', 'iqtree3', '/usr/bin/iqtree', '/usr/local/bin/iqtree', IQTREE_PATH],
        marker="iqtree",
        label="IQ-TREE",
    )
    return mafft_cmd is not None, iqtree_cmd is not None, mafft_cmd, iqtree_cmd
# --- Pipeline Functions ---
def cleanup_file(file_path: str) -> None:
    """Remove *file_path* if it is set and exists; log instead of raising."""
    # Nothing to do for None / "" or for a path that is not on disk.
    if not file_path or not os.path.exists(file_path):
        return
    try:
        os.unlink(file_path)
    except Exception as cleanup_error:
        logger.warning(f"Failed to clean up {file_path}: {cleanup_error}")
    else:
        logger.debug(f"Cleaned up {file_path}")
def phylogenetic_placement(sequence: str, mafft_cmd: str, iqtree_cmd: str):
    """Place *sequence* into the reference tree with MAFFT and IQ-TREE.

    The query is written to a FASTA file in ``QUERY_OUTPUT_DIR``, added to the
    reference alignment with ``mafft --add``, and IQ-TREE is then run on the
    combined alignment with the reference tree as a constraint (``-g``).

    Args:
        sequence: Query nucleotide sequence; surrounding whitespace is ignored.
        mafft_cmd: Command or path used to invoke MAFFT.
        iqtree_cmd: Command or path used to invoke IQ-TREE.

    Returns:
        ``(success, message, aligned_path, tree_path)``. The paths are ``None``
        unless the corresponding file was produced and is being handed back.
        Never raises; failures are reported through ``success``/``message``.
    """
    sequence = (sequence or "").strip()
    if len(sequence) < 100:
        return False, "Sequence too short (<100 bp).", None, None

    query_fasta = None
    aligned_with_query = None
    keep_alignment = False  # True only when the alignment path is returned
    try:
        if not os.path.exists(ALIGNMENT_PATH) or not os.path.exists(TREE_PATH):
            return False, "Reference alignment or tree not found.", None, None

        query_id = f"QUERY_{uuid.uuid4().hex[:8]}"
        query_fasta = os.path.join(QUERY_OUTPUT_DIR, f"{query_id}.fa")
        aligned_with_query = os.path.join(QUERY_OUTPUT_DIR, f"{query_id}_aligned.fa")
        output_prefix = os.path.join(QUERY_OUTPUT_DIR, f"{query_id}_placed_tree")

        query_record = SeqRecord(Seq(sequence.upper()), id=query_id, description="")
        SeqIO.write([query_record], query_fasta, "fasta")

        # MAFFT writes the alignment to stdout, so stdout goes to the file.
        with open(aligned_with_query, "w") as output_file:
            subprocess.run(
                [mafft_cmd, "--add", query_fasta, "--reorder", ALIGNMENT_PATH],
                stdout=output_file,
                stderr=subprocess.PIPE,
                text=True,
                timeout=600,
                check=True,
            )
        if not os.path.exists(aligned_with_query) or os.path.getsize(aligned_with_query) == 0:
            return False, "MAFFT alignment failed.", None, None

        subprocess.run(
            [iqtree_cmd, "-s", aligned_with_query, "-g", TREE_PATH, "-m", "GTR+G", "-pre", output_prefix, "-redo"],
            capture_output=True,
            text=True,
            timeout=1200,
            check=True,
        )
        treefile = f"{output_prefix}.treefile"
        if not os.path.exists(treefile):
            keep_alignment = True
            return False, "IQ-TREE placement failed.", aligned_with_query, None

        success_msg = f"Placement completed!\nQuery ID: {query_id}\nAlignment: {os.path.basename(aligned_with_query)}\nTree: {os.path.basename(treefile)}"
        keep_alignment = True
        return True, success_msg, aligned_with_query, treefile
    except subprocess.CalledProcessError as tool_error:
        # Surface the end of the tool's stderr; the bare exception text only
        # contains the command line and exit status.
        stderr_tail = (tool_error.stderr or "").strip()[-500:]
        logger.error(f"Phylogenetic placement failed: {tool_error}\n{stderr_tail}", exc_info=True)
        detail = f"\n{stderr_tail}" if stderr_tail else ""
        return False, f"Error: {str(tool_error)}{detail}", None, None
    except Exception as main_error:
        logger.error(f"Phylogenetic placement failed: {main_error}", exc_info=True)
        return False, f"Error: {str(main_error)}", None, None
    finally:
        # The query FASTA is always temporary; the alignment is removed too
        # unless its path was handed back to the caller.
        if query_fasta:
            cleanup_file(query_fasta)
        if aligned_with_query and not keep_alignment:
            cleanup_file(aligned_with_query)
def analyze_sequence_for_tree(sequence: str, matching_percentage: float):
    """Run the similarity-based tree analysis for *sequence*.

    Uses the module-level ``analyzer`` to find sequences similar to the query,
    build a tree, write it as HTML under ``/tmp`` and generate a report.

    Returns:
        ``(message, tree_html_path, report_html_path)``. Both paths are
        ``None`` on failure; the report path is ``None`` when report
        generation reports failure. Never raises.
    """

    def failure(message):
        return message, None, None

    try:
        logger.debug("Starting tree analysis...")

        # --- input validation ---
        if not analyzer:
            return failure("❌ Tree analyzer not initialized.")
        if not sequence or len(sequence.strip()) < 10:
            return failure("❌ Invalid sequence.")
        if not (1 <= matching_percentage <= 99):
            return failure("❌ Matching percentage must be 1-99.")

        logger.debug("Finding query sequence...")
        accepted = analyzer.find_query_sequence(sequence)
        if not accepted:
            return failure("❌ Sequence not accepted.")

        logger.debug("Finding similar sequences...")
        matched_ids, actual_percentage = analyzer.find_similar_sequences(matching_percentage)
        if not matched_ids:
            return failure(f"❌ No similar sequences at {matching_percentage}% threshold.")

        logger.debug("Building tree structure...")
        analyzer.build_tree_structure_with_ml_safe(matched_ids)

        logger.debug("Creating interactive tree...")
        figure = analyzer.create_interactive_tree(matched_ids, actual_percentage)

        # Fall back to a timestamp-based id when the analyzer did not set one.
        query_id = analyzer.query_id or f"query_{int(time.time())}"
        tree_html_path = os.path.join("/tmp", f'phylogenetic_tree_{query_id}.html')
        logger.debug(f"Saving tree to {tree_html_path}")
        figure.write_html(tree_html_path)

        analyzer.matching_percentage = matching_percentage
        logger.debug("Generating detailed report...")
        if analyzer.generate_detailed_report(matched_ids, actual_percentage):
            # presumably the analyzer writes the report to this path — verify
            report_html_path = os.path.join("/tmp", f'detailed_report_{query_id}.html')
        else:
            report_html_path = None

        match_count = len(matched_ids)
        logger.debug(f"Tree analysis completed: {match_count} matches")
        summary = f"✅ Found {match_count} sequences at {actual_percentage:.2f}% similarity."
        return summary, tree_html_path, report_html_path
    except Exception as e:
        logger.error(f"Tree analysis failed: {e}", exc_info=True)
        return failure(f"❌ Error: {str(e)}")
def predict_with_keras(sequence):
    """Score *sequence* with the Keras model and return a status string.

    The sequence is split into overlapping 6-mers, each mapped to its index in
    ``kmer_to_index`` (unknown k-mers map to 0), and fed to ``keras_model`` as
    a single-row batch. The last element of the prediction is reported as a
    percentage. Never raises; errors are returned as a message.
    """
    try:
        if not keras_model or not kmer_to_index:
            return "❌ Keras model not available."

        window = 6
        if len(sequence) < window:
            return "❌ Sequence too short (<6 bp)."

        encoded = [
            kmer_to_index.get(sequence[start:start + window], 0)
            for start in range(len(sequence) - window + 1)
        ]
        batch = np.array([encoded])
        scores = keras_model.predict(batch, verbose=0)[0]
        f_gene_prob = scores[-1]

        # NOTE(review): the "+ 5" raises the reported value by five percentage
        # points before clamping to 0..100 — confirm this offset is intended.
        raw_percentage = int(f_gene_prob * 100 + 5)
        percentage = min(100, max(0, raw_percentage))
        return f"✅ {percentage}% F gene confidence"
    except Exception as e:
        logger.error(f"Keras prediction failed: {e}", exc_info=True)
        return f"❌ Error: {str(e)}"
def read_fasta_file(file_obj):
    """Return the sequence text of a FASTA source as one string.

    Args:
        file_obj: ``None``, a filesystem path (``str`` / ``os.PathLike``), or a
            file-like object whose ``read()`` returns ``bytes`` or ``str``.

    Returns:
        All non-header lines stripped and concatenated. Header lines are those
        whose first non-blank character is ``>``. If the input has several
        records their sequences are concatenated. Returns ``""`` for ``None``
        or when reading/decoding fails (the error is logged).
    """
    if file_obj is None:
        return ""
    try:
        if isinstance(file_obj, (str, os.PathLike)):
            with open(file_obj, "rb") as handle:
                raw = handle.read()
        else:
            raw = file_obj.read()

        # Accept both binary and text-mode sources; drop a UTF-8 BOM if present.
        if isinstance(raw, (bytes, bytearray)):
            content = bytes(raw).decode("utf-8-sig")
        else:
            content = str(raw).lstrip("\ufeff")

        # splitlines() copes with \n, \r\n and \r line endings alike.
        return ''.join(
            line.strip()
            for line in content.splitlines()
            if not line.lstrip().startswith(">")
        )
    except Exception as e:
        logger.error(f"Failed to read FASTA file: {e}", exc_info=True)
        return ""
def run_pipeline(dna_input, similarity_score=95.0, build_ml_tree=False):
    """Run the full analysis pipeline on a DNA sequence.

    Steps: normalise the input, extract the F gene region with the boundary
    model (if loaded), score it with the Keras model, optionally place it in
    the reference tree (MAFFT + IQ-TREE), then run the similarity-based tree
    analysis and build a text summary.

    Args:
        dna_input: Sequence text. Case and all whitespace are ignored; any
            character other than A/C/T/G/N is replaced by ``N``.
        similarity_score: Threshold passed to the tree analysis.
        build_ml_tree: Whether to run phylogenetic placement.

    Returns:
        A 13-tuple: ``(boundary_output, keras_output, ml_tree_output,
        tree_analysis_output, summary_output, aligned_file, tree_file, None,
        None, tree_html_content, report_html_content, tree_html_path,
        report_html_path)``. On a pipeline-level failure the first element
        carries the message and ``summary_output`` is ``""``. Never raises.
    """
    import html  # local import: used only to escape messages placed in HTML

    try:
        # Drop every whitespace character first so a sequence pasted across
        # several lines does not gain spurious "N" bases at the line breaks.
        dna_input = re.sub(r"\s+", "", (dna_input or "").upper())
        if not dna_input:
            return "❌ Empty input", "", "", "", "", None, None, None, None, "No input", "No input", None, None
        if not re.match('^[ACTGN]+$', dna_input):
            dna_input = ''.join(c if c in 'ACTGN' else 'N' for c in dna_input)

        # --- Step 1: boundary detection ---
        processed_sequence = dna_input
        boundary_output = ""
        if boundary_model:
            try:
                result = boundary_model.predict_sequence(dna_input)
                regions = result['gene_regions']
                if regions:
                    processed_sequence = regions[0]["sequence"]
                    boundary_output = f"✅ F gene region found: {len(processed_sequence)} bp"
                else:
                    boundary_output = "⚠️ No F gene regions found."
                    processed_sequence = dna_input
            except Exception as e:
                boundary_output = f"❌ Boundary prediction error: {str(e)}"
                processed_sequence = dna_input
        else:
            boundary_output = f"⚠️ Boundary model not available. Using full input: {len(dna_input)} bp"

        # --- Step 2: Keras validation ---
        if processed_sequence and len(processed_sequence) >= 6:
            keras_output = predict_with_keras(processed_sequence)
        else:
            keras_output = "❌ Sequence too short."

        # --- Step 3: optional phylogenetic placement ---
        aligned_file = None
        phy_file = None
        ml_tree_output = ""
        # Tracked explicitly: the success message of phylogenetic_placement
        # carries no "✅", so the summary must not infer status from the text.
        placement_status = "failed"
        if build_ml_tree and processed_sequence and len(processed_sequence) >= 100:
            try:
                mafft_available, iqtree_available, mafft_cmd, iqtree_cmd = check_tool_availability()
                if mafft_available and iqtree_available:
                    ml_success, ml_message, ml_aligned, ml_tree = phylogenetic_placement(processed_sequence, mafft_cmd, iqtree_cmd)
                    ml_tree_output = ml_message
                    aligned_file = ml_aligned
                    phy_file = ml_tree
                    if ml_success:
                        placement_status = "ok"
                else:
                    ml_tree_output = "❌ MAFFT or IQ-TREE not available"
            except Exception as e:
                ml_tree_output = f"❌ ML tree error: {str(e)}"
        elif build_ml_tree:
            ml_tree_output = "❌ Sequence too short for placement (<100 bp)."
        else:
            ml_tree_output = "⚠️ Phylogenetic placement skipped."
            placement_status = "skipped"

        # --- Step 4: similarity-based tree analysis ---
        tree_html_content = "No tree generated."
        report_html_content = "No report generated."
        tree_html_path = None
        report_html_path = None
        simplified_ml_output = ""
        if analyzer and processed_sequence and len(processed_sequence) >= 10:
            try:
                tree_result, tree_html_path, report_html_path = analyze_sequence_for_tree(processed_sequence, similarity_score)
                simplified_ml_output = tree_result
                if tree_html_path and os.path.exists(tree_html_path):
                    with open(tree_html_path, 'r', encoding='utf-8') as f:
                        tree_html_content = f.read()
                else:
                    tree_html_content = f"<div style='color: red;'>{html.escape(tree_result)}</div>"
                if report_html_path and os.path.exists(report_html_path):
                    with open(report_html_path, 'r', encoding='utf-8') as f:
                        report_html_content = f.read()
                else:
                    report_html_content = f"<div style='color: red;'>{html.escape(tree_result)}</div>"
            except Exception as e:
                simplified_ml_output = f"❌ Tree analysis error: {str(e)}"
                tree_html_content = f"<div style='color: red;'>{html.escape(simplified_ml_output)}</div>"
                report_html_content = f"<div style='color: red;'>{html.escape(simplified_ml_output)}</div>"
        else:
            simplified_ml_output = "❌ Tree analyzer not available." if not analyzer else "❌ Sequence too short (<10 bp)."
            tree_html_content = f"<div style='color: orange;'>{html.escape(simplified_ml_output)}</div>"
            report_html_content = f"<div style='color: orange;'>{html.escape(simplified_ml_output)}</div>"

        # --- Step 5: summary ---
        placement_label = {"ok": "✅ OK", "skipped": "⚠️ Skipped"}.get(placement_status, "❌ Failed")
        validation = keras_output.split(':')[-1].strip() if ':' in keras_output else keras_output
        rule = "━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━"
        summary_lines = [
            "📊 ANALYSIS SUMMARY:",
            rule,
            f"Input: {len(dna_input)} bp",
            f"F Gene: {len(processed_sequence)} bp",
            f"Validation: {validation}",
            f"Placement: {placement_label}",
            f"Tree Analysis: {'✅ OK' if 'Found' in simplified_ml_output else '❌ Failed'}",
            rule,
        ]
        summary_output = "\n" + "\n".join(summary_lines) + "\n"

        return (
            boundary_output, keras_output, ml_tree_output, simplified_ml_output, summary_output,
            aligned_file, phy_file, None, None, tree_html_content, report_html_content,
            tree_html_path, report_html_path
        )
    except Exception as e:
        logger.error(f"Pipeline error: {e}", exc_info=True)
        error_msg = f"❌ Pipeline Error: {str(e)}"
        return error_msg, "", "", "", "", None, None, None, None, error_msg, error_msg, None, None
async def run_pipeline_from_file(fasta_file_obj, similarity_score, build_ml_tree):
    """Run the pipeline on an uploaded FASTA file.

    Args:
        fasta_file_obj: ``None``, a FastAPI ``UploadFile``, a filesystem path,
            or an object exposing the path as ``.name`` (as temporary-file
            wrappers do).
        similarity_score: Forwarded to ``run_pipeline``.
        build_ml_tree: Forwarded to ``run_pipeline``.

    Returns:
        The same 13-tuple as ``run_pipeline``; on failure the first element is
        an error message. Never raises.
    """
    if fasta_file_obj is None:
        return "❌ No file provided", "", "", "", "", None, None, None, None, "No input", "No input", None, None

    temp_file_path = None
    try:
        if isinstance(fasta_file_obj, UploadFile):
            content = await fasta_file_obj.read()
        else:
            # Plain paths are used as-is; file wrappers carry their path in
            # ``.name`` and cannot be handed to open() directly.
            if isinstance(fasta_file_obj, (str, bytes, os.PathLike)):
                source_path = fasta_file_obj
            else:
                source_path = fasta_file_obj.name
            with open(source_path, 'rb') as f:
                content = f.read()

        # Work on a private copy so the caller's file is never touched.
        with tempfile.NamedTemporaryFile(delete=False, suffix=".fasta", dir="/tmp") as temp_file:
            temp_file.write(content)
            temp_file_path = temp_file.name

        dna_input = read_fasta_file(temp_file_path)
        if not dna_input:
            return "❌ Failed to read FASTA file", "", "", "", "", None, None, None, None, "No input", "No input", None, None
        return run_pipeline(dna_input, similarity_score, build_ml_tree)
    except Exception as main_error:
        logger.error(f"Pipeline from file error: {main_error}", exc_info=True)
        error_msg = f"❌ Error: {str(main_error)}"
        return error_msg, "", "", "", "", None, None, None, None, error_msg, error_msg, None, None
    finally:
        # Single cleanup point for every exit path.
        if temp_file_path:
            cleanup_file(temp_file_path)
class AnalysisRequest(BaseModel):
    # JSON body accepted by POST /analyze; the three fields are passed
    # positionally to run_pipeline.
    sequence: str
    # Similarity threshold for the tree analysis (the pipeline accepts 1-99).
    similarity_score: float = 95.0
    # When true, phylogenetic placement is attempted as well.
    build_ml_tree: bool = False
class AnalysisResponse(BaseModel):
    # Response body of POST /analyze and POST /analyze-file.
    # The first five fields mirror elements 0-4 of the run_pipeline tuple.
    boundary_output: str
    keras_output: str
    ml_tree_output: str
    tree_analysis_output: str
    summary_output: str
    # Set by the endpoint; error_message is filled when the endpoint caught
    # an exception.
    success: bool
    error_message: Optional[str] = None
    # Server-side paths of the generated HTML files (tuple elements 11 and 12),
    # or None when no file was produced.
    tree_html_path: Optional[str] = None
    report_html_path: Optional[str] = None
# --- FastAPI App Setup ---
# ASGI application carrying the REST endpoints defined below;
# run_application() also mounts the Gradio UI on it under /gradio.
app = FastAPI(title="🧬 Gene Analysis Pipeline", version="1.0.0")
@app.get("/")
async def root():
    """Return a static description of the API and the routes it offers."""
    endpoints = {
        "docs": "/docs",
        "health": "/health",
        "gradio": "/gradio",
        "analyze": "/analyze",
        "analyze_file": "/analyze-file",
        "download": "/download/{file_type}/{query_id}",
    }
    payload = {
        "message": "🧬 Gene Analysis Pipeline API",
        "status": "running",
        "endpoints": endpoints,
    }
    return payload
@app.get("/health")
async def health_check():
    """Report which models and external tools are currently available.

    Returns a dict with ``status`` set to ``"healthy"`` plus component flags
    and paths, or ``{"status": "unhealthy", "error": ...}`` if probing raised.
    """
    try:
        tool_status = check_tool_availability()
        components = {
            "boundary_model": boundary_model is not None,
            "keras_model": keras_model is not None,
            "tree_analyzer": analyzer is not None,
            "mafft_available": tool_status[0],
            "iqtree_available": tool_status[1],
        }
        paths = {
            "base_dir": BASE_DIR,
            "query_output_dir": QUERY_OUTPUT_DIR,
        }
        return {"status": "healthy", "components": components, "paths": paths}
    except Exception as e:
        logger.error(f"Health check error: {e}", exc_info=True)
        return {"status": "unhealthy", "error": str(e)}
def _build_analysis_response(result) -> AnalysisResponse:
    """Convert a 13-element pipeline tuple into an ``AnalysisResponse``.

    The pipeline never raises: on a pipeline-level failure it returns a tuple
    whose first element is the error message and whose summary (index 4) is
    empty. A non-empty summary is therefore used as the success signal.
    """
    completed = bool(result[4])
    return AnalysisResponse(
        boundary_output=result[0] or "",
        keras_output=result[1] or "",
        ml_tree_output=result[2] or "",
        tree_analysis_output=result[3] or "",
        summary_output=result[4] or "",
        tree_html_path=result[11],
        report_html_path=result[12],
        success=completed,
        error_message=None if completed else (result[0] or "Pipeline failed"),
    )


def _failed_analysis_response(error) -> AnalysisResponse:
    """Build the response returned when an endpoint itself raised *error*."""
    return AnalysisResponse(
        boundary_output="", keras_output="", ml_tree_output="",
        tree_analysis_output="", summary_output="",
        tree_html_path=None, report_html_path=None,
        success=False, error_message=str(error)
    )


@app.post("/analyze", response_model=AnalysisResponse)
async def analyze_sequence(request: AnalysisRequest):
    """Run the pipeline on a sequence supplied in the JSON body.

    Always answers with an ``AnalysisResponse``; ``success`` is false and
    ``error_message`` is set when the pipeline or the endpoint failed.
    """
    try:
        # NOTE(review): run_pipeline is synchronous and is called directly
        # from this coroutine.
        result = run_pipeline(request.sequence, request.similarity_score, request.build_ml_tree)
        return _build_analysis_response(result)
    except Exception as e:
        logger.error(f"Analyze error: {e}", exc_info=True)
        return _failed_analysis_response(e)


@app.post("/analyze-file", response_model=AnalysisResponse)
async def analyze_file(
    file: UploadFile = File(...),
    similarity_score: float = Form(95.0),
    build_ml_tree: bool = Form(False)
):
    """Run the pipeline on an uploaded FASTA file (multipart form).

    The upload is spooled to a temporary file under ``/tmp`` which is removed
    on every exit path. Always answers with an ``AnalysisResponse``.
    """
    temp_file_path = None
    try:
        with tempfile.NamedTemporaryFile(delete=False, suffix=".fasta", dir="/tmp") as temp_file:
            content = await file.read()
            temp_file.write(content)
            temp_file_path = temp_file.name
        result = await run_pipeline_from_file(temp_file_path, similarity_score, build_ml_tree)
        return _build_analysis_response(result)
    except Exception as main_error:
        logger.error(f"Analyze-file error: {main_error}", exc_info=True)
        return _failed_analysis_response(main_error)
    finally:
        if temp_file_path:
            cleanup_file(temp_file_path)
@app.get("/download/{file_type}/{query_id}")
async def download_file(file_type: str, query_id: str):
    """Serve a generated tree or report HTML file from ``/tmp``.

    Args:
        file_type: ``"tree"`` or ``"report"``.
        query_id: Identifier embedded in the generated file name.

    Raises:
        HTTPException: 400 for an invalid ``file_type`` or ``query_id``,
            404 when the file does not exist, 500 on unexpected errors.
    """
    if file_type not in ["tree", "report"]:
        raise HTTPException(status_code=400, detail="Invalid file type. Use 'tree' or 'report'.")

    file_name = f"phylogenetic_tree_{query_id}.html" if file_type == "tree" else f"detailed_report_{query_id}.html"
    # The id must not introduce path components: only files directly inside
    # /tmp may be served.
    if os.path.basename(file_name) != file_name:
        raise HTTPException(status_code=400, detail="Invalid query id.")
    file_path = os.path.join("/tmp", file_name)

    try:
        if not os.path.isfile(file_path):
            raise HTTPException(status_code=404, detail="File not found.")
        return FileResponse(file_path, filename=file_name, media_type="text/html")
    except HTTPException:
        # Deliberate HTTP errors must reach the client unchanged instead of
        # being rewrapped as a 500 by the generic handler below.
        raise
    except Exception as e:
        logger.error(f"Download error: {e}", exc_info=True)
        raise HTTPException(status_code=500, detail=f"Error serving file: {str(e)}")
# --- Gradio Interface ---
def create_gradio_interface():
    """Build the Gradio UI for the pipeline.

    The UI has a text-input tab and a file-upload tab that feed the same set
    of result widgets. Both buttons route the 13-element pipeline tuple
    through ``handle_analysis_output``, which maps it onto the 11 output
    components and reveals the download widgets that have a file.

    Returns:
        The ``gr.Blocks`` app, or a minimal ``gr.Interface`` that only shows
        the construction error if building the UI failed.
    """
    try:
        # Probe the external tools once rather than once per status line.
        mafft_ok, iqtree_ok, _, _ = check_tool_availability()

        with gr.Blocks(
            title="🧬 Gene Analysis Pipeline",
            theme=gr.themes.Soft(),
            css="""
            .gradio-container { max-width: 1200px !important; }
            .status-box { padding: 10px; border-radius: 5px; margin: 5px 0; }
            .success { background-color: #d4edda; border: 1px solid #c3e6cb; color: #155724; }
            .warning { background-color: #fff3cd; border: 1px solid #ffeaa7; color: #856404; }
            .error { background-color: #f8d7da; border: 1px solid #f5c6cb; color: #721c24; }
            """
        ) as iface:
            gr.Markdown("# 🧬 Gene Analysis Pipeline")
            with gr.Row():
                with gr.Column():
                    gr.HTML(value=f"""
                    <div class="status-box">
                    <h3>🔧 System Status</h3>
                    <p>🤖 Boundary Model: {'✅ Loaded' if boundary_model else '❌ Missing'}</p>
                    <p>🧠 Keras Model: {'✅ Loaded' if keras_model else '❌ Missing'}</p>
                    <p>🌳 Tree Analyzer: {'✅ Loaded' if analyzer else '❌ Missing'}</p>
                    <p>🧬 MAFFT: {'✅ Available' if mafft_ok else '❌ Missing'}</p>
                    <p>🌲 IQ-TREE: {'✅ Available' if iqtree_ok else '❌ Missing'}</p>
                    </div>
                    """)

            with gr.Tabs():
                with gr.TabItem("📝 Text Input"):
                    with gr.Row():
                        with gr.Column(scale=2):
                            gr.Markdown("Paste your DNA sequence here")
                            dna_input = gr.Textbox(
                                label="🧬 DNA Sequence",
                                placeholder="Enter DNA sequence (ATCG format)...",
                                lines=5
                            )
                        with gr.Column(scale=1):
                            gr.Markdown("Minimum similarity for tree analysis")
                            similarity_score = gr.Slider(
                                minimum=1,
                                maximum=99,
                                value=95.0,
                                step=1.0,
                                label="🎯 Similarity Threshold (%)"
                            )
                            gr.Markdown("Generate phylogenetic placement (slower)")
                            build_ml_tree = gr.Checkbox(
                                label="🌲 Build ML Tree",
                                value=False
                            )
                    analyze_btn = gr.Button("🔬 Analyze Sequence", variant="primary")
                with gr.TabItem("📁 File Upload"):
                    with gr.Row():
                        with gr.Column(scale=2):
                            gr.Markdown("Upload a FASTA file containing your sequence")
                            file_input = gr.File(
                                label="📄 Upload FASTA File",
                                file_types=[".fasta", ".fa", ".fas", ".txt"]
                            )
                        with gr.Column(scale=1):
                            gr.Markdown("Minimum similarity for tree analysis")
                            file_similarity_score = gr.Slider(
                                minimum=1,
                                maximum=99,
                                value=95.0,
                                step=1.0,
                                label="🎯 Similarity Threshold (%)"
                            )
                            gr.Markdown("Generate phylogenetic placement (slower)")
                            file_build_ml_tree = gr.Checkbox(
                                label="🌲 Build ML Tree",
                                value=False
                            )
                    analyze_file_btn = gr.Button("🔬 Analyze File", variant="primary")

            gr.Markdown("## 📊 Analysis Results")
            with gr.Row():
                with gr.Column():
                    boundary_output = gr.Textbox(
                        label="🎯 Boundary Detection",
                        interactive=False,
                        lines=2
                    )
                    keras_output = gr.Textbox(
                        label="🧠 F Gene Validation",
                        interactive=False,
                        lines=2
                    )
                with gr.Column():
                    ml_tree_output = gr.Textbox(
                        label="🌲 Phylogenetic Placement",
                        interactive=False,
                        lines=2
                    )
                    tree_analysis_output = gr.Textbox(
                        label="🌳 Tree Analysis",
                        interactive=False,
                        lines=2
                    )
            summary_output = gr.Textbox(
                label="📋 Summary",
                interactive=False,
                lines=8
            )
            with gr.Row():
                aligned_file = gr.File(label="📄 Alignment File", visible=False)
                tree_file = gr.File(label="🌲 Tree File", visible=False)
                tree_html_file = gr.File(label="🌳 Simplified Tree HTML", visible=False)
                report_html_file = gr.File(label="📊 Detailed Report HTML", visible=False)
            with gr.Tabs():
                with gr.TabItem("🌳 Interactive Tree"):
                    tree_html = gr.HTML(
                        value="<div style='text-align: center; color: #666; padding: 20px;'>No tree generated yet. Run analysis to see results.</div>"
                    )
                with gr.TabItem("📊 Detailed Report"):
                    report_html = gr.HTML(
                        label="Analysis Report",
                        value="<div style='text-align: center; color: #666; padding: 20px;'>No report generated yet. Run analysis to see results.</div>"
                    )

            # Event handlers
            def handle_analysis_output(*outputs):
                """Map the 13-element pipeline tuple onto the 11 UI outputs."""
                (
                    boundary_text, keras_text, placement_text, tree_text, summary_text,
                    aligned_path, phy_path, _, _,
                    tree_content, report_content, tree_path, report_path,
                ) = outputs

                def file_update(path):
                    # Show a download widget only when there is a file for it.
                    return gr.update(value=path, visible=path is not None)

                return (
                    boundary_text, keras_text, placement_text, tree_text, summary_text,
                    file_update(aligned_path),
                    file_update(phy_path),
                    file_update(tree_path),
                    file_update(report_path),
                    tree_content,
                    report_content,
                )

            def analyze_text(sequence, threshold, build_tree):
                return handle_analysis_output(*run_pipeline(sequence, threshold, build_tree))

            async def analyze_upload(upload, threshold, build_tree):
                # Depending on the Gradio version the File component yields a
                # path or a temp-file wrapper exposing the path as ``.name``.
                if upload is not None and not isinstance(upload, (str, bytes, os.PathLike)):
                    upload = getattr(upload, "name", upload)
                result = await run_pipeline_from_file(upload, threshold, build_tree)
                return handle_analysis_output(*result)

            result_components = [
                boundary_output, keras_output, ml_tree_output, tree_analysis_output, summary_output,
                aligned_file, tree_file, tree_html_file, report_html_file, tree_html, report_html
            ]
            analyze_btn.click(
                fn=analyze_text,
                inputs=[dna_input, similarity_score, build_ml_tree],
                outputs=result_components
            )
            analyze_file_btn.click(
                fn=analyze_upload,
                inputs=[file_input, file_similarity_score, file_build_ml_tree],
                outputs=result_components
            )

            # Examples
            gr.Examples(
                examples=[
                    ["ATCG" * 250, 85.0, False],
                    ["CGATCG" * 150, 90.0, True]
                ],
                inputs=[dna_input, similarity_score, build_ml_tree],
                label="Example Sequences"
            )
            gr.Markdown("""
            ## 📚 Instructions
            1. **Input**: Enter a DNA sequence (ATCG format) or upload a FASTA file
            2. **Parameters**:
               - Set similarity threshold for phylogenetic analysis (1-99%)
               - Choose whether to build ML tree (slower but more accurate)
            3. **Analysis**: Click analyze to run the complete pipeline
            4. **Results**: View results in different tabs - summary, tree visualization, and detailed report
            5. **Downloads**: Download alignment, tree, simplified tree HTML, and detailed report HTML files

            ### 🔬 Pipeline Components:
            - **Boundary Detection**: Identifies F gene regions
            - **F Gene Validation**: Validates F gene using ML
            - **Phylogenetic Placement**: Places sequence in reference tree (optional)
            - **Tree Analysis**: Builds phylogenetic tree with similar sequences
            """)
        return iface
    except Exception as main_error:
        logger.error(f"Gradio interface creation failed: {main_error}", exc_info=True)
        # Capture the text now: the exception name is unbound once this
        # ``except`` block ends, so the lambda must not reference it.
        error_text = f"Error: {str(main_error)}"
        return gr.Interface(
            fn=lambda x: error_text,
            inputs=gr.Textbox(label="DNA Sequence"),
            outputs=gr.Textbox(label="Error"),
            title="🧬 Gene Analysis Pipeline (Error Mode)"
        )
# --- Application Startup ---
def run_application():
    """Start the service: FastAPI with Gradio mounted, or Gradio alone.

    The normal path mounts the Gradio UI on ``app`` at ``/gradio`` and serves
    everything with uvicorn on port 7860 (blocking). If that fails, a
    Gradio-only server is launched on the same port. If the fallback fails as
    well, the process exits with status 1.
    """
    try:
        logger.info("🧬 Initializing Gene Analysis Pipeline...")
        main_gradio_app = create_gradio_interface()
        if main_gradio_app is None:
            raise RuntimeError("Gradio interface creation returned None")
        logger.info("✅ Gradio interface created successfully")
        main_gradio_app = gr.mount_gradio_app(app, main_gradio_app, path="/gradio")
        logger.info("✅ Gradio mounted to FastAPI at /gradio")
        logger.info("=" * 50)
        logger.info("🔍 Checking system components...")
        logger.info(f"🤖 Boundary Model: {'✅ Loaded' if boundary_model else '❌ Missing'}")
        logger.info(f"🧠 Keras Model: {'✅ Loaded' if keras_model else '❌ Missing'}")
        logger.info(f"🌳 Tree Analyzer: {'✅ Loaded' if analyzer else '❌ Missing'}")
        mafft_available, iqtree_available, _, _ = check_tool_availability()
        logger.info(f"🧬 MAFFT: {'✅ Available' if mafft_available else '❌ Missing'}")
        logger.info(f"🌲 IQ-TREE: {'✅ Available' if iqtree_available else '❌ Missing'}")
        logger.info("=" * 50)
        logger.info("🚀 Starting Gene Analysis Pipeline...")
        logger.warning("⚠️ Running without request queuing. Concurrent requests may block.")
        logger.info("📊 FastAPI docs available at: http://localhost:7860/docs")
        logger.info("🧬 Gradio interface available at: http://localhost:7860/gradio")
        uvicorn.run(
            app,
            host="0.0.0.0",
            port=7860,
            log_level="info",
            access_log=True,
            timeout_keep_alive=120
        )
    except Exception as main_error:
        logger.error(f"Application startup failed: {main_error}", exc_info=True)
        try:
            logger.info("🔄 Falling back to Gradio-only mode...")
            fallback_gradio_app = create_gradio_interface()
            if fallback_gradio_app is None:
                raise RuntimeError("Fallback Gradio interface creation returned None")
            logger.info("✅ Fallback Gradio interface created successfully")
            logger.info("🧬 Gradio interface available at: http://localhost:7860")
            # launch() must block here: with prevent_thread_lock=True it
            # returns at once, the script ends and the server goes down.
            fallback_gradio_app.launch(
                server_name="0.0.0.0",
                server_port=7860,
                prevent_thread_lock=False,
                quiet=True
            )
        except Exception as fallback_error:
            logger.error(f"Fallback failed: {fallback_error}", exc_info=True)
            print("❌ Application failed to start. Check logs at /tmp/app.log for details.")
            sys.exit(1)
if __name__ == "__main__":
    # Print a short component report to stdout, then start the server.
    banner_rule = "=" * 50
    print("🧬 Gene Analysis Pipeline Starting...")
    print(banner_rule)
    print("🔍 Checking system components...")

    tool_status = check_tool_availability()
    mafft_available, iqtree_available = tool_status[0], tool_status[1]

    component_report = (
        ("🤖 Boundary Model", boundary_model),
        ("🧠 Keras Model", keras_model),
        ("🌳 Tree Analyzer", analyzer),
        ("🧬 MAFFT", mafft_available),
        ("🌲 IQ-TREE", iqtree_available),
    )
    for component_label, component_present in component_report:
        mark = '✅' if component_present else '❌'
        print(f"{component_label}: {mark}")

    print(banner_rule)
    run_application()