pskeshu's picture
πŸ› Fix Stage 3 feature analysis display issue
0c8c84a
#!/usr/bin/env python3
"""
Anton Microscopy Analysis - Clean UI
"""
import streamlit as st
import os
import sys
from pathlib import Path
import numpy as np
from PIL import Image
import random
import logging
import traceback
# Setup logging: INFO-level records with a timestamp and level prefix.
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(levelname)s - %(message)s',
)
logger = logging.getLogger(__name__)

# Setup page: browser tab title/icon and a centered layout.
st.set_page_config(
    page_title="Anton Microscopy Analysis",
    # Microscope emoji; the literal had been corrupted by a wrong-codepage decode.
    page_icon="🔬",
    layout="centered",
)
# Clean, modern CSS
# The stylesheet is kept in a named constant and injected as raw HTML below.
# It hides Streamlit's own menu/footer/header and defines the classes used by
# the rest of the script (main-header, section-header, upload-section,
# results-container, image-container) plus button and spacing overrides.
_PAGE_CSS = """
<style>
/* Hide Streamlit branding */
#MainMenu {visibility: hidden;}
footer {visibility: hidden;}
header {visibility: hidden;}
/* Clean typography */
.main-header {
font-size: 2.5rem;
font-weight: 300;
color: #1f2937;
text-align: center;
margin-bottom: 3rem;
letter-spacing: -0.02em;
}
.section-header {
font-size: 1.1rem;
font-weight: 500;
color: #374151;
margin-bottom: 1rem;
}
/* Clean upload area */
.upload-section {
background: #f8fafc;
border: 2px dashed #e2e8f0;
border-radius: 12px;
padding: 2rem;
text-align: center;
margin-bottom: 2rem;
}
/* Primary button styling */
.stButton > button {
background: linear-gradient(135deg, #667eea 0%, #764ba2 100%);
color: white;
border: none;
border-radius: 8px;
padding: 0.75rem 2rem;
font-weight: 500;
font-size: 1rem;
width: 100%;
transition: all 0.2s ease;
box-shadow: 0 4px 6px -1px rgba(0, 0, 0, 0.1);
}
.stButton > button:hover {
transform: translateY(-1px);
box-shadow: 0 8px 15px -3px rgba(0, 0, 0, 0.1);
}
/* Sample selection */
.sample-selector {
background: white;
border-radius: 8px;
padding: 1rem;
box-shadow: 0 1px 3px rgba(0, 0, 0, 0.1);
margin-bottom: 1rem;
}
/* Results styling */
.results-container {
background: white;
border-radius: 12px;
padding: 1.5rem;
box-shadow: 0 4px 6px -1px rgba(0, 0, 0, 0.1);
margin-top: 2rem;
}
/* Image display */
.image-container {
border-radius: 8px;
overflow: hidden;
box-shadow: 0 4px 6px -1px rgba(0, 0, 0, 0.1);
}
/* Clean spacing */
.block-container {
padding-top: 2rem;
max-width: 1000px;
}
</style>
"""
st.markdown(_PAGE_CSS, unsafe_allow_html=True)
# Add Anton to path: make the directory containing this script importable.
sys.path.append(str(Path(__file__).parent))

# Import Anton components. A failed import is recorded instead of raised so
# the error can be shown in the page below rather than as a raw traceback.
import_error = None
try:
    from anton.core.pipeline import AnalysisPipeline
    from anton.utils.image_io import ImageLoader
    # CMPO mapping removed for simplicity
except ImportError as e:
    anton_available = False
    import_error = str(e)
else:
    anton_available = True

# Main header (microscope emoji restored from a mis-decoded literal).
st.markdown('<h1 class="main-header">🔬 Anton Microscopy Analysis</h1>', unsafe_allow_html=True)

# Without the Anton package nothing below can work: report and halt the run.
if not anton_available:
    st.error(f"System Error: {import_error}")
    st.stop()
# Image input section
st.markdown('<div class="section-header">Select Image</div>', unsafe_allow_html=True)

# Check for sample images: only files matching *.BMP in ./sample_images.
sample_images_path = Path("sample_images")
sample_images = sorted(sample_images_path.glob("*.BMP")) if sample_images_path.exists() else []
# File names are what the selectbox and the session state work with.
sample_names = [img.name for img in sample_images]

# Initialize session state: remember the chosen sample across reruns.
if 'selected_sample_image' not in st.session_state:
    st.session_state.selected_sample_image = sample_names[0] if sample_names else None

# Image source selection
col1, col2 = st.columns(2)
with col1:
    if sample_images:
        if st.button("🎲 Random Sample", use_container_width=True):
            st.session_state.selected_sample_image = random.choice(sample_images).name
            # Rerun so the selectbox below is rebuilt with the new default.
            st.rerun()
with col2:
    use_sample = st.toggle("Use Sample Images", value=bool(sample_images))

