File size: 15,845 Bytes
5f8122e
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
daadadb
5f8122e
 
 
 
 
994a6cd
4806004
5f8122e
 
4806004
5f8122e
daadadb
5f8122e
daadadb
5f8122e
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
# --- Logging setup: write application logs to a file under /tmp ---
import logging

# Module-level so other parts of the app (e.g. the log-download endpoint)
# can reference the same file.
log_path = '/tmp/rag.log'
logging.basicConfig(
    filename=log_path,
    format='%(asctime)s - %(levelname)s - %(message)s',
    level=logging.INFO,
)



from urllib import response
from flask import Flask, request, jsonify, send_file
from werkzeug.utils import secure_filename
from flask_cors import CORS
import threading
import os
import traceback
from functools import wraps
from flask import session
import uuid
from flask import make_response, g
import time
import random

# Ensure project root is on path for imports
import sys
sys.path.append(os.path.abspath(os.path.dirname(__file__)))
sys.path.append(os.path.abspath(os.path.join(os.path.dirname(__file__), 'agents')))
sys.path.append(os.path.abspath(os.path.join(os.path.dirname(__file__), 'vector_stores')))
sys.path.append(os.path.abspath(os.path.join(os.path.dirname(__file__), 'tools')))
sys.path.append(os.path.abspath(os.path.join(os.path.dirname(__file__), 'main_graph')))

from vector_stores.L_vecdB import LongTermDatabase
from vector_stores.S_vecdB import ShortTermDatabase
from tools.email_scraper import EmailScraper
from pipeline.RAGnarok import RAGnarok

# Flask app with session support; CORS is restricted to the known frontends.
app = Flask(__name__)
# NOTE(review): the hard-coded fallback secret key is unsafe for production —
# ensure FLASK_SECRET_KEY is set in the deployment environment.
app.secret_key = os.environ.get("FLASK_SECRET_KEY", "your-default-secret-key")
# supports_credentials=True lets browsers send cookies cross-origin.
CORS(app, supports_credentials=True, origins=[
    "https://rag-narok.vercel.app",
    "https://rag-narok-aiclubiitropars-projects.vercel.app",
    "http://localhost:3000"
])

# Define persistent directories for long-term and short-term databases.
# These prefixes double as on-disk directory names (see initialize_databases).
LONG_TERM_PREFIX = "longterm_db"
SHORT_TERM_PREFIX = "shortterm_db"

# # Ensure persistent directories exist
# os.makedirs(LONG_TERM_PREFIX, exist_ok=True)
# os.makedirs(SHORT_TERM_PREFIX, exist_ok=True)

# Initialize databases with Qdrant-compatible arguments.
# NOTE(review): the makedirs calls above are commented out, so the prefix
# directories may not exist on first boot — confirm LongTermDatabase creates
# them itself.
long_db = LongTermDatabase(collection_prefix=LONG_TERM_PREFIX)

# Update the fetch_latest_email callback to use EmailScraper

def fetch_latest_email():
    """Fetch the most recent email via EmailScraper for objectwise ingestion.

    Returns:
        list[dict] | None: A single-item list whose dict has keys 'id',
        'body' (summary prefixed with From/Subject/Timestamp lines), 'from',
        'subject' and 'timestamp'; or None when no email is available or
        summarization fails (the email is skipped).
    """
    from tools.sumar import summarize_text
    from datetime import datetime
    scraper = EmailScraper()
    # Was print(): route through the app logger so it lands in rag.log.
    app.logger.info("Fetching latest email...")
    emails = scraper.scrape_latest_emails(count=1)
    if not emails:
        app.logger.warning("No emails found when fetching latest email.")
        return None
    latest_email_id, latest_email = next(iter(emails.items()))
    body = latest_email.get('body', '')
    from_ = latest_email.get('from', '')
    subject = latest_email.get('subject', '')
    timestamp = latest_email.get('timestamp', datetime.utcnow().isoformat())
    try:
        summary = summarize_text(body)
        # Prepend routing metadata to the summarized body so downstream
        # retrieval sees sender/subject/time context in the stored text.
        summary = f"From: {from_}\nSubject: {subject}\nTimestamp: {timestamp}\n{summary}"
    except Exception as e:
        app.logger.warning(f"Summarization failed for email {latest_email_id}: {e}. Skipping email.")
        return None
    # Return as a list of dicts for objectwise ingestion.
    return [{
        'id': latest_email_id,
        'body': summary,
        'from': from_,
        'subject': subject,
        'timestamp': timestamp
    }]

