Spaces:
Sleeping
Sleeping
| from flask import Flask, request, jsonify, Response, render_template | |
| from flask_cors import CORS | |
| import os | |
| import logging | |
| import functools | |
| import pandas as pd | |
| from dotenv import load_dotenv | |
| # Load environment variables | |
| load_dotenv() | |
| # Custom Imports | |
| from rag_system import initialize_and_get_rag_system | |
| from config import ( | |
| API_USERNAME, API_PASSWORD, RAG_SOURCES_DIR, | |
| GDRIVE_INDEX_ENABLED, GDRIVE_INDEX_ID_OR_URL, | |
| GDRIVE_USERS_CSV_ENABLED, GDRIVE_USERS_CSV_ID_OR_URL, | |
| ADMIN_USERNAME, ADMIN_PASSWORD, | |
| RAG_RERANKER_K | |
| ) | |
| from utils import download_and_unzip_gdrive_file, download_gdrive_file | |
# Logging Setup
# Configure the root logger at INFO; this module logs through its own
# named logger.
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
# Flask Init
# Static files are served from 'static' and templates are loaded from
# 'templates'.
app = Flask(__name__, static_folder='static', template_folder='templates')
# CORS is enabled with flask_cors defaults (no options are passed here).
CORS(app)
# Global State
# Both start as None. rag_system is assigned by run_startup_tasks() and
# rebuild_index(); user_df is assigned by load_users_from_csv().
rag_system = None
user_df = None
# Absolute directory containing this file; used to locate the 'assets' folder.
_APP_BASE_DIR = os.path.dirname(os.path.abspath(__file__))
| # --- Helper: Load Users --- | |
def load_users_from_csv():
    """Load ``assets/users.csv`` into the module-level ``user_df``.

    The ``assets`` folder next to this file is created if absent. When the
    CSV exists it is read with pandas and, if it has an ``email`` column,
    that column is lower-cased and stripped. ``user_df`` is set to ``None``
    when the file is missing or when reading/normalising it raises.
    """
    global user_df

    assets_dir = os.path.join(_APP_BASE_DIR, 'assets')
    os.makedirs(assets_dir, exist_ok=True)
    csv_path = os.path.join(assets_dir, 'users.csv')

    try:
        # Guard clause: nothing to load.
        if not os.path.exists(csv_path):
            logger.warning("users.csv not found in assets folder.")
            user_df = None
            return

        frame = pd.read_csv(csv_path)
        # Normalise e-mail addresses so lookups can compare lower-cased input.
        if 'email' in frame.columns:
            frame['email'] = frame['email'].str.lower().str.strip()
        user_df = frame
        logger.info(f"Loaded {len(user_df)} users from CSV.")
    except Exception as e:
        logger.error(f"Failed to load users.csv: {e}")
        user_df = None
| # --- Helper: Auth Decorators --- | |
def require_api_auth(f):
    """Protects the N8N Webhook endpoint.

    Wraps view ``f`` so it only runs when the request carries HTTP Basic
    credentials equal to ``API_USERNAME`` / ``API_PASSWORD`` from config.
    Otherwise a 401 response with a ``WWW-Authenticate`` challenge is
    returned and ``f`` is not called.
    """
    # functools.wraps preserves f.__name__, which Flask uses as the default
    # endpoint name. Without it every wrapped view is named "decorated" and
    # registering a second one clashes with the first.
    @functools.wraps(f)
    def decorated(*args, **kwargs):
        auth = request.authorization
        # Fail closed when the expected credentials are not configured:
        # a request whose auth header has no username/password would
        # otherwise compare None == None and be let through.
        credentials_configured = API_USERNAME is not None and API_PASSWORD is not None
        # NOTE(review): these are plain equality checks, not constant-time
        # comparisons; consider hmac.compare_digest if timing matters here.
        if (
            not auth
            or not credentials_configured
            or auth.username != API_USERNAME
            or auth.password != API_PASSWORD
        ):
            return Response('Unauthorized', 401, {'WWW-Authenticate': 'Basic realm="API Login Required"'})
        return f(*args, **kwargs)
    return decorated
