Spaces:
Paused
Paused
| from fastapi import APIRouter, HTTPException, status, Depends, Query | |
| from sqlalchemy.ext.asyncio import AsyncSession | |
| from sqlalchemy import select | |
| from typing import List, Optional | |
| from ..core.dependencies import get_current_active_user | |
| from ..db.database import get_db | |
| from ..db.models import Order, Product, OrderItem, User | |
| from ..db.schemas import OrderCreate, OrderInDB | |
| from datetime import datetime | |
| router = APIRouter() | |
| async def create_order( | |
| order: OrderCreate, | |
| current_user: User = Depends(get_current_active_user), | |
| db: AsyncSession = Depends(get_db) | |
| ) -> OrderInDB: | |
| # Ensure user belongs to the branch they're creating the order for | |
| if current_user.branch_id != order.branch_id and not current_user.is_superuser: | |
| raise HTTPException( | |
| status_code=403, | |
| detail="You can only create orders for your own branch" | |
| ) | |
| # Calculate total and validate products | |
| total = 0 | |
| order_items = [] | |
| for item in order.items: | |
| # Get product | |
| stmt = select(Product).where( | |
| Product.id == item.product_id, | |
| Product.branch_id == order.branch_id # Ensure product belongs to the same branch | |
| ) | |
| result = await db.execute(stmt) | |
| product = result.scalar_one_or_none() | |
| if not product: | |
| raise HTTPException( | |
| status_code=404, | |
| detail=f"Product {item.product_id} not found in this branch" | |
| ) | |
| if product.inventory_count < item.quantity: | |
| raise HTTPException( | |
| status_code=400, | |
| detail=f"Insufficient inventory for product {item.product_id}" | |
| ) | |
| # Update inventory | |
| product.inventory_count -= item.quantity | |
| total += product.price * item.quantity | |
| # Create order item | |
| order_item = OrderItem( | |
| product_id=item.product_id, | |
| quantity=item.quantity, | |
| price=product.price | |
| ) | |
| order_items.append(order_item) | |
| # Create order | |
| db_order = Order( | |
| customer_id=order.customer_id, | |
| branch_id=order.branch_id, | |
| total_amount=total, | |
| status="pending", | |
| items=order_items | |
| ) | |
| db.add(db_order) | |
| await db.commit() | |
| await db.refresh(db_order) | |
| return db_order | |
| async def list_orders( | |
| skip: int = 0, | |
| limit: int = 10, | |
| status: Optional[str] = None, | |
| branch_id: Optional[int] = Query(None, description="Filter orders by branch"), | |
| current_user: User = Depends(get_current_active_user), | |
| db: AsyncSession = Depends(get_db) | |
| ) -> List[OrderInDB]: | |
| query = select(Order) | |
| # Filter by status if provided | |
| if status: | |
| query = query.where(Order.status == status) | |
| # Filter by branch if provided, otherwise use user's branch | |
| if branch_id: | |
| if not current_user.is_superuser and branch_id != current_user.branch_id: | |
| raise HTTPException( | |
| status_code=403, | |
| detail="You can only view orders from your own branch" | |
| ) | |
| query = query.where(Order.branch_id == branch_id) | |
| elif not current_user.is_superuser: | |
| # Non-superusers can only see orders from their branch | |
| query = query.where(Order.branch_id == current_user.branch_id) | |
| query = query.offset(skip).limit(limit) | |
| result = await db.execute(query) | |
| return result.scalars().all() | |
| async def get_order( | |
| order_id: int, | |
| current_user: User = Depends(get_current_active_user), | |
| db: AsyncSession = Depends(get_db) | |
| ) -> OrderInDB: | |
| stmt = select(Order).where(Order.id == order_id) | |
| result = await db.execute(stmt) | |
| order = result.scalar_one_or_none() | |
| if not order: | |
| raise HTTPException(status_code=404, detail="Order not found") | |
| # Check if user has access to this order's branch | |
| if not current_user.is_superuser and order.branch_id != current_user.branch_id: | |
| raise HTTPException(status_code=403, detail="You cannot access orders from other branches") | |
| return order | |
| async def update_order_status( | |
| order_id: int, | |
| status: str, | |
| current_user: User = Depends(get_current_active_user), | |
| db: AsyncSession = Depends(get_db) | |
| ) -> OrderInDB: | |
| valid_statuses = ["pending", "processing", "shipped", "delivered", "cancelled"] | |
| if status not in valid_statuses: | |
| raise HTTPException(status_code=400, detail="Invalid status") | |
| stmt = select(Order).where(Order.id == order_id) | |
| result = await db.execute(stmt) | |
| order = result.scalar_one_or_none() | |
| if not order: | |
| raise HTTPException(status_code=404, detail="Order not found") | |
| # Check if user has access to this order's branch | |
| if not current_user.is_superuser and order.branch_id != current_user.branch_id: | |
| raise HTTPException(status_code=403, detail="You cannot modify orders from other branches") | |
| order.status = status | |
| order.updated_at = datetime.utcnow() | |
| await db.commit() | |
| await db.refresh(order) | |
| return order | |
| async def delete_order( | |
| order_id: int, | |
| current_user: User = Depends(get_current_active_user), | |
| db: AsyncSession = Depends(get_db) | |
| ): | |
| # Get the order | |
| stmt = select(Order).where(Order.id == order_id) | |
| result = await db.execute(stmt) | |
| order = result.scalar_one_or_none() | |
| if not order: | |
| raise HTTPException(status_code=404, detail="Order not found") | |
| # Check if user has access to this order's branch | |
| if not current_user.is_superuser and order.branch_id != current_user.branch_id: | |
| raise HTTPException(status_code=403, detail="You cannot delete orders from other branches") | |
| # Restore inventory for each product | |
| for item in order.items: | |
| product_stmt = select(Product).where(Product.id == item.product_id) | |
| product_result = await db.execute(product_stmt) | |
| product = product_result.scalar_one_or_none() | |
| if product: | |
| product.inventory_count += item.quantity | |
| await db.delete(order) | |
| await db.commit() | |
| return {"status": "success", "message": "Order deleted and inventory restored"} |