|
|
import os |
|
|
import io |
|
|
import uuid |
|
|
import re |
|
|
import json |
|
|
import time |
|
|
import math |
|
|
import traceback |
|
|
from datetime import datetime, timedelta |
|
|
import logging |
|
|
import random |
|
|
import wave |
|
|
|
|
|
from flask import Flask, request, jsonify, Response |
|
|
from flask_cors import CORS |
|
|
|
|
|
import firebase_admin |
|
|
from firebase_admin import credentials, db, storage, auth |
|
|
|
|
|
from PIL import Image |
|
|
from io import BytesIO |
|
|
|
|
|
import requests |
|
|
|
|
|
from google import genai |
|
|
from google.genai import types |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
# Flask application; CORS is enabled for all origins so browser/mobile
# clients on other hosts can call the API directly.
app = Flask(__name__)
CORS(app)

# Module-wide logger at INFO so startup/seeding/request events are visible.
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
|
|
|
|
|
|
|
|
# --- Firebase initialization -------------------------------------------------
# Service-account credentials arrive as a JSON string in the FIREBASE env var;
# the Realtime Database URL and Storage bucket come from Firebase_DB and
# Firebase_Storage.  Any failure is fatal: the app cannot serve requests
# without Firebase, so we exit instead of starting half-broken.
try:
    credentials_json_string = os.environ.get("FIREBASE")
    if not credentials_json_string:
        raise ValueError("The FIREBASE environment variable is not set.")

    credentials_json = json.loads(credentials_json_string)
    firebase_db_url = os.environ.get("Firebase_DB")
    firebase_storage_bucket = os.environ.get("Firebase_Storage")

    if not firebase_db_url or not firebase_storage_bucket:
        raise ValueError(
            "Firebase_DB and Firebase_Storage environment variables must be set."
        )

    cred = credentials.Certificate(credentials_json)
    firebase_admin.initialize_app(
        cred,
        {"databaseURL": firebase_db_url, "storageBucket": firebase_storage_bucket},
    )
    logger.info("Firebase Admin SDK initialized successfully.")
except Exception as e:
    logger.error(f"FATAL: Error initializing Firebase: {e}")
    raise SystemExit(1)

# Handles used throughout the module: default Storage bucket + RTDB root.
bucket = storage.bucket()
db_ref = db.reference()
|
|
|
|
|
|
|
|
# --- External AI / speech services -------------------------------------------
# Google GenAI (Gemini) client.  Like Firebase above, a missing key is fatal.
try:
    api_key = os.environ.get("Gemini")
    if not api_key:
        raise ValueError("The 'Gemini' environment variable for the API key is not set.")
    client = genai.Client(api_key=api_key)
    logger.info("Google GenAI Client initialized successfully.")
except Exception as e:
    logger.error(f"FATAL: Error initializing GenAI Client: {e}")
    raise SystemExit(1)

# Model ids for text-only and text+image calls (currently the same model).
TEXT_MODEL = "gemini-2.5-flash"
MULTIMODAL_MODEL = "gemini-2.5-flash"

# Deepgram is optional: TTS helpers log a warning and return None when unset.
DEEPGRAM_API_KEY = os.environ.get("DEEPGRAM_API_KEY")

# ElevenLabs config is read here but not used in this part of the file.
ELEVENLABS_API_KEY = os.environ.get("ELEVENLABS_API_KEY")
ELEVENLABS_AGENT_ID = os.environ.get("ELEVENLABS_AGENT_ID")
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def verify_token(auth_header):
    """Resolve a Firebase ID token from an ``Authorization: Bearer`` header.

    Returns the authenticated user's uid, or None when the header is
    missing/malformed or token verification fails.
    """
    bearer_prefix = "Bearer "
    if not auth_header or not auth_header.startswith(bearer_prefix):
        return None
    id_token = auth_header.split(bearer_prefix)[1]
    try:
        return auth.verify_id_token(id_token)["uid"]
    except Exception as exc:
        logger.warning(f"Token verification failed: {exc}")
        return None
|
|
|
|
|
|
|
|
def upload_to_storage(data_bytes, destination_blob_name, content_type):
    """Write *data_bytes* to Firebase Storage and return a public URL.

    The blob is made world-readable, so the returned URL needs no auth.
    """
    target = bucket.blob(destination_blob_name)
    target.upload_from_string(data_bytes, content_type=content_type)
    target.make_public()
    return target.public_url
|
|
|
|
|
|
|
|
def parse_numbered_steps(text):
    """Pull ``1. ...`` style numbered steps out of *text*.

    A step starts on its own line (optionally indented) with ``<digits>.``.
    Returns a list of ``{"stepNumber", "text"}`` dicts in source order.
    """
    # Prepend a newline so a step on the very first line still matches.
    padded = "\n" + text
    steps = []
    for match in re.finditer(r"\n\s*(\d+)\.\s*(.*)", padded, re.MULTILINE):
        number, body = match.groups()
        steps.append({"stepNumber": int(number), "text": body.strip()})
    return steps
|
|
|
|
|
|
|
|
def _convert_pcm_to_wav(pcm_data, sample_rate=24000, channels=1, sample_width=2): |
|
|
"""Wrap raw PCM audio data in a WAV container in memory.""" |
|
|
audio_buffer = io.BytesIO() |
|
|
with wave.open(audio_buffer, "wb") as wf: |
|
|
wf.setnchannels(channels) |
|
|
wf.setsampwidth(sample_width) |
|
|
wf.setframerate(sample_rate) |
|
|
wf.writeframes(pcm_data) |
|
|
audio_buffer.seek(0) |
|
|
return audio_buffer.getvalue() |
|
|
|
|
|
|
|
|
def generate_tts_audio_and_upload(text_to_speak, uid, context_id, step_num):
    """
    Generates audio using Deepgram TTS API and uploads it to Firebase Storage.
    context_id can be quizId, sessionId, etc.

    Returns the public URL of the uploaded audio, or None when the Deepgram
    key is not configured or any request/upload step fails — narration is
    best-effort and callers treat it as optional.
    """
    if not DEEPGRAM_API_KEY:
        logger.warning("DEEPGRAM_API_KEY not set, skipping TTS.")
        return None
    try:
        # Fixed voice model; the request body is the raw UTF-8 text to speak.
        DEEPGRAM_URL = "https://api.deepgram.com/v1/speak?model=aura-2-draco-en"
        headers = {
            "Authorization": f"Token {DEEPGRAM_API_KEY}",
            "Content-Type": "text/plain",
        }
        response = requests.post(DEEPGRAM_URL, headers=headers, data=text_to_speak.encode("utf-8"))
        response.raise_for_status()
        audio_data = response.content

        # One file per step, namespaced per user and context.
        # NOTE(review): the response is stored/served as audio/mpeg — confirm
        # this endpoint's default output container is actually MP3.
        audio_path = f"users/{uid}/tts/{context_id}/step_{step_num}.mp3"
        narration_url = upload_to_storage(audio_data, audio_path, "audio/mpeg")
        return narration_url
    except requests.exceptions.RequestException as e:
        logger.error(f"Deepgram API error for step {step_num}: {e}")
        # e.response is None for connection-level failures (no HTTP response).
        if e.response is not None:
            logger.error(f"Deepgram error response: {e.response.text}")
        return None
    except Exception as e:
        logger.error(f"Unexpected TTS error for step {step_num}: {e}")
        return None
|
|
|
|
|
|
|
|
def send_gemini_text(prompt: str) -> str:
    """Run a text-only Gemini generation; returns "" on any failure."""
    try:
        response = client.models.generate_content(model=TEXT_MODEL, contents=[prompt])
        return (response.text or "").strip()
    except Exception as exc:
        logger.error(f"Gemini text call failed: {exc}")
    return ""
|
|
|
|
|
|
|
|
def send_gemini_multimodal(prompt: str, pil_image: Image.Image) -> str:
    """Send *prompt* plus a PIL image to Gemini; returns "" on any failure."""
    try:
        session = client.chats.create(model=MULTIMODAL_MODEL)
        reply = session.send_message([prompt, pil_image])
        pieces = []
        for part in reply.candidates[0].content.parts:
            pieces.append(getattr(part, "text", "") or "")
        return "".join(pieces).strip()
    except Exception as exc:
        logger.error(f"Gemini multimodal call failed: {exc}")
        return ""
