|
|
|
|
|
import io |
|
|
import uuid |
|
|
import re |
|
|
import time |
|
|
import tempfile |
|
|
import requests |
|
|
import json |
|
|
import os |
|
|
import logging |
|
|
import traceback |
|
|
from datetime import datetime |
|
|
from pathlib import Path |
|
|
import urllib.parse |
|
|
|
|
|
from flask import Flask, request, jsonify, send_file, Response |
|
|
from flask_cors import CORS |
|
|
from supabase import create_client, Client |
|
|
|
|
|
|
|
|
import google.generativeai as genai |
|
|
from elevenlabs.client import ElevenLabs |
|
|
from elevenlabs import save as save_elevenlabs_audio |
|
|
from PyPDF2 import PdfReader |
|
|
import wikipedia |
|
|
from youtube_transcript_api import YouTubeTranscriptApi |
|
|
import arxiv |
|
|
from elevenlabs import play, stream, save |
|
|
import math |
|
|
import pydub |
|
|
import logging |
|
|
import traceback |
|
|
import uuid |
|
|
from io import BytesIO |
|
|
|
|
|
|
|
|
|
|
|
# Load environment variables from a local .env file (no-op when absent).
from dotenv import load_dotenv

load_dotenv()

# Service credentials, all sourced from the environment; any of these may be
# None, which the init blocks below turn into a disabled client.
SUPABASE_URL = os.getenv("SUPABASE_URL")
SUPABASE_SERVICE_KEY = os.getenv("SUPABASE_SERVICE_KEY")  # NOTE(review): presumably the privileged service-role key — must stay server-side
SUPABASE_ANON_KEY = os.getenv("SUPABASE_ANON_KEY")
GEMINI_API_KEY = os.getenv("GEMINI_API_KEY")
ELEVENLABS_API_KEY = os.getenv("ELEVENLABS_API_KEY")

# Flask application with CORS enabled (default: all origins, all routes).
app = Flask(__name__)
CORS(app)
|
|
|
|
|
|
|
|
# Initialize the Supabase client at import time. On failure the app still
# starts; request handlers check `supabase` for None and answer 503.
try:
    if not SUPABASE_URL or not SUPABASE_SERVICE_KEY:
        raise ValueError("Supabase URL and Service Key must be set in environment variables.")
    supabase: Client = create_client(SUPABASE_URL, SUPABASE_SERVICE_KEY)
    print("Supabase client initialized successfully.")
except Exception as e:
    print(f"Error initializing Supabase client: {e}")
    # Sentinel: downstream code treats None as "service unavailable".
    supabase = None
|
|
|
|
|
|
|
|
# Initialize the Gemini API and one shared model instance; handlers check
# `gemini_model` for None before generating.
try:
    if not GEMINI_API_KEY:
        raise ValueError("Gemini API Key must be set in environment variables.")
    genai.configure(api_key=GEMINI_API_KEY)
    # NOTE(review): experimental model id — confirm it is still served before deploying.
    gemini_model = genai.GenerativeModel('gemini-2.0-flash-thinking-exp')
    print("Gemini API initialized successfully.")
except Exception as e:
    print(f"Error initializing Gemini API: {e}")
    gemini_model = None
|
|
|
|
|
|
|
|
# Initialize the ElevenLabs TTS client; handlers check for None on use.
try:
    if not ELEVENLABS_API_KEY:
        raise ValueError("ElevenLabs API Key must be set in environment variables.")
    elevenlabs_client = ElevenLabs(api_key=ELEVENLABS_API_KEY)
    print("ElevenLabs client initialized successfully.")
except Exception as e:
    print(f"Error initializing ElevenLabs client: {e}")
    elevenlabs_client = None

# File-based application log; /tmp is writable in most container/serverless hosts.
LOG_FILE_PATH = "/tmp/ai_tutor.log"
logging.basicConfig(filename=LOG_FILE_PATH, level=logging.INFO, format="%(asctime)s - %(levelname)s - %(message)s")
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def verify_token(auth_header):
    """Verify a Supabase JWT taken from an Authorization header.

    Returns a (user, error) pair: on success `user` is the Supabase user
    object and `error` is None; on failure `user` is None and `error` is a
    dict with 'error' and 'status' keys ready to turn into a JSON response.
    Raises ConnectionError if the Supabase client never initialized.
    """
    if not supabase:
        raise ConnectionError("Supabase client not initialized.")

    # Reject anything that is not a "Bearer <token>" header up front.
    has_bearer = bool(auth_header) and auth_header.startswith('Bearer ')
    if not has_bearer:
        return None, {'error': 'Missing or invalid Authorization header', 'status': 401}

    jwt = auth_header.split(' ')[1]
    try:
        auth_response = supabase.auth.get_user(jwt)
        if auth_response.user:
            return auth_response.user, None
        return None, {'error': 'Invalid or expired token', 'status': 401}
    except Exception as exc:
        logging.error(f"Token verification error: {exc}")
        return None, {'error': f'Token verification failed: {exc}', 'status': 401}
|
|
|
|
|
def verify_admin(user):
    """Return (True, None) when `user` has the is_admin flag in profiles.

    On any other outcome returns (False, error_dict) where error_dict carries
    an 'error' message and HTTP 'status'. Raises ConnectionError when the
    Supabase client is unavailable.
    """
    if not supabase:
        raise ConnectionError("Supabase client not initialized.")
    if not user:
        return False, {'error': 'User not provided for admin check', 'status': 400}
    try:
        # Look up only the admin flag for this user's profile row.
        row = (
            supabase.table('profiles')
            .select('is_admin')
            .eq('id', user.id)
            .maybe_single()
            .execute()
            .data
        )
        if row and row.get('is_admin'):
            return True, None
        return False, {'error': 'Admin access required', 'status': 403}
    except Exception as exc:
        logging.error(f"Admin check failed for user {user.id}: {exc}")
        return False, {'error': f'Error checking admin status: {exc}', 'status': 500}
|
|
|
|
|
|
|
|
def upload_to_supabase_storage(bucket_name: str, file_path: str, destination_path: str, content_type: str):
    """Upload a local file to Supabase Storage and return its public URL.

    Re-raises any upload/URL error after logging it; raises ConnectionError
    when the Supabase client is unavailable.
    """
    if not supabase:
        raise ConnectionError("Supabase client not initialized.")
    try:
        bucket = supabase.storage.from_(bucket_name)
        # Upsert so re-processing the same destination overwrites in place.
        options = {"content-type": content_type, "cache-control": "3600", "upsert": "true"}
        with open(file_path, 'rb') as handle:
            bucket.upload(path=destination_path, file=handle, file_options=options)
        return bucket.get_public_url(destination_path)
    except Exception as exc:
        logging.error(f"Supabase Storage upload failed: {exc}")
        raise
|
|
|
|
|
|
|
|
|
|
|
def get_pdf_text(pdf_file_storage):
    """Extract text from a PDF file stream.

    Args:
        pdf_file_storage: Binary file-like object containing the PDF bytes.

    Returns:
        Concatenated page text (newline after each page), capped at
        MAX_CHARS characters.

    Raises:
        ValueError: If the PDF cannot be parsed.
    """
    MAX_CHARS = 300000
    try:
        pdf_reader = PdfReader(pdf_file_storage)
        # Collect page texts and join once at the end; the original built the
        # result with repeated `text +=`, which is quadratic on big documents.
        pages = []
        for page in pdf_reader.pages:
            page_text = page.extract_text()
            if page_text:
                pages.append(page_text)
        # Each extracted page is followed by "\n", matching the old output.
        text = "".join(p + "\n" for p in pages)
        return text[:MAX_CHARS]
    except Exception as e:
        logging.error(f"Error reading PDF: {e}")
        raise ValueError(f"Could not process PDF file: {e}")
|
|
|
|
|
def get_youtube_transcript(url):
    """Fetch and join the transcript text for a YouTube video URL.

    Accepts both watch URLs ("...?v=ID") and short links ("youtu.be/ID").
    Returns the transcript capped at MAX_CHARS; raises ValueError on any
    failure (bad URL format or transcript retrieval error).
    """
    MAX_CHARS = 300000
    try:
        # Extract the video id from either supported URL shape.
        if "v=" in url:
            clip_id = url.split("v=")[1].split("&")[0]
        elif "youtu.be/" in url:
            clip_id = url.split("youtu.be/")[1].split("?")[0]
        else:
            raise ValueError("Invalid YouTube URL format.")

        segments = YouTubeTranscriptApi.get_transcript(clip_id)
        joined = " ".join(segment['text'] for segment in segments)
        return joined[:MAX_CHARS]
    except Exception as exc:
        logging.error(f"Error getting YouTube transcript for {url}: {exc}")
        raise ValueError(f"Could not get transcript: {exc}")
