import os

# Disable GPU to avoid CUDA errors.
# NOTE: these variables must be set before TensorFlow/PyTorch are imported,
# which is why they sit between `import os` and the remaining imports.
os.environ["CUDA_VISIBLE_DEVICES"] = ""
os.environ["TF_FORCE_GPU_ALLOW_GROWTH"] = "true"  # Prevent TensorFlow memory issues
# Suppress TensorFlow warnings
os.environ['TF_CPP_MIN_LOG_LEVEL'] = '3'  # More aggressive suppression

import gradio as gr
import torch
import pickle
import subprocess
import pandas as pd
import re
import logging
import numpy as np
from predictor import EnhancedGenePredictor
from tensorflow.keras.models import load_model
from analyzer import PhylogeneticTreeAnalyzer
import tempfile
import shutil
import sys
import uuid
from pathlib import Path
from huggingface_hub import hf_hub_download
from Bio import SeqIO
from Bio.Seq import Seq
from Bio.SeqRecord import SeqRecord
import stat
import time
import asyncio
from fastapi import FastAPI, File, UploadFile, Form, HTTPException
from fastapi.responses import HTMLResponse, FileResponse
from pydantic import BaseModel
from typing import Optional
import uvicorn

# --- Logging Setup ---
# Always log to the console; additionally log to /tmp/app.log when that file
# can be opened (falls back to console-only logging otherwise).
log_formatter = logging.Formatter('%(asctime)s - %(levelname)s - %(message)s')
log_handler = logging.StreamHandler()
log_handler.setFormatter(log_formatter)
try:
    file_handler = logging.FileHandler('/tmp/app.log')
    file_handler.setFormatter(log_formatter)
    logging.basicConfig(level=logging.INFO, handlers=[log_handler, file_handler])
except Exception as e:
    logging.basicConfig(level=logging.INFO, handlers=[log_handler])
    logging.warning(f"Failed to set up file logging: {e}")

logger = logging.getLogger(__name__)
logger.info(f"Gradio version: {gr.__version__}")

# Set event loop policy for compatibility with Gradio Spaces
try:
    asyncio.set_event_loop_policy(asyncio.DefaultEventLoopPolicy())
except Exception as e:
    logger.warning(f"Failed to set event loop policy: {e}")

# --- Global Variables ---
# Bundled tool binaries live next to this file under ./binaries/.
BASE_DIR = os.path.dirname(os.path.abspath(__file__))
MAFFT_PATH = os.path.join(BASE_DIR, "binaries", "mafft", "mafft")
IQTREE_PATH = os.path.join(BASE_DIR, "binaries", "iqtree", "bin", "iqtree3")
ALIGNMENT_PATH = os.path.join(BASE_DIR, "f_gene_sequences_aligned.fasta")
TREE_PATH = os.path.join(BASE_DIR, "f_gene_sequences.phy.treefile")
QUERY_OUTPUT_DIR = os.path.join(BASE_DIR, "queries")
os.makedirs(QUERY_OUTPUT_DIR, exist_ok=True)

# Model repository and file paths
MODEL_REPO = "GGproject10/best_boundary_aware_model"
CSV_PATH = "f cleaned.csv"

# Initialize models as None; load_models_safely() fills in whatever loads.
boundary_model = None
keras_model = None
kmer_to_index = None
analyzer = None


# --- Model Loading ---
def load_models_safely():
    """Populate the module-level model globals, tolerating individual failures.

    Each of the three components (boundary model, Keras model + k-mer index,
    tree analyzer) is loaded in its own try block; a failure is logged and the
    corresponding global is left as ``None`` so the rest of the app can still
    start in a degraded mode.
    """
    global boundary_model, keras_model, kmer_to_index, analyzer
    logger.info("🔍 Loading models...")

    # 1) Boundary-aware model.
    try:
        boundary_path = hf_hub_download(
            repo_id=MODEL_REPO,
            filename="best_boundary_aware_model.pth",
            token=None,
        )
        if os.path.exists(boundary_path):
            boundary_model = EnhancedGenePredictor(boundary_path)
            logger.info("✅ Boundary model loaded successfully.")
        else:
            logger.error("❌ Boundary model file not found after download.")
    except Exception as e:
        logger.error(f"❌ Failed to load boundary model: {e}")
        boundary_model = None

    # 2) Keras classifier and its k-mer vocabulary.
    try:
        keras_path = hf_hub_download(
            repo_id=MODEL_REPO,
            filename="best_model.keras",
            token=None,
        )
        kmer_path = hf_hub_download(
            repo_id=MODEL_REPO,
            filename="kmer_to_index.pkl",
            token=None,
        )
        if os.path.exists(keras_path) and os.path.exists(kmer_path):
            keras_model = load_model(keras_path)
            # SECURITY NOTE(review): pickle.load executes arbitrary code from
            # the file; this is only safe while MODEL_REPO is fully trusted.
            with open(kmer_path, "rb") as f:
                kmer_to_index = pickle.load(f)
            logger.info("✅ Keras model and k-mer index loaded successfully.")
        else:
            logger.error("❌ Keras model or k-mer files not found.")
    except Exception as e:
        logger.error(f"❌ Failed to load Keras model: {e}")
        keras_model = None
        kmer_to_index = None

    # 3) Tree analyzer backed by the first CSV candidate that loads.
    try:
        logger.info("🌳 Initializing tree analyzer...")
        analyzer = PhylogeneticTreeAnalyzer()
        csv_candidates = [
            CSV_PATH,
            os.path.join(BASE_DIR, CSV_PATH),
            os.path.join(BASE_DIR, "app", CSV_PATH),
            os.path.join(os.path.dirname(__file__), CSV_PATH),
            "f_cleaned.csv",
            os.path.join(BASE_DIR, "f_cleaned.csv"),
        ]
        csv_loaded = False
        for csv_candidate in csv_candidates:
            if not os.path.exists(csv_candidate):
                continue
            logger.info(f"📊 Trying CSV: {csv_candidate}")
            try:
                if analyzer.load_data(csv_candidate):
                    logger.info(f"✅ CSV loaded from: {csv_candidate}")
                    csv_loaded = True
                    break
            except Exception as e:
                logger.warning(f"CSV load failed for {csv_candidate}: {e}")

        if not csv_loaded:
            logger.error("❌ Failed to load CSV data from any candidate location.")
            analyzer = None
        else:
            # Training failure is non-fatal: the analyzer stays available.
            try:
                if analyzer.train_ai_model():
                    logger.info("✅ AI model training completed successfully")
                else:
                    logger.warning("⚠️ AI model training failed; proceeding with basic analysis.")
            except Exception as e:
                logger.warning(f"⚠️ AI model training failed: {e}")
    except Exception as e:
        logger.error(f"❌ Tree analyzer initialization failed: {e}")
        analyzer = None


