# Hugging Face Spaces application file (app.py)
| import gradio as gr | |
| import torch | |
| import pickle | |
| import subprocess | |
| import pandas as pd | |
| import os | |
| import re | |
| import logging | |
| import numpy as np | |
| from predictor import EnhancedGenePredictor | |
| from tensorflow.keras.models import load_model | |
| # Import the new analyzer | |
| from analyzer import PhylogeneticTreeAnalyzer | |
| import tempfile | |
| import shutil | |
| import sys | |
| import uuid | |
| from pathlib import Path | |
| from huggingface_hub import hf_hub_download | |
| from Bio import SeqIO | |
| from Bio.Seq import Seq | |
| from Bio.SeqRecord import SeqRecord | |
| import stat | |
| import time | |
# --- Global Variables ---
# Base directory of this file; all bundled resources are resolved relative to it.
BASE_DIR = os.path.dirname(os.path.abspath(__file__))
# Bundled MAFFT / IQ-TREE executables shipped alongside the app.
MAFFT_PATH = os.path.join(BASE_DIR, "binaries", "mafft", "mafft")  # Updated path
IQTREE_PATH = os.path.join(BASE_DIR, "binaries", "iqtree", "bin", "iqtree3")
# Reference alignment and tree used for phylogenetic placement of query sequences.
ALIGNMENT_PATH = os.path.join(BASE_DIR, "f_gene_sequences_aligned.fasta")
TREE_PATH = os.path.join(BASE_DIR, "f_gene_sequences.phy.treefile")
# Per-query scratch files (FASTA, alignments, placed trees) are written here.
QUERY_OUTPUT_DIR = os.path.join(BASE_DIR, "queries")
os.makedirs(QUERY_OUTPUT_DIR, exist_ok=True)
# --- Logging ---
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
# --- Paths ---
# Model repository and file paths
model_repo = "GGproject10/best_boundary_aware_model"
csv_path = "f cleaned.csv"
# Get HF token from environment (if available); needed only for private repos.
hf_token = os.getenv("HF_TOKEN")
# --- Load Models ---
# All three stay None if loading fails; downstream code degrades gracefully.
boundary_model = None   # boundary-aware F-gene extractor (PyTorch)
keras_model = None      # k-mer based F-gene validator (Keras)
kmer_to_index = None    # 6-mer -> vocabulary index mapping for keras_model
# Try to load boundary model from Hugging Face Hub
try:
    boundary_path = hf_hub_download(
        repo_id=model_repo,
        filename="best_boundary_aware_model.pth",
        token=hf_token
    )
    if os.path.exists(boundary_path):
        boundary_model = EnhancedGenePredictor(boundary_path)
        logging.info("Boundary model loaded successfully from Hugging Face Hub.")
    else:
        logging.warning(f"Boundary model file not found after download")
except Exception as e:
    logging.error(f"Failed to load boundary model from HF Hub: {e}")
# Try to load Keras model from Hugging Face Hub
try:
    keras_path = hf_hub_download(
        repo_id=model_repo,
        filename="best_model.keras",
        token=hf_token
    )
    kmer_path = hf_hub_download(
        repo_id=model_repo,
        filename="kmer_to_index.pkl",
        token=hf_token
    )
    if os.path.exists(keras_path) and os.path.exists(kmer_path):
        keras_model = load_model(keras_path)
        with open(kmer_path, "rb") as f:
            kmer_to_index = pickle.load(f)
        logging.info("Keras model and k-mer index loaded successfully from Hugging Face Hub.")
    else:
        logging.warning(f"Keras model or kmer files not found after download")
except Exception as e:
    logging.error(f"Failed to load Keras model from HF Hub: {e}")
# --- Initialize New Tree Analyzer ---
# analyzer stays None when the CSV data cannot be found/loaded; the pipeline
# then skips the interactive-tree step with an explanatory message.
analyzer = None
try:
    analyzer = PhylogeneticTreeAnalyzer()
    # Try multiple potential locations for the CSV file
    csv_candidates = [
        csv_path,
        os.path.join(BASE_DIR, csv_path),
        os.path.join(BASE_DIR, "app", csv_path),
        os.path.join(os.path.dirname(__file__), csv_path),
        "f_cleaned.csv",  # Alternative naming
        os.path.join(BASE_DIR, "f_cleaned.csv")
    ]
    csv_loaded = False
    for csv_candidate in csv_candidates:
        if os.path.exists(csv_candidate):
            if analyzer.load_data(csv_candidate):
                logging.info(f"Tree analyzer data loaded from: {csv_candidate}")
                csv_loaded = True
                csv_path = csv_candidate  # Update path for consistency
                break
            else:
                logging.warning(f"Failed to load data from: {csv_candidate}")
    if not csv_loaded:
        logging.error("Failed to load CSV data from any candidate location")
        analyzer = None
    else:
        # Try to train AI model (optional); failure is non-fatal.
        try:
            if analyzer.train_ai_model():
                logging.info("AI model training completed successfully")
            else:
                logging.warning("AI model training failed; proceeding with basic analysis.")
        except Exception as e:
            logging.warning(f"AI model training failed: {e}")
except Exception as e:
    logging.error(f"Failed to initialize tree analyzer: {e}")
    analyzer = None
| # --- Enhanced Tool Detection with Binary Permission Setup --- | |
def setup_binary_permissions():
    """Ensure the bundled MAFFT and IQ-TREE binaries carry the execute bit.

    Best-effort: a missing binary or a failed chmod only logs a warning.
    """
    for tool_path in (MAFFT_PATH, IQTREE_PATH):
        if not os.path.exists(tool_path):
            logging.warning(f"Binary not found: {tool_path}")
            continue
        try:
            # Preserve the existing mode bits, only adding owner-execute.
            mode = os.stat(tool_path).st_mode
            os.chmod(tool_path, mode | stat.S_IEXEC)
            logging.info(f"Set executable permission on {tool_path}")
        except Exception as exc:
            logging.warning(f"Failed to set executable permission on {tool_path}: {exc}")
