Spaces:
Sleeping
Sleeping
| import requests | |
| import json | |
| from flask import Flask, render_template, request, jsonify, Response, send_file | |
| from google import genai | |
| from gtts import gTTS | |
| import os | |
| from dotenv import load_dotenv | |
| from datetime import datetime, timedelta | |
# Pull variables from a local .env file into the process environment
# before anything below reads them.
load_dotenv()

# Flask application object; generated audio files are written under
# AUDIO_FOLDER, which is created on import if it does not exist yet.
app = Flask(__name__)
app.config.update(AUDIO_FOLDER='static/audio')
os.makedirs(app.config['AUDIO_FOLDER'], exist_ok=True)
def markdown_to_html(text):
    """Render markdown *text* as HTML; falsy input is returned unchanged.

    The ``nl2br`` extension is enabled, so single newlines become line breaks.
    """
    if text:
        # Imported here, after the guard, so the package is only touched
        # when there is actually something to render.
        from markdown import markdown as render_markdown
        return render_markdown(text, extensions=['nl2br'])
    return text
# --- ROBUST API KEY DETECTION ---
def get_api_key(service_type):
    """
    Find an API key for *service_type* by scanning several environment variables.

    Every candidate variable is checked regardless of its name, and a value is
    accepted purely by its prefix:
    - service_type="gemini": value starts with "AIza"
    - service_type="nvidia": value starts with "nvapi-"

    Surrounding whitespace (e.g. a trailing newline from a pasted secret) is
    stripped before the prefix test and from the returned key.

    Returns the first matching key, or None when nothing matches or the
    service type is not one of the two above.
    """
    # service -> (required key prefix, label used in the log line)
    key_shapes = {
        "gemini": ("AIza", "Gemini"),
        "nvidia": ("nvapi-", "NVIDIA"),
    }
    shape = key_shapes.get(service_type) if isinstance(service_type, str) else None
    if shape is None:
        return None
    prefix, label = shape

    potential_vars = (
        "GEMINI_API_KEY", "GEMINI_API_KEY_1", "GEMINI_API_KEY_2", "GEMINI_API",
        "NVIDIA_API_KEY", "NVIDIA_API",
    )
    for var_name in potential_vars:
        val = (os.getenv(var_name) or "").strip()
        if val.startswith(prefix):
            # Only the variable name is logged, never the key itself.
            print(f"DEBUG: Found {label} Key in variable '{var_name}'")
            return val
    return None
# Initialize Clients
# Both keys are resolved once at import time. `nvidia_key` is only stored
# here; the Gemini client is built eagerly and stays None when no key is
# found or construction fails, so callers must check it before use.
gemini_key = get_api_key("gemini")
nvidia_key = get_api_key("nvidia")
gemini_client = None
if gemini_key:
    # Do not echo any part of the secret: log output is routinely shared,
    # and even a key prefix is sensitive material.
    print("Initializing Gemini Client (API key found; value not logged)...")
    try:
        gemini_client = genai.Client(api_key=gemini_key)
    except Exception as e:
        print(f"Error initializing Gemini Client: {e}")
else:
    print("WARNING: No valid Gemini API Key (starting with 'AIza') found in environment variables.")
def validate_coordinates(lat, lon):
    """Validate and convert latitude and longitude to float.

    Returns ``(lat, lon)`` as floats when both values parse and lie within
    the geographic ranges (latitude -90..90, longitude -180..180).
    Returns ``(None, None)`` for missing, non-numeric, NaN, infinite or
    out-of-range input.
    """
    try:
        lat_value, lon_value = float(lat), float(lon)
    except (TypeError, ValueError):
        return None, None
    # Chained comparisons are False for NaN and for +/-inf, so this single
    # check rejects non-finite values as well as out-of-range ones.
    if -90.0 <= lat_value <= 90.0 and -180.0 <= lon_value <= 180.0:
        return lat_value, lon_value
    return None, None
