"""Queue-driven full-snapshot sync of products and movements to Google Sheets."""

from __future__ import annotations

import asyncio

from sqlalchemy import select
from sqlalchemy.ext.asyncio import AsyncSession, async_sessionmaker

from ..config import settings
from ..models import Movement, Product, SyncQueue
from . import sheets_backup
from .datetime_utils import now_in_config_timezone

# Primary key of the single SyncQueue row that tracks the full-snapshot job.
FULL_SNAPSHOT_KEY = "full_snapshot"


def sheets_sync_available() -> bool:
    """Return True when Sheets sync is enabled and fully configured.

    Requires ``settings.sheets_sync_enabled`` to be truthy and both the
    script URL and the script token to be non-blank after stripping.
    """
    return bool(
        settings.sheets_sync_enabled
        and settings.google_script_url.strip()
        and settings.google_script_token.strip()
    )


def now_iso() -> str:
    """Return the current configured-timezone time as an ISO string.

    Microseconds are zeroed so the timestamp has whole-second precision.
    """
    return now_in_config_timezone().replace(microsecond=0).isoformat()


def product_to_dict(product: Product) -> dict:
    """Convert a ``Product`` row into the camelCase dict sent in the snapshot.

    ``caducidad_estimada`` is coerced to ``bool``; every other attribute is
    copied through unchanged.
    """
    return {
        "id": product.id,
        "producto": product.producto,
        "precio": product.precio,
        "cantidad": product.cantidad,
        "unidad": product.unidad,
        "fechaCaducidad": product.fecha_caducidad,
        "fechaIngreso": product.fecha_ingreso,
        "fechaProduccion": product.fecha_produccion,
        "categoria": product.categoria,
        "caducidadEstimada": bool(product.caducidad_estimada),
        "notas": product.notas,
        "fuente": product.fuente,
        "stockActual": product.stock_actual,
        "consumidoTotal": product.consumido_total,
        "createdAt": product.created_at,
        "updatedAt": product.updated_at,
    }


def movement_to_dict(movement: Movement) -> dict:
    """Convert a ``Movement`` row into the camelCase dict sent in the snapshot."""
    return {
        "id": movement.id,
        "productId": movement.product_id,
        "producto": movement.producto,
        "tipo": movement.tipo,
        "cantidad": movement.cantidad,
        "unidad": movement.unidad,
        "notas": movement.notas,
        "fuente": movement.fuente,
        "createdAt": movement.created_at,
    }


def _new_pending_item(timestamp: str) -> SyncQueue:
    """Build a fresh full-snapshot queue row in the "pending" state."""
    return SyncQueue(
        queue_key=FULL_SNAPSHOT_KEY,
        status="pending",
        attempts=0,
        last_error="",
        created_at=timestamp,
        updated_at=timestamp,
    )


async def enqueue_full_sync(session: AsyncSession) -> None:
    """Mark the full-snapshot job as pending on the given session.

    Does nothing when Sheets sync is not available. Creates the queue row if
    it does not exist; otherwise resets its status to "pending" and clears
    ``last_error`` (``attempts`` is left untouched). This function does not
    commit; the change is left pending on ``session``.
    """
    if not sheets_sync_available():
        return

    item = await session.get(SyncQueue, FULL_SNAPSHOT_KEY)
    timestamp = now_iso()
    if item is None:
        session.add(_new_pending_item(timestamp))
        return

    item.status = "pending"
    item.updated_at = timestamp
    item.last_error = ""


async def process_sync_queue_once(
    session_factory: async_sessionmaker, force: bool = False
) -> dict:
    """Run one pass of the full-snapshot sync queue.

    Args:
        session_factory: Factory used to open the session for this pass.
        force: When True, run even if the queue row is missing or is not in
            the "pending"/"failed" state.

    Returns:
        A result dict. ``{"ok": False, "reason": "sheets-disabled"}`` when
        sync is not available; ``{"ok": True, "synced": False, "reason": ...}``
        when there is nothing to do; ``{"ok": True, "synced": True,
        "products": n, "movements": m}`` on success; and ``{"ok": False,
        "synced": False, "reason": <error>, "attempts": k}`` on failure.
    """
    if not sheets_sync_available():
        return {"ok": False, "reason": "sheets-disabled"}

    async with session_factory() as session:
        item = await session.get(SyncQueue, FULL_SNAPSHOT_KEY)
        if item is None:
            if not force:
                return {"ok": True, "synced": False, "reason": "queue-empty"}
            item = _new_pending_item(now_iso())
            session.add(item)
            await session.commit()
        elif item.status not in {"pending", "failed"} and not force:
            return {"ok": True, "synced": False, "reason": f"queue-{item.status}"}

        # Persist the "running" marker before doing the slow work.
        item.status = "running"
        item.updated_at = now_iso()
        await session.commit()

        try:
            # The snapshot reads live inside the try block so that a failure
            # while loading or converting rows is recorded as "failed" instead
            # of leaving the queue row stuck in "running" (which non-forced
            # passes would then skip).
            products_result = await session.execute(
                select(Product).order_by(Product.producto.asc(), Product.created_at.asc())
            )
            movements_result = await session.execute(
                select(Movement).order_by(Movement.created_at.asc())
            )
            records = [
                product_to_dict(product) for product in products_result.scalars().all()
            ]
            movements = [
                movement_to_dict(movement)
                for movement in movements_result.scalars().all()
            ]

            await sheets_backup.replace_snapshot(records, movements)

            item.status = "idle"
            item.updated_at = now_iso()
            item.last_error = ""
            await session.commit()
            return {
                "ok": True,
                "synced": True,
                "products": len(records),
                "movements": len(movements),
            }
        except Exception as exc:
            # NOTE(review): reading item.attempts after a commit presumably
            # relies on the session factory using expire_on_commit=False --
            # confirm against the factory configuration.
            item.status = "failed"
            item.attempts += 1
            item.last_error = str(exc)
            item.updated_at = now_iso()
            await session.commit()
            return {
                "ok": False,
                "synced": False,
                "reason": str(exc),
                "attempts": item.attempts,
            }


async def sync_loop(session_factory: async_sessionmaker) -> None:
    """Process the sync queue forever, sleeping between passes.

    Each iteration first sleeps ``settings.sync_interval_seconds`` (never less
    than 30 seconds) and then runs one non-forced queue pass. Exceptions
    raised by a pass are printed and the loop continues.
    """
    while True:
        await asyncio.sleep(max(30, settings.sync_interval_seconds))
        try:
            await process_sync_queue_once(session_factory)
        except Exception as exc:
            print(f"Error en cola de sincronizacion: {exc}")