Spaces:
Running
Running
File size: 4,904 Bytes
2f43b92 8713eac 2f43b92 8713eac d1d97b7 8713eac 2f43b92 a854ef9 0359012 a854ef9 022e710 0359012 022e710 2f43b92 022e710 8713eac 2f43b92 8713eac a854ef9 022e710 0359012 c936edb d1d97b7 c936edb 0359012 d1d97b7 a854ef9 d1d97b7 a854ef9 d1d97b7 a854ef9 d1d97b7 022e710 f1cd69e d1d97b7 a854ef9 f1cd69e 35e685d 0359012 35e685d 0359012 35e685d a854ef9 d1d97b7 a854ef9 | 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 | import uuid
from app.core.config import settings
from sqlalchemy.dialects.postgresql import UUID
from sqlalchemy import Column, String, Boolean, DateTime
from sqlalchemy.orm import relationship
from sqlalchemy.sql import func
from sqlalchemy import or_
from app.core.database import Base
from app.core.security import get_password_hash, verify_password
from app.utils.utils import is_valid_email, is_valid_password
from app.core.models import AuthTokenController
from fastapi import HTTPException, status
class UserInDB(Base):
    """ORM model for rows of the ``users`` table."""

    __tablename__ = "users"

    # Column order is kept exactly as declared originally.
    # The primary key is a UUID whose default is the ``uuid.uuid4`` callable.
    id = Column(UUID(as_uuid=True), primary_key=True, default=uuid.uuid4)
    # Both username and email are declared unique and indexed.
    username = Column(String, unique=True, index=True)
    email = Column(String, unique=True, index=True)
    hashed_password = Column(String)
    is_active = Column(Boolean, default=True)
    # Relationships to models defined elsewhere; each one is paired with a
    # ``user`` attribute on the other side via ``back_populates``.
    transcribes = relationship("TranscibeInDB", back_populates="user")
    auth_tokens = relationship("AuthToken", back_populates="user")
    # The creation timestamp default is delegated to the server (``func.now()``).
    created_at = Column(DateTime(timezone=True), server_default=func.now())

    def __init__(self, username: str, email: str, hashed_password: str):
        """Populate the three caller-supplied fields.

        Args:
            username: Value stored in the ``username`` column.
            email: Value stored in the ``email`` column.
            hashed_password: Value stored in the ``hashed_password`` column;
                it is stored as given, no hashing happens here.
        """
        # Tuple assignment still assigns left to right, i.e. in the same
        # order as three separate statements would.
        self.username, self.email, self.hashed_password = (
            username,
            email,
            hashed_password,
        )

    def data(self):
        """Return a dict with ``id``, ``username``, ``email`` and ``created_at``.

        ``hashed_password`` and ``is_active`` are not part of the result.
        """
        exposed_fields = ("id", "username", "email", "created_at")
        return {field: getattr(self, field) for field in exposed_fields}