def index():
    """Render the ``index.html`` template and return the resulting page."""
    # NOTE(review): no route decorator is visible on this handler in this
    # view of the file -- confirm how it is registered with the app.
    page = render_template('index.html')
    return page
def _mean_of_present(values):
    """Average the non-None entries of *values*; None when there are none."""
    present = [v for v in (values or []) if v is not None]
    return sum(present) / len(present) if present else None


def _first_or_none(values):
    """Return the first element of *values*, or None for an empty/missing list."""
    return values[0] if values else None


def get_weather_data():
    """
    Fetch weather data using Open-Meteo's forecast endpoint.

    Reads ``lat`` and ``lon`` from the query string and responds with JSON
    holding: the first daily max/min temperature and precipitation sum, the
    current temperature and wind speed, and the mean of the hourly humidity,
    soil-moisture and cloud-cover series. Null hourly entries are skipped
    when averaging; a metric with no usable values is reported as null.

    Responses: 400 with ``{"error": "Invalid coordinates"}`` when the
    coordinates do not validate, 500 with ``{"error": <message>}`` on any
    failure while fetching or processing.

    NOTE(review): no route decorator is visible on this handler in this view
    of the file -- confirm how it is registered with the app.
    """
    lat, lon = validate_coordinates(request.args.get('lat'), request.args.get('lon'))
    if lat is None or lon is None:
        return jsonify({"error": "Invalid coordinates"}), 400
    try:
        forecast_url = "https://api.open-meteo.com/v1/forecast"
        forecast_params = {
            "latitude": lat,
            "longitude": lon,
            "current_weather": "true",
            "daily": "temperature_2m_max,temperature_2m_min,precipitation_sum",
            "hourly": "relative_humidity_2m,soil_moisture_3_to_9cm,cloudcover,windspeed_10m",
            "timezone": "auto",
        }
        # Without a timeout a stalled upstream would block this worker forever.
        resp = requests.get(forecast_url, params=forecast_params, timeout=15)
        resp.raise_for_status()
        data = resp.json()

        # `or {}` also covers sections that are present but null.
        daily = data.get("daily") or {}
        hourly = data.get("hourly") or {}
        current = data.get("current_weather") or {}

        weather = {
            # Daily data: first entry of each series.
            "max_temp": _first_or_none(daily.get("temperature_2m_max")),
            "min_temp": _first_or_none(daily.get("temperature_2m_min")),
            "rainfall": _first_or_none(daily.get("precipitation_sum")),
            # Hourly data: averages over the values that are actually present.
            "humidity": _mean_of_present(hourly.get("relative_humidity_2m")),
            "soil_moisture": _mean_of_present(hourly.get("soil_moisture_3_to_9cm")),
            # Current weather
            "current_temp": current.get("temperature"),
            "wind_speed": current.get("windspeed"),
            "cloud_cover": _mean_of_present(hourly.get("cloudcover")),
        }
        return jsonify(weather)
    except Exception as e:
        # Handler boundary: report the failure as JSON instead of a bare 500 page.
        return jsonify({"error": str(e)}), 500
def _one_year_earlier(moment):
    """Return *moment* moved back one calendar year (Feb 29 becomes Feb 28)."""
    try:
        return moment.replace(year=moment.year - 1)
    except ValueError:
        # Only Feb 29 can fail here: the year before a leap year has no such day.
        return moment.replace(year=moment.year - 1, day=28)


def _mean_or_zero(values):
    """Average the non-None entries of *values*; 0 when none are present."""
    present = [v for v in values if v is not None]
    return sum(present) / len(present) if present else 0


def _value_at(series, index):
    """Return ``series[index]``, or None when the series is too short."""
    return series[index] if index < len(series) else None


