Spaces:
Runtime error
Runtime error
File size: 9,997 Bytes
71a3948 93cf1dc 71a3948 3358b33 71a3948 93cf1dc 71a3948 93cf1dc 71a3948 93cf1dc 1a8e744 71a3948 17d3149 71a3948 93cf1dc 71a3948 93cf1dc 71a3948 3358b33 71a3948 3358b33 71a3948 |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 |
from fastapi import APIRouter, Depends, HTTPException, status, Query
from sqlmodel import Session, SQLModel
from typing import List, Optional, Dict
from src.database import get_session
from src.auth import get_current_active_user, get_api_key, require_super_admin
from src.models import (
User, UserCreate, UserRead, UserUpdate, Role,
Student, StudentCreate, StudentReadWithClearance, StudentUpdate, StudentRead,
TagLink, RFIDTagRead, Device, DeviceCreate, DeviceRead, TagScan
)
from src.crud import users as user_crud
from src.crud import students as student_crud
from src.crud import tag_linking as tag_crud
from src.crud import devices as device_crud
# Administrative router. Every route registered on it is served under the
# /admin prefix, grouped under the "Administration" tag, and runs the
# get_current_active_user dependency configured with the ADMIN and STAFF roles.
router = APIRouter(
    prefix="/admin",
    tags=["Administration"],
    dependencies=[
        Depends(get_current_active_user(required_roles=[Role.ADMIN, Role.STAFF])),
    ],
)
# --- All other administrative endpoints remain the same ---
# ... (Student Management, User Management, etc.) ...
@router.post(
    "/students/",
    response_model=StudentReadWithClearance,
    status_code=status.HTTP_201_CREATED,
)
def create_student(student: StudentCreate, db: Session = Depends(get_session)):
    """(Admin & Staff) Creates a new student record.

    Responds with 400 when the matriculation number is already registered.
    """
    # Refuse the request up front when the matriculation number is taken.
    if student_crud.get_student_by_matric_no(db, matric_no=student.matric_no):
        raise HTTPException(
            status_code=status.HTTP_400_BAD_REQUEST,
            detail="Matriculation number already registered",
        )

    # NOTE(review): clearance initialisation is presumably done inside
    # student_crud.create_student; it is not visible from this module.
    created = student_crud.create_student(db=db, student_data=student)
    return created
@router.get("/students/", response_model=List[StudentReadWithClearance])
def read_all_students(skip: int = 0, limit: int = 100, db: Session = Depends(get_session)):
    """(Admin & Staff) Retrieves a list of student records.

    `skip` and `limit` are forwarded to the CRUD layer as pagination
    arguments; both must be non-negative, otherwise the request is rejected
    with 400.
    """
    # Reject negative pagination values here instead of handing them to the
    # CRUD layer, where they would surface as a database error or as
    # backend-specific behaviour.
    if skip < 0 or limit < 0:
        raise HTTPException(
            status_code=status.HTTP_400_BAD_REQUEST,
            detail="skip and limit must be non-negative.",
        )
    return student_crud.get_all_students(db, skip=skip, limit=limit)
@router.get("/students/lookup", response_model=StudentReadWithClearance)
def lookup_student(
    matric_no: Optional[str] = Query(None, description="Matriculation number of the student."),
    tag_id: Optional[str] = Query(None, description="RFID tag ID linked to the student."),
    db: Session = Depends(get_session),
):
    """(Admin & Staff) Finds one student by either matric number or RFID tag ID.

    Exactly one of the two identifiers must be supplied (400 otherwise);
    responds with 404 when no student matches.
    """
    by_matric = bool(matric_no)
    by_tag = bool(tag_id)

    # Exactly one identifier is required; empty strings count as missing.
    if not (by_matric or by_tag):
        raise HTTPException(
            status_code=status.HTTP_400_BAD_REQUEST,
            detail="A matric_no or tag_id must be provided.",
        )
    if by_matric and by_tag:
        raise HTTPException(
            status_code=status.HTTP_400_BAD_REQUEST,
            detail="Provide either matric_no or tag_id, not both.",
        )

    # After the guards above exactly one of the flags is set.
    if by_matric:
        found = student_crud.get_student_by_matric_no(db, matric_no=matric_no)
    else:
        found = student_crud.get_student_by_tag_id(db, tag_id=tag_id)

    if not found:
        raise HTTPException(
            status_code=status.HTTP_404_NOT_FOUND,
            detail="Student not found with the provided identifier.",
        )
    return found
@router.get("/students/{student_id}", response_model=StudentReadWithClearance)
def read_single_student(student_id: int, db: Session = Depends(get_session)):
    """(Admin & Staff) Fetches one student's record by internal ID; 404 if absent."""
    record = student_crud.get_student_by_id(db, student_id=student_id)
    if record:
        return record
    # A falsy lookup result means there is nothing to return for this ID.
    raise HTTPException(
        status_code=status.HTTP_404_NOT_FOUND,
        detail="Student not found",
    )
