suhail commited on
Commit
687894b
·
1 Parent(s): 44da760

security.py

Browse files
Files changed (1) hide show
  1. src/core/security.py +52 -126
src/core/security.py CHANGED
@@ -107,138 +107,17 @@
107
 
108
 
109
 
110
- # """Security utilities for authentication and authorization."""
111
- # import jwt
112
- # from datetime import datetime, timedelta
113
- # from passlib.context import CryptContext
114
- # from fastapi import HTTPException, status, Depends
115
- # from fastapi.security import HTTPBearer, HTTPAuthorizationCredentials
116
- # from typing import Dict, Any
117
- # from src.core.config import settings
118
-
119
- # pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto")
120
- # security = HTTPBearer()
121
- # import hashlib
122
- # MAX_BCRYPT_BYTES = 72
123
-
124
-
125
- # def _normalize_password(password: str) -> bytes:
126
- # """
127
- # bcrypt only supports 72 bytes.
128
- # This guarantees no runtime crash in any environment.
129
- # """
130
- # password_bytes = password.encode("utf-8")
131
- # if len(password_bytes) > MAX_BCRYPT_BYTES:
132
- # password_bytes = password_bytes[:MAX_BCRYPT_BYTES]
133
- # return password_bytes
134
-
135
-
136
- # def hash_password(password: str) -> str:
137
- # # SHA-256 produces 64 hex chars = 64 bytes < 72-byte limit
138
- # pre_hashed = hashlib.sha256(password.encode("utf-8")).hexdigest()
139
- # return pwd_context.hash(pre_hashed)
140
-
141
- # def verify_password(plain_password: str, hashed_password: str) -> bool:
142
- # pre_hashed = hashlib.sha256(plain_password.encode("utf-8")).hexdigest()
143
- # return pwd_context.verify(pre_hashed, hashed_password)
144
-
145
- # def create_jwt_token(user_id: int, email: str, secret: str, expiration_days: int = 7) -> str:
146
- # now = datetime.utcnow()
147
- # payload = {
148
- # "sub": str(user_id),
149
- # "email": email,
150
- # "iat": now,
151
- # "exp": now + timedelta(days=expiration_days),
152
- # "iss": "better-auth",
153
- # }
154
- # return jwt.encode(payload, secret, algorithm="HS256")
155
-
156
-
157
- # def verify_jwt_token(token: str, secret: str) -> dict:
158
- # try:
159
- # payload = jwt.decode(
160
- # token,
161
- # secret,
162
- # algorithms=["HS256"],
163
- # options={"verify_exp": True},
164
- # )
165
- # if payload.get("iss") != "better-auth":
166
- # raise HTTPException(status_code=401, detail="Invalid token issuer")
167
- # return payload
168
- # except jwt.ExpiredSignatureError:
169
- # raise HTTPException(status_code=401, detail="Token expired")
170
- # except jwt.InvalidTokenError:
171
- # raise HTTPException(status_code=401, detail="Invalid token")
172
-
173
-
174
- # def get_current_user(
175
- # credentials: HTTPAuthorizationCredentials = Depends(security)
176
- # ) -> Dict[str, Any]:
177
- # """
178
- # FastAPI dependency to extract and validate JWT token from Authorization header.
179
-
180
- # Args:
181
- # credentials: HTTP Bearer token credentials from request header
182
-
183
- # Returns:
184
- # Dictionary containing user information from token payload:
185
- # - id: User ID (parsed from 'sub' claim)
186
- # - email: User email
187
- # - iat: Token issued at timestamp
188
- # - exp: Token expiration timestamp
189
-
190
- # Raises:
191
- # HTTPException 401: If token is missing, invalid, or expired
192
- # """
193
- # token = credentials.credentials
194
-
195
- # try:
196
- # payload = verify_jwt_token(token, settings.BETTER_AUTH_SECRET)
197
-
198
- # # Extract user ID from 'sub' claim and convert to integer
199
- # user_id = int(payload.get("sub"))
200
-
201
- # return {
202
- # "id": user_id,
203
- # "email": payload.get("email"),
204
- # "iat": payload.get("iat"),
205
- # "exp": payload.get("exp")
206
- # }
207
- # except ValueError:
208
- # raise HTTPException(
209
- # status_code=status.HTTP_401_UNAUTHORIZED,
210
- # detail="Invalid user ID in token",
211
- # headers={"WWW-Authenticate": "Bearer"}
212
- # )
213
- # except Exception as e:
214
- # raise HTTPException(
215
- # status_code=status.HTTP_401_UNAUTHORIZED,
216
- # detail=f"Authentication failed: {str(e)}",
217
- # headers={"WWW-Authenticate": "Bearer"}
218
- # )
219
-
220
-
221
-
222
-
223
-
224
-
225
-
226
-
227
-
228
-
229
-
230
-
231
-
232
-
233
- # //////////////////////////////////////////////////////////////
234
-
235
  """Security utilities for authentication and authorization."""
