Testys commited on
Commit
93cf1dc
·
1 Parent(s): ec5cc84

FEAT: Code for crud completed

Browse files
main.py CHANGED
@@ -4,25 +4,51 @@ from datetime import datetime
4
  from fastapi.middleware.cors import CORSMiddleware
5
  import uvicorn
6
  from contextlib import asynccontextmanager
 
 
 
 
 
7
 
8
  # Correctly import the database table creation function
9
- from src.database import create_db_and_tables
10
  # Import all the necessary routers for the application
11
  from src.routers import students, devices, clearance, token, users, admin
 
 
12
 
13
  @asynccontextmanager
14
- async def lifespan(app_instance: FastAPI):
15
- """
16
- Handles application startup and shutdown events.
17
- - On startup, it ensures that all necessary database tables are created.
18
- - On shutdown, it can be used for cleanup tasks.
19
- """
20
- print("Application startup: Initializing...")
21
- # Correctly call the function to create database tables
22
  create_db_and_tables()
23
- print("Application startup: Database tables checked/created.")
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
24
  yield
25
- print("Application shutdown: Cleaning up.")
 
 
26
 
27
 
28
  # Initialize the FastAPI application instance with metadata for documentation
 
4
  from fastapi.middleware.cors import CORSMiddleware
5
  import uvicorn
6
  from contextlib import asynccontextmanager
7
+ import os
8
+ from fastapi import FastAPI
9
+ from sqlmodel import Session
10
+ from starlette.middleware.cors import CORSMiddleware
11
+
12
 
13
  # Correctly import the database table creation function
14
+ from src.database import create_db_and_tables, engine
15
  # Import all the necessary routers for the application
16
  from src.routers import students, devices, clearance, token, users, admin
17
+ from src.crud import users as user_crud
18
+ from src.models import UserCreate, Role
19
 
20
  @asynccontextmanager
21
+ async def lifespan(app: FastAPI):
22
+ # On startup
23
+ print("Starting up...")
 
 
 
 
 
24
  create_db_and_tables()
25
+
26
+ # --- Create first superuser ---
27
+ # This runs only on startup. It checks if an admin exists and creates one if not.
28
+ # It's best practice to get credentials from environment variables for security.
29
+ with Session(engine) as session:
30
+ initial_username = os.getenv("INITIAL_ADMIN_USERNAME", "admin")
31
+
32
+ # Check if the user already exists
33
+ user = user_crud.get_user_by_username(session, username=initial_username)
34
+ if not user:
35
+ print("Initial admin user not found, creating one...")
36
+ initial_user = UserCreate(
37
+ username=initial_username,
38
+ email=os.getenv("INITIAL_ADMIN_EMAIL", "admin@example.com"),
39
+ full_name="Initial Admin",
40
+ password=os.getenv("INITIAL_ADMIN_PASSWORD", "changethispassword"),
41
+ role=Role.ADMIN
42
+ )
43
+ user_crud.create_user(db=session, user=initial_user)
44
+ print("Initial admin user created successfully.")
45
+ else:
46
+ print("Initial admin user already exists.")
47
+
48
  yield
49
+ # On shutdown
50
+ print("Shutting down...")
51
+
52
 
53
 
54
  # Initialize the FastAPI application instance with metadata for documentation
src/auth.py CHANGED
@@ -1,17 +1,16 @@
1
- from fastapi import Depends, HTTPException, status
2
  from fastapi.security import APIKeyHeader, OAuth2PasswordBearer
3
  from jose import JWTError, jwt
4
  from sqlmodel import Session, select
5
  from typing import List, Optional
6
- from typing import List, Optional
7
  from datetime import datetime, timedelta
8
 
9
-
10
  from src.config import settings
11
  from src.database import get_session
12
  from src.crud import users as user_crud
13
  from src.crud import devices as device_crud
14
  from src.models import User, Role, Device
 
15
 
16
  # --- Configuration ---
17
  oauth2_scheme = OAuth2PasswordBearer(tokenUrl="token")
@@ -28,13 +27,6 @@ def create_access_token(data: dict, expires_delta: Optional[timedelta] = None):
28
  encoded_jwt = jwt.encode(to_encode, settings.SECRET_KEY, algorithm=settings.ALGORITHM)
29
  return encoded_jwt
30
 
31
- # --- Password Hashing ---
32
- def verify_password(plain_password: str, hashed_password: str) -> bool:
33
- return settings.PWD_CONTEXT.verify(plain_password, hashed_password)
34
-
35
- def hash_password(password: str) -> str:
36
- return settings.PWD_CONTEXT.hash(password)
37
-
38
  # --- User Authentication ---
39
  def authenticate_user(db: Session, username: str, password: str):
40
  """Authenticate user by username and password."""
@@ -47,20 +39,20 @@ def authenticate_user(db: Session, username: str, password: str):
47
 
48
  # --- Dependency for API Key Authentication ---
49
 
50
- def get_api_key(
51
- key: str = Depends(api_key_header), db: Session = Depends(get_session)
52
- ) -> Device:
53
- """
54
- Dependency to validate the API key from the x-api-key header.
55
- Ensures the device is registered in the database.
56
- """
57
- db_device = device_crud.get_device_by_api_key(db, api_key=key)
58
- if not db_device:
59
- raise HTTPException(
60
- status_code=status.HTTP_401_UNAUTHORIZED,
61
- detail="Invalid or missing API Key",
62
- )
63
- return db_device
64
 
65
 
66
  # --- Dependency for User Authentication and Authorization ---
@@ -98,3 +90,20 @@ def get_current_active_user(required_roles: List[Role] = None):
98
  return user
99
 
100
  return dependency
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ from fastapi import Depends, HTTPException, status, Security
2
  from fastapi.security import APIKeyHeader, OAuth2PasswordBearer