@router.put("/students/{student_id}", response_model=StudentReadWithClearance)
def update_student_details(
    student_id: int,
    student: StudentUpdate,
    db: Session = Depends(get_session),
):
    """(Admin & Staff) Applies the given changes to a student; 404 if absent."""
    result = student_crud.update_student(
        db,
        student_id=student_id,
        student_update=student,
    )
    # A falsy result from the CRUD layer is reported to the client as 404.
    if result:
        return result
    raise HTTPException(
        status_code=status.HTTP_404_NOT_FOUND,
        detail="Student not found",
    )
# --- Tag Management (Admin + Staff) ---
@router.post("/tags/link", response_model=RFIDTagRead)
def link_rfid_tag(link_data: TagLink, db: Session = Depends(get_session)):
    """(Admin & Staff) Links an RFID tag to a student or user; 409 on failure."""
    linked = tag_crud.link_tag(db, link_data)
    if linked:
        return linked
    # The CRUD layer signals a failed link with a falsy result; the exact
    # reason is not available here, so the message lists the likely causes.
    raise HTTPException(
        status_code=status.HTTP_409_CONFLICT,
        detail="Could not link tag. The tag may already be in use, or the user/student already has a tag.",
    )
@router.delete("/tags/{tag_id}/unlink", response_model=RFIDTagRead)
def unlink_rfid_tag(tag_id: str, db: Session = Depends(get_session)):
    """(Admin & Staff) Unlinks an RFID tag and returns it; 404 if the tag is unknown."""
    removed = tag_crud.unlink_tag(db, tag_id)
    if removed:
        return removed
    # A falsy result from unlink_tag is reported as "tag not found".
    raise HTTPException(
        status_code=status.HTTP_404_NOT_FOUND,
        detail="RFID Tag not found.",
    )
# --- Super Admin Only Functions ---
# NOTE(review): this definition shadows the `require_super_admin` name imported
# from src.auth at the top of the file; from here on the routes below use this
# local version. Confirm which of the two is intended and drop the other.
def require_super_admin(current_user: User = Depends(get_current_active_user())) -> User:
    """Dependency that only lets users whose role is ADMIN through.

    Raises:
        HTTPException: 403 when the authenticated user's role is not ADMIN.

    Returns:
        The authenticated user, so the dependency also works when injected as
        a route parameter (not only in a ``dependencies=[...]`` list, where
        the return value is ignored).
    """
    if current_user.role != Role.ADMIN:
        raise HTTPException(
            status_code=status.HTTP_403_FORBIDDEN,
            detail="This action requires Super Admin privileges.",
        )
    return current_user
# NOTE(review): the other super-admin routes in this file are guarded with
# Depends(require_super_admin); this one uses get_current_active_user with
# required_roles=[Role.ADMIN] instead. Behaviour is kept as-is.
@router.post(
    "/users/",
    response_model=UserRead,
    status_code=status.HTTP_201_CREATED,
    dependencies=[Depends(get_current_active_user(required_roles=[Role.ADMIN]))],
)
def create_user_as_admin(user: UserCreate, db: Session = Depends(get_session)):
    """(Super Admin Only) Creates a new user account.

    Responds with 400 when the username or the email is already registered.
    """
    # Username is checked before email, so a request that collides on both
    # is reported as a username conflict.
    username_taken = user_crud.get_user_by_username(db, username=user.username)
    if username_taken:
        raise HTTPException(
            status_code=status.HTTP_400_BAD_REQUEST,
            detail="Username already registered.",
        )

    email_taken = user_crud.get_user_by_email(db, email=user.email)
    if email_taken:
        raise HTTPException(
            status_code=status.HTTP_400_BAD_REQUEST,
            detail="Email already registered.",
        )

    return user_crud.create_user(db=db, user=user)
@router.get(
    "/users/",
    response_model=List[UserRead],
    dependencies=[Depends(require_super_admin)],
)
def read_all_users(db: Session = Depends(get_session)):
    """(Super Admin Only) Lists every user account."""
    all_users = user_crud.get_all_users(db)
    return all_users
@router.get(
    "/users/lookup",
    response_model=UserRead,
    dependencies=[Depends(require_super_admin)],
)
def lookup_user(
    username: Optional[str] = Query(None, description="Username of the user."),
    tag_id: Optional[str] = Query(None, description="RFID tag ID linked to the user."),
    db: Session = Depends(get_session),
):
    """(Super Admin Only) Finds one user by either username or RFID tag ID.

    Exactly one of the two identifiers must be supplied (400 otherwise);
    responds with 404 when no user matches.
    """
    by_username = bool(username)
    by_tag = bool(tag_id)

    # Exactly one identifier is required; empty strings count as missing.
    if not (by_username or by_tag):
        raise HTTPException(
            status_code=status.HTTP_400_BAD_REQUEST,
            detail="A username or tag_id must be provided.",
        )
    if by_username and by_tag:
        raise HTTPException(
            status_code=status.HTTP_400_BAD_REQUEST,
            detail="Provide either username or tag_id, not both.",
        )

    # After the guards above exactly one of the flags is set.
    if by_username:
        found = user_crud.get_user_by_username(db, username=username)
    else:
        found = user_crud.get_user_by_tag_id(db, tag_id=tag_id)

    if not found:
        raise HTTPException(
            status_code=status.HTTP_404_NOT_FOUND,
            detail="User not found with the provided identifier.",
        )
    return found
