# tutor-api / main.py
# rairo's picture
# Update main.py
# 9ea4be7 verified
# raw
# history blame
# 57.3 kB
# ---- Main Application File: main.py ----
import io
import uuid
import re
import time
import tempfile
import requests
import json
import os
import logging
import traceback
from datetime import datetime
from pathlib import Path
import urllib.parse
from flask import Flask, request, jsonify, send_file, Response
from flask_cors import CORS
from supabase import create_client, Client
# --- Input Processing & AI Libraries ---
import google.generativeai as genai
from elevenlabs.client import ElevenLabs
from elevenlabs import save as save_elevenlabs_audio
from PyPDF2 import PdfReader
import wikipedia
from youtube_transcript_api import YouTubeTranscriptApi
import arxiv # For ArXiv
from elevenlabs import play, stream, save
import math
# --- Environment Variables ---
# Load environment variables if using a .env file (optional, good practice)
from dotenv import load_dotenv
# Pull in variables from a local .env file (if one exists) before reading them.
load_dotenv()

# Supabase credentials: the service-role key is the one used for backend work;
# the anon key is read as well but is meant for client-side style access.
SUPABASE_URL = os.environ.get("SUPABASE_URL")
SUPABASE_SERVICE_KEY = os.environ.get("SUPABASE_SERVICE_KEY")
SUPABASE_ANON_KEY = os.environ.get("SUPABASE_ANON_KEY")

# Keys for the AI providers.
GEMINI_API_KEY = os.environ.get("GEMINI_API_KEY")
ELEVENLABS_API_KEY = os.environ.get("ELEVENLABS_API_KEY")

# --- Initialize Flask app and CORS ---
app = Flask(__name__)
# CORS is enabled with the extension's defaults (all origins) for development.
CORS(app)
# --- Initialize Supabase Client ---
# Build the module-level Supabase client; on any failure the name is bound to
# None so request handlers can detect that the service is unavailable.
try:
    if not (SUPABASE_URL and SUPABASE_SERVICE_KEY):
        raise ValueError("Supabase URL and Service Key must be set in environment variables.")
    supabase: Client = create_client(SUPABASE_URL, SUPABASE_SERVICE_KEY)
    print("Supabase client initialized successfully.")
    # Optional connectivity probe, left disabled:
    # response = supabase.table('users').select("id", count='exact').limit(0).execute()
    # print("Checked 'users' table connection.")
except Exception as init_error:
    print(f"Error initializing Supabase client: {init_error}")
    # Startup continues without Supabase; callers must check for None.
    supabase = None
# --- Initialize Gemini API ---
# Configure the Gemini SDK and create the shared model handle; the handle is
# None when the key is missing or configuration fails.
try:
    if GEMINI_API_KEY is None or not GEMINI_API_KEY:
        raise ValueError("Gemini API Key must be set in environment variables.")
    genai.configure(api_key=GEMINI_API_KEY)
    # Model name is fixed here; change it if a different model should be used.
    gemini_model = genai.GenerativeModel('gemini-2.0-flash-thinking-exp')
    print("Gemini API initialized successfully.")
except Exception as init_error:
    print(f"Error initializing Gemini API: {init_error}")
    gemini_model = None
# --- Initialize ElevenLabs Client ---
# Create the shared ElevenLabs client; it is None when the key is missing or
# the constructor fails, which handlers treat as "service unavailable".
try:
    if ELEVENLABS_API_KEY is None or not ELEVENLABS_API_KEY:
        raise ValueError("ElevenLabs API Key must be set in environment variables.")
    elevenlabs_client = ElevenLabs(api_key=ELEVENLABS_API_KEY)
    print("ElevenLabs client initialized successfully.")
    # Optional voice listing, left disabled:
    # voices = elevenlabs_client.voices.get_all()
    # print(f"Available ElevenLabs voices: {[v.name for v in voices.voices]}")
except Exception as init_error:
    print(f"Error initializing ElevenLabs client: {init_error}")
    elevenlabs_client = None
# --- Logging ---
# Destination file for application logs; adjust the path for the deployment.
LOG_FILE_PATH = "/tmp/ai_tutor.log"

# Root logger: INFO and above, timestamped, written to LOG_FILE_PATH.
logging.basicConfig(
    format="%(asctime)s - %(levelname)s - %(message)s",
    level=logging.INFO,
    filename=LOG_FILE_PATH,
)
# === Helper Functions ===
def verify_token(auth_header):
    """Validate a Supabase JWT taken from an ``Authorization`` header.

    Args:
        auth_header: Raw header value, expected in the form ``Bearer <token>``.

    Returns:
        A ``(user, error)`` pair. On success ``user`` is the object returned by
        Supabase Auth and ``error`` is ``None``. On failure ``user`` is ``None``
        and ``error`` is a dict with ``'error'`` (message) and ``'status'``
        (HTTP status code) keys.

    Raises:
        ConnectionError: If the module-level Supabase client is not available.
    """
    if not supabase:
        raise ConnectionError("Supabase client not initialized.")
    if not auth_header or not auth_header.startswith('Bearer '):
        return None, {'error': 'Missing or invalid Authorization header', 'status': 401}

    # Slice off the scheme rather than splitting on every space, so a header
    # like "Bearer  <token>" (extra whitespace) does not produce an empty token.
    token = auth_header[len('Bearer '):].strip()
    if not token:
        # Nothing to verify; avoid a pointless round trip to Supabase.
        return None, {'error': 'Missing or invalid Authorization header', 'status': 401}

    try:
        response = supabase.auth.get_user(token)
        # Tolerate a None response as well as a response without a user.
        user = response.user if response else None
        if not user:
            return None, {'error': 'Invalid or expired token', 'status': 401}
        return user, None
    except Exception as e:
        logging.error(f"Token verification error: {e}")
        # Any failure while verifying is reported to the caller as 401.
        return None, {'error': f'Token verification failed: {e}', 'status': 401}
def verify_admin(user):
    """Check whether an already-verified user has the ``is_admin`` profile flag.

    Args:
        user: User object (must expose ``.id``) as returned by ``verify_token``.

    Returns:
        ``(True, None)`` when the user's row in ``profiles`` has a truthy
        ``is_admin``; otherwise ``(False, error)`` where ``error`` is a dict with
        ``'error'`` and ``'status'`` keys (400 no user, 403 not admin, 500 on
        lookup failure).

    Raises:
        ConnectionError: If the module-level Supabase client is not available.
    """
    if not supabase:
        raise ConnectionError("Supabase client not initialized.")
    if not user:
        return False, {'error': 'User not provided for admin check', 'status': 400}
    try:
        profile_res = (
            supabase.table('profiles')
            .select('is_admin')
            .eq('id', user.id)
            .maybe_single()
            .execute()
        )
        # maybe_single() can yield None instead of a response object when no
        # row matches; treat that the same as "no profile" rather than crashing.
        profile_data = profile_res.data if profile_res is not None else None
        if profile_data and profile_data.get('is_admin'):
            return True, None
        return False, {'error': 'Admin access required', 'status': 403}
    except Exception as e:
        logging.error(f"Admin check failed for user {user.id}: {e}")
        return False, {'error': f'Error checking admin status: {e}', 'status': 500}
def upload_to_supabase_storage(bucket_name: str, file_path: str, destination_path: str, content_type: str):
    """Send a file from local disk to a Supabase Storage bucket.

    Args:
        bucket_name: Target storage bucket.
        file_path: Path of the local file to read (opened in binary mode).
        destination_path: Object path inside the bucket.
        content_type: MIME type stored with the object.

    Returns:
        Whatever ``get_public_url`` returns for ``destination_path``.

    Raises:
        ConnectionError: If the Supabase client is not available.
        Exception: Any upload/URL failure is logged and re-raised unchanged.
    """
    if not supabase:
        raise ConnectionError("Supabase client not initialized.")
    try:
        with open(file_path, 'rb') as source_file:
            # "upsert": "true" asks the storage API to overwrite an existing object.
            upload_options = {
                "content-type": content_type,
                "cache-control": "3600",
                "upsert": "true",
            }
            supabase.storage.from_(bucket_name).upload(
                path=destination_path,
                file=source_file,
                file_options=upload_options,
            )
        # A public URL is only useful if the bucket is publicly readable.
        public_url = supabase.storage.from_(bucket_name).get_public_url(destination_path)
        return public_url
    except Exception as upload_error:
        logging.error(f"Supabase Storage upload failed: {upload_error}")
        raise  # Let the caller decide how to respond.
# === Input Content Extraction Helpers ===
def get_pdf_text(pdf_file_storage):
    """Pull the text out of a PDF stream, page by page.

    Args:
        pdf_file_storage: File-like object handed to ``PdfReader``.

    Returns:
        Text of all pages that yielded any text, each followed by a newline,
        truncated to 300000 characters.

    Raises:
        ValueError: If the PDF cannot be read or parsed.
    """
    try:
        reader = PdfReader(pdf_file_storage)
        extracted = (page.extract_text() for page in reader.pages)
        # Pages with no extractable text are skipped.
        text = "".join(chunk + "\n" for chunk in extracted if chunk)
        # Plain truncation keeps the prompt within a fixed size budget.
        MAX_CHARS = 300000
        return text[:MAX_CHARS]
    except Exception as read_error:
        logging.error(f"Error reading PDF: {read_error}")
        raise ValueError(f"Could not process PDF file: {read_error}")