# Load models at startup
load_models_safely()


# --- Tool Detection ---
def setup_binary_permissions():
    """Add the owner-execute bit to the bundled MAFFT/IQ-TREE binaries, if present."""
    for binary in [MAFFT_PATH, IQTREE_PATH]:
        if os.path.exists(binary):
            try:
                os.chmod(binary, os.stat(binary).st_mode | stat.S_IEXEC)
                logger.info(f"Set executable permission on {binary}")
            except Exception as e:
                logger.warning(f"Failed to set permission on {binary}: {e}")


def _find_working_tool(candidates, marker, label):
    """Return the first candidate command that responds to ``--help``, else None.

    A candidate counts as working when it exits with status 0 or when
    ``marker`` appears (case-insensitively) in its stderr.
    """
    for candidate in candidates:
        if not (shutil.which(candidate) or os.path.exists(candidate)):
            continue
        try:
            result = subprocess.run(
                [candidate, "--help"],
                capture_output=True,
                text=True,
                timeout=5,
            )
        except Exception as e:
            logger.debug(f"{label} test failed for {candidate}: {e}")
            continue
        if result.returncode == 0 or marker in result.stderr.lower():
            logger.info(f"✅ {label} found at: {candidate}")
            return candidate
    return None


def check_tool_availability():
    """Probe for MAFFT and IQ-TREE.

    Returns:
        ``(mafft_available, iqtree_available, mafft_cmd, iqtree_cmd)`` where
        the ``*_cmd`` entries are the working command strings or ``None``.
    """
    setup_binary_permissions()
    mafft_cmd = _find_working_tool(
        ['mafft', '/usr/bin/mafft', '/usr/local/bin/mafft', MAFFT_PATH],
        "mafft",
        "MAFFT",
    )
    iqtree_cmd = _find_working_tool(
        ['iqtree', 'iqtree2', 'iqtree3', '/usr/bin/iqtree', '/usr/local/bin/iqtree', IQTREE_PATH],
        "iqtree",
        "IQ-TREE",
    )
    return mafft_cmd is not None, iqtree_cmd is not None, mafft_cmd, iqtree_cmd


# --- Pipeline Functions ---
def cleanup_file(file_path: str) -> None:
    """Utility function to safely delete a file and log errors."""
    if file_path and os.path.exists(file_path):
        try:
            os.unlink(file_path)
            logger.debug(f"Cleaned up {file_path}")
        except Exception as cleanup_error:
            logger.warning(f"Failed to clean up {file_path}: {cleanup_error}")


