Spaces:
Sleeping
Sleeping
| import os | |
| import json | |
| import threading | |
| from uuid import uuid4 | |
| import numpy as np | |
| import pandas as pd | |
| from dotenv import load_dotenv | |
| from flask import ( | |
| Flask, render_template, request, jsonify, | |
| redirect, url_for, flash, session, send_from_directory # β add this | |
| ) | |
| from werkzeug.utils import secure_filename | |
| from werkzeug.security import generate_password_hash, check_password_hash | |
| # ML | |
| import tensorflow as tf | |
| from tensorflow.keras.models import load_model | |
| from tensorflow.keras.preprocessing import image | |
| # Gemini | |
| import google.generativeai as genai | |
| from datetime import datetime | |
| # in-memory chat history: username -> list of sessions | |
| # each session: {"id": "...", "pest": "wasp", "created_at": "...", "messages": [{"role": "user"/"assistant","text": "..."}]} | |
| # --------------------------------------------------------- | |
| # Boot | |
| # --------------------------------------------------------- | |
| load_dotenv() | |
| app = Flask(__name__) | |
| # Use a persistent secret key so Flask sessions survive restarts | |
| app.secret_key = os.getenv("FLASK_SECRET_KEY", "my-name-is-prabhat007") | |
| # Keep user sessions alive across reloads | |
| app.config['PERMANENT_SESSION_LIFETIME'] = 86400 # 1 day (in seconds) | |
| app.config['SESSION_PERMANENT'] = True | |
| #app.config['SESSION_COOKIE_SECURE'] = False # Required for HTTP (HF Spaces handles HTTPS) | |
| app.config['SESSION_COOKIE_HTTPONLY'] = True | |
| #app.config['SESSION_COOKIE_SAMESITE'] = 'Lax' | |
| # use HTTPS on HF spaces β set Secure=True and SameSite=None | |
| app.config['SESSION_COOKIE_SECURE'] = True | |
| app.config['SESSION_COOKIE_SAMESITE'] = 'None' | |
| def make_session_permanent(): | |
| session.permanent = True | |
| # --------------------------------------------------------- | |
| # 1) Model loading & config | |
| # --------------------------------------------------------- | |
| MODEL_PATH = 'dhanush_model.h5' | |
| CLASS_NAMES = [ | |
| 'ants', 'bees', 'beetle', 'caterpillar', 'earthworms', 'earwig', | |
| 'Grasshopper', 'moth', 'slug', 'snail', 'wasp', 'weevil' | |
| ] | |
| IMG_SIZE = (224, 224) | |
| DATA_DIR = os.environ.get("DATA_DIR", "/data") | |
| UPLOAD_FOLDER = os.path.join(DATA_DIR, "uploads") | |
| HISTORY_DIR = os.path.join(DATA_DIR, "chat_history") | |
| os.makedirs(UPLOAD_FOLDER, exist_ok=True) | |
| os.makedirs(HISTORY_DIR, exist_ok=True) | |
| app.config['UPLOAD_FOLDER'] = UPLOAD_FOLDER | |
| # Load model | |
| try: | |
| pest_model = load_model(MODEL_PATH) | |
| print(f"β Model loaded successfully from {MODEL_PATH}") | |
| except Exception as e: | |
| print(f"β Error loading model: {e}") | |
| pest_model = None | |
| # Threshold (from diagnostics/thresholds.json if present) | |
| GLOBAL_TAU = 0.45 | |
| try: | |
| with open("diagnostics/thresholds.json", "r") as f: | |
| GLOBAL_TAU = float(json.load(f).get("global_tau", 0.0)) | |
| print(f"β Loaded global threshold Ο = {GLOBAL_TAU:.3f}") | |
| except Exception as _: | |
| print("βΉοΈ No diagnostics/thresholds.json found; using Ο = 0.0") | |
| # --------------------------------------------------------- | |
| # 2) Gemini API & RAG setup | |
| # --------------------------------------------------------- | |
| # ===================== Gemini API & RAG setup ===================== | |
| # global chat sessions (per pest type) | |
| chat_sessions = {} | |
| # cache the most recent prediction per user, to attach image to first chat turn | |
| latest_pred_by_user = {} # username -> {"pest": str, "image_url": str, "ts": "..."} | |
| # Simple in-memory chat history store (for demo / dev) | |
| # Structure: user_sessions[username] -> list of sessions | |
| # session: { id, pest, created_at, messages: [ {role, text, ts} ] } | |
| user_sessions = {} | |
| _user_sessions_lock = threading.Lock() | |
| import glob | |
| def _history_path_for(username: str) -> str: | |
| safe = (username or "__anon__").replace(":", "_") | |
| return os.path.join(HISTORY_DIR, f"{safe}.json") | |
| def _persist_user(username: str): | |
| """Write this user's sessions to disk as JSON.""" | |
| if not username: | |
| username = "__anon__" | |
| path = _history_path_for(username) | |
| with _user_sessions_lock: | |
| data = user_sessions.get(username, []) | |
| try: | |
| with open(path, "w", encoding="utf-8") as f: | |
| json.dump(data, f, ensure_ascii=False, indent=2) | |
| except Exception as e: | |
| print(f"β οΈ Failed to persist history for {username}: {e}") | |
| def _load_all_users_on_boot(): | |
| """Load any previously saved sessions from disk into memory.""" | |
| loaded = 0 | |
| for p in glob.glob(os.path.join(HISTORY_DIR, "*.json")): | |
| try: | |
| with open(p, "r", encoding="utf-8") as f: | |
| sessions = json.load(f) | |
| username = os.path.splitext(os.path.basename(p))[0] | |
| username = "__anon__" if username == "__anon__" else username | |
| with _user_sessions_lock: | |
| user_sessions[username] = sessions | |
| loaded += 1 | |
| except Exception as e: | |
| print(f"β οΈ Failed to load history file {p}: {e}") | |
| print(f"π Loaded chat history for {loaded} user(s) from disk.") | |
| # Load any persisted chat history when the app starts | |
| _load_all_users_on_boot() | |
| def _make_session(username, pest, image_url=None): | |
| sid = f"{username}:{pest}:{uuid4().hex[:8]}" | |
| session = { | |
| "id": sid, | |
| "pest": pest, | |
| "created_at": datetime.utcnow().isoformat(), | |
| "image_url": image_url, # NEW: persist the prediction image for this chat | |
| "messages": [] | |
| } | |
| with _user_sessions_lock: | |
| user_sessions.setdefault(username, []).append(session) | |
| _persist_user(username) | |
| return sid | |
| def _find_session_id_for_user_pest(username, pest): | |
| """Return the most recent session id for user+pest or None.""" | |
| with _user_sessions_lock: | |
| sessions = user_sessions.get(username, []) | |
| for s in reversed(sessions): | |
| if (s.get("pest") or "").lower() == (pest or "").lower(): | |
| return s["id"] | |
| return None | |
| def _append_history(username, pest, role, text, session_id=None): | |
| """ | |
| Append a message to history. | |
| If session_id is None, create a new session for username+pest. | |
| Returns session_id used. | |
| """ | |
| if not username: | |
| username = "__anon__" | |
| with _user_sessions_lock: | |
| if session_id is None: | |
| # try to reuse most recent session for this user/pest; otherwise create new | |
| sid = _find_session_id_for_user_pest(username, pest) | |
| if sid is None: | |
| sid = _make_session(username, pest) | |
| else: | |
| sid = session_id | |
| # find session object | |
| sessions = user_sessions.setdefault(username, []) | |
| sess_obj = next((s for s in sessions if s["id"] == sid), None) | |
| if sess_obj is None: | |
| # if not found, create | |
| sid = _make_session(username, pest) | |
| sess_obj = next(s for s in user_sessions[username] if s["id"] == sid) | |
| sess_obj["messages"].append({"role": role, "text": text, "ts": datetime.utcnow().isoformat()}) | |
| _persist_user(username) | |
| return sid | |
| try: | |
| GOOGLE_API_KEY = os.environ.get("GOOGLE_API_KEY") | |
| if not GOOGLE_API_KEY: | |
| raise ValueError("GOOGLE_API_KEY not found in environment variables.") | |
| genai.configure(api_key=GOOGLE_API_KEY) | |
| # Use a widely available model name to avoid 404s | |
| rag_model = genai.GenerativeModel(model_name="gemini-2.0-flash-lite") | |
| print("β Gemini API configured successfully with 'gemini-2.5-pro'.") | |
| # Load the file references you created with upload_files.py | |
| with open('file_references.json', 'r') as f: | |
| file_references = json.load(f) | |
| print("β Document file references loaded.") | |
| except Exception as e: | |
| print(f"β Gemini API or File Reference Error: {e}") | |
| rag_model = None | |
| file_references = {} | |
| # ================================================================ | |
| # --------------------------------------------------------- | |
| # 3) Product recommendations (sample data) | |
| # --------------------------------------------------------- | |
| PRODUCT_RECOMMENDATIONS = { | |
| 'wasp': [ | |
| { | |
| 'name': 'Wasp & Hornet Killer Spray', | |
| 'image_url': 'https://placehold.co/150x150/e9d5ff/4c1d95?text=Wasp+Spray', | |
| 'link': '#', | |
| 'description': 'Fast-acting spray that kills wasps, hornets, and yellow jackets on contact.' | |
| }, | |
| { | |
| 'name': 'Reusable Wasp Trap', | |
| 'image_url': 'https://placehold.co/150x150/dbeafe/1e3a8a?text=Wasp+Trap', | |
| 'link': '#', | |
| 'description': 'Eco-friendly trap to lure and capture wasps without harmful chemicals.' | |
| } | |
| ], | |
| 'ants': [ | |
| { | |
| 'name': 'Ants', | |
| 'image_url': "https://m.media-amazon.com/images/I/71HxltbUHUL._UF350,350_QL80_.jpg", | |
| 'link': 'https://www.syngentapmp.com/product/advion-ant-gel-insecticide', | |
| 'description': 'Professional gel bait for colony control (good for sweet- and protein-feeding ants).' | |
| } | |
| ], | |
| 'caterpillar': [ | |
| { | |
| 'name': 'Bacillus Thuringiensis (BT) Spray', | |
| 'image_url': 'https://m.media-amazon.com/images/I/61r5VSrCMqL._UF1000,1000_QL80_.jpg', | |
| 'link': 'https://www.amazon.in/insecticide-for-caterpillar/s?k=insecticide+for+caterpillar&utm_source=chatgpt.com', | |
| 'description': 'Natural pesticide effective against caterpillars; safe for most beneficials.' | |
| } | |
| ], | |
| 'beetle': [ | |
| { | |
| 'name': 'Rhinoceros Beetle Lure', | |
| 'image_url': 'https://lilaagrotech.com/wp-content/uploads/2024/08/Live_Earth-Worms-removebg-preview.webp', | |
| 'link': 'https://www.bighaat.com/products/rhinoceros-beetle-lure-pest-control-india', | |
| 'description': 'A lure to attract and trap rhinoceros beetles in orchards.' | |
| } | |
| ], | |
| 'earthworms': [ | |
| { | |
| 'name': 'Aquaritin Shield', | |
| 'image_url': 'https://encrypted-tbn0.gstatic.com/images?q=tbn:ANd9GcTtkun_yTRqurn0uQ4L9zEN6zmlWr29_OvnbA&s', | |
| 'link': 'https://www.kisaantrade.com/product/aquaritin-shield-turf-s?utm_source=chatgpt.com', | |
| 'description': 'Biodegradable nano-emulsion to deter casts on turf without harming worms.' | |
| } | |
| ], | |
| 'earwig': [ | |
| { | |
| 'name': 'Neem Oil', | |
| 'image_url': 'https://encrypted-tbn0.gstatic.com/images?q=tbn:ANd9GcQWQKVOjI4FH_C5djSIgfiQXLtJPa27kiaWcA&s', | |
| 'link': 'https://environmentalfactor.com/natural-remedies-for-earwig-control-safe-and-eco-friendly-solutions/?utm_source=chatgpt.com', | |
| 'description': 'Botanical insecticide/repellent effective against earwigs and other pests.' | |
| } | |
| ], | |
| 'default': [ | |
| { | |
| 'name': 'General Garden Pest Control', | |
| 'image_url': 'https://placehold.co/150x150/f0f9ff/082f49?text=Pest+Spray', | |
| 'link': '#', | |
| 'description': 'Multi-purpose insecticideβalways read the label before use.' | |
| } | |
| ] | |
| } | |
| # --------------------------------------------------------- | |
| # Utility: model inference (identity_uint8 + global threshold) | |
| # --------------------------------------------------------- | |
| def model_predict(img_path): | |
| """ | |
| Loads image -> uint8 array -> model -> softmax if needed. | |
| Applies global threshold GLOBAL_TAU; returns (label, confidence) or (None, confidence). | |
| """ | |
| if pest_model is None: | |
| raise Exception("Model is not loaded.") | |
| img_obj = image.load_img(img_path, target_size=IMG_SIZE) | |
| x = image.img_to_array(img_obj) # keep uint8 [0..255], as diagnostics showed | |
| x = np.expand_dims(x, axis=0) # (1, H, W, C) | |
| preds = pest_model.predict(x, verbose=0) # logits or probs | |
| # convert to probabilities if needed | |
| if preds.ndim == 2 and not np.allclose(preds.sum(axis=1, keepdims=True), 1.0, atol=1e-3): | |
| probs = tf.nn.softmax(preds, axis=1).numpy() | |
| else: | |
| probs = preds | |
| i = int(np.argmax(probs[0])) | |
| conf = float(probs[0][i]) | |
| label = CLASS_NAMES[i] | |
| if conf < float(GLOBAL_TAU): | |
| return None, conf | |
| return label, conf | |
| # --------------------------------------------------------- | |
| # 4) Routes | |
| # --------------------------------------------------------- | |
| def index(): | |
| if not session.get('user'): | |
| return redirect(url_for('welcome_page')) | |
| return render_template('index.html', username=session.get('user')) | |
| # Upload/predict | |
| ALLOWED_EXTS = {"png", "jpg", "jpeg", "webp"} | |
| def upload(): | |
| if pest_model is None: | |
| return jsonify({'error': 'Model not loaded.'}), 500 | |
| file = request.files.get('file') | |
| if not file or file.filename == '': | |
| return jsonify({'error': 'No file provided'}), 400 | |
| orig_name = secure_filename(file.filename) | |
| _, ext = os.path.splitext(orig_name) | |
| ext = (ext or '').lower().lstrip('.') | |
| if ext not in ALLOWED_EXTS: | |
| return jsonify({'error': f'Unsupported file type: .{ext}. Allowed: {", ".join(sorted(ALLOWED_EXTS))}'}), 400 | |
| unique_name = f"{uuid4().hex}.{ext}" | |
| filepath = os.path.join(app.config['UPLOAD_FOLDER'], unique_name) | |
| try: | |
| file.save(filepath) | |
| label, confidence = model_predict(filepath) | |
| print(f"Prediction result: {label} (confidence={confidence:.4f})") | |
| if label is None: | |
| # do NOT clear other sessions | |
| msg = f"No confident pest detected (confidence={confidence:.2f}). Please upload a clearer pest image." | |
| return jsonify({ | |
| 'success': False, | |
| 'message': msg, | |
| 'confidence': float(confidence), | |
| 'image_url': f'/files/uploads/{unique_name}' | |
| }), 200 | |
| # === NEW: remember this user's last prediction (pest + image) === | |
| _username = session.get('user') or "__anon__" | |
| latest_pred_by_user[_username] = { | |
| "pest": str(label), | |
| "image_url": f"/files/uploads/{unique_name}", | |
| "ts": datetime.utcnow().isoformat() | |
| } | |
| # Clear only current user's sessions (fresh context) | |
| _clear_user_sessions(session.get('user')) | |
| return jsonify({ | |
| 'success': True, | |
| 'prediction': str(label), | |
| 'confidence': float(confidence), | |
| 'image_url': f'/files/uploads/{unique_name}' | |
| }), 200 | |
| except Exception as e: | |
| print(f"Prediction Error: {e}") | |
| return jsonify({'error': f'Prediction failed: {e}'}), 500 | |
| # Recommendations | |
| def get_recommendations(pest_name): | |
| pest_name_lower = (pest_name or '').lower().strip() | |
| recommendations = PRODUCT_RECOMMENDATIONS.get(pest_name_lower, PRODUCT_RECOMMENDATIONS['default']) | |
| return jsonify(recommendations) | |
| # Optional: default recommendations route (in case frontend calls without pest) | |
| def get_recommendations_default(): | |
| return jsonify(PRODUCT_RECOMMENDATIONS['default']) | |
| # Chatbot (Gemini) | |
| def _chat_key(username: str, pest: str) -> str: | |
| u = username or "__anon__" | |
| return f"{u}:{(pest or '').lower()}" | |
| def _clear_user_sessions(username: str): | |
| if not username: | |
| return | |
| prefix = f"{username}:".lower() | |
| to_del = [k for k in chat_sessions.keys() if k.lower().startswith(prefix)] | |
| for k in to_del: | |
| del chat_sessions[k] | |
| if to_del: | |
| print(f"π§Ή Cleared {len(to_del)} chat session(s) for user '{username}'") | |
| def chatbot_response(): | |
| # Ensure RAG/gemini is available | |
| if not rag_model or not file_references: | |
| return jsonify({'response': 'Chatbot service is unavailable.', 'error': 'CONFIG_ERROR'}), 503 | |
| data = request.get_json(silent=True) or {} | |
| user_message = data.get('message') | |
| pest_type = data.get('pest_type') | |
| if not user_message or not pest_type: | |
| return jsonify({'response': 'Missing message or pest type.'}), 400 | |
| # Use Flask session username if available; fall back to anon | |
| username = session.get('user') or "__anon__" | |
| chat_key = f"{username}:{pest_type}".lower() | |
| try: | |
| # --- FIRST TURN: RAG over the pest-specific document --- | |
| if chat_key not in chat_sessions: | |
| app.logger.debug("New conversation for '%s' (user=%s). Using RAG.", pest_type, username) | |
| file_uri = file_references.get(pest_type.lower()) | |
| if not file_uri: | |
| return jsonify({'response': f"Sorry, no document found for '{pest_type}'."}), 404 | |
| # prepare file resource and prompt | |
| file_id = file_uri.split('/')[-1] | |
| file_resource_name = f'files/{file_id}' | |
| file_to_use = genai.get_file(name=file_resource_name) | |
| prompt = [ | |
| f"Answer the following question based only on the provided document about '{pest_type}'.", | |
| file_to_use, | |
| f"Question: {user_message}" | |
| ] | |
| # RAG generate | |
| response = rag_model.generate_content(prompt) | |
| text = getattr(response, "text", None) or (response.get("text") if isinstance(response, dict) else str(response)) | |
| # create the interactive chat object seeded with the first exchange | |
| chat = rag_model.start_chat(history=[ | |
| {'role': 'user', 'parts': [user_message]}, | |
| {'role': 'model', 'parts': [text]} | |
| ]) | |
| # save the chat object for future follow-ups (keyed by user+pest) | |
| chat_sessions[chat_key] = chat | |
| # attach last prediction image if it matches this pest | |
| lp = latest_pred_by_user.get(username or "__anon__") | |
| img_for_session = None | |
| if lp and (lp.get("pest","").lower() == pest_type.lower()): | |
| img_for_session = lp.get("image_url") | |
| # persist history and return session id (now with image_url) | |
| sid = _make_session(username, pest_type, image_url=img_for_session) | |
| _append_history(username, pest_type, "user", user_message, session_id=sid) | |
| _append_history(username, pest_type, "assistant", text, session_id=sid) | |
| return jsonify({'response': text, 'pest_type': pest_type, 'session_id': sid}) | |
| # --- FOLLOW-UP turns: continue the conversation with the saved chat object --- | |
| else: | |
| app.logger.debug("Continuing conversation for '%s' (user=%s).", pest_type, username) | |
| chat = chat_sessions[chat_key] | |
| # find or create a session id to append to | |
| sid = _find_session_id_for_user_pest(username, pest_type) | |
| if sid is None: | |
| # attach last prediction image if it matches this pest | |
| lp = latest_pred_by_user.get(username or "__anon__") | |
| img_for_session = None | |
| if lp and (lp.get("pest","").lower() == pest_type.lower()): | |
| img_for_session = lp.get("image_url") | |
| sid = _make_session(username, pest_type, image_url=img_for_session) | |
| # append user message to history | |
| _append_history(username, pest_type, "user", user_message, session_id=sid) | |
| # send the message via the interactive chat object | |
| response = chat.send_message(user_message) | |
| reply_text = getattr(response, "text", None) or (response.get("text") if isinstance(response, dict) else str(response)) | |
| # append assistant reply to history | |
| _append_history(username, pest_type, "assistant", reply_text, session_id=sid) | |
| return jsonify({'response': reply_text, 'pest_type': pest_type, 'session_id': sid}) | |
| except Exception as e: | |
| error_message = str(e) | |
| app.logger.exception("β Gemini API call failed") | |
| return jsonify({'response': f'API Call Failed: {error_message}', 'error_type': 'API_CALL_FAIL'}), 500 | |
| # --------------------------------------------------------- | |
| # 5) Auth (Excel-based; dev/demo only) | |
| # --------------------------------------------------------- | |
| USERS_FILE = os.path.join(os.getcwd(), "users.xlsx") | |
| _users_lock = threading.Lock() | |
| def _ensure_users_file(): | |
| if not os.path.exists(USERS_FILE): | |
| df = pd.DataFrame(columns=["username", "password_hash"]) | |
| df.to_excel(USERS_FILE, index=False) | |
| def read_users_df(): | |
| _ensure_users_file() | |
| try: | |
| df = pd.read_excel(USERS_FILE, engine="openpyxl") | |
| except Exception: | |
| df = pd.DataFrame(columns=["username", "password_hash"]) | |
| df.to_excel(USERS_FILE, index=False) | |
| return df | |
| def username_exists(username: str) -> bool: | |
| df = read_users_df() | |
| if 'username' not in df.columns: | |
| return False | |
| return username.lower().strip() in df['username'].str.lower().str.strip().tolist() | |
| def add_user(username: str, password: str) -> bool: | |
| with _users_lock: | |
| df = read_users_df() | |
| if 'username' in df.columns and username.lower().strip() in df['username'].str.lower().str.strip().tolist(): | |
| return False | |
| hashed = generate_password_hash(password) | |
| new_row = {"username": username.strip(), "password_hash": hashed} | |
| df = pd.concat([df, pd.DataFrame([new_row])], ignore_index=True) | |
| df.to_excel(USERS_FILE, index=False) | |
| return True | |
| def verify_user(username: str, password: str) -> bool: | |
| df = read_users_df() | |
| if 'username' not in df.columns or 'password_hash' not in df.columns: | |
| return False | |
| match = df[df['username'].str.lower().str.strip() == username.lower().strip()] | |
| if match.empty: | |
| return False | |
| stored_hash = match.iloc[0]['password_hash'] | |
| return check_password_hash(stored_hash, password) | |
| def home_redirect(): | |
| if session.get("user"): | |
| return redirect(url_for("dashboard")) | |
| return redirect(url_for("index")) | |
| def register(): | |
| if request.method == "POST": | |
| username = request.form.get("username", "").strip() | |
| password = request.form.get("password", "") | |
| confirm = request.form.get("confirm_password", "") | |
| if not username or not password: | |
| flash("Please provide both username and password.", "danger") | |
| return redirect(url_for("register")) | |
| if len(password) < 6: | |
| flash("Password should be at least 6 characters long.", "warning") | |
| return redirect(url_for("register")) | |
| if password != confirm: | |
| flash("Password and confirm password do not match.", "warning") | |
| return redirect(url_for("register")) | |
| if username_exists(username): | |
| flash("Username already exists. Please choose a different username or login.", "warning") | |
| return redirect(url_for("register")) | |
| ok = add_user(username, password) | |
| if ok: | |
| session['user'] = username | |
| session.permanent = True | |
| session.modified = True | |
| print(f"β REGISTER - Set session user: {username}") | |
| print(f"β REGISTER - Session after set: {dict(session)}") # Force save | |
| flash(f"Welcome {username}! Account created successfully.", "success") | |
| return redirect(url_for('dashboard')) | |
| else: | |
| flash("Could not create account. Try again.", "danger") | |
| return redirect(url_for("register")) | |
| return render_template("register.html", title="Register") | |
| def login(): | |
| if request.method == "POST": | |
| username = request.form.get("username", "").strip() | |
| password = request.form.get("password", "") | |
| if not username or not password: | |
| flash("Please enter both username and password.", "warning") | |
| return redirect(url_for("login")) | |
| if not username_exists(username): | |
| flash("User not existed. Please register.", "warning") | |
| return redirect(url_for("register")) | |
| if verify_user(username, password): | |
| session['user'] = username | |
| session.permanent = True | |
| session.modified = True # Force save | |
| print(f"β LOGIN - Set session user: {username}") | |
| print(f"β LOGIN - Session after set: {dict(session)}") | |
| flash(f"Welcome back {username}!", "success") | |
| return redirect(url_for('dashboard')) # Go directly to dashboard | |
| else: | |
| flash("Incorrect password.", "danger") | |
| return redirect(url_for("login")) | |
| return render_template("login.html", title="Login") | |
| def dashboard(): | |
| print(f"π DASHBOARD - Session user: {session.get('user')}") | |
| print(f"π DASHBOARD - Full session: {dict(session)}") | |
| if not session.get("user"): | |
| flash("Please login first.", "warning") | |
| return redirect(url_for("login")) | |
| return render_template("chat_dashboard.html", title="Dashboard") | |
| def logout(): | |
| session.pop("user", None) | |
| flash("You have been logged out.", "info") | |
| return redirect(url_for("login")) | |
| def welcome_page(): | |
| if session.get('user'): | |
| return redirect(url_for('dashboard')) | |
| return render_template('landing.html') | |
| def greeting(): | |
| username = request.args.get('username', 'friend') | |
| mode = request.args.get('mode', 'welcome') | |
| print(f"π GREETING - Session user: {session.get('user')}") | |
| print(f"π GREETING - Username from args: {username}") | |
| if mode == 'welcome': | |
| title_text = f"Welcome aboard, {username}!" | |
| subtitle_text = "We're delighted to have you. π±" | |
| else: | |
| title_text = f"Welcome back, {username}!" | |
| subtitle_text = "Great to see you again β ready to identify and remedy pests? π" | |
| return render_template('greeting.html', title_text=title_text, subtitle_text=subtitle_text) | |
| # Replace the existing /chat_history and /chat_history/<id> routes with these. | |
| from flask import make_response | |
| def _safe_username(): | |
| """Return logged-in username or '__anon__' (same logic used elsewhere).""" | |
| return session.get('user') or "__anon__" | |
| def list_chat_history(): | |
| """Return light-weight listing of sessions for the current logged-in user. | |
| Uses the same in-memory user_sessions store that _append_history/_make_session use. | |
| """ | |
| username = session.get('user') | |
| if not username: | |
| return jsonify({'error': 'not_logged_in'}), 401 | |
| with _user_sessions_lock: | |
| sessions = user_sessions.get(username, [])[:] | |
| # return small summary list (reverse so recent first) | |
| lite = [{"id": s["id"], "pest": s["pest"], "created_at": s.get("created_at")} for s in sessions[::-1]] | |
| return jsonify({'username': username, 'sessions': lite}) | |
| # Serve uploaded files from /data/uploads with a clean URL. | |
| # e.g. /files/uploads/abc123.jpg | |
| def serve_uploaded_file(filename): | |
| return send_from_directory(UPLOAD_FOLDER, filename, as_attachment=False) | |
| def get_chat_session(session_id): | |
| """Return the full session object (messages) for the logged-in user.""" | |
| username = session.get('user') | |
| if not username: | |
| return jsonify({'error': 'not_logged_in'}), 401 | |
| with _user_sessions_lock: | |
| sessions = user_sessions.get(username, []) | |
| for s in sessions: | |
| if s.get('id') == session_id: | |
| return jsonify(s) | |
| return jsonify({'error': 'not_found'}), 404 | |
| # --------------------------------------------------------- | |
| # Main | |
| # --------------------------------------------------------- | |
| if __name__ == '__main__': | |
| port = int(os.environ.get("PORT", 7860)) # Hugging Face uses 7860 by default | |
| app.run(host="0.0.0.0", port=port, debug=True) | |