| |
| import io |
| import uuid |
| import re |
| import time |
| import tempfile |
| import requests |
| import json |
| import os |
| import logging |
| import traceback |
| from datetime import datetime |
| from pathlib import Path |
| import urllib.parse |
|
|
| from flask import Flask, request, jsonify, send_file, Response |
| from flask_cors import CORS |
| from supabase import create_client, Client |
|
|
| |
| import google.generativeai as genai |
| from elevenlabs.client import ElevenLabs |
| from elevenlabs import save as save_elevenlabs_audio |
| from PyPDF2 import PdfReader |
| import wikipedia |
| from youtube_transcript_api import YouTubeTranscriptApi |
| import arxiv |
| from elevenlabs import play, stream, save |
| import math |
|
|
| |
| |
| from dotenv import load_dotenv |
# Load variables from a local .env file into the process environment before
# any of the settings below are read.
load_dotenv()

# Credentials and endpoints for the external services. Each value is None
# when the corresponding environment variable is not set.
SUPABASE_URL = os.environ.get("SUPABASE_URL")
SUPABASE_SERVICE_KEY = os.environ.get("SUPABASE_SERVICE_KEY")
SUPABASE_ANON_KEY = os.environ.get("SUPABASE_ANON_KEY")
GEMINI_API_KEY = os.environ.get("GEMINI_API_KEY")
ELEVENLABS_API_KEY = os.environ.get("ELEVENLABS_API_KEY")

# Flask application object with CORS enabled using flask_cors defaults.
app = Flask(__name__)
CORS(app)
|
|
| |
# Create the module-level Supabase client. Any failure (including missing
# settings) is printed and leaves `supabase` as None so that request handlers
# can detect the missing backend instead of the import crashing.
try:
    if not (SUPABASE_URL and SUPABASE_SERVICE_KEY):
        raise ValueError("Supabase URL and Service Key must be set in environment variables.")
    supabase: Client = create_client(SUPABASE_URL, SUPABASE_SERVICE_KEY)
    print("Supabase client initialized successfully.")
except Exception as init_error:
    print(f"Error initializing Supabase client: {init_error}")
    supabase = None
|
|
| |
# Configure the Gemini SDK and create the shared model handle. On any failure
# the error is printed and `gemini_model` is left as None.
try:
    if not GEMINI_API_KEY:
        raise ValueError("Gemini API Key must be set in environment variables.")
    genai.configure(api_key=GEMINI_API_KEY)
    gemini_model = genai.GenerativeModel('gemini-2.0-flash-thinking-exp')
    print("Gemini API initialized successfully.")
except Exception as init_error:
    print(f"Error initializing Gemini API: {init_error}")
    gemini_model = None
|
|
| |
# Create the shared ElevenLabs client used for text-to-speech. On any failure
# the error is printed and `elevenlabs_client` is left as None.
try:
    if not ELEVENLABS_API_KEY:
        raise ValueError("ElevenLabs API Key must be set in environment variables.")
    elevenlabs_client = ElevenLabs(api_key=ELEVENLABS_API_KEY)
    print("ElevenLabs client initialized successfully.")
except Exception as init_error:
    print(f"Error initializing ElevenLabs client: {init_error}")
    elevenlabs_client = None
|
|
| |
# Configure the root logger to write INFO-and-above records to LOG_FILE_PATH,
# each prefixed with a timestamp and the level name.
LOG_FILE_PATH = "/tmp/ai_tutor.log"
logging.basicConfig(
    filename=LOG_FILE_PATH,
    level=logging.INFO,
    format="%(asctime)s - %(levelname)s - %(message)s",
)
|
|
|
|
|
|
| |
|
|
def verify_token(auth_header):
    """Verify the Supabase JWT carried in an ``Authorization: Bearer <token>`` header.

    Args:
        auth_header: Raw value of the Authorization header, or None when absent.

    Returns:
        ``(user, None)`` when Supabase accepts the token, otherwise
        ``(None, error)`` where ``error`` is a dict with an ``'error'`` message
        and an HTTP ``'status'`` code (401 for every failure handled here).

    Raises:
        ConnectionError: If the module-level Supabase client is not initialized.
    """
    if not supabase:
        raise ConnectionError("Supabase client not initialized.")
    if not auth_header or not auth_header.startswith('Bearer '):
        return None, {'error': 'Missing or invalid Authorization header', 'status': 401}

    # Split on any run of whitespace so "Bearer  <token>" still yields the
    # token, and a bare "Bearer " is rejected locally instead of sending an
    # empty token to Supabase.
    header_parts = auth_header.split()
    token = header_parts[1] if len(header_parts) > 1 else ''
    if not token:
        return None, {'error': 'Missing or invalid Authorization header', 'status': 401}

    try:
        response = supabase.auth.get_user(token)
        # Guard against a None response as well as a response without a user.
        user = getattr(response, 'user', None)
        if not user:
            return None, {'error': 'Invalid or expired token', 'status': 401}
        return user, None
    except Exception as e:
        logging.error(f"Token verification error: {e}")
        return None, {'error': f'Token verification failed: {e}', 'status': 401}
|
|
def verify_admin(user):
    """Check whether an already-verified user has the ``is_admin`` flag set.

    Args:
        user: User object returned by ``verify_token``; only ``user.id`` is read.

    Returns:
        ``(True, None)`` for admins, otherwise ``(False, error)`` where
        ``error`` is a dict with an ``'error'`` message and HTTP ``'status'``:
        400 when no user is given, 403 when the user is not an admin (or has
        no profile row), 500 when the lookup itself fails.

    Raises:
        ConnectionError: If the module-level Supabase client is not initialized.
    """
    if not supabase:
        raise ConnectionError("Supabase client not initialized.")
    if not user:
        return False, {'error': 'User not provided for admin check', 'status': 400}
    try:
        profile_res = supabase.table('profiles').select('is_admin').eq('id', user.id).maybe_single().execute()
        # maybe_single() can yield None instead of a response object when no
        # row matches; treat that the same as "not an admin" rather than
        # letting an AttributeError turn it into a 500.
        profile_data = getattr(profile_res, 'data', None)
        if profile_data and profile_data.get('is_admin'):
            return True, None
        return False, {'error': 'Admin access required', 'status': 403}
    except Exception as e:
        logging.error(f"Admin check failed for user {user.id}: {e}")
        return False, {'error': f'Error checking admin status: {e}', 'status': 500}
|
|
|
|
def upload_to_supabase_storage(bucket_name: str, file_path: str, destination_path: str, content_type: str):
    """Upload a local file to a Supabase Storage bucket and return its public URL.

    Args:
        bucket_name: Name of the target storage bucket.
        file_path: Path of the local file to read (opened in binary mode).
        destination_path: Object path inside the bucket.
        content_type: MIME type sent as the ``content-type`` upload option.

    Returns:
        Whatever ``get_public_url`` returns for ``destination_path``.

    Raises:
        ConnectionError: If the Supabase client is not initialized.
        Exception: Any upload error is logged and re-raised unchanged.
    """
    if not supabase:
        raise ConnectionError("Supabase client not initialized.")

    # "upsert" is requested so an existing object at the same path is replaced.
    upload_options = {"content-type": content_type, "cache-control": "3600", "upsert": "true"}
    try:
        with open(file_path, 'rb') as source_file:
            supabase.storage.from_(bucket_name).upload(
                path=destination_path,
                file=source_file,
                file_options=upload_options,
            )
        return supabase.storage.from_(bucket_name).get_public_url(destination_path)
    except Exception as upload_error:
        logging.error(f"Supabase Storage upload failed: {upload_error}")
        raise
|
|
| |
|
|
def get_pdf_text(pdf_file_storage):
    """Extract the text of every page of a PDF, capped at 300,000 characters.

    Args:
        pdf_file_storage: File path or binary stream accepted by ``PdfReader``.

    Returns:
        The text of all pages that produced any text, each followed by a
        newline, truncated to ``MAX_CHARS`` characters.

    Raises:
        ValueError: If the PDF cannot be read or parsed.
    """
    MAX_CHARS = 300000
    try:
        reader = PdfReader(pdf_file_storage)
        page_texts = (page.extract_text() for page in reader.pages)
        # Pages without extractable text are skipped entirely.
        combined = "".join(page_text + "\n" for page_text in page_texts if page_text)
        return combined[:MAX_CHARS]
    except Exception as e:
        logging.error(f"Error reading PDF: {e}")
        raise ValueError(f"Could not process PDF file: {e}")
