"""Security helpers: password validation and hashing, JWT encode/decode, user lookup."""

import os
import re
import sys
import uuid
from datetime import datetime
from typing import Any, Dict

import bcrypt
import dotenv
import jwt
from jwt.exceptions import DecodeError, ExpiredSignatureError  # noqa: F401
from sqlalchemy import select

from src.config.env_constant import EnvFilepath

# Load the .env file before the remaining project imports, mirroring the
# original statement order.
# NOTE(review): presumably some of the modules below read environment
# variables at import time - confirm before reordering.
dotenv.load_dotenv(EnvFilepath.ENVPATH)

from src.models.security import ValidatePassword  # noqa: E402
from src.db.postgres.connection import AsyncSessionLocal  # noqa: E402
from src.db.postgres.models import User  # noqa: E402


# Single pattern that checks every password rule at once:
#   ^                     start of string
#   (?=.*[A-Z])           at least one uppercase letter
#   (?=.*\d)              at least one digit
#   (?=.*[!@#$...])       at least one special character
#   .{8,}                 minimum of 8 characters
#   $                     end of string
_PASSWORD_PATTERN = re.compile(
    r"^(?=.*[A-Z])(?=.*\d)(?=.*[!@#$%^&*()_+\-=\[\]{};':\"\\|,.<>/?]).{8,}$"
)


def validate_password(password: str) -> Dict[str, Any]:
    """Validate a password against the strength rules.

    Rules:
    - Minimum 8 characters.
    - At least one uppercase letter.
    - At least one digit.
    - At least one special character.

    Args:
        password: Plain-text password to check.

    Returns:
        The ``ValidatePassword`` model dumped to a dict, built with
        ``status=200, data=True, error=None`` when the password passes and
        ``status=400, data=False`` plus an explanatory ``error`` otherwise.
    """
    if _PASSWORD_PATTERN.match(password):
        response = ValidatePassword(
            status=200,
            data=True,
            error=None,
        )
    else:
        response = ValidatePassword(
            status=400,
            data=False,
            error="Password must be at least 8 characters long and contain at least one uppercase letter, one digit, and one special character.",
        )
    # The function returns the dumped dict, not the model instance.
    return response.model_dump()


def hash_password(password: str) -> str:
    """Hash a plain-text password with bcrypt.

    The salt is read from the ``emarcal__bcrypt__salt`` environment variable.

    NOTE(review): a single process-wide salt means identical passwords yield
    identical hashes. Switching to ``bcrypt.gensalt()`` per password would be
    stronger, but it makes the output non-deterministic, so it is flagged here
    rather than changed - confirm no caller compares hashes directly.

    Args:
        password: Plain-text password.

    Returns:
        The bcrypt hash decoded to a ``str``.

    Raises:
        RuntimeError: If ``emarcal__bcrypt__salt`` is not set.
        ValueError: Propagated from ``bcrypt.hashpw`` when the salt is invalid.
    """
    configured_salt = os.environ.get('emarcal__bcrypt__salt')
    if configured_salt is None:
        # Fail with an actionable message instead of the opaque TypeError
        # that bytes(None, encoding=...) would raise.
        raise RuntimeError(
            "Environment variable 'emarcal__bcrypt__salt' is not set; cannot hash password."
        )
    digest = bcrypt.hashpw(
        password.encode('utf-8'),
        salt=configured_salt.encode('utf-8'),
    )
    return digest.decode('utf-8')  # bytes -> str


def verify_password(password: str, hashed_password: str) -> bool:
    """Check a plain-text password against a stored bcrypt hash.

    Uses ``bcrypt.checkpw``, which takes the salt from ``hashed_password``
    itself, so verification does not depend on the salt environment variable
    and also works for hashes created with a different salt.

    Args:
        password: Plain-text password to verify.
        hashed_password: Stored bcrypt hash, e.g. as loaded from the database.

    Returns:
        True if the password matches the hash. False if it does not match,
        if either argument is not a non-empty/valid string, or if the stored
        hash is malformed.
    """
    if not isinstance(password, str):
        return False
    if not isinstance(hashed_password, str) or not hashed_password:
        return False
    try:
        return bcrypt.checkpw(
            password.encode('utf-8'),
            hashed_password.encode('utf-8'),
        )
    except ValueError:
        # Raised when hashed_password is not in a valid bcrypt format.
        return False


def encode_jwt(input: Dict) -> str:  # noqa: A002 - name kept for existing callers
    """Encode a payload dict as a JWT.

    ``uuid.UUID`` and ``datetime`` values are converted with ``str()`` before
    encoding; every other value is passed through unchanged. The signing key
    and algorithm come from the ``emarcal__jwt__secret_key`` and
    ``emarcal__jwt__algorithm`` environment variables.

    Args:
        input: Payload to encode.

    Returns:
        The value returned by ``jwt.encode``.
    """
    safe_payload = {
        key: (str(value) if isinstance(value, (uuid.UUID, datetime)) else value)
        for key, value in input.items()
    }
    return jwt.encode(
        safe_payload,
        os.environ.get("emarcal__jwt__secret_key"),
        algorithm=os.environ.get("emarcal__jwt__algorithm"),
    )


def decode_jwt(encoded_input: str) -> Any:
    """Decode a JWT and return its payload.

    The key and the single accepted algorithm come from the
    ``emarcal__jwt__secret_key`` and ``emarcal__jwt__algorithm`` environment
    variables. Exceptions raised by ``jwt.decode`` are not caught here and
    propagate to the caller.

    Args:
        encoded_input: The encoded token.

    Returns:
        The payload returned by ``jwt.decode``.
    """
    return jwt.decode(
        encoded_input,
        os.environ.get("emarcal__jwt__secret_key"),
        algorithms=[os.environ.get("emarcal__jwt__algorithm")],
    )


async def get_user(email: str) -> dict | None:
    """Fetch the first ``User`` row whose email equals ``email``.

    Args:
        email: Email address to look up.

    Returns:
        A dict mapping each mapped column attribute key to its value on the
        row, or ``None`` when no row matches.

    Raises:
        Exception: Any error raised during the lookup is printed (message,
            exception type, file name and line number) and then re-raised.
    """
    try:
        async with AsyncSessionLocal() as session:
            result = await session.execute(select(User).where(User.email == email))
            user = result.scalars().first()
            if user:
                return {
                    column.key: getattr(user, column.key)
                    for column in user.__mapper__.column_attrs
                }
            return None
    except Exception as exc:
        print(f"❌ get user error, {exc}")
        exc_type, exc_obj, exc_tb = sys.exc_info()
        fname = os.path.split(exc_tb.tb_frame.f_code.co_filename)[1]
        print(exc_type, fname, exc_tb.tb_lineno)
        raise