""" User management and authentication services """ from datetime import datetime, timedelta from typing import Optional, List, Tuple from sqlalchemy.orm import Session from sqlalchemy.exc import IntegrityError from core.models.user import User, UserRole, UserStatus, UserSession, UserPermission class UserService: def __init__(self, db: Session): self.db = db def create_user(self, username: str, password: str, role: UserRole = UserRole.USER) -> Tuple[bool, str, Optional[User]]: """ Create a new user Returns: (success, message, user) """ try: user = User( username=username, password_hash=User.hash_password(password), role=role, status=UserStatus.ACTIVE ) self.db.add(user) self.db.commit() self.db.refresh(user) return True, "User created successfully", user except IntegrityError: self.db.rollback() return False, "Username already exists", None except Exception as e: self.db.rollback() return False, f"Error creating user: {str(e)}", None def authenticate_user(self, username: str, password: str) -> Tuple[bool, str, Optional[User]]: """ Authenticate user credentials Returns: (success, message, user) """ user = self.db.query(User).filter(User.username == username).first() if not user: return False, "Invalid username or password", None if user.is_locked(): return False, f"Account is locked until {user.lockout_until}", None if user.status != UserStatus.ACTIVE: return False, f"Account is {user.status.value}", None if not user.verify_password(password): user.record_login_attempt(success=False) self.db.commit() return False, "Invalid username or password", None user.record_login_attempt(success=True) self.db.commit() return True, "Authentication successful", user def create_session(self, user: User, ip_address: str, device_info: str = None) -> UserSession: """Create a new session for user""" session = UserSession( user_id=user.id, token=self._generate_session_token(), ip_address=ip_address, device_info=device_info, expires_at=datetime.utcnow() + timedelta(days=1) ) self.db.add(session) self.db.commit() self.db.refresh(session) return session def validate_session(self, token: str) -> Tuple[bool, str, Optional[UserSession]]: """Validate session token""" session = self.db.query(UserSession).filter(UserSession.token == token).first() if not session: return False, "Invalid session", None if session.expires_at < datetime.utcnow(): return False, "Session expired", None # Update last active session.last_active = datetime.utcnow() self.db.commit() return True, "Session valid", session def get_user_permissions(self, user_id: int) -> List[UserPermission]: """Get user permissions""" return self.db.query(UserPermission).filter(UserPermission.user_id == user_id).all() def _generate_session_token(self) -> str: """Generate a unique session token""" import secrets return secrets.token_urlsafe(32)