@router.put(
    "/users/{user_id}",
    response_model=UserRead,
    dependencies=[Depends(require_super_admin)],
)
def update_user_details(user_id: int, user: UserUpdate, db: Session = Depends(get_session)):
    """(Super Admin Only) Applies the given changes to a user; 404 if absent."""
    # The target account is loaded first because update_user takes the
    # existing user object rather than an ID.
    target = user_crud.get_user_by_id(db, user_id=user_id)
    if not target:
        raise HTTPException(
            status_code=status.HTTP_404_NOT_FOUND,
            detail="User not found",
        )

    result = user_crud.update_user(db, user=target, updates=user)
    # A falsy result from the update itself is reported the same way.
    if not result:
        raise HTTPException(
            status_code=status.HTTP_404_NOT_FOUND,
            detail="User not found",
        )
    return result
@router.delete(
    "/users/{user_id}",
    response_model=UserRead,
    dependencies=[Depends(require_super_admin)],
)
def delete_user_account(
    user_id: int,
    db: Session = Depends(get_session),
    current_user: User = Depends(get_current_active_user()),
):
    """(Super Admin Only) Deletes a user account and returns it.

    Responds with 400 when the caller targets their own account and with 404
    when no such user exists.
    """
    # Callers may not remove the account they are authenticated as.
    is_self = current_user.id == user_id
    if is_self:
        raise HTTPException(
            status_code=status.HTTP_400_BAD_REQUEST,
            detail="Cannot delete your own account.",
        )

    removed = user_crud.delete_user(db, user_id=user_id)
    if removed:
        return removed
    raise HTTPException(
        status_code=status.HTTP_404_NOT_FOUND,
        detail="User not found",
    )
@router.delete(
    "/students/{student_id}",
    response_model=StudentRead,
    dependencies=[Depends(require_super_admin)],
)
def delete_student_record(student_id: int, db: Session = Depends(get_session)):
    """(Super Admin Only) Deletes a student record and returns it; 404 if absent."""
    # NOTE(review): removal of associated data is presumably handled inside
    # student_crud.delete_student; it is not visible from this module.
    removed = student_crud.delete_student(db, student_id=student_id)
    if removed:
        return removed
    raise HTTPException(
        status_code=status.HTTP_404_NOT_FOUND,
        detail="Student not found",
    )
@router.post(
    "/devices/",
    response_model=DeviceRead,
    status_code=status.HTTP_201_CREATED,
    dependencies=[Depends(require_super_admin)],
)
def create_device(device: DeviceCreate, db: Session = Depends(get_session)):
    """(Super Admin Only) Registers a new device; 400 if its location is already taken."""
    # Only one device per location: look for an existing one before creating.
    if device_crud.get_device_by_location(db, location=device.location):
        raise HTTPException(
            status_code=status.HTTP_400_BAD_REQUEST,
            detail=f"A device at location '{device.location}' already exists.",
        )
    registered = device_crud.create_device(db=db, device=device)
    return registered
@router.get(
    "/devices/",
    response_model=List[DeviceRead],
    dependencies=[Depends(require_super_admin)],
)
def read_all_devices(skip: int = 0, limit: int = 100, db: Session = Depends(get_session)):
    """(Super Admin Only) Retrieves a list of registered devices.

    `skip` and `limit` are forwarded to the CRUD layer as pagination
    arguments; both must be non-negative, otherwise the request is rejected
    with 400.
    """
    # Reject negative pagination values here instead of handing them to the
    # CRUD layer, where they would surface as a database error or as
    # backend-specific behaviour.
    if skip < 0 or limit < 0:
        raise HTTPException(
            status_code=status.HTTP_400_BAD_REQUEST,
            detail="skip and limit must be non-negative.",
        )
    return device_crud.get_all_devices(db, skip=skip, limit=limit)
@router.delete(
    "/devices/{device_id}",
    response_model=DeviceRead,
    dependencies=[Depends(require_super_admin)],
)
def delete_device_registration(device_id: int, db: Session = Depends(get_session)):
    """(Super Admin Only) Removes a device registration and returns it; 404 if absent."""
    removed = device_crud.delete_device(db, device_id=device_id)
    if removed:
        return removed
    # A falsy result from delete_device is reported as "device not found".
    raise HTTPException(
        status_code=status.HTTP_404_NOT_FOUND,
        detail="Device not found",
    )
|