Spaces:
Runtime error
Runtime error
| from fastapi import APIRouter, Depends | |
| from fastapi.responses import JSONResponse | |
| from api.router.user import user_dependency | |
| from typing import Annotated | |
| from sqlalchemy.orm import Session | |
| from db.database import get_db | |
| from db.models import Role, User | |
| from service.dto import RoleCreate, RoleUpdate | |
# APIRouter instance for this module, created with the "Roles" tag.
| router = APIRouter(tags=["Roles"]) | |
# Annotated alias for handler parameters: a SQLAlchemy Session that FastAPI
# resolves through Depends(get_db).
| db_dependency = Annotated[Session, Depends(get_db)] | |
async def get_data_roles(user: user_dependency, db: db_dependency):
    """Return every stored role; restricted to callers whose role_id is 1.

    Args:
        user: Current-user mapping supplied by ``user_dependency``; may be None.
        db: SQLAlchemy session supplied by ``db_dependency``.

    Returns:
        The list of ``Role`` rows on success. Otherwise a ``JSONResponse``:
        401 when the caller is missing or does not have role_id 1, 404 when
        no roles exist, 500 when fetching the roles raises.
    """
    # NOTE(review): no route decorator is visible on this handler in this
    # view; confirm it is registered on `router` elsewhere.

    # Only a valid user with role_id == 1 may list roles.
    caller_is_admin = user is not None and user.get("role_id") == 1
    if not caller_is_admin:
        return JSONResponse(status_code=401, content="Authentication Failed")

    try:
        found_roles = db.query(Role).all()
        if found_roles:
            return found_roles
        # Nothing stored: report 404 rather than an empty list.
        return JSONResponse(status_code=404, content="No roles found")
    except Exception as exc:
        # Print the failure for debugging and hide the details from the client.
        detail = str(exc)
        print(f"Error fetching roles: {detail}")
        return JSONResponse(status_code=500, content="Internal Server Error")
| # POST: Add a new role | |
async def add_data_roles(
    role_data: RoleCreate, user: user_dependency, db: db_dependency
):
    """Create a new role; restricted to callers whose role_id is 1.

    Args:
        role_data: Payload whose ``name`` is used for the new role.
        user: Current-user mapping supplied by ``user_dependency``; may be None.
        db: SQLAlchemy session supplied by ``db_dependency``.

    Returns:
        A 401 ``JSONResponse`` when the caller is missing or does not have
        role_id 1; otherwise a dict with a success message and the new role.
    """
    caller_is_admin = user is not None and user.get("role_id") == 1
    if not caller_is_admin:
        return JSONResponse(status_code=401, content="Authentication Failed")

    created_role = Role(name=role_data.name)
    db.add(created_role)
    db.commit()
    # Reload the instance from the database after the commit.
    db.refresh(created_role)

    response_body = {"message": "Role added successfully", "role": created_role}
    return response_body
async def update_data_roles(
    role_id: int, role_data: RoleUpdate, user: user_dependency, db: db_dependency
):
    """Rename an existing role; restricted to callers whose role_id is 1.

    Args:
        role_id: Primary key of the role to update.
        role_data: Payload whose ``name`` becomes the role's new name.
        user: Current-user mapping supplied by ``user_dependency``; may be None.
        db: SQLAlchemy session supplied by ``db_dependency``.

    Returns:
        A 401 ``JSONResponse`` when the caller is missing or does not have
        role_id 1, a 404 ``JSONResponse`` when no role has the given id,
        otherwise a dict with a success message and the refreshed role.
    """
    if user is None or user.get("role_id") != 1:
        return JSONResponse(status_code=401, content="Authentication Failed")

    # Filter on the `role_id` argument. The earlier code compared `Role.id`
    # against the builtin `id` function (this handler has no `id` parameter),
    # so the requested role was never actually looked up.
    role = db.query(Role).filter(Role.id == role_id).first()
    if role is None:
        return JSONResponse(status_code=404, content="Role not found")

    role.name = role_data.name
    db.commit()
    # Reload the instance from the database after the commit.
    db.refresh(role)
    return {"message": "Role updated successfully", "role": role}
| # DELETE: Remove a role | |
async def remove_data_roles(id: int, user: user_dependency, db: db_dependency):
    """Delete a role by id; restricted to callers whose role_id is 1.

    Args:
        id: Primary key of the role to delete (shadows the builtin, but is
            the handler's public parameter name).
        user: Current-user mapping supplied by ``user_dependency``; may be None.
        db: SQLAlchemy session supplied by ``db_dependency``.

    Returns:
        A 401 ``JSONResponse`` when the caller is missing or does not have
        role_id 1, a 404 ``JSONResponse`` when no role has the given id,
        otherwise a dict with a success message.
    """
    caller_is_admin = user is not None and user.get("role_id") == 1
    if not caller_is_admin:
        return JSONResponse(status_code=401, content="Authentication Failed")

    doomed_role = db.query(Role).filter(Role.id == id).first()
    if doomed_role is not None:
        db.delete(doomed_role)
        db.commit()
        return {"message": "Role removed successfully"}

    return JSONResponse(status_code=404, content="Role not found")
async def update_user_role(
    user: user_dependency, db: db_dependency, user_id: int, role_data: RoleUpdate
):
    """Assign a new role to a user; restricted to callers whose role_id is 1.

    Args:
        user: Current-user mapping supplied by ``user_dependency``; may be None.
        db: SQLAlchemy session supplied by ``db_dependency``.
        user_id: Primary key of the user whose role is changed.
        role_data: Payload whose ``role_id`` is written to the user.

    Returns:
        A 401 ``JSONResponse`` when the caller is missing or does not have
        role_id 1, a 404 ``JSONResponse`` when no user has the given id,
        otherwise a dict with a success message and the refreshed user.
    """
    # The caller must be authenticated and hold the admin role (role_id == 1).
    caller_is_admin = user is not None and user.get("role_id") == 1
    if not caller_is_admin:
        return JSONResponse(status_code=401, content="Authentication Failed")

    target_user = db.query(User).filter(User.id == user_id).first()
    if target_user is None:
        return JSONResponse(status_code=404, content="User not found")

    # NOTE(review): assumes RoleUpdate exposes `role_id`; the role-rename
    # handler reads `name` from the same DTO - confirm against service.dto.
    new_role_id = role_data.role_id
    target_user.role_id = new_role_id

    # Persist the change, then reload the instance from the database.
    db.commit()
    db.refresh(target_user)
    return {"message": "User role updated successfully", "user": target_user}