def _extract_youtube_video_id(url):
    """Return the video ID contained in a YouTube URL, or raise ValueError."""
    candidate = url.strip()
    # Accept scheme-less input such as "youtu.be/<id>" by giving it a scheme
    # so urlparse can separate host, path and query.
    if "://" not in candidate:
        candidate = "https://" + candidate
    parsed = urllib.parse.urlparse(candidate)
    host = (parsed.hostname or "").lower()
    path_parts = [part for part in parsed.path.split("/") if part]

    video_id = None
    # A real "v" query parameter wins (watch URLs); parse_qs ignores look-alike
    # parameters such as "nav=" and drops any "#fragment".
    query_ids = urllib.parse.parse_qs(parsed.query).get("v")
    if query_ids:
        video_id = query_ids[0]
    elif host == "youtu.be" or host.endswith(".youtu.be"):
        # Short links carry the ID as the first path segment.
        video_id = path_parts[0] if path_parts else None
    elif host in ("youtube.com", "youtube-nocookie.com") or host.endswith(
        (".youtube.com", ".youtube-nocookie.com")
    ):
        # Path-based forms: /shorts/<id>, /embed/<id>, /live/<id>, /v/<id>.
        if len(path_parts) >= 2 and path_parts[0] in ("shorts", "embed", "live", "v"):
            video_id = path_parts[1]

    if not video_id:
        raise ValueError("Invalid YouTube URL format.")
    return video_id


def get_youtube_transcript(url):
    """Fetch the transcript of a YouTube video as one string.

    Args:
        url: YouTube URL (watch, youtu.be, shorts, embed or live form).

    Returns:
        The transcript segments joined with single spaces, truncated to
        300000 characters.

    Raises:
        ValueError: If the URL has no recognizable video ID or the transcript
            cannot be retrieved.
    """
    try:
        video_id = _extract_youtube_video_id(url)
        transcript_list = YouTubeTranscriptApi.get_transcript(video_id)
        transcript_text = " ".join(item['text'] for item in transcript_list)
        MAX_CHARS = 300000  # Adjust as needed
        return transcript_text[:MAX_CHARS]
    except Exception as e:
        logging.error(f"Error getting YouTube transcript for {url}: {e}")
        raise ValueError(f"Could not get transcript: {e}") from e
def get_wiki_content(url):
    """Fetch the full text of the English Wikipedia page named by a URL.

    The page title is taken from the last path segment of ``url`` (URL-decoded,
    underscores turned into spaces).

    Args:
        url: Wikipedia article URL, or a bare article title.

    Returns:
        The page content, truncated to 300000 characters.

    Raises:
        ValueError: If the page does not exist, the title is ambiguous, or any
            other error occurs while fetching.
    """
    page_title = None
    try:
        raw = url
        # For real URLs, keep only the path so a "?query" or "#section" suffix
        # does not end up in the title. Bare titles are left untouched because
        # they may legitimately contain "?" or "#".
        if "://" in url:
            raw = urllib.parse.urlparse(url).path
        page_title = urllib.parse.unquote(raw.rstrip("/").split("/")[-1]).replace("_", " ")
        wikipedia.set_lang("en")
        # auto_suggest=False: look up exactly this title, no fuzzy matching.
        page = wikipedia.page(page_title, auto_suggest=False)
        content = page.content
        MAX_CHARS = 300000  # Adjust as needed
        return content[:MAX_CHARS]
    except wikipedia.exceptions.PageError as e:
        raise ValueError(f"Wikipedia page '{page_title}' not found.") from e
    except wikipedia.exceptions.DisambiguationError as e:
        raise ValueError(f"'{page_title}' refers to multiple pages: {e.options}") from e
    except Exception as e:
        logging.error(f"Error getting Wikipedia content for {url}: {e}")
        raise ValueError(f"Could not get Wikipedia content: {e}") from e
def fetch_bible_text(reference):
    """Fetch the text of a Bible passage from bible-api.com (KJV translation).

    Args:
        reference: Passage reference; it is URL-quoted and placed in the
            request path.

    Returns:
        The passage text, truncated to 300000 characters. Taken from the
        response's ``text`` field when present, otherwise assembled from the
        ``verses`` list.

    Raises:
        ConnectionError: If the HTTP request fails or returns an error status.
        ValueError: If the API reports an error, the response shape is not
            recognized, or the assembled passage is empty.
    """
    MAX_CHARS = 300000
    try:
        query = urllib.parse.quote(reference)
        api_url = f"https://bible-api.com/{query}?translation=kjv"
        response = requests.get(api_url, timeout=15)
        response.raise_for_status()  # 4xx/5xx become exceptions
        data = response.json()

        if 'text' in data:
            return data['text'].strip()[:MAX_CHARS]
        if 'error' in data:
            raise ValueError(f"Bible API error: {data['error']}")
        # Fallback for responses that only carry a list of verse objects.
        if 'verses' in data and isinstance(data['verses'], list):
            text = " ".join(v.get('text', '').strip() for v in data['verses'])
            if not text:
                # Raise the error; the earlier code returned the exception
                # object to the caller as if it were passage text.
                raise ValueError("Bible reference not found or empty.")
            return text[:MAX_CHARS]
        raise ValueError("Bible API response format not recognized.")
    except requests.exceptions.RequestException as e:
        logging.error(f"Error fetching Bible text for '{reference}': {e}")
        raise ConnectionError(f"Could not connect to Bible API: {e}") from e
    except Exception as e:
        logging.error(f"Error processing Bible reference '{reference}': {e}")
        raise ValueError(f"Could not process Bible reference: {e}") from e
def get_arxiv_content(arxiv_id):
    """Look up an ArXiv paper and return its title and abstract as text.

    Args:
        arxiv_id: ArXiv identifier, or an ``arxiv.org/abs/...`` /
            ``arxiv.org/pdf/...`` URL from which the identifier is extracted.

    Returns:
        A ``(content, title)`` tuple, where ``content`` is
        ``"Title: ...\\n\\nAbstract: ..."`` truncated to 300000 characters.

    Raises:
        ValueError: If no paper matches the identifier or the lookup fails.
    """
    try:
        # Reduce URL forms to the bare identifier.
        if 'arxiv.org/abs/' in arxiv_id:
            arxiv_id = arxiv_id.rpartition('/abs/')[2]
        if 'arxiv.org/pdf/' in arxiv_id:
            pdf_name = arxiv_id.rpartition('/pdf/')[2]
            arxiv_id = pdf_name.replace('.pdf', '')

        lookup = arxiv.Search(id_list=[arxiv_id])
        results = lookup.results()
        paper = next(results)  # StopIteration here means "no such paper"

        # Only title + abstract are used; the PDF body is not downloaded.
        summary_text = f"Title: {paper.title}\n\nAbstract: {paper.summary}"
        MAX_CHARS = 300000
        return summary_text[:MAX_CHARS], paper.title
    except StopIteration:
        raise ValueError(f"ArXiv paper with ID '{arxiv_id}' not found.")
    except Exception as fetch_error:
        logging.error(f"Error fetching ArXiv content for {arxiv_id}: {fetch_error}")
        raise ValueError(f"Could not get ArXiv content: {fetch_error}")
# === Gemini Interaction Helpers ===
def generate_notes_with_gemini(text_content, title=None):
    """Ask Gemini to turn source text into Markdown study notes.

    Args:
        text_content: The source material to summarize.
        title: Optional title mentioned in the prompt when truthy.

    Returns:
        The model's response text with surrounding whitespace removed.

    Raises:
        ConnectionError: If the Gemini model handle is not available.
        RuntimeError: If prompt construction or generation fails.
    """
    if not gemini_model:
        raise ConnectionError("Gemini client not initialized.")
    try:
        # Optional 'titled "<title>" ' fragment spliced into the first sentence.
        title_clause = 'titled "' + title + '" ' if title else ''
        # The prompt text is kept flush-left so its content is not altered by
        # source indentation.
        prompt = f"""
Act as an expert educator and study assistant. Based on the following text {title_clause} , generate comprehensive and well-structured study notes.
**Instructions:**
1. **Identify Key Concepts:** Extract the main topics, definitions, key figures, dates, arguments, and important takeaways.
2. **Structure Logically:** Organize the notes with clear headings (using Markdown ##) and bullet points (* or -) for readability. Use sub-bullets if necessary.
3. **Be Concise but Thorough:** Summarize the information accurately without unnecessary jargon. Ensure all critical points are covered.
4. **Highlight Importance:** You can use bold text (**bold**) for very important terms or concepts.
5. **Focus:** Generate only the notes based on the provided text. Do not add introductions like "Here are the notes..." or conclusions like "These notes cover...".
**Source Text:**
---
{text_content}
---
**Generated Study Notes:**
"""
        model_reply = gemini_model.generate_content(prompt)
        notes = model_reply.text.strip()
        return notes
    except Exception as gen_error:
        logging.error(f"Gemini note generation failed: {gen_error}")
        raise RuntimeError(f"AI failed to generate notes: {gen_error}")