|
|
|
|
|
|
|
|
def get_user(uid: str):
    """Fetch the user's profile dict from RTDB ({} when absent)."""
    profile = db_ref.child(f"users/{uid}").get()
    return profile if profile else {}
|
|
|
|
|
|
|
|
def require_role(uid: str, allowed_roles):
    """Return the user's profile after asserting their role.

    Raises PermissionError when the profile's role is not in *allowed_roles*
    (including when no role is set at all).
    """
    profile = get_user(uid)
    current_role = profile.get("role")
    if current_role in allowed_roles:
        return profile
    raise PermissionError(f"User role '{current_role}' not allowed for this action.")
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
# Core ZIMSEC curriculum: subjects seeded into RTDB at startup with
# isCore=True (see ensure_core_subjects_and_topics).
CORE_SUBJECTS = [
    # Grade 7
    {"id": "g7_eng", "name": "English", "level": "Grade7", "examBoard": "ZIMSEC"},
    {"id": "g7_math", "name": "Mathematics", "level": "Grade7", "examBoard": "ZIMSEC"},
    {"id": "g7_sci", "name": "General Science", "level": "Grade7", "examBoard": "ZIMSEC"},
    {"id": "g7_shona", "name": "Shona", "level": "Grade7", "examBoard": "ZIMSEC"},
    # O-Level
    {"id": "ol_eng", "name": "English Language", "level": "O", "examBoard": "ZIMSEC"},
    {"id": "ol_math", "name": "Mathematics", "level": "O", "examBoard": "ZIMSEC"},
    {"id": "ol_sci", "name": "Combined Science", "level": "O", "examBoard": "ZIMSEC"},
    {"id": "ol_hist", "name": "History", "level": "O", "examBoard": "ZIMSEC"},
    # A-Level
    {"id": "al_math", "name": "Mathematics", "level": "A", "examBoard": "ZIMSEC"},
    {"id": "al_phy", "name": "Physics", "level": "A", "examBoard": "ZIMSEC"},
    {"id": "al_acc", "name": "Accounting", "level": "A", "examBoard": "ZIMSEC"},
]
|
|
|
|
|
# Seed topic names keyed by subject id (ids match CORE_SUBJECTS).  Not every
# core subject has a topic list here — e.g. g7_shona, ol_hist, al_phy, al_acc
# start without seeded topics.
CORE_TOPICS = {
    "g7_math": [
        "Whole Numbers",
        "Fractions",
        "Decimals",
        "Percentages",
        "Measurement",
        "Geometry",
        "Word Problems",
    ],
    "ol_math": [
        "Number Theory",
        "Algebra",
        "Functions",
        "Sequences & Series",
        "Coordinate Geometry",
        "Trigonometry",
        "Statistics",
        "Vectors",
    ],
    "al_math": [
        "Functions & Graphs",
        "Calculus – Differentiation",
        "Calculus – Integration",
        "Complex Numbers",
        "Matrices",
        "Probability & Statistics",
    ],
    "ol_eng": ["Comprehension", "Summary", "Composition", "Language Structures"],
    "g7_eng": ["Reading Comprehension", "Vocabulary", "Grammar", "Writing"],
    "ol_sci": ["Matter", "Energy", "Forces", "Electricity", "Ecology"],
    "g7_sci": ["Living Things", "Weather", "Materials", "Human Body"],
}
|
|
|
|
|
|
|
|
def ensure_core_subjects_and_topics():
    """Idempotently seed RTDB with the core ZIMSEC subjects and topics.

    Existing entries are left untouched; only missing ids are written.
    """
    logger.info("Ensuring core subjects and topics exist...")
    subjects_node = db_ref.child("subjects")
    known_subject_ids = set((subjects_node.get() or {}).keys())

    for subject in CORE_SUBJECTS:
        subject_id = subject["id"]
        if subject_id in known_subject_ids:
            continue
        subjects_node.child(subject_id).set(
            {
                "subjectId": subject_id,
                "name": subject["name"],
                "level": subject["level"],
                "examBoard": subject["examBoard"],
                "isCore": True,
                "createdAt": datetime.utcnow().isoformat(),
            }
        )
        logger.info(f"Seeded subject {subject_id} – {subject['name']}")

    topics_node = db_ref.child("topics")
    known_topics = topics_node.get() or {}

    for subject_id, topic_names in CORE_TOPICS.items():
        for topic_name in topic_names:
            # Deterministic key so re-runs find previously seeded topics.
            topic_key = f"{subject_id}_{topic_name.replace(' ', '_').lower()}"
            if topic_key in known_topics:
                continue
            topics_node.child(topic_key).set(
                {
                    "topicId": topic_key,
                    "subjectId": subject_id,
                    "name": topic_name,
                    "isCore": True,
                    "createdAt": datetime.utcnow().isoformat(),
                }
            )
            logger.info(f"Seeded topic {topic_key} – {topic_name}")

    logger.info("Core curriculum seeding complete.")
|
|
|
|
|
|
|
|
|
|
|
# Seed the curriculum at import time; failures are logged but non-fatal so
# the API can still serve whatever data already exists.
try:
    ensure_core_subjects_and_topics()
