Spaces:
Build error
Build error
| from datetime import datetime, timedelta, timezone | |
| from typing import Optional, Dict, Any | |
| import logging | |
| from jose import jwt, JWTError | |
| from passlib.context import CryptContext | |
| from fastapi import HTTPException | |
| from src.core.settings import settings | |
| pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto") | |
| logger = logging.getLogger(__name__) | |
| ALLOWED_EMAIL_DOMAIN = "@emu.edu.tr" | |
| class AuthService: | |
| def __init__(self): | |
| self.secret_key = settings.secret_key | |
| self.algorithm = settings.algorithm | |
| self.access_token_expire_minutes = settings.access_token_expire_minutes | |
| def verify_password(plain_password: str, hashed_password: str) -> bool: | |
| return pwd_context.verify(plain_password, hashed_password) | |
| def get_password_hash(password: str) -> str: | |
| return pwd_context.hash(password) | |
| def create_access_token(self, data: dict, expires_delta: Optional[timedelta] = None) -> str: | |
| to_encode = data.copy() | |
| if expires_delta: | |
| expire = datetime.now(timezone.utc) + expires_delta | |
| else: | |
| expire = datetime.now(timezone.utc) + timedelta(minutes=self.access_token_expire_minutes) | |
| to_encode.update({"exp": expire}) | |
| encoded_jwt = jwt.encode(to_encode, self.secret_key, algorithm=self.algorithm) | |
| return encoded_jwt | |
| def decode_access_token(self, token: str) -> Optional[dict]: | |
| try: | |
| payload = jwt.decode(token, self.secret_key, algorithms=[self.algorithm]) | |
| return payload | |
| except JWTError: | |
| return None | |
| def extract_email_from_token(decoded_token: Dict[str, Any]) -> Optional[str]: | |
| return ( | |
| decoded_token.get('email') or | |
| decoded_token.get('preferred_username') or | |
| decoded_token.get('upn') or | |
| decoded_token.get('unique_name') | |
| ) | |
| def extract_user_info_from_token(decoded_token: Dict[str, Any]) -> Dict[str, Optional[str]]: | |
| return { | |
| "user_id": decoded_token.get('oid') or decoded_token.get('sub'), | |
| "display_name": decoded_token.get('name'), | |
| "first_name": decoded_token.get('given_name'), | |
| "last_name": decoded_token.get('family_name'), | |
| } | |
| def extract_user_info_from_sso_user_data(user_data) -> Dict[str, Optional[str]]: | |
| return { | |
| "email": user_data.email, | |
| "user_id": user_data.id, | |
| "display_name": user_data.display_name, | |
| "first_name": user_data.first_name, | |
| "last_name": user_data.last_name, | |
| } | |
| def validate_emu_email(email: str) -> None: | |
| if not email or not email.lower().endswith(ALLOWED_EMAIL_DOMAIN.lower()): | |
| logger.warning(f"Login attempt with non-EMU email: {email}") | |
| raise HTTPException( | |
| status_code=403, | |
| detail="Only EMU students and staff can login" | |
| ) | |