""" session_manager.py - Session Management for User Authentication Handles session token generation, validation, and expiry """ import uuid import hashlib import secrets from datetime import datetime, timedelta import logging from typing import Optional, Dict, Any logger = logging.getLogger(__name__) # =========================================== # CONFIGURATION # =========================================== # Session expiry (30 days) SESSION_EXPIRY_DAYS = 30 # In-memory session store (for demo - use database in production) # Structure: {session_token: {'user_id': user_id, 'expires_at': timestamp, 'created_at': timestamp}} _session_store: Dict[str, Dict[str, Any]] = {} # =========================================== # SESSION MANAGEMENT # =========================================== def generate_session_token() -> str: """ Generate a secure random session token. """ # Use secrets for cryptographically secure random token return secrets.token_urlsafe(32) def create_session(user_id: str) -> Dict[str, Any]: """ Create a new session for a user. Returns session token and expiry information. """ session_token = generate_session_token() expires_at = datetime.now() + timedelta(days=SESSION_EXPIRY_DAYS) _session_store[session_token] = { 'user_id': user_id, 'expires_at': expires_at.timestamp(), 'created_at': datetime.now().timestamp(), 'last_accessed': datetime.now().timestamp() } logger.debug(f"Created session for user {user_id}") return { 'session_token': session_token, 'expires_at': expires_at.isoformat(), 'expires_in_days': SESSION_EXPIRY_DAYS } def validate_session(session_token: str) -> Optional[str]: """ Validate a session token and return user_id if valid. Also refreshes the session expiry if within refresh window. """ if session_token not in _session_store: return None session_data = _session_store[session_token] expires_at = datetime.fromtimestamp(session_data['expires_at']) # Check if expired if datetime.now() > expires_at: # Session expired, remove it delete_session(session_token) logger.debug(f"Session expired: {session_token[:16]}...") return None # Update last accessed time session_data['last_accessed'] = datetime.now().timestamp() # Refresh session if within 7 days of expiry (rolling expiry) # This implements the 30-day rolling requirement days_remaining = (expires_at - datetime.now()).days if days_remaining < 7: # Extend by another 30 days new_expiry = datetime.now() + timedelta(days=SESSION_EXPIRY_DAYS) session_data['expires_at'] = new_expiry.timestamp() logger.debug(f"Refreshed session for user {session_data['user_id']}") return session_data['user_id'] def delete_session(session_token: str) -> bool: """ Delete a session (logout). """ if session_token in _session_store: user_id = _session_store[session_token]['user_id'] del _session_store[session_token] logger.debug(f"Deleted session for user {user_id}") return True return False def delete_all_user_sessions(user_id: str) -> int: """ Delete all sessions for a user (e.g., when changing password). Returns number of sessions deleted. """ deleted = 0 tokens_to_delete = [] for token, data in _session_store.items(): if data['user_id'] == user_id: tokens_to_delete.append(token) for token in tokens_to_delete: del _session_store[token] deleted += 1 if deleted > 0: logger.info(f"Deleted {deleted} sessions for user {user_id}") return deleted def get_session_info(session_token: str) -> Optional[Dict[str, Any]]: """ Get information about a session (without validating). """ if session_token not in _session_store: return None data = _session_store[session_token] return { 'user_id': data['user_id'], 'expires_at': datetime.fromtimestamp(data['expires_at']).isoformat(), 'created_at': datetime.fromtimestamp(data['created_at']).isoformat(), 'last_accessed': datetime.fromtimestamp(data['last_accessed']).isoformat() } def get_active_session_count(user_id: str) -> int: """ Get the number of active sessions for a user. """ count = 0 for token, data in _session_store.items(): if data['user_id'] == user_id: # Check if still valid expires_at = datetime.fromtimestamp(data['expires_at']) if datetime.now() <= expires_at: count += 1 return count def cleanup_expired_sessions() -> int: """ Remove all expired sessions from memory. Returns number of sessions removed. """ now = datetime.now() expired = [] for token, data in _session_store.items(): expires_at = datetime.fromtimestamp(data['expires_at']) if now > expires_at: expired.append(token) for token in expired: del _session_store[token] if expired: logger.debug(f"Cleaned up {len(expired)} expired sessions") return len(expired) # =========================================== # PASSWORD HASHING (for user management) # =========================================== def hash_password(password: str, salt: str = None) -> tuple: """ Hash a password with salt. Returns (hashed_password, salt) """ if salt is None: salt = secrets.token_hex(16) # Use PBKDF2 for secure password hashing hashed = hashlib.pbkdf2_hmac( 'sha256', password.encode('utf-8'), salt.encode('utf-8'), 100000 # Number of iterations ).hex() return hashed, salt def verify_password(password: str, hashed_password: str, salt: str) -> bool: """ Verify a password against its hash and salt. """ test_hash, _ = hash_password(password, salt) return test_hash == hashed_password # =========================================== # USER MANAGEMENT HELPERS # =========================================== def generate_user_id() -> str: """ Generate a unique user ID. """ return str(uuid.uuid4()) def is_valid_email(email: str) -> bool: """ Basic email validation. """ import re pattern = r'^[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\.[a-zA-Z]{2,}$' return re.match(pattern, email) is not None def is_valid_username(username: str) -> bool: """ Validate username (alphanumeric + underscore, 3-30 chars). """ import re pattern = r'^[a-zA-Z0-9_]{3,30}$' return re.match(pattern, username) is not None # =========================================== # INITIALIZATION # =========================================== # Run cleanup on module load (optional) cleanup_expired_sessions() logger.info("Session manager initialized")