| |
|
|
| from datetime import date, datetime, timedelta |
| from typing import List |
| import os |
| import random |
| import secrets |
| import smtplib |
| from email.mime.text import MIMEText |
|
|
| from fastapi import FastAPI, Depends, Request, HTTPException, Query |
| from fastapi.middleware.cors import CORSMiddleware |
| from fastapi.staticfiles import StaticFiles |
| from fastapi.responses import FileResponse |
|
|
| from sqlalchemy.orm import Session |
| from sqlalchemy import func |
|
|
| import stripe |
|
|
| |
| from core.database import Base, engine, SessionLocal |
| from core.config import settings |
| from models.membership_plan import MembershipPlan |
| from models.membership import Membership |
| from models.login_otp import LoginOTP |
| from models.product import Product |
| from models.membership_invite import MembershipInvite |
| from models.membership_request import MembershipRequest |
| from models.student import Student |
| from models.exam import Exam, ExamBatch, ExamStudentRegistration |
| from models.attendance import Attendance |
| from models.student_comment import StudentComment |
| from models.class_ import Class, ClassEnrollment, ClassMembershipPlan |
|
|
| |
| from schemas.class_ import ( |
| ClassCreate, |
| ClassUpdate, |
| ClassOut, |
| ClassEnrollmentCreate, |
| ClassEnrollmentOut, |
| ) |
| from schemas.exam import ( |
| ExamCreate, |
| ExamUpdate, |
| ExamOut, |
| ExamBatchCreate, |
| ExamBatchUpdate, |
| ExamBatchOut, |
| ExamStudentRegistrationCreate, |
| ExamStudentRegistrationUpdate, |
| ExamStudentRegistrationOut, |
| ExamWithBatches, |
| ) |
| from services import class_service |
| from services.email_service import send_email |
|
|
|
|
| |
|
|
# Stripe is optional: the API key is only set when one is configured.
if settings.STRIPE_SECRET_KEY:
    stripe.api_key = settings.STRIPE_SECRET_KEY


# The application object every route below is registered on.
app = FastAPI(title="Karate School Membership API", version="0.1.0")


# CORS is fully open: any origin, method and header, with credentials allowed.
# NOTE(review): wildcard origins combined with credentials is very permissive;
# confirm this is intended outside local development.
app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"],
)


# Frontend build output, resolved relative to this file.
STATIC_DIR = os.path.join(os.path.dirname(__file__), "frontend_dist")
ASSETS_DIR = os.path.join(STATIC_DIR, "assets")

# Serve the bundled assets only when the directory actually exists.
if os.path.isdir(ASSETS_DIR):
    app.mount("/assets", StaticFiles(directory=ASSETS_DIR), name="assets")
|
|
|
|
| |
|
|
def get_db():
    """Yield a session from ``SessionLocal`` and close it afterwards.

    Written as a generator so it can be used with ``Depends``; the
    ``finally`` clause closes the session even if the consumer raises.
    """
    session = SessionLocal()
    try:
        yield session
    finally:
        session.close()
|
|
|
|
| |
|
|
def send_otp_email(email: str, code: str) -> bool:
    """Send the login OTP to ``email`` over SMTP.

    Args:
        email: Recipient address.
        code: The one-time code to include in the message body.

    Returns:
        True if the message was handed to the SMTP server, False if SMTP is
        not configured (``SMTP_HOST`` / ``SMTP_FROM`` missing) or sending
        failed for any reason.
    """
    # Local import so this block does not depend on a file-level change.
    import logging

    if not settings.SMTP_HOST or not settings.SMTP_FROM:
        return False

    msg = MIMEText(
        f"Your Karate Portal login code is: {code}\n\n"
        f"This code will expire in 10 minutes."
    )
    msg["Subject"] = "Your Karate Portal login code"
    msg["From"] = settings.SMTP_FROM
    msg["To"] = email

    try:
        # Without a timeout an unreachable mail server would block the
        # request indefinitely (the socket default is "no timeout").
        with smtplib.SMTP(settings.SMTP_HOST, settings.SMTP_PORT, timeout=10) as server:
            server.starttls()
            # Authentication is optional; only log in when both are set.
            if settings.SMTP_USERNAME and settings.SMTP_PASSWORD:
                server.login(settings.SMTP_USERNAME, settings.SMTP_PASSWORD)
            server.send_message(msg)
    except Exception:
        # Sending stays best-effort (callers rely on the boolean), but the
        # failure is logged instead of vanishing. The OTP is never logged.
        logging.getLogger(__name__).exception("Failed to send OTP email")
        return False
    return True
|
|
|
|
| |
|
|
@app.on_event("startup")
def on_startup():
    """Run ``Base.metadata.create_all`` against the engine at startup."""
    metadata = Base.metadata
    metadata.create_all(bind=engine)
|
|
|
|
| |
|
|
@app.get("/api/health")
def health_check():
    """Return a fixed payload indicating that the API process is running."""
    payload = {"status": "ok", "message": "Karate API is running"}
    return payload
|
|
|
|
@app.get("/api/db-status")
def db_status():
    """Report which database the engine is configured for.

    The URL is rendered with the password masked. Depending on the
    SQLAlchemy version, ``str(engine.url)`` can include the plain-text
    password, which must never be returned from an HTTP endpoint.
    """
    return {
        "database_url": engine.url.render_as_string(hide_password=True)
    }
|
|
|
|
| |
|
|
@app.post("/api/auth/request-otp")
async def request_otp(payload: dict, db: Session = Depends(get_db)):
    """Generate a 6-digit login OTP for an email address and email it.

    Any previous OTP rows for the email are deleted, so only the newest
    code is stored. The code expires 10 minutes after creation.

    Args:
        payload: JSON body; must contain a non-empty string ``email``.
        db: Database session.

    Returns:
        ``{"status": "sent"}`` when the email was sent. When SMTP is not
        configured at all, the response also carries ``debug_code`` and
        ``note`` so the flow can be tested without a mail server.

    Raises:
        HTTPException: 400 if ``email`` is missing or not a string; 502 if
            SMTP is configured but the message could not be sent.
    """
    email = payload.get("email")
    if not email or not isinstance(email, str):
        raise HTTPException(status_code=400, detail="Email is required")

    # The OTP is a credential, so it is drawn from the CSPRNG in `secrets`
    # rather than the predictable `random` generator. Same range: 000000-999999.
    code = f"{secrets.randbelow(1_000_000):06d}"
    expires_at = datetime.utcnow() + timedelta(minutes=10)

    # Only one OTP per email: drop older rows before storing the new one.
    db.query(LoginOTP).filter(LoginOTP.email == email).delete()
    db.add(
        LoginOTP(
            email=email,
            code=code,
            expires_at=expires_at,
            used=False,
        )
    )
    db.commit()

    # Same configuration test that send_otp_email applies before sending.
    smtp_configured = bool(settings.SMTP_HOST and settings.SMTP_FROM)
    # NOTE(review): this is a blocking SMTP call inside an `async def` route.
    email_sent = send_otp_email(email, code)

    if email_sent:
        return {"status": "sent"}

    if smtp_configured:
        # Returning the code here would let anyone log in as any address
        # whenever the mail server has a hiccup, so fail the request instead.
        raise HTTPException(
            status_code=502,
            detail="Could not send OTP email. Please try again later.",
        )

    # No SMTP configured: testing mode, the code is returned to the caller.
    # NOTE(review): deployments must configure SMTP or this bypasses email
    # verification entirely.
    return {
        "status": "sent",
        "debug_code": code,
        "note": "SMTP not configured; OTP returned in response for testing.",
    }
|
|
|
|
@app.post("/api/auth/login")
async def login_with_otp(payload: dict, db: Session = Depends(get_db)):
    """Log a member in with an email + OTP pair.

    A matching, unused, unexpired OTP is marked as used, ``last_login_at``
    is stamped on every membership with that email, and a display name is
    returned (from the membership with the latest ``start_date``, or the
    local part of the email when there is no membership).

    Raises:
        HTTPException: 400 when a field is missing or no valid OTP matches.
    """
    email = payload.get("email")
    code = payload.get("otp")
    if not (email and code):
        raise HTTPException(status_code=400, detail="Email and otp are required")

    now = datetime.utcnow()

    matching_otp = (
        db.query(LoginOTP)
        .filter(
            LoginOTP.email == email,
            LoginOTP.code == code,
            LoginOTP.used == False,  # noqa: E712 - SQL expression, not a Python test
            LoginOTP.expires_at >= now,
        )
        .first()
    )
    if matching_otp is None:
        raise HTTPException(status_code=400, detail="Invalid or expired OTP")

    # Consume the OTP so it cannot be replayed.
    matching_otp.used = True
    db.commit()

    # Stamp the login time on every membership owned by this email.
    db.query(Membership).filter(Membership.user_email == email).update(
        {Membership.last_login_at: now}
    )
    db.commit()

    latest_membership = (
        db.query(Membership)
        .filter(Membership.user_email == email)
        .order_by(Membership.start_date.desc())
        .first()
    )
    if latest_membership:
        name = latest_membership.user_name
    else:
        name = email.split("@")[0]

    return {"status": "ok", "email": email, "name": name}
|
|
|
|
| |
|
|
@app.post("/api/coach/login")
async def coach_login_with_otp(payload: dict, db: Session = Depends(get_db)):
    """Log a coach in with an email + OTP pair.

    The OTP must match, be unused and unexpired, and the email must be the
    ``coach_email`` of at least one class. The OTP is only marked as used
    after both checks pass. The display name is derived from the local part
    of the email (dots become spaces, title-cased).

    Raises:
        HTTPException: 400 for missing fields or an invalid OTP; 403 when
            the email is not a coach of any class.
    """
    email = payload.get("email")
    code = payload.get("otp")
    if not (email and code):
        raise HTTPException(status_code=400, detail="Email and otp are required")

    now = datetime.utcnow()

    matching_otp = (
        db.query(LoginOTP)
        .filter(
            LoginOTP.email == email,
            LoginOTP.code == code,
            LoginOTP.used == False,  # noqa: E712 - SQL expression, not a Python test
            LoginOTP.expires_at >= now,
        )
        .first()
    )
    if matching_otp is None:
        raise HTTPException(status_code=400, detail="Invalid or expired OTP")

    # A single class with this coach_email is enough to qualify as a coach.
    any_class = db.query(Class).filter(Class.coach_email == email).first()
    if any_class is None:
        raise HTTPException(status_code=403, detail="Email is not assigned as a coach to any class")

    matching_otp.used = True
    db.commit()

    local_part = email.split("@")[0]
    name = local_part.replace(".", " ").title()

    return {"status": "ok", "email": email, "name": name}
|
|
|
|
@app.get("/api/coach/classes")
def get_coach_classes(email: str, db: Session = Depends(get_db)):
    """List the active classes coached by ``email`` with their joined students.

    Every enrollment with status ``"joined"`` yields one student entry. When
    the enrollment links to an existing ``Student`` row, that row supplies
    the id and names. Otherwise a placeholder entry is built from the
    enrollment's own name/email, the same fallback used by
    ``get_coach_class_students``, so both endpoints report the same roster.

    Args:
        email: Coach email; matched against ``Class.coach_email``.
        db: Database session.

    Returns:
        A list of dicts, one per class, each with a ``students`` list and a
        matching ``student_count``.
    """
    # NOTE(review): `email` is a plain query parameter; nothing visible here
    # proves the caller actually is that coach.
    classes = (
        db.query(Class)
        .filter(Class.coach_email == email, Class.is_active == True)  # noqa: E712
        .all()
    )

    result = []
    for c in classes:
        enrollments = (
            db.query(ClassEnrollment)
            .filter(
                ClassEnrollment.class_id == c.id,
                ClassEnrollment.status == "joined",
            )
            .all()
        )

        # Load all linked students in one query instead of one per enrollment.
        student_ids = list({e.student_id for e in enrollments if e.student_id})
        students_by_id = {}
        if student_ids:
            students_by_id = {
                s.id: s
                for s in db.query(Student).filter(Student.id.in_(student_ids)).all()
            }

        students = []
        for e in enrollments:
            student = students_by_id.get(e.student_id) if e.student_id else None
            if student:
                students.append({
                    "id": student.id,
                    "first_name": student.first_name,
                    "last_name": student.last_name,
                    "email": e.student_email,
                })
            else:
                # No Student row: fall back to what the enrollment itself
                # knows. `student_email` may be missing, so guard the split.
                students.append({
                    "id": None,
                    "first_name": e.student_name or (e.student_email or "").split("@")[0],
                    "last_name": "",
                    "email": e.student_email,
                })

        result.append({
            "id": c.id,
            "name": c.name,
            "description": c.description,
            "class_time": c.class_time,
            "days_of_week": c.days_of_week.split(",") if c.days_of_week else [],
            "max_students": c.max_students,
            "students": students,
            "student_count": len(students),
        })
    return result
|
|
|
|
@app.get("/api/coach/classes/{class_id}/students")
def get_coach_class_students(class_id: int, email: str, db: Session = Depends(get_db)):
    """Return the joined students of a class with today's attendance.

    Args:
        class_id: Class to inspect.
        email: Coach email; the class must have this ``coach_email``.
        db: Database session.

    Returns:
        ``{"class": {...}, "students": [...]}``. Each student entry carries
        ``attendance_id`` and ``is_present`` for today, or ``None`` for both
        when no attendance row exists (or the enrollment has no student id).

    Raises:
        HTTPException: 404 when the class does not exist or belongs to
            another coach.
    """
    class_obj = (
        db.query(Class)
        .filter(Class.id == class_id, Class.coach_email == email)
        .first()
    )
    if not class_obj:
        raise HTTPException(status_code=404, detail="Class not found or not assigned to this coach")

    enrollments = (
        db.query(ClassEnrollment)
        .filter(
            ClassEnrollment.class_id == class_id,
            ClassEnrollment.status == "joined",
        )
        .all()
    )

    today = date.today()

    # Fetch students and today's attendance in two queries rather than two
    # queries per enrollment.
    student_ids = list({e.student_id for e in enrollments if e.student_id})
    students_by_id = {}
    attendance_by_student = {}
    if student_ids:
        students_by_id = {
            s.id: s
            for s in db.query(Student).filter(Student.id.in_(student_ids)).all()
        }
        todays_rows = (
            db.query(Attendance)
            .filter(
                Attendance.class_id == class_id,
                Attendance.student_id.in_(student_ids),
                Attendance.attendance_date == today,
            )
            .all()
        )
        for row in todays_rows:
            # Keep the first row per student, mirroring the former `.first()`.
            attendance_by_student.setdefault(row.student_id, row)

    students = []
    for e in enrollments:
        student = students_by_id.get(e.student_id) if e.student_id else None
        if student:
            student_data = {
                "id": student.id,
                "first_name": student.first_name,
                "last_name": student.last_name,
                "email": e.student_email,
            }
        else:
            # No Student row: describe the student from the enrollment.
            # `student_email` may be missing, so guard the split.
            student_data = {
                "id": None,
                "first_name": e.student_name or (e.student_email or "").split("@")[0],
                "last_name": "",
                "email": e.student_email,
            }

        attendance = attendance_by_student.get(e.student_id) if e.student_id else None
        student_data["attendance_id"] = attendance.id if attendance else None
        student_data["is_present"] = attendance.is_present if attendance else None
        students.append(student_data)

    return {
        "class": {
            "id": class_obj.id,
            "name": class_obj.name,
            "description": class_obj.description,
        },
        "students": students,
    }
|
|
|
|
@app.post("/api/coach/classes/{class_id}/attendance")
def mark_attendance(
    class_id: int,
    payload: dict,
    db: Session = Depends(get_db)
):
    """Record today's attendance of one student in one class.

    The body must contain ``coach_email`` and ``student_id``; ``is_present``
    defaults to True. An existing row for (class, student, today) is
    updated in place, otherwise a new row is inserted.

    Raises:
        HTTPException: 400 when a required field is missing; 404 when the
            class does not exist or is not coached by ``coach_email``.
    """
    coach = payload.get("coach_email")
    student_id = payload.get("student_id")
    present = payload.get("is_present", True)

    if not (coach and student_id):
        raise HTTPException(status_code=400, detail="coach_email and student_id are required")

    owned_class = (
        db.query(Class)
        .filter(Class.id == class_id, Class.coach_email == coach)
        .first()
    )
    if owned_class is None:
        raise HTTPException(status_code=404, detail="Class not found or not assigned to this coach")

    today = date.today()

    record = (
        db.query(Attendance)
        .filter(
            Attendance.class_id == class_id,
            Attendance.student_id == student_id,
            Attendance.attendance_date == today,
        )
        .first()
    )

    if record is None:
        # First mark of the day for this student: create the row.
        record = Attendance(
            class_id=class_id,
            student_id=student_id,
            attendance_date=today,
            is_present=present,
            coach_email=coach,
        )
        db.add(record)
    else:
        # Already marked today: overwrite with the latest value and coach.
        record.is_present = present
        record.coach_email = coach

    db.commit()
    db.refresh(record)

    return {
        "status": "ok",
        "attendance_id": record.id,
        "is_present": record.is_present,
    }
