Spaces:
Running
Running
| """ | |
| Authentication Service with Role-Based Access | |
| Handles user registration, login, and JWT token management | |
| Supports Admin and Employee roles | |
| Uses ChromaDB for user storage | |
| """ | |
| import bcrypt | |
| import jwt | |
| import time | |
| from datetime import datetime, timedelta | |
| from config import Config | |
| from services.chroma_service import chroma_service | |
| class AuthService: | |
| def __init__(self): | |
| self.jwt_secret = Config.JWT_SECRET | |
| self.jwt_expiry_hours = Config.JWT_EXPIRY_HOURS | |
| def _hash_password(self, password: str) -> str: | |
| """Hash password using bcrypt""" | |
| salt = bcrypt.gensalt() | |
| return bcrypt.hashpw(password.encode('utf-8'), salt).decode('utf-8') | |
| def _verify_password(self, password: str, hashed: str) -> bool: | |
| """Verify password against hash""" | |
| return bcrypt.checkpw( | |
| password.encode('utf-8'), | |
| hashed.encode('utf-8') | |
| ) | |
| def _generate_token(self, user_id: str, username: str, role: str) -> str: | |
| """Generate JWT token with role""" | |
| payload = { | |
| "user_id": user_id, | |
| "username": username, | |
| "role": role, | |
| "exp": datetime.utcnow() + timedelta(hours=self.jwt_expiry_hours), | |
| "iat": datetime.utcnow() | |
| } | |
| return jwt.encode(payload, self.jwt_secret, algorithm="HS256") | |
| def verify_token(self, token: str) -> dict | None: | |
| """Verify and decode JWT token""" | |
| try: | |
| payload = jwt.decode(token, self.jwt_secret, algorithms=["HS256"]) | |
| return { | |
| "user_id": payload['user_id'], | |
| "username": payload['username'], | |
| "role": payload.get('role', 'employee') | |
| } | |
| except jwt.ExpiredSignatureError: | |
| return None | |
| except jwt.InvalidTokenError: | |
| return None | |
| def register_admin(self, username: str, password: str, email: str = "") -> dict: | |
| """ | |
| Register a new admin user | |
| Returns: {"success": bool, "token": str, "user_id": str, "error": str} | |
| """ | |
| # Validate input | |
| if not username or len(username) < 3: | |
| return {"success": False, "error": "Username must be at least 3 characters"} | |
| if not password or len(password) < 6: | |
| return {"success": False, "error": "Password must be at least 6 characters"} | |
| # Check if user exists | |
| existing = chroma_service.get_user(username) | |
| if existing: | |
| return {"success": False, "error": "Username already exists"} | |
| # Hash password and create admin user | |
| password_hash = self._hash_password(password) | |
| result = chroma_service.create_user(username, password_hash, email, role="admin") | |
| if "error" in result: | |
| return {"success": False, "error": result['error']} | |
| # Generate token | |
| token = self._generate_token(result['user_id'], username, "admin") | |
| return { | |
| "success": True, | |
| "token": token, | |
| "user_id": result['user_id'], | |
| "username": username, | |
| "role": "admin" | |
| } | |
| def register_employee(self, admin_user_id: str, email: str, password: str) -> dict: | |
| """ | |
| Admin registers an employee | |
| Returns: {"success": bool, "user_id": str, "error": str} | |
| """ | |
| # Validate input | |
| if not email or "@" not in email: | |
| return {"success": False, "error": "Valid email is required"} | |
| if not password or len(password) < 6: | |
| return {"success": False, "error": "Password must be at least 6 characters"} | |
| # Check if employee email already exists | |
| existing = chroma_service.get_user(email) | |
| if existing: | |
| return {"success": False, "error": "Employee with this email already exists"} | |
| # Hash password and create employee user | |
| password_hash = self._hash_password(password) | |
| result = chroma_service.create_user( | |
| username=email, | |
| password_hash=password_hash, | |
| email=email, | |
| role="employee", | |
| admin_id=admin_user_id | |
| ) | |
| if "error" in result: | |
| return {"success": False, "error": result['error']} | |
| return { | |
| "success": True, | |
| "user_id": result['user_id'], | |
| "email": email | |
| } | |
| def login(self, username: str, password: str, role: str = "admin") -> dict: | |
| """ | |
| Login user with role check | |
| Returns: {"success": bool, "token": str, "user_id": str, "error": str} | |
| """ | |
| # Get user | |
| user = chroma_service.get_user(username) | |
| if not user: | |
| return {"success": False, "error": "Invalid credentials"} | |
| # Verify password | |
| if not self._verify_password(password, user['password_hash']): | |
| return {"success": False, "error": "Invalid credentials"} | |
| # Verify role matches | |
| user_role = user.get('role', 'admin') | |
| if user_role != role: | |
| if role == "admin": | |
| return {"success": False, "error": "This account is not an admin account"} | |
| else: | |
| return {"success": False, "error": "This account is not an employee account"} | |
| # Generate token | |
| token = self._generate_token(user['user_id'], username, user_role) | |
| return { | |
| "success": True, | |
| "token": token, | |
| "user_id": user['user_id'], | |
| "username": username, | |
| "role": user_role | |
| } | |
| def get_admin_employees(self, admin_user_id: str) -> list: | |
| """Get all employees created by an admin""" | |
| return chroma_service.get_employees_by_admin(admin_user_id) | |
| def delete_employee(self, admin_user_id: str, employee_id: str) -> bool: | |
| """Admin deletes an employee""" | |
| return chroma_service.delete_employee(admin_user_id, employee_id) | |
| def get_current_user(self, token: str) -> dict | None: | |
| """Get current user from token""" | |
| return self.verify_token(token) | |
| # Singleton instance | |
| auth_service = AuthService() | |