Spaces:
Sleeping
Sleeping
| # from fastapi import Depends, HTTPException, status | |
| # from jose import JWTError, jwt | |
| # from datetime import datetime, timedelta | |
| # from typing import Optional | |
| # from fastapi.security import OAuth2PasswordBearer | |
| # from passlib.context import CryptContext | |
| # # For demo purposes, using simple keys | |
| # SECRET_KEY = "hbjwdcgvcdjsavbcjkv" | |
| # ALGORITHM = "HS256" | |
| # ACCESS_TOKEN_EXPIRE_MINUTES = 30 | |
| # pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto") | |
| # oauth2_scheme = OAuth2PasswordBearer(tokenUrl="login") | |
| # def verify_password(plain_password, hashed_password): | |
| # return pwd_context.verify(plain_password, hashed_password) | |
| # def get_password_hash(password): | |
| # return pwd_context.hash(password) | |
| # def create_access_token(data: dict, expires_delta: Optional[timedelta] = None): | |
| # to_encode = data.copy() | |
| # expire = datetime.utcnow() + (expires_delta or timedelta(minutes=15)) | |
| # to_encode.update({"exp": expire}) | |
| # return jwt.encode(to_encode, SECRET_KEY, algorithm=ALGORITHM) | |
| # async def get_current_user(token: str = Depends(oauth2_scheme)): | |
| # try: | |
| # payload = jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM]) | |
| # user_id: str = payload.get("sub") | |
| # if not user_id: | |
| # raise HTTPException(status_code=401, detail="Invalid token") | |
| # return user_id | |
| # except JWTError: | |
| # raise HTTPException(status_code=401, detail="Invalid token") | |
import os
from datetime import datetime, timedelta, timezone
from typing import Optional

from fastapi import Depends, HTTPException, status
from fastapi.security import OAuth2PasswordBearer
from jose import JWTError, jwt
from passlib.context import CryptContext
# --- Token-signing configuration ---------------------------------------------
# The signing key is read from the JWT_SECRET_KEY environment variable when it
# is set to a non-empty value. The literal below is a demo-only fallback that
# keeps existing deployments working unchanged; it lives in source control and
# must not be relied on in production.
SECRET_KEY = os.environ.get("JWT_SECRET_KEY") or "hbjwdcgvcdjsavbcjkv"
# Algorithm passed to jwt.encode / jwt.decode.
ALGORITHM = "HS256"
# Default access-token lifetime, in minutes.
ACCESS_TOKEN_EXPIRE_MINUTES = 30
# Password-hashing context shared by get_password_hash / verify_password.
# argon2 is the only configured scheme.
pwd_context = CryptContext(schemes=["argon2"], deprecated="auto")

# Dependency that supplies the bearer token to route handlers; "login" is the
# URL advertised as the token endpoint.
oauth2_scheme = OAuth2PasswordBearer(tokenUrl="login")
def get_password_hash(password: str) -> str:
    """Hash a plaintext password with the module-level ``pwd_context``.

    Args:
        password: The plaintext password to hash.

    Returns:
        The value returned by ``pwd_context.hash``.
    """
    hashed = pwd_context.hash(password)
    return hashed
def verify_password(plain_password: str, hashed_password: str) -> bool:
    """Check a plaintext password against a previously stored hash.

    Args:
        plain_password: The password as supplied by the user.
        hashed_password: The stored hash to compare against.

    Returns:
        The result of ``pwd_context.verify`` for the two values.
    """
    matches = pwd_context.verify(plain_password, hashed_password)
    return matches
def create_access_token(
    data: dict,
    expires_delta: Optional[timedelta] = None,
) -> str:
    """Build a signed JWT carrying ``data`` plus an ``exp`` claim.

    Args:
        data: Claims to embed in the token. The dict is shallow-copied, so the
            caller's object is not modified.
        expires_delta: Token lifetime measured from now. When ``None``,
            ``ACCESS_TOKEN_EXPIRE_MINUTES`` minutes is used. An explicit
            ``timedelta(0)`` is honoured rather than replaced by the default.

    Returns:
        The token returned by ``jwt.encode`` using ``SECRET_KEY`` and
        ``ALGORITHM``.
    """
    # Compare against None explicitly: a zero timedelta is falsy, so an
    # ``or`` fallback would silently swap it for the default lifetime.
    if expires_delta is None:
        expires_delta = timedelta(minutes=ACCESS_TOKEN_EXPIRE_MINUTES)

    # Timezone-aware "now": datetime.utcnow() is deprecated (Python 3.12+) and
    # yields a naive value that is easy to misread as local time.
    expire = datetime.now(timezone.utc) + expires_delta

    to_encode = data.copy()
    to_encode["exp"] = expire
    return jwt.encode(to_encode, SECRET_KEY, algorithm=ALGORITHM)
async def get_current_user(token: str = Depends(oauth2_scheme)) -> str:
    """Resolve the request's bearer token to the user id in its ``sub`` claim.

    Args:
        token: Encoded JWT supplied by the ``oauth2_scheme`` dependency.

    Returns:
        The value of the token's ``sub`` claim.

    Raises:
        HTTPException: 401 with detail ``"Invalid token"`` when decoding the
            token raises ``JWTError`` or the payload has no ``sub`` claim.
    """
    # One shared 401 so both failure paths respond identically. The
    # WWW-Authenticate header tells the client which auth scheme is expected.
    invalid_token = HTTPException(
        status_code=status.HTTP_401_UNAUTHORIZED,
        detail="Invalid token",
        headers={"WWW-Authenticate": "Bearer"},
    )

    # Keep the try body to the single call whose JWTError we handle.
    try:
        payload = jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM])
    except JWTError as err:
        # Chain the cause so logs and tracebacks keep the underlying JWT failure.
        raise invalid_token from err

    user_id: str | None = payload.get("sub")
    if user_id is None:
        raise invalid_token
    return user_id