Desk-Back2 / app /db /crud.py
Fred808's picture
Upload 77 files
5111c4b verified
from sqlalchemy.ext.asyncio import AsyncSession
from sqlalchemy import select, update, delete, and_, func
from sqlalchemy.orm import selectinload
from ..models.database import MenuItem, Order, Payment, User, Category, OrderItem
from ..models.orders import OrderStatus
from ..models.payment import PaymentStatus
from typing import List, Optional, Dict
from datetime import datetime
# User Operations
async def get_user_by_email(db: AsyncSession, email: str) -> Optional[User]:
result = await db.execute(select(User).where(User.email == email))
return result.scalar_one_or_none()
async def create_user(db: AsyncSession, user: User) -> User:
db.add(user)
await db.commit()
await db.refresh(user)
return user
# Menu Operations
async def get_menu_items(db: AsyncSession) -> List[MenuItem]:
result = await db.execute(
select(MenuItem).options(selectinload(MenuItem.category))
)
return result.scalars().all()
async def get_menu_item_by_id(db: AsyncSession, item_id: int) -> Optional[MenuItem]:
result = await db.execute(
select(MenuItem)
.options(selectinload(MenuItem.category))
.where(MenuItem.id == item_id)
)
return result.scalar_one_or_none()
async def create_menu_item_db(db: AsyncSession, item: MenuItem) -> MenuItem:
db.add(item)
await db.commit()
await db.refresh(item)
return item
async def update_menu_item_db(db: AsyncSession, item_id: int, item: MenuItem) -> Optional[MenuItem]:
query = update(MenuItem).where(MenuItem.id == item_id).values(**item.dict(exclude={'id'}))
await db.execute(query)
await db.commit()
return await get_menu_item_by_id(db, item_id)
async def delete_menu_item_db(db: AsyncSession, item_id: int) -> bool:
query = delete(MenuItem).where(MenuItem.id == item_id)
result = await db.execute(query)
await db.commit()
return result.rowcount > 0
async def get_categories_db(db: AsyncSession) -> List[Category]:
result = await db.execute(select(Category))
return result.scalars().all()
# Order Operations
async def create_order_db(db: AsyncSession, order: Order) -> Order:
db.add(order)
await db.commit()
await db.refresh(order)
return order
async def get_order_by_id(db: AsyncSession, order_id: int) -> Optional[Order]:
result = await db.execute(
select(Order)
.options(selectinload(Order.items))
.where(Order.id == order_id)
)
return result.scalar_one_or_none()
async def update_order_status(db: AsyncSession, order_id: int, new_status: OrderStatus) -> Order:
"""Update the status of an order."""
query = update(Order).where(Order.id == order_id).values(status=new_status)
await db.execute(query)
await db.commit()
return await get_order_by_id(db, order_id)
async def get_orders_by_status(db: AsyncSession, status: OrderStatus) -> List[Order]:
"""Get all orders with a specific status."""
result = await db.execute(
select(Order)
.options(selectinload(Order.items))
.where(Order.status == status)
.order_by(Order.created_at.desc())
)
return result.scalars().all()
async def get_orders_by_date_range(db: AsyncSession, start_date: str, end_date: str) -> List[Order]:
"""Get orders within a date range."""
start = datetime.strptime(start_date, "%Y-%m-%d")
end = datetime.strptime(end_date, "%Y-%m-%d").replace(hour=23, minute=59, second=59)
result = await db.execute(
select(Order)
.options(selectinload(Order.items))
.where(and_(Order.created_at >= start, Order.created_at <= end))
.order_by(Order.created_at.desc())
)
return result.scalars().all()
# Payment Operations
async def create_payment_db(db: AsyncSession, payment: Payment) -> Payment:
db.add(payment)
await db.commit()
await db.refresh(payment)
return payment
async def get_payment_by_id(db: AsyncSession, payment_id: int) -> Optional[Payment]:
result = await db.execute(select(Payment).where(Payment.id == payment_id))
return result.scalar_one_or_none()
async def update_payment_status(db: AsyncSession, payment_id: int, new_status: PaymentStatus) -> Optional[Payment]:
"""Update the status of a payment transaction."""
query = (
update(Payment)
.where(Payment.id == payment_id)
.values(
status=new_status,
updated_at=datetime.utcnow()
)
)
await db.execute(query)
await db.commit()
return await get_payment_by_id(db, payment_id)
# Report Operations
async def get_sales_by_date_range(db: AsyncSession, start_date: str, end_date: str) -> List[Order]:
"""Get all sales within a date range."""
start = datetime.fromisoformat(start_date)
end = datetime.fromisoformat(end_date).replace(hour=23, minute=59, second=59)
result = await db.execute(
select(Order)
.options(selectinload(Order.items))
.where(
and_(
Order.created_at >= start,
Order.created_at <= end,
Order.status == OrderStatus.COMPLETED
)
)
.order_by(Order.created_at.desc())
)
return result.scalars().all()
async def get_top_selling_items(db: AsyncSession, date: str, limit: int = 5) -> List[Dict]:
"""Get top selling items for a specific date."""
day_start = datetime.fromisoformat(date)
day_end = day_start.replace(hour=23, minute=59, second=59)
result = await db.execute(
select(
MenuItem.name,
func.sum(OrderItem.quantity).label('total_quantity'),
func.sum(OrderItem.quantity * OrderItem.unit_price).label('total_revenue')
)
.join(OrderItem, MenuItem.id == OrderItem.menu_item_id)
.join(Order, OrderItem.order_id == Order.id)
.where(
and_(
Order.created_at >= day_start,
Order.created_at <= day_end,
Order.status == OrderStatus.COMPLETED
)
)
.group_by(MenuItem.id, MenuItem.name)
.order_by(func.sum(OrderItem.quantity).desc())
.limit(limit)
)
return [
{
"item_name": row[0],
"quantity_sold": row[1],
"total_revenue": row[2]
}
for row in result.all()
]
async def get_daily_revenue(db: AsyncSession, date: str) -> float:
"""Get total revenue for a specific date."""
day_start = datetime.fromisoformat(date)
day_end = day_start.replace(hour=23, minute=59, second=59)
result = await db.execute(
select(func.sum(Payment.amount))
.join(Order, Payment.order_id == Order.id)
.where(
and_(
Payment.created_at >= day_start,
Payment.created_at <= day_end,
Payment.status == PaymentStatus.COMPLETED
)
)
)
total = result.scalar_one_or_none()
return float(total) if total is not None else 0.0