DigitalPal / digipal /auth /session_manager.py
BladeSzaSza's picture
🥚 Initial DigiPal deployment to HuggingFace Spaces🤖 Generated with [Claude Code](https://claude.ai/code)Co-Authored-By: Claude <noreply@anthropic.com>
4399e64
"""
Session management for DigiPal authentication system.
"""
import logging
import json
import hashlib
import secrets
from datetime import datetime, timedelta
from typing import Optional, Dict, Any
from pathlib import Path
from .models import User, AuthSession, AuthStatus
from ..storage.database import DatabaseConnection
logger = logging.getLogger(__name__)
class SessionManager:
"""Manages user sessions with secure token storage and caching."""
def __init__(self, db_connection: DatabaseConnection, cache_dir: Optional[str] = None):
"""
Initialize session manager.
Args:
db_connection: Database connection for persistent storage
cache_dir: Directory for session cache files (optional)
"""
self.db = db_connection
self.cache_dir = Path(cache_dir) if cache_dir else Path.home() / '.digipal' / 'cache'
self.cache_dir.mkdir(parents=True, exist_ok=True)
# In-memory session cache for performance
self._session_cache: Dict[str, AuthSession] = {}
# Load existing sessions from database
self._load_sessions_from_db()
def create_session(self, user: User, token: str, expires_hours: int = 24, is_offline: bool = False) -> AuthSession:
"""
Create a new authentication session.
Args:
user: Authenticated user
token: Authentication token
expires_hours: Session expiration in hours
is_offline: Whether this is an offline session
Returns:
Created authentication session
"""
expires_at = datetime.now() + timedelta(hours=expires_hours)
session = AuthSession(
user_id=user.id,
token=token,
expires_at=expires_at,
is_offline=is_offline
)
# Ensure user exists in database before saving session
self._ensure_user_exists(user)
# Store in database
self._save_session_to_db(session)
# Cache in memory
self._session_cache[user.id] = session
# Save to file cache for offline access
if not is_offline:
self._save_session_to_cache(session)
logger.info(f"Created session for user {user.id} (offline: {is_offline})")
return session
def get_session(self, user_id: str) -> Optional[AuthSession]:
"""
Get session for user ID.
Args:
user_id: User ID to get session for
Returns:
Authentication session if found and valid, None otherwise
"""
# Check memory cache first
if user_id in self._session_cache:
session = self._session_cache[user_id]
if session.is_valid:
session.refresh_access()
return session
else:
# Remove expired session
del self._session_cache[user_id]
self._remove_session_from_db(user_id)
# Try to load from database
session = self._load_session_from_db(user_id)
if session and session.is_valid:
self._session_cache[user_id] = session
session.refresh_access()
return session
# Try to load from cache for offline mode
cached_session = self._load_session_from_cache(user_id)
if cached_session:
# Mark as offline session
cached_session.is_offline = True
cached_session.extend_session(hours=168) # 1 week for offline
self._session_cache[user_id] = cached_session
return cached_session
return None
def validate_session(self, user_id: str, token: str) -> bool:
"""
Validate session token for user.
Args:
user_id: User ID
token: Token to validate
Returns:
True if session is valid, False otherwise
"""
session = self.get_session(user_id)
if not session:
return False
# For offline sessions, we're more lenient with token validation
if session.is_offline:
return self._hash_token(token) == self._hash_token(session.token)
return session.token == token and session.is_valid
def refresh_session(self, user_id: str, extend_hours: int = 24) -> bool:
"""
Refresh session expiration.
Args:
user_id: User ID
extend_hours: Hours to extend session
Returns:
True if session was refreshed, False otherwise
"""
session = self.get_session(user_id)
if not session:
return False
session.extend_session(extend_hours)
self._save_session_to_db(session)
if not session.is_offline:
self._save_session_to_cache(session)
logger.info(f"Refreshed session for user {user_id}")
return True
def revoke_session(self, user_id: str) -> bool:
"""
Revoke user session.
Args:
user_id: User ID
Returns:
True if session was revoked, False if not found
"""
# Remove from memory cache
if user_id in self._session_cache:
del self._session_cache[user_id]
# Remove from database
removed_from_db = self._remove_session_from_db(user_id)
# Remove from file cache
self._remove_session_from_cache(user_id)
if removed_from_db:
logger.info(f"Revoked session for user {user_id}")
return removed_from_db
def cleanup_expired_sessions(self) -> int:
"""
Clean up expired sessions from storage.
Returns:
Number of sessions cleaned up
"""
cleaned_count = 0
# Clean memory cache
expired_users = [
user_id for user_id, session in self._session_cache.items()
if session.is_expired
]
for user_id in expired_users:
del self._session_cache[user_id]
cleaned_count += 1
# Clean database
try:
db_cleaned = self.db.execute_update(
'DELETE FROM users WHERE session_data IS NOT NULL AND '
'json_extract(session_data, "$.expires_at") < ?',
(datetime.now().isoformat(),)
)
cleaned_count += db_cleaned
except Exception as e:
logger.error(f"Error cleaning expired sessions from database: {e}")
if cleaned_count > 0:
logger.info(f"Cleaned up {cleaned_count} expired sessions")
return cleaned_count
def _save_session_to_db(self, session: AuthSession) -> None:
"""Save session to database."""
try:
session_json = json.dumps(session.to_dict())
self.db.execute_update(
'''UPDATE users SET session_data = ?, last_login = ?
WHERE id = ?''',
(session_json, session.last_accessed.isoformat(), session.user_id)
)
except Exception as e:
logger.error(f"Error saving session to database: {e}")
def _load_session_from_db(self, user_id: str) -> Optional[AuthSession]:
"""Load session from database."""
try:
rows = self.db.execute_query(
'SELECT session_data FROM users WHERE id = ? AND session_data IS NOT NULL',
(user_id,)
)
if rows:
session_data = json.loads(rows[0]['session_data'])
return AuthSession.from_dict(session_data)
except Exception as e:
logger.error(f"Error loading session from database: {e}")
return None
def _remove_session_from_db(self, user_id: str) -> bool:
"""Remove session from database."""
try:
# First check if user exists
rows = self.db.execute_query('SELECT id FROM users WHERE id = ?', (user_id,))
if not rows:
return False
affected = self.db.execute_update(
'UPDATE users SET session_data = NULL WHERE id = ?',
(user_id,)
)
return affected > 0
except Exception as e:
logger.error(f"Error removing session from database: {e}")
return False
def _save_session_to_cache(self, session: AuthSession) -> None:
"""Save session to file cache for offline access."""
try:
cache_file = self.cache_dir / f"session_{self._hash_user_id(session.user_id)}.json"
# Only cache essential session data for offline use
cache_data = {
'user_id': session.user_id,
'token_hash': self._hash_token(session.token),
'expires_at': session.expires_at.isoformat(),
'created_at': session.created_at.isoformat(),
'cached_at': datetime.now().isoformat()
}
with open(cache_file, 'w') as f:
json.dump(cache_data, f)
except Exception as e:
logger.error(f"Error saving session to cache: {e}")
def _load_session_from_cache(self, user_id: str) -> Optional[AuthSession]:
"""Load session from file cache."""
try:
cache_file = self.cache_dir / f"session_{self._hash_user_id(user_id)}.json"
if not cache_file.exists():
return None
with open(cache_file, 'r') as f:
cache_data = json.load(f)
# Check if cache is not too old (max 1 week)
cached_at = datetime.fromisoformat(cache_data['cached_at'])
if datetime.now() - cached_at > timedelta(days=7):
cache_file.unlink() # Remove old cache
return None
# Create session from cache (token will be validated separately)
return AuthSession(
user_id=cache_data['user_id'],
token=cache_data['token_hash'], # This is hashed, will need special handling
expires_at=datetime.fromisoformat(cache_data['expires_at']),
created_at=datetime.fromisoformat(cache_data['created_at']),
is_offline=True
)
except Exception as e:
logger.error(f"Error loading session from cache: {e}")
return None
def _remove_session_from_cache(self, user_id: str) -> None:
"""Remove session from file cache."""
try:
cache_file = self.cache_dir / f"session_{self._hash_user_id(user_id)}.json"
if cache_file.exists():
cache_file.unlink()
except Exception as e:
logger.error(f"Error removing session from cache: {e}")
def _load_sessions_from_db(self) -> None:
"""Load all valid sessions from database into memory cache."""
try:
rows = self.db.execute_query(
'SELECT id, session_data FROM users WHERE session_data IS NOT NULL'
)
for row in rows:
try:
session_data = json.loads(row['session_data'])
session = AuthSession.from_dict(session_data)
if session.is_valid:
self._session_cache[row['id']] = session
except Exception as e:
logger.warning(f"Error loading session for user {row['id']}: {e}")
except Exception as e:
logger.error(f"Error loading sessions from database: {e}")
def _hash_token(self, token: str) -> str:
"""Hash token for secure storage."""
return hashlib.sha256(token.encode()).hexdigest()
def _hash_user_id(self, user_id: str) -> str:
"""Hash user ID for cache file naming."""
return hashlib.md5(user_id.encode()).hexdigest()[:16]
def _ensure_user_exists(self, user: User) -> None:
"""Ensure user exists in database before creating session."""
try:
# Check if user exists
rows = self.db.execute_query('SELECT id FROM users WHERE id = ?', (user.id,))
if not rows:
# Create user record
self.db.execute_update(
'''INSERT INTO users (id, username, created_at, last_login)
VALUES (?, ?, ?, ?)''',
(user.id, user.username,
user.created_at.isoformat() if user.created_at else datetime.now().isoformat(),
user.last_login.isoformat() if user.last_login else None)
)
logger.info(f"Created user record for session: {user.id}")
except Exception as e:
logger.error(f"Error ensuring user exists: {e}")