def get_historical_weather_summary(lat, lon, start_date_str, end_date_str):
    """
    Fetch daily weather from the Open-Meteo Archive API and aggregate it per month.

    Args:
        lat, lon: coordinates, passed through to the API as given.
        start_date_str, end_date_str: period bounds in ``YYYY-MM-DD`` format.

    If the start date is in the future, both dates are moved back one year
    and a proxy note is added to the summary. An end date that is still in
    the future is clipped to today.

    Returns:
        ``(summary_text, structured_data)``. ``summary_text`` has one line per
        month; ``structured_data`` is a list of dicts with keys ``month``,
        ``avg_temp`` (mean of daily max temperature), ``rainfall`` (monthly
        total), ``humidity`` (mean) and ``days`` (per-day records). Averages
        skip missing readings; the per-day records still report a missing
        reading as 0, as before. On any failure a short explanatory message
        and an empty list are returned -- this function does not raise.
    """
    try:
        if not start_date_str or not end_date_str:
            return "Weather data unavailable (dates missing).", []
        start = datetime.strptime(start_date_str, '%Y-%m-%d')
        end = datetime.strptime(end_date_str, '%Y-%m-%d')
        today = datetime.now()

        # If start date is in future, use last year's data as proxy.
        is_proxy = False
        if start > today:
            start = _one_year_earlier(start)
            end = _one_year_earlier(end)
            is_proxy = True
        # Clip end date if it's still in the future after adjustment.
        if end > today:
            end = today

        # Call Open-Meteo Archive
        archive_url = "https://archive-api.open-meteo.com/v1/archive"
        params = {
            "latitude": lat,
            "longitude": lon,
            "start_date": start.strftime('%Y-%m-%d'),
            "end_date": end.strftime('%Y-%m-%d'),
            "daily": "temperature_2m_max,temperature_2m_min,precipitation_sum,relative_humidity_2m_mean",
            "timezone": "auto",
        }
        # Without a timeout a stalled upstream would block the caller forever.
        resp = requests.get(archive_url, params=params, timeout=30)
        if resp.status_code != 200:
            print(f"DEBUG: API Failed {resp.status_code} - {resp.text}")
            return f"Could not fetch weather data: {resp.status_code}", []

        daily = resp.json().get("daily") or {}
        dates = daily.get("time") or []
        print(f"DEBUG: Retrieved {len(dates)} days.")
        if not dates:
            return "No weather data available for this range.", []
        print(f"DEBUG: Range {dates[0]} to {dates[-1]}")

        max_temps = daily.get("temperature_2m_max") or []
        humidities = daily.get("relative_humidity_2m_mean") or []
        rains = daily.get("precipitation_sum") or []

        # Group consecutive days that share a month label, keeping API order.
        months = []  # list of (month_label, [raw day rows])
        for i, d_str in enumerate(dates):
            label = datetime.strptime(d_str, '%Y-%m-%d').strftime('%B %Y')
            if not months or months[-1][0] != label:
                months.append((label, []))
            months[-1][1].append({
                "date": d_str,
                "temp": _value_at(max_temps, i),
                "humidity": _value_at(humidities, i),
                "rain": _value_at(rains, i),
            })

        summary_parts = []
        structured_data = []
        if is_proxy:
            summary_parts.append("(Note: Using last year's weather as proxy for future dates)")

        for label, rows in months:
            # Missing readings are left out of the averages rather than
            # counted as 0, which would drag the monthly figures down.
            avg_t = _mean_or_zero([row["temp"] for row in rows])
            avg_h = _mean_or_zero([row["humidity"] for row in rows])
            rain_sum = sum(row["rain"] for row in rows if row["rain"] is not None)
            summary_parts.append(
                f"{label}: Avg Temp {avg_t:.1f}C, Rain {rain_sum:.1f}mm, Humidity {avg_h:.1f}%"
            )
            structured_data.append({
                "month": label,
                "avg_temp": round(avg_t, 1),
                "rainfall": round(rain_sum, 1),
                "humidity": round(avg_h, 1),
                # Per-day records keep the historical 0 fallback so consumers
                # of this list never see a null value.
                "days": [
                    {key: (0 if row[key] is None else row[key])
                     for key in ("date", "temp", "humidity", "rain")}
                    for row in rows
                ],
            })
        return "\n".join(summary_parts), structured_data
    except Exception as e:
        # Best-effort helper: callers rely on a (message, []) result, not an exception.
        print(f"Weather Fetch Error: {e}")
        return "Weather data processing failed.", []
