import os
import json
from typing import List, Dict, Tuple
import time
import streamlit as st
import requests

# Optional dependency guards: each heavyweight library is imported inside a
# try/except so the app degrades gracefully (features are hidden at runtime
# via the *_AVAILABLE flags instead of crashing on import).
try:
    import torch
    TORCH_AVAILABLE = True
except ImportError:
    TORCH_AVAILABLE = False
    print("[WARNING] torch not available")

try:
    from transformers import AutoTokenizer, AutoModel, AutoModelForMaskedLM
    TRANSFORMERS_AVAILABLE = True
except ImportError:
    TRANSFORMERS_AVAILABLE = False
    print("[WARNING] transformers not available")

try:
    from datasets import load_dataset
    DATASETS_AVAILABLE = True
except ImportError:
    DATASETS_AVAILABLE = False
    print("[WARNING] datasets not available")

try:
    from sentence_transformers import SentenceTransformer
    SENTENCE_TRANSFORMERS_AVAILABLE = True
except ImportError:
    SENTENCE_TRANSFORMERS_AVAILABLE = False
    print("[WARNING] sentence_transformers not available")

try:
    import faiss
    FAISS_AVAILABLE = True
except ImportError:
    FAISS_AVAILABLE = False
    print("[WARNING] faiss not available")

try:
    from Bio import SeqIO
    BIOPYTHON_AVAILABLE = True
except ImportError:
    BIOPYTHON_AVAILABLE = False
    print("[WARNING] biopython not available")

# PDF support libraries (pdfplumber preferred, PyPDF2 as fallback)
try:
    import pdfplumber
    PDFPLUMBER_AVAILABLE = True
except ImportError:
    PDFPLUMBER_AVAILABLE = False
    print("[WARNING] pdfplumber not available")

try:
    import PyPDF2
    PYPDF2_AVAILABLE = True
except ImportError:
    PYPDF2_AVAILABLE = False
    print("[WARNING] PyPDF2 not available")

# Constants
APP_TITLE = "BioSeq Chat Pro: Advanced Collaborative AI System"
DISCLAIMER = "This tool is for research/education and is not a medical device. Do not use outputs for diagnosis or treatment decisions."
# --------------- Helper Functions ---------------

def get_secret(name: str, fallback: str = "") -> str:
    """Return a secret from st.secrets, falling back to the environment.

    Args:
        name: Secret / environment variable name.
        fallback: Value returned when the name is found nowhere.
    """
    try:
        if hasattr(st, 'secrets') and name in st.secrets:
            return st.secrets[name]
    except Exception:  # fixed: was a bare except (also caught SystemExit/KeyboardInterrupt)
        pass
    return os.environ.get(name, fallback)


def brave_search(query: str, count: int = 5) -> List[Dict]:
    """Query the Brave Search API; return simplified {title, url, snippet} dicts.

    Returns a single placeholder entry when the API key is missing, no
    results are found, or the request fails — never raises.
    """
    key = get_secret("BRAVE_API_KEY", "")
    if not key:
        return [{
            "title": "BRAVE_API_KEY missing",
            "url": "",
            "snippet": "Set BRAVE_API_KEY in Space secrets or sidebar"
        }]
    url = "https://api.search.brave.com/res/v1/web/search"
    headers = {
        "Accept": "application/json",
        "X-Subscription-Token": key
    }
    params = {"q": query, "count": count}
    try:
        r = requests.get(url, headers=headers, params=params, timeout=15)
        r.raise_for_status()
        data = r.json()
        results = []
        for item in data.get("web", {}).get("results", [])[:count]:
            results.append({
                "title": item.get("title", ""),
                "url": item.get("url", ""),
                "snippet": item.get("description", "")
            })
        return results if results else [{"title": "No results", "url": "", "snippet": ""}]
    except Exception as e:
        return [{"title": "Error", "url": "", "snippet": str(e)}]


def call_llm(messages: List[Dict], temperature: float = 0.6, max_tokens: int = 8000) -> str:
    """Call Fireworks AI API with increased token limit.

    Returns the assistant message text, or a human-readable error string
    (callers display the return value either way).
    """
    api_key = get_secret("FIREWORKS_API_KEY", "")
    if not api_key:
        return "FIREWORKS_API_KEY missing. Set it in Secrets or sidebar."
    url = "https://api.fireworks.ai/inference/v1/chat/completions"
    payload = {
        "model": "accounts/fireworks/models/llama-v3p1-70b-instruct",
        "messages": messages,
        "max_tokens": max_tokens,  # increased to 8000
        "temperature": temperature,
        "top_p": 1,
        "frequency_penalty": 0,
        "presence_penalty": 0
    }
    headers = {
        "Content-Type": "application/json",
        "Authorization": f"Bearer {api_key}"
    }
    try:
        r = requests.post(url, headers=headers, json=payload, timeout=120)
        r.raise_for_status()
        return r.json()["choices"][0]["message"]["content"]
    except Exception as e:
        return f"[LLM Error] {e}"


