TalentaTalkBackend / app /services /auth_service.py
vithariumz's picture
Deploy: Initial Backend Release v1.0
52f5a2a
from datetime import datetime, timedelta
from typing import Any, Union
from jose import jwt
from passlib.context import CryptContext
from app.core.config import settings
from app.core.exceptions import AuthenticationError, DuplicateError
from app.repositories.talent_repository import TalentRepository
from app.schemas.auth import LoginRequest, TalentCreate
from app.models.models import Manajemen
from sqlalchemy import select
from sqlalchemy.ext.asyncio import AsyncSession
pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto")
class AuthService:
def __init__(self, talent_repo: TalentRepository):
self.talent_repo = talent_repo
def verify_password(self, plain_password: str, hashed_password: str) -> bool:
return pwd_context.verify(plain_password, hashed_password)
def get_password_hash(self, password: str) -> str:
return pwd_context.hash(password)
def create_access_token(self, subject: Union[str, Any], role: str, extra_data: dict = None) -> str:
expire = datetime.utcnow() + timedelta(minutes=settings.ACCESS_TOKEN_EXPIRE_MINUTES)
to_encode = {
"exp": expire,
"sub": str(subject),
"role": role,
"id": subject
}
if extra_data:
to_encode.update(extra_data)
encoded_jwt = jwt.encode(to_encode, settings.SECRET_KEY, algorithm=settings.ALGORITHM)
return encoded_jwt
async def authenticate_talent(self, login_data: LoginRequest) -> dict:
talent = await self.talent_repo.get_by_email(login_data.email)
if not talent:
raise AuthenticationError("Email tidak terdaftar")
if not self.verify_password(login_data.password, talent.password):
raise AuthenticationError("Password salah")
token = self.create_access_token(talent.idtalent, role="talent")
return {
"access_token": token,
"token_type": "bearer",
"role": "talent",
"user_id": str(talent.idtalent)
}
async def register_talent(self, data: TalentCreate):
existing = await self.talent_repo.get_by_email(data.email)
if existing:
raise DuplicateError("Email sudah terdaftar")
hashed_pw = self.get_password_hash(data.password)
new_talent_data = {
"nama": data.nama,
"email": data.email,
"password": hashed_pw,
"role": data.role
}
return await self.talent_repo.create(new_talent_data)
async def authenticate_admin(self, login_data: LoginRequest, db_session: AsyncSession) -> dict:
"""Autentikasi khusus untuk Manajemen/Admin"""
query = select(Manajemen).where(Manajemen.email == login_data.email)
result = await db_session.execute(query)
admin = result.scalar_one_or_none()
if not admin:
raise AuthenticationError("Admin email tidak terdaftar")
if not self.verify_password(login_data.password, admin.password):
raise AuthenticationError("Password admin salah")
token = self.create_access_token(admin.idmanajemen, role="manajemen")
return {
"access_token": token,
"token_type": "bearer",
"idUser": f"ADM{admin.idmanajemen:03d}",
"name": admin.namamanajemen,
"role": "admin",
"email": admin.email
}