| def check_tool_availability(): | |
| """Enhanced check for MAFFT and IQ-TREE availability with improved path validation""" | |
| # First, ensure binaries have executable permissions | |
| setup_binary_permissions() | |
| # Check MAFFT | |
| mafft_available = False | |
| mafft_cmd = None | |
| # Updated MAFFT candidates list based on your new API | |
| mafft_candidates = [ | |
| MAFFT_PATH, # Primary path from your new API | |
| os.path.join(BASE_DIR, "binaries", "mafft", "mafft"), | |
| os.path.join(BASE_DIR, "binaries", "mafft", "mafft.bat"), # Windows fallback | |
| 'mafft', | |
| '/usr/bin/mafft', | |
| '/usr/local/bin/mafft', | |
| os.path.join(BASE_DIR, "binaries", "mafft", "mafftdir", "bin", "mafft"), | |
| # Add potential conda/miniconda paths | |
| os.path.expanduser("~/anaconda3/bin/mafft"), | |
| os.path.expanduser("~/miniconda3/bin/mafft"), | |
| "/opt/conda/bin/mafft", | |
| "/usr/local/miniconda3/bin/mafft" | |
| ] | |
| for candidate in mafft_candidates: | |
| if not candidate: | |
| continue | |
| # First check if file exists or is in PATH | |
| if os.path.exists(candidate) or shutil.which(candidate): | |
| # Now test actual execution | |
| try: | |
| test_cmd = [candidate, "--help"] | |
| result = subprocess.run( | |
| test_cmd, | |
| capture_output=True, | |
| text=True, | |
| timeout=10 | |
| ) | |
| if result.returncode == 0 or "mafft" in result.stderr.lower(): | |
| mafft_available = True | |
| mafft_cmd = candidate | |
| logging.info(f"MAFFT found and tested successfully at: {candidate}") | |
| break | |
| except (subprocess.TimeoutExpired, subprocess.CalledProcessError, FileNotFoundError) as e: | |
| logging.debug(f"MAFFT test failed for {candidate}: {e}") | |
| continue | |
| # Check IQ-TREE with similar approach | |
| iqtree_available = False | |
| iqtree_cmd = None | |
| # Updated IQ-TREE candidates list | |
| iqtree_candidates = [ | |
| IQTREE_PATH, # Primary path from your new API | |
| 'iqtree2', | |
| 'iqtree', | |
| 'iqtree3', | |
| '/usr/bin/iqtree2', | |
| '/usr/local/bin/iqtree2', | |
| '/usr/bin/iqtree', | |
| '/usr/local/bin/iqtree', | |
| 'iqtree2.exe', # Windows | |
| 'iqtree.exe', # Windows | |
| 'iqtree3.exe', # Windows | |
| os.path.join(BASE_DIR, "binaries", "iqtree", "bin", "iqtree2"), | |
| # Add potential conda paths | |
| os.path.expanduser("~/anaconda3/bin/iqtree2"), | |
| os.path.expanduser("~/miniconda3/bin/iqtree2"), | |
| "/opt/conda/bin/iqtree2", | |
| "/usr/local/miniconda3/bin/iqtree2" | |
| ] | |
| for candidate in iqtree_candidates: | |
| if not candidate: | |
| continue | |
| if os.path.exists(candidate) or shutil.which(candidate): | |
| try: | |
| test_cmd = [candidate, "--help"] | |
| result = subprocess.run( | |
| test_cmd, | |
| capture_output=True, | |
| text=True, | |
| timeout=10 | |
| ) | |
| if result.returncode == 0 or "iqtree" in result.stderr.lower(): | |
| iqtree_available = True | |
| iqtree_cmd = candidate | |
| logging.info(f"IQ-TREE found and tested successfully at: {candidate}") | |
| break | |
| except (subprocess.TimeoutExpired, subprocess.CalledProcessError, FileNotFoundError) as e: | |
| logging.debug(f"IQ-TREE test failed for {candidate}: {e}") | |
| continue | |
| return mafft_available, iqtree_available, mafft_cmd, iqtree_cmd | |
def install_dependencies_guide():
    """Return a human-readable installation/troubleshooting guide.

    Shown to the user whenever MAFFT and/or IQ-TREE cannot be located on the
    host system, covering common package managers, conda, and Docker.
    """
    guide = """
🔧 INSTALLATION GUIDE FOR MISSING DEPENDENCIES:
For MAFFT:
- Ubuntu/Debian: sudo apt-get install mafft
- CentOS/RHEL: sudo yum install mafft
- macOS: brew install mafft
- Windows: Download from https://mafft.cbrc.jp/alignment/software/
- Conda: conda install -c bioconda mafft
For IQ-TREE:
- Ubuntu/Debian: sudo apt-get install iqtree
- CentOS/RHEL: sudo yum install iqtree
- macOS: brew install iqtree
- Windows: Download from http://www.iqtree.org/
- Conda: conda install -c bioconda iqtree
Alternative: Use conda/mamba (RECOMMENDED):
- conda install -c bioconda mafft iqtree
Docker option:
- docker run -it --rm -v $(pwd):/data quay.io/biocontainers/mafft:7.490--h779adbc_0
- docker run -it --rm -v $(pwd):/data quay.io/biocontainers/iqtree:2.1.4_beta--hdcc8f71_0
TROUBLESHOOTING:
If tools are installed but not detected, try:
1. Add installation directory to PATH
2. Use absolute paths in the configuration
3. Check permissions on executable files
4. Ensure binaries have executable permissions (chmod +x)
"""
    return guide
