| | """Authentication module""" |
| | import bcrypt |
| | from typing import Optional |
| | from fastapi import HTTPException, Security |
| | from fastapi.security import HTTPBearer, HTTPAuthorizationCredentials |
| | from .config import config |
| |
|
| | security = HTTPBearer() |
| |
|
| | class AuthManager: |
| | """Authentication manager""" |
| |
|
| | @staticmethod |
| | def verify_api_key(api_key: str) -> bool: |
| | """Verify API key""" |
| | return api_key == config.api_key |
| |
|
| | @staticmethod |
| | def verify_admin(username: str, password: str) -> bool: |
| | """Verify admin credentials""" |
| | |
| | return username == config.admin_username and password == config.admin_password |
| |
|
| | @staticmethod |
| | def hash_password(password: str) -> str: |
| | """Hash password""" |
| | return bcrypt.hashpw(password.encode(), bcrypt.gensalt()).decode() |
| |
|
| | @staticmethod |
| | def verify_password(password: str, hashed: str) -> bool: |
| | """Verify password""" |
| | return bcrypt.checkpw(password.encode(), hashed.encode()) |
| |
|
| | async def verify_api_key_header(credentials: HTTPAuthorizationCredentials = Security(security)) -> str: |
| | """Verify API key from Authorization header""" |
| | api_key = credentials.credentials |
| | if not AuthManager.verify_api_key(api_key): |
| | raise HTTPException(status_code=401, detail="Invalid API key") |
| | return api_key |
| |
|