3
  from jose import JWTError, jwt
4
  from sqlmodel import Session, select
5
  from typing import List, Optional
 
6
  from datetime import datetime, timedelta
7
 
 
8
  from src.config import settings
9
  from src.database import get_session
10
  from src.crud import users as user_crud
11
  from src.crud import devices as device_crud
12
  from src.models import User, Role, Device
13
+ from src.crud.utils import verify_password, hash_password
14
 
15
  # --- Configuration ---
16
  oauth2_scheme = OAuth2PasswordBearer(tokenUrl="token")
 
27
  encoded_jwt = jwt.encode(to_encode, settings.SECRET_KEY, algorithm=settings.ALGORITHM)
28
  return encoded_jwt
29
 
 
 
 
 
 
 
 
30
  # --- User Authentication ---
31
  def authenticate_user(db: Session, username: str, password: str):
32
  """Authenticate user by username and password."""
 
39
 
40
  # --- Dependency for API Key Authentication ---
41
 
42
+ # def get_api_key(
43
+ # key: str = Depends(api_key_header), db: Session = Depends(get_session)
44
+ # ) -> Device:
45
+ # """
46
+ # Dependency to validate the API key from the x-api-key header.
47
+ # Ensures the device is registered in the database.
48
+ # """
49
+ # db_device = device_crud.get_device_by_api_key(db, api_key=key)
50
+ # if not db_device:
51
+ # raise HTTPException(
52
+ # status_code=status.HTTP_401_UNAUTHORIZED,
53
+ # detail="Invalid or missing API Key",
54
+ # )
55
+ # return db_device
56
 
57
 
58
  # --- Dependency for User Authentication and Authorization ---
 
90
  return user
91
 
92
  return dependency
93
+
94
+ async def get_api_key(api_key: str = Security(api_key_header), db: Session = Depends(get_session)):
95
+ """Validate device API key."""
96
+ if not api_key:
97
+ raise HTTPException(
98
+ status_code=status.HTTP_401_UNAUTHORIZED,
99
+ detail="API key required"
100
+ )
101
+
102
+ device = device_crud.get_device_by_api_key(db, api_key=api_key)
103
+ if not device or not device.is_active:
104
+ raise HTTPException(
105
+ status_code=status.HTTP_401_UNAUTHORIZED,
106
+ detail="Invalid or inactive API key"
107
+ )
108
+
109
+ return api_key
src/crud/clearance.py CHANGED
@@ -1,7 +1,6 @@
1
  from sqlmodel import Session, select
2
  from typing import List, Optional
3
-
4
- from src.models import Student, ClearanceStatus, ClearanceUpdate, Department, ClearanceProcess
5
 
6
  def get_clearance_status_for_student(db: Session, student: Student) -> List[ClearanceStatus]:
7
  """
@@ -9,37 +8,38 @@ def get_clearance_status_for_student(db: Session, student: Student) -> List[Clea
9
  """
10
  return student.clearance_statuses
11
 
12
- def update_clearance_status(db: Session, clearance_update: ClearanceUpdate) -> Optional[ClearanceStatus]:
13
  """
14
- Updates the clearance status for a student in a specific department.
15
-
16
- This function performs a direct lookup on the ClearanceStatus table, which is
17
- more efficient than fetching the student and iterating through their statuses.
18
  """
19
- # First, find the student to ensure they exist.
20
- student = db.exec(select(Student).where(Student.matric_no == clearance_update.matric_no)).first()
 
21
  if not student:
22
  return None # Student not found
23
 
24
- # Directly query for the specific clearance status record.
25
- statement = select(ClearanceStatus).where(
26
  ClearanceStatus.student_id == student.id,
27
- ClearanceStatus.department == clearance_update.department
28
  )
29
- status_to_update = db.exec(statement).first()
30
 
31
- if not status_to_update:
32
- # This case should theoretically not happen if students are created correctly,
33
- # but it's a good safeguard.
34
  return None
35
 
36
- # Update the status and commit the change.
37
- status_to_update.status = clearance_update.status
38
- db.add(status_to_update)
39
- db.commit()
40
- db.refresh(status_to_update)
41
 
42
- return status_to_update
 
 
 
 
 
43
 
44
  def is_student_fully_cleared(db: Session, matric_no: str) -> bool:
45
  """
@@ -51,7 +51,7 @@ def is_student_fully_cleared(db: Session, matric_no: str) -> bool:
51
 
52
  # Check if any of the student's clearance statuses are NOT 'approved'.
53
  for status in student.clearance_statuses:
54
- if status.status != ClearanceProcess.APPROVED:
55
  return False
56
 
57
  # If the loop completes without returning, all statuses are approved.
 
1
  from sqlmodel import Session, select
2
  from typing import List, Optional
3
+ from src.models import ClearanceStatus, Student, ClearanceUpdate, ClearanceStatusEnum
 
4
 
5
  def get_clearance_status_for_student(db: Session, student: Student) -> List[ClearanceStatus]:
6
  """
 
8
  """
9
  return student.clearance_statuses
10
 
11
+ def update_clearance_status(db: Session, update_data: ClearanceUpdate) -> ClearanceStatus | None:
12
  """
13
+ Updates the clearance status for a specific student and department.
 
 
 
