# ed-cad-ref / app.py
# Uploaded by SakibAhmed
# Commit message: Update app.py
# Commit: eb2c208 (verified)
from flask import Flask, request, jsonify, Response, render_template
from flask_cors import CORS
import os
import logging
import functools
import pandas as pd
from dotenv import load_dotenv
# Load environment variables
load_dotenv()
# Custom Imports
from rag_system import initialize_and_get_rag_system
from config import (
API_USERNAME, API_PASSWORD, RAG_SOURCES_DIR,
GDRIVE_INDEX_ENABLED, GDRIVE_INDEX_ID_OR_URL,
GDRIVE_USERS_CSV_ENABLED, GDRIVE_USERS_CSV_ID_OR_URL,
ADMIN_USERNAME, ADMIN_PASSWORD,
RAG_RERANKER_K
)
from utils import download_and_unzip_gdrive_file, download_gdrive_file
# Logging: INFO-level root configuration plus a module-scoped logger.
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

# Flask application; templates and static assets live next to this file.
app = Flask(__name__, template_folder='templates', static_folder='static')
CORS(app)

# Directory containing this module; anchor for the assets folder.
_APP_BASE_DIR = os.path.dirname(os.path.abspath(__file__))

# Module-level state, both unset until the startup tasks assign them.
rag_system = None
user_df = None
# --- Helper: Load Users ---
def load_users_from_csv():
    """Load ``assets/users.csv`` into the module-level ``user_df``.

    The ``assets`` folder next to this module is created when missing. On
    success ``user_df`` holds the parsed table, with the ``email`` column
    (when present) lower-cased and stripped. When the file is absent or
    cannot be read, ``user_df`` is reset to ``None`` and the problem is
    logged rather than raised.

    The ``email`` and ``password`` columns are read as text. Without that,
    pandas infers numeric dtypes for credentials, so ``0123`` would become
    ``123`` and ``1234`` would become ``1234.0`` as soon as another row has a
    blank password, and the ``str(...)`` comparisons done at login would
    never match what the user typed.
    """
    global user_df

    assets_folder = os.path.join(_APP_BASE_DIR, 'assets')
    os.makedirs(assets_folder, exist_ok=True)
    users_csv_path = os.path.join(assets_folder, 'users.csv')

    if not os.path.exists(users_csv_path):
        logger.warning("users.csv not found in assets folder.")
        user_df = None
        return

    try:
        # dtype entries for columns that are not in the file are ignored.
        loaded = pd.read_csv(
            users_csv_path,
            dtype={'email': str, 'password': str},
        )
        # Normalize email so lookups can be case- and whitespace-insensitive.
        if 'email' in loaded.columns:
            loaded['email'] = loaded['email'].str.lower().str.strip()
    except Exception as e:
        logger.error(f"Failed to load users.csv: {e}")
        user_df = None
        return

    user_df = loaded
    logger.info(f"Loaded {len(user_df)} users from CSV.")
# --- Helper: Auth Decorators ---
def require_api_auth(f):
    """Protect the N8N webhook endpoint with HTTP Basic authentication.

    The wrapped view runs only when the request carries a username and
    password equal to ``API_USERNAME`` / ``API_PASSWORD``. Every other request
    receives a 401 response with a ``WWW-Authenticate`` challenge.

    The check fails closed: an ``Authorization`` header that does not carry
    Basic credentials exposes ``None`` for username and password, which would
    otherwise compare equal to credentials that were never configured
    (``None``) and let the request through.
    """
    @functools.wraps(f)
    def decorated(*args, **kwargs):
        auth = request.authorization
        credentials_ok = (
            bool(auth)
            # Require real strings so None never matches an unset credential.
            and isinstance(auth.username, str)
            and isinstance(auth.password, str)
            and auth.username == API_USERNAME
            and auth.password == API_PASSWORD
        )
        if not credentials_ok:
            return Response(
                'Unauthorized',
                401,
                {'WWW-Authenticate': 'Basic realm="API Login Required"'},
            )
        return f(*args, **kwargs)

    return decorated