|
|
def _extract_youtube_video_id(url):
    """Return the video ID found in a YouTube URL, or raise ValueError."""
    candidate = url.strip()
    # urlparse only recognises the host when a scheme is present, so add one
    # for inputs such as "youtu.be/<id>" or "www.youtube.com/watch?v=<id>".
    if "://" not in candidate:
        candidate = "https://" + candidate
    parsed = urllib.parse.urlparse(candidate)
    host = (parsed.hostname or "").lower()
    segments = [segment for segment in parsed.path.split("/") if segment]

    video_id = None
    if host == "youtu.be":
        # Short links carry the ID as the first path segment.
        video_id = segments[0] if segments else None
    else:
        query_ids = urllib.parse.parse_qs(parsed.query).get("v")
        if query_ids:
            video_id = query_ids[0]
        elif len(segments) >= 2 and segments[0] in ("shorts", "embed", "live", "v"):
            video_id = segments[1]

    # Legacy fallback: keep accepting anything the old substring-based parser
    # accepted when the structured parse finds nothing.
    if not video_id and "v=" in url:
        video_id = url.split("v=")[1].split("&")[0]
    if not video_id:
        raise ValueError("Invalid YouTube URL format.")
    return video_id


def get_youtube_transcript(url):
    """Fetch the transcript of a YouTube video as one string.

    Accepts ``watch?v=<id>`` URLs (the ``v`` parameter may appear anywhere in
    the query string), ``youtu.be/<id>`` short links, and ``/shorts/``,
    ``/embed/``, ``/live/`` and ``/v/`` paths.

    Args:
        url: YouTube video URL.

    Returns:
        The transcript segments joined with single spaces, truncated to
        300,000 characters.

    Raises:
        ValueError: If the URL has no recognisable video ID or the transcript
            cannot be retrieved (the underlying error is included in the message).
    """
    MAX_CHARS = 300000
    try:
        video_id = _extract_youtube_video_id(url)
        transcript_list = YouTubeTranscriptApi.get_transcript(video_id)
        transcript_text = " ".join(item['text'] for item in transcript_list)
        return transcript_text[:MAX_CHARS]
    except Exception as e:
        logging.error(f"Error getting YouTube transcript for {url}: {e}")
        raise ValueError(f"Could not get transcript: {e}")
|
|
def get_wiki_content(url):
    """Fetch the content of the English Wikipedia page named by a URL.

    The page title is taken from the last path segment of ``url``
    (URL-decoded, underscores replaced by spaces).

    Args:
        url: Wikipedia article URL.

    Returns:
        The page's ``content`` text, truncated to 300,000 characters.

    Raises:
        ValueError: If the page does not exist, the title is ambiguous, or any
            other error occurs while fetching it.
    """
    MAX_CHARS = 300000
    try:
        last_segment = url.rstrip("/").split("/")[-1]
        page_title = urllib.parse.unquote(last_segment).replace("_", " ")
        wikipedia.set_lang("en")
        # auto_suggest is disabled so the exact title from the URL is looked up.
        article = wikipedia.page(page_title, auto_suggest=False)
        return article.content[:MAX_CHARS]
    except wikipedia.exceptions.PageError:
        raise ValueError(f"Wikipedia page '{page_title}' not found.")
    except wikipedia.exceptions.DisambiguationError as ambiguity:
        raise ValueError(f"'{page_title}' refers to multiple pages: {ambiguity.options}")
    except Exception as e:
        logging.error(f"Error getting Wikipedia content for {url}: {e}")
        raise ValueError(f"Could not get Wikipedia content: {e}")
|
|
def fetch_bible_text(reference):
    """Fetch the KJV text for a Bible reference from bible-api.com.

    Args:
        reference: Human-readable reference (for example ``"John 3:16"``); it
            is URL-quoted before being placed in the request path.

    Returns:
        The passage text, truncated to 300,000 characters. The top-level
        ``text`` field is used when present; otherwise the ``verses`` list is
        joined with spaces.

    Raises:
        ConnectionError: If the HTTP request fails (network error, timeout or
            non-2xx status).
        ValueError: If the API reports an error, the passage is empty, or the
            response has an unrecognised shape.
    """
    MAX_CHARS = 300000
    try:
        query = urllib.parse.quote(reference)
        api_url = f"https://bible-api.com/{query}?translation=kjv"
        response = requests.get(api_url, timeout=15)
        response.raise_for_status()
        data = response.json()

        if 'text' in data:
            return data['text'].strip()[:MAX_CHARS]
        if 'error' in data:
            raise ValueError(f"Bible API error: {data['error']}")

        verses = data.get('verses')
        if not isinstance(verses, list):
            raise ValueError("Bible API response format not recognized.")

        text = " ".join(verse.get('text', '').strip() for verse in verses)
        if not text:
            # Raise instead of returning the exception object, which callers
            # would otherwise have treated as valid passage content.
            raise ValueError("Bible reference not found or empty.")
        return text[:MAX_CHARS]

    except requests.exceptions.RequestException as e:
        logging.error(f"Error fetching Bible text for '{reference}': {e}")
        raise ConnectionError(f"Could not connect to Bible API: {e}")
    except Exception as e:
        # Also re-wraps the ValueErrors raised above with a common prefix.
        logging.error(f"Error processing Bible reference '{reference}': {e}")
        raise ValueError(f"Could not process Bible reference: {e}")
|
|
|
|
def get_arxiv_content(arxiv_id):
    """Fetch the title and abstract of an arXiv paper.

    Args:
        arxiv_id: A bare arXiv identifier, or an ``arxiv.org/abs/...`` or
            ``arxiv.org/pdf/...`` URL from which the identifier is extracted.

    Returns:
        A ``(content, title)`` tuple, where ``content`` is
        ``"Title: ...\\n\\nAbstract: ..."`` truncated to 300,000 characters.

    Raises:
        ValueError: If no paper matches the identifier or the lookup fails.
    """
    MAX_CHARS = 300000
    try:
        # Reduce abstract/PDF URLs to the bare identifier.
        if 'arxiv.org/abs/' in arxiv_id:
            arxiv_id = arxiv_id.split('/abs/')[-1]
        if 'arxiv.org/pdf/' in arxiv_id:
            arxiv_id = arxiv_id.split('/pdf/')[-1].replace('.pdf', '')

        matches = arxiv.Search(id_list=[arxiv_id]).results()
        paper = next(matches)

        summary_text = f"Title: {paper.title}\n\nAbstract: {paper.summary}"
        return summary_text[:MAX_CHARS], paper.title
    except StopIteration:
        raise ValueError(f"ArXiv paper with ID '{arxiv_id}' not found.")
    except Exception as e:
        logging.error(f"Error fetching ArXiv content for {arxiv_id}: {e}")
        raise ValueError(f"Could not get ArXiv content: {e}")
|
|
|
|
| |
|
|
def generate_notes_with_gemini(text_content, title=None):
    """Ask Gemini to turn source text into Markdown study notes.

    Args:
        text_content: The source material to summarise.
        title: Optional title of the material, mentioned in the prompt when given.

    Returns:
        The model's response text with surrounding whitespace stripped.

    Raises:
        ConnectionError: If the Gemini model was not initialized.
        RuntimeError: If prompt construction or the model call fails.
    """
    if not gemini_model:
        raise ConnectionError("Gemini client not initialized.")
    try:
        prompt = f"""
Act as an expert educator and study assistant. Based on the following text {'titled "' + title + '" ' if title else ''} , generate comprehensive and well-structured study notes.

**Instructions:**
1. **Identify Key Concepts:** Extract the main topics, definitions, key figures, dates, arguments, and important takeaways.
2. **Structure Logically:** Organize the notes with clear headings (using Markdown ##) and bullet points (* or -) for readability. Use sub-bullets if necessary.
3. **Be Concise but Thorough:** Summarize the information accurately without unnecessary jargon. Ensure all critical points are covered.
4. **Highlight Importance:** You can use bold text (**bold**) for very important terms or concepts.
5. **Focus:** Generate only the notes based on the provided text. Do not add introductions like "Here are the notes..." or conclusions like "These notes cover...".

**Source Text:**
---
{text_content}
---

**Generated Study Notes:**
"""
        completion = gemini_model.generate_content(prompt)
        notes_text = completion.text
        return notes_text.strip()
    except Exception as generation_error:
        logging.error(f"Gemini note generation failed: {generation_error}")
        raise RuntimeError(f"AI failed to generate notes: {generation_error}")
|
|
def _strip_json_code_fence(raw_text):
    """Return the payload of a Markdown code fence, or the stripped text if unfenced."""
    text = raw_text.strip()
    fenced = re.search(r"```(?:json)?\s*(.*?)```", text, re.DOTALL | re.IGNORECASE)
    if fenced:
        return fenced.group(1).strip()
    # Opening fence without a closing one (e.g. a truncated response).
    if text.startswith("```"):
        text = re.sub(r"^```(?:json)?", "", text, flags=re.IGNORECASE)
    return text.strip()


