RAGnarok-Stable / app.py
IotaCluster's picture
Update app.py
994a6cd verified
# --- Logging setup: ensure logs are written to rag.log in project root ---
import logging
log_path = '/tmp/rag.log'
logging.basicConfig(
filename=log_path,
level=logging.INFO,
format='%(asctime)s - %(levelname)s - %(message)s'
)
from urllib import response
from flask import Flask, request, jsonify, send_file
from werkzeug.utils import secure_filename
from flask_cors import CORS
import threading
import os
import traceback
from functools import wraps
from flask import session
import uuid
from flask import make_response, g
import time
import random
# Ensure project root is on path for imports
import sys
sys.path.append(os.path.abspath(os.path.dirname(__file__)))
sys.path.append(os.path.abspath(os.path.join(os.path.dirname(__file__), 'agents')))
sys.path.append(os.path.abspath(os.path.join(os.path.dirname(__file__), 'vector_stores')))
sys.path.append(os.path.abspath(os.path.join(os.path.dirname(__file__), 'tools')))
sys.path.append(os.path.abspath(os.path.join(os.path.dirname(__file__), 'main_graph')))
from vector_stores.L_vecdB import LongTermDatabase
from vector_stores.S_vecdB import ShortTermDatabase
from tools.email_scraper import EmailScraper
from pipeline.RAGnarok import RAGnarok
app = Flask(__name__)
app.secret_key = os.environ.get("FLASK_SECRET_KEY", "your-default-secret-key")
CORS(app, supports_credentials=True, origins=[
"https://rag-narok.vercel.app",
"https://rag-narok-aiclubiitropars-projects.vercel.app",
"http://localhost:3000"
])
# Define persistent directories for long-term and short-term databases
LONG_TERM_PREFIX = "longterm_db"
SHORT_TERM_PREFIX = "shortterm_db"
# # Ensure persistent directories exist
# os.makedirs(LONG_TERM_PREFIX, exist_ok=True)
# os.makedirs(SHORT_TERM_PREFIX, exist_ok=True)
# Initialize databases with Qdrant-compatible arguments
long_db = LongTermDatabase(collection_prefix=LONG_TERM_PREFIX)
# Update the fetch_latest_email callback to use EmailScraper
def fetch_latest_email():
"""
Fetch the latest email using the EmailScraper class. If summarization fails, skip the email.
"""
from tools.sumar import summarize_text
from datetime import datetime
scraper = EmailScraper()
print("Fetching latest email...")
emails = scraper.scrape_latest_emails(count=1)
if not emails:
app.logger.warning("No emails found when fetching latest email.")
return None
latest_email_id, latest_email = next(iter(emails.items()))
body = latest_email.get('body', '')
from_ = latest_email.get('from', '')
subject = latest_email.get('subject', '')
timestamp = latest_email.get('timestamp', datetime.utcnow().isoformat())
try:
summary = summarize_text(body)
# Concatenate 'from', 'subject', and 'timestamp' to the summarized body
summary = f"From: {from_}\nSubject: {subject}\nTimestamp: {timestamp}\n{summary}"
except Exception as e:
app.logger.warning(f"Summarization failed for email {latest_email_id}: {e}. Skipping email.")
return None
# Return as a list of dicts for objectwise ingestion, including 'from', 'subject', and 'timestamp'
return [{
'id': latest_email_id,
'body': summary,
'from': from_,
'subject': subject,
'timestamp': timestamp
}]
# Pass the callback to ShortTermDatabase
short_db = ShortTermDatabase(
collection_prefix=SHORT_TERM_PREFIX,
fetch_latest_email=fetch_latest_email
)
# --- Short-term DB background worker management ---
global_worker_thread = None
global_worker_stop_event = threading.Event()
global_worker_running = False # New global flag for worker status
model = 'qwen/qwen3-32b'
def shortterm_worker():
global global_worker_running
global_worker_running = True
try:
short_db.run_worker()
except Exception as e:
print(f"Short-term worker error: {e}")
traceback.print_exc()
finally:
global_worker_running = False
def start_worker_thread_if_needed():
global global_worker_thread, global_worker_stop_event, global_worker_running
# Only start the worker in the main Flask process (not the reloader)
if os.environ.get("WERKZEUG_RUN_MAIN") == "true":
if not global_worker_thread or not global_worker_thread.is_alive():
global_worker_stop_event.clear()
global_worker_thread = threading.Thread(target=shortterm_worker)
global_worker_thread.start()
global_worker_running = True
app.logger.info("Short-term worker thread started automatically on backend startup.")
# Start the worker thread at app startup
start_worker_thread_if_needed()
# --- Simple session-based decorator for admin endpoints ---
def require_admin(f):
@wraps(f)
def decorated(*args, **kwargs):
# Removed admin email check
return f(*args, **kwargs)
return decorated
# --- Global dictionary for user RAGnarok objects with last access time ---
user_rag_dict = {} # {user_uuid: {'rag': RAGnarok, 'last_access': timestamp}}
USER_RAG_TIMEOUT = 30 * 60 # 30 minutes in seconds
def cleanup_user_rag_dict():
"""Remove user sessions that have been inactive for more than USER_RAG_TIMEOUT seconds."""
now = time.time()
to_delete = [uuid for uuid, v in user_rag_dict.items() if now - v['last_access'] > USER_RAG_TIMEOUT]
for uuid in to_delete:
del user_rag_dict[uuid]
# --- Global model variable ---
# model = 'deepseek-r1-distill-llama-70b' # Default model
@app.route('/admin/change_model', methods=['POST'])
def change_model():
global model
try:
data = request.get_json()
new_model = data.get('model')
if not new_model:
return jsonify({'error': 'No model provided.'}), 400
model = new_model
app.logger.info(f"Model changed to: {model}")
return jsonify({'message': f'Model changed to {model}.'}), 200
except Exception as e:
app.logger.error(f"Error changing model: {e}")
return jsonify({'error': str(e), 'trace': traceback.format_exc()}), 500
# --- API Endpoints ---
@app.route('/admin/upload_json', methods=['POST'])
@require_admin
def upload_json():
try:
if 'file' not in request.files:
app.logger.error("No file part in the request.")
return jsonify({'error': 'No file part'}), 400
file = request.files['file']
if file.filename == '':
app.logger.error("No file selected for upload.")
return jsonify({'error': 'No selected file'}), 400
if not file.filename.endswith('.json'):
app.logger.error("Uploaded file is not a JSON file.")
return jsonify({'error': 'Invalid file type. Only JSON files are allowed.'}), 400
filename = secure_filename(file.filename)
filepath = os.path.join('uploads', filename)
os.makedirs('uploads', exist_ok=True)
file.save(filepath)
app.logger.info(f"File {filename} saved successfully at {filepath}.")
# Add the uploaded JSON data to the long-term database
try:
app.logger.info(f"Adding data from {filename} to the long-term database...")
long_db.add_data(filepath)
app.logger.info(f"Data from {filename} added to the long-term database successfully.")
except Exception as e:
app.logger.error(f"Failed to add data from {filename} to the long-term database: {e}")
return jsonify({'error': 'Failed to process the file.', 'details': str(e)}), 500
return jsonify({'message': 'File uploaded and data added to long-term DB.'}), 200
except Exception as e:
app.logger.error(f"Unexpected error during file upload: {e}")
return jsonify({'error': str(e), 'trace': traceback.format_exc()}), 500
finally:
# Clean up the uploaded file to save space
if os.path.exists(filepath):
os.remove(filepath)
# Ensure the worker thread persists across sessions
@app.route('/admin/start_shortterm_worker', methods=['POST'])
@require_admin
def start_shortterm_worker():
global global_worker_thread, global_worker_stop_event, global_worker_running
if global_worker_thread and global_worker_thread.is_alive():
app.logger.info("Worker thread is already running.")
return jsonify({'message': 'Short-term worker already running.'}), 200
# Reset the stop event to ensure the thread runs continuously
global_worker_stop_event.clear()
global_worker_thread = threading.Thread(target=shortterm_worker)
global_worker_thread.start()
global_worker_running = True
app.logger.info("Worker thread has been started and will save updates to the short-term directory.")
return jsonify({'message': 'Short-term worker started.'}), 200
@app.route('/admin/stop_shortterm_worker', methods=['POST'])
@require_admin
def stop_shortterm_worker():
global global_worker_stop_event, global_worker_running
try:
global_worker_stop_event.set() # Signal the thread to stop
short_db.stop_worker()
global_worker_running = False
return jsonify({'message': 'Short-term worker stopped.'}), 200
except Exception as e:
return jsonify({'error': str(e), 'trace': traceback.format_exc()}), 500
@app.route('/admin/worker_status', methods=['GET'])
@require_admin
def worker_status():
global global_worker_running
running = global_worker_running
status = {
'running': running
}
if not running:
app.logger.warning("Worker thread is not running.")
else:
app.logger.info("Worker thread is running.")
return jsonify(status)
# # Ensure RAGnarok is instantiated correctly
# rg = RAGnarok(long_db, short_db)
@app.route('/chat', methods=['POST'])
def chat():
try:
data = request.get_json()
query = data.get('query')
user_uuid = data.get('user_uuid')
if not query:
return jsonify({'error': 'No query provided'}), 400
if not user_uuid:
return jsonify({'error': 'No user_uuid provided'}), 400
global user_rag_dict, model, api_keys
cleanup_user_rag_dict() # Clean up expired sessions on each chat
app.logger.info(f"Received user_uuid: {user_uuid}")
app.logger.info(f"Current user_rag_dict keys: {list(user_rag_dict.keys())}")
now = time.time()
if user_uuid not in user_rag_dict:
user_rag_dict[user_uuid] = {
'rag': RAGnarok(long_db, short_db, model=model),
'last_access': now
}
else:
user_rag_dict[user_uuid]['last_access'] = now
user_rg = user_rag_dict[user_uuid]['rag']
response_text = user_rg.invoke(query)
app.logger.info(f"RAGnarok response: {response_text}")
resp = make_response(jsonify({'response': response_text}), 200)
return resp
except Exception as e:
app.logger.error(f"Unexpected error in /chat endpoint: {e}", exc_info=True)
return jsonify({'error': 'An unexpected error occurred. Please try again later.'}), 500
# --- Admin Authentication Endpoint ---
@app.route('/admin/verify_credentials', methods=['POST'])
def verify_credentials():
try:
data = request.get_json()
email = data.get('email')
password = data.get('password')
if not email or not password:
return jsonify({'error': 'Email and password are required.'}), 400
admin_email = os.getenv('ADMIN_EMAIL')
admin_password = os.getenv('ADMIN_PASSWORD')
if email == admin_email and password == admin_password:
return jsonify({'message': 'Authentication successful.'}), 200
else:
return jsonify({'error': 'Invalid credentials.'}), 401
except Exception as e:
app.logger.error(f"Error during admin authentication: {e}")
return jsonify({'error': str(e), 'trace': traceback.format_exc()}), 500
# Flag to ensure initialization logic runs only once
initialized = False
@app.before_request
def initialize_databases():
global initialized
if not initialized:
initialized = True
try:
if not os.listdir(LONG_TERM_PREFIX):
app.logger.info("Long-term database is empty. Initializing with default data.")
# Add logic to populate long-term database with initial data if needed
if not os.listdir(SHORT_TERM_PREFIX):
app.logger.info("Short-term database is empty. Initializing with default data.")
# Add logic to populate short-term database with initial data if needed
except Exception as e:
app.logger.error(f"Error during database initialization: {e}")
# --- Graceful shutdown ---
import atexit
@atexit.register
def cleanup():
try:
stop_shortterm_worker()
short_db.close()
except Exception:
pass
@app.route('/admin/logs', methods=['GET'])
def download_logs():
log_path = 'rag.log' # Adjust path if your log file is elsewhere
if not os.path.exists(log_path):
return jsonify({'error': 'Log file not found.'}), 404
return send_file(log_path, as_attachment=True, download_name='rag.txt')
@app.route('/kill69', methods=['POST', 'GET'])
def easter_egg_shutdown():
"""Hidden endpoint"""
try:
logging.info("Special command executed - server shutdown initiated")
# Send response before shutting down
response = jsonify({
'message': '🎉 Special command executed! Space is being paused...',
'status': 'pause_initiated'
})
# Use threading to delay shutdown so response can be sent
def delayed_shutdown():
import time
from huggingface_hub import HfApi
time.sleep(1) # Give time for response to be sent
logging.info("Pausing Hugging Face Space due to special command")
try:
# Method 1: Hugging Face Space pause (only method)
hf_token = os.getenv("HF_TOKEN")
if hf_token:
api = HfApi(token=hf_token)
repo_id = "IotaCluster/RAGnarok-Stable"
print("pausing")
# Pause the space
api.pause_space(repo_id=repo_id)
print("paused")
# Get status/info (using correct attributes)
info = api.space_info(repo_id=repo_id)
logging.info(f"Space info: {info.id}, runtime: {getattr(info, 'runtime', 'N/A')}")
logging.info("Hugging Face Space paused successfully")
else:
logging.warning("HF_TOKEN not found in environment variables")
except ImportError:
logging.warning("huggingface_hub not available")
except Exception as e:
logging.warning(f"Hugging Face API pause failed: {e}")
shutdown_thread = threading.Thread(target=delayed_shutdown)
shutdown_thread.daemon = True
shutdown_thread.start()
return response, 200
except Exception as e:
logging.error(f"Error in special command: {str(e)}")
return jsonify({'error': 'Failed to initiate shutdown'}), 500
@app.route('/', methods=['GET'])
def index():
return "RAG-narok backend is running.", 200
@app.before_request
def before_request():
pass
if __name__ == '__main__':
import os
port = int(os.environ.get("PORT", 7860))
app.run(
host='0.0.0.0',
port=port,
debug=False, # disable debug mode in production
use_reloader=False, # prevent double-start freeze
threaded=True
)