Spaces:
Runtime error
Runtime error
| """ | |
| User management and authentication services | |
| """ | |
| from datetime import datetime, timedelta | |
| from typing import Optional, List, Tuple | |
| from sqlalchemy.orm import Session | |
| from sqlalchemy.exc import IntegrityError | |
| from core.models.user import User, UserRole, UserStatus, UserSession, UserPermission | |
class UserService:
    """User management and authentication operations on top of a SQLAlchemy session.

    Most public methods report their outcome as a ``(success, message, value)``
    tuple instead of raising, so callers can surface ``message`` directly.
    Database errors raised while committing in ``authenticate_user``,
    ``create_session`` and ``validate_session`` still propagate, but the
    transaction is rolled back first so the session stays usable.
    """

    # How long a session created by ``create_session`` remains valid.
    SESSION_LIFETIME = timedelta(days=1)

    def __init__(self, db: Session):
        """Store the database session that every operation runs against."""
        self.db = db

    def _commit(self) -> None:
        """Commit the current transaction; roll back and re-raise if the commit fails."""
        try:
            self.db.commit()
        except Exception:
            # Without a rollback the session stays in a failed state and every
            # later call on it would raise as well.
            self.db.rollback()
            raise

    def create_user(self, username: str, password: str, role: UserRole = UserRole.USER) -> Tuple[bool, str, Optional[User]]:
        """
        Create a new user with ACTIVE status.

        Args:
            username: Login name; must not be empty or whitespace-only.
            password: Plain-text password; hashed with ``User.hash_password``
                before being stored. Must not be empty.
            role: Role assigned to the new user.

        Returns: (success, message, user) -- ``user`` is None on failure.
        """
        # Reject blank credentials up front so no account without a usable
        # username or password is ever persisted.
        if not username or not username.strip():
            return False, "Username is required", None
        if not password:
            return False, "Password is required", None

        try:
            user = User(
                username=username,
                password_hash=User.hash_password(password),
                role=role,
                status=UserStatus.ACTIVE
            )
            self.db.add(user)
            self.db.commit()
            self.db.refresh(user)
            return True, "User created successfully", user
        except IntegrityError:
            self.db.rollback()
            return False, "Username already exists", None
        except Exception as e:
            self.db.rollback()
            return False, f"Error creating user: {str(e)}", None

    def authenticate_user(self, username: str, password: str) -> Tuple[bool, str, Optional[User]]:
        """
        Authenticate user credentials.

        Looks the user up by username, refuses locked or non-active accounts,
        verifies the password, and records the attempt on the user row via
        ``user.record_login_attempt``.

        Returns: (success, message, user) -- ``user`` is None on failure.
        """
        # A None username would be rendered as ``username IS NULL`` by
        # SQLAlchemy and could match an unrelated row; a None password cannot
        # be verified. Treat both as an ordinary failed login.
        if not username or password is None:
            return False, "Invalid username or password", None

        user = self.db.query(User).filter(User.username == username).first()
        if not user:
            return False, "Invalid username or password", None

        # NOTE(review): the two messages below are returned before the password
        # is checked, so they reveal that the account exists -- confirm this is
        # acceptable for the callers that display them.
        if user.is_locked():
            return False, f"Account is locked until {user.lockout_until}", None
        if user.status != UserStatus.ACTIVE:
            return False, f"Account is {user.status.value}", None

        if not user.verify_password(password):
            user.record_login_attempt(success=False)
            self._commit()
            return False, "Invalid username or password", None

        user.record_login_attempt(success=True)
        self._commit()
        return True, "Authentication successful", user

    def create_session(self, user: User, ip_address: str, device_info: Optional[str] = None) -> UserSession:
        """Create, persist and return a new session for ``user``.

        The session gets a freshly generated token and expires
        ``SESSION_LIFETIME`` after creation.
        """
        # NOTE(review): naive UTC timestamps are kept to match the rest of this
        # class; datetime.utcnow() is deprecated from Python 3.12 -- confirm the
        # column types before switching to timezone-aware values.
        session = UserSession(
            user_id=user.id,
            token=self._generate_session_token(),
            ip_address=ip_address,
            device_info=device_info,
            expires_at=datetime.utcnow() + self.SESSION_LIFETIME
        )
        self.db.add(session)
        self._commit()
        self.db.refresh(session)
        return session

    def validate_session(self, token: str) -> Tuple[bool, str, Optional[UserSession]]:
        """Validate a session token and refresh its ``last_active`` timestamp.

        Returns: (success, message, session) -- ``session`` is None when the
        token is missing, unknown or expired.
        """
        # An empty/None token must never reach the query: ``token == None``
        # becomes ``token IS NULL`` and could match a row without a token.
        if not token:
            return False, "Invalid session", None

        session = self.db.query(UserSession).filter(UserSession.token == token).first()
        if not session:
            return False, "Invalid session", None

        # Read the clock once so the expiry check and the stored timestamp agree.
        now = datetime.utcnow()
        if session.expires_at < now:
            return False, "Session expired", None

        # Update last active
        session.last_active = now
        self._commit()
        return True, "Session valid", session

    def get_user_permissions(self, user_id: int) -> List[UserPermission]:
        """Return every ``UserPermission`` row whose ``user_id`` matches."""
        return self.db.query(UserPermission).filter(UserPermission.user_id == user_id).all()

    def _generate_session_token(self) -> str:
        """Generate a random URL-safe session token from 32 bytes of entropy."""
        import secrets
        return secrets.token_urlsafe(32)