def _strip_code_fences(raw_text):
    """Remove one leading ```/```json fence and one trailing ``` fence, if present."""
    text = raw_text.strip()
    # Anchored regexes remove the fence as a prefix/suffix; str.lstrip/rstrip
    # would instead strip any run of the characters ` j s o n.
    text = re.sub(r"^```(?:json)?", "", text, flags=re.IGNORECASE)
    text = re.sub(r"```$", "", text)
    return text.strip()


def generate_quiz_with_gemini(notes_content, difficulty, num_questions=5):
    """Ask Gemini for a multiple-choice quiz based on study notes.

    Args:
        notes_content: The notes the questions should be drawn from.
        difficulty: ``"easy"``, ``"medium"`` or ``"hard"`` (case-insensitive);
            other values fall back to a generic "medium difficulty" wording and
            a falsy value is treated as ``"medium"``.
        num_questions: Number of questions requested in the prompt.

    Returns:
        A list of dicts, each containing at least ``"question"``,
        ``"options"`` and ``"correct_answer"``.

    Raises:
        ConnectionError: If the Gemini model handle is not available.
        RuntimeError: If generation fails or the reply is not valid quiz JSON.
    """
    if not gemini_model:
        raise ConnectionError("Gemini client not initialized.")
    difficulty_map = {
        "easy": "basic recall and understanding",
        "medium": "application and interpretation",
        "hard": "analysis, synthesis, and evaluation"
    }
    # Guard against None/non-string input before calling .lower().
    difficulty = difficulty or "medium"
    difficulty_desc = difficulty_map.get(str(difficulty).lower(), "medium difficulty")
    required_keys = ("question", "options", "correct_answer")
    try:
        # The prompt text is kept flush-left so its content is not altered by
        # source indentation; doubled braces are literal braces in the f-string.
        prompt = f"""
Act as an expert quiz creator. Based on the following study notes, create a multiple-choice quiz.
**Instructions:**
1. **Number of Questions:** Generate exactly {num_questions} questions.
2. **Difficulty Level:** The questions should be of {difficulty_desc} ({difficulty}).
3. **Format:** Each question must have exactly four options (A, B, C, D).
4. **Clarity:** Questions and options should be clear and unambiguous.
5. **Single Correct Answer:** Ensure only one option is the correct answer.
6. **JSON Output:** Format the entire output STRICTLY as a JSON list of objects. Each object must have the following keys: "question" (string), "options" (an object with keys "A", "B", "C", "D", all strings), and "correct_answer" (string, either "A", "B", "C", or "D").
7. **Focus:** Generate only the JSON output. Do not include any introductory text, explanations, or markdown formatting outside the JSON structure.
**Study Notes:**
---
{notes_content}
---
**Quiz JSON Output:**
```json
[
{{
"question": "...",
"options": {{
"A": "...",
"B": "...",
"C": "...",
"D": "..."
}},
"correct_answer": "..."
}}
// ... more question objects
]
```
"""
        response = gemini_model.generate_content(prompt)
        cleaned_response = _strip_code_fences(response.text)
        quiz_data = json.loads(cleaned_response)
        if not isinstance(quiz_data, list):
            raise ValueError("AI response is not a list.")
        # Validate every item, not just the first, so a malformed later
        # question cannot slip through to scoring.
        for item in quiz_data:
            if not isinstance(item, dict) or not all(k in item for k in required_keys):
                raise ValueError("AI response list items have missing keys.")
        return quiz_data
    except json.JSONDecodeError as e:
        logging.error(f"Gemini quiz generation returned invalid JSON: {cleaned_response[:500]}... Error: {e}")
        raise RuntimeError("AI failed to generate a valid quiz format. Please try again.") from e
    except Exception as e:
        logging.error(f"Gemini quiz generation failed: {e}")
        raise RuntimeError(f"AI failed to generate quiz: {e}") from e
# === ElevenLabs TTS Helper ===
def generate_tts_audio(text_to_speak, voice_id="Rachel"):
    """Synthesize speech with ElevenLabs and return the audio as bytes.

    Args:
        text_to_speak: Text to convert to speech.
        voice_id: Value passed as the ``voice`` argument of the ElevenLabs
            ``generate`` call (defaults to ``"Rachel"``).

    Returns:
        The concatenated audio chunks produced by the streaming call.

    Raises:
        ConnectionError: If the ElevenLabs client is not available.
        RuntimeError: If generation fails or produces no audio.
    """
    if not elevenlabs_client:
        raise ConnectionError("ElevenLabs client not initialized.")
    try:
        audio_stream = elevenlabs_client.generate(
            text=text_to_speak,
            voice=voice_id,
            model="eleven_multilingual_v2",
            stream=True
        )
        # Join once instead of growing a bytes object chunk by chunk.
        audio_bytes = b"".join(audio_stream)
        if not audio_bytes:
            raise ValueError("ElevenLabs generated empty audio.")
        return audio_bytes
    except Exception as e:
        logging.error(f"ElevenLabs TTS generation failed: {e}")
        raise RuntimeError(f"Failed to generate audio: {e}") from e
# === Authentication Endpoints ===
@app.route('/api/auth/signup', methods=['POST'])
def signup():
    """Register a user with Supabase Auth and create their profile row.

    Expects a JSON object with ``email`` and ``password``. On success the
    user's ``profiles`` row is upserted with 20 credits.

    Returns:
        201 with ``success``, ``message`` and ``user_id``; 400 for missing
        fields or other sign-up failures; 409 when the email is already
        registered; 503 when Supabase is unavailable.
    """
    if not supabase:
        return jsonify({'error': 'Service unavailable'}), 503
    try:
        # silent=True: a missing or malformed JSON body yields None instead of
        # raising, so it is reported as the ordinary "fields required" error.
        data = request.get_json(silent=True)
        if not isinstance(data, dict):
            data = {}
        email = data.get('email')
        password = data.get('password')
        if not email or not password:
            return jsonify({'error': 'Email and password are required'}), 400

        res = supabase.auth.sign_up({"email": email, "password": password})
        if not res or not res.user:
            # Without a user there is no id to attach a profile to.
            return jsonify({'error': 'Signup failed: no user was returned.'}), 400

        # NOTE(review): upsert will overwrite `credits` if a row with this id
        # already exists - confirm that cannot be used to reset a balance.
        supabase.table('profiles').upsert({
            'id': res.user.id,
            'email': email,
            'credits': 20
        }).execute()
        return jsonify({
            'success': True,
            'message': 'Signup successful. Please check your email for verification.',
            'user_id': res.user.id
        }), 201
    except Exception as e:
        error_message = str(e)
        status_code = 400
        if "User already registered" in error_message:
            error_message = "Email already exists."
            status_code = 409
        logging.error(f"Signup error: {error_message}")
        return jsonify({'error': error_message}), status_code
@app.route('/api/auth/signin', methods=['POST'])
def signin():
    """Sign a user in with email/password and return tokens plus profile.

    Expects a JSON object with ``email`` and ``password``.

    Returns:
        200 with ``access_token``, ``refresh_token`` and a ``user`` object
        (``id``, ``email``, ``profile``); 400 for missing fields; 401 for
        failed sign-in; 403 when the email is not confirmed; 503 when
        Supabase is unavailable.
    """
    if not supabase:
        return jsonify({'error': 'Service unavailable'}), 503
    try:
        # silent=True: a missing or malformed JSON body yields None instead of
        # raising, so it is reported as the ordinary "fields required" error.
        data = request.get_json(silent=True)
        if not isinstance(data, dict):
            data = {}
        email = data.get('email')
        password = data.get('password')
        if not email or not password:
            return jsonify({'error': 'Email and password are required'}), 400

        res = supabase.auth.sign_in_with_password({"email": email, "password": password})
        if not res or not res.user or not res.session:
            # No session means there are no tokens to hand back.
            return jsonify({'error': 'Invalid email or password.'}), 401

        profile_res = (
            supabase.table('profiles')
            .select('*')
            .eq('id', res.user.id)
            .maybe_single()
            .execute()
        )
        # maybe_single() can yield None when the user has no profile row.
        profile_data = profile_res.data if profile_res is not None else None
        return jsonify({
            'success': True,
            'access_token': res.session.access_token,
            'refresh_token': res.session.refresh_token,
            'user': {
                'id': res.user.id,
                'email': res.user.email,
                'profile': profile_data
            }
        }), 200
    except Exception as e:
        # Map known auth failures to friendlier messages / status codes.
        error_message = str(e)
        status_code = 401
        if "Invalid login credentials" in error_message:
            error_message = "Invalid email or password."
        elif "Email not confirmed" in error_message:
            error_message = "Please verify your email address before signing in."
            status_code = 403
        logging.error(f"Signin error: {error_message}")
        return jsonify({'error': error_message}), status_code
