Spaces:
Runtime error
Runtime error
| """ | |
| TreeTrack Authentication Module | |
| Simple session-based authentication with predefined users | |
| """ | |
| import hashlib | |
| import secrets | |
| import os | |
| from typing import Dict, Optional, Any | |
| from datetime import datetime, timedelta | |
| import logging | |
| import bcrypt | |
| from constants import ( | |
| SESSION_TIMEOUT, AUTH_TOKEN_LENGTH, DEV_PASSWORDS, | |
| BCRYPT_ROUNDS, REQUIRED_ENV_VARS | |
| ) | |
# Module-level logger named after this module; every AuthManager method
# reports its warnings, errors and audit messages through it.
logger = logging.getLogger(__name__)
class AuthManager:
    """Session-based authentication over a fixed, in-memory set of accounts.

    Accounts are built once in ``__init__``; their passwords come from
    environment variables, falling back to ``DEV_PASSWORDS``. Sessions are
    kept in the ``sessions`` dict, keyed by a random token, and are lost when
    the process exits.
    """

    # Environment variables whose value (or DEV_PASSWORDS fallback) is mandatory.
    _REQUIRED_PASSWORD_VARS = (
        'AALEKH_PASSWORD',
        'ADMIN_PASSWORD',
        'ISHITA_PASSWORD',
        'JEEB_PASSWORD',
    )

    def __init__(self) -> None:
        """Build the predefined accounts and start with no active sessions.

        Raises:
            KeyError: if a required password is neither set in the environment
                nor present in ``DEV_PASSWORDS``.
        """
        self.sessions: Dict[str, Dict[str, Any]] = {}
        # NOTE(review): compared against datetime differences below, so this is
        # presumably a timedelta -- confirm against the constants module.
        self.session_timeout = SESSION_TIMEOUT
        # Not read anywhere in this class; kept for external callers.
        self.conference_session_token = None

        # An environment variable that is set but empty is treated as unset,
        # so an account can never silently end up with an empty password while
        # the warning below claims the development default is in use.
        passwords: Dict[str, str] = {}
        using_defaults = []
        for var in self._REQUIRED_PASSWORD_VARS:
            value = os.getenv(var)
            if not value:
                value = DEV_PASSWORDS[var]
                using_defaults.append(var)
            passwords[var] = value

        # The demo password is optional: demo sessions are created without a
        # password check, so when nothing is configured the account gets a
        # random, unguessable password instead of crashing on hashing None.
        demo_password = os.getenv('DEMO_PASSWORD')
        if not demo_password:
            demo_password = DEV_PASSWORDS.get('DEMO_PASSWORD')
            if demo_password:
                using_defaults.append('DEMO_PASSWORD')
            else:
                demo_password = secrets.token_urlsafe(32)
                logger.warning(
                    "DEMO_PASSWORD is not configured; password login for "
                    "demo_user is disabled (demo sessions still work)"
                )

        # Warn if using development passwords
        if using_defaults:
            logger.warning(f"Using default development passwords for: {', '.join(using_defaults)}. Set these environment variables for production!")

        # Predefined user accounts (in production, use a database)
        self.users = {
            # Administrator account
            "aalekh": {
                "password_hash": self._hash_password(passwords['AALEKH_PASSWORD']),
                "role": "admin",
                "full_name": "Aalekh",
                "permissions": ["read", "write", "delete", "admin"],
            },
            # System account (for admin use)
            "admin": {
                "password_hash": self._hash_password(passwords['ADMIN_PASSWORD']),
                "role": "admin",
                "full_name": "System Administrator",
                "permissions": ["read", "write", "delete", "admin"],
            },
            # User accounts
            "ishita": {
                "password_hash": self._hash_password(passwords['ISHITA_PASSWORD']),
                "role": "admin",
                "full_name": "Ishita",
                "permissions": ["read", "write", "delete", "admin"],
            },
            "jeeb": {
                "password_hash": self._hash_password(passwords['JEEB_PASSWORD']),
                "role": "researcher",
                "full_name": "Jeeb",
                "permissions": ["read", "write", "edit_own"],
            },
            # Demo account for public demonstrations
            "demo_user": {
                "password_hash": self._hash_password(demo_password),
                "role": "demo_user",
                "full_name": "Demo Account",
                "permissions": ["read", "demo_view", "demo_interact", "map_view", "demo_navigation"],
            },
        }
        logger.info(f"AuthManager initialized with {len(self.users)} user accounts")

    def _new_session(
        self,
        username: str,
        user: Dict[str, Any],
        extra: Optional[Dict[str, Any]] = None,
    ) -> Dict[str, Any]:
        """Register a fresh session for ``user`` and return ``{"token", "user"}``.

        ``extra`` entries are merged into the stored session data. The returned
        ``"user"`` value is the same dict object that is stored in
        ``self.sessions``.
        """
        session_token = secrets.token_urlsafe(AUTH_TOKEN_LENGTH)
        now = datetime.now()
        session_data = {
            "username": username,
            "role": user["role"],
            "full_name": user["full_name"],
            "permissions": user["permissions"],
            "created_at": now,
            "last_activity": now,
        }
        if extra:
            session_data.update(extra)
        self.sessions[session_token] = session_data
        return {"token": session_token, "user": session_data}

    def _timeout_for(self, session: Dict[str, Any]):
        """Return the session's own timeout, or the manager-wide default."""
        return session.get("session_timeout", self.session_timeout)

    def create_demo_session(self) -> Optional[Dict[str, Any]]:
        """Create a new demo session when requested.

        Returns:
            ``{"token": ..., "user": ...}`` for the new session, or ``None`` if
            there is no ``demo_user`` account or creation fails.
        """
        try:
            demo_user = self.users.get("demo_user")
            if not demo_user:
                return None
            result = self._new_session(
                "demo_user",
                demo_user,
                extra={
                    "is_demo_session": True,
                    # Extended timeout for demo (12 hours)
                    "session_timeout": timedelta(hours=12),
                },
            )
            logger.info("Demo session created")
            return result
        except Exception as e:
            logger.error(f"Error creating demo session: {e}")
            return None

    def _hash_password(self, password: str) -> str:
        """Hash ``password`` with bcrypt using a freshly generated salt."""
        # NOTE(review): BCRYPT_ROUNDS is imported by this module but not used
        # here; gensalt() runs with the library default cost. Confirm whether
        # gensalt(rounds=BCRYPT_ROUNDS) was intended.
        salt = bcrypt.gensalt()
        hashed = bcrypt.hashpw(password.encode('utf-8'), salt)
        return hashed.decode('utf-8')

    def _verify_password(self, password: str, hashed: str) -> bool:
        """Return True if ``password`` matches the bcrypt hash ``hashed``."""
        return bcrypt.checkpw(password.encode('utf-8'), hashed.encode('utf-8'))

    def authenticate(self, username: str, password: str) -> Optional[Dict[str, Any]]:
        """Check credentials and open a session on success.

        Returns:
            ``{"token": ..., "user": ...}`` when the credentials are valid,
            otherwise ``None`` (unknown user, wrong password, or any error
            raised while verifying).
        """
        try:
            user = self.users.get(username)
            if user is None:
                logger.warning(f"Authentication attempt with unknown username: {username}")
                return None
            if not self._verify_password(password, user["password_hash"]):
                logger.warning(f"Invalid password for user: {username}")
                return None
            result = self._new_session(username, user)
            logger.info(f"User {username} authenticated successfully")
            return result
        except Exception as e:
            logger.error(f"Authentication error for {username}: {e}")
            return None

    def validate_session(self, token: str) -> Optional[Dict[str, Any]]:
        """Return the session data for ``token`` and refresh its activity time.

        A session idle for longer than its timeout is deleted and ``None`` is
        returned, except for demo sessions, which are refreshed instead.
        ``None`` is also returned for an empty or unknown token, or on error.
        """
        try:
            if not token or token not in self.sessions:
                return None
            session = self.sessions[token]
            now = datetime.now()
            expired = now - session["last_activity"] > self._timeout_for(session)
            if expired and not session.get("is_demo_session"):
                del self.sessions[token]
                logger.info(f"Session expired for user: {session['username']}")
                return None
            if expired:
                # Don't delete demo sessions, just refresh them
                logger.info(f"Demo session refreshed for: {session['username']}")
            session["last_activity"] = now
            return session
        except Exception as e:
            logger.error(f"Session validation error: {e}")
            return None

    def logout(self, token: str) -> bool:
        """Invalidate ``token``; return True if a session was removed."""
        try:
            session = self.sessions.pop(token, None)
            if session is None:
                return False
            logger.info(f"User {session['username']} logged out")
            return True
        except Exception as e:
            logger.error(f"Logout error: {e}")
            return False

    def has_permission(self, token: str, permission: str) -> bool:
        """Return True if the session behind ``token`` holds ``permission``."""
        session = self.validate_session(token)
        if not session:
            return False
        return permission in session.get("permissions", [])

    def can_edit_tree(self, token: str, tree_created_by: str) -> bool:
        """Return True if the session behind ``token`` may edit the tree.

        Editing is allowed for holders of ``admin``, ``system`` or ``delete``,
        and for holders of ``edit_own`` when ``tree_created_by`` is their own
        username.
        """
        session = self.validate_session(token)
        if not session:
            return False
        permissions = session.get("permissions", [])
        # Admin and system can edit any tree
        if "admin" in permissions or "system" in permissions:
            return True
        # Users can edit trees they created
        if "edit_own" in permissions and tree_created_by == session["username"]:
            return True
        # Users with delete permission can edit any tree
        return "delete" in permissions

    def can_delete_tree(self, token: str, tree_created_by: str) -> bool:
        """Return True if the session behind ``token`` may delete the tree.

        Deletion requires ``admin``, ``system`` or ``delete``;
        ``tree_created_by`` is accepted for interface symmetry but not used.
        """
        session = self.validate_session(token)
        if not session:
            return False
        permissions = session.get("permissions", [])
        return any(p in permissions for p in ("admin", "system", "delete"))

    def cleanup_expired_sessions(self) -> int:
        """Remove expired sessions (can be called periodically).

        Each session is judged against its own ``session_timeout`` when it has
        one (e.g. the 12-hour demo timeout), otherwise against the manager-wide
        default -- the same rule ``validate_session`` applies.

        Returns:
            The number of sessions removed.
        """
        now = datetime.now()
        expired_tokens = [
            token
            for token, session in self.sessions.items()
            if now - session["last_activity"] > self._timeout_for(session)
        ]
        for token in expired_tokens:
            username = self.sessions.pop(token)["username"]
            logger.info(f"Cleaned up expired session for user: {username}")
        return len(expired_tokens)
# Global auth manager instance. It is constructed when this module is first
# imported, which hashes every predefined account password with bcrypt.
auth_manager = AuthManager()