# NOTE(review): the following three lines in the original capture ("Spaces: / Running / Running")
# are Hugging Face Spaces UI banner residue from the export, not part of the source file.
| """ | |
| NotebookLM Clone - Main Flask Application | |
| AI-powered document intelligence platform with RAG | |
| Supports Admin/Employee roles and Bucket organization | |
| """ | |
| import os | |
| import uuid | |
| from functools import wraps | |
| from flask import Flask, request, jsonify, send_from_directory, send_file, Response | |
| from flask_cors import CORS | |
| from werkzeug.utils import secure_filename | |
| from config import Config | |
| from services.auth_service import auth_service | |
| from services.document_processor import document_processor | |
| from services.chroma_service import chroma_service | |
| from services.rag_service import rag_service | |
| from services.metadata_extractor import metadata_extractor | |
# Initialize Flask app
# Static assets (index.html, js, css) are served from ./static by the routes below.
app = Flask(__name__, static_folder='static')
# Cap request body size (uploads) at the configured limit; Flask returns 413 beyond it.
app.config['MAX_CONTENT_LENGTH'] = Config.MAX_CONTENT_LENGTH
# Allow cross-origin requests from the frontend (default: all origins).
CORS(app)
# Ensure upload directory exists
os.makedirs(Config.UPLOAD_FOLDER, exist_ok=True)
# NOTE(review): no @app.route registrations are visible in this capture of the
# file — confirm the route decorators were not lost in the export.
# ==================== Auth Decorators ====================
def require_auth(f):
    """Decorator to require authentication.

    Expects an ``Authorization: Bearer <token>`` header; resolves the token
    via ``auth_service`` and attaches the user dict to ``request.current_user``
    for the wrapped view. Returns 401 on a missing/invalid header or token.
    """
    @wraps(f)  # preserve the view's __name__/__doc__ (Flask uses __name__ as the endpoint)
    def decorated(*args, **kwargs):
        auth_header = request.headers.get('Authorization')
        if not auth_header or not auth_header.startswith('Bearer '):
            return jsonify({"error": "Missing or invalid authorization header"}), 401
        token = auth_header.split(' ')[1]
        user = auth_service.get_current_user(token)
        if not user:
            return jsonify({"error": "Invalid or expired token"}), 401
        request.current_user = user
        return f(*args, **kwargs)
    return decorated
def require_admin(f):
    """Decorator to require admin role.

    Same token handling as ``require_auth`` (401 on bad/missing token), plus a
    403 when the authenticated user's role is not ``'admin'``. On success the
    user dict is attached to ``request.current_user``.
    """
    @wraps(f)  # preserve the view's __name__/__doc__ (Flask uses __name__ as the endpoint)
    def decorated(*args, **kwargs):
        auth_header = request.headers.get('Authorization')
        if not auth_header or not auth_header.startswith('Bearer '):
            return jsonify({"error": "Missing or invalid authorization header"}), 401
        token = auth_header.split(' ')[1]
        user = auth_service.get_current_user(token)
        if not user:
            return jsonify({"error": "Invalid or expired token"}), 401
        if user.get('role') != 'admin':
            return jsonify({"error": "Admin access required"}), 403
        request.current_user = user
        return f(*args, **kwargs)
    return decorated
# ==================== Static Routes ====================
def index():
    """Serve the single-page app entry point."""
    return send_from_directory(app.static_folder, 'index.html')
def serve_static(path):
    """Serve any other static asset by relative path."""
    return send_from_directory(app.static_folder, path)
# ==================== Auth Routes ====================
def register_admin():
    """Create a new admin account and return its session token."""
    payload = request.get_json()
    if not payload:
        return jsonify({"error": "No data provided"}), 400
    result = auth_service.register_admin(
        payload.get('username', '').strip(),
        payload.get('password', ''),
        payload.get('email', '').strip()
    )
    if not result['success']:
        return jsonify({"error": result['error']}), 400
    return jsonify({k: result[k] for k in ('token', 'user_id', 'username', 'role')})
def login():
    """Authenticate a user (admin by default) and return a session token."""
    payload = request.get_json()
    if not payload:
        return jsonify({"error": "No data provided"}), 400
    result = auth_service.login(
        payload.get('username', '').strip(),
        payload.get('password', ''),
        payload.get('role', 'admin')
    )
    if not result['success']:
        return jsonify({"error": result['error']}), 401
    return jsonify({k: result[k] for k in ('token', 'user_id', 'username', 'role')})
