suhail commited on
Commit
9d2dad3
·
1 Parent(s): 687894b

security.py

Browse files
Files changed (1) hide show
  1. src/core/security.py +60 -38
src/core/security.py CHANGED
@@ -105,44 +105,64 @@
105
  # headers={"WWW-Authenticate": "Bearer"}
106
  # )
107
 
 
 
 
108
 
 
 
109
 
110
- """Security utilities for authentication and authorization."""
111
  import jwt
112
- from datetime import datetime, timedelta
113
  from passlib.context import CryptContext
114
  from fastapi import HTTPException, status, Depends
115
  from fastapi.security import HTTPBearer, HTTPAuthorizationCredentials
116
- from typing import Dict, Any
117
  from src.core.config import settings
118
 
 
 
 
 
119
  pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto")
120
  security = HTTPBearer()
121
- import hashlib
122
  MAX_BCRYPT_BYTES = 72
123
 
124
 
125
- def _normalize_password(password: str) -> bytes:
126
  """
127
  bcrypt only supports 72 bytes.
128
- This guarantees no runtime crash in any environment.
129
  """
130
- password_bytes = password.encode("utf-8")
131
- if len(password_bytes) > MAX_BCRYPT_BYTES:
132
- password_bytes = password_bytes[:MAX_BCRYPT_BYTES]
133
- return password_bytes
134
 
135
 
136
  def hash_password(password: str) -> str:
137
- # SHA-256 produces 64 hex chars = 64 bytes < 72-byte limit
138
- pre_hashed = hashlib.sha256(password.encode("utf-8")).hexdigest()
139
- return pwd_context.hash(pre_hashed)
 
140
 
141
- def verify_password(plain_password: str, hashed_password: str) -> bool:
142
- pre_hashed = hashlib.sha256(plain_password.encode("utf-8")).hexdigest()
143
- return pwd_context.verify(pre_hashed, hashed_password)
144
 
145
- def create_jwt_token(user_id: int, email: str, secret: str, expiration_days: int = 7) -> str:
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
146
  now = datetime.utcnow()
147
  payload = {
148
  "sub": str(user_id),
@@ -162,57 +182,59 @@ def verify_jwt_token(token: str, secret: str) -> dict:
162
  algorithms=["HS256"],
163
  options={"verify_exp": True},
164
  )
 
165
  if payload.get("iss") != "better-auth":
166
- raise HTTPException(status_code=401, detail="Invalid token issuer")
 
 
 
 
167
  return payload
 
168
  except jwt.ExpiredSignatureError:
169
- raise HTTPException(status_code=401, detail="Token expired")
 
 
 
170
  except jwt.InvalidTokenError:
171
- raise HTTPException(status_code=401, detail="Invalid token")
 
 
 
172
 
 
 
 
173
 
174
  def get_current_user(
175
  credentials: HTTPAuthorizationCredentials = Depends(security)
176
  ) -> Dict[str, Any]:
177
  """
178
- FastAPI dependency to extract and validate JWT token from Authorization header.
179
-
180
- Args:
181
- credentials: HTTP Bearer token credentials from request header
182
-
183
- Returns:
184
- Dictionary containing user information from token payload:
185
- - id: User ID (parsed from 'sub' claim)
186
- - email: User email
187
- - iat: Token issued at timestamp
188
- - exp: Token expiration timestamp
189
-
190
- Raises:
191
- HTTPException 401: If token is missing, invalid, or expired
192
  """
193
  token = credentials.credentials
194
 
195
  try:
196
  payload = verify_jwt_token(token, settings.BETTER_AUTH_SECRET)
197
 
198
- # Extract user ID from 'sub' claim and convert to integer
199
  user_id = int(payload.get("sub"))
200
 
201
  return {
202
  "id": user_id,
203
  "email": payload.get("email"),
204
  "iat": payload.get("iat"),
205
- "exp": payload.get("exp")
206
  }
 
207
  except ValueError:
208
  raise HTTPException(
209
  status_code=status.HTTP_401_UNAUTHORIZED,
210
  detail="Invalid user ID in token",
211
- headers={"WWW-Authenticate": "Bearer"}
212
  )
213
  except Exception as e:
