Spaces:
Running
Running
| # --- Logging setup: ensure logs are written to rag.log in project root --- | |
| import logging | |
| log_path = '/tmp/rag.log' | |
| logging.basicConfig( | |
| filename=log_path, | |
| level=logging.INFO, | |
| format='%(asctime)s - %(levelname)s - %(message)s' | |
| ) | |
| from urllib import response | |
| from flask import Flask, request, jsonify, send_file | |
| from werkzeug.utils import secure_filename | |
| from flask_cors import CORS | |
| import threading | |
| import os | |
| import traceback | |
| from functools import wraps | |
| from flask import session | |
| import uuid | |
| from flask import make_response, g | |
| import time | |
| import random | |
| # Ensure project root is on path for imports | |
| import sys | |
| sys.path.append(os.path.abspath(os.path.dirname(__file__))) | |
| sys.path.append(os.path.abspath(os.path.join(os.path.dirname(__file__), 'agents'))) | |
| sys.path.append(os.path.abspath(os.path.join(os.path.dirname(__file__), 'vector_stores'))) | |
| sys.path.append(os.path.abspath(os.path.join(os.path.dirname(__file__), 'tools'))) | |
| sys.path.append(os.path.abspath(os.path.join(os.path.dirname(__file__), 'main_graph'))) | |
| from vector_stores.L_vecdB import LongTermDatabase | |
| from vector_stores.S_vecdB import ShortTermDatabase | |
| from tools.email_scraper import EmailScraper | |
| from pipeline.RAGnarok import RAGnarok | |
| app = Flask(__name__) | |
| app.secret_key = os.environ.get("FLASK_SECRET_KEY", "your-default-secret-key") | |
| CORS(app, supports_credentials=True, origins=[ | |
| "https://rag-narok.vercel.app", | |
| "https://rag-narok-aiclubiitropars-projects.vercel.app", | |
| "http://localhost:3000" | |
| ]) | |
| # Define persistent directories for long-term and short-term databases | |
| LONG_TERM_PREFIX = "longterm_db" | |
| SHORT_TERM_PREFIX = "shortterm_db" | |
| # # Ensure persistent directories exist | |
| # os.makedirs(LONG_TERM_PREFIX, exist_ok=True) | |
| # os.makedirs(SHORT_TERM_PREFIX, exist_ok=True) | |
| # Initialize databases with Qdrant-compatible arguments | |
| long_db = LongTermDatabase(collection_prefix=LONG_TERM_PREFIX) | |
| # Update the fetch_latest_email callback to use EmailScraper | |
| def fetch_latest_email(): | |
| """ | |
| Fetch the latest email using the EmailScraper class. If summarization fails, skip the email. | |
| """ | |
| from tools.sumar import summarize_text | |
| from datetime import datetime | |
| scraper = EmailScraper() | |
| print("Fetching latest email...") | |
| emails = scraper.scrape_latest_emails(count=1) | |
| if not emails: | |
| app.logger.warning("No emails found when fetching latest email.") | |
| return None | |
| latest_email_id, latest_email = next(iter(emails.items())) | |
| body = latest_email.get('body', '') | |
| from_ = latest_email.get('from', '') | |
| subject = latest_email.get('subject', '') | |
| timestamp = latest_email.get('timestamp', datetime.utcnow().isoformat()) | |
| try: | |
| summary = summarize_text(body) | |
| # Concatenate 'from', 'subject', and 'timestamp' to the summarized body | |
| summary = f"From: {from_}\nSubject: {subject}\nTimestamp: {timestamp}\n{summary}" | |
| except Exception as e: | |
| app.logger.warning(f"Summarization failed for email {latest_email_id}: {e}. Skipping email.") | |
| return None | |
| # Return as a list of dicts for objectwise ingestion, including 'from', 'subject', and 'timestamp' | |
| return [{ | |
| 'id': latest_email_id, | |
| 'body': summary, | |
| 'from': from_, | |
| 'subject': subject, | |
| 'timestamp': timestamp | |
| }] | |
| # Pass the callback to ShortTermDatabase | |
| short_db = ShortTermDatabase( | |
| collection_prefix=SHORT_TERM_PREFIX, | |
| fetch_latest_email=fetch_latest_email | |
| ) | |
| # --- Short-term DB background worker management --- | |
| global_worker_thread = None | |
| global_worker_stop_event = threading.Event() | |
| global_worker_running = False # New global flag for worker status | |
| model = 'qwen/qwen3-32b' | |
| def shortterm_worker(): | |
| global global_worker_running | |
| global_worker_running = True | |
| try: | |
| short_db.run_worker() | |
| except Exception as e: | |
| print(f"Short-term worker error: {e}") | |
| traceback.print_exc() | |
| finally: | |
| global_worker_running = False | |
| def start_worker_thread_if_needed(): | |
| global global_worker_thread, global_worker_stop_event, global_worker_running | |
| # Only start the worker in the main Flask process (not the reloader) | |
| if os.environ.get("WERKZEUG_RUN_MAIN") == "true": | |
| if not global_worker_thread or not global_worker_thread.is_alive(): | |
| global_worker_stop_event.clear() | |
| global_worker_thread = threading.Thread(target=shortterm_worker) | |
| global_worker_thread.start() | |
| global_worker_running = True | |
| app.logger.info("Short-term worker thread started automatically on backend startup.") | |
| # Start the worker thread at app startup | |
| start_worker_thread_if_needed() | |
| # --- Simple session-based decorator for admin endpoints --- | |
| def require_admin(f): | |
| def decorated(*args, **kwargs): | |
| # Removed admin email check | |
| return f(*args, **kwargs) | |
| return decorated | |
| # --- Global dictionary for user RAGnarok objects with last access time --- | |
| user_rag_dict = {} # {user_uuid: {'rag': RAGnarok, 'last_access': timestamp}} | |
| USER_RAG_TIMEOUT = 30 * 60 # 30 minutes in seconds | |
| def cleanup_user_rag_dict(): | |
| """Remove user sessions that have been inactive for more than USER_RAG_TIMEOUT seconds.""" | |
| now = time.time() | |
| to_delete = [uuid for uuid, v in user_rag_dict.items() if now - v['last_access'] > USER_RAG_TIMEOUT] | |
| for uuid in to_delete: | |
| del user_rag_dict[uuid] | |
| # --- Global model variable --- | |
| # model = 'deepseek-r1-distill-llama-70b' # Default model | |
| def change_model(): | |
| global model | |
| try: | |
| data = request.get_json() | |
| new_model = data.get('model') | |
| if not new_model: | |
| return jsonify({'error': 'No model provided.'}), 400 | |
| model = new_model | |
| app.logger.info(f"Model changed to: {model}") | |
| return jsonify({'message': f'Model changed to {model}.'}), 200 | |
| except Exception as e: | |
| app.logger.error(f"Error changing model: {e}") | |
| return jsonify({'error': str(e), 'trace': traceback.format_exc()}), 500 | |
| # --- API Endpoints --- | |
| def upload_json(): | |
| try: | |
| if 'file' not in request.files: | |
| app.logger.error("No file part in the request.") | |
| return jsonify({'error': 'No file part'}), 400 | |
| file = request.files['file'] | |
| if file.filename == '': | |
| app.logger.error("No file selected for upload.") | |
| return jsonify({'error': 'No selected file'}), 400 | |
| if not file.filename.endswith('.json'): | |
| app.logger.error("Uploaded file is not a JSON file.") | |
| return jsonify({'error': 'Invalid file type. Only JSON files are allowed.'}), 400 | |
| filename = secure_filename(file.filename) | |
| filepath = os.path.join('uploads', filename) | |
| os.makedirs('uploads', exist_ok=True) | |
| file.save(filepath) | |
| app.logger.info(f"File {filename} saved successfully at {filepath}.") | |
| # Add the uploaded JSON data to the long-term database | |
| try: | |
| app.logger.info(f"Adding data from {filename} to the long-term database...") | |
| long_db.add_data(filepath) | |
| app.logger.info(f"Data from {filename} added to the long-term database successfully.") | |
| except Exception as e: | |
| app.logger.error(f"Failed to add data from {filename} to the long-term database: {e}") | |
| return jsonify({'error': 'Failed to process the file.', 'details': str(e)}), 500 | |
| return jsonify({'message': 'File uploaded and data added to long-term DB.'}), 200 | |
| except Exception as e: | |
| app.logger.error(f"Unexpected error during file upload: {e}") | |
| return jsonify({'error': str(e), 'trace': traceback.format_exc()}), 500 | |
| finally: | |
| # Clean up the uploaded file to save space | |
| if os.path.exists(filepath): | |
| os.remove(filepath) | |
| # Ensure the worker thread persists across sessions | |
| def start_shortterm_worker(): | |
| global global_worker_thread, global_worker_stop_event, global_worker_running | |
| if global_worker_thread and global_worker_thread.is_alive(): | |
| app.logger.info("Worker thread is already running.") | |
| return jsonify({'message': 'Short-term worker already running.'}), 200 | |
| # Reset the stop event to ensure the thread runs continuously | |
| global_worker_stop_event.clear() | |
| global_worker_thread = threading.Thread(target=shortterm_worker) | |
| global_worker_thread.start() | |
| global_worker_running = True | |
| app.logger.info("Worker thread has been started and will save updates to the short-term directory.") | |
| return jsonify({'message': 'Short-term worker started.'}), 200 | |
| def stop_shortterm_worker(): | |
| global global_worker_stop_event, global_worker_running | |
| try: | |
| global_worker_stop_event.set() # Signal the thread to stop | |
| short_db.stop_worker() | |
| global_worker_running = False | |
| return jsonify({'message': 'Short-term worker stopped.'}), 200 | |
| except Exception as e: | |
| return jsonify({'error': str(e), 'trace': traceback.format_exc()}), 500 | |
| def worker_status(): | |
| global global_worker_running | |
| running = global_worker_running | |
| status = { | |
| 'running': running | |
| } | |
| if not running: | |
| app.logger.warning("Worker thread is not running.") | |
| else: | |
| app.logger.info("Worker thread is running.") | |
| return jsonify(status) | |
| # # Ensure RAGnarok is instantiated correctly | |
| # rg = RAGnarok(long_db, short_db) | |
| def chat(): | |
| try: | |
| data = request.get_json() | |
| query = data.get('query') | |
| user_uuid = data.get('user_uuid') | |
| if not query: | |
| return jsonify({'error': 'No query provided'}), 400 | |
| if not user_uuid: | |
| return jsonify({'error': 'No user_uuid provided'}), 400 | |
| global user_rag_dict, model, api_keys | |
| cleanup_user_rag_dict() # Clean up expired sessions on each chat | |
| app.logger.info(f"Received user_uuid: {user_uuid}") | |
| app.logger.info(f"Current user_rag_dict keys: {list(user_rag_dict.keys())}") | |
| now = time.time() | |
| if user_uuid not in user_rag_dict: | |
| user_rag_dict[user_uuid] = { | |
| 'rag': RAGnarok(long_db, short_db, model=model), | |
| 'last_access': now | |
| } | |
| else: | |
| user_rag_dict[user_uuid]['last_access'] = now | |
| user_rg = user_rag_dict[user_uuid]['rag'] | |
| response_text = user_rg.invoke(query) | |
| app.logger.info(f"RAGnarok response: {response_text}") | |
| resp = make_response(jsonify({'response': response_text}), 200) | |
| return resp | |
| except Exception as e: | |
| app.logger.error(f"Unexpected error in /chat endpoint: {e}", exc_info=True) | |
| return jsonify({'error': 'An unexpected error occurred. Please try again later.'}), 500 | |
| # --- Admin Authentication Endpoint --- | |
| def verify_credentials(): | |
| try: | |
| data = request.get_json() | |
| email = data.get('email') | |
| password = data.get('password') | |
| if not email or not password: | |
| return jsonify({'error': 'Email and password are required.'}), 400 | |
| admin_email = os.getenv('ADMIN_EMAIL') | |
| admin_password = os.getenv('ADMIN_PASSWORD') | |
| if email == admin_email and password == admin_password: | |
| return jsonify({'message': 'Authentication successful.'}), 200 | |
| else: | |
| return jsonify({'error': 'Invalid credentials.'}), 401 | |
| except Exception as e: | |
| app.logger.error(f"Error during admin authentication: {e}") | |
| return jsonify({'error': str(e), 'trace': traceback.format_exc()}), 500 | |
| # Flag to ensure initialization logic runs only once | |
| initialized = False | |
| def initialize_databases(): | |
| global initialized | |
| if not initialized: | |
| initialized = True | |
| try: | |
| if not os.listdir(LONG_TERM_PREFIX): | |
| app.logger.info("Long-term database is empty. Initializing with default data.") | |
| # Add logic to populate long-term database with initial data if needed | |
| if not os.listdir(SHORT_TERM_PREFIX): | |
| app.logger.info("Short-term database is empty. Initializing with default data.") | |
| # Add logic to populate short-term database with initial data if needed | |
| except Exception as e: | |
| app.logger.error(f"Error during database initialization: {e}") | |
| # --- Graceful shutdown --- | |
| import atexit | |
| def cleanup(): | |
| try: | |
| stop_shortterm_worker() | |
| short_db.close() | |
| except Exception: | |
| pass | |
| def download_logs(): | |
| log_path = 'rag.log' # Adjust path if your log file is elsewhere | |
| if not os.path.exists(log_path): | |
| return jsonify({'error': 'Log file not found.'}), 404 | |
| return send_file(log_path, as_attachment=True, download_name='rag.txt') | |
| def easter_egg_shutdown(): | |
| """Hidden endpoint""" | |
| try: | |
| logging.info("Special command executed - server shutdown initiated") | |
| # Send response before shutting down | |
| response = jsonify({ | |
| 'message': '🎉 Special command executed! Space is being paused...', | |
| 'status': 'pause_initiated' | |
| }) | |
| # Use threading to delay shutdown so response can be sent | |
| def delayed_shutdown(): | |
| import time | |
| from huggingface_hub import HfApi | |
| time.sleep(1) # Give time for response to be sent | |
| logging.info("Pausing Hugging Face Space due to special command") | |
| try: | |
| # Method 1: Hugging Face Space pause (only method) | |
| hf_token = os.getenv("HF_TOKEN") | |
| if hf_token: | |
| api = HfApi(token=hf_token) | |
| repo_id = "IotaCluster/RAGnarok-Stable" | |
| print("pausing") | |
| # Pause the space | |
| api.pause_space(repo_id=repo_id) | |
| print("paused") | |
| # Get status/info (using correct attributes) | |
| info = api.space_info(repo_id=repo_id) | |
| logging.info(f"Space info: {info.id}, runtime: {getattr(info, 'runtime', 'N/A')}") | |
| logging.info("Hugging Face Space paused successfully") | |
| else: | |
| logging.warning("HF_TOKEN not found in environment variables") | |
| except ImportError: | |
| logging.warning("huggingface_hub not available") | |
| except Exception as e: | |
| logging.warning(f"Hugging Face API pause failed: {e}") | |
| shutdown_thread = threading.Thread(target=delayed_shutdown) | |
| shutdown_thread.daemon = True | |
| shutdown_thread.start() | |
| return response, 200 | |
| except Exception as e: | |
| logging.error(f"Error in special command: {str(e)}") | |
| return jsonify({'error': 'Failed to initiate shutdown'}), 500 | |
| def index(): | |
| return "RAG-narok backend is running.", 200 | |
| def before_request(): | |
| pass | |
| if __name__ == '__main__': | |
| import os | |
| port = int(os.environ.get("PORT", 7860)) | |
| app.run( | |
| host='0.0.0.0', | |
| port=port, | |
| debug=False, # disable debug mode in production | |
| use_reloader=False, # prevent double-start freeze | |
| threaded=True | |
| ) | |