TreeTrack / auth.py
RoyAalekh's picture
Security: remove auto-login demo endpoint, require password for demo_user
2a49dd7
raw
history blame
10.5 kB
"""
TreeTrack Authentication Module
Simple session-based authentication with predefined users
"""
import hashlib
import secrets
import os
from typing import Dict, Optional, Any
from datetime import datetime, timedelta
import logging
import bcrypt
from constants import (
SESSION_TIMEOUT, AUTH_TOKEN_LENGTH, DEV_PASSWORDS,
BCRYPT_ROUNDS, REQUIRED_ENV_VARS
)
logger = logging.getLogger(__name__)
class AuthManager:
def __init__(self):
self.sessions: Dict[str, Dict[str, Any]] = {}
self.session_timeout = SESSION_TIMEOUT
self.conference_session_token = None
# Get passwords from environment variables with defaults for development
aalekh_password = os.getenv('AALEKH_PASSWORD', DEV_PASSWORDS['AALEKH_PASSWORD'])
admin_password = os.getenv('ADMIN_PASSWORD', DEV_PASSWORDS['ADMIN_PASSWORD'])
ishita_password = os.getenv('ISHITA_PASSWORD', DEV_PASSWORDS['ISHITA_PASSWORD'])
jeeb_password = os.getenv('JEEB_PASSWORD', DEV_PASSWORDS['JEEB_PASSWORD'])
demo_password = os.getenv('DEMO_PASSWORD', DEV_PASSWORDS.get('DEMO_PASSWORD'))
# Warn if using development passwords
env_vars = ['AALEKH_PASSWORD', 'ADMIN_PASSWORD', 'ISHITA_PASSWORD', 'JEEB_PASSWORD', 'DEMO_PASSWORD']
missing_vars = [var for var in env_vars if not os.getenv(var)]
if missing_vars:
logger.warning(f"Using default development passwords for: {', '.join(missing_vars)}. Set these environment variables for production!")
# Predefined user accounts (in production, use a database)
self.users = {
# Administrator account
"aalekh": {
"password_hash": self._hash_password(aalekh_password),
"role": "admin",
"full_name": "Aalekh",
"permissions": ["read", "write", "delete", "admin"]
},
# System account (for admin use)
"admin": {
"password_hash": self._hash_password(admin_password),
"role": "admin",
"full_name": "System Administrator",
"permissions": ["read", "write", "delete", "admin"]
},
# User accounts
"ishita": {
"password_hash": self._hash_password(ishita_password),
"role": "admin",
"full_name": "Ishita",
"permissions": ["read", "write", "delete", "admin"]
},
"jeeb": {
"password_hash": self._hash_password(jeeb_password),
"role": "researcher",
"full_name": "Jeeb",
"permissions": ["read", "write", "edit_own"]
},
# Demo account for public demonstrations
"demo_user": {
"password_hash": self._hash_password(demo_password),
"role": "demo_user",
"full_name": "Demo Account",
"permissions": ["read", "demo_view", "demo_interact", "map_view", "demo_navigation"]
}
}
logger.info(f"AuthManager initialized with {len(self.users)} user accounts")
def create_demo_session(self) -> Optional[Dict[str, Any]]:
"""Create a new demo session when requested"""
try:
demo_user = self.users.get("demo_user")
if not demo_user:
return None
# Create session token
session_token = secrets.token_urlsafe(AUTH_TOKEN_LENGTH)
# Extended timeout for demo (12 hours)
demo_timeout = timedelta(hours=12)
session_data = {
"username": "demo_user",
"role": demo_user["role"],
"full_name": demo_user["full_name"],
"permissions": demo_user["permissions"],
"created_at": datetime.now(),
"last_activity": datetime.now(),
"is_demo_session": True,
"session_timeout": demo_timeout
}
self.sessions[session_token] = session_data
logger.info("Demo session created")
return {
"token": session_token,
"user": session_data
}
except Exception as e:
logger.error(f"Error creating demo session: {e}")
return None
def _hash_password(self, password: str) -> str:
"""Hash password using bcrypt with automatic salt generation"""
# Generate salt and hash password with bcrypt
salt = bcrypt.gensalt()
hashed = bcrypt.hashpw(password.encode('utf-8'), salt)
return hashed.decode('utf-8')
def _verify_password(self, password: str, hashed: str) -> bool:
"""Verify password against hash using bcrypt"""
return bcrypt.checkpw(password.encode('utf-8'), hashed.encode('utf-8'))
def authenticate(self, username: str, password: str) -> Optional[Dict[str, Any]]:
"""Authenticate user credentials"""
try:
if username not in self.users:
logger.warning(f"Authentication attempt with unknown username: {username}")
return None
user = self.users[username]
if self._verify_password(password, user["password_hash"]):
# Create session
session_token = secrets.token_urlsafe(AUTH_TOKEN_LENGTH)
session_data = {
"username": username,
"role": user["role"],
"full_name": user["full_name"],
"permissions": user["permissions"],
"created_at": datetime.now(),
"last_activity": datetime.now()
}
self.sessions[session_token] = session_data
logger.info(f"User {username} authenticated successfully")
return {
"token": session_token,
"user": session_data
}
else:
logger.warning(f"Invalid password for user: {username}")
return None
except Exception as e:
logger.error(f"Authentication error for {username}: {e}")
return None
def validate_session(self, token: str) -> Optional[Dict[str, Any]]:
"""Validate session token and return user data"""
try:
if not token or token not in self.sessions:
return None
session = self.sessions[token]
now = datetime.now()
# Use extended timeout for conference sessions
timeout = session.get("session_timeout", self.session_timeout)
# Check if session has expired
if now - session["last_activity"] > timeout:
# Don't delete demo sessions, just refresh them
if session.get("is_demo_session"):
session["last_activity"] = now
logger.info(f"Demo session refreshed for: {session['username']}")
return session
else:
del self.sessions[token]
logger.info(f"Session expired for user: {session['username']}")
return None
# Update last activity
session["last_activity"] = now
return session
except Exception as e:
logger.error(f"Session validation error: {e}")
return None
def logout(self, token: str) -> bool:
"""Logout user and invalidate session"""
try:
if token in self.sessions:
username = self.sessions[token]["username"]
del self.sessions[token]
logger.info(f"User {username} logged out")
return True
return False
except Exception as e:
logger.error(f"Logout error: {e}")
return False
def has_permission(self, token: str, permission: str) -> bool:
"""Check if user has specific permission"""
session = self.validate_session(token)
if not session:
return False
return permission in session.get("permissions", [])
def can_edit_tree(self, token: str, tree_created_by: str) -> bool:
"""Check if user can edit a specific tree"""
session = self.validate_session(token)
if not session:
return False
# Admin and system can edit any tree
if "admin" in session["permissions"] or "system" in session["permissions"]:
return True
# Users can edit trees they created
if "edit_own" in session["permissions"] and tree_created_by == session["username"]:
return True
# Users with delete permission can edit any tree
if "delete" in session["permissions"]:
return True
return False
def can_delete_tree(self, token: str, tree_created_by: str) -> bool:
"""Check if user can delete a specific tree"""
session = self.validate_session(token)
if not session:
return False
# Only admin and system can delete trees
if "admin" in session["permissions"] or "system" in session["permissions"]:
return True
# Users with explicit delete permission
if "delete" in session["permissions"]:
return True
return False
def cleanup_expired_sessions(self):
"""Remove expired sessions (can be called periodically)"""
now = datetime.now()
expired_tokens = []
for token, session in self.sessions.items():
if now - session["last_activity"] > self.session_timeout:
expired_tokens.append(token)
for token in expired_tokens:
username = self.sessions[token]["username"]
del self.sessions[token]
logger.info(f"Cleaned up expired session for user: {username}")
return len(expired_tokens)
# Global auth manager instance
auth_manager = AuthManager()