|
|
|
|
|
def get_wiki_content(url):
    """Fetch the full article content for a Wikipedia URL.

    Derives the article title from the last path segment, fetches the English
    page without auto-suggest, and returns its content capped at MAX_CHARS.
    Raises ValueError for missing/ambiguous pages or any other failure.
    """
    MAX_CHARS = 300000
    try:
        # Last path segment of the URL is the (percent-encoded) article title.
        page_title = urllib.parse.unquote(url.rstrip("/").split("/")[-1]).replace("_", " ")
        wikipedia.set_lang("en")
        article = wikipedia.page(page_title, auto_suggest=False)
        return article.content[:MAX_CHARS]
    except wikipedia.exceptions.PageError:
        raise ValueError(f"Wikipedia page '{page_title}' not found.")
    except wikipedia.exceptions.DisambiguationError as exc:
        raise ValueError(f"'{page_title}' refers to multiple pages: {exc.options}")
    except Exception as exc:
        logging.error(f"Error getting Wikipedia content for {url}: {exc}")
        raise ValueError(f"Could not get Wikipedia content: {exc}")
|
|
|
|
|
def fetch_bible_text(reference):
    """Fetch Bible text from an external API (bible-api.com, KJV translation).

    Args:
        reference: Bible reference string, e.g. "John 3:16".

    Returns:
        The passage text, capped at MAX_CHARS characters.

    Raises:
        ConnectionError: If the HTTP request to the Bible API fails.
        ValueError: If the reference is invalid, empty, or the response
            format is unrecognized.
    """
    MAX_CHARS = 300000
    try:
        query = urllib.parse.quote(reference)
        api_url = f"https://bible-api.com/{query}?translation=kjv"
        response = requests.get(api_url, timeout=15)
        response.raise_for_status()
        data = response.json()

        if 'text' in data:
            text = data['text'].strip()
            return text[:MAX_CHARS]
        elif 'error' in data:
            raise ValueError(f"Bible API error: {data['error']}")
        else:
            # Some responses omit the flattened 'text' field but carry a
            # per-verse list; join those instead.
            if 'verses' in data and isinstance(data['verses'], list):
                text = " ".join([v.get('text', '').strip() for v in data['verses']])
                if not text:
                    # BUG FIX: the original *returned* a ValueError instance
                    # here (`... else ValueError(...)`) instead of raising it,
                    # so callers received an exception object as "text".
                    raise ValueError("Bible reference not found or empty.")
                return text[:MAX_CHARS]
            else:
                raise ValueError("Bible API response format not recognized.")

    except requests.exceptions.RequestException as e:
        logging.error(f"Error fetching Bible text for '{reference}': {e}")
        raise ConnectionError(f"Could not connect to Bible API: {e}")
    except Exception as e:
        # Note: ValueErrors raised above are re-wrapped here, matching the
        # original control flow.
        logging.error(f"Error processing Bible reference '{reference}': {e}")
        raise ValueError(f"Could not process Bible reference: {e}")
|
|
|
|
|
|
|
|
def get_arxiv_content(arxiv_id):
    """Fetch title + abstract for an ArXiv paper.

    Accepts a bare identifier or a full arxiv.org /abs/ or /pdf/ URL.
    Returns a (content, title) tuple with content capped at MAX_CHARS;
    raises ValueError when the paper is missing or the lookup fails.
    """
    MAX_CHARS = 300000
    try:
        # Normalize full arxiv.org URLs down to the bare identifier.
        if 'arxiv.org/abs/' in arxiv_id:
            arxiv_id = arxiv_id.split('/abs/')[-1]
        if 'arxiv.org/pdf/' in arxiv_id:
            arxiv_id = arxiv_id.split('/pdf/')[-1].replace('.pdf', '')

        lookup = arxiv.Search(id_list=[arxiv_id])
        paper = next(lookup.results())

        summary_text = f"Title: {paper.title}\n\nAbstract: {paper.summary}"
        return summary_text[:MAX_CHARS], paper.title
    except StopIteration:
        raise ValueError(f"ArXiv paper with ID '{arxiv_id}' not found.")
    except Exception as exc:
        logging.error(f"Error fetching ArXiv content for {arxiv_id}: {exc}")
        raise ValueError(f"Could not get ArXiv content: {exc}")
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def generate_notes_with_gemini(text_content, title=None):
    """Generates study notes using Gemini.

    Args:
        text_content: Source text to summarize (truncated by callers upstream).
        title: Optional title woven into the prompt for context.

    Returns:
        Generated Markdown notes as a stripped string.

    Raises:
        ConnectionError: If the Gemini client never initialized.
        RuntimeError: If generation fails for any reason.
    """
    if not gemini_model:
        raise ConnectionError("Gemini client not initialized.")
    try:
        # Single-shot prompt: role + instructions + delimited source text.
        prompt = f"""
Act as an expert educator and study assistant. Based on the following text {'titled "' + title + '" ' if title else ''} , generate comprehensive and well-structured study notes.

**Instructions:**
1. **Identify Key Concepts:** Extract the main topics, definitions, key figures, dates, arguments, and important takeaways.
2. **Structure Logically:** Organize the notes with clear headings (using Markdown ##) and bullet points (* or -) for readability. Use sub-bullets if necessary.
3. **Be Concise but Thorough:** Summarize the information accurately without unnecessary jargon. Ensure all critical points are covered.
4. **Highlight Importance:** You can use bold text (**bold**) for very important terms or concepts.
5. **Focus:** Generate only the notes based on the provided text. Do not add introductions like "Here are the notes..." or conclusions like "These notes cover...".

**Source Text:**
---
{text_content}
---

**Generated Study Notes:**
"""
        response = gemini_model.generate_content(prompt)
        return response.text.strip()
    except Exception as e:
        logging.error(f"Gemini note generation failed: {e}")
        raise RuntimeError(f"AI failed to generate notes: {e}")
|
|
|
|
|
def generate_quiz_with_gemini(notes_content, difficulty, num_questions=5):
    """Generates a multiple-choice quiz from study notes using Gemini.

    Args:
        notes_content: The notes text the quiz should be based on.
        difficulty: "easy" | "medium" | "hard" (anything else maps to a
            generic medium description in the prompt).
        num_questions: Number of questions to request (default 5).

    Returns:
        A list of question dicts with "question", "options" (A-D map) and
        "correct_answer" keys, parsed from the model's JSON output.

    Raises:
        ConnectionError: If the Gemini client never initialized.
        RuntimeError: If generation fails or returns unusable JSON.
    """
    if not gemini_model:
        raise ConnectionError("Gemini client not initialized.")

    difficulty_map = {
        "easy": "basic recall and understanding",
        "medium": "application and interpretation",
        "hard": "analysis, synthesis, and evaluation"
    }
    difficulty_desc = difficulty_map.get(difficulty.lower(), "medium difficulty")

    cleaned_response = ""  # kept in scope for the JSONDecodeError log below
    try:
        prompt = f"""
Act as an expert quiz creator. Based on the following study notes, create a multiple-choice quiz.

**Instructions:**
1. **Number of Questions:** Generate exactly {num_questions} questions.
2. **Difficulty Level:** The questions should be of {difficulty_desc} ({difficulty}).
3. **Format:** Each question must have exactly four options (A, B, C, D).
4. **Clarity:** Questions and options should be clear and unambiguous.
5. **Single Correct Answer:** Ensure only one option is the correct answer.
6. **JSON Output:** Format the entire output STRICTLY as a JSON list of objects. Each object must have the following keys: "question" (string), "options" (an object with keys "A", "B", "C", "D", all strings), and "correct_answer" (string, either "A", "B", "C", or "D").
7. **Focus:** Generate only the JSON output. Do not include any introductory text, explanations, or markdown formatting outside the JSON structure.

**Study Notes:**
---
{notes_content}
---

**Quiz JSON Output:**
```json
[
  {{
    "question": "...",
    "options": {{
      "A": "...",
      "B": "...",
      "C": "...",
      "D": "..."
    }},
    "correct_answer": "..."
  }}
  // ... more question objects
]
```
"""
        response = gemini_model.generate_content(prompt)

        # BUG FIX: the original used .lstrip('```json') / .rstrip('```'),
        # which strip *character sets* (backticks, j, s, o, n), not literal
        # prefixes/suffixes — that can eat legitimate leading/trailing JSON
        # characters. Extract a fenced block explicitly instead.
        raw = response.text.strip()
        fenced = re.search(r"```(?:json)?\s*(.*?)\s*```", raw, re.DOTALL)
        cleaned_response = fenced.group(1) if fenced else raw

        quiz_data = json.loads(cleaned_response)

        if not isinstance(quiz_data, list):
            raise ValueError("AI response is not a list.")
        # Validate every item, not just the first one as the original did.
        required_keys = ("question", "options", "correct_answer")
        if quiz_data and not all(
            all(k in item for k in required_keys) for item in quiz_data
        ):
            raise ValueError("AI response list items have missing keys.")
        return quiz_data
    except json.JSONDecodeError as e:
        logging.error(f"Gemini quiz generation returned invalid JSON: {cleaned_response[:500]}... Error: {e}")
        raise RuntimeError(f"AI failed to generate a valid quiz format. Please try again.")
    except Exception as e:
        # Includes the ValueErrors raised above (same wrapping as original).
        logging.error(f"Gemini quiz generation failed: {e}")
        raise RuntimeError(f"AI failed to generate quiz: {e}")
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
@app.route('/api/auth/signup', methods=['POST'])
def signup():
    """Create a new auth user and seed a profiles row with starter credits.

    Expects JSON {"email": ..., "password": ...}. Returns 201 on success,
    400/409 on validation or duplicate-email errors, 503 if Supabase is down.
    """
    if not supabase: return jsonify({'error': 'Service unavailable'}), 503
    try:
        data = request.get_json()
        email = data.get('email')
        password = data.get('password')
        if not email or not password:
            return jsonify({'error': 'Email and password are required'}), 400

        res = supabase.auth.sign_up({"email": email, "password": password})

        # BUG FIX: res.user can be None (the success response below already
        # guards for that), but the original dereferenced res.user.id here
        # unconditionally, raising AttributeError. Only seed the profile
        # when a user object actually came back.
        if res.user:
            # Upsert so a retried signup doesn't fail on a duplicate row;
            # new accounts start with 20 credits.
            supabase.table('profiles').upsert({
                'id': res.user.id,
                'email': email,
                'credits': 20
            }).execute()

        return jsonify({
            'success': True,
            'message': 'Signup successful. Please check your email for verification.',
            'user_id': res.user.id if res.user else None
        }), 201

    except Exception as e:
        # Map the common duplicate-account failure onto a 409.
        error_message = str(e)
        status_code = 400
        if "User already registered" in error_message:
            error_message = "Email already exists."
            status_code = 409
        logging.error(f"Signup error: {error_message}")
        return jsonify({'error': error_message}), status_code
