#!/usr/bin/env python3
"""
Anton Microscopy Analysis - Clean UI
"""
import streamlit as st
import os
import sys
from pathlib import Path
import numpy as np
from PIL import Image
import random
import logging
import traceback
# Setup logging
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s - %(levelname)s - %(message)s'
)
logger = logging.getLogger(__name__)
# Setup page
st.set_page_config(
page_title="Anton Microscopy Analysis",
page_icon="🔬",
layout="centered"
)
# Clean, modern CSS
st.markdown("""
""", unsafe_allow_html=True)
# Add Anton to path
sys.path.append(str(Path(__file__).parent))
# Import Anton components
try:
from anton.core.pipeline import AnalysisPipeline
from anton.utils.image_io import ImageLoader
# CMPO mapping removed for simplicity
anton_available = True
except ImportError as e:
anton_available = False
import_error = str(e)
# Main header
st.markdown('
🔬 Anton Microscopy Analysis
', unsafe_allow_html=True)
if not anton_available:
st.error(f"System Error: {import_error}")
st.stop()
# Image input section
st.markdown('', unsafe_allow_html=True)
# Check for sample images
sample_images_path = Path("sample_images")
sample_images = sorted(list(sample_images_path.glob("*.BMP"))) if sample_images_path.exists() else []
# Initialize session state
if 'selected_sample_image' not in st.session_state:
st.session_state.selected_sample_image = sample_images[0].name if sample_images else None
# Image source selection
col1, col2 = st.columns(2)
with col1:
if sample_images:
if st.button("🎲 Random Sample", use_container_width=True):
st.session_state.selected_sample_image = random.choice(sample_images).name
st.rerun()
with col2:
use_sample = st.toggle("Use Sample Images", value=bool(sample_images))
# Image selection
current_image = None
image_to_analyze = None
if use_sample and sample_images:
# Sample image selection
selected_image = st.selectbox(
"Choose sample image:",
[img.name for img in sample_images],
index=[img.name for img in sample_images].index(st.session_state.selected_sample_image) if st.session_state.selected_sample_image in [img.name for img in sample_images] else 0,
label_visibility="collapsed"
)
if selected_image != st.session_state.selected_sample_image:
st.session_state.selected_sample_image = selected_image
# Load sample image
sample_image_path = sample_images_path / selected_image
if sample_image_path.exists():
try:
loader = ImageLoader()
current_image = loader.load(str(sample_image_path))
image_to_analyze = str(sample_image_path)
except Exception as e:
st.error(f"Error loading image: {e}")
else:
# File upload
uploaded_file = st.file_uploader(
"Upload microscopy image",
type=['png', 'jpg', 'jpeg', 'tiff', 'bmp'],
label_visibility="collapsed"
)
if uploaded_file:
try:
import tempfile
with tempfile.NamedTemporaryFile(delete=False, suffix=f"_{uploaded_file.name}") as tmp_file:
tmp_file.write(uploaded_file.getbuffer())
temp_path = tmp_file.name
try:
loader = ImageLoader()
current_image = loader.load(temp_path)
except:
pil_img = Image.open(temp_path)
current_image = np.array(pil_img)
if len(current_image.shape) == 3 and current_image.shape[2] == 3:
current_image = np.mean(current_image, axis=2).astype(np.uint8)
image_to_analyze = temp_path
except Exception as e:
st.error(f"Error loading image: {e}")
# Display image
if current_image is not None:
st.markdown('', unsafe_allow_html=True)
st.image(current_image, use_container_width=True)
st.markdown('
', unsafe_allow_html=True)
# Analysis button
st.markdown("---")
analyze_btn = st.button("🚀 Analyze Image", type="primary", use_container_width=True)
# Run analysis
if analyze_btn:
logger.info("Analysis button pressed")
st.info("Starting analysis...")
# Check API keys
vlm_provider = "mock"
if os.getenv('GOOGLE_API_KEY'):
vlm_provider = "gemini"
logger.info("Using Gemini VLM provider")
elif os.getenv('ANTHROPIC_API_KEY'):
vlm_provider = "claude"
logger.info("Using Claude VLM provider")
else:
logger.info("Using mock VLM provider (no API keys found)")
# Configure pipeline
if use_sample and sample_images:
biological_context = {
"experiment_type": "protein_translocation",
"cell_line": "U2OS_osteosarcoma",
"protein": "FKHR-GFP",
"readout": "nuclear_vs_cytoplasmic_localization"
}
logger.info("Using sample image context: protein translocation")
else:
biological_context = {
"experiment_type": "general_microscopy",
"readout": "cellular_morphology_and_phenotypes"
}
logger.info("Using general microscopy context")
config = {
"vlm_provider": vlm_provider,
"biological_context": biological_context
}
logger.info(f"Pipeline config: {config}")
# Run analysis with progressive display
logger.info(f"Creating pipeline with image: {image_to_analyze}")
pipeline = AnalysisPipeline(config)
logger.info("Pipeline created, starting progressive analysis...")
# Create containers for each stage
st.markdown('', unsafe_allow_html=True)
st.markdown("### Analysis Results")
stage_containers = {}
stage_statuses = {}
stages = [
("Global Analysis", "stage_1_global"),
("Object Detection", "stage_2_objects"),
("Feature Analysis", "stage_3_features"),
("Population Analysis", "stage_4_population")
]
# Initialize containers
for stage_name, stage_key in stages:
stage_containers[stage_key] = st.empty()
stage_statuses[stage_key] = st.empty()
def update_stage_display(stage_key, status, data):
"""Update the display for a specific stage."""
stage_name = next(name for name, key in stages if key == stage_key)
if status == "running":
stage_statuses[stage_key].info(f"⏳ {stage_name}: {data}")
stage_containers[stage_key].expander(f"📋 {stage_name}", expanded=False).write("Processing...")
elif status == "completed":
stage_statuses[stage_key].success(f"✅ {stage_name}: Completed")
# Extract content from stage data
if isinstance(data, dict):
logger.info(f"Stage {stage_key} data keys: {list(data.keys())}")
# Handle different stage result formats
content = data.get('description')
if not content:
content = data.get('segmentation_guidance')
if not content:
content = data.get('population_summary')
if not content:
# Stage 3 feature_descriptions is a list
feature_descs = data.get('feature_descriptions', [])
logger.info(f"Stage 3 feature_descriptions: {feature_descs}")
if feature_descs and isinstance(feature_descs, list):
content = '\n'.join(str(desc) for desc in feature_descs)
else:
content = 'Analysis completed'
logger.info(f"Stage {stage_key} final content: {content[:100]}...")
if content and content.startswith('```'):
lines = content.split('\n')
content = '\n'.join([line for line in lines if not line.strip().startswith('```')])
# Display in expandable container
with stage_containers[stage_key].expander(f"📋 {stage_name}", expanded=True):
st.write(content[:1000] + "..." if len(content) > 1000 else content)
else:
stage_containers[stage_key].expander(f"📋 {stage_name}", expanded=True).write(str(data))
try:
# Create callback to handle stage updates
def stage_callback(stage, status, data):
# Map stage names: stage_1 -> stage_1_global
stage_mapping = {
"stage_1": "stage_1_global",
"stage_2": "stage_2_objects",
"stage_3": "stage_3_features",
"stage_4": "stage_4_population"
}
mapped_stage = stage_mapping.get(stage, stage)
update_stage_display(mapped_stage, status, data)
# Run progressive analysis
results = pipeline.run_pipeline_progressive(
image_to_analyze,
callback=stage_callback
)
logger.info(f"Progressive analysis completed. Results keys: {list(results.keys()) if results else 'None'}")
# Clear status messages after completion
for stage_key in stage_statuses:
stage_statuses[stage_key].empty()
logger.info("Progressive analysis results displayed successfully")
except Exception as e:
# Clear any running status messages on error
for stage_key in stage_statuses:
stage_statuses[stage_key].empty()
error_msg = f"Analysis failed: {str(e)}"
logger.error(error_msg)
logger.error(f"Full traceback: {traceback.format_exc()}")
st.error(error_msg)
st.code(traceback.format_exc())
st.markdown('
', unsafe_allow_html=True)
# Cleanup
if 'temp_path' in locals() and os.path.exists(temp_path):
try:
os.remove(temp_path)
except:
pass
else:
st.markdown('', unsafe_allow_html=True)
st.markdown("**Upload an image or select a sample to begin analysis**")
st.markdown('
', unsafe_allow_html=True)