|
|
from datetime import datetime, timedelta |
|
|
from typing import Optional |
|
|
from sqlalchemy.orm import Session |
|
|
import jwt |
|
|
|
|
|
from config import settings |
|
|
from database.database import get_db |
|
|
from models.user import User |
|
|
from utils.password_utils import get_password_hash, verify_password |
|
|
|
|
|
|
|
|
def create_access_token(data: dict, expires_delta: Optional[timedelta] = None):
    """
    Create a signed JWT access token carrying an expiration claim.

    Args:
        data: Claims to embed in the token. The dict is shallow-copied, so
            the caller's object is not modified.
        expires_delta: Lifetime of the token measured from the current time.
            When omitted (``None``) the token expires after 30 days.

    Returns:
        The encoded token exactly as returned by ``jwt.encode``, signed with
        ``settings.secret_key`` using ``settings.algorithm``.
    """
    # Function-scope import: the module header only pulls ``datetime`` and
    # ``timedelta`` out of the datetime module.
    from datetime import timezone

    # Compare against None explicitly: ``timedelta(0)`` is falsy, and a caller
    # asking for a zero lifetime must not silently get the 30-day default.
    if expires_delta is None:
        expires_delta = timedelta(days=30)

    # Timezone-aware replacement for the deprecated ``datetime.utcnow()``.
    expire = datetime.now(timezone.utc) + expires_delta

    to_encode = data.copy()
    to_encode.update({"exp": expire})

    return jwt.encode(to_encode, settings.secret_key, algorithm=settings.algorithm)
|
|
|
|
|
|
|
|
def verify_token(token: str) -> Optional[dict]:
    """
    Verify a JWT token and return the payload if valid.

    Args:
        token: The encoded JWT string.

    Returns:
        The decoded payload, or ``None`` when the token is expired,
        malformed, or otherwise fails validation.
    """
    try:
        return jwt.decode(token, settings.secret_key, algorithms=[settings.algorithm])
    except jwt.InvalidTokenError:
        # PyJWT has no ``JWTError`` attribute; referencing it in an ``except``
        # clause raises AttributeError as soon as a non-expiry decode error
        # occurs. ``InvalidTokenError`` is the base class of PyJWT's
        # decode/validation errors and also covers ``ExpiredSignatureError``,
        # so both original branches (each returning None) collapse into one.
        return None
|
|
|
|
|
|
|
|
def is_authenticated(token: str) -> Optional[User]:
    """
    Decode the token and return the user it identifies.

    Args:
        token: The raw credential, optionally prefixed with ``"Bearer "``
            (the prefix is stripped before verification).

    Returns:
        The ``User`` whose ``id`` matches the token's ``sub`` claim, or
        ``None`` when the token is missing, empty, fails verification,
        carries no ``sub`` claim, or matches no user.
    """
    # A missing or empty credential can never authenticate; bail out before
    # calling str methods on it (callers may hand over None for an absent
    # header).
    if not token:
        return None

    bearer_prefix = "Bearer "
    if token.startswith(bearer_prefix):
        token = token[len(bearer_prefix):]
        if not token:
            # The header contained only the scheme and no actual token.
            return None

    payload = verify_token(token)
    if payload is None:
        return None

    user_id: Optional[str] = payload.get("sub")
    if user_id is None:
        return None

    # Hold a reference to the iterator returned by get_db() for the duration
    # of the query. ``next(get_db())`` would drop it immediately, and CPython
    # finalizes an unreferenced generator right away, running whatever
    # cleanup follows its ``yield`` (get_db is not visible here — presumably
    # it closes the session; confirm) before the query executes.
    db_gen = get_db()
    db: Session = next(db_gen)
    try:
        return db.query(User).filter(User.id == user_id).first()
    finally:
        db.close()