File size: 3,718 Bytes
6a5b8d8
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
"""

User management and authentication services

"""
from datetime import datetime, timedelta
from typing import Optional, List, Tuple
from sqlalchemy.orm import Session
from sqlalchemy.exc import IntegrityError
from core.models.user import User, UserRole, UserStatus, UserSession, UserPermission

class UserService:
    """User management, authentication and session handling.

    All persistence goes through the SQLAlchemy ``Session`` supplied at
    construction time. Methods that write to the database roll the session
    back when the commit fails, so the same ``Session`` stays usable
    afterwards.
    """

    # Validity period given to every session created by ``create_session``.
    SESSION_LIFETIME = timedelta(days=1)

    def __init__(self, db: Session):
        self.db = db

    def _commit(self) -> None:
        """Commit the current transaction, rolling back if the commit fails.

        The original exception is re-raised unchanged, so callers see the
        same error types as with a bare ``commit()``; the only difference is
        that the session is not left waiting for a rollback.
        """
        try:
            self.db.commit()
        except Exception:
            self.db.rollback()
            raise

    def create_user(self, username: str, password: str, role: UserRole = UserRole.USER) -> Tuple[bool, str, Optional[User]]:
        """Create and persist a new user with ``UserStatus.ACTIVE``.

        The password is stored as ``User.hash_password(password)``.

        Returns:
            ``(success, message, user)``. ``user`` is the refreshed ``User``
            on success and ``None`` on failure. This method does not raise:
            any error is rolled back and reported through ``message``.
        """
        try:
            user = User(
                username=username,
                password_hash=User.hash_password(password),
                role=role,
                status=UserStatus.ACTIVE,
            )
            self.db.add(user)
            self.db.commit()
            self.db.refresh(user)
        except IntegrityError:
            # NOTE(review): any integrity violation is reported as a duplicate
            # username -- confirm username is the only constraint that can
            # fail here.
            self.db.rollback()
            return False, "Username already exists", None
        except Exception as e:
            self.db.rollback()
            return False, f"Error creating user: {e}", None
        return True, "User created successfully", user

    def authenticate_user(self, username: str, password: str) -> Tuple[bool, str, Optional[User]]:
        """Check a username/password pair.

        Checks run in this order: user exists, account not locked, account
        status is ACTIVE, password matches. The outcome of the password check
        is passed to ``user.record_login_attempt`` and committed; the earlier
        checks return without writing anything.

        Returns:
            ``(success, message, user)``; ``user`` is ``None`` unless
            authentication succeeded.

        Raises:
            Whatever the database commit raises; the session is rolled back
            first.
        """
        user = self.db.query(User).filter(User.username == username).first()

        if not user:
            return False, "Invalid username or password", None

        # NOTE(review): the next two messages are returned before the password
        # is verified, so they reveal that the username exists -- confirm this
        # is intended.
        if user.is_locked():
            return False, f"Account is locked until {user.lockout_until}", None

        if user.status != UserStatus.ACTIVE:
            return False, f"Account is {user.status.value}", None

        if not user.verify_password(password):
            user.record_login_attempt(success=False)
            self._commit()
            return False, "Invalid username or password", None

        user.record_login_attempt(success=True)
        self._commit()
        return True, "Authentication successful", user

    def create_session(self, user: User, ip_address: str, device_info: Optional[str] = None) -> UserSession:
        """Create, persist and return a new session for ``user``.

        The session gets a freshly generated token and expires
        ``SESSION_LIFETIME`` after the current (naive) UTC time.

        Raises:
            Whatever the database commit raises; the session is rolled back
            first.
        """
        session = UserSession(
            user_id=user.id,
            token=self._generate_session_token(),
            ip_address=ip_address,
            device_info=device_info,
            expires_at=datetime.utcnow() + self.SESSION_LIFETIME,
        )
        self.db.add(session)
        self._commit()
        self.db.refresh(session)
        return session

    def validate_session(self, token: str) -> Tuple[bool, str, Optional[UserSession]]:
        """Look up a session by token and check that it has not expired.

        A valid session has its ``last_active`` timestamp updated and
        committed.

        Returns:
            ``(valid, message, session)``; ``session`` is ``None`` when the
            token is unknown or the session has expired.

        Raises:
            Whatever the database commit raises; the session is rolled back
            first.
        """
        session = self.db.query(UserSession).filter(UserSession.token == token).first()

        if not session:
            return False, "Invalid session", None

        # Naive UTC on both sides, matching what create_session stores.
        now = datetime.utcnow()
        if session.expires_at < now:
            return False, "Session expired", None

        session.last_active = now
        self._commit()

        return True, "Session valid", session

    def get_user_permissions(self, user_id: int) -> List[UserPermission]:
        """Return every ``UserPermission`` row whose ``user_id`` matches."""
        return self.db.query(UserPermission).filter(UserPermission.user_id == user_id).all()

    def _generate_session_token(self) -> str:
        """Return a random URL-safe token built from 32 bytes of randomness."""
        import secrets
        return secrets.token_urlsafe(32)