14
  """
15
+ # Find the student first
16
+ student_statement = select(Student).where(Student.matric_no == update_data.matric_no)
17
+ student = db.exec(student_statement).first()
18
  if not student:
19
  return None # Student not found
20
 
21
+ # Now find the specific clearance status record for that student
22
+ status_statement = select(ClearanceStatus).where(
23
  ClearanceStatus.student_id == student.id,
24
+ ClearanceStatus.department == update_data.department
25
  )
26
+ clearance_record = db.exec(status_statement).first()
27
 
28
+ if not clearance_record:
29
+ # This case should ideally not happen if students are created correctly
 
30
  return None
31
 
32
+ # Update the status and remarks
33
+ clearance_record.status = update_data.status
34
+ if update_data.remarks is not None:
35
+ clearance_record.remarks = update_data.remarks
 
36
 
37
+ db.add(clearance_record)
38
+ db.commit()
39
+ db.refresh(clearance_record)
40
+
41
+ return clearance_record
42
+
43
 
44
  def is_student_fully_cleared(db: Session, matric_no: str) -> bool:
45
  """
 
51
 
52
  # Check if any of the student's clearance statuses are NOT 'approved'.
53
  for status in student.clearance_statuses:
54
+ if status.status != ClearanceStatusEnum.APPROVED:
55
  return False
56
 
57
  # If the loop completes without returning, all statuses are approved.
src/crud/devices.py CHANGED
@@ -20,7 +20,8 @@ def create_device(db: Session, device: DeviceCreate) -> Optional[Device]:
20
 
21
  db_device = Device(
22
  device_name=device.device_name,
23
- department=device.department,
 
24
  api_key=api_key,
25
  is_active=True
26
  )
@@ -30,6 +31,10 @@ def create_device(db: Session, device: DeviceCreate) -> Optional[Device]:
30
  db.refresh(db_device)
31
  return db_device
32
 
 
 
 
 
33
  def get_device_by_api_key(db: Session, api_key: str) -> Optional[Device]:
34
  """Retrieves an active device by its API key."""
35
  statement = select(Device).where(Device.api_key == api_key, Device.is_active == True)
 
20
 
21
  db_device = Device(
22
  device_name=device.device_name,
23
+ location=device.location, # ADD THIS
24
+ department=device.department, # ADD THIS
25
  api_key=api_key,
26
  is_active=True
27
  )
 
31
  db.refresh(db_device)
32
  return db_device
33
 
34
+ def get_device_by_id(db: Session, device_id: int) -> Optional[Device]:
35
+ """Retrieves a device by its primary key ID."""
36
+ return db.get(Device, device_id)
37
+
38
  def get_device_by_api_key(db: Session, api_key: str) -> Optional[Device]:
39
  """Retrieves an active device by its API key."""
40
  statement = select(Device).where(Device.api_key == api_key, Device.is_active == True)
src/crud/students.py CHANGED
@@ -1,9 +1,10 @@
1
  from sqlmodel import Session, select
2
  from typing import List, Optional
3
 
4
- from src.models import Student, StudentCreate, StudentUpdate, RFIDTag, ClearanceStatus, Department, ClearanceProcess
5
- from src.auth import hash_password
6
-
 
7
  # --- Read Operations ---
8
 
9
  def get_student_by_id(db: Session, student_id: int) -> Optional[Student]:
@@ -15,84 +16,81 @@ def get_student_by_matric_no(db: Session, matric_no: str) -> Optional[Student]:
15
  return db.exec(select(Student).where(Student.matric_no == matric_no)).first()
16
 
17
  def get_student_by_tag_id(db: Session, tag_id: str) -> Optional[Student]:
18
- """Retrieves a student by their linked RFID tag ID."""
19
- statement = select(Student).join(RFIDTag).where(RFIDTag.tag_id == tag_id)
20
- return db.exec(statement).first()
 
 
 
21
 
22
  def get_all_students(db: Session, skip: int = 0, limit: int = 100) -> List[Student]:
23
  """Retrieves a paginated list of all students."""
24
  return db.exec(select(Student).offset(skip).limit(limit)).all()
25
 
26
  # --- Write Operations ---
27
-
28
- def create_student(db: Session, student: StudentCreate) -> Student:
29
  """
30
- Creates a new student and initializes their clearance statuses.
31
-
32
- This is a critical business logic function. When a student is created,
33
- this function automatically creates a 'pending' clearance record for every
34
- department defined in the `Department` enum.
35
  """
36
- hashed_pass = hash_password(student.password)
37
- db_student = Student(
38
- matric_no=student.matric_no,
39
- full_name=student.full_name,
40
- email=student.email,
41
- hashed_password=hashed_pass,
42
- # Department will be set from the StudentCreate model
43
- department=student.department
44
  )
45
-
46
- # --- Auto-populate clearance statuses ---
47
- initial_statuses = []
48
- for dept in Department:
49
- status = ClearanceStatus(
50
- department=dept,
51
- status=ClearanceProcess.PENDING
52
- )
53
- initial_statuses.append(status)
54
-
55
- db_student.clearance_statuses = initial_statuses
56
- # --- End of auto-population ---
57
 
 
 
 
 
 
 
 
 
58
  db.add(db_student)
59
  db.commit()
60
  db.refresh(db_student)
61
- return db_student
62
-
63
- def update_student(db: Session, student_id: int, student_update: StudentUpdate) -> Optional[Student]:
64
- """
65
- Updates a student's information.
66
- If a new password is provided, it will be hashed.
67
- """
68
- db_student = db.get(Student, student_id)
69
- if not db_student:
70
- return None
71
-
72
- update_data = student_update.model_dump(exclude_unset=True)
73
-
74
- if "password" in update_data:
75
- update_data["hashed_password"] = hash_password(update_data.pop("password"))
76
 
77
- for key, value in update_data.items():
78
- setattr(db_student, key, value)
 
 
79
 
80
- db.add(db_student)
81
  db.commit()
82
  db.refresh(db_student)
 
83
  return db_student
84
 