def phylogenetic_placement(sequence: str, mafft_cmd: str, iqtree_cmd: str):
    """Place a query sequence into the reference alignment and tree.

    Runs MAFFT ``--add`` to merge the query into the reference alignment,
    then IQ-TREE with the reference tree as a topological constraint (``-g``).

    Args:
        sequence: Query DNA sequence (minimum 100 bp after stripping).
        mafft_cmd: Path/command of a working MAFFT executable.
        iqtree_cmd: Path/command of a working IQ-TREE executable.

    Returns:
        (success, message, aligned_fasta_path_or_None, treefile_path_or_None).
    """
    try:
        # Validate sequence length before doing any work
        if len(sequence.strip()) < 100:
            return False, "Error: Sequence is too short for phylogenetic placement (minimum 100 bp).", None, None
        # Generate unique query ID and per-query output paths
        query_id = f"QUERY_{uuid.uuid4().hex[:8]}"
        query_fasta = os.path.join(QUERY_OUTPUT_DIR, f"{query_id}.fa")
        aligned_with_query = os.path.join(QUERY_OUTPUT_DIR, f"{query_id}_aligned.fa")
        output_prefix = os.path.join(QUERY_OUTPUT_DIR, f"{query_id}_placed_tree")
        # Check if reference files exist
        if not os.path.exists(ALIGNMENT_PATH):
            return False, f"Reference alignment not found: {ALIGNMENT_PATH}", None, None
        if not os.path.exists(TREE_PATH):
            return False, f"Reference tree not found: {TREE_PATH}", None, None
        # Save query sequence as FASTA
        try:
            query_record = SeqRecord(Seq(sequence.upper()), id=query_id, description="")
            SeqIO.write([query_record], query_fasta, "fasta")
            logging.info(f"Query sequence saved: {query_fasta}")
        except Exception as e:
            return False, f"Error writing query sequence: {e}", None, None
        # Step 1: Add query sequence to reference alignment using MAFFT --add
        logging.info("Adding query sequence to reference alignment...")
        try:
            # MAFFT writes the merged alignment to stdout; redirect to file.
            # (Result binding removed: check=True raises on failure.)
            with open(aligned_with_query, "w") as output_file:
                subprocess.run([
                    mafft_cmd, "--add", query_fasta, "--reorder", ALIGNMENT_PATH
                ], stdout=output_file, stderr=subprocess.PIPE, text=True, timeout=600, check=True)
            # Verify alignment file was created and is not empty
            if not os.path.exists(aligned_with_query) or os.path.getsize(aligned_with_query) == 0:
                return False, "MAFFT alignment failed: output file is empty", None, None
            logging.info(f"MAFFT alignment completed: {aligned_with_query}")
        except subprocess.CalledProcessError as e:
            error_msg = e.stderr if e.stderr else "Unknown MAFFT error"
            return False, f"MAFFT alignment failed: {error_msg}", None, None
        except subprocess.TimeoutExpired:
            return False, "MAFFT alignment timeout (>10 minutes)", None, None
        except FileNotFoundError:
            return False, f"MAFFT executable not found: {mafft_cmd}", None, None
        except Exception as e:
            return False, f"MAFFT execution error: {e}", None, None
        # Step 2: Place sequence in phylogenetic tree using IQ-TREE (-g = constrained search)
        logging.info("Placing sequence in phylogenetic tree...")
        try:
            subprocess.run([
                iqtree_cmd, "-s", aligned_with_query, "-g", TREE_PATH,
                "-m", "GTR+G", "-pre", output_prefix, "-redo"
            ], capture_output=True, text=True, timeout=1200, check=True)
            # Check if treefile was generated
            treefile = f"{output_prefix}.treefile"
            if not os.path.exists(treefile) or os.path.getsize(treefile) == 0:
                return False, "IQ-TREE placement failed: treefile not generated", aligned_with_query, None
            logging.info(f"IQ-TREE placement completed: {treefile}")
            # Generate success message with details
            success_msg = "✅ Phylogenetic placement completed successfully!\n"
            success_msg += f"- Query ID: {query_id}\n"
            success_msg += f"- Alignment: {os.path.basename(aligned_with_query)}\n"
            success_msg += f"- Tree: {os.path.basename(treefile)}\n"
            # Surface the model log-likelihood from the IQ-TREE log, if present
            log_file = f"{output_prefix}.log"
            if os.path.exists(log_file):
                try:
                    with open(log_file, 'r') as f:
                        log_content = f.read()
                    if "Log-likelihood" in log_content:
                        log_lines = [line for line in log_content.split('\n') if "Log-likelihood" in line]
                        if log_lines:
                            success_msg += f"- {log_lines[0].strip()}\n"
                except Exception as e:
                    logging.warning(f"Could not read log file: {e}")
            return True, success_msg, aligned_with_query, treefile
        except subprocess.CalledProcessError as e:
            error_msg = e.stderr if e.stderr else "Unknown IQ-TREE error"
            return False, f"IQ-TREE placement failed: {error_msg}", aligned_with_query, None
        except subprocess.TimeoutExpired:
            return False, "IQ-TREE placement timeout (>20 minutes)", aligned_with_query, None
        except FileNotFoundError:
            return False, f"IQ-TREE executable not found: {iqtree_cmd}", aligned_with_query, None
        except Exception as e:
            return False, f"IQ-TREE execution error: {e}", aligned_with_query, None
    except Exception as e:
        logging.error(f"Phylogenetic placement failed: {e}")
        return False, f"Phylogenetic placement failed: {str(e)}", None, None
    finally:
        # Clean up temporary query file (best-effort; narrowed from a bare except)
        if 'query_fasta' in locals() and os.path.exists(query_fasta):
            try:
                os.unlink(query_fasta)
            except OSError:
                pass
