Fred808 commited on
Commit
3a41de0
·
verified ·
1 Parent(s): 03ea6dc

Update app/services/auth_service.py

Browse files
Files changed (1) hide show
  1. app/services/auth_service.py +124 -122
app/services/auth_service.py CHANGED
@@ -1,123 +1,125 @@
1
- from datetime import datetime, timedelta
2
- from typing import Optional
3
- from fastapi import Depends, HTTPException, status
4
- from fastapi.security import OAuth2PasswordBearer
5
- from jose import JWTError, jwt
6
- from passlib.context import CryptContext
7
- from ..models.auth import UserProfile, LoginResponse
8
- from ..core.config import settings
9
- from ..db.crud import get_user_by_email
10
-
11
- pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto")
12
- oauth2_scheme = OAuth2PasswordBearer(tokenUrl="api/auth/login")
13
-
14
- async def authenticate_user(email: str, password: str) -> Optional[UserProfile]:
15
- """Authenticate a user and return their profile if credentials are valid."""
16
- user = await get_user_by_email(email)
17
- if not user or not verify_password(password, user.hashed_password):
18
- return None
19
- return UserProfile(
20
- id=user.id,
21
- email=user.email,
22
- full_name=user.full_name,
23
- role=user.role
24
- )
25
-
26
- def verify_password(plain_password: str, hashed_password: str) -> bool:
27
- """Verify a password against its hash."""
28
- return pwd_context.verify(plain_password, hashed_password)
29
-
30
- def get_password_hash(password: str) -> str:
31
- """Generate password hash."""
32
- return pwd_context.hash(password)
33
-
34
- def create_access_token(data: dict) -> str:
35
- """Create a JWT access token."""
36
- to_encode = data.copy()
37
- expire = datetime.utcnow() + timedelta(minutes=settings.ACCESS_TOKEN_EXPIRE_MINUTES)
38
- to_encode.update({"exp": expire})
39
- encoded_jwt = jwt.encode(
40
- to_encode,
41
- settings.SECRET_KEY,
42
- algorithm=settings.ALGORITHM
43
- )
44
- return encoded_jwt
45
-
46
- def create_refresh_token(data: dict) -> str:
47
- """Create a JWT refresh token."""
48
- to_encode = data.copy()
49
- expire = datetime.utcnow() + timedelta(days=settings.REFRESH_TOKEN_EXPIRE_DAYS)
50
- to_encode.update({"exp": expire, "refresh": True})
51
- return jwt.encode(to_encode, settings.SECRET_KEY, algorithm=settings.ALGORITHM)
52
-
53
- async def refresh_access_token(refresh_token: str) -> LoginResponse:
54
- """Create new access token using refresh token."""
55
- try:
56
- payload = jwt.decode(
57
- refresh_token,
58
- settings.SECRET_KEY,
59
- algorithms=[settings.ALGORITHM]
60
- )
61
- if not payload.get("refresh"):
62
- raise HTTPException(
63
- status_code=status.HTTP_400_BAD_REQUEST,
64
- detail="Invalid refresh token"
65
- )
66
- email: str = payload.get("sub")
67
- if email is None:
68
- raise HTTPException(
69
- status_code=status.HTTP_401_UNAUTHORIZED,
70
- detail="Invalid refresh token"
71
- )
72
- user = await get_user_by_email(email)
73
- if not user:
74
- raise HTTPException(
75
- status_code=status.HTTP_401_UNAUTHORIZED,
76
- detail="User not found"
77
- )
78
- access_token = create_access_token({"sub": user.email})
79
- return LoginResponse(
80
- access_token=access_token,
81
- token_type="bearer",
82
- user=UserProfile(
83
- id=user.id,
84
- email=user.email,
85
- full_name=user.full_name,
86
- role=user.role
87
- )
88
- )
89
- except JWTError:
90
- raise HTTPException(
91
- status_code=status.HTTP_401_UNAUTHORIZED,
92
- detail="Invalid refresh token"
93
- )
94
-
95
- async def get_current_user(token: str = Depends(oauth2_scheme)) -> UserProfile:
96
- """Get the current authenticated user from JWT token."""
97
- credentials_exception = HTTPException(
98
- status_code=status.HTTP_401_UNAUTHORIZED,
99
- detail="Could not validate credentials",
100
- headers={"WWW-Authenticate": "Bearer"},
101
- )
102
- try:
103
- payload = jwt.decode(
104
- token,
105
- settings.SECRET_KEY,
106
- algorithms=[settings.ALGORITHM]
107
- )
108
- email: str = payload.get("sub")
109
- if email is None:
110
- raise credentials_exception
111
- except JWTError:
112
- raise credentials_exception
113
-
114
- user = await get_user_by_email(email)
115
- if user is None:
116
- raise credentials_exception
117
-
118
- return UserProfile(
119
- id=user.id,
120
- email=user.email,
121
- full_name=user.full_name,
122
- role=user.role
 
 
123
  )
 