@app.route('/api/auth/google-signin', methods=['POST'])
def google_signin():
    """Confirm a Google OAuth session and return the user's profile.

    The frontend is expected to complete OAuth itself and send the resulting
    Supabase access token in the ``Authorization`` header. If the user has no
    ``profiles`` row, one is inserted with ``id`` and ``email``.

    Returns:
        200 with ``success``, ``message`` and ``user``; the status from
        ``verify_token`` on auth failure; 500 on profile lookup/creation
        errors; 503 when Supabase is unavailable.
    """
    # Checked before verify_token, which raises ConnectionError when the
    # client is missing; matches the guard used by signup/signin.
    if not supabase:
        return jsonify({'error': 'Service unavailable'}), 503
    user, error = verify_token(request.headers.get('Authorization'))
    if error:
        return jsonify({'error': error['error']}), error['status']
    try:
        profile_res = (
            supabase.table('profiles')
            .select('*')
            .eq('id', user.id)
            .maybe_single()
            .execute()
        )
        # maybe_single() can yield None when no row matches.
        profile_data = profile_res.data if profile_res is not None else None
        if not profile_data:
            logging.warning(f"Google Sign-In: Profile not found for verified user {user.id}, attempting to create.")
            # NOTE(review): no `credits` value is set here, unlike password
            # signup (20) - confirm the table default is what is wanted.
            insert_res = supabase.table('profiles').insert({
                'id': user.id,
                'email': user.email,
            }).execute()
            profile_data = insert_res.data[0] if insert_res and insert_res.data else None
            if not profile_data:
                raise Exception("Failed to create profile entry after Google Sign-In.")
        # Tokens are not echoed back; the frontend already holds the session.
        return jsonify({
            'success': True,
            'message': 'Google sign-in verified successfully.',
            'user': {
                'id': user.id,
                'email': user.email,
                'profile': profile_data
            }
        }), 200
    except Exception as e:
        logging.error(f"Google sign-in profile fetch/creation error: {e}")
        return jsonify({'error': f'An error occurred during sign-in: {e}'}), 500
# === User Profile Endpoint ===
@app.route('/api/user/profile', methods=['GET'])
def get_user_profile():
    """Return the authenticated user's profile merged with auth data.

    Returns:
        200 with ``id``, ``email``, ``credits``, ``is_admin``, ``created_at``
        and ``suspended``; the status from ``verify_token`` on auth failure;
        404 when the user has no ``profiles`` row; 500 on lookup errors; 503
        when Supabase is unavailable.
    """
    # Checked before verify_token, which raises ConnectionError when the
    # client is missing.
    if not supabase:
        return jsonify({'error': 'Service unavailable'}), 503
    user, error = verify_token(request.headers.get('Authorization'))
    if error:
        return jsonify({'error': error['error']}), error['status']
    try:
        profile_res = (
            supabase.table('profiles')
            .select('*')
            .eq('id', user.id)
            .maybe_single()
            .execute()
        )
        # maybe_single() can yield None when no row matches.
        profile_data = profile_res.data if profile_res is not None else None
        if not profile_data:
            # The user exists in auth but has no profile row.
            logging.error(f"Profile not found for authenticated user: {user.id} / {user.email}")
            return jsonify({'error': 'User profile not found.'}), 404
        full_user_data = {
            'id': user.id,
            'email': user.email,  # taken from the auth record
            'credits': profile_data.get('credits'),
            'is_admin': profile_data.get('is_admin'),
            'created_at': profile_data.get('created_at'),
            'suspended': profile_data.get('suspended')
        }
        return jsonify(full_user_data), 200
    except Exception as e:
        logging.error(f"Error fetching user profile for {user.id}: {e}")
        return jsonify({'error': f'Failed to fetch profile: {e}'}), 500
# === AI Tutor Core Endpoints ===
@app.route('/api/tutor/process_input', methods=['POST'])
def process_input_and_generate_notes():
    """Extract content from a source, generate notes, store both, charge 2 credits.

    Form fields:
        input_type: One of ``pdf``, ``youtube``, ``wiki``, ``bible``,
            ``arxiv``, ``text``.
        source_ref: URL / reference / ID for every type except ``pdf`` and
            ``text``.
        file: Uploaded PDF (``pdf`` only).
        text_content: Raw text (``text`` only).

    Returns:
        201 with ``material_id``, ``notes_id`` and ``notes``; 400 for invalid
        input; 402 for insufficient credits; 403 for suspended accounts; 404
        when the user has no profile; 500 for generation/storage errors; 503
        when a backend service is unavailable.
    """
    # Checked before verify_token, which raises ConnectionError when the
    # Supabase client is missing.
    if not supabase or not gemini_model:
        return jsonify({'error': 'Backend service unavailable'}), 503
    user, error = verify_token(request.headers.get('Authorization'))
    if error:
        return jsonify({'error': error['error']}), error['status']
    try:
        # --- Account checks (inside the try so lookup failures are handled) ---
        profile_res = (
            supabase.table('profiles')
            .select('credits', 'suspended')
            .eq('id', user.id)
            .maybe_single()
            .execute()
        )
        # maybe_single() can yield None when no row matches.
        profile = profile_res.data if profile_res is not None else None
        if not profile:
            return jsonify({'error': 'User profile not found.'}), 404
        if profile.get('suspended'):
            return jsonify({'error': 'Account suspended'}), 403
        current_credits = profile.get('credits') or 0  # NULL counts as zero
        if current_credits < 2:
            return jsonify({'error': 'Insufficient credits (Need 2)'}), 402

        # --- Read and validate the request ---
        input_type = request.form.get('input_type')
        source_ref = request.form.get('source_ref')
        uploaded_file = request.files.get('file')
        if not input_type:
            return jsonify({'error': 'input_type (e.g., pdf, youtube, wiki, bible, arxiv, text) is required'}), 400

        content = None
        title = None
        if input_type == 'pdf':
            if not uploaded_file:
                return jsonify({'error': 'File is required for input_type pdf'}), 400
            if not (uploaded_file.filename or '').lower().endswith('.pdf'):
                return jsonify({'error': 'Only PDF files are allowed'}), 400
            content = get_pdf_text(uploaded_file.stream)
            source_ref = uploaded_file.filename  # filename doubles as reference
            title = uploaded_file.filename
        elif input_type == 'youtube':
            if not source_ref:
                return jsonify({'error': 'source_ref (YouTube URL) is required'}), 400
            content = get_youtube_transcript(source_ref)
        elif input_type == 'wiki':
            if not source_ref:
                return jsonify({'error': 'source_ref (Wikipedia URL) is required'}), 400
            content = get_wiki_content(source_ref)
            title = urllib.parse.unquote(source_ref.rstrip("/").split("/")[-1]).replace("_", " ")
        elif input_type == 'bible':
            if not source_ref:
                return jsonify({'error': 'source_ref (Bible reference) is required'}), 400
            content = fetch_bible_text(source_ref)
            title = source_ref
        elif input_type == 'arxiv':
            if not source_ref:
                return jsonify({'error': 'source_ref (ArXiv ID or URL) is required'}), 400
            content, title = get_arxiv_content(source_ref)
        elif input_type == 'text':
            content = request.form.get('text_content')
            if not content:
                return jsonify({'error': 'text_content is required for input_type text'}), 400
            source_ref = content[:100] + "..."  # start of the text as reference
            title = "Custom Text"
        else:
            return jsonify({'error': f'Unsupported input_type: {input_type}'}), 400

        if not content:
            return jsonify({'error': 'Failed to extract content from the source.'}), 500

        # --- Generate notes ---
        start_time = time.time()
        logging.info(f"Generating notes for user {user.id}, type: {input_type}, ref: {source_ref[:50]}")
        generated_notes = generate_notes_with_gemini(content, title=title)
        logging.info(f"Notes generation took {time.time() - start_time:.2f}s")

        # --- Persist the material, then the notes linked to it ---
        material_res = supabase.table('study_materials').insert({
            'user_id': user.id,
            'type': input_type,
            'source_ref': source_ref,
            # Only the first 10000 characters of long sources are stored.
            'source_content': content if len(content) < 10000 else content[:10000] + "... (truncated)",
            'title': title
        }).execute()
        if not material_res.data:
            # getattr: the response object may not expose an `error` attribute.
            raise Exception(f"Failed to save study material: {getattr(material_res, 'error', None)}")
        material_id = material_res.data[0]['id']

        notes_res = supabase.table('notes').insert({
            'material_id': material_id,
            'user_id': user.id,
            'content': generated_notes
        }).execute()
        if not notes_res.data:
            raise Exception(f"Failed to save generated notes: {getattr(notes_res, 'error', None)}")
        notes_id = notes_res.data[0]['id']

        # --- Deduct credits ---
        # NOTE(review): read-then-write is not atomic; concurrent requests can
        # both see the same balance. Consider a database-side decrement.
        new_credits = current_credits - 2
        supabase.table('profiles').update({'credits': new_credits}).eq('id', user.id).execute()

        return jsonify({
            'success': True,
            'message': 'Content processed and notes generated successfully.',
            'material_id': material_id,
            'notes_id': notes_id,
            'notes': generated_notes
        }), 201
    except ValueError as e:  # extraction / validation problems
        logging.warning(f"Input processing error for user {user.id}: {e}")
        return jsonify({'error': str(e)}), 400
    except ConnectionError as e:  # an upstream service could not be reached
        logging.error(f"Connection error during processing: {e}")
        return jsonify({'error': f'A backend service is unavailable: {e}'}), 503
    except RuntimeError as e:  # AI generation errors
        logging.error(f"RuntimeError during processing for user {user.id}: {e}")
        return jsonify({'error': str(e)}), 500
    except Exception as e:
        logging.error(f"Unexpected error processing input for user {user.id}: {traceback.format_exc()}")
        return jsonify({'error': f'An unexpected error occurred: {e}'}), 500
