"""User models and authentication.

Defines the SQLAlchemy models for users, their sessions and their
per-user permissions, together with the passlib context used to hash and
verify passwords.
"""

from datetime import datetime, timedelta, timezone
from typing import Optional, List
import enum

from sqlalchemy import Column, Integer, String, DateTime, Boolean, Enum as SQLEnum
from sqlalchemy.sql import func
from passlib.context import CryptContext

from core.database import Base

# Password hashing context. Argon2 is the only enabled scheme; the
# ``argon2__*`` keywords are forwarded to passlib's argon2 handler.
pwd_context = CryptContext(
    schemes=["argon2"],
    deprecated="auto",
    argon2__memory_cost=65536,
    argon2__parallelism=4,
    argon2__time_cost=3,
)

# Number of consecutive failed logins after which the account is locked.
MAX_FAILED_ATTEMPTS = 5

# How long an account stays locked once the threshold is reached.
LOCKOUT_DURATION = timedelta(minutes=15)


def _utcnow() -> datetime:
    """Return the current time as a timezone-aware UTC datetime."""
    return datetime.now(timezone.utc)


def _as_utc(value: datetime) -> datetime:
    """Return *value* as an aware datetime, treating naive values as UTC.

    The timestamp columns are declared ``timezone=True``, but this guards
    against a naive value reaching a comparison (for example one assigned
    directly by a caller), which would otherwise raise ``TypeError``.
    """
    if value.tzinfo is None:
        return value.replace(tzinfo=timezone.utc)
    return value


class UserRole(enum.Enum):
    """Roles that can be assigned to a user."""

    ADMIN = "admin"
    POWER_USER = "power_user"
    USER = "user"
    GUEST = "guest"


class UserStatus(enum.Enum):
    """Account states a user can be in."""

    ACTIVE = "active"
    SUSPENDED = "suspended"
    LOCKED = "locked"
    PENDING = "pending"
    EXPIRED = "expired"


class User(Base):
    """User model for authentication and authorization."""

    __tablename__ = "users"

    id = Column(Integer, primary_key=True, index=True)
    username = Column(String, unique=True, index=True, nullable=False)
    password_hash = Column(String, nullable=False)
    role = Column(SQLEnum(UserRole), nullable=False, default=UserRole.USER)
    status = Column(SQLEnum(UserStatus), nullable=False, default=UserStatus.PENDING)

    # Account tracking
    created_at = Column(DateTime(timezone=True), server_default=func.now())
    last_login = Column(DateTime(timezone=True), nullable=True)
    failed_attempts = Column(Integer, default=0)
    lockout_until = Column(DateTime(timezone=True), nullable=True)

    def verify_password(self, password: str) -> bool:
        """Return True if *password* matches the stored hash."""
        return pwd_context.verify(password, self.password_hash)

    @staticmethod
    def hash_password(password: str) -> str:
        """Return the hash of *password* produced by ``pwd_context``."""
        return pwd_context.hash(password)

    def is_locked(self) -> bool:
        """Return True while ``lockout_until`` lies in the future.

        Only the lockout timestamp is consulted; ``status`` is not.
        """
        if self.lockout_until is None:
            return False
        # Compare aware-to-aware: mixing naive and aware datetimes raises.
        return _as_utc(self.lockout_until) > _utcnow()

    def record_login_attempt(self, success: bool) -> None:
        """Update the login counters after an authentication attempt.

        On success the failure counter and any lockout are cleared and
        ``last_login`` is set to the current UTC time. On failure the
        counter is incremented, and once it reaches
        ``MAX_FAILED_ATTEMPTS`` the account is locked for
        ``LOCKOUT_DURATION`` from now.

        Only attributes on this instance are modified; nothing is
        committed here.
        """
        now = _utcnow()
        if success:
            self.failed_attempts = 0
            self.last_login = now
            self.lockout_until = None
            return

        # The column default is applied at INSERT time, so an instance
        # that has not been flushed yet (or a NULL row) holds None here.
        self.failed_attempts = (self.failed_attempts or 0) + 1
        if self.failed_attempts >= MAX_FAILED_ATTEMPTS:
            self.lockout_until = now + LOCKOUT_DURATION


class UserSession(Base):
    """User session tracking."""

    __tablename__ = "user_sessions"

    id = Column(Integer, primary_key=True, index=True)
    # NOTE(review): plain integer, no ForeignKey to users.id is declared
    # here -- confirm whether that is intentional.
    user_id = Column(Integer, nullable=False)
    token = Column(String, unique=True, index=True, nullable=False)
    ip_address = Column(String)
    device_info = Column(String)
    created_at = Column(DateTime(timezone=True), server_default=func.now())
    expires_at = Column(DateTime(timezone=True), nullable=False)
    last_active = Column(DateTime(timezone=True), server_default=func.now())


class UserPermission(Base):
    """User specific permissions."""

    __tablename__ = "user_permissions"

    id = Column(Integer, primary_key=True, index=True)
    # NOTE(review): plain integer, no ForeignKey to users.id is declared
    # here -- confirm whether that is intentional.
    user_id = Column(Integer, nullable=False)
    permission_type = Column(String, nullable=False)
    resource_id = Column(String, nullable=True)  # Optional resource identifier
    granted_at = Column(DateTime(timezone=True), server_default=func.now())
    granted_by = Column(Integer, nullable=True)  # User ID who granted the permission