def require_admin_auth(f):
    """Protect the admin rebuild/update endpoints with HTTP Basic auth.

    Credentials are checked in one of two ways:

    1. When a user table is loaded (``user_df`` is not ``None``), the
       username is matched against the ``email`` column (lower-cased and
       stripped) and the first matching row must have the same password and
       the role ``'admin'``.
    2. Otherwise the ``ADMIN_USERNAME`` / ``ADMIN_PASSWORD`` pair is used.

    Every failure returns a 401 response with a ``WWW-Authenticate``
    challenge. A header without Basic credentials (username or password is
    ``None``) is rejected up front; previously it raised on ``.lower()`` or
    matched admin credentials that were never configured.
    """
    def _deny():
        return Response(
            'Admin auth failed.',
            401,
            {'WWW-Authenticate': 'Basic realm="Admin Login Required"'},
        )

    @functools.wraps(f)
    def decorated(*args, **kwargs):
        auth = request.authorization
        if (
            not auth
            or not isinstance(auth.username, str)
            or not isinstance(auth.password, str)
        ):
            return _deny()

        # 1. Check against the loaded CSV.
        if user_df is not None:
            user_email = auth.username.lower().strip()
            user_record = user_df[user_df['email'] == user_email]
            if not user_record.empty:
                user_data = user_record.iloc[0]
                stored_password = user_data['password']
                # A blank CSV cell is NaN and would stringify to "nan";
                # never accept that as a password.
                if (
                    not pd.isna(stored_password)
                    and str(stored_password) == auth.password
                    and user_data['role'] == 'admin'
                ):
                    return f(*args, **kwargs)
        # 2. Fallback to the .env admin credentials.
        # NOTE(review): this branch is skipped whenever a user table is
        # loaded - confirm that is the intended precedence.
        elif auth.username == ADMIN_USERNAME and auth.password == ADMIN_PASSWORD:
            return f(*args, **kwargs)

        return _deny()

    return decorated
# --- Startup Logic (Fixed: No Decorator) ---
def run_startup_tasks():
    """Prepare user data and the RAG system; called explicitly at import time.

    Steps, in order:

    1. Download the users CSV into ``assets/users.csv`` when
       ``GDRIVE_USERS_CSV_ENABLED`` and ``GDRIVE_USERS_CSV_ID_OR_URL`` are set.
    2. Load the users CSV via ``load_users_from_csv``.
    3. Download and unzip the index archive into the current working
       directory when ``GDRIVE_INDEX_ENABLED`` and ``GDRIVE_INDEX_ID_OR_URL``
       are set.
    4. Assign the result of ``initialize_and_get_rag_system()`` to the
       module-level ``rag_system``.
    """
    global rag_system
    logger.info("--- Executing Startup Tasks ---")

    # 1. Download Users CSV if configured
    if GDRIVE_USERS_CSV_ENABLED and GDRIVE_USERS_CSV_ID_OR_URL:
        assets_folder = os.path.join(_APP_BASE_DIR, 'assets')
        # The folder is otherwise only created later, by the CSV loader, so
        # make sure the download has somewhere to write.
        os.makedirs(assets_folder, exist_ok=True)
        target = os.path.join(assets_folder, 'users.csv')
        download_gdrive_file(GDRIVE_USERS_CSV_ID_OR_URL, target)

    # 2. Load User Data
    load_users_from_csv()

    # 3. Download FAISS Index if configured
    if GDRIVE_INDEX_ENABLED and GDRIVE_INDEX_ID_OR_URL:
        download_and_unzip_gdrive_file(GDRIVE_INDEX_ID_OR_URL, os.getcwd())

    # 4. Initialize RAG
    rag_system = initialize_and_get_rag_system()
    logger.info("--- Startup Tasks Complete ---")
