Spaces:
Sleeping
Sleeping
| from fastapi import APIRouter, Depends, HTTPException, status, Request | |
| from fastapi.security import OAuth2PasswordRequestForm | |
| from db.mongo import get_user_db | |
| from core.security import ( | |
| create_access_token, | |
| verify_token, | |
| get_password_hash, | |
| verify_password | |
| ) | |
| from models.schemas import ( | |
| TokenResponse, | |
| UserCreate, | |
| UserInDB, | |
| UserResponse | |
| ) | |
| from datetime import timedelta | |
| import logging | |
| from core.config import settings | |
| router = APIRouter() | |
| logger = logging.getLogger(__name__) | |
| async def login( | |
| request: Request, | |
| form_data: OAuth2PasswordRequestForm = Depends() | |
| ): | |
| logger.info(f"Login attempt from {request.client.host}") | |
| user_db = await get_user_db() | |
| user = await user_db.find_one({"email": form_data.username.lower()}) | |
| if not user or not verify_password(form_data.password, user["password"]): | |
| logger.warning(f"Failed login attempt for {form_data.username}") | |
| raise HTTPException( | |
| status_code=status.HTTP_401_UNAUTHORIZED, | |
| detail="Incorrect email or password", | |
| headers={"WWW-Authenticate": "Bearer"}, | |
| ) | |
| access_token_expires = timedelta(minutes=settings.ACCESS_TOKEN_EXPIRE_MINUTES) | |
| access_token = create_access_token( | |
| data={"sub": user["email"], "role": user.get("role", "user")}, | |
| expires_delta=access_token_expires | |
| ) | |
| logger.info(f"Successful login for {user['email']}") | |
| return { | |
| "access_token": access_token, | |
| "token_type": "bearer", | |
| "expires_in": access_token_expires.total_seconds() | |
| } | |
| async def signup(user_in: UserCreate): | |
| logger.info(f"Signup attempt for {user_in.email}") | |
| user_db = await get_user_db() | |
| if await user_db.find_one({"email": user_in.email.lower()}): | |
| logger.warning(f"Email already registered: {user_in.email}") | |
| raise HTTPException( | |
| status_code=status.HTTP_400_BAD_REQUEST, | |
| detail="Email already registered" | |
| ) | |
| hashed_password = get_password_hash(user_in.password) | |
| db_user = UserInDB( | |
| **user_in.dict(), | |
| password=hashed_password, | |
| role="user", | |
| created_at=datetime.utcnow(), | |
| updated_at=datetime.utcnow() | |
| ) | |
| try: | |
| result = await user_db.insert_one(db_user.dict()) | |
| logger.info(f"User created: {user_in.email}") | |
| return UserResponse( | |
| id=str(result.inserted_id), | |
| email=db_user.email, | |
| full_name=db_user.full_name, | |
| role=db_user.role | |
| ) | |
| except Exception as e: | |
| logger.error(f"User creation failed: {str(e)}") | |
| raise HTTPException( | |
| status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, | |
| detail="User creation failed" | |
| ) | |
| async def read_users_me( | |
| request: Request, | |
| current_user: UserInDB = Depends(verify_token) | |
| ): | |
| logger.info(f"User profile request for {current_user.sub}") | |
| user_db = await get_user_db() | |
| user = await user_db.find_one({"email": current_user.sub}) | |
| if not user: | |
| logger.error(f"User not found: {current_user.sub}") | |
| raise HTTPException( | |
| status_code=status.HTTP_404_NOT_FOUND, | |
| detail="User not found" | |
| ) | |
| return UserResponse( | |
| id=str(user["_id"]), | |
| email=user["email"], | |
| full_name=user.get("full_name", ""), | |
| role=user.get("role", "user") | |
| ) |