notebooklm-fast / app.py
jashdoshi77
feat: Add AI-powered query understanding with DeepSeek parsing
64deb3c
"""
NotebookLM Clone - Main Flask Application
AI-powered document intelligence platform with RAG
Supports Admin/Employee roles and Bucket organization
"""
import os
import uuid
from functools import wraps
from flask import Flask, request, jsonify, send_from_directory, send_file, Response
from flask_cors import CORS
from werkzeug.utils import secure_filename
from config import Config
from services.auth_service import auth_service
from services.document_processor import document_processor
from services.chroma_service import chroma_service
from services.rag_service import rag_service
from services.metadata_extractor import metadata_extractor
# Uploads are written under this folder; create it before serving anything.
os.makedirs(Config.UPLOAD_FOLDER, exist_ok=True)

# Flask application; static assets (including index.html) live in ./static.
app = Flask(__name__, static_folder='static')
# Cap request body size at the configured limit.
app.config.update(MAX_CONTENT_LENGTH=Config.MAX_CONTENT_LENGTH)
# Cross-origin requests are enabled with no origin restrictions configured here.
CORS(app)
# ==================== Auth Decorators ====================
def require_auth(f):
    """Only let requests carrying a valid ``Bearer`` token reach the view.

    The user record returned by ``auth_service.get_current_user`` is attached
    to ``request.current_user`` before the wrapped view is called. A missing
    or malformed header, or a token the service does not accept, short-circuits
    with a 401 JSON error.
    """
    @wraps(f)
    def decorated(*args, **kwargs):
        header = request.headers.get('Authorization')
        if not (header and header.startswith('Bearer ')):
            return jsonify({"error": "Missing or invalid authorization header"}), 401

        current = auth_service.get_current_user(header.split(' ')[1])
        if current:
            request.current_user = current
            return f(*args, **kwargs)
        return jsonify({"error": "Invalid or expired token"}), 401

    return decorated
def require_admin(f):
    """Require a valid bearer token *and* the ``admin`` role.

    Token parsing and validation are delegated to ``require_auth`` so the two
    decorators share one implementation. This wrapper only adds the role check.

    Responses:
        401 from ``require_auth`` when the header or token is missing/invalid;
        403 when the authenticated user's ``role`` is not ``'admin'``.
    """
    @wraps(f)
    def admin_only(*args, **kwargs):
        # require_auth has already populated request.current_user here.
        if request.current_user.get('role') != 'admin':
            return jsonify({"error": "Admin access required"}), 403
        return f(*args, **kwargs)

    # admin_only carries f's metadata (via wraps), so require_auth's own
    # @wraps keeps the original view name for Flask's endpoint registration.
    return require_auth(admin_only)
# ==================== Static Routes ====================
@app.route('/')
def index():
    """Serve the front-end entry page, ``index.html`` from the static folder."""
    entry_page = 'index.html'
    return send_from_directory(app.static_folder, entry_page)
@app.route('/<path:path>')
def serve_static(path):
    """Serve any other requested path from the static folder.

    Path resolution (and its safety checks) is left to ``send_from_directory``.
    """
    folder = app.static_folder
    return send_from_directory(folder, path)
# ==================== Auth Routes ====================
@app.route('/api/auth/register/admin', methods=['POST'])
def register_admin():
    """Create an admin account and return a session token for it.

    Body (JSON object): ``username``, ``password``, ``email``.

    ``username``/``email`` values that are missing, ``null`` or not strings
    are passed on as ``''``, and a ``null`` password as ``''``. Validation is
    then left to ``auth_service`` instead of crashing on ``.strip()`` with
    a 500.

    Returns:
        JSON with ``token``, ``user_id``, ``username`` and ``role`` on
        success; 400 with ``{"error": ...}`` for a missing/non-object body
        or when registration fails.
    """
    data = request.get_json()
    # A JSON array or scalar body has no .get(); treat it like no data.
    if not data or not isinstance(data, dict):
        return jsonify({"error": "No data provided"}), 400

    raw_username = data.get('username')
    raw_email = data.get('email')
    username = raw_username.strip() if isinstance(raw_username, str) else ''
    email = raw_email.strip() if isinstance(raw_email, str) else ''
    # `or ''` also covers an explicit null, which .get(key, '') does not.
    password = data.get('password') or ''

    result = auth_service.register_admin(username, password, email)
    if not result['success']:
        return jsonify({"error": result['error']}), 400
    return jsonify({
        "token": result['token'],
        "user_id": result['user_id'],
        "username": result['username'],
        "role": result['role']
    })
