""" Risk Level Assessment for Crop Disease Detection Calculates risk levels based on prediction confidence and disease severity """ import json from pathlib import Path from datetime import datetime from typing import Dict, List, Tuple, Optional class RiskLevelCalculator: """Calculate risk levels for crop disease predictions""" def __init__(self, knowledge_base_path='knowledge_base/disease_info.json'): """ Initialize risk calculator Args: knowledge_base_path: Path to disease knowledge base """ self.knowledge_base_path = knowledge_base_path self.disease_info = self._load_disease_info() # Disease severity mapping (based on agricultural impact) self.disease_severity = { # Corn diseases 'Corn___Cercospora_leaf_spot_Gray_leaf_spot': 'medium', 'Corn___Common_rust': 'medium', 'Corn___Northern_Leaf_Blight': 'high', 'Corn___healthy': 'none', # Potato diseases 'Potato___Early_Blight': 'medium', 'Potato___Late_Blight': 'high', # Most destructive 'Potato___healthy': 'none', # Tomato diseases 'Tomato___Bacterial_spot': 'medium', 'Tomato___Early_blight': 'medium', 'Tomato___Late_blight': 'high', # Very destructive 'Tomato___Leaf_Mold': 'low', 'Tomato___Septoria_leaf_spot': 'medium', 'Tomato___Spider_mites_Two_spotted_spider_mite': 'medium', 'Tomato___Target_Spot': 'medium', 'Tomato___Tomato_mosaic_virus': 'high', # Viral, no cure 'Tomato___Tomato_Yellow_Leaf_Curl_Virus': 'high', # Viral, devastating 'Tomato___healthy': 'none' } # Risk level thresholds self.confidence_thresholds = { 'high': 0.8, 'medium': 0.5, 'low': 0.0 } def _load_disease_info(self): """Load disease information from knowledge base""" try: with open(self.knowledge_base_path, 'r') as f: data = json.load(f) return {f"{d['crop']}___{d['disease']}": d for d in data['diseases']} except FileNotFoundError: print(f"Warning: Knowledge base not found at {self.knowledge_base_path}") return {} def calculate_base_risk(self, predicted_class: str, confidence: float) -> str: """ Calculate base risk level using confidence and disease severity Args: predicted_class: Predicted disease class confidence: Model confidence (0-1) Returns: risk_level: 'Low', 'Medium', or 'High' """ # Handle healthy cases if 'healthy' in predicted_class.lower(): return 'Low' # Get disease severity disease_severity = self.disease_severity.get(predicted_class, 'medium') # Calculate risk based on confidence and severity if confidence >= self.confidence_thresholds['high']: # High confidence predictions if disease_severity == 'high': return 'High' elif disease_severity == 'medium': return 'Medium' else: return 'Low' elif confidence >= self.confidence_thresholds['medium']: # Medium confidence predictions if disease_severity == 'high': return 'High' elif disease_severity == 'medium': return 'Medium' else: return 'Low' else: # Low confidence predictions if disease_severity == 'high': return 'Medium' else: return 'Low' def calculate_enhanced_risk(self, predicted_class: str, confidence: float, weather_data: Optional[Dict] = None, growth_stage: Optional[str] = None) -> Dict: """ Calculate enhanced risk level with environmental factors Args: predicted_class: Predicted disease class confidence: Model confidence (0-1) weather_data: Optional weather information growth_stage: Optional crop growth stage Returns: risk_assessment: Detailed risk assessment """ # Base risk calculation base_risk = self.calculate_base_risk(predicted_class, confidence) # Initialize risk factors risk_factors = [] risk_multiplier = 1.0 # Weather-based risk adjustment if weather_data: weather_risk, weather_factors = self._assess_weather_risk( predicted_class, weather_data ) risk_factors.extend(weather_factors) risk_multiplier *= weather_risk # Growth stage risk adjustment if growth_stage: stage_risk, stage_factors = self._assess_growth_stage_risk( predicted_class, growth_stage ) risk_factors.extend(stage_factors) risk_multiplier *= stage_risk # Calculate final risk level final_risk = self._adjust_risk_level(base_risk, risk_multiplier) return { 'risk_level': final_risk, 'base_risk': base_risk, 'confidence': confidence, 'disease_severity': self.disease_severity.get(predicted_class, 'unknown'), 'risk_factors': risk_factors, 'risk_multiplier': risk_multiplier, 'assessment_timestamp': datetime.now().isoformat(), 'recommendations': self._get_risk_recommendations(final_risk, predicted_class) } def _assess_weather_risk(self, predicted_class: str, weather_data: Dict) -> Tuple[float, List[str]]: """Assess weather-based risk factors""" risk_multiplier = 1.0 factors = [] humidity = weather_data.get('humidity', 50) temperature = weather_data.get('temperature', 25) rainfall = weather_data.get('rainfall', 0) # Disease-specific weather risk if 'Late_Blight' in predicted_class or 'Late_blight' in predicted_class: # Late blight thrives in cool, humid conditions if humidity > 80 and temperature < 20: risk_multiplier *= 1.5 factors.append("High humidity and cool temperature favor late blight") if rainfall > 10: risk_multiplier *= 1.3 factors.append("Recent rainfall increases late blight risk") elif 'rust' in predicted_class.lower(): # Rust diseases favor cool, humid conditions if humidity > 70 and 15 < temperature < 25: risk_multiplier *= 1.4 factors.append("Cool, humid conditions favor rust development") elif 'Early_Blight' in predicted_class or 'Early_blight' in predicted_class: # Early blight thrives in warm, humid conditions if humidity > 75 and temperature > 25: risk_multiplier *= 1.4 factors.append("Warm, humid conditions favor early blight") elif 'Spider_mites' in predicted_class: # Spider mites thrive in hot, dry conditions if humidity < 40 and temperature > 30: risk_multiplier *= 1.6 factors.append("Hot, dry conditions favor spider mite infestations") return risk_multiplier, factors def _assess_growth_stage_risk(self, predicted_class: str, growth_stage: str) -> Tuple[float, List[str]]: """Assess growth stage-based risk factors""" risk_multiplier = 1.0 factors = [] # Critical growth stages for different diseases if growth_stage.lower() in ['flowering', 'fruit_development']: if 'Late_Blight' in predicted_class or 'Late_blight' in predicted_class: risk_multiplier *= 1.3 factors.append("Late blight is particularly damaging during flowering/fruiting") elif 'virus' in predicted_class.lower(): risk_multiplier *= 1.4 factors.append("Viral infections during flowering severely impact yield") elif growth_stage.lower() in ['seedling', 'early_vegetative']: risk_multiplier *= 1.2 factors.append("Young plants are more vulnerable to disease damage") return risk_multiplier, factors def _adjust_risk_level(self, base_risk: str, multiplier: float) -> str: """Adjust risk level based on multiplier""" risk_levels = ['Low', 'Medium', 'High'] current_index = risk_levels.index(base_risk) if multiplier >= 1.5: # Increase risk level new_index = min(current_index + 1, len(risk_levels) - 1) elif multiplier <= 0.7: # Decrease risk level new_index = max(current_index - 1, 0) else: new_index = current_index return risk_levels[new_index] def _get_risk_recommendations(self, risk_level: str, predicted_class: str) -> List[str]: """Get recommendations based on risk level""" recommendations = [] if risk_level == 'High': recommendations.extend([ "🚨 IMMEDIATE ACTION REQUIRED", "Apply appropriate treatment immediately", "Monitor field daily for disease spread", "Consider emergency harvest if disease is severe", "Consult agricultural extension services" ]) elif risk_level == 'Medium': recommendations.extend([ "āš ļø MONITOR CLOSELY", "Apply preventive treatments", "Increase monitoring frequency", "Prepare for potential treatment application", "Check weather forecasts for favorable disease conditions" ]) else: # Low risk recommendations.extend([ "āœ… CONTINUE MONITORING", "Maintain regular field inspections", "Follow standard preventive practices", "Keep treatment options ready" ]) # Add disease-specific recommendations if 'healthy' not in predicted_class.lower(): disease_info = self.disease_info.get(predicted_class, {}) if 'solutions' in disease_info: recommendations.extend(disease_info['solutions'][:3]) # Top 3 solutions return recommendations def get_risk_summary(self, predictions: List[Dict]) -> Dict: """ Generate risk summary for multiple predictions Args: predictions: List of prediction dictionaries Returns: summary: Risk summary across all predictions """ if not predictions: return {'overall_risk': 'Low', 'total_predictions': 0} risk_counts = {'High': 0, 'Medium': 0, 'Low': 0} total_confidence = 0 diseases_detected = [] for pred in predictions: risk_level = pred.get('risk_level', 'Low') risk_counts[risk_level] += 1 total_confidence += pred.get('confidence', 0) if 'healthy' not in pred.get('predicted_class', '').lower(): diseases_detected.append(pred.get('predicted_class', '')) # Determine overall risk if risk_counts['High'] > 0: overall_risk = 'High' elif risk_counts['Medium'] > 0: overall_risk = 'Medium' else: overall_risk = 'Low' return { 'overall_risk': overall_risk, 'risk_distribution': risk_counts, 'total_predictions': len(predictions), 'average_confidence': total_confidence / len(predictions), 'diseases_detected': len(set(diseases_detected)), 'unique_diseases': list(set(diseases_detected)), 'assessment_timestamp': datetime.now().isoformat() } def test_risk_calculator(): """Test risk level calculator""" print("šŸŽÆ Testing Risk Level Calculator...") calculator = RiskLevelCalculator() # Test cases test_cases = [ ('Potato___Late_Blight', 0.95), ('Tomato___healthy', 0.88), ('Corn___Northern_Leaf_Blight', 0.65), ('Tomato___Spider_mites_Two_spotted_spider_mite', 0.45) ] print("\nšŸ“Š Risk Assessment Results:") print("-" * 60) for disease, confidence in test_cases: # Basic risk assessment basic_risk = calculator.calculate_base_risk(disease, confidence) # Enhanced risk assessment with weather weather_data = { 'humidity': 85, 'temperature': 18, 'rainfall': 15 } enhanced_risk = calculator.calculate_enhanced_risk( disease, confidence, weather_data, 'flowering' ) print(f"Disease: {disease}") print(f"Confidence: {confidence:.1%}") print(f"Basic Risk: {basic_risk}") print(f"Enhanced Risk: {enhanced_risk['risk_level']}") print(f"Risk Factors: {len(enhanced_risk['risk_factors'])}") print("-" * 60) print("āœ… Risk Level Calculator tested successfully!") return True if __name__ == "__main__": test_risk_calculator()