#!/usr/bin/env python3
"""
Anton Microscopy Analysis - Clean UI

Streamlit front end that lets the user pick a sample image or upload one,
shows it, and runs the Anton ``AnalysisPipeline`` progressively, rendering
one expander per pipeline stage as results arrive.
"""

import logging
import os
import random
import sys
import tempfile
import traceback
from pathlib import Path

import numpy as np
import streamlit as st
from PIL import Image

# Setup logging
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(levelname)s - %(message)s'
)
logger = logging.getLogger(__name__)

# Setup page (must be the first Streamlit command executed)
st.set_page_config(
    page_title="Anton Microscopy Analysis",
    page_icon="🔬",
    layout="centered"
)

# NOTE(review): the HTML/CSS inside the `unsafe_allow_html` literals in this
# file looks like it was stripped from the copy under review (only the text
# content and line breaks remain). The literals below reproduce what is
# visible; restore the original markup if it is available.

# Clean, modern CSS
st.markdown(""" """, unsafe_allow_html=True)

# Add Anton to path
sys.path.append(str(Path(__file__).parent))

# Import Anton components
try:
    from anton.core.pipeline import AnalysisPipeline
    from anton.utils.image_io import ImageLoader
    # CMPO mapping removed for simplicity
    anton_available = True
except ImportError as e:
    anton_available = False
    import_error = str(e)

# (display name, container key) for every pipeline stage, in display order.
STAGES = [
    ("Global Analysis", "stage_1_global"),
    ("Object Detection", "stage_2_objects"),
    ("Feature Analysis", "stage_3_features"),
    ("Population Analysis", "stage_4_population"),
]
STAGE_NAMES = {key: name for name, key in STAGES}

# Stage identifiers passed to the progress callback -> container keys above.
STAGE_KEY_BY_CALLBACK_NAME = {
    "stage_1": "stage_1_global",
    "stage_2": "stage_2_objects",
    "stage_3": "stage_3_features",
    "stage_4": "stage_4_population",
}

# Session-state key holding the temp file written for the current upload.
TEMP_PATH_STATE_KEY = "uploaded_temp_path"

# Longest stage text shown in an expander before it is truncated with "...".
MAX_DISPLAY_CHARS = 1000


def _remove_temp_file(path):
    """Delete *path* if it is set and exists; log instead of raising on failure."""
    if not path:
        return
    try:
        os.remove(path)
    except FileNotFoundError:
        pass  # already gone, nothing to clean up
    except OSError as exc:
        logger.warning("Could not remove temporary file %s: %s", path, exc)


def _extract_stage_content(data):
    """Return the displayable text for a completed stage's result dict.

    Tries ``description``, ``segmentation_guidance`` and ``population_summary``
    in that order, then a ``feature_descriptions`` list joined by newlines,
    and finally falls back to ``'Analysis completed'``. The value is coerced
    to ``str`` (a stage may hand back a non-string) and Markdown code-fence
    lines are dropped when the text starts with a fence.
    """
    content = (
        data.get('description')
        or data.get('segmentation_guidance')
        or data.get('population_summary')
    )
    if not content:
        # Stage 3 feature_descriptions is a list
        feature_descs = data.get('feature_descriptions', [])
        logger.info(f"Stage 3 feature_descriptions: {feature_descs}")
        if feature_descs and isinstance(feature_descs, list):
            content = '\n'.join(str(desc) for desc in feature_descs)
        else:
            content = 'Analysis completed'

    content = str(content)
    if content.startswith('```'):
        content = '\n'.join(
            line for line in content.split('\n')
            if not line.strip().startswith('```')
        )
    return content


# Main header
st.markdown("\n\n🔬 Anton Microscopy Analysis\n\n", unsafe_allow_html=True)

if not anton_available:
    st.error(f"System Error: {import_error}")
    st.stop()

# Image input section
st.markdown("\nSelect Image\n", unsafe_allow_html=True)

# Check for sample images
sample_images_path = Path("sample_images")
sample_images = (
    sorted(sample_images_path.glob("*.BMP")) if sample_images_path.exists() else []
)
sample_names = [img.name for img in sample_images]

