File size: 4,904 Bytes
2f43b92
 
 
 
 
8713eac
2f43b92
8713eac
d1d97b7
8713eac
2f43b92
a854ef9
0359012
a854ef9
022e710
0359012
022e710
2f43b92
 
 
 
 
 
 
 
 
 
022e710
8713eac
2f43b92
 
 
 
 
8713eac
 
 
 
 
 
 
 
a854ef9
 
 
 
 
 
 
 
022e710
0359012
 
 
 
 
 
 
 
 
 
c936edb
d1d97b7
 
c936edb
0359012
 
 
 
d1d97b7
a854ef9
 
 
d1d97b7
 
a854ef9
 
 
 
d1d97b7
a854ef9
d1d97b7
 
 
022e710
 
 
 
f1cd69e
 
 
 
 
 
 
 
d1d97b7
 
 
 
 
 
 
 
 
a854ef9
f1cd69e
 
 
 
 
 
 
 
35e685d
 
 
0359012
 
 
35e685d
 
 
 
 
 
 
 
 
 
 
 
0359012
 
 
 
 
 
 
35e685d
a854ef9
d1d97b7
 
 
a854ef9
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
import uuid

from app.core.config import settings

from sqlalchemy.dialects.postgresql import UUID
from sqlalchemy import Column, String, Boolean, DateTime
from sqlalchemy.orm import relationship
from sqlalchemy.sql import func
from sqlalchemy import or_
from app.core.database import Base

from app.core.security import get_password_hash, verify_password
from app.utils.utils import is_valid_email, is_valid_password

from app.core.models import AuthTokenController
from fastapi import HTTPException, status


class UserInDB(Base):
    """ORM model for a row of the ``users`` table."""

    __tablename__ = "users"

    # Primary key; a fresh uuid4 is generated client-side when none is given.
    id = Column(UUID(as_uuid=True), primary_key=True, default=uuid.uuid4)
    # Both username and email are unique and indexed.
    username = Column(String, unique=True, index=True)
    email = Column(String, unique=True, index=True)
    hashed_password = Column(String)
    is_active = Column(Boolean, default=True)
    # Relationship targets are referenced by name; the mapped classes are
    # declared outside this module section.
    transcribes = relationship("TranscibeInDB", back_populates="user")
    auth_tokens = relationship("AuthToken", back_populates="user")
    # Filled in by the database server at insert time.
    created_at = Column(DateTime(timezone=True), server_default=func.now())

    def __init__(self, username: str, email: str, hashed_password: str):
        """Populate the three caller-supplied columns.

        The remaining columns rely on their declared defaults.
        """
        self.username, self.email, self.hashed_password = (
            username,
            email,
            hashed_password,
        )

    def data(self):
        """Return the publicly exposable fields of this user as a dict.

        ``hashed_password`` and ``is_active`` are deliberately not included.
        """
        exposed_fields = ("id", "username", "email", "created_at")
        return {field: getattr(self, field) for field in exposed_fields}


