keeai / app /main.py
Seth0330's picture
Update app/main.py
c074dbc verified
# app/main.py
from datetime import date, datetime, timedelta
from typing import List
import os
import random
import secrets
import smtplib
from email.mime.text import MIMEText
from fastapi import FastAPI, Depends, Request, HTTPException, Query
from fastapi.middleware.cors import CORSMiddleware
from fastapi.staticfiles import StaticFiles
from fastapi.responses import FileResponse
from sqlalchemy.orm import Session
from sqlalchemy import func
import stripe
# DB + models + config
from core.database import Base, engine, SessionLocal
from core.config import settings
from models.membership_plan import MembershipPlan
from models.membership import Membership
from models.login_otp import LoginOTP
from models.product import Product
from models.membership_invite import MembershipInvite
from models.membership_request import MembershipRequest
from models.student import Student
from models.exam import Exam, ExamBatch, ExamStudentRegistration
from models.attendance import Attendance
from models.student_comment import StudentComment
from models.class_ import Class, ClassEnrollment, ClassMembershipPlan
# Classes schemas + service
from schemas.class_ import (
ClassCreate,
ClassUpdate,
ClassOut,
ClassEnrollmentCreate,
ClassEnrollmentOut,
)
from schemas.exam import (
ExamCreate,
ExamUpdate,
ExamOut,
ExamBatchCreate,
ExamBatchUpdate,
ExamBatchOut,
ExamStudentRegistrationCreate,
ExamStudentRegistrationUpdate,
ExamStudentRegistrationOut,
ExamWithBatches,
)
from services import class_service
from services.email_service import send_email
# ---------- Stripe config ----------
if settings.STRIPE_SECRET_KEY:
stripe.api_key = settings.STRIPE_SECRET_KEY
app = FastAPI(
title="Karate School Membership API",
version="0.1.0",
)
# ---------- CORS ----------
app.add_middleware(
CORSMiddleware,
allow_origins=["*"], # tighten later if needed
allow_credentials=True,
allow_methods=["*"],
allow_headers=["*"],
)
# ---------- Serve built React frontend ----------
# React build folder is copied to /app/frontend_dist by Dockerfile
STATIC_DIR = os.path.join(os.path.dirname(__file__), "frontend_dist")
ASSETS_DIR = os.path.join(STATIC_DIR, "assets")
if os.path.isdir(ASSETS_DIR):
app.mount(
"/assets",
StaticFiles(directory=ASSETS_DIR),
name="assets",
)
# ---------- Database dependency ----------
def get_db():
db = SessionLocal()
try:
yield db
finally:
db.close()
# ---------- Email helper for OTP ----------
def send_otp_email(email: str, code: str) -> bool:
"""Send OTP email. Returns True if sent, False if SMTP not configured or failed."""
if not settings.SMTP_HOST or not settings.SMTP_FROM:
return False
msg = MIMEText(
f"Your Karate Portal login code is: {code}\n\n"
f"This code will expire in 10 minutes."
)
msg["Subject"] = "Your Karate Portal login code"
msg["From"] = settings.SMTP_FROM
msg["To"] = email
try:
with smtplib.SMTP(settings.SMTP_HOST, settings.SMTP_PORT) as server:
server.starttls()
if settings.SMTP_USERNAME and settings.SMTP_PASSWORD:
server.login(settings.SMTP_USERNAME, settings.SMTP_PASSWORD)
server.send_message(msg)
return True
except Exception:
return False
# ---------- Startup ----------
@app.on_event("startup")
def on_startup():
Base.metadata.create_all(bind=engine)
# ---------- Diagnostic endpoints ----------
@app.get("/api/health")
def health_check():
return {"status": "ok", "message": "Karate API is running"}
@app.get("/api/db-status")
def db_status():
return {
"database_url": str(engine.url)
}
# ---------- AUTH: Email + OTP ----------
@app.post("/api/auth/request-otp")
async def request_otp(payload: dict, db: Session = Depends(get_db)):
email = payload.get("email")
if not email:
raise HTTPException(status_code=400, detail="Email is required")
code = f"{random.randint(0, 999999):06d}"
expires_at = datetime.utcnow() + timedelta(minutes=10)
# remove old OTPs for this email
db.query(LoginOTP).filter(LoginOTP.email == email).delete()
otp = LoginOTP(
email=email,
code=code,
expires_at=expires_at,
used=False,
)
db.add(otp)
db.commit()
email_sent = send_otp_email(email, code)
# For testing, if SMTP not configured, return the code so you can log in
if not email_sent:
return {
"status": "sent",
"debug_code": code,
"note": "SMTP not configured; OTP returned in response for testing.",
}
return {"status": "sent"}
@app.post("/api/auth/login")
async def login_with_otp(payload: dict, db: Session = Depends(get_db)):
email = payload.get("email")
code = payload.get("otp")
if not email or not code:
raise HTTPException(status_code=400, detail="Email and otp are required")
now = datetime.utcnow()
otp = (
db.query(LoginOTP)
.filter(LoginOTP.email == email)
.filter(LoginOTP.code == code)
.filter(LoginOTP.used == False) # noqa: E712
.filter(LoginOTP.expires_at >= now)
.first()
)
if not otp:
raise HTTPException(status_code=400, detail="Invalid or expired OTP")
otp.used = True
db.commit()
# update last_login_at on memberships for this email
db.query(Membership).filter(Membership.user_email == email).update(
{Membership.last_login_at: now}
)
db.commit()
membership = (
db.query(Membership)
.filter(Membership.user_email == email)
.order_by(Membership.start_date.desc())
.first()
)
name = membership.user_name if membership else (email.split("@")[0])
return {
"status": "ok",
"email": email,
"name": name,
}
# ---------- COACH: Login & Classes ----------
@app.post("/api/coach/login")
async def coach_login_with_otp(payload: dict, db: Session = Depends(get_db)):
"""Coach login with OTP - checks if email is assigned to any class."""
email = payload.get("email")
code = payload.get("otp")
if not email or not code:
raise HTTPException(status_code=400, detail="Email and otp are required")
now = datetime.utcnow()
otp = (
db.query(LoginOTP)
.filter(LoginOTP.email == email)
.filter(LoginOTP.code == code)
.filter(LoginOTP.used == False) # noqa: E712
.filter(LoginOTP.expires_at >= now)
.first()
)
if not otp:
raise HTTPException(status_code=400, detail="Invalid or expired OTP")
# Check if this email is assigned as a coach to any class
coach_classes = db.query(Class).filter(Class.coach_email == email).first()
if not coach_classes:
raise HTTPException(status_code=403, detail="Email is not assigned as a coach to any class")
otp.used = True
db.commit()
# Extract name from email (part before @)
name = email.split("@")[0].replace(".", " ").title()
return {
"status": "ok",
"email": email,
"name": name,
}
@app.get("/api/coach/classes")
def get_coach_classes(email: str, db: Session = Depends(get_db)):
"""Get all classes assigned to a coach."""
classes = db.query(Class).filter(Class.coach_email == email, Class.is_active == True).all()
result = []
for c in classes:
enrollments = db.query(ClassEnrollment).filter(
ClassEnrollment.class_id == c.id,
ClassEnrollment.status == "joined"
).all()
students = []
for e in enrollments:
if e.student_id:
student = db.query(Student).filter(Student.id == e.student_id).first()
if student:
students.append({
"id": student.id,
"first_name": student.first_name,
"last_name": student.last_name,
"email": e.student_email,
})
else:
students.append({
"id": None,
"first_name": e.student_name or e.student_email.split("@")[0],
"last_name": "",
"email": e.student_email,
})
result.append({
"id": c.id,
"name": c.name,
"description": c.description,
"class_time": c.class_time,
"days_of_week": c.days_of_week.split(",") if c.days_of_week else [],
"max_students": c.max_students,
"students": students,
"student_count": len(students),
})
return result
@app.get("/api/coach/classes/{class_id}/students")
def get_coach_class_students(class_id: int, email: str, db: Session = Depends(get_db)):
"""Get students in a class with their attendance for today."""
# Verify coach is assigned to this class
class_obj = db.query(Class).filter(Class.id == class_id, Class.coach_email == email).first()
if not class_obj:
raise HTTPException(status_code=404, detail="Class not found or not assigned to this coach")
enrollments = db.query(ClassEnrollment).filter(
ClassEnrollment.class_id == class_id,
ClassEnrollment.status == "joined"
).all()
today = date.today()
students = []
for e in enrollments:
student_data = None
if e.student_id:
student = db.query(Student).filter(Student.id == e.student_id).first()
if student:
student_data = {
"id": student.id,
"first_name": student.first_name,
"last_name": student.last_name,
"email": e.student_email,
}
if not student_data:
student_data = {
"id": None,
"first_name": e.student_name or e.student_email.split("@")[0],
"last_name": "",
"email": e.student_email,
}
# Get today's attendance (only if student_id exists)
attendance = None
if e.student_id:
attendance = db.query(Attendance).filter(
Attendance.class_id == class_id,
Attendance.student_id == e.student_id,
Attendance.attendance_date == today
).first()
student_data["attendance_id"] = attendance.id if attendance else None
student_data["is_present"] = attendance.is_present if attendance else None
students.append(student_data)
return {
"class": {
"id": class_obj.id,
"name": class_obj.name,
"description": class_obj.description,
},
"students": students,
}
@app.post("/api/coach/classes/{class_id}/attendance")
def mark_attendance(
class_id: int,
payload: dict,
db: Session = Depends(get_db)
):
"""Mark attendance for a student in a class."""
email = payload.get("coach_email")
student_id = payload.get("student_id")
is_present = payload.get("is_present", True)
if not email or not student_id:
raise HTTPException(status_code=400, detail="coach_email and student_id are required")
# Verify coach is assigned to this class
class_obj = db.query(Class).filter(Class.id == class_id, Class.coach_email == email).first()
if not class_obj:
raise HTTPException(status_code=404, detail="Class not found or not assigned to this coach")
today = date.today()
# Check if attendance already exists for today
attendance = db.query(Attendance).filter(
Attendance.class_id == class_id,
Attendance.student_id == student_id,
Attendance.attendance_date == today
).first()
if attendance:
attendance.is_present = is_present
attendance.coach_email = email
else:
attendance = Attendance(
class_id=class_id,
student_id=student_id,
attendance_date=today,
is_present=is_present,
coach_email=email,
)
db.add(attendance)
db.commit()
db.refresh(attendance)
return {
"status": "ok",
"attendance_id": attendance.id,
"is_present": attendance.is_present,
}
@app.get("/api/coach/students/{student_id}")
def get_coach_student_profile(student_id: int, email: str, db: Session = Depends(get_db)):
"""Get student profile with comments for coach."""
student = db.query(Student).filter(Student.id == student_id).first()
if not student:
raise HTTPException(status_code=404, detail="Student not found")
# Get all comments for this student
comments = db.query(StudentComment).filter(
StudentComment.student_id == student_id
).order_by(StudentComment.created_at.desc()).all()
comments_list = []
for c in comments:
# Extract name from email
coach_name = c.coach_email.split("@")[0].replace(".", " ").title()
comments_list.append({
"id": c.id,
"coach_email": c.coach_email,
"coach_name": coach_name,
"comment": c.comment,
"created_at": c.created_at.isoformat(),
})
return {
"id": student.id,
"first_name": student.first_name,
"last_name": student.last_name,
"date_of_birth": student.date_of_birth.isoformat() if student.date_of_birth else None,
"gender": student.gender,
"medical_notes": student.medical_notes,
"comments": comments_list,
}
@app.post("/api/coach/students/{student_id}/comments")
def add_student_comment(
student_id: int,
payload: dict,
db: Session = Depends(get_db)
):
"""Add a comment to a student profile."""
email = payload.get("coach_email")
comment_text = payload.get("comment")
if not email or not comment_text:
raise HTTPException(status_code=400, detail="coach_email and comment are required")
student = db.query(Student).filter(Student.id == student_id).first()
if not student:
raise HTTPException(status_code=404, detail="Student not found")
comment = StudentComment(
student_id=student_id,
coach_email=email,
comment=comment_text,
)
db.add(comment)
db.commit()
db.refresh(comment)
coach_name = email.split("@")[0].replace(".", " ").title()
return {
"id": comment.id,
"coach_email": comment.coach_email,
"coach_name": coach_name,
"comment": comment.comment,
"created_at": comment.created_at.isoformat(),
}
# ---------- ADMIN: Overview & Plans ----------
@app.get("/api/admin/overview")
def get_admin_overview(db: Session = Depends(get_db)):
memberships = db.query(Membership).all()
total_members = len(memberships)
active_members = len([m for m in memberships if m.status == "active"])
pending_members = len([m for m in memberships if m.status == "pending"])
expired_members = len([m for m in memberships if m.status == "expired"])
mrr = 0.0
for m in memberships:
if m.status != "active":
continue
if m.billing_period == "monthly":
mrr += float(m.price)
elif m.billing_period == "quarterly":
mrr += float(m.price) / 3.0
elif m.billing_period == "half_yearly":
mrr += float(m.price) / 6.0
elif m.billing_period == "annual":
mrr += float(m.price) / 12.0
plans = db.query(MembershipPlan).all()
members_by_plan = []
for p in plans:
count = len(
[m for m in memberships if m.plan_id == p.id and m.status == "active"]
)
members_by_plan.append(
{"plan_id": p.id, "plan_name": p.name, "count": count}
)
return {
"total_members": total_members,
"active_members": active_members,
"pending_members": pending_members,
"expired_members": expired_members,
"monthly_recurring_revenue": round(mrr, 2),
"annual_recurring_revenue": round(mrr * 12, 2),
"churn_last_30_days": 0,
"new_members_last_30_days": 0,
"members_by_plan": members_by_plan,
}
@app.get("/api/admin/plans")
def get_admin_plans(db: Session = Depends(get_db)):
plans = db.query(MembershipPlan).all()
return [
{
"id": p.id,
"name": p.name,
"billing_period": p.billing_period,
"price": float(p.price),
"stripe_link": p.stripe_link,
"description": p.description,
"max_students": p.max_students,
"is_active": p.is_active,
"is_default": p.is_default,
}
for p in plans
]
@app.post("/api/admin/plans")
def create_plan(plan: dict, db: Session = Depends(get_db)):
"""
Create a new membership plan.
Expects JSON with: name, billing_period, price, stripe_link, description?, is_active?, is_default?
"""
# Validate max 2 default plans
is_default = plan.get("is_default", False)
if is_default:
current_default_count = db.query(MembershipPlan).filter(MembershipPlan.is_default == True).count()
if current_default_count >= 2:
raise HTTPException(status_code=400, detail="Maximum 2 default plans allowed. Please uncheck another default plan first.")
new_plan = MembershipPlan(
name=plan["name"],
billing_period=plan["billing_period"],
price=plan["price"],
stripe_link=plan.get("stripe_link"),
description=plan.get("description"),
max_students=plan.get("max_students", 1),
is_active=plan.get("is_active", True),
is_default=is_default,
)
db.add(new_plan)
db.commit()
db.refresh(new_plan)
return {
"id": new_plan.id,
"name": new_plan.name,
"billing_period": new_plan.billing_period,
"price": float(new_plan.price),
"stripe_link": new_plan.stripe_link,
"description": new_plan.description,
"max_students": new_plan.max_students,
"is_active": new_plan.is_active,
"is_default": new_plan.is_default,
}
@app.put("/api/admin/plans/{plan_id}")
def update_plan(plan_id: int, plan: dict, db: Session = Depends(get_db)):
"""
Update an existing membership plan.
"""
db_plan = db.query(MembershipPlan).filter(MembershipPlan.id == plan_id).first()
if not db_plan:
raise HTTPException(status_code=404, detail="Plan not found")
# Validate max 2 default plans
is_default = plan.get("is_default", db_plan.is_default)
if is_default and not db_plan.is_default: # Only check if setting to default
current_default_count = db.query(MembershipPlan).filter(
MembershipPlan.is_default == True,
MembershipPlan.id != plan_id
).count()
if current_default_count >= 2:
raise HTTPException(status_code=400, detail="Maximum 2 default plans allowed. Please uncheck another default plan first.")
db_plan.name = plan.get("name", db_plan.name)
db_plan.billing_period = plan.get("billing_period", db_plan.billing_period)
db_plan.price = plan.get("price", db_plan.price)
db_plan.stripe_link = plan.get("stripe_link", db_plan.stripe_link)
db_plan.description = plan.get("description", db_plan.description)
db_plan.max_students = plan.get("max_students", db_plan.max_students)
db_plan.is_active = plan.get("is_active", db_plan.is_active)
db_plan.is_default = is_default
db.commit()
db.refresh(db_plan)
return {
"id": db_plan.id,
"name": db_plan.name,
"billing_period": db_plan.billing_period,
"price": float(db_plan.price),
"stripe_link": db_plan.stripe_link,
"description": db_plan.description,
"max_students": db_plan.max_students,
"is_active": db_plan.is_active,
"is_default": db_plan.is_default,
}
@app.delete("/api/admin/plans/{plan_id}")
def delete_plan(plan_id: int, db: Session = Depends(get_db)):
"""
Delete a membership plan.
"""
db_plan = db.query(MembershipPlan).filter(MembershipPlan.id == plan_id).first()
if not db_plan:
raise HTTPException(status_code=404, detail="Plan not found")
db.delete(db_plan)
db.commit()
return {"status": "deleted"}
# ---------- ADMIN: Products ----------
@app.get("/api/admin/products")
def get_admin_products(db: Session = Depends(get_db)):
products = db.query(Product).all()
return [
{
"id": p.id,
"name": p.name,
"price": float(p.price),
"stripe_link": p.stripe_link,
"description": p.description,
"image_url": p.image_url,
"is_active": p.is_active,
"created_at": p.created_at.isoformat() if p.created_at else None,
"updated_at": p.updated_at.isoformat() if p.updated_at else None,
}
for p in products
]
@app.post("/api/admin/products")
def create_product(product: dict, db: Session = Depends(get_db)):
"""
Create a new product.
Expects JSON with: name, price, stripe_link?, description?, image_url?, is_active?
"""
new_product = Product(
name=product["name"],
price=product["price"],
stripe_link=product.get("stripe_link"),
description=product.get("description"),
image_url=product.get("image_url"),
is_active=product.get("is_active", True),
)
db.add(new_product)
db.commit()
db.refresh(new_product)
return {
"id": new_product.id,
"name": new_product.name,
"price": float(new_product.price),
"stripe_link": new_product.stripe_link,
"description": new_product.description,
"image_url": new_product.image_url,
"is_active": new_product.is_active,
"created_at": new_product.created_at.isoformat() if new_product.created_at else None,
"updated_at": new_product.updated_at.isoformat() if new_product.updated_at else None,
}
@app.put("/api/admin/products/{product_id}")
def update_product(product_id: int, product: dict, db: Session = Depends(get_db)):
"""
Update an existing product.
"""
db_product = db.query(Product).filter(Product.id == product_id).first()
if not db_product:
raise HTTPException(status_code=404, detail="Product not found")
db_product.name = product.get("name", db_product.name)
db_product.price = product.get("price", db_product.price)
db_product.stripe_link = product.get("stripe_link", db_product.stripe_link)
db_product.description = product.get("description", db_product.description)
db_product.image_url = product.get("image_url", db_product.image_url)
db_product.is_active = product.get("is_active", db_product.is_active)
db.commit()
db.refresh(db_product)
return {
"id": db_product.id,
"name": db_product.name,
"price": float(db_product.price),
"stripe_link": db_product.stripe_link,
"description": db_product.description,
"image_url": db_product.image_url,
"is_active": db_product.is_active,
"created_at": db_product.created_at.isoformat() if db_product.created_at else None,
"updated_at": db_product.updated_at.isoformat() if db_product.updated_at else None,
}
@app.delete("/api/admin/products/{product_id}")
def delete_product(product_id: int, db: Session = Depends(get_db)):
"""
Delete a product.
"""
db_product = db.query(Product).filter(Product.id == product_id).first()
if not db_product:
raise HTTPException(status_code=404, detail="Product not found")
db.delete(db_product)
db.commit()
return {"status": "deleted"}
# ---------- ADMIN: Classes & Enrollments ----------
@app.get("/api/admin/classes", response_model=list[ClassOut])
def get_admin_classes(db: Session = Depends(get_db)):
"""
List all classes (most recent first).
"""
classes = class_service.get_classes(db)
# Add membership_plan_ids and schedule to each class
result = []
for cls in classes:
# Reconstruct schedule from days_of_week and class_time
days = cls.days_of_week.split(",") if cls.days_of_week else []
class_time_parts = cls.class_time.split("-") if cls.class_time and "-" in cls.class_time else [cls.class_time, ""]
start_time = class_time_parts[0].strip() if len(class_time_parts) > 0 else ""
end_time = class_time_parts[1].strip() if len(class_time_parts) > 1 else ""
schedule = []
for day in days:
if day.strip():
schedule.append({
"day": day.strip(),
"start_time": start_time,
"end_time": end_time,
"time": f"{start_time}-{end_time}" if start_time and end_time else start_time or "",
})
# If no schedule, create a default one
if not schedule:
schedule = [{"day": "Monday", "start_time": "", "end_time": "", "time": ""}]
class_dict = {
"id": cls.id,
"name": cls.name,
"description": cls.description,
"class_time": cls.class_time,
"days_of_week": days,
"classes_per_week": cls.classes_per_week,
"max_students": cls.max_students,
"is_active": cls.is_active,
"coach_email": cls.coach_email,
"location": cls.location,
"created_at": cls.created_at,
"updated_at": cls.updated_at,
"membership_plan_ids": [
cmp.membership_plan_id
for cmp in db.query(ClassMembershipPlan)
.filter(ClassMembershipPlan.class_id == cls.id)
.all()
],
"schedule": schedule,
}
result.append(class_dict)
return result
@app.post("/api/admin/classes", response_model=ClassOut, status_code=201)
def create_admin_class(
class_in: ClassCreate,
db: Session = Depends(get_db),
):
"""
Create a new class configuration.
"""
db_class = class_service.create_class(db, class_in)
# Get membership_plan_ids for response
membership_plan_ids = [
cmp.membership_plan_id
for cmp in db.query(ClassMembershipPlan)
.filter(ClassMembershipPlan.class_id == db_class.id)
.all()
]
# Reconstruct schedule
days = db_class.days_of_week.split(",") if db_class.days_of_week else []
class_time_parts = db_class.class_time.split("-") if db_class.class_time and "-" in db_class.class_time else [db_class.class_time, ""]
start_time = class_time_parts[0].strip() if len(class_time_parts) > 0 else ""
end_time = class_time_parts[1].strip() if len(class_time_parts) > 1 else ""
schedule = []
for day in days:
if day.strip():
schedule.append({
"day": day.strip(),
"start_time": start_time,
"end_time": end_time,
"time": f"{start_time}-{end_time}" if start_time and end_time else start_time or "",
})
if not schedule:
schedule = [{"day": "Monday", "start_time": "", "end_time": "", "time": ""}]
# Convert to dict for response
class_dict = {
"id": db_class.id,
"name": db_class.name,
"description": db_class.description,
"class_time": db_class.class_time,
"days_of_week": days,
"classes_per_week": db_class.classes_per_week,
"max_students": db_class.max_students,
"is_active": db_class.is_active,
"coach_email": db_class.coach_email,
"location": db_class.location,
"created_at": db_class.created_at,
"updated_at": db_class.updated_at,
"membership_plan_ids": membership_plan_ids,
"schedule": schedule,
}
return class_dict
@app.put("/api/admin/classes/{class_id}", response_model=ClassOut)
def update_admin_class(
class_id: int,
class_in: ClassUpdate,
db: Session = Depends(get_db),
):
"""
Update an existing class.
"""
db_class = class_service.update_class(db, class_id, class_in)
if not db_class:
raise HTTPException(status_code=404, detail="Class not found")
# Get membership_plan_ids for response
membership_plan_ids = [
cmp.membership_plan_id
for cmp in db.query(ClassMembershipPlan)
.filter(ClassMembershipPlan.class_id == class_id)
.all()
]
# Reconstruct schedule
days = db_class.days_of_week.split(",") if db_class.days_of_week else []
class_time_parts = db_class.class_time.split("-") if db_class.class_time and "-" in db_class.class_time else [db_class.class_time, ""]
start_time = class_time_parts[0].strip() if len(class_time_parts) > 0 else ""
end_time = class_time_parts[1].strip() if len(class_time_parts) > 1 else ""
schedule = []
for day in days:
if day.strip():
schedule.append({
"day": day.strip(),
"start_time": start_time,
"end_time": end_time,
"time": f"{start_time}-{end_time}" if start_time and end_time else start_time or "",
})
if not schedule:
schedule = [{"day": "Monday", "start_time": "", "end_time": "", "time": ""}]
# Convert to dict for response
class_dict = {
"id": db_class.id,
"name": db_class.name,
"description": db_class.description,
"class_time": db_class.class_time,
"days_of_week": days,
"classes_per_week": db_class.classes_per_week,
"max_students": db_class.max_students,
"is_active": db_class.is_active,
"coach_email": db_class.coach_email,
"location": db_class.location,
"created_at": db_class.created_at,
"updated_at": db_class.updated_at,
"membership_plan_ids": membership_plan_ids,
"schedule": schedule,
}
return class_dict
@app.delete("/api/admin/classes/{class_id}", status_code=204)
def delete_admin_class(
class_id: int,
db: Session = Depends(get_db),
):
"""
Delete a class and its enrollments.
"""
ok = class_service.delete_class(db, class_id)
if not ok:
raise HTTPException(status_code=404, detail="Class not found")
return None
@app.get(
"/api/admin/classes/{class_id}/enrollments",
response_model=list[ClassEnrollmentOut],
)
def get_admin_class_enrollments(
class_id: int,
db: Session = Depends(get_db),
):
"""
List all enrollments for a given class.
"""
db_class = class_service.get_class(db, class_id)
if not db_class:
raise HTTPException(status_code=404, detail="Class not found")
enrollments = class_service.get_enrollments(db, class_id)
return enrollments
@app.post(
"/api/admin/classes/{class_id}/invite",
response_model=ClassEnrollmentOut,
status_code=201,
)
def invite_student_to_class(
class_id: int,
enrollment_in: ClassEnrollmentCreate,
db: Session = Depends(get_db),
):
"""
Invite a student to a class by email.
Creates an 'invited' enrollment and sends an email (via email_service).
"""
enrollment = class_service.invite_student_to_class(db, class_id, enrollment_in)
if not enrollment:
raise HTTPException(status_code=404, detail="Class not found")
return enrollment
@app.post(
"/api/admin/classes/{class_id}/enroll",
response_model=ClassEnrollmentOut,
status_code=201,
)
def enroll_student_to_class(
class_id: int,
enrollment_in: ClassEnrollmentCreate,
db: Session = Depends(get_db),
):
"""
Directly enroll a student to a class (status='joined').
No email is sent - this is for direct admin enrollment.
"""
enrollment = class_service.enroll_student_to_class(db, class_id, enrollment_in)
if not enrollment:
raise HTTPException(status_code=404, detail="Class not found")
return enrollment
@app.delete(
"/api/admin/classes/{class_id}/enrollments/{enrollment_id}",
status_code=204,
)
def remove_student_from_class(
class_id: int,
enrollment_id: int,
db: Session = Depends(get_db),
):
"""
Mark an enrollment as removed (does not physically delete the row).
"""
db_class = class_service.get_class(db, class_id)
if not db_class:
raise HTTPException(status_code=404, detail="Class not found")
ok = class_service.remove_enrollment(db, enrollment_id)
if not ok:
raise HTTPException(status_code=404, detail="Enrollment not found")
return None
# ---------- ADMIN: Memberships, Renewals, At-risk ----------
@app.get("/api/admin/memberships")
def get_admin_memberships(db: Session = Depends(get_db)):
memberships = db.query(Membership).all()
items = [
{
"id": m.id,
"user_email": m.user_email,
"user_name": m.user_name,
"plan_name": m.plan_name,
"status": m.status,
"start_date": m.start_date.isoformat(),
"renewal_date": m.renewal_date.isoformat(),
"price": float(m.price),
"billing_period": m.billing_period,
"max_students": m.max_students,
"last_login_at": m.last_login_at.isoformat() if m.last_login_at else None,
"member_first_name": m.member_first_name,
"member_last_name": m.member_last_name,
"phone_number": m.phone_number,
"emergency_contact_person": m.emergency_contact_person,
"emergency_contact_number": m.emergency_contact_number,
}
for m in memberships
]
return {
"items": items,
"total": len(items),
"page": 1,
"page_size": len(items),
}
@app.put("/api/admin/memberships/{membership_id}")
def update_membership(membership_id: int, membership_data: dict, db: Session = Depends(get_db)):
"""Update a membership."""
db_membership = db.query(Membership).filter(Membership.id == membership_id).first()
if not db_membership:
raise HTTPException(status_code=404, detail="Membership not found")
if "status" in membership_data:
db_membership.status = membership_data["status"]
if "start_date" in membership_data:
if membership_data["start_date"]:
try:
from datetime import datetime as dt
db_membership.start_date = dt.strptime(membership_data["start_date"], "%Y-%m-%d").date()
except:
pass
if "renewal_date" in membership_data:
if membership_data["renewal_date"]:
try:
from datetime import datetime as dt
db_membership.renewal_date = dt.strptime(membership_data["renewal_date"], "%Y-%m-%d").date()
except:
pass
if "price" in membership_data:
db_membership.price = float(membership_data["price"])
db.commit()
db.refresh(db_membership)
return {
"id": db_membership.id,
"user_email": db_membership.user_email,
"user_name": db_membership.user_name,
"plan_name": db_membership.plan_name,
"status": db_membership.status,
"start_date": db_membership.start_date.isoformat(),
"renewal_date": db_membership.renewal_date.isoformat(),
"price": float(db_membership.price),
"billing_period": db_membership.billing_period,
"max_students": db_membership.max_students,
}
@app.delete("/api/admin/memberships/{membership_id}")
def delete_membership(membership_id: int, db: Session = Depends(get_db)):
"""Delete a membership (cascade deletes students)."""
db_membership = db.query(Membership).filter(Membership.id == membership_id).first()
if not db_membership:
raise HTTPException(status_code=404, detail="Membership not found")
db.delete(db_membership)
db.commit()
return {"status": "ok", "message": "Membership deleted"}
@app.get("/api/admin/renewals")
def get_admin_renewals(db: Session = Depends(get_db)):
today = date.today()
in_30 = today + timedelta(days=30)
memberships = (
db.query(Membership)
.filter(Membership.status == "active")
.filter(Membership.renewal_date >= today)
.filter(Membership.renewal_date <= in_30)
.all()
)
return [
{
"membership_id": m.id,
"user_name": m.user_name,
"user_email": m.user_email,
"plan_name": m.plan_name,
"status": m.status,
"renewal_date": m.renewal_date.isoformat(),
"days_until_renewal": (m.renewal_date - today).days,
"price": float(m.price),
}
for m in memberships
]
@app.get("/api/admin/at-risk")
def get_admin_at_risk(db: Session = Depends(get_db)):
today = date.today()
in_14 = today + timedelta(days=14)
memberships = (
db.query(Membership)
.filter(Membership.status == "active")
.filter(Membership.renewal_date >= today)
.filter(Membership.renewal_date <= in_14)
.all()
)
results = []
for m in memberships:
days_left = (m.renewal_date - today).days
risk_score = max(0.0, 1.0 - days_left / 14.0)
if risk_score > 0.66:
risk_level = "high"
elif risk_score > 0.33:
risk_level = "medium"
else:
risk_level = "low"
results.append(
{
"membership_id": m.id,
"user_name": m.user_name,
"user_email": m.user_email,
"plan_name": m.plan_name,
"status": m.status,
"renewal_date": m.renewal_date.isoformat(),
"risk_score": round(risk_score, 2),
"risk_level": risk_level,
"reasons": [
f"Renewal in {days_left} days",
],
}
)
return results
# ---------- STUDENT endpoints ----------
@app.get("/api/student/plans")
def get_student_plans(db: Session = Depends(get_db)):
# Get default plans first, then other active plans
default_plans = db.query(MembershipPlan).filter(
MembershipPlan.is_active == True,
MembershipPlan.is_default == True
).all() # noqa: E712
other_plans = db.query(MembershipPlan).filter(
MembershipPlan.is_active == True,
MembershipPlan.is_default == False
).all() # noqa: E712
# Combine: default plans first, then others
all_plans = list(default_plans) + list(other_plans)
return [
{
"id": p.id,
"name": p.name,
"billing_period": p.billing_period,
"price": float(p.price),
"stripe_link": p.stripe_link,
"description": p.description,
"max_students": p.max_students,
"is_active": p.is_active,
"is_default": p.is_default,
"is_current": False,
}
for p in all_plans
]
@app.get("/api/student/invites")
def get_student_invites(email: str, db: Session = Depends(get_db)):
"""Get pending invites for a student email."""
invites = (
db.query(MembershipInvite)
.filter(MembershipInvite.email == email)
.filter(MembershipInvite.status == "pending")
.order_by(MembershipInvite.created_at.desc())
.all()
)
# Get plan details for each invite
result = []
for i in invites:
plan = db.query(MembershipPlan).filter(MembershipPlan.id == i.plan_id).first() if i.plan_id else None
result.append({
"id": i.id,
"email": i.email,
"plan_id": i.plan_id,
"plan_name": i.plan_name,
"plan_price": i.plan_price,
"max_students": i.max_students,
"class_details": i.class_details,
"status": i.status,
"invite_token": i.invite_token,
"invited_by": i.invited_by,
"invite_date": i.invite_date.isoformat() if i.invite_date else None,
"created_at": i.created_at.isoformat() if i.created_at else None,
"billing_period": plan.billing_period if plan else "monthly",
"stripe_link": plan.stripe_link if plan else None,
})
return result
@app.post("/api/student/memberships/{membership_id}/pause-request")
def request_pause_membership(membership_id: int, db: Session = Depends(get_db)):
"""Submit a pause request for membership. Requires admin approval."""
membership = db.query(Membership).filter(Membership.id == membership_id).first()
if not membership:
raise HTTPException(status_code=404, detail="Membership not found")
if membership.status not in ["active", "paused"]:
raise HTTPException(status_code=400, detail="Only active or paused memberships can request pause")
# Check if there's already a pending pause request
existing_request = db.query(MembershipRequest).filter(
MembershipRequest.membership_id == membership_id,
MembershipRequest.request_type == "pause",
MembershipRequest.status == "pending"
).first()
if existing_request:
raise HTTPException(status_code=400, detail="A pause request is already pending")
# Check if membership has already been paused in this period (check if paused_at exists)
if membership.paused_at:
raise HTTPException(status_code=400, detail="Membership can only be paused once per period")
# Create pause request
pause_request = MembershipRequest(
membership_id=membership_id,
request_type="pause",
status="pending",
requested_by=membership.user_email
)
db.add(pause_request)
db.commit()
db.refresh(pause_request)
return {
"status": "ok",
"message": "Pause request submitted. Waiting for admin approval.",
"request_id": pause_request.id
}
@app.post("/api/student/memberships/{membership_id}/cancel-request")
def request_cancel_membership(membership_id: int, db: Session = Depends(get_db)):
"""Submit a cancellation request for membership. Requires admin approval."""
membership = db.query(Membership).filter(Membership.id == membership_id).first()
if not membership:
raise HTTPException(status_code=404, detail="Membership not found")
if membership.status != "active":
raise HTTPException(status_code=400, detail="Only active memberships can request cancellation")
# Check if there's already a pending cancel request
existing_request = db.query(MembershipRequest).filter(
MembershipRequest.membership_id == membership_id,
MembershipRequest.request_type == "cancel",
MembershipRequest.status == "pending"
).first()
if existing_request:
raise HTTPException(status_code=400, detail="A cancellation request is already pending")
# Create cancel request
cancel_request = MembershipRequest(
membership_id=membership_id,
request_type="cancel",
status="pending",
requested_by=membership.user_email
)
db.add(cancel_request)
db.commit()
db.refresh(cancel_request)
return {
"status": "ok",
"message": "Cancellation request submitted. Waiting for admin approval.",
"request_id": cancel_request.id
}
# ---------- MEMBERSHIP REQUESTS (Pause/Cancel) ----------
@app.get("/api/student/requests")
def get_student_requests(email: str, db: Session = Depends(get_db)):
"""Get all requests for a student (by email)."""
# Get all memberships for this email
memberships = db.query(Membership).filter(Membership.user_email == email).all()
membership_ids = [m.id for m in memberships]
if not membership_ids:
return []
requests = db.query(MembershipRequest).filter(
MembershipRequest.membership_id.in_(membership_ids)
).order_by(MembershipRequest.requested_at.desc()).all()
return [
{
"id": r.id,
"membership_id": r.membership_id,
"request_type": r.request_type,
"status": r.status,
"requested_at": r.requested_at.isoformat() if r.requested_at else None,
"approved_at": r.approved_at.isoformat() if r.approved_at else None,
"rejected_at": r.rejected_at.isoformat() if r.rejected_at else None,
"withdrawn_at": r.withdrawn_at.isoformat() if r.withdrawn_at else None,
"notes": r.notes,
}
for r in requests
]
@app.get("/api/student/memberships/{membership_id}/requests")
def get_membership_requests(membership_id: int, db: Session = Depends(get_db)):
"""Get all requests for a membership."""
requests = db.query(MembershipRequest).filter(
MembershipRequest.membership_id == membership_id
).order_by(MembershipRequest.requested_at.desc()).all()
return [
{
"id": r.id,
"request_type": r.request_type,
"status": r.status,
"requested_at": r.requested_at.isoformat() if r.requested_at else None,
"approved_at": r.approved_at.isoformat() if r.approved_at else None,
"rejected_at": r.rejected_at.isoformat() if r.rejected_at else None,
"withdrawn_at": r.withdrawn_at.isoformat() if r.withdrawn_at else None,
"notes": r.notes,
}
for r in requests
]
@app.post("/api/student/memberships/{membership_id}/requests/{request_id}/withdraw")
def withdraw_request(membership_id: int, request_id: int, db: Session = Depends(get_db)):
"""Withdraw a pending request."""
request = db.query(MembershipRequest).filter(
MembershipRequest.id == request_id,
MembershipRequest.membership_id == membership_id,
MembershipRequest.status == "pending"
).first()
if not request:
raise HTTPException(status_code=404, detail="Request not found or already processed")
request.status = "withdrawn"
request.withdrawn_at = datetime.utcnow()
db.commit()
return {"status": "ok", "message": "Request withdrawn"}
@app.post("/api/student/memberships/{membership_id}/resume")
def resume_membership(membership_id: int, db: Session = Depends(get_db)):
"""Resume a paused membership."""
membership = db.query(Membership).filter(Membership.id == membership_id).first()
if not membership:
raise HTTPException(status_code=404, detail="Membership not found")
if membership.status != "paused":
raise HTTPException(status_code=400, detail="Only paused memberships can be resumed")
if not membership.paused_at:
raise HTTPException(status_code=400, detail="Membership pause date not found")
# Calculate the original renewal date (before pause added 30 days)
# The current renewal_date has 30 days added from the original
original_renewal_date = membership.renewal_date - timedelta(days=30)
# Calculate actual pause duration (from pause date to today)
resume_date = date.today()
actual_pause_duration = (resume_date - membership.paused_at).days
# Calculate new renewal date: original renewal date + actual pause duration
new_renewal_date = original_renewal_date + timedelta(days=actual_pause_duration)
# Resume membership - set status back to active and update renewal date
# Keep paused_at for tracking (don't clear it) so we know pause was used in this period
membership.status = "active"
membership.renewal_date = new_renewal_date
# Note: We keep paused_at to track that pause was used in this membership period
db.commit()
db.refresh(membership)
return {
"status": "ok",
"message": "Membership resumed",
"renewal_date": membership.renewal_date.isoformat()
}
@app.post("/api/student/memberships/{membership_id}/reactivate")
def reactivate_membership(membership_id: int, db: Session = Depends(get_db)):
"""Reactivate a cancelled membership. Creates a checkout session for payment."""
membership = db.query(Membership).filter(Membership.id == membership_id).first()
if not membership:
raise HTTPException(status_code=404, detail="Membership not found")
if membership.status != "cancelled":
raise HTTPException(status_code=400, detail="Only cancelled memberships can be reactivated")
# Check if renewal date has passed
today = date.today()
if membership.renewal_date and membership.renewal_date < today:
raise HTTPException(status_code=400, detail="Cannot reactivate membership after renewal date has passed")
# Get the plan
plan = db.query(MembershipPlan).filter(MembershipPlan.id == membership.plan_id).first()
if not plan:
raise HTTPException(status_code=404, detail="Plan not found")
# Create Stripe checkout session for reactivation
# The membership will start from the renewal date
try:
checkout_session = stripe.checkout.Session.create(
payment_method_types=["card"],
line_items=[
{
"price_data": {
"currency": "cad",
"product_data": {
"name": f"Reactivate {membership.plan_name}",
"description": f"Reactivate membership - starts from {membership.renewal_date.isoformat()}",
},
"unit_amount": int(float(membership.price) * 100), # Convert to cents
"recurring": {
"interval": membership.billing_period.replace("_", ""), # monthly, yearly, halfyearly
},
},
"quantity": 1,
}
],
mode="subscription",
success_url=f"{os.getenv('BASE_URL', 'http://localhost:3000')}/student?reactivated=true",
cancel_url=f"{os.getenv('BASE_URL', 'http://localhost:3000')}/student",
customer_email=membership.user_email,
metadata={
"membership_id": str(membership_id),
"type": "reactivation",
"start_date": membership.renewal_date.isoformat(),
},
)
return {
"status": "ok",
"checkout_url": checkout_session.url,
"message": "Checkout session created for reactivation"
}
except Exception as e:
print(f"[reactivate] Error creating checkout session: {e}")
raise HTTPException(status_code=500, detail=f"Error creating payment session: {str(e)}")
# ---------- ADMIN: Membership Requests ----------
@app.get("/api/admin/requests")
def get_admin_requests(db: Session = Depends(get_db)):
"""Get all pending membership requests."""
requests = db.query(MembershipRequest).filter(
MembershipRequest.status == "pending"
).order_by(MembershipRequest.requested_at.desc()).all()
result = []
for r in requests:
membership = db.query(Membership).filter(Membership.id == r.membership_id).first()
if membership:
result.append({
"id": r.id,
"membership_id": r.membership_id,
"request_type": r.request_type,
"status": r.status,
"requested_at": r.requested_at.isoformat() if r.requested_at else None,
"requested_by": r.requested_by,
"member_name": f"{membership.member_first_name or ''} {membership.member_last_name or ''}".strip() or membership.user_name,
"member_email": membership.user_email,
"plan_name": membership.plan_name,
"membership_status": membership.status,
"renewal_date": membership.renewal_date.isoformat() if membership.renewal_date else None,
})
return result
@app.post("/api/admin/requests/{request_id}/approve")
def approve_request(request_id: int, payload: dict = None, db: Session = Depends(get_db)):
"""Approve a membership request and execute it."""
request = db.query(MembershipRequest).filter(
MembershipRequest.id == request_id,
MembershipRequest.status == "pending"
).first()
if not request:
raise HTTPException(status_code=404, detail="Request not found or already processed")
membership = db.query(Membership).filter(Membership.id == request.membership_id).first()
if not membership:
raise HTTPException(status_code=404, detail="Membership not found")
stored_admin = {} # In real app, get from session
admin_name = payload.get("approved_by", "admin") if payload else "admin"
if request.request_type == "pause":
# Execute pause
pause_date = date.today()
# Store the original renewal date before pause
original_renewal_date = membership.renewal_date
# Add 30 days to the original renewal date (pause extends membership by 30 days)
new_renewal_date = original_renewal_date + timedelta(days=30)
membership.renewal_date = new_renewal_date
membership.status = "paused"
membership.paused_at = pause_date
elif request.request_type == "cancel":
# Execute cancellation
# Cancel Stripe subscription if it exists
try:
customers = stripe.Customer.list(email=membership.user_email, limit=1)
if customers.data:
customer = customers.data[0]
subscriptions = stripe.Subscription.list(customer=customer.id, status="active", limit=1)
if subscriptions.data:
subscription = subscriptions.data[0]
stripe.Subscription.modify(
subscription.id,
cancel_at_period_end=True
)
print(f"[approve cancel] Stripe subscription {subscription.id} set to cancel at period end")
except Exception as e:
print(f"[approve cancel] Error canceling Stripe subscription: {e}")
membership.status = "cancelled"
membership.cancelled_at = date.today()
# Update request status
request.status = "approved"
request.approved_at = datetime.utcnow()
request.approved_by = admin_name
request.notes = payload.get("notes", "") if payload else None
db.commit()
db.refresh(membership)
db.refresh(request)
return {
"status": "ok",
"message": f"{request.request_type.capitalize()} request approved and executed",
"membership_status": membership.status
}
@app.post("/api/admin/requests/{request_id}/reject")
def reject_request(request_id: int, payload: dict = None, db: Session = Depends(get_db)):
"""Reject a membership request."""
request = db.query(MembershipRequest).filter(
MembershipRequest.id == request_id,
MembershipRequest.status == "pending"
).first()
if not request:
raise HTTPException(status_code=404, detail="Request not found or already processed")
stored_admin = {} # In real app, get from session
admin_name = payload.get("rejected_by", "admin") if payload else "admin"
request.status = "rejected"
request.rejected_at = datetime.utcnow()
request.rejected_by = admin_name
request.notes = payload.get("notes", "") if payload else None
db.commit()
db.refresh(request)
return {"status": "ok", "message": "Request rejected"}
@app.get("/api/student/memberships")
def get_student_memberships(email: str, db: Session = Depends(get_db)):
memberships = db.query(Membership).filter(Membership.user_email == email).all()
result = []
for m in memberships:
membership_data = {
"id": m.id,
"plan_name": m.plan_name,
"status": m.status,
"start_date": m.start_date.isoformat(),
"renewal_date": m.renewal_date.isoformat(),
"price": float(m.price),
"billing_period": m.billing_period,
"max_students": m.max_students,
"cancelled_at": m.cancelled_at.isoformat() if m.cancelled_at else None,
"paused_at": m.paused_at.isoformat() if m.paused_at else None,
}
result.append(membership_data)
# If membership was cancelled, add a cancellation entry to history
if m.cancelled_at:
result.append({
"id": f"{m.id}_cancelled",
"plan_name": m.plan_name,
"status": "cancelled",
"start_date": m.cancelled_at.isoformat(), # Show cancellation date as "started"
"renewal_date": m.renewal_date.isoformat(),
"price": float(m.price),
"billing_period": m.billing_period,
"max_students": m.max_students,
"cancelled_at": m.cancelled_at.isoformat(),
"is_cancellation_entry": True, # Flag to identify this as a cancellation entry
})
# If membership was paused, add a pause entry to history
if m.paused_at:
result.append({
"id": f"{m.id}_paused",
"plan_name": m.plan_name,
"status": "paused",
"start_date": m.paused_at.isoformat(), # Show pause date as "started"
"renewal_date": m.renewal_date.isoformat(), # This is the resume date
"price": float(m.price),
"billing_period": m.billing_period,
"max_students": m.max_students,
"paused_at": m.paused_at.isoformat(),
"is_pause_entry": True, # Flag to identify this as a pause entry
})
return result
@app.get("/api/student/students")
def get_student_students(email: str, db: Session = Depends(get_db)):
"""Get all students for a membership email."""
# Find membership by email
membership = db.query(Membership).filter(Membership.user_email == email).first()
if not membership:
return []
# Get students for this membership
students = db.query(Student).filter(Student.membership_id == membership.id).all()
result = []
for s in students:
result.append({
"id": s.id,
"membership_id": s.membership_id,
"first_name": s.first_name,
"last_name": s.last_name,
"date_of_birth": s.date_of_birth.isoformat() if s.date_of_birth else None,
"gender": s.gender,
"medical_notes": s.medical_notes,
"created_at": s.created_at.isoformat() if s.created_at else None,
})
return result
@app.get("/api/student/enrollments")
def get_student_enrollments(email: str, db: Session = Depends(get_db)):
"""Get all class enrollments for students under a membership email."""
from models.class_ import ClassEnrollment, Class
# Find membership by email
membership = db.query(Membership).filter(Membership.user_email == email).first()
if not membership:
return []
# Get students for this membership
students = db.query(Student).filter(Student.membership_id == membership.id).all()
student_ids = [s.id for s in students]
if not student_ids:
return []
# Get enrollments for these students
enrollments = (
db.query(ClassEnrollment)
.filter(ClassEnrollment.student_id.in_(student_ids))
.filter(ClassEnrollment.status != "removed")
.all()
)
result = []
for e in enrollments:
# Get class details
class_obj = db.query(Class).filter(Class.id == e.class_id).first()
result.append({
"id": e.id,
"class_id": e.class_id,
"class_name": class_obj.name if class_obj else "Unknown Class",
"student_id": e.student_id,
"student_name": e.student_name,
"student_email": e.student_email,
"status": e.status,
"joined_at": e.joined_at.isoformat() if e.joined_at else None,
})
return result
@app.get("/api/student/classes")
def get_student_classes(email: str, db: Session = Depends(get_db)):
"""Get all unique classes that students under a membership are enrolled in."""
from models.class_ import ClassEnrollment, Class
# Find membership by email
membership = db.query(Membership).filter(Membership.user_email == email).first()
if not membership:
return []
# Get students for this membership
students = db.query(Student).filter(Student.membership_id == membership.id).all()
student_ids = [s.id for s in students]
if not student_ids:
return []
# Get enrollments for these students
enrollments = (
db.query(ClassEnrollment)
.filter(ClassEnrollment.student_id.in_(student_ids))
.filter(ClassEnrollment.status == "joined")
.all()
)
# Get unique class IDs
class_ids = list(set([e.class_id for e in enrollments]))
if not class_ids:
return []
# Get class details
classes = db.query(Class).filter(Class.id.in_(class_ids), Class.is_active == True).all()
result = []
for c in classes:
# Count students from this membership enrolled in this class
class_enrollments = [
e for e in enrollments
if e.class_id == c.id and e.student_id in student_ids
]
result.append({
"id": c.id,
"name": c.name,
"description": c.description,
"class_time": c.class_time,
"days_of_week": c.days_of_week.split(",") if c.days_of_week else [],
"max_students": c.max_students,
"student_count": len(class_enrollments),
})
return result
@app.get("/api/student/classes/{class_id}/attendance")
def get_student_class_attendance(class_id: int, email: str, db: Session = Depends(get_db)):
"""Get attendance history for all students under a membership in a specific class."""
from models.class_ import ClassEnrollment, Class
# Find membership by email
membership = db.query(Membership).filter(Membership.user_email == email).first()
if not membership:
raise HTTPException(status_code=404, detail="Membership not found")
# Verify class exists
class_obj = db.query(Class).filter(Class.id == class_id, Class.is_active == True).first()
if not class_obj:
raise HTTPException(status_code=404, detail="Class not found")
# Get students for this membership
students = db.query(Student).filter(Student.membership_id == membership.id).all()
student_ids = [s.id for s in students]
if not student_ids:
return {
"class": {
"id": class_obj.id,
"name": class_obj.name,
"description": class_obj.description,
"class_time": class_obj.class_time,
"days_of_week": class_obj.days_of_week.split(",") if class_obj.days_of_week else [],
},
"students": [],
}
# Verify students are enrolled in this class
enrollments = (
db.query(ClassEnrollment)
.filter(ClassEnrollment.class_id == class_id)
.filter(ClassEnrollment.student_id.in_(student_ids))
.filter(ClassEnrollment.status == "joined")
.all()
)
# Get enrollment student IDs for filtering
enrollment_student_ids = {e.student_id for e in enrollments if e.student_id is not None}
# Get all attendance records for students in this membership for this class
# We query by student_ids (all students in membership) to get all possible attendance
# Then filter to only show enrolled students
attendance_records = []
if student_ids:
attendance_records = (
db.query(Attendance)
.filter(Attendance.class_id == class_id)
.filter(Attendance.student_id.in_(student_ids))
.order_by(Attendance.attendance_date.desc())
.all()
)
# Group attendance by student
students_data = []
# Create a mapping of student_id to enrollment email
enrollment_email_map = {e.student_id: e.student_email for e in enrollments if e.student_id}
for student in students:
# Only include students that are enrolled in this class
if student.id not in enrollment_student_ids:
continue
# Get attendance records for this specific student
# Filter attendance records that match this student's ID
student_attendance = [
{
"id": a.id,
"attendance_date": a.attendance_date.isoformat(),
"is_present": a.is_present,
"coach_email": a.coach_email,
}
for a in attendance_records
if a.student_id == student.id
]
# Get email from enrollment, fallback to empty string if not found
student_email = enrollment_email_map.get(student.id, "")
students_data.append({
"id": student.id,
"first_name": student.first_name,
"last_name": student.last_name,
"email": student_email,
"attendance_history": student_attendance,
})
return {
"class": {
"id": class_obj.id,
"name": class_obj.name,
"description": class_obj.description,
"class_time": class_obj.class_time,
"days_of_week": class_obj.days_of_week.split(",") if class_obj.days_of_week else [],
},
"students": students_data,
}
@app.get("/api/student/upcoming-renewal")
def get_student_upcoming_renewal(email: str, db: Session = Depends(get_db)):
today = date.today()
membership = (
db.query(Membership)
.filter(Membership.user_email == email)
.filter(Membership.status == "active")
.filter(Membership.renewal_date >= today)
.order_by(Membership.renewal_date.asc())
.first()
)
if not membership:
raise HTTPException(status_code=404, detail="No upcoming renewal")
return {
"membership_id": membership.id,
"plan_name": membership.plan_name,
"status": membership.status,
"renewal_date": membership.renewal_date.isoformat(),
"days_until_renewal": (membership.renewal_date - today).days,
"price": float(membership.price),
"billing_period": membership.billing_period,
}
@app.get("/api/student/products")
def get_student_products(db: Session = Depends(get_db)):
products = db.query(Product).filter(Product.is_active == True).all() # noqa: E712
return [
{
"id": p.id,
"name": p.name,
"price": float(p.price),
"stripe_link": p.stripe_link,
"description": p.description,
"image_url": p.image_url,
}
for p in products
]
@app.get("/api/student/exam-registrations")
def get_student_exam_registrations(email: str, db: Session = Depends(get_db)):
"""Get exam registrations for a student by email."""
# Find all registrations for this email
registrations = db.query(ExamStudentRegistration).filter(
ExamStudentRegistration.student_email == email
).all()
result = []
for reg in registrations:
# Get exam details
exam = db.query(Exam).filter(Exam.id == reg.exam_id).first()
# Get batch details
batch = db.query(ExamBatch).filter(ExamBatch.id == reg.batch_id).first()
if exam and batch:
result.append({
"id": reg.id,
"exam_id": reg.exam_id,
"batch_id": reg.batch_id,
"student_id": reg.student_id,
"student_name": reg.student_name,
"student_email": reg.student_email,
"current_level": reg.current_level,
"target_level": reg.target_level,
"registration_status": reg.registration_status,
"payment_status": reg.payment_status,
"notification_sent": reg.notification_sent,
"added_by_ai": reg.added_by_ai,
"created_at": reg.created_at.isoformat() if reg.created_at else None,
"exam": {
"id": exam.id,
"name": exam.name,
"exam_date": exam.exam_date.isoformat() if exam.exam_date else None,
"exam_fee": float(exam.exam_fee),
"registration_deadline": exam.registration_deadline.isoformat() if exam.registration_deadline else None,
"stripe_link": exam.stripe_link,
"location": exam.location,
"description": exam.description,
"status": exam.status,
},
"batch": {
"id": batch.id,
"batch_name": batch.batch_name,
"time_from": batch.time_from.strftime("%H:%M:%S") if batch.time_from else None,
"time_to": batch.time_to.strftime("%H:%M:%S") if batch.time_to else None,
"max_students": batch.max_students,
}
})
return result
# ---------- Stripe webhook ----------
@app.post("/api/stripe/webhook")
async def stripe_webhook(request: Request, db: Session = Depends(get_db)):
payload = await request.body()
sig_header = request.headers.get("stripe-signature")
try:
if settings.STRIPE_WEBHOOK_SECRET:
event = stripe.Webhook.construct_event(
payload=payload,
sig_header=sig_header,
secret=settings.STRIPE_WEBHOOK_SECRET,
)
else:
event = stripe.Event.construct_from(
await request.json(), stripe.api_key
)
except ValueError:
raise HTTPException(status_code=400, detail="Invalid payload")
except stripe.error.SignatureVerificationError:
raise HTTPException(status_code=400, detail="Invalid signature")
# Log the event type for debugging
event_type = event.get("type")
print(f"[webhook] Received event: {event_type}")
data = event["data"]["object"]
if event_type == "checkout.session.completed":
customer_details = data.get("customer_details", {}) or {}
email = customer_details.get("email")
name = customer_details.get("name") or "Student"
# Also try to get email from other possible locations
if not email:
email = data.get("customer_email") or data.get("email")
print(f"[webhook] checkout.session.completed - email={email}, name={name}")
metadata = data.get("metadata", {}) or {}
# Extract membership-related metadata first to determine payment type
invite_token = metadata.get("invite_token")
plan_name = metadata.get("plan_name")
# Check if this payment is for an exam registration (via metadata)
exam_registration_id = metadata.get("exam_registration_id")
if exam_registration_id:
try:
reg_id = int(exam_registration_id)
registration = db.query(ExamStudentRegistration).filter(
ExamStudentRegistration.id == reg_id
).first()
if registration:
# Update registration status
registration.payment_status = "paid"
registration.registration_status = "registered"
db.commit()
print(f"[webhook] Exam payment processed via metadata: registration_id={reg_id}")
return {"received": True, "type": "exam_payment"}
except (ValueError, TypeError) as e:
print(f"[webhook] Error processing exam_registration_id: {e}")
pass # Not a valid exam registration ID, continue with membership flow
# Also check by matching payment amount and email to exam registrations
# This handles cases where metadata isn't available (static payment links)
# BUT: Only check for exams if this is NOT a membership payment (no invite_token or plan_name)
# This ensures membership payments are not incorrectly processed as exam payments
if not invite_token and not plan_name:
amount_total = data.get("amount_total") # in cents
if amount_total and email:
amount_decimal = float(amount_total) / 100.0
print(f"[webhook] Checking exam payment: email={email}, amount={amount_decimal}")
# Find pending exam registrations for this email (case-insensitive)
# Use a small tolerance for floating point comparison
# Check for both "pending" and "unpaid" statuses, and "invited" registration status
pending_regs = (
db.query(ExamStudentRegistration)
.join(Exam)
.filter(
func.lower(ExamStudentRegistration.student_email) == func.lower(email),
ExamStudentRegistration.payment_status.in_(["pending", "unpaid"]),
ExamStudentRegistration.registration_status.in_(["invited", "pending"]),
func.abs(Exam.exam_fee - amount_decimal) < 0.01 # Allow small difference for floating point
)
.order_by(ExamStudentRegistration.created_at.desc())
.all()
)
if pending_regs:
# Update the first matching registration (most recent)
registration = pending_regs[0]
registration.payment_status = "paid"
registration.registration_status = "registered"
db.commit()
print(f"[webhook] Exam payment processed via amount/email match: registration_id={registration.id}, email={email}, amount={amount_decimal}")
return {"received": True, "type": "exam_payment"}
else:
print(f"[webhook] No matching exam registration found with exact amount: email={email}, amount={amount_decimal}")
# Fallback: try to find by email only (in case amount doesn't match exactly)
# Check for both "pending" and "unpaid" statuses, and "invited" registration status
pending_by_email = (
db.query(ExamStudentRegistration)
.join(Exam)
.filter(
func.lower(ExamStudentRegistration.student_email) == func.lower(email),
ExamStudentRegistration.payment_status.in_(["pending", "unpaid"]),
ExamStudentRegistration.registration_status.in_(["invited", "pending"])
)
.order_by(ExamStudentRegistration.created_at.desc())
.all()
)
if pending_by_email:
# Update the most recent unpaid registration for this email
registration = pending_by_email[0]
exam_fee = registration.exam.exam_fee if registration.exam else None
registration.payment_status = "paid"
registration.registration_status = "registered"
db.commit()
print(f"[webhook] Exam payment processed via email match (amount mismatch): registration_id={registration.id}, email={email}, expected_amount={amount_decimal}, exam_fee={exam_fee}")
return {"received": True, "type": "exam_payment"}
else:
print(f"[webhook] No unpaid exam registrations found for email: {email}")
# Check if this payment is for reactivation
reactivation_type = metadata.get("type") == "reactivation"
if reactivation_type:
membership_id = metadata.get("membership_id")
start_date_str = metadata.get("start_date")
if membership_id and start_date_str:
try:
membership = db.query(Membership).filter(Membership.id == int(membership_id)).first()
if membership and membership.status == "cancelled":
# Reactivate the membership
start_date = datetime.fromisoformat(start_date_str.replace('Z', '+00:00')).date()
membership.status = "active"
membership.start_date = start_date
membership.cancelled_at = None
# Calculate new renewal date based on billing period
if membership.billing_period == "monthly":
membership.renewal_date = start_date + timedelta(days=30)
elif membership.billing_period == "quarterly":
membership.renewal_date = start_date + timedelta(days=90)
elif membership.billing_period == "half_yearly":
membership.renewal_date = start_date + timedelta(days=182)
elif membership.billing_period == "annual":
membership.renewal_date = start_date + timedelta(days=365)
else:
membership.renewal_date = start_date + timedelta(days=30)
db.commit()
db.refresh(membership)
print(f"[webhook] Membership reactivated: membership_id={membership_id}, start_date={start_date}")
return {"received": True, "type": "reactivation"}
except (ValueError, TypeError) as e:
print(f"[webhook] Error processing reactivation: {e}")
pass
# Check if this payment is from a direct subscription (without invite)
direct_subscription = metadata.get("direct_subscription") == "true"
if direct_subscription:
plan_id = metadata.get("plan_id")
if plan_id:
try:
plan = db.query(MembershipPlan).filter(MembershipPlan.id == int(plan_id)).first()
if plan:
# Create membership directly from plan
start = date.today()
if plan.billing_period == "monthly":
renewal = start + timedelta(days=30)
elif plan.billing_period == "quarterly":
renewal = start + timedelta(days=90)
elif plan.billing_period == "half_yearly":
renewal = start + timedelta(days=182)
elif plan.billing_period == "annual":
renewal = start + timedelta(days=365)
else:
renewal = start + timedelta(days=30)
membership = Membership(
user_email=email,
user_name=name,
plan_id=plan.id,
plan_name=plan.name,
status="active",
start_date=start,
renewal_date=renewal,
price=float(plan.price),
billing_period=plan.billing_period,
max_students=plan.max_students,
)
db.add(membership)
db.commit()
db.refresh(membership)
# Get stored student data if available
stored_data = plan_subscription_data.get(email, {}).get(int(plan_id))
if stored_data:
member_details = stored_data.get("member_details", {})
students_data = stored_data.get("students", [])
# Update membership with member details
if member_details:
membership.member_first_name = member_details.get("first_name", "")
membership.member_last_name = member_details.get("last_name", "")
membership.phone_number = member_details.get("phone_number")
membership.emergency_contact_person = member_details.get("emergency_contact_person")
membership.emergency_contact_number = member_details.get("emergency_contact_number")
# Create Student records
for student_data in students_data:
student = Student(
membership_id=membership.id,
first_name=student_data.get("first_name", ""),
last_name=student_data.get("last_name", ""),
date_of_birth=datetime.strptime(student_data["date_of_birth"], "%Y-%m-%d").date() if student_data.get("date_of_birth") else None,
gender=student_data.get("gender"),
medical_notes=student_data.get("medical_notes"),
)
db.add(student)
db.commit()
# Clean up stored data after successful processing
if email in plan_subscription_data and int(plan_id) in plan_subscription_data[email]:
del plan_subscription_data[email][int(plan_id)]
if not plan_subscription_data[email]:
del plan_subscription_data[email]
print(f"[webhook] Direct subscription payment processed with students: membership_id={membership.id}, email={email}, plan={plan.name}, students_count={len(students_data)}")
else:
print(f"[webhook] Direct subscription payment processed (no student data): membership_id={membership.id}, email={email}, plan={plan.name}")
return {"received": True, "type": "direct_subscription"}
except (ValueError, TypeError) as e:
print(f"[webhook] Error processing direct subscription: {e}")
pass
# Check if this payment is from an invite (membership payment)
# invite_token and plan_name already extracted above
if not plan_name:
plan_name = "Unknown Plan"
billing_period = metadata.get("billing_period", "monthly")
price_str = metadata.get("price", "0")
try:
price_value = float(price_str)
except ValueError:
price_value = 0.0
plan = (
db.query(MembershipPlan)
.filter(MembershipPlan.name == plan_name)
.first()
)
start = date.today()
if billing_period == "monthly":
renewal = start + timedelta(days=30)
elif billing_period == "quarterly":
renewal = start + timedelta(days=90)
elif billing_period == "half_yearly":
renewal = start + timedelta(days=182) # 6 months (approximately)
elif billing_period == "annual":
renewal = start + timedelta(days=365)
else:
renewal = start + timedelta(days=30)
membership = Membership(
user_email=email,
user_name=name,
plan_id=plan.id if plan else None,
plan_name=plan_name,
status="active",
start_date=start,
renewal_date=renewal,
price=price_value,
billing_period=billing_period,
max_students=plan.max_students if plan else 1,
)
db.add(membership)
db.commit()
db.refresh(membership)
# Check if this payment is from an invite
# First try invite_token from metadata, then try matching by email
invite = None
if invite_token:
invite = db.query(MembershipInvite).filter(
MembershipInvite.invite_token == invite_token
).first()
else:
# Try to find pending invite by email
invite = db.query(MembershipInvite).filter(
MembershipInvite.email == email,
MembershipInvite.status == "pending"
).order_by(MembershipInvite.created_at.desc()).first()
if invite and invite.students_data:
# Handle new structure with member_details and students array
if isinstance(invite.students_data, dict) and "member_details" in invite.students_data:
# New structure: { member_details: {...}, students: [...] }
member_details = invite.students_data.get("member_details", {})
students_list = invite.students_data.get("students", [])
# Update membership with member details
membership.member_first_name = member_details.get("first_name")
membership.member_last_name = member_details.get("last_name")
membership.phone_number = member_details.get("phone_number")
membership.emergency_contact_person = member_details.get("emergency_contact_person")
membership.emergency_contact_number = member_details.get("emergency_contact_number")
# Also update user_name from member details
if member_details.get("first_name") and member_details.get("last_name"):
membership.user_name = f"{member_details.get('first_name')} {member_details.get('last_name')}"
# Create student records
for student_data in students_list:
# Parse date_of_birth if provided
date_of_birth = None
if student_data.get("date_of_birth"):
try:
date_of_birth = datetime.strptime(student_data["date_of_birth"], "%Y-%m-%d").date()
except:
pass
student = Student(
membership_id=membership.id,
first_name=student_data.get("first_name", ""),
last_name=student_data.get("last_name", ""),
date_of_birth=date_of_birth,
gender=student_data.get("gender"),
medical_notes=student_data.get("medical_notes"),
)
db.add(student)
else:
# Old structure: array of student objects (backward compatibility)
students_list = invite.students_data if isinstance(invite.students_data, list) else []
for student_data in students_list:
# Try to extract name or use first_name/last_name
name = student_data.get("name", "")
first_name = student_data.get("first_name", name.split()[0] if name else "")
last_name = student_data.get("last_name", " ".join(name.split()[1:]) if len(name.split()) > 1 else "")
# Parse date_of_birth if provided
date_of_birth = None
if student_data.get("date_of_birth"):
try:
date_of_birth = datetime.strptime(student_data["date_of_birth"], "%Y-%m-%d").date()
except:
pass
student = Student(
membership_id=membership.id,
first_name=first_name,
last_name=last_name,
date_of_birth=date_of_birth,
gender=student_data.get("gender"),
medical_notes=student_data.get("medical_notes"),
)
db.add(student)
# Mark invite as completed
invite.status = "completed"
invite.completed_at = datetime.utcnow()
db.commit()
return {"received": True}
# ---------- ADMIN: Membership Invites ----------
@app.get("/api/admin/invites")
def get_admin_invites(db: Session = Depends(get_db)):
"""Get all membership invites."""
invites = db.query(MembershipInvite).order_by(MembershipInvite.created_at.desc()).all()
return [
{
"id": i.id,
"email": i.email,
"plan_id": i.plan_id,
"plan_name": i.plan_name,
"plan_price": i.plan_price,
"max_students": i.max_students,
"class_details": i.class_details,
"status": i.status,
"invite_token": i.invite_token,
"invited_by": i.invited_by,
"invite_date": i.invite_date.isoformat() if i.invite_date else None,
"completed_at": i.completed_at.isoformat() if i.completed_at else None,
"created_at": i.created_at.isoformat() if i.created_at else None,
}
for i in invites
]
@app.delete("/api/admin/invites/{invite_id}")
def delete_invite(invite_id: int, db: Session = Depends(get_db)):
"""Delete a membership invite."""
invite = db.query(MembershipInvite).filter(MembershipInvite.id == invite_id).first()
if not invite:
raise HTTPException(status_code=404, detail="Invite not found")
db.delete(invite)
db.commit()
return {"status": "ok", "message": "Invite deleted"}
@app.post("/api/admin/invites")
def create_membership_invite(invite: dict, db: Session = Depends(get_db)):
"""Create a new membership invite."""
# Generate unique token
token = secrets.token_urlsafe(32)
# Get plan details
plan = db.query(MembershipPlan).filter(MembershipPlan.id == invite.get("plan_id")).first()
if not plan:
raise HTTPException(status_code=404, detail="Plan not found")
new_invite = MembershipInvite(
email=invite["email"],
plan_id=plan.id,
plan_name=plan.name,
plan_price=str(float(plan.price)),
max_students=plan.max_students,
class_details=invite.get("class_details"),
status="pending",
invite_token=token,
invited_by=invite.get("invited_by", "admin"),
)
db.add(new_invite)
db.commit()
db.refresh(new_invite)
# Send email with student dashboard link
try:
# Construct the student dashboard URL - user will login and see pending invite
base_url = os.getenv("BASE_URL", "http://localhost:7860")
dashboard_url = f"{base_url}/student"
subject = f"Membership Invitation - {plan.name}"
body_lines = [
f"Hello,",
"",
f"You've been invited to join our Karate Dojo with the {plan.name} membership plan.",
"",
f"Plan Details:",
f"- Price: ${plan.price}/{plan.billing_period}",
f"- Maximum Students: {plan.max_students}",
"",
]
if invite.get("class_details"):
body_lines.extend([
"Classes Included:",
invite.get("class_details"),
"",
])
body_lines.extend([
"To complete your membership and add your students, please:",
"1. Log in to the student dashboard using this email address:",
f" {new_invite.email}",
"",
f"2. Click here to access your dashboard:",
dashboard_url,
"",
"3. You'll see your pending invite on the dashboard",
"4. Click on it to add student information and complete payment",
"",
"If you have any questions, please contact us.",
"",
"See you on the mat!",
])
body = "\n".join(body_lines)
send_email(new_invite.email, subject, body)
except Exception as e:
print(f"[create_invite] Failed to send invite email: {e}")
# Don't fail the request if email fails
return {
"id": new_invite.id,
"email": new_invite.email,
"plan_id": new_invite.plan_id,
"plan_name": new_invite.plan_name,
"plan_price": new_invite.plan_price,
"max_students": new_invite.max_students,
"class_details": new_invite.class_details,
"status": new_invite.status,
"invite_token": new_invite.invite_token,
"invited_by": new_invite.invited_by,
"invite_date": new_invite.invite_date.isoformat() if new_invite.invite_date else None,
}
@app.delete("/api/admin/invites/{invite_id}")
def delete_membership_invite(invite_id: int, db: Session = Depends(get_db)):
"""Delete a membership invite."""
invite = db.query(MembershipInvite).filter(MembershipInvite.id == invite_id).first()
if not invite:
raise HTTPException(status_code=404, detail="Invite not found")
db.delete(invite)
db.commit()
return {"status": "deleted"}
# ---------- PUBLIC: Invite Acceptance ----------
@app.get("/api/invite/id/{invite_id}")
def get_invite_by_id(invite_id: int, db: Session = Depends(get_db)):
"""Get invite details by ID."""
invite = db.query(MembershipInvite).filter(MembershipInvite.id == invite_id).first()
if not invite:
raise HTTPException(status_code=404, detail="Invite not found")
plan = db.query(MembershipPlan).filter(MembershipPlan.id == invite.plan_id).first() if invite.plan_id else None
return {
"id": invite.id,
"email": invite.email,
"plan_id": invite.plan_id,
"plan_name": invite.plan_name,
"plan_price": invite.plan_price,
"max_students": invite.max_students,
"class_details": invite.class_details,
"status": invite.status,
"invite_token": invite.invite_token,
"invited_by": invite.invited_by,
"invite_date": invite.invite_date.isoformat() if invite.invite_date else None,
"billing_period": plan.billing_period if plan else "monthly",
"stripe_link": plan.stripe_link if plan else None,
}
@app.get("/api/invite/{token}")
def get_invite_by_token(token: str, db: Session = Depends(get_db)):
"""Get invite details by token (public endpoint)."""
invite = db.query(MembershipInvite).filter(MembershipInvite.invite_token == token).first()
if not invite:
raise HTTPException(status_code=404, detail="Invite not found")
if invite.status != "pending":
raise HTTPException(status_code=400, detail="This invite has already been used or expired")
plan = db.query(MembershipPlan).filter(MembershipPlan.id == invite.plan_id).first()
return {
"id": invite.id,
"email": invite.email,
"plan_id": invite.plan_id,
"plan_name": invite.plan_name,
"plan_price": invite.plan_price,
"max_students": invite.max_students,
"class_details": invite.class_details,
"stripe_link": plan.stripe_link if plan else None,
"invite_token": invite.invite_token,
}
@app.post("/api/invite/{token}/students")
def save_invite_students(token: str, payload: dict, db: Session = Depends(get_db)):
"""Save student data and member details for an invite (before payment)."""
invite = db.query(MembershipInvite).filter(MembershipInvite.invite_token == token).first()
if not invite:
raise HTTPException(status_code=404, detail="Invite not found")
if invite.status != "pending":
raise HTTPException(status_code=400, detail="This invite has already been used")
member_details = payload.get("member_details", {})
students_data = payload.get("students", [])
if not students_data or len(students_data) == 0:
raise HTTPException(status_code=400, detail="At least one student is required")
if len(students_data) > invite.max_students:
raise HTTPException(status_code=400, detail=f"Maximum {invite.max_students} students allowed")
# Validate member details - check for non-empty strings after trimming
member_first_name = member_details.get("first_name", "").strip() if member_details.get("first_name") else ""
member_last_name = member_details.get("last_name", "").strip() if member_details.get("last_name") else ""
if not member_first_name or not member_last_name:
raise HTTPException(status_code=400, detail="Member first name and last name are required")
# Store both member details and students data in invite
# We'll create actual Student and Membership records after payment
invite.students_data = {
"member_details": member_details,
"students": students_data,
}
db.commit()
return {
"status": "ok",
"students_count": len(students_data),
"message": "Student information saved. Proceed to payment.",
}
@app.post("/api/invite/{token}/checkout")
def create_checkout_session(token: str, db: Session = Depends(get_db)):
"""Create a Stripe checkout session dynamically with invite token in metadata."""
invite = db.query(MembershipInvite).filter(MembershipInvite.invite_token == token).first()
if not invite:
raise HTTPException(status_code=404, detail="Invite not found")
if invite.status != "pending":
raise HTTPException(status_code=400, detail="This invite has already been used")
plan = db.query(MembershipPlan).filter(MembershipPlan.id == invite.plan_id).first()
if not plan:
raise HTTPException(status_code=400, detail="Plan not found")
# Get the base URL from settings or environment
base_url = os.getenv("BASE_URL", "http://localhost:7860")
if not base_url.startswith("http"):
base_url = f"https://{base_url}"
# Calculate price in cents
price_cents = int(float(plan.price) * 100)
try:
# Create a Stripe Checkout Session dynamically
checkout_session = stripe.checkout.Session.create(
payment_method_types=["card"],
line_items=[
{
"price_data": {
"currency": "cad",
"product_data": {
"name": f"{plan.name} Membership",
"description": plan.description or f"{plan.billing_period.capitalize()} membership plan",
},
"unit_amount": price_cents,
"recurring": {
"interval": "month" if plan.billing_period in ["monthly", "half_yearly"] else "year",
"interval_count": 6 if plan.billing_period == "half_yearly" else 1,
} if plan.billing_period in ["monthly", "half_yearly", "annual"] else None,
},
"quantity": 1,
}
],
mode="subscription" if plan.billing_period in ["monthly", "half_yearly", "annual"] else "payment",
success_url=f"{base_url}/student?payment=success",
cancel_url=f"{base_url}/student?payment=cancelled",
customer_email=invite.email,
metadata={
"invite_token": token,
"plan_name": plan.name,
"plan_id": str(plan.id),
"billing_period": plan.billing_period,
"price": str(plan.price),
"max_students": str(plan.max_students),
},
)
return {"checkout_url": checkout_session.url, "session_id": checkout_session.id}
except Exception as e:
print(f"[checkout] Error creating Stripe session: {e}")
raise HTTPException(status_code=500, detail=f"Failed to create payment session: {str(e)}")
@app.post("/api/exam/{registration_id}/checkout")
def create_exam_checkout_session(registration_id: int, db: Session = Depends(get_db)):
"""Create a Stripe checkout session dynamically for exam registration."""
registration = db.query(ExamStudentRegistration).filter(
ExamStudentRegistration.id == registration_id
).first()
if not registration:
raise HTTPException(status_code=404, detail="Exam registration not found")
if registration.payment_status == "paid":
raise HTTPException(status_code=400, detail="This registration has already been paid")
exam = db.query(Exam).filter(Exam.id == registration.exam_id).first()
if not exam:
raise HTTPException(status_code=404, detail="Exam not found")
# Get batch name
batch = db.query(ExamBatch).filter(ExamBatch.id == registration.batch_id).first()
batch_name = batch.batch_name if batch else "Exam"
# Get the base URL from settings or environment
base_url = os.getenv("BASE_URL", "http://localhost:7860")
if not base_url.startswith("http"):
base_url = f"https://{base_url}"
# Calculate exam fee in cents
exam_fee_cents = int(float(exam.exam_fee) * 100)
try:
# Create a Stripe Checkout Session dynamically
checkout_session = stripe.checkout.Session.create(
payment_method_types=["card"],
line_items=[
{
"price_data": {
"currency": "cad",
"product_data": {
"name": f"{exam.name} - {batch_name}",
"description": f"Exam registration for {registration.student_name}",
},
"unit_amount": exam_fee_cents,
},
"quantity": 1,
}
],
mode="payment",
success_url=f"{base_url}/student/exams?payment=success",
cancel_url=f"{base_url}/student/exams?payment=cancelled",
customer_email=registration.student_email,
metadata={
"exam_registration_id": str(registration_id),
"exam_id": str(exam.id),
"batch_id": str(registration.batch_id),
"student_email": registration.student_email,
"student_name": registration.student_name,
},
)
return {"checkout_url": checkout_session.url, "session_id": checkout_session.id}
except Exception as e:
print(f"[checkout] Error creating exam Stripe session: {e}")
raise HTTPException(status_code=500, detail=f"Failed to create payment session: {str(e)}")
# Store plan subscription data temporarily (similar to invite students_data)
# We'll use a simple in-memory store or we can create a temporary table
# For now, let's use a simple approach with a temporary storage
plan_subscription_data = {} # {email: {plan_id: {...}}}
@app.post("/api/plan/{plan_id}/students")
def save_plan_students(plan_id: int, email: str, payload: dict, db: Session = Depends(get_db)):
"""Save student data and member details for a direct plan subscription (before payment)."""
plan = db.query(MembershipPlan).filter(MembershipPlan.id == plan_id).first()
if not plan:
raise HTTPException(status_code=404, detail="Plan not found")
if not plan.is_active:
raise HTTPException(status_code=400, detail="This plan is not active")
member_details = payload.get("member_details", {})
students_data = payload.get("students", [])
if not students_data or len(students_data) == 0:
raise HTTPException(status_code=400, detail="At least one student is required")
if len(students_data) > plan.max_students:
raise HTTPException(status_code=400, detail=f"Maximum {plan.max_students} students allowed")
# Validate member details
member_first_name = member_details.get("first_name", "").strip() if member_details.get("first_name") else ""
member_last_name = member_details.get("last_name", "").strip() if member_details.get("last_name") else ""
if not member_first_name or not member_last_name:
raise HTTPException(status_code=400, detail="Member first name and last name are required")
# Store data temporarily (in production, you might want to use a database table)
if email not in plan_subscription_data:
plan_subscription_data[email] = {}
plan_subscription_data[email][plan_id] = {
"member_details": member_details,
"students": students_data,
"plan_id": plan_id,
"email": email,
}
return {
"status": "ok",
"students_count": len(students_data),
"message": "Student information saved. Proceed to payment.",
}
@app.post("/api/plan/{plan_id}/checkout")
def create_plan_checkout_session(plan_id: int, email: str = Query(..., description="Student email"), db: Session = Depends(get_db)):
"""Create a Stripe checkout session dynamically for direct plan subscription (without invite)."""
plan = db.query(MembershipPlan).filter(MembershipPlan.id == plan_id).first()
if not plan:
raise HTTPException(status_code=404, detail="Plan not found")
if not plan.is_active:
raise HTTPException(status_code=400, detail="This plan is not active")
# Get the base URL from settings or environment
base_url = os.getenv("BASE_URL", "http://localhost:7860")
if not base_url.startswith("http"):
base_url = f"https://{base_url}"
# Calculate price in cents
price_cents = int(float(plan.price) * 100)
try:
# Create a Stripe Checkout Session dynamically
checkout_session = stripe.checkout.Session.create(
payment_method_types=["card"],
line_items=[
{
"price_data": {
"currency": "cad",
"product_data": {
"name": f"{plan.name} Membership",
"description": plan.description or f"{plan.billing_period.capitalize()} membership plan",
},
"unit_amount": price_cents,
"recurring": {
"interval": "month" if plan.billing_period in ["monthly", "half_yearly"] else "year",
"interval_count": 6 if plan.billing_period == "half_yearly" else 1,
} if plan.billing_period in ["monthly", "half_yearly", "annual"] else None,
},
"quantity": 1,
}
],
mode="subscription" if plan.billing_period in ["monthly", "half_yearly", "annual"] else "payment",
success_url=f"{base_url}/student?payment=success",
cancel_url=f"{base_url}/student?payment=cancelled",
customer_email=email,
metadata={
"plan_id": str(plan.id),
"plan_name": plan.name,
"billing_period": plan.billing_period,
"price": str(plan.price),
"max_students": str(plan.max_students),
"direct_subscription": "true", # Flag to indicate this is a direct subscription, not from invite
},
)
return {"checkout_url": checkout_session.url, "session_id": checkout_session.id}
except Exception as e:
print(f"[checkout] Error creating plan Stripe session: {e}")
raise HTTPException(status_code=500, detail=f"Failed to create payment session: {str(e)}")
# ---------- ADMIN: Students ----------
@app.get("/api/admin/students")
def get_admin_students(membership_id: int = None, db: Session = Depends(get_db)):
"""Get all students, optionally filtered by membership_id. Includes membership info."""
query = db.query(Student)
if membership_id:
query = query.filter(Student.membership_id == membership_id)
students = query.all()
result = []
for s in students:
# Get membership info
membership = db.query(Membership).filter(Membership.id == s.membership_id).first()
result.append({
"id": s.id,
"membership_id": s.membership_id,
"first_name": s.first_name,
"last_name": s.last_name,
"date_of_birth": s.date_of_birth.isoformat() if s.date_of_birth else None,
"gender": s.gender,
"medical_notes": s.medical_notes,
"created_at": s.created_at.isoformat() if s.created_at else None,
# Include membership info for display
"membership_email": membership.user_email if membership else None,
"membership_name": membership.user_name if membership else None,
"membership_plan_name": membership.plan_name if membership else None,
})
return result
@app.get("/api/admin/students/{student_id}/comments")
def get_admin_student_comments(student_id: int, db: Session = Depends(get_db)):
"""Get all comments for a student (admin view)."""
student = db.query(Student).filter(Student.id == student_id).first()
if not student:
raise HTTPException(status_code=404, detail="Student not found")
# Get all comments for this student
comments = db.query(StudentComment).filter(
StudentComment.student_id == student_id
).order_by(StudentComment.created_at.desc()).all()
comments_list = []
for c in comments:
# Extract name from email
coach_name = c.coach_email.split("@")[0].replace(".", " ").title()
comments_list.append({
"id": c.id,
"coach_email": c.coach_email,
"coach_name": coach_name,
"comment": c.comment,
"created_at": c.created_at.isoformat(),
})
return comments_list
@app.post("/api/admin/students")
def create_student(student: dict, db: Session = Depends(get_db)):
"""Create a new student."""
from datetime import datetime as dt
date_of_birth = None
if student.get("date_of_birth"):
try:
date_of_birth = dt.strptime(student["date_of_birth"], "%Y-%m-%d").date()
except:
pass
new_student = Student(
membership_id=student["membership_id"],
first_name=student["first_name"],
last_name=student["last_name"],
date_of_birth=date_of_birth,
gender=student.get("gender"),
medical_notes=student.get("medical_notes"),
)
db.add(new_student)
db.commit()
db.refresh(new_student)
return {
"id": new_student.id,
"membership_id": new_student.membership_id,
"first_name": new_student.first_name,
"last_name": new_student.last_name,
"date_of_birth": new_student.date_of_birth.isoformat() if new_student.date_of_birth else None,
"gender": new_student.gender,
"medical_notes": new_student.medical_notes,
"created_at": new_student.created_at.isoformat() if new_student.created_at else None,
}
@app.put("/api/admin/students/{student_id}")
def update_student(student_id: int, student: dict, db: Session = Depends(get_db)):
"""Update a student."""
from datetime import datetime as dt
db_student = db.query(Student).filter(Student.id == student_id).first()
if not db_student:
raise HTTPException(status_code=404, detail="Student not found")
if "first_name" in student:
db_student.first_name = student["first_name"]
if "last_name" in student:
db_student.last_name = student["last_name"]
if "date_of_birth" in student:
if student["date_of_birth"]:
try:
db_student.date_of_birth = dt.strptime(student["date_of_birth"], "%Y-%m-%d").date()
except:
pass
else:
db_student.date_of_birth = None
if "gender" in student:
db_student.gender = student.get("gender")
if "medical_notes" in student:
db_student.medical_notes = student.get("medical_notes")
db.commit()
db.refresh(db_student)
return {
"id": db_student.id,
"membership_id": db_student.membership_id,
"first_name": db_student.first_name,
"last_name": db_student.last_name,
"date_of_birth": db_student.date_of_birth.isoformat() if db_student.date_of_birth else None,
"gender": db_student.gender,
"medical_notes": db_student.medical_notes,
"created_at": db_student.created_at.isoformat() if db_student.created_at else None,
}
@app.delete("/api/admin/students/{student_id}")
def delete_student(student_id: int, db: Session = Depends(get_db)):
"""Delete a student."""
db_student = db.query(Student).filter(Student.id == student_id).first()
if not db_student:
raise HTTPException(status_code=404, detail="Student not found")
db.delete(db_student)
db.commit()
return {"status": "deleted"}
# ---------- ADMIN: Exams ----------
@app.get("/api/admin/exams", response_model=List[ExamWithBatches])
def get_admin_exams(db: Session = Depends(get_db)):
"""Get all exams with their batches and registrations."""
exams = db.query(Exam).order_by(Exam.exam_date.desc()).all()
result = []
for exam in exams:
batches = db.query(ExamBatch).filter(ExamBatch.exam_id == exam.id).all()
batch_data = []
for batch in batches:
registrations = db.query(ExamStudentRegistration).filter(
ExamStudentRegistration.batch_id == batch.id
).all()
batch_data.append({
**ExamBatchOut.model_validate(batch).model_dump(),
"registrations": [ExamStudentRegistrationOut.model_validate(r).model_dump() for r in registrations]
})
result.append({
**ExamOut.model_validate(exam).model_dump(),
"batches": batch_data
})
return result
@app.post("/api/admin/exams", response_model=ExamOut, status_code=201)
def create_exam(exam: ExamCreate, db: Session = Depends(get_db)):
"""Create a new exam."""
db_exam = Exam(**exam.model_dump())
db.add(db_exam)
db.commit()
db.refresh(db_exam)
return db_exam
@app.put("/api/admin/exams/{exam_id}", response_model=ExamOut)
def update_exam(exam_id: int, exam_update: ExamUpdate, db: Session = Depends(get_db)):
"""Update an exam."""
db_exam = db.query(Exam).filter(Exam.id == exam_id).first()
if not db_exam:
raise HTTPException(status_code=404, detail="Exam not found")
update_data = exam_update.model_dump(exclude_unset=True)
for key, value in update_data.items():
setattr(db_exam, key, value)
db.commit()
db.refresh(db_exam)
return db_exam
@app.delete("/api/admin/exams/{exam_id}", status_code=204)
def delete_exam(exam_id: int, db: Session = Depends(get_db)):
"""Delete an exam and all its batches and registrations."""
db_exam = db.query(Exam).filter(Exam.id == exam_id).first()
if not db_exam:
raise HTTPException(status_code=404, detail="Exam not found")
db.delete(db_exam)
db.commit()
return None
# ---------- ADMIN: Exam Batches ----------
@app.get("/api/admin/exam-batches", response_model=List[ExamBatchOut])
def get_exam_batches(exam_id: int = None, db: Session = Depends(get_db)):
"""Get exam batches, optionally filtered by exam_id."""
query = db.query(ExamBatch)
if exam_id:
query = query.filter(ExamBatch.exam_id == exam_id)
return query.all()
@app.post("/api/admin/exam-batches", response_model=ExamBatchOut, status_code=201)
def create_exam_batch(batch: ExamBatchCreate, db: Session = Depends(get_db)):
"""Create a new exam batch."""
# Verify exam exists
exam = db.query(Exam).filter(Exam.id == batch.exam_id).first()
if not exam:
raise HTTPException(status_code=404, detail="Exam not found")
db_batch = ExamBatch(**batch.model_dump())
db.add(db_batch)
db.commit()
db.refresh(db_batch)
return db_batch
@app.put("/api/admin/exam-batches/{batch_id}", response_model=ExamBatchOut)
def update_exam_batch(batch_id: int, batch_update: ExamBatchUpdate, db: Session = Depends(get_db)):
"""Update an exam batch."""
db_batch = db.query(ExamBatch).filter(ExamBatch.id == batch_id).first()
if not db_batch:
raise HTTPException(status_code=404, detail="Batch not found")
update_data = batch_update.model_dump(exclude_unset=True)
for key, value in update_data.items():
setattr(db_batch, key, value)
db.commit()
db.refresh(db_batch)
return db_batch
@app.delete("/api/admin/exam-batches/{batch_id}", status_code=204)
def delete_exam_batch(batch_id: int, db: Session = Depends(get_db)):
"""Delete an exam batch and all its registrations."""
db_batch = db.query(ExamBatch).filter(ExamBatch.id == batch_id).first()
if not db_batch:
raise HTTPException(status_code=404, detail="Batch not found")
db.delete(db_batch)
db.commit()
return None
# ---------- ADMIN: Exam Student Registrations ----------
@app.get("/api/admin/exam-registrations", response_model=List[ExamStudentRegistrationOut])
def get_exam_registrations(exam_id: int = None, batch_id: int = None, db: Session = Depends(get_db)):
"""Get exam registrations, optionally filtered by exam_id or batch_id."""
query = db.query(ExamStudentRegistration)
if exam_id:
query = query.filter(ExamStudentRegistration.exam_id == exam_id)
if batch_id:
query = query.filter(ExamStudentRegistration.batch_id == batch_id)
return query.all()
@app.post("/api/admin/exam-registrations", response_model=ExamStudentRegistrationOut, status_code=201)
def create_exam_registration(registration: ExamStudentRegistrationCreate, db: Session = Depends(get_db)):
"""Create a new exam student registration."""
# Verify exam and batch exist
exam = db.query(Exam).filter(Exam.id == registration.exam_id).first()
if not exam:
raise HTTPException(status_code=404, detail="Exam not found")
batch = db.query(ExamBatch).filter(ExamBatch.id == registration.batch_id).first()
if not batch:
raise HTTPException(status_code=404, detail="Batch not found")
# If student_id is provided, verify it exists and populate student info
if registration.student_id:
student = db.query(Student).filter(Student.id == registration.student_id).first()
if student:
# Get student's belt level from membership or student data
# For now, we'll use what's provided in the registration
pass
db_registration = ExamStudentRegistration(**registration.model_dump())
db.add(db_registration)
db.commit()
db.refresh(db_registration)
return db_registration
@app.put("/api/admin/exam-registrations/{registration_id}", response_model=ExamStudentRegistrationOut)
def update_exam_registration(registration_id: int, registration_update: ExamStudentRegistrationUpdate, db: Session = Depends(get_db)):
"""Update an exam registration."""
db_registration = db.query(ExamStudentRegistration).filter(ExamStudentRegistration.id == registration_id).first()
if not db_registration:
raise HTTPException(status_code=404, detail="Registration not found")
update_data = registration_update.model_dump(exclude_unset=True)
for key, value in update_data.items():
setattr(db_registration, key, value)
db.commit()
db.refresh(db_registration)
return db_registration
@app.delete("/api/admin/exam-registrations/{registration_id}", status_code=204)
def delete_exam_registration(registration_id: int, db: Session = Depends(get_db)):
"""Delete an exam registration."""
db_registration = db.query(ExamStudentRegistration).filter(ExamStudentRegistration.id == registration_id).first()
if not db_registration:
raise HTTPException(status_code=404, detail="Registration not found")
db.delete(db_registration)
db.commit()
return None
@app.post("/api/admin/exam-registrations/notify")
def notify_exam_registrations(registration_ids: List[int], db: Session = Depends(get_db)):
"""Mark registrations as notified (send notification emails)."""
registrations = db.query(ExamStudentRegistration).filter(
ExamStudentRegistration.id.in_(registration_ids)
).all()
for reg in registrations:
reg.notification_sent = True
db.commit()
return {"notified_count": len(registrations)}
# ---------- AI endpoints (dummy) ----------
@app.post("/api/ai/admin")
def admin_ai_dummy():
return {
"answer": "This is a placeholder answer from the Admin AI assistant.",
"insights": [
"Beginner Monthly has strong growth.",
"Advanced Monthly has slightly higher churn.",
],
"suggested_actions": [
"Send renewal reminders 7 days before due date.",
"Offer a free trial class to at-risk members.",
],
}
@app.post("/api/ai/student")
def student_ai_dummy():
return {
"answer": "This is a placeholder answer from the Student AI assistant. Your next renewal is on 2025-02-01 for $99."
}
# ---------- SPA helpers & routes ----------
def _serve_index() -> FileResponse:
"""Return the built React index.html."""
index_path = os.path.join(STATIC_DIR, "index.html")
if os.path.exists(index_path):
return FileResponse(index_path, media_type="text/html")
raise HTTPException(status_code=404, detail="Index not found")
@app.get("/", include_in_schema=False)
async def serve_index_root():
return _serve_index()
@app.get("/login", include_in_schema=False)
@app.get("/student", include_in_schema=False)
@app.get("/admin", include_in_schema=False)
@app.get("/admin/members", include_in_schema=False)
@app.get("/admin/classes", include_in_schema=False)
@app.get("/admin/exams", include_in_schema=False)
async def serve_index_named_routes():
return _serve_index()
@app.get("/{full_path:path}", include_in_schema=False)
async def spa_fallback(full_path: str):
"""
Catch-all route for React Router (e.g. /something-else).
Let /api/* and /assets/* fall through to their own handlers.
"""
if full_path.startswith("api/") or full_path.startswith("assets/") or "." in full_path:
raise HTTPException(status_code=404, detail="Not Found")
return _serve_index()