# Admin-Desk2/app/services/pos_client.py
# Uploaded by Fred808 ("Upload pos_client.py")
# Commit 9b74d99 (verified)
from typing import List
from httpx import AsyncClient
from pydantic import BaseModel
from datetime import datetime
from ..core.config import settings
class OrderSchema(BaseModel):
    """Order record as returned by the POS system's orders endpoint.

    Instances are created by unpacking each JSON object of the POS response
    as keyword arguments, so the field names below must match the payload
    keys.
    """

    id: str
    branch_id: str
    customer_id: str
    items: List[dict]  # line items; their inner structure is not validated here
    total_amount: float
    status: str
    created_at: datetime
    # Explicit default: without it, Pydantic v2 treats a nullable field as
    # required, so a payload that omits "updated_at" would fail validation.
    # NOTE(review): presumably absent/null for orders never modified — confirm
    # against the POS API contract.
    updated_at: datetime | None = None
async def get_orders(branch_id: str) -> List[OrderSchema]:
    """Retrieve the orders of one branch from the POS system.

    Sends an authenticated GET request to the POS ``/internal/orders``
    endpoint with ``branch_id`` as a query parameter, then builds one
    ``OrderSchema`` per item of the JSON response body.

    Args:
        branch_id: The ID of the branch to fetch orders for.

    Returns:
        The orders contained in the response, in the order they were
        returned by the POS system.

    Raises:
        httpx.HTTPStatusError: If the POS system responds with an error
            status (raised by ``raise_for_status``).
    """
    async with AsyncClient() as http:
        endpoint = f"{settings.POS_API_URL}/internal/orders"
        query = {"branch_id": branch_id}
        # Service-to-service authentication using the configured token.
        auth_headers = {"Authorization": f"Bearer {settings.SERVICE_TOKEN}"}

        reply = await http.get(endpoint, params=query, headers=auth_headers)
        # Fail on an error status before attempting to parse the body.
        reply.raise_for_status()

        orders: List[OrderSchema] = []
        for raw_order in reply.json():
            # Each item is expected to be a JSON object whose keys map onto
            # the OrderSchema fields.
            orders.append(OrderSchema(**raw_order))
        return orders