Spaces:
Runtime error
Runtime error
| import jwt | |
| from hashlib import sha256 | |
| from fastapi import HTTPException, status, Request, Response | |
| from . import log_module, settings | |
| from datetime import datetime, timedelta | |
| from Crypto.Signature import pss | |
| from Crypto.Hash import SHA256 | |
| from Crypto.PublicKey import RSA | |
| import base64 | |
# Replace every value stored in settings.USERS, except the "master" entry, with
# the SHA-256 hex digest of <key> + <stored value> + <master value>.
# NOTE: at this point ``sha256`` is still the hashlib constructor imported at
# the top of the module; the ``def sha256`` further down rebinds the name only
# after this loop has already run.
for key in settings.USERS:
    if key != "master":
        password = key + settings.USERS[key] + settings.USERS["master"]
        settings.USERS[key] = sha256(password.encode('UTF-8')).hexdigest()
def create_jwt_token(data, maxlife=settings.JWT_EXPIRATION_TIME_MINUTES_API):
    """Return a JWT containing ``data`` plus an ``exp`` (expiry) claim.

    The token is encoded with ``settings.JWT_SECRET`` and
    ``settings.JWT_ALGORITHM``.

    Args:
        data: Claims to embed. It is merged with ``{"exp": ...}`` using ``|``,
            so the caller's object is not modified and an existing ``exp`` key
            is overridden (assumes ``data`` is a dict -- TODO confirm callers).
        maxlife: Token lifetime in minutes. The default is read from
            ``settings.JWT_EXPIRATION_TIME_MINUTES_API`` once, when this
            function is defined.

    Returns:
        Whatever ``jwt.encode`` returns for the merged claims.
    """
    # Function-scope import: the module header only imports datetime/timedelta.
    from datetime import timezone

    # Timezone-aware UTC "now"; datetime.utcnow() is deprecated and returns a
    # naive value that is easy to misinterpret as local time.
    expire = datetime.now(timezone.utc) + timedelta(minutes=maxlife)
    to_encode = data | {"exp": expire}
    return jwt.encode(to_encode, settings.JWT_SECRET, algorithm=settings.JWT_ALGORITHM)
def validate_jwt_token(token: str, usage: str = "api"):
    """Decode ``token`` and return its payload, or reject the request.

    Args:
        token: Encoded JWT to decode with ``settings.JWT_SECRET`` and
            ``settings.JWT_ALGORITHM``.
        usage: ``"view"`` rejects through ``raise_307``; any other value
            rejects through ``raise_401``.

    Returns:
        The payload returned by ``jwt.decode`` when decoding succeeds.
    """
    try:
        return jwt.decode(token, settings.JWT_SECRET, algorithms=[settings.JWT_ALGORITHM])
    except Exception as err:
        # Any decoding failure is logged and turned into an HTTP rejection.
        log_module.logger().error(f"{err!r} - Invalid token: {token}")
        reject = raise_307 if usage == "view" else raise_401
        return reject("Failed validating jwt token")
def token_from_cookie(request: Request):
    """Validate the JWT stored in the request's ``token`` cookie.

    A missing or empty cookie is rejected through ``raise_307``; otherwise the
    token is validated with ``usage="view"`` and the resulting payload returned.
    """
    token = request.cookies.get('token', "")
    if not token:
        return raise_307("Token not found")
    return validate_jwt_token(token, "view")
def token_from_headers(request: Request):
    """Validate the bearer JWT sent in the request headers.

    The value is expected in the form ``Bearer <token>``. Anything else is
    rejected through ``raise_401``.

    Args:
        request: Incoming request whose ``headers`` mapping is read.

    Returns:
        The payload returned by ``validate_jwt_token`` for the bearer token.
    """
    # Read the standard header first. "Autorization" is the misspelled name
    # this function used to read exclusively; it is still accepted so clients
    # that send the misspelled header keep working.
    raw = request.headers.get("Authorization") or request.headers.get("Autorization", "")

    # partition never raises: a bare "Bearer" (no space) yields an empty
    # separator and is rejected instead of failing with an IndexError.
    scheme, separator, credentials = raw.partition(" ")
    if scheme == "Bearer" and separator:
        return validate_jwt_token(credentials)
    return raise_401("Bearer malformed")
