# app.py -- last change: "Update app.py" by PRC142004 (commit b0c54af, verified)
import requests
import json
from flask import Flask, render_template, request, jsonify, Response, send_file
from google import genai
from gtts import gTTS
import os
from dotenv import load_dotenv
from datetime import datetime, timedelta
# Load .env file so the os.getenv lookups further down can see its variables.
load_dotenv()
# Flask application object; the @app.route decorators below register on it.
app = Flask(__name__)
# Folder that receives the generated text-to-speech MP3 files. The /predict
# route builds its "/static/audio/<file>" URLs to match this location.
app.config['AUDIO_FOLDER'] = 'static/audio'
# Create the folder at import time; exist_ok keeps repeated starts from failing.
os.makedirs(app.config['AUDIO_FOLDER'], exist_ok=True)
def markdown_to_html(text):
    """Render a Markdown string as HTML (with the ``nl2br`` extension enabled).

    Falsy input such as ``None`` or ``""`` is handed back unchanged, so
    optional report fields can be passed straight through.
    """
    if text:
        # Imported lazily: only needed when there is something to convert.
        from markdown import markdown as render_markdown
        return render_markdown(text, extensions=['nl2br'])
    return text
# --- ROBUST API KEY DETECTION ---
def get_api_key(service_type):
    """
    Return the first environment value that looks like a key for the service.

    All candidate variables are inspected whatever their name, so a key
    stored under an unexpected variable is still picked up.
    - service_type="gemini": the value must start with "AIza"
    - service_type="nvidia": the value must start with "nvapi-"
    Any other service_type, or no matching value, yields None.
    """
    # Work out once which prefix identifies the requested service.
    if service_type == "gemini":
        prefix, label = "AIza", "Gemini"
    elif service_type == "nvidia":
        prefix, label = "nvapi-", "NVIDIA"
    else:
        return None

    candidates = (
        "GEMINI_API_KEY", "GEMINI_API_KEY_1", "GEMINI_API_KEY_2", "GEMINI_API",
        "NVIDIA_API_KEY", "NVIDIA_API",
    )
    for env_name in candidates:
        candidate = os.getenv(env_name)
        if candidate and candidate.startswith(prefix):
            print(f"DEBUG: Found {label} Key in variable '{env_name}'")
            return candidate
    return None
# Initialize Clients
# Keys are discovered by prefix (see get_api_key); either may be None.
gemini_key = get_api_key("gemini")
nvidia_key = get_api_key("nvidia")
# Stays None when no key is available or the client cannot be constructed;
# call sites check for that and skip the Gemini models.
gemini_client = None
if gemini_key:
    # Log only that a key exists. Even a prefix of the secret does not belong
    # in application logs.
    print("Initializing Gemini Client (API key found).")
    try:
        gemini_client = genai.Client(api_key=gemini_key)
    except Exception as e:
        # Best effort: the app still starts and can fall back to NVIDIA models.
        print(f"Error initializing Gemini Client: {e}")
else:
    print("WARNING: No valid Gemini API Key (starting with 'AIza') found in environment variables.")
def validate_coordinates(lat, lon):
    """Parse latitude/longitude and check that they describe a real location.

    Args:
        lat: Latitude as a number or numeric string.
        lon: Longitude as a number or numeric string.

    Returns:
        ``(lat, lon)`` as floats when both values parse and lie within
        [-90, 90] and [-180, 180] respectively; ``(None, None)`` otherwise.
    """
    try:
        lat_value, lon_value = float(lat), float(lon)
    except (TypeError, ValueError, OverflowError):
        return None, None
    # The chained comparisons are False for NaN and +/-inf, so those inputs
    # are rejected here too instead of being forwarded to the weather API.
    if not (-90.0 <= lat_value <= 90.0 and -180.0 <= lon_value <= 180.0):
        return None, None
    return lat_value, lon_value
@app.route('/')
def index():
    """Serve the landing page rendered from the ``index.html`` template."""
    landing_page = render_template('index.html')
    return landing_page