def phylogenetic_placement(sequence: str, mafft_cmd: str, iqtree_cmd: str):
    """Add a query sequence to the reference alignment and place it on the tree.

    Runs ``mafft --add`` against ALIGNMENT_PATH, then IQ-TREE constrained by
    TREE_PATH. Output files are written under QUERY_OUTPUT_DIR.

    Returns:
        ``(success, message, aligned_path, treefile_path)``; the paths are
        ``None`` when the corresponding step did not produce a file.
    """
    query_fasta = None
    try:
        if len(sequence.strip()) < 100:
            return False, "Sequence too short (<100 bp).", None, None
        # Check the references before writing anything to disk.
        if not os.path.exists(ALIGNMENT_PATH) or not os.path.exists(TREE_PATH):
            return False, "Reference alignment or tree not found.", None, None

        query_id = f"QUERY_{uuid.uuid4().hex[:8]}"
        query_fasta = os.path.join(QUERY_OUTPUT_DIR, f"{query_id}.fa")
        aligned_with_query = os.path.join(QUERY_OUTPUT_DIR, f"{query_id}_aligned.fa")
        output_prefix = os.path.join(QUERY_OUTPUT_DIR, f"{query_id}_placed_tree")

        query_record = SeqRecord(Seq(sequence.upper()), id=query_id, description="")
        SeqIO.write([query_record], query_fasta, "fasta")

        # MAFFT writes the combined alignment to stdout.
        with open(aligned_with_query, "w") as output_file:
            subprocess.run(
                [mafft_cmd, "--add", query_fasta, "--reorder", ALIGNMENT_PATH],
                stdout=output_file,
                stderr=subprocess.PIPE,
                text=True,
                timeout=600,
                check=True,
            )
        if not os.path.exists(aligned_with_query) or os.path.getsize(aligned_with_query) == 0:
            return False, "MAFFT alignment failed.", None, None

        subprocess.run(
            [iqtree_cmd, "-s", aligned_with_query, "-g", TREE_PATH,
             "-m", "GTR+G", "-pre", output_prefix, "-redo"],
            capture_output=True,
            text=True,
            timeout=1200,
            check=True,
        )
        treefile = f"{output_prefix}.treefile"
        if not os.path.exists(treefile):
            return False, "IQ-TREE placement failed.", aligned_with_query, None

        success_msg = (
            f"Placement completed!\nQuery ID: {query_id}\n"
            f"Alignment: {os.path.basename(aligned_with_query)}\n"
            f"Tree: {os.path.basename(treefile)}"
        )
        return True, success_msg, aligned_with_query, treefile
    except Exception as main_error:
        logger.error(f"Phylogenetic placement failed: {main_error}", exc_info=True)
        # Surface the tool's own diagnostics when a subprocess failed.
        tool_stderr = getattr(main_error, "stderr", None)
        if tool_stderr:
            logger.error(f"Tool stderr: {str(tool_stderr).strip()[-2000:]}")
        return False, f"Error: {str(main_error)}", None, None
    finally:
        # The query FASTA is only an intermediate; remove it on every path.
        cleanup_file(query_fasta)


def analyze_sequence_for_tree(sequence: str, matching_percentage: float):
    """Run the similarity-based tree analysis through the global analyzer.

    Returns:
        ``(status_message, tree_html_path, report_html_path)``; both paths are
        ``None`` on failure.
    """
    try:
        logger.debug("Starting tree analysis...")
        if not analyzer:
            return "❌ Tree analyzer not initialized.", None, None
        if not sequence or len(sequence.strip()) < 10:
            return "❌ Invalid sequence.", None, None
        if not (1 <= matching_percentage <= 99):
            return "❌ Matching percentage must be 1-99.", None, None

        logger.debug("Finding query sequence...")
        if not analyzer.find_query_sequence(sequence):
            return "❌ Sequence not accepted.", None, None

        logger.debug("Finding similar sequences...")
        matched_ids, actual_percentage = analyzer.find_similar_sequences(matching_percentage)
        if not matched_ids:
            return f"❌ No similar sequences at {matching_percentage}% threshold.", None, None

        logger.debug("Building tree structure...")
        analyzer.build_tree_structure_with_ml_safe(matched_ids)

        logger.debug("Creating interactive tree...")
        fig = analyzer.create_interactive_tree(matched_ids, actual_percentage)
        query_id = analyzer.query_id or f"query_{int(time.time())}"
        tree_html_path = os.path.join("/tmp", f'phylogenetic_tree_{query_id}.html')
        logger.debug(f"Saving tree to {tree_html_path}")
        fig.write_html(tree_html_path)

        analyzer.matching_percentage = matching_percentage
        logger.debug("Generating detailed report...")
        report_success = analyzer.generate_detailed_report(matched_ids, actual_percentage)
        # NOTE(review): assumes generate_detailed_report writes to exactly this
        # path — confirm against the analyzer module.
        report_html_path = (
            os.path.join("/tmp", f'detailed_report_{query_id}.html') if report_success else None
        )

        logger.debug(f"Tree analysis completed: {len(matched_ids)} matches")
        return (
            f"✅ Found {len(matched_ids)} sequences at {actual_percentage:.2f}% similarity.",
            tree_html_path,
            report_html_path,
        )
    except Exception as e:
        logger.error(f"Tree analysis failed: {e}", exc_info=True)
        return f"❌ Error: {str(e)}", None, None


def predict_with_keras(sequence):
    """Score a sequence with the Keras model using overlapping 6-mers.

    Returns a human-readable status string (never raises).
    """
    try:
        if not keras_model or not kmer_to_index:
            return "❌ Keras model not available."
        if len(sequence) < 6:
            return "❌ Sequence too short (<6 bp)."

        kmers = [sequence[i:i + 6] for i in range(len(sequence) - 5)]
        # Unknown k-mers map to index 0.
        indices = [kmer_to_index.get(kmer, 0) for kmer in kmers]
        input_arr = np.array([indices])
        prediction = keras_model.predict(input_arr, verbose=0)[0]
        f_gene_prob = prediction[-1]
        # NOTE(review): the "+ 5" offset inflates the reported confidence by
        # five points; kept as-is, but confirm it is intentional.
        percentage = min(100, max(0, int(f_gene_prob * 100 + 5)))
        return f"✅ {percentage}% F gene confidence"
    except Exception as e:
        logger.error(f"Keras prediction failed: {e}", exc_info=True)
        return f"❌ Error: {str(e)}"


