suhail commited on
Commit
b5e6f76
·
1 Parent(s): 561422e
Files changed (1) hide show
  1. src/core/security.py +62 -72
src/core/security.py CHANGED
@@ -104,71 +104,44 @@
104
  # detail="Invalid token",
105
  # headers={"WWW-Authenticate": "Bearer"}
106
  # )
107
- """
108
- Security utilities for authentication and authorization.
109
- """
110
- from datetime import datetime, timedelta
111
- from typing import Dict, Any
112
 
 
 
113
  import jwt
 
114
  from passlib.context import CryptContext
115
  from fastapi import HTTPException, status, Depends
116
  from fastapi.security import HTTPBearer, HTTPAuthorizationCredentials
117
-
118
  from src.core.config import settings
119
 
120
- # =========================
121
- # Password hashing (bcrypt SAFE)
122
- # =========================
123
-
124
- pwd_context = CryptContext(
125
- schemes=["bcrypt"],
126
- deprecated="auto"
127
- )
128
-
129
  security = HTTPBearer()
130
-
131
  MAX_BCRYPT_BYTES = 72
132
 
133
 
134
- def _bcrypt_safe(password: str) -> bytes:
135
  """
136
- bcrypt supports MAX 72 bytes.
137
- Always truncate BEFORE hashing or verifying.
138
  """
139
- return password.encode("utf-8")[:MAX_BCRYPT_BYTES]
 
 
 
140
 
141
 
142
  def hash_password(password: str) -> str:
143
- if len(password.encode("utf-8")) > 72:
144
- raise ValueError("Password too long (max 72 bytes)")
145
- return pwd_context.hash(password)
146
-
147
-
148
 
149
  def verify_password(plain_password: str, hashed_password: str) -> bool:
150
- """
151
- Verify password safely (NO crashes).
152
- """
153
- try:
154
- return pwd_context.verify(
155
- _bcrypt_safe(plain_password),
156
- hashed_password
157
- )
158
- except Exception:
159
- return False
160
 
161
-
162
- # =========================
163
- # JWT utilities
164
- # =========================
165
-
166
- def create_jwt_token(
167
- user_id: int,
168
- email: str,
169
- secret: str,
170
- expiration_days: int = 7
171
- ) -> str:
172
  now = datetime.utcnow()
173
  payload = {
174
  "sub": str(user_id),
@@ -188,40 +161,57 @@ def verify_jwt_token(token: str, secret: str) -> dict:
188
  algorithms=["HS256"],
189
  options={"verify_exp": True},
190
  )
191
-
192
  if payload.get("iss") != "better-auth":
193
- raise HTTPException(
194
- status_code=status.HTTP_401_UNAUTHORIZED,
195
- detail="Invalid token issuer"
196
- )
197
-
198
  return payload
199
-
200
  except jwt.ExpiredSignatureError:
201
- raise HTTPException(
202
- status_code=status.HTTP_401_UNAUTHORIZED,
203
- detail="Token expired"
204
- )
205
  except jwt.InvalidTokenError:
206
- raise HTTPException(
207
- status_code=status.HTTP_401_UNAUTHORIZED,
208
- detail="Invalid token"
209
- )
210
-
211
 
212
- # =========================
213
- # FastAPI dependency
214
- # =========================
215
 
216
  def get_current_user(
217
  credentials: HTTPAuthorizationCredentials = Depends(security)
218
  ) -> Dict[str, Any]:
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
219
  token = credentials.credentials
220
- payload = verify_jwt_token(token, settings.BETTER_AUTH_SECRET)
221
 
222
- return {
223
- "id": int(payload["sub"]),
224
- "email": payload.get("email"),
225
- "iat": payload.get("iat"),
226
- "exp": payload.get("exp"),
227
- }
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
104
  # detail="Invalid token",
105
  # headers={"WWW-Authenticate": "Bearer"}
106
  # )
 
 
 
 
 
107
 
108
+
109
+ """Security utilities for authentication and authorization."""
110
  import jwt
111
+ from datetime import datetime, timedelta
112
  from passlib.context import CryptContext
