Spaces:
Sleeping
Sleeping
| import altair as alt | |
| import numpy as np | |
| import pandas as pd | |
| import streamlit as st | |
| """ | |
| # Welcome to Streamlit! | |
| Edit `/streamlit_app.py` to customize this app to your heart's desire :heart:. | |
| If you have any questions, checkout our [documentation](https://docs.streamlit.io) and [community | |
| forums](https://discuss.streamlit.io). | |
| In the meantime, below is an example of what you can do with just a few lines of code: | |
| """ | |
| """ | |
| Streamlit Web Application for Mango Disease Detection | |
| Supports image upload, batch processing, and real-time webcam detection | |
| """ | |
| import streamlit as st | |
| import os | |
| import sys | |
| import tempfile | |
| import zipfile | |
| import shutil | |
| from datetime import datetime | |
| import cv2 | |
| import numpy as np | |
| import time | |
| import threading | |
| from queue import Queue | |
| import base64 | |
| from PIL import Image | |
| import io | |
# Configure the Streamlit page before emitting any other Streamlit element.
# Streamlit documents set_page_config as having to be the first Streamlit
# command on a page; the import-failure st.error() below used to run before
# it, which can raise a StreamlitAPIException on releases enforcing that rule.
# NOTE(review): bare string literals earlier in this file may also be rendered
# by Streamlit "magic" before this call — confirm and remove them if so.
st.set_page_config(
    page_title="Mango Disease Detection",
    page_icon="🥭",
    layout="wide",
    initial_sidebar_state="expanded"
)

# Import the semantic detection system. ANALYZER_AVAILABLE records whether the
# import succeeded so the rest of the app can degrade gracefully.
try:
    from src.semantic_disease_analyzer import SemanticDiseaseAnalyzer
    ANALYZER_AVAILABLE = True
except ImportError:
    ANALYZER_AVAILABLE = False
    st.error("semantic_disease_analyzer module not found. Please ensure it's in your Python path.")
