Spaces:
No application file
No application file
| import os | |
| # Disable GPU to avoid CUDA errors | |
| os.environ["CUDA_VISIBLE_DEVICES"] = "" | |
| os.environ["TF_FORCE_GPU_ALLOW_GROWTH"] = "true" # Prevent TensorFlow memory issues | |
| # Suppress TensorFlow warnings | |
| os.environ['TF_CPP_MIN_LOG_LEVEL'] = '3' # More aggressive suppression | |
| import gradio as gr | |
| import torch | |
| import pickle | |
| import subprocess | |
| import pandas as pd | |
| import re | |
| import logging | |
| import numpy as np | |
| from predictor import EnhancedGenePredictor | |
| from tensorflow.keras.models import load_model | |
| from analyzer import PhylogeneticTreeAnalyzer | |
| import tempfile | |
| import shutil | |
| import sys | |
| import uuid | |
| from pathlib import Path | |
| from huggingface_hub import hf_hub_download | |
| from Bio import SeqIO | |
| from Bio.Seq import Seq | |
| from Bio.SeqRecord import SeqRecord | |
| import stat | |
| import time | |
| import asyncio | |
| from fastapi import FastAPI, File, UploadFile, Form, HTTPException | |
| from fastapi.responses import HTMLResponse, FileResponse | |
| from pydantic import BaseModel | |
| from typing import Optional | |
| import uvicorn | |
| # --- Logging Setup --- | |
| log_formatter = logging.Formatter('%(asctime)s - %(levelname)s - %(message)s') | |
| log_handler = logging.StreamHandler() | |
| log_handler.setFormatter(log_formatter) | |
| try: | |
| file_handler = logging.FileHandler('/tmp/app.log') | |
| file_handler.setFormatter(log_formatter) | |
| logging.basicConfig(level=logging.INFO, handlers=[log_handler, file_handler]) | |
| except Exception as e: | |
| logging.basicConfig(level=logging.INFO, handlers=[log_handler]) | |
| logging.warning(f"Failed to set up file logging: {e}") | |
| logger = logging.getLogger(__name__) | |
| logger.info(f"Gradio version: {gr.__version__}") | |
| # Set event loop policy for compatibility with Gradio Spaces | |
| try: | |
| asyncio.set_event_loop_policy(asyncio.DefaultEventLoopPolicy()) | |
| except Exception as e: | |
| logger.warning(f"Failed to set event loop policy: {e}") | |
| # --- Global Variables --- | |
| BASE_DIR = os.path.dirname(os.path.abspath(__file__)) | |
| MAFFT_PATH = os.path.join(BASE_DIR, "binaries", "mafft", "mafft") | |
| IQTREE_PATH = os.path.join(BASE_DIR, "binaries", "iqtree", "bin", "iqtree3") | |
| ALIGNMENT_PATH = os.path.join(BASE_DIR, "f_gene_sequences_aligned.fasta") | |
| TREE_PATH = os.path.join(BASE_DIR, "f_gene_sequences.phy.treefile") | |
| QUERY_OUTPUT_DIR = os.path.join(BASE_DIR, "queries") | |
| os.makedirs(QUERY_OUTPUT_DIR, exist_ok=True) | |
| # Model repository and file paths | |
| MODEL_REPO = "GGproject10/best_boundary_aware_model" | |
| CSV_PATH = "f cleaned.csv" | |
| # Initialize models as None | |
| boundary_model = None | |
| keras_model = None | |
| kmer_to_index = None | |
| analyzer = None | |
| # --- Model Loading --- | |
| def load_models_safely(): | |
| global boundary_model, keras_model, kmer_to_index, analyzer | |
| logger.info("🔍 Loading models...") | |
| try: | |
| boundary_path = hf_hub_download( | |
| repo_id=MODEL_REPO, | |
| filename="best_boundary_aware_model.pth", | |
| token=None | |
| ) | |
| if os.path.exists(boundary_path): | |
| boundary_model = EnhancedGenePredictor(boundary_path) | |
| logger.info("✅ Boundary model loaded successfully.") | |
| else: | |
| logger.error(f"❌ Boundary model file not found after download.") | |
| except Exception as e: | |
| logger.error(f"❌ Failed to load boundary model: {e}") | |
| boundary_model = None | |
| try: | |
| keras_path = hf_hub_download( | |
| repo_id=MODEL_REPO, | |
| filename="best_model.keras", | |
| token=None | |
| ) | |
| kmer_path = hf_hub_download( | |
| repo_id=MODEL_REPO, | |
| filename="kmer_to_index.pkl", | |
| token=None | |
| ) | |
| if os.path.exists(keras_path) and os.path.exists(kmer_path): | |
| keras_model = load_model(keras_path) | |
| with open(kmer_path, "rb") as f: | |
| kmer_to_index = pickle.load(f) | |
| logger.info("✅ Keras model and k-mer index loaded successfully.") | |
| else: | |
| logger.error(f"❌ Keras model or k-mer files not found.") | |
| except Exception as e: | |
| logger.error(f"❌ Failed to load Keras model: {e}") | |
| keras_model = None | |
| kmer_to_index = None | |
| try: | |
| logger.info("🌳 Initializing tree analyzer...") | |
| analyzer = PhylogeneticTreeAnalyzer() | |
| csv_candidates = [ | |
| CSV_PATH, | |
| os.path.join(BASE_DIR, CSV_PATH), | |
| os.path.join(BASE_DIR, "app", CSV_PATH), | |
| os.path.join(os.path.dirname(__file__), CSV_PATH), | |
| "f_cleaned.csv", | |
| os.path.join(BASE_DIR, "f_cleaned.csv") | |
| ] | |
| csv_loaded = False | |
| for csv_candidate in csv_candidates: | |
| if os.path.exists(csv_candidate): | |
| logger.info(f"📊 Trying CSV: {csv_candidate}") | |
| try: | |
| if analyzer.load_data(csv_candidate): | |
| logger.info(f"✅ CSV loaded from: {csv_candidate}") | |
| csv_loaded = True | |
| break | |
| except Exception as e: | |
| logger.warning(f"CSV load failed for {csv_candidate}: {e}") | |
| continue | |
| if not csv_loaded: | |
| logger.error("❌ Failed to load CSV data from any candidate location.") | |
| analyzer = None | |
| else: | |
| try: | |
| if analyzer.train_ai_model(): | |
| logger.info("✅ AI model training completed successfully") | |
| else: | |
| logger.warning("⚠️ AI model training failed; proceeding with basic analysis.") | |
| except Exception as e: | |
| logger.warning(f"⚠️ AI model training failed: {e}") | |
| except Exception as e: | |
| logger.error(f"❌ Tree analyzer initialization failed: {e}") | |
| analyzer = None | |
| # Load models at startup | |
| load_models_safely() | |
| # --- Tool Detection --- | |
| def setup_binary_permissions(): | |
| for binary in [MAFFT_PATH, IQTREE_PATH]: | |
| if os.path.exists(binary): | |
| try: | |
| os.chmod(binary, os.stat(binary).st_mode | stat.S_IEXEC) | |
| logger.info(f"Set executable permission on {binary}") | |
| except Exception as e: | |
| logger.warning(f"Failed to set permission on {binary}: {e}") | |
| def check_tool_availability(): | |
| setup_binary_permissions() | |
| mafft_available = False | |
| mafft_cmd = None | |
| mafft_candidates = ['mafft', '/usr/bin/mafft', '/usr/local/bin/mafft', MAFFT_PATH] | |
| for candidate in mafft_candidates: | |
| if shutil.which(candidate) or os.path.exists(candidate): | |
| try: | |
| result = subprocess.run( | |
| [candidate, "--help"], | |
| capture_output=True, | |
| text=True, | |
| timeout=5 | |
| ) | |
| if result.returncode == 0 or "mafft" in result.stderr.lower(): | |
| mafft_available = True | |
| mafft_cmd = candidate | |
| logger.info(f"✅ MAFFT found at: {candidate}") | |
| break | |
| except Exception as e: | |
| logger.debug(f"MAFFT test failed for {candidate}: {e}") | |
| iqtree_available = False | |
| iqtree_cmd = None | |
| iqtree_candidates = ['iqtree', 'iqtree2', 'iqtree3', '/usr/bin/iqtree', '/usr/local/bin/iqtree', IQTREE_PATH] | |
| for candidate in iqtree_candidates: | |
| if shutil.which(candidate) or os.path.exists(candidate): | |
| try: | |
| result = subprocess.run( | |
| [candidate, "--help"], | |
| capture_output=True, | |
| text=True, | |
| timeout=5 | |
| ) | |
| if result.returncode == 0 or "iqtree" in result.stderr.lower(): | |
| iqtree_available = True | |
| iqtree_cmd = candidate | |
| logger.info(f"✅ IQ-TREE found at: {candidate}") | |
| break | |
| except Exception as e: | |
| logger.debug(f"IQ-TREE test failed for {candidate}: {e}") | |
| return mafft_available, iqtree_available, mafft_cmd, iqtree_cmd | |
| # --- Pipeline Functions --- | |
| def cleanup_file(file_path: str) -> None: | |
| """Utility function to safely delete a file and log errors.""" | |
| if file_path and os.path.exists(file_path): | |
| try: | |
| os.unlink(file_path) | |
| logger.debug(f"Cleaned up {file_path}") | |
| except Exception as cleanup_error: | |
| logger.warning(f"Failed to clean up {file_path}: {cleanup_error}") | |
| def phylogenetic_placement(sequence: str, mafft_cmd: str, iqtree_cmd: str): | |
| query_fasta = None | |
| try: | |
| if len(sequence.strip()) < 100: | |
| return False, "Sequence too short (<100 bp).", None, None | |
| query_id = f"QUERY_{uuid.uuid4().hex[:8]}" | |
| query_fasta = os.path.join(QUERY_OUTPUT_DIR, f"{query_id}.fa") | |
| aligned_with_query = os.path.join(QUERY_OUTPUT_DIR, f"{query_id}_aligned.fa") | |
| output_prefix = os.path.join(QUERY_OUTPUT_DIR, f"{query_id}_placed_tree") | |
| if not os.path.exists(ALIGNMENT_PATH) or not os.path.exists(TREE_PATH): | |
| cleanup_file(query_fasta) | |
| return False, "Reference alignment or tree not found.", None, None | |
| query_record = SeqRecord(Seq(sequence.upper()), id=query_id, description="") | |
| SeqIO.write([query_record], query_fasta, "fasta") | |
| with open(aligned_with_query, "w") as output_file: | |
| result = subprocess.run( | |
| [mafft_cmd, "--add", query_fasta, "--reorder", ALIGNMENT_PATH], | |
| stdout=output_file, | |
| stderr=subprocess.PIPE, | |
| text=True, | |
| timeout=600, | |
| check=True | |
| ) | |
| if not os.path.exists(aligned_with_query) or os.path.getsize(aligned_with_query) == 0: | |
| cleanup_file(query_fasta) | |
| return False, "MAFFT alignment failed.", None, None | |
| result = subprocess.run( | |
| [iqtree_cmd, "-s", aligned_with_query, "-g", TREE_PATH, "-m", "GTR+G", "-pre", output_prefix, "-redo"], | |
| capture_output=True, | |
| text=True, | |
| timeout=1200, | |
| check=True | |
| ) | |
| treefile = f"{output_prefix}.treefile" | |
| if not os.path.exists(treefile): | |
| cleanup_file(query_fasta) | |
| return False, "IQ-TREE placement failed.", aligned_with_query, None | |
| success_msg = f"Placement completed!\nQuery ID: {query_id}\nAlignment: {os.path.basename(aligned_with_query)}\nTree: {os.path.basename(treefile)}" | |
| cleanup_file(query_fasta) | |
| return True, success_msg, aligned_with_query, treefile | |
| except Exception as main_error: | |
| logger.error(f"Phylogenetic placement failed: {main_error}", exc_info=True) | |
| cleanup_file(query_fasta) | |
| return False, f"Error: {str(main_error)}", None, None | |
| def analyze_sequence_for_tree(sequence: str, matching_percentage: float): | |
| try: | |
| logger.debug("Starting tree analysis...") | |
| if not analyzer: | |
| return "❌ Tree analyzer not initialized.", None, None | |
| if not sequence or len(sequence.strip()) < 10: | |
| return "❌ Invalid sequence.", None, None | |
| if not (1 <= matching_percentage <= 99): | |
| return "❌ Matching percentage must be 1-99.", None, None | |
| logger.debug("Finding query sequence...") | |
| if not analyzer.find_query_sequence(sequence): | |
| return "❌ Sequence not accepted.", None, None | |
| logger.debug("Finding similar sequences...") | |
| matched_ids, actual_percentage = analyzer.find_similar_sequences(matching_percentage) | |
| if not matched_ids: | |
| return f"❌ No similar sequences at {matching_percentage}% threshold.", None, None | |
| logger.debug("Building tree structure...") | |
| analyzer.build_tree_structure_with_ml_safe(matched_ids) | |
| logger.debug("Creating interactive tree...") | |
| fig = analyzer.create_interactive_tree(matched_ids, actual_percentage) | |
| query_id = analyzer.query_id or f"query_{int(time.time())}" | |
| tree_html_path = os.path.join("/tmp", f'phylogenetic_tree_{query_id}.html') | |
| logger.debug(f"Saving tree to {tree_html_path}") | |
| fig.write_html(tree_html_path) | |
| analyzer.matching_percentage = matching_percentage | |
| logger.debug("Generating detailed report...") | |
| report_success = analyzer.generate_detailed_report(matched_ids, actual_percentage) | |
| report_html_path = os.path.join("/tmp", f'detailed_report_{query_id}.html') if report_success else None | |
| logger.debug(f"Tree analysis completed: {len(matched_ids)} matches") | |
| return f"✅ Found {len(matched_ids)} sequences at {actual_percentage:.2f}% similarity.", tree_html_path, report_html_path | |
| except Exception as e: | |
| logger.error(f"Tree analysis failed: {e}", exc_info=True) | |
| return f"❌ Error: {str(e)}", None, None | |
| def predict_with_keras(sequence): | |
| try: | |
| if not keras_model or not kmer_to_index: | |
| return "❌ Keras model not available." | |
| if len(sequence) < 6: | |
| return "❌ Sequence too short (<6 bp)." | |
| kmers = [sequence[i:i+6] for i in range(len(sequence)-5)] | |
| indices = [kmer_to_index.get(kmer, 0) for kmer in kmers] | |
| input_arr = np.array([indices]) | |
| prediction = keras_model.predict(input_arr, verbose=0)[0] | |
| f_gene_prob = prediction[-1] | |
| percentage = min(100, max(0, int(f_gene_prob * 100 + 5))) | |
| return f"✅ {percentage}% F gene confidence" | |
| except Exception as e: | |
| logger.error(f"Keras prediction failed: {e}", exc_info=True) | |
| return f"❌ Error: {str(e)}" | |
| def read_fasta_file(file_obj): | |
| try: | |
| if file_obj is None: | |
| return "" | |
| if isinstance(file_obj, str): | |
| with open(file_obj, "r") as f: | |
| content = f.read() | |
| else: | |
| content = file_obj.read().decode("utf-8") | |
| lines = content.strip().split("\n") | |
| seq_lines = [line.strip() for line in lines if not line.startswith(">")] | |
| return ''.join(seq_lines) | |
| except Exception as e: | |
| logger.error(f"Failed to read FASTA file: {e}", exc_info=True) | |
| return "" | |
| def run_pipeline(dna_input, similarity_score=95.0, build_ml_tree=False): | |
| try: | |
| dna_input = dna_input.upper().strip() | |
| if not dna_input: | |
| return "❌ Empty input", "", "", "", "", None, None, None, None, "No input", "No input", None, None | |
| if not re.match('^[ACTGN]+$', dna_input): | |
| dna_input = ''.join(c if c in 'ACTGN' else 'N' for c in dna_input) | |
| processed_sequence = dna_input | |
| boundary_output = "" | |
| if boundary_model: | |
| try: | |
| result = boundary_model.predict_sequence(dna_input) | |
| regions = result['gene_regions'] | |
| if regions: | |
| processed_sequence = regions[0]["sequence"] | |
| boundary_output = f"✅ F gene region found: {len(processed_sequence)} bp" | |
| else: | |
| boundary_output = "⚠️ No F gene regions found." | |
| processed_sequence = dna_input | |
| except Exception as e: | |
| boundary_output = f"❌ Boundary prediction error: {str(e)}" | |
| processed_sequence = dna_input | |
| else: | |
| boundary_output = f"⚠️ Boundary model not available. Using full input: {len(dna_input)} bp" | |
| keras_output = predict_with_keras(processed_sequence) if processed_sequence and len(processed_sequence) >= 6 else "❌ Sequence too short." | |
| aligned_file = None | |
| phy_file = None | |
| ml_tree_output = "" | |
| if build_ml_tree and processed_sequence and len(processed_sequence) >= 100: | |
| try: | |
| mafft_available, iqtree_available, mafft_cmd, iqtree_cmd = check_tool_availability() | |
| if mafft_available and iqtree_available: | |
| ml_success, ml_message, ml_aligned, ml_tree = phylogenetic_placement(processed_sequence, mafft_cmd, iqtree_cmd) | |
| ml_tree_output = ml_message | |
| aligned_file = ml_aligned | |
| phy_file = ml_tree | |
| else: | |
| ml_tree_output = "❌ MAFFT or IQ-TREE not available" | |
| except Exception as e: | |
| ml_tree_output = f"❌ ML tree error: {str(e)}" | |
| elif build_ml_tree: | |
| ml_tree_output = "❌ Sequence too short for placement (<100 bp)." | |
| else: | |
| ml_tree_output = "⚠️ Phylogenetic placement skipped." | |
| tree_html_content = "No tree generated." | |
| report_html_content = "No report generated." | |
| tree_html_path = None | |
| report_html_path = None | |
| simplified_ml_output = "" | |
| if analyzer and processed_sequence and len(processed_sequence) >= 10: | |
| try: | |
| tree_result, tree_html_path, report_html_path = analyze_sequence_for_tree(processed_sequence, similarity_score) | |
| simplified_ml_output = tree_result | |
| if tree_html_path and os.path.exists(tree_html_path): | |
| with open(tree_html_path, 'r', encoding='utf-8') as f: | |
| tree_html_content = f.read() | |
| else: | |
| tree_html_content = f"<div style='color: red;'>{tree_result}</div>" | |
| if report_html_path and os.path.exists(report_html_path): | |
| with open(report_html_path, 'r', encoding='utf-8') as f: | |
| report_html_content = f.read() | |
| else: | |
| report_html_content = f"<div style='color: red;'>{tree_result}</div>" | |
| except Exception as e: | |
| simplified_ml_output = f"❌ Tree analysis error: {str(e)}" | |
| tree_html_content = f"<div style='color: red;'>{simplified_ml_output}</div>" | |
| report_html_content = f"<div style='color: red;'>{simplified_ml_output}</div>" | |
| else: | |
| simplified_ml_output = "❌ Tree analyzer not available." if not analyzer else "❌ Sequence too short (<10 bp)." | |
| tree_html_content = f"<div style='color: orange;'>{simplified_ml_output}</div>" | |
| report_html_content = f"<div style='color: orange;'>{simplified_ml_output}</div>" | |
| summary_output = f""" | |
| 📊 ANALYSIS SUMMARY: | |
| ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━ | |
| Input: {len(dna_input)} bp | |
| F Gene: {len(processed_sequence)} bp | |
| Validation: {keras_output.split(':')[-1].strip() if ':' in keras_output else keras_output} | |
| Placement: {'✅ OK' if '✅' in ml_tree_output else '⚠️ Skipped' if 'skipped' in ml_tree_output else '❌ Failed'} | |
| Tree Analysis: {'✅ OK' if 'Found' in simplified_ml_output else '❌ Failed'} | |
| ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━ | |
| """ | |
| return ( | |
| boundary_output, keras_output, ml_tree_output, simplified_ml_output, summary_output, | |
| aligned_file, phy_file, None, None, tree_html_content, report_html_content, | |
| tree_html_path, report_html_path | |
| ) | |
| except Exception as e: | |
| logger.error(f"Pipeline error: {e}", exc_info=True) | |
| error_msg = f"❌ Pipeline Error: {str(e)}" | |
| return error_msg, "", "", "", "", None, None, None, None, error_msg, error_msg, None, None | |
| async def run_pipeline_from_file(fasta_file_obj, similarity_score, build_ml_tree): | |
| temp_file_path = None | |
| try: | |
| if fasta_file_obj is None: | |
| return "❌ No file provided", "", "", "", "", None, None, None, None, "No input", "No input", None, None | |
| with tempfile.NamedTemporaryFile(delete=False, suffix=".fasta", dir="/tmp") as temp_file: | |
| if isinstance(fasta_file_obj, UploadFile): | |
| content = await fasta_file_obj.read() | |
| temp_file.write(content) | |
| else: | |
| with open(fasta_file_obj, 'rb') as f: | |
| content = f.read() | |
| temp_file.write(content) | |
| temp_file_path = temp_file.name | |
| dna_input = read_fasta_file(temp_file_path) | |
| if not dna_input: | |
| cleanup_file(temp_file_path) | |
| return "❌ Failed to read FASTA file", "", "", "", "", None, None, None, None, "No input", "No input", None, None | |
| result = run_pipeline(dna_input, similarity_score, build_ml_tree) | |
| cleanup_file(temp_file_path) | |
| return result | |
| except Exception as main_error: | |
| logger.error(f"Pipeline from file error: {main_error}", exc_info=True) | |
| cleanup_file(temp_file_path) | |
| error_msg = f"❌ Error: {str(main_error)}" | |
| return error_msg, "", "", "", "", None, None, None, None, error_msg, error_msg, None, None | |
| class AnalysisRequest(BaseModel): | |
| sequence: str | |
| similarity_score: float = 95.0 | |
| build_ml_tree: bool = False | |
| class AnalysisResponse(BaseModel): | |
| boundary_output: str | |
| keras_output: str | |
| ml_tree_output: str | |
| tree_analysis_output: str | |
| summary_output: str | |
| success: bool | |
| error_message: Optional[str] = None | |
| tree_html_path: Optional[str] = None | |
| report_html_path: Optional[str] = None | |
| # --- FastAPI App Setup --- | |
| app = FastAPI(title="🧬 Gene Analysis Pipeline", version="1.0.0") | |
| async def root(): | |
| return { | |
| "message": "🧬 Gene Analysis Pipeline API", | |
| "status": "running", | |
| "endpoints": { | |
| "docs": "/docs", | |
| "health": "/health", | |
| "gradio": "/gradio", | |
| "analyze": "/analyze", | |
| "analyze_file": "/analyze-file", | |
| "download": "/download/{file_type}/{query_id}" | |
| } | |
| } | |
| async def health_check(): | |
| try: | |
| mafft_available, iqtree_available, _, _ = check_tool_availability() | |
| return { | |
| "status": "healthy", | |
| "components": { | |
| "boundary_model": boundary_model is not None, | |
| "keras_model": keras_model is not None, | |
| "tree_analyzer": analyzer is not None, | |
| "mafft_available": mafft_available, | |
| "iqtree_available": iqtree_available | |
| }, | |
| "paths": { | |
| "base_dir": BASE_DIR, | |
| "query_output_dir": QUERY_OUTPUT_DIR | |
| } | |
| } | |
| except Exception as e: | |
| logger.error(f"Health check error: {e}", exc_info=True) | |
| return {"status": "unhealthy", "error": str(e)} | |
| async def analyze_sequence(request: AnalysisRequest): | |
| try: | |
| result = run_pipeline(request.sequence, request.similarity_score, request.build_ml_tree) | |
| return AnalysisResponse( | |
| boundary_output=result[0] or "", | |
| keras_output=result[1] or "", | |
| ml_tree_output=result[2] or "", | |
| tree_analysis_output=result[3] or "", | |
| summary_output=result[4] or "", | |
| tree_html_path=result[11], | |
| report_html_path=result[12], | |
| success=True | |
| ) | |
| except Exception as e: | |
| logger.error(f"Analyze error: {e}", exc_info=True) | |
| return AnalysisResponse( | |
| boundary_output="", keras_output="", ml_tree_output="", | |
| tree_analysis_output="", summary_output="", | |
| tree_html_path=None, report_html_path=None, | |
| success=False, error_message=str(e) | |
| ) | |
| async def analyze_file( | |
| file: UploadFile = File(...), | |
| similarity_score: float = Form(95.0), | |
| build_ml_tree: bool = Form(False) | |
| ): | |
| temp_file_path = None | |
| try: | |
| with tempfile.NamedTemporaryFile(delete=False, suffix=".fasta", dir="/tmp") as temp_file: | |
| content = await file.read() | |
| temp_file.write(content) | |
| temp_file_path = temp_file.name | |
| result = await run_pipeline_from_file(temp_file_path, similarity_score, build_ml_tree) | |
| cleanup_file(temp_file_path) | |
| return AnalysisResponse( | |
| boundary_output=result[0] or "", | |
| keras_output=result[1] or "", | |
| ml_tree_output=result[2] or "", | |
| tree_analysis_output=result[3] or "", | |
| summary_output=result[4] or "", | |
| tree_html_path=result[11], | |
| report_html_path=result[12], | |
| success=True | |
| ) | |
| except Exception as main_error: | |
| logger.error(f"Analyze-file error: {main_error}", exc_info=True) | |
| cleanup_file(temp_file_path) | |
| return AnalysisResponse( | |
| boundary_output="", keras_output="", ml_tree_output="", | |
| tree_analysis_output="", summary_output="", | |
| tree_html_path=None, report_html_path=None, | |
| success=False, error_message=str(main_error) | |
| ) | |
| async def download_file(file_type: str, query_id: str): | |
| try: | |
| if file_type not in ["tree", "report"]: | |
| raise HTTPException(status_code=400, detail="Invalid file type. Use 'tree' or 'report'.") | |
| file_name = f"phylogenetic_tree_{query_id}.html" if file_type == "tree" else f"detailed_report_{query_id}.html" | |
| file_path = os.path.join("/tmp", file_name) | |
| if not os.path.exists(file_path): | |
| raise HTTPException(status_code=404, detail="File not found.") | |
| return FileResponse(file_path, filename=file_name, media_type="text/html") | |
| except Exception as e: | |
| logger.error(f"Download error: {e}", exc_info=True) | |
| raise HTTPException(status_code=500, detail=f"Error serving file: {str(e)}") | |
| # --- Gradio Interface --- | |
| def create_gradio_interface(): | |
| try: | |
| with gr.Blocks( | |
| title="🧬 Gene Analysis Pipeline", | |
| theme=gr.themes.Soft(), | |
| css=""" | |
| .gradio-container { max-width: 1200px !important; } | |
| .status-box { padding: 10px; border-radius: 5px; margin: 5px 0; } | |
| .success { background-color: #d4edda; border: 1px solid #c3e6cb; color: #155724; } | |
| .warning { background-color: #fff3cd; border: 1px solid #ffeaa7; color: #856404; } | |
| .error { background-color: #f8d7da; border: 1px solid #f5c6cb; color: #721c24; } | |
| """ | |
| ) as iface: | |
| gr.Markdown("# 🧬 Gene Analysis Pipeline") | |
| with gr.Row(): | |
| with gr.Column(): | |
| status_display = gr.HTML(value=f""" | |
| <div class="status-box"> | |
| <h3>🔧 System Status</h3> | |
| <p>🤖 Boundary Model: {'✅ Loaded' if boundary_model else '❌ Missing'}</p> | |
| <p>🧠 Keras Model: {'✅ Loaded' if keras_model else '❌ Missing'}</p> | |
| <p>🌳 Tree Analyzer: {'✅ Loaded' if analyzer else '❌ Missing'}</p> | |
| <p>🧬 MAFFT: {'✅ Available' if check_tool_availability()[0] else '❌ Missing'}</p> | |
| <p>🌲 IQ-TREE: {'✅ Available' if check_tool_availability()[1] else '❌ Missing'}</p> | |
| </div> | |
| """) | |
| with gr.Tabs(): | |
| with gr.TabItem("📝 Text Input"): | |
| with gr.Row(): | |
| with gr.Column(scale=2): | |
| gr.Markdown("Paste your DNA sequence here") | |
| dna_input = gr.Textbox( | |
| label="🧬 DNA Sequence", | |
| placeholder="Enter DNA sequence (ATCG format)...", | |
| lines=5 | |
| ) | |
| with gr.Column(scale=1): | |
| gr.Markdown("Minimum similarity for tree analysis") | |
| similarity_score = gr.Slider( | |
| minimum=1, | |
| maximum=99, | |
| value=95.0, | |
| step=1.0, | |
| label="🎯 Similarity Threshold (%)" | |
| ) | |
| gr.Markdown("Generate phylogenetic placement (slower)") | |
| build_ml_tree = gr.Checkbox( | |
| label="🌲 Build ML Tree", | |
| value=False | |
| ) | |
| analyze_btn = gr.Button("🔬 Analyze Sequence", variant="primary") | |
| with gr.TabItem("📁 File Upload"): | |
| with gr.Row(): | |
| with gr.Column(scale=2): | |
| gr.Markdown("Upload a FASTA file containing your sequence") | |
| file_input = gr.File( | |
| label="📄 Upload FASTA File", | |
| file_types=[".fasta", ".fa", ".fas", ".txt"] | |
| ) | |
| with gr.Column(scale=1): | |
| gr.Markdown("Minimum similarity for tree analysis") | |
| file_similarity_score = gr.Slider( | |
| minimum=1, | |
| maximum=99, | |
| value=95.0, | |
| step=1.0, | |
| label="🎯 Similarity Threshold (%)" | |
| ) | |
| gr.Markdown("Generate phylogenetic placement (slower)") | |
| file_build_ml_tree = gr.Checkbox( | |
| label="🌲 Build ML Tree", | |
| value=False | |
| ) | |
| analyze_file_btn = gr.Button("🔬 Analyze File", variant="primary") | |
| gr.Markdown("## 📊 Analysis Results") | |
| with gr.Row(): | |
| with gr.Column(): | |
| boundary_output = gr.Textbox( | |
| label="🎯 Boundary Detection", | |
| interactive=False, | |
| lines=2 | |
| ) | |
| keras_output = gr.Textbox( | |
| label="🧠 F Gene Validation", | |
| interactive=False, | |
| lines=2 | |
| ) | |
| with gr.Column(): | |
| ml_tree_output = gr.Textbox( | |
| label="🌲 Phylogenetic Placement", | |
| interactive=False, | |
| lines=2 | |
| ) | |
| tree_analysis_output = gr.Textbox( | |
| label="🌳 Tree Analysis", | |
| interactive=False, | |
| lines=2 | |
| ) | |
| summary_output = gr.Textbox( | |
| label="📋 Summary", | |
| interactive=False, | |
| lines=8 | |
| ) | |
| with gr.Row(): | |
| aligned_file = gr.File(label="📄 Alignment File", visible=False) | |
| tree_file = gr.File(label="🌲 Tree File", visible=False) | |
| tree_html_file = gr.File(label="🌳 Simplified Tree HTML", visible=False) | |
| report_html_file = gr.File(label="📊 Detailed Report HTML", visible=False) | |
| with gr.Tabs(): | |
| with gr.TabItem("🌳 Interactive Tree"): | |
| tree_html = gr.HTML( | |
| value="<div style='text-align: center; color: #666; padding: 20px;'>No tree generated yet. Run analysis to see results.</div>" | |
| ) | |
| with gr.TabItem("📊 Detailed Report"): | |
| report_html = gr.HTML( | |
| label="Analysis Report", | |
| value="<div style='text-align: center; color: #666; padding: 20px;'>No report generated yet. Run analysis to see results.</div>" | |
| ) | |
| # Event handlers | |
| def handle_analysis_output(*outputs): | |
| boundary_output, keras_output, ml_tree_output, simplified_ml_output, summary_output, aligned_file, phy_file, _, _, tree_html_content, report_html_content, tree_html_path, report_html_path = outputs | |
| return ( | |
| boundary_output, keras_output, ml_tree_output, simplified_ml_output, summary_output, | |
| gr.File.update(value=aligned_file, visible=aligned_file is not None), | |
| gr.File.update(value=phy_file, visible=phy_file is not None), | |
| gr.File.update(value=tree_html_path, visible=tree_html_path is not None), | |
| gr.File.update(value=report_html_path, visible=report_html_path is not None), | |
| tree_html_content, | |
| report_html_content | |
| ) | |
| analyze_btn.click( | |
| fn=run_pipeline, | |
| inputs=[dna_input, similarity_score, build_ml_tree], | |
| outputs=[ | |
| boundary_output, keras_output, ml_tree_output, tree_analysis_output, summary_output, | |
| aligned_file, tree_file, tree_html_file, report_html_file, tree_html, report_html | |
| ] | |
| ) | |
| analyze_file_btn.click( | |
| fn=run_pipeline_from_file, | |
| inputs=[file_input, file_similarity_score, file_build_ml_tree], | |
| outputs=[ | |
| boundary_output, keras_output, ml_tree_output, tree_analysis_output, summary_output, | |
| aligned_file, tree_file, tree_html_file, report_html_file, tree_html, report_html | |
| ] | |
| ) | |
| # Examples | |
| gr.Examples( | |
| examples=[ | |
| ["ATCG" * 250, 85.0, False], | |
| ["CGATCG" * 150, 90.0, True] | |
| ], | |
| inputs=[dna_input, similarity_score, build_ml_tree], | |
| label="Example Sequences" | |
| ) | |
| gr.Markdown(""" | |
| ## 📚 Instructions | |
| 1. **Input**: Enter a DNA sequence (ATCG format) or upload a FASTA file | |
| 2. **Parameters**: | |
| - Set similarity threshold for phylogenetic analysis (1-99%) | |
| - Choose whether to build ML tree (slower but more accurate) | |
| 3. **Analysis**: Click analyze to run the complete pipeline | |
| 4. **Results**: View results in different tabs - summary, tree visualization, and detailed report | |
| 5. **Downloads**: Download alignment, tree, simplified tree HTML, and detailed report HTML files | |
| ### 🔬 Pipeline Components: | |
| - **Boundary Detection**: Identifies F gene regions | |
| - **F Gene Validation**: Validates F gene using ML | |
| - **Phylogenetic Placement**: Places sequence in reference tree (optional) | |
| - **Tree Analysis**: Builds phylogenetic tree with similar sequences | |
| """) | |
| return iface | |
| except Exception as main_error: | |
| logger.error(f"Gradio interface creation failed: {main_error}", exc_info=True) | |
| return gr.Interface( | |
| fn=lambda x: f"Error: {str(main_error)}", | |
| inputs=gr.Textbox(label="DNA Sequence"), | |
| outputs=gr.Textbox(label="Error"), | |
| title="🧬 Gene Analysis Pipeline (Error Mode)" | |
| ) | |
| # --- Application Startup --- | |
| def run_application(): | |
| try: | |
| logger.info("🧬 Initializing Gene Analysis Pipeline...") | |
| main_gradio_app = create_gradio_interface() | |
| if main_gradio_app is None: | |
| raise RuntimeError("Gradio interface creation returned None") | |
| logger.info("✅ Gradio interface created successfully") | |
| main_gradio_app = gr.mount_gradio_app(app, main_gradio_app, path="/gradio") | |
| logger.info("✅ Gradio mounted to FastAPI at /gradio") | |
| logger.info("=" * 50) | |
| logger.info("🔍 Checking system components...") | |
| logger.info(f"🤖 Boundary Model: {'✅ Loaded' if boundary_model else '❌ Missing'}") | |
| logger.info(f"🧠 Keras Model: {'✅ Loaded' if keras_model else '❌ Missing'}") | |
| logger.info(f"🌳 Tree Analyzer: {'✅ Loaded' if analyzer else '❌ Missing'}") | |
| mafft_available, iqtree_available, _, _ = check_tool_availability() | |
| logger.info(f"🧬 MAFFT: {'✅ Available' if mafft_available else '❌ Missing'}") | |
| logger.info(f"🌲 IQ-TREE: {'✅ Available' if iqtree_available else '❌ Missing'}") | |
| logger.info("=" * 50) | |
| logger.info("🚀 Starting Gene Analysis Pipeline...") | |
| logger.warning("⚠️ Running without request queuing. Concurrent requests may block.") | |
| logger.info("📊 FastAPI docs available at: http://localhost:7860/docs") | |
| logger.info("🧬 Gradio interface available at: http://localhost:7860/gradio") | |
| uvicorn.run( | |
| app, | |
| host="0.0.0.0", | |
| port=7860, | |
| log_level="info", | |
| access_log=True, | |
| timeout_keep_alive=120 | |
| ) | |
| except Exception as main_error: | |
| logger.error(f"Application startup failed: {main_error}", exc_info=True) | |
| try: | |
| logger.info("🔄 Falling back to Gradio-only mode...") | |
| fallback_gradio_app = create_gradio_interface() | |
| if fallback_gradio_app is None: | |
| raise RuntimeError("Fallback Gradio interface creation returned None") | |
| logger.info("✅ Fallback Gradio interface created successfully") | |
| logger.info("🧬 Gradio interface available at: http://localhost:7860") | |
| fallback_gradio_app.launch( | |
| server_name="0.0.0.0", | |
| server_port=7860, | |
| prevent_thread_lock=True, | |
| quiet=True | |
| ) | |
| except Exception as fallback_error: | |
| logger.error(f"Fallback failed: {fallback_error}", exc_info=True) | |
| print("❌ Application failed to start. Check logs at /tmp/app.log for details.") | |
| sys.exit(1) | |
| if __name__ == "__main__": | |
| print("🧬 Gene Analysis Pipeline Starting...") | |
| print("=" * 50) | |
| print("🔍 Checking system components...") | |
| mafft_available, iqtree_available, _, _ = check_tool_availability() | |
| print(f"🤖 Boundary Model: {'✅' if boundary_model else '❌'}") | |
| print(f"🧠 Keras Model: {'✅' if keras_model else '❌'}") | |
| print(f"🌳 Tree Analyzer: {'✅' if analyzer else '❌'}") | |
| print(f"🧬 MAFFT: {'✅' if mafft_available else '❌'}") | |
| print(f"🌲 IQ-TREE: {'✅' if iqtree_available else '❌'}") | |
| print("=" * 50) | |
| run_application() |