Spaces:
Sleeping
Sleeping
| #!/usr/bin/env python3 | |
| """ | |
| Anton Microscopy Analysis - Clean UI | |
| """ | |
| import streamlit as st | |
| import os | |
| import sys | |
| from pathlib import Path | |
| import numpy as np | |
| from PIL import Image | |
| import random | |
| import logging | |
| import traceback | |
# Setup logging: timestamped INFO-level records via the root handler.
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(levelname)s - %(message)s'
)
logger = logging.getLogger(__name__)

# Setup page. The icon is the microscope emoji; the previous value was the
# same character mis-decoded ("π¬"), which is not a valid emoji.
st.set_page_config(
    page_title="Anton Microscopy Analysis",
    page_icon="🔬",
    layout="centered"
)
# Page-wide stylesheet. It hides the Streamlit menu/footer/header and defines
# the custom classes used by the markup emitted further down in this script.
_PAGE_CSS = """
<style>
/* Hide Streamlit branding */
#MainMenu {visibility: hidden;}
footer {visibility: hidden;}
header {visibility: hidden;}
/* Clean typography */
.main-header {
font-size: 2.5rem;
font-weight: 300;
color: #1f2937;
text-align: center;
margin-bottom: 3rem;
letter-spacing: -0.02em;
}
.section-header {
font-size: 1.1rem;
font-weight: 500;
color: #374151;
margin-bottom: 1rem;
}
/* Clean upload area */
.upload-section {
background: #f8fafc;
border: 2px dashed #e2e8f0;
border-radius: 12px;
padding: 2rem;
text-align: center;
margin-bottom: 2rem;
}
/* Primary button styling */
.stButton > button {
background: linear-gradient(135deg, #667eea 0%, #764ba2 100%);
color: white;
border: none;
border-radius: 8px;
padding: 0.75rem 2rem;
font-weight: 500;
font-size: 1rem;
width: 100%;
transition: all 0.2s ease;
box-shadow: 0 4px 6px -1px rgba(0, 0, 0, 0.1);
}
.stButton > button:hover {
transform: translateY(-1px);
box-shadow: 0 8px 15px -3px rgba(0, 0, 0, 0.1);
}
/* Sample selection */
.sample-selector {
background: white;
border-radius: 8px;
padding: 1rem;
box-shadow: 0 1px 3px rgba(0, 0, 0, 0.1);
margin-bottom: 1rem;
}
/* Results styling */
.results-container {
background: white;
border-radius: 12px;
padding: 1.5rem;
box-shadow: 0 4px 6px -1px rgba(0, 0, 0, 0.1);
margin-top: 2rem;
}
/* Image display */
.image-container {
border-radius: 8px;
overflow: hidden;
box-shadow: 0 4px 6px -1px rgba(0, 0, 0, 0.1);
}
/* Clean spacing */
.block-container {
padding-top: 2rem;
max-width: 1000px;
}
</style>
"""

# Inject the stylesheet as raw HTML.
st.markdown(_PAGE_CSS, unsafe_allow_html=True)
# Add this script's directory to the import path so the ``anton`` package can
# be resolved from it. Streamlit re-executes the script on every interaction
# inside one long-lived process, so the entry is added only once; an
# unconditional append would grow sys.path on each rerun.
app_dir = str(Path(__file__).parent)
if app_dir not in sys.path:
    sys.path.append(app_dir)

# Import Anton components. ``import_error`` is always bound so later code can
# read it without first checking ``anton_available``.
import_error = None
try:
    from anton.core.pipeline import AnalysisPipeline
    from anton.utils.image_io import ImageLoader
    # CMPO mapping removed for simplicity
    anton_available = True
except ImportError as e:
    anton_available = False
    import_error = str(e)
    logger.error("Anton components could not be imported: %s", import_error)

# Main header (microscope emoji restored from its mis-decoded form "π¬").
st.markdown('<h1 class="main-header">🔬 Anton Microscopy Analysis</h1>', unsafe_allow_html=True)