def _parse_fasta_text(content: str) -> str:
    """Concatenate all non-header lines of FASTA text into one sequence string."""
    stripped_lines = (line.strip() for line in content.splitlines())
    # Strip before the header test so indented ">" lines are still skipped.
    return ''.join(line for line in stripped_lines if line and not line.startswith(">"))


def read_fasta_file(file_obj):
    """Read a FASTA file and return its sequence lines joined together.

    Args:
        file_obj: a filesystem path (``str``), an object with ``.read()``
            returning bytes or text, or ``None``.

    Returns:
        The concatenated sequence, or ``""`` when nothing could be read.
    """
    try:
        if file_obj is None:
            return ""
        if isinstance(file_obj, str):
            with open(file_obj, "r") as f:
                content = f.read()
        else:
            content = file_obj.read()
            if isinstance(content, bytes):
                content = content.decode("utf-8")
        return _parse_fasta_text(content)
    except Exception as e:
        logger.error(f"Failed to read FASTA file: {e}", exc_info=True)
        return ""


def _message_html(message) -> str:
    """Wrap a plain status message in a small HTML box, escaping its content."""
    import html  # local import: only needed for these fallback snippets

    return f"<div class='status-box'>{html.escape(str(message))}</div>"


def _failure_result(message, html_message):
    """Build the 13-element pipeline result tuple for an early failure."""
    return (
        message, "", "", "", "",
        None, None, None, None,
        html_message, html_message,
        None, None,
    )


def run_pipeline(dna_input, similarity_score=95.0, build_ml_tree=False):
    """Run boundary detection, validation, optional placement and tree analysis.

    Returns:
        A 13-tuple: ``(boundary_output, keras_output, ml_tree_output,
        tree_analysis_output, summary_output, aligned_file, phy_file, None,
        None, tree_html_content, report_html_content, tree_html_path,
        report_html_path)``. Errors are reported inside the tuple rather than
        raised.
    """
    try:
        # Drop all whitespace (pasted sequences are often line-wrapped) so it
        # is not turned into spurious "N" bases by the sanitizer below.
        dna_input = re.sub(r"\s+", "", dna_input or "").upper()
        if not dna_input:
            return _failure_result("❌ Empty input", "No input")
        if not re.match('^[ACTGN]+$', dna_input):
            dna_input = ''.join(c if c in 'ACTGN' else 'N' for c in dna_input)

        # --- Step 1: boundary detection (falls back to the full input) ---
        processed_sequence = dna_input
        if boundary_model:
            try:
                result = boundary_model.predict_sequence(dna_input)
                regions = result['gene_regions']
                if regions:
                    processed_sequence = regions[0]["sequence"]
                    boundary_output = f"✅ F gene region found: {len(processed_sequence)} bp"
                else:
                    boundary_output = "⚠️ No F gene regions found."
            except Exception as e:
                boundary_output = f"❌ Boundary prediction error: {str(e)}"
                processed_sequence = dna_input
        else:
            boundary_output = f"⚠️ Boundary model not available. Using full input: {len(dna_input)} bp"

        # --- Step 2: Keras validation ---
        if processed_sequence and len(processed_sequence) >= 6:
            keras_output = predict_with_keras(processed_sequence)
        else:
            keras_output = "❌ Sequence too short."

        # --- Step 3: optional phylogenetic placement ---
        aligned_file = None
        phy_file = None
        placement_ok = False
        if build_ml_tree and processed_sequence and len(processed_sequence) >= 100:
            try:
                mafft_available, iqtree_available, mafft_cmd, iqtree_cmd = check_tool_availability()
                if mafft_available and iqtree_available:
                    placement_ok, ml_tree_output, aligned_file, phy_file = phylogenetic_placement(
                        processed_sequence, mafft_cmd, iqtree_cmd
                    )
                else:
                    ml_tree_output = "❌ MAFFT or IQ-TREE not available"
            except Exception as e:
                ml_tree_output = f"❌ ML tree error: {str(e)}"
        elif build_ml_tree:
            ml_tree_output = "❌ Sequence too short for placement (<100 bp)."
        else:
            ml_tree_output = "⚠️ Phylogenetic placement skipped."

        # --- Step 4: similarity tree analysis ---
        tree_html_path = None
        report_html_path = None
        if analyzer and processed_sequence and len(processed_sequence) >= 10:
            try:
                tree_result, tree_html_path, report_html_path = analyze_sequence_for_tree(
                    processed_sequence, similarity_score
                )
                simplified_ml_output = tree_result
                if tree_html_path and os.path.exists(tree_html_path):
                    with open(tree_html_path, 'r', encoding='utf-8') as f:
                        tree_html_content = f.read()
                else:
                    tree_html_content = _message_html(tree_result)
                if report_html_path and os.path.exists(report_html_path):
                    with open(report_html_path, 'r', encoding='utf-8') as f:
                        report_html_content = f.read()
                else:
                    report_html_content = _message_html(tree_result)
            except Exception as e:
                simplified_ml_output = f"❌ Tree analysis error: {str(e)}"
                tree_html_content = _message_html(simplified_ml_output)
                report_html_content = _message_html(simplified_ml_output)
        else:
            simplified_ml_output = (
                "❌ Tree analyzer not available." if not analyzer else "❌ Sequence too short (<10 bp)."
            )
            tree_html_content = _message_html(simplified_ml_output)
            report_html_content = _message_html(simplified_ml_output)

        # --- Summary ---
        validation = keras_output.split(':')[-1].strip() if ':' in keras_output else keras_output
        # Use the placement's own success flag: its success message carries no
        # "✅" marker, so string matching would misreport success as failure.
        if placement_ok or '✅' in ml_tree_output:
            placement_status = '✅ OK'
        elif 'skipped' in ml_tree_output:
            placement_status = '⚠️ Skipped'
        else:
            placement_status = '❌ Failed'
        tree_status = '✅ OK' if 'Found' in simplified_ml_output else '❌ Failed'
        rule = "━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━"
        summary_output = (
            "\n📊 ANALYSIS SUMMARY:\n"
            f"{rule}\n"
            f"Input: {len(dna_input)} bp\n"
            f"F Gene: {len(processed_sequence)} bp\n"
            f"Validation: {validation}\n"
            f"Placement: {placement_status}\n"
            f"Tree Analysis: {tree_status}\n"
            f"{rule}\n"
        )

        return (
            boundary_output, keras_output, ml_tree_output, simplified_ml_output, summary_output,
            aligned_file, phy_file, None, None,
            tree_html_content, report_html_content,
            tree_html_path, report_html_path,
        )
    except Exception as e:
        logger.error(f"Pipeline error: {e}", exc_info=True)
        error_msg = f"❌ Pipeline Error: {str(e)}"
        return _failure_result(error_msg, error_msg)


