File size: 4,751 Bytes
2552437
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
from __future__ import annotations

import asyncio

from sqlalchemy import select
from sqlalchemy.ext.asyncio import AsyncSession, async_sessionmaker

from ..config import settings
from ..models import Movement, Product, SyncQueue
from . import sheets_backup
from .datetime_utils import now_in_config_timezone


# Identity of the single SyncQueue row used for full-snapshot syncs: it is
# passed to session.get(SyncQueue, ...) and stored as the row's queue_key.
FULL_SNAPSHOT_KEY = "full_snapshot"


def sheets_sync_available() -> bool:
    """Report whether sheets sync is switched on and fully configured.

    Returns:
        True only when ``settings.sheets_sync_enabled`` is truthy and both
        ``settings.google_script_url`` and ``settings.google_script_token``
        are non-blank after stripping whitespace.
    """
    # Checks run in the same order as a chained ``and`` so that the URL and
    # token are never touched when the feature flag is off.
    if not settings.sheets_sync_enabled:
        return False
    if not settings.google_script_url.strip():
        return False
    return bool(settings.google_script_token.strip())


def now_iso() -> str:
    """Return the current configured-timezone time as an ISO 8601 string.

    The value comes from ``now_in_config_timezone()`` with the microsecond
    component zeroed, so the result has whole-second precision.
    """
    current = now_in_config_timezone()
    whole_seconds = current.replace(microsecond=0)
    return whole_seconds.isoformat()


def product_to_dict(product: Product) -> dict:
    """Convert a product into the camelCase dict sent in the snapshot.

    Args:
        product: Object exposing the product attributes read below.

    Returns:
        A dict whose keys appear in the order of the field table. Every value
        is copied from the product as-is, except ``caducidadEstimada``, which
        is coerced to ``bool``.
    """
    # (output key, attribute name) pairs, in output order.
    field_table = (
        ("id", "id"),
        ("producto", "producto"),
        ("precio", "precio"),
        ("cantidad", "cantidad"),
        ("unidad", "unidad"),
        ("fechaCaducidad", "fecha_caducidad"),
        ("fechaIngreso", "fecha_ingreso"),
        ("fechaProduccion", "fecha_produccion"),
        ("categoria", "categoria"),
        ("caducidadEstimada", "caducidad_estimada"),
        ("notas", "notas"),
        ("fuente", "fuente"),
        ("stockActual", "stock_actual"),
        ("consumidoTotal", "consumido_total"),
        ("createdAt", "created_at"),
        ("updatedAt", "updated_at"),
    )
    payload = {key: getattr(product, attribute) for key, attribute in field_table}
    # The estimated-expiry flag is always emitted as a real boolean.
    payload["caducidadEstimada"] = bool(payload["caducidadEstimada"])
    return payload


def movement_to_dict(movement: Movement) -> dict:
    """Convert a movement into the camelCase dict sent in the snapshot.

    Args:
        movement: Object exposing the movement attributes read below.

    Returns:
        A dict with one entry per field-table row, in table order, each value
        copied unchanged from the movement.
    """
    # (output key, attribute name) pairs, in output order.
    field_table = (
        ("id", "id"),
        ("productId", "product_id"),
        ("producto", "producto"),
        ("tipo", "tipo"),
        ("cantidad", "cantidad"),
        ("unidad", "unidad"),
        ("notas", "notas"),
        ("fuente", "fuente"),
        ("createdAt", "created_at"),
    )
    return {key: getattr(movement, attribute) for key, attribute in field_table}


async def enqueue_full_sync(session: AsyncSession) -> None:
    """Mark the full-snapshot queue row as pending, creating it if missing.

    Does nothing when sheets sync is unavailable. The session is not
    committed here; changes are only staged on it.

    Args:
        session: Session on which the queue row is looked up and modified.
    """
    if not sheets_sync_available():
        return

    queued = await session.get(SyncQueue, FULL_SNAPSHOT_KEY)
    stamp = now_iso()

    if queued is not None:
        # Re-arm the existing row and clear the previous error text.
        queued.status = "pending"
        queued.updated_at = stamp
        queued.last_error = ""
        return

    session.add(
        SyncQueue(
            queue_key=FULL_SNAPSHOT_KEY,
            status="pending",
            attempts=0,
            last_error="",
            created_at=stamp,
            updated_at=stamp,
        )
    )


async def process_sync_queue_once(session_factory: async_sessionmaker, force: bool = False) -> dict:
    """Run the queued full-snapshot sync once, if one is due.

    Args:
        session_factory: Factory used to open the session for this run.
        force: When true, run even if no queue row exists (one is created)
            or the row's status is something other than "pending"/"failed".

    Returns:
        A result dict. ``ok`` is False when sheets sync is unavailable or the
        sync attempt failed; ``synced`` is True only when the snapshot was
        replaced. Skipped and failed runs carry a ``reason``; failed runs also
        carry the updated ``attempts`` count, and successful runs carry the
        ``products`` and ``movements`` counts.
    """
    if not sheets_sync_available():
        return {"ok": False, "reason": "sheets-disabled"}

    async with session_factory() as session:
        item = await session.get(SyncQueue, FULL_SNAPSHOT_KEY)
        if item is None:
            if not force:
                return {"ok": True, "synced": False, "reason": "queue-empty"}
            timestamp = now_iso()
            item = SyncQueue(
                queue_key=FULL_SNAPSHOT_KEY,
                status="pending",
                attempts=0,
                last_error="",
                created_at=timestamp,
                updated_at=timestamp,
            )
            session.add(item)
            await session.commit()
        elif item.status not in {"pending", "failed"} and not force:
            return {"ok": True, "synced": False, "reason": f"queue-{item.status}"}

        item.status = "running"
        item.updated_at = now_iso()
        await session.commit()

        try:
            # Snapshot collection lives inside the try on purpose: the row is
            # already committed as "running", and non-forced runs skip any
            # status other than "pending"/"failed". An error raised here must
            # therefore be recorded as "failed", or the queue would stay stuck
            # in "running".
            products_result = await session.execute(
                select(Product).order_by(Product.producto.asc(), Product.created_at.asc())
            )
            movements_result = await session.execute(
                select(Movement).order_by(Movement.created_at.asc())
            )
            records = [product_to_dict(product) for product in products_result.scalars().all()]
            movements = [movement_to_dict(movement) for movement in movements_result.scalars().all()]

            await sheets_backup.replace_snapshot(records, movements)
            item.status = "idle"
            item.updated_at = now_iso()
            item.last_error = ""
            await session.commit()
            return {"ok": True, "synced": True, "products": len(records), "movements": len(movements)}
        except Exception as exc:
            # Record the failure on the queue row so a later run can retry it.
            item.status = "failed"
            item.attempts += 1
            item.last_error = str(exc)
            item.updated_at = now_iso()
            await session.commit()
            return {"ok": False, "synced": False, "reason": str(exc), "attempts": item.attempts}


async def sync_loop(session_factory: async_sessionmaker) -> None:
    """Process the sync queue forever, sleeping before every attempt.

    The pause is ``settings.sync_interval_seconds`` with a floor of 30
    seconds, re-read on each iteration. Any ``Exception`` raised by a run is
    printed and the loop carries on.

    Args:
        session_factory: Factory handed to ``process_sync_queue_once``.
    """
    while True:
        pause_seconds = max(30, settings.sync_interval_seconds)
        await asyncio.sleep(pause_seconds)
        try:
            await process_sync_queue_once(session_factory)
        except Exception as error:
            # Best-effort background task: report the error and keep looping.
            print(f"Error en cola de sincronizacion: {error}")