| """ |
| session_manager.py - Session Management for User Authentication |
| Handles session token generation, validation, and expiry |
| """ |
|
|
| import uuid |
| import hashlib |
| import secrets |
| from datetime import datetime, timedelta |
| import logging |
| from typing import Optional, Dict, Any |
|
|
| logger = logging.getLogger(__name__) |
|
|
| |
| |
| |
|
|
| |
| SESSION_EXPIRY_DAYS = 30 |
|
|
| |
| |
| _session_store: Dict[str, Dict[str, Any]] = {} |
|
|
|
|
| |
| |
| |
|
|
| def generate_session_token() -> str: |
| """ |
| Generate a secure random session token. |
| """ |
| |
| return secrets.token_urlsafe(32) |
|
|
|
|
| def create_session(user_id: str) -> Dict[str, Any]: |
| """ |
| Create a new session for a user. |
| Returns session token and expiry information. |
| """ |
| session_token = generate_session_token() |
| expires_at = datetime.now() + timedelta(days=SESSION_EXPIRY_DAYS) |
| |
| _session_store[session_token] = { |
| 'user_id': user_id, |
| 'expires_at': expires_at.timestamp(), |
| 'created_at': datetime.now().timestamp(), |
| 'last_accessed': datetime.now().timestamp() |
| } |
| |
| logger.debug(f"Created session for user {user_id}") |
| |
| return { |
| 'session_token': session_token, |
| 'expires_at': expires_at.isoformat(), |
| 'expires_in_days': SESSION_EXPIRY_DAYS |
| } |
|
|
|
|
| def validate_session(session_token: str) -> Optional[str]: |
| """ |
| Validate a session token and return user_id if valid. |
| Also refreshes the session expiry if within refresh window. |
| """ |
| if session_token not in _session_store: |
| return None |
| |
| session_data = _session_store[session_token] |
| expires_at = datetime.fromtimestamp(session_data['expires_at']) |
| |
| |
| if datetime.now() > expires_at: |
| |
| delete_session(session_token) |
| logger.debug(f"Session expired: {session_token[:16]}...") |
| return None |
| |
| |
| session_data['last_accessed'] = datetime.now().timestamp() |
| |
| |
| |
| days_remaining = (expires_at - datetime.now()).days |
| if days_remaining < 7: |
| |
| new_expiry = datetime.now() + timedelta(days=SESSION_EXPIRY_DAYS) |
| session_data['expires_at'] = new_expiry.timestamp() |
| logger.debug(f"Refreshed session for user {session_data['user_id']}") |
| |
| return session_data['user_id'] |
|
|
|
|
| def delete_session(session_token: str) -> bool: |
| """ |
| Delete a session (logout). |
| """ |
| if session_token in _session_store: |
| user_id = _session_store[session_token]['user_id'] |
| del _session_store[session_token] |
| logger.debug(f"Deleted session for user {user_id}") |
| return True |
| return False |
|
|
|
|
| def delete_all_user_sessions(user_id: str) -> int: |
| """ |
| Delete all sessions for a user (e.g., when changing password). |
| Returns number of sessions deleted. |
| """ |
| deleted = 0 |
| tokens_to_delete = [] |
| |
| for token, data in _session_store.items(): |
| if data['user_id'] == user_id: |
| tokens_to_delete.append(token) |
| |
| for token in tokens_to_delete: |
| del _session_store[token] |
| deleted += 1 |
| |
| if deleted > 0: |
| logger.info(f"Deleted {deleted} sessions for user {user_id}") |
| |
| return deleted |
|
|
|
|
| def get_session_info(session_token: str) -> Optional[Dict[str, Any]]: |
| """ |
| Get information about a session (without validating). |
| """ |
| if session_token not in _session_store: |
| return None |
| |
| data = _session_store[session_token] |
| return { |
| 'user_id': data['user_id'], |
| 'expires_at': datetime.fromtimestamp(data['expires_at']).isoformat(), |
| 'created_at': datetime.fromtimestamp(data['created_at']).isoformat(), |
| 'last_accessed': datetime.fromtimestamp(data['last_accessed']).isoformat() |
| } |
|
|
|
|
| def get_active_session_count(user_id: str) -> int: |
| """ |
| Get the number of active sessions for a user. |
| """ |
| count = 0 |
| for token, data in _session_store.items(): |
| if data['user_id'] == user_id: |
| |
| expires_at = datetime.fromtimestamp(data['expires_at']) |
| if datetime.now() <= expires_at: |
| count += 1 |
| return count |
|
|
|
|
| def cleanup_expired_sessions() -> int: |
| """ |
| Remove all expired sessions from memory. |
| Returns number of sessions removed. |
| """ |
| now = datetime.now() |
| expired = [] |
| |
| for token, data in _session_store.items(): |
| expires_at = datetime.fromtimestamp(data['expires_at']) |
| if now > expires_at: |
| expired.append(token) |
| |
| for token in expired: |
| del _session_store[token] |
| |
| if expired: |
| logger.debug(f"Cleaned up {len(expired)} expired sessions") |
| |
| return len(expired) |
|
|
|
|
| |
| |
| |
|
|
| def hash_password(password: str, salt: str = None) -> tuple: |
| """ |
| Hash a password with salt. |
| Returns (hashed_password, salt) |
| """ |
| if salt is None: |
| salt = secrets.token_hex(16) |
| |
| |
| hashed = hashlib.pbkdf2_hmac( |
| 'sha256', |
| password.encode('utf-8'), |
| salt.encode('utf-8'), |
| 100000 |
| ).hex() |
| |
| return hashed, salt |
|
|
|
|
| def verify_password(password: str, hashed_password: str, salt: str) -> bool: |
| """ |
| Verify a password against its hash and salt. |
| """ |
| test_hash, _ = hash_password(password, salt) |
| return test_hash == hashed_password |
|
|
|
|
| |
| |
| |
|
|
| def generate_user_id() -> str: |
| """ |
| Generate a unique user ID. |
| """ |
| return str(uuid.uuid4()) |
|
|
|
|
| def is_valid_email(email: str) -> bool: |
| """ |
| Basic email validation. |
| """ |
| import re |
| pattern = r'^[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\.[a-zA-Z]{2,}$' |
| return re.match(pattern, email) is not None |
|
|
|
|
| def is_valid_username(username: str) -> bool: |
| """ |
| Validate username (alphanumeric + underscore, 3-30 chars). |
| """ |
| import re |
| pattern = r'^[a-zA-Z0-9_]{3,30}$' |
| return re.match(pattern, username) is not None |
|
|
|
|
| |
| |
| |
|
|
| |
| cleanup_expired_sessions() |
|
|
| logger.info("Session manager initialized") |