| import jwt |
| from hashlib import sha256 |
| from fastapi import HTTPException, status, Request, Response |
| from . import log_module, settings |
| from datetime import datetime, timedelta |
| from Crypto.Signature import pss |
| from Crypto.Hash import SHA256 |
| from Crypto.PublicKey import RSA |
|
|
| import base64 |
|
|
|
|
| for key in settings.USERS: |
| if key == "master": continue |
| password = key+settings.USERS[key]+settings.USERS["master"] |
| settings.USERS[key] = sha256(password.encode('UTF-8')).hexdigest() |
|
|
|
|
| def create_jwt_token(data, maxlife=settings.JWT_EXPIRATION_TIME_MINUTES_API): |
| expire = datetime.utcnow() + timedelta(minutes=maxlife) |
| to_encode = data | {"exp": expire} |
| encoded_jwt = jwt.encode(to_encode, settings.JWT_SECRET, algorithm=settings.JWT_ALGORITHM) |
| return encoded_jwt |
|
|
| def validate_jwt_token(token: str, usage: str = "api"): |
| try: |
| payload = jwt.decode(token, settings.JWT_SECRET, algorithms=[settings.JWT_ALGORITHM]) |
| return payload |
| except Exception as e: |
| log_module.logger().error(repr(e) + " - Invalid token: " + str(token)) |
| if usage == "view": |
| return raise_307("Failed validating jwt token") |
| return raise_401("Failed validating jwt token") |
|
|
| def token_from_cookie(request:Request): |
| if token := request.cookies.get('token', ""): |
| return validate_jwt_token(token, "view") |
| return raise_307("Token not found") |
|
|
| def token_from_headers(request:Request): |
| if (bearer := request.headers.get("Autorization", " ").split(" ",1)) and bearer[0] == "Bearer": |
| return validate_jwt_token(bearer[1]) |
| return raise_401("Bearer malformed") |
| |
|
|
| def can_use(role, activity): |
| can = { |
| "chat": ["user", "admin"] |
| } |
| return role in can[activity] |
|
|
| def raise_401(detail:str): |
| headers = {"set-cookie": "token=deleted; path=/; expires=Thu, 01 Jan 1970 00:00:00 GMT", |
| "Location": "/login"} |
| raise HTTPException( |
| status_code=status.HTTP_401_UNAUTHORIZED, |
| detail=detail, |
| headers=headers |
| ) |
| |
| def raise_307(detail:str): |
| headers = { |
| "set-cookie": "token=deleted; path=/; expires=Thu, 01 Jan 1970 00:00:00 GMT", |
| |
| } |
| raise HTTPException( |
| status_code=status.HTTP_307_TEMPORARY_REDIRECT, |
| detail=detail, |
| headers=headers |
| ) |
|
|
| def validate_signature(public_key: str, signature: str, data:str) -> bool: |
| public_key = RSA.import_key(public_key) |
| signature = base64.b64decode(signature) |
| data_ = SHA256.new(data.encode()) |
| try: |
| pss.new(public_key).verify(data_, signature) |
| return True |
| except ValueError: |
| raise_401("Signature failed") |
|
|
| def sha256(data:str|bytes) -> str: |
| data_ = data if isinstance(data, bytes) else data.encode() |
| return SHA256.new(data_).hexdigest() |
|
|
| def set_cookie(response: Response, key: str, value: str, expire_time: dict = {"days":7}): |
| expires = datetime.now(tz=settings.TZ) + timedelta(**expire_time) |
| response.set_cookie( |
| key=key, |
| value=value, |
| |
| |
| expires=expires.strftime("%a, %d %b %Y %H:%M:%S %Z"), |
| |
|
|
| ) |