# fpl-solver / auth.py
# Commit f7cecf3 ("Clean Production Release"), by AnayShukla
import os
from datetime import datetime, timedelta, timezone

import bcrypt
from fastapi import APIRouter, Depends, HTTPException, status
from fastapi.security import OAuth2PasswordBearer
from google.auth import exceptions as google_exceptions
from google.auth.transport import requests
from google.oauth2 import id_token
from jose import JWTError, jwt
from pydantic import BaseModel
from sqlalchemy import func
from sqlalchemy.orm import Session
from sqlalchemy.orm.attributes import flag_modified

from database import User, get_db
# Dependency that get_current_user uses to obtain the caller's bearer token.
# tokenUrl names the password-login route defined on the router below.
oauth2_scheme = OAuth2PasswordBearer(tokenUrl="api/auth/login")
# Every route in this module is mounted under /api/auth and tagged "auth".
router = APIRouter(prefix="/api/auth", tags=["auth"])
# --- SECURITY CONFIG ---
# Key used to sign and verify the JWTs issued by this module. Set the
# SECRET_KEY environment variable in every real deployment: the fallback
# literal below is committed to source control, so anyone who can read the
# repository could forge tokens signed with it. The fallback is kept only so
# existing setups that never defined the variable keep working unchanged.
SECRET_KEY = (
    os.environ.get("SECRET_KEY")
    or "super_secret_luigi_key_change_this_later_in_production"
)
ALGORITHM = "HS256"
# OAuth client ID that Google ID tokens are verified against (their audience).
# Override with the GOOGLE_CLIENT_ID environment variable to point at another
# Google Cloud project without editing this file.
GOOGLE_CLIENT_ID = (
    os.environ.get("GOOGLE_CLIENT_ID")
    or "525088967752-vhdm44u6qddh5ldot4p1hibe1k0f7mk2.apps.googleusercontent.com"
)
# --- PYDANTIC MODELS (Payloads) ---
class UserCreate(BaseModel):
    """Request body accepted by the ``/register`` route."""

    # Address the new account is stored under; typed as a plain string.
    email: str
    # Plain-text password; the register route hashes it before storing.
    password: str
class UserLogin(BaseModel):
    """Request body accepted by the ``/login`` route."""

    # Address used to look the account up.
    email: str
    # Plain-text password that is checked against the stored bcrypt hash.
    password: str
class GoogleLogin(BaseModel):
    """Request body accepted by the ``/google`` route."""

    # Google ID token forwarded by the client; verified server-side.
    token: str
# --- HELPER FUNCTIONS ---
def create_access_token(
    data: dict, expires_delta: timedelta = timedelta(days=7)
) -> str:
    """Return a signed JWT carrying ``data`` plus an ``exp`` claim.

    Args:
        data: Claims to embed in the token. The dict is copied, so the
            caller's object is never mutated. An ``exp`` key it may carry
            is overwritten.
        expires_delta: Token lifetime measured from now. Defaults to seven
            days, so users stay logged in for a week.

    Returns:
        The token, signed with ``SECRET_KEY`` using ``ALGORITHM``.
    """
    # Use an aware UTC timestamp: datetime.utcnow() is deprecated since
    # Python 3.12 and returns a naive value that is easily mistaken for
    # local time.
    expire = datetime.now(timezone.utc) + expires_delta
    claims = {**data, "exp": expire}
    return jwt.encode(claims, SECRET_KEY, algorithm=ALGORITHM)
def check_admin_status(email: str):
    """Report whether ``email`` is the hard-coded administrator address.

    The comparison ignores letter case. Returns ``True`` for the admin
    address and ``False`` for every other value.
    """
    admin_address = "anayshukla11@gmail.com"
    return email.lower() == admin_address
# --- ROUTES ---
@router.post("/register")
def register_user(user: UserCreate, db: Session = Depends(get_db)):
    """Create a password-based account and return an access token for it.

    Args:
        user: Email and plain-text password of the account to create.
        db: Database session supplied by ``get_db``.

    Returns:
        A dict with ``access_token``, ``token_type``, ``email`` and
        ``is_admin`` for the newly created user.

    Raises:
        HTTPException: 400 when an account with the same email (compared
            without regard to letter case) already exists.
    """
    # 1. Reject duplicates. The comparison is case-insensitive because
    #    check_admin_status() lower-cases its input: with an exact-match
    #    check, a case variant of the admin address would pass as "new" and
    #    be created as a second admin account.
    existing = (
        db.query(User)
        .filter(func.lower(User.email) == user.email.lower())
        .first()
    )
    if existing:
        raise HTTPException(
            status_code=status.HTTP_400_BAD_REQUEST,
            detail="Email already registered",
        )

    # 2. Hash the password with a fresh bcrypt salt; only the hash is stored.
    salt = bcrypt.gensalt()
    hashed_pw = bcrypt.hashpw(user.password.encode("utf-8"), salt).decode("utf-8")

    # 3. Decide admin rights from the email address.
    # NOTE(review): ownership of the address is never verified on this route,
    # so whoever registers the admin address first becomes admin. Consider
    # granting admin only through a verified channel.
    is_admin = check_admin_status(user.email)

    # 4. Persist the account and reload it from the database.
    new_user = User(email=user.email, hashed_password=hashed_pw, is_admin=is_admin)
    db.add(new_user)
    db.commit()
    db.refresh(new_user)

    # 5. Issue a token so the client is logged in immediately.
    token = create_access_token(
        {"sub": new_user.email, "role": "admin" if is_admin else "user"}
    )
    return {
        "access_token": token,
        "token_type": "bearer",
        "email": new_user.email,
        "is_admin": is_admin,
    }