# Run the startup tasks as a side effect of importing this module, inside an
# application context, so they have finished before any request is served
# (the original note attributes this arrangement to Flask 3.x).
with app.app_context():
    run_startup_tasks()
# ===========================
# API ROUTES
# ===========================
# --- 1. N8N Webhook (The Core Function) ---
@app.route('/webhook/search', methods=['POST'])
@require_api_auth
def search_knowledgebase_api():
    """Main entry point for N8N: search the knowledge base.

    Expected JSON: ``{ "query": "...", "use_reranker": bool, "final_k": int }``

    * ``query`` is required.
    * ``final_k`` defaults to ``RAG_RERANKER_K``. Values that can be turned
      into an integer (for example the string ``"5"``) are accepted; anything
      else, or a value below 1, is rejected with 400. An explicit ``null`` is
      passed through unchanged.
    * ``use_reranker`` defaults to ``True``.

    Responses: 200 with ``results``/``count``/``status``; 400 for a bad
    payload; 503 when the RAG system is not initialized; 500 with the error
    text when the search itself raises.
    """
    if not rag_system:
        return jsonify({"error": "RAG not initialized. Check server logs."}), 503

    data = request.json or {}
    # A JSON array/string/number has no .get(); reject it instead of crashing.
    if not isinstance(data, dict):
        return jsonify({"error": "JSON body must be an object"}), 400

    query = data.get('query')
    if not query:
        return jsonify({"error": "Query field is required"}), 400

    # Configs (with defaults): RAG_RERANKER_K from config is the default k.
    top_k = data.get('final_k', RAG_RERANKER_K)
    if top_k is not None:
        try:
            top_k = int(top_k)
        except (TypeError, ValueError, OverflowError):
            return jsonify({"error": "final_k must be a positive integer"}), 400
        if top_k < 1:
            return jsonify({"error": "final_k must be a positive integer"}), 400
    use_reranker = data.get('use_reranker', True)

    # Dynamic reranker toggling.
    # NOTE(review): this rewrites an attribute on the shared retriever, so
    # the setting appears to persist across requests - confirm that
    # concurrent callers with different flags are not a concern.
    retriever = rag_system.retriever
    if retriever:
        if not use_reranker:
            retriever.reranker = None
        elif rag_system.reranker:
            retriever.reranker = rag_system.reranker

    try:
        results = rag_system.search_knowledge_base(query, top_k=top_k)
        return jsonify({
            "results": results,
            "count": len(results),
            "status": "success"
        })
    except Exception as e:
        # logger.exception keeps the traceback alongside the message.
        logger.exception("Search API Error: %s", e)
        return jsonify({"error": str(e)}), 500
# --- 2. User Login (RESTORED) ---
@app.route('/user-login', methods=['POST'])
def user_login():
    """Standard user login endpoint; checks credentials against users.csv.

    Expects a JSON object with ``email`` and ``password``. The email is
    lower-cased and stripped before it is matched against the ``email``
    column of the user table; the password is compared as a string with the
    first matching row.

    Responses: 200 with the user's row (without ``password``); 400 when
    email or password is missing; 401 for unknown users or a wrong password;
    503 when no user table is loaded.
    """
    if user_df is None:
        return jsonify({"error": "User database not available."}), 503

    # Same "or {}" guard as the other routes; a non-object body is treated
    # as carrying no fields rather than crashing on .get().
    data = request.json or {}
    if not isinstance(data, dict):
        data = {}

    raw_email = data.get('email', '')
    # Only strings can be normalized; anything else counts as missing.
    email = raw_email.lower().strip() if isinstance(raw_email, str) else ''
    password = data.get('password')

    if not email or not password:
        return jsonify({"error": "Email and password required"}), 400

    user_record = user_df[user_df['email'] == email]
    if not user_record.empty:
        u_data = user_record.iloc[0]
        stored_password = u_data['password']
        # A blank CSV cell is NaN and would stringify to "nan"; never accept
        # that as a valid password.
        if not pd.isna(stored_password) and str(stored_password) == str(password):
            # Return user info (excluding password)
            resp = u_data.to_dict()
            resp.pop('password', None)
            return jsonify(resp), 200

    return jsonify({"error": "Invalid credentials"}), 401
