henaiv2 / session_manager.py
joashsam's picture
Create session_manager.py
beec165 verified
Raw
History Blame Contribute Delete
7.03 kB
"""
session_manager.py - Session Management for User Authentication
Handles session token generation, validation, and expiry, plus password
hashing and basic email/username validation helpers
"""
import uuid
import hashlib
import secrets
from datetime import datetime, timedelta
import logging
from typing import Optional, Dict, Any
logger = logging.getLogger(__name__)
# ===========================================
# CONFIGURATION
# ===========================================
# Session lifetime in days (30). Used when a session is created and again
# when validate_session() extends a session that is close to expiring.
SESSION_EXPIRY_DAYS = 30
# In-memory session store (for demo - use database in production).
# Being a plain module-level dict, its contents do not survive a restart.
# Structure: {session_token: {'user_id': user_id, 'expires_at': timestamp,
#             'created_at': timestamp, 'last_accessed': timestamp}}
# The timestamp fields are floats produced by datetime.timestamp().
_session_store: Dict[str, Dict[str, Any]] = {}
# ===========================================
# SESSION MANAGEMENT
# ===========================================
def generate_session_token() -> str:
    """
    Return a fresh, unpredictable session token.

    The token is URL-safe text derived from 32 random bytes drawn from the
    ``secrets`` module (a cryptographically secure source).
    """
    token_entropy_bytes = 32
    return secrets.token_urlsafe(token_entropy_bytes)
def create_session(user_id: str) -> Dict[str, Any]:
    """
    Create a new session for a user and record it in the in-memory store.

    Args:
        user_id: Identifier of the user the session belongs to.

    Returns:
        A dict with:
          - 'session_token': the newly generated token
          - 'expires_at': expiry time as an ISO-8601 string (naive local time)
          - 'expires_in_days': the configured session lifetime in days
    """
    session_token = generate_session_token()

    # Read the clock once so that created_at, last_accessed and expires_at
    # are all derived from the same instant.
    now = datetime.now()
    now_ts = now.timestamp()
    expires_at = now + timedelta(days=SESSION_EXPIRY_DAYS)

    _session_store[session_token] = {
        'user_id': user_id,
        'expires_at': expires_at.timestamp(),
        'created_at': now_ts,
        'last_accessed': now_ts,
    }
    logger.debug(f"Created session for user {user_id}")

    return {
        'session_token': session_token,
        'expires_at': expires_at.isoformat(),
        'expires_in_days': SESSION_EXPIRY_DAYS,
    }
def validate_session(session_token: str) -> Optional[str]:
    """
    Validate a session token and return the owning user_id if it is valid.

    An expired session is removed from the store. A valid session has its
    'last_accessed' time updated and, when fewer than 7 days remain before
    expiry, its expiry is pushed out by SESSION_EXPIRY_DAYS from now
    (rolling expiry).

    Args:
        session_token: Token previously returned by create_session().

    Returns:
        The user_id for a valid session, or None when the token is unknown
        or has expired.
    """
    session_data = _session_store.get(session_token)
    if session_data is None:
        return None

    # One clock reading for the whole call keeps every decision below
    # consistent with the others.
    now = datetime.now()
    now_ts = now.timestamp()

    # Stored values are POSIX timestamps; compare them directly instead of
    # converting back to naive local datetimes, whose ordering is ambiguous
    # around daylight-saving transitions.
    expires_ts = session_data['expires_at']

    if now_ts > expires_ts:
        # Session expired, remove it
        delete_session(session_token)
        logger.debug(f"Session expired: {session_token[:16]}...")
        return None

    # Update last accessed time
    session_data['last_accessed'] = now_ts

    # Refresh session if within 7 days of expiry (rolling expiry).
    # This implements the 30-day rolling requirement.
    refresh_window_seconds = timedelta(days=7).total_seconds()
    if expires_ts - now_ts < refresh_window_seconds:
        new_expiry = now + timedelta(days=SESSION_EXPIRY_DAYS)
        session_data['expires_at'] = new_expiry.timestamp()
        logger.debug(f"Refreshed session for user {session_data['user_id']}")

    return session_data['user_id']
def delete_session(session_token: str) -> bool:
    """
    Remove a single session from the store (logout).

    Returns True when the token was present and has been removed, False
    when the token was not in the store.
    """
    try:
        record = _session_store[session_token]
    except KeyError:
        return False

    user_id = record['user_id']
    del _session_store[session_token]
    logger.debug(f"Deleted session for user {user_id}")
    return True
def delete_all_user_sessions(user_id: str) -> int:
    """
    Remove every stored session that belongs to the given user
    (e.g., after a password change).

    Returns the number of sessions that were removed.
    """
    # Collect the matching tokens first: the store must not change size
    # while it is being iterated.
    doomed = [
        tok for tok, record in _session_store.items()
        if record['user_id'] == user_id
    ]
    for tok in doomed:
        del _session_store[tok]

    deleted = len(doomed)
    if deleted > 0:
        logger.info(f"Deleted {deleted} sessions for user {user_id}")
    return deleted
def get_session_info(session_token: str) -> Optional[Dict[str, Any]]:
    """
    Describe a stored session without validating it.

    No expiry check is made and the session is not modified. Returns None
    for an unknown token; otherwise a dict holding 'user_id' plus
    'expires_at', 'created_at' and 'last_accessed' as ISO-8601 strings.
    """
    try:
        record = _session_store[session_token]
    except KeyError:
        return None

    info: Dict[str, Any] = {'user_id': record['user_id']}
    # Each stored time is a POSIX timestamp; render it as local ISO-8601.
    for field in ('expires_at', 'created_at', 'last_accessed'):
        info[field] = datetime.fromtimestamp(record[field]).isoformat()
    return info