|
|
|
|
|
@app.route('/api/auth/signin', methods=['POST'])
def signin():
    """Authenticate with email/password; return session tokens plus profile."""
    if not supabase:
        return jsonify({'error': 'Service unavailable'}), 503
    try:
        payload = request.get_json()
        email = payload.get('email')
        password = payload.get('password')
        if not email or not password:
            return jsonify({'error': 'Email and password are required'}), 400

        res = supabase.auth.sign_in_with_password({"email": email, "password": password})

        # Attach the profile row so the client gets credits/flags in one trip.
        profile_res = supabase.table('profiles').select('*').eq('id', res.user.id).maybe_single().execute()

        return jsonify({
            'success': True,
            'access_token': res.session.access_token,
            'refresh_token': res.session.refresh_token,
            'user': {
                'id': res.user.id,
                'email': res.user.email,
                'profile': profile_res.data
            }
        }), 200

    except Exception as exc:
        # Translate the common Supabase auth failures into friendlier
        # messages and appropriate status codes.
        error_message = str(exc)
        status_code = 401
        if "Invalid login credentials" in error_message:
            error_message = "Invalid email or password."
        elif "Email not confirmed" in error_message:
            error_message = "Please verify your email address before signing in."
            status_code = 403
        logging.error(f"Signin error: {error_message}")
        return jsonify({'error': error_message}), status_code
|
|
|
|
|
@app.route('/api/auth/google-signin', methods=['POST'])
def google_signin():
    """Verify a Google OAuth session (token already issued by Supabase) and
    return the user's profile, creating a profiles row if one is missing.

    The client completes the OAuth flow against Supabase directly and sends
    the resulting JWT in the Authorization header; this endpoint only
    verifies it and ensures a profile row exists.
    """
    user, error = verify_token(request.headers.get('Authorization'))
    if error:
        return jsonify({'error': error['error']}), error['status']

    try:
        profile_res = supabase.table('profiles').select('*').eq('id', user.id).maybe_single().execute()

        if not profile_res.data:
            # First sign-in via Google: no profile row exists yet, so create
            # one on the fly (credits fall back to the table's default —
            # NOTE(review): presumably set by a DB default; confirm schema).
            logging.warning(f"Google Sign-In: Profile not found for verified user {user.id}, attempting to create.")
            insert_res = supabase.table('profiles').insert({
                'id': user.id,
                'email': user.email,
            }).execute()
            profile_data = insert_res.data[0] if insert_res.data else None
            if not profile_data:
                raise Exception("Failed to create profile entry after Google Sign-In.")
        else:
            profile_data = profile_res.data

        return jsonify({
            'success': True,
            'message': 'Google sign-in verified successfully.',
            'user': {
                'id': user.id,
                'email': user.email,
                'profile': profile_data
            }
        }), 200

    except Exception as e:
        logging.error(f"Google sign-in profile fetch/creation error: {e}")
        return jsonify({'error': f'An error occurred during sign-in: {e}'}), 500
|
|
|
|
|
|
|
|
|
|
|
|
|
|
@app.route('/api/user/profile', methods=['GET'])
def get_user_profile():
    """Return the authenticated caller's profile fields as flat JSON."""
    user, error = verify_token(request.headers.get('Authorization'))
    if error:
        return jsonify({'error': error['error']}), error['status']

    try:
        profile_row = supabase.table('profiles').select('*').eq('id', user.id).maybe_single().execute().data

        if not profile_row:
            # Auth succeeded but no matching profiles row exists — log it,
            # since that indicates a broken signup path.
            logging.error(f"Profile not found for authenticated user: {user.id} / {user.email}")
            return jsonify({'error': 'User profile not found.'}), 404

        return jsonify({
            'id': user.id,
            'email': user.email,
            'credits': profile_row.get('credits'),
            'is_admin': profile_row.get('is_admin'),
            'created_at': profile_row.get('created_at'),
            'suspended': profile_row.get('suspended')
        }), 200

    except Exception as exc:
        logging.error(f"Error fetching user profile for {user.id}: {exc}")
        return jsonify({'error': f'Failed to fetch profile: {exc}'}), 500
