File size: 3,268 Bytes
b3f1645
6f6eeb8
039afa2
e2e403d
 
ea0a765
 
 
9a8d759
ea0a765
6f6eeb8
e2e403d
 
6f6eeb8
e2e403d
 
 
6f6eeb8
72092ce
 
 
e2e403d
6f6eeb8
 
d70033b
6f6eeb8
e2e403d
4e433fa
6f6eeb8
a1888d0
d70033b
002d869
 
4e433fa
 
d70033b
e2e403d
002d869
d70033b
 
 
e2e403d
002d869
d70033b
 
 
 
 
 
 
 
002d869
d70033b
 
 
 
002d869
d70033b
 
 
002d869
a24791e
 
 
 
d70033b
 
002d869
d70033b
ea0a765
 
 
 
 
 
 
 
 
 
002d869
656f5ab
72092ce
 
 
 
1f6fba1
039afa2
 
 
 
 
 
 
 
72092ce
039afa2
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
import jwt
from hashlib import sha256
from fastapi import HTTPException, status, Request, Response
from . import log_module, settings
from datetime import datetime, timedelta    
from Crypto.Signature import pss
from Crypto.Hash import SHA256
from Crypto.PublicKey import RSA

import base64


# Import-time side effect: replace every non-"master" entry of settings.USERS
# with the hex SHA-256 digest of  <user name> + <stored value> + <master value>.
# The "master" entry itself is left untouched.
#
# `sha256` here is still hashlib's constructor imported at the top of the
# file; the module-level `sha256` helper defined further down is bound only
# after this loop has run.
# NOTE(review): presumably the stored values are plain-text passwords; a single
# SHA-256 round is a fast hash, so a password KDF may be preferable - confirm.
for key in settings.USERS:
    if key != "master":
        password = key + settings.USERS[key] + settings.USERS["master"]
        encoded_password = password.encode('UTF-8')
        settings.USERS[key] = sha256(encoded_password).hexdigest()


def create_jwt_token(data, maxlife=settings.JWT_EXPIRATION_TIME_MINUTES_API):
    """Return a signed JWT carrying ``data`` plus an ``exp`` claim.

    Args:
        data: Mapping of claims to embed. It is merged into a new dict and
            is not modified.
        maxlife: Token lifetime in minutes, counted from the current time.

    Returns:
        The token produced by ``jwt.encode`` using ``settings.JWT_SECRET``
        and ``settings.JWT_ALGORITHM``.
    """
    # Imported locally because the module header only imports `datetime`
    # and `timedelta`.
    from datetime import timezone

    # `datetime.utcnow()` is deprecated and returns a naive value; an aware
    # UTC timestamp denotes the same instant without the ambiguity.
    expire = datetime.now(tz=timezone.utc) + timedelta(minutes=maxlife)
    to_encode = data | {"exp": expire}
    return jwt.encode(to_encode, settings.JWT_SECRET, algorithm=settings.JWT_ALGORITHM)

def validate_jwt_token(token: str, usage: str = "api"):
    """Decode ``token`` and return its payload, rejecting the request on failure.

    Args:
        token: Encoded JWT to verify with ``settings.JWT_SECRET``.
        usage: ``"view"`` rejects through ``raise_307``; any other value
            rejects through ``raise_401``.

    Returns:
        The payload returned by ``jwt.decode``.

    Raises:
        HTTPException: Raised by ``raise_307`` / ``raise_401`` when decoding
            fails for any reason.
    """
    try:
        return jwt.decode(token, settings.JWT_SECRET, algorithms=[settings.JWT_ALGORITHM])
    except Exception as e:
        # NOTE(review): the raw token is written to the log - confirm this is
        # acceptable for the deployment.
        log_module.logger().error(f"{e!r} - Invalid token: {token!s}")
        reject = raise_307 if usage == "view" else raise_401
        return reject("Failed validating jwt token")

def token_from_cookie(request:Request):
    """Validate the JWT stored in the ``token`` cookie of ``request``.

    Returns:
        The decoded payload from ``validate_jwt_token`` (called in "view" mode).

    Raises:
        HTTPException: Via ``raise_307`` when the cookie is absent or empty,
            or when validation fails.
    """
    token = request.cookies.get('token', "")
    if not token:
        return raise_307("Token not found")
    return validate_jwt_token(token, "view")

