Spaces:
Paused
Paused
| from fastapi import APIRouter, HTTPException, status, Depends, Query | |
| from sqlalchemy.ext.asyncio import AsyncSession | |
| from sqlalchemy import select | |
| from typing import List, Optional | |
| from ..core.dependencies import get_current_active_user | |
| from ..db.database import get_db | |
| from ..db.models import Product, User | |
| from ..db.schemas import ProductCreate, ProductInDB | |
| router = APIRouter() | |
| async def create_product( | |
| product: ProductCreate, | |
| current_user: User = Depends(get_current_active_user), | |
| db: AsyncSession = Depends(get_db) | |
| ) -> ProductInDB: | |
| # Ensure user belongs to the branch they're creating the product for | |
| if current_user.branch_id != product.branch_id and not current_user.is_superuser: | |
| raise HTTPException( | |
| status_code=403, | |
| detail="You can only create products for your own branch" | |
| ) | |
| # Create product with current user as seller | |
| product_data = product.dict() | |
| product_data["seller_id"] = current_user.id | |
| db_product = Product(**product_data) | |
| db.add(db_product) | |
| await db.commit() | |
| await db.refresh(db_product) | |
| return db_product | |
| async def list_products( | |
| skip: int = 0, | |
| limit: int = 10, | |
| category: Optional[str] = None, | |
| branch_id: Optional[int] = Query(None, description="Filter products by branch"), | |
| current_user: User = Depends(get_current_active_user), | |
| db: AsyncSession = Depends(get_db) | |
| ) -> List[ProductInDB]: | |
| query = select(Product) | |
| # Filter by category if provided | |
| if category: | |
| query = query.where(Product.category == category) | |
| # Filter by branch if provided, otherwise use user's branch | |
| if branch_id: | |
| if not current_user.is_superuser and branch_id != current_user.branch_id: | |
| raise HTTPException( | |
| status_code=403, | |
| detail="You can only view products from your own branch" | |
| ) | |
| query = query.where(Product.branch_id == branch_id) | |
| elif not current_user.is_superuser: | |
| # Non-superusers can only see products from their branch | |
| query = query.where(Product.branch_id == current_user.branch_id) | |
| query = query.offset(skip).limit(limit) | |
| result = await db.execute(query) | |
| return result.scalars().all() | |
| async def get_product( | |
| product_id: int, | |
| current_user: User = Depends(get_current_active_user), | |
| db: AsyncSession = Depends(get_db) | |
| ) -> ProductInDB: | |
| stmt = select(Product).where(Product.id == product_id) | |
| result = await db.execute(stmt) | |
| product = result.scalar_one_or_none() | |
| if not product: | |
| raise HTTPException(status_code=404, detail="Product not found") | |
| # Check if user has access to this product's branch | |
| if not current_user.is_superuser and product.branch_id != current_user.branch_id: | |
| raise HTTPException(status_code=403, detail="You cannot access products from other branches") | |
| return product | |
| async def update_product( | |
| product_id: int, | |
| product_update: ProductCreate, | |
| current_user: User = Depends(get_current_active_user), | |
| db: AsyncSession = Depends(get_db) | |
| ) -> ProductInDB: | |
| stmt = select(Product).where(Product.id == product_id) | |
| result = await db.execute(stmt) | |
| product = result.scalar_one_or_none() | |
| if not product: | |
| raise HTTPException(status_code=404, detail="Product not found") | |
| # Check if user has access to this product's branch | |
| if not current_user.is_superuser and product.branch_id != current_user.branch_id: | |
| raise HTTPException(status_code=403, detail="You cannot modify products from other branches") | |
| # Ensure the branch isn't being changed to a different branch | |
| if product_update.branch_id != product.branch_id: | |
| raise HTTPException(status_code=400, detail="Cannot change product's branch") | |
| # Update product fields | |
| update_data = product_update.dict(exclude_unset=True) | |
| for field, value in update_data.items(): | |
| setattr(product, field, value) | |
| await db.commit() | |
| await db.refresh(product) | |
| return product | |
| async def delete_product( | |
| product_id: int, | |
| current_user: User = Depends(get_current_active_user), | |
| db: AsyncSession = Depends(get_db) | |
| ): | |
| stmt = select(Product).where(Product.id == product_id) | |
| result = await db.execute(stmt) | |
| product = result.scalar_one_or_none() | |
| if not product: | |
| raise HTTPException(status_code=404, detail="Product not found") | |
| # Check if user has access to this product's branch | |
| if not current_user.is_superuser and product.branch_id != current_user.branch_id: | |
| raise HTTPException(status_code=403, detail="You cannot delete products from other branches") | |
| await db.delete(product) | |
| await db.commit() | |
| return {"status": "success", "message": "Product deleted"} |