214
  raise HTTPException(
215
  status_code=status.HTTP_401_UNAUTHORIZED,
216
  detail=f"Authentication failed: {str(e)}",
217
- headers={"WWW-Authenticate": "Bearer"}
218
  )
 
105
  # headers={"WWW-Authenticate": "Bearer"}
106
  # )
107
 
108
+ """
109
+ Security utilities for authentication and authorization.
110
+ """
111
 
112
+ from datetime import datetime, timedelta
113
+ from typing import Dict, Any
114
 
 
115
  import jwt
 
116
  from passlib.context import CryptContext
117
  from fastapi import HTTPException, status, Depends
118
  from fastapi.security import HTTPBearer, HTTPAuthorizationCredentials
119
+
120
  from src.core.config import settings
121
 
122
+ # =========================
123
+ # Password hashing (bcrypt-safe)
124
+ # =========================
125
+
126
  pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto")
127
  security = HTTPBearer()
128
+
129
  MAX_BCRYPT_BYTES = 72
130
 
131
 
132
+ def _bcrypt_safe(password: str) -> bytes:
133
  """
134
  bcrypt only supports 72 bytes.
135
+ Truncate safely to avoid runtime crashes.
136
  """
137
+ return password.encode("utf-8")[:MAX_BCRYPT_BYTES]
 
 
 
138
 
139
 
140
  def hash_password(password: str) -> str:
141
+ """
142
+ Hash a password using bcrypt (safe for all environments).
143
+ """
144
+ return pwd_context.hash(_bcrypt_safe(password))
145
 
 
 
 
146
 
147
+ def verify_password(plain_password: str, hashed_password: str) -> bool:
148
+ """
149
+ Verify a password against its bcrypt hash.
150
+ """
151
+ return pwd_context.verify(
152
+ _bcrypt_safe(plain_password),
153
+ hashed_password
154
+ )
155
+
156
+ # =========================
157
+ # JWT utilities
158
+ # =========================
159
+
160
+ def create_jwt_token(
161
+ user_id: int,
162
+ email: str,
163
+ secret: str,
164
+ expiration_days: int = 7
165
+ ) -> str:
166
  now = datetime.utcnow()
167
  payload = {
168
  "sub": str(user_id),
 
182
  algorithms=["HS256"],
183
  options={"verify_exp": True},
184
  )
185
+
186
  if payload.get("iss") != "better-auth":
187
+ raise HTTPException(
188
+ status_code=status.HTTP_401_UNAUTHORIZED,
189
+ detail="Invalid token issuer"
190
+ )
191
+
192
  return payload
193
+
194
  except jwt.ExpiredSignatureError:
195
+ raise HTTPException(
196
+ status_code=status.HTTP_401_UNAUTHORIZED,
197
+ detail="Token expired"
198
+ )
199
  except jwt.InvalidTokenError:
200
+ raise HTTPException(
201
+ status_code=status.HTTP_401_UNAUTHORIZED,
202
+ detail="Invalid token"
203
+ )
204
 
205
+ # =========================
206
+ # FastAPI dependency
207
+ # =========================
208
 
209
  def get_current_user(
210
  credentials: HTTPAuthorizationCredentials = Depends(security)
211
  ) -> Dict[str, Any]:
212
  """
213
+ Extract and validate JWT token from Authorization header.
 
 
 
 
 
 
 
 
 
 
 
 
 
214
  """
215
  token = credentials.credentials
216
 
217
  try:
218
  payload = verify_jwt_token(token, settings.BETTER_AUTH_SECRET)
219
 
 
220
  user_id = int(payload.get("sub"))
221
 
222
  return {
223
  "id": user_id,
224
  "email": payload.get("email"),
225
  "iat": payload.get("iat"),
226
+ "exp": payload.get("exp"),
227
  }
228
+
229
  except ValueError:
230
  raise HTTPException(
231
  status_code=status.HTTP_401_UNAUTHORIZED,
232
  detail="Invalid user ID in token",
233
+ headers={"WWW-Authenticate": "Bearer"},
234
  )
235
  except Exception as e:
236
  raise HTTPException(
237
  status_code=status.HTTP_401_UNAUTHORIZED,
238
  detail=f"Authentication failed: {str(e)}",
239
+ headers={"WWW-Authenticate": "Bearer"},
240
  )