@app.route('/api/auth/login', methods=['POST'])
def login():
    """Authenticate a user and return a session token.

    Body (JSON object): ``username``, ``password`` and optional ``role``
    (``'admin'`` when the key is absent).

    A missing, ``null`` or non-string ``username`` becomes ``''``, and a
    ``null`` password becomes ``''``. Rejection is then left to
    ``auth_service.login`` rather than raising a 500 here.

    Returns:
        JSON with ``token``, ``user_id``, ``username`` and ``role`` on
        success; 400 for a missing/non-object body; 401 when
        ``auth_service.login`` reports failure.
    """
    data = request.get_json()
    # A JSON array or scalar body has no .get(); treat it like no data.
    if not data or not isinstance(data, dict):
        return jsonify({"error": "No data provided"}), 400

    raw_username = data.get('username')
    username = raw_username.strip() if isinstance(raw_username, str) else ''
    # `or ''` also covers an explicit null, which .get(key, '') does not.
    password = data.get('password') or ''
    role = data.get('role', 'admin')

    result = auth_service.login(username, password, role)
    if not result['success']:
        return jsonify({"error": result['error']}), 401
    return jsonify({
        "token": result['token'],
        "user_id": result['user_id'],
        "username": result['username'],
        "role": result['role']
    })
@app.route('/api/auth/verify', methods=['GET'])
@require_auth
def verify_token():
    """Echo back the identity behind the bearer token.

    ``role`` falls back to ``'admin'`` when the user record has none.
    """
    user = request.current_user
    payload = {
        "user_id": user['user_id'],
        "username": user['username'],
        "role": user.get('role', 'admin'),
    }
    return jsonify(payload)
# ==================== Admin Employee Management ====================
@app.route('/api/admin/employees', methods=['GET'])
@require_admin
def list_employees():
    """Return the employees ``auth_service`` associates with the calling admin."""
    admin_id = request.current_user['user_id']
    return jsonify({"employees": auth_service.get_admin_employees(admin_id)})
@app.route('/api/admin/employees', methods=['POST'])
@require_admin
def add_employee():
    """Create an employee account under the calling admin.

    Body (JSON object): ``email`` and ``password``. A missing, ``null`` or
    non-string email becomes ``''``, and a ``null`` password becomes ``''``.
    Bad input therefore reaches ``auth_service`` instead of raising a 500
    here.

    Returns:
        ``{"user_id", "email"}`` on success; 400 with ``{"error": ...}`` for a
        missing/non-object body or when registration fails.
    """
    data = request.get_json()
    # A JSON array or scalar body has no .get(); treat it like no data.
    if not data or not isinstance(data, dict):
        return jsonify({"error": "No data provided"}), 400

    raw_email = data.get('email')
    email = raw_email.strip() if isinstance(raw_email, str) else ''
    # `or ''` also covers an explicit null, which .get(key, '') does not.
    password = data.get('password') or ''

    result = auth_service.register_employee(
        admin_user_id=request.current_user['user_id'],
        email=email,
        password=password
    )
    if not result['success']:
        return jsonify({"error": result['error']}), 400
    return jsonify({"user_id": result['user_id'], "email": result['email']})
@app.route('/api/admin/employees/<employee_id>', methods=['DELETE'])
@require_admin
def delete_employee(employee_id):
    """Remove ``employee_id`` from the calling admin's account.

    Responds 404 whenever ``auth_service.delete_employee`` reports failure.
    """
    removed = auth_service.delete_employee(
        admin_user_id=request.current_user['user_id'],
        employee_id=employee_id,
    )
    if not removed:
        return jsonify({"error": "Employee not found or access denied"}), 404
    return jsonify({"success": True})
# ==================== Bucket Routes ====================
@app.route('/api/buckets', methods=['GET'])
@require_auth
def list_buckets():
    """Return the buckets ``chroma_service`` holds for the authenticated user."""
    owner = request.current_user['user_id']
    return jsonify({"buckets": chroma_service.get_user_buckets(owner)})
