import requests import json from flask import Flask, render_template, request, jsonify, Response, send_file from google import genai from gtts import gTTS import os from dotenv import load_dotenv from datetime import datetime, timedelta # Load .env file load_dotenv() app = Flask(__name__) app.config['AUDIO_FOLDER'] = 'static/audio' os.makedirs(app.config['AUDIO_FOLDER'], exist_ok=True) def markdown_to_html(text): """Convert markdown text to HTML for proper rendering.""" if not text: return text import markdown as md return md.markdown(text, extensions=['nl2br']) # --- ROBUST API KEY DETECTION --- def get_api_key(service_type): """ Intelligently fetches API keys by scanning multiple environment variables. - service_type="gemini": Looks for keys starting with "AIza" - service_type="nvidia": Looks for keys starting with "nvapi-" """ potential_vars = [ "GEMINI_API_KEY", "GEMINI_API_KEY_1", "GEMINI_API_KEY_2", "GEMINI_API", "NVIDIA_API_KEY", "NVIDIA_API" ] for var_name in potential_vars: val = os.getenv(var_name) if not val: continue if service_type == "gemini" and val.startswith("AIza"): print(f"DEBUG: Found Gemini Key in variable '{var_name}'") return val elif service_type == "nvidia" and val.startswith("nvapi-"): print(f"DEBUG: Found NVIDIA Key in variable '{var_name}'") return val return None # Initialize Clients gemini_key = get_api_key("gemini") nvidia_key = get_api_key("nvidia") gemini_client = None if gemini_key: print(f"Initializing Gemini Client with key: {gemini_key[:10]}...") try: gemini_client = genai.Client(api_key=gemini_key) except Exception as e: print(f"Error initializing Gemini Client: {e}") else: print("WARNING: No valid Gemini API Key (starting with 'AIza') found in environment variables.") def validate_coordinates(lat, lon): """Validate and convert latitude and longitude to float.""" try: return float(lat), float(lon) except (TypeError, ValueError): return None, None @app.route('/') def index(): return render_template('index.html') @app.route('/get_weather_data', methods=['GET']) def get_weather_data(): """ Fetch weather data using Open-Meteo's forecast endpoint. """ lat = request.args.get('lat') lon = request.args.get('lon') lat, lon = validate_coordinates(lat, lon) if lat is None or lon is None: return jsonify({"error": "Invalid coordinates"}), 400 try: forecast_url = "https://api.open-meteo.com/v1/forecast" forecast_params = { "latitude": lat, "longitude": lon, "current_weather": "true", "daily": "temperature_2m_max,temperature_2m_min,precipitation_sum", "hourly": "relative_humidity_2m,soil_moisture_3_to_9cm,cloudcover,windspeed_10m", "timezone": "auto" } resp = requests.get(forecast_url, params=forecast_params) resp.raise_for_status() data = resp.json() daily = data.get("daily", {}) hourly = data.get("hourly", {}) current = data.get("current_weather", {}) # Daily data max_temp = daily.get("temperature_2m_max", [None])[0] min_temp = daily.get("temperature_2m_min", [None])[0] rain = daily.get("precipitation_sum", [None])[0] # Hourly data (averages) humidity_list = hourly.get("relative_humidity_2m", []) soil_list = hourly.get("soil_moisture_3_to_9cm", []) cloud_list = hourly.get("cloudcover", []) avg_humidity = sum(humidity_list) / len(humidity_list) if humidity_list else None avg_soil_moisture = sum(soil_list) / len(soil_list) if soil_list else None avg_cloud_cover = sum(cloud_list) / len(cloud_list) if cloud_list else None # Current weather current_temp = current.get("temperature") wind_speed = current.get("windspeed") weather = { "max_temp": max_temp, "min_temp": min_temp, "rainfall": rain, "humidity": avg_humidity, "soil_moisture": avg_soil_moisture, "current_temp": current_temp, "wind_speed": wind_speed, "cloud_cover": avg_cloud_cover } return jsonify(weather) except Exception as e: return jsonify({"error": str(e)}), 500 def get_historical_weather_summary(lat, lon, start_date_str, end_date_str): """ Fetches historical weather data from Open-Meteo Archive for the specified period. If period is in future, shifts to previous year. Returns a text summary of monthly averages and structured data list. """ try: if not start_date_str or not end_date_str: return "Weather data unavailable (dates missing).", [] start = datetime.strptime(start_date_str, '%Y-%m-%d') end = datetime.strptime(end_date_str, '%Y-%m-%d') today = datetime.now() # If start date is in future, use last year's data as proxy is_proxy = False if start > today: start = start.replace(year=start.year - 1) end = end.replace(year=end.year - 1) is_proxy = True # Clip end date if it's still in the future after adjustment if end > today: end = today # Call Open-Meteo Archive archive_url = "https://archive-api.open-meteo.com/v1/archive" params = { "latitude": lat, "longitude": lon, "start_date": start.strftime('%Y-%m-%d'), "end_date": end.strftime('%Y-%m-%d'), "daily": "temperature_2m_max,temperature_2m_min,precipitation_sum,relative_humidity_2m_mean", "timezone": "auto" } resp = requests.get(archive_url, params=params) if resp.status_code != 200: print(f"DEBUG: API Failed {resp.status_code} - {resp.text}") return f"Could not fetch weather data: {resp.status_code}", [] data = resp.json() daily = data.get("daily", {}) dates = daily.get("time", []) print(f"DEBUG: Retrieved {len(dates)} days.") if dates: print(f"DEBUG: Range {dates[0]} to {dates[-1]}") if not dates: return "No weather data available for this range.", [] summary_parts = [] structured_data = [] if is_proxy: summary_parts.append("(Note: Using last year's weather as proxy for future dates)") # Extract lists max_temps = daily.get("temperature_2m_max", []) humidities = daily.get("relative_humidity_2m_mean", []) rains = daily.get("precipitation_sum", []) # Monthly aggregation current_month = None temp_sum = 0 hum_sum = 0 rain_sum = 0 count = 0 month_days = [] for i, d_str in enumerate(dates): date_obj = datetime.strptime(d_str, '%Y-%m-%d') month_key = date_obj.strftime('%B %Y') # Accumulate values (handle None) t = max_temps[i] if i < len(max_temps) and max_temps[i] is not None else 0 h = humidities[i] if i < len(humidities) and humidities[i] is not None else 0 r = rains[i] if i < len(rains) and rains[i] is not None else 0 if current_month is None: current_month = month_key if month_key != current_month: # Flush previous month avg_t = temp_sum / count if count else 0 avg_h = hum_sum / count if count else 0 summary_parts.append( f"{current_month}: Avg Temp {avg_t:.1f}C, Rain {rain_sum:.1f}mm, Humidity {avg_h:.1f}%" ) structured_data.append({ "month": current_month, "avg_temp": round(avg_t, 1), "rainfall": round(rain_sum, 1), "humidity": round(avg_h, 1), "days": month_days }) # Reset for new month current_month = month_key temp_sum = 0 hum_sum = 0 rain_sum = 0 count = 0 month_days = [] # Accumulate temp_sum += t hum_sum += h rain_sum += r count += 1 month_days.append({ "date": d_str, "temp": t, "humidity": h, "rain": r }) # Flush last month if count > 0: avg_t = temp_sum / count avg_h = hum_sum / count summary_parts.append( f"{current_month}: Avg Temp {avg_t:.1f}C, Rain {rain_sum:.1f}mm, Humidity {avg_h:.1f}%" ) structured_data.append({ "month": current_month, "avg_temp": round(avg_t, 1), "rainfall": round(rain_sum, 1), "humidity": round(avg_h, 1), "days": month_days }) return "\n".join(summary_parts), structured_data except Exception as e: print(f"Weather Fetch Error: {e}") return "Weather data processing failed.", [] def calculate_season_dates(season_name): """ Derives standard sowing and harvest dates based on the Indian agricultural season. """ today = datetime.today() current_year = today.year # Defaults sowing_date = today harvest_date = today + timedelta(days=120) try: if season_name == "Kharif": # June 15 to Oct 15 sowing_date = datetime(current_year, 6, 15) harvest_date = datetime(current_year, 10, 15) if today.month > 10: sowing_date = datetime(current_year + 1, 6, 15) harvest_date = datetime(current_year + 1, 10, 15) elif season_name == "Rabi": # Nov 1 to April 1 (crosses year boundary) if today.month > 4: # Predicting for upcoming Rabi sowing_date = datetime(current_year, 11, 1) harvest_date = datetime(current_year + 1, 4, 1) else: # We are IN Rabi or just past it sowing_date = datetime(current_year - 1, 11, 1) harvest_date = datetime(current_year, 4, 1) elif season_name == "Zaid": # March 1 to June 1 sowing_date = datetime(current_year, 3, 1) harvest_date = datetime(current_year, 6, 1) if today.month > 6: sowing_date = datetime(current_year + 1, 3, 1) harvest_date = datetime(current_year + 1, 6, 1) elif season_name == "Annual": # 1-year cycle from today sowing_date = today harvest_date = today + timedelta(days=365) except Exception as e: print(f"Date Calc Error: {e}") return sowing_date.strftime('%Y-%m-%d'), harvest_date.strftime('%Y-%m-%d') def call_gemini_api(input_data, language): """ Calls the Gemini API to get a pest outbreak report in structured JSON format. Implements fallback mechanism to try multiple models if primary fails, then falls back to NVIDIA models if all Gemini attempts fail. """ # 1. Determine dates from season lat = input_data.get('latitude') lon = input_data.get('longitude') season = input_data.get('season') if season: sowing, harvest = calculate_season_dates(season) else: sowing = input_data.get('sowing_date', datetime.today().strftime('%Y-%m-%d')) harvest = input_data.get('harvest_date', (datetime.today() + timedelta(days=120)).strftime('%Y-%m-%d')) print(f"Analysis Period: {season} ({sowing} to {harvest})") # 2. Fetch historical/seasonal weather profile weather_summary, weather_profile = get_historical_weather_summary(lat, lon, sowing, harvest) print(f"Generated Weather Summary: {weather_summary[:100]}...") prompt = f""" You are an expert Agricultural Entomologist. Analyze the provided inputs and the MONTH-WISE WEATHER PROFILE to generate a precise Pest Outbreak Prediction. INPUTS: - Crop: {input_data.get('crop_type')} - Soil: {input_data.get('soil_type')} - Season: {season} ({sowing} to {harvest}) - Location: {input_data.get('derived_location', 'Unknown')} - Current Growth Stage: {input_data.get('growth_stage')} - Irrigation Frequency: {input_data.get('irrigation_freq')} - Irrigation Method: {input_data.get('irrigation_method')} - Location Coordinates: Lat {lat}, Lon {lon} WEATHER PROFILE (Month-by-Month): {weather_summary} CRITICAL INSTRUCTIONS: 1. ANALYZE EACH MONTH SEPARATELY. Example: High Rainfall in a month -> Risk of Fungal Diseases. 2. MULTIPLE PESTS: If a month has multiple risky pests, create SEPARATE entries for each pest in the table. 3. LANGUAGE RULE: JSON KEYS must remain in English. Only translate VALUES into '{language}'. 4. ACCURACY: Only predict pests that genuinely thrive in the given weather conditions. 5. MONTH NAMING: The 'outbreak_months' field MUST contain the exact full English month names (e.g., "June", "July"). Your response MUST be a single, valid JSON object and nothing else. Do not wrap it in markdown backticks. Required JSON structure: {{ "report_title": "Pest Outbreak Dashboard Report", "location_info": {{ "latitude": "{lat}", "longitude": "{lon}", "derived_location": "A human-readable location derived from the coordinates (e.g., 'Nagpur, India')." }}, "agricultural_inputs_analysis": "A detailed bullet-point analysis of how the chosen Season and Weather Profile impacts this specific crop.", "pest_prediction_table": [ {{ "pest_name": "Name of the predicted pest", "outbreak_months": "Predicted month(s) for the outbreak", "severity": "Predicted severity level (e.g., Low, Medium, High)", "impacting_stage": "The specific crop stage affected (e.g., Flowering, Vegetative)", "potential_damage": "Short description of the damage caused.", "precautionary_measures": "A short description of key precautionary measures." }} ], "pest_avoidance_practices": [ "A detailed, specific pest avoidance practice based on the inputs.", "Provide 10-12 detailed bullet points." ], "agricultural_best_practices": [ "A specific agricultural best practice based on the inputs.", "Another specific recommendation related to crop management." ], "predicted_pest_damage_info": "Detailed bullet points (markdown format using -) describing the potential damage the predicted pests could cause." }} """ models_to_try = [ "gemini-2.5-flash", "gemini-2.5-flash-lite", ] report_data = {} # 3. Try Gemini models first if gemini_client: for model_name in models_to_try: try: print(f"DEBUG: Attempting Gemini model {model_name}...") response = gemini_client.models.generate_content( model=model_name, contents=prompt ) if response and response.text: raw_text = response.text start_idx = raw_text.find('{') end_idx = raw_text.rfind('}') + 1 if start_idx != -1 and end_idx > start_idx: json_text = raw_text[start_idx:end_idx] report_data = json.loads(json_text) print(f"DEBUG: Successfully parsed JSON from Gemini {model_name}") return report_data, weather_profile # SUCCESS except Exception as e: print(f"CRITICAL: Error calling Gemini {model_name}: {e}") continue else: print("DEBUG: Gemini client not initialized. Skipping Gemini models.") # 4. NVIDIA Fallback (if Gemini fails) if not report_data and nvidia_key: try: from openai import OpenAI nv_client = OpenAI( base_url="https://integrate.api.nvidia.com/v1", api_key=nvidia_key ) nvidia_models = [ "meta/llama-3.1-405b-instruct", "meta/llama-3.1-70b-instruct", "google/gemma-3-27b-it", "meta/llama-3.1-8b-instruct" ] for model_name in nvidia_models: try: print(f"DEBUG: Attempting NVIDIA model {model_name}...") response = nv_client.chat.completions.create( model=model_name, messages=[{"role": "user", "content": prompt}], temperature=0.2, max_tokens=2048 ) raw_text = response.choices[0].message.content start_idx = raw_text.find('{') end_idx = raw_text.rfind('}') + 1 if start_idx != -1 and end_idx > start_idx: json_text = raw_text[start_idx:end_idx] report_data = json.loads(json_text) print(f"DEBUG: Successfully parsed JSON from NVIDIA {model_name}") return report_data, weather_profile # SUCCESS except Exception as ne: print(f"CRITICAL: Error calling NVIDIA {model_name}: {ne}") continue except Exception as e: print(f"CRITICAL: NVIDIA client setup failed: {e}") elif not report_data: print("DEBUG: No valid report and NVIDIA key not found/valid.") if not report_data: print("DEBUG: All models failed to generate valid report.") return { "error": "Failed to generate a valid report from the AI model. All fallback models failed. Please try again later." }, [] return report_data, weather_profile @app.route('/predict', methods=['POST']) def predict(): print("----- PREDICT ROUTE HIT -----") form_data = request.form.to_dict() print(f"Form Data Received: {form_data}") language = form_data.get("language", "English") report_data, weather_profile = call_gemini_api(form_data, language) # Handle error from AI call if "error" in report_data: return render_template( 'results.html', report_data={"report_title": "Error", "predicted_pest_damage_info": report_data['error']}, location={}, current_date=datetime.now().strftime("%B %d, %Y"), audio_url=None, language_code="en", weather_profile=[] ) location = report_data.get('location_info', {}) # Convert markdown fields to HTML if 'agricultural_inputs_analysis' in report_data: report_data['agricultural_inputs_analysis'] = markdown_to_html(report_data['agricultural_inputs_analysis']) if 'predicted_pest_damage_info' in report_data: report_data['predicted_pest_damage_info'] = markdown_to_html(report_data['predicted_pest_damage_info']) if 'pest_prediction_table' in report_data: for pest in report_data['pest_prediction_table']: if 'precautionary_measures' in pest: pest['precautionary_measures'] = markdown_to_html(pest['precautionary_measures']) # Generate TTS summary pest_table = report_data.get('pest_prediction_table', []) summary = f"Pest Outbreak Report for {location.get('derived_location', 'your location')}. " summary += (report_data.get('agricultural_inputs_analysis', '')[:200] + "... ") if pest_table: summary += "Predicted pests: " + ', '.join([p.get('pest_name', '') for p in pest_table]) + ". " summary += "Severity: " + ', '.join([p.get('severity', '') for p in pest_table]) + ". " summary += report_data.get('predicted_pest_damage_info', '')[:200] # Language code mapping for gTTS lang_mapping = { "English": "en", "Hindi": "hi", "Bengali": "bn", "Telugu": "te", "Marathi": "mr", "Tamil": "ta", "Gujarati": "gu", "Urdu": "ur", "Kannada": "kn", "Odia": "or", "Malayalam": "ml" } gtts_lang = lang_mapping.get(language, 'en') audio_url = None try: tts = gTTS(summary, lang=gtts_lang) safe_lat = str(location.get('latitude', '0')).replace('.', '_') safe_lon = str(location.get('longitude', '0')).replace('.', '_') audio_filename = f"pest_report_{safe_lat}_{safe_lon}.mp3" os.makedirs(app.config['AUDIO_FOLDER'], exist_ok=True) audio_path = os.path.join(app.config['AUDIO_FOLDER'], audio_filename) tts.save(audio_path) audio_url = f"/static/audio/{audio_filename}" except Exception as e: print(f"Error generating audio: {e}") return render_template( 'results.html', report_data=report_data, location=location, current_date=datetime.now().strftime("%B %d, %Y"), audio_url=audio_url, language_code=gtts_lang, weather_profile=weather_profile ) @app.route('/get_timeline_weather', methods=['GET']) def get_timeline_weather(): """Returns the seasonal weather profile for the frontend timeline.""" lat = request.args.get('lat') lon = request.args.get('lon') start = request.args.get('start_date') end = request.args.get('end_date') if not all([lat, lon, start, end]): return jsonify({"error": "Missing parameters"}), 400 _, profile = get_historical_weather_summary(lat, lon, start, end) return jsonify(profile) if __name__ == '__main__': app.run(debug=True, port=5001)