# Image selection
# current_image: pixel data to preview; image_to_analyze: path handed to the pipeline.
current_image = None
image_to_analyze = None

if use_sample and sample_images:
    # Sample image selection; fall back to the first entry when the
    # remembered name is no longer present in the folder.
    remembered = st.session_state.selected_sample_image
    default_index = sample_names.index(remembered) if remembered in sample_names else 0
    selected_image = st.selectbox(
        "Choose sample image:",
        sample_names,
        index=default_index,
        label_visibility="collapsed",
    )
    if selected_image != st.session_state.selected_sample_image:
        st.session_state.selected_sample_image = selected_image

    # Load sample image
    sample_image_path = sample_images_path / selected_image
    if sample_image_path.exists():
        try:
            loader = ImageLoader()
            current_image = loader.load(str(sample_image_path))
            image_to_analyze = str(sample_image_path)
        except Exception as e:
            st.error(f"Error loading image: {e}")
else:
    # File upload
    uploaded_file = st.file_uploader(
        "Upload microscopy image",
        type=['png', 'jpg', 'jpeg', 'tiff', 'bmp'],
        label_visibility="collapsed",
    )
    if uploaded_file:
        try:
            import tempfile

            # Persist the upload to disk because the loader and the pipeline
            # take a file path. Only the basename of the client-supplied name
            # is used so it cannot inject directory components into the path.
            safe_name = Path(uploaded_file.name).name
            with tempfile.NamedTemporaryFile(delete=False, suffix=f"_{safe_name}") as tmp_file:
                tmp_file.write(uploaded_file.getbuffer())
                temp_path = tmp_file.name

            try:
                loader = ImageLoader()
                current_image = loader.load(temp_path)
            except Exception:
                # Best-effort fallback to PIL; keep a trace of why the
                # project loader was not usable instead of swallowing it.
                logger.warning("ImageLoader failed for upload; falling back to PIL", exc_info=True)
                # Context manager closes the file handle once the pixels are copied.
                with Image.open(temp_path) as pil_img:
                    current_image = np.array(pil_img)
                # NOTE(review): source indentation was lost; the grayscale
                # conversion is assumed to belong to the PIL fallback only - confirm.
                # Three-channel images are averaged down to a single uint8 channel.
                if len(current_image.shape) == 3 and current_image.shape[2] == 3:
                    current_image = np.mean(current_image, axis=2).astype(np.uint8)

            image_to_analyze = temp_path
        except Exception as e:
            st.error(f"Error loading image: {e}")
