Spaces:
Running
Running
File size: 6,230 Bytes
f7cecf3 | 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 | from fastapi import APIRouter, Depends, HTTPException, status
import logging
import os
from datetime import datetime, timedelta, timezone

import bcrypt
from fastapi.security import OAuth2PasswordBearer
from google.auth.transport import requests
from google.oauth2 import id_token
from jose import JWTError, jwt
from pydantic import BaseModel
from sqlalchemy import func
from sqlalchemy.orm import Session
from sqlalchemy.orm.attributes import flag_modified

from database import User, get_db
# Extracts the bearer token from the Authorization header for the
# dependencies that need an authenticated user.
oauth2_scheme = OAuth2PasswordBearer(tokenUrl="api/auth/login")

# Every route in this module is mounted under /api/auth.
router = APIRouter(prefix="/api/auth", tags=["auth"])

# --- SECURITY CONFIG ---
# The JWT signing key is taken from the SECRET_KEY environment variable so the
# real key does not have to live in source control. The literal fallback keeps
# existing deployments (and tokens they have already issued) working when the
# variable is unset or empty; set the variable in production.
SECRET_KEY = (
    os.environ.get("SECRET_KEY")
    or "super_secret_luigi_key_change_this_later_in_production"
)
ALGORITHM = "HS256"

# OAuth 2.0 client ID issued in Google Cloud Console; Google ID tokens are
# verified against it as the expected audience.
GOOGLE_CLIENT_ID = (
    "525088967752-vhdm44u6qddh5ldot4p1hibe1k0f7mk2.apps.googleusercontent.com"
)
# --- PYDANTIC MODELS (Payloads) ---
class UserCreate(BaseModel):
    """Request body accepted by the password registration route."""

    # Address the new account is registered under.
    email: str
    # Plain-text password as submitted by the client.
    password: str
class UserLogin(BaseModel):
    """Request body accepted by the password login route."""

    # Address of the account to log in to.
    email: str
    # Plain-text password to check against the stored hash.
    password: str
class GoogleLogin(BaseModel):
    """Request body accepted by the Google sign-in route."""

    # Google ID token obtained by the frontend.
    token: str
# --- HELPER FUNCTIONS ---
def create_access_token(
    data: dict, expires_delta: timedelta = timedelta(days=7)
) -> str:
    """Return a signed JWT carrying *data* plus an ``exp`` claim.

    Args:
        data: Claims to embed in the token. The mapping is copied, so the
            caller's dict is never mutated.
        expires_delta: Token lifetime, counted from now. Defaults to seven
            days, so users stay logged in for a week.

    Returns:
        The token encoded with ``SECRET_KEY`` using ``ALGORITHM``.
    """
    to_encode = data.copy()
    # Use a timezone-aware UTC timestamp: datetime.utcnow() is deprecated and
    # returns a naive value that is easy to misinterpret as local time.
    expire = datetime.now(timezone.utc) + expires_delta
    to_encode.update({"exp": expire})
    return jwt.encode(to_encode, SECRET_KEY, algorithm=ALGORITHM)
def check_admin_status(email: str):
    """Tell whether *email* is the hard-coded administrator address.

    Letter case is ignored in the comparison.
    """
    admin_address = "anayshukla11@gmail.com"
    return email.lower() == admin_address
# --- ROUTES ---
@router.post("/register")
def register_user(user: UserCreate, db: Session = Depends(get_db)):
    """Create a password-based account and log the new user in.

    Args:
        user: Email and plain-text password for the new account.
        db: Database session supplied by ``get_db``.

    Returns:
        A dict with the bearer token, token type, email and admin flag.

    Raises:
        HTTPException: 400 if the email is already registered (ignoring
            letter case) or bcrypt refuses to hash the password.
    """
    # 1. Check if email exists. The comparison ignores case because
    #    check_admin_status() does too: with an exact match, a case variant
    #    of an already-registered admin address could register as a second
    #    account and be granted the admin flag.
    existing_user = (
        db.query(User)
        .filter(func.lower(User.email) == user.email.lower())
        .first()
    )
    if existing_user:
        raise HTTPException(status_code=400, detail="Email already registered")

    # 2. Hash password directly with bcrypt. bcrypt signals a password it
    #    cannot process with ValueError; report that as a client error rather
    #    than letting it surface as a 500.
    try:
        hashed_pw = bcrypt.hashpw(
            user.password.encode("utf-8"), bcrypt.gensalt()
        ).decode("utf-8")
    except ValueError as exc:
        raise HTTPException(status_code=400, detail="Invalid password") from exc

    # 3. Check if it is the admin email
    # NOTE(review): the address is not verified here, so whoever registers
    # the admin email first obtains admin rights — confirm this is intended.
    is_admin = check_admin_status(user.email)

    # 4. Save to DB
    new_user = User(email=user.email, hashed_password=hashed_pw, is_admin=is_admin)
    db.add(new_user)
    db.commit()
    db.refresh(new_user)

    # 5. Issue Token
    token = create_access_token(
        {"sub": new_user.email, "role": "admin" if is_admin else "user"}
    )
    return {
        "access_token": token,
        "token_type": "bearer",
        "email": new_user.email,
        "is_admin": is_admin,
    }