85
- def delete_student(db: Session, student_id: int) -> Optional[Student]:
86
- """
 
 
 
 
 
 
 
 
 
 
87
 
88
- Deletes a student from the database.
89
- All associated clearance statuses and the linked RFID tag will also be
90
- deleted automatically due to the cascade settings in the data models.
91
- """
92
- db_student = db.get(Student, student_id)
93
- if not db_student:
94
  return None
95
 
96
- db.delete(db_student)
 
 
 
 
 
97
  db.commit()
98
- return db_student
 
1
  from sqlmodel import Session, select
2
  from typing import List, Optional
3
 
4
+ from src.models import (
5
+ Student, StudentCreate, StudentUpdate, User, Role, ClearanceStatus, ClearanceDepartment, RFIDTag
6
+ )
7
+ from src.crud import users as user_crud
8
  # --- Read Operations ---
9
 
10
  def get_student_by_id(db: Session, student_id: int) -> Optional[Student]:
 
16
  return db.exec(select(Student).where(Student.matric_no == matric_no)).first()
17
 
18
  def get_student_by_tag_id(db: Session, tag_id: str) -> Optional[Student]:
19
+ """Get student by RFID tag ID."""
20
+ from src.models import RFIDTag
21
+ tag = db.exec(select(RFIDTag).where(RFIDTag.tag_id == tag_id)).first()
22
+ if tag and tag.student_id:
23
+ return db.exec(select(Student).where(Student.id == tag.student_id)).first()
24
+ return None
25
 
26
  def get_all_students(db: Session, skip: int = 0, limit: int = 100) -> List[Student]:
27
  """Retrieves a paginated list of all students."""
28
  return db.exec(select(Student).offset(skip).limit(limit)).all()
29
 
30
  # --- Write Operations ---
31
+ def create_student(db: Session, student_data: StudentCreate) -> Student:
 
32
  """
33
+ Creates a new student along with their associated user account for login
34
+ and initializes their clearance statuses.
 
 
 
35
  """
36
+ # 1. Create the associated User account for the student to log in
37
+ # The student's username is their matriculation number.
38
+ user_for_student = user_crud.UserCreate(
39
+ username=student_data.matric_no,
40
+ password=student_data.password,
41
+ email=student_data.email,
42
+ full_name=student_data.full_name,
43
+ role=Role.STUDENT
44
  )
45
+ # This might raise an exception if username/email exists, which is good.
46
+ db_user = user_crud.create_user(db, user=user_for_student)
 
 
 
 
 
 
 
 
 
 
47
 
48
+ # 2. Create the Student profile
49
+ db_student = Student(
50
+ full_name=student_data.full_name,
51
+ matric_no=student_data.matric_no,
52
+ email=student_data.email,
53
+ department=student_data.department,
54
+ # Note: The User linkage is handled via the RFID tag linking process
55
+ )
56
  db.add(db_student)
57
  db.commit()
58
  db.refresh(db_student)
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
59
 
60
+ # 3. Initialize all clearance statuses for the new student
61
+ for dept in ClearanceDepartment:
62
+ status = ClearanceStatus(student_id=db_student.id, department=dept)
63
+ db.add(status)
64
 
 
65
  db.commit()
66
  db.refresh(db_student)
67
+
68
  return db_student
69
 
70
+ def update_student(db: Session, student: Student, updates: StudentUpdate) -> Student:
71
+ """Updates a student's profile information."""
72
+ student = get_student_by_id(db, student_id=student.id)
73
+ if not student:
74
+ return None
75
+
76
+ update_data = updates.model_dump(exclude_unset=True)
77
+ student.sqlmodel_update(update_data)
78
+ db.add(student)
79
+ db.commit()
80
+ db.refresh(student)
81
+ return student
82
 
83
+ def delete_student(db: Session, student_id: int) -> Student | None:
84
+ """Deletes a student and their associated clearance records."""
85
+ student_to_delete = db.get(Student, student_id)
86
+ if not student_to_delete:
 
 
87
  return None
88
 
89
+ # Also delete the associated user account
90
+ user_to_delete = user_crud.get_user_by_username(db, username=student_to_delete.matric_no)
91
+ if user_to_delete:
92
+ db.delete(user_to_delete)
93
+
94
+ db.delete(student_to_delete)
95
  db.commit()
96
+ return student_to_delete
src/crud/users.py CHANGED
@@ -2,7 +2,7 @@ from sqlmodel import Session, select
2
  from typing import List, Optional
3
 
4
  from src.models import User, UserCreate, UserUpdate, RFIDTag
5
- from src.auth import hash_password
6
 
7
  # --- Read Operations ---
8
 
@@ -19,67 +19,58 @@ def get_user_by_email(db: Session, email: str) -> Optional[User]:
19
  return db.exec(select(User).where(User.email == email)).first()
20
 
21
  def get_user_by_tag_id(db: Session, tag_id: str) -> Optional[User]:
22
- """Retrieves a user by their linked RFID tag ID."""
23
- statement = select(User).join(RFIDTag).where(RFIDTag.tag_id == tag_id)
24
- return db.exec(statement).first()
 
 
 
25
 
26
  def get_all_users(db: Session, skip: int = 0, limit: int = 100) -> List[User]:
27
  """Retrieves a paginated list of all users."""
28
  return db.exec(select(User).offset(skip).limit(limit)).all()
29
 
30
- # --- Write Operations ---
31
-
32
  def create_user(db: Session, user: UserCreate) -> User:
33
- """
34
- Creates a new user.
35
- - Hashes the password before saving.
36
- - The router should handle checks for existing username/email to provide clean HTTP errors.
37
- """
38
- hashed_pass = hash_password(user.password)
39
  db_user = User(
40
  username=user.username,
 
41
  email=user.email,
42
  full_name=user.full_name,
43
- hashed_password=hashed_pass,
44
- role=user.role
45
  )
46
  db.add(db_user)