113
  from fastapi import HTTPException, status, Depends
114
  from fastapi.security import HTTPBearer, HTTPAuthorizationCredentials
115
+ from typing import Dict, Any
116
  from src.core.config import settings
117
 
118
+ pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto")
 
 
 
 
 
 
 
 
119
  security = HTTPBearer()
120
+ import hashlib
121
  MAX_BCRYPT_BYTES = 72
122
 
123
 
124
+ def _normalize_password(password: str) -> bytes:
125
  """
126
+ bcrypt only supports 72 bytes.
127
+ This guarantees no runtime crash in any environment.
128
  """
129
+ password_bytes = password.encode("utf-8")
130
+ if len(password_bytes) > MAX_BCRYPT_BYTES:
131
+ password_bytes = password_bytes[:MAX_BCRYPT_BYTES]
132
+ return password_bytes
133
 
134
 
135
  def hash_password(password: str) -> str:
136
+ # SHA-256 produces 64 hex chars = 64 bytes < 72-byte limit
137
+ pre_hashed = hashlib.sha256(password.encode("utf-8")).hexdigest()
138
+ return pwd_context.hash(pre_hashed)
 
 
139
 
140
  def verify_password(plain_password: str, hashed_password: str) -> bool:
141
+ pre_hashed = hashlib.sha256(plain_password.encode("utf-8")).hexdigest()
142
+ return pwd_context.verify(pre_hashed, hashed_password)
 
 
 
 
 
 
 
 
143
 
144
+ def create_jwt_token(user_id: int, email: str, secret: str, expiration_days: int = 7) -> str:
 
 
 
 
 
 
 
 
 
 
145
  now = datetime.utcnow()
146
  payload = {
147
  "sub": str(user_id),
 
161
  algorithms=["HS256"],
162
  options={"verify_exp": True},
163
  )
 
164
  if payload.get("iss") != "better-auth":
165
+ raise HTTPException(status_code=401, detail="Invalid token issuer")
 
 
 
 
166
  return payload
 
167
  except jwt.ExpiredSignatureError:
168
+ raise HTTPException(status_code=401, detail="Token expired")
 
 
 
169
  except jwt.InvalidTokenError:
170
+ raise HTTPException(status_code=401, detail="Invalid token")
 
 
 
 
171
 
 
 
 
172
 
173
  def get_current_user(
174
  credentials: HTTPAuthorizationCredentials = Depends(security)
175
  ) -> Dict[str, Any]:
176
+ """
177
+ FastAPI dependency to extract and validate JWT token from Authorization header.
178
+
179
+ Args:
180
+ credentials: HTTP Bearer token credentials from request header
181
+
182
+ Returns:
183
+ Dictionary containing user information from token payload:
184
+ - id: User ID (parsed from 'sub' claim)
185
+ - email: User email
186
+ - iat: Token issued at timestamp
187
+ - exp: Token expiration timestamp
188
+
189
+ Raises:
190
+ HTTPException 401: If token is missing, invalid, or expired
191
+ """
192
  token = credentials.credentials
 
193
 
194
+ try:
195
+ payload = verify_jwt_token(token, settings.BETTER_AUTH_SECRET)
196
+
197
+ # Extract user ID from 'sub' claim and convert to integer
198
+ user_id = int(payload.get("sub"))
199
+
200
+ return {
201
+ "id": user_id,
202
+ "email": payload.get("email"),
203
+ "iat": payload.get("iat"),
204
+ "exp": payload.get("exp")
205
+ }
206
+ except ValueError:
207
+ raise HTTPException(
208
+ status_code=status.HTTP_401_UNAUTHORIZED,
209
+ detail="Invalid user ID in token",
210
+ headers={"WWW-Authenticate": "Bearer"}
211
+ )
212
+ except Exception as e:
213
+ raise HTTPException(
214
+ status_code=status.HTTP_401_UNAUTHORIZED,
215
+ detail=f"Authentication failed: {str(e)}",
216
+ headers={"WWW-Authenticate": "Bearer"}
217
+ )