def token_from_headers(request:Request):
    """Validate the bearer JWT sent in the request headers.

    The standard ``Authorization`` header is read first. The misspelled
    ``Autorization`` name used by the previous implementation is still
    accepted as a fallback so existing clients keep working.

    Returns:
        The decoded payload from ``validate_jwt_token``.

    Raises:
        HTTPException: Via ``raise_401`` when the header is missing, does not
            have the ``Bearer <token>`` shape, or the token fails validation.
    """
    header = request.headers.get("Authorization") or request.headers.get("Autorization", "")
    # partition() never raises: a header of just "Bearer" (no separator) is
    # reported as malformed instead of failing with an IndexError.
    scheme, separator, credentials = header.partition(" ")
    if scheme == "Bearer" and separator:
        return validate_jwt_token(credentials)
    return raise_401("Bearer malformed")
    

def can_use(role, activity):
    """Return whether ``role`` is allowed to perform ``activity``.

    Args:
        role: Role name to check.
        activity: Activity name to look up in the permission table.

    Returns:
        ``True`` when ``role`` is listed for ``activity``; ``False`` otherwise,
        including when ``activity`` is not in the table (deny by default
        rather than raising ``KeyError``).
    """
    can = {
        "chat": ["user", "admin"]
    }
    return role in can.get(activity, ())

def raise_401(detail:str):
    """Abort the request with HTTP 401.

    The response headers expire the ``token`` cookie and carry a
    ``Location: /login`` header.

    Args:
        detail: Message placed in the exception's ``detail`` field.

    Raises:
        HTTPException: Always.
    """
    expired_token_cookie = "token=deleted; path=/; expires=Thu, 01 Jan 1970 00:00:00 GMT"
    raise HTTPException(
        status_code=status.HTTP_401_UNAUTHORIZED,
        detail=detail,
        headers={"set-cookie": expired_token_cookie, "Location": "/login"},
    )
    
def raise_307(detail:str):
    """Abort the request with HTTP 307 and expire the ``token`` cookie.

    Args:
        detail: Message placed in the exception's ``detail`` field.

    Raises:
        HTTPException: Always.
    """
    # NOTE(review): no "Location" header is sent (it was commented out as
    # "Location": "/login"); presumably an exception handler elsewhere decides
    # the redirect target - confirm.
    expired_token_cookie = "token=deleted; path=/; expires=Thu, 01 Jan 1970 00:00:00 GMT"
    raise HTTPException(
        status_code=status.HTTP_307_TEMPORARY_REDIRECT,
        detail=detail,
        headers={"set-cookie": expired_token_cookie},
    )

def validate_signature(public_key: str, signature: str, data:str) -> bool:
    """Verify an RSASSA-PSS signature over the SHA-256 digest of ``data``.

    Args:
        public_key: RSA key in a format accepted by ``RSA.import_key``.
        signature: Base64-encoded signature.
        data: Signed text; it is encoded with ``str.encode()`` before hashing.

    Returns:
        ``True`` when the signature verifies.

    Raises:
        HTTPException: Via ``raise_401`` when the key cannot be parsed, the
            signature is not valid base64, or verification fails.
    """
    digest = SHA256.new(data.encode())
    try:
        # Parsing the key and decoding the signature happen inside the try so
        # that malformed input is rejected with a 401 like a bad signature,
        # instead of escaping as an unhandled error.
        rsa_key = RSA.import_key(public_key)
        raw_signature = base64.b64decode(signature)
        pss.new(rsa_key).verify(digest, raw_signature)
    except (ValueError, IndexError, TypeError):
        return raise_401("Signature failed")
    return True

def sha256(data:str|bytes) -> str:
    """Return the hexadecimal SHA-256 digest of ``data``.

    Args:
        data: Bytes to hash, or a string that is encoded with ``str.encode()``
            before hashing.
    """
    if not isinstance(data, bytes):
        data = data.encode()
    return SHA256.new(data).hexdigest()

def set_cookie(response: Response, key: str, value: str, expire_time: dict | None = None):
    """Set a cookie on ``response`` that expires after ``expire_time``.

    Args:
        response: Response object whose ``set_cookie`` method is called.
        key: Cookie name.
        value: Cookie value.
        expire_time: Keyword arguments for ``timedelta`` describing the cookie
            lifetime. Defaults to ``{"days": 7}`` when omitted or ``None``.
    """
    # Imported locally because the module header does not import these names.
    from datetime import timezone
    from email.utils import format_datetime

    # Build the default per call instead of sharing one mutable dict object.
    lifetime = {"days": 7} if expire_time is None else expire_time
    expires = datetime.now(tz=settings.TZ) + timedelta(**lifetime)
    response.set_cookie(
        key=key,
        value=value,
        # httponly=True,
        # samesite='none',
        # Cookie expiry dates are HTTP dates and must be expressed in GMT;
        # format_datetime is also locale-independent, unlike strftime's %a/%b.
        expires=format_datetime(expires.astimezone(timezone.utc), usegmt=True),
        #domain='.<YOUR DOMAIN>'
    )