47
  db.commit()
48
  db.refresh(db_user)
49
  return db_user
50
 
51
- def update_user(db: Session, user_id: int, user_update: UserUpdate) -> Optional[User]:
52
- """
53
- Updates a user's information.
54
- - If a new password is provided, it will be hashed.
55
- """
56
- db_user = db.get(User, user_id)
57
- if not db_user:
58
  return None
59
-
60
- update_data = user_update.model_dump(exclude_unset=True)
61
 
62
- # Hash password if it's being updated
 
63
  if "password" in update_data:
 
64
  update_data["hashed_password"] = hash_password(update_data.pop("password"))
65
-
66
- for key, value in update_data.items():
67
- setattr(db_user, key, value)
68
 
69
- db.add(db_user)
 
 
70
  db.commit()
71
- db.refresh(db_user)
72
- return db_user
73
 
74
- def delete_user(db: Session, user_id: int) -> Optional[User]:
75
- """
76
- Deletes a user from the database.
77
- The linked RFID tag will also be deleted due to cascade settings.
78
- """
79
- db_user = db.get(User, user_id)
80
- if not db_user:
81
  return None
82
-
83
- db.delete(db_user)
84
  db.commit()
85
- return db_user
 
 
2
  from typing import List, Optional
3
 
4
  from src.models import User, UserCreate, UserUpdate, RFIDTag
5
+ from src.crud.utils import hash_password
6
 
7
  # --- Read Operations ---
8
 
 
19
  return db.exec(select(User).where(User.email == email)).first()
20
 
21
  def get_user_by_tag_id(db: Session, tag_id: str) -> Optional[User]:
22
+ """Get user by RFID tag ID."""
23
+ from src.models import RFIDTag
24
+ tag = db.exec(select(RFIDTag).where(RFIDTag.tag_id == tag_id)).first()
25
+ if tag and tag.user_id:
26
+ return db.exec(select(User).where(User.id == tag.user_id)).first()
27
+ return None
28
 
29
  def get_all_users(db: Session, skip: int = 0, limit: int = 100) -> List[User]:
30
  """Retrieves a paginated list of all users."""
31
  return db.exec(select(User).offset(skip).limit(limit)).all()
32
 
 
 
33
  def create_user(db: Session, user: UserCreate) -> User:
34
+ """Creates a new user and hashes their password."""
35
+ hashed_password = hash_password(user.password)
 
 
 
 
36
  db_user = User(
37
  username=user.username,
38
+ hashed_password=hashed_password,
39
  email=user.email,
40
  full_name=user.full_name,
41
+ role=user.role,
42
+ department=user.department
43
  )
44
  db.add(db_user)
45
  db.commit()
46
  db.refresh(db_user)
47
  return db_user
48
 
49
+ def update_user(db: Session, user: User, updates: UserUpdate) -> User:
50
+ """Updates a user's information."""
51
+ user = get_user_by_id(db, user_id=user.id)
52
+ if not user:
 
 
 
53
  return None
 
 
54
 
55
+ update_data = updates.model_dump(exclude_unset=True)
56
+
57
  if "password" in update_data:
58
+ # Hash the new password if it's being updated
59
  update_data["hashed_password"] = hash_password(update_data.pop("password"))
 
 
 
60
 
61
+ user.sqlmodel_update(update_data)
62
+
63
+ db.add(user)
64
  db.commit()
65
+ db.refresh(user)
66
+ return user
67
 
68
+ def delete_user(db: Session, user_id: int) -> User | None:
69
+ """Deletes a user by their ID."""
70
+ user_to_delete = db.get(User, user_id)
71
+ if not user_to_delete:
 
 
 
72
  return None
73
+ db.delete(user_to_delete)
 
74
  db.commit()
75
+ # The user object is no longer valid after deletion, so we return the in-memory object
76
+ return user_to_delete
src/crud/utils.py CHANGED
@@ -1,14 +1,11 @@
1
  """
2
  Utility functions for CRUD operations.
3
  """
4
- import bcrypt
5
 
6
- def hash_password(password: str) -> str:
7
- """Hashes a password using bcrypt."""
8
- return bcrypt.hashpw(password.encode('utf-8'), bcrypt.gensalt()).decode('utf-8')
9
 
10
- def verify_password(plain_password: str, hashed_password_str: str) -> bool:
11
- """Verifies a plain password against a hashed password."""
12
- if not plain_password or not hashed_password_str:
13
- return False
14
- return bcrypt.checkpw(plain_password.encode('utf-8'), hashed_password_str.encode('utf-8'))
 
1
  """
2
  Utility functions for CRUD operations.
3
  """
4
+ from src.config import settings
5
 
6
+ # --- Password Hashing ---
7
+ def verify_password(plain_password: str, hashed_password: str) -> bool:
8
+ return settings.PWD_CONTEXT.verify(plain_password, hashed_password)
9
 
10
+ def hash_password(password: str) -> str:
11
+ return settings.PWD_CONTEXT.hash(password)
 
 
 
src/models.py CHANGED
@@ -70,6 +70,7 @@ class Device(SQLModel, table=True):
70
  device_name: str = Field(unique=True, index=True)
71
  api_key: str = Field(unique=True, index=True)
72
  location: str
 
73
  is_active: bool = Field(default=True)
74
 
75
  # --- Pydantic Models for API Operations ---
@@ -156,10 +157,10 @@ class RFIDScanRequest(SQLModel):
156
  tag_id: str
157
 
158
  class RFIDStatusResponse(SQLModel):
159
- status: str
160
  full_name: Optional[str] = None
161
- message: Optional[str] = None
162
- clearance: Optional[str] = None
163
 
164
  class TagScan(SQLModel):
165
  tag_id: str