# Pass the callback to ShortTermDatabase so it can poll for new mail itself.
short_db = ShortTermDatabase(
    collection_prefix=SHORT_TERM_PREFIX,
    fetch_latest_email=fetch_latest_email
)

# --- Short-term DB background worker management ---
# global_worker_thread: the threading.Thread running shortterm_worker, or None.
global_worker_thread = None
# Signalled to request the worker to stop (see stop_shortterm_worker).
global_worker_stop_event = threading.Event()
global_worker_running = False  # New global flag for worker status

# Default LLM identifier used when creating per-user RAGnarok instances;
# mutable at runtime via /admin/change_model.
model = 'qwen/qwen3-32b'

def shortterm_worker():
    """Run the short-term DB ingestion loop in a background thread.

    Keeps the module-level global_worker_running flag in sync so
    /admin/worker_status reflects reality even after a crash.
    """
    global global_worker_running
    global_worker_running = True
    try:
        short_db.run_worker()
    except Exception as e:
        # Was print() + traceback.print_exc(): log through the app logger
        # (with traceback) so the failure is captured in rag.log.
        app.logger.error(f"Short-term worker error: {e}", exc_info=True)
    finally:
        # Always clear the flag, whether the worker exited cleanly or not.
        global_worker_running = False

def start_worker_thread_if_needed():
    """Start the short-term worker thread once, only in the real Flask process.

    Under the Werkzeug reloader the module is imported twice; the
    WERKZEUG_RUN_MAIN check avoids spawning a duplicate worker in the
    reloader's parent process.

    NOTE(review): when the app is run with use_reloader=False (as in the
    __main__ block below), WERKZEUG_RUN_MAIN is never set to "true", so this
    function does nothing and the worker only starts via the admin endpoint —
    confirm that is intended.
    """
    global global_worker_thread, global_worker_stop_event, global_worker_running
    # Only start the worker in the main Flask process (not the reloader)
    if os.environ.get("WERKZEUG_RUN_MAIN") == "true":
        if not global_worker_thread or not global_worker_thread.is_alive():
            global_worker_stop_event.clear()
            global_worker_thread = threading.Thread(target=shortterm_worker)
            global_worker_thread.start()
            global_worker_running = True
            app.logger.info("Short-term worker thread started automatically on backend startup.")

# Start the worker thread at app startup
start_worker_thread_if_needed()

# --- Simple session-based decorator for admin endpoints ---
def require_admin(f):
    """Decorator for admin endpoints.

    Currently a pure pass-through: the admin e-mail check was removed, so the
    wrapped view runs unchanged. Retained so every admin route keeps a single
    place where authentication can be reinstated.
    """
    @wraps(f)
    def decorated(*args, **kwargs):
        return f(*args, **kwargs)
    return decorated

# --- Global dictionary for user RAGnarok objects with last access time ---
user_rag_dict = {}  # {user_uuid: {'rag': RAGnarok, 'last_access': timestamp}}
USER_RAG_TIMEOUT = 30 * 60  # 30 minutes in seconds

def cleanup_user_rag_dict():
    """Drop user sessions inactive for more than USER_RAG_TIMEOUT seconds.

    Mutates user_rag_dict in place; called on every /chat request.
    """
    now = time.time()
    # Loop variable renamed from `uuid` — it shadowed the uuid module
    # imported at the top of this file.
    expired = [user_id for user_id, entry in user_rag_dict.items()
               if now - entry['last_access'] > USER_RAG_TIMEOUT]
    for user_id in expired:
        del user_rag_dict[user_id]

# --- Global model variable is defined above; this endpoint mutates it ---
@app.route('/admin/change_model', methods=['POST'])
def change_model():
    """Admin endpoint: switch the LLM model used for NEW RAGnarok sessions.

    Expects JSON {'model': <name>}. Existing per-user RAGnarok instances keep
    their old model until they expire from user_rag_dict.
    """
    global model
    try:
        # silent=True returns None on missing/malformed JSON instead of
        # raising, so bad input yields a clean 400 rather than a 500.
        data = request.get_json(silent=True) or {}
        new_model = data.get('model')
        if not new_model:
            return jsonify({'error': 'No model provided.'}), 400
        model = new_model
        app.logger.info(f"Model changed to: {model}")
        return jsonify({'message': f'Model changed to {model}.'}), 200
    except Exception as e:
        app.logger.error(f"Error changing model: {e}")
        return jsonify({'error': str(e), 'trace': traceback.format_exc()}), 500

