| """
|
| User management and authentication services
|
| """
|
| from datetime import datetime, timedelta
|
| from typing import Optional, List, Tuple
|
| from sqlalchemy.orm import Session
|
| from sqlalchemy.exc import IntegrityError
|
| from core.models.user import User, UserRole, UserStatus, UserSession, UserPermission
|
|
|
| class UserService:
|
| def __init__(self, db: Session):
|
| self.db = db
|
|
|
| def create_user(self, username: str, password: str, role: UserRole = UserRole.USER) -> Tuple[bool, str, Optional[User]]:
|
| """
|
| Create a new user
|
| Returns: (success, message, user)
|
| """
|
| try:
|
| user = User(
|
| username=username,
|
| password_hash=User.hash_password(password),
|
| role=role,
|
| status=UserStatus.ACTIVE
|
| )
|
| self.db.add(user)
|
| self.db.commit()
|
| self.db.refresh(user)
|
| return True, "User created successfully", user
|
| except IntegrityError:
|
| self.db.rollback()
|
| return False, "Username already exists", None
|
| except Exception as e:
|
| self.db.rollback()
|
| return False, f"Error creating user: {str(e)}", None
|
|
|
| def authenticate_user(self, username: str, password: str) -> Tuple[bool, str, Optional[User]]:
|
| """
|
| Authenticate user credentials
|
| Returns: (success, message, user)
|
| """
|
| user = self.db.query(User).filter(User.username == username).first()
|
|
|
| if not user:
|
| return False, "Invalid username or password", None
|
|
|
| if user.is_locked():
|
| return False, f"Account is locked until {user.lockout_until}", None
|
|
|
| if user.status != UserStatus.ACTIVE:
|
| return False, f"Account is {user.status.value}", None
|
|
|
| if not user.verify_password(password):
|
| user.record_login_attempt(success=False)
|
| self.db.commit()
|
| return False, "Invalid username or password", None
|
|
|
| user.record_login_attempt(success=True)
|
| self.db.commit()
|
| return True, "Authentication successful", user
|
|
|
| def create_session(self, user: User, ip_address: str, device_info: str = None) -> UserSession:
|
| """Create a new session for user"""
|
| session = UserSession(
|
| user_id=user.id,
|
| token=self._generate_session_token(),
|
| ip_address=ip_address,
|
| device_info=device_info,
|
| expires_at=datetime.utcnow() + timedelta(days=1)
|
| )
|
| self.db.add(session)
|
| self.db.commit()
|
| self.db.refresh(session)
|
| return session
|
|
|
| def validate_session(self, token: str) -> Tuple[bool, str, Optional[UserSession]]:
|
| """Validate session token"""
|
| session = self.db.query(UserSession).filter(UserSession.token == token).first()
|
|
|
| if not session:
|
| return False, "Invalid session", None
|
|
|
| if session.expires_at < datetime.utcnow():
|
| return False, "Session expired", None
|
|
|
|
|
| session.last_active = datetime.utcnow()
|
| self.db.commit()
|
|
|
| return True, "Session valid", session
|
|
|
| def get_user_permissions(self, user_id: int) -> List[UserPermission]:
|
| """Get user permissions"""
|
| return self.db.query(UserPermission).filter(UserPermission.user_id == user_id).all()
|
|
|
| def _generate_session_token(self) -> str:
|
| """Generate a unique session token"""
|
| import secrets
|
| return secrets.token_urlsafe(32)
|
|
|