# Admin-Desk2 / app/api/products.py
# Fred808's picture
# Update app/api/products.py
# 8a44fbc verified
from fastapi import APIRouter, HTTPException, status, Depends, Query
from sqlalchemy.ext.asyncio import AsyncSession
from sqlalchemy import select
from typing import List, Optional
from ..core.dependencies import get_current_active_user
from ..db.database import get_db
from ..db.models import Product, User
from ..db.schemas import ProductCreate, ProductInDB
# Router on which the product endpoints defined in this module are registered.
router = APIRouter()
@router.post("/", response_model=ProductInDB)
async def create_product(
    product: ProductCreate,
    current_user: User = Depends(get_current_active_user),
    db: AsyncSession = Depends(get_db),
) -> ProductInDB:
    """Create a product and record the calling user as its seller.

    Args:
        product: Payload describing the product to create.
        current_user: The authenticated, active user making the request.
        db: Async database session used to persist the product.

    Returns:
        The newly stored product, refreshed from the database after commit.

    Raises:
        HTTPException: 403 when a non-superuser targets a branch other than
            the one they belong to.
    """
    # Only superusers may create products outside their own branch.
    targets_other_branch = current_user.branch_id != product.branch_id
    if targets_other_branch and not current_user.is_superuser:
        raise HTTPException(
            status_code=403,
            detail="You can only create products for your own branch",
        )

    # The seller is always the caller, whatever the payload carried.
    new_product = Product(**{**product.dict(), "seller_id": current_user.id})
    db.add(new_product)
    await db.commit()
    # Reload the row so the returned object reflects the stored state.
    await db.refresh(new_product)
    return new_product
@router.get("/", response_model=List[ProductInDB])
async def list_products(
    skip: int = 0,
    limit: int = 10,
    category: Optional[str] = None,
    branch_id: Optional[int] = Query(None, description="Filter products by branch"),
    current_user: User = Depends(get_current_active_user),
    db: AsyncSession = Depends(get_db),
) -> List[ProductInDB]:
    """Return one page of products visible to the calling user.

    Non-superusers only ever see products of their own branch; superusers
    see every branch unless ``branch_id`` narrows the result.

    Args:
        skip: Number of matching rows to skip before the page starts.
        limit: Maximum number of rows in the page.
        category: When given (and non-empty), keep only that category.
        branch_id: When given, keep only products of that branch.
        current_user: The authenticated, active user making the request.
        db: Async database session used to run the query.

    Returns:
        The products on the requested page, ordered by product id.

    Raises:
        HTTPException: 403 when a non-superuser asks for a branch other than
            their own.
    """
    query = select(Product)

    if category:
        query = query.where(Product.category == category)

    # Compare against None explicitly: a plain truthiness test would treat
    # branch_id=0 as "no filter requested" and skip the access check below.
    if branch_id is not None:
        if not current_user.is_superuser and branch_id != current_user.branch_id:
            raise HTTPException(
                status_code=403,
                detail="You can only view products from your own branch",
            )
        query = query.where(Product.branch_id == branch_id)
    elif not current_user.is_superuser:
        # Non-superusers can only see products from their branch.
        query = query.where(Product.branch_id == current_user.branch_id)

    # OFFSET/LIMIT without ORDER BY gives no guarantee about which rows land
    # on which page, so order by the primary key to keep pagination stable.
    query = query.order_by(Product.id).offset(skip).limit(limit)

    result = await db.execute(query)
    return result.scalars().all()
@router.get("/{product_id}", response_model=ProductInDB)
async def get_product(
    product_id: int,
    current_user: User = Depends(get_current_active_user),
    db: AsyncSession = Depends(get_db),
) -> ProductInDB:
    """Fetch a single product by its id.

    Args:
        product_id: Identifier of the product to look up.
        current_user: The authenticated, active user making the request.
        db: Async database session used to run the lookup.

    Returns:
        The matching product.

    Raises:
        HTTPException: 404 when no product has that id; 403 when a
            non-superuser requests a product of another branch.
    """
    lookup = await db.execute(select(Product).where(Product.id == product_id))
    found = lookup.scalar_one_or_none()
    if not found:
        raise HTTPException(status_code=404, detail="Product not found")

    # Superusers may read any branch; everyone else only their own.
    if current_user.is_superuser or found.branch_id == current_user.branch_id:
        return found

    raise HTTPException(
        status_code=403,
        detail="You cannot access products from other branches",
    )
@router.put("/{product_id}", response_model=ProductInDB)
async def update_product(
    product_id: int,
    product_update: ProductCreate,
    current_user: User = Depends(get_current_active_user),
    db: AsyncSession = Depends(get_db),
) -> ProductInDB:
    """Apply the fields supplied in the payload to an existing product.

    Args:
        product_id: Identifier of the product to modify.
        product_update: Payload carrying the new field values.
        current_user: The authenticated, active user making the request.
        db: Async database session used to load and persist the product.

    Returns:
        The product, refreshed from the database after commit.

    Raises:
        HTTPException: 404 when no product has that id; 403 when a
            non-superuser targets a product of another branch; 400 when the
            payload names a branch different from the product's current one.
    """
    lookup = await db.execute(select(Product).where(Product.id == product_id))
    existing = lookup.scalar_one_or_none()
    if not existing:
        raise HTTPException(status_code=404, detail="Product not found")

    # Only superusers may touch products outside their own branch.
    if not current_user.is_superuser and existing.branch_id != current_user.branch_id:
        raise HTTPException(
            status_code=403,
            detail="You cannot modify products from other branches",
        )

    # A product stays in the branch it was created for.
    if product_update.branch_id != existing.branch_id:
        raise HTTPException(status_code=400, detail="Cannot change product's branch")

    # Copy over only the fields the client explicitly sent.
    changes = product_update.dict(exclude_unset=True)
    for attr_name, new_value in changes.items():
        setattr(existing, attr_name, new_value)

    await db.commit()
    await db.refresh(existing)
    return existing
@router.delete("/{product_id}")
async def delete_product(
    product_id: int,
    current_user: User = Depends(get_current_active_user),
    db: AsyncSession = Depends(get_db),
):
    """Delete a product by its id.

    Args:
        product_id: Identifier of the product to remove.
        current_user: The authenticated, active user making the request.
        db: Async database session used to load and delete the product.

    Returns:
        A dict with a ``status`` and a ``message`` confirming the deletion.

    Raises:
        HTTPException: 404 when no product has that id; 403 when a
            non-superuser targets a product of another branch.
    """
    rows = await db.execute(select(Product).where(Product.id == product_id))
    doomed = rows.scalar_one_or_none()
    if not doomed:
        raise HTTPException(status_code=404, detail="Product not found")

    # Only superusers may delete products outside their own branch.
    is_restricted = not current_user.is_superuser
    if is_restricted and doomed.branch_id != current_user.branch_id:
        raise HTTPException(
            status_code=403,
            detail="You cannot delete products from other branches",
        )

    await db.delete(doomed)
    await db.commit()
    return {"status": "success", "message": "Product deleted"}