"""
NotebookLM Clone - Main Flask Application
AI-powered document intelligence platform with RAG
Supports Admin/Employee roles and Bucket organization
"""
import os
import uuid
from functools import wraps

from flask import Flask, request, jsonify, send_from_directory, send_file, Response
from flask_cors import CORS
from werkzeug.utils import secure_filename

from config import Config
from services.auth_service import auth_service
from services.document_processor import document_processor
from services.chroma_service import chroma_service
from services.rag_service import rag_service
from services.metadata_extractor import metadata_extractor

# Initialize Flask app
app = Flask(__name__, static_folder='static')
app.config['MAX_CONTENT_LENGTH'] = Config.MAX_CONTENT_LENGTH
# NOTE(review): CORS(app) allows every origin; pass `origins=[...]` if the
# API should only be reachable from the bundled frontend.
CORS(app)

# Ensure upload directory exists
os.makedirs(Config.UPLOAD_FOLDER, exist_ok=True)


# ==================== Request Helpers ====================

_BEARER_PREFIX = 'Bearer '


def _json_object():
    """Return the request's JSON body if it is a JSON object, else ``None``.

    ``silent=True`` turns malformed JSON or a non-JSON content type into
    ``None`` instead of raising, so routes can answer with their own JSON
    400 error rather than Flask's default HTML error page.
    """
    data = request.get_json(silent=True)
    return data if isinstance(data, dict) else None


def _str_field(data, key, default=''):
    """Return ``data[key]`` if it is a string, otherwise *default*.

    Guards the ``.strip()`` calls in the routes against JSON ``null``,
    numbers or nested objects, which would otherwise raise AttributeError
    and surface as an HTTP 500.
    """
    value = data.get(key)
    return value if isinstance(value, str) else default


def _auth_success(result):
    """Build the JSON body shared by successful register/login responses."""
    return jsonify({
        "token": result['token'],
        "user_id": result['user_id'],
        "username": result['username'],
        "role": result['role'],
    })


# ==================== Auth Decorators ====================

def _authenticate():
    """Resolve the ``Authorization: Bearer <token>`` header to a user.

    Returns:
        ``(user, None)`` on success, or ``(None, error)`` where *error* is a
        ``(json_response, 401)`` tuple that a view can return directly.
    """
    auth_header = request.headers.get('Authorization', '')
    if not auth_header.startswith(_BEARER_PREFIX):
        return None, (jsonify({"error": "Missing or invalid authorization header"}), 401)
    # Slice the prefix off rather than split(' ')[1], which produced an empty
    # token for "Bearer  <token>" (two spaces).
    token = auth_header[len(_BEARER_PREFIX):].strip()
    if not token:
        return None, (jsonify({"error": "Missing or invalid authorization header"}), 401)
    user = auth_service.get_current_user(token)
    if not user:
        return None, (jsonify({"error": "Invalid or expired token"}), 401)
    return user, None


def require_auth(f):
    """Decorator to require authentication.

    On success the resolved user is stored on ``request.current_user`` and
    the view is called; otherwise a 401 JSON error is returned.
    """
    @wraps(f)
    def decorated(*args, **kwargs):
        user, error = _authenticate()
        if error is not None:
            return error
        request.current_user = user
        return f(*args, **kwargs)
    return decorated


def require_admin(f):
    """Decorator to require an authenticated user whose role is ``'admin'``.

    Returns 401 for a missing/invalid token and 403 for non-admin users;
    on success sets ``request.current_user`` and calls the view.
    """
    @wraps(f)
    def decorated(*args, **kwargs):
        user, error = _authenticate()
        if error is not None:
            return error
        if user.get('role') != 'admin':
            return jsonify({"error": "Admin access required"}), 403
        request.current_user = user
        return f(*args, **kwargs)
    return decorated


# ==================== Static Routes ====================

@app.route('/')
def index():
    """Serve the single-page frontend."""
    return send_from_directory(app.static_folder, 'index.html')


@app.route('/<path:path>')
def serve_static(path):
    """Serve any other file from the static folder.

    ``send_from_directory`` rejects paths that escape ``static_folder``.
    """
    return send_from_directory(app.static_folder, path)


# ==================== Auth Routes ====================

