# ishaq101's picture
# [NOTICKET] Demo agentic agent
# bef5e76
import os
import re
import sys
import bcrypt
import dotenv
from src.config.env_constant import EnvFilepath
dotenv.load_dotenv(EnvFilepath.ENVPATH)
# from passlib.hash import bcrypt
from typing import Dict, Any
from src.models.security import ValidatePassword
# from app.utils.decorator import trace_runtime
# @trace_runtime
def validate_password(password: str) -> Dict[str, Any]:
    """Check a password against the complexity policy.

    A password is accepted when it is at least 8 characters long and
    contains at least one uppercase letter (A-Z), at least one digit and at
    least one special character from the set listed in the pattern below.

    Args:
        password: Plain-text password to check.

    Returns:
        The ``model_dump()`` of a ``ValidatePassword`` instance (presumably a
        pydantic model, so a plain dict). On success it is built with
        ``status=200``, ``data=True`` and ``error=None``; on failure with
        ``status=400``, ``data=False`` and an explanatory ``error`` message.
    """
    # One pattern checks every rule at once:
    #   ^                      start of string
    #   (?=.*[A-Z])            at least one uppercase letter
    #   (?=.*\d)               at least one digit
    #   (?=.*[!@#$%^&*()...])  at least one special character
    #   .{8,}                  minimum 8 characters
    #   $                      end of string
    regex = r"^(?=.*[A-Z])(?=.*\d)(?=.*[!@#$%^&*()_+\-=\[\]{};':\"\\|,.<>/?]).{8,}$"

    if re.match(regex, password):
        response = ValidatePassword(status=200, data=True, error=None)
    else:
        response = ValidatePassword(
            status=400,
            data=False,
            error="Password must be at least 8 characters long and contain at least one uppercase letter, one digit, and one special character.",
        )
    # The caller receives the dumped model, not the model instance itself.
    return response.model_dump()
# @trace_runtime
def hash_password(password: str) -> str:
    """Hash a plain-text password with bcrypt using the configured salt.

    The salt is read from the ``emarcal--bcrypt--salt`` environment variable
    on every call. Because the salt is fixed by configuration rather than
    generated per password, hashing the same password twice yields the same
    string for as long as that variable is unchanged.

    Args:
        password: Plain-text password.

    Returns:
        The bcrypt hash decoded to a ``str``.

    Raises:
        RuntimeError: If ``emarcal--bcrypt--salt`` is not set.
        ValueError: Propagated from ``bcrypt.hashpw`` (e.g. when the
            configured salt is not a valid bcrypt salt).
    """
    salt_value = os.environ.get('emarcal--bcrypt--salt')
    if salt_value is None:
        # Fail with an actionable message instead of the opaque TypeError
        # that bytes(None, encoding=...) would produce.
        raise RuntimeError(
            "Environment variable 'emarcal--bcrypt--salt' is not set; cannot hash password."
        )

    digest = bcrypt.hashpw(password.encode('utf-8'), salt=salt_value.encode('utf-8'))
    # bcrypt returns bytes; callers of this function work with str.
    return digest.decode('utf-8')
# @trace_runtime
def verify_password(password: str, hashed_password: str) -> bool:
    """Verify a plain-text password against a stored hash.

    The candidate password is hashed again with ``hash_password`` (which uses
    the salt configured in the environment) and the result is compared with
    the stored hash.

    Args:
        password: Plain-text password to verify.
        hashed_password: Previously stored hash, e.g. loaded from the database.

    Returns:
        ``True`` when the re-computed hash equals ``hashed_password``;
        ``False`` when it differs, when ``hashed_password`` is not a string,
        or when hashing raises ``ValueError``.
    """
    import hmac  # stdlib; local import keeps this block self-contained

    # A missing or non-text stored hash can never match.
    if not isinstance(hashed_password, str):
        return False

    try:
        candidate = hash_password(password)
        # Constant-time comparison so the check does not leak, through
        # timing, how many leading characters of the hash matched.
        return hmac.compare_digest(
            candidate.encode('utf-8'),
            hashed_password.encode('utf-8'),
        )
    except ValueError:
        # Hashing rejected its input (e.g. invalid salt format).
        return False
import jwt
import uuid
from datetime import datetime
from jwt.exceptions import ExpiredSignatureError, DecodeError
from src.models.user_info import UserCreate
from pymongo import MongoClient
# MongoClient instances keyed by endpoint URL. A client owns a connection
# pool and is meant to be created once and reused, so it is cached here
# instead of being rebuilt (and never closed) on every call.
_MONGO_CLIENTS = {}