class StreamlitMangoDetector:
    """Streamlit-facing wrapper around the semantic disease analyzer.

    The analyzer instance is created once and cached in ``st.session_state``
    so that Streamlit reruns reuse it. ``self.analyzer`` is ``None`` when the
    analyzer module could not be imported or its construction failed.
    """

    # Verbs that mark an ontology inference as an actionable recommendation.
    _TREATMENT_KEYWORDS = ('apply', 'improve', 'remove', 'use', 'avoid', 'reduce')

    # Temp-file deletion is retried a few times; the original code noted that
    # the file may still be locked briefly on Windows.
    _CLEANUP_ATTEMPTS = 3
    _CLEANUP_RETRY_DELAY = 0.1  # seconds between deletion attempts

    # (exclusive upper severity bound in percent, grade, recommendation)
    _QUALITY_BANDS = (
        (2, "Premium Quality", "Suitable for premium market sale"),
        (8, "Good Quality", "Monitor for disease progression"),
        (20, "Fair Quality", "Consider treatment or reduced price sale"),
        (40, "Poor Quality", "Not suitable for fresh market, consider processing"),
    )
    _REJECT_BAND = ("Reject", "Discard to prevent contamination")

    def __init__(self):
        """Fetch the cached analyzer, creating it on first use."""
        if ANALYZER_AVAILABLE:
            if 'analyzer' not in st.session_state:
                with st.spinner("Initializing semantic disease detection system..."):
                    try:
                        st.session_state.analyzer = SemanticDiseaseAnalyzer()
                        st.success("System ready for inference!")
                    except Exception as e:
                        st.error(f"Error initializing system: {e}")
                        st.session_state.analyzer = None
            self.analyzer = st.session_state.analyzer
        else:
            self.analyzer = None

    def detect_diseases_image(self, image_array, filename="uploaded_image"):
        """Run disease detection on an image array.

        The array is written to a temporary JPEG because the analyzer is
        called with a file path. The temporary file is always removed, whether
        the analysis succeeds or fails.

        Args:
            image_array: NumPy array holding the image pixels.
            filename: Original file name. Currently unused; kept for
                interface compatibility with existing callers.

        Returns:
            Whatever ``analyze_image_semantically`` returns, or ``None`` when
            no analyzer is available or any step fails (the error is shown
            via ``st.error``).
        """
        if not self.analyzer:
            return None

        temp_path = None
        try:
            image = self._to_jpeg_compatible(self._array_to_image(image_array))

            # mkstemp yields a unique name; close the descriptor right away so
            # no handle is held open while PIL and the analyzer use the path.
            fd, temp_path = tempfile.mkstemp(prefix="temp_mango_", suffix=".jpg")
            os.close(fd)

            image.save(temp_path, 'JPEG', quality=95)
            image.close()

            return self.analyzer.analyze_image_semantically(
                temp_path, save_results=False
            )
        except Exception as e:
            st.error(f"Detection error: {e}")
            return None
        finally:
            if temp_path is not None:
                self._remove_temp_file(temp_path)

    @staticmethod
    def _array_to_image(image_array):
        """Convert a NumPy array to a PIL image (non-3-D input is read as 'L')."""
        if len(image_array.shape) == 3:
            return Image.fromarray(image_array)
        return Image.fromarray(image_array, mode='L')

    @staticmethod
    def _to_jpeg_compatible(image):
        """Return *image* in a mode JPEG can store ('RGB' or 'L').

        Images with transparency are flattened onto a white background using
        their alpha channel as the paste mask.
        """
        if image.mode == 'P':
            image = image.convert('RGBA')
        if image.mode in ('RGBA', 'LA'):
            background = Image.new('RGB', image.size, (255, 255, 255))
            background.paste(image, mask=image.split()[-1])
            return background
        if image.mode not in ('RGB', 'L'):
            return image.convert('RGB')
        return image

    def _remove_temp_file(self, path):
        """Delete *path*, retrying briefly; warn if it cannot be removed."""
        for attempt in range(self._CLEANUP_ATTEMPTS):
            try:
                os.remove(path)
                return
            except FileNotFoundError:
                return
            except OSError:
                if attempt == self._CLEANUP_ATTEMPTS - 1:
                    st.warning(f"Could not delete temporary file: {path}")
                else:
                    time.sleep(self._CLEANUP_RETRY_DELAY)

    @classmethod
    def _is_treatment(cls, inference):
        """Return True if *inference* contains a word starting with a treatment verb.

        Matching is done per word rather than per substring so that, for
        example, "caused" or "because" no longer match the keyword "use".
        """
        if not isinstance(inference, str):
            return False
        letters_only = ''.join(ch if ch.isalpha() else ' ' for ch in inference.lower())
        return any(word.startswith(cls._TREATMENT_KEYWORDS) for word in letters_only.split())

    @staticmethod
    def _disease_label(disease):
        """Return a display label for one entry of the detected-diseases list."""
        if isinstance(disease, dict):
            return disease.get('label', 'Unknown')
        return str(disease)

    def format_results_for_display(self, results):
        """Format detection results as Markdown (with inline HTML) for Streamlit.

        Args:
            results: Result dictionary from the analyzer. Keys read here:
                ``disease_level``, ``severity_percentage``,
                ``num_diseased_regions``, ``semantic_info`` and
                ``ontology_inferences``; all are optional.

        Returns:
            The formatted Markdown string, or ``"No results available"`` when
            *results* is empty or ``None``.
        """
        if not results:
            return "No results available"

        # Basic detection info
        disease_level = results.get('disease_level', 'Unknown')
        severity = results.get('severity_percentage', 0)
        num_regions = results.get('num_diseased_regions', 0)

        status_colors = {
            'Healthy': 'green',
            'Early Disease': 'orange',
            'Moderate Disease': 'red',
            'Severe Disease': 'red',
            'Critical Disease': 'darkred'
        }
        status_color = status_colors.get(disease_level, 'gray')

        parts = [
            "\n### Detection Results\n",
            f'**Status:** <span style="color: {status_color}; font-weight: bold;">{disease_level}</span>\n',
            f"**Severity:** {severity:.2f}%\n",
            f"**Diseased Regions:** {num_regions}\n",
        ]

        # Semantic analysis, when the analyzer provided it
        semantic_info = results.get('semantic_info', {})
        if semantic_info:
            diseases = semantic_info.get('diseases', [])
            if diseases:
                parts.append("\n**Detected Diseases:**\n")
                parts.extend(f"- {self._disease_label(d)}\n" for d in diseases)

            economic_impact = semantic_info.get('economic_impact')
            if economic_impact:
                marketability = economic_impact.get('marketability_score', 0)
                parts.append(f"\n**Marketability Score:** {marketability:.0f}%")

        # Treatment recommendations: inferences phrased as actions
        inferences = results.get('ontology_inferences') or []
        treatments = [inf for inf in inferences if self._is_treatment(inf)]
        if treatments:
            parts.append("\n\n**Treatment Recommendations:**\n")
            parts.extend(f"- {treatment}\n" for treatment in treatments[:3])  # top 3

        # Quality assessment from the severity percentage
        quality, recommendation = self._REJECT_BAND
        for upper_bound, grade, advice in self._QUALITY_BANDS:
            if severity < upper_bound:
                quality, recommendation = grade, advice
                break

        parts.append(f"\n**Quality Grade:** {quality}\n")
        parts.append(f"**Recommendation:** {recommendation}")
        return "".join(parts)