@app.route('/api/auth/register/admin', methods=['POST'])
def register_admin():
    """Register a new admin account and return its auth token.

    NOTE(review): this route is unauthenticated, so anyone can attempt to
    create an admin unless ``auth_service.register_admin`` restricts it.
    """
    data = _json_object()
    if not data:
        return jsonify({"error": "No data provided"}), 400

    result = auth_service.register_admin(
        _str_field(data, 'username').strip(),
        _str_field(data, 'password'),
        _str_field(data, 'email').strip(),
    )
    if result['success']:
        return _auth_success(result)
    return jsonify({"error": result['error']}), 400


@app.route('/api/auth/login', methods=['POST'])
def login():
    """Log in as an admin (default) or employee and return an auth token."""
    data = _json_object()
    if not data:
        return jsonify({"error": "No data provided"}), 400

    result = auth_service.login(
        _str_field(data, 'username').strip(),
        _str_field(data, 'password'),
        _str_field(data, 'role', 'admin'),
    )
    if result['success']:
        return _auth_success(result)
    return jsonify({"error": result['error']}), 401


@app.route('/api/auth/verify', methods=['GET'])
@require_auth
def verify_token():
    """Return the identity behind the supplied bearer token."""
    return jsonify({
        "user_id": request.current_user['user_id'],
        "username": request.current_user['username'],
        "role": request.current_user.get('role', 'admin'),
    })


# ==================== Admin Employee Management ====================

@app.route('/api/admin/employees', methods=['GET'])
@require_admin
def list_employees():
    """List the employees belonging to the current admin."""
    employees = auth_service.get_admin_employees(request.current_user['user_id'])
    return jsonify({"employees": employees})


@app.route('/api/admin/employees', methods=['POST'])
@require_admin
def add_employee():
    """Create an employee account under the current admin."""
    data = _json_object()
    if not data:
        return jsonify({"error": "No data provided"}), 400

    result = auth_service.register_employee(
        admin_user_id=request.current_user['user_id'],
        email=_str_field(data, 'email').strip(),
        password=_str_field(data, 'password'),
    )
    if result['success']:
        return jsonify({"user_id": result['user_id'], "email": result['email']})
    return jsonify({"error": result['error']}), 400


@app.route('/api/admin/employees/<employee_id>', methods=['DELETE'])
@require_admin
def delete_employee(employee_id):
    """Delete one of the current admin's employees."""
    success = auth_service.delete_employee(
        admin_user_id=request.current_user['user_id'],
        employee_id=employee_id,
    )
    if success:
        return jsonify({"success": True})
    return jsonify({"error": "Employee not found or access denied"}), 404


# ==================== Bucket Routes ====================

@app.route('/api/buckets', methods=['GET'])
@require_auth
def list_buckets():
    """List all buckets for current user"""
    buckets = chroma_service.get_user_buckets(request.current_user['user_id'])
    return jsonify({"buckets": buckets})


@app.route('/api/buckets', methods=['POST'])
@require_auth
def create_bucket():
    """Create a new bucket.

    The name is stripped *before* validation so whitespace-only names are
    rejected instead of creating a bucket with an empty name.
    """
    data = _json_object() or {}
    name = _str_field(data, 'name').strip()
    if not name:
        return jsonify({"error": "Bucket name is required"}), 400

    result = chroma_service.create_bucket(
        user_id=request.current_user['user_id'],
        name=name,
        description=_str_field(data, 'description').strip(),
    )
    return jsonify(result)


@app.route('/api/buckets/<bucket_id>', methods=['DELETE'])
@require_auth
def delete_bucket(bucket_id):
    """Delete one of the current user's buckets."""
    success = chroma_service.delete_bucket(
        bucket_id=bucket_id,
        user_id=request.current_user['user_id'],
    )
    if success:
        return jsonify({"success": True})
    return jsonify({"error": "Bucket not found or access denied"}), 404


# ==================== Document Routes ====================
# ==================== Async Processing ====================

# Global status store: doc_id -> {status, progress, message, result, error}
# NOTE(review): entries live only in this process's memory (lost on restart,
# not shared between workers) and are only evicted when a document is deleted.
processing_status = {}

# doc_id -> user_id of the uploader, so status polls are scoped to the owner.
processing_owners = {}


def _set_progress(doc_id, progress, message):
    """Update the progress percentage and message of an in-flight entry."""
    entry = processing_status.setdefault(doc_id, {"status": "processing"})
    entry["progress"] = progress
    entry["message"] = message