def build_maximum_likelihood_tree(f_gene_sequence):
    """Check tools/reference files, then run phylogenetic placement.

    Args:
        f_gene_sequence (str): Extracted F gene sequence (>= 100 bp).

    Returns:
        (success, message, aligned_file_or_None, tree_file_or_None); message
        always carries the dependency-check status plus the placement result
        or an installation guide when tools are missing.
    """
    try:
        # Locate working MAFFT / IQ-TREE executables
        mafft_available, iqtree_available, mafft_cmd, iqtree_cmd = check_tool_availability()
        status_msg = "🔍 Checking dependencies...\n"
        if mafft_available:
            status_msg += f"✅ MAFFT found and tested: {mafft_cmd}\n"
        else:
            status_msg += "❌ MAFFT not found or not executable\n"
        if iqtree_available:
            status_msg += f"✅ IQ-TREE found and tested: {iqtree_cmd}\n"
        else:
            status_msg += "❌ IQ-TREE not found or not executable\n"
        # Check for reference files (dropped pointless f-prefixes on literals)
        if os.path.exists(ALIGNMENT_PATH):
            status_msg += "✅ Reference alignment found\n"
        else:
            status_msg += f"❌ Reference alignment not found: {ALIGNMENT_PATH}\n"
        if os.path.exists(TREE_PATH):
            status_msg += "✅ Reference tree found\n"
        else:
            status_msg += f"❌ Reference tree not found: {TREE_PATH}\n"
        # If tools are missing, provide the installation guide instead of running
        if not mafft_available or not iqtree_available:
            guide = install_dependencies_guide()
            return False, f"{status_msg}\n{guide}", None, None
        if not os.path.exists(ALIGNMENT_PATH) or not os.path.exists(TREE_PATH):
            status_msg += "\n❌ Reference alignment and/or tree files are missing.\n"
            status_msg += "Please ensure f_gene_sequences_aligned.fasta and f_gene_sequences.phy.treefile are available."
            return False, status_msg, None, None
        # Perform phylogenetic placement
        logging.info("Starting phylogenetic placement...")
        placement_success, placement_message, aligned_file, tree_file = phylogenetic_placement(
            f_gene_sequence, mafft_cmd, iqtree_cmd
        )
        if not placement_success:
            return False, f"{status_msg}\n{placement_message}", aligned_file, tree_file
        # Copy outputs to stable filenames for downstream compatibility
        if aligned_file and os.path.exists(aligned_file):
            standard_aligned = "query_with_references_aligned.fasta"
            shutil.copy2(aligned_file, standard_aligned)
            aligned_file = standard_aligned
        if tree_file and os.path.exists(tree_file):
            standard_tree = "query_placement_tree.treefile"
            shutil.copy2(tree_file, standard_tree)
            tree_file = standard_tree
        logging.info("Phylogenetic placement completed successfully")
        return True, f"{status_msg}\n{placement_message}", aligned_file, tree_file
    except Exception as e:
        logging.error(f"ML tree construction failed: {e}")
        return False, f"ML tree construction failed: {str(e)}", None, None
| # --- NEW Tree Analysis Function (Using the new analyzer API) --- | |
| # Replace this part in your analyze_sequence_for_tree function: | |
def analyze_sequence_for_tree(sequence: str, matching_percentage: float) -> tuple:
    """Analyze *sequence* with the tree analyzer and emit tree + report HTML.

    Args:
        sequence: DNA sequence to analyze (>= 10 nucleotides after stripping).
        matching_percentage: Similarity threshold percentage (1-99).

    Returns:
        (status_message, tree_html_path_or_None, report_html_path_or_None)
    """
    try:
        # Guard clauses: analyzer availability and basic input sanity.
        if not analyzer:
            return "❌ Error: Tree analyzer not initialized. Please check if the CSV data file is available.", None, None
        if not sequence:
            return "❌ Error: Please provide a sequence.", None, None
        if not (1 <= matching_percentage <= 99):
            return "❌ Error: Matching percentage must be between 1 and 99.", None, None
        sequence = sequence.strip()
        if len(sequence) < 10:
            return "❌ Error: Invalid or missing sequence. Must be ≥10 nucleotides.", None, None
        # Register the query with the analyzer.
        if not analyzer.find_query_sequence(sequence):
            return "❌ Error: Sequence not accepted.", None, None
        # Collect reference sequences above the similarity threshold.
        similar_ids, actual_percentage = analyzer.find_similar_sequences(matching_percentage)
        if not similar_ids:
            return f"❌ Error: No similar sequences found at {matching_percentage}% similarity threshold.", None, None
        logging.info(f"Found {len(similar_ids)} similar sequences at {actual_percentage:.2f}% similarity")
        # Build the tree structure, then render it as an interactive figure.
        analyzer.build_tree_structure_with_ml_safe(similar_ids)
        tree_fig = analyzer.create_interactive_tree(similar_ids, actual_percentage)
        query_id = analyzer.query_id or f"query_{int(time.time())}"
        tree_html_path = os.path.join(
            tempfile.gettempdir(), f'phylogenetic_tree_interactive_{query_id}.html'
        )
        tree_fig.write_html(tree_html_path)
        # The report should echo the threshold the user actually requested.
        analyzer.matching_percentage = matching_percentage
        report_html_path = None
        if analyzer.generate_detailed_report(similar_ids, actual_percentage):
            candidate = os.path.abspath(f"detailed_report_{query_id.replace('/', '_')}.html")
            if os.path.exists(candidate):
                report_html_path = candidate
            else:
                logging.warning(f"Report file not found at {candidate}")
        else:
            logging.warning("Failed to generate detailed report")
        success_msg = f"✅ Analysis complete! Found {len(similar_ids)} similar sequences with {actual_percentage:.2f}% average similarity."
        return success_msg, tree_html_path, report_html_path
    except Exception as e:
        error_msg = f"❌ Error during analysis: {str(e)}"
        logging.error(error_msg)
        import traceback
        logging.error(f"Full traceback: {traceback.format_exc()}")
        return error_msg, None, None
