| import os |
| import re |
| import sys |
| import bcrypt |
| import dotenv |
| from src.config.env_constant import EnvFilepath |
| dotenv.load_dotenv(EnvFilepath.ENVPATH) |
| |
| from typing import Dict, Any |
|
|
| from src.models.security import ValidatePassword |
| |
|
|
| |
| def validate_password(password: str) -> ValidatePassword: |
| """ |
| Validates a password based on specified constraints. |
| - Minimum 8 characters. |
| - At least one uppercase letter. |
| - At least one digit. |
| - At least one special character. |
| """ |
| |
| |
| |
| |
| |
| |
| |
| regex = r"^(?=.*[A-Z])(?=.*\d)(?=.*[!@#$%^&*()_+\-=\[\]{};':\"\\|,.<>/?]).{8,}$" |
| |
| if not re.match(regex, password): |
| |
| response = ValidatePassword( |
| status=400, |
| data=False, |
| error="Password must be at least 8 characters long and contain at least one uppercase letter, one digit, and one special character." |
| ) |
| else: |
| |
| response = ValidatePassword( |
| status=200, |
| data=True, |
| error=None |
| ) |
| return response.model_dump() |
|
|
| |
| def hash_password(password: str) -> str: |
| """ |
| Menggunakan bcrypt untuk melakukan hashing pada password. |
| |
| Args: |
| password (str): Password dalam bentuk teks biasa. |
| |
| Returns: |
| str: Password yang sudah di-hash dan di-salt dengan aman. |
| """ |
| salt = bytes(os.environ.get('emarcal--bcrypt--salt'), encoding='utf-8') |
| bpassword = bytes(password, encoding='utf-8') |
| hashed_password = bcrypt.hashpw(bpassword, salt=salt) |
| hashed_password = hashed_password.decode('utf-8') |
| return hashed_password |
|
|
| |
| def verify_password(password: str, hashed_password: str) -> bool: |
| """ |
| Memverifikasi password teks biasa terhadap password yang sudah di-hash. |
| |
| Args: |
| password (str): Password dalam bentuk teks biasa yang ingin diverifikasi. |
| hashed_password (str): Password yang sudah di-hash yang diambil dari database. |
| |
| Returns: |
| bool: True jika password cocok, False sebaliknya. |
| """ |
| |
| |
|
|
| |
| |
|
|
| try: |
| if hash_password(password)==hashed_password: |
| return True |
| else: |
| return False |
| |
| except ValueError: |
| |
| return False |
|
|
|
|
| import jwt |
| import uuid |
| from datetime import datetime |
| from jwt.exceptions import ExpiredSignatureError, DecodeError |
| from src.models.user_info import UserCreate |
| from pymongo import MongoClient |
|
|
|
|
|
|
| async def get_mongo_conn(collection_name:str = 'users'): |
| mongo_client = MongoClient( |
| host=os.environ.get('emarcal--mongo--endpoint--url'), |
| |
| |
| |
| ) |
| db = mongo_client[os.environ.get('emarcal--buma--mongo--dbname')] |
| users_collection = db[collection_name] |
| return users_collection |
|
|
|
|
| |
| def encode_jwt(input:Dict) -> str: |
| safe_payload = { |
| k: (str(v) if isinstance(v, (uuid.UUID, datetime)) else v) |
| for k, v in input.items() |
| } |
| encoded_jwt = jwt.encode(safe_payload, os.environ.get("emarcal--jwt--secret-key"), algorithm=os.environ.get("emarcal--jwt--algorithm")) |
| return encoded_jwt |
|
|
|
|
| |
| def decode_jwt(encoded_input:str) -> Any: |
| decoded_payload = jwt.decode(encoded_input, os.environ.get("emarcal--jwt--secret-key"), algorithms=[os.environ.get("emarcal--jwt--algorithm")]) |
| return decoded_payload |
|
|
|
|
| async def get_user(email: str) -> dict: |
| try: |
| users_collection = await get_mongo_conn(collection_name="users") |
| user_profile:dict = users_collection.find_one({"email":email}) |
| if user_profile: |
| return user_profile |
| else: |
| return None |
| except Exception as E: |
| print(f"❌ get user error, {E}") |
| exc_type, exc_obj, exc_tb = sys.exc_info() |
| fname = os.path.split(exc_tb.tb_frame.f_code.co_filename)[1] |
| print(exc_type, fname, exc_tb.tb_lineno) |
| raise |
|
|
|
|
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
|
|
| |
| |
| |