# Without the Anton package nothing below can work: report and halt this run.
if not anton_available:
    st.error(f"System Error: {import_error}")
    st.stop()
# Image input section
st.markdown('<div class="section-header">Select Image</div>', unsafe_allow_html=True)

# Check for sample images. The extension is compared case-insensitively so
# that "x.bmp" is found as well as "x.BMP" on case-sensitive filesystems.
sample_images_path = Path("sample_images")
if sample_images_path.is_dir():
    sample_images = sorted(
        candidate
        for candidate in sample_images_path.iterdir()
        if candidate.is_file() and candidate.suffix.lower() == ".bmp"
    )
else:
    sample_images = []
# Built once; used for the selectbox options and the default-index lookup.
sample_names = [img.name for img in sample_images]

# Initialize session state: remember the chosen sample across reruns.
if 'selected_sample_image' not in st.session_state:
    st.session_state.selected_sample_image = sample_names[0] if sample_names else None

# Image source selection
col1, col2 = st.columns(2)
with col1:
    # The button is only offered when there is something to pick from.
    # (Dice emoji restored from its mis-decoded form "π²".)
    if sample_images and st.button("🎲 Random Sample", use_container_width=True):
        st.session_state.selected_sample_image = random.choice(sample_images).name
        st.rerun()
with col2:
    use_sample = st.toggle("Use Sample Images", value=bool(sample_images))

# Image selection. Both names stay None when nothing could be loaded.
current_image = None      # pixel data handed to st.image
image_to_analyze = None   # filesystem path handed to the analysis pipeline

if use_sample and sample_images:
    # Sample image selection, pre-selecting the remembered sample if it
    # still exists and the first one otherwise.
    remembered = st.session_state.selected_sample_image
    default_index = sample_names.index(remembered) if remembered in sample_names else 0
    selected_image = st.selectbox(
        "Choose sample image:",
        sample_names,
        index=default_index,
        label_visibility="collapsed"
    )
    if selected_image != st.session_state.selected_sample_image:
        st.session_state.selected_sample_image = selected_image

    # Load sample image
    sample_image_path = sample_images_path / selected_image
    if sample_image_path.exists():
        try:
            loader = ImageLoader()
            current_image = loader.load(str(sample_image_path))
            image_to_analyze = str(sample_image_path)
        except Exception as e:
            logger.exception("Failed to load sample image %s", sample_image_path)
            st.error(f"Error loading image: {e}")
else:
    # File upload
    uploaded_file = st.file_uploader(
        "Upload microscopy image",
        type=['png', 'jpg', 'jpeg', 'tiff', 'bmp'],
        label_visibility="collapsed"
    )
    if uploaded_file:
        try:
            import tempfile

            # The upload is written to disk because the loader and the
            # pipeline are both given a path. Only the final path component
            # of the client-supplied name is used in the suffix, so a name
            # containing separators cannot break temp-file creation.
            upload_name = Path(uploaded_file.name).name
            with tempfile.NamedTemporaryFile(delete=False, suffix=f"_{upload_name}") as tmp_file:
                tmp_file.write(uploaded_file.getbuffer())
                temp_path = tmp_file.name

            try:
                loader = ImageLoader()
                current_image = loader.load(temp_path)
            except Exception:
                # Fall back to Pillow when the Anton loader rejects the file.
                logger.warning(
                    "ImageLoader could not read %s; falling back to PIL",
                    temp_path,
                    exc_info=True,
                )
                # np.array() copies the pixel data, so the PIL handle can be
                # closed as soon as the array exists.
                with Image.open(temp_path) as pil_img:
                    current_image = np.array(pil_img)
                # Collapse 3-channel images to 8-bit grayscale by averaging.
                # NOTE(review): applied to the PIL fallback only; the source's
                # indentation was lost, so confirm it was not meant to run
                # after a successful ImageLoader read as well.
                if current_image.ndim == 3 and current_image.shape[2] == 3:
                    current_image = np.mean(current_image, axis=2).astype(np.uint8)

            image_to_analyze = temp_path
        except Exception as e:
            logger.exception("Failed to load uploaded image")
            st.error(f"Error loading image: {e}")