def verify_token():
    """Echo back the identity that the auth decorator attached to the request."""
    user = request.current_user
    return jsonify({
        "user_id": user['user_id'],
        "username": user['username'],
        "role": user.get('role', 'admin')
    })
# ==================== Admin Employee Management ====================
def list_employees():
    """Return all employees belonging to the current admin."""
    staff = auth_service.get_admin_employees(request.current_user['user_id'])
    return jsonify({"employees": staff})
def add_employee():
    """Register a new employee under the current admin account."""
    payload = request.get_json()
    if not payload:
        return jsonify({"error": "No data provided"}), 400
    result = auth_service.register_employee(
        admin_user_id=request.current_user['user_id'],
        email=payload.get('email', '').strip(),
        password=payload.get('password', '')
    )
    if not result['success']:
        return jsonify({"error": result['error']}), 400
    return jsonify({"user_id": result['user_id'], "email": result['email']})
def delete_employee(employee_id):
    """Remove an employee owned by the current admin."""
    removed = auth_service.delete_employee(
        admin_user_id=request.current_user['user_id'],
        employee_id=employee_id
    )
    if not removed:
        return jsonify({"error": "Employee not found or access denied"}), 404
    return jsonify({"success": True})
# ==================== Bucket Routes ====================
def list_buckets():
    """List all buckets for current user"""
    user_buckets = chroma_service.get_user_buckets(request.current_user['user_id'])
    return jsonify({"buckets": user_buckets})
def create_bucket():
    """Create a new bucket"""
    payload = request.get_json()
    if not payload or not payload.get('name'):
        return jsonify({"error": "Bucket name is required"}), 400
    created = chroma_service.create_bucket(
        user_id=request.current_user['user_id'],
        name=payload.get('name', '').strip(),
        description=payload.get('description', '').strip()
    )
    return jsonify(created)
def delete_bucket(bucket_id):
    """Delete a bucket"""
    removed = chroma_service.delete_bucket(
        bucket_id=bucket_id,
        user_id=request.current_user['user_id']
    )
    if not removed:
        return jsonify({"error": "Bucket not found or access denied"}), 404
    return jsonify({"success": True})