|
|
|
|
@app.get("/api/coach/students/{student_id}")
def get_coach_student_profile(student_id: int, email: str, db: Session = Depends(get_db)):
    """Return a student's profile together with all coach comments.

    Args:
        student_id: Student to look up.
        email: Email of the requesting coach; must be the ``coach_email`` of
            at least one class.
        db: Database session.

    Returns:
        Profile fields plus ``comments`` (newest first by ``created_at``).

    Raises:
        HTTPException: 403 when ``email`` is not a coach of any class;
            404 when the student does not exist.
    """
    # `email` used to be accepted but ignored, so medical notes were served
    # to any caller. Apply the same coach check the coach login uses.
    coach_class = db.query(Class).filter(Class.coach_email == email).first()
    if not coach_class:
        raise HTTPException(status_code=403, detail="Email is not assigned as a coach to any class")

    student = db.query(Student).filter(Student.id == student_id).first()
    if not student:
        raise HTTPException(status_code=404, detail="Student not found")

    comments = (
        db.query(StudentComment)
        .filter(StudentComment.student_id == student_id)
        .order_by(StudentComment.created_at.desc())
        .all()
    )

    comments_list = []
    for c in comments:
        # Display name is derived from the email's local part; tolerate rows
        # without an email or timestamp instead of failing the whole request.
        coach_name = (c.coach_email or "").split("@")[0].replace(".", " ").title()
        comments_list.append({
            "id": c.id,
            "coach_email": c.coach_email,
            "coach_name": coach_name,
            "comment": c.comment,
            "created_at": c.created_at.isoformat() if c.created_at else None,
        })

    return {
        "id": student.id,
        "first_name": student.first_name,
        "last_name": student.last_name,
        "date_of_birth": student.date_of_birth.isoformat() if student.date_of_birth else None,
        "gender": student.gender,
        "medical_notes": student.medical_notes,
        "comments": comments_list,
    }
|
|
|
|
@app.post("/api/coach/students/{student_id}/comments")
def add_student_comment(
    student_id: int,
    payload: dict,
    db: Session = Depends(get_db)
):
    """Attach a coach comment to a student's profile.

    Args:
        student_id: Student the comment is about.
        payload: JSON body with ``coach_email`` and ``comment``.
        db: Database session.

    Returns:
        The stored comment, including a display name derived from the local
        part of the coach's email.

    Raises:
        HTTPException: 400 when a field is missing; 403 when ``coach_email``
            is not a coach of any class; 404 when the student does not exist.
    """
    email = payload.get("coach_email")
    comment_text = payload.get("comment")

    if not email or not comment_text:
        raise HTTPException(status_code=400, detail="coach_email and comment are required")

    # Previously any caller could store comments under any email. Require
    # the email to be a coach of at least one class, as the coach login does.
    coach_class = db.query(Class).filter(Class.coach_email == email).first()
    if not coach_class:
        raise HTTPException(status_code=403, detail="Email is not assigned as a coach to any class")

    student = db.query(Student).filter(Student.id == student_id).first()
    if not student:
        raise HTTPException(status_code=404, detail="Student not found")

    comment = StudentComment(
        student_id=student_id,
        coach_email=email,
        comment=comment_text,
    )
    db.add(comment)
    db.commit()
    # Reload so database-generated fields (id, created_at) are populated.
    db.refresh(comment)

    coach_name = email.split("@")[0].replace(".", " ").title()

    return {
        "id": comment.id,
        "coach_email": comment.coach_email,
        "coach_name": coach_name,
        "comment": comment.comment,
        "created_at": comment.created_at.isoformat(),
    }
|
|
|
|
| |
|
|
@app.get("/api/admin/overview")
def get_admin_overview(db: Session = Depends(get_db)):
    """Compute headline membership statistics for the admin dashboard.

    Counts memberships by status, derives monthly recurring revenue from
    active memberships (price divided by the number of months in the billing
    period), and counts active members per plan. Billing periods other than
    monthly/quarterly/half_yearly/annual contribute nothing to revenue.
    The churn and new-member figures are hard-coded to 0.
    """
    memberships = db.query(Membership).all()

    def count_with_status(status):
        return sum(1 for m in memberships if m.status == status)

    # Number of months each billing period spans.
    months_in_period = {
        "monthly": 1.0,
        "quarterly": 3.0,
        "half_yearly": 6.0,
        "annual": 12.0,
    }

    mrr = 0.0
    for m in memberships:
        if m.status != "active":
            continue
        months = months_in_period.get(m.billing_period)
        if months is not None:
            mrr += float(m.price) / months

    members_by_plan = []
    for p in db.query(MembershipPlan).all():
        active_on_plan = sum(
            1 for m in memberships if m.plan_id == p.id and m.status == "active"
        )
        members_by_plan.append(
            {"plan_id": p.id, "plan_name": p.name, "count": active_on_plan}
        )

    return {
        "total_members": len(memberships),
        "active_members": count_with_status("active"),
        "pending_members": count_with_status("pending"),
        "expired_members": count_with_status("expired"),
        "monthly_recurring_revenue": round(mrr, 2),
        "annual_recurring_revenue": round(mrr * 12, 2),
        "churn_last_30_days": 0,
        "new_members_last_30_days": 0,
        "members_by_plan": members_by_plan,
    }
|
|
|
|
@app.get("/api/admin/plans")
def get_admin_plans(db: Session = Depends(get_db)):
    """Return every membership plan as a JSON-friendly dict."""
    serialized = []
    for plan in db.query(MembershipPlan).all():
        serialized.append({
            "id": plan.id,
            "name": plan.name,
            "billing_period": plan.billing_period,
            "price": float(plan.price),
            "stripe_link": plan.stripe_link,
            "description": plan.description,
            "max_students": plan.max_students,
            "is_active": plan.is_active,
            "is_default": plan.is_default,
        })
    return serialized
|
|
|
|
@app.post("/api/admin/plans")
def create_plan(plan: dict, db: Session = Depends(get_db)):
    """Create a new membership plan.

    Expects JSON with: name, billing_period, price, stripe_link?,
    description?, max_students? (default 1), is_active? (default True),
    is_default? (default False).

    Raises:
        HTTPException: 400 when a required field is missing, or when the
            plan is flagged as default and two default plans already exist.
    """
    # Reject incomplete bodies up front; indexing the dict directly would
    # raise KeyError and surface as an opaque 500.
    missing = [
        field
        for field in ("name", "billing_period", "price")
        if plan.get(field) is None
    ]
    if missing:
        raise HTTPException(
            status_code=400,
            detail=f"Missing required field(s): {', '.join(missing)}",
        )

    is_default = plan.get("is_default", False)
    if is_default:
        current_default_count = (
            db.query(MembershipPlan)
            .filter(MembershipPlan.is_default == True)  # noqa: E712
            .count()
        )
        if current_default_count >= 2:
            raise HTTPException(status_code=400, detail="Maximum 2 default plans allowed. Please uncheck another default plan first.")

    new_plan = MembershipPlan(
        name=plan["name"],
        billing_period=plan["billing_period"],
        price=plan["price"],
        stripe_link=plan.get("stripe_link"),
        description=plan.get("description"),
        max_students=plan.get("max_students", 1),
        is_active=plan.get("is_active", True),
        is_default=is_default,
    )
    db.add(new_plan)
    db.commit()
    # Reload so the generated id is available for the response.
    db.refresh(new_plan)
    return {
        "id": new_plan.id,
        "name": new_plan.name,
        "billing_period": new_plan.billing_period,
        "price": float(new_plan.price),
        "stripe_link": new_plan.stripe_link,
        "description": new_plan.description,
        "max_students": new_plan.max_students,
        "is_active": new_plan.is_active,
        "is_default": new_plan.is_default,
    }
|
|
|
|
@app.put("/api/admin/plans/{plan_id}")
def update_plan(plan_id: int, plan: dict, db: Session = Depends(get_db)):
    """Update an existing membership plan.

    Keys absent from ``plan`` keep their stored value. Turning a
    non-default plan into a default one is refused when two other plans are
    already default.

    Raises:
        HTTPException: 404 when the plan does not exist; 400 when the
            default-plan limit would be exceeded.
    """
    db_plan = db.query(MembershipPlan).filter(MembershipPlan.id == plan_id).first()
    if db_plan is None:
        raise HTTPException(status_code=404, detail="Plan not found")

    is_default = plan.get("is_default", db_plan.is_default)
    becoming_default = is_default and not db_plan.is_default
    if becoming_default:
        other_defaults = (
            db.query(MembershipPlan)
            .filter(
                MembershipPlan.is_default == True,  # noqa: E712
                MembershipPlan.id != plan_id,
            )
            .count()
        )
        if other_defaults >= 2:
            raise HTTPException(status_code=400, detail="Maximum 2 default plans allowed. Please uncheck another default plan first.")

    # Copy each editable field, falling back to the stored value.
    editable = (
        "name",
        "billing_period",
        "price",
        "stripe_link",
        "description",
        "max_students",
        "is_active",
    )
    for field in editable:
        setattr(db_plan, field, plan.get(field, getattr(db_plan, field)))
    db_plan.is_default = is_default

    db.commit()
    db.refresh(db_plan)

    response = {
        "id": db_plan.id,
        "name": db_plan.name,
        "billing_period": db_plan.billing_period,
        "price": float(db_plan.price),
        "stripe_link": db_plan.stripe_link,
        "description": db_plan.description,
        "max_students": db_plan.max_students,
        "is_active": db_plan.is_active,
        "is_default": db_plan.is_default,
    }
    return response
|
|
|
|
@app.delete("/api/admin/plans/{plan_id}")
def delete_plan(plan_id: int, db: Session = Depends(get_db)):
    """Delete a membership plan by id.

    Raises:
        HTTPException: 404 when no plan has this id.
    """
    target = db.query(MembershipPlan).filter(MembershipPlan.id == plan_id).first()
    if target is None:
        raise HTTPException(status_code=404, detail="Plan not found")

    db.delete(target)
    db.commit()
    return {"status": "deleted"}
|
|
|
|
| |
|
|
@app.get("/api/admin/products")
def get_admin_products(db: Session = Depends(get_db)):
    """Return every product as a JSON-friendly dict.

    Timestamps are ISO-8601 strings, or ``None`` when unset.
    """
    serialized = []
    for product in db.query(Product).all():
        created = product.created_at
        updated = product.updated_at
        serialized.append({
            "id": product.id,
            "name": product.name,
            "price": float(product.price),
            "stripe_link": product.stripe_link,
            "description": product.description,
            "image_url": product.image_url,
            "is_active": product.is_active,
            "created_at": created.isoformat() if created else None,
            "updated_at": updated.isoformat() if updated else None,
        })
    return serialized
|
|
|
|
@app.post("/api/admin/products")
def create_product(product: dict, db: Session = Depends(get_db)):
    """Create a new product.

    Expects JSON with: name, price, stripe_link?, description?, image_url?,
    is_active? (default True).

    Raises:
        HTTPException: 400 when ``name`` or ``price`` is missing.
    """
    # Reject incomplete bodies up front; indexing the dict directly would
    # raise KeyError and surface as an opaque 500.
    missing = [field for field in ("name", "price") if product.get(field) is None]
    if missing:
        raise HTTPException(
            status_code=400,
            detail=f"Missing required field(s): {', '.join(missing)}",
        )

    new_product = Product(
        name=product["name"],
        price=product["price"],
        stripe_link=product.get("stripe_link"),
        description=product.get("description"),
        image_url=product.get("image_url"),
        is_active=product.get("is_active", True),
    )
    db.add(new_product)
    db.commit()
    # Reload so generated fields (id, timestamps) are available below.
    db.refresh(new_product)

    return {
        "id": new_product.id,
        "name": new_product.name,
        "price": float(new_product.price),
        "stripe_link": new_product.stripe_link,
        "description": new_product.description,
        "image_url": new_product.image_url,
        "is_active": new_product.is_active,
        "created_at": new_product.created_at.isoformat() if new_product.created_at else None,
        "updated_at": new_product.updated_at.isoformat() if new_product.updated_at else None,
    }
|
|
|
|
@app.put("/api/admin/products/{product_id}")
def update_product(product_id: int, product: dict, db: Session = Depends(get_db)):
    """Update an existing product.

    Keys absent from ``product`` keep their stored value.

    Raises:
        HTTPException: 404 when no product has this id.
    """
    db_product = db.query(Product).filter(Product.id == product_id).first()
    if db_product is None:
        raise HTTPException(status_code=404, detail="Product not found")

    # Copy each editable field, falling back to the stored value.
    editable = ("name", "price", "stripe_link", "description", "image_url", "is_active")
    for field in editable:
        setattr(db_product, field, product.get(field, getattr(db_product, field)))

    db.commit()
    db.refresh(db_product)

    created = db_product.created_at
    updated = db_product.updated_at
    return {
        "id": db_product.id,
        "name": db_product.name,
        "price": float(db_product.price),
        "stripe_link": db_product.stripe_link,
        "description": db_product.description,
        "image_url": db_product.image_url,
        "is_active": db_product.is_active,
        "created_at": created.isoformat() if created else None,
        "updated_at": updated.isoformat() if updated else None,
    }
|
|
|
|
@app.delete("/api/admin/products/{product_id}")
def delete_product(product_id: int, db: Session = Depends(get_db)):
    """Delete a product by id.

    Raises:
        HTTPException: 404 when no product has this id.
    """
    target = db.query(Product).filter(Product.id == product_id).first()
    if target is None:
        raise HTTPException(status_code=404, detail="Product not found")

    db.delete(target)
    db.commit()
    return {"status": "deleted"}
|
|
|
|
| |
|
|
@app.get("/api/admin/classes", response_model=list[ClassOut])
def get_admin_classes(db: Session = Depends(get_db)):
    """List all classes returned by ``class_service.get_classes``.

    Each class is expanded into a dict with:
      * ``days_of_week``: the stored comma-separated string split into a list;
      * ``schedule``: one entry per non-blank day, all sharing the start/end
        time parsed from ``class_time`` (``"start-end"``); a single blank
        Monday entry is used when no day is configured;
      * ``membership_plan_ids``: plan ids linked through
        ``ClassMembershipPlan``.
    """
    classes = class_service.get_classes(db)

    result = []
    for cls in classes:
        days = cls.days_of_week.split(",") if cls.days_of_week else []

        # `class_time` may be unset. Normalising to "" avoids calling
        # .strip() on None, which the previous [class_time, ""] fallback did.
        time_parts = (cls.class_time or "").split("-")
        start_time = time_parts[0].strip()
        end_time = time_parts[1].strip() if len(time_parts) > 1 else ""
        time_label = f"{start_time}-{end_time}" if start_time and end_time else start_time

        schedule = [
            {
                "day": day.strip(),
                "start_time": start_time,
                "end_time": end_time,
                "time": time_label,
            }
            for day in days
            if day.strip()
        ]

        # Placeholder entry so the response always has a schedule.
        if not schedule:
            schedule = [{"day": "Monday", "start_time": "", "end_time": "", "time": ""}]

        membership_plan_ids = [
            cmp.membership_plan_id
            for cmp in db.query(ClassMembershipPlan)
            .filter(ClassMembershipPlan.class_id == cls.id)
            .all()
        ]

        result.append({
            "id": cls.id,
            "name": cls.name,
            "description": cls.description,
            "class_time": cls.class_time,
            "days_of_week": days,
            "classes_per_week": cls.classes_per_week,
            "max_students": cls.max_students,
            "is_active": cls.is_active,
            "coach_email": cls.coach_email,
            "location": cls.location,
            "created_at": cls.created_at,
            "updated_at": cls.updated_at,
            "membership_plan_ids": membership_plan_ids,
            "schedule": schedule,
        })
    return result
