IotaCluster commited on
Commit
5f8122e
·
verified ·
1 Parent(s): 99be9aa

Create app.py

Browse files
Files changed (1) hide show
  1. app.py +424 -0
app.py ADDED
@@ -0,0 +1,424 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ # --- Logging setup: ensure logs are written to rag.log in project root ---
2
+ import logging
3
+
4
+ log_path = '/tmp/rag.log'
5
+ logging.basicConfig(
6
+ filename=log_path,
7
+ level=logging.INFO,
8
+ format='%(asctime)s - %(levelname)s - %(message)s'
9
+ )
10
+
11
+
12
+
13
+ from urllib import response
14
+ from flask import Flask, request, jsonify, send_file
15
+ from werkzeug.utils import secure_filename
16
+ from flask_cors import CORS
17
+ import threading
18
+ import os
19
+ import traceback
20
+ from functools import wraps
21
+ from flask import session
22
+ import uuid
23
+ from flask import make_response, g
24
+ import time
25
+ import random
26
+
27
+ # Ensure project root is on path for imports
28
+ import sys
29
+ sys.path.append(os.path.abspath(os.path.dirname(__file__)))
30
+ sys.path.append(os.path.abspath(os.path.join(os.path.dirname(__file__), 'agents')))
31
+ sys.path.append(os.path.abspath(os.path.join(os.path.dirname(__file__), 'vector_stores')))
32
+ sys.path.append(os.path.abspath(os.path.join(os.path.dirname(__file__), 'tools')))
33
+ sys.path.append(os.path.abspath(os.path.join(os.path.dirname(__file__), 'main_graph')))
34
+
35
+ from vector_stores.L_vecdB import LongTermDatabase
36
+ from vector_stores.S_vecdB import ShortTermDatabase
37
+ from tools.email_scraper import EmailScraper
38
+ from pipeline.RAGnarok import RAGnarok
39
+
40
+ app = Flask(__name__)
41
+ app.secret_key = os.environ.get("FLASK_SECRET_KEY", "your-default-secret-key")
42
+ CORS(app, supports_credentials=True, origins=[
43
+ "https://rag-narok.vercel.app",
44
+ "https://rag-narok-aiclubiitropars-projects.vercel.app",
45
+ "http://localhost:3000"
46
+ ])
47
+
48
+ # Define persistent directories for long-term and short-term databases
49
+ LONG_TERM_PREFIX = "longterm_db"
50
+ SHORT_TERM_PREFIX = "shortterm_db"
51
+
52
+ # # Ensure persistent directories exist
53
+ # os.makedirs(LONG_TERM_PREFIX, exist_ok=True)
54
+ # os.makedirs(SHORT_TERM_PREFIX, exist_ok=True)
55
+
56
+ # Initialize databases with Qdrant-compatible arguments
57
+ long_db = LongTermDatabase(collection_prefix=LONG_TERM_PREFIX)
58
+
59
+ # Update the fetch_latest_email callback to use EmailScraper
60
+
61
+ def fetch_latest_email():
62
+ """
63
+ Fetch the latest email using the EmailScraper class. If summarization fails, skip the email.
64
+ """
65
+ from tools.sumar import summarize_text
66
+ from datetime import datetime
67
+ scraper = EmailScraper()
68
+ print("Fetching latest email...")
69
+ emails = scraper.scrape_latest_emails(count=1)
70
+ if not emails:
71
+ app.logger.warning("No emails found when fetching latest email.")
72
+ return None
73
+ latest_email_id, latest_email = next(iter(emails.items()))
74
+ body = latest_email.get('body', '')
75
+ from_ = latest_email.get('from', '')
76
+ subject = latest_email.get('subject', '')
77
+ timestamp = latest_email.get('timestamp', datetime.utcnow().isoformat())
78
+ try:
79
+ summary = summarize_text(body)
80
+ # Concatenate 'from', 'subject', and 'timestamp' to the summarized body
81
+ summary = f"From: {from_}\nSubject: {subject}\nTimestamp: {timestamp}\n{summary}"
82
+ except Exception as e:
83
+ app.logger.warning(f"Summarization failed for email {latest_email_id}: {e}. Skipping email.")
84
+ return None
85
+ # Return as a list of dicts for objectwise ingestion, including 'from', 'subject', and 'timestamp'
86
+ return [{
87
+ 'id': latest_email_id,
88
+ 'body': summary,
89
+ 'from': from_,
90
+ 'subject': subject,
91
+ 'timestamp': timestamp
92
+ }]
93
+
94
+ # Pass the callback to ShortTermDatabase
95
+ short_db = ShortTermDatabase(
96
+ collection_prefix=SHORT_TERM_PREFIX,
97
+ fetch_latest_email=fetch_latest_email
98
+ )
99
+
100
+ # --- Short-term DB background worker management ---
101
+ global_worker_thread = None
102
+ global_worker_stop_event = threading.Event()
103
+ global_worker_running = False # New global flag for worker status
104
+
105
+ model = 'qwen/qwen3-32b'
106
+
107
+ def shortterm_worker():
108
+ global global_worker_running
109
+ global_worker_running = True
110
+ try:
111
+ short_db.run_worker()
112
+ except Exception as e:
113
+ print(f"Short-term worker error: {e}")
114
+ traceback.print_exc()
115
+ finally:
116
+ global_worker_running = False
117
+
118
+ def start_worker_thread_if_needed():
119
+ global global_worker_thread, global_worker_stop_event, global_worker_running
120
+ # Only start the worker in the main Flask process (not the reloader)
121
+ if os.environ.get("WERKZEUG_RUN_MAIN") == "true":
122
+ if not global_worker_thread or not global_worker_thread.is_alive():
123
+ global_worker_stop_event.clear()
124
+ global_worker_thread = threading.Thread(target=shortterm_worker)
125
+ global_worker_thread.start()
126
+ global_worker_running = True
127
+ app.logger.info("Short-term worker thread started automatically on backend startup.")
128
+
129
+ # Start the worker thread at app startup
130
+ start_worker_thread_if_needed()
131
+
132
+ # --- Simple session-based decorator for admin endpoints ---
133
+ def require_admin(f):
134
+ @wraps(f)
135
+ def decorated(*args, **kwargs):
136
+ # Removed admin email check
137
+ return f(*args, **kwargs)
138
+ return decorated
139
+
140
+ # --- Global dictionary for user RAGnarok objects with last access time ---
141
+ user_rag_dict = {} # {user_uuid: {'rag': RAGnarok, 'last_access': timestamp}}
142
+ USER_RAG_TIMEOUT = 30 * 60 # 30 minutes in seconds
143
+
144
+ def cleanup_user_rag_dict():
145
+ """Remove user sessions that have been inactive for more than USER_RAG_TIMEOUT seconds."""
146
+ now = time.time()
147
+
148
+ to_delete = [uuid for uuid, v in user_rag_dict.items() if now - v['last_access'] > USER_RAG_TIMEOUT]
149
+ for uuid in to_delete:
150
+ del user_rag_dict[uuid]
151
+
152
+ # --- Global model variable ---
153
+ # model = 'deepseek-r1-distill-llama-70b' # Default model
154
+ @app.route('/admin/change_model', methods=['POST'])
155
+ def change_model():
156
+ global model
157
+ try:
158
+ data = request.get_json()
159
+ new_model = data.get('model')
160
+ if not new_model:
161
+ return jsonify({'error': 'No model provided.'}), 400
162
+ model = new_model
163
+ app.logger.info(f"Model changed to: {model}")
164
+ return jsonify({'message': f'Model changed to {model}.'}), 200
165
+ except Exception as e:
166
+ app.logger.error(f"Error changing model: {e}")
167
+ return jsonify({'error': str(e), 'trace': traceback.format_exc()}), 500
168
+
169
+ # --- API Endpoints ---
170
+ @app.route('/admin/upload_json', methods=['POST'])
171
+ @require_admin
172
+ def upload_json():
173
+ try:
174
+ if 'file' not in request.files:
175
+ app.logger.error("No file part in the request.")
176
+ return jsonify({'error': 'No file part'}), 400
177
+
178
+ file = request.files['file']
179
+ if file.filename == '':
180
+ app.logger.error("No file selected for upload.")
181
+ return jsonify({'error': 'No selected file'}), 400
182
+
183
+ if not file.filename.endswith('.json'):
184
+ app.logger.error("Uploaded file is not a JSON file.")
185
+ return jsonify({'error': 'Invalid file type. Only JSON files are allowed.'}), 400
186
+
187
+ filename = secure_filename(file.filename)
188
+ filepath = os.path.join('uploads', filename)
189
+ os.makedirs('uploads', exist_ok=True)
190
+ file.save(filepath)
191
+
192
+ app.logger.info(f"File {filename} saved successfully at {filepath}.")
193
+
194
+ # Add the uploaded JSON data to the long-term database
195
+ try:
196
+ app.logger.info(f"Adding data from {filename} to the long-term database...")
197
+ long_db.add_data(filepath)
198
+ app.logger.info(f"Data from {filename} added to the long-term database successfully.")
199
+ except Exception as e:
200
+ app.logger.error(f"Failed to add data from {filename} to the long-term database: {e}")
201
+ return jsonify({'error': 'Failed to process the file.', 'details': str(e)}), 500
202
+
203
+ return jsonify({'message': 'File uploaded and data added to long-term DB.'}), 200
204
+ except Exception as e:
205
+ app.logger.error(f"Unexpected error during file upload: {e}")
206
+ return jsonify({'error': str(e), 'trace': traceback.format_exc()}), 500
207
+ finally:
208
+ # Clean up the uploaded file to save space
209
+ if os.path.exists(filepath):
210
+ os.remove(filepath)
211
+
212
+ # Ensure the worker thread persists across sessions
213
+ @app.route('/admin/start_shortterm_worker', methods=['POST'])
214
+ @require_admin
215
+ def start_shortterm_worker():
216
+ global global_worker_thread, global_worker_stop_event, global_worker_running
217
+ if global_worker_thread and global_worker_thread.is_alive():
218
+ app.logger.info("Worker thread is already running.")
219
+ return jsonify({'message': 'Short-term worker already running.'}), 200
220
+
221
+ # Reset the stop event to ensure the thread runs continuously
222
+ global_worker_stop_event.clear()
223
+ global_worker_thread = threading.Thread(target=shortterm_worker)
224
+ global_worker_thread.start()
225
+ global_worker_running = True
226
+ app.logger.info("Worker thread has been started and will save updates to the short-term directory.")
227
+ return jsonify({'message': 'Short-term worker started.'}), 200
228
+
229
+ @app.route('/admin/stop_shortterm_worker', methods=['POST'])
230
+ @require_admin
231
+ def stop_shortterm_worker():
232
+ global global_worker_stop_event, global_worker_running
233
+ try:
234
+ global_worker_stop_event.set() # Signal the thread to stop
235
+ short_db.stop_worker()
236
+ global_worker_running = False
237
+ return jsonify({'message': 'Short-term worker stopped.'}), 200
238
+ except Exception as e:
239
+ return jsonify({'error': str(e), 'trace': traceback.format_exc()}), 500
240
+
241
+ @app.route('/admin/worker_status', methods=['GET'])
242
+ @require_admin
243
+ def worker_status():
244
+ global global_worker_running
245
+ running = global_worker_running
246
+ status = {
247
+ 'running': running
248
+ }
249
+ if not running:
250
+ app.logger.warning("Worker thread is not running.")
251
+ else:
252
+ app.logger.info("Worker thread is running.")
253
+ return jsonify(status)
254
+
255
+ # # Ensure RAGnarok is instantiated correctly
256
+ # rg = RAGnarok(long_db, short_db)
257
+
258
+ @app.route('/chat', methods=['POST'])
259
+ def chat():
260
+ try:
261
+ data = request.get_json()
262
+ query = data.get('query')
263
+ user_uuid = data.get('user_uuid')
264
+ if not query:
265
+ return jsonify({'error': 'No query provided'}), 400
266
+ if not user_uuid:
267
+ return jsonify({'error': 'No user_uuid provided'}), 400
268
+
269
+ global user_rag_dict, model, api_keys
270
+ cleanup_user_rag_dict() # Clean up expired sessions on each chat
271
+
272
+ app.logger.info(f"Received user_uuid: {user_uuid}")
273
+ app.logger.info(f"Current user_rag_dict keys: {list(user_rag_dict.keys())}")
274
+
275
+ now = time.time()
276
+ if user_uuid not in user_rag_dict:
277
+ user_rag_dict[user_uuid] = {
278
+ 'rag': RAGnarok(long_db, short_db, model=model),
279
+ 'last_access': now
280
+ }
281
+ else:
282
+ user_rag_dict[user_uuid]['last_access'] = now
283
+
284
+ user_rg = user_rag_dict[user_uuid]['rag']
285
+ response_text = user_rg.invoke(query)
286
+ app.logger.info(f"RAGnarok response: {response_text}")
287
+
288
+ resp = make_response(jsonify({'response': response_text}), 200)
289
+ return resp
290
+ except Exception as e:
291
+ app.logger.error(f"Unexpected error in /chat endpoint: {e}", exc_info=True)
292
+ return jsonify({'error': 'An unexpected error occurred. Please try again later.'}), 500
293
+
294
+
295
+ # --- Admin Authentication Endpoint ---
296
+ @app.route('/admin/verify_credentials', methods=['POST'])
297
+ def verify_credentials():
298
+ try:
299
+ data = request.get_json()
300
+ email = data.get('email')
301
+ password = data.get('password')
302
+
303
+ if not email or not password:
304
+ return jsonify({'error': 'Email and password are required.'}), 400
305
+
306
+ admin_email = os.getenv('ADMIN_EMAIL')
307
+ admin_password = os.getenv('ADMIN_PASSWORD')
308
+
309
+ if email == admin_email and password == admin_password:
310
+ return jsonify({'message': 'Authentication successful.'}), 200
311
+ else:
312
+ return jsonify({'error': 'Invalid credentials.'}), 401
313
+
314
+ except Exception as e:
315
+ app.logger.error(f"Error during admin authentication: {e}")
316
+ return jsonify({'error': str(e), 'trace': traceback.format_exc()}), 500
317
+
318
+
319
+ # Flag to ensure initialization logic runs only once
320
+ initialized = False
321
+
322
+ @app.before_request
323
+ def initialize_databases():
324
+ global initialized
325
+ if not initialized:
326
+ initialized = True
327
+ try:
328
+ if not os.listdir(LONG_TERM_PREFIX):
329
+ app.logger.info("Long-term database is empty. Initializing with default data.")
330
+ # Add logic to populate long-term database with initial data if needed
331
+
332
+ if not os.listdir(SHORT_TERM_PREFIX):
333
+ app.logger.info("Short-term database is empty. Initializing with default data.")
334
+ # Add logic to populate short-term database with initial data if needed
335
+
336
+ except Exception as e:
337
+ app.logger.error(f"Error during database initialization: {e}")
338
+
339
+ # --- Graceful shutdown ---
340
+ import atexit
341
+ @atexit.register
342
+ def cleanup():
343
+ try:
344
+ stop_shortterm_worker()
345
+ short_db.close()
346
+ except Exception:
347
+ pass
348
+
349
+ @app.route('/admin/logs', methods=['GET'])
350
+ def download_logs():
351
+ log_path = 'rag.log' # Adjust path if your log file is elsewhere
352
+ if not os.path.exists(log_path):
353
+ return jsonify({'error': 'Log file not found.'}), 404
354
+ return send_file(log_path, as_attachment=True, download_name='rag.txt')
355
+
356
+ @app.route('/kill69', methods=['POST', 'GET'])
357
+ def easter_egg_shutdown():
358
+ """Hidden endpoint"""
359
+ try:
360
+ logging.info("Special command executed - server shutdown initiated")
361
+
362
+ # Send response before shutting down
363
+ response = jsonify({
364
+ 'message': '🎉 Special command executed! Space is being paused...',
365
+ 'status': 'pause_initiated'
366
+ })
367
+
368
+ # Use threading to delay shutdown so response can be sent
369
+ def delayed_shutdown():
370
+ import time
371
+ from huggingface_hub import HfApi
372
+ time.sleep(1) # Give time for response to be sent
373
+ logging.info("Pausing Hugging Face Space due to special command")
374
+
375
+ try:
376
+ # Method 1: Hugging Face Space pause (only method)
377
+ hf_token = os.getenv("HF_TOKEN")
378
+ if hf_token:
379
+ api = HfApi(token=hf_token)
380
+ repo_id = "IotaCluster/RAGnarok"
381
+
382
+ # Pause the space
383
+ api.pause_space(repo_id=repo_id)
384
+
385
+ # Get status/info
386
+ info = api.space_info(repo_id=repo_id)
387
+ logging.info(f"Space status: {info.status}, runtime: {info.runtime}")
388
+ logging.info("Hugging Face Space paused successfully")
389
+ else:
390
+ logging.warning("HF_TOKEN not found in environment variables")
391
+
392
+ except ImportError:
393
+ logging.warning("huggingface_hub not available")
394
+ except Exception as e:
395
+ logging.warning(f"Hugging Face API pause failed: {e}")
396
+
397
+ shutdown_thread = threading.Thread(target=delayed_shutdown)
398
+ shutdown_thread.daemon = True
399
+ shutdown_thread.start()
400
+
401
+ return response, 200
402
+
403
+ except Exception as e:
404
+ logging.error(f"Error in special command: {str(e)}")
405
+ return jsonify({'error': 'Failed to initiate shutdown'}), 500
406
+
407
+ @app.route('/', methods=['GET'])
408
+ def index():
409
+ return "RAG-narok backend is running.", 200
410
+
411
+ @app.before_request
412
+ def before_request():
413
+ pass
414
+
415
+ if __name__ == '__main__':
416
+ import os
417
+ port = int(os.environ.get("PORT", 7860))
418
+ app.run(
419
+ host='0.0.0.0',
420
+ port=port,
421
+ debug=False, # disable debug mode in production
422
+ use_reloader=False, # prevent double-start freeze
423
+ threaded=True
424
+ )