@app.route('/api/view/notes/<uuid:note_id>', methods=['GET'])
def get_note_by_id(note_id):
    """Return one of the authenticated user's notes with its source material.

    Args:
        note_id: UUID of the note, parsed from the URL.

    Returns:
        200 with a ``note`` object (``note_id``, ``content``, ``audio_url``,
        ``created_at``, ``material``); the status from ``verify_token`` on
        auth failure; 404 when the note does not exist or belongs to another
        user; 500 on unexpected errors; 503 when Supabase is unavailable.
    """
    # Checked before verify_token, which raises ConnectionError when the
    # client is missing.
    if not supabase:
        return jsonify({'error': 'Backend service unavailable'}), 503
    try:
        # --- Authentication ---
        user, error = verify_token(request.headers.get('Authorization'))
        if error:
            return jsonify({'error': error['error']}), error['status']

        # --- Query (filtered by owner so other users' notes look absent) ---
        note_res = (
            supabase.table('notes')
            .select('id, content, created_at, tts_audio_url, study_materials(title, type, source_ref)')
            .eq('id', note_id)
            .eq('user_id', user.id)
            .maybe_single()
            .execute()
        )
        # maybe_single() can yield None when no row matches; that is a 404,
        # not an internal error.
        note_data = note_res.data if note_res is not None else None
        if not note_data:
            return jsonify({'error': 'Note not found or unauthorized'}), 404

        # --- Format response ---
        material = note_data['study_materials']
        response_data = {
            "note": {
                "note_id": note_data['id'],
                "content": note_data['content'],
                "audio_url": note_data['tts_audio_url'],
                "created_at": note_data['created_at'],
                "material": {
                    "title": material['title'] if material else "Untitled",
                    "type": material['type'] if material else None,
                    "source_ref": material['source_ref'] if material else None
                }
            }
        }
        return jsonify(response_data)
    except Exception as e:
        logging.error(f"Error fetching note {note_id}: {str(e)}")
        return jsonify({'error': 'Internal server error'}), 500
@app.route('/api/tutor/notes/<uuid:notes_id>/generate_quiz', methods=['POST'])
def generate_quiz_for_notes(notes_id):
    """Generate and store a quiz for one of the user's notes; costs 2 credits.

    Optional JSON body fields: ``difficulty`` (``easy``/``medium``/``hard``,
    default ``medium``) and ``num_questions`` (1-10, default 5).

    Args:
        notes_id: UUID of the notes row, parsed from the URL.

    Returns:
        201 with ``quiz_id``, ``difficulty`` and ``questions``; 400 for
        invalid parameters; 402 for insufficient credits; 403 for suspended
        accounts or notes owned by someone else; 404 when the notes or the
        profile are missing; 500 for generation/storage errors; 503 when a
        backend service is unavailable.
    """
    # Checked before verify_token, which raises ConnectionError when the
    # Supabase client is missing.
    if not supabase or not gemini_model:
        return jsonify({'error': 'Backend service unavailable'}), 503
    user, error = verify_token(request.headers.get('Authorization'))
    if error:
        return jsonify({'error': error['error']}), error['status']
    try:
        # --- Account checks (inside the try so lookup failures are handled) ---
        profile_res = (
            supabase.table('profiles')
            .select('credits', 'suspended')
            .eq('id', user.id)
            .maybe_single()
            .execute()
        )
        # maybe_single() can yield None when no row matches.
        profile = profile_res.data if profile_res is not None else None
        if not profile:
            return jsonify({'error': 'User profile not found.'}), 404
        if profile.get('suspended'):
            return jsonify({'error': 'Account suspended'}), 403
        current_credits = profile.get('credits') or 0  # NULL counts as zero
        if current_credits < 2:
            return jsonify({'error': 'Insufficient credits (Need 2)'}), 402

        # --- Parameters (a missing/invalid JSON body means "use defaults") ---
        data = request.get_json(silent=True)
        if not isinstance(data, dict):
            data = {}
        difficulty = str(data.get('difficulty', 'medium')).lower()
        try:
            num_questions = int(data.get('num_questions', 5))
        except (TypeError, ValueError):
            return jsonify({'error': 'num_questions must be between 1 and 10'}), 400
        if difficulty not in ['easy', 'medium', 'hard']:
            return jsonify({'error': 'difficulty must be easy, medium, or hard'}), 400
        if not 1 <= num_questions <= 10:
            return jsonify({'error': 'num_questions must be between 1 and 10'}), 400

        # --- Fetch notes and verify ownership ---
        notes_res = (
            supabase.table('notes')
            .select('content, user_id')
            .eq('id', notes_id)
            .maybe_single()
            .execute()
        )
        notes_row = notes_res.data if notes_res is not None else None
        if not notes_row:
            return jsonify({'error': 'Notes not found'}), 404
        if notes_row['user_id'] != user.id:
            return jsonify({'error': 'You do not have permission to access these notes'}), 403
        notes_content = notes_row['content']

        # --- Generate quiz ---
        start_time = time.time()
        logging.info(f"Generating {difficulty} quiz ({num_questions}q) for user {user.id}, notes: {notes_id}")
        quiz_questions = generate_quiz_with_gemini(notes_content, difficulty, num_questions)
        logging.info(f"Quiz generation took {time.time() - start_time:.2f}s")

        # --- Save quiz ---
        # NOTE(review): questions are stored as a JSON-encoded string, so
        # readers may get a str back rather than a list - confirm intended.
        quiz_res = supabase.table('quizzes').insert({
            'notes_id': str(notes_id),
            'user_id': user.id,
            'difficulty': difficulty,
            'questions': json.dumps(quiz_questions)
        }).execute()
        if not quiz_res.data:
            # getattr: the response object may not expose an `error` attribute.
            raise Exception(f"Failed to save generated quiz: {getattr(quiz_res, 'error', None)}")
        quiz_id = quiz_res.data[0]['id']

        # --- Deduct credits ---
        # NOTE(review): read-then-write is not atomic under concurrent requests.
        new_credits = current_credits - 2
        supabase.table('profiles').update({'credits': new_credits}).eq('id', user.id).execute()

        return jsonify({
            'success': True,
            'quiz_id': quiz_id,
            'difficulty': difficulty,
            'questions': quiz_questions
        }), 201
    except ValueError as e:
        return jsonify({'error': str(e)}), 400
    except ConnectionError as e:
        logging.error(f"Connection error during quiz generation: {e}")
        return jsonify({'error': f'A backend service is unavailable: {e}'}), 503
    except RuntimeError as e:  # AI generation errors
        logging.error(f"RuntimeError during quiz generation for user {user.id}: {e}")
        return jsonify({'error': str(e)}), 500
    except Exception as e:
        logging.error(f"Unexpected error generating quiz for user {user.id}, notes {notes_id}: {traceback.format_exc()}")
        return jsonify({'error': f'An unexpected error occurred: {e}'}), 500
@app.route('/api/view/quizzes/<uuid:quiz_id>', methods=['GET'])
def get_quiz_by_id(quiz_id):
    """Return one of the authenticated user's quizzes with its source note.

    Args:
        quiz_id: UUID of the quiz, parsed from the URL.

    Returns:
        200 with a ``quiz`` object (``quiz_id``, ``difficulty``,
        ``created_at``, ``questions``, ``source_note``); the status from
        ``verify_token`` on auth failure; 404 when the quiz does not exist or
        belongs to another user; 500 on unexpected errors; 503 when Supabase
        is unavailable.
    """
    # Checked before verify_token, which raises ConnectionError when the
    # client is missing.
    if not supabase:
        return jsonify({'error': 'Backend service unavailable'}), 503
    try:
        # --- Authentication ---
        user, error = verify_token(request.headers.get('Authorization'))
        if error:
            return jsonify({'error': error['error']}), error['status']

        # --- Query (filtered by owner so other users' quizzes look absent) ---
        quiz_res = (
            supabase.table('quizzes')
            .select(
                'id, difficulty, created_at, questions,\n'
                'notes(id, content, study_materials(title, type))'
            )
            .eq('id', quiz_id)
            .eq('user_id', user.id)
            .maybe_single()
            .execute()
        )
        # maybe_single() can yield None when no row matches; that is a 404,
        # not an internal error.
        quiz_data = quiz_res.data if quiz_res is not None else None
        if not quiz_data:
            return jsonify({'error': 'Quiz not found or unauthorized'}), 404

        # --- Format response ---
        # The embedded note (and its material) may be absent; fall back to
        # None values instead of failing on a subscript.
        note = quiz_data.get('notes') or {}
        material = note.get('study_materials')
        note_content = note.get('content')
        response_data = {
            "quiz": {
                "quiz_id": quiz_data['id'],
                "difficulty": quiz_data['difficulty'],
                "created_at": quiz_data['created_at'],
                "questions": quiz_data['questions'],
                "source_note": {
                    "note_id": note.get('id'),
                    "content_preview": note_content[:100] + "..." if note_content else None,
                    "material": {
                        "title": material['title'] if material else None,
                        "type": material['type'] if material else None
                    }
                }
            }
        }
        return jsonify(response_data)
    except Exception as e:
        logging.error(f"Error fetching quiz {quiz_id}: {str(e)}")
        return jsonify({'error': 'Internal server error'}), 500