def require_admin_auth(f):
    """Protects Admin Rebuild/Update endpoints.

    Wraps view ``f`` so it only runs for an authenticated admin:

    1. When the users table (``user_df``) is loaded, the Basic-auth username
       is treated as an e-mail (lower-cased, stripped) and must match a row
       whose ``password`` equals the supplied password and whose ``role`` is
       ``'admin'``. Only the first matching row is checked.
    2. When no users table is loaded, the credentials must equal
       ``ADMIN_USERNAME`` / ``ADMIN_PASSWORD`` from config.

    Any other outcome returns a 401 response with a ``WWW-Authenticate``
    challenge and ``f`` is not called.
    """
    # functools.wraps preserves f.__name__, which Flask uses as the default
    # endpoint name; without it all wrapped views are named "decorated".
    @functools.wraps(f)
    def decorated(*args, **kwargs):
        def deny():
            return Response('Admin auth failed.', 401, {'WWW-Authenticate': 'Basic realm="Admin Login Required"'})

        auth = request.authorization
        if not auth:
            return deny()

        # 1. Check against loaded CSV
        if user_df is not None:
            # A table missing any of these columns cannot authenticate
            # anyone; deny instead of raising KeyError.
            if {'email', 'password', 'role'}.issubset(user_df.columns):
                # username can be None for non-Basic Authorization headers.
                user_email = (auth.username or '').lower().strip()
                user_record = user_df[user_df['email'] == user_email]
                if not user_record.empty:
                    user_data = user_record.iloc[0]
                    stored_password = user_data['password']
                    # Compare password as string. A blank CSV cell is NaN and
                    # would stringify to "nan", so it is rejected explicitly.
                    if (
                        pd.notna(stored_password)
                        and str(stored_password) == auth.password
                        and user_data['role'] == 'admin'
                    ):
                        return f(*args, **kwargs)
        # 2. Fallback to .env Admin Credentials (only consulted when no CSV
        #    table is loaded). Fail closed if they are not configured.
        elif (
            ADMIN_USERNAME is not None
            and ADMIN_PASSWORD is not None
            and auth.username == ADMIN_USERNAME
            and auth.password == ADMIN_PASSWORD
        ):
            return f(*args, **kwargs)

        return deny()
    return decorated
| # --- Startup Logic (Fixed: No Decorator) --- | |
def run_startup_tasks():
    """Initializes RAG system and loads data. Called explicitly.

    Steps, in order:

    1. Download the users CSV to ``assets/users.csv`` when
       ``GDRIVE_USERS_CSV_ENABLED`` and ``GDRIVE_USERS_CSV_ID_OR_URL`` are set.
    2. Load the users CSV into the module-level ``user_df``.
    3. Download and unzip the index archive into the current working
       directory when ``GDRIVE_INDEX_ENABLED`` and ``GDRIVE_INDEX_ID_OR_URL``
       are set.
    4. Create the RAG system and store it in the module-level ``rag_system``.

    Exceptions raised by the download or initialisation helpers are not
    caught here.
    """
    global rag_system
    logger.info("--- Executing Startup Tasks ---")

    # 1. Download Users CSV if configured
    if GDRIVE_USERS_CSV_ENABLED and GDRIVE_USERS_CSV_ID_OR_URL:
        target = os.path.join(_APP_BASE_DIR, 'assets', 'users.csv')
        # Make sure the destination folder exists before downloading; it is
        # otherwise only created later, by load_users_from_csv().
        os.makedirs(os.path.dirname(target), exist_ok=True)
        download_gdrive_file(GDRIVE_USERS_CSV_ID_OR_URL, target)

    # 2. Load User Data
    load_users_from_csv()

    # 3. Download FAISS Index if configured
    if GDRIVE_INDEX_ENABLED and GDRIVE_INDEX_ID_OR_URL:
        download_and_unzip_gdrive_file(GDRIVE_INDEX_ID_OR_URL, os.getcwd())

    # 4. Initialize RAG
    rag_system = initialize_and_get_rag_system()
    logger.info("--- Startup Tasks Complete ---")