def _first_or_none(values):
    """Return the first item of *values*, or None when it is empty or missing."""
    return values[0] if values else None


def _mean_of_present(values):
    """Average the non-None entries of *values*; None when there are none."""
    present = [v for v in (values or []) if v is not None]
    return sum(present) / len(present) if present else None


@app.route('/get_weather_data', methods=['GET'])
def get_weather_data():
    """
    Fetch weather data using Open-Meteo's forecast endpoint.

    Query parameters:
        lat, lon: coordinates, checked with validate_coordinates.

    Returns a JSON object with:
        max_temp, min_temp, rainfall  - first entry of each daily series
        humidity, soil_moisture, cloud_cover - mean over the whole hourly
            series returned, ignoring null readings
        current_temp, wind_speed - taken from the "current_weather" object
    Any value the upstream response lacks is reported as null.

    Responds 400 for invalid coordinates and 500 (with the error text) when
    the upstream call or its processing fails.
    """
    lat, lon = validate_coordinates(request.args.get('lat'), request.args.get('lon'))
    if lat is None or lon is None:
        return jsonify({"error": "Invalid coordinates"}), 400
    try:
        forecast_url = "https://api.open-meteo.com/v1/forecast"
        forecast_params = {
            "latitude": lat,
            "longitude": lon,
            "current_weather": "true",
            "daily": "temperature_2m_max,temperature_2m_min,precipitation_sum",
            "hourly": "relative_humidity_2m,soil_moisture_3_to_9cm,cloudcover,windspeed_10m",
            "timezone": "auto"
        }
        # Without a timeout a stalled upstream would block this worker forever.
        resp = requests.get(forecast_url, params=forecast_params, timeout=15)
        resp.raise_for_status()
        data = resp.json()
        # "or {}" also covers keys that are present but null.
        daily = data.get("daily") or {}
        hourly = data.get("hourly") or {}
        current = data.get("current_weather") or {}

        weather = {
            # Daily data: first entry of each series
            "max_temp": _first_or_none(daily.get("temperature_2m_max")),
            "min_temp": _first_or_none(daily.get("temperature_2m_min")),
            "rainfall": _first_or_none(daily.get("precipitation_sum")),
            # Hourly data (averages); null readings are skipped, not summed
            "humidity": _mean_of_present(hourly.get("relative_humidity_2m")),
            "soil_moisture": _mean_of_present(hourly.get("soil_moisture_3_to_9cm")),
            # Current weather
            "current_temp": current.get("temperature"),
            "wind_speed": current.get("windspeed"),
            "cloud_cover": _mean_of_present(hourly.get("cloudcover"))
        }
        return jsonify(weather)
    except Exception as e:
        # Route boundary: report the failure to the client instead of crashing.
        return jsonify({"error": str(e)}), 500
def _years_back(moment, years):
    """Return *moment* moved back *years* calendar years (29 Feb becomes 28 Feb)."""
    try:
        return moment.replace(year=moment.year - years)
    except ValueError:
        # For an in-range year only 29 February can fail; use the 28th instead.
        return moment.replace(year=moment.year - years, day=28)


def _reading_at(series, index):
    """Return series[index], or None when the series is too short."""
    return series[index] if index < len(series) else None


def _month_summary(label, bucket):
    """Build the summary line and the structured record for one month."""
    temps, hums, rain_total = bucket["temps"], bucket["hums"], bucket["rain"]
    avg_t = sum(temps) / len(temps) if temps else 0
    avg_h = sum(hums) / len(hums) if hums else 0
    line = f"{label}: Avg Temp {avg_t:.1f}C, Rain {rain_total:.1f}mm, Humidity {avg_h:.1f}%"
    record = {
        "month": label,
        "avg_temp": round(avg_t, 1),
        "rainfall": round(rain_total, 1),
        "humidity": round(avg_h, 1),
        "days": bucket["days"]
    }
    return line, record