236
  import jwt
237
  from datetime import datetime, timedelta
238
  from passlib.context import CryptContext
239
- from fastapi import HTTPException, status
 
 
 
240
 
241
  pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto")
 
242
  import hashlib
243
  MAX_BCRYPT_BYTES = 72
244
 
@@ -290,3 +169,50 @@ def verify_jwt_token(token: str, secret: str) -> dict:
290
  raise HTTPException(status_code=401, detail="Token expired")
291
  except jwt.InvalidTokenError:
292
  raise HTTPException(status_code=401, detail="Invalid token")
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
107
 
108
 
109
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
110
  """Security utilities for authentication and authorization."""
111
  import jwt
112
  from datetime import datetime, timedelta
113
  from passlib.context import CryptContext
114
+ from fastapi import HTTPException, status, Depends
115
+ from fastapi.security import HTTPBearer, HTTPAuthorizationCredentials
116
+ from typing import Dict, Any
117
+ from src.core.config import settings
118
 
119
  pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto")
120
+ security = HTTPBearer()
121
  import hashlib
122
  MAX_BCRYPT_BYTES = 72
123
 
 
169
  raise HTTPException(status_code=401, detail="Token expired")
170
  except jwt.InvalidTokenError:
171
  raise HTTPException(status_code=401, detail="Invalid token")
172
+
173
+
174
+ def get_current_user(
175
+ credentials: HTTPAuthorizationCredentials = Depends(security)
176
+ ) -> Dict[str, Any]:
177
+ """
178
+ FastAPI dependency to extract and validate JWT token from Authorization header.
179
+
180
+ Args:
181
+ credentials: HTTP Bearer token credentials from request header
182
+
183
+ Returns:
184
+ Dictionary containing user information from token payload:
185
+ - id: User ID (parsed from 'sub' claim)
186
+ - email: User email
187
+ - iat: Token issued at timestamp
188
+ - exp: Token expiration timestamp
189
+
190
+ Raises:
191
+ HTTPException 401: If token is missing, invalid, or expired
192
+ """
193
+ token = credentials.credentials
194
+
195
+ try:
196
+ payload = verify_jwt_token(token, settings.BETTER_AUTH_SECRET)
197
+
198
+ # Extract user ID from 'sub' claim and convert to integer
199
+ user_id = int(payload.get("sub"))
200
+
201
+ return {
202
+ "id": user_id,
203
+ "email": payload.get("email"),
204
+ "iat": payload.get("iat"),
205
+ "exp": payload.get("exp")
206
+ }
207
+ except ValueError:
208
+ raise HTTPException(
209
+ status_code=status.HTTP_401_UNAUTHORIZED,
210
+ detail="Invalid user ID in token",
211
+ headers={"WWW-Authenticate": "Bearer"}
212
+ )
213
+ except Exception as e:
214
+ raise HTTPException(
215
+ status_code=status.HTTP_401_UNAUTHORIZED,
216
+ detail=f"Authentication failed: {str(e)}",
217
+ headers={"WWW-Authenticate": "Bearer"}
218
+ )