@app.route('/api/tutor/quizzes/<uuid:quiz_id>/submit', methods=['POST'])
def submit_quiz_attempt(quiz_id):
    """Score a user's answers for a quiz and record the attempt.

    Expects a JSON object with ``answers``: a mapping from question key to
    the chosen option letter. A question's key is its ``id`` field when it
    has one, otherwise its zero-based index as a string.

    Args:
        quiz_id: UUID of the quiz, parsed from the URL.

    Returns:
        201 with ``attempt_id``, ``score`` (percentage, rounded to 2 places),
        ``correct_count``, ``total_questions``, ``correct_answers`` and
        per-question ``feedback``; 400 for a malformed body; 403 when the
        quiz belongs to another user; 404 when the quiz is missing; 500 on
        unexpected errors; 503 when Supabase is unavailable.
    """
    # Checked before verify_token, which raises ConnectionError when the
    # client is missing.
    if not supabase:
        return jsonify({'error': 'Backend service unavailable'}), 503
    user, error = verify_token(request.headers.get('Authorization'))
    if error:
        return jsonify({'error': error['error']}), error['status']
    try:
        # silent=True: a missing or malformed JSON body yields None.
        data = request.get_json(silent=True)
        user_answers = data.get('answers') if isinstance(data, dict) else None
        if not isinstance(user_answers, dict):
            return jsonify({'error': 'answers must be provided as a JSON object'}), 400

        # --- Fetch quiz ---
        quiz_res = (
            supabase.table('quizzes')
            .select('questions, user_id')
            .eq('id', quiz_id)
            .maybe_single()
            .execute()
        )
        # maybe_single() can yield None when no row matches.
        quiz_row = quiz_res.data if quiz_res is not None else None
        if not quiz_row:
            return jsonify({'error': 'Quiz not found'}), 404
        # The response includes the correct answers, so only the quiz owner
        # may submit (the quiz view endpoint is owner-only as well).
        if quiz_row.get('user_id') != user.id:
            return jsonify({'error': 'You do not have permission to access this quiz'}), 403

        quiz_questions = quiz_row['questions']
        # Questions may come back as a JSON-encoded string; decode if so.
        if isinstance(quiz_questions, str):
            quiz_questions = json.loads(quiz_questions)
        quiz_questions = quiz_questions or []

        # --- Score ---
        correct_count = 0
        total_questions = len(quiz_questions)
        feedback = {}
        correct_answers = {}
        for i, question in enumerate(quiz_questions):
            question_key = str(question.get('id', str(i)))
            user_answer = user_answers.get(question_key)
            correct_answer = question.get('correct_answer')
            # Unanswered questions are skipped: they get no feedback entry and
            # count as incorrect.
            if user_answer and correct_answer:
                # str() so a non-string answer is marked wrong, not a crash.
                is_correct = str(user_answer).strip().upper() == str(correct_answer).strip().upper()
                if is_correct:
                    correct_count += 1
                correct_answers[question_key] = correct_answer
                feedback[question_key] = {
                    "correct": is_correct,
                    "correct_answer": correct_answer,
                    "user_answer": user_answer
                }
        score = (correct_count / total_questions) * 100 if total_questions > 0 else 0.0

        # --- Save attempt ---
        attempt_res = supabase.table('quiz_attempts').insert({
            'quiz_id': str(quiz_id),
            'user_id': user.id,
            'score': score,
            'answers': json.dumps(user_answers)
        }).execute()
        if not attempt_res.data:
            raise Exception("Failed to save quiz attempt.")

        return jsonify({
            'success': True,
            'attempt_id': attempt_res.data[0]['id'],
            'score': round(score, 2),
            'correct_count': correct_count,
            'total_questions': total_questions,
            'correct_answers': correct_answers,
            'feedback': feedback
        }), 201
    except Exception as e:
        logging.error(f"Error submitting quiz: {traceback.format_exc()}")
        return jsonify({'error': str(e)}), 500
# Modified speak_notes endpoint with ElevenLabs Studio API and chunking
@app.route('/api/tutor/notes/<uuid:notes_id>/speak', methods=['POST'])
def speak_notes(notes_id):
    """Generate (or return already stored) text-to-speech audio for a note.

    Steps: authenticate the caller, load the note scoped to that caller,
    reject suspended profiles and profiles with fewer than 5 credits, return
    the stored ``tts_audio_url`` if there is one, otherwise synthesize the
    note text in 2000-character chunks with ElevenLabs, upload the result to
    the ``notes-audio`` bucket, save the public URL on the note and deduct
    5 credits.

    Args:
        notes_id: UUID of the note, parsed from the URL by Flask.

    Returns:
        A Flask JSON response: 200 with ``audio_url`` on success, 400 for
        empty note content, 402 for insufficient credits, 403 for a suspended
        account, 404 when the note is not found for this user, 503 when the
        Supabase or ElevenLabs client is unavailable, and 500 on any other
        failure.
    """
    user, error = verify_token(request.headers.get('Authorization'))
    if error:
        return jsonify({'error': error['error']}), error['status']
    if not supabase or not elevenlabs_client:
        return jsonify({'error': 'Backend service unavailable'}), 503

    try:
        # Filtering on user_id makes the lookup double as the ownership check.
        note_res = (
            supabase.table('notes')
            .select('user_id, content, tts_audio_url')
            .eq('id', notes_id)
            .eq('user_id', user.id)
            .maybe_single()
            .execute()
        )
        if note_res is None or not note_res.data:
            return jsonify({'error': 'Note not found or unauthorized'}), 404

        # Check user status and credits
        profile_res = (
            supabase.table('profiles')
            .select('credits, suspended')
            .eq('id', user.id)
            .single()
            .execute()
        )
        if profile_res.data['suspended']:
            return jsonify({'error': 'Account suspended'}), 403
        # NOTE(review): this check runs before the stored-audio shortcut below,
        # so a user with < 5 credits cannot fetch audio that already exists
        # even though that path charges nothing. Confirm this is intended.
        if profile_res.data['credits'] < 5:
            return jsonify({'error': 'Insufficient credits (Need 5)'}), 402

        # Return existing audio if available (no credits are deducted here).
        if note_res.data.get('tts_audio_url'):
            return jsonify({
                'success': True,
                'audio_url': note_res.data['tts_audio_url'],
                'message': 'Using existing audio file'
            })

        notes_content = note_res.data['content']
        if not notes_content:
            return jsonify({'error': 'Notes content is empty'}), 400

        # --- Generate TTS audio chunk by chunk ---
        CHUNK_SIZE = 2000
        audio_parts = []
        for start in range(0, len(notes_content), CHUNK_SIZE):
            chunk = notes_content[start:start + CHUNK_SIZE]
            try:
                chunk_audio = elevenlabs_client.generate(
                    text=chunk,
                    voice="Rachel",
                    model="eleven_multilingual_v2",
                    stream=True
                )
                # The streamed result is consumed inside the try so that
                # errors raised while iterating are reported the same way.
                audio_parts.extend(chunk_audio)
            except Exception as e:
                logging.error(f"Error generating chunk: {str(e)}")
                raise RuntimeError(f"Audio generation failed: {str(e)}") from e

        audio_bytes = b"".join(audio_parts)
        if not audio_bytes:
            raise RuntimeError("Generated empty audio file")

        # --- Save to Supabase Storage ---
        bucket_name = 'notes-audio'
        file_path = f'{user.id}/{notes_id}.mp3'
        url_saved = False
        try:
            upload_res = supabase.storage.from_(bucket_name).upload(
                path=file_path,
                file=audio_bytes,
                file_options={
                    'content-type': 'audio/mpeg',
                    'cache-control': '3600',
                    'upsert': 'true',
                    # Kept from the original, which describes it as required
                    # by the bucket's RLS policies.
                    'owner': user.id
                }
            )
            # Not every response object carries an `error` attribute, so it is
            # read defensively (same approach as the hasattr guard used by the
            # view endpoints).
            upload_error = getattr(upload_res, 'error', None)
            if upload_error:
                raise ConnectionError(upload_error.message)

            audio_url = supabase.storage.from_(bucket_name).get_public_url(file_path)

            # Store the URL on the note, again scoped to the owner.
            update_res = (
                supabase.table('notes')
                .update({'tts_audio_url': audio_url})
                .eq('id', notes_id)
                .eq('user_id', user.id)
                .execute()
            )
            update_error = getattr(update_res, 'error', None)
            if update_error:
                raise ConnectionError(update_error.message)
            url_saved = True

            # Deduct credits
            new_credits = profile_res.data['credits'] - 5
            credit_res = (
                supabase.table('profiles')
                .update({'credits': new_credits})
                .eq('id', user.id)
                .execute()
            )
            credit_error = getattr(credit_res, 'error', None)
            if credit_error:
                raise ConnectionError(credit_error.message)

            return jsonify({
                'success': True,
                'audio_url': audio_url,
                'remaining_credits': new_credits
            })
        except Exception:
            # Best-effort rollback: drop the uploaded file and, if the URL was
            # already written, clear it so the note does not point at a file
            # that no longer exists. A failure here is only logged so it
            # cannot hide the error that triggered the rollback.
            try:
                supabase.storage.from_(bucket_name).remove([file_path])
                if url_saved:
                    (
                        supabase.table('notes')
                        .update({'tts_audio_url': None})
                        .eq('id', notes_id)
                        .eq('user_id', user.id)
                        .execute()
                    )
            except Exception:
                logging.error(f"Speak cleanup failed: {traceback.format_exc()}")
            raise
    except Exception as e:
        logging.error(f"Speak error: {traceback.format_exc()}")
        return jsonify({'error': str(e)}), 500