except Exception as e:
    logger.error(f"Error in curriculum seeding: {e}")
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
@app.route("/api/auth/signup", methods=["POST"]) |
|
|
def signup(): |
|
|
""" |
|
|
Email/password sign-up. |
|
|
Fields: |
|
|
email, password, displayName, role, grade, school, examBoard |
|
|
""" |
|
|
try: |
|
|
data = request.get_json() or {} |
|
|
email = data.get("email") |
|
|
password = data.get("password") |
|
|
display_name = data.get("displayName") |
|
|
role = data.get("role", "learner") |
|
|
grade = data.get("grade") |
|
|
school = data.get("school") |
|
|
exam_board = data.get("examBoard", "ZIMSEC") |
|
|
|
|
|
if not email or not password: |
|
|
return jsonify({"error": "Email and password are required"}), 400 |
|
|
|
|
|
user = auth.create_user(email=email, password=password, display_name=display_name) |
|
|
|
|
|
user_doc = { |
|
|
"email": email, |
|
|
"displayName": display_name, |
|
|
"role": role, |
|
|
"grade": grade, |
|
|
"school": school, |
|
|
"examBoard": exam_board, |
|
|
"createdAt": datetime.utcnow().isoformat(), |
|
|
} |
|
|
db_ref.child(f"users/{user.uid}").set(user_doc) |
|
|
|
|
|
logger.info(f"New user: {user.uid}, role={role}") |
|
|
return jsonify({"uid": user.uid, **user_doc}), 201 |
|
|
except Exception as e: |
|
|
logger.error(f"Signup failed: {e}") |
|
|
if "EMAIL_EXISTS" in str(e): |
|
|
return jsonify({"error": "An account with this email already exists."}), 409 |
|
|
return jsonify({"error": str(e)}), 400 |
|
|
|
|
|
|
|
|
@app.route("/api/auth/social-signin", methods=["POST"]) |
|
|
def social_signin(): |
|
|
""" |
|
|
Ensures a user profile exists for social-auth users. |
|
|
""" |
|
|
uid = verify_token(request.headers.get("Authorization")) |
|
|
if not uid: |
|
|
return jsonify({"error": "Invalid or expired token"}), 401 |
|
|
|
|
|
user_ref = db_ref.child(f"users/{uid}") |
|
|
user_data = user_ref.get() |
|
|
|
|
|
if user_data: |
|
|
|
|
|
if not user_data.get("displayName"): |
|
|
try: |
|
|
fb_user = auth.get_user(uid) |
|
|
if fb_user.display_name: |
|
|
user_ref.update({"displayName": fb_user.display_name}) |
|
|
user_data = user_ref.get() |
|
|
except Exception as e: |
|
|
logger.error(f"Failed to backfill displayName for {uid}: {e}") |
|
|
return jsonify({"uid": uid, **user_data}), 200 |
|
|
|
|
|
|
|
|
try: |
|
|
fb_user = auth.get_user(uid) |
|
|
new_user_data = { |
|
|
"email": fb_user.email, |
|
|
"displayName": fb_user.display_name, |
|
|
"role": "learner", |
|
|
"createdAt": datetime.utcnow().isoformat(), |
|
|
"examBoard": "ZIMSEC", |
|
|
} |
|
|
user_ref.set(new_user_data) |
|
|
logger.info(f"Created profile for new social user: {uid}") |
|
|
return jsonify({"uid": uid, **new_user_data}), 201 |
|
|
except Exception as e: |
|
|
logger.error(f"Error creating profile for social user {uid}: {e}") |
|
|
return jsonify({"error": "Failed to create user profile"}), 500 |
|
|
|
|
|
|
|
|
@app.route("/api/user/profile", methods=["GET"]) |
|
|
def get_user_profile(): |
|
|
uid = verify_token(request.headers.get("Authorization")) |
|
|
if not uid: |
|
|
return jsonify({"error": "Invalid or expired token"}), 401 |
|
|
user_data = get_user(uid) |
|
|
if not user_data: |
|
|
return jsonify({"error": "User not found"}), 404 |
|
|
return jsonify({"uid": uid, **user_data}) |
|
|
|
|
|
|
|
|
@app.route("/api/user/profile", methods=["PUT"]) |
|
|
def update_user_profile(): |
|
|
uid = verify_token(request.headers.get("Authorization")) |
|
|
if not uid: |
|
|
return jsonify({"error": "Invalid or expired token"}), 401 |
|
|
|
|
|
data = request.get_json() or {} |
|
|
allowed_fields = ["displayName", "grade", "school", "examBoard", "role"] |
|
|
updates = {k: v for k, v in data.items() if k in allowed_fields and v is not None} |
|
|
if not updates: |
|
|
return jsonify({"error": "No valid fields to update."}), 400 |
|
|
|
|
|
try: |
|
|
if "displayName" in updates: |
|
|
auth.update_user(uid, display_name=updates["displayName"]) |
|
|
db_ref.child(f"users/{uid}").update(updates) |
|
|
logger.info(f"User {uid} updated profile fields: {list(updates.keys())}") |
|
|
return jsonify({"success": True}), 200 |
|
|
except Exception as e: |
|
|
logger.error(f"Profile update error for {uid}: {e}") |
|
|
return jsonify({"error": "Failed to update profile"}), 500 |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
@app.route("/api/learn/subjects", methods=["GET"]) |
|
|
def list_subjects(): |
|
|
"""Public learner-facing subjects listing.""" |
|
|
level = request.args.get("level") |
|
|
subjects = db_ref.child("subjects").get() or {} |
|
|
result = [] |
|
|
for sid, data in subjects.items(): |
|
|
if level and data.get("level") != level: |
|
|
continue |
|
|
result.append(data) |
|
|
return jsonify(result) |
|
|
|
|
|
|
|
|
@app.route("/api/learn/topics", methods=["GET"]) |
|
|
def list_topics(): |
|
|
subject_id = request.args.get("subjectId") |
|
|
topics = db_ref.child("topics").get() or {} |
|
|
result = [] |
|
|
for tid, data in topics.items(): |
|
|
if subject_id and data.get("subjectId") != subject_id: |
|
|
continue |
|
|
result.append(data) |
|
|
return jsonify(result) |
|
|
|
|
|
|
|
|
@app.route("/api/admin/subjects", methods=["POST"]) |
|
|
def admin_create_subject(): |
|
|
uid = verify_token(request.headers.get("Authorization")) |
|
|
if not uid: |
|
|
return jsonify({"error": "Unauthorized"}), 401 |
|
|
try: |
|
|
require_role(uid, ["admin"]) |
|
|
except PermissionError as e: |
|
|
return jsonify({"error": str(e)}), 403 |
|
|
|
|
|
data = request.get_json() or {} |
|
|
name = data.get("name") |
|
|
level = data.get("level") |
|
|
exam_board = data.get("examBoard", "ZIMSEC") |
|
|
if not name or not level: |
|
|
return jsonify({"error": "name and level are required"}), 400 |
|
|
|
|
|
subject_id = data.get("subjectId") or f"{level.lower()}_{uuid.uuid4().hex[:8]}" |
|
|
subject_doc = { |
|
|
"subjectId": subject_id, |
|
|
"name": name, |
|
|
"level": level, |
|
|
"examBoard": exam_board, |
|
|
"isCore": False, |
|
|
"createdAt": datetime.utcnow().isoformat(), |
|
|
} |
|
|
db_ref.child(f"subjects/{subject_id}").set(subject_doc) |
|
|
return jsonify(subject_doc), 201 |
|
|
|
|
|
|
|
|
@app.route("/api/admin/topics", methods=["POST"]) |
|
|
def admin_create_topic(): |
|
|
uid = verify_token(request.headers.get("Authorization")) |
|
|
if not uid: |
|
|
return jsonify({"error": "Unauthorized"}), 401 |
|
|
try: |
|
|
require_role(uid, ["admin", "teacher"]) |
|
|
except PermissionError as e: |
|
|
return jsonify({"error": str(e)}), 403 |
|
|
|
|
|
data = request.get_json() or {} |
|
|
name = data.get("name") |
|
|
subject_id = data.get("subjectId") |
|
|
if not name or not subject_id: |
|
|
return jsonify({"error": "name and subjectId are required"}), 400 |
|
|
|
|
|
topic_id = data.get("topicId") or f"{subject_id}_{uuid.uuid4().hex[:8]}" |
|
|
topic_doc = { |
|
|
"topicId": topic_id, |
|
|
"subjectId": subject_id, |
|
|
"name": name, |
|
|
"isCore": False, |
|
|
"createdAt": datetime.utcnow().isoformat(), |
|
|
} |
|
|
db_ref.child(f"topics/{topic_id}").set(topic_doc) |
|
|
return jsonify(topic_doc), 201 |
|
|
|
|
|
|
|
|
@app.route("/api/admin/papers", methods=["POST"]) |
|
|
def admin_upload_paper(): |
|
|
""" |
|
|
Admin/teacher uploads past exam paper PDF. |
|
|
Form-data: |
|
|
subjectId, level, year, session, file |
|
|
""" |
|
|
uid = verify_token(request.headers.get("Authorization")) |
|
|
if not uid: |
|
|
return jsonify({"error": "Unauthorized"}), 401 |
|
|
try: |
|
|
require_role(uid, ["admin", "teacher"]) |
|
|
except PermissionError as e: |
|
|
return jsonify({"error": str(e)}), 403 |
|
|
|
|
|
if "file" not in request.files: |
|
|
return jsonify({"error": "file is required"}), 400 |
|
|
file = request.files["file"] |
|
|
subject_id = request.form.get("subjectId") |
|
|
level = request.form.get("level") |
|
|
year = request.form.get("year") |
|
|
session = request.form.get("session") |
|
|
|
|
|
if not subject_id or not level or not year: |
|
|
return jsonify({"error": "subjectId, level, year required"}), 400 |
|
|
|
|
|
paper_id = f"{subject_id}_{year}_{uuid.uuid4().hex[:6]}" |
|
|
path = f"papers/{subject_id}/{paper_id}.pdf" |
|
|
file_bytes = file.read() |
|
|
url = upload_to_storage(file_bytes, path, "application/pdf") |
|
|
|
|
|
paper_doc = { |
|
|
"paperId": paper_id, |
|
|
"subjectId": subject_id, |
|
|
"level": level, |
|
|
"year": year, |
|
|
"session": session, |
|
|
"fileUrl": url, |
|
|
"uploadedBy": uid, |
|
|
"createdAt": datetime.utcnow().isoformat(), |
|
|
} |
|
|
db_ref.child(f"papers/{paper_id}").set(paper_doc) |
|
|
return jsonify(paper_doc), 201 |
|
|
|
|
|
|
|
|
@app.route("/api/learn/papers", methods=["GET"]) |
|
|
def list_papers(): |
|
|
subject_id = request.args.get("subjectId") |
|
|
level = request.args.get("level") |
|
|
papers = db_ref.child("papers").get() or {} |
|
|
result = [] |
|
|
for pid, data in papers.items(): |
|
|
if subject_id and data.get("subjectId") != subject_id: |
|
|
continue |
|
|
if level and data.get("level") != level: |
|
|
continue |
|
|
result.append(data) |
|
|
return jsonify(result) |
|
|
|
|
|
|
|
|
@app.route("/api/admin/questions", methods=["POST"]) |
|
|
def admin_create_question(): |
|
|
""" |
|
|
Create a question manually. |
|
|
Body: |
|
|
subjectId, topicId, level, type (mcq/short/structured) |
|
|
stem, options[], answer, markScheme, difficulty, paperId(optional) |
|
|
""" |
|
|
uid = verify_token(request.headers.get("Authorization")) |
|
|
if not uid: |
|
|
return jsonify({"error": "Unauthorized"}), 401 |
|
|
try: |
|
|
require_role(uid, ["admin", "teacher"]) |
|
|
except PermissionError as e: |
|
|
return jsonify({"error": str(e)}), 403 |
|
|
|
|
|
data = request.get_json() or {} |
|
|
subject_id = data.get("subjectId") |
|
|
topic_id = data.get("topicId") |
|
|
q_type = data.get("type") |
|
|
stem = data.get("stem") |
|
|
if not subject_id or not topic_id or not q_type or not stem: |
|
|
return jsonify({"error": "subjectId, topicId, type, stem required"}), 400 |
|
|
|
|
|
question_id = data.get("questionId") or uuid.uuid4().hex |
|
|
question_doc = { |
|
|
"questionId": question_id, |
|
|
"subjectId": subject_id, |
|
|
"topicId": topic_id, |
|
|
"level": data.get("level"), |
|
|
"type": q_type, |
|
|
"stem": stem, |
|
|
"options": data.get("options") or [], |
|
|
"answer": data.get("answer"), |
|
|
"markScheme": data.get("markScheme"), |
|
|
"difficulty": data.get("difficulty", "medium"), |
|
|
"paperId": data.get("paperId"), |
|
|
"createdAt": datetime.utcnow().isoformat(), |
|
|
"createdBy": uid, |
|
|
} |
|
|
db_ref.child(f"questions/{question_id}").set(question_doc) |
|
|
return jsonify(question_doc), 201 |
|
|
|
|
|
|
|
|
@app.route("/api/learn/questions", methods=["GET"]) |
|
|
def learner_list_questions(): |
|
|
"""Optional: learners can browse questions for free-form practice.""" |
|
|
subject_id = request.args.get("subjectId") |
|
|
topic_id = request.args.get("topicId") |
|
|
level = request.args.get("level") |
|
|
|
|
|
questions = db_ref.child("questions").get() or {} |
|
|
result = [] |
|
|
for qid, q in questions.items(): |
|
|
if subject_id and q.get("subjectId") != subject_id: |
|
|
continue |
|
|
if topic_id and q.get("topicId") != topic_id: |
|
|
continue |
|
|
if level and q.get("level") != level: |
|
|
continue |
|
|
result.append(q) |
|
|
return jsonify(result) |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def _select_questions(subject_id, topic_ids, level, paper_id, num_questions):
    """Pick up to *num_questions* bank questions matching the filters.

    Falsy filters are ignored.  When the pool exceeds the request, a uniform
    random sample is returned; otherwise every match comes back (possibly []).
    """
    bank = db_ref.child("questions").get() or {}

    def _matches(q):
        if subject_id and q.get("subjectId") != subject_id:
            return False
        if level and q.get("level") != level:
            return False
        if paper_id and q.get("paperId") != paper_id:
            return False
        if topic_ids and q.get("topicId") not in topic_ids:
            return False
        return True

    pool = [q for q in bank.values() if _matches(q)]
    if len(pool) > num_questions:
        return random.sample(pool, num_questions)
    return pool
