Spaces:
Sleeping
Sleeping
File size: 3,755 Bytes
import logging
import uuid
from datetime import datetime, timedelta, timezone

from fastapi import APIRouter, Depends, HTTPException, status
from pydantic import BaseModel
from sqlmodel import Session, select

from ..database.database import get_session
from ..models.user import User, Account, UserCreate
from ..services.auth_service import AuthUtils, pwd_context, jwt, SECRET_KEY, ALGORITHM, ACCESS_TOKEN_EXPIRE_MINUTES, Token

# Router on which this module's "/register" and "/login" endpoints are registered.
router = APIRouter()
class LoginRequest(BaseModel):
    """Request body accepted by the ``/login`` endpoint."""

    # Email address used to look up the user record.
    email: str
    # Password as submitted by the client; checked against the stored hash.
    password: str
@router.post("/register", response_model=Token)
async def register(user_data: UserCreate, session: Session = Depends(get_session)):
    """Register a new credential-based user and return a bearer token.

    Creates a ``User`` row and a linked ``Account`` row (``providerId`` of
    ``"credential"``) that stores the hashed password, commits both in one
    transaction, and issues a JWT whose ``sub`` claim is the new user's id.

    Args:
        user_data: Registration payload; its ``email`` and ``password`` are read.
        session: Database session supplied by the ``get_session`` dependency.

    Returns:
        A dict with ``access_token`` and ``token_type`` (always ``"bearer"``).

    Raises:
        HTTPException: 400 when a user with the same email already exists.
    """
    # 1. Reject duplicate registrations.
    # NOTE(review): this check and the insert below are not atomic; concurrent
    # requests for the same email rely on a unique constraint in the database,
    # if one exists -- confirm against the User model.
    existing_user = session.exec(
        select(User).where(User.email == user_data.email)
    ).first()
    if existing_user:
        raise HTTPException(
            status_code=status.HTTP_400_BAD_REQUEST,
            detail="User already registered",
        )

    # Use one timestamp so the user and account rows agree on creation time.
    now = datetime.now(timezone.utc)
    user_id = str(uuid.uuid4())

    # 2. Create the user.
    session.add(
        User(
            id=user_id,
            email=user_data.email,
            createdAt=now,
            updatedAt=now,
        )
    )

    # 3. Create the credential account that holds the password hash.
    session.add(
        Account(
            id=str(uuid.uuid4()),
            userId=user_id,
            accountId=user_data.email,
            providerId="credential",
            password=pwd_context.hash(user_data.password),
            createdAt=now,
            updatedAt=now,
        )
    )

    # No refresh afterwards: only the locally generated user_id is used below.
    session.commit()

    # 4. Generate a JWT that expires. "exp" is sent as integer epoch seconds,
    # the NumericDate form defined by RFC 7519.
    expires_at = now + timedelta(minutes=ACCESS_TOKEN_EXPIRE_MINUTES)
    access_token = jwt.encode(
        {"sub": user_id, "exp": int(expires_at.timestamp())},
        SECRET_KEY,
        algorithm=ALGORITHM,
    )
    return {"access_token": access_token, "token_type": "bearer"}
@router.post("/login", response_model=Token)
async def login(login_data: LoginRequest, session: Session = Depends(get_session)):
    """Authenticate a user by email and password and return a bearer token.

    Looks up the ``User`` by email, then that user's ``Account`` with
    ``providerId`` of ``"credential"``, and verifies the submitted password
    against the stored hash with ``AuthUtils.verify_password``. Every failure
    yields the same 401 response, so the caller cannot tell which step failed.

    Args:
        login_data: Login payload carrying ``email`` and ``password``.
        session: Database session supplied by the ``get_session`` dependency.

    Returns:
        A dict with ``access_token`` and ``token_type`` (always ``"bearer"``).

    Raises:
        HTTPException: 401 when the email is unknown, the user has no
            credential account, the account has no password, the password
            does not match, or password verification itself raises.
    """
    logger = logging.getLogger(__name__)
    invalid_credentials = HTTPException(
        status_code=status.HTTP_401_UNAUTHORIZED,
        detail="Invalid email or password",
    )

    logger.debug("Login attempt for email: %s", login_data.email)

    # 1. Find the user.
    user = session.exec(
        select(User).where(User.email == login_data.email)
    ).first()
    if not user:
        logger.debug("User not found for email: %s", login_data.email)
        raise invalid_credentials
    logger.debug("User found: %s", user.id)

    # 2. Find the credential account that holds the password hash.
    account = session.exec(
        select(Account)
        .where(Account.userId == user.id)
        .where(Account.providerId == "credential")
    ).first()
    if not account:
        logger.debug("No account found for user ID: %s", user.id)
        raise invalid_credentials
    if not account.password:
        logger.debug("Account found but has no password (maybe Social login?)")
        raise invalid_credentials

    # 3. Verify the password. The stored hash is deliberately never logged.
    # Only the verifier call sits inside the try, so the 401 for a wrong
    # password is no longer caught and reported as a verification error.
    # NOTE(review): the original comment describes verify_password as hybrid
    # bcrypt + scrypt verification -- confirm in auth_service.
    try:
        is_valid = AuthUtils.verify_password(login_data.password, account.password)
    except Exception:
        # As before, a verifier failure is reported to the client as bad
        # credentials; the traceback is kept in the log for operators.
        logger.exception("Password verification error for user ID: %s", user.id)
        raise invalid_credentials from None
    logger.debug("Hybrid Password verification result: %s", is_valid)
    if not is_valid:
        raise invalid_credentials

    # 4. Generate a JWT that expires. "exp" is sent as integer epoch seconds,
    # the NumericDate form defined by RFC 7519.
    expires_at = datetime.now(timezone.utc) + timedelta(
        minutes=ACCESS_TOKEN_EXPIRE_MINUTES
    )
    access_token = jwt.encode(
        {"sub": user.id, "exp": int(expires_at.timestamp())},
        SECRET_KEY,
        algorithm=ALGORITHM,
    )
    return {"access_token": access_token, "token_type": "bearer"}