""" TreeTrack Authentication Module Simple session-based authentication with predefined users """ import hashlib import secrets import os from typing import Dict, Optional, Any from datetime import datetime, timedelta import logging import bcrypt from constants import ( SESSION_TIMEOUT, AUTH_TOKEN_LENGTH, DEV_PASSWORDS, BCRYPT_ROUNDS, REQUIRED_ENV_VARS ) logger = logging.getLogger(__name__) class AuthManager: def __init__(self): self.sessions: Dict[str, Dict[str, Any]] = {} self.session_timeout = SESSION_TIMEOUT self.conference_session_token = None # Get passwords from environment variables with defaults for development aalekh_password = os.getenv('AALEKH_PASSWORD', DEV_PASSWORDS['AALEKH_PASSWORD']) admin_password = os.getenv('ADMIN_PASSWORD', DEV_PASSWORDS['ADMIN_PASSWORD']) ishita_password = os.getenv('ISHITA_PASSWORD', DEV_PASSWORDS['ISHITA_PASSWORD']) jeeb_password = os.getenv('JEEB_PASSWORD', DEV_PASSWORDS['JEEB_PASSWORD']) demo_password = os.getenv('DEMO_PASSWORD', DEV_PASSWORDS.get('DEMO_PASSWORD')) # Warn if using development passwords env_vars = ['AALEKH_PASSWORD', 'ADMIN_PASSWORD', 'ISHITA_PASSWORD', 'JEEB_PASSWORD', 'DEMO_PASSWORD'] missing_vars = [var for var in env_vars if not os.getenv(var)] if missing_vars: logger.warning(f"Using default development passwords for: {', '.join(missing_vars)}. Set these environment variables for production!") # Predefined user accounts (in production, use a database) self.users = { # Administrator account "aalekh": { "password_hash": self._hash_password(aalekh_password), "role": "admin", "full_name": "Aalekh", "permissions": ["read", "write", "delete", "admin"] }, # System account (for admin use) "admin": { "password_hash": self._hash_password(admin_password), "role": "admin", "full_name": "System Administrator", "permissions": ["read", "write", "delete", "admin"] }, # User accounts "ishita": { "password_hash": self._hash_password(ishita_password), "role": "admin", "full_name": "Ishita", "permissions": ["read", "write", "delete", "admin"] }, "jeeb": { "password_hash": self._hash_password(jeeb_password), "role": "researcher", "full_name": "Jeeb", "permissions": ["read", "write", "edit_own"] }, # Demo account for public demonstrations "demo_user": { "password_hash": self._hash_password(demo_password), "role": "demo_user", "full_name": "Demo Account", "permissions": ["read", "demo_view", "demo_interact", "map_view", "demo_navigation"] } } logger.info(f"AuthManager initialized with {len(self.users)} user accounts") def create_demo_session(self) -> Optional[Dict[str, Any]]: """Create a new demo session when requested""" try: demo_user = self.users.get("demo_user") if not demo_user: return None # Create session token session_token = secrets.token_urlsafe(AUTH_TOKEN_LENGTH) # Extended timeout for demo (12 hours) demo_timeout = timedelta(hours=12) session_data = { "username": "demo_user", "role": demo_user["role"], "full_name": demo_user["full_name"], "permissions": demo_user["permissions"], "created_at": datetime.now(), "last_activity": datetime.now(), "is_demo_session": True, "session_timeout": demo_timeout } self.sessions[session_token] = session_data logger.info("Demo session created") return { "token": session_token, "user": session_data } except Exception as e: logger.error(f"Error creating demo session: {e}") return None def _hash_password(self, password: str) -> str: """Hash password using bcrypt with automatic salt generation""" # Generate salt and hash password with bcrypt salt = bcrypt.gensalt() hashed = bcrypt.hashpw(password.encode('utf-8'), salt) return hashed.decode('utf-8') def _verify_password(self, password: str, hashed: str) -> bool: """Verify password against hash using bcrypt""" return bcrypt.checkpw(password.encode('utf-8'), hashed.encode('utf-8')) def authenticate(self, username: str, password: str) -> Optional[Dict[str, Any]]: """Authenticate user credentials""" try: if username not in self.users: logger.warning(f"Authentication attempt with unknown username: {username}") return None user = self.users[username] if self._verify_password(password, user["password_hash"]): # Create session session_token = secrets.token_urlsafe(AUTH_TOKEN_LENGTH) session_data = { "username": username, "role": user["role"], "full_name": user["full_name"], "permissions": user["permissions"], "created_at": datetime.now(), "last_activity": datetime.now() } self.sessions[session_token] = session_data logger.info(f"User {username} authenticated successfully") return { "token": session_token, "user": session_data } else: logger.warning(f"Invalid password for user: {username}") return None except Exception as e: logger.error(f"Authentication error for {username}: {e}") return None def validate_session(self, token: str) -> Optional[Dict[str, Any]]: """Validate session token and return user data""" try: if not token or token not in self.sessions: return None session = self.sessions[token] now = datetime.now() # Use extended timeout for conference sessions timeout = session.get("session_timeout", self.session_timeout) # Check if session has expired if now - session["last_activity"] > timeout: # Don't delete demo sessions, just refresh them if session.get("is_demo_session"): session["last_activity"] = now logger.info(f"Demo session refreshed for: {session['username']}") return session else: del self.sessions[token] logger.info(f"Session expired for user: {session['username']}") return None # Update last activity session["last_activity"] = now return session except Exception as e: logger.error(f"Session validation error: {e}") return None def logout(self, token: str) -> bool: """Logout user and invalidate session""" try: if token in self.sessions: username = self.sessions[token]["username"] del self.sessions[token] logger.info(f"User {username} logged out") return True return False except Exception as e: logger.error(f"Logout error: {e}") return False def has_permission(self, token: str, permission: str) -> bool: """Check if user has specific permission""" session = self.validate_session(token) if not session: return False return permission in session.get("permissions", []) def can_edit_tree(self, token: str, tree_created_by: str) -> bool: """Check if user can edit a specific tree""" session = self.validate_session(token) if not session: return False # Admin and system can edit any tree if "admin" in session["permissions"] or "system" in session["permissions"]: return True # Users can edit trees they created if "edit_own" in session["permissions"] and tree_created_by == session["username"]: return True # Users with delete permission can edit any tree if "delete" in session["permissions"]: return True return False def can_delete_tree(self, token: str, tree_created_by: str) -> bool: """Check if user can delete a specific tree""" session = self.validate_session(token) if not session: return False # Only admin and system can delete trees if "admin" in session["permissions"] or "system" in session["permissions"]: return True # Users with explicit delete permission if "delete" in session["permissions"]: return True return False def cleanup_expired_sessions(self): """Remove expired sessions (can be called periodically)""" now = datetime.now() expired_tokens = [] for token, session in self.sessions.items(): if now - session["last_activity"] > self.session_timeout: expired_tokens.append(token) for token in expired_tokens: username = self.sessions[token]["username"] del self.sessions[token] logger.info(f"Cleaned up expired session for user: {username}") return len(expired_tokens) # Global auth manager instance auth_manager = AuthManager()