# Admin-Desk/app/api/orders.py
# Fred808's picture
# Upload 56 files
# 3e5c7dd verified
from fastapi import APIRouter, HTTPException, status, Depends, Query
from sqlalchemy.ext.asyncio import AsyncSession
from sqlalchemy import select
from typing import List, Optional
from ..core.dependencies import get_current_active_user
from ..db.database import get_db
from ..db.models import Order, Product, OrderItem, User
from ..db.schemas import OrderCreate, OrderInDB
from datetime import datetime
# Router that the order endpoints below register themselves on.
router = APIRouter()
@router.post("/", response_model=OrderInDB)
async def create_order(
    order: OrderCreate,
    current_user: User = Depends(get_current_active_user),
    db: AsyncSession = Depends(get_db)
) -> OrderInDB:
    """Create a pending order and deduct the ordered quantities from inventory.

    Every item is validated before any inventory is changed, so a rejected
    request leaves the loaded product objects untouched.

    Args:
        order: Payload carrying the customer, the branch and the items to order.
        current_user: Authenticated user issuing the request.
        db: Async database session.

    Returns:
        The persisted order, refreshed from the database.

    Raises:
        HTTPException: 403 if a non-superuser orders for another branch,
            404 if a product does not exist in the order's branch,
            400 if a quantity is not positive or inventory is insufficient.
    """
    # Ensure user belongs to the branch they're creating the order for
    if current_user.branch_id != order.branch_id and not current_user.is_superuser:
        raise HTTPException(
            status_code=403,
            detail="You can only create orders for your own branch"
        )

    total = 0
    order_items = []
    products = {}   # product_id -> Product row locked for this transaction
    requested = {}  # product_id -> quantity requested so far in this order

    for item in order.items:
        # A zero or negative quantity would inflate inventory and shrink the total.
        if item.quantity <= 0:
            raise HTTPException(
                status_code=400,
                detail=f"Quantity for product {item.product_id} must be greater than zero"
            )

        product = products.get(item.product_id)
        if product is None:
            # Lock the row so concurrent orders cannot both pass the stock
            # check and oversell the same product.
            stmt = (
                select(Product)
                .where(
                    Product.id == item.product_id,
                    Product.branch_id == order.branch_id  # Ensure product belongs to the same branch
                )
                .with_for_update()
            )
            result = await db.execute(stmt)
            product = result.scalar_one_or_none()
            if not product:
                raise HTTPException(
                    status_code=404,
                    detail=f"Product {item.product_id} not found in this branch"
                )
            products[item.product_id] = product

        # Count quantities already claimed by earlier lines for the same product.
        needed = requested.get(item.product_id, 0) + item.quantity
        if product.inventory_count < needed:
            raise HTTPException(
                status_code=400,
                detail=f"Insufficient inventory for product {item.product_id}"
            )
        requested[item.product_id] = needed

        total += product.price * item.quantity
        order_items.append(
            OrderItem(
                product_id=item.product_id,
                quantity=item.quantity,
                price=product.price
            )
        )

    # All items are valid: only now deduct inventory.
    for product_id, quantity in requested.items():
        products[product_id].inventory_count -= quantity

    db_order = Order(
        customer_id=order.customer_id,
        branch_id=order.branch_id,
        total_amount=total,
        status="pending",
        items=order_items
    )
    db.add(db_order)
    await db.commit()
    await db.refresh(db_order)
    return db_order
@router.get("/", response_model=List[OrderInDB])
async def list_orders(
    skip: int = 0,
    limit: int = 10,
    status: Optional[str] = None,
    branch_id: Optional[int] = Query(None, description="Filter orders by branch"),
    current_user: User = Depends(get_current_active_user),
    db: AsyncSession = Depends(get_db)
) -> List[OrderInDB]:
    """Return one page of orders visible to the current user.

    Args:
        skip: Number of orders to skip; must be non-negative.
        limit: Maximum number of orders to return; must be non-negative.
        status: Optional status to filter by.
        branch_id: Optional branch to filter by. Non-superusers may only pass
            their own branch.
        current_user: Authenticated user issuing the request.
        db: Async database session.

    Returns:
        Orders matching the filters, ordered by id.

    Raises:
        HTTPException: 400 if ``skip`` or ``limit`` is negative, 403 if a
            non-superuser requests another branch.
    """
    # Negative values are rejected here rather than passed on to the database.
    if skip < 0 or limit < 0:
        raise HTTPException(
            status_code=400,
            detail="skip and limit must be non-negative"
        )

    query = select(Order)

    # Filter by status if provided
    if status:
        query = query.where(Order.status == status)

    # Filter by branch if provided, otherwise use user's branch.
    # Compare against None so that branch id 0 is still treated as a filter.
    if branch_id is not None:
        if not current_user.is_superuser and branch_id != current_user.branch_id:
            raise HTTPException(
                status_code=403,
                detail="You can only view orders from your own branch"
            )
        query = query.where(Order.branch_id == branch_id)
    elif not current_user.is_superuser:
        # Non-superusers can only see orders from their branch
        query = query.where(Order.branch_id == current_user.branch_id)

    # A fixed ordering keeps offset/limit pages stable between requests.
    query = query.order_by(Order.id).offset(skip).limit(limit)
    result = await db.execute(query)
    return result.scalars().all()