# ==================== Document Routes ====================
# ==================== Async Processing ====================
# Global status store: doc_id -> {status, progress, message, result, error}
# NOTE(review): plain module-level dict shared between request threads and the
# background worker thread. Entries are never evicted, so memory grows with
# every upload until restart — confirm that is acceptable for this deployment.
processing_status = {}
def process_document_background(doc_id, user_id, file_path, filename, bucket_id):
    """Background pipeline: OCR -> store -> embed -> summarize -> metadata.

    Publishes progress into the module-level ``processing_status`` dict under
    ``doc_id``. The uploaded file is deleted only when processing fails; on
    success it is kept so ``view_document`` can serve it.

    NOTE(review): log messages referencing the file were garbled in this
    capture; restored to interpolate ``{filename}`` — confirm against history.
    """
    try:
        processing_status[doc_id] = {
            "status": "processing",
            "progress": 10,
            "message": "Starting processing..."
        }
        print(f"[BACKGROUND] Processing file: {filename}")
        # Step 1: Text Extraction (OCR)
        processing_status[doc_id]["message"] = "Extracting text (OCR)..."
        processing_status[doc_id]["progress"] = 20
        result = document_processor.process(file_path, filename)
        if not result['success']:
            processing_status[doc_id] = {
                "status": "failed",
                "error": result['error']
            }
            # Extraction failed: discard the upload so it can't be served later.
            if os.path.exists(file_path):
                os.remove(file_path)
            return
        processing_status[doc_id]["progress"] = 50
        processing_status[doc_id]["message"] = "Storing document..."
        # Step 2: Store document text + metadata in ChromaDB
        doc_type = document_processor.get_file_type(filename)
        chroma_service.store_document(
            user_id=user_id,
            doc_id=doc_id,
            filename=filename,
            doc_type=doc_type,
            content=result['text'],
            bucket_id=bucket_id
        )
        processing_status[doc_id]["progress"] = 70
        processing_status[doc_id]["message"] = "generating embeddings..."
        # Step 3: Chunking & Embeddings
        chunk_count = rag_service.process_document(
            user_id=user_id,
            doc_id=doc_id,
            content=result['text'],
            bucket_id=bucket_id
        )
        processing_status[doc_id]["progress"] = 90
        processing_status[doc_id]["message"] = "Generating summary..."
        # Step 4: Summary Generation (fallback string if the LLM call fails)
        summary_result = rag_service.generate_summary(result['text'], filename)
        summary = summary_result.get('summary', f'Document: {filename}')
        # Step 5: Extract and store metadata for aggregate queries
        processing_status[doc_id]["progress"] = 95
        processing_status[doc_id]["message"] = "Extracting metadata..."
        try:
            # Extract structured metadata from document
            metadata = metadata_extractor.extract_metadata(result['text'], filename)
            chroma_service.store_document_metadata(
                doc_id=doc_id,
                user_id=user_id,
                bucket_id=bucket_id,
                metadata=metadata
            )
            # Store summary chunk for aggregate queries
            chroma_service.store_summary_chunk(
                doc_id=doc_id,
                user_id=user_id,
                summary=summary,
                bucket_id=bucket_id,
                filename=filename
            )
            print(f"[METADATA] Extracted and stored metadata for {filename}")
        except Exception as e:
            # Non-fatal - metadata is an enhancement, continue processing
            print(f"[METADATA] Warning: Failed to extract metadata for {filename}: {e}")
        # Complete: publish the final result for get_document_status polling
        processing_status[doc_id] = {
            "status": "completed",
            "progress": 100,
            "message": "Complete",
            "result": {
                "doc_id": doc_id,
                "filename": filename,
                "doc_type": doc_type,
                "bucket_id": bucket_id,
                "chunk_count": chunk_count,
                "summary": summary
            }
        }
        print(f"[BACKGROUND] Completed {filename}")
    except Exception as e:
        import traceback
        print(f"[BACKGROUND ERROR] {str(e)}")
        print(traceback.format_exc())
        processing_status[doc_id] = {
            "status": "failed",
            "error": str(e)
        }
        # Best-effort cleanup of the failed upload; narrow except so real
        # bugs are not silently swallowed.
        if os.path.exists(file_path):
            try:
                os.remove(file_path)
            except OSError:
                pass
def upload_document():
    """Upload and process a document (Async).

    Validates the multipart upload, saves the file under the user's folder as
    ``{doc_id}_{filename}`` (the doc_id prefix is what view/delete use to
    locate it), then hands off to a daemon thread and returns 202 immediately.
    Clients poll ``get_document_status`` with the returned ``doc_id``.

    NOTE(review): the saved-file name was garbled to ``{doc_id}_(unknown)`` in
    this capture; restored to ``{doc_id}_{filename}`` — confirm.
    """
    if 'file' not in request.files:
        return jsonify({"error": "No file provided"}), 400
    file = request.files['file']
    bucket_id = request.form.get('bucket_id', '')
    if file.filename == '':
        return jsonify({"error": "No file selected"}), 400
    if not document_processor.is_supported(file.filename):
        return jsonify({"error": "Unsupported file type"}), 400
    doc_id = str(uuid.uuid4())
    # secure_filename strips path separators / unsafe chars from user input
    filename = secure_filename(file.filename)
    user_folder = os.path.join(Config.UPLOAD_FOLDER, request.current_user['user_id'])
    os.makedirs(user_folder, exist_ok=True)
    file_path = os.path.join(user_folder, f"{doc_id}_{filename}")
    file.save(file_path)
    # Initialize status before the thread starts so the first poll never 404s
    processing_status[doc_id] = {
        "status": "queued",
        "progress": 0,
        "message": "Queued for processing..."
    }
    # Start background thread (daemon: don't block interpreter shutdown)
    import threading
    thread = threading.Thread(
        target=process_document_background,
        args=(doc_id, request.current_user['user_id'], file_path, filename, bucket_id)
    )
    thread.daemon = True
    thread.start()
    # Return immediately
    return jsonify({
        "status": "queued",
        "doc_id": doc_id,
        "filename": filename,
        "message": "Upload accepted, processing in background"
    }), 202
