Spaces:
Sleeping
Sleeping
| from __future__ import annotations | |
| import uuid | |
| from datetime import date, datetime | |
| import unicodedata | |
| from sqlalchemy import delete, select | |
| from sqlalchemy.ext.asyncio import AsyncSession | |
| from ..models import Movement, Product, TelegramUser | |
| from .datetime_utils import NO_EXPIRY_DATE, now_in_config_timezone, today_in_config_timezone | |
| from .expiry_rules import estimate_expiry_for_category | |
| from . import sheets_backup | |
| from .sync_service import enqueue_full_sync, sheets_sync_available | |
| def now_iso() -> str: | |
| return datetime.utcnow().replace(microsecond=0).isoformat() | |
| def normalize_product_payload(payload: dict) -> dict: | |
| normalized = dict(payload) | |
| today = today_in_config_timezone() | |
| category = str(normalized.get("categoria", "") or "").strip().lower() | |
| normalized["categoria"] = category | |
| normalized["caducidadEstimada"] = bool(normalized.get("caducidadEstimada", False)) | |
| expiry = str(normalized.get("fechaCaducidad", "") or "").strip() | |
| if not expiry or expiry.lower() in {"no-caduca", "no caduca", "sin-caducidad", "sin caducidad"}: | |
| estimated = estimate_expiry_for_category(category) if category else "" | |
| if estimated: | |
| normalized["fechaCaducidad"] = estimated | |
| normalized["caducidadEstimada"] = estimated != NO_EXPIRY_DATE | |
| notes = str(normalized.get("notas", "") or "").strip() | |
| extra = ( | |
| f"Fecha de caducidad estimada por categoria {category}." | |
| if estimated != NO_EXPIRY_DATE | |
| else "Producto sin caducidad segun categoria." | |
| ) | |
| normalized["notas"] = f"{notes} {extra}".strip() if notes else extra | |
| else: | |
| normalized["fechaCaducidad"] = NO_EXPIRY_DATE | |
| notes = str(normalized.get("notas", "") or "").strip() | |
| extra = "Producto sin caducidad declarada." | |
| normalized["notas"] = f"{notes} {extra}".strip() if notes else extra | |
| else: | |
| normalized["fechaCaducidad"] = expiry | |
| ingreso = str(normalized.get("fechaIngreso", "") or "").strip() | |
| normalized["fechaIngreso"] = ingreso or today | |
| produccion = str(normalized.get("fechaProduccion", "") or "").strip() | |
| if not produccion: | |
| normalized["fechaProduccion"] = today | |
| notes = str(normalized.get("notas", "") or "").strip() | |
| extra = "Fecha de produccion asumida como hoy por dato faltante." | |
| normalized["notas"] = f"{notes} {extra}".strip() if notes else extra | |
| else: | |
| normalized["fechaProduccion"] = produccion | |
| return normalized | |
| def product_to_dict(product: Product) -> dict: | |
| return { | |
| "id": product.id, | |
| "producto": product.producto, | |
| "precio": product.precio, | |
| "cantidad": product.cantidad, | |
| "unidad": product.unidad, | |
| "fechaCaducidad": product.fecha_caducidad, | |
| "fechaIngreso": product.fecha_ingreso, | |
| "fechaProduccion": product.fecha_produccion, | |
| "categoria": product.categoria, | |
| "caducidadEstimada": bool(product.caducidad_estimada), | |
| "notas": product.notas, | |
| "fuente": product.fuente, | |
| "stockActual": product.stock_actual, | |
| "consumidoTotal": product.consumido_total, | |
| "createdAt": product.created_at, | |
| "updatedAt": product.updated_at, | |
| } | |
| def movement_to_dict(movement: Movement) -> dict: | |
| return { | |
| "id": movement.id, | |
| "productId": movement.product_id, | |
| "producto": movement.producto, | |
| "tipo": movement.tipo, | |
| "cantidad": movement.cantidad, | |
| "unidad": movement.unidad, | |
| "notas": movement.notas, | |
| "fuente": movement.fuente, | |
| "createdAt": movement.created_at, | |
| } | |
| def normalize_product_key(name: str) -> str: | |
| text = unicodedata.normalize("NFKD", name or "").encode("ascii", "ignore").decode("ascii") | |
| text = text.lower() | |
| text = "".join(char if char.isalnum() or char.isspace() else " " for char in text) | |
| return " ".join(text.split()) | |
| def aggregate_product_records(records: list[dict]) -> list[dict]: | |
| grouped: dict[tuple[str, str, str], dict] = {} | |
| for record in records: | |
| key = ( | |
| normalize_product_key(record.get("producto", "")), | |
| str(record.get("fechaCaducidad", "") or ""), | |
| str(record.get("unidad", "") or "unidad").lower(), | |
| ) | |
| current = grouped.get(key) | |
| if current is None: | |
| grouped[key] = dict(record) | |
| continue | |
| current["cantidad"] = float(current.get("cantidad", 0) or 0) + float(record.get("cantidad", 0) or 0) | |
| current["stockActual"] = float(current.get("stockActual", 0) or 0) + float(record.get("stockActual", 0) or 0) | |
| current["consumidoTotal"] = float(current.get("consumidoTotal", 0) or 0) + float(record.get("consumidoTotal", 0) or 0) | |
| current["precio"] = max(float(current.get("precio", 0) or 0), float(record.get("precio", 0) or 0)) | |
| current["caducidadEstimada"] = bool(current.get("caducidadEstimada", False) or record.get("caducidadEstimada", False)) | |
| existing_notes = str(current.get("notas", "") or "").strip() | |
| incoming_notes = str(record.get("notas", "") or "").strip() | |
| if incoming_notes and incoming_notes not in existing_notes: | |
| current["notas"] = f"{existing_notes} | {incoming_notes}".strip(" |") | |
| if len(str(record.get("producto", ""))) > len(str(current.get("producto", ""))): | |
| current["producto"] = record.get("producto", current["producto"]) | |
| if str(record.get("updatedAt", "")) > str(current.get("updatedAt", "")): | |
| current["updatedAt"] = record.get("updatedAt", current.get("updatedAt", "")) | |
| return sorted( | |
| grouped.values(), | |
| key=lambda item: ( | |
| item.get("fechaCaducidad", ""), | |
| normalize_product_key(item.get("producto", "")), | |
| ), | |
| ) | |
| async def register_telegram_user(session: AsyncSession, chat_id: str, username: str = "", first_name: str = "", last_name: str = "") -> None: | |
| user = await session.get(TelegramUser, str(chat_id)) | |
| if user is None: | |
| user = TelegramUser( | |
| chat_id=str(chat_id), | |
| username=username or "", | |
| first_name=first_name or "", | |
| last_name=last_name or "", | |
| is_active=1, | |
| updated_at=now_iso(), | |
| ) | |
| session.add(user) | |
| else: | |
| user.username = username or "" | |
| user.first_name = first_name or "" | |
| user.last_name = last_name or "" | |
| user.is_active = 1 | |
| user.updated_at = now_iso() | |
| await session.commit() | |
| async def list_active_users(session: AsyncSession) -> list[TelegramUser]: | |
| result = await session.execute(select(TelegramUser).where(TelegramUser.is_active == 1)) | |
| return list(result.scalars().all()) | |
| async def create_product(session: AsyncSession, payload: dict) -> dict: | |
| payload = normalize_product_payload(payload) | |
| timestamp = now_iso() | |
| product = Product( | |
| id=str(uuid.uuid4()), | |
| producto=payload["producto"], | |
| precio=float(payload["precio"]), | |
| cantidad=float(payload["cantidad"]), | |
| unidad=payload["unidad"], | |
| fecha_caducidad=payload["fechaCaducidad"], | |
| fecha_ingreso=payload["fechaIngreso"], | |
| fecha_produccion=payload["fechaProduccion"], | |
| categoria=payload.get("categoria", ""), | |
| caducidad_estimada=1 if payload.get("caducidadEstimada", False) else 0, | |
| notas=payload.get("notas", ""), | |
| fuente=payload.get("fuente", "web"), | |
| stock_actual=float(payload["cantidad"]), | |
| consumido_total=0, | |
| created_at=timestamp, | |
| updated_at=timestamp, | |
| ) | |
| movement = Movement( | |
| id=str(uuid.uuid4()), | |
| product_id=product.id, | |
| producto=product.producto, | |
| tipo="ingreso", | |
| cantidad=product.cantidad, | |
| unidad=product.unidad, | |
| notas=product.notas, | |
| fuente=product.fuente, | |
| created_at=timestamp, | |
| ) | |
| session.add(product) | |
| session.add(movement) | |
| await enqueue_full_sync(session) | |
| await session.commit() | |
| return product_to_dict(product) | |
| async def list_products(session: AsyncSession, query: str = "") -> list[dict]: | |
| result = await session.execute(select(Product).order_by(Product.fecha_caducidad.asc(), Product.producto.asc())) | |
| products = list(result.scalars().all()) | |
| if query: | |
| needle = query.lower().strip() | |
| products = [p for p in products if needle in p.producto.lower() or needle in p.notas.lower()] | |
| return [product_to_dict(product) for product in products] | |
| async def list_movements_local(session: AsyncSession) -> list[dict]: | |
| result = await session.execute(select(Movement).order_by(Movement.created_at.desc())) | |
| return [movement_to_dict(item) for item in result.scalars().all()] | |
| async def consume_product(session: AsyncSession, payload: dict) -> dict: | |
| result = await session.execute( | |
| select(Product) | |
| .where(Product.producto.ilike(payload["producto"]), Product.stock_actual > 0) | |
| .order_by(Product.fecha_caducidad.asc(), Product.created_at.asc()) | |
| .limit(1) | |
| ) | |
| product = result.scalar_one_or_none() | |
| if product is None: | |
| raise ValueError(f'No existe stock disponible para "{payload["producto"]}".') | |
| if product.unidad.lower() != payload["unidad"].lower(): | |
| raise ValueError(f"La unidad esperada para {product.producto} es {product.unidad}.") | |
| if product.stock_actual < payload["cantidad"]: | |
| raise ValueError(f"Stock insuficiente de {product.producto}.") | |
| product.stock_actual -= float(payload["cantidad"]) | |
| product.consumido_total += float(payload["cantidad"]) | |
| product.updated_at = now_iso() | |
| movement = Movement( | |
| id=str(uuid.uuid4()), | |
| product_id=product.id, | |
| producto=product.producto, | |
| tipo="consumo", | |
| cantidad=float(payload["cantidad"]), | |
| unidad=product.unidad, | |
| notas=payload.get("notas", ""), | |
| fuente=payload.get("fuente", "telegram-consumo"), | |
| created_at=product.updated_at, | |
| ) | |
| session.add(movement) | |
| await enqueue_full_sync(session) | |
| await session.commit() | |
| response = product_to_dict(product) | {"consumedNow": float(payload["cantidad"])} | |
| return response | |
| async def get_expiring_products(session: AsyncSession, days_ahead: int) -> list[dict]: | |
| products = await list_products(session) | |
| today = now_in_config_timezone().date() | |
| expiring = [] | |
| for item in products: | |
| if item["stockActual"] <= 0 or not item["fechaCaducidad"]: | |
| continue | |
| if item["fechaCaducidad"] == NO_EXPIRY_DATE: | |
| continue | |
| expiry = date.fromisoformat(item["fechaCaducidad"]) | |
| diff = (expiry - today).days | |
| if 0 <= diff <= days_ahead: | |
| expiring.append(item) | |
| return aggregate_product_records(expiring) | |
| async def list_products_grouped(session: AsyncSession, query: str = "") -> list[dict]: | |
| return aggregate_product_records(await list_products(session, query)) | |
| async def sync_from_sheets(session: AsyncSession) -> dict: | |
| if not sheets_sync_available(): | |
| return {"ok": False, "reason": "sheets-disabled"} | |
| products_response, movements_response = await sheets_backup.list_products(), await sheets_backup.list_movements() | |
| records = products_response.get("records", []) | |
| movements = movements_response.get("movements", []) | |
| await session.execute(delete(Product)) | |
| await session.execute(delete(Movement)) | |
| for item in records: | |
| session.add( | |
| Product( | |
| id=item["id"], | |
| producto=item["producto"], | |
| precio=float(item.get("precio", 0)), | |
| cantidad=float(item.get("cantidad", 0)), | |
| unidad=item.get("unidad", "unidad"), | |
| fecha_caducidad=item.get("fechaCaducidad", ""), | |
| fecha_ingreso=item.get("fechaIngreso", ""), | |
| fecha_produccion=item.get("fechaProduccion", ""), | |
| categoria=item.get("categoria", ""), | |
| caducidad_estimada=1 if item.get("caducidadEstimada", False) else 0, | |
| notas=item.get("notas", ""), | |
| fuente=item.get("fuente", "web"), | |
| stock_actual=float(item.get("stockActual", item.get("cantidad", 0))), | |
| consumido_total=float(item.get("consumidoTotal", 0)), | |
| created_at=item.get("createdAt", now_iso()), | |
| updated_at=item.get("updatedAt", item.get("createdAt", now_iso())), | |
| ) | |
| ) | |
| for item in movements: | |
| session.add( | |
| Movement( | |
| id=item["id"], | |
| product_id=item.get("productId", ""), | |
| producto=item["producto"], | |
| tipo=item["tipo"], | |
| cantidad=float(item.get("cantidad", 0)), | |
| unidad=item.get("unidad", "unidad"), | |
| notas=item.get("notas", ""), | |
| fuente=item.get("fuente", ""), | |
| created_at=item.get("createdAt", now_iso()), | |
| ) | |
| ) | |
| await session.commit() | |
| return {"products": len(records), "movements": len(movements)} | |
| async def sync_to_sheets(session: AsyncSession) -> dict: | |
| if not sheets_sync_available(): | |
| return {"ok": False, "reason": "sheets-disabled"} | |
| records = await list_products(session) | |
| movements = await list_movements_local(session) | |
| await sheets_backup.replace_snapshot(records, movements) | |
| return {"ok": True, "products": len(records), "movements": len(movements)} | |