"""Flask entry point exposing the RAG search webhook, user login and admin index controls."""
import functools
import hmac
import logging
import os

import pandas as pd
from dotenv import load_dotenv
from flask import Flask, Response, jsonify, render_template, request
from flask_cors import CORS

# Load environment variables. Kept ahead of the project imports below, as in
# the original file (config presumably reads the environment at import time).
load_dotenv()

# Custom Imports
from rag_system import initialize_and_get_rag_system  # noqa: E402
from config import (  # noqa: E402
    API_USERNAME,
    API_PASSWORD,
    RAG_SOURCES_DIR,
    GDRIVE_INDEX_ENABLED,
    GDRIVE_INDEX_ID_OR_URL,
    GDRIVE_USERS_CSV_ENABLED,
    GDRIVE_USERS_CSV_ID_OR_URL,
    ADMIN_USERNAME,
    ADMIN_PASSWORD,
    RAG_RERANKER_K,
)
from utils import download_and_unzip_gdrive_file, download_gdrive_file  # noqa: E402

# Logging Setup
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

# Flask Init
app = Flask(__name__, static_folder='static', template_folder='templates')
CORS(app)

# Global State
rag_system = None  # set by run_startup_tasks() / rebuild_index()
user_df = None     # pandas DataFrame of users.csv, or None when unavailable
_APP_BASE_DIR = os.path.dirname(os.path.abspath(__file__))


# --- Helper: Request / Credential Utilities ---
def _json_body():
    """Return the request's JSON object, or an empty dict.

    A missing body, a non-JSON content type, malformed JSON and non-object
    JSON (e.g. a list) all yield ``{}`` so that routes can answer with their
    own validation errors instead of failing on ``None.get``.
    """
    payload = request.get_json(silent=True)
    return payload if isinstance(payload, dict) else {}


def _secure_equals(supplied, expected):
    """Compare two strings in constant time.

    Returns False when either side is not a string. In particular an
    unconfigured (None) credential never matches, not even a request whose
    Authorization header carries no username/password.
    """
    if not isinstance(supplied, str) or not isinstance(expected, str):
        return False
    # Encode to bytes: compare_digest rejects non-ASCII str arguments.
    return hmac.compare_digest(supplied.encode('utf-8'), expected.encode('utf-8'))


def _credentials_match(auth, username, password):
    """Check an Authorization object against one username/password pair."""
    # Evaluate both comparisons before combining so that a wrong username
    # does not skip the password comparison.
    user_ok = _secure_equals(auth.username, username)
    pass_ok = _secure_equals(auth.password, password)
    return user_ok and pass_ok


def _auth_challenge(message, realm):
    """Build the 401 response carrying a Basic auth challenge for *realm*."""
    return Response(message, 401, {'WWW-Authenticate': f'Basic realm="{realm}"'})


def _find_user(email):
    """Return the first users.csv row whose email equals *email*, else None."""
    if user_df is None or 'email' not in user_df.columns:
        return None
    matches = user_df[user_df['email'] == email]
    return None if matches.empty else matches.iloc[0]


def _password_matches(user_row, supplied):
    """Compare a supplied password with the one stored in a users.csv row."""
    stored = user_row.get('password')
    # An empty CSV cell is read as NaN; str(NaN) is "nan", which must not be
    # usable as a password.
    if stored is None or pd.isna(stored):
        return False
    # Both sides are compared as strings, as the CSV may hold numeric values.
    return _secure_equals(str(stored), str(supplied))


def _parse_positive_int(value):
    """Return *value* as a positive int, or None when it is not one.

    Accepts ints, integral floats and numeric strings; rejects booleans.
    """
    if isinstance(value, bool):
        return None
    if isinstance(value, float) and not value.is_integer():
        return None
    try:
        parsed = int(value)
    except (TypeError, ValueError, OverflowError):
        return None
    return parsed if parsed > 0 else None


