File size: 3,050 Bytes
edda8af
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
from datetime import datetime, timedelta, timezone
from typing import Optional, Dict, Any
import logging
from jose import jwt, JWTError
from passlib.context import CryptContext 
from fastapi import HTTPException

from src.core.settings import settings

pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto")
logger = logging.getLogger(__name__)

ALLOWED_EMAIL_DOMAIN = "@emu.edu.tr"


class AuthService:
    def __init__(self):
        self.secret_key = settings.secret_key
        self.algorithm = settings.algorithm
        self.access_token_expire_minutes = settings.access_token_expire_minutes

    @staticmethod
    def verify_password(plain_password: str, hashed_password: str) -> bool:
        return pwd_context.verify(plain_password, hashed_password)
    
    @staticmethod
    def get_password_hash(password: str) -> str:
        return pwd_context.hash(password)
    
    def create_access_token(self, data: dict, expires_delta: Optional[timedelta] = None) -> str:
        to_encode = data.copy()
        if expires_delta:
            expire = datetime.now(timezone.utc) + expires_delta
        else:
            expire = datetime.now(timezone.utc) + timedelta(minutes=self.access_token_expire_minutes)
        to_encode.update({"exp": expire})
        encoded_jwt = jwt.encode(to_encode, self.secret_key, algorithm=self.algorithm)
        return encoded_jwt
    
    def decode_access_token(self, token: str) -> Optional[dict]:
        try:
            payload = jwt.decode(token, self.secret_key, algorithms=[self.algorithm])
            return payload
        except JWTError:
            return None

    @staticmethod
    def extract_email_from_token(decoded_token: Dict[str, Any]) -> Optional[str]:
        return (
            decoded_token.get('email') or 
            decoded_token.get('preferred_username') or 
            decoded_token.get('upn') or 
            decoded_token.get('unique_name')
        )

    @staticmethod
    def extract_user_info_from_token(decoded_token: Dict[str, Any]) -> Dict[str, Optional[str]]:
        return {
            "user_id": decoded_token.get('oid') or decoded_token.get('sub'),
            "display_name": decoded_token.get('name'),
            "first_name": decoded_token.get('given_name'),
            "last_name": decoded_token.get('family_name'),
        }

    @staticmethod
    def extract_user_info_from_sso_user_data(user_data) -> Dict[str, Optional[str]]:
        return {
            "email": user_data.email,
            "user_id": user_data.id,
            "display_name": user_data.display_name,
            "first_name": user_data.first_name,
            "last_name": user_data.last_name,
        }

    @staticmethod
    def validate_emu_email(email: str) -> None:
        if not email or not email.lower().endswith(ALLOWED_EMAIL_DOMAIN.lower()):
            logger.warning(f"Login attempt with non-EMU email: {email}")
            raise HTTPException(
                status_code=403, 
                detail="Only EMU students and staff can login"
            )