def predict_with_keras(sequence):
    """Validate *sequence* as an F gene with the Keras 6-mer model.

    Returns a percentage string like '87% F gene', or an explanatory message
    when the model is unavailable, the sequence is too short, or an error occurs.
    """
    try:
        if not keras_model or not kmer_to_index:
            return f"Keras model not available. Input sequence: {sequence[:100]}..."
        if len(sequence) < 6:
            return "Skipped: sequence too short for F gene validation (minimum 6 nucleotides required)."
        # Encode the sequence as overlapping 6-mers mapped to vocabulary
        # indices; unknown 6-mers fall back to index 0.
        encoded = [kmer_to_index.get(sequence[i:i + 6], 0) for i in range(len(sequence) - 5)]
        prediction = keras_model.predict(np.array([encoded]), verbose=0)[0]
        # The last output unit is taken as the F-gene class probability.
        f_gene_prob = prediction[-1]
        # Add a 5-point buffer for minor mismatches, clamped to [0, 100].
        percentage = min(100, max(0, int(f_gene_prob * 100 + 5)))
        return f"{percentage}% F gene"
    except Exception as e:
        logging.error(f"Keras prediction failed: {e}")
        return f"Keras prediction failed: {str(e)}"
| # --- FASTA Reader --- | |
def read_fasta_file(file_obj):
    """Extract the concatenated sequence from a FASTA upload.

    Accepts a Gradio file object (exposes a temp path via ``.name``), a raw
    file-like object (``.read()`` may return str or bytes), or a plain string
    containing FASTA text. Header lines (starting with '>') are dropped and
    the remaining lines are joined into one sequence string.

    Returns:
        str: The sequence, or "" when *file_obj* is None or reading fails.
    """
    try:
        if file_obj is None:
            return ""
        if hasattr(file_obj, 'name'):
            # Gradio uploads expose a temp-file path via .name
            with open(file_obj.name, "r", encoding="utf-8") as f:  # explicit encoding
                content = f.read()
        elif hasattr(file_obj, "read"):
            # FIX: .read() on a text-mode object returns str; the old
            # unconditional .decode("utf-8") raised AttributeError there.
            raw = file_obj.read()
            content = raw.decode("utf-8") if isinstance(raw, bytes) else str(raw)
        else:
            content = str(file_obj)
        lines = content.strip().split("\n")
        seq_lines = [line.strip() for line in lines if not line.startswith(">")]
        return ''.join(seq_lines)
    except Exception as e:
        logging.error(f"Failed to read FASTA file: {e}")
        return ""
| # --- Full Pipeline --- | |
def run_pipeline_from_file(fasta_file_obj, similarity_score, build_ml_tree):
    """Read a FASTA upload and run the full analysis pipeline on its sequence.

    Returns the same 11-tuple as run_pipeline; on read failure or any
    exception, an error tuple with matching shape.
    """
    try:
        sequence = read_fasta_file(fasta_file_obj)
        if sequence:
            return run_pipeline(sequence, similarity_score, build_ml_tree)
        return ("Failed to read FASTA file", "", "", "", "",
                None, None, None, None, "No input sequence", "No input sequence")
    except Exception as e:
        error_msg = f"Pipeline error: {str(e)}"
        logging.error(error_msg)
        return error_msg, "", "", "", "", None, None, None, None, error_msg, error_msg