|
|
|
|
|
|
|
|
|
|
|
|
|
|
@app.route('/api/tutor/process_input', methods=['POST'])
def process_input_and_generate_notes():
    """Extract text from a source (pdf/youtube/wiki/bible/arxiv/raw text),
    generate study notes with Gemini, persist material + notes, and deduct
    2 credits from the caller.

    Form fields: input_type (required), source_ref / text_content / file
    depending on type. Returns 201 with material_id, notes_id and the notes.
    """
    user, error = verify_token(request.headers.get('Authorization'))
    if error: return jsonify({'error': error['error']}), error['status']
    if not supabase or not gemini_model: return jsonify({'error': 'Backend service unavailable'}), 503

    # Pre-flight: refuse suspended accounts and enforce the 2-credit cost
    # before doing any expensive extraction or generation.
    profile_res = supabase.table('profiles').select('credits', 'suspended').eq('id', user.id).single().execute()
    if profile_res.data['suspended']:
        return jsonify({'error': 'Account suspended'}), 403
    if profile_res.data['credits'] < 2:
        return jsonify({'error': 'Insufficient credits (Need 2)'}), 402

    try:
        input_type = request.form.get('input_type')
        source_ref = request.form.get('source_ref')
        uploaded_file = request.files.get('file')

        if not input_type:
            return jsonify({'error': 'input_type (e.g., pdf, youtube, wiki, bible, arxiv, text) is required'}), 400

        content = None
        title = None

        # Dispatch to the matching extractor; each branch validates its own
        # required input and may raise ValueError (mapped to 400 below).
        if input_type == 'pdf':
            if not uploaded_file: return jsonify({'error': 'File is required for input_type pdf'}), 400
            if not uploaded_file.filename.lower().endswith('.pdf'): return jsonify({'error': 'Only PDF files are allowed'}), 400
            content = get_pdf_text(uploaded_file.stream)
            source_ref = uploaded_file.filename
            title = uploaded_file.filename
        elif input_type == 'youtube':
            if not source_ref: return jsonify({'error': 'source_ref (YouTube URL) is required'}), 400
            content = get_youtube_transcript(source_ref)
            # Note: no title is derived for YouTube sources; title stays None.
        elif input_type == 'wiki':
            if not source_ref: return jsonify({'error': 'source_ref (Wikipedia URL) is required'}), 400
            content = get_wiki_content(source_ref)
            # Title = decoded last URL path segment, underscores → spaces.
            title = urllib.parse.unquote(source_ref.rstrip("/").split("/")[-1]).replace("_", " ")
        elif input_type == 'bible':
            if not source_ref: return jsonify({'error': 'source_ref (Bible reference) is required'}), 400
            content = fetch_bible_text(source_ref)
            title = source_ref
        elif input_type == 'arxiv':
            if not source_ref: return jsonify({'error': 'source_ref (ArXiv ID or URL) is required'}), 400
            content, title = get_arxiv_content(source_ref)
        elif input_type == 'text':
            content = request.form.get('text_content')
            if not content: return jsonify({'error': 'text_content is required for input_type text'}), 400
            source_ref = content[:100] + "..."
            title = "Custom Text"
        else:
            return jsonify({'error': f'Unsupported input_type: {input_type}'}), 400

        if not content:
            return jsonify({'error': 'Failed to extract content from the source.'}), 500

        # Generate the notes, timing the call for the log.
        start_time = time.time()
        logging.info(f"Generating notes for user {user.id}, type: {input_type}, ref: {source_ref[:50]}")
        generated_notes = generate_notes_with_gemini(content, title=title)
        logging.info(f"Notes generation took {time.time() - start_time:.2f}s")

        # Persist the source material (content truncated to keep rows small).
        material_res = supabase.table('study_materials').insert({
            'user_id': user.id,
            'type': input_type,
            'source_ref': source_ref,
            'source_content': content if len(content) < 10000 else content[:10000] + "... (truncated)",
            'title': title
        }).execute()
        if not material_res.data: raise Exception(f"Failed to save study material: {material_res.error}")
        material_id = material_res.data[0]['id']

        # Persist the generated notes, linked to the material row.
        notes_res = supabase.table('notes').insert({
            'material_id': material_id,
            'user_id': user.id,
            'content': generated_notes
        }).execute()
        if not notes_res.data: raise Exception(f"Failed to save generated notes: {notes_res.error}")
        notes_id = notes_res.data[0]['id']

        # Deduct credits only after everything above succeeded.
        # NOTE(review): read-modify-write is not atomic — concurrent requests
        # could race on the credit balance; confirm acceptable or move to an
        # atomic decrement.
        new_credits = profile_res.data['credits'] - 2
        supabase.table('profiles').update({'credits': new_credits}).eq('id', user.id).execute()

        return jsonify({
            'success': True,
            'message': 'Content processed and notes generated successfully.',
            'material_id': material_id,
            'notes_id': notes_id,
            'notes': generated_notes
        }), 201

    # Error mapping: extractor/validation errors → 400, service outage → 503,
    # AI failures → 500, anything else → 500 with full traceback logged.
    except ValueError as e:
        logging.warning(f"Input processing error for user {user.id}: {e}")
        return jsonify({'error': str(e)}), 400
    except ConnectionError as e:
        logging.error(f"Connection error during processing: {e}")
        return jsonify({'error': f'A backend service is unavailable: {e}'}), 503
    except RuntimeError as e:
        logging.error(f"RuntimeError during processing for user {user.id}: {e}")
        return jsonify({'error': str(e)}), 500
    except Exception as e:
        logging.error(f"Unexpected error processing input for user {user.id}: {traceback.format_exc()}")
        return jsonify({'error': f'An unexpected error occurred: {e}'}), 500
|
|
|
|
|
@app.route('/api/view/notes/<uuid:note_id>', methods=['GET'])
def get_note_by_id(note_id):
    """Return one note (with its source-material metadata) owned by the caller."""
    try:
        user, error = verify_token(request.headers.get('Authorization'))
        if error:
            return jsonify({'error': error['error']}), error['status']

        # Ownership is enforced inside the query via the user_id filter, so a
        # miss and a foreign note both come back as "no data".
        note_row = (
            supabase.table('notes')
            .select('id, content, created_at, tts_audio_url, study_materials(title, type, source_ref)')
            .eq('id', note_id)
            .eq('user_id', user.id)
            .maybe_single()
            .execute()
            .data
        )

        if not note_row:
            return jsonify({'error': 'Note not found or unauthorized'}), 404

        material = note_row['study_materials']
        return jsonify({
            "note": {
                "note_id": note_row['id'],
                "content": note_row['content'],
                "audio_url": note_row['tts_audio_url'],
                "created_at": note_row['created_at'],
                "material": {
                    "title": material['title'] if material else "Untitled",
                    "type": material['type'] if material else None,
                    "source_ref": material['source_ref'] if material else None
                }
            }
        })

    except Exception as exc:
        logging.error(f"Error fetching note {note_id}: {str(exc)}")
        return jsonify({'error': 'Internal server error'}), 500
|
|
|
|
|
@app.route('/api/tutor/notes/<uuid:notes_id>/generate_quiz', methods=['POST'])
def generate_quiz_for_notes(notes_id):
    """Generate a multiple-choice quiz from an existing note and persist it.

    Expects JSON {"difficulty": "easy|medium|hard", "num_questions": 1-10}.
    Costs 2 credits; only the note's owner may generate a quiz from it.
    """
    user, error = verify_token(request.headers.get('Authorization'))
    if error: return jsonify({'error': error['error']}), error['status']
    if not supabase or not gemini_model: return jsonify({'error': 'Backend service unavailable'}), 503

    # Pre-flight: suspended accounts and the 2-credit cost are checked before
    # any generation work happens.
    profile_res = supabase.table('profiles').select('credits', 'suspended').eq('id', user.id).single().execute()
    if profile_res.data['suspended']:
        return jsonify({'error': 'Account suspended'}), 403
    if profile_res.data['credits'] < 2:
        return jsonify({'error': 'Insufficient credits (Need 2)'}), 402

    try:
        data = request.get_json()
        difficulty = data.get('difficulty', 'medium').lower()
        num_questions = int(data.get('num_questions', 5))

        if difficulty not in ['easy', 'medium', 'hard']:
            return jsonify({'error': 'difficulty must be easy, medium, or hard'}), 400
        if not 1 <= num_questions <= 10:
            return jsonify({'error': 'num_questions must be between 1 and 10'}), 400

        # Fetch the note and enforce ownership before generating.
        notes_res = supabase.table('notes').select('content, user_id').eq('id', notes_id).maybe_single().execute()
        if not notes_res.data:
            return jsonify({'error': 'Notes not found'}), 404

        if notes_res.data['user_id'] != user.id:
            return jsonify({'error': 'You do not have permission to access these notes'}), 403

        notes_content = notes_res.data['content']

        # Generate the quiz, timing the call for the log.
        start_time = time.time()
        logging.info(f"Generating {difficulty} quiz ({num_questions}q) for user {user.id}, notes: {notes_id}")
        quiz_questions = generate_quiz_with_gemini(notes_content, difficulty, num_questions)
        logging.info(f"Quiz generation took {time.time() - start_time:.2f}s")

        # Persist the quiz; questions stored as a JSON string.
        quiz_res = supabase.table('quizzes').insert({
            'notes_id': str(notes_id),
            'user_id': user.id,
            'difficulty': difficulty,
            'questions': json.dumps(quiz_questions)
        }).execute()
        if not quiz_res.data: raise Exception(f"Failed to save generated quiz: {quiz_res.error}")
        quiz_id = quiz_res.data[0]['id']

        # Deduct credits only after the quiz was saved.
        # NOTE(review): non-atomic read-modify-write, same race as the notes
        # endpoint — confirm acceptable.
        new_credits = profile_res.data['credits'] - 2
        supabase.table('profiles').update({'credits': new_credits}).eq('id', user.id).execute()

        return jsonify({
            'success': True,
            'quiz_id': quiz_id,
            'difficulty': difficulty,
            'questions': quiz_questions
        }), 201

    # Error mapping mirrors the process_input endpoint.
    except ValueError as e:
        return jsonify({'error': str(e)}), 400
    except ConnectionError as e:
        logging.error(f"Connection error during quiz generation: {e}")
        return jsonify({'error': f'A backend service is unavailable: {e}'}), 503
    except RuntimeError as e:
        logging.error(f"RuntimeError during quiz generation for user {user.id}: {e}")
        return jsonify({'error': str(e)}), 500
    except Exception as e:
        logging.error(f"Unexpected error generating quiz for user {user.id}, notes {notes_id}: {traceback.format_exc()}")
        return jsonify({'error': f'An unexpected error occurred: {e}'}), 500
