File size: 13,879 Bytes
36dd4e6
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
"""
Risk Level Assessment for Crop Disease Detection
Calculates risk levels based on prediction confidence and disease severity
"""

import json
from pathlib import Path
from datetime import datetime
from typing import Dict, List, Tuple, Optional

class RiskLevelCalculator:
    """Calculate risk levels for crop disease predictions"""
    
    def __init__(self, knowledge_base_path='knowledge_base/disease_info.json'):
        """
        Initialize risk calculator
        
        Args:
            knowledge_base_path: Path to disease knowledge base
        """
        self.knowledge_base_path = knowledge_base_path
        self.disease_info = self._load_disease_info()
        
        # Disease severity mapping (based on agricultural impact)
        self.disease_severity = {
            # Corn diseases
            'Corn___Cercospora_leaf_spot_Gray_leaf_spot': 'medium',
            'Corn___Common_rust': 'medium',
            'Corn___Northern_Leaf_Blight': 'high',
            'Corn___healthy': 'none',
            
            # Potato diseases
            'Potato___Early_Blight': 'medium',
            'Potato___Late_Blight': 'high',  # Most destructive
            'Potato___healthy': 'none',
            
            # Tomato diseases
            'Tomato___Bacterial_spot': 'medium',
            'Tomato___Early_blight': 'medium',
            'Tomato___Late_blight': 'high',  # Very destructive
            'Tomato___Leaf_Mold': 'low',
            'Tomato___Septoria_leaf_spot': 'medium',
            'Tomato___Spider_mites_Two_spotted_spider_mite': 'medium',
            'Tomato___Target_Spot': 'medium',
            'Tomato___Tomato_mosaic_virus': 'high',  # Viral, no cure
            'Tomato___Tomato_Yellow_Leaf_Curl_Virus': 'high',  # Viral, devastating
            'Tomato___healthy': 'none'
        }
        
        # Risk level thresholds
        self.confidence_thresholds = {
            'high': 0.8,
            'medium': 0.5,
            'low': 0.0
        }
    
    def _load_disease_info(self):
        """Load disease information from knowledge base"""
        try:
            with open(self.knowledge_base_path, 'r') as f:
                data = json.load(f)
            return {f"{d['crop']}___{d['disease']}": d for d in data['diseases']}
        except FileNotFoundError:
            print(f"Warning: Knowledge base not found at {self.knowledge_base_path}")
            return {}
    
    def calculate_base_risk(self, predicted_class: str, confidence: float) -> str:
        """
        Calculate base risk level using confidence and disease severity
        
        Args:
            predicted_class: Predicted disease class
            confidence: Model confidence (0-1)
            
        Returns:
            risk_level: 'Low', 'Medium', or 'High'
        """
        # Handle healthy cases
        if 'healthy' in predicted_class.lower():
            return 'Low'
        
        # Get disease severity
        disease_severity = self.disease_severity.get(predicted_class, 'medium')
        
        # Calculate risk based on confidence and severity
        if confidence >= self.confidence_thresholds['high']:
            # High confidence predictions
            if disease_severity == 'high':
                return 'High'
            elif disease_severity == 'medium':
                return 'Medium'
            else:
                return 'Low'
        
        elif confidence >= self.confidence_thresholds['medium']:
            # Medium confidence predictions
            if disease_severity == 'high':
                return 'High'
            elif disease_severity == 'medium':
                return 'Medium'
            else:
                return 'Low'
        
        else:
            # Low confidence predictions
            if disease_severity == 'high':
                return 'Medium'
            else:
                return 'Low'
    
    def calculate_enhanced_risk(self, predicted_class: str, confidence: float,
                              weather_data: Optional[Dict] = None,
                              growth_stage: Optional[str] = None) -> Dict:
        """
        Calculate enhanced risk level with environmental factors
        
        Args:
            predicted_class: Predicted disease class
            confidence: Model confidence (0-1)
            weather_data: Optional weather information
            growth_stage: Optional crop growth stage
            
        Returns:
            risk_assessment: Detailed risk assessment
        """
        # Base risk calculation
        base_risk = self.calculate_base_risk(predicted_class, confidence)
        
        # Initialize risk factors
        risk_factors = []
        risk_multiplier = 1.0
        
        # Weather-based risk adjustment
        if weather_data:
            weather_risk, weather_factors = self._assess_weather_risk(
                predicted_class, weather_data
            )
            risk_factors.extend(weather_factors)
            risk_multiplier *= weather_risk
        
        # Growth stage risk adjustment
        if growth_stage:
            stage_risk, stage_factors = self._assess_growth_stage_risk(
                predicted_class, growth_stage
            )
            risk_factors.extend(stage_factors)
            risk_multiplier *= stage_risk
        
        # Calculate final risk level
        final_risk = self._adjust_risk_level(base_risk, risk_multiplier)
        
        return {
            'risk_level': final_risk,
            'base_risk': base_risk,
            'confidence': confidence,
            'disease_severity': self.disease_severity.get(predicted_class, 'unknown'),
            'risk_factors': risk_factors,
            'risk_multiplier': risk_multiplier,
            'assessment_timestamp': datetime.now().isoformat(),
            'recommendations': self._get_risk_recommendations(final_risk, predicted_class)
        }
    
    def _assess_weather_risk(self, predicted_class: str, weather_data: Dict) -> Tuple[float, List[str]]:
        """Assess weather-based risk factors"""
        risk_multiplier = 1.0
        factors = []
        
        humidity = weather_data.get('humidity', 50)
        temperature = weather_data.get('temperature', 25)
        rainfall = weather_data.get('rainfall', 0)
        
        # Disease-specific weather risk
        if 'Late_Blight' in predicted_class or 'Late_blight' in predicted_class:
            # Late blight thrives in cool, humid conditions
            if humidity > 80 and temperature < 20:
                risk_multiplier *= 1.5
                factors.append("High humidity and cool temperature favor late blight")
            if rainfall > 10:
                risk_multiplier *= 1.3
                factors.append("Recent rainfall increases late blight risk")
        
        elif 'rust' in predicted_class.lower():
            # Rust diseases favor cool, humid conditions
            if humidity > 70 and 15 < temperature < 25:
                risk_multiplier *= 1.4
                factors.append("Cool, humid conditions favor rust development")
        
        elif 'Early_Blight' in predicted_class or 'Early_blight' in predicted_class:
            # Early blight thrives in warm, humid conditions
            if humidity > 75 and temperature > 25:
                risk_multiplier *= 1.4
                factors.append("Warm, humid conditions favor early blight")
        
        elif 'Spider_mites' in predicted_class:
            # Spider mites thrive in hot, dry conditions
            if humidity < 40 and temperature > 30:
                risk_multiplier *= 1.6
                factors.append("Hot, dry conditions favor spider mite infestations")
        
        return risk_multiplier, factors
    
    def _assess_growth_stage_risk(self, predicted_class: str, growth_stage: str) -> Tuple[float, List[str]]:
        """Assess growth stage-based risk factors"""
        risk_multiplier = 1.0
        factors = []
        
        # Critical growth stages for different diseases
        if growth_stage.lower() in ['flowering', 'fruit_development']:
            if 'Late_Blight' in predicted_class or 'Late_blight' in predicted_class:
                risk_multiplier *= 1.3
                factors.append("Late blight is particularly damaging during flowering/fruiting")
            
            elif 'virus' in predicted_class.lower():
                risk_multiplier *= 1.4
                factors.append("Viral infections during flowering severely impact yield")
        
        elif growth_stage.lower() in ['seedling', 'early_vegetative']:
            risk_multiplier *= 1.2
            factors.append("Young plants are more vulnerable to disease damage")
        
        return risk_multiplier, factors
    
    def _adjust_risk_level(self, base_risk: str, multiplier: float) -> str:
        """Adjust risk level based on multiplier"""
        risk_levels = ['Low', 'Medium', 'High']
        current_index = risk_levels.index(base_risk)
        
        if multiplier >= 1.5:
            # Increase risk level
            new_index = min(current_index + 1, len(risk_levels) - 1)
        elif multiplier <= 0.7:
            # Decrease risk level
            new_index = max(current_index - 1, 0)
        else:
            new_index = current_index
        
        return risk_levels[new_index]
    
    def _get_risk_recommendations(self, risk_level: str, predicted_class: str) -> List[str]:
        """Get recommendations based on risk level"""
        recommendations = []
        
        if risk_level == 'High':
            recommendations.extend([
                "🚨 IMMEDIATE ACTION REQUIRED",
                "Apply appropriate treatment immediately",
                "Monitor field daily for disease spread",
                "Consider emergency harvest if disease is severe",
                "Consult agricultural extension services"
            ])
        
        elif risk_level == 'Medium':
            recommendations.extend([
                "⚠️ MONITOR CLOSELY",
                "Apply preventive treatments",
                "Increase monitoring frequency",
                "Prepare for potential treatment application",
                "Check weather forecasts for favorable disease conditions"
            ])
        
        else:  # Low risk
            recommendations.extend([
                "✅ CONTINUE MONITORING",
                "Maintain regular field inspections",
                "Follow standard preventive practices",
                "Keep treatment options ready"
            ])
        
        # Add disease-specific recommendations
        if 'healthy' not in predicted_class.lower():
            disease_info = self.disease_info.get(predicted_class, {})
            if 'solutions' in disease_info:
                recommendations.extend(disease_info['solutions'][:3])  # Top 3 solutions
        
        return recommendations
    
    def get_risk_summary(self, predictions: List[Dict]) -> Dict:
        """
        Generate risk summary for multiple predictions
        
        Args:
            predictions: List of prediction dictionaries
            
        Returns:
            summary: Risk summary across all predictions
        """
        if not predictions:
            return {'overall_risk': 'Low', 'total_predictions': 0}
        
        risk_counts = {'High': 0, 'Medium': 0, 'Low': 0}
        total_confidence = 0
        diseases_detected = []
        
        for pred in predictions:
            risk_level = pred.get('risk_level', 'Low')
            risk_counts[risk_level] += 1
            total_confidence += pred.get('confidence', 0)
            
            if 'healthy' not in pred.get('predicted_class', '').lower():
                diseases_detected.append(pred.get('predicted_class', ''))
        
        # Determine overall risk
        if risk_counts['High'] > 0:
            overall_risk = 'High'
        elif risk_counts['Medium'] > 0:
            overall_risk = 'Medium'
        else:
            overall_risk = 'Low'
        
        return {
            'overall_risk': overall_risk,
            'risk_distribution': risk_counts,
            'total_predictions': len(predictions),
            'average_confidence': total_confidence / len(predictions),
            'diseases_detected': len(set(diseases_detected)),
            'unique_diseases': list(set(diseases_detected)),
            'assessment_timestamp': datetime.now().isoformat()
        }

