File size: 4,508 Bytes
bef5e76
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
08df5ae
bef5e76
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
08df5ae
 
 
bef5e76
 
 
 
 
 
 
 
08df5ae
bef5e76
 
 
 
 
08df5ae
bef5e76
 
 
08df5ae
bef5e76
08df5ae
 
 
 
 
bef5e76
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
import os
import re
import sys
import bcrypt
import dotenv
from src.config.env_constant import EnvFilepath
dotenv.load_dotenv(EnvFilepath.ENVPATH) 
# from passlib.hash import bcrypt
from typing import Dict, Any

from src.models.security import ValidatePassword
# from app.utils.decorator import trace_runtime

# @trace_runtime
def validate_password(password: str) -> Dict[str, Any]:
    """
    Check a password against the password policy.

    The policy, enforced by a single regular expression, requires:
    - Minimum 8 characters.
    - At least one uppercase letter (A-Z).
    - At least one digit.
    - At least one special character.

    Args:
        password (str): Plain-text password to check.

    Returns:
        Dict[str, Any]: The result of ``ValidatePassword(...).model_dump()``
        (presumably a pydantic model dumped to a dict). ``status`` is 200,
        ``data`` is True and ``error`` is None when the policy is met;
        otherwise ``status`` is 400, ``data`` is False and ``error`` carries
        the explanatory message.
    """
    # One regex checks every criterion at once:
    # ^                 # Start of string
    # (?=.*[A-Z])       # At least one uppercase letter
    # (?=.*\d)          # At least one digit
    # (?=.*[!@#$%^&*()_+\-=\[\]{};':"\\|,.<>/?]) # At least one special character
    # .{8,}             # Minimum 8 characters
    # $                 # End of string
    regex = r"^(?=.*[A-Z])(?=.*\d)(?=.*[!@#$%^&*()_+\-=\[\]{};':\"\\|,.<>/?]).{8,}$"

    if re.match(regex, password):
        response = ValidatePassword(
            status=200,
            data=True,
            error=None
        )
    else:
        response = ValidatePassword(
            status=400,
            data=False,
            error="Password must be at least 8 characters long and contain at least one uppercase letter, one digit, and one special character."
        )
    # Callers receive the dumped model, not the model instance itself.
    return response.model_dump()

# @trace_runtime
def hash_password(password: str) -> str:
    """
    Hash a plain-text password with bcrypt.

    The salt is read from the ``emarcal__bcrypt__salt`` environment variable
    rather than generated per call.

    NOTE(review): because the salt comes from configuration, the same
    password presumably always yields the same hash - confirm this is
    intended before changing the salt value.

    Args:
        password (str): Plain-text password.

    Returns:
        str: The bcrypt hash, decoded from bytes to a UTF-8 string.
    """
    # Resolve the salt first so a missing variable fails before any hashing.
    raw_salt = os.environ.get('emarcal__bcrypt__salt')
    salt_bytes = bytes(raw_salt, encoding='utf-8')
    password_bytes = bytes(password, encoding='utf-8')
    # bcrypt returns bytes; callers expect a str.
    return bcrypt.hashpw(password_bytes, salt=salt_bytes).decode('utf-8')

# @trace_runtime
def verify_password(password: str, hashed_password: str) -> bool:
    """
    Verify a plain-text password against a stored hash.

    The candidate password is hashed with ``hash_password`` and the result is
    compared to ``hashed_password`` using a constant-time comparison, so the
    time taken does not reveal how many leading characters matched.

    Args:
        password (str): Plain-text password to verify.
        hashed_password (str): Stored hash, e.g. as loaded from the database.

    Returns:
        bool: True if the hashes match. False if they differ, if
        ``hashed_password`` is not a string, or if a ``ValueError`` is raised
        while hashing or encoding.
    """
    import hmac  # local import keeps this block self-contained

    try:
        candidate = hash_password(password)
        # A non-string stored hash can never equal the freshly computed one.
        if not isinstance(hashed_password, str):
            return False
        # Compare as bytes: compare_digest rejects non-ASCII str arguments.
        return hmac.compare_digest(
            candidate.encode('utf-8'),
            hashed_password.encode('utf-8'),
        )
    except ValueError:
        # Presumably raised by bcrypt for a malformed salt; treat as a
        # failed verification rather than propagating.
        return False


import jwt
import uuid
from datetime import datetime
from jwt.exceptions import ExpiredSignatureError, DecodeError
from sqlalchemy import select
from src.db.postgres.connection import AsyncSessionLocal
from src.db.postgres.models import User


# @trace_runtime
def encode_jwt(input:Dict) -> str:
    """
    Encode a payload dictionary as a JWT.

    ``uuid.UUID`` and ``datetime`` values are converted with ``str()`` before
    encoding; all other values are passed through unchanged. The signing key
    and algorithm are read from the ``emarcal__jwt__secret_key`` and
    ``emarcal__jwt__algorithm`` environment variables.

    Args:
        input (Dict): Claims to place in the token.

    Returns:
        str: The encoded token returned by ``jwt.encode``.
    """
    safe_payload = {}
    for claim, value in input.items():
        # Stringify UUID and datetime values before handing them to jwt.
        if isinstance(value, (uuid.UUID, datetime)):
            value = str(value)
        safe_payload[claim] = value

    secret_key = os.environ.get("emarcal__jwt__secret_key")
    algorithm = os.environ.get("emarcal__jwt__algorithm")
    return jwt.encode(safe_payload, secret_key, algorithm=algorithm)


# @trace_runtime
def decode_jwt(encoded_input:str) -> Any:
    """
    Decode a JWT and return its payload.

    The key and the single accepted algorithm are read from the
    ``emarcal__jwt__secret_key`` and ``emarcal__jwt__algorithm`` environment
    variables. Exceptions raised by ``jwt.decode`` are not caught here.

    Args:
        encoded_input (str): The encoded token.

    Returns:
        Any: The payload returned by ``jwt.decode``.
    """
    secret_key = os.environ.get("emarcal__jwt__secret_key")
    accepted_algorithms = [os.environ.get("emarcal__jwt__algorithm")]
    return jwt.decode(encoded_input, secret_key, algorithms=accepted_algorithms)


async def get_user(email: str) -> dict | None:
    """
    Look up a user by email and return its column values as a dict.

    Args:
        email (str): Email address to match against ``User.email``.

    Returns:
        dict | None: A mapping of column attribute key to value for the first
        matching user, or None when no user matches.

    Raises:
        Exception: Any error raised during the lookup is printed together
        with the file name and line number from its traceback, then
        re-raised unchanged.
    """
    try:
        async with AsyncSessionLocal() as session:
            query = select(User).where(User.email == email)
            rows = await session.execute(query)
            record = rows.scalars().first()
            if not record:
                return None
            # Build a plain dict from the mapped column attributes.
            columns = record.__mapper__.column_attrs
            return {col.key: getattr(record, col.key) for col in columns}
    except Exception as E:
        print(f"❌ get user error, {E}")
        # Report where the failure surfaced before propagating it.
        failure_type, _, trace = sys.exc_info()
        source_file = os.path.basename(trace.tb_frame.f_code.co_filename)
        print(failure_type, source_file, trace.tb_lineno)
        raise