def collaborative_answer(query: str, context: str, collaboration_type: str = "full") -> Dict[str, str]:
    """Collaborative AI system: supervisor, critic and investigator cooperate on an answer.

    Args:
        query: User question.
        context: Retrieved context (file + web snippets).
        collaboration_type: "full" (complete collaboration — adds a final
            integration pass), "quick" (fast answer), "deep" (in-depth analysis).
            Any value other than "full" skips the integration step.

    Returns:
        Dict with each role's contribution plus the final answer
        ("investigator", "supervisor", "critic", "final").
    """
    # 1. Investigator — fact collection and verification
    investigator_prompt = f"""You are an INVESTIGATOR specializing in bioinformatics fact-checking.

Context: {context}

Question: {query}

Your task:
1. Extract and verify all relevant facts from the context
2. Identify any missing information that would improve the answer
3. Flag any potentially conflicting or uncertain information
4. Suggest additional areas for research
5. Provide confidence scores for key facts (0-100%)

Format your response with:
- VERIFIED FACTS: (with confidence scores)
- UNCERTAIN AREAS:
- MISSING INFORMATION:
- RESEARCH SUGGESTIONS:
- KEY CITATIONS:"""
    investigator_msg = [
        {"role": "system", "content": "You are a meticulous scientific fact-checker and researcher."},
        {"role": "user", "content": investigator_prompt}
    ]
    investigator_response = call_llm(investigator_msg, temperature=0.2, max_tokens=2000)

    # 2. Supervisor — structured answer generation
    supervisor_prompt = f"""You are a SUPERVISOR creating a comprehensive answer.

Question: {query}

Context: {context}

Investigator's Analysis: {investigator_response}

Your task:
1. Create a well-structured, scientifically accurate answer
2. Include:
   - Executive Summary (2-3 sentences)
   - Background & Context
   - Detailed Explanation with subsections
   - Practical Applications
   - Current Research Status
   - Future Perspectives
3. Use clear headings and logical flow
4. Integrate verified facts from the investigator
5. Aim for 500-1000 words minimum
6. Include relevant examples and analogies

Format with clear markdown headers and bullet points where appropriate."""
    supervisor_msg = [
        {"role": "system", "content": "You are an expert bioinformatics educator who creates comprehensive, well-structured scientific explanations."},
        {"role": "user", "content": supervisor_prompt}
    ]
    supervisor_response = call_llm(supervisor_msg, temperature=0.4, max_tokens=3500)

    # 3. Critic — quality review and improvement suggestions
    critic_prompt = f"""You are a CRITIC reviewing the following answer for scientific accuracy.

Original Question: {query}

Supervisor's Answer: {supervisor_response}

Investigator's Facts: {investigator_response}

Your task:
1. Check for scientific accuracy and completeness
2. Identify any errors, omissions, or unclear explanations
3. Verify that all claims are properly supported
4. Assess the answer's clarity and accessibility
5. Suggest specific improvements
6. Provide a quality score (0-100)

Format your critique:
- ACCURACY ASSESSMENT:
- COMPLETENESS CHECK:
- CLARITY EVALUATION:
- ERRORS/ISSUES FOUND:
- IMPROVEMENT SUGGESTIONS:
- QUALITY SCORE: X/100"""
    critic_msg = [
        {"role": "system", "content": "You are a rigorous scientific peer reviewer specializing in bioinformatics."},
        {"role": "user", "content": critic_prompt}
    ]
    critic_response = call_llm(critic_msg, temperature=0.3, max_tokens=1500)

    # 4. Final integration (only in "full" mode; otherwise the supervisor's
    # draft is returned as the final answer)
    if collaboration_type == "full":
        integration_prompt = f"""Create the FINAL INTEGRATED ANSWER incorporating all feedback.

Question: {query}

Supervisor's Answer: {supervisor_response}

Critic's Feedback: {critic_response}

Verified Facts: {investigator_response}

Create a polished, final answer that:
1. Addresses all critic's concerns
2. Maintains scientific rigor
3. Includes proper citations
4. Uses clear structure with markdown formatting
5. Provides comprehensive coverage (800-1500 words)
6. Includes a TL;DR section at the beginning
7. Ends with key takeaways and further reading suggestions

Use Korean if the question is in Korean, otherwise English."""
        integration_msg = [
            {"role": "system", "content": "You are a master science communicator creating the definitive answer by integrating all expert inputs."},
            {"role": "user", "content": integration_prompt}
        ]
        final_answer = call_llm(integration_msg, temperature=0.35, max_tokens=8000)
    else:
        final_answer = supervisor_response

    return {
        "investigator": investigator_response,
        "supervisor": supervisor_response,
        "critic": critic_response,
        "final": final_answer
    }


def load_file_text(upload) -> str:
    """Load text from an uploaded file (PDF supported).

    PDF files go through pdfplumber (preferred) or PyPDF2; FASTA files are
    re-serialized via BioPython when available; everything else is decoded
    as UTF-8 with errors ignored. Returns "" on any failure.
    """
    name = upload.name.lower()
    # PDF handling
    if name.endswith(".pdf"):
        if PDFPLUMBER_AVAILABLE:
            try:
                text_parts = []
                with pdfplumber.open(upload) as pdf:
                    for page in pdf.pages:
                        page_text = page.extract_text()
                        if page_text:
                            text_parts.append(page_text)
                return "\n\n".join(text_parts)
            except Exception as e:
                st.error(f"PDF 읽기 오류 (pdfplumber): {e}")
                return ""
        elif PYPDF2_AVAILABLE:
            try:
                upload.seek(0)
                pdf_reader = PyPDF2.PdfReader(upload)
                text_parts = []
                for page_num in range(len(pdf_reader.pages)):
                    page = pdf_reader.pages[page_num]
                    text_parts.append(page.extract_text())
                return "\n\n".join(text_parts)
            except Exception as e:
                st.error(f"PDF 읽기 오류 (PyPDF2): {e}")
                return ""
        else:
            st.error("PDF 파일을 읽으려면 pdfplumber 또는 PyPDF2가 필요합니다")
            return ""
    # Plain-text file handling
    try:
        content = upload.read()
        text = content.decode("utf-8", errors="ignore")
    except Exception:  # fixed: was a bare except
        return ""
    # FASTA handling — fall back to the raw decoded text if parsing fails
    if name.endswith((".fa", ".fasta", ".faa", ".fna")) and BIOPYTHON_AVAILABLE:
        try:
            upload.seek(0)
            records = list(SeqIO.parse(upload, "fasta"))
            seqs = [f">{r.id}\n{str(r.seq)}" for r in records]
            return "\n\n".join(seqs)
        except Exception:  # fixed: was a bare except
            pass
    return text