# --- API Endpoints ---
@app.route('/admin/upload_json', methods=['POST'])
@require_admin
def upload_json():
    """Upload a JSON file and ingest its contents into the long-term DB.

    The file is saved under uploads/, handed to long_db.add_data(), and always
    removed afterwards to save disk space.
    """
    # Defined up-front: the finally block below previously raised
    # UnboundLocalError (masking the intended 400 responses) whenever an
    # early return or exception fired before `filepath` was assigned.
    filepath = None
    try:
        if 'file' not in request.files:
            app.logger.error("No file part in the request.")
            return jsonify({'error': 'No file part'}), 400

        file = request.files['file']
        if file.filename == '':
            app.logger.error("No file selected for upload.")
            return jsonify({'error': 'No selected file'}), 400

        if not file.filename.endswith('.json'):
            app.logger.error("Uploaded file is not a JSON file.")
            return jsonify({'error': 'Invalid file type. Only JSON files are allowed.'}), 400

        # secure_filename strips path components so the upload can't escape
        # the uploads/ directory.
        filename = secure_filename(file.filename)
        filepath = os.path.join('uploads', filename)
        os.makedirs('uploads', exist_ok=True)
        file.save(filepath)

        app.logger.info(f"File (unknown) saved successfully at {filepath}.")

        # Add the uploaded JSON data to the long-term database
        try:
            app.logger.info(f"Adding data from (unknown) to the long-term database...")
            long_db.add_data(filepath)
            app.logger.info(f"Data from (unknown) added to the long-term database successfully.")
        except Exception as e:
            app.logger.error(f"Failed to add data from (unknown) to the long-term database: {e}")
            return jsonify({'error': 'Failed to process the file.', 'details': str(e)}), 500

        return jsonify({'message': 'File uploaded and data added to long-term DB.'}), 200
    except Exception as e:
        app.logger.error(f"Unexpected error during file upload: {e}")
        return jsonify({'error': str(e), 'trace': traceback.format_exc()}), 500
    finally:
        # Clean up the uploaded file to save space; guard against the early
        # returns above where no file was ever written.
        if filepath and os.path.exists(filepath):
            os.remove(filepath)

# The worker thread persists independently of user sessions.
@app.route('/admin/start_shortterm_worker', methods=['POST'])
@require_admin
def start_shortterm_worker():
    """Admin endpoint: launch the short-term ingestion worker thread."""
    global global_worker_thread, global_worker_stop_event, global_worker_running
    already_alive = global_worker_thread is not None and global_worker_thread.is_alive()
    if already_alive:
        app.logger.info("Worker thread is already running.")
        return jsonify({'message': 'Short-term worker already running.'}), 200

    # Clear any leftover stop signal so the fresh thread runs continuously.
    global_worker_stop_event.clear()
    new_thread = threading.Thread(target=shortterm_worker)
    global_worker_thread = new_thread
    new_thread.start()
    global_worker_running = True
    app.logger.info("Worker thread has been started and will save updates to the short-term directory.")
    return jsonify({'message': 'Short-term worker started.'}), 200

@app.route('/admin/stop_shortterm_worker', methods=['POST'])
@require_admin
def stop_shortterm_worker():
    """Admin endpoint: signal the short-term ingestion worker to stop."""
    global global_worker_stop_event, global_worker_running
    try:
        # Signal the thread first, then ask the DB layer to wind down its loop.
        global_worker_stop_event.set()
        short_db.stop_worker()
        global_worker_running = False
    except Exception as e:
        return jsonify({'error': str(e), 'trace': traceback.format_exc()}), 500
    return jsonify({'message': 'Short-term worker stopped.'}), 200

@app.route('/admin/worker_status', methods=['GET'])
@require_admin
def worker_status():
    """Admin endpoint: report whether the short-term worker is running."""
    global global_worker_running
    is_up = global_worker_running
    if is_up:
        app.logger.info("Worker thread is running.")
    else:
        app.logger.warning("Worker thread is not running.")
    return jsonify({'running': is_up})