async def run_pipeline_from_file(fasta_file_obj, similarity_score, build_ml_tree):
    """Read a FASTA upload and run :func:`run_pipeline` on its sequence.

    Args:
        fasta_file_obj: a FastAPI ``UploadFile``, a filesystem path, or an
            object exposing the path as ``.name`` (Gradio file wrapper).

    Returns:
        The same 13-tuple as :func:`run_pipeline`.
    """
    try:
        if fasta_file_obj is None:
            return _failure_result("❌ No file provided", "No input")

        if isinstance(fasta_file_obj, UploadFile):
            raw = await fasta_file_obj.read()
            dna_input = _parse_fasta_text(raw.decode("utf-8"))
        else:
            if isinstance(fasta_file_obj, (str, os.PathLike)):
                source_path = os.fspath(fasta_file_obj)
            else:
                # Gradio may hand over a temp-file wrapper rather than a path.
                source_path = getattr(fasta_file_obj, "name", None)
            dna_input = read_fasta_file(source_path) if source_path else ""

        if not dna_input:
            return _failure_result("❌ Failed to read FASTA file", "No input")
        return run_pipeline(dna_input, similarity_score, build_ml_tree)
    except Exception as main_error:
        logger.error(f"Pipeline from file error: {main_error}", exc_info=True)
        error_msg = f"❌ Error: {str(main_error)}"
        return _failure_result(error_msg, error_msg)


class AnalysisRequest(BaseModel):
    """JSON body accepted by POST /analyze."""

    sequence: str
    similarity_score: float = 95.0
    build_ml_tree: bool = False


class AnalysisResponse(BaseModel):
    """JSON body returned by the /analyze and /analyze-file endpoints."""

    boundary_output: str
    keras_output: str
    ml_tree_output: str
    tree_analysis_output: str
    summary_output: str
    success: bool
    error_message: Optional[str] = None
    tree_html_path: Optional[str] = None
    report_html_path: Optional[str] = None


def _response_from_result(result) -> AnalysisResponse:
    """Map a run_pipeline 13-tuple onto a successful AnalysisResponse."""
    return AnalysisResponse(
        boundary_output=result[0] or "",
        keras_output=result[1] or "",
        ml_tree_output=result[2] or "",
        tree_analysis_output=result[3] or "",
        summary_output=result[4] or "",
        tree_html_path=result[11],
        report_html_path=result[12],
        success=True,
    )


def _response_from_error(error) -> AnalysisResponse:
    """Build the empty, unsuccessful AnalysisResponse for an unexpected error."""
    return AnalysisResponse(
        boundary_output="",
        keras_output="",
        ml_tree_output="",
        tree_analysis_output="",
        summary_output="",
        tree_html_path=None,
        report_html_path=None,
        success=False,
        error_message=str(error),
    )


# --- FastAPI App Setup ---
app = FastAPI(title="🧬 Gene Analysis Pipeline", version="1.0.0")


