File size: 3,268 Bytes
9c5c050
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
import base64
import hashlib
from datetime import datetime, timedelta, timezone
from email.utils import format_datetime
from hashlib import sha256

import jwt
from Crypto.Hash import SHA256
from Crypto.PublicKey import RSA
from Crypto.Signature import pss
from fastapi import HTTPException, status, Request, Response

from . import log_module, settings


def _hash_user_passwords():
    """Replace each entry of ``settings.USERS`` with its salted SHA-256 digest.

    Runs once, at import time. Every value except the one stored under
    ``"master"`` is overwritten in place with the hex digest of
    ``<user name> + <current value> + <master value>`` (UTF-8 encoded).
    The ``"master"`` entry itself is left untouched because it acts as the
    shared salt.

    The work is done inside a function so that no plaintext credential is
    left behind in a module-level variable once the import has finished.

    Raises:
        KeyError: if ``settings.USERS`` has entries but no ``"master"`` key.
    """
    users = settings.USERS
    # Only values of existing keys are reassigned, so iterating the mapping
    # while updating it is safe (its size never changes).
    for name in users:
        if name == "master":
            continue
        salted = name + users[name] + users["master"]
        # ``hashlib.sha256`` is spelled out on purpose: the bare module-level
        # name ``sha256`` is rebound further down this file to a helper that
        # returns a hex string rather than a hash object.
        users[name] = hashlib.sha256(salted.encode('UTF-8')).hexdigest()


_hash_user_passwords()


def create_jwt_token(data, maxlife=settings.JWT_EXPIRATION_TIME_MINUTES_API):
    """Build a signed JWT from ``data`` with an expiry claim added.

    Args:
        data: Mapping of claims to embed. It is not modified; a merged copy
            is encoded instead. An ``"exp"`` key in ``data`` is overridden.
        maxlife: Token lifetime in minutes. The default is read from
            ``settings.JWT_EXPIRATION_TIME_MINUTES_API`` once, when this
            module is imported.

    Returns:
        The encoded token exactly as returned by ``jwt.encode``.
    """
    # Timezone-aware "now" in UTC; ``datetime.utcnow()`` is deprecated and
    # returns a naive value that is easy to misinterpret as local time.
    expires_at = datetime.now(timezone.utc) + timedelta(minutes=maxlife)
    claims = data | {"exp": expires_at}
    return jwt.encode(claims, settings.JWT_SECRET, algorithm=settings.JWT_ALGORITHM)

def validate_jwt_token(token: str, usage: str = "api"):
    """Decode ``token`` and return its payload, or abort the request.

    Args:
        token: The encoded JWT to check against ``settings.JWT_SECRET`` and
            ``settings.JWT_ALGORITHM``.
        usage: ``"view"`` makes a failure end in ``raise_307``; any other
            value (default ``"api"``) makes it end in ``raise_401``.

    Returns:
        The decoded payload when ``jwt.decode`` succeeds.

    Raises:
        HTTPException: via ``raise_307`` / ``raise_401`` when decoding fails
            for any reason.
    """
    try:
        claims = jwt.decode(token, settings.JWT_SECRET, algorithms=[settings.JWT_ALGORITHM])
    except Exception as err:
        # NOTE(review): the rejected token is written to the log verbatim.
        log_module.logger().error(f"{err!r} - Invalid token: {token}")
        reject = raise_307 if usage == "view" else raise_401
        return reject("Failed validating jwt token")
    return claims

def token_from_cookie(request: Request):
    """Return the JWT payload carried by the request's ``token`` cookie.

    A missing or empty cookie ends in ``raise_307("Token not found")``;
    otherwise the cookie value is checked by ``validate_jwt_token`` in
    ``"view"`` mode, which also ends in ``raise_307`` on failure.
    """
    cookie_value = request.cookies.get('token', "")
    if not cookie_value:
        return raise_307("Token not found")
    return validate_jwt_token(cookie_value, "view")

def token_from_headers(request: Request):
    """Return the JWT payload carried by the request's bearer credentials.

    The token is expected as ``Authorization: Bearer <token>``. The
    misspelled header name ``Autorization`` that this function used to read
    is still accepted as a fallback so existing clients keep working.

    Args:
        request: Incoming request whose headers are inspected.

    Returns:
        The decoded payload from ``validate_jwt_token``.

    Raises:
        HTTPException: 401 via ``raise_401`` when the header is absent, does
            not use the ``Bearer`` scheme, or carries no token; also 401 from
            ``validate_jwt_token`` when the token itself is rejected.
    """
    headers = request.headers
    raw_value = headers.get("Authorization") or headers.get("Autorization", "")
    # partition() never raises, unlike indexing the result of split(), so a
    # bare "Bearer" without a token is reported as malformed instead of
    # crashing with IndexError.
    scheme, _, token = raw_value.partition(" ")
    if scheme != "Bearer" or not token:
        return raise_401("Bearer malformed")
    return validate_jwt_token(token)
    

