suhail commited on
Commit
12ccf36
·
1 Parent(s): 62b3ff8
Files changed (1) hide show
  1. src/core/security.py +131 -68
src/core/security.py CHANGED
@@ -1,106 +1,169 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
  """Security utilities for authentication and authorization."""
2
  import jwt
3
  from datetime import datetime, timedelta
4
  from passlib.context import CryptContext
5
  from fastapi import HTTPException, status
6
- from typing import Optional
7
 
8
- # Password hashing context
9
  pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto")
10
 
11
-
12
- def hash_password(password: str) -> str:
13
- if len(password.encode("utf-8")) > 72:
14
- raise HTTPException(
15
- status_code=status.HTTP_400_BAD_REQUEST,
16
- detail="Password must be at most 72 characters"
17
- )
18
- return pwd_context.hash(password)
19
 
20
 
21
- def verify_password(plain_password: str, hashed_password: str) -> bool:
22
  """
23
- Verify a password against its hash.
 
 
 
 
 
 
24
 
25
- Args:
26
- plain_password: Plain text password to verify
27
- hashed_password: Hashed password to compare against
28
 
29
- Returns:
30
- True if password matches, False otherwise
31
- """
32
- return pwd_context.verify(plain_password, hashed_password)
33
 
34
 
35
- def create_jwt_token(user_id: int, email: str, secret: str, expiration_days: int = 7) -> str:
36
- """
37
- Create a JWT token for a user.
 
 
38
 
39
- Args:
40
- user_id: User's unique identifier
41
- email: User's email address
42
- secret: Secret key for signing the token
43
- expiration_days: Number of days until token expires (default: 7)
44
 
45
- Returns:
46
- Encoded JWT token string
47
- """
48
  now = datetime.utcnow()
49
  payload = {
50
  "sub": str(user_id),
51
  "email": email,
52
  "iat": now,
53
  "exp": now + timedelta(days=expiration_days),
54
- "iss": "better-auth"
55
  }
56
  return jwt.encode(payload, secret, algorithm="HS256")
57
 
58
 
59
  def verify_jwt_token(token: str, secret: str) -> dict:
60
- """
61
- Verify and decode a JWT token.
62
-
63
- Args:
64
- token: JWT token string to verify
65
- secret: Secret key used to sign the token
66
-
67
- Returns:
68
- Decoded token payload as dictionary
69
-
70
- Raises:
71
- HTTPException: 401 if token is expired or invalid
72
- """
73
  try:
74
  payload = jwt.decode(
75
  token,
76
  secret,
77
  algorithms=["HS256"],
78
- options={
79
- "verify_signature": True,
80
- "verify_exp": True,
81
- "require": ["sub", "email", "iat", "exp", "iss"]
82
- }
83
  )
84
-
85
- # Validate issuer
86
  if payload.get("iss") != "better-auth":
87
- raise HTTPException(
88
- status_code=status.HTTP_401_UNAUTHORIZED,
89
- detail="Invalid token issuer",
90
- headers={"WWW-Authenticate": "Bearer"}
91
- )
92
-
93
  return payload
94
-
95
  except jwt.ExpiredSignatureError:
96
- raise HTTPException(
97
- status_code=status.HTTP_401_UNAUTHORIZED,
98
- detail="Token has expired",
99
- headers={"WWW-Authenticate": "Bearer"}
100
- )
101
  except jwt.InvalidTokenError:
102
- raise HTTPException(
103
- status_code=status.HTTP_401_UNAUTHORIZED,
104
- detail="Invalid token",
105
- headers={"WWW-Authenticate": "Bearer"}
106
- )
 