def get_historical_weather_summary(lat, lon, start_date_str, end_date_str):
    """
    Fetches historical weather data from Open-Meteo Archive for the specified period.

    If the period starts in the future it is shifted back by whole years until
    the start is no longer in the future; an end date that is still in the
    future afterwards is clipped to today.

    Args:
        lat, lon: coordinates forwarded to the archive API.
        start_date_str, end_date_str: period bounds as 'YYYY-MM-DD' strings.

    Returns:
        (summary_text, structured_data)
        - summary_text: one line per month with average max temperature,
          total rainfall and average humidity (plus a note when a past year
          is used as proxy), or a short failure message.
        - structured_data: list of per-month dicts with keys "month",
          "avg_temp", "rainfall", "humidity" and "days" (per-day records);
          empty on any failure.
        Monthly averages ignore missing readings. Per-day records keep the
        0 placeholder for a missing reading so the output shape is unchanged.

    Never raises: every error is logged and reported through the return value.
    """
    try:
        if not start_date_str or not end_date_str:
            return "Weather data unavailable (dates missing).", []
        start = datetime.strptime(start_date_str, '%Y-%m-%d')
        end = datetime.strptime(end_date_str, '%Y-%m-%d')
        today = datetime.now()

        # If start date is in future, use a past year's data as proxy
        is_proxy = False
        if start > today:
            # One year back is not always enough (start may be several years
            # ahead); keep going until the start is actually in the past.
            years = 1
            while _years_back(start, years) > today:
                years += 1
            start = _years_back(start, years)
            end = _years_back(end, years)
            is_proxy = True
        # Clip end date if it's still in the future after adjustment
        if end > today:
            end = today

        # Call Open-Meteo Archive
        archive_url = "https://archive-api.open-meteo.com/v1/archive"
        params = {
            "latitude": lat,
            "longitude": lon,
            "start_date": start.strftime('%Y-%m-%d'),
            "end_date": end.strftime('%Y-%m-%d'),
            "daily": "temperature_2m_max,temperature_2m_min,precipitation_sum,relative_humidity_2m_mean",
            "timezone": "auto"
        }
        # Without a timeout a stalled upstream would block the caller forever.
        resp = requests.get(archive_url, params=params, timeout=30)
        if resp.status_code != 200:
            print(f"DEBUG: API Failed {resp.status_code} - {resp.text}")
            return f"Could not fetch weather data: {resp.status_code}", []
        data = resp.json()
        daily = data.get("daily") or {}
        dates = daily.get("time") or []
        print(f"DEBUG: Retrieved {len(dates)} days.")
        if not dates:
            return "No weather data available for this range.", []
        print(f"DEBUG: Range {dates[0]} to {dates[-1]}")

        summary_parts = []
        structured_data = []
        if is_proxy:
            summary_parts.append("(Note: Using last year's weather as proxy for future dates)")

        max_temps = daily.get("temperature_2m_max") or []
        humidities = daily.get("relative_humidity_2m_mean") or []
        rains = daily.get("precipitation_sum") or []

        # Monthly aggregation. Dicts keep insertion order, and the API returns
        # one contiguous date range, so months come out chronologically.
        months = {}
        for i, d_str in enumerate(dates):
            label = datetime.strptime(d_str, '%Y-%m-%d').strftime('%B %Y')
            bucket = months.setdefault(
                label, {"temps": [], "hums": [], "rain": 0, "days": []}
            )
            t = _reading_at(max_temps, i)
            h = _reading_at(humidities, i)
            r = _reading_at(rains, i)
            # Only real readings feed the aggregates: counting a missing
            # value as 0 would drag the monthly averages down.
            if t is not None:
                bucket["temps"].append(t)
            if h is not None:
                bucket["hums"].append(h)
            if r is not None:
                bucket["rain"] += r
            bucket["days"].append({
                "date": d_str,
                "temp": t if t is not None else 0,
                "humidity": h if h is not None else 0,
                "rain": r if r is not None else 0
            })

        for label, bucket in months.items():
            line, record = _month_summary(label, bucket)
            summary_parts.append(line)
            structured_data.append(record)
        return "\n".join(summary_parts), structured_data
    except Exception as e:
        # Deliberate best effort: callers always get a (message, list) pair.
        print(f"Weather Fetch Error: {e}")
        return "Weather data processing failed.", []
