import os import json import threading from uuid import uuid4 import numpy as np import pandas as pd from dotenv import load_dotenv from flask import ( Flask, render_template, request, jsonify, redirect, url_for, flash, session, send_from_directory # ← add this ) from werkzeug.utils import secure_filename from werkzeug.security import generate_password_hash, check_password_hash # ML import tensorflow as tf from tensorflow.keras.models import load_model from tensorflow.keras.preprocessing import image # Gemini import google.generativeai as genai from datetime import datetime # in-memory chat history: username -> list of sessions # each session: {"id": "...", "pest": "wasp", "created_at": "...", "messages": [{"role": "user"/"assistant","text": "..."}]} # --------------------------------------------------------- # Boot # --------------------------------------------------------- load_dotenv() app = Flask(__name__) # Use a persistent secret key so Flask sessions survive restarts app.secret_key = os.getenv("FLASK_SECRET_KEY", "my-name-is-prabhat007") # Keep user sessions alive across reloads app.config['PERMANENT_SESSION_LIFETIME'] = 86400 # 1 day (in seconds) app.config['SESSION_PERMANENT'] = True #app.config['SESSION_COOKIE_SECURE'] = False # Required for HTTP (HF Spaces handles HTTPS) app.config['SESSION_COOKIE_HTTPONLY'] = True #app.config['SESSION_COOKIE_SAMESITE'] = 'Lax' # use HTTPS on HF spaces — set Secure=True and SameSite=None app.config['SESSION_COOKIE_SECURE'] = True app.config['SESSION_COOKIE_SAMESITE'] = 'None' @app.before_request def make_session_permanent(): session.permanent = True # --------------------------------------------------------- # 1) Model loading & config # --------------------------------------------------------- MODEL_PATH = 'dhanush_model.h5' CLASS_NAMES = [ 'ants', 'bees', 'beetle', 'caterpillar', 'earthworms', 'earwig', 'Grasshopper', 'moth', 'slug', 'snail', 'wasp', 'weevil' ] IMG_SIZE = (224, 224) DATA_DIR = os.environ.get("DATA_DIR", "/data") UPLOAD_FOLDER = os.path.join(DATA_DIR, "uploads") HISTORY_DIR = os.path.join(DATA_DIR, "chat_history") os.makedirs(UPLOAD_FOLDER, exist_ok=True) os.makedirs(HISTORY_DIR, exist_ok=True) app.config['UPLOAD_FOLDER'] = UPLOAD_FOLDER # Load model try: pest_model = load_model(MODEL_PATH) print(f"✅ Model loaded successfully from {MODEL_PATH}") except Exception as e: print(f"❌ Error loading model: {e}") pest_model = None # Threshold (from diagnostics/thresholds.json if present) GLOBAL_TAU = 0.45 try: with open("diagnostics/thresholds.json", "r") as f: GLOBAL_TAU = float(json.load(f).get("global_tau", 0.0)) print(f"✅ Loaded global threshold τ = {GLOBAL_TAU:.3f}") except Exception as _: print("ℹ️ No diagnostics/thresholds.json found; using τ = 0.0") # --------------------------------------------------------- # 2) Gemini API & RAG setup # --------------------------------------------------------- # ===================== Gemini API & RAG setup ===================== # global chat sessions (per pest type) chat_sessions = {} # cache the most recent prediction per user, to attach image to first chat turn latest_pred_by_user = {} # username -> {"pest": str, "image_url": str, "ts": "..."} # Simple in-memory chat history store (for demo / dev) # Structure: user_sessions[username] -> list of sessions # session: { id, pest, created_at, messages: [ {role, text, ts} ] } user_sessions = {} _user_sessions_lock = threading.Lock() import glob def _history_path_for(username: str) -> str: safe = (username or "__anon__").replace(":", "_") return os.path.join(HISTORY_DIR, f"{safe}.json") def _persist_user(username: str): """Write this user's sessions to disk as JSON.""" if not username: username = "__anon__" path = _history_path_for(username) with _user_sessions_lock: data = user_sessions.get(username, []) try: with open(path, "w", encoding="utf-8") as f: json.dump(data, f, ensure_ascii=False, indent=2) except Exception as e: print(f"⚠️ Failed to persist history for {username}: {e}") def _load_all_users_on_boot(): """Load any previously saved sessions from disk into memory.""" loaded = 0 for p in glob.glob(os.path.join(HISTORY_DIR, "*.json")): try: with open(p, "r", encoding="utf-8") as f: sessions = json.load(f) username = os.path.splitext(os.path.basename(p))[0] username = "__anon__" if username == "__anon__" else username with _user_sessions_lock: user_sessions[username] = sessions loaded += 1 except Exception as e: print(f"⚠️ Failed to load history file {p}: {e}") print(f"📚 Loaded chat history for {loaded} user(s) from disk.") # Load any persisted chat history when the app starts _load_all_users_on_boot() def _make_session(username, pest, image_url=None): sid = f"{username}:{pest}:{uuid4().hex[:8]}" session = { "id": sid, "pest": pest, "created_at": datetime.utcnow().isoformat(), "image_url": image_url, # NEW: persist the prediction image for this chat "messages": [] } with _user_sessions_lock: user_sessions.setdefault(username, []).append(session) _persist_user(username) return sid def _find_session_id_for_user_pest(username, pest): """Return the most recent session id for user+pest or None.""" with _user_sessions_lock: sessions = user_sessions.get(username, []) for s in reversed(sessions): if (s.get("pest") or "").lower() == (pest or "").lower(): return s["id"] return None def _append_history(username, pest, role, text, session_id=None): """ Append a message to history. If session_id is None, create a new session for username+pest. Returns session_id used. """ if not username: username = "__anon__" with _user_sessions_lock: if session_id is None: # try to reuse most recent session for this user/pest; otherwise create new sid = _find_session_id_for_user_pest(username, pest) if sid is None: sid = _make_session(username, pest) else: sid = session_id # find session object sessions = user_sessions.setdefault(username, []) sess_obj = next((s for s in sessions if s["id"] == sid), None) if sess_obj is None: # if not found, create sid = _make_session(username, pest) sess_obj = next(s for s in user_sessions[username] if s["id"] == sid) sess_obj["messages"].append({"role": role, "text": text, "ts": datetime.utcnow().isoformat()}) _persist_user(username) return sid try: GOOGLE_API_KEY = os.environ.get("GOOGLE_API_KEY") if not GOOGLE_API_KEY: raise ValueError("GOOGLE_API_KEY not found in environment variables.") genai.configure(api_key=GOOGLE_API_KEY) # Use a widely available model name to avoid 404s rag_model = genai.GenerativeModel(model_name="gemini-2.0-flash-lite") print("✅ Gemini API configured successfully with 'gemini-2.5-pro'.") # Load the file references you created with upload_files.py with open('file_references.json', 'r') as f: file_references = json.load(f) print("✅ Document file references loaded.") except Exception as e: print(f"❌ Gemini API or File Reference Error: {e}") rag_model = None file_references = {} # ================================================================ # --------------------------------------------------------- # 3) Product recommendations (sample data) # --------------------------------------------------------- PRODUCT_RECOMMENDATIONS = { 'wasp': [ { 'name': 'Wasp & Hornet Killer Spray', 'image_url': 'https://placehold.co/150x150/e9d5ff/4c1d95?text=Wasp+Spray', 'link': '#', 'description': 'Fast-acting spray that kills wasps, hornets, and yellow jackets on contact.' }, { 'name': 'Reusable Wasp Trap', 'image_url': 'https://placehold.co/150x150/dbeafe/1e3a8a?text=Wasp+Trap', 'link': '#', 'description': 'Eco-friendly trap to lure and capture wasps without harmful chemicals.' } ], 'ants': [ { 'name': 'Ants', 'image_url': "https://m.media-amazon.com/images/I/71HxltbUHUL._UF350,350_QL80_.jpg", 'link': 'https://www.syngentapmp.com/product/advion-ant-gel-insecticide', 'description': 'Professional gel bait for colony control (good for sweet- and protein-feeding ants).' } ], 'caterpillar': [ { 'name': 'Bacillus Thuringiensis (BT) Spray', 'image_url': 'https://m.media-amazon.com/images/I/61r5VSrCMqL._UF1000,1000_QL80_.jpg', 'link': 'https://www.amazon.in/insecticide-for-caterpillar/s?k=insecticide+for+caterpillar&utm_source=chatgpt.com', 'description': 'Natural pesticide effective against caterpillars; safe for most beneficials.' } ], 'beetle': [ { 'name': 'Rhinoceros Beetle Lure', 'image_url': 'https://lilaagrotech.com/wp-content/uploads/2024/08/Live_Earth-Worms-removebg-preview.webp', 'link': 'https://www.bighaat.com/products/rhinoceros-beetle-lure-pest-control-india', 'description': 'A lure to attract and trap rhinoceros beetles in orchards.' } ], 'earthworms': [ { 'name': 'Aquaritin Shield', 'image_url': 'https://encrypted-tbn0.gstatic.com/images?q=tbn:ANd9GcTtkun_yTRqurn0uQ4L9zEN6zmlWr29_OvnbA&s', 'link': 'https://www.kisaantrade.com/product/aquaritin-shield-turf-s?utm_source=chatgpt.com', 'description': 'Biodegradable nano-emulsion to deter casts on turf without harming worms.' } ], 'earwig': [ { 'name': 'Neem Oil', 'image_url': 'https://encrypted-tbn0.gstatic.com/images?q=tbn:ANd9GcQWQKVOjI4FH_C5djSIgfiQXLtJPa27kiaWcA&s', 'link': 'https://environmentalfactor.com/natural-remedies-for-earwig-control-safe-and-eco-friendly-solutions/?utm_source=chatgpt.com', 'description': 'Botanical insecticide/repellent effective against earwigs and other pests.' } ], 'default': [ { 'name': 'General Garden Pest Control', 'image_url': 'https://placehold.co/150x150/f0f9ff/082f49?text=Pest+Spray', 'link': '#', 'description': 'Multi-purpose insecticide—always read the label before use.' } ] } # --------------------------------------------------------- # Utility: model inference (identity_uint8 + global threshold) # --------------------------------------------------------- def model_predict(img_path): """ Loads image -> uint8 array -> model -> softmax if needed. Applies global threshold GLOBAL_TAU; returns (label, confidence) or (None, confidence). """ if pest_model is None: raise Exception("Model is not loaded.") img_obj = image.load_img(img_path, target_size=IMG_SIZE) x = image.img_to_array(img_obj) # keep uint8 [0..255], as diagnostics showed x = np.expand_dims(x, axis=0) # (1, H, W, C) preds = pest_model.predict(x, verbose=0) # logits or probs # convert to probabilities if needed if preds.ndim == 2 and not np.allclose(preds.sum(axis=1, keepdims=True), 1.0, atol=1e-3): probs = tf.nn.softmax(preds, axis=1).numpy() else: probs = preds i = int(np.argmax(probs[0])) conf = float(probs[0][i]) label = CLASS_NAMES[i] if conf < float(GLOBAL_TAU): return None, conf return label, conf # --------------------------------------------------------- # 4) Routes # --------------------------------------------------------- @app.route('/', methods=['GET']) def index(): if not session.get('user'): return redirect(url_for('welcome_page')) return render_template('index.html', username=session.get('user')) # Upload/predict ALLOWED_EXTS = {"png", "jpg", "jpeg", "webp"} @app.route('/predict', methods=['POST']) def upload(): if pest_model is None: return jsonify({'error': 'Model not loaded.'}), 500 file = request.files.get('file') if not file or file.filename == '': return jsonify({'error': 'No file provided'}), 400 orig_name = secure_filename(file.filename) _, ext = os.path.splitext(orig_name) ext = (ext or '').lower().lstrip('.') if ext not in ALLOWED_EXTS: return jsonify({'error': f'Unsupported file type: .{ext}. Allowed: {", ".join(sorted(ALLOWED_EXTS))}'}), 400 unique_name = f"{uuid4().hex}.{ext}" filepath = os.path.join(app.config['UPLOAD_FOLDER'], unique_name) try: file.save(filepath) label, confidence = model_predict(filepath) print(f"Prediction result: {label} (confidence={confidence:.4f})") if label is None: # do NOT clear other sessions msg = f"No confident pest detected (confidence={confidence:.2f}). Please upload a clearer pest image." return jsonify({ 'success': False, 'message': msg, 'confidence': float(confidence), 'image_url': f'/files/uploads/{unique_name}' }), 200 # === NEW: remember this user's last prediction (pest + image) === _username = session.get('user') or "__anon__" latest_pred_by_user[_username] = { "pest": str(label), "image_url": f"/files/uploads/{unique_name}", "ts": datetime.utcnow().isoformat() } # Clear only current user's sessions (fresh context) _clear_user_sessions(session.get('user')) return jsonify({ 'success': True, 'prediction': str(label), 'confidence': float(confidence), 'image_url': f'/files/uploads/{unique_name}' }), 200 except Exception as e: print(f"Prediction Error: {e}") return jsonify({'error': f'Prediction failed: {e}'}), 500 # Recommendations @app.route('/get_recommendations/') def get_recommendations(pest_name): pest_name_lower = (pest_name or '').lower().strip() recommendations = PRODUCT_RECOMMENDATIONS.get(pest_name_lower, PRODUCT_RECOMMENDATIONS['default']) return jsonify(recommendations) # Optional: default recommendations route (in case frontend calls without pest) @app.route('/get_recommendations') def get_recommendations_default(): return jsonify(PRODUCT_RECOMMENDATIONS['default']) # Chatbot (Gemini) def _chat_key(username: str, pest: str) -> str: u = username or "__anon__" return f"{u}:{(pest or '').lower()}" def _clear_user_sessions(username: str): if not username: return prefix = f"{username}:".lower() to_del = [k for k in chat_sessions.keys() if k.lower().startswith(prefix)] for k in to_del: del chat_sessions[k] if to_del: print(f"🧹 Cleared {len(to_del)} chat session(s) for user '{username}'") @app.route('/chatbot', methods=['POST']) def chatbot_response(): # Ensure RAG/gemini is available if not rag_model or not file_references: return jsonify({'response': 'Chatbot service is unavailable.', 'error': 'CONFIG_ERROR'}), 503 data = request.get_json(silent=True) or {} user_message = data.get('message') pest_type = data.get('pest_type') if not user_message or not pest_type: return jsonify({'response': 'Missing message or pest type.'}), 400 # Use Flask session username if available; fall back to anon username = session.get('user') or "__anon__" chat_key = f"{username}:{pest_type}".lower() try: # --- FIRST TURN: RAG over the pest-specific document --- if chat_key not in chat_sessions: app.logger.debug("New conversation for '%s' (user=%s). Using RAG.", pest_type, username) file_uri = file_references.get(pest_type.lower()) if not file_uri: return jsonify({'response': f"Sorry, no document found for '{pest_type}'."}), 404 # prepare file resource and prompt file_id = file_uri.split('/')[-1] file_resource_name = f'files/{file_id}' file_to_use = genai.get_file(name=file_resource_name) prompt = [ f"Answer the following question based only on the provided document about '{pest_type}'.", file_to_use, f"Question: {user_message}" ] # RAG generate response = rag_model.generate_content(prompt) text = getattr(response, "text", None) or (response.get("text") if isinstance(response, dict) else str(response)) # create the interactive chat object seeded with the first exchange chat = rag_model.start_chat(history=[ {'role': 'user', 'parts': [user_message]}, {'role': 'model', 'parts': [text]} ]) # save the chat object for future follow-ups (keyed by user+pest) chat_sessions[chat_key] = chat # attach last prediction image if it matches this pest lp = latest_pred_by_user.get(username or "__anon__") img_for_session = None if lp and (lp.get("pest","").lower() == pest_type.lower()): img_for_session = lp.get("image_url") # persist history and return session id (now with image_url) sid = _make_session(username, pest_type, image_url=img_for_session) _append_history(username, pest_type, "user", user_message, session_id=sid) _append_history(username, pest_type, "assistant", text, session_id=sid) return jsonify({'response': text, 'pest_type': pest_type, 'session_id': sid}) # --- FOLLOW-UP turns: continue the conversation with the saved chat object --- else: app.logger.debug("Continuing conversation for '%s' (user=%s).", pest_type, username) chat = chat_sessions[chat_key] # find or create a session id to append to sid = _find_session_id_for_user_pest(username, pest_type) if sid is None: # attach last prediction image if it matches this pest lp = latest_pred_by_user.get(username or "__anon__") img_for_session = None if lp and (lp.get("pest","").lower() == pest_type.lower()): img_for_session = lp.get("image_url") sid = _make_session(username, pest_type, image_url=img_for_session) # append user message to history _append_history(username, pest_type, "user", user_message, session_id=sid) # send the message via the interactive chat object response = chat.send_message(user_message) reply_text = getattr(response, "text", None) or (response.get("text") if isinstance(response, dict) else str(response)) # append assistant reply to history _append_history(username, pest_type, "assistant", reply_text, session_id=sid) return jsonify({'response': reply_text, 'pest_type': pest_type, 'session_id': sid}) except Exception as e: error_message = str(e) app.logger.exception("❌ Gemini API call failed") return jsonify({'response': f'API Call Failed: {error_message}', 'error_type': 'API_CALL_FAIL'}), 500 # --------------------------------------------------------- # 5) Auth (Excel-based; dev/demo only) # --------------------------------------------------------- USERS_FILE = os.path.join(os.getcwd(), "users.xlsx") _users_lock = threading.Lock() def _ensure_users_file(): if not os.path.exists(USERS_FILE): df = pd.DataFrame(columns=["username", "password_hash"]) df.to_excel(USERS_FILE, index=False) def read_users_df(): _ensure_users_file() try: df = pd.read_excel(USERS_FILE, engine="openpyxl") except Exception: df = pd.DataFrame(columns=["username", "password_hash"]) df.to_excel(USERS_FILE, index=False) return df def username_exists(username: str) -> bool: df = read_users_df() if 'username' not in df.columns: return False return username.lower().strip() in df['username'].str.lower().str.strip().tolist() def add_user(username: str, password: str) -> bool: with _users_lock: df = read_users_df() if 'username' in df.columns and username.lower().strip() in df['username'].str.lower().str.strip().tolist(): return False hashed = generate_password_hash(password) new_row = {"username": username.strip(), "password_hash": hashed} df = pd.concat([df, pd.DataFrame([new_row])], ignore_index=True) df.to_excel(USERS_FILE, index=False) return True def verify_user(username: str, password: str) -> bool: df = read_users_df() if 'username' not in df.columns or 'password_hash' not in df.columns: return False match = df[df['username'].str.lower().str.strip() == username.lower().strip()] if match.empty: return False stored_hash = match.iloc[0]['password_hash'] return check_password_hash(stored_hash, password) @app.route("/home") def home_redirect(): if session.get("user"): return redirect(url_for("dashboard")) return redirect(url_for("index")) @app.route("/register", methods=["GET", "POST"]) def register(): if request.method == "POST": username = request.form.get("username", "").strip() password = request.form.get("password", "") confirm = request.form.get("confirm_password", "") if not username or not password: flash("Please provide both username and password.", "danger") return redirect(url_for("register")) if len(password) < 6: flash("Password should be at least 6 characters long.", "warning") return redirect(url_for("register")) if password != confirm: flash("Password and confirm password do not match.", "warning") return redirect(url_for("register")) if username_exists(username): flash("Username already exists. Please choose a different username or login.", "warning") return redirect(url_for("register")) ok = add_user(username, password) if ok: session['user'] = username session.permanent = True session.modified = True print(f"✅ REGISTER - Set session user: {username}") print(f"✅ REGISTER - Session after set: {dict(session)}") # Force save flash(f"Welcome {username}! Account created successfully.", "success") return redirect(url_for('dashboard')) else: flash("Could not create account. Try again.", "danger") return redirect(url_for("register")) return render_template("register.html", title="Register") @app.route("/login", methods=["GET", "POST"]) def login(): if request.method == "POST": username = request.form.get("username", "").strip() password = request.form.get("password", "") if not username or not password: flash("Please enter both username and password.", "warning") return redirect(url_for("login")) if not username_exists(username): flash("User not existed. Please register.", "warning") return redirect(url_for("register")) if verify_user(username, password): session['user'] = username session.permanent = True session.modified = True # Force save print(f"✅ LOGIN - Set session user: {username}") print(f"✅ LOGIN - Session after set: {dict(session)}") flash(f"Welcome back {username}!", "success") return redirect(url_for('dashboard')) # Go directly to dashboard else: flash("Incorrect password.", "danger") return redirect(url_for("login")) return render_template("login.html", title="Login") @app.route("/dashboard") def dashboard(): print(f"🔍 DASHBOARD - Session user: {session.get('user')}") print(f"🔍 DASHBOARD - Full session: {dict(session)}") if not session.get("user"): flash("Please login first.", "warning") return redirect(url_for("login")) return render_template("chat_dashboard.html", title="Dashboard") @app.route("/logout") def logout(): session.pop("user", None) flash("You have been logged out.", "info") return redirect(url_for("login")) @app.route('/welcome') def welcome_page(): if session.get('user'): return redirect(url_for('dashboard')) return render_template('landing.html') @app.route('/greeting') def greeting(): username = request.args.get('username', 'friend') mode = request.args.get('mode', 'welcome') print(f"🔍 GREETING - Session user: {session.get('user')}") print(f"🔍 GREETING - Username from args: {username}") if mode == 'welcome': title_text = f"Welcome aboard, {username}!" subtitle_text = "We're delighted to have you. 🌱" else: title_text = f"Welcome back, {username}!" subtitle_text = "Great to see you again — ready to identify and remedy pests? 👋" return render_template('greeting.html', title_text=title_text, subtitle_text=subtitle_text) # Replace the existing /chat_history and /chat_history/ routes with these. from flask import make_response def _safe_username(): """Return logged-in username or '__anon__' (same logic used elsewhere).""" return session.get('user') or "__anon__" @app.route('/chat_history', methods=['GET']) def list_chat_history(): """Return light-weight listing of sessions for the current logged-in user. Uses the same in-memory user_sessions store that _append_history/_make_session use. """ username = session.get('user') if not username: return jsonify({'error': 'not_logged_in'}), 401 with _user_sessions_lock: sessions = user_sessions.get(username, [])[:] # return small summary list (reverse so recent first) lite = [{"id": s["id"], "pest": s["pest"], "created_at": s.get("created_at")} for s in sessions[::-1]] return jsonify({'username': username, 'sessions': lite}) # Serve uploaded files from /data/uploads with a clean URL. # e.g. /files/uploads/abc123.jpg @app.route("/files/uploads/") def serve_uploaded_file(filename): return send_from_directory(UPLOAD_FOLDER, filename, as_attachment=False) @app.route('/chat_history/', methods=['GET']) def get_chat_session(session_id): """Return the full session object (messages) for the logged-in user.""" username = session.get('user') if not username: return jsonify({'error': 'not_logged_in'}), 401 with _user_sessions_lock: sessions = user_sessions.get(username, []) for s in sessions: if s.get('id') == session_id: return jsonify(s) return jsonify({'error': 'not_found'}), 404 # --------------------------------------------------------- # Main # --------------------------------------------------------- if __name__ == '__main__': port = int(os.environ.get("PORT", 7860)) # Hugging Face uses 7860 by default app.run(host="0.0.0.0", port=port, debug=True)