|
|
|
|
|
|
|
|
def _generate_ai_questions(subject_name, topic_name, num_questions):
    """
    AI fallback: generate questions on the fly when bank is empty.
    Returns list of question docs (not stored unless you decide to).
    Returns [] when the model call fails or its reply is not valid JSON.
    """

    prompt = f"""
You are a Zimbabwean exam coach preparing practice questions.

Subject: {subject_name}
Topic: {topic_name}
Exam level: Grade 7 / O-Level / A-Level depending on difficulty of topic.

Generate {num_questions} practice questions as JSON ONLY, no extra text.
Each question must have:
- questionId: a short slug (you can invent)
- type: "mcq" or "short"
- stem: the main question text
- options: array of 4 options for mcq, [] for short
- answer: the correct answer (string)
- difficulty: "easy", "medium", or "hard"

Return a JSON object:
{{ "questions": [ ... ] }}
"""

    raw = send_gemini_text(prompt)

    try:
        # Strip the markdown code fences the model often wraps JSON in.
        raw_clean = raw.replace("```json", "").replace("```", "").strip()
        data = json.loads(raw_clean)
        return data.get("questions", [])
    except Exception:
        logger.error("Failed to parse AI-generated questions, raw response:")
        logger.error(raw)
        return []
|
|
|
|
|
|
|
|
@app.route("/api/learn/start-quiz", methods=["POST"]) |
|
|
def start_quiz(): |
|
|
""" |
|
|
Body: |
|
|
subjectId (required) |
|
|
level (optional) |
|
|
topicIds: [] (optional) |
|
|
paperId (optional, for past paper mode) |
|
|
numQuestions (default 10) |
|
|
Returns: |
|
|
quizId, questions (without correct answers) |
|
|
""" |
|
|
uid = verify_token(request.headers.get("Authorization")) |
|
|
if not uid: |
|
|
return jsonify({"error": "Unauthorized"}), 401 |
|
|
|
|
|
data = request.get_json() or {} |
|
|
subject_id = data.get("subjectId") |
|
|
level = data.get("level") |
|
|
topic_ids = data.get("topicIds") or [] |
|
|
paper_id = data.get("paperId") |
|
|
num_questions = int(data.get("numQuestions") or 10) |
|
|
|
|
|
if not subject_id: |
|
|
return jsonify({"error": "subjectId required"}), 400 |
|
|
|
|
|
subject_doc = db_ref.child(f"subjects/{subject_id}").get() or {} |
|
|
subject_name = subject_doc.get("name", subject_id) |
|
|
|
|
|
|
|
|
questions = _select_questions(subject_id, topic_ids, level, paper_id, num_questions) |
|
|
|
|
|
|
|
|
if not questions: |
|
|
topic_name = None |
|
|
if topic_ids: |
|
|
topic_doc = db_ref.child(f"topics/{topic_ids[0]}").get() or {} |
|
|
topic_name = topic_doc.get("name") |
|
|
topic_name = topic_name or "General Revision" |
|
|
logger.info( |
|
|
f"No static questions found for {subject_id}/{topic_ids}, using AI fallback." |
|
|
) |
|
|
ai_questions = _generate_ai_questions(subject_name, topic_name, num_questions) |
|
|
|
|
|
questions = [] |
|
|
for q in ai_questions: |
|
|
q_id = q.get("questionId") or uuid.uuid4().hex |
|
|
questions.append( |
|
|
{ |
|
|
"questionId": q_id, |
|
|
"subjectId": subject_id, |
|
|
"topicId": topic_ids[0] if topic_ids else None, |
|
|
"type": q.get("type", "mcq"), |
|
|
"stem": q.get("stem"), |
|
|
"options": q.get("options") or [], |
|
|
"answer": q.get("answer"), |
|
|
"difficulty": q.get("difficulty", "medium"), |
|
|
"isAiGenerated": True, |
|
|
} |
|
|
) |
|
|
|
|
|
if not questions: |
|
|
return jsonify({"error": "No questions available for this selection"}), 404 |
|
|
|
|
|
|
|
|
quiz_id = uuid.uuid4().hex |
|
|
quiz_questions = [] |
|
|
marking_key = {} |
|
|
|
|
|
for q in questions: |
|
|
qid = q["questionId"] |
|
|
marking_key[qid] = { |
|
|
"answer": q.get("answer"), |
|
|
"topicId": q.get("topicId"), |
|
|
"subjectId": q.get("subjectId"), |
|
|
} |
|
|
q_out = { |
|
|
"questionId": qid, |
|
|
"subjectId": q.get("subjectId"), |
|
|
"topicId": q.get("topicId"), |
|
|
"type": q.get("type"), |
|
|
"stem": q.get("stem"), |
|
|
"options": q.get("options") or [], |
|
|
"difficulty": q.get("difficulty"), |
|
|
"isAiGenerated": q.get("isAiGenerated", False), |
|
|
} |
|
|
quiz_questions.append(q_out) |
|
|
|
|
|
quiz_doc = { |
|
|
"quizId": quiz_id, |
|
|
"userId": uid, |
|
|
"subjectId": subject_id, |
|
|
"level": level, |
|
|
"topicIds": topic_ids, |
|
|
"paperId": paper_id, |
|
|
"questions": quiz_questions, |
|
|
"markingKey": marking_key, |
|
|
"createdAt": datetime.utcnow().isoformat(), |
|
|
"status": "in_progress", |
|
|
} |
|
|
db_ref.child(f"quizzes/{quiz_id}").set(quiz_doc) |
|
|
return jsonify({"quizId": quiz_id, "questions": quiz_questions}), 201 |
|
|
|
|
|
|
|
|
@app.route("/api/learn/submit-quiz", methods=["POST"]) |
|
|
def submit_quiz(): |
|
|
""" |
|
|
Body: |
|
|
quizId |
|
|
answers: [{questionId, answer}, ...] |
|
|
Returns: |
|
|
score, total, percentage, topicBreakdown, wrongQuestions |
|
|
""" |
|
|
uid = verify_token(request.headers.get("Authorization")) |
|
|
if not uid: |
|
|
return jsonify({"error": "Unauthorized"}), 401 |
|
|
|
|
|
data = request.get_json() or {} |
|
|
quiz_id = data.get("quizId") |
|
|
answers = data.get("answers") or [] |
|
|
|
|
|
if not quiz_id: |
|
|
return jsonify({"error": "quizId required"}), 400 |
|
|
|
|
|
quiz = db_ref.child(f"quizzes/{quiz_id}").get() |
|
|
if not quiz or quiz.get("userId") != uid: |
|
|
return jsonify({"error": "Quiz not found or access denied"}), 404 |
|
|
|
|
|
marking_key = quiz.get("markingKey") or {} |
|
|
total = len(marking_key) |
|
|
if total == 0: |
|
|
return jsonify({"error": "Quiz has no questions"}), 400 |
|
|
|
|
|
ans_map = {a["questionId"]: a.get("answer") for a in answers} |
|
|
|
|
|
score = 0 |
|
|
topic_stats = {} |
|
|
wrong_questions = [] |
|
|
|
|
|
for qid, meta in marking_key.items(): |
|
|
correct = str(meta.get("answer")).strip().lower() |
|
|
user_ans = str(ans_map.get(qid, "")).strip().lower() |
|
|
is_correct = correct and user_ans and correct == user_ans |
|
|
if is_correct: |
|
|
score += 1 |
|
|
topic_id = meta.get("topicId") or "unknown" |
|
|
if topic_id not in topic_stats: |
|
|
topic_stats[topic_id] = {"correct": 0, "total": 0} |
|
|
topic_stats[topic_id]["total"] += 1 |
|
|
if is_correct: |
|
|
topic_stats[topic_id]["correct"] += 1 |
|
|
else: |
|
|
wrong_questions.append(qid) |
|
|
|
|
|
percentage = (score / total) * 100.0 |
|
|
|
|
|
|
|
|
attempt_id = uuid.uuid4().hex |
|
|
attempt_doc = { |
|
|
"attemptId": attempt_id, |
|
|
"quizId": quiz_id, |
|
|
"userId": uid, |
|
|
"subjectId": quiz.get("subjectId"), |
|
|
"topicIds": quiz.get("topicIds"), |
|
|
"score": score, |
|
|
"total": total, |
|
|
"percentage": percentage, |
|
|
"topicStats": topic_stats, |
|
|
"answers": ans_map, |
|
|
"wrongQuestions": wrong_questions, |
|
|
"createdAt": datetime.utcnow().isoformat(), |
|
|
} |
|
|
db_ref.child(f"attempts/{attempt_id}").set(attempt_doc) |
|
|
db_ref.child(f"quizzes/{quiz_id}/status").set("completed") |
|
|
db_ref.child(f"quizzes/{quiz_id}/completedAt").set(datetime.utcnow().isoformat()) |
|
|
|
|
|
|
|
|
topics = db_ref.child("topics").get() or {} |
|
|
topic_breakdown = [] |
|
|
for tid, stats in topic_stats.items(): |
|
|
t_doc = topics.get(tid) or {} |
|
|
topic_breakdown.append( |
|
|
{ |
|
|
"topicId": tid, |
|
|
"topicName": t_doc.get("name", tid), |
|
|
"correct": stats["correct"], |
|
|
"total": stats["total"], |
|
|
"percentage": (stats["correct"] / stats["total"]) * 100.0, |
|
|
} |
|
|
) |
|
|
|
|
|
return jsonify( |
|
|
{ |
|
|
"attemptId": attempt_id, |
|
|
"score": score, |
|
|
"total": total, |
|
|
"percentage": percentage, |
|
|
"topicBreakdown": topic_breakdown, |
|
|
"wrongQuestions": wrong_questions, |
|
|
} |
|
|
) |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
@app.route("/api/ai/explain-question", methods=["POST"]) |
|
|
def ai_explain_question(): |
|
|
""" |
|
|
Body: |
|
|
question: {stem, options?, answer?} |
|
|
userAnswer: string |
|
|
subjectId, topicId (optional) |
|
|
withAudio: bool |
|
|
Returns: |
|
|
explanation, keyConcepts[], narrationUrl? |
|
|
""" |
|
|
uid = verify_token(request.headers.get("Authorization")) |
|
|
if not uid: |
|
|
return jsonify({"error": "Unauthorized"}), 401 |
|
|
|
|
|
data = request.get_json() or {} |
|
|
question = data.get("question") or {} |
|
|
user_answer = data.get("userAnswer") |
|
|
with_audio = bool(data.get("withAudio")) |
|
|
|
|
|
stem = question.get("stem") |
|
|
if not stem: |
|
|
return jsonify({"error": "question.stem required"}), 400 |
|
|
|
|
|
correct_answer = question.get("answer") |
|
|
|
|
|
prompt = f""" |
|
|
You are a patient Zimbabwean exam coach for Grade 7, O-Level and A-Level. |
|
|
|
|
|
Question: |
|
|
{stem} |
|
|
|
|
|
Options (if any): |
|
|
{json.dumps(question.get('options') or [], ensure_ascii=False)} |
|
|
|
|
|
Learner's answer: |
|
|
{user_answer} |
|
|
|
|
|
Correct answer (if provided): |
|
|
{correct_answer} |
|
|
|
|
|
1. Briefly state if the learner is correct or not. |
|
|
2. Explain step-by-step how to solve it, in simple language. |
|
|
3. Highlight the key concept(s) they need to understand better. |
|
|
4. End with 1–2 quick checkpoint questions they can answer mentally. |
|
|
|
|
|
Format: |
|
|
EXPLANATION: |
|
|
[paragraphs] |
|
|
|
|
|
KEY_CONCEPTS: |
|
|
- concept 1 |
|
|
- concept 2 |
|
|
|
|
|
CHECKPOINT_QUESTIONS: |
|
|
- q1 |
|
|
- q2 |
|
|
""" |
|
|
text = send_gemini_text(prompt) |
|
|
if not text: |
|
|
return jsonify({"error": "AI explanation failed"}), 500 |
|
|
|
|
|
|
|
|
key_concepts = [] |
|
|
match = re.search(r"KEY_CONCEPTS:(.*?)(CHECKPOINT_QUESTIONS:|$)", text, re.DOTALL) |
|
|
if match: |
|
|
block = match.group(1) |
|
|
lines = [ln.strip("- ").strip() for ln in block.splitlines() if ln.strip()] |
|
|
key_concepts = lines |
|
|
|
|
|
narration_url = None |
|
|
if with_audio: |
|
|
narration_url = generate_tts_audio_and_upload(text, uid, "explain", 1) |
|
|
|
|
|
return jsonify( |
|
|
{ |
|
|
"explanation": text, |
|
|
"keyConcepts": key_concepts, |
|
|
"narrationUrl": narration_url, |
|
|
} |
|
|
) |
|
|
|
|
|
|
|
|
@app.route("/api/ai/generate-followup", methods=["POST"])
def ai_generate_followup():
    """
    Generate AI follow-up practice questions for a subject/topic.

    Body:
        subjectId, topicId, numQuestions
    Returns:
        questions[]
    """
    uid = verify_token(request.headers.get("Authorization"))
    if not uid:
        return jsonify({"error": "Unauthorized"}), 401

    data = request.get_json() or {}
    subject_id = data.get("subjectId")
    topic_id = data.get("topicId")
    # Fix: int("not a number") previously raised ValueError -> unhandled 500.
    try:
        num_questions = int(data.get("numQuestions") or 5)
    except (TypeError, ValueError):
        return jsonify({"error": "numQuestions must be an integer"}), 400
    # Clamp so a single request cannot demand an unbounded amount of AI output.
    num_questions = max(1, min(num_questions, 20))

    if not subject_id or not topic_id:
        return jsonify({"error": "subjectId and topicId required"}), 400

    # Resolve human-readable names; fall back to the raw IDs when missing.
    subject_doc = db_ref.child(f"subjects/{subject_id}").get() or {}
    subject_name = subject_doc.get("name", subject_id)
    topic_doc = db_ref.child(f"topics/{topic_id}").get() or {}
    topic_name = topic_doc.get("name", topic_id)

    questions = _generate_ai_questions(subject_name, topic_name, num_questions)
    return jsonify({"questions": questions})