@app.route('/api/buckets', methods=['POST'])
@require_auth
def create_bucket():
    """Create a new bucket for the authenticated user.

    Body (JSON object): ``name`` (required, non-blank string) and optional
    ``description``. The name is validated *after* stripping, so ``"   "``
    is rejected rather than creating a bucket with an empty name. A
    ``null``/non-string description is stored as ``''``.

    Returns:
        Whatever ``chroma_service.create_bucket`` returns, as JSON; 400 when
        the body is missing/not an object or the name is blank.
    """
    data = request.get_json()
    if not data or not isinstance(data, dict):
        return jsonify({"error": "Bucket name is required"}), 400

    raw_name = data.get('name')
    name = raw_name.strip() if isinstance(raw_name, str) else ''
    if not name:
        return jsonify({"error": "Bucket name is required"}), 400

    raw_description = data.get('description')
    description = raw_description.strip() if isinstance(raw_description, str) else ''

    result = chroma_service.create_bucket(
        user_id=request.current_user['user_id'],
        name=name,
        description=description
    )
    return jsonify(result)
@app.route('/api/buckets/<bucket_id>', methods=['DELETE'])
@require_auth
def delete_bucket(bucket_id):
    """Delete one of the caller's buckets.

    Responds 404 whenever ``chroma_service.delete_bucket`` reports failure.
    """
    removed = chroma_service.delete_bucket(
        bucket_id=bucket_id,
        user_id=request.current_user['user_id'],
    )
    if not removed:
        return jsonify({"error": "Bucket not found or access denied"}), 404
    return jsonify({"success": True})
# ==================== Document Routes ====================
# ==================== Async Processing ====================
# Global status store: doc_id -> {status, progress, message, result, error}.
# A plain module-level dict: it exists only in this process, and nothing in
# this module removes entries, so it grows with every upload until restart.
processing_status = {}


def _discard_upload(file_path):
    """Delete an uploaded file if it is still present; never raises.

    Used on failure paths. There, an error from the cleanup itself must not
    replace the real processing error already recorded in
    ``processing_status``.
    """
    try:
        os.remove(file_path)
    except FileNotFoundError:
        pass
    except OSError as exc:
        print(f"[BACKGROUND] Warning: could not remove {file_path}: {exc}")


def process_document_background(doc_id, user_id, file_path, filename, bucket_id):
    """Run the ingestion pipeline for one uploaded file.

    Started on a worker thread by ``upload_document``. It publishes progress
    into ``processing_status[doc_id]`` through these stages:
    text extraction -> document storage -> chunking/embeddings -> summary ->
    metadata extraction (non-fatal) -> ``"completed"`` with a ``result`` dict.

    On failure the entry becomes ``{"status": "failed", "error": ...}`` and
    the uploaded file is deleted. On success the file is kept, because
    ``view_document`` serves it later.

    NOTE(review): a failure after ``chroma_service.store_document`` leaves
    whatever was already written in place; nothing here rolls it back.
    """
    try:
        processing_status[doc_id] = {
            "status": "processing",
            "progress": 10,
            "message": "Starting processing..."
        }
        print(f"[BACKGROUND] Processing file: {filename}")

        # Step 1: Text Extraction (OCR)
        processing_status[doc_id]["message"] = "Extracting text (OCR)..."
        processing_status[doc_id]["progress"] = 20
        result = document_processor.process(file_path, filename)
        if not result['success']:
            processing_status[doc_id] = {
                "status": "failed",
                "error": result['error']
            }
            _discard_upload(file_path)
            return

        processing_status[doc_id]["progress"] = 50
        processing_status[doc_id]["message"] = "Storing document..."

        # Step 2: Store the document record
        doc_type = document_processor.get_file_type(filename)
        chroma_service.store_document(
            user_id=user_id,
            doc_id=doc_id,
            filename=filename,
            doc_type=doc_type,
            content=result['text'],
            bucket_id=bucket_id
        )

        processing_status[doc_id]["progress"] = 70
        processing_status[doc_id]["message"] = "generating embeddings..."

        # Step 3: Chunking & Embeddings
        chunk_count = rag_service.process_document(
            user_id=user_id,
            doc_id=doc_id,
            content=result['text'],
            bucket_id=bucket_id
        )

        processing_status[doc_id]["progress"] = 90
        processing_status[doc_id]["message"] = "Generating summary..."

        # Step 4: Summary Generation
        summary_result = rag_service.generate_summary(result['text'], filename)
        summary = summary_result.get('summary', f'Document: {filename}')

        # Step 5: Extract and store metadata for aggregate queries
        processing_status[doc_id]["progress"] = 95
        processing_status[doc_id]["message"] = "Extracting metadata..."
        try:
            metadata = metadata_extractor.extract_metadata(result['text'], filename)
            chroma_service.store_document_metadata(
                doc_id=doc_id,
                user_id=user_id,
                bucket_id=bucket_id,
                metadata=metadata
            )
            chroma_service.store_summary_chunk(
                doc_id=doc_id,
                user_id=user_id,
                summary=summary,
                bucket_id=bucket_id,
                filename=filename
            )
            print(f"[METADATA] Extracted and stored metadata for {filename}")
        except Exception as e:
            # Deliberately non-fatal: the document is already searchable.
            print(f"[METADATA] Warning: Failed to extract metadata for {filename}: {e}")

        # Complete
        processing_status[doc_id] = {
            "status": "completed",
            "progress": 100,
            "message": "Complete",
            "result": {
                "doc_id": doc_id,
                "filename": filename,
                "doc_type": doc_type,
                "bucket_id": bucket_id,
                "chunk_count": chunk_count,
                "summary": summary
            }
        }
        print(f"[BACKGROUND] Completed {filename}")

    except Exception as e:
        # Top-level boundary of a worker thread: record the failure for the
        # status endpoint instead of letting the thread die silently.
        import traceback
        print(f"[BACKGROUND ERROR] {str(e)}")
        print(traceback.format_exc())
        processing_status[doc_id] = {
            "status": "failed",
            "error": str(e)
        }
        _discard_upload(file_path)
