Spaces:
Sleeping
Sleeping
| from __future__ import annotations | |
| from typing import Any | |
| import httpx | |
| from ..config import settings | |
| def is_configured() -> bool: | |
| return bool( | |
| settings.sheets_sync_enabled | |
| and settings.google_script_url.strip() | |
| and settings.google_script_token.strip() | |
| ) | |
| async def _call(action: str, payload: dict[str, Any] | None = None, method: str = "POST") -> dict[str, Any]: | |
| if not is_configured(): | |
| raise RuntimeError("Google Sheets no esta configurado para sincronizacion.") | |
| payload = payload or {} | |
| async with httpx.AsyncClient(timeout=60, follow_redirects=True) as client: | |
| if method == "GET": | |
| params = {"action": action, "token": settings.google_script_token, **payload} | |
| response = await client.get(settings.google_script_url, params=params) | |
| else: | |
| response = await client.post( | |
| settings.google_script_url, | |
| json={"action": action, "token": settings.google_script_token, **payload}, | |
| ) | |
| response.raise_for_status() | |
| return response.json() | |
| async def add_product(record: dict[str, Any]) -> dict[str, Any]: | |
| return await _call("addRecord", {"record": record}) | |
| async def consume_product(consumption: dict[str, Any]) -> dict[str, Any]: | |
| return await _call("consumeProduct", {"consumption": consumption}) | |
| async def list_products() -> dict[str, Any]: | |
| return await _call("listRecords", {"query": ""}, method="GET") | |
| async def list_movements() -> dict[str, Any]: | |
| return await _call("listMovements", method="GET") | |
| async def replace_snapshot(records: list[dict[str, Any]], movements: list[dict[str, Any]]) -> dict[str, Any]: | |
| return await _call("replaceSnapshot", {"records": records, "movements": movements}) | |