|
|
|
|
|
@app.route('/api/view/quizzes/<uuid:quiz_id>', methods=['GET'])
def get_quiz_by_id(quiz_id):
    """Return one quiz (with its source note and material metadata) owned by
    the caller. Ownership is enforced in the query's user_id filter."""
    try:
        user, error = verify_token(request.headers.get('Authorization'))
        if error:
            return jsonify({'error': error['error']}), error['status']

        # Nested select pulls the parent note and that note's study material
        # in a single round trip.
        quiz_res = supabase.table('quizzes') \
            .select('''id, difficulty, created_at, questions,
                notes(id, content, study_materials(title, type))''') \
            .eq('id', quiz_id) \
            .eq('user_id', user.id) \
            .maybe_single() \
            .execute()

        if not quiz_res.data:
            return jsonify({'error': 'Quiz not found or unauthorized'}), 404

        # Shape the nested row into the API response structure.
        quiz_data = quiz_res.data
        response_data = {
            "quiz": {
                "quiz_id": quiz_data['id'],
                "difficulty": quiz_data['difficulty'],
                "created_at": quiz_data['created_at'],
                "questions": quiz_data['questions'],
                "source_note": {
                    "note_id": quiz_data['notes']['id'],
                    # Only a 100-char preview of the note content is exposed.
                    "content_preview": quiz_data['notes']['content'][:100] + "..." if quiz_data['notes']['content'] else None,
                    "material": {
                        "title": quiz_data['notes']['study_materials']['title'] if quiz_data['notes']['study_materials'] else None,
                        "type": quiz_data['notes']['study_materials']['type'] if quiz_data['notes']['study_materials'] else None
                    }
                }
            }
        }

        return jsonify(response_data)

    except Exception as e:
        logging.error(f"Error fetching quiz {quiz_id}: {str(e)}")
        return jsonify({'error': 'Internal server error'}), 500
|
|
|
|
|
@app.route('/api/tutor/quizzes/<uuid:quiz_id>/submit', methods=['POST'])
def submit_quiz_attempt(quiz_id):
    """Submits user answers for a quiz and calculates the score.

    Expects JSON {"answers": {"<question_id>": "A"|"B"|"C"|"D", ...}} where
    the question id falls back to the question's list index when the stored
    question has no explicit 'id'. Records the attempt and returns the score,
    per-question feedback, and the correct answers.
    """
    user, error = verify_token(request.headers.get('Authorization'))
    if error: return jsonify({'error': error['error']}), error['status']
    if not supabase: return jsonify({'error': 'Backend service unavailable'}), 503

    try:
        data = request.get_json()
        user_answers = data.get('answers')

        if not isinstance(user_answers, dict):
            return jsonify({'error': 'answers must be provided as a JSON object'}), 400

        quiz_res = supabase.table('quizzes')\
            .select('questions, user_id')\
            .eq('id', quiz_id)\
            .maybe_single()\
            .execute()

        if not quiz_res.data:
            return jsonify({'error': 'Quiz not found'}), 404

        # BUG FIX: user_id was fetched but never checked, letting any
        # authenticated user submit (and learn the answers of) another
        # user's quiz. Enforce ownership like the sibling quiz endpoints.
        if quiz_res.data['user_id'] != user.id:
            return jsonify({'error': 'You do not have permission to access this quiz'}), 403

        quiz_questions = quiz_res.data['questions']
        # Questions may be stored as a JSON string; normalize to a list.
        if isinstance(quiz_questions, str):
            quiz_questions = json.loads(quiz_questions)

        correct_count = 0
        total_questions = len(quiz_questions)
        feedback = {}
        correct_answers = {}

        # Grade each question; answers compare case-insensitively, and
        # unanswered questions are simply omitted from feedback.
        for i, question in enumerate(quiz_questions):
            question_id = question.get('id', str(i))
            user_answer = user_answers.get(str(question_id))
            correct_answer = question.get('correct_answer')

            if user_answer and correct_answer:
                is_correct = user_answer.upper() == correct_answer.upper()
                if is_correct:
                    correct_count += 1

                correct_answers[str(question_id)] = correct_answer
                feedback[str(question_id)] = {
                    "correct": is_correct,
                    "correct_answer": correct_answer,
                    "user_answer": user_answer
                }

        # Percentage score; guard against an empty quiz.
        score = (correct_count / total_questions) * 100 if total_questions > 0 else 0.0

        attempt_res = supabase.table('quiz_attempts').insert({
            'quiz_id': str(quiz_id),
            'user_id': user.id,
            'score': score,
            'answers': json.dumps(user_answers)
        }).execute()

        return jsonify({
            'success': True,
            'attempt_id': attempt_res.data[0]['id'],
            'score': round(score, 2),
            'correct_count': correct_count,
            'total_questions': total_questions,
            'correct_answers': correct_answers,
            'feedback': feedback
        }), 201

    except Exception as e:
        logging.error(f"Error submitting quiz: {traceback.format_exc()}")
        return jsonify({'error': str(e)}), 500
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
# Probe for a usable pydub installation. If the import fails, install a stub
# AudioSegment whose every operation raises ImportError, so code paths that
# reach audio concatenation fail loudly with a clear message instead of a
# NameError. NOTE(review): the file also does `import pydub` near the top —
# if that succeeded this except branch can never run; confirm which import
# is authoritative.
try:
    from pydub import AudioSegment
    PYDUB_AVAILABLE = True
except ImportError:
    PYDUB_AVAILABLE = False
    logging.warning("pydub library not found or ffmpeg might be missing. Audio chunk concatenation will fail. Please install pydub and ensure ffmpeg is in your system's PATH.")

    class AudioSegment:
        # Stub matching the three operations the audio pipeline uses:
        # load, concatenate (+), and export.
        @staticmethod
        def from_file(*args, **kwargs):
            raise ImportError("pydub/ffmpeg not installed or accessible")
        def __add__(self, other):
            raise ImportError("pydub/ffmpeg not installed or accessible")
        def export(self, *args, **kwargs):
            raise ImportError("pydub/ffmpeg not installed or accessible")
|
|
|
|
|
|
|
|
def generate_tts_audio(text_to_speak, voice_id="Rachel"):
    """Generates TTS audio using ElevenLabs and returns audio bytes.

    Args:
        text_to_speak: Plain text to synthesize.
        voice_id: ElevenLabs voice name or ID (defaults to "Rachel").

    Returns:
        bytes: The generated audio (MP3) as a single bytes object.

    Raises:
        ConnectionError: If the ElevenLabs client was never initialized.
        RuntimeError: If generation fails or the API returns empty audio.
    """
    if not elevenlabs_client:
        raise ConnectionError("ElevenLabs client not initialized.")
    try:
        # Even with stream=False the SDK returns an iterable of byte chunks.
        audio_stream = elevenlabs_client.generate(
            text=text_to_speak,
            voice=voice_id,
            model="eleven_multilingual_v2",
            stream=False
        )

        # join() assembles the chunks in one O(n) pass; the previous
        # `audio_bytes += chunk` loop re-copied the buffer on every chunk
        # (quadratic for large responses).
        audio_bytes = b"".join(audio_stream)

        if not audio_bytes:
            raise ValueError("ElevenLabs generated empty audio.")

        return audio_bytes

    except Exception as e:
        logging.error(f"ElevenLabs TTS generation failed: {e}")
        # Chain the original exception so the root cause stays in the traceback.
        raise RuntimeError(f"Failed to generate audio: {e}") from e