def calculate_season_dates(season_name, today=None):
    """
    Derives standard sowing and harvest dates based on the Indian agricultural season.

    Args:
        season_name: "Kharif", "Rabi", "Zaid" or "Annual". Any other value
            falls back to a 120-day window starting at the reference date.
        today: reference datetime used to pick the season's year. Defaults
            to the current date; pass a fixed value for reproducible results.

    Returns:
        (sowing_date, harvest_date) as 'YYYY-MM-DD' strings.
    """
    if today is None:
        today = datetime.today()
    year = today.year

    if season_name == "Kharif":
        # June 15 to Oct 15; from November on, use next year's season.
        if today.month > 10:
            year += 1
        sowing_date = datetime(year, 6, 15)
        harvest_date = datetime(year, 10, 15)
    elif season_name == "Rabi":
        # Nov 1 to April 1 (crosses year boundary). From May on this is the
        # upcoming season; January-April belong to the season that started
        # the previous November.
        sowing_year = year if today.month > 4 else year - 1
        sowing_date = datetime(sowing_year, 11, 1)
        harvest_date = datetime(sowing_year + 1, 4, 1)
    elif season_name == "Zaid":
        # March 1 to June 1; from July on, use next year's season.
        if today.month > 6:
            year += 1
        sowing_date = datetime(year, 3, 1)
        harvest_date = datetime(year, 6, 1)
    elif season_name == "Annual":
        # 1-year cycle from the reference date
        sowing_date = today
        harvest_date = today + timedelta(days=365)
    else:
        # Defaults for an unknown or missing season
        sowing_date = today
        harvest_date = today + timedelta(days=120)

    return sowing_date.strftime('%Y-%m-%d'), harvest_date.strftime('%Y-%m-%d')
def _extract_json_object(raw_text):
    """Decode the outermost ``{...}`` span found in *raw_text*.

    Returns the decoded dict, or None when the text is empty, has no
    brace-delimited span, or the span does not decode to a dict.
    Raises ValueError (json.JSONDecodeError) when the span is not valid JSON.
    """
    if not raw_text:
        return None
    start_idx = raw_text.find('{')
    end_idx = raw_text.rfind('}') + 1
    if start_idx == -1 or end_idx <= start_idx:
        return None
    parsed = json.loads(raw_text[start_idx:end_idx])
    return parsed if isinstance(parsed, dict) else None