def calculate_season_dates(season_name):
    """
    Map an Indian agricultural season name to (sowing, harvest) date strings.

    Recognised names and their windows:
    - "Kharif": Jun 15 - Oct 15 (next year's window once the month is past October)
    - "Rabi":   Nov 1 - Apr 1 of the following year (the upcoming window when
                the month is past April, otherwise the one that began last November)
    - "Zaid":   Mar 1 - Jun 1 (next year's window once the month is past June)
    - "Annual": today through today + 365 days

    Any other name yields today through today + 120 days.
    Both dates are returned formatted as ``YYYY-MM-DD``.
    """
    now = datetime.today()
    this_year = now.year
    month = now.month

    # Fallback window, used for unrecognised season names.
    sow, reap = now, now + timedelta(days=120)

    try:
        if season_name == "Kharif":
            base = this_year + 1 if month > 10 else this_year
            sow, reap = datetime(base, 6, 15), datetime(base, 10, 15)
        elif season_name == "Rabi":
            # Crosses the year boundary: harvest falls in the year after sowing.
            base = this_year if month > 4 else this_year - 1
            sow, reap = datetime(base, 11, 1), datetime(base + 1, 4, 1)
        elif season_name == "Zaid":
            base = this_year + 1 if month > 6 else this_year
            sow, reap = datetime(base, 3, 1), datetime(base, 6, 1)
        elif season_name == "Annual":
            sow, reap = now, now + timedelta(days=365)
    except Exception as e:
        print(f"Date Calc Error: {e}")

    iso_day = '%Y-%m-%d'
    return sow.strftime(iso_day), reap.strftime(iso_day)
def _extract_json_object(raw_text):
    """Parse the outermost ``{...}`` span of *raw_text* into a non-empty dict.

    Returns None when the text is empty, contains no brace-delimited span,
    or the span decodes to an empty object. Raises ``json.JSONDecodeError``
    when the span is present but is not valid JSON.
    """
    if not raw_text:
        return None
    start_idx = raw_text.find('{')
    end_idx = raw_text.rfind('}') + 1
    if start_idx == -1 or end_idx <= start_idx:
        return None
    parsed = json.loads(raw_text[start_idx:end_idx])
    return parsed if isinstance(parsed, dict) and parsed else None