@@ -168,10 +169,12 @@ class TagScan(SQLModel):
168
  class DeviceCreate(SQLModel):
169
  device_name: str
170
  location: str
 
171
 
172
  class DeviceRead(SQLModel):
173
  id: int
174
  device_name: str
175
  api_key: str
176
  location: str
 
177
  is_active: bool
 
70
  device_name: str = Field(unique=True, index=True)
71
  api_key: str = Field(unique=True, index=True)
72
  location: str
73
+ department: Department # ADD THIS - referenced in devices.py CRUD
74
  is_active: bool = Field(default=True)
75
 
76
  # --- Pydantic Models for API Operations ---
 
157
  tag_id: str
158
 
159
  class RFIDStatusResponse(SQLModel):
160
+ status: str # "found" or "unregistered" # "found" or "unregistered"
161
  full_name: Optional[str] = None
162
+ entity_type: Optional[str] = None # "Student", "Admin", "Staff"None # "Student", "Admin", "Staff"
163
+ clearance_status: Optional[str] = None # "Fully Cleared", "Pending Clearance", "N/A" = None # "Fully Cleared", "Pending Clearance", "N/A"
164
 
165
  class TagScan(SQLModel):
166
  tag_id: str
 
169
  class DeviceCreate(SQLModel):
170
  device_name: str
171
  location: str
172
+ department: Department # ADD THIS
173
 
174
  class DeviceRead(SQLModel):
175
  id: int
176
  device_name: str
177
  api_key: str
178
  location: str
179
+ department: Department # ADD THIS
180
  is_active: bool
src/routers/admin.py CHANGED
@@ -1,63 +1,94 @@
1
  from fastapi import APIRouter, Depends, HTTPException, status, Query
2
- from sqlmodel import Session
3
  from typing import List, Optional, Dict
4
 
5
  from src.database import get_session
6
- from src.auth import get_current_active_user
7
  from src.models import (
8
  User, UserCreate, UserRead, UserUpdate, Role,
9
- Student, StudentCreate, StudentRead, StudentUpdate, StudentReadWithClearance,
10
- TagLink, RFIDTagRead, TagScan,
11
- Device, DeviceCreate, DeviceRead
12
  )
13
  from src.crud import users as user_crud
14
  from src.crud import students as student_crud
15
  from src.crud import tag_linking as tag_crud
16
  from src.crud import devices as device_crud
17
 
18
- # In-memory storage for last scanned tags by an admin.
19
- # Key: admin user ID, Value: last scanned tag ID.
20
- # This is simple and effective for a non-distributed system.
 
 
 
 
21
  admin_scanned_tags: Dict[int, str] = {}
22
 
23
- # Define the router.
24
- # It's accessible to both Admins and Staff, providing a unified management panel.
25
  router = APIRouter(
26
  prefix="/admin",
27
  tags=["Administration"],
28
  dependencies=[Depends(get_current_active_user(required_roles=[Role.ADMIN, Role.STAFF]))],
29
  )
30
 
31
- # --- Admin Tag Scanning for Web UI ---
32
 
33
- @router.post("/tags/scan", status_code=status.HTTP_204_NO_CONTENT)
34
- def report_scanned_tag(
35
- scan_data: TagScan,
 
 
 
 
36
  current_user: User = Depends(get_current_active_user(required_roles=[Role.ADMIN, Role.STAFF]))
37
  ):
38
  """
39
- (Admin & Staff) Endpoint for the admin's desk RFID reader to report a scanned tag.
40
- The backend stores this tag ID temporarily against the admin's user ID.
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
41
  """
42
- admin_scanned_tags[current_user.id] = scan_data.tag_id
 
 
 
 
 
 
43
  return
44
 
45
- @router.get("/tags/scan", response_model=TagScan)
46
- def retrieve_scanned_tag(
47
  current_user: User = Depends(get_current_active_user(required_roles=[Role.ADMIN, Role.STAFF]))
48
  ):
49
  """
50
- (Admin & Staff) Endpoint for the admin's web portal to retrieve the last scanned tag.
51
- The tag is removed after being retrieved to prevent re-use.
52
  """
53
  tag_id = admin_scanned_tags.pop(current_user.id, None)
54
  if not tag_id:
55
- raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail="No tag has been scanned recently.")
56
  return TagScan(tag_id=tag_id)
57
 
58
 
59
- # --- Student Management (Admin + Staff) ---
60
-
61
  @router.post("/students/", response_model=StudentReadWithClearance, status_code=status.HTTP_201_CREATED)
62
  def create_student(student: StudentCreate, db: Session = Depends(get_session)):
63
  """(Admin & Staff) Creates a new student and initializes their clearance status."""
@@ -128,7 +159,10 @@ def unlink_rfid_tag(tag_id: str, db: Session = Depends(get_session)):
128
  """(Admin & Staff) Unlinks an RFID tag, making it available again."""
129
  deleted_tag = tag_crud.unlink_tag(db, tag_id)
130
  if not deleted_tag:
131
- raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail="RFID Tag not found.")
 
 
 
132
  return deleted_tag
133
 
134
 
@@ -142,12 +176,18 @@ def require_super_admin(current_user: User = Depends(get_current_active_user()))
142
  detail="This action requires Super Admin privileges."
143
  )
144
 
145
- @router.post("/users/", response_model=UserRead, status_code=status.HTTP_201_CREATED, dependencies=[Depends(require_super_admin)])
146
- def create_user(user: UserCreate, db: Session = Depends(get_session)):
147
- """(Super Admin Only) Creates a new user (Staff or Admin)."""
148
- db_user = user_crud.get_user_by_username(db, username=user.username)
149
- if db_user:
150
- raise HTTPException(status_code=400, detail="Username already registered")
 
 
 
 
 
 
151
  return user_crud.create_user(db=db, user=user)