class UserController:
    """Create, look up, update and delete ``UserInDB`` rows through a session.

    Most methods record their result on the instance instead of returning it:
    ``self.db_user`` holds the last created/loaded ORM row and ``self.user``
    the dictionary produced by its ``data()`` method. ``details()`` and
    ``detailsInJSON()`` expose those two attributes.
    """

    UserInDB = UserInDB

    def __init__(self, database):
        """Bind the controller to *database*.

        Args:
            database: Session-like object. This class calls its ``query``,
                ``add``, ``delete``, ``commit``, ``rollback`` and ``refresh``
                methods.
        """
        self.db = database
        # Defined up front so details()/detailsInJSON() return None rather
        # than raising AttributeError when no user has been loaded yet.
        self.db_user = None
        self.user = None

    def create(self, username: str, email: str, password: str, init_token: bool = True):
        """Validate the input, insert a new user and optionally create a token.

        On success ``self.db_user`` and ``self.user`` describe the new row.
        Nothing is returned.

        Args:
            username: Desired username.
            email: Desired e-mail address.
            password: Plain-text password; only its hash is stored.
            init_token: When true, ``AuthTokenController(self.db).create`` is
                called with the new user's id after the insert.

        Raises:
            HTTPException: 422 when ``is_valid_email`` or ``is_valid_password``
                rejects the input; 409 when the e-mail or username is already
                taken.
        """
        # Function-scope import keeps this block self-contained; sqlalchemy is
        # already a dependency of this module.
        from sqlalchemy.exc import IntegrityError

        if not is_valid_email(email):
            raise HTTPException(
                status_code=status.HTTP_422_UNPROCESSABLE_ENTITY, detail="Invalid Email"
            )
        if not is_valid_password(password):
            raise HTTPException(
                status_code=status.HTTP_422_UNPROCESSABLE_ENTITY,
                detail="Invalid Password",
            )
        if self.CheckUserIsExistsByEmailAndUsername(email, username):
            raise HTTPException(
                status_code=status.HTTP_409_CONFLICT,
                detail="Email or Username Already Registered",
            )

        # These attributes were set by the original implementation; they are
        # kept in case callers read them from the controller.
        self.username = username
        self.email = email
        self.hashed_password = get_password_hash(password)

        new_user = UserInDB(
            username=self.username,
            email=self.email,
            hashed_password=self.hashed_password,
        )
        self.db.add(new_user)
        try:
            self.db.commit()
        except IntegrityError as exc:
            # Another writer can insert the same e-mail/username between the
            # existence check above and this commit. Roll back so the session
            # stays usable and report the same conflict as the pre-check.
            self.db.rollback()
            raise HTTPException(
                status_code=status.HTTP_409_CONFLICT,
                detail="Email or Username Already Registered",
            ) from exc
        self.db.refresh(new_user)

        self.db_user = new_user
        self.user = new_user.data()

        if init_token:
            AuthTokenController(self.db).create(self.db_user.id)

    def read_token(self, email: str, password: str):
        """Check *password* for the user with *email* and return their token.

        Returns:
            Whatever ``AuthTokenController.get_token()`` returns after
            ``get_token_from_user_id`` was called with the user's id.

        Raises:
            HTTPException: 404 when no user has that e-mail; 400 when the
                password does not verify against the stored hash.
        """
        self.read_by_email(email)
        if not verify_password(password, self.db_user.hashed_password):
            raise HTTPException(
                status_code=status.HTTP_400_BAD_REQUEST, detail="Incorrect password"
            )
        token_controller = AuthTokenController(self.db)
        token_controller.get_token_from_user_id(self.db_user.id)
        return token_controller.get_token()

    def CheckUserIsExistsByEmailAndUsername(self, email: str, username: str) -> bool:
        """Return True when a user exists with this e-mail OR this username."""
        match = (
            self.db.query(UserInDB)
            .filter(or_(UserInDB.email == email, UserInDB.username == username))
            .first()
        )
        return match is not None

    def _load_user(self, criterion) -> None:
        """Load the first user matching *criterion* into the instance, or raise 404."""
        # db_user is assigned before the check, so a failed lookup leaves it
        # as None (same as the original per-method implementations).
        self.db_user = self.db.query(UserInDB).filter(criterion).first()
        if self.db_user is None:
            raise HTTPException(
                status_code=status.HTTP_404_NOT_FOUND, detail="User not found"
            )
        self.user = self.db_user.data()

    def read_by_email(self, email: str) -> None:
        """Load the user with the given e-mail into ``db_user`` / ``user``.

        Raises:
            HTTPException: 404 when no such user exists.
        """
        self._load_user(UserInDB.email == email)

    def read(self, user_id: uuid.UUID) -> None:
        """Load the user with the given id into ``db_user`` / ``user``.

        Raises:
            HTTPException: 404 when no such user exists.
        """
        self._load_user(UserInDB.id == user_id)

    def update_password(
        self, user_id: uuid.UUID, current_password: str, new_password: str
    ) -> None:
        """Replace the user's password after verifying the current one.

        Args:
            user_id: Id of the user to update.
            current_password: Plain-text password that must match the stored hash.
            new_password: Plain-text replacement; must pass ``is_valid_password``.

        Raises:
            HTTPException: 404 when the user does not exist; 400 when
                *current_password* is wrong; 422 when *new_password* is
                rejected by ``is_valid_password``.
        """
        self.read(user_id)
        if not verify_password(current_password, self.db_user.hashed_password):
            raise HTTPException(
                status_code=status.HTTP_400_BAD_REQUEST, detail="Incorrect password"
            )
        # Apply the same password policy as create(); it is checked after the
        # credential check so the pre-existing error precedence is unchanged.
        if not is_valid_password(new_password):
            raise HTTPException(
                status_code=status.HTTP_422_UNPROCESSABLE_ENTITY,
                detail="Invalid Password",
            )
        self.db_user.hashed_password = get_password_hash(new_password)
        self.db.commit()
        self.db.refresh(self.db_user)
        self.user = self.db_user.data()

    def delete(self, user_id: uuid.UUID):
        """Delete the user with *user_id* and return that id.

        Raises:
            HTTPException: 404 when the user does not exist.
        """
        self.read(user_id)
        self.db.delete(self.db_user)
        self.db.commit()
        return user_id

    def details(self):
        """Return the last loaded/created ORM row (None if there is none yet)."""
        return self.db_user

    def detailsInJSON(self):
        """Return the ``data()`` dict of the last loaded/created user (None if none yet)."""
        return self.user
|