def get_document_status(doc_id):
    """Report the in-memory processing status of an uploaded document."""
    status = processing_status.get(doc_id)
    # Entries live only in memory: an unknown id may simply predate a restart.
    if status is None:
        return jsonify({"status": "unknown"}), 404
    return jsonify(status)
def get_document_summary(doc_id):
    """Get or generate summary for a document"""
    doc = chroma_service.get_document(doc_id, request.current_user['user_id'])
    if not doc:
        return jsonify({"error": "Document not found"}), 404
    # Summarize from the stored preview only; a full re-read of the original
    # file would be needed for a complete summary.
    summary_result = rag_service.generate_summary(
        doc.get('content_preview', ''),
        doc['filename']
    )
    return jsonify({
        "doc_id": doc_id,
        "filename": doc['filename'],
        "summary": summary_result.get('summary', f'Document: {doc["filename"]}'),
        "success": summary_result.get('success', False)
    })
def list_documents():
    """List all documents, optionally filtered by bucket"""
    # Empty-string bucket_id means "no filter" — normalize to None.
    bucket_filter = request.args.get('bucket_id') or None
    docs = chroma_service.get_user_documents(
        request.current_user['user_id'],
        bucket_id=bucket_filter
    )
    return jsonify({"documents": docs})
def get_document(doc_id):
    """Get document details"""
    doc = chroma_service.get_document(doc_id, request.current_user['user_id'])
    if not doc:
        return jsonify({"error": "Document not found"}), 404
    return jsonify(doc)
def view_document(doc_id):
    """View/download the actual document file.

    Locates the stored file by its ``doc_id`` prefix inside the user's upload
    folder and streams it inline (not as an attachment).
    """
    doc = chroma_service.get_document(doc_id, request.current_user['user_id'])
    if not doc:
        return jsonify({"error": "Document not found"}), 404
    user_folder = os.path.join(Config.UPLOAD_FOLDER, request.current_user['user_id'])
    # Guard: the folder may not exist (fresh deploy / cleared volume);
    # os.listdir would otherwise raise FileNotFoundError and surface as a 500.
    if os.path.isdir(user_folder):
        for entry in os.listdir(user_folder):
            if entry.startswith(doc_id):
                return send_file(os.path.join(user_folder, entry), as_attachment=False)
    return jsonify({"error": "File not found on server"}), 404
def update_document_bucket(doc_id):
    """Move document to a different bucket"""
    payload = request.get_json()
    target_bucket = payload.get('bucket_id', '') if payload else ''
    moved = chroma_service.update_document_bucket(
        doc_id=doc_id,
        user_id=request.current_user['user_id'],
        bucket_id=target_bucket
    )
    if not moved:
        return jsonify({"error": "Document not found or access denied"}), 404
    return jsonify({"success": True})
def delete_document(doc_id):
    """Delete a document's index entries and its on-disk file.

    The index deletion is authoritative; file removal is best-effort (the
    entry is already gone, so a filesystem hiccup should not fail the call).
    """
    success = chroma_service.delete_document(
        doc_id=doc_id,
        user_id=request.current_user['user_id']
    )
    if not success:
        return jsonify({"error": "Document not found or access denied"}), 404
    user_folder = os.path.join(Config.UPLOAD_FOLDER, request.current_user['user_id'])
    try:
        for entry in os.listdir(user_folder):
            if entry.startswith(doc_id):
                os.remove(os.path.join(user_folder, entry))
                break
    except OSError:
        # Narrowed from a bare except: only swallow filesystem errors.
        pass
    return jsonify({"success": True})
# ==================== Chat/RAG Routes ====================
def chat():
    """Process a chat query using RAG with optional bucket filtering"""
    payload = request.get_json()
    if not payload or not payload.get('message'):
        return jsonify({"error": "No message provided"}), 400
    result = rag_service.query(
        user_id=request.current_user['user_id'],
        query=payload.get('message', '').strip(),
        doc_ids=payload.get('doc_ids'),
        bucket_id=payload.get('bucket_id'),  # optional bucket filter
        conversation_history=payload.get('history', [])
    )
    if not result['success']:
        return jsonify({"error": result['error']}), 500
    return jsonify({
        "response": result['response'],
        "model": result.get('model', 'unknown'),
        "sources": result.get('sources', []),
        "source_files": result.get('source_files', []),
        "chunks_used": result.get('chunks_used', 0),
        "chunks_filtered": result.get('chunks_filtered', 0)
    })