class UserController:
    """Create, look up, update and delete ``UserInDB`` rows.

    The controller is stateful: every successful lookup or write stores the
    loaded row on ``self.db_user`` and its ``data()`` dict on ``self.user``.
    ``details()`` and ``detailsInJSON()`` return those cached values.

    Failures are reported by raising ``fastapi.HTTPException``.
    """

    UserInDB = UserInDB

    def __init__(self, database):
        """Store the database handle used by every operation.

        Args:
            database: Session-like object; this class calls ``query``,
                ``add``, ``commit``, ``refresh`` and ``delete`` on it.
        """
        self.db = database
        # Initialised up front so details()/detailsInJSON() return None
        # rather than raising AttributeError before the first lookup.
        self.db_user = None
        self.user = None

    def create(self, username: str, email: str, password: str, init_token: bool = True):
        """Validate the input, insert a new user and optionally create a token.

        Args:
            username: Desired username; must not already be registered.
            email: E-mail address; must pass ``is_valid_email`` and must not
                already be registered.
            password: Plain-text password; must pass ``is_valid_password``.
            init_token: When true, an auth token is created for the new user
                through ``AuthTokenController``.

        Raises:
            HTTPException: 422 for an invalid e-mail or password, 409 when
                the e-mail or username is already taken.
        """
        if not is_valid_email(email):
            raise HTTPException(
                status_code=status.HTTP_422_UNPROCESSABLE_ENTITY, detail="Invalid Email"
            )
        if not is_valid_password(password):
            raise HTTPException(
                status_code=status.HTTP_422_UNPROCESSABLE_ENTITY,
                detail="Invalid Password",
            )
        if self.CheckUserIsExistsByEmailAndUsername(email, username):
            raise HTTPException(
                status_code=status.HTTP_409_CONFLICT,
                detail="Email or Username Already Registered",
            )

        # These attributes are kept on the controller for backward
        # compatibility with callers that may read them after create().
        self.username = username
        self.email = email
        self.hashed_password = get_password_hash(password)

        self.db_user = UserInDB(
            username=self.username,
            email=self.email,
            hashed_password=self.hashed_password,
        )
        self.db.add(self.db_user)
        self.db.commit()
        # Refresh so server-generated values (e.g. created_at) are loaded.
        self.db.refresh(self.db_user)
        self.user = self.db_user.data()

        if init_token:
            AuthTokenController(self.db).create(self.db_user.id)

    def read_token(self, email: str, password: str):
        """Return the auth token of the user identified by *email*.

        Raises:
            HTTPException: 404 when no user has that e-mail, 400 when the
                password does not match the stored hash.
        """
        self.read_by_email(email)
        if not verify_password(password, self.db_user.hashed_password):
            raise HTTPException(
                status_code=status.HTTP_400_BAD_REQUEST, detail="Incorrect password"
            )
        token_controller = AuthTokenController(self.db)
        token_controller.get_token_from_user_id(self.db_user.id)
        return token_controller.get_token()

    def CheckUserIsExistsByEmailAndUsername(self, email: str, username: str) -> bool:
        """Return True when a user with this e-mail OR this username exists."""
        existing_user = (
            self.db.query(UserInDB)
            .filter(or_(UserInDB.email == email, UserInDB.username == username))
            .first()
        )
        return existing_user is not None

    def _load_user(self, criterion):
        """Load the first user matching *criterion* into the controller state.

        ``self.db_user`` is overwritten even when nothing matches, so a
        failed lookup never leaves a previously loaded row in place.

        Raises:
            HTTPException: 404 when no row matches.
        """
        self.db_user = self.db.query(UserInDB).filter(criterion).first()
        if self.db_user is None:
            raise HTTPException(
                status_code=status.HTTP_404_NOT_FOUND, detail="User not found"
            )
        self.user = self.db_user.data()

    def read_by_email(self, email: str):
        """Load the user with the given e-mail; raise 404 if there is none."""
        self._load_user(UserInDB.email == email)

    def read(self, user_id: uuid.UUID):
        """Load the user with the given id; raise 404 if there is none."""
        self._load_user(UserInDB.id == user_id)

    def update_password(
        self, user_id: uuid.UUID, current_password: str, new_password: str
    ):
        """Replace a user's password after verifying the current one.

        Args:
            user_id: Id of the user whose password is changed.
            current_password: Plain-text password that must match the stored
                hash.
            new_password: Plain-text replacement; must pass
                ``is_valid_password``, the same check ``create`` applies.

        Raises:
            HTTPException: 404 when the user does not exist, 400 when
                *current_password* is wrong, 422 when *new_password* is
                rejected by ``is_valid_password``.
        """
        self.read(user_id)
        if not verify_password(current_password, self.db_user.hashed_password):
            raise HTTPException(
                status_code=status.HTTP_400_BAD_REQUEST, detail="Incorrect password"
            )
        # Enforce the same password policy as create(); without this check a
        # password that registration would reject could be set here.
        if not is_valid_password(new_password):
            raise HTTPException(
                status_code=status.HTTP_422_UNPROCESSABLE_ENTITY,
                detail="Invalid Password",
            )

        self.db_user.hashed_password = get_password_hash(new_password)
        self.db.commit()
        self.db.refresh(self.db_user)
        self.user = self.db_user.data()

    def delete(self, user_id: uuid.UUID):
        """Delete the user with the given id and return that id.

        Raises:
            HTTPException: 404 when the user does not exist.
        """
        self.read(user_id)
        self.db.delete(self.db_user)
        self.db.commit()
        return user_id

    def details(self):
        """Return the most recently loaded ``UserInDB`` row (None if none)."""
        return self.db_user

    def detailsInJSON(self):
        """Return the ``data()`` dict of the most recently loaded user (None if none)."""
        return self.user