def _safe_upload_name(original):
    """Sanitize a client-supplied filename while keeping its extension.

    ``secure_filename`` drops every non-ASCII character, so ``"报告.pdf"``
    collapses to ``"pdf"``. That loses the extension the processing pipeline
    keys on, and some names collapse to ``""``. In those cases fall back to
    ``document.<ext>`` (or ``document``).
    """
    safe = secure_filename(original)
    ext = secure_filename(os.path.splitext(original)[1])  # '.pdf' -> 'pdf'
    if ext and not safe.endswith('.' + ext):
        stem = safe if safe and safe != ext else 'document'
        safe = f"{stem}.{ext}"
    return safe or 'document'


@app.route('/api/documents/upload', methods=['POST'])
@require_auth
def upload_document():
    """Accept a document upload and process it on a background thread.

    Form fields: ``file`` (required) and ``bucket_id`` (optional, default
    ``''``). The file is saved as ``<UPLOAD_FOLDER>/<user_id>/<doc_id>_<name>``.
    A daemon thread then runs ``process_document_background``; progress is
    available from ``/api/documents/<doc_id>/status``.

    Returns:
        202 with the new ``doc_id``; 400 when no file is supplied or its type
        is unsupported.
    """
    import threading

    if 'file' not in request.files:
        return jsonify({"error": "No file provided"}), 400
    file = request.files['file']
    bucket_id = request.form.get('bucket_id', '')

    # `not` rejects a missing (None) filename as well as ''.
    if not file.filename:
        return jsonify({"error": "No file selected"}), 400
    if not document_processor.is_supported(file.filename):
        return jsonify({"error": "Unsupported file type"}), 400

    user_id = request.current_user['user_id']
    doc_id = str(uuid.uuid4())
    filename = _safe_upload_name(file.filename)
    user_folder = os.path.join(Config.UPLOAD_FOLDER, user_id)
    os.makedirs(user_folder, exist_ok=True)
    file_path = os.path.join(user_folder, f"{doc_id}_{filename}")
    file.save(file_path)

    processing_status[doc_id] = {
        "status": "queued",
        "progress": 0,
        "message": "Queued for processing..."
    }

    # Daemon thread: it does not keep the interpreter alive at shutdown.
    thread = threading.Thread(
        target=process_document_background,
        args=(doc_id, user_id, file_path, filename, bucket_id),
        daemon=True,
    )
    thread.start()

    return jsonify({
        "status": "queued",
        "doc_id": doc_id,
        "filename": filename,
        "message": "Upload accepted, processing in background"
    }), 202
