Spaces:
Sleeping
Sleeping
from datetime import datetime, timedelta, timezone
from typing import Optional

from jose import JWTError, jwt

from app.core.config import settings
from app.schemas.token import TokenData
# Lifetime of an access token, expressed in minutes.
ACCESS_TOKEN_EXPIRE_MINUTES: int = 15

# Lifetime of a refresh token, expressed in days.
REFRESH_TOKEN_EXPIRE_DAYS: int = 7
def create_access_token(data: dict, expires_delta: Optional[timedelta] = None) -> str:
    """Create a short-lived, signed JWT access token.

    Args:
        data: Claims to embed in the token. The mapping is copied, so the
            caller's dict is never modified. An existing ``"exp"`` entry is
            overwritten.
        expires_delta: Optional custom lifetime. When omitted, the token
            expires ``ACCESS_TOKEN_EXPIRE_MINUTES`` minutes from now.

    Returns:
        The encoded JWT, signed with ``settings.SECRET_KEY`` using
        ``settings.ALGORITHM``.
    """
    if expires_delta is None:
        expires_delta = timedelta(minutes=ACCESS_TOKEN_EXPIRE_MINUTES)

    # datetime.utcnow() is deprecated and returns a naive value; an aware UTC
    # timestamp encodes to the same "exp" claim without relying on convention.
    expire = datetime.now(timezone.utc) + expires_delta

    to_encode = {**data, "exp": expire}
    return jwt.encode(to_encode, settings.SECRET_KEY, algorithm=settings.ALGORITHM)
def create_refresh_token(data: dict, expires_delta: Optional[timedelta] = None) -> str:
    """Create a long-lived, signed JWT refresh token.

    Args:
        data: Claims to embed in the token. The mapping is copied, so the
            caller's dict is never modified. An existing ``"exp"`` entry is
            overwritten.
        expires_delta: Optional custom lifetime. When omitted, the token
            expires ``REFRESH_TOKEN_EXPIRE_DAYS`` days from now.

    Returns:
        The encoded JWT, signed with ``settings.SECRET_KEY`` using
        ``settings.ALGORITHM``.
    """
    if expires_delta is None:
        expires_delta = timedelta(days=REFRESH_TOKEN_EXPIRE_DAYS)

    # datetime.utcnow() is deprecated and returns a naive value; an aware UTC
    # timestamp encodes to the same "exp" claim without relying on convention.
    expire = datetime.now(timezone.utc) + expires_delta

    to_encode = {**data, "exp": expire}
    return jwt.encode(to_encode, settings.SECRET_KEY, algorithm=settings.ALGORITHM)
def verify_token(token: str, credentials_exception) -> TokenData:
    """Decode a JWT and return the identity claims it carries.

    The token is decoded with ``settings.SECRET_KEY`` and
    ``settings.ALGORITHM``. The ``sub`` and ``role`` claims are required;
    ``tenant_id`` is passed through as-is and may be absent.

    Args:
        token: The encoded JWT string to check.
        credentials_exception: Exception instance raised whenever the token
            cannot be decoded or lacks a required claim.

    Returns:
        A ``TokenData`` built from the ``sub``, ``role`` and ``tenant_id``
        claims.

    Raises:
        The supplied ``credentials_exception`` if decoding raises
        ``JWTError`` or if ``sub`` / ``role`` is missing.
    """
    try:
        claims = jwt.decode(token, settings.SECRET_KEY, algorithms=[settings.ALGORITHM])
    except JWTError:
        raise credentials_exception

    subject: Optional[str] = claims.get("sub")
    user_role: Optional[str] = claims.get("role")
    tenant: Optional[int] = claims.get("tenant_id")

    # Both the subject and the role must be present for the token to be usable.
    if subject is None or user_role is None:
        raise credentials_exception

    return TokenData(username=subject, role=user_role, tenant_id=tenant)