Spaces:
Runtime error
Runtime error
| from fastapi import APIRouter, Depends, HTTPException | |
| from fastapi.responses import JSONResponse | |
| from sqlalchemy.ext.asyncio import AsyncSession | |
| from sqlalchemy.future import select | |
| from sqlalchemy.orm import sessionmaker, joinedload, selectinload | |
| from collections import OrderedDict | |
| import uuid | |
| from app.db.base import * | |
| from app.core import schemas | |
| from app.db.models import models | |
| from app.core.auth import * | |
| # from app.services.email import * | |
| import logging | |
| logging.basicConfig(level=logging.INFO) | |
| router = APIRouter() | |
| # Create a user | |
| async def create_user(user: schemas.UserCreate, db: AsyncSession = Depends(get_async_db)): | |
| # Check if passwords match | |
| if user.password != user.confirm_password: | |
| raise HTTPException(status_code=400, detail="Passwords do not match") | |
| db_user = await get_user_by_email(db, email=user.email) | |
| if db_user: | |
| raise HTTPException(status_code=400, detail="Email already registered") | |
| db_user_by_phone = await get_user_by_phone(db, phone=user.phone) | |
| if db_user_by_phone: | |
| raise HTTPException(status_code=400, detail="Phone number already registered") | |
| # Create the user and add it to the database | |
| created_user = await create_user_in_db(db=db, user=user) | |
| # Generate the verification link | |
| verification_link = f"http://localhost:8000/api/v1/user/verify/{created_user.verification_token}" | |
| print(created_user.verification_token) | |
| # Send the verification email | |
| # send_verification_email(created_user.email, verification_link) | |
| return created_user | |
| async def verify_email(token: str, db: AsyncSession = Depends(get_async_db)): | |
| async with db as session: | |
| try: | |
| user = await get_verification_token(db=session, token=token) | |
| if not user: | |
| return {"message": "Invalid or expired verification link"} | |
| raise HTTPException(status_code=400, detail="Invalid or expired verification link") | |
| user.is_verified = True | |
| user.verification_token = None # Clear the token after verification | |
| # Commit changes to the database | |
| await session.commit() | |
| # Refresh the user object to reflect changes made in the database | |
| await session.refresh(user) | |
| # Return the updated user | |
| return {"message": "Email verified successfully"} | |
| except Exception as e: | |
| logging.error(f"Error verifying email: {e}") | |
| raise HTTPException(status_code=500, detail="Internal server error") | |
| # Read users with pagination | |
| async def read_users(skip: int = 0, limit: int = 10, db: AsyncSession = Depends(get_async_db),current_user: schemas.User = Depends(get_current_active_user)): | |
| users = await get_users(db, skip=skip, limit=limit) | |
| return users | |
| # Read a specific user by ID | |
| async def read_user(user_id: int, db: AsyncSession = Depends(get_async_db)): | |
| db_user = await get_user(db, user_id=user_id) | |
| if db_user is None: | |
| raise HTTPException(status_code=404, detail="User not found") | |
| return db_user | |
| # Read the current logged-in user | |
| async def read_users_me(current_user: schemas.User = Depends(get_current_active_user)): | |
| return current_user | |
| # Update a user by ID | |
| async def update_user(user_id: int, user_update: schemas.UserUpdate, db: AsyncSession = Depends(get_async_db)): | |
| async with db as session: | |
| db_user = await get_user(session, user_id=user_id) | |
| print("User ID:", id(db_user)) # Print the ID of db_user | |
| print("Session ID:", id(session)) # Print the ID of the session | |
| if db_user is None: | |
| raise HTTPException(status_code=404, detail="User not found") | |
| if user_update.email and user_update.email != db_user.email: | |
| db_user.email = user_update.email | |
| # Update user's password if provided | |
| if user_update.password: | |
| # Securely hash the password before updating | |
| db_user.hashed_password = await get_password_hash(user_update.password) | |
| # Update user's active status | |
| db_user.is_active = user_update.is_active | |
| # Commit changes to the database | |
| await session.commit() | |
| # Refresh the user object to reflect changes made in the database | |
| await session.refresh(db_user) | |
| # Return the updated user | |
| return db_user | |
| # Delete a user by ID | |
| async def delete_user(user_id: int, db: AsyncSession = Depends(get_async_db)): | |
| async with db as session: | |
| db_user = await get_user(db, user_id=user_id) | |
| if db_user is None: | |
| raise HTTPException(status_code=404, detail="User not found") | |
| await session.delete(db_user) | |
| await session.commit() | |
| return JSONResponse(content={"message": f"{db_user.email } has been deleted successfully"}) | |
| # User CRUD Operations | |
| async def get_user(session: AsyncSession, user_id: int): | |
| # async with db as session: | |
| # async with session.begin(): | |
| stmt = select(models.User).filter(models.User.id == user_id) | |
| result = await session.execute(stmt) | |
| return result.scalars().first() | |
| async def get_user_by_email(db: AsyncSession, email: str): | |
| async with db as session: | |
| # async with session.begin(): | |
| stmt = select(models.User).filter(models.User.email == email) | |
| result = await session.execute(stmt) | |
| return result.scalars().first() | |
| async def get_user_by_phone(db: AsyncSession, phone: str): | |
| async with db as session: | |
| stmt = select(models.User).filter(models.User.phone == phone) | |
| result = await session.execute(stmt) | |
| return result.scalars().first() | |
| async def get_users(db: AsyncSession, skip: int = 0, limit: int = 10): | |
| async with db as session: | |
| # async with session.begin(): | |
| stmt = select(models.User).offset(skip).limit(limit) | |
| result = await session.execute(stmt) | |
| users = result.unique().scalars().all() | |
| return users | |
| async def get_verification_token(db: AsyncSession, token:str): | |
| stmt = select(models.User).filter(models.User.verification_token == token) | |
| result = await db.execute(stmt) | |
| user = result.scalars().first() | |
| return user | |
| async def create_user_in_db(db: AsyncSession, user: schemas.UserCreate): | |
| async with db as session: | |
| verification_token = str(uuid.uuid4()) | |
| hashed_password = await get_password_hash(user.password) | |
| # Eagerly load the items along with the user | |
| created_user = db_user = models.User( | |
| first_name = user.first_name, | |
| surname = user.surname, | |
| email=user.email, | |
| password=hashed_password, | |
| phone = user.phone, | |
| is_verified = False, | |
| verification_token = verification_token) | |
| session.add(db_user) | |
| await session.commit() | |
| await session.refresh(db_user) | |
| stmt = select(models.User).filter(models.User.id == db_user.id) | |
| result = await session.execute(stmt) | |
| return result.scalars().first() | |