def _discard_upload(file_path):
    """Best-effort removal of a stored upload; logs instead of raising."""
    try:
        os.remove(file_path)
    except FileNotFoundError:
        pass
    except OSError as exc:
        print(f"[UPLOAD] Warning: could not remove {file_path}: {exc}")


def _find_upload_path(user_id, doc_id):
    """Return the stored file path for *doc_id*, or ``None`` if absent.

    Uploads are saved as ``<doc_id>_<filename>`` in the user's folder (see
    ``upload_document``). A missing folder means "no file", not an error.
    """
    user_folder = os.path.join(Config.UPLOAD_FOLDER, user_id)
    prefix = f"{doc_id}_"
    try:
        entries = os.listdir(user_folder)
    except (FileNotFoundError, NotADirectoryError):
        return None
    for name in entries:
        if name.startswith(prefix):
            return os.path.join(user_folder, name)
    return None


def _secure_upload_name(original):
    """Sanitize an uploaded filename while preserving its extension.

    ``secure_filename`` drops non-ASCII characters, so a purely non-Latin
    name such as ``'报告.pdf'`` collapses to ``'pdf'`` and loses the
    extension that ``is_supported`` just validated. Sanitizing stem and
    extension separately keeps the extension, and falls back to
    ``'document'`` when nothing of the stem survives. For plain ASCII
    names the result matches ``secure_filename(original)``.
    """
    stem, ext = os.path.splitext(original)
    safe_stem = secure_filename(stem) or 'document'
    safe_ext = secure_filename(ext.lstrip('.'))
    return f"{safe_stem}.{safe_ext}" if safe_ext else safe_stem


def _store_aggregate_metadata(doc_id, user_id, bucket_id, filename, text, summary):
    """Extract structured metadata and store it plus a summary chunk.

    These records serve aggregate queries only, so any failure is logged
    and swallowed rather than failing the whole ingestion.
    """
    try:
        metadata = metadata_extractor.extract_metadata(text, filename)
        chroma_service.store_document_metadata(
            doc_id=doc_id,
            user_id=user_id,
            bucket_id=bucket_id,
            metadata=metadata,
        )
        chroma_service.store_summary_chunk(
            doc_id=doc_id,
            user_id=user_id,
            summary=summary,
            bucket_id=bucket_id,
            filename=filename,
        )
        print(f"[METADATA] Extracted and stored metadata for {filename}")
    except Exception as e:
        print(f"[METADATA] Warning: Failed to extract metadata for {filename}: {e}")


def process_document_background(doc_id, user_id, file_path, filename, bucket_id):
    """Run the ingestion pipeline for one uploaded file (thread target).

    Steps: text extraction, document record, chunking/embeddings, summary,
    then best-effort aggregate metadata. Progress is published in
    ``processing_status[doc_id]``, which ends as ``"completed"`` (with a
    ``result`` dict) or ``"failed"`` (with an ``error`` string). On failure
    the uploaded file is removed.
    """
    try:
        processing_status[doc_id] = {
            "status": "processing",
            "progress": 10,
            "message": "Starting processing...",
        }
        print(f"[BACKGROUND] Processing file: {filename}")

        # Step 1: Text Extraction (OCR)
        _set_progress(doc_id, 20, "Extracting text (OCR)...")
        result = document_processor.process(file_path, filename)
        if not result['success']:
            processing_status[doc_id] = {"status": "failed", "error": result['error']}
            _discard_upload(file_path)
            return
        text = result['text']

        # Step 2: Store the document record
        _set_progress(doc_id, 50, "Storing document...")
        doc_type = document_processor.get_file_type(filename)
        chroma_service.store_document(
            user_id=user_id,
            doc_id=doc_id,
            filename=filename,
            doc_type=doc_type,
            content=text,
            bucket_id=bucket_id,
        )

        # Step 3: Chunking & Embeddings
        _set_progress(doc_id, 70, "generating embeddings...")
        chunk_count = rag_service.process_document(
            user_id=user_id,
            doc_id=doc_id,
            content=text,
            bucket_id=bucket_id,
        )

        # Step 4: Summary Generation
        _set_progress(doc_id, 90, "Generating summary...")
        summary_result = rag_service.generate_summary(text, filename)
        summary = summary_result.get('summary', f'Document: {filename}')

        # Step 5: Metadata for aggregate queries (non-fatal)
        _set_progress(doc_id, 95, "Extracting metadata...")
        _store_aggregate_metadata(doc_id, user_id, bucket_id, filename, text, summary)

        processing_status[doc_id] = {
            "status": "completed",
            "progress": 100,
            "message": "Complete",
            "result": {
                "doc_id": doc_id,
                "filename": filename,
                "doc_type": doc_type,
                "bucket_id": bucket_id,
                "chunk_count": chunk_count,
                "summary": summary,
            },
        }
        print(f"[BACKGROUND] Completed {filename}")
    except Exception as e:
        import traceback
        print(f"[BACKGROUND ERROR] {e}")
        print(traceback.format_exc())
        processing_status[doc_id] = {"status": "failed", "error": str(e)}
        _discard_upload(file_path)