|
|
|
|
@app.post("/api/admin/classes", response_model=ClassOut, status_code=201)
def create_admin_class(
    class_in: ClassCreate,
    db: Session = Depends(get_db),
):
    """Create a class via ``class_service.create_class`` and return it.

    The response adds ``days_of_week`` as a list, a ``schedule`` with one
    entry per non-blank day (start/end parsed from ``class_time`` as
    ``"start-end"``; a blank Monday entry when no day is configured), and
    the plan ids linked through ``ClassMembershipPlan``.
    """
    db_class = class_service.create_class(db, class_in)

    membership_plan_ids = [
        cmp.membership_plan_id
        for cmp in db.query(ClassMembershipPlan)
        .filter(ClassMembershipPlan.class_id == db_class.id)
        .all()
    ]

    days = db_class.days_of_week.split(",") if db_class.days_of_week else []

    # `class_time` may be unset. Normalising to "" avoids calling .strip()
    # on None, which the previous [class_time, ""] fallback did.
    time_parts = (db_class.class_time or "").split("-")
    start_time = time_parts[0].strip()
    end_time = time_parts[1].strip() if len(time_parts) > 1 else ""
    time_label = f"{start_time}-{end_time}" if start_time and end_time else start_time

    schedule = [
        {
            "day": day.strip(),
            "start_time": start_time,
            "end_time": end_time,
            "time": time_label,
        }
        for day in days
        if day.strip()
    ]

    # Placeholder entry so the response always has a schedule.
    if not schedule:
        schedule = [{"day": "Monday", "start_time": "", "end_time": "", "time": ""}]

    return {
        "id": db_class.id,
        "name": db_class.name,
        "description": db_class.description,
        "class_time": db_class.class_time,
        "days_of_week": days,
        "classes_per_week": db_class.classes_per_week,
        "max_students": db_class.max_students,
        "is_active": db_class.is_active,
        "coach_email": db_class.coach_email,
        "location": db_class.location,
        "created_at": db_class.created_at,
        "updated_at": db_class.updated_at,
        "membership_plan_ids": membership_plan_ids,
        "schedule": schedule,
    }
|
|
|
|
@app.put("/api/admin/classes/{class_id}", response_model=ClassOut)
def update_admin_class(
    class_id: int,
    class_in: ClassUpdate,
    db: Session = Depends(get_db),
):
    """Update a class via ``class_service.update_class`` and return it.

    The response adds ``days_of_week`` as a list, a ``schedule`` with one
    entry per non-blank day (start/end parsed from ``class_time`` as
    ``"start-end"``; a blank Monday entry when no day is configured), and
    the plan ids linked through ``ClassMembershipPlan``.

    Raises:
        HTTPException: 404 when the service returns no class for this id.
    """
    db_class = class_service.update_class(db, class_id, class_in)
    if not db_class:
        raise HTTPException(status_code=404, detail="Class not found")

    membership_plan_ids = [
        cmp.membership_plan_id
        for cmp in db.query(ClassMembershipPlan)
        .filter(ClassMembershipPlan.class_id == class_id)
        .all()
    ]

    days = db_class.days_of_week.split(",") if db_class.days_of_week else []

    # `class_time` may be unset. Normalising to "" avoids calling .strip()
    # on None, which the previous [class_time, ""] fallback did.
    time_parts = (db_class.class_time or "").split("-")
    start_time = time_parts[0].strip()
    end_time = time_parts[1].strip() if len(time_parts) > 1 else ""
    time_label = f"{start_time}-{end_time}" if start_time and end_time else start_time

    schedule = [
        {
            "day": day.strip(),
            "start_time": start_time,
            "end_time": end_time,
            "time": time_label,
        }
        for day in days
        if day.strip()
    ]

    # Placeholder entry so the response always has a schedule.
    if not schedule:
        schedule = [{"day": "Monday", "start_time": "", "end_time": "", "time": ""}]

    return {
        "id": db_class.id,
        "name": db_class.name,
        "description": db_class.description,
        "class_time": db_class.class_time,
        "days_of_week": days,
        "classes_per_week": db_class.classes_per_week,
        "max_students": db_class.max_students,
        "is_active": db_class.is_active,
        "coach_email": db_class.coach_email,
        "location": db_class.location,
        "created_at": db_class.created_at,
        "updated_at": db_class.updated_at,
        "membership_plan_ids": membership_plan_ids,
        "schedule": schedule,
    }
|
|
|
|
@app.delete("/api/admin/classes/{class_id}", status_code=204)
def delete_admin_class(
    class_id: int,
    db: Session = Depends(get_db),
):
    """Delete a class through ``class_service.delete_class``.

    Responds with 204 and no body on success.

    Raises:
        HTTPException: 404 when the service reports nothing was deleted.
    """
    deleted = class_service.delete_class(db, class_id)
    if deleted:
        return None
    raise HTTPException(status_code=404, detail="Class not found")
|
|
|
|
@app.get(
    "/api/admin/classes/{class_id}/enrollments",
    response_model=list[ClassEnrollmentOut],
)
def get_admin_class_enrollments(
    class_id: int,
    db: Session = Depends(get_db),
):
    """Return the enrollments of one class.

    Raises:
        HTTPException: 404 when the class does not exist.
    """
    if not class_service.get_class(db, class_id):
        raise HTTPException(status_code=404, detail="Class not found")

    return class_service.get_enrollments(db, class_id)
|
|
|
|
@app.post(
    "/api/admin/classes/{class_id}/invite",
    response_model=ClassEnrollmentOut,
    status_code=201,
)
def invite_student_to_class(
    class_id: int,
    enrollment_in: ClassEnrollmentCreate,
    db: Session = Depends(get_db),
):
    """Invite a student to a class by delegating to ``class_service``.

    Per the original docstring, the service creates an 'invited' enrollment
    and sends an email; that logic is not visible in this module.

    Raises:
        HTTPException: 404 when the service returns no enrollment.
    """
    created = class_service.invite_student_to_class(db, class_id, enrollment_in)
    if created:
        return created
    raise HTTPException(status_code=404, detail="Class not found")
|
|
|
|
@app.post(
    "/api/admin/classes/{class_id}/enroll",
    response_model=ClassEnrollmentOut,
    status_code=201,
)
def enroll_student_to_class(
    class_id: int,
    enrollment_in: ClassEnrollmentCreate,
    db: Session = Depends(get_db),
):
    """Enroll a student in a class directly by delegating to ``class_service``.

    Per the original docstring, the enrollment gets status 'joined' and no
    email is sent; that logic is not visible in this module.

    Raises:
        HTTPException: 404 when the service returns no enrollment.
    """
    created = class_service.enroll_student_to_class(db, class_id, enrollment_in)
    if created:
        return created
    raise HTTPException(status_code=404, detail="Class not found")
|
|
|
|
@app.delete(
    "/api/admin/classes/{class_id}/enrollments/{enrollment_id}",
    status_code=204,
)
def remove_student_from_class(
    class_id: int,
    enrollment_id: int,
    db: Session = Depends(get_db),
):
    """Remove an enrollment from a class via ``class_service.remove_enrollment``.

    Per the original docstring, the service marks the row as removed rather
    than deleting it; that logic is not visible in this module.

    Raises:
        HTTPException: 404 when the class does not exist, or when the
            enrollment does not exist or belongs to a different class.
    """
    db_class = class_service.get_class(db, class_id)
    if not db_class:
        raise HTTPException(status_code=404, detail="Class not found")

    # The service is called with the enrollment id only, so without this
    # check an enrollment of another class could be removed via this URL.
    # NOTE(review): assumes ClassEnrollment exposes an `id` primary key.
    belongs_to_class = (
        db.query(ClassEnrollment)
        .filter(
            ClassEnrollment.id == enrollment_id,
            ClassEnrollment.class_id == class_id,
        )
        .first()
    )
    if not belongs_to_class:
        raise HTTPException(status_code=404, detail="Enrollment not found")

    ok = class_service.remove_enrollment(db, enrollment_id)
    if not ok:
        raise HTTPException(status_code=404, detail="Enrollment not found")
    return None
|
|
|
|
| |
|
|
@app.get("/api/admin/memberships")
def get_admin_memberships(db: Session = Depends(get_db)):
    """Return all memberships in a single, unpaginated page.

    The envelope mimics a paginated response: ``page`` is always 1 and
    ``page_size`` equals the number of items returned.
    """
    items = []
    for m in db.query(Membership).all():
        last_login = m.last_login_at
        items.append({
            "id": m.id,
            "user_email": m.user_email,
            "user_name": m.user_name,
            "plan_name": m.plan_name,
            "status": m.status,
            "start_date": m.start_date.isoformat(),
            "renewal_date": m.renewal_date.isoformat(),
            "price": float(m.price),
            "billing_period": m.billing_period,
            "max_students": m.max_students,
            "last_login_at": last_login.isoformat() if last_login else None,
            "member_first_name": m.member_first_name,
            "member_last_name": m.member_last_name,
            "phone_number": m.phone_number,
            "emergency_contact_person": m.emergency_contact_person,
            "emergency_contact_number": m.emergency_contact_number,
        })

    count = len(items)
    return {"items": items, "total": count, "page": 1, "page_size": count}
|
|
|
|
@app.put("/api/admin/memberships/{membership_id}")
def update_membership(membership_id: int, membership_data: dict, db: Session = Depends(get_db)):
    """Update status, dates and/or price of a membership.

    Only keys present in ``membership_data`` are applied. ``start_date`` and
    ``renewal_date`` must be ``YYYY-MM-DD`` strings; an empty or null date
    leaves the stored value untouched.

    Raises:
        HTTPException: 404 when the membership does not exist; 400 when a
            date or the price cannot be parsed.
    """
    db_membership = db.query(Membership).filter(Membership.id == membership_id).first()
    if not db_membership:
        raise HTTPException(status_code=404, detail="Membership not found")

    # Parse everything before mutating the row. Invalid values used to be
    # swallowed by a bare `except: pass`, so the caller got a success
    # response although nothing had changed.
    parsed_dates = {}
    for field in ("start_date", "renewal_date"):
        raw = membership_data.get(field)
        if not raw:
            continue
        try:
            parsed_dates[field] = datetime.strptime(raw, "%Y-%m-%d").date()
        except (TypeError, ValueError) as exc:
            raise HTTPException(
                status_code=400,
                detail=f"{field} must be a date in YYYY-MM-DD format",
            ) from exc

    new_price = None
    if "price" in membership_data:
        try:
            new_price = float(membership_data["price"])
        except (TypeError, ValueError) as exc:
            raise HTTPException(status_code=400, detail="price must be a number") from exc

    if "status" in membership_data:
        db_membership.status = membership_data["status"]
    for field, value in parsed_dates.items():
        setattr(db_membership, field, value)
    if new_price is not None:
        db_membership.price = new_price

    db.commit()
    db.refresh(db_membership)
    return {
        "id": db_membership.id,
        "user_email": db_membership.user_email,
        "user_name": db_membership.user_name,
        "plan_name": db_membership.plan_name,
        "status": db_membership.status,
        "start_date": db_membership.start_date.isoformat(),
        "renewal_date": db_membership.renewal_date.isoformat(),
        "price": float(db_membership.price),
        "billing_period": db_membership.billing_period,
        "max_students": db_membership.max_students,
    }
|
|
|
|
@app.delete("/api/admin/memberships/{membership_id}")
def delete_membership(membership_id: int, db: Session = Depends(get_db)):
    """Delete the membership identified by ``membership_id``.

    Student rows are expected to be removed by a cascade that is configured
    outside this function (NOTE(review): confirm on the model/database).

    Raises:
        HTTPException: 404 when the membership does not exist.
    """
    lookup = db.query(Membership).filter(Membership.id == membership_id)
    target = lookup.first()
    if not target:
        raise HTTPException(status_code=404, detail="Membership not found")

    db.delete(target)
    db.commit()
    return {"status": "ok", "message": "Membership deleted"}
|
|
|
|
@app.get("/api/admin/renewals")
def get_admin_renewals(db: Session = Depends(get_db)):
    """List active memberships that renew between today and today + 30 days.

    Both ends of the window are inclusive. Each entry carries the number of
    days left until the renewal date.
    """
    today = date.today()
    window_end = today + timedelta(days=30)

    due_soon = (
        db.query(Membership)
        .filter(
            Membership.status == "active",
            Membership.renewal_date >= today,
            Membership.renewal_date <= window_end,
        )
        .all()
    )

    rows = []
    for membership in due_soon:
        renews_on = membership.renewal_date
        rows.append(
            {
                "membership_id": membership.id,
                "user_name": membership.user_name,
                "user_email": membership.user_email,
                "plan_name": membership.plan_name,
                "status": membership.status,
                "renewal_date": renews_on.isoformat(),
                "days_until_renewal": (renews_on - today).days,
                "price": float(membership.price),
            }
        )
    return rows
|
|
|
|
@app.get("/api/admin/at-risk")
def get_admin_at_risk(db: Session = Depends(get_db)):
    """Score active memberships that renew within the next 14 days.

    The score is ``1 - days_left / 14`` (clamped at 0): 1.0 on the renewal
    day itself, decreasing linearly to 0.0 fourteen days out. Scores above
    0.66 are "high", above 0.33 "medium", otherwise "low".
    """
    today = date.today()
    horizon = today + timedelta(days=14)

    expiring = (
        db.query(Membership)
        .filter(
            Membership.status == "active",
            Membership.renewal_date >= today,
            Membership.renewal_date <= horizon,
        )
        .all()
    )

    def _entry(membership):
        # Build one at-risk record for a single membership.
        days_left = (membership.renewal_date - today).days
        risk_score = max(0.0, 1.0 - days_left / 14.0)

        risk_level = "low"
        if risk_score > 0.33:
            risk_level = "medium"
        if risk_score > 0.66:
            risk_level = "high"

        return {
            "membership_id": membership.id,
            "user_name": membership.user_name,
            "user_email": membership.user_email,
            "plan_name": membership.plan_name,
            "status": membership.status,
            "renewal_date": membership.renewal_date.isoformat(),
            "risk_score": round(risk_score, 2),
            "risk_level": risk_level,
            "reasons": [
                f"Renewal in {days_left} days",
            ],
        }

    return [_entry(membership) for membership in expiring]
|
|
|
|
| |
|
|
@app.get("/api/student/plans")
def get_student_plans(db: Session = Depends(get_db)):
    """Return all active membership plans, with default plans listed first.

    ``is_current`` is always reported as ``False`` by this endpoint.
    """
    active_plans = db.query(MembershipPlan).filter(MembershipPlan.is_active == True)

    # Two passes over the same base query: defaults first, then the rest.
    ordered_plans = []
    for default_flag in (True, False):
        ordered_plans.extend(
            active_plans.filter(MembershipPlan.is_default == default_flag).all()
        )

    payload = []
    for plan in ordered_plans:
        payload.append(
            {
                "id": plan.id,
                "name": plan.name,
                "billing_period": plan.billing_period,
                "price": float(plan.price),
                "stripe_link": plan.stripe_link,
                "description": plan.description,
                "max_students": plan.max_students,
                "is_active": plan.is_active,
                "is_default": plan.is_default,
                "is_current": False,
            }
        )
    return payload
|
|
|
|
@app.get("/api/student/invites")
def get_student_invites(email: str, db: Session = Depends(get_db)):
    """Get pending invites for a student email, newest first.

    Each invite is enriched with the ``billing_period`` and ``stripe_link``
    of its plan; when the invite has no plan (or the plan row is missing)
    these fall back to ``"monthly"`` and ``None``.
    """
    invites = (
        db.query(MembershipInvite)
        .filter(MembershipInvite.email == email)
        .filter(MembershipInvite.status == "pending")
        .order_by(MembershipInvite.created_at.desc())
        .all()
    )

    # Load every referenced plan in one query instead of one query per invite.
    plan_ids = list({invite.plan_id for invite in invites if invite.plan_id})
    plans_by_id = {}
    if plan_ids:
        plans_by_id = {
            plan.id: plan
            for plan in db.query(MembershipPlan).filter(MembershipPlan.id.in_(plan_ids)).all()
        }

    result = []
    for invite in invites:
        plan = plans_by_id.get(invite.plan_id)
        result.append({
            "id": invite.id,
            "email": invite.email,
            "plan_id": invite.plan_id,
            "plan_name": invite.plan_name,
            "plan_price": invite.plan_price,
            "max_students": invite.max_students,
            "class_details": invite.class_details,
            "status": invite.status,
            "invite_token": invite.invite_token,
            "invited_by": invite.invited_by,
            "invite_date": invite.invite_date.isoformat() if invite.invite_date else None,
            "created_at": invite.created_at.isoformat() if invite.created_at else None,
            "billing_period": plan.billing_period if plan else "monthly",
            "stripe_link": plan.stripe_link if plan else None,
        })

    return result