152
 
153
  @router.get("/users/", response_model=List[UserRead], dependencies=[Depends(require_super_admin)])
 
1
  from fastapi import APIRouter, Depends, HTTPException, status, Query
2
+ from sqlmodel import Session, SQLModel
3
  from typing import List, Optional, Dict
4
 
5
  from src.database import get_session
6
+ from src.auth import get_current_active_user, get_api_key
7
  from src.models import (
8
  User, UserCreate, UserRead, UserUpdate, Role,
9
+ Student, StudentCreate, StudentReadWithClearance, StudentUpdate, StudentRead,
10
+ TagLink, RFIDTagRead, Device, DeviceCreate, DeviceRead, TagScan
 
11
  )
12
  from src.crud import users as user_crud
13
  from src.crud import students as student_crud
14
  from src.crud import tag_linking as tag_crud
15
  from src.crud import devices as device_crud
16
 
17
+ # --- New State Management for Secure Admin Scanning ---
18
+
19
+ # Maps a device's API key to the admin user ID who activated it.
20
+ # This "activates" a scanner for a specific admin.
21
+ activated_scanners: Dict[str, int] = {}
22
+
23
+ # Stores the last tag scanned by a device, keyed by the admin ID who was waiting.
24
  admin_scanned_tags: Dict[int, str] = {}
25
 
26
+
27
+ # Define the main administrative router
28
  router = APIRouter(
29
  prefix="/admin",
30
  tags=["Administration"],
31
  dependencies=[Depends(get_current_active_user(required_roles=[Role.ADMIN, Role.STAFF]))],
32
  )
33
 
34
+ # --- New Secure Scanning Workflow ---
35
 
36
+ class ActivationRequest(SQLModel):
37
+ device_id: int
38
+
39
+ @router.post("/scanners/activate", status_code=status.HTTP_204_NO_CONTENT)
40
+ def activate_admin_scanner(
41
+ activation: ActivationRequest,
42
+ db: Session = Depends(get_session),
43
  current_user: User = Depends(get_current_active_user(required_roles=[Role.ADMIN, Role.STAFF]))
44
  ):
45
  """
46
+ STEP 1 (Browser): Admin clicks "Scan Card" in the UI.
47
+ The browser calls this endpoint to 'arm' their designated desk scanner.
48
+ """
49
+ device = device_crud.get_device_by_id(db, device_id=activation.device_id)
50
+ if not device:
51
+ raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail="Device not found.")
52
+
53
+ # Map the device's API key to the currently logged-in admin's ID.
54
+ activated_scanners[device.api_key] = current_user.id
55
+ return
56
+
57
+
58
+ @router.post("/scanners/scan", status_code=status.HTTP_204_NO_CONTENT)
59
+ def receive_scan_from_activated_device(
60
+ scan_data: TagScan,
61
+ api_key: str = Depends(get_api_key) # Device authenticates with its API Key
62
+ ):
63
+ """
64
+ STEP 2 (Device): The ESP32 device sends the scanned tag to this endpoint.
65
+ This endpoint is protected by the device's API Key.
66
  """
67
+ # Check if this device was activated by an admin.
68
+ admin_id = activated_scanners.pop(api_key, None)
69
+ if admin_id is None:
70
+ raise HTTPException(status_code=status.HTTP_403_FORBIDDEN, detail="This scanner has not been activated for a scan.")
71
+
72
+ # Store the scanned tag against the admin who was waiting for it.
73
+ admin_scanned_tags[admin_id] = scan_data.tag_id
74
  return
75
 
76
+ @router.get("/scanners/retrieve", response_model=TagScan)
77
+ def retrieve_scanned_tag_for_ui(
78
  current_user: User = Depends(get_current_active_user(required_roles=[Role.ADMIN, Role.STAFF]))
79
  ):
80
  """
81
+ STEP 3 (Browser): The browser polls this endpoint to get the tag ID
82
+ that the device reported in STEP 2.
83
  """
84
  tag_id = admin_scanned_tags.pop(current_user.id, None)
85
  if not tag_id:
86
+ raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail="No tag has been scanned by the activated device yet.")
87
  return TagScan(tag_id=tag_id)
88
 
89
 
90
+ # --- All other administrative endpoints remain the same ---
91
+ # ... (Student Management, User Management, etc.) ...
92
  @router.post("/students/", response_model=StudentReadWithClearance, status_code=status.HTTP_201_CREATED)
93
  def create_student(student: StudentCreate, db: Session = Depends(get_session)):
94
  """(Admin & Staff) Creates a new student and initializes their clearance status."""
 
159
  """(Admin & Staff) Unlinks an RFID tag, making it available again."""
160
  deleted_tag = tag_crud.unlink_tag(db, tag_id)
161
  if not deleted_tag:
162
+ raise HTTPException(
163
+ status_code=status.HTTP_404_NOT_FOUND,
164
+ detail="RFID Tag not found."
165
+ )
166
  return deleted_tag
167
 
168
 
 
176
  detail="This action requires Super Admin privileges."
177
  )
178
 
179
+ @router.post(
180
+ "/users/",
181
+ response_model=UserRead,
182
+ status_code=status.HTTP_201_CREATED,
183
+ dependencies=[Depends(get_current_active_user(required_roles=[Role.ADMIN]))]
184
+ )
185
+ def create_user_as_admin(user: UserCreate, db: Session = Depends(get_session)):
186
+ """(Super Admin Only) Creates a new user (admin or staff)."""
187
+ if user_crud.get_user_by_username(db, username=user.username):
188
+ raise HTTPException(status_code=400, detail="Username already registered.")
189
+ if user_crud.get_user_by_email(db, email=user.email):
190
+ raise HTTPException(status_code=400, detail="Email already registered.")
191
  return user_crud.create_user(db=db, user=user)
