| """
|
| User models and authentication
|
| """
|
import enum
from datetime import datetime, timedelta, timezone
from typing import Optional, List

from passlib.context import CryptContext
from sqlalchemy import Column, Integer, String, DateTime, Boolean, Enum as SQLEnum
from sqlalchemy.sql import func

from core.database import Base
|
|
|
|
|
# Module-wide password hashing context shared by the User model below.
# Argon2 is the only configured scheme, so it is used both for creating
# new hashes and for verifying stored ones.
pwd_context = CryptContext(
    schemes=["argon2"],
    deprecated="auto",
    # Argon2 cost parameters (passed through to the argon2 handler).
    argon2__time_cost=3,
    argon2__memory_cost=65536,
    argon2__parallelism=4,
)
|
|
|
class UserRole(enum.Enum):
    """Roles that can be assigned to a user account.

    The member values are the lowercase strings associated with each role.
    """

    # Declaration order is kept stable; it is also the iteration order.
    ADMIN = "admin"
    POWER_USER = "power_user"
    USER = "user"
    GUEST = "guest"
|
|
|
class UserStatus(enum.Enum):
    """Lifecycle states of a user account.

    The member values are the lowercase strings associated with each state.
    """

    # Declaration order is kept stable; it is also the iteration order.
    ACTIVE = "active"
    SUSPENDED = "suspended"
    LOCKED = "locked"
    PENDING = "pending"
    EXPIRED = "expired"
|
|
|
class User(Base):
    """User model for authentication and authorization.

    Holds the credentials (as a password hash), the role and status of the
    account, and the counters/timestamps used to lock the account
    temporarily after repeated failed logins.
    """

    __tablename__ = "users"

    # Lockout policy. Plain (unannotated) class attributes so they are
    # ignored by the declarative mapper and can be overridden by subclasses.
    MAX_FAILED_ATTEMPTS = 5
    LOCKOUT_DURATION = timedelta(minutes=15)

    id = Column(Integer, primary_key=True, index=True)
    username = Column(String, unique=True, index=True, nullable=False)
    password_hash = Column(String, nullable=False)
    role = Column(SQLEnum(UserRole), nullable=False, default=UserRole.USER)
    status = Column(SQLEnum(UserStatus), nullable=False, default=UserStatus.PENDING)

    created_at = Column(DateTime(timezone=True), server_default=func.now())
    last_login = Column(DateTime(timezone=True), nullable=True)
    failed_attempts = Column(Integer, default=0)
    lockout_until = Column(DateTime(timezone=True), nullable=True)

    @staticmethod
    def _utcnow() -> datetime:
        """Return the current time as a timezone-aware UTC datetime."""
        return datetime.now(timezone.utc)

    def verify_password(self, password: str) -> bool:
        """Return True if *password* matches the stored password hash."""
        return pwd_context.verify(password, self.password_hash)

    @staticmethod
    def hash_password(password: str) -> str:
        """Hash *password* with the module's password context (Argon2)."""
        return pwd_context.hash(password)

    def is_locked(self) -> bool:
        """Return True while a temporary lockout is still in effect.

        Only ``lockout_until`` is consulted; the ``status`` column is not.
        """
        lockout_until = self.lockout_until
        if lockout_until is None:
            return False
        if lockout_until.tzinfo is None:
            # The column is declared timezone-aware, but a naive value can
            # still show up (set by older code, or presumably returned by a
            # backend without timezone support). This model only ever
            # writes UTC, so interpret it as UTC instead of raising
            # TypeError on a naive/aware comparison.
            lockout_until = lockout_until.replace(tzinfo=timezone.utc)
        return lockout_until > self._utcnow()

    def record_login_attempt(self, success: bool) -> None:
        """Update login bookkeeping after an authentication attempt.

        On success the failure counter and any lockout are cleared and
        ``last_login`` is set to the current UTC time. On failure the
        counter is incremented and, once it reaches
        ``MAX_FAILED_ATTEMPTS``, ``lockout_until`` is set
        ``LOCKOUT_DURATION`` into the future.

        The change is made on the instance only; persisting it is left to
        the caller.
        """
        now = self._utcnow()
        if success:
            self.failed_attempts = 0
            self.last_login = now
            self.lockout_until = None
            return

        # The column default (0) is applied at INSERT time, and the column
        # is nullable, so the attribute may still be None here.
        self.failed_attempts = (self.failed_attempts or 0) + 1
        # NOTE(review): the counter is only reset on success, so after a
        # lockout expires the next failure locks the account again
        # immediately — confirm this is intended.
        if self.failed_attempts >= self.MAX_FAILED_ATTEMPTS:
            self.lockout_until = now + self.LOCKOUT_DURATION
|
|
|
class UserSession(Base):
    """Session record for a user.

    Each row stores a unique session token together with client details
    and the creation, expiry and last-activity timestamps.
    """

    __tablename__ = "user_sessions"

    id = Column(Integer, index=True, primary_key=True)
    # NOTE(review): plain integer with no ForeignKey declared; presumably
    # refers to users.id — confirm.
    user_id = Column(Integer, nullable=False)
    token = Column(String, nullable=False, unique=True, index=True)

    # Optional client details.
    ip_address = Column(String)
    device_info = Column(String)

    # created_at and last_active are filled in by the database on insert;
    # expires_at must always be supplied by the caller.
    created_at = Column(DateTime(timezone=True), server_default=func.now())
    expires_at = Column(DateTime(timezone=True), nullable=False)
    last_active = Column(DateTime(timezone=True), server_default=func.now())
|
|
|
class UserPermission(Base):
    """Permission granted to an individual user.

    A row names a permission type for a user, optionally narrowed to a
    resource id, and records when and by whom it was granted.
    """

    __tablename__ = "user_permissions"

    id = Column(Integer, index=True, primary_key=True)
    # NOTE(review): user_id and granted_by are plain integers with no
    # ForeignKey declared; presumably both refer to users.id — confirm.
    user_id = Column(Integer, nullable=False)
    permission_type = Column(String, nullable=False)
    # Optional resource id.
    resource_id = Column(String, nullable=True)
    # Filled in by the database on insert.
    granted_at = Column(DateTime(timezone=True), server_default=func.now())
    granted_by = Column(Integer, nullable=True)
|
|
|