emu-rag / src /api /services /auth_service.py
Sarp Bilgiç
hf initialized
edda8af
from datetime import datetime, timedelta, timezone
from typing import Optional, Dict, Any
import logging
from jose import jwt, JWTError
from passlib.context import CryptContext
from fastapi import HTTPException
from src.core.settings import settings
pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto")
logger = logging.getLogger(__name__)
ALLOWED_EMAIL_DOMAIN = "@emu.edu.tr"
class AuthService:
def __init__(self):
self.secret_key = settings.secret_key
self.algorithm = settings.algorithm
self.access_token_expire_minutes = settings.access_token_expire_minutes
@staticmethod
def verify_password(plain_password: str, hashed_password: str) -> bool:
return pwd_context.verify(plain_password, hashed_password)
@staticmethod
def get_password_hash(password: str) -> str:
return pwd_context.hash(password)
def create_access_token(self, data: dict, expires_delta: Optional[timedelta] = None) -> str:
to_encode = data.copy()
if expires_delta:
expire = datetime.now(timezone.utc) + expires_delta
else:
expire = datetime.now(timezone.utc) + timedelta(minutes=self.access_token_expire_minutes)
to_encode.update({"exp": expire})
encoded_jwt = jwt.encode(to_encode, self.secret_key, algorithm=self.algorithm)
return encoded_jwt
def decode_access_token(self, token: str) -> Optional[dict]:
try:
payload = jwt.decode(token, self.secret_key, algorithms=[self.algorithm])
return payload
except JWTError:
return None
@staticmethod
def extract_email_from_token(decoded_token: Dict[str, Any]) -> Optional[str]:
return (
decoded_token.get('email') or
decoded_token.get('preferred_username') or
decoded_token.get('upn') or
decoded_token.get('unique_name')
)
@staticmethod
def extract_user_info_from_token(decoded_token: Dict[str, Any]) -> Dict[str, Optional[str]]:
return {
"user_id": decoded_token.get('oid') or decoded_token.get('sub'),
"display_name": decoded_token.get('name'),
"first_name": decoded_token.get('given_name'),
"last_name": decoded_token.get('family_name'),
}
@staticmethod
def extract_user_info_from_sso_user_data(user_data) -> Dict[str, Optional[str]]:
return {
"email": user_data.email,
"user_id": user_data.id,
"display_name": user_data.display_name,
"first_name": user_data.first_name,
"last_name": user_data.last_name,
}
@staticmethod
def validate_emu_email(email: str) -> None:
if not email or not email.lower().endswith(ALLOWED_EMAIL_DOMAIN.lower()):
logger.warning(f"Login attempt with non-EMU email: {email}")
raise HTTPException(
status_code=403,
detail="Only EMU students and staff can login"
)