Ali2206 commited on
Commit
081d154
·
verified ·
1 Parent(s): 51ace9a

Update core/security.py

Browse files
Files changed (1) hide show
  1. core/security.py +45 -68
core/security.py CHANGED
@@ -1,80 +1,57 @@
1
  from datetime import datetime, timedelta
2
- from fastapi import HTTPException, status, Depends, Request
3
- from fastapi.security import OAuth2PasswordBearer
4
  from jose import jwt, JWTError
5
- from pydantic import BaseModel
6
- from typing import Optional
 
 
7
  import logging
8
- from core.config import settings
9
 
10
  logger = logging.getLogger(__name__)
11
 
12
- oauth2_scheme = OAuth2PasswordBearer(
13
- tokenUrl=f"{settings.API_V1_STR}/auth/login",
14
- scheme_name="JWT"
15
- )
16
 
17
- class TokenPayload(BaseModel):
18
- sub: str
19
- exp: int
20
- iat: int
21
- role: Optional[str] = None
22
 
23
- def create_access_token(*, data: dict, expires_delta: timedelta = None) -> str:
24
- try:
25
- to_encode = data.copy()
26
- expire = datetime.utcnow() + (expires_delta or timedelta(minutes=settings.ACCESS_TOKEN_EXPIRE_MINUTES))
27
- to_encode.update({"exp": expire, "iat": datetime.utcnow()})
28
- encoded_jwt = jwt.encode(
29
- to_encode,
30
- settings.SECRET_KEY,
31
- algorithm=settings.ALGORITHM
32
- )
33
- logger.info(f"Token created for {data.get('sub')}")
34
- return encoded_jwt
35
- except Exception as e:
36
- logger.error(f"Token creation failed: {str(e)}")
37
- raise HTTPException(
38
- status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
39
- detail="Token creation failed"
40
- )
41
 
42
- async def verify_token(token: str = Depends(oauth2_scheme)) -> TokenPayload:
43
- credentials_exception = HTTPException(
44
- status_code=status.HTTP_401_UNAUTHORIZED,
45
- detail="Could not validate credentials",
46
- headers={"WWW-Authenticate": "Bearer"},
47
- )
48
-
49
- try:
50
- if not token:
51
- logger.error("Empty token provided")
52
- raise credentials_exception
53
 
54
- if token.startswith("Bearer "):
55
- token = token[7:]
56
- logger.debug("Removed 'Bearer ' prefix from token")
57
 
58
- payload = jwt.decode(
59
- token,
60
- settings.SECRET_KEY,
61
- algorithms=[settings.ALGORITHM]
62
- )
63
- token_data = TokenPayload(**payload)
64
-
65
- if datetime.fromtimestamp(token_data.exp) < datetime.utcnow():
66
- logger.error("Token expired")
67
- raise HTTPException(
68
- status_code=status.HTTP_401_UNAUTHORIZED,
69
- detail="Token expired"
70
- )
71
-
72
- logger.info(f"Token verified for {token_data.sub}")
73
- return token_data
74
-
75
  except JWTError as e:
76
- logger.error(f"JWT validation failed: {str(e)}")
77
- raise credentials_exception
78
- except Exception as e:
79
- logger.error(f"Unexpected token verification error: {str(e)}")
80
- raise credentials_exception
 
 
 
 
 
1
  from datetime import datetime, timedelta
2
+ from passlib.context import CryptContext
 
3
  from jose import jwt, JWTError
4
+ from fastapi import Depends, HTTPException, status
5
+ from fastapi.security import OAuth2PasswordBearer
6
+ from core.config import SECRET_KEY, ALGORITHM, ACCESS_TOKEN_EXPIRE_MINUTES
7
+ from db.mongo import users_collection
8
  import logging
 
9
 
10
  logger = logging.getLogger(__name__)
11
 
12
+ # OAuth2 setup
13
+ oauth2_scheme = OAuth2PasswordBearer(tokenUrl="/auth/login")
 
 
14
 
15
+ # Password hashing context
16
+ pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto")
 
 
 
17
 
18
+ # Hash a plain password
19
+ def hash_password(password: str) -> str:
20
+ return pwd_context.hash(password)
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
21
 
22
+ # Verify a plain password against the hash
23
+ def verify_password(plain: str, hashed: str) -> bool:
24
+ return pwd_context.verify(plain, hashed)
25
+
26
+ # Create a JWT access token
27
+ def create_access_token(data: dict, expires_delta: timedelta = None):
28
+ to_encode = data.copy()
29
+ expire = datetime.utcnow() + (expires_delta or timedelta(minutes=ACCESS_TOKEN_EXPIRE_MINUTES))
30
+ to_encode.update({"exp": expire})
31
+ return jwt.encode(to_encode, SECRET_KEY, algorithm=ALGORITHM)
 
32
 
33
+ # Get the current user from the JWT token
34
+ async def get_current_user(token: str = Depends(oauth2_scheme)):
35
+ print("🔐 Token received:", token)
36
 
37
+ if not token:
38
+ print("❌ No token received")
39
+ raise HTTPException(status_code=401, detail="No token provided")
40
+
41
+ try:
42
+ payload = jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM])
43
+ print("🧠 Token payload:", payload)
44
+
45
+ email = payload.get("sub")
46
+ if not email:
47
+ raise HTTPException(status_code=401, detail="Invalid token: missing subject")
 
 
 
 
 
 
48
  except JWTError as e:
49
+ print("JWT decode error:", str(e))
50
+ raise HTTPException(status_code=401, detail="Could not validate token")
51
+
52
+ user = await users_collection.find_one({"email": email})
53
+ if not user:
54
+ raise HTTPException(status_code=404, detail="User not found")
55
+
56
+ print("✅ Authenticated user:", user["email"])
57
+ return user