def generate_quiz_with_gemini(notes_content, difficulty, num_questions=5):
    """Ask Gemini for a multiple-choice quiz based on study notes.

    Args:
        notes_content: The study notes the questions are derived from.
        difficulty: ``"easy"``, ``"medium"`` or ``"hard"`` (case-insensitive);
            any other value is described to the model as "medium difficulty".
        num_questions: Number of questions requested in the prompt.

    Returns:
        A list of dicts, each with ``"question"``, ``"options"`` and
        ``"correct_answer"`` keys, parsed from the model's JSON output.

    Raises:
        ConnectionError: If the Gemini model was not initialized.
        RuntimeError: If the model call fails, the output is not valid JSON,
            or the parsed data does not have the expected structure.
    """
    if not gemini_model:
        raise ConnectionError("Gemini client not initialized.")

    difficulty_map = {
        "easy": "basic recall and understanding",
        "medium": "application and interpretation",
        "hard": "analysis, synthesis, and evaluation"
    }
    # str() keeps a non-string difficulty from raising before the try block.
    difficulty_desc = difficulty_map.get(str(difficulty).lower(), "medium difficulty")
    required_keys = ("question", "options", "correct_answer")

    # Initialised up front so the JSON error handler can always log it.
    cleaned_response = ""
    try:
        prompt = f"""
Act as an expert quiz creator. Based on the following study notes, create a multiple-choice quiz.

**Instructions:**
1. **Number of Questions:** Generate exactly {num_questions} questions.
2. **Difficulty Level:** The questions should be of {difficulty_desc} ({difficulty}).
3. **Format:** Each question must have exactly four options (A, B, C, D).
4. **Clarity:** Questions and options should be clear and unambiguous.
5. **Single Correct Answer:** Ensure only one option is the correct answer.
6. **JSON Output:** Format the entire output STRICTLY as a JSON list of objects. Each object must have the following keys: "question" (string), "options" (an object with keys "A", "B", "C", "D", all strings), and "correct_answer" (string, either "A", "B", "C", or "D").
7. **Focus:** Generate only the JSON output. Do not include any introductory text, explanations, or markdown formatting outside the JSON structure.

**Study Notes:**
---
{notes_content}
---

**Quiz JSON Output:**
```json
[
{{
"question": "...",
"options": {{
"A": "...",
"B": "...",
"C": "...",
"D": "..."
}},
"correct_answer": "..."
}}
// ... more question objects
]
```
"""
        response = gemini_model.generate_content(prompt)

        # str.lstrip/rstrip remove *character sets*, not prefixes/suffixes, so
        # the fence is removed with a pattern match instead.
        cleaned_response = _strip_json_code_fence(response.text)
        quiz_data = json.loads(cleaned_response)

        if not isinstance(quiz_data, list):
            raise ValueError("AI response is not a list.")
        # Validate every question, not just the first one.
        for item in quiz_data:
            if not isinstance(item, dict) or not all(key in item for key in required_keys):
                raise ValueError("AI response list items have missing keys.")
        return quiz_data
    except json.JSONDecodeError as e:
        logging.error(f"Gemini quiz generation returned invalid JSON: {cleaned_response[:500]}... Error: {e}")
        raise RuntimeError(f"AI failed to generate a valid quiz format. Please try again.")
    except Exception as e:
        logging.error(f"Gemini quiz generation failed: {e}")
        raise RuntimeError(f"AI failed to generate quiz: {e}")
|
|
|
|
| |
|
|
def generate_tts_audio(text_to_speak, voice_id="Rachel"):
    """Synthesise speech with ElevenLabs and return the audio as bytes.

    Args:
        text_to_speak: Text to convert to speech.
        voice_id: Voice passed to ElevenLabs as ``voice``; defaults to "Rachel".

    Returns:
        The concatenated audio chunks produced by the streaming call.

    Raises:
        ConnectionError: If the ElevenLabs client was not initialized.
        RuntimeError: If generation fails or yields no audio data.
    """
    if not elevenlabs_client:
        raise ConnectionError("ElevenLabs client not initialized.")
    try:
        chunk_iterator = elevenlabs_client.generate(
            text=text_to_speak,
            voice=voice_id,
            model="eleven_multilingual_v2",
            stream=True,
        )
        # The streaming call yields byte chunks; collect them into one payload.
        audio_bytes = b"".join(chunk_iterator)
        if not audio_bytes:
            raise ValueError("ElevenLabs generated empty audio.")
        return audio_bytes
    except Exception as tts_error:
        logging.error(f"ElevenLabs TTS generation failed: {tts_error}")
        raise RuntimeError(f"Failed to generate audio: {tts_error}")
|
|
|
|
| |
|
|
|
|
@app.route('/api/auth/signup', methods=['POST'])
def signup():
    """Register a user with email/password and create their profile row.

    Expects a JSON body with ``email`` and ``password``. On success the new
    user's profile is upserted with 20 starting credits.

    Returns:
        201 with ``success``, ``message`` and ``user_id`` on success;
        400 for missing fields or a sign-up failure; 409 when the email is
        already registered; 503 when Supabase is unavailable.
    """
    if not supabase:
        return jsonify({'error': 'Service unavailable'}), 503
    try:
        # silent=True returns None for a missing or malformed JSON body, so
        # that case is reported through the normal validation error below.
        data = request.get_json(silent=True)
        if not isinstance(data, dict):
            data = {}
        email = data.get('email')
        password = data.get('password')
        if not email or not password:
            return jsonify({'error': 'Email and password are required'}), 400

        res = supabase.auth.sign_up({"email": email, "password": password})
        new_user = getattr(res, 'user', None)
        if not new_user:
            # Without a user there is no id to key the profile row on.
            logging.error("Signup error: auth service returned no user")
            return jsonify({'error': 'Signup failed. Please try again.'}), 400

        supabase.table('profiles').upsert({
            'id': new_user.id,
            'email': email,
            'credits': 20
        }).execute()

        return jsonify({
            'success': True,
            'message': 'Signup successful. Please check your email for verification.',
            'user_id': new_user.id
        }), 201

    except Exception as e:
        error_message = str(e)
        status_code = 400
        if "User already registered" in error_message:
            error_message = "Email already exists."
            status_code = 409
        logging.error(f"Signup error: {error_message}")
        return jsonify({'error': error_message}), status_code
|
|
@app.route('/api/auth/signin', methods=['POST'])
def signin():
    """Sign a user in with email/password and return tokens plus profile.

    Expects a JSON body with ``email`` and ``password``.

    Returns:
        200 with ``access_token``, ``refresh_token`` and a ``user`` object
        (``id``, ``email``, ``profile``); 400 when fields are missing;
        401 for failed authentication; 403 when the email is not confirmed;
        503 when Supabase is unavailable.
    """
    if not supabase:
        return jsonify({'error': 'Service unavailable'}), 503
    try:
        # silent=True returns None for a missing or malformed JSON body, so
        # that case is reported as a 400 validation error rather than a 401.
        data = request.get_json(silent=True)
        if not isinstance(data, dict):
            data = {}
        email = data.get('email')
        password = data.get('password')
        if not email or not password:
            return jsonify({'error': 'Email and password are required'}), 400

        res = supabase.auth.sign_in_with_password({"email": email, "password": password})

        profile_res = supabase.table('profiles').select('*').eq('id', res.user.id).maybe_single().execute()
        # maybe_single() can yield None when the user has no profile row yet;
        # a missing profile must not turn a valid login into an auth failure.
        profile_data = getattr(profile_res, 'data', None)

        return jsonify({
            'success': True,
            'access_token': res.session.access_token,
            'refresh_token': res.session.refresh_token,
            'user': {
                'id': res.user.id,
                'email': res.user.email,
                'profile': profile_data
            }
        }), 200

    except Exception as e:
        error_message = str(e)
        status_code = 401
        if "Invalid login credentials" in error_message:
            error_message = "Invalid email or password."
        elif "Email not confirmed" in error_message:
            error_message = "Please verify your email address before signing in."
            status_code = 403
        logging.error(f"Signin error: {error_message}")
        return jsonify({'error': error_message}), status_code