def chunk_text(text: str, size: int = 1500, overlap: int = 300) -> List[str]:
    """Split text into overlapping chunks (larger size for better context)."""
    chunks = []
    start = 0
    text_len = len(text)
    while start < text_len:
        end = min(start + size, text_len)
        chunks.append(text[start:end])
        if end >= text_len:
            break
        start = end - overlap
    return chunks


def build_index(texts: List[str]):
    """Build a FAISS vector index over *texts* with a sentence-transformer.

    Returns (index, model) or (None, None) when dependencies are missing or
    indexing fails. Embeddings are L2-normalized so the inner-product index
    scores cosine similarity (fixed: raw IP on unnormalized vectors is not
    a meaningful similarity ranking).
    """
    if not SENTENCE_TRANSFORMERS_AVAILABLE or not FAISS_AVAILABLE:
        return None, None
    try:
        # Stronger embedding model than the MiniLM default
        model = SentenceTransformer("sentence-transformers/all-mpnet-base-v2")
        embeddings = model.encode(texts, show_progress_bar=False).astype("float32")
        faiss.normalize_L2(embeddings)  # cosine similarity via inner product
        dim = embeddings.shape[1]
        index = faiss.IndexFlatIP(dim)
        index.add(embeddings)
        return index, model
    except Exception as e:
        st.warning(f"Index build failed: {e}")
        return None, None


def search_index(query: str, index, model, texts: List[str], k: int = 5) -> List[Dict]:
    """Search the vector index; return up to *k* {score, text} hits."""
    if index is None or model is None:
        return []
    try:
        q_emb = model.encode([query]).astype("float32")
        faiss.normalize_L2(q_emb)  # must match the normalization used at build time
        D, I = index.search(q_emb, k)
        results = []
        for idx, score in zip(I[0], D[0]):
            if 0 <= idx < len(texts):
                results.append({
                    "score": float(score),
                    "text": texts[idx]
                })
        return results
    except Exception:  # fixed: was a bare except
        return []


def build_context(query: str, docs: List[str], index, model, use_web: bool, web_k: int) -> Tuple[str, List[Dict]]:
    """Build an enhanced context string plus a source list from files and web."""
    pieces = []
    sources = []
    # File search with more results
    if index and model and docs:
        hits = search_index(query, index, model, docs, k=6)
        for h in hits:
            pieces.append(f"[FILE SOURCE] {h['text'][:800]}")
            sources.append({"type": "file", "text": h['text'][:150], "score": h['score']})
    # Web search with scientific focus
    if use_web:
        # Bias the query toward scientific sources
        scientific_query = f"{query} scientific research pubmed nature science"
        results = brave_search(scientific_query, count=web_k)
        for r in results:
            pieces.append(f"[WEB SOURCE] {r['title']}\n{r['snippet']}")
            sources.append({"type": "web", "title": r['title'], "url": r['url']})
    context = "\n\n---\n\n".join(pieces)[:6000]  # increased context size
    return context, sources


# Enhanced analysis functions

def esm2_embed(seq: str, model_name: str = "facebook/esm2_t6_8M_UR50D") -> Dict:
    """Enhanced ESM-2 protein embedding with extra summary statistics.

    Returns a dict with a truncated embedding preview and statistics, or
    {"error": ...} on failure.
    """
    if not TORCH_AVAILABLE or not TRANSFORMERS_AVAILABLE:
        return {"error": "PyTorch/Transformers not available"}
    try:
        tokenizer = AutoTokenizer.from_pretrained(model_name)
        model = AutoModelForMaskedLM.from_pretrained(model_name)
        model.eval()
        with torch.no_grad():
            inputs = tokenizer(seq, return_tensors="pt", truncation=True, max_length=1024)
            outputs = model(**inputs, output_hidden_states=True)
            hidden = outputs.hidden_states[-1].mean(dim=1).squeeze(0)
            vec = hidden.cpu().numpy()
            # NOTE(review): despite the name, this is the per-dimension std-dev
            # of the last hidden states, not attention weights.
            attention_weights = outputs.hidden_states[-1].std(dim=1).squeeze(0).cpu().numpy()
        # Free memory
        del model
        del tokenizer
        if torch.cuda.is_available():
            torch.cuda.empty_cache()
        return {
            "embedding": vec.tolist()[:10],
            "size": vec.shape[0],
            "mean": float(vec.mean()),
            "std": float(vec.std()),
            "attention_peaks": attention_weights.tolist()[:10]
        }
    except Exception as e:
        return {"error": str(e)}