|
|
|
|
|
|
|
|
@app.route('/api/tutor/notes/<uuid:notes_id>/speak', methods=['POST'])
def speak_notes(notes_id):
    """
    Generate TTS audio for notes using ElevenLabs,
    combine chunks using pydub, and store the final MP3 in Supabase Storage.
    Updates the note record with the audio URL and deducts credits.
    Rejects requests for content over 10,000 characters.
    """
    # pydub/ffmpeg is mandatory for chunk concatenation; bail out early.
    if not PYDUB_AVAILABLE:
        logging.error("Audio processing library (pydub/ffmpeg) check failed.")
        return jsonify({'error': 'Server configuration error: Audio processing library not available.'}), 500

    # --- Auth & service availability checks ---
    user, error = verify_token(request.headers.get('Authorization'))
    if error:
        return jsonify({'error': error['error']}), error['status']

    if not supabase or not elevenlabs_client:
        logging.error("Backend service (Supabase or ElevenLabs client) not initialized.")
        return jsonify({'error': 'Backend service unavailable'}), 503

    try:
        # --- Fetch the note; the user_id filter also enforces ownership ---
        logging.info(f"Processing speak request for note {notes_id} by user {user.id}")
        note_res = supabase.table('notes') \
            .select('user_id, content, tts_audio_url') \
            .eq('id', str(notes_id)) \
            .eq('user_id', user.id) \
            .maybe_single() \
            .execute()

        if not note_res.data:
            logging.warning(f"Note {notes_id} not found or unauthorized for user {user.id}.")
            return jsonify({'error': 'Note not found or unauthorized'}), 404

        # --- Check the user's profile: suspension status and credit balance ---
        profile_res = supabase.table('profiles') \
            .select('credits, suspended') \
            .eq('id', user.id) \
            .single() \
            .execute()

        if not profile_res.data:
            logging.error(f"Could not fetch profile for user {user.id}")
            return jsonify({'error': 'Failed to retrieve user profile'}), 500

        if profile_res.data.get('suspended'):
            logging.warning(f"User {user.id} account is suspended.")
            return jsonify({'error': 'Account suspended'}), 403

        # Credits are verified up front but only deducted after a successful
        # upload, further below.
        # NOTE(review): read-then-write credit handling is not atomic —
        # concurrent requests could race; confirm whether an RPC/transaction
        # is needed.
        current_credits = profile_res.data.get('credits', 0)
        CREDIT_COST = 5
        if current_credits < CREDIT_COST:
            logging.warning(f"User {user.id} has insufficient credits ({current_credits}/{CREDIT_COST}).")
            return jsonify({'error': f'Insufficient credits (Need {CREDIT_COST})'}), 402

        # --- Idempotency: reuse previously generated audio at no cost ---
        existing_audio_url = note_res.data.get('tts_audio_url')
        if existing_audio_url:
            logging.info(f"Using existing audio URL for note {notes_id}: {existing_audio_url}")
            return jsonify({
                'success': True,
                'audio_url': existing_audio_url,
                'message': 'Using existing audio file',
                'remaining_credits': current_credits
            })

        # --- Validate content: non-empty and within the hard length cap ---
        notes_content = note_res.data.get('content')
        if not notes_content or not notes_content.strip():
            logging.warning(f"Note {notes_id} content is empty.")
            return jsonify({'error': 'Notes content is empty'}), 400

        if len(notes_content) > 10000:
            logging.warning(f"Note {notes_id} content exceeds 10,000 character limit ({len(notes_content)} chars).")
            return jsonify({
                'error': 'Content exceeds maximum length',
                'message': f'Note content is {len(notes_content)} characters. Maximum allowed is 10,000 characters.'
            }), 413

        # --- Split into fixed-size chunks to stay within TTS request limits ---
        CHUNK_SIZE = 2500
        text_chunks = [notes_content[i:i+CHUNK_SIZE] for i in range(0, len(notes_content), CHUNK_SIZE)]

        combined_audio_segment = None
        logging.info(f"Generating audio for note {notes_id} in {len(text_chunks)} chunks.")

        # --- Generate audio per chunk and concatenate via pydub ---
        for i, chunk in enumerate(text_chunks):
            try:
                logging.debug(f"Generating audio for chunk {i+1}/{len(text_chunks)}...")

                chunk_audio_bytes = generate_tts_audio(
                    text_to_speak=chunk.strip(),
                    voice_id="Rachel"
                )

                # Skip silently-empty chunks rather than aborting the whole job.
                if not chunk_audio_bytes:
                    logging.warning(f"TTS generation returned empty audio for chunk {i+1} of note {notes_id}")
                    continue

                segment = AudioSegment.from_file(BytesIO(chunk_audio_bytes), format="mp3")

                # pydub's `+` operator appends segments end-to-end.
                if combined_audio_segment is None:
                    combined_audio_segment = segment
                else:
                    combined_audio_segment += segment
                logging.debug(f"Successfully processed chunk {i+1}/{len(text_chunks)}")

            except ImportError as e:
                # Re-raise so the outer ImportError handler reports a config error.
                logging.error(f"pydub/ffmpeg error during chunk processing: {e}")
                raise e
            except Exception as e:
                # Any other per-chunk failure aborts the whole generation.
                logging.error(f"Error generating/processing audio chunk {i+1} for note {notes_id}: {str(e)}")
                raise RuntimeError(f"Audio generation/processing failed for chunk {i+1}: {str(e)}")

        if combined_audio_segment is None:
            # Every chunk came back empty (or there were none).
            logging.error(f"Failed to generate any audio content for note {notes_id}.")
            raise RuntimeError("Failed to generate any audio content.")

        # --- Export the combined segment to an in-memory MP3 ---
        output_bytes_io = BytesIO()
        combined_audio_segment.export(output_bytes_io, format="mp3")
        final_audio_bytes = output_bytes_io.getvalue()

        if not final_audio_bytes:
            logging.error(f"Generated empty final audio file after combining chunks for note {notes_id}.")
            raise RuntimeError("Generated empty final audio file after combining chunks.")

        logging.info(f"Audio generation complete for note {notes_id}. Total size: {len(final_audio_bytes)} bytes.")

        # --- Upload to Supabase Storage, keyed per-user and per-note ---
        bucket_name = 'notes-audio'

        file_path = f'{user.id}/{str(notes_id)}.mp3'
        audio_url = None

        try:
            logging.info(f"Uploading audio to Supabase Storage: {bucket_name}/{file_path}")

            # upsert allows regeneration to overwrite a stale file.
            supabase.storage.from_(bucket_name).upload(
                path=file_path,
                file=final_audio_bytes,
                file_options={"content-type": "audio/mpeg", "upsert": "true"}
            )

            # NOTE(review): get_public_url assumes the bucket is public —
            # confirm bucket policy; a private bucket would need signed URLs.
            public_url_data = supabase.storage.from_(bucket_name).get_public_url(file_path)

            audio_url = public_url_data

            if not audio_url:
                logging.error(f"Upload to {file_path} seemed successful, but failed to get public URL.")
                raise ConnectionError("Failed to retrieve audio URL after upload.")

            logging.info(f"Audio uploaded successfully for note {notes_id}. URL: {audio_url}")

            # --- Persist the URL and deduct credits; clean up on failure ---
            try:
                logging.debug(f"Updating notes table for note {notes_id} with URL.")
                update_res = supabase.table('notes') \
                    .update({'tts_audio_url': audio_url}) \
                    .eq('id', str(notes_id)) \
                    .eq('user_id', user.id) \
                    .execute()

                if not update_res.data:
                    logging.warning(f"Note update query executed for {notes_id} but no data returned (might be ok, or indicate issue).")

                new_credits = current_credits - CREDIT_COST
                logging.debug(f"Deducting {CREDIT_COST} credits for user {user.id}. New balance: {new_credits}")
                credit_res = supabase.table('profiles') \
                    .update({'credits': new_credits}) \
                    .eq('id', user.id) \
                    .execute()

                # Deliberately non-fatal: the audio already exists, so the user
                # keeps the result even if the deduction write failed.
                if not credit_res.data:
                    logging.error(f"CRITICAL: Failed to deduct credits for user {user.id} after audio generation for note {notes_id}.")

                logging.info(f"Successfully updated database and deducted credits for note {notes_id}")

                return jsonify({
                    'success': True,
                    'audio_url': audio_url,
                    'remaining_credits': new_credits
                })

            except Exception as db_error:
                # DB write failed after upload: remove the orphaned file so
                # storage and the notes table stay consistent.
                logging.error(f"Database update/credit deduction failed for note {notes_id} AFTER upload: {str(db_error)}. URL was {audio_url}")
                logging.info(f"Attempting to clean up uploaded file: {file_path}")

                try:
                    supabase.storage.from_(bucket_name).remove([file_path])
                    logging.info(f"Successfully cleaned up orphaned file: {file_path}")
                except Exception as cleanup_error:
                    logging.error(f"Failed to clean up orphaned file {file_path} after DB error: {cleanup_error}")

                raise db_error

        except Exception as upload_db_error:
            # Upload phase failed (or the inner handler re-raised). If a URL
            # was obtained the file may exist — attempt best-effort cleanup.
            logging.error(f"Error during upload or DB update phase for note {notes_id}: {str(upload_db_error)}")

            if audio_url:
                try:
                    logging.info(f"Attempting cleanup for failed operation: {file_path}")
                    supabase.storage.from_(bucket_name).remove([file_path])
                    logging.info(f"Cleanup successful for {file_path}")
                except Exception as cleanup_error:
                    logging.error(f"Upload/DB error occurred, AND cleanup failed for {file_path}: {cleanup_error}")

            raise upload_db_error

    # --- Map failure classes onto HTTP responses ---
    except ImportError as e:
        logging.error(f"Missing dependency error: {e}")
        return jsonify({'error': 'Server configuration error: Audio library (pydub/ffmpeg) missing or failed.'}), 500
    except (RuntimeError, ConnectionError) as e:
        # Expected operational failures: message is safe to surface to caller.
        logging.error(f"Operation failed for note {notes_id}: {str(e)}")
        return jsonify({'error': str(e)}), 500
    except Exception as e:
        # Unexpected failures: log full traceback, return a generic message.
        logging.error(f"Unexpected speak endpoint error for note {notes_id}: {traceback.format_exc()}")

        return jsonify({'error': 'An unexpected error occurred during audio generation.'}), 500