def call_gemini_api(input_data, language):
    """
    Calls the Gemini API to get a pest outbreak report in structured JSON format.

    Steps:
    1. Derive the analysis period from ``input_data['season']``, or from
       ``sowing_date`` / ``harvest_date`` (today / today + 120 days when those
       are missing or blank).
    2. Fetch the month-wise weather profile for that period.
    3. Try each Gemini model in order; the first reply that parses to a
       non-empty JSON object wins.
    4. If no Gemini model succeeds and an NVIDIA key is configured, try the
       NVIDIA-hosted models the same way.

    Args:
        input_data: mapping of form fields (crop, soil, season, coordinates, ...).
        language: language name the report values should be written in.

    Returns:
        ``(report_dict, weather_profile)`` on success, or
        ``({"error": <message>}, [])`` when every model fails.
    """
    # 1. Determine dates from season
    lat = input_data.get('latitude')
    lon = input_data.get('longitude')
    season = input_data.get('season')
    if season:
        sowing, harvest = calculate_season_dates(season)
    else:
        today = datetime.today()
        # `or` rather than a .get() default, so a blank field also falls back.
        sowing = input_data.get('sowing_date') or today.strftime('%Y-%m-%d')
        harvest = input_data.get('harvest_date') or (today + timedelta(days=120)).strftime('%Y-%m-%d')
    print(f"Analysis Period: {season} ({sowing} to {harvest})")

    # 2. Fetch historical/seasonal weather profile
    weather_summary, weather_profile = get_historical_weather_summary(lat, lon, sowing, harvest)
    print(f"Generated Weather Summary: {weather_summary[:100]}...")

    # The prompt body is runtime text sent to the model, so its lines are kept
    # flush-left exactly as authored rather than indented with the code.
    prompt = f"""
You are an expert Agricultural Entomologist. Analyze the provided inputs and the MONTH-WISE WEATHER PROFILE to generate a precise Pest Outbreak Prediction.
INPUTS:
- Crop: {input_data.get('crop_type')}
- Soil: {input_data.get('soil_type')}
- Season: {season} ({sowing} to {harvest})
- Location: {input_data.get('derived_location', 'Unknown')}
- Current Growth Stage: {input_data.get('growth_stage')}
- Irrigation Frequency: {input_data.get('irrigation_freq')}
- Irrigation Method: {input_data.get('irrigation_method')}
- Location Coordinates: Lat {lat}, Lon {lon}
WEATHER PROFILE (Month-by-Month):
{weather_summary}
CRITICAL INSTRUCTIONS:
1. ANALYZE EACH MONTH SEPARATELY. Example: High Rainfall in a month -> Risk of Fungal Diseases.
2. MULTIPLE PESTS: If a month has multiple risky pests, create SEPARATE entries for each pest in the table.
3. LANGUAGE RULE: JSON KEYS must remain in English. Only translate VALUES into '{language}'.
4. ACCURACY: Only predict pests that genuinely thrive in the given weather conditions.
5. MONTH NAMING: The 'outbreak_months' field MUST contain the exact full English month names (e.g., "June", "July").
Your response MUST be a single, valid JSON object and nothing else. Do not wrap it in markdown backticks.
Required JSON structure:
{{
"report_title": "Pest Outbreak Dashboard Report",
"location_info": {{
"latitude": "{lat}",
"longitude": "{lon}",
"derived_location": "A human-readable location derived from the coordinates (e.g., 'Nagpur, India')."
}},
"agricultural_inputs_analysis": "A detailed bullet-point analysis of how the chosen Season and Weather Profile impacts this specific crop.",
"pest_prediction_table": [
{{
"pest_name": "Name of the predicted pest",
"outbreak_months": "Predicted month(s) for the outbreak",
"severity": "Predicted severity level (e.g., Low, Medium, High)",
"impacting_stage": "The specific crop stage affected (e.g., Flowering, Vegetative)",
"potential_damage": "Short description of the damage caused.",
"precautionary_measures": "A short description of key precautionary measures."
}}
],
"pest_avoidance_practices": [
"A detailed, specific pest avoidance practice based on the inputs.",
"Provide 10-12 detailed bullet points."
],
"agricultural_best_practices": [
"A specific agricultural best practice based on the inputs.",
"Another specific recommendation related to crop management."
],
"predicted_pest_damage_info": "Detailed bullet points (markdown format using -) describing the potential damage the predicted pests could cause."
}}
"""
    models_to_try = [
        "gemini-2.5-flash",
        "gemini-2.5-flash-lite",
    ]

    # 3. Try Gemini models first
    if gemini_client:
        for model_name in models_to_try:
            try:
                print(f"DEBUG: Attempting Gemini model {model_name}...")
                response = gemini_client.models.generate_content(
                    model=model_name,
                    contents=prompt
                )
                report_data = _extract_json_object(response.text if response else None)
                if report_data:
                    print(f"DEBUG: Successfully parsed JSON from Gemini {model_name}")
                    return report_data, weather_profile  # SUCCESS
            except Exception as e:
                # Any failure (API error, bad JSON) just moves on to the next model.
                print(f"CRITICAL: Error calling Gemini {model_name}: {e}")
                continue
    else:
        print("DEBUG: Gemini client not initialized. Skipping Gemini models.")

    # 4. NVIDIA Fallback (reached only when no Gemini model produced a report)
    if nvidia_key:
        try:
            from openai import OpenAI
            nv_client = OpenAI(
                base_url="https://integrate.api.nvidia.com/v1",
                api_key=nvidia_key
            )
            nvidia_models = [
                "meta/llama-3.1-405b-instruct",
                "meta/llama-3.1-70b-instruct",
                "google/gemma-3-27b-it",
                "meta/llama-3.1-8b-instruct"
            ]
            for model_name in nvidia_models:
                try:
                    print(f"DEBUG: Attempting NVIDIA model {model_name}...")
                    response = nv_client.chat.completions.create(
                        model=model_name,
                        messages=[{"role": "user", "content": prompt}],
                        temperature=0.2,
                        max_tokens=2048
                    )
                    report_data = _extract_json_object(response.choices[0].message.content)
                    if report_data:
                        print(f"DEBUG: Successfully parsed JSON from NVIDIA {model_name}")
                        return report_data, weather_profile  # SUCCESS
                except Exception as ne:
                    print(f"CRITICAL: Error calling NVIDIA {model_name}: {ne}")
                    continue
        except Exception as e:
            print(f"CRITICAL: NVIDIA client setup failed: {e}")
    else:
        print("DEBUG: No valid report and NVIDIA key not found/valid.")

    print("DEBUG: All models failed to generate valid report.")
    return {
        "error": "Failed to generate a valid report from the AI model. All fallback models failed. Please try again later."
    }, []
