JRNET / core /models /user.py
Factor Studios
Upload 96 files
6a5b8d8 verified
"""
User models and authentication
"""
from datetime import datetime, timedelta, timezone
import enum
from typing import Optional, List

from passlib.context import CryptContext
from sqlalchemy import Column, Integer, String, DateTime, Boolean, Enum as SQLEnum
from sqlalchemy.sql import func

from core.database import Base
# Module-wide passlib context used for hashing and verifying passwords.
# Argon2 is the only enabled scheme; the ``argon2__*`` keywords set its
# cost parameters (units are defined by passlib's argon2 handler).
pwd_context = CryptContext(
    schemes=["argon2"],
    deprecated="auto",
    argon2__time_cost=3,
    argon2__memory_cost=65536,
    argon2__parallelism=4,
)
class UserRole(enum.Enum):
    """Closed set of roles that can be assigned to a user."""

    ADMIN = "admin"
    POWER_USER = "power_user"
    USER = "user"  # used as the default for ``User.role``
    GUEST = "guest"
class UserStatus(enum.Enum):
    """Closed set of lifecycle states for a user account."""

    ACTIVE = "active"
    SUSPENDED = "suspended"
    LOCKED = "locked"
    PENDING = "pending"  # used as the default for ``User.status``
    EXPIRED = "expired"
class User(Base):
    """User model for authentication and authorization.

    Holds the password hash, role and status of an account together with
    the bookkeeping used to lock the account temporarily after repeated
    failed logins. All timestamps handled by this class are timezone-aware
    UTC values, matching the ``DateTime(timezone=True)`` columns.
    """

    __tablename__ = "users"

    # Lockout policy: number of consecutive failed logins that triggers a
    # lock, and how long that lock lasts.
    MAX_FAILED_ATTEMPTS = 5
    LOCKOUT_DURATION = timedelta(minutes=15)

    id = Column(Integer, primary_key=True, index=True)
    username = Column(String, unique=True, index=True, nullable=False)
    password_hash = Column(String, nullable=False)
    role = Column(SQLEnum(UserRole), nullable=False, default=UserRole.USER)
    status = Column(SQLEnum(UserStatus), nullable=False, default=UserStatus.PENDING)

    # Account tracking
    created_at = Column(DateTime(timezone=True), server_default=func.now())
    last_login = Column(DateTime(timezone=True), nullable=True)
    failed_attempts = Column(Integer, default=0)
    lockout_until = Column(DateTime(timezone=True), nullable=True)

    def verify_password(self, password: str) -> bool:
        """Return True if *password* matches the stored ``password_hash``."""
        return pwd_context.verify(password, self.password_hash)

    @staticmethod
    def hash_password(password: str) -> str:
        """Return the hash of *password* produced by the module's ``pwd_context``."""
        return pwd_context.hash(password)

    def is_locked(self) -> bool:
        """Return True while a temporary lockout is in effect.

        Only ``lockout_until`` is consulted; ``status`` is not examined.
        """
        lockout_until = self.lockout_until
        if lockout_until is None:
            return False
        if lockout_until.tzinfo is None:
            # Some backends hand back naive datetimes even for
            # timezone=True columns (SQLite, for instance, has no
            # timezone-aware type). This class only ever writes UTC, so a
            # naive value is interpreted as UTC rather than compared
            # naive-vs-aware, which would raise TypeError.
            lockout_until = lockout_until.replace(tzinfo=timezone.utc)
        return lockout_until > datetime.now(timezone.utc)

    def record_login_attempt(self, success: bool) -> None:
        """Update the login bookkeeping after an authentication attempt.

        Args:
            success: True if the attempt authenticated successfully.

        A success resets the failure counter, clears any lockout and stamps
        ``last_login``. A failure increments the counter and, once it
        reaches ``MAX_FAILED_ATTEMPTS``, sets ``lockout_until`` to now plus
        ``LOCKOUT_DURATION``. ``status`` is not modified, and nothing is
        committed here; persisting the change is left to the caller.
        """
        now = datetime.now(timezone.utc)
        if success:
            self.failed_attempts = 0
            self.last_login = now
            self.lockout_until = None
            return

        # The column default (0) is applied only at INSERT and the column is
        # nullable, so the attribute may still be None here.
        self.failed_attempts = (self.failed_attempts or 0) + 1
        if self.failed_attempts >= self.MAX_FAILED_ATTEMPTS:
            self.lockout_until = now + self.LOCKOUT_DURATION
class UserSession(Base):
    """Row describing one session: its token, client details and timing."""

    __tablename__ = "user_sessions"

    id = Column(Integer, index=True, primary_key=True)
    # Plain integer column; no ForeignKey constraint is declared here.
    # NOTE(review): presumably references ``users.id`` -- confirm.
    user_id = Column(Integer, nullable=False)
    token = Column(String, nullable=False, unique=True, index=True)

    # Client details; both are optional.
    ip_address = Column(String)
    device_info = Column(String)

    # Timing. ``created_at`` and ``last_active`` are filled in by the
    # database on insert; nothing in this class refreshes ``last_active``.
    created_at = Column(DateTime(timezone=True), server_default=func.now())
    expires_at = Column(DateTime(timezone=True), nullable=False)
    last_active = Column(DateTime(timezone=True), server_default=func.now())
class UserPermission(Base):
    """Row recording a single permission granted to a specific user."""

    __tablename__ = "user_permissions"

    id = Column(Integer, index=True, primary_key=True)
    # Plain integer column; no ForeignKey constraint is declared here.
    # NOTE(review): presumably references ``users.id`` -- confirm.
    user_id = Column(Integer, nullable=False)
    permission_type = Column(String, nullable=False)
    # Optional resource identifier.
    resource_id = Column(String, nullable=True)
    granted_at = Column(DateTime(timezone=True), server_default=func.now())
    # User ID of whoever granted the permission (optional).
    granted_by = Column(Integer, nullable=True)