def run_pipeline(dna_input, similarity_score=95.0, build_ml_tree=False):
    """Run the full analysis pipeline on a raw DNA string.

    Steps: (1) boundary-model F-gene extraction, (2) Keras F-gene validation,
    (3) optional MAFFT/IQ-TREE phylogenetic placement, (4) similarity-based
    interactive tree + detailed report via the tree analyzer.

    Args:
        dna_input (str): Raw DNA sequence; non-ACTGN characters become 'N'.
        similarity_score (float): Similarity threshold for tree analysis.
        build_ml_tree (bool): Whether to run phylogenetic placement.

    Returns:
        11-tuple: (boundary_output, keras_output, ml_tree_output,
        simplified_ml_output, summary_output, aligned_file, phy_file,
        tree_html_file, report_html_file, tree_html_content,
        report_html_content).
    """
    try:
        # Clean input
        dna_input = dna_input.upper().strip()
        if not dna_input:
            # FIX: this path previously returned a 10-tuple while every other
            # path returns 11, mismatching the Gradio output count.
            return ("Empty input", "", "", "", "", None, None, None, None,
                    "No input provided", "No input provided")
        # Sanitize DNA sequence: replace anything outside ACTGN with 'N'
        if not re.match('^[ACTGN]+$', dna_input):
            dna_input = ''.join(c if c in 'ACTGN' else 'N' for c in dna_input)
            logging.info("DNA sequence sanitized")
        # Step 1: Boundary Prediction - Extract F gene sequence
        processed_sequence = dna_input
        boundary_output = ""
        if boundary_model:
            try:
                result = boundary_model.predict_sequence(dna_input)
                confidence = result['confidence']
                regions = result['gene_regions']
                if regions:
                    # Use the first detected gene region as the F gene
                    processed_sequence = regions[0]["sequence"]
                    boundary_output = processed_sequence
                    logging.info(f"F gene extracted: {len(processed_sequence)} bp (confidence: {confidence:.3f})")
                else:
                    boundary_output = "No F gene regions found in input sequence"
                    processed_sequence = dna_input
                    logging.warning("No gene regions found, using full sequence")
                logging.info("Boundary model prediction completed")
            except Exception as e:
                logging.error(f"Boundary model failed: {e}")
                boundary_output = f"Boundary model error: {str(e)}"
                processed_sequence = dna_input
        else:
            boundary_output = f"Boundary model not available. Using original input: {len(dna_input)} bp"
            processed_sequence = dna_input
        # Step 2: Keras Prediction (F gene validation)
        if processed_sequence and len(processed_sequence) >= 6:
            keras_output = predict_with_keras(processed_sequence)
        else:
            keras_output = "Skipped: sequence too short for F gene validation"
        # Step 3: Maximum Likelihood Tree (Phylogenetic Placement)
        aligned_file = None
        phy_file = None
        ml_tree_output = ""
        if build_ml_tree and processed_sequence and len(processed_sequence) >= 100:
            try:
                logging.info("Starting phylogenetic placement...")
                ml_success, ml_message, ml_aligned, ml_tree = build_maximum_likelihood_tree(processed_sequence)
                ml_tree_output = ml_message
                if ml_success:
                    aligned_file = ml_aligned
                    phy_file = ml_tree
            except Exception as e:
                ml_tree_output = f"❌ Phylogenetic placement failed: {str(e)}"
                logging.error(f"Phylogenetic placement failed: {e}")
        elif build_ml_tree:
            ml_tree_output = "❌ F gene sequence too short for phylogenetic placement (minimum 100 bp)"
        else:
            ml_tree_output = "Phylogenetic placement skipped (not requested)"
        # Step 4: Simplified Tree Analysis
        tree_html_file = None
        report_html_file = None
        tree_html_content = "No tree generated"
        report_html_content = "No report generated"
        simplified_ml_output = ""
        if analyzer and processed_sequence and len(processed_sequence) >= 10:
            try:
                logging.info(f"Starting simplified ML tree analysis with F gene sequence length: {len(processed_sequence)}")
                tree_result, tree_html_path, report_html_path = analyze_sequence_for_tree(processed_sequence, similarity_score)
                output_dir = "output"
                os.makedirs(output_dir, exist_ok=True)
                # FIX: compute naming parts unconditionally; previously they
                # were defined only in the tree branch, causing a NameError
                # when a report existed without a tree.
                safe_seq_name = re.sub(r'[^a-zA-Z0-9_-]', '', processed_sequence[:20])
                timestamp = str(int(time.time()))
                if tree_html_path and os.path.exists(tree_html_path):
                    # Copy tree HTML to output directory
                    tree_html_filename = f"tree_{safe_seq_name}_{timestamp}.html"
                    tree_html_final_path = os.path.join(output_dir, tree_html_filename)
                    shutil.copy2(tree_html_path, tree_html_final_path)
                    tree_html_file = tree_html_final_path
                    # Read tree HTML content for display
                    with open(tree_html_path, 'r', encoding='utf-8') as f:
                        tree_html_content = f.read()
                    # Clean up temporary tree file (best-effort)
                    try:
                        os.unlink(tree_html_path)
                    except OSError:
                        pass
                if report_html_path and os.path.exists(report_html_path):
                    # Copy report HTML to output directory
                    report_html_filename = f"report_{safe_seq_name}_{timestamp}.html"
                    report_html_final_path = os.path.join(output_dir, report_html_filename)
                    shutil.copy2(report_html_path, report_html_final_path)
                    report_html_file = report_html_final_path
                    # Read report HTML content for display
                    with open(report_html_path, 'r', encoding='utf-8') as f:
                        report_html_content = f.read()
                    # Clean up temporary report file (best-effort)
                    try:
                        os.unlink(report_html_path)
                    except OSError:
                        pass
                simplified_ml_output = tree_result
                if not tree_html_file:
                    tree_html_content = f"<div style='color: red;'>{tree_result}</div>"
                if not report_html_file:
                    report_html_content = f"<div style='color: red;'>{tree_result}</div>"
                # FIX: only log the filename when a tree was actually produced
                # (tree_html_filename was previously possibly unbound here).
                if tree_html_file:
                    logging.info(f"Tree analysis completed successfully: {os.path.basename(tree_html_file)}")
            except Exception as e:
                error_msg = f"❌ Tree analysis failed: {str(e)}"
                simplified_ml_output = error_msg
                tree_html_content = f"<div style='color: red;'>{error_msg}</div>"
                report_html_content = f"<div style='color: red;'>{error_msg}</div>"
                logging.error(f"Tree analysis failed: {e}")
        else:
            if not analyzer:
                simplified_ml_output = "❌ Tree analyzer not available (CSV data not loaded)"
            elif len(processed_sequence) < 10:
                simplified_ml_output = "❌ F gene sequence too short for tree analysis (minimum 10 bp)"
            else:
                simplified_ml_output = "❌ No processed sequence available for tree analysis"
            tree_html_content = f"<div style='color: orange;'>{simplified_ml_output}</div>"
            report_html_content = f"<div style='color: orange;'>{simplified_ml_output}</div>"
        # Final summary
        summary_output = f"""
🧬 ANALYSIS SUMMARY:
━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
📊 INPUT: {len(dna_input)} bp DNA sequence
🎯 F GENE EXTRACTED: {len(processed_sequence)} bp
✅ F GENE VALIDATION: {keras_output}
🌳 PHYLOGENETIC PLACEMENT: {'✅ Completed' if 'successfully' in ml_tree_output else '❌ ' + ('Skipped' if 'skipped' in ml_tree_output else 'Failed')}
🔬 TREE ANALYSIS: {'✅ Completed' if '✅' in simplified_ml_output else '❌ ' + ('Not available' if 'not available' in simplified_ml_output else 'Failed')}
📝 DETAILED REPORT: {'✅ Generated' if report_html_file else '❌ Not generated'}
━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
"""
        return (
            boundary_output,
            keras_output,
            ml_tree_output,
            simplified_ml_output,
            summary_output,
            aligned_file,
            phy_file,
            tree_html_file,
            report_html_file,
            tree_html_content,
            report_html_content
        )
    except Exception as e:
        error_msg = f"Pipeline error: {str(e)}"
        logging.error(error_msg)
        import traceback
        logging.error(f"Full traceback: {traceback.format_exc()}")
        return error_msg, "", "", "", "", None, None, None, None, error_msg, error_msg