def main():
    """Render the app shell and hand off to the page chosen in the sidebar.

    Stops the script early (via ``st.stop``) when the analyzer module is
    missing or the detector could not be initialized.
    """
    st.title("Mango Disease Detection System")
    st.markdown("### AI-Powered Semantic Disease Analysis")

    if not ANALYZER_AVAILABLE:
        st.error("Cannot proceed without the semantic_disease_analyzer module.")
        st.stop()

    detector = StreamlitMangoDetector()
    if not detector.analyzer:
        st.error("Failed to initialize the detection system.")
        st.stop()

    # Page label -> renderer; the dict order is the order shown in the selectbox.
    pages = {
        "Single Image": lambda: single_image_mode(detector),
        "Batch Processing": lambda: batch_processing_mode(detector),
        "Webcam Detection": lambda: webcam_mode(detector),
        "About": lambda: about_page(),
    }

    st.sidebar.title("Detection Mode")
    mode = st.sidebar.selectbox("Choose detection mode:", list(pages))

    render_page = pages.get(mode)
    if render_page is not None:
        render_page()
def single_image_mode(detector):
    """Render the single-image page: upload on the left, results on the right.

    The latest successful analysis is kept in
    ``st.session_state.current_results`` so it survives Streamlit reruns. It
    is cleared when an analysis fails, so stale results from an earlier image
    are not shown next to the failure message.

    Args:
        detector: Object providing ``detect_diseases_image`` and
            ``format_results_for_display``.
    """
    st.header("Single Image Detection")
    col1, col2 = st.columns([1, 1])

    with col1:
        st.subheader("Upload Image")
        uploaded_file = st.file_uploader(
            "Choose a mango image...",
            type=['jpg', 'jpeg', 'png', 'bmp', 'tiff', 'webp'],
            help="Upload an image of a mango for disease detection"
        )

        if uploaded_file is not None:
            try:
                # Decoding happens here, so corrupt or truncated uploads are
                # reported as an error instead of crashing the page.
                image = Image.open(uploaded_file)
                image_array = np.array(image)
            except OSError as e:
                st.error(f"Could not read the uploaded image: {e}")
            else:
                st.image(image, caption="Uploaded Image", use_column_width=True)

                if st.button("Analyze Disease", type="primary"):
                    with st.spinner("Analyzing image..."):
                        results = detector.detect_diseases_image(image_array, uploaded_file.name)
                        if results:
                            st.session_state.current_results = results
                            st.success("Analysis complete!")
                        else:
                            # Drop results of any previous image so the right
                            # column does not contradict this failure.
                            st.session_state.current_results = None
                            st.error("Analysis failed!")

    with col2:
        st.subheader("Detection Results")
        if 'current_results' in st.session_state and st.session_state.current_results:
            results = st.session_state.current_results

            # Display processed image if available
            if 'output_image' in results:
                output_image = results['output_image']
                st.image(output_image, caption="Detection Results", use_column_width=True)

                # NOTE(review): cv2.imencode treats 3-channel input as BGR; if
                # output_image is RGB the downloaded JPEG will have red and
                # blue swapped relative to the preview — confirm channel order.
                is_success, im_buf_arr = cv2.imencode(".jpg", output_image)
                if is_success:
                    byte_im = im_buf_arr.tobytes()
                    st.download_button(
                        label="Download Result",
                        data=byte_im,
                        file_name=f"detection_result_{datetime.now().strftime('%Y%m%d_%H%M%S')}.jpg",
                        mime="image/jpeg"
                    )

            # Display formatted results (contains inline HTML for the status colour)
            formatted_results = detector.format_results_for_display(results)
            st.markdown(formatted_results, unsafe_allow_html=True)

            with st.expander("View Raw Results"):
                st.json(results)
        else:
            st.info("Upload an image and click 'Analyze Disease' to see results here.")
