File size: 10,487 Bytes
ca371b2
 
 
 
 
 
 
ea9b5e5
ca371b2
 
 
d06b99c
 
 
 
 
ca371b2
 
 
 
 
 
d06b99c
a3d7db1
ca371b2
d06b99c
 
 
 
 
2a49dd7
ea9b5e5
d06b99c
2a49dd7
d06b99c
 
 
ea9b5e5
ca371b2
 
8bcd80c
 
ea9b5e5
ca371b2
8bcd80c
ca371b2
 
 
8bcd80c
 
ea9b5e5
8bcd80c
 
 
ca371b2
 
8bcd80c
 
ea9b5e5
0dbb26a
8bcd80c
0dbb26a
ca371b2
 
8bcd80c
ea9b5e5
ca371b2
8bcd80c
ca371b2
a3d7db1
 
e71677a
 
2a49dd7
e71677a
 
eec0e56
ca371b2
 
 
 
a3d7db1
e71677a
 
eee83a0
e71677a
 
eee83a0
 
 
 
a3d7db1
e71677a
 
a3d7db1
 
e71677a
 
 
 
a3d7db1
 
e71677a
 
a3d7db1
 
eee83a0
e71677a
eee83a0
 
 
 
 
 
 
e71677a
eee83a0
ca371b2
 
d06b99c
 
 
 
 
 
 
 
 
ca371b2
 
 
 
 
 
 
 
 
 
d06b99c
ca371b2
d06b99c
ca371b2
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
a3d7db1
 
 
ca371b2
a3d7db1
e71677a
 
a3d7db1
e71677a
a3d7db1
 
 
 
 
ca371b2
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
"""
TreeTrack Authentication Module
Simple session-based authentication with predefined users
"""

import hashlib
import secrets
import os
from typing import Dict, Optional, Any
from datetime import datetime, timedelta
import logging
import bcrypt
from constants import (
    SESSION_TIMEOUT, AUTH_TOKEN_LENGTH, DEV_PASSWORDS, 
    BCRYPT_ROUNDS, REQUIRED_ENV_VARS
)

logger = logging.getLogger(__name__)

