Spaces:
Sleeping
Sleeping
File size: 4,751 Bytes
2552437 | 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 | from __future__ import annotations
import asyncio
from sqlalchemy import select
from sqlalchemy.ext.asyncio import AsyncSession, async_sessionmaker
from ..config import settings
from ..models import Movement, Product, SyncQueue
from . import sheets_backup
from .datetime_utils import now_in_config_timezone
FULL_SNAPSHOT_KEY = "full_snapshot"
def sheets_sync_available() -> bool:
return bool(
settings.sheets_sync_enabled
and settings.google_script_url.strip()
and settings.google_script_token.strip()
)
def now_iso() -> str:
return now_in_config_timezone().replace(microsecond=0).isoformat()
def product_to_dict(product: Product) -> dict:
return {
"id": product.id,
"producto": product.producto,
"precio": product.precio,
"cantidad": product.cantidad,
"unidad": product.unidad,
"fechaCaducidad": product.fecha_caducidad,
"fechaIngreso": product.fecha_ingreso,
"fechaProduccion": product.fecha_produccion,
"categoria": product.categoria,
"caducidadEstimada": bool(product.caducidad_estimada),
"notas": product.notas,
"fuente": product.fuente,
"stockActual": product.stock_actual,
"consumidoTotal": product.consumido_total,
"createdAt": product.created_at,
"updatedAt": product.updated_at,
}
def movement_to_dict(movement: Movement) -> dict:
return {
"id": movement.id,
"productId": movement.product_id,
"producto": movement.producto,
"tipo": movement.tipo,
"cantidad": movement.cantidad,
"unidad": movement.unidad,
"notas": movement.notas,
"fuente": movement.fuente,
"createdAt": movement.created_at,
}
async def enqueue_full_sync(session: AsyncSession) -> None:
if not sheets_sync_available():
return
item = await session.get(SyncQueue, FULL_SNAPSHOT_KEY)
timestamp = now_iso()
if item is None:
item = SyncQueue(
queue_key=FULL_SNAPSHOT_KEY,
status="pending",
attempts=0,
last_error="",
created_at=timestamp,
updated_at=timestamp,
)
session.add(item)
else:
item.status = "pending"
item.updated_at = timestamp
item.last_error = ""
async def process_sync_queue_once(session_factory: async_sessionmaker, force: bool = False) -> dict:
if not sheets_sync_available():
return {"ok": False, "reason": "sheets-disabled"}
async with session_factory() as session:
item = await session.get(SyncQueue, FULL_SNAPSHOT_KEY)
if item is None:
if not force:
return {"ok": True, "synced": False, "reason": "queue-empty"}
timestamp = now_iso()
item = SyncQueue(
queue_key=FULL_SNAPSHOT_KEY,
status="pending",
attempts=0,
last_error="",
created_at=timestamp,
updated_at=timestamp,
)
session.add(item)
await session.commit()
elif item.status not in {"pending", "failed"} and not force:
return {"ok": True, "synced": False, "reason": f"queue-{item.status}"}
item.status = "running"
item.updated_at = now_iso()
await session.commit()
products_result = await session.execute(select(Product).order_by(Product.producto.asc(), Product.created_at.asc()))
movements_result = await session.execute(select(Movement).order_by(Movement.created_at.asc()))
records = [product_to_dict(item) for item in products_result.scalars().all()]
movements = [movement_to_dict(item) for item in movements_result.scalars().all()]
try:
await sheets_backup.replace_snapshot(records, movements)
item.status = "idle"
item.updated_at = now_iso()
item.last_error = ""
await session.commit()
return {"ok": True, "synced": True, "products": len(records), "movements": len(movements)}
except Exception as exc:
item.status = "failed"
item.attempts += 1
item.last_error = str(exc)
item.updated_at = now_iso()
await session.commit()
return {"ok": False, "synced": False, "reason": str(exc), "attempts": item.attempts}
async def sync_loop(session_factory: async_sessionmaker) -> None:
while True:
await asyncio.sleep(max(30, settings.sync_interval_seconds))
try:
await process_sync_queue_once(session_factory)
except Exception as exc:
print(f"Error en cola de sincronizacion: {exc}")
|