@router.post("/login")
def login_user(user: UserLogin, db: Session = Depends(get_db)):
    """Authenticate an email/password pair and issue a bearer token.

    Args:
        user: Email and plain-text password submitted by the client.
        db: Database session supplied by ``get_db``.

    Returns:
        A dict with the bearer token, token type, email and admin flag.

    Raises:
        HTTPException: 401 if the account does not exist, has no stored
            password hash, or the password does not match.
    """
    # 1. Fetch user. Accounts without a stored hash cannot log in with a
    #    password.
    db_user = db.query(User).filter(User.email == user.email).first()
    if not db_user or not db_user.hashed_password:
        raise HTTPException(status_code=401, detail="Invalid credentials")

    # 2. Verify password directly with bcrypt. bcrypt raises ValueError for
    #    a malformed stored hash (and, depending on the installed version,
    #    for passwords it cannot process); treat that as a failed login
    #    instead of a 500.
    try:
        password_matches = bcrypt.checkpw(
            user.password.encode("utf-8"), db_user.hashed_password.encode("utf-8")
        )
    except ValueError:
        password_matches = False
    if not password_matches:
        raise HTTPException(status_code=401, detail="Invalid credentials")

    # 3. Issue Token
    token = create_access_token(
        {"sub": db_user.email, "role": "admin" if db_user.is_admin else "user"}
    )
    return {
        "access_token": token,
        "token_type": "bearer",
        "email": db_user.email,
        "is_admin": db_user.is_admin,
    }
@router.post("/google")
def google_auth(payload: GoogleLogin, db: Session = Depends(get_db)):
    """Log a user in with a Google ID token, creating the account if needed.

    Args:
        payload: Wrapper around the ID token sent by the frontend.
        db: Database session supplied by ``get_db``.

    Returns:
        A dict with the bearer token, token type, email and admin flag.

    Raises:
        HTTPException: 401 if the token fails verification, carries no
            email claim, or reports the email as unverified.
    """
    # Only token verification sits inside the try, so a ValueError raised by
    # unrelated code below is not misreported as a Google failure.
    try:
        # clock_skew_in_seconds=10 forgives slight time differences between
        # this server and Google.
        idinfo = id_token.verify_oauth2_token(
            payload.token,
            requests.Request(),
            GOOGLE_CLIENT_ID,
            clock_skew_in_seconds=10,
        )
    except ValueError as exc:
        # Log the reason server-side so verification failures can be diagnosed.
        logging.getLogger(__name__).warning("GOOGLE AUTH ERROR: %s", exc)
        raise HTTPException(
            status_code=401, detail=f"Google Error: {exc}"
        ) from exc

    # A token without an email claim would otherwise raise KeyError (500).
    email = idinfo.get("email")
    if not email:
        raise HTTPException(
            status_code=401, detail="Google Error: token has no email claim"
        )
    # The email decides admin status, so refuse addresses Google explicitly
    # reports as unverified. A missing claim is tolerated.
    if not idinfo.get("email_verified", True):
        raise HTTPException(
            status_code=401, detail="Google Error: email is not verified"
        )

    # Check if user exists
    db_user = db.query(User).filter(User.email == email).first()

    # If they don't exist, register them silently via Google
    if not db_user:
        db_user = User(
            email=email, is_admin=check_admin_status(email)
        )  # No password needed for Google auth
        db.add(db_user)
        db.commit()
        db.refresh(db_user)

    token = create_access_token(
        {"sub": db_user.email, "role": "admin" if db_user.is_admin else "user"}
    )
    return {
        "access_token": token,
        "token_type": "bearer",
        "email": db_user.email,
        "is_admin": db_user.is_admin,
    }
def get_current_user(
    token: str = Depends(oauth2_scheme), db: Session = Depends(get_db)
):
    """Resolve the bearer token of the current request to a ``User`` row.

    Args:
        token: JWT extracted from the request by ``oauth2_scheme``.
        db: Database session supplied by ``get_db``.

    Returns:
        The ``User`` whose email matches the token's ``sub`` claim.

    Raises:
        HTTPException: 401 if the token cannot be decoded, has no ``sub``
            claim, or refers to an email with no matching user.
    """
    # Catch only the JWT library's own error type. A bare ``except`` would
    # also swallow unrelated failures (and KeyboardInterrupt/SystemExit) and
    # disguise them as an authentication problem.
    try:
        claims = jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM])
    except JWTError as exc:
        raise HTTPException(status_code=401) from exc

    email = claims.get("sub")
    if email is None:
        raise HTTPException(status_code=401)

    user = db.query(User).filter(User.email == email).first()
    if user is None:
        raise HTTPException(status_code=401)
    return user
@router.get("/me")
def get_user_me(current_user: User = Depends(get_current_user)):
    """Return the stored profile fields of the authenticated user.

    The response maps each of ``email``, ``is_admin``, ``default_team_id``,
    ``saved_edits`` and ``drafts`` to the attribute of the same name on the
    user resolved by ``get_current_user``.
    """
    exposed_fields = (
        "email",
        "is_admin",
        "default_team_id",
        "saved_edits",
        "drafts",
    )
    # Response keys mirror the attribute names, in the order listed above.
    return {name: getattr(current_user, name) for name in exposed_fields}
@router.post("/save_session")
def save_session(
    payload: dict,
    current_user: User = Depends(get_current_user),
    db: Session = Depends(get_db),
):
    """Persist whichever session fields the client sent for the current user.

    Only the keys ``default_team_id``, ``saved_edits`` and ``drafts`` are
    read from *payload*; keys that are absent leave the stored value
    untouched. The session is committed in every case.

    Returns:
        ``{"status": "success"}`` once the commit has completed.
    """
    if "default_team_id" in payload:
        current_user.default_team_id = payload["default_team_id"]

    # These two attributes are explicitly flagged as modified after being
    # assigned, so the new value is included in the commit.
    for field_name in ("saved_edits", "drafts"):
        if field_name not in payload:
            continue
        setattr(current_user, field_name, payload[field_name])
        flag_modified(current_user, field_name)

    db.commit()
    return {"status": "success"}
|