| # --- Gradio Interface --- | |
def create_interface():
    """Create and configure the Gradio interface.

    Lays out the input controls (raw DNA text / FASTA upload), the analysis
    settings (similarity slider, phylogenetic-placement toggle), the tabbed
    result panels, and the download accordion, then wires both "Analyze"
    buttons to the pipeline entry points (``run_pipeline`` and
    ``run_pipeline_from_file``) defined elsewhere in this module.

    Returns:
        gr.Blocks: the fully assembled (but not yet launched) interface.
    """
    # Visual polish for buttons, text inputs, and colored status boxes.
    custom_css = """
    .gradio-container {
        font-family: 'Segoe UI', Tahoma, Geneva, Verdana, sans-serif;
    }
    .gr-button-primary {
        background: linear-gradient(45deg, #1e3a8a, #3b82f6);
        border: none;
        border-radius: 8px;
        font-weight: 600;
    }
    .gr-button-primary:hover {
        background: linear-gradient(45deg, #1e40af, #2563eb);
        transform: translateY(-1px);
        box-shadow: 0 4px 12px rgba(59, 130, 246, 0.4);
    }
    .gr-textbox, .gr-textarea {
        border-radius: 8px;
        border: 2px solid #e5e7eb;
    }
    .gr-textbox:focus, .gr-textarea:focus {
        border-color: #3b82f6;
        box-shadow: 0 0 0 3px rgba(59, 130, 246, 0.1);
    }
    .warning-box {
        background: linear-gradient(135deg, #fef3c7, #fbbf24);
        border: 1px solid #f59e0b;
        border-radius: 8px;
        padding: 12px;
        margin: 8px 0;
    }
    .success-box {
        background: linear-gradient(135deg, #d1fae5, #10b981);
        border: 1px solid #059669;
        border-radius: 8px;
        padding: 12px;
        margin: 8px 0;
    }
    .error-box {
        background: linear-gradient(135deg, #fee2e2, #ef4444);
        border: 1px solid #dc2626;
        border-radius: 8px;
        padding: 12px;
        margin: 8px 0;
    }
    """
    with gr.Blocks(css=custom_css, title="🧬 Advanced Gene Analysis Pipeline", theme=gr.themes.Soft()) as iface:
        # Header banner.
        gr.HTML("""
        <div style="text-align: center; padding: 20px; background: linear-gradient(135deg, #667eea 0%, #764ba2 100%); border-radius: 15px; margin-bottom: 20px;">
            <h1 style="color: white; margin: 0; font-size: 2.5em; font-weight: 700;">🧬 Advanced Gene Analysis Pipeline</h1>
            <p style="color: rgba(255,255,255,0.9); margin: 10px 0 0 0; font-size: 1.2em;">F Gene Boundary Detection • Validation • Phylogenetic Analysis</p>
        </div>
        """)

        # Collapsible usage instructions. NOTE: the similarity range shown
        # here must match the slider below (minimum=30.0, maximum=99.0).
        with gr.Accordion("📋 Instructions & Information", open=False):
            gr.HTML("""
            <div style="background: #f8fafc; padding: 20px; border-radius: 10px; border-left: 4px solid #3b82f6;">
                <h3 style="color: #1e40af; margin-top: 0;">🔬 Pipeline Overview</h3>
                <ol style="line-height: 1.6;">
                    <li><strong>F Gene Extraction:</strong> Uses boundary-aware model to identify and extract F gene regions</li>
                    <li><strong>Gene Validation:</strong> Validates extracted sequence as F gene using deep learning</li>
                    <li><strong>Phylogenetic Placement:</strong> Places sequence in reference phylogenetic tree (MAFFT + IQ-TREE)</li>
                    <li><strong>Interactive Tree Analysis:</strong> Creates interactive phylogenetic tree with similar sequences</li>
                    <li><strong>Detailed Report:</strong> Provides comprehensive analysis details</li>
                </ol>
                <h3 style="color: #1e40af;">📁 Input Requirements</h3>
                <ul style="line-height: 1.6;">
                    <li><strong>DNA Sequence:</strong> Minimum 100 bp for phylogenetic analysis</li>
                    <li><strong>FASTA Format:</strong> Supported for file uploads</li>
                    <li><strong>Similarity Score:</strong> 30-99% (default: 95%)</li>
                </ul>
                <h3 style="color: #1e40af;">⚙️ Dependencies</h3>
                <p style="background: #fef3c7; padding: 10px; border-radius: 5px; border-left: 3px solid #f59e0b;">
                    <strong>Required:</strong> MAFFT and IQ-TREE must be installed for phylogenetic analysis.<br>
                    <strong>Installation:</strong> <code>conda install -c bioconda mafft iqtree</code>
                </p>
            </div>
            """)

        # Input area: sequence entry (text or file) plus analysis settings.
        with gr.Row():
            with gr.Column(scale=2):
                gr.HTML("<h3 style='color: #1e40af; margin-bottom: 10px;'>📝 Sequence Input</h3>")
                with gr.Tabs():
                    with gr.TabItem("✍️ Text Input"):
                        dna_input = gr.Textbox(
                            label="DNA Sequence",
                            placeholder="Enter your DNA sequence here (A, T, C, G, N)...",
                            lines=6,
                            value=""
                        )
                    with gr.TabItem("📁 File Upload"):
                        fasta_file = gr.File(
                            label="Upload FASTA File",
                            file_types=[".fasta", ".fa", ".fas", ".txt"],
                            type="filepath"
                        )
            with gr.Column(scale=1):
                gr.HTML("<h3 style='color: #1e40af; margin-bottom: 10px;'>⚙️ Analysis Settings</h3>")
                similarity_score = gr.Slider(
                    minimum=30.0,
                    maximum=99.0,
                    value=95.0,
                    step=1.0,
                    label="Similarity Threshold (%)"
                )
                build_ml_tree = gr.Checkbox(
                    label="🌳 Enable Phylogenetic Placement",
                    value=False
                )

        with gr.Row():
            analyze_text_btn = gr.Button(
                "🚀 Analyze Text Input",
                variant="primary",
                size="lg"
            )
            analyze_file_btn = gr.Button(
                "📁 Analyze File",
                variant="secondary",
                size="lg"
            )

        gr.HTML("<hr style='margin: 30px 0; border: none; height: 2px; background: linear-gradient(to right, #3b82f6, #8b5cf6);'>")
        gr.HTML("<h2 style='color: #1e40af; text-align: center; margin-bottom: 20px;'>📊 Analysis Results</h2>")

        # Result panels, one tab per pipeline stage.
        with gr.Tabs():
            with gr.TabItem("🎯 F Gene Extraction"):
                f_gene_output = gr.Textbox(
                    label="Extracted F Gene Sequence",
                    lines=8
                )
            with gr.TabItem("✅ Gene Validation"):
                keras_output = gr.Textbox(
                    label="F Gene Validation Result",
                    lines=3
                )
            with gr.TabItem("🌳 Phylogenetic Placement"):
                ml_tree_output = gr.Textbox(
                    label="Phylogenetic Placement Results",
                    lines=10
                )
            with gr.TabItem("🔬 Interactive Tree"):
                tree_analysis_output = gr.Textbox(
                    label="Tree Analysis Status",
                    lines=5
                )
                tree_html_display = gr.HTML(
                    label="Interactive Phylogenetic Tree",
                    value="<div style='text-align: center; color: #6b7280; padding: 40px;'>No tree generated yet. Run analysis to create interactive tree.</div>"
                )
            with gr.TabItem("📝 Detailed Report"):
                report_html_display = gr.HTML(
                    label="Detailed Analysis Report",
                    value="<div style='text-align: center; color: #6b7280; padding: 40px;'>No report generated yet. Run analysis to create detailed report.</div>"
                )
            with gr.TabItem("📋 Summary"):
                summary_output = gr.Textbox(
                    label="Analysis Summary",
                    lines=12
                )

        # Downloadable artifacts produced by the pipeline.
        with gr.Accordion("💾 Download Results", open=False):
            with gr.Row():
                alignment_file = gr.File(
                    label="📄 Download Alignment",
                    visible=True
                )
                tree_file = gr.File(
                    label="🌳 Download Tree",
                    visible=True
                )
                html_tree_file = gr.File(
                    label="🌐 Download Interactive Tree (HTML)",
                    visible=True
                )
                report_file = gr.File(
                    label="📝 Download Detailed Report (HTML)",
                    visible=True
                )

        # Footer.
        gr.HTML("""
        <div style="text-align: center; padding: 20px; margin-top: 30px; border-top: 2px solid #e5e7eb; color: #6b7280;">
            <p style="margin: 0;">🧬 Advanced Gene Analysis Pipeline | Powered by Deep Learning & Phylogenetics</p>
            <p style="margin: 5px 0 0 0; font-size: 0.9em;">Built with Gradio • MAFFT • IQ-TREE • TensorFlow</p>
        </div>
        """)

        # Both entry points return the same 11-tuple, so they share a single
        # outputs list — kept in one place so the two bindings cannot drift
        # out of sync with the pipeline's return order.
        result_outputs = [
            f_gene_output,
            keras_output,
            ml_tree_output,
            tree_analysis_output,
            summary_output,
            alignment_file,
            tree_file,
            html_tree_file,
            report_file,
            tree_html_display,
            report_html_display,
        ]
        analyze_text_btn.click(
            fn=run_pipeline,
            inputs=[dna_input, similarity_score, build_ml_tree],
            outputs=result_outputs,
        )
        analyze_file_btn.click(
            fn=run_pipeline_from_file,
            inputs=[fasta_file, similarity_score, build_ml_tree],
            outputs=result_outputs,
        )

    return iface
