# ---- Main Application File: main.py ---- import io import uuid import re import time import tempfile import requests import json import os import logging import traceback from datetime import datetime from pathlib import Path import urllib.parse from flask import Flask, request, jsonify, send_file, Response from flask_cors import CORS from supabase import create_client, Client # --- Input Processing & AI Libraries --- import google.generativeai as genai from elevenlabs.client import ElevenLabs from elevenlabs import save as save_elevenlabs_audio from PyPDF2 import PdfReader import wikipedia from youtube_transcript_api import YouTubeTranscriptApi import arxiv # For ArXiv from elevenlabs import play, stream, save import math import pydub import logging import traceback import uuid from io import BytesIO # To handle in-memory bytes # --- Environment Variables --- # Load environment variables if using a .env file (optional, good practice) from dotenv import load_dotenv load_dotenv() SUPABASE_URL = os.getenv("SUPABASE_URL") SUPABASE_SERVICE_KEY = os.getenv("SUPABASE_SERVICE_KEY") # Use service role key for admin-like backend tasks SUPABASE_ANON_KEY = os.getenv("SUPABASE_ANON_KEY") # Use anon key for client-side actions if needed, but prefer service key for backend logic GEMINI_API_KEY = os.getenv("GEMINI_API_KEY") ELEVENLABS_API_KEY = os.getenv("ELEVENLABS_API_KEY") # --- Initialize Flask app and CORS --- app = Flask(__name__) CORS(app) # Allow all origins for simplicity in development # --- Initialize Supabase Client --- try: if not SUPABASE_URL or not SUPABASE_SERVICE_KEY: raise ValueError("Supabase URL and Service Key must be set in environment variables.") supabase: Client = create_client(SUPABASE_URL, SUPABASE_SERVICE_KEY) print("Supabase client initialized successfully.") # Example table check (optional) # response = supabase.table('users').select("id", count='exact').limit(0).execute() # print("Checked 'users' table connection.") except Exception as e: print(f"Error initializing Supabase client: {e}") # Depending on your setup, you might want to exit or handle this differently supabase = None # Indicate client is not available # --- Initialize Gemini API --- try: if not GEMINI_API_KEY: raise ValueError("Gemini API Key must be set in environment variables.") genai.configure(api_key=GEMINI_API_KEY) # Use a generally available model, adjust if you have access to specific previews gemini_model = genai.GenerativeModel('gemini-2.0-flash-thinking-exp') print("Gemini API initialized successfully.") except Exception as e: print(f"Error initializing Gemini API: {e}") gemini_model = None # --- Initialize ElevenLabs Client --- try: if not ELEVENLABS_API_KEY: raise ValueError("ElevenLabs API Key must be set in environment variables.") elevenlabs_client = ElevenLabs(api_key=ELEVENLABS_API_KEY) print("ElevenLabs client initialized successfully.") # Optional: Check available voices # voices = elevenlabs_client.voices.get_all() # print(f"Available ElevenLabs voices: {[v.name for v in voices.voices]}") except Exception as e: print(f"Error initializing ElevenLabs client: {e}") elevenlabs_client = None # --- Logging --- LOG_FILE_PATH = "/tmp/ai_tutor.log" # Adjust path as needed logging.basicConfig(filename=LOG_FILE_PATH, level=logging.INFO, format="%(asctime)s - %(levelname)s - %(message)s") # === Helper Functions === def verify_token(auth_header): """Verifies Supabase JWT token from Authorization header.""" if not supabase: raise ConnectionError("Supabase client not initialized.") if not auth_header or not auth_header.startswith('Bearer '): return None, {'error': 'Missing or invalid Authorization header', 'status': 401} token = auth_header.split(' ')[1] try: # Verify token and get user data response = supabase.auth.get_user(token) user = response.user if not user: return None, {'error': 'Invalid or expired token', 'status': 401} # Optionally fetch profile data if needed immediately # profile_res = supabase.table('profiles').select('*').eq('id', user.id).maybe_single().execute() return user, None except Exception as e: logging.error(f"Token verification error: {e}") # Differentiate between specific Supabase errors if needed return None, {'error': f'Token verification failed: {e}', 'status': 401} def verify_admin(user): """Checks if the verified user is an admin.""" if not supabase: raise ConnectionError("Supabase client not initialized.") if not user: return False, {'error': 'User not provided for admin check', 'status': 400} try: # Check the 'is_admin' flag in the 'profiles' table profile_res = supabase.table('profiles').select('is_admin').eq('id', user.id).maybe_single().execute() profile_data = profile_res.data if profile_data and profile_data.get('is_admin'): return True, None else: return False, {'error': 'Admin access required', 'status': 403} # 403 Forbidden except Exception as e: logging.error(f"Admin check failed for user {user.id}: {e}") return False, {'error': f'Error checking admin status: {e}', 'status': 500} def upload_to_supabase_storage(bucket_name: str, file_path: str, destination_path: str, content_type: str): """Uploads a local file to Supabase Storage.""" if not supabase: raise ConnectionError("Supabase client not initialized.") try: with open(file_path, 'rb') as f: # Use upsert=True to overwrite if file exists, adjust if needed supabase.storage.from_(bucket_name).upload( path=destination_path, file=f, file_options={"content-type": content_type, "cache-control": "3600", "upsert": "true"} ) # Get the public URL (ensure bucket has public access enabled or use signed URLs) res = supabase.storage.from_(bucket_name).get_public_url(destination_path) return res except Exception as e: logging.error(f"Supabase Storage upload failed: {e}") raise # Re-raise the exception to be handled by the caller # === Input Content Extraction Helpers === def get_pdf_text(pdf_file_storage): """Extract text from a PDF file stream.""" text = "" try: pdf_reader = PdfReader(pdf_file_storage) for page in pdf_reader.pages: page_text = page.extract_text() if page_text: text += page_text + "\n" # Simple truncation (consider smarter chunking for very large PDFs) MAX_CHARS = 300000 # Adjust as needed based on Gemini context limits return text[:MAX_CHARS] except Exception as e: logging.error(f"Error reading PDF: {e}") raise ValueError(f"Could not process PDF file: {e}") def get_youtube_transcript(url): """Get transcript text from a YouTube URL.""" try: if "v=" in url: video_id = url.split("v=")[1].split("&")[0] elif "youtu.be/" in url: video_id = url.split("youtu.be/")[1].split("?")[0] else: raise ValueError("Invalid YouTube URL format.") transcript_list = YouTubeTranscriptApi.get_transcript(video_id) transcript_text = " ".join([item['text'] for item in transcript_list]) MAX_CHARS = 300000 # Adjust as needed return transcript_text[:MAX_CHARS] except Exception as e: logging.error(f"Error getting YouTube transcript for {url}: {e}") raise ValueError(f"Could not get transcript: {e}") def get_wiki_content(url): """Get summary content from a Wikipedia URL.""" try: # Extract title from URL (simple approach) page_title = urllib.parse.unquote(url.rstrip("/").split("/")[-1]).replace("_", " ") wikipedia.set_lang("en") # Or configure based on user preference page = wikipedia.page(page_title, auto_suggest=False) # Be specific content = page.content # Get full content, might be large # Alternatively, use summary: content = page.summary MAX_CHARS = 300000 # Adjust as needed return content[:MAX_CHARS] except wikipedia.exceptions.PageError: raise ValueError(f"Wikipedia page '{page_title}' not found.") except wikipedia.exceptions.DisambiguationError as e: raise ValueError(f"'{page_title}' refers to multiple pages: {e.options}") except Exception as e: logging.error(f"Error getting Wikipedia content for {url}: {e}") raise ValueError(f"Could not get Wikipedia content: {e}") def fetch_bible_text(reference): """Fetch Bible text from an external API (example using bible-api.com).""" # This API is simple but might have limitations. Consider alternatives if needed. try: # URL encode the reference query = urllib.parse.quote(reference) api_url = f"https://bible-api.com/{query}?translation=kjv" # King James Version, change if needed response = requests.get(api_url, timeout=15) # Add timeout response.raise_for_status() # Raise HTTPError for bad responses (4xx or 5xx) data = response.json() # Check if 'text' exists, handle potential variations in API response if 'text' in data: text = data['text'].strip() MAX_CHARS = 300000 return text[:MAX_CHARS] elif 'error' in data: raise ValueError(f"Bible API error: {data['error']}") else: # Attempt to extract verses if the structure is different if 'verses' in data and isinstance(data['verses'], list): text = " ".join([v.get('text', '').strip() for v in data['verses']]) MAX_CHARS = 300000 return text[:MAX_CHARS] if text else ValueError("Bible reference not found or empty.") else: raise ValueError("Bible API response format not recognized.") except requests.exceptions.RequestException as e: logging.error(f"Error fetching Bible text for '{reference}': {e}") raise ConnectionError(f"Could not connect to Bible API: {e}") except Exception as e: logging.error(f"Error processing Bible reference '{reference}': {e}") raise ValueError(f"Could not process Bible reference: {e}") def get_arxiv_content(arxiv_id): """Fetch abstract or PDF text from ArXiv.""" try: # Clean up potential URL prefixes if 'arxiv.org/abs/' in arxiv_id: arxiv_id = arxiv_id.split('/abs/')[-1] if 'arxiv.org/pdf/' in arxiv_id: arxiv_id = arxiv_id.split('/pdf/')[-1].replace('.pdf', '') search = arxiv.Search(id_list=[arxiv_id]) paper = next(search.results()) # Get the first (and only) result # Prioritize abstract, as full PDF processing is heavy content = f"Title: {paper.title}\n\nAbstract: {paper.summary}" MAX_CHARS = 300000 # Adjust as needed return content[:MAX_CHARS], paper.title # Return content and title except StopIteration: raise ValueError(f"ArXiv paper with ID '{arxiv_id}' not found.") except Exception as e: logging.error(f"Error fetching ArXiv content for {arxiv_id}: {e}") raise ValueError(f"Could not get ArXiv content: {e}") # === Gemini Interaction Helpers === def generate_notes_with_gemini(text_content, title=None): """Generates study notes using Gemini.""" if not gemini_model: raise ConnectionError("Gemini client not initialized.") try: prompt = f""" Act as an expert educator and study assistant. Based on the following text {'titled "' + title + '" ' if title else ''} , generate comprehensive and well-structured study notes. **Instructions:** 1. **Identify Key Concepts:** Extract the main topics, definitions, key figures, dates, arguments, and important takeaways. 2. **Structure Logically:** Organize the notes with clear headings (using Markdown ##) and bullet points (* or -) for readability. Use sub-bullets if necessary. 3. **Be Concise but Thorough:** Summarize the information accurately without unnecessary jargon. Ensure all critical points are covered. 4. **Highlight Importance:** You can use bold text (**bold**) for very important terms or concepts. 5. **Focus:** Generate only the notes based on the provided text. Do not add introductions like "Here are the notes..." or conclusions like "These notes cover...". **Source Text:** --- {text_content} --- **Generated Study Notes:** """ response = gemini_model.generate_content(prompt) return response.text.strip() except Exception as e: logging.error(f"Gemini note generation failed: {e}") raise RuntimeError(f"AI failed to generate notes: {e}") def generate_quiz_with_gemini(notes_content, difficulty, num_questions=5): """Generates multiple-choice quiz using Gemini.""" if not gemini_model: raise ConnectionError("Gemini client not initialized.") difficulty_map = { "easy": "basic recall and understanding", "medium": "application and interpretation", "hard": "analysis, synthesis, and evaluation" } difficulty_desc = difficulty_map.get(difficulty.lower(), "medium difficulty") try: prompt = f""" Act as an expert quiz creator. Based on the following study notes, create a multiple-choice quiz. **Instructions:** 1. **Number of Questions:** Generate exactly {num_questions} questions. 2. **Difficulty Level:** The questions should be of {difficulty_desc} ({difficulty}). 3. **Format:** Each question must have exactly four options (A, B, C, D). 4. **Clarity:** Questions and options should be clear and unambiguous. 5. **Single Correct Answer:** Ensure only one option is the correct answer. 6. **JSON Output:** Format the entire output STRICTLY as a JSON list of objects. Each object must have the following keys: "question" (string), "options" (an object with keys "A", "B", "C", "D", all strings), and "correct_answer" (string, either "A", "B", "C", or "D"). 7. **Focus:** Generate only the JSON output. Do not include any introductory text, explanations, or markdown formatting outside the JSON structure. **Study Notes:** --- {notes_content} --- **Quiz JSON Output:** ```json [ {{ "question": "...", "options": {{ "A": "...", "B": "...", "C": "...", "D": "..." }}, "correct_answer": "..." }} // ... more question objects ] ``` """ response = gemini_model.generate_content(prompt) # Clean potential markdown code block fences cleaned_response = response.text.strip().lstrip('```json').rstrip('```').strip() # Validate and parse JSON quiz_data = json.loads(cleaned_response) # Add basic validation (e.g., check if it's a list, check keys in first item) if not isinstance(quiz_data, list): raise ValueError("AI response is not a list.") if quiz_data and not all(k in quiz_data[0] for k in ["question", "options", "correct_answer"]): raise ValueError("AI response list items have missing keys.") return quiz_data except json.JSONDecodeError as e: logging.error(f"Gemini quiz generation returned invalid JSON: {cleaned_response[:500]}... Error: {e}") raise RuntimeError(f"AI failed to generate a valid quiz format. Please try again.") except Exception as e: logging.error(f"Gemini quiz generation failed: {e}") raise RuntimeError(f"AI failed to generate quiz: {e}") # === Authentication Endpoints === @app.route('/api/auth/signup', methods=['POST']) def signup(): if not supabase: return jsonify({'error': 'Service unavailable'}), 503 try: data = request.get_json() email = data.get('email') password = data.get('password') if not email or not password: return jsonify({'error': 'Email and password are required'}), 400 res = supabase.auth.sign_up({"email": email, "password": password}) # Ensure profile is created with 20 credits supabase.table('profiles').upsert({ 'id': res.user.id, 'email': email, 'credits': 20 }).execute() return jsonify({ 'success': True, 'message': 'Signup successful. Please check your email for verification.', 'user_id': res.user.id if res.user else None }), 201 except Exception as e: error_message = str(e) status_code = 400 if "User already registered" in error_message: error_message = "Email already exists." status_code = 409 logging.error(f"Signup error: {error_message}") return jsonify({'error': error_message}), status_code @app.route('/api/auth/signin', methods=['POST']) def signin(): if not supabase: return jsonify({'error': 'Service unavailable'}), 503 try: data = request.get_json() email = data.get('email') password = data.get('password') if not email or not password: return jsonify({'error': 'Email and password are required'}), 400 # Sign in user using Supabase Auth res = supabase.auth.sign_in_with_password({"email": email, "password": password}) # Fetch associated profile data profile_res = supabase.table('profiles').select('*').eq('id', res.user.id).maybe_single().execute() return jsonify({ 'success': True, 'access_token': res.session.access_token, 'refresh_token': res.session.refresh_token, 'user': { 'id': res.user.id, 'email': res.user.email, 'profile': profile_res.data # Include profile details } }), 200 except Exception as e: # Handle specific errors like invalid credentials error_message = str(e) status_code = 401 # Unauthorized if "Invalid login credentials" in error_message: error_message = "Invalid email or password." elif "Email not confirmed" in error_message: error_message = "Please verify your email address before signing in." status_code = 403 # Forbidden logging.error(f"Signin error: {error_message}") return jsonify({'error': error_message}), status_code @app.route('/api/auth/google-signin', methods=['POST']) def google_signin(): # Assuming frontend handles OAuth and sends Supabase session token: user, error = verify_token(request.headers.get('Authorization')) if error: return jsonify({'error': error['error']}), error['status'] try: # User is verified via the token. Fetch their profile. profile_res = supabase.table('profiles').select('*').eq('id', user.id).maybe_single().execute() if not profile_res.data: # This case *shouldn't* happen if the trigger works, but handle defensively logging.warning(f"Google Sign-In: Profile not found for verified user {user.id}, attempting to create.") # Attempt to create profile (might fail if email exists from password signup) insert_res = supabase.table('profiles').insert({ 'id': user.id, 'email': user.email, # Set default credits/roles if needed }).execute() profile_data = insert_res.data[0] if insert_res.data else None if not profile_data: raise Exception("Failed to create profile entry after Google Sign-In.") else: profile_data = profile_res.data # Return user info (don't need to send tokens back usually, frontend manages session) return jsonify({ 'success': True, 'message': 'Google sign-in verified successfully.', 'user': { 'id': user.id, 'email': user.email, 'profile': profile_data } }), 200 except Exception as e: logging.error(f"Google sign-in profile fetch/creation error: {e}") return jsonify({'error': f'An error occurred during sign-in: {e}'}), 500 # === User Profile Endpoint === @app.route('/api/user/profile', methods=['GET']) def get_user_profile(): user, error = verify_token(request.headers.get('Authorization')) if error: return jsonify({'error': error['error']}), error['status'] try: # Fetch user's profile data from the 'profiles' table profile_res = supabase.table('profiles').select('*').eq('id', user.id).maybe_single().execute() if not profile_res.data: # This indicates a potential issue (user exists in auth but not profiles) logging.error(f"Profile not found for authenticated user: {user.id} / {user.email}") return jsonify({'error': 'User profile not found.'}), 404 # Combine auth info (like email) with profile info profile_data = profile_res.data full_user_data = { 'id': user.id, 'email': user.email, # Email from auth is usually the source of truth 'credits': profile_data.get('credits'), 'is_admin': profile_data.get('is_admin'), 'created_at': profile_data.get('created_at'), 'suspended': profile_data.get('suspended') # Add any other fields from 'profiles' table } return jsonify(full_user_data), 200 except Exception as e: logging.error(f"Error fetching user profile for {user.id}: {e}") return jsonify({'error': f'Failed to fetch profile: {e}'}), 500 # === AI Tutor Core Endpoints === @app.route('/api/tutor/process_input', methods=['POST']) def process_input_and_generate_notes(): user, error = verify_token(request.headers.get('Authorization')) if error: return jsonify({'error': error['error']}), error['status'] if not supabase or not gemini_model: return jsonify({'error': 'Backend service unavailable'}), 503 profile_res = supabase.table('profiles').select('credits', 'suspended').eq('id', user.id).single().execute() if profile_res.data['suspended']: return jsonify({'error': 'Account suspended'}), 403 if profile_res.data['credits'] < 2: return jsonify({'error': 'Insufficient credits (Need 2)'}), 402 try: input_type = request.form.get('input_type') source_ref = request.form.get('source_ref') # URL, Bible ref, ArXiv ID, etc. uploaded_file = request.files.get('file') # For PDF if not input_type: return jsonify({'error': 'input_type (e.g., pdf, youtube, wiki, bible, arxiv, text) is required'}), 400 content = None title = None # Optional title if input_type == 'pdf': if not uploaded_file: return jsonify({'error': 'File is required for input_type pdf'}), 400 if not uploaded_file.filename.lower().endswith('.pdf'): return jsonify({'error': 'Only PDF files are allowed'}), 400 content = get_pdf_text(uploaded_file.stream) source_ref = uploaded_file.filename # Use filename as reference title = uploaded_file.filename elif input_type == 'youtube': if not source_ref: return jsonify({'error': 'source_ref (YouTube URL) is required'}), 400 content = get_youtube_transcript(source_ref) # You could fetch the video title using youtube API/libraries if needed elif input_type == 'wiki': if not source_ref: return jsonify({'error': 'source_ref (Wikipedia URL) is required'}), 400 content = get_wiki_content(source_ref) title = urllib.parse.unquote(source_ref.rstrip("/").split("/")[-1]).replace("_", " ") elif input_type == 'bible': if not source_ref: return jsonify({'error': 'source_ref (Bible reference) is required'}), 400 content = fetch_bible_text(source_ref) title = source_ref elif input_type == 'arxiv': if not source_ref: return jsonify({'error': 'source_ref (ArXiv ID or URL) is required'}), 400 content, title = get_arxiv_content(source_ref) # Gets title too elif input_type == 'text': content = request.form.get('text_content') if not content: return jsonify({'error': 'text_content is required for input_type text'}), 400 source_ref = content[:100] + "..." # Use beginning of text as ref title = "Custom Text" else: return jsonify({'error': f'Unsupported input_type: {input_type}'}), 400 if not content: return jsonify({'error': 'Failed to extract content from the source.'}), 500 # --- Generate Notes --- start_time = time.time() logging.info(f"Generating notes for user {user.id}, type: {input_type}, ref: {source_ref[:50]}") generated_notes = generate_notes_with_gemini(content, title=title) logging.info(f"Notes generation took {time.time() - start_time:.2f}s") # --- Save to Database --- # 1. Save Study Material material_res = supabase.table('study_materials').insert({ 'user_id': user.id, 'type': input_type, 'source_ref': source_ref, 'source_content': content if len(content) < 10000 else content[:10000] + "... (truncated)", # Optionally save truncated content 'title': title }).execute() if not material_res.data: raise Exception(f"Failed to save study material: {material_res.error}") material_id = material_res.data[0]['id'] # 2. Save Notes linked to Material notes_res = supabase.table('notes').insert({ 'material_id': material_id, 'user_id': user.id, 'content': generated_notes }).execute() if not notes_res.data: raise Exception(f"Failed to save generated notes: {notes_res.error}") notes_id = notes_res.data[0]['id'] # --- Deduct Credits (Example) --- new_credits = profile_res.data['credits'] - 2 supabase.table('profiles').update({'credits': new_credits}).eq('id', user.id).execute() return jsonify({ 'success': True, 'message': 'Content processed and notes generated successfully.', 'material_id': material_id, 'notes_id': notes_id, 'notes': generated_notes # Return notes directly for immediate use }), 201 except ValueError as e: # Input validation errors logging.warning(f"Input processing error for user {user.id}: {e}") return jsonify({'error': str(e)}), 400 except ConnectionError as e: # Service unavailable (Supabase, Gemini, etc.) logging.error(f"Connection error during processing: {e}") return jsonify({'error': f'A backend service is unavailable: {e}'}), 503 except RuntimeError as e: # AI generation errors logging.error(f"RuntimeError during processing for user {user.id}: {e}") return jsonify({'error': str(e)}), 500 except Exception as e: logging.error(f"Unexpected error processing input for user {user.id}: {traceback.format_exc()}") return jsonify({'error': f'An unexpected error occurred: {e}'}), 500 @app.route('/api/view/notes/', methods=['GET']) def get_note_by_id(note_id): try: # --- Authentication --- user, error = verify_token(request.headers.get('Authorization')) if error: return jsonify({'error': error['error']}), error['status'] # --- Query Database --- note_res = supabase.table('notes') \ .select('id, content, created_at, tts_audio_url, study_materials(title, type, source_ref)') \ .eq('id', note_id) \ .eq('user_id', user.id) \ .maybe_single() \ .execute() if not note_res.data: return jsonify({'error': 'Note not found or unauthorized'}), 404 # --- Format Response --- note_data = note_res.data response_data = { "note": { "note_id": note_data['id'], "content": note_data['content'], "audio_url": note_data['tts_audio_url'], "created_at": note_data['created_at'], "material": { "title": note_data['study_materials']['title'] if note_data['study_materials'] else "Untitled", "type": note_data['study_materials']['type'] if note_data['study_materials'] else None, "source_ref": note_data['study_materials']['source_ref'] if note_data['study_materials'] else None } } } return jsonify(response_data) except Exception as e: logging.error(f"Error fetching note {note_id}: {str(e)}") return jsonify({'error': 'Internal server error'}), 500 @app.route('/api/tutor/notes//generate_quiz', methods=['POST']) def generate_quiz_for_notes(notes_id): user, error = verify_token(request.headers.get('Authorization')) if error: return jsonify({'error': error['error']}), error['status'] if not supabase or not gemini_model: return jsonify({'error': 'Backend service unavailable'}), 503 profile_res = supabase.table('profiles').select('credits', 'suspended').eq('id', user.id).single().execute() if profile_res.data['suspended']: return jsonify({'error': 'Account suspended'}), 403 if profile_res.data['credits'] < 2: return jsonify({'error': 'Insufficient credits (Need 2)'}), 402 try: data = request.get_json() difficulty = data.get('difficulty', 'medium').lower() num_questions = int(data.get('num_questions', 5)) if difficulty not in ['easy', 'medium', 'hard']: return jsonify({'error': 'difficulty must be easy, medium, or hard'}), 400 if not 1 <= num_questions <= 10: return jsonify({'error': 'num_questions must be between 1 and 10'}), 400 # --- Fetch Notes Content --- notes_res = supabase.table('notes').select('content, user_id').eq('id', notes_id).maybe_single().execute() if not notes_res.data: return jsonify({'error': 'Notes not found'}), 404 # Ensure user owns the notes if notes_res.data['user_id'] != user.id: return jsonify({'error': 'You do not have permission to access these notes'}), 403 notes_content = notes_res.data['content'] # --- Generate Quiz --- start_time = time.time() logging.info(f"Generating {difficulty} quiz ({num_questions}q) for user {user.id}, notes: {notes_id}") quiz_questions = generate_quiz_with_gemini(notes_content, difficulty, num_questions) logging.info(f"Quiz generation took {time.time() - start_time:.2f}s") # --- Save Quiz to Database --- quiz_res = supabase.table('quizzes').insert({ 'notes_id': str(notes_id), 'user_id': user.id, 'difficulty': difficulty, 'questions': json.dumps(quiz_questions) # Store questions as JSONB }).execute() if not quiz_res.data: raise Exception(f"Failed to save generated quiz: {quiz_res.error}") quiz_id = quiz_res.data[0]['id'] new_credits = profile_res.data['credits'] - 2 supabase.table('profiles').update({'credits': new_credits}).eq('id', user.id).execute() return jsonify({ 'success': True, 'quiz_id': quiz_id, 'difficulty': difficulty, 'questions': quiz_questions # Return quiz data for immediate use }), 201 except ValueError as e: return jsonify({'error': str(e)}), 400 except ConnectionError as e: logging.error(f"Connection error during quiz generation: {e}") return jsonify({'error': f'A backend service is unavailable: {e}'}), 503 except RuntimeError as e: # AI generation errors logging.error(f"RuntimeError during quiz generation for user {user.id}: {e}") return jsonify({'error': str(e)}), 500 except Exception as e: logging.error(f"Unexpected error generating quiz for user {user.id}, notes {notes_id}: {traceback.format_exc()}") return jsonify({'error': f'An unexpected error occurred: {e}'}), 500 @app.route('/api/view/quizzes/', methods=['GET']) def get_quiz_by_id(quiz_id): try: # --- Authentication --- user, error = verify_token(request.headers.get('Authorization')) if error: return jsonify({'error': error['error']}), error['status'] # --- Query Database --- quiz_res = supabase.table('quizzes') \ .select('''id, difficulty, created_at, questions, notes(id, content, study_materials(title, type))''') \ .eq('id', quiz_id) \ .eq('user_id', user.id) \ .maybe_single() \ .execute() if not quiz_res.data: return jsonify({'error': 'Quiz not found or unauthorized'}), 404 # --- Format Response --- quiz_data = quiz_res.data response_data = { "quiz": { "quiz_id": quiz_data['id'], "difficulty": quiz_data['difficulty'], "created_at": quiz_data['created_at'], "questions": quiz_data['questions'], "source_note": { "note_id": quiz_data['notes']['id'], "content_preview": quiz_data['notes']['content'][:100] + "..." if quiz_data['notes']['content'] else None, "material": { "title": quiz_data['notes']['study_materials']['title'] if quiz_data['notes']['study_materials'] else None, "type": quiz_data['notes']['study_materials']['type'] if quiz_data['notes']['study_materials'] else None } } } } return jsonify(response_data) except Exception as e: logging.error(f"Error fetching quiz {quiz_id}: {str(e)}") return jsonify({'error': 'Internal server error'}), 500 @app.route('/api/tutor/quizzes//submit', methods=['POST']) def submit_quiz_attempt(quiz_id): """Submits user answers for a quiz and calculates the score.""" user, error = verify_token(request.headers.get('Authorization')) if error: return jsonify({'error': error['error']}), error['status'] if not supabase: return jsonify({'error': 'Backend service unavailable'}), 503 try: data = request.get_json() user_answers = data.get('answers') # Expected format: { "questionId": "A", ... } if not isinstance(user_answers, dict): return jsonify({'error': 'answers must be provided as a JSON object'}), 400 # Fetch Quiz Data quiz_res = supabase.table('quizzes')\ .select('questions, user_id')\ .eq('id', quiz_id)\ .maybe_single()\ .execute() if not quiz_res.data: return jsonify({'error': 'Quiz not found'}), 404 quiz_questions = quiz_res.data['questions'] if isinstance(quiz_questions, str): quiz_questions = json.loads(quiz_questions) # Calculate Score correct_count = 0 total_questions = len(quiz_questions) feedback = {} correct_answers = {} for i, question in enumerate(quiz_questions): question_id = question.get('id', str(i)) # Use question.id if exists, otherwise index user_answer = user_answers.get(str(question_id)) # Match by question_id correct_answer = question.get('correct_answer') if user_answer and correct_answer: is_correct = user_answer.upper() == correct_answer.upper() if is_correct: correct_count += 1 correct_answers[str(question_id)] = correct_answer feedback[str(question_id)] = { "correct": is_correct, "correct_answer": correct_answer, "user_answer": user_answer } score = (correct_count / total_questions) * 100 if total_questions > 0 else 0.0 # Save Quiz Attempt attempt_res = supabase.table('quiz_attempts').insert({ 'quiz_id': str(quiz_id), 'user_id': user.id, 'score': score, 'answers': json.dumps(user_answers) }).execute() return jsonify({ 'success': True, 'attempt_id': attempt_res.data[0]['id'], 'score': round(score, 2), 'correct_count': correct_count, 'total_questions': total_questions, 'correct_answers': correct_answers, # Send back correct answers 'feedback': feedback }), 201 except Exception as e: logging.error(f"Error submitting quiz: {traceback.format_exc()}") return jsonify({'error': str(e)}), 500 # Modified speak_notes endpoint with ElevenLabs Studio API and chunking try: from pydub import AudioSegment PYDUB_AVAILABLE = True except ImportError: PYDUB_AVAILABLE = False logging.warning("pydub library not found or ffmpeg might be missing. Audio chunk concatenation will fail. Please install pydub and ensure ffmpeg is in your system's PATH.") # Define a dummy AudioSegment class if pydub is not installed to avoid NameError later class AudioSegment: @staticmethod def from_file(*args, **kwargs): raise ImportError("pydub/ffmpeg not installed or accessible") def __add__(self, other): raise ImportError("pydub/ffmpeg not installed or accessible") def export(self, *args, **kwargs): raise ImportError("pydub/ffmpeg not installed or accessible") def generate_tts_audio(text_to_speak, voice_id="Rachel"): """Generates TTS audio using ElevenLabs and returns audio bytes.""" if not elevenlabs_client: raise ConnectionError("ElevenLabs client not initialized.") try: # Stream the audio generation audio_stream = elevenlabs_client.generate( text=text_to_speak, voice=voice_id, # You can customize this model="eleven_multilingual_v2", # Or another suitable model stream=False ) # Collect audio bytes from the stream audio_bytes = b"" for chunk in audio_stream: audio_bytes += chunk if not audio_bytes: raise ValueError("ElevenLabs generated empty audio.") return audio_bytes except Exception as e: logging.error(f"ElevenLabs TTS generation failed: {e}") raise RuntimeError(f"Failed to generate audio: {e}") @app.route('/api/tutor/notes//speak', methods=['POST']) def speak_notes(notes_id): """ Generate TTS audio for notes using ElevenLabs, combine chunks using pydub, and store the final MP3 in Supabase Storage. Updates the note record with the audio URL and deducts credits. Rejects requests for content over 10,000 characters. """ if not PYDUB_AVAILABLE: logging.error("Audio processing library (pydub/ffmpeg) check failed.") return jsonify({'error': 'Server configuration error: Audio processing library not available.'}), 500 # 0. Authenticate User user, error = verify_token(request.headers.get('Authorization')) if error: return jsonify({'error': error['error']}), error['status'] if not supabase or not elevenlabs_client: logging.error("Backend service (Supabase or ElevenLabs client) not initialized.") return jsonify({'error': 'Backend service unavailable'}), 503 try: # 1. Verify note ownership and get content logging.info(f"Processing speak request for note {notes_id} by user {user.id}") note_res = supabase.table('notes') \ .select('user_id, content, tts_audio_url') \ .eq('id', str(notes_id)) \ .eq('user_id', user.id) \ .maybe_single() \ .execute() if not note_res.data: logging.warning(f"Note {notes_id} not found or unauthorized for user {user.id}.") return jsonify({'error': 'Note not found or unauthorized'}), 404 # 2. Check user status and credits profile_res = supabase.table('profiles') \ .select('credits, suspended') \ .eq('id', user.id) \ .single() \ .execute() # Check for potential errors from profile fetch itself if needed if not profile_res.data: logging.error(f"Could not fetch profile for user {user.id}") return jsonify({'error': 'Failed to retrieve user profile'}), 500 if profile_res.data.get('suspended'): logging.warning(f"User {user.id} account is suspended.") return jsonify({'error': 'Account suspended'}), 403 current_credits = profile_res.data.get('credits', 0) CREDIT_COST = 5 if current_credits < CREDIT_COST: logging.warning(f"User {user.id} has insufficient credits ({current_credits}/{CREDIT_COST}).") return jsonify({'error': f'Insufficient credits (Need {CREDIT_COST})'}), 402 # 3. Return existing audio if available (and skip generation/deduction) existing_audio_url = note_res.data.get('tts_audio_url') if existing_audio_url: logging.info(f"Using existing audio URL for note {notes_id}: {existing_audio_url}") return jsonify({ 'success': True, 'audio_url': existing_audio_url, 'message': 'Using existing audio file', 'remaining_credits': current_credits # Return current credits as none were deducted }) notes_content = note_res.data.get('content') if not notes_content or not notes_content.strip(): logging.warning(f"Note {notes_id} content is empty.") return jsonify({'error': 'Notes content is empty'}), 400 # Check for character limit (10,000 characters) if len(notes_content) > 10000: logging.warning(f"Note {notes_id} content exceeds 10,000 character limit ({len(notes_content)} chars).") return jsonify({ 'error': 'Content exceeds maximum length', 'message': f'Note content is {len(notes_content)} characters. Maximum allowed is 10,000 characters.' }), 413 # 4. Generate TTS Audio with chunking (still need chunking for long texts) # ElevenLabs v2 non-streaming limit is often around 2500 chars, but check docs. CHUNK_SIZE = 2500 text_chunks = [notes_content[i:i+CHUNK_SIZE] for i in range(0, len(notes_content), CHUNK_SIZE)] combined_audio_segment = None logging.info(f"Generating audio for note {notes_id} in {len(text_chunks)} chunks.") for i, chunk in enumerate(text_chunks): try: logging.debug(f"Generating audio for chunk {i+1}/{len(text_chunks)}...") # Use the new generate_tts_audio function chunk_audio_bytes = generate_tts_audio( text_to_speak=chunk.strip(), # Ensure no leading/trailing whitespace in chunk voice_id="Rachel" # Or your desired voice ID ) if not chunk_audio_bytes: logging.warning(f"TTS generation returned empty audio for chunk {i+1} of note {notes_id}") continue # Skip this chunk, maybe log or handle differently if needed # Load chunk audio bytes into pydub AudioSegment using BytesIO segment = AudioSegment.from_file(BytesIO(chunk_audio_bytes), format="mp3") # Combine segments if combined_audio_segment is None: combined_audio_segment = segment else: combined_audio_segment += segment # Append segment logging.debug(f"Successfully processed chunk {i+1}/{len(text_chunks)}") except ImportError as e: logging.error(f"pydub/ffmpeg error during chunk processing: {e}") raise e # Re-raise to be caught by the outer ImportError handler except Exception as e: logging.error(f"Error generating/processing audio chunk {i+1} for note {notes_id}: {str(e)}") # Stop the process if a chunk fails raise RuntimeError(f"Audio generation/processing failed for chunk {i+1}: {str(e)}") if combined_audio_segment is None: # This could happen if all chunks failed or the content was only whitespace logging.error(f"Failed to generate any audio content for note {notes_id}.") raise RuntimeError("Failed to generate any audio content.") # Export combined audio to final bytes output_bytes_io = BytesIO() combined_audio_segment.export(output_bytes_io, format="mp3") final_audio_bytes = output_bytes_io.getvalue() # Get the raw 'bytes' data if not final_audio_bytes: logging.error(f"Generated empty final audio file after combining chunks for note {notes_id}.") raise RuntimeError("Generated empty final audio file after combining chunks.") logging.info(f"Audio generation complete for note {notes_id}. Total size: {len(final_audio_bytes)} bytes.") # 5. Save to Supabase Storage bucket_name = 'notes-audio' # Ensure this bucket exists and has correct policies # Use user ID and note ID for a unique, organized path file_path = f'{user.id}/{str(notes_id)}.mp3' audio_url = None # Initialize audio_url try: logging.info(f"Uploading audio to Supabase Storage: {bucket_name}/{file_path}") # Upload the final combined audio bytes. Use upsert=true to overwrite if regenerating. supabase.storage.from_(bucket_name).upload( path=file_path, file=final_audio_bytes, # Pass the raw 'bytes' object file_options={"content-type": "audio/mpeg", "upsert": "true"} ) # Note: supabase-py v1 might raise StorageException on failure. # v2 might return a response object to check. Adapt error checking if needed. # Get public URL (make sure RLS policies allow public reads or generate signed URL) public_url_data = supabase.storage.from_(bucket_name).get_public_url(file_path) # Assuming the URL is directly in the response data audio_url = public_url_data if not audio_url: # This case indicates an issue with getting the URL after a successful upload logging.error(f"Upload to {file_path} seemed successful, but failed to get public URL.") raise ConnectionError("Failed to retrieve audio URL after upload.") logging.info(f"Audio uploaded successfully for note {notes_id}. URL: {audio_url}") # --- Database Updates and Credit Deduction --- # Wrap these in a try/except block for potential rollback on failure try: # 6. Update notes table with the audio URL logging.debug(f"Updating notes table for note {notes_id} with URL.") update_res = supabase.table('notes') \ .update({'tts_audio_url': audio_url}) \ .eq('id', str(notes_id)) \ .eq('user_id', user.id) \ .execute() # Basic check if response indicates data was modified (adapt based on client version) if not update_res.data: logging.warning(f"Note update query executed for {notes_id} but no data returned (might be ok, or indicate issue).") # Consider stronger checks based on specific client behavior on error/no-update # 7. Deduct credits new_credits = current_credits - CREDIT_COST logging.debug(f"Deducting {CREDIT_COST} credits for user {user.id}. New balance: {new_credits}") credit_res = supabase.table('profiles') \ .update({'credits': new_credits}) \ .eq('id', user.id) \ .execute() # Basic check for credit update if not credit_res.data: # CRITICAL: Failed to deduct credits after upload/URL update. logging.error(f"CRITICAL: Failed to deduct credits for user {user.id} after audio generation for note {notes_id}.") # Decide handling: Log and proceed? Attempt rollback? # For now, log error and return success as audio is generated, but flag the inconsistency. # Ideally, implement transactional logic or robust cleanup. logging.info(f"Successfully updated database and deducted credits for note {notes_id}") return jsonify({ 'success': True, 'audio_url': audio_url, 'remaining_credits': new_credits }) except Exception as db_error: # Error occurred during DB update/credit deduction AFTER successful upload logging.error(f"Database update/credit deduction failed for note {notes_id} AFTER upload: {str(db_error)}. URL was {audio_url}") logging.info(f"Attempting to clean up uploaded file: {file_path}") # Attempt to clean up the uploaded file since DB update failed try: supabase.storage.from_(bucket_name).remove([file_path]) logging.info(f"Successfully cleaned up orphaned file: {file_path}") except Exception as cleanup_error: logging.error(f"Failed to clean up orphaned file {file_path} after DB error: {cleanup_error}") # Re-raise the database error to signal the overall operation failed raise db_error except Exception as upload_db_error: # This catches errors during upload OR the subsequent DB operations block if re-raised logging.error(f"Error during upload or DB update phase for note {notes_id}: {str(upload_db_error)}") # Attempt cleanup if file might have been uploaded and URL obtained before the error if audio_url: # Check if upload likely succeeded before the error try: logging.info(f"Attempting cleanup for failed operation: {file_path}") supabase.storage.from_(bucket_name).remove([file_path]) logging.info(f"Cleanup successful for {file_path}") except Exception as cleanup_error: # Log if cleanup also fails, but report the original error logging.error(f"Upload/DB error occurred, AND cleanup failed for {file_path}: {cleanup_error}") # Re-raise the original error that caused the failure raise upload_db_error except ImportError as e: # Catch the specific ImportError from the pydub check/usage logging.error(f"Missing dependency error: {e}") return jsonify({'error': 'Server configuration error: Audio library (pydub/ffmpeg) missing or failed.'}), 500 except (RuntimeError, ConnectionError) as e: # Catch specific errors we raised for generation/upload/db issues logging.error(f"Operation failed for note {notes_id}: {str(e)}") return jsonify({'error': str(e)}), 500 # Return the specific error message except Exception as e: # Catch any other unexpected errors logging.error(f"Unexpected speak endpoint error for note {notes_id}: {traceback.format_exc()}") # Return a generic error message to the client for unknown errors return jsonify({'error': 'An unexpected error occurred during audio generation.'}), 500 # New endpoint to view existing audio URL @app.route('/api/tutor/notes//audio', methods=['GET']) def get_note_audio(notes_id): user, error = verify_token(request.headers.get('Authorization')) if error: return jsonify({'error': error['error']}), error['status'] try: notes_res = supabase.table('notes').select('tts_audio_url, user_id').eq('id', notes_id).single().execute() if not notes_res.data: return jsonify({'error': 'Notes not found'}), 404 if notes_res.data['user_id'] != user.id: return jsonify({'error': 'Unauthorized access'}), 403 if not notes_res.data['tts_audio_url']: return jsonify({'error': 'No audio available for these notes'}), 404 return jsonify({ 'success': True, 'audio_url': notes_res.data['tts_audio_url'] }) except Exception as e: logging.error(f"Error getting audio URL: {str(e)}") return jsonify({'error': str(e)}), 500 # ---------- View Notes and Quizzes Endpoints ---------- @app.route('/api/view/notes', methods=['GET']) def view_notes(): try: # Authentication user, error = verify_token(request.headers.get('Authorization')) if error: return jsonify({'error': error['error']}), error['status'] # Query with proper error handling query = supabase.table('notes') \ .select('id, content, created_at, tts_audio_url, study_materials(title, type)') \ .eq('user_id', user.id) \ .order('created_at', desc=True) result = query.execute() if hasattr(result, 'error') and result.error: raise Exception(result.error.message) # Format response to match frontend expectations notes = [] for note in result.data: notes.append({ "note_id": note['id'], "content": note['content'], "audio_url": note['tts_audio_url'], "created_at": note['created_at'], "material_title": note['study_materials']['title'] if note['study_materials'] else "Untitled Note", "material_type": note['study_materials']['type'] if note['study_materials'] else None }) return jsonify({"notes": notes}) # Changed from "notes" to match frontend except Exception as e: print(f"Error in /api/view/notes: {str(e)}") # Debug logging logging.error(f"Notes endpoint error: {str(e)}") logging.error(traceback.format_exc()) return jsonify({'error': str(e)}), 500 @app.route('/api/view/quizzes', methods=['GET']) def view_quizzes(): try: # Authentication user, error = verify_token(request.headers.get('Authorization')) if error: return jsonify({'error': error['error']}), error['status'] # Query with proper error handling query = supabase.table('quizzes') \ .select('id, difficulty, created_at, notes(content, study_materials(title, type))') \ .eq('user_id', user.id) \ .order('created_at', desc=True) result = query.execute() if hasattr(result, 'error') and result.error: raise Exception(result.error.message) # Format response to match frontend expectations quizzes = [] for quiz in result.data: quizzes.append({ "quiz_id": quiz['id'], "difficulty": quiz['difficulty'], "created_at": quiz['created_at'], "notes_preview": quiz['notes']['content'][:100] + "..." if quiz['notes'] and quiz['notes']['content'] else None, "material_title": quiz['notes']['study_materials']['title'] if quiz['notes'] and quiz['notes']['study_materials'] else "Untitled Quiz", "material_type": quiz['notes']['study_materials']['type'] if quiz['notes'] and quiz['notes']['study_materials'] else None }) return jsonify({"quizzes": quizzes}) except Exception as e: print(f"Error in /api/view/quizzes: {str(e)}") # Debug logging logging.error(f"Quizzes endpoint error: {str(e)}") logging.error(traceback.format_exc()) return jsonify({'error': str(e)}), 500 @app.route('/api/user/performance', methods=['GET']) def get_user_performance(): """Retrieves user's quiz performance and provides simple suggestions.""" try: # --- Authentication --- user, error = verify_token(request.headers.get('Authorization')) if error: return jsonify({'error': error['error']}), error['status'] # --- Query Attempts with Proper Error Handling --- attempts_res = supabase.table('quiz_attempts') \ .select('id, quiz_id, score, submitted_at, quizzes(id, difficulty, created_at, notes(study_materials(title)))') \ .eq('user_id', user.id) \ .order('submitted_at', desc=True) \ .execute() if hasattr(attempts_res, 'error') and attempts_res.error: raise Exception(attempts_res.error.message) attempts_data = attempts_res.data # --- Group Attempts by Quiz --- quizzes = {} for attempt in attempts_data: quiz_id = attempt['quizzes']['id'] if quiz_id not in quizzes: quizzes[quiz_id] = { 'quiz_info': { 'id': quiz_id, 'title': attempt['quizzes']['notes']['study_materials']['title'], 'difficulty': attempt['quizzes']['difficulty'], 'created_at': attempt['quizzes']['created_at'] }, 'attempts': [] } quizzes[quiz_id]['attempts'].append(attempt) # --- Calculate Averages --- performance_data = [] overall_scores = [] for quiz_id, quiz_data in quizzes.items(): scores = [a['score'] for a in quiz_data['attempts']] avg_score = sum(scores) / len(scores) if scores else 0 overall_scores.extend(scores) performance_data.append({ **quiz_data, 'average_score': avg_score, 'attempt_count': len(scores) }) # --- Calculate Overall Average --- average_score = sum(overall_scores) / len(overall_scores) if overall_scores else 0 # --- Generate Suggestions --- suggestions = [] if performance_data: if average_score < 60: suggestions.append("Your average score is a bit low. Try reviewing the notes more thoroughly before taking quizzes.") # Find lowest scoring quiz weakest_quiz = min(performance_data, key=lambda x: x['average_score']) suggestions.append(f"Focus on: '{weakest_quiz['quiz_info']['title']}' (current average: {weakest_quiz['average_score']:.0f}%)") elif average_score > 85: suggestions.append("Great job! Try some 'hard' difficulty quizzes.") else: suggestions.append("You're making good progress! Keep practicing.") return jsonify({ 'success': True, 'average_score': round(average_score, 2), 'quizzes': performance_data, # Changed from recent_attempts to quizzes 'suggestions': suggestions }) except Exception as e: logging.error(f"Performance endpoint error: {str(e)}") logging.error(traceback.format_exc()) return jsonify({'error': 'Internal server error'}), 500 def generate_suggestions(quizzes, overall_avg): """Generate personalized suggestions based on quiz performance""" suggestions = [] if overall_avg < 60: suggestions.append("Your average score is a bit low. Try reviewing notes before retaking quizzes.") elif overall_avg > 85: suggestions.append("Great job! Challenge yourself with harder difficulty levels.") else: suggestions.append("Keep practicing! Focus on your weaker areas for improvement.") # Find weakest quiz weakest = min(quizzes, key=lambda x: x['average_score'], default=None) if weakest and weakest['average_score'] < 60: title = weakest['quiz_info']['title'] or "your recent quizzes" suggestions.append(f"Focus on improving in '{title}' (current average: {weakest['average_score']:.0f}%).") # Check for difficulty distribution difficulty_count = {} for quiz in quizzes: diff = quiz['quiz_info']['difficulty'] difficulty_count[diff] = difficulty_count.get(diff, 0) + 1 if difficulty_count.get('easy', 0) / len(quizzes) > 0.7: suggestions.append("Try more medium difficulty quizzes to push your skills!") return suggestions # === Admin Endpoints (Adapted for Supabase) === @app.route('/api/admin/users', methods=['GET']) def admin_list_users(): user, error = verify_token(request.headers.get('Authorization')) if error: return jsonify({'error': error['error']}), error['status'] is_admin, admin_error = verify_admin(user) if admin_error: return jsonify({'error': admin_error['error']}), admin_error['status'] try: # Fetch all profiles (which are linked 1-1 with auth users) profiles_res = supabase.table('profiles').select('*').execute() return jsonify({'users': profiles_res.data}), 200 except Exception as e: logging.error(f"Admin list users error: {e}") return jsonify({'error': str(e)}), 500 @app.route('/api/admin/users//suspend', methods=['PUT']) def admin_suspend_user(target_user_id): user, error = verify_token(request.headers.get('Authorization')) if error: return jsonify({'error': error['error']}), error['status'] is_admin, admin_error = verify_admin(user) if admin_error: return jsonify({'error': admin_error['error']}), admin_error['status'] try: data = request.get_json() action = data.get('action') # "suspend" or "unsuspend" if action not in ["suspend", "unsuspend"]: return jsonify({'error': 'action must be "suspend" or "unsuspend"'}), 400 should_suspend = (action == "suspend") # Update the 'suspended' flag in the profiles table update_res = supabase.table('profiles').update({'suspended': should_suspend}).eq('id', target_user_id).execute() if not update_res.data: # This could mean the user ID doesn't exist or there was another issue # Check if user exists first user_check = supabase.table('profiles').select('id').eq('id', target_user_id).maybe_single().execute() if not user_check.data: return jsonify({'error': 'User not found'}), 404 else: raise Exception(f"Failed to update suspension status: {update_res.error}") return jsonify({'success': True, 'message': f'User {target_user_id} suspension status set to {should_suspend}'}), 200 except Exception as e: logging.error(f"Admin suspend user error: {e}") return jsonify({'error': str(e)}), 500 # Add other admin endpoints (update credits, view specific data) similarly, # === Credit Management Endpoints === @app.route('/api/user/credits/request', methods=['POST']) def request_credits(): user, error = verify_token(request.headers.get('Authorization')) if error: return jsonify({'error': error['error']}), error['status'] try: data = request.get_json() amount = data.get('amount') note = data.get('note', '') if not amount or not isinstance(amount, int) or amount <= 0: return jsonify({'error': 'Invalid amount (must be positive integer)'}), 400 res = supabase.table('credit_requests').insert({ 'user_id': user.id, 'amount': amount, 'status': 'pending', 'note': note, 'created_at': datetime.now().isoformat() }).execute() return jsonify({ 'success': True, 'request_id': res.data[0]['id'] }), 201 except Exception as e: logging.error(f"Credit request failed for user {user.id}: {e}") return jsonify({'error': str(e)}), 500 @app.route('/api/admin/credit-requests', methods=['GET']) def admin_get_credit_requests(): user, error = verify_token(request.headers.get('Authorization')) if error: return jsonify({'error': error['error']}), error['status'] is_admin, admin_error = verify_admin(user) if admin_error: return jsonify({'error': admin_error['error']}), admin_error['status'] try: status = request.args.get('status', 'pending') res = supabase.table('credit_requests').select('*').eq('status', status).execute() return jsonify(res.data), 200 except Exception as e: logging.error(f"Admin credit requests fetch failed: {e}") return jsonify({'error': str(e)}), 500 @app.route('/api/admin/credit-requests/', methods=['PUT']) def admin_review_credit_request(request_id): user, error = verify_token(request.headers.get('Authorization')) if error: return jsonify({'error': error['error']}), error['status'] is_admin, admin_error = verify_admin(user) if admin_error: return jsonify({'error': admin_error['error']}), admin_error['status'] try: data = request.get_json() action = data.get('action') admin_note = data.get('note', '') if action not in ['approve', 'decline']: return jsonify({'error': 'Invalid action'}), 400 req_res = supabase.table('credit_requests').select('*').eq('id', request_id).maybe_single().execute() if not req_res.data: return jsonify({'error': 'Request not found'}), 404 req = req_res.data if req['status'] != 'pending': return jsonify({'error': 'Request already processed'}), 400 update_data = { 'status': 'approved' if action == 'approve' else 'declined', 'reviewed_at': datetime.now().isoformat(), 'reviewed_by': user.id, 'admin_note': admin_note } if action == 'approve': supabase.table('profiles').update( {'credits': supabase.table('profiles').credits + req['amount']} ).eq('id', req['user_id']).execute() supabase.table('credit_requests').update(update_data).eq('id', request_id).execute() return jsonify({'success': True}), 200 except Exception as e: logging.error(f"Credit request processing failed: {e}") return jsonify({'error': str(e)}), 500 # === Main Execution === if __name__ == '__main__': if not all([SUPABASE_URL, SUPABASE_SERVICE_KEY, GEMINI_API_KEY, ELEVENLABS_API_KEY]): print("WARNING: One or more essential environment variables (SUPABASE_URL, SUPABASE_SERVICE_KEY, GEMINI_API_KEY, ELEVENLABS_API_KEY) are missing!") print("Starting Flask server for AI Tutor...") # Use Gunicorn or Waitress for production instead of app.run(debug=True) app.run(debug=True, host="0.0.0.0", port=7860)