"""Authentication service."""
from datetime import datetime
from uuid import UUID

from sqlalchemy.ext.asyncio import AsyncSession
from sqlmodel import select

from src.models.user import User
from src.schemas.user import UserCreate, UserLogin, UserResponse
from src.utils.security import (
    create_access_token,
    hash_password,
    verify_password,
)


class AuthService:
    """Service for user authentication operations."""

    def __init__(self, db: AsyncSession) -> None:
        """Initialize the auth service."""
        self.db = db

    async def register(
        self,
        data: UserCreate,
    ) -> tuple[User, str, datetime]:
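        """Register a new user.

        Returns the created user, an access token, and the token's expiry.
        Raises ValueError if the email is already registered.
        """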
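        # Check whether the email is already registered.
        # Note: this check-then-insert is not atomic, so a database-level
        # unique constraint on the email column (assumed, not shown here)
        # is still needed to rule out concurrent duplicate registrations.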
        result = await self.db.execute(
            select(User).where(User.email == data.email)
        )
        existing_user = result.scalar_one_or_none()
        if existing_user:
            raise ValueError("Email already registered")

        # Create new user
        user = User(
            email=data.email,
            hashed_password=hash_password(data.password),
        )
        self.db.add(user)
        await self.db.commit()
        await self.db.refresh(user)

        # Create access token
        token, expires_at = create_access_token({
            "user_id": str(user.id),
            "email": user.email,
        })

        return user, token, expires_at

    async def login(
        self,
        data: UserLogin,
    ) -> tuple[User, str, datetime]:
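        """Log in an existing user.

        Returns the user, an access token, and the token's expiry.
        Raises ValueError if the email is unknown or the password is wrong.
        """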
        # Find user by email
        result = await self.db.execute(
            select(User).where(User.email == data.email)
        )
        user = result.scalar_one_or_none()
        if not user:
            raise ValueError("Invalid credentials")

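        # Verify password; use the same error message as for an unknown
        # email so the response does not reveal which accounts exist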
        if not verify_password(data.password, user.hashed_password):
            raise ValueError("Invalid credentials")

        # Create access token
        token, expires_at = create_access_token({
            "user_id": str(user.id),
            "email": user.email,
        })

        return user, token, expires_at

    async def get_user_by_id(self, user_id: UUID) -> User | None:
        """Get a user by ID, or None if no such user exists."""
        result = await self.db.execute(
            select(User).where(User.id == user_id)
        )
        return result.scalar_one_or_none()

    def create_auth_response(
        self,
        user: User,
        token: str,
        expires_at: datetime,
    ) -> dict:
        """Build the authentication response dict matching the AuthResponse schema."""
        return {
            "user": UserResponse.model_validate(user),
            "token": token,
            "expires_at": expires_at,
        }