class AuthManager:
    def __init__(self):
        self.sessions: Dict[str, Dict[str, Any]] = {}
        self.session_timeout = SESSION_TIMEOUT
        self.conference_session_token = None
        
        # Get passwords from environment variables with defaults for development
        aalekh_password = os.getenv('AALEKH_PASSWORD', DEV_PASSWORDS['AALEKH_PASSWORD'])
        admin_password = os.getenv('ADMIN_PASSWORD', DEV_PASSWORDS['ADMIN_PASSWORD'])
        ishita_password = os.getenv('ISHITA_PASSWORD', DEV_PASSWORDS['ISHITA_PASSWORD'])
        jeeb_password = os.getenv('JEEB_PASSWORD', DEV_PASSWORDS['JEEB_PASSWORD'])
        demo_password = os.getenv('DEMO_PASSWORD', DEV_PASSWORDS.get('DEMO_PASSWORD'))
        
        # Warn if using development passwords
        env_vars = ['AALEKH_PASSWORD', 'ADMIN_PASSWORD', 'ISHITA_PASSWORD', 'JEEB_PASSWORD', 'DEMO_PASSWORD']
        missing_vars = [var for var in env_vars if not os.getenv(var)]
        if missing_vars:
            logger.warning(f"Using default development passwords for: {', '.join(missing_vars)}. Set these environment variables for production!")
        
        # Predefined user accounts (in production, use a database)
        self.users = {
            # Administrator account
            "aalekh": {
                "password_hash": self._hash_password(aalekh_password),
                "role": "admin",
                "full_name": "Aalekh",
                "permissions": ["read", "write", "delete", "admin"]
            },
            
            # System account (for admin use)
            "admin": {
                "password_hash": self._hash_password(admin_password),
                "role": "admin", 
                "full_name": "System Administrator",
                "permissions": ["read", "write", "delete", "admin"]
            },
            
            # User accounts
            "ishita": {
                "password_hash": self._hash_password(ishita_password),
                "role": "admin",
                "full_name": "Ishita", 
                "permissions": ["read", "write", "delete", "admin"]
            },
            
            "jeeb": {
                "password_hash": self._hash_password(jeeb_password),
                "role": "researcher",
                "full_name": "Jeeb",
                "permissions": ["read", "write", "edit_own"]
            },
            
            # Demo account for public demonstrations
            "demo_user": {
                "password_hash": self._hash_password(demo_password),
                "role": "demo_user",
                "full_name": "Demo Account",
                "permissions": ["read", "demo_view", "demo_interact", "map_view", "demo_navigation"]
            }
        }
        
        logger.info(f"AuthManager initialized with {len(self.users)} user accounts")
    
    def create_demo_session(self) -> Optional[Dict[str, Any]]:
        """Create a new demo session when requested"""
        try:
            demo_user = self.users.get("demo_user")
            if not demo_user:
                return None
            
            # Create session token
            session_token = secrets.token_urlsafe(AUTH_TOKEN_LENGTH)
            
            # Extended timeout for demo (12 hours)
            demo_timeout = timedelta(hours=12)
            
            session_data = {
                "username": "demo_user",
                "role": demo_user["role"],
                "full_name": demo_user["full_name"],
                "permissions": demo_user["permissions"],
                "created_at": datetime.now(),
                "last_activity": datetime.now(),
                "is_demo_session": True,
                "session_timeout": demo_timeout
            }
            
            self.sessions[session_token] = session_data
            logger.info("Demo session created")
            
            return {
                "token": session_token,
                "user": session_data
            }
            
        except Exception as e:
            logger.error(f"Error creating demo session: {e}")
            return None
    
    def _hash_password(self, password: str) -> str:
        """Hash password using bcrypt with automatic salt generation"""
        # Generate salt and hash password with bcrypt
        salt = bcrypt.gensalt()
        hashed = bcrypt.hashpw(password.encode('utf-8'), salt)
        return hashed.decode('utf-8')
    
    def _verify_password(self, password: str, hashed: str) -> bool:
        """Verify password against hash using bcrypt"""
        return bcrypt.checkpw(password.encode('utf-8'), hashed.encode('utf-8'))
    
    def authenticate(self, username: str, password: str) -> Optional[Dict[str, Any]]:
        """Authenticate user credentials"""
        try:
            if username not in self.users:
                logger.warning(f"Authentication attempt with unknown username: {username}")
                return None
            
            user = self.users[username]
            
            if self._verify_password(password, user["password_hash"]):
                # Create session
                session_token = secrets.token_urlsafe(AUTH_TOKEN_LENGTH)
                session_data = {
                    "username": username,
                    "role": user["role"],
                    "full_name": user["full_name"],
                    "permissions": user["permissions"],
                    "created_at": datetime.now(),
                    "last_activity": datetime.now()
                }
                
                self.sessions[session_token] = session_data
                logger.info(f"User {username} authenticated successfully")
                
                return {
                    "token": session_token,
                    "user": session_data
                }
            else:
                logger.warning(f"Invalid password for user: {username}")
                return None
                
        except Exception as e:
            logger.error(f"Authentication error for {username}: {e}")
            return None
    
    def validate_session(self, token: str) -> Optional[Dict[str, Any]]:
        """Validate session token and return user data"""
        try:
            if not token or token not in self.sessions:
                return None
            
            session = self.sessions[token]
            now = datetime.now()
            
            # Use extended timeout for conference sessions
            timeout = session.get("session_timeout", self.session_timeout)
            
            # Check if session has expired
            if now - session["last_activity"] > timeout:
                # Don't delete demo sessions, just refresh them
                if session.get("is_demo_session"):
                    session["last_activity"] = now
                    logger.info(f"Demo session refreshed for: {session['username']}")
                    return session
                else:
                    del self.sessions[token]
                    logger.info(f"Session expired for user: {session['username']}")
                    return None
            
            # Update last activity
            session["last_activity"] = now
            return session
            
        except Exception as e:
            logger.error(f"Session validation error: {e}")
            return None
    
    def logout(self, token: str) -> bool:
        """Logout user and invalidate session"""
        try:
            if token in self.sessions:
                username = self.sessions[token]["username"]
                del self.sessions[token]
                logger.info(f"User {username} logged out")
                return True
            return False
        except Exception as e:
            logger.error(f"Logout error: {e}")
            return False
    
    def has_permission(self, token: str, permission: str) -> bool:
        """Check if user has specific permission"""
        session = self.validate_session(token)
        if not session:
            return False
        return permission in session.get("permissions", [])
    
    def can_edit_tree(self, token: str, tree_created_by: str) -> bool:
        """Check if user can edit a specific tree"""
        session = self.validate_session(token)
        if not session:
            return False
        
        # Admin and system can edit any tree
        if "admin" in session["permissions"] or "system" in session["permissions"]:
            return True
        
        # Users can edit trees they created
        if "edit_own" in session["permissions"] and tree_created_by == session["username"]:
            return True
        
        # Users with delete permission can edit any tree
        if "delete" in session["permissions"]:
            return True
        
        return False
    
    def can_delete_tree(self, token: str, tree_created_by: str) -> bool:
        """Check if user can delete a specific tree"""
        session = self.validate_session(token)
        if not session:
            return False
        
        # Only admin and system can delete trees
        if "admin" in session["permissions"] or "system" in session["permissions"]:
            return True
        
        # Users with explicit delete permission
        if "delete" in session["permissions"]:
            return True
        
        return False
    
    def cleanup_expired_sessions(self):
        """Remove expired sessions (can be called periodically)"""
        now = datetime.now()
        expired_tokens = []
        
        for token, session in self.sessions.items():
            if now - session["last_activity"] > self.session_timeout:
                expired_tokens.append(token)
        
        for token in expired_tokens:
            username = self.sessions[token]["username"]
            del self.sessions[token]
            logger.info(f"Cleaned up expired session for user: {username}")
        
        return len(expired_tokens)

# Global auth manager instance
auth_manager = AuthManager()