# --- Helper: Load Users ---
def load_users_from_csv():
    """Load ``assets/users.csv`` into the global ``user_df``.

    The assets folder is created if absent. When present, the ``email``
    column is lower-cased and stripped. ``user_df`` is set to None when the
    file is missing or cannot be loaded.
    """
    global user_df
    assets_folder = os.path.join(_APP_BASE_DIR, 'assets')
    os.makedirs(assets_folder, exist_ok=True)
    users_csv_path = os.path.join(assets_folder, 'users.csv')

    if not os.path.exists(users_csv_path):
        logger.warning("users.csv not found in assets folder.")
        user_df = None
        return

    try:
        loaded = pd.read_csv(users_csv_path)
        # Normalize email
        if 'email' in loaded.columns:
            loaded['email'] = loaded['email'].str.lower().str.strip()
        else:
            logger.warning("users.csv has no 'email' column; no user can be matched.")
    except Exception as e:
        logger.exception("Failed to load users.csv: %s", e)
        user_df = None
        return

    user_df = loaded
    logger.info("Loaded %d users from CSV.", len(loaded))


# --- Helper: Auth Decorators ---
def require_api_auth(f):
    """Protects the N8N Webhook endpoint"""
    @functools.wraps(f)
    def decorated(*args, **kwargs):
        auth = request.authorization
        # Check against API_USERNAME/PASSWORD from .env
        if not auth or not _credentials_match(auth, API_USERNAME, API_PASSWORD):
            return _auth_challenge('Unauthorized', 'API Login Required')
        return f(*args, **kwargs)
    return decorated


def require_admin_auth(f):
    """Protects Admin Rebuild/Update endpoints"""
    @functools.wraps(f)
    def decorated(*args, **kwargs):
        auth = request.authorization
        # Authorization headers without a username/password (non-Basic
        # schemes) are rejected up front.
        if not auth or auth.username is None or auth.password is None:
            return _auth_challenge('Admin auth failed.', 'Admin Login Required')

        # 1. Check against loaded CSV
        if user_df is not None:
            user_row = _find_user(auth.username.lower().strip())
            if (
                user_row is not None
                and _password_matches(user_row, auth.password)
                and user_row.get('role') == 'admin'
            ):
                return f(*args, **kwargs)
        # 2. Fallback to .env Admin Credentials
        # NOTE(review): as in the original, this branch is only reached when
        # no users CSV is loaded - confirm that is the intended policy.
        elif _credentials_match(auth, ADMIN_USERNAME, ADMIN_PASSWORD):
            return f(*args, **kwargs)

        return _auth_challenge('Admin auth failed.', 'Admin Login Required')
    return decorated


# --- Startup Logic (Fixed: No Decorator) ---
def run_startup_tasks():
    """Initializes RAG system and loads data. Called explicitly."""
    global rag_system
    logger.info("--- Executing Startup Tasks ---")

    # 1. Download Users CSV if configured
    if GDRIVE_USERS_CSV_ENABLED and GDRIVE_USERS_CSV_ID_OR_URL:
        assets_folder = os.path.join(_APP_BASE_DIR, 'assets')
        # Make sure the download target directory exists before writing to it.
        os.makedirs(assets_folder, exist_ok=True)
        target = os.path.join(assets_folder, 'users.csv')
        download_gdrive_file(GDRIVE_USERS_CSV_ID_OR_URL, target)

    # 2. Load User Data
    load_users_from_csv()

    # 3. Download FAISS Index if configured
    if GDRIVE_INDEX_ENABLED and GDRIVE_INDEX_ID_OR_URL:
        download_and_unzip_gdrive_file(GDRIVE_INDEX_ID_OR_URL, os.getcwd())

    # 4. Initialize RAG
    rag_system = initialize_and_get_rag_system()
    logger.info("--- Startup Tasks Complete ---")


# Execute startup tasks immediately when this module is loaded
# This ensures it runs before the first request in Flask 3.x
with app.app_context():
    run_startup_tasks()


# ===========================
# API ROUTES
# ===========================

# --- 1. N8N Webhook (The Core Function) ---
@app.route('/webhook/search', methods=['POST'])
@require_api_auth
def search_knowledgebase_api():
    """
    Main entry point for N8N.
    Expected JSON: { "query": "...", "use_reranker": bool, "final_k": int }

    Responds 503 when the RAG system is not initialized, 400 when the query
    is missing or final_k is not a positive integer, and 500 when the search
    itself raises.
    """
    if not rag_system:
        # Try to recover if somehow not initialized
        return jsonify({"error": "RAG not initialized. Check server logs."}), 503

    data = _json_body()
    query = data.get('query')
    if not query:
        return jsonify({"error": "Query field is required"}), 400

    # Configs (with defaults)
    # RAG_RERANKER_K from config is the default when final_k is absent or null.
    requested_k = data.get('final_k')
    if requested_k is None:
        top_k = RAG_RERANKER_K
    else:
        top_k = _parse_positive_int(requested_k)
        if top_k is None:
            return jsonify({"error": "final_k must be a positive integer"}), 400
    use_reranker = data.get('use_reranker', True)

    # Dynamic Reranker Toggling
    # NOTE(review): this mutates the shared retriever, so the setting applies
    # to whichever request runs next as well.
    retriever = rag_system.retriever
    if retriever:
        if not use_reranker:
            retriever.reranker = None
        elif rag_system.reranker:
            retriever.reranker = rag_system.reranker

    try:
        results = rag_system.search_knowledge_base(query, top_k=top_k)
        return jsonify({
            "results": results,
            "count": len(results),
            "status": "success"
        })
    except Exception as e:
        logger.exception("Search API Error: %s", e)
        return jsonify({"error": str(e)}), 500


