File size: 3,268 Bytes
b3f1645 6f6eeb8 039afa2 e2e403d ea0a765 9a8d759 ea0a765 6f6eeb8 e2e403d 6f6eeb8 e2e403d 6f6eeb8 72092ce e2e403d 6f6eeb8 d70033b 6f6eeb8 e2e403d 4e433fa 6f6eeb8 a1888d0 d70033b 002d869 4e433fa d70033b e2e403d 002d869 d70033b e2e403d 002d869 d70033b 002d869 d70033b 002d869 d70033b 002d869 a24791e d70033b 002d869 d70033b ea0a765 002d869 656f5ab 72092ce 1f6fba1 039afa2 72092ce 039afa2 | 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 | import jwt
from hashlib import sha256
from fastapi import HTTPException, status, Request, Response
from . import log_module, settings
from datetime import datetime, timedelta
from Crypto.Signature import pss
from Crypto.Hash import SHA256
from Crypto.PublicKey import RSA
import base64
for key in settings.USERS:
if key == "master": continue
password = key+settings.USERS[key]+settings.USERS["master"]
settings.USERS[key] = sha256(password.encode('UTF-8')).hexdigest()
def create_jwt_token(data, maxlife=settings.JWT_EXPIRATION_TIME_MINUTES_API):
expire = datetime.utcnow() + timedelta(minutes=maxlife)
to_encode = data | {"exp": expire}
encoded_jwt = jwt.encode(to_encode, settings.JWT_SECRET, algorithm=settings.JWT_ALGORITHM)
return encoded_jwt
def validate_jwt_token(token: str, usage: str = "api"):
try:
payload = jwt.decode(token, settings.JWT_SECRET, algorithms=[settings.JWT_ALGORITHM])
return payload
except Exception as e:
log_module.logger().error(repr(e) + " - Invalid token: " + str(token))
if usage == "view":
return raise_307("Failed validating jwt token")
return raise_401("Failed validating jwt token")
def token_from_cookie(request:Request):
if token := request.cookies.get('token', ""):
return validate_jwt_token(token, "view")
return raise_307("Token not found")
def token_from_headers(request:Request):
if (bearer := request.headers.get("Autorization", " ").split(" ",1)) and bearer[0] == "Bearer":
return validate_jwt_token(bearer[1])
return raise_401("Bearer malformed")
def can_use(role, activity):
can = {
"chat": ["user", "admin"]
}
return role in can[activity]
def raise_401(detail:str):
headers = {"set-cookie": "token=deleted; path=/; expires=Thu, 01 Jan 1970 00:00:00 GMT",
"Location": "/login"}
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail=detail,
headers=headers
)
def raise_307(detail:str):
headers = {
"set-cookie": "token=deleted; path=/; expires=Thu, 01 Jan 1970 00:00:00 GMT",
#"Location": "/login"
}
raise HTTPException(
status_code=status.HTTP_307_TEMPORARY_REDIRECT,
detail=detail,
headers=headers
)
def validate_signature(public_key: str, signature: str, data:str) -> bool:
public_key = RSA.import_key(public_key)
signature = base64.b64decode(signature)
data_ = SHA256.new(data.encode())
try:
pss.new(public_key).verify(data_, signature)
return True
except ValueError:
raise_401("Signature failed")
def sha256(data:str|bytes) -> str:
data_ = data if isinstance(data, bytes) else data.encode()
return SHA256.new(data_).hexdigest()
def set_cookie(response: Response, key: str, value: str, expire_time: dict = {"days":7}):
expires = datetime.now(tz=settings.TZ) + timedelta(**expire_time)
response.set_cookie(
key=key,
value=value,
# httponly=True,
# samesite='none',
expires=expires.strftime("%a, %d %b %Y %H:%M:%S %Z"),
#domain='.<YOUR DOMAIN>'
) |