@app.route('/api/documents/<doc_id>/status', methods=['GET'])
@require_auth
def get_document_status(doc_id):
    """Report background-processing progress for ``doc_id``.

    Only the in-memory ``processing_status`` store is consulted. An id it
    has no entry for therefore yields 404 with ``{"status": "unknown"}``.

    NOTE(review): the lookup is keyed on ``doc_id`` alone. Nothing checks
    that the entry belongs to ``request.current_user``.
    """
    entry = processing_status.get(doc_id)
    if entry:
        return jsonify(entry)
    # TODO: fall back to the document store for jobs no longer held in memory.
    return jsonify({"status": "unknown"}), 404
@app.route('/api/documents/<doc_id>/summary', methods=['GET'])
@require_auth
def get_document_summary(doc_id):
    """Generate a summary of one of the caller's documents on demand.

    Only the stored ``content_preview`` is summarised, not the full text.
    The result may therefore be less complete than the summary generated
    from the full text at upload time.
    """
    doc = chroma_service.get_document(doc_id, request.current_user['user_id'])
    if not doc:
        return jsonify({"error": "Document not found"}), 404

    name = doc['filename']
    outcome = rag_service.generate_summary(doc.get('content_preview', ''), name)
    return jsonify({
        "doc_id": doc_id,
        "filename": name,
        "summary": outcome.get('summary', f'Document: {name}'),
        "success": outcome.get('success', False),
    })
@app.route('/api/documents', methods=['GET'])
@require_auth
def list_documents():
    """List the caller's documents, optionally narrowed by ``?bucket_id=``.

    An empty ``bucket_id`` query value is treated the same as omitting it.
    """
    requested_bucket = request.args.get('bucket_id') or None
    documents = chroma_service.get_user_documents(
        request.current_user['user_id'],
        bucket_id=requested_bucket,
    )
    return jsonify({"documents": documents})
@app.route('/api/documents/<doc_id>', methods=['GET'])
@require_auth
def get_document(doc_id):
    """Return the stored record for ``doc_id`` looked up for the caller, or 404."""
    doc = chroma_service.get_document(doc_id, request.current_user['user_id'])
    if not doc:
        return jsonify({"error": "Document not found"}), 404
    return jsonify(doc)
@app.route('/api/documents/<doc_id>/view', methods=['GET'])
@require_auth
def view_document(doc_id):
    """Send the original uploaded file for ``doc_id`` inline (not as a download).

    The document is first looked up with the caller's ``user_id``; an unknown
    document yields 404. Uploads are stored as ``<doc_id>_<filename>`` in the
    user's upload folder. A missing file, or a missing user folder, also
    yields 404 rather than an unhandled ``os.listdir`` error.
    """
    user_id = request.current_user['user_id']
    doc = chroma_service.get_document(doc_id, user_id)
    if not doc:
        return jsonify({"error": "Document not found"}), 404

    user_folder = os.path.join(Config.UPLOAD_FOLDER, user_id)
    # Match the full "<doc_id>_" prefix used by upload_document, not just
    # any file whose name happens to start with the id string.
    prefix = f"{doc_id}_"
    try:
        stored_files = os.listdir(user_folder)
    except (FileNotFoundError, NotADirectoryError):
        stored_files = []

    for name in stored_files:
        if name.startswith(prefix):
            return send_file(os.path.join(user_folder, name), as_attachment=False)
    return jsonify({"error": "File not found on server"}), 404
@app.route('/api/documents/<doc_id>/bucket', methods=['PUT'])
@require_auth
def update_document_bucket(doc_id):
    """Move one of the caller's documents into another bucket.

    Body: ``{"bucket_id": ...}``. A missing body or key sends ``''``
    (presumably meaning "no bucket"; confirm against chroma_service).
    """
    payload = request.get_json() or {}
    moved = chroma_service.update_document_bucket(
        doc_id=doc_id,
        user_id=request.current_user['user_id'],
        bucket_id=payload.get('bucket_id', ''),
    )
    if not moved:
        return jsonify({"error": "Document not found or access denied"}), 404
    return jsonify({"success": True})