# --- 2. User Login (RESTORED) ---
@app.route('/user-login', methods=['POST'])
def user_login():
    """
    Standard user login endpoint.
    Checks credentials against users.csv.

    Responds 503 when no user table is loaded, 400 when email or password is
    missing, 401 on a failed match, and otherwise 200 with the matching row
    (password removed).
    """
    if user_df is None:
        return jsonify({"error": "User database not available."}), 503

    data = _json_body()
    raw_email = data.get('email')
    # A non-string email (null, number, ...) is treated as missing.
    email = raw_email.lower().strip() if isinstance(raw_email, str) else ''
    password = data.get('password')

    if not email or not password:
        return jsonify({"error": "Email and password required"}), 400

    user_row = _find_user(email)
    if user_row is not None and _password_matches(user_row, password):
        # Return user info (excluding password)
        resp = user_row.to_dict()
        resp.pop('password', None)
        return jsonify(resp), 200

    return jsonify({"error": "Invalid credentials"}), 401


# --- 3. UI Route ---
@app.route('/')
def index_route():
    """Renders the HTML Dashboard."""
    return render_template('chat-bot.html')


# --- 4. Admin Auth Check ---
@app.route('/admin/login', methods=['POST'])
@require_admin_auth
def admin_login():
    """Verifies Admin Credentials via Basic Auth Header"""
    return jsonify({"status": "success", "message": "Authenticated"}), 200


# --- 5. Admin RAG Controls ---
@app.route('/admin/update_faiss_index', methods=['POST'])
@require_admin_auth
def update_faiss_index():
    """Update the index with new files from RAG_SOURCES_DIR.

    Optional JSON field ``max_new_files`` is passed through to the RAG
    system unchanged. Responds 503 when the RAG system is not initialized
    and 500 when the update raises.
    """
    if not rag_system:
        return jsonify({"error": "RAG system not initialized"}), 503

    data = _json_body()
    max_files = data.get('max_new_files')

    try:
        result = rag_system.update_index_with_new_files(RAG_SOURCES_DIR, max_files)
        return jsonify(result), 200
    except Exception as e:
        logger.exception("Index update failed: %s", e)
        return jsonify({"error": str(e)}), 500


@app.route('/admin/rebuild_index', methods=['POST'])
@require_admin_auth
def rebuild_index():
    """Rebuild the index and swap in the newly initialized RAG system.

    The currently loaded system is kept when the rebuild raises or yields
    nothing, and the response is a 500 in both cases.
    """
    global rag_system
    try:
        # Force rebuild calls the initialization logic with force_rebuild=True
        rebuilt = initialize_and_get_rag_system(force_rebuild=True)
    except Exception as e:
        logger.exception("Index rebuild failed: %s", e)
        return jsonify({"error": str(e)}), 500

    if not rebuilt:
        # Do not report success, and do not drop the system already serving.
        logger.error("Index rebuild returned no RAG system.")
        return jsonify({"error": "Index rebuild did not produce a RAG system."}), 500

    rag_system = rebuilt
    return jsonify({"status": "Index rebuilt successfully"}), 200


# --- 6. Status Check ---
@app.route('/status', methods=['GET'])
def status_route():
    """Report whether the RAG system and the user table are loaded."""
    return jsonify({
        "status": "online",
        "rag_initialized": rag_system is not None,
        "users_loaded": user_df is not None
    })


if __name__ == '__main__':
    # Default to 7860 for Hugging Face Spaces
    port = int(os.environ.get("PORT", 7860))
    app.run(host='0.0.0.0', port=port)