# --- 3. UI Route ---
@app.route('/')
def index_route():
    """Serve the HTML dashboard rendered from the ``chat-bot.html`` template."""
    dashboard_template = 'chat-bot.html'
    return render_template(dashboard_template)
# --- 4. Admin Auth Check ---
@app.route('/admin/login', methods=['POST'])
@require_admin_auth
def admin_login():
    """Confirm admin credentials sent in the Basic auth header.

    The ``require_admin_auth`` decorator does the actual check; reaching the
    body means it passed, so a fixed success payload is returned.
    """
    payload = {"status": "success", "message": "Authenticated"}
    return jsonify(payload), 200
# --- 5. Admin RAG Controls ---
@app.route('/admin/update_faiss_index', methods=['POST'])
@require_admin_auth
def update_faiss_index():
    """Add new source files to the existing index (admin only).

    Optional JSON field ``max_new_files`` is forwarded, together with
    ``RAG_SOURCES_DIR``, to ``rag_system.update_index_with_new_files``; its
    result is returned as JSON.

    Responses: 200 with the update result; 400 when the JSON body is not an
    object; 503 when the RAG system is not initialized; 500 with the error
    text when the update raises.
    """
    if not rag_system:
        return jsonify({"error": "RAG system not initialized"}), 503

    data = request.json or {}
    # A JSON array/string/number has no .get(); reject it instead of crashing.
    if not isinstance(data, dict):
        return jsonify({"error": "JSON body must be an object"}), 400
    max_files = data.get('max_new_files')

    try:
        result = rag_system.update_index_with_new_files(RAG_SOURCES_DIR, max_files)
        return jsonify(result), 200
    except Exception as e:
        # Log like the search route does, so failures reach the server logs.
        logger.exception("Index update failed: %s", e)
        return jsonify({"error": str(e)}), 500
@app.route('/admin/rebuild_index', methods=['POST'])
@require_admin_auth
def rebuild_index():
    """Force a full index rebuild and swap in the new RAG system (admin only).

    Calls ``initialize_and_get_rag_system(force_rebuild=True)``. The
    module-level ``rag_system`` is replaced only when that call returns a
    usable object, so a failed rebuild neither reports success nor discards
    the system that is currently serving requests.

    Responses: 200 on success; 500 with an error message when the rebuild
    raises or returns nothing.
    """
    global rag_system
    try:
        rebuilt = initialize_and_get_rag_system(force_rebuild=True)
    except Exception as e:
        logger.exception("Index rebuild failed: %s", e)
        return jsonify({"error": str(e)}), 500

    # Guard in case initialization reports failure by returning nothing
    # instead of raising; other routes treat a falsy rag_system as
    # "not initialized".
    if not rebuilt:
        logger.error("Index rebuild returned no RAG system.")
        return jsonify({"error": "Index rebuild failed. Check server logs."}), 500

    rag_system = rebuilt
    return jsonify({"status": "Index rebuilt successfully"}), 200
# --- 6. Status Check ---
@app.route('/status', methods=['GET'])
def status_route():
    """Report liveness plus whether the RAG system and user table are set."""
    rag_ready = rag_system is not None
    users_ready = user_df is not None
    return jsonify({
        "status": "online",
        "rag_initialized": rag_ready,
        "users_loaded": users_ready
    })
if __name__ == '__main__':
    # Listen on $PORT when set; 7860 is the Hugging Face Spaces default.
    port = int(os.getenv("PORT", 7860))
    app.run(host='0.0.0.0', port=port)