|
|
@app.route('/api/auth/google-signin', methods=['POST'])
def google_signin():
    """Confirm a bearer token and return (or create) the user's profile.

    The token is read from the Authorization header and verified with
    ``verify_token``. If the verified user has no profile row, one is
    inserted with their id and email.

    Returns:
        200 with ``success``, ``message`` and a ``user`` object; the status
        from ``verify_token`` when the token is rejected; 500 when the profile
        lookup or creation fails; 503 when Supabase is unavailable.
    """
    # verify_token raises when the client is missing, so check first and
    # answer with the same 503 the other auth routes use.
    if not supabase:
        return jsonify({'error': 'Service unavailable'}), 503

    user, error = verify_token(request.headers.get('Authorization'))
    if error:
        return jsonify({'error': error['error']}), error['status']

    try:
        profile_res = supabase.table('profiles').select('*').eq('id', user.id).maybe_single().execute()
        # maybe_single() can yield None when no row matches.
        profile_data = getattr(profile_res, 'data', None)

        if not profile_data:
            logging.warning(f"Google Sign-In: Profile not found for verified user {user.id}, attempting to create.")
            insert_res = supabase.table('profiles').insert({
                'id': user.id,
                'email': user.email,
            }).execute()
            profile_data = insert_res.data[0] if insert_res.data else None
            if not profile_data:
                raise Exception("Failed to create profile entry after Google Sign-In.")

        return jsonify({
            'success': True,
            'message': 'Google sign-in verified successfully.',
            'user': {
                'id': user.id,
                'email': user.email,
                'profile': profile_data
            }
        }), 200

    except Exception as e:
        logging.error(f"Google sign-in profile fetch/creation error: {e}")
        return jsonify({'error': f'An error occurred during sign-in: {e}'}), 500
|
|
|
|
| |
|
|
@app.route('/api/user/profile', methods=['GET'])
def get_user_profile():
    """Return the authenticated user's profile fields.

    Returns:
        200 with ``id``, ``email``, ``credits``, ``is_admin``, ``created_at``
        and ``suspended``; the status from ``verify_token`` when the token is
        rejected; 404 when the user has no profile row; 500 when the lookup
        fails; 503 when Supabase is unavailable.
    """
    # verify_token raises when the client is missing; report 503 instead.
    if not supabase:
        return jsonify({'error': 'Service unavailable'}), 503

    user, error = verify_token(request.headers.get('Authorization'))
    if error:
        return jsonify({'error': error['error']}), error['status']

    try:
        profile_res = supabase.table('profiles').select('*').eq('id', user.id).maybe_single().execute()
        # maybe_single() can yield None when no row matches.
        profile_data = getattr(profile_res, 'data', None)

        if not profile_data:
            logging.error(f"Profile not found for authenticated user: {user.id} / {user.email}")
            return jsonify({'error': 'User profile not found.'}), 404

        full_user_data = {
            'id': user.id,
            'email': user.email,
            'credits': profile_data.get('credits'),
            'is_admin': profile_data.get('is_admin'),
            'created_at': profile_data.get('created_at'),
            'suspended': profile_data.get('suspended')
        }
        return jsonify(full_user_data), 200

    except Exception as e:
        logging.error(f"Error fetching user profile for {user.id}: {e}")
        return jsonify({'error': f'Failed to fetch profile: {e}'}), 500
|
|
|
|
| |
|
|
@app.route('/api/tutor/process_input', methods=['POST'])
def process_input_and_generate_notes():
    """Extract content from a source, generate study notes, and store both.

    Reads multipart/form fields: ``input_type`` (``pdf``, ``youtube``,
    ``wiki``, ``bible``, ``arxiv`` or ``text``), ``source_ref`` (URL, ID or
    reference, depending on the type), ``file`` (for ``pdf``) and
    ``text_content`` (for ``text``). A successful request costs 2 credits,
    deducted only after the material and notes have been saved.

    Returns:
        201 with ``material_id``, ``notes_id`` and ``notes`` on success;
        400 for invalid input; 402 when credits are insufficient; 403 when the
        account is suspended; 500 for generation/storage failures; 503 when a
        backend service is unavailable.
    """
    # Check the backends first: verify_token raises when Supabase is missing.
    if not supabase or not gemini_model:
        return jsonify({'error': 'Backend service unavailable'}), 503
    user, error = verify_token(request.headers.get('Authorization'))
    if error:
        return jsonify({'error': error['error']}), error['status']

    try:
        # Inside the try so a failed lookup yields a JSON error response.
        profile_res = supabase.table('profiles').select('credits', 'suspended').eq('id', user.id).single().execute()
        profile = profile_res.data
        if profile['suspended']:
            return jsonify({'error': 'Account suspended'}), 403
        # Treat a NULL credits column as zero instead of failing the comparison.
        available_credits = profile['credits'] or 0
        if available_credits < 2:
            return jsonify({'error': 'Insufficient credits (Need 2)'}), 402

        input_type = request.form.get('input_type')
        source_ref = request.form.get('source_ref')
        uploaded_file = request.files.get('file')

        if not input_type:
            return jsonify({'error': 'input_type (e.g., pdf, youtube, wiki, bible, arxiv, text) is required'}), 400

        content = None
        title = None

        if input_type == 'pdf':
            if not uploaded_file:
                return jsonify({'error': 'File is required for input_type pdf'}), 400
            filename = uploaded_file.filename or ''
            if not filename.lower().endswith('.pdf'):
                return jsonify({'error': 'Only PDF files are allowed'}), 400
            content = get_pdf_text(uploaded_file.stream)
            source_ref = filename
            title = filename
        elif input_type == 'youtube':
            if not source_ref:
                return jsonify({'error': 'source_ref (YouTube URL) is required'}), 400
            content = get_youtube_transcript(source_ref)
        elif input_type == 'wiki':
            if not source_ref:
                return jsonify({'error': 'source_ref (Wikipedia URL) is required'}), 400
            content = get_wiki_content(source_ref)
            title = urllib.parse.unquote(source_ref.rstrip("/").split("/")[-1]).replace("_", " ")
        elif input_type == 'bible':
            if not source_ref:
                return jsonify({'error': 'source_ref (Bible reference) is required'}), 400
            content = fetch_bible_text(source_ref)
            title = source_ref
        elif input_type == 'arxiv':
            if not source_ref:
                return jsonify({'error': 'source_ref (ArXiv ID or URL) is required'}), 400
            content, title = get_arxiv_content(source_ref)
        elif input_type == 'text':
            content = request.form.get('text_content')
            if not content:
                return jsonify({'error': 'text_content is required for input_type text'}), 400
            # No external reference exists for raw text; store a short preview.
            source_ref = content[:100] + "..."
            title = "Custom Text"
        else:
            return jsonify({'error': f'Unsupported input_type: {input_type}'}), 400

        if not content:
            return jsonify({'error': 'Failed to extract content from the source.'}), 500

        start_time = time.time()
        logging.info(f"Generating notes for user {user.id}, type: {input_type}, ref: {source_ref[:50]}")
        generated_notes = generate_notes_with_gemini(content, title=title)
        logging.info(f"Notes generation took {time.time() - start_time:.2f}s")

        # Only the first 10,000 characters of the source are persisted.
        material_res = supabase.table('study_materials').insert({
            'user_id': user.id,
            'type': input_type,
            'source_ref': source_ref,
            'source_content': content if len(content) < 10000 else content[:10000] + "... (truncated)",
            'title': title
        }).execute()
        if not material_res.data:
            raise Exception(f"Failed to save study material: {getattr(material_res, 'error', None)}")
        material_id = material_res.data[0]['id']

        notes_res = supabase.table('notes').insert({
            'material_id': material_id,
            'user_id': user.id,
            'content': generated_notes
        }).execute()
        if not notes_res.data:
            raise Exception(f"Failed to save generated notes: {getattr(notes_res, 'error', None)}")
        notes_id = notes_res.data[0]['id']

        # NOTE(review): read-modify-write on credits is not atomic; concurrent
        # requests can each deduct from the same starting balance.
        new_credits = available_credits - 2
        supabase.table('profiles').update({'credits': new_credits}).eq('id', user.id).execute()

        return jsonify({
            'success': True,
            'message': 'Content processed and notes generated successfully.',
            'material_id': material_id,
            'notes_id': notes_id,
            'notes': generated_notes
        }), 201

    except ValueError as e:
        logging.warning(f"Input processing error for user {user.id}: {e}")
        return jsonify({'error': str(e)}), 400
    except ConnectionError as e:
        logging.error(f"Connection error during processing: {e}")
        return jsonify({'error': f'A backend service is unavailable: {e}'}), 503
    except RuntimeError as e:
        logging.error(f"RuntimeError during processing for user {user.id}: {e}")
        return jsonify({'error': str(e)}), 500
    except Exception as e:
        logging.error(f"Unexpected error processing input for user {user.id}: {traceback.format_exc()}")
        return jsonify({'error': f'An unexpected error occurred: {e}'}), 500