def dna_embed(seq: str, model_name: str = "zhihan1996/DNABERT-2-117M") -> Dict:
    """Enhanced DNA embedding with k-mer tokenization.

    Tries DNABERT-2 first; falls back to plain BERT when it cannot be
    loaded. Returns embedding statistics or {"error": ...}.
    """
    if not TORCH_AVAILABLE or not TRANSFORMERS_AVAILABLE:
        return {"error": "PyTorch/Transformers not available"}
    try:
        # DNABERT-2 needs einops at runtime — fail early with a clear message
        try:
            import einops
        except ImportError:
            return {"error": "einops package required. Please wait for installation and refresh the page."}

        # Convert a sequence to space-separated overlapping k-mers
        def seq_to_kmer(seq, k=6):
            kmers = []
            for i in range(len(seq) - k + 1):
                kmers.append(seq[i:i+k])
            return ' '.join(kmers)

        # Try to load the requested model, falling back to vanilla BERT
        try:
            from transformers import AutoTokenizer, AutoModel
            tokenizer = AutoTokenizer.from_pretrained(model_name, trust_remote_code=True)
            model = AutoModel.from_pretrained(model_name, trust_remote_code=True)
        except Exception as model_error:
            try:
                from transformers import BertTokenizer, BertModel
                fallback_model = "bert-base-uncased"
                tokenizer = BertTokenizer.from_pretrained(fallback_model)
                model = BertModel.from_pretrained(fallback_model)
                st.warning(f"DNABERT-2 로딩 실패. 대체 모델 사용중: {fallback_model}")
            except Exception:  # fixed: was a bare except
                return {"error": f"모델 로딩 실패: {str(model_error)}"}
        model.eval()

        # k-mer conversion (only worthwhile for sequences longer than one k-mer)
        if len(seq) > 6:
            input_seq = seq_to_kmer(seq, k=6)
            kmer_count = len(seq) - 5
        else:
            input_seq = seq
            kmer_count = 1

        with torch.no_grad():
            inputs = tokenizer(
                input_seq,
                return_tensors="pt",
                truncation=True,
                max_length=512,
                padding=True
            )
            outputs = model(**inputs)
            # Prefer the pooled output; fall back to mean-pooled hidden states
            if hasattr(outputs, 'pooler_output') and outputs.pooler_output is not None:
                vec = outputs.pooler_output.squeeze(0).cpu().numpy()
            else:
                hidden = outputs.last_hidden_state.mean(dim=1).squeeze(0)
                vec = hidden.cpu().numpy()

        # Free memory
        del model
        del tokenizer
        if torch.cuda.is_available():
            torch.cuda.empty_cache()
        return {
            "embedding": vec.tolist()[:10],
            "size": vec.shape[0],
            "kmer_count": kmer_count,
            "mean": float(vec.mean()),
            "std": float(vec.std())
        }
    except Exception as e:
        return {"error": f"분석 중 오류 발생: {str(e)[:200]}"}


# --------------- Streamlit UI ---------------
st.set_page_config(page_title=APP_TITLE, page_icon="🧬", layout="wide")
st.title(APP_TITLE)
st.caption(DISCLAIMER)

# Session state init
if "docs" not in st.session_state:
    st.session_state.docs = []
if "index" not in st.session_state:
    st.session_state.index = None
if "model" not in st.session_state:
    st.session_state.model = None
if "chat_history" not in st.session_state:
    st.session_state.chat_history = []

# Sidebar
with st.sidebar:
    st.header("⚙️ Configuration")
    fw_key = st.text_input(
        "FIREWORKS_API_KEY",
        value=get_secret("FIREWORKS_API_KEY", ""),
        type="password",
        help="Required for AI responses"
    )
    brave_key = st.text_input(
        "BRAVE_API_KEY",
        value=get_secret("BRAVE_API_KEY", ""),
        type="password",
        help="Required for web search"
    )
    # Keys typed in the sidebar are exported so get_secret() picks them up
    if fw_key:
        os.environ["FIREWORKS_API_KEY"] = fw_key
    if brave_key:
        os.environ["BRAVE_API_KEY"] = brave_key
    st.divider()
    st.subheader("🤖 AI Models")
    esm_model = st.text_input(
        "ESM-2 Model",
        value="facebook/esm2_t6_8M_UR50D",
        help="Protein analysis model"
    )
    dna_model = st.text_input(
        "DNA Model",
        value="bert-base-uncased",
        help="DNA analysis model"
    )
    st.divider()
    st.subheader("🔍 Search Settings")
    use_web = st.checkbox("Enable web search", value=True)
    web_results = st.slider("Web results", 1, 10, 5)
    st.divider()
    st.subheader("🎭 Collaboration Mode")
    collab_mode = st.radio(
        "AI Collaboration Type",
        ["full", "quick", "deep"],
        index=0,
        help="Full: Complete collaboration\nQuick: Fast response\nDeep: In-depth analysis"
    )

# Tabs
tab1, tab2, tab3, tab4, tab5 = st.tabs(["💬 Chat", "🧬 Protein", "🧬 DNA", "📊 Analysis", "ℹ️ About"])

# File upload
with st.expander("📁 Upload Files", expanded=True):
    files = st.file_uploader(
        "Upload text/FASTA/PDF files",
        type=["txt", "fa", "fasta", "csv", "json", "pdf"],
        accept_multiple_files=True,
        help="Support for multiple file types including PDF"
    )
    if files:
        docs = []
        for f in files:
            try:
                if f.name.lower().endswith(".pdf"):
                    if not (PDFPLUMBER_AVAILABLE or PYPDF2_AVAILABLE):
                        st.warning(f"⚠️ PDF support requires: pip install pdfplumber")
                        continue
                text = load_file_text(f)
                if text:
                    docs.extend(chunk_text(text))
                    st.success(f"✅ {f.name} loaded ({len(text)} chars)")
            except Exception as e:
                st.error(f"Error reading {f.name}: {e}")
        if docs:
            st.session_state.docs = docs
            st.info(f"📚 Total chunks created: {len(docs)}")
            if SENTENCE_TRANSFORMERS_AVAILABLE and FAISS_AVAILABLE:
                with st.spinner("Building semantic index..."):
                    index, model = build_index(docs)
                    if index:
                        st.session_state.index = index
                        st.session_state.model = model
                        st.success("✅ Index built successfully")