# Initialize session state
if 'selected_sample_image' not in st.session_state:
    st.session_state.selected_sample_image = sample_names[0] if sample_names else None

# Streamlit reruns this script on every interaction and each run with an
# upload writes a new temp file, so drop the one left by the previous run
# before doing anything else.
_remove_temp_file(st.session_state.pop(TEMP_PATH_STATE_KEY, None))

# Image source selection
col1, col2 = st.columns(2)

with col1:
    if sample_images:
        if st.button("🎲 Random Sample", use_container_width=True):
            st.session_state.selected_sample_image = random.choice(sample_images).name
            st.rerun()

with col2:
    use_sample = st.toggle("Use Sample Images", value=bool(sample_images))

# Image selection
current_image = None
image_to_analyze = None

if use_sample and sample_images:
    # Sample image selection; fall back to the first entry when the
    # remembered name is no longer present on disk.
    try:
        default_index = sample_names.index(st.session_state.selected_sample_image)
    except ValueError:
        default_index = 0

    selected_image = st.selectbox(
        "Choose sample image:",
        sample_names,
        index=default_index,
        label_visibility="collapsed"
    )

    if selected_image != st.session_state.selected_sample_image:
        st.session_state.selected_sample_image = selected_image

    # Load sample image
    sample_image_path = sample_images_path / selected_image
    if sample_image_path.exists():
        try:
            loader = ImageLoader()
            current_image = loader.load(str(sample_image_path))
            image_to_analyze = str(sample_image_path)
        except Exception as e:
            st.error(f"Error loading image: {e}")

else:
    # File upload
    uploaded_file = st.file_uploader(
        "Upload microscopy image",
        type=['png', 'jpg', 'jpeg', 'tiff', 'bmp'],
        label_visibility="collapsed"
    )

    if uploaded_file:
        try:
            # Keep only the base name so the suffix can never contain a
            # path separator; the extension is preserved for the loader.
            safe_name = Path(uploaded_file.name).name
            with tempfile.NamedTemporaryFile(
                delete=False, suffix=f"_{safe_name}"
            ) as tmp_file:
                tmp_file.write(uploaded_file.getbuffer())
                temp_path = tmp_file.name
            st.session_state[TEMP_PATH_STATE_KEY] = temp_path

            try:
                loader = ImageLoader()
                current_image = loader.load(temp_path)
            except Exception:
                # Fallback: decode with PIL when the Anton loader rejects the file.
                logger.warning(
                    "ImageLoader failed for %s; falling back to PIL",
                    temp_path,
                    exc_info=True,
                )
                with Image.open(temp_path) as pil_img:
                    current_image = np.array(pil_img)
                # NOTE(review): the collapsed source does not show whether this
                # grayscale conversion belonged to the fallback only or ran for
                # every upload; it is kept with the fallback here - confirm.
                if current_image.ndim == 3 and current_image.shape[2] == 3:
                    current_image = np.mean(current_image, axis=2).astype(np.uint8)

            image_to_analyze = temp_path
        except Exception as e:
            st.error(f"Error loading image: {e}")