1
+ # """Security utilities for authentication and authorization."""
2
+ # import jwt
3
+ # from datetime import datetime, timedelta
4
+ # from passlib.context import CryptContext
5
+ # from fastapi import HTTPException, status
6
+ # from typing import Optional
7
+
8
+ # # Password hashing context
9
+ # pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto")
10
+
11
+
12
+ # def hash_password(password: str) -> str:
13
+ # if len(password.encode("utf-8")) > 72:
14
+ # raise HTTPException(
15
+ # status_code=status.HTTP_400_BAD_REQUEST,
16
+ # detail="Password must be at most 72 characters"
17
+ # )
18
+ # return pwd_context.hash(password)
19
+
20
+
21
+ # def verify_password(plain_password: str, hashed_password: str) -> bool:
22
+ # """
23
+ # Verify a password against its hash.
24
+
25
+ # Args:
26
+ # plain_password: Plain text password to verify
27
+ # hashed_password: Hashed password to compare against
28
+
29
+ # Returns:
30
+ # True if password matches, False otherwise
31
+ # """
32
+ # return pwd_context.verify(plain_password, hashed_password)
33
+
34
+
35
+ # def create_jwt_token(user_id: int, email: str, secret: str, expiration_days: int = 7) -> str:
36
+ # """
37
+ # Create a JWT token for a user.
38
+
39
+ # Args:
40
+ # user_id: User's unique identifier
41
+ # email: User's email address
42
+ # secret: Secret key for signing the token
43
+ # expiration_days: Number of days until token expires (default: 7)
44
+
45
+ # Returns:
46
+ # Encoded JWT token string
47
+ # """
48
+ # now = datetime.utcnow()
49
+ # payload = {
50
+ # "sub": str(user_id),
51
+ # "email": email,
52
+ # "iat": now,
53
+ # "exp": now + timedelta(days=expiration_days),
54
+ # "iss": "better-auth"
55
+ # }
56
+ # return jwt.encode(payload, secret, algorithm="HS256")
57
+
58
+
59
+ # def verify_jwt_token(token: str, secret: str) -> dict:
60
+ # """
61
+ # Verify and decode a JWT token.
62
+
63
+ # Args:
64
+ # token: JWT token string to verify
65
+ # secret: Secret key used to sign the token
66
+
67
+ # Returns:
68
+ # Decoded token payload as dictionary
69
+
70
+ # Raises:
71
+ # HTTPException: 401 if token is expired or invalid
72
+ # """
73
+ # try:
74
+ # payload = jwt.decode(
75
+ # token,
76
+ # secret,
77
+ # algorithms=["HS256"],
78
+ # options={
79
+ # "verify_signature": True,
80
+ # "verify_exp": True,
81
+ # "require": ["sub", "email", "iat", "exp", "iss"]
82
+ # }
83
+ # )
84
+
85
+ # # Validate issuer
86
+ # if payload.get("iss") != "better-auth":
87
+ # raise HTTPException(
88
+ # status_code=status.HTTP_401_UNAUTHORIZED,
89
+ # detail="Invalid token issuer",
90
+ # headers={"WWW-Authenticate": "Bearer"}
91
+ # )
92
+
93
+ # return payload
94
+
95
+ # except jwt.ExpiredSignatureError:
96
+ # raise HTTPException(
97
+ # status_code=status.HTTP_401_UNAUTHORIZED,
98
+ # detail="Token has expired",
99
+ # headers={"WWW-Authenticate": "Bearer"}
100
+ # )
101
+ # except jwt.InvalidTokenError:
102
+ # raise HTTPException(
103
+ # status_code=status.HTTP_401_UNAUTHORIZED,
104
+ # detail="Invalid token",
105
+ # headers={"WWW-Authenticate": "Bearer"}
106
+ # )
107
+
108
+
109
+
110
  """Security utilities for authentication and authorization."""
111
  import jwt
112
  from datetime import datetime, timedelta
113
  from passlib.context import CryptContext
114
  from fastapi import HTTPException, status
 
115
 
 
116
  pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto")
117
 
118
+ MAX_BCRYPT_BYTES = 72
 
 
 
 
 
 
 
119
 
120
 
121
+ def _normalize_password(password: str) -> bytes:
122
  """
123
+ bcrypt only supports 72 bytes.
124
+ This guarantees no runtime crash in any environment.
125
+ """
126
+ password_bytes = password.encode("utf-8")
127
+ if len(password_bytes) > MAX_BCRYPT_BYTES:
128
+ password_bytes = password_bytes[:MAX_BCRYPT_BYTES]
129
+ return password_bytes
130
 
 
 
 
131
 
132
+ def hash_password(password: str) -> str:
133
+ return pwd_context.hash(_normalize_password(password))
 
 
134
 
135
 
136
+ def verify_password(plain_password: str, hashed_password: str) -> bool:
137
+ return pwd_context.verify(
138
+ _normalize_password(plain_password),
139
+ hashed_password,
140
+ )
141
 
 
 
 
 
 
142
 
143
+ def create_jwt_token(user_id: int, email: str, secret: str, expiration_days: int = 7) -> str:
 
 
144
  now = datetime.utcnow()
145
  payload = {
146
  "sub": str(user_id),
147
  "email": email,
148
  "iat": now,
149
  "exp": now + timedelta(days=expiration_days),
150
+ "iss": "better-auth",
151
  }
152
  return jwt.encode(payload, secret, algorithm="HS256")
153
 
154
 
155
  def verify_jwt_token(token: str, secret: str) -> dict:
 
 
 
 
 
 
 
 
 
 
 
 
 
156
  try:
157
  payload = jwt.decode(
158
  token,
159
  secret,
160
  algorithms=["HS256"],
161
+ options={"verify_exp": True},
 
 
 
 
162
  )
 
 
163
  if payload.get("iss") != "better-auth":
164
+ raise HTTPException(status_code=401, detail="Invalid token issuer")
 
 
 
 
 
165
  return payload
 
166
  except jwt.ExpiredSignatureError:
167
+ raise HTTPException(status_code=401, detail="Token expired")
 
 
 
 
168
  except jwt.InvalidTokenError:
169
+ raise HTTPException(status_code=401, detail="Invalid token")