|
|
|
|
@app.post("/api/student/memberships/{membership_id}/pause-request")
def request_pause_membership(membership_id: int, db: Session = Depends(get_db)):
    """Create a pending pause request for a membership (admin approves later).

    Raises:
        HTTPException: 404 when the membership does not exist; 400 when its
            status is neither "active" nor "paused", when a pause request is
            already pending, or when ``paused_at`` is already set.
    """
    membership = db.query(Membership).filter(Membership.id == membership_id).first()
    if not membership:
        raise HTTPException(status_code=404, detail="Membership not found")

    if membership.status not in ("active", "paused"):
        raise HTTPException(status_code=400, detail="Only active or paused memberships can request pause")

    # At most one pending pause request per membership.
    already_pending = (
        db.query(MembershipRequest)
        .filter(
            MembershipRequest.membership_id == membership_id,
            MembershipRequest.request_type == "pause",
            MembershipRequest.status == "pending",
        )
        .first()
    )
    if already_pending:
        raise HTTPException(status_code=400, detail="A pause request is already pending")

    # A recorded pause date means this membership has already used its pause.
    if membership.paused_at:
        raise HTTPException(status_code=400, detail="Membership can only be paused once per period")

    new_request = MembershipRequest(
        membership_id=membership_id,
        request_type="pause",
        status="pending",
        requested_by=membership.user_email,
    )
    db.add(new_request)
    db.commit()
    db.refresh(new_request)

    response = {"status": "ok"}
    response["message"] = "Pause request submitted. Waiting for admin approval."
    response["request_id"] = new_request.id
    return response
|
|
|
|
@app.post("/api/student/memberships/{membership_id}/cancel-request")
def request_cancel_membership(membership_id: int, db: Session = Depends(get_db)):
    """Create a pending cancellation request for a membership (admin approves later).

    Raises:
        HTTPException: 404 when the membership does not exist; 400 when it
            is not "active" or a cancellation request is already pending.
    """
    membership = db.query(Membership).filter(Membership.id == membership_id).first()
    if not membership:
        raise HTTPException(status_code=404, detail="Membership not found")

    if membership.status != "active":
        raise HTTPException(status_code=400, detail="Only active memberships can request cancellation")

    # At most one pending cancellation request per membership.
    already_pending = (
        db.query(MembershipRequest)
        .filter(
            MembershipRequest.membership_id == membership_id,
            MembershipRequest.request_type == "cancel",
            MembershipRequest.status == "pending",
        )
        .first()
    )
    if already_pending:
        raise HTTPException(status_code=400, detail="A cancellation request is already pending")

    new_request = MembershipRequest(
        membership_id=membership_id,
        request_type="cancel",
        status="pending",
        requested_by=membership.user_email,
    )
    db.add(new_request)
    db.commit()
    db.refresh(new_request)

    response = {"status": "ok"}
    response["message"] = "Cancellation request submitted. Waiting for admin approval."
    response["request_id"] = new_request.id
    return response
|
|
|
|
| |
|
|
@app.get("/api/student/requests")
def get_student_requests(email: str, db: Session = Depends(get_db)):
    """Return every membership request tied to memberships owned by ``email``.

    Requests are ordered newest first by ``requested_at``. An empty list is
    returned when the email owns no memberships.
    """
    owned = db.query(Membership).filter(Membership.user_email == email).all()
    owned_ids = [membership.id for membership in owned]
    if not owned_ids:
        return []

    found = (
        db.query(MembershipRequest)
        .filter(MembershipRequest.membership_id.in_(owned_ids))
        .order_by(MembershipRequest.requested_at.desc())
        .all()
    )

    def _iso(moment):
        # Timestamps are optional on a request; keep None as None.
        return moment.isoformat() if moment else None

    output = []
    for item in found:
        output.append(
            {
                "id": item.id,
                "membership_id": item.membership_id,
                "request_type": item.request_type,
                "status": item.status,
                "requested_at": _iso(item.requested_at),
                "approved_at": _iso(item.approved_at),
                "rejected_at": _iso(item.rejected_at),
                "withdrawn_at": _iso(item.withdrawn_at),
                "notes": item.notes,
            }
        )
    return output
|
|
|
|
@app.get("/api/student/memberships/{membership_id}/requests")
def get_membership_requests(membership_id: int, db: Session = Depends(get_db)):
    """Return all requests filed against one membership, newest first."""
    found = (
        db.query(MembershipRequest)
        .filter(MembershipRequest.membership_id == membership_id)
        .order_by(MembershipRequest.requested_at.desc())
        .all()
    )

    def _iso(moment):
        # Timestamps are optional on a request; keep None as None.
        return moment.isoformat() if moment else None

    output = []
    for item in found:
        output.append(
            {
                "id": item.id,
                "request_type": item.request_type,
                "status": item.status,
                "requested_at": _iso(item.requested_at),
                "approved_at": _iso(item.approved_at),
                "rejected_at": _iso(item.rejected_at),
                "withdrawn_at": _iso(item.withdrawn_at),
                "notes": item.notes,
            }
        )
    return output
|
|
|
|
@app.post("/api/student/memberships/{membership_id}/requests/{request_id}/withdraw")
def withdraw_request(membership_id: int, request_id: int, db: Session = Depends(get_db)):
    """Mark a pending request of the given membership as withdrawn.

    Raises:
        HTTPException: 404 when no pending request with ``request_id``
            exists for ``membership_id``.
    """
    pending = (
        db.query(MembershipRequest)
        .filter(
            MembershipRequest.id == request_id,
            MembershipRequest.membership_id == membership_id,
            MembershipRequest.status == "pending",
        )
        .first()
    )
    if not pending:
        raise HTTPException(status_code=404, detail="Request not found or already processed")

    pending.status = "withdrawn"
    pending.withdrawn_at = datetime.utcnow()
    db.commit()

    return {"status": "ok", "message": "Request withdrawn"}
|
|
|
|
@app.post("/api/student/memberships/{membership_id}/resume")
def resume_membership(membership_id: int, db: Session = Depends(get_db)):
    """Reactivate a paused membership and re-derive its renewal date.

    The stored renewal date is moved back by 30 days and then forward by the
    number of days elapsed between ``paused_at`` and today. ``paused_at``
    itself is left in place.

    Raises:
        HTTPException: 404 when the membership does not exist; 400 when it
            is not paused or has no ``paused_at`` date.
    """
    membership = db.query(Membership).filter(Membership.id == membership_id).first()
    if not membership:
        raise HTTPException(status_code=404, detail="Membership not found")
    if membership.status != "paused":
        raise HTTPException(status_code=400, detail="Only paused memberships can be resumed")
    if not membership.paused_at:
        raise HTTPException(status_code=400, detail="Membership pause date not found")

    # Undo the fixed 30-day shift, then add back only the days actually paused.
    renewal_before_pause = membership.renewal_date - timedelta(days=30)
    days_paused = (date.today() - membership.paused_at).days
    adjusted_renewal = renewal_before_pause + timedelta(days=days_paused)

    membership.status = "active"
    membership.renewal_date = adjusted_renewal

    db.commit()
    db.refresh(membership)

    return {
        "status": "ok",
        "message": "Membership resumed",
        "renewal_date": membership.renewal_date.isoformat(),
    }
|
|
|
|
@app.post("/api/student/memberships/{membership_id}/reactivate")
def reactivate_membership(membership_id: int, db: Session = Depends(get_db)):
    """Reactivate a cancelled membership by creating a Stripe checkout session.

    The membership itself is not modified here; the caller receives a
    checkout URL and the session metadata carries ``membership_id``,
    ``type="reactivation"`` and the start date (the current renewal date).

    Raises:
        HTTPException: 404 when the membership or its plan does not exist;
            400 when the membership is not cancelled, has no renewal date,
            or its renewal date has already passed; 500 when the checkout
            session cannot be created.
    """
    membership = db.query(Membership).filter(Membership.id == membership_id).first()
    if not membership:
        raise HTTPException(status_code=404, detail="Membership not found")

    if membership.status != "cancelled":
        raise HTTPException(status_code=400, detail="Only cancelled memberships can be reactivated")

    # The renewal date doubles as the reactivation start date, so it must
    # exist and must not lie in the past.
    if membership.renewal_date is None:
        raise HTTPException(status_code=400, detail="Membership has no renewal date")
    today = date.today()
    if membership.renewal_date < today:
        raise HTTPException(status_code=400, detail="Cannot reactivate membership after renewal date has passed")

    plan = db.query(MembershipPlan).filter(MembershipPlan.id == membership.plan_id).first()
    if not plan:
        raise HTTPException(status_code=404, detail="Plan not found")

    # Stripe only accepts day/week/month/year as the recurring interval;
    # longer periods are expressed through interval_count. Unknown periods
    # fall back to monthly billing.
    recurring_by_period = {
        "monthly": ("month", 1),
        "quarterly": ("month", 3),
        "half_yearly": ("month", 6),
        "annual": ("year", 1),
    }
    interval, interval_count = recurring_by_period.get(membership.billing_period, ("month", 1))

    base_url = os.getenv('BASE_URL', 'http://localhost:3000')
    start_date_iso = membership.renewal_date.isoformat()

    try:
        # round() rather than int(): float(19.99) * 100 is 1998.999..., which
        # int() would truncate to 1998 cents.
        unit_amount = round(float(membership.price) * 100)

        checkout_session = stripe.checkout.Session.create(
            payment_method_types=["card"],
            line_items=[
                {
                    "price_data": {
                        "currency": "cad",
                        "product_data": {
                            "name": f"Reactivate {membership.plan_name}",
                            "description": f"Reactivate membership - starts from {start_date_iso}",
                        },
                        "unit_amount": unit_amount,
                        "recurring": {
                            "interval": interval,
                            "interval_count": interval_count,
                        },
                    },
                    "quantity": 1,
                }
            ],
            mode="subscription",
            success_url=f"{base_url}/student?reactivated=true",
            cancel_url=f"{base_url}/student",
            customer_email=membership.user_email,
            metadata={
                "membership_id": str(membership_id),
                "type": "reactivation",
                "start_date": start_date_iso,
            },
        )

        return {
            "status": "ok",
            "checkout_url": checkout_session.url,
            "message": "Checkout session created for reactivation"
        }
    except Exception as e:
        print(f"[reactivate] Error creating checkout session: {e}")
        raise HTTPException(status_code=500, detail=f"Error creating payment session: {str(e)}")
|
|
|
|
| |
|
|
@app.get("/api/admin/requests")
def get_admin_requests(db: Session = Depends(get_db)):
    """Get all pending membership requests, newest first.

    Each request is combined with details of its membership; requests whose
    membership row no longer exists are omitted.
    """
    requests = db.query(MembershipRequest).filter(
        MembershipRequest.status == "pending"
    ).order_by(MembershipRequest.requested_at.desc()).all()

    # Load all referenced memberships in one query instead of one per request.
    membership_ids = list({r.membership_id for r in requests if r.membership_id is not None})
    memberships_by_id = {}
    if membership_ids:
        memberships_by_id = {
            m.id: m
            for m in db.query(Membership).filter(Membership.id.in_(membership_ids)).all()
        }

    result = []
    for r in requests:
        membership = memberships_by_id.get(r.membership_id)
        if not membership:
            continue
        # Prefer "first last"; fall back to user_name when both parts are empty.
        full_name = f"{membership.member_first_name or ''} {membership.member_last_name or ''}".strip()
        result.append({
            "id": r.id,
            "membership_id": r.membership_id,
            "request_type": r.request_type,
            "status": r.status,
            "requested_at": r.requested_at.isoformat() if r.requested_at else None,
            "requested_by": r.requested_by,
            "member_name": full_name or membership.user_name,
            "member_email": membership.user_email,
            "plan_name": membership.plan_name,
            "membership_status": membership.status,
            "renewal_date": membership.renewal_date.isoformat() if membership.renewal_date else None,
        })

    return result
|
|
|
|
@app.post("/api/admin/requests/{request_id}/approve")
def approve_request(request_id: int, payload: dict = None, db: Session = Depends(get_db)):
    """Approve a pending membership request and execute it.

    * ``pause``: pushes the renewal date out by 30 days, sets the status to
      "paused" and records today as ``paused_at``.
    * ``cancel``: asks Stripe to cancel the customer's active subscription
      at period end (best effort; failures are only logged), then marks the
      membership "cancelled" with today as ``cancelled_at``.

    ``payload`` may carry ``approved_by`` (defaults to "admin") and ``notes``.

    Raises:
        HTTPException: 404 when the pending request or its membership is
            not found.
    """
    request = db.query(MembershipRequest).filter(
        MembershipRequest.id == request_id,
        MembershipRequest.status == "pending"
    ).first()

    if not request:
        raise HTTPException(status_code=404, detail="Request not found or already processed")

    membership = db.query(Membership).filter(Membership.id == request.membership_id).first()
    if not membership:
        raise HTTPException(status_code=404, detail="Membership not found")

    admin_name = payload.get("approved_by", "admin") if payload else "admin"

    if request.request_type == "pause":
        membership.renewal_date = membership.renewal_date + timedelta(days=30)
        membership.status = "paused"
        membership.paused_at = date.today()

    elif request.request_type == "cancel":
        # Stripe is looked up by email; a Stripe failure must not block the
        # local cancellation, so errors are logged and otherwise ignored.
        try:
            customers = stripe.Customer.list(email=membership.user_email, limit=1)
            if customers.data:
                customer = customers.data[0]
                subscriptions = stripe.Subscription.list(customer=customer.id, status="active", limit=1)
                if subscriptions.data:
                    subscription = subscriptions.data[0]
                    stripe.Subscription.modify(
                        subscription.id,
                        cancel_at_period_end=True
                    )
                    print(f"[approve cancel] Stripe subscription {subscription.id} set to cancel at period end")
        except Exception as e:
            print(f"[approve cancel] Error canceling Stripe subscription: {e}")

        membership.status = "cancelled"
        membership.cancelled_at = date.today()

    request.status = "approved"
    request.approved_at = datetime.utcnow()
    request.approved_by = admin_name
    request.notes = payload.get("notes", "") if payload else None

    db.commit()
    db.refresh(membership)
    db.refresh(request)

    return {
        "status": "ok",
        "message": f"{request.request_type.capitalize()} request approved and executed",
        "membership_status": membership.status
    }
|
|
|
|
@app.post("/api/admin/requests/{request_id}/reject")
def reject_request(request_id: int, payload: dict = None, db: Session = Depends(get_db)):
    """Reject a pending membership request.

    ``payload`` may carry ``rejected_by`` (defaults to "admin") and
    ``notes``. The membership itself is not touched.

    Raises:
        HTTPException: 404 when no pending request has ``request_id``.
    """
    request = db.query(MembershipRequest).filter(
        MembershipRequest.id == request_id,
        MembershipRequest.status == "pending"
    ).first()

    if not request:
        raise HTTPException(status_code=404, detail="Request not found or already processed")

    admin_name = payload.get("rejected_by", "admin") if payload else "admin"

    request.status = "rejected"
    request.rejected_at = datetime.utcnow()
    request.rejected_by = admin_name
    request.notes = payload.get("notes", "") if payload else None

    db.commit()
    db.refresh(request)

    return {"status": "ok", "message": "Request rejected"}
