Spaces:
Running
Running
| import uuid | |
| from app.core.config import settings | |
| from sqlalchemy.dialects.postgresql import UUID | |
| from sqlalchemy import Column, String, Boolean, DateTime | |
| from sqlalchemy.orm import relationship | |
| from sqlalchemy.sql import func | |
| from sqlalchemy import or_ | |
| from app.core.database import Base | |
| from app.core.security import get_password_hash, verify_password | |
| from app.utils.utils import is_valid_email, is_valid_password | |
| from app.core.models import AuthTokenController | |
| from fastapi import HTTPException, status | |
| class UserInDB(Base): | |
| __tablename__ = "users" | |
| id = Column(UUID(as_uuid=True), primary_key=True, default=uuid.uuid4) | |
| username = Column(String, unique=True, index=True) | |
| email = Column(String, unique=True, index=True) | |
| hashed_password = Column(String) | |
| is_active = Column(Boolean, default=True) | |
| transcribes = relationship("TranscibeInDB", back_populates="user") | |
| auth_tokens = relationship("AuthToken", back_populates="user") | |
| created_at = Column(DateTime(timezone=True), server_default=func.now()) | |
| def __init__(self, username: str, email: str, hashed_password: str): | |
| self.username = username | |
| self.email = email | |
| self.hashed_password = hashed_password | |
| def data(self): | |
| return { | |
| "id": self.id, | |
| "username": self.username, | |
| "email": self.email, | |
| "created_at": self.created_at, | |
| } | |
| class UserController: | |
| UserInDB = UserInDB | |
| def __init__(self, database): | |
| self.db = database | |
| def create(self, username: str, email: str, password: str, init_token: bool = True): | |
| if not is_valid_email(email): | |
| raise HTTPException( | |
| status_code=status.HTTP_422_UNPROCESSABLE_ENTITY, detail="Invalid Email" | |
| ) | |
| if not is_valid_password(password): | |
| raise HTTPException( | |
| status_code=status.HTTP_422_UNPROCESSABLE_ENTITY, | |
| detail="Invalid Password", | |
| ) | |
| is_user_exists: Boolean = self.CheckUserIsExistsByEmailAndUsername( | |
| email, username | |
| ) | |
| if is_user_exists: | |
| raise HTTPException( | |
| status_code=status.HTTP_409_CONFLICT, | |
| detail="Email or Username Already Registered", | |
| ) | |
| self.username = username | |
| self.email = email | |
| self.hashed_password = get_password_hash(password) | |
| self.db_user = UserInDB( | |
| username=self.username, | |
| email=self.email, | |
| hashed_password=self.hashed_password, | |
| ) | |
| self.db.add(self.db_user) | |
| self.db.commit() | |
| self.db.refresh(self.db_user) | |
| self.user = self.db_user.data() | |
| if init_token == False: | |
| return | |
| AuthTokenController(self.db).create(self.db_user.id) | |
| def read_token(self, email: str, password: str): | |
| self.read_by_email(email) | |
| if not verify_password(password, self.db_user.hashed_password): | |
| raise HTTPException(status_code=400, detail="Incorrect password") | |
| TOKEN = AuthTokenController(self.db) | |
| TOKEN.get_token_from_user_id(self.db_user.id) | |
| return TOKEN.get_token() | |
| def CheckUserIsExistsByEmailAndUsername(self, email: str, username: str): | |
| db_user = ( | |
| self.db.query(UserInDB) | |
| .filter(or_(UserInDB.email == email, UserInDB.username == username)) | |
| .first() | |
| ) | |
| if db_user: | |
| return True | |
| return False | |
| def read_by_email(self, email: str): | |
| self.db_user = self.db.query(UserInDB).filter(UserInDB.email == email).first() | |
| if not self.db_user: | |
| raise HTTPException( | |
| status_code=status.HTTP_404_NOT_FOUND, detail="User not found" | |
| ) | |
| self.user = self.db_user.data() | |
| def read(self, user_id: uuid.UUID): | |
| self.db_user = self.db.query(UserInDB).filter(UserInDB.id == user_id).first() | |
| if not self.db_user: | |
| raise HTTPException( | |
| status_code=status.HTTP_404_NOT_FOUND, detail="User not found" | |
| ) | |
| self.user = self.db_user.data() | |
| def update_password( | |
| self, user_id: uuid.UUID, current_password: str, new_password: str | |
| ): | |
| self.read(user_id) | |
| if verify_password(current_password, self.db_user.hashed_password): | |
| self.db_user.hashed_password = get_password_hash(new_password) | |
| self.db.commit() | |
| self.db.refresh(self.db_user) | |
| self.user = self.db_user.data() | |
| else: | |
| raise HTTPException(status_code=400, detail="Incorrect password") | |
| def delete(self, user_id: uuid.UUID): | |
| self.read(user_id) | |
| self.db.delete(self.db_user) | |
| self.db.commit() | |
| return user_id | |
| def details(self): | |
| return self.db_user | |
| def detailsInJSON(self): | |
| return self.user | |