def _report_text(value):
    """Coerce a model-supplied report field to markdown text.

    Strings pass through, None becomes "", a list/tuple becomes one "- item"
    bullet per element, and anything else is converted with ``str()``.
    """
    if value is None:
        return ""
    if isinstance(value, str):
        return value
    if isinstance(value, (list, tuple)):
        return "\n".join(f"- {item}" for item in value)
    return str(value)


def _speech_snippet(markdown_text, limit=200):
    """Flatten markdown into plain words for speech, cut to *limit* characters."""
    cleaned = markdown_text
    for marker in ("*", "#", "`"):
        cleaned = cleaned.replace(marker, " ")
    return " ".join(cleaned.split())[:limit]


def _filename_token(value):
    """Make *value* safe inside a file name: keep alphanumerics, '-' and '_'."""
    return "".join(ch if ch.isalnum() or ch in "-_" else "_" for ch in str(value))


def predict():
    """
    Handle a prediction request: generate the report, its audio summary and the page.

    Reads the submitted form, asks ``call_gemini_api`` for a report in the
    requested language, converts the markdown report fields to HTML, renders a
    short spoken summary with gTTS into the audio folder, and returns the
    rendered ``results.html``. When the report carries an ``error`` key, the
    same template is rendered with an error report and no audio.

    NOTE(review): no route decorator is visible on this handler in this view
    of the file -- confirm how it is registered with the app.
    """
    print("----- PREDICT ROUTE HIT -----")
    form_data = request.form.to_dict()
    print(f"Form Data Received: {form_data}")
    language = form_data.get("language", "English")
    report_data, weather_profile = call_gemini_api(form_data, language)

    # Handle error from AI call
    if "error" in report_data:
        return render_template(
            'results.html',
            report_data={"report_title": "Error", "predicted_pest_damage_info": report_data['error']},
            location={},
            current_date=datetime.now().strftime("%B %d, %Y"),
            audio_url=None,
            language_code="en",
            weather_profile=[]
        )

    # The report comes from a language model, so its shape is not guaranteed:
    # normalise the pieces used below instead of trusting them.
    location = report_data.get('location_info')
    if not isinstance(location, dict):
        location = {}
    raw_table = report_data.get('pest_prediction_table')
    pest_table = [row for row in raw_table if isinstance(row, dict)] if isinstance(raw_table, list) else []
    analysis_md = _report_text(report_data.get('agricultural_inputs_analysis'))
    damage_md = _report_text(report_data.get('predicted_pest_damage_info'))

    # Build the TTS summary from the raw text BEFORE the markdown -> HTML
    # conversion below; otherwise HTML tags end up in the spoken text.
    summary = f"Pest Outbreak Report for {location.get('derived_location', 'your location')}. "
    summary += _speech_snippet(analysis_md) + "... "
    if pest_table:
        summary += "Predicted pests: " + ', '.join(str(p.get('pest_name') or '') for p in pest_table) + ". "
        summary += "Severity: " + ', '.join(str(p.get('severity') or '') for p in pest_table) + ". "
    summary += _speech_snippet(damage_md)

    # Convert markdown fields to HTML
    # NOTE(review): model output is turned into HTML here; presumably the
    # template renders it unescaped -- confirm and consider sanitising.
    if 'agricultural_inputs_analysis' in report_data:
        report_data['agricultural_inputs_analysis'] = markdown_to_html(analysis_md)
    if 'predicted_pest_damage_info' in report_data:
        report_data['predicted_pest_damage_info'] = markdown_to_html(damage_md)
    if 'pest_prediction_table' in report_data:
        report_data['pest_prediction_table'] = pest_table
    for pest in pest_table:
        if 'precautionary_measures' in pest:
            pest['precautionary_measures'] = markdown_to_html(_report_text(pest['precautionary_measures']))

    # Language code mapping for gTTS
    lang_mapping = {
        "English": "en", "Hindi": "hi", "Bengali": "bn", "Telugu": "te",
        "Marathi": "mr", "Tamil": "ta", "Gujarati": "gu", "Urdu": "ur",
        "Kannada": "kn", "Odia": "or", "Malayalam": "ml"
    }
    gtts_lang = lang_mapping.get(language, 'en')

    audio_url = None
    try:
        tts = gTTS(summary, lang=gtts_lang)
        safe_lat = _filename_token(location.get('latitude', '0'))
        safe_lon = _filename_token(location.get('longitude', '0'))
        # The language code is part of the name so reports for the same place
        # in different languages do not share (and overwrite) one file/URL.
        audio_filename = f"pest_report_{safe_lat}_{safe_lon}_{gtts_lang}.mp3"
        os.makedirs(app.config['AUDIO_FOLDER'], exist_ok=True)
        audio_path = os.path.join(app.config['AUDIO_FOLDER'], audio_filename)
        tts.save(audio_path)
        audio_url = f"/static/audio/{audio_filename}"
    except Exception as e:
        # Audio is best-effort: the report page is still rendered without it.
        print(f"Error generating audio: {e}")

    return render_template(
        'results.html',
        report_data=report_data,
        location=location,
        current_date=datetime.now().strftime("%B %d, %Y"),
        audio_url=audio_url,
        language_code=gtts_lang,
        weather_profile=weather_profile
    )
