Spaces:
Running
Running
File size: 13,879 Bytes
36dd4e6 |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 292 293 294 295 296 297 298 299 300 301 302 303 304 305 306 307 308 309 310 311 312 313 314 315 316 317 318 319 320 321 322 323 324 325 326 327 328 329 330 331 332 333 334 335 336 337 338 339 340 341 342 343 344 345 346 347 348 349 350 351 352 353 354 355 356 357 358 359 360 361 362 363 364 |
"""
Risk Level Assessment for Crop Disease Detection
Calculates risk levels based on prediction confidence and disease severity
"""
import json
from pathlib import Path
from datetime import datetime
from typing import Dict, List, Tuple, Optional
class RiskLevelCalculator:
"""Calculate risk levels for crop disease predictions"""
def __init__(self, knowledge_base_path='knowledge_base/disease_info.json'):
"""
Initialize risk calculator
Args:
knowledge_base_path: Path to disease knowledge base
"""
self.knowledge_base_path = knowledge_base_path
self.disease_info = self._load_disease_info()
# Disease severity mapping (based on agricultural impact)
self.disease_severity = {
# Corn diseases
'Corn___Cercospora_leaf_spot_Gray_leaf_spot': 'medium',
'Corn___Common_rust': 'medium',
'Corn___Northern_Leaf_Blight': 'high',
'Corn___healthy': 'none',
# Potato diseases
'Potato___Early_Blight': 'medium',
'Potato___Late_Blight': 'high', # Most destructive
'Potato___healthy': 'none',
# Tomato diseases
'Tomato___Bacterial_spot': 'medium',
'Tomato___Early_blight': 'medium',
'Tomato___Late_blight': 'high', # Very destructive
'Tomato___Leaf_Mold': 'low',
'Tomato___Septoria_leaf_spot': 'medium',
'Tomato___Spider_mites_Two_spotted_spider_mite': 'medium',
'Tomato___Target_Spot': 'medium',
'Tomato___Tomato_mosaic_virus': 'high', # Viral, no cure
'Tomato___Tomato_Yellow_Leaf_Curl_Virus': 'high', # Viral, devastating
'Tomato___healthy': 'none'
}
# Risk level thresholds
self.confidence_thresholds = {
'high': 0.8,
'medium': 0.5,
'low': 0.0
}
def _load_disease_info(self):
"""Load disease information from knowledge base"""
try:
with open(self.knowledge_base_path, 'r') as f:
data = json.load(f)
return {f"{d['crop']}___{d['disease']}": d for d in data['diseases']}
except FileNotFoundError:
print(f"Warning: Knowledge base not found at {self.knowledge_base_path}")
return {}
def calculate_base_risk(self, predicted_class: str, confidence: float) -> str:
"""
Calculate base risk level using confidence and disease severity
Args:
predicted_class: Predicted disease class
confidence: Model confidence (0-1)
Returns:
risk_level: 'Low', 'Medium', or 'High'
"""
# Handle healthy cases
if 'healthy' in predicted_class.lower():
return 'Low'
# Get disease severity
disease_severity = self.disease_severity.get(predicted_class, 'medium')
# Calculate risk based on confidence and severity
if confidence >= self.confidence_thresholds['high']:
# High confidence predictions
if disease_severity == 'high':
return 'High'
elif disease_severity == 'medium':
return 'Medium'
else:
return 'Low'
elif confidence >= self.confidence_thresholds['medium']:
# Medium confidence predictions
if disease_severity == 'high':
return 'High'
elif disease_severity == 'medium':
return 'Medium'
else:
return 'Low'
else:
# Low confidence predictions
if disease_severity == 'high':
return 'Medium'
else:
return 'Low'
def calculate_enhanced_risk(self, predicted_class: str, confidence: float,
weather_data: Optional[Dict] = None,
growth_stage: Optional[str] = None) -> Dict:
"""
Calculate enhanced risk level with environmental factors
Args:
predicted_class: Predicted disease class
confidence: Model confidence (0-1)
weather_data: Optional weather information
growth_stage: Optional crop growth stage
Returns:
risk_assessment: Detailed risk assessment
"""
# Base risk calculation
base_risk = self.calculate_base_risk(predicted_class, confidence)
# Initialize risk factors
risk_factors = []
risk_multiplier = 1.0
# Weather-based risk adjustment
if weather_data:
weather_risk, weather_factors = self._assess_weather_risk(
predicted_class, weather_data
)
risk_factors.extend(weather_factors)
risk_multiplier *= weather_risk
# Growth stage risk adjustment
if growth_stage:
stage_risk, stage_factors = self._assess_growth_stage_risk(
predicted_class, growth_stage
)
risk_factors.extend(stage_factors)
risk_multiplier *= stage_risk
# Calculate final risk level
final_risk = self._adjust_risk_level(base_risk, risk_multiplier)
return {
'risk_level': final_risk,
'base_risk': base_risk,
'confidence': confidence,
'disease_severity': self.disease_severity.get(predicted_class, 'unknown'),
'risk_factors': risk_factors,
'risk_multiplier': risk_multiplier,
'assessment_timestamp': datetime.now().isoformat(),
'recommendations': self._get_risk_recommendations(final_risk, predicted_class)
}
def _assess_weather_risk(self, predicted_class: str, weather_data: Dict) -> Tuple[float, List[str]]:
"""Assess weather-based risk factors"""
risk_multiplier = 1.0
factors = []
humidity = weather_data.get('humidity', 50)
temperature = weather_data.get('temperature', 25)
rainfall = weather_data.get('rainfall', 0)
# Disease-specific weather risk
if 'Late_Blight' in predicted_class or 'Late_blight' in predicted_class:
# Late blight thrives in cool, humid conditions
if humidity > 80 and temperature < 20:
risk_multiplier *= 1.5
factors.append("High humidity and cool temperature favor late blight")
if rainfall > 10:
risk_multiplier *= 1.3
factors.append("Recent rainfall increases late blight risk")
elif 'rust' in predicted_class.lower():
# Rust diseases favor cool, humid conditions
if humidity > 70 and 15 < temperature < 25:
risk_multiplier *= 1.4
factors.append("Cool, humid conditions favor rust development")
elif 'Early_Blight' in predicted_class or 'Early_blight' in predicted_class:
# Early blight thrives in warm, humid conditions
if humidity > 75 and temperature > 25:
risk_multiplier *= 1.4
factors.append("Warm, humid conditions favor early blight")
elif 'Spider_mites' in predicted_class:
# Spider mites thrive in hot, dry conditions
if humidity < 40 and temperature > 30:
risk_multiplier *= 1.6
factors.append("Hot, dry conditions favor spider mite infestations")
return risk_multiplier, factors
def _assess_growth_stage_risk(self, predicted_class: str, growth_stage: str) -> Tuple[float, List[str]]:
"""Assess growth stage-based risk factors"""
risk_multiplier = 1.0
factors = []
# Critical growth stages for different diseases
if growth_stage.lower() in ['flowering', 'fruit_development']:
if 'Late_Blight' in predicted_class or 'Late_blight' in predicted_class:
risk_multiplier *= 1.3
factors.append("Late blight is particularly damaging during flowering/fruiting")
elif 'virus' in predicted_class.lower():
risk_multiplier *= 1.4
factors.append("Viral infections during flowering severely impact yield")
elif growth_stage.lower() in ['seedling', 'early_vegetative']:
risk_multiplier *= 1.2
factors.append("Young plants are more vulnerable to disease damage")
return risk_multiplier, factors
def _adjust_risk_level(self, base_risk: str, multiplier: float) -> str:
"""Adjust risk level based on multiplier"""
risk_levels = ['Low', 'Medium', 'High']
current_index = risk_levels.index(base_risk)
if multiplier >= 1.5:
# Increase risk level
new_index = min(current_index + 1, len(risk_levels) - 1)
elif multiplier <= 0.7:
# Decrease risk level
new_index = max(current_index - 1, 0)
else:
new_index = current_index
return risk_levels[new_index]
def _get_risk_recommendations(self, risk_level: str, predicted_class: str) -> List[str]:
"""Get recommendations based on risk level"""
recommendations = []
if risk_level == 'High':
recommendations.extend([
"🚨 IMMEDIATE ACTION REQUIRED",
"Apply appropriate treatment immediately",
"Monitor field daily for disease spread",
"Consider emergency harvest if disease is severe",
"Consult agricultural extension services"
])
elif risk_level == 'Medium':
recommendations.extend([
"⚠️ MONITOR CLOSELY",
"Apply preventive treatments",
"Increase monitoring frequency",
"Prepare for potential treatment application",
"Check weather forecasts for favorable disease conditions"
])
else: # Low risk
recommendations.extend([
"✅ CONTINUE MONITORING",
"Maintain regular field inspections",
"Follow standard preventive practices",
"Keep treatment options ready"
])
# Add disease-specific recommendations
if 'healthy' not in predicted_class.lower():
disease_info = self.disease_info.get(predicted_class, {})
if 'solutions' in disease_info:
recommendations.extend(disease_info['solutions'][:3]) # Top 3 solutions
return recommendations
def get_risk_summary(self, predictions: List[Dict]) -> Dict:
"""
Generate risk summary for multiple predictions
Args:
predictions: List of prediction dictionaries
Returns:
summary: Risk summary across all predictions
"""
if not predictions:
return {'overall_risk': 'Low', 'total_predictions': 0}
risk_counts = {'High': 0, 'Medium': 0, 'Low': 0}
total_confidence = 0
diseases_detected = []
for pred in predictions:
risk_level = pred.get('risk_level', 'Low')
risk_counts[risk_level] += 1
total_confidence += pred.get('confidence', 0)
if 'healthy' not in pred.get('predicted_class', '').lower():
diseases_detected.append(pred.get('predicted_class', ''))
# Determine overall risk
if risk_counts['High'] > 0:
overall_risk = 'High'
elif risk_counts['Medium'] > 0:
overall_risk = 'Medium'
else:
overall_risk = 'Low'
return {
'overall_risk': overall_risk,
'risk_distribution': risk_counts,
'total_predictions': len(predictions),
'average_confidence': total_confidence / len(predictions),
'diseases_detected': len(set(diseases_detected)),
'unique_diseases': list(set(diseases_detected)),
'assessment_timestamp': datetime.now().isoformat()
}
def test_risk_calculator():
"""Test risk level calculator"""
print("🎯 Testing Risk Level Calculator...")
calculator = RiskLevelCalculator()
# Test cases
test_cases = [
('Potato___Late_Blight', 0.95),
('Tomato___healthy', 0.88),
('Corn___Northern_Leaf_Blight', 0.65),
('Tomato___Spider_mites_Two_spotted_spider_mite', 0.45)
]
print("\n📊 Risk Assessment Results:")
print("-" * 60)
for disease, confidence in test_cases:
# Basic risk assessment
basic_risk = calculator.calculate_base_risk(disease, confidence)
# Enhanced risk assessment with weather
weather_data = {
'humidity': 85,
'temperature': 18,
'rainfall': 15
}
enhanced_risk = calculator.calculate_enhanced_risk(
disease, confidence, weather_data, 'flowering'
)
print(f"Disease: {disease}")
print(f"Confidence: {confidence:.1%}")
print(f"Basic Risk: {basic_risk}")
print(f"Enhanced Risk: {enhanced_risk['risk_level']}")
print(f"Risk Factors: {len(enhanced_risk['risk_factors'])}")
print("-" * 60)
print("✅ Risk Level Calculator tested successfully!")
return True
if __name__ == "__main__":
test_risk_calculator()
|