|
|
|
|
@app.get("/api/student/memberships")
def get_student_memberships(email: str, db: Session = Depends(get_db)):
    """Return the memberships owned by ``email`` plus synthetic history rows.

    After each membership's own entry, an extra "cancelled" entry is added
    when ``cancelled_at`` is set and an extra "paused" entry when
    ``paused_at`` is set. The extra entries use string ids of the form
    ``"<id>_cancelled"`` / ``"<id>_paused"`` and start on the event date.
    """
    rows = []
    for m in db.query(Membership).filter(Membership.user_email == email).all():
        start_iso = m.start_date.isoformat()
        renewal_iso = m.renewal_date.isoformat()
        price = float(m.price)
        cancelled_iso = m.cancelled_at.isoformat() if m.cancelled_at else None
        paused_iso = m.paused_at.isoformat() if m.paused_at else None

        rows.append({
            "id": m.id,
            "plan_name": m.plan_name,
            "status": m.status,
            "start_date": start_iso,
            "renewal_date": renewal_iso,
            "price": price,
            "billing_period": m.billing_period,
            "max_students": m.max_students,
            "cancelled_at": cancelled_iso,
            "paused_at": paused_iso,
        })

        if m.cancelled_at:
            rows.append({
                "id": f"{m.id}_cancelled",
                "plan_name": m.plan_name,
                "status": "cancelled",
                "start_date": cancelled_iso,
                "renewal_date": renewal_iso,
                "price": price,
                "billing_period": m.billing_period,
                "max_students": m.max_students,
                "cancelled_at": cancelled_iso,
                "is_cancellation_entry": True,
            })

        if m.paused_at:
            rows.append({
                "id": f"{m.id}_paused",
                "plan_name": m.plan_name,
                "status": "paused",
                "start_date": paused_iso,
                "renewal_date": renewal_iso,
                "price": price,
                "billing_period": m.billing_period,
                "max_students": m.max_students,
                "paused_at": paused_iso,
                "is_pause_entry": True,
            })

    return rows
|
|
|
|
@app.get("/api/student/students")
def get_student_students(email: str, db: Session = Depends(get_db)):
    """List the students attached to the first membership found for ``email``.

    Returns an empty list when the email has no membership.
    """
    owner = db.query(Membership).filter(Membership.user_email == email).first()
    if not owner:
        return []

    roster = db.query(Student).filter(Student.membership_id == owner.id).all()
    return [
        {
            "id": student.id,
            "membership_id": student.membership_id,
            "first_name": student.first_name,
            "last_name": student.last_name,
            "date_of_birth": student.date_of_birth.isoformat() if student.date_of_birth else None,
            "gender": student.gender,
            "medical_notes": student.medical_notes,
            "created_at": student.created_at.isoformat() if student.created_at else None,
        }
        for student in roster
    ]
|
|
|
|
@app.get("/api/student/enrollments")
def get_student_enrollments(email: str, db: Session = Depends(get_db)):
    """Get all class enrollments for students under a membership email.

    Only the first membership found for ``email`` is considered, and
    enrollments with status "removed" are excluded. ``class_name`` is
    "Unknown Class" when the referenced class row does not exist.
    """
    from models.class_ import ClassEnrollment, Class

    membership = db.query(Membership).filter(Membership.user_email == email).first()
    if not membership:
        return []

    students = db.query(Student).filter(Student.membership_id == membership.id).all()
    student_ids = [s.id for s in students]

    if not student_ids:
        return []

    enrollments = (
        db.query(ClassEnrollment)
        .filter(ClassEnrollment.student_id.in_(student_ids))
        .filter(ClassEnrollment.status != "removed")
        .all()
    )

    # Resolve all class names with one query instead of one query per enrollment.
    class_ids = list({e.class_id for e in enrollments if e.class_id is not None})
    class_names = {}
    if class_ids:
        class_names = {
            c.id: c.name
            for c in db.query(Class).filter(Class.id.in_(class_ids)).all()
        }

    return [
        {
            "id": e.id,
            "class_id": e.class_id,
            "class_name": class_names.get(e.class_id, "Unknown Class"),
            "student_id": e.student_id,
            "student_name": e.student_name,
            "student_email": e.student_email,
            "status": e.status,
            "joined_at": e.joined_at.isoformat() if e.joined_at else None,
        }
        for e in enrollments
    ]
|
|
|
|
@app.get("/api/student/classes")
def get_student_classes(email: str, db: Session = Depends(get_db)):
    """List the active classes that the membership's students have joined.

    Only the first membership found for ``email`` is used. ``student_count``
    is the number of this membership's "joined" enrollments in the class,
    not the class's overall enrollment.
    """
    from models.class_ import ClassEnrollment, Class

    membership = db.query(Membership).filter(Membership.user_email == email).first()
    if not membership:
        return []

    student_ids = [
        student.id
        for student in db.query(Student).filter(Student.membership_id == membership.id).all()
    ]
    if not student_ids:
        return []

    joined = (
        db.query(ClassEnrollment)
        .filter(
            ClassEnrollment.student_id.in_(student_ids),
            ClassEnrollment.status == "joined",
        )
        .all()
    )

    class_ids = list({enrollment.class_id for enrollment in joined})
    if not class_ids:
        return []

    active_classes = db.query(Class).filter(Class.id.in_(class_ids), Class.is_active == True).all()

    # Tally this membership's enrollments per class in a single pass.
    owned_students = set(student_ids)
    joined_per_class = {}
    for enrollment in joined:
        if enrollment.student_id in owned_students:
            joined_per_class[enrollment.class_id] = joined_per_class.get(enrollment.class_id, 0) + 1

    listing = []
    for klass in active_classes:
        listing.append({
            "id": klass.id,
            "name": klass.name,
            "description": klass.description,
            "class_time": klass.class_time,
            "days_of_week": klass.days_of_week.split(",") if klass.days_of_week else [],
            "max_students": klass.max_students,
            "student_count": joined_per_class.get(klass.id, 0),
        })

    return listing
|
|
|
|
@app.get("/api/student/classes/{class_id}/attendance")
def get_student_class_attendance(class_id: int, email: str, db: Session = Depends(get_db)):
    """Return class details and per-student attendance for one membership.

    Only students of the first membership found for ``email`` who have a
    "joined" enrollment in the class are listed. Each student's attendance
    history is ordered by date, newest first.

    Raises:
        HTTPException: 404 when the membership or an active class with
            ``class_id`` is not found.
    """
    from models.class_ import ClassEnrollment, Class

    membership = db.query(Membership).filter(Membership.user_email == email).first()
    if not membership:
        raise HTTPException(status_code=404, detail="Membership not found")

    class_obj = db.query(Class).filter(Class.id == class_id, Class.is_active == True).first()
    if not class_obj:
        raise HTTPException(status_code=404, detail="Class not found")

    # The same class summary is returned whether or not students exist.
    class_info = {
        "id": class_obj.id,
        "name": class_obj.name,
        "description": class_obj.description,
        "class_time": class_obj.class_time,
        "days_of_week": class_obj.days_of_week.split(",") if class_obj.days_of_week else [],
    }

    students = db.query(Student).filter(Student.membership_id == membership.id).all()
    student_ids = [s.id for s in students]
    if not student_ids:
        return {"class": class_info, "students": []}

    enrollments = (
        db.query(ClassEnrollment)
        .filter(
            ClassEnrollment.class_id == class_id,
            ClassEnrollment.student_id.in_(student_ids),
            ClassEnrollment.status == "joined",
        )
        .all()
    )
    enrolled_ids = {e.student_id for e in enrollments if e.student_id is not None}
    email_by_student = {e.student_id: e.student_email for e in enrollments if e.student_id}

    attendance_records = (
        db.query(Attendance)
        .filter(
            Attendance.class_id == class_id,
            Attendance.student_id.in_(student_ids),
        )
        .order_by(Attendance.attendance_date.desc())
        .all()
    )

    # Bucket the raw records per student; the query order is kept per bucket.
    records_by_student = {}
    for record in attendance_records:
        records_by_student.setdefault(record.student_id, []).append(record)

    students_data = []
    for student in students:
        if student.id not in enrolled_ids:
            continue

        history = [
            {
                "id": record.id,
                "attendance_date": record.attendance_date.isoformat(),
                "is_present": record.is_present,
                "coach_email": record.coach_email,
            }
            for record in records_by_student.get(student.id, [])
        ]

        students_data.append({
            "id": student.id,
            "first_name": student.first_name,
            "last_name": student.last_name,
            "email": email_by_student.get(student.id, ""),
            "attendance_history": history,
        })

    return {"class": class_info, "students": students_data}
|
|
|
|
@app.get("/api/student/upcoming-renewal")
def get_student_upcoming_renewal(email: str, db: Session = Depends(get_db)):
    """Return the soonest upcoming renewal among the email's active memberships.

    Raises:
        HTTPException: 404 when no active membership of ``email`` has a
            renewal date of today or later.
    """
    today = date.today()
    upcoming = (
        db.query(Membership)
        .filter(
            Membership.user_email == email,
            Membership.status == "active",
            Membership.renewal_date >= today,
        )
        .order_by(Membership.renewal_date.asc())
        .first()
    )
    if not upcoming:
        raise HTTPException(status_code=404, detail="No upcoming renewal")

    renews_on = upcoming.renewal_date
    return {
        "membership_id": upcoming.id,
        "plan_name": upcoming.plan_name,
        "status": upcoming.status,
        "renewal_date": renews_on.isoformat(),
        "days_until_renewal": (renews_on - today).days,
        "price": float(upcoming.price),
        "billing_period": upcoming.billing_period,
    }
|
|
|
|
@app.get("/api/student/products")
def get_student_products(db: Session = Depends(get_db)):
    """Return every product flagged as active."""
    catalogue = []
    for product in db.query(Product).filter(Product.is_active == True).all():
        catalogue.append(
            {
                "id": product.id,
                "name": product.name,
                "price": float(product.price),
                "stripe_link": product.stripe_link,
                "description": product.description,
                "image_url": product.image_url,
            }
        )
    return catalogue
|
|
|
|
@app.get("/api/student/exam-registrations")
def get_student_exam_registrations(email: str, db: Session = Depends(get_db)):
    """Get exam registrations for a student by email.

    Each registration is returned together with its exam and batch details.
    Registrations whose exam or batch row cannot be found are omitted.
    """
    registrations = db.query(ExamStudentRegistration).filter(
        ExamStudentRegistration.student_email == email
    ).all()

    # Fetch the referenced exams and batches with one query each instead of
    # two queries per registration.
    exam_ids = list({reg.exam_id for reg in registrations if reg.exam_id is not None})
    batch_ids = list({reg.batch_id for reg in registrations if reg.batch_id is not None})

    exams_by_id = {}
    if exam_ids:
        exams_by_id = {
            exam.id: exam
            for exam in db.query(Exam).filter(Exam.id.in_(exam_ids)).all()
        }

    batches_by_id = {}
    if batch_ids:
        batches_by_id = {
            batch.id: batch
            for batch in db.query(ExamBatch).filter(ExamBatch.id.in_(batch_ids)).all()
        }

    result = []
    for reg in registrations:
        exam = exams_by_id.get(reg.exam_id)
        batch = batches_by_id.get(reg.batch_id)
        if not (exam and batch):
            continue

        result.append({
            "id": reg.id,
            "exam_id": reg.exam_id,
            "batch_id": reg.batch_id,
            "student_id": reg.student_id,
            "student_name": reg.student_name,
            "student_email": reg.student_email,
            "current_level": reg.current_level,
            "target_level": reg.target_level,
            "registration_status": reg.registration_status,
            "payment_status": reg.payment_status,
            "notification_sent": reg.notification_sent,
            "added_by_ai": reg.added_by_ai,
            "created_at": reg.created_at.isoformat() if reg.created_at else None,
            "exam": {
                "id": exam.id,
                "name": exam.name,
                "exam_date": exam.exam_date.isoformat() if exam.exam_date else None,
                "exam_fee": float(exam.exam_fee),
                "registration_deadline": exam.registration_deadline.isoformat() if exam.registration_deadline else None,
                "stripe_link": exam.stripe_link,
                "location": exam.location,
                "description": exam.description,
                "status": exam.status,
            },
            "batch": {
                "id": batch.id,
                "batch_name": batch.batch_name,
                "time_from": batch.time_from.strftime("%H:%M:%S") if batch.time_from else None,
                "time_to": batch.time_to.strftime("%H:%M:%S") if batch.time_to else None,
                "max_students": batch.max_students,
            }
        })

    return result