# Execute startup tasks immediately when this module is loaded
# This ensures it runs before the first request in Flask 3.x
# The call happens at import time inside an application context and is not
# wrapped in try/except, so an exception from run_startup_tasks() propagates
# out of the module import.
with app.app_context():
    run_startup_tasks()
| # =========================== | |
| # API ROUTES | |
| # =========================== | |
| # --- 1. N8N Webhook (The Core Function) --- | |
# NOTE(review): no URL rule is visible for this view in this section of the
# file; if one is added, @app.route must stay above @require_api_auth.
@require_api_auth
def search_knowledgebase_api():
    """
    Main entry point for N8N.
    Expected JSON: { "query": "...", "use_reranker": bool, "final_k": int }

    Responses:
        200 -- {"results": [...], "count": int, "status": "success"}
        400 -- "query" missing/empty, or "final_k" not a positive integer
        500 -- the search raised; the exception text is returned as "error"
        503 -- the RAG system is not initialized

    "final_k" defaults to RAG_RERANKER_K when absent or null. Numeric
    strings are accepted and converted to int.

    The reranker toggle assigns to the retriever of the shared module-level
    rag_system, so the setting stays in effect until a later request
    changes it.
    """
    if not rag_system:
        # Try to recover if somehow not initialized
        return jsonify({"error": "RAG not initialized. Check server logs."}), 503

    # silent=True: a missing/incorrect Content-Type or malformed body yields
    # None instead of an HTTP error, so the client gets the 400 below.
    data = request.get_json(silent=True)
    if not isinstance(data, dict):
        data = {}

    query = data.get('query')
    if not query:
        return jsonify({"error": "Query field is required"}), 400

    # Configs (with defaults)
    raw_k = data.get('final_k')
    if raw_k is None:
        top_k = RAG_RERANKER_K
    else:
        try:
            top_k = int(raw_k)
        except (TypeError, ValueError, OverflowError):
            top_k = 0
        # bool is an int subclass; true/false is not a meaningful count.
        if isinstance(raw_k, bool) or top_k < 1:
            return jsonify({"error": "final_k must be a positive integer"}), 400
    use_reranker = data.get('use_reranker', True)

    # Dynamic Reranker Toggling
    retriever = rag_system.retriever
    if retriever:
        if not use_reranker:
            retriever.reranker = None
        elif rag_system.reranker:
            retriever.reranker = rag_system.reranker

    try:
        results = rag_system.search_knowledge_base(query, top_k=top_k)
        return jsonify({
            "results": results,
            "count": len(results),
            "status": "success"
        })
    except Exception as e:
        # logger.exception records the traceback alongside the message.
        logger.exception("Search API Error: %s", e)
        return jsonify({"error": str(e)}), 500
| # --- 2. User Login (RESTORED) --- | |
def user_login():
    """
    Standard user login endpoint.
    Checks credentials against users.csv.

    Expects a JSON object with "email" and "password". The e-mail is
    lower-cased and stripped before lookup; only the first matching row is
    checked, and the password is compared as a string.

    Responses:
        200 -- the matching user's row as JSON, without the "password" field
        400 -- "email" or "password" missing/empty (or body not a JSON object)
        401 -- no matching user or wrong password
        503 -- the users table is not loaded or lacks the required columns
    """
    if user_df is None:
        return jsonify({"error": "User database not available."}), 503

    # silent=True returns None instead of raising on a non-JSON body.
    data = request.get_json(silent=True)
    if not isinstance(data, dict):
        data = {}

    # A null or non-string e-mail is treated as missing rather than crashing.
    raw_email = data.get('email')
    email = raw_email.lower().strip() if isinstance(raw_email, str) else ''
    password = data.get('password')
    if not email or not password:
        return jsonify({"error": "Email and password required"}), 400

    if 'email' not in user_df.columns or 'password' not in user_df.columns:
        logger.error("users.csv is missing the 'email' or 'password' column.")
        return jsonify({"error": "User database not available."}), 503

    user_record = user_df[user_df['email'] == email]
    if not user_record.empty:
        u_data = user_record.iloc[0]
        stored_password = u_data['password']
        # A blank CSV cell is NaN and would stringify to "nan"; never let
        # that match a supplied password.
        if pd.notna(stored_password) and str(stored_password) == str(password):
            # Return user info (excluding password)
            resp = {}
            for key, value in u_data.to_dict().items():
                if key == 'password':
                    continue
                if hasattr(value, 'item'):
                    # numpy scalar -> native Python value for JSON encoding
                    value = value.item()
                if pd.api.types.is_scalar(value) and pd.isna(value):
                    # NaN is not valid JSON; emit null instead.
                    value = None
                resp[key] = value
            return jsonify(resp), 200

    return jsonify({"error": "Invalid credentials"}), 401