# New endpoint to view existing audio URL
@app.route('/api/tutor/notes/<uuid:notes_id>/audio', methods=['GET'])
def get_note_audio(notes_id):
    """Return the stored text-to-speech audio URL for a note.

    Args:
        notes_id: UUID of the note, parsed from the URL by Flask.

    Returns:
        A Flask JSON response: 200 with ``audio_url``; 404 when the note does
        not exist or has no audio; 403 when the note belongs to another user;
        503 when the Supabase client is unavailable; 500 on any other failure.
    """
    user, error = verify_token(request.headers.get('Authorization'))
    if error:
        return jsonify({'error': error['error']}), error['status']
    if not supabase:
        return jsonify({'error': 'Backend service unavailable'}), 503
    try:
        # maybe_single() rather than single(): single() raises when no row
        # matches, which made the 404 branch below unreachable and turned a
        # missing note into a 500.
        notes_res = (
            supabase.table('notes')
            .select('tts_audio_url, user_id')
            .eq('id', notes_id)
            .maybe_single()
            .execute()
        )
        if notes_res is None or not notes_res.data:
            return jsonify({'error': 'Notes not found'}), 404
        if notes_res.data['user_id'] != user.id:
            return jsonify({'error': 'Unauthorized access'}), 403
        if not notes_res.data['tts_audio_url']:
            return jsonify({'error': 'No audio available for these notes'}), 404
        return jsonify({
            'success': True,
            'audio_url': notes_res.data['tts_audio_url']
        })
    except Exception as e:
        logging.error(f"Error getting audio URL: {str(e)}")
        return jsonify({'error': str(e)}), 500
# ---------- View Notes and Quizzes Endpoints ----------
@app.route('/api/view/notes', methods=['GET'])
def view_notes():
    """List the authenticated user's notes, newest first.

    Returns:
        A Flask JSON response ``{"notes": [...]}`` where each entry carries
        ``note_id``, ``content``, ``audio_url``, ``created_at``,
        ``material_title`` and ``material_type``; or an error payload with
        the status from ``verify_token`` / 500 on failure.
    """
    try:
        user, error = verify_token(request.headers.get('Authorization'))
        if error:
            return jsonify({'error': error['error']}), error['status']

        result = (
            supabase.table('notes')
            .select('id, content, created_at, tts_audio_url, study_materials(title, type)')
            .eq('user_id', user.id)
            .order('created_at', desc=True)
            .execute()
        )
        # The response may or may not expose an `error` attribute.
        query_error = getattr(result, 'error', None)
        if query_error:
            raise Exception(query_error.message)

        def as_payload(row):
            """Map one notes row to the shape returned to the client."""
            entry = {
                "note_id": row['id'],
                "content": row['content'],
                "audio_url": row['tts_audio_url'],
                "created_at": row['created_at'],
            }
            # study_materials is the joined relation; it is falsy when the
            # note has no linked material.
            material = row['study_materials']
            entry["material_title"] = material['title'] if material else "Untitled Note"
            entry["material_type"] = material['type'] if material else None
            return entry

        return jsonify({"notes": [as_payload(row) for row in result.data]})
    except Exception as e:
        message = str(e)
        print(f"Error in /api/view/notes: {message}")
        logging.error(f"Notes endpoint error: {message}")
        logging.error(traceback.format_exc())
        return jsonify({'error': message}), 500
@app.route('/api/view/quizzes', methods=['GET'])
def view_quizzes():
    """List the authenticated user's quizzes, newest first.

    Returns:
        A Flask JSON response ``{"quizzes": [...]}`` where each entry carries
        ``quiz_id``, ``difficulty``, ``created_at``, ``notes_preview`` (first
        100 characters of the notes plus "..."), ``material_title`` and
        ``material_type``; or an error payload with the status from
        ``verify_token`` / 500 on failure.
    """
    try:
        user, error = verify_token(request.headers.get('Authorization'))
        if error:
            return jsonify({'error': error['error']}), error['status']

        result = (
            supabase.table('quizzes')
            .select('id, difficulty, created_at, notes(content, study_materials(title, type))')
            .eq('user_id', user.id)
            .order('created_at', desc=True)
            .execute()
        )
        # The response may or may not expose an `error` attribute.
        query_error = getattr(result, 'error', None)
        if query_error:
            raise Exception(query_error.message)

        def as_payload(row):
            """Map one quizzes row to the shape returned to the client."""
            entry = {
                "quiz_id": row['id'],
                "difficulty": row['difficulty'],
                "created_at": row['created_at'],
            }
            notes = row['notes']
            if notes and notes['content']:
                entry["notes_preview"] = notes['content'][:100] + "..."
            else:
                entry["notes_preview"] = None
            # Both joins can be empty, so fall back to defaults in that case.
            material = notes['study_materials'] if notes else None
            entry["material_title"] = material['title'] if material else "Untitled Quiz"
            entry["material_type"] = material['type'] if material else None
            return entry

        return jsonify({"quizzes": [as_payload(row) for row in result.data]})
    except Exception as e:
        message = str(e)
        print(f"Error in /api/view/quizzes: {message}")
        logging.error(f"Quizzes endpoint error: {message}")
        logging.error(traceback.format_exc())
        return jsonify({'error': message}), 500
@app.route('/api/user/performance', methods=['GET'])
def get_user_performance():
    """Retrieve the user's recent quiz performance and simple suggestions.

    Loads up to 20 of the caller's most recent quiz attempts (with the joined
    quiz, notes and study-material context), averages their scores and
    derives a short list of textual suggestions from that average.

    Returns:
        A Flask JSON response with ``success``, ``average_score`` (rounded to
        two decimals, or ``None`` when there are no attempts),
        ``recent_attempts`` and ``suggestions``; or an error payload with the
        status from ``verify_token``, 503 when Supabase is unavailable, or
        500 on failure.
    """
    user, error = verify_token(request.headers.get('Authorization'))
    if error:
        return jsonify({'error': error['error']}), error['status']
    if not supabase:
        return jsonify({'error': 'Backend service unavailable'}), 503
    try:
        # --- Fetch recent quiz attempts, joined to get context ---
        # The embedded-resource list must have balanced parentheses; the
        # previous string carried one extra ')'.
        attempts_res = (
            supabase.table('quiz_attempts')
            .select('id, quiz_id, score, submitted_at, quizzes(difficulty, notes(material_id, study_materials(type, title)))')
            .eq('user_id', user.id)
            .order('submitted_at', desc=True)
            .limit(20)
            .execute()
        )
        # Read `error` defensively: the response may not define it.
        fetch_error = getattr(attempts_res, 'error', None)
        if fetch_error:
            raise Exception(f"Failed to fetch quiz attempts: {fetch_error}")
        attempts_data = attempts_res.data

        # --- Basic Analysis ---
        average_score = 0.0
        suggestions = []
        if attempts_data:
            average_score = sum(a['score'] for a in attempts_data) / len(attempts_data)
            if average_score < 60:
                suggestions.append("Your average score is a bit low. Try reviewing the notes more thoroughly before taking quizzes.")
                # The query already returns attempts newest first, so the
                # first low-scoring attempt is the most recent one.
                latest_low = next((a for a in attempts_data if a['score'] < 60), None)
                if latest_low:
                    # Any level of the join may be missing; walk it safely.
                    quiz_info = latest_low.get('quizzes') or {}
                    notes_info = quiz_info.get('notes') or {}
                    material_info = notes_info.get('study_materials') or {}
                    if material_info.get('title'):
                        suggestions.append(f"Focus on reviewing the material titled: '{material_info['title']}'.")
            elif average_score > 85:
                suggestions.append("Great job on your recent quizzes! Consider trying 'hard' difficulty quizzes or exploring new topics.")
            else:
                suggestions.append("You're making good progress! Keep practicing to solidify your understanding.")

        return jsonify({
            'success': True,
            'average_score': round(average_score, 2) if attempts_data else None,
            'recent_attempts': attempts_data,
            'suggestions': suggestions
        })
    except Exception as e:
        logging.error(f"Unexpected error fetching performance for user {user.id}: {traceback.format_exc()}")
        return jsonify({'error': f'An unexpected error occurred: {e}'}), 500