# Chat tab with collaborative AI
with tab1:
    st.subheader("💬 Advanced Collaborative Chat")
    with st.expander("🎭 How Collaborative AI Works", expanded=False):
        st.markdown("""
### Three AI Experts Work Together:
1. **🔍 Investigator**: Fact-checks and verifies information
2. **📝 Supervisor**: Creates structured, comprehensive answers
3. **✅ Critic**: Reviews for accuracy and clarity
4. **🎯 Integrator**: Combines all inputs for the final answer

This system ensures maximum accuracy and comprehensiveness.
""")
    question = st.text_area(
        "Ask about proteins, DNA, or any bioinformatics topic:",
        value="Explain how AlphaFold revolutionized protein structure prediction and its impact on drug discovery.",
        height=100
    )
    col1, col2 = st.columns([3, 1])
    with col1:
        answer_button = st.button("🚀 Get Collaborative Answer", type="primary", use_container_width=True)
    with col2:
        show_process = st.checkbox("Show process", value=False, help="Display each AI's contribution")

    if answer_button:
        if not get_secret("FIREWORKS_API_KEY"):
            st.error("⚠️ Please set FIREWORKS_API_KEY")
        else:
            # Progress tracking
            progress_bar = st.progress(0)
            status_text = st.empty()
            with st.spinner("🔍 Building knowledge base..."):
                status_text.text("Searching sources...")
                progress_bar.progress(10)
                context, sources = build_context(
                    question,
                    st.session_state.docs,
                    st.session_state.index,
                    st.session_state.model,
                    use_web,
                    web_results
                )
                progress_bar.progress(20)
                status_text.text("Collaborative AI system working...")
                # Get collaborative answer
                start_time = time.time()
                collaborative_result = collaborative_answer(
                    question,
                    context,
                    collaboration_type=collab_mode
                )
                elapsed_time = time.time() - start_time
                progress_bar.progress(100)
                status_text.text(f"✅ Completed in {elapsed_time:.1f} seconds")

            # Display results
            if show_process:
                # Show each AI's contribution
                with st.expander("🔍 Investigator's Analysis", expanded=False):
                    st.markdown(collaborative_result["investigator"])
                with st.expander("📝 Supervisor's Draft", expanded=False):
                    st.markdown(collaborative_result["supervisor"])
                with st.expander("✅ Critic's Review", expanded=False):
                    st.markdown(collaborative_result["critic"])

            # Final answer
            st.markdown("### 🎯 Final Integrated Answer")
            st.markdown(collaborative_result["final"])

            # Sources
            if sources:
                with st.expander("📚 Sources & References", expanded=False):
                    for s in sources:
                        if s["type"] == "web":
                            st.write(f"- 🌐 [{s['title']}]({s['url']})")
                        elif s["type"] == "file":
                            st.write(f"- 📄 File: {s['text'][:100]}... (Score: {s.get('score', 0):.2f})")

            # Save to history
            st.session_state.chat_history.append({
                "question": question,
                "answer": collaborative_result["final"],
                "timestamp": time.strftime("%Y-%m-%d %H:%M:%S"),
                "mode": collab_mode
            })

            # Feedback
            col1, col2, col3 = st.columns(3)
            with col1:
                if st.button("👍 Helpful"):
                    st.success("Thank you for your feedback!")
            with col2:
                if st.button("👎 Not helpful"):
                    st.info("We'll work on improving our responses.")
            with col3:
                # Fixed: st.download_button was nested inside st.button, so it
                # vanished on the rerun triggered by the click and the download
                # was unreachable; render it directly instead.
                st.download_button(
                    label="💾 Save Answer",
                    data=collaborative_result["final"],
                    file_name=f"bioseq_answer_{time.strftime('%Y%m%d_%H%M%S')}.md",
                    mime="text/markdown"
                )