|
|
|
|
| |
|
|
| @app.post("/api/stripe/webhook") |
| async def stripe_webhook(request: Request, db: Session = Depends(get_db)): |
| payload = await request.body() |
| sig_header = request.headers.get("stripe-signature") |
|
|
| try: |
| if settings.STRIPE_WEBHOOK_SECRET: |
| event = stripe.Webhook.construct_event( |
| payload=payload, |
| sig_header=sig_header, |
| secret=settings.STRIPE_WEBHOOK_SECRET, |
| ) |
| else: |
| event = stripe.Event.construct_from( |
| await request.json(), stripe.api_key |
| ) |
| except ValueError: |
| raise HTTPException(status_code=400, detail="Invalid payload") |
| except stripe.error.SignatureVerificationError: |
| raise HTTPException(status_code=400, detail="Invalid signature") |
| |
| |
| event_type = event.get("type") |
| print(f"[webhook] Received event: {event_type}") |
| |
| data = event["data"]["object"] |
|
|
| if event_type == "checkout.session.completed": |
| customer_details = data.get("customer_details", {}) or {} |
| email = customer_details.get("email") |
| name = customer_details.get("name") or "Student" |
| |
| |
| if not email: |
| email = data.get("customer_email") or data.get("email") |
| |
| print(f"[webhook] checkout.session.completed - email={email}, name={name}") |
|
|
| metadata = data.get("metadata", {}) or {} |
|
|
| |
| invite_token = metadata.get("invite_token") |
| plan_name = metadata.get("plan_name") |
|
|
| |
| exam_registration_id = metadata.get("exam_registration_id") |
| if exam_registration_id: |
| try: |
| reg_id = int(exam_registration_id) |
| registration = db.query(ExamStudentRegistration).filter( |
| ExamStudentRegistration.id == reg_id |
| ).first() |
| if registration: |
| |
| registration.payment_status = "paid" |
| registration.registration_status = "registered" |
| db.commit() |
| print(f"[webhook] Exam payment processed via metadata: registration_id={reg_id}") |
| return {"received": True, "type": "exam_payment"} |
| except (ValueError, TypeError) as e: |
| print(f"[webhook] Error processing exam_registration_id: {e}") |
| pass |
| |
| |
| |
| |
| |
| if not invite_token and not plan_name: |
| amount_total = data.get("amount_total") |
| if amount_total and email: |
| amount_decimal = float(amount_total) / 100.0 |
| print(f"[webhook] Checking exam payment: email={email}, amount={amount_decimal}") |
| |
| |
| |
| |
| pending_regs = ( |
| db.query(ExamStudentRegistration) |
| .join(Exam) |
| .filter( |
| func.lower(ExamStudentRegistration.student_email) == func.lower(email), |
| ExamStudentRegistration.payment_status.in_(["pending", "unpaid"]), |
| ExamStudentRegistration.registration_status.in_(["invited", "pending"]), |
| func.abs(Exam.exam_fee - amount_decimal) < 0.01 |
| ) |
| .order_by(ExamStudentRegistration.created_at.desc()) |
| .all() |
| ) |
| |
| if pending_regs: |
| |
| registration = pending_regs[0] |
| registration.payment_status = "paid" |
| registration.registration_status = "registered" |
| db.commit() |
| print(f"[webhook] Exam payment processed via amount/email match: registration_id={registration.id}, email={email}, amount={amount_decimal}") |
| return {"received": True, "type": "exam_payment"} |
| else: |
| print(f"[webhook] No matching exam registration found with exact amount: email={email}, amount={amount_decimal}") |
| |
| |
| pending_by_email = ( |
| db.query(ExamStudentRegistration) |
| .join(Exam) |
| .filter( |
| func.lower(ExamStudentRegistration.student_email) == func.lower(email), |
| ExamStudentRegistration.payment_status.in_(["pending", "unpaid"]), |
| ExamStudentRegistration.registration_status.in_(["invited", "pending"]) |
| ) |
| .order_by(ExamStudentRegistration.created_at.desc()) |
| .all() |
| ) |
| if pending_by_email: |
| |
| registration = pending_by_email[0] |
| exam_fee = registration.exam.exam_fee if registration.exam else None |
| registration.payment_status = "paid" |
| registration.registration_status = "registered" |
| db.commit() |
| print(f"[webhook] Exam payment processed via email match (amount mismatch): registration_id={registration.id}, email={email}, expected_amount={amount_decimal}, exam_fee={exam_fee}") |
| return {"received": True, "type": "exam_payment"} |
| else: |
| print(f"[webhook] No unpaid exam registrations found for email: {email}") |
|
|
| |
| reactivation_type = metadata.get("type") == "reactivation" |
| if reactivation_type: |
| membership_id = metadata.get("membership_id") |
| start_date_str = metadata.get("start_date") |
| |
| if membership_id and start_date_str: |
| try: |
| membership = db.query(Membership).filter(Membership.id == int(membership_id)).first() |
| if membership and membership.status == "cancelled": |
| |
| start_date = datetime.fromisoformat(start_date_str.replace('Z', '+00:00')).date() |
| membership.status = "active" |
| membership.start_date = start_date |
| membership.cancelled_at = None |
| |
| |
| if membership.billing_period == "monthly": |
| membership.renewal_date = start_date + timedelta(days=30) |
| elif membership.billing_period == "quarterly": |
| membership.renewal_date = start_date + timedelta(days=90) |
| elif membership.billing_period == "half_yearly": |
| membership.renewal_date = start_date + timedelta(days=182) |
| elif membership.billing_period == "annual": |
| membership.renewal_date = start_date + timedelta(days=365) |
| else: |
| membership.renewal_date = start_date + timedelta(days=30) |
| |
| db.commit() |
| db.refresh(membership) |
| print(f"[webhook] Membership reactivated: membership_id={membership_id}, start_date={start_date}") |
| return {"received": True, "type": "reactivation"} |
| except (ValueError, TypeError) as e: |
| print(f"[webhook] Error processing reactivation: {e}") |
| pass |
| |
| |
| direct_subscription = metadata.get("direct_subscription") == "true" |
| if direct_subscription: |
| plan_id = metadata.get("plan_id") |
| if plan_id: |
| try: |
| plan = db.query(MembershipPlan).filter(MembershipPlan.id == int(plan_id)).first() |
| if plan: |
| |
| start = date.today() |
| if plan.billing_period == "monthly": |
| renewal = start + timedelta(days=30) |
| elif plan.billing_period == "quarterly": |
| renewal = start + timedelta(days=90) |
| elif plan.billing_period == "half_yearly": |
| renewal = start + timedelta(days=182) |
| elif plan.billing_period == "annual": |
| renewal = start + timedelta(days=365) |
| else: |
| renewal = start + timedelta(days=30) |
| |
| membership = Membership( |
| user_email=email, |
| user_name=name, |
| plan_id=plan.id, |
| plan_name=plan.name, |
| status="active", |
| start_date=start, |
| renewal_date=renewal, |
| price=float(plan.price), |
| billing_period=plan.billing_period, |
| max_students=plan.max_students, |
| ) |
| db.add(membership) |
| db.commit() |
| db.refresh(membership) |
| |
| |
| stored_data = plan_subscription_data.get(email, {}).get(int(plan_id)) |
| if stored_data: |
| member_details = stored_data.get("member_details", {}) |
| students_data = stored_data.get("students", []) |
| |
| |
| if member_details: |
| membership.member_first_name = member_details.get("first_name", "") |
| membership.member_last_name = member_details.get("last_name", "") |
| membership.phone_number = member_details.get("phone_number") |
| membership.emergency_contact_person = member_details.get("emergency_contact_person") |
| membership.emergency_contact_number = member_details.get("emergency_contact_number") |
| |
| |
| for student_data in students_data: |
| student = Student( |
| membership_id=membership.id, |
| first_name=student_data.get("first_name", ""), |
| last_name=student_data.get("last_name", ""), |
| date_of_birth=datetime.strptime(student_data["date_of_birth"], "%Y-%m-%d").date() if student_data.get("date_of_birth") else None, |
| gender=student_data.get("gender"), |
| medical_notes=student_data.get("medical_notes"), |
| ) |
| db.add(student) |
| |
| db.commit() |
| |
| if email in plan_subscription_data and int(plan_id) in plan_subscription_data[email]: |
| del plan_subscription_data[email][int(plan_id)] |
| if not plan_subscription_data[email]: |
| del plan_subscription_data[email] |
| print(f"[webhook] Direct subscription payment processed with students: membership_id={membership.id}, email={email}, plan={plan.name}, students_count={len(students_data)}") |
| else: |
| print(f"[webhook] Direct subscription payment processed (no student data): membership_id={membership.id}, email={email}, plan={plan.name}") |
| |
| return {"received": True, "type": "direct_subscription"} |
| except (ValueError, TypeError) as e: |
| print(f"[webhook] Error processing direct subscription: {e}") |
| pass |
| |
| |
| |
| if not plan_name: |
| plan_name = "Unknown Plan" |
| |
| billing_period = metadata.get("billing_period", "monthly") |
| price_str = metadata.get("price", "0") |
|
|
| try: |
| price_value = float(price_str) |
| except ValueError: |
| price_value = 0.0 |
|
|
| plan = ( |
| db.query(MembershipPlan) |
| .filter(MembershipPlan.name == plan_name) |
| .first() |
| ) |
|
|
| start = date.today() |
| if billing_period == "monthly": |
| renewal = start + timedelta(days=30) |
| elif billing_period == "quarterly": |
| renewal = start + timedelta(days=90) |
| elif billing_period == "half_yearly": |
| renewal = start + timedelta(days=182) |
| elif billing_period == "annual": |
| renewal = start + timedelta(days=365) |
| else: |
| renewal = start + timedelta(days=30) |
|
|
| membership = Membership( |
| user_email=email, |
| user_name=name, |
| plan_id=plan.id if plan else None, |
| plan_name=plan_name, |
| status="active", |
| start_date=start, |
| renewal_date=renewal, |
| price=price_value, |
| billing_period=billing_period, |
| max_students=plan.max_students if plan else 1, |
| ) |
| db.add(membership) |
| db.commit() |
| db.refresh(membership) |
| |
| |
| |
| invite = None |
| if invite_token: |
| invite = db.query(MembershipInvite).filter( |
| MembershipInvite.invite_token == invite_token |
| ).first() |
| else: |
| |
| invite = db.query(MembershipInvite).filter( |
| MembershipInvite.email == email, |
| MembershipInvite.status == "pending" |
| ).order_by(MembershipInvite.created_at.desc()).first() |
| |
| if invite and invite.students_data: |
| |
| if isinstance(invite.students_data, dict) and "member_details" in invite.students_data: |
| |
| member_details = invite.students_data.get("member_details", {}) |
| students_list = invite.students_data.get("students", []) |
| |
| |
| membership.member_first_name = member_details.get("first_name") |
| membership.member_last_name = member_details.get("last_name") |
| membership.phone_number = member_details.get("phone_number") |
| membership.emergency_contact_person = member_details.get("emergency_contact_person") |
| membership.emergency_contact_number = member_details.get("emergency_contact_number") |
| |
| if member_details.get("first_name") and member_details.get("last_name"): |
| membership.user_name = f"{member_details.get('first_name')} {member_details.get('last_name')}" |
| |
| |
| for student_data in students_list: |
| |
| date_of_birth = None |
| if student_data.get("date_of_birth"): |
| try: |
| date_of_birth = datetime.strptime(student_data["date_of_birth"], "%Y-%m-%d").date() |
| except: |
| pass |
| |
| student = Student( |
| membership_id=membership.id, |
| first_name=student_data.get("first_name", ""), |
| last_name=student_data.get("last_name", ""), |
| date_of_birth=date_of_birth, |
| gender=student_data.get("gender"), |
| medical_notes=student_data.get("medical_notes"), |
| ) |
| db.add(student) |
| else: |
| |
| students_list = invite.students_data if isinstance(invite.students_data, list) else [] |
| for student_data in students_list: |
| |
| name = student_data.get("name", "") |
| first_name = student_data.get("first_name", name.split()[0] if name else "") |
| last_name = student_data.get("last_name", " ".join(name.split()[1:]) if len(name.split()) > 1 else "") |
| |
| |
| date_of_birth = None |
| if student_data.get("date_of_birth"): |
| try: |
| date_of_birth = datetime.strptime(student_data["date_of_birth"], "%Y-%m-%d").date() |
| except: |
| pass |
| |
| student = Student( |
| membership_id=membership.id, |
| first_name=first_name, |
| last_name=last_name, |
| date_of_birth=date_of_birth, |
| gender=student_data.get("gender"), |
| medical_notes=student_data.get("medical_notes"), |
| ) |
| db.add(student) |
| |
| |
| invite.status = "completed" |
| invite.completed_at = datetime.utcnow() |
| db.commit() |
|
|
| return {"received": True} |
|
|
|
|
| |
|
|
@app.get("/api/admin/invites")
def get_admin_invites(db: Session = Depends(get_db)):
    """Return every membership invite as a list of dicts, newest first.

    Invites are ordered by ``created_at`` descending. Date/time columns are
    serialised with ``isoformat()``; unset ones are returned as ``None``.
    """

    def _iso(value):
        # Date/time columns may be unset; pass those through as None.
        return value.isoformat() if value else None

    rows = (
        db.query(MembershipInvite)
        .order_by(MembershipInvite.created_at.desc())
        .all()
    )

    payload = []
    for row in rows:
        payload.append(
            {
                "id": row.id,
                "email": row.email,
                "plan_id": row.plan_id,
                "plan_name": row.plan_name,
                "plan_price": row.plan_price,
                "max_students": row.max_students,
                "class_details": row.class_details,
                "status": row.status,
                "invite_token": row.invite_token,
                "invited_by": row.invited_by,
                "invite_date": _iso(row.invite_date),
                "completed_at": _iso(row.completed_at),
                "created_at": _iso(row.created_at),
            }
        )
    return payload
|
|
|
|
@app.delete("/api/admin/invites/{invite_id}")
def delete_invite(invite_id: int, db: Session = Depends(get_db)):
    """Delete the membership invite identified by ``invite_id``.

    Raises:
        HTTPException: 404 when no invite has that id.
    """
    target = (
        db.query(MembershipInvite)
        .filter(MembershipInvite.id == invite_id)
        .first()
    )
    if not target:
        raise HTTPException(status_code=404, detail="Invite not found")

    db.delete(target)
    db.commit()

    response = {"status": "ok", "message": "Invite deleted"}
    return response
|
|
|
|
@app.post("/api/admin/invites")
def create_membership_invite(invite: dict, db: Session = Depends(get_db)):
    """Create a pending membership invite and email it to the invitee.

    ``invite`` must carry ``email`` and ``plan_id``. ``class_details`` and
    ``invited_by`` (defaults to ``"admin"``) are optional.

    The invite row is committed before the email is attempted; a failure to
    send the email is logged and does not fail the request.

    Raises:
        HTTPException: 400 when ``email`` is missing or blank, 404 when the
            referenced plan does not exist.
    """
    # Validate up front so a missing key yields a client error instead of
    # an unhandled KeyError (HTTP 500).
    email = invite.get("email")
    if not isinstance(email, str) or not email.strip():
        raise HTTPException(status_code=400, detail="Email is required")

    # Unguessable token used by the public /api/invite/{token} endpoints.
    token = secrets.token_urlsafe(32)

    plan = db.query(MembershipPlan).filter(MembershipPlan.id == invite.get("plan_id")).first()
    if not plan:
        raise HTTPException(status_code=404, detail="Plan not found")

    # Plan name/price/max_students are copied onto the invite row.
    new_invite = MembershipInvite(
        email=email,
        plan_id=plan.id,
        plan_name=plan.name,
        plan_price=str(float(plan.price)),
        max_students=plan.max_students,
        class_details=invite.get("class_details"),
        status="pending",
        invite_token=token,
        invited_by=invite.get("invited_by", "admin"),
    )
    db.add(new_invite)
    db.commit()
    db.refresh(new_invite)

    # Best-effort notification: the invite already exists, so any error while
    # building or sending the email is only logged.
    try:
        base_url = os.getenv("BASE_URL", "http://localhost:7860")
        dashboard_url = f"{base_url}/student"

        subject = f"Membership Invitation - {plan.name}"
        body_lines = [
            "Hello,",
            "",
            f"You've been invited to join our Karate Dojo with the {plan.name} membership plan.",
            "",
            "Plan Details:",
            f"- Price: ${plan.price}/{plan.billing_period}",
            f"- Maximum Students: {plan.max_students}",
            "",
        ]

        if invite.get("class_details"):
            body_lines.extend([
                "Classes Included:",
                invite.get("class_details"),
                "",
            ])

        body_lines.extend([
            "To complete your membership and add your students, please:",
            "1. Log in to the student dashboard using this email address:",
            f" {new_invite.email}",
            "",
            "2. Click here to access your dashboard:",
            dashboard_url,
            "",
            "3. You'll see your pending invite on the dashboard",
            "4. Click on it to add student information and complete payment",
            "",
            "If you have any questions, please contact us.",
            "",
            "See you on the mat!",
        ])

        body = "\n".join(body_lines)
        send_email(new_invite.email, subject, body)
    except Exception as e:
        print(f"[create_invite] Failed to send invite email: {e}")

    return {
        "id": new_invite.id,
        "email": new_invite.email,
        "plan_id": new_invite.plan_id,
        "plan_name": new_invite.plan_name,
        "plan_price": new_invite.plan_price,
        "max_students": new_invite.max_students,
        "class_details": new_invite.class_details,
        "status": new_invite.status,
        "invite_token": new_invite.invite_token,
        "invited_by": new_invite.invited_by,
        "invite_date": new_invite.invite_date.isoformat() if new_invite.invite_date else None,
    }
|
|
|
|
@app.delete("/api/admin/invites/{invite_id}")
def delete_membership_invite(invite_id: int, db: Session = Depends(get_db)):
    """Delete a membership invite by id; 404 if it does not exist.

    NOTE(review): ``delete_invite`` earlier in this file registers the same
    method and path. Presumably only the earlier registration ever receives
    requests -- confirm and consolidate the two handlers.
    """
    lookup = db.query(MembershipInvite).filter(MembershipInvite.id == invite_id)
    doomed = lookup.first()
    if not doomed:
        raise HTTPException(status_code=404, detail="Invite not found")

    db.delete(doomed)
    db.commit()
    return {"status": "deleted"}
|
|
|
|
| |
|
|
@app.get("/api/invite/id/{invite_id}")
def get_invite_by_id(invite_id: int, db: Session = Depends(get_db)):
    """Return one invite (looked up by numeric id) plus plan billing info.

    ``billing_period`` falls back to ``"monthly"`` and ``stripe_link`` to
    ``None`` when the invite has no plan or the plan row is not found.

    Raises:
        HTTPException: 404 when the invite does not exist.
    """
    record = (
        db.query(MembershipInvite)
        .filter(MembershipInvite.id == invite_id)
        .first()
    )
    if not record:
        raise HTTPException(status_code=404, detail="Invite not found")

    # Only look the plan up when the invite actually references one.
    linked_plan = None
    if record.plan_id:
        linked_plan = (
            db.query(MembershipPlan)
            .filter(MembershipPlan.id == record.plan_id)
            .first()
        )

    if linked_plan:
        billing_period = linked_plan.billing_period
        stripe_link = linked_plan.stripe_link
    else:
        billing_period = "monthly"
        stripe_link = None

    invite_date = record.invite_date.isoformat() if record.invite_date else None

    return {
        "id": record.id,
        "email": record.email,
        "plan_id": record.plan_id,
        "plan_name": record.plan_name,
        "plan_price": record.plan_price,
        "max_students": record.max_students,
        "class_details": record.class_details,
        "status": record.status,
        "invite_token": record.invite_token,
        "invited_by": record.invited_by,
        "invite_date": invite_date,
        "billing_period": billing_period,
        "stripe_link": stripe_link,
    }
|
|
|
|
@app.get("/api/invite/{token}")
def get_invite_by_token(token: str, db: Session = Depends(get_db)):
    """Return the details of a pending invite, looked up by its token.

    Raises:
        HTTPException: 404 when the token matches no invite, 400 when the
            invite's status is anything other than ``"pending"``.
    """
    record = (
        db.query(MembershipInvite)
        .filter(MembershipInvite.invite_token == token)
        .first()
    )
    if not record:
        raise HTTPException(status_code=404, detail="Invite not found")
    if record.status != "pending":
        raise HTTPException(status_code=400, detail="This invite has already been used or expired")

    linked_plan = (
        db.query(MembershipPlan)
        .filter(MembershipPlan.id == record.plan_id)
        .first()
    )
    # The payment link lives on the plan row; absent plan -> no link.
    stripe_link = linked_plan.stripe_link if linked_plan else None

    return {
        "id": record.id,
        "email": record.email,
        "plan_id": record.plan_id,
        "plan_name": record.plan_name,
        "plan_price": record.plan_price,
        "max_students": record.max_students,
        "class_details": record.class_details,
        "stripe_link": stripe_link,
        "invite_token": record.invite_token,
    }