def call_gemini_api(input_data, language):
    """
    Calls the Gemini API to get a pest outbreak report in structured JSON format.
    Implements fallback mechanism to try multiple models if primary fails,
    then falls back to NVIDIA models if all Gemini attempts fail.

    Args:
        input_data: dict of form fields. Reads 'latitude', 'longitude',
            'season' (or 'sowing_date' / 'harvest_date' when no season is
            given), 'crop_type', 'soil_type', 'derived_location',
            'growth_stage', 'irrigation_freq' and 'irrigation_method'.
        language: language name the model is told to write values in.

    Returns:
        (report_data, weather_profile) from the first model whose reply
        contains a non-empty JSON object, or
        ({"error": <message>}, []) when every model fails.
    """
    # 1. Determine dates from season
    lat = input_data.get('latitude')
    lon = input_data.get('longitude')
    season = input_data.get('season')
    if season:
        sowing, harvest = calculate_season_dates(season)
    else:
        sowing = input_data.get('sowing_date', datetime.today().strftime('%Y-%m-%d'))
        harvest = input_data.get('harvest_date', (datetime.today() + timedelta(days=120)).strftime('%Y-%m-%d'))
    print(f"Analysis Period: {season} ({sowing} to {harvest})")

    # 2. Fetch historical/seasonal weather profile
    weather_summary, weather_profile = get_historical_weather_summary(lat, lon, sowing, harvest)
    print(f"Generated Weather Summary: {weather_summary[:100]}...")

    prompt = f"""
You are an expert Agricultural Entomologist. Analyze the provided inputs and the MONTH-WISE WEATHER PROFILE to generate a precise Pest Outbreak Prediction.
INPUTS:
- Crop: {input_data.get('crop_type')}
- Soil: {input_data.get('soil_type')}
- Season: {season} ({sowing} to {harvest})
- Location: {input_data.get('derived_location', 'Unknown')}
- Current Growth Stage: {input_data.get('growth_stage')}
- Irrigation Frequency: {input_data.get('irrigation_freq')}
- Irrigation Method: {input_data.get('irrigation_method')}
- Location Coordinates: Lat {lat}, Lon {lon}
WEATHER PROFILE (Month-by-Month):
{weather_summary}
CRITICAL INSTRUCTIONS:
1. ANALYZE EACH MONTH SEPARATELY. Example: High Rainfall in a month -> Risk of Fungal Diseases.
2. MULTIPLE PESTS: If a month has multiple risky pests, create SEPARATE entries for each pest in the table.
3. LANGUAGE RULE: JSON KEYS must remain in English. Only translate VALUES into '{language}'.
4. ACCURACY: Only predict pests that genuinely thrive in the given weather conditions.
5. MONTH NAMING: The 'outbreak_months' field MUST contain the exact full English month names (e.g., "June", "July").
Your response MUST be a single, valid JSON object and nothing else. Do not wrap it in markdown backticks.
Required JSON structure:
{{
"report_title": "Pest Outbreak Dashboard Report",
"location_info": {{
"latitude": "{lat}",
"longitude": "{lon}",
"derived_location": "A human-readable location derived from the coordinates (e.g., 'Nagpur, India')."
}},
"agricultural_inputs_analysis": "A detailed bullet-point analysis of how the chosen Season and Weather Profile impacts this specific crop.",
"pest_prediction_table": [
{{
"pest_name": "Name of the predicted pest",
"outbreak_months": "Predicted month(s) for the outbreak",
"severity": "Predicted severity level (e.g., Low, Medium, High)",
"impacting_stage": "The specific crop stage affected (e.g., Flowering, Vegetative)",
"potential_damage": "Short description of the damage caused.",
"precautionary_measures": "A short description of key precautionary measures."
}}
],
"pest_avoidance_practices": [
"A detailed, specific pest avoidance practice based on the inputs.",
"Provide 10-12 detailed bullet points."
],
"agricultural_best_practices": [
"A specific agricultural best practice based on the inputs.",
"Another specific recommendation related to crop management."
],
"predicted_pest_damage_info": "Detailed bullet points (markdown format using -) describing the potential damage the predicted pests could cause."
}}
"""
    models_to_try = [
        "gemini-2.5-flash",
        "gemini-2.5-flash-lite",
    ]

    # 3. Try Gemini models first
    if gemini_client:
        for model_name in models_to_try:
            try:
                print(f"DEBUG: Attempting Gemini model {model_name}...")
                response = gemini_client.models.generate_content(
                    model=model_name,
                    contents=prompt
                )
                report_data = _extract_json_object(response.text if response else None)
                if report_data:
                    print(f"DEBUG: Successfully parsed JSON from Gemini {model_name}")
                    return report_data, weather_profile  # SUCCESS
                # An empty or brace-less reply is a failure too; say so
                # before moving on to the next model.
                print(f"DEBUG: Gemini {model_name} returned no usable JSON object.")
            except Exception as e:
                print(f"CRITICAL: Error calling Gemini {model_name}: {e}")
    else:
        print("DEBUG: Gemini client not initialized. Skipping Gemini models.")

    # 4. NVIDIA Fallback (only reached when no Gemini model produced a report)
    if nvidia_key:
        nv_client = None
        try:
            # Imported lazily so the openai package is only required when
            # the fallback is actually used.
            from openai import OpenAI
            nv_client = OpenAI(
                base_url="https://integrate.api.nvidia.com/v1",
                api_key=nvidia_key
            )
        except Exception as e:
            print(f"CRITICAL: NVIDIA client setup failed: {e}")

        if nv_client is not None:
            nvidia_models = [
                "meta/llama-3.1-405b-instruct",
                "meta/llama-3.1-70b-instruct",
                "google/gemma-3-27b-it",
                "meta/llama-3.1-8b-instruct"
            ]
            for model_name in nvidia_models:
                try:
                    print(f"DEBUG: Attempting NVIDIA model {model_name}...")
                    response = nv_client.chat.completions.create(
                        model=model_name,
                        messages=[{"role": "user", "content": prompt}],
                        temperature=0.2,
                        max_tokens=2048
                    )
                    report_data = _extract_json_object(response.choices[0].message.content)
                    if report_data:
                        print(f"DEBUG: Successfully parsed JSON from NVIDIA {model_name}")
                        return report_data, weather_profile  # SUCCESS
                    print(f"DEBUG: NVIDIA {model_name} returned no usable JSON object.")
                except Exception as ne:
                    print(f"CRITICAL: Error calling NVIDIA {model_name}: {ne}")
    else:
        print("DEBUG: No valid report and NVIDIA key not found/valid.")

    # Every success path returned above, so reaching this point means failure.
    print("DEBUG: All models failed to generate valid report.")
    return {
        "error": "Failed to generate a valid report from the AI model. All fallback models failed. Please try again later."
    }, []