# Enhanced Protein tab
with tab2:
    st.subheader("🧬 Advanced Protein Analysis")
    with st.expander("📚 Learn About Protein Analysis", expanded=False):
        st.markdown("""
### What is Protein Sequence Analysis?
**Proteins** are the workhorses of cells, performing nearly every function necessary for life:
- 🧪 **Enzymes**: Catalyze chemical reactions
- 🛡️ **Antibodies**: Defend against pathogens
- 🚚 **Transporters**: Move molecules across membranes
- 📡 **Receptors**: Receive and transmit signals

**ESM-2** (Evolutionary Scale Modeling) is Meta's breakthrough AI that:
- Trained on 65 million protein sequences
- Predicts structure and function from sequence alone
- Enables drug discovery and protein engineering
""")
    protein_seq = st.text_area(
        "Enter protein sequence (single letter amino acid code):",
        value="MKTIIALSYIFCLVFA",
        help="Standard amino acids: A,C,D,E,F,G,H,I,K,L,M,N,P,Q,R,S,T,V,W,Y",
        height=100
    )
    # Example sequences
    st.markdown("**🧪 Example Sequences (Click to copy):**")
    col1, col2, col3, col4 = st.columns(4)
    with col1:
        if st.button("💉 Insulin", key="ins"):
            st.code("FVNQHLCGSHLVEALYLVCGERGFFYTPKT", language=None)
    with col2:
        if st.button("😊 Endorphin", key="end"):
            st.code("YGGFMTSEKSQTPLVTLFKNAIIKNAYKKGE", language=None)
    with col3:
        if st.button("❤️ Oxytocin", key="oxy"):
            st.code("CYIQNCPLG", language=None)
    with col4:
        if st.button("🦠 Lysozyme", key="lys"):
            st.code("KVFGRCELAAAMKRHGLDNYRGYSLGNWVCAAKFESNFNTQATNR", language=None)

    if st.button("🔬 Analyze Protein", type="primary", use_container_width=True):
        seq = protein_seq.strip().upper()
        # Validation: strip non-standard amino acids rather than aborting
        valid_aa = set("ACDEFGHIKLMNPQRSTVWY")
        invalid = set(seq) - valid_aa
        if invalid:
            st.warning(f"⚠️ Invalid amino acids detected: {', '.join(invalid)}")
            seq = ''.join([aa for aa in seq if aa in valid_aa])
        if len(seq) < 3:
            st.error("Sequence too short. Please enter at least 3 amino acids.")
        else:
            # Basic analysis
            st.markdown("### 📊 Sequence Statistics")
            col1, col2, col3, col4 = st.columns(4)
            with col1:
                st.metric("Length", f"{len(seq)} aa")
                # ~110 Da average residue mass — rough estimate only
                st.metric("Mol. Weight", f"~{len(seq) * 110:.1f} Da")
            with col2:
                unique_aa = len(set(seq))
                st.metric("Unique AA", f"{unique_aa}/20")
                charged = sum(1 for aa in seq if aa in "DEKR")
                st.metric("Charged", f"{charged/len(seq)*100:.1f}%")
            with col3:
                hydrophobic = sum(1 for aa in seq if aa in "AVILMFYW")
                st.metric("Hydrophobic", f"{hydrophobic/len(seq)*100:.1f}%")
                aromatic = sum(1 for aa in seq if aa in "FWY")
                st.metric("Aromatic", f"{aromatic/len(seq)*100:.1f}%")
            with col4:
                basic = sum(1 for aa in seq if aa in "KRH")
                acidic = sum(1 for aa in seq if aa in "DE")
                # Very crude isoelectric-point heuristic, not a real pI calculation
                pi_estimate = 7 + (basic - acidic) * 0.5
                st.metric("pI (est.)", f"~{pi_estimate:.1f}")
                st.metric("Basic/Acidic", f"{basic}/{acidic}")

            # Secondary structure prediction (simplified propensity scales)
            st.markdown("### 🔮 Predicted Properties")
            col1, col2 = st.columns(2)
            with col1:
                # Helix propensity
                helix_aa = "AELMQKRH"
                helix_score = sum(1 for aa in seq if aa in helix_aa) / len(seq)
                st.metric("α-Helix Propensity", f"{helix_score*100:.1f}%")
                # Beta propensity
                beta_aa = "FIVWY"
                beta_score = sum(1 for aa in seq if aa in beta_aa) / len(seq)
                st.metric("β-Sheet Propensity", f"{beta_score*100:.1f}%")
            with col2:
                # Disorder prediction
                disorder_aa = "PESKTQ"
                disorder_score = sum(1 for aa in seq if aa in disorder_aa) / len(seq)
                st.metric("Disorder Tendency", f"{disorder_score*100:.1f}%")
                # Solubility estimate (inverse of hydrophobic fraction)
                soluble_score = 100 - (hydrophobic/len(seq)*100)
                st.metric("Solubility Score", f"{soluble_score:.1f}%")

            # AI Analysis
            if TORCH_AVAILABLE and TRANSFORMERS_AVAILABLE:
                st.markdown("### 🤖 AI-Powered Analysis")
                with st.spinner("Running ESM-2 analysis... This may take 10-30 seconds"):
                    result = esm2_embed(seq, esm_model)
                if "error" in result:
                    st.error(f"Analysis failed: {result['error']}")
                else:
                    st.success("✅ AI analysis complete!")
                    col1, col2, col3 = st.columns(3)
                    with col1:
                        st.metric("Embedding Dimension", result['size'])
                    with col2:
                        st.metric("Mean Value", f"{result.get('mean', 0):.3f}")
                    with col3:
                        st.metric("Std Dev", f"{result.get('std', 0):.3f}")
                    # Visualization placeholder
                    st.markdown("**🎨 Embedding Visualization:**")
                    st.info("The protein has been encoded into a high-dimensional space where similar proteins cluster together.")
                # Applications
                st.markdown("""
### 🎯 Applications of This Analysis:
1. **🔍 Similar Protein Search**: Find proteins with similar functions
2. **💊 Drug Target Identification**: Predict binding sites and interactions
3. **🧬 Mutation Impact**: Assess how changes affect protein function
4. **🏗️ Structure Prediction**: Input for AlphaFold-like systems
5. **⚗️ Protein Engineering**: Design improved variants
""")
            else:
                st.warning("⚠️ AI models are loading. Please refresh in a moment.")