def chat_stream():
    """Streaming chat endpoint - sends response chunks as they arrive"""
    import json
    import time
    started = time.time()
    print(f"[STREAM] Endpoint called")
    payload = request.get_json()
    if not payload or not payload.get('message'):
        return jsonify({"error": "No message provided"}), 400
    message = payload.get('message', '').strip()
    bucket_id = payload.get('bucket_id')
    chat_id = payload.get('chat_id', '')  # conversation id for memory
    user_id = request.current_user['user_id']
    print(f"[STREAM] Request parsed in {time.time()-started:.2f}s")
    def generate():
        # Open the SSE stream right away so the client sees activity.
        yield f"data: {json.dumps({'type': 'start'})}\n\n"
        sent = 0
        for chunk in rag_service.query_stream(
            user_id=user_id,
            query=message,
            bucket_id=bucket_id,
            chat_id=chat_id
        ):
            sent += 1
            if sent <= 5:  # log only the first few chunks to avoid log spam
                print(f"[SSE] Sending chunk {sent}: type={chunk.get('type', 'unknown')}")
            yield f"data: {json.dumps(chunk)}\n\n"
        print(f"[SSE] Stream complete, sent {sent} chunks total")
    return Response(
        generate(),
        mimetype='text/event-stream',
        headers={
            'Cache-Control': 'no-cache',
            'Connection': 'keep-alive',
            'X-Accel-Buffering': 'no'  # disable nginx proxy buffering for SSE
        }
    )
def clear_chat_memory():
    """Clear conversation memory for the current user"""
    payload = request.get_json() or {}
    cleared = rag_service.clear_memory(
        user_id=request.current_user['user_id'],
        bucket_id=payload.get('bucket_id')
    )
    if not cleared:
        return jsonify({"error": "Failed to clear memory"}), 500
    return jsonify({"success": True, "message": "Conversation memory cleared"})
def cleanup_user_chunks():
    """Clear ALL chunks for the current user - use to fix stale data issues"""
    removed = chroma_service.clear_all_user_chunks(
        user_id=request.current_user['user_id']
    )
    message = f"Deleted {removed} chunks. Please re-upload your documents."
    return jsonify({"success": True, "message": message})
# ==================== Chat History Routes ====================
def list_chat_sessions():
    """Get all chat sessions for current user"""
    history = chroma_service.get_user_chat_sessions(request.current_user['user_id'])
    return jsonify({"chats": history})
def save_chat_session():
    """Save or update a chat session"""
    payload = request.get_json()
    if not payload:
        return jsonify({"error": "No data provided"}), 400
    session_id = payload.get('id')
    if not session_id:
        return jsonify({"error": "Chat ID is required"}), 400
    result = chroma_service.save_chat_session(
        user_id=request.current_user['user_id'],
        chat_id=session_id,
        topic=payload.get('topic', 'Chat'),
        messages=payload.get('messages', []),
        bucket_id=payload.get('bucket', '')
    )
    return jsonify({"success": True, **result})
def delete_chat_session(chat_id):
    """Delete a chat session"""
    removed = chroma_service.delete_chat_session(
        user_id=request.current_user['user_id'],
        chat_id=chat_id
    )
    if not removed:
        return jsonify({"error": "Chat not found or access denied"}), 404
    return jsonify({"success": True})
# ==================== Health Check ====================
def health_check():
    """Liveness probe: constant payload, no dependencies touched."""
    return jsonify({"status": "healthy", "version": "1.1.0"})
# ==================== Main ====================
if __name__ == '__main__':
    # Startup banner with the effective configuration.
    banner = "=" * 50
    print(banner)
    print("NotebookLM Clone - AI Document Intelligence")
    print(banner)
    print(f"Upload folder: {Config.UPLOAD_FOLDER}")
    print(f"ChromaDB Cloud: {Config.CHROMA_TENANT}/{Config.CHROMA_DATABASE}")
    print("Starting server on http://localhost:5000")
    print(banner)
    app.run(host='0.0.0.0', port=5000, debug=True)