|
|
@app.route('/api/view/notes/<uuid:note_id>', methods=['GET'])
def get_note_by_id(note_id):
    """Return one of the authenticated user's notes with its source material.

    Args:
        note_id: UUID of the note, taken from the URL.

    Returns:
        200 with a ``note`` object (``note_id``, ``content``, ``audio_url``,
        ``created_at``, ``material``); the status from ``verify_token`` when
        the token is rejected; 404 when the note does not exist or belongs to
        another user; 500 on unexpected errors; 503 when Supabase is unavailable.
    """
    # Without this guard verify_token raises and the outage is reported as a
    # generic 500 instead of 503.
    if not supabase:
        return jsonify({'error': 'Backend service unavailable'}), 503
    try:
        user, error = verify_token(request.headers.get('Authorization'))
        if error:
            return jsonify({'error': error['error']}), error['status']

        # Filtering on user_id as well as id enforces ownership in the query.
        note_res = supabase.table('notes') \
            .select('id, content, created_at, tts_audio_url, study_materials(title, type, source_ref)') \
            .eq('id', note_id) \
            .eq('user_id', user.id) \
            .maybe_single() \
            .execute()

        # maybe_single() can yield None when no row matches.
        note_data = getattr(note_res, 'data', None)
        if not note_data:
            return jsonify({'error': 'Note not found or unauthorized'}), 404

        material = note_data['study_materials']
        response_data = {
            "note": {
                "note_id": note_data['id'],
                "content": note_data['content'],
                "audio_url": note_data['tts_audio_url'],
                "created_at": note_data['created_at'],
                "material": {
                    "title": material['title'] if material else "Untitled",
                    "type": material['type'] if material else None,
                    "source_ref": material['source_ref'] if material else None
                }
            }
        }
        return jsonify(response_data)

    except Exception as e:
        logging.error(f"Error fetching note {note_id}: {str(e)}")
        return jsonify({'error': 'Internal server error'}), 500
|
|
@app.route('/api/tutor/notes/<uuid:notes_id>/generate_quiz', methods=['POST'])
def generate_quiz_for_notes(notes_id):
    """Generate and store a multiple-choice quiz for one of the user's notes.

    Optional JSON body fields: ``difficulty`` (``easy``/``medium``/``hard``,
    default ``medium``) and ``num_questions`` (1-10, default 5). A successful
    request costs 2 credits, deducted after the quiz has been saved.

    Args:
        notes_id: UUID of the notes row, taken from the URL.

    Returns:
        201 with ``quiz_id``, ``difficulty`` and ``questions``; 400 for invalid
        parameters; 402 when credits are insufficient; 403 when the account is
        suspended or the notes belong to another user; 404 when the notes do
        not exist; 500 for generation/storage failures; 503 when a backend
        service is unavailable.
    """
    # Check the backends first: verify_token raises when Supabase is missing.
    if not supabase or not gemini_model:
        return jsonify({'error': 'Backend service unavailable'}), 503
    user, error = verify_token(request.headers.get('Authorization'))
    if error:
        return jsonify({'error': error['error']}), error['status']

    try:
        # Inside the try so a failed lookup yields a JSON error response.
        profile_res = supabase.table('profiles').select('credits', 'suspended').eq('id', user.id).single().execute()
        profile = profile_res.data
        if profile['suspended']:
            return jsonify({'error': 'Account suspended'}), 403
        # Treat a NULL credits column as zero instead of failing the comparison.
        available_credits = profile['credits'] or 0
        if available_credits < 2:
            return jsonify({'error': 'Insufficient credits (Need 2)'}), 402

        # A missing or malformed body falls back to the documented defaults.
        data = request.get_json(silent=True)
        if not isinstance(data, dict):
            data = {}
        difficulty = str(data.get('difficulty', 'medium')).lower()
        try:
            num_questions = int(data.get('num_questions', 5))
        except (TypeError, ValueError):
            return jsonify({'error': 'num_questions must be an integer'}), 400

        if difficulty not in ['easy', 'medium', 'hard']:
            return jsonify({'error': 'difficulty must be easy, medium, or hard'}), 400
        if not 1 <= num_questions <= 10:
            return jsonify({'error': 'num_questions must be between 1 and 10'}), 400

        notes_res = supabase.table('notes').select('content, user_id').eq('id', notes_id).maybe_single().execute()
        # maybe_single() can yield None when no row matches.
        notes_row = getattr(notes_res, 'data', None)
        if not notes_row:
            return jsonify({'error': 'Notes not found'}), 404
        if notes_row['user_id'] != user.id:
            return jsonify({'error': 'You do not have permission to access these notes'}), 403

        notes_content = notes_row['content']

        start_time = time.time()
        logging.info(f"Generating {difficulty} quiz ({num_questions}q) for user {user.id}, notes: {notes_id}")
        quiz_questions = generate_quiz_with_gemini(notes_content, difficulty, num_questions)
        logging.info(f"Quiz generation took {time.time() - start_time:.2f}s")

        # Questions are stored as a JSON-encoded string; the submit route
        # decodes string values back into a list.
        quiz_res = supabase.table('quizzes').insert({
            'notes_id': str(notes_id),
            'user_id': user.id,
            'difficulty': difficulty,
            'questions': json.dumps(quiz_questions)
        }).execute()
        if not quiz_res.data:
            raise Exception(f"Failed to save generated quiz: {getattr(quiz_res, 'error', None)}")
        quiz_id = quiz_res.data[0]['id']

        # NOTE(review): read-modify-write on credits is not atomic.
        new_credits = available_credits - 2
        supabase.table('profiles').update({'credits': new_credits}).eq('id', user.id).execute()

        return jsonify({
            'success': True,
            'quiz_id': quiz_id,
            'difficulty': difficulty,
            'questions': quiz_questions
        }), 201

    except ValueError as e:
        return jsonify({'error': str(e)}), 400
    except ConnectionError as e:
        logging.error(f"Connection error during quiz generation: {e}")
        return jsonify({'error': f'A backend service is unavailable: {e}'}), 503
    except RuntimeError as e:
        logging.error(f"RuntimeError during quiz generation for user {user.id}: {e}")
        return jsonify({'error': str(e)}), 500
    except Exception as e:
        logging.error(f"Unexpected error generating quiz for user {user.id}, notes {notes_id}: {traceback.format_exc()}")
        return jsonify({'error': f'An unexpected error occurred: {e}'}), 500
|
|
@app.route('/api/view/quizzes/<uuid:quiz_id>', methods=['GET'])
def get_quiz_by_id(quiz_id):
    """Return one of the authenticated user's quizzes with its source note.

    Args:
        quiz_id: UUID of the quiz, taken from the URL.

    Returns:
        200 with a ``quiz`` object (``quiz_id``, ``difficulty``,
        ``created_at``, ``questions``, ``source_note``); the status from
        ``verify_token`` when the token is rejected; 404 when the quiz does
        not exist or belongs to another user; 500 on unexpected errors;
        503 when Supabase is unavailable.
    """
    # Without this guard verify_token raises and the outage is reported as a
    # generic 500 instead of 503.
    if not supabase:
        return jsonify({'error': 'Backend service unavailable'}), 503
    try:
        user, error = verify_token(request.headers.get('Authorization'))
        if error:
            return jsonify({'error': error['error']}), error['status']

        # Filtering on user_id as well as id enforces ownership in the query.
        quiz_res = supabase.table('quizzes') \
            .select('''id, difficulty, created_at, questions,
            notes(id, content, study_materials(title, type))''') \
            .eq('id', quiz_id) \
            .eq('user_id', user.id) \
            .maybe_single() \
            .execute()

        # maybe_single() can yield None when no row matches.
        quiz_data = getattr(quiz_res, 'data', None)
        if not quiz_data:
            return jsonify({'error': 'Quiz not found or unauthorized'}), 404

        # The joined note (and its material) may be absent; fall back to
        # empty dicts so the response is built instead of raising TypeError.
        note = quiz_data['notes'] or {}
        material = note.get('study_materials') or {}
        note_content = note.get('content')

        response_data = {
            "quiz": {
                "quiz_id": quiz_data['id'],
                "difficulty": quiz_data['difficulty'],
                "created_at": quiz_data['created_at'],
                "questions": quiz_data['questions'],
                "source_note": {
                    "note_id": note.get('id'),
                    "content_preview": note_content[:100] + "..." if note_content else None,
                    "material": {
                        "title": material.get('title'),
                        "type": material.get('type')
                    }
                }
            }
        }
        return jsonify(response_data)

    except Exception as e:
        logging.error(f"Error fetching quiz {quiz_id}: {str(e)}")
        return jsonify({'error': 'Internal server error'}), 500
