File size: 6,276 Bytes
64deb3c
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
"""
Authentication Service with Role-Based Access
Handles user registration, login, and JWT token management
Supports Admin and Employee roles
Uses ChromaDB for user storage
"""

import bcrypt
import jwt
import time
from datetime import datetime, timedelta
from config import Config
from services.chroma_service import chroma_service


class AuthService:
    def __init__(self):
        self.jwt_secret = Config.JWT_SECRET
        self.jwt_expiry_hours = Config.JWT_EXPIRY_HOURS
    
    def _hash_password(self, password: str) -> str:
        """Hash password using bcrypt"""
        salt = bcrypt.gensalt()
        return bcrypt.hashpw(password.encode('utf-8'), salt).decode('utf-8')
    
    def _verify_password(self, password: str, hashed: str) -> bool:
        """Verify password against hash"""
        return bcrypt.checkpw(
            password.encode('utf-8'),
            hashed.encode('utf-8')
        )
    
    def _generate_token(self, user_id: str, username: str, role: str) -> str:
        """Generate JWT token with role"""
        payload = {
            "user_id": user_id,
            "username": username,
            "role": role,
            "exp": datetime.utcnow() + timedelta(hours=self.jwt_expiry_hours),
            "iat": datetime.utcnow()
        }
        return jwt.encode(payload, self.jwt_secret, algorithm="HS256")
    
    def verify_token(self, token: str) -> dict | None:
        """Verify and decode JWT token"""
        try:
            payload = jwt.decode(token, self.jwt_secret, algorithms=["HS256"])
            return {
                "user_id": payload['user_id'],
                "username": payload['username'],
                "role": payload.get('role', 'employee')
            }
        except jwt.ExpiredSignatureError:
            return None
        except jwt.InvalidTokenError:
            return None
    
    def register_admin(self, username: str, password: str, email: str = "") -> dict:
        """
        Register a new admin user
        Returns: {"success": bool, "token": str, "user_id": str, "error": str}
        """
        # Validate input
        if not username or len(username) < 3:
            return {"success": False, "error": "Username must be at least 3 characters"}
        
        if not password or len(password) < 6:
            return {"success": False, "error": "Password must be at least 6 characters"}
        
        # Check if user exists
        existing = chroma_service.get_user(username)
        if existing:
            return {"success": False, "error": "Username already exists"}
        
        # Hash password and create admin user
        password_hash = self._hash_password(password)
        result = chroma_service.create_user(username, password_hash, email, role="admin")
        
        if "error" in result:
            return {"success": False, "error": result['error']}
        
        # Generate token
        token = self._generate_token(result['user_id'], username, "admin")
        
        return {
            "success": True,
            "token": token,
            "user_id": result['user_id'],
            "username": username,
            "role": "admin"
        }
    
    def register_employee(self, admin_user_id: str, email: str, password: str) -> dict:
        """
        Admin registers an employee
        Returns: {"success": bool, "user_id": str, "error": str}
        """
        # Validate input
        if not email or "@" not in email:
            return {"success": False, "error": "Valid email is required"}
        
        if not password or len(password) < 6:
            return {"success": False, "error": "Password must be at least 6 characters"}
        
        # Check if employee email already exists
        existing = chroma_service.get_user(email)
        if existing:
            return {"success": False, "error": "Employee with this email already exists"}
        
        # Hash password and create employee user
        password_hash = self._hash_password(password)
        result = chroma_service.create_user(
            username=email,
            password_hash=password_hash,
            email=email,
            role="employee",
            admin_id=admin_user_id
        )
        
        if "error" in result:
            return {"success": False, "error": result['error']}
        
        return {
            "success": True,
            "user_id": result['user_id'],
            "email": email
        }
    
    def login(self, username: str, password: str, role: str = "admin") -> dict:
        """
        Login user with role check
        Returns: {"success": bool, "token": str, "user_id": str, "error": str}
        """
        # Get user
        user = chroma_service.get_user(username)
        
        if not user:
            return {"success": False, "error": "Invalid credentials"}
        
        # Verify password
        if not self._verify_password(password, user['password_hash']):
            return {"success": False, "error": "Invalid credentials"}
        
        # Verify role matches
        user_role = user.get('role', 'admin')
        if user_role != role:
            if role == "admin":
                return {"success": False, "error": "This account is not an admin account"}
            else:
                return {"success": False, "error": "This account is not an employee account"}
        
        # Generate token
        token = self._generate_token(user['user_id'], username, user_role)
        
        return {
            "success": True,
            "token": token,
            "user_id": user['user_id'],
            "username": username,
            "role": user_role
        }
    
    def get_admin_employees(self, admin_user_id: str) -> list:
        """Get all employees created by an admin"""
        return chroma_service.get_employees_by_admin(admin_user_id)
    
    def delete_employee(self, admin_user_id: str, employee_id: str) -> bool:
        """Admin deletes an employee"""
        return chroma_service.delete_employee(admin_user_id, employee_id)
    
    def get_current_user(self, token: str) -> dict | None:
        """Get current user from token"""
        return self.verify_token(token)


# Singleton instance
auth_service = AuthService()