@router.post("/login")
def login_user(user: UserLogin, db: Session = Depends(get_db)):
    """Authenticate an email/password pair and issue an access token.

    Args:
        user: Email and plain-text password supplied by the client.
        db: Database session supplied by ``get_db``.

    Returns:
        A dict with ``access_token``, ``token_type``, ``email`` and
        ``is_admin`` for the authenticated account.

    Raises:
        HTTPException: 401 with the same "Invalid credentials" detail when
            the email is unknown, the account has no stored password hash,
            or the password does not match.
    """
    account = db.query(User).filter(User.email == user.email).first()

    # An account without a stored hash is rejected exactly like an unknown
    # email (presumably accounts created through the Google route).
    if not (account and account.hashed_password):
        raise HTTPException(status_code=401, detail="Invalid credentials")

    supplied = user.password.encode("utf-8")
    stored = account.hashed_password.encode("utf-8")
    if not bcrypt.checkpw(supplied, stored):
        raise HTTPException(status_code=401, detail="Invalid credentials")

    role = "admin" if account.is_admin else "user"
    access_token = create_access_token({"sub": account.email, "role": role})
    return {
        "access_token": access_token,
        "token_type": "bearer",
        "email": account.email,
        "is_admin": account.is_admin,
    }
@router.post("/google")
def google_auth(payload: GoogleLogin, db: Session = Depends(get_db)):
    """Sign a user in with a Google ID token, creating the account if needed.

    Args:
        payload: Body carrying the Google ID token sent by the client.
        db: Database session supplied by ``get_db``.

    Returns:
        A dict with ``access_token``, ``token_type``, ``email`` and
        ``is_admin`` for the matching (or newly created) user.

    Raises:
        HTTPException: 401 when Google rejects the token, or when the token
            carries no email or an email Google has not verified.
    """
    # Only the verification call sits inside the try, so errors raised by the
    # database work below are not misreported as Google failures.
    try:
        idinfo = id_token.verify_oauth2_token(
            payload.token,
            requests.Request(),
            GOOGLE_CLIENT_ID,
            # Tolerate small clock differences between this host and Google.
            clock_skew_in_seconds=10,
        )
    except (ValueError, google_exceptions.GoogleAuthError) as e:
        # GoogleAuthError is raised for a wrong issuer; it is not a
        # ValueError, so catching ValueError alone let it surface as a 500.
        print(f"GOOGLE AUTH ERROR: {str(e)}")
        raise HTTPException(
            status_code=401, detail=f"Google Error: {str(e)}"
        ) from e

    # The email is the account key (and decides admin rights), so it must be
    # present and confirmed by Google before it is trusted.
    email = idinfo.get("email")
    if not email or idinfo.get("email_verified") not in (True, "true"):
        raise HTTPException(
            status_code=401,
            detail="Google Error: email missing or not verified",
        )

    db_user = db.query(User).filter(User.email == email).first()

    # First Google sign-in: create the account without a password.
    if not db_user:
        db_user = User(email=email, is_admin=check_admin_status(email))
        db.add(db_user)
        db.commit()
        db.refresh(db_user)

    token = create_access_token(
        {"sub": db_user.email, "role": "admin" if db_user.is_admin else "user"}
    )
    return {
        "access_token": token,
        "token_type": "bearer",
        "email": db_user.email,
        "is_admin": db_user.is_admin,
    }
def get_current_user(
    token: str = Depends(oauth2_scheme), db: Session = Depends(get_db)
):
    """Resolve the bearer token of the current request to a ``User`` row.

    Args:
        token: JWT obtained through ``oauth2_scheme``.
        db: Database session supplied by ``get_db``.

    Returns:
        The ``User`` whose email matches the token's ``sub`` claim.

    Raises:
        HTTPException: 401 when the token cannot be decoded or verified,
            carries no ``sub`` claim, or names an email with no account.
    """
    unauthorized = HTTPException(
        status_code=status.HTTP_401_UNAUTHORIZED,
        headers={"WWW-Authenticate": "Bearer"},
    )

    # Catch only token errors (bad signature, expiry, malformed token). A
    # bare ``except:`` would also swallow unrelated bugs and interpreter
    # exits and report them as an authentication failure.
    try:
        payload = jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM])
    except JWTError as exc:
        raise unauthorized from exc

    email = payload.get("sub")
    if email is None:
        raise unauthorized

    user = db.query(User).filter(User.email == email).first()
    if user is None:
        raise unauthorized
    return user
@router.get("/me")
def get_user_me(current_user: User = Depends(get_current_user)):
    """Return the authenticated user's profile and saved client state.

    The response maps each exposed field name to the value of the attribute
    of the same name on ``current_user``: ``email``, ``is_admin``,
    ``default_team_id``, ``saved_edits`` and ``drafts``.
    """
    exposed_fields = (
        "email",
        "is_admin",
        "default_team_id",
        "saved_edits",
        "drafts",
    )
    return {field: getattr(current_user, field) for field in exposed_fields}
@router.post("/save_session")
def save_session(
    payload: dict,
    current_user: User = Depends(get_current_user),
    db: Session = Depends(get_db),
):
    """Persist the parts of the client session present in ``payload``.

    Recognised keys are ``default_team_id``, ``saved_edits`` and ``drafts``;
    each one is copied onto ``current_user`` only when it appears in the
    payload, and other keys are ignored. The session is committed in every
    case.

    Returns:
        ``{"status": "success"}`` once the commit has completed.
    """
    if "default_team_id" in payload:
        current_user.default_team_id = payload["default_team_id"]

    for column in ("saved_edits", "drafts"):
        if column not in payload:
            continue
        setattr(current_user, column, payload[column])
        # Mark the attribute as changed explicitly so the commit writes it.
        flag_modified(current_user, column)

    db.commit()
    return {"status": "success"}