# # Ensure RAGnarok is instantiated correctly
# rg = RAGnarok(long_db, short_db)

@app.route('/chat', methods=['POST'])
def chat():
    """Answer a user query with that user's dedicated RAGnarok instance.

    Expects JSON {'query': str, 'user_uuid': str}. Each user_uuid lazily gets
    its own RAGnarok (built with the current global model); sessions expire
    after USER_RAG_TIMEOUT seconds of inactivity.
    """
    try:
        # `api_keys` was previously listed in the global statement below but
        # is defined nowhere in this module; it has been dropped.
        global user_rag_dict, model
        data = request.get_json()
        query = data.get('query')
        user_uuid = data.get('user_uuid')
        if not query:
            return jsonify({'error': 'No query provided'}), 400
        if not user_uuid:
            return jsonify({'error': 'No user_uuid provided'}), 400

        cleanup_user_rag_dict()  # Clean up expired sessions on each chat

        app.logger.info(f"Received user_uuid: {user_uuid}")
        app.logger.info(f"Current user_rag_dict keys: {list(user_rag_dict.keys())}")

        now = time.time()
        if user_uuid not in user_rag_dict:
            # First message from this user: build a fresh RAGnarok session.
            user_rag_dict[user_uuid] = {
                'rag': RAGnarok(long_db, short_db, model=model),
                'last_access': now
            }
        else:
            user_rag_dict[user_uuid]['last_access'] = now

        user_rg = user_rag_dict[user_uuid]['rag']
        response_text = user_rg.invoke(query)
        app.logger.info(f"RAGnarok response: {response_text}")

        resp = make_response(jsonify({'response': response_text}), 200)
        return resp
    except Exception as e:
        app.logger.error(f"Unexpected error in /chat endpoint: {e}", exc_info=True)
        return jsonify({'error': 'An unexpected error occurred. Please try again later.'}), 500


# --- Admin Authentication Endpoint ---
@app.route('/admin/verify_credentials', methods=['POST'])
def verify_credentials():
    """Check posted email/password against the ADMIN_EMAIL / ADMIN_PASSWORD env vars."""
    import hmac  # local import: constant-time comparison below

    try:
        data = request.get_json()
        email = data.get('email')
        password = data.get('password')

        if not email or not password:
            return jsonify({'error': 'Email and password are required.'}), 400

        # Non-string JSON values can never match the env-var strings.
        if not isinstance(email, str) or not isinstance(password, str):
            return jsonify({'error': 'Invalid credentials.'}), 401

        admin_email = os.getenv('ADMIN_EMAIL')
        admin_password = os.getenv('ADMIN_PASSWORD')

        # If the env vars are unset, authentication always fails (as before).
        if admin_email is None or admin_password is None:
            return jsonify({'error': 'Invalid credentials.'}), 401

        # hmac.compare_digest on bytes avoids the timing side-channel that a
        # plain `==` comparison of the secret password would leak.
        email_ok = hmac.compare_digest(email.encode(), admin_email.encode())
        password_ok = hmac.compare_digest(password.encode(), admin_password.encode())
        if email_ok and password_ok:
            return jsonify({'message': 'Authentication successful.'}), 200
        return jsonify({'error': 'Invalid credentials.'}), 401

    except Exception as e:
        app.logger.error(f"Error during admin authentication: {e}")
        return jsonify({'error': str(e), 'trace': traceback.format_exc()}), 500


# Flag to ensure initialization logic runs only once
initialized = False

@app.before_request
def initialize_databases():
    """One-shot lazy startup hook: detect empty DB directories on first request."""
    global initialized
    if not initialized:
        # Set first so a failure below is not retried on every request.
        initialized = True
        try:
            # isdir guard: os.listdir raised FileNotFoundError when the prefix
            # directories had not been created yet (their makedirs calls are
            # commented out near the top of the file).
            if os.path.isdir(LONG_TERM_PREFIX) and not os.listdir(LONG_TERM_PREFIX):
                app.logger.info("Long-term database is empty. Initializing with default data.")
                # Add logic to populate long-term database with initial data if needed

            if os.path.isdir(SHORT_TERM_PREFIX) and not os.listdir(SHORT_TERM_PREFIX):
                app.logger.info("Short-term database is empty. Initializing with default data.")
                # Add logic to populate short-term database with initial data if needed

        except Exception as e:
            app.logger.error(f"Error during database initialization: {e}")

