bot_fam / python_backend /app /services /sync_service.py
JairoDanielMT's picture
Add memory, pending actions, sync queue, and Lima reminders
2552437
from __future__ import annotations
import asyncio
from sqlalchemy import select
from sqlalchemy.ext.asyncio import AsyncSession, async_sessionmaker
from ..config import settings
from ..models import Movement, Product, SyncQueue
from . import sheets_backup
from .datetime_utils import now_in_config_timezone
FULL_SNAPSHOT_KEY = "full_snapshot"
def sheets_sync_available() -> bool:
return bool(
settings.sheets_sync_enabled
and settings.google_script_url.strip()
and settings.google_script_token.strip()
)
def now_iso() -> str:
return now_in_config_timezone().replace(microsecond=0).isoformat()
def product_to_dict(product: Product) -> dict:
return {
"id": product.id,
"producto": product.producto,
"precio": product.precio,
"cantidad": product.cantidad,
"unidad": product.unidad,
"fechaCaducidad": product.fecha_caducidad,
"fechaIngreso": product.fecha_ingreso,
"fechaProduccion": product.fecha_produccion,
"categoria": product.categoria,
"caducidadEstimada": bool(product.caducidad_estimada),
"notas": product.notas,
"fuente": product.fuente,
"stockActual": product.stock_actual,
"consumidoTotal": product.consumido_total,
"createdAt": product.created_at,
"updatedAt": product.updated_at,
}
def movement_to_dict(movement: Movement) -> dict:
return {
"id": movement.id,
"productId": movement.product_id,
"producto": movement.producto,
"tipo": movement.tipo,
"cantidad": movement.cantidad,
"unidad": movement.unidad,
"notas": movement.notas,
"fuente": movement.fuente,
"createdAt": movement.created_at,
}
async def enqueue_full_sync(session: AsyncSession) -> None:
if not sheets_sync_available():
return
item = await session.get(SyncQueue, FULL_SNAPSHOT_KEY)
timestamp = now_iso()
if item is None:
item = SyncQueue(
queue_key=FULL_SNAPSHOT_KEY,
status="pending",
attempts=0,
last_error="",
created_at=timestamp,
updated_at=timestamp,
)
session.add(item)
else:
item.status = "pending"
item.updated_at = timestamp
item.last_error = ""
async def process_sync_queue_once(session_factory: async_sessionmaker, force: bool = False) -> dict:
if not sheets_sync_available():
return {"ok": False, "reason": "sheets-disabled"}
async with session_factory() as session:
item = await session.get(SyncQueue, FULL_SNAPSHOT_KEY)
if item is None:
if not force:
return {"ok": True, "synced": False, "reason": "queue-empty"}
timestamp = now_iso()
item = SyncQueue(
queue_key=FULL_SNAPSHOT_KEY,
status="pending",
attempts=0,
last_error="",
created_at=timestamp,
updated_at=timestamp,
)
session.add(item)
await session.commit()
elif item.status not in {"pending", "failed"} and not force:
return {"ok": True, "synced": False, "reason": f"queue-{item.status}"}
item.status = "running"
item.updated_at = now_iso()
await session.commit()
products_result = await session.execute(select(Product).order_by(Product.producto.asc(), Product.created_at.asc()))
movements_result = await session.execute(select(Movement).order_by(Movement.created_at.asc()))
records = [product_to_dict(item) for item in products_result.scalars().all()]
movements = [movement_to_dict(item) for item in movements_result.scalars().all()]
try:
await sheets_backup.replace_snapshot(records, movements)
item.status = "idle"
item.updated_at = now_iso()
item.last_error = ""
await session.commit()
return {"ok": True, "synced": True, "products": len(records), "movements": len(movements)}
except Exception as exc:
item.status = "failed"
item.attempts += 1
item.last_error = str(exc)
item.updated_at = now_iso()
await session.commit()
return {"ok": False, "synced": False, "reason": str(exc), "attempts": item.attempts}
async def sync_loop(session_factory: async_sessionmaker) -> None:
while True:
await asyncio.sleep(max(30, settings.sync_interval_seconds))
try:
await process_sync_queue_once(session_factory)
except Exception as exc:
print(f"Error en cola de sincronizacion: {exc}")