# === Admin Endpoints (Adapted for Supabase) ===
@app.route('/api/admin/users', methods=['GET'])
def admin_list_users():
    """Return every row of the ``profiles`` table to an admin caller.

    Returns:
        A Flask JSON response ``{"users": [...]}`` with status 200; or an
        error payload with the status supplied by ``verify_token`` /
        ``verify_admin``, or 500 on failure.
    """
    user, failure = verify_token(request.headers.get('Authorization'))
    if not failure:
        # Only run the admin check once the token itself is accepted.
        _, failure = verify_admin(user)
    if failure:
        return jsonify({'error': failure['error']}), failure['status']

    try:
        all_profiles = supabase.table('profiles').select('*').execute()
    except Exception as e:
        logging.error(f"Admin list users error: {e}")
        return jsonify({'error': str(e)}), 500
    return jsonify({'users': all_profiles.data}), 200
@app.route('/api/admin/users/<uuid:target_user_id>/suspend', methods=['PUT'])
def admin_suspend_user(target_user_id):
    """Set or clear the ``suspended`` flag on a user's profile (admin only).

    Expects a JSON body ``{"action": "suspend" | "unsuspend"}``.

    Args:
        target_user_id: UUID of the profile to update, parsed from the URL.

    Returns:
        A Flask JSON response: 200 on success; 400 when ``action`` is missing
        or invalid (including a missing or non-object JSON body); 404 when no
        such profile exists; the status from ``verify_token`` /
        ``verify_admin`` on auth failure; 500 on any other failure.
    """
    user, error = verify_token(request.headers.get('Authorization'))
    if error:
        return jsonify({'error': error['error']}), error['status']
    _, admin_error = verify_admin(user)
    if admin_error:
        return jsonify({'error': admin_error['error']}), admin_error['status']
    try:
        # silent=True yields None instead of raising on a missing or malformed
        # body, so bad input ends in the 400 below rather than a 500.
        payload = request.get_json(silent=True)
        action = payload.get('action') if isinstance(payload, dict) else None
        if action not in ["suspend", "unsuspend"]:
            return jsonify({'error': 'action must be "suspend" or "unsuspend"'}), 400
        should_suspend = (action == "suspend")

        update_res = (
            supabase.table('profiles')
            .update({'suspended': should_suspend})
            .eq('id', target_user_id)
            .execute()
        )
        if not update_res.data:
            # No row came back: either the profile does not exist or the
            # update failed for another reason. Distinguish the two.
            user_check = (
                supabase.table('profiles')
                .select('id')
                .eq('id', target_user_id)
                .maybe_single()
                .execute()
            )
            if user_check is None or not user_check.data:
                return jsonify({'error': 'User not found'}), 404
            # `error` is read defensively: the response may not define it.
            raise Exception(f"Failed to update suspension status: {getattr(update_res, 'error', None)}")
        return jsonify({'success': True, 'message': f'User {target_user_id} suspension status set to {should_suspend}'}), 200
    except Exception as e:
        logging.error(f"Admin suspend user error: {e}")
        return jsonify({'error': str(e)}), 500
# Add other admin endpoints (update credits, view specific data) similarly.
# === Credit Management Endpoints ===
@app.route('/api/user/credits/request', methods=['POST'])
def request_credits():
    """Create a pending credit request for the authenticated user.

    Expects a JSON body ``{"amount": <positive int>, "note": <optional>}``.

    Returns:
        A Flask JSON response: 201 with ``request_id`` on success; 400 when
        ``amount`` is not a positive integer (including a missing or
        non-object JSON body); the status from ``verify_token`` on auth
        failure; 500 on any other failure.
    """
    user, error = verify_token(request.headers.get('Authorization'))
    if error:
        return jsonify({'error': error['error']}), error['status']
    try:
        # silent=True yields None instead of raising on a missing or malformed
        # body, so bad input ends in the 400 below rather than a 500.
        data = request.get_json(silent=True)
        if not isinstance(data, dict):
            data = {}
        amount = data.get('amount')
        note = data.get('note', '')
        # bool is a subclass of int, so JSON `true` must be rejected
        # explicitly; otherwise it would be stored as the amount.
        if isinstance(amount, bool) or not isinstance(amount, int) or amount <= 0:
            return jsonify({'error': 'Invalid amount (must be positive integer)'}), 400
        res = supabase.table('credit_requests').insert({
            'user_id': user.id,
            'amount': amount,
            'status': 'pending',
            'note': note,
            # NOTE(review): naive local timestamp; confirm the column's
            # timezone expectations.
            'created_at': datetime.now().isoformat()
        }).execute()
        return jsonify({
            'success': True,
            'request_id': res.data[0]['id']
        }), 201
    except Exception as e:
        logging.error(f"Credit request failed for user {user.id}: {e}")
        return jsonify({'error': str(e)}), 500
@app.route('/api/admin/credit-requests', methods=['GET'])
def admin_get_credit_requests():
    """List credit requests with a given status for an admin caller.

    The status is taken from the ``status`` query parameter and defaults to
    ``pending``.

    Returns:
        A Flask JSON response containing the matching rows with status 200;
        or an error payload with the status supplied by ``verify_token`` /
        ``verify_admin``, or 500 on failure.
    """
    user, failure = verify_token(request.headers.get('Authorization'))
    if not failure:
        # Only run the admin check once the token itself is accepted.
        _, failure = verify_admin(user)
    if failure:
        return jsonify({'error': failure['error']}), failure['status']

    try:
        wanted_status = request.args.get('status', 'pending')
        matching = (
            supabase.table('credit_requests')
            .select('*')
            .eq('status', wanted_status)
            .execute()
        )
        return jsonify(matching.data), 200
    except Exception as e:
        logging.error(f"Admin credit requests fetch failed: {e}")
        return jsonify({'error': str(e)}), 500
@app.route('/api/admin/credit-requests/<uuid:request_id>', methods=['PUT'])
def admin_review_credit_request(request_id):
    """Approve or decline a pending credit request (admin only).

    Expects a JSON body ``{"action": "approve" | "decline", "note": <optional>}``.
    On approval the requested amount is added to the requesting user's
    ``credits``; in both cases the request row is stamped with the new
    status, review time, reviewer id and admin note.

    Args:
        request_id: UUID of the credit request, parsed from the URL.

    Returns:
        A Flask JSON response: 200 on success; 400 for an invalid action
        (including a missing or non-object JSON body) or an already processed
        request; 404 when the request or the target profile does not exist;
        the status from ``verify_token`` / ``verify_admin`` on auth failure;
        500 on any other failure.
    """
    user, error = verify_token(request.headers.get('Authorization'))
    if error:
        return jsonify({'error': error['error']}), error['status']
    _, admin_error = verify_admin(user)
    if admin_error:
        return jsonify({'error': admin_error['error']}), admin_error['status']
    try:
        # silent=True yields None instead of raising on a missing or malformed
        # body, so bad input ends in the 400 below rather than a 500.
        data = request.get_json(silent=True)
        if not isinstance(data, dict):
            data = {}
        action = data.get('action')
        admin_note = data.get('note', '')
        if action not in ['approve', 'decline']:
            return jsonify({'error': 'Invalid action'}), 400

        req_res = (
            supabase.table('credit_requests')
            .select('*')
            .eq('id', request_id)
            .maybe_single()
            .execute()
        )
        if req_res is None or not req_res.data:
            return jsonify({'error': 'Request not found'}), 404
        req = req_res.data
        if req['status'] != 'pending':
            return jsonify({'error': 'Request already processed'}), 400

        update_data = {
            'status': 'approved' if action == 'approve' else 'declined',
            'reviewed_at': datetime.now().isoformat(),
            'reviewed_by': user.id,
            'admin_note': admin_note
        }

        if action == 'approve':
            # The query builder has no `.credits` attribute, so the current
            # balance has to be read before the new one can be written.
            # NOTE(review): this read-modify-write is not atomic; concurrent
            # credit changes for the same user could be lost.
            profile_res = (
                supabase.table('profiles')
                .select('credits')
                .eq('id', req['user_id'])
                .maybe_single()
                .execute()
            )
            if profile_res is None or not profile_res.data:
                return jsonify({'error': 'User profile not found'}), 404
            new_credits = (profile_res.data.get('credits') or 0) + req['amount']
            (
                supabase.table('profiles')
                .update({'credits': new_credits})
                .eq('id', req['user_id'])
                .execute()
            )

        supabase.table('credit_requests').update(update_data).eq('id', request_id).execute()
        return jsonify({'success': True}), 200
    except Exception as e:
        logging.error(f"Credit request processing failed: {e}")
        return jsonify({'error': str(e)}), 500
# === Main Execution ===
if __name__ == '__main__':
    # Warn (but still start) when configuration is incomplete.
    if not all([SUPABASE_URL, SUPABASE_SERVICE_KEY, GEMINI_API_KEY, ELEVENLABS_API_KEY]):
        print("WARNING: One or more essential environment variables (SUPABASE_URL, SUPABASE_SERVICE_KEY, GEMINI_API_KEY, ELEVENLABS_API_KEY) are missing!")
    print("Starting Flask server for AI Tutor...")
    # Debug mode enables the interactive Werkzeug debugger, which lets anyone
    # who can reach the server run arbitrary code. Because the server binds to
    # 0.0.0.0, debug is now opt-in via FLASK_DEBUG instead of always on.
    # Use Gunicorn or Waitress for production instead of app.run().
    debug_enabled = os.getenv("FLASK_DEBUG", "").strip().lower() in ("1", "true", "yes", "on")
    app.run(debug=debug_enabled, host="0.0.0.0", port=7860)