| # --- 3. UI Route --- | |
def index_route():
    """Serve the HTML dashboard rendered from the ``chat-bot.html`` template."""
    dashboard_html = render_template('chat-bot.html')
    return dashboard_html
| # --- 4. Admin Auth Check --- | |
# The body performs no check of its own, so verification is done by the
# require_admin_auth wrapper. If a URL rule is attached, @app.route must
# stay above @require_admin_auth.
@require_admin_auth
def admin_login():
    """Verifies Admin Credentials via Basic Auth Header.

    Reaching the body means ``require_admin_auth`` accepted the request, so
    a fixed success payload is returned with status 200. Rejected requests
    receive the wrapper's 401 response instead.
    """
    return jsonify({"status": "success", "message": "Authenticated"}), 200
| # --- 5. Admin RAG Controls --- | |
# Admin-only: require_admin_auth documents itself as protecting the
# rebuild/update endpoints. If a URL rule is attached, @app.route must stay
# above @require_admin_auth.
@require_admin_auth
def update_faiss_index():
    """Add new source files from ``RAG_SOURCES_DIR`` to the existing index.

    Optional JSON body: {"max_new_files": <value>}; the value is passed
    through unchanged (``None`` when absent).

    Responses:
        200 -- whatever ``update_index_with_new_files`` returned, as JSON
        500 -- the update raised; the exception text is returned as "error"
        503 -- the RAG system is not initialized
    """
    if not rag_system:
        return jsonify({"error": "RAG system not initialized"}), 503

    # silent=True: a missing or non-JSON body is treated as "no options".
    data = request.get_json(silent=True)
    if not isinstance(data, dict):
        data = {}
    max_files = data.get('max_new_files')

    try:
        result = rag_system.update_index_with_new_files(RAG_SOURCES_DIR, max_files)
        return jsonify(result), 200
    except Exception as e:
        # Record the failure server-side; previously it was only sent to
        # the client.
        logger.exception("Index update failed: %s", e)
        return jsonify({"error": str(e)}), 500
# Admin-only: require_admin_auth documents itself as protecting the
# rebuild/update endpoints. If a URL rule is attached, @app.route must stay
# above @require_admin_auth.
@require_admin_auth
def rebuild_index():
    """Rebuild the index and replace the module-level ``rag_system``.

    Responses:
        200 -- {"status": "Index rebuilt successfully"}
        500 -- the rebuild raised; the exception text is returned as "error"
               and the previous ``rag_system`` is left in place
    """
    global rag_system
    try:
        # Force rebuild calls the initialization logic with force_rebuild=True
        rag_system = initialize_and_get_rag_system(force_rebuild=True)
        return jsonify({"status": "Index rebuilt successfully"}), 200
    except Exception as e:
        # Record the failure server-side; previously it was only sent to
        # the client.
        logger.exception("Index rebuild failed: %s", e)
        return jsonify({"error": str(e)}), 500
| # --- 6. Status Check --- | |
def status_route():
    """Report service health as JSON.

    The payload always has ``"status": "online"`` plus two booleans telling
    whether the RAG system and the users table are currently loaded.
    """
    rag_ready = rag_system is not None
    users_ready = user_df is not None
    payload = {
        "status": "online",
        "rag_initialized": rag_ready,
        "users_loaded": users_ready,
    }
    return jsonify(payload)
if __name__ == '__main__':
    # Listen on $PORT when set; otherwise fall back to 7860, the default
    # used for Hugging Face Spaces.
    port = int(os.getenv("PORT", 7860))
    # Bind on all interfaces.
    app.run(
        host='0.0.0.0',
        port=port,
    )