@app.route('/api/documents/upload', methods=['POST'])
@require_auth
def upload_document():
    """Upload a document and queue it for background processing (Async).

    Returns 202 with the new ``doc_id``; clients poll
    ``/api/documents/<doc_id>/status`` for progress.
    """
    import threading

    if 'file' not in request.files:
        return jsonify({"error": "No file provided"}), 400

    file = request.files['file']
    bucket_id = request.form.get('bucket_id', '')

    # FileStorage.filename may be None as well as '' when no name was sent.
    if not file.filename:
        return jsonify({"error": "No file selected"}), 400
    if not document_processor.is_supported(file.filename):
        return jsonify({"error": "Unsupported file type"}), 400

    user_id = request.current_user['user_id']
    doc_id = str(uuid.uuid4())
    filename = _secure_upload_name(file.filename)
    user_folder = os.path.join(Config.UPLOAD_FOLDER, user_id)
    os.makedirs(user_folder, exist_ok=True)
    file_path = os.path.join(user_folder, f"{doc_id}_{filename}")
    file.save(file_path)

    # Record ownership and the initial status before the worker can run.
    processing_owners[doc_id] = user_id
    processing_status[doc_id] = {
        "status": "queued",
        "progress": 0,
        "message": "Queued for processing...",
    }

    threading.Thread(
        target=process_document_background,
        args=(doc_id, user_id, file_path, filename, bucket_id),
        daemon=True,
    ).start()

    return jsonify({
        "status": "queued",
        "doc_id": doc_id,
        "filename": filename,
        "message": "Upload accepted, processing in background",
    }), 202


@app.route('/api/documents/<doc_id>/status', methods=['GET'])
@require_auth
def get_document_status(doc_id):
    """Get the in-memory processing status of one of the caller's uploads.

    Unknown ids and ids owned by another user both return 404 ``unknown``,
    so the existence of other users' documents is not revealed.
    """
    status = processing_status.get(doc_id)
    if not status or processing_owners.get(doc_id) != request.current_user['user_id']:
        return jsonify({"status": "unknown"}), 404
    # Snapshot: the worker thread updates this dict in place.
    return jsonify(dict(status))


@app.route('/api/documents/<doc_id>/summary', methods=['GET'])
@require_auth
def get_document_summary(doc_id):
    """Generate a summary for a document from its stored content preview.

    Only ``content_preview`` is summarized; a complete summary would need
    the full document text to be re-read.
    """
    doc = chroma_service.get_document(doc_id, request.current_user['user_id'])
    if not doc:
        return jsonify({"error": "Document not found"}), 404

    summary_result = rag_service.generate_summary(doc.get('content_preview', ''), doc['filename'])
    return jsonify({
        "doc_id": doc_id,
        "filename": doc['filename'],
        "summary": summary_result.get('summary', f'Document: {doc["filename"]}'),
        "success": summary_result.get('success', False),
    })


@app.route('/api/documents', methods=['GET'])
@require_auth
def list_documents():
    """List all documents, optionally filtered by ``?bucket_id=``."""
    bucket_id = request.args.get('bucket_id')
    documents = chroma_service.get_user_documents(
        request.current_user['user_id'],
        bucket_id=bucket_id or None,
    )
    return jsonify({"documents": documents})