# Enhanced DNA tab
with tab3:
    st.subheader("🧬 Advanced DNA Analysis")
    with st.expander("📚 Learn About DNA Analysis", expanded=False):
        st.markdown("""
### Understanding DNA Sequences
**DNA** is the blueprint of life, encoding all genetic information in four bases:
- **A** (Adenine): Pairs with T
- **T** (Thymine): Pairs with A
- **G** (Guanine): Pairs with C
- **C** (Cytosine): Pairs with G

**Key Concepts:**
- **Gene**: A DNA segment that codes for a protein
- **Promoter**: Controls when genes are turned on/off
- **Codon**: Three bases that code for one amino acid
- **GC Content**: Affects stability and gene expression

**DNABERT-2** is an AI model that understands DNA "language" to predict:
- Gene function
- Regulatory elements
- Disease-causing mutations
- Evolution patterns
""")
    dna_seq = st.text_area(
        "Enter DNA sequence:",
        value="ATGCGATCGTAGC",
        help="Use A, T, G, C for DNA (U will be converted to T for RNA)",
        height=100
    )
    # Example sequences
    st.markdown("**🧪 Example Sequences (Click to analyze):**")
    col1, col2, col3, col4 = st.columns(4)
    with col1:
        if st.button("📋 TATA Box", key="tata"):
            st.code("TATAAAAGCGCGCGCG", language=None)
            st.caption("Gene start signal")
    with col2:
        if st.button("🎯 Promoter", key="prom"):
            st.code("TTGACAGGCTAGCTCAGTCCTAGGTATAATGCTAGC", language=None)
            st.caption("Gene control region")
    with col3:
        if st.button("✂️ CRISPR", key="crispr"):
            st.code("GTCACCTCCAATGACTAGGGTGG", language=None)
            st.caption("Gene editing target")
    with col4:
        if st.button("🧬 Telomere", key="telo"):
            st.code("TTAGGGTTAGGGTTAGGG", language=None)
            st.caption("Chromosome end")

    if st.button("🔬 Analyze DNA", type="primary", use_container_width=True):
        # RNA input is accepted (U -> T); everything else is stripped
        seq = dna_seq.strip().upper().replace("U", "T")
        seq = ''.join(c for c in seq if c in 'ATGC')
        if len(seq) < 3:
            st.error("Sequence too short. Please enter at least 3 bases.")
        else:
            # Advanced statistics
            st.markdown("### 📊 Sequence Analysis")
            col1, col2, col3, col4 = st.columns(4)
            with col1:
                st.metric("Length", f"{len(seq)} bp")
                # ~660 Da per base pair — rough estimate
                st.metric("Size", f"~{len(seq)*660:.0f} Da")
            with col2:
                gc = (seq.count("G") + seq.count("C")) / len(seq) * 100
                st.metric("GC Content", f"{gc:.1f}%")
                # Fixed: "gc < 25" was checked after "gc < 35", so the
                # "Very low" caption was unreachable; check extremes first.
                if gc > 65:
                    st.caption("🔴 Very high")
                elif gc > 55:
                    st.caption("🟠 High")
                elif gc < 25:
                    st.caption("🟣 Very low")
                elif gc < 35:
                    st.caption("🔵 Low")
                else:
                    st.caption("🟢 Normal")
            with col3:
                at = 100 - gc
                st.metric("AT Content", f"{at:.1f}%")
                # Wallace rule melting temperature estimate
                tm = 4 * (seq.count("G") + seq.count("C")) + 2 * (seq.count("A") + seq.count("T"))
                st.metric("Tm (est.)", f"{tm}°C")
            with col4:
                cpg = seq.count("CG")
                # CpG observed/expected ratio; guarded against zero C or G counts
                cpg_ratio = (cpg * len(seq)) / (seq.count("C") * seq.count("G")) if seq.count("C") * seq.count("G") > 0 else 0
                st.metric("CpG Sites", cpg)
                st.metric("CpG O/E", f"{cpg_ratio:.2f}")

            # Motif search
            st.markdown("### 🔍 Regulatory Elements & Motifs")
            motifs_found = []
            # Extended motif database (IUPAC degenerate codes in patterns)
            motif_db = {
                "TATA Box": ["TATAAA", "TATAWAW"],
                "CAAT Box": ["CAAT", "CCAAT", "GGCCAATCT"],
                "GC Box": ["GGGCGG", "GGCGGG"],
                "Start Codon": ["ATG"],
                "Stop Codons": ["TAA", "TAG", "TGA"],
                "Kozak Sequence": ["GCCRCCATGG"],
                "Poly-A Signal": ["AATAAA", "ATTAAA"],
                "E-box": ["CANNTG"],
                "CRE": ["TGACGTCA"],
                "NF-κB": ["GGGACTTTCC"]
            }
            import re  # fixed: was imported inside the inner pattern loop
            for motif_name, patterns in motif_db.items():
                for pattern in patterns:
                    # Simple pattern matching (R=A/G, W=A/T, N=any)
                    simple_pattern = pattern.replace("R", "[AG]").replace("W", "[AT]").replace("N", "[ATGC]")
                    if re.search(simple_pattern, seq):
                        motifs_found.append(f"✅ {motif_name}: {pattern}")
                        break
            if motifs_found:
                for motif in motifs_found:
                    st.write(motif)
            else:
                st.info("No known regulatory motifs detected")

            # Codon analysis
            if len(seq) >= 3:
                st.markdown("### 🧬 Coding Potential Analysis")
                col1, col2 = st.columns(2)
                with col1:
                    # Reading frames
                    st.markdown("**Open Reading Frames:**")
                    for frame in range(3):
                        frame_seq = seq[frame:]
                        if "ATG" in frame_seq:
                            start_pos = frame_seq.index("ATG") + frame
                            st.write(f"Frame {frame+1}: Start at position {start_pos+1}")
                with col2:
                    # Codon usage (only meaningful when length is a codon multiple)
                    if len(seq) % 3 == 0:
                        st.markdown("**Codon Statistics:**")
                        codon_count = len(seq) // 3
                        st.metric("Total Codons", codon_count)
                        # Count stops
                        stops = seq.count("TAA") + seq.count("TAG") + seq.count("TGA")
                        st.metric("Stop Codons", stops)

            # AI Analysis
            if TORCH_AVAILABLE and TRANSFORMERS_AVAILABLE:
                st.markdown("### 🤖 AI-Powered Genomic Analysis")
                with st.spinner("Running DNABERT analysis... This may take 10-30 seconds"):
                    result = dna_embed(seq, dna_model)
                if "error" in result:
                    st.error(f"Analysis failed: {result['error']}")
                else:
                    st.success("✅ AI analysis complete!")
                    col1, col2, col3 = st.columns(3)
                    with col1:
                        st.metric("Embedding Dimension", result['size'])
                    with col2:
                        st.metric("k-mer Count", result.get('kmer_count', 'N/A'))
                    with col3:
                        st.metric("Mean Value", f"{result.get('mean', 0):.3f}")
                st.markdown("""
### 🎯 Applications of DNA Analysis:
1. **🔬 Gene Discovery**: Identify coding and regulatory regions
2. **🏥 Disease Diagnosis**: Detect pathogenic mutations
3. **✂️ CRISPR Design**: Find optimal gene editing sites
4. **🌱 Evolution Studies**: Compare sequences across species
5. **💊 Personalized Medicine**: Tailor treatments to genetic profiles
6. **🦠 Pathogen Detection**: Identify viral/bacterial DNA
""")
            else:
                st.warning("⚠️ AI models are loading. Please refresh in a moment.")

