notebooklm-fast / services /auth_service.py
jashdoshi77
feat: Add AI-powered query understanding with DeepSeek parsing
64deb3c
"""
Authentication Service with Role-Based Access
Handles user registration, login, and JWT token management
Supports Admin and Employee roles
Uses ChromaDB for user storage
"""
import bcrypt
import jwt
import time
from datetime import datetime, timedelta
from config import Config
from services.chroma_service import chroma_service
class AuthService:
def __init__(self):
self.jwt_secret = Config.JWT_SECRET
self.jwt_expiry_hours = Config.JWT_EXPIRY_HOURS
def _hash_password(self, password: str) -> str:
"""Hash password using bcrypt"""
salt = bcrypt.gensalt()
return bcrypt.hashpw(password.encode('utf-8'), salt).decode('utf-8')
def _verify_password(self, password: str, hashed: str) -> bool:
"""Verify password against hash"""
return bcrypt.checkpw(
password.encode('utf-8'),
hashed.encode('utf-8')
)
def _generate_token(self, user_id: str, username: str, role: str) -> str:
"""Generate JWT token with role"""
payload = {
"user_id": user_id,
"username": username,
"role": role,
"exp": datetime.utcnow() + timedelta(hours=self.jwt_expiry_hours),
"iat": datetime.utcnow()
}
return jwt.encode(payload, self.jwt_secret, algorithm="HS256")
def verify_token(self, token: str) -> dict | None:
"""Verify and decode JWT token"""
try:
payload = jwt.decode(token, self.jwt_secret, algorithms=["HS256"])
return {
"user_id": payload['user_id'],
"username": payload['username'],
"role": payload.get('role', 'employee')
}
except jwt.ExpiredSignatureError:
return None
except jwt.InvalidTokenError:
return None
def register_admin(self, username: str, password: str, email: str = "") -> dict:
"""
Register a new admin user
Returns: {"success": bool, "token": str, "user_id": str, "error": str}
"""
# Validate input
if not username or len(username) < 3:
return {"success": False, "error": "Username must be at least 3 characters"}
if not password or len(password) < 6:
return {"success": False, "error": "Password must be at least 6 characters"}
# Check if user exists
existing = chroma_service.get_user(username)
if existing:
return {"success": False, "error": "Username already exists"}
# Hash password and create admin user
password_hash = self._hash_password(password)
result = chroma_service.create_user(username, password_hash, email, role="admin")
if "error" in result:
return {"success": False, "error": result['error']}
# Generate token
token = self._generate_token(result['user_id'], username, "admin")
return {
"success": True,
"token": token,
"user_id": result['user_id'],
"username": username,
"role": "admin"
}
def register_employee(self, admin_user_id: str, email: str, password: str) -> dict:
"""
Admin registers an employee
Returns: {"success": bool, "user_id": str, "error": str}
"""
# Validate input
if not email or "@" not in email:
return {"success": False, "error": "Valid email is required"}
if not password or len(password) < 6:
return {"success": False, "error": "Password must be at least 6 characters"}
# Check if employee email already exists
existing = chroma_service.get_user(email)
if existing:
return {"success": False, "error": "Employee with this email already exists"}
# Hash password and create employee user
password_hash = self._hash_password(password)
result = chroma_service.create_user(
username=email,
password_hash=password_hash,
email=email,
role="employee",
admin_id=admin_user_id
)
if "error" in result:
return {"success": False, "error": result['error']}
return {
"success": True,
"user_id": result['user_id'],
"email": email
}
def login(self, username: str, password: str, role: str = "admin") -> dict:
"""
Login user with role check
Returns: {"success": bool, "token": str, "user_id": str, "error": str}
"""
# Get user
user = chroma_service.get_user(username)
if not user:
return {"success": False, "error": "Invalid credentials"}
# Verify password
if not self._verify_password(password, user['password_hash']):
return {"success": False, "error": "Invalid credentials"}
# Verify role matches
user_role = user.get('role', 'admin')
if user_role != role:
if role == "admin":
return {"success": False, "error": "This account is not an admin account"}
else:
return {"success": False, "error": "This account is not an employee account"}
# Generate token
token = self._generate_token(user['user_id'], username, user_role)
return {
"success": True,
"token": token,
"user_id": user['user_id'],
"username": username,
"role": user_role
}
def get_admin_employees(self, admin_user_id: str) -> list:
"""Get all employees created by an admin"""
return chroma_service.get_employees_by_admin(admin_user_id)
def delete_employee(self, admin_user_id: str, employee_id: str) -> bool:
"""Admin deletes an employee"""
return chroma_service.delete_employee(admin_user_id, employee_id)
def get_current_user(self, token: str) -> dict | None:
"""Get current user from token"""
return self.verify_token(token)
# Singleton instance
auth_service = AuthService()