# Roles allowed to perform each activity; built once instead of on every call.
_ALLOWED_ROLES = {
    "chat": ("user", "admin"),
}


def can_use(role, activity):
    """Return True when ``role`` is allowed to perform ``activity``.

    Args:
        role: Role name to check.
        activity: Activity name to look up in the permission table.

    Returns:
        True if ``role`` is listed for ``activity``. Activities that are not
        in the table are denied (False) rather than raising ``KeyError``.
    """
    return role in _ALLOWED_ROLES.get(activity, ())
def raise_401(detail: str):
    """Raise an HTTP 401 error carrying ``detail``.

    The response headers overwrite the ``token`` cookie with an already
    expired value and include ``Location: /login``. This function never
    returns normally.
    """
    raise HTTPException(
        status_code=status.HTTP_401_UNAUTHORIZED,
        detail=detail,
        headers={
            "set-cookie": "token=deleted; path=/; expires=Thu, 01 Jan 1970 00:00:00 GMT",
            "Location": "/login",
        },
    )
def raise_307(detail: str):
    """Raise an HTTP 307 error carrying ``detail``.

    The response headers overwrite the ``token`` cookie with an already
    expired value. This function never returns normally.
    """
    # NOTE(review): no "Location" header is sent with this 307 (a "/login"
    # entry exists only as a disabled line) -- confirm the redirect target is
    # supplied elsewhere, e.g. by an exception handler.
    expired_cookie = "token=deleted; path=/; expires=Thu, 01 Jan 1970 00:00:00 GMT"
    raise HTTPException(
        status_code=status.HTTP_307_TEMPORARY_REDIRECT,
        detail=detail,
        headers={"set-cookie": expired_cookie},
    )
def validate_signature(public_key: str, signature: str, data: str) -> bool:
    """Verify a PSS signature over the SHA-256 digest of ``data``.

    Args:
        public_key: RSA public key in a format accepted by ``RSA.import_key``.
        signature: Base64-encoded signature bytes.
        data: Signed text; it is encoded with ``str.encode()`` before hashing.

    Returns:
        True when the signature verifies.

    Raises:
        Whatever ``raise_401("Signature failed")`` raises when the signature
        is not valid base64 or does not verify.
    """
    rsa_key = RSA.import_key(public_key)
    digest = SHA256.new(data.encode())
    try:
        # Decoding happens inside the try: malformed base64 raises
        # binascii.Error (a ValueError subclass) and must be reported as a
        # failed signature, not escape as an unhandled error.
        raw_signature = base64.b64decode(signature)
        pss.new(rsa_key).verify(digest, raw_signature)
    except ValueError:
        raise_401("Signature failed")
    else:
        return True
def sha256(data: str | bytes) -> str:
    """Return the SHA-256 hex digest of ``data``.

    ``bytes`` input is hashed as-is; anything else is first converted with
    its ``encode()`` method.
    """
    if not isinstance(data, bytes):
        data = data.encode()
    return SHA256.new(data).hexdigest()
def set_cookie(response: Response, key: str, value: str, expire_time: dict | None = None):
    """Set cookie ``key`` on ``response`` with an expiry relative to now.

    Args:
        response: Response object whose ``set_cookie`` method is called.
        key: Cookie name.
        value: Cookie value.
        expire_time: Keyword arguments for ``timedelta`` (for example
            ``{"hours": 12}``). ``None`` means ``{"days": 7}``.
    """
    # Function-scope imports: neither name is imported in the module header.
    from datetime import timezone
    from email.utils import format_datetime

    lifetime = timedelta(**({"days": 7} if expire_time is None else expire_time))

    # The offset is added in settings.TZ and the result is then expressed in
    # UTC: a cookie "Expires" value must be an HTTP date in GMT, which a local
    # "%Z" abbreviation is not.
    expires = (datetime.now(tz=settings.TZ) + lifetime).astimezone(timezone.utc)

    response.set_cookie(
        key=key,
        value=value,
        # NOTE(review): httponly, samesite and domain are not passed here, so
        # the framework defaults apply -- confirm this is intended.
        # format_datetime(..., usegmt=True) is locale-independent, unlike strftime's %a/%b.
        expires=format_datetime(expires, usegmt=True),
    )