@app.route('/api/documents/<doc_id>', methods=['DELETE'])
@require_auth
def delete_document(doc_id):
    """Delete a document record and, best-effort, its uploaded file.

    Returns 404 when ``chroma_service.delete_document`` reports failure.
    Once the record is gone, problems removing the file on disk are logged
    but do not change the success response.
    """
    user_id = request.current_user['user_id']
    deleted = chroma_service.delete_document(doc_id=doc_id, user_id=user_id)
    if not deleted:
        return jsonify({"error": "Document not found or access denied"}), 404

    user_folder = os.path.join(Config.UPLOAD_FOLDER, user_id)
    # Uploads are named "<doc_id>_<filename>"; match the full prefix.
    prefix = f"{doc_id}_"
    try:
        for name in os.listdir(user_folder):
            if name.startswith(prefix):
                os.remove(os.path.join(user_folder, name))
                break
    except OSError as e:
        # Only filesystem errors are tolerated (missing folder, permissions);
        # anything else is a real bug and should surface.
        print(f"[DELETE] Warning: could not remove stored file for {doc_id}: {e}")
    return jsonify({"success": True})
# ==================== Chat/RAG Routes ====================
@app.route('/api/chat', methods=['POST'])
@require_auth
def chat():
    """Answer a chat message with RAG, optionally scoped to documents or a bucket.

    Body (JSON object): ``message`` (required, non-blank string) plus the
    optional keys ``doc_ids``, ``bucket_id`` and ``history`` (default ``[]``).

    Returns:
        JSON with ``response``, ``model``, ``sources``, ``source_files``,
        ``chunks_used`` and ``chunks_filtered`` on success; 400 when the
        message is missing or blank; 500 when ``rag_service.query`` fails.
    """
    data = request.get_json()
    raw_message = data.get('message') if isinstance(data, dict) else None
    # Validate after stripping. A bare truthiness check would let "   "
    # through as an empty query, and a non-string would crash on .strip().
    if not isinstance(raw_message, str) or not raw_message.strip():
        return jsonify({"error": "No message provided"}), 400
    message = raw_message.strip()

    result = rag_service.query(
        user_id=request.current_user['user_id'],
        query=message,
        doc_ids=data.get('doc_ids'),
        bucket_id=data.get('bucket_id'),
        conversation_history=data.get('history', [])
    )

    if not result['success']:
        return jsonify({"error": result['error']}), 500
    return jsonify({
        "response": result['response'],
        "model": result.get('model', 'unknown'),
        "sources": result.get('sources', []),
        "source_files": result.get('source_files', []),
        "chunks_used": result.get('chunks_used', 0),
        "chunks_filtered": result.get('chunks_filtered', 0)
    })
@app.route('/api/chat/stream', methods=['POST'])
@require_auth
def chat_stream():
    """Stream a RAG answer to the client as Server-Sent Events.

    Body (JSON object): ``message`` (required, non-blank string), plus
    optional ``bucket_id`` and ``chat_id`` (default ``''``).

    A ``{"type": "start"}`` event is sent first so the client sees the stream
    open immediately. After that, each item yielded by
    ``rag_service.query_stream`` is JSON-encoded into its own ``data:`` event.
    A missing or blank message gets a plain-JSON 400 instead of a stream.
    """
    import json
    import time

    start_time = time.time()
    print("[STREAM] Endpoint called")

    data = request.get_json()
    raw_message = data.get('message') if isinstance(data, dict) else None
    # Validate after stripping. A bare truthiness check would let "   "
    # through as an empty query, and a non-string would crash on .strip().
    if not isinstance(raw_message, str) or not raw_message.strip():
        return jsonify({"error": "No message provided"}), 400
    message = raw_message.strip()
    bucket_id = data.get('bucket_id')
    chat_id = data.get('chat_id', '')
    # Capture request-bound values now: generate() runs after this view has
    # returned, outside the request context.
    user_id = request.current_user['user_id']
    print(f"[STREAM] Request parsed in {time.time()-start_time:.2f}s")

    def generate():
        # Emit something immediately so the connection starts streaming.
        yield f"data: {json.dumps({'type': 'start'})}\n\n"
        sse_chunk_count = 0
        for chunk in rag_service.query_stream(
            user_id=user_id,
            query=message,
            bucket_id=bucket_id,
            chat_id=chat_id
        ):
            sse_chunk_count += 1
            if sse_chunk_count <= 5:  # log only the first few chunks
                print(f"[SSE] Sending chunk {sse_chunk_count}: type={chunk.get('type', 'unknown')}")
            yield f"data: {json.dumps(chunk)}\n\n"
        print(f"[SSE] Stream complete, sent {sse_chunk_count} chunks total")

    return Response(
        generate(),
        mimetype='text/event-stream',
        headers={
            'Cache-Control': 'no-cache',
            'Connection': 'keep-alive',
            # Asks an nginx reverse proxy not to buffer the event stream.
            'X-Accel-Buffering': 'no'
        }
    )
