|
|
"""Authentication module""" |
|
|
import bcrypt |
|
|
from typing import Optional |
|
|
from fastapi import HTTPException, Security |
|
|
from fastapi.security import HTTPBearer, HTTPAuthorizationCredentials |
|
|
from .config import config |
|
|
|
|
|
security = HTTPBearer() |
|
|
|
|
|
class AuthManager: |
|
|
"""Authentication manager""" |
|
|
|
|
|
@staticmethod |
|
|
def verify_api_key(api_key: str) -> bool: |
|
|
"""Verify API key""" |
|
|
return api_key == config.api_key |
|
|
|
|
|
@staticmethod |
|
|
def verify_admin(username: str, password: str) -> bool: |
|
|
"""Verify admin credentials""" |
|
|
print(config.admin_username+","+config.admin_password) |
|
|
|
|
|
return username == config.admin_username and password == config.admin_password |
|
|
|
|
|
@staticmethod |
|
|
def hash_password(password: str) -> str: |
|
|
"""Hash password""" |
|
|
return bcrypt.hashpw(password.encode(), bcrypt.gensalt()).decode() |
|
|
|
|
|
@staticmethod |
|
|
def verify_password(password: str, hashed: str) -> bool: |
|
|
"""Verify password""" |
|
|
return bcrypt.checkpw(password.encode(), hashed.encode()) |
|
|
|
|
|
async def verify_api_key_header(credentials: HTTPAuthorizationCredentials = Security(security)) -> str: |
|
|
"""Verify API key from Authorization header""" |
|
|
api_key = credentials.credentials |
|
|
if not AuthManager.verify_api_key(api_key): |
|
|
raise HTTPException(status_code=401, detail="Invalid API key") |
|
|
return api_key |
|
|
|