JRNET / core /services /user_service.py
Factor Studios
Upload 96 files
6a5b8d8 verified
"""
User management and authentication services
"""
from datetime import datetime, timedelta
from typing import Optional, List, Tuple
from sqlalchemy.orm import Session
from sqlalchemy.exc import IntegrityError
from core.models.user import User, UserRole, UserStatus, UserSession, UserPermission
class UserService:
def __init__(self, db: Session):
self.db = db
def create_user(self, username: str, password: str, role: UserRole = UserRole.USER) -> Tuple[bool, str, Optional[User]]:
"""
Create a new user
Returns: (success, message, user)
"""
try:
user = User(
username=username,
password_hash=User.hash_password(password),
role=role,
status=UserStatus.ACTIVE
)
self.db.add(user)
self.db.commit()
self.db.refresh(user)
return True, "User created successfully", user
except IntegrityError:
self.db.rollback()
return False, "Username already exists", None
except Exception as e:
self.db.rollback()
return False, f"Error creating user: {str(e)}", None
def authenticate_user(self, username: str, password: str) -> Tuple[bool, str, Optional[User]]:
"""
Authenticate user credentials
Returns: (success, message, user)
"""
user = self.db.query(User).filter(User.username == username).first()
if not user:
return False, "Invalid username or password", None
if user.is_locked():
return False, f"Account is locked until {user.lockout_until}", None
if user.status != UserStatus.ACTIVE:
return False, f"Account is {user.status.value}", None
if not user.verify_password(password):
user.record_login_attempt(success=False)
self.db.commit()
return False, "Invalid username or password", None
user.record_login_attempt(success=True)
self.db.commit()
return True, "Authentication successful", user
def create_session(self, user: User, ip_address: str, device_info: str = None) -> UserSession:
"""Create a new session for user"""
session = UserSession(
user_id=user.id,
token=self._generate_session_token(),
ip_address=ip_address,
device_info=device_info,
expires_at=datetime.utcnow() + timedelta(days=1)
)
self.db.add(session)
self.db.commit()
self.db.refresh(session)
return session
def validate_session(self, token: str) -> Tuple[bool, str, Optional[UserSession]]:
"""Validate session token"""
session = self.db.query(UserSession).filter(UserSession.token == token).first()
if not session:
return False, "Invalid session", None
if session.expires_at < datetime.utcnow():
return False, "Session expired", None
# Update last active
session.last_active = datetime.utcnow()
self.db.commit()
return True, "Session valid", session
def get_user_permissions(self, user_id: int) -> List[UserPermission]:
"""Get user permissions"""
return self.db.query(UserPermission).filter(UserPermission.user_id == user_id).all()
def _generate_session_token(self) -> str:
"""Generate a unique session token"""
import secrets
return secrets.token_urlsafe(32)