|
|
|
|
@app.post("/api/invite/{token}/students")
def save_invite_students(token: str, payload: dict, db: Session = Depends(get_db)):
    """Store member details and the student list on a pending invite.

    The data is saved to ``invite.students_data`` as a dict with the keys
    ``member_details`` and ``students``; the Stripe webhook handler earlier
    in this file reads that structure back when it creates the membership.

    Raises:
        HTTPException: 404 for an unknown token; 400 when the invite is not
            pending, no students are supplied, more students than
            ``invite.max_students`` are supplied, or the member's first or
            last name is blank.
    """
    record = (
        db.query(MembershipInvite)
        .filter(MembershipInvite.invite_token == token)
        .first()
    )
    if not record:
        raise HTTPException(status_code=404, detail="Invite not found")
    if record.status != "pending":
        raise HTTPException(status_code=400, detail="This invite has already been used")

    member_details = payload.get("member_details", {})
    students_data = payload.get("students", [])

    if not students_data:
        raise HTTPException(status_code=400, detail="At least one student is required")

    student_total = len(students_data)
    if student_total > record.max_students:
        raise HTTPException(status_code=400, detail=f"Maximum {record.max_students} students allowed")

    # Both names must be non-empty after trimming whitespace.
    raw_first = member_details.get("first_name")
    raw_last = member_details.get("last_name")
    first_name = raw_first.strip() if raw_first else ""
    last_name = raw_last.strip() if raw_last else ""
    if not (first_name and last_name):
        raise HTTPException(status_code=400, detail="Member first name and last name are required")

    # The submitted (untrimmed) member details are stored as received.
    record.students_data = {
        "member_details": member_details,
        "students": students_data,
    }
    db.commit()

    return {
        "status": "ok",
        "students_count": student_total,
        "message": "Student information saved. Proceed to payment.",
    }
|
|
|
|
@app.post("/api/invite/{token}/checkout")
def create_checkout_session(token: str, db: Session = Depends(get_db)):
    """Create a Stripe Checkout session for a pending membership invite.

    The invite token and plan attributes are attached as session metadata so
    the payment can be tied back to the invite afterwards.

    Plans billed ``monthly``, ``half_yearly`` or ``annual`` are created as
    subscriptions; any other billing period is charged as a one-off payment.
    NOTE(review): the webhook handler earlier in this file treats
    ``quarterly`` as a billing period, yet here it falls into the one-off
    branch -- confirm that is intended.

    Returns:
        dict with ``checkout_url`` and ``session_id``.

    Raises:
        HTTPException: 404 for an unknown token, 400 when the invite is not
            pending or its plan is missing, 500 when Stripe session creation
            fails.
    """
    invite = db.query(MembershipInvite).filter(MembershipInvite.invite_token == token).first()
    if not invite:
        raise HTTPException(status_code=404, detail="Invite not found")

    if invite.status != "pending":
        raise HTTPException(status_code=400, detail="This invite has already been used")

    plan = db.query(MembershipPlan).filter(MembershipPlan.id == invite.plan_id).first()
    if not plan:
        raise HTTPException(status_code=400, detail="Plan not found")

    # BASE_URL may be configured without a scheme; default to https then.
    base_url = os.getenv("BASE_URL", "http://localhost:7860")
    if not base_url.startswith("http"):
        base_url = f"https://{base_url}"

    # Round instead of truncating: float(19.99) * 100 is 1998.999..., which
    # int() alone would turn into 1998 cents and under-charge by one cent.
    price_cents = int(round(float(plan.price) * 100))

    is_recurring = plan.billing_period in ["monthly", "half_yearly", "annual"]

    try:
        # half_yearly is expressed as a 6-month interval; annual as 1 year.
        recurring = {
            "interval": "month" if plan.billing_period in ["monthly", "half_yearly"] else "year",
            "interval_count": 6 if plan.billing_period == "half_yearly" else 1,
        } if is_recurring else None

        checkout_session = stripe.checkout.Session.create(
            payment_method_types=["card"],
            line_items=[
                {
                    "price_data": {
                        "currency": "cad",
                        "product_data": {
                            "name": f"{plan.name} Membership",
                            "description": plan.description or f"{plan.billing_period.capitalize()} membership plan",
                        },
                        "unit_amount": price_cents,
                        "recurring": recurring,
                    },
                    "quantity": 1,
                }
            ],
            mode="subscription" if is_recurring else "payment",
            success_url=f"{base_url}/student?payment=success",
            cancel_url=f"{base_url}/student?payment=cancelled",
            customer_email=invite.email,
            metadata={
                "invite_token": token,
                "plan_name": plan.name,
                "plan_id": str(plan.id),
                "billing_period": plan.billing_period,
                "price": str(plan.price),
                "max_students": str(plan.max_students),
            },
        )

        return {"checkout_url": checkout_session.url, "session_id": checkout_session.id}
    except Exception as e:
        print(f"[checkout] Error creating Stripe session: {e}")
        raise HTTPException(status_code=500, detail=f"Failed to create payment session: {str(e)}")
|
|
|
|
@app.post("/api/exam/{registration_id}/checkout")
def create_exam_checkout_session(registration_id: int, db: Session = Depends(get_db)):
    """Create a one-off Stripe Checkout session for an exam registration.

    The registration, exam and batch ids are attached as session metadata so
    the payment can be matched to the registration afterwards.

    Returns:
        dict with ``checkout_url`` and ``session_id``.

    Raises:
        HTTPException: 404 when the registration or its exam is missing, 400
            when the registration is already paid, 500 when Stripe session
            creation fails.
    """
    registration = db.query(ExamStudentRegistration).filter(
        ExamStudentRegistration.id == registration_id
    ).first()

    if not registration:
        raise HTTPException(status_code=404, detail="Exam registration not found")

    if registration.payment_status == "paid":
        raise HTTPException(status_code=400, detail="This registration has already been paid")

    exam = db.query(Exam).filter(Exam.id == registration.exam_id).first()
    if not exam:
        raise HTTPException(status_code=404, detail="Exam not found")

    # The batch only contributes a display name; fall back to a generic one.
    batch = db.query(ExamBatch).filter(ExamBatch.id == registration.batch_id).first()
    batch_name = batch.batch_name if batch else "Exam"

    # BASE_URL may be configured without a scheme; default to https then.
    base_url = os.getenv("BASE_URL", "http://localhost:7860")
    if not base_url.startswith("http"):
        base_url = f"https://{base_url}"

    # Round instead of truncating: float(19.99) * 100 is 1998.999..., which
    # int() alone would turn into 1998 cents and under-charge by one cent.
    exam_fee_cents = int(round(float(exam.exam_fee) * 100))

    try:
        checkout_session = stripe.checkout.Session.create(
            payment_method_types=["card"],
            line_items=[
                {
                    "price_data": {
                        "currency": "cad",
                        "product_data": {
                            "name": f"{exam.name} - {batch_name}",
                            "description": f"Exam registration for {registration.student_name}",
                        },
                        "unit_amount": exam_fee_cents,
                    },
                    "quantity": 1,
                }
            ],
            mode="payment",
            success_url=f"{base_url}/student/exams?payment=success",
            cancel_url=f"{base_url}/student/exams?payment=cancelled",
            customer_email=registration.student_email,
            metadata={
                "exam_registration_id": str(registration_id),
                "exam_id": str(exam.id),
                "batch_id": str(registration.batch_id),
                "student_email": registration.student_email,
                "student_name": registration.student_name,
            },
        )

        return {"checkout_url": checkout_session.url, "session_id": checkout_session.id}
    except Exception as e:
        print(f"[checkout] Error creating exam Stripe session: {e}")
        raise HTTPException(status_code=500, detail=f"Failed to create payment session: {str(e)}")
|
|
|
|
| |
| |
| |
# In-memory holding area for direct-subscription sign-ups awaiting payment,
# keyed as plan_subscription_data[email][plan_id]. The Stripe webhook handler
# earlier in this file reads an entry after payment and then deletes it.
# Being a plain module-level dict, its contents live only in this process.
plan_subscription_data = {}


@app.post("/api/plan/{plan_id}/students")
def save_plan_students(plan_id: int, email: str, payload: dict, db: Session = Depends(get_db)):
    """Stash member details and students for a direct plan sign-up.

    Nothing is written to the database here; the submission is kept in
    ``plan_subscription_data`` under ``[email][plan_id]``, replacing any
    earlier submission for the same pair.

    Raises:
        HTTPException: 404 when the plan does not exist; 400 when the plan is
            inactive, no students are supplied, more students than
            ``plan.max_students`` are supplied, or the member's first or last
            name is blank.
    """
    target_plan = db.query(MembershipPlan).filter(MembershipPlan.id == plan_id).first()
    if not target_plan:
        raise HTTPException(status_code=404, detail="Plan not found")
    if not target_plan.is_active:
        raise HTTPException(status_code=400, detail="This plan is not active")

    member_details = payload.get("member_details", {})
    students_data = payload.get("students", [])

    if not students_data:
        raise HTTPException(status_code=400, detail="At least one student is required")

    student_total = len(students_data)
    if student_total > target_plan.max_students:
        raise HTTPException(status_code=400, detail=f"Maximum {target_plan.max_students} students allowed")

    # Both names must be non-empty after trimming whitespace.
    raw_first = member_details.get("first_name")
    raw_last = member_details.get("last_name")
    first_name = raw_first.strip() if raw_first else ""
    last_name = raw_last.strip() if raw_last else ""
    if not (first_name and last_name):
        raise HTTPException(status_code=400, detail="Member first name and last name are required")

    # Create the per-email bucket on first use, then (over)write this plan.
    plan_subscription_data.setdefault(email, {})[plan_id] = {
        "member_details": member_details,
        "students": students_data,
        "plan_id": plan_id,
        "email": email,
    }

    return {
        "status": "ok",
        "students_count": student_total,
        "message": "Student information saved. Proceed to payment.",
    }
|
|
|
|
@app.post("/api/plan/{plan_id}/checkout")
def create_plan_checkout_session(plan_id: int, email: str = Query(..., description="Student email"), db: Session = Depends(get_db)):
    """Create a Stripe Checkout session for a direct (invite-less) sign-up.

    The session metadata carries ``direct_subscription="true"`` together with
    the plan attributes; the webhook handler earlier in this file checks that
    flag to create the membership.

    Plans billed ``monthly``, ``half_yearly`` or ``annual`` are created as
    subscriptions; any other billing period is charged as a one-off payment.
    NOTE(review): the webhook handler treats ``quarterly`` as a billing
    period, yet here it falls into the one-off branch -- confirm that is
    intended.

    Returns:
        dict with ``checkout_url`` and ``session_id``.

    Raises:
        HTTPException: 404 when the plan does not exist, 400 when it is
            inactive, 500 when Stripe session creation fails.
    """
    plan = db.query(MembershipPlan).filter(MembershipPlan.id == plan_id).first()

    if not plan:
        raise HTTPException(status_code=404, detail="Plan not found")

    if not plan.is_active:
        raise HTTPException(status_code=400, detail="This plan is not active")

    # BASE_URL may be configured without a scheme; default to https then.
    base_url = os.getenv("BASE_URL", "http://localhost:7860")
    if not base_url.startswith("http"):
        base_url = f"https://{base_url}"

    # Round instead of truncating: float(19.99) * 100 is 1998.999..., which
    # int() alone would turn into 1998 cents and under-charge by one cent.
    price_cents = int(round(float(plan.price) * 100))

    is_recurring = plan.billing_period in ["monthly", "half_yearly", "annual"]

    try:
        # half_yearly is expressed as a 6-month interval; annual as 1 year.
        recurring = {
            "interval": "month" if plan.billing_period in ["monthly", "half_yearly"] else "year",
            "interval_count": 6 if plan.billing_period == "half_yearly" else 1,
        } if is_recurring else None

        checkout_session = stripe.checkout.Session.create(
            payment_method_types=["card"],
            line_items=[
                {
                    "price_data": {
                        "currency": "cad",
                        "product_data": {
                            "name": f"{plan.name} Membership",
                            "description": plan.description or f"{plan.billing_period.capitalize()} membership plan",
                        },
                        "unit_amount": price_cents,
                        "recurring": recurring,
                    },
                    "quantity": 1,
                }
            ],
            mode="subscription" if is_recurring else "payment",
            success_url=f"{base_url}/student?payment=success",
            cancel_url=f"{base_url}/student?payment=cancelled",
            customer_email=email,
            metadata={
                "plan_id": str(plan.id),
                "plan_name": plan.name,
                "billing_period": plan.billing_period,
                "price": str(plan.price),
                "max_students": str(plan.max_students),
                "direct_subscription": "true",
            },
        )

        return {"checkout_url": checkout_session.url, "session_id": checkout_session.id}
    except Exception as e:
        print(f"[checkout] Error creating plan Stripe session: {e}")
        raise HTTPException(status_code=500, detail=f"Failed to create payment session: {str(e)}")
|
|
|
|
| |
|
|
@app.get("/api/admin/students")
def get_admin_students(membership_id: int = None, db: Session = Depends(get_db)):
    """Return all students, optionally filtered by membership, with owner info.

    Each entry carries the student's own columns plus ``membership_email``,
    ``membership_name`` and ``membership_plan_name`` taken from the linked
    membership (``None`` when the student has no matching membership row).

    Args:
        membership_id: When truthy, only students of this membership are
            returned.
    """
    # One outer join instead of a separate Membership query per student.
    # Students without a matching membership are still returned, paired
    # with None.
    query = db.query(Student, Membership).outerjoin(
        Membership, Membership.id == Student.membership_id
    )
    if membership_id:
        query = query.filter(Student.membership_id == membership_id)

    result = []
    for s, membership in query.all():
        result.append({
            "id": s.id,
            "membership_id": s.membership_id,
            "first_name": s.first_name,
            "last_name": s.last_name,
            "date_of_birth": s.date_of_birth.isoformat() if s.date_of_birth else None,
            "gender": s.gender,
            "medical_notes": s.medical_notes,
            "created_at": s.created_at.isoformat() if s.created_at else None,
            "membership_email": membership.user_email if membership else None,
            "membership_name": membership.user_name if membership else None,
            "membership_plan_name": membership.plan_name if membership else None,
        })
    return result
|
|
|
|
@app.get("/api/admin/students/{student_id}/comments")
def get_admin_student_comments(student_id: int, db: Session = Depends(get_db)):
    """Return all comments on a student, newest first (admin view).

    ``coach_name`` is derived from the local part of ``coach_email``: dots
    become spaces and the result is title-cased.

    Raises:
        HTTPException: 404 when the student does not exist.
    """
    student = db.query(Student).filter(Student.id == student_id).first()
    if not student:
        raise HTTPException(status_code=404, detail="Student not found")

    comments = db.query(StudentComment).filter(
        StudentComment.student_id == student_id
    ).order_by(StudentComment.created_at.desc()).all()

    comments_list = []
    for c in comments:
        # Tolerate rows with no coach email or timestamp instead of failing
        # the whole listing with an AttributeError.
        coach_email = c.coach_email or ""
        coach_name = coach_email.split("@")[0].replace(".", " ").title()
        comments_list.append({
            "id": c.id,
            "coach_email": c.coach_email,
            "coach_name": coach_name,
            "comment": c.comment,
            "created_at": c.created_at.isoformat() if c.created_at else None,
        })

    return comments_list