async def get_mongo_conn(collection_name:str = 'users'):
    """Return a handle to a MongoDB collection.

    The endpoint URL is read from ``emarcal--mongo--endpoint--url`` and the
    database name from ``emarcal--buma--mongo--dbname``. Both are read on
    every call; the underlying client is reused for a given endpoint URL.

    Args:
        collection_name: Name of the collection to return. Defaults to
            ``'users'``.

    Returns:
        The collection object ``client[dbname][collection_name]``.
    """
    endpoint = os.environ.get('emarcal--mongo--endpoint--url')

    client = _MONGO_CLIENTS.get(endpoint)
    if client is None:
        # Timeouts are left at the driver defaults, as in the original code.
        client = MongoClient(host=endpoint)
        _MONGO_CLIENTS[endpoint] = client

    database = client[os.environ.get('emarcal--buma--mongo--dbname')]
    return database[collection_name]
# @trace_runtime
def encode_jwt(input:Dict) -> str:
    """Encode a payload dictionary as a JWT.

    ``uuid.UUID`` and ``datetime`` values are converted with ``str()`` so the
    payload can be serialized. The exception is ``datetime`` values stored
    under the registered time claims ``exp``, ``iat`` and ``nbf``: those are
    passed through unchanged so the JWT library can convert them to numeric
    timestamps. Stringifying them would yield a token whose expiry cannot be
    validated on decode.

    The signing key is read from ``emarcal--jwt--secret-key`` and the
    algorithm from ``emarcal--jwt--algorithm``.

    Args:
        input: Claims to place in the token.

    Returns:
        The value returned by ``jwt.encode`` (the encoded token).
    """
    time_claims = ("exp", "iat", "nbf")

    safe_payload = {}
    for key, value in input.items():
        if isinstance(value, uuid.UUID):
            safe_payload[key] = str(value)
        elif isinstance(value, datetime) and key not in time_claims:
            safe_payload[key] = str(value)
        else:
            safe_payload[key] = value

    return jwt.encode(
        safe_payload,
        os.environ.get("emarcal--jwt--secret-key"),
        algorithm=os.environ.get("emarcal--jwt--algorithm"),
    )
# @trace_runtime
def decode_jwt(encoded_input:str) -> Any:
    """Decode a JWT and return its payload.

    The key is read from ``emarcal--jwt--secret-key`` and the single accepted
    algorithm from ``emarcal--jwt--algorithm``.

    Args:
        encoded_input: The encoded token.

    Returns:
        Whatever ``jwt.decode`` returns for the token (the decoded payload).

    Raises:
        Any exception raised by ``jwt.decode`` propagates unchanged.
    """
    secret_key = os.environ.get("emarcal--jwt--secret-key")
    accepted_algorithms = [os.environ.get("emarcal--jwt--algorithm")]
    return jwt.decode(encoded_input, secret_key, algorithms=accepted_algorithms)
async def get_user(email: str) -> dict:
    """Look up a user document by e-mail address.

    Queries the ``users`` collection obtained from ``get_mongo_conn`` for a
    document whose ``email`` field equals ``email``.

    Args:
        email: E-mail address to search for.

    Returns:
        The matching document, or ``None`` when no document is found.

    Raises:
        Exception: Any error from the lookup is printed (message, exception
            type, file name and line number) and then re-raised.
    """
    try:
        collection = await get_mongo_conn(collection_name="users")
        match: dict = collection.find_one({"email":email})
        # Normalize any falsy result to None.
        return match if match else None
    except Exception as E:
        print(f"❌ get user error, {E}")
        # Report where the failure surfaced before propagating it.
        err_type, _err_value, err_tb = sys.exc_info()
        source_file = os.path.split(err_tb.tb_frame.f_code.co_filename)[1]
        print(err_type, source_file, err_tb.tb_lineno)
        raise
# input = UserCreate(
# fullname="Harry",
# email="harryyanto.ia@bukittechnology.com",
# password="#$ema.harry#$",
# company="BUMA ID",
# company_size="5000+",
# function="MANAGEMENT",
# site="HO",
# role="admin"
# )
# xxx= input.model_dump()
# encoded_jwt = encode_jwt(xxx)
# decoded_jwt = decode_jwt(encoded_jwt)