@app.route('/api/chat/clear', methods=['POST'])
@require_auth
def clear_chat_memory():
    """Clear the caller's conversation memory, optionally for one ``bucket_id``.

    Returns 500 when ``rag_service.clear_memory`` reports failure.
    """
    body = request.get_json() or {}
    cleared = rag_service.clear_memory(
        user_id=request.current_user['user_id'],
        bucket_id=body.get('bucket_id'),
    )
    if not cleared:
        return jsonify({"error": "Failed to clear memory"}), 500
    return jsonify({"success": True, "message": "Conversation memory cleared"})
@app.route('/api/cleanup/chunks', methods=['POST'])
@require_auth
def cleanup_user_chunks():
    """Delete every stored chunk for the caller (a recovery tool for stale data).

    The user must re-upload documents afterwards; the message says so.
    """
    removed = chroma_service.clear_all_user_chunks(
        user_id=request.current_user['user_id']
    )
    message = f"Deleted {removed} chunks. Please re-upload your documents."
    return jsonify({"success": True, "message": message})
# ==================== Chat History Routes ====================
@app.route('/api/chats', methods=['GET'])
@require_auth
def list_chat_sessions():
    """Return the saved chat sessions for the authenticated user."""
    owner = request.current_user['user_id']
    return jsonify({"chats": chroma_service.get_user_chat_sessions(owner)})
@app.route('/api/chats', methods=['POST'])
@require_auth
def save_chat_session():
    """Create or update a chat session from the JSON body.

    Body: ``id`` (required), ``topic`` (default ``'Chat'``), ``messages``
    (default ``[]``) and ``bucket`` (default ``''``). The response is
    ``{"success": True}`` merged with whatever
    ``chroma_service.save_chat_session`` returns; its keys take precedence.
    """
    data = request.get_json()
    if not data:
        return jsonify({"error": "No data provided"}), 400

    chat_id = data.get('id')
    if not chat_id:
        return jsonify({"error": "Chat ID is required"}), 400

    saved = chroma_service.save_chat_session(
        user_id=request.current_user['user_id'],
        chat_id=chat_id,
        topic=data.get('topic', 'Chat'),
        messages=data.get('messages', []),
        bucket_id=data.get('bucket', ''),
    )
    return jsonify({"success": True, **saved})
@app.route('/api/chats/<chat_id>', methods=['DELETE'])
@require_auth
def delete_chat_session(chat_id):
    """Delete one of the caller's chat sessions.

    Responds 404 whenever ``chroma_service.delete_chat_session`` reports failure.
    """
    removed = chroma_service.delete_chat_session(
        user_id=request.current_user['user_id'],
        chat_id=chat_id,
    )
    if not removed:
        return jsonify({"error": "Chat not found or access denied"}), 404
    return jsonify({"success": True})
# ==================== Health Check ====================
@app.route('/api/health', methods=['GET'])
def health_check():
    """Unauthenticated health check; always reports healthy plus the API version."""
    status = {"status": "healthy", "version": "1.1.0"}
    return jsonify(status)
# ==================== Main ====================
if __name__ == '__main__':
    # Werkzeug's interactive debugger can run arbitrary Python sent from the
    # browser. The server binds to every interface (0.0.0.0), so debug mode
    # is opt-in (FLASK_DEBUG=1/true/yes/on) instead of always on.
    debug_mode = os.environ.get('FLASK_DEBUG', '').strip().lower() in ('1', 'true', 'yes', 'on')

    print("=" * 50)
    print("NotebookLM Clone - AI Document Intelligence")
    print("=" * 50)
    print(f"Upload folder: {Config.UPLOAD_FOLDER}")
    print(f"ChromaDB Cloud: {Config.CHROMA_TENANT}/{Config.CHROMA_DATABASE}")
    print("Starting server on http://localhost:5000")
    print("=" * 50)
    app.run(host='0.0.0.0', port=5000, debug=debug_mode)