|
|
|
|
@app.post("/api/admin/students")
def create_student(student: dict, db: Session = Depends(get_db)):
    """Create a student under an existing membership.

    ``student`` must contain ``membership_id``, ``first_name`` and
    ``last_name``. ``date_of_birth`` (``YYYY-MM-DD``), ``gender`` and
    ``medical_notes`` are optional. A date of birth that cannot be parsed is
    ignored and stored as ``None``.

    Raises:
        HTTPException: 400 when a required field is absent.
    """
    # Report missing keys as a client error rather than letting the KeyError
    # surface as an HTTP 500.
    missing = [
        field
        for field in ("membership_id", "first_name", "last_name")
        if field not in student
    ]
    if missing:
        raise HTTPException(
            status_code=400,
            detail=f"Missing required field(s): {', '.join(missing)}",
        )

    date_of_birth = None
    if student.get("date_of_birth"):
        try:
            date_of_birth = datetime.strptime(student["date_of_birth"], "%Y-%m-%d").date()
        except (ValueError, TypeError):
            # Best effort: a malformed or non-string date is dropped.
            pass

    new_student = Student(
        membership_id=student["membership_id"],
        first_name=student["first_name"],
        last_name=student["last_name"],
        date_of_birth=date_of_birth,
        gender=student.get("gender"),
        medical_notes=student.get("medical_notes"),
    )
    db.add(new_student)
    db.commit()
    db.refresh(new_student)
    return {
        "id": new_student.id,
        "membership_id": new_student.membership_id,
        "first_name": new_student.first_name,
        "last_name": new_student.last_name,
        "date_of_birth": new_student.date_of_birth.isoformat() if new_student.date_of_birth else None,
        "gender": new_student.gender,
        "medical_notes": new_student.medical_notes,
        "created_at": new_student.created_at.isoformat() if new_student.created_at else None,
    }
|
|
|
|
@app.put("/api/admin/students/{student_id}")
def update_student(student_id: int, student: dict, db: Session = Depends(get_db)):
    """Partially update a student; only keys present in ``student`` change.

    ``date_of_birth`` accepts ``YYYY-MM-DD``; a falsy value clears the column
    and a value that cannot be parsed leaves the stored date untouched.

    Raises:
        HTTPException: 404 when the student does not exist.
    """
    db_student = db.query(Student).filter(Student.id == student_id).first()
    if not db_student:
        raise HTTPException(status_code=404, detail="Student not found")

    if "first_name" in student:
        db_student.first_name = student["first_name"]
    if "last_name" in student:
        db_student.last_name = student["last_name"]
    if "date_of_birth" in student:
        if student["date_of_birth"]:
            try:
                db_student.date_of_birth = datetime.strptime(student["date_of_birth"], "%Y-%m-%d").date()
            except (ValueError, TypeError):
                # Best effort: keep the existing date when the new one is
                # malformed or not a string.
                pass
        else:
            db_student.date_of_birth = None
    if "gender" in student:
        db_student.gender = student.get("gender")
    if "medical_notes" in student:
        db_student.medical_notes = student.get("medical_notes")

    db.commit()
    db.refresh(db_student)
    return {
        "id": db_student.id,
        "membership_id": db_student.membership_id,
        "first_name": db_student.first_name,
        "last_name": db_student.last_name,
        "date_of_birth": db_student.date_of_birth.isoformat() if db_student.date_of_birth else None,
        "gender": db_student.gender,
        "medical_notes": db_student.medical_notes,
        "created_at": db_student.created_at.isoformat() if db_student.created_at else None,
    }
|
|
|
|
@app.delete("/api/admin/students/{student_id}")
def delete_student(student_id: int, db: Session = Depends(get_db)):
    """Delete the student with the given id.

    Raises:
        HTTPException: 404 when the student does not exist.
    """
    lookup = db.query(Student).filter(Student.id == student_id)
    doomed = lookup.first()
    if not doomed:
        raise HTTPException(status_code=404, detail="Student not found")

    db.delete(doomed)
    db.commit()
    return {"status": "deleted"}
|
|
|
|
| |
|
|
@app.get("/api/admin/exams", response_model=List[ExamWithBatches])
def get_admin_exams(db: Session = Depends(get_db)):
    """List every exam (latest ``exam_date`` first) with batches and registrations.

    Each exam is dumped through ``ExamOut`` and gains a ``batches`` list;
    each batch is dumped through ``ExamBatchOut`` and gains a
    ``registrations`` list of ``ExamStudentRegistrationOut`` dumps.
    """

    def _batch_payload(batch_row):
        # Registrations are looked up per batch via batch_id.
        regs = (
            db.query(ExamStudentRegistration)
            .filter(ExamStudentRegistration.batch_id == batch_row.id)
            .all()
        )
        entry = ExamBatchOut.model_validate(batch_row).model_dump()
        entry["registrations"] = [
            ExamStudentRegistrationOut.model_validate(reg).model_dump()
            for reg in regs
        ]
        return entry

    listing = []
    for exam_row in db.query(Exam).order_by(Exam.exam_date.desc()).all():
        batch_rows = db.query(ExamBatch).filter(ExamBatch.exam_id == exam_row.id).all()
        exam_entry = ExamOut.model_validate(exam_row).model_dump()
        exam_entry["batches"] = [_batch_payload(b) for b in batch_rows]
        listing.append(exam_entry)
    return listing
|
|
|
|
@app.post("/api/admin/exams", response_model=ExamOut, status_code=201)
def create_exam(exam: ExamCreate, db: Session = Depends(get_db)):
    """Persist a new exam built from the request body and return it."""
    fields = exam.model_dump()
    record = Exam(**fields)

    db.add(record)
    db.commit()
    # Reload so database-generated values are present on the returned row.
    db.refresh(record)
    return record
|
|
|
|
@app.put("/api/admin/exams/{exam_id}", response_model=ExamOut)
def update_exam(exam_id: int, exam_update: ExamUpdate, db: Session = Depends(get_db)):
    """Apply the fields explicitly set in ``exam_update`` to an exam.

    Raises:
        HTTPException: 404 when the exam does not exist.
    """
    record = db.query(Exam).filter(Exam.id == exam_id).first()
    if not record:
        raise HTTPException(status_code=404, detail="Exam not found")

    # exclude_unset: fields the client did not send are left untouched.
    changes = exam_update.model_dump(exclude_unset=True)
    for field_name in changes:
        setattr(record, field_name, changes[field_name])

    db.commit()
    db.refresh(record)
    return record
|
|
|
|
@app.delete("/api/admin/exams/{exam_id}", status_code=204)
def delete_exam(exam_id: int, db: Session = Depends(get_db)):
    """Delete an exam; responds 204 with no body.

    NOTE(review): removal of the exam's batches and registrations is not done
    here -- presumably it relies on cascade rules on the models; confirm.

    Raises:
        HTTPException: 404 when the exam does not exist.
    """
    lookup = db.query(Exam).filter(Exam.id == exam_id)
    doomed = lookup.first()
    if not doomed:
        raise HTTPException(status_code=404, detail="Exam not found")

    db.delete(doomed)
    db.commit()
    return None
|
|
|
|
| |
|
|
@app.get("/api/admin/exam-batches", response_model=List[ExamBatchOut])
def get_exam_batches(exam_id: int = None, db: Session = Depends(get_db)):
    """Get exam batches, optionally filtered by exam_id.

    Args:
        exam_id: When supplied, only batches belonging to this exam are
            returned. When omitted, all batches are returned.
    """
    query = db.query(ExamBatch)
    # Test against None rather than truthiness: an explicit exam_id=0 is a
    # real filter value and must not silently fall back to "return all".
    if exam_id is not None:
        query = query.filter(ExamBatch.exam_id == exam_id)
    return query.all()
|
|
|
|
@app.post("/api/admin/exam-batches", response_model=ExamBatchOut, status_code=201)
def create_exam_batch(batch: ExamBatchCreate, db: Session = Depends(get_db)):
    """Create a batch under an existing exam.

    Raises:
        HTTPException: 404 if ``batch.exam_id`` does not match any exam.
    """
    # The parent exam must exist before a batch can be attached to it.
    parent_exam = db.query(Exam).filter(Exam.id == batch.exam_id).first()
    if parent_exam is None:
        raise HTTPException(status_code=404, detail="Exam not found")

    fields = batch.model_dump()
    new_batch = ExamBatch(**fields)
    db.add(new_batch)
    db.commit()
    db.refresh(new_batch)
    return new_batch
|
|
|
|
@app.put("/api/admin/exam-batches/{batch_id}", response_model=ExamBatchOut)
def update_exam_batch(batch_id: int, batch_update: ExamBatchUpdate, db: Session = Depends(get_db)):
    """Apply a partial update to an existing exam batch.

    Only fields explicitly set in the request body are written to the row.

    Raises:
        HTTPException: 404 if no batch has the given ``batch_id``.
    """
    target = db.query(ExamBatch).filter(ExamBatch.id == batch_id).first()
    if target is None:
        raise HTTPException(status_code=404, detail="Batch not found")

    # exclude_unset keeps omitted fields from overwriting stored values.
    changes = batch_update.model_dump(exclude_unset=True)
    for field_name in changes:
        setattr(target, field_name, changes[field_name])

    db.commit()
    db.refresh(target)
    return target
|
|
|
|
@app.delete("/api/admin/exam-batches/{batch_id}", status_code=204)
def delete_exam_batch(batch_id: int, db: Session = Depends(get_db)):
    """Delete an exam batch.

    NOTE(review): removal of the batch's registrations depends on cascade
    rules configured on the models, which are not visible here.

    Raises:
        HTTPException: 404 if no batch has the given ``batch_id``.
    """
    matches = db.query(ExamBatch).filter(ExamBatch.id == batch_id)
    doomed = matches.first()
    if doomed is None:
        raise HTTPException(status_code=404, detail="Batch not found")

    db.delete(doomed)
    db.commit()
    # Nothing to return: the route responds with 204 No Content.
|
|
|
|
| |
|
|
@app.get("/api/admin/exam-registrations", response_model=List[ExamStudentRegistrationOut])
def get_exam_registrations(exam_id: int = None, batch_id: int = None, db: Session = Depends(get_db)):
    """Get exam registrations, optionally filtered by exam_id and/or batch_id.

    Args:
        exam_id: When supplied, restrict results to this exam.
        batch_id: When supplied, restrict results to this batch.

    Both filters are combined (AND) when both are given.
    """
    query = db.query(ExamStudentRegistration)
    # Test against None rather than truthiness: an explicit 0 is a real
    # filter value and must not silently fall back to "return all".
    if exam_id is not None:
        query = query.filter(ExamStudentRegistration.exam_id == exam_id)
    if batch_id is not None:
        query = query.filter(ExamStudentRegistration.batch_id == batch_id)
    return query.all()
|
|
|
|
@app.post("/api/admin/exam-registrations", response_model=ExamStudentRegistrationOut, status_code=201)
def create_exam_registration(registration: ExamStudentRegistrationCreate, db: Session = Depends(get_db)):
    """Create a new exam student registration.

    The referenced exam and batch must exist, and the batch must belong to
    the referenced exam. When a ``student_id`` is supplied, that student must
    exist as well.

    Raises:
        HTTPException: 404 if the exam, the batch, or the supplied student
            does not exist; 400 if the batch belongs to a different exam.
    """
    exam = db.query(Exam).filter(Exam.id == registration.exam_id).first()
    if not exam:
        raise HTTPException(status_code=404, detail="Exam not found")

    batch = db.query(ExamBatch).filter(ExamBatch.id == registration.batch_id).first()
    if not batch:
        raise HTTPException(status_code=404, detail="Batch not found")

    # A registration stores both exam_id and batch_id; reject combinations
    # where the batch is attached to a different exam, which would make the
    # per-exam and per-batch listings disagree.
    if batch.exam_id != registration.exam_id:
        raise HTTPException(
            status_code=400,
            detail="Batch does not belong to the specified exam",
        )

    # student_id is optional; when one is given it must point at a real student.
    if registration.student_id:
        student = db.query(Student).filter(Student.id == registration.student_id).first()
        if student is None:
            raise HTTPException(status_code=404, detail="Student not found")

    db_registration = ExamStudentRegistration(**registration.model_dump())
    db.add(db_registration)
    db.commit()
    db.refresh(db_registration)
    return db_registration
|
|
|
|
@app.put("/api/admin/exam-registrations/{registration_id}", response_model=ExamStudentRegistrationOut)
def update_exam_registration(registration_id: int, registration_update: ExamStudentRegistrationUpdate, db: Session = Depends(get_db)):
    """Apply a partial update to an existing exam registration.

    Only fields explicitly set in the request body are written to the row.

    Raises:
        HTTPException: 404 if no registration has the given ``registration_id``.
    """
    target = (
        db.query(ExamStudentRegistration)
        .filter(ExamStudentRegistration.id == registration_id)
        .first()
    )
    if target is None:
        raise HTTPException(status_code=404, detail="Registration not found")

    # exclude_unset keeps omitted fields from overwriting stored values.
    changes = registration_update.model_dump(exclude_unset=True)
    for field_name in changes:
        setattr(target, field_name, changes[field_name])

    db.commit()
    db.refresh(target)
    return target
|
|
|
|
@app.delete("/api/admin/exam-registrations/{registration_id}", status_code=204)
def delete_exam_registration(registration_id: int, db: Session = Depends(get_db)):
    """Remove a single exam registration.

    Raises:
        HTTPException: 404 if no registration has the given ``registration_id``.
    """
    doomed = (
        db.query(ExamStudentRegistration)
        .filter(ExamStudentRegistration.id == registration_id)
        .first()
    )
    if doomed is None:
        raise HTTPException(status_code=404, detail="Registration not found")

    db.delete(doomed)
    db.commit()
    # Nothing to return: the route responds with 204 No Content.
|
|
|
|
@app.post("/api/admin/exam-registrations/notify")
def notify_exam_registrations(registration_ids: List[int], db: Session = Depends(get_db)):
    """Mark the given registrations as notified.

    This handler only sets ``notification_sent`` on the matching rows; it
    does not itself send any email. IDs that match no registration are
    ignored.

    Args:
        registration_ids: IDs of the registrations to flag.

    Returns:
        A dict whose ``notified_count`` is the number of registrations that
        were found and flagged.
    """
    # Nothing to do for an empty list: skip the empty IN (...) query and the
    # no-op commit.
    if not registration_ids:
        return {"notified_count": 0}

    registrations = db.query(ExamStudentRegistration).filter(
        ExamStudentRegistration.id.in_(registration_ids)
    ).all()

    for reg in registrations:
        reg.notification_sent = True

    db.commit()
    return {"notified_count": len(registrations)}
|
|
|
|
| |
|
|
@app.post("/api/ai/admin")
def admin_ai_dummy():
    """Return a fixed placeholder payload for the Admin AI assistant."""
    insights = [
        "Beginner Monthly has strong growth.",
        "Advanced Monthly has slightly higher churn.",
    ]
    suggested_actions = [
        "Send renewal reminders 7 days before due date.",
        "Offer a free trial class to at-risk members.",
    ]
    response = {
        "answer": "This is a placeholder answer from the Admin AI assistant.",
        "insights": insights,
        "suggested_actions": suggested_actions,
    }
    return response
|
|
|
|
@app.post("/api/ai/student")
def student_ai_dummy():
    """Return a fixed placeholder payload for the Student AI assistant."""
    canned_answer = "This is a placeholder answer from the Student AI assistant. Your next renewal is on 2025-02-01 for $99."
    return {"answer": canned_answer}
|
|
|
|
| |
|
|
def _serve_index() -> FileResponse:
    """Serve ``index.html`` from ``STATIC_DIR`` as an HTML file response.

    Raises:
        HTTPException: 404 if the file is not present on disk.
    """
    index_file = os.path.join(STATIC_DIR, "index.html")
    # Guard clause: bail out early when the index file is not present.
    if not os.path.exists(index_file):
        raise HTTPException(status_code=404, detail="Index not found")
    return FileResponse(index_file, media_type="text/html")
|
|
|
|
@app.get("/", include_in_schema=False)
async def serve_index_root():
    """Serve the SPA entry page at the site root."""
    index_response = _serve_index()
    return index_response
|
|
|
|
@app.get("/login", include_in_schema=False)
@app.get("/student", include_in_schema=False)
@app.get("/admin", include_in_schema=False)
@app.get("/admin/members", include_in_schema=False)
@app.get("/admin/classes", include_in_schema=False)
@app.get("/admin/exams", include_in_schema=False)
async def serve_index_named_routes():
    """Serve the SPA entry page for each explicitly registered front-end path."""
    index_response = _serve_index()
    return index_response
|
|
|
|
@app.get("/{full_path:path}", include_in_schema=False)
async def spa_fallback(full_path: str):
    """
    Catch-all for client-side routes not matched by any other handler.

    Paths under ``api/`` or ``assets/``, and any path containing a dot, get a
    404 instead of the SPA index page.
    """
    under_reserved_prefix = full_path.startswith(("api/", "assets/"))
    contains_dot = "." in full_path
    if under_reserved_prefix or contains_dot:
        raise HTTPException(status_code=404, detail="Not Found")

    return _serve_index()
|
|