# Display image
if current_image is not None:
    st.markdown("\n", unsafe_allow_html=True)
    st.image(current_image, use_container_width=True)
    st.markdown("\n", unsafe_allow_html=True)

    # Analysis button
    st.markdown("---")
    analyze_btn = st.button("🚀 Analyze Image", type="primary", use_container_width=True)

    # Run analysis
    if analyze_btn:
        logger.info("Analysis button pressed")
        st.info("Starting analysis...")

        # Check API keys
        if os.getenv('GOOGLE_API_KEY'):
            vlm_provider = "gemini"
            logger.info("Using Gemini VLM provider")
        elif os.getenv('ANTHROPIC_API_KEY'):
            vlm_provider = "claude"
            logger.info("Using Claude VLM provider")
        else:
            vlm_provider = "mock"
            logger.info("Using mock VLM provider (no API keys found)")

        # Configure pipeline
        if use_sample and sample_images:
            biological_context = {
                "experiment_type": "protein_translocation",
                "cell_line": "U2OS_osteosarcoma",
                "protein": "FKHR-GFP",
                "readout": "nuclear_vs_cytoplasmic_localization"
            }
            logger.info("Using sample image context: protein translocation")
        else:
            biological_context = {
                "experiment_type": "general_microscopy",
                "readout": "cellular_morphology_and_phenotypes"
            }
            logger.info("Using general microscopy context")

        config = {
            "vlm_provider": vlm_provider,
            "biological_context": biological_context
        }
        logger.info(f"Pipeline config: {config}")

        # Run analysis with progressive display
        logger.info(f"Creating pipeline with image: {image_to_analyze}")
        pipeline = AnalysisPipeline(config)
        logger.info("Pipeline created, starting progressive analysis...")

        # Create containers for each stage
        st.markdown("\n", unsafe_allow_html=True)
        st.markdown("### Analysis Results")

        # One placeholder pair per stage: result expander, then status line.
        stage_containers = {}
        stage_statuses = {}
        for _stage_name, stage_key in STAGES:
            stage_containers[stage_key] = st.empty()
            stage_statuses[stage_key] = st.empty()

        def update_stage_display(stage_key, status, data):
            """Update the display for a specific stage.

            ``status`` "running" shows a progress line with ``data`` as the
            message; "completed" renders the stage result. Other statuses
            are ignored, and an unknown ``stage_key`` is logged and skipped
            rather than aborting the whole analysis.
            """
            stage_name = STAGE_NAMES.get(stage_key)
            if stage_name is None:
                logger.warning("Ignoring update for unknown stage %r", stage_key)
                return

            container = stage_containers[stage_key]
            status_slot = stage_statuses[stage_key]

            if status == "running":
                status_slot.info(f"⏳ {stage_name}: {data}")
                container.expander(f"📋 {stage_name}", expanded=False).write("Processing...")
            elif status == "completed":
                status_slot.success(f"✅ {stage_name}: Completed")

                if not isinstance(data, dict):
                    container.expander(f"📋 {stage_name}", expanded=True).write(str(data))
                    return

                # Extract content from stage data
                logger.info(f"Stage {stage_key} data keys: {list(data.keys())}")
                content = _extract_stage_content(data)
                logger.info(f"Stage {stage_key} final content: {content[:100]}...")

                # Display in expandable container
                with container.expander(f"📋 {stage_name}", expanded=True):
                    if len(content) > MAX_DISPLAY_CHARS:
                        st.write(content[:MAX_DISPLAY_CHARS] + "...")
                    else:
                        st.write(content)

        def stage_callback(stage, status, data):
            """Translate pipeline stage names (stage_1 -> stage_1_global) and update the UI."""
            mapped_stage = STAGE_KEY_BY_CALLBACK_NAME.get(stage, stage)
            update_stage_display(mapped_stage, status, data)

        try:
            # Run progressive analysis
            results = pipeline.run_pipeline_progressive(
                image_to_analyze,
                callback=stage_callback
            )

            logger.info(
                f"Progressive analysis completed. Results keys: "
                f"{list(results.keys()) if results else 'None'}"
            )

            # Clear status messages after completion
            for status_slot in stage_statuses.values():
                status_slot.empty()

            logger.info("Progressive analysis results displayed successfully")

        except Exception as e:
            # Clear any running status messages on error
            for status_slot in stage_statuses.values():
                status_slot.empty()

            error_msg = f"Analysis failed: {str(e)}"
            # logger.exception records the message together with the traceback.
            logger.exception(error_msg)
            st.error(error_msg)
            st.code(traceback.format_exc())

        st.markdown("\n", unsafe_allow_html=True)

        # Cleanup: the uploaded copy is no longer needed once analysis ran.
        _remove_temp_file(st.session_state.pop(TEMP_PATH_STATE_KEY, None))

else:
    st.markdown("\n", unsafe_allow_html=True)
    st.markdown("**Upload an image or select a sample to begin analysis**")
    st.markdown("\n", unsafe_allow_html=True)