pest_pred / app.py
prabhat-rocks's picture
Update app.py
b239d3e verified
import os
import json
import threading
from uuid import uuid4
import numpy as np
import pandas as pd
from dotenv import load_dotenv
from flask import (
Flask, render_template, request, jsonify,
redirect, url_for, flash, session, send_from_directory # ← add this
)
from werkzeug.utils import secure_filename
from werkzeug.security import generate_password_hash, check_password_hash
# ML
import tensorflow as tf
from tensorflow.keras.models import load_model
from tensorflow.keras.preprocessing import image
# Gemini
import google.generativeai as genai
from datetime import datetime
# in-memory chat history: username -> list of sessions
# each session: {"id": "...", "pest": "wasp", "created_at": "...", "messages": [{"role": "user"/"assistant","text": "..."}]}
# ---------------------------------------------------------
# Boot
# ---------------------------------------------------------
load_dotenv()
app = Flask(__name__)
# Use a persistent secret key so Flask sessions survive restarts
app.secret_key = os.getenv("FLASK_SECRET_KEY", "my-name-is-prabhat007")
# Keep user sessions alive across reloads
app.config['PERMANENT_SESSION_LIFETIME'] = 86400 # 1 day (in seconds)
app.config['SESSION_PERMANENT'] = True
#app.config['SESSION_COOKIE_SECURE'] = False # Required for HTTP (HF Spaces handles HTTPS)
app.config['SESSION_COOKIE_HTTPONLY'] = True
#app.config['SESSION_COOKIE_SAMESITE'] = 'Lax'
# use HTTPS on HF spaces β€” set Secure=True and SameSite=None
app.config['SESSION_COOKIE_SECURE'] = True
app.config['SESSION_COOKIE_SAMESITE'] = 'None'
@app.before_request
def make_session_permanent():
session.permanent = True
# ---------------------------------------------------------
# 1) Model loading & config
# ---------------------------------------------------------
MODEL_PATH = 'dhanush_model.h5'
CLASS_NAMES = [
'ants', 'bees', 'beetle', 'caterpillar', 'earthworms', 'earwig',
'Grasshopper', 'moth', 'slug', 'snail', 'wasp', 'weevil'
]
IMG_SIZE = (224, 224)
DATA_DIR = os.environ.get("DATA_DIR", "/data")
UPLOAD_FOLDER = os.path.join(DATA_DIR, "uploads")
HISTORY_DIR = os.path.join(DATA_DIR, "chat_history")
os.makedirs(UPLOAD_FOLDER, exist_ok=True)
os.makedirs(HISTORY_DIR, exist_ok=True)
app.config['UPLOAD_FOLDER'] = UPLOAD_FOLDER
# Load model
try:
pest_model = load_model(MODEL_PATH)
print(f"βœ… Model loaded successfully from {MODEL_PATH}")
except Exception as e:
print(f"❌ Error loading model: {e}")
pest_model = None
# Threshold (from diagnostics/thresholds.json if present)
GLOBAL_TAU = 0.45
try:
with open("diagnostics/thresholds.json", "r") as f:
GLOBAL_TAU = float(json.load(f).get("global_tau", 0.0))
print(f"βœ… Loaded global threshold Ο„ = {GLOBAL_TAU:.3f}")
except Exception as _:
print("ℹ️ No diagnostics/thresholds.json found; using Ο„ = 0.0")
# ---------------------------------------------------------
# 2) Gemini API & RAG setup
# ---------------------------------------------------------
# ===================== Gemini API & RAG setup =====================
# global chat sessions (per pest type)
chat_sessions = {}
# cache the most recent prediction per user, to attach image to first chat turn
latest_pred_by_user = {} # username -> {"pest": str, "image_url": str, "ts": "..."}
# Simple in-memory chat history store (for demo / dev)
# Structure: user_sessions[username] -> list of sessions
# session: { id, pest, created_at, messages: [ {role, text, ts} ] }
user_sessions = {}
_user_sessions_lock = threading.Lock()
import glob
def _history_path_for(username: str) -> str:
safe = (username or "__anon__").replace(":", "_")
return os.path.join(HISTORY_DIR, f"{safe}.json")
def _persist_user(username: str):
"""Write this user's sessions to disk as JSON."""
if not username:
username = "__anon__"
path = _history_path_for(username)
with _user_sessions_lock:
data = user_sessions.get(username, [])
try:
with open(path, "w", encoding="utf-8") as f:
json.dump(data, f, ensure_ascii=False, indent=2)
except Exception as e:
print(f"⚠️ Failed to persist history for {username}: {e}")
def _load_all_users_on_boot():
"""Load any previously saved sessions from disk into memory."""
loaded = 0
for p in glob.glob(os.path.join(HISTORY_DIR, "*.json")):
try:
with open(p, "r", encoding="utf-8") as f:
sessions = json.load(f)
username = os.path.splitext(os.path.basename(p))[0]
username = "__anon__" if username == "__anon__" else username
with _user_sessions_lock:
user_sessions[username] = sessions
loaded += 1
except Exception as e:
print(f"⚠️ Failed to load history file {p}: {e}")
print(f"πŸ“š Loaded chat history for {loaded} user(s) from disk.")
# Load any persisted chat history when the app starts
_load_all_users_on_boot()
def _make_session(username, pest, image_url=None):
sid = f"{username}:{pest}:{uuid4().hex[:8]}"
session = {
"id": sid,
"pest": pest,
"created_at": datetime.utcnow().isoformat(),
"image_url": image_url, # NEW: persist the prediction image for this chat
"messages": []
}
with _user_sessions_lock:
user_sessions.setdefault(username, []).append(session)
_persist_user(username)
return sid
def _find_session_id_for_user_pest(username, pest):
"""Return the most recent session id for user+pest or None."""
with _user_sessions_lock:
sessions = user_sessions.get(username, [])
for s in reversed(sessions):
if (s.get("pest") or "").lower() == (pest or "").lower():
return s["id"]
return None
def _append_history(username, pest, role, text, session_id=None):
"""
Append a message to history.
If session_id is None, create a new session for username+pest.
Returns session_id used.
"""
if not username:
username = "__anon__"
with _user_sessions_lock:
if session_id is None:
# try to reuse most recent session for this user/pest; otherwise create new
sid = _find_session_id_for_user_pest(username, pest)
if sid is None:
sid = _make_session(username, pest)
else:
sid = session_id
# find session object
sessions = user_sessions.setdefault(username, [])
sess_obj = next((s for s in sessions if s["id"] == sid), None)
if sess_obj is None:
# if not found, create
sid = _make_session(username, pest)
sess_obj = next(s for s in user_sessions[username] if s["id"] == sid)
sess_obj["messages"].append({"role": role, "text": text, "ts": datetime.utcnow().isoformat()})
_persist_user(username)
return sid
try:
GOOGLE_API_KEY = os.environ.get("GOOGLE_API_KEY")
if not GOOGLE_API_KEY:
raise ValueError("GOOGLE_API_KEY not found in environment variables.")
genai.configure(api_key=GOOGLE_API_KEY)
# Use a widely available model name to avoid 404s
rag_model = genai.GenerativeModel(model_name="gemini-2.0-flash-lite")
print("βœ… Gemini API configured successfully with 'gemini-2.5-pro'.")
# Load the file references you created with upload_files.py
with open('file_references.json', 'r') as f:
file_references = json.load(f)
print("βœ… Document file references loaded.")
except Exception as e:
print(f"❌ Gemini API or File Reference Error: {e}")
rag_model = None
file_references = {}
# ================================================================
# ---------------------------------------------------------
# 3) Product recommendations (sample data)
# ---------------------------------------------------------
PRODUCT_RECOMMENDATIONS = {
'wasp': [
{
'name': 'Wasp & Hornet Killer Spray',
'image_url': 'https://placehold.co/150x150/e9d5ff/4c1d95?text=Wasp+Spray',
'link': '#',
'description': 'Fast-acting spray that kills wasps, hornets, and yellow jackets on contact.'
},
{
'name': 'Reusable Wasp Trap',
'image_url': 'https://placehold.co/150x150/dbeafe/1e3a8a?text=Wasp+Trap',
'link': '#',
'description': 'Eco-friendly trap to lure and capture wasps without harmful chemicals.'
}
],
'ants': [
{
'name': 'Ants',
'image_url': "https://m.media-amazon.com/images/I/71HxltbUHUL._UF350,350_QL80_.jpg",
'link': 'https://www.syngentapmp.com/product/advion-ant-gel-insecticide',
'description': 'Professional gel bait for colony control (good for sweet- and protein-feeding ants).'
}
],
'caterpillar': [
{
'name': 'Bacillus Thuringiensis (BT) Spray',
'image_url': 'https://m.media-amazon.com/images/I/61r5VSrCMqL._UF1000,1000_QL80_.jpg',
'link': 'https://www.amazon.in/insecticide-for-caterpillar/s?k=insecticide+for+caterpillar&utm_source=chatgpt.com',
'description': 'Natural pesticide effective against caterpillars; safe for most beneficials.'
}
],
'beetle': [
{
'name': 'Rhinoceros Beetle Lure',
'image_url': 'https://lilaagrotech.com/wp-content/uploads/2024/08/Live_Earth-Worms-removebg-preview.webp',
'link': 'https://www.bighaat.com/products/rhinoceros-beetle-lure-pest-control-india',
'description': 'A lure to attract and trap rhinoceros beetles in orchards.'
}
],
'earthworms': [
{
'name': 'Aquaritin Shield',
'image_url': 'https://encrypted-tbn0.gstatic.com/images?q=tbn:ANd9GcTtkun_yTRqurn0uQ4L9zEN6zmlWr29_OvnbA&s',
'link': 'https://www.kisaantrade.com/product/aquaritin-shield-turf-s?utm_source=chatgpt.com',
'description': 'Biodegradable nano-emulsion to deter casts on turf without harming worms.'
}
],
'earwig': [
{
'name': 'Neem Oil',
'image_url': 'https://encrypted-tbn0.gstatic.com/images?q=tbn:ANd9GcQWQKVOjI4FH_C5djSIgfiQXLtJPa27kiaWcA&s',
'link': 'https://environmentalfactor.com/natural-remedies-for-earwig-control-safe-and-eco-friendly-solutions/?utm_source=chatgpt.com',
'description': 'Botanical insecticide/repellent effective against earwigs and other pests.'
}
],
'default': [
{
'name': 'General Garden Pest Control',
'image_url': 'https://placehold.co/150x150/f0f9ff/082f49?text=Pest+Spray',
'link': '#',
'description': 'Multi-purpose insecticideβ€”always read the label before use.'
}
]
}
# ---------------------------------------------------------
# Utility: model inference (identity_uint8 + global threshold)
# ---------------------------------------------------------
def model_predict(img_path):
"""
Loads image -> uint8 array -> model -> softmax if needed.
Applies global threshold GLOBAL_TAU; returns (label, confidence) or (None, confidence).
"""
if pest_model is None:
raise Exception("Model is not loaded.")
img_obj = image.load_img(img_path, target_size=IMG_SIZE)
x = image.img_to_array(img_obj) # keep uint8 [0..255], as diagnostics showed
x = np.expand_dims(x, axis=0) # (1, H, W, C)
preds = pest_model.predict(x, verbose=0) # logits or probs
# convert to probabilities if needed
if preds.ndim == 2 and not np.allclose(preds.sum(axis=1, keepdims=True), 1.0, atol=1e-3):
probs = tf.nn.softmax(preds, axis=1).numpy()
else:
probs = preds
i = int(np.argmax(probs[0]))
conf = float(probs[0][i])
label = CLASS_NAMES[i]
if conf < float(GLOBAL_TAU):
return None, conf
return label, conf
# ---------------------------------------------------------
# 4) Routes
# ---------------------------------------------------------
@app.route('/', methods=['GET'])
def index():
if not session.get('user'):
return redirect(url_for('welcome_page'))
return render_template('index.html', username=session.get('user'))
# Upload/predict
ALLOWED_EXTS = {"png", "jpg", "jpeg", "webp"}
@app.route('/predict', methods=['POST'])
def upload():
if pest_model is None:
return jsonify({'error': 'Model not loaded.'}), 500
file = request.files.get('file')
if not file or file.filename == '':
return jsonify({'error': 'No file provided'}), 400
orig_name = secure_filename(file.filename)
_, ext = os.path.splitext(orig_name)
ext = (ext or '').lower().lstrip('.')
if ext not in ALLOWED_EXTS:
return jsonify({'error': f'Unsupported file type: .{ext}. Allowed: {", ".join(sorted(ALLOWED_EXTS))}'}), 400
unique_name = f"{uuid4().hex}.{ext}"
filepath = os.path.join(app.config['UPLOAD_FOLDER'], unique_name)
try:
file.save(filepath)
label, confidence = model_predict(filepath)
print(f"Prediction result: {label} (confidence={confidence:.4f})")
if label is None:
# do NOT clear other sessions
msg = f"No confident pest detected (confidence={confidence:.2f}). Please upload a clearer pest image."
return jsonify({
'success': False,
'message': msg,
'confidence': float(confidence),
'image_url': f'/files/uploads/{unique_name}'
}), 200
# === NEW: remember this user's last prediction (pest + image) ===
_username = session.get('user') or "__anon__"
latest_pred_by_user[_username] = {
"pest": str(label),
"image_url": f"/files/uploads/{unique_name}",
"ts": datetime.utcnow().isoformat()
}
# Clear only current user's sessions (fresh context)
_clear_user_sessions(session.get('user'))
return jsonify({
'success': True,
'prediction': str(label),
'confidence': float(confidence),
'image_url': f'/files/uploads/{unique_name}'
}), 200
except Exception as e:
print(f"Prediction Error: {e}")
return jsonify({'error': f'Prediction failed: {e}'}), 500
# Recommendations
@app.route('/get_recommendations/<pest_name>')
def get_recommendations(pest_name):
pest_name_lower = (pest_name or '').lower().strip()
recommendations = PRODUCT_RECOMMENDATIONS.get(pest_name_lower, PRODUCT_RECOMMENDATIONS['default'])
return jsonify(recommendations)
# Optional: default recommendations route (in case frontend calls without pest)
@app.route('/get_recommendations')
def get_recommendations_default():
return jsonify(PRODUCT_RECOMMENDATIONS['default'])
# Chatbot (Gemini)
def _chat_key(username: str, pest: str) -> str:
u = username or "__anon__"
return f"{u}:{(pest or '').lower()}"
def _clear_user_sessions(username: str):
if not username:
return
prefix = f"{username}:".lower()
to_del = [k for k in chat_sessions.keys() if k.lower().startswith(prefix)]
for k in to_del:
del chat_sessions[k]
if to_del:
print(f"🧹 Cleared {len(to_del)} chat session(s) for user '{username}'")
@app.route('/chatbot', methods=['POST'])
def chatbot_response():
# Ensure RAG/gemini is available
if not rag_model or not file_references:
return jsonify({'response': 'Chatbot service is unavailable.', 'error': 'CONFIG_ERROR'}), 503
data = request.get_json(silent=True) or {}
user_message = data.get('message')
pest_type = data.get('pest_type')
if not user_message or not pest_type:
return jsonify({'response': 'Missing message or pest type.'}), 400
# Use Flask session username if available; fall back to anon
username = session.get('user') or "__anon__"
chat_key = f"{username}:{pest_type}".lower()
try:
# --- FIRST TURN: RAG over the pest-specific document ---
if chat_key not in chat_sessions:
app.logger.debug("New conversation for '%s' (user=%s). Using RAG.", pest_type, username)
file_uri = file_references.get(pest_type.lower())
if not file_uri:
return jsonify({'response': f"Sorry, no document found for '{pest_type}'."}), 404
# prepare file resource and prompt
file_id = file_uri.split('/')[-1]
file_resource_name = f'files/{file_id}'
file_to_use = genai.get_file(name=file_resource_name)
prompt = [
f"Answer the following question based only on the provided document about '{pest_type}'.",
file_to_use,
f"Question: {user_message}"
]
# RAG generate
response = rag_model.generate_content(prompt)
text = getattr(response, "text", None) or (response.get("text") if isinstance(response, dict) else str(response))
# create the interactive chat object seeded with the first exchange
chat = rag_model.start_chat(history=[
{'role': 'user', 'parts': [user_message]},
{'role': 'model', 'parts': [text]}
])
# save the chat object for future follow-ups (keyed by user+pest)
chat_sessions[chat_key] = chat
# attach last prediction image if it matches this pest
lp = latest_pred_by_user.get(username or "__anon__")
img_for_session = None
if lp and (lp.get("pest","").lower() == pest_type.lower()):
img_for_session = lp.get("image_url")
# persist history and return session id (now with image_url)
sid = _make_session(username, pest_type, image_url=img_for_session)
_append_history(username, pest_type, "user", user_message, session_id=sid)
_append_history(username, pest_type, "assistant", text, session_id=sid)
return jsonify({'response': text, 'pest_type': pest_type, 'session_id': sid})
# --- FOLLOW-UP turns: continue the conversation with the saved chat object ---
else:
app.logger.debug("Continuing conversation for '%s' (user=%s).", pest_type, username)
chat = chat_sessions[chat_key]
# find or create a session id to append to
sid = _find_session_id_for_user_pest(username, pest_type)
if sid is None:
# attach last prediction image if it matches this pest
lp = latest_pred_by_user.get(username or "__anon__")
img_for_session = None
if lp and (lp.get("pest","").lower() == pest_type.lower()):
img_for_session = lp.get("image_url")
sid = _make_session(username, pest_type, image_url=img_for_session)
# append user message to history
_append_history(username, pest_type, "user", user_message, session_id=sid)
# send the message via the interactive chat object
response = chat.send_message(user_message)
reply_text = getattr(response, "text", None) or (response.get("text") if isinstance(response, dict) else str(response))
# append assistant reply to history
_append_history(username, pest_type, "assistant", reply_text, session_id=sid)
return jsonify({'response': reply_text, 'pest_type': pest_type, 'session_id': sid})
except Exception as e:
error_message = str(e)
app.logger.exception("❌ Gemini API call failed")
return jsonify({'response': f'API Call Failed: {error_message}', 'error_type': 'API_CALL_FAIL'}), 500
# ---------------------------------------------------------
# 5) Auth (Excel-based; dev/demo only)
# ---------------------------------------------------------
USERS_FILE = os.path.join(os.getcwd(), "users.xlsx")
_users_lock = threading.Lock()
def _ensure_users_file():
if not os.path.exists(USERS_FILE):
df = pd.DataFrame(columns=["username", "password_hash"])
df.to_excel(USERS_FILE, index=False)
def read_users_df():
_ensure_users_file()
try:
df = pd.read_excel(USERS_FILE, engine="openpyxl")
except Exception:
df = pd.DataFrame(columns=["username", "password_hash"])
df.to_excel(USERS_FILE, index=False)
return df
def username_exists(username: str) -> bool:
df = read_users_df()
if 'username' not in df.columns:
return False
return username.lower().strip() in df['username'].str.lower().str.strip().tolist()
def add_user(username: str, password: str) -> bool:
with _users_lock:
df = read_users_df()
if 'username' in df.columns and username.lower().strip() in df['username'].str.lower().str.strip().tolist():
return False
hashed = generate_password_hash(password)
new_row = {"username": username.strip(), "password_hash": hashed}
df = pd.concat([df, pd.DataFrame([new_row])], ignore_index=True)
df.to_excel(USERS_FILE, index=False)
return True
def verify_user(username: str, password: str) -> bool:
df = read_users_df()
if 'username' not in df.columns or 'password_hash' not in df.columns:
return False
match = df[df['username'].str.lower().str.strip() == username.lower().strip()]
if match.empty:
return False
stored_hash = match.iloc[0]['password_hash']
return check_password_hash(stored_hash, password)
@app.route("/home")
def home_redirect():
if session.get("user"):
return redirect(url_for("dashboard"))
return redirect(url_for("index"))
@app.route("/register", methods=["GET", "POST"])
def register():
if request.method == "POST":
username = request.form.get("username", "").strip()
password = request.form.get("password", "")
confirm = request.form.get("confirm_password", "")
if not username or not password:
flash("Please provide both username and password.", "danger")
return redirect(url_for("register"))
if len(password) < 6:
flash("Password should be at least 6 characters long.", "warning")
return redirect(url_for("register"))
if password != confirm:
flash("Password and confirm password do not match.", "warning")
return redirect(url_for("register"))
if username_exists(username):
flash("Username already exists. Please choose a different username or login.", "warning")
return redirect(url_for("register"))
ok = add_user(username, password)
if ok:
session['user'] = username
session.permanent = True
session.modified = True
print(f"βœ… REGISTER - Set session user: {username}")
print(f"βœ… REGISTER - Session after set: {dict(session)}") # Force save
flash(f"Welcome {username}! Account created successfully.", "success")
return redirect(url_for('dashboard'))
else:
flash("Could not create account. Try again.", "danger")
return redirect(url_for("register"))
return render_template("register.html", title="Register")
@app.route("/login", methods=["GET", "POST"])
def login():
if request.method == "POST":
username = request.form.get("username", "").strip()
password = request.form.get("password", "")
if not username or not password:
flash("Please enter both username and password.", "warning")
return redirect(url_for("login"))
if not username_exists(username):
flash("User not existed. Please register.", "warning")
return redirect(url_for("register"))
if verify_user(username, password):
session['user'] = username
session.permanent = True
session.modified = True # Force save
print(f"βœ… LOGIN - Set session user: {username}")
print(f"βœ… LOGIN - Session after set: {dict(session)}")
flash(f"Welcome back {username}!", "success")
return redirect(url_for('dashboard')) # Go directly to dashboard
else:
flash("Incorrect password.", "danger")
return redirect(url_for("login"))
return render_template("login.html", title="Login")
@app.route("/dashboard")
def dashboard():
print(f"πŸ” DASHBOARD - Session user: {session.get('user')}")
print(f"πŸ” DASHBOARD - Full session: {dict(session)}")
if not session.get("user"):
flash("Please login first.", "warning")
return redirect(url_for("login"))
return render_template("chat_dashboard.html", title="Dashboard")
@app.route("/logout")
def logout():
session.pop("user", None)
flash("You have been logged out.", "info")
return redirect(url_for("login"))
@app.route('/welcome')
def welcome_page():
if session.get('user'):
return redirect(url_for('dashboard'))
return render_template('landing.html')
@app.route('/greeting')
def greeting():
username = request.args.get('username', 'friend')
mode = request.args.get('mode', 'welcome')
print(f"πŸ” GREETING - Session user: {session.get('user')}")
print(f"πŸ” GREETING - Username from args: {username}")
if mode == 'welcome':
title_text = f"Welcome aboard, {username}!"
subtitle_text = "We're delighted to have you. 🌱"
else:
title_text = f"Welcome back, {username}!"
subtitle_text = "Great to see you again β€” ready to identify and remedy pests? πŸ‘‹"
return render_template('greeting.html', title_text=title_text, subtitle_text=subtitle_text)
# Replace the existing /chat_history and /chat_history/<id> routes with these.
from flask import make_response
def _safe_username():
"""Return logged-in username or '__anon__' (same logic used elsewhere)."""
return session.get('user') or "__anon__"
@app.route('/chat_history', methods=['GET'])
def list_chat_history():
"""Return light-weight listing of sessions for the current logged-in user.
Uses the same in-memory user_sessions store that _append_history/_make_session use.
"""
username = session.get('user')
if not username:
return jsonify({'error': 'not_logged_in'}), 401
with _user_sessions_lock:
sessions = user_sessions.get(username, [])[:]
# return small summary list (reverse so recent first)
lite = [{"id": s["id"], "pest": s["pest"], "created_at": s.get("created_at")} for s in sessions[::-1]]
return jsonify({'username': username, 'sessions': lite})
# Serve uploaded files from /data/uploads with a clean URL.
# e.g. /files/uploads/abc123.jpg
@app.route("/files/uploads/<path:filename>")
def serve_uploaded_file(filename):
return send_from_directory(UPLOAD_FOLDER, filename, as_attachment=False)
@app.route('/chat_history/<session_id>', methods=['GET'])
def get_chat_session(session_id):
"""Return the full session object (messages) for the logged-in user."""
username = session.get('user')
if not username:
return jsonify({'error': 'not_logged_in'}), 401
with _user_sessions_lock:
sessions = user_sessions.get(username, [])
for s in sessions:
if s.get('id') == session_id:
return jsonify(s)
return jsonify({'error': 'not_found'}), 404
# ---------------------------------------------------------
# Main
# ---------------------------------------------------------
if __name__ == '__main__':
port = int(os.environ.get("PORT", 7860)) # Hugging Face uses 7860 by default
app.run(host="0.0.0.0", port=port, debug=True)