|
|
|
|
|
|
|
|
@app.route("/api/ai/analyse-work", methods=["POST"])
def ai_analyse_work():
    """
    Learner uploads handwritten working for a question; AI marks it.

    Form-data:
        image (file)
        questionText
        expectedAnswer (optional)
    Returns the stored analysis document.
    """
    uid = verify_token(request.headers.get("Authorization"))
    if not uid:
        return jsonify({"error": "Unauthorized"}), 401

    if "image" not in request.files:
        return jsonify({"error": "image file is required"}), 400
    img_file = request.files["image"]
    q_text = request.form.get("questionText", "")
    expected = request.form.get("expectedAnswer", "")

    img_bytes = img_file.read()
    # Fix: a corrupt or non-image upload made Image.open raise -> unhandled 500.
    # Reject it as a client error instead.
    try:
        pil_image = Image.open(BytesIO(img_bytes)).convert("RGB")
    except Exception:
        return jsonify({"error": "Uploaded file is not a valid image"}), 400

    # Persist the raw upload so the stored analysis can reference it.
    upload_id = uuid.uuid4().hex
    path = f"users/{uid}/handwritten/{upload_id}.jpg"
    img_url = upload_to_storage(img_bytes, path, "image/jpeg")

    prompt = f"""
You are a math and science marker.

You will see a question and the learner's handwritten work as an image.

Question:
{q_text}

Expected answer (if given):
{expected}

1. Extract the steps the learner used.
2. Identify where the first mistake happens.
3. Explain what the mistake is and what the correct reasoning should be.
4. Summarize what concept they are missing or shaky on.
5. Give them a short, encouraging correction, not harsh.

Format:
ERROR_STEP:
[description]

EXPLANATION:
[paragraphs]

KEY_CONCEPT:
[one sentence]
"""
    text = send_gemini_multimodal(prompt, pil_image)
    if not text:
        return jsonify({"error": "AI analysis failed"}), 500

    analysis_doc = {
        "uploadId": upload_id,
        "userId": uid,
        "imageUrl": img_url,
        "questionText": q_text,
        "expectedAnswer": expected,
        "analysis": text,
        "createdAt": datetime.utcnow().isoformat(),
    }
    db_ref.child(f"handwritten_uploads/{upload_id}").set(analysis_doc)
    return jsonify(analysis_doc)