# --- Graceful shutdown ---
import atexit
@atexit.register
def cleanup():
    """Best-effort shutdown: stop the worker and close the short-term DB.

    Previously this called the Flask view stop_shortterm_worker(), whose
    jsonify() requires an application context and therefore raised at
    interpreter exit — silently skipping short_db.close(). Talk to the DB
    layer directly instead, and isolate the two steps so a failure in one
    does not abort the other.
    """
    try:
        global_worker_stop_event.set()
        short_db.stop_worker()
    except Exception:
        pass
    try:
        short_db.close()
    except Exception:
        pass
    
@app.route('/admin/logs', methods=['GET'])
def download_logs():
    """Admin endpoint: download the application log file.

    Uses the module-level log_path (/tmp/rag.log) that the logging setup
    actually writes to; the previous hard-coded 'rag.log' pointed at the
    process CWD and never matched the configured log file, so this endpoint
    always returned 404.
    """
    if not os.path.exists(log_path):
        return jsonify({'error': 'Log file not found.'}), 404
    return send_file(log_path, as_attachment=True, download_name='rag.txt')

@app.route('/kill69', methods=['POST', 'GET'])
def easter_egg_shutdown():
    """Hidden endpoint: pause the Hugging Face Space hosting this backend.

    NOTE(review): this endpoint has no authentication — anyone who discovers
    the URL can pause the Space. Consider protecting it with @require_admin.
    """
    try:
        logging.info("Special command executed - server shutdown initiated")
        
        # Send response before shutting down
        response = jsonify({
            'message': '🎉 Special command executed! Space is being paused...',
            'status': 'pause_initiated'
        })
        
        # Use threading to delay shutdown so response can be sent
        def delayed_shutdown():
            # Runs in a background thread: wait briefly so the HTTP response
            # can flush, then pause the Space through the Hub API.
            import time
            from huggingface_hub import HfApi
            time.sleep(1)  # Give time for response to be sent
            logging.info("Pausing Hugging Face Space due to special command")
            
            try:
                # Method 1: Hugging Face Space pause (only method)
                hf_token = os.getenv("HF_TOKEN")
                if hf_token:
                    api = HfApi(token=hf_token)
                    # Hard-coded Space repo this backend is deployed to.
                    repo_id = "IotaCluster/RAGnarok-Stable"
                    print("pausing")
                    # Pause the space
                    api.pause_space(repo_id=repo_id)
                    print("paused")

                    # Get status/info (using correct attributes)
                    info = api.space_info(repo_id=repo_id)
                    logging.info(f"Space info: {info.id}, runtime: {getattr(info, 'runtime', 'N/A')}")
                    logging.info("Hugging Face Space paused successfully")
                else:
                    logging.warning("HF_TOKEN not found in environment variables")
                    
            except ImportError:
                logging.warning("huggingface_hub not available")
            except Exception as e:
                logging.warning(f"Hugging Face API pause failed: {e}")
        
        # Daemon thread: won't block interpreter exit if the pause call hangs.
        shutdown_thread = threading.Thread(target=delayed_shutdown)
        shutdown_thread.daemon = True
        shutdown_thread.start()
        
        return response, 200
        
    except Exception as e:
        logging.error(f"Error in special command: {str(e)}")
        return jsonify({'error': 'Failed to initiate shutdown'}), 500

@app.route('/', methods=['GET'])
def index():
    """Health-check root endpoint."""
    status_message = "RAG-narok backend is running."
    return status_message, 200

@app.before_request
def before_request():
    # No-op placeholder hook (Flask allows multiple before_request handlers;
    # one-time initialization lives in initialize_databases). Kept as a slot
    # for future per-request logic.
    pass

if __name__ == '__main__':
    # Dev/standalone entry point; production deployments (HF Space) supply
    # PORT via the environment. The redundant function-local `import os` was
    # removed — os is already imported at module level.
    port = int(os.environ.get("PORT", 7860))
    app.run(
        host='0.0.0.0',
        port=port,
        debug=False,        # disable debug mode in production
        use_reloader=False, # prevent double-start freeze
        threaded=True
    )