@app.route('/api/documents/<doc_id>', methods=['GET'])
@require_auth
def get_document(doc_id):
    """Get document details"""
    doc = chroma_service.get_document(doc_id, request.current_user['user_id'])
    if doc:
        return jsonify(doc)
    return jsonify({"error": "Document not found"}), 404


@app.route('/api/documents/<doc_id>/view', methods=['GET'])
@require_auth
def view_document(doc_id):
    """Return the stored file for a document (served inline, not as attachment)."""
    user_id = request.current_user['user_id']
    if not chroma_service.get_document(doc_id, user_id):
        return jsonify({"error": "Document not found"}), 404

    file_path = _find_upload_path(user_id, doc_id)
    if file_path is None:
        return jsonify({"error": "File not found on server"}), 404
    return send_file(file_path, as_attachment=False)


@app.route('/api/documents/<doc_id>/bucket', methods=['PUT'])
@require_auth
def update_document_bucket(doc_id):
    """Move a document to a different bucket (empty ``bucket_id`` = none)."""
    data = request.get_json(silent=True)
    bucket_id = data.get('bucket_id', '') if isinstance(data, dict) else ''
    success = chroma_service.update_document_bucket(
        doc_id=doc_id,
        user_id=request.current_user['user_id'],
        bucket_id=bucket_id,
    )
    if success:
        return jsonify({"success": True})
    return jsonify({"error": "Document not found or access denied"}), 404


@app.route('/api/documents/<doc_id>', methods=['DELETE'])
@require_auth
def delete_document(doc_id):
    """Delete a document record, its stored file and any processing status."""
    user_id = request.current_user['user_id']
    success = chroma_service.delete_document(doc_id=doc_id, user_id=user_id)
    if not success:
        return jsonify({"error": "Document not found or access denied"}), 404

    # File removal is best-effort: the record is already gone.
    file_path = _find_upload_path(user_id, doc_id)
    if file_path is not None:
        _discard_upload(file_path)
    processing_status.pop(doc_id, None)
    processing_owners.pop(doc_id, None)
    return jsonify({"success": True})


# ==================== Chat/RAG Routes ====================

@app.route('/api/chat', methods=['POST'])
@require_auth
def chat():
    """Process a chat query using RAG with optional doc/bucket filtering."""
    data = request.get_json(silent=True)
    message = data.get('message') if isinstance(data, dict) else None
    # Reject missing, non-string and whitespace-only messages alike.
    if not isinstance(message, str) or not message.strip():
        return jsonify({"error": "No message provided"}), 400

    result = rag_service.query(
        user_id=request.current_user['user_id'],
        query=message.strip(),
        doc_ids=data.get('doc_ids'),
        bucket_id=data.get('bucket_id'),
        conversation_history=data.get('history', []),
    )
    if result['success']:
        return jsonify({
            "response": result['response'],
            "model": result.get('model', 'unknown'),
            "sources": result.get('sources', []),
            "source_files": result.get('source_files', []),
            "chunks_used": result.get('chunks_used', 0),
            "chunks_filtered": result.get('chunks_filtered', 0),
        })
    return jsonify({"error": result['error']}), 500


@app.route('/api/chat/stream', methods=['POST'])
@require_auth
def chat_stream():
    """Streaming chat endpoint using Server-Sent Events.

    Each event is ``data: <json>`` followed by a blank line. The first event
    is ``{"type": "start"}``; the rest are the chunks yielded by
    ``rag_service.query_stream``. If the stream raises part-way, a final
    ``{"type": "error", "error": ...}`` event is sent so the client is not
    left with a silently truncated response.
    """
    import json
    import time

    start_time = time.time()
    print("[STREAM] Endpoint called")

    data = request.get_json(silent=True)
    message = data.get('message') if isinstance(data, dict) else None
    if not isinstance(message, str) or not message.strip():
        return jsonify({"error": "No message provided"}), 400

    message = message.strip()
    bucket_id = data.get('bucket_id')
    chat_id = data.get('chat_id', '')
    # Read request state now: the generator is consumed after this view
    # returns, when the request context is no longer active.
    user_id = request.current_user['user_id']
    print(f"[STREAM] Request parsed in {time.time() - start_time:.2f}s")

    def generate():
        # Yield immediately so the client sees the stream open.
        yield f"data: {json.dumps({'type': 'start'})}\n\n"
        sent = 0
        try:
            for chunk in rag_service.query_stream(
                user_id=user_id,
                query=message,
                bucket_id=bucket_id,
                chat_id=chat_id,
            ):
                sent += 1
                if sent <= 5:
                    print(f"[SSE] Sending chunk {sent}: type={chunk.get('type', 'unknown')}")
                yield f"data: {json.dumps(chunk)}\n\n"
        except Exception as exc:
            import traceback
            print(f"[SSE] Stream failed after {sent} chunks: {exc}")
            print(traceback.format_exc())
            yield f"data: {json.dumps({'type': 'error', 'error': str(exc)})}\n\n"
            return
        print(f"[SSE] Stream complete, sent {sent} chunks total")

    return Response(
        generate(),
        mimetype='text/event-stream',
        headers={
            'Cache-Control': 'no-cache',
            'Connection': 'keep-alive',
            'X-Accel-Buffering': 'no',
        },
    )