def can_use(role, activity):
    """Tell whether ``role`` is allowed to perform ``activity``.

    Args:
        role: Role name to check.
        activity: Name of the activity; must be a key of the permission
            table below.

    Returns:
        ``True`` when ``role`` is listed for ``activity``, else ``False``.

    Raises:
        KeyError: if ``activity`` is not in the permission table.
    """
    allowed_roles_by_activity = {
        "chat": ["user", "admin"],
    }
    allowed_roles = allowed_roles_by_activity[activity]
    return role in allowed_roles

def raise_401(detail: str):
    """Abort the current request with HTTP 401 Unauthorized.

    The response headers overwrite the ``token`` cookie with an
    already-expired value and carry ``Location: /login``.

    Args:
        detail: Message placed in the exception's ``detail`` field.

    Raises:
        HTTPException: always; this function never returns.
    """
    raise HTTPException(
        status_code=status.HTTP_401_UNAUTHORIZED,
        detail=detail,
        headers={
            "set-cookie": "token=deleted; path=/; expires=Thu, 01 Jan 1970 00:00:00 GMT",
            "Location": "/login",
        },
    )
    
def raise_307(detail: str):
    """Abort the current request with HTTP 307 Temporary Redirect.

    The response headers overwrite the ``token`` cookie with an
    already-expired value. No ``Location`` header is attached here;
    presumably the redirect target is supplied by an exception handler
    elsewhere -- TODO confirm.

    Args:
        detail: Message placed in the exception's ``detail`` field.

    Raises:
        HTTPException: always; this function never returns.
    """
    raise HTTPException(
        status_code=status.HTTP_307_TEMPORARY_REDIRECT,
        detail=detail,
        headers={
            "set-cookie": "token=deleted; path=/; expires=Thu, 01 Jan 1970 00:00:00 GMT",
        },
    )

def validate_signature(public_key: str, signature: str, data: str) -> bool:
    """Check an RSASSA-PSS signature over the SHA-256 digest of ``data``.

    Args:
        public_key: RSA public key in a format ``RSA.import_key`` accepts.
        signature: Base64-encoded signature to verify.
        data: Signed text; it is encoded with ``str.encode()`` before hashing.

    Returns:
        ``True`` when the signature is valid.

    Raises:
        HTTPException: 401 via ``raise_401`` when the signature is not valid
            base64 or does not match ``data``.
        ValueError: from ``RSA.import_key`` when ``public_key`` cannot be
            parsed; this is deliberately not reported as a failed signature.
    """
    rsa_key = RSA.import_key(public_key)
    digest = SHA256.new(data.encode())
    try:
        # Decoding happens inside the try block: a malformed base64 string
        # raises binascii.Error (a ValueError subclass) and must be treated
        # as a bad signature instead of surfacing as a server error.
        raw_signature = base64.b64decode(signature)
        pss.new(rsa_key).verify(digest, raw_signature)
    except ValueError:
        raise_401("Signature failed")
    else:
        return True

def sha256(data: str | bytes) -> str:
    """Return the hexadecimal SHA-256 digest of ``data``.

    ``bytes`` input is hashed as is; anything else is first converted with
    its ``encode()`` method (UTF-8 for ``str``).
    """
    if not isinstance(data, bytes):
        data = data.encode()
    return SHA256.new(data).hexdigest()

def set_cookie(response: Response, key: str, value: str, expire_time: dict | None = None):
    """Set a cookie on ``response`` that expires after ``expire_time``.

    Args:
        response: Response object the cookie is attached to.
        key: Cookie name.
        value: Cookie value.
        expire_time: Keyword arguments for ``datetime.timedelta`` describing
            the cookie lifetime. Defaults to ``{"days": 7}`` when omitted.

    The expiry moment is computed from the current time in ``settings.TZ``
    and then rendered in GMT, the only zone an HTTP ``Expires`` attribute
    may use.
    """
    # A None sentinel replaces the former shared mutable dict default.
    lifetime = {"days": 7} if expire_time is None else expire_time
    expires = datetime.now(tz=settings.TZ) + timedelta(**lifetime)
    # format_datetime(..., usegmt=True) yields e.g.
    # "Thu, 01 Jan 1970 00:00:00 GMT" regardless of the process locale.
    expires_http = format_datetime(expires.astimezone(timezone.utc), usegmt=True)
    # NOTE(review): httponly, samesite and domain are not set here, so the
    # framework defaults apply -- confirm that is intended for this cookie.
    response.set_cookie(
        key=key,
        value=value,
        expires=expires_http,
    )