krushimitravit's picture
Upload 8 files
5b642ef verified
import os
from flask import Flask, render_template, request, jsonify, redirect, url_for, flash, session
import requests
from werkzeug.utils import secure_filename
import google.generativeai as genai
import base64
import json
from datetime import datetime, timedelta
import threading
import time
from gtts import gTTS
import dotenv
import markdown
from openai import OpenAI
from typing import Optional, Dict, Any
# Load environment variables if a .env file is present
dotenv.load_dotenv()
def markdown_to_html(text):
"""Convert markdown text to HTML for proper rendering."""
if not text:
return text
return markdown.markdown(text, extensions=['nl2br'])
# --- Configuration ---
# Ensure you have set these as environment variables in your deployment environment
GEMINI_API_KEY = os.getenv("GEMINI_API_KEY")
GOOGLE_API_KEY = os.getenv("GOOGLE_API_KEY")
GOOGLE_CX = os.getenv("GOOGLE_CX")
NVIDIA_API_KEY = os.getenv("NVIDIA_API_KEY")
# Validate API keys with helpful messages
if not GEMINI_API_KEY:
print("WARNING: GEMINI_API_KEY not found. Will rely on NVIDIA fallback if available.")
print(" β†’ For Hugging Face: Set this in Space Settings > Repository Secrets")
if not GOOGLE_API_KEY or not GOOGLE_CX:
print("WARNING: GOOGLE_API_KEY or GOOGLE_CX is not set. Web and product search features will be disabled.")
print(" β†’ For Hugging Face: Set these in Space Settings > Repository Secrets")
if not NVIDIA_API_KEY:
print("WARNING: NVIDIA_API_KEY not set. NVIDIA fallback will not be available.")
print(" β†’ For Hugging Face: Set this in Space Settings > Repository Secrets")
# Configure Gemini only if API key is available
if GEMINI_API_KEY:
genai.configure(api_key=GEMINI_API_KEY)
else:
print("⚠️ Gemini API not configured. Application will use NVIDIA fallback only.")
# --- ENHANCED MODEL CONFIGURATION ---
GEMINI_MODELS = [
{"name": "gemini-2.0-flash-exp", "max_retries": 2, "timeout": 30, "description": "Latest experimental"},
{"name": "gemini-1.5-pro-latest", "max_retries": 2, "timeout": 45, "description": "Most capable"},
{"name": "gemini-1.5-flash", "max_retries": 3, "timeout": 20, "description": "Fast and reliable"},
{"name": "gemini-1.5-flash-8b", "max_retries": 3, "timeout": 15, "description": "Lightweight"},
]
NVIDIA_MODELS = [
{"name": "meta/llama-3.2-90b-vision-instruct", "max_retries": 2, "timeout": 40, "description": "High capability"},
{"name": "meta/llama-3.2-11b-vision-instruct", "max_retries": 2, "timeout": 30, "description": "Balanced"},
]
app = Flask(__name__)
app.secret_key = os.getenv("SECRET_KEY", "a-strong-default-secret-key")
# Configure folders
UPLOAD_FOLDER = 'static/uploads'
AUDIO_FOLDER = 'static/audio'
ALLOWED_EXTENSIONS = {'png', 'jpg', 'jpeg'}
# Create directories with better error handling for Hugging Face
try:
os.makedirs(UPLOAD_FOLDER, exist_ok=True)
os.makedirs(AUDIO_FOLDER, exist_ok=True)
print(f"βœ“ Created directories: {UPLOAD_FOLDER}, {AUDIO_FOLDER}")
except OSError as e:
print(f"⚠️ Warning: Could not create directories: {e}")
print(" β†’ This may be normal on Hugging Face Spaces with read-only filesystem")
app.config['UPLOAD_FOLDER'] = UPLOAD_FOLDER
def allowed_file(filename):
return '.' in filename and filename.rsplit('.', 1)[1].lower() in ALLOWED_EXTENSIONS
def encode_image(image_path):
with open(image_path, "rb") as image_file:
return base64.b64encode(image_file.read()).decode('utf-8')
def retry_with_backoff(func, max_retries=3, initial_delay=1):
"""
Retry a function with exponential backoff.
Args:
func: Function to retry
max_retries: Maximum number of retry attempts
initial_delay: Initial delay in seconds (doubles each retry)
Returns:
Result of successful function call
Raises:
Last exception if all retries fail
"""
last_exception = None
for attempt in range(max_retries):
try:
return func()
except Exception as e:
last_exception = e
if attempt == max_retries - 1:
raise
delay = initial_delay * (2 ** attempt)
print(f" >> Retry {attempt + 1}/{max_retries} after {delay}s delay (Error: {type(e).__name__})")
time.sleep(delay)
raise last_exception
def analyze_with_gemini(image_path, prompt, model_config):
"""
Analyze with a specific Gemini model with retry logic.
Args:
image_path: Path to the image file
prompt: Text prompt for analysis
model_config: Dict with model name, retries, timeout
Returns:
Response text or None if failed
"""
model_name = model_config["name"]
max_retries = model_config.get("max_retries", 2)
def _attempt():
print(f" >> Attempting Gemini model: {model_name} ({model_config.get('description', '')})")
model = genai.GenerativeModel(model_name)
image_parts = [{"mime_type": "image/jpeg", "data": encode_image(image_path)}]
response = model.generate_content(
[prompt] + image_parts,
generation_config={
"temperature": 0.2,
"max_output_tokens": 2048,
}
)
if not response or not response.text:
raise ValueError("Empty response from model")
return response.text
try:
return retry_with_backoff(_attempt, max_retries=max_retries)
except Exception as e:
error_type = type(e).__name__
error_msg = str(e)[:100]
print(f" >> FAILED {model_name}: {error_type}: {error_msg}")
return None
def analyze_with_nvidia(image_path, prompt, model_config):
"""
Analyze image using NVIDIA's API via OpenAI-compatible client with retry logic.
Args:
image_path: Path to the image file
prompt: Text prompt for analysis
model_config: Dict with model name, retries, timeout
Returns:
Response text or None if failed
"""
model_name = model_config["name"]
max_retries = model_config.get("max_retries", 2)
timeout = model_config.get("timeout", 30)
if not NVIDIA_API_KEY:
print("NVIDIA API key not available.")
return None
def _attempt():
print(f" >> Attempting NVIDIA model: {model_name} ({model_config.get('description', '')})")
client = OpenAI(
base_url="https://integrate.api.nvidia.com/v1",
api_key=NVIDIA_API_KEY
)
base64_image = encode_image(image_path)
completion = client.chat.completions.create(
model=model_name,
messages=[{
"role": "user",
"content": [
{"type": "text", "text": prompt},
{
"type": "image_url",
"image_url": {"url": f"data:image/jpeg;base64,{base64_image}"}
}
]
}],
max_tokens=2000,
temperature=0.2,
timeout=timeout
)
response_text = completion.choices[0].message.content
if not response_text:
raise ValueError("Empty response from NVIDIA")
return response_text
try:
return retry_with_backoff(_attempt, max_retries=max_retries)
except Exception as e:
error_type = type(e).__name__
error_msg = str(e)[:100]
print(f" >> FAILED NVIDIA {model_name}: {error_type}: {error_msg}")
return None
def get_web_pesticide_info(disease, plant_type="Unknown"):
"""Fetch pesticide information from web sources."""
if not GOOGLE_API_KEY or not GOOGLE_CX:
print("Skipping web search: Google API credentials not set.")
return None
query = f"site:agrowon.esakal.com {disease} in {plant_type}"
url = "https://www.googleapis.com/customsearch/v1"
params = {"key": GOOGLE_API_KEY, "cx": GOOGLE_CX, "q": query, "num": 1}
try:
response = requests.get(url, params=params, timeout=10)
response.raise_for_status()
data = response.json()
if "items" in data and data["items"]:
item = data["items"][0]
return {
"title": item.get("title", "No title"),
"link": item.get("link", "#"),
"summary": item.get("snippet", "No summary available")
}
except requests.exceptions.RequestException as e:
print(f"Error retrieving web pesticide info for '{disease}': {e}")
return None
def get_commercial_product_info(recommendation, disease_name):
"""Fetch commercial product information."""
if not GOOGLE_API_KEY or not GOOGLE_CX:
print("Skipping product search: Google API credentials not set.")
return []
queries = [
f"site:indiamart.com pesticide for '{disease_name}' '{recommendation}'",
f"site:krishisevakendra.in pesticide for '{disease_name}' '{recommendation}'"
]
results = []
for query in queries:
url = "https://www.googleapis.com/customsearch/v1"
params = {"key": GOOGLE_API_KEY, "cx": GOOGLE_CX, "q": query, "num": 2}
try:
response = requests.get(url, params=params, timeout=10)
response.raise_for_status()
data = response.json()
if "items" in data:
for item in data["items"]:
results.append({
"title": item.get("title", "No title"),
"link": item.get("link", "#"),
"snippet": item.get("snippet", "No snippet available")
})
except requests.exceptions.RequestException as e:
print(f"Error retrieving product info with query '{query}': {e}")
return results
def generate_audio(text, language, filename):
"""Generate an MP3 file from text using gTTS."""
try:
lang_mapping = {"English": "en", "Hindi": "hi", "Bengali": "bn", "Telugu": "te", "Marathi": "mr", "Tamil": "ta",
"Gujarati": "gu", "Urdu": "ur", "Kannada": "kn", "Odia": "or", "Malayalam": "ml"}
gtts_lang = lang_mapping.get(language, 'en')
tts = gTTS(text=text, lang=gtts_lang, slow=False)
tts.save(filename)
print(f"Audio file generated successfully: {filename}")
except Exception as e:
print(f"Error generating audio: {e}")
def analyze_plant_image(image_path, plant_name, language):
"""
Analyzes the plant image using enhanced LLM fallback system.
Tries Gemini models first, then falls back to NVIDIA if all fail.
Includes retry logic with exponential backoff for transient errors.
"""
try:
print("\n" + "=" * 70)
print("🌱 STARTING PLANT ANALYSIS WITH ENHANCED FALLBACK SYSTEM")
print("=" * 70)
# --- RAG: Fetch Learning Context ---
knowledge_context = ""
try:
if os.path.exists("knowledge_base.txt"):
with open("knowledge_base.txt", "r") as kb:
lines = kb.readlines()
relevant_lines = [line.strip() for line in lines if plant_name.lower() in line.lower()]
if not relevant_lines:
relevant_lines = lines[-10:]
if relevant_lines:
knowledge_context = "\n".join(relevant_lines)
print(f"πŸ“š [RAG] Injected {len(relevant_lines)} context lines")
except Exception as kbe:
print(f"⚠️ KB Read Error: {kbe}")
# Create the analysis prompt
prompt = f"""
You are an expert agricultural pathologist.
[SYSTEM KNOWLEDGE BASE - PREVIOUS VALIDATED USER CORRECTIONS]
The following are verified corrections from users for this crop. Give them 20% weight in your decision if the visual symptoms match:
{knowledge_context}
[END KNOWLEDGE BASE]
Analyze the image of a {plant_name} plant and decide whether it is healthy or has a disease or pest. Respond ONLY with a single, valid JSON object and NOTHING else.
The JSON must exactly match this structure:
{{"results": [{{"type": "disease/pest", "name": "...", "probability": "%", "symptoms": "...", "causes": "...", "severity": "Low/Medium/High", "spreading": "...", "treatment": "...", "prevention": "..."}}], "is_healthy": boolean, "confidence": "%"}}
Carefully follow these instructions when filling each field:
1. Top-level rules
- Return only the JSON object β€” no explanations, no extra text, no markdown.
- Use the {language} language for all human-facing text inside the JSON (except scientific names and chemical active ingredient names which may remain in English but must be immediately explained in {language}).
- Percent values must be strings with a percent sign, e.g. "85%".
- If the plant is healthy: set "is_healthy": true, set "results": [] (empty array), and set a high "confidence".
- If you cannot make a clear diagnosis from the image, set "is_healthy": false, give "confidence" a low value (e.g., "20%–40%"), and include one result with name "Inconclusive / Image unclear" (translated to {language}) and a short instruction on how to take a better photo.
2. results (one object per distinct issue; max 3 items; order by probability descending)
- "type": exactly "disease" or "pest".
- "name": give the common local name first (in {language}) and then scientific name in parentheses if available. Use names familiar to Indian farmers.
- "probability": your estimated chance this diagnosis is correct, as a percent string (e.g., "78%").
- "symptoms": list only observable signs a farmer can check (what to look for on leaves, stem, roots, fruits). Format as an HTML list (e.g., "<ul><li>Spot 1</li><li>Spot 2</li></ul>"). Use short simple sentences.
- "causes": 1–3 likely causes. Format as an HTML list (e.g., "<ul><li>Cause 1</li><li>Cause 2</li></ul>").
- "severity": choose exactly one of "Low", "Medium", or "High" and append a short reason in the same string (e.g., "High β€” fruit dropping"). Do NOT create a separate field.
- "spreading": describe how it spreads in simple terms (wind, water splash, touch, insects) and use one of these speed labels in the explanation: "None", "Slow", "Moderate", "Fast". Keep it short.
- "treatment": give a prioritized, farmer-friendly, step-by-step plan (max 5 steps). Format as an HTML ordered list (e.g., "<ol><li>Step 1</li><li>Step 2</li></ol>").
1) Low-cost cultural controls,
2) Biological/organic options,
3) Chemical options only if necessary: list **active ingredient** names.
Write treatment steps in simple, imperative sentences.
- "prevention": provide 4–6 simple preventive tips. Format as an HTML list (e.g., "<ul><li>Tip 1</li><li>Tip 2</li></ul>").
3. Additional formatting & behavior rules
- Use no null values; if unknown, use empty string "".
- Keep each text field concise and simple β€” aim for sentences a low-literacy farmer can understand.
- If you reference any chemical or biological product by active ingredient, include a short safety note and the phrase (in {language}): "ΰ€¦ΰ₯‡ΰ€–ΰ₯‡ΰ€‚ ΰ€²ΰ₯‡ΰ€¬ΰ€² / ΰ€•ΰ₯ΰ€·ΰ₯‡ΰ€€ΰ₯ΰ€°ΰ₯€ΰ€― ΰ€•ΰ₯ƒΰ€·ΰ€Ώ ΰ€…ΰ€§ΰ€Ώΰ€•ΰ€Ύΰ€°ΰ₯€ ΰ€Έΰ₯‡ ΰ€Έΰ€²ΰ€Ύΰ€Ή ΰ€²ΰ₯‡ΰ€‚" or equivalent in {language}.
- If recommending to contact an expert, mention the nearest trusted resource in India: "Krishi Vigyan Kendra / ΰ€Έΰ₯ΰ€₯ΰ€Ύΰ€¨ΰ₯€ΰ€― ΰ€•ΰ₯ƒΰ€·ΰ€Ώ ΰ€…ΰ€§ΰ€Ώΰ€•ΰ€Ύΰ€°ΰ₯€" (translated into {language}).
- If multiple issues are present, include up to 3 results. If only one issue, include only one result.
4. Image-quality fallback
- If the image is blurry, dark, or shows only part of the plant, put an honest low confidence (e.g., "30%"), set "is_healthy": false, and in results provide "Inconclusive / Image unclear" with one short line in {language} explaining how to take a clear photo (full leaf + whole plant + close-up of affected area + daylight).
Strictly produce only the JSON object following the structure above and the language requirement. No additional output.
"""
response_text = None
used_model_name = "None"
# --- PHASE 1: Try Gemini Models ---
if GEMINI_API_KEY:
print("\n" + "-" * 70)
print("πŸ“‘ PHASE 1: Trying Gemini Models")
print("-" * 70)
for idx, model_config in enumerate(GEMINI_MODELS, 1):
print(f"\n[{idx}/{len(GEMINI_MODELS)}] Testing {model_config['name']}...")
response_text = analyze_with_gemini(image_path, prompt, model_config)
if response_text:
used_model_name = f"Gemini-{model_config['name']}"
print(f" βœ“ SUCCESS with {used_model_name}")
break
else:
print(f" βœ— Failed, trying next model...")
else:
print("\n" + "-" * 70)
print("⚠️ PHASE 1: SKIPPED (No Gemini API key)")
print("-" * 70)
# --- PHASE 2: Try NVIDIA Models (Fallback) ---
if not response_text and NVIDIA_API_KEY:
print("\n" + "-" * 70)
print("πŸ“‘ PHASE 2: Trying NVIDIA Models (Fallback)")
print("-" * 70)
for idx, model_config in enumerate(NVIDIA_MODELS, 1):
print(f"\n[{idx}/{len(NVIDIA_MODELS)}] Testing {model_config['name']}...")
response_text = analyze_with_nvidia(image_path, prompt, model_config)
if response_text:
used_model_name = f"NVIDIA-{model_config['name']}"
print(f" βœ“ SUCCESS with {used_model_name}")
break
else:
print(f" βœ— Failed, trying next model...")
elif not response_text:
print("\n" + "-" * 70)
print("⚠️ PHASE 2: SKIPPED (No NVIDIA API key)")
print("-" * 70)
# --- PHASE 3: Final Error Handling ---
if not response_text:
error_msg = "❌ All LLM providers failed after retries."
if not GEMINI_API_KEY and not NVIDIA_API_KEY:
error_msg = "❌ No API keys configured. Set GEMINI_API_KEY or NVIDIA_API_KEY in environment."
print("\n" + "=" * 70)
print(f"ANALYSIS FAILED: {error_msg}")
print("=" * 70 + "\n")
raise RuntimeError(error_msg)
print("\n" + "=" * 70)
print(f"βœ… ANALYSIS COMPLETE using {used_model_name}")
print("=" * 70)
print(f"πŸ“„ Response preview (first 300 chars): {response_text[:300]}...")
print("=" * 70 + "\n")
# --- Parse JSON Response ---
try:
json_start = response_text.find('{')
json_end = response_text.rfind('}') + 1
if json_start == -1 or json_end == 0:
raise ValueError("No JSON object found in the response.")
json_str = response_text[json_start:json_end]
analysis_result = json.loads(json_str)
print("βœ“ Successfully parsed JSON response.")
except (json.JSONDecodeError, ValueError) as e:
print(f"❌ ERROR: Failed to parse JSON from response.")
print(f"Error: {e}")
print(f"Raw Response Text: {response_text}")
return {"error": "Failed to parse API response. The format was invalid."}
# --- Generate Audio Summary ---
print("πŸ”Š Generating audio summary...")
if analysis_result.get('is_healthy'):
summary_text = f"Your {plant_name} plant appears to be healthy."
elif analysis_result.get('results'):
result = analysis_result['results'][0]
summary_text = f"Issue detected: {result.get('name')}. Treatment suggestion: {result.get('treatment')}"
else:
summary_text = "Analysis was inconclusive."
audio_filename = "audio_result.mp3"
audio_path = os.path.join(AUDIO_FOLDER, audio_filename)
generate_audio(summary_text, language, audio_path)
analysis_result['audio_file'] = os.path.join('audio', audio_filename).replace('\\', '/')
print(f"βœ“ Audio file generated: {analysis_result['audio_file']}")
return analysis_result
except Exception as e:
print(f"\n{'=' * 70}")
print(f"❌ FATAL ERROR in analyze_plant_image: {e}")
print(f"{'=' * 70}\n")
return {"error": str(e), "is_healthy": None, "results": []}
@app.route('/', methods=['GET'])
def index():
return render_template('index.html')
@app.route('/feedback', methods=['POST'])
def feedback():
feedback_text = request.form.get("feedback")
plant_name = request.form.get("plant_name", "Unknown")
if not feedback_text:
flash("Please provide your feedback before submitting.")
return redirect(url_for('index'))
feedback_data = {"plant_name": plant_name, "feedback": feedback_text, "timestamp": datetime.now().isoformat()}
# --- FEEDBACK REINFORCEMENT LEARNING LOOP ---
def validate_and_learn(f_data, img_path):
"""
Background task:
1. Ask Gemini if this feedback is scientifically valid for the image.
2. If valid, append to 'knowledge_base.txt' for future prompt injection.
"""
try:
print(f"--- [RL] Validating Feedback: '{f_data['feedback']}' ---")
if not img_path or not os.path.exists(img_path):
print("--- [RL] No image found for validation. Skipping.")
return
if not GEMINI_API_KEY:
print("--- [RL] No Gemini API key. Skipping validation.")
return
model = genai.GenerativeModel('gemini-1.5-flash')
img_file = {"mime_type": "image/jpeg", "data": encode_image(img_path)}
validation_prompt = f"""
You are a Senior Agricultural Quality Control Auditor.
A user provided the following feedback/correction for an AI diagnosis of this {f_data['plant_name']} plant:
USER FEEDBACK: "{f_data['feedback']}"
Task:
1. Analyze the image and the user's claim.
2. Determine if the feedback is PLAUSIBLE or CORRECT based on visual evidence.
3. Respond with ONLY 'VALID' or 'INVALID'.
"""
resp = model.generate_content([validation_prompt, img_file])
verdict = resp.text.strip().upper()
print(f"--- [RL] Verdict: {verdict} ---")
if "VALID" in verdict:
kb_entry = f"[{f_data['plant_name']}] Verified User Insight: {f_data['feedback']} (Visuals confirmed)\n"
with open("knowledge_base.txt", "a") as kb:
kb.write(kb_entry)
print("--- [RL] Knowledge Base Updated! ---")
except Exception as e:
print(f"--- [RL] Validation Failed: {e}")
image_filename = request.form.get("image_filename")
if image_filename:
full_img_path = os.path.join(app.config['UPLOAD_FOLDER'], image_filename)
threading.Thread(target=validate_and_learn, args=(feedback_data, full_img_path)).start()
flash("Thank you! Your feedback is being analyzed to improve future predictions.")
return redirect(url_for('index'))
@app.route('/analyze', methods=['POST'])
def analyze():
print("\n" + "=" * 70)
print("πŸ”¬ NEW ANALYSIS REQUEST RECEIVED")
print("=" * 70)
if 'plant_image' not in request.files:
flash('No file part')
return redirect(request.url)
file = request.files['plant_image']
plant_name = request.form.get('plant_name', 'Unknown Plant')
language = request.form.get('language', 'English')
if file.filename == '':
flash('No selected file')
return redirect(request.url)
if file and allowed_file(file.filename):
try:
filename = secure_filename(file.filename)
file_path = os.path.join(app.config['UPLOAD_FOLDER'], filename)
print(f"πŸ’Ύ Saving uploaded file to: {file_path}")
file.save(file_path)
analysis_result = analyze_plant_image(file_path, plant_name, language)
if 'error' in analysis_result:
flash(f"Analysis Error: {analysis_result['error']}")
return redirect(url_for('index'))
# Convert markdown to HTML in all result fields
if analysis_result.get('results'):
for result in analysis_result['results']:
for field in ['symptoms', 'causes', 'spreading', 'treatment', 'prevention']:
if field in result:
result[field] = markdown_to_html(result[field])
web_info = {}
product_info = {}
if not analysis_result.get('is_healthy') and analysis_result.get('results'):
print("πŸ” Disease detected. Fetching additional web and product info...")
for result in analysis_result['results']:
disease_name = result.get('name', '')
if disease_name:
web_info[disease_name] = get_web_pesticide_info(disease_name, plant_name)
product_info[disease_name] = get_commercial_product_info(result.get('treatment', ''), disease_name)
print("βœ“ Finished fetching additional info.")
print("\n" + "=" * 70)
print("βœ… Analysis complete. Rendering results page.")
print("=" * 70 + "\n")
return render_template(
'results.html',
results=analysis_result,
plant_name=plant_name,
image_path='uploads/' + filename,
web_info=web_info,
product_info=product_info
)
except Exception as e:
print(f"\n{'=' * 70}")
print(f"❌ FATAL ERROR IN /analyze ROUTE: {e}")
print(f"{'=' * 70}\n")
flash(f"A critical server error occurred: {e}")
return redirect(url_for('index'))
flash('Invalid file type. Please upload an image (png, jpg, jpeg).')
return redirect(request.url)
if __name__ == '__main__':
port = int(os.environ.get("PORT", 7860))
print("\n" + "=" * 70)
print(f"πŸš€ Starting Flask Application on port {port}")
print("=" * 70)
print(f"πŸ“Š Configuration Status:")
print(f" - Gemini API: {'βœ“ Configured' if GEMINI_API_KEY else 'βœ— Not configured'}")
print(f" - NVIDIA API: {'βœ“ Configured' if NVIDIA_API_KEY else 'βœ— Not configured'}")
print(f" - Google Search: {'βœ“ Configured' if (GOOGLE_API_KEY and GOOGLE_CX) else 'βœ— Not configured'}")
print(f" - Available Gemini Models: {len(GEMINI_MODELS)}")
print(f" - Available NVIDIA Models: {len(NVIDIA_MODELS)}")
print("=" * 70 + "\n")
app.run(host='0.0.0.0', port=port, debug=True)