File size: 6,943 Bytes
1941764 | 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 | """
Password reset service for secure token management and validation.
This module provides utilities for:
- Generating cryptographically secure reset tokens
- Validating token expiry and usage
- Rate limiting reset requests
- Password strength validation
- Token cleanup
"""
import os
import secrets
from datetime import datetime, timedelta
from typing import Optional, Dict, List
from sqlmodel import Session, select
from src.models.password_reset import PasswordResetToken
from src.models.user import User
# Configuration from environment variables
TOKEN_EXPIRY_MINUTES = int(os.getenv("PASSWORD_RESET_TOKEN_EXPIRY_MINUTES", "15"))
MAX_REQUESTS_PER_HOUR = int(os.getenv("PASSWORD_RESET_MAX_REQUESTS_PER_HOUR", "3"))
def generate_reset_token() -> str:
"""
Generate a cryptographically secure random token.
Returns:
URL-safe random token string (32 bytes = 43 characters)
Example:
>>> token = generate_reset_token()
>>> len(token)
43
"""
return secrets.token_urlsafe(32)
def create_reset_token(session: Session, user_id: int) -> str:
"""
Create a password reset token for a user.
Args:
session: Database session
user_id: User ID to create token for
Returns:
Generated reset token string
Example:
>>> token = create_reset_token(session, user_id=1)
>>> print(token)
abc123def456...
"""
# Generate token
token = generate_reset_token()
# Calculate expiry time
expires_at = datetime.utcnow() + timedelta(minutes=TOKEN_EXPIRY_MINUTES)
# Create token record
reset_token = PasswordResetToken(
user_id=user_id,
token=token,
expires_at=expires_at,
used=False,
created_at=datetime.utcnow()
)
session.add(reset_token)
session.commit()
session.refresh(reset_token)
return token
def validate_reset_token(session: Session, token: str) -> Optional[PasswordResetToken]:
"""
Validate a password reset token.
Checks:
- Token exists in database
- Token has not expired
- Token has not been used
Args:
session: Database session
token: Reset token to validate
Returns:
PasswordResetToken if valid, None otherwise
Example:
>>> token_record = validate_reset_token(session, "abc123")
>>> if token_record:
... print(f"Valid token for user {token_record.user_id}")
"""
# Find token in database
statement = select(PasswordResetToken).where(PasswordResetToken.token == token)
token_record = session.exec(statement).first()
if not token_record:
return None
# Check if token has been used
if token_record.used:
return None
# Check if token has expired
if datetime.utcnow() > token_record.expires_at:
return None
return token_record
def invalidate_token(session: Session, token: str) -> bool:
"""
Mark a token as used (invalidate it).
Args:
session: Database session
token: Reset token to invalidate
Returns:
True if token was invalidated, False if not found
Example:
>>> success = invalidate_token(session, "abc123")
>>> print(success)
True
"""
statement = select(PasswordResetToken).where(PasswordResetToken.token == token)
token_record = session.exec(statement).first()
if not token_record:
return False
token_record.used = True
session.add(token_record)
session.commit()
return True
def check_rate_limit(session: Session, user_id: int) -> bool:
"""
Check if user has exceeded rate limit for password reset requests.
Rate limit: MAX_REQUESTS_PER_HOUR requests per hour
Args:
session: Database session
user_id: User ID to check
Returns:
True if user is within rate limit, False if exceeded
Example:
>>> can_request = check_rate_limit(session, user_id=1)
>>> if not can_request:
... print("Rate limit exceeded")
"""
# Get tokens created in the last hour
one_hour_ago = datetime.utcnow() - timedelta(hours=1)
statement = select(PasswordResetToken).where(
PasswordResetToken.user_id == user_id,
PasswordResetToken.created_at >= one_hour_ago
)
recent_tokens = session.exec(statement).all()
# Check if user has exceeded rate limit
return len(recent_tokens) < MAX_REQUESTS_PER_HOUR
def validate_password_strength(password: str) -> Dict[str, any]:
"""
Validate password strength requirements.
Requirements:
- At least 8 characters
- Contains uppercase letter
- Contains lowercase letter
- Contains number
Args:
password: Password to validate
Returns:
Dictionary with 'valid' boolean and 'errors' list
Example:
>>> result = validate_password_strength("weak")
>>> print(result)
{'valid': False, 'errors': ['Password must be at least 8 characters', ...]}
"""
errors = []
# Check length
if len(password) < 8:
errors.append("Password must be at least 8 characters")
# Check for uppercase
if not any(c.isupper() for c in password):
errors.append("Password must contain at least one uppercase letter")
# Check for lowercase
if not any(c.islower() for c in password):
errors.append("Password must contain at least one lowercase letter")
# Check for number
if not any(c.isdigit() for c in password):
errors.append("Password must contain at least one number")
return {
"valid": len(errors) == 0,
"errors": errors
}
def cleanup_expired_tokens(session: Session) -> int:
"""
Delete expired password reset tokens from database.
This should be run periodically as a background job.
Args:
session: Database session
Returns:
Number of tokens deleted
Example:
>>> deleted_count = cleanup_expired_tokens(session)
>>> print(f"Deleted {deleted_count} expired tokens")
"""
# Find expired tokens
statement = select(PasswordResetToken).where(
PasswordResetToken.expires_at < datetime.utcnow()
)
expired_tokens = session.exec(statement).all()
# Delete expired tokens
for token in expired_tokens:
session.delete(token)
session.commit()
return len(expired_tokens)
def get_user_by_email(session: Session, email: str) -> Optional[User]:
"""
Get user by email address.
Args:
session: Database session
email: User email address
Returns:
User if found, None otherwise
Example:
>>> user = get_user_by_email(session, "user@example.com")
>>> if user:
... print(f"Found user {user.id}")
"""
statement = select(User).where(User.email == email)
return session.exec(statement).first()
|