Spaces:
Running
Running
| from fastapi import APIRouter, Depends, HTTPException, status | |
| from sqlalchemy.orm import Session | |
| from pydantic import BaseModel | |
| import bcrypt | |
| from jose import jwt | |
| from datetime import datetime, timedelta | |
| from google.oauth2 import id_token | |
| from google.auth.transport import requests | |
| from sqlalchemy.orm.attributes import flag_modified | |
| from database import User, get_db | |
| from fastapi.security import OAuth2PasswordBearer | |
| oauth2_scheme = OAuth2PasswordBearer(tokenUrl="api/auth/login") | |
| router = APIRouter(prefix="/api/auth", tags=["auth"]) | |
| # --- SECURITY CONFIG --- | |
| SECRET_KEY = "super_secret_luigi_key_change_this_later_in_production" | |
| ALGORITHM = "HS256" | |
| # You will get this ID from Google Cloud Console later | |
| GOOGLE_CLIENT_ID = ( | |
| "525088967752-vhdm44u6qddh5ldot4p1hibe1k0f7mk2.apps.googleusercontent.com" | |
| ) | |
| # --- PYDANTIC MODELS (Payloads) --- | |
| class UserCreate(BaseModel): | |
| email: str | |
| password: str | |
| class UserLogin(BaseModel): | |
| email: str | |
| password: str | |
| class GoogleLogin(BaseModel): | |
| token: str | |
| # --- HELPER FUNCTIONS --- | |
| def create_access_token(data: dict): | |
| to_encode = data.copy() | |
| expire = datetime.utcnow() + timedelta(days=7) # Stay logged in for 7 days | |
| to_encode.update({"exp": expire}) | |
| return jwt.encode(to_encode, SECRET_KEY, algorithm=ALGORITHM) | |
| def check_admin_status(email: str): | |
| """The magic function that makes you God""" | |
| if email.lower() == "anayshukla11@gmail.com": | |
| return True | |
| return False | |
| # --- ROUTES --- | |
| def register_user(user: UserCreate, db: Session = Depends(get_db)): | |
| # 1. Check if email exists | |
| db_user = db.query(User).filter(User.email == user.email).first() | |
| if db_user: | |
| raise HTTPException(status_code=400, detail="Email already registered") | |
| # 2. Hash password directly with bcrypt | |
| salt = bcrypt.gensalt() | |
| hashed_pw = bcrypt.hashpw(user.password.encode("utf-8"), salt).decode("utf-8") | |
| # 3. Check if it is the admin email | |
| is_admin = check_admin_status(user.email) | |
| # 4. Save to DB | |
| new_user = User(email=user.email, hashed_password=hashed_pw, is_admin=is_admin) | |
| db.add(new_user) | |
| db.commit() | |
| db.refresh(new_user) | |
| # 5. Issue Token | |
| token = create_access_token( | |
| {"sub": new_user.email, "role": "admin" if is_admin else "user"} | |
| ) | |
| return { | |
| "access_token": token, | |
| "token_type": "bearer", | |
| "email": new_user.email, | |
| "is_admin": is_admin, | |
| } | |
| def login_user(user: UserLogin, db: Session = Depends(get_db)): | |
| # 1. Fetch user | |
| db_user = db.query(User).filter(User.email == user.email).first() | |
| if not db_user or not db_user.hashed_password: | |
| raise HTTPException(status_code=401, detail="Invalid credentials") | |
| # 2. Verify password directly with bcrypt | |
| if not bcrypt.checkpw( | |
| user.password.encode("utf-8"), db_user.hashed_password.encode("utf-8") | |
| ): | |
| raise HTTPException(status_code=401, detail="Invalid credentials") | |
| # 3. Issue Token | |
| token = create_access_token( | |
| {"sub": db_user.email, "role": "admin" if db_user.is_admin else "user"} | |
| ) | |
| return { | |
| "access_token": token, | |
| "token_type": "bearer", | |
| "email": db_user.email, | |
| "is_admin": db_user.is_admin, | |
| } | |
| def google_auth(payload: GoogleLogin, db: Session = Depends(get_db)): | |
| try: | |
| # Verify the token Google's frontend sent us | |
| # THE FIX: Added clock_skew_in_seconds=10 to forgive slight time differences! | |
| idinfo = id_token.verify_oauth2_token( | |
| payload.token, | |
| requests.Request(), | |
| GOOGLE_CLIENT_ID, | |
| clock_skew_in_seconds=10, | |
| ) | |
| email = idinfo["email"] | |
| # Check if user exists | |
| db_user = db.query(User).filter(User.email == email).first() | |
| # If they don't exist, register them silently via Google | |
| if not db_user: | |
| is_admin = check_admin_status(email) | |
| db_user = User( | |
| email=email, is_admin=is_admin | |
| ) # No password needed for Google auth | |
| db.add(db_user) | |
| db.commit() | |
| db.refresh(db_user) | |
| token = create_access_token( | |
| {"sub": db_user.email, "role": "admin" if db_user.is_admin else "user"} | |
| ) | |
| return { | |
| "access_token": token, | |
| "token_type": "bearer", | |
| "email": db_user.email, | |
| "is_admin": db_user.is_admin, | |
| } | |
| except ValueError as e: | |
| # THIS WILL TELL YOU EXACTLY WHY IT FAILED | |
| print(f"GOOGLE AUTH ERROR: {str(e)}") | |
| raise HTTPException(status_code=401, detail=f"Google Error: {str(e)}") | |
| def get_current_user( | |
| token: str = Depends(oauth2_scheme), db: Session = Depends(get_db) | |
| ): | |
| try: | |
| payload = jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM]) | |
| email = payload.get("sub") | |
| if email is None: | |
| raise HTTPException(status_code=401) | |
| except: | |
| raise HTTPException(status_code=401) | |
| user = db.query(User).filter(User.email == email).first() | |
| if user is None: | |
| raise HTTPException(status_code=401) | |
| return user | |
| def get_user_me(current_user: User = Depends(get_current_user)): | |
| return { | |
| "email": current_user.email, | |
| "is_admin": current_user.is_admin, | |
| "default_team_id": current_user.default_team_id, | |
| "saved_edits": current_user.saved_edits, | |
| "drafts": current_user.drafts, # <-- NEW: Send realities to React | |
| } | |
| def save_session( | |
| payload: dict, | |
| current_user: User = Depends(get_current_user), | |
| db: Session = Depends(get_db), | |
| ): | |
| if "default_team_id" in payload: | |
| current_user.default_team_id = payload["default_team_id"] | |
| if "saved_edits" in payload: | |
| current_user.saved_edits = payload["saved_edits"] | |
| flag_modified(current_user, "saved_edits") | |
| # THE FIX: Catch and permanently save the Multiverse array | |
| if "drafts" in payload: | |
| current_user.drafts = payload["drafts"] | |
| flag_modified(current_user, "drafts") | |
| db.commit() | |
| return {"status": "success"} | |