from fastapi import APIRouter, Depends, HTTPException, Request, Cookie, UploadFile, File, Form from typing import List, Optional, Dict, Any, Callable from pydantic import BaseModel, Field from services.api.db.token_utils import decode_token from dotenv import load_dotenv import os import pymysql import pandas as pd import boto3 import json import threading import asyncio from botocore.exceptions import ClientError from datetime import datetime from fastapi.middleware.cors import CORSMiddleware from services.utils.cache_utils import cache_data, cache_with_fallback, clear_cache from services.config.valkey_config import get_redis_client from services.utils.api_cache import get_cached_data, invalidate_cache, invalidate_cache_pattern # Get Valkey client redis_client = get_redis_client() # Load environment variables load_dotenv() MYSQL_USER = os.getenv("MYSQL_USER") MYSQL_PASSWORD = os.getenv("MYSQL_PASSWORD") MYSQL_HOST = os.getenv("MYSQL_HOST", "localhost") MYSQL_DB = os.getenv("MYSQL_DB") MYSQL_PORT = int(os.getenv("MYSQL_PORT", "3306")) SSL = { "ca": os.path.join(os.path.dirname(__file__), "ca.pem") } # AWS Configuration AWS_ACCESS_KEY = os.getenv("AWS_ACCESS_KEY_ID") AWS_SECRET_KEY = os.getenv("AWS_SECRET_ACCESS_KEY") REGION = os.getenv("REGION_NAME", "ap-southeast-1") # Initialize S3 client s3 = boto3.client('s3', aws_access_key_id=AWS_ACCESS_KEY, aws_secret_access_key=AWS_SECRET_KEY, region_name=REGION ) router = APIRouter( tags=["courses"], responses={404: {"description": "Not found"}}, ) # Simple test endpoint to verify routing @router.post("/test") async def test_post(): return {"message": "POST test endpoint works"} def connect_db(): try: print(f"Connecting to database {MYSQL_DB} on {MYSQL_HOST}:{MYSQL_PORT}") connection = pymysql.connect( host=MYSQL_HOST, user=MYSQL_USER, password=MYSQL_PASSWORD, database=MYSQL_DB, port=MYSQL_PORT, ssl=False ) print("Database connection successful") return connection except Exception as e: print(f"Database connection error: {str(e)}") raise HTTPException(status_code=500, detail=f"Database connection error: {str(e)}") # Models class Course(BaseModel): id: int name: str instructor: str description: Optional[str] rating: Optional[float] = None enrolled: Optional[int] = 0 class QuestionOption(BaseModel): text: str isCorrect: bool = False class QuizQuestion(BaseModel): question: str options: list[str] correctAnswer: int class Quiz(BaseModel): title: str description: Optional[str] = None questions: Dict[str, QuizQuestion] class LectureListItem(BaseModel): id: int title: str courseId: int description: Optional[str] = None passed: Optional[bool] = False class Lecture(BaseModel): id: int courseId: int title: str description: Optional[str] = None content: Optional[str] = None videoUrl: Optional[str] = None quiz: Optional[Quiz] = None class LectureDetails(Lecture): courseName: Optional[str] = None courseDescription: Optional[str] = None courseLectures: Optional[List[LectureListItem]] = None # Add the missing CourseDetails model class CourseDetails(BaseModel): id: int name: str description: Optional[str] duration: Optional[str] skills: Optional[List[str]] = [] difficulty: Optional[str] instructor: str instructor_id: Optional[int] enrolled: Optional[int] = 0 rating: Optional[float] = None is_enrolled: Optional[bool] = False # Optimized /courses endpoint @router.get("/courses", response_model=List[Course]) async def get_courses(request: Request, auth_token: str = Cookie(None)): try: # Authentication (keep existing code) if not auth_token: auth_header = request.headers.get('Authorization') if auth_header and auth_header.startswith('Bearer '): auth_token = auth_header.split(' ')[1] if not auth_token: raise HTTPException(status_code=401, detail="No authentication token provided") try: user_data = decode_token(auth_token) except Exception as e: print(f"Token decode error: {str(e)}") raise HTTPException(status_code=401, detail="Invalid authentication token") # Better cache key with shorter TTL for faster updates cache_key = "courses:public:v2" # Optimized database fetch function async def fetch_courses_from_db(): conn = connect_db() try: with conn.cursor(pymysql.cursors.DictCursor) as cursor: # Single optimized query with JOINs instead of N+1 queries query = """ SELECT c.CourseID as id, c.CourseName as name, CONCAT(i.InstructorName, ' (', i.AccountName, ')') as instructor, c.Descriptions as description, COALESCE(enrollment_stats.enrolled, 0) as enrolled, COALESCE(rating_stats.avg_rating, NULL) as rating FROM Courses c JOIN Instructors i ON c.InstructorID = i.InstructorID LEFT JOIN ( SELECT CourseID, COUNT(*) as enrolled FROM Enrollments GROUP BY CourseID ) enrollment_stats ON c.CourseID = enrollment_stats.CourseID LEFT JOIN ( SELECT CourseID, AVG(Rating) as avg_rating FROM Enrollments WHERE Rating IS NOT NULL GROUP BY CourseID ) rating_stats ON c.CourseID = rating_stats.CourseID ORDER BY c.CourseID DESC LIMIT 50 """ cursor.execute(query) courses = cursor.fetchall() # Format the data efficiently formatted_courses = [] for course in courses: formatted_courses.append({ 'id': course['id'], 'name': course['name'], 'instructor': course['instructor'], 'description': course['description'], 'enrolled': course['enrolled'], 'rating': float(course['rating']) if course['rating'] else None }) return formatted_courses finally: conn.close() # Use optimized caching with compression and shorter TTL return await get_cached_data( cache_key, fetch_courses_from_db, ttl=900, # 15 minutes instead of 1 hour for faster updates use_compression=True # Enable compression for faster transfer ) except Exception as e: raise HTTPException(status_code=500, detail=str(e)) # Get course details # Optimize the course details endpoint with caching @router.get("/courses/{course_id}", response_model=CourseDetails) async def get_course_details(course_id: int, request: Request, auth_token: str = Cookie(None)): try: # Authentication if not auth_token: auth_header = request.headers.get('Authorization') if auth_header and auth_header.startswith('Bearer '): auth_token = auth_header.split(' ')[1] if not auth_token: raise HTTPException(status_code=401, detail="No authentication token provided") try: user_data = decode_token(auth_token) user_id = user_data.get('user_id') except Exception as e: print(f"Token decode error: {str(e)}") raise HTTPException(status_code=401, detail="Invalid authentication token") # Cache key for course details cache_key = f"course:details:{course_id}:user:{user_id}" # Optimized database fetch function async def fetch_course_details_from_db(): conn = connect_db() try: with conn.cursor(pymysql.cursors.DictCursor) as cursor: # Single query with all course info, enrollment status, and user's enrollment query = """ SELECT c.CourseID as id, c.CourseName as name, c.Descriptions as description, c.EstimatedDuration as duration, c.Skills as skills, c.Difficulty as difficulty, CONCAT(i.InstructorName, ' (', i.AccountName, ')') as instructor, i.InstructorID as instructor_id, COALESCE(enrollment_stats.enrolled, 0) as enrolled, COALESCE(rating_stats.avg_rating, NULL) as rating, CASE WHEN user_enrollment.LearnerID IS NOT NULL THEN TRUE ELSE FALSE END as is_enrolled FROM Courses c JOIN Instructors i ON c.InstructorID = i.InstructorID LEFT JOIN ( SELECT CourseID, COUNT(*) as enrolled FROM Enrollments GROUP BY CourseID ) enrollment_stats ON c.CourseID = enrollment_stats.CourseID LEFT JOIN ( SELECT CourseID, AVG(Rating) as avg_rating FROM Enrollments WHERE Rating IS NOT NULL GROUP BY CourseID ) rating_stats ON c.CourseID = rating_stats.CourseID LEFT JOIN ( SELECT CourseID, LearnerID FROM Enrollments e2 JOIN Learners l ON e2.LearnerID = l.LearnerID WHERE l.LearnerID = %s ) user_enrollment ON c.CourseID = user_enrollment.CourseID WHERE c.CourseID = %s """ cursor.execute(query, (user_id, course_id)) course = cursor.fetchone() if not course: raise HTTPException(status_code=404, detail="Course not found") # Format skills if it's JSON skills = [] if course['skills']: try: skills = json.loads(course['skills']) except: skills = [] return { 'id': course['id'], 'name': course['name'], 'description': course['description'], 'duration': course['duration'], 'skills': skills, 'difficulty': course['difficulty'], 'instructor': course['instructor'], 'instructor_id': course['instructor_id'], 'enrolled': course['enrolled'], 'rating': float(course['rating']) if course['rating'] else None, 'is_enrolled': course['is_enrolled'] } finally: conn.close() # Use caching with 30 minutes TTL return await get_cached_data( cache_key, fetch_course_details_from_db, ttl=1800, # 30 minutes use_compression=True ) except HTTPException: raise except Exception as e: print(f"Error in get_course_details: {str(e)}") raise HTTPException(status_code=500, detail=str(e)) # Get lectures for a course @router.get("/courses/{course_id}/lectures", response_model=List[LectureListItem]) async def get_course_lectures(request: Request, course_id: int, auth_token: str = Cookie(None)): try: # Try to get token from Authorization header if cookie is not present if not auth_token: auth_header = request.headers.get('Authorization') if auth_header and auth_header.startswith('Bearer '): auth_token = auth_header.split(' ')[1] if not auth_token: raise HTTPException(status_code=401, detail="No authentication token provided") try: user_data = decode_token(auth_token) except Exception as e: print(f"Token decode error: {str(e)}") raise HTTPException(status_code=401, detail="Invalid authentication token") # Create cache key using course ID and user ID cache_key = f"courses:id:{course_id}:lectures:user:{user_data.get('user_id', 'anonymous')}" # Define database fetch function async def fetch_lectures_from_db(): conn = connect_db() lectures = [] try: with conn.cursor(pymysql.cursors.DictCursor) as cursor: # First verify the course exists cursor.execute("SELECT CourseID FROM Courses WHERE CourseID = %s", (course_id,)) if not cursor.fetchone(): raise HTTPException(status_code=404, detail="Course not found") # Get lectures with pass status for the current user query = """ SELECT l.LectureID as id, l.CourseID as courseId, l.Title as title, l.Description as description, CASE WHEN lr.State = 'passed' THEN 1 ELSE 0 END as passed FROM Lectures l LEFT JOIN LectureResults lr ON l.LectureID = lr.LectureID AND lr.LearnerID = %s AND lr.CourseID = %s WHERE l.CourseID = %s ORDER BY l.LectureID """ cursor.execute(query, (user_data.get('user_id'), course_id, course_id)) lectures = cursor.fetchall() # Ensure all fields match the LectureListItem model for lecture in lectures: # Ensure the fields exist and have appropriate null values if missing if lecture.get('description') is None: lecture['description'] = None # Convert passed from 1/0 to True/False lecture['passed'] = bool(lecture.get('passed', 0)) finally: conn.close() return lectures # Use cached data helper return await get_cached_data(cache_key, fetch_lectures_from_db, ttl=3600) except HTTPException: raise except Exception as e: print(f"Error in get_course_lectures: {str(e)}") raise HTTPException(status_code=500, detail=str(e)) # Get lecture details @router.get("/lectures/{lecture_id}", response_model=LectureDetails) async def get_lecture_details(request: Request, lecture_id: int, auth_token: str = Cookie(None)): try: # Try to get token from Authorization header if cookie is not present if not auth_token: auth_header = request.headers.get('Authorization') if auth_header and auth_header.startswith('Bearer '): auth_token = auth_header.split(' ')[1] if not auth_token: raise HTTPException(status_code=401, detail="No authentication token provided") # Verify user is authenticated try: user_data = decode_token(auth_token) except Exception as e: print(f"Token decode error: {str(e)}") raise HTTPException(status_code=401, detail="Invalid authentication token") # Create cache key using lecture ID and user ID cache_key = f"lectures:id:{lecture_id}:user:{user_data.get('user_id', 'anonymous')}" # Define database fetch function async def fetch_lecture_from_db(): conn = connect_db() cursor = None try: cursor = conn.cursor(pymysql.cursors.DictCursor) # Get lecture details with course data query = """ SELECT l.LectureID as id, l.CourseID as courseId, l.Title as title, l.Description as description, l.Content as content, c.CourseName as courseName, c.CourseID, c.Descriptions as courseDescription FROM Lectures l JOIN Courses c ON l.CourseID = c.CourseID WHERE l.LectureID = %s """ cursor.execute(query, (lecture_id,)) lecture = cursor.fetchone() if not lecture: raise HTTPException(status_code=404, detail="Lecture not found") # Get course lectures with pass status cursor.execute(""" SELECT l.LectureID as id, l.CourseID as courseId, l.Title as title, l.Description as description, CASE WHEN lr.State = 'passed' THEN 1 ELSE 0 END as passed FROM Lectures l LEFT JOIN LectureResults lr ON l.LectureID = lr.LectureID AND lr.LearnerID = %s AND lr.CourseID = %s WHERE l.CourseID = %s ORDER BY l.LectureID """, (user_data.get('user_id'), lecture['courseId'], lecture['courseId'])) course_lectures = cursor.fetchall() if not course_lectures: course_lectures = [] else: # Convert passed from 1/0 to True/False for each lecture for course_lecture in course_lectures: if course_lecture.get('description') is None: course_lecture['description'] = None course_lecture['passed'] = bool(course_lecture.get('passed', 0)) response_data = { "id": lecture['id'], "courseId": lecture['courseId'], "title": lecture['title'], "description": lecture['description'], "content": lecture['content'], "courseName": lecture['courseName'], "courseDescription": lecture['courseDescription'], "courseLectures": course_lectures, "videoUrl": None, "quiz": None # Initialize quiz as None } # Get video URL if exists video_path = f"videos/cid{lecture['courseId']}/lid{lecture['id']}/vid_lecture.mp4" try: s3.head_object(Bucket="tlhmaterials", Key=video_path) response_data['videoUrl'] = f"https://tlhmaterials.s3-{REGION}.amazonaws.com/{video_path}" except: pass # Keep videoUrl as None if no video exists # Get quiz if exists - fixing this part cursor.execute(""" SELECT q.QuizID, q.Title, q.Description FROM Quizzes q WHERE q.LectureID = %s """, (lecture_id,)) quiz_data = cursor.fetchone() if quiz_data: quiz = { "id": quiz_data['QuizID'], "title": quiz_data['Title'], "description": quiz_data['Description'], "questions": {} } # Get quiz questions cursor.execute(""" SELECT q.QuestionID, q.QuestionText, o.OptionID, o.OptionText, o.IsCorrect FROM Questions q JOIN Options o ON q.QuestionID = o.QuestionID WHERE q.QuizID = %s ORDER BY q.QuestionID, o.OptionID """, (quiz_data['QuizID'],)) questions_data = cursor.fetchall() current_question_id = None current_options = [] correct_option_index = 0 for row in questions_data: if current_question_id != row['QuestionID']: # Save previous question data if current_question_id is not None: quiz['questions'][str(current_question_id)] = { 'question': question_text, 'options': current_options, 'correctAnswer': correct_option_index } # Start new question current_question_id = row['QuestionID'] question_text = row['QuestionText'] current_options = [] correct_option_index = 0 current_options.append(row['OptionText']) if row['IsCorrect']: correct_option_index = len(current_options) - 1 # Save the last question if current_question_id is not None: quiz['questions'][str(current_question_id)] = { 'question': question_text, 'options': current_options, 'correctAnswer': correct_option_index } response_data['quiz'] = quiz return response_data finally: if cursor: cursor.close() if conn: conn.close() # Use the caching mechanism to get the data return await get_cached_data( cache_key, fetch_lecture_from_db, ttl=3600, # Cache for 1 hour use_compression=True # Enable compression for large response ) except HTTPException: raise except Exception as e: print(f"Error in get_lecture_details: {str(e)}") raise HTTPException(status_code=500, detail=str(e)) # Get instructor's courses with proper caching @router.get("/instructor/courses", response_model=List[Course]) async def get_instructor_courses( request: Request, auth_token: str = Cookie(None) ): try: # Get token from header if not in cookie if not auth_token: auth_header = request.headers.get('Authorization') if auth_header and auth_header.startswith('Bearer '): auth_token = auth_header.split(' ')[1] else: raise HTTPException(status_code=401, detail="No authentication token provided") # Verify token and get user data user_data = decode_token(auth_token) username = user_data['username'] role = user_data['role'] instructor_id = user_data.get('user_id') # Verify user is an instructor if role != "Instructor": raise HTTPException(status_code=403, detail="Only instructors can access this endpoint") # Create a cache key for instructor courses cache_key = f"instructor:courses:{instructor_id or username}" # Define database fetch function async def fetch_instructor_courses_from_db(): conn = connect_db() try: with conn.cursor(pymysql.cursors.DictCursor) as cursor: # Get instructor ID from token or fallback to database lookup current_instructor_id = instructor_id if not current_instructor_id: # Fallback for old tokens without user_id cursor.execute(""" SELECT InstructorID FROM Instructors WHERE AccountName = %s """, (username,)) instructor = cursor.fetchone() if not instructor: raise HTTPException(status_code=404, detail="Instructor not found") current_instructor_id = instructor['InstructorID'] # Get courses by this instructor query = """ SELECT c.CourseID as id, c.CourseName as name, CONCAT(i.InstructorName, ' (', i.AccountName, ')') as instructor, c.Descriptions as description, (SELECT COUNT(*) FROM Enrollments WHERE CourseID = c.CourseID) as enrolled, COALESCE( (SELECT AVG(Rating) FROM Enrollments WHERE CourseID = c.CourseID AND Rating IS NOT NULL), 0 ) as rating FROM Courses c JOIN Instructors i ON c.InstructorID = i.InstructorID WHERE c.InstructorID = %s ORDER BY c.CourseID DESC """ cursor.execute(query, (current_instructor_id,)) courses = cursor.fetchall() # Format the courses data formatted_courses = [] for course in courses: formatted_course = { 'id': course['id'], 'name': course['name'], 'instructor': course['instructor'], 'description': course['description'], 'enrolled': course['enrolled'], 'rating': float(course['rating']) if course['rating'] else None } formatted_courses.append(formatted_course) return formatted_courses finally: conn.close() # Use the caching mechanism to get the data return await get_cached_data( cache_key, fetch_instructor_courses_from_db, ttl=1800, # Cache for 30 minutes use_compression=True # Enable compression for large response ) except HTTPException as he: raise he except Exception as e: print(f"Error fetching instructor courses: {str(e)}") raise HTTPException(status_code=500, detail=str(e)) # Create a new course class CourseCreate(BaseModel): name: str description: str skills: List[str] difficulty: str duration: Optional[int] = None @router.post("/instructor/courses", response_model=Course) async def create_course( course_data: CourseCreate, request: Request, auth_token: str = Cookie(None) ): try: # Get token from header if not in cookie if not auth_token: auth_header = request.headers.get('Authorization') if auth_header and auth_header.startswith('Bearer '): auth_token = auth_header.split(' ')[1] else: raise HTTPException(status_code=401, detail="No authentication token provided") # Verify token and get user data try: user_data = decode_token(auth_token) username = user_data['username'] role = user_data['role'] instructor_id = user_data.get('user_id') # Log debugging information print(f"POST /instructor/courses - User: {username}, Role: {role}, InstructorID: {instructor_id}") print(f"Course data: {course_data}") # Verify user is an instructor if role != "Instructor": raise HTTPException(status_code=403, detail="Only instructors can create courses") # Connect to database conn = connect_db() with conn.cursor(pymysql.cursors.DictCursor) as cursor: # Get instructor ID from token or fallback to database lookup if not instructor_id: # Fallback for old tokens without user_id cursor.execute(""" SELECT InstructorID FROM Instructors WHERE AccountName = %s """, (username,)) instructor = cursor.fetchone() if not instructor: raise HTTPException(status_code=404, detail="Instructor not found") instructor_id = instructor['InstructorID'] # Insert new course cursor.execute(""" INSERT INTO Courses (CourseName, Descriptions, Skills, Difficulty, EstimatedDuration, InstructorID) VALUES (%s, %s, %s, %s, %s, %s) """, ( course_data.name, course_data.description, json.dumps(course_data.skills), course_data.difficulty, course_data.duration or "Self-paced", instructor_id )) # Get the created course ID course_id = cursor.lastrowid conn.commit() # Return the created course cursor.execute(""" SELECT c.CourseID as id, c.CourseName as name, CONCAT(i.InstructorName, ' (', i.AccountName, ')') as instructor, c.Descriptions as description, 0 as enrolled, NULL as rating FROM Courses c JOIN Instructors i ON c.InstructorID = i.InstructorID WHERE c.CourseID = %s """, (course_id,)) new_course = cursor.fetchone() if not new_course: raise HTTPException(status_code=500, detail="Course was created but couldn't be retrieved") # Invalidate Valkey cache for all courses and instructor-specific cache pattern_all = "courses:all" pattern_instructor = f"instructor:courses:{instructor_id or username}" pattern_course_details = f"courses:id" # Invalidate specific course details too print(f"Invalidating cache patterns: {pattern_all}, {pattern_instructor}, and {pattern_course_details}") invalidate_cache_pattern(pattern_all) invalidate_cache_pattern(pattern_course_details) invalidate_cache([pattern_instructor]) return new_course except Exception as e: if 'conn' in locals(): conn.rollback() print(f"Database error: {str(e)}") raise HTTPException(status_code=500, detail=f"Database error: {str(e)}") except HTTPException as he: raise he except Exception as e: print(f"Error creating course: {str(e)}") raise HTTPException(status_code=500, detail=str(e)) finally: if 'conn' in locals(): conn.close() # Enroll in a course @router.post("/courses/{course_id}/enroll") async def enroll_in_course( course_id: int, request: Request, auth_token: str = Cookie(None) ): try: # Get token from header if not in cookie if not auth_token: auth_header = request.headers.get('Authorization') if auth_header and auth_header.startswith('Bearer '): auth_token = auth_header.split(' ')[1] else: raise HTTPException(status_code=401, detail="No authentication token provided") # Verify token and get user data try: user_data = decode_token(auth_token) learner_id = user_data.get('user_id') if not learner_id: # Fallback for old tokens without user_id - do database lookup conn = connect_db() with conn.cursor(pymysql.cursors.DictCursor) as cursor: cursor.execute(""" SELECT LearnerID FROM Learners WHERE AccountName = %s """, (user_data['username'],)) learner = cursor.fetchone() if not learner: raise HTTPException(status_code=404, detail="Learner not found") learner_id = learner['LearnerID'] else: # Use user_id from token conn = connect_db() except Exception as e: print(f"Token/user verification error: {str(e)}") raise HTTPException(status_code=401, detail="Invalid authentication token or user not found") # Check if the course exists with conn.cursor(pymysql.cursors.DictCursor) as cursor: cursor.execute(""" SELECT CourseID FROM Courses WHERE CourseID = %s """, (course_id,)) course = cursor.fetchone() if not course: raise HTTPException(status_code=404, detail="Course not found") # Check if already enrolled cursor.execute(""" SELECT EnrollmentID FROM Enrollments WHERE LearnerID = %s AND CourseID = %s """, (learner_id, course_id)) existing_enrollment = cursor.fetchone() if existing_enrollment: return {"message": "Already enrolled in this course"} # Get the actual column names from the Enrollments table cursor.execute("DESCRIBE Enrollments") columns = cursor.fetchall() column_names = [col['Field'] for col in columns] print(f"Available columns in Enrollments table: {column_names}") # Enroll the learner in the course with the correct date column try: # Try different common column names for the enrollment date enroll_date = datetime.now().strftime('%Y-%m-%d %H:%M:%S') cursor.execute( "CALL sp_EnrollLearner(%s, %s, %s)", (learner_id, course_id, enroll_date) ) conn.commit() # Invalidate Valkey cache for this course and user-specific data redis_client = get_redis_client() # Clear specific course cache for all users pattern_course = f"courses:id:{course_id}:*" # Clear course preview cache for all users (this is the missing piece!) pattern_preview = f"course:preview:{course_id}:*" # Clear user-specific enrolled courses cache pattern_user_courses = f"learner:courses:{learner_id}" print(f"Invalidating cache patterns: {pattern_course}, {pattern_preview}, and {pattern_user_courses}") invalidate_cache_pattern(pattern_course) invalidate_cache_pattern(pattern_preview) invalidate_cache([pattern_user_courses]) return {"message": "Successfully enrolled in the course"} except Exception as e: conn.rollback() print(f"Error enrolling in course: {str(e)}") raise HTTPException(status_code=500, detail=f"Failed to enroll in course: {str(e)}") except HTTPException as he: raise he except Exception as e: print(f"Error enrolling in course: {str(e)}") raise HTTPException(status_code=500, detail=f"Error enrolling in course: {str(e)}") finally: if 'conn' in locals(): conn.close() # Get enrolled courses for the current learner @router.get("/learner/courses", response_model=List[Course]) async def get_enrolled_courses( request: Request, auth_token: str = Cookie(None) ): try: # Get token from header if not in cookie if not auth_token: auth_header = request.headers.get('Authorization') if auth_header and auth_header.startswith('Bearer '): auth_token = auth_header.split(' ')[1] else: raise HTTPException(status_code=401, detail="No authentication token provided") # Verify token and get user data try: user_data = decode_token(auth_token) learner_id = user_data.get('user_id') if not learner_id: # Fallback for old tokens without user_id - do database lookup conn = connect_db() with conn.cursor(pymysql.cursors.DictCursor) as cursor: cursor.execute(""" SELECT LearnerID FROM Learners WHERE AccountName = %s """, (user_data['username'],)) learner = cursor.fetchone() if not learner: raise HTTPException(status_code=404, detail="Learner not found") learner_id = learner['LearnerID'] else: # Use user_id from token conn = connect_db() except Exception as e: print(f"Token/user verification error: {str(e)}") raise HTTPException(status_code=401, detail="Invalid authentication token or user not found") # Get enrolled courses courses = [] with conn.cursor(pymysql.cursors.DictCursor) as cursor: query = """ SELECT c.CourseID as id, c.CourseName as name, CONCAT(i.InstructorName, ' (', i.AccountName, ')') as instructor, c.Descriptions as description FROM Courses c JOIN Instructors i ON c.InstructorID = i.InstructorID JOIN Enrollments e ON c.CourseID = e.CourseID WHERE e.LearnerID = %s """ cursor.execute(query, (learner_id,)) courses = cursor.fetchall() # Get ratings and enrollment count for each course for course in courses: cursor.execute(""" SELECT AVG(Rating) as avg_rating, COUNT(*) as count FROM Enrollments WHERE CourseID = %s AND Rating IS NOT NULL """, (course['id'],)) rating_data = cursor.fetchone() if rating_data and rating_data['avg_rating']: course['rating'] = float(rating_data['avg_rating']) else: course['rating'] = None # Get enrollment count cursor.execute(""" SELECT COUNT(*) as enrolled FROM Enrollments WHERE CourseID = %s """, (course['id'],)) enrolled_data = cursor.fetchone() if enrolled_data: course['enrolled'] = enrolled_data['enrolled'] else: course['enrolled'] = 0 return courses except HTTPException as he: raise he except Exception as e: print(f"Error fetching enrolled courses: {str(e)}") raise HTTPException(status_code=500, detail=f"Error fetching enrolled courses: {str(e)}") finally: if 'conn' in locals(): conn.close() # Get user profile @router.get("/user/profile") async def get_user_profile( request: Request, auth_token: str = Cookie(None) ): try: # Get token from header if not in cookie if not auth_token: auth_header = request.headers.get('Authorization') if auth_header and auth_header.startswith('Bearer '): auth_token = auth_header.split(' ')[1] else: raise HTTPException(status_code=401, detail="No authentication token provided") # Verify token and get user data try: user_data = decode_token(auth_token) username = user_data['username'] role = user_data['role'] # Create a unique cache key for this user's profile cache_key = f"user:profile:{username}:{role}" # Define the database fetch function async def fetch_profile_from_db(): # Connect to database conn = connect_db() try: # Get user information based on role with conn.cursor(pymysql.cursors.DictCursor) as cursor: if role == "Learner": cursor.execute(""" SELECT LearnerName as name, Email as email, PhoneNumber as phoneNumber FROM Learners WHERE AccountName = %s """, (username,)) user_info = cursor.fetchone() if not user_info: raise HTTPException(status_code=404, detail="Learner not found") elif role == "Instructor": cursor.execute(""" SELECT InstructorName as name, Email as email, Expertise as expertise FROM Instructors WHERE AccountName = %s """, (username,)) user_info = cursor.fetchone() if not user_info: raise HTTPException(status_code=404, detail="Instructor not found") else: raise HTTPException(status_code=403, detail="Invalid user role") # Add role and username to the response user_info['role'] = role user_info['username'] = username return user_info finally: if 'conn' in locals(): conn.close() # Use the cached data helper to implement the Valkey caching pattern return await get_cached_data(cache_key, fetch_profile_from_db, ttl=1800) # 30 minutes TTL except Exception as e: print(f"Token/user verification error: {str(e)}") raise HTTPException(status_code=401, detail="Invalid authentication token or user not found") except HTTPException as he: raise he except Exception as e: print(f"Error fetching user profile: {str(e)}") raise HTTPException(status_code=500, detail=f"Error fetching user profile: {str(e)}") finally: pass # Connection is closed in the fetch function # Update user profile @router.put("/user/profile") async def update_user_profile( request: Request, auth_token: str = Cookie(None) ): try: # Get token from header if not in cookie if not auth_token: auth_header = request.headers.get('Authorization') if auth_header and auth_header.startswith('Bearer '): auth_token = auth_header.split(' ')[1] else: raise HTTPException(status_code=401, detail="No authentication token provided") # Verify token and get user data try: user_data = decode_token(auth_token) username = user_data['username'] role = user_data['role'] # Get request body profile_data = await request.json() # Connect to database conn = connect_db() # Update user information based on role with conn.cursor(pymysql.cursors.DictCursor) as cursor: if role == "Learner": # Prepare update fields update_fields = [] params = [] if 'name' in profile_data: update_fields.append("LearnerName = %s") params.append(profile_data['name']) if 'email' in profile_data: update_fields.append("Email = %s") params.append(profile_data['email']) if 'phoneNumber' in profile_data: update_fields.append("PhoneNumber = %s") params.append(profile_data['phoneNumber']) if not update_fields: return {"message": "No fields to update"} # Add username to params params.append(username) # Construct and execute SQL sql = f""" UPDATE Learners SET {', '.join(update_fields)} WHERE AccountName = %s """ cursor.execute(sql, params) elif role == "Instructor": # Prepare update fields update_fields = [] params = [] if 'name' in profile_data: update_fields.append("InstructorName = %s") params.append(profile_data['name']) if 'email' in profile_data: update_fields.append("Email = %s") params.append(profile_data['email']) if 'expertise' in profile_data: update_fields.append("Expertise = %s") params.append(profile_data['expertise']) if not update_fields: return {"message": "No fields to update"} # Add username to params params.append(username) # Construct and execute SQL sql = f""" UPDATE Instructors SET {', '.join(update_fields)} WHERE AccountName = %s """ cursor.execute(sql, params) else: raise HTTPException(status_code=403, detail="Invalid user role") conn.commit() return {"message": "Profile updated successfully"} except Exception as e: print(f"Token/user verification error: {str(e)}") raise HTTPException(status_code=401, detail="Invalid authentication token or user not found") except HTTPException as he: raise he except Exception as e: print(f"Error updating user profile: {str(e)}") raise HTTPException(status_code=500, detail=f"Error updating user profile: {str(e)}") finally: if 'conn' in locals(): conn.close() # Get dashboard data for the current user @router.get("/learner/dashboard") async def get_learner_dashboard( request: Request, auth_token: str = Cookie(None) ): try: # Get token from header if not in cookie if not auth_token: auth_header = request.headers.get('Authorization') if auth_header and auth_header.startswith('Bearer '): auth_token = auth_header.split(' ')[1] else: raise HTTPException(status_code=401, detail="No authentication token provided") # Verify token and get user data try: user_data = decode_token(auth_token) # Get LearnerID from Learners table using the username conn = connect_db() with conn.cursor(pymysql.cursors.DictCursor) as cursor: cursor.execute(""" SELECT LearnerID, LearnerName FROM Learners WHERE AccountName = %s """, (user_data['username'],)) learner = cursor.fetchone() if not learner: raise HTTPException(status_code=404, detail="Learner not found") learner_id = learner['LearnerID'] learner_name = learner['LearnerName'] except Exception as e: print(f"Token/user verification error: {str(e)}") raise HTTPException(status_code=401, detail="Invalid authentication token or user not found") # Calculate dashboard metrics dashboard_data = { "learnerName": learner_name, "enrolled": 0, "completed": 0, "completionRate": "0%", "lecturesPassed": 0, "statistics": { "lecturesPassed": [], "averageScores": [] }, "enrolledCourses": [] } with conn.cursor(pymysql.cursors.DictCursor) as cursor: # Get enrollment count cursor.execute("SELECT COUNT(*) as count FROM Enrollments WHERE LearnerID = %s", (learner_id,)) enrolled_data = cursor.fetchone() dashboard_data["enrolled"] = enrolled_data['count'] if enrolled_data else 0 # Get completed courses count cursor.execute( "SELECT COUNT(*) as count FROM Enrollments WHERE LearnerID = %s AND Percentage = 100", (learner_id,) ) completed_data = cursor.fetchone() completed = completed_data['count'] if completed_data else 0 dashboard_data["completed"] = completed # Calculate completion rate if dashboard_data["enrolled"] > 0: rate = (completed / dashboard_data["enrolled"]) * 100 dashboard_data["completionRate"] = f"{rate:.1f}%" # Get passed lectures count cursor.execute( "SELECT COUNT(*) as count FROM LectureResults WHERE LearnerID = %s AND State = 'passed'", (learner_id,) ) passed_data = cursor.fetchone() dashboard_data["lecturesPassed"] = passed_data['count'] if passed_data else 0 # Get statistics data - passed lectures over time # Fix the learner dashboard query around line 1270 cursor.execute(""" SELECT Date, Score, DATE_FORMAT(Date, '%%Y-%%m-%%d') as formatted_date FROM LectureResults WHERE LearnerID = %s AND State = 'passed' ORDER BY Date """, (learner_id,)) stats_data = cursor.fetchall() date_groups = {} score_groups = {} for row in stats_data: date_str = row['formatted_date'] if date_str not in date_groups: date_groups[date_str] = 0 date_groups[date_str] += 1 if date_str not in score_groups: score_groups[date_str] = {"total": 0, "count": 0} score_groups[date_str]["total"] += row['Score'] score_groups[date_str]["count"] += 1 # Format the statistics data for date_str in date_groups: dashboard_data["statistics"]["lecturesPassed"].append({ "date": date_str, "count": date_groups[date_str] }) avg_score = score_groups[date_str]["total"] / score_groups[date_str]["count"] dashboard_data["statistics"]["averageScores"].append({ "date": date_str, "score": round(avg_score, 2) }) # Get enrolled courses with percentage cursor.execute(""" SELECT c.CourseID as id, c.CourseName as name, CONCAT(i.InstructorName, ' (', i.AccountName, ')') as instructor, c.Descriptions as description, e.Percentage as percentage FROM Courses c JOIN Instructors i ON c.InstructorID = i.InstructorID JOIN Enrollments e ON c.CourseID = e.CourseID WHERE e.LearnerID = %s """, (learner_id,)) courses = cursor.fetchall() dashboard_data["enrolledCourses"] = courses return dashboard_data except HTTPException as he: raise he except Exception as e: print(f"Error fetching dashboard data: {str(e)}") raise HTTPException(status_code=500, detail=f"Error fetching dashboard data: {str(e)}") finally: if 'conn' in locals(): conn.close() # Get dashboard data for instructor @router.get("/instructor/dashboard") async def get_instructor_dashboard( request: Request, course_id: Optional[int] = None, auth_token: str = Cookie(None) ): conn = None try: # 1) Auth token via cookie or header if not auth_token: auth_header = request.headers.get("Authorization") if auth_header and auth_header.startswith("Bearer "): auth_token = auth_header.split(" ", 1)[1] else: raise HTTPException(status_code=401, detail="No authentication token provided") # 2) Decode and verify user_data = decode_token(auth_token) username = user_data.get("username") role = user_data.get("role") instructor_id = user_data.get("user_id") if role != "Instructor": raise HTTPException(status_code=403, detail="Only instructors can access this endpoint") if not username: raise HTTPException(status_code=401, detail="Invalid token payload") # 3) DB connection conn = connect_db() with conn.cursor(pymysql.cursors.DictCursor) as cursor: # Fallback for old tokens without user_id if not instructor_id: cursor.execute( "SELECT InstructorID FROM Instructors WHERE AccountName = %s", (username,) ) row = cursor.fetchone() if not row: raise HTTPException(status_code=404, detail="Instructor not found") instructor_id = row["InstructorID"] # --- General metrics --- cursor.execute(""" SELECT COUNT(*) AS total_courses FROM Courses WHERE InstructorID = %s """, (instructor_id,)) total_courses = cursor.fetchone()["total_courses"] cursor.execute(""" SELECT COUNT(DISTINCT e.LearnerID) AS total_students, COALESCE(AVG(e.Rating), 0) AS average_rating, COUNT(*) AS total_enrollments, SUM(CASE WHEN e.Percentage = 100 THEN 1 ELSE 0 END) AS completed_enrollments FROM Courses c LEFT JOIN Enrollments e ON c.CourseID = e.CourseID WHERE c.InstructorID = %s """, (instructor_id,)) stats = cursor.fetchone() or {} completion_rate = ( round( stats.get("completed_enrollments", 0) / stats.get("total_enrollments", 1) * 100, 1 ) if stats.get("total_enrollments") else 0.0 ) # --- Student growth (last 2 months) --- cursor.execute(""" SELECT DATE_FORMAT(EnrollmentDate, '%%Y-%%m') AS month, COUNT(DISTINCT LearnerID) AS students FROM Courses c JOIN Enrollments e ON c.CourseID = e.CourseID WHERE c.InstructorID = %s AND EnrollmentDate >= DATE_SUB(CURRENT_DATE, INTERVAL 2 MONTH) GROUP BY month ORDER BY month DESC LIMIT 2 """, (instructor_id,)) growth = cursor.fetchall() current = growth[0]["students"] if len(growth) > 0 else 0 previous = growth[1]["students"] if len(growth) > 1 else 0 student_growth = ( round((current - previous) / previous * 100, 1) if previous else 0.0 ) # --- Course list summary --- cursor.execute(""" SELECT c.CourseID AS id, c.CourseName AS name, c.Descriptions AS description, c.AverageRating AS rating, COUNT(DISTINCT e.LearnerID) AS enrollments, AVG(e.Percentage) AS completionRate FROM Courses c LEFT JOIN Enrollments e ON c.CourseID = e.CourseID WHERE c.InstructorID = %s GROUP BY c.CourseID ORDER BY c.CreatedAt DESC """, (instructor_id,)) raw_courses = cursor.fetchall() formatted_courses = [ { "id": c["id"], "name": c["name"], "description": c["description"] or "", "enrollments": c["enrollments"] or 0, "rating": round(float(c["rating"]), 1) if c["rating"] else 0.0, "completionRate": round(float(c["completionRate"]), 1) if c["completionRate"] else 0.0 } for c in raw_courses ] # --- Enrollment trends (last 30 days) --- cursor.execute(""" SELECT DATE_FORMAT(e.EnrollmentDate, '%%Y-%%m-%%d') AS date, COUNT(*) AS value FROM Courses c JOIN Enrollments e ON c.CourseID = e.CourseID WHERE c.InstructorID = %s AND e.EnrollmentDate >= DATE_SUB(CURRENT_DATE, INTERVAL 30 DAY) GROUP BY date ORDER BY date """, (instructor_id,)) enroll_trends = cursor.fetchall() # --- Rating trends (last 30 days) --- cursor.execute(""" SELECT DATE_FORMAT(e.EnrollmentDate, '%%Y-%%m-%%d') AS date, AVG(e.Rating) AS value FROM Courses c JOIN Enrollments e ON c.CourseID = e.CourseID WHERE c.InstructorID = %s AND e.Rating IS NOT NULL AND e.EnrollmentDate >= DATE_SUB(CURRENT_DATE, INTERVAL 30 DAY) GROUP BY date ORDER BY date """, (instructor_id,)) rating_trends = cursor.fetchall() # --- Build base payload --- dashboard_data = { "metrics": { "totalCourses": total_courses, "totalStudents": stats.get("total_students", 0), "averageRating": round(stats.get("average_rating", 0), 1), "completionRate": completion_rate, "studentGrowth": student_growth }, "courses": formatted_courses, "enrollmentTrends": [ {"date": r["date"], "value": r["value"]} for r in enroll_trends ], "ratingTrends": [ {"date": r["date"], "value": round(r["value"], 1)} for r in rating_trends if r["value"] is not None ], "courseEnrollments": [ {"courseName": c["name"], "enrollments": c["enrollments"]} for c in formatted_courses ], "courseAnalytics": {} } # --- Detailed courseAnalytics if course_id given --- if course_id: # Ownership check cursor.execute(""" SELECT CourseID, CourseName FROM Courses WHERE CourseID = %s AND InstructorID = %s """, (course_id, instructor_id)) course_row = cursor.fetchone() if not course_row: raise HTTPException(status_code=404, detail="Course not found or not owned") course_name = course_row["CourseName"] # Basic course metrics cursor.execute(""" SELECT COUNT(DISTINCT e.LearnerID) AS total_enrollments, COALESCE(AVG(e.Rating), 0) AS average_rating, SUM(CASE WHEN e.Percentage = 100 THEN 1 ELSE 0 END) AS completed_enrollments, COUNT(*) AS all_with_progress FROM Enrollments e WHERE e.CourseID = %s """, (course_id,)) cm = cursor.fetchone() or {} total_enr = cm.get("total_enrollments", 0) total_with = cm.get("all_with_progress", 0) comp_rate = ( round(cm.get("completed_enrollments", 0) / total_with * 100, 1) if total_with else 0.0 ) # Enroll/Ratings trends (60 days) cursor.execute(""" SELECT DATE_FORMAT(EnrollmentDate, '%%Y-%%m-%%d') AS date, COUNT(*) AS value FROM Enrollments WHERE CourseID = %s AND EnrollmentDate >= DATE_SUB(CURRENT_DATE, INTERVAL 60 DAY) GROUP BY date ORDER BY date """, (course_id,)) ce_trends = cursor.fetchall() cursor.execute(""" SELECT DATE_FORMAT(EnrollmentDate, '%%Y-%%m-%%d') AS date, AVG(Rating) AS value FROM Enrollments WHERE CourseID = %s AND Rating IS NOT NULL AND EnrollmentDate >= DATE_SUB(CURRENT_DATE, INTERVAL 60 DAY) GROUP BY date ORDER BY date """, (course_id,)) cr_trends = cursor.fetchall() # Completion via LectureResults (30 days) cursor.execute(""" SELECT DATE_FORMAT(lr.Date, '%%Y-%%m-%%d') AS date, COUNT(DISTINCT CASE WHEN e.Percentage = 100 THEN e.LearnerID END) AS completed, COUNT(DISTINCT lr.LearnerID) AS total FROM LectureResults lr JOIN Enrollments e ON lr.LearnerID = e.LearnerID AND lr.CourseID = e.CourseID WHERE lr.CourseID = %s AND lr.Date >= DATE_SUB(CURRENT_DATE, INTERVAL 30 DAY) GROUP BY date ORDER BY date """, (course_id,)) comp_rows = cursor.fetchall() completion_trends = [ { "date": r["date"], "value": round(r["completed"] / r["total"] * 100, 1) if r["total"] else 0.0 } for r in comp_rows ] # Lecture-level analytics cursor.execute(""" SELECT l.LectureID AS lectureId, l.Title AS lecture_title, COUNT(DISTINCT lr.LearnerID) AS total_attempts, SUM(lr.State = 'passed') AS passed_count, COALESCE(AVG(lr.Score), 0) AS average_score FROM Lectures l LEFT JOIN LectureResults lr ON l.LectureID = lr.LectureID WHERE l.CourseID = %s GROUP BY l.LectureID, l.Title ORDER BY l.LectureID """, (course_id,)) lects = cursor.fetchall() lecture_analytics = [ { "lectureId": l["lectureId"], "title": l["lecture_title"], "totalAttempts": l["total_attempts"], "passedCount": l["passed_count"], "passRate": round(l["passed_count"] / l["total_attempts"] * 100, 1) if l["total_attempts"] else 0.0, "averageScore": round(float(l["average_score"]), 1) } for l in lects ] # Student progress distribution cursor.execute(""" SELECT CASE WHEN Percentage = 0 THEN 'Not Started' WHEN Percentage < 25 THEN '0-25%%' WHEN Percentage < 50 THEN '25-50%%' WHEN Percentage < 75 THEN '50-75%%' WHEN Percentage < 100 THEN '75-99%%' ELSE 'Completed' END AS progress_range, COUNT(*) AS student_count FROM Enrollments WHERE CourseID = %s GROUP BY progress_range ORDER BY FIELD(progress_range, 'Not Started','0-25%%','25-50%%', '50-75%%','75-99%%','Completed') """, (course_id,)) pd = cursor.fetchall() progress = [ {"range": p["progress_range"], "count": p["student_count"]} for p in pd ] dashboard_data["courseAnalytics"] = { "courseId": course_id, "courseName": course_name, "totalEnrollments": total_enr, "averageRating": round(float(cm.get("average_rating", 0)), 1), "completionRate": comp_rate, "enrollmentTrends": [ {"date": r["date"], "value": int(r["value"])} for r in ce_trends ], "ratingTrends": [ {"date": r["date"], "value": round(float(r["value"]), 1)} for r in cr_trends if r["value"] is not None ], "completionTrends": completion_trends, "lectureAnalytics": lecture_analytics, "studentProgress": progress } # 4) Return final payload return dashboard_data except HTTPException: raise except Exception as e: print(f"[ERROR] get_instructor_dashboard: {e}") raise HTTPException(status_code=500, detail="Internal server error") finally: if conn: conn.close() # Quiz submission model class QuizSubmission(BaseModel): answers: dict[int, str] # questionId -> selected answer text @router.post("/lectures/{lecture_id}/quiz/submit") async def submit_quiz_answers( lecture_id: int, submission: QuizSubmission, request: Request, auth_token: str = Cookie(None) ): try: # Get token from header if not in cookie if not auth_token: auth_header = request.headers.get('Authorization') if auth_header and auth_header.startswith('Bearer '): auth_token = auth_header.split(' ')[1] else: raise HTTPException(status_code=401, detail="No authentication token provided") # Verify token and get user data try: user_data = decode_token(auth_token) # Get LearnerID from Learners table using the username conn = connect_db() with conn.cursor(pymysql.cursors.DictCursor) as cursor: cursor.execute(""" SELECT LearnerID FROM Learners WHERE AccountName = %s """, (user_data['username'],)) learner = cursor.fetchone() if not learner: raise HTTPException(status_code=404, detail="Learner not found") learner_id = learner['LearnerID'] except Exception as e: print(f"Token/user verification error: {str(e)}") raise HTTPException(status_code=401, detail="Invalid authentication token or user not found") try: with conn.cursor(pymysql.cursors.DictCursor) as cursor: # First verify the quiz exists for this lecture cursor.execute(""" SELECT QuizID FROM Quizzes WHERE LectureID = %s """, (lecture_id,)) quiz = cursor.fetchone() if not quiz: raise HTTPException(status_code=404, detail="Quiz not found for this lecture") quiz_id = quiz['QuizID'] # Get correct answers for validation cursor.execute(""" SELECT q.QuestionID, o.OptionText FROM Questions q JOIN Options o ON q.QuestionID = o.QuestionID WHERE q.QuizID = %s AND o.IsCorrect = 1 """, (quiz_id,)) correct_answers = {row['QuestionID']: row['OptionText'] for row in cursor.fetchall()} # Calculate score total_questions = len(correct_answers) if total_questions == 0: raise HTTPException(status_code=500, detail="No questions found for this quiz") correct_count = sum( 1 for q_id, answer in submission.answers.items() if str(q_id) in map(str, correct_answers.keys()) and answer == correct_answers[int(q_id)] ) score = (correct_count / total_questions) * 100 # Get the CourseID for this lecture cursor.execute(""" SELECT CourseID FROM Lectures WHERE LectureID = %s """, (lecture_id,)) lecture_data = cursor.fetchone() if not lecture_data: raise HTTPException(status_code=404, detail="Lecture not found") course_id = lecture_data['CourseID'] # Save or update the score using direct SQL instead of stored procedure try: # Use stored procedure to update or insert the lecture result cursor.execute( "CALL sp_update_lecture_result(%s, %s, %s, %s)", (learner_id, course_id, lecture_id, score) ) # Update course completion percentage try: # Get total lectures in the course cursor.execute(""" SELECT COUNT(*) as total_lectures FROM Lectures WHERE CourseID = %s """, (course_id,)) total_lectures = cursor.fetchone()['total_lectures'] # Get passed lectures cursor.execute(""" SELECT COUNT(*) as passed_lectures FROM LectureResults WHERE LearnerID = %s AND CourseID = %s AND State = 'passed' """, (learner_id, course_id)) passed_lectures = cursor.fetchone()['passed_lectures'] # Calculate percentage if total_lectures > 0: percentage_raw = (passed_lectures * 100.0) / total_lectures # Convert to percentage scale if percentage_raw < 10: percentage = 0 elif percentage_raw < 30: percentage = 20 elif percentage_raw < 50: percentage = 40 elif percentage_raw < 70: percentage = 60 elif percentage_raw < 90: percentage = 80 else: percentage = 100 # Update enrollment record cursor.execute(""" UPDATE Enrollments SET Percentage = %s WHERE LearnerID = %s AND CourseID = %s """, (percentage, learner_id, course_id)) except Exception as e: print(f"Error updating course percentage: {str(e)}") # Continue even if percentage update fails conn.commit() print(f"Score updated successfully for learner {learner_id}, lecture {lecture_id}") except Exception as e: print(f"Error saving quiz score: {str(e)}") conn.rollback() raise HTTPException(status_code=500, detail=f"Failed to save quiz score: {str(e)}") return { "score": score, "total_questions": total_questions, "correct_answers": correct_count } finally: conn.close() except HTTPException as he: raise he except Exception as e: print(f"Error submitting quiz: {str(e)}") raise HTTPException(status_code=500, detail=f"Error submitting quiz: {str(e)}") @router.get("/lectures/{lecture_id}/quiz/results") async def get_quiz_results( lecture_id: int, request: Request, auth_token: str = Cookie(None) ): try: # Verify auth token if not auth_token: auth_header = request.headers.get('Authorization') if auth_header and auth_header.startswith('Bearer '): auth_token = auth_header.split(' ')[1] if not auth_token: raise HTTPException(status_code=401, detail="No authentication token provided") try: user_data = decode_token(auth_token) user_id = user_data.get("id") # Create a cache key based on user ID and lecture ID cache_key = f"quiz:results:lecture:{lecture_id}:learner:{user_id}" # Define the database fetch function async def fetch_quiz_results_from_db(): conn = connect_db() try: with conn.cursor(pymysql.cursors.DictCursor) as cursor: cursor.execute(""" SELECT Score, State, Date FROM LectureResults WHERE LearnerID = %s AND LectureID = %s ORDER BY Date DESC LIMIT 1 """, (user_id, lecture_id)) result = cursor.fetchone() if not result: return None return { "score": float(result["Score"]), "status": result["State"], "date": result["Date"].isoformat() } finally: conn.close() # Use the caching mechanism to get the data # Short TTL since quiz results may change frequently return await get_cached_data( cache_key, fetch_quiz_results_from_db, ttl=300 # Cache for 5 minutes ) except Exception as e: print(f"Token decode error: {str(e)}") raise HTTPException(status_code=401, detail="Invalid authentication token") except HTTPException: raise except Exception as e: print(f"Error in get_quiz_results: {str(e)}") raise HTTPException(status_code=500, detail=str(e)) # Get instructor course details @router.get("/instructor/courses/{course_id}", response_model=Course) async def get_instructor_course_details( request: Request, course_id: int, auth_token: str = Cookie(None) ): try: # Get token from header if not in cookie if not auth_token: auth_header = request.headers.get('Authorization') if auth_header and auth_header.startswith('Bearer '): auth_token = auth_header.split(' ')[1] else: raise HTTPException(status_code=401, detail="No authentication token provided") # Verify token and get user data try: user_data = decode_token(auth_token) username = user_data['username'] role = user_data['role'] instructor_id = user_data.get('user_id') # Verify user is an instructor if role != "Instructor": raise HTTPException(status_code=403, detail="Only instructors can access this endpoint") # Connect to database conn = connect_db() with conn.cursor(pymysql.cursors.DictCursor) as cursor: # Get instructor ID from token or fallback to database lookup if not instructor_id: # Fallback for old tokens without user_id cursor.execute(""" SELECT InstructorID FROM Instructors WHERE AccountName = %s """, (username,)) instructor = cursor.fetchone() if not instructor: raise HTTPException(status_code=404, detail="Instructor not found") instructor_id = instructor['InstructorID'] # Get course details, ensuring it belongs to this instructor query = """ SELECT c.CourseID as id, c.CourseName as name, CONCAT(i.InstructorName, ' (', i.AccountName, ')') as instructor, c.Descriptions as description, (SELECT COUNT(*) FROM Enrollments WHERE CourseID = c.CourseID) as enrolled, COALESCE( (SELECT AVG(Rating) FROM Enrollments WHERE CourseID = c.CourseID AND Rating IS NOT NULL), 0 ) as rating, c.Skills as skills, c.Difficulty as difficulty, c.EstimatedDuration as duration FROM Courses c JOIN Instructors i ON c.InstructorID = i.InstructorID WHERE c.CourseID = %s AND c.InstructorID = %s """ cursor.execute(query, (course_id, instructor_id)) course = cursor.fetchone() if not course: raise HTTPException(status_code=404, detail=f"Course with ID {course_id} not found or not owned by this instructor") # Format the course data if course['rating']: course['rating'] = float(course['rating']) # Convert skills from JSON string if needed if isinstance(course.get('skills'), str): try: course['skills'] = json.loads(course['skills']) except: course['skills'] = [] return course except Exception as e: print(f"Database error: {str(e)}") raise HTTPException(status_code=500, detail=f"Database error: {str(e)}") except HTTPException as he: raise he except Exception as e: print(f"Error fetching instructor course details: {str(e)}") raise HTTPException(status_code=500, detail=str(e)) finally: if 'conn' in locals(): conn.close() # CreateLecture model and create_lecture endpoint to handle lecture creation with video upload and quiz class CreateLecture(BaseModel): title: str description: str content: str quiz: Optional[dict] = None @router.post("/courses/{course_id}/lectures") async def create_lecture( request: Request, course_id: int, auth_token: str = Cookie(None), title: str = Form(...), description: str = Form(...), content: str = Form(...), video: Optional[UploadFile] = File(None), quiz: Optional[str] = Form(None) ): try: # Get token from header if not in cookie if not auth_token: auth_header = request.headers.get('Authorization') if auth_header and auth_header.startswith('Bearer '): auth_token = auth_header.split(' ')[1] else: raise HTTPException(status_code=401, detail="No authentication token provided") # Verify token and get user data try: user_data = decode_token(auth_token) username = user_data['username'] role = user_data['role'] # Verify user is an instructor if role != "Instructor": raise HTTPException(status_code=403, detail="Only instructors can access this endpoint") # Connect to database conn = connect_db() cursor = conn.cursor(pymysql.cursors.DictCursor) try: # Get instructor ID from token or fallback to database lookup instructor_id = user_data.get('user_id') if not instructor_id: # Fallback for old tokens without user_id cursor.execute(""" SELECT InstructorID FROM Instructors WHERE AccountName = %s """, (username,)) instructor = cursor.fetchone() if not instructor: raise HTTPException(status_code=404, detail="Instructor not found") instructor_id = instructor['InstructorID'] # Verify this instructor owns this course cursor.execute(""" SELECT CourseID FROM Courses WHERE CourseID = %s AND InstructorID = %s """, (course_id, instructor_id)) if not cursor.fetchone(): raise HTTPException(status_code=403, detail="Not authorized to modify this course") # Create the lecture cursor.execute(""" INSERT INTO Lectures (CourseID, Title, Description, Content) VALUES (%s, %s, %s, %s) """, (course_id, title, description, content)) # Get the newly created lecture ID lecture_id = cursor.lastrowid # Process quiz data first (faster database operations) quiz_id = None if quiz: quiz_data = json.loads(quiz) if quiz_data and quiz_data.get('questions'): # Insert quiz cursor.execute(""" INSERT INTO Quizzes (LectureID, Title, Description) VALUES (%s, %s, %s) """, (lecture_id, f"Quiz for {title}", description)) quiz_id = cursor.lastrowid # Batch insert questions and options for better performance questions_to_insert = [] options_to_insert = [] for question in quiz_data['questions']: # Insert question first to get the ID cursor.execute(""" INSERT INTO Questions (QuizID, QuestionText) VALUES (%s, %s) """, (quiz_id, question['question'])) question_id = cursor.lastrowid # Prepare batch options for this question for i, option in enumerate(question['options']): options_to_insert.append(( question_id, option, i == question['correctAnswer'] )) # Batch insert all options at once if options_to_insert: cursor.executemany(""" INSERT INTO Options (QuestionID, OptionText, IsCorrect) VALUES (%s, %s, %s) """, options_to_insert) # Commit all database changes at once conn.commit() # Prepare response response = { "id": lecture_id, "title": title, "message": "Lecture created successfully" } # Note: Video upload is now handled separately through upload_endpoints.py # This separates concerns and allows for better error handling and chunked uploads if video: response["note"] = "Lecture created successfully. Please use the dedicated upload endpoints for video upload." # Invalidate cache patterns in background (non-blocking) def background_cache_invalidation(): try: # Clear course-specific caches invalidate_cache_pattern(f"courses:id:{course_id}:*") invalidate_cache_pattern(f"instructor:courses:*") # Clear lecture cache if it exists invalidate_cache_pattern(f"lectures:id:*") print(f"Cache invalidation completed for course {course_id}") except Exception as cache_error: print(f"Background cache invalidation failed: {str(cache_error)}") # Start background cache invalidation cache_thread = threading.Thread(target=background_cache_invalidation) cache_thread.daemon = True cache_thread.start() return response except HTTPException: raise except Exception as e: print(f"Error in create_lecture: {str(e)}") raise HTTPException(status_code=500, detail=f"Failed to create lecture: {str(e)}") except Exception as e: print(f"Token/user verification error: {str(e)}") raise HTTPException(status_code=401, detail="Invalid authentication token or user not found") except HTTPException: raise except Exception as e: print(f"Error in create_lecture: {str(e)}") raise HTTPException(status_code=500, detail=str(e)) finally: if 'conn' in locals(): conn.close() # Optimized preview endpoint for CoursePreview.js - combines course + lectures @router.get("/courses/{course_id}/preview") async def get_course_preview_data(course_id: int, request: Request, auth_token: str = Cookie(None)): try: # Authentication if not auth_token: auth_header = request.headers.get('Authorization') if auth_header and auth_header.startswith('Bearer '): auth_token = auth_header.split(' ')[1] if not auth_token: raise HTTPException(status_code=401, detail="No authentication token provided") try: user_data = decode_token(auth_token) user_id = user_data.get('user_id') except Exception as e: print(f"Token decode error: {str(e)}") raise HTTPException(status_code=401, detail="Invalid authentication token") # Cache key for combined preview data cache_key = f"course:preview:{course_id}:user:{user_id}" # Fetch all data in one optimized query async def fetch_preview_data_from_db(): conn = connect_db() try: with conn.cursor(pymysql.cursors.DictCursor) as cursor: # Get course details with enrollment status course_query = """ SELECT c.CourseID as id, c.CourseName as name, c.Descriptions as description, c.EstimatedDuration as duration, c.Skills as skills, c.Difficulty as difficulty, CONCAT(i.InstructorName, ' (', i.AccountName, ')') as instructor, i.InstructorID as instructor_id, COALESCE(enrollment_stats.enrolled, 0) as enrolled, COALESCE(rating_stats.avg_rating, NULL) as rating, CASE WHEN user_enrollment.LearnerID IS NOT NULL THEN TRUE ELSE FALSE END as is_enrolled, user_enrollment.Rating as user_rating FROM Courses c JOIN Instructors i ON c.InstructorID = i.InstructorID LEFT JOIN ( SELECT CourseID, COUNT(*) as enrolled FROM Enrollments GROUP BY CourseID ) enrollment_stats ON c.CourseID = enrollment_stats.CourseID LEFT JOIN ( SELECT CourseID, AVG(Rating) as avg_rating FROM Enrollments WHERE Rating IS NOT NULL GROUP BY CourseID ) rating_stats ON c.CourseID = rating_stats.CourseID LEFT JOIN ( SELECT e2.CourseID, e2.LearnerID, e2.Rating FROM Enrollments e2 JOIN Learners l ON e2.LearnerID = l.LearnerID WHERE l.LearnerID = %s ) user_enrollment ON c.CourseID = user_enrollment.CourseID WHERE c.CourseID = %s """ cursor.execute(course_query, (user_id, course_id)) course = cursor.fetchone() if not course: raise HTTPException(status_code=404, detail="Course not found") # Get lectures for this course lectures_query = """ SELECT LectureID as id, Title as title, Description as description FROM Lectures WHERE CourseID = %s ORDER BY LectureID ASC """ cursor.execute(lectures_query, (course_id,)) lectures = cursor.fetchall() # Format skills if it's JSON skills = [] if course['skills']: try: skills = json.loads(course['skills']) except: skills = [] # Format the response return { 'course': { 'id': course['id'], 'name': course['name'], 'description': course['description'], 'duration': course['duration'], 'skills': skills, 'difficulty': course['difficulty'], 'instructor': course['instructor'], 'instructor_id': course['instructor_id'], 'enrolled': course['enrolled'], 'rating': float(course['rating']) if course['rating'] else None, 'is_enrolled': course['is_enrolled'], 'user_rating': int(course['user_rating']) if course['user_rating'] else None }, 'lectures': [ { 'id': lecture['id'], 'title': lecture['title'], 'description': lecture['description'] } for lecture in lectures ] } finally: conn.close() # Use caching with shorter TTL since it includes user-specific data return await get_cached_data( cache_key, fetch_preview_data_from_db, ttl=900, # 15 minutes for user-specific data use_compression=True ) except HTTPException: raise except Exception as e: print(f"Error in get_course_preview_data: {str(e)}") raise HTTPException(status_code=500, detail=str(e)) # Debug endpoint for instructor to list their courses - no caching, direct DB access @router.get("/instructor/debug/my-courses") async def debug_my_courses( request: Request, auth_token: str = Cookie(None) ): try: if not auth_token: auth_header = request.headers.get('Authorization') if auth_header and auth_header.startswith('Bearer '): auth_token = auth_header.split(' ')[1] else: raise HTTPException(status_code=401, detail="No authentication token provided") user_data = decode_token(auth_token) username = user_data['username'] role = user_data['role'] instructor_id = user_data.get('user_id') if role != "Instructor": raise HTTPException(status_code=403, detail="Only instructors can access this endpoint") conn = connect_db() with conn.cursor(pymysql.cursors.DictCursor) as cursor: if not instructor_id: cursor.execute(""" SELECT InstructorID FROM Instructors WHERE AccountName = %s """, (username,)) instructor = cursor.fetchone() if not instructor: raise HTTPException(status_code=404, detail="Instructor not found") instructor_id = instructor['InstructorID'] cursor.execute(""" SELECT CourseID, CourseName, InstructorID FROM Courses WHERE InstructorID = %s ORDER BY CourseID """, (instructor_id,)) courses = cursor.fetchall() return { "instructor_id": instructor_id, "username": username, "courses": courses, "course_count": len(courses) } except Exception as e: print(f"Debug error: {str(e)}") raise HTTPException(status_code=500, detail=str(e)) finally: if 'conn' in locals(): conn.close() # Get enrolled learners for a specific course (instructor only) @router.get("/instructor/courses/{course_id}/enrollments") async def get_course_enrollments( course_id: int, request: Request, auth_token: str = Cookie(None) ): try: # Get token from header if not in cookie if not auth_token: auth_header = request.headers.get('Authorization') if auth_header and auth_header.startswith('Bearer '): auth_token = auth_header.split(' ')[1] else: raise HTTPException(status_code=401, detail="No authentication token provided") # Verify token and get user data user_data = decode_token(auth_token) username = user_data['username'] role = user_data['role'] instructor_id = user_data.get('user_id') # Verify user is an instructor if role != "Instructor": raise HTTPException(status_code=403, detail="Only instructors can access this endpoint") conn = connect_db() with conn.cursor(pymysql.cursors.DictCursor) as cursor: # Get instructor ID if not in token if not instructor_id: cursor.execute(""" SELECT InstructorID FROM Instructors WHERE AccountName = %s """, (username,)) instructor = cursor.fetchone() if not instructor: raise HTTPException(status_code=404, detail="Instructor not found") instructor_id = instructor['InstructorID'] # Verify the course belongs to this instructor cursor.execute(""" SELECT CourseID, CourseName FROM Courses WHERE CourseID = %s AND InstructorID = %s """, (course_id, instructor_id)) course = cursor.fetchone() if not course: raise HTTPException(status_code=404, detail="Course not found or you don't have permission to access it") # Get enrolled learners with their enrollment details cursor.execute(""" SELECT l.LearnerID, l.LearnerName, l.Email, e.EnrollmentDate, COALESCE(e.Percentage, 0) as progress, COALESCE(e.Rating, 0) as rating FROM Enrollments e JOIN Learners l ON e.LearnerID = l.LearnerID WHERE e.CourseID = %s ORDER BY e.EnrollmentDate DESC """, (course_id,)) enrollments = cursor.fetchall() # Calculate completion rate total_enrollments = len(enrollments) completed_enrollments = sum(1 for e in enrollments if e['progress'] == 100) completion_rate = (completed_enrollments / total_enrollments * 100) if total_enrollments > 0 else 0 # Format the data for the frontend formatted_enrollments = [] for enrollment in enrollments: formatted_enrollments.append({ 'learner_id': enrollment['LearnerID'], 'learner_name': enrollment['LearnerName'], 'email': enrollment['Email'], 'enrollment_date': enrollment['EnrollmentDate'].strftime('%b %d, %Y') if enrollment['EnrollmentDate'] else 'N/A', 'progress': enrollment['progress'], 'rating': enrollment['rating'] }) return { 'course_id': course_id, 'course_name': course['CourseName'], 'total_enrollments': total_enrollments, 'completion_rate': round(completion_rate, 1), 'enrollments': formatted_enrollments } except HTTPException as he: raise he except Exception as e: print(f"Error fetching course enrollments: {str(e)}") raise HTTPException(status_code=500, detail=f"Error fetching course enrollments: {str(e)}") finally: if 'conn' in locals(): conn.close() # Rating submission model class RatingSubmission(BaseModel): rating: int = Field(..., ge=1, le=5, description="Rating value between 1 and 5") # Submit rating for a course @router.put("/courses/{course_id}/rating") async def submit_course_rating( course_id: int, rating_data: RatingSubmission, request: Request, auth_token: str = Cookie(None) ): try: # Get token from header if not in cookie if not auth_token: auth_header = request.headers.get('Authorization') if auth_header and auth_header.startswith('Bearer '): auth_token = auth_header.split(' ')[1] else: raise HTTPException(status_code=401, detail="No authentication token provided") # Verify token and get user data try: user_data = decode_token(auth_token) learner_id = user_data.get('user_id') if not learner_id: # Fallback for old tokens without user_id - do database lookup conn = connect_db() with conn.cursor(pymysql.cursors.DictCursor) as cursor: cursor.execute(""" SELECT LearnerID FROM Learners WHERE AccountName = %s """, (user_data['username'],)) learner = cursor.fetchone() if not learner: raise HTTPException(status_code=404, detail="Learner not found") learner_id = learner['LearnerID'] else: # Use user_id from token conn = connect_db() except Exception as e: print(f"Token/user verification error: {str(e)}") raise HTTPException(status_code=401, detail="Invalid authentication token or user not found") # Check if the course exists and user is enrolled with conn.cursor(pymysql.cursors.DictCursor) as cursor: cursor.execute(""" SELECT e.EnrollmentID FROM Enrollments e JOIN Courses c ON e.CourseID = c.CourseID WHERE e.CourseID = %s AND e.LearnerID = %s """, (course_id, learner_id)) enrollment = cursor.fetchone() if not enrollment: raise HTTPException(status_code=404, detail="Course not found or you are not enrolled") # Update the rating in the Enrollments table try: cursor.execute(""" UPDATE Enrollments SET Rating = %s WHERE CourseID = %s AND LearnerID = %s """, (rating_data.rating, course_id, learner_id)) conn.commit() # Invalidate cache for course rating data redis_client = get_redis_client() pattern_course = f"courses:id:{course_id}:*" pattern_preview = f"course:preview:{course_id}:*" print(f"Invalidating cache patterns: {pattern_course}, {pattern_preview}") invalidate_cache_pattern(pattern_course) invalidate_cache_pattern(pattern_preview) return {"message": "Rating submitted successfully", "rating": rating_data.rating} except Exception as e: conn.rollback() print(f"Error updating rating: {str(e)}") raise HTTPException(status_code=500, detail=f"Failed to submit rating: {str(e)}") except HTTPException as he: raise he except Exception as e: print(f"Error submitting rating: {str(e)}") raise HTTPException(status_code=500, detail=f"Error submitting rating: {str(e)}") finally: if 'conn' in locals(): conn.close()