Spaces:
Sleeping
Sleeping
| from __future__ import annotations | |
| import asyncio | |
| from sqlalchemy import select | |
| from sqlalchemy.ext.asyncio import AsyncSession, async_sessionmaker | |
| from ..config import settings | |
| from ..models import Movement, Product, SyncQueue | |
| from . import sheets_backup | |
| from .datetime_utils import now_in_config_timezone | |
| FULL_SNAPSHOT_KEY = "full_snapshot" | |
| def sheets_sync_available() -> bool: | |
| return bool( | |
| settings.sheets_sync_enabled | |
| and settings.google_script_url.strip() | |
| and settings.google_script_token.strip() | |
| ) | |
| def now_iso() -> str: | |
| return now_in_config_timezone().replace(microsecond=0).isoformat() | |
| def product_to_dict(product: Product) -> dict: | |
| return { | |
| "id": product.id, | |
| "producto": product.producto, | |
| "precio": product.precio, | |
| "cantidad": product.cantidad, | |
| "unidad": product.unidad, | |
| "fechaCaducidad": product.fecha_caducidad, | |
| "fechaIngreso": product.fecha_ingreso, | |
| "fechaProduccion": product.fecha_produccion, | |
| "categoria": product.categoria, | |
| "caducidadEstimada": bool(product.caducidad_estimada), | |
| "notas": product.notas, | |
| "fuente": product.fuente, | |
| "stockActual": product.stock_actual, | |
| "consumidoTotal": product.consumido_total, | |
| "createdAt": product.created_at, | |
| "updatedAt": product.updated_at, | |
| } | |
| def movement_to_dict(movement: Movement) -> dict: | |
| return { | |
| "id": movement.id, | |
| "productId": movement.product_id, | |
| "producto": movement.producto, | |
| "tipo": movement.tipo, | |
| "cantidad": movement.cantidad, | |
| "unidad": movement.unidad, | |
| "notas": movement.notas, | |
| "fuente": movement.fuente, | |
| "createdAt": movement.created_at, | |
| } | |
| async def enqueue_full_sync(session: AsyncSession) -> None: | |
| if not sheets_sync_available(): | |
| return | |
| item = await session.get(SyncQueue, FULL_SNAPSHOT_KEY) | |
| timestamp = now_iso() | |
| if item is None: | |
| item = SyncQueue( | |
| queue_key=FULL_SNAPSHOT_KEY, | |
| status="pending", | |
| attempts=0, | |
| last_error="", | |
| created_at=timestamp, | |
| updated_at=timestamp, | |
| ) | |
| session.add(item) | |
| else: | |
| item.status = "pending" | |
| item.updated_at = timestamp | |
| item.last_error = "" | |
| async def process_sync_queue_once(session_factory: async_sessionmaker, force: bool = False) -> dict: | |
| if not sheets_sync_available(): | |
| return {"ok": False, "reason": "sheets-disabled"} | |
| async with session_factory() as session: | |
| item = await session.get(SyncQueue, FULL_SNAPSHOT_KEY) | |
| if item is None: | |
| if not force: | |
| return {"ok": True, "synced": False, "reason": "queue-empty"} | |
| timestamp = now_iso() | |
| item = SyncQueue( | |
| queue_key=FULL_SNAPSHOT_KEY, | |
| status="pending", | |
| attempts=0, | |
| last_error="", | |
| created_at=timestamp, | |
| updated_at=timestamp, | |
| ) | |
| session.add(item) | |
| await session.commit() | |
| elif item.status not in {"pending", "failed"} and not force: | |
| return {"ok": True, "synced": False, "reason": f"queue-{item.status}"} | |
| item.status = "running" | |
| item.updated_at = now_iso() | |
| await session.commit() | |
| products_result = await session.execute(select(Product).order_by(Product.producto.asc(), Product.created_at.asc())) | |
| movements_result = await session.execute(select(Movement).order_by(Movement.created_at.asc())) | |
| records = [product_to_dict(item) for item in products_result.scalars().all()] | |
| movements = [movement_to_dict(item) for item in movements_result.scalars().all()] | |
| try: | |
| await sheets_backup.replace_snapshot(records, movements) | |
| item.status = "idle" | |
| item.updated_at = now_iso() | |
| item.last_error = "" | |
| await session.commit() | |
| return {"ok": True, "synced": True, "products": len(records), "movements": len(movements)} | |
| except Exception as exc: | |
| item.status = "failed" | |
| item.attempts += 1 | |
| item.last_error = str(exc) | |
| item.updated_at = now_iso() | |
| await session.commit() | |
| return {"ok": False, "synced": False, "reason": str(exc), "attempts": item.attempts} | |
| async def sync_loop(session_factory: async_sessionmaker) -> None: | |
| while True: | |
| await asyncio.sleep(max(30, settings.sync_interval_seconds)) | |
| try: | |
| await process_sync_queue_once(session_factory) | |
| except Exception as exc: | |
| print(f"Error en cola de sincronizacion: {exc}") | |