|
|
|
|
|
|
|
|
|
|
|
@app.route('/api/tutor/notes/<uuid:notes_id>/audio', methods=['GET'])
def get_note_audio(notes_id):
    """Return the stored TTS audio URL for one of the caller's notes."""
    user, error = verify_token(request.headers.get('Authorization'))
    if error:
        return jsonify({'error': error['error']}), error['status']

    try:
        row = supabase.table('notes').select('tts_audio_url, user_id').eq('id', notes_id).single().execute().data

        # Guard clauses: missing note, someone else's note, note without audio.
        if not row:
            return jsonify({'error': 'Notes not found'}), 404
        if row['user_id'] != user.id:
            return jsonify({'error': 'Unauthorized access'}), 403

        audio_url = row['tts_audio_url']
        if not audio_url:
            return jsonify({'error': 'No audio available for these notes'}), 404

        return jsonify({
            'success': True,
            'audio_url': audio_url
        })

    except Exception as e:
        logging.error(f"Error getting audio URL: {str(e)}")
        return jsonify({'error': str(e)}), 500
|
|
|
|
|
|
|
|
|
|
|
@app.route('/api/view/notes', methods=['GET'])
def view_notes():
    """List all of the caller's notes, newest first, with material metadata."""
    try:
        user, error = verify_token(request.headers.get('Authorization'))
        if error: return jsonify({'error': error['error']}), error['status']

        result = supabase.table('notes') \
            .select('id, content, created_at, tts_audio_url, study_materials(title, type)') \
            .eq('user_id', user.id) \
            .order('created_at', desc=True) \
            .execute()

        if hasattr(result, 'error') and result.error:
            raise Exception(result.error.message)

        def serialize(note):
            # Flatten the joined study_materials record into the response row.
            material = note['study_materials']
            return {
                "note_id": note['id'],
                "content": note['content'],
                "audio_url": note['tts_audio_url'],
                "created_at": note['created_at'],
                "material_title": material['title'] if material else "Untitled Note",
                "material_type": material['type'] if material else None
            }

        return jsonify({"notes": [serialize(row) for row in result.data]})

    except Exception as e:
        print(f"Error in /api/view/notes: {str(e)}")
        logging.error(f"Notes endpoint error: {str(e)}")
        logging.error(traceback.format_exc())
        return jsonify({'error': str(e)}), 500
|
|
|
|
|
@app.route('/api/view/quizzes', methods=['GET'])
def view_quizzes():
    """List all of the caller's quizzes, newest first, with note/material context."""
    try:
        user, error = verify_token(request.headers.get('Authorization'))
        if error: return jsonify({'error': error['error']}), error['status']

        result = supabase.table('quizzes') \
            .select('id, difficulty, created_at, notes(content, study_materials(title, type))') \
            .eq('user_id', user.id) \
            .order('created_at', desc=True) \
            .execute()

        if hasattr(result, 'error') and result.error:
            raise Exception(result.error.message)

        def serialize(quiz):
            # Unwrap the nested notes -> study_materials join for the response.
            note = quiz['notes']
            material = note['study_materials'] if note else None
            return {
                "quiz_id": quiz['id'],
                "difficulty": quiz['difficulty'],
                "created_at": quiz['created_at'],
                "notes_preview": note['content'][:100] + "..." if note and note['content'] else None,
                "material_title": material['title'] if material else "Untitled Quiz",
                "material_type": material['type'] if material else None
            }

        return jsonify({"quizzes": [serialize(row) for row in result.data]})

    except Exception as e:
        print(f"Error in /api/view/quizzes: {str(e)}")
        logging.error(f"Quizzes endpoint error: {str(e)}")
        logging.error(traceback.format_exc())
        return jsonify({'error': str(e)}), 500
|
|
|
|
|
def _extract_quiz_title(quiz):
    """Safely pull the study-material title from the quiz -> notes -> study_materials join.

    Any missing link in the chain (deleted note, deleted material, null join)
    yields None instead of raising TypeError/KeyError.
    """
    notes = quiz.get('notes') or {}
    materials = notes.get('study_materials') or {}
    return materials.get('title')


@app.route('/api/user/performance', methods=['GET'])
def get_user_performance():
    """Retrieves user's quiz performance and provides simple suggestions.

    Returns JSON containing the overall average score, per-quiz aggregates
    (quiz_info, attempts, average_score, attempt_count) and a list of
    human-readable study suggestions.
    """
    try:
        user, error = verify_token(request.headers.get('Authorization'))
        if error:
            return jsonify({'error': error['error']}), error['status']

        attempts_res = supabase.table('quiz_attempts') \
            .select('id, quiz_id, score, submitted_at, quizzes(id, difficulty, created_at, notes(study_materials(title)))') \
            .eq('user_id', user.id) \
            .order('submitted_at', desc=True) \
            .execute()

        if hasattr(attempts_res, 'error') and attempts_res.error:
            raise Exception(attempts_res.error.message)

        attempts_data = attempts_res.data

        # Group attempts by quiz.
        # Robustness fix: the original indexed attempt['quizzes'][...] (and the
        # nested notes/study_materials) unconditionally, so any attempt whose
        # joined quiz/note/material row was deleted crashed the endpoint with a
        # 500. Orphaned attempts are now skipped and the title is fetched safely.
        quizzes = {}
        for attempt in attempts_data:
            quiz = attempt.get('quizzes')
            if not quiz:
                continue  # orphaned attempt: parent quiz no longer exists
            quiz_id = quiz['id']
            if quiz_id not in quizzes:
                quizzes[quiz_id] = {
                    'quiz_info': {
                        'id': quiz_id,
                        'title': _extract_quiz_title(quiz),
                        'difficulty': quiz['difficulty'],
                        'created_at': quiz['created_at']
                    },
                    'attempts': []
                }
            quizzes[quiz_id]['attempts'].append(attempt)

        # Aggregate per-quiz averages and collect all scores for the overall mean.
        performance_data = []
        overall_scores = []

        for quiz_id, quiz_data in quizzes.items():
            scores = [a['score'] for a in quiz_data['attempts']]
            avg_score = sum(scores) / len(scores) if scores else 0
            overall_scores.extend(scores)

            performance_data.append({
                **quiz_data,
                'average_score': avg_score,
                'attempt_count': len(scores)
            })

        average_score = sum(overall_scores) / len(overall_scores) if overall_scores else 0

        # Simple rule-based suggestions bucketed by the overall average.
        suggestions = []
        if performance_data:
            if average_score < 60:
                suggestions.append("Your average score is a bit low. Try reviewing the notes more thoroughly before taking quizzes.")

                weakest_quiz = min(performance_data, key=lambda x: x['average_score'])
                suggestions.append(f"Focus on: '{weakest_quiz['quiz_info']['title']}' (current average: {weakest_quiz['average_score']:.0f}%)")
            elif average_score > 85:
                suggestions.append("Great job! Try some 'hard' difficulty quizzes.")
            else:
                suggestions.append("You're making good progress! Keep practicing.")

        return jsonify({
            'success': True,
            'average_score': round(average_score, 2),
            'quizzes': performance_data,
            'suggestions': suggestions
        })

    except Exception as e:
        logging.error(f"Performance endpoint error: {str(e)}")
        logging.error(traceback.format_exc())
        return jsonify({'error': 'Internal server error'}), 500