|
|
|
|
|
|
|
|
@app.route("/api/ai/revision-session", methods=["POST"])
def ai_revision_session():
    """
    Builds a short revision session (structured steps) for learner's weak topics.
    Body:
        subjectId
        topicIds[]
        withAudio: bool
    """
    uid = verify_token(request.headers.get("Authorization"))
    if not uid:
        return jsonify({"error": "Unauthorized"}), 401

    data = request.get_json() or {}
    subject_id = data.get("subjectId")
    topic_ids = data.get("topicIds") or []
    with_audio = bool(data.get("withAudio"))

    if not subject_id or not topic_ids:
        return jsonify({"error": "subjectId and topicIds required"}), 400

    # Resolve display names; fall back to the raw IDs when a record is missing.
    subject_doc = db_ref.child(f"subjects/{subject_id}").get() or {}
    subject_name = subject_doc.get("name", subject_id)

    topics = db_ref.child("topics").get() or {}
    topic_names = [topics.get(t, {}).get("name", t) for t in topic_ids]

    # Gather this learner's 10 most recent attempts for the subject so the
    # model can tailor the session to their actual mistakes.
    # NOTE(review): fetches ALL attempts and filters in memory — fine for a
    # small dataset, may need an indexed query at scale.
    attempts = db_ref.child("attempts").get() or {}
    recent = [
        a
        for a in attempts.values()
        if a.get("userId") == uid and a.get("subjectId") == subject_id
    ]
    recent = sorted(recent, key=lambda x: x.get("createdAt", ""), reverse=True)[:10]

    # The [:4000] slice bounds prompt size; it may cut the JSON mid-document,
    # which the model is expected to tolerate.
    prompt = f"""
You are a Zimbabwean exam coach.

Learner subject: {subject_name}
Topics to focus on: {', '.join(topic_names)}

Recent attempts (JSON):
{json.dumps(recent, ensure_ascii=False)[:4000]}

Design a short revision session of maximum 5 steps.

Each step should:
- Explain a sub-concept in simple terms.
- Include one example question.
- Include the answer.

Format:
TITLE: [short session title]

STEPS:
1. [step explanation, include example + answer]
2. ...
"""
    text = send_gemini_text(prompt)
    if not text:
        return jsonify({"error": "AI revision session failed"}), 500

    # parse_numbered_steps turns "1. ... 2. ..." into step dicts; each step
    # carries a "text" field (used for narration below).
    steps = parse_numbered_steps(text)
    session_id = uuid.uuid4().hex
    tts_urls = []

    if with_audio:
        # One narration clip per step, uploaded to storage; URLs align with
        # the step order via the 1-based index.
        for i, step in enumerate(steps, start=1):
            url = generate_tts_audio_and_upload(step["text"], uid, session_id, i)
            tts_urls.append(url)
    else:
        # Keep list lengths aligned so the attachment loop below is uniform.
        tts_urls = [None] * len(steps)

    # Attach each narration URL (or None placeholder) to its step.
    for i, step in enumerate(steps):
        step["narrationUrl"] = tts_urls[i]

    # Persist the full session so the client can replay it later.
    session_doc = {
        "sessionId": session_id,
        "userId": uid,
        "subjectId": subject_id,
        "topicIds": topic_ids,
        "rawText": text,
        "steps": steps,
        "createdAt": datetime.utcnow().isoformat(),
    }
    db_ref.child(f"revision_sessions/{session_id}").set(session_doc)

    return jsonify(session_doc)
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def _aggregate_attempts(attempts_list): |
|
|
"""Aggregate attempts into subject-level and topic-level stats.""" |
|
|
subj_stats = {} |
|
|
topic_stats = {} |
|
|
for a in attempts_list: |
|
|
subj = a.get("subjectId") or "unknown" |
|
|
subj_stats.setdefault(subj, {"scoreSum": 0, "totalSum": 0, "attempts": 0}) |
|
|
subj_stats[subj]["scoreSum"] += a.get("score", 0) |
|
|
subj_stats[subj]["totalSum"] += a.get("total", 0) |
|
|
subj_stats[subj]["attempts"] += 1 |
|
|
|
|
|
tstats = a.get("topicStats") or {} |
|
|
for tid, s in tstats.items(): |
|
|
topic_stats.setdefault(tid, {"correct": 0, "total": 0}) |
|
|
topic_stats[tid]["correct"] += s.get("correct", 0) |
|
|
topic_stats[tid]["total"] += s.get("total", 0) |
|
|
|
|
|
return subj_stats, topic_stats |
|
|
|
|
|
|
|
|
@app.route("/api/learn/analytics", methods=["GET"])
def learner_analytics():
    """Return per-subject and per-topic performance for the caller's attempts."""
    uid = verify_token(request.headers.get("Authorization"))
    if not uid:
        return jsonify({"error": "Unauthorized"}), 401

    attempts = db_ref.child("attempts").get() or {}
    my_attempts = [a for a in attempts.values() if a.get("userId") == uid]
    subj_stats, topic_stats = _aggregate_attempts(my_attempts)

    # Look up display names; fall back to raw IDs when not found.
    subjects = db_ref.child("subjects").get() or {}
    topics = db_ref.child("topics").get() or {}

    subjects_out = [
        {
            "subjectId": sid,
            "subjectName": subjects.get(sid, {}).get("name", sid),
            "attempts": st["attempts"],
            "averagePercentage": (
                (st["scoreSum"] / st["totalSum"]) * 100.0
                if st["totalSum"] > 0
                else 0.0
            ),
        }
        for sid, st in subj_stats.items()
    ]

    topics_out = [
        {
            "topicId": tid,
            "topicName": topics.get(tid, {}).get("name", tid),
            "correct": st["correct"],
            "total": st["total"],
            "percentage": (
                (st["correct"] / st["total"]) * 100.0 if st["total"] > 0 else 0.0
            ),
        }
        for tid, st in topic_stats.items()
    ]

    return jsonify(
        {
            "subjects": subjects_out,
            "topics": topics_out,
            "attemptsCount": len(my_attempts),
        }
    )