def _extract_stage_content(data: dict) -> str:
    """Return the text to display for a completed stage's result dict.

    The first non-empty value among ``description``, ``segmentation_guidance``
    and ``population_summary`` is used. Failing that, a non-empty list under
    ``feature_descriptions`` is joined one item per line, and
    ``'Analysis completed'`` is the last resort.

    The chosen value is coerced to ``str`` (a stage may return a dict or list
    under these keys, which previously crashed on slicing). If it opens with a
    Markdown code fence, every fence line is removed.
    """
    content = (
        data.get('description')
        or data.get('segmentation_guidance')
        or data.get('population_summary')
    )
    if not content:
        feature_descs = data.get('feature_descriptions', [])
        logger.info("Stage feature_descriptions: %s", feature_descs)
        if feature_descs and isinstance(feature_descs, list):
            content = '\n'.join(str(desc) for desc in feature_descs)
        else:
            content = 'Analysis completed'

    if not isinstance(content, str):
        content = str(content)

    if content.startswith('```'):
        content = '\n'.join(
            line for line in content.split('\n')
            if not line.strip().startswith('```')
        )
    return content


# Display image
if current_image is not None:
    st.markdown('<div class="image-container">', unsafe_allow_html=True)
    st.image(current_image, use_container_width=True)
    st.markdown('</div>', unsafe_allow_html=True)

    # Analysis button
    st.markdown("---")
    # NOTE(review): the leading "π" here and in the expander labels below
    # looks like a mis-decoded emoji. The intended glyph cannot be recovered
    # from the remaining bytes, so these labels are left untouched.
    analyze_btn = st.button("π Analyze Image", type="primary", use_container_width=True)

    # Run analysis
    if analyze_btn:
        logger.info("Analysis button pressed")
        st.info("Starting analysis...")

        # Check API keys: Google first, then Anthropic, else the mock provider.
        if os.getenv('GOOGLE_API_KEY'):
            vlm_provider = "gemini"
            logger.info("Using Gemini VLM provider")
        elif os.getenv('ANTHROPIC_API_KEY'):
            vlm_provider = "claude"
            logger.info("Using Claude VLM provider")
        else:
            vlm_provider = "mock"
            logger.info("Using mock VLM provider (no API keys found)")

        # Configure pipeline. Sample images get a fixed, specific context;
        # uploads get a generic one.
        if use_sample and sample_images:
            biological_context = {
                "experiment_type": "protein_translocation",
                "cell_line": "U2OS_osteosarcoma",
                "protein": "FKHR-GFP",
                "readout": "nuclear_vs_cytoplasmic_localization"
            }
            logger.info("Using sample image context: protein translocation")
        else:
            biological_context = {
                "experiment_type": "general_microscopy",
                "readout": "cellular_morphology_and_phenotypes"
            }
            logger.info("Using general microscopy context")

        config = {
            "vlm_provider": vlm_provider,
            "biological_context": biological_context
        }
        logger.info("Pipeline config: %s", config)

        # Create containers for each stage
        st.markdown('<div class="results-container">', unsafe_allow_html=True)
        st.markdown("### Analysis Results")

        stages = [
            ("Global Analysis", "stage_1_global"),
            ("Object Detection", "stage_2_objects"),
            ("Feature Analysis", "stage_3_features"),
            ("Population Analysis", "stage_4_population")
        ]
        stage_names = {key: name for name, key in stages}

        # Initialize containers: one placeholder for the result expander and
        # one for the transient status line, in display order.
        stage_containers = {}
        stage_statuses = {}
        for _, stage_key in stages:
            stage_containers[stage_key] = st.empty()
            stage_statuses[stage_key] = st.empty()

        def update_stage_display(stage_key, status, data):
            """Update the display for a specific stage.

            ``status`` values "running" and "completed" are rendered; any
            other status is ignored. Updates for a stage key that has no
            placeholder are logged and skipped, so an unexpected stage name
            from the pipeline cannot abort the whole analysis.
            """
            stage_name = stage_names.get(stage_key)
            if stage_name is None:
                logger.warning(
                    "Ignoring update for unknown stage %r (status=%r)",
                    stage_key, status,
                )
                return

            status_slot = stage_statuses[stage_key]
            result_slot = stage_containers[stage_key]

            if status == "running":
                # Hourglass restored from its mis-decoded form "β³".
                status_slot.info(f"⏳ {stage_name}: {data}")
                result_slot.expander(f"π {stage_name}", expanded=False).write("Processing...")
            elif status == "completed":
                # Check mark restored from its mis-decoded form "β".
                status_slot.success(f"✅ {stage_name}: Completed")
                if isinstance(data, dict):
                    logger.info("Stage %s data keys: %s", stage_key, list(data.keys()))
                    content = _extract_stage_content(data)
                    logger.info("Stage %s final content: %s...", stage_key, content[:100])
                    # Display in expandable container, capped at 1000 chars.
                    shown = content[:1000] + "..." if len(content) > 1000 else content
                    with result_slot.expander(f"π {stage_name}", expanded=True):
                        st.write(shown)
                else:
                    result_slot.expander(f"π {stage_name}", expanded=True).write(str(data))

        # Map pipeline stage names to display keys: stage_1 -> stage_1_global
        stage_mapping = {
            "stage_1": "stage_1_global",
            "stage_2": "stage_2_objects",
            "stage_3": "stage_3_features",
            "stage_4": "stage_4_population"
        }

        def stage_callback(stage, status, data):
            """Forward a pipeline stage update to the matching UI slot."""
            update_stage_display(stage_mapping.get(stage, stage), status, data)

        try:
            # Build the pipeline inside the try so a construction failure is
            # reported through the same error path as a failed run.
            logger.info("Creating pipeline with image: %s", image_to_analyze)
            pipeline = AnalysisPipeline(config)
            logger.info("Pipeline created, starting progressive analysis...")

            # Run progressive analysis
            results = pipeline.run_pipeline_progressive(
                image_to_analyze,
                callback=stage_callback
            )
            logger.info(
                "Progressive analysis completed. Results keys: %s",
                list(results.keys()) if results else 'None',
            )

            # Clear status messages after completion
            for status_slot in stage_statuses.values():
                status_slot.empty()
            logger.info("Progressive analysis results displayed successfully")
        except Exception as e:
            # Clear any running status messages on error
            for status_slot in stage_statuses.values():
                status_slot.empty()
            error_msg = f"Analysis failed: {str(e)}"
            # logger.exception records the message together with the traceback.
            logger.exception(error_msg)
            st.error(error_msg)
            st.code(traceback.format_exc())

        st.markdown('</div>', unsafe_allow_html=True)
else:
    st.markdown('<div class="upload-section">', unsafe_allow_html=True)
    st.markdown("**Upload an image or select a sample to begin analysis**")
    st.markdown('</div>', unsafe_allow_html=True)

# Cleanup. ``temp_path`` is only bound when an upload was written to disk in
# this pass, hence the namespace lookup. The file is needed only within the
# pass that created it, so it is removed on every pass (image loaded or not,
# analysis requested or not) to keep temp files from accumulating.
uploaded_temp = globals().get("temp_path")
if uploaded_temp and os.path.exists(uploaded_temp):
    try:
        os.remove(uploaded_temp)
    except OSError as cleanup_error:
        # Best effort: a leftover temp file must not break the page.
        logger.warning(
            "Could not remove temporary upload %s: %s",
            uploaded_temp, cleanup_error,
        )