def test_risk_calculator():
    """Test risk level calculator"""
    print("🎯 Testing Risk Level Calculator...")
    
    calculator = RiskLevelCalculator()
    
    # Test cases
    test_cases = [
        ('Potato___Late_Blight', 0.95),
        ('Tomato___healthy', 0.88),
        ('Corn___Northern_Leaf_Blight', 0.65),
        ('Tomato___Spider_mites_Two_spotted_spider_mite', 0.45)
    ]
    
    print("\n📊 Risk Assessment Results:")
    print("-" * 60)
    
    for disease, confidence in test_cases:
        # Basic risk assessment
        basic_risk = calculator.calculate_base_risk(disease, confidence)
        
        # Enhanced risk assessment with weather
        weather_data = {
            'humidity': 85,
            'temperature': 18,
            'rainfall': 15
        }
        
        enhanced_risk = calculator.calculate_enhanced_risk(
            disease, confidence, weather_data, 'flowering'
        )
        
        print(f"Disease: {disease}")
        print(f"Confidence: {confidence:.1%}")
        print(f"Basic Risk: {basic_risk}")
        print(f"Enhanced Risk: {enhanced_risk['risk_level']}")
        print(f"Risk Factors: {len(enhanced_risk['risk_factors'])}")
        print("-" * 60)
    
    print("✅ Risk Level Calculator tested successfully!")
    return True

if __name__ == "__main__":
    test_risk_calculator()