|
|
|
|
|
|
|
|
|
|
|
|
|
|
@app.route("/api/teacher/classes", methods=["POST"])
def teacher_create_class():
    """Teacher creates a class."""
    uid = verify_token(request.headers.get("Authorization"))
    if not uid:
        return jsonify({"error": "Unauthorized"}), 401
    try:
        require_role(uid, ["teacher", "admin"])
    except PermissionError as e:
        return jsonify({"error": str(e)}), 403

    payload = request.get_json() or {}
    class_name = payload.get("name")
    if not class_name:
        return jsonify({"error": "name required"}), 400

    new_class_id = uuid.uuid4().hex
    class_doc = {
        "classId": new_class_id,
        "teacherId": uid,
        "name": class_name,
        "grade": payload.get("grade"),
        "subjectIds": payload.get("subjectIds") or [],
        "learnerIds": [],
        "createdAt": datetime.utcnow().isoformat(),
    }
    db_ref.child(f"classes/{new_class_id}").set(class_doc)
    return jsonify(class_doc), 201
|
|
|
|
|
|
|
|
@app.route("/api/teacher/classes", methods=["GET"])
def teacher_list_classes():
    """List the classes owned by the authenticated teacher."""
    uid = verify_token(request.headers.get("Authorization"))
    if not uid:
        return jsonify({"error": "Unauthorized"}), 401
    try:
        require_role(uid, ["teacher", "admin"])
    except PermissionError as e:
        return jsonify({"error": str(e)}), 403

    all_classes = (db_ref.child("classes").get() or {}).values()
    return jsonify([c for c in all_classes if c.get("teacherId") == uid])
|
|
|
|
|
|
|
|
@app.route("/api/teacher/classes/<class_id>/add-learner", methods=["POST"])
def teacher_add_learner_to_class(class_id):
    """Add a learner to one of the teacher's classes (idempotent)."""
    uid = verify_token(request.headers.get("Authorization"))
    if not uid:
        return jsonify({"error": "Unauthorized"}), 401
    try:
        require_role(uid, ["teacher", "admin"])
    except PermissionError as e:
        return jsonify({"error": str(e)}), 403

    body = request.get_json() or {}
    learner_id = body.get("learnerId")
    if not learner_id:
        return jsonify({"error": "learnerId required"}), 400

    class_ref = db_ref.child(f"classes/{class_id}")
    class_doc = class_ref.get()
    # Same 404 whether the class is missing or owned by someone else, so
    # valid class IDs are not leaked to other teachers.
    if not class_doc or class_doc.get("teacherId") != uid:
        return jsonify({"error": "Class not found or access denied"}), 404

    learners = class_doc.get("learnerIds") or []
    # NOTE(review): read-modify-write is not atomic; concurrent adds could
    # race — confirm whether a transactional update is needed here.
    if learner_id not in learners:
        learners.append(learner_id)
        class_ref.child("learnerIds").set(learners)

    return jsonify({"success": True, "classId": class_id, "learnerIds": learners})
|
|
|
|
|
|
|
|
@app.route("/api/teacher/classes/<class_id>/analytics", methods=["GET"])
def teacher_class_analytics(class_id):
    """Aggregate performance of every learner in one of the teacher's classes."""
    uid = verify_token(request.headers.get("Authorization"))
    if not uid:
        return jsonify({"error": "Unauthorized"}), 401
    try:
        require_role(uid, ["teacher", "admin"])
    except PermissionError as e:
        return jsonify({"error": str(e)}), 403

    class_doc = db_ref.child(f"classes/{class_id}").get()
    if not class_doc or class_doc.get("teacherId") != uid:
        return jsonify({"error": "Class not found or access denied"}), 404

    learner_ids = class_doc.get("learnerIds") or []
    attempts = db_ref.child("attempts").get() or {}
    class_attempts = [a for a in attempts.values() if a.get("userId") in learner_ids]

    subj_stats, topic_stats = _aggregate_attempts(class_attempts)
    subjects = db_ref.child("subjects").get() or {}
    topics = db_ref.child("topics").get() or {}

    subjects_out = [
        {
            "subjectId": sid,
            "subjectName": subjects.get(sid, {}).get("name", sid),
            "averagePercentage": (
                (st["scoreSum"] / st["totalSum"]) * 100.0
                if st["totalSum"] > 0
                else 0.0
            ),
            "attempts": st["attempts"],
        }
        for sid, st in subj_stats.items()
    ]

    topics_out = [
        {
            "topicId": tid,
            "topicName": topics.get(tid, {}).get("name", tid),
            "correct": st["correct"],
            "total": st["total"],
            "percentage": (
                (st["correct"] / st["total"]) * 100.0 if st["total"] > 0 else 0.0
            ),
        }
        for tid, st in topic_stats.items()
    ]

    return jsonify(
        {
            "classId": class_id,
            "learnersCount": len(learner_ids),
            "subjects": subjects_out,
            "topics": topics_out,
        }
    )
|
|
|
|
|
|
|
|
|
|
|
|
|
|
@app.route("/api/parent/link-learner", methods=["POST"])
def parent_link_learner():
    """
    Parent links a learner by UID (you can later switch to invite codes).
    Body: { learnerId }
    """
    uid = verify_token(request.headers.get("Authorization"))
    if not uid:
        return jsonify({"error": "Unauthorized"}), 401
    try:
        require_role(uid, ["parent", "admin"])
    except PermissionError as e:
        return jsonify({"error": str(e)}), 403

    learner_id = (request.get_json() or {}).get("learnerId")
    if not learner_id:
        return jsonify({"error": "learnerId required"}), 400

    parent_ref = db_ref.child(f"users/{uid}")
    profile = parent_ref.get() or {}
    linked = profile.get("linkedLearnerIds") or []
    # Idempotent: re-linking an already-linked learner is a no-op.
    if learner_id not in linked:
        linked.append(learner_id)
        parent_ref.child("linkedLearnerIds").set(linked)
    return jsonify({"success": True, "linkedLearnerIds": linked})
|
|
|
|
|
|
|
|
@app.route("/api/parent/linked-learners", methods=["GET"])
def parent_list_learners():
    """Return the profile of every learner linked to this parent."""
    uid = verify_token(request.headers.get("Authorization"))
    if not uid:
        return jsonify({"error": "Unauthorized"}), 401
    try:
        require_role(uid, ["parent", "admin"])
    except PermissionError as e:
        return jsonify({"error": str(e)}), 403

    parent_profile = get_user(uid)
    linked = parent_profile.get("linkedLearnerIds") or []
    users = db_ref.child("users").get() or {}
    # Silently skip links whose user record no longer exists.
    result = [
        {"uid": learner_id, **users[learner_id]}
        for learner_id in linked
        if learner_id in users
    ]
    return jsonify(result)