@router.get("/{order_id}", response_model=OrderInDB)
async def get_order(
    order_id: int,
    current_user: User = Depends(get_current_active_user),
    db: AsyncSession = Depends(get_db)
) -> OrderInDB:
    """Fetch a single order by id.

    Raises:
        HTTPException: 404 if no order has this id, 403 if a non-superuser
            asks for an order belonging to another branch.
    """
    lookup = await db.execute(select(Order).where(Order.id == order_id))
    found = lookup.scalar_one_or_none()
    if not found:
        raise HTTPException(status_code=404, detail="Order not found")

    # Superusers may read any branch; everyone else only their own.
    if not (current_user.is_superuser or found.branch_id == current_user.branch_id):
        raise HTTPException(status_code=403, detail="You cannot access orders from other branches")

    return found
@router.put("/{order_id}/status", response_model=OrderInDB)
async def update_order_status(
    order_id: int,
    status: str,
    current_user: User = Depends(get_current_active_user),
    db: AsyncSession = Depends(get_db)
) -> OrderInDB:
    """Set a new status on an order and stamp its update time.

    The status is validated before the order is looked up.

    Raises:
        HTTPException: 400 if ``status`` is not a recognised value, 404 if no
            order has this id, 403 if a non-superuser targets an order from
            another branch.
    """
    allowed = ("pending", "processing", "shipped", "delivered", "cancelled")
    if status not in allowed:
        raise HTTPException(status_code=400, detail="Invalid status")

    lookup = await db.execute(select(Order).where(Order.id == order_id))
    target = lookup.scalar_one_or_none()
    if not target:
        raise HTTPException(status_code=404, detail="Order not found")

    # Superusers may modify any branch; everyone else only their own.
    if not (current_user.is_superuser or target.branch_id == current_user.branch_id):
        raise HTTPException(status_code=403, detail="You cannot modify orders from other branches")

    target.status = status
    target.updated_at = datetime.utcnow()
    await db.commit()
    await db.refresh(target)
    return target
@router.delete("/{order_id}")
async def delete_order(
    order_id: int,
    current_user: User = Depends(get_current_active_user),
    db: AsyncSession = Depends(get_db)
):
    """Delete an order and return its item quantities to product inventory.

    Args:
        order_id: Id of the order to delete.
        current_user: Authenticated user issuing the request.
        db: Async database session.

    Returns:
        A dict with ``status`` and ``message`` keys confirming the deletion.

    Raises:
        HTTPException: 404 if no order has this id, 403 if a non-superuser
            targets an order from another branch.
    """
    # Imported locally so this block does not depend on a change to the
    # module-level imports.
    from sqlalchemy.orm import selectinload

    # Load the items together with the order: an implicit lazy load of
    # ``order.items`` is not possible under AsyncSession.
    stmt = (
        select(Order)
        .options(selectinload(Order.items))
        .where(Order.id == order_id)
    )
    result = await db.execute(stmt)
    order = result.scalar_one_or_none()
    if not order:
        raise HTTPException(status_code=404, detail="Order not found")

    # Check if user has access to this order's branch
    if not current_user.is_superuser and order.branch_id != current_user.branch_id:
        raise HTTPException(status_code=403, detail="You cannot delete orders from other branches")

    # Sum the quantity to give back per product (a product may appear on
    # several order lines).
    restock = {}
    for item in order.items:
        restock[item.product_id] = restock.get(item.product_id, 0) + item.quantity

    if restock:
        # One locked query for all affected products instead of one query per
        # item; products that no longer exist are simply not returned.
        product_stmt = (
            select(Product)
            .where(Product.id.in_(list(restock)))
            .with_for_update()
        )
        product_result = await db.execute(product_stmt)
        for product in product_result.scalars():
            product.inventory_count += restock[product.id]

    await db.delete(order)
    await db.commit()
    return {"status": "success", "message": "Order deleted and inventory restored"}