|
|
@app.route('/api/tutor/quizzes/<uuid:quiz_id>/submit', methods=['POST'])
def submit_quiz_attempt(quiz_id):
    """Score a user's answers for a quiz and record the attempt.

    Expects a JSON body ``{"answers": {<question key>: <option letter>}}``.
    A question's key is its ``id`` field when present, otherwise its
    zero-based index as a string. Only questions the user answered appear in
    ``feedback`` and ``correct_answers``; unanswered questions still count
    towards ``total_questions``.

    Args:
        quiz_id: UUID of the quiz, taken from the URL.

    Returns:
        201 with ``attempt_id``, ``score`` (percentage, rounded to 2 places),
        ``correct_count``, ``total_questions``, ``correct_answers`` and
        ``feedback``; 400 when ``answers`` is not an object; 403 when the quiz
        belongs to another user; 404 when the quiz does not exist; 500 on
        unexpected errors; 503 when Supabase is unavailable.
    """
    # Check the backend first: verify_token raises when Supabase is missing.
    if not supabase:
        return jsonify({'error': 'Backend service unavailable'}), 503
    user, error = verify_token(request.headers.get('Authorization'))
    if error:
        return jsonify({'error': error['error']}), error['status']

    try:
        # silent=True returns None for a missing or malformed JSON body.
        data = request.get_json(silent=True)
        user_answers = data.get('answers') if isinstance(data, dict) else None
        if not isinstance(user_answers, dict):
            return jsonify({'error': 'answers must be provided as a JSON object'}), 400

        quiz_res = supabase.table('quizzes')\
            .select('questions, user_id')\
            .eq('id', quiz_id)\
            .maybe_single()\
            .execute()

        # maybe_single() can yield None when no row matches.
        quiz_row = getattr(quiz_res, 'data', None)
        if not quiz_row:
            return jsonify({'error': 'Quiz not found'}), 404

        # The response reveals the correct answers, so restrict it to the
        # quiz owner, matching the ownership filter of the quiz view route.
        if quiz_row['user_id'] != user.id:
            return jsonify({'error': 'You do not have permission to access this quiz'}), 403

        quiz_questions = quiz_row['questions']
        if isinstance(quiz_questions, str):
            # Quizzes are saved as a JSON-encoded string.
            quiz_questions = json.loads(quiz_questions)

        correct_count = 0
        total_questions = len(quiz_questions)
        feedback = {}
        correct_answers = {}

        for i, question in enumerate(quiz_questions):
            question_key = str(question.get('id', str(i)))
            user_answer = user_answers.get(question_key)
            correct_answer = question.get('correct_answer')

            if user_answer and correct_answer:
                # str() keeps a non-string answer (e.g. a number) from raising.
                is_correct = str(user_answer).upper() == str(correct_answer).upper()
                if is_correct:
                    correct_count += 1

                correct_answers[question_key] = correct_answer
                feedback[question_key] = {
                    "correct": is_correct,
                    "correct_answer": correct_answer,
                    "user_answer": user_answer
                }

        score = (correct_count / total_questions) * 100 if total_questions > 0 else 0.0

        attempt_res = supabase.table('quiz_attempts').insert({
            'quiz_id': str(quiz_id),
            'user_id': user.id,
            'score': score,
            'answers': json.dumps(user_answers)
        }).execute()
        if not attempt_res.data:
            raise Exception("Failed to save quiz attempt.")

        return jsonify({
            'success': True,
            'attempt_id': attempt_res.data[0]['id'],
            'score': round(score, 2),
            'correct_count': correct_count,
            'total_questions': total_questions,
            'correct_answers': correct_answers,
            'feedback': feedback
        }), 201

    except Exception as e:
        logging.error(f"Error submitting quiz: {traceback.format_exc()}")
        return jsonify({'error': str(e)}), 500
|
|
|
|
|
|
| |
def _raise_if_supabase_error(response):
    """Raise ConnectionError when *response* carries a truthy ``error``.

    The attribute is read with getattr because it is not guaranteed to exist
    on the response object; reading it unconditionally could raise
    AttributeError on an otherwise successful call.
    """
    response_error = getattr(response, 'error', None)
    if response_error:
        raise ConnectionError(getattr(response_error, 'message', str(response_error)))


@app.route('/api/tutor/notes/<uuid:notes_id>/speak', methods=['POST'])
def speak_notes(notes_id):
    """Generate (or reuse) a text-to-speech MP3 for one of the caller's notes.

    Flow: authenticate, load the note scoped to the caller, reject suspended
    accounts, return the stored audio URL if one exists, otherwise require
    5 credits, synthesize the note text with ElevenLabs in 2000-character
    chunks, upload the MP3 to the ``notes-audio`` bucket, store the public
    URL on the note and deduct 5 credits.

    Args:
        notes_id: UUID of the note, parsed from the URL by Flask.

    Returns:
        A Flask JSON response: 200 with ``audio_url`` on success, 400 when
        the note has no content, 402 when credits are insufficient, 403 when
        the account is suspended, 404 when the note is missing or not owned
        by the caller, 503 when Supabase or ElevenLabs is not initialised,
        and 500 for any other failure.
    """
    user, error = verify_token(request.headers.get('Authorization'))
    if error:
        return jsonify({'error': error['error']}), error['status']

    if not supabase or not elevenlabs_client:
        return jsonify({'error': 'Backend service unavailable'}), 503

    credit_cost = 5
    chunk_size = 2000
    # Send the id as text rather than as a uuid.UUID object.
    note_id = str(notes_id)

    try:
        note_res = supabase.table('notes') \
            .select('user_id, content, tts_audio_url') \
            .eq('id', note_id) \
            .eq('user_id', user.id) \
            .maybe_single() \
            .execute()

        # Some client versions return None (not an empty response) when
        # maybe_single() matches no row, so guard both shapes.
        if not note_res or not note_res.data:
            return jsonify({'error': 'Note not found or unauthorized'}), 404

        profile_res = supabase.table('profiles') \
            .select('credits, suspended') \
            .eq('id', user.id) \
            .single().execute()

        if profile_res.data['suspended']:
            return jsonify({'error': 'Account suspended'}), 403

        # Reusing stored audio is free, so serve it before the credit gate.
        existing_url = note_res.data.get('tts_audio_url')
        if existing_url:
            return jsonify({
                'success': True,
                'audio_url': existing_url,
                'message': 'Using existing audio file'
            })

        current_credits = profile_res.data['credits'] or 0
        if current_credits < credit_cost:
            return jsonify({'error': 'Insufficient credits (Need 5)'}), 402

        notes_content = note_res.data['content']
        if not notes_content:
            return jsonify({'error': 'Notes content is empty'}), 400

        # Synthesize chunk by chunk; collect the pieces and join once at the
        # end instead of repeatedly growing a bytes object.
        audio_parts = []
        for start in range(0, len(notes_content), chunk_size):
            try:
                chunk_audio = elevenlabs_client.generate(
                    text=notes_content[start:start + chunk_size],
                    voice="Rachel",
                    model="eleven_multilingual_v2",
                    stream=True
                )
                audio_parts.extend(chunk_audio)
            except Exception as e:
                logging.error(f"Error generating chunk: {str(e)}")
                raise RuntimeError(f"Audio generation failed: {str(e)}") from e

        audio_bytes = b"".join(audio_parts)
        if not audio_bytes:
            raise RuntimeError("Generated empty audio file")

        bucket_name = 'notes-audio'
        file_path = f'{user.id}/{notes_id}.mp3'
        url_saved = False

        try:
            upload_res = supabase.storage.from_(bucket_name).upload(
                path=file_path,
                file=audio_bytes,
                file_options={
                    'content-type': 'audio/mpeg',
                    'cache-control': '3600',
                    'upsert': 'true',
                    'owner': user.id
                }
            )
            _raise_if_supabase_error(upload_res)

            audio_url = supabase.storage.from_(bucket_name).get_public_url(file_path)

            update_res = supabase.table('notes') \
                .update({'tts_audio_url': audio_url}) \
                .eq('id', note_id) \
                .eq('user_id', user.id) \
                .execute()
            _raise_if_supabase_error(update_res)
            url_saved = True

            # NOTE: read-modify-write; concurrent requests by the same user
            # can race on the credit balance.
            new_credits = current_credits - credit_cost
            credit_res = supabase.table('profiles') \
                .update({'credits': new_credits}) \
                .eq('id', user.id) \
                .execute()
            _raise_if_supabase_error(credit_res)

            return jsonify({
                'success': True,
                'audio_url': audio_url,
                'remaining_credits': new_credits
            })

        except Exception:
            # Best-effort rollback. A failure here must not hide the error
            # that triggered it, so each step is guarded and only logged.
            try:
                supabase.storage.from_(bucket_name).remove([file_path])
            except Exception:
                logging.error(f"Audio cleanup failed: {traceback.format_exc()}")
            if url_saved:
                # Do not leave the note pointing at the file just removed.
                try:
                    supabase.table('notes') \
                        .update({'tts_audio_url': None}) \
                        .eq('id', note_id) \
                        .eq('user_id', user.id) \
                        .execute()
                except Exception:
                    logging.error(f"Audio URL rollback failed: {traceback.format_exc()}")
            raise

    except Exception as e:
        logging.error(f"Speak error: {traceback.format_exc()}")
        return jsonify({'error': str(e)}), 500
