Spaces:
Runtime error
Runtime error
| from fastapi import APIRouter, Depends, HTTPException, status, Security | |
| from fastapi.security import APIKeyHeader | |
| from sqlmodel import Session, SQLModel | |
| from typing import Dict | |
| from threading import Lock # Added for thread-safety on global state | |
| from src.database import get_session | |
| from src.auth import get_current_active_user, get_api_key | |
| from src.models import ( | |
| RFIDStatusResponse, RFIDScanRequest, ClearanceStatusEnum, | |
| TagScan, Device, Role, User | |
| ) | |
| from src.crud import students as student_crud | |
| from src.crud import users as user_crud | |
| from src.crud import devices as device_crud | |
| # Define the router and the API key security scheme | |
| router = APIRouter(prefix="/rfid", tags=["RFID"]) | |
| api_key_header = APIKeyHeader(name="x-api-key", auto_error=False) | |
| # --- Moved from admin.py: State for secure admin scanning --- | |
| # Maps a device's API key to the admin user ID who activated it. | |
| activated_scanners: Dict[str, int] = {} | |
| # Stores the last tag scanned by a device, keyed by the admin ID who was waiting. | |
| admin_scanned_tags: Dict[int, tuple[str, str]] = {} | |
| # Locks for thread-safe access to global dicts (since FastAPI is async/multi-threaded) | |
| activated_scanners_lock = Lock() | |
| admin_scanned_tags_lock = Lock() | |
| # --- Moved from admin.py: Secure Scanning Workflow --- | |
| class ActivationRequest(SQLModel): | |
| api_key: str | |
| def activate_admin_scanner( | |
| activation: ActivationRequest, | |
| db: Session = Depends(get_session), | |
| current_user: User = Depends(get_current_active_user(required_roles=[Role.ADMIN, Role.STAFF])) | |
| ): | |
| """ | |
| STEP 1 (Browser): Admin clicks "Scan Card" in the UI. | |
| The browser calls this endpoint to 'arm' their designated desk scanner. | |
| """ | |
| device = device_crud.get_device_by_api_key(db, api_key=activation.api_key) | |
| if not device: | |
| raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail="Device not found.") | |
| # Map the device's API key to the currently logged-in admin's ID (thread-safe). | |
| with activated_scanners_lock: | |
| activated_scanners[device.api_key] = current_user.id | |
| return | |
| def receive_scan_from_activated_device( | |
| scan_data: TagScan, | |
| db: Session = Depends(get_session) # Added back for device validation | |
| ): | |
| """ | |
| STEP 2 (Device): The ESP32 device sends the scanned tag to this endpoint. | |
| No authentication required for free use, but validates device existence. | |
| """ | |
| # Validate that the device exists (minimal security check without full auth). | |
| device = device_crud.get_device_by_api_key(db, api_key=scan_data.api_key) | |
| if not device: | |
| raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail="Device not found.") | |
| # Check if this device was activated by an admin (thread-safe). | |
| with activated_scanners_lock: | |
| admin_id = activated_scanners.pop(scan_data.api_key, None) | |
| if admin_id is None: | |
| raise HTTPException(status_code=status.HTTP_403_FORBIDDEN, detail="This scanner has not been activated for a scan.") | |
| # Store the scanned tag against the admin who was waiting for it (thread-safe). | |
| with admin_scanned_tags_lock: | |
| admin_scanned_tags[admin_id] = (scan_data.tag_id, scan_data.api_key) | |
| return | |
| def retrieve_scanned_tag_for_ui( | |
| current_user: User = Depends(get_current_active_user(required_roles=[Role.ADMIN, Role.STAFF])) | |
| ): | |
| """ | |
| STEP 3 (Browser): The browser polls this endpoint to get the tag ID | |
| that the device reported in STEP 2. | |
| """ | |
| # Retrieve the tag data (thread-safe). | |
| with admin_scanned_tags_lock: | |
| tag_data = admin_scanned_tags.pop(current_user.id, None) | |
| if not tag_data: | |
| raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail="No tag has been scanned by the activated device yet.") | |
| tag_id, api_key = tag_data | |
| return TagScan(tag_id=tag_id, api_key=api_key) | |
| # --- Existing /check-status endpoint remains --- | |
| def check_rfid_status( | |
| scan_data: RFIDScanRequest, | |
| db: Session = Depends(get_session), | |
| # This dependency ensures the request comes from a valid, registered device | |
| api_key: str = Security(get_api_key), | |
| ): | |
| """ | |
| Public endpoint for hardware devices to check the status of a scanned RFID tag. | |
| The device must provide a valid API key in the 'x-api-key' header. | |
| """ | |
| tag_id = scan_data.tag_id | |
| # 1. Check if the tag belongs to a student | |
| student = student_crud.get_student_by_tag_id(db, tag_id=tag_id) | |
| if student: | |
| # Check overall clearance status using proper enum comparison | |
| is_cleared = all( | |
| clearance.status == ClearanceStatusEnum.APPROVED | |
| for clearance in student.clearance_statuses | |
| ) | |
| clearance_status_str = "Fully Cleared" if is_cleared else "Pending Clearance" | |
| return RFIDStatusResponse( | |
| status="found", | |
| full_name=student.full_name, | |
| entity_type="Student", | |
| clearance_status=clearance_status_str, | |
| ) | |
| # 2. If not a student, check if it belongs to a user (staff/admin) | |
| user = user_crud.get_user_by_tag_id(db, tag_id=tag_id) | |
| if user: | |
| return RFIDStatusResponse( | |
| status="found", | |
| full_name=user.full_name, | |
| entity_type=user.role.value.title(), # "Admin" or "Staff" | |
| clearance_status="N/A", | |
| ) | |
| # 3. If the tag is not linked to anyone | |
| return RFIDStatusResponse( | |
| status="unregistered", | |
| full_name=None, | |
| entity_type=None, | |
| clearance_status=None, | |
| ) |