Spaces:
Runtime error
Runtime error
| """ | |
| User models and authentication | |
| """ | |
import enum
from datetime import datetime, timedelta, timezone
from typing import Optional, List

from passlib.context import CryptContext
from sqlalchemy import Column, Integer, String, DateTime, Boolean, Enum as SQLEnum
from sqlalchemy.sql import func

from core.database import Base
# Module-wide passlib context used for hashing and verifying passwords.
# Argon2 is the only configured scheme.
pwd_context = CryptContext(
    schemes=["argon2"],
    deprecated="auto",
    # Argon2 cost settings; passlib documents memory_cost in KiB,
    # so 65536 corresponds to 64 MiB per hash.
    argon2__time_cost=3,
    argon2__memory_cost=65536,
    argon2__parallelism=4,
)
class UserRole(enum.Enum):
    """Closed set of roles that can be assigned to a user account.

    Each member's value is the lowercase string form of its name.
    """

    ADMIN = "admin"
    POWER_USER = "power_user"
    USER = "user"
    GUEST = "guest"
class UserStatus(enum.Enum):
    """Closed set of states a user account can be in.

    Each member's value is the lowercase string form of its name.
    """

    ACTIVE = "active"
    SUSPENDED = "suspended"
    LOCKED = "locked"
    PENDING = "pending"
    EXPIRED = "expired"
class User(Base):
    """User model for authentication and authorization.

    Holds the credential hash, the role/status enums and the bookkeeping
    fields used to lock an account temporarily after repeated failed logins.
    The methods only mutate the instance; persisting the changes (adding to
    a session, committing) is left to the caller.
    """

    __tablename__ = "users"

    # Number of consecutive failed logins that triggers a temporary lockout.
    MAX_FAILED_ATTEMPTS = 5
    # How long the account stays locked once the threshold is reached.
    LOCKOUT_DURATION = timedelta(minutes=15)

    id = Column(Integer, primary_key=True, index=True)
    username = Column(String, unique=True, index=True, nullable=False)
    password_hash = Column(String, nullable=False)
    role = Column(SQLEnum(UserRole), nullable=False, default=UserRole.USER)
    status = Column(SQLEnum(UserStatus), nullable=False, default=UserStatus.PENDING)

    # Account tracking
    created_at = Column(DateTime(timezone=True), server_default=func.now())
    last_login = Column(DateTime(timezone=True), nullable=True)
    failed_attempts = Column(Integer, default=0)
    lockout_until = Column(DateTime(timezone=True), nullable=True)

    def verify_password(self, password: str) -> bool:
        """Return True if *password* matches the stored hash.

        Delegates to the module-level ``pwd_context``.
        """
        return pwd_context.verify(password, self.password_hash)

    @staticmethod
    def hash_password(password: str) -> str:
        """Hash *password* with the module-level Argon2 context.

        Declared as a static method so it can be called both on the class
        (``User.hash_password(pw)``) and on an instance.
        """
        return pwd_context.hash(password)

    def is_locked(self) -> bool:
        """Return True while a lockout is in effect for this account."""
        lockout_until = self.lockout_until
        if lockout_until is None:
            return False
        # The columns are declared timezone-aware, but some backends hand
        # back naive values. Treat naive timestamps as UTC so the comparison
        # below never mixes naive and aware datetimes (which raises
        # TypeError).
        if lockout_until.tzinfo is None:
            lockout_until = lockout_until.replace(tzinfo=timezone.utc)
        return lockout_until > datetime.now(timezone.utc)

    def record_login_attempt(self, success: bool) -> None:
        """Update the login bookkeeping fields after an attempt.

        Args:
            success: True for a successful login, False for a failed one.

        A success resets the failure counter, clears any lockout and stamps
        ``last_login``. A failure increments the counter and, once it reaches
        ``MAX_FAILED_ATTEMPTS``, sets ``lockout_until`` to now plus
        ``LOCKOUT_DURATION``.
        """
        now = datetime.now(timezone.utc)
        if success:
            self.failed_attempts = 0
            self.last_login = now
            self.lockout_until = None
            return

        # The column default is only applied at INSERT time, so an instance
        # that has not been flushed yet may still carry None here.
        self.failed_attempts = (self.failed_attempts or 0) + 1
        if self.failed_attempts >= self.MAX_FAILED_ATTEMPTS:
            self.lockout_until = now + self.LOCKOUT_DURATION
class UserSession(Base):
    """One row per issued session token.

    ``user_id`` is a plain integer column; no foreign-key constraint is
    declared on it here.
    """

    __tablename__ = "user_sessions"

    id = Column(Integer, index=True, primary_key=True)
    user_id = Column(Integer, nullable=False)
    # Session token: required, unique and indexed.
    token = Column(String, nullable=False, unique=True, index=True)

    # Optional client details recorded with the session.
    ip_address = Column(String)
    device_info = Column(String)

    # Timestamps; created_at and last_active default to the database's
    # current time, while expires_at must be supplied.
    created_at = Column(DateTime(timezone=True), server_default=func.now())
    expires_at = Column(DateTime(timezone=True), nullable=False)
    last_active = Column(DateTime(timezone=True), server_default=func.now())
class UserPermission(Base):
    """A single permission granted to a specific user.

    ``user_id`` and ``granted_by`` are plain integer columns; no foreign-key
    constraints are declared on them here.
    """

    __tablename__ = "user_permissions"

    id = Column(Integer, index=True, primary_key=True)
    user_id = Column(Integer, nullable=False)
    permission_type = Column(String, nullable=False)
    # Optional resource identifier
    resource_id = Column(String, nullable=True)
    # Defaults to the database's current time when the row is inserted.
    granted_at = Column(DateTime(timezone=True), server_default=func.now())
    # User ID who granted the permission
    granted_by = Column(Integer, nullable=True)