|
|
|
|
|
def generate_suggestions(quizzes, overall_avg):
    """Generate personalized suggestions based on quiz performance.

    Args:
        quizzes: List of per-quiz aggregates, each shaped like
            {'quiz_info': {'title': ..., 'difficulty': ...}, 'average_score': float}.
        overall_avg: Overall average score across all attempts (0-100 scale).

    Returns:
        list[str]: Human-readable study suggestions; always contains at
        least the overall-performance message, even for an empty quiz list.
    """
    suggestions = []

    # Overall-performance message, bucketed by average score.
    if overall_avg < 60:
        suggestions.append("Your average score is a bit low. Try reviewing notes before retaking quizzes.")
    elif overall_avg > 85:
        suggestions.append("Great job! Challenge yourself with harder difficulty levels.")
    else:
        suggestions.append("Keep practicing! Focus on your weaker areas for improvement.")

    # Call out the single weakest quiz when it is genuinely weak (< 60%).
    weakest = min(quizzes, key=lambda x: x['average_score'], default=None)
    if weakest and weakest['average_score'] < 60:
        title = weakest['quiz_info']['title'] or "your recent quizzes"
        suggestions.append(f"Focus on improving in '{title}' (current average: {weakest['average_score']:.0f}%).")

    # Nudge toward harder material when more than 70% of quizzes are 'easy'.
    difficulty_count = {}
    for quiz in quizzes:
        diff = quiz['quiz_info']['difficulty']
        difficulty_count[diff] = difficulty_count.get(diff, 0) + 1

    # Bug fix: guard the ratio so an empty quiz list no longer raises
    # ZeroDivisionError.
    if quizzes and difficulty_count.get('easy', 0) / len(quizzes) > 0.7:
        suggestions.append("Try more medium difficulty quizzes to push your skills!")

    return suggestions
|
|
|
|
|
|
|
|
|
|
|
|
|
|
@app.route('/api/admin/users', methods=['GET'])
def admin_list_users():
    """Admin-only: return every profile row in the system."""
    user, error = verify_token(request.headers.get('Authorization'))
    if error: return jsonify({'error': error['error']}), error['status']
    is_admin, admin_error = verify_admin(user)
    if admin_error: return jsonify({'error': admin_error['error']}), admin_error['status']

    try:
        profiles = supabase.table('profiles').select('*').execute()
        return jsonify({'users': profiles.data}), 200
    except Exception as e:
        logging.error(f"Admin list users error: {e}")
        return jsonify({'error': str(e)}), 500
|
|
|
|
|
@app.route('/api/admin/users/<uuid:target_user_id>/suspend', methods=['PUT'])
def admin_suspend_user(target_user_id):
    """Admin-only: set a user's suspended flag via {"action": "suspend"|"unsuspend"}."""
    user, error = verify_token(request.headers.get('Authorization'))
    if error: return jsonify({'error': error['error']}), error['status']
    is_admin, admin_error = verify_admin(user)
    if admin_error: return jsonify({'error': admin_error['error']}), admin_error['status']

    try:
        payload = request.get_json()
        action = payload.get('action')

        if action not in ["suspend", "unsuspend"]:
            return jsonify({'error': 'action must be "suspend" or "unsuspend"'}), 400

        should_suspend = action == "suspend"

        update_res = supabase.table('profiles').update({'suspended': should_suspend}).eq('id', target_user_id).execute()

        if not update_res.data:
            # No rows came back: distinguish "user does not exist" from a
            # genuine update failure before deciding the response.
            user_check = supabase.table('profiles').select('id').eq('id', target_user_id).maybe_single().execute()
            if not user_check.data:
                return jsonify({'error': 'User not found'}), 404
            raise Exception(f"Failed to update suspension status: {update_res.error}")

        return jsonify({'success': True, 'message': f'User {target_user_id} suspension status set to {should_suspend}'}), 200
    except Exception as e:
        logging.error(f"Admin suspend user error: {e}")
        return jsonify({'error': str(e)}), 500
|
|
|
|
|
|
|
|
|
|
|
|
|
|
@app.route('/api/user/credits/request', methods=['POST'])
def request_credits():
    """Let an authenticated user file a pending credit top-up request."""
    user, error = verify_token(request.headers.get('Authorization'))
    if error: return jsonify({'error': error['error']}), error['status']

    try:
        payload = request.get_json()
        amount = payload.get('amount')
        note = payload.get('note', '')

        # Reject missing, non-integer, or non-positive amounts.
        if not amount or not isinstance(amount, int) or amount <= 0:
            return jsonify({'error': 'Invalid amount (must be positive integer)'}), 400

        res = supabase.table('credit_requests').insert({
            'user_id': user.id,
            'amount': amount,
            'status': 'pending',
            'note': note,
            'created_at': datetime.now().isoformat()
        }).execute()

        return jsonify({'success': True, 'request_id': res.data[0]['id']}), 201

    except Exception as e:
        logging.error(f"Credit request failed for user {user.id}: {e}")
        return jsonify({'error': str(e)}), 500
|
|
|
|
|
@app.route('/api/admin/credit-requests', methods=['GET'])
def admin_get_credit_requests():
    """Admin-only: list credit requests filtered by ?status= (default 'pending')."""
    user, error = verify_token(request.headers.get('Authorization'))
    if error: return jsonify({'error': error['error']}), error['status']
    is_admin, admin_error = verify_admin(user)
    if admin_error: return jsonify({'error': admin_error['error']}), admin_error['status']

    try:
        wanted_status = request.args.get('status', 'pending')
        rows = supabase.table('credit_requests').select('*').eq('status', wanted_status).execute()
        return jsonify(rows.data), 200
    except Exception as e:
        logging.error(f"Admin credit requests fetch failed: {e}")
        return jsonify({'error': str(e)}), 500
|
|
|
|
|
@app.route('/api/admin/credit-requests/<uuid:request_id>', methods=['PUT'])
def admin_review_credit_request(request_id):
    """Admin-only: approve or decline a pending credit request.

    Body: {"action": "approve"|"decline", "note": "<optional admin note>"}.
    On approval the requested amount is added to the user's credit balance
    and the request row is marked reviewed.
    """
    user, error = verify_token(request.headers.get('Authorization'))
    if error: return jsonify({'error': error['error']}), error['status']
    is_admin, admin_error = verify_admin(user)
    if admin_error: return jsonify({'error': admin_error['error']}), admin_error['status']

    try:
        data = request.get_json()
        action = data.get('action')
        admin_note = data.get('note', '')

        if action not in ['approve', 'decline']:
            return jsonify({'error': 'Invalid action'}), 400

        req_res = supabase.table('credit_requests').select('*').eq('id', request_id).maybe_single().execute()
        if not req_res.data:
            return jsonify({'error': 'Request not found'}), 404

        req = req_res.data
        # Only pending requests may be reviewed; approving twice would
        # double-grant credits.
        if req['status'] != 'pending':
            return jsonify({'error': 'Request already processed'}), 400

        update_data = {
            'status': 'approved' if action == 'approve' else 'declined',
            'reviewed_at': datetime.now().isoformat(),
            'reviewed_by': user.id,
            'admin_note': admin_note
        }

        if action == 'approve':
            # Bug fix: the original wrote
            #   {'credits': supabase.table('profiles').credits + req['amount']}
            # but the supabase-py query builder has no `.credits` attribute,
            # so every approval raised AttributeError and no credits were ever
            # granted. Fetch the current balance, then write the new total.
            # NOTE(review): read-then-write is not atomic; a DB-side RPC would
            # close the race window between concurrent approvals.
            profile_res = supabase.table('profiles').select('credits').eq('id', req['user_id']).single().execute()
            current_credits = profile_res.data.get('credits', 0) if profile_res.data else 0
            supabase.table('profiles').update(
                {'credits': current_credits + req['amount']}
            ).eq('id', req['user_id']).execute()

        supabase.table('credit_requests').update(update_data).eq('id', request_id).execute()

        return jsonify({'success': True}), 200

    except Exception as e:
        logging.error(f"Credit request processing failed: {e}")
        return jsonify({'error': str(e)}), 500
|
|
|
|
|
|
|
|
|
|
|
if __name__ == '__main__':
    # Warn loudly (but still start) when any required secret is missing so
    # misconfiguration is visible in the console at launch.
    if not all([SUPABASE_URL, SUPABASE_SERVICE_KEY, GEMINI_API_KEY, ELEVENLABS_API_KEY]):
        print("WARNING: One or more essential environment variables (SUPABASE_URL, SUPABASE_SERVICE_KEY, GEMINI_API_KEY, ELEVENLABS_API_KEY) are missing!")

    print("Starting Flask server for AI Tutor...")

    # NOTE(review): debug=True enables the interactive Werkzeug debugger;
    # combined with host="0.0.0.0" (all interfaces) this allows remote code
    # execution if reachable from outside — confirm this entry point is
    # development-only and that production uses a WSGI server instead.
    app.run(debug=True, host="0.0.0.0", port=7860)