def _as_text(value):
    """Coerce a model-supplied field to a string (lists become Markdown bullets)."""
    if value is None:
        return ''
    if isinstance(value, str):
        return value
    if isinstance(value, (list, tuple)):
        return "\n".join(f"- {item}" for item in value)
    return str(value)


def _filename_token(value):
    """Reduce *value* to ASCII letters, digits, '-' and '_' for use in a file name."""
    return ''.join(
        ch if (ch.isascii() and ch.isalnum()) or ch in '-_' else '_'
        for ch in str(value)
    )


@app.route('/predict', methods=['POST'])
def predict():
    """Generate the pest report for the submitted form and render results.html.

    Reads the POSTed form (including an optional "language", default
    "English"), asks call_gemini_api for the report, converts the Markdown
    fields to HTML, and tries to create a spoken summary with gTTS.
    Audio generation is best effort: on failure the page is rendered with
    audio_url=None. When the model call fails, an error page is rendered
    through the same template.
    """
    print("----- PREDICT ROUTE HIT -----")
    form_data = request.form.to_dict()
    print(f"Form Data Received: {form_data}")
    language = form_data.get("language", "English")
    report_data, weather_profile = call_gemini_api(form_data, language)

    # Handle error from AI call
    if "error" in report_data:
        return render_template(
            'results.html',
            report_data={"report_title": "Error", "predicted_pest_damage_info": report_data['error']},
            location={},
            current_date=datetime.now().strftime("%B %d, %Y"),
            audio_url=None,
            language_code="en",
            weather_profile=[]
        )

    # The report comes from a language model, so field types are not
    # guaranteed; normalise them before use.
    location = report_data.get('location_info')
    if not isinstance(location, dict):
        location = {}

    # Keep the plain text for the spoken summary BEFORE converting to HTML;
    # otherwise the audio would contain markup and be cut in mid-tag.
    analysis_text = _as_text(report_data.get('agricultural_inputs_analysis'))
    damage_text = _as_text(report_data.get('predicted_pest_damage_info'))

    # Convert markdown fields to HTML
    if 'agricultural_inputs_analysis' in report_data:
        report_data['agricultural_inputs_analysis'] = markdown_to_html(analysis_text)
    if 'predicted_pest_damage_info' in report_data:
        report_data['predicted_pest_damage_info'] = markdown_to_html(damage_text)

    pest_table = report_data.get('pest_prediction_table')
    if not isinstance(pest_table, list):
        pest_table = []
    pest_table = [pest for pest in pest_table if isinstance(pest, dict)]
    if 'pest_prediction_table' in report_data:
        report_data['pest_prediction_table'] = pest_table
    for pest in pest_table:
        if 'precautionary_measures' in pest:
            pest['precautionary_measures'] = markdown_to_html(_as_text(pest['precautionary_measures']))

    # Generate TTS summary
    summary = f"Pest Outbreak Report for {location.get('derived_location', 'your location')}. "
    summary += analysis_text[:200] + "... "
    if pest_table:
        summary += "Predicted pests: " + ', '.join(str(p.get('pest_name', '')) for p in pest_table) + ". "
        summary += "Severity: " + ', '.join(str(p.get('severity', '')) for p in pest_table) + ". "
    summary += damage_text[:200]

    # Language code mapping for gTTS
    lang_mapping = {
        "English": "en", "Hindi": "hi", "Bengali": "bn", "Telugu": "te",
        "Marathi": "mr", "Tamil": "ta", "Gujarati": "gu", "Urdu": "ur",
        "Kannada": "kn", "Odia": "or", "Malayalam": "ml"
    }
    gtts_lang = lang_mapping.get(language, 'en')

    audio_url = None
    try:
        tts = gTTS(summary, lang=gtts_lang)
        # These values are echoed back by the model: strip everything that
        # could act as a path separator or otherwise escape the audio folder.
        safe_lat = _filename_token(location.get('latitude', '0'))
        safe_lon = _filename_token(location.get('longitude', '0'))
        audio_filename = f"pest_report_{safe_lat}_{safe_lon}.mp3"
        os.makedirs(app.config['AUDIO_FOLDER'], exist_ok=True)
        audio_path = os.path.join(app.config['AUDIO_FOLDER'], audio_filename)
        tts.save(audio_path)
        audio_url = f"/static/audio/{audio_filename}"
    except Exception as e:
        # Best effort: the report is still shown without audio.
        print(f"Error generating audio: {e}")

    return render_template(
        'results.html',
        report_data=report_data,
        location=location,
        current_date=datetime.now().strftime("%B %d, %Y"),
        audio_url=audio_url,
        language_code=gtts_lang,
        weather_profile=weather_profile
    )