def get_active_session_count(user_id: str) -> int:
    """
    Get the number of active (not yet expired) sessions for a user.

    Expired sessions still present in the store are not counted and are
    not removed by this function.

    Args:
        user_id: Identifier of the user whose sessions are counted.

    Returns:
        Number of stored sessions for user_id whose expiry is not in the past.
    """
    # Read the clock once so every session is judged against the same
    # instant, and compare POSIX timestamps directly (the stored format).
    now_ts = datetime.now().timestamp()
    return sum(
        1
        for data in _session_store.values()
        if data['user_id'] == user_id and now_ts <= data['expires_at']
    )
def cleanup_expired_sessions() -> int:
    """
    Remove all expired sessions from memory.

    Returns:
        Number of sessions removed.
    """
    # Stored expiries are POSIX timestamps, so compare against the current
    # timestamp directly rather than converting each entry to a naive local
    # datetime (which is ambiguous around daylight-saving transitions).
    now_ts = datetime.now().timestamp()

    # Collect first, delete afterwards: the dict must not change size while
    # it is being iterated.
    expired = [
        token for token, data in _session_store.items()
        if now_ts > data['expires_at']
    ]
    for token in expired:
        del _session_store[token]

    if expired:
        logger.debug(f"Cleaned up {len(expired)} expired sessions")
    return len(expired)
# ===========================================
# PASSWORD HASHING (for user management)
# ===========================================
def hash_password(password: str, salt: Optional[str] = None, iterations: int = 100000) -> tuple:
    """
    Hash a password with PBKDF2-HMAC-SHA256.

    Args:
        password: Plain-text password; encoded as UTF-8 before hashing.
        salt: Salt string; encoded as UTF-8 before hashing. When None, a new
            random salt of 32 hex characters (16 random bytes) is generated.
        iterations: PBKDF2 iteration count. The default of 100000 is the
            count this function has always used; a hash can only be
            reproduced with the same count it was created with.

    Returns:
        (hashed_password, salt): the derived key as a lowercase hex string
        (64 characters) and the salt that was used.
    """
    if salt is None:
        salt = secrets.token_hex(16)

    derived_key = hashlib.pbkdf2_hmac(
        'sha256',
        password.encode('utf-8'),
        salt.encode('utf-8'),
        iterations,
    )
    return derived_key.hex(), salt
def verify_password(password: str, hashed_password: str, salt: str) -> bool:
    """
    Verify a password against its stored hash and salt.

    Args:
        password: Plain-text password supplied by the user.
        hashed_password: Hex digest previously returned by hash_password().
        salt: Salt that was returned alongside hashed_password.

    Returns:
        True when hashing password with salt reproduces hashed_password,
        False otherwise (including when hashed_password is not a string).
    """
    test_hash, _ = hash_password(password, salt)

    # A missing or non-string stored hash can never match.
    if not isinstance(hashed_password, str):
        return False

    # Constant-time comparison so the check does not reveal, through timing,
    # how many leading characters of the hash matched. Comparing bytes also
    # avoids the TypeError compare_digest raises for non-ASCII str input.
    return secrets.compare_digest(
        test_hash.encode('utf-8'),
        hashed_password.encode('utf-8'),
    )
# ===========================================
# USER MANAGEMENT HELPERS
# ===========================================
def generate_user_id() -> str:
    """
    Return a new user ID: a random (version 4) UUID in its canonical
    36-character string form.
    """
    fresh_id = uuid.uuid4()
    return str(fresh_id)
def is_valid_email(email: str) -> bool:
    """
    Basic email validation.

    Accepts ``local@domain.tld`` where the local part uses letters, digits
    and ``._%+-``, the domain uses letters, digits, ``.`` and ``-``, and the
    final label is at least two letters. This is a shape check only; it
    does not confirm that the address exists.

    Args:
        email: Candidate address.

    Returns:
        True when the whole string matches the pattern, False otherwise
        (including for non-string input).
    """
    import re

    if not isinstance(email, str):
        return False

    pattern = r'[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\.[a-zA-Z]{2,}'
    # fullmatch rather than match + '$': '$' also matches just before a
    # trailing newline, which would let "user@example.com\n" through.
    return re.fullmatch(pattern, email) is not None
def is_valid_username(username: str) -> bool:
    """
    Validate username (ASCII letters, digits and underscore, 3-30 chars).

    Args:
        username: Candidate username.

    Returns:
        True when the whole string matches, False otherwise (including for
        non-string input).
    """
    import re

    if not isinstance(username, str):
        return False

    pattern = r'[a-zA-Z0-9_]{3,30}'
    # fullmatch rather than match + '$': '$' also matches just before a
    # trailing newline, which would let "name\n" through.
    return re.fullmatch(pattern, username) is not None
# ===========================================
# INITIALIZATION
# ===========================================
# Run cleanup on module load (optional).
# NOTE(review): _session_store is created empty earlier in this module, so
# this call looks like a no-op at import time unless something populates the
# store first - confirm whether it is still needed.
cleanup_expired_sessions()
# Emitted once per import of this module.
logger.info("Session manager initialized")