import re import logging from fastapi import HTTPException, Security # Import Security here from jose import JWTError, jwt from datetime import datetime, timedelta from fastapi.security import HTTPBearer security = HTTPBearer() SECRET_KEY = "B@@kMy$er^!(e" ALGORITHM = "HS256" def validate_email(email: str) -> bool: """ Validate the email address format. Args: email (str): The email address to validate. Returns: bool: True if valid, False otherwise. """ email_regex = r'^[a-zA-Z0-9_.+-]+@[a-zA-Z0-9-]+\.[a-zA-Z0-9-.]+$' return re.match(email_regex, email) is not None def validate_mobile(mobile: str) -> bool: """ Validate the mobile number format. Args: mobile (str): The mobile number to validate. Returns: bool: True if valid, False otherwise. """ return mobile.isdigit() and len(mobile) == 10 def generate_tokens(identifier: str, merchant_id: str = None, role: str = None) -> dict: access_token_expiry = datetime.utcnow() + timedelta(minutes=30) refresh_token_expiry = datetime.utcnow() + timedelta(days=1) payload = {"sub": identifier, "exp": access_token_expiry} if merchant_id: payload["merchant_id"] = merchant_id if role: payload["role"] = role access_token = jwt.encode(payload, SECRET_KEY, algorithm=ALGORITHM) refresh_token = jwt.encode({"sub": identifier, "exp": refresh_token_expiry}, SECRET_KEY, algorithm=ALGORITHM) return {"access_token": access_token, "refresh_token": refresh_token, "token_type": "bearer"} def verify_token(token: str = Security(security)) -> dict: """ Verify the access token. Args: token (str): The access token. Returns: dict: Decoded token data. Raises: HTTPException: If the token is invalid or expired. """ try: payload = jwt.decode(token.credentials, SECRET_KEY, algorithms=[ALGORITHM]) return payload except jwt.ExpiredSignatureError: raise HTTPException(status_code=401, detail="Token has expired") except jwt.InvalidTokenError: raise HTTPException(status_code=401, detail="Invalid token")