@app.get("/")
async def root():
    """Return a small index of the available endpoints."""
    return {
        "message": "🧬 Gene Analysis Pipeline API",
        "status": "running",
        "endpoints": {
            "docs": "/docs",
            "health": "/health",
            "gradio": "/gradio",
            "analyze": "/analyze",
            "analyze_file": "/analyze-file",
            "download": "/download/{file_type}/{query_id}",
        },
    }


@app.get("/health")
async def health_check():
    """Report which models and external tools are currently available."""
    try:
        mafft_available, iqtree_available, _, _ = check_tool_availability()
        return {
            "status": "healthy",
            "components": {
                "boundary_model": boundary_model is not None,
                "keras_model": keras_model is not None,
                "tree_analyzer": analyzer is not None,
                "mafft_available": mafft_available,
                "iqtree_available": iqtree_available,
            },
            "paths": {
                "base_dir": BASE_DIR,
                "query_output_dir": QUERY_OUTPUT_DIR,
            },
        }
    except Exception as e:
        logger.error(f"Health check error: {e}", exc_info=True)
        return {"status": "unhealthy", "error": str(e)}


@app.post("/analyze", response_model=AnalysisResponse)
async def analyze_sequence(request: AnalysisRequest):
    """Run the pipeline on a sequence supplied in the JSON body."""
    try:
        result = run_pipeline(request.sequence, request.similarity_score, request.build_ml_tree)
        return _response_from_result(result)
    except Exception as e:
        logger.error(f"Analyze error: {e}", exc_info=True)
        return _response_from_error(e)


@app.post("/analyze-file", response_model=AnalysisResponse)
async def analyze_file(
    file: UploadFile = File(...),
    similarity_score: float = Form(95.0),
    build_ml_tree: bool = Form(False),
):
    """Run the pipeline on an uploaded FASTA file."""
    try:
        # run_pipeline_from_file reads the UploadFile directly; no temp copy.
        result = await run_pipeline_from_file(file, similarity_score, build_ml_tree)
        return _response_from_result(result)
    except Exception as main_error:
        logger.error(f"Analyze-file error: {main_error}", exc_info=True)
        return _response_from_error(main_error)


@app.get("/download/{file_type}/{query_id}")
async def download_file(file_type: str, query_id: str):
    """Serve a previously generated tree or report HTML file from /tmp.

    Raises:
        HTTPException: 400 for an invalid type or query id, 404 when the file
            does not exist, 500 for unexpected errors.
    """
    try:
        if file_type not in ["tree", "report"]:
            raise HTTPException(status_code=400, detail="Invalid file type. Use 'tree' or 'report'.")
        file_name = (
            f"phylogenetic_tree_{query_id}.html" if file_type == "tree"
            else f"detailed_report_{query_id}.html"
        )
        file_path = os.path.join("/tmp", file_name)
        # query_id is caller-controlled: refuse anything that resolves outside /tmp.
        if os.path.dirname(os.path.realpath(file_path)) != os.path.realpath("/tmp"):
            raise HTTPException(status_code=400, detail="Invalid query id.")
        if not os.path.exists(file_path):
            raise HTTPException(status_code=404, detail="File not found.")
        return FileResponse(file_path, filename=file_name, media_type="text/html")
    except HTTPException:
        # Deliberate 4xx responses must not be rewritten into a 500 below.
        raise
    except Exception as e:
        logger.error(f"Download error: {e}", exc_info=True)
        raise HTTPException(status_code=500, detail=f"Error serving file: {str(e)}")