|
|
|
|
|
|
|
|
@app.route("/api/parent/learner/<learner_id>/analytics", methods=["GET"])
def parent_learner_analytics(learner_id):
    """Per-subject/per-topic analytics for a learner linked to this parent."""
    uid = verify_token(request.headers.get("Authorization"))
    if not uid:
        return jsonify({"error": "Unauthorized"}), 401
    try:
        require_role(uid, ["parent", "admin"])
    except PermissionError as e:
        return jsonify({"error": str(e)}), 403

    parent_profile = get_user(uid)
    linked = parent_profile.get("linkedLearnerIds") or []
    # Admins may view any learner; parents only those linked to them.
    if learner_id not in linked and parent_profile.get("role") != "admin":
        return jsonify({"error": "This learner is not linked to your account"}), 403

    attempts = db_ref.child("attempts").get() or {}
    learner_attempts = [a for a in attempts.values() if a.get("userId") == learner_id]

    subj_stats, topic_stats = _aggregate_attempts(learner_attempts)
    subjects = db_ref.child("subjects").get() or {}
    topics = db_ref.child("topics").get() or {}

    subjects_out = [
        {
            "subjectId": sid,
            "subjectName": subjects.get(sid, {}).get("name", sid),
            "attempts": st["attempts"],
            "averagePercentage": (
                (st["scoreSum"] / st["totalSum"]) * 100.0
                if st["totalSum"] > 0
                else 0.0
            ),
        }
        for sid, st in subj_stats.items()
    ]

    topics_out = [
        {
            "topicId": tid,
            "topicName": topics.get(tid, {}).get("name", tid),
            "correct": st["correct"],
            "total": st["total"],
            "percentage": (
                (st["correct"] / st["total"]) * 100.0 if st["total"] > 0 else 0.0
            ),
        }
        for tid, st in topic_stats.items()
    ]

    return jsonify(
        {
            "subjects": subjects_out,
            "topics": topics_out,
            "attemptsCount": len(learner_attempts),
        }
    )
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def summarize_learning_history(uid: str) -> str:
    """
    Summarize learner's history for a voice tutor.

    Uses attempts + revision sessions. Returns a plain-text briefing;
    on any failure, falls back to a generic "new learner" briefing.
    """
    try:
        attempts = db_ref.child("attempts").get() or {}
        my_attempts = [a for a in attempts.values() if a.get("userId") == uid]

        sessions = db_ref.child("revision_sessions").get() or {}
        my_sessions = [s for s in sessions.values() if s.get("userId") == uid]

        # Fix: the full history was serialized unbounded, so the prompt grew
        # without limit. Cap to the most recent records, consistent with the
        # prompt-size bounding used elsewhere in this file.
        my_attempts = sorted(
            my_attempts, key=lambda a: a.get("createdAt", ""), reverse=True
        )[:20]
        my_sessions = sorted(
            my_sessions, key=lambda s: s.get("createdAt", ""), reverse=True
        )[:10]

        payload_json = json.dumps(
            {"attempts": my_attempts, "revisionSessions": my_sessions},
            ensure_ascii=False,
        )

        prompt = """
You are an AI tutor getting ready for a voice call with a learner.

You will receive their past quiz attempts and revision sessions in JSON.

Write a briefing for the tutor with:
- Overall performance level.
- Strong subjects and topics.
- Weak subjects and topics.
- Common mistakes or patterns.
- Suggested tone/approach for the tutor (e.g. more encouragement, more practice questions).

Start with:
Here is your briefing on this learner:
Then use bullet points.
"""
        resp = client.models.generate_content(
            model=TEXT_MODEL,
            contents=[prompt, payload_json],
        )
        return (resp.text or "").strip()
    except Exception as e:
        # Best-effort by design: the voice call should proceed even when
        # history lookup or the model call fails.
        logger.error(f"Failed to summarize learning history for {uid}: {e}")
        return "Here is your briefing on this learner:\n* No history available yet. Treat as a new learner."
|
|
|
|
|
|
|
|
@app.route("/api/user/tutor-briefing", methods=["GET"])
def get_tutor_briefing():
    """Return the AI-generated tutor briefing for the authenticated learner."""
    uid = verify_token(request.headers.get("Authorization"))
    if not uid:
        return jsonify({"error": "Unauthorized"}), 401
    return jsonify({"summary": summarize_learning_history(uid)})
|
|
|
|
|
|
|
|
@app.route("/api/learn/log-tutor-call", methods=["POST"])
def log_tutor_call():
    """
    Logs a voice tutoring session (e.g. ElevenLabs convai).
    Body:
        durationSeconds
        transcript
    """
    uid = verify_token(request.headers.get("Authorization"))
    if not uid:
        return jsonify({"error": "Unauthorized"}), 401

    body = request.get_json() or {}
    duration_seconds = body.get("durationSeconds")
    if duration_seconds is None:
        return jsonify({"error": "durationSeconds required"}), 400

    # Timestamp-prefixed ID keeps sibling records roughly chronological.
    call_id = f"{int(time.time())}_{uuid.uuid4().hex[:6]}"
    record = {
        "callId": call_id,
        "userId": uid,
        "durationSeconds": duration_seconds,
        "transcript": body.get("transcript"),
        "createdAt": datetime.utcnow().isoformat(),
    }
    db_ref.child(f"tutor_calls/{uid}/{call_id}").set(record)
    return jsonify({"success": True, "callId": call_id})
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
@app.route("/api/image-proxy", methods=["GET"])
def image_proxy():
    """Proxy an image from this app's own Firebase Storage bucket.

    Query params:
        url: a https://storage.googleapis.com/<bucket>/<path> URL.
    Only the app's own bucket is served; any other host or bucket is rejected.
    """
    # Local import: only this route needs URL parsing.
    from urllib.parse import urlparse

    image_url = request.args.get("url")
    if not image_url:
        return jsonify({"error": "URL parameter is missing."}), 400

    try:
        # Fix: the old substring test ("storage.googleapis.com" in url) could
        # be satisfied by a foreign URL containing that text in its path or
        # query string. Parse the URL and validate the actual hostname.
        parsed = urlparse(image_url)
        if parsed.hostname != "storage.googleapis.com":
            return jsonify({"error": "Invalid Firebase Storage URL."}), 400

        # Path layout is /<bucket>/<blob-path>; query params (e.g. signed-URL
        # tokens) are ignored.
        path_components = parsed.path.lstrip("/").split("/", 1)
        if len(path_components) < 2 or not path_components[1]:
            return jsonify({"error": "Invalid URL format."}), 400
        url_bucket_name, blob_path = path_components

        # Only proxy our own bucket — never arbitrary GCS buckets.
        if url_bucket_name != bucket.name:
            return jsonify({"error": "Bucket name mismatch."}), 403

        blob = bucket.blob(blob_path)
        if not blob.exists():
            return jsonify({"error": "Image not found."}), 404

        image_bytes = blob.download_as_bytes()
        content_type = blob.content_type or "application/octet-stream"

        resp = Response(image_bytes, content_type=content_type)
        # Images are immutable once uploaded, so an hour of caching is safe.
        resp.headers["Cache-Control"] = "public, max-age=3600"
        return resp

    except Exception as e:
        logger.error(f"Image proxy error: {e}")
        logger.error(traceback.format_exc())
        return jsonify({"error": "Internal server error processing the image request."}), 500
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
if __name__ == "__main__":
    # Platform supplies PORT (default 7860, e.g. Hugging Face Spaces).
    port = int(os.environ.get("PORT", 7860))
    # Fix: debug=True was hard-coded, which enables the Werkzeug interactive
    # debugger (arbitrary code execution) if this server is ever exposed.
    # Debug mode must now be opted into explicitly via FLASK_DEBUG.
    debug = os.environ.get("FLASK_DEBUG", "").lower() in ("1", "true", "yes")
    app.run(debug=debug, host="0.0.0.0", port=port)