def get_timeline_weather():
    """Returns the seasonal weather profile for the frontend timeline.

    Query parameters: ``lat``, ``lon``, ``start_date``, ``end_date``.
    Responds with the structured month list from
    ``get_historical_weather_summary`` as JSON, or with a 400 JSON error when
    a parameter is missing or the coordinates do not validate.

    NOTE(review): no route decorator is visible on this handler in this view
    of the file -- confirm how it is registered with the app.
    """
    lat = request.args.get('lat')
    lon = request.args.get('lon')
    start = request.args.get('start_date')
    end = request.args.get('end_date')
    if not all([lat, lon, start, end]):
        return jsonify({"error": "Missing parameters"}), 400
    # Same coordinate check as the current-weather handler, so bad input is
    # rejected here instead of being forwarded to the upstream API.
    lat, lon = validate_coordinates(lat, lon)
    if lat is None or lon is None:
        return jsonify({"error": "Invalid coordinates"}), 400
    _, profile = get_historical_weather_summary(lat, lon, start, end)
    return jsonify(profile)
if __name__ == '__main__':
    # Debug mode turns on the interactive debugger, which lets anyone who can
    # reach the server run code on it, so it is opt-in via FLASK_DEBUG
    # instead of being hard-coded on.
    debug_enabled = os.getenv("FLASK_DEBUG", "").strip().lower() in ("1", "true", "yes", "on")
    app.run(debug=debug_enabled, port=5001)