# Analysis History tab
with tab4:
    st.subheader("📊 Analysis History & Insights")
    if st.session_state.chat_history:
        st.markdown(f"### 💾 Previous Analyses ({len(st.session_state.chat_history)} total)")
        # Show the five most recent entries, newest first
        for i, entry in enumerate(reversed(st.session_state.chat_history[-5:])):
            with st.expander(f"🕐 {entry['timestamp']} - Mode: {entry['mode']}", expanded=False):
                st.markdown("**Question:**")
                st.write(entry['question'])
                st.markdown("**Answer:**")
                st.write(entry['answer'][:500] + "..." if len(entry['answer']) > 500 else entry['answer'])
                if st.button(f"View Full", key=f"view_{i}"):
                    st.markdown(entry['answer'])
    else:
        st.info("No analysis history yet. Start by asking a question in the Chat tab!")

    # Export options
    if st.session_state.chat_history:
        st.markdown("### 📤 Export Options")
        col1, col2 = st.columns(2)
        with col1:
            # Fixed: st.download_button was nested inside st.button and never
            # survived the click-triggered rerun; build the markdown eagerly
            # and render the download button directly.
            md_content = "\n\n---\n\n".join([
                f"## {entry['timestamp']}\n\n**Q:** {entry['question']}\n\n**A:** {entry['answer']}"
                for entry in st.session_state.chat_history
            ])
            st.download_button(
                "Export as Markdown",
                md_content,
                f"bioseq_history_{time.strftime('%Y%m%d')}.md",
                "text/markdown"
            )
        with col2:
            if st.button("Clear History"):
                st.session_state.chat_history = []
                st.rerun()

# Enhanced About tab
with tab5:
    st.subheader("ℹ️ About BioSeq Chat Pro")
    st.markdown("""
### 🚀 Enhanced Features
#### **Collaborative AI System**
- 🔍 **Investigator**: Verifies facts and identifies knowledge gaps
- 📝 **Supervisor**: Creates comprehensive, structured answers
- ✅ **Critic**: Reviews for accuracy and clarity
- 🎯 **Integrator**: Synthesizes all inputs into final answer

#### **Technical Improvements**
- **8000 token responses** for comprehensive answers
- **Enhanced context building** with semantic search
- **Multiple collaboration modes** (Full, Quick, Deep)
- **Scientific source prioritization** in web search
- **Larger embedding models** for better accuracy

### 🧬 Supported Analyses
- **Protein Analysis**: ESM-2 embeddings, property prediction
- **DNA Analysis**: DNABERT-2/BERT embeddings, motif search
- **RAG Chat**: Context-aware Q&A with file integration
- **PDF Support**: Direct analysis of research papers

### 📚 Models & Technologies
- **LLM**: Llama 3.1 70B (via Fireworks AI)
- **Protein**: ESM-2 (Meta/Facebook)
- **DNA**: DNABERT-2 (Microsoft) / BERT (Google)
- **Embeddings**: all-mpnet-base-v2 (Sentence Transformers)
- **Vector Search**: FAISS (Facebook)

### ⚠️ Disclaimer
This tool is designed for **research and educational purposes only**.
- Not intended for medical diagnosis or treatment
- Not validated for clinical use
- Always consult qualified professionals for medical decisions

### 🔧 System Status
""")
    # System status with better formatting
    col1, col2 = st.columns(2)
    deps_essential = {
        "PyTorch": TORCH_AVAILABLE,
        "Transformers": TRANSFORMERS_AVAILABLE,
        "Sentence Transformers": SENTENCE_TRANSFORMERS_AVAILABLE,
        "FAISS": FAISS_AVAILABLE,
    }
    deps_optional = {
        "BioPython": BIOPYTHON_AVAILABLE,
        "Datasets": DATASETS_AVAILABLE,
        "PDF (pdfplumber)": PDFPLUMBER_AVAILABLE,
        "PDF (PyPDF2)": PYPDF2_AVAILABLE
    }
    with col1:
        st.markdown("**Essential Components:**")
        for name, available in deps_essential.items():
            if available:
                st.success(f"✅ {name}")
            else:
                st.error(f"❌ {name}")
    with col2:
        st.markdown("**Optional Components:**")
        for name, available in deps_optional.items():
            if available:
                st.success(f"✅ {name}")
            else:
                st.warning(f"⚠️ {name}")

    # Performance metrics
    if st.session_state.chat_history:
        st.markdown("### 📈 Usage Statistics")
        col1, col2, col3 = st.columns(3)
        with col1:
            st.metric("Total Queries", len(st.session_state.chat_history))
        with col2:
            modes = [h['mode'] for h in st.session_state.chat_history]
            most_used = max(set(modes), key=modes.count) if modes else "N/A"
            st.metric("Most Used Mode", most_used)
        with col3:
            avg_length = sum(len(h['answer']) for h in st.session_state.chat_history) / len(st.session_state.chat_history)
            st.metric("Avg Answer Length", f"{avg_length:.0f} chars")

    st.markdown("""
---
### 📞 Support & Feedback
- Report issues or suggest features
- Contribute to development
- Share your research results

**Version**: 2.0.0 Pro | **Last Updated**: 2025
""")