Spaces:
Runtime error
Runtime error
| from datetime import timedelta | |
| from typing import Annotated | |
| from fastapi import APIRouter, Depends, status | |
| from fastapi.responses import JSONResponse | |
| from fastapi.security import OAuth2PasswordRequestForm | |
| from passlib.context import CryptContext | |
| from sqlalchemy.orm import Session | |
| from db.models import User | |
| from db.database import get_db | |
| from api.auth import get_current_user, create_access_token | |
| from service.dto import CreateUserRequest, UserVerification, Token | |
| router = APIRouter(tags=["User"]) | |
| bcrypt_context = CryptContext(schemes=["bcrypt"], deprecated="auto") | |
| db_dependency = Annotated[Session, Depends(get_db)] | |
| user_dependency = Annotated[dict, Depends(get_current_user)] | |
| ACCESS_TOKEN_EXPIRE_MINUTES = 43200 | |
| async def login_for_access_token( | |
| login_data: Annotated[OAuth2PasswordRequestForm, Depends()], | |
| db: Session = Depends(get_db), | |
| ): | |
| user = db.query(User).filter(User.username == login_data.username).first() | |
| if not user or not bcrypt_context.verify(login_data.password, user.password_hash): | |
| return JSONResponse( | |
| status_code=status.HTTP_401_UNAUTHORIZED, | |
| content="Incorrect username or password", | |
| headers={"WWW-Authenticate": "Bearer"}, | |
| ) | |
| try: | |
| access_token_expires = timedelta(minutes=ACCESS_TOKEN_EXPIRE_MINUTES) | |
| access_token = create_access_token( | |
| user.username, | |
| user.name, | |
| user.id, | |
| user.role_id, | |
| access_token_expires, | |
| user.email, | |
| ) | |
| return {"access_token": access_token, "token_type": "bearer"} | |
| except Exception as e: | |
| print(e) | |
| return JSONResponse(status_code=500, content="An error occuring when login") | |
| async def get_user(user: user_dependency): | |
| if user is None: | |
| return JSONResponse(status_code=401, content="Authentication Failed") | |
| return { | |
| "username": user.get("username"), | |
| "name": user.get("name"), | |
| "id": user.get("id"), | |
| "email": user.get("email"), | |
| "role": user.get("role_id"), | |
| } | |
| async def get_all_users(user: user_dependency, db: Session = Depends(get_db)): | |
| # Check if the current user has an admin role | |
| if user.get("role_id") != 1: # Adjust this check based on how roles are represented | |
| return JSONResponse(status_code=401, content="Authentication Failed") | |
| # Query the database to retrieve all users | |
| users = db.query( | |
| User | |
| ).all() # Assuming you have a User model with an SQLAlchemy session | |
| return [ | |
| { | |
| "id": user.id, | |
| "username": user.username, | |
| "name": user.name, | |
| "email": user.email, | |
| "role": user.role_id, | |
| } | |
| for user in users | |
| ] | |
| async def register_user(db: db_dependency, create_user_request: CreateUserRequest): | |
| existing_user = ( | |
| db.query(User).filter(User.email == create_user_request.email).first() | |
| ) | |
| if existing_user: | |
| return JSONResponse(status_code=400, content="Email is already registered") | |
| try: | |
| password_hash = bcrypt_context.hash(create_user_request.password) | |
| create_user_model = User( | |
| name=create_user_request.name, | |
| username=create_user_request.username, | |
| email=create_user_request.email, | |
| role_id=create_user_request.role_id, | |
| password_hash=password_hash, | |
| ) | |
| db.add(create_user_model) | |
| db.commit() | |
| db.refresh(create_user_model) | |
| return {"message": "User created successfully", "user_id": create_user_model.id} | |
| except Exception as e: | |
| print(e) | |
| return JSONResponse( | |
| status_code=500, content="An error occuring when register user" | |
| ) | |
| async def forget_password(): | |
| pass | |
| async def change_password( | |
| user: user_dependency, db: db_dependency, user_verification: UserVerification | |
| ): | |
| if user is None: | |
| return JSONResponse(status_code=401, content="Authentication Failed") | |
| user_model = db.query(User).filter(User.id == user.get("id")).first() | |
| if not bcrypt_context.verify( | |
| user_verification.password, user_model.hashed_password | |
| ): | |
| return JSONResponse(status_code=401, content="Error on password change") | |
| user_model.hashed_password = bcrypt_context.hash(user_verification.new_password) | |
| db.add(user_model) | |
| db.commit() | |
| db.refresh(user_model) | |
| return {"message": "User's password successfully changed", "user_id": user_model.id} | |