# --- Main Execution ---
if __name__ == "__main__":
    try:
        # Startup banner: report which models/components loaded at import time.
        print("🧬 Advanced Gene Analysis Pipeline")
        print("=" * 50)
        print(f"Base Directory: {BASE_DIR}")
        for label, component in (
            ("Boundary Model", boundary_model),
            ("Keras Model", keras_model),
            ("Tree Analyzer", analyzer),
        ):
            status = "✅ Loaded" if component else "❌ Not Available"
            print(f"{label}: {status}")

        # Probe for the external phylogenetics binaries.
        mafft_ok, iqtree_ok, _mafft_cmd, _iqtree_cmd = check_tool_availability()
        print(f"MAFFT: {'✅ Available' if mafft_ok else '❌ Not Found'}")
        print(f"IQ-TREE: {'✅ Available' if iqtree_ok else '❌ Not Found'}")

        if not (mafft_ok and iqtree_ok):
            print("\n⚠️ Warning: Some phylogenetic tools are missing!")
            print("Install with: conda install -c bioconda mafft iqtree")

        print("\n🚀 Starting Gradio interface...")

        # Build the UI and serve it on all interfaces at the default port.
        app = create_interface()
        app.queue().launch(
            share=False,        # Set to True if you want to create a public link
            server_name="0.0.0.0",  # Allow connections from any IP
            server_port=7860,       # Default Gradio port
            show_error=True,        # Show errors in the interface
        )
    except Exception as exc:
        logging.error(f"Failed to start application: {exc}")
        import traceback
        print(f"Error: {exc}")
        print(f"Traceback: {traceback.format_exc()}")
        sys.exit(1)