@app.route('/get_timeline_weather', methods=['GET'])
def get_timeline_weather():
    """Returns the seasonal weather profile for the frontend timeline.

    Query parameters: lat, lon, start_date, end_date (all required).
    Responds 400 when a parameter is missing or the coordinates are invalid;
    otherwise returns the structured per-month list produced by
    get_historical_weather_summary (an empty list when no data was obtained).
    """
    raw_lat = request.args.get('lat')
    raw_lon = request.args.get('lon')
    start = request.args.get('start_date')
    end = request.args.get('end_date')
    if not all([raw_lat, raw_lon, start, end]):
        return jsonify({"error": "Missing parameters"}), 400
    # Same validation as /get_weather_data, so malformed coordinates are
    # rejected here instead of being forwarded to the archive API.
    lat, lon = validate_coordinates(raw_lat, raw_lon)
    if lat is None or lon is None:
        return jsonify({"error": "Invalid coordinates"}), 400
    _, profile = get_historical_weather_summary(lat, lon, start, end)
    return jsonify(profile)
if __name__ == '__main__':
    # Development entry point. Debug mode stays on by default, as before, but
    # can be switched off without a code change by setting FLASK_DEBUG to
    # 0/false/no/off; the interactive debugger must never be reachable from
    # an untrusted network.
    debug_enabled = os.getenv("FLASK_DEBUG", "1").strip().lower() not in ("0", "false", "no", "off")
    app.run(debug=debug_enabled, port=5001)