|
|
| |
@app.route('/api/tutor/notes/<uuid:notes_id>/audio', methods=['GET'])
def get_note_audio(notes_id):
    """Return the stored text-to-speech audio URL for one of the caller's notes.

    Args:
        notes_id: UUID of the note, parsed from the URL by Flask.

    Returns:
        A Flask JSON response: 200 with ``audio_url``; 404 when the note does
        not exist or has no audio yet; 403 when the note belongs to another
        user; 503 when the Supabase client is not initialised; 500 for any
        other failure.
    """
    user, error = verify_token(request.headers.get('Authorization'))
    if error:
        return jsonify({'error': error['error']}), error['status']

    if not supabase:
        return jsonify({'error': 'Backend service unavailable'}), 503

    try:
        # maybe_single() instead of single(): a missing row must reach the
        # 404 branch below rather than surface as an exception (and a 500).
        notes_res = supabase.table('notes') \
            .select('tts_audio_url, user_id') \
            .eq('id', str(notes_id)) \
            .maybe_single() \
            .execute()

        # Some client versions return None when maybe_single() finds no row.
        if not notes_res or not notes_res.data:
            return jsonify({'error': 'Notes not found'}), 404

        if notes_res.data['user_id'] != user.id:
            return jsonify({'error': 'Unauthorized access'}), 403

        if not notes_res.data['tts_audio_url']:
            return jsonify({'error': 'No audio available for these notes'}), 404

        return jsonify({
            'success': True,
            'audio_url': notes_res.data['tts_audio_url']
        })

    except Exception as e:
        logging.error(f"Error getting audio URL: {str(e)}")
        return jsonify({'error': str(e)}), 500
| |
| |
|
|
@app.route('/api/view/notes', methods=['GET'])
def view_notes():
    """List the caller's notes, newest first, with their study-material info.

    Returns:
        A Flask JSON response ``{"notes": [...]}``, the error from
        ``verify_token`` with its status, or 500 with the exception text.
    """
    try:
        auth_header = request.headers.get('Authorization')
        user, error = verify_token(auth_header)
        if error:
            return jsonify({'error': error['error']}), error['status']

        result = (
            supabase.table('notes')
            .select('id, content, created_at, tts_audio_url, study_materials(title, type)')
            .eq('user_id', user.id)
            .order('created_at', desc=True)
            .execute()
        )

        # The response may or may not expose an ``error`` attribute.
        query_error = getattr(result, 'error', None)
        if query_error:
            raise Exception(query_error.message)

        notes = []
        for row in result.data:
            entry = {
                "note_id": row['id'],
                "content": row['content'],
                "audio_url": row['tts_audio_url'],
                "created_at": row['created_at'],
            }
            # A note without linked study material gets placeholder values.
            material = row['study_materials']
            entry["material_title"] = material['title'] if material else "Untitled Note"
            entry["material_type"] = material['type'] if material else None
            notes.append(entry)

        return jsonify({"notes": notes})

    except Exception as e:
        print(f"Error in /api/view/notes: {str(e)}")
        logging.error(f"Notes endpoint error: {str(e)}")
        logging.error(traceback.format_exc())
        return jsonify({'error': str(e)}), 500
| |
@app.route('/api/view/quizzes', methods=['GET'])
def view_quizzes():
    """List the caller's quizzes, newest first, with a preview of their notes.

    Returns:
        A Flask JSON response ``{"quizzes": [...]}``, the error from
        ``verify_token`` with its status, or 500 with the exception text.
    """
    try:
        auth_header = request.headers.get('Authorization')
        user, error = verify_token(auth_header)
        if error:
            return jsonify({'error': error['error']}), error['status']

        result = (
            supabase.table('quizzes')
            .select('id, difficulty, created_at, notes(content, study_materials(title, type))')
            .eq('user_id', user.id)
            .order('created_at', desc=True)
            .execute()
        )

        # The response may or may not expose an ``error`` attribute.
        query_error = getattr(result, 'error', None)
        if query_error:
            raise Exception(query_error.message)

        quizzes = []
        for row in result.data:
            entry = {
                "quiz_id": row['id'],
                "difficulty": row['difficulty'],
                "created_at": row['created_at'],
            }

            note = row['notes']
            # Preview is the first 100 characters of the note text, if any.
            if note and note['content']:
                entry["notes_preview"] = note['content'][:100] + "..."
            else:
                entry["notes_preview"] = None

            material = note['study_materials'] if note else None
            entry["material_title"] = material['title'] if material else "Untitled Quiz"
            entry["material_type"] = material['type'] if material else None
            quizzes.append(entry)

        return jsonify({"quizzes": quizzes})

    except Exception as e:
        print(f"Error in /api/view/quizzes: {str(e)}")
        logging.error(f"Quizzes endpoint error: {str(e)}")
        logging.error(traceback.format_exc())
        return jsonify({'error': str(e)}), 500
|
|
@app.route('/api/user/performance', methods=['GET'])
def get_user_performance():
    """Retrieve the caller's recent quiz performance with simple suggestions.

    Loads up to the 20 most recent quiz attempts, averages their scores and
    derives study suggestions from that average (below 60, above 85, or in
    between).

    Returns:
        A Flask JSON response with ``average_score`` (None when no attempt
        has a score), ``recent_attempts`` and ``suggestions``; the error from
        ``verify_token`` with its status; 503 when the Supabase client is not
        initialised; or 500 with the exception text.
    """
    user, error = verify_token(request.headers.get('Authorization'))
    if error:
        return jsonify({'error': error['error']}), error['status']
    if not supabase:
        return jsonify({'error': 'Backend service unavailable'}), 503

    try:
        # Parentheses in the embedded-resource list must balance: three
        # nested resources, three closing parentheses.
        attempts_res = supabase.table('quiz_attempts')\
            .select('id, quiz_id, score, submitted_at, quizzes(difficulty, notes(material_id, study_materials(type, title)))')\
            .eq('user_id', user.id)\
            .order('submitted_at', desc=True)\
            .limit(20)\
            .execute()

        # ``error`` is not guaranteed to exist on the response object.
        fetch_error = getattr(attempts_res, 'error', None)
        if fetch_error:
            raise Exception(f"Failed to fetch quiz attempts: {fetch_error}")

        attempts_data = attempts_res.data or []
        # Ignore attempts without a score so they cannot break the arithmetic.
        scored_attempts = [a for a in attempts_data if a.get('score') is not None]

        average_score = None
        suggestions = []
        if scored_attempts:
            average_score = sum(a['score'] for a in scored_attempts) / len(scored_attempts)

            if average_score < 60:
                suggestions.append("Your average score is a bit low. Try reviewing the notes more thoroughly before taking quizzes.")

                # Rows arrive newest first, so the first low score found is
                # the most recent one.
                latest_low = next((a for a in scored_attempts if a['score'] < 60), None)
                if latest_low:
                    quiz_info = latest_low.get('quizzes') or {}
                    notes_info = quiz_info.get('notes') or {}
                    material_info = notes_info.get('study_materials') or {}
                    title = material_info.get('title')
                    if title:
                        suggestions.append(f"Focus on reviewing the material titled: '{title}'.")

            elif average_score > 85:
                suggestions.append("Great job on your recent quizzes! Consider trying 'hard' difficulty quizzes or exploring new topics.")
            else:
                suggestions.append("You're making good progress! Keep practicing to solidify your understanding.")

        return jsonify({
            'success': True,
            'average_score': round(average_score, 2) if average_score is not None else None,
            'recent_attempts': attempts_data,
            'suggestions': suggestions
        })

    except Exception as e:
        logging.error(f"Unexpected error fetching performance for user {user.id}: {traceback.format_exc()}")
        return jsonify({'error': f'An unexpected error occurred: {e}'}), 500