192
 
193
  @router.get("/users/", response_model=List[UserRead], dependencies=[Depends(require_super_admin)])
src/routers/rfid.py CHANGED
@@ -4,7 +4,7 @@ from sqlmodel import Session
4
 
5
  from src.database import get_session
6
  from src.auth import get_api_key
7
- from src.models import RFIDStatusResponse, RFIDScanRequest
8
  from src.crud import students as student_crud
9
  from src.crud import users as user_crud
10
 
@@ -28,9 +28,10 @@ def check_rfid_status(
28
  # 1. Check if the tag belongs to a student
29
  student = student_crud.get_student_by_tag_id(db, tag_id=tag_id)
30
  if student:
31
- # Check overall clearance status
32
  is_cleared = all(
33
- status.status == "approved" for status in student.clearance_statuses
 
34
  )
35
  clearance_status_str = "Fully Cleared" if is_cleared else "Pending Clearance"
36
 
@@ -47,7 +48,7 @@ def check_rfid_status(
47
  return RFIDStatusResponse(
48
  status="found",
49
  full_name=user.full_name,
50
- entity_type=user.role.value, # e.g. "Admin" or "Staff"
51
  clearance_status="N/A",
52
  )
53
 
 
4
 
5
  from src.database import get_session
6
  from src.auth import get_api_key
7
+ from src.models import RFIDStatusResponse, RFIDScanRequest, ClearanceStatusEnum
8
  from src.crud import students as student_crud
9
  from src.crud import users as user_crud
10
 
 
28
  # 1. Check if the tag belongs to a student
29
  student = student_crud.get_student_by_tag_id(db, tag_id=tag_id)
30
  if student:
31
+ # Check overall clearance status using proper enum comparison
32
  is_cleared = all(
33
+ clearance.status == ClearanceStatusEnum.APPROVED
34
+ for clearance in student.clearance_statuses
35
  )
36
  clearance_status_str = "Fully Cleared" if is_cleared else "Pending Clearance"
37
 
 
48
  return RFIDStatusResponse(
49
  status="found",
50
  full_name=user.full_name,
51
+ entity_type=user.role.value.title(), # "Admin" or "Staff"
52
  clearance_status="N/A",
53
  )
54
 
src/routers/students.py CHANGED
@@ -1,29 +1,36 @@
1
  from fastapi import APIRouter, Depends, HTTPException, status
2
- from sqlmodel import Session
3
 
4
- from src.database import get_session
5
  from src.auth import get_current_active_user
6
- from src.models import Student, StudentReadWithClearance
 
 
 
 
 
7
 
8
  router = APIRouter(
9
  prefix="/students",
10
  tags=["Students"],
 
 
11
  )
12
 
13
- @router.get("/me", response_model=StudentReadWithClearance)
14
- def read_student_me(
15
- # This dependency ensures the user is an authenticated student
16
- # and injects their database object into the 'current_student' parameter.
17
- current_student: Student = Depends(get_current_active_user)
18
  ):
19
  """
20
- Endpoint for a logged-in student to retrieve their own profile
21
- and clearance information. The user is identified via their JWT token.
22
  """
23
- # Because the dependency returns the full student object, we can just return it.
24
- # No need for another database call.
25
- if not current_student:
26
- # This should not happen if the dependency is set up correctly
27
- raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail="Student not found")
28
- return current_student
29
 
 
 
 
 
 
 
 
 
1
  from fastapi import APIRouter, Depends, HTTPException, status
2
+ from sqlmodel import Session, SQLModel
3
 
 
4
  from src.auth import get_current_active_user
5
+ from src.database import get_session
6
+ from src.models import StudentReadWithClearance
7
+
8
+ # This is the new request body model for the POST request.
9
+ class StudentLookupRequest(SQLModel):
10
+ matric_no: str
11
 
12
  router = APIRouter(
13
  prefix="/students",
14
  tags=["Students"],
15
+ # This endpoint still requires a user to be logged in.
16
+ dependencies=[Depends(get_current_active_user)]
17
  )
18
 
19
+ @router.post("/lookup", response_model=StudentReadWithClearance)
20
+ def lookup_student_by_matric_no(
21
+ request: StudentLookupRequest,
22
+ db: Session = Depends(get_session)
 
23
  ):
24
  """
25
+ Endpoint for a logged-in user to retrieve a student's profile
26
+ and clearance information by providing their matriculation number.
27
  """
28
+ student = student_crud.get_student_by_matric_no(db=db, matric_no=request.matric_no)
 
 
 
 
 
29
 
30
+ if not student:
31
+ raise HTTPException(
32
+ status_code=status.HTTP_404_NOT_FOUND,
33
+ detail="Student with the provided matriculation number not found."
34
+ )
35
+
36
+ return student
src/routers/token.py CHANGED
@@ -34,8 +34,9 @@ async def login_for_access_token(
34
 
35
  # Create the JWT token
36
  access_token_expires = timedelta(minutes=settings.ACCESS_TOKEN_EXPIRE_MINUTES)
 
37
  access_token = create_access_token(
38
- data={"sub": user.email}, expires_delta=access_token_expires
39
  )
40
 
41
  return {"access_token": access_token, "token_type": "bearer"}
 
34
 
35
  # Create the JWT token
36
  access_token_expires = timedelta(minutes=settings.ACCESS_TOKEN_EXPIRE_MINUTES)
37
+ # FIX: Use username instead of email
38
  access_token = create_access_token(
39
+ data={"sub": user.username}, expires_delta=access_token_expires # CHANGED from user.email
40
  )
41
 
42
  return {"access_token": access_token, "token_type": "bearer"}