# --- Gradio Interface ---
def create_gradio_interface():
    """Build the Gradio Blocks UI; on failure return a minimal error interface."""
    try:
        # Probe the external tools once for the status panel.
        mafft_ok, iqtree_ok, _, _ = check_tool_availability()

        with gr.Blocks(
            title="🧬 Gene Analysis Pipeline",
            theme=gr.themes.Soft(),
            css="""
            .gradio-container { max-width: 1200px !important; }
            .status-box { padding: 10px; border-radius: 5px; margin: 5px 0; }
            .success { background-color: #d4edda; border: 1px solid #c3e6cb; color: #155724; }
            .warning { background-color: #fff3cd; border: 1px solid #ffeaa7; color: #856404; }
            .error { background-color: #f8d7da; border: 1px solid #f5c6cb; color: #721c24; }
            """
        ) as iface:
            gr.Markdown("# 🧬 Gene Analysis Pipeline")

            with gr.Row():
                with gr.Column():
                    status_display = gr.HTML(value=f"""
                    <div class="status-box">
                    <h3>🔧 System Status</h3>
                    <p>🤖 Boundary Model: {'✅ Loaded' if boundary_model else '❌ Missing'}</p>
                    <p>🧠 Keras Model: {'✅ Loaded' if keras_model else '❌ Missing'}</p>
                    <p>🌳 Tree Analyzer: {'✅ Loaded' if analyzer else '❌ Missing'}</p>
                    <p>🧬 MAFFT: {'✅ Available' if mafft_ok else '❌ Missing'}</p>
                    <p>🌲 IQ-TREE: {'✅ Available' if iqtree_ok else '❌ Missing'}</p>
                    </div>
                    """)

            with gr.Tabs():
                with gr.TabItem("📝 Text Input"):
                    with gr.Row():
                        with gr.Column(scale=2):
                            gr.Markdown("Paste your DNA sequence here")
                            dna_input = gr.Textbox(
                                label="🧬 DNA Sequence",
                                placeholder="Enter DNA sequence (ATCG format)...",
                                lines=5
                            )
                        with gr.Column(scale=1):
                            gr.Markdown("Minimum similarity for tree analysis")
                            similarity_score = gr.Slider(
                                minimum=1,
                                maximum=99,
                                value=95.0,
                                step=1.0,
                                label="🎯 Similarity Threshold (%)"
                            )
                            gr.Markdown("Generate phylogenetic placement (slower)")
                            build_ml_tree = gr.Checkbox(
                                label="🌲 Build ML Tree",
                                value=False
                            )
                    analyze_btn = gr.Button("🔬 Analyze Sequence", variant="primary")

                with gr.TabItem("📁 File Upload"):
                    with gr.Row():
                        with gr.Column(scale=2):
                            gr.Markdown("Upload a FASTA file containing your sequence")
                            file_input = gr.File(
                                label="📄 Upload FASTA File",
                                file_types=[".fasta", ".fa", ".fas", ".txt"]
                            )
                        with gr.Column(scale=1):
                            gr.Markdown("Minimum similarity for tree analysis")
                            file_similarity_score = gr.Slider(
                                minimum=1,
                                maximum=99,
                                value=95.0,
                                step=1.0,
                                label="🎯 Similarity Threshold (%)"
                            )
                            gr.Markdown("Generate phylogenetic placement (slower)")
                            file_build_ml_tree = gr.Checkbox(
                                label="🌲 Build ML Tree",
                                value=False
                            )
                    analyze_file_btn = gr.Button("🔬 Analyze File", variant="primary")

            gr.Markdown("## 📊 Analysis Results")
            with gr.Row():
                with gr.Column():
                    boundary_output = gr.Textbox(
                        label="🎯 Boundary Detection",
                        interactive=False,
                        lines=2
                    )
                    keras_output = gr.Textbox(
                        label="🧠 F Gene Validation",
                        interactive=False,
                        lines=2
                    )
                with gr.Column():
                    ml_tree_output = gr.Textbox(
                        label="🌲 Phylogenetic Placement",
                        interactive=False,
                        lines=2
                    )
                    tree_analysis_output = gr.Textbox(
                        label="🌳 Tree Analysis",
                        interactive=False,
                        lines=2
                    )
            summary_output = gr.Textbox(
                label="📋 Summary",
                interactive=False,
                lines=8
            )

            with gr.Row():
                aligned_file = gr.File(label="📄 Alignment File", visible=False)
                tree_file = gr.File(label="🌲 Tree File", visible=False)
                tree_html_file = gr.File(label="🌳 Simplified Tree HTML", visible=False)
                report_html_file = gr.File(label="📊 Detailed Report HTML", visible=False)

            with gr.Tabs():
                with gr.TabItem("🌳 Interactive Tree"):
                    tree_html = gr.HTML(
                        value="<div class='status-box'>No tree generated yet. Run analysis to see results.</div>"
                    )
                with gr.TabItem("📊 Detailed Report"):
                    report_html = gr.HTML(
                        label="Analysis Report",
                        value="<div class='status-box'>No report generated yet. Run analysis to see results.</div>"
                    )

            # Event handlers
            def handle_analysis_output(*outputs):
                """Adapt the 13-value pipeline result to the 11 output components."""
                (boundary_text, keras_text, placement_text, tree_text, summary_text,
                 aligned_path, phy_path, _, _,
                 tree_html_content, report_html_content,
                 tree_html_path, report_html_path) = outputs
                return (
                    boundary_text,
                    keras_text,
                    placement_text,
                    tree_text,
                    summary_text,
                    # File widgets are only shown when there is a file to offer.
                    gr.update(value=aligned_path, visible=aligned_path is not None),
                    gr.update(value=phy_path, visible=phy_path is not None),
                    gr.update(value=tree_html_path, visible=tree_html_path is not None),
                    gr.update(value=report_html_path, visible=report_html_path is not None),
                    tree_html_content,
                    report_html_content,
                )

            def on_analyze_text(sequence_text, score, build_tree):
                """Click handler for the text tab."""
                return handle_analysis_output(*run_pipeline(sequence_text, score, build_tree))

            async def on_analyze_file(uploaded, score, build_tree):
                """Click handler for the file-upload tab."""
                result = await run_pipeline_from_file(uploaded, score, build_tree)
                return handle_analysis_output(*result)

            result_components = [
                boundary_output, keras_output, ml_tree_output, tree_analysis_output, summary_output,
                aligned_file, tree_file, tree_html_file, report_html_file, tree_html, report_html
            ]

            analyze_btn.click(
                fn=on_analyze_text,
                inputs=[dna_input, similarity_score, build_ml_tree],
                outputs=result_components
            )

            analyze_file_btn.click(
                fn=on_analyze_file,
                inputs=[file_input, file_similarity_score, file_build_ml_tree],
                outputs=result_components
            )

            # Examples
            gr.Examples(
                examples=[
                    ["ATCG" * 250, 85.0, False],
                    ["CGATCG" * 150, 90.0, True]
                ],
                inputs=[dna_input, similarity_score, build_ml_tree],
                label="Example Sequences"
            )

            gr.Markdown("""
## 📚 Instructions
1. **Input**: Enter a DNA sequence (ATCG format) or upload a FASTA file
2. **Parameters**:
   - Set similarity threshold for phylogenetic analysis (1-99%)
   - Choose whether to build ML tree (slower but more accurate)
3. **Analysis**: Click analyze to run the complete pipeline
4. **Results**: View results in different tabs - summary, tree visualization, and detailed report
5. **Downloads**: Download alignment, tree, simplified tree HTML, and detailed report HTML files

### 🔬 Pipeline Components:
- **Boundary Detection**: Identifies F gene regions
- **F Gene Validation**: Validates F gene using ML
- **Phylogenetic Placement**: Places sequence in reference tree (optional)
- **Tree Analysis**: Builds phylogenetic tree with similar sequences
""")

        return iface
    except Exception as main_error:
        logger.error(f"Gradio interface creation failed: {main_error}", exc_info=True)
        # Capture the text now: the name bound by "except ... as" is unbound
        # when this clause exits, so the lambda must not reference it.
        error_text = str(main_error)
        return gr.Interface(
            fn=lambda x: f"Error: {error_text}",
            inputs=gr.Textbox(label="DNA Sequence"),
            outputs=gr.Textbox(label="Error"),
            title="🧬 Gene Analysis Pipeline (Error Mode)"
        )