|
|
|
|
| |
|
|
@app.route('/api/admin/users', methods=['GET'])
def admin_list_users():
    """Return every row of the ``profiles`` table.

    The request must pass both ``verify_token`` and ``verify_admin``;
    otherwise the error and status they report are returned.
    """
    auth_header = request.headers.get('Authorization')
    user, error = verify_token(auth_header)
    if error:
        return jsonify({'error': error['error']}), error['status']

    is_admin, admin_error = verify_admin(user)
    if admin_error:
        return jsonify({'error': admin_error['error']}), admin_error['status']

    try:
        all_profiles = supabase.table('profiles').select('*').execute()
        payload = {'users': all_profiles.data}
        return jsonify(payload), 200
    except Exception as e:
        logging.error(f"Admin list users error: {e}")
        return jsonify({'error': str(e)}), 500
|
|
@app.route('/api/admin/users/<uuid:target_user_id>/suspend', methods=['PUT'])
def admin_suspend_user(target_user_id):
    """Suspend or unsuspend a user profile.

    Expects a JSON body ``{"action": "suspend" | "unsuspend"}``. The request
    must pass both ``verify_token`` and ``verify_admin``.

    Args:
        target_user_id: UUID of the profile to update, parsed from the URL.

    Returns:
        A Flask JSON response: 200 on success, 400 for a missing or invalid
        action, 404 when no profile has that id, 500 for other failures.
    """
    user, error = verify_token(request.headers.get('Authorization'))
    if error:
        return jsonify({'error': error['error']}), error['status']
    is_admin, admin_error = verify_admin(user)
    if admin_error:
        return jsonify({'error': admin_error['error']}), admin_error['status']

    try:
        # A missing, malformed or non-object body is a client error (400),
        # not a server error, so parse leniently and validate below.
        data = request.get_json(silent=True)
        if not isinstance(data, dict):
            data = {}
        action = data.get('action')

        if action not in ["suspend", "unsuspend"]:
            return jsonify({'error': 'action must be "suspend" or "unsuspend"'}), 400

        should_suspend = (action == "suspend")
        target_id = str(target_user_id)

        update_res = supabase.table('profiles').update({'suspended': should_suspend}).eq('id', target_id).execute()

        if not update_res.data:
            # No row came back: distinguish "no such user" from a failed update.
            user_check = supabase.table('profiles').select('id').eq('id', target_id).maybe_single().execute()
            if not user_check or not user_check.data:
                return jsonify({'error': 'User not found'}), 404
            # ``error`` is not guaranteed to exist on the response object.
            raise Exception(f"Failed to update suspension status: {getattr(update_res, 'error', None)}")

        return jsonify({'success': True, 'message': f'User {target_user_id} suspension status set to {should_suspend}'}), 200
    except Exception as e:
        logging.error(f"Admin suspend user error: {e}")
        return jsonify({'error': str(e)}), 500
|
|
| |
| |
|
|
@app.route('/api/user/credits/request', methods=['POST'])
def request_credits():
    """Create a pending credit request for the authenticated caller.

    Expects a JSON body with ``amount`` (positive integer) and an optional
    ``note``.

    Returns:
        A Flask JSON response: 201 with ``request_id`` on success, 400 for a
        missing or invalid amount, 500 for other failures.
    """
    user, error = verify_token(request.headers.get('Authorization'))
    if error:
        return jsonify({'error': error['error']}), error['status']

    try:
        # A missing, malformed or non-object body is a client error (400),
        # so parse leniently and let the amount check reject it.
        data = request.get_json(silent=True)
        if not isinstance(data, dict):
            data = {}
        amount = data.get('amount')
        note = data.get('note', '')

        # bool is a subclass of int, so JSON ``true`` must be rejected explicitly.
        if isinstance(amount, bool) or not isinstance(amount, int) or amount <= 0:
            return jsonify({'error': 'Invalid amount (must be positive integer)'}), 400

        res = supabase.table('credit_requests').insert({
            'user_id': user.id,
            'amount': amount,
            'status': 'pending',
            'note': note,
            'created_at': datetime.now().isoformat()
        }).execute()

        return jsonify({
            'success': True,
            'request_id': res.data[0]['id']
        }), 201

    except Exception as e:
        logging.error(f"Credit request failed for user {user.id}: {e}")
        return jsonify({'error': str(e)}), 500
|
|
@app.route('/api/admin/credit-requests', methods=['GET'])
def admin_get_credit_requests():
    """Return credit requests filtered by the ``status`` query parameter.

    ``status`` defaults to ``pending``. The request must pass both
    ``verify_token`` and ``verify_admin``; otherwise the error and status
    they report are returned.
    """
    auth_header = request.headers.get('Authorization')
    user, error = verify_token(auth_header)
    if error:
        return jsonify({'error': error['error']}), error['status']

    is_admin, admin_error = verify_admin(user)
    if admin_error:
        return jsonify({'error': admin_error['error']}), admin_error['status']

    try:
        wanted_status = request.args.get('status', 'pending')
        matching = (
            supabase.table('credit_requests')
            .select('*')
            .eq('status', wanted_status)
            .execute()
        )
        return jsonify(matching.data), 200
    except Exception as e:
        logging.error(f"Admin credit requests fetch failed: {e}")
        return jsonify({'error': str(e)}), 500
|
|
@app.route('/api/admin/credit-requests/<uuid:request_id>', methods=['PUT'])
def admin_review_credit_request(request_id):
    """Approve or decline a pending credit request.

    Expects a JSON body with ``action`` (``approve`` or ``decline``) and an
    optional ``note``. On approval the requested amount is added to the
    requesting user's credits before the request is marked as reviewed.

    Args:
        request_id: UUID of the credit request, parsed from the URL.

    Returns:
        A Flask JSON response: 200 on success, 400 for an invalid action or
        an already processed request, 404 when the request (or, on approval,
        the user's profile) does not exist, 500 for other failures.
    """
    user, error = verify_token(request.headers.get('Authorization'))
    if error:
        return jsonify({'error': error['error']}), error['status']
    is_admin, admin_error = verify_admin(user)
    if admin_error:
        return jsonify({'error': admin_error['error']}), admin_error['status']

    try:
        # A missing, malformed or non-object body is a client error (400),
        # so parse leniently and let the action check reject it.
        data = request.get_json(silent=True)
        if not isinstance(data, dict):
            data = {}
        action = data.get('action')
        admin_note = data.get('note', '')

        if action not in ['approve', 'decline']:
            return jsonify({'error': 'Invalid action'}), 400

        req_id = str(request_id)
        req_res = supabase.table('credit_requests').select('*').eq('id', req_id).maybe_single().execute()
        # Some client versions return None when maybe_single() finds no row.
        if not req_res or not req_res.data:
            return jsonify({'error': 'Request not found'}), 404

        req = req_res.data
        if req['status'] != 'pending':
            return jsonify({'error': 'Request already processed'}), 400

        update_data = {
            'status': 'approved' if action == 'approve' else 'declined',
            'reviewed_at': datetime.now().isoformat(),
            'reviewed_by': user.id,
            'admin_note': admin_note
        }

        if action == 'approve':
            # A query builder has no ``credits`` attribute, so the current
            # balance has to be read first and the new total written back.
            # NOTE: this read-modify-write is not atomic; concurrent updates
            # to the same profile can race.
            profile_res = supabase.table('profiles') \
                .select('credits') \
                .eq('id', req['user_id']) \
                .maybe_single() \
                .execute()
            if not profile_res or not profile_res.data:
                return jsonify({'error': 'User profile not found'}), 404

            current_credits = profile_res.data.get('credits') or 0
            supabase.table('profiles').update(
                {'credits': current_credits + req['amount']}
            ).eq('id', req['user_id']).execute()

        supabase.table('credit_requests').update(update_data).eq('id', req_id).execute()

        return jsonify({'success': True}), 200

    except Exception as e:
        logging.error(f"Credit request processing failed: {e}")
        return jsonify({'error': str(e)}), 500
|
|
|
|
| |
if __name__ == '__main__':
    # Script entry point: warn about missing configuration, then start the
    # Flask development server on all interfaces, port 7860.
    if not all([SUPABASE_URL, SUPABASE_SERVICE_KEY, GEMINI_API_KEY, ELEVENLABS_API_KEY]):
        print("WARNING: One or more essential environment variables (SUPABASE_URL, SUPABASE_SERVICE_KEY, GEMINI_API_KEY, ELEVENLABS_API_KEY) are missing!")
    print("Starting Flask server for AI Tutor...")

    # Debug mode enables the interactive Werkzeug debugger, which allows
    # arbitrary code execution. With the server bound to 0.0.0.0 it must be
    # opt-in: set FLASK_DEBUG=1 (or true/yes/on) for local development only.
    debug_enabled = os.getenv("FLASK_DEBUG", "").strip().lower() in ("1", "true", "yes", "on")
    app.run(debug=debug_enabled, host="0.0.0.0", port=7860)