# Display image
if current_image is not None:
    st.markdown('<div class="image-container">', unsafe_allow_html=True)
    st.image(current_image, use_container_width=True)
    st.markdown('</div>', unsafe_allow_html=True)

    # Analysis button
    st.markdown("---")
    analyze_btn = st.button("🚀 Analyze Image", type="primary", use_container_width=True)

    # Run analysis
    if analyze_btn:
        logger.info("Analysis button pressed")
        st.info("Starting analysis...")

        # Check API keys: Google key takes precedence over Anthropic; with
        # neither present the "mock" provider is used.
        vlm_provider = "mock"
        if os.getenv('GOOGLE_API_KEY'):
            vlm_provider = "gemini"
            logger.info("Using Gemini VLM provider")
        elif os.getenv('ANTHROPIC_API_KEY'):
            vlm_provider = "claude"
            logger.info("Using Claude VLM provider")
        else:
            logger.info("Using mock VLM provider (no API keys found)")

        # Configure pipeline: bundled samples get a fixed experiment context,
        # uploads get a generic one.
        if use_sample and sample_images:
            biological_context = {
                "experiment_type": "protein_translocation",
                "cell_line": "U2OS_osteosarcoma",
                "protein": "FKHR-GFP",
                "readout": "nuclear_vs_cytoplasmic_localization",
            }
            logger.info("Using sample image context: protein translocation")
        else:
            biological_context = {
                "experiment_type": "general_microscopy",
                "readout": "cellular_morphology_and_phenotypes",
            }
            logger.info("Using general microscopy context")

        config = {
            "vlm_provider": vlm_provider,
            "biological_context": biological_context,
        }
        logger.info(f"Pipeline config: {config}")

        # Run analysis with progressive display
        logger.info(f"Creating pipeline with image: {image_to_analyze}")
        pipeline = AnalysisPipeline(config)
        logger.info("Pipeline created, starting progressive analysis...")

        # Create containers for each stage
        st.markdown('<div class="results-container">', unsafe_allow_html=True)
        st.markdown("### Analysis Results")

        stages = [
            ("Global Analysis", "stage_1_global"),
            ("Object Detection", "stage_2_objects"),
            ("Feature Analysis", "stage_3_features"),
            ("Population Analysis", "stage_4_population"),
        ]
        # Display name lookup by stage key.
        stage_names = {key: name for name, key in stages}

        # Initialize containers: one placeholder for the result and one for
        # the transient status line, per stage, in display order.
        stage_containers = {}
        stage_statuses = {}
        for _, stage_key in stages:
            stage_containers[stage_key] = st.empty()
            stage_statuses[stage_key] = st.empty()

        def update_stage_display(stage_key, status, data):
            """Update the display for a specific stage.

            Args:
                stage_key: One of the keys listed in ``stages``. Unknown keys
                    are logged and ignored so that an unexpected callback
                    cannot abort the running analysis.
                status: ``"running"`` or ``"completed"``; any other value is
                    ignored.
                data: For ``"running"``, a message shown in the status line.
                    For ``"completed"``, either a dict of stage results (the
                    first non-empty of ``description``,
                    ``segmentation_guidance``, ``population_summary`` or
                    ``feature_descriptions`` is shown) or any other object,
                    which is shown via ``str()``.
            """
            stage_name = stage_names.get(stage_key)
            if stage_name is None:
                logger.warning(f"Ignoring update for unknown stage: {stage_key}")
                return

            if status == "running":
                stage_statuses[stage_key].info(f"⏳ {stage_name}: {data}")
                stage_containers[stage_key].expander(f"📋 {stage_name}", expanded=False).write("Processing...")
            elif status == "completed":
                stage_statuses[stage_key].success(f"✅ {stage_name}: Completed")

                # Extract content from stage data
                if isinstance(data, dict):
                    logger.info(f"Stage {stage_key} data keys: {list(data.keys())}")

                    # Handle different stage result formats: take the first
                    # text-like field that is present and non-empty.
                    content = None
                    for field in ('description', 'segmentation_guidance', 'population_summary'):
                        content = data.get(field)
                        if content:
                            break

                    if not content:
                        # Stage 3 feature_descriptions is a list
                        feature_descs = data.get('feature_descriptions', [])
                        logger.info(f"Stage 3 feature_descriptions: {feature_descs}")
                        if feature_descs and isinstance(feature_descs, list):
                            content = '\n'.join(str(desc) for desc in feature_descs)
                        else:
                            content = 'Analysis completed'

                    # Slicing and startswith below need text; a stage may
                    # hand back a non-string value for these fields.
                    if not isinstance(content, str):
                        content = str(content)

                    logger.info(f"Stage {stage_key} final content: {content[:100]}...")

                    # Drop Markdown code-fence lines when the text is fenced.
                    if content and content.startswith('```'):
                        lines = content.split('\n')
                        content = '\n'.join([line for line in lines if not line.strip().startswith('```')])

                    # Display in expandable container, truncated to 1000 characters.
                    with stage_containers[stage_key].expander(f"📋 {stage_name}", expanded=True):
                        st.write(content[:1000] + "..." if len(content) > 1000 else content)
                else:
                    stage_containers[stage_key].expander(f"📋 {stage_name}", expanded=True).write(str(data))

        try:
            # Create callback to handle stage updates
            def stage_callback(stage, status, data):
                """Translate the pipeline's short stage id and refresh that stage's display."""
                # Map stage names: stage_1 -> stage_1_global
                stage_mapping = {
                    "stage_1": "stage_1_global",
                    "stage_2": "stage_2_objects",
                    "stage_3": "stage_3_features",
                    "stage_4": "stage_4_population",
                }
                mapped_stage = stage_mapping.get(stage, stage)
                update_stage_display(mapped_stage, status, data)

            # Run progressive analysis
            results = pipeline.run_pipeline_progressive(
                image_to_analyze,
                callback=stage_callback,
            )
            logger.info(f"Progressive analysis completed. Results keys: {list(results.keys()) if results else 'None'}")
            logger.info("Progressive analysis results displayed successfully")
        except Exception as e:
            error_msg = f"Analysis failed: {str(e)}"
            full_traceback = traceback.format_exc()
            logger.error(error_msg)
            logger.error(f"Full traceback: {full_traceback}")
            st.error(error_msg)
            st.code(full_traceback)
        finally:
            # Clear status messages whether the run finished or failed.
            for status_slot in stage_statuses.values():
                status_slot.empty()

        st.markdown('</div>', unsafe_allow_html=True)
else:
    st.markdown('<div class="upload-section">', unsafe_allow_html=True)
    st.markdown("**Upload an image or select a sample to begin analysis**")
    st.markdown('</div>', unsafe_allow_html=True)

# Cleanup: temp_path is only bound when this run wrote an upload to disk.
# Done at the end of every run (not only after an analysis) so that reruns
# with an uploaded file do not accumulate temporary files.
if 'temp_path' in locals() and os.path.exists(temp_path):
    try:
        os.remove(temp_path)
    except OSError:
        # Best effort: a leftover temp file must not break the page.
        logger.warning(f"Could not remove temporary file: {temp_path}")