def batch_processing_mode(detector):
    """Render the batch page: a ZIP uploader plus a button that starts processing.

    Args:
        detector: Detector passed through to ``process_batch``.
    """
    st.header("Batch Processing")
    st.info("Upload multiple images in a ZIP file for batch processing.")

    archive = st.file_uploader(
        "Upload ZIP file containing images:",
        type=['zip'],
        help="ZIP file should contain .jpg, .jpeg, .png, .bmp, .tiff, or .webp images"
    )

    # The button is only offered once an archive has been uploaded.
    if archive is None:
        return

    start_requested = st.button("Process Batch", type="primary")
    if start_requested:
        process_batch(detector, archive)
# File extensions (lower-case) treated as images inside an uploaded archive.
_BATCH_IMAGE_EXTENSIONS = ('.jpg', '.jpeg', '.png', '.bmp', '.tiff', '.webp')


def _find_batch_images(root_dir):
    """Return the sorted paths of image files below *root_dir*.

    macOS archive metadata (the ``__MACOSX`` folder and ``._*`` files) is
    skipped: those entries carry image extensions but are not images, and
    would otherwise surface as per-file processing warnings.
    """
    found = []
    for current_dir, subdirs, names in os.walk(root_dir):
        subdirs[:] = [d for d in subdirs if d != '__MACOSX']  # prune in place
        for name in names:
            if name.startswith('._'):
                continue
            if name.lower().endswith(_BATCH_IMAGE_EXTENSIONS):
                found.append(os.path.join(current_dir, name))
    return sorted(found)