# --- Application Startup ---
def run_application():
    """Mount Gradio on the FastAPI app and serve on port 7860.

    If that fails, fall back to launching the Gradio interface alone; if the
    fallback also fails, exit the process with status 1.
    """
    try:
        logger.info("🧬 Initializing Gene Analysis Pipeline...")
        main_gradio_app = create_gradio_interface()
        if main_gradio_app is None:
            raise RuntimeError("Gradio interface creation returned None")
        logger.info("✅ Gradio interface created successfully")

        main_gradio_app = gr.mount_gradio_app(app, main_gradio_app, path="/gradio")
        logger.info("✅ Gradio mounted to FastAPI at /gradio")

        logger.info("=" * 50)
        logger.info("🔍 Checking system components...")
        logger.info(f"🤖 Boundary Model: {'✅ Loaded' if boundary_model else '❌ Missing'}")
        logger.info(f"🧠 Keras Model: {'✅ Loaded' if keras_model else '❌ Missing'}")
        logger.info(f"🌳 Tree Analyzer: {'✅ Loaded' if analyzer else '❌ Missing'}")
        mafft_available, iqtree_available, _, _ = check_tool_availability()
        logger.info(f"🧬 MAFFT: {'✅ Available' if mafft_available else '❌ Missing'}")
        logger.info(f"🌲 IQ-TREE: {'✅ Available' if iqtree_available else '❌ Missing'}")
        logger.info("=" * 50)

        logger.info("🚀 Starting Gene Analysis Pipeline...")
        logger.warning("⚠️ Running without request queuing. Concurrent requests may block.")
        logger.info("📊 FastAPI docs available at: http://localhost:7860/docs")
        logger.info("🧬 Gradio interface available at: http://localhost:7860/gradio")
        uvicorn.run(
            app,
            host="0.0.0.0",
            port=7860,
            log_level="info",
            access_log=True,
            timeout_keep_alive=120
        )
    except Exception as main_error:
        logger.error(f"Application startup failed: {main_error}", exc_info=True)
        try:
            logger.info("🔄 Falling back to Gradio-only mode...")
            fallback_gradio_app = create_gradio_interface()
            if fallback_gradio_app is None:
                raise RuntimeError("Fallback Gradio interface creation returned None")
            logger.info("✅ Fallback Gradio interface created successfully")
            logger.info("🧬 Gradio interface available at: http://localhost:7860")
            # Block here: with prevent_thread_lock=True launch() returns at
            # once and the process would exit, taking the server with it.
            fallback_gradio_app.launch(
                server_name="0.0.0.0",
                server_port=7860,
                prevent_thread_lock=False,
                quiet=True
            )
        except Exception as fallback_error:
            logger.error(f"Fallback failed: {fallback_error}", exc_info=True)
            print("❌ Application failed to start. Check logs at /tmp/app.log for details.")
            sys.exit(1)


if __name__ == "__main__":
    print("🧬 Gene Analysis Pipeline Starting...")
    print("=" * 50)
    print("🔍 Checking system components...")
    mafft_available, iqtree_available, _, _ = check_tool_availability()
    print(f"🤖 Boundary Model: {'✅' if boundary_model else '❌'}")
    print(f"🧠 Keras Model: {'✅' if keras_model else '❌'}")
    print(f"🌳 Tree Analyzer: {'✅' if analyzer else '❌'}")
    print(f"🧬 MAFFT: {'✅' if mafft_available else '❌'}")
    print(f"🌲 IQ-TREE: {'✅' if iqtree_available else '❌'}")
    print("=" * 50)
    run_application()