1
+ from datetime import datetime, timedelta
2
+ from typing import Optional
3
+ from fastapi import Depends, HTTPException, status
4
+ from fastapi.security import OAuth2PasswordBearer
5
+ from jose import JWTError, jwt
6
+ from passlib.context import CryptContext
7
+ from ..models.auth import UserProfile, LoginResponse
8
+ from ..core.config import settings
9
+ from ..db.crud import get_user_by_email
10
+ from ..db.database import get_db
11
+ from sqlalchemy.ext.asyncio import AsyncSession
12
+
13
+ pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto")
14
+ oauth2_scheme = OAuth2PasswordBearer(tokenUrl="api/auth/login")
15
+
16
+ async def authenticate_user(email: str, password: str, db: AsyncSession = Depends(get_db)) -> Optional[UserProfile]:
17
+ """Authenticate a user and return their profile if credentials are valid."""
18
+ user = await get_user_by_email(db, email)
19
+ if not user or not verify_password(password, user.hashed_password):
20
+ return None
21
+ return UserProfile(
22
+ id=user.id,
23
+ email=user.email,
24
+ full_name=user.full_name,
25
+ role=user.role
26
+ )
27
+
28
+ def verify_password(plain_password: str, hashed_password: str) -> bool:
29
+ """Verify a password against its hash."""
30
+ return pwd_context.verify(plain_password, hashed_password)
31
+
32
+ def get_password_hash(password: str) -> str:
33
+ """Generate password hash."""
34
+ return pwd_context.hash(password)
35
+
36
+ def create_access_token(data: dict) -> str:
37
+ """Create a JWT access token."""
38
+ to_encode = data.copy()
39
+ expire = datetime.utcnow() + timedelta(minutes=settings.ACCESS_TOKEN_EXPIRE_MINUTES)
40
+ to_encode.update({"exp": expire})
41
+ encoded_jwt = jwt.encode(
42
+ to_encode,
43
+ settings.SECRET_KEY,
44
+ algorithm=settings.ALGORITHM
45
+ )
46
+ return encoded_jwt
47
+
48
+ def create_refresh_token(data: dict) -> str:
49
+ """Create a JWT refresh token."""
50
+ to_encode = data.copy()
51
+ expire = datetime.utcnow() + timedelta(days=settings.REFRESH_TOKEN_EXPIRE_DAYS)
52
+ to_encode.update({"exp": expire, "refresh": True})
53
+ return jwt.encode(to_encode, settings.SECRET_KEY, algorithm=settings.ALGORITHM)
54
+
55
+ async def refresh_access_token(refresh_token: str) -> LoginResponse:
56
+ """Create new access token using refresh token."""
57
+ try:
58
+ payload = jwt.decode(
59
+ refresh_token,
60
+ settings.SECRET_KEY,
61
+ algorithms=[settings.ALGORITHM]
62
+ )
63
+ if not payload.get("refresh"):
64
+ raise HTTPException(
65
+ status_code=status.HTTP_400_BAD_REQUEST,
66
+ detail="Invalid refresh token"
67
+ )
68
+ email: str = payload.get("sub")
69
+ if email is None:
70
+ raise HTTPException(
71
+ status_code=status.HTTP_401_UNAUTHORIZED,
72
+ detail="Invalid refresh token"
73
+ )
74
+ user = await get_user_by_email(email)
75
+ if not user:
76
+ raise HTTPException(
77
+ status_code=status.HTTP_401_UNAUTHORIZED,
78
+ detail="User not found"
79
+ )
80
+ access_token = create_access_token({"sub": user.email})
81
+ return LoginResponse(
82
+ access_token=access_token,
83
+ token_type="bearer",
84
+ user=UserProfile(
85
+ id=user.id,
86
+ email=user.email,
87
+ full_name=user.full_name,
88
+ role=user.role
89
+ )
90
+ )
91
+ except JWTError:
92
+ raise HTTPException(
93
+ status_code=status.HTTP_401_UNAUTHORIZED,
94
+ detail="Invalid refresh token"
95
+ )
96
+
97
+ async def get_current_user(token: str = Depends(oauth2_scheme), db: AsyncSession = Depends(get_db)) -> UserProfile:
98
+ """Get the current authenticated user from JWT token."""
99
+ credentials_exception = HTTPException(
100
+ status_code=status.HTTP_401_UNAUTHORIZED,
101
+ detail="Could not validate credentials",
102
+ headers={"WWW-Authenticate": "Bearer"},
103
+ )
104
+ try:
105
+ payload = jwt.decode(
106
+ token,
107
+ settings.SECRET_KEY,
108
+ algorithms=[settings.ALGORITHM]
109
+ )
110
+ email: str = payload.get("sub")
111
+ if email is None:
112
+ raise credentials_exception
113
+ except JWTError:
114
+ raise credentials_exception
115
+
116
+ user = await get_user_by_email(db, email)
117
+ if user is None:
118
+ raise credentials_exception
119
+
120
+ return UserProfile(
121
+ id=user.id,
122
+ email=user.email,
123
+ full_name=user.full_name,
124
+ role=user.role
125
  )