def process_batch(detector, uploaded_zip):
    """Extract an uploaded ZIP archive, analyze every image, and show a summary.

    Per-image failures are reported as warnings and do not abort the batch;
    any other failure (for example an invalid archive) is reported once via
    ``st.error``.

    Args:
        detector: Object providing ``detect_diseases_image``.
        uploaded_zip: File-like object holding the ZIP archive.
    """
    try:
        with tempfile.TemporaryDirectory() as temp_dir:
            # ZipFile accepts a file-like object, so the archive is read
            # straight from the upload without writing a copy to disk first.
            with zipfile.ZipFile(uploaded_zip) as zip_ref:
                zip_ref.extractall(temp_dir)

            image_files = _find_batch_images(temp_dir)
            if not image_files:
                st.error("No valid image files found in the ZIP archive.")
                return

            st.info(f"Found {len(image_files)} images to process.")

            results_list = []
            progress_bar = st.progress(0)
            status_text = st.empty()

            for i, image_path in enumerate(image_files):
                image_name = os.path.basename(image_path)
                status_text.text(f"Processing {image_name}...")
                try:
                    with Image.open(image_path) as img:
                        image_array = np.array(img)
                    results = detector.detect_diseases_image(image_array, image_name)
                    if results:
                        results['filename'] = image_name
                        results_list.append(results)
                    else:
                        st.warning(f"Failed to process: {image_name}")
                except Exception as e:
                    st.warning(f"Error processing {image_name}: {str(e)}")
                progress_bar.progress((i + 1) / len(image_files))

            status_text.text("Batch processing complete!")
            display_batch_results(results_list)
    except Exception as e:
        st.error(f"Error processing batch: {e}")
def display_batch_results(results_list):
    """Show summary metrics, a per-image table, and a CSV download for a batch.

    Args:
        results_list: Result dictionaries, one per successfully analyzed
            image. Keys read here: ``filename``, ``disease_level``,
            ``severity_percentage`` and ``num_diseased_regions``.
    """
    st.subheader("Batch Results Summary")
    if not results_list:
        st.warning("No successful detections in batch.")
        return

    # Summary statistics
    total = len(results_list)
    healthy_count = sum(1 for r in results_list if r.get('disease_level') == 'Healthy')
    diseased_count = total - healthy_count
    avg_severity = np.mean([r.get('severity_percentage', 0) for r in results_list])

    col1, col2, col3, col4 = st.columns(4)
    with col1:
        st.metric("Total Images", total)
    with col2:
        st.metric("Healthy", healthy_count, delta=f"{healthy_count/total*100:.1f}%")
    with col3:
        st.metric("Diseased", diseased_count, delta=f"{diseased_count/total*100:.1f}%")
    with col4:
        st.metric("Avg Severity", f"{avg_severity:.1f}%")

    # Detailed results table
    st.subheader("Detailed Results")
    table_data = [
        {
            'Filename': result.get('filename', 'Unknown'),
            'Status': result.get('disease_level', 'Unknown'),
            'Severity (%)': f"{result.get('severity_percentage', 0):.1f}",
            'Diseased Regions': result.get('num_diseased_regions', 0),
            'Quality': get_quality_grade(result.get('severity_percentage', 0)),
        }
        for result in results_list
    ]
    st.dataframe(table_data, use_container_width=True)

    # Offer the CSV directly. The download button used to sit behind a regular
    # st.button; clicking that triggers a rerun in which the enclosing
    # "Process Batch" button is no longer pressed, so this function was not
    # reached again and the download button never became usable.
    import pandas as pd
    csv = pd.DataFrame(table_data).to_csv(index=False)
    st.download_button(
        label="Download CSV",
        data=csv,
        file_name=f"batch_results_{datetime.now().strftime('%Y%m%d_%H%M%S')}.csv",
        mime="text/csv"
    )
def get_quality_grade(severity):
    """Map a severity percentage to a short quality-grade label.

    Bands (upper bound exclusive): below 2 "Premium", below 8 "Good",
    below 20 "Fair", below 40 "Poor"; everything else is "Reject".
    """
    grade_bands = (
        (2, "Premium"),
        (8, "Good"),
        (20, "Fair"),
        (40, "Poor"),
    )
    for upper_bound, grade in grade_bands:
        if severity < upper_bound:
            return grade
    return "Reject"