@app.route('/api/chat/clear', methods=['POST'])
@require_auth
def clear_chat_memory():
    """Clear conversation memory for the current user (optionally per bucket)."""
    data = request.get_json(silent=True)
    if not isinstance(data, dict):
        data = {}
    success = rag_service.clear_memory(
        user_id=request.current_user['user_id'],
        bucket_id=data.get('bucket_id'),
    )
    if success:
        return jsonify({"success": True, "message": "Conversation memory cleared"})
    return jsonify({"error": "Failed to clear memory"}), 500


@app.route('/api/cleanup/chunks', methods=['POST'])
@require_auth
def cleanup_user_chunks():
    """Clear ALL chunks for the current user - use to fix stale data issues"""
    deleted_count = chroma_service.clear_all_user_chunks(
        user_id=request.current_user['user_id']
    )
    return jsonify({
        "success": True,
        "message": f"Deleted {deleted_count} chunks. Please re-upload your documents.",
    })


# ==================== Chat History Routes ====================

@app.route('/api/chats', methods=['GET'])
@require_auth
def list_chat_sessions():
    """Get all chat sessions for current user"""
    sessions = chroma_service.get_user_chat_sessions(request.current_user['user_id'])
    return jsonify({"chats": sessions})


@app.route('/api/chats', methods=['POST'])
@require_auth
def save_chat_session():
    """Save or update a chat session identified by the client-supplied ``id``."""
    data = request.get_json(silent=True)
    if not isinstance(data, dict) or not data:
        return jsonify({"error": "No data provided"}), 400

    chat_id = data.get('id')
    if not chat_id:
        return jsonify({"error": "Chat ID is required"}), 400

    result = chroma_service.save_chat_session(
        user_id=request.current_user['user_id'],
        chat_id=chat_id,
        topic=data.get('topic', 'Chat'),
        messages=data.get('messages', []),
        bucket_id=data.get('bucket', ''),
    )
    return jsonify({"success": True, **result})


@app.route('/api/chats/<chat_id>', methods=['DELETE'])
@require_auth
def delete_chat_session(chat_id):
    """Delete a chat session"""
    success = chroma_service.delete_chat_session(
        user_id=request.current_user['user_id'],
        chat_id=chat_id,
    )
    if success:
        return jsonify({"success": True})
    return jsonify({"error": "Chat not found or access denied"}), 404


# ==================== Health Check ====================

@app.route('/api/health', methods=['GET'])
def health_check():
    """Liveness probe."""
    return jsonify({"status": "healthy", "version": "1.1.0"})


# ==================== Main ====================

if __name__ == '__main__':
    # The Werkzeug debugger can execute arbitrary code from the browser, and
    # host='0.0.0.0' exposes it to the network, so it is opt-in via FLASK_DEBUG.
    debug = os.environ.get('FLASK_DEBUG', '').lower() in ('1', 'true', 'yes')

    print("=" * 50)
    print("NotebookLM Clone - AI Document Intelligence")
    print("=" * 50)
    print(f"Upload folder: {Config.UPLOAD_FOLDER}")
    print(f"ChromaDB Cloud: {Config.CHROMA_TENANT}/{Config.CHROMA_DATABASE}")
    print(f"Debug mode: {'on' if debug else 'off'}")
    print("Starting server on http://localhost:5000")
    print("=" * 50)
    app.run(host='0.0.0.0', port=5000, debug=debug)