File size: 4,508 Bytes
bef5e76 08df5ae bef5e76 08df5ae bef5e76 08df5ae bef5e76 08df5ae bef5e76 08df5ae bef5e76 08df5ae bef5e76 | 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 | import os
import re
import sys
import bcrypt
import dotenv
from src.config.env_constant import EnvFilepath
dotenv.load_dotenv(EnvFilepath.ENVPATH)
# from passlib.hash import bcrypt
from typing import Dict, Any
from src.models.security import ValidatePassword
# from app.utils.decorator import trace_runtime
# @trace_runtime
def validate_password(password: str) -> ValidatePassword:
"""
Validates a password based on specified constraints.
- Minimum 8 characters.
- At least one uppercase letter.
- At least one digit.
- At least one special character.
"""
# Regex untuk mengecek semua kriteria sekaligus
# ^ # Start of string
# (?=.*[A-Z]) # At least one uppercase letter
# (?=.*\d) # At least one digit
# (?=.*[!@#$%^&*()_+\-=\[\]{};':"\\|,.<>/?]) # At least one special character
# .{8,} # Minimum 8 characters
# $ # End of string
regex = r"^(?=.*[A-Z])(?=.*\d)(?=.*[!@#$%^&*()_+\-=\[\]{};':\"\\|,.<>/?]).{8,}$"
if not re.match(regex, password):
# return False
response = ValidatePassword(
status=400,
data=False,
error="Password must be at least 8 characters long and contain at least one uppercase letter, one digit, and one special character."
)
else:
# return True
response = ValidatePassword(
status=200,
data=True,
error=None
)
return response.model_dump()
# @trace_runtime
def hash_password(password: str) -> str:
"""
Menggunakan bcrypt untuk melakukan hashing pada password.
Args:
password (str): Password dalam bentuk teks biasa.
Returns:
str: Password yang sudah di-hash dan di-salt dengan aman.
"""
salt = bytes(os.environ.get('emarcal__bcrypt__salt'), encoding='utf-8')
bpassword = bytes(password, encoding='utf-8')
hashed_password = bcrypt.hashpw(bpassword, salt=salt)
hashed_password = hashed_password.decode('utf-8') # convert byte to str
return hashed_password
# @trace_runtime
def verify_password(password: str, hashed_password: str) -> bool:
"""
Memverifikasi password teks biasa terhadap password yang sudah di-hash.
Args:
password (str): Password dalam bentuk teks biasa yang ingin diverifikasi.
hashed_password (str): Password yang sudah di-hash yang diambil dari database.
Returns:
bool: True jika password cocok, False sebaliknya.
"""
# bpassword = bytes(password, encoding='utf-8')
# print(f"Password to verify: {password}")
# bhashed_password = bytes(hashed_password, encoding='utf-8')
# print(f"Hashed password: {hashed_password}")
try:
if hash_password(password)==hashed_password:
return True
else:
return False
# return bcrypt.checkpw(bpassword, bhashed_password)
except ValueError:
# Menangani kasus di mana format hashed_password tidak valid
return False
import jwt
import uuid
from datetime import datetime
from jwt.exceptions import ExpiredSignatureError, DecodeError
from sqlalchemy import select
from src.db.postgres.connection import AsyncSessionLocal
from src.db.postgres.models import User
# @trace_runtime
def encode_jwt(input:Dict) -> str:
safe_payload = {
k: (str(v) if isinstance(v, (uuid.UUID, datetime)) else v)
for k, v in input.items()
}
encoded_jwt = jwt.encode(safe_payload, os.environ.get("emarcal__jwt__secret_key"), algorithm=os.environ.get("emarcal__jwt__algorithm"))
return encoded_jwt
# @trace_runtime
def decode_jwt(encoded_input:str) -> Any:
decoded_payload = jwt.decode(encoded_input, os.environ.get("emarcal__jwt__secret_key"), algorithms=[os.environ.get("emarcal__jwt__algorithm")])
return decoded_payload
async def get_user(email: str) -> dict | None:
try:
async with AsyncSessionLocal() as session:
result = await session.execute(select(User).where(User.email == email))
user = result.scalars().first()
if user:
return {c.key: getattr(user, c.key) for c in user.__mapper__.column_attrs}
return None
except Exception as E:
print(f"❌ get user error, {E}")
exc_type, exc_obj, exc_tb = sys.exc_info()
fname = os.path.split(exc_tb.tb_frame.f_code.co_filename)[1]
print(exc_type, fname, exc_tb.tb_lineno)
raise
|