ProfAI-API / main.py
rairo's picture
Create main.py
d5b6c91 verified
raw
history blame
17.3 kB
# main.py
import os
import uuid
import json
import traceback
import threading
from datetime import datetime
from flask import Flask, request, jsonify
from flask_cors import CORS
import firebase_admin
from firebase_admin import credentials, db, storage, auth
from lesson_gen import (
fetch_arxiv_papers, generate_knowledge_base,
generate_lesson_from_knowledge_base, generate_remedial_lesson,
create_lesson_video, deepgram_tts
)
import logging
# --- Configuration & Initialization ---
app = Flask(__name__)
CORS(app)
try:
credentials_json_string = os.environ.get("FIREBASE")
if not credentials_json_string: raise ValueError("FIREBASE env var not set.")
credentials_json = json.loads(credentials_json_string)
firebase_db_url = os.environ.get("Firebase_DB")
firebase_storage_bucket = os.environ.get("Firebase_Storage")
if not all([firebase_db_url, firebase_storage_bucket]): raise ValueError("Firebase DB/Storage env vars must be set.")
cred = credentials.Certificate(credentials_json)
firebase_admin.initialize_app(cred, {'databaseURL': firebase_db_url, 'storageBucket': firebase_storage_bucket})
logging.info("Firebase Admin SDK initialized successfully for ProfAI.")
except Exception as e:
logging.fatal(f"FATAL: Error initializing Firebase: {e}")
bucket = storage.bucket()
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
# --- Helper Functions & Workers ---
def verify_token(token):
try: return auth.verify_id_token(token)['uid']
except Exception as e:
logging.error(f"Token verification failed: {e}")
return None
def generation_worker(course_id, user_id, topic, level, goal):
"""Worker for the entire initial course generation process."""
logging.info(f"WORKER [{course_id}]: Starting for topic '{topic}'.")
course_ref = db.reference(f'profai_courses/{course_id}')
try:
arxiv_docs = fetch_arxiv_papers(topic)
knowledge_base = generate_knowledge_base(topic, level, goal, arxiv_docs)
learning_path = knowledge_base.get("learning_path", [])
course_ref.update({"knowledgeBase": knowledge_base, "learningPath": learning_path})
first_concept = learning_path[0] if learning_path else "Introduction"
lesson_content = generate_lesson_from_knowledge_base(knowledge_base, first_concept)
narration_bytes = deepgram_tts(lesson_content['script'])
if not narration_bytes: raise Exception("TTS failed, cannot proceed.")
video_bytes = create_lesson_video(lesson_content['script'], narration_bytes)
blob_name = f"profai_courses/{user_id}/{course_id}/lesson_1.mp4"
blob = bucket.blob(blob_name)
blob.upload_from_string(video_bytes, content_type="video/mp4")
steps = [{
"step": 1, "type": "video", "concept": first_concept,
"videoUrl": blob.public_url, "status": "unlocked"
}, {
"step": 2, "type": "quiz", "concept": first_concept,
"quizData": lesson_content['quiz'], "status": "locked"
}]
course_ref.update({"status": "ready", "steps": steps})
user_ref = db.reference(f'users/{user_id}')
user_credits = user_ref.child('credits').get()
user_ref.update({'credits': user_credits - 8})
logging.info(f"WORKER [{course_id}]: Success. Charged 8 credits.")
except Exception as e:
logging.error(f"WORKER [{course_id}]: Failed: {traceback.format_exc()}")
course_ref.update({"status": "failed", "error": str(e)})
def lesson_worker(course_id, user_id, concept, current_steps, is_remedial=False, regenerate_step_num=None):
"""Worker for generating subsequent, remedial, or regenerated lessons."""
action = "Regenerating" if regenerate_step_num else "Generating"
logging.info(f"WORKER [{course_id}]: {action} lesson for concept '{concept}'. Remedial: {is_remedial}")
course_ref = db.reference(f'profai_courses/{course_id}')
try:
knowledge_base = course_ref.child('knowledgeBase').get()
if not knowledge_base: raise Exception("Knowledge base not found for course.")
if is_remedial:
lesson_content = generate_remedial_lesson(concept)
cost = 5
else: # Standard or Regenerated lesson
lesson_content = generate_lesson_from_knowledge_base(knowledge_base, concept)
cost = 8
narration_bytes = deepgram_tts(lesson_content['script'])
if not narration_bytes: raise Exception("TTS failed.")
video_bytes = create_lesson_video(lesson_content['script'], narration_bytes)
step_num = regenerate_step_num if regenerate_step_num else len(current_steps) + 1
blob_name = f"profai_courses/{user_id}/{course_id}/lesson_{step_num}_{uuid.uuid4().hex[:4]}.mp4"
blob = bucket.blob(blob_name)
blob.upload_from_string(video_bytes, content_type="video/mp4")
new_video_step = {
"step": step_num, "type": "video", "concept": concept,
"videoUrl": blob.public_url, "status": "unlocked"
}
new_quiz_step = {
"step": step_num + 1, "type": "quiz", "concept": concept,
"quizData": lesson_content['quiz'], "status": "locked"
}
if regenerate_step_num:
# Replace existing steps
current_steps[regenerate_step_num - 1] = new_video_step
if (regenerate_step_num < len(current_steps)) and current_steps[regenerate_step_num]['type'] == 'quiz':
current_steps[regenerate_step_num] = new_quiz_step
else:
# Append new steps
current_steps.extend([new_video_step, new_quiz_step])
course_ref.child('steps').set(current_steps)
user_ref = db.reference(f'users/{user_id}')
user_credits = user_ref.child('credits').get()
user_ref.update({'credits': user_credits - cost})
logging.info(f"WORKER [{course_id}]: Lesson '{concept}' processed. Charged {cost} credits.")
except Exception as e:
logging.error(f"WORKER [{course_id}]: Lesson generation for '{concept}' failed: {e}")
# Mark the last unlocked step as failed to signal the frontend
current_steps[-1]['status'] = 'generation_failed'
course_ref.child('steps').set(current_steps)
# --- USER AUTHENTICATION (EXPANDED) ---
@app.route('/api/auth/signup', methods=['POST'])
def signup():
try:
data = request.get_json()
email = data.get('email')
password = data.get('password')
if not email or not password: return jsonify({'error': 'Email and password are required'}), 400
user = auth.create_user(email=email, password=password)
user_ref = db.reference(f'users/{user.uid}')
user_data = {'email': email, 'credits': 20, 'created_at': datetime.utcnow().isoformat()} # Default credits
user_ref.set(user_data)
return jsonify({'success': True, 'user': {'uid': user.uid, **user_data}}), 201
except Exception as e: return jsonify({'error': str(e)}), 400
@app.route('/api/auth/google-signin', methods=['POST'])
def google_signin():
try:
auth_header = request.headers.get('Authorization', '')
if not auth_header.startswith('Bearer '): return jsonify({'error': 'Missing or invalid token'}), 401
token = auth_header.split(' ')[1]
decoded_token = auth.verify_id_token(token)
uid = decoded_token['uid']
email = decoded_token.get('email')
user_ref = db.reference(f'users/{uid}')
user_data = user_ref.get()
if not user_data:
logging.info(f"New user signed in with Google: {email}, UID: {uid}. Creating profile.")
user_data = {'email': email, 'credits': 20, 'created_at': datetime.utcnow().isoformat()}
user_ref.set(user_data)
else:
logging.info(f"Existing user signed in with Google: {email}, UID: {uid}.")
return jsonify({'success': True, 'user': {'uid': uid, **user_data}}), 200
except Exception as e:
logging.error(f"Google Sign-in failed: {traceback.format_exc()}")
return jsonify({'error': str(e)}), 400
# --- ProfAI CRUD ENDPOINTS ---
@app.route('/api/profai/courses', methods=['GET'])
def get_user_courses():
"""Lists all courses created by the authenticated user."""
try:
token = request.headers.get('Authorization', '').split(' ')[1]
uid = verify_token(token)
if not uid: return jsonify({'error': 'Unauthorized'}), 401
courses_ref = db.reference('profai_courses')
user_courses = courses_ref.order_by_child('uid').equal_to(uid).get()
if not user_courses: return jsonify([]), 200
# Convert dict to a list sorted by creation date
courses_list = sorted(user_courses.values(), key=lambda x: x.get('createdAt', ''), reverse=True)
return jsonify(courses_list), 200
except Exception as e:
return jsonify({'error': str(e)}), 500
@app.route('/api/profai/start-course', methods=['POST'])
def start_course():
try:
token = request.headers.get('Authorization', '').split(' ')[1]
uid = verify_token(token)
if not uid: return jsonify({'error': 'Unauthorized'}), 401
user_credits = db.reference(f'users/{uid}/credits').get()
if not isinstance(user_credits, (int, float)) or user_credits < 8:
return jsonify({'error': 'Insufficient credits. Requires 8 credits to start a new course.'}), 402
data = request.get_json()
topic, level, goal = data.get('topic'), data.get('level'), data.get('goal')
if not all([topic, level, goal]):
return jsonify({'error': 'topic, level, and goal are required'}), 400
course_id = uuid.uuid4().hex
course_data = {
'id': course_id, 'uid': uid, 'topic': topic,
'status': 'generating', 'createdAt': datetime.utcnow().isoformat(),
}
db.reference(f'profai_courses/{course_id}').set(course_data)
thread = threading.Thread(target=generation_worker, args=(course_id, uid, topic, level, goal))
thread.daemon = True
thread.start()
return jsonify({'success': True, 'message': 'Your personalized course is being generated!', 'courseId': course_id}), 202
except Exception as e:
return jsonify({'error': str(e)}), 500
@app.route('/api/profai/course-status/<string:course_id>', methods=['GET'])
def get_course_status(course_id):
try:
token = request.headers.get('Authorization', '').split(' ')[1]
uid = verify_token(token)
if not uid: return jsonify({'error': 'Unauthorized'}), 401
course_data = db.reference(f'profai_courses/{course_id}').get()
if not course_data or course_data.get('uid') != uid:
return jsonify({'error': 'Course not found or unauthorized'}), 404
return jsonify(course_data), 200
except Exception as e:
return jsonify({'error': str(e)}), 500
@app.route('/api/profai/courses/<string:course_id>', methods=['DELETE'])
def delete_course(course_id):
"""Deletes a course and all associated videos from storage."""
try:
token = request.headers.get('Authorization', '').split(' ')[1]
uid = verify_token(token)
if not uid: return jsonify({'error': 'Unauthorized'}), 401
course_ref = db.reference(f'profai_courses/{course_id}')
course_data = course_ref.get()
if not course_data or course_data.get('uid') != uid:
return jsonify({'error': 'Course not found or unauthorized'}), 404
# Delete associated files from Firebase Storage
storage_prefix = f"profai_courses/{uid}/{course_id}/"
blobs_to_delete = bucket.list_blobs(prefix=storage_prefix)
for blob in blobs_to_delete:
logging.info(f"Deleting file from storage: {blob.name}")
blob.delete()
# Delete course record from database
course_ref.delete()
logging.info(f"Deleted course {course_id} for user {uid}")
return jsonify({'success': True, 'message': 'Course deleted successfully.'}), 200
except Exception as e:
return jsonify({'error': str(e)}), 500
@app.route('/api/profai/courses/<string:course_id>/steps/<int:step_num>/regenerate', methods=['POST'])
def regenerate_step(course_id, step_num):
"""Regenerates a specific lesson video and its quiz."""
try:
token = request.headers.get('Authorization', '').split(' ')[1]
uid = verify_token(token)
if not uid: return jsonify({'error': 'Unauthorized'}), 401
user_credits = db.reference(f'users/{uid}/credits').get()
if user_credits < 8: return jsonify({'error': 'Insufficient credits. Regeneration requires 8 credits.'}), 402
course_ref = db.reference(f'profai_courses/{course_id}')
course_data = course_ref.get()
if not course_data or course_data.get('uid') != uid:
return jsonify({'error': 'Course not found or unauthorized'}), 404
steps = course_data.get('steps', [])
if not (0 < step_num <= len(steps) and steps[step_num - 1]['type'] == 'video'):
return jsonify({'error': 'Invalid step number. You can only regenerate video steps.'}), 400
concept_to_regenerate = steps[step_num - 1]['concept']
steps[step_num - 1]['status'] = 'regenerating' # Update UI state
course_ref.child('steps').set(steps)
thread = threading.Thread(target=lesson_worker, args=(course_id, uid, concept_to_regenerate, steps, False, step_num))
thread.daemon = True
thread.start()
return jsonify({'success': True, 'message': f"Regenerating lesson for '{concept_to_regenerate}'."}), 202
except Exception as e:
return jsonify({'error': str(e)}), 500
@app.route('/api/profai/submit-quiz/<string:course_id>', methods=['POST'])
def submit_quiz(course_id):
try:
token = request.headers.get('Authorization', '').split(' ')[1]
uid = verify_token(token)
if not uid: return jsonify({'error': 'Unauthorized'}), 401
data = request.get_json()
answers = data.get('answers', {})
course_ref = db.reference(f'profai_courses/{course_id}')
course_data = course_ref.get()
if not course_data or course_data.get('uid') != uid:
return jsonify({'error': 'Course not found'}), 404
steps = course_data.get('steps', [])
learning_path = course_data.get('learningPath', [])
quiz_step_index = next((i for i, step in enumerate(steps) if step.get('status') == 'unlocked' and step.get('type') == 'quiz'), -1)
if quiz_step_index == -1: return jsonify({'error': 'No active quiz found'}), 400
quiz_data = steps[quiz_step_index]['quizData']
correct_count = sum(1 for q in quiz_data if answers.get(q['question']) == q['answer'])
score = correct_count / len(quiz_data)
steps[quiz_step_index]['status'] = 'completed'
next_action_message = ""
if score > 0.6: # Pass
user_credits = db.reference(f'users/{uid}/credits').get()
if user_credits < 8: return jsonify({'error': 'Insufficient credits for next lesson (requires 8).'}), 402
last_concept = steps[quiz_step_index]['concept']
try:
next_concept_index = learning_path.index(last_concept) + 1
if next_concept_index < len(learning_path):
next_concept = learning_path[next_concept_index]
thread = threading.Thread(target=lesson_worker, args=(course_id, uid, next_concept, steps, False))
thread.daemon = True
thread.start()
next_action_message = f"Passed! Generating your next lesson on '{next_concept}'."
else:
steps.append({"step": len(steps) + 1, "type": "course_complete", "status": "unlocked"})
next_action_message = "Congratulations! You've completed the course."
except (ValueError, IndexError):
return jsonify({'error': 'Could not determine the next lesson step.'}), 500
else: # Struggle
user_credits = db.reference(f'users/{uid}/credits').get()
if user_credits < 5: return jsonify({'error': 'Insufficient credits for remedial lesson (requires 5).'}), 402
failed_concept = steps[quiz_step_index]['concept']
thread = threading.Thread(target=lesson_worker, args=(course_id, uid, failed_concept, steps, True))
thread.daemon = True
thread.start()
next_action_message = f"Let's review. Generating a remedial lesson on '{failed_concept}'."
course_ref.child('steps').set(steps)
return jsonify({'success': True, 'score': score, 'message': next_action_message}), 202
except Exception as e:
return jsonify({'error': str(e)}), 500
# --- Main Execution ---
if __name__ == '__main__':
port = int(os.environ.get("PORT", 7860))
app.run(debug=True, host="0.0.0.0", port=port)