def webcam_mode(detector):
    """Render the webcam page, which currently shows placeholders only.

    No camera is opened here: the page displays informational messages, a
    code snippet, two settings widgets whose values are not used, and a
    static placeholder image.

    Args:
        detector: Accepted for parity with the other page functions; not used.
    """
    local_dev_snippet = (
        "\n"
        "# For local development, you could use:\n"
        "import cv2\n"
        "cap = cv2.VideoCapture(0)\n"
        "# This would need proper WebRTC integration\n"
        "# for Streamlit deployment\n"
    )
    placeholder_feed_url = (
        "https://via.placeholder.com/640x480/cccccc/666666?text=Webcam+Feed+Placeholder"
    )

    st.header("Real-time Webcam Detection")
    st.warning("Webcam detection requires additional setup and may not work in all deployment environments.")

    controls_col, feed_col = st.columns([1, 1])

    with controls_col:
        st.subheader("Camera Controls")
        start_clicked = st.button("Start Webcam")
        if start_clicked:
            st.info("Webcam functionality would require additional WebRTC setup for Streamlit deployment.")
            st.code(local_dev_snippet)

        # Camera settings (values are not consumed yet)
        st.selectbox("Camera Quality", ["High (720p)", "Medium (480p)", "Low (360p)"])
        st.slider("Detection Frequency", 1, 30, 5, help="Analyze every Nth frame")

    with feed_col:
        st.subheader("Live Detection")
        st.info("Live webcam feed would appear here with real-time disease detection overlay.")
        # Stand-in for the live feed
        st.image(placeholder_feed_url, caption="Live Camera Feed")
def about_page():
    """Render the About page: static overview text plus three status indicators.

    The indicators show whether the analyzer module was imported, the current
    date and time, and whether an analyzer instance is held in session state.
    """
    overview_md = """
### System Overview
This AI-powered system uses computer vision and semantic analysis to detect diseases in mango fruits.
The system combines:
- **Computer Vision**: Deep learning models for image analysis
- **Semantic Reasoning**: Ontology-based knowledge inference
- **Real-time Processing**: Fast detection suitable for commercial use
### Detection Capabilities
The system can detect and classify:
- **Healthy mangoes**: No visible disease symptoms
- **Early disease**: Minor symptoms requiring monitoring
- **Moderate/Severe disease**: Clear symptoms requiring treatment
- **Critical disease**: Severe damage requiring disposal
### Analysis Features
- **Disease Classification**: Specific disease type identification
- **Severity Assessment**: Quantitative severity percentage
- **Economic Impact**: Marketability scoring
- **Treatment Recommendations**: AI-generated suggestions
- **Quality Grading**: Commercial quality assessment
### Usage Modes
1. **Single Image**: Upload individual images for analysis
2. **Batch Processing**: Process multiple images in ZIP format
3. **Real-time Detection**: Live webcam analysis (requires setup)
### Technical Details
- Built with Streamlit for web interface
- Semantic analysis using OWL-RL reasoning
- Computer vision with deep learning models
- Supports common image formats (JPG, PNG, BMP, TIFF)
### Usage Tips
- Use high-quality, well-lit images for best results
- Ensure mango is clearly visible in the frame
- Multiple angles can provide more comprehensive analysis
- Regular monitoring helps track disease progression
---
*For technical support or questions about the detection algorithms,
please refer to the system documentation.*
"""

    st.header("About Mango Disease Detection System")
    st.markdown(overview_md)

    st.subheader("System Status")
    analyzer_col, session_col, ready_col = st.columns(3)

    with analyzer_col:
        if not ANALYZER_AVAILABLE:
            st.error("Analyzer Unavailable")
        else:
            st.success("Analyzer Available")

    with session_col:
        timestamp = datetime.now().strftime('%Y-%m-%d %H:%M')
        st.info(f"Session: {timestamp}")

    with ready_col:
        analyzer_ready = 'analyzer' in st.session_state and st.session_state.analyzer
        if not analyzer_ready:
            st.warning("System Not Ready")
        else:
            st.success("System Ready")
# Entry point: build the UI only when this file is executed as the main
# script, not when it is imported as a module.
if __name__ == "__main__":
    main()